| |
| Posted by Jason House in reply to Graham St Jack | PermalinkReply |
|
Jason House
Posted in reply to Graham St Jack
| Graham St Jack wrote:
> On Mon, 07 Dec 2009 22:45:17 -0500, Jason House wrote:
>> I've backed out most of my pro-shared changes and will try again in a few months :(
>
> I have also given up on shared and am also adopting a waiting strategy.
Yeah, it's sad. I have been successful in converting part of my code base to use shared, and suspect I can probably get another chunk working if I try really hard. I partially suspect that the viral nature of shared makes any conversion of a code base tough. It may be much easier to get something working if shared is used from the beginning while developing.
> I would love to get some tips from anyone (like Walter, for example) who thinks they have a way of using shared successfully.
Here's the one module that I was able to completely convert to use shared (and converted all uses of it to use shared). Maybe it'd help you figure out how to get things to work? It's a relatively brain dead message queue. It can't hold more than one message at a time (adding a second message will block until the first message has been received by all threads). It's intended for one master thread to send a message to all recipients in a thread group.
module hb.io.ipc;
import std.cstream;
import core.thread;
import tango.core.Atomic;
template broadcastMessageQueue(target, double sleepSec=0.1){
private enum int sleepTicks = cast(int) (sleepSec*100_000_000);
/// Broadcasts a delegate to a bunch of identical recipients
class sender{
alias void delegate(target) messageType;
private int id;
private int max;
private int pending;
private messageType msg;
this(int numberOfRecipients){ max = numberOfRecipients; }
/// Only blocks if queue is full
void send(shared messageType message) shared{
waitForQueueToEmpty;
id++;
msg = message;
pending = max;
}
/// Blocks until every recipient got the message
void push(shared messageType message) shared{
send(message);
waitForQueueToEmpty;
}
private bool receive(int messageId, target t) shared{
if (pending == 0 || id < messageId)
return false;
msg(t);
atomicDecrement!(msync.raw)(pending);
return true;
}
private void waitForQueueToEmpty() shared{
while(pending > 0)
Thread.sleep(sleepTicks);
}
}
/// Receives delegates from the specified sender. Never blocks.
class receiver{
private target parent;
private shared sender source;
private int nextMessageId = 1;
this(target t, shared sender s){ parent = t; source = s; }
bool receive(){
// Cast is hack to circumvent bugzilla issue #3089
if (source.receive(nextMessageId, parent)){
nextMessageId++;
return true;
}
return false;
}
}
}
version(test)
unittest{
derr.writefln("Testing broadcast message queue");
class dummy{ int x; }
auto foo = new dummy;
auto bar = new dummy;
// Extra parenthesis as hack to circumvent dmd bugzilla issue #3091
auto sender = new shared(broadcastMessageQueue!(dummy).sender)(2);
auto rx1 = new broadcastMessageQueue!(dummy).receiver(foo, sender);
auto rx2 = new broadcastMessageQueue!(dummy).receiver(bar, sender);
assert(rx1.receive == false);
assert(rx2.receive == false);
sender.send( cast(shared void delegate(dummy)) (dummy d){d.x++;});
assert(rx1.receive);
assert(rx2.receive);
assert(rx1.receive == false);
assert(rx2.receive == false);
assert(foo.x == 1);
assert(bar.x == 1);
}
|