diff --git a/db/builder.cc b/db/builder.cc
index e6329e0..7cf13f8 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -28,6 +28,16 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
     return s;
   }
 
+  {
+    auto tmp_value=iter->value();
+    if(tmp_value.data()[0]==(char)(0x01)){
+      tmp_value.remove_prefix(1);
+      bool ok=GetVarint64(&tmp_value,&meta->valuelog_id);
+      assert(ok);(void)ok;
+    }
+    else meta->valuelog_id=0;
+  }
+
   TableBuilder* builder = new TableBuilder(options, file);
   meta->smallest.DecodeFrom(iter->key());
   Slice key;
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 2e3e21b..94a45b2 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -57,6 +57,7 @@ struct DBImpl::CompactionState {
   struct Output {
     uint64_t number;
     uint64_t file_size;
+    uint64_t valuelog_id;
     InternalKey smallest, largest;
   };
 
@@ -83,6 +84,10 @@ struct DBImpl::CompactionState {
   WritableFile* outfile;
   TableBuilder* builder;
 
+  WritableFile* valuelogfile;
+  uint64_t valuelog_offset=0;
+  uint64_t valuelog_file_id=0;
+
   uint64_t total_bytes;
 };
 
@@ -537,7 +542,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
       level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
     }
     edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
-                  meta.largest);
+                  meta.largest,meta.valuelog_id);
   }
 
   CompactionStats stats;
@@ -741,7 +746,7 @@ void DBImpl::BackgroundCompaction() {
     FileMetaData* f = c->input(0, 0);
     c->edit()->RemoveFile(c->level(), f->number);
     c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
-                       f->largest);
+                       f->largest,f->valuelog_id);
     status = versions_->LogAndApply(c->edit(), &mutex_);
     if (!status.ok()) {
       RecordBackgroundError(status);
@@ -815,7 +820,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     out.number = file_number;
     out.smallest.Clear();
    out.largest.Clear();
+    compact->valuelog_file_id=versions_->NewFileNumber();
+    out.valuelog_id=compact->valuelog_file_id;
+
     compact->outputs.push_back(out);
+
     mutex_.Unlock();
   }
@@ -825,6 +834,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
   if (s.ok()) {
     compact->builder = new TableBuilder(options_, compact->outfile);
   }
+
+
+  compact->valuelog_offset=0;
+  s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile);
+
   return s;
 }
@@ -860,6 +874,19 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
   }
   delete compact->outfile;
   compact->outfile = nullptr;
+  if (s.ok()) {
+    s = compact->valuelogfile->Flush();
+  }
+  if (s.ok()) {
+    s = compact->valuelogfile->Sync();
+  }
+  if (s.ok()) {
+    s = compact->valuelogfile->Close();
+  }
+  delete compact->valuelogfile;
+  compact->valuelogfile=nullptr;
+  compact->valuelog_file_id=0;
+  compact->valuelog_offset=0;
 
   if (s.ok() && current_entries > 0) {
     // Verify that the table is usable
@@ -890,7 +917,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
-                                         out.smallest, out.largest);
+                                         out.smallest, out.largest,out.valuelog_id);
   }
   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 }
@@ -924,6 +951,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   std::string current_user_key;
   bool has_current_user_key = false;
   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
+  std::set<uint64_t> old_valuelog_ids;
+  for (int which = 0; which < 2; which++) {
+    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
+      if(compact->compaction->input(which, i)->valuelog_id)old_valuelog_ids.emplace(compact->compaction->input(which, i)->valuelog_id);
+    }
+  }
   while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
     // Prioritize immutable compaction work
     if (has_imm_.load(std::memory_order_relaxed)) {
@@ -1004,7 +1037,35 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, input->value()); + + Slice old_value=input->value(); + Slice new_value; + std::string buf=""; + if(old_value.data()[0]==(char)(0x00)){ + new_value=old_value; + } + else{ + old_value.remove_prefix(1); + uint64_t file_id,valuelog_offset,valuelog_len; + bool res=GetVarint64(&old_value,&file_id); + if(!res)assert(0); + res=GetVarint64(&old_value,&valuelog_offset); + if(!res)assert(0); + res=GetVarint64(&old_value,&valuelog_len); + if(!res)assert(0); + Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + assert(s.ok()); + writeValueLogForCompaction(compact->valuelogfile,{new_value}); + buf+=(char)(0x01); + PutVarint64(&buf,compact->valuelog_file_id); + PutVarint64(&buf,compact->valuelog_offset); + PutVarint64(&buf,valuelog_len); + compact->valuelog_offset+=valuelog_len; + delete []new_value.data(); + new_value=Slice(buf); + } + + compact->builder->Add(key, new_value); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1028,6 +1089,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (status.ok()) { status = input->status(); } + //not completely correct, should be written in new function, related to removeabsol... 
+  if(status.ok()){
+    for(auto id:old_valuelog_ids){
+      auto valuelog_filename=ValueLogFileName(dbname_,id);
+      Status s=env_->RemoveFile(valuelog_filename);
+      assert(s.ok());
+    }
+  }
 
   delete input;
   input = nullptr;
