Jump to page: 1 2
Thread overview
Starting and managing threads
Dec 27, 2021
Bagomot
Dec 27, 2021
Ali Çehreli
Dec 28, 2021
Bagomot
Dec 28, 2021
Tejas
Dec 28, 2021
Bagomot
Dec 28, 2021
Tejas
Dec 28, 2021
Bagomot
Jan 12, 2022
Bagomot
Jan 15, 2022
Bagomot
Jan 15, 2022
frame
Jan 15, 2022
Bagomot
Jan 15, 2022
Ali Çehreli
Jan 16, 2022
Bagomot
Jan 16, 2022
forkit
December 27, 2021

Hello everybody!

My program uses the fswatch library to track changes in a directory. It runs on the main thread of the program. I need it to do its work in a separate thread, without blocking the main one. In addition, I need to be able to terminate the thread at the moment I want from the main thread of the program.

I tried to get my head around Thread and Fiber but still didn't figure out how to properly start and manage threads. I have using Thread turns it into a zombie when the main thread of the program ends.

I will not even write my code here, because it is at the level of examples from the documentation. Please tell me how to start threads correctly, how to manage them and how to end them without turning them into zombies.

December 27, 2021
On 12/27/21 1:33 AM, Bagomot wrote:

> separate thread, without blocking the main one.

I think you can use std.concurrency there. I have a chapter here:

  http://ddili.org/ders/d.en/concurrency.html

Look for 'struct Exit' to see how the main thread signals workers to stop running.

And some std.concurrency hints appear in my DConf Online 2020 presentation here:

  https://dconf.org/2020/online/#ali1

Ali

December 28, 2021

On Monday, 27 December 2021 at 10:59:07 UTC, Ali Çehreli wrote:

>

On 12/27/21 1:33 AM, Bagomot wrote:

>

separate thread, without blocking the main one.

I think you can use std.concurrency there. I have a chapter here:

http://ddili.org/ders/d.en/concurrency.html

Look for 'struct Exit' to see how the main thread signals workers to stop running.

And some std.concurrency hints appear in my DConf Online 2020 presentation here:

https://dconf.org/2020/online/#ali1

Ali

I tried to run with std.concurrency via spawn, but this does not work for me for the reason that in the program I run the thread not from main, but from the object. It looks something like this:

import std.concurrency;
import std.thread;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		Watcher[] watchers;
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (Watcher watcher; this.watchers) {
			spawn(&watcher.run);
		}
	}
}

class Watcher {
	public void run() {
		while (true) {
			// job
		}
	}
}

Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate()).

I would not want to do this from main because it breaks the structure of my program. Is there a way to do it the way I want?

December 28, 2021

On Tuesday, 28 December 2021 at 14:19:46 UTC, Bagomot wrote:

>

On Monday, 27 December 2021 at 10:59:07 UTC, Ali Çehreli wrote:

>

On 12/27/21 1:33 AM, Bagomot wrote:

>

separate thread, without blocking the main one.

I think you can use std.concurrency there. I have a chapter here:

http://ddili.org/ders/d.en/concurrency.html

Look for 'struct Exit' to see how the main thread signals workers to stop running.

And some std.concurrency hints appear in my DConf Online 2020 presentation here:

https://dconf.org/2020/online/#ali1

Ali

I tried to run with std.concurrency via spawn, but this does not work for me for the reason that in the program I run the thread not from main, but from the object. It looks something like this:

import std.concurrency;
import std.thread;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		Watcher[] watchers;
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (Watcher watcher; this.watchers) {
			spawn(&watcher.run);
		}
	}
}

class Watcher {
	public void run() {
		while (true) {
			// job
		}
	}
}

Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate()).

I would not want to do this from main because it breaks the structure of my program. Is there a way to do it the way I want?

Yes, you'll have to make the function that you want to run static and all the arguments must be shared qualified, ie, TLS not allowed


import std.concurrency;
import core.thread;
import std.stdio:writeln;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		shared Watcher[] watchers = [new Watcher(), new Watcher()]; //notice the shared // I used 2 values just to show some output
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (ref/+use ref to ensure no copies are made. I don't tknow the right thing to do here, the errors went away when I used ref so...+/ watcher; this.watchers) {
			spawn(&Watcher.run, watcher);
		}
	}
}

class Watcher {
	static public void run(shared Watcher watcher/+sending as argument since function can't have an invisible this parameter anymore+/) {//now this is static
		while (true) {
			// job
      writeln("It works now :D");
      break; //wrote this so that you can copy-paste to run.dlang.io
		}
	}
}
December 28, 2021

