Click here to Skip to main content
15,867,308 members
Articles / Desktop Programming / MFC

A Programming Model to Use a Thread Pool

Rate me:
Please Sign up or sign in to vote.
5.00/5 (9 votes)
30 Sep 2000 146.4K   4.8K   48   20
A class to manage the thread pool

Introduction

On many occasions, we need to utilize multiple threads to boost the system performance. The logic for each thread is almost the same, but we need to manage these threads. If the system is busy, we create more threads, otherwise we kill some thread to avoid the extra overhead.

I have done a couple of projects involving the multiple-thread management. At last, I decided to write a class to wrap this mechanism. This class can dynamically allocate threads and assign jobs to these worker threads. You derive your own classes and you do not need to know the underlying mechanism to handle the multiple threading and synchronization between these threads. However, you need to make your worker classes thread safe since your objects may be assigned to different threads each time.

The other thing I want to demonstrate is using the feature of IOCompletion Port. I found that it is amazingly easy and useful, especially when used as a way to transfer data between threads.

Usage

To use the thread pool class, you need to derive your worker class from IWorker and your job class from IJobDesc. The processing logic must be embedded within the member function IWorker::ProcessJob(IJobDesc* pJob). After you are finished, you can declare a thread pool like this:

C++
CThreadPool pool;
pool.Start(6, 10);
//do some other jobs here
pool.Stop();

The Start function has two parameters. The first argument is the minimum number of the worker threads this thread pool object should spawn. The second argument indicates the maximum number of worker threads within this thread pool. If the thread pool is very busy working on the assigned jobs, it will automatically spawn more worker threads. On the other hand, when the thread pool is idle, some threads will be removed from the pool. Fine-tune these two parameters to get the best performance.

To assign jobs to the thread pool for processing, simply call the function:

C++
pool.ProcessJob(pJob, pWorker);

You must make sure that your derived worker class is thread-safe since a worker instance may be on multiple threads simultaneously. You have no control as to whether the process is on the same thread as the last time or not.

Note

If the processing takes a very long time, when you call Stop(), the processing may not finished immediately. The Stop() function will wait for a maximum of 2 minutes and then return. This function has an optional argument. If this argument is set to true, the function will terminate these worker threads anyway. If this argument is set to false, these worker threads will not get terminated harshly and still live. Under this situation, you have to take care that the worker object may not exist after calling Stop() and you will get an access violation error if you attempt to access them.

The job object must be generated on the heap using new operator. After the process ends, it will automatically be deleted by the framework.

License

This article has no explicit license attached to it, but may contain usage terms in the article text or the download files themselves. If in doubt, please contact the author via the discussion board below.

A list of licenses authors might use can be found here.


Written By
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
GeneralA question about CCriticalSection Pin
SeanQA28-Oct-08 20:26
SeanQA28-Oct-08 20:26 
GeneralWhen the menber function "Stop" is used twice and more Pin
SeanQA28-Oct-08 20:24
SeanQA28-Oct-08 20:24 
GeneralComment on design... Pin
Nigel de Costa4-Sep-06 23:02
Nigel de Costa4-Sep-06 23:02 
GeneralRe: Comment on design... Pin
alexquisi27-Mar-07 23:01
alexquisi27-Mar-07 23:01 
Generalhave memory leaks! Pin
pcbirdwang3-Aug-05 14:43
pcbirdwang3-Aug-05 14:43 
GeneralATL Server provides its own thread pool class Pin
Alexander Gräf6-May-04 4:10
Alexander Gräf6-May-04 4:10 
GeneralRemoveThreads bug Pin
LAT26-Jul-03 9:44
LAT26-Jul-03 9:44 
GeneralRe: RemoveThreads bug Pin
LAT27-Jul-03 0:54
LAT27-Jul-03 0:54 
GeneralRe: RemoveThreads bug Pin
WREY27-Jun-04 6:46
WREY27-Jun-04 6:46 
GeneralLinker could not find "pthreadVC.lib" Pin
WREY29-Aug-02 23:12
WREY29-Aug-02 23:12 
GeneralBug fixed! Pin
Simon.W25-Aug-02 16:27
Simon.W25-Aug-02 16:27 
GeneralOther considerations Pin
Bill Wilson21-Nov-01 11:48
Bill Wilson21-Nov-01 11:48 
GeneralRe: Other considerations Pin
Bill Wilson6-Dec-01 5:46
Bill Wilson6-Dec-01 5:46 
Here is my worker thread class:

