On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
> What's the way to perform the classic D example:
>     auto logs = new double[1_000_000];
>     foreach (i, ref elem; parallel(logs))
>     {
>         elem = log(i + 1.0);
>     }
> Are Streams involved?
> Thank you
Unlike P2300, the whenAll implementation in the concurrency library accepts an array as well.
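    import concurrency : syncWait;
    import concurrency.operations : on, then, whenAll;
    import concurrency.sender : just;
    import concurrency.thread : stdTaskPool;
    import std.algorithm : map;
    import std.array : array;
    import std.math : log;

    auto pool = stdTaskPool(32);
    auto scheduler = pool.getScheduler();
    auto logs = new double[1_000_000];
    // One sender per element; note that elem is the element's value here, not its index.
    auto result = logs
        .map!((elem) =>
            just(elem)
                .then((double x) => log(x + 1.0))
                .on(scheduler)
        )
        .array    // whenAll needs an array for now
        .whenAll
        .syncWait
        .value;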
whenAll should be able to work with ranges that provide a size, but it doesn't yet, so a call to .array is required.
Note this is not quite equivalent to your code, since it doesn't mutate the array in-place. You can do that using:
    .map!((ref elem) =>
        just(&elem)
            .then((double* p) => *p = log(*p + 1.0))
            .on(scheduler)
    )
Unfortunately then isn't smart enough to allow ref double i, so you have to deal with ugly pointers. Something to improve.
As for streams, they are actually something that I am going to deprecate in favor of the newer Sequence concept.
While that code hasn't been written yet, the API would allow you to do the following:
    // Proposed API, not implemented yet.
    auto result = logs
        .sequence()
        .transform((double i) => log(i + 1.0))
        .parallelize(pool.getScheduler)
        .toList
        .syncWait
        .value;
For the in-place mutation I suppose we could add a refSequence (then we don't need .toList anymore).
Yet another way would be to use an asyncScope and spawn individual tasks:
    import concurrency : syncWait;
    import concurrency.asyncscope;
    import concurrency.operations : on, then;
    import concurrency.sender : just;
    import concurrency.thread : stdTaskPool;
    import std.math : log;

    auto pool = stdTaskPool(32);
    auto scheduler = pool.getScheduler();
    auto logs = new double[1_000_000];
    auto scp = asyncScope();
    // Spawn one task per element; each writes its result through the pointer.
    foreach (i, ref elem; logs) {
        scp.spawn(
            just(&elem)
                .then((double* p) => *p = log(i + 1.0))
                .on(scheduler)
        );
    }
    // Block until the scope's cleanup sender completes.
    scp.cleanup.syncWait();
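For comparison, here is the std.parallelism loop from the question as a small self-contained program. It is only a baseline sketch that can be used to check that any of the variants above produce the same values. The computeLogs helper and the spot checks in main are made up for illustration; they are not part of the concurrency library.

```d
import std.math : isClose, log;
import std.parallelism : parallel;
import std.stdio : writeln;

// Hypothetical helper: returns an array where element i holds log(i + 1).
double[] computeLogs(size_t n)
{
    auto logs = new double[n];
    // parallel() hands chunks of the array to the default task pool;
    // each iteration writes only to its own element, so no locking is needed.
    foreach (i, ref elem; parallel(logs))
    {
        elem = log(i + 1.0);
    }
    return logs;
}

void main()
{
    auto logs = computeLogs(1_000_000);
    assert(logs.length == 1_000_000);
    assert(logs[0] == 0.0);                          // log(1) is exactly 0
    assert(isClose(logs[$ - 1], log(1_000_000.0)));  // last element is log(n)
    writeln(logs[0 .. 3]);
}
```

Note that the loop uses the index i, not the stored value, which is why the asyncScope version above is the closest match to it.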