From 6b1da0d8b424f8cb68a2dde7085b53bb3978eeb1 Mon Sep 17 00:00:00 2001 From: alexfisher <1823748191@qq.com> Date: Sat, 30 Nov 2024 20:38:09 +0800 Subject: [PATCH 1/2] update imp 2 --- db/db_impl.cc | 92 +++++++++++++++++++++++++++--- db/db_impl.h | 8 ++- db/filename.cc | 5 ++ db/filename.h | 2 + db/write_batch.cc | 7 ++- test/test.cpp | 165 ++++++++++++++++++++++++++++++++---------------------- 6 files changed, 201 insertions(+), 78 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2e3e21b..f09caf1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -83,6 +83,10 @@ struct DBImpl::CompactionState { WritableFile* outfile; TableBuilder* builder; + WritableFile* valuelogfile; + uint64_t valuelog_offset=0; + uint64_t valuelog_file_id=0; + uint64_t total_bytes; }; @@ -816,6 +820,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.smallest.Clear(); out.largest.Clear(); compact->outputs.push_back(out); + compact->valuelog_file_id=versions_->NewFileNumber(); mutex_.Unlock(); } @@ -825,6 +830,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { if (s.ok()) { compact->builder = new TableBuilder(options_, compact->outfile); } + + + compact->valuelog_offset=0; + s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); + return s; } @@ -860,6 +870,19 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } delete compact->outfile; compact->outfile = nullptr; + if (s.ok()) { + s = compact->valuelogfile->Flush(); + } + if (s.ok()) { + s = compact->valuelogfile->Sync(); + } + if (s.ok()) { + s = compact->valuelogfile->Close(); + } + delete compact->valuelogfile; + compact->valuelogfile=nullptr; + compact->valuelog_file_id=0; + compact->valuelog_offset=0; if (s.ok() && current_entries > 0) { // Verify that the table is usable @@ -1004,7 +1027,34 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, input->value()); + + Slice old_value=input->value(); + Slice new_value; + std::string buf=""; + if(old_value.data()[0]==(char)(0x00)){ + new_value=old_value; + } + else{ + uint64_t file_id,valuelog_offset,valuelog_len; + bool res=GetVarint64(&old_value,&file_id); + if(!res)assert(0); + res=GetVarint64(&old_value,&valuelog_offset); + if(!res)assert(0); + res=GetVarint64(&old_value,&valuelog_len); + if(!res)assert(0); + Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + assert(s.ok()); + writeValueLogForCompaction(compact->valuelogfile,{new_value}); + buf+=(char)(0x01); + PutVarint64(&buf,compact->valuelog_file_id); + PutVarint64(&buf,compact->valuelog_offset); + PutVarint64(&buf,valuelog_len); + compact->valuelog_offset+=valuelog_len; + delete []new_value.data(); + new_value=Slice(buf); + } + + compact->builder->Add(key, new_value); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1168,6 +1218,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, return s; } Slice value_log_slice=Slice(value->c_str()+1,value->length()); + Slice new_value; uint64_t file_id,valuelog_offset,valuelog_len; bool res=GetVarint64(&value_log_slice,&file_id); if(!res)return Status::Corruption("can't decode file id"); @@ -1175,8 +1226,9 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if(!res)return Status::Corruption("can't decode valuelog offset"); res=GetVarint64(&value_log_slice,&valuelog_len); if(!res)return Status::Corruption("can't decode valuelog len"); - ReadValueLog(file_id,valuelog_offset,valuelog_len,&value_log_slice); - *value=std::string(value_log_slice.data(),value_log_slice.size()); + ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + *value=std::string(new_value.data(),new_value.size()); + delete []new_value.data(); return s; } @@ -1406,7 +1458,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { RecordBackgroundError(s); } delete logfile_; - + addNewValueLog(); logfile_ = lfile; logfile_number_ = new_log_number; log_ = new log::Writer(lfile); @@ -1502,7 +1554,17 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { } std::vector>> DBImpl::WriteValueLog(std::vector values){ //lock - std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG"; + // std::vector>> res; + // for(int i=0;iAppend(values[i]); + // res.push_back({valuelogfile_number_,{valuelogfile_offset,len}}); + // valuelogfile_offset+=len; + // } + // //unlock + // valuelogfile_->Flush(); + // return res; + std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); if (!valueFile.is_open()) { assert(0); @@ -1519,16 +1581,30 @@ std::vector>> DBImpl::WriteValue valueFile.close(); return res; } +void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector values){ + for(int i=0;iAppend(values[i]); + } +} void DBImpl::addNewValueLog(){ //lock + // if(valuelogfile_){ + // valuelogfile_->Sync(); + // valuelogfile_->Close(); + // delete valuelogfile_; + // } valuelogfile_number_=versions_->NewFileNumber(); + // valuelogfile_offset=0; + // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); + // env_->NewWritableFile(file_name_,&valuelogfile_); //unlock } Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ //lock_shared - std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG"; + std::string file_name_=ValueLogFileName(dbname_,file_id); + //std::cout<log_ = new log::Writer(lfile); impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_->Ref(); + impl->addNewValueLog(); } } if (s.ok() && save_manifest) { diff --git a/db/db_impl.h b/db/db_impl.h index aed3ece..767d41c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -63,7 +63,9 @@ class DBImpl : public DB { void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; std::vector>> WriteValueLog(std::vector value)override; - void addNewValueLog()override; + void writeValueLogForCompaction(WritableFile* target_file,std::vector value); + void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);; + std::pair getNewValuelog();//use for compaction Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; // Extra methods (for testing) that are not in the public DB interface @@ -196,8 +198,10 @@ class DBImpl : public DB { MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted std::atomic has_imm_; // So bg thread can detect non-null imm_ WritableFile* logfile_; + WritableFile* valuelogfile_; + int valuelogfile_offset=0; uint64_t logfile_number_; - uint64_t valuelogfile_number_ GUARDED_BY(mutex_); + uint64_t valuelogfile_number_; log::Writer* log_; uint32_t seed_ GUARDED_BY(mutex_); // For sampling. diff --git a/db/filename.cc b/db/filename.cc index e526249..ceff186 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -30,6 +30,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number) { return MakeFileName(dbname, number, "log"); } +std::string ValueLogFileName(const std::string& dbname, uint64_t number){ + assert(number > 0); + return MakeFileName(dbname, number, "valuelog"); +} + std::string TableFileName(const std::string& dbname, uint64_t number) { assert(number > 0); return MakeFileName(dbname, number, "ldb"); diff --git a/db/filename.h b/db/filename.h index 563c6d8..6c71754 100644 --- a/db/filename.h +++ b/db/filename.h @@ -33,6 +33,8 @@ enum FileType { // "dbname". std::string LogFileName(const std::string& dbname, uint64_t number); +std::string ValueLogFileName(const std::string& dbname, uint64_t number); + // Return the name of the sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". diff --git a/db/write_batch.cc b/db/write_batch.cc index cc20109..77af25a 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -131,6 +131,10 @@ class ValueLogInserter : public WriteBatch::Handler { public: WriteBatch writeBatch_; DB* db_; + ValueLogInserter(WriteBatch writeBatch,DB* db){ + writeBatch_=writeBatch; + db_=db; + } void Put(const Slice& key, const Slice& value) override { Slice new_value; @@ -165,8 +169,7 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { } Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ - ValueLogInserter inserter; - inserter.writeBatch_=WriteBatch(); + ValueLogInserter inserter(WriteBatch(),db_); auto res=b->Iterate(&inserter); *b=inserter.writeBatch_; return res; diff --git a/test/test.cpp b/test/test.cpp index 2ac837c..14c2a51 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -8,9 +8,6 @@ using namespace leveldb; using Field=std::pair; using FieldArray=std::vector>; -constexpr int value_size = 2048; -constexpr int data_size = 128 << 20; - Status OpenDB(std::string dbName, DB **db) { Options options; options.create_if_missing = true; @@ -26,9 +23,13 @@ bool CompareFieldArray(const FieldArray &a, const FieldArray &b) { } bool CompareKey(const std::vector a, std::vector b) { - if (a.size() != b.size()) return false; + if (a.size() != b.size()){ + return false; + } for (size_t i = 0; i < a.size(); ++i) { - if (a[i] != b[i]) return false; + if (a[i] != b[i]){ + return false; + } } return true; } @@ -43,7 +44,7 @@ std::string SerializeValue(const FieldArray& fields){ return res_; } - // 反序列化为字段数组 + // 鍙嶅簭鍒楀寲涓哄瓧娈垫暟缁? void DeserializeValue(const std::string& value_str,FieldArray* res){ Slice slice=Slice(value_str.c_str()); uint64_t siz; @@ -83,75 +84,107 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st return Status::OK(); } -TEST(Test, CheckGetFields) { - DB *db; - WriteOptions writeOptions; - ReadOptions readOptions; - if(OpenDB("testdb_for_XOY", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - std::string key1 = "k_1"; - std::string key2 = "k_2"; +// TEST(Test, CheckGetFields) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::string key1 = "k_1"; +// std::string key2 = "k_2"; - FieldArray fields1 = { - {"name", "Customer#000000001"}, - {"address", "IVhzIApeRb"}, - {"phone", "25-989-741-2988"} - }; - - FieldArray fields2 = { - {"name", "Customer#000000001"}, - {"address", "abc"}, - {"phone", "def"} - }; - - auto value1=SerializeValue(fields1); - auto value2=SerializeValue(fields2); - - db->Put(WriteOptions(), key1, value1); - db->Put(WriteOptions(), key2, value2); - - // 读取并反序列化 - std::string value_ret; - FieldArray res1; - - db->Get(ReadOptions(), key1, &value_ret); - DeserializeValue(value_ret, &res1); - for(auto pr:res1){ - std::cout<Put(WriteOptions(), key1, value1); +// db->Put(WriteOptions(), key2, value2); + +// // 璇诲彇骞跺弽搴忓垪鍖? +// std::string value_ret; +// FieldArray res1; + +// db->Get(ReadOptions(), key1, &value_ret); +// DeserializeValue(value_ret, &res1); +// for(auto pr:res1){ +// std::cout< keys = {key1, key2}; +// std::vector key_res; +// Get_keys_by_field(db,ReadOptions(),fields1[0],&key_res); +// for(auto s:key_res)std::cout< keys = {key1, key2}; - std::vector key_res; - Get_keys_by_field(db,ReadOptions(),fields1[0],&key_res); - for(auto s:key_res)std::cout< values; + for(int i=0;i<1000;i++){ + std::string key=std::to_string(i); + std::string value; + for(int j=0;j<1000;j++){ + value+=std::to_string(i); + } + values.push_back(value); + db->Put(writeOptions,key,value); + } + for(int i=0;i<1000;i++){ + std::string key=std::to_string(i); + std::string value; + Status s=db->Get(readOptions,key,&value); + assert(s.ok()); + if(values[i]!=value){ + std::cout< Date: Sat, 30 Nov 2024 22:51:33 +0800 Subject: [PATCH 2/2] version 2 roughly complete with GC --- db/builder.cc | 9 +++++++++ db/db_impl.cc | 29 ++++++++++++++++++++++++----- db/repair.cc | 2 +- db/version_edit.cc | 4 ++++ db/version_edit.h | 5 ++++- db/version_set.cc | 2 +- test/test.cpp | 6 ++++-- 7 files changed, 47 insertions(+), 10 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index e6329e0..7cf13f8 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,15 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, return s; } + { + auto tmp_value=iter->value(); + if(tmp_value.data()[0]==(char)(0x01)){ + tmp_value.remove_prefix(1); + assert(GetVarint64(&tmp_value,&meta->valuelog_id)); + } + else meta->valuelog_id=0; + } + TableBuilder* builder = new TableBuilder(options, file); meta->smallest.DecodeFrom(iter->key()); Slice key; diff --git a/db/db_impl.cc b/db/db_impl.cc index f09caf1..94a45b2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -57,6 +57,7 @@ struct DBImpl::CompactionState { struct Output { uint64_t number; uint64_t file_size; + uint64_t valuelog_id; InternalKey smallest, largest; }; @@ -541,7 +542,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, - meta.largest); + meta.largest,meta.valuelog_id); } CompactionStats stats; @@ -745,7 +746,7 @@ void DBImpl::BackgroundCompaction() { FileMetaData* f = c->input(0, 0); c->edit()->RemoveFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, - f->largest); + f->largest,f->valuelog_id); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); @@ -819,8 +820,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); - compact->outputs.push_back(out); compact->valuelog_file_id=versions_->NewFileNumber(); + out.valuelog_id=compact->valuelog_file_id; + + compact->outputs.push_back(out); + mutex_.Unlock(); } @@ -913,7 +917,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, - out.smallest, out.largest); + out.smallest, out.largest,out.valuelog_id); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -947,6 +951,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + std::set old_valuelog_ids; + for (int which = 0; which < 2; which++) { + for (int i = 0; i < compact->compaction->num_input_files(which); i++) { + if(compact->compaction->input(which, i)->valuelog_id)old_valuelog_ids.emplace(compact->compaction->input(which, i)->valuelog_id); + } + } while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { // Prioritize immutable compaction work if (has_imm_.load(std::memory_order_relaxed)) { @@ -1035,6 +1045,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { new_value=old_value; } else{ + old_value.remove_prefix(1); uint64_t file_id,valuelog_offset,valuelog_len; bool res=GetVarint64(&old_value,&file_id); if(!res)assert(0); @@ -1078,6 +1089,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = input->status(); } + //not completely correct, should be written in new function, related to removeabsol... + if(status.ok()){ + for(auto id:old_valuelog_ids){ + auto valuelog_filename=ValueLogFileName(dbname_,id); + Status s=env_->RemoveFile(valuelog_filename); + assert(s.ok()); + } + } delete input; input = nullptr; @@ -1607,7 +1626,7 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice //std::cout<& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,f->valuelog_id); } } diff --git a/test/test.cpp b/test/test.cpp index 14c2a51..ca6a5ec 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -10,6 +10,8 @@ using FieldArray=std::vector>; Status OpenDB(std::string dbName, DB **db) { Options options; + options.max_file_size=16*1024; + options.write_buffer_size=32*1024; options.create_if_missing = true; return DB::Open(options, dbName, db); } @@ -165,7 +167,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { abort(); } std::vector values; - for(int i=0;i<1000;i++){ + for(int i=0;i<100000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -174,7 +176,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<1000;i++){ + for(int i=0;i<100000;i++){ std::string key=std::to_string(i); std::string value; Status s=db->Get(readOptions,key,&value);