|
|
@ -17,6 +17,7 @@ class UThreadPrimary : public UThreadBase { |
|
|
|
protected: |
|
|
|
explicit UThreadPrimary() { |
|
|
|
index_ = CGRAPH_SECONDARY_THREAD_COMMON_ID; |
|
|
|
steal_range_ = 0; |
|
|
|
pool_threads_ = nullptr; |
|
|
|
type_ = CGRAPH_THREAD_TYPE_PRIMARY; |
|
|
|
} |
|
|
@ -25,8 +26,10 @@ protected: |
|
|
|
CStatus init() override { |
|
|
|
CGRAPH_FUNCTION_BEGIN |
|
|
|
CGRAPH_ASSERT_INIT(false) |
|
|
|
CGRAPH_ASSERT_NOT_NULL(config_) |
|
|
|
|
|
|
|
is_init_ = true; |
|
|
|
steal_range_ = config_->calcStealRange(); |
|
|
|
thread_ = std::move(std::thread(&UThreadPrimary::run, this)); |
|
|
|
setSchedParam(); |
|
|
|
setAffinity(index_); |
|
|
@ -47,9 +50,7 @@ protected: |
|
|
|
UThreadPoolConfigPtr config) { |
|
|
|
CGRAPH_FUNCTION_BEGIN |
|
|
|
CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数 |
|
|
|
CGRAPH_ASSERT_NOT_NULL(poolTaskQueue) |
|
|
|
CGRAPH_ASSERT_NOT_NULL(poolThreads) |
|
|
|
CGRAPH_ASSERT_NOT_NULL(config) |
|
|
|
CGRAPH_ASSERT_NOT_NULL(poolTaskQueue, poolThreads, config) |
|
|
|
|
|
|
|
this->index_ = index; |
|
|
|
this->pool_task_queue_ = poolTaskQueue; |
|
|
@ -67,7 +68,6 @@ protected: |
|
|
|
CGRAPH_FUNCTION_BEGIN |
|
|
|
CGRAPH_ASSERT_INIT(true) |
|
|
|
CGRAPH_ASSERT_NOT_NULL(pool_threads_) |
|
|
|
CGRAPH_ASSERT_NOT_NULL(config_) |
|
|
|
|
|
|
|
/** |
|
|
|
* 线程池中任何一个primary线程为null都不可以执行 |
|
|
@ -81,38 +81,22 @@ protected: |
|
|
|
CGRAPH_RETURN_ERROR_STATUS("primary thread is null") |
|
|
|
} |
|
|
|
|
|
|
|
if (config_->calcBatchTaskRatio()) { |
|
|
|
while (done_) { |
|
|
|
processTasks(); // 批量任务获取执行接口 |
|
|
|
} |
|
|
|
} else { |
|
|
|
while (done_) { |
|
|
|
processTask(); // 单个任务获取执行接口 |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
status = loopProcess(); |
|
|
|
CGRAPH_FUNCTION_END |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 获取并执行任务 |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
CVoid processTask() { |
|
|
|
CVoid processTask() override { |
|
|
|
UTask task; |
|
|
|
if (popTask(task) || popPoolTask(task) || stealTask(task)) { |
|
|
|
runTask(task); |
|
|
|
} else { |
|
|
|
std::this_thread::yield(); |
|
|
|
std::this_thread::yield(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 获取批量执行task信息 |
|
|
|
*/ |
|
|
|
CVoid processTasks() { |
|
|
|
CVoid processTasks() override { |
|
|
|
UTaskArr tasks; |
|
|
|
if (popTask(tasks) || popPoolTask(tasks) || stealTask(tasks)) { |
|
|
|
// 尝试从主线程中获取/盗取批量task,如果成功,则依次执行 |
|
|
@ -124,12 +108,25 @@ protected: |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 依次push到任一队列里。如果都失败,则yield,然后重新push |
|
|
|
* @param task |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
CVoid pushTask(UTask&& task) { |
|
|
|
while (!(primary_queue_.tryPush(std::move(task)) |
|
|
|
|| secondary_queue_.tryPush(std::move(task)))) { |
|
|
|
std::this_thread::yield(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
* 从本地弹出一个任务 |
|
|
|
* @param task |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
bool popTask(UTaskRef task) { |
|
|
|
return work_stealing_queue_.tryPop(task); |
|
|
|
return primary_queue_.tryPop(task) || secondary_queue_.tryPop(task); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -139,7 +136,13 @@ protected: |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
bool popTask(UTaskArrRef tasks) { |
|
|
|
return work_stealing_queue_.tryPop(tasks, config_->max_local_batch_size_); |
|
|
|
CBool result = primary_queue_.tryPop(tasks, config_->max_local_batch_size_); |
|
|
|
auto leftSize = config_->max_local_batch_size_ - tasks.size(); |
|
|
|
if (leftSize > 0) { |
|
|
|
// 如果凑齐了,就不需要了。没凑齐的话,就继续 |
|
|
|
result |= (secondary_queue_.tryPop(tasks, leftSize)); |
|
|
|
} |
|
|
|
return result; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -161,15 +164,16 @@ protected: |
|
|
|
* 窃取的时候,仅从相邻的primary线程中窃取 |
|
|
|
* 待窃取相邻的数量,不能超过默认primary线程数 |
|
|
|
*/ |
|
|
|
int range = config_->calcStealRange(); |
|
|
|
for (int i = 0; i < range; i++) { |
|
|
|
for (int i = 0; i < steal_range_; i++) { |
|
|
|
/** |
|
|
|
* 从线程中周围的thread中,窃取任务。 |
|
|
|
* 如果成功,则返回true,并且执行任务。 |
|
|
|
* steal 的时候,先从第二个队列里偷,从而降低触碰锁的概率 |
|
|
|
*/ |
|
|
|
int curIndex = (index_ + i + 1) % config_->default_thread_size_; |
|
|
|
if (nullptr != (*pool_threads_)[curIndex] |
|
|
|
&& ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(task)) { |
|
|
|
if (likely((*pool_threads_)[curIndex]) |
|
|
|
&& (((*pool_threads_)[curIndex])->secondary_queue_.trySteal(task)) |
|
|
|
|| ((*pool_threads_)[curIndex])->primary_queue_.trySteal(task)) { |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
@ -184,16 +188,28 @@ protected: |
|
|
|
* @return |
|
|
|
*/ |
|
|
|
bool stealTask(UTaskArrRef tasks) { |
|
|
|
if (unlikely(pool_threads_->size() < config_->default_thread_size_)) { |
|
|
|
if (unlikely(pool_threads_->size() != config_->default_thread_size_)) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
int range = config_->calcStealRange(); |
|
|
|
for (int i = 0; i < range; i++) { |
|
|
|
for (int i = 0; i < steal_range_; i++) { |
|
|
|
int curIndex = (index_ + i + 1) % config_->default_thread_size_; |
|
|
|
if (nullptr != (*pool_threads_)[curIndex] |
|
|
|
&& ((*pool_threads_)[curIndex])->work_stealing_queue_.trySteal(tasks, config_->max_steal_batch_size_)) { |
|
|
|
return true; |
|
|
|
if (likely((*pool_threads_)[curIndex])) { |
|
|
|
bool result = ((*pool_threads_)[curIndex])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_); |
|
|
|
auto leftSize = config_->max_steal_batch_size_ - tasks.size(); |
|
|
|
if (leftSize > 0) { |
|
|
|
result |= ((*pool_threads_)[curIndex])->primary_queue_.trySteal(tasks, leftSize); |
|
|
|
} |
|
|
|
|
|
|
|
if (result) { |
|
|
|
/** |
|
|
|
* 在这里,我们对模型进行了简化。实现的思路是: |
|
|
|
* 尝试从邻居主线程(先secondary,再primary)中,获取 x(=max_steal_batch_size_) 个task, |
|
|
|
* 如果从某一个邻居中,获取了 y(<=x) 个task,则也终止steal的流程 |
|
|
|
* 且如果如果有一次批量steal成功,就认定成功 |
|
|
|
*/ |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
@ -201,8 +217,10 @@ protected: |
|
|
|
} |
|
|
|
|
|
|
|
private: |
|
|
|
int index_ {CGRAPH_SECONDARY_THREAD_COMMON_ID}; // 线程index |
|
|
|
UWorkStealingQueue work_stealing_queue_; // 内部队列信息 |
|
|
|
int index_; // 线程index |
|
|
|
int steal_range_; // 偷窃的范围信息 |
|
|
|
UWorkStealingQueue primary_queue_; // 内部队列信息 |
|
|
|
UWorkStealingQueue secondary_queue_; // 第二个队列,用于减少触锁概率,提升性能 |
|
|
|
std::vector<UThreadPrimary *>* pool_threads_; // 用于存放线程池中的线程信息 |
|
|
|
|
|
|
|
friend class UThreadPool; |
|
|
|