From 60c257b82929aa01524b44b976c84c85925ea772 Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Tue, 3 Dec 2024 19:59:38 +0800 Subject: [PATCH 1/6] v3 compaction unfinish --- db/builder.cc | 1 + db/db_impl.cc | 197 +++++++++++++++++++++++++++++++++++++-------------- db/db_impl.h | 7 +- db/db_iter.cc | 7 +- db/write_batch.cc | 5 +- include/leveldb/db.h | 16 ++++- 6 files changed, 168 insertions(+), 65 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 7cf13f8..780e6f9 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, return s; } + // 如果第一个字节是 0x01,它会移除这个前缀,并尝试从剩下的数据中解析出 value { auto tmp_value=iter->value(); if(tmp_value.data()[0]==(char)(0x01)){ diff --git a/db/db_impl.cc b/db/db_impl.cc index 56d51ec..82d9ce7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -839,8 +839,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { compact->builder = new TableBuilder(options_, compact->outfile); } - - compact->valuelog_offset=0; + // compaction 后的新 valuelog + compact->valuelog_offset=0; // why s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); return s; @@ -878,6 +878,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } delete compact->outfile; compact->outfile = nullptr; + // value log 落盘 if (s.ok()) { s = compact->valuelogfile->Flush(); } @@ -955,9 +956,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + // 记录 当前和下一层 level 中被合并的 files for (int which = 0; which < 2; which++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) { - if(compact->compaction->input(which, i)->valuelog_id)oldvaluelog_ids[compact->compaction->input(which, i)->number]=compact->compaction->input(which, i)->valuelog_id; + auto tmp_file=compact->compaction->input(which, i); + if(tmp_file->valuelog_id){ + oldvaluelog_ids[tmp_file->number]=tmp_file->valuelog_id; + } } } while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { @@ -1044,31 +1049,39 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { Slice old_value=input->value(); Slice new_value; std::string buf=""; - if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){//when it is a deletion, input->value() will be "" + if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){ + // when it is a deletion, input->value() will be "" new_value=old_value; } else{ + // not delete old_value.remove_prefix(1); - uint64_t file_id,valuelog_offset,valuelog_len; + // put value_len into value_log + int value_offset=sizeof(uint64_t)*2;// 16 + // uint64_t file_id,valuelog_offset,valuelog_len; + uint64_t file_id,valuelog_offset; bool res=GetVarint64(&old_value,&file_id); if(!res)assert(0); res=GetVarint64(&old_value,&valuelog_offset); if(!res)assert(0); - res=GetVarint64(&old_value,&valuelog_len); - if(!res)assert(0); - Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + // res=GetVarint64(&old_value,&valuelog_len); + // if(!res)assert(0); + // Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + Status s=ReadValueLog(file_id,valuelog_offset,&new_value); assert(s.ok()); writeValueLogForCompaction(compact->valuelogfile,{new_value}); buf+=(char)(0x01); PutVarint64(&buf,compact->valuelog_file_id); PutVarint64(&buf,compact->valuelog_offset); - 
PutVarint64(&buf,valuelog_len); - compact->valuelog_offset+=valuelog_len; + // PutVarint64(&buf,valuelog_len); + compact->valuelog_offset+=value_offset; delete []new_value.data(); new_value=Slice(buf); } compact->builder->Add(key, new_value); + // 更新计数器 + // compact->builder->add_log_count(); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1241,14 +1254,18 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } Slice value_log_slice=Slice(value->c_str()+1,value->length()); Slice new_value; - uint64_t file_id,valuelog_offset,valuelog_len; + // put value_len into value_log + int value_offset=sizeof(uint64_t)*2;// 16 + // uint64_t file_id,valuelog_offset,valuelog_len; + uint64_t file_id,valuelog_offset; bool res=GetVarint64(&value_log_slice,&file_id); if(!res)return Status::Corruption("can't decode file id"); res=GetVarint64(&value_log_slice,&valuelog_offset); if(!res)return Status::Corruption("can't decode valuelog offset"); - res=GetVarint64(&value_log_slice,&valuelog_len); - if(!res)return Status::Corruption("can't decode valuelog len"); - ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + // res=GetVarint64(&value_log_slice,&valuelog_len); + // if(!res)return Status::Corruption("can't decode valuelog len"); + // ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + ReadValueLog(file_id,valuelog_offset,&new_value); *value=std::string(new_value.data(),new_value.size()); delete []new_value.data(); return s; @@ -1574,35 +1591,66 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } -std::vector>> DBImpl::WriteValueLog(std::vector values){ - //lock - // std::vector>> res; - // for(int i=0;iAppend(values[i]); - // res.push_back({valuelogfile_number_,{valuelogfile_offset,len}}); - // valuelogfile_offset+=len; - // } - // //unlock - // valuelogfile_->Flush(); - // return res; - std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); +// std::vector>> DBImpl::WriteValueLog(std::vector values){ + +// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); +// std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); +// if (!valueFile.is_open()) { +// assert(0); +// } +// uint64_t offset=valueFile.tellp(); +// std::vector>> res; +// for(int i=0;i> DBImpl::WriteValueLog(std::vector values) { + std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); if (!valueFile.is_open()) { - assert(0); + assert(0); + } + + uint64_t offset = valueFile.tellp(); + std::vector> res; + + for (const auto& slice : values) { + uint64_t len = slice.size(); + + // 先写入长度 + valueFile.write(reinterpret_cast(&len), sizeof(uint64_t)); + if (!valueFile.good()) { + valueFile.close(); + return {}; // 写入长度失败,返回空结果 } - uint64_t offset=valueFile.tellp(); - std::vector>> res; - for(int i=0;i values){ for(int i=0;iAppend(values[i]); @@ -1610,34 +1658,73 @@ void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vectorSync(); - // valuelogfile_->Close(); - // delete valuelogfile_; - // } valuelogfile_number_=versions_->NewFileNumber(); - // valuelogfile_offset=0; - // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); - // env_->NewWritableFile(file_name_,&valuelogfile_); - //unlock } -Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ +// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* 
value){ +// //lock_shared +// Status s=Status::OK(); +// std::string file_name_=ValueLogFileName(dbname_,file_id); +// //std::cout<(&len), sizeof(uint64_t)); + if (!inFile.good()) { + inFile.close(); + return Status::Corruption("Failed to read length from file!"); + } + + // Now seek to the actual data position and read the value + inFile.seekg(offset + sizeof(uint64_t)); + char* value_buf = new char[len]; + inFile.read(value_buf, len); + if (!inFile.good()) { + delete[] value_buf; + inFile.close(); + return Status::Corruption("Failed to read value from file!"); + } + + // Close the file after reading inFile.close(); - *value=Slice(value_buf,len); - return Status::OK(); + + // Assign the read data to the Slice + *value = Slice(value_buf, len); + + // Clean up allocated buffer + // should also do in v2 + delete[] value_buf; + + return s; } // Default implementations of convenience methods that subclasses of DB diff --git a/db/db_impl.h b/db/db_impl.h index ab21773..513690b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -63,11 +63,14 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - std::vector>> WriteValueLog(std::vector value)override; + // std::vector>> WriteValueLog(std::vector value)override; + std::vector> WriteValueLog(std::vector value)override; void writeValueLogForCompaction(WritableFile* target_file,std::vector value); void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);; std::pair getNewValuelog();//use for compaction - Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; + // Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; + Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value)override; + // Extra methods (for testing) that are not in the public DB interface diff --git a/db/db_iter.cc b/db/db_iter.cc index 28d227e..350d175 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -79,9 +79,10 @@ class DBIter : public Iterator { if(!res)assert(0); res=GetVarint64(&tmp_value,&valuelog_offset); if(!res)assert(0); - res=GetVarint64(&tmp_value,&valuelog_len); - if(!res)assert(0); - db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); + // res=GetVarint64(&tmp_value,&valuelog_len); + // if(!res)assert(0); + // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); + db_->ReadValueLog(file_id,valuelog_offset,&tmp_value); return tmp_value; } Status status() const override { diff --git a/db/write_batch.cc b/db/write_batch.cc index 77af25a..14df572 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -149,8 +149,9 @@ class ValueLogInserter : public WriteBatch::Handler { v.push_back(value); auto res=db_->WriteValueLog(v); PutVarint64(&buf,res[0].first); - PutVarint64(&buf,res[0].second.first); - PutVarint64(&buf,res[0].second.second); + // PutVarint64(&buf,res[0].second.first); + // PutVarint64(&buf,res[0].second.second); + PutVarint64(&buf,res[0].second); } new_value=Slice(buf); writeBatch_.Put(key,new_value); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 566fbee..116744f 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -102,19 +102,29 @@ class LEVELDB_EXPORT DB { // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector *keys); - virtual std::vector>> WriteValueLog(std::vector value){ + // virtual 
std::vector>> WriteValueLog(std::vector value){ + // assert(0); + // std::vector>> v; + // return v; + // } + virtual std::vector> WriteValueLog(std::vector value){ assert(0); - std::vector>> v; + std::vector> v; return v; } virtual void addNewValueLog(){assert(0);} - virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ + // virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ + // assert(0); // Not implemented + // return Status::Corruption("not imp"); + // } + virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value){ assert(0); // Not implemented return Status::Corruption("not imp"); } + // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). From 05fd39cd6b7c098cf745046ce79a6c5c19612556 Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Wed, 4 Dec 2024 17:26:59 +0800 Subject: [PATCH 2/6] v3 except gc --- db/db_impl.cc | 116 +++++++--------------------------------------------------- 1 file changed, 13 insertions(+), 103 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 82d9ce7..d3b1471 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -57,7 +57,6 @@ struct DBImpl::CompactionState { struct Output { uint64_t number; uint64_t file_size; - uint64_t valuelog_id; InternalKey smallest, largest; }; @@ -84,10 +83,6 @@ struct DBImpl::CompactionState { WritableFile* outfile; TableBuilder* builder; - WritableFile* valuelogfile; - uint64_t valuelog_offset=0; - uint64_t valuelog_file_id=0; - uint64_t total_bytes; }; @@ -281,10 +276,6 @@ void DBImpl::RemoveObsoleteFiles() { } Log(options_.info_log, "Delete type=%d #%lld\n", static_cast(type), static_cast(number)); - if(oldvaluelog_ids.count(number)){ - std::string valuelog_filename=ValueLogFileName(dbname_,oldvaluelog_ids[number]); - env_->RemoveFile(valuelog_filename); - } } } } @@ -546,7 +537,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, - meta.largest,meta.valuelog_id); + meta.largest); } CompactionStats stats; @@ -750,7 +741,7 @@ void DBImpl::BackgroundCompaction() { FileMetaData* f = c->input(0, 0); c->edit()->RemoveFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, - f->largest,f->valuelog_id); + f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); @@ -824,11 +815,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); - compact->valuelog_file_id=versions_->NewFileNumber(); - out.valuelog_id=compact->valuelog_file_id; - compact->outputs.push_back(out); - mutex_.Unlock(); } @@ -838,11 +825,6 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { if (s.ok()) { compact->builder = new TableBuilder(options_, compact->outfile); } - - // compaction 后的新 valuelog - compact->valuelog_offset=0; // why - s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); - return s; } @@ -878,20 +860,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } delete compact->outfile; compact->outfile = nullptr; - // value log 落盘 - if (s.ok()) { - s = 
compact->valuelogfile->Flush(); - } - if (s.ok()) { - s = compact->valuelogfile->Sync(); - } - if (s.ok()) { - s = compact->valuelogfile->Close(); - } - delete compact->valuelogfile; - compact->valuelogfile=nullptr; - compact->valuelog_file_id=0; - compact->valuelog_offset=0; if (s.ok() && current_entries > 0) { // Verify that the table is usable @@ -922,7 +890,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, - out.smallest, out.largest,out.valuelog_id); + out.smallest, out.largest); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -956,15 +924,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; - // 记录 当前和下一层 level 中被合并的 files - for (int which = 0; which < 2; which++) { - for (int i = 0; i < compact->compaction->num_input_files(which); i++) { - auto tmp_file=compact->compaction->input(which, i); - if(tmp_file->valuelog_id){ - oldvaluelog_ids[tmp_file->number]=tmp_file->valuelog_id; - } - } - } while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { // Prioritize immutable compaction work if (has_imm_.load(std::memory_order_relaxed)) { @@ -1045,43 +1004,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - - Slice old_value=input->value(); - Slice new_value; - std::string buf=""; - if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){ - // when it is a deletion, input->value() will be "" - new_value=old_value; - } - else{ - // not delete - old_value.remove_prefix(1); - // put value_len into value_log - int value_offset=sizeof(uint64_t)*2;// 16 - // uint64_t file_id,valuelog_offset,valuelog_len; - uint64_t file_id,valuelog_offset; - bool res=GetVarint64(&old_value,&file_id); - if(!res)assert(0); - res=GetVarint64(&old_value,&valuelog_offset); - if(!res)assert(0); - // res=GetVarint64(&old_value,&valuelog_len); - // if(!res)assert(0); - // Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); - Status s=ReadValueLog(file_id,valuelog_offset,&new_value); - assert(s.ok()); - writeValueLogForCompaction(compact->valuelogfile,{new_value}); - buf+=(char)(0x01); - PutVarint64(&buf,compact->valuelog_file_id); - PutVarint64(&buf,compact->valuelog_offset); - // PutVarint64(&buf,valuelog_len); - compact->valuelog_offset+=value_offset; - delete []new_value.data(); - new_value=Slice(buf); - } - - compact->builder->Add(key, new_value); - // 更新计数器 - // compact->builder->add_log_count(); + compact->builder->Add(key, input->value()); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1105,14 +1028,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = input->status(); } - //not completely correct, should be written in new function, related to removeabsol... 
- // if(status.ok()){ - // for(auto id:old_valuelog_ids){ - // auto valuelog_filename=ValueLogFileName(dbname_,id); - // Status s=env_->RemoveFile(valuelog_filename); - // assert(s.ok()); - // } - // } delete input; input = nullptr; @@ -1254,17 +1169,12 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } Slice value_log_slice=Slice(value->c_str()+1,value->length()); Slice new_value; - // put value_len into value_log int value_offset=sizeof(uint64_t)*2;// 16 - // uint64_t file_id,valuelog_offset,valuelog_len; uint64_t file_id,valuelog_offset; bool res=GetVarint64(&value_log_slice,&file_id); if(!res)return Status::Corruption("can't decode file id"); res=GetVarint64(&value_log_slice,&valuelog_offset); if(!res)return Status::Corruption("can't decode valuelog offset"); - // res=GetVarint64(&value_log_slice,&valuelog_len); - // if(!res)return Status::Corruption("can't decode valuelog len"); - // ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); ReadValueLog(file_id,valuelog_offset,&new_value); *value=std::string(new_value.data(),new_value.size()); delete []new_value.data(); @@ -1628,20 +1538,19 @@ std::vector> DBImpl::WriteValueLog(std::vector(&len), sizeof(uint64_t)); if (!valueFile.good()) { valueFile.close(); - return {}; // 写入长度失败,返回空结果 + assert(0); } // 再写入实际数据 valueFile.write(slice.data(), len); if (!valueFile.good()) { valueFile.close(); - return {}; // 写入数据失败,返回空结果 + assert(0); } // 记录 file_id 和 offset res.push_back({valuelogfile_number_, offset}); - // 更新偏移量,包括长度信息 offset += sizeof(uint64_t) + len; } @@ -1697,8 +1606,13 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { inFile.seekg(offset); // Read the length of the value - uint64_t len; - inFile.read(reinterpret_cast(&len), sizeof(uint64_t)); + // uint64_t len; + // inFile.read(reinterpret_cast(&len), sizeof(uint64_t)); + char *value_buf_len=new char[sizeof(uint64_t)]; + inFile.read(value_buf_len,sizeof(uint64_t)); + uint64_t len=0; + std::memcpy(&len, value_buf_len, sizeof(uint64_t)); + if (!inFile.good()) { inFile.close(); return Status::Corruption("Failed to read length from file!"); @@ -1720,10 +1634,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { // Assign the read data to the Slice *value = Slice(value_buf, len); - // Clean up allocated buffer - // should also do in v2 - delete[] value_buf; - return s; } From f7fa26c9df169aba085c2e95c6e50da7423ac37a Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 5 Dec 2024 10:06:52 +0800 Subject: [PATCH 3/6] gc pass make --- CMakeLists.txt | 7 +- db/db_impl.cc | 397 +++++++++++++++++++++++++++++++++++++++--------- db/db_impl.h | 54 ++++--- db/db_iter.cc | 3 +- db/write_batch.cc | 6 +- include/leveldb/db.h | 4 +- include/leveldb/slice.h | 2 +- test/test.cpp | 132 ++++++++-------- 8 files changed, 436 insertions(+), 169 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 91a44f2..99951c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,13 +2,14 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. See the AUTHORS file for names of contributors. -cmake_minimum_required(VERSION 3.9) +cmake_minimum_required(VERSION 3.10) # Keep the version below in sync with the one in db.h project(leveldb VERSION 1.23.0 LANGUAGES C CXX) # C standard can be overridden when this is used as a sub-project. if(NOT CMAKE_C_STANDARD) # This project can use C11, but will gracefully decay down to C89. 
+  # I bumped this to 17.
   set(CMAKE_C_STANDARD 11)
   set(CMAKE_C_STANDARD_REQUIRED OFF)
   set(CMAKE_C_EXTENSIONS OFF)
@@ -16,8 +17,8 @@ endif(NOT CMAKE_C_STANDARD)
 
 # C++ standard can be overridden when this is used as a sub-project.
 if(NOT CMAKE_CXX_STANDARD)
-  # This project requires C++11.
-  set(CMAKE_CXX_STANDARD 11)
+  # This project requires C++17.
+  set(CMAKE_CXX_STANDARD 17)
   set(CMAKE_CXX_STANDARD_REQUIRED ON)
   set(CMAKE_CXX_EXTENSIONS OFF)
 endif(NOT CMAKE_CXX_STANDARD)
diff --git a/db/db_impl.cc b/db/db_impl.cc
index d3b1471..87664b4 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -4,15 +4,6 @@
 
 #include "db/db_impl.h"
 
-#include <algorithm>
-#include <atomic>
-#include <cstdint>
-#include <cstdio>
-#include <fstream>
-#include <iostream>
-#include <set>
-#include <string>
-#include <vector>
 #include "db/builder.h"
 #include "db/db_iter.h"
 #include "db/dbformat.h"
 #include "db/filename.h"
 #include "db/log_reader.h"
 #include "db/log_writer.h"
 #include "db/memtable.h"
 #include "db/table_cache.h"
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <cstdio>
+
+#include <fstream>
+#include <iostream>
+#include <set>
+#include <string>
+#include <vector>
+
 #include "leveldb/db.h"
 #include "leveldb/env.h"
 #include "leveldb/status.h"
 #include "leveldb/table.h"
 #include "leveldb/table_builder.h"
+
 #include "port/port.h"
 #include "table/block.h"
 #include "table/merger.h"
@@ -35,6 +38,8 @@
 #include "util/coding.h"
 #include "util/logging.h"
 #include "util/mutexlock.h"
+#include <filesystem>
+namespace fs = std::filesystem;
 
 namespace leveldb {
 
@@ -146,6 +151,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
       seed_(0),
       tmp_batch_(new WriteBatch),
       background_compaction_scheduled_(false),
+      background_garbage_collect_scheduled_(false),
       manual_compaction_(nullptr),
       versions_(new VersionSet(dbname_, &options_, table_cache_,
                                &internal_comparator_)) {}
@@ -667,6 +673,7 @@ void DBImpl::RecordBackgroundError(const Status& s) {
 
 void DBImpl::MaybeScheduleCompaction() {
   mutex_.AssertHeld();
+
   if (background_compaction_scheduled_) {
     // Already scheduled
   } else if (shutting_down_.load(std::memory_order_acquire)) {
@@ -682,6 +689,25 @@ void DBImpl::MaybeScheduleCompaction() {
   }
 }
 
+void DBImpl::MaybeScheduleGarbageCollect() {
+  mutex_.AssertHeld();
+  if (background_garbage_collect_scheduled_) {
+    // Garbage collection already scheduled
+  } else if (shutting_down_.load(std::memory_order_acquire)) {
+    // DB is being deleted; no more background work
+  } else if (!bg_error_.ok()) {
+    // Already got an error; no more changes
+  } else {
+    background_garbage_collect_scheduled_ = true;
+    env_->Schedule(&DBImpl::BGWorkGC, this);
+  }
+
+}
+
+void DBImpl::BGWorkGC(void* db) {
+  reinterpret_cast<DBImpl*>(db)->BackgroundGarbageCollect();
+}
+
 void DBImpl::BGWork(void* db) {
   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
 }
@@ -699,12 +725,34 @@ void DBImpl::BackgroundCall() {
 
   background_compaction_scheduled_ = false;
 
+  // Check if garbage collection needs to be scheduled after compaction
+  MaybeScheduleGarbageCollect();
+
   // Previous compaction may have produced too many files in a level,
   // so reschedule another compaction if needed.
   MaybeScheduleCompaction();
   background_work_finished_signal_.SignalAll();
 }
 
+void DBImpl::BackgroundGarbageCollect() {
+  MutexLock l(&mutex_);
+  assert(background_garbage_collect_scheduled_);
+
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    // No more background work when shutting down.
+  } else if (!bg_error_.ok()) {
+    // No more background work after a background error.
+ } else { + // Perform garbage collection here + GarbageCollect(); + } + + background_garbage_collect_scheduled_ = false; + + // Notify any waiting threads + background_work_finished_signal_.SignalAll(); +} + void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); @@ -1163,21 +1211,22 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (imm != nullptr) imm->Unref(); current->Unref(); - if(value->c_str()[0]==0x00){ - *value=value->substr(1); + if (value->c_str()[0] == 0x00) { + *value = value->substr(1); return s; } - Slice value_log_slice=Slice(value->c_str()+1,value->length()); + Slice value_log_slice = Slice(value->c_str() + 1, value->length()); + Slice new_key; Slice new_value; - int value_offset=sizeof(uint64_t)*2;// 16 - uint64_t file_id,valuelog_offset; - bool res=GetVarint64(&value_log_slice,&file_id); - if(!res)return Status::Corruption("can't decode file id"); - res=GetVarint64(&value_log_slice,&valuelog_offset); - if(!res)return Status::Corruption("can't decode valuelog offset"); - ReadValueLog(file_id,valuelog_offset,&new_value); - *value=std::string(new_value.data(),new_value.size()); - delete []new_value.data(); + int value_offset = sizeof(uint64_t) * 2; // 16 + uint64_t file_id, valuelog_offset; + bool res = GetVarint64(&value_log_slice, &file_id); + if (!res) return Status::Corruption("can't decode file id"); + res = GetVarint64(&value_log_slice, &valuelog_offset); + if (!res) return Status::Corruption("can't decode valuelog offset"); + ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); + *value = std::string(new_value.data(), new_value.size()); + delete[] new_value.data(); return s; } @@ -1215,7 +1264,6 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } - Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -1241,7 +1289,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); - WriteBatchInternal::ConverToValueLog(write_batch,this); + WriteBatchInternal::ConverToValueLog(write_batch, this); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); @@ -1501,7 +1549,8 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } -// std::vector>> DBImpl::WriteValueLog(std::vector values){ +// std::vector>> +// DBImpl::WriteValueLog(std::vector values){ // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); // std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); @@ -1521,37 +1570,54 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { // return res; // } -std::vector> DBImpl::WriteValueLog(std::vector values) { +std::vector> DBImpl::WriteValueLog( + std::vector> kv) { std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); if (!valueFile.is_open()) { - assert(0); + assert(0); } uint64_t offset = valueFile.tellp(); std::vector> res; - for (const auto& slice : values) { - uint64_t len = slice.size(); + for (const auto& [key_slice, value_slice] : kv) { + // 写入 key 的长度 + uint64_t key_len = key_slice.size(); + valueFile.write(reinterpret_cast(&key_len), sizeof(uint64_t)); + if (!valueFile.good()) { + 
valueFile.close(); + assert(0); + } - // 先写入长度 - valueFile.write(reinterpret_cast(&len), sizeof(uint64_t)); + // 写入 key 本身 + valueFile.write(key_slice.data(), key_len); if (!valueFile.good()) { - valueFile.close(); - assert(0); + valueFile.close(); + assert(0); } - // 再写入实际数据 - valueFile.write(slice.data(), len); + // 写入 value 的长度 + uint64_t value_len = value_slice.size(); + valueFile.write(reinterpret_cast(&value_len), + sizeof(uint64_t)); if (!valueFile.good()) { - valueFile.close(); - assert(0); + valueFile.close(); + assert(0); + } + + // 写入 value 本身 + valueFile.write(value_slice.data(), value_len); + if (!valueFile.good()) { + valueFile.close(); + assert(0); } // 记录 file_id 和 offset res.push_back({valuelogfile_number_, offset}); - offset += sizeof(uint64_t) + len; + // 更新偏移量 + offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; } // 解锁资源或进行其他清理操作 @@ -1560,83 +1626,266 @@ std::vector> DBImpl::WriteValueLog(std::vector values){ - for(int i=0;i values) { + for (int i = 0; i < values.size(); i++) { target_file->Append(values[i]); } } -void DBImpl::addNewValueLog(){ - valuelogfile_number_=versions_->NewFileNumber(); +void DBImpl::addNewValueLog() { + valuelogfile_number_ = versions_->NewFileNumber(); } -// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ +// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) +// { // //lock_shared -// Status s=Status::OK(); -// std::string file_name_=ValueLogFileName(dbname_,file_id); -// //std::cout<(&len), sizeof(uint64_t)); +// char *value_buf_len=new char[sizeof(uint64_t)]; +// inFile.read(value_buf_len,sizeof(uint64_t)); +// uint64_t len=0; +// std::memcpy(&len, value_buf_len, sizeof(uint64_t)); + +// if (!inFile.good()) { +// inFile.close(); +// return Status::Corruption("Failed to read length from file!"); +// } + +// // Now seek to the actual data position and read the value +// inFile.seekg(offset + sizeof(uint64_t)); +// char* value_buf = new char[len]; +// inFile.read(value_buf, len); +// if (!inFile.good()) { +// delete[] value_buf; +// inFile.close(); +// return Status::Corruption("Failed to read value from file!"); +// } + +// // Close the file after reading // inFile.close(); -// *value=Slice(value_buf,len); + +// // Assign the read data to the Slice +// *value = Slice(value_buf, len); + // return s; // } -Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { - //lock_shared +Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, + Slice* value) { Status s = Status::OK(); std::string file_name_ = ValueLogFileName(dbname_, file_id); - + // Open the file in binary mode for reading std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); if (!inFile.is_open()) { - std::cerr << "Failed to open file: " << file_name_ << " for reading!" << std::endl; - return Status::Corruption("Failed to open file for reading!"); + std::cerr << "Failed to open file: " << file_name_ << " for reading!" 
+ << std::endl; + return Status::Corruption("Failed to open file for reading!"); } - // Seek to the position of len + // Seek to the position of key length inFile.seekg(offset); + // Read the length of the key + char* key_buf_len = new char[sizeof(uint64_t)]; + inFile.read(key_buf_len, sizeof(uint64_t)); + uint64_t key_len = 0; + std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); + + if (!inFile.good()) { + delete[] key_buf_len; + inFile.close(); + return Status::Corruption("Failed to read key length from file!"); + } + + // Now seek to the actual key position and read the key + inFile.seekg(offset + sizeof(uint64_t)); + char* key_buf = new char[key_len]; + inFile.read(key_buf, key_len); + if (!inFile.good()) { + delete[] key_buf; + delete[] key_buf_len; + inFile.close(); + return Status::Corruption("Failed to read key from file!"); + } + + // Assign the read key data to the Slice + *key = Slice(key_buf, key_len); + // Read the length of the value - // uint64_t len; - // inFile.read(reinterpret_cast(&len), sizeof(uint64_t)); - char *value_buf_len=new char[sizeof(uint64_t)]; - inFile.read(value_buf_len,sizeof(uint64_t)); - uint64_t len=0; - std::memcpy(&len, value_buf_len, sizeof(uint64_t)); + inFile.seekg(offset + sizeof(uint64_t) + key_len); + char* value_buf_len = new char[sizeof(uint64_t)]; + inFile.read(value_buf_len, sizeof(uint64_t)); + uint64_t val_len = 0; + std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); if (!inFile.good()) { - inFile.close(); - return Status::Corruption("Failed to read length from file!"); + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + inFile.close(); + return Status::Corruption("Failed to read value length from file!"); } // Now seek to the actual data position and read the value - inFile.seekg(offset + sizeof(uint64_t)); - char* value_buf = new char[len]; - inFile.read(value_buf, len); + inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t)); + char* value_buf = new char[val_len]; + inFile.read(value_buf, val_len); if (!inFile.good()) { - delete[] value_buf; - inFile.close(); - return Status::Corruption("Failed to read value from file!"); + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + delete[] value_buf; + inFile.close(); + return Status::Corruption("Failed to read value from file!"); } // Close the file after reading inFile.close(); - // Assign the read data to the Slice - *value = Slice(value_buf, len); + // Assign the read value data to the Slice + *value = Slice(value_buf, val_len); return s; } +// 判断文件是否为 valuelog 文件 +bool IsValueLogFile(const std::string& filename) { + return filename.find("valuelog_") == + 0; // 简单判断文件名是否匹配 valuelog 前缀 +} + +// 示例:解析 sstable 中的元信息 +void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, + uint64_t& offset) { + // 假设 stored_value 格式为:valuelog_id|offset + std::istringstream iss(stored_value); + iss >> valuelog_id >> offset; +} + +// 示例:获取 ValueLog 文件 ID +uint64_t GetValueLogID(const std::string& valuelog_name) { + // 假设文件名中包含唯一的 ID,例如 "valuelog_1" + auto pos = valuelog_name.find_last_of('_'); + return std::stoull(valuelog_name.substr(pos + 1)); +} + +// 垃圾回收实现 +void DBImpl::GarbageCollect() { + // 遍历数据库目录,找到所有 valuelog 文件 + auto files_set = fs::directory_iterator(dbname_); + for (const auto& cur_log_file : files_set) { + if (fs::exists(cur_log_file) && + fs::is_regular_file(fs::status(cur_log_file)) && + IsValueLogFile(cur_log_file.path().filename().string())) { + std::string valuelog_name = cur_log_file.path().string(); + 
uint64_t cur_log_number = GetValueLogID(valuelog_name); + uint64_t new_log_number = versions_->NewFileNumber(); + WritableFile* new_valuelog = nullptr; + std::string new_valuelog_name = LogFileName(dbname_, new_log_number); + Status s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), + &new_valuelog); + if (!s.ok()) { + // Avoid chewing through file number space in a tight loop. + versions_->ReuseFileNumber(new_log_number); + break; + } + addNewValueLog(); + + std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); + if (!cur_valuelog.is_open()) { + std::cerr << "Failed to open ValueLog file: " << valuelog_name + << std::endl; + continue; + } + + // whether to reopen + std::ifstream new_valuelog_file(new_valuelog_name, + std::ios::in | std::ios::binary); + if (!new_valuelog_file.is_open()) { + std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name + << std::endl; + continue; + } + + uint64_t current_offset = 0; + uint64_t new_offset = 0; // 新的 ValueLog 偏移 + + while (true) { + // 读取一个 kv 对 + uint64_t key_len, value_len; + Slice key, value; + Slice new_value; + + ReadValueLog(cur_log_number, current_offset, &key, &value); + value = std::string(new_value.data(), new_value.size()); + + // 检查 key 是否在 sstable 中存在 + std::string stored_value; + Status status = Get(leveldb::ReadOptions(), key, &stored_value); + + if (status.IsNotFound()) { + // Key 不存在,忽略此记录 + continue; + } + + if (!status.ok()) { + std::cerr << "Error accessing sstable: " << status.ToString() + << std::endl; + continue; + } + + // 检查 valuelog_id 和 offset 是否匹配 + uint64_t stored_valuelog_id, stored_offset; + ParseStoredValue(stored_value, stored_valuelog_id, + stored_offset); // 假设解析函数 + if (stored_valuelog_id != GetValueLogID(valuelog_name) || + stored_offset != current_offset) { + // 记录无效,跳过 + continue; + } + + status = Put(leveldb::WriteOptions(), key, value); + if (!status.ok()) { + std::cerr << "Error accessing sstable: " << status.ToString() + << std::endl; + continue; + } + + // 更新偏移 + new_offset += + sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); + + // 更新当前偏移 + current_offset += + sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); + } + + // 清理旧文件(如果需要) + cur_valuelog.close(); + new_valuelog_file.close(); + + std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件 + } + } +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index 513690b..2c2d100 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -5,19 +5,20 @@ #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ #define STORAGE_LEVELDB_DB_DB_IMPL_H_ +#include "db/dbformat.h" +#include "db/log_writer.h" +#include "db/snapshot.h" #include #include -#include -#include -#include #include +#include +#include #include +#include -#include "db/dbformat.h" -#include "db/log_writer.h" -#include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" + #include "port/port.h" #include "port/thread_annotations.h" @@ -44,12 +45,12 @@ class DBImpl : public DB { // // 反序列化为字段数组 // FieldArray ParseValue(const std::string& value_str)override; - // Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields)override; + // Status Put_with_fields(const WriteOptions& options, const Slice& key,const + // FieldArray& fields)override; // Status Get_with_fields(const ReadOptions& options, const Slice& key, // FieldArray* 
fields)override; - // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, const Slice& value) override; @@ -63,14 +64,19 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - // std::vector>> WriteValueLog(std::vector value)override; - std::vector> WriteValueLog(std::vector value)override; - void writeValueLogForCompaction(WritableFile* target_file,std::vector value); - void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);; - std::pair getNewValuelog();//use for compaction - // Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; - Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value)override; - + // std::vector>> + // WriteValueLog(std::vector value)override; + std::vector> WriteValueLog( + std::vector> value) override; + void writeValueLogForCompaction(WritableFile* target_file, + std::vector value); + void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_); + ; + std::pair getNewValuelog(); // use for compaction + // Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* + // value)override; + Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, + Slice* value) override; // Extra methods (for testing) that are not in the public DB interface @@ -161,9 +167,15 @@ class DBImpl : public DB { void RecordBackgroundError(const Status& s); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void MaybeScheduleGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); + static void BGWorkGC(void* db); + void BackgroundCall(); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -195,7 +207,7 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; - //std::shared_mutex value_log_mutex; + // std::shared_mutex value_log_mutex; std::atomic shutting_down_; port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); MemTable* mem_; @@ -203,11 +215,11 @@ class DBImpl : public DB { std::atomic has_imm_; // So bg thread can detect non-null imm_ WritableFile* logfile_; WritableFile* valuelogfile_; - int valuelogfile_offset=0; + int valuelogfile_offset = 0; uint64_t logfile_number_; uint64_t valuelogfile_number_; log::Writer* log_; - std::map oldvaluelog_ids; + std::map oldvaluelog_ids; uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. @@ -223,6 +235,10 @@ class DBImpl : public DB { // Has a background compaction been scheduled or is running? bool background_compaction_scheduled_ GUARDED_BY(mutex_); + // Has a background gc been scheduled or is running? + bool background_garbage_collect_scheduled_ GUARDED_BY(mutex_); + + ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); VersionSet* const versions_ GUARDED_BY(mutex_); diff --git a/db/db_iter.cc b/db/db_iter.cc index 350d175..e27f1a4 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -69,6 +69,7 @@ class DBIter : public Iterator { Slice value() const override { assert(valid_); auto tmp_value= (direction_ == kForward) ? 
iter_->value() : saved_value_;
+    Slice key;
     if(tmp_value.data()[0]==0x00){
       tmp_value.remove_prefix(1);
       return tmp_value;
@@ -82,7 +83,7 @@ class DBIter : public Iterator {
       // res=GetVarint64(&tmp_value,&valuelog_len);
       // if(!res)assert(0);
       // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
-      db_->ReadValueLog(file_id,valuelog_offset,&tmp_value);
+      db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
       return tmp_value;
     }
     Status status() const override {
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 14df572..6030f47 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -145,9 +145,9 @@ class ValueLogInserter : public WriteBatch::Handler {
     }
     else{
       buf+=(char)(0x01);
-      std::vector<Slice> v;
-      v.push_back(value);
-      auto res=db_->WriteValueLog(v);
+      std::vector<std::pair<Slice, Slice>> kv;
+      kv.push_back({key,value});
+      auto res=db_->WriteValueLog(kv);
       PutVarint64(&buf,res[0].first);
       // PutVarint64(&buf,res[0].second.first);
       // PutVarint64(&buf,res[0].second.second);
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index 116744f..a72a6db 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -107,7 +107,7 @@ class LEVELDB_EXPORT DB {
   //   std::vector<std::pair<uint64_t, std::pair<uint64_t, uint64_t>>> v;
   //   return v;
   // }
-  virtual std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(std::vector<Slice> value){
+  virtual std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(std::vector<std::pair<Slice, Slice>> value){
     assert(0);
     std::vector<std::pair<uint64_t, uint64_t>> v;
     return v;
@@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB {
   //   assert(0);  // Not implemented
   //   return Status::Corruption("not imp");
   // }
-  virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value){
+  virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){
     assert(0);  // Not implemented
     return Status::Corruption("not imp");
   }
diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h
index d225f34..d78c078 100644
--- a/include/leveldb/slice.h
+++ b/include/leveldb/slice.h
@@ -69,7 +69,7 @@ class LEVELDB_EXPORT Slice {
 
   // Drop the first "n" bytes from this slice.
   void remove_prefix(size_t n) {
-    if(n>size()){
+    if (n > size()) {
       assert(0);
     }
     assert(n <= size());
diff --git a/test/test.cpp b/test/test.cpp
index 78240d9..6e523ba 100644
--- a/test/test.cpp
+++ b/test/test.cpp
@@ -86,77 +86,77 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st
   return Status::OK();
 }
 
-TEST(Test, CheckGetFields) {
-  DB *db;
-  WriteOptions writeOptions;
-  ReadOptions readOptions;
-  if(OpenDB("testdb_for_XOY", &db).ok() == false) {
-    std::cerr << "open db failed" << std::endl;
-    abort();
-  }
-  std::string key1 = "k_1";
+// TEST(Test, CheckGetFields) {
+//   DB *db;
+//   WriteOptions writeOptions;
+//   ReadOptions readOptions;
+//   if(OpenDB("testdb_for_XOY", &db).ok() == false) {
+//     std::cerr << "open db failed" << std::endl;
+//     abort();
+//   }
+//   std::string key1 = "k_1";
 
-  FieldArray fields1 = {
-    {"name", "Customer#000000001"},
-    {"address", "IVhzIApeRb"},
-    {"phone", "25-989-741-2988"}
-  };
+//   FieldArray fields1 = {
+//     {"name", "Customer#000000001"},
+//     {"address", "IVhzIApeRb"},
+//     {"phone", "25-989-741-2988"}
+//   };
 
-  auto value1=SerializeValue(fields1);
+//   auto value1=SerializeValue(fields1);
 
-  db->Put(WriteOptions(), key1, value1);
+//   db->Put(WriteOptions(), key1, value1);
 
-  // Read back and deserialize
-  std::string value_ret;
-  FieldArray res1;
+// // Read back and deserialize
+// std::string value_ret; +// FieldArray res1; - db->Get(ReadOptions(), key1, &value_ret); - DeserializeValue(value_ret, &res1); - for(auto pr:res1){ - std::cout<Get(ReadOptions(), key1, &value_ret); +// DeserializeValue(value_ret, &res1); +// for(auto pr:res1){ +// std::cout<Delete(WriteOptions(),key1); +// db->Delete(WriteOptions(),key1); - std::cout<<"get serialized value done"< keys; - std::vector target_keys; - for(int i=0;i<10000;i++){ - std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys - FieldArray fields={ - {"name", key}, - {"address", std::to_string(rand()%7)}, - {"phone", std::to_string(rand()%114514)} - }; - if(rand()%5==0){ - fields[0].second="special_key"; - target_keys.push_back(key); - } - keys.push_back(key); - db->Put(WriteOptions(),key,SerializeValue(fields)); - } - std::sort(target_keys.begin(),target_keys.end()); - std::vector key_res; - Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); - ASSERT_TRUE(CompareKey(key_res, target_keys)); - std::cout<<"get key by field done"<Delete(WriteOptions(),s); - } - delete db; -} +// std::cout<<"get serialized value done"< keys; +// std::vector target_keys; +// for(int i=0;i<10000;i++){ +// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys +// FieldArray fields={ +// {"name", key}, +// {"address", std::to_string(rand()%7)}, +// {"phone", std::to_string(rand()%114514)} +// }; +// if(rand()%5==0){ +// fields[0].second="special_key"; +// target_keys.push_back(key); +// } +// keys.push_back(key); +// db->Put(WriteOptions(),key,SerializeValue(fields)); +// } +// std::sort(target_keys.begin(),target_keys.end()); +// std::vector key_res; +// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); +// ASSERT_TRUE(CompareKey(key_res, target_keys)); +// std::cout<<"get key by field done"<Delete(WriteOptions(),s); +// } +// delete db; +// } TEST(Test, LARGE_DATA_COMPACT_TEST) { DB *db; @@ -167,7 +167,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { abort(); } std::vector values; - for(int i=0;i<500000;i++){ + for(int i=0;i<5000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -176,7 +176,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<500000;i++){ + for(int i=0;i<5000;i++){ std::string key=std::to_string(i); std::string value; Status s=db->Get(readOptions,key,&value); From 9da542dfe190975084445b3d21c3d56da2cb07ff Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 5 Dec 2024 11:50:39 +0800 Subject: [PATCH 4/6] gc dead --- db/db_impl.cc | 151 +++++++++++++++++++++++++++++++++++++++++++-------- db/db_impl.h | 3 + include/leveldb/db.h | 2 + test/test.cpp | 43 +++++++++++++-- 4 files changed, 172 insertions(+), 27 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 87664b4..cb52517 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -663,6 +663,15 @@ Status DBImpl::TEST_CompactMemTable() { return s; } +void DBImpl::TEST_GarbageCollect() { + MaybeScheduleGarbageCollect(); + // Finish current background compaction in the case where + // `background_work_finished_signal_` was signalled due to an error. 
+  while (background_garbage_collect_scheduled_) {
+    background_work_finished_signal_.Wait();
+  }
+}
+
 void DBImpl::RecordBackgroundError(const Status& s) {
   mutex_.AssertHeld();
   if (bg_error_.ok()) {
@@ -725,8 +734,8 @@ void DBImpl::BackgroundCall() {
 
   background_compaction_scheduled_ = false;
 
-  // Check if garbage collection needs to be scheduled after compaction
-  MaybeScheduleGarbageCollect();
+  // // Check if garbage collection needs to be scheduled after compaction
+  // MaybeScheduleGarbageCollect();
 
   // Previous compaction may have produced too many files in a level,
   // so reschedule another compaction if needed.
@@ -735,7 +744,7 @@ void DBImpl::BackgroundCall() {
 }
 
 void DBImpl::BackgroundGarbageCollect() {
-  MutexLock l(&mutex_);
+  mutex_.AssertHeld();
   assert(background_garbage_collect_scheduled_);
 
   if (shutting_down_.load(std::memory_order_acquire)) {
@@ -1767,10 +1776,13 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
 
 // 判断文件是否为 valuelog 文件
 bool IsValueLogFile(const std::string& filename) {
-  return filename.find("valuelog_") ==
-         0;  // 简单判断文件名是否匹配 valuelog 前缀
+  // Check whether the file name ends with ".valuelog"
+  const std::string suffix = ".valuelog";
+  return filename.size() > suffix.size() &&
+         filename.substr(filename.size() - suffix.size()) == suffix;
 }
 
+
 // 示例:解析 sstable 中的元信息
 void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
                       uint64_t& offset) {
@@ -1781,14 +1793,33 @@ void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
 
 // 示例:获取 ValueLog 文件 ID
 uint64_t GetValueLogID(const std::string& valuelog_name) {
-  // 假设文件名中包含唯一的 ID,例如 "valuelog_1"
-  auto pos = valuelog_name.find_last_of('_');
-  return std::stoull(valuelog_name.substr(pos + 1));
+  // Parse the file name with std::filesystem::path
+  std::filesystem::path file_path(valuelog_name);
+  std::string filename = file_path.filename().string();  // file-name component only
+
+  // Find the '.' in the file name and extract the numeric part before it
+  auto pos = filename.find('.');
+  if (pos == std::string::npos) {
+    assert(0);
+  }
+
+  // Extract the numeric part
+  std::string id_str = filename.substr(0, pos);
 
+  // Verify that the extracted part is a valid number
+  for (char c : id_str) {
+    if (!isdigit(c)) {
+      assert(0);
+    }
+  }
+
+  return std::stoull(id_str);  // convert to uint64_t
 }
 
+
 // 垃圾回收实现
 void DBImpl::GarbageCollect() {
   // 遍历数据库目录,找到所有 valuelog 文件
+  Log(options_.info_log, "start gc ");
   auto files_set = fs::directory_iterator(dbname_);
   for (const auto& cur_log_file : files_set) {
     if (fs::exists(cur_log_file) &&
@@ -1796,6 +1827,7 @@ void DBImpl::GarbageCollect() {
       IsValueLogFile(cur_log_file.path().filename().string())) {
       std::string valuelog_name = cur_log_file.path().string();
       uint64_t cur_log_number = GetValueLogID(valuelog_name);
+      std::cout << "check point 1" << std::endl;
       uint64_t new_log_number = versions_->NewFileNumber();
       WritableFile* new_valuelog = nullptr;
       std::string new_valuelog_name = LogFileName(dbname_, new_log_number);
@@ -1807,13 +1839,7 @@ void DBImpl::GarbageCollect() {
         break;
       }
       addNewValueLog();
-
-      std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
-      if (!cur_valuelog.is_open()) {
-        std::cerr << "Failed to open ValueLog file: " << valuelog_name
-                  << std::endl;
-        continue;
-      }
+      std::cout << "check point 2" << std::endl;
 
       // whether to reopen
       std::ifstream new_valuelog_file(new_valuelog_name,
                                       std::ios::in | std::ios::binary);
      if (!new_valuelog_file.is_open()) {
        std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name
                  << std::endl;
        continue;
      }
@@ -1825,16 +1851,96 @@ void DBImpl::GarbageCollect() {
       }
 
       uint64_t current_offset = 0;
-      uint64_t new_offset = 0;  // 新的 ValueLog 偏移
+      int cnt=0;
+
+      std::cout << "check point 3" << std::endl;
+
+      // Open the file in binary mode for reading
+      std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
+      if (!cur_valuelog.is_open()) {
+        std::cerr << "Failed to open file: " << valuelog_name << " for reading!"
+ << std::endl; + continue; + } while (true) { + ++cnt; + std::cout << cnt << std::endl; + // 读取一个 kv 对 uint64_t key_len, value_len; Slice key, value; - Slice new_value; - ReadValueLog(cur_log_number, current_offset, &key, &value); - value = std::string(new_value.data(), new_value.size()); + Status s = Status::OK(); + + // Seek to the position of key length + cur_valuelog.seekg(current_offset); + + // Read the length of the key + char* key_buf_len = new char[sizeof(uint64_t)]; + cur_valuelog.read(key_buf_len, sizeof(uint64_t)); + + if (cur_valuelog.eof()) { + delete[] key_buf_len; + break; // 正常退出条件:到达文件末尾 + } + + std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); + + if (!cur_valuelog.good()) { + delete[] key_buf_len; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Now seek to the actual key position and read the key + cur_valuelog.seekg(current_offset + sizeof(uint64_t)); + char* key_buf = new char[key_len]; + cur_valuelog.read(key_buf, key_len); + if (!cur_valuelog.good()) { + delete[] key_buf; + delete[] key_buf_len; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Assign the read key data to the Slice + key = Slice(key_buf, key_len); + + // Read the length of the value + cur_valuelog.seekg(current_offset + sizeof(uint64_t) + key_len); + char* value_buf_len = new char[sizeof(uint64_t)]; + cur_valuelog.read(value_buf_len, sizeof(uint64_t)); + uint64_t val_len = 0; + std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); + + if (!cur_valuelog.good()) { + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Now seek to the actual data position and read the value + cur_valuelog.seekg(current_offset + sizeof(uint64_t) + key_len + sizeof(uint64_t)); + char* value_buf = new char[val_len]; + cur_valuelog.read(value_buf, val_len); + if (!cur_valuelog.good()) { + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + delete[] value_buf; + cur_valuelog.close(); + std::cerr << "Failed to read file: " << valuelog_name << std::endl; + break; + } + + // Assign the read value data to the Slice + value = Slice(value_buf, val_len); // 检查 key 是否在 sstable 中存在 std::string stored_value; @@ -1868,10 +1974,6 @@ void DBImpl::GarbageCollect() { continue; } - // 更新偏移 - new_offset += - sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); - // 更新当前偏移 current_offset += sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); @@ -1882,6 +1984,9 @@ void DBImpl::GarbageCollect() { new_valuelog_file.close(); std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件 + Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str()); + Log(options_.info_log, "add file during gc %s", new_valuelog_name.c_str()); + } } } diff --git a/db/db_impl.h b/db/db_impl.h index 2c2d100..0f44458 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -86,6 +86,9 @@ class DBImpl : public DB { // Force current memtable contents to be compacted. Status TEST_CompactMemTable(); + void TEST_GarbageCollect() override; + + // Return an internal iterator over the current state of the database. // The keys of this iterator are internal keys (see format.h). // The returned iterator should be deleted when no longer needed. 
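
Note on the on-disk layout the series has settled on by this point: since PATCH 3, every value-log record carries its own key (so GC can test liveness without a separate index), and the SSTable entry holds only a tagged pointer into the log. Below is a minimal sketch of that framing, assuming the fixed 8-byte raw uint64_t lengths that WriteValueLog writes; EncodeValueLogRecord is an illustrative helper name, not a function from the patches:

    // Record framing written by WriteValueLog / scanned by GarbageCollect:
    //   [key_len : 8 bytes][key bytes][value_len : 8 bytes][value bytes]
    // SSTable-resident value written by ValueLogInserter:
    //   tag 0x00 -> the value itself follows inline
    //   tag 0x01 -> varint64 file_id, then varint64 offset of the record
    #include <cstdint>
    #include <string>

    // Illustrative helper (not in the patches): frame one kv pair the way
    // DBImpl::WriteValueLog appends it, keeping offset bookkeeping in one place.
    std::string EncodeValueLogRecord(const std::string& key,
                                     const std::string& value) {
      std::string rec;
      uint64_t key_len = key.size();
      uint64_t value_len = value.size();
      rec.append(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
      rec.append(key);
      rec.append(reinterpret_cast<const char*>(&value_len), sizeof(uint64_t));
      rec.append(value);
      return rec;  // the next record starts at offset + rec.size()
    }

The pointer that Get and DBIter decode is the counterpart: one tag byte, then GetVarint64 for file_id and offset. Dropping valuelog_len from the pointer (PATCH 1's change) is what forces ReadValueLog to recover the lengths from the log file itself.
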
diff --git a/include/leveldb/db.h b/include/leveldb/db.h index a72a6db..a788c60 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -183,6 +183,8 @@ class LEVELDB_EXPORT DB { // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + virtual void TEST_GarbageCollect() = 0; + }; // Destroy the contents of the specified database. diff --git a/test/test.cpp b/test/test.cpp index 6e523ba..8dc25f6 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -158,7 +158,39 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st // delete db; // } -TEST(Test, LARGE_DATA_COMPACT_TEST) { +// TEST(Test, LARGE_DATA_COMPACT_TEST) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::vector values; +// for(int i=0;i<500000;i++){ +// std::string key=std::to_string(i); +// std::string value; +// for(int j=0;j<1000;j++){ +// value+=std::to_string(i); +// } +// values.push_back(value); +// db->Put(writeOptions,key,value); +// } +// for(int i=0;i<500000;i++){ +// std::string key=std::to_string(i); +// std::string value; +// Status s=db->Get(readOptions,key,&value); +// assert(s.ok()); +// if(values[i]!=value){ +// std::cout< values; - for(int i=0;i<5000;i++){ + for(int i=0;i<500000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -176,7 +208,11 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<5000;i++){ + std::cout<<"start gc"<TEST_GarbageCollect(); + std::cout<<"finish gc"<Get(readOptions,key,&value); @@ -191,7 +227,6 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { } - int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. testing::InitGoogleTest(&argc, argv); From bc111ba32828e2352f37a6bd73e954e51e85258d Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 5 Dec 2024 14:09:00 +0800 Subject: [PATCH 5/6] gc deadlock --- db/db_impl.cc | 30 +++++++++++++++++++----------- test/test.cpp | 5 +++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index cb52517..231e29c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1703,7 +1703,7 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, // Open the file in binary mode for reading std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); if (!inFile.is_open()) { - std::cerr << "Failed to open file: " << file_name_ << " for reading!" + std::cerr << "Failed to open file: " << file_name_ << " for read valuelog!" 
 << std::endl;
 return Status::Corruption("Failed to open file for reading!");
 }
@@ -1826,6 +1826,7 @@ void DBImpl::GarbageCollect() {
 fs::is_regular_file(fs::status(cur_log_file)) &&
 IsValueLogFile(cur_log_file.path().filename().string())) {
 std::string valuelog_name = cur_log_file.path().string();
+ std::cout << valuelog_name << std::endl;
 uint64_t cur_log_number = GetValueLogID(valuelog_name);
 std::cout << "check point 1" << std::endl;
 uint64_t new_log_number = versions_->NewFileNumber();
@@ -1851,6 +1852,8 @@ void DBImpl::GarbageCollect() {
 }

 uint64_t current_offset = 0;
+ uint64_t tmp_offset = 0;
+
 int cnt=0;

 std::cout << "check point 3" << std::endl;

@@ -1858,14 +1861,16 @@ void DBImpl::GarbageCollect() {
 // Open the file in binary mode for reading
 std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
 if (!cur_valuelog.is_open()) {
- std::cerr << "Failed to open file: " << valuelog_name << " for reading!"
+ std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!"
 << std::endl;
 continue;
 }

 while (true) {
+ tmp_offset=current_offset;
 ++cnt;
- std::cout << cnt << std::endl;
+ std::cout << cnt <<" "<<current_offset<<std::endl;
diff --git a/test/test.cpp b/test/test.cpp
index 8dc25f6..2b7ac95 100644
--- a/test/test.cpp
+++ b/test/test.cpp
@@ -199,7 +199,7 @@ TEST(Test, Garbage_Collect_TEST) {
 abort();
 }
 std::vector<std::string> values;
- for(int i=0;i<500000;i++){
+ for(int i=0;i<5000;i++){
 std::string key=std::to_string(i);
 std::string value;
 for(int j=0;j<1000;j++){
@@ -212,7 +212,8 @@ TEST(Test, Garbage_Collect_TEST) {
 db->TEST_GarbageCollect();
 std::cout<<"finish gc"<<std::endl;
- for(int i=0;i<500000;i++){
+ for(int i=0;i<5000;i++){
 std::string key=std::to_string(i);
 std::string value;
 Status s=db->Get(readOptions,key,&value);

From 8d45bf108bfe8f41b339591598737b54c4eb5799 Mon Sep 17 00:00:00 2001
From: xxy <3237539022@qq.com>
Date: Fri, 6 Dec 2024 11:11:29 +0800
Subject: [PATCH 6/6] v3 roughly complete; Put still lacks mutex protection

---
 CMakeLists.txt | 2 +-
 db/db_impl.cc | 322 +++++++++++++++++++++++-----------------------
 db/db_impl.h | 7 +
 include/leveldb/options.h | 3 +
 test/test.cpp | 6 +-
 5 files changed, 174 insertions(+), 166 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 99951c3..122df31 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -10,7 +10,7 @@ project(leveldb VERSION 1.23.0 LANGUAGES C CXX)
 if(NOT CMAKE_C_STANDARD)
 # This project can use C11, but will gracefully decay down to C89.
 # Bumped to 17.
- set(CMAKE_C_STANDARD 11)
+ set(CMAKE_C_STANDARD 17)
 set(CMAKE_C_STANDARD_REQUIRED OFF)
 set(CMAKE_C_EXTENSIONS OFF)
 endif(NOT CMAKE_C_STANDARD)
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 231e29c..27f8560 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -20,6 +20,7 @@
 #include <fstream>
 #include <iostream>
+#include <thread>
 #include <set>
 #include <string>
 #include <vector>
@@ -142,6 +143,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
 db_lock_(nullptr),
 shutting_down_(false),
 background_work_finished_signal_(&mutex_),
+ background_gc_finished_signal_(&gc_mutex_),
 mem_(nullptr),
 imm_(nullptr),
 has_imm_(false),
@@ -668,8 +670,9 @@ void DBImpl::TEST_GarbageCollect() {
 // Finish current background compaction in the case where
 // `background_work_finished_signal_` was signalled due to an error.
 while (background_garbage_collect_scheduled_) {
- background_work_finished_signal_.Wait();
+ background_gc_finished_signal_.Wait();
 }
+ // std::cout<<"bg_signal"<<std::endl;
 }

- env_->Schedule(&DBImpl::BGWorkGC, this);
+ // Run GC on its own detached thread instead of the shared background queue
+ auto bg_thread_ = std::thread(&DBImpl::BGWorkGC, this);
+ bg_thread_.detach();
 }
-}

 void DBImpl::BGWorkGC(void* db) {
@@ -744,7 +748,7 @@ void DBImpl::BackgroundCall() {
 }

 void DBImpl::BackgroundGarbageCollect() {
- mutex_.AssertHeld();
+ MutexLock l(&gc_mutex_);
 assert(background_garbage_collect_scheduled_);

 if (shutting_down_.load(std::memory_order_acquire)) {
@@ -754,12 +758,15 @@ void DBImpl::BackgroundGarbageCollect() {
 } else {
 // Perform garbage collection here
 GarbageCollect();
 }

 background_garbage_collect_scheduled_ = false;

 // Notify any waiting threads
- background_work_finished_signal_.SignalAll();
+ background_gc_finished_signal_.SignalAll();
+ // gc_mutex_ is released by MutexLock's destructor on scope exit; do not
+ // unlock it manually here, or the destructor would unlock it twice.
 }

 void DBImpl::BackgroundCompaction() {
@@ -1219,6 +1226,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
 mem->Unref();
 if (imm != nullptr) imm->Unref();
 current->Unref();
+ if(!s.ok())return s;
+ if(options.find_value_log_for_gc){
+ // GC only needs the raw stored value (the valuelog pointer), not the resolved user value
+ return s;
+ }

 if (value->c_str()[0] == 0x00) {
 *value = value->substr(1);
@@ -1233,7 +1244,10 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
 if (!res) return Status::Corruption("can't decode file id");
 res = GetVarint64(&value_log_slice, &valuelog_offset);
 if (!res) return Status::Corruption("can't decode valuelog offset");
- ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
+ s=ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
+ if(!s.ok()){
+ return s;
+ }
 *value = std::string(new_value.data(), new_value.size());
 delete[] new_value.data();
 return s;
@@ -1787,8 +1801,9 @@ bool IsValueLogFile(const std::string& filename) {

 void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
 uint64_t& offset) {
 // stored_value layout: varint64 valuelog_id followed by varint64 offset
- std::istringstream iss(stored_value);
- iss >> valuelog_id >> offset;
+ Slice tmp(stored_value.data(),stored_value.size());
+ GetVarint64(&tmp,&valuelog_id);
+ GetVarint64(&tmp,&offset);
 }

 // Extract the ValueLog file ID from the file name
@@ -1818,184 +1833,167 @@ uint64_t GetValueLogID(const std::string& valuelog_name) {

 // Garbage collection implementation
 void DBImpl::GarbageCollect() {
+ gc_mutex_.AssertHeld();
 // Walk the database directory and collect all valuelog files
 Log(options_.info_log, "start gc ");
 auto files_set = fs::directory_iterator(dbname_);
+ std::set<std::string> valuelog_set;
+ std::string cur_valuelog_name=ValueLogFileName(dbname_,valuelogfile_number_);
 for (const auto& cur_log_file : files_set) {
 if (fs::exists(cur_log_file) &&
- fs::is_regular_file(fs::status(cur_log_file)) &&
- IsValueLogFile(cur_log_file.path().filename().string())) {
- std::string valuelog_name = cur_log_file.path().string();
- std::cout << valuelog_name << std::endl;
- uint64_t cur_log_number = GetValueLogID(valuelog_name);
- std::cout << "check point 1" << std::endl;
- uint64_t new_log_number = versions_->NewFileNumber();
- WritableFile* new_valuelog = nullptr;
- std::string new_valuelog_name = LogFileName(dbname_, new_log_number);
- Status s = env_->NewWritableFile(LogFileName(dbname_, new_log_number),
- &new_valuelog);
- if (!s.ok()) {
- // Avoid chewing through file number space in a tight loop.
- versions_->ReuseFileNumber(new_log_number);
- break;
- }
- addNewValueLog();
- std::cout << "check point 2" << std::endl;
-
- // whether to reopen
- std::ifstream new_valuelog_file(new_valuelog_name,
- std::ios::in | std::ios::binary);
- if (!new_valuelog_file.is_open()) {
- std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name
- << std::endl;
- continue;
- }
+ fs::is_regular_file(fs::status(cur_log_file)) &&
+ IsValueLogFile(cur_log_file.path().filename().string())) {
+ // Skip the live valuelog that is still being written to
+ if(cur_valuelog_name==cur_log_file.path().filename().string())continue;
+ valuelog_set.emplace(cur_log_file.path().filename().string());
+ }
+ }
+ for (std::string valuelog_name:valuelog_set) {
+ // std::cout << valuelog_name << std::endl;
+ uint64_t cur_log_number = GetValueLogID(valuelog_name);
+ valuelog_name=ValueLogFileName(dbname_,cur_log_number);

 uint64_t current_offset = 0;
 uint64_t tmp_offset = 0;

- int cnt=0;
+ int cnt=0;

- std::cout << "check point 3" << std::endl;
+
 // Open the file in binary mode for reading
 std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
 if (!cur_valuelog.is_open()) {
- std::cerr << "Failed to open file: " << valuelog_name << " for reading cur_valuelog!"
- << std::endl;
- continue;
- }
-
- while (true) {
- tmp_offset=current_offset;
- ++cnt;
- std::cout << cnt <<" "<<current_offset<<std::endl;
diff --git a/db/db_impl.h b/db/db_impl.h
--- a/db/db_impl.h
+++ b/db/db_impl.h
 std::atomic<bool> shutting_down_;
 port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
+ port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_);
+
+ // Slice valuelog_finding_key GUARDED_BY(mutex_ );
+
 MemTable* mem_;
 MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
 std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index d755f46..cad5032 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -157,6 +157,8 @@ struct LEVELDB_EXPORT ReadOptions {
 // Callers may wish to set this field to false for bulk scans.
 bool fill_cache = true;

+ // When set, Get() returns the raw stored value (the valuelog pointer)
+ // instead of resolving it from the value log; used by garbage collection.
+ bool find_value_log_for_gc = false;
+
 // If "snapshot" is non-null, read as of the supplied snapshot
 // (which must belong to the DB that is being read and which must
 // not have been released). If "snapshot" is null, use an implicit
@@ -183,6 +185,7 @@ struct LEVELDB_EXPORT WriteOptions {
 // with sync==true has similar crash semantics to a "write()"
 // system call followed by "fsync()".
 bool sync = false;
+ // bool valuelog_write=false;
 };

} // namespace leveldb
diff --git a/test/test.cpp b/test/test.cpp
index 2b7ac95..077e947 100644
--- a/test/test.cpp
+++ b/test/test.cpp
@@ -199,7 +199,7 @@ TEST(Test, Garbage_Collect_TEST) {
 abort();
 }
 std::vector<std::string> values;
- for(int i=0;i<5000;i++){
+ for(int i=0;i<500000;i++){
 std::string key=std::to_string(i);
 std::string value;
 for(int j=0;j<1000;j++){
@@ -212,8 +212,8 @@ TEST(Test, Garbage_Collect_TEST) {
 db->TEST_GarbageCollect();
 std::cout<<"finish gc"<<std::endl;
- for(int i=0;i<5000;i++){
+ for(int i=0;i<500000;i++){
 std::string key=std::to_string(i);
 std::string value;
 Status s=db->Get(readOptions,key,&value);
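Note: after this series a value stored in the LSM-tree is either `0x00 | inline_value` or `0x01 | varint64 valuelog_id | varint64 offset`, and ParseStoredValue above reads the two varints back out of the pointer form. A minimal decode sketch using leveldb's own varint helpers (`DecodeStoredValue` and its signature are illustrative, not part of the patch):

    #include <cstdint>
    #include <string>

    #include "leveldb/slice.h"
    #include "util/coding.h"  // GetVarint64

    namespace leveldb {

    // Decodes the stored-value format written by this series:
    //   byte 0 == 0x00 -> the user value follows inline;
    //   byte 0 == 0x01 -> a valuelog pointer (varint64 id, varint64 offset) follows.
    bool DecodeStoredValue(const std::string& stored, bool* is_pointer,
                           uint64_t* valuelog_id, uint64_t* offset,
                           Slice* inline_value) {
      if (stored.empty()) return false;
      Slice rest(stored.data() + 1, stored.size() - 1);
      if (stored[0] == 0x00) {
        *is_pointer = false;
        *inline_value = rest;
        return true;
      }
      *is_pointer = true;
      return GetVarint64(&rest, valuelog_id) && GetVarint64(&rest, offset);
    }

    }  // namespace leveldb

One consequence of dropping the explicit length from the pointer (visible in the Get() changes above) is that readers must recover the lengths from the log record itself, which is why ReadValueLog now returns both the key and the value read at the given offset.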