Thread overview
Simplest multithreading example

Sep 01, 2017 - Brian
Sep 01, 2017 - Michael Coulombe
Sep 01, 2017 - Ali Çehreli
Sep 01, 2017 - Brian
Sep 01, 2017 - Ali Çehreli
Sep 01, 2017 - ag0aep6g
Sep 05, 2017 - Brian
Sep 05, 2017 - ag0aep6g

September 01, 2017
Hello, I am trying to get the most trivial example of multithreading working, but can't seem to figure it out.
I want to split a task across threads, and wait for all those tasks to finish before moving to the next line of code.

The following two attempts have failed:

-----------------------------------------------------
Trial 1:
-----------------------------------------------------

auto I = std.range.iota(0,500);
int [] X; // output
foreach (i; parallel(I) )
    X ~= i;
core.thread.thread_joinAll(); // Apparently not applicable here?
writeln(X); // some random subset of indices

------------------------------------------------
Trial 2: (closer to Java)
------------------------------------------------
class DerivedThread : Thread
{
    int [] X;
    int i;
    this(int [] X, int i){
        this.X = X;
        this.i = i;
        super(&run);
    }

    private:
        void run(){
            X ~= i;
        }
}

void main(){
	auto I = std.range.iota(0,500);
	int [] X; // output
	Thread [] threads;
	foreach (i; I )
		threads ~= new DerivedThread( X,i);
	foreach( thread; threads)
		thread.start();
	foreach( thread; threads)
		thread.join(); // does not seem to do anything
	core.thread.thread_joinAll(); // also not doing anything

	writeln(X); // X contains nothing at all
}

How can I get the program to wait until all threads have finished before moving to the next line of code?

Thank you!

September 01, 2017
On Friday, 1 September 2017 at 01:59:07 UTC, Brian wrote:
> Hello, I am trying to get the most trivial example of multithreading working, but can't seem to figure it out.
> I want to split a task across threads, and wait for all those tasks to finish before moving to the next line of code.
>
> The following 2 attempts have failed :
>
> -----------------------------------------------------
> Trial 1 :
> -----------------------------------------------------
>
> auto I = std.range.iota(0,500);
> int [] X; // output
> foreach (i; parallel(I) )
>     X ~= i;
> core.thread.thread_joinAll(); // Apparently no applicable here ?
> writeln(X); // some random subset of indices
>
> ------------------------------------------------
> Trial 2 : (closer to Java)
> ------------------------------------------------
> class DerivedThread : Thread
> {
>     int [] X;
>     int i;
>     this(int [] X, int i){
>         this.X = X;
> 	this.i = i;
>         super(&run);
>     }
>
>     private:
>         void run(){
>             X ~= i;
>         }
> }
>
> void main(){
> 	auto I = std.range.iota(0,500);
> 	int [] X; // output
> 	Thread [] threads;
> 	foreach (i; I )
> 		threads ~= new DerivedThread( X,i);
> 	foreach( thread; threads)
> 		thread.start();
> 	foreach( thread; threads)
> 		thread.join(); // does not seem to do anything
> 	core.thread.thread_joinAll(); // also not doing anything
>
> 	writeln(X); // X contains nothing at all
> }
>
> How can I get the program to wait until all threads have finished before moving to the next line of code ?
>
> Thank you !

Just like a sequential loop, when you do "foreach (i; parallel(I) ) { ... }", execution will not continue past the foreach loop until all the tasks associated with each element of I have finished.

Your particular example of "X ~= i" in the body of the loop is not thread-safe, so if that is the code you really intend to run, you should protect X with a Mutex or something comparable.
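
For example, a minimal sketch of that idea (not tested against your real code; it just collects the indices into X in arbitrary order) could guard the append with a Mutex from core.sync.mutex:

----
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;
import core.sync.mutex : Mutex;

void main()
{
    auto I = iota(0, 500);
    int[] X;
    auto lock = new Mutex;

    foreach (i; parallel(I))
    {
        // Only one task appends at a time, so the append is safe.
        synchronized (lock)
        {
            X ~= i;
        }
    }

    // parallel() has already joined all tasks at this point.
    writeln(X.length); // 500, though the element order is arbitrary
}
----
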
August 31, 2017
On 08/31/2017 06:59 PM, Brian wrote:
> Hello, I am trying to get the most trivial example of multithreading
> working, but can't seem to figure it out.
> I want to split a task across threads, and wait for all those tasks to
> finish before moving to the next line of code.
>
> The following 2 attempts have failed :
>
> -----------------------------------------------------
> Trial 1 :
> -----------------------------------------------------
>
> auto I = std.range.iota(0,500);
> int [] X; // output
> foreach (i; parallel(I) )
>     X ~= i;
> core.thread.thread_joinAll(); // Apparently no applicable here ?

As Michael Coulombe said, parallel() does that implicitly.

If the problem is to generate numbers in parallel, I restructured the code so that each thread touches only its own element of a results array that has already been sized for all the results (so there is no race condition):

import std.stdio;
import std.parallelism;
import std.range;

void main() {
    auto arrs = new int[][](totalCPUs);
    const perWorker = 10;
    foreach (i, arr; parallel(arrs)) {
        const beg = cast(int)i * perWorker;
        const end = beg + perWorker;
        arrs[i] = std.range.iota(beg,end).array;
    }

    writeln(arrs);
}

If needed, std.algorithm.joiner can be used to make it a single sequence of ints:

    import std.algorithm;
    writeln(arrs.joiner);

Ali

September 01, 2017
On Friday, 1 September 2017 at 04:43:29 UTC, Ali Çehreli wrote:
> On 08/31/2017 06:59 PM, Brian wrote:
> > Hello, I am trying to get the most trivial example of
> multithreading
> > working, but can't seem to figure it out.
> > I want to split a task across threads, and wait for all those
> tasks to
> > finish before moving to the next line of code.
> >
> > The following 2 attempts have failed :
> >
> > -----------------------------------------------------
> > Trial 1 :
> > -----------------------------------------------------
> >
> > auto I = std.range.iota(0,500);
> > int [] X; // output
> > foreach (i; parallel(I) )
> >     X ~= i;
> > core.thread.thread_joinAll(); // Apparently no applicable
> here ?
>
> As Michael Coulombe said, parallel() does that implicitly.
>
> If the problem is to generate numbers in parallel, I restructured the code by letting each thread touch only its element of a results array that has already been resized for all the results (so that there is no race condition):
>
> import std.stdio;
> import std.parallelism;
> import std.range;
>
> void main() {
>     auto arrs = new int[][](totalCPUs);
>     const perWorker = 10;
>     foreach (i, arr; parallel(arrs)) {
>         const beg = cast(int)i * perWorker;
>         const end = beg + perWorker;
>         arrs[i] = std.range.iota(beg,end).array;
>     }
>
>     writeln(arrs);
> }
>
> If needed, std.algorithm.joiner can be used to make it a single sequence of ints:
>
>     import std.algorithm;
>     writeln(arrs.joiner);
>
> Ali

Hello, thank you very much for your quick replies!

I was trying to make a trivial example, but the 'real' problem is trying to split a huge calculation across different threads.

Schematically:

double [] hugeCalc(int i){
    // Code that takes a long time
}

so if I do


double[][int] _hugeCalcCache;
foreach(i ; I)
   _hugeCalcCache[i] = hugeCalc(i);

of course the required time is I.length * (a long time), so I wanted to shorten this by splitting the work across different threads:

foreach(i ; parallel(I) )
   _hugeCalcCache[i] = hugeCalc(i);

but as you can guess, it doesn't work that easily.

Very interesting approach of letting each thread touch only its own element; I will try that.

FYI, I did manage to make the following work, but I'm not sure if this is really still multi-threaded?


    int [] I;
    foreach (i; 0 .. 500) I ~= i;
    int [] X; // output

    class DerivedThread : Thread {
        private int [] i;
        this(int [] i){
            this.i = i;
            super(&run);
        }
        private void run(){
            synchronized { // Need synchronization here!
                foreach (i0; i)
                    X ~= i0;
            }
        }
    }

    Thread [] threads;
    foreach (i; std.range.chunks(I, 50))
        threads ~= new DerivedThread(i);
    foreach (thread; threads)
        thread.start();

    core.thread.thread_joinAll(); // Does in fact seem to 'join all' threads
    writeln(X);

September 01, 2017
On 08/31/2017 10:27 PM, Brian wrote:

> the 'real' problem is trying
> to split a huge calculation to different threads.

I still think you can take advantage of std.parallelism:

  https://dlang.org/phobos/std_parallelism.html

Unfortunately, its features like asyncBuf, map, and amap do not stand out in the documentation. Here's my interpretation of them:

  http://ddili.org/ders/d.en/parallelism.html
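
As a rough sketch (not from the chapter), amap applied to your hugeCalc could look like the following, where the body of hugeCalc is just a placeholder for the real calculation:

----
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

double[] hugeCalc(int i)
{
    // placeholder for the code that takes a long time
    return [cast(double) i];
}

void main()
{
    // amap evaluates hugeCalc on the elements in parallel and returns
    // a double[][] with the results in input order.
    auto results = taskPool.amap!hugeCalc(iota(0, 100));
    writeln(results.length); // 100
}
----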

Ali

September 01, 2017
On 09/01/2017 07:27 AM, Brian wrote:
> double [] hugeCalc(int i){
>      // Code that takes a long time
> }
> 
> so if I do
> 
> 
> double[][int] _hugeCalcCache;
> foreach(i ; I)
>     _hugeCalcCache[i] = hugeCalc(i);
> 
> of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
> 
> foreach(i ; parallel(I) )
>     _hugeCalcCache[i] = hugeCalc(i);
> 
> but as you can guess, it doesn't work that easily.

Works pretty well for me:

----
double [] hugeCalc(int i)
{
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}

void main()
{
    static import std.range;
    import std.parallelism: parallel;
    auto I = std.range.iota(0, 10);
    double[][int] _hugeCalcCache;
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);
}
----

That runs in about 3 seconds here. The serial version would of course take about 10 seconds. So, parallelism achieved!

Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:

----
auto _hugeCalcCache = new double[][](10);
----
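
A minimal sketch of that variant, reusing the hugeCalc above and assuming indices 0 .. 10:

----
void main()
{
    static import std.range;
    import std.parallelism: parallel;

    auto I = std.range.iota(0, 10);

    // One slot per index, allocated before going parallel;
    // each task then writes only to its own element.
    auto _hugeCalcCache = new double[][](10);

    foreach (i; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);
}
----
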
September 05, 2017
On Friday, 1 September 2017 at 20:02:23 UTC, ag0aep6g wrote:
> On 09/01/2017 07:27 AM, Brian wrote:
>> double [] hugeCalc(int i){
>>      // Code that takes a long time
>> }
>> 
>> so if I do
>> 
>> 
>> double[][int] _hugeCalcCache;
>> foreach(i ; I)
>>     _hugeCalcCache[i] = hugeCalc(i);
>> 
>> of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
>> 
>> foreach(i ; parallel(I) )
>>     _hugeCalcCache[i] = hugeCalc(i);
>> 
>> but as you can guess, it doesn't work that easily.
>
> Works pretty well for me:
>
> ----
> double [] hugeCalc(int i)
> {
>     // Code that takes a long time
>     import core.thread: Thread;
>     import std.datetime: seconds;
>     Thread.sleep(1.seconds);
>     return [i];
> }
>
> void main()
> {
>     static import std.range;
>     import std.parallelism: parallel;
>     auto I = std.range.iota(0, 10);
>     double[][int] _hugeCalcCache;
>     foreach(i ; parallel(I))
>         _hugeCalcCache[i] = hugeCalc(i);
> }
> ----
>
> That runs in about 3 seconds here. The serial version would of course take about 10 seconds. So, parallelism achieved!
>
> Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:
>
> ----
> auto _hugeCalcCache = new double[][](10);
> ----

Thanks very much for your help; I finally had time to try your suggestions. The initial example you showed does indeed have the same problem, in that not all values end up in the result:


    double [] hugeCalc(int i){
        // Code that takes a long time
        import core.thread: Thread;
        import std.datetime: seconds;
        Thread.sleep(1.seconds);
        return [i];
    }

    static import std.range;
    import std.parallelism: parallel;
    auto I = std.range.iota(0, 100);
    double[][int] _hugeCalcCache;
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);

    writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)

but this does seem to work using your other method of initialization:


    auto _hugeCalcCache = new double[][](100);
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);

    foreach( double[] x ; _hugeCalcCache)
        writeln( x ); // this now contains all values


so I guess initializing the whole array at compile time makes it thread safe?
(The second case runs in 16 seconds on my computer.)
Anyways it seems to work, thanks again!

September 05, 2017
On 09/05/2017 03:15 AM, Brian wrote:
> Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values :
> 
> 
>      double [] hugeCalc(int i){
>      // Code that takes a long time
>      import core.thread: Thread;
>      import std.datetime: seconds;
>      Thread.sleep(1.seconds);
>      return [i];
>      }
> 
>      static import std.range;
>      import std.parallelism: parallel;
>      auto I = std.range.iota(0, 100);
>      double[][int] _hugeCalcCache;
>      foreach(i ; parallel(I))
>          _hugeCalcCache[i] = hugeCalc(i);
> 
> 
>       writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)

Yeah. As expected, associative array accesses are apparently not thread-safe.

A simple writeln is a terrible way to figure that out, though. I'd suggest sorting the keys and comparing that to `I`:

----
import std.algorithm: equal, sort;
auto sortedKeys = _hugeCalcCache.keys.sort;
assert(sortedKeys.equal(I));
----

> but this does seem to work using your other method of initialization :
> 
> 
>      auto _hugeCalcCache = new double[][](100);
>      foreach(i ; parallel(I))
>          _hugeCalcCache[i] = hugeCalc(i);
> 
>      foreach( double[] x ; _hugeCalcCache)
>      writeln( x ); // this now contains all values
> 
> 
> so I guess initializing the whole array at compile time makes it thread safe ?

There's nothing compile-timey about the code. The initialization is done at run-time, but before the parallel stuff starts.

Note that the type of `_hugeCalcCache` here is different from above. Here it's `double[][]`, i.e. a dynamic array. Above it's `double[][int]`, i.e. an associative array. Those types are quite different, despite their similar names.

You can prepare an associative array in a similar way, before doing the parallel stuff. Then it might be thread-safe (not sure):

----
double[][int] _hugeCalcCache; /* associative array */

/* First initialize the elements serially: */
foreach(i; I) _hugeCalcCache[i] = [];

/* Then do the huge calculations in parallel: */
foreach(i; parallel(I)) _hugeCalcCache[i] = hugeCalc(i);
----

But if your keys are consecutive numbers, I see no point in using an associative array.