
Advanced Thread Pool

14 Mar 2019, CPOL

Below is my implementation of the thread pool described in this talk, along with a benchmark comparing it against my simple thread pool implementation. On my 2018 MacBook Pro (Intel i5, 4 logical cores), the advanced pool is more than 15x faster at scheduling and dispatching short, random-length work items. It uses one queue per worker thread and a work-stealing dispatcher. When enqueuing, it tries to place each work item on a queue that is not currently locked by another thread. Each worker first tries its own queue and then steals from the other queues, skipping any that are locked; only if all of them are locked or empty does it block on its own queue. As always, the complete implementation is available on GitHub.
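The pool relies on a queue type that offers both blocking and non-blocking operations (push/try_push, pop/try_pop, done); in the full implementation that type comes from pool.h and is not shown here. The following is a minimal sketch of what such a queue could look like, assuming the non-blocking variants use std::unique_lock with std::try_to_lock so they give up immediately when another thread holds the mutex. The name try_lock_queue is hypothetical; this is not the author's simple_blocking_queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical sketch of a queue with the interface thread_pool uses.
// Not the author's pool.h implementation.
template<typename T>
class try_lock_queue
{
public:
	// Non-blocking: returns false at once if another thread holds the lock.
	bool try_push(const T& item)
	{
		{
			std::unique_lock<std::mutex> lock(m_mutex, std::try_to_lock);
			if(!lock) return false;
			m_queue.push(item);
		}
		m_ready.notify_one();
		return true;
	}

	// Blocking fallback, used when every try_push attempt failed.
	void push(const T& item)
	{
		{
			std::lock_guard<std::mutex> lock(m_mutex);
			m_queue.push(item);
		}
		m_ready.notify_one();
	}

	// Non-blocking: fails if the lock is held or the queue is empty.
	bool try_pop(T& item)
	{
		std::unique_lock<std::mutex> lock(m_mutex, std::try_to_lock);
		if(!lock || m_queue.empty()) return false;
		item = std::move(m_queue.front());
		m_queue.pop();
		return true;
	}

	// Blocks until an item is available; returns false only after done()
	// was called and the queue is empty.
	bool pop(T& item)
	{
		std::unique_lock<std::mutex> lock(m_mutex);
		m_ready.wait(lock, [this] { return m_done || !m_queue.empty(); });
		if(m_queue.empty()) return false;
		item = std::move(m_queue.front());
		m_queue.pop();
		return true;
	}

	// Marks the queue as finished and wakes every blocked consumer.
	void done()
	{
		{
			std::lock_guard<std::mutex> lock(m_mutex);
			m_done = true;
		}
		m_ready.notify_all();
	}

private:
	std::queue<T> m_queue;
	std::mutex m_mutex;
	std::condition_variable m_ready;
	bool m_done = false;
};
```

Because try_push and try_pop never wait on the mutex, a thread that finds one queue busy can move on to the next one, which is what lets enqueue_work and the workers spread across queues instead of contending on a single lock. Note that std::mutex::try_lock is permitted to fail spuriously; with this design that only costs an extra probe of the next queue.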

Benchmark Program

C++
#include <iostream>
#include <chrono>
#include <cstdlib>
#include "pool.h"
using namespace std;
using namespace chrono;

const unsigned int COUNT = 10'000'000;
const unsigned int REPS = 10;

int main()
{
	srand(0);
	auto start = high_resolution_clock::now();
	{
		simple_thread_pool tp;
		for(unsigned int i = 0; i < COUNT; ++i)
			tp.enqueue_work([i]() {
				// Random-length busy work: REPS to 5 * REPS iterations.
				// Unsigned arithmetic avoids signed overflow in i + rand().
				// Note: rand() is not required to be thread-safe.
				[[maybe_unused]] unsigned int x;
				unsigned int reps = REPS + (REPS * (rand() % 5));
				for(unsigned int n = 0; n < reps; ++n)
					x = i + rand();
			});
	} // leaving the scope destroys the pool, which joins its worker threads
	auto end = high_resolution_clock::now();
	auto duration = duration_cast<milliseconds>(end - start);
	cout << "simple_thread_pool duration = " << duration.count() / 1000.f << " s" << endl;

	srand(0);
	start = high_resolution_clock::now();
	{
		thread_pool tp;
		for(unsigned int i = 0; i < COUNT; ++i)
			tp.enqueue_work([i]() {
				// Same workload as above
				[[maybe_unused]] unsigned int x;
				unsigned int reps = REPS + (REPS * (rand() % 5));
				for(unsigned int n = 0; n < reps; ++n)
					x = i + rand();
			});
	}
	end = high_resolution_clock::now();
	duration = duration_cast<milliseconds>(end - start);
	cout << "thread_pool duration = " << duration.count() / 1000.f << " s" << endl;
}

Program output:

* Apple Clang -Ofast -march=native -std=c++17 -lc++

simple_thread_pool duration = 30.337 s
thread_pool duration = 1.625 s

* LLVM -Ofast -march=native -std=c++17 -lc++

simple_thread_pool duration = 25.785 s
thread_pool duration = 1.615 s

* G++ -Ofast -march=native -std=c++17 -lstdc++

simple_thread_pool duration = 26.28 s
thread_pool duration = 1.614 s
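Dividing the simple pool's time by the advanced pool's time for each toolchain gives the speedup behind the figure quoted at the top:

```latex
\text{speedup} = \frac{t_{\text{simple\_thread\_pool}}}{t_{\text{thread\_pool}}}:\qquad
\frac{30.337}{1.625} \approx 18.7\ (\text{Apple Clang}),\quad
\frac{25.785}{1.615} \approx 16.0\ (\text{LLVM}),\quad
\frac{26.28}{1.614} \approx 16.3\ (\text{G++})
```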

thread_pool Class

C++
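#include <atomic>
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
// simple_blocking_queue<T> (push, try_push, pop, try_pop, done) is defined in pool.h.

class thread_pool
{
public:
	thread_pool(unsigned int threads = std::thread::hardware_concurrency())
	: m_queues(threads), m_count(threads)
	{
		// hardware_concurrency() may return 0 when the core count is unknown
		assert(threads != 0);
		auto worker = [this](unsigned int i)
		{
			while(true)
			{
				Proc f;
				// Scan all queues starting with our own; take work from the first
				// one that is unlocked and non-empty (work stealing)
				for(unsigned int n = 0; n < m_count; ++n)
					if(m_queues[(i + n) % m_count].try_pop(f)) break;
				// Nothing found: block on our own queue; false means shutdown
				if(!f && !m_queues[i].pop(f)) break;
				f();
			}
		};
		for(unsigned int i = 0; i < threads; ++i)
			m_threads.emplace_back(worker, i);
	}

	~thread_pool() noexcept
	{
		// Signal shutdown to every queue, then wait for the workers to exit
		for(auto& queue : m_queues)
			queue.done();
		for(auto& thread : m_threads)
			thread.join();
	}

	template<typename F, typename... Args>
	void enqueue_work(F&& f, Args&&... args)
	{
		// Copy the callable and its arguments into a void() closure
		auto work = [f,args...]() { f(args...); };
		// Round-robin starting queue; make up to K passes over all queues,
		// skipping any queue whose lock is currently held
		unsigned int i = m_index++;
		for(unsigned int n = 0; n < m_count * K; ++n)
			if(m_queues[(i + n) % m_count].try_push(work)) return;
		// Every attempt hit a locked queue: block on the starting queue
		m_queues[i % m_count].push(std::move(work));
	}

	template<typename F, typename... Args>
	auto enqueue_task(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>>
	{
		using return_type = std::invoke_result_t<F, Args...>;
		// packaged_task is move-only; the shared_ptr keeps the closure copyable
		auto task = std::make_shared<std::packaged_task<return_type()>>
			(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
		std::future<return_type> res = task->get_future();

		auto work = [task](){ (*task)(); };
		unsigned int i = m_index++;
		for(unsigned int n = 0; n < m_count * K; ++n)
			if(m_queues[(i + n) % m_count].try_push(work)) return res;
		m_queues[i % m_count].push(std::move(work));

		return res;
	}

private:
	using Proc = std::function<void(void)>;
	using Queues = std::vector<simple_blocking_queue<Proc>>;
	Queues m_queues;

	using Threads = std::vector<std::thread>;
	Threads m_threads;

	const unsigned int m_count;
	std::atomic_uint m_index{0};

	// Number of passes over all queues before enqueueing falls back to a blocking push
	static constexpr unsigned int K = 3;
};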
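The one non-obvious step in enqueue_task is holding the std::packaged_task in a std::shared_ptr: packaged_task is move-only, while std::function requires a copyable callable, so the queued lambda captures the shared pointer rather than the task itself. The standalone sketch below isolates that step. make_task is a hypothetical helper, not part of the pool, and it names the result type with std::invoke_result_t (C++17).

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical helper: wraps a callable and its arguments into a copyable
// std::function<void()> plus a std::future for the result, mirroring the
// first half of thread_pool::enqueue_task.
template<typename F, typename... Args>
auto make_task(F&& f, Args&&... args)
	-> std::pair<std::function<void()>, std::future<std::invoke_result_t<F, Args...>>>
{
	using return_type = std::invoke_result_t<F, Args...>;
	// The shared_ptr makes the wrapper copyable even though packaged_task is not.
	auto task = std::make_shared<std::packaged_task<return_type()>>(
		std::bind(std::forward<F>(f), std::forward<Args>(args)...));
	std::future<return_type> res = task->get_future();
	return { [task]() { (*task)(); }, std::move(res) };
}
```

Any exception thrown by the callable is captured by the packaged_task and rethrown from future::get(), so a task that fails on a worker thread reports the error to whoever waits on the future instead of escaping the worker loop.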
This article was originally posted at https://vorbrodt.blog/2019/02/27/advanced-thread-pool

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United States