On Tuesday, 28 December 2021 at 15:42:04 UTC, Tejas wrote:

>

On Tuesday, 28 December 2021 at 14:19:46 UTC, Bagomot wrote:

>

On Monday, 27 December 2021 at 10:59:07 UTC, Ali Çehreli wrote:

>

On 12/27/21 1:33 AM, Bagomot wrote:

>

separate thread, without blocking the main one.

I think you can use std.concurrency there. I have a chapter here:

http://ddili.org/ders/d.en/concurrency.html

Look for 'struct Exit' to see how the main thread signals workers to stop running.

And some std.concurrency hints appear in my DConf Online 2020 presentation here:

https://dconf.org/2020/online/#ali1

Ali

I tried to run with std.concurrency via spawn, but this does not work for me for the reason that in the program I run the thread not from main, but from the object. It looks something like this:

import std.concurrency;
import std.thread;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		Watcher[] watchers;
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (Watcher watcher; this.watchers) {
			spawn(&watcher.run);
		}
	}
}

class Watcher {
	public void run() {
		while (true) {
			// job
		}
	}
}

Error: template std.concurrency.spawn cannot deduce function from argument types !()(void delegate()).

I would not want to do this from main because it breaks the structure of my program. Is there a way to do it the way I want?

Yes, you'll have to make the function that you want to run static and all the arguments must be shared qualified, ie, TLS not allowed


import std.concurrency;
import core.thread;
import std.stdio:writeln;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		shared Watcher[] watchers = [new Watcher(), new Watcher()]; //notice the shared // I used 2 values just to show some output
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (ref/+use ref to ensure no copies are made. I don't tknow the right thing to do here, the errors went away when I used ref so...+/ watcher; this.watchers) {
			spawn(&Watcher.run, watcher);
		}
	}
}

class Watcher {
	static public void run(shared Watcher watcher/+sending as argument since function can't have an invisible this parameter anymore+/) {//now this is static
		while (true) {
			// job
      writeln("It works now :D");
      break; //wrote this so that you can copy-paste to run.dlang.io
		}
	}
}

I can't do it according to your example, my Watcher list fills up at runtime.

December 28, 2021

On Tuesday, 28 December 2021 at 16:29:05 UTC, Bagomot wrote:

>

I can't do it according to your example, my Watcher list fills up at runtime.

Yes, it's possible to do it at runtime as well(it already was happening at runtime), although I'll be using a cast for convenience now.

import std.concurrency;
import core.thread;
import std.stdio:writeln,readf;
void main() {
  writeln("Please enter num of elements");
  int a;
  readf!"%d"(a);
  foreach(number; 0..a){
    Test.getInstance.watchers ~= new Watcher();//will have to use operations from core.atomic if you want to read/write shared variables, that's why I didn't declare the array as shared
  }
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		/+shared+/ Watcher[] watchers;
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (ref watcher; cast(shared)/+using cast so that TLS gets disabled)+/this.watchers) {
			spawn(&Watcher.run, watcher);
		}
	}
}

class Watcher {
	static public void run(shared Watcher watcher) {
		while (true) {
			// job
      writeln("It works now :D");
      break;
		}
	}
}
December 28, 2021
Thanks! It works.
Perhaps there will still be difficulties, I will write here.
January 12, 2022

Good day! I keep giving rise to problems.
Above, Tejas helped me a lot, but still doesn't work.
I gave up using the fswatch library, thinking that the problem was in it.
Now trying to do it using libasync.

Here is the code that runs on the main thread, it blocks further actions on that thread.

#!/usr/bin/env dub
/+ dub.sdl:
	dependency "libasync" version="~>0.8.6"
+/

import std.stdio;
import std.file;
import std.algorithm;
import core.thread;
import std.concurrency;

import libasync;
import libasync.watcher;
import libasync.threads;

void main() {

	string testDir = "temp";
	if (!testDir.exists)
		mkdir(testDir);

	Guard guard = Guard.getInstance;
	guard.addWatchedDir(testDir, false);
	guard.run;

	writeln("Some kind of action...");
}

class Guard {
	private {
		__gshared Guard instance;
		static EventLoop eventLoop;
		WatchedDir[] watchedDirs;
	}

	protected this() {
		this.eventLoop = getThreadEventLoop();
	}

	shared static ~this() {
		destroyAsyncThreads();
	}

	public static Guard getInstance() {
		if (!instance) {
			synchronized (Guard.classinfo) {
				if (!instance)
					instance = new Guard;
			}
		}

		return instance;
	}

