commit 3cfcc31ad3e2dde12cb10b5b15aa14015568cfaa Author: chunelfeng Date: Wed Oct 5 22:26:12 2022 +0800 feat : 第一个功能版本 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..143ceb6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/cmake-build-debug/ +/cmake-build-release/ +/.idea/ +.DS_Store diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..1ddbd30 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,24 @@ + +cmake_minimum_required(VERSION 3.2.5) + +project(CThreadPool VERSION 1.0.0) + +set(CMAKE_CXX_STANDARD 14) + +# add CThreadPool environment info +include(cmake/CThreadPool-env-include.cmake) + +file(GLOB_RECURSE CTP_SRC_LIST "./src/*.cpp") + +# 如果开启此宏定义,则CGraph执行过程中,不会在控制台打印任何信息 +# add_definitions(-D_CGRAPH_SILENCE_) + +# 编译libCThreadPool动态库 +# add_library(CThreadPool SHARED ${CTP_SRC_LIST}) + +# 编译libCThreadPool静态库 +# add_library(CThreadPool STATIC ${CTP_SRC_LIST}) + +add_executable(CThreadPool + ${CTP_SRC_LIST} + tutorial.cpp) diff --git a/MyFunction.h b/MyFunction.h new file mode 100644 index 0000000..6cbf6c4 --- /dev/null +++ b/MyFunction.h @@ -0,0 +1,36 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: MyFunction.h +@Time: 2021/9/2 11:20 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_MYFUNCTION_H +#define CGRAPH_MYFUNCTION_H + + +int add(int i, int j) { + return i + j; +} + +static float minusBy5(float i) { + return i - 5.0f; +} + + +class MyFunction { +public: + std::string concat(std::string& str) const { + return info_ + str; + } + + static int multiply(int i, int j) { + return i * j; + } + +private: + std::string info_ = "MyFunction : "; +}; + +#endif //CGRAPH_MYFUNCTION_H diff --git a/README.md b/README.md new file mode 100644 index 0000000..138402d --- /dev/null +++ b/README.md @@ -0,0 +1,83 @@ +

+ languages + os + stars + forks +

+ +

+ CThreadPool 说明文档 +

