Thread overview
Creating a future/promise object
Jul 06, 2015
Frank Pagliughi
Jul 06, 2015
Justin Whear
Jul 07, 2015
Frank Pagliughi
Jul 07, 2015
thedeemon
Jul 07, 2015
Sebastiaan Koppe
Jul 07, 2015
Frank Pagliughi
Jul 07, 2015
Frank Pagliughi
Jul 08, 2015
Sebastiaan Koppe
Jul 14, 2015
Frank Pagliughi
July 06, 2015
Hello All,

I'm trying to figure out how to create a shared object that can be used to track asynchronous operations. Something that can be used like:

  Token tok = mything.start_something();

  // do something else for a while

  int n = tok.get_result();  // Block until result is ready

The 'mything.start_something()' call sends a message to a spawn'ed thread to begin the operation, which also creates and sends back a 'Token' which it will signal when the operation is complete.

I don't see anything in the library to do this, so I tried to create it with a Mutex and Condition Variable, like:

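  import core.sync.condition : Condition;
  import core.sync.mutex : Mutex;

  class Token {
    private Mutex mut;
    private Condition cond;
    private bool completed;
    private int retCode;

    this() {
      mut = new Mutex;
      cond = new Condition(mut);
      completed = false;
    }

    // Called by the worker thread to publish the result.
    void set_result(int retCode) {
      synchronized (mut) {
        this.retCode = retCode;
        completed = true;
        cond.notify();
      }
    }

    // Called by the client; blocks until the result is available.
    int get_result() {
      synchronized (mut) {
        while (!completed)
          cond.wait();
        return retCode;
      }
    }
  }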


But I am getting totally tripped up by the 'shared' qualifier. All objects of type Token are meant to be shared, and yet when I try to define them as such, I get complaints that I can't implicitly convert Mutex and Condition objects to shared. When I do so manually, then I get errors that the Mutex and Condition member functions can't be called on shared objects.

That seems odd. Aren't Mutex objects and Condition Variables explicitly created to be shared across threads?

I assume that I am mixing older, low-level libraries with newer, higher-level libraries, but I can't figure out what the library requires and what the language/compiler is enforcing.

Any help would be appreciated.
July 06, 2015
On Mon, 06 Jul 2015 20:56:03 +0000, Frank Pagliughi wrote:

> Hello All,
> 
> I'm trying to figure out how to create a shared object that can be used to track asynchronous operations. Something that can be used like:
> 
>    Token tok = mything.start_something();
> 
>    // do something else for a while
> 
>    int n = tok.get_result();  // Block until result is ready
> 

std.parallelism.Task implements the high-level pattern you're looking for: http://dlang.org/phobos/std_parallelism.html#.Task
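
For reference, a minimal sketch of that pattern with std.parallelism. The function doSomething and the wrapper runExample are hypothetical stand-ins for the real operation; task, executeInNewThread, and yieldForce are the std.parallelism calls.

```d
import std.parallelism : task;

// Hypothetical stand-in for the long-running operation.
int doSomething(int x)
{
    return x * 2;
}

int runExample()
{
    // task!fn(args) allocates a Task; executeInNewThread starts it
    // on a thread of its own instead of a TaskPool worker.
    auto t = task!doSomething(21);
    t.executeInNewThread();

    // ... do something else for a while ...

    // yieldForce blocks until the task has finished, then returns its result.
    return t.yieldForce;
}

void main()
{
    assert(runExample() == 42);
}
```

Note that the Task owns its thread here, so this sketch does not cover the case where the work must run on an already existing thread.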
July 07, 2015
On Monday, 6 July 2015 at 20:56:04 UTC, Frank Pagliughi wrote:
>     void set_result(int retCode) {
>       synchronized (mut) {
>         this.retCode = retCode;
>         completed = true;
>         cond.notify();
>       }
>     }
>
>     int get_result() {
>       synchronized (mut) {
>         while (!completed)
>           cond.wait();
>         return retCode;
>       }
>     }
>   }
> Any help would be appreciated.

A bit off-topic:
1) You don't need a mutex here; you can just use the token object itself in "synchronized".
2) I see a deadlock here: you're waiting for the job to complete, but it cannot notify() completion because it cannot enter set_result() while the waiting thread holds the mutex.
July 07, 2015
Thanks for the replies!

The parallelism Task is *doing* what I would like to do, but the package does not seem to expose the underlying components. The 'Task' essentially has a thread, future, and promise. I already have a long-running thread hidden inside of 'mything' which is serializing access to the communications channel/socket. I send it tasks with send() / receive() operations. So I just need the future/promise part.

Or, I would need a way to execute the Task in a specific thread, like:
  executeInThread(Tid t);

I will try to dig through the parallelism code to see how it is implemented.

As for my attempted implementation, that's a typical pattern with condition variables. The mutex is required because the condition variable needs a specific lock. And there's no deadlock because the condition variable releases the lock when you call cond.wait().
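
To illustrate the point about cond.wait(), here is a small sketch using core.sync directly. The function name waitForWorker, the value 7, and the 50 ms sleep are only illustrative assumptions; the sleep makes it likely that the waiter blocks before the worker tries to take the lock.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import core.time : msecs;

int waitForWorker()
{
    auto mut = new Mutex;
    auto cond = new Condition(mut);
    bool completed = false;
    int result;

    auto worker = new Thread({
        Thread.sleep(50.msecs);   // let the waiter reach cond.wait() first
        synchronized (mut)        // can be entered: wait() has released mut
        {
            result = 7;
            completed = true;
            cond.notify();
        }
    });
    worker.start();

    synchronized (mut)
    {
        // wait() unlocks mut while blocked and relocks it before returning,
        // so the worker is never locked out.
        while (!completed)
            cond.wait();
    }
    worker.join();
    return result;
}

void main()
{
    assert(waitForWorker() == 7);
}
```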
July 07, 2015
On Monday, 6 July 2015 at 20:56:04 UTC, Frank Pagliughi wrote:
>     void set_result(int retCode) {
>       synchronized (mut) {
>         this.retCode = retCode;
>         completed = true;
>         cond.notify();
>       }
>     }
>
>     int get_result() {
>       synchronized (mut) {
>         while (!completed)
>           cond.wait();
>         return retCode;
>       }
>     }

Instead of pulling values out, have you considered pushing them? E.g. by supplying a delegate that gets called when the asynchronous action completes.

July 07, 2015
> Instead of pulling values out, have you considered pushing them? E.g. by supplying a delegate that gets called when the asynchronous action completes.

I've always been on the fence about push vs pull when writing libraries of this type. The callback/delegate method is probably more powerful and may reduce latency and context switching, but since the callback happens in the context of the library thread, it exposes the client application to the messiness and problems of managing threads, race conditions, locking and all that. And it can expose the library to performance issues by handing over its thread to user code which may not return for a long, long time. Plus most of the time I've found that client apps just set a flag and signal a condition variable anyway.

Using a future hides the messiness of threads and race conditions from the user, and it seems that this "Task" paradigm is gaining popularity in a lot of languages.

But a lot of libraries of this sort seem to defer the choice to the user and provide both APIs. Mine will have both.
July 07, 2015
It seems that just casting the Token (future) to "shared" to get it across the thread boundary works. From inside my thread:

  receive(
    (shared(Token) tok) {
      int res = do_something();
      (cast(Token)tok).set_result(res);
      ...

And then in the method which is exposed to the user:

  Token start_something() {
    auto tok = new Token();
    send(tid, cast(shared) tok);
    return tok;
  }

As I showed, the "Token" class is carefully crafted with locks and all to have an instance be shared across threads. But will this casting back and forth to "shared" do the right thing? Each thread now has a non-shared reference to the object.
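
Put together as one program, the round trip might look like the sketch below. The worker function, runExample, and the value 42 are hypothetical; the Token class is the one from the first post. The sketch assumes that every access to Token's fields goes through its own mutex, which is what the casts rely on: they only bypass the type check and add no synchronization themselves.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import std.concurrency : receive, send, spawn, Tid;

class Token
{
    private Mutex mut;
    private Condition cond;
    private bool completed;
    private int retCode;

    this()
    {
        mut = new Mutex;
        cond = new Condition(mut);
    }

    void set_result(int retCode)
    {
        synchronized (mut)
        {
            this.retCode = retCode;
            completed = true;
            cond.notify();
        }
    }

    int get_result()
    {
        synchronized (mut)
        {
            while (!completed)
                cond.wait();
            return retCode;
        }
    }
}

// Hypothetical worker: receives one token and completes it.
void worker()
{
    receive((shared(Token) tok) {
        // Strip shared again; Token guards its own state with mut.
        (cast(Token) tok).set_result(42);
    });
}

int runExample()
{
    Tid tid = spawn(&worker);
    auto tok = new Token;
    send(tid, cast(shared) tok);   // shared is required to cross the thread boundary
    return tok.get_result();       // blocks until the worker calls set_result
}

void main()
{
    assert(runExample() == 42);
}
```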
July 08, 2015
On Tuesday, 7 July 2015 at 16:41:41 UTC, Frank Pagliughi wrote:
>> Instead of pulling values out, have you considered pushing them? E.g. by supplying a delegate that gets called when the asynchronous action completes.
>
> I've always been on the fence about push vs pull when writing libraries of this type. The callback/delegate method is probably more powerful and may reduce latency and context switching, but since the callback happens in the context of the library thread, it exposes the client application to the messiness and problems of managing threads, race conditions, locking and all that. And it can expose the library to performance issues by handing over its thread to user code which may not return for a long, long time. Plus most of the time I've found that client apps just set a flag and signal a condition variable anyway.

The delegate could send a message to user code in another thread/fiber. In any case it removes a blocking get_result() and possibly the need for get_result_timeout().
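
A rough sketch of what I mean. The names startSomething and runExample are made up for illustration, and the library thread is modeled with a plain core.thread.Thread; the delegate only forwards the result as a message, so it returns immediately and the user handles the result on their own thread.

```d
import core.thread : Thread;
import std.concurrency : receiveOnly, send, thisTid, Tid;

// Hypothetical library entry point: runs the operation on its own thread
// and pushes the result to the supplied delegate when done.
Thread startSomething(void delegate(int) onDone)
{
    auto t = new Thread({
        int res = 6 * 7;   // stand-in for the real operation
        onDone(res);       // runs in the context of the library thread
    });
    t.start();
    return t;
}

int runExample()
{
    Tid user = thisTid;

    // The delegate does no real work; it just mails the result to the user thread.
    auto t = startSomething((int res) { send(user, res); });

    // The user thread picks the result up from its mailbox whenever it likes.
    int n = receiveOnly!int();
    t.join();
    return n;
}

void main()
{
    assert(runExample() == 42);
}
```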
July 14, 2015
> The delegate could send a message to user code in another thread/fiber. In any case it removes a blocking get_result() and possibly the need for get_result_timeout().

That sounds interesting, and I will look into it. (Questions to follow, no doubt).

But we've gone down a path toward a specific solution, though my original curiosity is a little more general. Say I want to create a new class to synchronize threads in a way that is not currently supported in the library. In the spirit of modern D concurrency, I would want to declare the class as shared:

  shared class MyAmazingSyncObject { ... }

I've stumbled upon a few examples of this, but they all seem to use atomics internally. Say I want to use the traditional lock-based objects from the core sync library (mutexes, condition variables, etc). Those objects are not declared as "shared", though I would think they should be.

What is the purpose of a mutex or condition variable if not to be shared across threads?

And since they are not shared, they make using them from a higher-level class difficult.

I ask this from the perspective of a library writer. I don't mean to push traditional, lock-based paradigms going forward, but internal to some new, high-order data passing algorithm, a lock or signal might be handy.
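
For concreteness, the kind of thing I have in mind is sketched below: a class whose public methods are all marked shared, with the core.sync objects reached through casts on the inside. The class OneShotEvent and the function runExample are hypothetical, and the casts assume that all field access happens under the mutex. Newer druntime releases may offer shared overloads on these types that make some of the casts unnecessary; I have not relied on that here.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

// Hypothetical one-shot event built on lock-based primitives.
final class OneShotEvent
{
    private Mutex mut;
    private Condition cond;
    private bool signaled;

    this() shared
    {
        auto m = new Mutex;
        mut = cast(shared) m;
        cond = cast(shared) new Condition(m);
    }

    void signal() shared
    {
        // Strip shared internally; every field access below is guarded by mut.
        auto self = cast(OneShotEvent) this;
        synchronized (self.mut)
        {
            self.signaled = true;
            self.cond.notifyAll();
        }
    }

    void wait() shared
    {
        auto self = cast(OneShotEvent) this;
        synchronized (self.mut)
        {
            while (!self.signaled)
                self.cond.wait();
        }
    }
}

bool runExample()
{
    auto ev = new shared OneShotEvent;
    auto t = new Thread({ ev.signal(); });
    t.start();
    ev.wait();    // returns once the other thread has signaled
    t.join();
    return true;
}

void main()
{
    assert(runExample());
}
```

Callers only ever see the shared interface, so no casts are needed at the call site.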