March 22, 2012
Le 22/03/2012 16:50, David Nadlinger a écrit :
> On Thursday, 22 March 2012 at 15:14:31 UTC, deadalnix wrote:
>> They will, even with different boxes.
>
> Similarly succinct: How so?

They will because of synchronization. This cost will not be transparent. Not that it is a bad idea, but it depends what it meant by no interference.
March 22, 2012
On Mar 21, 2012, at 9:12 PM, "Nathan M. Swan" <nathanmswan@gmail.com> wrote:

> On Wednesday, 21 March 2012 at 03:37:35 UTC, Nathan M. Swan wrote:
>> After playing around with making a library with uses threads, I realized it would be nice if there could be multiple inter-thread mailboxes than just one per thread. That way, client code and third-party library code don't interfere with each other.
>> 
>> So this is my proposal: that std.concurrency is modified so that class MessageBox is public and MessageBoxs be passed around through other MessageBoxs, or perhaps another class is devised for this.
>> 
>> I'm not sure of the implications of this, though I know I would find it very useful by allowing me to write code without ("import core."~someLowLevelModule)
> 
> After thinking about this more, this is my proposal for the API. Backward compatibility is more important here than in other places because it is documented in TDPL.
> 
> Each Tid contains a default MessageQueue accessible by property messageQueue.
> 
> The send/receive functions and their variants operate on the tid's default messageQueue.
> 
> The interface for class MessageQueue:
> 
> void send(T...)(T vals);
> void prioritySend(T...)(T vals);
> void receive(T...)(T ops);
> receiveOnlyRet!(T) receiveOnly(T...)(T ops);
> bool receiveTimeout(T...)(Duration duration, T ops);
> @property void maxSize(size_t size);
> @property void onCrowding(OnCrowding oc);
> @property void onCrowding(bool function(MessageQueue) doThis);
> 
> Hopefully, this won't break any current code, and it will be easy to implement.

