Thread overview: Question on shared memory concurrency

- Mar 04, 2024: Andy Valencia
- Mar 04, 2024: Richard (Rikki) Andrew Cattermole
- Mar 04, 2024: Andy Valencia
- Mar 04, 2024: evilrat
- Mar 04, 2024: Andy Valencia
- Mar 06, 2024: Andy Valencia
March 04, 2024
I tried a shared-memory parallel increment.  Yes, it's basically a cache-line thrasher, but I wanted to see what's involved in shared-memory programming.  Even though I tried to follow all the rules to get truly shared memory (not thread-local), it appears I failed: the wait loop at the end only ever sees its own local 250 million increments.  What am I missing?

```d
import core.atomic : atomicFetchAdd;
import std.stdio : writeln;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void
doadd(ref shared(uint) val)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val, 1);
    }
}

void
main()
{
    shared(uint) val = 0;

    for (int x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, val);
    }
    doadd(val);
    while (val != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
March 04, 2024
A way to do this without spawning threads manually:

```d
import std.parallelism : TaskPool, parallel, taskPool, defaultPoolThreads;
import std.stdio : writeln;
import std.range : iota;

enum NSWEPT = 1_000_000;
enum NCPU = 4;

void main() {
    import core.atomic : atomicLoad, atomicOp;

    shared(uint) value;

    defaultPoolThreads(NCPU);
    TaskPool pool = taskPool();

    foreach(_; pool.parallel(iota(NSWEPT))) {
        atomicOp!"+="(value, 1);
    }

    writeln(pool.size);
    writeln(atomicLoad(value));
}
```

Unfortunately I could only use the default task pool; creating a new one took too long on run.dlang.io.

I also had to decrease NSWEPT, because anything larger took too long.
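
If the cost here is mostly per-task dispatch, an explicit work unit size for `parallel` should amortize it. A rough sketch of the same loop (not benchmarked; the `NSWEPT / NCPU` work unit size is just a guess):

```d
import std.parallelism : taskPool, defaultPoolThreads;
import std.range : iota;
import std.stdio : writeln;
import core.atomic : atomicOp, atomicLoad;

enum NSWEPT = 1_000_000;
enum NCPU = 4;

void main() {
    shared(uint) value;

    defaultPoolThreads(NCPU);

    // The second argument to parallel() is the work unit size: each task
    // now covers NSWEPT/NCPU iterations, so task-queue traffic is minimal
    // and the threads mostly contend on the counter itself.
    foreach (_; taskPool.parallel(iota(NSWEPT), NSWEPT / NCPU)) {
        atomicOp!"+="(value, 1);
    }

    writeln(atomicLoad(value));
}
```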
March 04, 2024
On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> A way to do this without spawning threads manually:
>...

Thank you!  Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so it's not surprising you had to trim the iterations.

But I still hope to be able to share memory between spawned threads; if it isn't a shared ref of a shared variable, then what would it be?  Do I have to use the memory allocator?
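
For instance, would passing a pointer to a module-level `shared` variable, instead of a `ref`, be the intended route?  A sketch of what I mean (untested guesswork on my part):

```d
import core.atomic : atomicFetchAdd, atomicLoad;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

// Module-level shared variable: one instance, visible to every thread
shared uint val;

void
doadd(shared(uint)* p)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(*p, 1);
    }
}

void
main()
{
    // A shared(uint)* has no unshared aliasing, so spawn accepts it
    for (uint x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, &val);
    }
    doadd(&val);
    while (atomicLoad(val) != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```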
March 04, 2024
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
>> A way to do this without spawning threads manually:
>>...
>
> Thank you!  Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so it's not surprising you had to trim the iterations.
>
> But I still hope to be able to share memory between spawned threads; if it isn't a shared ref of a shared variable, then what would it be?  Do I have to use the memory allocator?

There is the `__gshared` storage class, but unlike plain `shared`, it is up to you to ensure valid concurrent access, as stated in the docs:

https://dlang.org/spec/const3.html#shared_global
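
A minimal sketch of the difference (my own example, not from the docs). With `__gshared` nothing needs to be passed through `spawn` at all, but the atomics are still on you:

```d
import core.atomic : atomicFetchAdd;
import core.thread : Thread;

// One global instance for the whole process, with no `shared` in the type:
// the compiler no longer polices access, so correctness is manual.
__gshared uint counter;

void main()
{
    Thread[] threads;
    foreach (i; 0 .. 4) {
        threads ~= new Thread({
            foreach (n; 0 .. 1_000_000)
                atomicFetchAdd(counter, 1);   // still required for correctness
        }).start();
    }
    foreach (t; threads)
        t.join();
    assert(counter == 4_000_000);
}
```

As the spec notes, `__gshared` is mainly intended for interoperating with C globals; plain `shared` keeps the compiler's checks.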
March 04, 2024
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> ... I still hope to be able to share memory between spawned threads; if it isn't a shared ref of a shared variable, then what would it be?  Do I have to use the memory allocator?

For any other newbie dlang voyagers, here's a version which works as expected, using memory from the runtime's GC allocator.  On my little i7 I get 1.48 seconds wall clock and 5.26 CPU seconds.

```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;
import core.memory : GC;

const uint NSWEPT = 100_000_000;
const uint NCPU = 4;

void
doadd(shared uint *buf)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(buf[0], 1);
    }
}

void
main()
{
    // NO_SCAN: the block holds no pointers, so the GC needn't scan it
    shared uint *buf =
        cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);

    // NCPU-1 spawned workers; the main thread runs the last share itself
    for (uint x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, buf);
    }
    doadd(buf);
    while (buf[0] != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
March 06, 2024
On Monday, 4 March 2024 at 18:08:52 UTC, Andy Valencia wrote:
> For any other newbie dlang voyagers, here's a version which works as expected, using memory from the runtime's GC allocator.  On my little i7 I get 1.48 seconds wall clock and 5.26 CPU seconds.
>...

Using a technique I found in a unit test in std/concurrency.d, I managed to share process memory without the GC.  It counted up to 1,000,000,000 on my low-end i7 in:

```
real    0m15.666s
user    0m59.913s
sys     0m0.004s
```



```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn, send, receiveOnly, ownerTid;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void
doadd()
{
    // Each worker receives the shared slice from the owner before counting
    auto val = receiveOnly!(shared(int)[]);
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val[0], 1);
    }
    ownerTid.send(true);
}

void
main()
{
    static shared int[] val = new shared(int)[1];

    // Parallel workers
    for (int x = 0; x < NCPU; ++x) {
        auto tid = spawn(&doadd);
        tid.send(val);
    }

    // Pick up all completed workers
    for (int x = 0; x < NCPU; ++x) {
        receiveOnly!(bool);
    }
    assert(val[0] == NSWEPT);
}
```