+ +## 一. 简介 +`CThreadPool` 是一个跨平台的、无任何三方依赖的、高性能的C++14(含以上版本)版本的线程池,也是 [CGraph](https://github.com/ChunelFeng/CGraph) 项目中使用的跨平台线程池组件功能的最小集。 + +经过CGraph和关联项目的长期迭代和验证,功能已经趋于稳定,且性能优异。因为咨询相关内容的朋友较多,故做为独立的仓库提供出来,方便大家使用。 + +由于是CGraph项目中的剥离出来的功能类,故在项目中保留了多处 `CGRAPH_*` 的命名方式,仅将 namespace 修改为 `CTP`,预计今后与CGraph相互独立更新。 + +本项目参考了[《C++并发编程实战(第二版)》](https://nj.gitbooks.io/c/content/) 中的部分内容,和github上部分相关的优秀工程。并在此基础上进行大量的改动、扩展和优化,在功能、性能和易用性上均有较大的提升。 + +在开发过程中,也沉淀了详细的说明文档(见下方 推荐阅读),以便于大家快速了解代码和思路,也请大家不吝指教。 + +## 二. 编译说明 +* 本工程支持MacOS、Linux和Windows系统,无任何第三方依赖。推荐使用C++14(默认)或以上版本,不支持C++11或以下版本 + +* 使用CLion作为IDE的开发者,或使用Visual Studio 15(或以上版本)作为IDE的开发者,打开CMakeLists.txt文件作为工程,即可编译通过 + +* Linux环境开发者,在命令行模式下,输入以下指令,即可编译通过 +```shell +$ git clone https://github.com/ChunelFeng/CThreadPool.git +$ cd CThreadPool +$ cmake . -Bbuild +$ cd build +$ make -j8 +``` + +## 三. 使用Demo +```cpp +#include "src/CThreadPool.h" + +using namespace CTP; + +float add_by_5(float i) { + return i + 5.0f; +} + +void tutorial() { + UThreadPool tp; + int i = 6, j = 3; + auto r1 = tp.commit([i, j] { return i - j; }); + std::future r2 = tp.commit(std::bind(add_by_5, 8.5f)); + + std::cout << r1.get() << std::endl; + std::cout << r2.get() << std::endl; +} +``` +更多使用方法,请参考 `tutorial.cpp` 中的例子和文档中的内容。 + +## 四. 推荐阅读 +* [纯序员给你介绍图化框架的简单实现——线程池优化(一)](http://www.chunel.cn/archives/cgraph-threadpool-1-introduce) +* [纯序员给你介绍图化框架的简单实现——线程池优化(二)](http://www.chunel.cn/archives/cgraph-threadpool-2-introduce) +* [纯序员给你介绍图化框架的简单实现——线程池优化(三)](http://www.chunel.cn/archives/cgraph-threadpool-3-introduce) +* [纯序员给你介绍图化框架的简单实现——线程池优化(四)](http://www.chunel.cn/archives/cgraph-threadpool-4-introduce) +* [纯序员给你介绍图化框架的简单实现——线程池优化(五)](http://www.chunel.cn/archives/cgraph-threadpool-5-introduce) +* [纯序员给你介绍图化框架的简单实现——线程池优化(六)](http://www.chunel.cn/archives/cgraph-threadpool-6-introduce) + +## 五. 关联项目 +* [CGraph : A simple C++ DAG framework](https://github.com/ChunelFeng/CGraph) + +------------ +#### 附录-1. 版本信息 +[2022.10.05 - v1.0.0 - Chunel] +* 提供线程池基本功能 +* 提供对应的tutorial信息 + +------------ +#### 附录-2. 联系方式 +* 微信: ChunelFeng +* 邮箱: chunel@foxmail.com +* 源码: https://github.com/ChunelFeng/CThreadPool +* 论坛: www.chunel.cn + +![CGraph Author](https://github.com/ChunelFeng/CThreadPool/blob/main/doc/image/CThreadPool%20Author.jpg) diff --git a/cmake/CThreadPool-env-include.cmake b/cmake/CThreadPool-env-include.cmake new file mode 100644 index 0000000..9fee081 --- /dev/null +++ b/cmake/CThreadPool-env-include.cmake @@ -0,0 +1,20 @@ + +# 本cmake文件,供三方引入CGraph引用,用于屏蔽系统和C++版本的区别 + +IF(APPLE) + # 非mac平台,暂时不支持自动生成session信息 + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m64 -finline-functions -Wno-deprecated-declarations -Wno-c++17-extensions") + add_definitions(-D_GENERATE_SESSION_) + add_definitions(-D_ENABLE_LIKELY_) +ELSEIF(UNIX) + # linux平台,加入多线程内容 + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -pthread -Wno-format-overflow") + add_definitions(-D_ENABLE_LIKELY_) +ELSEIF(WIN32) + # windows平台,加入utf-8设置。否则无法通过编译 + add_definitions(/utf-8) + + # 禁止两处warning级别提示 + add_compile_options(/wd4996) + add_compile_options(/wd4267) +ENDIF() diff --git a/doc/image/CThreadPool Author.jpg b/doc/image/CThreadPool Author.jpg new file mode 100644 index 0000000..b625b76 Binary files /dev/null and b/doc/image/CThreadPool Author.jpg differ diff --git a/src/CBasic/CBasicDefine.h b/src/CBasic/CBasicDefine.h new file mode 100644 index 0000000..acd4e5f --- /dev/null +++ b/src/CBasic/CBasicDefine.h @@ -0,0 +1,37 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CBasicDefine.h +@Time: 2021/4/26 8:15 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_CBASICDEFINE_H +#define CGRAPH_CBASICDEFINE_H + +#include + +#define CGRAPH_NAMESPACE_BEGIN \ +namespace CTP { \ + +#define CGRAPH_NAMESPACE_END \ +} /* end of namespace CTP */ \ + +CGRAPH_NAMESPACE_BEGIN + +using CCHAR = char; +using CUINT = unsigned int; +using CVOID = void; +using CINT = int; +using CLONG = long; +using CULONG = unsigned long; +using CBOOL = bool; +using CBIGBOOL = int; +using CFLOAT = float; +using CDOUBLE = double; +using CCONSTR = const char*; +using CSIZE = size_t; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CBASICDEFINE_H diff --git a/src/CBasic/CBasicInclude.h b/src/CBasic/CBasicInclude.h new file mode 100644 index 0000000..d694855 --- /dev/null +++ b/src/CBasic/CBasicInclude.h @@ -0,0 +1,20 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CBasicInclude.h +@Time: 2022/2/1 4:23 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_CBASICINCLUDE_H +#define CGRAPH_CBASICINCLUDE_H + +#include "CObject.h" +#include "CValType.h" +#include "CFuncType.h" +#include "CStatus.h" +#include "CException.h" +#include "CBasicDefine.h" +#include "CInfoDefine.h" + +#endif //CGRAPH_CBASICINCLUDE_H diff --git a/src/CBasic/CException.h b/src/CBasic/CException.h new file mode 100644 index 0000000..ada360a --- /dev/null +++ b/src/CBasic/CException.h @@ -0,0 +1,39 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CException.h +@Time: 2022/4/15 20:51 +@Desc: 异常处理类 +***************************/ + +#ifndef CGRAPH_CEXCEPTION_H +#define CGRAPH_CEXCEPTION_H + +#include +#include + +#include "CInfoDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +class CEXCEPTION : public std::exception { +public: + explicit CEXCEPTION(const std::string& info = CGRAPH_EMPTY) { + info_ = info.empty() ? CGRAPH_BASIC_EXCEPTION : info; + } + + /** + * 获取异常信息 + * @return + */ + [[nodiscard]] const char* what() const noexcept override { + return info_.c_str(); + } + +private: + std::string info_; // 异常状态信息 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CEXCEPTION_H diff --git a/src/CBasic/CFuncType.h b/src/CBasic/CFuncType.h new file mode 100644 index 0000000..9f68dd3 --- /dev/null +++ b/src/CBasic/CFuncType.h @@ -0,0 +1,67 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CFuncType.h +@Time: 2022/2/3 1:05 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_CFUNCTYPE_H +#define CGRAPH_CFUNCTYPE_H + +#include + +#include "CInfoDefine.h" +#include "CValType.h" + +CGRAPH_NAMESPACE_BEGIN + +using CGRAPH_DEFAULT_FUNCTION = std::function; +using CGRAPH_DEFAULT_CONST_FUNCTION_REF = const std::function&; +using CGRAPH_CSTATUS_FUNCTION = std::function; +using CGRAPH_CSTATUS_CONST_FUNCTION_REF = const std::function&; +using CGRAPH_CALLBACK_FUNCTION = std::function; +using CGRAPH_CALLBACK_CONST_FUNCTION_REF = const std::function&; + + +/** + * 描述函数类型 + */ +enum class CFunctionType { + INIT = 1, /** 初始化函数 */ + RUN = 2, /** 执行函数 */ + DESTROY = 3 /** 释放函数 */ +}; + +/** 开启函数流程 */ +#define CGRAPH_FUNCTION_BEGIN \ + CStatus status; \ + +/** 结束函数流程 */ +#define CGRAPH_FUNCTION_END \ + return status; \ + +/** 无任何功能函数 */ +#define CGRAPH_EMPTY_FUNCTION \ + return CStatus(); \ + +/** 不支持当前功能 */ +#define CGRAPH_NO_SUPPORT \ + return CStatus(CGRAPH_FUNCTION_NO_SUPPORT); \ + +/** 返回异常信息和状态 */ +#define CGRAPH_RETURN_ERROR_STATUS(info) \ + return CStatus(info); \ + +/** 定义为不能赋值和拷贝的对象类型 */ +#define CGRAPH_NO_ALLOWED_COPY(CType) \ + CType(const CType &) = delete; \ + const CType &operator=(const CType &) = delete; \ + +/** 抛出异常 */ +#define CGRAPH_THROW_EXCEPTION(info) \ + throw CException(info); \ + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CFUNCTYPE_H diff --git a/src/CBasic/CInfoDefine.h b/src/CBasic/CInfoDefine.h new file mode 100644 index 0000000..e004ab8 --- /dev/null +++ b/src/CBasic/CInfoDefine.h @@ -0,0 +1,22 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CInfoDefine.h +@Time: 2022/4/16 14:01 +@Desc: +***************************/ + +#ifndef CGRAPH_CINFODEFINE_H +#define CGRAPH_CINFODEFINE_H + +#include "CBasicDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +static const char* CGRAPH_EMPTY = ""; +static const char* CGRAPH_BASIC_EXCEPTION = "CGraph Exception"; +static const char* CGRAPH_FUNCTION_NO_SUPPORT = "function no support"; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CINFODEFINE_H diff --git a/src/CBasic/CObject.h b/src/CBasic/CObject.h new file mode 100644 index 0000000..8ccbe46 --- /dev/null +++ b/src/CBasic/CObject.h @@ -0,0 +1,52 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CObject.h +@Time: 2021/4/26 8:12 下午 +@Desc: 所有类型的父节点,其中run()方法必须实现 +***************************/ + +#ifndef CGRAPH_COBJECT_H +#define CGRAPH_COBJECT_H + +#include "CBasicDefine.h" +#include "CValType.h" +#include "CFuncType.h" + +CGRAPH_NAMESPACE_BEGIN + +class CObject { +public: + /** + * 默认构造函数 + */ + explicit CObject() = default; + + /** + * 初始化函数 + */ + virtual CStatus init() { + CGRAPH_EMPTY_FUNCTION + } + + /** + * 流程处理函数 + */ + virtual CStatus run() = 0; + + /** + * 释放函数(对应原先deinit函数) + */ + virtual CStatus destroy() { + CGRAPH_EMPTY_FUNCTION + } + + /** + * 默认析构函数 + */ + virtual ~CObject() = default; +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_COBJECT_H diff --git a/src/CBasic/CStatus.h b/src/CBasic/CStatus.h new file mode 100644 index 0000000..dde3ea6 --- /dev/null +++ b/src/CBasic/CStatus.h @@ -0,0 +1,104 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CStatus.h +@Time: 2021/12/17 10:32 下午 +@Desc: 命名为 CSTATUS,直接对外提供的是 CStatus 类 +***************************/ + +#ifndef CGRAPH_CSTATUS_H +#define CGRAPH_CSTATUS_H + +#include + +#include "CBasicDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +static const int STATUS_OK = 0; /** 正常流程返回值 */ +static const int STATUS_ERR = -1; /** 异常流程返回值 */ +static const char* STATUS_ERROR_INFO_CONNECTOR = " && "; /** 多异常信息连接符号 */ + +class CSTATUS { +public: + explicit CSTATUS() = default; + + explicit CSTATUS(const std::string &errorInfo) { + this->error_code_ = STATUS_ERR; // 默认的error code信息 + this->error_info_ = errorInfo; + } + + explicit CSTATUS(int errorCode, const std::string &errorInfo) { + this->error_code_ = errorCode; + this->error_info_ = errorInfo; + } + + CSTATUS(const CSTATUS &status) { + this->error_code_ = status.error_code_; + this->error_info_ = status.error_info_; + } + + CSTATUS(const CSTATUS &&status) noexcept { + this->error_code_ = status.error_code_; + this->error_info_ = status.error_info_; + } + + CSTATUS& operator=(const CSTATUS& status) = default; + + CSTATUS& operator+=(const CSTATUS& cur) { + if (this->isOK() && cur.isOK()) { + return (*this); + } + + error_info_ = this->isOK() + ? cur.error_info_ + : (cur.isOK() + ? error_info_ + : (error_info_ + STATUS_ERROR_INFO_CONNECTOR + cur.error_info_)); + error_code_ = STATUS_ERR; + + return (*this); + } + + void setStatus(const std::string& info) { + error_code_ = STATUS_ERR; + error_info_ = info; + } + + void setStatus(int code, const std::string& info) { + error_code_ = code; + error_info_ = info; + } + + [[nodiscard]] int getCode() const { + return this->error_code_; + } + + [[nodiscard]] const std::string& getInfo() const { + return this->error_info_; + } + + /** + * 判断当前状态是否可行 + * @return + */ + [[nodiscard]] bool isOK() const { + return STATUS_OK == error_code_; + } + + /** + * 判断当前状态是否可行 + * @return + */ + [[nodiscard]] bool isErr() const { + return error_code_ < STATUS_OK; // 约定异常信息,均为负值 + } + +private: + int error_code_ { STATUS_OK }; // 错误码信息 + std::string error_info_; // 错误信息描述 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CSTATUS_H diff --git a/src/CBasic/CValType.h b/src/CBasic/CValType.h new file mode 100644 index 0000000..9b935b9 --- /dev/null +++ b/src/CBasic/CValType.h @@ -0,0 +1,36 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CValTypes.h +@Time: 2022/2/3 12:58 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_CVALTYPE_H +#define CGRAPH_CVALTYPE_H + +#include "CStatus.h" +#include "CException.h" + +using CChar = CTP::CCHAR; +using CUint = CTP::CUINT; +using CSec = CTP::CUINT; // 表示秒信息, for second +using CMSec = CTP::CUINT; // 表示毫秒信息, for millisecond +using CSize = CTP::CSIZE; +using CVoid = CTP::CVOID; +using CVoidPtr = CTP::CVOID *; +using CInt = CTP::CINT; +using CLevel = CTP::CINT; +using CLong = CTP::CLONG; +using CULong = CTP::CULONG; +using CBool = CTP::CBOOL; +using CIndex = CTP::CINT; // 表示标识信息,可以为负数 +using CFloat = CTP::CFLOAT; +using CDouble = CTP::CDOUBLE; +using CConStr = CTP::CCONSTR; // 表示 const char* +using CBigBool = CTP::CBIGBOOL; + +using CStatus = CTP::CSTATUS; +using CException = CTP::CEXCEPTION; + +#endif //CGRAPH_CVALTYPE_H diff --git a/src/CThreadPool.h b/src/CThreadPool.h new file mode 100644 index 0000000..bae1e5d --- /dev/null +++ b/src/CThreadPool.h @@ -0,0 +1,15 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CThreadPool.h +@Time: 2022/10/5 20:08 +@Desc: +***************************/ + +#ifndef CTHREADPOOL_CTHREADPOOL_H +#define CTHREADPOOL_CTHREADPOOL_H + +#include "CBasic/CBasicInclude.h" +#include "UtilsCtrl/UtilsInclude.h" + +#endif //CTHREADPOOL_CTHREADPOOL_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h new file mode 100644 index 0000000..f9a328b --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h @@ -0,0 +1,90 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UAtomicPriorityQueue.h +@Time: 2022/10/1 21:40 +@Desc: 线程安全的优先队列。因为 priority_queue和queue的弹出方式不一致,故暂时不做合并 +***************************/ + +#ifndef CGRAPH_UATOMICPRIORITYQUEUE_H +#define CGRAPH_UATOMICPRIORITYQUEUE_H + +#include + +#include "UQueueObject.h" + +CGRAPH_NAMESPACE_BEGIN + +template +class UAtomicPriorityQueue : public UQueueObject { +public: + UAtomicPriorityQueue() = default; + + /** + * 尝试弹出 + * @param value + * @return + */ + CBool tryPop(T& value) { + CGRAPH_LOCK_GUARD lk(mutex_); + if (priority_queue_.empty()) { + return false; + } + value = std::move(*priority_queue_.top()); + priority_queue_.pop(); + return true; + } + + + /** + * 尝试弹出多个任务 + * @param values + * @param maxPoolBatchSize + * @return + */ + CBool tryPop(std::vector& values, int maxPoolBatchSize) { + CGRAPH_LOCK_GUARD lk(mutex_); + if (priority_queue_.empty() || maxPoolBatchSize <= 0) { + return false; + } + + while (!priority_queue_.empty() && maxPoolBatchSize--) { + values.emplace_back(std::move(*priority_queue_.top())); + priority_queue_.pop(); + } + + return true; + } + + + /** + * 传入数据 + * @param value + * @param priority 任务优先级,数字排序 + * @return + */ + CVoid push(T&& value, int priority) { + std::unique_ptr task(std::make_unique(std::move(value), priority)); + CGRAPH_LOCK_GUARD lk(mutex_); + priority_queue_.push(std::move(task)); + } + + + /** + * 判定队列是否为空 + * @return + */ + [[nodiscard]] CBool empty() { + CGRAPH_LOCK_GUARD lk(mutex_); + return priority_queue_.empty(); + } + + CGRAPH_NO_ALLOWED_COPY(UAtomicPriorityQueue) + +private: + std::priority_queue > priority_queue_; // 优先队列信息,根据重要级别决定先后执行顺序 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UATOMICPRIORITYQUEUE_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h new file mode 100644 index 0000000..c5f696f --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h @@ -0,0 +1,131 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UAtomicQueue.h +@Time: 2021/7/2 11:28 下午 +@Desc: 设计了一个安全队列 +***************************/ + +#ifndef CGRAPH_UATOMICQUEUE_H +#define CGRAPH_UATOMICQUEUE_H + +#include +#include +#include +#include + +#include "../UThreadPoolDefine.h" +#include "UQueueObject.h" + +CGRAPH_NAMESPACE_BEGIN + +template +class UAtomicQueue : public UQueueObject { +public: + UAtomicQueue() = default; + + /** + * 等待弹出 + * @param value + */ + CVoid waitPop(T& value) { + CGRAPH_UNIQUE_LOCK lk(mutex_); + cv_.wait(lk, [this] { return !queue_.empty(); }); + value = std::move(*queue_.front()); + queue_.pop(); + } + + + /** + * 尝试弹出 + * @param value + * @return + */ + CBool tryPop(T& value) { + CGRAPH_LOCK_GUARD lk(mutex_); + if (queue_.empty()) { + return false; + } + value = std::move(*queue_.front()); + queue_.pop(); + return true; + } + + + /** + * 尝试弹出多个任务 + * @param values + * @param maxPoolBatchSize + * @return + */ + CBool tryPop(std::vector& values, int maxPoolBatchSize) { + CGRAPH_LOCK_GUARD lk(mutex_); + if (queue_.empty() || maxPoolBatchSize <= 0) { + return false; + } + + while (!queue_.empty() && maxPoolBatchSize--) { + values.emplace_back(std::move(*queue_.front())); + queue_.pop(); + } + + return true; + } + + + /** + * 阻塞式等待弹出 + * @return + */ + std::unique_ptr waitPop() { + CGRAPH_UNIQUE_LOCK lk(mutex_); + cv_.wait(lk, [this] { return !queue_.empty(); }); + std::unique_ptr result = std::move(queue_.front()); + queue_.pop(); + return result; + } + + + /** + * 非阻塞式等待弹出 + * @return + */ + std::unique_ptr tryPop() { + CGRAPH_LOCK_GUARD lk(mutex_); + if (queue_.empty()) { return std::unique_ptr(); } + std::unique_ptr ptr = std::move(queue_.front()); + queue_.pop(); + return ptr; + } + + + /** + * 传入数据 + * @param value + */ + CVoid push(T&& value) { + std::unique_ptr task(std::make_unique(std::move(value))); + CGRAPH_LOCK_GUARD lk(mutex_); + queue_.push(std::move(task)); + cv_.notify_one(); + } + + + /** + * 判定队列是否为空 + * @return + */ + [[nodiscard]] CBool empty() { + CGRAPH_LOCK_GUARD lk(mutex_); + return queue_.empty(); + } + + CGRAPH_NO_ALLOWED_COPY(UAtomicQueue) + +private: + std::queue> queue_; +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UATOMICQUEUE_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h b/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h new file mode 100644 index 0000000..a1b070d --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h @@ -0,0 +1,16 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UQueueInclude.h +@Time: 2022/1/12 11:09 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UQUEUEINCLUDE_H +#define CGRAPH_UQUEUEINCLUDE_H + +#include "UAtomicQueue.h" +#include "UWorkStealingQueue.h" +#include "UAtomicPriorityQueue.h" + +#endif //CGRAPH_UQUEUEINCLUDE_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h b/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h new file mode 100644 index 0000000..75da626 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h @@ -0,0 +1,26 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UQueueObject.h +@Time: 2022/10/1 20:31 +@Desc: +***************************/ + +#ifndef CGRAPH_UQUEUEOBJECT_H +#define CGRAPH_UQUEUEOBJECT_H + +#include + +#include "../UThreadObject.h" + +CGRAPH_NAMESPACE_BEGIN + +class UQueueObject : public UThreadObject { +protected: + std::mutex mutex_; + std::condition_variable cv_; +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UQUEUEOBJECT_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h new file mode 100644 index 0000000..9a4b5a7 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h @@ -0,0 +1,133 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UWorkStealingQueue.h +@Time: 2021/7/2 11:29 下午 +@Desc: 实现了一个包含盗取功能的安全队列 +***************************/ + + +#ifndef CGRAPH_UWORKSTEALINGQUEUE_H +#define CGRAPH_UWORKSTEALINGQUEUE_H + +#include +#include +#include + +#include "UQueueObject.h" +#include "../Task/UTask.h" + +CGRAPH_NAMESPACE_BEGIN + +class UWorkStealingQueue : public UQueueObject { +public: + UWorkStealingQueue() = default; + + /** + * 向队列中写入信息 + * @param task + */ + CVoid push(UTask&& task) { + while (true) { + if (mutex_.try_lock()) { + deque_.emplace_front(std::move(task)); + mutex_.unlock(); + break; + } else { + std::this_thread::yield(); + } + } + } + + + /** + * 弹出节点,从头部进行 + * @param task + * @return + */ + CBool tryPop(UTask& task) { + // 这里不使用raii锁,主要是考虑到多线程的情况下,可能会重复进入 + bool result = false; + if (mutex_.try_lock()) { + if (!deque_.empty()) { + task = std::move(deque_.front()); // 从前方弹出 + deque_.pop_front(); + result = true; + } + mutex_.unlock(); + } + + return result; + } + + + /** + * 从头部开始批量获取可执行任务信息 + * @param taskArr + * @param maxLocalBatchSize + * @return + */ + CBool tryPop(UTaskArrRef taskArr, + int maxLocalBatchSize) { + bool result = false; + if (mutex_.try_lock()) { + while (!deque_.empty() && maxLocalBatchSize--) { + taskArr.emplace_back(std::move(deque_.front())); + deque_.pop_front(); + result = true; + } + mutex_.unlock(); + } + + return result; + } + + + /** + * 窃取节点,从尾部进行 + * @param task + * @return + */ + CBool trySteal(UTask& task) { + bool result = false; + if (mutex_.try_lock()) { + if (!deque_.empty()) { + task = std::move(deque_.back()); // 从后方窃取 + deque_.pop_back(); + result = true; + } + mutex_.unlock(); + } + + return result; + } + + + /** + * 批量窃取节点,从尾部进行 + * @param taskArr + * @return + */ + CBool trySteal(UTaskArrRef taskArr, int maxStealBatchSize) { + bool result = false; + if (mutex_.try_lock()) { + while (!deque_.empty() && maxStealBatchSize--) { + taskArr.emplace_back(std::move(deque_.back())); + deque_.pop_back(); + result = true; + } + mutex_.unlock(); + } + + return result; // 如果非空,表示盗取成功 + } + + CGRAPH_NO_ALLOWED_COPY(UWorkStealingQueue) + +private: + std::deque deque_; +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UWORKSTEALINGQUEUE_H diff --git a/src/UtilsCtrl/ThreadPool/Task/UTask.h b/src/UtilsCtrl/ThreadPool/Task/UTask.h new file mode 100644 index 0000000..e29b317 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Task/UTask.h @@ -0,0 +1,80 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UTask.h +@Time: 2021/7/2 11:32 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTASK_H +#define CGRAPH_UTASK_H + +#include +#include + +#include "../UThreadObject.h" + +CGRAPH_NAMESPACE_BEGIN + +class UTask : public UThreadObject { + struct taskBased { + explicit taskBased() = default; + virtual CVoid call() = 0; + virtual ~taskBased() = default; + }; + + template + struct taskDerided : taskBased { + F func_; + explicit taskDerided(F&& func) : func_(std::move(func)) {} + CVoid call() override { func_(); } + }; + +public: + template + UTask(F&& f, int priority = 0) + : impl_(new taskDerided(std::forward(f))) + , priority_(priority) {} + + CVoid operator()() { + if (likely(impl_)) { + impl_->call(); + } + } + + UTask() = default; + + UTask(UTask&& task) noexcept: + impl_(std::move(task.impl_)), + priority_(task.priority_) {} + + UTask &operator=(UTask&& task) noexcept { + impl_ = std::move(task.impl_); + priority_ = task.priority_; + return *this; + } + + CBool operator>(const UTask& task) const { + return priority_ < task.priority_; // 新加入的,放到后面 + } + + CBool operator<(const UTask& task) const { + return priority_ >= task.priority_; + } + + CGRAPH_NO_ALLOWED_COPY(UTask) + +private: + std::unique_ptr impl_ = nullptr; + int priority_ = 0; // 任务的优先级信息 +}; + + +using UTaskRef = UTask &; +using UTaskPtr = UTask *; +using UTaskArr = std::vector; +using UTaskArrRef = std::vector &; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTASK_H diff --git a/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h b/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h new file mode 100644 index 0000000..2663ae1 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h @@ -0,0 +1,102 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UTaskGroup.h +@Time: 2022/1/2 2:17 下午 +@Desc: 任务组,用于批量提交 +***************************/ + +#ifndef CGRAPH_UTASKGROUP_H +#define CGRAPH_UTASKGROUP_H + +#include + +#include "../UThreadObject.h" + +CGRAPH_NAMESPACE_BEGIN + +class UTaskGroup : public UThreadObject { +public: + explicit UTaskGroup() = default; + CGRAPH_NO_ALLOWED_COPY(UTaskGroup) + + /** + * 直接通过函数来申明taskGroup + * @param task + * @param ttl + * @param onFinished + */ + explicit UTaskGroup(CGRAPH_DEFAULT_CONST_FUNCTION_REF task, + CMSec ttl = CGRAPH_MAX_BLOCK_TTL, + CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished = nullptr) noexcept { + this->addTask(task) + ->setTtl(ttl) + ->setOnFinished(onFinished); + } + + /** + * 添加一个任务 + * @param task + */ + UTaskGroup* addTask(CGRAPH_DEFAULT_CONST_FUNCTION_REF task) { + task_arr_.emplace_back(task); + return this; + } + + /** + * 设置任务最大超时时间 + * @param ttl + */ + UTaskGroup* setTtl(CMSec ttl) { + this->ttl_ = ttl; + return this; + } + + /** + * 设置执行完成后的回调函数 + * @param onFinished + * @return + */ + UTaskGroup* setOnFinished(CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished) { + this->on_finished_ = onFinished; + return this; + } + + /** + * 获取最大超时时间信息 + * @return + */ + [[nodiscard]] CMSec getTtl() const { + return this->ttl_; + } + + /** + * 清空任务组 + */ + CVoid clear() { + task_arr_.clear(); + } + + /** + * 获取任务组大小 + * @return + */ + [[nodiscard]] CSize getSize() const { + auto size = task_arr_.size(); + return size; + } + +private: + std::vector task_arr_; // 任务消息 + CMSec ttl_ = CGRAPH_MAX_BLOCK_TTL; // 任务组最大执行耗时(如果是0的话,则表示不阻塞) + CGRAPH_CALLBACK_FUNCTION on_finished_ = nullptr; // 执行函数任务结束 + + friend class UThreadPool; +}; + +using UTaskGroupPtr = UTaskGroup *; +using UTaskGroupRef = UTaskGroup &; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTASKGROUP_H diff --git a/src/UtilsCtrl/ThreadPool/Task/UTaskInclude.h b/src/UtilsCtrl/ThreadPool/Task/UTaskInclude.h new file mode 100644 index 0000000..f1d2015 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Task/UTaskInclude.h @@ -0,0 +1,15 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UTaskInclude.h +@Time: 2022/1/12 9:34 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTASKINCLUDE_H +#define CGRAPH_UTASKINCLUDE_H + +#include "UTask.h" +#include "UTaskGroup.h" + +#endif //CGRAPH_UTASKINCLUDE_H diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h new file mode 100644 index 0000000..95ecc35 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h @@ -0,0 +1,212 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadBase.h +@Time: 2021/7/2 11:24 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADBASE_H +#define CGRAPH_UTHREADBASE_H + +#include + +#include "../UThreadObject.h" +#include "../Queue/UQueueInclude.h" +#include "../Task/UTaskInclude.h" + + +CGRAPH_NAMESPACE_BEGIN + +class UThreadBase : public UThreadObject { + +protected: + explicit UThreadBase() { + done_ = true; + is_init_ = false; + is_running_ = false; + pool_task_queue_ = nullptr; + pool_priority_task_queue_ = nullptr; + config_ = nullptr; + total_task_num_ = 0; + } + + + ~UThreadBase() override { + reset(); + } + + + /** + * 所有线程类的 destroy 函数应该是一样的 + * 但是init函数不一样,因为线程构造函数不同 + * @return + */ + CStatus destroy() override { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(true) + + reset(); + CGRAPH_FUNCTION_END + } + + + /** + * 从线程池的队列中,获取任务 + * @param task + * @return + */ + virtual bool popPoolTask(UTaskRef task) { + bool result = pool_task_queue_->tryPop(task); + if (!result && CGRAPH_THREAD_TYPE_SECONDARY == type_) { + // 如果辅助线程没有获取到的话,还需要再尝试从长时间任务队列中,获取一次。 + result = pool_priority_task_queue_->tryPop(task); + } + return result; + } + + + /** + * 从线程池的队列中中,获取批量任务 + * @param tasks + * @return + */ + virtual bool popPoolTask(UTaskArrRef tasks) { + bool result = pool_task_queue_->tryPop(tasks, config_->max_pool_batch_size_); + if (!result && CGRAPH_THREAD_TYPE_SECONDARY == type_) { + result = pool_priority_task_queue_->tryPop(tasks, 1); + } + return result; + } + + + /** + * 执行单个任务 + * @param task + */ + CVoid runTask(UTask& task) { + is_running_ = true; + task(); + total_task_num_++; + is_running_ = false; + } + + + /** + * 批量执行任务 + * @param tasks + */ + CVoid runTasks(UTaskArr& tasks) { + is_running_ = true; + for (auto& task : tasks) { + task(); + } + total_task_num_ += tasks.size(); + is_running_ = false; + } + + + /** + * 清空所有任务内容 + */ + CVoid reset() { + done_ = false; + if (thread_.joinable()) { + thread_.join(); // 等待线程结束 + } + is_init_ = false; + is_running_ = false; + total_task_num_ = 0; + } + + + /** + * 设置线程优先级,仅针对非windows平台使用 + */ + CVoid setSchedParam() { +#ifndef _WIN32 + int priority = CGRAPH_THREAD_SCHED_OTHER; + int policy = CGRAPH_THREAD_MIN_PRIORITY; + if (type_ == CGRAPH_THREAD_TYPE_PRIMARY) { + priority = config_->primary_thread_priority_; + policy = config_->primary_thread_policy_; + } else if (type_ == CGRAPH_THREAD_TYPE_SECONDARY) { + priority = config_->secondary_thread_priority_; + policy = config_->secondary_thread_policy_; + } + + auto handle = thread_.native_handle(); + sched_param param = { calcPriority(priority) }; + int ret = pthread_setschedparam(handle, calcPolicy(policy), ¶m); + if (0 != ret) { + CGRAPH_ECHO("warning : set thread sched param failed, error code is [%d]", ret); + } +#endif + } + + /** + * 设置线程亲和性,仅针对linux系统 + */ + CVoid setAffinity(int index) { +#ifdef __linux__ + if (!config_->bind_cpu_enable_ || CGRAPH_CPU_NUM == 0 || index < 0) { + return; + } + + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(index % CGRAPH_CPU_NUM, &mask); + + auto handle = thread_.native_handle(); + int ret = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &mask); + if (0 != ret) { + CGRAPH_ECHO("warning : set thread affinity failed, error code is [%d]", ret); + } +#endif + } + + +private: + /** + * 设定计算线程调度策略信息, + * 非OTHER/RR/FIFO对应数值,统一返回OTHER类型 + * @param policy + * @return + */ + [[nodiscard]] static int calcPolicy(int policy) { + return (CGRAPH_THREAD_SCHED_OTHER == policy + || CGRAPH_THREAD_SCHED_RR == policy + || CGRAPH_THREAD_SCHED_FIFO == policy) + ? policy : CGRAPH_THREAD_SCHED_OTHER; + } + + + /** + * 设定线程优先级信息 + * 超过[min,max]范围,统一设置为min值 + * @param priority + * @return + */ + [[nodiscard]] static int calcPriority(int priority) { + return (priority >= CGRAPH_THREAD_MIN_PRIORITY + && priority <= CGRAPH_THREAD_MAX_PRIORITY) + ? priority : CGRAPH_THREAD_MIN_PRIORITY; + } + + +protected: + bool done_; // 线程状态标记 + bool is_init_; // 标记初始化状态 + bool is_running_; // 是否正在执行 + int type_ = 0; // 用于区分线程类型(主线程、辅助线程) + unsigned long total_task_num_ = 0; // 处理的任务的数字 + + UAtomicQueue* pool_task_queue_; // 用于存放线程池中的普通任务 + UAtomicPriorityQueue* pool_priority_task_queue_; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行 + UThreadPoolConfigPtr config_ = nullptr; // 配置参数信息 + std::thread thread_; // 线程类 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTHREADBASE_H diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadInclude.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadInclude.h new file mode 100644 index 0000000..a88f7f3 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadInclude.h @@ -0,0 +1,15 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadInclude.h +@Time: 2022/1/12 11:09 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADINCLUDE_H +#define CGRAPH_UTHREADINCLUDE_H + +#include "UThreadPrimary.h" +#include "UThreadSecondary.h" + +#endif //CGRAPH_UTHREADINCLUDE_H diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h new file mode 100644 index 0000000..a41cf0c --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -0,0 +1,216 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadPrimary.h +@Time: 2021/7/8 11:02 下午 +@Desc: 核心线程,处理任务中 +***************************/ + +#ifndef CGRAPH_UTHREADPRIMARY_H +#define CGRAPH_UTHREADPRIMARY_H + +#include "UThreadBase.h" + +CGRAPH_NAMESPACE_BEGIN + +class UThreadPrimary : public UThreadBase { +protected: + explicit UThreadPrimary() { + index_ = -1; + pool_threads_ = nullptr; + type_ = CGRAPH_THREAD_TYPE_PRIMARY; + } + + + CStatus init() override { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(false) + + is_init_ = true; + thread_ = std::move(std::thread(&UThreadPrimary::run, this)); + setSchedParam(); + setAffinity(index_); + CGRAPH_FUNCTION_END + } + + + /** + * 注册线程池相关内容,需要在init之前使用 + * @param index + * @param poolTaskQueue + * @param poolThreads + * @param config + */ + CStatus setThreadPoolInfo(int index, + UAtomicQueue* poolTaskQueue, + std::vector* poolThreads, + UThreadPoolConfigPtr config) { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数 + CGRAPH_ASSERT_NOT_NULL(poolTaskQueue) + CGRAPH_ASSERT_NOT_NULL(poolThreads) + CGRAPH_ASSERT_NOT_NULL(config) + + this->index_ = index; + this->pool_task_queue_ = poolTaskQueue; + this->pool_threads_ = poolThreads; + this->config_ = config; + CGRAPH_FUNCTION_END + } + + + /** + * 线程执行函数 + * @return + */ + CStatus run() override { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(true) + CGRAPH_ASSERT_NOT_NULL(pool_threads_) + CGRAPH_ASSERT_NOT_NULL(config_) + + /** + * 线程池中任何一个primary线程为null都不可以执行 + * 防止线程初始化失败的情况,导致的崩溃 + * 理论不会走到这个判断逻辑里面 + */ + if (std::any_of(pool_threads_->begin(), pool_threads_->end(), + [](UThreadPrimary* thd) { + return nullptr == thd; + })) { + CGRAPH_RETURN_ERROR_STATUS("primary thread is null") + } + + if (config_->calcBatchTaskRatio()) { + while (done_) { + processTasks(); // 批量任务获取执行接口 + } + } else { + while (done_) { + processTask(); // 单个任务获取执行接口 + } + } + + CGRAPH_FUNCTION_END + } + + + /** + * 获取并执行任务 + * @return + */ + CVoid processTask() { + UTask task; + if (popTask(task) || popPoolTask(task) || stealTask(task)) { + runTask(task); + } else { + std::this_thread::yield(); + } + } + + + /** + * 获取批量执行task信息 + */ + CVoid processTasks() { + UTaskArr tasks; + if (popTask(tasks) || popPoolTask(tasks) || stealTask(tasks)) { + // 尝试从主线程中获取/盗取批量task,如果成功,则依次执行 + runTasks(tasks); + } else { + std::this_thread::yield(); + } + } + + + /** + * 从本地弹出一个任务 + * @param task + * @return + */ + bool popTask(UTaskRef task) { + return work_stealing_queue_.tryPop(task); + } + + + /** + * 从本地弹出一批任务 + * @param tasks + * @return + */ + bool popTask(UTaskArrRef tasks) { + return work_stealing_queue_.tryPop(tasks, config_->max_local_batch_size_); + } + + + /** + * 从其他线程窃取一个任务 + * @param task + * @return + */ + bool stealTask(UTaskRef task) { + if (unlikely(pool_threads_->size() < config_->default_thread_size_)) { + /** + * 线程池还未初始化完毕的时候,无法进行steal。 + * 确保程序安全运行。 + */ + return false; + } + + /** + * 窃取的时候,仅从相邻的primary线程中窃取 + * 待窃取相邻的数量,不能超过默认primary线程数 + */ + int range = config_->calcStealRange(); + for (int i = 0; i < range; i++) { + /** + * 从线程中周围的thread中,窃取任务。 + * 如果成功,则返回true,并且执行任务。 + */ + int curIndex = (index_ + i + 1) % config_->default_thread_size_; + if (nullptr != (*pool_threads_)[curIndex] + && ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(task)) { + return true; + } + } + + return false; + } + + + /** + * 从其他线程盗取一批任务 + * @param tasks + * @return + */ + bool stealTask(UTaskArrRef tasks) { + if (unlikely(pool_threads_->size() < config_->default_thread_size_)) { + return false; + } + + int range = config_->calcStealRange(); + for (int i = 0; i < range; i++) { + int curIndex = (index_ + i + 1) % config_->default_thread_size_; + if (nullptr != (*pool_threads_)[curIndex] + && ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(tasks, config_->max_steal_batch_size_)) { + return true; + } + } + + return false; + } + +private: + int index_ {-1}; // 线程index + UWorkStealingQueue work_stealing_queue_; // 内部队列信息 + std::vector* pool_threads_; // 用于存放线程池中的线程信息 + + friend class UThreadPool; + friend class UAllocator; +}; + +using UThreadPrimaryPtr = UThreadPrimary *; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTHREADPRIMARY_H diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h new file mode 100644 index 0000000..3ac21a8 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h @@ -0,0 +1,131 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadSecondary.h +@Time: 2021/7/8 11:02 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADSECONDARY_H +#define CGRAPH_UTHREADSECONDARY_H + +#include "UThreadBase.h" + +CGRAPH_NAMESPACE_BEGIN + +class UThreadSecondary : public UThreadBase { +public: + explicit UThreadSecondary() { + cur_ttl_ = 0; + type_ = CGRAPH_THREAD_TYPE_SECONDARY; + } + + +protected: + CStatus init() override { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(false) + CGRAPH_ASSERT_NOT_NULL(config_) + + cur_ttl_ = config_->secondary_thread_ttl_; + is_init_ = true; + thread_ = std::move(std::thread(&UThreadSecondary::run, this)); + setSchedParam(); + CGRAPH_FUNCTION_END + } + + + /** + * 设置pool的信息 + * @param poolTaskQueue + * @param poolPriorityTaskQueue + * @param config + * @return + */ + CStatus setThreadPoolInfo(UAtomicQueue* poolTaskQueue, + UAtomicPriorityQueue* poolPriorityTaskQueue, + UThreadPoolConfigPtr config) { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数 + CGRAPH_ASSERT_NOT_NULL(poolTaskQueue) + CGRAPH_ASSERT_NOT_NULL(poolPriorityTaskQueue) + CGRAPH_ASSERT_NOT_NULL(config) + + this->pool_task_queue_ = poolTaskQueue; + this->pool_priority_task_queue_ = poolPriorityTaskQueue; + this->config_ = config; + CGRAPH_FUNCTION_END + } + + + CStatus run() override { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(true) + CGRAPH_ASSERT_NOT_NULL(config_) + + if (config_->calcBatchTaskRatio()) { + while (done_) { + processTasks(); // 批量任务获取执行接口 + } + } else { + while (done_) { + processTask(); // 单个任务获取执行接口 + } + } + + CGRAPH_FUNCTION_END + } + + + /** + * 任务执行函数,从线程池的任务队列中获取信息 + */ + CVoid processTask() { + UTask task; + if (popPoolTask(task)) { + runTask(task); + } else { + std::this_thread::yield(); + } + } + + + /** + * 批量执行n个任务 + */ + CVoid processTasks() { + UTaskArr tasks; + if (popPoolTask(tasks)) { + runTasks(tasks); + } else { + std::this_thread::yield(); + } + } + + + /** + * 判断本线程是否需要被自动释放 + * @return + */ + bool freeze() { + if (likely(is_running_)) { + cur_ttl_++; + cur_ttl_ = std::min(cur_ttl_, config_->secondary_thread_ttl_); + } else { + cur_ttl_--; // 如果当前线程没有在执行,则ttl-1 + } + + return cur_ttl_ <= 0; + } + +private: + int cur_ttl_ = 0; // 当前最大生存周期 + + friend class UThreadPool; +}; + +using UThreadSecondaryPtr = UThreadSecondary *; + +CGRAPH_NAMESPACE_END + +#endif // CGRAPH_UTHREADSECONDARY_H diff --git a/src/UtilsCtrl/ThreadPool/UThreadObject.h b/src/UtilsCtrl/ThreadPool/UThreadObject.h new file mode 100644 index 0000000..f3c26ac --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadObject.h @@ -0,0 +1,30 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CThreadObject.h +@Time: 2021/7/2 10:39 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADOBJECT_H +#define CGRAPH_UTHREADOBJECT_H + +#include "../UtilsObject.h" + +CGRAPH_NAMESPACE_BEGIN + +class UThreadObject : public UtilsObject { + +protected: + /** + * 部分thread中的算子,可以不实现run方法 + * @return + */ + CStatus run() override { + CGRAPH_NO_SUPPORT + } +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTHREADOBJECT_H diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp new file mode 100644 index 0000000..4b431b9 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp @@ -0,0 +1,201 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadPool.cpp +@Time: 2022/10/3 17:31 +@Desc: +***************************/ + +#include "UThreadPool.h" + +CGRAPH_NAMESPACE_BEGIN + +UThreadPool::UThreadPool(CBool autoInit, const UThreadPoolConfig& config) noexcept { + cur_index_ = 0; + is_init_ = false; + input_task_num_ = 0; + this->setConfig(config); // setConfig 函数,用在 is_init_ 设定之后 + is_monitor_ = config_.monitor_enable_; /** 根据参数设定,决定是否开启监控线程。默认开启 */ + /** + * CGraph 本身支持跨平台运行 + * 如果在windows平台上,通过Visual Studio(2017版本或以下) 版本,将 UThreadPool 类封装程.dll文件时,遇到无法启动的问题 + * 请参考此链接:https://github.com/ChunelFeng/CGraph/issues/17 + */ + monitor_thread_ = std::move(std::thread(&UThreadPool::monitor, this)); + if (autoInit) { + this->init(); + } +} + + +UThreadPool::~UThreadPool() { + is_monitor_ = false; // 在析构的时候,才释放监控线程。先释放监控线程,再释放其他的线程 + if (monitor_thread_.joinable()) { + monitor_thread_.join(); + } + + destroy(); +} + + +CStatus UThreadPool::setConfig(const UThreadPoolConfig &config) { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(false) // 初始化后,无法设置参数信息 + + this->config_ = config; + CGRAPH_FUNCTION_END +} + + +CStatus UThreadPool::init() { + CGRAPH_FUNCTION_BEGIN + if (is_init_) { + CGRAPH_FUNCTION_END + } + + primary_threads_.reserve(config_.default_thread_size_); + for (int i = 0; i < config_.default_thread_size_; i++) { + auto ptr = CGRAPH_SAFE_MALLOC_COBJECT(UThreadPrimary) // 创建核心线程数 + + ptr->setThreadPoolInfo(i, &task_queue_, &primary_threads_, &config_); + status += ptr->init(); + primary_threads_.emplace_back(ptr); + } + CGRAPH_FUNCTION_CHECK_STATUS + + is_init_ = true; + CGRAPH_FUNCTION_END +} + + +CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(true) + + std::vector> futures; + for (const auto& task : taskGroup.task_arr_) { + futures.emplace_back(commit(task)); + } + + // 计算最终运行时间信息 + auto deadline = std::chrono::system_clock::now() + + std::chrono::milliseconds(std::min(taskGroup.getTtl(), ttl)); + + for (auto& fut : futures) { + const auto& futStatus = fut.wait_until(deadline); + switch (futStatus) { + case std::future_status::ready: break; // 正常情况,直接返回了 + case std::future_status::timeout: status += CStatus("thread status timeout"); break; + case std::future_status::deferred: status += CStatus("thread status deferred"); break; + default: status += CStatus("thread status unknown"); + } + } + + if (taskGroup.on_finished_) { + taskGroup.on_finished_(status); + } + + CGRAPH_FUNCTION_END +} + + +CStatus UThreadPool::submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func, CMSec ttl, + CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished) { + return submit(UTaskGroup(func, ttl, onFinished)); +} + + +CStatus UThreadPool::destroy() { + CGRAPH_FUNCTION_BEGIN + if (!is_init_) { + CGRAPH_FUNCTION_END + } + + // primary 线程是普通指针,需要delete + for (auto &pt : primary_threads_) { + status += pt->destroy(); + CGRAPH_DELETE_PTR(pt) + } + CGRAPH_FUNCTION_CHECK_STATUS + primary_threads_.clear(); + + // secondary 线程是智能指针,不需要delete + for (auto &st : secondary_threads_) { + status += st->destroy(); + } + CGRAPH_FUNCTION_CHECK_STATUS + secondary_threads_.clear(); + is_init_ = false; + + CGRAPH_FUNCTION_END +} + + +CIndex UThreadPool::dispatch(CIndex origIndex) { + if (unlikely(config_.fair_lock_enable_)) { + return CGRAPH_DEFAULT_TASK_STRATEGY; // 如果开启fair lock,则全部写入 pool的queue中,依次执行 + } + + CIndex realIndex = 0; + if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) { + /** + * 如果是默认策略信息,在[0, default_thread_size_) 之间的,通过 thread 中queue来调度 + * 在[default_thread_size_, max_thread_size_) 之间的,通过 pool 中的queue来调度 + */ + realIndex = cur_index_++; + if (cur_index_ >= config_.max_thread_size_ || cur_index_ < 0) { + cur_index_ = 0; + } + } else { + realIndex = origIndex; + } + + return realIndex; // 交到上游去判断,走哪个线程 +} + + +CStatus UThreadPool::createSecondaryThread(CInt size) { + CGRAPH_FUNCTION_BEGIN + + int leftSize = (int)(config_.max_thread_size_ - config_.default_thread_size_ - secondary_threads_.size()); + int realSize = std::min(size, leftSize); // 使用 realSize 来确保所有的线程数量之和,不会超过设定max值 + for (int i = 0; i < realSize; i++) { + auto ptr = CGRAPH_MAKE_UNIQUE_COBJECT(UThreadSecondary) + ptr->setThreadPoolInfo(&task_queue_, &priority_task_queue_, &config_); + status += ptr->init(); + secondary_threads_.emplace_back(std::move(ptr)); + } + + CGRAPH_FUNCTION_END +} + + +CVoid UThreadPool::monitor() { + while (is_monitor_) { + while (is_monitor_ && !is_init_) { + // 如果没有init,则一直处于空跑状态 + CGRAPH_SLEEP_SECOND(1) + } + + int span = config_.monitor_span_; + while (is_monitor_ && is_init_ && span--) { + CGRAPH_SLEEP_SECOND(1) // 保证可以快速退出 + } + + // 如果 primary线程都在执行,则表示忙碌 + bool busy = std::all_of(primary_threads_.begin(), primary_threads_.end(), + [](UThreadPrimaryPtr ptr) { return nullptr != ptr && ptr->is_running_; }); + + // 如果忙碌或者priority_task_queue_中有任务,则需要添加 secondary线程 + if (busy || !priority_task_queue_.empty()) { + createSecondaryThread(1); + } + + // 判断 secondary 线程是否需要退出 + for (auto iter = secondary_threads_.begin(); iter != secondary_threads_.end(); ) { + (*iter)->freeze() ? secondary_threads_.erase(iter++) : iter++; + } + } +} + +CGRAPH_NAMESPACE_END \ No newline at end of file diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.h b/src/UtilsCtrl/ThreadPool/UThreadPool.h new file mode 100644 index 0000000..7f3928f --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.h @@ -0,0 +1,152 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadPool.h +@Time: 2021/7/4 1:34 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADPOOL_H +#define CGRAPH_UTHREADPOOL_H + +#include +#include +#include +#include +#include +#include +#include + +#include "UThreadObject.h" +#include "UThreadPoolConfig.h" +#include "Queue/UQueueInclude.h" +#include "Thread/UThreadInclude.h" +#include "Task/UTaskInclude.h" + +CGRAPH_NAMESPACE_BEGIN + +class UThreadPool : public UThreadObject { +public: + /** + * 通过默认设置参数,来创建线程池 + * @param autoInit 是否自动开启线程池功能 + * @param config + */ + explicit UThreadPool(CBool autoInit = true, + const UThreadPoolConfig& config = UThreadPoolConfig()) noexcept; + + /** + * 析构函数 + */ + ~UThreadPool() override; + + /** + * 设置线程池相关配置信息,需要在init()函数调用前,完成设置 + * @param config + * @return + * @notice 通过单例类(UThreadPoolSingleton)开启线程池,则线程池默认init。需要 destroy 后才可以设置参数 + */ + CStatus setConfig(const UThreadPoolConfig &config); + + /** + * 开启所有的线程信息 + * @return + */ + CStatus init() final; + + /** + * 提交任务信息 + * @tparam FunctionType + * @param func + * @param index + * @return + */ + template + auto commit(const FunctionType& func, + CIndex index = CGRAPH_DEFAULT_TASK_STRATEGY) + -> std::future::type>; + + /** + * 根据优先级,执行任务 + * @tparam FunctionType + * @param func + * @param priority 优先级别。自然序从大到小依次执行 + * @return + * @notice 建议,priority 范围在 [-100, 100] 之间 + */ + template + auto commitWithPriority(const FunctionType& func, + int priority) + -> std::future::type>; + + /** + * 执行任务组信息 + * 取taskGroup内部ttl和入参ttl的最小值,为计算ttl标准 + * @param taskGroup + * @param ttl + * @return + */ + CStatus submit(const UTaskGroup& taskGroup, + CMSec ttl = CGRAPH_MAX_BLOCK_TTL); + + /** + * 针对单个任务的情况,复用任务组信息,实现单个任务直接执行 + * @param task + * @param ttl + * @param onFinished + * @return + */ + CStatus submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func, + CMSec ttl = CGRAPH_MAX_BLOCK_TTL, + CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished = nullptr); + + /** + * 释放所有的线程信息 + * @return + */ + CStatus destroy() final; + + +protected: + /** + * 根据传入的策略信息,确定最终执行方式 + * @param origIndex + * @return + */ + virtual CIndex dispatch(CIndex origIndex); + + /** + * 生成辅助线程。内部确保辅助线程数量不超过设定参数 + * @param size + * @return + */ + CStatus createSecondaryThread(CInt size); + + /** + * 监控线程执行函数,主要是判断是否需要增加线程,或销毁线程 + * 增/删 操作,仅针对secondary类型线程生效 + */ + CVoid monitor(); + + CGRAPH_NO_ALLOWED_COPY(UThreadPool) + +protected: + CBool is_init_ { false }; // 是否初始化 + CBool is_monitor_ { true }; // 是否需要监控 + CInt cur_index_ = 0; // 记录放入的线程数 + CULong input_task_num_ = 0; // 放入的任务的个数 + UAtomicQueue task_queue_; // 用于存放普通任务 + UAtomicPriorityQueue priority_task_queue_; // 运行时间较长的任务队列,仅在辅助线程中执行 + std::vector primary_threads_; // 记录所有的主线程 + std::list> secondary_threads_; // 用于记录所有的辅助线程 + UThreadPoolConfig config_; // 线程池设置值 + std::thread monitor_thread_; // 监控线程 +}; + +using UThreadPoolPtr = UThreadPool *; + +CGRAPH_NAMESPACE_END + +#include "UThreadPool.inl" + +#endif //CGRAPH_UTHREADPOOL_H diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.inl b/src/UtilsCtrl/ThreadPool/UThreadPool.inl new file mode 100644 index 0000000..8d3d4f0 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.inl @@ -0,0 +1,62 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadPool.inl +@Time: 2021/7/4 1:34 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADPOOL_INL +#define CGRAPH_UTHREADPOOL_INL + +#include "UThreadPool.h" + +CGRAPH_NAMESPACE_BEGIN + +template +auto UThreadPool::commit(const FunctionType& func, CIndex index) +-> std::future::type> { + using ResultType = typename std::result_of::type; + + std::packaged_task task(func); + std::future result(task.get_future()); + + CIndex realIndex = dispatch(index); + if (realIndex >= 0 && realIndex < config_.default_thread_size_) { + // 如果返回的结果,在主线程数量之间,则放到主线程的queue中执行 + primary_threads_[realIndex]->work_stealing_queue_.push(std::move(task)); + } else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) { + /** + * 如果是长时间任务,则交给特定的任务队列,仅由辅助线程处理 + * 目的是防止有很多长时间任务,将所有运行的线程均阻塞 + * 长任务程序,默认优先级较低 + **/ + priority_task_queue_.push(std::move(task), CGRAPH_LONG_TIME_TASK_STRATEGY); + } else { + // 返回其他结果,放到pool的queue中执行 + task_queue_.push(std::move(task)); + } + input_task_num_++; // 计数 + return result; +} + +template +auto UThreadPool::commitWithPriority(const FunctionType& func, int priority) +-> std::future::type> { + using ResultType = typename std::result_of::type; + + std::packaged_task task(func); + std::future result(task.get_future()); + + if (secondary_threads_.empty()) { + createSecondaryThread(1); // 如果没有开启辅助线程,则直接开启一个 + } + + priority_task_queue_.push(std::move(task), priority); + input_task_num_++; + return result; +} + +CGRAPH_NAMESPACE_END + +#endif // CGRAPH_UTHREADPOOL_INL \ No newline at end of file diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h new file mode 100644 index 0000000..6afca86 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h @@ -0,0 +1,66 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UThreadPoolConfig.h +@Time: 2022/1/3 9:31 下午 +@Desc: 线程池配置信息 +***************************/ + +#ifndef CGRAPH_UTHREADPOOLCONFIG_H +#define CGRAPH_UTHREADPOOLCONFIG_H + +#include "UThreadObject.h" +#include "UThreadPoolDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +struct UThreadPoolConfig : public UThreadObject { + /** 具体值含义,参考UThreadPoolDefine.h文件 */ + int default_thread_size_ = CGRAPH_DEFAULT_THREAD_SIZE; + int max_thread_size_ = CGRAPH_MAX_THREAD_SIZE; + int max_task_steal_range_ = CGRAPH_MAX_TASK_STEAL_RANGE; + int max_local_batch_size_ = CGRAPH_MAX_LOCAL_BATCH_SIZE; + int max_pool_batch_size_ = CGRAPH_MAX_POOL_BATCH_SIZE; + int max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE; + int secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL; + int monitor_span_ = CGRAPH_MONITOR_SPAN; + int primary_thread_policy_ = CGRAPH_PRIMARY_THREAD_POLICY; + int secondary_thread_policy_ = CGRAPH_SECONDARY_THREAD_POLICY; + int primary_thread_priority_ = CGRAPH_PRIMARY_THREAD_PRIORITY; + int secondary_thread_priority_ = CGRAPH_SECONDARY_THREAD_PRIORITY; + bool bind_cpu_enable_ = CGRAPH_BIND_CPU_ENABLE; + bool batch_task_enable_ = CGRAPH_BATCH_TASK_ENABLE; + bool fair_lock_enable_ = CGRAPH_FAIR_LOCK_ENABLE; + bool monitor_enable_ = CGRAPH_MONITOR_ENABLE; + + +protected: + /** + * 计算可盗取的范围,盗取范围不能超过默认线程数-1 + * @return + */ + [[nodiscard]] int calcStealRange() const { + int range = std::min(this->max_task_steal_range_, this->default_thread_size_ - 1); + return range; + } + + + /** + * 计算是否开启批量任务 + * 开启条件:开关批量开启,并且 未开启非公平锁 + * @return + */ + [[nodiscard]] bool calcBatchTaskRatio() const { + bool ratio = (this->batch_task_enable_) && (!this->fair_lock_enable_); + return ratio; + } + + friend class UThreadPrimary; + friend class UThreadSecondary; +}; + +using UThreadPoolConfigPtr = UThreadPoolConfig *; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTHREADPOOLCONFIG_H diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h new file mode 100644 index 0000000..5126735 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h @@ -0,0 +1,78 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: ThreadPoolDefine.h +@Time: 2021/7/3 12:24 上午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADPOOLDEFINE_H +#define CGRAPH_UTHREADPOOLDEFINE_H + +#include + #if _LIBCPP_STD_VER >= 17 +#include + #else +# include + #endif +#include + +CGRAPH_NAMESPACE_BEGIN + + #if _LIBCPP_STD_VER >= 17 +using CGRAPH_READ_LOCK = std::shared_lock; +using CGRAPH_WRITE_LOCK = std::unique_lock; + #else +using CGRAPH_READ_LOCK = std::unique_lock; // C++14不支持读写锁,使用mutex替代 +using CGRAPH_WRITE_LOCK = std::unique_lock; + #endif + +using CGRAPH_LOCK_GUARD = std::lock_guard; +using CGRAPH_UNIQUE_LOCK = std::unique_lock; + +static const int CGRAPH_CPU_NUM = (int)std::thread::hardware_concurrency(); +static const int CGRAPH_THREAD_TYPE_PRIMARY = 1; +static const int CGRAPH_THREAD_TYPE_SECONDARY = 2; + #ifndef _WIN32 +static const int CGRAPH_THREAD_SCHED_OTHER = SCHED_OTHER; +static const int CGRAPH_THREAD_SCHED_RR = SCHED_RR; +static const int CGRAPH_THREAD_SCHED_FIFO = SCHED_FIFO; + #else +/** 线程调度策略,暂不支持windows系统 */ +static const int CGRAPH_THREAD_SCHED_OTHER = 0; +static const int CGRAPH_THREAD_SCHED_RR = 0; +static const int CGRAPH_THREAD_SCHED_FIFO = 0; + #endif +static const int CGRAPH_THREAD_MIN_PRIORITY = 0; // 线程最低优先级 +static const int CGRAPH_THREAD_MAX_PRIORITY = 99; // 线程最高优先级 +static const CMSec CGRAPH_MAX_BLOCK_TTL = 10000000; // 最大阻塞时间,单位为ms +static const int CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略 +static const int CGRAPH_LONG_TIME_TASK_STRATEGY = -101; // 长时间任务调度策略 + +/** + * 以下为线程池配置信息 + */ +static const int CGRAPH_DEFAULT_THREAD_SIZE = (CGRAPH_CPU_NUM > 0) ? CGRAPH_CPU_NUM : 8; // 默认主线程个数 +static const int CGRAPH_MAX_THREAD_SIZE = (CGRAPH_DEFAULT_THREAD_SIZE * 2) + 1; // 最大线程个数 + #ifndef _WIN32 +static const int CGRAPH_MAX_TASK_STEAL_RANGE = 2; // 盗取机制相邻范围 + #else +static const int CGRAPH_MAX_TASK_STEAL_RANGE = 0; // windows平台暂不支持任务盗取功能 + #endif +static const bool CGRAPH_BATCH_TASK_ENABLE = false; // 是否开启批量任务功能 +static const int CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值 +static const int CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值 +static const int CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值 +static const bool CGRAPH_FAIR_LOCK_ENABLE = false; // 是否开启公平锁(非必须场景不建议开启,开启后CGRAPH_BATCH_TASK_ENABLE无效) +static const int CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s +static const bool CGRAPH_MONITOR_ENABLE = true; // 是否开启监控程序 +static const int CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s +static const bool CGRAPH_BIND_CPU_ENABLE = true; // 是否开启绑定cpu模式(仅针对主线程) +static const int CGRAPH_PRIMARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 主线程调度策略 +static const int CGRAPH_SECONDARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 辅助线程调度策略 +static const int CGRAPH_PRIMARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 主线程调度优先级(取值范围0~99) +static const int CGRAPH_SECONDARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 辅助线程调度优先级(取值范围0~99) + +CGRAPH_NAMESPACE_END + +#endif // CGRAPH_UTHREADPOOLDEFINE_H diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h b/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h new file mode 100644 index 0000000..afc5429 --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h @@ -0,0 +1,20 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: ThreadPoolInclude.h +@Time: 2021/7/3 12:25 上午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTHREADPOOLINCLUDE_H +#define CGRAPH_UTHREADPOOLINCLUDE_H + +#include "UThreadObject.h" +#include "UThreadPool.h" +#include "UThreadPoolDefine.h" +#include "UThreadPoolConfig.h" +#include "Queue/UQueueInclude.h" +#include "Task/UTaskInclude.h" +#include "Thread/UThreadInclude.h" + +#endif //CGRAPH_UTHREADPOOLINCLUDE_H diff --git a/src/UtilsCtrl/UAllocator.h b/src/UtilsCtrl/UAllocator.h new file mode 100644 index 0000000..3ca20cf --- /dev/null +++ b/src/UtilsCtrl/UAllocator.h @@ -0,0 +1,87 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UAllocator.h +@Time: 2021/10/28 9:15 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UALLOCATOR_H +#define CGRAPH_UALLOCATOR_H + +#ifdef _GENERATE_SESSION_ + #include +#endif + +#include +#include + +CGRAPH_NAMESPACE_BEGIN + +static std::mutex g_session_mtx; + +/** + * 仅用于生成CObject类型的类 + */ +class UAllocator : public CObject { +public: + /** + * 生成普通指针信息 + * @tparam T + * @return + */ + template::value, int> = 0> + static T* safeMallocCObject() { + T* ptr = nullptr; + while (!ptr) { + ptr = new(std::nothrow) T(); + } + return ptr; + } + + + /** + * 生成unique智能指针信息 + * @tparam T + * @return + */ + template::value, int> = 0> + static std::unique_ptr makeUniqueCObject() { + return std::make_unique(); + } + + + /** + * 生成唯一标识信息 + * @return + */ + static std::string generateSession() { + #ifdef _GENERATE_SESSION_ + std::lock_guard lock{ g_session_mtx }; + uuid_t uuid; + char session[36] = {0}; // 36是特定值 + uuid_generate(uuid); + uuid_unparse(uuid, session); + + return session; + #else + return CGRAPH_EMPTY; // 非mac平台,暂时不支持自动生成session信息 + #endif + } +}; + + +#define CGRAPH_SAFE_MALLOC_COBJECT(Type) \ + UAllocator::safeMallocCObject(); \ + +#define CGRAPH_MAKE_UNIQUE_COBJECT(Type) \ + UAllocator::makeUniqueCObject(); \ + +#define CGRAPH_GENERATE_SESSION \ + UAllocator::generateSession(); \ + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UALLOCATOR_H diff --git a/src/UtilsCtrl/UtilsDefine.h b/src/UtilsCtrl/UtilsDefine.h new file mode 100644 index 0000000..0e2cec8 --- /dev/null +++ b/src/UtilsCtrl/UtilsDefine.h @@ -0,0 +1,75 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UtilsDefine.h +@Time: 2021/4/30 8:52 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTILSDEFINE_H +#define CGRAPH_UTILSDEFINE_H + +#include +#include + +#include "../CBasic/CBasicInclude.h" +#include "UAllocator.h" +#include "UtilsFunction.h" + +CGRAPH_NAMESPACE_BEGIN + +#ifdef _ENABLE_LIKELY_ + #define likely(x) __builtin_expect(!!(x), 1) + #define unlikely(x) __builtin_expect(!!(x), 0) +#else + #define likely + #define unlikely +#endif + +/* 判断传入的指针信息是否为空 */ +#define CGRAPH_ASSERT_NOT_NULL(ptr) \ + if (unlikely(nullptr == (ptr))) { \ + return CStatus("ptr is nullptr"); \ + } \ + +#define CGRAPH_ASSERT_NOT_NULL_RETURN_NULL(ptr) \ + if (unlikely(nullptr == (ptr))) { \ + return nullptr; \ + } \ + +/* 判断函数流程是否可以继续 */ +static std::mutex g_check_status_mtx; +#define CGRAPH_FUNCTION_CHECK_STATUS \ + if (unlikely(!status.isOK())) { \ + std::lock_guard lock{ g_check_status_mtx }; \ + CGRAPH_ECHO("%s | %s | line = [%d], errorCode = [%d], errorInfo = [%s].", \ + __FILE__, __FUNCTION__, __LINE__, status.getCode(), status.getInfo().c_str()); \ + return status; \ + } \ + +/* 删除资源信息 */ +#define CGRAPH_DELETE_PTR(ptr) \ + if (unlikely((ptr) != nullptr)) { \ + delete (ptr); \ + (ptr) = nullptr; \ + } \ + +#define CGRAPH_ASSERT_INIT(isInit) \ + if (unlikely((isInit) != is_init_)) { \ + return CStatus("init status is not suitable"); \ + } \ + +#define CGRAPH_ASSERT_INIT_RETURN_NULL(isInit) \ + if (unlikely((isInit) != is_init_)) { \ + return nullptr; \ + } \ + +#define CGRAPH_SLEEP_MILLISECOND(ms) \ + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); \ + +#define CGRAPH_SLEEP_SECOND(s) \ + std::this_thread::sleep_for(std::chrono::seconds(s)); \ + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTILSDEFINE_H diff --git a/src/UtilsCtrl/UtilsFunction.h b/src/UtilsCtrl/UtilsFunction.h new file mode 100644 index 0000000..207dbdb --- /dev/null +++ b/src/UtilsCtrl/UtilsFunction.h @@ -0,0 +1,131 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UtilsFunction.h +@Time: 2022/1/26 11:27 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTILSFUNCTION_H +#define CGRAPH_UTILSFUNCTION_H + +#include +#include +#include +#include +#include + +CGRAPH_NAMESPACE_BEGIN + +/** + * 定制化输出 + * @param cmd + * @param ... + * 注:内部包含全局锁,不建议正式上线的时候使用 + */ +static std::mutex g_echo_mtx; +inline CVoid CGRAPH_ECHO(const char *cmd, ...) { +#ifdef _CGRAPH_SILENCE_ + return; +#endif + + std::lock_guard lock{ g_echo_mtx }; +#ifndef _WIN32 + // 非windows系统,打印到毫秒 + auto now = std::chrono::system_clock::now(); + // 通过不同精度获取相差的毫秒数 + uint64_t disMs = std::chrono::duration_cast(now.time_since_epoch()).count() + - std::chrono::duration_cast(now.time_since_epoch()).count() * 1000; + time_t tt = std::chrono::system_clock::to_time_t(now); + auto localTime = localtime(&tt); + char strTime[32] = { 0 }; + sprintf(strTime, "[%04d-%02d-%02d %02d:%02d:%02d.%03d]", localTime->tm_year + 1900, + localTime->tm_mon + 1, localTime->tm_mday, localTime->tm_hour, + localTime->tm_min, localTime->tm_sec, (int)disMs); + std::cout << "[CGraph] " << strTime << " "; +#else + // windows系统,打印到秒 + time_t curTime; + time(&curTime); + std::string ct = ctime(&curTime); + std::cout << "[CGraph] [" + << ct.assign(ct.begin(), ct.end()-1) // 去掉时间的最后一位\n信息 + << "] "; +#endif + + va_list args; + va_start(args, cmd); + vprintf(cmd, args); + va_end(args); + std::cout << "\n"; +} + + +/** + * 通用容器累加信息 + * @tparam T (例:std::vector) + * @param container + * @return + */ +template +typename T::value_type CGRAPH_CONTAINER_SUM(const T& container) { + typename T::value_type result = 0; + for (const auto& val : container) { + result += val; + } + return result; +} + + +/** + * 通用容器累乘信息 + * @tparam T (例:std::vector) + * @param container + * @return + */ +template +typename T::value_type CGRAPH_CONTAINER_MULTIPLY(const T& container) { + typename T::value_type result = 1; + for (const auto& val : container) { + result *= val; + } + return result; +} + + +/** + * 获取max值 + * @tparam T + * @param value + * @return + */ +template +T CGRAPH_MAX(T val) { + return val; +} + +template +T CGRAPH_MAX(T val, Args... args) { + return std::max(val, CGRAPH_MAX(args...)); +} + + +/** + * 累加 + * @tparam T + * @param t + * @return + */ +template +T CGRAPH_SUM(T t) { + return t; +} + +template +T CGRAPH_SUM(T val, Args... args) { + return val + CGRAPH_SUM(args...); +} + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTILSFUNCTION_H diff --git a/src/UtilsCtrl/UtilsInclude.h b/src/UtilsCtrl/UtilsInclude.h new file mode 100644 index 0000000..0a31c2b --- /dev/null +++ b/src/UtilsCtrl/UtilsInclude.h @@ -0,0 +1,16 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UtilsInclude.h +@Time: 2021/4/30 9:14 下午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTILSINCLUDE_H +#define CGRAPH_UTILSINCLUDE_H + +#include "UtilsDefine.h" +#include "UAllocator.h" +#include "ThreadPool/UThreadPoolInclude.h" + +#endif //CGRAPH_UTILSINCLUDE_H diff --git a/src/UtilsCtrl/UtilsObject.h b/src/UtilsCtrl/UtilsObject.h new file mode 100644 index 0000000..61e8750 --- /dev/null +++ b/src/UtilsCtrl/UtilsObject.h @@ -0,0 +1,26 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UtilsObject.h +@Time: 2021/9/19 12:00 上午 +@Desc: +***************************/ + +#ifndef CGRAPH_UTILSOBJECT_H +#define CGRAPH_UTILSOBJECT_H + +#include "UtilsDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +class UtilsObject : public CObject { + +public: + CStatus run() override { + CGRAPH_NO_SUPPORT + } +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UTILSOBJECT_H diff --git a/tutorial.cpp b/tutorial.cpp new file mode 100644 index 0000000..e1f9dbd --- /dev/null +++ b/tutorial.cpp @@ -0,0 +1,136 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: main.cpp +@Time: 2021/9/2 11:12 下午 +@Desc: 本例主要演示,threadpool工具的使用方法 +***************************/ + +#include "src/CThreadPool.h" + +#include "MyFunction.h" + +using namespace CTP; + +void tutorial_threadpool_1(UThreadPoolPtr tp) { + /** + * 依次向线程池中传入: + * 1、普通函数 + * 2、静态函数 + * 3、类成员函数 + * 4、类成员静态函数 + */ + int i = 6, j = 3; + std::string str = "5"; + MyFunction mf; + + auto r1 = tp->commit([i, j] { return add(i, j); }); // 可以通过lambda表达式传递函数 + std::future r2 = tp->commit(std::bind(minusBy5, 8.5f)); // 可以传入任意个数的入参 + auto r3 = tp->commit(std::bind(&MyFunction::concat, mf, str)); // 返回值可以是任意类型 + std::future r4 = tp->commit([i, j] { return MyFunction::multiply(i, j); }); // 返回值实际上是std::future类型 + + std::cout << r1.get() << std::endl; // 返回值可以是int类型 + std::cout << r2.get() << std::endl; // 等待r2对应函数执行完毕后,再继续执行。不调用get()为不等待 + std::cout << r3.get() << std::endl; // 返回值也可是string或其他任意类型 + std::cout << r4.get() << std::endl; +} + + +void tutorial_threadpool_2(UThreadPoolPtr tp) { + /** + * 通过添加工作组(taskGroup)来执行任务 + */ + UTaskGroup taskGroup; + + /** 添加一个不耗时的任务 */ + int i = 1, j = 2, k = 3; + auto hcg = [] { CGRAPH_ECHO("Hello, CGraph."); }; + taskGroup.addTask(hcg); + + /** 添加一个耗时为1000ms的任务 */ + taskGroup.addTask([i, j] { + int result = i + j; + CGRAPH_SLEEP_MILLISECOND(1000) + CGRAPH_ECHO("sleep for 1 second, [%d] + [%d] = [%d], run success.", i, j, result); + }); + + taskGroup.addTask([i, j, k] { + int result = i - j + k; + CGRAPH_SLEEP_MILLISECOND(2000) + CGRAPH_ECHO("sleep for 2 second, [%d] - [%d] + [%d] = [%d], run success.", i, j, k, result); + return result; // submit接口,不会对线程函数返回值进行判断。如果需要判断,考虑commit方式 + }); + + /** 如果添加耗时3000ms的任务,则整体执行失败 */ + /* taskGroup.addTask([] { + CGRAPH_SLEEP_MILLISECOND(3000) + }); */ + + /** + * 可以添加执行task group结束后的回调信息 + * 其中sts是task group整体执行结果的返回值信息 + * */ + /* taskGroup.setOnFinished([] (const CStatus& sts) { + if(sts.isOK()) { + CGRAPH_ECHO("task group run success."); + } else { + CGRAPH_ECHO("task group run failed, error info is [%s].", sts.getInfo().c_str()); + } + }); */ + + /** + * 设定超时时间=2500ms,确保以上任务能顺利执行完成 + * 如果加入sleep(3000)的任务,则也会在2500ms的时候退出 + * 并且在status中提示超时信息 + * */ + CStatus status = tp->submit(taskGroup, 2500); + CGRAPH_ECHO("task group run status is [%d].", status.getCode()); +} + + +void tutorial_threadpool_3(UThreadPoolPtr tp) { + /** + * 并发打印0~100之间的数字 + * 使用commit和submit函数的区别,主要在于: + * 1,commit()属于非阻塞执行,是将线程函数执行的结果以future的类型返回,交由上层处理 + * 2,submit()属于阻塞顺序执行,是在内部处理好超时等信息并作为结果返回,抛弃线程函数自身返回值 + * 3,不需要线程函数返回值,并且不需要判断超时信息的场景,两者无区别(如下例) + */ + const int size = 100; + CGRAPH_ECHO("thread pool task submit version : "); + for (int i = 0; i < size; i++) { + tp->submit([i] { std::cout << i << " "; }); // 可以看到,submit版本是有序执行的。如果需要想要无序执行,可以通过创建taskGroup的方式进行,或者使用commit方法 + } + CGRAPH_SLEEP_SECOND(1) // 等待上面函数执行完毕,以便于观察结果。无实际意义 + std::cout << "\r\n"; + + CGRAPH_ECHO("thread pool task group submit version : "); + UTaskGroup taskGroup; + for (int i = 0; i < size; i++) { + taskGroup.addTask([i] { std::cout << i << " "; }); // 将任务放到一个taskGroup中,并发执行。执行的结果是无序的 + } + tp->submit(taskGroup); + CGRAPH_SLEEP_SECOND(1) + std::cout << "\r\n"; + + CGRAPH_ECHO("thread pool task commit version : "); + for (int i = 0; i < size; i++) { + tp->commit([i] { std::cout << i << " "; }); // commit版本,是无序执行的 + } + CGRAPH_SLEEP_SECOND(1) + std::cout << "\r\n"; +} + + +int main() { + auto pool = std::make_unique(); // 构造一个线程池类的智能指针 + CGRAPH_ECHO("======== tutorial_threadpool_1 begin. ========"); + tutorial_threadpool_1(pool.get()); + + CGRAPH_ECHO("======== tutorial_threadpool_2 begin. ========"); + tutorial_threadpool_2(pool.get()); + + CGRAPH_ECHO("======== tutorial_threadpool_3 begin. ========"); + tutorial_threadpool_3(pool.get()); + return 0; +}