diff --git a/db/builder.cc b/db/builder.cc
index 7cf13f8..0ca249a 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -21,6 +21,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
   iter->SeekToFirst();
 
   std::string fname = TableFileName(dbname, meta->number);
+  std::string value_fname = ValueLogFileName(dbname, meta->valuelog_id);
   if (iter->Valid()) {
     WritableFile* file;
     s = env->NewWritableFile(fname, &file);
@@ -28,22 +29,36 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
       return s;
     }
 
-    {
-      auto tmp_value=iter->value();
-      if(tmp_value.data()[0]==(char)(0x01)){
-        tmp_value.remove_prefix(1);
-        assert(GetVarint64(&tmp_value,&meta->valuelog_id));
-      }
-      else meta->valuelog_id=0;
+    WritableFile* value_file;
+    s = env->NewWritableFile(value_fname, &value_file);
+    if (!s.ok()) {
+      return s;
     }
 
     TableBuilder* builder = new TableBuilder(options, file);
     meta->smallest.DecodeFrom(iter->key());
     Slice key;
+    uint64_t valuelog_offset=0;
     for (; iter->Valid(); iter->Next()) {
       key = iter->key();
-      builder->Add(key, iter->value());
+      Slice value=iter->value();
+      Slice new_value=value;
+      if(value.size()>100){
+        value.remove_prefix(1);
+        std::string buf;
+        value_file->Append(value);
+        buf+=(char)(0x01);
+        PutVarint64(&buf,meta->valuelog_id);
+        PutVarint64(&buf,valuelog_offset);
+        PutVarint64(&buf,value.size());
+        valuelog_offset+=value.size();
+        new_value=Slice(buf);
+      }
+      builder->Add(key, new_value);
     }
+    value_file->Flush();
+    value_file->Sync();
+    value_file->Close();
     if (!key.empty()) {
       meta->largest.DecodeFrom(key);
     }
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 9f51f7b..d91e6f4 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -518,6 +518,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
   meta.number = versions_->NewFileNumber();
+  meta.valuelog_id = versions_->NewFileNumber();
   pending_outputs_.insert(meta.number);
   Iterator* iter = mem->NewIterator();
   Log(options_.info_log, "Level-0 table #%llu: started",
@@ -1092,14 +1093,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   if (status.ok()) {
     status = input->status();
   }
-  //not completely correct, should be written in new function, related to removeabsol...
-  // if(status.ok()){
-  //   for(auto id:old_valuelog_ids){
-  //     auto valuelog_filename=ValueLogFileName(dbname_,id);
-  //     Status s=env_->RemoveFile(valuelog_filename);
-  //     assert(s.ok());
-  //   }
-  // }
 
   delete input;
   input = nullptr;
@@ -1314,7 +1307,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   Writer* last_writer = &w;
   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
-    WriteBatchInternal::ConverToValueLog(write_batch,this);
     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
     last_sequence += WriteBatchInternal::Count(write_batch);
 
@@ -1480,7 +1472,6 @@ Status DBImpl::MakeRoomForWrite(bool force) {
         RecordBackgroundError(s);
       }
       delete logfile_;
-      addNewValueLog();
       logfile_ = lfile;
       logfile_number_ = new_log_number;
       log_ = new log::Writer(lfile);
@@ -1574,55 +1565,12 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
   v->Unref();
 }
 
-std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
-  //lock
-  // std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
-  // for(int i=0;i<values.size();i++){
-  //   valuelogfile_->Append(values[i]);
-  //   res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
-  //   valuelogfile_offset+=len;
-  // }
-  // //unlock
-  // valuelogfile_->Flush();
-  // return res;
-  std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
-  std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
-  if (!valueFile.is_open()) {
-    assert(0);
-  }
-  uint64_t offset=valueFile.tellp();
-  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
-  for(int i=0;i<values.size();i++){
 
 void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
   for(int i=0;i<values.size();i++){
     target_file->Append(values[i]);
   }
 }
 
-void DBImpl::addNewValueLog(){
-  //lock
-  // if(valuelogfile_){
-  //   valuelogfile_->Sync();
-  //   valuelogfile_->Close();
-  //   delete valuelogfile_;
-  // }
-  valuelogfile_number_=versions_->NewFileNumber();
-  // valuelogfile_offset=0;
-  // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
-  // env_->NewWritableFile(file_name_,&valuelogfile_);
-  //unlock
-}
-
 Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
   //lock_shared
   std::string file_name_=ValueLogFileName(dbname_,file_id);
@@ -1644,7 +1592,11 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice
 // can call if they wish
 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
   WriteBatch batch;
-  batch.Put(key, value);
+  std::string buf;
+  buf+=(char)(0x00);
+  buf+=std::string(value.data(),value.size());
+  Slice new_value=Slice(buf);
+  batch.Put(key, new_value);
   return Write(opt, &batch);
 }
 
@@ -1678,7 +1630,6 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
       impl->log_ = new log::Writer(lfile);
       impl->mem_ = new MemTable(impl->internal_comparator_);
       impl->mem_->Ref();
-      impl->addNewValueLog();
     }
   }
   if (s.ok() && save_manifest) {
diff --git a/db/db_impl.h b/db/db_impl.h
index ab21773..f3ab840 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -63,9 +63,7 @@ class DBImpl : public DB {
   bool GetProperty(const Slice& property, std::string* value) override;
   void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
   void CompactRange(const Slice* begin, const Slice* end) override;
-  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value) override;
   void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
-  void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
   std::pair getNewValuelog();  // use for compaction
   Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value) override;
@@ -199,10 +197,7 @@ class DBImpl : public DB {
   MemTable* imm_ GUARDED_BY(mutex_);  // Memtable being compacted
   std::atomic<bool> has_imm_;         // So bg thread can detect non-null imm_
   WritableFile* logfile_;
-  WritableFile* valuelogfile_;
-  int valuelogfile_offset=0;
   uint64_t logfile_number_;
-  uint64_t valuelogfile_number_;
   log::Writer* log_;
   std::map oldvaluelog_ids;
   uint32_t seed_ GUARDED_BY(mutex_);  // For sampling.
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 77af25a..062329c 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -127,38 +127,38 @@ class MemTableInserter : public WriteBatch::Handler {
     sequence_++;
   }
 };
-class ValueLogInserter : public WriteBatch::Handler {
- public:
-  WriteBatch writeBatch_;
-  DB* db_;
-  ValueLogInserter(WriteBatch writeBatch,DB* db){
-    writeBatch_=writeBatch;
-    db_=db;
-  }
-
-  void Put(const Slice& key, const Slice& value) override {
-    Slice new_value;
-    std::string buf;
-    if(value.size()<100){
-      buf+=(char)(0x00);
-      buf.append(value.data(),value.size());
-    }
-    else{
-      buf+=(char)(0x01);
-      std::vector<Slice> v;
-      v.push_back(value);
-      auto res=db_->WriteValueLog(v);
-      PutVarint64(&buf,res[0].first);
-      PutVarint64(&buf,res[0].second.first);
-      PutVarint64(&buf,res[0].second.second);
-    }
-    new_value=Slice(buf);
-    writeBatch_.Put(key,new_value);
-  }
-  void Delete(const Slice& key) override {
-    writeBatch_.Delete(key);
-  }
-};
+// class ValueLogInserter : public WriteBatch::Handler {
+//  public:
+//   WriteBatch writeBatch_;
+//   DB* db_;
+//   ValueLogInserter(WriteBatch writeBatch,DB* db){
+//     writeBatch_=writeBatch;
+//     db_=db;
+//   }
+
+//   void Put(const Slice& key, const Slice& value) override {
+//     Slice new_value;
+//     std::string buf;
+//     if(value.size()<100){
+//       buf+=(char)(0x00);
+//       buf.append(value.data(),value.size());
+//     }
+//     else{
+//       buf+=(char)(0x01);
+//       std::vector<Slice> v;
+//       v.push_back(value);
+//       auto res=db_->WriteValueLog(v);
+//       PutVarint64(&buf,res[0].first);
+//       PutVarint64(&buf,res[0].second.first);
+//       PutVarint64(&buf,res[0].second.second);
+//     }
+//     new_value=Slice(buf);
+//     writeBatch_.Put(key,new_value);
+//   }
+//   void Delete(const Slice& key) override {
+//     writeBatch_.Delete(key);
+//   }
+// };
 }  // namespace
 
 Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
@@ -168,13 +168,6 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
   return b->Iterate(&inserter);
 }
 
-Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
-  ValueLogInserter inserter(WriteBatch(),db_);
-  auto res=b->Iterate(&inserter);
-  *b=inserter.writeBatch_;
-  return res;
-}
-
 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   assert(contents.size() >= kHeader);
   b->rep_.assign(contents.data(), contents.size());
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index 07869aa..fce86e3 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -37,8 +37,6 @@ class WriteBatchInternal {
 
   static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
 
-  static Status ConverToValueLog(WriteBatch* batch,DB* db_);
-
   static void Append(WriteBatch* dst, const WriteBatch* src);
 };
 
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index 566fbee..b9a5f4a 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -102,14 +102,6 @@ class LEVELDB_EXPORT DB {
   // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector *keys);
 
-  virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
-    assert(0);
-    std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
-    return v;
-  }
-
-  virtual void addNewValueLog(){assert(0);}
-
   virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
     assert(0);  // Not implemented
     return Status::Corruption("not imp");
diff --git a/test/test.cpp b/test/test.cpp
index 78240d9..f7c22cc 100644
--- a/test/test.cpp
+++ b/test/test.cpp
@@ -10,8 +10,8 @@ using FieldArray=std::vector<std::pair<std::string, std::string>>;
 
 Status OpenDB(std::string dbName, DB **db) {
   Options options;
-  options.max_file_size=16*1024;
-  options.write_buffer_size=32*1024;
+  // options.max_file_size=16*1024;
+  // options.write_buffer_size=32*1024;
   options.create_if_missing = true;
   return DB::Open(options, dbName, db);
 }
@@ -90,12 +90,13 @@ TEST(Test, CheckGetFields) {
   DB *db;
   WriteOptions writeOptions;
   ReadOptions readOptions;
+  std::cout<<"!!!"<
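
Note on the value encoding used by the patch above (a sketch for review, not part of the diff): DB::Put now prefixes every user value with a 0x00 type byte before it enters the write batch, and BuildTable rewrites any stored value whose encoded form exceeds 100 bytes into a 0x01-prefixed pointer record (varint64 value-log id, varint64 offset, varint64 length) while appending the raw value bytes to that table's value log file. The helpers below restate that format standalone, assuming the file is compiled inside the leveldb source tree. ValuePointer, EncodeInlineValue, EncodeValuePointer and DecodeValue are illustrative names that do not exist in the patch; PutVarint64/GetVarint64 from util/coding.h and leveldb::Slice are the real APIs the patch already uses.

// value_format_sketch.h -- hypothetical helpers mirroring the encoding in
// DB::Put and BuildTable above; not symbols introduced by this patch.
#include <cstdint>
#include <string>

#include "leveldb/slice.h"   // public leveldb header
#include "util/coding.h"     // PutVarint64 / GetVarint64 (in-tree only)

namespace leveldb {

// Where a large value lives once it has been spilled to a value log.
struct ValuePointer {
  uint64_t file_id;  // value log file number (meta.valuelog_id in the patch)
  uint64_t offset;   // byte offset within that value log file
  uint64_t length;   // length of the raw value bytes
};

// 0x00 + raw bytes: the form DB::Put writes into the write batch / memtable.
inline std::string EncodeInlineValue(const Slice& v) {
  std::string buf;
  buf.push_back(static_cast<char>(0x00));
  buf.append(v.data(), v.size());
  return buf;
}

// 0x01 + varint(file_id) + varint(offset) + varint(length): the form
// BuildTable stores in the SSTable for values it moved to the value log.
inline std::string EncodeValuePointer(const ValuePointer& p) {
  std::string buf;
  buf.push_back(static_cast<char>(0x01));
  PutVarint64(&buf, p.file_id);
  PutVarint64(&buf, p.offset);
  PutVarint64(&buf, p.length);
  return buf;
}

// Decode a stored value. Returns false on a malformed record. Any tag other
// than 0x00 is treated as a pointer record (the patch only emits 0x00/0x01).
inline bool DecodeValue(Slice stored, bool* is_pointer, Slice* inline_value,
                        ValuePointer* ptr) {
  if (stored.empty()) return false;
  const char tag = stored[0];
  stored.remove_prefix(1);
  if (tag == 0x00) {
    *is_pointer = false;
    *inline_value = stored;  // points into the caller's buffer
    return true;
  }
  *is_pointer = true;
  return GetVarint64(&stored, &ptr->file_id) &&
         GetVarint64(&stored, &ptr->offset) &&
         GetVarint64(&stored, &ptr->length);
}

}  // namespace leveldb

A read path (for example DBImpl::Get) would branch on the tag byte and, for 0x01 records, resolve the pointer through ReadValueLog(file_id, offset, length, &value). This follows the WiscKey-style key/value separation the patch appears to be moving toward: large values live in append-only value logs, and only small pointer records flow through the memtable, SSTables and compaction.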