import std.datetime;
import std.stdio;
import std.cstream;
import std.concurrency;
import std.algorithm;
import std.array;
import std.format;
import std.typetuple;
import std.math;
import std.random;
import core.thread;
import std.container;
import std.conv;
import std.exception;

import Queue_Examples;
import PrettyPrint;

/// Posted by a sender thread to its owner after all of its packages are enqueued.
struct Sender_Done
{
  uint sender_id; // index of the sender that finished
}

/// Posted by the receiver thread to its owner after it has dequeued every expected package
/// and published the latencies in shared_latency.
struct Receiver_Done {}

/// Start-barrier message; currently unused (the matching receive calls are commented out).
struct Threads_Ready {}

// Per-package latencies in microseconds, in receive order. Written by the receiver thread,
// read by the owner only after Receiver_Done arrives.
shared static double[] shared_latency;



// Thread entry points used to benchmark queue type Q.
// From the calls below, Q is expected to provide: Data_Type, set_receiver_Tid(Tid),
// Enqueue(QT.Package) and Dequeue(ref QT.Package) returning a truth value when a package was taken.
template Thread_Functions(Q) 
{
  alias Queues!(Q.Data_Type) QT;
  
  // Producer thread: enqueues `how_many` packages tagged with sender_id, a sequence number and
  // the send time, optionally pausing us_delay between enqueues, then posts Sender_Done to ownerTid.
  // us_jitter is only reported in debug output; the jitter computation is commented out.
  void send_data(in uint sender_id, in uint how_many, uint us_delay, uint us_jitter, 
		 Tid ownerTid, Tid receiverTid) 
  {
    debug (2) { writefln("Entering sender %s with us_delay=%s and us_jitter=%s",sender_id,us_delay,us_jitter); }
    Q.set_receiver_Tid(receiverTid);
    int usj = cast(int)us_jitter; // uniform(-x,x) hangs when x is uint
    //receive((Threads_Ready td) {});
    foreach(k; 0..how_many) {
      QT.Package p;
      p.sender_sequence = k;
      p.sender_id = sender_id;
      p.pTime = systime(); // send timestamp; the receiver subtracts it from its receive time
      debug (5) { writefln("sending: %s",QT.package_tostring(p)); }
      debug (5) { writefln("calling Enqueue"); }
      Q.Enqueue(p);
      if (us_delay > 0) {
	// jitter term is disabled, so every pause is exactly us_delay (usj is currently unused)
	int next_sleep_us = us_delay;//-usj+((usj > 0) ? uniform(0,2*usj) : 0);
	debug (8) { writefln("next_sleep_us=%s",next_sleep_us); }
	// NOTE(review): the factor 10 suggests Thread.sleep takes 100 ns (hnsec) units in this
	// Phobos version, i.e. this sleeps next_sleep_us microseconds -- confirm against the API used.
	Thread.sleep(10*next_sleep_us);
      }
    }
    debug (2) { writefln("Finished sender %s",sender_id); }
    Sender_Done sd;
    sd.sender_id = sender_id;
    send(ownerTid, sd);
  }
  
  // Consumer thread: polls Q.Dequeue until num_to_be_received packages have arrived, recording
  // each package's latency (us) by receive order, copies the results into shared_latency, and
  // then posts Receiver_Done to ownerTid.
  void receive_data(in int num_to_be_received, Tid ownerTid) {
    debug (2) { printf("Entering receive\n"); }
    Array!double latency;
    latency.length(num_to_be_received);
    int received = 0;
    QT.Package p;
    //receive((Threads_Ready td) {});
    // Busy-wait: an empty queue is simply polled again (no sleep or yield).
    while (received < num_to_be_received)  {
      if (Q.Dequeue(p)) {
	debug (5) { printf("Rec'd: (pkt %i) %.*s\n",received,QT.package_tostring(p)); }
	Ticks rTime = systime(); 
	process_package(p,received,rTime,latency);
	++received;
      }
      else {
	debug (7) { printf("Empty Queue (rec'd %i of %i)\n",received,num_to_be_received); }
      }
    }
    // copy data into shared space
    // (done before sending Receiver_Done; the owner reads shared_latency only after that message)
    shared_latency.length = latency.length;
    foreach (k;0..latency.length)
      shared_latency[k] = latency[k]; 

    debug (2) { writefln("Finished receiver"); }
    Receiver_Done rd;
    send(ownerTid,rd);
  }
  
  // Stores the send-to-receive latency of package p, in microseconds, at index receive_number.
  // NOTE(review): `latency` is taken by value; this relies on copies of std.container.Array
  // sharing the same (reference-counted) storage so the write is visible to the caller.
  void process_package(in QT.Package p, in uint receive_number, in Ticks rTime, Array!double latency) 
  {
    double elapsed_us = (rTime - p.pTime).toMicroseconds!(double)();
    debug (5) writefln("Elapsed: %f us",elapsed_us);
    latency[receive_number] = elapsed_us;
  }
}

