May 13, 2015
On 05/12/2015 08:19 PM, thedeemon wrote:

> In case of Python's parallel.Pool() separate processes do the
> work without any synchronization issues. In case of D's
> std.parallelism it's just threads inside one process and they
> do fight for some locks, thus this result.

Right. To do the same in D, one must use fibers. Here is a draft of my understanding of them:

  http://ddili.org/ders/d.en/fibers.html

As noted there as well, for highest performance with fibers, one likely needs to use an M:N threading model.
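
For reference, a minimal sketch of what a fiber looks like with core.thread (illustrative only; the chapter above has the real discussion):

import core.thread : Fiber;
import std.stdio : writeln;

void main()
{
    auto fib = new Fiber({
        writeln("fiber: step 1");
        Fiber.yield();    // suspend; control returns to the caller
        writeln("fiber: step 2");
    });

    fib.call();           // runs until the first yield
    writeln("main: between calls");
    fib.call();           // resumes after the yield
}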

Ali

May 13, 2015
On Wednesday, 13 May 2015 at 06:59:02 UTC, Ali Çehreli wrote:
> > In case of Python's parallel.Pool() separate processes do the
> > work without any synchronization issues. In case of D's
> > std.parallelism it's just threads inside one process and they
> > do fight for some locks, thus this result.
>
> Right. To do the same in D, one must use fibers.

No, to do the same one must use separate OS processes. Fibers won't help you against parallel threads fighting for GC & IO locks.
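
A minimal sketch of that pattern with std.process (the worker binary name is hypothetical, just to show the shape; each job gets its own process, hence its own GC and IO locks):

import std.process : spawnProcess, wait, Pid;
import std.stdio : writeln;

void main()
{
    auto jobs = ["mil", "mi4"];                  // one OS process per job
    Pid[] pids;
    foreach (job; jobs)
        pids ~= spawnProcess(["./runjob", job]); // hypothetical worker binary
    foreach (pid; pids)
        writeln("job finished with status ", wait(pid));
}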
May 13, 2015
On Wednesday, 13 May 2015 at 03:19:17 UTC, thedeemon wrote:
> In case of Python's parallel.Pool() separate processes do the work without any synchronization issues. In case of D's std.parallelism it's just threads inside one process and they do fight for some locks, thus this result.

Okay, so to do something equivalent I would need to use std.process. My next question is how to pass the common data to the sub-processes. In the Python approach I guess this is handled automatically by pickle serialization. Is there something similar in D? Alternatively, would using std.mmfile to temporarily store the common data be a reasonable approach?
May 13, 2015
On 13/05/2015 2:59 a.m., Gerald Jansen wrote:
> I am a data analyst trying to learn enough D to decide whether to use D
> for a new project rather than Python + Fortran. I have recoded a
> non-trivial Python program to do some simple parallel data processing
> (using the map function in Python's multiprocessing module and parallel
> foreach in D). I was very happy that my D version ran considerably
> faster than the Python version when running a single job but was soon
> dismayed to find that the performance of my D version deteriorates
> rapidly beyond a handful of jobs whereas the time for the Python version
> increases linearly with the number of jobs per cpu core.
>
> The server has 4 quad-core Xeons and abundant memory compared to my
> needs for this task even though there are several million records in
> each dataset. The basic structure of the D program is:
>
> import std.parallelism; // and other modules
> void main()
> {
>      // ...
>      // read common data and store in arrays
>      // ...
>      foreach (job; parallel(jobs, 1)) {
>          runJob(job, arr1, arr2.dup);
>      }
> }
> void runJob(string job, in int[] arr1, int[] arr2)
> {
>      // read file of job specific data file and modify arr2 copy
>      // write job specific output data file
> }
>
> The output of /usr/bin/time is as follows:
>
> Lang Jobs    User  System  Elapsed %CPU
> Py      1   45.17    1.44  0:46.65   99
> D       1    8.44    1.17  0:09.24  104
>
> Py      2   79.24    2.16  0:48.90  166
> D       2   19.41   10.14  0:17.96  164
>
> Py     30 1255.17   58.38  2:39.54  823 * Pool(12)
> D      30  421.61 4565.97  6:33.73 1241
>
> (Note that the Python program was somewhat optimized with numpy
> vectorization and a bit of numba jit compilation.)
>
> The system time varies widely between repetitions for D with multiple
> jobs (eg. from 3.8 to 21.5 seconds for 2 jobs).
>
> Clearly my simple approach with parallel foreach has some problem(s).
> Any suggestions?
>
> Gerald Jansen

I managed to rewrite most of the core IO part to use ranges.
However, I could not rewrite runTrait just by reading it. To do so, you would need to focus on the line being read instead of the trait.
There are many lines of input data but very few traits.
There are a couple more places in runTrait where IO is performed. Again, turn them into ranges as I've done.

But more importantly, split up the functions that perform the logic in runTrait as much as possible. Small pieces of code are easier to optimize than large blobs.

Anyway, that is my attempt at making it faster.

/**
Assign unknown parent groups (UPG) - trait specific version

UPG defined in 5-year intervals for sires and dams of animals
with a domestic or foreign animal ID (eg. IT vs XX).
*/
// History:
// 2015.05.12 GJ - original version in D

/*import std.stdio, std.path, std.conv, std.string, std.datetime;
import std.algorithm, std.parallelism;
import std.math: ceil;
import core.memory : GC;*/

struct ControlVars // TODO: read this from a control file
{
	import std.stdio : File;
	
	File log;
    string osmdata = ".";
    int ped_cycles = 2;
    int ped_cc_start = 3;
    string ped_country = "IT";
    int ped_cutoff = 1980;
    int ped_upg_size = 50;
    bool dereg_bulls_only = false;
}


//------------------------------------------------------------------------------;
void main() {
    /**
     pedupg main - store pednum.csv and process traits in parallel;
    */
	
	import std.stdio : stdout, File;
	import std.path : buildPath;
	import std.parallelism : parallel;
	import core.memory : GC;
	
	
	ControlVars g = ControlVars(/*stdout*/File("out.txt", "w"));

    GC.disable;
    GC.reserve(1024 * 1024 * 1024);

    datedMessage("pedupg started ...", g.log);
	
    auto suffix = "";
    auto logfile = buildPath(g.osmdata, "log", "pedupg" ~ suffix ~ ".log");

    int nPed = 1 + getPednumMax(g.osmdata);
    g.log.writefln("%8d nPed (i.e. pednum_max + 1)", nPed);

	// read and store pedigree info
	FileReader fileReader = FileReader(buildPath(g.osmdata, "wrk", "pednum.csv"), g);
	
	// TODO: re-add
    //log.writefln("%8d records read from %s", j, pednum);

	
	auto traits = getRunTraits(g);
	foreach(fl; fileReader.parallel) {
		import std.conv : text;
		g.log.write(text(fl) ~ "\n");
	}

    datedMessage("pedupg all done.", g.log);
}

struct FileLine {
	int sire;
	int dam;
	short byear;
	bool ccode;
}

struct FileReader {
	private {
		import std.stdio : File;
		import std.traits : ReturnType;
		
		ControlVars g;
		
		FileLine last;
		bool done;
		
		ReturnType!(File.byLine!()) fileReader;
	}
	
	this(string filename, ControlVars g) {
		this.g = g;
		fileReader = File(filename, "r").byLine;
		popFront;
	}
	
	@property {
		FileLine front() {
			return last;
		}
		
		bool empty() {
			return done;
		}
	}
	
	void popFront() {
		import std.conv : to;
		import std.string : split;
		
		// check for emptiness *before* parsing, otherwise the last
		// line of the file gets dropped
		if (fileReader.empty) {
			done = true;
			return;
		}
		
		auto rec = split(fileReader.front, ',');
		fileReader.popFront;
		
		// defensively skip blank or short lines
		if (rec.length < 8) {
			popFront;
			return;
		}
		
		auto col0 = g.ped_cc_start - 1;
		last.sire = to!int(rec[1]);
		last.dam = to!int(rec[2]);
		last.byear = to!short(rec[4][0 .. 4]);
		// rec[7] is animID; ccode is true for foreign (non-IT) animals
		last.ccode = rec[7][col0 .. col0 + 2] != g.ped_country;
	}
}

/*void runTrait(in string trt, in string suffix, in int nPed, int[] sire,
              int[] dam, in short[] byear, in byte[] ccode)
{

    auto logfile = g.osmdata~"/log/"~trt~"/pedupg"~suffix~".log";
    auto log = File(logfile, "w");

    datedMessage("pedupg started for trait " ~ trt, log);

    // set up initial counts
    int minYear = g.ped_cutoff;
    int maxYear = 2015; // date.today().year;

    // read and mark animals with EBV (generation 1);
    byte[] gen; gen.length = nPed; gen[0] = -1;

    auto ebvfile = g.osmdata~"/wrk/"~trt~"/ebv"~suffix~".csv";
    auto n = 0, n1 = 0;
    foreach (line; File(ebvfile, "r").byLine()) {
        n++;
        auto rec = split(chomp(line), ",");  // ja,ebv,rel,ori
        if (rec[3] < "3" || !g.dereg_bulls_only) {
            auto ja = to!int(rec[0]);
            if (!gen[ja]) {
                gen[to!int(rec[0])] = 1;
                n1++;
            }
        }
    }
    log.writef("%8d lines read from %s\n", n, ebvfile);
    log.writef("%8d bulls%s with initial rel>0\n", n1,
           (g.dereg_bulls_only) ? "" : " and cows");


    // mark ancestor generations (2 .. ped_cycles + 1)
    int js, jd, m;
    foreach (i; 1 .. g.ped_cycles + 1) {
        m = 0;
        foreach (ja; 1 .. nPed) {
            if (gen[ja] != i) continue;
            js = sire[ja];
            if (js > 0 && !gen[js]) {
                gen[js] = cast(byte)(i + 1);
                m++;
            }
            jd = dam[ja];
            if (jd > 0 && !gen[jd]) {
                gen[jd] = cast(byte)(i + 1);
                m++;
            }
        }
        log.writef("%8d parents added in generation %d\n", m, i);
        n += m;
    }
    log.writef("%8d animals in initial pedigree\n", nPed);
    log.writef("%8d animals connected to EBV for trait %s\n\n", n, trt);


    // count unknown parents of marked animals by 5-year intervals
    int k1, kMax = 1 + (maxYear + 4 - minYear) / 5; // max year group
    int[][][] counts = new int[][][](2, 2, kMax);
    foreach (ja; 1 .. nPed) {
        if (gen[ja] == 0) continue;
        // set parents of final generation to unknown
        if (gen[ja] == g.ped_cycles + 1) {
            sire[ja] = 0;
            dam[ja] = 0;
        }
        if (!sire[ja]) {
            k1 = yearGroup(byear[ja], minYear, maxYear);
            counts[ccode[ja]][0][k1]++;
        }
        if (!dam[ja]) {
            // -- offset byear by 2 for dam
            k1 = yearGroup(byear[ja] + 2, minYear, maxYear);
            if (ccode[ja]>1 || k1 >= kMax) {
                writeln("ja=",ja," cc=",ccode[ja]," byear=",byear[ja]," k1=",k1);
                return;//(99999);
            }
            counts[ccode[ja]][1][k1]++;
        }
    }
    auto CC = g.ped_country;
    log.writef("starting group counts for trait %s\n", trt);
    log.writef("years      %ssire | years       %sdam", CC, CC);
    log.writef(" | years      XXsire | years       XXdam\n");
    foreach (k; 0 .. kMax) {
        auto years1 = format("%d-%d", (!k) ? 1900 : minYear+5*k-4,
                             minYear+5*k);
        auto years2 = format("%d-%d", (!k) ? 1900 : minYear+5*k-6,
                             (k == kMax - 1) ? maxYear : minYear+5*k-2);
        log.writef("%s %7d | %s %7d | %s %7d | %s %7d\n",
                   years1, counts[0][0][k], years2, counts[0][1][k],
                   years1, counts[1][0][k], years2, counts[1][1][k]);
    }


    // pool small groups and assign final groups;
    int[][][] group = new int[][][](2, 2, kMax);
    struct UpgInfo
    {
        int count, k1, k2, cc, sx;
    }
    UpgInfo[] info;
    UpgInfo u;
    int upg = (-1);
    foreach (cc; 0 .. 2) {              // cc: 0="IT", 1="XX"
        foreach (sx; 0 .. 2) {          // 0=sire, 1=dam
            // start a new group for each cc-sex combination;
            upg++;
            u.count = 0; u.k1 = 0; u.cc = cc; u.sx = sx;
            auto remaining = reduce!"a+b"(counts[cc][sx]);
            foreach (k; 0 .. kMax) {    // 5-year byear groups
                //writefln("cc=%s sx=%s k=%s upg=%s rem=%s",cc,sx,k,upg,remaining);
                if (u.count >= g.ped_upg_size && remaining >= g.ped_upg_size) {
                    // okay to start a new group
                    // -- first save info on previous group
                    info ~= u;
                    upg++;
                    u.count = 0; u.k1 = k; u.cc = cc; u.sx = sx;
                }
                group[cc][sx][k] = upg;
                u.count += counts[cc][sx][k];
                remaining -= counts[cc][sx][k];
                u.k2 = k;
            }
            info ~= u; // save info on last group per cc*sx
        }
    }
    auto nUpg = upg + 1;
    log.writef("\nfinal group definitions and counts for trait %s\n", trt);
    log.writef("| years      %ssire UPG | years       %sdam", CC, CC);
    log.writef(" UPG | years      XXsire UPG | years       XXdam UPG |\n");
    foreach (k; 0 .. kMax) {
        auto years1 = format("%d-%d", (!k) ? 1900 : minYear+5*k-4,
                             minYear+5*k);
        auto years2 = format("%d-%d", (!k) ? 1900 : minYear+5*k-6,
                             (k == kMax - 1) ? maxYear : minYear+5*k-2);
        int g11 = group[0][0][k], g12 = group[0][1][k],
            g21 = group[1][0][k], g22 = group[1][1][k];
        log.writef("| %s %7d %3d | %s %7d %3d | %s %7d %3d | %s %7d %3d |\n",
                   years1, info[g11].count, g11 + 1,
                   years2, info[g12].count, g12 + 1,
                   years1, info[g21].count, g21 + 1,
                   years2, info[g22].count, g22 + 1);
    }
    // write UPG definitions to upgdef.csv file
    auto upglist = g.osmdata~"/wrk/"~trt~"/upglist"~suffix~".csv";
    auto f = File(upglist, "w");
    foreach (i; 0 .. nUpg) {
        u = info[i];
        auto yMin = (!u.k1) ? 1900 : minYear + 5*u.k1 - 2*(u.sx+2);
        auto yMax = (u.k2 == kMax-1) ? maxYear : minYear + 5*u.k2 - 2*(u.sx);
        f.writef("%d,%s%d,%d,%d,%d\n", -(i+1), (u.cc) ? "XX": g.ped_country,
                 u.sx + 1, yMin, yMax, u.count);
    }
    f.close;
    log.writef("\n%8d records written to %s\n", nUpg, upglist);
    log.flush;


    // re-write pednum.csv to pedupg.csv with UPG;
    auto pednum = g.osmdata~"/wrk/pednum"~suffix~".csv";
    auto pedupg = g.osmdata~"/wrk/"~trt~"/pedupg"~suffix~".csv";
    f = File(pedupg, "w");
    int ja = 0;
    foreach (line; File(pednum, "r").byLine()) {
        ja++;
        if (sire[ja] && dam[ja]) {      // neither sire nor dam unknown
            f.writeln(line);            // byLine strips the newline
            continue;
        }
        auto r = split(line, ',');
        js = (sire[ja]) ? sire[ja] :
            -(group[ccode[ja]][0][yearGroup(byear[ja], minYear, maxYear)] + 1);
        jd = (dam[ja]) ? dam[ja] :
            -(group[ccode[ja]][1][yearGroup(byear[ja] + 2, minYear, maxYear)] + 1);
        f.write("%d,%d,%d,%s,%s,%s,%s,%s,%s", ja, js, jd, r[3], r[4], r[5],
                r[6], r[7], r[8]);
    }
    log.writef("%8d records written to %s\n", nPed, pedupg);


    datedMessage("pedupg done.", log);
    return;// (nUpg);
}*/


int yearGroup(in int year, const int minYear, const int maxYear) @safe pure {
    /** Compute five year group code: eg. 0=1900-1980, 1=1981..1985, etc.
     */
    if (year <= minYear) return(0);
    else if (year >= maxYear) return((maxYear + 4 - minYear) / 5);
    else return((year + 4 - minYear) / 5);
}
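

// A small unittest added for illustration, using minYear=1980 and
// maxYear=2015 to match the doc comment above:
unittest {
    assert(yearGroup(1950, 1980, 2015) == 0);   // at or before minYear
    assert(yearGroup(1981, 1980, 2015) == 1);   // 1981..1985
    assert(yearGroup(1985, 1980, 2015) == 1);
    assert(yearGroup(1986, 1980, 2015) == 2);   // 1986..1990
    assert(yearGroup(2020, 1980, 2015) == 7);   // capped at (2015+4-1980)/5
}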


// some utility functions

void datedMessage(string msg, typeof(ControlVars.log) log) @system {
	import std.stdio : writeln;
	import std.datetime : Clock;
	
	try {
		log.writeln(Clock.currTime.toSimpleString(),"  ", msg);
	} catch (Exception e){
		writeln("ERROR: datedMessage");
	}
}

int getPednumMax(string osmdata) @system {
	import std.file : readText;
	import std.path : buildPath;
	import std.string : strip;
	import std.conv : to;

	return
			buildPath(osmdata, "wrk", "pednum_max")
			.readText
			.strip
			.to!int;
}

string[] getRunTraits(ControlVars g) @system {
	import std.file : readText;
	import std.path : buildPath;
	import std.string : strip, split;
	
	return
			buildPath(g.osmdata, "wrk", "run_traits")
			.readText
			.strip
			.split;
}

May 13, 2015
On Tuesday, 12 May 2015 at 18:14:56 UTC, Gerald Jansen wrote:
> On Tuesday, 12 May 2015 at 16:35:23 UTC, Rikki Cattermole wrote:
>> On 13/05/2015 4:20 a.m., Gerald Jansen wrote:
>>> At the risk of great embarassment ... here's my program:
>>> http://dekoppel.eu/tmp/pedupg.d
>>
>> Would it be possible to give us some example data?
>> I might give it a go to try rewriting it tomorrow.
>
> http://dekoppel.eu/tmp/pedupgLarge.tar.gz (89 Mb)
>
> Contains two largish datasets in a directory structure expected by the program.

I only see 2 traits in that example, so it's hard for anyone to explore your scaling problem, seeing as there are a maximum of 2 tasks.
May 13, 2015
On Wednesday, 13 May 2015 at 09:01:05 UTC, Gerald Jansen wrote:
> On Wednesday, 13 May 2015 at 03:19:17 UTC, thedeemon wrote:
>> In case of Python's parallel.Pool() separate processes do the work without any synchronization issues. In case of D's std.parallelism it's just threads inside one process and they do fight for some locks, thus this result.
>
> Okay, so to do something equivalent I would need to use std.process. My next question is how to pass the common data to the sub-processes. In the Python approach I guess this is handled automatically by pickle serialization. Is there something similar in D? Alternatively, would using std.mmfile to temporarily store the common data be a reasonable approach?

Assuming you're on a POSIX-compliant platform, you would just take advantage of fork()'s copy-on-write memory model and pipes - i.e., read the data, then fork in a loop to process it, then use pipes to communicate. It ran about 3x faster for me by doing this, and obviously scales with the workloads you have (the provided data only seems to have 2). If you could provide a larger dataset and the Python implementation, that would be great.
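
A minimal sketch of the shape of this with the druntime POSIX bindings (illustrative, not my actual code; note fork() should be called before any other threads are spawned):

import core.sys.posix.unistd : fork, pipe, read, write, _exit;
import core.sys.posix.sys.wait : waitpid;
import std.stdio : writeln;

void main()
{
    int[] common = [1, 2, 3, 4];   // stands in for the shared input arrays

    int[2] fds;
    pipe(fds);

    auto pid = fork();
    if (pid == 0)                  // child: sees `common` copy-on-write
    {
        long sum = 0;
        foreach (x; common) sum += x;
        write(fds[1], &sum, sum.sizeof);
        _exit(0);                  // skip D runtime shutdown in the child
    }

    long result;
    read(fds[0], &result, result.sizeof);
    waitpid(pid, null, 0);
    writeln("child result: ", result);
}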

I'm actually surprised and disappointed that there isn't a fork() backend to std.process OR std.parallelism. You have to use the C bindings directly.
May 13, 2015
On Wednesday, 13 May 2015 at 11:33:55 UTC, John Colvin wrote:
> On Tuesday, 12 May 2015 at 18:14:56 UTC, Gerald Jansen wrote:
>> On Tuesday, 12 May 2015 at 16:35:23 UTC, Rikki Cattermole wrote:
>>> On 13/05/2015 4:20 a.m., Gerald Jansen wrote:
>>>> At the risk of great embarassment ... here's my program:
>>>> http://dekoppel.eu/tmp/pedupg.d
>>>
>>> Would it be possible to give us some example data?
>>> I might give it a go to try rewriting it tomorrow.
>>
>> http://dekoppel.eu/tmp/pedupgLarge.tar.gz (89 Mb)
>>
>> Contains two largish datasets in a directory structure expected by the program.
>
> I only see 2 traits in that example, so it's hard for anyone to explore your scaling problem, seeing as there are a maximum of 2 tasks.

Either way, a few small changes were enough to cut the runtime by a factor of ~6 in the single-threaded case and improve the scaling a bit, although the printing to output files still looks like a bit of a bottleneck.

http://dpaste.dzfl.pl/80cd36fd6796

The key thing was reducing the number of allocations (more std.algorithm.splitter copying to static arrays, less std.array.split) and avoiding File.byLine. Other people in this thread have mentioned alternatives to it that may be faster or have lower memory usage; I just read the whole files into memory and then lazily split them with std.algorithm.splitter. I ended up with some blank lines coming through, so I added if(line.empty) continue; in a few places; you might want to look more carefully at that, as it could be my mistake.

The use of std.array.appender for `info` is just good practice, but it doesn't make much difference here.
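
For anyone following along, the core of that pattern looks roughly like this (a sketch under my own naming, not the dpaste code; the file name comes from Gerald's program):

import std.algorithm : splitter;
import std.conv : to;
import std.file : readText;

void main()
{
    const(char)[][8] fields;   // fixed-size buffer reused per line: no per-line allocation

    foreach (line; readText("wrk/pednum.csv").splitter('\n'))
    {
        if (line.length == 0) continue;   // the trailing newline yields one empty "line"

        size_t n = 0;
        foreach (f; line.splitter(','))
        {
            if (n == fields.length) break;
            fields[n++] = f;
        }
        if (n < 3) continue;              // skip malformed lines

        auto sire = fields[1].to!int;     // same parsing as the byLine version
        auto dam  = fields[2].to!int;
        // ... rest of the per-line work ...
    }
}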
May 13, 2015
On Wednesday, 13 May 2015 at 11:33:55 UTC, John Colvin wrote:
> On Tuesday, 12 May 2015 at 18:14:56 UTC, Gerald Jansen wrote:
>> On Tuesday, 12 May 2015 at 16:35:23 UTC, Rikki Cattermole wrote:
>>> On 13/05/2015 4:20 a.m., Gerald Jansen wrote:
>>>> At the risk of great embarassment ... here's my program:
>>>> http://dekoppel.eu/tmp/pedupg.d
>>>
>>> Would it be possible to give us some example data?
>>> I might give it a go to try rewriting it tomorrow.
>>
>> http://dekoppel.eu/tmp/pedupgLarge.tar.gz (89 Mb)
>>
>> Contains two largish datasets in a directory structure expected by the program.
>
> I only see 2 traits in that example, so it's hard for anyone to explore your scaling problem, seeing as there are a maximum of 2 tasks.

The problem is already evident with 2 traits: the Elapsed time is about doubled for the D version whereas it is practically unchanged for the Python version.

But just for fun here are 4 traits:
http://dekoppel.eu/tmp/pedupgLarge.tar.gz (109 Mb)

If you need even more traits, you can just make copies of the wrk/mil directory, make empty directories with the same names under log/ (eg. log/mi4), and add the names to the first line of the file wrk/run_traits. To run a single trait, just remove all names except mil from that file.

May 13, 2015
On Wednesday, 13 May 2015 at 14:11:25 UTC, Gerald Jansen wrote:
> On Wednesday, 13 May 2015 at 11:33:55 UTC, John Colvin wrote:
>> On Tuesday, 12 May 2015 at 18:14:56 UTC, Gerald Jansen wrote:
>>> On Tuesday, 12 May 2015 at 16:35:23 UTC, Rikki Cattermole wrote:
>>>> On 13/05/2015 4:20 a.m., Gerald Jansen wrote:
>>>>> At the risk of great embarassment ... here's my program:
>>>>> http://dekoppel.eu/tmp/pedupg.d
>>>>
>>>> Would it be possible to give us some example data?
>>>> I might give it a go to try rewriting it tomorrow.
>>>
>>> http://dekoppel.eu/tmp/pedupgLarge.tar.gz (89 Mb)
>>>
>>> Contains two largish datasets in a directory structure expected by the program.
>>
>> I only see 2 traits in that example, so it's hard for anyone to explore your scaling problem, seeing as there are a maximum of 2 tasks.
>
> The problem is already evident with 2 traits: the Elapsed time is about doubled for the D version whereas it is practically unchanged for the Python version.
>
> But just for fun here are 4 traits:
> http://dekoppel.eu/tmp/pedupgLarge.tar.gz (109 Mb)
>
> If you need even more traits, you can just make copies of the wrk/mil directory, make empty directories with the same names under log/ (eg. log/mi4), and add the names to the first line of the file wrk/run_traits. To run a single trait, just remove all names except mil from that file.
http://dekoppel.eu/tmp/pedupgLarge4.tar.gz (109 Mb)
May 13, 2015
On Wednesday, 13 May 2015 at 13:40:33 UTC, John Colvin wrote:
> On Wednesday, 13 May 2015 at 11:33:55 UTC, John Colvin wrote:
>> On Tuesday, 12 May 2015 at 18:14:56 UTC, Gerald Jansen wrote:
>>> On Tuesday, 12 May 2015 at 16:35:23 UTC, Rikki Cattermole wrote:
>>>> On 13/05/2015 4:20 a.m., Gerald Jansen wrote:
>>>>> At the risk of great embarassment ... here's my program:
>>>>> http://dekoppel.eu/tmp/pedupg.d
>>>>
>>>> Would it be possible to give us some example data?
>>>> I might give it a go to try rewriting it tomorrow.
>>>
>>> http://dekoppel.eu/tmp/pedupgLarge.tar.gz (89 Mb)
>>>
>>> Contains two largish datasets in a directory structure expected by the program.
>>
>> I only see 2 traits in that example, so it's hard for anyone to explore your scaling problem, seeing as there are a maximum of 2 tasks.
>
> Either way, a few small changes were enough to cut the runtime by a factor of ~6 in the single-threaded case and improve the scaling a bit, although the printing to output files still looks like a bit of a bottleneck.
>

> http://dpaste.dzfl.pl/80cd36fd6796
>
> The key thing was reducing the number of allocations (more std.algorithm.splitter copying to static arrays, less std.array.split) and avoiding File.byLine. Other people in this thread have mentioned alternatives to it that may be faster/have lower memory usage, I just read the whole files in to memory and then lazily split them with std.algorithm.splitter. I ended up with some blank lines coming through, so i added if(line.empty) continue; in a few places, you might want to look more carefully at that, it could be my mistake.
>
> The use of std.array.appender for `info` is just good practice, but it doesn't make much difference here.

Wow, I'm impressed with the effort you guys (John, Rikki, others) are making to teach me some efficiency tricks. I guess this is one of the strengths of D: its community. I'm studying your various contributions closely!

The empty lines come from the very end of the files, which terminate with a newline (as per "normal" practice?).