Structured Concurrency vs P2300R7
April 07

I've watched Sebastiaan's talk about structured concurrency again. I know it's not a verbatim port of P2300R7, so to grok it I started coding some of the P2300R7 examples in D.

The second example is a parallel computation over an array of doubles, powered by the execution::bulk adaptor, which is one way to execute a callable multiple times in parallel over an input.

What is confusing me is that the Stream concept is not present in P2300, so I'm not sure about one basic thing: how to spawn parallel computations and reduce the results.

What's the correct way to instantiate 'n' senders that execute a callable via a thread pool scheduler, and pass them as parameters to 'whenAll'?

What's the way to perform the classic D example:

import std.math : log;
import std.parallelism : parallel;

auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}

Are Streams involved?

Thank you

April 07

On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:

> What's the way to perform the classic D example:
>
> auto logs = new double[1_000_000];
>
> foreach (i, ref elem; parallel(logs))
> {
>     elem = log(i + 1.0);
> }
>
> Are Streams involved?
>
> Thank you

Contrary to P2300, the whenAll implementation in the concurrency library accepts an array as well.

import concurrency : syncWait;
import concurrency.sender : just;
import concurrency.operations : whenAll, then, on;
import concurrency.thread : stdTaskPool;
import std.algorithm : map;
import std.array : array;
import std.math : log;

auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto logs = new double[1_000_000];

// one sender per element, each scheduled on the pool, awaited together
logs
    .map!((i) =>
        just(i)
            .then((double i) => log(i + 1.0))
            .on(scheduler)
        )
    .array
    .whenAll
    .syncWait
    .value;

whenAll should be able to work with ranges that provide a size, but it doesn't yet, therefore a call to .array is required.

Note this is not quite equivalent to your code, since it doesn't mutate the array in-place. You can do that using:

    .map!((ref i) =>
        just(&i)
            .then((double* i) => *i = log(*i + 1.0))
            .on(scheduler)
        )

Unfortunately then isn't smart enough to allow ref double i, so you have to deal with ugly pointers. Something to improve.
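
Put together (untested, with the same imports and pool as above), the whole in-place pipeline would look something like:

logs
    .map!((ref elem) =>
        just(&elem)
            .then((double* p) => *p = log(*p + 1.0))   // write the result back through the pointer
            .on(scheduler)
        )
    .array
    .whenAll
    .syncWait
    .value;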


As for streams, they are actually something that I am going to deprecate in favor of the newer Sequence concept.

While that code hasn't been written yet, the API would allow you to do the following:

auto result = logs
    .sequence()
    .transform((double i) => log(i + 1.0))
    .parallelize(pool.getScheduler)
    .toList
    .syncWait
    .value;

For the in-place mutation I suppose we could add a refSequence (then we don't need .toList anymore).
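
Purely hypothetical, but a refSequence version could then look something like this (none of this exists yet):

logs
    .refSequence()      // hypothetical: a Sequence that yields refs to the elements
    .transform((ref double elem) { elem = log(elem + 1.0); })
    .parallelize(pool.getScheduler)
    .syncWait;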

Yet another way would be to use an asyncScope and spawn individual tasks:

import concurrency : syncWait;
import concurrency.sender : just;
import concurrency.operations : then, on;
import concurrency.thread : stdTaskPool;
import concurrency.asyncscope;
import std.math : log;

auto pool = stdTaskPool(32);
auto scheduler = pool.getScheduler();
auto logs = new double[1_000_000];
auto scp = asyncScope();

// pass the index through the sender so each task gets its own copy of i
foreach (i; 0 .. logs.length) {
    scp.spawn(
        just(i)
            .then((size_t i) { logs[i] = log(i + 1.0); })
            .on(scheduler)
    );
}

scp.cleanup.syncWait();

April 08

On Sunday, 7 April 2024 at 17:48:43 UTC, Sebastiaan Koppe wrote:

> On Sunday, 7 April 2024 at 10:33:08 UTC, Paolo Invernizzi wrote:
>
>> [...]
>
> Contrary to P2300, the whenAll implementation in the concurrency library accepts an array as well.
>
> [...]

Thank you Sebastiaan for the explanations, I missed that whenAll accepts arrays too.

So I will avoid digging into Streams while waiting for Sequence, and yes, I think that 'parallelize' is the right sugar to add.

In the meantime, the trio Sender / Receiver / Scheduler should be sufficient to play with a lot of things.
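
For instance, even a tiny sketch like this (untested, module paths as in the snippets above) already exercises the whole trio, with syncWait providing the receiver:

import concurrency : syncWait;
import concurrency.sender : just;
import concurrency.operations : then, on;
import concurrency.thread : stdTaskPool;

auto pool = stdTaskPool(4);

// a single value, transformed on the pool's scheduler, awaited synchronously
auto result = just(21)
    .then((int i) => i * 2)
    .on(pool.getScheduler())
    .syncWait
    .value;

assert(result == 42);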

/P