/// Settings for one benchmark run.
struct Test_Parameters 
{
  uint how_many_senders;        // number of producer threads spawned
  uint packets_per_sender;      // packages enqueued by each producer
  uint average_packets_per_s;   // target total send rate; 0 means send at max rate
  uint microsecond_variability; // requested send-time jitter, in microseconds

  this(uint hms, uint pps, uint apps, uint mv) 
  {
    this.how_many_senders        = hms;
    this.packets_per_sender      = pps;
    this.average_packets_per_s   = apps;
    this.microsecond_variability = mv;
  }
}



// Driver for a single benchmark run against queue type Q.
template Queue_Test_Functions(Q) 
{
  alias Queues!(Q.Data_Type) QT;
  alias Q Queue_Type;


  // Runs one test: initializes Q, spawns one receiver and tp.how_many_senders senders, waits
  // for every Sender_Done and then the Receiver_Done, and returns a copy of the receiver's
  // latencies (microseconds, one per package, in receive order).
  double[] run_test(in Test_Parameters tp) 
  {
    auto mainTid = thisTid;
    const uint total_packets = tp.how_many_senders * tp.packets_per_sender;
    double[] latency;
    latency.length = total_packets;
    
    Q.initialize(); // presumably resets Q's shared state before each run -- confirm in Queue_Examples
    debug (5) { printf("created/initialized Q.\n"); }  
    
    // Per-sender pause between enqueues so that all senders together approach
    // average_packets_per_s; 0 (no pause) when the rate is 0.
    uint us_delay = 0;
    if (tp.average_packets_per_s > 0)
      us_delay = cast(uint)(1000000*((cast(double)tp.how_many_senders)/tp.average_packets_per_s));

    // Jitter can never exceed the delay itself. Senders currently only log it.
    uint us_jitter = min(us_delay,tp.microsecond_variability);

    //    Random gen;
    
    alias Thread_Functions!(Q) TF; 
 
    // Receiver is spawned first so its Tid can be handed to every sender.
    auto rTid = spawn(&TF.receive_data,total_packets,mainTid);

    foreach (k; 0..tp.how_many_senders) {
      auto sTid = spawn(&TF.send_data,k,tp.packets_per_sender,us_delay,us_jitter,mainTid,rTid); // sTid is not used afterwards
    }


    // all threads started 
    // now wait until they are finished
    // (receive only consumes matching messages, so an early Receiver_Done stays queued)
    
    
    foreach (k; 0..tp.how_many_senders) 
      receive((Sender_Done sd) { debug (2) { writefln("Sender %s sent finished signal.",sd.sender_id); } });
    
    receive((Receiver_Done rd) { debug(2) { writefln("Receiver sent finished signal."); } });
    
    // The receiver fills shared_latency before posting Receiver_Done, so it is complete here.
    foreach (k; 0..total_packets)
      latency[k] = shared_latency[k];

    return latency;
  }
}

/// Summary statistics of one run's latencies (all values in the latencies' units, i.e. us).
struct Latency_Analysis
{
  uint   N;        // number of samples analyzed
  double avg;      // arithmetic mean
  double maximum;  // largest sample
  double minimum;  // smallest sample
  double std_dev;  // population standard deviation
  double avg_min;  // mean of the smallest 1% of samples (at least one sample)
  double avg_max;  // mean of the largest 1% of samples (at least one sample)
}

