I'm trying to implement my own dispatch queue that dispatches tasks in the same order they are added to the queue. The motivation is the lack of C++/CX support for dispatch queues in Windows Runtime components; the documentation I've been working from is https://msdn.microsoft.com/en-us/library/hh771042.aspx. I understand that this is related to the [Producer-consumer problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem), but I haven't managed to solve it using C++/CX yet.

What I have tried:

Given this, I've tried several approaches to this problem.

According to several examples and documentation I found online, I tried using semaphores to solve this problem. For that, I created a DispatchQueue class that holds several attributes:

namespace Threading
    {
    	ref class DispatchQueue
    	{
    
    	private:
    		typedef IMap<String^, DispatchQueue^> MapType;
    		typedef std::function<void()> TaskType;
    		typedef std::queue<TaskType> QueueType;
    
    	internal:
    
    		static DispatchQueue^ Get();
    
    		void Add(TaskType task);
    
    	private:
    		TaskType DequeueTask();
    		void Runner();
    
    		QueueType & GetQueue();
    		const QueueType & GetQueue() const;
    
    	private:
    		QueueType _queue;
    		std::mutex _queue_mux;
    		std::mutex _add_mux;
    		HANDLE _emptySemaphore;
    		HANDLE _fullSemaphore;
    	};
    }


(Note: I'm not posting the actual code because the class has been largely iterated, and it's quite messy now. Let me know if you need other details.)

Here's how the semaphores are being initialized; at least that's the relevant part of the constructor:

Threading::DispatchQueue::DispatchQueue(String^ identifier)
    	: _queue(),
    	  _emptySemaphore(CreateSemaphore(NULL, 1, 1, NULL)),  // initial count 1, max 1: one free slot
    	  _fullSemaphore(CreateSemaphore(NULL, 0, 1, NULL))    // initial count 0, max 1: nothing queued yet
    {
    }


Here's my producer:

void Threading::DispatchQueue::Add(TaskType task)
    {
    	// Serialize callers so that only one thread runs this method at a time
        // (ignore the fact that the mutex is static)
    	static std::mutex mux;
    	std::lock_guard<std::mutex> lock(mux);
    
    	WaitForSingleObject(_emptySemaphore, INFINITE);
    	{
    		// Prevent concurrent access to the queue
    		std::lock_guard<std::mutex> lock(_queue_mux);
    		GetQueue().push(task);
    	}
    	ReleaseSemaphore(_fullSemaphore, 1, NULL);
    }


Here's the consumer:

void Threading::DispatchQueue::Runner()
    {
    	while (true)
    	{
    		WaitForSingleObject(_fullSemaphore, INFINITE);
    
    		TaskType task = DequeueTask();
    
    		task();
    
    		if (!ReleaseSemaphore(_emptySemaphore, 1, NULL)) {
    			// Release failed (error handling omitted here)
    		}
    	}
    }


I've also tried dispatching the task (`task();`) _after_ the semaphore has been released, but that didn't work either. Note also that `Runner` (the consumer) is started once, at initialization:

create_task([dispatchQueue]() {

			dispatchQueue->Runner();
		});


The problem with this approach is that calling `task();` may itself produce new tasks (TaskType), and `Add` is not reentrant, so the implementation deadlocks. The running task calls `Add` before the consumer has released `_emptySemaphore`, and `Add` then waits on that semaphore forever.
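To make the sequence concrete, here is a small single-threaded model of the two semaphores. It is only a sketch: `ModelSemaphore`, `TryWait`, and `NestedAddCanProceed` are hypothetical names, and `TryWait` stands in for a `WaitForSingleObject` call with a zero timeout so that the model reports a failure instead of hanging.

```cpp
#include <cassert>

// Hypothetical model of a counting semaphore (not the Win32 object).
// TryWait() returns false where a real INFINITE wait would block.
struct ModelSemaphore {
    int count;
    int max;
    bool TryWait() {
        if (count == 0) return false;
        --count;
        return true;
    }
    void Release() {
        if (count < max) ++count;
    }
};

// Replays the first Add()/Runner() pair and reports whether an Add()
// called from inside task() could acquire the "empty" slot.
bool NestedAddCanProceed() {
    ModelSemaphore empty{1, 1};  // same counts as _emptySemaphore
    ModelSemaphore full{0, 1};   // same counts as _fullSemaphore

    // Add(): takes the only empty slot, pushes, signals the consumer.
    bool producerEntered = empty.TryWait();
    assert(producerEntered);
    full.Release();

    // Runner(): wakes up and starts task(); "empty" is not released yet.
    bool consumerWoke = full.TryWait();
    assert(consumerWoke);

    // task() calls Add() again: the slot is still taken.
    return empty.TryWait();
}
```

In this model `NestedAddCanProceed()` returns false, which corresponds to the nested `Add` waiting forever in the real code.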

I've iterated and tried this producer instead:

void Threading::DispatchQueue::Add(TaskType task)
    {
        create_task([this, task]()
        {
        	WaitForSingleObject(_emptySemaphore, INFINITE);
        	{
        		std::lock_guard<std::mutex> lock(_queue_mux);
        		GetQueue().push(task);
        	}
        	ReleaseSemaphore(_fullSemaphore, 1, NULL);
        
        }).wait();   // Notice we're waiting
    }


The wait() is there to guarantee that tasks are inserted into the queue in the right order. This does work, but it crashes when the method is called from the UI thread, which is not acceptable. Dropping the wait() is not an option either, because then the tasks are no longer dispatched in the right order.

I've also tried the sample at https://code.msdn.microsoft.com/windowsdesktop/Producer-Consumer-inter-cd30193b#content, which also blocks.

In short, I need a dispatch queue that respects the order of insertion without blocking the producing thread, since that thread may be the main (UI) thread. Is there a fix for any of these approaches, or is there another approach you'd recommend?
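For reference, one possible shape of such a queue, written in portable C++ rather than C++/CX, is an unbounded queue guarded by a mutex and a condition variable. This is a sketch under assumptions, not a tested Windows Runtime implementation: `SerialQueue` and `RunDemo` are hypothetical names, and the consumer runs on a `std::thread` here, where the original code starts `Runner` with `create_task`. Because there is no "empty" semaphore, `Add` only holds the mutex for the duration of the push and never waits for the consumer.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for DispatchQueue.
class SerialQueue {
public:
    using Task = std::function<void()>;

    SerialQueue() : worker_([this] { Run(); }) {}

    ~SerialQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        ready_.notify_one();
        worker_.join();  // the worker drains pending tasks before exiting
    }

    // Producer: appends in call order and returns immediately.
    void Add(Task task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

private:
    // Consumer: runs tasks one at a time, in FIFO order.
    void Run() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // runs outside the lock, so the task may call Add()
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<Task> tasks_;
    bool stopping_ = false;
    std::thread worker_;  // declared last so the other members exist first
};

// Demo: the caller adds tasks 1 to 3; task 3 re-enters Add() with task 4.
std::vector<int> RunDemo() {
    std::vector<int> order;
    {
        SerialQueue queue;
        queue.Add([&] { order.push_back(1); });
        queue.Add([&] { order.push_back(2); });
        queue.Add([&] {
            order.push_back(3);
            queue.Add([&] { order.push_back(4); });
        });
    }  // destructor drains the queue and joins the worker
    return order;
}
```

The design choice here is to give up the bounded buffer: since the queue can grow without limit, the producer never has to wait for a free slot, and a task that calls `Add` from the consumer thread simply appends to the end of the queue.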
Comments
KarstenK 7-Feb-17 4:03am    
I would create all tasks in waiting mode. Then another task fetches them and, when one finishes, starts the next.
Member 11320435 7-Feb-17 8:07am    
What do you mean "create all tasks in waiting mode"? How do I do that?

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


