July 27, 2012
Implementing a Monitor
I'm using condition variables to implement a monitor that 
simulates the producer-consumer(unbounded buffer) scenario.

Note:
monitor is __gshared and is initialized in main(). Then the other 
threads are created and run.

There is one producer thread, and 5 consumer threads.

void producer()
{
	while( 1 )
	{
		monitor.produce();
	}
}

void consumer()
{
	while( 1 )
	{
		monitor.consume();
	}
}

class Monitor
{
	Cond cond;
	
	char[] buffer;
	ulong sz;
	
	this()
	{
		// necessary: the condition variable constructor requires a mutex passed to it
		cond = new Cond(new Mutex());
	}
	
	void produce()
	{
		// produce a letter a-z
		char c = 'a' + rand() % 26;
		writeln("* Producer: Produced a '", c, "' buffer.length = ", buffer.length, "\n");
		
		// put it into the buffer
		++buffer.length;
		buffer[buffer.length - 1] = c;
		
		//if( buffer.length > 1 )
		notify(cond); // calls condition.notify()
		
		//Thread.sleep(dur!"msecs"(1000));
	}
	
	void consume()
	{
		if( buffer.length == 0 )
		{
			writeln("put to sleep");
			cwait(cond); // calls Condition.wait()
		}
		
		// take
		char c = buffer[buffer.length-1];
		--buffer.length;
		
		writeln("# Consumer has taken a '", c, "' buffer = [", buffer, "] (", buffer.length, " elements)\n");
	}
}





The output is something like:
put to sleep
put to sleep
put to sleep
put to sleep
* Producer: Produced a 'n' buffer.length = 0

put to sleep
* Producer: Produced a 'w' buffer.length = 1

* Producer: Produced a 'l' buffer.length = 2

* Producer: Produced a 'r' buffer.length = 3

* Producer: Produced a 'b' buffer.length = 4

* Producer: Produced a 'b' buffer.length = 5

* Producer: Produced a 'm' buffer.length = 6

* Producer: Produced a 'q' buffer.length = 7

...

Even though the producer calls notify() after every item, none of 
the consumer threads is ever resumed. (I ran the program for a 
while with the output redirected to a file, then searched for the 
"#" that the consumer prints, but found nothing.)

Why is that happening? Is there something wrong with 
Condition.notify()?

Also, is there a function that I can call to immediately suspend 
the running thread?

Thanks
July 30, 2012
Re: Implementing a Monitor
You need to lock the mutex to synchronize access to the shared data.

On Jul 27, 2012, at 2:31 PM, Minas Mina <minas_mina1990@hotmail.co.uk> wrote:
> 
> class Monitor
> {
> 	Cond cond;
          Mutex mutex;

> 	
> 	char[] buffer;
> 	ulong sz;
> 	
> 	this()
> 	{
> 		// necessary: The condition variable constructor requires a mutex passed to it
                  mutex = new Mutex;
                 cond = new Cond(mutex);

> 	}
> 	
> 	void produce()
> 	{
> 		// produce a letter a-z
> 		char c = 'a' + rand() % 26;
> 		writeln("* Producer: Produced a '", c, "' buffer.length = ", buffer.length, "\n");

             synchronized(mutex) {
> 		
> 		// put it into the buffer
> 		++buffer.length;
> 		buffer[buffer.length - 1] = c;
> 		
> 		//if( buffer.length > 1 )
> 		notify(cond); // calls condition.notify()

          }
> 		
> 		//Thread.sleep(dur!"msecs"(1000));
> 	}
> 	
> 	void consume()
> 	{

           synchronized(mutex) {

                  // note changed if to while
> 		while( buffer.length == 0 )
> 		{
> 			writeln("put to sleep");
> 			cwait(cond); // calls Condition.wait()
> 		}
> 		
> 		// take
> 		char c = buffer[buffer.length-1];
> 		--buffer.length;

           }

> Also, is there a function that I can call to immediately suspend the running thread?

Thread.yield().  Though Cond.wait() will suspend the thread as well.
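
Put together, the changes above might look like the following minimal sketch. The names BufferMonitor, takenCount() and runDemo() are made up for illustration, and it calls Condition.wait() and Condition.notify() directly instead of going through the cwait()/notify() wrappers from the original post, whose definitions are not shown. It assumes one producer and a fixed amount of work per consumer so that the program terminates.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

// Hypothetical name; a sketch of the locked producer/consumer monitor.
class BufferMonitor
{
    private Mutex mutex;
    private Condition nonEmpty;   // signalled whenever an item is added
    private char[] buffer;
    private size_t taken;         // number of items consumed so far

    this()
    {
        mutex = new Mutex;
        nonEmpty = new Condition(mutex);   // condition is tied to this mutex
    }

    void produce(char c)
    {
        synchronized (mutex)      // locks the same mutex the condition uses
        {
            buffer ~= c;
            nonEmpty.notify();    // wake one waiting consumer, if any
        }
    }

    char consume()
    {
        char c;
        synchronized (mutex)
        {
            // Re-check the predicate after every wakeup: another consumer
            // may have emptied the buffer before this thread got the lock.
            while (buffer.length == 0)
                nonEmpty.wait();  // releases the mutex while blocked

            c = buffer[$ - 1];
            buffer = buffer[0 .. $ - 1];
            ++taken;
        }
        return c;
    }

    size_t takenCount()
    {
        synchronized (mutex) { return taken; }
    }
}

// Illustrative driver: one producer, `consumers` consumer threads,
// each consumer takes exactly `perConsumer` items.
size_t runDemo(size_t consumers, size_t perConsumer)
{
    auto monitor = new BufferMonitor;
    Thread[] threads;

    foreach (i; 0 .. consumers)
    {
        threads ~= new Thread({
            foreach (n; 0 .. perConsumer)
                monitor.consume();
        });
    }
    threads ~= new Thread({
        foreach (n; 0 .. consumers * perConsumer)
            monitor.produce(cast(char)('a' + n % 26));
    });

    foreach (t; threads) t.start();
    foreach (t; threads) t.join();
    return monitor.takenCount();
}

void main()
{
    writeln("consumed: ", runDemo(5, 4));
}
```

Every access to the buffer happens with the mutex held, and wait() is only ever called while holding the mutex that was passed to the Condition constructor.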
July 31, 2012
Re: Implementing a Monitor
Thank you for your reply.

I have some more questions:

I simplified my example. Now my monitor is used for mutual 
exclusion, to understand how monitors work first.

// 3 threads are running P() - the main thread is not one of them

void P()
{
	while( run )
	{
		monitor.EnterCritical2();
		
		// critical section
		writeln(count);
		
		monitor.ExitCritical2();
	}
}

class Monitor
{
	Mutex mutex;
	Cond cond;
	bool in_use = false;
	
	this()
	{
		mutex = new Mutex();
		cond = new Cond(mutex);
	}
	
	void EnterCritical()
	{
		// The synchronized statement is required to ensure only one
		// thread will be able to access the block
		synchronized(mutex)
		{
			while( in_use )
				cwait(cond);	
			in_use = true;
			
			++count;
		}
	}
	
	void ExitCritical()
	{
		synchronized(mutex)
		{
			--count;
			
			in_use = false;
			cnotify(cond);
		}
	}
}

1) In class (in college), I have learned that monitors "by 
nature" grant access only to one thread in their functions. 
That's what synchronized(mutex) is used for right?

2) Why synchronized(mutex) and not synchronized(this) (I know 
that is not good practice, but is there something more?) or 
synchronized(some other object)?

3) You correctly changed my code to use "while" instead of "if" 
because, and please correct me if I'm wrong, we are using 
notify() and not signal().

4) I wrote my own version of signal. It's:
void csignal(Condition cond)
{
	cond.notify();
	Thread.yield();
}

The logic is that it notifies another Thread and then forces a 
context switch, so it's like calling signal(). Is it correct?

I changed my EnterCritical() to use "if" instead of "while" 
because I use my own version of signal() in ExitCritical().

void EnterCritical2()
	{
		synchronized(mutex)
		{
			if( in_use )
				cwait(cond);
			in_use = true;
			
			++count;
		}
	}
	
	void ExitCritical2()
	{
		synchronized(mutex)
		{
			--count;
			
			in_use = false;
			csignal(cond);
		}
	}

However, when "count" is printed in P(), its value is now 3. (It 
was always 1 before, which showed that mutual exclusion was 
working.) If I change the "if" to "while" it works again, but 
that's not the purpose of signal(). I'm pretty sure that with 
"signal" it's "if", not "while". What am I doing wrong?


5) Are the threads running concurrently on one core or in 
parallel (provided that the PC has more than one core)? Mine has 
2 cores x 2 threads each.

Thank you.

PS: I'm writing some small programs that demonstrate concurrency 
in D. The reason is that I suggested it to my university professor 
as a way to learn about this stuff in a practical manner. So I 
guess if he likes them, D will be taught in a university :)
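
For reference, here is a minimal sketch of the "while" variant that keeps "count" at 1. The names CriticalMonitor, maxInside() and runExclusionDemo() are hypothetical and only there to make the example checkable. The likely reason "if" fails, as far as the behaviour of core.sync.condition goes: notify() only wakes a waiter, and that waiter must re-acquire the mutex before wait() returns. Calling Thread.yield() while still inside synchronized(mutex) cannot hand the lock over, so another thread may enter first and the woken thread then proceeds without re-checking in_use.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.stdio : writeln;

// Hypothetical name; mirrors EnterCritical()/ExitCritical() from the post.
class CriticalMonitor
{
    private Mutex mutex;
    private Condition available;
    private bool inUse = false;
    private int count = 0;     // threads currently inside the critical section
    private int maxSeen = 0;   // largest value `count` ever reached

    this()
    {
        mutex = new Mutex;
        available = new Condition(mutex);
    }

    void enter()
    {
        synchronized (mutex)
        {
            // `while`, not `if`: the state may have changed between the
            // notify() and the moment this thread re-acquired the mutex.
            while (inUse)
                available.wait();
            inUse = true;

            ++count;
            if (count > maxSeen)
                maxSeen = count;
        }
    }

    void leave()
    {
        synchronized (mutex)
        {
            --count;
            inUse = false;
            available.notify();   // the woken thread runs once we unlock
        }
    }

    int maxInside()
    {
        synchronized (mutex) { return maxSeen; }
    }
}

// Illustrative driver: `workers` threads each enter and leave `rounds` times.
int runExclusionDemo(size_t workers, size_t rounds)
{
    auto monitor = new CriticalMonitor;
    Thread[] threads;

    foreach (i; 0 .. workers)
    {
        threads ~= new Thread({
            foreach (n; 0 .. rounds)
            {
                monitor.enter();
                // critical section would go here
                monitor.leave();
            }
        });
    }

    foreach (t; threads) t.start();
    foreach (t; threads) t.join();
    return monitor.maxInside();
}

void main()
{
    writeln("max threads inside: ", runExclusionDemo(3, 1000));
}
```

With the loop in place, a woken thread that finds inUse already set again simply goes back to waiting, so at most one thread is ever counted inside.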