@@ -1168,6 +1237,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
     return s;
   }
   Slice value_log_slice=Slice(value->c_str()+1,value->length());
+  Slice new_value;
   uint64_t file_id,valuelog_offset,valuelog_len;
   bool res=GetVarint64(&value_log_slice,&file_id);
   if(!res)return Status::Corruption("can't decode file id");
@@ -1175,8 +1245,9 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
   if(!res)return Status::Corruption("can't decode valuelog offset");
   res=GetVarint64(&value_log_slice,&valuelog_len);
   if(!res)return Status::Corruption("can't decode valuelog len");
-  ReadValueLog(file_id,valuelog_offset,valuelog_len,&value_log_slice);
-  *value=std::string(value_log_slice.data(),value_log_slice.size());
+  ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
+  *value=std::string(new_value.data(),new_value.size());
+  delete []new_value.data();
   return s;
 }
@@ -1406,7 +1477,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
         RecordBackgroundError(s);
       }
       delete logfile_;
-
+      addNewValueLog();
       logfile_ = lfile;
       logfile_number_ = new_log_number;
       log_ = new log::Writer(lfile);
@@ -1502,7 +1573,17 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
 }
 std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){ //lock
-  std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG";
+  // std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
+  // for(int i=0;i<values.size();i++){
+  //   int len=values[i].size();
+  //   valuelogfile_->Append(values[i]);
+  //   res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
+  //   valuelogfile_offset+=len;
+  // }
+  // //unlock
+  // valuelogfile_->Flush();
+  // return res;
+  std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
   std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
   if (!valueFile.is_open()) {
     assert(0);
   }
@@ -1519,28 +1600,41 @@ std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValue
   valueFile.close();
   return s;
 }
+void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
+  for(int i=0;i<values.size();i++){
+    target_file->Append(values[i]);
+  }
+}
 void DBImpl::addNewValueLog(){ //lock
+  // if(valuelogfile_){
+  //   valuelogfile_->Sync();
+  //   valuelogfile_->Close();
+  //   delete valuelogfile_;
+  // }
   valuelogfile_number_=versions_->NewFileNumber();
+  // valuelogfile_offset=0;
+  // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
+  // env_->NewWritableFile(file_name_,&valuelogfile_);
   //unlock
 }
 Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
   //lock_shared
-  std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG";
+  std::string file_name_=ValueLogFileName(dbname_,file_id);
+  //std::cout<log_ = new log::Writer(lfile);
       impl->mem_ = new MemTable(impl->internal_comparator_);
       impl->mem_->Ref();