I can see adapting the API so that each thread has a default message queue (keep in mind that we'll be adding interprocess messaging at some point via the same routines). I'm not yet clear how the existence of alternate message queues could be communicated to other portions of the code though. register() is one way I suppose. Really what's happening here is that Tid is being replaced by a queue ID, not extended with a mutable variable.

I guess Tid would become an alias for the Qid created when a thread is spawned. What I really don't want though, is for receive() to operate on a message queue created in a different thread. Messaging would become substantially slower if receive() had to be synchronized.
March 22, 2012
On Mar 22, 2012, at 8:49 AM, David Nadlinger wrote:

> On Thursday, 22 March 2012 at 00:30:51 UTC, Nathan M. Swan wrote:
>> As I posted a while back, the concept of a variant message queue is wonderful and powerful, and the implementation is great. But the fact that you can't declare "auto mq = new MessageQueue()" is a gaping whole in an otherwise A+ API.
> 
> Nice to hear that I'm not alone with that opinion – I hit the same problem when implementing a request log file writer for Thrift, which runs in a separate thread and communicates with the main thread via a message queue.
> 
> It works great and performed even better than a similar C++ version due to lower lock contention, but the user'd better not try to call receiveOnly!() from »his« thread…

And this is why I've never used receiveOnly.  A simple way to receive a specific message type is great, but throwing if there's anything else in the queue isn't generally what you want, as it assumes complete knowledge of all messages received by the application.  Perhaps what's needed here is something semantically like receiveOnly that doesn't throw?
March 22, 2012
On Thursday, 22 March 2012 at 15:53:56 UTC, Sean Kelly wrote:
> I can see adapting the API so that each thread has a default message queue (keep in mind that we'll be adding interprocess messaging at some point via the same routines). I'm not yet clear how the existence of alternate message queues could be communicated to other portions of the code though. register() is one way I suppose. Really what's happening here is that Tid is being replaced by a queue ID, not extended with a mutable variable.

I think they would be passed as parameters to spawn or received from the default message queue.

> I guess Tid would become an alias for the Qid created when a thread is spawned. What I really don't want though, is for receive() to operate on a message queue created in a different thread. Messaging would become substantially slower if receive() had to be synchronized.

That's a drawback I haven't considered. To solve this, it would be made part of the contract that receiving must all be done in one thread.

I can't think of a use where receiving in multiple threads would apply, but if it would, a SynchronizedMessageQueue subclass could easily be drawn up that broadens the contract and synchronizes for receive().

BTW, how do you unittest just the std.concurrency module?

March 22, 2012
On Mar 22, 2012, at 12:06 PM, "Nathan M. Swan" <nathanmswan@gmail.com> wrote:

> On Thursday, 22 March 2012 at 15:53:56 UTC, Sean Kelly wrote:
>> I can see adapting the API so that each thread has a default message queue (keep in mind that we'll be adding interprocess messaging at some point via the same routines). I'm not yet clear how the existence of alternate message queues could be communicated to other portions of the code though. register() is one way I suppose. Really what's happening here is that Tid is being replaced by a queue ID, not extended with a mutable variable.
> 
> I think they would be passed as parameters to spawn or received from the default message queue.

But will either of those solve the problem you outlined where user code is calling receiveOnly and bumping into a message meant for a third-party API?  If the API is spawning threads they typically won't be running user code, or at least would certainly impose restrictions on message queue use by called user code. And in the case of sending the Qid to the default queue, you end up with a race condition where user code might call receiveOnly.

>> I guess Tid would become an alias for the Qid created when a thread is spawned. What I really don't want though, is for receive() to operate on a message queue created in a different thread. Messaging would become substantially slower if receive() had to be synchronized.
> 
> That's a drawback I haven't considered. To solve this, it would be made part of the contract that receiving must all be done in one thread.
> 
> I can't think of a use where receiving in multiple threads would apply, but if it would, a SynchronizedMessageQueue subclass could easily be drawn up that broadens the contract and synchronizes for receive().
> 
> BTW, how do you unittest just the std.concurrency module?

Not easily, since a failure often means that a thread hangs.
March 22, 2012
On Thursday, 22 March 2012 at 21:27:40 UTC, Sean Kelly wrote:
> On Mar 22, 2012, at 12:06 PM, "Nathan M. Swan" <nathanmswan@gmail.com> wrote:
>
>> On Thursday, 22 March 2012 at 15:53:56 UTC, Sean Kelly wrote:
>>> I can see adapting the API so that each thread has a default message queue (keep in mind that we'll be adding interprocess messaging at some point via the same routines). I'm not yet clear how the existence of alternate message queues could be communicated to other portions of the code though. register() is one way I suppose. Really what's happening here is that Tid is being replaced by a queue ID, not extended with a mutable variable.
>> 
>> I think they would be passed as parameters to spawn or received from the default message queue.
>
> But will either of those solve the problem you outlined where user code is calling receiveOnly and bumping into a message meant for a third-party API?  If the API is spawning threads they typically won't be running user code, or at least would certainly impose restrictions on message queue use by called user code. And in the case of sending the Qid to the default queue, you end up with a race condition where user code might call receiveOnly.
>

But what if the client spawns threads?

An example would be with a desktop GUI. In a background thread meant for a CPU-intensive task, they want to update a progress indicator and send partially-calculated data to a main-thread.

void mainThread() {
    string data;
    auto mq = new MessageQueue();
    spawn(&backgroundThread, mq, pi);
    pi.onChange = (double val) {
        if (val == 0.5) {
            data = me.receiveOnly!string();
        } else {
            data ~= me.receiveOnly!string();
        }
    };
}

void backgroundThread(MessageQueue me, ProgressIndicator pi) {
    // part 1 of calculations...
    me.send(partiallyCalculatedData);
    pi.value = 0.5; // implementation: this._queue.send(UpdateValue(value))
    // part 2...
    me.send(theRestOfTheData);
    pi.value = 1.0;
}

With one MessageQueue per thread, the mailbox would contain a (string, UpdateValue, string, UpdateValue). The mainThread would expect a (UpdateValue, string, UpdateValue, string).

This way, the client code is separated from the library.

The default queue is an idea suggested for backward compatibility, and new programmers wouldn't be encouraged to use it.

>>> I guess Tid would become an alias for the Qid created when a thread is spawned. What I really don't want though, is for receive() to operate on a message queue created in a different thread. Messaging would become substantially slower if receive() had to be synchronized.
>> 
>> That's a drawback I haven't considered. To solve this, it would be made part of the contract that receiving must all be done in one thread.
>> 
>> I can't think of a use where receiving in multiple threads would apply, but if it would, a SynchronizedMessageQueue subclass could easily be drawn up that broadens the contract and synchronizes for receive().
>> 
>> BTW, how do you unittest just the std.concurrency module?
>
> Not easily, since a failure often means that a thread hangs.

Linking fails (I'm on OSX):

$ rdmd --main -unittest std/concurrency.d
Undefined symbols for architecture x86_64:
  "_D3std3utf10strideImplFNaNeamZk", referenced from:
      _D3std3utf15__T6strideTAxaZ6strideFNaNfxAamZk in concurrency.d.o
      _D3std3utf14__T6strideTAaZ6strideFNaNfxAamZk in concurrency.d.o
      _D3std3utf15__T6strideTAyaZ6strideFNaNfxAyamZk in concurrency.d.o
ld: symbol(s) not found for architecture x86_64
collect2: ld returned 1 exit status
--- errorlevel 1

Thanks, NMS

March 23, 2012
On Mar 22, 2012, at 4:01 PM, Nathan M. Swan wrote:

> On Thursday, 22 March 2012 at 21:27:40 UTC, Sean Kelly wrote:
>> On Mar 22, 2012, at 12:06 PM, "Nathan M. Swan" <nathanmswan@gmail.com> wrote:
>> 
>>> On Thursday, 22 March 2012 at 15:53:56 UTC, Sean Kelly wrote:
>>>> I can see adapting the API so that each thread has a default message queue (keep in mind that we'll be adding interprocess messaging at some point via the same routines). I'm not yet clear how the existence of alternate message queues could be communicated to other portions of the code though. register() is one way I suppose. Really what's happening here is that Tid is being replaced by a queue ID, not extended with a mutable variable.
>>> I think they would be passed as parameters to spawn or received from the default message queue.
>> 
>> But will either of those solve the problem you outlined where user code is calling receiveOnly and bumping into a message meant for a third-party API?  If the API is spawning threads they typically won't be running user code, or at least would certainly impose restrictions on message queue use by called user code. And in the case of sending the Qid to the default queue, you end up with a race condition where user code might call receiveOnly.
>> 
> 
> But what if the client spawns threads?
> 
> An example would be with a desktop GUI. In a background thread meant for a CPU-intensive task, they want to update a progress indicator and send partially-calculated data to a main-thread.
> 
> void mainThread() {
>    string data;
>    auto mq = new MessageQueue();
>    spawn(&backgroundThread, mq, pi);
>    pi.onChange = (double val) {
>        if (val == 0.5) {
>            data = me.receiveOnly!string();
>        } else {
>            data ~= me.receiveOnly!string();
>        }
>    };
> }
> 
> void backgroundThread(MessageQueue me, ProgressIndicator pi) {
>    // part 1 of calculations...
>    me.send(partiallyCalculatedData);
>    pi.value = 0.5; // implementation: this._queue.send(UpdateValue(value))
>    // part 2...
>    me.send(theRestOfTheData);
>    pi.value = 1.0;
> }
> 
> With one MessageQueue per thread, the mailbox would contain a (string, UpdateValue, string, UpdateValue). The mainThread would expect a (UpdateValue, string, UpdateValue, string).

While sending messages like a bare string might be good for example code, any real application is going to use structured messages whose type is specific to what the message is for, contains fields like sender Tid, etc.  It seems like you're aiming more for CSP where you'd create a separate communication channel per use.  You could even fake it by wrapping send/receive with your own CSP-like API, though it's quite likely that a from-scratch CSP style implementation would be faster because there'd be no need to package messages.


>>>> I guess Tid would become an alias for the Qid created when a thread is spawned. What I really don't want though, is for receive() to operate on a message queue created in a different thread. Messaging would become substantially slower if receive() had to be synchronized.
>>> That's a drawback I haven't considered. To solve this, it would be made part of the contract that receiving must all be done in one thread.
>>> I can't think of a use where receiving in multiple threads would apply, but if it would, a SynchronizedMessageQueue subclass could easily be drawn up that broadens the contract and synchronizes for receive().
>>> BTW, how do you unittest just the std.concurrency module?
>> 
>> Not easily, since a failure often means that a thread hangs.
> 
> Linking fails (I'm on OSX):
> 
> $ rdmd --main -unittest std/concurrency.d
> Undefined symbols for architecture x86_64:
>  "_D3std3utf10strideImplFNaNeamZk", referenced from:
>      _D3std3utf15__T6strideTAxaZ6strideFNaNfxAamZk in concurrency.d.o
>      _D3std3utf14__T6strideTAaZ6strideFNaNfxAamZk in concurrency.d.o
>      _D3std3utf15__T6strideTAyaZ6strideFNaNfxAyamZk in concurrency.d.o
> ld: symbol(s) not found for architecture x86_64
> collect2: ld returned 1 exit status
> --- errorlevel 1

Used to work, and std.concurrency doesn't even use std.utf.  Not sure what's going on there.
March 23, 2012
On Friday, 23 March 2012 at 00:14:00 UTC, Sean Kelly wrote:
> While sending messages like a bare string might be good for example code, any real application is going to use structured messages whose type is specific to what the message is for, contains fields like sender Tid, etc.  It seems like you're aiming more for CSP where you'd create a separate communication channel per use.  You could even fake it by wrapping send/receive with your own CSP-like API, though it's quite likely that a from-scratch CSP style implementation would be faster because there'd be no need to package messages.

I see your point. To make this easier, may I suggest:

    T receiveNext(T)() {
        T r;
        receive((T t) {r = t;});
        return r;
    }

A big reason for the use of receiveOnly (in my code) is its convenience. receiveNext, and a discouragement of using receiveOnly, would be a simpler solution.

> Used to work, and std.concurrency doesn't even use std.utf.  Not sure what's going on there.

Weird :(

NMS

March 23, 2012
On Friday, 23 March 2012 at 01:35:05 UTC, Nathan M. Swan wrote:
>> Used to work, and std.concurrency doesn't even use std.utf.  Not sure what's going on there.
>
> Weird :(

Are you trying to build std.concurrency from Git master against Phobos 2.058 or something like that?

David

March 23, 2012
On Friday, 23 March 2012 at 12:36:42 UTC, David Nadlinger wrote:
> Are you trying to build std.concurrency from Git master against Phobos 2.058 or something like that?
>
> David

I cloned from git://github.com/D-Programming-Language/phobos.git

NMS