/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ #include #include #include #include #include ThreadPool::ThreadPool() { const uint32_t ThreadNum = std::thread::hardware_concurrency(); // create all threads waiting for (uint32_t i = 0; i < ThreadNum; ++i) { Threads.emplace_back(std::thread(&ThreadPool::spin, this)); } } // creates at maximum maxThreads threads, if 0 then use the maximum concurrency // possible ThreadPool::ThreadPool(uint32_t MaxThreads) { const uint32_t ThreadNum = (MaxThreads > 0) ? std::min(std::thread::hardware_concurrency(), MaxThreads) : std::thread::hardware_concurrency(); // create all threads waiting for (uint32_t i = 0; i < ThreadNum; ++i) { Threads.emplace_back(std::thread(&ThreadPool::spin, this)); } } int ThreadPool::numberOfThreads() { return Threads.size(); } void ThreadPool::spin() { // wait for a new job on the mutex, then run the job // comunicate that a job was finished after job is executed while (true) { std::function job; { std::unique_lock lock(QueueMutex); MutexCondition.wait(lock, [&] { return !Jobs.empty() || Terminate; }); if (Terminate) { break; } ++WorkingThreads; job = Jobs.front(); Jobs.pop(); } job(); { std::unique_lock lock(QueueMutex); --WorkingThreads; if (WorkingThreads == 0) { JobCondition.notify_all(); } } } } void ThreadPool::addJob(const std::function &job) { if (Terminated) { return; } { // add one job, sync with mutex std::unique_lock lock(QueueMutex); Jobs.push(job); } // notify that there is one job MutexCondition.notify_one(); } bool ThreadPool::waitEnd() { // waits on the condition until there are no more jobs and all threads are // idle, should return false if (Terminated) { return false; } { std::unique_lock lock(QueueMutex); JobCondition.wait(lock, [&] { return Jobs.empty() && WorkingThreads == 0; }); } return !Jobs.empty(); } void ThreadPool::Stop() { // joins all threads (lets them finish the current job) { std::unique_lock lock(QueueMutex); Terminate = true; } MutexCondition.notify_all(); for (std::thread &t : Threads) { t.join(); } Threads.clear(); Terminated = true; } ThreadPool::~ThreadPool() { if (!Terminated) { this->Stop(); } }