/**
 * Computes summary statistics (count, mean, min, max, population standard deviation, and the
 * means of the fastest/slowest 1%) of a set of latency samples.
 *
 * Side effect: sorts latency_data in place (ascending), so any order-dependent analysis
 * (e.g. autocorrelation) must be done before calling this.
 *
 * Params:
 *   latency_data = samples to analyze; must be non-empty.
 * Returns: the filled-in Latency_Analysis.
 * Throws: Exception (via enforce) when latency_data is empty.
 */
Latency_Analysis analyze_latencies(ref double[] latency_data) 
{
  // Without this guard an empty array underflows N-1 and indexes out of bounds below.
  enforce(latency_data.length > 0, "analyze_latencies: no latency samples to analyze");

  Latency_Analysis la;
  // Explicit narrowing: size_t is 64-bit on 64-bit targets, N is a uint.
  la.N = cast(uint) latency_data.length;

  sort(latency_data);
  la.minimum = latency_data[0];
  la.maximum = latency_data[$-1];

  la.avg = reduce!("a+b")(0.0, latency_data) / la.N;

  // Running sum of squared deviations from the mean.
  double add_squared_deviation(double acc, double x) {
    double d = x - la.avg;
    return acc + d*d;
  }
  la.std_dev = sqrt(reduce!(add_squared_deviation)(0.0, latency_data) / la.N);

  // How many samples make up 1%?  Must be at least 1.
  const uint pct = max(la.N/100, 1u);
  la.avg_min = reduce!("a+b")(0.0, latency_data[0..pct]) / pct;
  la.avg_max = reduce!("a+b")(0.0, latency_data[$-pct..$]) / pct;
  return la;
}


/// One histogram bin produced by make_binned_data.
struct Bin_Data
{
  double left_edge; // lower edge of the bin, in the binning function's units
  double count;     // samples in the bin, or their fraction of the total when density binning is used
  double CDF;       // cumulative fraction of samples up to and including this bin
}


/**
 * Builds an equal-width histogram of map_function(latency) over [0, largest mapped value].
 *
 * Params:
 *   map_function     = transform applied to each latency before binning (identity by default).
 *                      Bin edges are expressed in mapped units.
 *   queue_number     = unused; kept for interface compatibility.
 *   bins             = number of bins to fill; must be at least 2.
 *   latencies        = samples to bin; must be non-empty. Need not be sorted.
 *   binned_latencies = output; must hold at least `bins` entries. For bin k, left_edge = k*bin_width,
 *                      count = number of samples (or fraction of samples when density is true),
 *                      CDF = cumulative fraction of samples up to and including bin k.
 *   density          = when true, counts are replaced by fractions of the total.
 *
 * Mapped values below 0 (e.g. log of sub-microsecond latencies) are counted in bin 0 rather than
 * indexing out of bounds.
 *
 * Throws: Exception (via enforce) when a precondition above is violated or the largest mapped
 * value is not positive.
 */
void make_binned_data(alias map_function=(x) { return x; })
  (in int queue_number, in uint bins, in double[] latencies, ref Bin_Data[] binned_latencies, bool density=true)
  if (is(typeof(map_function(latencies[0]) == latencies[0])==bool)) // check that map_function sig is "double f(double)"
{
  enforce(bins >= 2, "make_binned_data: need at least 2 bins");
  enforce(latencies.length > 0, "make_binned_data: no latencies to bin");
  enforce(binned_latencies.length >= bins, "make_binned_data: output array has fewer entries than bins");

  // Top of the histogram range. Scanning every sample (instead of reading the last one)
  // keeps this correct for unsorted input.
  double top = map_function(latencies[0]);
  foreach (x; latencies[1..$]) {
    double v = map_function(x);
    if (v > top) top = v;
  }

  // Divide by (bins-1) so the largest value falls exactly on the left edge of the last bin.
  const double bin_width = top/(bins-1);
  enforce(bin_width > 0, "make_binned_data: largest mapped latency must be positive");

  foreach (k; 0..bins) {
    binned_latencies[k].left_edge = k*bin_width; // left edge of bin
    binned_latencies[k].count = 0;               // initialize count
  }

  foreach (x; latencies) {
    long bin = rndtol(floor(map_function(x)/bin_width));
    if (bin < 0)
      bin = 0;          // below the histogram origin (negative mapped value)
    else if (bin >= bins)
      bin = bins - 1;   // guard against floating-point rounding at the top edge
    binned_latencies[cast(size_t)bin].count++;
  }

  double CDF = 0;
  const double norm = 1.0/latencies.length;
  foreach (k; 0..bins) {
    double DF = norm*binned_latencies[k].count;
    CDF += DF;
    binned_latencies[k].CDF = CDF;
    if (density)
      binned_latencies[k].count = DF;
  }
}

