July 15, 2022

Hello there,

I wanted to create a multi-process program and share some data between the processes (for a heavy task, so threads wouldn't be efficient). However, I didn't see any way to run a function in a separate process with druntime/Phobos, so I created a simple Process class:

module process;

import core.sys.posix.unistd : fork, _exit;
import core.sys.posix.sys.wait;
import std.process : ProcessException;
import std.functional : toDelegate;

class Process
{
private:
    pid_t pid;

public:
    this(T, Args...)(T function(Args) fn, Args args)
    {
        this(fn.toDelegate, args);
    }

    this(T, Args...)(T delegate(Args) dg, Args args)
        if (is(T == int) || is(T == void))
    {
        pid_t pid = fork();
        if (pid < 0)
        {
            throw ProcessException.newFromErrno("Failed to spawn new process");
        }
        else if (pid == 0)
        {
            // child
            static if (is(T == int))
            {
                _exit(dg(args));
            }
            else
            {
                dg(args);
                _exit(0);
            }
        }
        else
        {
            // parent
            this.pid = pid;
        }
    }

    int wait()
    {
        int status = void;
        waitpid(pid, &status, 0);
        return WEXITSTATUS(status);
    }
}

Nice. Now, let's use it:

import process;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;


void job(int i)
{
    Thread.sleep(dur!"seconds"(1));
    writeln(i);
}

void main()
{
    enum NB_PROCESS = 15;

    Process[NB_PROCESS] processes;
    foreach(i ; 0 .. NB_PROCESS)
    {
        processes[i] = new Process(&job, i);
    }

    scope(exit)
    {
        foreach(i ; 0 .. NB_PROCESS)
        {
            processes[i].wait();
        }
    }
}

[x] create multiple processes.

Ok, now we need to share data between all the processes.

Maybe the synchronized keyword? No, that's only for threads.
Let's use a Mutex from core.sync.mutex then. Wait, nope, it does not support multiple processes: the mutex attribute is never set to PTHREAD_PROCESS_SHARED with pthread_mutexattr_setpshared [source code][man].

So, I need to modify it. Let's copy-pasta core.sync.mutex and add the following at line 92:

            !pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) ||
                abort("Error: pthread_mutexattr_setpshared failed.");

Rename the module to avoid conflicts, and voilà!

[x] Mutex for multiple processes.
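
For readers who want to see the same idea without patching druntime, here is a minimal sketch that sets the process-shared attribute directly through the pthread bindings. SharedCounter, createSharedCounter and countWithChildren are hypothetical names made up for this illustration, and the sketch assumes a POSIX system where MAP_ANON is available. Note that the mutex itself has to live in memory that every process can see, so it is placed in a shared anonymous mapping.

```d
import core.sys.posix.pthread;
import core.sys.posix.sys.mman;
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait : waitpid;
import core.sys.posix.unistd : fork, _exit;
import std.exception : errnoEnforce;

// Hypothetical layout: a process-shared mutex next to the value it guards.
struct SharedCounter
{
    pthread_mutex_t lock;
    ulong value;
}

// Hypothetical helper: map one SharedCounter into memory that stays shared
// across fork() and initialise its mutex as PTHREAD_PROCESS_SHARED.
// (pthread return codes are ignored here to keep the sketch short.)
SharedCounter* createSharedCounter()
{
    void* mem = mmap(null, SharedCounter.sizeof, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANON, -1, 0);
    errnoEnforce(mem != MAP_FAILED, "mmap failed");
    auto sc = cast(SharedCounter*) mem;

    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    pthread_mutex_init(&sc.lock, &attr);
    pthread_mutexattr_destroy(&attr);
    sc.value = 0;
    return sc;
}

// Fork `children` processes; each one increments the counter `rounds` times
// while holding the lock. Returns the total seen by the parent.
ulong countWithChildren(int children, int rounds)
{
    auto sc = createSharedCounter();
    auto pids = new pid_t[children];
    foreach (ref pid; pids)
    {
        pid = fork();
        errnoEnforce(pid >= 0, "fork failed");
        if (pid == 0)
        {
            // child
            foreach (_; 0 .. rounds)
            {
                pthread_mutex_lock(&sc.lock);
                ++sc.value;
                pthread_mutex_unlock(&sc.lock);
            }
            _exit(0);
        }
    }
    foreach (pid; pids)
    {
        int status;
        waitpid(pid, &status, 0);
    }
    immutable total = sc.value;
    pthread_mutex_destroy(&sc.lock);
    munmap(sc, SharedCounter.sizeof);
    return total;
}

void main()
{
    import std.stdio : writeln;
    writeln(countWithChildren(4, 10_000));
}
```

If the lock really is shared between the processes, the printed total should equal children * rounds on every run.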

Now let's give it a try. But how to check that it works correctly? We need to create a shared memory space. Maybe a shared variable? No, same problem as before; it's only for multi-threading. Argh, got it. D isn't made for multi-processing, so I'll code it myself.

3 hours later:
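
To illustrate why an ordinary variable is not enough, here is a small sketch (the helper name writeFromChild is made up, and MAP_ANON is assumed to be available). After fork() the child works on its own copy of every global, so only memory mapped with MAP_SHARED is seen by both sides.

```d
import core.sys.posix.sys.mman;
import core.sys.posix.sys.wait : waitpid;
import core.sys.posix.unistd : fork, _exit;
import std.exception : errnoEnforce;

__gshared int plainGlobal = 0; // duplicated into the child at fork()

// Hypothetical helper: the child writes 42 to both locations, and the
// function returns what the parent sees afterwards as
// [plainGlobal, value in the shared mapping].
int[2] writeFromChild()
{
    plainGlobal = 0;
    void* mem = mmap(null, int.sizeof, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANON, -1, 0);
    errnoEnforce(mem != MAP_FAILED, "mmap failed");
    auto mapped = cast(int*) mem;
    *mapped = 0;

    auto pid = fork();
    errnoEnforce(pid >= 0, "fork failed");
    if (pid == 0)
    {
        // child: the first write only touches the child's private copy
        plainGlobal = 42;
        *mapped = 42;
        _exit(0);
    }

    int status;
    waitpid(pid, &status, 0);
    int[2] seen = [plainGlobal, *mapped];
    munmap(mem, int.sizeof);
    return seen;
}

void main()
{
    import std.stdio : writeln;
    auto seen = writeFromChild();
    writeln("plain global: ", seen[0], ", shared mapping: ", seen[1]);
    // prints "plain global: 0, shared mapping: 42"
}
```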

module shared_memory;

import core.sys.posix.sys.mman;
import core.sys.posix.unistd;
import core.sys.posix.fcntl;
import std.conv : emplace;

class SharedMemory(T, Args...)
{
private:
    static if (is(T == class))
        T cl;
    T* ptr;
    string ident;

public:
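    this(string identifier, Args args)
    {
        import std.exception : errnoEnforce;

        // D strings are not guaranteed to be null-terminated, so keep a
        // terminated copy for shm_open/shm_unlink (this allocates, hence no @nogc).
        ident = identifier ~ '\0';

        static if (is(T == class))
            size_t size = __traits(classInstanceSize, T);
        else
            size_t size = T.sizeof;

        int shm_fd = shm_open(ident.ptr, O_CREAT | O_RDWR, 0x1b6); // 0x1b6 = 0666 (octal)
        errnoEnforce(shm_fd != -1, "shm_open failed");
        errnoEnforce(ftruncate(shm_fd, size) == 0, "ftruncate failed");
        void* addr = mmap(null, size, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
        errnoEnforce(addr != MAP_FAILED, "mmap failed");
        void[] memory = addr[0 .. size];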

        static if (is(T == class))
        {
            cl = emplace!(T, Args)(memory, args);
            ptr = &cl;
        }
        else
        {
            ptr = emplace!(T, Args)(memory, args);
        }
    }

    //@nogc
    void unlink()
    {
        static if (is(T == class))
            destroy(cl);
        shm_unlink(ident.ptr); // removes the name only; it does not close the descriptor or unmap the memory
    }

    T* data()
    {
        return ptr;
    }
}

Seems to work, now let's create a test program:

import process;
import mutex;
import shared_memory;

import std.stdio;
import core.thread.osthread : Thread;
import core.time;

void job(int i, SharedMemory!Mutex mutex, SharedMemory!ulong sm)
{
    auto m = *mutex.data;
    Thread.sleep(dur!"msecs"(1000)); // force the scheduler to change the execution context
    m.lock();
    Thread.sleep(dur!"msecs"(100)); // ditto
    foreach (j; 1 .. 100)
        *sm.data += i * j - i + i*i*i*i; // "random" calculations
    m.unlock();
}

void main()
{
    enum NB_PROCESS = 100;

    auto sum = new SharedMemory!ulong("sum");
    auto mutex = new SharedMemory!Mutex("mutex");

    *sum.data = 0;

    Process[NB_PROCESS] processes;
    foreach(i ; 0 .. NB_PROCESS)
    {
        processes[i] = new Process(&job, i, mutex, sum);
    }

    foreach(i ; 0 .. NB_PROCESS)
    {
        processes[i].wait();
    }

    if (*sum.data != 193107012120)
    {
        writeln("failed ! sum: ", *sum.data);
    }

    import core.memory : GC;
    GC.collect(); // force collection

    sum.unlink();
    mutex.unlink();
}

Let's run our program a hundred times: for i in {1..100}; do ./mulproc; done. No output, so everything worked properly! Yaay!

[x] Share data between processes

Achievement get: multi-processing in D.


Fine. This was fun to do.

However, I have two questions:

  • Why wasn't this implemented in druntime/Phobos?
  • Can we implement this (also for non-POSIX systems) in druntime/Phobos? If yes, how to do it properly (i.e. without using my code)?

Bests,
Luhrel