Hi everyone,
I have tried to improve my previous question, but the changes are substantial, so I'm opening a new question to ask you for a solution and for your help.

I have to manage 5 threads that perform image processing. I want them to execute like an assembly line: each thread waits for an image to become available from the previous thread, works on it, and puts it into an output queue for the next thread.

But these operations must run concurrently across the threads, like this:

After the first thread releases the first image, the second starts processing it, the third waits on the second, and so on... not a sequential elaboration.

Following your suggestions, I introduced CONDITION VARIABLES and a critical section into my code, to signal the "item produced" condition on the producer side of each thread, because in this program every thread is a producer and a consumer at the same time.

I'm posting my code here and asking for your help.

The current situation is: the program gives different output every time I run it. The threads don't perform all the elaborations, or if they do perform all 10 of them (a fixed number, just for testing), they do it sequentially: the first thread finishes, then the second starts its elaboration, and so on.

Why does this happen? Please help me fix this situation...

This is the code:

C++
#include <opencv/cv.h>
#include <opencv/highgui.h>
#include <opencv2/highgui/highgui.hpp>
#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <queue>
#include <vector>

using namespace std;
using namespace cv;

template<typename T>
class coda_concorr
{
private:
	std::queue<T> la_coda;
	HANDLE mutex;

public:
	coda_concorr()
	{
		mutex = CreateMutex(NULL,FALSE,NULL);
	}
	~coda_concorr()
	{
		CloseHandle(mutex);
	}
	void push(T& data)
	{
		WaitForSingleObject(mutex,INFINITE);
		la_coda.push(data);
		ReleaseMutex(mutex);
	}
	bool vuota() const
	{
		WaitForSingleObject(mutex,INFINITE);
		bool RetCode = la_coda.empty();
		ReleaseMutex(mutex);
		return RetCode;
	}
	bool try_pop(T& popped)
	{
		WaitForSingleObject(mutex,INFINITE);
		if (la_coda.empty()){
			//nothing to pop: release the mutex exactly once and report failure
			//(the original code waited on the mutex a second time here,
			//leaving its ownership count unbalanced)
			ReleaseMutex(mutex);
			return false;
		}
		popped = la_coda.front();
		la_coda.pop();
		ReleaseMutex(mutex);
		return true;
	}
};


struct Args
{
	coda_concorr<cv::Mat> in;
	coda_concorr<cv::Mat> *out; //pointer to the next stage's queue
};


CONDITION_VARIABLE NonVuoto1;
CONDITION_VARIABLE NonVuoto2;
CONDITION_VARIABLE NonVuoto3;
CONDITION_VARIABLE NonVuoto4;
CRITICAL_SECTION  Lock;

bool stop = false;

//initial population of the pipeline
//(renamed from "puts" to avoid clashing with the C library function)
void put_images (void* param){
	Args* arg = (Args*)param;
	int i=0;
	Mat image;
	while(true){
		EnterCriticalSection(&Lock);
		if(stop || arg->in.vuota()){
			//nothing left to feed (or stop requested): we are done
			LeaveCriticalSection(&Lock);
			break;
		}
		arg->in.try_pop(image);
		arg->out->push(image);
		i++;
		LeaveCriticalSection(&Lock);
		//signal the next stage that an item is available
		WakeConditionVariable(&NonVuoto1);
	}
	//done
	cout<<endl<<"Thread (PUSH) finished after "<<i<<" elaborations."<<endl;
	_endthread();
}

//greyscale
void grey (void *param){
	Mat temp1,temp2;
	int add = 0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock);
		//wait while our input queue is empty
		while(arg->in.vuota() && !stop){
			SleepConditionVariableCS(&NonVuoto1,&Lock,INFINITE);
		}
		if(stop){
			LeaveCriticalSection(&Lock);
			break;
		}
		arg->in.try_pop(temp1);
		//NOTE: the conversion runs while the single global Lock is still held,
		//so no other stage can work in parallel with this one
		cvtColor(temp1,temp2,CV_BGR2GRAY);
		arg->out->push(temp2);
		add++;
		cout<<endl<<"grey has done: "<<add<<endl;
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto2);
	}
	//done
	cout<<endl<<"Thread (GREY) finished after "<<add<<" elaborations."<<endl;
	_endthread();
}

//threshold funct
void soglia(void *param){
	Mat temp1a,temp2a;
	int add=0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock);
		//must be a while, not an if: condition variables can wake up spuriously
		while(arg->in.vuota() && !stop){
			SleepConditionVariableCS(&NonVuoto2,&Lock,INFINITE);
		}
		if(stop){
			LeaveCriticalSection(&Lock);
			break;
		}
		arg->in.try_pop(temp1a);
		threshold(temp1a,temp2a,128,255,THRESH_BINARY);
		arg->out->push(temp2a);
		add++;
		cout<<endl<<"soglia has done: "<<add<<endl;
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto3);
	}
	//done
	cout<<endl<<"Thread (SOGLIA) finished after "<<add<<" elaborations."<<endl;
	_endthread();
}

//erode/dilate
void finitura(void *param){
	Mat temp1b,temp2b,temp2c;
	int add = 0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock);
		//as consumer: wait for input
		while(arg->in.vuota() && !stop){
			SleepConditionVariableCS(&NonVuoto3,&Lock,INFINITE);
		}
		if(stop){
			LeaveCriticalSection(&Lock);
			break;
		}
		arg->in.try_pop(temp1b);
		erode(temp1b,temp2b,cv::Mat());
		dilate(temp2b,temp2c,Mat());
		arg->out->push(temp2c);
		add++;
		cout<<endl<<"erode has done: "<<add<<endl;
		LeaveCriticalSection(&Lock);
		WakeConditionVariable(&NonVuoto4);
	}
	//done
	cout<<endl<<"Thread (ERODE) finished after "<<add<<" elaborations."<<endl;
	_endthread();
}

//contour funct
void contorno (void *param){
	Mat temp;
	int add=0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock);
		//as consumer: wait for input
		while(arg->in.vuota() && !stop){
			SleepConditionVariableCS(&NonVuoto4,&Lock,INFINITE);
		}
		if(stop){
			LeaveCriticalSection(&Lock);
			break;
		}
		//pop an item
		arg->in.try_pop(temp);
		//find the contours
		vector<vector<Point>> contorni;
		findContours(temp,contorni,CV_RETR_LIST, CV_CHAIN_APPROX_SIMPLE);
		//draw the contours into an image
		Mat dst(temp.size(), CV_8UC3, Scalar(0,0,0));
		Scalar colors[3];
		colors[0] = Scalar(255,0,0);
		colors[1] = Scalar(0,255,0);
		colors[2] = Scalar(0,0,255);
		for (size_t idx = 0; idx < contorni.size(); idx++){
			drawContours(dst,contorni,(int)idx,colors[idx %3]);
		}
		//as producer
		arg->out->push(dst);
		add++;
		cout<<endl<<"cont has done: "<<add<<endl;
		LeaveCriticalSection(&Lock);
	}
	cout<<endl<<"Thread (CONTOUR) finished after "<<add<<" elaborations."<<endl;
	_endthread();
}

//main
int main()
{
	coda_concorr<cv::Mat> uscita;


	InitializeConditionVariable(&NonVuoto1);
	InitializeConditionVariable(&NonVuoto2);
	InitializeConditionVariable(&NonVuoto3);
	InitializeConditionVariable(&NonVuoto4);
	InitializeCriticalSection(&Lock);

	
	//counter var
	LARGE_INTEGER count1, count2, freq;
	double elapsed;
	
        //temporary variables, for test use only
	Mat temp[10];
	Mat out;
	
	//queues
	Args dati0,dati1,dati2,dati3,dati4;
	
	
	//start counter
	QueryPerformanceFrequency(&freq);	
	QueryPerformanceCounter (&count1);
		
	for(int i=0;i<10;i++){
		temp[i] = imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
		//fill the first stage's input queue directly: assigning whole
		//coda_concorr objects around would copy the mutex HANDLE
		dati0.in.push(temp[i]);
	}

	//wire each stage's output to the next stage's input queue
	dati0.out=&dati1.in;
	dati1.out=&dati2.in;
	dati2.out=&dati3.in;
	dati3.out=&dati4.in;
	dati4.out=&uscita;

	//handle
	HANDLE handle0,handle1,handle2,handle3,handle4;
	
	//threads
	//(note: _beginthreadex is safer when you want to wait on the returned
	//handle, because _beginthread closes its handle automatically on exit)
	handle0 = (HANDLE) _beginthread(put_images,0,&dati0);
	handle1 = (HANDLE) _beginthread(grey,0,&dati1);
	handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
	handle3 = (HANDLE) _beginthread(finitura,0,&dati3);
	handle4 = (HANDLE) _beginthread(contorno,0,&dati4);
	
	_putws(L"Press ENTER to stop elaboration...");
    getchar();
 
    EnterCriticalSection(&Lock);
    stop = true;
    LeaveCriticalSection(&Lock);
	
	//wake every sleeping stage so it can observe the stop flag
	WakeAllConditionVariable(&NonVuoto1);
	WakeAllConditionVariable(&NonVuoto2);
	WakeAllConditionVariable(&NonVuoto3);
	WakeAllConditionVariable(&NonVuoto4);

	//join
	WaitForSingleObject(handle0,INFINITE);
	WaitForSingleObject(handle1,INFINITE);
	WaitForSingleObject(handle2,INFINITE);
	WaitForSingleObject(handle3,INFINITE);
	WaitForSingleObject(handle4,INFINITE);

	//stop counter and output
	QueryPerformanceCounter (&count2);
	//compute elapsed time
	elapsed = (count2.QuadPart - count1.QuadPart) * 1000.0 / freq.QuadPart;
	cout <<endl<<"Approximate execution time: " <<elapsed<<" ms."<<endl;

	system("PAUSE");
	
return 0;
}



Please... help me... I'm going crazy...

1 solution

To be honest, I haven't checked all of your code; I just looked at the synchronization primitives you are using in coda_concorr, and that alone is already suspicious. The same is true for your thread functions, which use ugly locking and antipatterns like "try". You are not using the right synchronization primitives, for sure. Depending on the circumstances, your solution may have not only technical problems (bugs) but big conceptual ones too.

Let's say we overlook the conceptual problems of your solution and focus only on the bugs of the current code. What you need here is a single-producer/single-consumer blocking message queue class that you can put between your threads; I guess coda_concorr wants to be that. To give your progress a boost, here is one such blocking queue that has several optimizations and uses the right synchronization primitives on Windows:
C++
#include <windows.h>
#include <vector>
#include <cassert>

// single/multi producer, single consumer blocking queue without an upper
// limit on the queue size, so producers never block!
// WARNING!!! UNTESTED CODE!!! PROVIDED AS IS WITHOUT ANY WARRANTY!!!
template <typename T>
class BlockingQueue
{
public:
    BlockingQueue()
    {
        m_AddedItemEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        InitializeCriticalSection(&m_Lock);
        m_SwappedItemIndex = 0;
    }

    ~BlockingQueue()
    {
        assert(m_Items.empty() && m_SwappedItemIndex >= m_SwappedItems.size());
        CloseHandle(m_AddedItemEvent);
        DeleteCriticalSection(&m_Lock);
    }

    // Never blocks, can be called from any number of producer threads. Multiple producer threads
    // are allowed here just because I don't know any optimizations that would involve the
    // restriction of the number of threads for some performance gain... It works well with one
    // thread too so there is no problem.
    void Produce(const T& item)
    {
        EnterCriticalSection(&m_Lock);
        m_Items.push_back(item);
        SetEvent(m_AddedItemEvent);
        LeaveCriticalSection(&m_Lock);
    }

    // It may block, call it only from the consumer thread.
    void Consume(T& item)
    {
        // Several implementations possible here. This one exploits that we have only
        // one consumer thread so it is able to consume items in batch with less locking.
        // And of course this implementation may be buggy because I have just put it together.
        if (m_SwappedItemIndex >= m_SwappedItems.size())
        {
            m_SwappedItemIndex = 0;
            m_SwappedItems.clear();
            WaitForSingleObject(m_AddedItemEvent, INFINITE);
            EnterCriticalSection(&m_Lock);
            ResetEvent(m_AddedItemEvent);
            m_Items.swap(m_SwappedItems);
            LeaveCriticalSection(&m_Lock);
        }

        item = m_SwappedItems[m_SwappedItemIndex++];
    }

private:
    CRITICAL_SECTION m_Lock;
    HANDLE m_AddedItemEvent;
    std::vector<T> m_Items;
    // m_SwappedItems is accessed only by the consumer so we don't have to hold the lock.
    // After some time the size of allocated storage of m_Items and m_SwappedItems reaches
    // an upper bound and from that point the queue performs no allocation, just swapping
    // the data pointers in the two containers. Using simple array (vector) instead of
    // fancy containers like queue and list makes the locality of reference better
    // (less cache misses).
    std::vector<T> m_SwappedItems;
    size_t m_SwappedItemIndex;
};


Now let's take a look at the high-level design of your solution. (I won't delve into the details or write several books' worth about multithreading here.) Fact: multithreading speeds up the execution of your program only if the hardware supports it and your program uses it correctly. Making a program multithreaded "just because" it is said to speed up execution (and because it's fashionable these days) is pointless; you are just making headaches for yourself, and you sacrifice the sequential execution, deterministic behaviour, and easy debuggability of your program. (Of course, in some cases you create a background thread not to gain speed but to convert synchronous calls into asynchronous ones, but that is another story...) Your problem may well be suited to multithreading, but that should be measured, since most of the heavy lifting will probably be done by OpenCV.

You should write your program so that it can be executed both sequentially (single threaded) and in multithreaded mode. Switching between the two modes can happen at compile time (with a #define) or at runtime (with a bool variable). This is important because it lets you compare the sequential execution time with the multithreaded one on a specific machine with a specific number of cores (and, optionally, hyperthreads). Let's say you have a machine with 8 cores and your multithreaded version runs at 6x the speed of the sequential one; in that case you have done pretty well with multithreading! (I said only 6x and not 8x because the rest of the computation capacity is spent waiting on locks and on the management of hardware resources shared between the cores, like memory and cache.)

Another good thing about having both a single-threaded and a multithreaded version is that, in my opinion, it helps many people organize and design clearer multithreaded code, and it forces a clean separation of the individual jobs. With multithreading, always think about separating your tasks into separate job classes that will be executed "on a thread, somewhere, somehow", not about threads that randomly execute thread functions here and there. If your code is clean, then the difference between the single- and multithreaded versions of your program will be only the small central piece of core logic that executes the jobs somehow, somewhere.
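A minimal sketch of such a switch, under my own assumptions (all names are illustrative, and the "multithreaded mode" naively spawns one thread per item just to illustrate the idea; a real implementation would use a thread pool sized to the core count):
C++
// WARNING: illustrative sketch only, not code from the post above.
#include <windows.h>
#include <cstdio>

//#define FORCE_SINGLE_THREADED   // compile-time switch

static void Operation1(int item)
{
    printf("operation1 on item %d\n", item);
}

static DWORD WINAPI WorkerProc(LPVOID param)
{
    Operation1((int)(INT_PTR)param);
    return 0;
}

// Runs the same operation on every item either sequentially or in parallel,
// so the two execution times can be compared on a given machine.
void RunOnAllItems(int item_count, bool multithreaded)
{
#ifdef FORCE_SINGLE_THREADED
    multithreaded = false;          // the compile-time switch wins
#endif
    if (!multithreaded)
    {
        for (int i = 0; i < item_count; ++i)
            Operation1(i);          // deterministic and easy to debug
        return;
    }
    // naive one-thread-per-item version, capped by WaitForMultipleObjects' limit
    const int n = item_count < MAXIMUM_WAIT_OBJECTS ? item_count : MAXIMUM_WAIT_OBJECTS;
    HANDLE threads[MAXIMUM_WAIT_OBJECTS];
    for (int i = 0; i < n; ++i)
        threads[i] = CreateThread(NULL, 0, WorkerProc, (LPVOID)(INT_PTR)i, 0, NULL);
    WaitForMultipleObjects((DWORD)n, threads, TRUE, INFINITE);
    for (int i = 0; i < n; ++i)
        CloseHandle(threads[i]);
}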

After the introduction, let's talk a bit about your problem, which is quite similar to a lot of other multithreading problems: you have items to process, and processing consists of performing several operations (one after the other) on each item.

Let's talk first about the single-threaded version, with no multithreading involved. Even in this case you have several choices, because you have many items and many operations to perform on each of them. One option is taking one item and processing it completely, performing every operation on it one after the other. The other option is performing the first operation on every item, then the second operation on every item, and so on; a sketch of both loops follows below. This second solution may not be obvious at first (many people would start with the first option), but it is often better: running the same operation many times in a row is faster than running operation1, then operation2, then operation3 and so on, because the code of one operation usually accesses more or less the same memory areas, which results in fewer cache misses.

Of course, sometimes you cannot afford to process all items in one batch (operation1 on all items, then operation2 on all items, and so on), because that means all items become ready almost at once. Sometimes you have to balance between the two solutions by splitting your items into smaller batches: first you execute operation1 on one small set of items, then operation2 on that same small set, and so on. You can balance between the two if another system needs regular output from this one, but that is already optimization...
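A minimal sketch of the two single-threaded processing orders discussed above (Item and the operations are illustrative placeholders of mine):
C++
// WARNING: illustrative sketch only, with assumed names.
#include <vector>

struct Item { int value; };
static void Operation1(Item& item) { item.value += 1; }   // stand-ins for the
static void Operation2(Item& item) { item.value *= 2; }   // real image operations

// Option 1: item-major - take one item and run the whole pipeline on it.
void ProcessItemMajor(std::vector<Item>& items)
{
    for (size_t i = 0; i < items.size(); ++i)
    {
        Operation1(items[i]);
        Operation2(items[i]);
    }
}

// Option 2: operation-major - run each operation over all items before moving
// to the next one; often faster due to better code and data locality.
void ProcessOperationMajor(std::vector<Item>& items)
{
    for (size_t i = 0; i < items.size(); ++i)
        Operation1(items[i]);
    for (size_t i = 0; i < items.size(); ++i)
        Operation2(items[i]);
}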

Multithreaded version: your solution performs each operation in parallel on its own thread. We could make it more colorful by executing each operation on several threads, namely on a thread pool. The other approach (just like in the single-threaded version) is first performing operation1 on all items on a thread pool (with multiple threads), and then performing operation2 on all items on a thread pool (usually the very same one). This latter solution has several benefits, if not only benefits, over the first version, as in the single-threaded case. Performing only one operation at a time on multiple threads across multiple items results in fewer cache misses and better locality of reference; now let's look at the other advantages it has over the design that executes X operations on Y threads:

If you execute only one operation at a time on multiple threads, processing all of your items, you can easily control the number of threads in the thread pool. Usually you want only one thread pool globally in your system, and that pool usually contains only as many threads as the number of cores in your machine (depending on the type of the jobs, it is sometimes not a good idea to create threads for the hyperthreads of your cores). All you do is put together a lot of jobs (let's say 1000 jobs that perform operation1 on all items), feed them into the thread pool, and some time later the pool will have processed all of them in some random order. When all jobs are ready, you put together another batch of jobs that perform operation2 on all items and feed that into the thread pool; some time later, all of those jobs become ready too. All you have is a master/main thread and several worker threads in your thread pool.

Another thing I like about this solution is that making it single threaded is super easy: you create a thread pool that has no worker threads at all; when the main thread calls the AddJobs() method of this pool, AddJobs() executes all jobs immediately on the main thread and returns only after that (see the sketch below). If, instead, you execute all operations in parallel like you do, you have far fewer options to control the number of threads: you have to create at least as many threads as the number of operations, and if you have a lot of cores, maybe you can create more than one thread for some operations, but that's all!
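A minimal sketch of that degenerate "no worker threads" pool (the interface is an assumption of mine, not code from the post):
C++
// WARNING: illustrative sketch only, with assumed names.
#include <vector>

struct CJob { virtual void Execute() = 0; virtual ~CJob() {} };

// A "thread pool" with zero worker threads: AddJobs() simply runs every job
// on the calling thread and returns when they are all done, which turns the
// whole program into the single-threaded version with no other code changes.
class InlineThreadPool
{
public:
    void AddJobs(const std::vector<CJob*>& jobs)
    {
        for (size_t i = 0; i < jobs.size(); ++i)
            jobs[i]->Execute();
    }
};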
Let's continue the discussion of the "single operation on a single thread pool" solution.

The code that performs an operation will usually access "global" code and data used by operation1, operation2, and operationX alike. These shared things can be global functions or global variables/objects, and they all increase the number of cache misses. The most obvious data shared between two operations is the queue that connects them. If you use the better solution and execute only operation1 on the thread pool, then the queue is first only written by many threads, and only later read by another thread, usually with all items taken out at once by the main/coordinator thread. It never happens that the producer side and the consumer side are accessed in parallel, and that allows for very nice optimizations in the queue!

When you put together a large number of jobs (let's say the jobs that execute operation1 on all items) and hand them to the thread pool for processing (you do this from the main thread), you somehow have to be able to wait, on the main thread, for the thread pool to finish all of them. Putting this wait code into the thread pool itself is a bad idea; never complicate a clean conceptual unit like a thread pool with operations like WaitForAll()! What I usually do instead is put a wait-for-all operation into the queue implementation that holds the items between two operations. On my main thread I do the following:

- prepare the queue between operation1 and operation2 to receive the items produced by operation1 (easy, because no threads are accessing the queue at this point)
- prepare the jobs that perform operation1 on all items and hand them to the thread pool for processing
- wait on the queue between operation1 and operation2 until it has received all produced items
- take the results of operation1 out of the queue (easy, because no threads are accessing the queue anymore)
- prepare the queue between operation2 and operation3 to receive the items produced by operation2
- prepare the jobs (from the results we just took out of the queue) that perform operation2 on all items and hand them to the thread pool for processing
- ...

Here is something that looks like the queue I was talking about (a usage sketch follows the class):
C++
// WARNING!!! UNTESTED CODE!!! PROVIDED AS IS WITHOUT ANY WARRANTY!!!
// This is just an "illustration" of the optimized "queue" I was talking about.
#include <windows.h>
#include <vector>
template <typename T>
class ExampleQueue
{
public:
    ExampleQueue()
    {
        m_PrevIndex = -1;
        m_MaxIndex = 0;
        m_FinishedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    ~ExampleQueue()
    {
        CloseHandle(m_FinishedEvent);
    }

    void PrepareForProduction(int max_items)
    {
        m_PrevIndex = -1;
        m_MaxIndex = (LONG)(max_items - 1);
        m_Items.resize((size_t)max_items);
        ResetEvent(m_FinishedEvent);
    }

    void Produce(const T& item)
    {
        LONG index = InterlockedIncrement(&m_PrevIndex);
        m_Items[(size_t)index] = item;
        if (index == m_MaxIndex)
            SetEvent(m_FinishedEvent);
    }

    // Blocks the caller (typically the main/coordinator thread) until all
    // max_items items have been produced. (Without an accessor like this,
    // m_FinishedEvent would not be reachable from the outside.)
    void WaitForAllItems()
    {
        WaitForSingleObject(m_FinishedEvent, INFINITE);
    }

    void GetAllItems(std::vector<T>& items)
    {
        m_Items.swap(items);
        m_Items.clear();
    }

private:
    LONG m_PrevIndex;
    LONG m_MaxIndex;
    std::vector<T> m_Items;
    HANDLE m_FinishedEvent;
};
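To show how this fits the bullet list above, here is a minimal usage sketch built on the ExampleQueue class just shown. Item, Operation1Job, and ProcessAllItems are illustrative names of mine, and for brevity the jobs are executed inline instead of on a real thread pool:
C++
// WARNING: illustrative sketch only, with assumed names.
#include <windows.h>
#include <vector>
#include <cstdio>

struct Item { int value; };

// A job that performs "operation1" on one item and pushes the result into
// the queue sitting between operation1 and operation2.
struct Operation1Job
{
    Item item;
    ExampleQueue<Item>* out;
    void Execute()
    {
        item.value *= 2;        // stand-in for the real image processing
        out->Produce(item);
    }
};

// The main-thread workflow from the bullet list above. With a real thread
// pool, the queue and the waiting work exactly the same way.
void ProcessAllItems(std::vector<Item>& items)
{
    ExampleQueue<Item> queue12;
    queue12.PrepareForProduction((int)items.size());  // no threads touch it yet

    std::vector<Operation1Job> jobs(items.size());
    for (size_t i = 0; i < items.size(); ++i)
    {
        jobs[i].item = items[i];
        jobs[i].out = &queue12;
        jobs[i].Execute();      // a real pool would run these on worker threads
    }

    queue12.WaitForAllItems();  // returns once every item has been produced

    std::vector<Item> results1;
    queue12.GetAllItems(results1);  // safe without locking: production is over

    // ...repeat the same steps with a queue between operation2 and operation3
    printf("operation1 produced %u items\n", (unsigned)results1.size());
}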


One more piece of advice: make your multithreading code object oriented (lock, event, queue, thread, threadpool, job, jobgroup classes, ...).

EDIT: In multithreading there is usually no single best solution. You have to profile your program and tweak, typically, the number of threads and the size of the processed batches to make it perform well on a given piece of hardware with a given amount of resources.

EDIT: A small example program showing the usage of BlockingQueue; I have also fixed the BlockingQueue implementation above, as it was buggy:
C++
#include <windows.h>
#include <cstdio>
#include <cassert>

class CJob
{
public:
    virtual ~CJob() {}
    virtual void Execute() = 0;
};


BlockingQueue<CJob*> g_Queue1;
BlockingQueue<CJob*> g_Queue2;


class COperation2 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation2\n");
        delete this;
    }
};

class COperation1 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation1\n");
        g_Queue2.Produce(new COperation2);
        delete this;
    }
};


// a generic thread function for each thread
DWORD WINAPI ThreadFunc(LPVOID param)
{
    BlockingQueue<CJob*>& queue = *(BlockingQueue<CJob*>*)param;
    for (;;)
    {
        CJob* job;
        queue.Consume(job);
        // We signal the thread to exit by placing a NULL pointer in the queue.
        if (!job)
            break;
        job->Execute();
    }
    return 0;
}


void ManageThreads()
{
    DWORD thread_id;

    HANDLE thread1 = CreateThread(NULL, 0, ThreadFunc, &g_Queue1, 0, &thread_id);
    // we don't handle serious problems correctly, just catching them with assert in the debugger...
    assert(thread1);

    HANDLE thread2 = CreateThread(NULL, 0, ThreadFunc, &g_Queue2, 0, &thread_id);
    assert(thread2);

    g_Queue1.Produce(new COperation1);
    g_Queue1.Produce(new COperation1);
    g_Queue1.Produce(new COperation1);

    // asking the thread1 to exit and waiting for it....
    g_Queue1.Produce(NULL);
    WaitForSingleObject(thread1, INFINITE);

    // asking the thread2 to exit and waiting for it....
    // thread1 has already placed its last job in the queue of thread2 so it is safe to put in the NULL
    g_Queue2.Produce(NULL);
    WaitForSingleObject(thread2, INFINITE);

    CloseHandle(thread1);
    CloseHandle(thread2);
}



EDIT:
The exact same example as the previous one, but written in an object oriented way, without global variables:
C++
#include <windows.h>
#include <cstdio>
#include <cassert>

class Thread
{
public:
    Thread()
    {
        m_hThread = NULL;
    }
    ~Thread()
    {
        if (m_hThread)
        {
            assert(!IsRunning());
            CloseHandle(m_hThread);
        }
    }
    void Start()
    {
        assert(!IsRunning());
        DWORD thread_id;
        m_hThread = CreateThread(NULL, 0, StaticThreadProc, this, 0, &thread_id);
        assert(m_hThread);
    }
    void WaitForExit()
    {
        assert(m_hThread);
        WaitForSingleObject(m_hThread, INFINITE);
    }
protected:
    virtual void Run() = 0;
private:
    bool IsRunning()
    {
        return m_hThread && WaitForSingleObject(m_hThread, 0)==WAIT_TIMEOUT;
    }
    static DWORD WINAPI StaticThreadProc(LPVOID param)
    {
        Thread* thread = (Thread*)param;
        thread->Run();
        return 0;
    }
private:
    HANDLE m_hThread;
};


class CJob
{
public:
    virtual ~CJob() {}
    virtual void Execute() = 0;
};


// A general purpose job executor worker thread.
class JobExecutorThread : public Thread
{
public:
    JobExecutorThread()
    {
        m_ExitRequested = false;
    }
    bool AddJob(CJob* job)
    {
        if (m_ExitRequested)
            return false;
        assert(job);
        m_JobQueue.Produce(job);
        return true;
    }
    void RequestExit()
    {
        assert(!m_ExitRequested);
        if (!m_ExitRequested)
        {
            // the flag must actually be set, otherwise AddJob() could still
            // enqueue jobs after the exit request (this was missing originally)
            m_ExitRequested = true;
            m_JobQueue.Produce(NULL);
        }
    }
protected:
    virtual void Run() override
    {
        for (;;)
        {
            CJob* job;
            m_JobQueue.Consume(job);
            if (job)
                job->Execute();
            else
                break;
        }
    }
private:
    bool m_ExitRequested;
    BlockingQueue<CJob*> m_JobQueue;
};


class COperation2 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation2\n");
        delete this;
    }
};

class COperation1 : public CJob
{
public:
    COperation1(JobExecutorThread* operation2_thread)
        : m_Operation2Thread(operation2_thread)
    {}
    virtual void Execute() override
    {
        printf("operation1\n");
        m_Operation2Thread->AddJob(new COperation2);
        delete this;
    }
private:
    JobExecutorThread* m_Operation2Thread;
};


void ManageThreads()
{
    JobExecutorThread operation1_thread;
    JobExecutorThread operation2_thread;
    operation1_thread.Start();
    operation2_thread.Start();

    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.AddJob(new COperation1(&operation2_thread));

    // asking the thread1 to exit and waiting for it....
    operation1_thread.RequestExit();
    operation1_thread.WaitForExit();

    // asking the thread2 to exit and waiting for it....
    // thread 1 has already placed the last job in the queue of thread2 so its safe to put in the NULL
    operation2_thread.RequestExit();
    operation2_thread.WaitForExit();
}


The object oriented example may look a bit longer, but it contains no global variables, and most of the boilerplate is reusable code that would live in a central library; reusability pays off well if you use threading in more places in your program. Another benefit of the object oriented approach is that the platform-specific parts (thread creation, etc.) are wrapped in classes that can be ported easily to other platforms; you don't have to touch the core logic (the ManageThreads() function) that coordinates the work of the threads.
 
Comments
H.Brydon 7-Jul-13 14:39pm    
Zowee - +5 for the effort.
pasztorpisti 7-Jul-13 14:43pm    
Thank you! To be honest I had a free hour for doing nothing...
Domus1919 8-Jul-13 4:09am    
Thanks for your patience... I've read your answer today, and now I have to clear my mind and hopefully find a solution with your suggestions. Thank you very much for your attention. This afternoon I'll try to do something and let you know.
pasztorpisti 8-Jul-13 4:39am    
You are welcome. I updated the solution with a small BlockingQueue example in case you are only going for a hotfix to your current solution.
Domus1919 8-Jul-13 4:48am    
You're a legend!! Thanks... This afternoon I'll try it and let you know...
