Browse Source

feat : 第一个功能版本

chunelfeng 3 years ago
commit
3cfcc31ad3
40 changed files with 2806 additions and 0 deletions
  1. +4
    -0
      .gitignore
  2. +24
    -0
      CMakeLists.txt
  3. +36
    -0
      MyFunction.h
  4. +83
    -0
      README.md
  5. +20
    -0
      cmake/CThreadPool-env-include.cmake
  6. BIN
      doc/image/CThreadPool Author.jpg
  7. +37
    -0
      src/CBasic/CBasicDefine.h
  8. +20
    -0
      src/CBasic/CBasicInclude.h
  9. +39
    -0
      src/CBasic/CException.h
  10. +67
    -0
      src/CBasic/CFuncType.h
  11. +22
    -0
      src/CBasic/CInfoDefine.h
  12. +52
    -0
      src/CBasic/CObject.h
  13. +104
    -0
      src/CBasic/CStatus.h
  14. +36
    -0
      src/CBasic/CValType.h
  15. +15
    -0
      src/CThreadPool.h
  16. +90
    -0
      src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h
  17. +131
    -0
      src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h
  18. +16
    -0
      src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h
  19. +26
    -0
      src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h
  20. +133
    -0
      src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h
  21. +80
    -0
      src/UtilsCtrl/ThreadPool/Task/UTask.h
  22. +102
    -0
      src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h
  23. +15
    -0
      src/UtilsCtrl/ThreadPool/Task/UTaskInclude.h
  24. +212
    -0
      src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
  25. +15
    -0
      src/UtilsCtrl/ThreadPool/Thread/UThreadInclude.h
  26. +216
    -0
      src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
  27. +131
    -0
      src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h
  28. +30
    -0
      src/UtilsCtrl/ThreadPool/UThreadObject.h
  29. +201
    -0
      src/UtilsCtrl/ThreadPool/UThreadPool.cpp
  30. +152
    -0
      src/UtilsCtrl/ThreadPool/UThreadPool.h
  31. +62
    -0
      src/UtilsCtrl/ThreadPool/UThreadPool.inl
  32. +66
    -0
      src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h
  33. +78
    -0
      src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
  34. +20
    -0
      src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h
  35. +87
    -0
      src/UtilsCtrl/UAllocator.h
  36. +75
    -0
      src/UtilsCtrl/UtilsDefine.h
  37. +131
    -0
      src/UtilsCtrl/UtilsFunction.h
  38. +16
    -0
      src/UtilsCtrl/UtilsInclude.h
  39. +26
    -0
      src/UtilsCtrl/UtilsObject.h
  40. +136
    -0
      tutorial.cpp

+ 4
- 0
.gitignore View File

@ -0,0 +1,4 @@
/cmake-build-debug/
/cmake-build-release/
/.idea/
.DS_Store

+ 24
- 0
CMakeLists.txt View File

@ -0,0 +1,24 @@
cmake_minimum_required(VERSION 3.2.5)
project(CThreadPool VERSION 1.0.0)
set(CMAKE_CXX_STANDARD 14)
# add CThreadPool environment info
include(cmake/CThreadPool-env-include.cmake)
file(GLOB_RECURSE CTP_SRC_LIST "./src/*.cpp")
# CGraph
# add_definitions(-D_CGRAPH_SILENCE_)
# libCThreadPool
# add_library(CThreadPool SHARED ${CTP_SRC_LIST})
# libCThreadPool
# add_library(CThreadPool STATIC ${CTP_SRC_LIST})
add_executable(CThreadPool
${CTP_SRC_LIST}
tutorial.cpp)

+ 36
- 0
MyFunction.h View File

@ -0,0 +1,36 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: MyFunction.h
@Time: 2021/9/2 11:20
@Desc:
***************************/
#ifndef CGRAPH_MYFUNCTION_H
#define CGRAPH_MYFUNCTION_H
int add(int i, int j) {
return i + j;
}
static float minusBy5(float i) {
return i - 5.0f;
}
class MyFunction {
public:
std::string concat(std::string& str) const {
return info_ + str;
}
static int multiply(int i, int j) {
return i * j;
}
private:
std::string info_ = "MyFunction : ";
};
#endif //CGRAPH_MYFUNCTION_H

+ 83
- 0
README.md View File

