From 8d45bf108bfe8f41b339591598737b54c4eb5799 Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Fri, 6 Dec 2024 11:11:29 +0800 Subject: [PATCH] v3 roughly complete while mutex lacks in put --- CMakeLists.txt | 2 +- db/db_impl.cc | 322 +++++++++++++++++++++++----------------------- db/db_impl.h | 7 + include/leveldb/options.h | 3 + test/test.cpp | 6 +- 5 files changed, 174 insertions(+), 166 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 99951c3..122df31 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,7 +10,7 @@ project(leveldb VERSION 1.23.0 LANGUAGES C CXX) if(NOT CMAKE_C_STANDARD) # This project can use C11, but will gracefully decay down to C89. # 我改到17了 - set(CMAKE_C_STANDARD 11) + set(CMAKE_C_STANDARD 17) set(CMAKE_C_STANDARD_REQUIRED OFF) set(CMAKE_C_EXTENSIONS OFF) endif(NOT CMAKE_C_STANDARD) diff --git a/db/db_impl.cc b/db/db_impl.cc index 231e29c..27f8560 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -142,6 +143,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) db_lock_(nullptr), shutting_down_(false), background_work_finished_signal_(&mutex_), + background_gc_finished_signal_(&gc_mutex_), mem_(nullptr), imm_(nullptr), has_imm_(false), @@ -668,8 +670,9 @@ void DBImpl::TEST_GarbageCollect() { // Finish current background compaction in the case where // `background_work_finished_signal_` was signalled due to an error. while (background_garbage_collect_scheduled_) { - background_work_finished_signal_.Wait(); + background_gc_finished_signal_.Wait(); } + // std::cout<<"bg_signal"<Schedule(&DBImpl::BGWorkGC, this); + auto bg_thread_ = std::thread(&DBImpl::BGWorkGC, this); + bg_thread_.detach(); } - } void DBImpl::BGWorkGC(void* db) { @@ -744,7 +748,7 @@ void DBImpl::BackgroundCall() { } void DBImpl::BackgroundGarbageCollect() { - mutex_.AssertHeld(); + MutexLock l(&gc_mutex_); assert(background_garbage_collect_scheduled_); if (shutting_down_.load(std::memory_order_acquire)) { @@ -754,12 +758,15 @@ void DBImpl::BackgroundGarbageCollect() { } else { // Perform garbage collection here GarbageCollect(); + gc_mutex_.Unlock(); + } background_garbage_collect_scheduled_ = false; // Notify any waiting threads - background_work_finished_signal_.SignalAll(); + background_gc_finished_signal_.SignalAll(); + } void DBImpl::BackgroundCompaction() { @@ -1219,6 +1226,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); + if(!s.ok())return s; + if(options.find_value_log_for_gc){ + return s; + } if (value->c_str()[0] == 0x00) { *value = value->substr(1); @@ -1233,7 +1244,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (!res) return Status::Corruption("can't decode file id"); res = GetVarint64(&value_log_slice, &valuelog_offset); if (!res) return Status::Corruption("can't decode valuelog offset"); - ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); + s=ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); + if(!s.ok()){ + return s; + } *value = std::string(new_value.data(), new_value.size()); delete[] new_value.data(); return s; @@ -1787,8 +1801,9 @@ bool IsValueLogFile(const std::string& filename) { void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, uint64_t& offset) { // 假设 stored_value 格式为:valuelog_id|offset - std::istringstream iss(stored_value); - iss >> valuelog_id >> offset; + Slice tmp(stored_value.data(),stored_value.size()); + GetVarint64(&tmp,&valuelog_id); + GetVarint64(&tmp,&offset); } // 示例:获取 ValueLog 文件 ID @@ -1818,184 +1833,167 @@ uint64_t GetValueLogID(const std::string& valuelog_name) { // 垃圾回收实现 void DBImpl::GarbageCollect() { + gc_mutex_.AssertHeld(); // 遍历数据库目录,找到所有 valuelog 文件 Log(options_.info_log, "start gc "); auto files_set = fs::directory_iterator(dbname_); + std::set valuelog_set; + std::string cur_valuelog_name=ValueLogFileName(dbname_,valuelogfile_number_); for (const auto& cur_log_file : files_set) { if (fs::exists(cur_log_file) && - fs::is_regular_file(fs::status(cur_log_file)) && - IsValueLogFile(cur_log_file.path().filename().string())) { - std::string valuelog_name = cur_log_file.path().string(); - std::cout << valuelog_name << std::endl; - uint64_t cur_log_number = GetValueLogID(valuelog_name); - std::cout << "check point 1" << std::endl; - uint64_t new_log_number = versions_->NewFileNumber(); - WritableFile* new_valuelog = nullptr; - std::string new_valuelog_name = LogFileName(dbname_, new_log_number); - Status s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), - &new_valuelog); - if (!s.ok()) { - // Avoid chewing through file number space in a tight loop. - versions_->ReuseFileNumber(new_log_number); - break; - } - addNewValueLog(); - std::cout << "check point 2" << std::endl; - - // whether to reopen - std::ifstream new_valuelog_file(new_valuelog_name, - std::ios::in | std::ios::binary); - if (!new_valuelog_file.is_open()) { - std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name - << std::endl; - continue; - } - - uint64_t current_offset = 0; - uint64_t tmp_offset = 0; + fs::is_regular_file(fs::status(cur_log_file)) && + IsValueLogFile(cur_log_file.path().filename().string())) { + if(cur_valuelog_name==cur_log_file.path().filename().string())continue; + valuelog_set.emplace(cur_log_file.path().filename().string()); + } + } + for (std::string valuelog_name:valuelog_set) { + // std::cout << valuelog_name << std::endl; + uint64_t cur_log_number = GetValueLogID(valuelog_name); + valuelog_name=ValueLogFileName(dbname_,cur_log_number); - int cnt=0; + uint64_t current_offset = 0; + uint64_t tmp_offset = 0; - std::cout << "check point 3" << std::endl; + int cnt=0; - // Open the file in binary mode for reading - std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); - if (!cur_valuelog.is_open()) { - std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!" - << std::endl; - continue; - } - - while (true) { - tmp_offset=current_offset; - ++cnt; - std::cout << cnt <<" "< shutting_down_; port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); + port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_); + + // Slice valuelog_finding_key GUARDED_BY(mutex_ ); + MemTable* mem_; MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted std::atomic has_imm_; // So bg thread can detect non-null imm_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index d755f46..cad5032 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -157,6 +157,8 @@ struct LEVELDB_EXPORT ReadOptions { // Callers may wish to set this field to false for bulk scans. bool fill_cache = true; + bool find_value_log_for_gc = false; + // If "snapshot" is non-null, read as of the supplied snapshot // (which must belong to the DB that is being read and which must // not have been released). If "snapshot" is null, use an implicit @@ -183,6 +185,7 @@ struct LEVELDB_EXPORT WriteOptions { // with sync==true has similar crash semantics to a "write()" // system call followed by "fsync()". bool sync = false; + // bool valuelog_write=false; }; } // namespace leveldb diff --git a/test/test.cpp b/test/test.cpp index 2b7ac95..077e947 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -199,7 +199,7 @@ TEST(Test, Garbage_Collect_TEST) { abort(); } std::vector values; - for(int i=0;i<5000;i++){ + for(int i=0;i<500000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -212,8 +212,8 @@ TEST(Test, Garbage_Collect_TEST) { db->TEST_GarbageCollect(); std::cout<<"finish gc"<Get(readOptions,key,&value);