Thread overview
Shutting down thread with Socket blocking for connection
Mar 04, 2012
Vidar Wahlberg
Mar 05, 2012
Dmitry Olshansky
Mar 05, 2012
Regan Heath
Mar 05, 2012
Dmitry Olshansky
Mar 06, 2012
Vidar Wahlberg
Mar 15, 2012
Danny Arends
March 04, 2012
Coming from a C++/Java world, I find D's approach to concurrency slightly difficult to grasp. Perhaps someone could help me out a bit with this problem:

I'd like to have a method that spawns a new thread which sets up a socket and listens for connections (in a blocking fashion). This part is easy, the hard part is nicely shutting down the thread listening for connections.
Here's some quick code (ignore the fact that even if "sock" was shared, you would not be guaranteed that it was initialized correctly by the time you call "shutdown()" and "close()"):
*************
import std.concurrency;
import std.socket;
/*shared*/ Socket sock;
void main() {
        spawn(&listen);

        /* do stuff */

        sock.shutdown(SocketShutdown.BOTH);
        sock.close();
}
void listen() {
        sock = new TcpSocket();
        sock.blocking = true;
        sock.bind(new InternetAddress(8080));
        sock.listen(10);
        sock.accept(); // assume no connection was made, thus still blocking
}
*************

If I make "sock" shared, then I'm not allowed to call any methods on "sock" in "listen()" ("is not callable using argument types (...) shared").
I came across a post about a similar issue in this mail group from Andrew Wiley (2011-02-15 23:59) which had some example code, but I could not get that code to compile. Neither could I find an example in TDPL that shows how to deal with threads that are blocked.

Basically, how would I go about nicely stopping a thread that's waiting for connections?
March 05, 2012
On 05.03.2012 1:46, Vidar Wahlberg wrote:
> Coming from a C++/Java world I find D's approach to concurrency slightly
> difficult to grasp, perhaps someone could help me out a bit on this
> problem:
>
> I'd like to have a method that spawns a new thread which sets up a
> socket and listens for connections (in a blocking fashion). This part is
> easy, the hard part is nicely shutting down the thread listening for
> connections.
> Here's some quick code (ignore the fact that even if "sock" was shared,
> you would not be guaranteed that it was initialized correctly by the
> time you call "shutdown()" and "close()"):
> *************
> import std.concurrency;
> import std.socket;
> /*shared*/ Socket sock;
//yeah so this socket is
> void main() {
> spawn(&listen);
>
> /* do stuff */
>
> sock.shutdown(SocketShutdown.BOTH);
> sock.close();
> }
> void listen() {
> sock = new TcpSocket();
> sock.blocking = true;
> sock.bind(new InternetAddress(8080));
> sock.listen(10);
> sock.accept(); // assume no connection was made, thus still blocking
> }
> *************
>
> If I make "sock" shared, then I'm not allowed to call any methods on
> "sock" in "listen()" ("is not callable using argument types (...) shared").

Everything is thread-local by default, so "shared" comes with extra protection. For instance, you *can* do atomic operations on shared data (core.atomic), but the sockets API wasn't designed with shared in mind (obviously). In any case, old-school sharing like this is a recipe for race conditions and bad scalability.
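
As a rough illustration of the core.atomic point, here is a minimal sketch of a shared stop flag. The names stopRequested and worker are made up for this example, and the sleeps are stand-ins for real work. Note that a flag like this only helps if the worker polls it; it would not wake a thread that is blocked inside accept().

```d
import core.atomic : atomicLoad, atomicStore;
import core.thread : Thread;
import core.time : msecs;
import std.concurrency : spawn;

// Hypothetical flag: module-level and shared, so every thread sees the same variable.
shared bool stopRequested;

void worker()
{
    // Poll the flag between short units of work.
    while (!atomicLoad(stopRequested))
        Thread.sleep(10.msecs);   // stand-in for one round of real work
}

void main()
{
    spawn(&worker);
    Thread.sleep(50.msecs);            // stand-in for "do stuff"
    atomicStore(stopRequested, true);  // the worker notices on its next check and returns
}
```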

> I came across a post about a similar issue in this mail group from
> Andrew Wiley (2011-02-15 23:59) which had some example code, but I could
> not get that code to compile. Neither could I find an example in TDPL
> that shows how to deal with threads that are blocked.
>
> Basically, how would I go on about nicely stopping a thread that's
> waiting for connections?

Even in C I would put all responsibility on the listen thread.
In D, you have message passing in std(!), so do a select-polling loop
and check for messages from the main thread:

(*not tested*)
It may be that select does not accept null as an empty set; in that case, just create a new empty SocketSet.

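import core.time : dur;
import std.concurrency;
import std.socket;

void main() {
 Tid listenT = spawn(&listen);

 /* do stuff */

 send(listenT, 42); // any int means "stop"; usually messages have extra info ;)
}

void listen() {
 bool working = true;
 Socket sock = new TcpSocket();
 sock.blocking = true;
 sock.bind(new InternetAddress(8080));
 scope(exit) {
   sock.shutdown(SocketShutdown.BOTH);
   sock.close();
 }
 sock.listen(10);
 SocketSet set = new SocketSet();
 while(working){
  // select() modifies the set, so rebuild it on every pass
  set.reset();
  set.add(sock);
  // wait up to 10 usecs for a pending connection; a zero Duration just polls
  if(Socket.select(set, null, null, dur!"usecs"(10)) > 0){
   Socket client = sock.accept(); // a connection is pending, so no blocking here
   client.close(); // placeholder: handle the client here instead of dropping it
  }
  // check for a stop message from the main thread, waiting at most 1 usec
  receiveTimeout(dur!"usecs"(1), (int code){ working = false; });
 }
}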

What's more, in the same convenient manner you can send it a message to bind to another port, etc.

-- 
Dmitry Olshansky
March 05, 2012
A more efficient approach is to use async socket routines and an event object.

So, in main you create a shared event object, then start the listen thread.
In listen you call an async select or accept, and then wait on that /and/ the shared event object.

To stop listen you set the shared event, which wakes it, and it notices the event is set and performs the cleanup/stops itself.

I'm not sure if phobos has support for this sort of thing yet, but you could always leverage the underlying C library.  I may be able to offer some pointers on Windows, but I haven't done something like this on any unix platform for a while..
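
To make the idea concrete, here is a rough, untested sketch that stays within Phobos. It is not a true async/event API: it swaps in a socket pair (std.socket's socketPair) as the "event object" and a plain blocking select as the wait. The cast(shared) round trip is an assumption about how to get the Socket past spawn, and it is only reasonable because each end of the pair is used by exactly one thread. The function name and the sleep are placeholders.

```d
import core.thread : Thread;
import core.time : msecs;
import std.concurrency : spawn;
import std.socket;

// Hypothetical listener: sleeps in select() on the listening socket AND a wake-up socket.
void listen(shared Socket wakeShared)
{
    auto wake = cast(Socket) wakeShared;   // assumption: only this thread uses this end
    auto server = new TcpSocket();
    scope (exit) server.close();
    server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
    server.bind(new InternetAddress(8080));
    server.listen(10);

    auto set = new SocketSet();
    for (;;)
    {
        // select() modifies the set, so rebuild it on every pass.
        set.reset();
        set.add(server);
        set.add(wake);
        Socket.select(set, null, null);    // no timeout: blocks until a socket is readable
        if (set.isSet(wake))
            break;                         // the "event" was set: stop and clean up
        if (set.isSet(server))
        {
            Socket client = server.accept();
            client.close();                // placeholder: a real server would handle it
        }
    }
}

void main()
{
    Socket[2] pair = socketPair();         // pair[0] goes to the listener, pair[1] stays here
    spawn(&listen, cast(shared) pair[0]);

    Thread.sleep(100.msecs);               // stand-in for "do stuff"

    pair[1].send("x");                     // one byte makes pair[0] readable and wakes select()
}
```

Compared with a timeout-based polling loop, the listener here uses no CPU while idle, because it only wakes for a connection or for the stop byte.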

Regan
March 05, 2012
On 05.03.2012 16:46, Regan Heath wrote:
> A more efficient approach is to use async socket routines and an event
> object.
>
> So, in main you create a shared event object, then start the listen thread.
> In listen you call an async select or accept, and then wait on that
> /and/ the shared event object.
>
> To stop listen you set the shared event, which wakes it, and it notices
> the event is set and performs the cleanup/stops itself.
>
> I'm not sure if phobos has support for this sort of thing yet, but you
> could always leverage the underlying C library. I may be able to offer
> some pointers on Windows, but I haven't done something like this on any
> unix platform for a while..
>

Yeah, the main problem with event systems & async I/O is that they are not cross-platform at all. Looking at Linux, they are not even consistent within one OS. Wrapping them sanely on all platforms is no trivial task. I wish we had it in Phobos though.

-- 
Dmitry Olshansky
March 06, 2012
On 2012-03-05 09:38, Dmitry Olshansky wrote:
> ...
> while(working){
>
> if(select(set, null, null, 10) > 0){ //10 usec wait on a socket, may do plain 0
> sock.accept(); // no blocking here
> }
> set.reset();
> set.add(sock);
> receiveTimeout(dur!"us"(1), (int code){ working = false; });
> }
>...

Thanks for the answers.

However, I still have a problem:
What I'd really like to do is have one thread listening for incoming connections and spawn a new thread for each connection (it's a simple HTTP server; I'm interested in learning about sockets & threads, I'm not looking for an HTTP server). "Socket.accept()" returns a Socket whenever a connection is made. My intention was to pass this to a new thread, but as I pointed out in the initial post I can't find a way to do so, and I'm unable to find a solution to this problem in your replies.
Does this mean that it's recommended to keep the listening socket and all the sockets it spawns in a single thread? I don't know a whole lot about how sockets work, but can't that easily cause poor performance when you're reading from or writing to multiple sockets?
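
For reference, one commonly used workaround for handing a Socket to another thread is to cast it to shared for the trip through spawn and cast it back on the other side. The following is only a sketch under that assumption: handleClient, the fixed reply, and the three-connection limit are invented for illustration, and the cast is only reasonable because the listening thread never touches the client socket again after handing it over.

```d
import std.concurrency : spawn;
import std.socket;

// Hypothetical per-connection worker: reads once, sends a fixed reply, closes the socket.
void handleClient(shared Socket clientShared)
{
    auto client = cast(Socket) clientShared;  // this thread is now the socket's only user
    scope (exit) client.close();

    ubyte[1024] buf;
    client.receive(buf[]);                    // read (part of) the request; content is ignored
    client.send("HTTP/1.0 200 OK\r\nContent-Length: 2\r\n\r\nok");
}

void main()
{
    auto server = new TcpSocket();
    scope (exit) server.close();
    server.bind(new InternetAddress(8080));
    server.listen(10);

    foreach (i; 0 .. 3)                       // sketch only: serve three connections, then exit
    {
        Socket client = server.accept();      // blocks until a client connects
        spawn(&handleClient, cast(shared) client);
    }
}
```

One thread per connection is easy to reason about; whether it performs better than a single select loop over all sockets depends on the workload.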

I've not been able to find any D2 code that uses multiple threads in conjunction with sockets, but if anyone knows of some examples I'd be happy to read them.
March 15, 2012
You could have a look at my attempt:

https://github.com/DannyArends/D-coding/tree/master/src/web