Thread overview
GtkD code review - How to update a progressbar using data sharing concurrency
Jun 20, 2020
adnan338
Jun 21, 2020
Ali Çehreli
Jun 21, 2020
adnan338
Jun 21, 2020
adnan338
Jun 22, 2020
Ali Çehreli
Jun 21, 2020
Kagamin
June 20, 2020
Hello, I need a code review on my strategy of updating a GtkD progressbar. Gtk is not thread safe, I interpret that as "I must only access data available in the main thread from the Gtk objects".

This example is a simplified excerpt of my project. I have never done concurrency before and thus I would like a code review. The goal here is

1. Downloading a list of file in parallel
2. Update the gtk progressbar periodically to show the overall download progress.

import gio.Application : GioApplication = Application;
import gtk.Application : Application;
import gtk.ApplicationWindow : ApplicationWindow;
import gtk.ProgressBar : ProgressBar;
import glib.Timeout : Timeout;
import gtkc.gtktypes : GApplicationFlags, GPriority;

class Downloader
{
	string[] links = [`link1`, `link2`, `link3`, `link4`];
	private shared size_t completed = 0;

	double getFraction()
	{
		return cast(double) completed / links.length;
	}

	static void start(ref Downloader downloader)
	{
		import std.parallelism : parallel;
		import core.thread : Thread, seconds;

		{
			// emulate HTTP response overhead;
			Thread.sleep(seconds(2));
		}
		synchronized
		{
			// emulate random Download time
			import std.random : Random, uniform;

			auto rnd = Random(4361);

			foreach (_; downloader.links.parallel())
			{
				Thread.sleep(uniform(0, 6, rnd).seconds());
				++cast() downloader.completed;
			}
		}
	}
}

class ProgressIndicatorBar : ProgressBar
{
	this()
	{
		super.setShowText(true);
		super.setPulseStep(0.2);
	}
}

class PrimaryWindow : ApplicationWindow
{
	const int width = 320, height = 100;
	ProgressIndicatorBar pib;
	this(Application app)
	{
		super(app);
		super.setSizeRequest(width, height);
		scope (success)
			super.showAll();

		pib = new ProgressIndicatorBar();
		scope (success)
			add(pib);

		auto downloader = new Downloader();

		import std.parallelism : task;

		auto downloadTask = task!(Downloader.start)(downloader);
		downloadTask.executeInNewThread();

		auto timeout = new Timeout(100, delegate bool() {
			if (downloader.completed < downloader.links.length)
			{
				if (downloader.completed == 0)
				{
					pib.setText(`Awaiting response...`);
					pib.pulse();
				}
				else
				{
					pib.setText(`Downloading...`);
					pib.setFraction(downloader.getFraction());
				}
				return true;
			}
			else
			{
				super.setTitle(`Downloading complete`);
				// pib.setShowText(false);
				pib.setVisible(false);
				return false;
			}
		}, GPriority.HIGH);
	}
}

int main(string[] args)
{
	auto application = new Application(`org.gitlab.helloprogressbar`, GApplicationFlags.FLAGS_NONE);
	application.addOnActivate(delegate void(GioApplication app) {
		auto appWindow = new PrimaryWindow(application);
	});
	return application.run(args);
}

June 21, 2020
On 6/20/20 9:30 AM, adnan338 wrote:

> Hello, I need a code review on my strategy

I don't know gtkd so I did not compile the code and I did not review the code very carefully.

However, I don't think you need to 'synchronized' the whole parallel loop. Since there is only one thread that executes start(), that synchronized cannot have any effect at all. What you want to synchronize is the mutating access to 'completed' by the threads that parallel() starts automatically. So, move 'synchronized' just around that expression:

// REMOVE this one:
// synchronized
// {

  foreach (_; downloader.links.parallel())
  {
      Thread.sleep(uniform(0, 6, rnd).seconds());

      // ADD this one:
      synchronized {
        ++cast() downloader.completed;
      }
  }

// }

Ali

June 21, 2020
Not sure how much synchronization do you want to do.

import gio.Application : GioApplication = Application;
import gtk.Application : Application;
import gtk.ApplicationWindow : ApplicationWindow;
import gtk.ProgressBar : ProgressBar;
import glib.Timeout : Timeout;
import gtkc.gtktypes : GApplicationFlags, GPriority;

