Simple parallel foreach and summation/reduction
September 20, 2018
All I want to do is loop from 0 to [constant] with a for or foreach, and have it split up across however many cores I have.

    ulong sum;
    foreach(i; [0 to 1 trillion])
      {
      //flip some dice using
      float die_value = uniform(0F,12F);
      if(die_value > [constant]) sum++;
      }
    writefln("The sum is %d", sum);

However, there are two caveats:

 - One: I can't throw a range of values into an array and foreach over that, as many examples do, because 1 trillion (counting from zero) might be a little big for an array. (I'm using 1 trillion to illustrate a specific bottleneck / problem form.)

 - Two: I want to merge the results at the end.

Which means I either need to use mutexes (BAD. NO. BOO. HISS.) or each "thread" would need to know which one it is, store its sum in, say, a thread[#].sum variable, and then, once all were completed, those sums would be added together.

I know this is an incredibly simple conceptual problem to solve. So I feel like I'm missing some huge, obvious, answer for doing it elegantly in D.

And this just occurred to me: if I had a trillion-iteration foreach, would that make 1 trillion threads? What I want is, IIRC, what OpenMP does. It divides up your range (blocks of sequential numbers) by the number of threads. So a domain of [1 to 1000] with ten threads would become workloads on the indexes [1-100], [101-200], [201-300], and so on, one for each CPU. They each get a 100-element chunk.

So I guess foreach won't work here for that, will it? Hmmm...

 ----> But again, conceptually this is simple: I have, say, 1 trillion sequential numbers. I want to assign a "block" (or "range") to each CPU core. And since their math does not actually interfere with each other, I can simply sum each core's results at the end.

Thanks,
--Chris
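The block-per-core scheme described above can be sketched directly with std.parallelism's parallel foreach. This is only a minimal illustration, not what the replies below recommend: `countInBlocks` and its `pred` parameter are hypothetical names, and the predicate here is a deterministic stand-in for the dice roll. Each task owns one contiguous index range and one result slot, so no mutex is needed.

```d
import std.algorithm.iteration : sum;
import std.parallelism : parallel, totalCPUs;
import std.range : iota;

// Hypothetical helper: counts the i in [0, n) for which pred(i) is true,
// giving each of `blocks` tasks one contiguous index range.
ulong countInBlocks(alias pred)(ulong n, size_t blocks)
{
    auto partial = new ulong[blocks];        // one result slot per block
    foreach (b; parallel(iota(blocks), 1))   // work unit size 1: one block per task
    {
        ulong lo = n * b / blocks;           // first index of this block
        ulong hi = n * (b + 1) / blocks;     // one past its last index
        ulong local = 0;                     // private to this block
        foreach (ulong i; lo .. hi)
            if (pred(i)) ++local;
        partial[b] = local;                  // only this block writes slot b
    }
    return partial.sum;                      // merge once every block is done
}

void main()
{
    import std.stdio : writeln;
    // Count the multiples of 3 below one million, one block per CPU.
    writeln(countInBlocks!(i => i % 3 == 0)(1_000_000, totalCPUs));
}
```

No array of a trillion elements is ever built: `iota(blocks)` only enumerates the block numbers, and each block walks its own index range with an ordinary loop.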
September 20, 2018
On Thursday, 20 September 2018 at 05:34:42 UTC, Chris Katko wrote:
> All I want to do is loop from 0 to [constant] with a for or foreach, and have it split up across however many cores I have.

You're looking at std.parallelism.TaskPool, especially the amap and reduce functions. Should do pretty much exactly what you're asking.

auto taskpool = new TaskPool();
taskpool.reduce!((a, b) => a + b)(iota(1_000_000_000_000L));
September 21, 2018
On Thursday, 20 September 2018 at 05:51:17 UTC, Neia Neutuladh wrote:
> You're looking at std.parallelism.TaskPool, especially the amap and reduce functions. Should do pretty much exactly what you're asking.
>
> auto taskpool = new TaskPool();
> taskpool.reduce!((a, b) => a + b)(iota(1_000_000_000_000L));

I get "Error: template instance `reduce!((a, b) => a + b)` cannot use local __lambda1 as parameter to non-global template reduce(functions...)" when trying to compile that using the online D editor with DMD and LDC.

Any ideas?
September 21, 2018
On Friday, 21 September 2018 at 07:25:17 UTC, Chris Katko wrote:
> I get "Error: template instance `reduce!((a, b) => a + b)` cannot use local __lambda1 as parameter to non-global template reduce(functions...)" when trying to compile that using the online D editor with DMD and LDC.
>
> Any ideas?

That's a long-standing issue: https://issues.dlang.org/show_bug.cgi?id=5710

Using a string for the expression does work though:
```
import std.stdio, std.parallelism, std.range;

void main() {
    taskPool.reduce!"a + b"(iota(1_000L)).writeln;
}
```
September 21, 2018
On 09/21/2018 12:25 AM, Chris Katko wrote:
> I get "Error: template instance `reduce!((a, b) => a + b)` cannot use local __lambda1 as parameter to non-global template reduce(functions...)" when trying to compile that using the online D editor with DMD and LDC.
> 
> Any ideas?

You can use a free-standing function as a workaround, which is included in the following chapter that explains most of std.parallelism:

  http://ddili.org/ders/d.en/parallelism.html

That chapter is missing e.g. the newly-added fold():

  https://dlang.org/phobos/std_parallelism.html#.TaskPool.fold

Ali
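A minimal sketch of the free-standing function workaround, assuming a Phobos version recent enough to include `TaskPool.fold`. The function name `add` is just an illustration. Because `add` lives at module scope, it can be passed as a template argument without triggering the local-lambda error quoted above.

```d
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// Module-level (free-standing) function, usable as a template argument
// of TaskPool.reduce and TaskPool.fold.
long add(long a, long b)
{
    return a + b;
}

void main()
{
    auto viaReduce = taskPool.reduce!add(iota(1_000L));
    auto viaFold = taskPool.fold!add(iota(1_000L));
    writeln(viaReduce, " ", viaFold); // prints "499500 499500"
}
```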
September 22, 2018
On Friday, 21 September 2018 at 12:15:59 UTC, Ali Çehreli wrote:
> You can use a free-standing function as a workaround, which is included in the following chapter that explains most of std.parallelism:
>
>   http://ddili.org/ders/d.en/parallelism.html
>
> That chapter is missing e.g. the newly-added fold():
>
>   https://dlang.org/phobos/std_parallelism.html#.TaskPool.fold
>
> Ali

Okay... so I've got it running. The problem is that it uses tons of RAM, in fact an amount proportional to the working set.

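import std.math : sqrt;
import std.parallelism : TaskPool;
import std.random : uniform;
import std.range : iota;
import std.stdio : writeln;

// Reduction step: adds two partial results.
T test(T)(T x, T y)
	{
	return x + y;
	}

// One sample: 1 if a random point in the square lies inside the unit circle, else 0.
// The argument x is only a counter and is not used.
double monte(T)(T x)
	{
	double v = uniform(-1F, 1F);
	double u = uniform(-1F, 1F);
	if(sqrt(v*v + u*u) < 1.0)
		{
		return 1;
		}else{
		return 0;
		}
	}

void main()
	{
	enum num = 1_000_000; // number of samples
	auto taskpool = new TaskPool();
	// amap evaluates eagerly and returns an array with one result per sample
	auto sum = taskpool.reduce!(test)(
	taskpool.amap!monte(
		iota(num)
		)	);
	taskpool.finish(true);
	writeln(sum);
	}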

num = 1000000 uses ~8MB
num = 10000000 uses ~80MB
num = 100000000 I can't even run, because it says "Exception: Memory Allocation failed"
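These figures are consistent with amap returning an array that holds one 8-byte double per sample. The arithmetic can be checked with a small sketch; `amapResultBytes` is a hypothetical helper, and it assumes the result array dominates memory use.

```d
import std.stdio : writefln;

// Hypothetical helper: bytes needed for an array of n double results.
ulong amapResultBytes(ulong n)
{
    return n * double.sizeof; // double.sizeof is 8
}

void main()
{
    foreach (n; [1_000_000UL, 10_000_000UL, 100_000_000UL])
        writefln("%s samples -> %s MB", n, amapResultBytes(n) / 1_000_000);
}
```

By the same arithmetic, 100,000,000 samples would need about 800 MB for the result array alone.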
September 22, 2018
On Saturday, 22 September 2018 at 02:13:58 UTC, Chris Katko wrote:
> Okay... so I've got it running. The problem is, it uses tons of RAM. In fact, proportional to the working set.
>
> T test(T)(T x, T y)
> 	{
> 	return x + y;
> 	}
> 	
> double monte(T)(T x)
> 	{
> 	double v = uniform(-1F, 1F);
> 	double u = uniform(-1F, 1F);
> 	if(sqrt(v*v + u*u) < 1.0)
> 		{
> 		return 1;
> 		}else{
> 		return 0;
> 		}
> 	}
>
> 	auto taskpool = new TaskPool();
> 	sum = taskpool.reduce!(test)(
> 	taskpool.amap!monte(
> 		iota(num)
> 		)	);	
> 	taskpool.finish(true);
>
> 1000000 becomes ~8MB
> 10000000 becomes 80MB
> 100000000, I can't even run because it says "Exception: Memory Allocation failed"

Also, when I don't call .finish(true) at the end, it just sits there forever (after running), as if one of the threads won't terminate, and I have to hit Ctrl-C. But the docs and examples don't seem to indicate I should need that...
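The hang matches what the std.parallelism documentation says about pools created with `new TaskPool()`: their worker threads keep running until `stop` or `finish` is called, whereas the workers of the global `taskPool` are daemon threads and need no shutdown call. A minimal sketch of both options follows; `sumWithPrivatePool` is a hypothetical helper name.

```d
import std.parallelism : TaskPool, taskPool;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical helper: sums 0 .. n on a privately constructed pool.
long sumWithPrivatePool(long n)
{
    auto pool = new TaskPool();
    scope (exit) pool.finish(true); // block until the workers have shut down
    return pool.reduce!"a + b"(iota(n));
}

void main()
{
    // Global pool: daemon workers, so the program can exit without finish().
    writeln(taskPool.reduce!"a + b"(iota(1_000L)));
    // Private pool: shut down inside the helper via scope (exit).
    writeln(sumWithPrivatePool(1_000));
}
```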
September 24, 2018
On Saturday, 22 September 2018 at 02:26:41 UTC, Chris Katko wrote:
> Also, when I don't call .finish(true) at the end, it just sits there forever (after running) like one of the threads won't terminate. Requiring a control-C. But the docs and examples don't seem to indicate I should need that...

So I looked into it. It's amap that explodes in RAM.

Per the docs, amap has "less overhead but more memory usage," while map has more overhead but less memory usage and "avoids the need to keep all results in memory."

But, if I make a call to map... it doesn't compile! I get:

    Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map

Simply changing amap to map here:

    sum = taskPool.reduce!(test)
        (
        taskPool.map!(monte)(range)
        );
September 24, 2018
On Monday, 24 September 2018 at 05:59:20 UTC, Chris Katko wrote:
> So I looked into it. It's amap that explodes in RAM.
>
> Per the docs, amap has "less overhead but more memory usage." While map has more overhead but less memory usage and "avoids the need to keep all results in memory."
>
> But, if I make a call to map... it doesn't compile! I get:
>
>     Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
>
> Simply changing amap to map here:
>
>     sum = taskPool.reduce!(test)
>         (
>         taskPool.map!(monte)(range)
>         );


Actually, I just realized/remembered that the error occurs inside parallelism itself, and MANY times at that:

/usr/include/dmd/phobos/std/parallelism.d(2590): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2596): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2616): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2616): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2616): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2616): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2616): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2616): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2626): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2626): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2626): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2626): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2626): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2626): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
/usr/include/dmd/phobos/std/parallelism.d(2634): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
monte.d(64): Error: template instance std.parallelism.TaskPool.reduce!(test).reduce!(Map) error instantiating

Though I tried looking up the git version of parallelism.d and the lines don't quite line up:

https://github.com/dlang/phobos/blob/master/std/parallelism.d
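A likely explanation for these errors: the documentation describes `TaskPool.reduce` as a parallel reduce on a random access range, while `TaskPool.map` returns a plain input range with no `[]` operator. The sketch below checks those range properties at compile time. The function `half` is a hypothetical stand-in for `monte`, and the assertions reflect my reading of the Phobos API rather than anything stated in this thread.

```d
import std.algorithm.iteration : map;
import std.parallelism : taskPool;
import std.range : iota;
import std.range.primitives : isInputRange, isRandomAccessRange;

// Hypothetical stand-in for monte: any unary module-level function will do.
double half(long x)
{
    return x / 2.0;
}

void main()
{
    auto lazyMap = iota(100L).map!half;          // std.algorithm's lazy map
    auto poolMap = taskPool.map!half(iota(100L)); // TaskPool's buffered map

    // std.algorithm.map keeps iota's random access, so reduce can index it.
    static assert(isRandomAccessRange!(typeof(lazyMap)));
    // TaskPool.map yields an input range only, hence "no [] operator overload".
    static assert(isInputRange!(typeof(poolMap)));
    static assert(!isRandomAccessRange!(typeof(poolMap)));
}
```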
September 24, 2018
On Monday, 24 September 2018 at 07:13:24 UTC, Chris Katko wrote:
> On Monday, 24 September 2018 at 05:59:20 UTC, Chris Katko wrote:
>>         [...]
>
>
> Actually, I just realized/remembered that the error occurs inside parallelism itself, and MANY times at that:
>
> [...]

This JUST occurred to me. When I use an outer taskPool.[a]map, am I NOT supposed to use the taskPool version of reduce?! But instead, the std.algorithm one?

Because this is running with both/all cores, and only using 2.7MB of RAM:

	sum = taskPool.reduce!(test)(
	map!(monte)(range)   //map, not taskPool.map
	);		

If that's the correct case, the docs did NOT make that obvious!

FYI, I went from ~5200 samples/ms to 7490 samples/ms, roughly a 44% improvement from the second "real" core. Better than nothing, I guess. I'll have to try it on my main machine with a proper CPU.
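Putting the pieces of this thread together, a complete program along these lines might look like the sketch below. It assumes the combination reported above (std.algorithm's lazy `map` feeding `taskPool.reduce`); the names `add` and `samples` are illustrative, and `monte` is simplified to compare squared distances instead of calling `sqrt`.

```d
import std.algorithm.iteration : map;
import std.parallelism : taskPool;
import std.random : uniform;
import std.range : iota;
import std.stdio : writefln;

// Reduction step; a module-level function avoids the local-lambda error.
ulong add(ulong a, ulong b)
{
    return a + b;
}

// One sample: 1 if a random point in the square [-1, 1) x [-1, 1)
// falls inside the unit circle, else 0. The argument is only a counter.
ulong monte(ulong i)
{
    immutable x = uniform(-1.0, 1.0);
    immutable y = uniform(-1.0, 1.0);
    return x * x + y * y < 1.0 ? 1 : 0;
}

void main()
{
    enum ulong samples = 10_000_000;
    // The lazy map computes each sample on demand inside the worker that
    // reduces it, so no array of results is ever allocated.
    immutable hits = taskPool.reduce!add(iota(samples).map!monte);
    writefln("pi is roughly %.4f", 4.0 * hits / samples);
}
```

Since the global `taskPool` is used, no `finish` call is needed before `main` returns.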