Jump to page: 1 2
Thread overview
std.parallelism: How to wait all tasks finished?
Feb 03, 2014
Cooler
Feb 03, 2014
Dan Killebrew
Feb 03, 2014
Cooler
Feb 04, 2014
Dan Killebrew
Feb 05, 2014
Cooler
Feb 05, 2014
Chris Williams
Feb 06, 2014
Andrea Fontana
Feb 06, 2014
Cooler
Feb 06, 2014
Cooler
Feb 06, 2014
Andrea Fontana
Feb 06, 2014
Andrea Fontana
Feb 06, 2014
Cooler
Feb 06, 2014
Russel Winder
Feb 06, 2014
Cooler
February 03, 2014
I have several tasks. Each task may or may not create another task. What is the best way to wait until all tasks finished?

The code:

void procData(){
  if(...)
    taskPool.put(task(&procData));
}

void main(){
  taskPool.put(task(&procData));
  taskPool.put(task(&procData));
  ...
  taskPool.put(task(&procData));

  // Next line will block execution until all tasks already in queue finished.
  // Almost all what I need, but new tasks will not be started.
  taskPool.finish(true);
}
February 03, 2014
>   // Next line will block execution until all tasks already in queue finished.
>   // Almost all what I need, but new tasks will not be started.
>   taskPool.finish(true);
> }

Are you sure TaskPool.finish isn't what you're looking for?

"Signals worker threads to terminate when the queue becomes empty."

It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
February 03, 2014
On Monday, 3 February 2014 at 06:56:35 UTC, Dan Killebrew wrote:
>>  // Next line will block execution until all tasks already in queue finished.
>>  // Almost all what I need, but new tasks will not be started.
>>  taskPool.finish(true);
>> }
>
> Are you sure TaskPool.finish isn't what you're looking for?
>
> "Signals worker threads to terminate when the queue becomes empty."
>
> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.

No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.
February 04, 2014
>> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
>
> No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.

Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.

auto phase1 = new TaskPool();
//make sure all new tasks are added to phase1
phase1.finish(true);

auto phase2 = new TaskPool();
//make sure all new tasks are added to phase2
phase2.finish(true);
February 05, 2014
On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew wrote:
>>> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
>>
>> No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.
>
> Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.
>
> auto phase1 = new TaskPool();
> //make sure all new tasks are added to phase1
> phase1.finish(true);
>
> auto phase2 = new TaskPool();
> //make sure all new tasks are added to phase2
> phase2.finish(true);

Will not help. I don't know beforehand what tasks will be
created. procData is recursive and it decides create new task or
not.
February 05, 2014
On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
> Will not help. I don't know beforehand what tasks will be
> created. procData is recursive and it decides create new task or
> not.

You seem to be saying that you want to be able to wait for all tasks to complete an indefinite number of times, adding more tasks after each one. Why would you want to do that? The queue for the pool is infinitely long, so just keep adding tasks till you have no more tasks to add. Or if you have a progression of types, like all tasks of type A have to be complete before you can start running the tasks of type B, then you should be able to have a separate thread pool for each type.
February 06, 2014
On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
> On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew wrote:
>>>> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
>>>
>>> No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.
>>
>> Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.
>>
>> auto phase1 = new TaskPool();
>> //make sure all new tasks are added to phase1
>> phase1.finish(true);
>>
>> auto phase2 = new TaskPool();
>> //make sure all new tasks are added to phase2
>> phase2.finish(true);
>
> Will not help. I don't know beforehand what tasks will be
> created. procData is recursive and it decides create new task or
> not.


Something like this? (not tested...)

shared bool more = true;
...
...
...

void procData(){
  if(...)
  {
    taskPool.put(task(&procData));
    more = true;
  }
}

while(true)
{
   taskPool.finish(true);
   if (!more) break;
   else more = false;
}


February 06, 2014
On Thursday, 6 February 2014 at 11:30:17 UTC, Andrea Fontana wrote:
> On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
>> On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew wrote:
>>>>> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
>>>>
>>>> No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.
>>>
>>> Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.
>>>
>>> auto phase1 = new TaskPool();
>>> //make sure all new tasks are added to phase1
>>> phase1.finish(true);
>>>
>>> auto phase2 = new TaskPool();
>>> //make sure all new tasks are added to phase2
>>> phase2.finish(true);
>>
>> Will not help. I don't know beforehand what tasks will be
>> created. procData is recursive and it decides create new task or
>> not.
>
>
> Something like this? (not tested...)
>
> shared bool more = true;
> ...
> ...
> ...
>
> void procData(){
>   if(...)
>   {
>     taskPool.put(task(&procData));
>     more = true;
>   }
> }
>
> while(true)
> {
>    taskPool.finish(true);
>    if (!more) break;
>    else more = false;
> }

It is closer, but after taskPool.finish() all tries to taskPool.put() will be rejected. Let's me clear example.

import std.stdio, std.parallelism, core.thread;

shared int i;

void procData(){
  synchronized ++i;
  if(i >= 100)
    return;
  foreach(i; 0 .. 100)
    taskPool.put(task(&procData)); // New tasks will be rejected after
                                   // taskPool.finish()
}

