From 9da542dfe190975084445b3d21c3d56da2cb07ff Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 5 Dec 2024 11:50:39 +0800 Subject: [PATCH] gc dead --- db/db_impl.cc | 151 +++++++++++++++++++++++++++++++++++++++++++-------- db/db_impl.h | 3 + include/leveldb/db.h | 2 + test/test.cpp | 43 +++++++++++++-- 4 files changed, 172 insertions(+), 27 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 87664b4..cb52517 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -663,6 +663,15 @@ Status DBImpl::TEST_CompactMemTable() { return s; } +void DBImpl::TEST_GarbageCollect() { + MaybeScheduleGarbageCollect(); + // Finish current background compaction in the case where + // `background_work_finished_signal_` was signalled due to an error. + while (background_garbage_collect_scheduled_) { + background_work_finished_signal_.Wait(); + } +} + void DBImpl::RecordBackgroundError(const Status& s) { mutex_.AssertHeld(); if (bg_error_.ok()) { @@ -725,8 +734,8 @@ void DBImpl::BackgroundCall() { background_compaction_scheduled_ = false; - // Check if garbage collection needs to be scheduled after compaction - MaybeScheduleGarbageCollect(); + // // Check if garbage collection needs to be scheduled after compaction + // MaybeScheduleGarbageCollect(); // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. @@ -735,7 +744,7 @@ void DBImpl::BackgroundCall() { } void DBImpl::BackgroundGarbageCollect() { - MutexLock l(&mutex_); + mutex_.AssertHeld(); assert(background_garbage_collect_scheduled_); if (shutting_down_.load(std::memory_order_acquire)) { @@ -1767,10 +1776,13 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, // 判断文件是否为 valuelog 文件 bool IsValueLogFile(const std::string& filename) { - return filename.find("valuelog_") == - 0; // 简单判断文件名是否匹配 valuelog 前缀 + // 检查文件是否以 ".valuelog" 结尾 + const std::string suffix = ".valuelog"; + return filename.size() > suffix.size() && + filename.substr(filename.size() - suffix.size()) == suffix; } + // 示例:解析 sstable 中的元信息 void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, uint64_t& offset) { @@ -1781,14 +1793,33 @@ void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, // 示例:获取 ValueLog 文件 ID uint64_t GetValueLogID(const std::string& valuelog_name) { - // 假设文件名中包含唯一的 ID,例如 "valuelog_1" - auto pos = valuelog_name.find_last_of('_'); - return std::stoull(valuelog_name.substr(pos + 1)); + // 使用 std::filesystem::path 解析文件名 + std::filesystem::path file_path(valuelog_name); + std::string filename = file_path.filename().string(); // 获取文件名部分 + + // 查找文件名中的 '.' 位置,提取数字部分 + auto pos = filename.find('.'); + if (pos == std::string::npos) { + assert(0); + } + + // 提取数字部分 + std::string id_str = filename.substr(0, pos); + // 检查提取的部分是否为有效数字 + for (char c : id_str) { + if (!isdigit(c)) { + assert(0); + } + } + + return std::stoull(id_str); // 转换为 uint64_t } + // 垃圾回收实现 void DBImpl::GarbageCollect() { // 遍历数据库目录,找到所有 valuelog 文件 + Log(options_.info_log, "start gc "); auto files_set = fs::directory_iterator(dbname_); for (const auto& cur_log_file : files_set) { if (fs::exists(cur_log_file) && @@ -1796,6 +1827,7 @@ void DBImpl::GarbageCollect() { IsValueLogFile(cur_log_file.path().filename().string())) { std::string valuelog_name = cur_log_file.path().string(); uint64_t cur_log_number = GetValueLogID(valuelog_name); + std::cout << "check point 1" << std::endl; uint64_t new_log_number = versions_->NewFileNumber(); WritableFile* new_valuelog = nullptr; std::string new_valuelog_name = LogFileName(dbname_, new_log_number); @@ -1807,13 +1839,7 @@ void DBImpl::GarbageCollect() { break; } addNewValueLog(); - - std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); - if (!cur_valuelog.is_open()) { - std::cerr << "Failed to open ValueLog file: " << valuelog_name - << std::endl; - continue; - } + std::cout << "check point 2" << std::endl; // whether to reopen std::ifstream new_valuelog_file(new_valuelog_name, @@ -1825,16 +1851,96 @@ void DBImpl::GarbageCollect() { } uint64_t current_offset = 0; - uint64_t new_offset = 0; // 新的 ValueLog 偏移 + int cnt=0; + + std::cout << "check point 3" << std::endl; + + // Open the file in binary mode for reading + std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); + if (!cur_valuelog.is_open()) { + std::cerr << "Failed to open file: " << valuelog_name << " for reading!" + << std::endl; + continue; + } while (true) { + ++cnt; + std::cout << cnt << std::endl; + // 读取一个 kv 对 uint64_t key_len, value_len; Slice key, value; - Slice new_value; - ReadValueLog(cur_log_number, current_offset, &key, &value); - value = std::string(new_value.data(), new_value.size()); + Status s = Status::OK(); + + // Seek to the position of key length + cur_valuelog.seekg(current_offset); + + // Read the length of the key + char* key_buf_len = new char[sizeof(uint64_t)]; + cur_valuelog.read(key_buf_len, sizeof(uint64_t)); + + if (cur_valuelog.eof()) { + delete[] key_buf_len; + break; // 正常退出条件:到达文件末尾 + } + + std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); + + if (!cur_valuelog.good()) { + delete[] key_buf_len; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Now seek to the actual key position and read the key + cur_valuelog.seekg(current_offset + sizeof(uint64_t)); + char* key_buf = new char[key_len]; + cur_valuelog.read(key_buf, key_len); + if (!cur_valuelog.good()) { + delete[] key_buf; + delete[] key_buf_len; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Assign the read key data to the Slice + key = Slice(key_buf, key_len); + + // Read the length of the value + cur_valuelog.seekg(current_offset + sizeof(uint64_t) + key_len); + char* value_buf_len = new char[sizeof(uint64_t)]; + cur_valuelog.read(value_buf_len, sizeof(uint64_t)); + uint64_t val_len = 0; + std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); + + if (!cur_valuelog.good()) { + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Now seek to the actual data position and read the value + cur_valuelog.seekg(current_offset + sizeof(uint64_t) + key_len + sizeof(uint64_t)); + char* value_buf = new char[val_len]; + cur_valuelog.read(value_buf, val_len); + if (!cur_valuelog.good()) { + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + delete[] value_buf; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Assign the read value data to the Slice + value = Slice(value_buf, val_len); // 检查 key 是否在 sstable 中存在 std::string stored_value; @@ -1868,10 +1974,6 @@ void DBImpl::GarbageCollect() { continue; } - // 更新偏移 - new_offset += - sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); - // 更新当前偏移 current_offset += sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); @@ -1882,6 +1984,9 @@ void DBImpl::GarbageCollect() { new_valuelog_file.close(); std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件 + Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str()); + Log(options_.info_log, "add file during gc %s", new_valuelog_name.c_str()); + } } } diff --git a/db/db_impl.h b/db/db_impl.h index 2c2d100..0f44458 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -86,6 +86,9 @@ class DBImpl : public DB { // Force current memtable contents to be compacted. Status TEST_CompactMemTable(); + void TEST_GarbageCollect() override; + + // Return an internal iterator over the current state of the database. // The keys of this iterator are internal keys (see format.h). // The returned iterator should be deleted when no longer needed. diff --git a/include/leveldb/db.h b/include/leveldb/db.h index a72a6db..a788c60 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -183,6 +183,8 @@ class LEVELDB_EXPORT DB { // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + virtual void TEST_GarbageCollect() = 0; + }; // Destroy the contents of the specified database. diff --git a/test/test.cpp b/test/test.cpp index 6e523ba..8dc25f6 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -158,7 +158,39 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st // delete db; // } -TEST(Test, LARGE_DATA_COMPACT_TEST) { +// TEST(Test, LARGE_DATA_COMPACT_TEST) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::vector values; +// for(int i=0;i<500000;i++){ +// std::string key=std::to_string(i); +// std::string value; +// for(int j=0;j<1000;j++){ +// value+=std::to_string(i); +// } +// values.push_back(value); +// db->Put(writeOptions,key,value); +// } +// for(int i=0;i<500000;i++){ +// std::string key=std::to_string(i); +// std::string value; +// Status s=db->Get(readOptions,key,&value); +// assert(s.ok()); +// if(values[i]!=value){ +// std::cout< values; - for(int i=0;i<5000;i++){ + for(int i=0;i<500000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -176,7 +208,11 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<5000;i++){ + std::cout<<"start gc"<TEST_GarbageCollect(); + std::cout<<"finish gc"<Get(readOptions,key,&value); @@ -191,7 +227,6 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { } - int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. testing::InitGoogleTest(&argc, argv);