shared class Downloader
{
	import core.atomic;
	string[] links = [`link1`, `link2`, `link3`, `link4`];
	private size_t completed = 0;

	size_t count()
	{
		return atomicLoad(completed);
	}

	void increment()
	{
		atomicOp!"+="(completed, 1);
	}

	static void start(shared Downloader downloader)
	{
		import std.parallelism : parallel;
		import core.thread : Thread, seconds;

		{
			// emulate HTTP response overhead;
			Thread.sleep(seconds(2));
		}
		{
			// emulate random Download time
			import std.random : Random, uniform;

			auto rnd = Random(4361);

			foreach (ref link; downloader.links.parallel())
			{
				Thread.sleep(uniform(0, 6, rnd).seconds());
				downloader.increment();
			}
		}
	}
}

class ProgressIndicatorBar : ProgressBar
{
	this()
	{
		super.setShowText(true);
		super.setPulseStep(0.2);
	}
}

class PrimaryWindow : ApplicationWindow
{
	const int width = 320, height = 100;
	ProgressIndicatorBar pib;
	this(Application app)
	{
		super(app);
		super.setSizeRequest(width, height);
		scope (success)
			super.showAll();

		pib = new ProgressIndicatorBar();
		scope (success)
			add(pib);

		shared downloader = new shared Downloader();

		import std.parallelism : task;

		auto downloadTask = task!(Downloader.start)(downloader);
		downloadTask.executeInNewThread();

		auto timeout = new Timeout(100, delegate bool() {
			const long completed=downloader.count, total=downloader.links.length;
			if (completed < total)
			{
				if (completed == 0)
				{
					pib.setText(`Awaiting response...`);
					pib.pulse();
				}
				else
				{
					pib.setText(`Downloading...`);
					double fraction=completed; fraction/=total;
					pib.setFraction(fraction);
				}
				return true;
			}
			else
			{
				super.setTitle(`Downloading complete`);
				// pib.setShowText(false);
				pib.setVisible(false);
				return false;
			}
		}, GPriority.HIGH);
	}
}

int main(string[] args)
{
	auto application = new Application(`org.gitlab.helloprogressbar`, GApplicationFlags.FLAGS_NONE);
	application.addOnActivate(delegate void(GioApplication app) {
		auto appWindow = new PrimaryWindow(application);
	});
	return application.run(args);
}

June 21, 2020
On Sunday, 21 June 2020 at 09:16:06 UTC, Ali Çehreli wrote:
> On 6/20/20 9:30 AM, adnan338 wrote:
>
> > Hello, I need a code review on my strategy
>
> I don't know gtkd so I did not compile the code and I did not review the code very carefully.
>
> However, I don't think you need to 'synchronized' the whole parallel loop. Since there is only one thread that executes start(), that synchronized cannot have any effect at all. What you want to synchronize is the mutating access to 'completed' by the threads that parallel() starts automatically. So, move 'synchronized' just around that expression:
>
> // REMOVE this one:
> // synchronized
> // {
>
>   foreach (_; downloader.links.parallel())
>   {
>       Thread.sleep(uniform(0, 6, rnd).seconds());
>
>       // ADD this one:
>       synchronized {
>         ++cast() downloader.completed;
>       }
>   }
>
> // }
>
> Ali

Thank you. However I am concerned about a data race. I have at least two places where I am at risk of a data race.

1. In the static method `start`, where I mutably access the value `completed` from parallel threads.
    I *think* I have implemented it safely simply by making the `completed` a shared(size_t).

2. In the `timeout` delegate. The glib Timout is a struct that accepts a delegate and invokes it periodically. The ctor requires 3 values,
  i. polling time (in ms)
  ii. the delegate to execute in each polling
  iii. priority
  The return value is whether the timeout should continue. The gtk event loop executes the triggers timeout automatically afaik.

    I think I am at risk here. Before constructing the Timeout, I create a new task and invoke it in a new thread:

auto downloadTask = task!(Downloader.start)(downloader);
downloadTask.executeInNewThread();

Right after that I create a Timeout. I try to read the `completed` (which is the shared member as mentioned in the previous point) once in every 100 ms.

auto timeout = new Timeout(100, delegate bool() {
			if (downloader.completed < downloader.links.length)
			{
				if (downloader.completed == 0)
				{
					pib.setText(`Awaiting response...`);
					pib.pulse();
				}
				else
				{
					pib.setText(`Downloading...`);
					pib.setFraction(downloader.getFraction());
				}
				return true;
			}
			else
			{
				super.setTitle(`Downloading complete`);
				// pib.setShowText(false);
				pib.setVisible(false);
				return false;
			}
		}, GPriority.HIGH);

