May 17, 2014 std.concurrency thread communication problem

I'm building a program which I intend to have many threads that can each send messages to (and receive messages from) each other. The obvious way to do this would be to have a shared array of Tids, but this seems to not work. I'm continually fighting the system to get it to compile, and this makes me think it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate array that contains all the threads, which would mean that they would need to be initialized after they were created. This would avoid the problems of shared Tids, but each Tid contains a private mailbox, so this would be duplicated, and that bothers me...it seems like a poor idea. (Maybe I'm wrong about that...but I don't know.)

I do know that I want an n-by-n communication matrix (leaving out the main thread), with each thread sending messages to all the others. (Well, except for a few that I haven't really defined yet, but which handle separate functions.) My plan was to have each thread run an execution loop which frequently checked for messages received in between performing its own functions. They are not intended to synchronize with each other. They are not intended to be temporary, i.e., each of these threads would be started shortly after program initialization, and continue running until program termination. But how should I get them to know each other's address?

I don't want the main thread to need to act as a switchboard between all the others, though I guess that would "sort of" work. (Actually, if I need to do that, that job would be pulled off into yet another thread...and I end up with more threads than processors. Still, that's a design that is possible, IIUC.)

Any comments or suggestions?
May 17, 2014 Re: std.concurrency thread communication problem
Posted in reply to Charles Hixson | On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via Digitalmars-d-learn wrote:
> I'm building a program which I intend to have many threads that can each send
> messages to (and receive messages from) each other. The obvious way to do
> this would be to have a shared array of Tids, but this seems to not work. I'm
> continually fighting the system to get it to compile, and this makes me think
> it should probably be done some other way...but what?
>
> One possibility is to have each thread maintain a separate array that contains
> all the threads, which would mean that they would need to be initialized after
> they were created. This would avoid the problems of shared Tids, but each Tid
> contains a private mailbox, so this would be being duplicated, and that
> bothers me...it seems like a poor idea. (Maybe I'm wrong about that...but I
> don't know.)
If my understanding is correct, each Tid contains a reference to the corresponding thread's MessageBox (implemented by way of MessageBox being a class), not an independent instance. You should be fine to just have an array of the relevant Tids in each thread.
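A minimal sketch of the per-thread array idea, assuming that a Tid may itself be sent as a message (std.concurrency allows this), so no shared array and no casts are needed. The names PeersDone, Report, worker and exchangeGreetings are made up for this illustration and are not part of the original program. Each worker builds its own thread-local Tid[] from (index, Tid) messages, greets every other worker directly, and reports back to its owner.

```d
import std.concurrency;
import std.stdio : writefln;

// Hypothetical marker: tells a worker that its peer list is complete.
struct PeersDone {}

// Hypothetical report sent back to the owner when a worker has finished.
struct Report { size_t id; size_t greetings; }

void worker(size_t id, size_t workerCount)
{
    // Thread-local array. Copying a Tid copies a reference to the
    // target's mailbox, not the mailbox itself.
    auto peers = new Tid[workerCount];

    // Collect (index, Tid) pairs until the owner signals completion.
    // receive() leaves non-matching messages (early greetings) queued.
    bool done = false;
    while (!done)
    {
        receive(
            (size_t index, Tid peer) { peers[index] = peer; },
            (PeersDone marker) { done = true; }
        );
    }

    // Greet every other worker directly, without going through main.
    foreach (index, peer; peers)
        if (index != id)
            peer.send(id);

    // Expect exactly one greeting from each of the other workers.
    size_t greetings = 0;
    foreach (i; 0 .. workerCount - 1)
    {
        receiveOnly!size_t();
        ++greetings;
    }
    ownerTid.send(Report(id, greetings));
}

// Spawns the workers, hands every worker the Tid of every worker, and
// returns the total number of greetings the workers report.
size_t exchangeGreetings(size_t workerCount)
{
    Tid[] workers;
    foreach (size_t id; 0 .. workerCount)
        workers ~= spawn(&worker, id, workerCount);

    foreach (target; workers)
    {
        foreach (index, peer; workers)
            target.send(index, peer);
        target.send(PeersDone());
    }

    size_t total = 0;
    foreach (i; 0 .. workerCount)
        total += receiveOnly!Report().greetings;
    return total;
}

void main()
{
    writefln("greetings delivered: %s", exchangeGreetings(3));
}
```

With three workers, each one receives two greetings, so the reported total is 6. The main thread only distributes addresses once at startup; after that the workers talk to each other directly, so it does not act as a switchboard.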
Alternatively, a single __gshared array of threads should work, given you are sufficiently careful with it. Remember, if no-one is doing any writing then you don't need to do any synchronisation of reads.
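A rough sketch of the __gshared variant, under the assumption that the array is filled completely by the main thread before any worker is released and is never written afterwards. The names registry, Hop, worker and runRing are invented for this example. Each worker passes its id to the next worker in a ring and reports which id it received.

```d
import std.concurrency;
import std.stdio : writefln;

// One array for the whole process. __gshared bypasses the compiler's
// sharing checks, so the discipline is ours: main fills it before any
// worker is released, and nobody writes to it afterwards.
__gshared Tid[] registry;

struct Start {}                         // hypothetical "go" message
struct Hop { size_t from; size_t to; }  // hypothetical report to the owner

void worker(size_t id)
{
    // Wait for Start. receive() is used instead of receiveOnly!Start()
    // so that an id arriving early from a peer stays queued.
    receive((Start s) {});

    // The registry is complete from here on; pass our id to the next worker.
    registry[(id + 1) % registry.length].send(id);

    // Receive the id passed on by the previous worker and report it.
    immutable from = receiveOnly!size_t();
    ownerTid.send(Hop(from, id));
}

// Spawns workerCount workers, releases them, and collects one Hop each.
Hop[] runRing(size_t workerCount)
{
    registry = null;
    foreach (size_t id; 0 .. workerCount)
        registry ~= spawn(&worker, id);

    // The send/receive pair orders the writes above before the workers' reads.
    foreach (tid; registry)
        tid.send(Start());

    Hop[] hops;
    foreach (i; 0 .. workerCount)
        hops ~= receiveOnly!Hop();
    return hops;
}

void main()
{
    foreach (hop; runRing(4))
        writefln("%s received from %s", hop.to, hop.from);
}
```

The order of the printed lines varies between runs, but every worker should report the id of its predecessor in the ring. If threads had to be added or removed while others are running, the array would need real synchronisation, and the message-based approach is probably the safer choice.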
May 17, 2014 Re: std.concurrency thread communication problem
Posted in reply to John Colvin | On 05/17/2014 12:33 PM, John Colvin wrote:
> If my understanding is correct, each Tid contains a reference to the
> corresponding thread's MessageBox (implemented by way of MessageBox
> being a class), not an independent instance. You should be fine to just
> have an array of the relevant Tids in each thread.
>
> Alternatively, a single __gshared array of threads should work, given
> you are sufficiently careful with it. Remember, if no-one is doing any
> writing then you don't need to do any synchronisation of reads.
The following is what I've come up with. I had to use a number of shared-related casts.
    import std.stdio;
    import std.concurrency;
    import std.datetime;
    import std.random;
    import core.thread;

    enum threadCount = 5;
    enum messagePerThread = 3;

    // Represents messages sent to threads to start their tasks
    struct Start
    {}

    // Receives the number (id) of this thread and the workers to send messages to
    void workerFunc(size_t id, shared(Tid)[] workers)
    {
        // Wait for Start. Unlike receiveOnly!Start(), receive() leaves a
        // message from a peer that was started earlier in the mailbox
        // instead of throwing MessageMismatch.
        receive((Start _) {});

        // A local function to reduce code duplication
        bool checkMessageForMe(Duration timeout)
        {
            return receiveTimeout(
                timeout,
                (size_t from) {
                    writefln("%s received from %s", id, from);
                });
        }

        // My main task is to send messages to others:
        size_t totalSent = 0;
        while (totalSent < messagePerThread) {
            auto to = uniform(0, workers.length);

            // Only send to others; not to self
            if (to != id) {
                // Tid is not shared-aware, so strip the qualifier before use
                auto chosen = cast(Tid)workers[to];
                writefln("%s sending to %s", id, to);
                chosen.send(id);
                ++totalSent;
            }

            // Non-blocking check for incoming messages
            checkMessageForMe(0.seconds);
        }

        // Process trailing messages sent to me
        bool received = false;
        do {
            received = checkMessageForMe(10.msecs);
        } while (received);
    }

    void main()
    {
        auto workers = new shared(Tid)[threadCount];

        // The array is filled in while workers are being spawned; that is
        // safe because no worker reads it before it receives Start.
        foreach (id; 0 .. threadCount) {
            auto worker = spawn(&workerFunc, id, workers);
            workers[id] = cast(shared(Tid))worker;
        }

        foreach (sharedWorker; workers) {
            auto worker = cast(Tid)sharedWorker;
            worker.send(Start());
        }

        // Keep main alive until every worker has finished
        thread_joinAll();
    }
Sample output:
0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4
Ali
May 18, 2014 Re: std.concurrency thread communication problem
Posted in reply to Ali Çehreli | On Saturday, May 17, 2014 12:59:22 PM Ali Çehreli via Digitalmars-d-learn wrote:
> The following is what I've come up with. I had to use a number of shared-related casts.
Thank you immensely. That is precisely the kind of information I was hoping for.