// pretty printing helpers

/// One-line human-readable description of a test configuration, used as a table title
/// and as a header line in the data files.
string test_info(in Test_Parameters tp) { 
  auto writer = appender!string();
  if (tp.average_packets_per_s > 0)
    formattedWrite(writer,"%s producer(s) each producing %s packets at %s pkts/s (total) with %s us jitter:",
                   tp.how_many_senders,tp.packets_per_sender,tp.average_packets_per_s,tp.microsecond_variability);
  else
    // This format has only two placeholders. Passing the rate and jitter as well leaves orphan
    // arguments, which newer Phobos versions reject with a FormatException.
    formattedWrite(writer,"%s producer(s) each producing %s packets at max rate with no jitter:",
                   tp.how_many_senders,tp.packets_per_sender);
  return writer.data;
}


// Column layout shared by run_info_header and run_info. There are eight columns:
// Queue, Tot s, avg, sd, min, max, <min>, <max>.
immutable int[] widths = [11,10,10,10,10,10,10,10];   // per-column widths (presumably characters -- confirm in PrettyPrint)
immutable string[] hformats = ["%s","%s","%s","%s","%s","%s","%s","%s"];  // header cells are plain strings
// Data row formats: queue name as-is, run time to 3 significant digits, latency stats to 2.
immutable string[] dformats = ["%s","%.3g","%.2g","%.2g","%.2g","%.2g","%.2g","%.2g"];

/// Column-header line of the results table followed by an underscore rule line.
string run_info_header() 
{
  // Per-column justification: centre the "Queue" header and right-justify the numeric
  // headers so they line up with the right-justified values printed by run_info.
  // (Previously this array was built but never passed, so every header was centred.)
  Justify[8] djs = Justify.R;
  djs[0] = Justify.C;

  return 
    FormattedPrint(widths,2,djs,hformats,"Queue","Tot s","avg","sd","min","max","<min>","<max>") 
    ~ "\n" ~ 
    FormattedPrintFill(widths,2,"_");
}

/// One results-table row for queue Q: total run time in seconds followed by the latency statistics.
string run_info(Q)(in Latency_Analysis la, in double run_s) 
{
  // Queue name left-justified in column 0; every numeric column right-justified.
  Justify[8] justification = Justify.R;
  justification[0] = Justify.L;

  auto row = FormattedPrint(widths, 2, justification, dformats,
                            Q.short_name, run_s,
                            la.avg, la.std_dev,
                            la.minimum, la.maximum,
                            la.avg_min, la.avg_max);
  return row;
}

// Builds, for use with mixin, the declaration
//   alias TypeTuple!(QT.<name0>,QT.<name1>,...) Queue_Types;
// from a list of queue type names. It is evaluated at compile time, so it sticks to plain loops.
string makeQueueTypeTuple(in string[] Queue_Types) 
{
  string decl = "alias TypeTuple!(";
  string separator = "";
  foreach (name; Queue_Types) {
    decl ~= separator ~ "QT." ~ name;
    separator = ",";
  }
  return decl ~ ") Queue_Types;";
}


/// Dummy payload carried by each benchmark package to give it a realistic size.
/// Field order is kept as-is because it determines the reported package size.
struct small_message 
{
  double[10] numbers;
  char[10]   field1;
  char[10]   field2;
}

