diff --git a/CMakeLists.txt b/CMakeLists.txt index 28a3c7c..91adcf1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ message("* * * * * * * * * * * * * * * * *") cmake_minimum_required(VERSION 3.2.5) -project(CThreadPool VERSION 1.2.0) +project(CThreadPool VERSION 1.2.1) set(CMAKE_CXX_STANDARD 11) diff --git a/README.md b/README.md index b7e0f43..597fdee 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,9 @@ int main() { [2023.03.07 - v1.2.0 - Chunel] * 优化windows版本功能 +[2023.10.07 - v1.2.1 - Chunel] +* 更新执行策略,优化整体性能 + ------------ #### 附录-2. 联系方式 * 微信: ChunelFeng diff --git a/src/CBasic/CBasicInclude.h b/src/CBasic/CBasicInclude.h index 517c195..d43da3f 100644 --- a/src/CBasic/CBasicInclude.h +++ b/src/CBasic/CBasicInclude.h @@ -17,5 +17,7 @@ #include "CBasicDefine.h" #include "CStrDefine.h" #include "CStdEx.h" +#include "CDescInfo.h" +#include "CStruct.h" #endif //CGRAPH_CBASICINCLUDE_H diff --git a/src/CBasic/CDescInfo.h b/src/CBasic/CDescInfo.h new file mode 100644 index 0000000..9a2181b --- /dev/null +++ b/src/CBasic/CDescInfo.h @@ -0,0 +1,76 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CDescInfo.h +@Time: 2023/2/19 15:56 +@Desc: 通用描述信息 +***************************/ + +#ifndef CGRAPH_CDESCINFO_H +#define CGRAPH_CDESCINFO_H + +#include + +#include "CBasicDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +class CDescInfo { +public: + /** + * 获取名称信息 + * @return + */ + const std::string& getName() const { + return name_; + } + + /** + * 获取唯一id信息 + * @return + */ + const std::string& getSession() const { + return session_; + } + + /** + * 获取描述信息 + * @return + */ + const std::string& getDescription() const { + return description_; + } + + /** + * 设置名称信息 + * @param name + * @return + */ + virtual auto setName(const std::string& name) + -> decltype(this) { + name_ = name; + return this; + } + + /** + * 设置描述信息 + * @param description + * @return + */ + virtual auto setDescription(const std::string& description) + -> decltype(this) { + description_ = description; + return this; + } + + virtual ~CDescInfo() = default; + +protected: + std::string name_; // 名字 + std::string session_; // 唯一id信息 + std::string description_; // 描述信息 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CDESCINFO_H diff --git a/src/CBasic/CException.h b/src/CBasic/CException.h index 611034d..90335f4 100644 --- a/src/CBasic/CException.h +++ b/src/CBasic/CException.h @@ -18,20 +18,25 @@ CGRAPH_NAMESPACE_BEGIN class CEXCEPTION : public std::exception { public: - explicit CEXCEPTION(const std::string& info = CGRAPH_EMPTY) { - info_ = info.empty() ? CGRAPH_BASIC_EXCEPTION : info; + explicit CEXCEPTION(const std::string& info, + const std::string& locate = CGRAPH_EMPTY) { + /** + * 这里的设计,和CStatus有一个联动 + * 如果不了解具体情况,不建议做任何修改 + */ + exception_info_ = locate + " | " + info; } /** * 获取异常信息 * @return */ - [[nodiscard]] const char* what() const noexcept override { - return info_.c_str(); + const char* what() const noexcept override { + return exception_info_.c_str(); } private: - std::string info_; // 异常状态信息 + std::string exception_info_; // 异常状态信息 }; CGRAPH_NAMESPACE_END diff --git a/src/CBasic/CFuncType.h b/src/CBasic/CFuncType.h index 4e0732b..4e044f9 100644 --- a/src/CBasic/CFuncType.h +++ b/src/CBasic/CFuncType.h @@ -45,13 +45,30 @@ enum class CFunctionType { #define CGRAPH_EMPTY_FUNCTION \ return CStatus(); \ -/** 不支持当前功能 */ -#define CGRAPH_NO_SUPPORT \ - return CStatus(CGRAPH_FUNCTION_NO_SUPPORT); \ + +/** 获取当前代码所在的位置信息 */ +#define CGRAPH_GET_LOCATE \ + (std::string(__FILE__) + " | " + std::string(__FUNCTION__) \ + + " | line = [" + ::std::to_string( __LINE__) + "]") + + +/** 生成一个包含异常位置的 CStatus + * 这里这样实现,是为了符合 CStatus 类似写法 + * */ +#define CErrStatus(info) \ + CStatus(info, CGRAPH_GET_LOCATE) \ /** 返回异常信息和状态 */ #define CGRAPH_RETURN_ERROR_STATUS(info) \ - return CStatus(info); \ + return CErrStatus(info); \ + +/** 根据条件判断是否返回错误状态 */ +#define CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(cond, info) \ + if (unlikely(cond)) { CGRAPH_RETURN_ERROR_STATUS(info); } \ + +/** 不支持当前功能 */ +#define CGRAPH_NO_SUPPORT \ + return CErrStatus(CGRAPH_FUNCTION_NO_SUPPORT); \ /** 定义为不能赋值和拷贝的对象类型 */ #define CGRAPH_NO_ALLOWED_COPY(CType) \ @@ -60,7 +77,18 @@ enum class CFunctionType { /** 抛出异常 */ #define CGRAPH_THROW_EXCEPTION(info) \ - throw CException(info); \ + throw CException(info, CGRAPH_GET_LOCATE); \ + +/** 在异常状态的情况下,抛出异常 */ +#define CGRAPH_THROW_EXCEPTION_BY_STATUS(status) \ + if (unlikely((status).isErr())) { \ + CGRAPH_THROW_EXCEPTION((status).getInfo()); \ + } \ + +/** 根据条件判断是否抛出异常 */ +#define CGRAPH_THROW_EXCEPTION_BY_CONDITION(cond, info) \ + if (unlikely(cond)) { CGRAPH_THROW_EXCEPTION(info); } \ + CGRAPH_NAMESPACE_END diff --git a/src/CBasic/CStatus.h b/src/CBasic/CStatus.h index 731e345..227289b 100644 --- a/src/CBasic/CStatus.h +++ b/src/CBasic/CStatus.h @@ -19,86 +19,89 @@ CGRAPH_NAMESPACE_BEGIN /** * 说明: * 返回值为0,表示正常逻辑 - * 返回值为正整数,表示warning逻辑,程序仍会继续执行 * 返回值为负整数,表示error逻辑,程序终止执行 * 自定义返回值,请务必遵守以上约定 */ static const int STATUS_OK = 0; /** 正常流程返回值 */ static const int STATUS_ERR = -1; /** 异常流程返回值 */ +static const int STATUS_CRASH = -996; /** 异常流程返回值 */ static const char* STATUS_ERROR_INFO_CONNECTOR = " && "; /** 多异常信息连接符号 */ class CSTATUS { public: explicit CSTATUS() = default; - explicit CSTATUS(const std::string &errorInfo) { + explicit CSTATUS(const std::string &errorInfo, + const std::string &locateInfo = CGRAPH_EMPTY) { this->error_code_ = STATUS_ERR; // 默认的error code信息 this->error_info_ = errorInfo; + this->error_locate_ = locateInfo; } - explicit CSTATUS(int errorCode, const std::string &errorInfo) { + explicit CSTATUS(int errorCode, const std::string &errorInfo, + const std::string &locateInfo = CGRAPH_EMPTY) { this->error_code_ = errorCode; this->error_info_ = errorInfo; + this->error_locate_ = locateInfo; } CSTATUS(const CSTATUS &status) { this->error_code_ = status.error_code_; this->error_info_ = status.error_info_; + this->error_locate_ = status.error_locate_; } CSTATUS(const CSTATUS &&status) noexcept { this->error_code_ = status.error_code_; this->error_info_ = status.error_info_; + this->error_locate_ = status.error_locate_; } CSTATUS& operator=(const CSTATUS& status) = default; CSTATUS& operator+=(const CSTATUS& cur) { - if (this->isOK() && cur.isOK()) { - return (*this); + /** + * 如果当前状态已经异常,则不做改动 + * 如果当前状态正常,并且传入的状态是异常的话,则返回异常 + */ + if (!this->isErr() && cur.isErr()) { + this->error_code_ = cur.error_code_; + this->error_info_ = cur.error_info_; + this->error_locate_ = cur.error_locate_; } - error_info_ = this->isOK() - ? cur.error_info_ - : (cur.isOK() - ? error_info_ - : (error_info_ + STATUS_ERROR_INFO_CONNECTOR + cur.error_info_)); - error_code_ = STATUS_ERR; - return (*this); } - void setStatus(const std::string& info) { - error_code_ = STATUS_ERR; - error_info_ = info; - } - - void setStatus(int code, const std::string& info) { - error_code_ = code; - error_info_ = info; - } - - [[nodiscard]] int getCode() const { + /** + * 获取异常值信息 + * @return + */ + int getCode() const { return this->error_code_; } - [[nodiscard]] const std::string& getInfo() const { + /** + * 获取异常信息 + * @return + */ + const std::string& getInfo() const { return this->error_info_; } /** - * 恢复数据 + * 获取报错位置 + * @return */ - void reset() { - error_code_ = STATUS_OK; - error_info_ = CGRAPH_EMPTY; + const std::string& getLocate() const { + return this->error_locate_; } /** * 判断当前状态是否可行 * @return */ - [[nodiscard]] bool isOK() const { + bool isOK() const { return STATUS_OK == error_code_; } @@ -106,29 +109,22 @@ public: * 判断当前状态是否可行 * @return */ - [[nodiscard]] bool isErr() const { + bool isErr() const { return error_code_ < STATUS_OK; // 约定异常信息,均为负值 } /** - * 判断当前状态是否有异常 - * @return - */ - [[nodiscard]] bool isNotErr() const { - return error_code_ >= STATUS_OK; - } - - /** - * 判断当前状态,不是ok的(包含error 和 warning) + * 判断当前状态是否是崩溃了 * @return */ - [[nodiscard]] bool isNotOK() const { - return error_code_ != STATUS_OK; + bool isCrash() const { + return STATUS_CRASH == error_code_; } private: - int error_code_ { STATUS_OK }; // 错误码信息 + int error_code_ = STATUS_OK; // 错误码信息 std::string error_info_; // 错误信息描述 + std::string error_locate_; // 错误发生的具体位置,形如:file|function|line }; CGRAPH_NAMESPACE_END diff --git a/src/CBasic/CStrDefine.h b/src/CBasic/CStrDefine.h index 9b66209..a06abed 100644 --- a/src/CBasic/CStrDefine.h +++ b/src/CBasic/CStrDefine.h @@ -9,15 +9,18 @@ #ifndef CGRAPH_CSTRDEFINE_H #define CGRAPH_CSTRDEFINE_H +#include + #include "CBasicDefine.h" CGRAPH_NAMESPACE_BEGIN -static const char* CGRAPH_EMPTY = ""; -static const char* CGRAPH_DEFAULT = "default"; -static const char* CGRAPH_UNKNOWN = "unknown"; -static const char* CGRAPH_BASIC_EXCEPTION = "CGraph default exception"; -static const char* CGRAPH_FUNCTION_NO_SUPPORT = "CGraph function no support"; +static const std::string& CGRAPH_EMPTY = ""; +static const std::string& CGRAPH_DEFAULT = "default"; +static const std::string& CGRAPH_UNKNOWN = "unknown"; +static const std::string& CGRAPH_BASIC_EXCEPTION = "CGraph default exception"; +static const std::string& CGRAPH_FUNCTION_NO_SUPPORT = "CGraph function no support"; +static const std::string& CGRAPH_INPUT_IS_NULL = "input is nullptr"; CGRAPH_NAMESPACE_END diff --git a/src/CBasic/CStruct.h b/src/CBasic/CStruct.h new file mode 100644 index 0000000..65e8ea4 --- /dev/null +++ b/src/CBasic/CStruct.h @@ -0,0 +1,25 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: CStruct.h +@Time: 2023/7/16 11:36 +@Desc: +***************************/ + +#ifndef CGRAPH_CSTRUCT_H +#define CGRAPH_CSTRUCT_H + +#include "CBasicDefine.h" + +CGRAPH_NAMESPACE_BEGIN + +/** + * 所有框架内部结构体定义的基类 + * 仅针对类似 bean 数据类型的定义 + */ +class CStruct { +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_CSTRUCT_H diff --git a/src/CBasic/CValType.h b/src/CBasic/CValType.h index 9b935b9..9501c29 100644 --- a/src/CBasic/CValType.h +++ b/src/CBasic/CValType.h @@ -9,27 +9,30 @@ #ifndef CGRAPH_CVALTYPE_H #define CGRAPH_CVALTYPE_H +#include "CBasicDefine.h" #include "CStatus.h" #include "CException.h" using CChar = CTP::CCHAR; using CUint = CTP::CUINT; -using CSec = CTP::CUINT; // 表示秒信息, for second -using CMSec = CTP::CUINT; // 表示毫秒信息, for millisecond using CSize = CTP::CSIZE; using CVoid = CTP::CVOID; using CVoidPtr = CTP::CVOID *; using CInt = CTP::CINT; -using CLevel = CTP::CINT; using CLong = CTP::CLONG; using CULong = CTP::CULONG; using CBool = CTP::CBOOL; -using CIndex = CTP::CINT; // 表示标识信息,可以为负数 +using CIndex = CTP::CINT; // 表示标识信息,可以为负数 using CFloat = CTP::CFLOAT; using CDouble = CTP::CDOUBLE; -using CConStr = CTP::CCONSTR; // 表示 const char* +using CConStr = CTP::CCONSTR; // 表示 const char* using CBigBool = CTP::CBIGBOOL; +using CLevel = CTP::CINT; +using CSec = CTP::CLONG; // 表示秒信息, for second +using CMSec = CTP::CLONG; // 表示毫秒信息, for millisecond +using CFMSec = CTP::CDOUBLE; // 表示毫秒信息,包含小数点信息 + using CStatus = CTP::CSTATUS; using CException = CTP::CEXCEPTION; diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h index b0d4999..552f4f5 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h @@ -26,13 +26,17 @@ public: * @return */ CBool tryPop(T& value) { - CGRAPH_LOCK_GUARD lk(mutex_); - if (priority_queue_.empty()) { - return false; + CBool result = false; + if (mutex_.try_lock()) { + if (!priority_queue_.empty()) { + value = std::move(*priority_queue_.top()); + priority_queue_.pop(); + result = true; + } + mutex_.unlock(); } - value = std::move(*priority_queue_.top()); - priority_queue_.pop(); - return true; + + return result; } @@ -43,17 +47,17 @@ public: * @return */ CBool tryPop(std::vector& values, int maxPoolBatchSize) { - CGRAPH_LOCK_GUARD lk(mutex_); - if (priority_queue_.empty() || maxPoolBatchSize <= 0) { - return false; - } - - while (!priority_queue_.empty() && maxPoolBatchSize--) { - values.emplace_back(std::move(*priority_queue_.top())); - priority_queue_.pop(); + CBool result = false; + if (mutex_.try_lock()) { + while (!priority_queue_.empty() && maxPoolBatchSize-- > 0) { + values.emplace_back(std::move(*priority_queue_.top())); + priority_queue_.pop(); + result = true; + } + mutex_.unlock(); } - return true; + return result; } @@ -74,7 +78,7 @@ public: * 判定队列是否为空 * @return */ - [[nodiscard]] CBool empty() { + CBool empty() { CGRAPH_LOCK_GUARD lk(mutex_); return priority_queue_.empty(); } diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h index e55ab66..a018a0f 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h @@ -42,13 +42,17 @@ public: * @return */ CBool tryPop(T& value) { - CGRAPH_LOCK_GUARD lk(mutex_); - if (queue_.empty()) { - return false; + CBool result = false; + if (mutex_.try_lock()) { + if (!queue_.empty()) { + value = std::move(*queue_.front()); + queue_.pop(); + result = true; + } + mutex_.unlock(); } - value = std::move(*queue_.front()); - queue_.pop(); - return true; + + return result; } @@ -59,17 +63,17 @@ public: * @return */ CBool tryPop(std::vector& values, int maxPoolBatchSize) { - CGRAPH_LOCK_GUARD lk(mutex_); - if (queue_.empty() || maxPoolBatchSize <= 0) { - return false; - } - - while (!queue_.empty() && maxPoolBatchSize--) { - values.emplace_back(std::move(*queue_.front())); - queue_.pop(); + CBool result = false; + if (mutex_.try_lock()) { + while (!queue_.empty() && maxPoolBatchSize-- > 0) { + values.emplace_back(std::move(*queue_.front())); + queue_.pop(); + result = true; + } + mutex_.unlock(); } - return true; + return result; } @@ -77,11 +81,14 @@ public: * 阻塞式等待弹出 * @return */ - std::unique_ptr waitPop() { + std::unique_ptr popWithTimeout(CMSec ms) { CGRAPH_UNIQUE_LOCK lk(mutex_); - cv_.wait(lk, [this] { return !queue_.empty(); }); + if (!cv_.wait_for(lk, std::chrono::milliseconds(ms), [this] { return !queue_.empty(); })) { + return nullptr; + } + std::unique_ptr result = std::move(queue_.front()); - queue_.pop(); + queue_.pop(); // 如果等成功了,则弹出一个信息 return result; } @@ -104,9 +111,17 @@ public: * @param value */ CVoid push(T&& value) { - std::unique_ptr task(c_make_unique(std::move(value))); - CGRAPH_LOCK_GUARD lk(mutex_); - queue_.push(std::move(task)); + std::unique_ptr::type> \ + task(c_make_unique::type>(std::forward(value))); + while (true) { + if (mutex_.try_lock()) { + queue_.push(std::move(task)); + mutex_.unlock(); + break; + } else { + std::this_thread::yield(); + } + } cv_.notify_one(); } @@ -115,7 +130,7 @@ public: * 判定队列是否为空 * @return */ - [[nodiscard]] CBool empty() { + CBool empty() { CGRAPH_LOCK_GUARD lk(mutex_); return queue_.empty(); } diff --git a/src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h index e30a2db..7b3ab3b 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h @@ -11,6 +11,7 @@ #include #include +#include #include "UQueueObject.h" @@ -46,21 +47,32 @@ public: * 获取容量信息 * @return */ - [[nodiscard]] CUint getCapacity() const { + CUint getCapacity() const { return capacity_; } /** * 写入信息 + * @tparam TImpl * @param value + * @param strategy * @return */ template - CVoid push(const TImpl& value) { + CVoid push(const TImpl& value, URingBufferPushStrategy strategy) { { CGRAPH_UNIQUE_LOCK lk(mutex_); if (isFull()) { - push_cv_.wait(lk, [this] { return !isFull(); }); + switch (strategy) { + case URingBufferPushStrategy::WAIT: + push_cv_.wait(lk, [this] { return !isFull(); }); + break; + case URingBufferPushStrategy::REPLACE: + head_ = (head_ + 1) % capacity_; + break; + case URingBufferPushStrategy::DROP: + return; // 直接返回,不写入即可 + } } ring_buffer_queue_[tail_] = std::move(c_make_unique(value)); @@ -70,23 +82,88 @@ public: } /** + * 写入智能指针类型的信息 + * @tparam TImpl + * @param value + * @param strategy + * @return + */ + template + CVoid push(std::unique_ptr& value, URingBufferPushStrategy strategy) { + { + CGRAPH_UNIQUE_LOCK lk(mutex_); + if (isFull()) { + switch (strategy) { + case URingBufferPushStrategy::WAIT: + push_cv_.wait(lk, [this] { return !isFull(); }); + break; + case URingBufferPushStrategy::REPLACE: + head_ = (head_ + 1) % capacity_; + break; + case URingBufferPushStrategy::DROP: + return; // 直接返回,不写入即可 + } + } + + ring_buffer_queue_[tail_] = std::move(value); + tail_ = (tail_ + 1) % capacity_; + } + pop_cv_.notify_one(); + } + + /** * 等待弹出信息 * @param value + * @param timeout * @return */ template - CVoid waitPop(TImpl& value) { + CStatus waitPopWithTimeout(TImpl& value, CMSec timeout) { + CGRAPH_FUNCTION_BEGIN { CGRAPH_UNIQUE_LOCK lk(mutex_); - if (isEmpty()) { - pop_cv_.wait(lk, [this] { return !isEmpty(); }); + if (isEmpty() + && !pop_cv_.wait_for(lk, std::chrono::milliseconds(timeout), + [this] { return !isEmpty(); })) { + // 如果timeout的时间内,等不到消息,则返回错误信息 + CGRAPH_RETURN_ERROR_STATUS("receive message timeout.") } - value = (*ring_buffer_queue_[head_]); - *ring_buffer_queue_[head_] = {}; + value = *ring_buffer_queue_[head_]; // 这里直接进行值copy head_ = (head_ + 1) % capacity_; } push_cv_.notify_one(); + CGRAPH_FUNCTION_END + } + + /** + * 等待弹出信息。ps:当入参为智能指针的情况 + * @tparam TImpl + * @param value + * @param timeout + * @return + */ + template + CStatus waitPopWithTimeout(std::unique_ptr& value, CMSec timeout) { + CGRAPH_FUNCTION_BEGIN + { + CGRAPH_UNIQUE_LOCK lk(mutex_); + if (isEmpty() + && !pop_cv_.wait_for(lk, std::chrono::milliseconds(timeout), + [this] { return !isEmpty(); })) { + // 如果timeout的时间内,等不到消息,则返回错误信息 + CGRAPH_RETURN_ERROR_STATUS("receive message timeout.") + } + + /** + * 当传入的内容,是智能指针的时候, + * 这里就直接通过 move转移过去好了,跟直接传值的方式,保持区别 + */ + value = std::move(ring_buffer_queue_[head_]); + head_ = (head_ + 1) % capacity_; + } + push_cv_.notify_one(); + CGRAPH_FUNCTION_END } /** @@ -102,11 +179,19 @@ public: } protected: + /** + * 当前队列是否为满 + * @return + */ CBool isFull() { // 空出来一个位置,这个时候不让 tail写入 return head_ == (tail_ + 1) % capacity_; } + /** + * 当前队列是否为空 + * @return + */ CBool isEmpty() { return head_ == tail_; } diff --git a/src/UtilsCtrl/ThreadPool/Queue/UQueueDefine.h b/src/UtilsCtrl/ThreadPool/Queue/UQueueDefine.h new file mode 100644 index 0000000..fb0af5e --- /dev/null +++ b/src/UtilsCtrl/ThreadPool/Queue/UQueueDefine.h @@ -0,0 +1,23 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: UQueueDefine.h +@Time: 2023/9/15 21:31 +@Desc: +***************************/ + +#ifndef CGRAPH_UQUEUEDEFINE_H +#define CGRAPH_UQUEUEDEFINE_H + +CGRAPH_NAMESPACE_BEGIN + +/** 当环形队列满的时候,写入信息时候的策略 */ +enum class URingBufferPushStrategy { + WAIT = 1, // 等待有数据被消费后,再写入 + REPLACE = 2, // 替换未被消费的最早进入的内容 + DROP = 3, // 丢弃当前信息 +}; + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_UQUEUEDEFINE_H diff --git a/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h b/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h index 75da626..4ae5417 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h @@ -12,6 +12,7 @@ #include #include "../UThreadObject.h" +#include "UQueueDefine.h" CGRAPH_NAMESPACE_BEGIN diff --git a/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h b/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h index f2d862b..48d4fac 100644 --- a/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h +++ b/src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h @@ -10,7 +10,6 @@ #ifndef CGRAPH_UWORKSTEALINGQUEUE_H #define CGRAPH_UWORKSTEALINGQUEUE_H -#include #include #include "UQueueObject.h" @@ -27,7 +26,7 @@ public: */ CVoid push(UTask&& task) { while (true) { - if (lock_.tryLock()) { + if (lock_.try_lock()) { deque_.emplace_front(std::move(task)); lock_.unlock(); break; @@ -39,6 +38,22 @@ public: /** + * 尝试往队列里写入信息 + * @param task + * @return + */ + CBool tryPush(UTask&& task) { + CBool result = false; + if (lock_.try_lock()) { + deque_.emplace_back(std::move(task)); + lock_.unlock(); + result = true; + } + return result; + } + + + /** * 弹出节点,从头部进行 * @param task * @return @@ -46,7 +61,7 @@ public: CBool tryPop(UTask& task) { // 这里不使用raii锁,主要是考虑到多线程的情况下,可能会重复进入 bool result = false; - if (lock_.tryLock()) { + if (lock_.try_lock()) { if (!deque_.empty()) { task = std::move(deque_.front()); // 从前方弹出 deque_.pop_front(); @@ -65,10 +80,9 @@ public: * @param maxLocalBatchSize * @return */ - CBool tryPop(UTaskArrRef taskArr, - int maxLocalBatchSize) { + CBool tryPop(UTaskArrRef taskArr, int maxLocalBatchSize) { bool result = false; - if (lock_.tryLock()) { + if (lock_.try_lock()) { while (!deque_.empty() && maxLocalBatchSize--) { taskArr.emplace_back(std::move(deque_.front())); deque_.pop_front(); @@ -88,7 +102,7 @@ public: */ CBool trySteal(UTask& task) { bool result = false; - if (lock_.tryLock()) { + if (lock_.try_lock()) { if (!deque_.empty()) { task = std::move(deque_.back()); // 从后方窃取 deque_.pop_back(); @@ -108,7 +122,7 @@ public: */ CBool trySteal(UTaskArrRef taskArr, int maxStealBatchSize) { bool result = false; - if (lock_.tryLock()) { + if (lock_.try_lock()) { while (!deque_.empty() && maxStealBatchSize--) { taskArr.emplace_back(std::move(deque_.back())); deque_.pop_back(); @@ -126,7 +140,7 @@ public: private: std::deque deque_; // 存放任务的双向队列 - USpinLock lock_; // 用自旋锁处理 + std::mutex lock_; // 用于处理deque_的锁 }; CGRAPH_NAMESPACE_END diff --git a/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h b/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h index 2663ae1..5c1e098 100644 --- a/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h +++ b/src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h @@ -66,7 +66,7 @@ public: * 获取最大超时时间信息 * @return */ - [[nodiscard]] CMSec getTtl() const { + CMSec getTtl() const { return this->ttl_; } @@ -81,7 +81,7 @@ public: * 获取任务组大小 * @return */ - [[nodiscard]] CSize getSize() const { + CSize getSize() const { auto size = task_arr_.size(); return size; } diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h index 939442b..e34c049 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h @@ -3,7 +3,7 @@ @Contact: chunel@foxmail.com @File: UThreadBase.h @Time: 2021/7/2 11:24 下午 -@Desc: +@Desc: ***************************/ #ifndef CGRAPH_UTHREADBASE_H @@ -118,6 +118,40 @@ protected: total_task_num_ = 0; } + /** + * 执行单个消息 + * @return + */ + virtual CVoid processTask() = 0; + + + /** + * 获取批量执行task信息 + */ + virtual CVoid processTasks() = 0; + + + /** + * 循环处理任务 + * @return + */ + CStatus loopProcess() { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_NOT_NULL(config_) + + if (config_->batch_task_enable_) { + while (done_) { + processTasks(); // 批量任务获取执行接口 + } + } else { + while (done_) { + processTask(); // 单个任务获取执行接口 + } + } + + CGRAPH_FUNCTION_END + } + /** * 设置线程优先级,仅针对非windows平台使用 @@ -138,7 +172,7 @@ protected: sched_param param = { calcPriority(priority) }; int ret = pthread_setschedparam(handle, calcPolicy(policy), ¶m); if (0 != ret) { - CGRAPH_ECHO("warning : set thread sched param failed, error code is [%d]", ret); + CGRAPH_ECHO("warning : set thread sched param failed, system error code is [%d]", ret); } #endif } @@ -147,7 +181,7 @@ protected: * 设置线程亲和性,仅针对linux系统 */ CVoid setAffinity(int index) { -#ifdef __linux__ +#if defined(__linux__) && !defined(__ANDROID__) if (!config_->bind_cpu_enable_ || CGRAPH_CPU_NUM == 0 || index < 0) { return; } @@ -159,7 +193,7 @@ protected: auto handle = thread_.native_handle(); int ret = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &mask); if (0 != ret) { - CGRAPH_ECHO("warning : set thread affinity failed, error code is [%d]", ret); + CGRAPH_ECHO("warning : set thread affinity failed, system error code is [%d]", ret); } #endif } @@ -172,7 +206,7 @@ private: * @param policy * @return */ - [[nodiscard]] static int calcPolicy(int policy) { + static int calcPolicy(int policy) { return (CGRAPH_THREAD_SCHED_OTHER == policy || CGRAPH_THREAD_SCHED_RR == policy || CGRAPH_THREAD_SCHED_FIFO == policy) @@ -186,7 +220,7 @@ private: * @param priority * @return */ - [[nodiscard]] static int calcPriority(int priority) { + static int calcPriority(int priority) { return (priority >= CGRAPH_THREAD_MIN_PRIORITY && priority <= CGRAPH_THREAD_MAX_PRIORITY) ? priority : CGRAPH_THREAD_MIN_PRIORITY; diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h index e9e5677..5bbe498 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h @@ -17,6 +17,7 @@ class UThreadPrimary : public UThreadBase { protected: explicit UThreadPrimary() { index_ = CGRAPH_SECONDARY_THREAD_COMMON_ID; + steal_range_ = 0; pool_threads_ = nullptr; type_ = CGRAPH_THREAD_TYPE_PRIMARY; } @@ -25,8 +26,10 @@ protected: CStatus init() override { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(false) + CGRAPH_ASSERT_NOT_NULL(config_) is_init_ = true; + steal_range_ = config_->calcStealRange(); thread_ = std::move(std::thread(&UThreadPrimary::run, this)); setSchedParam(); setAffinity(index_); @@ -47,9 +50,7 @@ protected: UThreadPoolConfigPtr config) { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数 - CGRAPH_ASSERT_NOT_NULL(poolTaskQueue) - CGRAPH_ASSERT_NOT_NULL(poolThreads) - CGRAPH_ASSERT_NOT_NULL(config) + CGRAPH_ASSERT_NOT_NULL(poolTaskQueue, poolThreads, config) this->index_ = index; this->pool_task_queue_ = poolTaskQueue; @@ -67,7 +68,6 @@ protected: CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(true) CGRAPH_ASSERT_NOT_NULL(pool_threads_) - CGRAPH_ASSERT_NOT_NULL(config_) /** * 线程池中任何一个primary线程为null都不可以执行 @@ -81,38 +81,22 @@ protected: CGRAPH_RETURN_ERROR_STATUS("primary thread is null") } - if (config_->calcBatchTaskRatio()) { - while (done_) { - processTasks(); // 批量任务获取执行接口 - } - } else { - while (done_) { - processTask(); // 单个任务获取执行接口 - } - } - + status = loopProcess(); CGRAPH_FUNCTION_END } - /** - * 获取并执行任务 - * @return - */ - CVoid processTask() { + CVoid processTask() override { UTask task; if (popTask(task) || popPoolTask(task) || stealTask(task)) { runTask(task); } else { - std::this_thread::yield(); + std::this_thread::yield(); } } - /** - * 获取批量执行task信息 - */ - CVoid processTasks() { + CVoid processTasks() override { UTaskArr tasks; if (popTask(tasks) || popPoolTask(tasks) || stealTask(tasks)) { // 尝试从主线程中获取/盗取批量task,如果成功,则依次执行 @@ -124,12 +108,25 @@ protected: /** + * 依次push到任一队列里。如果都失败,则yield,然后重新push + * @param task + * @return + */ + CVoid pushTask(UTask&& task) { + while (!(primary_queue_.tryPush(std::move(task)) + || secondary_queue_.tryPush(std::move(task)))) { + std::this_thread::yield(); + } + } + + + /** * 从本地弹出一个任务 * @param task * @return */ bool popTask(UTaskRef task) { - return work_stealing_queue_.tryPop(task); + return primary_queue_.tryPop(task) || secondary_queue_.tryPop(task); } @@ -139,7 +136,13 @@ protected: * @return */ bool popTask(UTaskArrRef tasks) { - return work_stealing_queue_.tryPop(tasks, config_->max_local_batch_size_); + CBool result = primary_queue_.tryPop(tasks, config_->max_local_batch_size_); + auto leftSize = config_->max_local_batch_size_ - tasks.size(); + if (leftSize > 0) { + // 如果凑齐了,就不需要了。没凑齐的话,就继续 + result |= (secondary_queue_.tryPop(tasks, leftSize)); + } + return result; } @@ -161,15 +164,16 @@ protected: * 窃取的时候,仅从相邻的primary线程中窃取 * 待窃取相邻的数量,不能超过默认primary线程数 */ - int range = config_->calcStealRange(); - for (int i = 0; i < range; i++) { + for (int i = 0; i < steal_range_; i++) { /** * 从线程中周围的thread中,窃取任务。 * 如果成功,则返回true,并且执行任务。 + * steal 的时候,先从第二个队列里偷,从而降低触碰锁的概率 */ int curIndex = (index_ + i + 1) % config_->default_thread_size_; - if (nullptr != (*pool_threads_)[curIndex] - && ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(task)) { + if (likely((*pool_threads_)[curIndex]) + && (((*pool_threads_)[curIndex])->secondary_queue_.trySteal(task)) + || ((*pool_threads_)[curIndex])->primary_queue_.trySteal(task)) { return true; } } @@ -184,16 +188,28 @@ protected: * @return */ bool stealTask(UTaskArrRef tasks) { - if (unlikely(pool_threads_->size() < config_->default_thread_size_)) { + if (unlikely(pool_threads_->size() != config_->default_thread_size_)) { return false; } - int range = config_->calcStealRange(); - for (int i = 0; i < range; i++) { + for (int i = 0; i < steal_range_; i++) { int curIndex = (index_ + i + 1) % config_->default_thread_size_; - if (nullptr != (*pool_threads_)[curIndex] - && ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(tasks, config_->max_steal_batch_size_)) { - return true; + if (likely((*pool_threads_)[curIndex])) { + bool result = ((*pool_threads_)[curIndex])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_); + auto leftSize = config_->max_steal_batch_size_ - tasks.size(); + if (leftSize > 0) { + result |= ((*pool_threads_)[curIndex])->primary_queue_.trySteal(tasks, leftSize); + } + + if (result) { + /** + * 在这里,我们对模型进行了简化。实现的思路是: + * 尝试从邻居主线程(先secondary,再primary)中,获取 x(=max_steal_batch_size_) 个task, + * 如果从某一个邻居中,获取了 y(<=x) 个task,则也终止steal的流程 + * 且如果如果有一次批量steal成功,就认定成功 + */ + return true; + } } } @@ -201,8 +217,10 @@ protected: } private: - int index_ {CGRAPH_SECONDARY_THREAD_COMMON_ID}; // 线程index - UWorkStealingQueue work_stealing_queue_; // 内部队列信息 + int index_; // 线程index + int steal_range_; // 偷窃的范围信息 + UWorkStealingQueue primary_queue_; // 内部队列信息 + UWorkStealingQueue secondary_queue_; // 第二个队列,用于减少触锁概率,提升性能 std::vector* pool_threads_; // 用于存放线程池中的线程信息 friend class UThreadPool; diff --git a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h index 3ac21a8..4ab5dbd 100644 --- a/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h +++ b/src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h @@ -47,9 +47,7 @@ protected: UThreadPoolConfigPtr config) { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数 - CGRAPH_ASSERT_NOT_NULL(poolTaskQueue) - CGRAPH_ASSERT_NOT_NULL(poolPriorityTaskQueue) - CGRAPH_ASSERT_NOT_NULL(config) + CGRAPH_ASSERT_NOT_NULL(poolTaskQueue, poolPriorityTaskQueue, config) this->pool_task_queue_ = poolTaskQueue; this->pool_priority_task_queue_ = poolPriorityTaskQueue; @@ -61,44 +59,43 @@ protected: CStatus run() override { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT(true) - CGRAPH_ASSERT_NOT_NULL(config_) - - if (config_->calcBatchTaskRatio()) { - while (done_) { - processTasks(); // 批量任务获取执行接口 - } - } else { - while (done_) { - processTask(); // 单个任务获取执行接口 - } - } + status = loopProcess(); CGRAPH_FUNCTION_END } - /** - * 任务执行函数,从线程池的任务队列中获取信息 - */ - CVoid processTask() { + CVoid processTask() override { UTask task; if (popPoolTask(task)) { runTask(task); } else { - std::this_thread::yield(); + // 如果单词无法获取,则稍加等待 + waitRunTask(config_->queue_emtpy_interval_); } } - /** - * 批量执行n个任务 - */ - CVoid processTasks() { + CVoid processTasks() override { UTaskArr tasks; if (popPoolTask(tasks)) { runTasks(tasks); } else { - std::this_thread::yield(); + waitRunTask(config_->queue_emtpy_interval_); + } + } + + + /** + * 有等待的执行任务 + * @param ms + * @return + * @notice 目的是降低cpu的占用率 + */ + CVoid waitRunTask(CMSec ms) { + auto task = this->pool_task_queue_->popWithTimeout(ms); + if (nullptr != task) { + (*task)(); } } @@ -115,7 +112,7 @@ protected: cur_ttl_--; // 如果当前线程没有在执行,则ttl-1 } - return cur_ttl_ <= 0; + return cur_ttl_ <= 0 && done_; // 必须是正在执行的线程,才可以被回收 } private: diff --git a/src/UtilsCtrl/ThreadPool/UThreadObject.h b/src/UtilsCtrl/ThreadPool/UThreadObject.h index a15da3c..d4f9734 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadObject.h +++ b/src/UtilsCtrl/ThreadPool/UThreadObject.h @@ -10,6 +10,7 @@ #define CGRAPH_UTHREADOBJECT_H #include "../UtilsObject.h" +#include "UThreadPoolDefine.h" CGRAPH_NAMESPACE_BEGIN diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp index 014ea20..2f28ead 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.cpp +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.cpp @@ -13,15 +13,7 @@ CGRAPH_NAMESPACE_BEGIN UThreadPool::UThreadPool(CBool autoInit, const UThreadPoolConfig& config) noexcept { cur_index_ = 0; is_init_ = false; - input_task_num_ = 0; this->setConfig(config); // setConfig 函数,用在 is_init_ 设定之后 - is_monitor_ = config_.monitor_enable_; /** 根据参数设定,决定是否开启监控线程。默认开启 */ - /** - * CGraph 本身支持跨平台运行 - * 如果在windows平台上,通过Visual Studio(2017版本或以下) 版本,将 UThreadPool 类封装程.dll文件时,遇到无法启动的问题 - * 请参考此链接:https://github.com/ChunelFeng/CGraph/issues/17 - */ - monitor_thread_ = std::move(std::thread(&UThreadPool::monitor, this)); if (autoInit) { this->init(); } @@ -29,7 +21,7 @@ UThreadPool::UThreadPool(CBool autoInit, const UThreadPoolConfig& config) noexce UThreadPool::~UThreadPool() { - is_monitor_ = false; // 在析构的时候,才释放监控线程。先释放监控线程,再释放其他的线程 + this->config_.monitor_enable_ = false; // 在析构的时候,才释放监控线程。先释放监控线程,再释放其他的线程 if (monitor_thread_.joinable()) { monitor_thread_.join(); } @@ -53,6 +45,7 @@ CStatus UThreadPool::init() { CGRAPH_FUNCTION_END } + monitor_thread_ = std::move(std::thread(&UThreadPool::monitor, this)); thread_record_map_.clear(); primary_threads_.reserve(config_.default_thread_size_); for (int i = 0; i < config_.default_thread_size_; i++) { @@ -89,16 +82,16 @@ CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) { } // 计算最终运行时间信息 - auto deadline = std::chrono::system_clock::now() + auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(std::min(taskGroup.getTtl(), ttl)); for (auto& fut : futures) { const auto& futStatus = fut.wait_until(deadline); switch (futStatus) { case std::future_status::ready: break; // 正常情况,直接返回了 - case std::future_status::timeout: status += CStatus("thread status timeout"); break; - case std::future_status::deferred: status += CStatus("thread status deferred"); break; - default: status += CStatus("thread status unknown"); + case std::future_status::timeout: status += CErrStatus("thread status timeout"); break; + case std::future_status::deferred: status += CErrStatus("thread status deferred"); break; + default: status += CErrStatus("thread status unknown"); } } @@ -116,7 +109,7 @@ CStatus UThreadPool::submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func, CMSec ttl, } -CIndex UThreadPool::getThreadNum(CSize tid) { +CIndex UThreadPool::getThreadIndex(CSize tid) { int threadNum = CGRAPH_SECONDARY_THREAD_COMMON_ID; auto result = thread_record_map_.find(tid); if (result != thread_record_map_.end()) { @@ -164,11 +157,35 @@ CStatus UThreadPool::destroy() { } -CIndex UThreadPool::dispatch(CIndex origIndex) { - if (unlikely(config_.fair_lock_enable_)) { - return CGRAPH_DEFAULT_TASK_STRATEGY; // 如果开启fair lock,则全部写入 pool的queue中,依次执行 +CBool UThreadPool::isInit() const { + return is_init_; +} + + +CStatus UThreadPool::releaseSecondaryThread(CInt size) { + CGRAPH_FUNCTION_BEGIN + + // 先将所有已经结束的,给删掉 + CGRAPH_LOCK_GUARD lock(st_mutex_); + for (auto iter = secondary_threads_.begin(); iter != secondary_threads_.end(); ) { + !(*iter)->done_ ? secondary_threads_.erase(iter++) : iter++; } + CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((size > secondary_threads_.size()), \ + "cannot release [" + std::to_string(size) + "] secondary thread," \ + + "only [" + std::to_string(secondary_threads_.size()) + "] left.") + + // 再标记几个需要删除的信息 + for (auto iter = secondary_threads_.begin(); + iter != secondary_threads_.end() && size-- > 0; ) { + (*iter)->done_ = false; + iter++; + } + CGRAPH_FUNCTION_END +} + + +CIndex UThreadPool::dispatch(CIndex origIndex) { CIndex realIndex = 0; if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) { /** @@ -192,6 +209,8 @@ CStatus UThreadPool::createSecondaryThread(CInt size) { int leftSize = (int)(config_.max_thread_size_ - config_.default_thread_size_ - secondary_threads_.size()); int realSize = std::min(size, leftSize); // 使用 realSize 来确保所有的线程数量之和,不会超过设定max值 + + CGRAPH_LOCK_GUARD lock(st_mutex_); for (int i = 0; i < realSize; i++) { auto ptr = CGRAPH_MAKE_UNIQUE_COBJECT(UThreadSecondary) ptr->setThreadPoolInfo(&task_queue_, &priority_task_queue_, &config_); @@ -204,21 +223,22 @@ CStatus UThreadPool::createSecondaryThread(CInt size) { CVoid UThreadPool::monitor() { - while (is_monitor_) { - while (is_monitor_ && !is_init_) { + while (config_.monitor_enable_) { + while (config_.monitor_enable_ && !is_init_) { // 如果没有init,则一直处于空跑状态 CGRAPH_SLEEP_SECOND(1) } int span = config_.monitor_span_; - while (is_monitor_ && is_init_ && span--) { + while (config_.monitor_enable_ && is_init_ && span--) { CGRAPH_SLEEP_SECOND(1) // 保证可以快速退出 } // 如果 primary线程都在执行,则表示忙碌 - bool busy = std::all_of(primary_threads_.begin(), primary_threads_.end(), + bool busy = !primary_threads_.empty() && std::all_of(primary_threads_.begin(), primary_threads_.end(), [](UThreadPrimaryPtr ptr) { return nullptr != ptr && ptr->is_running_; }); + CGRAPH_LOCK_GUARD lock(st_mutex_); // 如果忙碌或者priority_task_queue_中有任务,则需要添加 secondary线程 if (busy || !priority_task_queue_.empty()) { createSecondaryThread(1); diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.h b/src/UtilsCtrl/ThreadPool/UThreadPool.h index 3783630..8f466ea 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.h @@ -78,7 +78,7 @@ public: template auto commitWithPriority(const FunctionType& func, int priority) - -> std::future()())>; + -> std::future()())>;; /** * 执行任务组信息 @@ -102,12 +102,12 @@ public: CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished = nullptr); /** - * 获取根据线程id信息,获取线程num信息 + * 获取根据线程id信息,获取线程index信息 * @param tid * @return * @notice 辅助线程返回-1 */ - CIndex getThreadNum(CSize tid); + CIndex getThreadIndex(CSize tid); /** * 释放所有的线程信息 @@ -115,14 +115,11 @@ public: */ CStatus destroy() final; - -protected: /** - * 根据传入的策略信息,确定最终执行方式 - * @param origIndex + * 判断线程池是否已经初始化了 * @return */ - virtual CIndex dispatch(CIndex origIndex); + CBool isInit() const; /** * 生成辅助线程。内部确保辅助线程数量不超过设定参数 @@ -132,6 +129,21 @@ protected: CStatus createSecondaryThread(CInt size); /** + * 删除辅助线程 + * @param size + * @return + */ + CStatus releaseSecondaryThread(CInt size); + +protected: + /** + * 根据传入的策略信息,确定最终执行方式 + * @param origIndex + * @return + */ + virtual CIndex dispatch(CIndex origIndex); + + /** * 监控线程执行函数,主要是判断是否需要增加线程,或销毁线程 * 增/删 操作,仅针对secondary类型线程生效 */ @@ -141,9 +153,7 @@ protected: private: CBool is_init_ { false }; // 是否初始化 - CBool is_monitor_ { true }; // 是否需要监控 CInt cur_index_ = 0; // 记录放入的线程数 - CULong input_task_num_ = 0; // 放入的任务的个数 UAtomicQueue task_queue_; // 用于存放普通任务 UAtomicPriorityQueue priority_task_queue_; // 运行时间较长的任务队列,仅在辅助线程中执行 std::vector primary_threads_; // 记录所有的主线程 @@ -151,6 +161,7 @@ private: UThreadPoolConfig config_; // 线程池设置值 std::thread monitor_thread_; // 监控线程 std::map thread_record_map_; // 线程记录的信息 + std::mutex st_mutex_; // 辅助线程发生变动的时候,加的mutex信息 }; using UThreadPoolPtr = UThreadPool *; diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.inl b/src/UtilsCtrl/ThreadPool/UThreadPool.inl index 7485df7..4335593 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.inl +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.inl @@ -24,7 +24,7 @@ auto UThreadPool::commit(const FunctionType& func, CIndex index) CIndex realIndex = dispatch(index); if (realIndex >= 0 && realIndex < config_.default_thread_size_) { // 如果返回的结果,在主线程数量之间,则放到主线程的queue中执行 - primary_threads_[realIndex]->work_stealing_queue_.push(std::move(task)); + primary_threads_[realIndex]->pushTask(std::move(task)); } else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) { /** * 如果是长时间任务,则交给特定的任务队列,仅由辅助线程处理 @@ -36,7 +36,6 @@ auto UThreadPool::commit(const FunctionType& func, CIndex index) // 返回其他结果,放到pool的queue中执行 task_queue_.push(std::move(task)); } - input_task_num_++; // 计数 return result; } @@ -54,10 +53,9 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority) } priority_task_queue_.push(std::move(task), priority); - input_task_num_++; return result; } CGRAPH_NAMESPACE_END -#endif // CGRAPH_UTHREADPOOL_INL \ No newline at end of file +#endif // CGRAPH_UTHREADPOOL_INL diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h index 518d893..71996da 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h @@ -14,7 +14,7 @@ CGRAPH_NAMESPACE_BEGIN -struct UThreadPoolConfig : public UThreadObject { +struct UThreadPoolConfig : public CStruct { /** 具体值含义,参考UThreadPoolDefine.h文件 */ int default_thread_size_ = CGRAPH_DEFAULT_THREAD_SIZE; int secondary_thread_size_ = CGRAPH_SECONDARY_THREAD_SIZE; @@ -25,37 +25,41 @@ struct UThreadPoolConfig : public UThreadObject { int max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE; int secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL; int monitor_span_ = CGRAPH_MONITOR_SPAN; + CMSec queue_emtpy_interval_ = CGRAPH_QUEUE_EMPTY_INTERVAL; int primary_thread_policy_ = CGRAPH_PRIMARY_THREAD_POLICY; int secondary_thread_policy_ = CGRAPH_SECONDARY_THREAD_POLICY; int primary_thread_priority_ = CGRAPH_PRIMARY_THREAD_PRIORITY; int secondary_thread_priority_ = CGRAPH_SECONDARY_THREAD_PRIORITY; bool bind_cpu_enable_ = CGRAPH_BIND_CPU_ENABLE; bool batch_task_enable_ = CGRAPH_BATCH_TASK_ENABLE; - bool fair_lock_enable_ = CGRAPH_FAIR_LOCK_ENABLE; bool monitor_enable_ = CGRAPH_MONITOR_ENABLE; + CStatus check() const { + CGRAPH_FUNCTION_BEGIN + if (default_thread_size_ < 0 || secondary_thread_size_ < 0) { + CGRAPH_RETURN_ERROR_STATUS("thread size cannot less than 0") + } + + if (default_thread_size_ + secondary_thread_size_ > max_thread_size_) { + CGRAPH_RETURN_ERROR_STATUS("max thread size is less than default + secondary thread") + } + + if (monitor_enable_ && monitor_span_ <= 0) { + CGRAPH_RETURN_ERROR_STATUS("monitor span cannot less than 0") + } + CGRAPH_FUNCTION_END + } protected: /** * 计算可盗取的范围,盗取范围不能超过默认线程数-1 * @return */ - [[nodiscard]] int calcStealRange() const { + int calcStealRange() const { int range = std::min(this->max_task_steal_range_, this->default_thread_size_ - 1); return range; } - - /** - * 计算是否开启批量任务 - * 开启条件:开关批量开启,并且 未开启非公平锁 - * @return - */ - [[nodiscard]] bool calcBatchTaskRatio() const { - bool ratio = (this->batch_task_enable_) && (!this->fair_lock_enable_); - return ratio; - } - friend class UThreadPrimary; friend class UThreadSecondary; }; diff --git a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h index 49fdaa8..7867e0c 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h +++ b/src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h @@ -21,17 +21,6 @@ CGRAPH_NAMESPACE_BEGIN - #if __cplusplus >= 201703L -using CGRAPH_READ_LOCK = std::shared_lock; -using CGRAPH_WRITE_LOCK = std::unique_lock; - #else -using CGRAPH_READ_LOCK = std::unique_lock; // C++14不支持读写锁,使用mutex替代 -using CGRAPH_WRITE_LOCK = std::unique_lock; - #endif - -using CGRAPH_LOCK_GUARD = std::lock_guard; -using CGRAPH_UNIQUE_LOCK = std::unique_lock; - static const int CGRAPH_CPU_NUM = (int)std::thread::hardware_concurrency(); static const int CGRAPH_THREAD_TYPE_PRIMARY = 1; static const int CGRAPH_THREAD_TYPE_SECONDARY = 2; @@ -45,37 +34,38 @@ static const int CGRAPH_THREAD_SCHED_OTHER = 0; static const int CGRAPH_THREAD_SCHED_RR = 0; static const int CGRAPH_THREAD_SCHED_FIFO = 0; #endif -static const int CGRAPH_THREAD_MIN_PRIORITY = 0; // 线程最低优先级 -static const int CGRAPH_THREAD_MAX_PRIORITY = 99; // 线程最高优先级 -static const CMSec CGRAPH_MAX_BLOCK_TTL = 10000000; // 最大阻塞时间,单位为ms +static const CInt CGRAPH_THREAD_MIN_PRIORITY = 0; // 线程最低优先级 +static const CInt CGRAPH_THREAD_MAX_PRIORITY = 99; // 线程最高优先级 +static const CMSec CGRAPH_MAX_BLOCK_TTL = 1999999999; // 最大阻塞时间,单位为ms static const CUint CGRAPH_DEFAULT_RINGBUFFER_SIZE = 1024; // 默认环形队列的大小 -const static CIndex CGRAPH_SECONDARY_THREAD_COMMON_ID = -1; // 辅助线程统一id标识 +static const CIndex CGRAPH_SECONDARY_THREAD_COMMON_ID = -1; // 辅助线程统一id标识 +static const CInt CGRAPH_DEFAULT_PRIORITY = 0; // 默认优先级 + static const int CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 默认线程调度策略 +static const int CGRAPH_POOL_TASK_STRATEGY = -2; // 固定用pool中的队列的调度策略 static const int CGRAPH_LONG_TIME_TASK_STRATEGY = -101; // 长时间任务调度策略 -static const int CGRAPH_REGION_TASK_STRATEGY = -102; // region的调度策略 -static const int CGRAPH_EVENT_TASK_STRATEGY = -103; // event的调度策略 /** * 以下为线程池配置信息 */ -static const int CGRAPH_DEFAULT_THREAD_SIZE = 8; // 默认主线程个数 -static const int CGRAPH_SECONDARY_THREAD_SIZE = 0; // 默认开启辅助线程个数 -static const int CGRAPH_MAX_THREAD_SIZE = (CGRAPH_DEFAULT_THREAD_SIZE * 2) + 1; // 最大线程个数 +static const int CGRAPH_DEFAULT_THREAD_SIZE = 0; // 默认开启主线程个数 +static const int CGRAPH_SECONDARY_THREAD_SIZE = 8; // 默认开启辅助线程个数 +static const int CGRAPH_MAX_THREAD_SIZE = 16; // 最大线程个数 static const int CGRAPH_MAX_TASK_STEAL_RANGE = 2; // 盗取机制相邻范围 static const bool CGRAPH_BATCH_TASK_ENABLE = false; // 是否开启批量任务功能 static const int CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; // 批量执行本地任务最大值 static const int CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值 static const int CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值 -static const bool CGRAPH_FAIR_LOCK_ENABLE = false; // 是否开启公平锁(非必须场景不建议开启,开启后CGRAPH_BATCH_TASK_ENABLE无效) static const int CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s -static const bool CGRAPH_MONITOR_ENABLE = true; // 是否开启监控程序(如果不开启,辅助线程策略将失效。建议开启) +static const bool CGRAPH_MONITOR_ENABLE = false; // 是否开启监控程序 static const int CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s +static const CMSec CGRAPH_QUEUE_EMPTY_INTERVAL = 3; // 队列为空时,等待的时间。仅针对辅助线程,单位为ms static const bool CGRAPH_BIND_CPU_ENABLE = false; // 是否开启绑定cpu模式(仅针对主线程) static const int CGRAPH_PRIMARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 主线程调度策略 static const int CGRAPH_SECONDARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 辅助线程调度策略 -static const int CGRAPH_PRIMARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 主线程调度优先级(取值范围0~99) -static const int CGRAPH_SECONDARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 辅助线程调度优先级(取值范围0~99) +static const int CGRAPH_PRIMARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 主线程调度优先级(取值范围0~99,配合调度策略一起使用,不建议不了解相关内容的童鞋做修改) +static const int CGRAPH_SECONDARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 辅助线程调度优先级(同上) CGRAPH_NAMESPACE_END diff --git a/src/UtilsCtrl/UAllocator.h b/src/UtilsCtrl/UAllocator.h index 7dbccf8..c2d95a0 100644 --- a/src/UtilsCtrl/UAllocator.h +++ b/src/UtilsCtrl/UAllocator.h @@ -22,18 +22,25 @@ CGRAPH_NAMESPACE_BEGIN class UAllocator : public CObject { public: /** - * 生成普通指针信息 + * 生成一个 CObject 对象 * @tparam T * @return */ template::value, int> = 0> static T* safeMallocCObject() { - T* ptr = nullptr; - while (!ptr) { - ptr = new(std::nothrow) T(); - } - return ptr; + return safeMalloc(); + } + + /** + * 生成一个 CStruct 的对象 + * @tparam T + * @return + */ + template::value, int> = 0> + static T* safeMallocCStruct() { + return safeMalloc(); } /** @@ -64,6 +71,21 @@ public: static std::unique_ptr makeUniqueCObject() { return c_make_unique(); } + +private: + /** + * 生成T类型的对象 + * @tparam T + * @return + */ + template + static T* safeMalloc() { + T* ptr = nullptr; + while (!ptr) { + ptr = new(std::nothrow) T(); + } + return ptr; + } }; diff --git a/src/UtilsCtrl/UtilsDefine.h b/src/UtilsCtrl/UtilsDefine.h index 91f50c2..27e5a45 100644 --- a/src/UtilsCtrl/UtilsDefine.h +++ b/src/UtilsCtrl/UtilsDefine.h @@ -12,6 +12,12 @@ #include #include + #if __cplusplus >= 201703L +#include + #else +#include + #endif + #include "../CBasic/CBasicInclude.h" #include "UAllocator.h" #include "UtilsFunction.h" @@ -26,60 +32,102 @@ CGRAPH_NAMESPACE_BEGIN #define unlikely #endif -/* 判断传入的指针信息是否为空 */ -#define CGRAPH_ASSERT_NOT_NULL(ptr) \ - if (unlikely(nullptr == (ptr))) { \ - return CStatus("input is nullptr"); \ - } \ +using CGRAPH_LOCK_GUARD = std::lock_guard; +using CGRAPH_UNIQUE_LOCK = std::unique_lock; + +#if __cplusplus >= 201703L + using CGRAPH_READ_LOCK = std::shared_lock; + using CGRAPH_WRITE_LOCK = std::unique_lock; +#else + using CGRAPH_READ_LOCK = CGRAPH_LOCK_GUARD; // C++14不支持读写锁,使用mutex替代 + using CGRAPH_WRITE_LOCK = CGRAPH_LOCK_GUARD; +#endif + -#define CGRAPH_ASSERT_NOT_NULL_RETURN_NULL(ptr) \ - if (unlikely(nullptr == (ptr))) { \ - return nullptr; \ - } \ +template +CStatus __ASSERT_NOT_NULL(T t) { + return (unlikely(nullptr == t)) + ? CErrStatus(CGRAPH_INPUT_IS_NULL) + : CStatus(); +} -#define CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(ptr) \ - if (unlikely(nullptr == (ptr))) { \ - CGRAPH_THROW_EXCEPTION("input is null") \ +template +CStatus __ASSERT_NOT_NULL(T t, Args... args) { + if (unlikely(t == nullptr)) { + return __ASSERT_NOT_NULL(t); } + return __ASSERT_NOT_NULL(args...); +} + +template +CVoid __ASSERT_NOT_NULL_THROW_EXCEPTION(T t) { + if (unlikely(nullptr == t)) { + CGRAPH_THROW_EXCEPTION("[CException] " + CGRAPH_INPUT_IS_NULL) + } +} + +template +CVoid __ASSERT_NOT_NULL_THROW_EXCEPTION(T t, Args... args) { + if (unlikely(nullptr == t)) { + __ASSERT_NOT_NULL_THROW_EXCEPTION(t); + } + + __ASSERT_NOT_NULL_THROW_EXCEPTION(args...); +} + + +/** 判断传入的多个指针信息,是否为空 */ +#define CGRAPH_ASSERT_NOT_NULL(ptr, ...) \ + { \ + const CStatus& __cur_status__ = __ASSERT_NOT_NULL(ptr, ##__VA_ARGS__); \ + if (unlikely(__cur_status__.isErr())) { return __cur_status__; } \ + } \ + + +/** 判断传入的多个指针,是否为空。如果为空,则抛出异常信息 */ +#define CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(ptr, ...) \ + __ASSERT_NOT_NULL_THROW_EXCEPTION(ptr, ##__VA_ARGS__); \ + /* 判断函数流程是否可以继续 */ static std::mutex g_check_status_mtx; #define CGRAPH_FUNCTION_CHECK_STATUS \ if (unlikely(status.isErr())) { \ - std::lock_guard lock{ g_check_status_mtx }; \ - CGRAPH_ECHO("%s | %s | line = [%d], errorCode = [%d], errorInfo = [%s].", \ - __FILE__, __FUNCTION__, __LINE__, status.getCode(), status.getInfo().c_str()); \ + if (status.isCrash()) { throw CException(status.getInfo()); } \ + CGRAPH_LOCK_GUARD lock{ g_check_status_mtx }; \ + CGRAPH_ECHO("%s, errorCode = [%d], errorInfo = [%s].", \ + status.getLocate().c_str(), status.getCode(), status.getInfo().c_str()); \ return status; \ } \ /* 删除资源信息 */ -#define CGRAPH_DELETE_PTR(ptr) \ - if (unlikely((ptr) != nullptr)) { \ - delete (ptr); \ - (ptr) = nullptr; \ - } \ +#define CGRAPH_DELETE_PTR(ptr) \ + if (unlikely((ptr) != nullptr)) { \ + delete (ptr); \ + (ptr) = nullptr; \ + } \ -#define CGRAPH_ASSERT_INIT(isInit) \ - if (unlikely((isInit) != is_init_)) { \ - return CStatus("init status is not suitable"); \ - } \ +#define CGRAPH_ASSERT_INIT(isInit) \ + if (unlikely((isInit) != is_init_)) { \ + CGRAPH_RETURN_ERROR_STATUS("init status is not suitable") \ + } \ -#define CGRAPH_ASSERT_INIT_RETURN_NULL(isInit) \ - if (unlikely((isInit) != is_init_)) { \ - return nullptr; \ - } \ +#define CGRAPH_ASSERT_INIT_THROW_ERROR(isInit) \ + if (unlikely((isInit) != is_init_)) { \ + CGRAPH_THROW_EXCEPTION("[CException] init status is not suitable") \ + } \ -#define CGRAPH_CHECK_STATUS_RETURN_THIS_OR_NULL \ - return status.isOK() ? this : nullptr; \ - -#define CGRAPH_SLEEP_MILLISECOND(ms) \ - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); \ +#define CGRAPH_CHECK_STATUS_RETURN_THIS_OR_NULL \ + return status.isOK() ? this : nullptr; \ #define CGRAPH_SLEEP_SECOND(s) \ std::this_thread::sleep_for(std::chrono::seconds(s)); \ +#define CGRAPH_SLEEP_MILLISECOND(ms) \ + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); \ + CGRAPH_NAMESPACE_END #endif //CGRAPH_UTILSDEFINE_H diff --git a/src/UtilsCtrl/UtilsFunction.h b/src/UtilsCtrl/UtilsFunction.h index 1b9b4b0..45a571c 100644 --- a/src/UtilsCtrl/UtilsFunction.h +++ b/src/UtilsCtrl/UtilsFunction.h @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -32,28 +33,11 @@ inline CVoid CGRAPH_ECHO(const char *cmd, ...) { #endif std::lock_guard lock{ g_echo_mtx }; -#ifndef _WIN32 - // 非windows系统,打印到毫秒 auto now = std::chrono::system_clock::now(); - // 通过不同精度获取相差的毫秒数 - uint64_t disMs = std::chrono::duration_cast(now.time_since_epoch()).count() - - std::chrono::duration_cast(now.time_since_epoch()).count() * 1000; - time_t tt = std::chrono::system_clock::to_time_t(now); - auto localTime = localtime(&tt); - char strTime[32] = { 0 }; - sprintf(strTime, "[%04d-%02d-%02d %02d:%02d:%02d.%03d]", localTime->tm_year + 1900, - localTime->tm_mon + 1, localTime->tm_mday, localTime->tm_hour, - localTime->tm_min, localTime->tm_sec, (int)disMs); - std::cout << "[CGraph] " << strTime << " "; -#else - // windows系统,打印到秒 - time_t curTime; - time(&curTime); - std::string ct = ctime(&curTime); - std::cout << "[CGraph] [" - << ct.assign(ct.begin(), ct.end()-1) // 去掉时间的最后一位\n信息 - << "] "; -#endif + auto time = std::chrono::system_clock::to_time_t(now); + auto ms = std::chrono::duration_cast(now.time_since_epoch()).count() % 1000; + std::cout << "[" << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S.") \ + << std::setfill('0') << std::setw(3) << ms << "] "; va_list args; va_start(args, cmd); @@ -64,6 +48,28 @@ inline CVoid CGRAPH_ECHO(const char *cmd, ...) { /** + * 获取当前的ms信息 + * @return + */ +inline CMSec CGRAPH_GET_CURRENT_MS() { + // 获取当前的时间戳信息 + return (CMSec)std::chrono::time_point_cast \ + (std::chrono::steady_clock::now()).time_since_epoch().count(); +} + + +/** + * 获取当前的ms信息(包含小数) + * @return + */ +inline CFMSec CGRAPH_GET_CURRENT_ACCURATE_MS() { + // 获取当前的时间戳信息 + return (CFMSec)std::chrono::time_point_cast \ + (std::chrono::steady_clock::now()).time_since_epoch().count() / (CFMSec)1000.0; +} + + +/** * 通用容器累加信息 * @tparam T (例:std::vector) * @param container