August 09, 2023

hi all,

question about runWorkerTaskH, since I have a CPU intensive task and need to execute it in thread.

In the example bellow, for each WS connection a new worker task is started. Since the task will perform heavy computations, I expect that each connection will have its own new thread.

But logs and task manager show me a strange behaviour, where computations for different WS are done on same thread???

Maybe I do something wrong?

/*
 * Case 01
 * -------
 * Worker task is executed always in same worker thread.
 *
 * How-To:
 * - open 4 parallel WS connection to service.
 * - in each connection send channel id, eg. "ch01", "ch02", ...
 * - observe logs
 * - worker thread name is always same
 */

import vibe.d;
import vibe.vibe;
import vibe.core.core;
import vibe.http.server;
import vibe.http.websockets;
import vibe.http.router;
import vibe.inet.url;

import core.time;
import core.thread : Thread;
import std.conv;

static void workerFuncPingPong(Task caller, string channel_id) nothrow {
	int counter = 5;
	try {
		logInfo("WORKER :: thread-id=%s caller=%s channel-id=%s THREAD=%s", thisTid, caller, channel_id, Thread.getThis().name);
		while (receiveOnly!string() == "ping" && --counter) {
			logInfo("%s :: %s :: pong=%s", Thread.getThis().name, channel_id, counter);
			caller.send("pong");
			sleep(2.seconds);
		}
		caller.send("goodbye");
	} catch (Exception e) assert(false, e.msg);
}

class WebsocketService {
	@path("/ws") void getWebsocket1(scope WebSocket ws){
		logInfo("X> connected=%s, ws=%s code=%s THREAD=%s", ws.connected, &ws, ws.closeCode, Thread.getThis().name);
		
		auto channel_id = ws.receiveText;
		logInfo("Receive channel '%s'.", channel_id);

		auto callee = runWorkerTaskH(&workerFuncPingPong, Task.getThis, channel_id);
		do {
			logInfo("ping");
			callee.send("ping");
		} while (receiveOnly!string() == "pong");
		logInfo("Client disconnected - worker is done. THREAD=%s", Thread.getThis().name);
	}
}

void helloWorld(HTTPServerRequest req, HTTPServerResponse res)
{	
    res.writeBody("Hello");
}

void main()
{
	logInfo("APP::CASE::01");
	auto router = new URLRouter;
	router.registerWebInterface(new WebsocketService());
	router.get("/hello", &helloWorld);

	auto settings = new HTTPServerSettings;
	settings.port = 8080;
	settings.bindAddresses = ["::1", "127.0.0.1"];

	auto listener = listenHTTP(settings, router);
	scope (exit)
	{
		listener.stopListening();
	}

	runApplication();
}

and here is console output

[main(----) INF] APP::CASE::01
[main(----) INF] Listening for requests on http://[::1]:8080/
[main(----) INF] Listening for requests on http://127.0.0.1:8080/
[main(8ZfR) INF] X> connected=true, ws=7F67F6D66DD8 code=0 THREAD=main
[main(W/zo) INF] X> connected=true, ws=7F67F5D65DD8 code=0 THREAD=main
[main(9a8B) INF] X> connected=true, ws=7F67F7D67DD8 code=0 THREAD=main
[main(KFgz) INF] X> connected=true, ws=7F67F8D68DD8 code=0 THREAD=main

[main(8ZfR) INF] Receive channel '3-fibonacci'.
[main(W/zo) INF] Receive channel '1-fibonacci'.
[main(9a8B) INF] Receive channel '2-fibonacci'.
[main(KFgz) INF] Receive channel '0-fibonacci'.

[vibe-4(wpkK) INF] WORKER :: thread-id=Tid(7f67fb24c8f0) caller=7F67FB239A00:1 channel-id=3-fibonacci THREAD=vibe-4
[vibe-14(Prm2) INF] WORKER :: thread-id=Tid(7f67fb24cd10) caller=7F67FB239C00:1 channel-id=1-fibonacci THREAD=vibe-14
[vibe-14(KRJj) INF] WORKER :: thread-id=Tid(7f67fb257000) caller=7F67FB239800:1 channel-id=2-fibonacci THREAD=vibe-14
[vibe-14(NkC2) INF] WORKER :: thread-id=Tid(7f67fb2570b0) caller=7F67FB239600:5 channel-id=0-fibonacci THREAD=vibe-14
[main(W/zo) INF] ping
[main(8ZfR) INF] ping
[main(9a8B) INF] ping
[main(KFgz) INF] ping
[vibe-14(Prm2) INF] vibe-14 :: 1-fibonacci :: pong=4
[vibe-4(wpkK) INF] vibe-4 :: 3-fibonacci :: pong=4
[vibe-14(KRJj) INF] vibe-14 :: 2-fibonacci :: pong=4
[vibe-14(NkC2) INF] vibe-14 :: 0-fibonacci :: pong=4
...

Above I would expect 4 different thread names since I have 4 ws connections running, but instead I have 4 connections and only 2 threads!

Thanks in advance for help and hints!