// Benchmark driver. For each payload type in Package_Types it prints a queue legend (once) and,
// for every entry in `tests`, a table with one row per queue in `queue_types` showing the total
// run time and the latency statistics from analyze_latencies. When write_data_files is true it also
// writes a binned latency histogram per (test, queue) to T<test>_Q<queue>.dat.
void main() 
{
  scope (success) { writefln("Run Complete."); }
  
  // data packages to test
  alias TypeTuple!(small_message) Package_Types;
 
  immutable int bins_for_output = 100;
  bool write_data_files = false; // set to true to write the per-(test, queue) .dat histogram files

  // Queues to test
  // (each name is resolved as QT.<name> through the mixin below)
  immutable string[] queue_types = [
				    //"CSync_LL_FIFO_TA",
				    //"CSync_LL_FIFO_FL",
				    "ISync_LL_FIFO_TA",
                                    "ISync_LL_FIFO_FL",
                                    "FIFO_LinkedList_LockFree_Queue",
                                    "MP_FIFO"
                                    ];

  const int NQs = queue_types.length;  // derived from queue_types, so it always stays in sync; not referenced elsewhere in main

  Bin_Data[] binned_latencies;  
  binned_latencies.length = bins_for_output;

  // tests to run
  // Test_Parameters(senders, packets per sender, total pkts/s (0 = max rate), jitter in us)
  Test_Parameters[] tests;
  
  tests ~= Test_Parameters(1,10,0,0);
  tests ~= Test_Parameters(4,10,0,0);
  tests ~= Test_Parameters(4,100,100,0);
  
  tests ~= Test_Parameters(4,1000,1000,0);
  
  tests ~= Test_Parameters(4,5000,5000,0);
  tests ~= Test_Parameters(4,50000,50000,0);
  tests ~= Test_Parameters(4,50000,100000,0);
  
  //tests ~= Test_Parameters(4,500000,100000,0);
  //tests ~= Test_Parameters(6,500000,100000,0);
  //tests ~= Test_Parameters(8,500000,100000,0);
  

  setbuf(dout.file,null); // unbuffer stdout so we see output as it occurs
  
  writefln("\nAll times in microseconds unless otherwise specified.");
  writefln("<min> and <max> are averages over fastest and slowest 1%% respectively.");
  writefln("");
  


  bool first_time_through = true;
  // Compile-time loop: the body is instantiated once per payload type.
  foreach (package_type; Package_Types) {
    alias Queues!(package_type) QT;
 
    // Expands to: alias TypeTuple!(QT.<name>,...) Queue_Types;
    mixin(makeQueueTypeTuple(queue_types)); // inject Queue_Types

    // write legend 
    if (first_time_through) {
      foreach (queue_type; Queue_Types)
	writefln("%s = %s",queue_type.short_name,queue_type.description);
      writefln("");
      
      first_time_through = false;
    }

    auto pkg_desc = appender!string;
    
    formattedWrite(pkg_desc,"Data package is dummy payload, a timestamp, sender ID and sequence #.  Payload is %s (Packet size=%s bytes).\n",package_type.stringof,QT.Package.sizeof);
    writefln("%s",pkg_desc.data);
    
    int test_number = 1;
    foreach (tp; tests) {
      writefln("%s\n",test_info(tp));
      writefln("%s",run_info_header());
 
      int queue_number = 1;

      foreach (queue_type; Queue_Types) {


	alias Queue_Test_Functions!(queue_type) QTF;
	auto start_time = systime();
	auto latencies = QTF.run_test(tp);
	auto end_time = systime();
	double run_s = (end_time - start_time).toSeconds!double();

	// Note: sorts `latencies` in place; the histogram below reads the last element as the maximum.
	auto la = analyze_latencies(latencies);

	if (write_data_files) { 
	  string test_output_file = "T" ~ text(test_number) ~ "_Q" ~ text(queue_number) ~ ".dat";
	  auto dat_f = File(test_output_file,"w");
	  // NOTE(review): pkg_desc.data ends with '\n', so this header line is followed by an
	  // uncommented blank line in the data file.
	  dat_f.writefln("# %s",pkg_desc.data);
	  dat_f.writefln("# %s",test_info(tp));
	  dat_f.writefln("# %s",queue_type.description);
	  dat_f.writefln("#val\tfraction\tCDF");	
	  
	  // Histogram over log(latency): the left_edge column is in log(us) units.
	  make_binned_data!((x) { return log(x); })(queue_number,bins_for_output, latencies, binned_latencies, true);
	  
	  foreach (k; 0..bins_for_output)
	    dat_f.writefln("%s\t%s\t%s",
			   binned_latencies[k].left_edge,
			   binned_latencies[k].count,
			   binned_latencies[k].CDF);
	  dat_f.close();
	}

	writefln("%s",run_info!queue_type(la,run_s));
	queue_number++;
      }
      test_number++;
      writefln("");
    }
  }
}