#include <afxmt.h>
#if !defined(AFX_WORKER_H__EC52A7FA_9663_11D5_9FC7_00B0D081C96F__INCLUDED_)
#define AFX_WORKER_H__EC52A7FA_9663_11D5_9FC7_00B0D081C96F__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000
#include "errorlog.h"
class CWorker : public CWinThread  
{

public:
	CWorker();
	virtual ~CWorker();

	BOOL m_bStopNow;

	CEvent m_Event;
//	CErrorLog *m_pErrorLog;
	DWORD m_dwThreadID;

	BOOL Doit();
	Run();
	void Start();
	void SetStop();
	BOOL IsBusy();
	void SetBusy(BOOL bIsBusy);
	void SetRequest(BSTR bstrRequest);
	BSTR GetRequest();
	void SetID(long id);
	long GetID();
	void SetIndex(int n);
	GetIndex();
	DWORD GetThreadID();
	BOOL NotNeeded();
	
//	void SetFunctionThread(DWORD dwThead);
//	DWORD GetFunctionThread();


	// Overrides
	
	BOOL InitInstance();
	BOOL ExitInstance();
protected:
	BSTR m_bstrRequest;
	BOOL m_bIsBusy;
	DWORD m_dwFunctionThread;
	long m_lRowID;
	int m_nThreadIndex;
};

#endif // !defined(AFX_WORKER_H__EC52A7FA_9663_11D5_9FC7_00B0D081C96F__INCLUDED_)



#include "stdafx.h"
// Include object header to get IIDs
#if defined(_APM_)
#include "APMObject.h"
#endif
#if defined(_OTCS_)
#include "otcsobject.h"
#endif
#if defined(_ODCS_)
#include "ODCSObject.h"
#endif
#if defined(_ODPS_)
#include "ODPSObject.h"
#endif

#include "Worker.h"
#include "errorinfo.h"
#include "errorlog.h"


#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
int MessageArrived(BSTR bstrRequest,CWorker *parent);
extern int GetMax();
// Seperate tracing for Worker threads
#define TRACEW(msg) \
{					\
	CString strTraceW; \
	strTraceW.Format("Request: %d, %s",m_lRowID, msg); \
	TRACEX(strTraceW); \
}

	
#define DIAGW(sev, msg, sender) \
	TRACEW(msg);  \
	g_pErrorLog->LogError(sev, msg, sender); 
 
#define DIAGXW(msg) \
	DIAGW(2,msg,g_strSender);

//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////

CWorker::CWorker()
{
	// Initialize worker thread
	g_strSender = OBJECTNAME + "CWorker::CWorker()";
	SetBusy( FALSE);
	m_bStopNow = FALSE;
	m_lRowID = 0;


	DIAGX("Thread Created");
}

void CWorker::SetIndex(int index)
{
	// We want to know where we are in the array for diagnositic purposes
	m_nThreadIndex = index;
}
int CWorker::GetIndex()
{
	// Tell us where we are in the thread array
	return m_nThreadIndex;
}

DWORD CWorker::GetThreadID()
{
	// Get our thread id
	return m_dwThreadID;
}
CWorker::~CWorker()
{
	g_strSender = OBJECTNAME + "CWorker::~CWorker()";
	DIAGX("Thread destroyed");
}

