Tracing/Profiling D Applications
May 25, 2022

I experimented with application-level tracing/profiling of D applications, similar to what is described in https://dlang.org/blog/2020/03/13/tracing-d-applications/ as the "writef-based approach". The only difference is that I am emitting JSON (https://docs.google.com/document/d/1CvAClvFfyA5R-PhYUmn5OOQtYMH4h6I0nSsKchNAySU/edit#heading=h.lpfof2aylapb) that is compatible with https://ui.perfetto.dev/, which gives nice interactive insight into the data.

The code is available at https://github.com/gizmomogwai/profiled and can be used as a dub package by registering it locally with dub add-local. The GitHub repository also contains a screenshot, produced on a small project of mine with a patched unit-threaded library that emits the tracing information.

I do have a few questions on how to do things in D, though:

  1. I went for a singleton for storing tracing/logging information that needs to be initialized manually. Is __gshared the right way to do that?

  2. The data is stored in a std.array.Appender protected by a mutex and only printed as JSON later. Is there a more elegant way to do this than the scope(exit) constructs to unlock the mutex?

  3. For the tracing/profiling events I went with subclasses of an abstract event class (I only implemented one kind of event, whereas the tracing JSON format supports a lot more scenarios). When one is interested in when something happens and how long it takes, one can call
    public Unique!CompleteEventProcess start(string name) on the profiler, which returns a unique pointer that is automatically deleted when it goes out of scope (and takes the end time at that point). Is this a reasonable way to do it (see the sketch after this list)? E.g. it would be an error if the unique pointer were passed to another thread, because the CompleteEvents of the tracing format expect to start and stop on the same thread.

  4. Would something like that be interesting for others as a dub package (ATM it's so small that having it as an external dependency is not worth it, IMHO)?

  5. The complete event normally expects the real process and thread ids as used by the OS. It looks like https://dlang.org/phobos/std_process.html#.Pid.processID delivers the process id. Is there also a way to get the "real" thread id?
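
To illustrate question 3, here is a minimal sketch of the RAII idea using a plain struct destructor instead of Unique (ScopedEvent and everything in it are made-up names, not the actual API of profiled):

import std.datetime.stopwatch : AutoStart, StopWatch;
import std.stdio : writefln;

// Hypothetical scoped event: measures from construction to destruction.
struct ScopedEvent {
  @disable this(this); // copying would run the destructor twice

  string name;
  StopWatch sw;

  this(string name) {
    this.name = name;
    sw = StopWatch(AutoStart.yes);
  }

  ~this() {
    if (sw.running) {
      writefln("%s took %s", name, sw.peek);
    }
  }
}

void main() {
  auto event = ScopedEvent("expensive work");
  // ... do the timed work ...
} // event goes out of scope here; the destructor records the duration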

Thanks in advance,
Christian

May 25, 2022
On 5/25/22 14:35, Christian Köstlin wrote:

> 1. I went for a singleton for storing tracing/logging information that
> needs to be initialized manually. Is __gshared the right way to do that?

I think this is where thread-local storage comes in handy. As the D runtime does for dmd's -profile command line switch, you can have a module-level Appender, which collects data for that thread without worrying about race conditions.

> to unlock the mutex?

You need the mutex only when each thread is exiting to combine their results. Each static ~this() could grab the mutex and add its result to the one global Appender. And that "global" Appender would be 'shared' or __gshared; doesn't matter.
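
Roughly like this (an untested sketch; all names are made up):

import core.sync.mutex : Mutex;
import std.array : Appender;

// Module-level variables are thread-local by default in D, so each
// thread appends to its own Appender without any synchronization.
Appender!(string[]) threadEvents;

// The single global sink, touched only at thread shutdown.
__gshared Appender!(string[]) allEvents;
__gshared Mutex allEventsMutex;

shared static this() {
  allEventsMutex = new Mutex();
}

static ~this() { // runs once per thread, as the thread exits
  allEventsMutex.lock();
  scope (exit) allEventsMutex.unlock();
  allEvents ~= threadEvents.data;
}

void trace(string event) {
  threadEvents ~= event; // thread-local, hence race-free
}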

Ali

May 25, 2022
On Wednesday, 25 May 2022 at 21:35:07 UTC, Christian Köstlin wrote:
> Is there also a way to get the "real" thread id?

I'm using these functions inside threads:

core.sys.windows.winbase.GetCurrentThreadId on Windows
core.sys.posix.pthread.pthread_self on Unix (assuming pthreads are used)
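
For example, wrapped in a version-gated helper (untested sketch; note that pthread_self returns an opaque pthread_t, which need not be the kernel's thread id):

import std.stdio : writeln;

version (Windows)
  import core.sys.windows.winbase : GetCurrentThreadId;
else version (Posix)
  import core.sys.posix.pthread : pthread_self;

// Returns an OS-level identifier for the calling thread.
ulong osThreadId() {
  version (Windows)
    return GetCurrentThreadId();
  else version (Posix)
    return cast(ulong) pthread_self(); // integer or pointer, depending on platform
  else
    static assert(false, "unsupported platform");
}

void main() {
  writeln("thread id: ", osThreadId());
}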


May 26, 2022

On 2022-05-26 01:05, frame wrote:
>> Is there also a way to get the "real" thread id?
>
> I'm using these functions inside threads:
>
> core.sys.windows.winbase.GetCurrentThreadId on Windows
> core.sys.posix.pthread.pthread_self on Unix (assuming pthreads are used)

Thanks, I will give those a try!

May 26, 2022

On 2022-05-25 23:56, Ali Çehreli wrote:
>> 1. I went for a singleton for storing tracing/logging information that
>> needs to be initialized manually. Is __gshared the right way to do that?
>
> I think this is where thread-local storage comes in handy. As the D runtime does for dmd's -profile command line switch, you can have a module-level Appender, which collects data for that thread without worrying about race conditions.

Nice idea. Similar to the fast singleton here: https://p0nce.github.io/d-idioms/#Leveraging-TLS-for-a-fast-thread-safe-singleton
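
From memory, that idiom caches the __gshared instance behind a thread-local flag, so the synchronized block is only ever hit once per thread (sketch adapted from the linked page):

class Profiler {
  // ... tracing state ...
}

Profiler theProfiler() {
  __gshared Profiler instance;
  static bool instantiated = false; // thread-local flag

  if (!instantiated) {
    synchronized (Profiler.classinfo) {
      if (instance is null) {
        instance = new Profiler();
      }
      instantiated = true;
    }
  }
  return instance;
}

void main() {
  assert(theProfiler() is theProfiler());
}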

Not sure if this will work for me though, as I want to be able to dump tracings even while the program is still running. Then I would have to collect the tls data of all still running threads.

>> to unlock the mutex?
>
> You need the mutex only when each thread is exiting to combine their results. Each static ~this() could grab the mutex and add its result to the one global Appender. And that "global" Appender would be 'shared' or __gshared; doesn't matter.

Thanks for the explanation. From my understanding, __gshared does not put the shared type attribute on the profiler. If I use shared, I would also have to make all methods I call on the shared profiler shared, and would also have to have a shared appender (or cast shared away)?
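
E.g. something like this (a made-up sketch of what I mean):

import std.array : Appender;

class Profiler {
  private Appender!(long[]) events;

  // Callable on a shared(Profiler) only because the method is shared...
  void add(long value) shared {
    // ...and in here, events is typed shared(Appender!(long[])), whose
    // put is not shared, so one ends up casting shared away:
    (cast(Appender!(long[])*) &events).put(value); // needs a real lock!
  }
}

void main() {
  auto profiler = new shared(Profiler)();
  profiler.add(42);
}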

Kind regards,
Christian

May 26, 2022
On 5/26/22 12:54, Christian Köstlin wrote:

> I want to be able to dump
> tracings even while the program is still running. Then I would have to
> collect the tls data of all still running threads.

I am not sure without testing, but I am under the impression that mutexes can be very slow, especially if the data collection happens very frequently. The threads may not have breathing room to perform their tasks. Dumping, on the other hand, is so much slower than grabbing a mutex that the mutex would measure as zero-cost in the dumping time. :) So you may still want to collect per-thread and combine incrementally as you dump? Perhaps a premature optimization... Please test. :)

If you want to combine before dumping, then collector registration may be needed so that the dumper can contact each collector and say "please add your data".

> __gshared does not put
> the shared type attribute on the profiler.

Correct.

> If I use shared I would also
> have to make all methods I call on the shared profiler shared and also
> would have to have a shared appender (or cast shared away)?

Possibly. :/

Ali

May 27, 2022

On 2022-05-26 22:19, Ali Çehreli wrote:
>> I want to be able to dump
>> tracings even while the program is still running. Then I would have to
>> collect the tls data of all still running threads.
>
> I am not sure without testing, but I am under the impression that mutexes can be very slow, especially if the data collection happens very frequently. The threads may not have breathing room to perform their tasks. Dumping, on the other hand, is so much slower than grabbing a mutex that the mutex would measure as zero-cost in the dumping time. :) So you may still want to collect per-thread and combine incrementally as you dump? Perhaps a premature optimization... Please test. :)

You are certainly right... It really depends on the use case. I also think my current approach is not good for e.g. syscall tracing of something like read. I wonder how I can synchronize the "dumping" and the collection of the threads. Would be cool to have an efficient lockless implementation of appender ...

> If you want to combine before dumping, then collector registration may be needed so that the dumper can contact each collector and say "please add your data".
>
>> __gshared does not put
>> the shared type attribute on the profiler.
>
> Correct.

Thanks!

>> If I use shared I would also
>> have to make all methods I call on the shared profiler shared and also
>> would have to have a shared appender (or cast shared away)?
>
> Possibly. :/
>
> Ali

Christian

May 29, 2022
On 5/27/22 06:55, Christian Köstlin wrote:

> I wonder how I can synchronize the "dumping" and the
> collection of the threads. Would be cool to have an efficient lockless
> implementation of appender ...

That turned out to be nontrivial.

The following is a draft I played with. Collector collects and Dumper dumps. They use a SpinLock, an unpublished feature of core.internal, for locking. The implementation of SpinLock (e.g. at /usr/include/dlang/dmd/core/internal/spinlock.d) has a reference to "test and test-and-set (TTAS)":

  https://en.wikipedia.org/wiki/Test_and_test-and-set

I learned about TTAS from Rikki Cattermole yesterday at TeaConf. :)

The code is below and works on my system.

Ali

import std;
import std.datetime.stopwatch;
import core.thread;
import core.atomic;
import core.internal.spinlock;

enum workerCount = 8;
enum threadRunTime = 4.seconds;
enum mainRunTime = threadRunTime + 1.seconds;

shared struct ScopeLock {
  @disable this(this);
  @disable void opAssign(ref const(typeof(this)));

  SpinLock * lock;

  this(shared(SpinLock) * lock) {
    this.lock = lock;
    lock.lock();
  }

  ~this() {
    lock.unlock();
  }
}

struct Collector {
  long[] data;

  shared(SpinLock) lock;

  auto scopeLock() shared {
    return ScopeLock(&lock);
  }

  // Adds a data point to this collector.
  void add(long i) shared {
    auto sl = scopeLock();

    // Some crazy way of adding data points. Real code should
    // make more sense.
    data ~= i;
  }

  // Adds the data of this collector to the specified array.
  // Again, real code should use a more sophisticated method.
  void aggregate(ref long[] where) shared {
    auto sl = scopeLock();

    where ~= data.sum;
    data.length = 0;
    (cast(long[])data).assumeSafeAppend();
  }
}

// A variable to help us trust the code. We will print this at
// the end of main.
long allThatHasBeenDumped = 0;
// Used only for validating the code.
shared long allCollectedByThreads;

synchronized class Dumper {
private:
  shared(Collector)*[] collectors;

  void register(shared(Collector) * collector) shared {
    writeln("registering ", collector);
    collectors ~= collector;
  }

  // Dumps current results.
  void dump(File output) shared {
    long[] data;

    foreach (collector; collectors) {
      collector.aggregate(data);
    }

    const allData = data.sum;

    if (allData != 0) {
      output.writefln!"Just collected:%-(\n  %,s%)"(data);
      allThatHasBeenDumped += allData;
    }
  }
}

shared(Dumper) dumper;

shared static this() {
  writeln("Making a Dumper");
  dumper = new Dumper();
}

shared(Collector) * collector;

static this() {
  writeln("Making a Collector");
  collector = new shared(Collector)();
  dumper.register(cast(shared)collector);
}

// Main thread function
void doWork() {
  try {
    doWorkImpl();

  } catch (Throwable exc) {
    stderr.writeln("Caught Throwable: ", exc.msg);
  }
}

// The implementation of each thread.
void doWorkImpl() {
  auto sw = StopWatch();
  sw.start();

  long i = 0;
  while (sw.peek < threadRunTime) {
    (cast(shared)collector).add(i);
    ++i;
  }

  --i;
  auto total = i * (i + 1) / 2;
  writefln("Thread collected %,s items equaling %,s with %s",
           i, total, collector);

  atomicOp!"+="(allCollectedByThreads, total);
}

void main() {
  writeln("main started");
  iota(workerCount).each!(_ => spawn(&doWork));

  auto sw = StopWatch();
  sw.start();

  while (sw.peek < mainRunTime) {
    dumper.dump(stdout);
    Thread.sleep(100.msecs);
  }

  // One final collection (and dump):
  dumper.dump(stdout);

  assert(allThatHasBeenDumped == allCollectedByThreads);
}

May 29, 2022

On 2022-05-29 20:52, Ali Çehreli wrote:
>> I wonder how I can synchronize the "dumping" and the
>> collection of the threads. Would be cool to have an efficient lockless
>> implementation of appender ...
>
> That turned out to be nontrivial.
>
> The following is a draft I played with. Collector collects and Dumper dumps. They use a SpinLock, an unpublished feature of core.internal, for locking.
>
> [... full code elided; see the previous post ...]

Hi Ali,

thanks a lot for that, I will first have to digest that.
Just one first question: our discussion about using TLS for the collectors suggested that no lock would be needed on the collector's add method, because it is thread-local and therefore thread-safe?

Kind regards,
Christian

May 29, 2022

On 2022-05-29 20:52, Ali Çehreli wrote:
>> I wonder how I can synchronize the "dumping" and the
>> collection of the threads. Would be cool to have an efficient lockless
>> implementation of appender ...
>
> That turned out to be nontrivial.
>
> The following is a draft I played with. Collector collects and Dumper dumps. They use a SpinLock, an unpublished feature of core.internal, for locking.
>
> [... full code elided; see the previous post ...]

According to https://www.schveiguy.com/blog/2022/05/comparing-exceptions-and-errors-in-d/ it's bad to catch Errors... so doWork should catch only Exception? Or is this a special case where one just logs the error per thread and is done with it? Still, if not everything is cleaned up correctly, it might be better to crash directly...
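
I.e. something like this (sketch; doWorkImpl stands in for the real per-thread work):

import std.stdio : stderr;

void doWorkImpl() {
  // ... the actual per-thread work ...
}

void doWork() {
  try {
    doWorkImpl();
  } catch (Exception e) {
    // Exceptions are recoverable and safe to log per thread; Errors
    // (assert failures, range errors, ...) would propagate and crash.
    stderr.writeln("Caught Exception: ", e.msg);
  }
}

void main() {
  doWork();
}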

Kind regards,
Christian
