Files
stencilparallelpattern/threadPool.cpp

103 lines
2.8 KiB
C++
Raw Permalink Normal View History

2023-08-24 19:46:27 +02:00
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include <threadPool.hpp>

#include <algorithm>
#include <cstdint>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
// Creates a pool with one worker per hardware thread reported by the
// standard library. Every worker immediately starts waiting for jobs in
// spin().
ThreadPool::ThreadPool() {
    // hardware_concurrency() is only a hint and may return 0 when the value
    // cannot be determined; fall back to a single worker so that queued jobs
    // are still executed and waitEnd() cannot block forever.
    const uint32_t Detected = std::thread::hardware_concurrency();
    const uint32_t ThreadNum = (Detected > 0) ? Detected : 1;
    // create all threads waiting
    for (uint32_t i = 0; i < ThreadNum; ++i) {
        // construct the std::thread in place instead of moving a temporary
        Threads.emplace_back(&ThreadPool::spin, this);
    }
}
// Creates at most MaxThreads workers, never more than the hardware
// concurrency reported by the standard library. MaxThreads == 0 means
// "no explicit cap": use the full hardware concurrency.
ThreadPool::ThreadPool(uint32_t MaxThreads) {
    // hardware_concurrency() is only a hint and may return 0 when the value
    // cannot be determined; treat that as a single hardware thread so the
    // pool always has at least one worker.
    const uint32_t Detected = std::thread::hardware_concurrency();
    const uint32_t Available = (Detected > 0) ? Detected : 1;
    const uint32_t ThreadNum =
        (MaxThreads > 0) ? std::min(Available, MaxThreads) : Available;
    // create all threads waiting
    for (uint32_t i = 0; i < ThreadNum; ++i) {
        // construct the std::thread in place instead of moving a temporary
        Threads.emplace_back(&ThreadPool::spin, this);
    }
}
// Returns how many worker threads the pool currently holds.
int ThreadPool::numberOfThreads() {
    // the container size is converted to the int this method returns
    const auto Count = Threads.size();
    return static_cast<int>(Count);
}
// Worker loop executed by every pool thread.
// Waits on MutexCondition until a job is queued or termination is requested,
// runs one job at a time outside the lock, and signals JobCondition whenever
// the number of busy workers drops back to zero.
void ThreadPool::spin() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(QueueMutex);
            MutexCondition.wait(lock,
                                [&] { return !Jobs.empty() || Terminate; });
            // termination is checked before dequeuing, so jobs still queued
            // at that point are not executed by this worker
            if (Terminate) {
                break;
            }
            // counted as busy while still holding the lock, so the
            // waitEnd() predicate never sees "queue empty, nobody working"
            // between the pop and the execution of the job
            ++WorkingThreads;
            // move instead of copy: the queue slot is popped right after,
            // and this avoids duplicating the callable's state while the
            // mutex is held
            job = std::move(Jobs.front());
            Jobs.pop();
        }
        // run the job without holding the queue lock
        job();
        {
            // communicate that a job was finished
            std::unique_lock<std::mutex> lock(QueueMutex);
            --WorkingThreads;
            if (WorkingThreads == 0) {
                JobCondition.notify_all();
            }
        }
    }
}
// Enqueues a copy of `job` and wakes one waiting worker.
// Does nothing once the Terminated flag is set.
void ThreadPool::addJob(const std::function<void()> &job) {
    if (!Terminated) {
        {
            // the queue is only touched while QueueMutex is held
            std::lock_guard<std::mutex> guard(QueueMutex);
            Jobs.push(job);
        }
        // the lock is released before signalling that one job is available
        MutexCondition.notify_one();
    }
}
// Blocks the caller until the job queue is empty and no worker is running a
// job.
// Returns false immediately if the Terminated flag is set. Otherwise returns
// whether jobs are still queued once the wait is over, which is expected to
// be false.
bool ThreadPool::waitEnd() {
    if (Terminated) {
        return false;
    }
    std::unique_lock<std::mutex> lock(QueueMutex);
    JobCondition.wait(lock,
                      [&] { return Jobs.empty() && WorkingThreads == 0; });
    // The queue must be inspected while QueueMutex is still held: reading it
    // after releasing the lock would race with concurrent addJob()/spin().
    const bool JobsPending = !Jobs.empty();
    return JobsPending;
}
// Requests termination, wakes every worker, and joins all threads.
// A worker that is running a job finishes it before it sees the request.
void ThreadPool::Stop() {
    {
        // the flag is raised under the queue lock, the same lock the workers
        // hold when they test it in their wait predicate
        std::lock_guard<std::mutex> guard(QueueMutex);
        Terminate = true;
    }
    // wake every worker so each one observes the termination request
    MutexCondition.notify_all();
    for (auto &Worker : Threads) {
        Worker.join();
    }
    Threads.clear();
    // set last, after every worker has been joined
    Terminated = true;
}
// Stops the pool on destruction unless the Terminated flag is already set.
ThreadPool::~ThreadPool() {
    if (Terminated) {
        return;
    }
    Stop();
}