Thread overview
March 04 Question on shared memory concurrency
I tried a shared memory parallel increment. Yes, it's basically a cache line thrasher, but I wanted to see what's involved in shared memory programming. Even though I tried to follow all the rules to make true shared memory (not thread local), it appears I failed, as the wait loop at the end only sees its own local 250 million increments?

```d
import core.atomic : atomicFetchAdd;
import std.stdio : writeln;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void doadd(ref shared(uint) val)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val, 1);
    }
}

void main()
{
    shared(uint) val = 0;
    for (int x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, val);
    }
    doadd(val);
    while (val != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
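What happens here: `std.concurrency.spawn` copies its arguments into the new thread, so the `ref shared(uint)` parameter ends up bound to each thread's private copy of `val` rather than to the one in `main`. A minimal sketch of the same counter with every thread aimed at one location, using `core.thread` delegates and a pointer (identifiers are illustrative, not from the original post):

```d
import core.atomic : atomicFetchAdd, atomicLoad;
import core.thread : Thread;

enum uint NSWEPT = 1_000_000;
enum uint NCPU = 4;

void main()
{
    shared uint counter = 0;
    shared(uint)* p = &counter;  // one location, shared by every worker

    Thread[] workers;
    foreach (i; 0 .. NCPU)
    {
        // Thread delegates close over p, so no copy is made on spawn.
        workers ~= new Thread({
            foreach (j; 0 .. NSWEPT / NCPU)
                atomicFetchAdd(*p, 1);
        }).start();
    }
    foreach (t; workers)
        t.join();

    assert(atomicLoad(counter) == NSWEPT);
}
```

Joining the workers before `main` returns also keeps the stack-allocated `counter` alive for as long as any thread can touch it.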
March 04 Re: Question on shared memory concurrency
Posted in reply to Andy Valencia

A way to do this without spawning threads manually:

```d
import std.parallelism : TaskPool, parallel, taskPool, defaultPoolThreads;
import std.stdio : writeln;
import std.range : iota;

enum NSWEPT = 1_000_000;
enum NCPU = 4;

void main() {
    import core.atomic : atomicLoad, atomicOp;

    shared(uint) value;

    defaultPoolThreads(NCPU);
    TaskPool pool = taskPool();

    foreach(_; pool.parallel(iota(NSWEPT))) {
        atomicOp!"+="(value, 1);
    }

    writeln(pool.size);
    writeln(atomicLoad(value));
}
```

Unfortunately I could only use the default task pool; creating a new one took too long on run.dlang.io. I also had to decrease NSWEPT because anything larger would take too long.
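Most of the cost in the loop above is queueing one task per element. `std.parallelism.parallel` also accepts a work-unit size, so each worker pulls large contiguous chunks from the range instead; a sketch (the chunk size of 10_000 is an arbitrary choice, not tuned):

```d
import core.atomic : atomicLoad, atomicOp;
import std.parallelism : parallel;
import std.range : iota;

void main()
{
    enum NSWEPT = 1_000_000;
    shared uint value;

    // The second argument is the work-unit size: each worker grabs
    // 10_000 consecutive indices at a time, so task-queue overhead is
    // paid once per chunk instead of once per element.
    foreach (_; parallel(iota(NSWEPT), 10_000))
        atomicOp!"+="(value, 1);

    assert(atomicLoad(value) == NSWEPT);
}
```

The atomic add itself still contends on one cache line; only the dispatch overhead goes away.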
March 04 Re: Question on shared memory concurrency
Posted in reply to Richard (Rikki) Andrew Cattermole

On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> A way to do this without spawning threads manually:
>...
Thank you! Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so it's not surprising you had to trim the iterations.
But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?
March 04 Re: Question on shared memory concurrency
Posted in reply to Andy Valencia

On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
>> A way to do this without spawning threads manually:
>> ...
>
> Thank you! Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so not surprising you had to trim the iterations.
>
> But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?

There is the `__gshared` storage class, but unlike plain `shared` it is up to you to ensure valid concurrent access, as stated in the docs. https://dlang.org/spec/const3.html#shared_global
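A minimal sketch of the `__gshared` route; the variable is an ordinary global visible to all threads, the compiler enforces nothing about access, so the atomic ops below are still needed (names illustrative):

```d
import core.atomic : atomicFetchAdd;
import core.thread : Thread;

// __gshared gives classic C-style global storage: one instance shared
// by all threads, with no help from the type system on safe access.
__gshared uint counter;

enum uint N = 100_000;

void bump()
{
    foreach (i; 0 .. N)
        // The cast reintroduces the shared qualifier that __gshared
        // deliberately drops, so core.atomic accepts the location.
        atomicFetchAdd(*cast(shared(uint)*)&counter, 1);
}

void main()
{
    auto t = new Thread(&bump).start();
    bump();            // main thread does the same work concurrently
    t.join();
    assert(counter == 2 * N);
}
```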
March 04 Re: Question on shared memory concurrency
Posted in reply to Andy Valencia

On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
> On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew Cattermole wrote:
> ... I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?
For any other newbie dlang voyagers, here's a version which works as expected, allocating the shared buffer with the runtime's GC allocator. On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.
```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;
import core.memory : GC;

const uint NSWEPT = 100_000_000;
const uint NCPU = 4;

void doadd(shared uint *buf)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(buf[0], 1);
    }
}

void main()
{
    shared uint *buf =
        cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);
    for (uint x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, buf);
    }
    doadd(buf);
    while (buf[0] != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
```
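On the "cache line thrasher" point from the original post: with every core hammering the same word, the line ping-pongs between caches on each increment. A common variant is one padded counter per worker, summed once at the end; a sketch assuming 64-byte cache lines (not measured here, and all names are illustrative):

```d
import core.atomic : atomicFetchAdd, atomicLoad;
import core.thread : Thread;

enum uint NSWEPT = 1_000_000;
enum uint NCPU = 4;

// One counter per worker, padded so that adjacent slots land on
// different (assumed 64-byte) cache lines.
struct Slot { shared uint n; ubyte[60] pad; }

Thread makeWorker(shared(uint)* slot)
{
    // A separate function so each delegate closes over its own slot.
    return new Thread({
        foreach (i; 0 .. NSWEPT / NCPU)
            atomicFetchAdd(*slot, 1);
    });
}

void main()
{
    auto slots = new Slot[NCPU];

    Thread[] workers;
    foreach (i; 0 .. NCPU)
        workers ~= makeWorker(&slots[i].n).start();
    foreach (t; workers)
        t.join();

    uint total = 0;
    foreach (ref s; slots)
        total += atomicLoad(s.n);
    assert(total == NSWEPT);
}
```

Each atomic add still pays its ordering cost, but no two workers ever touch the same line, so the inter-core traffic disappears.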
March 06 Re: Question on shared memory concurrency
Posted in reply to Andy Valencia

On Monday, 4 March 2024 at 18:08:52 UTC, Andy Valencia wrote:
> For any other newbie dlang voyagers, here's a version which works as expected using the system memory allocator. On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.
>...
Using a technique I found in a unit test in std/concurrency.d, I managed to share process memory without GC. It counted up to 1,000,000,000 on my low end i7 in:
```
real 0m15.666s
user 0m59.913s
sys  0m0.004s
```
```d
import core.atomic : atomicFetchAdd;
import std.concurrency : spawn, send, receiveOnly, ownerTid;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void doadd()
{
    auto val = receiveOnly!(shared(int)[]);
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(val[0], 1);
    }
    ownerTid.send(true);
}

void main()
{
    static shared int[] val = new shared(int)[1];

    // Parallel workers
    for (int x = 0; x < NCPU; ++x) {
        auto tid = spawn(&doadd);
        tid.send(val);
    }
    // Pick up all completed workers
    for (int x = 0; x < NCPU; ++x) {
        receiveOnly!(bool);
    }
    assert(val[0] == NSWEPT);
}
```
Copyright © 1999-2021 by the D Language Foundation