+      impl->addNewValueLog();
     }
   }
   if (s.ok() && save_manifest) {
diff --git a/db/db_impl.h b/db/db_impl.h
index aed3ece..767d41c 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -63,7 +63,9 @@ class DBImpl : public DB {
   void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
   void CompactRange(const Slice* begin, const Slice* end) override;
   std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
-  void addNewValueLog()override;
+  void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
+  void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
+  std::pair getNewValuelog();//use for compaction
   Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
 
   // Extra methods (for testing) that are not in the public DB interface
@@ -196,8 +198,10 @@ class DBImpl : public DB {
   MemTable* imm_ GUARDED_BY(mutex_);  // Memtable being compacted
   std::atomic<bool> has_imm_;         // So bg thread can detect non-null imm_
   WritableFile* logfile_;
+  WritableFile* valuelogfile_;
+  int valuelogfile_offset=0;
   uint64_t logfile_number_;
-  uint64_t valuelogfile_number_ GUARDED_BY(mutex_);
+  uint64_t valuelogfile_number_;
   log::Writer* log_;
   uint32_t seed_ GUARDED_BY(mutex_);  // For sampling.
diff --git a/db/filename.cc b/db/filename.cc
index e526249..ceff186 100644
--- a/db/filename.cc
+++ b/db/filename.cc
@@ -30,6 +30,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number) {
   return MakeFileName(dbname, number, "log");
 }
 
+std::string ValueLogFileName(const std::string& dbname, uint64_t number){
+  assert(number > 0);
+  return MakeFileName(dbname, number, "valuelog");
+}
+
 std::string TableFileName(const std::string& dbname, uint64_t number) {
   assert(number > 0);
   return MakeFileName(dbname, number, "ldb");
diff --git a/db/filename.h b/db/filename.h
index 563c6d8..6c71754 100644
--- a/db/filename.h
+++ b/db/filename.h
@@ -33,6 +33,8 @@ enum FileType {
 // "dbname".
 std::string LogFileName(const std::string& dbname, uint64_t number);
 
+std::string ValueLogFileName(const std::string& dbname, uint64_t number);
+
 // Return the name of the sstable with the specified number
 // in the db named by "dbname". The result will be prefixed with
 // "dbname".
diff --git a/db/repair.cc b/db/repair.cc
index 97a27c6..513327a 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -369,7 +369,7 @@ class Repairer {
       // TODO(opt): separate out into multiple levels
       const TableInfo& t = tables_[i];
       edit_.AddFile(0, t.meta.number, t.meta.file_size, t.meta.smallest,
-                    t.meta.largest);
+                    t.meta.largest,t.meta.valuelog_id);
     }
 
     // std::fprintf(stderr,
diff --git a/db/version_edit.cc b/db/version_edit.cc
index 356ce88..2041a3e 100644
--- a/db/version_edit.cc
+++ b/db/version_edit.cc
@@ -79,6 +79,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
     PutVarint32(dst, new_files_[i].first);  // level
     PutVarint64(dst, f.number);
     PutVarint64(dst, f.file_size);
+    PutVarint64(dst, f.valuelog_id);
     PutLengthPrefixedSlice(dst, f.smallest.Encode());
     PutLengthPrefixedSlice(dst, f.largest.Encode());
   }
@@ -178,6 +179,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
      case kNewFile:
        if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
            GetVarint64(&input, &f.file_size) &&
+           GetVarint64(&input,&f.valuelog_id) &&
            GetInternalKey(&input, &f.smallest) &&
            GetInternalKey(&input, &f.largest)) {
          new_files_.push_back(std::make_pair(level, f));
@@ -247,6 +249,8 @@ std::string VersionEdit::DebugString() const {
     r.append(" ");
     AppendNumberTo(&r, f.file_size);
     r.append(" ");
+    AppendNumberTo(&r, f.valuelog_id);
+    r.append(" ");
     r.append(f.smallest.DebugString());
     r.append(" .. ");
     r.append(f.largest.DebugString());
diff --git a/db/version_edit.h b/db/version_edit.h
index 137b4b1..1ee850e 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -22,8 +22,10 @@ struct FileMetaData {
   int allowed_seeks;     // Seeks allowed until compaction
   uint64_t number;
   uint64_t file_size;    // File size in bytes
+  uint64_t valuelog_id=0;
   InternalKey smallest;  // Smallest internal key served by table
   InternalKey largest;   // Largest internal key served by table
+
 };
 
 class VersionEdit {
@@ -61,12 +63,13 @@ class VersionEdit {
   // REQUIRES: This version has not been saved (see VersionSet::SaveTo)
   // REQUIRES: "smallest" and "largest" are smallest and largest keys in file
   void AddFile(int level, uint64_t file, uint64_t file_size,
-               const InternalKey& smallest, const InternalKey& largest) {
+               const InternalKey& smallest, const InternalKey& largest,uint64_t valuelog_id=0) {
     FileMetaData f;
     f.number = file;
     f.file_size = file_size;
     f.smallest = smallest;
     f.largest = largest;
+    f.valuelog_id=valuelog_id;
     new_files_.push_back(std::make_pair(level, f));
   }
diff --git a/db/version_set.cc b/db/version_set.cc
index 4e37bf9..dcce922 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1087,7 +1087,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
     const std::vector<FileMetaData*>& files = current_->files_[level];
     for (size_t i = 0; i < files.size(); i++) {
       const FileMetaData* f = files[i];
-      edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
+      edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,f->valuelog_id);
     }
   }
 
diff --git a/db/write_batch.cc b/db/write_batch.cc
index cc20109..77af25a 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -131,6 +131,10 @@ class ValueLogInserter : public WriteBatch::Handler {
  public:
   WriteBatch writeBatch_;
   DB* db_;
+  ValueLogInserter(WriteBatch writeBatch,DB* db){
+    writeBatch_=writeBatch;
+    db_=db;
+  }
 
   void Put(const Slice& key, const Slice& value) override {
     Slice new_value;
@@ -165,8 +169,7 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
 }
 Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
-  ValueLogInserter inserter;
-  inserter.writeBatch_=WriteBatch();
+  ValueLogInserter inserter(WriteBatch(),db_);
   auto res=b->Iterate(&inserter);
   *b=inserter.writeBatch_;
   return res;
 }
diff --git a/test/test.cpp b/test/test.cpp
index 2ac837c..ca6a5ec 100644
--- a/test/test.cpp
+++ b/test/test.cpp
@@ -8,11 +8,10 @@ using namespace leveldb;
 using Field=std::pair<std::string,std::string>;
 using FieldArray=std::vector<std::pair<std::string,std::string>>;
 
-constexpr int value_size = 2048;
-constexpr int data_size = 128 << 20;
-
 Status OpenDB(std::string dbName, DB **db) {
   Options options;
+  options.max_file_size=16*1024;
+  options.write_buffer_size=32*1024;
   options.create_if_missing = true;
   return DB::Open(options, dbName, db);
 }
@@ -26,9 +25,13 @@ bool CompareFieldArray(const FieldArray &a, const FieldArray &b) {
 }
 
 bool CompareKey(const std::vector<std::string> a, std::vector<std::string> b) {
-  if (a.size() != b.size()) return false;
+  if (a.size() != b.size()){
+    return false;
+  }
   for (size_t i = 0; i < a.size(); ++i) {
-    if (a[i] != b[i]) return false;
+    if (a[i] != b[i]){
+      return false;
+    }
   }
   return true;
 }
@@ -43,7 +46,7 @@ std::string SerializeValue(const FieldArray& fields){
   return res_;
 }
 
-  // 反序列化为字段数组
+  // 鍙嶅簭鍒楀寲涓哄瓧娈垫暟缁?
void DeserializeValue(const std::string& value_str,FieldArray* res){ Slice slice=Slice(value_str.c_str()); uint64_t siz; @@ -83,75 +86,107 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st return Status::OK(); } -TEST(Test, CheckGetFields) { - DB *db; - WriteOptions writeOptions; - ReadOptions readOptions; - if(OpenDB("testdb_for_XOY", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - std::string key1 = "k_1"; - std::string key2 = "k_2"; +// TEST(Test, CheckGetFields) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::string key1 = "k_1"; +// std::string key2 = "k_2"; - FieldArray fields1 = { - {"name", "Customer#000000001"}, - {"address", "IVhzIApeRb"}, - {"phone", "25-989-741-2988"} - }; - - FieldArray fields2 = { - {"name", "Customer#000000001"}, - {"address", "abc"}, - {"phone", "def"} - }; - - auto value1=SerializeValue(fields1); - auto value2=SerializeValue(fields2); - - db->Put(WriteOptions(), key1, value1); - db->Put(WriteOptions(), key2, value2); - - // 读取并反序列化 - std::string value_ret; - FieldArray res1; - - db->Get(ReadOptions(), key1, &value_ret); - DeserializeValue(value_ret, &res1); - for(auto pr:res1){ - std::cout<Put(WriteOptions(), key1, value1); +// db->Put(WriteOptions(), key2, value2); + +// // 璇诲彇骞跺弽搴忓垪鍖? 
+// std::string value_ret; +// FieldArray res1; + +// db->Get(ReadOptions(), key1, &value_ret); +// DeserializeValue(value_ret, &res1); +// for(auto pr:res1){ +// std::cout< keys = {key1, key2}; +// std::vector key_res; +// Get_keys_by_field(db,ReadOptions(),fields1[0],&key_res); +// for(auto s:key_res)std::cout< keys = {key1, key2}; - std::vector key_res; - Get_keys_by_field(db,ReadOptions(),fields1[0],&key_res); - for(auto s:key_res)std::cout< values; + for(int i=0;i<100000;i++){ + std::string key=std::to_string(i); + std::string value; + for(int j=0;j<1000;j++){ + value+=std::to_string(i); + } + values.push_back(value); + db->Put(writeOptions,key,value); + } + for(int i=0;i<100000;i++){ + std::string key=std::to_string(i); + std::string value; + Status s=db->Get(readOptions,key,&value); + assert(s.ok()); + if(values[i]!=value){ + std::cout<