December 02

How can I implement a program where I have a global integer variable g, and three threads: the first thread increments g by 1, the second thread increments g by 2, and the third thread increments g by 3? Additionally, while these threads are running, I should be able to access the value of g both inside and outside the threads.

December 02

On Monday, 2 December 2024 at 02:02:56 UTC, Ritina wrote:

> How can I implement a program where I have a global integer variable g, and three threads: the first thread increments g by 1, the second thread increments g by 2, and the third thread increments g by 3? Additionally, while these threads are running, I should be able to access the value of g both inside and outside the threads.

Here's my own sample program for sharing memory across multiple threads. It does a fine job of thrashing that cache line!

import core.atomic : atomicFetchAdd;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;
import core.memory : GC;

const uint NSWEPT = 100_000_000;
const uint NCPU = 4;

void
doadd(shared uint *buf)
{
    for (uint count = 0; count < NSWEPT/NCPU; ++count) {
        atomicFetchAdd(buf[0], 1);
    }
}

void
main()
{
    shared uint *buf =
        cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);

    for (uint x = 0; x < NCPU-1; ++x) {
        spawn(&doadd, buf);
    }
    doadd(buf);
    while (buf[0] != NSWEPT) {
        Thread.sleep(1.msecs);
    }
}
December 01
On 12/1/24 6:23 PM, Andy Valencia wrote:
> On Monday, 2 December 2024 at 02:02:56 UTC, Ritina wrote:
>> How can I implement a program where I have a global integer variable g, and three threads: the first thread increments g by 1, the second thread increments g by 2, and the third thread increments g by 3? Additionally, while these threads are running, I should be able to access the value of g both inside and outside the threads.
> 
> Here's my own sample program for sharing memory across multiple threads. It does a fine job of thrashing that cache line!
> 
> ```
> import core.atomic : atomicFetchAdd;
> import std.concurrency : spawn;
> import core.time : msecs;
> import core.thread : Thread;
> import core.memory : GC;
> 
> const uint NSWEPT = 100_000_000;
> const uint NCPU = 4;
> 
> void
> doadd(shared uint *buf)
> {
>      for (uint count = 0; count < NSWEPT/NCPU; ++count) {
>          atomicFetchAdd(buf[0], 1);
>      }
> }
> 
> void
> main()
> {
>      shared uint *buf =
>          cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);
> 
>      for (uint x = 0; x < NCPU-1; ++x) {
>          spawn(&doadd, buf);
>      }
>      doadd(buf);
>      while (buf[0] != NSWEPT) {
>          Thread.sleep(1.msecs);
>      }
> }
> ```
> 

And here's mine, which has an interesting number of differences:

import core.atomic;
import core.thread;
import std.concurrency;
import std.conv;
import std.stdio;

// This is the variable that will be incremented collectively.
shared int g;

// This is the variable that will signal the threads to stop.
shared bool stopRequested;

// This is the thread function.
void modify(int increment) {
    while (!stopRequested) {
        g.atomicOp!"+="(increment);
    }
}

void main() {
    // Spawn some threads.
    enum totalThreads = 3;

    foreach (i; 0 .. totalThreads) {
        const increment = (i + 1).to!int;
        spawnLinked(&modify, increment);
    }

    // Wait for a while to request them to stop.
    Thread.sleep(2.seconds);
    stopRequested = true;

    // Wait until all threads are stopped.
    size_t stopped = 0;
    while (stopped != totalThreads) {
        receive((LinkTerminated _) {
                stopped++;
            });
    }

    // Print the final value of g.
    writefln!"%,s"(g);
}

I am not sure whether

  stopRequested = true

is correct even when there is a single writer of that variable. There are several other methods of communicating the request. I chose that one for this example.

Ali

December 02
On Monday, 2 December 2024 at 02:29:39 UTC, Ali Çehreli wrote:
> I am not sure whether
>
>   stopRequested = true
>
> is correct even when there is a single writer of that variable. There are several other methods of communicating the request. I chose that one for this example.

I notice core.atomic has an atomicLoad() / atomicStore() pair of APIs which might be the "canonical" way to code in that fashion.
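A minimal sketch of that suggestion, assuming the stop flag is read with `atomicLoad` and written with `atomicStore`. The helper names `makeWorker` and `runDemo` are hypothetical, and plain `core.thread` threads stand in for `spawnLinked` to keep it short. It also samples g from the main thread while the workers are running:

```d
import core.atomic : atomicLoad, atomicOp, atomicStore;
import core.thread : Thread;
import core.time : Duration, msecs;
import std.stdio : writeln;

shared int g;               // incremented by every worker
shared bool stopRequested;  // written by main, read by the workers

// Worker body: keep adding `increment` until a stop is requested.
void modify(int increment)
{
    while (!atomicLoad(stopRequested))
    {
        atomicOp!"+="(g, increment);
    }
}

// Hypothetical helper: gives each delegate its own copy of `increment`.
Thread makeWorker(int increment)
{
    return new Thread(() { modify(increment); });
}

// Runs the three workers for roughly `runTime` and returns the final g.
int runDemo(Duration runTime)
{
    atomicStore(g, 0);
    atomicStore(stopRequested, false);

    auto workers = [makeWorker(1), makeWorker(2), makeWorker(3)];
    foreach (t; workers) t.start();

    // Read g from outside the workers while they are still running.
    foreach (_; 0 .. 4)
    {
        Thread.sleep(runTime / 4);
        writeln("g so far: ", atomicLoad(g));
    }

    atomicStore(stopRequested, true);
    foreach (t; workers) t.join();
    return atomicLoad(g);
}

void main()
{
    writeln("final g: ", runDemo(200.msecs));
}
```

The values printed depend on scheduling, so no particular output should be expected. The only point is that every access to the two shared variables goes through `core.atomic`.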

Andy

December 02

On Monday, 2 December 2024 at 02:02:56 UTC, Ritina wrote:

> How can I implement a program where I have a global integer variable g, and three threads: the first thread increments g by 1, the second thread increments g by 2, and the third thread increments g by 3? Additionally, while these threads are running, I should be able to access the value of g both inside and outside the threads.

When we ask claude.ai what to do, we get a very clear and useful answer. This shows that D is an easy language. If the result is 600, each of the three threads has incremented g 100 times (100 × 1 + 100 × 2 + 100 × 3):

import std.conv, std.stdio;
import core.sync.mutex;
import core.atomic;
import core.thread;

struct GlobalCounter
{
    // Mutex to protect shared access to the global variable
    private Mutex mutex;
    // The global integer variable to be incremented
    private shared int gInt;

    // Method to safely increment the global variable
    void increment(int value)
    {
        synchronized(mutex)
        {
            gInt.atomicOp!"+="(value);
        }
    }

    // Getter method to safely read the global variable
    string toString()
    {
        synchronized(mutex)
        {
            return gInt.to!string;
        }
    }
}

void main()
{
    // Create a shared counter instance
    GlobalCounter counter;

    // Initialize the mutex
    counter.mutex = new Mutex();

    // Create three threads with different increment values
    auto thread1 = new Thread(()
    {
        for (int i = 0; i < 100; i++)
        {
            counter.increment(1);
        }
    });

    auto thread2 = new Thread(()
    {
        for (int i = 0; i < 100; i++)
        {
            counter.increment(2);
        }
    });

    auto thread3 = new Thread(()
    {
        for (int i = 0; i < 100; i++)
        {
            counter.increment(3);
        }
    });

    // Start all threads
    thread1.start();
    thread2.start();
    thread3.start();

    // Wait for all threads to complete
    thread1.join();
    thread2.join();
    thread3.join();

    // Print the final value
    counter.writefln!"Final value: %s";
}

SDB@79

December 02

On Monday, 2 December 2024 at 02:29:39 UTC, Ali Çehreli wrote:

> ...
> There are several other methods of communicating the request.

I've slightly edited the AI-generated code. Ali, how is it now? Can we say that it is the best method?

import core.thread;
import core.atomic;
import core.sync.mutex;

struct GlobalCounter
{
    private Mutex mutex;
    private shared int gInt;

    void initialize(int initialValue = 0)
    {
        mutex = new Mutex();
        gInt = initialValue;
    }

    void increment(int value)
    {
        synchronized(mutex)
        {
            gInt.atomicOp!"+="(value);
        }
    }

    auto getValue()
    {
        synchronized(mutex)
        {
            return gInt;
        }
    }
}

enum INC = 100;
void incrementInThread(int incrementValue)
{
    for (int i = 0; i < INC; i++)
    {
        globalCounter.increment(incrementValue);
    }
}

__gshared GlobalCounter globalCounter;

void main()
{
    globalCounter.initialize(41);

    auto threads = [
      new Thread(() { incrementInThread(1); }),
      new Thread(() { incrementInThread(2); }),
      new Thread(() { incrementInThread(3); })
    ];

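    // Start every thread first so that they actually run concurrently.
    foreach (thread; threads)
    {
      thread.start();
    }

    // Then wait for all of them to finish.
    foreach (thread; threads)
    {
      thread.join();
    }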

    assert(globalCounter.getValue == 641);
}

SDB@79

December 02
You don't need both atomics and mutexes; pick one.
December 02

On Monday, 2 December 2024 at 08:00:40 UTC, Richard (Rikki) Andrew Cattermole wrote:

> You don't need both atomics and mutexes; pick one.

The compiler wants us to use atomicOp. If you want, take it out of the synchronized(mutex) { } block; it doesn't matter:

> onlineapp.d(20): Error: read-modify-write operations are not allowed for shared variables
> onlineapp.d(20): Use core.atomic.atomicOp!"+="(this.gInt, value) instead

SDB@79

December 02
On 12/2/24 7:18 AM, Salih Dincer wrote:
> On Monday, 2 December 2024 at 08:00:40 UTC, Richard (Rikki) Andrew
> Cattermole wrote:
>> You don't need both atomics and mutexes; pick one.
>
> The compiler wants us to use atomicOp. If you want, take it out of the
> synchronized(mutex) { } block; it doesn't matter:

Then use atomicOp. You don't need a mutex for this problem, especially since a mutex can make this program very slow. Imagine a thread having to relinquish its execution time to wait for a mutex just to increment a simple int. The CPU's atomic operation primitives already achieve that.

Another general reason for avoiding a mutex is that mutexes are low-level primitives, which should be needed only in special cases, when existing solutions that are already built on them don't work for your problem for some reason. For example, std.parallelism and std.concurrency already use features like mutexes internally, but perhaps they can't be used for some reason.
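As a rough illustration of the higher-level route, here is a sketch that uses std.concurrency message passing instead of shared memory. It is an assumption about how one might apply it here, not something from the posts above. The owner thread keeps g as an ordinary local int and the workers mail their increments to it. The names `worker`, `collect` and `rounds` are hypothetical:

```d
import std.concurrency : ownerTid, receive, send, spawn;
import std.stdio : writeln;

enum rounds = 100;  // hypothetical number of increments per worker

// Worker: sends each increment to the owner instead of touching shared memory.
void worker(int increment)
{
    foreach (_; 0 .. rounds)
    {
        ownerTid.send(increment);
    }
}

// Owner side: spawns three workers and sums every increment it receives.
int collect()
{
    int g = 0;  // plain local variable, only the owner thread touches it

    foreach (inc; 1 .. 4)
    {
        spawn(&worker, inc);
    }

    // Exactly 3 * rounds messages will arrive, one per increment.
    foreach (_; 0 .. 3 * rounds)
    {
        receive((int inc) { g += inc; });
    }
    return g;
}

void main()
{
    writeln(collect());  // prints 600
}
```

The trade-off is that only the owner can read g directly; a worker that needed the current value would have to ask for it with another message.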

Ali

December 03
On 03/12/2024 4:18 AM, Salih Dincer wrote:
> On Monday, 2 December 2024 at 08:00:40 UTC, Richard (Rikki) Andrew Cattermole wrote:
>> You don't need both atomics and mutexes; pick one.
> 
> The compiler wants us to use atomicOp. If you want, take it out of the synchronized(mutex) { } block; it doesn't matter:

That is because you used ``shared``.

As a type qualifier/storage class, ``shared`` should be called ``atomic``.

If you use it to indicate anything other than that the variable can only be accessed/mutated via atomic operations, you are at best lying to yourself about the native memory model.

All memory is owned by the process, until proven otherwise. Which is the exact opposite of what ``shared`` implies.
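One possible reading of the "pick one" advice, sketched under the assumption that the mutex is the mechanism you keep: the counter is then a plain `__gshared int` rather than `shared`, so the compiler no longer demands atomicOp, and correctness rests entirely on the convention that every access holds the lock. The names `gLock`, `add`, `makeWorker` and `run` are hypothetical:

```d
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

__gshared int g;        // plain int: not `shared`, so no compiler checking
__gshared Mutex gLock;  // by convention, every access to g holds this lock

// Adds `value` to g `times` times, taking the lock for each update.
void add(int value, int times)
{
    foreach (_; 0 .. times)
    {
        synchronized (gLock)
        {
            g += value;  // ordinary read-modify-write, protected by gLock
        }
    }
}

// Hypothetical helper: gives each delegate its own copies of the arguments.
Thread makeWorker(int value, int times)
{
    return new Thread(() { add(value, times); });
}

// Runs three workers to completion and returns the final value of g.
int run()
{
    gLock = new Mutex();
    g = 0;

    auto workers = [makeWorker(1, 100), makeWorker(2, 100), makeWorker(3, 100)];
    foreach (t; workers) t.start();
    foreach (t; workers) t.join();

    synchronized (gLock)
    {
        return g;
    }
}

void main()
{
    writeln(run());  // prints 600
}
```

The other choice is the reverse: keep `shared int` with atomicOp and drop the mutex entirely.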
