Hello there,
I wanted to create a multi-process program, and share some data between the processes (for a heavy task, so a thread wouldn't be efficient). However, I didn't see any way to multi-process a function in the druntime/phobos, so I created a simple Process class :
module process;
import core.sys.posix.unistd : fork, _exit;
import core.sys.posix.sys.wait;
import std.process : ProcessException;
import std.functional : toDelegate;
class Process
{
private:
pid_t pid;
public:
this(T, Args...)(T function(Args) fn, Args args)
{
this(fn.toDelegate, args);
}
this(T, Args...)(T delegate(Args) dg, Args args)
if (is(T == int) || is(T == void))
{
pid_t pid = fork();
if (pid < 0)
{
throw ProcessException.newFromErrno("Failed to spawn new process");
}
else if (pid == 0)
{
// child
static if (is(T == int))
{
_exit(dg(args));
}
else
{
dg(args);
_exit(0);
}
}
else
{
// parent
this.pid = pid;
}
}
int wait()
{
int status = void;
waitpid(pid, &status, 0);
return WEXITSTATUS(status);
}
}
Nice. Now, let's use it :
import process;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;
void job(int i)
{
Thread.sleep(dur!"seconds"(1));
writeln(i);
}
void main()
{
enum NB_PROCESS = 15;
Process[NB_PROCESS] processes;
foreach(i ; 0 .. NB_PROCESS)
{
processes[i] = new Process(&job, i);
}
scope(exit)
{
foreach(i ; 0 .. NB_PROCESS)
{
processes[i].wait();
}
}
}
[x] create multiple processes.
Ok, now we need to share data between all the processes.
Maybe the synchronized
keyword ? no, that's only for threads.
Let's use a Mutex from core.sync.mutex then. Wait, nope, it does not support multi-processes, pthread_mutexattr_getpshared
is not set to PTHREAD_PROCESS_SHARED
[source code][man].
So, I need to modify it. Let's copy-pasta core.sync.mutex
and add the following modifications to the line 92 :
!pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) ||
abort("Error: pthread_mutexattr_setpshared failed.");
Rename the module to avoid conflicts, and voilĂ !
[x] Mutex for multiple processes.
Now let's give it a try. But how to check if it correctly works ? We need to create a shared memory space. Maybe a shared
variable ? No, same problem as before ; it's only for multi-threading. Arg, got it. D isn't made for multi-processing, I'll code it.
3 hours later :
module shared_memory;
import core.sys.posix.sys.mman;
import core.sys.posix.unistd;
import core.sys.posix.fcntl;
import std.conv : emplace;
class SharedMemory(T, Args...)
{
private:
static if (is(T == class))
T cl;
T* ptr;
string ident;
public:
@nogc
this(string identifier, Args args)
{
ident = identifier;
static if (is(T == class))
size_t size = __traits(classInstanceSize, T);
else
size_t size = T.sizeof;
int shm_fd = shm_open(ident.ptr, O_CREAT | O_RDWR, 0x1b6); // 0x1b6 = 0666 (octal)
ftruncate(shm_fd, size);
void[] memory = mmap(null, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0)[0..size];
static if (is(T == class))
{
cl = emplace!(T, Args)(memory, args);
ptr = &cl;
}
else
{
ptr = emplace!(T, Args)(memory, args);
}
}
//@nogc
void unlink()
{
static if (is(T == class))
destroy(cl);
shm_unlink(ident.ptr); // shm_unlink also closes the file descriptor
}
T* data()
{
return ptr;
}
}
Seems to work, now let's create a test program :
import process;
import mutex;
import shared_memory;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;
void job(int i, SharedMemory!Mutex mutex, SharedMemory!ulong sm)
{
auto m = *mutex.data;
Thread.sleep(dur!"msecs"(1000)); // force the scheduler to change the execution context
m.lock();
Thread.sleep(dur!"msecs"(100)); // ditto
foreach (j; 1 .. 100)
*sm.data += i * j - i + i*i*i*i; // "random" calculations
m.unlock();
}
void main()
{
enum NB_PROCESS = 100;
auto sum = new SharedMemory!ulong("sum");
auto mutex = new SharedMemory!Mutex("mutex");
*sum.data = 0;
Process[NB_PROCESS] processes;
foreach(i ; 0 .. NB_PROCESS)
{
processes[i] = new Process(&job, i, mutex, sum);
}
foreach(i ; 0 .. NB_PROCESS)
{
processes[i].wait();
}
if (*sum.data != 193107012120)
{
writeln("failed ! sum: ", *sum.data);
}
import core.memory : GC;
GC.collect(); // force collection
sum.unlink();
mutex.unlink();
}
Let's run our program a hundred times : for i in 0..100; do ./mulproc; done
. No output, so everything worked properly ! Yaay !
[x] Share data between processes
Achievement get: multi-processing in D.
Fine. This was fun to do.
However, I have two questions :
- Why wasn't this implemented in druntime/phobos ?
- Can we implement this (also for Non-POSIX systems) in druntime/phobos ? If yes, how to do it properly (i.e. by not using my code)
Bests,
Luhrel