Thread overview
Aliases to mutable thread-local data not allowed
Mar 10, 2020
mark
Mar 10, 2020
Simen Kjærås
Mar 10, 2020
mark
Mar 10, 2020
mark
Mar 10, 2020
mark
Re: Aliases to mutable thread-local data not allowed [testable source code]
Mar 10, 2020
mark
Mar 11, 2020
Simen Kjærås
Mar 11, 2020
mark
Re: Aliases to mutable thread-local data not allowed [solved-ish]
Mar 11, 2020
mark
Mar 11, 2020
Simen Kjærås
Mar 11, 2020
mark
Mar 11, 2020
Simen Kjærås
Mar 11, 2020
mark
March 10, 2020
I have this struct:

struct Deb {
    string name;
    ...
    Unit[string] tags; // set of tags

    Deb dup() const {
        Deb deb;
        deb.name = name;
        ...
        foreach (key; tags.byKey)
            deb.tags[key] = unit;
        return deb;
    }
}

And I want to populate an AA of them:

Deb[string] debForName;

This takes 1.5 sec, which is too slow.

So now I'm trying to read the files concurrently:

private struct DoneMessage {}

void initialize() { // main thread (some code elided)
    Tid[] tids;
    try {
        foreach (string filename; dirEntries(PATH_PATTERN, SpanMode.shallow))
            tids ~= spawn(&readPackageFile, thisTid, filename);
        auto jobs = tids.length;
        while (jobs) {
            receive(
                (Deb deb) { debForName[deb.name] = deb; },
                (DoneMessage m) { jobs--; }
            );
        }
    } catch (FileException err) {
        stderr.writeln("failed to read packages: ", err);
    }
}

// This is called once per child thread and calls send() 1000s of times to
// return Deb structs, finally sending DoneMessage.
void readPackageFile(Tid parentTid, string filename) { // (some code elided)
    try {
        Deb deb;
        auto file = File(filename);
        foreach(lino, line; file.byLine.enumerate(1))
            readPackageLine(parentTid, filename, lino, line, deb);
            // readPackageLine also calls send with same code as AAA below
        if (deb.valid)
            send(parentTid, deb.dup); // AAA
    } catch (FileException err) {
        stderr.writeln(err);
    }
    send(parentTid, DoneMessage());
}

Unfortunately, I can't send Debs:

src/model.d(71,30): Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate(Tid parentTid, string filename), Tid, string), candidates are:
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(460,5):        spawn(F, T...)(F fn, T args)
  with F = void delegate(Tid, string),
       T = (Tid, string)
  must satisfy the following constraint:
       isSpawnable!(F, T)
src/model.d(72,13): Warning: statement is not reachable
src/model.d(73,13): Warning: statement is not reachable
src/model.d(79,13): Warning: statement is not reachable
src/model.d(72,13): Warning: statement is not reachable
src/model.d(73,13): Warning: statement is not reachable
src/model.d(79,13): Warning: statement is not reachable
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(625,5): Error: static assert:  "Aliases to mutable thread-local data not allowed."
src/model.d(102,21):        instantiated from here: send!(Deb)
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/ldc2 failed with exit code 1.

Is there any nice solution to this? Surely it is a common pattern to read multiple files and create lots of data items to be merged into a collection?
March 10, 2020
On Tuesday, 10 March 2020 at 08:13:19 UTC, mark wrote:
> I have this struct:
>
> struct Deb {
>     string name;
>     ...
>     Unit[string] tags; // set of tags
>
>     Deb dup() const {
>         Deb deb;
>         deb.name = name;
>         ...
>         foreach (key; tags.byKey)
>             deb.tags[key] = unit;
>         return deb;
>     }
> }


> void readPackageFile(Tid parentTid, string filename) { // (some code elided)
>     try {
>         Deb deb;
>         auto file = File(filename);
>         foreach(lino, line; file.byLine.enumerate(1))
>             readPackageLine(parentTid, filename, lino, line, deb);
>             // readPackageLine also calls send with same code as AAA below
>         if (deb.valid)
>             send(parentTid, deb.dup); // AAA


> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(625,5): Error: static assert:  "Aliases to mutable thread-local data not allowed."
>
> Is there any nice solution to this? Surely it is a common pattern to read multiple files and create lots of data items to be merged into a collection?

As the error message hints, the problem is that Deb may hold references to data that is shared with other objects on the thread from which it originates. Since you know this is not the case, even if the compiler can't prove it, you can safely cast your Deb to immutable:

    if (deb.valid)
        send(parentTid, cast(immutable)deb.dup);

In fact, even the .dup is unnecessary here, since no data is shared with other objects, so you can simply write send(parentTid, cast(immutable)deb);. (Note on this point: since you have not included all your code, it could be that other parts create shared mutable state, in which case .dup is necessary and, if badly written, may not be sufficient.)

--
  Simen
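
The rule behind that message can be approximated with std.traits.hasUnsharedAliasing. std.concurrency uses its own internal check, so treat this as an illustration of the rule rather than the exact code path; WithTags and StringsOnly are hypothetical cut-down stand-ins for Deb.

```d
import std.traits : hasUnsharedAliasing;

alias Unit = void[0]; // as in the thread: an AA of Unit works as a set

// Hypothetical cut-down stand-ins for Deb, for illustration only.
struct WithTags { string name; Unit[string] tags; }          // mutable AA
struct StringsOnly { string name; string description; }      // immutable(char)[] only

// A mutable AA may be shared with other objects on the sending thread...
enum tagsAlias = hasUnsharedAliasing!WithTags;
// ...but if the whole struct is immutable, nothing reachable from it can change.
enum immutableTagsAlias = hasUnsharedAliasing!(immutable WithTags);
// Strings alone never count as unshared mutable aliasing.
enum stringsAlias = hasUnsharedAliasing!StringsOnly;
```

This is why the cast to immutable makes send() accept the value: the cast changes the type, not the data, so it is only safe when nothing else still mutates it.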
March 10, 2020
On Tuesday, 10 March 2020 at 10:02:16 UTC, Simen Kjærås wrote:
[snip]
> As the error message hints at, the problem is Deb may hold references to data that is shared with other objects on the thread from which it originates. Since you know this is not the case, even if the compiler can't prove it, you can safely cast your Deb to immutable:
>
>     if (deb.valid)
>         send(parentTid, cast(immutable)deb.dup);
>
> In fact, even the .dup is unnecessary here, since no data is shared with other objects, so you can simply write send(parentTid, cast(immutable)deb);. (Note on this point: since you have not included all your code, it could be other parts create shared mutable state, in which case .dup is necessary, and if badly written may not be sufficient.

Thanks, that's led to some progress.

Once each Deb is populated its contents are never modified.
Nonetheless, I've kept the .dup, since I reuse the same Deb each time: I populate it, send a copy, then clear and reuse the original.

Unfortunately, I'm now getting different errors:

src/model.d(85,30): Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate(Tid parentTid, string filename), Tid, string), candidates are:
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(460,5):        spawn(F, T...)(F fn, T args)
  with F = void delegate(Tid, string),
       T = (Tid, string)
  must satisfy the following constraint:
       isSpawnable!(F, T)
src/model.d(86,13): Warning: statement is not reachable
src/model.d(87,13): Warning: statement is not reachable
src/model.d(86,13): Warning: statement is not reachable
src/model.d(87,13): Warning: statement is not reachable
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(701,26): Error: cannot implicitly convert expression rhs of type immutable(Deb) to Deb
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(603,17): Error: template instance std.variant.VariantN!32LU.VariantN.opAssign!(immutable(Deb)) error instantiating
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(126,22):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(656,23):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(647,10):        instantiated from here: _send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(626,10):        instantiated from here: _send!(immutable(Deb))
src/model.d(115,21):        instantiated from here: send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/ldc2 failed with exit code 1.

Should I be somehow casting away the immutable at the receive end? (Even though received Debs are never modified?)
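
Why the .dup matters when the same Deb is reused: copying a struct copies its AA reference, not the table, so clearing the original also empties a shallow copy (which would then be "immutable" data changing underneath the receiver). A minimal sketch with a hypothetical cut-down Deb; it uses bool[string] instead of Unit[string] only to keep the check simple.

```d
// Hypothetical cut-down Deb: a name plus a set of tags.
struct Deb
{
    string name;
    bool[string] tags;

    // Deep copy: the new Deb gets its own AA.
    Deb dup() const
    {
        Deb deb;
        deb.name = name;
        foreach (key; tags.byKey)
            deb.tags[key] = true;
        return deb;
    }
}

// Returns the tag counts of [shallow copy, deep copy] after the original is cleared.
size_t[2] tagsAfterClear()
{
    Deb deb;
    deb.name = "demo";
    deb.tags["x"] = true;

    Deb shallow = deb;  // copies the AA reference: both share one table
    Deb deep = deb.dup; // independent table

    deb.tags.clear();   // reuse the original, as readPackageFile does
    return [shallow.tags.length, deep.tags.length];
}
```

The shallow copy ends up with no tags, the deep copy keeps its one tag.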


March 10, 2020
I just tried:

            auto jobs = tids.length;
            while (jobs) {
                receive(
                    (Deb deb) { debForName[deb.name] = cast(Deb)deb; },
                    (DoneMessage m) { jobs--; }
                );
            }

to cast away the immutable from the sender, but that doesn't work either:

src/model.d(85,30): Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate(Tid parentTid, string filename), Tid, string), candidates are:
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(460,5):        spawn(F, T...)(F fn, T args)
  with F = void delegate(Tid, string),
       T = (Tid, string)
  must satisfy the following constraint:
       isSpawnable!(F, T)
src/model.d(86,13): Warning: statement is not reachable
src/model.d(87,13): Warning: statement is not reachable
src/model.d(86,13): Warning: statement is not reachable
src/model.d(87,13): Warning: statement is not reachable
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(701,26): Error: cannot implicitly convert expression rhs of type immutable(Deb) to Deb
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(603,17): Error: template instance std.variant.VariantN!32LU.VariantN.opAssign!(immutable(Deb)) error instantiating
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(126,22):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(656,23):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(647,10):        instantiated from here: _send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(626,10):        instantiated from here: _send!(immutable(Deb))
src/model.d(115,21):        instantiated from here: send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/ldc2 failed with exit code 1.

March 10, 2020
On 3/10/20 7:09 AM, mark wrote:
> I just tried:
> 
>              auto jobs = tids.length;
>              while (jobs) {
>                  receive(
>                      (Deb deb) { debForName[deb.name] = cast(Deb)deb; },
>                      (DoneMessage m) { jobs--; }
>                  );
>              }
> 
> to cast away the immutable from the sender, but that doesn't work either:

You aren't accepting an immutable(Deb), but a Deb. So you aren't actually casting away anything.

But this seems more like it would be a runtime error (e.g., std.concurrency can't find any function that handles the immutable(Deb)).

Still, the correct thing here is to handle immutable(Deb). However, I'd strongly caution you against casting away immutable; that can lead to undefined behavior in D. Better to just store it as immutable to begin with.

> src/model.d(85,30): Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate(Tid parentTid, string filename), Tid, string), candidates are:
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(460,5):        spawn(F, T...)(F fn, T args)
>    with F = void delegate(Tid, string),
>         T = (Tid, string)
>    must satisfy the following constraint:
>         isSpawnable!(F, T)
> src/model.d(86,13): Warning: statement is not reachable
> src/model.d(87,13): Warning: statement is not reachable
> src/model.d(86,13): Warning: statement is not reachable
> src/model.d(87,13): Warning: statement is not reachable

It's hard to see what the problem is here. It looks like you have a function that has unreachable statements, which is why the spawn isn't compiling (the function you are passing it can't compile).

> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(701,26): Error: cannot implicitly convert expression rhs of type immutable(Deb) to Deb
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(603,17): Error: template instance std.variant.VariantN!32LU.VariantN.opAssign!(immutable(Deb)) error instantiating
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(126,22):        instantiated from here: __ctor!(immutable(Deb))
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(656,23):        instantiated from here: __ctor!(immutable(Deb))
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(647,10):        instantiated from here: _send!(immutable(Deb))
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(626,10):        instantiated from here: _send!(immutable(Deb))
> src/model.d(115,21):        instantiated from here: send!(immutable(Deb))
> /home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/ldc2 failed with exit code 1.

This looks like an error in std.concurrency (or std.variant), and I'm not sure what the problem is here. It seems std.variant cannot accept an immutable, which makes no sense, as immutable data is a huge use case of std.concurrency.

Can you narrow this down to a small example that can be submitted as a bug report?

-Steve
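
On the receiving side, one way to handle immutable(Deb) without casting immutable away is to store a deep copy made by the const dup() method, which can be called on an immutable object. This sketch leaves std.concurrency out (it does not address the std.variant error above) and uses a hypothetical cut-down Deb; store is a hypothetical name for what a receive handler taking immutable(Deb) could do.

```d
// Hypothetical cut-down Deb.
struct Deb
{
    string name;
    bool[string] tags;

    // const, so callable on mutable, const and immutable Debs alike.
    Deb dup() const
    {
        Deb deb;
        deb.name = name;
        foreach (key; tags.byKey)
            deb.tags[key] = true;
        return deb;
    }
}

Deb[string] debForName;

// What a handler such as `(immutable(Deb) deb) { ... }` could do: the stored
// value owns a fresh AA, so no cast is needed and no mutable state is shared.
void store(immutable(Deb) deb)
{
    debForName[deb.name] = deb.dup;
}
```

The trade-off is one extra copy per package; the alternative Steve suggests, keeping the values immutable in the collection, avoids the copy but changes the collection's type.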
March 10, 2020
On Tuesday, 10 March 2020 at 15:27:04 UTC, Steven Schveighoffer wrote:
> On 3/10/20 7:09 AM, mark wrote:
[snip]
> Still, the correct thing here is to handle immutable(Deb). However, I'd strongly caution you against casting away immutable, that can lead to undefined behavior in D. Better to just store it as immutable to begin with.

I'm happy to store immutable Debs in
Deb[string] debForName;
but the syntax
immutable(Deb)[string] debForName;
just produced errors.

[snip]
> Hard to see what the problem is here, it looks like you have a function that has unreachable statements, which is why the spawn isn't compiling (the function you are passing it can't compile).

They are only unreachable (I think) because of the stuff further down the backtrace.

[snip]
> This looks like an error in std.concurrency (or std.variant), and I'm not sure what is the problem here. It seems std.variant cannot accept an immutable, which makes no sense as immutable data is a huge usecase of std.concurrency.
>
> Can you narrow this down to a small example that can be submitted as a bug report?

I'll try to put the project on GitHub. If I ever get it working it'll be GPL; it's just that I don't like putting up pre-alpha stuff. Then I'll add the link.

Thanks.

March 10, 2020
I've managed to make a cut-down version that's < 170 LOC.
It needs to be run on Debian or a Debian-based Linux (e.g., Ubuntu).

Below is the full source followed by the error output.

// app.d
import std.typecons: Tuple;

void main() {
    import std.stdio: writeln, writefln;
    auto model = Model();
    model.readPackages();
    writefln("read %,d packages", model.length);
}

enum PACKAGE_DIR = "/var/lib/apt/lists";
enum PACKAGE_PATTERN = "*Packages";
alias Unit = void[0];
enum unit = Unit.init;
alias MaybeKeyValue = Tuple!(string, "key", string, "value", bool, "ok");
struct DoneMessage {}

struct Deb {
    string name;
    string description;
    Unit[string] tags; // set of tags

    Deb dup() const {
        Deb deb;
        deb.name = name;
        deb.description = description;
        foreach (key; tags.byKey)
            deb.tags[key] = unit;
        return deb;
    }

    bool valid() {
        import std.string: empty;
        return !name.empty && !description.empty;
    }

    void clear() {
        name = "";
        description = "";
        tags.clear;
    }
}

struct Model {
    import std.concurrency: Tid;

    private Deb[string] debForName; // Only read once populated

    size_t length() const { return debForName.length; }

    void readPackages() {
        import std.concurrency: receive, thisTid, spawn;
        import std.file: dirEntries, FileException, SpanMode;

        Tid[] tids;
        try {
            foreach (string filename; dirEntries(PACKAGE_DIR,
                                                 PACKAGE_PATTERN,
                                                 SpanMode.shallow))
                tids ~= spawn(&readPackageFile, thisTid, filename);
            auto jobs = tids.length;
            while (jobs) {
                receive(
                    (Deb deb) { debForName[deb.name] = deb; },
                    (DoneMessage) { jobs--; }
                );
            }
        } catch (FileException err) {
            import std.stdio: stderr;
            stderr.writeln("failed to read packages: ", err);
        }
    }

    private void readPackageFile(Tid parentTid, string filename) {
        import std.concurrency: send;
        import std.file: FileException;
        import std.range: enumerate;
        import std.stdio: File, stderr;

        try {
            bool inDescription = false; // Descriptions can be multi-line
            bool inContinuation = false; // Other things can be multi-line
            Deb deb;
            auto file = File(filename);
            foreach(lino, line; file.byLine.enumerate(1))
                readPackageLine(parentTid, filename, lino, line, deb,
                                inDescription, inContinuation);
            if (deb.valid)
                send(parentTid, cast(immutable)deb.dup);
        } catch (FileException err) {
            stderr.writefln("error: %s: failed to read packages: %s",
                            filename, err);
        }
        send(parentTid, DoneMessage());
    }

    private void readPackageLine(
            Tid parentTid, const string filename, const int lino,
            const(char[]) line, ref Deb deb, ref bool inDescription,
            ref bool inContinuation) {
        import std.concurrency: send;
        import std.path: baseName;
        import std.stdio: stderr;
        import std.string: empty, startsWith, strip;

        if (strip(line).empty) {
            if (deb.valid)
                send(parentTid, cast(immutable)deb.dup);
            else if (!deb.name.empty || !deb.description.empty ||
                     !deb.tags.empty)
                stderr.writefln("error: %s:%,d: incomplete package: %s",
                                baseName(filename), lino, deb);
            deb.clear;
            return;
        }
        if (inDescription || inContinuation) {
            if (line.startsWith(' ') || line.startsWith('\t')) {
                if (inDescription)
                    deb.description ~= line;
                return;
            }
            inDescription = inContinuation = false;
        }
        immutable keyValue = maybeKeyValue(line);
        if (!keyValue.ok)
            inContinuation = true;
        else
            inDescription = populateDeb(deb, keyValue.key,
                                        keyValue.value);
    }
}

MaybeKeyValue maybeKeyValue(const(char[]) line) {
    import std.string: indexOf, strip;

    immutable i = line.indexOf(':');
    if (i == -1)
        return MaybeKeyValue("", "", false);
    immutable key = strip(line[0..i]).idup;
    immutable value = strip(line[i + 1..$]).idup;
    return MaybeKeyValue(key, value, true);
}

bool populateDeb(ref Deb deb, const string key, const string value) {
    import std.conv: to;

    switch (key) {
        case "Package":
            deb.name = value;
            return false;
        case "Description", "Npp-Description": // XXX ignore Npp-?
            deb.description ~= value;
            return true; // We are now in a description
        case "Tag":
            maybePopulateTags(deb, value);
            return false;
        default: return false; // Ignore "uninteresting" fields
    }
}

void maybePopulateTags(ref Deb deb, const string tags) {
    import std.regex: ctRegex, split;

    auto rx = ctRegex!(`\s*,\s*`);
    foreach (tag; tags.split(rx))
        deb.tags[tag] = unit;
}
// end of app.d

Error output:
src/app.d(59,30): Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate(Tid parentTid, string filename), Tid, string), candidates are:
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(460,5):        spawn(F, T...)(F fn, T args)
  with F = void delegate(Tid, string),
       T = (Tid, string)
  must satisfy the following constraint:
       isSpawnable!(F, T)
src/app.d(62,24): Error: template std.concurrency.receive cannot deduce function from argument types !()(void delegate(Deb deb) pure nothrow @safe, void), candidates are:
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(673,6):        receive(T...)(T ops)
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(701,26): Error: cannot implicitly convert expression rhs of type immutable(Deb) to Deb
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(603,17): Error: template instance std.variant.VariantN!32LU.VariantN.opAssign!(immutable(Deb)) error instantiating
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(126,22):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(656,23):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(647,10):        instantiated from here: _send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(626,10):        instantiated from here: _send!(immutable(Deb))
src/app.d(88,21):        instantiated from here: send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/ldc2 failed with exit code 1.


Hopefully this will help someone understand and be able to help!
March 11, 2020
On Tuesday, 10 March 2020 at 20:03:21 UTC, mark wrote:
> I've managed to make a cut-down version that's < 170 LOC.
> It needs to be run on Debian or a Debian-based Linux (e.g., Ubuntu).
>
> Hopefully this will help someone understand and be able to help!

This took some time figuring out. Turns out, std.concurrency.spawn won't take a delegate as its callable argument. There are sensible reasons for this - delegates have a context that is not guaranteed to be immutable, so allowing delegate callables could lead to mutable aliasing. I've filed an issue to improve documentation and error messages: https://issues.dlang.org/show_bug.cgi?id=20665

However, knowing that some things are impossible does not necessarily help us figure out what we can do to fix the problem, and the good news is that the problems can be fixed. Since the problem is that we're giving a delegate where the API expects a function, we can simply turn it into a function. In the code you've given, that means making readPackageFile and readPackageLine static. This makes spawn() work as it should.
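
The difference between the two kinds of callable can be checked with std.traits. The cut-down Model below is hypothetical and only mirrors the shape of the posted code.

```d
import std.traits : isDelegate, isFunctionPointer;

// Hypothetical cut-down Model mirroring the posted code.
struct Model
{
    void readPackageFile(string filename) {}              // needs a `this`
    static void readPackageFileStatic(string filename) {} // no hidden context
}

Model model;

// Taking the address of a non-static member through an instance yields a
// delegate (function pointer plus `this` context), which spawn() rejects.
enum memberIsDelegate = isDelegate!(typeof(&model.readPackageFile));

// A static member (or a module-level function) yields a plain function
// pointer, which spawn() accepts when its arguments carry no mutable aliasing.
enum staticIsFunction = isFunctionPointer!(typeof(&Model.readPackageFileStatic));
```

Inside a member function, a bare `&readPackageFile` means `&this.readPackageFile`, which is why the original spawn() call received a delegate.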

In addition, there's a problem with this line in receive():

    (DoneMessage) { jobs--; }

That looks sensible, but since DoneMessage doesn't have a name, it is parsed as a templated function taking one argument of unspecified type named DoneMessage. For some reason, templates passed as function arguments show up in compiler output as 'void', giving this weird error message:

    template std.concurrency.receive cannot deduce function from argument types !()(void delegate(Deb deb) pure nothrow @safe, void)

The solution here is to simply give DoneMessage a name:

    (DoneMessage d) { jobs--; }
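
The parsing difference can be observed at compile time. This is a sketch; it relies on typeof() of an uninstantiated template lambda being void, which matches the 'void' in the error message above.

```d
import std.traits : Parameters;

struct DoneMessage {}

// `(DoneMessage) { }` declares an untyped parameter *named* DoneMessage, so
// the literal is a template lambda with no concrete type yet.
enum untypedIsTemplate = is(typeof((DoneMessage) { }) == void);

// `(DoneMessage d) { }` has a concrete parameter type that receive() can match.
enum typedTakesDone = is(Parameters!(typeof((DoneMessage d) { }))[0] == DoneMessage);
```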

With those changes, things at least compile. Now it's up to you to ensure the semantics are correct. :)

One last thing: you're passing parentTid around, and that's not actually necessary - std.concurrency.ownerTid does the exact same thing, and is available even if not explicitly passed anywhere.

--
  Simen
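
Putting the three suggestions together (a static worker, a named DoneMessage parameter, and ownerTid), a minimal sketch of the spawn/receive pattern. To stay self-contained it sends plain values (a string and a size_t) rather than Deb, so it sidesteps the separate std.variant problem with immutable(Deb) reported later in the thread; readAll, readOne and lengthForName are hypothetical names.

```d
import std.concurrency : ownerTid, receive, send, spawn;

struct DoneMessage {} // sentinel, as in the thread

// Hypothetical cut-down model: records each "filename" and its length.
struct Model
{
    private size_t[string] lengthForName;

    void readAll(string[] filenames)
    {
        foreach (filename; filenames)
            spawn(&readOne, filename); // &readOne is a plain function pointer
        auto jobs = filenames.length;
        while (jobs)
        {
            receive(
                // A two-argument send arrives as a tuple and matches this handler.
                (string name, size_t len) { lengthForName[name] = len; },
                (DoneMessage m) { jobs--; } // named parameter, so not a template
            );
        }
    }

    // static: no hidden `this`, so spawn() accepts it.
    private static void readOne(string filename)
    {
        // ownerTid is the thread that called spawn(); no need to pass it in.
        // string and size_t carry no mutable aliasing, so send() accepts them.
        send(ownerTid, filename, filename.length);
        send(ownerTid, DoneMessage());
    }
}
```

Messages from one sender arrive in order, so each worker's DoneMessage comes after its data, and counting DoneMessages is enough to know every result has been merged.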
March 11, 2020
Hi Simen,

I think you must have done something else to get it to compile that you didn't mention. I made exactly the changes you described and it still wouldn't compile. Here's what I get with the changes described below (the new full source follows):

/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(701,26): Error: cannot implicitly convert expression rhs of type immutable(Deb) to Deb
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/variant.d(603,17): Error: template instance std.variant.VariantN!32LU.VariantN.opAssign!(immutable(Deb)) error instantiating
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(126,22):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(656,23):        instantiated from here: __ctor!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(647,10):        instantiated from here: _send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/../import/std/concurrency.d(626,10):        instantiated from here: _send!(immutable(Deb))
src/app.d(89,17):        instantiated from here: send!(immutable(Deb))
/home/mark/opt/ldc2-1.20.0-linux-x86_64/bin/ldc2 failed with exit code 1.

Here is a modified version that names the DoneMessage variable and makes readPackageFile and readPackageLine into functions. Instead of making them static, I just took them outside the struct: after all, they never access struct member data. I also now use ownerTid.

// app.d
import std.typecons: Tuple;

void main() {
    import std.stdio: writeln, writefln;
    auto model = Model();
    model.readPackages();
    writefln("read %,d packages", model.length);
}

enum PACKAGE_DIR = "/var/lib/apt/lists";
enum PACKAGE_PATTERN = "*Packages";
alias Unit = void[0]; // These two lines allow me to use AAs as sets
enum unit = Unit.init;
alias MaybeKeyValue = Tuple!(string, "key", string, "value", bool, "ok");
struct DoneMessage {}

struct Deb {
    string name;
    string description;
    Unit[string] tags; // set of tags

    Deb dup() const {
        Deb deb;
        deb.name = name;
        deb.description = description;
        foreach (key; tags.byKey)
            deb.tags[key] = unit;
        return deb;
    }

    bool valid() {
        import std.string: empty;
        return !name.empty && !description.empty;
    }

    void clear() {
        name = "";
        description = "";
        tags.clear;
    }
}

struct Model {
    import std.concurrency: Tid;

    private Deb[string] debForName; // Only read once populated

    size_t length() const { return debForName.length; }

    void readPackages() {
        import std.concurrency: receive, spawn;
        import std.file: dirEntries, FileException, SpanMode;

        Tid[] tids;
        try {
            foreach (string filename; dirEntries(PACKAGE_DIR,
                                                 PACKAGE_PATTERN,
                                                 SpanMode.shallow))
                tids ~= spawn(&readPackageFile, filename);
            auto jobs = tids.length;
            while (jobs) {
                receive(
                    (Deb deb) { debForName[deb.name] = deb; },
                    (DoneMessage m) { jobs--; }
                );
            }
        } catch (FileException err) {
            import std.stdio: stderr;
            stderr.writeln("failed to read packages: ", err);
        }
    }
}

void readPackageFile(string filename) {
    import std.concurrency: ownerTid, send;
    import std.file: FileException;
    import std.range: enumerate;
    import std.stdio: File, stderr;

    try {
        bool inDescription = false; // Descriptions can be multi-line
        bool inContinuation = false; // Other things can be multi-line
        Deb deb;
        auto file = File(filename);
        foreach(lino, line; file.byLine.enumerate(1))
            readPackageLine(filename, lino, line, deb, inDescription,
                            inContinuation);
        if (deb.valid)
            send(ownerTid, cast(immutable)deb.dup);
    } catch (FileException err) {
        stderr.writefln("error: %s: failed to read packages: %s",
                        filename, err);
    }
    send(ownerTid, DoneMessage());
}

void readPackageLine(const string filename, const int lino,
                     const(char[]) line, ref Deb deb,
                     ref bool inDescription, ref bool inContinuation) {
    import std.concurrency: ownerTid, send;
    import std.path: baseName;
    import std.stdio: stderr;
    import std.string: empty, startsWith, strip;

    if (strip(line).empty) {
        if (deb.valid)
            send(ownerTid, cast(immutable)deb.dup);
        else if (!deb.name.empty || !deb.description.empty ||
                 !deb.tags.empty)
            stderr.writefln("error: %s:%,d: incomplete package: %s",
                            baseName(filename), lino, deb);
        deb.clear;
        return;
    }
    if (inDescription || inContinuation) {
        if (line.startsWith(' ') || line.startsWith('\t')) {
            if (inDescription)
                deb.description ~= line;
            return;
        }
        inDescription = inContinuation = false;
    }
    immutable keyValue = maybeKeyValue(line);
    if (!keyValue.ok)
        inContinuation = true;
    else
        inDescription = populateDeb(deb, keyValue.key, keyValue.value);
}

MaybeKeyValue maybeKeyValue(const(char[]) line) {
    import std.string: indexOf, strip;

    immutable i = line.indexOf(':');
    if (i == -1)
        return MaybeKeyValue("", "", false);
    immutable key = strip(line[0..i]).idup;
    immutable value = strip(line[i + 1..$]).idup;
    return MaybeKeyValue(key, value, true);
}

bool populateDeb(ref Deb deb, const string key, const string value) {
    import std.conv: to;

    switch (key) {
        case "Package":
            deb.name = value;
            return false;
        case "Description", "Npp-Description": // XXX ignore Npp-?
            deb.description ~= value;
            return true; // We are now in a description
        case "Tag":
            maybePopulateTags(deb, value);
            return false;
        default: return false; // Ignore "uninteresting" fields
    }
}

void maybePopulateTags(ref Deb deb, const string tags) {
    import std.regex: ctRegex, split;

    auto rx = ctRegex!(`\s*,\s*`);
    foreach (tag; tags.split(rx))
        deb.tags[tag] = unit;
}
March 11, 2020
I finally got a threaded version that works, and a lot more cleanly than using send/receive. (But performance is dismal, see the end.)

Here's the heart of the solution:

    void readPackages() {
        import std.algorithm: max;
        import std.array: array;
        import std.parallelism: taskPool, totalCPUs;
        import std.file: dirEntries, FileException, SpanMode;

        try {
            auto filenames = dirEntries(PACKAGE_DIR, PACKAGE_PATTERN,
                                        SpanMode.shallow).array;
            foreach (debs; taskPool.map!readPackageFile(filenames))
                foreach (deb; debs)
                    debForName[deb.name] = deb.dup;
        } catch (FileException err) {
            import std.stdio: stderr;
            stderr.writeln("failed to read packages: ", err);
        }
    }

I had to change readPackageFile (and the functions it calls), e.g.:

Deb[] readPackageFile(string filename) {
    import std.file: FileException;
    import std.range: enumerate;
    import std.stdio: File, stderr;

    Deb[] debs;
    Deb deb;
    try {
        bool inDescription = false; // Descriptions can be multi-line
        bool inContinuation = false; // Other things can be multi-line
        auto file = File(filename);
        foreach(lino, line; file.byLine.enumerate(1))
            readPackageLine(debs, deb, filename, lino, line, inDescription,
                            inContinuation);
        if (deb.valid)
            debs ~= deb.dup;
    } catch (FileException err) {
        stderr.writefln("error: %s: failed to read packages: %s",
                        filename, err);
    }
    return debs;
}

I also changed main() to do some timings & to allow me to compare outputs:

void main(const string[] args) {
    import std.datetime.stopwatch: AutoStart, StopWatch;
    import std.stdio: stderr, writeln;

    auto model = Model();
    auto timer = StopWatch(AutoStart.yes);
    model.readPackages();
    stderr.writefln("read %,d packages in %s", model.length, timer.peek);
    if (args.length > 1)
        foreach (deb; model.debForName)
            writeln(deb);
}


This produces the same output as the single-threaded version.

Here's the output of a typical single-threaded version's run:

read 65,480 packages in 1 sec, 314 ms, 798 μs, and 7 hnsecs

And here's the output of a typical task-based multi-threaded version's run:

read 65,480 packages in 1 sec, 377 ms, 605 μs, and 3 hnsecs

In fact, the multi-threaded version has never yet been as fast as the single-threaded version!

I've put both versions on github in case anyone's interested:
https://github.com/mark-summerfield/d-debtest-experiment
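
The shape of the taskPool.map approach, reduced to a self-contained sketch: each input is parsed on a pool thread into a Deb[], and the results are merged into the AA on the calling thread only. The inputs here are in-memory strings rather than package files, and parseText and readAll are hypothetical names; this shows the structure, not the timings reported above.

```d
import std.parallelism : taskPool;
import std.string : splitLines, strip;

struct Deb { string name; } // hypothetical cut-down stand-in

// Hypothetical parser: one package name per non-blank line.
Deb[] parseText(string text)
{
    Deb[] debs;
    foreach (line; text.splitLines)
    {
        auto name = line.strip;
        if (name.length)
            debs ~= Deb(name);
    }
    return debs;
}

Deb[string] readAll(string[] texts)
{
    Deb[string] debForName;
    // taskPool.map calls parseText on worker threads and yields the results
    // in input order; only this thread ever touches debForName.
    foreach (debs; taskPool.map!parseText(texts))
        foreach (deb; debs)
            debForName[deb.name] = deb;
    return debForName;
}
```

Because each worker returns freshly built values and only the calling thread writes the AA, no immutable casts or messages are needed.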