	public void run() {
		while (eventLoop.loop()) {
			continue;
		}
	}

	public void addWatchedDir(string dir, bool recursive = true, string[] exclude = [
		]) {
		if (this.watchedDirs.canFind!(a => a.dir == dir))
			return;

		this.watchedDirs ~= new WatchedDir(dir, recursive, exclude);
	}

	class WatchedDir {
		private {
			string dir;
			bool recursive;
			string[] exclude;
			AsyncDirectoryWatcher watcher;
			DWChangeInfo[8] changeBuf;
		}

		this(string dir, bool recursive, string[] exclude) {
			this.dir = dir;
			this.recursive = recursive;
			this.exclude = exclude;
			this.watcher = new AsyncDirectoryWatcher(eventLoop);

			this.watcher.run({
				DWChangeInfo[] changes = changeBuf[];
				uint cnt;

				do {
					cnt = this.watcher.readChanges(changes);
					foreach (i; 0 .. cnt) {
						writeln("Main Callback got directory event: ", changes[i]);
					}
				}
				while (cnt > 0);
			});
			this.watcher.watchDir(this.dir, DWFileEvent.ALL, this.recursive);
		}
	}
}

If I change the run method of the Guard class so that it starts a new thread, the program just does nothing:

public void run() {
	spawn((shared EventLoop eventLoop) {
		while ((cast() eventLoop).loop()) {
			continue;
		}
	}, cast(shared) this.eventLoop);
}

Why? What am I doing wrong?

January 15, 2022

On Wednesday, 12 January 2022 at 08:50:09 UTC, Bagomot wrote:

>

Good day! I keep giving rise to problems.
Above, Tejas helped me a lot, but still doesn't work.
I gave up using the fswatch library, thinking that the problem was in it.
Now trying to do it using libasync.

Here is the code that runs on the main thread, it blocks further actions on that thread.

#!/usr/bin/env dub
/+ dub.sdl:
	dependency "libasync" version="~>0.8.6"
+/

import std.stdio;
import std.file;
import std.algorithm;
import core.thread;
import std.concurrency;

import libasync;
import libasync.watcher;
import libasync.threads;

void main() {

	string testDir = "temp";
	if (!testDir.exists)
		mkdir(testDir);

	Guard guard = Guard.getInstance;
	guard.addWatchedDir(testDir, false);
	guard.run;

	writeln("Some kind of action...");
}

class Guard {
	private {
		__gshared Guard instance;
		static EventLoop eventLoop;
		WatchedDir[] watchedDirs;
	}

	protected this() {
		this.eventLoop = getThreadEventLoop();
	}

	shared static ~this() {
		destroyAsyncThreads();
	}

	public static Guard getInstance() {
		if (!instance) {
			synchronized (Guard.classinfo) {
				if (!instance)
					instance = new Guard;
			}
		}

		return instance;
	}

	public void run() {
		while (eventLoop.loop()) {
			continue;
		}
	}

	public void addWatchedDir(string dir, bool recursive = true, string[] exclude = [
		]) {
		if (this.watchedDirs.canFind!(a => a.dir == dir))
			return;

		this.watchedDirs ~= new WatchedDir(dir, recursive, exclude);
	}

	class WatchedDir {
		private {
			string dir;
			bool recursive;
			string[] exclude;
			AsyncDirectoryWatcher watcher;
			DWChangeInfo[8] changeBuf;
		}

		this(string dir, bool recursive, string[] exclude) {
			this.dir = dir;
			this.recursive = recursive;
			this.exclude = exclude;
			this.watcher = new AsyncDirectoryWatcher(eventLoop);

			this.watcher.run({
				DWChangeInfo[] changes = changeBuf[];
				uint cnt;

				do {
					cnt = this.watcher.readChanges(changes);
					foreach (i; 0 .. cnt) {
						writeln("Main Callback got directory event: ", changes[i]);
					}
				}
				while (cnt > 0);
			});
			this.watcher.watchDir(this.dir, DWFileEvent.ALL, this.recursive);
		}
	}
}

If I change the run method of the Guard class so that it starts a new thread, the program just does nothing:

public void run() {
	spawn((shared EventLoop eventLoop) {
		while ((cast() eventLoop).loop()) {
			continue;
		}
	}, cast(shared) this.eventLoop);
}

Why? What am I doing wrong?

Actual!

January 15, 2022

On Wednesday, 12 January 2022 at 08:50:09 UTC, Bagomot wrote:

>

Why? What am I doing wrong?

I guess your main() exits and just ends all threads?

« First   ‹ Prev
1 2