I am thinking I am prone to a data race here. The `completed` is being updated by the `start` method and also is being read by the main thread inside `timeout`

I am trying to figure out how to prevent this data race.

June 21, 2020
On Sunday, 21 June 2020 at 12:43:32 UTC, adnan338 wrote:
> On Sunday, 21 June 2020 at 09:16:06 UTC, Ali Çehreli wrote:
>> On 6/20/20 9:30 AM, adnan338 wrote:
>>
>> > Hello, I need a code review on my strategy
>>
>> I don't know gtkd so I did not compile the code and I did not review the code very carefully.
>>
>> However, I don't think you need to 'synchronized' the whole parallel loop. Since there is only one thread that executes start(), that synchronized cannot have any effect at all. What you want to synchronize is the mutating access to 'completed' by the threads that parallel() starts automatically. So, move 'synchronized' just around that expression:
>>
>> // REMOVE this one:
>> // synchronized
>> // {
>>
>>   foreach (_; downloader.links.parallel())
>>   {
>>       Thread.sleep(uniform(0, 6, rnd).seconds());
>>
>>       // ADD this one:
>>       synchronized {
>>         ++cast() downloader.completed;
>>       }
>>   }
>>
>> // }
>>
>> Ali
>
> Thank you. However I am concerned about a data race. I have at least two places where I am at risk of a data race.
>
> 1. In the static method `start`, where I mutably access the value `completed` from parallel threads.
>     I *think* I have implemented it safely simply by making the `completed` a shared(size_t).
>
> 2. In the `timeout` delegate. The glib Timout is a struct that accepts a delegate and invokes it periodically. The ctor requires 3 values,
>   i. polling time (in ms)
>   ii. the delegate to execute in each polling
>   iii. priority
>   The return value is whether the timeout should continue. The gtk event loop executes the triggers timeout automatically afaik.
>
>     I think I am at risk here. Before constructing the Timeout, I create a new task and invoke it in a new thread:
>
> auto downloadTask = task!(Downloader.start)(downloader);
> downloadTask.executeInNewThread();
>
> Right after that I create a Timeout. I try to read the `completed` (which is the shared member as mentioned in the previous point) once in every 100 ms.
>
> auto timeout = new Timeout(100, delegate bool() {
> 			if (downloader.completed < downloader.links.length)
> 			{
> 				if (downloader.completed == 0)
> 				{
> 					pib.setText(`Awaiting response...`);
> 					pib.pulse();
> 				}
> 				else
> 				{
> 					pib.setText(`Downloading...`);
> 					pib.setFraction(downloader.getFraction());
> 				}
> 				return true;
> 			}
> 			else
> 			{
> 				super.setTitle(`Downloading complete`);
> 				// pib.setShowText(false);
> 				pib.setVisible(false);
> 				return false;
> 			}
> 		}, GPriority.HIGH);
>
> I am thinking I am prone to a data race here. The `completed` is being updated by the `start` method and also is being read by the main thread inside `timeout`
>
> I am trying to figure out how to prevent this data race.

It is also worth noting that simply saving the values from a new sync block makes the `completed` remain stuck in 0;

auto downloadTask = task!(Downloader.start)(downloader);
		downloadTask.executeInNewThread();
		size_t currentlyCompleted;
		double currentFraction;
		synchronized
		{
			currentlyCompleted = cast(size_t) downloader.completed;
			currentFraction = cast(double) downloader.getFraction();
		}

		auto timeout = new Timeout(100, delegate bool() {
			if (currentlyCompleted < downloader.links.length)
			{
				if (currentlyCompleted == 0) // --- Stuck here! ---
				{
					pib.setText(`Awaiting response...`);
					pib.pulse();
				}
				else
				{
					pib.setText(`Downloading...`);
					pib.setFraction(currentFraction);
				}
				return true;
			}
			else
			{
				super.setTitle(`Downloading complete`);
				// pib.setShowText(false);
				pib.setVisible(false);
				return false;
			}
		}, GPriority.HIGH);
June 22, 2020
On 6/21/20 5:52 AM, adnan338 wrote:

>> I am trying to figure out how to prevent this data race.

I still like the std.concurrency method I used here:

  https://forum.dlang.org/post/rkitcprqvslexgqafqeh@forum.dlang.org

The only difference is that your individual progresses are from 0% to 100%. The example can be changed easily to report 100% once at the end of each download.

Ali