1 Star 6 Fork 2

Eterfree / ThreadPool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ThreadPool.cpp 14.61 KB
一键复制 编辑 原始数据 按行查看 历史
许聪 提交于 2023-12-13 00:24 . update v2.3.0
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
#include "ThreadPool.h"
#include "Condition.hpp"
#include "DoubleQueue.hpp"
#include "Thread.h"
#include <cstdint>
#include <exception>
#include <atomic>
#include <thread>
ETERFREE_SPACE_BEGIN
// 生成原子的设置函数
#define SET_ATOMIC(SizeType, Arithmetic, functor, field) \
SizeType functor(SizeType _size, Arithmetic _arithmetic) noexcept \
{ \
constexpr auto MEMORY_ORDER = std::memory_order_relaxed; \
switch (_arithmetic) \
{ \
case Arithmetic::REPLACE: \
return field.exchange(_size, MEMORY_ORDER); \
case Arithmetic::INCREASE: \
return field.fetch_add(_size, MEMORY_ORDER); \
case Arithmetic::DECREASE: \
return field.fetch_sub(_size, MEMORY_ORDER); \
default: \
return field.load(MEMORY_ORDER); \
} \
}
// 线程池数据结构体
struct ThreadPool::Structure
{
// 算术枚举
enum class Arithmetic : std::uint8_t
{
REPLACE, // 替换
INCREASE, // 自增
DECREASE // 自减
};
using QueueType = DoubleQueue<TaskType>;
using Callback = Thread::Callback;
std::atomic_bool _valid; // 线程有效性
Condition<> _condition; // 强化条件变量
std::thread _thread; // 守护线程
std::list<Thread> _threadTable; // 线程表
std::atomic<SizeType> _capacity; // 线程池容量
std::atomic<SizeType> _totalSize; // 总线程数量
std::atomic<SizeType> _idleSize; // 闲置线程数量
std::shared_ptr<QueueType> _taskQueue; // 任务队列
Callback _callback; // 回调函数子
// 过滤任务
template <typename _TaskQueue>
static auto filterTask(_TaskQueue& _taskQueue);
/*
* 默认构造函数
* 若先以运算符new创建实例,再交由共享指针std::shared_ptr托管,
* 则至少二次分配内存,先为实例分配内存,再为共享指针的控制块分配内存。
* 而std::make_shared典型地仅分配一次内存,实例内存和控制块内存连续。
*/
Structure() : \
_taskQueue(std::make_shared<QueueType>()) {}
// 守护线程是否有效
bool isValid() const noexcept
{
return _valid.load(std::memory_order_relaxed);
}
// 设置有效性
void setValid(bool _valid) noexcept
{
this->_valid.store(_valid, \
std::memory_order_relaxed);
}
// 获取线程池容量
auto getCapacity() const noexcept
{
return _capacity.load(std::memory_order_relaxed);
}
// 设置线程池容量
void setCapacity(SizeType _capacity, bool _notified = false);
// 获取总线程数量
auto getTotalSize() const noexcept
{
return _totalSize.load(std::memory_order_relaxed);
}
// 设置总线程数量
SET_ATOMIC(SizeType, Arithmetic, setTotalSize, _totalSize);
// 获取闲置线程数量
auto getIdleSize() const noexcept
{
return _idleSize.load(std::memory_order_relaxed);
}
// 设置闲置线程数量
SET_ATOMIC(SizeType, Arithmetic, setIdleSize, _idleSize);
// 放入任务
bool pushTask(const TaskType& _task);
bool pushTask(TaskType&& _task);
// 批量放入任务
bool pushTask(TaskQueue& _taskQueue);
bool pushTask(TaskQueue&& _taskQueue);
};
#undef SET_ATOMIC
// 过滤无效任务
template <typename _TaskQueue>
auto ThreadPool::Structure::filterTask(_TaskQueue& _taskQueue)
{
decltype(_taskQueue.size()) size = 0;
for (auto iterator = _taskQueue.cbegin(); \
iterator != _taskQueue.cend();)
if (!*iterator)
iterator = _taskQueue.erase(iterator);
else
{
++iterator;
++size;
}
return size;
}
// 设置线程池容量
void ThreadPool::Structure::setCapacity(SizeType _capacity, \
bool _notified)
{
auto capacity = this->_capacity.exchange(_capacity, \
std::memory_order_relaxed);
if (_notified && capacity != _capacity)
_condition.notify_one(Condition<>::Policy::RELAXED);
}
// 放入任务
bool ThreadPool::Structure::pushTask(const TaskType& _task)
{
// 若放入任务之前,任务队列为空,则通知守护线程
auto result = _taskQueue->push(_task);
if (result && result.value() == 0)
_condition.notify_one(Condition<>::Policy::RELAXED);
return result.has_value();
}
// 放入任务
bool ThreadPool::Structure::pushTask(TaskType&& _task)
{
// 若放入任务之前,任务队列为空,则通知守护线程
auto result = _taskQueue->push(std::forward<TaskType>(_task));
if (result && result.value() == 0)
_condition.notify_one(Condition<>::Policy::RELAXED);
return result.has_value();
}
// 批量放入任务
bool ThreadPool::Structure::pushTask(TaskQueue& _taskQueue)
{
// 过滤无效任务
if (filterTask(_taskQueue) <= 0) return false;
// 若放入任务之前,任务队列为空,则通知守护线程
auto result = this->_taskQueue->push(_taskQueue);
if (result && result.value() == 0)
_condition.notify_one(Condition<>::Policy::RELAXED);
return result.has_value();
}
// 批量放入任务
bool ThreadPool::Structure::pushTask(TaskQueue&& _taskQueue)
{
// 过滤无效任务
if (filterTask(_taskQueue) <= 0) return false;
// 若放入任务之前,任务队列为空,则通知守护线程
auto result = this->_taskQueue->push(std::forward<TaskQueue>(_taskQueue));
if (result && result.value() == 0)
_condition.notify_one(Condition<>::Policy::RELAXED);
return result.has_value();
}
// 获取线程池容量
auto ThreadPool::Proxy::getCapacity() const noexcept \
-> SizeType
{
return _data ? _data->getCapacity() : 0;
}
// 设置线程池容量
bool ThreadPool::Proxy::setCapacity(SizeType _capacity)
{
if (_capacity <= 0 || !_data) return false;
_data->setCapacity(_capacity, true);
return true;
}
// 获取总线程数量
auto ThreadPool::Proxy::getTotalSize() const noexcept \
-> SizeType
{
return _data ? _data->getTotalSize() : 0;
}
// 获取闲置线程数量
auto ThreadPool::Proxy::getIdleSize() const noexcept \
-> SizeType
{
return _data ? _data->getIdleSize() : 0;
}
// 获取任务数量
auto ThreadPool::Proxy::getTaskSize() const noexcept \
-> SizeType
{
return _data ? _data->_taskQueue->size() : 0;
}
// 放入任务
bool ThreadPool::Proxy::pushTask(const TaskType& _task)
{
return _task && _data && _data->pushTask(_task);
}
// 放入任务
bool ThreadPool::Proxy::pushTask(TaskType&& _task)
{
return _task && _data \
&& _data->pushTask(std::forward<TaskType>(_task));
}
// 批量放入任务
bool ThreadPool::Proxy::pushTask(TaskQueue& _taskQueue)
{
return _data && _data->pushTask(_taskQueue);
}
// 批量放入任务
bool ThreadPool::Proxy::pushTask(TaskQueue&& _taskQueue)
{
return _data \
&& _data->pushTask(std::forward<TaskQueue>(_taskQueue));
}
// 批量取出任务
bool ThreadPool::Proxy::popTask(TaskQueue& _taskQueue)
{
return _data && _data->_taskQueue->pop(_taskQueue);
}
// 清空任务
void ThreadPool::Proxy::clearTask()
{
if (_data)
_data->_taskQueue->clear();
}
// 移动数据
auto ThreadPool::move(ThreadPool& _left, \
ThreadPool&& _right) -> DataType
{
std::lock_guard leftLock(_left._mutex);
auto data = std::move(_left._data);
std::lock_guard rightLock(_right._mutex);
_left._data = std::move(_right._data);
return data;
}
// 创建线程池
void ThreadPool::create(DataType&& _data, SizeType _capacity)
{
using Arithmetic = Structure::Arithmetic;
// 定义回调函数子
_data->_callback = [_data = std::weak_ptr(_data)](Thread::ThreadID _id, bool _idle)
{
// 线程并非闲置状态
if (!_idle) return;
// 若在增加之前,无闲置线程,或者在增加之后,所有线程闲置,则通知守护线程
if (auto data = _data.lock(); data \
&& (data->setIdleSize(1, Arithmetic::INCREASE) == 0 \
|| data->getIdleSize() >= data->getTotalSize()))
data->_condition.notify_one(Condition<>::Policy::RELAXED);
};
// 初始化线程并放入线程表
_capacity = _capacity > 0 ? _capacity : 1;
for (decltype(_capacity) index = 0; index < _capacity; ++index)
{
Thread thread;
thread.configure(_data->_taskQueue, _data->_callback);
_data->_threadTable.push_back(std::move(thread));
}
_data->setCapacity(_capacity); // 设置线程池容量
_data->setTotalSize(_capacity, Arithmetic::REPLACE); // 设置总线程数量
_data->setIdleSize(_capacity, Arithmetic::REPLACE); // 设置闲置线程数量
// 守护线程设为有效
_data->setValid(true);
// 创建std::thread对象,即守护线程,以_data为参数,执行函数execute
_data->_thread = std::thread(execute, _data);
}
// 销毁线程池
void ThreadPool::destroy(DataType&& _data)
{
using Arithmetic = Structure::Arithmetic;
using Policy = Condition<>::Policy;
// 避免重复销毁
if (!_data->isValid()) return;
// 守护线程设为无效
_data->setValid(false);
// 通知守护线程退出
_data->_condition.notify_all(Policy::RELAXED);
// 分离守护线程
//_data->_thread.detach();
// 挂起直到守护线程退出
if (_data->_thread.joinable())
_data->_thread.join();
_data->setCapacity(0); // 设置线程池容量
_data->setTotalSize(0, Arithmetic::REPLACE); // 设置总线程数量
_data->setIdleSize(0, Arithmetic::REPLACE); // 设置闲置线程数量
}
// 调整线程数量
auto ThreadPool::adjust(DataType& _data) -> SizeType
{
using Arithmetic = Structure::Arithmetic;
auto size = _data->getTotalSize();
auto capacity = _data->getCapacity();
// 1.删减线程
if (size >= capacity) return size - capacity;
// 2.增加线程
size = capacity - size;
// 添加线程至线程表
for (decltype(size) index = 0; index < size; ++index)
{
Thread thread;
thread.configure(_data->_taskQueue, _data->_callback);
_data->_threadTable.push_back(std::move(thread));
}
// 增加总线程数量
_data->setTotalSize(size, Arithmetic::INCREASE);
// 增加闲置线程数量
_data->setIdleSize(size, Arithmetic::INCREASE);
return 0;
}
// 守护线程主函数
void ThreadPool::execute(DataType _data)
{
using Arithmetic = Structure::Arithmetic;
/*
* 条件变量的谓词,不必等待通知的条件
* 1.在守护线程有效的情况下:
* a.任务队列非空并且存在闲置线程。
* b.任务队列非空并且需要增加线程。
* c.存在闲置线程并且需要删减线程。
* 2.在守护线程无效的情况下:
* a.任务队列非空并且存在闲置线程
* b.所有线程闲置
*/
auto predicate = [&_data]
{
bool empty = _data->_taskQueue->empty();
if (_data->isValid())
{
bool idle = _data->getIdleSize() > 0;
auto size = _data->getTotalSize();
auto capacity = _data->getCapacity();
return !empty \
&& (idle || size < capacity) \
|| idle && size > capacity;
}
else
{
auto size = _data->getIdleSize();
auto capacity = _data->getTotalSize();
bool idle = size > 0;
return !empty && idle \
|| size >= capacity;
}
};
// 若谓词非真,自动解锁互斥元,阻塞守护线程,直至通知激活,再次锁定互斥元
_data->_condition.wait(predicate);
/*
* 守护线程退出条件
* 1.守护线程无效
* 2.任务队列为空
* 3.所有线程闲置
*/
while (_data->isValid() || !_data->_taskQueue->empty() \
|| _data->getIdleSize() < _data->getTotalSize())
{
// 调整线程数量
auto size = adjust(_data);
// 遍历线程表,访问闲置线程
for (auto iterator = _data->_threadTable.begin(); \
iterator != _data->_threadTable.end() \
&& _data->getIdleSize() > 0;)
{
// 若线程处于闲置状态
if (auto& thread = *iterator; thread.idle())
{
// 若通知线程执行任务成功,则减少闲置线程数量
if (thread.notify())
_data->setIdleSize(1, Arithmetic::DECREASE);
// 删减线程
else if (size > 0)
{
iterator = _data->_threadTable.erase(iterator);
_data->setIdleSize(1, Arithmetic::DECREASE);
_data->setTotalSize(1, Arithmetic::DECREASE);
--size;
continue;
}
}
++iterator;
}
// 根据谓词真假,决定是否阻塞守护线程
_data->_condition.wait(predicate);
}
// 清空线程表
_data->_threadTable.clear();
}
// 获取支持的并发线程数量
auto ThreadPool::getConcurrency() noexcept -> SizeType
{
auto concurrency = std::thread::hardware_concurrency();
return concurrency > 0 ? concurrency : 1;
}
// 默认构造函数
ThreadPool::ThreadPool(SizeType _capacity) : \
_data(std::make_shared<Structure>())
{
create(load(), _capacity);
}
// 构造函数
ThreadPool::ThreadPool(SizeType _size, \
SizeType _capacity) : \
_data(std::make_shared<Structure>())
{
create(load(), _capacity);
}
// 默认移动构造函数
ThreadPool::ThreadPool(ThreadPool&& _another) noexcept
{
try
{
std::lock_guard lock(_another._mutex);
this->_data = std::move(_another._data);
}
catch (std::exception&) {}
}
// 默认析构函数
ThreadPool::~ThreadPool() noexcept
{
try
{
// 数据非空才进行销毁,以支持移动语义
if (auto data = load())
destroy(std::move(data));
}
catch (std::exception&) {}
}
// 默认移动赋值运算符函数
auto ThreadPool::operator=(ThreadPool&& _threadPool) noexcept \
-> ThreadPool&
{
if (&_threadPool != this)
{
try
{
auto data = move(*this, \
std::forward<ThreadPool>(_threadPool));
if (data) destroy(std::move(data));
}
catch (std::exception&) {}
}
return *this;
}
// 获取线程池容量
auto ThreadPool::getCapacity() const -> SizeType
{
auto data = load();
return data ? data->getCapacity() : 0;
}
// 设置线程池容量
bool ThreadPool::setCapacity(SizeType _capacity)
{
if (_capacity > 0)
if (auto data = load())
{
data->setCapacity(_capacity, true);
return true;
}
return false;
}
// 获取总线程数量
auto ThreadPool::getTotalSize() const -> SizeType
{
auto data = load();
return data ? data->getTotalSize() : 0;
}
// 获取闲置线程数量
auto ThreadPool::getIdleSize() const -> SizeType
{
auto data = load();
return data ? data->getIdleSize() : 0;
}
// 获取任务数量
auto ThreadPool::getTaskSize() const -> SizeType
{
auto data = load();
return data ? data->_taskQueue->size() : 0;
}
// 放入任务
bool ThreadPool::pushTask(const TaskType& _task)
{
// 过滤无效任务
if (!_task) return false;
auto data = load();
return data && data->pushTask(_task);
}
// 放入任务
bool ThreadPool::pushTask(TaskType&& _task)
{
// 过滤无效任务
if (!_task) return false;
auto data = load();
return data \
&& data->pushTask(std::forward<TaskType>(_task));
}
// 批量放入任务
bool ThreadPool::pushTask(TaskQueue& _taskQueue)
{
auto data = load();
return data && data->pushTask(_taskQueue);
}
// 批量放入任务
bool ThreadPool::pushTask(TaskQueue&& _taskQueue)
{
auto data = load();
return data \
&& data->pushTask(std::forward<TaskQueue>(_taskQueue));
}
// 批量取出任务
bool ThreadPool::popTask(TaskQueue& _taskQueue)
{
auto data = load();
return data \
&& data->_taskQueue->pop(_taskQueue);
}
// 清空任务
void ThreadPool::clearTask()
{
if (auto data = load())
data->_taskQueue->clear();
}
// 获取代理
auto ThreadPool::getProxy() const \
-> Proxy
{
return load();
}
ETERFREE_SPACE_END
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/eterfree/ThreadPool.git
git@gitee.com:eterfree/ThreadPool.git
eterfree
ThreadPool
ThreadPool
master

搜索帮助

344bd9b3 5694891 D2dac590 5694891