Browse Source

feat: 优化主线程执行策略,提升整体性能

ChunelFeng 1 year ago
parent
commit
d6306ef444
13 changed files with 271 additions and 46 deletions
  1. +5
    -0
      .gitattributes
  2. +17
    -1
      src/CBasic/CStatus.h
  3. +2
    -2
      src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h
  4. +77
    -0
      src/UtilsCtrl/ThreadPool/Queue/ULockFreeRingBufferQueue.h
  5. +1
    -0
      src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h
  6. +55
    -19
      src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h
  7. +51
    -0
      src/UtilsCtrl/ThreadPool/Semaphore/USemaphore.h
  8. +52
    -18
      src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
  9. +1
    -1
      src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h
  10. +1
    -1
      src/UtilsCtrl/ThreadPool/UThreadPool.cpp
  11. +3
    -1
      src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h
  12. +5
    -3
      src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
  13. +1
    -0
      src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h

+ 5
- 0
.gitattributes View File

@ -0,0 +1,5 @@
# To shield the difference between Windows and Linux systems.
*.h text eol=native
*.cpp text eol=native
*.inl text eol=native

+ 17
- 1
src/CBasic/CStatus.h View File

@ -46,18 +46,34 @@ public:
}
CSTATUS(const CSTATUS &status) {
if (status.isOK()) {
return;
}
this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
this->error_locate_ = status.error_locate_;
}
CSTATUS(const CSTATUS &&status) noexcept {
if (status.isOK()) {
return;
}
this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
this->error_locate_ = status.error_locate_;
}
CSTATUS& operator=(const CSTATUS& status) = default;
CSTATUS& operator=(const CSTATUS& status) {
if (!status.isOK()) {
// status是正常的话
this->error_code_ = status.error_code_;
this->error_info_ = status.error_info_;
this->error_locate_ = status.error_locate_;
}
return (*this);
}
CSTATUS& operator+=(const CSTATUS& cur) {
/**

+ 2
- 2
src/UtilsCtrl/ThreadPool/Queue/UAtomicQueue.h View File

@ -43,7 +43,7 @@ public:
*/
CBool tryPop(T& value) {
CBool result = false;
if (mutex_.try_lock()) {
if (!queue_.empty() && mutex_.try_lock()) {
if (!queue_.empty()) {
value = std::move(*queue_.front());
queue_.pop();
@ -64,7 +64,7 @@ public:
*/
CBool tryPop(std::vector<T>& values, int maxPoolBatchSize) {
CBool result = false;
if (mutex_.try_lock()) {
if (!queue_.empty() && mutex_.try_lock()) {
while (!queue_.empty() && maxPoolBatchSize-- > 0) {
values.emplace_back(std::move(*queue_.front()));
queue_.pop();

+ 77
- 0
src/UtilsCtrl/ThreadPool/Queue/ULockFreeRingBufferQueue.h View File

@ -0,0 +1,77 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: ULockFreeRingBufferQueue.h
@Time: 2023/10/7 21:35
@Desc:
***************************/
#ifndef CGRAPH_ULOCKFREERINGBUFFERQUEUE_H
#define CGRAPH_ULOCKFREERINGBUFFERQUEUE_H
#include <atomic>
#include <memory>
#include "UQueueObject.h"
CGRAPH_NAMESPACE_BEGIN
template<typename T, CInt CAPACITY = 32>
class ULockFreeRingBufferQueue : public UQueueObject {
public:
explicit ULockFreeRingBufferQueue() {
head_ = 0;
tail_ = 0;
ring_buffer_.resize(CAPACITY);
}
~ULockFreeRingBufferQueue() override {
ring_buffer_.clear();
}
/**
*
* @param value
*/
CVoid push(T&& value) {
int curTail = tail_.load(std::memory_order_relaxed);
int nextTail = (curTail + 1) % CAPACITY;
while (nextTail == head_.load(std::memory_order_acquire)) {
// 线
std::this_thread::yield();
}
ring_buffer_[curTail] = std::move(value);
tail_.store(nextTail, std::memory_order_release);
}
/**
*
* @param value
* @return
*/
CBool tryPop(T& value) {
int curHead = head_.load(std::memory_order_relaxed);
if (curHead == tail_.load(std::memory_order_acquire)) {
// false
return false;
}
value = std::move(ring_buffer_[curHead]);
int nextHead = (curHead + 1) % CAPACITY;
head_.store(nextHead, std::memory_order_release);
return true;
}
private:
std::atomic<CInt> head_; //
std::atomic<CInt> tail_; //
std::vector<std::unique_ptr<T> > ring_buffer_; //
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_ULOCKFREERINGBUFFERQUEUE_H

+ 1
- 0
src/UtilsCtrl/ThreadPool/Queue/UQueueInclude.h View File

@ -13,5 +13,6 @@
#include "UWorkStealingQueue.h"
#include "UAtomicPriorityQueue.h"
#include "UAtomicRingBufferQueue.h"
#include "ULockFreeRingBufferQueue.h"
#endif //CGRAPH_UQUEUEINCLUDE_H

+ 55
- 19
src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h View File

@ -13,21 +13,20 @@
#include <deque>
#include "UQueueObject.h"
#include "../Task/UTaskInclude.h"
#include "../Lock/ULockInclude.h"
CGRAPH_NAMESPACE_BEGIN
template<typename T>
class UWorkStealingQueue : public UQueueObject {
public:
/**
*
* @param task
*/
CVoid push(UTask&& task) {
CVoid push(T&& task) {
while (true) {
if (lock_.try_lock()) {
deque_.emplace_front(std::move(task));
deque_.emplace_front(std::forward<T>(task));
lock_.unlock();
break;
} else {
@ -42,10 +41,47 @@ public:
* @param task
* @return
*/
CBool tryPush(UTask&& task) {
CBool tryPush(T&& task) {
CBool result = false;
if (lock_.try_lock()) {
deque_.emplace_back(std::move(task));
deque_.emplace_back(std::forward<T>(task));
lock_.unlock();
result = true;
}
return result;
}
/**
*
* @param task
*/
CVoid push(std::vector<T>& tasks) {
while (true) {
if (lock_.try_lock()) {
for (const auto& task : tasks) {
deque_.emplace_front(std::forward<T>(task));
}
lock_.unlock();
break;
} else {
std::this_thread::yield();
}
}
}
/**
*
* @param tasks
* @return
*/
CBool tryPush(std::vector<T>& tasks) {
CBool result = false;
if (lock_.try_lock()) {
for (const auto& task : tasks) {
deque_.emplace_back(std::forward<T>(task));
}
lock_.unlock();
result = true;
}
@ -58,12 +94,12 @@ public:
* @param task
* @return
*/
CBool tryPop(UTask& task) {
CBool tryPop(T& task) {
// 使raii锁线
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
if (!deque_.empty()) {
task = std::move(deque_.front()); //
task = std::forward<T>(deque_.front()); //
deque_.pop_front();
result = true;
}
@ -80,11 +116,11 @@ public:
* @param maxLocalBatchSize
* @return
*/
CBool tryPop(UTaskArrRef taskArr, int maxLocalBatchSize) {
CBool tryPop(std::vector<T>& taskArr, int maxLocalBatchSize) {
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
while (!deque_.empty() && maxLocalBatchSize--) {
taskArr.emplace_back(std::move(deque_.front()));
taskArr.emplace_back(std::forward<T>(deque_.front()));
deque_.pop_front();
result = true;
}
@ -100,11 +136,11 @@ public:
* @param task
* @return
*/
CBool trySteal(UTask& task) {
CBool trySteal(T& task) {
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
if (!deque_.empty()) {
task = std::move(deque_.back()); //
task = std::forward<T>(deque_.back()); //
deque_.pop_back();
result = true;
}
@ -120,11 +156,11 @@ public:
* @param taskArr
* @return
*/
CBool trySteal(UTaskArrRef taskArr, int maxStealBatchSize) {
CBool trySteal(std::vector<T>& taskArr, int maxStealBatchSize) {
bool result = false;
if (lock_.try_lock()) {
if (!deque_.empty() && lock_.try_lock()) {
while (!deque_.empty() && maxStealBatchSize--) {
taskArr.emplace_back(std::move(deque_.back()));
taskArr.emplace_back(std::forward<T>(deque_.back()));
deque_.pop_back();
result = true;
}
@ -139,7 +175,7 @@ public:
CGRAPH_NO_ALLOWED_COPY(UWorkStealingQueue)
private:
std::deque<UTask> deque_; //
std::deque<T> deque_; //
std::mutex lock_; // deque_的锁
};

+ 51
- 0
src/UtilsCtrl/ThreadPool/Semaphore/USemaphore.h View File

@ -0,0 +1,51 @@
/***************************
@Author: Chunel
@Contact: chunel@foxmail.com
@File: USemaphore.h
@Time: 2023/10/9 22:01
@Desc:
***************************/
#ifndef CGRAPH_USEMAPHORE_H
#define CGRAPH_USEMAPHORE_H
CGRAPH_NAMESPACE_BEGIN
#include <mutex>
#include <condition_variable>
#include "../UThreadObject.h"
class USemaphore : public UThreadObject {
public:
/**
*
*/
CVoid signal() {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cnt_++;
if (cnt_ <= 0) {
cv_.notify_one();
}
}
/**
*
*/
CVoid wait() {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cnt_--;
if (cnt_ < 0) {
cv_.wait(lk);
}
}
private:
CInt cnt_ = 0; //
std::mutex mutex_;
std::condition_variable cv_;
};
CGRAPH_NAMESPACE_END
#endif //CGRAPH_USEMAPHORE_H

+ 52
- 18
src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h View File

@ -9,6 +9,9 @@
#ifndef CGRAPH_UTHREADPRIMARY_H
#define CGRAPH_UTHREADPRIMARY_H
#include <vector>
#include <mutex>
#include "UThreadBase.h"
CGRAPH_NAMESPACE_BEGIN
@ -17,7 +20,6 @@ class UThreadPrimary : public UThreadBase {
protected:
explicit UThreadPrimary() {
index_ = CGRAPH_SECONDARY_THREAD_COMMON_ID;
steal_range_ = 0;
pool_threads_ = nullptr;
type_ = CGRAPH_THREAD_TYPE_PRIMARY;
}
@ -29,7 +31,7 @@ protected:
CGRAPH_ASSERT_NOT_NULL(config_)
is_init_ = true;
steal_range_ = config_->calcStealRange();
buildStealTargets();
thread_ = std::move(std::thread(&UThreadPrimary::run, this));
setSchedParam();
setAffinity(index_);
@ -64,7 +66,7 @@ protected:
* 线
* @return
*/
CStatus run() override {
CStatus run() final {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)
CGRAPH_ASSERT_NOT_NULL(pool_threads_)
@ -91,7 +93,7 @@ protected:
if (popTask(task) || popPoolTask(task) || stealTask(task)) {
runTask(task);
} else {
std::this_thread::yield();
fatWait();
}
}
@ -102,7 +104,22 @@ protected:
// 线/task
runTasks(tasks);
} else {
std::this_thread::yield();
fatWait();
}
}
/**
* task的状态
*
*/
CVoid fatWait() {
cur_empty_epoch_++;
std::this_thread::yield();
if (cur_empty_epoch_ >= config_->primary_thread_busy_epoch_) {
CGRAPH_UNIQUE_LOCK lk(mutex_);
cv_.wait_for(lk, std::chrono::milliseconds(config_->primary_thread_empty_interval_));
cur_empty_epoch_ = 0;
}
}
@ -117,6 +134,7 @@ protected:
|| secondary_queue_.tryPush(std::move(task)))) {
std::this_thread::yield();
}
cv_.notify_one();
}
@ -164,16 +182,15 @@ protected:
* primary线程中窃取
* primary线程数
*/
for (int i = 0; i < steal_range_; i++) {
for (auto& target : steal_targets_) {
/**
* 线thread中
* true
* steal
*/
int curIndex = (index_ + i + 1) % config_->default_thread_size_;
if (likely((*pool_threads_)[curIndex])
&& (((*pool_threads_)[curIndex])->secondary_queue_.trySteal(task))
|| ((*pool_threads_)[curIndex])->primary_queue_.trySteal(task)) {
if (likely((*pool_threads_)[target])
&& (((*pool_threads_)[target])->secondary_queue_.trySteal(task))
|| ((*pool_threads_)[target])->primary_queue_.trySteal(task)) {
return true;
}
}
@ -192,13 +209,12 @@ protected:
return false;
}
for (int i = 0; i < steal_range_; i++) {
int curIndex = (index_ + i + 1) % config_->default_thread_size_;
if (likely((*pool_threads_)[curIndex])) {
bool result = ((*pool_threads_)[curIndex])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_);
for (auto& target : steal_targets_) {
if (likely((*pool_threads_)[target])) {
bool result = ((*pool_threads_)[target])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_);
auto leftSize = config_->max_steal_batch_size_ - tasks.size();
if (leftSize > 0) {
result |= ((*pool_threads_)[curIndex])->primary_queue_.trySteal(tasks, leftSize);
result |= ((*pool_threads_)[target])->primary_queue_.trySteal(tasks, leftSize);
}
if (result) {
@ -216,12 +232,30 @@ protected:
return false;
}
/**
* steal target
* @return
*/
CVoid buildStealTargets() {
steal_targets_.clear();
for (int i = 0; i < config_->calcStealRange(); i++) {
auto target = (index_ + i + 1) % config_->default_thread_size_;
steal_targets_.push_back(target);
}
steal_targets_.shrink_to_fit();
}
private:
int index_; // 线index
int steal_range_; //
UWorkStealingQueue primary_queue_; //
UWorkStealingQueue secondary_queue_; //
int cur_empty_epoch_ = 0; //
UWorkStealingQueue<UTask> primary_queue_; //
UWorkStealingQueue<UTask> secondary_queue_; //
std::vector<UThreadPrimary *>* pool_threads_; // 线线
std::vector<int> steal_targets_; //
std::mutex mutex_;
std::condition_variable cv_;
friend class UThreadPool;
friend class UAllocator;

+ 1
- 1
src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h View File

@ -56,7 +56,7 @@ protected:
}
CStatus run() override {
CStatus run() final {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)

+ 1
- 1
src/UtilsCtrl/ThreadPool/UThreadPool.cpp View File

@ -229,7 +229,7 @@ CVoid UThreadPool::monitor() {
CGRAPH_SLEEP_SECOND(1)
}
int span = config_.monitor_span_;
auto span = config_.monitor_span_;
while (config_.monitor_enable_ && is_init_ && span--) {
CGRAPH_SLEEP_SECOND(1) // 保证可以快速退出
}

+ 3
- 1
src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h View File

@ -23,8 +23,10 @@ struct UThreadPoolConfig : public CStruct {
int max_local_batch_size_ = CGRAPH_MAX_LOCAL_BATCH_SIZE;
int max_pool_batch_size_ = CGRAPH_MAX_POOL_BATCH_SIZE;
int max_steal_batch_size_ = CGRAPH_MAX_STEAL_BATCH_SIZE;
int primary_thread_busy_epoch_ = CGRAPH_PRIMARY_THREAD_BUSY_EPOCH;
int primary_thread_empty_interval_ = CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL;
int secondary_thread_ttl_ = CGRAPH_SECONDARY_THREAD_TTL;
int monitor_span_ = CGRAPH_MONITOR_SPAN;
CSec monitor_span_ = CGRAPH_MONITOR_SPAN;
CMSec queue_emtpy_interval_ = CGRAPH_QUEUE_EMPTY_INTERVAL;
int primary_thread_policy_ = CGRAPH_PRIMARY_THREAD_POLICY;
int secondary_thread_policy_ = CGRAPH_SECONDARY_THREAD_POLICY;

+ 5
- 3
src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h View File

@ -49,17 +49,19 @@ static const int CGRAPH_LONG_TIME_TASK_STRATEGY = -101;
/**
* 线
*/
static const int CGRAPH_DEFAULT_THREAD_SIZE = 0; // 线
static const int CGRAPH_SECONDARY_THREAD_SIZE = 8; // 线
static const int CGRAPH_DEFAULT_THREAD_SIZE = 8; // 线
static const int CGRAPH_SECONDARY_THREAD_SIZE = 0; // 线
static const int CGRAPH_MAX_THREAD_SIZE = 16; // 线
static const int CGRAPH_MAX_TASK_STEAL_RANGE = 2; //
static const bool CGRAPH_BATCH_TASK_ENABLE = false; //
static const int CGRAPH_MAX_LOCAL_BATCH_SIZE = 2; //
static const int CGRAPH_MAX_POOL_BATCH_SIZE = 2; //
static const int CGRAPH_MAX_STEAL_BATCH_SIZE = 2; //
static const int CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 10; // 线wait状态的轮数
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 3; // 线
static const int CGRAPH_SECONDARY_THREAD_TTL = 10; // 线ttls
static const bool CGRAPH_MONITOR_ENABLE = false; //
static const int CGRAPH_MONITOR_SPAN = 5; // 线s
static const CSec CGRAPH_MONITOR_SPAN = 5; // 线s
static const CMSec CGRAPH_QUEUE_EMPTY_INTERVAL = 3; // 线ms
static const bool CGRAPH_BIND_CPU_ENABLE = false; // cpu模式线
static const int CGRAPH_PRIMARY_THREAD_POLICY = CGRAPH_THREAD_SCHED_OTHER; // 线

+ 1
- 0
src/UtilsCtrl/ThreadPool/UThreadPoolInclude.h View File

@ -17,5 +17,6 @@
#include "Task/UTaskInclude.h"
#include "Thread/UThreadInclude.h"
#include "Lock/ULockInclude.h"
#include "Semaphore/USemaphore.h"
#endif //CGRAPH_UTHREADPOOLINCLUDE_H

Loading…
Cancel
Save