execq is a task-based approach to processing data, built on the thread-pool idea with extended features. It supports different task sources and executes tasks in parallel on N threads (according to hardware concurrency).
- Provides queues and streams that allow tasks to be executed in different ways; queues let you process an object by simply 'putting it into the queue'
- Supports serial and concurrent queues
- Work as you like: submitting an object to a queue returns a non-blocking (unlike std::async) std::future that can be used to obtain the result once the object has been processed
- Maintains an optimal thread count to avoid excess CPU thread context switches
- Runs tasks from different queues/streams 'by turn', avoiding starvation of tasks that were added very late
- Designed to process multiple non-blocking tasks (generally, you should not sleep/wait inside a task-processing function)
- C++11 compliant
1. Queues and Streams
execq deals with concurrent task execution using two approaches: queue-based and stream-based. You are free to use multiple queues and streams, and any combination of them!
1.1 Queue-Based Approach
Designed to process objects in 'push-and-forget' style. Objects pushed into a queue are processed as soon as any thread is ready to handle them.
ExecutionQueue combines a regular queue, synchronization mechanisms, and execution inside a thread pool.
Internally, ExecutionQueue keeps track of the tasks being executed. When destroyed, the queue marks all running and pending tasks as 'canceled'. Even if a task was canceled before execution, it is not discarded: it is still called in its turn, but with 'isCanceled' == true.
ExecutionQueue can be:
- concurrent: processes objects in parallel on multiple threads // CreateConcurrentExecutionQueue
- serial: processes objects strictly in 'one-after-one' order; you can be sure that no two tasks are executed simultaneously // CreateSerialExecutionQueue
execq allows you to create an 'IExecutionQueue' instance (either serial or concurrent) to process your objects in a specific IExecutionPool.
IExecutionPool is a kind of opaque thread pool. The same IExecutionPool object is usually shared by multiple queues and streams.
There is no need to write your own queue and the synchronization around it - it is all done inside!
```cpp
#include <execq/execq.h>
#include <iostream>

void ProcessObject(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (isCanceled)
    {
        std::cout << "Queue has been canceled. Skipping object...";
        return;
    }
    std::cout << "Processing object: " << object << '\n';
}

int main(void)
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();

    std::unique_ptr<execq::IExecutionQueue<void(std::string)>> queue =
        execq::CreateConcurrentExecutionQueue<void, std::string>(pool, &ProcessObject);

    queue->push("qwe");
    queue->push("some string");
    queue->push("");

    return 0;
}
```
Standalone Serial Queue
Sometimes you may need just a single-threaded queue that processes things in the right order. For this purpose, it is possible to create a pool-independent serial queue.
```cpp
#include <execq/execq.h>
#include <iostream>

void ProcessObjectOneByOne(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (isCanceled)
    {
        std::cout << "Queue has been canceled. Skipping object...";
        return;
    }
    std::cout << "Processing object: " << object << '\n';
}

int main(void)
{
    std::unique_ptr<execq::IExecutionQueue<void(std::string)>> queue =
        execq::CreateSerialExecutionQueue<void, std::string>(&ProcessObjectOneByOne);

    queue->push("qwe");
    queue->push("some string");
    queue->push("");

    return 0;
}
```
1.2 Queue-Based Approach: Future Inside!
Every ExecutionQueue returns a std::future when an object is pushed into it. The future is bound to the pushed object and refers to the result of its processing. Note: the returned std::future objects can simply be discarded; they do not block in the std::future destructor.
```cpp
#include <execq/execq.h>
#include <future>
#include <iostream>

size_t GetStringSize(const std::atomic_bool& isCanceled, std::string&& object)
{
    if (isCanceled)
    {
        std::cout << "Queue has been canceled. Skipping object...";
        return 0;
    }
    std::cout << "Processing object: " << object << '\n';
    return object.size();
}

int main(void)
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();

    std::unique_ptr<execq::IExecutionQueue<size_t(std::string)>> queue =
        execq::CreateConcurrentExecutionQueue<size_t, std::string>(pool, &GetStringSize);

    std::future<size_t> future1 = queue->push("qwe");
    std::future<size_t> future2 = queue->push("some string");
    std::future<size_t> future3 = queue->push("hello future");

    const size_t totalSize = future1.get() + future2.get() + future3.get();
    std::cout << "Total size: " << totalSize << '\n';

    return 0;
}
```
execq supports std::future, so you can simply wait until the object has been processed.
2. Stream-Based Approach
Designed to process an unbounded number of tasks as fast as possible, i.e., the next task is processed whenever a thread becomes available.
execq allows you to create an 'IExecutionStream' object that executes your code each time a thread in the pool is ready to run the next task. This approach should be considered the most effective way to process an unlimited (or almost unlimited) number of tasks.
```cpp
#include <execq/execq.h>
#include <chrono>
#include <iostream>
#include <thread>

void ProcessNextObject(const std::atomic_bool& isCanceled)
{
    if (isCanceled)
    {
        std::cout << "Stream has been canceled. Skipping...";
        return;
    }
    static std::atomic_int s_someObject { 0 };
    const int nextObject = s_someObject++;
    std::cout << "Processing object: " << nextObject << '\n';
}

int main(void)
{
    std::shared_ptr<execq::IExecutionPool> pool = execq::CreateExecutionPool();

    std::unique_ptr<execq::IExecutionStream> stream =
        execq::CreateExecutionStream(pool, &ProcessNextObject);

    stream->start();

    // Let the stream run for a while before shutting down.
    std::this_thread::sleep_for(std::chrono::seconds(5));

    return 0;
}
```
Design Principles & Tech. Details
Consider using a single ExecutionPool object (shared across the whole application) with multiple queues and streams. Combine queues and streams freely to achieve your goals. Feel free to push tasks to a queue, or operate a stream, even from inside its own callback.
'By-Turn' Execution
execq deals with the tasks of queues and streams in a special way to avoid starvation.
Consider a simple example: there are two queues. First, 100 objects are pushed into queue #1; afterwards, 1 object is pushed into queue #2.
Now a few tasks from queue #1 are already being executed, but the next task to be executed will be the one from queue #2, and only then the remaining tasks from queue #1.
Avoiding Queue Starvation
Some tasks can be very time-consuming and thus block all of the pool's threads for a long time. This causes starvation: no task from any other queue is executed until one of the running tasks finishes.
To prevent this, each queue and stream additionally has its own thread. This is a kind of 'insurance' thread, on which tasks of the queue/stream can be executed even when all of the pool's threads are busy for a long time.
Work To Be Done
- Replace the use of std::packaged_task with reference counting

Tests
By default, unit tests are off. To enable them, just add the CMake option -DEXECQ_TESTING_ENABLE=ON.
History
- 27th September, 2019: Initial version