August 04, 2015
On Tuesday, 4 August 2015 at 11:42:54 UTC, Dicebot wrote:
> On Tuesday, 4 August 2015 at 11:33:11 UTC, 岩倉 澪 wrote:
>> On Tuesday, 4 August 2015 at 10:37:39 UTC, Dicebot wrote:
>>> std.concurrency does by-value message passing (in this case just ptr+length), it never deep copies automatically
>>
>> I assumed that it would deep copy (in the case of mutable data) since the data being sent is thread-local (unless I am misunderstanding something)
>
> It is heap-allocated, and there is currently no thread-local heap in D. Only globals and static variables are thread-local.
>
> std.concurrency never deep copies - if you are trying to send data which contains indirections (pointers/arrays) _and_ is not marked either immutable or shared, it will simply not compile.

Ahh, thanks for the clarification! That makes a lot of sense
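
For anyone reading along, here is a minimal sketch of what "by-value, never deep copied" means in practice. It assumes a worker that simply reports back the address of the first element it received; the names `reportAddress` and `sharesMemoryWithWorker` are made up for this illustration and are not part of std.concurrency.

```d
import std.concurrency : ownerTid, receiveOnly, send, spawn, Tid;

// Hypothetical worker: receives a slice and sends back the address
// of its first element so the owner can compare it.
void reportAddress()
{
    auto data = receiveOnly!(immutable(int)[])();
    ownerTid.send(cast(size_t) data.ptr);
}

/// Returns true if the worker saw the very same memory the owner sent,
/// i.e. only ptr+length travelled in the message.
bool sharesMemoryWithWorker(immutable(int)[] data)
{
    auto worker = spawn(&reportAddress);
    worker.send(data);
    return receiveOnly!(size_t)() == cast(size_t) data.ptr;
}

// A mutable slice contains an indirection and is neither immutable
// nor shared, so std.concurrency rejects it at compile time.
static assert(!__traits(compiles, send(Tid.init, (int[]).init)));
```

If the sketch is right, `sharesMemoryWithWorker` returns true for any non-empty immutable slice, because the message carries the slice itself rather than a copy of the elements.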
August 06, 2015
On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote:
>     // in real app use `receiveTimeout` to do useful stuff until
>     // result message is received
>     auto output = receiveOnly!(immutable(Bar)[]);

New question: how would I receive an immutable value with receiveTimeout? I need the results from my worker thread outside of the delegate that receiveTimeout takes.

Also: what is the best way to kill off the worker thread when I close the application, without having to wait for the worker thread to complete? My first thought was to use receiveTimeout in the worker thread, but the work is being done in a parallel foreach loop, and I am not sure if there is a way to safely use receiveTimeout in a parallel situation...
I also found Thread.isDaemon in core.thread. I tried doing auto thread = Thread.getThis(); thread.isDaemon = true; at the start of the worker thread, but it still seems to wait for it to complete before closing.

Thanks again!

August 07, 2015
On Thursday, 6 August 2015 at 21:17:15 UTC, 岩倉 澪 wrote:
> On Tuesday, 4 August 2015 at 08:35:10 UTC, Dicebot wrote:
>>     // in real app use `receiveTimeout` to do useful stuff until
>>     // result message is received
>>     auto output = receiveOnly!(immutable(Bar)[]);
>
> New question: how would I receive an immutable value with receiveTimeout? I need the results from my worker thread outside of the delegate that receiveTimeout takes.
>
> Also: what is the best way to kill off the worker thread when I close the application, without having to wait for the worker thread to complete? My first thought was to use receiveTimeout in the worker thread, but the work is being done in a parallel foreach loop, and I am not sure if there is a way to safely use receiveTimeout in a parallel situation...
> I also found Thread.isDaemon in core.thread. I tried doing auto thread = Thread.getThis(); thread.isDaemon = true; at the start of the worker thread, but it still seems to wait for it to complete before closing.
>
> Thanks again!

receiveTimeout can be used like this:

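import core.thread : Thread;
import core.time : msecs, seconds;
import std.concurrency : ownerTid, receiveTimeout, send, spawn;
import std.stdio : writeln;

// Example worker: sends a single message after a delay.
void workerFunc()
{
    Thread.sleep(2.seconds);
    ownerTid.send("hello");
}

void main()
{
    spawn(&workerFunc);

    writeln("Waiting for a message");
    bool received = false;
    while (!received) {
        // Returns true if a matching message arrived within the timeout.
        received = receiveTimeout(600.msecs,
                                  (string message) {  // <=== Receiving a value
                                      writeln("received: ", message);
                                  });

        if (!received) {
            writeln("... no message yet");

            /* ... other operations may be executed here ... */
        }
    }
}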
(cf. http://ddili.org/ders/d.en/concurrency.html)

To stop threads immediately, I've found that the best way is to use a shared variable, typically a bool, that is changed only in one place. I hope I'll find the time on Monday to post a simple example.

1. shared bool ABORT;
2.
3. // in owner thread
4. ABORT = true;  // The only place where you do this.
5. bool res;
6. while ((res = receiveOnly!bool()) == false) { debug writeln("waiting for abort ..."); }



// in worker thread(s)

foreach (item; workItems)  // workItems: placeholder for the data being processed
{
  if (ABORT)
    break;
  // working away
}
// ...
ownerTid.send(true);

If you have more than one thread to abort, you'll have to adapt lines 5 and 6 accordingly.

Unfortunately, sending an abort message to a thread as in `send(thread, true)` takes too long. Setting a global flag like ABORT is instantaneous. Beware of data races though. You might want to have a look at:

http://ddili.org/ders/d.en/concurrency_shared.html

Especially `synchronized` and atomicOp.
August 07, 2015
On Friday, 7 August 2015 at 15:55:33 UTC, Chris wrote:

Using a shared boolean is probably not the "best way"; I should have said it is the most efficient and reliable way.


August 07, 2015
On Friday, 7 August 2015 at 15:55:33 UTC, Chris wrote:
> To stop threads immediately, I've found that the best way is to use a shared variable, typically a bool, that is changed only in one place.
> ...
> Unfortunately, sending an abort message to a thread as in `send(thread, true)` takes too long. Setting a global flag like ABORT is instantaneous. Beware of data races though. You might want to have a look at:
>
> http://ddili.org/ders/d.en/concurrency_shared.html
>
> Especially `synchronized` and atomicOp.

Ah, I already had a variable like ABORT in my application for signaling the main thread to close, so this was a surprisingly painless change! I made that variable shared and then did the following:

instead of
    ABORT = true;
I now do
    import core.atomic;
    atomicStore!(MemoryOrder.rel)(ABORT, true);
and instead of
    if(ABORT) break;
I now do
    import core.atomic;
    if(atomicLoad!(MemoryOrder.acq)(ABORT)) break;

This works great, and with the memory ordering specified I do not see a noticeable difference in performance, whereas with the default memory ordering my ~36 second processing takes ~38 seconds.

One concern I had was that `break` might be a bad idea inside of a parallel foreach. Luckily, it seems that the author(s) of std.parallelism thought of this: according to the documentation, breaking out of a parallel foreach throws a ParallelForeachError, and some clever exception handling is done under the hood. I don't see an uncaught exception when I close my application, but it is now able to close without having to wait for the worker thread to complete, so everything seems fine and dandy! Thanks for the help!
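
For reference, a minimal sketch of the atomic abort flag combined with a parallel foreach. It sidesteps `break` entirely by guarding the loop body, so iterations that start after the abort request simply fall through. The names `abortRequested`, `itemsProcessed` and `processItems` are assumptions made for this example, and the "work" is only a counter increment.

```d
import core.atomic : atomicLoad, atomicOp, atomicStore, MemoryOrder;
import std.parallelism : parallel;
import std.range : iota;

shared bool abortRequested;   // hypothetical name; plays the role of ABORT
shared size_t itemsProcessed; // stands in for the real work being done

/// Runs `count` iterations in parallel and returns how many did real work.
size_t processItems(size_t count)
{
    atomicStore(itemsProcessed, cast(size_t) 0);

    foreach (i; parallel(iota(count)))
    {
        // Guard the body instead of using `break`: once the flag is set,
        // the remaining iterations become no-ops.
        if (!atomicLoad!(MemoryOrder.acq)(abortRequested))
        {
            // ... the real per-item work would go here ...
            atomicOp!"+="(itemsProcessed, 1);
        }
    }

    return atomicLoad(itemsProcessed);
}
```

The owner thread would request the abort with `atomicStore!(MemoryOrder.rel)(abortRequested, true);`, after which the loop should finish quickly without any exception being thrown.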

On Friday, 7 August 2015 at 15:55:33 UTC, Chris wrote:
> receiveTimeout can be used like this: ...

My problem is that when you do this:

>         received = receiveTimeout(600.msecs,
>                                   (string message) {  // <=== Receiving a value
>                                       writeln("received: ", message);
>                                 });

"message" is local to the delegate that receiveTimeout takes.
I want to use "message" outside of the delegate in the receiving thread. However, if you send an immutable value from the worker thread, afaict there would be no way to assign it to a global/outer variable without making a mutable copy (expensive!)
I haven't really spent much time trying to pass my "message" as mutable via shared yet, but hopefully that could work...
August 08, 2015
On Friday, 7 August 2015 at 22:13:35 UTC, 岩倉 澪 wrote:
> "message" is local to the delegate that receiveTimeout takes.
> I want to use "message" outside of the delegate in the receiving thread. However, if you send an immutable value from the worker thread, afaict there would be no way to assign it to a global/outer variable without making a mutable copy (expensive!)
> I haven't really spent much time trying to pass my "message" as mutable via shared yet, but hopefully that could work...

Found the answer to this :) http://forum.dlang.org/post/mailman.1706.1340318206.24740.digitalmars-d-learn@puremagic.com

I send the results from my worker thread with assumeUnique, and then simply cast away immutable in the receiving thread like so:

(in module scope)
    Bar[] baz;

(in application loop)
    import std.array;
    if(baz.empty)
    {
        import std.concurrency, std.datetime;
        receiveTimeout(0.msecs,
                (immutable Bar[] bar){ baz = cast(Bar[])bar; });
    }

August 08, 2015
On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
>         receiveTimeout(0.msecs,
>                 (immutable Bar[] bar){ baz = cast(Bar[])bar; });

Whoops, that should be:
         receiveTimeout(0.msecs,
                 (immutable(Bar)[] bar){ baz = cast(Bar[])bar; });
August 08, 2015
On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
> Found the answer to this :) http://forum.dlang.org/post/mailman.1706.1340318206.24740.digitalmars-d-learn@puremagic.com
>
> I send the results from my worker thread with assumeUnique, and then simply cast away immutable in the receiving thread like so:
>
> (in module scope)
>     Bar[] baz;
>
> (in application loop)
>     import std.array;
>     if(baz.empty)
>     {
>         import std.concurrency, std.datetime;
>         receiveTimeout(0.msecs,
>                 (immutable Bar[] bar){ baz = cast(Bar[])bar; });
>     }

I'm not completely sure that it's bad in this case, but you really shouldn't be casting away immutable. Mutating the data after such a cast is undefined behaviour in D.
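
One way to avoid the cast altogether, sketched under the assumption that the receiving side only needs to read the results: declare the outer variable as `immutable(Bar)[]`. The slice itself can still be reassigned; only the elements are immutable. `Bar` here is a stand-in struct, and `produceBars` and `pollForResults` are hypothetical names.

```d
import core.time : msecs;
import std.concurrency : ownerTid, receiveTimeout, send, spawn;
import std.exception : assumeUnique;

struct Bar { int value; }  // stand-in for the real Bar type

// Module scope: a rebindable slice of immutable elements.
immutable(Bar)[] baz;

// Hypothetical worker: builds the results, then hands them over.
void produceBars()
{
    auto bars = new Bar[](3);
    foreach (i, ref b; bars)
        b.value = cast(int) i;
    // No other reference to `bars` exists, so assumeUnique is valid here.
    ownerTid.send(assumeUnique(bars));
}

/// Polls the mailbox once; returns true when results were stored in baz.
bool pollForResults()
{
    return receiveTimeout(10.msecs,
        (immutable(Bar)[] bars) { baz = bars; }); // no cast, no copy
}
```

In the application loop this would be called as `if (baz.empty) pollForResults();` after spawning the worker with `spawn(&produceBars);`.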
August 08, 2015
On Saturday, 8 August 2015 at 01:24:04 UTC, 岩倉 澪 wrote:
> On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
>>         receiveTimeout(0.msecs,
>>                 (immutable Bar[] bar){ baz = cast(Bar[])bar; });
>
> Whoops, that should be:
>          receiveTimeout(0.msecs,
>                  (immutable(Bar)[] bar){ baz = cast(Bar[])bar; });

Use a negative value for `receiveTimeout`. http://stackoverflow.com/q/31616339/944911
August 08, 2015
On Saturday, 8 August 2015 at 00:39:57 UTC, 岩倉 澪 wrote:
> On Friday, 7 August 2015 at 22:13:35 UTC, 岩倉 澪 wrote:
>> "message" is local to the delegate that receiveTimeout takes.
>> I want to use "message" outside of the delegate in the receiving thread. However, if you send an immutable value from the worker thread, afaict there would be no way to assign it to a global/outer variable without making a mutable copy (expensive!)
>> I haven't really spent much time trying to pass my "message" as mutable via shared yet, but hopefully that could work...
>
> Found the answer to this :) http://forum.dlang.org/post/mailman.1706.1340318206.24740.digitalmars-d-learn@puremagic.com
>
> I send the results from my worker thread with assumeUnique, and then simply cast away immutable in the receiving thread like so:
>
> (in module scope)
>     Bar[] baz;
>
> (in application loop)
>     import std.array;
>     if(baz.empty)
>     {
>         import std.concurrency, std.datetime;
>         receiveTimeout(0.msecs,
>                 (immutable Bar[] bar){ baz = cast(Bar[])bar; });
>     }

Side note: if you only import what you need (say `import std.concurrency : receiveTimeout; import std.datetime : msecs;`), you can reduce the size of the executable considerably as your program grows.