August 09, 2013 std.concurrency.receive() and event demultiplexer

Hi,

I'm playing around with std.concurrency and find it quite interesting. I'll skip the words of praise (they have all been said already) and go straight to the question.

With std.concurrency we can have a number of asynchronously operating routines, each doing its job "linearly", for instance reading from a socket and sending the received data to a consumer thread. That's OK. But what if the socket, or more generally the task being handled, blocks indefinitely? We lose the ability to respond to the external world (other threads). Okay, we could employ an async event-based model for handling our task (reading the socket), but then we would block in the event demultiplexer loop.

We could go further and overcome this by complicating the logic:
- timed wait for an event on the socket
- when the timeout occurs, check receive for incoming messages, again with a timeout
- switch back to waiting for events

The drawbacks are obvious:
- complicated logic
- artificial switching between two demultiplexers: the event loop and std.concurrency.receive()
- the need to choose a timeout that balances responsiveness against CPU load

Alternatively, it is possible to move all blocking operations to another child thread, but this does not eliminate the problem of freeing resources. With the socket example this is dangerous: a hanging socket descriptor means (1) the peer socket is never told to shut down the conversation and (2) the number of open file descriptors can overflow.

The problem would be solved quite elegantly if either (1) receive() could handle different kinds of events, not just communication messages, or (2) it were possible to get a waitable descriptor for receive() that could be registered in a third-party event demultiplexer.

Of course receive() is not aimed at (1), and I know of no way to get (2) working.

So the question: how can I overcome the problem I described, if I described it clearly?
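
For readers following along, the timed-wait workaround described in the post could look roughly like the sketch below. It is only an illustration under assumptions: `Stop` and `pollLoop` are hypothetical names, the 50 ms timeout is arbitrary, and `Socket.select` stands in for whatever event demultiplexer is actually in use.

```d
import core.time : msecs;
import std.concurrency : receiveTimeout, send, thisTid;
import std.socket : Socket, SocketSet, socketPair;

struct Stop {} // hypothetical control message asking the loop to exit

// Alternates between a timed wait on the socket and a non-blocking
// mailbox check. Returns the number of bytes read before stopping.
size_t pollLoop(Socket sock)
{
    auto readSet = new SocketSet;
    ubyte[512] buf;
    size_t total;
    bool halt;

    while (!halt)
    {
        // select() modifies the set, so rebuild it on every pass
        readSet.reset();
        readSet.add(sock);

        // step 1: timed wait for an event on the socket
        if (Socket.select(readSet, null, null, 50.msecs) > 0
            && readSet.isSet(sock))
        {
            auto n = sock.receive(buf[]);
            if (n > 0)
                total += n;
            else
                halt = true; // 0 means the peer closed, negative is an error
        }

        // step 2: check the mailbox without blocking, then loop back
        receiveTimeout(0.msecs, (Stop _) { halt = true; });
    }
    return total;
}
```

The sketch makes the listed drawbacks visible: a `Stop` message can wait up to the full select timeout before it is noticed, and shortening the timeout means more wakeups while idle.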
August 12, 2013 Re: std.concurrency.receive() and event demultiplexer

Posted in reply to Ruslan Mullakhmetov

On Aug 9, 2013, at 10:59 AM, Ruslan Mullakhmetov <nobody@example.com> wrote:
>
> With std.concurrency we could have a number of asynchronously operating routines doing job "linearly", for instance, reading from socket and sending received data to consumer thread. that ok. but what if socket or generally handling task blocks indefinitely? We lose ability to respond to external world (other threads). Okay, we could employ async event-based model for handling our task (reading socket), but now we would block in event demultiplexer loop.

The current design was deliberate, for a few reasons. First, Phobos doesn't have much in the way of network support. std.socket has been on the chopping block for ages, and we haven't yet gotten a performant option to replace it. So in large part I simply didn't feel we had a network package to integrate. Second, while integrating IO events with receive is appealing as far as convenience, I had difficulty coming up with a way that was sufficiently performant to satisfy most people who would actually be doing event-based IO. To be acceptable the method would have to avoid per-event allocations entirely, and work in a similar manner on both Windows (which uses a proactor pattern) and *nix (which uses a reactor pattern).

I had some ideas for how to approach this, but it seemed like the first step would be to simply provide an adaptor that acted as a proxy between the IO event loop and the send/receive loop, running in a separate thread. That would handle the convenience for casual use, and the performance-minded folks could just ignore it and do their own thing, which is what they'd want to do anyway.

> Alternatively it is possible to take away all blocking operations to another child thread, but this does not eliminate problem of resource freeing. with socket example this is dangerous with hanging socket descriptor and (1) not telling peer socket to shut up conversation (2) overflow of number of open file descriptors

Could you expand on this?

> The problem would be solved quite elegant if either (1) receive() could handle different events, not just communication messages, (2) it would be possible to get waitable descriptor for receive() that could be registered in 3-party event demultiplexer.
>
> Of course receive() is not aimed for (1) and i know no ways to get (2) working.

For (1) I think it's more that not every platform provides a performant way to signal the receipt of multiple kinds of messages. On Windows you can use WaitForMultipleObjects, and on *nix you can kind of fake it with poll, but once you move to an IO event library like libev or libevent things get a lot trickier, which leads into (2).

To solve (2), one approach would be to have each "thread" created by spawn() be a kernel thread running a Fiber (potentially multiple fibers at some point once the TLS/FLS issue is sorted out). Then when receive() determines that waiting for a signal is necessary, the call could yield() back to the event loop and let something else process. This is basically how Vibe.d works today, but obviously without the nifty std.concurrency integration. And it's a direction I'd like to consider heading, but first we really need a solution to the TLS/FLS problem, which becomes pretty complicated if we want to support dynamic libraries (we do).
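
The proxy adaptor mentioned in this reply was never specified in detail, so the following is only a rough sketch of the idea, not the design the poster had in mind. A helper thread blocks on the socket and forwards what it reads as ordinary messages, which lets the consumer sit in a single receive(). The names `Closed`, `ioProxy` and `consume` are hypothetical, and a plain blocking read stands in for a real IO event loop.

```d
import std.concurrency : ownerTid, receive, send, spawn;
import std.socket : Socket, socketPair;

struct Closed {} // hypothetical: sent by the proxy when the stream ends

// Proxy thread: blocks in the socket read and turns each chunk
// into a message for the thread that spawned it.
void ioProxy(shared Socket s)
{
    auto sock = cast(Socket) s; // assumption: only this thread reads
    ubyte[512] buf;
    for (;;)
    {
        auto n = sock.receive(buf[]);
        if (n <= 0)
            break; // peer closed the connection, or an error occurred
        ownerTid.send(buf[0 .. n].idup);
    }
    ownerTid.send(Closed());
}

// Consumer: IO data and control messages arrive through the same
// mailbox, so there is no second demultiplexer to switch to.
string consume(Socket sock)
{
    spawn(&ioProxy, cast(shared) sock);
    string collected;
    bool done;
    while (!done)
    {
        receive(
            (immutable(ubyte)[] chunk) { collected ~= cast(string) chunk; },
            (Closed _) { done = true; }
        );
    }
    return collected;
}
```

Note that the sketch allocates one array per chunk through `idup`, which is exactly the kind of per-event allocation the reply says a performance-minded design would need to avoid.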
August 17, 2013 Re: std.concurrency.receive() and event demultiplexer

Posted in reply to Ruslan Mullakhmetov

On Friday, 9 August 2013 at 17:59:33 UTC, Ruslan Mullakhmetov wrote:
> Now we could go further and overcome it with complication of logic:
> - timed wait for event on socket
> - when timeout occur check receive for incoming messages again with timeout
> - switch back to events waiting
>
> The drawbacks are obvious:
> - complicated logic
> - artificial switching between two demultiplexers: event loop and std.concurrency.receive()
> - need to choose good timeout to meet both: responsiveness and cpu load
>
> Alternatively it is possible to take away all blocking operations to another child thread, but this does not eliminate problem of resource freeing. with socket example this is dangerous with hanging socket descriptor and (1) not telling peer socket to shut up conversation (2) overflow of number of open file descriptors

I ended up going that way with my small toy IRC bot: one thread to *read* from the connected stream and pass on incoming lines, only briefly checking for messages in between (short) stream read timeouts; another thread to *write* to the same stream, blocking indefinitely in std.concurrency.receive() until a string comes along.

Somewhat dumbed-down excerpt with added clarifying comments:

/* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/
__gshared Socket __gsocket;  // *right* in the pride D:
__gshared SocketStream __gstream;

void serverRead() {
    bool halt;
    char[512] buf;
    char[] slice;
    Tid broker = locateTid("broker");

    register("reader");  // thread string identifier
    __gsocket.setOption(SocketOptionLevel.SOCKET,
        SocketOption.RCVTIMEO, 5.seconds);  // arbitrary value

    // some template mixins to reduce duplicate code
    mixin MessageActionLocal!(halt,true,Imperative.Abort) killswitch;
    mixin MessageActionLocal!(halt,true,OwnerTerminated) ownerTerm;
    mixin MessageAction!Variant variant;  // for debugging

    while (!halt) {
        slice = __gstream.readLine(buf);
        if (slice.length)
            broker.send(slice.idup);

        // we just want to check for queued messages,
        // not block waiting for new ones, so timeout immediately
        receiveTimeout(0.seconds,
            &killswitch.trigger,  // these set halt = true
            &ownerTerm.trigger,   // ^
            &variant.doPrint      // this just prints what it received
        );
    }
}

void serverWrite() {
    bool halt;
    long prev;

    register("writer");

    // even so, ugh for the boilerplate that remains
    mixin MessageActionLocal!(halt,true,Imperative.Abort) killswitch;
    mixin MessageActionLocal!(halt,true,OwnerTerminated) ownerTerm;
    mixin MessageAction!Variant variant;  // likewise

    while (!halt) {
        receive(
            (string text) {
                __gstream.sendLine(text);
                // heavily abbreviated; with only this you'll soon
                // get kicked due to spam
            },
            &killswitch.trigger,
            &ownerTerm.trigger,
            &variant.doPrint
        );
    }
}
/* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/

In particular I'm not happy about the __gshared resources, but this works well enough for my purposes (again, IRC bot). I initialize said socket and stream in the thread that spawns these two, so while used by both, neither will close them when exiting scope.

But yes, the reader keeps switching around. Both need to be able to catch OwnerTerminated and some other choice imperatives of import. The longest stall will naturally be when it's sent a message while blocked reading from the stream, but in this context 5 seconds is not that big a deal.

Still, I wish I knew some other way -- to salvage my pride, if nothing else.
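
One possible way around the __gshared globals, offered only as a sketch and not as the poster's code, is to hand the socket to each worker as a `shared` argument to spawn() and cast it back inside the thread. `writer` and `startWriter` are hypothetical names, a raw `Socket` replaces the stream wrapper from the excerpt, and the cast assumes the owner keeps the socket open for as long as the worker runs.

```d
import std.concurrency : OwnerTerminated, receive, send, spawn, Tid;
import std.socket : Socket, socketPair;

// Writer thread: blocks in receive() until a line arrives and writes
// it to the socket it was given at spawn time.
void writer(shared Socket s)
{
    auto sock = cast(Socket) s; // assumption: owner outlives this thread
    bool halt;
    while (!halt)
    {
        receive(
            (string text) { sock.send(text ~ "\r\n"); },
            // handling OwnerTerminated as a message keeps receive()
            // from throwing it when the owner thread exits
            (OwnerTerminated _) { halt = true; }
        );
    }
}

// Hypothetical helper: starts a writer for an already connected socket.
Tid startWriter(Socket sock)
{
    return spawn(&writer, cast(shared) sock);
}
```

Usage would be along the lines of `auto w = startWriter(sock); w.send("PING");`. Ownership of the socket stays with the spawning thread, which matches the arrangement described in the post where neither worker closes it.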