Another back-to-basics post for reference material. I will follow this post with one on how to improve this basic pattern with .NET 4 objects.
While building a real-time news aggregator, which requires a producer-consumer queue to provide up-to-the-second news feeds for a trading engine, I updated my general pattern for .NET 4. So this seemed like a good excuse for a post.
The pattern allows multi-threaded execution of a set of tasks that are being produced, usually continuously, where the tasks are discrete functional items that do not need to interact with each other.
The idea is that the producer simply loads tasks onto a queue as it produces them. The queue has a number of worker threads that pull items off the queue and execute them as fast as they can.
The queue can be implemented using the .NET Queue class, a first-in, first-out collection of items, onto which we can load Action or other delegate types.
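As a quick illustration of that building block, here is a minimal single-threaded sketch showing that a Queue<Action> hands tasks back in the order they were loaded. The class name QueueOfActionsDemo and the log list are made up for this example; no worker threads are involved yet.

```csharp
using System;
using System.Collections.Generic;

public static class QueueOfActionsDemo
{
    // Loads three tasks, drains the queue on the calling thread,
    // and returns the order in which the tasks actually ran.
    public static List<string> Run()
    {
        var log = new List<string>();
        var tasks = new Queue<Action>();

        // Each lambda is an Action: no parameters, no return value.
        tasks.Enqueue(() => log.Add("first"));
        tasks.Enqueue(() => log.Add("second"));
        tasks.Enqueue(() => log.Add("third"));

        while (tasks.Count > 0)
        {
            Action task = tasks.Dequeue(); // oldest item comes out first
            task();
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "first, second, third"
    }
}
```

Queue<T> itself is not thread-safe, which is why the multi-threaded version has to take a lock around every Enqueue and Dequeue.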
The implementation of the Queue for this pattern is as follows:
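using System;
using System.Collections.Generic;
using System.Threading;

public class PCQueue : IDisposable
{
    private readonly Thread[] _workers;
    private readonly Queue<Action> _itemQ = new Queue<Action>();

    public PCQueue(int workerCount)
    {
        _workers = new Thread[workerCount];

        // Create and start each worker; every worker runs Consume().
        for (int i = 0; i < workerCount; i++)
        {
            (_workers[i] = new Thread(Consume)).Start();
        }
    }

    /// <summary>
    /// Queues one null item per worker; each worker exits when it dequeues a null.
    /// </summary>
    public void Dispose()
    {
        foreach (var thread in _workers)
        {
            EnqueueItem(null);
        }
    }

    private void Consume()
    {
        while (true)
        {
            Action item;
            lock (_itemQ)
            {
                // Wait releases the lock and blocks until EnqueueItem pulses.
                while (_itemQ.Count == 0)
                {
                    Monitor.Wait(_itemQ);
                }
                item = _itemQ.Dequeue();
            }

            // A null item is the signal to shut this worker down.
            if (item == null) { return; }

            // Run the task outside the lock so other workers are not blocked.
            item();
        }
    }

    public void EnqueueItem(Action item)
    {
        lock (_itemQ)
        {
            _itemQ.Enqueue(item);
            Monitor.Pulse(_itemQ); // wake one waiting worker
        }
    }
}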
The PCQueue object is instantiated with the number of worker threads that we want the queue to use. Each worker thread is immediately started with the Consume() method of the queue.
The Consume() method enters an infinite loop which continually tries to ‘consume’ or execute the tasks on the Queue. When the Queue has no more items on it, the thread blocks and waits until it receives a signal (pulse) from the EnqueueItem method. The method also has a way of exiting the loop. When a null item is placed on the Queue, the loop will exit and the method will terminate, which also terminates the worker thread.
The EnqueueItem method simply adds a new item to the Queue and uses Pulse to signal the next waiting thread to try to execute (dequeue) the next item.
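To see the wait, pulse and null-exit handshake in isolation, here is a minimal sketch with a single worker and strings instead of Actions. The class name WaitPulseDemo and the sample items are invented for illustration; only Monitor.Wait, Monitor.Pulse and Thread.Join come from the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class WaitPulseDemo
{
    // Starts one worker, feeds it "a", "b" and then null,
    // and returns what the worker processed before it exited.
    public static List<string> Run()
    {
        var log = new List<string>();
        var queue = new Queue<string>();

        var worker = new Thread(() =>
        {
            while (true)
            {
                string item;
                lock (queue)
                {
                    // 'while', not 'if': re-check the condition after every wake-up,
                    // because another thread may have taken the item first.
                    while (queue.Count == 0)
                    {
                        Monitor.Wait(queue); // releases the lock while blocked
                    }
                    item = queue.Dequeue();
                }
                if (item == null) { return; } // null ends the worker
                log.Add(item);
            }
        });
        worker.Start();

        foreach (var s in new[] { "a", "b", null })
        {
            lock (queue)
            {
                queue.Enqueue(s);
                Monitor.Pulse(queue); // wake the worker if it is waiting
            }
        }

        worker.Join(); // returns once the worker has seen the null
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "a, b"
    }
}
```

Because the queue is first in, first out, the null is only reached after every item queued before it has been dequeued, so nothing queued ahead of the shutdown signal is lost.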
The producer code can go something like this:
EventQueue.EnqueueItem(() => _service.UpdateListItem(update));
This line could be called from a timer, when stock data arrives or, in my case, when a new news article is found.
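For a fuller picture of the producer side, the sketch below wires a loop of fake updates into the queue. It carries a compact copy of the PCQueue class from this post so that it compiles on its own. ProducerDemo, its UpdateListItem stand-in and the "article N" strings are hypothetical, and CountdownEvent (a .NET 4 type) is used here only so the demo can tell when every task has run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Compact copy of the PCQueue from this post, included so the sample stands alone.
public class PCQueue : IDisposable
{
    private readonly Thread[] _workers;
    private readonly Queue<Action> _itemQ = new Queue<Action>();

    public PCQueue(int workerCount)
    {
        _workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
            (_workers[i] = new Thread(Consume)).Start();
    }

    public void EnqueueItem(Action item)
    {
        lock (_itemQ) { _itemQ.Enqueue(item); Monitor.Pulse(_itemQ); }
    }

    public void Dispose()
    {
        foreach (var thread in _workers) EnqueueItem(null);
    }

    private void Consume()
    {
        while (true)
        {
            Action item;
            lock (_itemQ)
            {
                while (_itemQ.Count == 0) Monitor.Wait(_itemQ);
                item = _itemQ.Dequeue();
            }
            if (item == null) return;
            item();
        }
    }
}

public static class ProducerDemo
{
    private static int _processed;

    // Hypothetical stand-in for _service.UpdateListItem: just counts the updates.
    private static void UpdateListItem(string update)
    {
        Interlocked.Increment(ref _processed);
    }

    // Queues updateCount tasks on three workers and returns how many were processed.
    public static int Run(int updateCount)
    {
        _processed = 0;
        using (var done = new CountdownEvent(updateCount))
        using (var eventQueue = new PCQueue(3))
        {
            for (int i = 0; i < updateCount; i++)
            {
                string update = "article " + i; // fresh variable per iteration for the closure
                eventQueue.EnqueueItem(() => { UpdateListItem(update); done.Signal(); });
            }
            done.Wait(); // block until every queued task has signalled
        }                // leaving the block disposes the queue, which stops the workers
        return _processed;
    }

    public static void Main()
    {
        Console.WriteLine(Run(10)); // prints 10
    }
}
```

Note that Dispose as written in this post only queues the shutdown signals and returns straight away; it does not wait for the workers to finish, which is why the demo waits on the countdown before leaving the using block.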
Watch for the next post on .NET 4 improvements.