BOOL CWorker::InitInstance()
{

	// Initilization
	g_strSender = OBJECTNAME + "CWorker::InitInstance()";
	CoInitializeEx(NULL,COINIT_MULTITHREADED );
	m_dwThreadID = GetCurrentThreadId();
	m_bStopNow = FALSE;
//	m_pErrorLog =new CErrorLog();
//	DIAGX("new CErrorLog created");



	return TRUE;
}

BOOL CWorker::ExitInstance()
{

	g_CS.Lock();
	// unload us from the thread array
	g_ThreadsArray.RemoveAt(m_nThreadIndex);
	// reduce index of all threads in list after this one
	for (int i = m_nThreadIndex; i < g_ThreadsArray.GetSize(); i++)
		g_ThreadsArray[i]->m_nThreadIndex--;
	g_CS.Unlock();
	CoUninitialize();
	return TRUE;
}

BOOL CWorker::IsBusy()
{
	// Tell them if we are busy or stopping
	return m_bIsBusy || m_bStopNow;
}

void CWorker::SetBusy(BOOL bIsBusy)
{
	// Set the busy switch
	m_bIsBusy = bIsBusy;

}

long CWorker::GetID()
{
	// Return the request id being serviced by this threaed
	long lRow =  m_lRowID;
	return lRow;
}

void CWorker::SetID(long lID)
{
//	Remember the request ID 

	m_lRowID = lID;


}
BSTR CWorker::GetRequest()
{
//	return the request
	BSTR bstr =  m_bstrRequest;

	return bstr;
}

void CWorker::SetRequest(BSTR bstrRequest)
{
	// Store the request
	m_bstrRequest = bstrRequest;

}

void CWorker::Start()
{
	// Start a thread
	g_strSender = OBJECTNAME + "CWorker::Start()";

	g_CS.Lock();	// Keep everybody locked out until the thread starts
	m_Event.SetEvent();
	DIAGX("CWorker started");
}
int CWorker::Run()
{

	// Entry point for create thread.
	g_strSender = OBJECTNAME + "CWorker::Run()";

	while (TRUE)
	{
		DIAGXW("Waiting");
		// Wait for event set by start
		DWORD dwWaitResult = WaitForSingleObject(m_Event,INFINITE);
		if (m_lRowID == 0) 	// If we don't have a row ID this is a false start.
		{
			Sleep(10);	// WaitForSingleObject didn't wait (Don't know why, happens occasionally, try again
			CString szErr;
			szErr.Format("WaitForSingleObject returned %d",dwWaitResult);
			DIAGW(2,szErr,"APMObject::Worker::Run()");
		} else {
			SetBusy(TRUE);
			m_Event.ResetEvent();
			g_CS.Unlock();	// Free the lock, let other threads start
			
			if (m_bStopNow) break;	// If the stop flag was set while we were asleep, terminate the thread by returning from this procedure

			// Service the request
			DIAGXW("Ready to execute");
			int iResult = Doit();

			CString szTemp;
			szTemp.Format("Finished function execution Request: %d, thread %x, Result: %d",
							m_lRowID, GetCurrentThreadId(), iResult);
			DIAGXW(szTemp);
			
			// Signal main thread this request is complete.
			::PostThreadMessage(_Module.dwThreadID, WM_MSG_WORKERCOMPLETE,GetID(), iResult);
			if (GetID() == 0)	// Debug purposes, removeit
				m_lRowID = 0;	
			szTemp.Format("Posted message Request: %d, thread %x, Result: %d, Stop = %d",
							m_lRowID, GetCurrentThreadId(), iResult, m_bStopNow);
			DIAGXW(szTemp);
			g_CS.Lock();
			if (NotNeeded())	// Check to se if this thread is still needed
			{
				SetBusy(TRUE);
				m_bStopNow = TRUE;
				DIAG(2,"NotNeeded return TRUE","APMObject::CWorker::Run()");
				g_CS.Unlock();
				break; // Let surplus threads termintate
			}
			g_CS.Unlock();
			m_lRowID = 0;	// clear the request id so we can tell whether the wait actually waited.
			SetBusy(FALSE);
		}

	}

	DIAGXW("Thread terminated");
	return 0;		// Exiting this procedure terminates the thread
	
}

