You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

157 lines
4.0 KiB

//
// Created by 马也驰 on 2024/12/29.
//
#include "vlog_gc.h"
#include "../db/vlog_set.h"
#include <thread>
#include <utility>
struct executor_param {
VlogGC *vg;
size_t old_vlog_num;
size_t new_vlog_num;
};
std::mutex map_latch_;
std::unordered_map<size_t, struct executor_param> executor_params_map_;
std::unordered_map<size_t, VlogGC *> vlog_gc_map_;
void test_func(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) {
assert(vlog_gc_);
auto vs = vlog_gc_->get_vlog_set();
auto sp = vlog_gc_->get_slot_page();
int nums[4] = {1, 2, 3, 4};
for (int i = 0; i < 4; i++) {
nums[i] ++;
}
}
void add_executor_params(size_t gc_num, struct executor_param &ep) {
map_latch_.lock();
executor_params_map_[gc_num] = ep;
map_latch_.unlock();
}
struct executor_param get_executor_params(size_t gc_num) {
map_latch_.lock();
auto ep = executor_params_map_[gc_num];
map_latch_.unlock();
return ep;
}
void del_executor_params(size_t gc_num) {
map_latch_.lock();
executor_params_map_.erase(gc_num);
map_latch_.unlock();
}
void add_vlog_gc(size_t gc_num, VlogGC *vg) {
map_latch_.lock();
vlog_gc_map_[gc_num] = vg;
map_latch_.unlock();
}
VlogGC * get_vlog_gc(size_t gc_num) {
map_latch_.lock();
auto vg = vlog_gc_map_[gc_num];
map_latch_.unlock();
return vg;
}
void del_vlog_gc(size_t gc_num) {
map_latch_.lock();
vlog_gc_map_.erase(gc_num);
map_latch_.unlock();
}
// 函数:增加 counter
void VlogGC::gc_counter_increment() {
vlog_set->counter_latch_.lock();
++vlog_set->counter; // 增加 counter
std::cout << "Increment: Counter incremented to " << vlog_set->counter << "\n";
vlog_set->counter_latch_.unlock();
}
// 函数:减少 counter
void VlogGC::gc_counter_decrement() {
vlog_set->counter_latch_.lock();
--vlog_set->counter; // 减少 counter
std::cout << "Decrement: Counter decremented to " << vlog_set->counter << "\n";
if (vlog_set->counter == 0) {
// 如果 counter 为 0
vlog_set->finished_latch_.lock();
vlog_set->finished = true; // 设置完成标志
vlog_set->finished_latch_.unlock();
}
vlog_set->counter_latch_.unlock();
}
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
// 判断old_vlog_num是否正在进行gc,如果是,直接返回
if (vlog_in_gc(old_vlog_num)) {
return ;
}
// 否则将当前old_vlog_num设置为正在gc
add_vlog_in_gc(old_vlog_num);
gc_counter_increment();
size_t _gc_num_ = get_gc_num();
struct executor_param ep = {this, old_vlog_num, new_vlog_num};
add_executor_params(_gc_num_, ep);
add_vlog_gc(_gc_num_, this);
// FIXME: 线程的信息必须被保存在函数栈之外,否则函数栈销毁之后,线程会报错exc_bad_access, 这里需要有一个gc_hanlder线程一直运行并处理各个gc请求
std::thread gc_thread([_gc_num_, this]() mutable {
auto _vlog_gc_ = get_vlog_gc(_gc_num_);
assert(_vlog_gc_ != nullptr);
_vlog_gc_->exec_gc(_gc_num_);
gc_counter_decrement();
});
gc_thread.detach();
}
void VlogGC::exec_gc(size_t gc_num_) {
// FIXME: might break, due to unknown concurrency problem
curr_thread_nums_latch_.lock();
curr_thread_nums_ ++;
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.lock();
}
curr_thread_nums_latch_.unlock();
// start gc process
auto ep = get_executor_params(gc_num_);
gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
// test_func(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
curr_thread_nums_latch_.lock();
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.unlock();
}
curr_thread_nums_ --;
curr_thread_nums_latch_.unlock();
std::cout << "vlog_gc.cpp line 138" << std::endl;
del_executor_params(gc_num_);
std::cout << "vlog_gc.cpp line 140" << std::endl;
del_vlog_gc(gc_num_);
std::cout << "vlog_gc.cpp line 142" << std::endl;
del_vlog_in_gc(ep.old_vlog_num);
// FIXME: dead lock here (fixed, i think)
// FIXME: remove vlog physically
vlog_set->remove_old_vlog(ep.old_vlog_num);
}