Thread overview
Combining Unique type with concurrency module
Sep 13, 2015
Alex
Sep 14, 2015
Ali Çehreli
Sep 14, 2015
Alex
Sep 14, 2015
Ali Çehreli
Sep 15, 2015
Alex
Sep 14, 2015
Dicebot
Sep 14, 2015
Dicebot
September 13, 2015
Hi everybody! I'm new to this forum, so please excuse me in advance for asking silly questions. I'm probably not the first person to wonder about this topic, but I'm trying to combine the Unique type with the concurrency module and am getting the compiler error

struct std.typecons.Unique!(S).Unique is not copyable because it is annotated with @disable

My test code is taken from the examples in the D docs on this page:

void spawnedFunc2(Tid ownerTid)
{
	receive(
		(ref Unique!S ur)
		{
			writeln("Received the number ", ur.i);
		}
	);
	send(ownerTid, true);
}

static struct S
{
    int i;
    this(int i){this.i = i;}
}

Unique!S produce()
{
    // Construct a unique instance of S on the heap
    Unique!S ut = new S(5);
    // Implicit transfer of ownership
    return ut;
}

void main()
{
    Unique!S u1;
    u1 = produce();
    auto childTid2 = spawn(&spawnedFunc2, thisTid);
    send(childTid2, u1);
    writeln("Successfully printed number.");
}

I'm aware that my u1 struct can't be copied, but I don't intend to copy it. As stated in the docs, I want to lend the struct to the other thread (by using ref), being sure that no other thread can access the struct while it is being processed by the first one.
Is such a thing possible?
Thanks in advance.
Alex
September 14, 2015
On 09/13/2015 09:09 AM, Alex wrote:
> I'm new to this forum, so please excuse me in advance for
> asking silly questions.

Before somebody else says it: There are no silly questions. :)

> struct std.typecons.Unique!(S).Unique is not copyable because it is
> annotated with @disable

I have made the code compile and work (without any thread synchronization at all). See the comments with [Ali] annotations:

import std.stdio;
import std.concurrency;
import std.typecons;

void spawnedFunc2(Tid ownerTid)
{
    /* [Ali] Aside: ownerTid is already and automatically
     * available. You don't need to pass it in explicitly. */

    receive(
        /* [Ali] The compilation error comes from Variant, which
         * happens to be the catch all type for concurrency
         * messages. Unfortunately, there are issues with that
         * type.
         *
         * Although implemented as a pointer, according to
         * Variant, a 'ref' is not a pointer. (I am not sure
         * whether this one is a Variant issue or a language
         * issue.)
         *
         * Changing the message to a pointer to a shared
         * object: */
        (shared(Unique!S) * urShared)
        {
            /* [Ali] Because the expression ur.i does not work on
             * a shared object, we will hack it to unshared
             * first. */
            auto ur = cast(Unique!S*)urShared;
            writeln("Received the number ", ur.i);
        }
    );
    send(ownerTid, true);
}

static struct S
{
    int i;
    this(int i){this.i = i;}
}

Unique!S produce()
{
    // Construct a unique instance of S on the heap
    Unique!S ut = new S(5);
    // Implicit transfer of ownership
    return ut;
}

void main()
{
    Unique!S u1;
    u1 = produce();
    auto childTid2 = spawn(&spawnedFunc2, thisTid);

    /* [Ali] Cast it to shared so that it passes to the other
     * side. Unfortunately, there is no guarantee that this
     * object is not used by more than one thread. */
    send(childTid2, cast(shared(Unique!S*))&u1);

    /* [Ali] We must wait to ensure that u1 is not destroyed
     * before all workers have finished their tasks. */
    import core.thread;
    thread_joinAll();

    writeln("Successfully printed number.");
}

Note that thread synchronization is still the programmer's responsibility.
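
One way to do that synchronization by hand is an explicit acknowledgement: the owner does not touch the lent object again until the worker reports back. The following is only a sketch under assumptions, not part of the tested program above: the names worker and lendAndWait are made up for illustration, and the worker uses the ownerTid property from std.concurrency instead of a parameter.

```d
import std.concurrency : ownerTid, receive, receiveOnly, send, spawn;
import std.typecons : Unique;

struct S
{
    int i;
    this(int i) { this.i = i; }
}

// Hypothetical worker: borrows the object through a pointer, modifies it,
// then tells the owner that it is done.
void worker()
{
    receive((shared(Unique!S)* lent) {
        // Same cast-away-shared hack as in the program above.
        auto u = cast(Unique!S*) lent;
        u.i += 1;
    });
    send(ownerTid, true);
}

// Hypothetical owner side: lends u1 and blocks until the acknowledgement.
int lendAndWait()
{
    Unique!S u1 = new S(5);
    auto tid = spawn(&worker);
    send(tid, cast(shared(Unique!S)*) &u1);

    // Do not use u1 here; the worker may still be working on it.
    receiveOnly!bool();

    // The worker has finished with u1, so the owner may use it again.
    return u1.i;
}

void main()
{
    assert(lendAndWait() == 6);
}
```

Nothing in the type system enforces the "do not use u1 here" comment; the handshake is purely a convention between the two threads.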

> I'm aware that my u1 struct can't be copied, but I don't
> intend to copy it.

Correct.

> As stated in the docs, I want to lend the struct to the
> other thread (by using ref), being sure that no other thread can
> access the struct while it is being processed by the first one.

There is a misconception. Unique guarantees that the object will not be copied. It does not provide any guarantee that only one thread will access the object. It is possible to write a type that acquires a lock during certain operations but Unique isn't that type.
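
For illustration, a type that acquires a lock during certain operations could look roughly like the sketch below. It is an assumption about the shape of such a type, not something from Phobos: Locked, withLock and countWithLock are hypothetical names, and the demo uses core.thread directly to keep it short.

```d
import core.sync.mutex : Mutex;
import core.thread : Thread;

// Hypothetical wrapper: the payload is reachable only while the lock is held.
final class Locked(T)
{
    private T payload;
    private Mutex mtx;

    this(T initial)
    {
        payload = initial;
        mtx = new Mutex;
    }

    // Runs dg with exclusive access to the payload.
    void withLock(scope void delegate(ref T) dg)
    {
        mtx.lock();
        scope (exit) mtx.unlock();
        dg(payload);
    }
}

// Four threads increment the same counter; the lock serializes the updates.
int countWithLock()
{
    auto counter = new Locked!int(0);
    Thread[] workers;
    foreach (_; 0 .. 4)
    {
        workers ~= new Thread({
            foreach (i; 0 .. 1000)
                counter.withLock((ref int n) { n += 1; });
        });
    }
    foreach (t; workers) t.start();
    foreach (t; workers) t.join();

    int result;
    counter.withLock((ref int n) { result = n; });
    return result;
}

void main()
{
    assert(countWithLock() == 4000);
}
```

Unlike Unique, this says nothing about ownership or copying; it only guarantees that two threads never run inside withLock on the same object at the same time.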

> Is such a thing possible?
> Thanks in advance.
> Alex

Ali

September 14, 2015
On Monday, 14 September 2015 at 00:11:07 UTC, Ali Çehreli wrote:
> There is a misconception. Unique guarantees that the object will not be copied. It does not provide any guarantee that only one thread will access the object. It is possible to write a type that acquires a lock during certain operations but Unique isn't that type.

Thanks for answering!
Do you have a hint how to create such a type? The needed operation is "onPassingTo" another thread. So the idea is to create a resource, which is not really shared (a question of definition, I think), as it should be accessible only from one thread at a time.
But there is a "main" thread, from which the resource can be lent to "worker" threads and there are "worker" threads, where only one worker can have the resource at a given time.

On my own, the next thing I would try is something with reference counting: check how many references exist and, depending on that number, allow or disallow accessing the reference again.
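
A rough sketch of what such a check could look like, assuming std.typecons.RefCounted is used for the counting (the post does not name a concrete type, and isSoleHolder is a hypothetical helper). RefCounted's counter is a plain, non-atomic integer, so this only illustrates the counting idea within a single thread; it is not a thread-safe lending mechanism.

```d
import std.typecons : RefCounted;

// Hypothetical helper: true when r is the only live reference to its payload.
bool isSoleHolder(ref RefCounted!int r)
{
    return r.refCountedStore.refCount == 1;
}

void main()
{
    auto a = RefCounted!int(5);
    assert(isSoleHolder(a));

    {
        auto b = a;               // a second reference exists in this scope
        assert(!isSoleHolder(a)); // access through a would be disallowed here
    }

    // b is gone, so a is the sole holder again.
    assert(isSoleHolder(a));
}
```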

By the way, synchronizing by hand is OK. I don't know how important this is, but the idea is that synchronization should be very rare: the lending process acquires and releases the resource automatically, and the next thread can acquire the resource only after a release. So synchronization should not be needed systematically, only at some odd points in time... I can't even give an example of such a point right now... maybe only at the end of the program, to let all workers finish.
September 14, 2015
On Monday, 14 September 2015 at 00:11:07 UTC, Ali Çehreli wrote:
> There is a misconception. Unique guarantees that the object will not be copied. It does not provide any guarantee that only one thread will access the object. It is possible to write a type that acquires a lock during certain operations but Unique isn't that type.

By intention, Unique means more than just "no copies": it also means "only one reference at any single point in time", which naturally leads to implicit moving (not sharing!) between threads. However, AFAIK there are still ways to break that rule with the existing Unique implementation and, of course, std.concurrency was never patched for special Unique support (it should be).
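
Within a single thread, the "moving, not sharing" behaviour can be seen with Unique's release, as in this small sketch (consume is a hypothetical function modelled on the Phobos documentation example; nothing here involves std.concurrency):

```d
import std.typecons : Unique;

struct S
{
    int i;
    this(int i) { this.i = i; }
}

// Hypothetical consumer: takes ownership of the Unique passed to it.
int consume(Unique!S u)
{
    return u.i;
}

void main()
{
    Unique!S a = new S(5);

    // consume(a);  // would not compile: Unique is not copyable

    // release() moves the resource out of a and into the parameter.
    int seen = consume(a.release());
    assert(seen == 5);

    // After the move, a no longer refers to anything.
    assert(a.isEmpty);
}
```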
September 14, 2015
On 09/14/2015 12:07 AM, Alex wrote:

> Do you have a hint how to create such a type? The needed operation is
> "onPassingTo" another thread. So the idea is to create a resource, which
> is not really shared (a question of definition, I think), as it should
> be accessible only from one thread at a time.
> But there is a "main" thread, from which the resource can be lent to
> "worker" threads and there are "worker" threads, where only one worker
> can have the resource at a given time.

Here is an unpolished solution that enforces that the thread that is using it is really its owner:

struct MultiThreadedUnique(T) {
    Tid currentOwner;
    Unique!T u;

    this(Unique!T u) {
        this.u = u.release();
        this.currentOwner = thisTid;
    }

    void enforceRightOwner() {
        import std.exception;
        import std.string;
        enforce(currentOwner == thisTid, format("%s is the owner; not %s",
                                                currentOwner, thisTid));
    }

    ref Unique!T get() {
        enforceRightOwner();
        return u;
    }

    void giveTo(Tid newOwner) {
        enforceRightOwner();
        currentOwner = newOwner;
    }
}

The entire program that I tested it with:

import std.stdio;
import std.concurrency;
import std.typecons;

void spawnedFunc2(Tid ownerTid)
{
    receive(
        (shared(MultiThreadedUnique!S) * urShared)
        {
            auto ur = cast(MultiThreadedUnique!S*)urShared;
            writeln("Received the number ", ur.get().i);
            ur.giveTo(ownerTid);
        }
    );
    send(ownerTid, true);
}

static struct S
{
    int i;
    this(int i){this.i = i;}
}

Unique!S produce()
{
    // Construct a unique instance of S on the heap
    Unique!S ut = new S(5);
    // Implicit transfer of ownership
    return ut;
}

struct MultiThreadedUnique(T) {
    Tid currentOwner;
    Unique!T u;

    this(Unique!T u) {
        this.u = u.release();
        this.currentOwner = thisTid;
    }

    void enforceRightOwner() {
        import std.exception;
        import std.string;
        enforce(currentOwner == thisTid, format("%s is the owner; not %s",
                                                currentOwner, thisTid));
    }

    ref Unique!T get() {
        enforceRightOwner();
        return u;
    }

    void giveTo(Tid newOwner) {
        enforceRightOwner();
        currentOwner = newOwner;
    }
}

void main()
{
    MultiThreadedUnique!S u1 = produce();
    auto childTid2 = spawn(&spawnedFunc2, thisTid);

    u1.giveTo(childTid2);
    send(childTid2, cast(shared(MultiThreadedUnique!S*))&u1);

    import core.thread;
    thread_joinAll();

    writeln("Successfully printed number.");
    auto u2 = &u1.get();
}
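
To see the ownership check fire, here is a sketch, under assumptions, of what happens when the main thread gives the object away and then tries to use it anyway. The wrapper is repeated with a simplified error message so the example is self-contained; idle is a made-up do-nothing thread function that only serves to obtain a second Tid.

```d
import std.concurrency : Tid, spawn, thisTid;
import std.exception : assertThrown, enforce;
import std.typecons : Unique;

struct S
{
    int i;
    this(int i) { this.i = i; }
}

// Same shape as the wrapper above, with a shortened error message.
struct MultiThreadedUnique(T)
{
    Tid currentOwner;
    Unique!T u;

    this(Unique!T u)
    {
        this.u = u.release();
        this.currentOwner = thisTid;
    }

    void enforceRightOwner()
    {
        enforce(currentOwner == thisTid, "accessed by a thread that is not the owner");
    }

    ref Unique!T get()
    {
        enforceRightOwner();
        return u;
    }

    void giveTo(Tid newOwner)
    {
        enforceRightOwner();
        currentOwner = newOwner;
    }
}

// Hypothetical thread function that does nothing.
void idle() {}

void main()
{
    auto m = MultiThreadedUnique!S(Unique!S(new S(5)));
    assert(m.get().i == 5);    // the creating thread is the owner

    m.giveTo(spawn(&idle));    // hand ownership to another thread

    // The main thread is no longer the owner, so get() throws.
    assertThrown(m.get().i);
}
```

The check runs at run time only, and it does not stop a thread from reading the u member directly or from keeping a pointer obtained through get() before the hand-over.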

Ali

September 14, 2015
On Monday, 14 September 2015 at 00:11:07 UTC, Ali Çehreli wrote:
>     send(childTid2, cast(shared(Unique!S*))&u1);

And yeah, this violates the idea of Unique. Sadly, I am not aware of any way to prohibit taking the address of an aggregate.
September 15, 2015
On Monday, 14 September 2015 at 08:08:35 UTC, Ali Çehreli wrote:
> void main()
> {
>     MultiThreadedUnique!S u1 = produce();
>     auto childTid2 = spawn(&spawnedFunc2, thisTid);
>
>     u1.giveTo(childTid2);
>     send(childTid2, cast(shared(MultiThreadedUnique!S*))&u1);
>
>     import core.thread;
>     thread_joinAll();
>
>     writeln("Successfully printed number.");
>     auto u2 = &u1.get();
> }
>
> Ali

OK... I tried the code. It works well as a first approach... But I have to think about it some more... Thanks again!