// // Created by 马也驰 on 2024/12/29. // #include "vlog_gc.h" #include "../db/vlog_set.h" #include #include struct executor_param { VlogGC *vg; size_t old_vlog_num; size_t new_vlog_num; }; std::mutex map_latch_; std::unordered_map executor_params_map_; std::unordered_map vlog_gc_map_; void test_func(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) { assert(vlog_gc_); auto vs = vlog_gc_->get_vlog_set(); auto sp = vlog_gc_->get_slot_page(); int nums[4] = {1, 2, 3, 4}; for (int i = 0; i < 4; i++) { nums[i] ++; } } void add_executor_params(size_t gc_num, struct executor_param &ep) { map_latch_.lock(); executor_params_map_[gc_num] = ep; map_latch_.unlock(); } struct executor_param get_executor_params(size_t gc_num) { map_latch_.lock(); auto ep = executor_params_map_[gc_num]; map_latch_.unlock(); return ep; } void del_executor_params(size_t gc_num) { map_latch_.lock(); executor_params_map_.erase(gc_num); map_latch_.unlock(); } void add_vlog_gc(size_t gc_num, VlogGC *vg) { map_latch_.lock(); vlog_gc_map_[gc_num] = vg; map_latch_.unlock(); } VlogGC * get_vlog_gc(size_t gc_num) { map_latch_.lock(); auto vg = vlog_gc_map_[gc_num]; map_latch_.unlock(); return vg; } void del_vlog_gc(size_t gc_num) { map_latch_.lock(); vlog_gc_map_.erase(gc_num); map_latch_.unlock(); } // 函数:增加 counter void VlogGC::gc_counter_increment() { vlog_set->counter_latch_.lock(); ++vlog_set->counter; // 增加 counter std::cout << "Increment: Counter incremented to " << vlog_set->counter << "\n"; vlog_set->counter_latch_.unlock(); } // 函数:减少 counter void VlogGC::gc_counter_decrement() { vlog_set->counter_latch_.lock(); --vlog_set->counter; // 减少 counter std::cout << "Decrement: Counter decremented to " << vlog_set->counter << "\n"; if (vlog_set->counter == 0) { // 如果 counter 为 0 vlog_set->finished_latch_.lock(); vlog_set->finished = true; // 设置完成标志 vlog_set->finished_latch_.unlock(); } vlog_set->counter_latch_.unlock(); } void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { // 判断old_vlog_num是否正在进行gc,如果是,直接返回 if (vlog_in_gc(old_vlog_num)) { return ; } // 否则将当前old_vlog_num设置为正在gc add_vlog_in_gc(old_vlog_num); gc_counter_increment(); size_t _gc_num_ = get_gc_num(); struct executor_param ep = {this, old_vlog_num, new_vlog_num}; add_executor_params(_gc_num_, ep); add_vlog_gc(_gc_num_, this); // FIXME: 线程的信息必须被保存在函数栈之外,否则函数栈销毁之后,线程会报错exc_bad_access, 这里需要有一个gc_hanlder线程一直运行并处理各个gc请求 std::thread gc_thread([_gc_num_, this]() mutable { auto _vlog_gc_ = get_vlog_gc(_gc_num_); assert(_vlog_gc_ != nullptr); _vlog_gc_->exec_gc(_gc_num_); gc_counter_decrement(); }); gc_thread.detach(); } void VlogGC::exec_gc(size_t gc_num_) { // FIXME: might break, due to unknown concurrency problem curr_thread_nums_latch_.lock(); curr_thread_nums_ ++; if (curr_thread_nums_ >= max_thread_nums_) { full_latch_.lock(); } curr_thread_nums_latch_.unlock(); // start gc process auto ep = get_executor_params(gc_num_); gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num); // test_func(ep.vg, ep.old_vlog_num, ep.new_vlog_num); curr_thread_nums_latch_.lock(); if (curr_thread_nums_ >= max_thread_nums_) { full_latch_.unlock(); } curr_thread_nums_ --; curr_thread_nums_latch_.unlock(); // std::cout << "vlog_gc.cpp line 138" << std::endl; del_executor_params(gc_num_); // std::cout << "vlog_gc.cpp line 140" << std::endl; del_vlog_gc(gc_num_); // std::cout << "vlog_gc.cpp line 142" << std::endl; del_vlog_in_gc(ep.old_vlog_num); // FIXME: dead lock here (fixed, i think) // FIXME: remove vlog physically vlog_set->remove_old_vlog(ep.old_vlog_num); }