From c912ed68b5575754aca2891cc4c9ee0a9f1e0e03 Mon Sep 17 00:00:00 2001 From: Yechi Ma <2662511702@qq.com> Date: Sat, 4 Jan 2025 15:29:31 +0800 Subject: [PATCH] solve concurrency(i think) --- db/db_impl.cc | 9 +++++- db/gc_executor.cpp | 11 ++++++-- db/vlog_gc.cpp | 83 +++++++++--------------------------------------------- db/vlog_gc.h | 38 +++++++++++++++---------- db/vlog_set.cpp | 18 ++++++++---- db/vlog_set.h | 10 ++++--- test/db_test1.cc | 1 + 7 files changed, 72 insertions(+), 98 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 7db4001..b7bec04 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1202,6 +1202,9 @@ Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key, std::string vlog_value; slot_page_->get_slot(slot_num, &sc); vlog_set_->get_value(sc, &vlog_value); + if (vlog_value.empty()) { + return Status::NotFound("value has been deleted"); + } std::cout << "value from value_log: " << key.ToString() << vlog_value << std::endl; DeserializeValue(fields, vlog_value); @@ -1224,7 +1227,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, vlog_set_->get_value(sc, &vlog_value); *value = vlog_value; - return s; + if (vlog_value.empty()) { + return Status::NotFound("value has been deleted"); + } + + return Status::OK(); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { diff --git a/db/gc_executor.cpp b/db/gc_executor.cpp index 2091e9b..283844a 100644 --- a/db/gc_executor.cpp +++ b/db/gc_executor.cpp @@ -2,6 +2,7 @@ // Created by 马也驰 on 2025/1/3. // + #include "gc_executor.h" #include "vlog_set.h" #include "vlog_gc.h" @@ -28,8 +29,10 @@ auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); - char old_vlog_buff[VLOG_SIZE]; - char new_vlog_buff[VLOG_SIZE]; +// char old_vlog_buff[VLOG_SIZE]; +// char new_vlog_buff[VLOG_SIZE]; + char *old_vlog_buff = static_cast(malloc(VLOG_SIZE)); + char *new_vlog_buff = static_cast(malloc(VLOG_SIZE)); old_vlog.seekp(0); old_vlog.read(old_vlog_buff, VLOG_SIZE); @@ -59,11 +62,13 @@ new_vlog.write(new_vlog_buff, VLOG_SIZE); new_vlog.flush(); + free(old_vlog_buff); + free(new_vlog_buff); + old_vlog.close(); new_vlog.close(); old_vlog_info->vlog_valid_ = false; - vlog_set->remove_old_vlog(old_vlog_num); old_vlog_info->vlog_info_latch_.unlock(); old_vlog_handler->vlog_latch_.soft_unlock(); diff --git a/db/vlog_gc.cpp b/db/vlog_gc.cpp index 8a2072e..0fe1eb0 100644 --- a/db/vlog_gc.cpp +++ b/db/vlog_gc.cpp @@ -19,7 +19,10 @@ std::unordered_map executor_params_map_; std::unordered_map vlog_gc_map_; -void test_func() { +void test_func(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) { + assert(vlog_gc_); + auto vs = vlog_gc_->get_vlog_set(); + auto sp = vlog_gc_->get_slot_page(); int nums[4] = {1, 2, 3, 4}; for (int i = 0; i < 4; i++) { nums[i] ++; @@ -64,6 +67,7 @@ void del_vlog_gc(size_t gc_num) { map_latch_.unlock(); } + // 函数:增加 counter void VlogGC::gc_counter_increment() { vlog_set->counter_latch_.lock(); @@ -104,10 +108,11 @@ void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { add_vlog_gc(_gc_num_, this); // FIXME: 线程的信息必须被保存在函数栈之外,否则函数栈销毁之后,线程会报错exc_bad_access, 这里需要有一个gc_hanlder线程一直运行并处理各个gc请求 - std::thread gc_thread([_gc_num_]() mutable { + std::thread gc_thread([_gc_num_, this]() mutable { auto _vlog_gc_ = get_vlog_gc(_gc_num_); assert(_vlog_gc_ != nullptr); _vlog_gc_->exec_gc(_gc_num_); + gc_counter_decrement(); }); gc_thread.detach(); @@ -127,8 +132,8 @@ void VlogGC::exec_gc(size_t gc_num_) { // start gc process auto ep = get_executor_params(gc_num_); -// gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num); - test_func(); + gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num); +// test_func(ep.vg, ep.old_vlog_num, ep.new_vlog_num); curr_thread_nums_latch_.lock(); if (curr_thread_nums_ >= max_thread_nums_) { @@ -137,7 +142,6 @@ void VlogGC::exec_gc(size_t gc_num_) { curr_thread_nums_ --; curr_thread_nums_latch_.unlock(); - gc_counter_decrement(); std::cout << "vlog_gc.cpp line 138" << std::endl; del_executor_params(gc_num_); std::cout << "vlog_gc.cpp line 140" << std::endl; @@ -145,70 +149,9 @@ void VlogGC::exec_gc(size_t gc_num_) { std::cout << "vlog_gc.cpp line 142" << std::endl; del_vlog_in_gc(ep.old_vlog_num); + + // FIXME: dead lock here (fixed, i think) + // FIXME: remove vlog physically + vlog_set->remove_old_vlog(ep.old_vlog_num); } -// vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... | -// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | -// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | -//void VlogGC::exec_gc(size_t old_vlog_num, size_t new_vlog_num) { -// vlog_set->mtx.lock(); -// auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num); -// auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num); -// auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num); -// auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num); -// auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num); -// auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num); -// -// old_vlog_info->vlog_info_latch_.lock(); -// old_vlog_handler->vlog_latch_.soft_lock(); -// new_vlog_info->vlog_info_latch_.lock(); -// new_vlog_handler->vlog_latch_.hard_lock(); -// vlog_set->mtx.unlock(); -// -// auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out); -// auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out); -// -// char old_vlog_buff[VLOG_SIZE]; -// char new_vlog_buff[VLOG_SIZE]; -// old_vlog.seekp(0); -// old_vlog.read(old_vlog_buff, VLOG_SIZE); -// -// size_t value_nums = old_vlog_info->value_nums; -// size_t ovb_off = 2 * sizeof(size_t); -// size_t nvb_off = 2 * sizeof(size_t); -// size_t new_vlog_value_nums = 0; -// for (auto i = 0; i < value_nums; i++) { -// char *value = &old_vlog_buff[ovb_off]; -// uint16_t value_len = get_value_len(value); -// size_t slot_num = get_value_slotnum(value); -// if (!value_deleted(value_len)) { -// memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len); -// memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t)); -// struct slot_content scn(new_vlog_info->vlog_num, nvb_off); -// slot_page_->set_slot(slot_num, &scn); -// nvb_off += value_len; -// new_vlog_value_nums ++; -// } -// ovb_off += value_len; -// } -// new_vlog_info->value_nums = new_vlog_value_nums; -// new_vlog_info->curr_size = nvb_off; -// memcpy(new_vlog_buff, &nvb_off, sizeof(size_t)); -// memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t)); -// new_vlog.seekp(0); -// new_vlog.write(new_vlog_buff, VLOG_SIZE); -// new_vlog.flush(); -// -// old_vlog.close(); -// new_vlog.close(); -// -// old_vlog_info->vlog_valid_ = false; -// vlog_set->remove_old_vlog(old_vlog_num); -// -// old_vlog_info->vlog_info_latch_.unlock(); -// old_vlog_handler->vlog_latch_.soft_unlock(); -// new_vlog_info->vlog_info_latch_.unlock(); -// new_vlog_handler->vlog_latch_.hard_unlock(); -// -//} -// diff --git a/db/vlog_gc.h b/db/vlog_gc.h index 8bd8b34..51b345b 100644 --- a/db/vlog_gc.h +++ b/db/vlog_gc.h @@ -35,28 +35,34 @@ friend class gc_executor; break; } } + std::cout << "vlog_gc has been deleted!" << std::endl; } void do_gc(size_t old_vlog_num, size_t new_vlog_num); + SlotPage *get_slot_page() { return slot_page_; } + VlogSet *get_vlog_set() { return vlog_set; } + + private: void exec_gc(size_t gc_num_); +// static void gc_process(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num); void gc_counter_increment(); void gc_counter_decrement(); - inline bool value_deleted(uint16_t value_len) { - return !(value_len >> 15); - } - inline uint16_t get_value_len(char *value) { - uint16_t value_len; - memcpy(&value_len, value, sizeof(uint16_t)); - return value_len; - } - inline size_t get_value_slotnum(char *value) { - size_t slot_num; - memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); - return slot_num; - } +// static inline bool value_deleted(uint16_t value_len) { +// return !(value_len >> 15); +// } +// static inline uint16_t get_value_len(char *value) { +// uint16_t value_len; +// memcpy(&value_len, value, sizeof(uint16_t)); +// return value_len; +// } +// static inline size_t get_value_slotnum(char *value) { +// size_t slot_num; +// memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); +// return slot_num; +// } inline size_t get_gc_num() { gc_num_latch_.lock(); @@ -85,6 +91,7 @@ friend class gc_executor; ovn_map_latch_.unlock(); } + private: SlotPage *slot_page_; VlogSet *vlog_set; // 仅声明为指针,具体定义放在 vlog_gc.cpp @@ -94,11 +101,12 @@ friend class gc_executor; std::mutex curr_thread_nums_latch_; size_t curr_thread_nums_; + // 避免重复gc std::mutex gc_num_latch_; - size_t gc_num; + size_t gc_num; // gc线程id,不断增长,方便从全局map中获取gc参数等信息 std::mutex ovn_map_latch_; - std::unordered_map ovn_map_; + std::unordered_map ovn_map_; // 表明某个vlog是否正在进行gc }; diff --git a/db/vlog_set.cpp b/db/vlog_set.cpp index c1612c2..31602bd 100644 --- a/db/vlog_set.cpp +++ b/db/vlog_set.cpp @@ -116,8 +116,8 @@ void VlogSet::get_value(const struct slot_content &sc, std::string *value) { } struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { - for (auto it = vlog_info_map_.begin(); it != vlog_info_map_.end(); ++it) { - struct vlog_info* vinfo = it->second; + for (auto & it : vlog_info_map_) { + struct vlog_info* vinfo = it.second; // 使用 vlog_info* 进行操作 vinfo->vlog_info_latch_.lock(); @@ -175,7 +175,7 @@ void VlogSet::del_value(const struct slot_content &sc) { } vhandler->vlog_latch_.hard_lock(); - vhandler->decre_access_thread_nums(); // FIXME: increase thread nums + vhandler->incre_access_thread_nums(); // FIXME: increase thread nums mtx.unlock(); // for better performance vinfo->vlog_info_latch_.unlock(); mark_del_value(sc); @@ -200,10 +200,18 @@ void VlogSet::remove_old_vlog(size_t old_vlog_num) { // new vlog should have been created here mtx.lock(); auto vi_old = get_vlog_info(old_vlog_num); + auto vh_old = get_vlog_handler(old_vlog_num); + vi_old->vlog_info_latch_.lock(); + // FIXME: dead lock + while (!vh_old->non_access_thread()) { + std::this_thread::sleep_for(std::chrono::milliseconds(2)); + std::cout << "waiting in remove_old_vlog" << std::endl; + } + std::string old_vlog_name = get_vlog_name(old_vlog_num); remove_from_config_file(old_vlog_num); -// remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!! + remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!! vi_old->vlog_valid_ = false; vi_old->vlog_info_latch_.unlock(); mtx.unlock(); @@ -281,7 +289,7 @@ inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_nam vlog_handler_map_[vlog_name] = vhandler; } -inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { +void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { assert(!std::remove(vlog_name.c_str())); vlog_handler_map_.erase(vlog_name); } diff --git a/db/vlog_set.h b/db/vlog_set.h index 249bbd6..796d7c8 100644 --- a/db/vlog_set.h +++ b/db/vlog_set.h @@ -23,7 +23,8 @@ friend class gc_executor; #define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1)) #define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK) -#define VLOG_GC_THREHOLD 0.8 +#define VLOG_GC_THREHOLD 0.5 + public: VlogSet(std::string dbname, VlogGC *vlog_gc); ~VlogSet(); @@ -33,6 +34,7 @@ friend class gc_executor; void set_vlog_gc(VlogGC *vg) { this->vlog_gc = vg; } + private: size_t register_new_vlog(); void remove_old_vlog(size_t old_vlog_num); @@ -44,7 +46,7 @@ friend class gc_executor; struct vlog_info *get_writable_vlog_info(size_t value_size); inline void restore_vlog_inmaps(struct vlog_info *vi); inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name); - inline void remove_vlog_from_maps(std::string &vlog_name); + void remove_vlog_from_maps(std::string &vlog_name); inline std::string get_config_file_name(); std::string get_vlog_name(size_t vlog_num); struct vlog_info *get_vlog_info(size_t vlog_num); @@ -62,10 +64,10 @@ friend class gc_executor; std::mutex config_file_latch_; std::fstream *config_file_; - int counter = 0; + int counter = 0; // 表明当前有多少个gc线程正在进行 std::mutex counter_latch_; std::mutex finished_latch_; - bool finished = true; + bool finished = true; // 表明当前是否所有gc线程都已结束 VlogGC *vlog_gc; // 仅声明为指针,具体定义放在 vlog_set.cpp }; diff --git a/test/db_test1.cc b/test/db_test1.cc index 060110d..a1039b9 100644 --- a/test/db_test1.cc +++ b/test/db_test1.cc @@ -62,6 +62,7 @@ int test2() { } delete db; + std::cout << "db has been deleted!" << std::endl; return 0; }