BOOL CWorker::NotNeeded()
{

	return FALSE; // Malfunction if removing threads???

	// Determine whether there are more than enough threads.
	// If so, signal that this one is not needed anymore
	CString strDiag;
	strDiag.Format(_T("GetMax()=%d, g_ThreadsArray.GetSize()=%d"),GetMax(),g_ThreadsArray.GetSize());
	DIAG(2,strDiag,"CWorker::NotNeeded");

	// See if we are in excess of our maximum
	long lMax = GetMax();
	if(lMax != 0 && lMax < g_ThreadsArray.GetSize())
	{

		return TRUE; // Too many threads
	}
	
	// Check to see if we're down to what we started with
	if (g_ThreadsArray.GetSize() <= g_nInitial) return FALSE;

	// Check what percentage of threads are not in use
	int iTotalUnused = 0;

	for (int i=0; i<g_ThreadsArray.GetSize();i++)
	{
		
		if (!g_ThreadsArray[i]->IsBusy()) iTotalUnused++;
	}
	strDiag.Format("Total unused = %d",iTotalUnused);
	DIAG(2,strDiag,"CWorker::NotNeeded");

	// Thread is surplus if more than 20% are unused
	strDiag.Format("(iTotalUnused * 100) / g_ThreadsArray.GetSize() = %d",
					 (iTotalUnused * 100) / g_ThreadsArray.GetSize());
	DIAG(2,strDiag,"CWorker::NotNeeded");

	if ((iTotalUnused * 100) / g_ThreadsArray.GetSize() > 20) 
	{
		DIAG(2,"RETURN TRUE","CWorker::NotNeeded");
		return TRUE;
	}
	DIAG(2,"RETURN FALSE","CWorker::NotNeeded");
	return FALSE;

}

void CWorker::SetStop()
{
	m_bStopNow = TRUE;
	if (!IsBusy())
		m_Event.SetEvent();		// get the thread started so it can exit

}
// Application specific operation
BOOL CWorker::Doit()
{

	// Service request
	g_strSender = OBJECTNAME + "CWorker::Doit";
	CString strMsg;
	strMsg.Format("Starting Request: %d, on thread %x", GetID(), m_dwThreadID);
	DIAGXW(strMsg);
	
	// Pefform request specific logic 
	// MessageArrived is different for each service.
	return MessageArrived(m_bstrRequest, this);
}


WARNING: There may be something wrong with the way a thread is removed. This is still a work in progress. Sorry, I didn't have time to clean it up.


Bill
GeneralRe: Other considerations Pin
Bill Wilson6-Dec-01 6:05
Bill Wilson6-Dec-01 6:05 
GeneralThis is more like a hack than a clean solution Pin
zoly29-Jan-01 8:19
zoly29-Jan-01 8:19 
GeneralIOCP on Win16 based OSes Pin
Jim Murphy4-Oct-00 4:42
Jim Murphy4-Oct-00 4:42 
GeneralRe: IOCP on Win16 based OSes Pin
connex22-Apr-01 5:03
connex22-Apr-01 5:03 
QuestionIs there any benefit using 'struct' instead of 'class' to define the interface? Pin
tonyz26-Sep-00 8:36
tonyz26-Sep-00 8:36 
AnswerRe: Is there any benefit using 'struct' instead of 'class' to define the interface? Pin
Sherwood Hu26-Sep-00 10:51
Sherwood Hu26-Sep-00 10:51 
AnswerRe: Is there any benefit using 'struct' instead of 'class' to define the interface? Pin
Marius Cabas1-Oct-00 20:26
sussMarius Cabas1-Oct-00 20:26 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.