September 01, 2017 Simplest multithreading example
Hello, I am trying to get the most trivial example of multithreading working, but can't seem to figure it out.
I want to split a task across threads, and wait for all those tasks to finish before moving to the next line of code.

The following 2 attempts have failed :

-----------------------------------------------------
Trial 1 :
-----------------------------------------------------

auto I = std.range.iota(0,500);
int [] X; // output
foreach (i; parallel(I) )
    X ~= i;
core.thread.thread_joinAll(); // Apparently no applicable here ?
writeln(X); // some random subset of indices

------------------------------------------------
Trial 2 : (closer to Java)
------------------------------------------------
class DerivedThread : Thread
{
    int [] X;
    int i;
    this(int [] X, int i){
        this.X = X;
        this.i = i;
        super(&run);
    }

private:
    void run(){
        X ~= i;
    }
}

void main(){
    auto I = std.range.iota(0,500);
    int [] X; // output
    Thread [] threads;
    foreach (i; I )
        threads ~= new DerivedThread( X,i);
    foreach( thread; threads)
        thread.start();
    foreach( thread; threads)
        thread.join(); // does not seem to do anything
    core.thread.thread_joinAll(); // also not doing anything

    writeln(X); // X contains nothing at all
}

How can I get the program to wait until all threads have finished before moving to the next line of code ?

Thank you !
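A side note on Trial 2: one likely reason X prints empty there has nothing to do with joining. A D slice such as int[] is passed by value, so `this.X = X` hands each thread its own copy of the (empty) slice, and appending to that copy does not change the X declared in main. Below is a minimal sketch of that behavior; appendToCopy and appendToOriginal are hypothetical helper names used only for illustration.

```d
// Appends to a by-value copy of the slice; the caller's slice is unchanged.
void appendToCopy(int[] a, int value)
{
    a ~= value;
}

// Appends through a reference; the caller's slice sees the new element.
void appendToOriginal(ref int[] a, int value)
{
    a ~= value;
}

void main()
{
    int[] X;

    appendToCopy(X, 7);
    assert(X.length == 0); // still empty, like X in Trial 2

    appendToOriginal(X, 7);
    assert(X == [7]);
}
```

So the join calls in Trial 2 probably did wait for the threads; the results simply went into slices that main never looks at.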

September 01, 2017 Re: Simplest multithreading example
Posted in reply to Brian | On Friday, 1 September 2017 at 01:59:07 UTC, Brian wrote:
> Hello, I am trying to get the most trivial example of multithreading working, but can't seem to figure it out.
> I want to split a task across threads, and wait for all those tasks to finish before moving to the next line of code.
>
> The following 2 attempts have failed :
>
> -----------------------------------------------------
> Trial 1 :
> -----------------------------------------------------
>
> auto I = std.range.iota(0,500);
> int [] X; // output
> foreach (i; parallel(I) )
> X ~= i;
> core.thread.thread_joinAll(); // Apparently no applicable here ?
> writeln(X); // some random subset of indices
>
> ------------------------------------------------
> Trial 2 : (closer to Java)
> ------------------------------------------------
> class DerivedThread : Thread
> {
> int [] X;
> int i;
> this(int [] X, int i){
> this.X = X;
> this.i = i;
> super(&run);
> }
>
> private:
> void run(){
> X ~= i;
> }
> }
>
> void main(){
> auto I = std.range.iota(0,500);
> int [] X; // output
> Thread [] threads;
> foreach (i; I )
> threads ~= new DerivedThread( X,i);
> foreach( thread; threads)
> thread.start();
> foreach( thread; threads)
> thread.join(); // does not seem to do anything
> core.thread.thread_joinAll(); // also not doing anything
>
> writeln(X); // X contains nothing at all
> }
>
> How can I get the program to wait until all threads have finished before moving to the next line of code ?
>
> Thank you !
Just like a sequential loop, when you do "foreach (i; parallel(I) ) { ... }", execution will not continue past the foreach loop until all the tasks associated with each element of I have finished.
Your particular example of "X ~= i" in the body of the loop is not thread-safe, so if that is the code you really intend to run, you should protect X with a Mutex or something comparable.
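As a minimal sketch of the advice above, the append can be wrapped in a `synchronized` statement, which is one of the "something comparable" options to an explicit Mutex. The function name collect is made up for this example. Because the order in which iterations run is not fixed, the result is sorted before it is checked.

```d
import std.algorithm : equal, sort;
import std.parallelism : parallel;
import std.range : iota;

// Appends 0 .. n to one shared array from a parallel foreach.
int[] collect(int n)
{
    int[] X;
    foreach (i; parallel(iota(0, n)))
    {
        // Only one thread at a time may run this block,
        // so the appends cannot interleave.
        synchronized
        {
            X ~= i;
        }
    }
    // The foreach has already waited for every iteration here.
    return X;
}

void main()
{
    auto X = collect(500);
    sort(X); // arrival order is arbitrary
    assert(X.equal(iota(0, 500)));
}
```

Note that when the loop body is nothing but the locked append, the threads mostly wait on each other, so this shows correctness rather than a speedup.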

August 31, 2017 Re: Simplest multithreading example
Posted in reply to Brian | On 08/31/2017 06:59 PM, Brian wrote:
> Hello, I am trying to get the most trivial example of multithreading
> working, but can't seem to figure it out.
> I want to split a task across threads, and wait for all those tasks to
> finish before moving to the next line of code.
>
> The following 2 attempts have failed :
>
> -----------------------------------------------------
> Trial 1 :
> -----------------------------------------------------
>
> auto I = std.range.iota(0,500);
> int [] X; // output
> foreach (i; parallel(I) )
> X ~= i;
> core.thread.thread_joinAll(); // Apparently no applicable here ?

As Michael Coulombe said, parallel() does that implicitly.

If the problem is to generate numbers in parallel, I restructured the code by letting each thread touch only its element of a results array that has already been resized for all the results (so that there is no race condition):

import std.stdio;
import std.parallelism;
import std.range;

void main() {
    auto arrs = new int[][](totalCPUs);
    const perWorker = 10;
    foreach (i, arr; parallel(arrs)) {
        const beg = cast(int)i * perWorker;
        const end = beg + perWorker;
        arrs[i] = std.range.iota(beg,end).array;
    }

    writeln(arrs);
}

If needed, std.algorithm.joiner can be used to make it a single sequence of ints:

import std.algorithm;
writeln(arrs.joiner);

Ali

September 01, 2017 Re: Simplest multithreading example
Posted in reply to Ali Çehreli | On Friday, 1 September 2017 at 04:43:29 UTC, Ali Çehreli wrote:
> On 08/31/2017 06:59 PM, Brian wrote:
> > Hello, I am trying to get the most trivial example of
> multithreading
> > working, but can't seem to figure it out.
> > I want to split a task across threads, and wait for all those
> tasks to
> > finish before moving to the next line of code.
> >
> > The following 2 attempts have failed :
> >
> > -----------------------------------------------------
> > Trial 1 :
> > -----------------------------------------------------
> >
> > auto I = std.range.iota(0,500);
> > int [] X; // output
> > foreach (i; parallel(I) )
> > X ~= i;
> > core.thread.thread_joinAll(); // Apparently no applicable
> here ?
>
> As Michael Coulombe said, parallel() does that implicitly.
>
> If the problem is to generate numbers in parallel, I restructured the code by letting each thread touch only its element of a results array that has already been resized for all the results (so that there is no race condition):
>
> import std.stdio;
> import std.parallelism;
> import std.range;
>
> void main() {
> auto arrs = new int[][](totalCPUs);
> const perWorker = 10;
> foreach (i, arr; parallel(arrs)) {
> const beg = cast(int)i * perWorker;
> const end = beg + perWorker;
> arrs[i] = std.range.iota(beg,end).array;
> }
>
> writeln(arrs);
> }
>
> If needed, std.algorithm.joiner can be used to make it a single sequence of ints:
>
> import std.algorithm;
> writeln(arrs.joiner);
>
> Ali
Hello, thank you very much for your quick replies !
I was trying to make a trivial example, but the 'real' problem is trying to split a huge calculation to different threads.
Schematically :
double [] hugeCalc(int i){
// Code that takes a long time
}
so if I do
double[][int] _hugeCalcCache;
foreach(i ; I)
_hugeCalcCache[i] = hugeCalc(i);
of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
foreach(i ; parallel(I) )
_hugeCalcCache[i] = hugeCalc(i);
but as you can guess, it doesn't work that easily.
Very interesting approach about letting only the thread touch a particular element, I will try that.
FYI I did manage to make the following work, but not sure if this is really still multi-threaded ?
int [] I;
foreach (i; 0 .. 500) I ~= i;
int [] X; // output
class DerivedThread : Thread {
    private int [] i;
    this(int [] i){
        this.i = i;
        super(&run);
    }
    private void run(){
        synchronized{ // Need synchronization here !
            foreach( i0; i)
                X ~= i0;
        }
    }
}
Thread [] threads;
foreach (i; std.range.chunks( I, 50 ) )
    threads ~= new DerivedThread( i);
foreach( thread; threads)
    thread.start();
core.thread.thread_joinAll(); // Does in fact seem to 'join all' threads
writeln(X);
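On whether this is still multi-threaded: the threads do run, but since the whole loop sits inside one `synchronized` block, they most likely take turns rather than work at the same time. A possible alternative, sketched below under the assumption that the output size is known in advance, is to preallocate X and give each thread its own slice of it, so no locking is needed. makeWorker and fillWithThreads are hypothetical names introduced for this sketch.

```d
import core.thread : Thread;
import std.algorithm : equal;
import std.range : iota;

// Builds a thread that fills `part` with first, first + 1, ...
// A helper function is used so that each thread's delegate
// captures its own `part` and `first`.
Thread makeWorker(int[] part, int first)
{
    return new Thread({
        foreach (k, ref x; part)
            x = first + cast(int) k;
    });
}

// Fills an array with 0 .. n using one thread per chunk.
int[] fillWithThreads(int n, int chunkSize)
{
    auto X = new int[](n); // allocated once, before any thread starts
    Thread[] threads;
    for (int beg = 0; beg < n; beg += chunkSize)
    {
        const end = beg + chunkSize < n ? beg + chunkSize : n;
        // Slices of X do not overlap, so threads never touch the same element.
        threads ~= makeWorker(X[beg .. end], beg);
    }
    foreach (t; threads)
        t.start();
    foreach (t; threads)
        t.join(); // wait for every worker before reading X
    return X;
}

void main()
{
    assert(fillWithThreads(500, 50).equal(iota(0, 500)));
}
```

Each thread writes only into elements that already exist, which is the same idea as the results array in Ali's reply, just with explicit threads instead of parallel().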

September 01, 2017 Re: Simplest multithreading example
Posted in reply to Brian | On 08/31/2017 10:27 PM, Brian wrote:
> the 'real' problem is trying
> to split a huge calculation to different threads.

I still think you can take advantage of std.parallelism:

https://dlang.org/phobos/std_parallelism.html

Unfortunately, its features like asyncBuf, map, and amap do not stand out in the documentation. Here's my interpretation of them:

http://ddili.org/ders/d.en/parallelism.html

Ali
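As a rough illustration of the amap feature mentioned above, the sketch below maps a stand-in hugeCalc over a range with taskPool.amap, which evaluates the calls in parallel and returns the results as an array in input order. The body of hugeCalc here is only a placeholder (it squares its argument); the real long-running computation is assumed to go in its place.

```d
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// Placeholder for the long-running computation.
double[] hugeCalc(int i)
{
    return [cast(double) i * i];
}

void main()
{
    // amap waits for all results before returning.
    double[][] results = taskPool.amap!hugeCalc(iota(0, 10));

    assert(results.length == 10);
    assert(results[3] == [9.0]); // results[i] corresponds to hugeCalc(i)
    writeln(results);
}
```

Since amap allocates and fills the result array itself, there is no shared container for the loop body to append to.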

September 01, 2017 Re: Simplest multithreading example
Posted in reply to Brian | On 09/01/2017 07:27 AM, Brian wrote:
> double [] hugeCalc(int i){
> // Code that takes a long time
> }
>
> so if I do
>
>
> double[][int] _hugeCalcCache;
> foreach(i ; I)
> _hugeCalcCache[i] = hugeCalc(i);
>
> of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
>
> foreach(i ; parallel(I) )
> _hugeCalcCache[i] = hugeCalc(i);
>
> but as you can guess, it doesn't work that easily.
Works pretty well for me:
----
double [] hugeCalc(int i)
{
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}
void main()
{
    static import std.range;
    import std.parallelism: parallel;
    auto I = std.range.iota(0, 10);
    double[][int] _hugeCalcCache;
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);
}
----
That runs in about 3 seconds here. The serial version would of course take about 10 seconds. So, parallelism achieved!
Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:
----
auto _hugeCalcCache = new double[][](10);
----

September 05, 2017 Re: Simplest multithreading example
Posted in reply to ag0aep6g | On Friday, 1 September 2017 at 20:02:23 UTC, ag0aep6g wrote:
> On 09/01/2017 07:27 AM, Brian wrote:
>> double [] hugeCalc(int i){
>> // Code that takes a long time
>> }
>>
>> so if I do
>>
>>
>> double[][int] _hugeCalcCache;
>> foreach(i ; I)
>> _hugeCalcCache[i] = hugeCalc(i);
>>
>> of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
>>
>> foreach(i ; parallel(I) )
>> _hugeCalcCache[i] = hugeCalc(i);
>>
>> but as you can guess, it doesn't work that easily.
>
> Works pretty well for me:
>
> ----
> double [] hugeCalc(int i)
> {
> // Code that takes a long time
> import core.thread: Thread;
> import std.datetime: seconds;
> Thread.sleep(1.seconds);
> return [i];
> }
>
> void main()
> {
> static import std.range;
> import std.parallelism: parallel;
> auto I = std.range.iota(0, 10);
> double[][int] _hugeCalcCache;
> foreach(i ; parallel(I))
> _hugeCalcCache[i] = hugeCalc(i);
> }
> ----
>
> That runs in about 3 seconds here. The serial version would of course take about 10 seconds. So, parallelism achieved!
>
> Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:
>
> ----
> auto _hugeCalcCache = new double[][](10);
> ----
Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values :
double [] hugeCalc(int i){
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}
static import std.range;
import std.parallelism: parallel;
auto I = std.range.iota(0, 100);
double[][int] _hugeCalcCache;
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)
but this does seem to work using your other method of initialization :
auto _hugeCalcCache = new double[][](100);
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
foreach( double[] x ; _hugeCalcCache)
    writeln( x ); // this now contains all values
so I guess initializing the whole array at compile time makes it thread safe ?
(The second case runs in 16 seconds on my computer.)
Anyways it seems to work, thanks again !

September 05, 2017 Re: Simplest multithreading example
Posted in reply to Brian | On 09/05/2017 03:15 AM, Brian wrote:
> Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values :
>
>
> double [] hugeCalc(int i){
> // Code that takes a long time
> import core.thread: Thread;
> import std.datetime: seconds;
> Thread.sleep(1.seconds);
> return [i];
> }
>
> static import std.range;
> import std.parallelism: parallel;
> auto I = std.range.iota(0, 100);
> double[][int] _hugeCalcCache;
> foreach(i ; parallel(I))
> _hugeCalcCache[i] = hugeCalc(i);
>
>
> writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)

Yeah. As expected, associative array accesses are apparently not thread-safe.

A simple writeln is a terrible way to figure that out, though. I'd suggest sorting the keys and comparing that to `I`:

----
import std.algorithm: equal, sort;
auto sortedKeys = _hugeCalcCache.keys.sort();
assert(sortedKeys.equal(I));
----

> but this does seem to work using your other method of initialization :
>
>
> auto _hugeCalcCache = new double[][](100);
> foreach(i ; parallel(I))
> _hugeCalcCache[i] = hugeCalc(i);
>
> foreach( double[] x ; _hugeCalcCache)
> writeln( x ); // this now contains all values
>
>
> so I guess initializing the whole array at compile time makes it thread safe ?

There's nothing compile-timey about the code. The initialization is done at run-time, but before the parallel stuff starts.

Note that the type of `_hugeCalcCache` here is different from above. Here it's `double[][]`, i.e. a dynamic array. Above it's `double[][int]`, i.e. an associative array. Those types are quite different, despite their similar names.

You can prepare an associative array in a similar way, before doing the parallel stuff. Then it might be thread-safe (not sure):

----
double[][int] _hugeCalcCache; /* associative array */

/* First initialize the elements serially: */
foreach(i; I) _hugeCalcCache[i] = [];

/* Then do the huge calculations in parallel: */
foreach(i; parallel(I)) _hugeCalcCache[i] = hugeCalc(i);
----

But if your keys are consecutive numbers, I see no point in using an associative array.
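If an associative array is still wanted at the end, one way to sidestep the thread-safety question entirely is to do the parallel work into a preallocated dynamic array and copy into the associative array serially afterwards. The sketch below takes that route and checks the keys with the sort/equal test suggested above. The hugeCalc body is a placeholder, and buildCache is a hypothetical name.

```d
import std.algorithm : equal, sort;
import std.parallelism : parallel;
import std.range : iota;

// Placeholder for the long-running computation.
double[] hugeCalc(int i)
{
    return [cast(double) i];
}

// Computes hugeCalc(0 .. n) in parallel, then builds the cache serially.
double[][int] buildCache(int n)
{
    auto I = iota(0, n);

    // Parallel phase: each iteration writes only its own, existing slot.
    auto results = new double[][](I.length);
    foreach (i; parallel(I))
        results[i] = hugeCalc(i);

    // Serial phase: only this thread touches the associative array.
    double[][int] cache;
    foreach (i; I)
        cache[i] = results[i];
    return cache;
}

void main()
{
    auto cache = buildCache(100);
    auto sortedKeys = cache.keys.sort();
    assert(sortedKeys.equal(iota(0, 100)));
    assert(cache[42] == [42.0]);
}
```

The serial copy only moves slice references, so its cost should be small next to the calculations themselves.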
Copyright © 1999-2021 by the D Language Foundation