Thread overview
Implementing a Monitor
Jul 27, 2012
Minas Mina
Jul 30, 2012
Sean Kelly
Jul 31, 2012
Minas
July 27, 2012
I'm using condition variables to implement a monitor that simulates the producer-consumer (unbounded buffer) scenario.

Note:
monitor is __gshared and is initialized in main(). Then the other threads are created and run.

There is one producer thread, and 5 consumer threads.

void producer()
{
	while( 1 )
	{
		monitor.produce();
	}
}

void consumer()
{
	while( 1 )
	{
		monitor.consume();
	}
}

class Monitor
{
	Cond cond;
	
	char[] buffer;
	ulong sz;
	
	this()
	{
		// necessary: The condition variable constructor requires a mutex passed to it
		cond = new Cond(new Mutex());
	}
	
	void produce()
	{
		// produce a letter a-z
		char c = 'a' + rand() % 26;
		writeln("* Producer: Produced a '", c, "' buffer.length = ", buffer.length, "\n");
		
		// put it into the buffer
		++buffer.length;
		buffer[buffer.length - 1] = c;
		
		//if( buffer.length > 1 )
		notify(cond); // calls condition.notify()
		
		//Thread.sleep(dur!"msecs"(1000));
	}
	
	void consume()
	{
		if( buffer.length == 0 )
		{
			writeln("put to sleep");
			cwait(cond); // calls Condition.wait()
		}
		
		// take
		char c = buffer[buffer.length-1];
		--buffer.length;
		
		writeln("# Consumer has taken a '", c, "' buffer = [", buffer, "] (", buffer.length, " elements)\n");
	}
}





The output is something like:
put to sleep
put to sleep
put to sleep
put to sleep
* Producer: Produced a 'n' buffer.length = 0

put to sleep
* Producer: Produced a 'w' buffer.length = 1

* Producer: Produced a 'l' buffer.length = 2

* Producer: Produced a 'r' buffer.length = 3

* Producer: Produced a 'b' buffer.length = 4

* Producer: Produced a 'b' buffer.length = 5

* Producer: Produced a 'm' buffer.length = 6

* Producer: Produced a 'q' buffer.length = 7

...

Even though the producer calls notify() when it finishes, none of the consumer threads is ever resumed. (I ran the program for a while with the output redirected to a file, then searched for the "#" that the consumer prints, but found nothing.)

Why is that happening? Is there something wrong with Condition.notify()?

Also, is there a function that I can call to immediately suspend the running thread?

Thanks
July 30, 2012
You need to lock the mutex to synchronize access to the shared data.

On Jul 27, 2012, at 2:31 PM, Minas Mina <minas_mina1990@hotmail.co.uk> wrote:
> 
> class Monitor
> {
> 	Cond cond;
	Mutex mutex;

> 
> 	char[] buffer;
> 	ulong sz;
> 
> 	this()
> 	{
> 		// necessary: The condition variable constructor requires a mutex passed to it
		mutex = new Mutex;
		cond = new Cond(mutex);

> 	}
> 
> 	void produce()
> 	{
> 		// produce a letter a-z
> 		char c = 'a' + rand() % 26;
> 		writeln("* Producer: Produced a '", c, "' buffer.length = ", buffer.length, "\n");

		synchronized(mutex) {
> 
> 		// put it into the buffer
> 		++buffer.length;
> 		buffer[buffer.length - 1] = c;
> 
> 		//if( buffer.length > 1 )
> 		notify(cond); // calls condition.notify()

		}
> 
> 		//Thread.sleep(dur!"msecs"(1000));
> 	}
> 
> 	void consume()
> 	{

		synchronized(mutex) {

		// note: changed if to while
> 		while( buffer.length == 0 )
> 		{
> 			writeln("put to sleep");
> 			cwait(cond); // calls Condition.wait()
> 		}
> 
> 		// take
> 		char c = buffer[buffer.length-1];
> 		--buffer.length;

		}

> Also, is there a function that I can call to immediately suspend the running thread?

Thread.yield().  Though Cond.wait() will suspend the thread as well.
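
For reference, the changes above can be put together into a small self-contained program. This is only a sketch under some assumptions: it uses core.sync.condition.Condition and core.sync.mutex.Mutex directly instead of the Cond, notify() and cwait() helpers from the original post, and the names BufferMonitor and runDemo are made up for the illustration. It also produces a fixed number of items so that the program terminates.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

// Hypothetical name for this sketch; not the Monitor class from the post.
class BufferMonitor
{
    private Mutex mutex;
    private Condition nonEmpty;
    private char[] buffer;

    this()
    {
        mutex = new Mutex;
        nonEmpty = new Condition(mutex); // the condition is tied to this mutex
    }

    void produce(char c)
    {
        synchronized (mutex)
        {
            buffer ~= c;
            nonEmpty.notify(); // wake one waiting consumer, if there is one
        }
    }

    char consume()
    {
        char c;
        synchronized (mutex)
        {
            // wait() releases the mutex while blocked and reacquires it
            // before returning, so the condition is rechecked in a loop.
            while (buffer.length == 0)
                nonEmpty.wait();

            c = buffer[$ - 1];
            buffer.length = buffer.length - 1;
        }
        return c;
    }
}

// Runs one producer and five consumers; returns how many items were consumed.
int runDemo()
{
    enum consumerCount = 5;
    enum itemsPerConsumer = 4;

    auto monitor = new BufferMonitor;
    auto countLock = new Mutex;
    int consumed = 0; // guarded by countLock

    Thread[] threads;
    foreach (i; 0 .. consumerCount)
    {
        threads ~= new Thread({
            foreach (j; 0 .. itemsPerConsumer)
            {
                monitor.consume();
                synchronized (countLock) ++consumed;
            }
        });
    }
    threads ~= new Thread({
        foreach (k; 0 .. consumerCount * itemsPerConsumer)
            monitor.produce(cast(char)('a' + k % 26));
    });

    foreach (t; threads) t.start();
    foreach (t; threads) t.join();
    return consumed;
}

void main()
{
    writeln("consumed ", runDemo(), " items");
}
```

The point of the design is that every access to buffer, and every call to wait() or notify(), happens while the same mutex is held. Without that, a consumer can test buffer.length, be preempted, and then go to sleep after the notification it needed has already been sent.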
July 31, 2012
Thank you for your reply.

I have some more questions:

I simplified my example. Now my monitor is used for mutual exclusion, to understand how monitors work first.

// 3 threads are running P() - the main thread is not one of them

void P()
{
	while( run )
	{
		monitor.EnterCritical2();
		
		// critical section
		writeln(count);
		
		monitor.ExitCritical2();
	}
}

class Monitor
{
	Mutex mutex;
	Cond cond;
	bool in_use = false;
	
	this()
	{
		mutex = new Mutex();
		cond = new Cond(mutex);
	}
	
	void EnterCritical()
	{
		// The synchronized statement is required to ensure only one thread will be able
		// to access the block
		synchronized(mutex)
		{
			while( in_use )
				cwait(cond);	
			in_use = true;
			
			++count;
		}
	}
	
	void ExitCritical()
	{
		synchronized(mutex)
		{
			--count;
			
			in_use = false;
			cnotify(cond);
		}
	}
}

1) In class (in college), I have learned that monitors "by nature" grant access to only one thread at a time in their functions. That's what synchronized(mutex) is used for, right?

2) Why synchronized(mutex) and not synchronized(this) (I know this is not good practice, but is there something more?) or synchronized(some other object)?

3) You correctly changed my "EnterCritical()" code to use "while" instead of "if", because, and please correct me if I'm wrong, we are using notify() and not signal().

4) I wrote my own version of signal. It's:
void csignal(Condition cond)
{
	cond.notify();
	Thread.yield();
}

The logic is that it notifies another Thread and then forces a context switch, so it's like calling signal(). Is it correct?

I changed my EnterCritical() to use "if" instead of "while" because I use my own version of signal() in ExitCritical().

void EnterCritical2()
	{
		synchronized(mutex)
		{
			if( in_use )
				cwait(cond);
			in_use = true;
			
			++count;
		}
	}
	
	void ExitCritical2()
	{
		synchronized(mutex)
		{
			--count;
			
			in_use = false;
			csignal(cond);
		}
	}

However, when "count" is printed in P(), its value is 3 (it was 1 before, which showed that mutual exclusion was working). If I change the "if" to "while" it works, but that's not the purpose of signal(). I'm pretty sure that with "signal" it's "if", not "while". What am I doing wrong?
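
A likely explanation, offered as a sketch rather than a definitive answer: druntime's Condition behaves like a pthread condition variable, so notify() only makes a waiter runnable. The woken thread still has to reacquire the mutex, and in csignal() the notifier is still inside synchronized(mutex) when it calls Thread.yield(), so the yield cannot hand the monitor over. Once the notifier leaves the block, a third thread may lock the mutex first and set in_use = true; with "if", the woken thread then continues without rechecking. The version below keeps the "while" loop and records the highest number of threads seen inside at once. CriticalMonitor, maxInside and runCritical are hypothetical names for this illustration.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

// Hypothetical name for this sketch.
class CriticalMonitor
{
    private Mutex mutex;
    private Condition freed;
    private bool inUse = false;
    private int inside = 0;    // threads currently in the critical section
    private int maxInside = 0; // highest value of `inside` ever observed

    this()
    {
        mutex = new Mutex;
        freed = new Condition(mutex);
    }

    void enter()
    {
        synchronized (mutex)
        {
            // Recheck after every wakeup: another thread may have taken
            // the section between the notify() and our reacquiring the mutex.
            while (inUse)
                freed.wait();
            inUse = true;
            ++inside;
            if (inside > maxInside)
                maxInside = inside;
        }
    }

    void leave()
    {
        synchronized (mutex)
        {
            --inside;
            inUse = false;
            freed.notify();
        }
    }

    int maxObserved()
    {
        int result;
        synchronized (mutex)
        {
            result = maxInside;
        }
        return result;
    }
}

// Starts threadCount threads that each enter and leave `rounds` times.
int runCritical(int threadCount, int rounds)
{
    auto monitor = new CriticalMonitor;
    Thread[] threads;
    foreach (i; 0 .. threadCount)
    {
        threads ~= new Thread({
            foreach (r; 0 .. rounds)
            {
                monitor.enter();
                monitor.leave();
            }
        });
    }
    foreach (t; threads) t.start();
    foreach (t; threads) t.join();
    return monitor.maxObserved();
}

void main()
{
    writeln("max threads inside at once: ", runCritical(3, 10_000));
}
```

The textbook "if" form assumes signal-and-urgent-wait (Hoare) semantics, where the signaller hands the monitor directly to the waiter. With notify-and-continue semantics the loop is part of the protocol, not a workaround.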


5) Are the threads running concurrently on one core or in parallel (provided that the PC has more than one core)? Mine has 2 cores x 2 threads each.
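
On the last question, a short sketch that may help: core.thread.Thread objects are backed by operating-system threads, so the scheduler is free to run them in parallel on different cores. One way to see how many logical cores the program can use is std.parallelism.totalCPUs; the snippet below only prints that number and assumes nothing else about the machine.

```d
import std.parallelism : totalCPUs;
import std.stdio : writeln;

void main()
{
    // Number of logical cores visible to this process.
    writeln("logical cores: ", totalCPUs);
}
```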

Thank you.

PS: I'm writing some small programs that demonstrate concurrency in D. The reason is that I suggested it to my university professor as a way to learn about this stuff in a practical manner. So I guess if he likes them, D will be taught in a university :)