Question on shared memory concurrency
Mar 04
evilrat
March 04
I tried a shared memory parallel increment.  Yes, it's basically a cache line thrasher, but I wanted to see what's involved in shared memory programming.  Even though I tried to follow all the rules to make true shared memory (not thread local) it appears I failed, as the wait loop at the end only sees its own local 250 million increments?

```d
import core.atomic : atomicFetchAdd;
import std.stdio : writeln;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void
doadd(ref shared(uint) val)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val, 1);
    }
}

void
main()
{
    shared(uint) val = 0;

    for (int x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, val);
    }
    doadd(val);
    while (val != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
March 04
A way to do this without spawning threads manually:

```d
import std.parallelism : TaskPool, parallel, taskPool, defaultPoolThreads;
import std.stdio : writeln;
import std.range : iota;

enum NSWEPT = 1_000_000;
enum NCPU = 4;

void main() {
    import core.atomic : atomicLoad, atomicOp;

    shared(uint) value;

    defaultPoolThreads(NCPU);
    TaskPool pool = taskPool();

    foreach(_; pool.parallel(iota(NSWEPT))) {
        atomicOp!"+="(value, 1);
    }

    writeln(pool.size);
    writeln(atomicLoad(value));
}
```

Unfortunately I could only use the default task pool; creating a new one took too long on run.dlang.io.

I also had to decrease NSWEPT because anything larger would take too long.
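
For anyone running this locally, a dedicated pool might look roughly like the sketch below. The function name `countWithOwnPool` and the work unit size of 10_000 are made up for illustration; the `TaskPool` constructor, `parallel` with an explicit work unit size, and `finish` are from `std.parallelism`.

```d
import core.atomic : atomicLoad, atomicOp;
import std.parallelism : TaskPool;
import std.range : iota;

// Hypothetical helper: counts to n on a private pool with `workers` threads.
uint countWithOwnPool(uint n, uint workers)
{
    shared(uint) value;

    auto pool = new TaskPool(workers);
    // Wait for the workers to finish and shut the pool down on exit.
    scope (exit) pool.finish(true);

    // Each worker takes 10_000 indices at a time (an arbitrary choice here).
    foreach (_; pool.parallel(iota(n), 10_000))
    {
        atomicOp!"+="(value, 1);
    }

    return atomicLoad(value);
}

void main()
{
    import std.stdio : writeln;

    writeln(countWithOwnPool(1_000_000, 4));
}
```

Every increment is still an atomic operation on the same cache line, so this only changes how work is handed out, not the contention itself.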
March 04
On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> A way to do this without spawning threads manually:
>...

Thank you!  Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so not surprising you had to trim the iterations.

But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be?  Do I have to use the memory allocator?
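
One possible answer, as a sketch only: a module-level `shared` variable lives in global rather than thread-local storage, so every thread sees the same counter. The version below also departs from the original experiment by counting locally and doing a single atomic add per thread, which avoids the cache line thrashing. The names `total`, `worker` and `countBatched` are made up here. My assumption, not confirmed anywhere in this thread, is that the original failed because `spawn` copies its arguments, so the `ref` ends up bound to a per-thread copy.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import core.thread : Thread;

enum uint NSWEPT = 1_000_000_000;
enum uint NCPU = 4;

// Module-level `shared` data is global, not thread-local.
shared uint total;

void worker()
{
    uint local = 0;
    foreach (_; 0 .. NSWEPT / NCPU)
    {
        ++local;                     // private to this thread, no contention
    }
    atomicOp!"+="(total, local);     // one atomic add per thread
}

// Hypothetical helper: runs NCPU workers and returns the final count.
uint countBatched()
{
    atomicStore(total, 0u);

    Thread[] threads;
    foreach (_; 0 .. NCPU)
    {
        auto t = new Thread(&worker);
        t.start();
        threads ~= t;
    }
    foreach (t; threads)
    {
        t.join();
    }
    return atomicLoad(total);
}

void main()
{
    assert(countBatched() == NSWEPT);
}
```

Joining the threads replaces the sleep-and-poll loop, so the final read happens only after every worker has finished.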
March 04
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
>> A way to do this without spawning threads manually:
>>...
>
> Thank you!  Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so not surprising you had to trim the iterations.
>
> But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be?  Do I have to use the memory allocator?

There is also the `__gshared` storage class, but unlike plain `shared` it is up to you to ensure correct concurrent access, as stated in the docs.

https://dlang.org/spec/const3.html#shared_global
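
A minimal sketch of what that could look like, assuming a module-level counter. The names `counter`, `TOTAL`, `WORKERS` and `countWithGshared` are invented for this example. Because a `__gshared` variable is not typed as `shared`, the compiler will not stop unsynchronized access, so the atomic operation has to be requested explicitly through a cast.

```d
import core.atomic : atomicOp;
import core.thread : Thread;

enum uint TOTAL = 1_000_000;
enum uint WORKERS = 4;

// Global storage, but typed as a plain uint: no compiler checks on access.
__gshared uint counter;

void worker()
{
    foreach (_; 0 .. TOTAL / WORKERS)
    {
        // Cast to shared so the increment is done atomically.
        atomicOp!"+="(*cast(shared(uint)*) &counter, 1);
    }
}

// Hypothetical helper: runs WORKERS threads and returns the final count.
uint countWithGshared()
{
    counter = 0;

    Thread[] threads;
    foreach (_; 0 .. WORKERS)
    {
        auto t = new Thread(&worker);
        t.start();
        threads ~= t;
    }
    foreach (t; threads)
    {
        t.join();
    }
    // All workers have been joined, so a plain read is safe here.
    return counter;
}

void main()
{
    assert(countWithGshared() == TOTAL);
}
```

Replacing the atomic increment with a plain `++counter` would compile without complaint and lose updates, which is the risk the docs warn about.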
March 04
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> ... I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be?  Do I have to use the memory allocator?

For any other newbie dlang voyagers, here's a version which works as expected using memory from the GC allocator (`GC.calloc`).  On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.



```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;
import core.memory : GC;

const uint NSWEPT = 100_000_000;
const uint NCPU = 4;

void
doadd(shared uint *buf)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(buf[0], 1);
    }
}

void
main()
{
    shared uint *buf =
        cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);

    for (uint x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, buf);
    }
    doadd(buf);
    while (buf[0] != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
March 06
On Monday, 4 March 2024 at 18:08:52 UTC, Andy Valencia wrote:
> For any other newbie dlang voyagers, here's a version which works as expected using memory from the GC allocator (`GC.calloc`).  On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.
>...

Using a technique I found in a unit test in std/concurrency.d, I managed to share process memory without GC.  It counted up to 1,000,000,000 on my low end i7 in:

```
real    0m15.666s
user    0m59.913s
sys     0m0.004s
```



```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn, send, receiveOnly, ownerTid;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void
doadd()
{
    auto val = receiveOnly!(shared(int)[]);
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val[0], 1);
    }
    ownerTid.send(true);
}

void
main()
{
    static shared int[] val = new shared(int)[1];

    // Parallel workers
    for (int x = 0; x < NCPU; ++x) {
        auto tid = spawn(&doadd);
        tid.send(val);
    }

    // Pick up all completed workers
    for (int x = 0; x < NCPU; ++x) {
        receiveOnly!(bool);
    }
    assert(val[0] == NSWEPT);
}
```