
A Worker Thread Class For Processing Work Units

13 Sep 2006 · 4 min read
Processing work units in a single thread instead of using a thread pool.

Introduction

There are many examples on CodeProject that use thread pooling to manage work or job queues. These all assign each work item to a free thread, so the work is done concurrently. I needed something different: the ability to queue work items that are processed one at a time on a single thread. The typical scenario that I'm using this class for is processing activities that must occur on the WinForm main application thread, such as updating UI elements. Concurrent processing serves no purpose there, since Invoke blocks the caller until the main application thread has run the delegate.

Architecture

The following sections describe some of the architectural decisions I made in the implementation of this class.

Generics

The class, ProcessingQueue<T>, is a generic class, allowing you to specify the work type as either a value or reference type, for example:

C#
processQueue = new ProcessingQueue<int>();

Of course, this means that each ProcessingQueue<T> instance is restricted to one type of work item, which suits me.

Events and Overrides

Two events, each raised from an overridable method, do the work and report work exceptions.

Doing the Work

To do the actual work, I implemented the event DoWork, which is called for each work item in the queue. If you would prefer not to use events, you can derive a class from ProcessingQueue<T> and override the OnDoWork method. The work item is contained in the ProcessingQueueEventArgs<T> class.

Handling Exceptions

Exceptions thrown by the worker code (your application) are caught by the worker thread, so they would otherwise go unnoticed. To help expose exceptions to the application, you can use the WorkException event, or you can override the OnWorkException method in your own derived class. Both of these pass a ProcessingQueueExceptionEventArgs instance that wraps the Exception instance.

EventWaitHandle vs. Semaphores

I considered using a semaphore for this implementation, because you can release the semaphore once for each work item queued, and the thread calling WaitOne is then released once per accumulated count. However, a semaphore requires a maximum count, because it's designed to limit how many threads are released at once, not to count work for a single thread. If you release it beyond the maximum count, the semaphore throws a SemaphoreFullException, which isn't what I wanted. Since the queue depth is unknown, I didn't want to hardcode some arbitrary upper limit as the semaphore's maximum count. It's the wrong tool for the job, basically.
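
The maximum-count behavior described above can be seen in a few lines. This is only a minimal sketch; the class and method names (SemaphoreLimitDemo, ReleaseOverflows) are made up for illustration and are not part of ProcessingQueue<T>.

```csharp
using System;
using System.Threading;

public static class SemaphoreLimitDemo
{
    // Hypothetical helper: releases a semaphore 'releases' times and reports
    // whether doing so exceeded 'maximumCount'.
    public static bool ReleaseOverflows(int maximumCount, int releases)
    {
        // Initial count 0: nothing is available until Release is called.
        using (Semaphore semaphore = new Semaphore(0, maximumCount))
        {
            try
            {
                for (int i = 0; i < releases; i++)
                {
                    semaphore.Release(); // one release per queued work item
                }
                return false;
            }
            catch (SemaphoreFullException)
            {
                // Thrown as soon as the count would exceed maximumCount.
                return true;
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(ReleaseOverflows(2, 2)); // within the limit
        Console.WriteLine(ReleaseOverflows(2, 3)); // one release too many
    }
}
```

With an unbounded queue there is no safe value to pass as the maximum count, which is the problem described above.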

So, I chose the simpler EventWaitHandle with automatic reset. The concern here is that work may be queued while a work item is being processed. Queuing signals the wait event, but the signal is recorded only once (there's no release count like in a Semaphore), no matter how many items were added. The worker thread therefore has to process all the work currently in the queue each time it wakes up. It also has to cope with waking up to an empty queue, because the signal may have come from an item that it already processed during the previous pass.
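
To illustrate why the worker has to drain the queue on every wake-up, here is a small sketch showing that repeated Set calls on an auto-reset handle collapse into a single signal. The names AutoResetCoalescingDemo and CountWakeups are hypothetical and exist only for this example.

```csharp
using System;
using System.Threading;

public static class AutoResetCoalescingDemo
{
    // Hypothetical helper: signals an auto-reset handle 'signals' times while
    // nobody is waiting, then counts how many waits succeed without blocking.
    public static int CountWakeups(int signals)
    {
        using (EventWaitHandle handle =
            new EventWaitHandle(false, EventResetMode.AutoReset))
        {
            for (int i = 0; i < signals; i++)
            {
                handle.Set(); // setting an already-set handle changes nothing
            }

            int wakeups = 0;
            // WaitOne(0) polls: it returns true (and resets the handle) only
            // if the handle is currently signaled.
            while (handle.WaitOne(0))
            {
                wakeups++;
            }
            return wakeups;
        }
    }

    public static void Main()
    {
        // Three signals, but the waiter is released only once.
        Console.WriteLine(CountWakeups(3)); // prints 1
    }
}
```

If the worker handled one item per wake-up, two of the three items in this scenario would sit in the queue until something else signaled the handle.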

So, the code got complicated enough that I figured a nice generic class to support this feature would be useful, and hence this article. I still can't believe there isn't something similar already here on CodeProject. Maybe it's too simple!

Usage

The following sections describe the usage.

Constructor

To create a ProcessingQueue<T> instance for a specific work type, instantiate the class:

C#
processQueue = new ProcessingQueue<int>();

Events

Wire up the work event to a method that will perform the work on the work item, and wire up the exception event if you so desire:

C#
processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
processQueue.WorkException += 
     new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);

Queuing Work

Queuing work is straightforward. Call the QueueForWork method:

C#
processQueue.QueueForWork(1);

Stopping the Thread

To exit the thread waiting for work, call the Stop method:

C#
processQueue.Stop();

This is a non-blocking call. The worker thread finishes any remaining work in the queue before it terminates.

Implementation

Here's the worker thread. It should be pretty straightforward from the comments and my description above as to what's going on.

C#
protected void ProcessQueueWork()
{
  while (!stop)
  {
    // Wait for some work.
    waitProcess.WaitOne();
    bool haveWork;

    // Finish remaining work before stopping.
    do
    {
      // Initialize to the default work value.
      T work = default(T);
      // Assume no work.
      haveWork = false;
      // Prevent enqueuing from a different thread.
      lock (workQueue)
      {
        // Do we have work? This might be 0 if stopping or if all 
        // work is processed.
        if (workQueue.Count > 0)
        {
          // Get the work.
          work = workQueue.Dequeue();
          // Yes, we have work.
          haveWork = true;
        }
      }

      // If we have work...
      if (haveWork)
      {
        try
        {
          // Try processing it.
          OnDoWork(new ProcessingQueueEventArgs<T>(work));
        }
        catch (Exception e)
        {
          // Oops, inform application of a work error.
          OnWorkException(new ProcessingQueueExceptionEventArgs(e));
        }
      }

    } while (haveWork); // continue processing if there was work.
  }
}
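
The article shows only the worker method, so here is a minimal sketch of how the rest of the class might fit around it. Everything outside ProcessQueueWork is an assumption reconstructed from the usage shown earlier: the field types, the constructor starting a background thread, the bodies of QueueForWork and Stop, the null-checked event raising, and the Exception property name on the exception args. The downloadable source may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Assumed shape: carries one work item to the DoWork handler.
public class ProcessingQueueEventArgs<T> : EventArgs
{
    private readonly T work;
    public ProcessingQueueEventArgs(T work) { this.work = work; }
    public T Work { get { return work; } }
}

// Assumed shape: wraps the exception thrown while processing a work item.
public class ProcessingQueueExceptionEventArgs : EventArgs
{
    private readonly Exception exception;
    public ProcessingQueueExceptionEventArgs(Exception exception) { this.exception = exception; }
    public Exception Exception { get { return exception; } }
}

public class ProcessingQueue<T>
{
    public delegate void DoWorkDlgt(object sender, ProcessingQueueEventArgs<T> args);
    public delegate void WorkExceptionDlgt(object sender, ProcessingQueueExceptionEventArgs args);

    public event DoWorkDlgt DoWork;
    public event WorkExceptionDlgt WorkException;

    protected readonly Queue<T> workQueue = new Queue<T>();
    protected readonly EventWaitHandle waitProcess =
        new EventWaitHandle(false, EventResetMode.AutoReset);
    protected volatile bool stop; // written by Stop(), read by the worker thread
    protected readonly Thread processThread;

    public ProcessingQueue()
    {
        processThread = new Thread(ProcessQueueWork);
        processThread.IsBackground = true; // assumption: don't keep the process alive
        processThread.Start();
    }

    // Adds a work item under the lock, then wakes the worker thread.
    public void QueueForWork(T work)
    {
        lock (workQueue) { workQueue.Enqueue(work); }
        waitProcess.Set();
    }

    // Non-blocking: requests a stop and wakes the worker so it can drain and exit.
    public void Stop()
    {
        stop = true;
        waitProcess.Set();
    }

    protected virtual void OnDoWork(ProcessingQueueEventArgs<T> args)
    {
        DoWorkDlgt handler = DoWork;
        if (handler != null) { handler(this, args); }
    }

    protected virtual void OnWorkException(ProcessingQueueExceptionEventArgs args)
    {
        WorkExceptionDlgt handler = WorkException;
        if (handler != null) { handler(this, args); }
    }

    // Same loop as in the article, condensed.
    protected void ProcessQueueWork()
    {
        while (!stop)
        {
            waitProcess.WaitOne();
            bool haveWork;
            do
            {
                T work = default(T);
                lock (workQueue)
                {
                    haveWork = workQueue.Count > 0;
                    if (haveWork) { work = workQueue.Dequeue(); }
                }
                if (haveWork)
                {
                    try { OnDoWork(new ProcessingQueueEventArgs<T>(work)); }
                    catch (Exception e)
                    {
                        OnWorkException(new ProcessingQueueExceptionEventArgs(e));
                    }
                }
            } while (haveWork);
        }
    }
}
```

In this sketch the thread starts in the constructor, so handlers should be wired up before the first QueueForWork call. Because Stop both sets the flag and signals the handle, a worker blocked in WaitOne wakes up, finds the queue empty or drains it, and then leaves the outer loop.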

Unit Tests

I wrote a couple of really simple unit tests to verify the functionality, but they are certainly not a rigorous test suite:

C#
[TestFixture]
public class ProcessThreadTests
{
  protected ProcessingQueue<int> processQueue;
  // volatile: these flags are set on the worker thread and polled by the tests.
  protected volatile bool exceptionRaised;
  protected volatile bool workRaised;

  [TestFixtureSetUp]
  public void FixtureSetup()
  {
    processQueue = new ProcessingQueue<int>();
    processQueue.DoWork += new ProcessingQueue<int>.DoWorkDlgt(OnDoWork);
    processQueue.WorkException += 
         new ProcessingQueue<int>.WorkExceptionDlgt(OnWorkException);
  }

  [Test]
  public void QueueWorkTest()
  {
    processQueue.QueueForWork(1);
    while (!workRaised) { }
  }

  [Test]
  public void WorkExceptionTest()
  {
    processQueue.QueueForWork(2);
    while (!exceptionRaised) { }
  }

  void OnDoWork(object sender, ProcessingQueueEventArgs<int> args)
  {
    switch (args.Work)
    {
      case 1:
        workRaised = true;
        break;
      case 2:
        throw new ApplicationException("Exception");
    }
  }

  void OnWorkException(object sender, ProcessingQueueExceptionEventArgs args)
  {
    exceptionRaised = true;
  }
}
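
The busy-wait loops in these tests (while (!workRaised) { }) never finish if the handler is not called, so a failing test hangs instead of failing. One alternative is to wait on an event with a timeout. The sketch below shows only the waiting pattern, with a plain thread standing in for the queue's worker; the names TimedWaitDemo and WaitForSignal are hypothetical.

```csharp
using System;
using System.Threading;

public static class TimedWaitDemo
{
    // Hypothetical helper: starts a background thread that signals after
    // 'delayMs', and waits at most 'timeoutMs' for that signal.
    public static bool WaitForSignal(int delayMs, int timeoutMs)
    {
        ManualResetEvent signaled = new ManualResetEvent(false);

        Thread worker = new Thread(() =>
        {
            Thread.Sleep(delayMs); // stand-in for the worker thread doing work
            signaled.Set();        // stand-in for the handler setting its flag
        });
        worker.IsBackground = true;
        worker.Start();

        // Returns true if signaled in time, false if the timeout elapsed.
        return signaled.WaitOne(timeoutMs);
    }

    public static void Main()
    {
        Console.WriteLine(WaitForSignal(10, 2000)); // signal arrives in time
        Console.WriteLine(WaitForSignal(2000, 50)); // timeout elapses first
    }
}
```

In the fixture above, the handler would call Set on such an event instead of setting a bool, and each test would assert that WaitOne returned true.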

Conclusion

Hopefully, you will find this class useful and my implementation without error!

History

Updated on 9/13 as I discovered I wasn't clearing a flag, and the code never returned to the WaitOne instruction!  Fixing this also eliminated one of the flags.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.



Written By
Architect Interacx
United States
Blog: https://marcclifton.wordpress.com/
Home Page: http://www.marcclifton.com
Research: http://www.higherorderprogramming.com/
GitHub: https://github.com/cliftonm

All my life I have been passionate about architecture / software design, as this is the cornerstone to a maintainable and extensible application. As such, I have enjoyed exploring some crazy ideas and discovering that they are not so crazy after all. I also love writing about my ideas and seeing the community response. As a consultant, I've enjoyed working in a wide range of industries such as aerospace, boatyard management, remote sensing, emergency services / data management, and casino operations. I've done a variety of pro bono work for non-profit organizations related to nature conservancy, drug recovery and women's health.

Comments and Discussions

Questions
Peter Ritchie, 18-Sep-06 8:02
I'm a little confused. You say that you're typically expecting this class to be used for processing activities on a Form's thread (WinForm main application thread); but the only processing for the queue is done in a new thread (which can't be the Form's thread). You make no attempt to ensure that the DoWork event is executed on your UI thread either.

If by "Invoke" you mean Control.Invoke in your comment "concurrent processing serves no purpose since Invoke blocks until the main application thread becomes idle", your comment is nonsensical. Control.Invoke does not use multiple threads and is therefore not concurrent.

You also have no way to tell if the queue is still being processed, which means the objects connected to the OnWork delegates stored in your ProcessingQueue class cannot be guarded against disposal while the thread is still running. Running a delegate on a disposed object is not a wise idea.




