From 50b731220b8fbc21df740f5d5f6a7c13a673defb Mon Sep 17 00:00:00 2001
From: alexfisher <1823748191@qq.com>
Date: Tue, 26 Nov 2024 15:06:37 +0800
Subject: [PATCH] k/v separation version 2 roughly finished

---
 db/db_impl.cc             | 85 ++++++++++++++++++++++++++++++++---------------
 db/db_impl.h              |  9 ++++-
 db/db_iter.cc             | 21 ++++++++----
 db/write_batch.cc         | 36 ++++++++++++++++++++
 db/write_batch_internal.h |  2 ++
 include/leveldb/db.h      | 13 ++++++++
 6 files changed, 132 insertions(+), 34 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index 3f1ea16..d2932ef 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1162,18 +1162,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
   mem->Unref();
   if (imm != nullptr) imm->Unref();
   current->Unref();
-  std::ifstream inFile("tmp.txt", std::ios::in | std::ios::binary);
-  if (!inFile.is_open()) {
-    std::cerr << "Failed to open file for writing!" << std::endl;
-    return Status::Corruption("Failed to open file for writing!");
-  }
-  uint64_t value_offset=*(uint64_t*)(value->c_str());
-  size_t value_size=*(size_t*)(value->c_str()+sizeof(uint64_t));
-  inFile.seekg(value_offset);
-  char value_buf[value_size];
-  inFile.read(value_buf,value_size);
-  inFile.close();
-  *value=std::string(value_buf,value_size);
+
+  if(value->c_str()[0]==0x00){  // inline value: strip the flag byte
+    *value=value->substr(1);
+    return s;
+  }
+  // value-log pointer: flag byte followed by varint file_id, offset, length
+  Slice value_log_slice=Slice(value->c_str()+1,value->length()-1);
+  uint64_t file_id,valuelog_offset,valuelog_len;
+  bool res=GetVarint64(&value_log_slice,&file_id);
+  if(!res)return Status::Corruption("can't decode file id");
+  res=GetVarint64(&value_log_slice,&valuelog_offset);
+  if(!res)return Status::Corruption("can't decode valuelog offset");
+  res=GetVarint64(&value_log_slice,&valuelog_len);
+  if(!res)return Status::Corruption("can't decode valuelog len");
+  ReadValueLog(file_id,valuelog_offset,valuelog_len,&value_log_slice);
+  *value=std::string(value_log_slice.data(),value_log_slice.size());
   return s;
 }
@@ -1297,6 +1300,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   Writer* last_writer = &w;
   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
+    WriteBatchInternal::ConverToValueLog(write_batch,this);
     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
     last_sequence += WriteBatchInternal::Count(write_batch);
@@ -1556,25 +1560,54 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
   v->Unref();
 }
+std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
+  //lock
+  std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG";
+  std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
+  if (!valueFile.is_open()) {
+    assert(0);
+  }
+  uint64_t offset=valueFile.tellp();
+  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
+  for(int i=0;i<values.size();i++){
+    valueFile.write(values[i].data(),values[i].size());
+    res.push_back({valuelogfile_number_,{offset,values[i].size()}});
+    offset+=values[i].size();
+  }
+  valueFile.close();
+  return res;
+  //unlock
+}
+
+void DBImpl::addNewValueLog(){
+  //lock
+  valuelogfile_number_=versions_->NewFileNumber();
+  //unlock
+}
+
+Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
+  //lock_shared
+  std::string file_name_=std::to_string(file_id)+".VALUELOG";
+  std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
+  if (!inFile.is_open()) {
+    std::cerr << "Failed to open value log file for reading!" << std::endl;
+    return Status::Corruption("Failed to open value log file for reading!");
+  }
+  inFile.seekg(offset);
+  char *value_buf=new char[len];
+  inFile.read(value_buf,len);
+  inFile.close();
+  *value=Slice(value_buf,len);
+  return Status::OK();
+  //unlock_shared
+}
 // Default implementations of convenience methods that subclasses of DB
 // can call if they wish
 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
   WriteBatch batch;
-  std::ofstream valueFile("tmp.txt", std::ios::app | std::ios::binary);
-  if (!valueFile.is_open()) {
-    std::cerr << "Failed to open file for writing!" << std::endl;
-    return Status::Corruption("Failed to open file for writing!");
-  }
-  uint64_t offset=valueFile.tellp();
-  valueFile.write(value.data(),value.size());
-  valueFile.close();
-  auto value_len=value.size();
-  char *new_value_buf=new char[sizeof(uint64_t)+sizeof(size_t)];
-  memcpy(new_value_buf,(void*)(&offset),sizeof(uint64_t));
-  memcpy(new_value_buf+sizeof(uint64_t),(void*)(&value_len),sizeof(size_t));
-  Slice new_value=Slice(new_value_buf,sizeof(uint64_t)+sizeof(size_t));
-  batch.Put(key, new_value);
+  batch.Put(key, value);
   return Write(opt, &batch);
 }
diff --git a/db/db_impl.h b/db/db_impl.h
index 1bd02b2..aed3ece 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -9,6 +9,8 @@
 #include <deque>
 #include <set>
 #include <string>
+#include <vector>
+#include <shared_mutex>
 
 #include "db/dbformat.h"
 #include "db/log_writer.h"
@@ -60,6 +62,9 @@ class DBImpl : public DB {
   bool GetProperty(const Slice& property, std::string* value) override;
   void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
   void CompactRange(const Slice* begin, const Slice* end) override;
+  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value) override;
+  void addNewValueLog() override;
+  Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value) override;
 
   // Extra methods (for testing) that are not in the public DB interface
@@ -184,13 +189,15 @@ class DBImpl : public DB {
 
   // State below is protected by mutex_
   port::Mutex mutex_;
+  //std::shared_mutex value_log_mutex;
   std::atomic<bool> shutting_down_;
   port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
   MemTable* mem_;
   MemTable* imm_ GUARDED_BY(mutex_);  // Memtable being compacted
   std::atomic<bool> has_imm_;         // So bg thread can detect non-null imm_
   WritableFile* logfile_;
-  uint64_t logfile_number_ GUARDED_BY(mutex_);
+  uint64_t logfile_number_;
+  uint64_t valuelogfile_number_ GUARDED_BY(mutex_);
   log::Writer* log_;
   uint32_t seed_ GUARDED_BY(mutex_);  // For sampling.
diff --git a/db/db_iter.cc b/db/db_iter.cc
index a7ce298..28d227e 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -69,13 +69,20 @@ class DBIter : public Iterator {
   Slice value() const override {
     assert(valid_);
     auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_;
-    std::ifstream inFile("tmp.txt", std::ios::in | std::ios::binary);
-    uint64_t value_offset=*(uint64_t*)(tmp_value.data());
-    size_t value_size=*(size_t*)(tmp_value.data()+sizeof(uint64_t));
-    inFile.seekg(value_offset);
-    char *value_buf=new char[value_size];
-    inFile.read(value_buf,value_size);
-    return Slice(value_buf,value_size);
+    if(tmp_value.data()[0]==0x00){  // inline value: strip the flag byte
+      tmp_value.remove_prefix(1);
+      return tmp_value;
+    }
+    tmp_value.remove_prefix(1);  // value-log pointer: decode file_id/offset/len
+    uint64_t file_id,valuelog_offset,valuelog_len;
+    bool res=GetVarint64(&tmp_value,&file_id);
+    if(!res)assert(0);
+    res=GetVarint64(&tmp_value,&valuelog_offset);
+    if(!res)assert(0);
+    res=GetVarint64(&tmp_value,&valuelog_len);
+    if(!res)assert(0);
+    db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
+    return tmp_value;
   }
   Status status() const override {
     if (status_.ok()) {
diff --git a/db/write_batch.cc b/db/write_batch.cc
index b54313c..cc20109 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -127,6 +127,34 @@ class MemTableInserter : public WriteBatch::Handler {
     sequence_++;
   }
 };
+class ValueLogInserter : public WriteBatch::Handler {
+ public:
+  WriteBatch writeBatch_;
+  DB* db_;
+
+  void Put(const Slice& key, const Slice& value) override {
+    Slice new_value;
+    std::string buf;
+    if(value.size()<100){  // small values stay inline in the LSM-tree
+      buf+=(char)(0x00);
+      buf.append(value.data(),value.size());
+    }
+    else{                  // large values go to the value log
+      buf+=(char)(0x01);
+      std::vector<Slice> v;
+      v.push_back(value);
+      auto res=db_->WriteValueLog(v);
+      PutVarint64(&buf,res[0].first);          // file_id
+      PutVarint64(&buf,res[0].second.first);   // offset
+      PutVarint64(&buf,res[0].second.second);  // length
+    }
+    new_value=Slice(buf);
+    writeBatch_.Put(key,new_value);
+  }
+  void Delete(const Slice& key) override {
+    writeBatch_.Delete(key);
+  }
+};
 }  // namespace
 
 Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
@@ -136,6 +164,14 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
   return b->Iterate(&inserter);
 }
 
+Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
+  ValueLogInserter inserter;
+  inserter.writeBatch_=WriteBatch();
+  inserter.db_=db_;
+  auto res=b->Iterate(&inserter);
+  *b=inserter.writeBatch_;
+  return res;
+}
+
 void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
   assert(contents.size() >= kHeader);
   b->rep_.assign(contents.data(), contents.size());
diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h
index fce86e3..07869aa 100644
--- a/db/write_batch_internal.h
+++ b/db/write_batch_internal.h
@@ -37,6 +37,8 @@ class WriteBatchInternal {
 
   static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
 
+  static Status ConverToValueLog(WriteBatch* batch,DB* db_);
+
   static void Append(WriteBatch* dst, const WriteBatch* src);
 };
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index 4db4883..4d4da6e 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -102,6 +102,19 @@ class LEVELDB_EXPORT DB {
   virtual Status Get_keys_by_field(const ReadOptions& options,
                                    const Field field,std::vector<std::string> *keys);
 
+  virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
+    assert(0);  // Not implemented in the base class
+    std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
+    return v;
+  }
+
+  virtual void addNewValueLog(){assert(0);}
+
+  virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
+    assert(0);  // Not implemented
+    return Status::Corruption("not imp");
+  }
+
   // Return a heap-allocated iterator over the contents of the database.
   // The result of NewIterator() is initially invalid (caller must
   // call one of the Seek methods on the iterator before using it).
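
A minimal sketch (not part of the patch itself) of how the packed value produced by ValueLogInserter::Put can be decoded. It assumes the layout used throughout the patch: a one-byte flag (0x00 = inline value, 0x01 = value-log pointer) followed either by the raw value bytes or by three varint64 fields (file_id, offset, length). The names DecodedValue and DecodePackedValue are illustrative only and do not appear in the patch.

    #include <cstdint>
    #include <string>

    #include "leveldb/slice.h"
    #include "util/coding.h"  // GetVarint64

    namespace leveldb {

    struct DecodedValue {
      bool in_value_log;         // true if the real value lives in a .VALUELOG file
      std::string inline_value;  // set when in_value_log == false
      uint64_t file_id = 0;      // value-log file number
      uint64_t offset = 0;       // byte offset inside that file
      uint64_t length = 0;       // length of the stored value
    };

    // Returns false if the packed representation cannot be parsed.
    bool DecodePackedValue(Slice packed, DecodedValue* out) {
      if (packed.empty()) return false;
      const char flag = packed[0];
      packed.remove_prefix(1);
      if (flag == 0x00) {  // small value stored inline in the LSM-tree
        out->in_value_log = false;
        out->inline_value.assign(packed.data(), packed.size());
        return true;
      }
      out->in_value_log = true;  // large value: decode the value-log pointer
      return GetVarint64(&packed, &out->file_id) &&
             GetVarint64(&packed, &out->offset) &&
             GetVarint64(&packed, &out->length);
    }

    }  // namespace leveldb

DBImpl::Get and DBIter::value in the patch follow the same two steps: inspect the flag byte, then either strip it or decode the pointer and fetch the bytes through ReadValueLog.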