RSS 2.0

Personal Info:

Joe Send mail to the author(s) is the lead developer and architect for Parallel Extensions to .NET, tinkers with type systems, and is an author and speaker.

Blogroll:
Other
News
 C|Net
 Kuro5hin
 The Register
Technology
 <?xmlhack?>
 Daily WTF
 DevX
 Hacknot
 Java Today
 Microsoft Top 10 Downloads
 MSDN
 MSDN: "Longhorn"
 MSDN: XML Developer Center
 Slashdot
 Techdirt
 theserverside.com
 W3C
 Web Pages That Suck
 XML Cover Pages
 XML Journal
 xml.com
Technology Blogs
 Aaron Skonnard [PluralSight]
 Adam Bosworth [Google]
 Andy Rich [MS/C++]
 Arpan Desai [MS/XML]
 BCL Team [MS]
 Bill Clementson [Lisp]
 Bill de hÓra
 Bruce Eckel [J]
 Bruce Tate [J]
 Casey Chestnut
 Cedric Beust [Google]
 Chris Anderson [MS/Avalon]
 Chris Lyon [MS]
 Christian Weyer
 Clemens Vasters [newtelligence]
 Craig Andera [PluralSight]
 Dan Sugalski [Parrot]
 Daniel Cazzulino
 Dave Chappel
 Dave Roberts [Lisp]
 Dave Thomas [PragProg]
 Dave Winer
 Dion Almaer [J]
 Don Demsak
 Doug Purdy [MS/Indigo]
 Drew Marsh
 Eric Gunnerson [MS]
 Eric Rudder [MS]
 Eric Sink
 Fritz Onion [PluaralSight]
 Gavin King [J/Hibernate]
 Grady Booch [IBM]
 Hervey Wilson [MS/Indigo]
 Hillel Cooperman [MS/Shell]
 Howard Lewis Ship [J/Apache]
 Ingo Rammer [PluralSight]
 James Gosling [J/Sun]
 James Strachan [J/Groovy]
 Jason Matusow [MS/OSS]
 Jeffrey Schlimmer [MS/Indigo]
 Joe Beda [Google]
 Joel Spoelsky
 Jon Udell
 Josh Ledgard [MS/Evang]
 Joshua Allen [MS]
 Lambda
 Larry Osterman [MS]
 Maoni Stephens [MS/CLR]
 Mark Fussell [MS/XML]
 Martin Fowler
 Martin Gudgin [MS/Indigo]
 Me
 Michael Howard [MS]
 Miguel de Icaza [Mono]
 Mike Clark
 Omri Gazitt [MS/Indigo]
 Pat Helland [MS/PAG]
 Pinku Surana
 Raymond Chen [MS]
 Rich Lander [MS/CLR]
 Rob Howard
 Rob Relyea [MS/Avalon]
 Robert Cringely
 S. Somasegar [MS/DevDiv]
 Sam Gentile
 Scoble [MS/Evang]
 Scott Guthrie [MS/WebNet]
 Scott Hanselman
 Sean McGrath [J]
 Simon Fell
 Stanley Lippman [MS/C++]
 Steve Maine
 Steve Swartz [MS/Indigo]
 Steve Vinoski
 Steven Clarke [MS/Usability]
 Stuart Halloway
 Ted Leung
 Ted Neward [DM]
 Tim Bray [Sun]
 Tim Ewald [Mindreef]
 Tim O'Reilly
 Werner Vogels [Amazon]
 Wintellect
 Yasser Shohoud [MS/Indigo]
