From 19a9a1204a5a91a5e4d74dbd3f5c677053bf1161 Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 12 Dec 2024 22:23:18 +0800 Subject: [PATCH 1/3] add fize data size sign --- db/db_impl.cc | 93 ++++++++++++++++++++++++++++++++++++++++++----------------- test/test.cpp | 8 ++--- 2 files changed, 70 insertions(+), 31 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 45c7a4b..16687f8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1249,6 +1249,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (!res) return Status::Corruption("can't decode file id"); res = GetVarint64(&value_log_slice, &valuelog_offset); if (!res) return Status::Corruption("can't decode valuelog offset"); + // std::cout<<"file_id: "<> DBImpl::WriteValueLog( std::vector> kv) { std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); - std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); + std::fstream valueFile(file_name_, std::ios::in | std::ios::out | std::ios::binary); if (!valueFile.is_open()) { assert(0); } + valueFile.seekg(0, std::ios::end); // 移动到文件末尾 + uint64_t offset = valueFile.tellg(); - uint64_t offset = valueFile.tellp(); - + // 如果超出fixed_size if(offset>=config::value_log_size){ addNewValueLog(); valueFile.close(); file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); - valueFile =std::ofstream(file_name_, std::ios::app | std::ios::binary); + valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary); if (!valueFile.is_open()) { assert(0); } - offset = valueFile.tellp(); - + valueFile.seekg(0, std::ios::end); // 移动到文件末尾 + offset = valueFile.tellg(); } + + uint64_t file_data_size = 0; // 文件数据大小标志位 + valueFile.seekg(0, std::ios::beg); + valueFile.read(reinterpret_cast(&file_data_size), sizeof(uint64_t)); + valueFile.clear(); // 清除错误状态 + valueFile.seekp(0, std::ios::end); // 返回文件末尾准备写入 + // std::cout<<"file_data_size: "<> res; for (const auto& [key_slice, value_slice] : kv) { @@ -1640,15 +1650,30 @@ std::vector> DBImpl::WriteValueLog( assert(0); } + // 更新文件数据大小 + file_data_size += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; // 记录 file_id 和 offset res.push_back({valuelogfile_number_, offset}); - // 更新偏移量 offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; } - // 解锁资源或进行其他清理操作 + // 在所有数据写入后,将更新的数据大小写回文件开头 + if (!res.empty()) { + valueFile.seekp(0, std::ios::beg); // 移动到文件开头 + valueFile.write(reinterpret_cast(&file_data_size), sizeof(uint64_t)); + if (!valueFile.good()) { + valueFile.close(); + assert(0); + } + } + else{ + valueFile.close(); + assert(0); + } + // 解锁资源或进行其他清理操作 + valueFile.flush(); // 确保所有缓冲区的数据都被写入文件 valueFile.close(); return res; } @@ -1656,6 +1681,28 @@ std::vector> DBImpl::WriteValueLog( void DBImpl::addNewValueLog() { valuelogfile_number_ = versions_->NewFileNumber(); + + std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); + std::fstream valueFile(file_name_, std::ios::app | std::ios::binary); + if (!valueFile.is_open()) { + assert(0); + } + uint64_t file_data_size = 0; // 新增的文件数据大小标志位 + if (valueFile.tellp() != 0) { + assert(0); + } + else{ + valueFile.write(reinterpret_cast(&file_data_size), sizeof(uint64_t)); + if (!valueFile.good()) { + valueFile.close(); + assert(0); + } + else{ + // 正常关闭文件 + valueFile.flush(); // 确保所有缓冲区的数据都被写入文件 + valueFile.close(); + } + } } Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, @@ -1769,8 +1816,9 @@ void DBImpl::GarbageCollect() { continue; } - uint64_t current_offset = 0; - uint64_t tmp_offset = 0; + // 初始化offset为占用大小 + uint64_t current_offset = sizeof(uint64_t); + uint64_t tmp_offset = current_offset; int cnt = 0; @@ -1810,11 +1858,12 @@ void DBImpl::GarbageCollect() { if (!cur_valuelog.good()) { delete[] key_buf_len; cur_valuelog.close(); - std::cerr << "Failed to read file: " << valuelog_name << std::endl; + std::cerr << "1Failed to read file: " << valuelog_name << std::endl; break; } // 更新当前偏移 current_offset += sizeof(uint64_t); + // std::cout << cnt <<" "< values; - for(int i=0;i<500000;i++){ + for(int i=0;i<50000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<5000;j++){ @@ -106,7 +106,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<500000;i++){ + for(int i=0;i<50000;i++){ std::string key=std::to_string(i); std::string value; Status s=db->Get(readOptions,key,&value); From b1e59a336b5cecf1970b1296846d92c36f5c7f1b Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 12 Dec 2024 23:12:49 +0800 Subject: [PATCH 2/3] value ahead of key --- db/db_impl.cc | 130 +++++++++++++++++++-------------------------------- db/db_impl.h | 2 +- db/db_iter.cc | 2 +- include/leveldb/db.h | 2 +- 4 files changed, 52 insertions(+), 84 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 16687f8..1434df4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1241,7 +1241,6 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, return s; } Slice value_log_slice = Slice(value->c_str() + 1, value->length()); - Slice new_key; Slice new_value; int value_offset = sizeof(uint64_t) * 2; // 16 uint64_t file_id, valuelog_offset; @@ -1250,7 +1249,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, res = GetVarint64(&value_log_slice, &valuelog_offset); if (!res) return Status::Corruption("can't decode valuelog offset"); // std::cout<<"file_id: "<> DBImpl::WriteValueLog( std::vector> res; for (const auto& [key_slice, value_slice] : kv) { - // 写入 key 的长度 - uint64_t key_len = key_slice.size(); - valueFile.write(reinterpret_cast(&key_len), sizeof(uint64_t)); + + + // 写入 value 的长度 + uint64_t value_len = value_slice.size(); + valueFile.write(reinterpret_cast(&value_len), + sizeof(uint64_t)); if (!valueFile.good()) { valueFile.close(); assert(0); } - // 写入 key 本身 - valueFile.write(key_slice.data(), key_len); + // 写入 value 本身 + valueFile.write(value_slice.data(), value_len); if (!valueFile.good()) { valueFile.close(); assert(0); } - - // 写入 value 的长度 - uint64_t value_len = value_slice.size(); - valueFile.write(reinterpret_cast(&value_len), - sizeof(uint64_t)); + + // 写入 key 的长度 + uint64_t key_len = key_slice.size(); + valueFile.write(reinterpret_cast(&key_len), sizeof(uint64_t)); if (!valueFile.good()) { valueFile.close(); assert(0); } - // 写入 value 本身 - valueFile.write(value_slice.data(), value_len); + // 写入 key 本身 + valueFile.write(key_slice.data(), key_len); if (!valueFile.good()) { valueFile.close(); assert(0); @@ -1705,7 +1706,7 @@ void DBImpl::addNewValueLog() { } } -Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, +Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { Status s = Status::OK(); std::string file_name_ = ValueLogFileName(dbname_, file_id); @@ -1718,63 +1719,33 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, return Status::Corruption("Failed to open file for reading!"); } - // Seek to the position of key length - inFile.seekg(offset); - - // Read the length of the key - char* key_buf_len = new char[sizeof(uint64_t)]; - inFile.read(key_buf_len, sizeof(uint64_t)); - uint64_t key_len = 0; - std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); - - if (!inFile.good()) { - delete[] key_buf_len; - inFile.close(); - return Status::Corruption("Failed to read key length from file!"); - } - - // Now seek to the actual key position and read the key - inFile.seekg(offset + sizeof(uint64_t)); - char* key_buf = new char[key_len]; - inFile.read(key_buf, key_len); - if (!inFile.good()) { - delete[] key_buf; - delete[] key_buf_len; - inFile.close(); - return Status::Corruption("Failed to read key from file!"); - } - - // Assign the read key data to the Slice - *key = Slice(key_buf, key_len); - // Read the length of the value - inFile.seekg(offset + sizeof(uint64_t) + key_len); + inFile.seekg(offset); char* value_buf_len = new char[sizeof(uint64_t)]; inFile.read(value_buf_len, sizeof(uint64_t)); uint64_t val_len = 0; std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); if (!inFile.good()) { - delete[] key_buf; - delete[] key_buf_len; delete[] value_buf_len; inFile.close(); return Status::Corruption("Failed to read value length from file!"); } // Now seek to the actual data position and read the value - inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t)); + inFile.seekg(offset + sizeof(uint64_t)); char* value_buf = new char[val_len]; inFile.read(value_buf, val_len); if (!inFile.good()) { - delete[] key_buf; - delete[] key_buf_len; delete[] value_buf_len; delete[] value_buf; inFile.close(); return Status::Corruption("Failed to read value from file!"); } + // Seek to the position of key length + inFile.seekg(offset + sizeof(uint64_t) + val_len); + // Close the file after reading inFile.close(); @@ -1841,6 +1812,33 @@ void DBImpl::GarbageCollect() { Status s = Status::OK(); + // Read the length of the value + cur_valuelog.seekg(current_offset); + char* value_buf_len = new char[sizeof(uint64_t)]; + cur_valuelog.read(value_buf_len, sizeof(uint64_t)); + + if (cur_valuelog.eof()) { + delete[] value_buf_len; + break; // 正常退出条件:到达文件末尾 + } + + uint64_t val_len = 0; + std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); + + if (!cur_valuelog.good()) { + delete[] value_buf_len; + cur_valuelog.close(); + std::cerr << "3Failed to read file: " << valuelog_name << std::endl; + break; + } + // 更新当前偏移 + current_offset += sizeof(uint64_t); + // std::cout << cnt <<" "< getNewValuelog(); // use for compaction // Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* // value)override; - Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, + Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) override; // Extra methods (for testing) that are not in the public DB interface diff --git a/db/db_iter.cc b/db/db_iter.cc index e27f1a4..5995f86 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -83,7 +83,7 @@ class DBIter : public Iterator { // res=GetVarint64(&tmp_value,&valuelog_len); // if(!res)assert(0); // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); - db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); + db_->ReadValueLog(file_id,valuelog_offset, &tmp_value); return tmp_value; } Status status() const override { diff --git a/include/leveldb/db.h b/include/leveldb/db.h index cefa79c..6118f31 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB { // assert(0); // Not implemented // return Status::Corruption("not imp"); // } - virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){ + virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value){ assert(0); // Not implemented return Status::Corruption("not imp"); } From 321fba458ffb47192a5c695b5962f5dcc34bff83 Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Wed, 18 Dec 2024 21:42:19 +0800 Subject: [PATCH 3/3] value log map for gc --- db/db_impl.cc | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++------- db/db_impl.h | 10 +++++- test/test.cpp | 4 +-- 3 files changed, 98 insertions(+), 14 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 1434df4..6765cb2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -154,7 +154,10 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) background_garbage_collect_scheduled_(false), manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_)) {} + &internal_comparator_)) { + InitializeExistingLogs(); + // std::cout<<"init map"<smallest_snapshot) { // Hidden by an newer entry for same user key drop = true; // (A) + // Parse the value based on its first character + if(ikey.type != kTypeDeletion){ + Slice value = input->value(); + char type = value[0]; + if (type == 0x00) { + // Value is less than 100 bytes, use it directly + } else { + // Value is >= 100 bytes, read from external file + uint64_t file_id, valuelog_offset; + std::string file_id_str = value.ToString().substr(1, 8); + Slice file_id_slice(file_id_str); + bool res = GetVarint64(&file_id_slice, &file_id); + if (!res) return Status::Corruption("can't decode file id"); + if(valuelog_origin[file_id] == 0){ + valuelog_origin[file_id] = valuelog_usage[file_id]; + } + valuelog_usage[file_id]--; + // std::cout << "file_id: " << file_id << " usage: " << valuelog_usage[file_id] << std::endl; + } + } } else if (ikey.type == kTypeDeletion && ikey.sequence <= compact->smallest_snapshot && compact->compaction->IsBaseLevelForKey(ikey.user_key)) { @@ -1072,7 +1095,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } - compact->current_output()->largest.DecodeFrom(key); + compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value()); // Close output file if it is big enough @@ -1084,7 +1107,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } } } - input->Next(); } @@ -1122,6 +1144,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); + // for(int i=0;ic_str() + 1, value->length()); Slice new_value; - int value_offset = sizeof(uint64_t) * 2; // 16 uint64_t file_id, valuelog_offset; bool res = GetVarint64(&value_log_slice, &file_id); if (!res) return Status::Corruption("can't decode file id"); res = GetVarint64(&value_log_slice, &valuelog_offset); if (!res) return Status::Corruption("can't decode valuelog offset"); - // std::cout<<"file_id: "<> DBImpl::WriteValueLog( // 如果超出fixed_size if(offset>=config::value_log_size){ + int file_capacity=ReadFileSize(valuelogfile_number_); + // std::cout<<"file_capacity: "<> DBImpl::WriteValueLog( std::vector> res; for (const auto& [key_slice, value_slice] : kv) { - - // 写入 value 的长度 uint64_t value_len = value_slice.size(); valueFile.write(reinterpret_cast(&value_len), @@ -1652,7 +1677,7 @@ std::vector> DBImpl::WriteValueLog( } // 更新文件数据大小 - file_data_size += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; + file_data_size ++; // 记录 file_id 和 offset res.push_back({valuelogfile_number_, offset}); // 更新偏移量 @@ -1758,7 +1783,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, // 垃圾回收实现 void DBImpl::GarbageCollect() { // 遍历数据库目录,找到所有 valuelog 文件 - std::vector filenames; Status s = env_->GetChildren(dbname_, &filenames); Log(options_.info_log, "start gc "); @@ -1767,11 +1791,24 @@ void DBImpl::GarbageCollect() { for (const auto& filename:filenames) { if (IsValueLogFile(filename)){ uint64_t cur_log_number = GetValueLogID(filename); + if (cur_log_number == valuelogfile_number_) { + continue; + } auto tmp_name = ValueLogFileName(dbname_, cur_log_number); - - if(!versions_->checkOldValueLog(tmp_name))valuelog_set.emplace(filename); + // std::cout <checkOldValueLog(tmp_name) && + valuelog_origin[cur_log_number]) { + if ((float)(valuelog_usage[cur_log_number] / + valuelog_origin[cur_log_number]) <= 0.6) { + valuelog_set.emplace(filename); + } + } } } + // std::cout << "valuelog_set size: " << valuelog_set.size() << std::endl; + Log(options_.info_log, "valuelog_set size: %d", valuelog_set.size()); + //bool tmp_judge=false;//only clean one file for (std::string valuelog_name : valuelog_set) { Log(options_.info_log, ("gc processing: "+valuelog_name).data()); @@ -2064,4 +2101,43 @@ Status DestroyDB(const std::string& dbname, const Options& options) { return result; } +// 读取所有现有日志文件的 file_data_size +void DBImpl::InitializeExistingLogs() { + std::vector filenames; + Status s = env_->GetChildren(dbname_, &filenames); + Log(options_.info_log, "start set file map "); + assert(s.ok()); + std::set valuelog_set; + for (const auto& filename : filenames) { + if (IsValueLogFile(filename)) { + uint64_t cur_log_number = GetValueLogID(filename); + uint64_t file_data_size = ReadFileSize(cur_log_number); + valuelog_usage.emplace(cur_log_number,file_data_size); + // std::cout << "cur_log_number: " << cur_log_number << " file_data_size: " << file_data_size << std::endl; + + } + } +} + +// 读取单个文件的 file_data_size +uint64_t DBImpl::ReadFileSize(uint64_t log_number) { + auto file_name = ValueLogFileName(dbname_, log_number); + std::ifstream valueFile(file_name, std::ios::in | std::ios::binary); + if (!valueFile.is_open()) { + std::cerr << "Failed to open file: " << file_name << std::endl; + return 0; + } + + uint64_t file_data_size = 0; + valueFile.read(reinterpret_cast(&file_data_size), sizeof(uint64_t)); + if (valueFile.fail() || valueFile.bad()) { + std::cerr << "Failed to read data size from file: " << file_name + << std::endl; + valueFile.close(); + return 0; + } + valueFile.close(); + return file_data_size; +} + } // namespace leveldb diff --git a/db/db_impl.h b/db/db_impl.h index ab8ec0e..9d829c6 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -69,7 +69,7 @@ class DBImpl : public DB { std::vector> WriteValueLog( std::vector> value) override; void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_); - ; + std::pair getNewValuelog(); // use for compaction // Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* // value)override; @@ -101,6 +101,11 @@ class DBImpl : public DB { // bytes. void RecordReadSample(Slice key); + void InitializeExistingLogs(); + + uint64_t ReadFileSize(uint64_t log_number); + + private: friend class DB; struct CompactionState; @@ -230,6 +235,9 @@ class DBImpl : public DB { uint64_t valuelogfile_number_; log::Writer* log_; std::map oldvaluelog_ids; + std::map valuelog_usage; + std::map valuelog_origin; + uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. diff --git a/test/test.cpp b/test/test.cpp index 56e79af..e8bfbf5 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -129,7 +129,7 @@ TEST(Test, Garbage_Collect_TEST) { abort(); } std::vector values; - for(int i=0;i<5000;i++){ + for(int i=0;i<50000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -142,7 +142,7 @@ TEST(Test, Garbage_Collect_TEST) { db->TEST_GarbageCollect(); std::cout<<"finish gc"<