@ -0,0 +1,83 @@
<p align="left">
<a href="https://github.com/ChunelFeng/CThreadPool"><img src="https://badgen.net/badge/langs/C++/cyan?list=1" alt="languages"></a>
<a href="https://github.com/ChunelFeng/CThreadPool"><img src="https://badgen.net/badge/os/MacOS,Linux,Windows/cyan?list=1" alt="os"></a>
<a href="https://github.com/ChunelFeng/CThreadPool/stargazers"><img src="https://badgen.net/github/stars/ChunelFeng/CThreadPool?color=cyan" alt="stars"></a>
<a href="https://github.com/ChunelFeng/CThreadPool/network/members"><img src="https://badgen.net/github/forks/ChunelFeng/CThreadPool?color=cyan" alt="forks"></a>
</p>
<h1 align="center">
CThreadPool 说明文档
</h1>
## 一. 简介
`CThreadPool` 是一个跨平台的、无任何三方依赖的、高性能的C++14(含以上版本)版本的线程池,也是 [CGraph](https://github.com/ChunelFeng/CGraph) 项目中使用的跨平台线程池组件功能的最小集。
经过CGraph和关联项目的长期迭代和验证,功能已经趋于稳定,且性能优异。因为咨询相关内容的朋友较多,故做为独立的仓库提供出来,方便大家使用。
由于是CGraph项目中的剥离出来的功能类,故在项目中保留了多处 `CGRAPH_*` 的命名方式,仅将 namespace 修改为 `CTP`,预计今后与CGraph相互独立更新。
本项目参考了[《C++并发编程实战(第二版)》](https://nj.gitbooks.io/c/content/) 中的部分内容,和github上部分相关的优秀工程。并在此基础上进行大量的改动、扩展和优化,在功能、性能和易用性上均有较大的提升。
在开发过程中,也沉淀了详细的说明文档(见下方 <b>推荐阅读</b>),以便于大家快速了解代码和思路,也请大家不吝指教。
## 二. 编译说明
* 本工程支持MacOS、Linux和Windows系统,无任何第三方依赖。推荐使用C++14(默认)或以上版本,不支持C++11或以下版本
* 使用CLion作为IDE的开发者,或使用Visual Studio 15(或以上版本)作为IDE的开发者,打开CMakeLists.txt文件作为工程,即可编译通过
* Linux环境开发者,在命令行模式下,输入以下指令,即可编译通过
```shell
$ git clone https://github.com/ChunelFeng/CThreadPool.git
$ cd CThreadPool
$ cmake . -Bbuild
$ cd build
$ make -j8
```
## 三. 使用Demo
```cpp
#include "src/CThreadPool.h"
using namespace CTP;
float add_by_5(float i) {
return i + 5.0f;
}
void tutorial() {
UThreadPool tp;
int i = 6, j = 3;
auto r1 = tp.commit([i, j] { return i - j; });
std::future<float> r2 = tp.commit(std::bind(add_by_5, 8.5f));
std::cout << r1.get() << std::endl;
std::cout << r2.get() << std::endl;
}
```
更多使用方法,请参考 `tutorial.cpp` 中的例子和文档中的内容。
## 四. 推荐阅读
* [纯序员给你介绍图化框架的简单实现——线程池优化(一)](http://www.chunel.cn/archives/cgraph-threadpool-1-introduce)
* [纯序员给你介绍图化框架的简单实现——线程池优化(二)](http://www.chunel.cn/archives/cgraph-threadpool-2-introduce)
* [纯序员给你介绍图化框架的简单实现——线程池优化(三)](http://www.chunel.cn/archives/cgraph-threadpool-3-introduce)
* [纯序员给你介绍图化框架的简单实现——线程池优化(四)](http://www.chunel.cn/archives/cgraph-threadpool-4-introduce)
* [纯序员给你介绍图化框架的简单实现——线程池优化(五)](http://www.chunel.cn/archives/cgraph-threadpool-5-introduce)
* [纯序员给你介绍图化框架的简单实现——线程池优化(六)](http://www.chunel.cn/archives/cgraph-threadpool-6-introduce)
## 五. 关联项目
* [CGraph : A simple C++ DAG framework](https://github.com/ChunelFeng/CGraph)
------------
#### 附录-1. 版本信息
[2022.10.05 - v1.0.0 - Chunel]
* 提供线程池基本功能
* 提供对应的tutorial信息
------------
#### 附录-2. 联系方式
* 微信: ChunelFeng
* 邮箱: chunel@foxmail.com
* 源码: https://github.com/ChunelFeng/CThreadPool
* 论坛: www.chunel.cn
![CGraph Author](https://github.com/ChunelFeng/CThreadPool/blob/main/doc/image/CThreadPool%20Author.jpg)

+ 20
- 0
cmake/CThreadPool-env-include.cmake View File

@ -0,0 +1,20 @@
# cmakeCGraphC++
IF(APPLE)
# macsession
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -m64 -finline-functions -Wno-deprecated-declarations -Wno-c++17-extensions")
add_definitions(-D_GENERATE_SESSION_)
add_definitions(-D_ENABLE_LIKELY_)
ELSEIF(UNIX)
# linux线
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O2 -pthread -Wno-format-overflow")
add_definitions(-D_ENABLE_LIKELY_)
ELSEIF(WIN32)
# windowsutf-8
add_definitions(/utf-8)
# warning
add_compile_options(/wd4996)
add_compile_options(/wd4267)
ENDIF()

BIN
doc/image/CThreadPool Author.jpg View File

Before After
Width: 430  |  Height: 430  |  Size: 40 KiB

+ 37
- 0
src/CBasic/CBasicDefine.h View File

@ -0,0 +1,37 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CBasicDefine.h
@Time: 2021/4/26 8:15
@Desc:
***************************/
#ifndef CGRAPH_CBASICDEFINE_H
#define CGRAPH_CBASICDEFINE_H
#include <cstddef>
#define CGRAPH_NAMESPACE_BEGIN \
namespace CTP { \
#define CGRAPH_NAMESPACE_END \
} /* end of namespace CTP */ \
CGRAPH_NAMESPACE_BEGIN
using CCHAR = char;
using CUINT = unsigned int;
using CVOID = void;
using CINT = int;
using CLONG = long;
using CULONG = unsigned long;
using CBOOL = bool;
using CBIGBOOL = int;
using CFLOAT = float;
using CDOUBLE = double;
using CCONSTR = const char*;
using CSIZE = size_t;
CGRAPH_NAMESPACE_END
#endif //CGRAPH_CBASICDEFINE_H

+ 20
- 0
src/CBasic/CBasicInclude.h View File

@ -0,0 +1,20 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CBasicInclude.h
@Time: 2022/2/1 4:23
@Desc:
***************************/
#ifndef CGRAPH_CBASICINCLUDE_H
#define CGRAPH_CBASICINCLUDE_H
#include "CObject.h"
#include "CValType.h"
#include "CFuncType.h"
#include "CStatus.h"
#include "CException.h"
#include "CBasicDefine.h"
#include "CInfoDefine.h"
#endif //CGRAPH_CBASICINCLUDE_H

+ 39
- 0
src/CBasic/CException.h View File

@ -0,0 +1,39 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CException.h
@Time: 2022/4/15 20:51
@Desc:
***************************/
#ifndef CGRAPH_CEXCEPTION_H
#define CGRAPH_CEXCEPTION_H
#include <string>
#include <exception>
#include "CInfoDefine.h"
CGRAPH_NAMESPACE_BEGIN
class CEXCEPTION : public std::exception {
public:
explicit CEXCEPTION(const std::string& info = CGRAPH_EMPTY) {
info_ = info.empty() ? CGRAPH_BASIC_EXCEPTION : info;
}
/**
*
* @return
*/
[[nodiscard]] const char* what() const noexcept override {
return info_.c_str();
}
private:
std::string info_; //
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_CEXCEPTION_H

+ 67
- 0
src/CBasic/CFuncType.h View File

@ -0,0 +1,67 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CFuncType.h
@Time: 2022/2/3 1:05
@Desc:
***************************/
#ifndef CGRAPH_CFUNCTYPE_H
#define CGRAPH_CFUNCTYPE_H
#include <functional>
#include "CInfoDefine.h"
#include "CValType.h"
CGRAPH_NAMESPACE_BEGIN
using CGRAPH_DEFAULT_FUNCTION = std::function<void()>;
using CGRAPH_DEFAULT_CONST_FUNCTION_REF = const std::function<void()>&;
using CGRAPH_CSTATUS_FUNCTION = std::function<CStatus()>;
using CGRAPH_CSTATUS_CONST_FUNCTION_REF = const std::function<CStatus()>&;
using CGRAPH_CALLBACK_FUNCTION = std::function<void(CStatus)>;
using CGRAPH_CALLBACK_CONST_FUNCTION_REF = const std::function<void(CStatus)>&;
/**
*
*/
enum class CFunctionType {
INIT = 1, /** 初始化函数 */
RUN = 2, /** 执行函数 */
DESTROY = 3 /** 释放函数 */
};
/** 开启函数流程 */
#define CGRAPH_FUNCTION_BEGIN \
CStatus status; \
/** 结束函数流程 */
#define CGRAPH_FUNCTION_END \
return status; \
/** 无任何功能函数 */
#define CGRAPH_EMPTY_FUNCTION \
return CStatus(); \
/** 不支持当前功能 */
#define CGRAPH_NO_SUPPORT \
return CStatus(CGRAPH_FUNCTION_NO_SUPPORT); \
/** 返回异常信息和状态 */
#define CGRAPH_RETURN_ERROR_STATUS(info) \
return CStatus(info); \
/** 定义为不能赋值和拷贝的对象类型 */
#define CGRAPH_NO_ALLOWED_COPY(CType) \
CType(const CType &) = delete; \
const CType &operator=(const CType &) = delete; \
/** 抛出异常 */
#define CGRAPH_THROW_EXCEPTION(info) \
throw CException(info); \
CGRAPH_NAMESPACE_END
#endif //CGRAPH_CFUNCTYPE_H

+ 22
- 0
src/CBasic/CInfoDefine.h View File

@ -0,0 +1,22 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CInfoDefine.h
@Time: 2022/4/16 14:01
@Desc:
***************************/
#ifndef CGRAPH_CINFODEFINE_H
#define CGRAPH_CINFODEFINE_H
#include "CBasicDefine.h"
CGRAPH_NAMESPACE_BEGIN
static const char* CGRAPH_EMPTY = "";
static const char* CGRAPH_BASIC_EXCEPTION = "CGraph Exception";
static const char* CGRAPH_FUNCTION_NO_SUPPORT = "function no support";
CGRAPH_NAMESPACE_END
#endif //CGRAPH_CINFODEFINE_H

+ 52
- 0
src/CBasic/CObject.h View File

@ -0,0 +1,52 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CObject.h
@Time: 2021/4/26 8:12
@Desc: run()
***************************/
#ifndef CGRAPH_COBJECT_H
#define CGRAPH_COBJECT_H
#include "CBasicDefine.h"
#include "CValType.h"
#include "CFuncType.h"
CGRAPH_NAMESPACE_BEGIN
class CObject {
public:
/**
*
*/
explicit CObject() = default;
/**
*
*/
virtual CStatus init() {
CGRAPH_EMPTY_FUNCTION
}
/**
*
*/
virtual CStatus run() = 0;
/**
* deinit函数
*/
virtual CStatus destroy() {
CGRAPH_EMPTY_FUNCTION
}
/**
*
*/
virtual ~CObject() = default;
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_COBJECT_H

+ 104
- 0
src/CBasic/CStatus.h View File

@ -0,0 +1,104 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CStatus.h
@Time: 2021/12/17 10:32
@Desc: CSTATUS CStatus
***************************/
#ifndef CGRAPH_CSTATUS_H
#define CGRAPH_CSTATUS_H
#include <string>
#include "CBasicDefine.h"
CGRAPH_NAMESPACE_BEGIN
static const int STATUS_OK = 0; /** 正常流程返回值 */
static const int STATUS_ERR = -1; /** 异常流程返回值 */
static const char* STATUS_ERROR_INFO_CONNECTOR = " && "; /** 多异常信息连接符号 */
class CSTATUS {
public:
explicit CSTATUS() = default;
explicit CSTATUS(const std::string &errorInfo) {
this->error_code_ = STATUS_ERR; // error code信息
this->error_info_ = errorInfo;
}
explicit CSTATUS(int errorCode, const std::string &errorInfo) {
this->error_code_ = errorCode;
this->error_info_ = errorInfo;
}
CSTATUS(const CSTATUS &status) {
this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
}
CSTATUS(const CSTATUS &&status) noexcept {
this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
}
CSTATUS& operator=(const CSTATUS& status) = default;
CSTATUS& operator+=(const CSTATUS& cur) {
if (this->isOK() && cur.isOK()) {
return (*this);
}
error_info_ = this->isOK()
? cur.error_info_
: (cur.isOK()
? error_info_
: (error_info_ + STATUS_ERROR_INFO_CONNECTOR + cur.error_info_));
error_code_ = STATUS_ERR;
return (*this);
}
void setStatus(const std::string& info) {
error_code_ = STATUS_ERR;
error_info_ = info;
}
void setStatus(int code, const std::string& info) {
error_code_ = code;
error_info_ = info;
}
[[nodiscard]] int getCode() const {
return this->error_code_;
}
[[nodiscard]] const std::string& getInfo() const {
return this->error_info_;
}
/**
*
* @return
*/
[[nodiscard]] bool isOK() const {
return STATUS_OK == error_code_;
}
/**
*
* @return
*/
[[nodiscard]] bool isErr() const {
return error_code_ < STATUS_OK; //
}
private:
int error_code_ { STATUS_OK }; //
std::string error_info_; //
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_CSTATUS_H

+ 36
- 0
src/CBasic/CValType.h View File

@ -0,0 +1,36 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CValTypes.h
@Time: 2022/2/3 12:58
@Desc:
***************************/
#ifndef CGRAPH_CVALTYPE_H
#define CGRAPH_CVALTYPE_H
#include "CStatus.h"
#include "CException.h"
using CChar = CTP::CCHAR;
using CUint = CTP::CUINT;
using CSec = CTP::CUINT; // , for second
using CMSec = CTP::CUINT; // , for millisecond
using CSize = CTP::CSIZE;
using CVoid = CTP::CVOID;
using CVoidPtr = CTP::CVOID *;
using CInt = CTP::CINT;
using CLevel = CTP::CINT;
using CLong = CTP::CLONG;
using CULong = CTP::CULONG;
using CBool = CTP::CBOOL;
using CIndex = CTP::CINT; //
using CFloat = CTP::CFLOAT;
using CDouble = CTP::CDOUBLE;
using CConStr = CTP::CCONSTR; // const char*
using CBigBool = CTP::CBIGBOOL;
using CStatus = CTP::CSTATUS;
using CException = CTP::CEXCEPTION;
#endif //CGRAPH_CVALTYPE_H

+ 15
- 0
src/CThreadPool.h View File

@ -0,0 +1,15 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CThreadPool.h
@Time: 2022/10/5 20:08
@Desc:
***************************/
#ifndef CTHREADPOOL_CTHREADPOOL_H
#define CTHREADPOOL_CTHREADPOOL_H
#include "CBasic/CBasicInclude.h"
#include "UtilsCtrl/UtilsInclude.h"
#endif //CTHREADPOOL_CTHREADPOOL_H

+ 90
- 0
src/UtilsCtrl/ThreadPool/Queue/UAtomicPriorityQueue.h View File

@ -0,0 +1,90 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UAtomicPriorityQueue.h
@Time: 2022/10/1 21:40
@Desc: 线 priority_queue和queue的弹出方式不一致
***************************/
#ifndef CGRAPH_UATOMICPRIORITYQUEUE_H
#define CGRAPH_UATOMICPRIORITYQUEUE_H
#include <queue>
#include "UQueueObject.h"
CGRAPH_NAMESPACE_BEGIN
template<typename T>
class UAtomicPriorityQueue : public UQueueObject {
public:
UAtomicPriorityQueue() = default;
/**
*
* @param value
* @return
*/
CBool tryPop(T& value) {
CGRAPH_LOCK_GUARD lk(mutex_);
if (priority_queue_.empty()) {
return false;
}
value = std::move(*priority_queue_.top());
priority_queue_.pop();
return true;
}
/**
*
* @param values
* @param maxPoolBatchSize
* @return
*/
CBool tryPop(std::vector<T>& values, int maxPoolBatchSize) {
CGRAPH_LOCK_GUARD lk(mutex_);
if (priority_queue_.empty() || maxPoolBatchSize <= 0) {
return false;
}
while (!priority_queue_.empty() && maxPoolBatchSize--) {
values.emplace_back(std::move(*priority_queue_.top()));
priority_queue_.pop();
}
return true;
}
/**
*
* @param value
* @param priority
* @return
*/
CVoid push(T&& value, int priority) {
std::unique_ptr<T> task(std::make_unique<T>(std::move(value), priority));
CGRAPH_LOCK_GUARD lk(mutex_);
priority_queue_.push(std::move(task));
}
/**
*
* @return
*/
[[nodiscard]] CBool empty() {
CGRAPH_LOCK_GUARD lk(mutex_);
return priority_queue_.empty();
}
CGRAPH_NO_ALLOWED_COPY(UAtomicPriorityQueue)
private:
std::priority_queue<std::unique_ptr<T> > priority_queue_; //
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UATOMICPRIORITYQUEUE_H

+ 131
- 0
src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h View File

@ -0,0 +1,131 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UAtomicQueue.h
@Time: 2021/7/2 11:28
@Desc:
***************************/
#ifndef CGRAPH_UATOMICQUEUE_H
#define CGRAPH_UATOMICQUEUE_H
#include <memory>
#include <mutex>
#include <queue>
#include <condition_variable>
#include "../UThreadPoolDefine.h"
#include "UQueueObject.h"
CGRAPH_NAMESPACE_BEGIN
template<typename T>
class UAtomicQueue : public UQueueObject {
public:
UAtomicQueue() = default;
/**
*
* @param value
*/
CVoid waitPop(T& value) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait(lk, [this] { return !queue_.empty(); });
value = std::move(*queue_.front());
queue_.pop();
}
/**
*
* @param value
* @return
*/
CBool tryPop(T& value) {
CGRAPH_LOCK_GUARD lk(mutex_);
if (queue_.empty()) {
return false;
}
value = std::move(*queue_.front());
queue_.pop();
return true;
}
/**
*
* @param values
* @param maxPoolBatchSize
* @return
*/
CBool tryPop(std::vector<T>& values, int maxPoolBatchSize) {
CGRAPH_LOCK_GUARD lk(mutex_);
if (queue_.empty() || maxPoolBatchSize <= 0) {
return false;
}
while (!queue_.empty() && maxPoolBatchSize--) {
values.emplace_back(std::move(*queue_.front()));
queue_.pop();
}
return true;
}
/**
*
* @return
*/
std::unique_ptr<T> waitPop() {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait(lk, [this] { return !queue_.empty(); });
std::unique_ptr<T> result = std::move(queue_.front());
queue_.pop();
return result;
}
/**
*
* @return
*/
std::unique_ptr<T> tryPop() {
CGRAPH_LOCK_GUARD lk(mutex_);
if (queue_.empty()) { return std::unique_ptr<T>(); }
std::unique_ptr<T> ptr = std::move(queue_.front());
queue_.pop();
return ptr;
}
/**
*
* @param value
*/
CVoid push(T&& value) {
std::unique_ptr<T> task(std::make_unique<T>(std::move(value)));
CGRAPH_LOCK_GUARD lk(mutex_);
queue_.push(std::move(task));
cv_.notify_one();
}
/**
*
* @return
*/
[[nodiscard]] CBool empty() {
CGRAPH_LOCK_GUARD lk(mutex_);
return queue_.empty();
}
CGRAPH_NO_ALLOWED_COPY(UAtomicQueue)
private:
std::queue<std::unique_ptr<T>> queue_;
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UATOMICQUEUE_H

+ 16
- 0
src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h View File

@ -0,0 +1,16 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UQueueInclude.h
@Time: 2022/1/12 11:09
@Desc:
***************************/
#ifndef CGRAPH_UQUEUEINCLUDE_H
#define CGRAPH_UQUEUEINCLUDE_H
#include "UAtomicQueue.h"
#include "UWorkStealingQueue.h"
#include "UAtomicPriorityQueue.h"
#endif //CGRAPH_UQUEUEINCLUDE_H

+ 26
- 0
src/UtilsCtrl/ThreadPool/Queue/UQueueObject.h View File

@ -0,0 +1,26 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UQueueObject.h
@Time: 2022/10/1 20:31
@Desc:
***************************/
#ifndef CGRAPH_UQUEUEOBJECT_H
#define CGRAPH_UQUEUEOBJECT_H
#include <mutex>
#include "../UThreadObject.h"
CGRAPH_NAMESPACE_BEGIN
class UQueueObject : public UThreadObject {
protected:
std::mutex mutex_;
std::condition_variable cv_;
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UQUEUEOBJECT_H

+ 133
- 0
src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h View File

@ -0,0 +1,133 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UWorkStealingQueue.h
@Time: 2021/7/2 11:29
@Desc:
***************************/
#ifndef CGRAPH_UWORKSTEALINGQUEUE_H
#define CGRAPH_UWORKSTEALINGQUEUE_H
#include <queue>
#include <mutex>
#include <deque>
#include "UQueueObject.h"
#include "../Task/UTask.h"
CGRAPH_NAMESPACE_BEGIN
class UWorkStealingQueue : public UQueueObject {
public:
UWorkStealingQueue() = default;
/**
*
* @param task
*/
CVoid push(UTask&& task) {
while (true) {
if (mutex_.try_lock()) {
deque_.emplace_front(std::move(task));
mutex_.unlock();
break;
} else {
std::this_thread::yield();
}
}
}
/**
*
* @param task
* @return
*/
CBool tryPop(UTask& task) {
// 使raii锁线
bool result = false;
if (mutex_.try_lock()) {
if (!deque_.empty()) {
task = std::move(deque_.front()); //
deque_.pop_front();
result = true;
}
mutex_.unlock();
}
return result;
}
/**
*
* @param taskArr
* @param maxLocalBatchSize
* @return
*/
CBool tryPop(UTaskArrRef taskArr,
int maxLocalBatchSize) {
bool result = false;
if (mutex_.try_lock()) {
while (!deque_.empty() && maxLocalBatchSize--) {
taskArr.emplace_back(std::move(deque_.front()));
deque_.pop_front();
result = true;
}
mutex_.unlock();
}
return result;
}
/**
*
* @param task
* @return
*/
CBool trySteal(UTask& task) {
bool result = false;
if (mutex_.try_lock()) {
if (!deque_.empty()) {
task = std::move(deque_.back()); //
deque_.pop_back();
result = true;
}
mutex_.unlock();
}
return result;
}
/**
*
* @param taskArr
* @return
*/
CBool trySteal(UTaskArrRef taskArr, int maxStealBatchSize) {
bool result = false;
if (mutex_.try_lock()) {
while (!deque_.empty() && maxStealBatchSize--) {
taskArr.emplace_back(std::move(deque_.back()));
deque_.pop_back();
result = true;
}
mutex_.unlock();
}
return result; //
}
CGRAPH_NO_ALLOWED_COPY(UWorkStealingQueue)
private:
std::deque<UTask> deque_;
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UWORKSTEALINGQUEUE_H

+ 80
- 0
src/UtilsCtrl/ThreadPool/Task/UTask.h View File

@ -0,0 +1,80 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UTask.h
@Time: 2021/7/2 11:32
@Desc:
***************************/
#ifndef CGRAPH_UTASK_H
#define CGRAPH_UTASK_H
#include <vector>
#include <memory>
#include "../UThreadObject.h"
CGRAPH_NAMESPACE_BEGIN
class UTask : public UThreadObject {
struct taskBased {
explicit taskBased() = default;
virtual CVoid call() = 0;
virtual ~taskBased() = default;
};
template<typename F>
struct taskDerided : taskBased {
F func_;
explicit taskDerided(F&& func) : func_(std::move(func)) {}
CVoid call() override { func_(); }
};
public:
template<typename F>
UTask(F&& f, int priority = 0)
: impl_(new taskDerided<F>(std::forward<F>(f)))
, priority_(priority) {}
CVoid operator()() {
if (likely(impl_)) {
impl_->call();
}
}
UTask() = default;
UTask(UTask&& task) noexcept:
impl_(std::move(task.impl_)),
priority_(task.priority_) {}
UTask &operator=(UTask&& task) noexcept {
impl_ = std::move(task.impl_);
priority_ = task.priority_;
return *this;
}
CBool operator>(const UTask& task) const {
return priority_ < task.priority_; //
}
CBool operator<(const UTask& task) const {
return priority_ >= task.priority_;
}
CGRAPH_NO_ALLOWED_COPY(UTask)
private:
std::unique_ptr<taskBased> impl_ = nullptr;
int priority_ = 0; //
};
using UTaskRef = UTask &;
using UTaskPtr = UTask *;
using UTaskArr = std::vector<UTask>;
using UTaskArrRef = std::vector<UTask> &;
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTASK_H

+ 102
- 0
src/UtilsCtrl/ThreadPool/Task/UTaskGroup.h View File

@ -0,0 +1,102 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UTaskGroup.h
@Time: 2022/1/2 2:17
@Desc:
***************************/
#ifndef CGRAPH_UTASKGROUP_H
#define CGRAPH_UTASKGROUP_H
#include <utility>
#include "../UThreadObject.h"
CGRAPH_NAMESPACE_BEGIN
class UTaskGroup : public UThreadObject {
public:
explicit UTaskGroup() = default;
CGRAPH_NO_ALLOWED_COPY(UTaskGroup)
/**
* taskGroup
* @param task
* @param ttl
* @param onFinished
*/
explicit UTaskGroup(CGRAPH_DEFAULT_CONST_FUNCTION_REF task,
CMSec ttl = CGRAPH_MAX_BLOCK_TTL,
CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished = nullptr) noexcept {
this->addTask(task)
->setTtl(ttl)
->setOnFinished(onFinished);
}
/**
*
* @param task
*/
UTaskGroup* addTask(CGRAPH_DEFAULT_CONST_FUNCTION_REF task) {
task_arr_.emplace_back(task);
return this;
}
/**
*
* @param ttl
*/
UTaskGroup* setTtl(CMSec ttl) {
this->ttl_ = ttl;
return this;
}
/**
*
* @param onFinished
* @return
*/
UTaskGroup* setOnFinished(CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished) {
this->on_finished_ = onFinished;
return this;
}
/**
*
* @return
*/
[[nodiscard]] CMSec getTtl() const {
return this->ttl_;
}
/**
*
*/
CVoid clear() {
task_arr_.clear();
}
/**
*
* @return
*/
[[nodiscard]] CSize getSize() const {
auto size = task_arr_.size();
return size;
}
private:
std::vector<CGRAPH_DEFAULT_FUNCTION> task_arr_; //
CMSec ttl_ = CGRAPH_MAX_BLOCK_TTL; // (0)
CGRAPH_CALLBACK_FUNCTION on_finished_ = nullptr; //
friend class UThreadPool;
};
using UTaskGroupPtr = UTaskGroup *;
using UTaskGroupRef = UTaskGroup &;
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTASKGROUP_H

+ 15
- 0
src/UtilsCtrl/ThreadPool/Task/UTaskInclude.h View File

@ -0,0 +1,15 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UTaskInclude.h
@Time: 2022/1/12 9:34
@Desc:
***************************/
#ifndef CGRAPH_UTASKINCLUDE_H
#define CGRAPH_UTASKINCLUDE_H
#include "UTask.h"
#include "UTaskGroup.h"
#endif //CGRAPH_UTASKINCLUDE_H

+ 212
- 0
src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h View File

@ -0,0 +1,212 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadBase.h
@Time: 2021/7/2 11:24
@Desc:
***************************/
#ifndef CGRAPH_UTHREADBASE_H
#define CGRAPH_UTHREADBASE_H
#include <thread>
#include "../UThreadObject.h"
#include "../Queue/UQueueInclude.h"
#include "../Task/UTaskInclude.h"
CGRAPH_NAMESPACE_BEGIN
class UThreadBase : public UThreadObject {
protected:
explicit UThreadBase() {
done_ = true;
is_init_ = false;
is_running_ = false;
pool_task_queue_ = nullptr;
pool_priority_task_queue_ = nullptr;
config_ = nullptr;
total_task_num_ = 0;
}
~UThreadBase() override {
reset();
}
/**
* 线 destroy
* init函数不一样线
* @return
*/
CStatus destroy() override {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
reset();
CGRAPH_FUNCTION_END
}
/**
* 线
* @param task
* @return
*/
virtual bool popPoolTask(UTaskRef task) {
bool result = pool_task_queue_->tryPop(task);
if (!result && CGRAPH_THREAD_TYPE_SECONDARY == type_) {
// 线
result = pool_priority_task_queue_->tryPop(task);
}
return result;
}
/**
* 线
* @param tasks
* @return
*/
virtual bool popPoolTask(UTaskArrRef tasks) {
bool result = pool_task_queue_->tryPop(tasks, config_->max_pool_batch_size_);
if (!result && CGRAPH_THREAD_TYPE_SECONDARY == type_) {
result = pool_priority_task_queue_->tryPop(tasks, 1);
}
return result;
}
/**
*
* @param task
*/
CVoid runTask(UTask& task) {
is_running_ = true;
task();
total_task_num_++;
is_running_ = false;
}
/**
*
* @param tasks
*/
CVoid runTasks(UTaskArr& tasks) {
is_running_ = true;
for (auto& task : tasks) {
task();
}
total_task_num_ += tasks.size();
is_running_ = false;
}
/**
*
*/
CVoid reset() {
done_ = false;
if (thread_.joinable()) {
thread_.join(); // 线
}
is_init_ = false;
is_running_ = false;
total_task_num_ = 0;
}
/**
* 线windows平台使用
*/
CVoid setSchedParam() {
#ifndef _WIN32
int priority = CGRAPH_THREAD_SCHED_OTHER;
int policy = CGRAPH_THREAD_MIN_PRIORITY;
if (type_ == CGRAPH_THREAD_TYPE_PRIMARY) {
priority = config_->primary_thread_priority_;
policy = config_->primary_thread_policy_;
} else if (type_ == CGRAPH_THREAD_TYPE_SECONDARY) {
priority = config_->secondary_thread_priority_;
policy = config_->secondary_thread_policy_;
}
auto handle = thread_.native_handle();
sched_param param = { calcPriority(priority) };
int ret = pthread_setschedparam(handle, calcPolicy(policy), &param);
if (0 != ret) {
CGRAPH_ECHO("warning : set thread sched param failed, error code is [%d]", ret);
}
#endif
}
/**
* 线linux系统
*/
CVoid setAffinity(int index) {
#ifdef __linux__
if (!config_->bind_cpu_enable_ || CGRAPH_CPU_NUM == 0 || index < 0) {
return;
}
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(index % CGRAPH_CPU_NUM, &mask);
auto handle = thread_.native_handle();
int ret = pthread_setaffinity_np(handle, sizeof(cpu_set_t), &mask);
if (0 != ret) {
CGRAPH_ECHO("warning : set thread affinity failed, error code is [%d]", ret);
}
#endif
}
private:
/**
* 线
* OTHER/RR/FIFO对应数值OTHER类型
* @param policy
* @return
*/
[[nodiscard]] static int calcPolicy(int policy) {
return (CGRAPH_THREAD_SCHED_OTHER == policy
|| CGRAPH_THREAD_SCHED_RR == policy
|| CGRAPH_THREAD_SCHED_FIFO == policy)
? policy : CGRAPH_THREAD_SCHED_OTHER;
}
/**
* 线
* [min,max]min值
* @param priority
* @return
*/
[[nodiscard]] static int calcPriority(int priority) {
return (priority >= CGRAPH_THREAD_MIN_PRIORITY
&& priority <= CGRAPH_THREAD_MAX_PRIORITY)
? priority : CGRAPH_THREAD_MIN_PRIORITY;
}
protected:
bool done_; // 线
bool is_init_; //
bool is_running_; //
int type_ = 0; // 线线线
unsigned long total_task_num_ = 0; //
UAtomicQueue<UTask>* pool_task_queue_; // 线
UAtomicPriorityQueue<UTask>* pool_priority_task_queue_; // 线线
UThreadPoolConfigPtr config_ = nullptr; //
std::thread thread_; // 线
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTHREADBASE_H

+ 15
- 0
src/UtilsCtrl/ThreadPool/Thread/UThreadInclude.h View File

@ -0,0 +1,15 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadInclude.h
@Time: 2022/1/12 11:09
@Desc:
***************************/
#ifndef CGRAPH_UTHREADINCLUDE_H
#define CGRAPH_UTHREADINCLUDE_H
#include "UThreadPrimary.h"
#include "UThreadSecondary.h"
#endif //CGRAPH_UTHREADINCLUDE_H

+ 216
- 0
src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h View File

@ -0,0 +1,216 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadPrimary.h
@Time: 2021/7/8 11:02
@Desc: 线
***************************/
#ifndef CGRAPH_UTHREADPRIMARY_H
#define CGRAPH_UTHREADPRIMARY_H
#include "UThreadBase.h"
CGRAPH_NAMESPACE_BEGIN
class UThreadPrimary : public UThreadBase {
protected:
explicit UThreadPrimary() {
index_ = -1;
pool_threads_ = nullptr;
type_ = CGRAPH_THREAD_TYPE_PRIMARY;
}
CStatus init() override {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
is_init_ = true;
thread_ = std::move(std::thread(&UThreadPrimary::run, this));
setSchedParam();
setAffinity(index_);
CGRAPH_FUNCTION_END
}
/**
* 线init之前使用
* @param index
* @param poolTaskQueue
* @param poolThreads
* @param config
*/
CStatus setThreadPoolInfo(int index,
UAtomicQueue<UTask>* poolTaskQueue,
std::vector<UThreadPrimary *>* poolThreads,
UThreadPoolConfigPtr config) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false) //
CGRAPH_ASSERT_NOT_NULL(poolTaskQueue)
CGRAPH_ASSERT_NOT_NULL(poolThreads)
CGRAPH_ASSERT_NOT_NULL(config)
this->index_ = index;
this->pool_task_queue_ = poolTaskQueue;
this->pool_threads_ = poolThreads;
this->config_ = config;
CGRAPH_FUNCTION_END
}
/**
* 线
* @return
*/
CStatus run() override {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
CGRAPH_ASSERT_NOT_NULL(pool_threads_)
CGRAPH_ASSERT_NOT_NULL(config_)
/**
* 线primary线程为null都不可以执行
* 线
*
*/
if (std::any_of(pool_threads_->begin(), pool_threads_->end(),
[](UThreadPrimary* thd) {
return nullptr == thd;
})) {
CGRAPH_RETURN_ERROR_STATUS("primary thread is null")
}
if (config_->calcBatchTaskRatio()) {
while (done_) {
processTasks(); //
}
} else {
while (done_) {
processTask(); //
}
}
CGRAPH_FUNCTION_END
}
/**
*
* @return
*/
CVoid processTask() {
UTask task;
if (popTask(task) || popPoolTask(task) || stealTask(task)) {
runTask(task);
} else {
std::this_thread::yield();
}
}
/**
* task信息
*/
CVoid processTasks() {
UTaskArr tasks;
if (popTask(tasks) || popPoolTask(tasks) || stealTask(tasks)) {
// 线/task
runTasks(tasks);
} else {
std::this_thread::yield();
}
}
/**
*
* @param task
* @return
*/
bool popTask(UTaskRef task) {
return work_stealing_queue_.tryPop(task);
}
/**
*
* @param tasks
* @return
*/
bool popTask(UTaskArrRef tasks) {
return work_stealing_queue_.tryPop(tasks, config_->max_local_batch_size_);
}
/**
* 线
* @param task
* @return
*/
bool stealTask(UTaskRef task) {
if (unlikely(pool_threads_->size() < config_->default_thread_size_)) {
/**
* 线steal
*
*/
return false;
}
/**
* primary线程中窃取
* primary线程数
*/
int range = config_->calcStealRange();
for (int i = 0; i < range; i++) {
/**
* 线thread中
* true
*/
int curIndex = (index_ + i + 1) % config_->default_thread_size_;
if (nullptr != (*pool_threads_)[curIndex]
&& ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(task)) {
return true;
}
}
return false;
}
/**
* 线
* @param tasks
* @return
*/
bool stealTask(UTaskArrRef tasks) {
if (unlikely(pool_threads_->size() < config_->default_thread_size_)) {
return false;
}
int range = config_->calcStealRange();
for (int i = 0; i < range; i++) {
int curIndex = (index_ + i + 1) % config_->default_thread_size_;
if (nullptr != (*pool_threads_)[curIndex]
&& ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(tasks, config_->max_steal_batch_size_)) {
return true;
}
}
return false;
}
private:
int index_ {-1}; // 线index
UWorkStealingQueue work_stealing_queue_; //
std::vector<UThreadPrimary *>* pool_threads_; // 线线
friend class UThreadPool;
friend class UAllocator;
};
using UThreadPrimaryPtr = UThreadPrimary *;
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTHREADPRIMARY_H

+ 131
- 0
src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h View File

@ -0,0 +1,131 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadSecondary.h
@Time: 2021/7/8 11:02
@Desc:
***************************/
#ifndef CGRAPH_UTHREADSECONDARY_H
#define CGRAPH_UTHREADSECONDARY_H
#include "UThreadBase.h"
CGRAPH_NAMESPACE_BEGIN
class UThreadSecondary : public UThreadBase {
public:
explicit UThreadSecondary() {
cur_ttl_ = 0;
type_ = CGRAPH_THREAD_TYPE_SECONDARY;
}
protected:
CStatus init() override {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(config_)
cur_ttl_ = config_->secondary_thread_ttl_;
is_init_ = true;
thread_ = std::move(std::thread(&UThreadSecondary::run, this));
setSchedParam();
CGRAPH_FUNCTION_END
}
/**
* pool的信息
* @param poolTaskQueue
* @param poolPriorityTaskQueue
* @param config
* @return
*/
CStatus setThreadPoolInfo(UAtomicQueue<UTask>* poolTaskQueue,
UAtomicPriorityQueue<UTask>* poolPriorityTaskQueue,
UThreadPoolConfigPtr config) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false) //
CGRAPH_ASSERT_NOT_NULL(poolTaskQueue)
CGRAPH_ASSERT_NOT_NULL(poolPriorityTaskQueue)
CGRAPH_ASSERT_NOT_NULL(config)
this->pool_task_queue_ = poolTaskQueue;
this->pool_priority_task_queue_ = poolPriorityTaskQueue;
this->config_ = config;
CGRAPH_FUNCTION_END
}
CStatus run() override {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
CGRAPH_ASSERT_NOT_NULL(config_)
if (config_->calcBatchTaskRatio()) {
while (done_) {
processTasks(); //
}
} else {
while (done_) {
processTask(); //
}
}
CGRAPH_FUNCTION_END
}
/**
* 线
*/
CVoid processTask() {
UTask task;
if (popPoolTask(task)) {
runTask(task);
} else {
std::this_thread::yield();
}
}
/**
* n个任务
*/
CVoid processTasks() {
UTaskArr tasks;
if (popPoolTask(tasks)) {
runTasks(tasks);
} else {
std::this_thread::yield();
}
}
/**
* 线
* @return
*/
bool freeze() {
if (likely(is_running_)) {
cur_ttl_++;
cur_ttl_ = std::min(cur_ttl_, config_->secondary_thread_ttl_);
} else {
cur_ttl_--; // 线ttl-1
}
return cur_ttl_ <= 0;
}
private:
int cur_ttl_ = 0; //
friend class UThreadPool;
};
using UThreadSecondaryPtr = UThreadSecondary *;
CGRAPH_NAMESPACE_END
#endif // CGRAPH_UTHREADSECONDARY_H

+ 30
- 0
src/UtilsCtrl/ThreadPool/UThreadObject.h View File

@ -0,0 +1,30 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: CThreadObject.h
@Time: 2021/7/2 10:39
@Desc:
***************************/
#ifndef CGRAPH_UTHREADOBJECT_H
#define CGRAPH_UTHREADOBJECT_H
#include "../UtilsObject.h"
CGRAPH_NAMESPACE_BEGIN
class UThreadObject : public UtilsObject {
protected:
/**
* thread中的算子run方法
* @return
*/
CStatus run() override {
CGRAPH_NO_SUPPORT
}
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTHREADOBJECT_H

+ 201
- 0
src/UtilsCtrl/ThreadPool/UThreadPool.cpp View File

@ -0,0 +1,201 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadPool.cpp
@Time: 2022/10/3 17:31
@Desc:
***************************/
#include "UThreadPool.h"
CGRAPH_NAMESPACE_BEGIN
UThreadPool::UThreadPool(CBool autoInit, const UThreadPoolConfig& config) noexcept {
cur_index_ = 0;
is_init_ = false;
input_task_num_ = 0;
this->setConfig(config); // setConfig 函数,用在 is_init_ 设定之后
is_monitor_ = config_.monitor_enable_; /** 根据参数设定,决定是否开启监控线程。默认开启 */
/**
* CGraph
* windows平台上Visual Studio(2017) UThreadPool .dll文件时
* https://github.com/ChunelFeng/CGraph/issues/17
*/
monitor_thread_ = std::move(std::thread(&UThreadPool::monitor, this));
if (autoInit) {
this->init();
}
}
UThreadPool::~UThreadPool() {
is_monitor_ = false; // 在析构的时候,才释放监控线程。先释放监控线程,再释放其他的线程
if (monitor_thread_.joinable()) {
monitor_thread_.join();
}
destroy();
}
CStatus UThreadPool::setConfig(const UThreadPoolConfig &config) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false) // 初始化后,无法设置参数信息
this->config_ = config;
CGRAPH_FUNCTION_END
}
CStatus UThreadPool::init() {
CGRAPH_FUNCTION_BEGIN
if (is_init_) {
CGRAPH_FUNCTION_END
}
primary_threads_.reserve(config_.default_thread_size_);
for (int i = 0; i < config_.default_thread_size_; i++) {
auto ptr = CGRAPH_SAFE_MALLOC_COBJECT(UThreadPrimary) // 创建核心线程数
ptr->setThreadPoolInfo(i, &task_queue_, &primary_threads_, &config_);
status += ptr->init();
primary_threads_.emplace_back(ptr);
}
CGRAPH_FUNCTION_CHECK_STATUS
is_init_ = true;
CGRAPH_FUNCTION_END
}
CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
std::vector<std::future<CVoid>> futures;
for (const auto& task : taskGroup.task_arr_) {
futures.emplace_back(commit(task));
}
// 计算最终运行时间信息
auto deadline = std::chrono::system_clock::now()
+ std::chrono::milliseconds(std::min(taskGroup.getTtl(), ttl));
for (auto& fut : futures) {
const auto& futStatus = fut.wait_until(deadline);
switch (futStatus) {
case std::future_status::ready: break; // 正常情况,直接返回了
case std::future_status::timeout: status += CStatus("thread status timeout"); break;
case std::future_status::deferred: status += CStatus("thread status deferred"); break;
default: status += CStatus("thread status unknown");
}
}
if (taskGroup.on_finished_) {
taskGroup.on_finished_(status);
}
CGRAPH_FUNCTION_END
}
CStatus UThreadPool::submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func, CMSec ttl,
CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished) {
return submit(UTaskGroup(func, ttl, onFinished));
}
CStatus UThreadPool::destroy() {
CGRAPH_FUNCTION_BEGIN
if (!is_init_) {
CGRAPH_FUNCTION_END
}
// primary 线程是普通指针,需要delete
for (auto &pt : primary_threads_) {
status += pt->destroy();
CGRAPH_DELETE_PTR(pt)
}
CGRAPH_FUNCTION_CHECK_STATUS
primary_threads_.clear();
// secondary 线程是智能指针,不需要delete
for (auto &st : secondary_threads_) {
status += st->destroy();
}
CGRAPH_FUNCTION_CHECK_STATUS
secondary_threads_.clear();
is_init_ = false;
CGRAPH_FUNCTION_END
}
CIndex UThreadPool::dispatch(CIndex origIndex) {
if (unlikely(config_.fair_lock_enable_)) {
return CGRAPH_DEFAULT_TASK_STRATEGY; // 如果开启fair lock,则全部写入 pool的queue中,依次执行
}
CIndex realIndex = 0;
if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) {
/**
* [0, default_thread_size_) thread queue来调度
* [default_thread_size_, max_thread_size_) pool queue来调度
*/
realIndex = cur_index_++;
if (cur_index_ >= config_.max_thread_size_ || cur_index_ < 0) {
cur_index_ = 0;
}
} else {
realIndex = origIndex;
}
return realIndex; // 交到上游去判断,走哪个线程
}
CStatus UThreadPool::createSecondaryThread(CInt size) {
CGRAPH_FUNCTION_BEGIN
int leftSize = (int)(config_.max_thread_size_ - config_.default_thread_size_ - secondary_threads_.size());
int realSize = std::min(size, leftSize); // 使用 realSize 来确保所有的线程数量之和,不会超过设定max值
for (int i = 0; i < realSize; i++) {
auto ptr = CGRAPH_MAKE_UNIQUE_COBJECT(UThreadSecondary)
ptr->setThreadPoolInfo(&task_queue_, &priority_task_queue_, &config_);
status += ptr->init();
secondary_threads_.emplace_back(std::move(ptr));
}
CGRAPH_FUNCTION_END
}
CVoid UThreadPool::monitor() {
while (is_monitor_) {
while (is_monitor_ && !is_init_) {
// 如果没有init,则一直处于空跑状态
CGRAPH_SLEEP_SECOND(1)
}
int span = config_.monitor_span_;
while (is_monitor_ && is_init_ && span--) {
CGRAPH_SLEEP_SECOND(1) // 保证可以快速退出
}
// 如果 primary线程都在执行,则表示忙碌
bool busy = std::all_of(primary_threads_.begin(), primary_threads_.end(),
[](UThreadPrimaryPtr ptr) { return nullptr != ptr && ptr->is_running_; });
// 如果忙碌或者priority_task_queue_中有任务,则需要添加 secondary线程
if (busy || !priority_task_queue_.empty()) {
createSecondaryThread(1);
}
// 判断 secondary 线程是否需要退出
for (auto iter = secondary_threads_.begin(); iter != secondary_threads_.end(); ) {
(*iter)->freeze() ? secondary_threads_.erase(iter++) : iter++;
}
}
}
CGRAPH_NAMESPACE_END

+ 152
- 0
src/UtilsCtrl/ThreadPool/UThreadPool.h View File

@ -0,0 +1,152 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadPool.h
@Time: 2021/7/4 1:34
@Desc:
***************************/
#ifndef CGRAPH_UTHREADPOOL_H
#define CGRAPH_UTHREADPOOL_H
#include <vector>
#include <list>
#include <future>
#include <thread>
#include <algorithm>
#include <memory>
#include <functional>
#include "UThreadObject.h"
#include "UThreadPoolConfig.h"
#include "Queue/UQueueInclude.h"
#include "Thread/UThreadInclude.h"
#include "Task/UTaskInclude.h"
CGRAPH_NAMESPACE_BEGIN
class UThreadPool : public UThreadObject {
public:
/**
* 线
* @param autoInit 线
* @param config
*/
explicit UThreadPool(CBool autoInit = true,
const UThreadPoolConfig& config = UThreadPoolConfig()) noexcept;
/**
*
*/
~UThreadPool() override;
/**
* 线init()
* @param config
* @return
* @notice (UThreadPoolSingleton)线线init destroy
*/
CStatus setConfig(const UThreadPoolConfig &config);
/**
* 线
* @return
*/
CStatus init() final;
/**
*
* @tparam FunctionType
* @param func
* @param index
* @return
*/
template<typename FunctionType>
auto commit(const FunctionType& func,
CIndex index = CGRAPH_DEFAULT_TASK_STRATEGY)
-> std::future<typename std::result_of<FunctionType()>::type>;
/**
*
* @tparam FunctionType
* @param func
* @param priority
* @return
* @notice priority [-100, 100]
*/
template<typename FunctionType>
auto commitWithPriority(const FunctionType& func,
int priority)
-> std::future<typename std::result_of<FunctionType()>::type>;
/**
*
* taskGroup内部ttl和入参ttl的最小值ttl标准
* @param taskGroup
* @param ttl
* @return
*/
CStatus submit(const UTaskGroup& taskGroup,
CMSec ttl = CGRAPH_MAX_BLOCK_TTL);
/**
*
* @param task
* @param ttl
* @param onFinished
* @return
*/
CStatus submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func,
CMSec ttl = CGRAPH_MAX_BLOCK_TTL,
CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished = nullptr);
/**
* 线
* @return
*/
CStatus destroy() final;
protected:
/**
*
* @param origIndex
* @return
*/
virtual CIndex dispatch(CIndex origIndex);
/**
* 线线
* @param size
* @return
*/
CStatus createSecondaryThread(CInt size);
/**
* 线线线
* / secondary类型线程生效
*/
CVoid monitor();
CGRAPH_NO_ALLOWED_COPY(UThreadPool)
protected:
CBool is_init_ { false }; //
CBool is_monitor_ { true }; //
CInt cur_index_ = 0; // 线
CULong input_task_num_ = 0; //
UAtomicQueue<UTask> task_queue_; //
UAtomicPriorityQueue<UTask> priority_task_queue_; // 线
std::vector<UThreadPrimaryPtr> primary_threads_; // 线
std::list<std::unique_ptr<UThreadSecondary>> secondary_threads_; // 线
UThreadPoolConfig config_; // 线
std::thread monitor_thread_; // 线
};
using UThreadPoolPtr = UThreadPool *;
CGRAPH_NAMESPACE_END
#include "UThreadPool.inl"
#endif //CGRAPH_UTHREADPOOL_H

+ 62
- 0
src/UtilsCtrl/ThreadPool/UThreadPool.inl View File

@ -0,0 +1,62 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadPool.inl
@Time: 2021/7/4 1:34 下午
@Desc:
***************************/
#ifndef CGRAPH_UTHREADPOOL_INL
#define CGRAPH_UTHREADPOOL_INL
#include "UThreadPool.h"
CGRAPH_NAMESPACE_BEGIN
template<typename FunctionType>
auto UThreadPool::commit(const FunctionType& func, CIndex index)
-> std::future<typename std::result_of<FunctionType()>::type> {
using ResultType = typename std::result_of<FunctionType()>::type;
std::packaged_task<ResultType()> task(func);
std::future<ResultType> result(task.get_future());
CIndex realIndex = dispatch(index);
if (realIndex >= 0 && realIndex < config_.default_thread_size_) {
// 如果返回的结果,在主线程数量之间,则放到主线程的queue中执行
primary_threads_[realIndex]->work_stealing_queue_.push(std::move(task));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
/**
* 如果是长时间任务,则交给特定的任务队列,仅由辅助线程处理
* 目的是防止有很多长时间任务,将所有运行的线程均阻塞
* 长任务程序,默认优先级较低
**/
priority_task_queue_.push(std::move(task), CGRAPH_LONG_TIME_TASK_STRATEGY);
} else {
// 返回其他结果,放到pool的queue中执行
task_queue_.push(std::move(task));
}
input_task_num_++; // 计数
return result;
}
template<typename FunctionType>
auto UThreadPool::commitWithPriority(const FunctionType& func, int priority)
-> std::future<typename std::result_of<FunctionType()>::type> {
using ResultType = typename std::result_of<FunctionType()>::type;
std::packaged_task<ResultType()> task(func);
std::future<ResultType> result(task.get_future());
if (secondary_threads_.empty()) {
createSecondaryThread(1); // 如果没有开启辅助线程,则直接开启一个
}
priority_task_queue_.push(std::move(task), priority);
input_task_num_++;
return result;
}
CGRAPH_NAMESPACE_END
#endif // CGRAPH_UTHREADPOOL_INL

+ 66
- 0
src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h View File

@ -0,0 +1,66 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UThreadPoolConfig.h
@Time: 2022/1/3 9:31
@Desc: 线
***************************/
#ifndef CGRAPH_UTHREADPOOLCONFIG_H
#define CGRAPH_UTHREADPOOLCONFIG_H
#include "UThreadObject.h"
#include "UThreadPoolDefine.h"
CGRAPH_NAMESPACE_BEGIN
struct UThreadPoolConfig : public UThreadObject {
/** 具体值含义,参考UThreadPoolDefine.h文件 */
int default_thread_size_ = CGRAPH_DEFAULT_THREAD_SIZE;
int max_thread_size_ = CGRAPH_MAX_THREAD_SIZE;
int max_task_steal_range_ = CGRAPH_MAX_TASK_STEAL_RANGE;
int max_local_batch_size_ = CGRAPH_MAX_LOCAL_BATCH_SIZE;
int max_pool_batch_size_ = CGRAPH_MAX_POOL_BATCH_SIZE;
int max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE;
int secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL;
int monitor_span_ = CGRAPH_MONITOR_SPAN;
int primary_thread_policy_ = CGRAPH_PRIMARY_THREAD_POLICY;
int secondary_thread_policy_ = CGRAPH_SECONDARY_THREAD_POLICY;
int primary_thread_priority_ = CGRAPH_PRIMARY_THREAD_PRIORITY;
int secondary_thread_priority_ = CGRAPH_SECONDARY_THREAD_PRIORITY;
bool bind_cpu_enable_ = CGRAPH_BIND_CPU_ENABLE;
bool batch_task_enable_ = CGRAPH_BATCH_TASK_ENABLE;
bool fair_lock_enable_ = CGRAPH_FAIR_LOCK_ENABLE;
bool monitor_enable_ = CGRAPH_MONITOR_ENABLE;
protected:
/**
* 线-1
* @return
*/
[[nodiscard]] int calcStealRange() const {
int range = std::min(this->max_task_steal_range_, this->default_thread_size_ - 1);
return range;
}
/**
*
*
* @return
*/
[[nodiscard]] bool calcBatchTaskRatio() const {
bool ratio = (this->batch_task_enable_) && (!this->fair_lock_enable_);
return ratio;
}
friend class UThreadPrimary;
friend class UThreadSecondary;
};
using UThreadPoolConfigPtr = UThreadPoolConfig *;
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTHREADPOOLCONFIG_H

+ 78
- 0
src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h View File

@ -0,0 +1,78 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: ThreadPoolDefine.h
@Time: 2021/7/3 12:24
@Desc:
***************************/
#ifndef CGRAPH_UTHREADPOOLDEFINE_H
#define CGRAPH_UTHREADPOOLDEFINE_H
#include <thread>
#if _LIBCPP_STD_VER >= 17
#include <shared_mutex>
#else
# include <mutex>
#endif
#include <memory>
CGRAPH_NAMESPACE_BEGIN
#if _LIBCPP_STD_VER >= 17
using CGRAPH_READ_LOCK = std::shared_lock<std::shared_mutex>;
using CGRAPH_WRITE_LOCK = std::unique_lock<std::shared_mutex>;
#else
using CGRAPH_READ_LOCK = std::unique_lock<std::mutex>; // C++14使mutex替代
using CGRAPH_WRITE_LOCK = std::unique_lock<std::mutex>;
#endif
using CGRAPH_LOCK_GUARD = std::lock_guard<std::mutex>;
using CGRAPH_UNIQUE_LOCK = std::unique_lock<std::mutex>;
static const int CGRAPH_CPU_NUM = (int)std::thread::hardware_concurrency();
static const int CGRAPH_THREAD_TYPE_PRIMARY = 1;
static const int CGRAPH_THREAD_TYPE_SECONDARY = 2;
#ifndef _WIN32
static const int CGRAPH_THREAD_SCHED_OTHER = SCHED_OTHER;
static const int CGRAPH_THREAD_SCHED_RR = SCHED_RR;
static const int CGRAPH_THREAD_SCHED_FIFO = SCHED_FIFO;
#else
/** 线程调度策略,暂不支持windows系统 */
static const int CGRAPH_THREAD_SCHED_OTHER = 0;
static const int CGRAPH_THREAD_SCHED_RR = 0;
static const int CGRAPH_THREAD_SCHED_FIFO = 0;
#endif
static const int CGRAPH_THREAD_MIN_PRIORITY = 0; // 线
static const int CGRAPH_THREAD_MAX_PRIORITY = 99; // 线
static const CMSec CGRAPH_MAX_BLOCK_TTL = 10000000; // ms
static const int CGRAPH_DEFAULT_TASK_STRATEGY = -1; // 线
static const int CGRAPH_LONG_TIME_TASK_STRATEGY = -101; //
/**
* 线
*/
static const int CGRAPH_DEFAULT_THREAD_SIZE = (CGRAPH_CPU_NUM > 0) ? CGRAPH_CPU_NUM : 8; // 线
static const int CGRAPH_MAX_THREAD_SIZE = (CGRAPH_DEFAULT_THREAD_SIZE * 2) + 1; // 线
#ifndef _WIN32
static const int CGRAPH_MAX_TASK_STEAL_RANGE = 2; //
#else
static const int CGRAPH_MAX_TASK_STEAL_RANGE = 0; // windows平台暂不支持任务盗取功能
#endif
static const bool CGRAPH_BATCH_TASK_ENABLE = false; //
static const int CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; //
static const int CGRAPH_MAX_POOL_BATCH_SIZE = 2; //
static const int CGRAPH_MAX_STEAL_BATCH_SIZE = 2; //
static const bool CGRAPH_FAIR_LOCK_ENABLE = false; // CGRAPH_BATCH_TASK_ENABLE无效
static const int CGRAPH_SECONDARY_THREAD_TTL = 10; // 线ttls
static const bool CGRAPH_MONITOR_ENABLE = true; //
static const int CGRAPH_MONITOR_SPAN = 5; // 线s
static const bool CGRAPH_BIND_CPU_ENABLE = true; // cpu模式线
static const int CGRAPH_PRIMARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 线
static const int CGRAPH_SECONDARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 线
static const int CGRAPH_PRIMARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 线0~99
static const int CGRAPH_SECONDARY_THREAD_PRIORITY = CGRAPH_THREAD_MIN_PRIORITY; // 线0~99
CGRAPH_NAMESPACE_END
#endif // CGRAPH_UTHREADPOOLDEFINE_H

+ 20
- 0
src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h View File

@ -0,0 +1,20 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: ThreadPoolInclude.h
@Time: 2021/7/3 12:25
@Desc:
***************************/
#ifndef CGRAPH_UTHREADPOOLINCLUDE_H
#define CGRAPH_UTHREADPOOLINCLUDE_H
#include "UThreadObject.h"
#include "UThreadPool.h"
#include "UThreadPoolDefine.h"
#include "UThreadPoolConfig.h"
#include "Queue/UQueueInclude.h"
#include "Task/UTaskInclude.h"
#include "Thread/UThreadInclude.h"
#endif //CGRAPH_UTHREADPOOLINCLUDE_H

+ 87
- 0
src/UtilsCtrl/UAllocator.h View File

@ -0,0 +1,87 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UAllocator.h
@Time: 2021/10/28 9:15
@Desc:
***************************/
#ifndef CGRAPH_UALLOCATOR_H
#define CGRAPH_UALLOCATOR_H
#ifdef _GENERATE_SESSION_
#include <uuid/uuid.h>
#endif
#include <mutex>
#include <memory>
CGRAPH_NAMESPACE_BEGIN
static std::mutex g_session_mtx;
/**
* CObject类型的类
*/
class UAllocator : public CObject {
public:
/**
*
* @tparam T
* @return
*/
template<typename T,
std::enable_if_t<std::is_base_of<CObject, T>::value, int> = 0>
static T* safeMallocCObject() {
T* ptr = nullptr;
while (!ptr) {
ptr = new(std::nothrow) T();
}
return ptr;
}
/**
* unique智能指针信息
* @tparam T
* @return
*/
template<typename T,
std::enable_if_t<std::is_base_of<CObject, T>::value, int> = 0>
static std::unique_ptr<T> makeUniqueCObject() {
return std::make_unique<T>();
}
/**
*
* @return
*/
static std::string generateSession() {
#ifdef _GENERATE_SESSION_
std::lock_guard<std::mutex> lock{ g_session_mtx };
uuid_t uuid;
char session[36] = {0}; // 36
uuid_generate(uuid);
uuid_unparse(uuid, session);
return session;
#else
return CGRAPH_EMPTY; // mac平台session信息
#endif
}
};
#define CGRAPH_SAFE_MALLOC_COBJECT(Type) \
UAllocator::safeMallocCObject<Type>(); \
#define CGRAPH_MAKE_UNIQUE_COBJECT(Type) \
UAllocator::makeUniqueCObject<Type>(); \
#define CGRAPH_GENERATE_SESSION \
UAllocator::generateSession(); \
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UALLOCATOR_H

+ 75
- 0
src/UtilsCtrl/UtilsDefine.h View File

@ -0,0 +1,75 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UtilsDefine.h
@Time: 2021/4/30 8:52
@Desc:
***************************/
#ifndef CGRAPH_UTILSDEFINE_H
#define CGRAPH_UTILSDEFINE_H
#include <iostream>
#include <string>
#include "../CBasic/CBasicInclude.h"
#include "UAllocator.h"
#include "UtilsFunction.h"
CGRAPH_NAMESPACE_BEGIN
#ifdef _ENABLE_LIKELY_
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
#else
#define likely
#define unlikely
#endif
/* 判断传入的指针信息是否为空 */
#define CGRAPH_ASSERT_NOT_NULL(ptr) \
if (unlikely(nullptr == (ptr))) { \
return CStatus("ptr is nullptr"); \
} \
#define CGRAPH_ASSERT_NOT_NULL_RETURN_NULL(ptr) \
if (unlikely(nullptr == (ptr))) { \
return nullptr; \
} \
/* 判断函数流程是否可以继续 */
static std::mutex g_check_status_mtx;
#define CGRAPH_FUNCTION_CHECK_STATUS \
if (unlikely(!status.isOK())) { \
std::lock_guard<std::mutex> lock{ g_check_status_mtx }; \
CGRAPH_ECHO("%s | %s | line = [%d], errorCode = [%d], errorInfo = [%s].", \
__FILE__, __FUNCTION__, __LINE__, status.getCode(), status.getInfo().c_str()); \
return status; \
} \
/* 删除资源信息 */
#define CGRAPH_DELETE_PTR(ptr) \
if (unlikely((ptr) != nullptr)) { \
delete (ptr); \
(ptr) = nullptr; \
} \
#define CGRAPH_ASSERT_INIT(isInit) \
if (unlikely((isInit) != is_init_)) { \
return CStatus("init status is not suitable"); \
} \
#define CGRAPH_ASSERT_INIT_RETURN_NULL(isInit) \
if (unlikely((isInit) != is_init_)) { \
return nullptr; \
} \
#define CGRAPH_SLEEP_MILLISECOND(ms) \
std::this_thread::sleep_for(std::chrono::milliseconds(ms)); \
#define CGRAPH_SLEEP_SECOND(s) \
std::this_thread::sleep_for(std::chrono::seconds(s)); \
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTILSDEFINE_H

+ 131
- 0
src/UtilsCtrl/UtilsFunction.h View File

@ -0,0 +1,131 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UtilsFunction.h
@Time: 2022/1/26 11:27
@Desc:
***************************/
#ifndef CGRAPH_UTILSFUNCTION_H
#define CGRAPH_UTILSFUNCTION_H
#include <mutex>
#include <chrono>
#include <ctime>
#include <cstdarg>
#include <algorithm>
CGRAPH_NAMESPACE_BEGIN
/**
*
* @param cmd
* @param ...
* 线使
*/
static std::mutex g_echo_mtx;
inline CVoid CGRAPH_ECHO(const char *cmd, ...) {
#ifdef _CGRAPH_SILENCE_
return;
#endif
std::lock_guard<std::mutex> lock{ g_echo_mtx };
#ifndef _WIN32
// windows系统
auto now = std::chrono::system_clock::now();
//
uint64_t disMs = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count()
- std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count() * 1000;
time_t tt = std::chrono::system_clock::to_time_t(now);
auto localTime = localtime(&tt);
char strTime[32] = { 0 };
sprintf(strTime, "[%04d-%02d-%02d %02d:%02d:%02d.%03d]", localTime->tm_year + 1900,
localTime->tm_mon + 1, localTime->tm_mday, localTime->tm_hour,
localTime->tm_min, localTime->tm_sec, (int)disMs);
std::cout << "[CGraph] " << strTime << " ";
#else
// windows系统
time_t curTime;
time(&curTime);
std::string ct = ctime(&curTime);
std::cout << "[CGraph] ["
<< ct.assign(ct.begin(), ct.end()-1) // \n信息
<< "] ";
#endif
va_list args;
va_start(args, cmd);
vprintf(cmd, args);
va_end(args);
std::cout << "\n";
}
/**
*
* @tparam T (std::vector<int>)
* @param container
* @return
*/
template<typename T>
typename T::value_type CGRAPH_CONTAINER_SUM(const T& container) {
typename T::value_type result = 0;
for (const auto& val : container) {
result += val;
}
return result;
}
/**
*
* @tparam T (std::vector<int>)
* @param container
* @return
*/
template<typename T>
typename T::value_type CGRAPH_CONTAINER_MULTIPLY(const T& container) {
typename T::value_type result = 1;
for (const auto& val : container) {
result *= val;
}
return result;
}
/**
* max值
* @tparam T
* @param value
* @return
*/
template <typename T>
T CGRAPH_MAX(T val) {
return val;
}
template <typename T, typename... Args>
T CGRAPH_MAX(T val, Args... args) {
return std::max(val, CGRAPH_MAX(args...));
}
/**
*
* @tparam T
* @param t
* @return
*/
template<typename T>
T CGRAPH_SUM(T t) {
return t;
}
template<typename T, typename... Args>
T CGRAPH_SUM(T val, Args... args) {
return val + CGRAPH_SUM(args...);
}
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTILSFUNCTION_H

+ 16
- 0
src/UtilsCtrl/UtilsInclude.h View File

@ -0,0 +1,16 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UtilsInclude.h
@Time: 2021/4/30 9:14
@Desc:
***************************/
#ifndef CGRAPH_UTILSINCLUDE_H
#define CGRAPH_UTILSINCLUDE_H
#include "UtilsDefine.h"
#include "UAllocator.h"
#include "ThreadPool/UThreadPoolInclude.h"
#endif //CGRAPH_UTILSINCLUDE_H

+ 26
- 0
src/UtilsCtrl/UtilsObject.h View File

@ -0,0 +1,26 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: UtilsObject.h
@Time: 2021/9/19 12:00
@Desc:
***************************/
#ifndef CGRAPH_UTILSOBJECT_H
#define CGRAPH_UTILSOBJECT_H
#include "UtilsDefine.h"
CGRAPH_NAMESPACE_BEGIN
class UtilsObject : public CObject {
public:
CStatus run() override {
CGRAPH_NO_SUPPORT
}
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_UTILSOBJECT_H

+ 136
- 0
tutorial.cpp View File

@ -0,0 +1,136 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: main.cpp
@Time: 2021/9/2 11:12
@Desc: threadpool工具的使用方法
***************************/
#include "src/CThreadPool.h"
#include "MyFunction.h"
using namespace CTP;
void tutorial_threadpool_1(UThreadPoolPtr tp) {
/**
* 线
* 1
* 2
* 3
* 4
*/
int i = 6, j = 3;
std::string str = "5";
MyFunction mf;
auto r1 = tp->commit([i, j] { return add(i, j); }); // 可以通过lambda表达式传递函数
std::future<float> r2 = tp->commit(std::bind(minusBy5, 8.5f)); // 可以传入任意个数的入参
auto r3 = tp->commit(std::bind(&MyFunction::concat, mf, str)); // 返回值可以是任意类型
std::future<int> r4 = tp->commit([i, j] { return MyFunction::multiply(i, j); }); // 返回值实际上是std::future<T>类型
std::cout << r1.get() << std::endl; // 返回值可以是int类型
std::cout << r2.get() << std::endl; // 等待r2对应函数执行完毕后,再继续执行。不调用get()为不等待
std::cout << r3.get() << std::endl; // 返回值也可是string或其他任意类型
std::cout << r4.get() << std::endl;
}
void tutorial_threadpool_2(UThreadPoolPtr tp) {
/**
* (taskGroup)
*/
UTaskGroup taskGroup;
/** 添加一个不耗时的任务 */
int i = 1, j = 2, k = 3;
auto hcg = [] { CGRAPH_ECHO("Hello, CGraph."); };
taskGroup.addTask(hcg);
/** 添加一个耗时为1000ms的任务 */
taskGroup.addTask([i, j] {
int result = i + j;
CGRAPH_SLEEP_MILLISECOND(1000)
CGRAPH_ECHO("sleep for 1 second, [%d] + [%d] = [%d], run success.", i, j, result);
});
taskGroup.addTask([i, j, k] {
int result = i - j + k;
CGRAPH_SLEEP_MILLISECOND(2000)
CGRAPH_ECHO("sleep for 2 second, [%d] - [%d] + [%d] = [%d], run success.", i, j, k, result);
return result; // submit接口,不会对线程函数返回值进行判断。如果需要判断,考虑commit方式
});
/** 如果添加耗时3000ms的任务,则整体执行失败 */
/* taskGroup.addTask([] {
CGRAPH_SLEEP_MILLISECOND(3000)
}); */
/**
* task group结束后的回调信息
* sts是task group整体执行结果的返回值信息
* */
/* taskGroup.setOnFinished([] (const CStatus& sts) {
if(sts.isOK()) {
CGRAPH_ECHO("task group run success.");
} else {
CGRAPH_ECHO("task group run failed, error info is [%s].", sts.getInfo().c_str());
}
}); */
/**
* =2500ms
* sleep(3000)2500ms的时候退出
* status中提示超时信息
* */
CStatus status = tp->submit(taskGroup, 2500);
CGRAPH_ECHO("task group run status is [%d].", status.getCode());
}
void tutorial_threadpool_3(UThreadPoolPtr tp) {
/**
* 0~100
* 使commit和submit函数的区别
* 1commit()线future的类型返回
* 2submit()线
* 3线
*/
const int size = 100;
CGRAPH_ECHO("thread pool task submit version : ");
for (int i = 0; i < size; i++) {
tp->submit([i] { std::cout << i << " "; }); // 可以看到,submit版本是有序执行的。如果需要想要无序执行,可以通过创建taskGroup的方式进行,或者使用commit方法
}
CGRAPH_SLEEP_SECOND(1) // 等待上面函数执行完毕,以便于观察结果。无实际意义
std::cout << "\r\n";
CGRAPH_ECHO("thread pool task group submit version : ");
UTaskGroup taskGroup;
for (int i = 0; i < size; i++) {
taskGroup.addTask([i] { std::cout << i << " "; }); // 将任务放到一个taskGroup中,并发执行。执行的结果是无序的
}
tp->submit(taskGroup);
CGRAPH_SLEEP_SECOND(1)
std::cout << "\r\n";
CGRAPH_ECHO("thread pool task commit version : ");
for (int i = 0; i < size; i++) {
tp->commit([i] { std::cout << i << " "; }); // commit版本,是无序执行的
}
CGRAPH_SLEEP_SECOND(1)
std::cout << "\r\n";
}
int main() {
auto pool = std::make_unique<UThreadPool>(); // 构造一个线程池类的智能指针
CGRAPH_ECHO("======== tutorial_threadpool_1 begin. ========");
tutorial_threadpool_1(pool.get());
CGRAPH_ECHO("======== tutorial_threadpool_2 begin. ========");
tutorial_threadpool_2(pool.get());
CGRAPH_ECHO("======== tutorial_threadpool_3 begin. ========");
tutorial_threadpool_3(pool.get());
return 0;
}

Loading…
Cancel
Save