Thread overview
Help with Concurrency
Nov 03, 2015
bertg
Nov 04, 2015
Nicholas Wilson
Nov 04, 2015
bertg
Nov 04, 2015
Marc Schütz
Nov 04, 2015
JR
Nov 04, 2015
JR
Nov 05, 2015
Dmitri
November 03, 2015
I am having trouble with a simple use of concurrency.

Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...".

Am I misusing or misunderstanding the use of mailboxes?

===

class Connection {
    Reactor reactor;
    WebSocket webSocket;

    this(Reactor r, WebSocket ws)
    {
        reactor = r;
        webSocket = ws;

        messageLoop();
    }

    void messageLoop()
    {
        std.concurrency.Tid tid = std.concurrency.thisTid();
        writeln("tid 1 ~ " ~ to!string(std.concurrency.thisTid()));
        writeln("tid 2 ~ " ~ to!string(std.concurrency.thisTid()));
        writeln("tid 3 ~ " ~ to!string(std.concurrency.thisTid()));

        // deal with websocket messages
        spawn(&handleConnectionWebSocket, tid, cast(shared) webSocket);
        // deal with pub/sub
        //spawn();

        while (true) {
            writeln("receiving...");
            std.concurrency.receive(
                (string msg) {
                    writeln("conn: received ws message: " ~ msg);
                }
            );
            writeln("received!");
        }
    }
}
void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws)
{
    auto sock = cast(WebSocket) ws;
    while (sock.connected) {
        writeln("sock in");
        auto msgIn = sock.receiveText();
        std.concurrency.send(caller, msgIn);
    }
}
November 04, 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
> I am having trouble with a simple use of concurrency.
>
> Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...".
>
> Am I misusing or misunderstanding the use of mailboxes?
>
> ===
>
> class Connection {
>     Reactor reactor;
>     WebSocket webSocket;
>
>     this(Reactor r, WebSocket ws)
>     {
>         reactor = r;
>         webSocket = ws;
>
>         messageLoop();
>     }
>
>     void messageLoop()
>     {
>         std.concurrency.Tid tid = std.concurrency.thisTid();
>         writeln("tid 1 ~ " ~ to!string(std.concurrency.thisTid()));
>         writeln("tid 2 ~ " ~ to!string(std.concurrency.thisTid()));
>         writeln("tid 3 ~ " ~ to!string(std.concurrency.thisTid()));
>
>         // deal with websocket messages
>         spawn(&handleConnectionWebSocket, tid, cast(shared) webSocket);
>         // deal with pub/sub
>         //spawn();
>
Try replacing the following loop to have a receive that times out or while(true) to while(web socked.connected)
>         while (true) {
>             writeln("receiving...");
>             std.concurrency.receive(
>                 (string msg) {
>                     writeln("conn: received ws message: " ~ msg);
>                 }
>             );
>             writeln("received!");
>         }
>     }
> }
> void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws)
> {
>     auto sock = cast(WebSocket) ws;
>     while (sock.connected) {
>         writeln("sock in");
>         auto msgIn = sock.receiveText();
>         std.concurrency.send(caller, msgIn);
>     }
> }

November 04, 2015
On Wednesday, 4 November 2015 at 01:27:57 UTC, Nicholas Wilson wrote:
> On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
>>[...]
> Try replacing the following loop to have a receive that times out or while(true) to while(web socked.connected)
>> [...]

That didn't solve the problem. How would that solve the problem?

std.concurrency.receive does not have a timeout either.
November 04, 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
>         while (true) {
>             writeln("receiving...");
>             std.concurrency.receive(
>                 (string msg) {
>                     writeln("conn: received ws message: " ~ msg);
>                 }
>             );
>             writeln("received!");
>         }
>     }
> }
> void handleConnectionWebSocket(std.concurrency.Tid caller, shared WebSocket ws)
> {
>     auto sock = cast(WebSocket) ws;
>     while (sock.connected) {
>         writeln("sock in");
>         auto msgIn = sock.receiveText();
>         std.concurrency.send(caller, msgIn);
>     }
> }

What is the type of `msgIn`? Try inserting `pragma(msg, typeof(msgIn))` after the line where it's declared and look at the compiler's output. My suspicion is that it's something like `char[]` or `const(char)[]`, which doesn't match the `string` (aka `immutable(char)[]`) you're trying to receive.
November 04, 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
> Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...".

[...]

>         while (true) {
>             writeln("receiving...");
>             std.concurrency.receive(
>                 (string msg) {
>                     writeln("conn: received ws message: " ~ msg);
>                 }
>             );
>             writeln("received!");
>         }

Unless I'm reading it wrong, you want std.concurrency.receiveTimeout.

    import core.time;
    import std.concurrency;

    bool received = receiveTimeout(1.seconds, // negative makes it not wait at all
        (string msg) {
            writeln("conn: received ws message: " ~ msg);
        });

        if (received) {
            writeln("received!");
        }
        else {
            writeln("timed out...");
            // stuff?
        }
    }

> Tries to receive but will give up if no matches arrive within duration. Won't wait at all if provided core.time.Duration is negative.
November 04, 2015
On Wednesday, 4 November 2015 at 16:49:59 UTC, JR wrote:
>[...]

And my indentation and brace-balancing there is wrong. Shows how dependent I've become on syntax highlighting.

    import core.time;
    import std.concurrency;

    bool received = receiveTimeout(1.seconds,
            writeln("conn: received ws message: " ~ msg);
        }
    );

    if (received) {
        writeln("received!");
    }
    else {
        writeln("timed out...");
        // stuff?
    }

November 05, 2015
On Tuesday, 3 November 2015 at 23:16:59 UTC, bertg wrote:
> I am having trouble with a simple use of concurrency.
>
> Running the following code I get 3 different tid's, multiple "sock in" messages printed, but no receives. I am supposed to get a "received!" for each "sock in", but I am getting hung up on "receiving...".
>
> [...]

I had a similarily odd experience with std.concurrency - my receive would not work unless I also received on Variant, although the Variant receiver was a no-op:

 receive(
      (Event event)
      {
          // handle event
      },
      (Variant v) {}
 );