diff --git a/db/builder.cc b/db/builder.cc index 7cf13f8..780e6f9 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, return s; } + // 如果第一个字节是 0x01,它会移除这个前缀,并尝试从剩下的数据中解析出 value { auto tmp_value=iter->value(); if(tmp_value.data()[0]==(char)(0x01)){ diff --git a/db/db_impl.cc b/db/db_impl.cc index 56d51ec..82d9ce7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -839,8 +839,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { compact->builder = new TableBuilder(options_, compact->outfile); } - - compact->valuelog_offset=0; + // compaction 后的新 valuelog + compact->valuelog_offset=0; // why s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); return s; @@ -878,6 +878,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, } delete compact->outfile; compact->outfile = nullptr; + // value log 落盘 if (s.ok()) { s = compact->valuelogfile->Flush(); } @@ -955,9 +956,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + // 记录 当前和下一层 level 中被合并的 files for (int which = 0; which < 2; which++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) { - if(compact->compaction->input(which, i)->valuelog_id)oldvaluelog_ids[compact->compaction->input(which, i)->number]=compact->compaction->input(which, i)->valuelog_id; + auto tmp_file=compact->compaction->input(which, i); + if(tmp_file->valuelog_id){ + oldvaluelog_ids[tmp_file->number]=tmp_file->valuelog_id; + } } } while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { @@ -1044,31 +1049,39 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { Slice old_value=input->value(); Slice new_value; std::string buf=""; - if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){//when it is a deletion, input->value() will be "" + if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){ + // when it is a deletion, input->value() will be "" new_value=old_value; } else{ + // not delete old_value.remove_prefix(1); - uint64_t file_id,valuelog_offset,valuelog_len; + // put value_len into value_log + int value_offset=sizeof(uint64_t)*2;// 16 + // uint64_t file_id,valuelog_offset,valuelog_len; + uint64_t file_id,valuelog_offset; bool res=GetVarint64(&old_value,&file_id); if(!res)assert(0); res=GetVarint64(&old_value,&valuelog_offset); if(!res)assert(0); - res=GetVarint64(&old_value,&valuelog_len); - if(!res)assert(0); - Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + // res=GetVarint64(&old_value,&valuelog_len); + // if(!res)assert(0); + // Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + Status s=ReadValueLog(file_id,valuelog_offset,&new_value); assert(s.ok()); writeValueLogForCompaction(compact->valuelogfile,{new_value}); buf+=(char)(0x01); PutVarint64(&buf,compact->valuelog_file_id); PutVarint64(&buf,compact->valuelog_offset); - PutVarint64(&buf,valuelog_len); - compact->valuelog_offset+=valuelog_len; + // PutVarint64(&buf,valuelog_len); + compact->valuelog_offset+=value_offset; delete []new_value.data(); new_value=Slice(buf); } compact->builder->Add(key, new_value); + // 更新计数器 + // compact->builder->add_log_count(); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1241,14 +1254,18 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } Slice value_log_slice=Slice(value->c_str()+1,value->length()); Slice new_value; - uint64_t file_id,valuelog_offset,valuelog_len; + // put value_len into value_log + int value_offset=sizeof(uint64_t)*2;// 16 + // uint64_t file_id,valuelog_offset,valuelog_len; + uint64_t file_id,valuelog_offset; bool res=GetVarint64(&value_log_slice,&file_id); if(!res)return Status::Corruption("can't decode file id"); res=GetVarint64(&value_log_slice,&valuelog_offset); if(!res)return Status::Corruption("can't decode valuelog offset"); - res=GetVarint64(&value_log_slice,&valuelog_len); - if(!res)return Status::Corruption("can't decode valuelog len"); - ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + // res=GetVarint64(&value_log_slice,&valuelog_len); + // if(!res)return Status::Corruption("can't decode valuelog len"); + // ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); + ReadValueLog(file_id,valuelog_offset,&new_value); *value=std::string(new_value.data(),new_value.size()); delete []new_value.data(); return s; @@ -1574,35 +1591,66 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } -std::vector>> DBImpl::WriteValueLog(std::vector values){ - //lock - // std::vector>> res; - // for(int i=0;iAppend(values[i]); - // res.push_back({valuelogfile_number_,{valuelogfile_offset,len}}); - // valuelogfile_offset+=len; - // } - // //unlock - // valuelogfile_->Flush(); - // return res; - std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); +// std::vector>> DBImpl::WriteValueLog(std::vector values){ + +// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); +// std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); +// if (!valueFile.is_open()) { +// assert(0); +// } +// uint64_t offset=valueFile.tellp(); +// std::vector>> res; +// for(int i=0;i> DBImpl::WriteValueLog(std::vector values) { + std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); if (!valueFile.is_open()) { - assert(0); + assert(0); + } + + uint64_t offset = valueFile.tellp(); + std::vector> res; + + for (const auto& slice : values) { + uint64_t len = slice.size(); + + // 先写入长度 + valueFile.write(reinterpret_cast(&len), sizeof(uint64_t)); + if (!valueFile.good()) { + valueFile.close(); + return {}; // 写入长度失败,返回空结果 } - uint64_t offset=valueFile.tellp(); - std::vector>> res; - for(int i=0;i values){ for(int i=0;iAppend(values[i]); @@ -1610,34 +1658,73 @@ void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vectorSync(); - // valuelogfile_->Close(); - // delete valuelogfile_; - // } valuelogfile_number_=versions_->NewFileNumber(); - // valuelogfile_offset=0; - // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); - // env_->NewWritableFile(file_name_,&valuelogfile_); - //unlock } -Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ +// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ +// //lock_shared +// Status s=Status::OK(); +// std::string file_name_=ValueLogFileName(dbname_,file_id); +// //std::cout<(&len), sizeof(uint64_t)); + if (!inFile.good()) { + inFile.close(); + return Status::Corruption("Failed to read length from file!"); + } + + // Now seek to the actual data position and read the value + inFile.seekg(offset + sizeof(uint64_t)); + char* value_buf = new char[len]; + inFile.read(value_buf, len); + if (!inFile.good()) { + delete[] value_buf; + inFile.close(); + return Status::Corruption("Failed to read value from file!"); + } + + // Close the file after reading inFile.close(); - *value=Slice(value_buf,len); - return Status::OK(); + + // Assign the read data to the Slice + *value = Slice(value_buf, len); + + // Clean up allocated buffer + // should also do in v2 + delete[] value_buf; + + return s; } // Default implementations of convenience methods that subclasses of DB diff --git a/db/db_impl.h b/db/db_impl.h index ab21773..513690b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -63,11 +63,14 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - std::vector>> WriteValueLog(std::vector value)override; + // std::vector>> WriteValueLog(std::vector value)override; + std::vector> WriteValueLog(std::vector value)override; void writeValueLogForCompaction(WritableFile* target_file,std::vector value); void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);; std::pair getNewValuelog();//use for compaction - Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; + // Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; + Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value)override; + // Extra methods (for testing) that are not in the public DB interface diff --git a/db/db_iter.cc b/db/db_iter.cc index 28d227e..350d175 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -79,9 +79,10 @@ class DBIter : public Iterator { if(!res)assert(0); res=GetVarint64(&tmp_value,&valuelog_offset); if(!res)assert(0); - res=GetVarint64(&tmp_value,&valuelog_len); - if(!res)assert(0); - db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); + // res=GetVarint64(&tmp_value,&valuelog_len); + // if(!res)assert(0); + // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); + db_->ReadValueLog(file_id,valuelog_offset,&tmp_value); return tmp_value; } Status status() const override { diff --git a/db/write_batch.cc b/db/write_batch.cc index 77af25a..14df572 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -149,8 +149,9 @@ class ValueLogInserter : public WriteBatch::Handler { v.push_back(value); auto res=db_->WriteValueLog(v); PutVarint64(&buf,res[0].first); - PutVarint64(&buf,res[0].second.first); - PutVarint64(&buf,res[0].second.second); + // PutVarint64(&buf,res[0].second.first); + // PutVarint64(&buf,res[0].second.second); + PutVarint64(&buf,res[0].second); } new_value=Slice(buf); writeBatch_.Put(key,new_value); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 566fbee..116744f 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -102,19 +102,29 @@ class LEVELDB_EXPORT DB { // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector *keys); - virtual std::vector>> WriteValueLog(std::vector value){ + // virtual std::vector>> WriteValueLog(std::vector value){ + // assert(0); + // std::vector>> v; + // return v; + // } + virtual std::vector> WriteValueLog(std::vector value){ assert(0); - std::vector>> v; + std::vector> v; return v; } virtual void addNewValueLog(){assert(0);} - virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ + // virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ + // assert(0); // Not implemented + // return Status::Corruption("not imp"); + // } + virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value){ assert(0); // Not implemented return Status::Corruption("not imp"); } + // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it).