dmd-concurrency
November 20, 2013
I sent a message to the dmd-concurrency mailing list about a month ago, that I'd like to contribute a set of channel-based functions to std.concurrency, but it was never picked up for inclusion nor (I don't believe) rejected. I'm not sure if I should make the same proposal in an alternate forum (like this one) or if someone can check for the message and post it? Or should I assume that the lack of inclusion was its own answer?

I sent the message in as a "random" rather than subscribing to the mailing list. Is it better for me to fully subscribe, if I want to contribute like this?

Similarly, is there any process for proposals for Phobos? I see that for the language, everything has to be a DIP first, but I didn't see a parallel for Phobos, hence posting to the forums.
November 20, 2013
"Chris Williams" <yoreanon-chrisw@yahoo.co.jp> wrote in message news:kfmkxsgeeijwkndldrmj@forum.dlang.org...
>I sent a message to the dmd-concurrency mailing list about a month ago, that I'd like to contribute a set of channel-based functions to std.concurrency, but it was never picked up for inclusion nor (I don't believe) rejected. I'm not sure if I should make the same proposal in an alternate forum (like this one) or if someone can check for the message and post it? Or should I assume that the lack of inclusion was its own answer?
>

dmd-concurrency was created for the discussion of D2's new concurrency model to be described in TDPL.  It has not been used for years now.

> I sent the message in as a "random" rather than subscribing to the mailing list. Is it better for me to fully subscribe, if I want to contribute like this?
>
> Similarly, is there any process for proposals for Phobos? I see that for the language, everything has to be a DIP first, but I didn't see a parallel for Phobos, hence posting to the forums.


This is the correct forum to post phobos proposals on.


November 20, 2013
On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy wrote:
> This is the correct forum to post phobos proposals on.

Well then, here's what I had written:

A few applications I've considered implementing seem like they would be easier if there was a channel-based messaging system in std.concurrency. I'm happy to do this implementation, but I thought I would try to get some sort of sign-off before doing so. Following, I will lay out my argument for the addition, and then the API that I am considering.

---

One fairly common task is thread-pooling. With the standard send/receive model currently implemented, you have to choose a specific thread to target when you send a task. While it's true that you can simply iterate through your list of threads over and over, to spread the load evenly over them, that presumes that all tasks take even processing time. It makes more sense to be able to push data into a shared channel (secretly a work queue), and the first thread that finishes its previous task will be able to immediately pull the task before everyone else. This also means that the necessity of passing around references to your threads so that they can be looped over goes away.

I haven't tested it, but it looks like this sort of thing might be quasi-possible using the register/unregister/locate methods. As each thread starts, it can register itself with a named group (i.e. channel), and then anyone who wants to send an item to an arbitrary thread in that group can call locate() to retrieve one thread and call send() against the Tid. The target thread would then need to unregister itself while it is doing work, then re-register itself. My complaint against this is the need to unregister and re-register. If the thread issuing commands sends a large number of tasks all at once, they will all go to the same thread (if coded poorly) or the caller will need to use yield() or sleep() to allow the target thread to receive the task and unregister, so that locate() can find a different thread. That's not terribly efficient. I am also concerned that there's the chance that all threads will be unregistered when we call locate(), whereas a channeling system would be able to expand the mailbox during the times that all threads are busy.

The actual implementation within concurrency.d also concerns me as (if I read it correctly), the most recent item to register() will be the one which locate() finds, rather than the thread which has been registered the longest. While I suppose it's probably not too large of an issue if the same two threads keep taking all the tasks - that means that your load can't exceed two threads' worth of processing power - it still seems like a FIFO system would be better. The registry is also based on an array rather than a set, which can make removal an O(n) operation, if the contents of the registry have to be shifted left, to fill an empty spot.

Overall, I think that adding a shared message box system would be a straightforward way to improve the handling of thread pooling via the actor model.
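
To make the shared message box idea concrete, here is a minimal sketch of a work queue that any idle worker can pull from. It is not the proposed implementation: the WorkQueue class and its put/take/close methods are hypothetical names, and it is built directly on core.sync and core.thread rather than on std.concurrency's mailboxes.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

// Hypothetical shared work queue: whichever worker is idle takes the next item.
final class WorkQueue(T)
{
    private T[] items;
    private bool closed;
    private Mutex mtx;
    private Condition cond;

    this()
    {
        mtx = new Mutex;
        cond = new Condition(mtx);
    }

    // Append one item and wake a single waiting worker.
    void put(T item)
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        items ~= item;
        cond.notify();
    }

    // Mark the queue as finished and wake every waiting worker.
    void close()
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        closed = true;
        cond.notifyAll();
    }

    // Blocks until an item is available; returns false once closed and drained.
    bool take(out T item)
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        while (items.length == 0 && !closed)
            cond.wait();
        if (items.length == 0)
            return false;
        item = items[0];
        items = items[1 .. $];
        return true;
    }
}

void main()
{
    auto queue = new WorkQueue!int;
    auto resultLock = new Mutex;
    int total;

    Thread[] workers;
    foreach (i; 0 .. 3)
    {
        auto t = new Thread({
            int task;
            while (queue.take(task))
            {
                resultLock.lock();
                scope (exit) resultLock.unlock();
                total += task; // stand-in for real work
            }
        });
        t.start();
        workers ~= t;
    }

    // The producer never chooses a target thread.
    foreach (n; 1 .. 11)
        queue.put(n);
    queue.close();

    foreach (t; workers)
        t.join();

    writeln(total); // prints 55
}
```

The producer only ever talks to the queue, so it needs no list of worker Tids, and a slow task on one worker does not hold up the others.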

---

A less common use case: I was also considering some world-simulators (e.g. for studying economics or building a game map), and here the ability to broadcast messages to a large set of other actors, based on location, interest, etc., seems useful. In this case, messages would need to be copied out to each subscriber in the channel rather than existing as a point-to-point connection. For a networked game, most likely you would want to break each channel into two, where locally all senders on a channel push to a single listener that pipes the messages over the network, and then remotely the messages would be broadcast to many listeners again, but that's a reasonably straightforward task for someone to implement on top of the channel functionality. I don't think that such functionality is needed in Phobos itself. Mostly, the presence of the broadcasting functionality in the standard library allows users to apply the easy and safe actor model to more creative uses than a straight one-to-one pipe.

---

Overall, my hope would be to develop something that is conceptually no more difficult to deal with than the current send()/receive() model, but also able to be used in a wide variety of ways. The API that I would propose to develop is:

interface Channel {
	void send(T...)(T vals);
	void prioritySend(T...)(T vals);
	void receive(T...)(out Tid sender, T ops);
	receiveOnlyRet!(T) receiveOnly(T...)();
	bool receiveTimeout(T...)(Duration d, T ops);

	void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
	void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) doThisFunc);
}

class SingleChannel : Channel {}	// Send inserts a message into a shared message box. Receive removes message

class DuplicateChannel(bool echo = true) : Channel {}	// Send inserts the message into a message box per-recipient. Receive removes message in the calling thread's channel message box. If echo is false, messages will not be sent back to the sender, even if they are a registered listener

void registerSend(Channel c, Tid tid = thisTid);	// used by function sendAll(). Channel can be of either type
void unregisterSend(Channel c, Tid tid = thisTid);
void registerReceive(Channel c, Tid tid = thisTid);	// used by function receiveAll(). Channel can be of either type
void unregisterReceive(Channel c, Tid tid = thisTid);

void sendAll(T...)(T ops); // Sends a copy of message to all channels this thread has registered for.
void receiveAll(T...)(out Channel c, out Tid sender, T ops); // Receives a message of type T from any channel that we are registered for. Returns channel and sender

I believe that the look and feel stays fairly consistent with the current set of functions in std.concurrency. I've added the ability for the recipient to infer information about the sender since, in the duplication model, I believe there are quite a few cases where this would be important information. And of course, I've added the option to register/unregister threads other than ourselves to allow a greater range of code layouts, though it's possible that the lack of this sort of thing in the original code is due to some sort of safety concern?

The most straightforward way to implement the DuplicateChannel would be to use the individual threads' message boxes, but this would mean that data put into a channel could be pulled out via the traditional receive() method. Currently, my intention would be to partition these two systems (the direct send()/receive() model and the channel model), unless anyone has any reason to think they should be merged into a single whole?

Those are my thoughts, anyways. Comments? Complaints?
November 22, 2013
Sticking with the concept that there needn't be any difference (from a programmatic view) between another thread and another computer, I've also considered adding innate support for channel-based RPC. I'm not sure if anyone would ever actually use such a thing for a local-threaded application, but I figure that creating higher levels of abstraction (like "everything is a file" or "everything is a range") has the possibility to open new paradigms.

Anyways, I'm not sure if I should just start coding and submit a pull request, or if someone needs to sign off on contributions first?
November 22, 2013
On 2013-11-20 07:34:36 +0000, Chris Williams said:

> Those are my thoughts, anyways. Comments? Complaints?

How does one receive from multiple channels out-of-order? I would rather this send it to the subscribed Tid via send, rather than having an additional queue. It could possibly send a ChannelMessage which has a reference to the sending channel and the message. I understand this is a different model than what Go and whatnot use, but I think it's more practical in some circumstances. Maybe both ways would be good? I personally use this method in my vibe-d server.

November 22, 2013
On 2013-11-22 22:34:19 +0000, Shammah Chancellor said:

> How does one receive from multiple channels out-of-order? I would rather this send it to the subscribed Tid via send, rather than having an additional queue. It could possibly send a ChannelMessage which has a reference to the sending channel and the message. I understand this is a different model than what Go and whatnot use, but I think it's more practical in some circumstances. Maybe both ways would be good? I personally use this method in my vibe-d server.

Edit: I see that you have receiveAll; I didn't notice that. However, that still doesn't solve the problem when you want to do receive() for your Tid and receiveAll() from your channels.

November 22, 2013
On Friday, 22 November 2013 at 22:35:47 UTC, Shammah Chancellor wrote:
> On 2013-11-22 22:34:19 +0000, Shammah Chancellor said:
> 
> Edit: I see that you have receiveAll; I didn't notice that. However, that still doesn't solve the problem when you want to do receive() for your Tid and receiveAll() from your channels.

receiveAll() could pull from the thread's personal message box as well as all of its subscribed channels, so that it truly was a "receive ALL".

My thoughts for reasons to avoid that, however, are:

1. Threads always have their own message box allocated - even if it's empty - whereas with channels, at most you only have as many message boxes as you subscribed to. So my thinking was that if people are unlikely to mix channels-based systems and direct-sends in the same application, then every call to receiveAll() is having to spend an extra cycle checking the direct-send message box.

2. Since a Channel is just an interface, the implementation of which can vary, anyone who wanted to implement a NetworkedDuplicateChannel() class would be able to do so and pass it into a module that only imports std.concurrency. This allows the actual implementations of channels to behave completely differently from one another, and lets code be ported quickly from one type to another. send()/receive() are just innate to threads, however, and can't be replaced except by changing the import in each file to something else. Knowing that and also knowing that any direct-messaging system would probably be built like a channel (so that it had a constructor/init function where an IP and port could be configured, and perhaps an explicit startListening() method to call), I don't see anyone trying to override send()/receive() as their method for receiving direct messages over the network. They would still use the Channel interface, so there's not much value in trying to tie the two together.
November 22, 2013
On 2013-11-22 23:14:44 +0000, Chris Williams said:

> receiveAll() could pull from the thread's personal message box as well as all of its subscribed channels, so that it truly was a "receive ALL".

I am suggesting you define a particular type of message to be received, and then send that to the Thread/Fiber's MessageQueue.   Then the Channel is just an interface to broadcast messages to all the subscribers of a particular channel.  Then each thread need only poll one queue.  
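
A rough sketch of that idea, using only the existing std.concurrency calls (spawn, send, receive, receiveOnly). The ChannelMessage struct and the broadcast helper are hypothetical names made up for illustration, and the subscriber list is just a plain array here:

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical wrapper type: tags a broadcast with the channel it came from.
struct ChannelMessage
{
    string channel;
    string text;
}

// Hypothetical helper: copy one message into every subscriber's own mailbox.
void broadcast(Tid[] subscribers, string channel, string text)
{
    foreach (tid; subscribers)
        tid.send(ChannelMessage(channel, text));
}

void client(Tid owner)
{
    // Two messages are expected; both kinds arrive through the same mailbox.
    foreach (_; 0 .. 2)
    {
        receive(
            (ChannelMessage m) { owner.send("broadcast on " ~ m.channel ~ ": " ~ m.text); },
            (string direct) { owner.send("direct: " ~ direct); }
        );
    }
}

void main()
{
    auto a = spawn(&client, thisTid);
    auto b = spawn(&client, thisTid);

    broadcast([a, b], "#d", "hello all"); // channel-style message
    a.send("hi a");                       // ordinary direct messages
    b.send("hi b");

    // Four replies in total; their relative order across threads is not fixed.
    foreach (_; 0 .. 4)
        writeln(receiveOnly!string());
}
```

Each client makes a single receive() call and never has to decide whether to poll its own mailbox or a channel first.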

November 23, 2013
On Friday, 22 November 2013 at 23:21:33 UTC, Shammah Chancellor wrote:
> On 2013-11-22 23:14:44 +0000, Chris Williams said:
>
> I am suggesting you define a particular type of message to be received, and then send that to the Thread/Fiber's MessageQueue.   Then the Channel is just an interface to broadcast messages to all the subscribers of a particular channel.  Then each thread need only poll one queue.

With a SingleChannel, that's not really an option. Theoretically, it could choose a random subscriber with space in its MessageBox as the next recipient, but there would be no guarantee that a thread which had just decided to shut itself down hadn't been selected, in which case the message would be lost. The channel really needs to have an internal queue that all the threads look at when they call receive.

For DuplicateChannel, I could indeed go and copy the data into each individual thread's box, so that it only had to check there for messages during receive (if not subscribed to any SingleChannel instances), but I would still need to have the logic in place to scan for in-channel boxes because of SingleChannel, in which case I might as well establish the presence of boxes in channels as the norm for DuplicateChannel as well.

This does give us the advantage that if the user requests calls against a single channel, there is no extra overhead associated with that act, such as having to scan through a large list to find the appropriate items. It also allows the user to customize the behavior of the MessageBox on a channel-by-channel basis. Setting a "max number of messages" per channel, when all messages go into the same bucket, would end up wonky.
November 23, 2013
On 2013-11-23 01:46:22 +0000, Chris Williams said:

> This does give us the advantage that if the user requests calls against a single channel, there is no extra overhead associated with that act, such as having to scan through a large list to find the appropriate items. It also allows the user to customize the behavior of the MessageBox on a channel-by-channel basis. Setting a "max number of messages" per channel, when all messages go into the same bucket, would end up wonky.

In my uses of channels, I have not found customizing the message box size per channel to be useful. It may be, but it's not something I want. I still think duplicate channels should behave the way I described. Take IRC for example, where I am sending messages to other users, but also to channels which then broadcast. I want my clients to be able to simply receive() and get messages that were intended for them specifically, or were broadcast. I don't want to implement complex logic in order to avoid my thread hanging while trying to read from a channel if I have messages available that are specific to my thread.

With regard to SingleChannel, picking a random thread would be bad for a plethora of reasons, so I agree here. I think we should continue to discuss this issue. There may be some way to get Tid.receive() to behave the expected way when subscribed to SingleChannels.

Also, SingleChannels seem somewhat strange in general to me though -- what is the expected behavior when multiple threads receive different types of messages from the MessageBox? Does a thread consume messages it doesn't understand until it finds one it does? This would prevent other tasks which do understand them from processing them. What is the use case for SingleChannel that a simple synchronized Queue does not achieve?
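
For comparison, here is a small sketch of what the existing per-thread mailbox does with a message that no handler matches: it stays queued and a later receive can still pick it up. Whether a shared SingleChannel box should behave the same way is the open question; this only shows today's std.concurrency behavior, with the thread posting to its own mailbox to keep the demo single-threaded.

```d
import std.concurrency;
import std.stdio : writeln;

void main()
{
    // Queue an int first, then a string, in this thread's own mailbox.
    thisTid.send(42);
    thisTid.send("hello");

    // Only a string handler is supplied: the int is skipped, not discarded.
    string text;
    receive((string s) { text = s; });

    // The skipped int is still waiting in the mailbox.
    int number = receiveOnly!int();

    writeln(text, " ", number); // prints "hello 42"
}
```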

It seems you want SingleChannel and DuplicateChannel not to block each other, for the same reason I don't want DuplicateChannels to block task-specific receive() calls. However, I think SingleChannels are the oddity here and should be treated as such, rather than Tid.send().

-Shammah