void main(){
  taskPool.put(task(&procData));
  Thread.sleep(1.msecs); // The final output of "i" depends on duration here
  taskPool.finish(true);
  writefln("i = %s", i);
}

In the example above the total number of tasks executed depends on sleep duration.
February 06, 2014
On Thursday, 6 February 2014 at 14:42:57 UTC, Cooler wrote:
> On Thursday, 6 February 2014 at 11:30:17 UTC, Andrea Fontana wrote:
>> On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
>>> On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew wrote:
>>>>>> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
>>>>>
>>>>> No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.
>>>>
>>>> Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.
>>>>
>>>> auto phase1 = new TaskPool();
>>>> //make sure all new tasks are added to phase1
>>>> phase1.finish(true);
>>>>
>>>> auto phase2 = new TaskPool();
>>>> //make sure all new tasks are added to phase2
>>>> phase2.finish(true);
>>>
>>> Will not help. I don't know beforehand what tasks will be
>>> created. procData is recursive and it decides create new task or
>>> not.
>>
>>
>> Something like this? (not tested...)
>>
>> shared bool more = true;
>> ...
>> ...
>> ...
>>
>> void procData(){
>>  if(...)
>>  {
>>    taskPool.put(task(&procData));
>>    more = true;
>>  }
>> }
>>
>> while(true)
>> {
>>   taskPool.finish(true);
>>   if (!more) break;
>>   else more = false;
>> }
>
> It is closer, but after taskPool.finish() all tries to taskPool.put() will be rejected. Let's me clear example.
>
> import std.stdio, std.parallelism, core.thread;
>
> shared int i;
>
> void procData(){
>   synchronized ++i;
>   if(i >= 100)
>     return;
>   foreach(i; 0 .. 100)
>     taskPool.put(task(&procData)); // New tasks will be rejected after
>                                    // taskPool.finish()
> }
>
> void main(){
>   taskPool.put(task(&procData));
>   Thread.sleep(1.msecs); // The final output of "i" depends on duration here
>   taskPool.finish(true);
>   writefln("i = %s", i);
> }
>
> In the example above the total number of tasks executed depends on sleep duration.

Forgot to say - I know how to solve the topic problem. My
question is "What is the BEST way?".
One of my idea - may be introduce new function, named for example
"wait", that will block until there are working tasks?
February 06, 2014
On Thursday, 6 February 2014 at 14:52:36 UTC, Cooler wrote:
> On Thursday, 6 February 2014 at 14:42:57 UTC, Cooler wrote:
>> On Thursday, 6 February 2014 at 11:30:17 UTC, Andrea Fontana wrote:
>>> On Wednesday, 5 February 2014 at 15:38:14 UTC, Cooler wrote:
>>>> On Tuesday, 4 February 2014 at 03:26:04 UTC, Dan Killebrew wrote:
>>>>>>> It seems to me that worker threads will continue as long as the queue isn't empty. So if a task adds another task to the pool, some worker will process the newly enqueued task.
>>>>>>
>>>>>> No. After taskPool.finish() no way to add new tasks to the queue. taskPool.put will not add new tasks.
>>>>>
>>>>> Then perhaps you need to create a new TaskPool (and make sure that workers add their tasks to the correct task pool), so that you can wait on the first task pool, then wait on the second task pool, etc.
>>>>>
>>>>> auto phase1 = new TaskPool();
>>>>> //make sure all new tasks are added to phase1
>>>>> phase1.finish(true);
>>>>>
>>>>> auto phase2 = new TaskPool();
>>>>> //make sure all new tasks are added to phase2
>>>>> phase2.finish(true);
>>>>
>>>> Will not help. I don't know beforehand what tasks will be
>>>> created. procData is recursive and it decides create new task or
>>>> not.
>>>
>>>
>>> Something like this? (not tested...)
>>>
>>> shared bool more = true;
>>> ...
>>> ...
>>> ...
>>>
>>> void procData(){
>>> if(...)
>>> {
>>>   taskPool.put(task(&procData));
>>>   more = true;
>>> }
>>> }
>>>
>>> while(true)
>>> {
>>>  taskPool.finish(true);
>>>  if (!more) break;
>>>  else more = false;
>>> }
>>
>> It is closer, but after taskPool.finish() all tries to taskPool.put() will be rejected. Let's me clear example.
>>
>> import std.stdio, std.parallelism, core.thread;
>>
>> shared int i;
>>
>> void procData(){
>>  synchronized ++i;
>>  if(i >= 100)
>>    return;
>>  foreach(i; 0 .. 100)
>>    taskPool.put(task(&procData)); // New tasks will be rejected after
>>                                   // taskPool.finish()
>> }
>>
>> void main(){
>>  taskPool.put(task(&procData));
>>  Thread.sleep(1.msecs); // The final output of "i" depends on duration here
>>  taskPool.finish(true);
>>  writefln("i = %s", i);
>> }
>>
>> In the example above the total number of tasks executed depends on sleep duration.
>
> Forgot to say - I know how to solve the topic problem. My
> question is "What is the BEST way?".
> One of my idea - may be introduce new function, named for example
> "wait", that will block until there are working tasks?

What about sync ++taskCount when you put() something and --taskCount when task is done? And on main while(i > 0) Thread.yield(); ?
« First   ‹ Prev
1 2