Top 20
 Brad Abrams [MS/CLR]
 Chris Brumme [MS/CLR]
 Chris Sells [MS/Ultra]
 Cyrus Najmabadi [MS/C#]
 Dominic Cooney [MS/XAF]
 Don Box [MS/Ultra]
 Don Syme [MS/R]
 Guido van Rossum [Python]
 Herb Sutter [MS/C++]
 Ian Griffiths
 Jason Zander [MS/CLR]
 Jim Hugunin [MS/CLR]
 Joel Pobar [MS/CLR]
 Krzysztof Cwalina [MS/CLR]
 Patrick Logan
 Paul Graham
 Rico Mariani [MS/CLR]
 Rory Blyth [MS/DN]
 Sam Ruby
 Wesner Moise
VC/Business Blogs
 Ed Sim
 Fred Wilson
 Jonathan Schwartz [J/Sun]
 Lawrence Lessig [Stanford]
 Mark Cuban
 Michael Hyatt
 Pierre Omidyar
 Ross Mayfield
 VentureBlog
 Weekly Read
Wine, Food & Tea
 The Silk Road of Wine
 Vinography: a wine blog
 Wine Whys

Disclaimer:
The content of this site are my own personal opinions and do not represent my employer's view in anyway.

© 2008, Joe Duffy

 
 Wednesday, June 09, 2004

I've been toying around with C# iterators a bit more lately, particularly regarding non-mainstream applications. I discussed partial algorithm computation during a couple earlier posts (here and here), but now wish to turn to producer/consumer models. A producer/consumer model is one in which a single producer is responsible for generating items of interest, which are later processed by one or more consumers.

I really should do a bit more explanation, but for now I'll simply present some code examples with brief comments.

To wire up threaded producers and consumers, quite a bit of plumbing is required; thus, I have encapsulated it all into a couple common abstract classes:

public abstract class Producer<T>

{

 

      public Producer()

      {

            worker = new Thread(new ThreadStart(this.ProductionCycle));

      }

 

      private Queue<T> buffer = new Queue<T>();

 

      public Thread worker;

 

      private bool done;

 

      public bool Done

      {

            get

            {

                  return done;

            }

      }

 

      public IEnumerable<T> ConsumerChannel

      {

            get

            {

                  if (done)

                        throw new InvalidOperationException("Production is not currently active");

 

                  while (!done)

                  {

                        Nullable<T> consumed = new Nullable<T>();

 

                        //BUG: compiler crashes when using lock(...) construct within iterator

                        Monitor.Enter(buffer);

                        if (buffer.Count == 0)

                              Monitor.Wait(buffer);

                        if (buffer.Count > 0)

                              consumed = new Nullable<T>(buffer.Dequeue());

                        Monitor.Exit(buffer);

 

                        if (consumed.HasValue)

                              yield return consumed.Value;

                  }

 

                  yield break;

            }

      }

 

      public void BeginProduction()

      {

            done = false;

            worker.Start();

      }

 

      public void EndProduction()

      {

            done = true;

            lock (buffer)

            {

                  Monitor.PulseAll(buffer);

            }

      }

 

      private void ProductionCycle()

      {

            while (!done)

            {

                  T t = ProduceNext();

                  lock (buffer)

                  {

                        buffer.Enqueue(t);

                        Monitor.Pulse(buffer);

                  }

            }

      }

 

      protected abstract T ProduceNext();

 

}

 

public abstract class Consumer<T>

{

 

      public Consumer(Producer<T> producer)

      {

            this.producer = producer;

            worker = new Thread(new ThreadStart(this.ConsumerCycle));

      }

 

      private Producer<T> producer;

 

      public Thread worker;

 

      private bool done = false;

 

      public bool Done

      {

            get

            {

                  return done;

            }

      }

 

      public void BeginConsumption()

      {

            done = false;

            worker.Start();

      }

 

      public void EndConsumption()

      {

            done = true;

      }

 

      private void ConsumerCycle()

      {

            foreach (T t in producer.ConsumerChannel)

            {

                  Consume(t);

                  if (done)

                        break;

            }

      }

 

      protected abstract void Consume(T t);

 

}

 

(Note: I haven't spent enough time on the threading issues, and as such may have overlooked a thing or two. It's pretty messy as it stands, but it'll do for now...)

To create specific concrete classes from these, it's a matter of simply overriding a couple methods. For instance, the following producer generates a never-ending sequence of random numbers, while the consumer simply prints them out:

class RandomNumberProducer : Producer<int>

{

 

      public RandomNumberProducer() : base()

      {

            rand = new Random();

      }

 

      private Random rand;

 

      protected override int ProduceNext()

      {

            return rand.Next();

      }

 

}

 

class RandomNumberConsumer : Consumer<int>

{

 

      public RandomNumberConsumer(RandomNumberProducer p) : base(p)

      {

      }

 

      private static int counter = 0;

 

      private int id = ++counter;

 

      protected override void Consume(int t)

      {

            Console.Out.WriteLine("#{0}: consumed {1}", id, t);

      }

 

}

 

To wire them up, and kick off the cycles, the following test code does the trick:

 

RandomNumberProducer p = new RandomNumberProducer();

 

RandomNumberConsumer c1 = new RandomNumberConsumer(p);

RandomNumberConsumer c2 = new RandomNumberConsumer(p);

RandomNumberConsumer c3 = new RandomNumberConsumer(p);

 

p.BeginProduction();

 

c1.BeginConsumption();

c2.BeginConsumption();

c3.BeginConsumption();

 

Thread.Sleep(2500);

 

c3.EndConsumption();

c2.EndConsumption();

c1.EndConsumption();

 

p.EndProduction();

 

These examples probably don't quite illustrate the advantages of this approach very well. Obviously, using well factored base classes provides a lot of benefit, namely that they encapsulate and implement the threading and synchronization boilerplate. Interestingly, the iterators remove the need for the consumer to worry about any synchronization.

This means that the following simple consumer code is actually thread safe - even if there are other consumers wired up to the producer:

foreach (int i in p.ConsumerChannel)

{

      Console.Out.WriteLine("consumed: {0}", i);

}

In theory, the threading and blocking is all handled by the producer's iterator (although I question if my implementation is entirely correct).

Pretty nifty, if you ask me...

[I apologize for the hideous HTML for the code samples - I really wanted to retain the Visual Studio color scheme, and the only easy way I could find to accomplish this was to cut and paste into Word. Probably not an issue on most browsers, but just in case...]

 

Recent Entries:

Search:

Browse by Date:
<December 2008>
SunMonTueWedThuFriSat
30123456
78910111213
14151617181920
2122