Thread overview
How to create Multi Producer-Single Consumer concurrency
Jun 12, 2020
adnan338
Jun 16, 2020
Ali Çehreli
Jul 13, 2022
Bagomot
Jul 13, 2022
Ali Çehreli
Jul 13, 2022
Bagomot
June 12, 2020
I have a list of files to download from the internet. Each of those downloads are a time consuming task.

What I want to do is download them concurrently and when each of those downloads finish, I want to activate something in my main thread, which will update a progressbar in the GUI thread.

So there are multiple "download finished" message producers, and one consumer of those messages. Furthermore, that producer has a callback that triggers an UI object.

For example,

auto list = [...]; // list of URLs
auto downloadableItems = list.length;
__gshared int downloaded = 0;
auto progressBar = // a gtk progressbar;

How should I proceed? I can do a parallel foreach and call download() on each of the URL and update `downloaded`, but I don't know how to listen to `downloaded` when it gets updated so I can make changes in `progressBar`

Not that my UI library is not thread safe and I cannot access UI object from different threads.
June 16, 2020
On 6/12/20 3:02 PM, adnan338 wrote:

> So there are multiple "download finished" message producers, and one
> consumer of those messages. Furthermore, that producer has a callback
> that triggers an UI object.

That's almost exactly what I do in some of my programs. I use std.concurrency and the following is a working sketch of what I do.

I assumed you get finer individual granularity of progress as opposed to the binary 0% -> 100%.

import std.stdio;
import std.concurrency;
import std.algorithm;
import std.range;
import std.exception;
import std.format;
import core.thread;

struct Progress {
  Tid tid;          // The id of the reporting thread
  size_t amount;    // The amount of progress so far
  size_t total;     // Total progress (can be file size)
}

void display(Progress[Tid] progresses) {
  const amount = progresses.byValue.map!(p => p.amount).sum;
  const total = progresses.byValue.map!(p => p.total).sum;
  writefln!"%6.2f%%"(100.0 * amount / total);
}

// The worker thread function
void download(string url) {
  writefln!"Worker %s downloading %s."(thisTid, url);
  enum total = 20;
  foreach (i; 0 .. total) {
    // Imitate some progress
    Thread.sleep(100.msecs);

    // Report progress to owner
    ownerTid.send(Progress(thisTid, i + 1, total));
  }
}

void main() {
  auto list = [ "dlang.org", "ddili.org" ];
  auto downloaders = list.length
                     .iota
                     .map!(i => spawnLinked(&download, list[i]))
                     .array;

  Progress[Tid] progresses;
  size_t finished = 0;

  while (finished != list.length) {
    receive(
      (LinkTerminated arg) {
        ++finished;

        // Check whether this thread is exiting prematurely
        enforce((arg.tid in progresses) &&
                (progresses[arg.tid].amount ==  progresses[arg.tid].total),
                format!"Thread %s exited unexpectedly"(arg.tid));
      },

      (Progress progress) {
        progresses[progress.tid] = progress;
        progresses.display();
      }
    );
  }

  writeln("Processing the downloaded files.");
}

Ali

July 13, 2022

On Tuesday, 16 June 2020 at 09:10:09 UTC, Ali Çehreli wrote:

>

On 6/12/20 3:02 PM, adnan338 wrote:

>

So there are multiple "download finished" message producers,
and one
consumer of those messages. Furthermore, that producer has a
callback
that triggers an UI object.

That's almost exactly what I do in some of my programs. I use std.concurrency and the following is a working sketch of what I do.

I assumed you get finer individual granularity of progress as opposed to the binary 0% -> 100%.

import std.stdio;
import std.concurrency;
import std.algorithm;
import std.range;
import std.exception;
import std.format;
import core.thread;

struct Progress {
Tid tid; // The id of the reporting thread
size_t amount; // The amount of progress so far
size_t total; // Total progress (can be file size)
}

void display(Progress[Tid] progresses) {
const amount = progresses.byValue.map!(p => p.amount).sum;
const total = progresses.byValue.map!(p => p.total).sum;
writefln!"%6.2f%%"(100.0 * amount / total);
}

// The worker thread function
void download(string url) {
writefln!"Worker %s downloading %s."(thisTid, url);
enum total = 20;
foreach (i; 0 .. total) {
// Imitate some progress
Thread.sleep(100.msecs);

// Report progress to owner
ownerTid.send(Progress(thisTid, i + 1, total));

}
}

void main() {
auto list = [ "dlang.org", "ddili.org" ];
auto downloaders = list.length
.iota
.map!(i => spawnLinked(&download, list[i]))
.array;

Progress[Tid] progresses;
size_t finished = 0;

while (finished != list.length) {
receive(
(LinkTerminated arg) {
++finished;

    // Check whether this thread is exiting prematurely
    enforce((arg.tid in progresses) &&
            (progresses[arg.tid].amount ==  progresses[arg.tid].total),
            format!"Thread %s exited unexpectedly"(arg.tid));
  },

  (Progress progress) {
    progresses[progress.tid] = progress;
    progresses.display();
  }
);

}

writeln("Processing the downloaded files.");
}

Ali

How to do the same with taskPool instead of spawnLinked?

July 13, 2022
On 7/13/22 02:25, Bagomot wrote:

> How to do the same with `taskPool` instead of `spawnLinked`?

You are hitting the nail on the head. :) std.parallelism, which taskPool is a concept of, is for cases where operations are independent.

However, producer and consumer are by definition dependent, so it's a problem for std.concurrency, which involves message boxes.

You can do the same with std.parallelism or core.thread but you would be implementing some of what std.concurrency already provides.

The following are my understandings of these topics:

  http://ddili.org/ders/d.en/parallelism.html

  http://ddili.org/ders/d.en/concurrency.html

  http://ddili.org/ders/d.en/concurrency_shared.html

The introduction section of the Concurrency chapter lists some differences.

Ali

July 13, 2022

On Wednesday, 13 July 2022 at 19:06:48 UTC, Ali Çehreli wrote:

>

On 7/13/22 02:25, Bagomot wrote:

>

How to do the same with taskPool instead of spawnLinked?

You are hitting the nail on the head. :) std.parallelism, which taskPool is a concept of, is for cases where operations are independent.

However, producer and consumer are by definition dependent, so it's a problem for std.concurrency, which involves message boxes.

You can do the same with std.parallelism or core.thread but you would be implementing some of what std.concurrency already provides.

The following are my understandings of these topics:

http://ddili.org/ders/d.en/parallelism.html

http://ddili.org/ders/d.en/concurrency.html

http://ddili.org/ders/d.en/concurrency_shared.html

The introduction section of the Concurrency chapter lists some differences.

Ali

Thank you! I understood the difference between std.parallelism and std.concurrency. My question no longer relevant :)