diff --git a/db/vlog_gc.cpp b/db/vlog_gc.cpp index d7bdca0..8a2072e 100644 --- a/db/vlog_gc.cpp +++ b/db/vlog_gc.cpp @@ -89,6 +89,13 @@ void VlogGC::gc_counter_decrement() { void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { + // 判断old_vlog_num是否正在进行gc,如果是,直接返回 + if (vlog_in_gc(old_vlog_num)) { + return ; + } + // 否则将当前old_vlog_num设置为正在gc + add_vlog_in_gc(old_vlog_num); + gc_counter_increment(); size_t _gc_num_ = get_gc_num(); @@ -96,12 +103,15 @@ void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) { add_executor_params(_gc_num_, ep); add_vlog_gc(_gc_num_, this); + // FIXME: 线程的信息必须被保存在函数栈之外,否则函数栈销毁之后,线程会报错exc_bad_access, 这里需要有一个gc_hanlder线程一直运行并处理各个gc请求 std::thread gc_thread([_gc_num_]() mutable { auto _vlog_gc_ = get_vlog_gc(_gc_num_); assert(_vlog_gc_ != nullptr); _vlog_gc_->exec_gc(_gc_num_); }); + gc_thread.detach(); + } @@ -117,8 +127,8 @@ void VlogGC::exec_gc(size_t gc_num_) { // start gc process auto ep = get_executor_params(gc_num_); - gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num); -// test_func(); +// gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num); + test_func(); curr_thread_nums_latch_.lock(); if (curr_thread_nums_ >= max_thread_nums_) { @@ -128,8 +138,13 @@ void VlogGC::exec_gc(size_t gc_num_) { curr_thread_nums_latch_.unlock(); gc_counter_decrement(); + std::cout << "vlog_gc.cpp line 138" << std::endl; del_executor_params(gc_num_); + std::cout << "vlog_gc.cpp line 140" << std::endl; del_vlog_gc(gc_num_); + std::cout << "vlog_gc.cpp line 142" << std::endl; + + del_vlog_in_gc(ep.old_vlog_num); } // vlog: || curr_size(size_t) | value_nums(size_t) || value1 | value2 | ... | diff --git a/db/vlog_gc.h b/db/vlog_gc.h index e49c197..b4e836f 100644 --- a/db/vlog_gc.h +++ b/db/vlog_gc.h @@ -44,6 +44,7 @@ friend class gc_executor; memcpy(&slot_num, &value[sizeof(uint16_t)], sizeof(size_t)); return slot_num; } + inline size_t get_gc_num() { gc_num_latch_.lock(); size_t _gc_num_ = gc_num ++; @@ -51,6 +52,26 @@ friend class gc_executor; return _gc_num_; } + inline bool vlog_in_gc(size_t vlog_num) { + ovn_map_latch_.lock(); + bool flag = false; + if (ovn_map_.find(vlog_num) != ovn_map_.end()) { + flag = true; + } + ovn_map_latch_.unlock(); + return flag; + } + inline void add_vlog_in_gc(size_t vlog_num) { + ovn_map_latch_.lock(); + ovn_map_[vlog_num] = true; + ovn_map_latch_.unlock(); + } + inline void del_vlog_in_gc(size_t vlog_num) { + ovn_map_latch_.lock(); + ovn_map_.erase(vlog_num); + ovn_map_latch_.unlock(); + } + SlotPage *slot_page_; VlogSet *vlog_set; // 仅声明为指针,具体定义放在 vlog_gc.cpp @@ -62,6 +83,9 @@ friend class gc_executor; std::mutex gc_num_latch_; size_t gc_num; + + std::mutex ovn_map_latch_; + std::unordered_map ovn_map_; }; diff --git a/db/vlog_set.cpp b/db/vlog_set.cpp index 0f4ed8f..9657d29 100644 --- a/db/vlog_set.cpp +++ b/db/vlog_set.cpp @@ -369,7 +369,7 @@ void VlogSet::mark_del_value(const struct slot_content &sc) { vinfo->discard ++; vinfo->value_nums --; vinfo->curr_size -= value_size; - // FIXME: gc process + // FIXME: gc process, avoid repeated gc if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) { // create new vlog vinfo->processing_gc = true; diff --git a/test/db_test1.cc b/test/db_test1.cc index 7ce6779..5e96860 100644 --- a/test/db_test1.cc +++ b/test/db_test1.cc @@ -6,6 +6,7 @@ using namespace leveldb; int test1() { DB* db = nullptr; + Options op; op.create_if_missing = true; Status status = DB::Open(op, "testdb1", &db); diff --git a/test/db_test3.cc b/test/db_test3.cc index b5f9563..bf214bd 100644 --- a/test/db_test3.cc +++ b/test/db_test3.cc @@ -175,12 +175,12 @@ TEST(TestSchema, Basic) { std::cerr << "open db failed" << std::endl; abort(); } - std::string key = "key"; + // std::string key = "key"; std::string key0 = "k_0"; std::string key1 = "k_1"; std::string key2 = "k_2"; std::string key3 = "k_3"; - std::string value = "value"; + // std::string value = "value"; FieldArray fields0 = {{"name", "wxf"}}; FieldArray fields1 = { {"name", "Customer1"}, @@ -198,15 +198,15 @@ TEST(TestSchema, Basic) { {"address", "ecnu"}, {"phone", "11111"} }; - db->Put(writeOptions, key, value); - std::cout << "put_value: " << value << std::endl; + // db->Put(writeOptions, key, value); + // std::cout << "put_value: " << value << std::endl; db->Put_Fields(leveldb::WriteOptions(), key0, fields0); db->Put_Fields(leveldb::WriteOptions(), key1, fields1); db->Put_Fields(leveldb::WriteOptions(), key2, fields2); db->Put_Fields(leveldb::WriteOptions(), key3, fields3); - std::string value_ret; - db->Get(readOptions, key, &value_ret); - std::cout << "get_value: " << value_ret << std::endl; + // std::string value_ret; + // db->Get(readOptions, key, &value_ret); + // std::cout << "get_value: " << value_ret << std::endl; // 读取并反序列化 FieldArray fields_ret_0; FieldArray fields_ret_1; @@ -217,8 +217,8 @@ TEST(TestSchema, Basic) { // 检查反序列化结果 ASSERT_EQ(fields_ret_0.size(), fields0.size()); for (size_t i = 0; i < fields_ret_0.size(); ++i) { - ASSERT_EQ(fields_ret_0[i].name, fields1[i].name); - ASSERT_EQ(fields_ret_0[i].value, fields1[i].value); + ASSERT_EQ(fields_ret_0[i].name, fields0[i].name); + ASSERT_EQ(fields_ret_0[i].value, fields0[i].value); } ASSERT_EQ(fields_ret_1.size(), fields1.size());