You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

214 lines
6.4 KiB

//
// Created by 马也驰 on 2024/12/29.
//
#include "vlog_gc.h"
#include "../db/vlog_set.h"
#include <thread>
#include <utility>
struct executor_param {
VlogGC *vg;
size_t old_vlog_num;
size_t new_vlog_num;
};
std::mutex map_latch_;
std::unordered_map<size_t, struct executor_param> executor_params_map_;
std::unordered_map<size_t, VlogGC *> vlog_gc_map_;
void test_func() {
int nums[4] = {1, 2, 3, 4};
for (int i = 0; i < 4; i++) {
nums[i] ++;
}
}
void add_executor_params(size_t gc_num, struct executor_param &ep) {
map_latch_.lock();
executor_params_map_[gc_num] = ep;
map_latch_.unlock();
}
struct executor_param get_executor_params(size_t gc_num) {
map_latch_.lock();
auto ep = executor_params_map_[gc_num];
map_latch_.unlock();
return ep;
}
void del_executor_params(size_t gc_num) {
map_latch_.lock();
executor_params_map_.erase(gc_num);
map_latch_.unlock();
}
void add_vlog_gc(size_t gc_num, VlogGC *vg) {
map_latch_.lock();
vlog_gc_map_[gc_num] = vg;
map_latch_.unlock();
}
VlogGC * get_vlog_gc(size_t gc_num) {
map_latch_.lock();
auto vg = vlog_gc_map_[gc_num];
map_latch_.unlock();
return vg;
}
void del_vlog_gc(size_t gc_num) {
map_latch_.lock();
vlog_gc_map_.erase(gc_num);
map_latch_.unlock();
}
// 函数:增加 counter
void VlogGC::gc_counter_increment() {
vlog_set->counter_latch_.lock();
++vlog_set->counter; // 增加 counter
std::cout << "Increment: Counter incremented to " << vlog_set->counter << "\n";
vlog_set->counter_latch_.unlock();
}
// 函数:减少 counter
void VlogGC::gc_counter_decrement() {
vlog_set->counter_latch_.lock();
--vlog_set->counter; // 减少 counter
std::cout << "Decrement: Counter decremented to " << vlog_set->counter << "\n";
if (vlog_set->counter == 0) {
// 如果 counter 为 0
vlog_set->finished_latch_.lock();
vlog_set->finished = true; // 设置完成标志
vlog_set->finished_latch_.unlock();
}
vlog_set->counter_latch_.unlock();
}
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
// 判断old_vlog_num是否正在进行gc,如果是,直接返回
if (vlog_in_gc(old_vlog_num)) {
return ;
}
// 否则将当前old_vlog_num设置为正在gc
add_vlog_in_gc(old_vlog_num);
gc_counter_increment();
size_t _gc_num_ = get_gc_num();
struct executor_param ep = {this, old_vlog_num, new_vlog_num};
add_executor_params(_gc_num_, ep);
add_vlog_gc(_gc_num_, this);
// FIXME: 线程的信息必须被保存在函数栈之外,否则函数栈销毁之后,线程会报错exc_bad_access, 这里需要有一个gc_hanlder线程一直运行并处理各个gc请求
std::thread gc_thread([_gc_num_]() mutable {
auto _vlog_gc_ = get_vlog_gc(_gc_num_);
assert(_vlog_gc_ != nullptr);
_vlog_gc_->exec_gc(_gc_num_);
});
gc_thread.detach();
}
void VlogGC::exec_gc(size_t gc_num_) {
// FIXME: might break, due to unknown concurrency problem
curr_thread_nums_latch_.lock();
curr_thread_nums_ ++;
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.lock();
}
curr_thread_nums_latch_.unlock();
// start gc process
auto ep = get_executor_params(gc_num_);
// gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
test_func();
curr_thread_nums_latch_.lock();
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.unlock();
}
curr_thread_nums_ --;
curr_thread_nums_latch_.unlock();
gc_counter_decrement();
std::cout << "vlog_gc.cpp line 138" << std::endl;
del_executor_params(gc_num_);
std::cout << "vlog_gc.cpp line 140" << std::endl;
del_vlog_gc(gc_num_);
std::cout << "vlog_gc.cpp line 142" << std::endl;
del_vlog_in_gc(ep.old_vlog_num);
}
// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... |
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
//void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) {
// vlog_set->mtx.lock();
// auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
// auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
// auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
// auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
// auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
// auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
//
// old_vlog_info->vlog_info_latch_.lock();
// old_vlog_handler->vlog_latch_.soft_lock();
// new_vlog_info->vlog_info_latch_.lock();
// new_vlog_handler->vlog_latch_.hard_lock();
// vlog_set->mtx.unlock();
//
// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
//
// char old_vlog_buff[VLOG_SIZE];
// char new_vlog_buff[VLOG_SIZE];
// old_vlog.seekp(0);
// old_vlog.read(old_vlog_buff, VLOG_SIZE);
//
// size_t value_nums = old_vlog_info->value_nums;
// size_t ovb_off = 2 * sizeof(size_t);
// size_t nvb_off = 2 * sizeof(size_t);
// size_t new_vlog_value_nums = 0;
// for (auto i = 0; i < value_nums; i++) {
// char *value = &old_vlog_buff[ovb_off];
// uint16_t value_len = get_value_len(value);
// size_t slot_num = get_value_slotnum(value);
// if (!value_deleted(value_len)) {
// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
// memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t));
// struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
// slot_page_->set_slot(slot_num, &scn);
// nvb_off += value_len;
// new_vlog_value_nums ++;
// }
// ovb_off += value_len;
// }
// new_vlog_info->value_nums = new_vlog_value_nums;
// new_vlog_info->curr_size = nvb_off;
// memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
// memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
// new_vlog.seekp(0);
// new_vlog.write(new_vlog_buff, VLOG_SIZE);
// new_vlog.flush();
//
// old_vlog.close();
// new_vlog.close();
//
// old_vlog_info->vlog_valid_ = false;
// vlog_set->remove_old_vlog(old_vlog_num);
//
// old_vlog_info->vlog_info_latch_.unlock();
// old_vlog_handler->vlog_latch_.soft_unlock();
// new_vlog_info->vlog_info_latch_.unlock();
// new_vlog_handler->vlog_latch_.hard_unlock();
//
//}
//