Thread overview
Synchronize Class fields between different threads
Nov 10, 2017
DrCataclysm
Nov 10, 2017
rikki cattermole
Nov 10, 2017
DrCataclysm
Nov 10, 2017
rikki cattermole
Nov 10, 2017
bauss
Nov 10, 2017
DrCataclysm
Nov 10, 2017
bauss
Nov 10, 2017
bauss
Nov 10, 2017
crimaniak
November 10, 2017
I am trying to understand concurrent/parallel programming with D, but I just don't get how
I should use some of the concepts.

This is the code I am using to try things out.

public class TCPListener {
    ubyte[] _messageBuffer;
    Socket _server;
    Socket _client;

    // define server in constructor
    this(string address, ushort port) {
        _server = new TcpSocket();
        _server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
    }

    // starts accept() in a new thread
    void startListening(){...}

    // accepts connection and assigns client
    // after that start receiving in new thread and keep accepting
    void accept() {...}

    // receives data and adds to buffer
    // emits message if lineend in buffer
    void receiving() {...}

    // gets the client and sends the data (preferably in its own task/thread)
    // emits signal when completed
    void send(string data) {...}

}

And I would like to use it in the following manner:

int main()
{
    auto o1 = new SpecialisedTCPListener();

    //do other stuff
}

class SpecialisedTCPListener{
    TCPListener _listener;

    this(){
        _listener= new TCPListener("127.0.0.1", 10000);
        _listener.connect(&MessageReceived);
        _listener.connect(&SendCallback);
        _listener.startListening();
    }

    public void MessageReceived(string message){
        auto answer = doSomeThings(message);
        // send the answer
        _listener.send(answer);
    }

    // did it send correctly?
    public void SendCallback(CallBackData e){...}
}


When I tried this approach it did not work. Accepting and receiving worked normally, but sending was
impossible because _client seemed to be thread-local and came back as null.

How do I pass fields to the different threads?

I tried using spawn to start the threads, but that only works with functions and not with class methods.
What would be better ways to do something like this?



November 10, 2017
Remember this bit: everything on the heap is not thread-local, it is global. This includes everything inside a class.

When you use a synchronized statement, it locks and then unlocks a mutex. A class has a mutex, simple! It only prevents multiple threads from modifying a single thing at the same time, that's all.
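
A minimal sketch of the point above, assuming a made-up `Counter` class and `countInParallel` helper (neither is from the code in the question). Several threads call a method on the same heap-allocated object, and `synchronized (this)` locks the object's own mutex around each update:

```d
import core.thread : Thread;

// Hypothetical example class; the name and fields are not from the thread.
class Counter
{
    private int _value; // lives inside the heap-allocated object

    void increment()
    {
        // Locks the mutex that belongs to this object,
        // runs the body, then unlocks it again.
        synchronized (this)
        {
            ++_value;
        }
    }

    int value()
    {
        synchronized (this)
        {
            return _value;
        }
    }
}

// Starts `threads` threads that each call increment() `perThread` times
// on the same Counter instance and returns the final count.
int countInParallel(int threads, int perThread)
{
    auto counter = new Counter;
    Thread[] workers;

    foreach (i; 0 .. threads)
    {
        // The delegate captures `counter` and `perThread`;
        // every thread works on the same object.
        auto t = new Thread({
            foreach (j; 0 .. perThread)
                counter.increment();
        });
        t.start();
        workers ~= t;
    }

    foreach (t; workers)
        t.join();

    return counter.value;
}

unittest
{
    assert(countInParallel(4, 10_000) == 40_000);
}
```

Without the synchronized blocks, increments from different threads could overwrite each other and the final count could come out lower. The unittest can be run with something like `rdmd -unittest -main file.d`.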
November 10, 2017
On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole wrote:
> Remember this bit: everything on the heap is not thread-local, it is global. This includes everything inside a class.
>
> When you use a synchronized statement, it locks and then unlocks a mutex. A class has a mutex, simple! It only prevents multiple threads from modifying a single thing at the same time, that's all.

This is my implementation of Accept:


private void Accept(){
    try{
        _client = _server.accept();
        emit(ClientConnected(_client.remoteAddress.toAddrString));
        // start accepting in a different thread
        auto _acceptTask = task(&this.Accept);
        _acceptTask.executeInNewThread();
        Receive();
    }
    catch (SocketAcceptException e){
        writeln("Error while accepting connection: " ~ e.msg);
    }
}

Is _client on the heap or the stack? If it is on the stack, how would I get it onto the heap?
November 10, 2017
On 10/11/2017 2:13 PM, DrCataclysm wrote:
> Is _client on the heap or the stack? If it is on the stack, how would I get it onto the heap?

Assuming _client is in a class, heap.
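
A small sketch to illustrate the difference, assuming a hypothetical `Holder` class and `runWorker` function that stand in for the listener from the question. A delegate to a method carries its `this`, so the worker thread writes to the very same object, while a module-level variable gets a separate copy per thread:

```d
import core.thread : Thread;

int moduleLevel = 0; // module-level variables are thread-local by default

// Hypothetical class, standing in for the listener in the question.
class Holder
{
    int field;           // stored in the heap-allocated object
    int seenModuleLevel; // what the worker saw in its own copy of moduleLevel

    void work()
    {
        // Runs in the worker thread, but `this` is the same object.
        seenModuleLevel = moduleLevel; // the worker's own copy, still 0
        field = 42;
    }
}

Holder runWorker()
{
    moduleLevel = 7; // only changes the calling thread's copy
    auto h = new Holder;
    auto t = new Thread(&h.work); // &h.work is a delegate that carries `h`
    t.start();
    t.join();
    return h;
}

unittest
{
    auto h = runWorker();
    assert(h.field == 42);          // written by the worker, visible here
    assert(h.seenModuleLevel == 0); // the worker had its own moduleLevel
}
```

This is also one way around spawn only taking functions: `core.thread.Thread` accepts a delegate, and so does `task(&this.Accept)` from the code above.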
November 10, 2017
On Friday, 10 November 2017 at 14:13:26 UTC, DrCataclysm wrote:
> Is _client on the heap or the stack? If it is on the stack, how would I get it onto the heap?

_client is allocated on the heap.

Socket accept(); returns the socket created by Socket accepting().

Which looks like this:

protected Socket accepting() pure nothrow
{
    return new Socket;
}
November 10, 2017
On Friday, 10 November 2017 at 14:27:41 UTC, bauss wrote:
> _client is allocated on the heap.
>
> Socket accept(); returns the socket created by Socket accepting().
>
> Which looks like this:
>
> protected Socket accepting() pure nothrow
> {
>     return new Socket;
> }

Thank you, I thought I was going mad.

It is working now. The problem was that the debugger in Eclipse DDT seems to be completely broken. If I run it directly from bash it works.

One last question: is there a function in the standard library to wait for a task to finish within a time limit?

November 10, 2017
On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:
> One last question: is there a function in the standard library to wait for a task to finish within a time limit?

Not an ideal solution, but it should work (I'm not aware of any built-in solution using Phobos' tasks):

```
auto timeLimit = 1000; // Wait for the task for up to roughly 1000 milliseconds

while (!task.done && timeLimit)
{
    import core.thread : Thread;
    import core.time : dur;

    Thread.sleep( dur!("msecs")(1) ); // Keeps the CPU from going nuts
    timeLimit--;
}

if (task.done)
{
    auto value = task.yieldForce();
}
```

Could make it a function though:

```
bool yieldTimeLimit(Task)(Task task)
{
    while (!task.done && timeLimit)
    {
        import core.thread : Thread;
        import core.time : dur;

        Thread.sleep( dur!("msecs")(1) );
        timeLimit--;
    }

    return task.done;
}

...

if (yieldTimeLimit(task))
{
    auto value = task.yieldForce();
}
```
November 10, 2017
On Friday, 10 November 2017 at 15:01:30 UTC, bauss wrote:
> [...]

Pardon my brain fart.

The last bit should be:

```
bool yieldTimeLimit(Task)(Task task, size_t timeLimit)
{
    while (!task.done && timeLimit)
    {
        import core.thread : Thread;
        import core.time : dur;

        Thread.sleep( dur!("msecs")(1) );
        timeLimit--;
    }

    return task.done;
}

...

if (task.yieldTimeLimit(1000)) // Waits up to roughly 1000 milliseconds
{
    auto value = task.yieldForce();
}
```
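
Since `Thread.sleep` may sleep longer than requested, counting 1 ms sleeps can overshoot the limit. A variant of the same polling idea that checks a deadline instead could look roughly like this. It is only a sketch: `waitDone` and `slowAnswer` are made-up names, not part of Phobos.

```d
import core.thread : Thread;
import core.time : Duration, MonoTime, msecs, seconds;
import std.parallelism : task;

// Hypothetical helper, not part of Phobos: polls `t.done` until the task
// finishes or `limit` has elapsed. Returns true if the task finished in time.
bool waitDone(T)(T t, Duration limit)
{
    auto deadline = MonoTime.currTime + limit;

    while (!t.done)
    {
        if (MonoTime.currTime >= deadline)
            return false;
        Thread.sleep(1.msecs); // avoid spinning at full speed
    }
    return true;
}

// Hypothetical stand-in for some slow work.
int slowAnswer()
{
    Thread.sleep(20.msecs);
    return 42;
}

unittest
{
    auto t = task!slowAnswer();
    t.executeInNewThread();

    if (t.waitDone(5.seconds))
    {
        // The task is done, so yieldForce returns without blocking.
        assert(t.yieldForce == 42);
    }
}
```

As with the version above, a task that misses the limit is not cancelled; it keeps running in its thread.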
November 10, 2017
On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:

> It is working now. The problem was that the debugger in Eclipse DDT seems to be completely broken. If I run it directly from bash it works.

Be careful with such statements. Typically, this situation means that there are Heisenbugs in the code that appear in certain conditions.