March 04, 2012
Shutting down thread with Socket blocking for connection
Coming from a C++/Java world, I find D's approach to concurrency slightly
difficult to grasp. Perhaps someone could help me out a bit with this problem:

I'd like to have a method that spawns a new thread which sets up a 
socket and listens for connections (in a blocking fashion). This part is 
easy, the hard part is nicely shutting down the thread listening for 
connections.
Here's some quick code (ignore the fact that even if "sock" was shared, 
you would not be guaranteed that it was initialized correctly by the 
time you call "shutdown()" and "close()"):
*************
import std.concurrency;
import std.socket;
/*shared*/ Socket sock;
void main() {
        spawn(&listen);

        /* do stuff */

        sock.shutdown(SocketShutdown.BOTH);
        sock.close();
}
void listen() {
        sock = new TcpSocket();
        sock.blocking = true;
        sock.bind(new InternetAddress(8080));
        sock.listen(10);
        sock.accept(); // assume no connection was made, thus still blocking
}
*************

If I make "sock" shared, then I'm not allowed to call any methods on 
"sock" in "listen()" ("is not callable using argument types (...) shared").
I came across a post about a similar issue in this mail group from 
Andrew Wiley (2011-02-15 23:59) which had some example code, but I could 
not get that code to compile. Neither could I find an example in TDPL 
that shows how to deal with threads that are blocked.

Basically, how would I go about cleanly stopping a thread that's
blocked waiting for connections?
March 05, 2012
Re: Shutting down thread with Socket blocking for connection
On 05.03.2012 1:46, Vidar Wahlberg wrote:
> Coming from a C++/Java world I find D's approach to concurrency slightly
> difficult to grasp, perhaps someone could help me out a bit on this
> problem:

>
> I'd like to have a method that spawns a new thread which sets up a
> socket and listens for connections (in a blocking fashion). This part is
> easy, the hard part is nicely shutting down the thread listening for
> connections.

> Here's some quick code (ignore the fact that even if "sock" was shared,
> you would not be guaranteed that it was initialized correctly by the
> time you call "shutdown()" and "close()"):



> *************
> import std.concurrency;
> import std.socket;
> /*shared*/ Socket sock;
//yeah so this socket is thread-local
> void main() {
> spawn(&listen);
>
> /* do stuff */
>
> sock.shutdown(SocketShutdown.BOTH);
> sock.close();
> }
> void listen() {
> sock = new TcpSocket();
> sock.blocking = true;
> sock.bind(new InternetAddress(8080));
> sock.listen(10);
> sock.accept(); // assume no connection was made, thus still blocking
> }
> *************
>
> If I make "sock" shared, then I'm not allowed to call any methods on
> "sock" in "listen()" ("is not callable using argument types (...) shared").

Everything is thread-local by default, so "sock" here is a separate
variable in each thread. shared adds extra protection on top of that:
for instance, you *can* do atomic ops on shared data (core.atomic), but
the sockets API wasn't designed with shared in mind (obviously). In any
case, old-school sharing like this is a recipe for race conditions and
bad scalability.
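
To make the thread-local versus shared distinction concrete, here is a minimal sketch. The names perThread, counter, worker and demo are made up for illustration; atomicOp and atomicLoad come from core.atomic.

```d
import core.atomic : atomicLoad, atomicOp;
import core.thread : Thread;

int perThread;        // module-level variables are thread-local by default
shared int counter;   // a single instance visible to every thread

void worker()
{
    perThread = 1;               // writes only this thread's own copy
    atomicOp!"+="(counter, 1);   // atomic read-modify-write on shared data
}

// Runs worker in a second thread and returns what the calling thread sees:
// [its own perThread, how much counter grew].
int[2] demo()
{
    immutable before = atomicLoad(counter);
    auto t = new Thread(&worker);
    t.start();
    t.join();
    return [perThread, atomicLoad(counter) - before];
}

void main()
{
    auto seen = demo();
    assert(seen[0] == 0);   // the worker's write never reached our copy
    assert(seen[1] == 1);   // the shared counter did change
}
```

The write in worker lands in the worker's own copy of perThread, which is why a plain global Socket assigned in one thread is still null in the other.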

> I came across a post about a similar issue in this mail group from
> Andrew Wiley (2011-02-15 23:59) which had some example code, but I could
> not get that code to compile. Neither could I find an example in TDPL
> that shows how to deal with threads that are blocked.
>
> Basically, how would I go on about nicely stopping a thread that's
> waiting for connections?

Even in C I would put all responsibility on the listen thread.
In D, you have message passing in std(!), so do a select-polling loop
and check for messages from the main thread:

(*not tested*)
If select does not accept null as an empty set, just pass a new empty
SocketSet instead.

import core.time : dur;
import std.concurrency;
import std.socket;

void main() {
 Tid listenT = spawn(&listen);

 /* do stuff */

 send(listenT, 42); // any int means "stop"; usually messages have extra info ;)
}

void listen() {
 bool working = true;
 Socket sock = new TcpSocket();
 sock.blocking = true;
 sock.bind(new InternetAddress(8080));
 scope(exit) {
   sock.shutdown(SocketShutdown.BOTH);
   sock.close();
 }
 sock.listen(10);
 SocketSet set = new SocketSet();
 while(working){
  // select modifies the set, so refill it before every call
  set.reset();
  set.add(sock);
  // Socket.select is static and takes a Duration; 10 usec wait, may do plain 0
  if(Socket.select(set, null, null, dur!"usecs"(10)) > 0){
    sock.accept(); // no blocking here; a real server would use the returned Socket
  }
  // check for a stop message from the main thread
  receiveTimeout(dur!"usecs"(1), (int code){ working = false; });
 }
}

What the heck, in this convenient manner you could even send it a
message to bind to another port, etc.
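
A rough sketch of that idea, with one message type per command. Rebind, Stop, listenOn, listener and demo are hypothetical names, not part of Phobos, and the select polling from the loop above is left out to keep the control flow visible. Port 0 asks the OS for any free port; a real server would pass 8080 or similar.

```d
import core.time : msecs;
import std.concurrency : ownerTid, receiveOnly, receiveTimeout, send, spawn;
import std.socket : InternetAddress, Socket, TcpSocket;

struct Rebind { ushort port; }  // hypothetical message: listen on another port
struct Stop {}                  // hypothetical message: shut down

// Creates a TCP socket that listens on the given port.
Socket listenOn(ushort port)
{
    auto s = new TcpSocket();
    s.bind(new InternetAddress(port));
    s.listen(10);
    return s;
}

void listener(ushort port)
{
    Socket sock = listenOn(port);
    scope (exit) sock.close();   // closes whichever socket is current at exit
    bool working = true;
    while (working)
    {
        // ... poll `sock` with Socket.select here, as in the loop above ...
        receiveTimeout(10.msecs,
            (Rebind r) {
                sock.close();
                sock = listenOn(r.port);
                send(ownerTid, true);   // acknowledge the rebind
            },
            (Stop s) { working = false; });
    }
}

// Spawns the listener, asks it to rebind once, then stops it.
bool demo()
{
    ushort anyPort = 0;                  // 0 lets the OS pick a free port
    auto tid = spawn(&listener, anyPort);
    send(tid, Rebind(0));
    immutable rebound = receiveOnly!bool();
    send(tid, Stop());
    return rebound;
}

void main()
{
    assert(demo());
}
```

Because the socket never leaves the listener thread, nothing has to be marked shared; only the small value-type messages cross threads.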

-- 
Dmitry Olshansky
March 05, 2012
Re: Shutting down thread with Socket blocking for connection
A more efficient approach is to use async socket routines and an event  
object.

So, in main you create a shared event object, then start the listen thread.
In listen you call an async select or accept, and then wait on that /and/  
the shared event object.

To stop listen you set the shared event, which wakes it, and it notices  
the event is set and performs the cleanup/stops itself.

I'm not sure if phobos has support for this sort of thing yet, but you  
could always leverage the underlying C library.  I may be able to offer  
some pointers on Windows, but I haven't done something like this on any  
unix platform for a while..
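
One portable approximation of this, as a sketch only: use a connected socket pair from std.socket as the "event" and block in select on both the listening socket and the wake-up end. This swaps a true event object for a socket, which is an assumption on my part rather than the async API described above. waitingListener and demo are hypothetical names, and the cast to shared and back is there only to get the Socket past spawn.

```d
import std.concurrency : ownerTid, receiveOnly, send, spawn;
import std.socket : InternetAddress, Socket, SocketSet, TcpSocket, socketPair;

void waitingListener(shared Socket wakeShared)
{
    // The cast asserts that only this thread uses the socket from here on.
    auto wake = cast(Socket) wakeShared;
    auto sock = new TcpSocket();
    scope (exit) { sock.close(); wake.close(); }
    sock.bind(new InternetAddress(0));   // port 0: any free port, for the demo
    sock.listen(10);

    auto set = new SocketSet();
    for (;;)
    {
        set.reset();
        set.add(sock);
        set.add(wake);
        // No timeout: blocks until a client connects or main wakes us.
        if (Socket.select(set, null, null) <= 0)
            continue;                    // interrupted, try again
        if (set.isSet(wake))
            break;                       // the "event" was set: shut down
        if (set.isSet(sock))
            sock.accept().close();       // a real server would handle the client
    }
    send(ownerTid, true);                // report a clean shutdown
}

// Starts the listener, wakes it, and returns its shutdown report.
bool demo()
{
    Socket[2] pair = socketPair();
    scope (exit) pair[0].close();
    spawn(&waitingListener, cast(shared) pair[1]);

    /* do stuff */

    pair[0].send([ubyte(1)]);            // "set the event"
    return receiveOnly!bool();
}

void main()
{
    assert(demo());
}
```

Compared with the polling loop, the listener thread sleeps in select until there is actually something to do.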

Regan
March 05, 2012
Re: Shutting down thread with Socket blocking for connection
On 05.03.2012 16:46, Regan Heath wrote:
> A more efficient approach is to use async socket routines and an event
> object.
>
> So, in main you create a shared event object, then start the listen thread.
> In listen you call an async select or accept, and then wait on that
> /and/ the shared event object.
>
> To stop listen you set the shared event, which wakes it, and it notices
> the event is set and performs the cleanup/stops itself.
>
> I'm not sure if phobos has support for this sort of thing yet, but you
> could always leverage the underlying C library. I may be able to offer
> some pointers on Windows, but I haven't done something like this on any
> unix platform for a while..
>

Yeah, the main problem with event systems & async I/O is that they are
not cross-platform at all. Looking at Linux, they are not even
consistent within a single OS. Wrapping them sanely on all platforms is
no trivial task. I wish we had it in Phobos though.

-- 
Dmitry Olshansky
March 06, 2012
Re: Shutting down thread with Socket blocking for connection
On 2012-03-05 09:38, Dmitry Olshansky wrote:
> ...
> while(working){
>
> if(select(set, null, null, 10) > 0){ //10 usec wait on a socket, may do plain 0
> sock.accept(); // no blocking here
> }
> set.reset();
> set.add(sock);
> receiveTimeout(dur!"us"(1), (int code){ working = false; });
> }
>...

Thanks for the answers.

However, I still have a problem:
What I'd really like to do is have one thread listening for incoming
connections and spawn a new thread for each connection (it's a simple
HTTP server; I'm interested in learning about sockets & threads, I'm not
looking for an HTTP server). "Socket.accept()" returns a Socket whenever
a connection is made. My intention was to pass this to a new thread, but
as I pointed out in the initial post I can't find a way to do so, and
I'm unable to find a solution to this problem in your replies.
Does this mean that it's recommended to keep the listening socket and
all the sockets it spawns in a single thread? I don't know a whole lot
about how sockets work, but can't that easily cause poor performance when
you're reading from or writing to multiple sockets?

I've not been able to find any D2 code that uses multiple threads in
conjunction with sockets, but if anyone knows of some examples I'd be
happy to read them.
March 15, 2012
Re: Shutting down thread with Socket blocking for connection
You could have a look at my attempt:

https://github.com/DannyArends/D-coding/tree/master/src/web
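
For the thread-per-connection hand-off asked about earlier in the thread, here is a minimal sketch; it is not taken from the repository linked above. It assumes that casting the accepted Socket to shared for spawn, and back inside the new thread, is acceptable because only that thread touches the socket afterwards. handle and demo are hypothetical names, the canned HTTP reply is a placeholder, and demo connects to itself on a free loopback port just to exercise the code.

```d
import std.concurrency : spawn;
import std.socket : InternetAddress, Socket, TcpSocket;

// Runs in its own thread and owns the connection from here on.
void handle(shared Socket connShared)
{
    auto conn = cast(Socket) connShared;   // safe only because we are the sole user
    scope (exit) conn.close();
    ubyte[1024] request;
    if (conn.receive(request[]) > 0)
        conn.send("HTTP/1.0 200 OK\r\nContent-Length: 2\r\n\r\nok");
}

// Accepts one connection, hands it to a new thread, returns the client's view.
string demo()
{
    auto server = new TcpSocket();
    scope (exit) server.close();
    server.bind(new InternetAddress("127.0.0.1", 0));   // port 0: any free port
    server.listen(10);

    auto client = new TcpSocket(server.localAddress);   // connect to ourselves
    scope (exit) client.close();
    client.send("GET / HTTP/1.0\r\n\r\n");

    Socket conn = server.accept();
    spawn(&handle, cast(shared) conn);     // the listener thread is free again

    // Read until the handler closes the connection.
    string reply;
    char[256] buf;
    for (;;)
    {
        auto n = client.receive(buf[]);
        if (n <= 0)
            break;
        reply ~= buf[0 .. n];
    }
    return reply;
}

void main()
{
    auto reply = demo();
    assert(reply.length >= 12 && reply[0 .. 12] == "HTTP/1.0 200");
}
```

In a real server the accept call would sit inside the listener's select loop, with one spawn per accepted connection.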