From 53e0ef992d8d1d5c941adcd55f997030fbb38796 Mon Sep 17 00:00:00 2001 From: Jens Schmer Date: Fri, 17 Jan 2014 15:12:57 +0100 Subject: [PATCH 1/5] add wait method that blocks until all tasks are processed --- ThreadPool.h | 35 +++++++++++++++++++++++++++++++---- example.cpp | 16 +++++++++------- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 0235c8b..e1a9617 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -18,6 +18,17 @@ class ThreadPool { auto enqueue(F&& f, Args&&... args) -> std::future::type>; ~ThreadPool(); + + // wait for the completion of all tasks + // (meaning that all worker are waiting for new tasks) + void wait() const { + std::unique_lock lock(work_done_mutex); + + // wait until all threads are done and tasks are empty + while (!(active_worker == 0 && tasks.empty())) + work_done_condition.wait(lock); + } + private: // need to keep track of threads so we can join them std::vector< std::thread > workers; @@ -28,11 +39,17 @@ class ThreadPool { std::mutex queue_mutex; std::condition_variable condition; bool stop; + + // waiting for completion + size_t active_worker; + mutable std::mutex work_done_mutex; + mutable std::condition_variable work_done_condition; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(size_t threads) - : stop(false) + : stop(false), + active_worker(threads) { for(size_t i = 0;i lock(this->queue_mutex); - while(!this->stop && this->tasks.empty()) - this->condition.wait(lock); + + --this->active_worker; + + while (!this->stop && this->tasks.empty()) { + this->work_done_condition.notify_one(); // signal that this thread is done + this->condition.wait(lock); // and wait for more tasks + } + if(this->stop && this->tasks.empty()) return; + + ++this->active_worker; + std::function task(this->tasks.front()); this->tasks.pop(); lock.unlock(); + task(); } } @@ -59,7 +86,7 @@ template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future::type> { - typedef typename std::result_of::type return_type; + using return_type = std::result_of::type; // don't allow enqueueing after stopping the pool if(stop) diff --git a/example.cpp b/example.cpp index 66d6ab7..a67a8a7 100644 --- a/example.cpp +++ b/example.cpp @@ -6,11 +6,10 @@ int main() { - ThreadPool pool(4); - std::vector< std::future > results; + std::vector> results; - for(int i = 0; i < 8; ++i) { + for (int i = 0; i < 8; ++i) { results.push_back( pool.enqueue([i] { std::cout << "hello " << i << std::endl; @@ -19,11 +18,14 @@ int main() return i*i; }) ); - } - - for(size_t i = 0;i Date: Fri, 17 Jan 2014 15:23:35 +0100 Subject: [PATCH 2/5] default constructor launches a sane number of threads based on hardware capabilities --- ThreadPool.h | 95 ++++++++++++++++++++++++++-------------------------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index e1a9617..4384b2a 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -15,26 +15,19 @@ class ThreadPool { public: ThreadPool(size_t); template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; + auto enqueue(F&& f, Args&&... args)->std::future::type>; ~ThreadPool(); // wait for the completion of all tasks // (meaning that all worker are waiting for new tasks) - void wait() const { - std::unique_lock lock(work_done_mutex); - - // wait until all threads are done and tasks are empty - while (!(active_worker == 0 && tasks.empty())) - work_done_condition.wait(lock); - } + void wait() const; private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function > tasks; - + // synchronization std::mutex queue_mutex; std::condition_variable condition; @@ -45,57 +38,57 @@ class ThreadPool { mutable std::mutex work_done_mutex; mutable std::condition_variable work_done_condition; }; - + // the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false), - active_worker(threads) +inline ThreadPool::ThreadPool(size_t threads = std::thread::hardware_concurrency()) +: stop(false), +active_worker(threads) { - for(size_t i = 0;i lock(this->queue_mutex); - - --this->active_worker; - - while (!this->stop && this->tasks.empty()) { - this->work_done_condition.notify_one(); // signal that this thread is done - this->condition.wait(lock); // and wait for more tasks - } - - if(this->stop && this->tasks.empty()) - return; - - ++this->active_worker; - - std::function task(this->tasks.front()); - this->tasks.pop(); - lock.unlock(); - - task(); - } + [this] + { + for (;;) + { + std::unique_lock lock(this->queue_mutex); + + --this->active_worker; + + while (!this->stop && this->tasks.empty()) { + this->work_done_condition.notify_one(); // signal that this thread is done + this->condition.wait(lock); // and wait for more tasks } - ); + + if (this->stop && this->tasks.empty()) + return; + + ++this->active_worker; + + std::function task(this->tasks.front()); + this->tasks.pop(); + lock.unlock(); + + task(); + } + } + ); } // add new work item to the pool template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> +auto ThreadPool::enqueue(F&& f, Args&&... args) +-> std::future::type> { using return_type = std::result_of::type; - + // don't allow enqueueing after stopping the pool - if(stop) + if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) + std::bind(std::forward(f), std::forward(args)...) ); - + std::future res = task->get_future(); { std::unique_lock lock(queue_mutex); @@ -105,6 +98,14 @@ auto ThreadPool::enqueue(F&& f, Args&&... args) return res; } +inline void ThreadPool::wait() const { + std::unique_lock lock(work_done_mutex); + + // wait until all threads are done and tasks are empty + while (!(active_worker == 0 && tasks.empty())) + work_done_condition.wait(lock); +} + // the destructor joins all threads inline ThreadPool::~ThreadPool() { @@ -113,7 +114,7 @@ inline ThreadPool::~ThreadPool() stop = true; } condition.notify_all(); - for(size_t i = 0;i Date: Fri, 17 Jan 2014 15:26:28 +0100 Subject: [PATCH 3/5] use atomic bool for stop flag --- ThreadPool.h | 54 ++++++++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 4384b2a..6700252 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -10,14 +10,17 @@ #include #include #include +#include class ThreadPool { public: ThreadPool(size_t); - template - auto enqueue(F&& f, Args&&... args)->std::future::type>; ~ThreadPool(); + template + auto enqueue(F&& f, Args&&... args) + -> std::future::type>; + // wait for the completion of all tasks // (meaning that all worker are waiting for new tasks) void wait() const; @@ -31,7 +34,7 @@ class ThreadPool { // synchronization std::mutex queue_mutex; std::condition_variable condition; - bool stop; + std::atomic stop; // waiting for completion size_t active_worker; @@ -44,34 +47,34 @@ inline ThreadPool::ThreadPool(size_t threads = std::thread::hardware_concurrency : stop(false), active_worker(threads) { - for (size_t i = 0; i < threads; ++i) + for (size_t i = 0; i < threads; ++i) { workers.emplace_back( - [this] - { - for (;;) - { - std::unique_lock lock(this->queue_mutex); + [this] { + for (;;) + { + std::unique_lock lock(this->queue_mutex); - --this->active_worker; + --this->active_worker; - while (!this->stop && this->tasks.empty()) { - this->work_done_condition.notify_one(); // signal that this thread is done - this->condition.wait(lock); // and wait for more tasks - } + while (!this->stop && this->tasks.empty()) { + this->work_done_condition.notify_one(); // signal that this thread is done + this->condition.wait(lock); // and wait for more tasks + } - if (this->stop && this->tasks.empty()) - return; + if (this->stop && this->tasks.empty()) + return; - ++this->active_worker; + ++this->active_worker; - std::function task(this->tasks.front()); - this->tasks.pop(); - lock.unlock(); + std::function task(this->tasks.front()); + this->tasks.pop(); + lock.unlock(); - task(); - } + task(); + } + } + ); } - ); } // add new work item to the pool @@ -109,10 +112,7 @@ inline void ThreadPool::wait() const { // the destructor joins all threads inline ThreadPool::~ThreadPool() { - { - std::unique_lock lock(queue_mutex); - stop = true; - } + stop = true; condition.notify_all(); for (size_t i = 0; i < workers.size(); ++i) workers[i].join(); From c54c0a13afcf090bef31fe557b28f90754726eeb Mon Sep 17 00:00:00 2001 From: Jens Schmer Date: Fri, 17 Jan 2014 15:28:43 +0100 Subject: [PATCH 4/5] rename to hpp --- ThreadPool.h => ThreadPool.hpp | 8 ++++---- example.cpp | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) rename ThreadPool.h => ThreadPool.hpp (96%) diff --git a/ThreadPool.h b/ThreadPool.hpp similarity index 96% rename from ThreadPool.h rename to ThreadPool.hpp index 6700252..78c5b82 100644 --- a/ThreadPool.h +++ b/ThreadPool.hpp @@ -1,5 +1,5 @@ -#ifndef THREAD_POOL_H -#define THREAD_POOL_H +#ifndef THREAD_POOL_HPP +#define THREAD_POOL_HPP #include #include @@ -88,9 +88,9 @@ auto ThreadPool::enqueue(F&& f, Args&&... args) if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); - auto task = std::make_shared< std::packaged_task >( + auto task = std::make_shared>( std::bind(std::forward(f), std::forward(args)...) - ); + ); std::future res = task->get_future(); { diff --git a/example.cpp b/example.cpp index a67a8a7..cca960a 100644 --- a/example.cpp +++ b/example.cpp @@ -2,7 +2,7 @@ #include #include -#include "ThreadPool.h" +#include "ThreadPool.hpp" int main() { From 83a99205029ea63639ce0132ab8d57ef548ad6ef Mon Sep 17 00:00:00 2001 From: Jens Schmer Date: Fri, 17 Jan 2014 15:45:32 +0100 Subject: [PATCH 5/5] fix clang/gcc warning and error using std::atomic hangs in the destructor, waiting infinitely for a thread to terminate... --- ThreadPool.hpp | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 78c5b82..96d69f8 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -10,11 +10,10 @@ #include #include #include -#include class ThreadPool { public: - ThreadPool(size_t); + ThreadPool(size_t = std::thread::hardware_concurrency()); ~ThreadPool(); template @@ -34,7 +33,7 @@ class ThreadPool { // synchronization std::mutex queue_mutex; std::condition_variable condition; - std::atomic stop; + bool stop; // waiting for completion size_t active_worker; @@ -43,9 +42,9 @@ class ThreadPool { }; // the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads = std::thread::hardware_concurrency()) -: stop(false), -active_worker(threads) +inline ThreadPool::ThreadPool(size_t threads) + : stop(false), + active_worker(threads) { for (size_t i = 0; i < threads; ++i) { workers.emplace_back( @@ -82,7 +81,7 @@ template auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future::type> { - using return_type = std::result_of::type; + using return_type = typename std::result_of::type; // don't allow enqueueing after stopping the pool if (stop) @@ -112,7 +111,11 @@ inline void ThreadPool::wait() const { // the destructor joins all threads inline ThreadPool::~ThreadPool() { - stop = true; + { + std::unique_lock lock(this->queue_mutex); + stop = true; + } + condition.notify_all(); for (size_t i = 0; i < workers.size(); ++i) workers[i].join();