diff --git a/CMakeLists.txt b/CMakeLists.txt
index 91a44f2..122df31 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2,22 +2,23 @@
 # Use of this source code is governed by a BSD-style license that can be
 # found in the LICENSE file. See the AUTHORS file for names of contributors.
 
-cmake_minimum_required(VERSION 3.9)
+cmake_minimum_required(VERSION 3.10)
 # Keep the version below in sync with the one in db.h
 project(leveldb VERSION 1.23.0 LANGUAGES C CXX)
 
 # C standard can be overridden when this is used as a sub-project.
 if(NOT CMAKE_C_STANDARD)
   # This project can use C11, but will gracefully decay down to C89.
-  set(CMAKE_C_STANDARD 11)
+  # Bumped to C17.
+  set(CMAKE_C_STANDARD 17)
   set(CMAKE_C_STANDARD_REQUIRED OFF)
   set(CMAKE_C_EXTENSIONS OFF)
 endif(NOT CMAKE_C_STANDARD)
 
 # C++ standard can be overridden when this is used as a sub-project.
 if(NOT CMAKE_CXX_STANDARD)
-  # This project requires C++11.
-  set(CMAKE_CXX_STANDARD 11)
+  # This project requires C++17.
+  set(CMAKE_CXX_STANDARD 17)
   set(CMAKE_CXX_STANDARD_REQUIRED ON)
   set(CMAKE_CXX_EXTENSIONS OFF)
 endif(NOT CMAKE_CXX_STANDARD)
diff --git a/db/builder.cc b/db/builder.cc
index 7cf13f8..780e6f9 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
       return s;
     }
 
+    // If the first byte is 0x01, strip the prefix and parse the value's valuelog location from the remaining bytes.
    {
      auto tmp_value=iter->value();
      if(tmp_value.data()[0]==(char)(0x01)){
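Note on the value encoding that builder.cc (and later db_iter.cc and DBImpl::Get) relies on: a stored sstable value is either a 0x00 byte followed by the inline value, or a 0x01 byte followed by two varint64s naming the valuelog file and the record offset. A minimal decoding sketch (my illustration, not code from this patch):

```cpp
#include <cstdint>

#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "util/coding.h"

// Decode a 0x01-tagged sstable value into its valuelog location.
leveldb::Status DecodeValuePointer(leveldb::Slice v, uint64_t* file_id,
                                   uint64_t* offset) {
  if (v.empty() || v[0] != 0x01) {
    return leveldb::Status::Corruption("not a valuelog pointer");
  }
  v.remove_prefix(1);  // drop the tag byte
  if (!leveldb::GetVarint64(&v, file_id) ||
      !leveldb::GetVarint64(&v, offset)) {
    return leveldb::Status::Corruption("truncated valuelog pointer");
  }
  return leveldb::Status::OK();
}
```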
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 9f51f7b..651fc10 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -4,15 +4,6 @@
 
 #include "db/db_impl.h"
 
-#include <algorithm>
-#include <atomic>
-#include <cstdint>
-#include <cstdio>
-#include <fstream>
-#include <iostream>
-#include <set>
-#include <string>
-#include <vector>
 #include "db/builder.h"
 #include "db/db_iter.h"
 #include "db/dbformat.h"
@@ -23,11 +14,24 @@
 #include "db/table_cache.h"
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"
+#include <algorithm>
+#include <atomic>
+#include <cstdint>
+#include <cstdio>
+
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <set>
+#include <string>
+#include <vector>
+
 #include "leveldb/db.h"
 #include "leveldb/env.h"
 #include "leveldb/status.h"
 #include "leveldb/table.h"
 #include "leveldb/table_builder.h"
+
 #include "port/port.h"
 #include "table/block.h"
 #include "table/merger.h"
@@ -35,6 +39,8 @@
 #include "util/coding.h"
 #include "util/logging.h"
 #include "util/mutexlock.h"
+#include <filesystem>
+namespace fs = std::filesystem;
 
 namespace leveldb {
 
@@ -57,7 +63,6 @@ struct DBImpl::CompactionState {
   struct Output {
     uint64_t number;
     uint64_t file_size;
-    uint64_t valuelog_id;
     InternalKey smallest, largest;
   };
 
@@ -84,10 +89,6 @@ struct DBImpl::CompactionState {
 
   WritableFile* outfile;
   TableBuilder* builder;
-  WritableFile* valuelogfile;
-  uint64_t valuelog_offset=0;
-  uint64_t valuelog_file_id=0;
-
   uint64_t total_bytes;
 };
 
@@ -142,6 +143,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
       db_lock_(nullptr),
       shutting_down_(false),
       background_work_finished_signal_(&mutex_),
+      background_gc_finished_signal_(&gc_mutex_),
+      spj_mutex_cond_(&spj_mutex_),
       mem_(nullptr),
       imm_(nullptr),
       has_imm_(false),
@@ -151,6 +154,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
       seed_(0),
       tmp_batch_(new WriteBatch),
       background_compaction_scheduled_(false),
+      background_garbage_collect_scheduled_(false),
       manual_compaction_(nullptr),
       versions_(new VersionSet(dbname_, &options_, table_cache_,
                                &internal_comparator_)) {}
@@ -281,10 +285,6 @@ void DBImpl::RemoveObsoleteFiles() {
       }
       Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
           static_cast<long long>(number));
-      if(oldvaluelog_ids.count(number)){
-        std::string valuelog_filename=ValueLogFileName(dbname_,oldvaluelog_ids[number]);
-        env_->RemoveFile(valuelog_filename);
-      }
     }
   }
 }
@@ -546,7 +546,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
     level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
   }
   edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
-                meta.largest,meta.valuelog_id);
+                meta.largest);
 }
 
 CompactionStats stats;
@@ -666,6 +666,16 @@ Status DBImpl::TEST_CompactMemTable() {
   return s;
 }
 
+void DBImpl::TEST_GarbageCollect() {
+  MaybeScheduleGarbageCollect();
+  // Wait for the background garbage collection to finish, including the
+  // case where `background_gc_finished_signal_` is signalled due to an error.
+  gc_mutex_.Lock();
+  while (background_garbage_collect_scheduled_) {
+    background_gc_finished_signal_.Wait();
+  }
+  gc_mutex_.Unlock();
+  // std::cout<<"bg_signal"<<std::endl;
+}
+
+void DBImpl::MaybeScheduleGarbageCollect() {
+  if (background_garbage_collect_scheduled_) {
+    // A garbage collection is already scheduled.
+  } else if (shutting_down_.load(std::memory_order_acquire)) {
+    // DB is being deleted; no more background work.
+  } else if (!bg_error_.ok()) {
+    // Already got an error; no more changes.
+  } else {
+    background_garbage_collect_scheduled_ = true;
+    env_->Schedule(&DBImpl::BGWorkGC, this);
+  }
+}
+
+void DBImpl::BGWorkGC(void* db) {
+  reinterpret_cast<DBImpl*>(db)->BackgroundGarbageCollect();
+}
+
 void DBImpl::BGWork(void* db) {
   reinterpret_cast<DBImpl*>(db)->BackgroundCall();
 }
@@ -708,12 +739,37 @@ void DBImpl::BackgroundCall() {
   background_compaction_scheduled_ = false;
 
+  // // Check if garbage collection needs to be scheduled after compaction
+  // MaybeScheduleGarbageCollect();
+
   // Previous compaction may have produced too many files in a level,
   // so reschedule another compaction if needed.
   MaybeScheduleCompaction();
   background_work_finished_signal_.SignalAll();
 }
 
+void DBImpl::BackgroundGarbageCollect() {
+  MutexLock l(&gc_mutex_);
+  assert(background_garbage_collect_scheduled_);
+
+  if (shutting_down_.load(std::memory_order_acquire)) {
+    // No more background work when shutting down.
+  } else if (!bg_error_.ok()) {
+    // No more background work after a background error.
+  } else {
+    // Perform garbage collection here. The MutexLock guard releases
+    // gc_mutex_ on scope exit, so no manual Unlock is needed here.
+    GarbageCollect();
+  }
+
+  background_garbage_collect_scheduled_ = false;
+
+  // Notify any waiting threads
+  background_gc_finished_signal_.SignalAll();
+}
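The GC control flow copies the compaction pattern: TEST_GarbageCollect() schedules BGWorkGC on the shared Env thread and then blocks on background_gc_finished_signal_ until BackgroundGarbageCollect() clears the scheduled flag. The handshake reduced to its core (a sketch with illustrative names, assuming one mutex guards the flag as the GUARDED_BY annotations in db_impl.h declare):

```cpp
#include "port/port.h"

// Minimal schedule-and-wait handshake, mirroring TEST_GarbageCollect() /
// BackgroundGarbageCollect().
struct GcHandshake {
  leveldb::port::Mutex mu;
  leveldb::port::CondVar done_cv;
  bool scheduled;

  GcHandshake() : done_cv(&mu), scheduled(false) {}

  // Foreground thread, after Env::Schedule() has queued the work item:
  void WaitUntilDone() {
    mu.Lock();
    while (scheduled) {
      done_cv.Wait();  // releases mu while blocked, reacquires on wakeup
    }
    mu.Unlock();
  }

  // Background thread, at the end of the work item:
  void Finish() {
    mu.Lock();
    scheduled = false;
    done_cv.SignalAll();  // wake every waiter, as the patch does
    mu.Unlock();
  }
};
```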
 
 void DBImpl::BackgroundCompaction() {
   mutex_.AssertHeld();
 
@@ -750,7 +806,7 @@ void DBImpl::BackgroundCompaction() {
     FileMetaData* f = c->input(0, 0);
     c->edit()->RemoveFile(c->level(), f->number);
     c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
-                       f->largest,f->valuelog_id);
+                       f->largest);
     status = versions_->LogAndApply(c->edit(), &mutex_);
     if (!status.ok()) {
       RecordBackgroundError(status);
@@ -824,11 +880,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
     out.number = file_number;
     out.smallest.Clear();
     out.largest.Clear();
-    compact->valuelog_file_id=versions_->NewFileNumber();
-    out.valuelog_id=compact->valuelog_file_id;
-
     compact->outputs.push_back(out);
-
     mutex_.Unlock();
   }
 
@@ -838,11 +890,6 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
   if (s.ok()) {
     compact->builder = new TableBuilder(options_, compact->outfile);
   }
-
-
-  compact->valuelog_offset=0;
-  s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile);
-
   return s;
 }
 
@@ -878,19 +925,6 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
   }
   delete compact->outfile;
   compact->outfile = nullptr;
-  if (s.ok()) {
-    s = compact->valuelogfile->Flush();
-  }
-  if (s.ok()) {
-    s = compact->valuelogfile->Sync();
-  }
-  if (s.ok()) {
-    s = compact->valuelogfile->Close();
-  }
-  delete compact->valuelogfile;
-  compact->valuelogfile=nullptr;
-  compact->valuelog_file_id=0;
-  compact->valuelog_offset=0;
 
   if (s.ok() && current_entries > 0) {
     // Verify that the table is usable
@@ -921,7 +955,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
-                                         out.smallest, out.largest,out.valuelog_id);
+                                         out.smallest, out.largest);
   }
   return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
 }
@@ -955,11 +989,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   std::string current_user_key;
   bool has_current_user_key = false;
   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
-  for (int which = 0; which < 2; which++) {
-    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
-      if(compact->compaction->input(which, i)->valuelog_id)oldvaluelog_ids[compact->compaction->input(which, i)->number]=compact->compaction->input(which, i)->valuelog_id;
-    }
-  }
   while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
     // Prioritize immutable compaction work
     if (has_imm_.load(std::memory_order_relaxed)) {
@@ -1040,35 +1069,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         compact->current_output()->smallest.DecodeFrom(key);
       }
       compact->current_output()->largest.DecodeFrom(key);
-
-      Slice old_value=input->value();
-      Slice new_value;
-      std::string buf="";
-      if(old_value.size()==0||old_value.data()[0]==(char)(0x00)){//when it is a deletion, input->value() will be ""
-        new_value=old_value;
-      }
-      else{
-        old_value.remove_prefix(1);
-        uint64_t file_id,valuelog_offset,valuelog_len;
-        bool res=GetVarint64(&old_value,&file_id);
-        if(!res)assert(0);
-        res=GetVarint64(&old_value,&valuelog_offset);
-        if(!res)assert(0);
-        res=GetVarint64(&old_value,&valuelog_len);
-        if(!res)assert(0);
-        Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
-        assert(s.ok());
-        writeValueLogForCompaction(compact->valuelogfile,{new_value});
-        buf+=(char)(0x01);
-        PutVarint64(&buf,compact->valuelog_file_id);
-        PutVarint64(&buf,compact->valuelog_offset);
-        PutVarint64(&buf,valuelog_len);
-        compact->valuelog_offset+=valuelog_len;
-        delete []new_value.data();
-        new_value=Slice(buf);
-      }
-
-      compact->builder->Add(key, new_value);
+      compact->builder->Add(key, input->value());
 
       // Close output file if it is big enough
       if (compact->builder->FileSize() >=
@@ -1092,14 +1093,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   if (status.ok()) {
     status = input->status();
   }
-  //not completely correct, should be written in new function, related to removeabsol...
-  // if(status.ok()){
-  //   for(auto id:old_valuelog_ids){
-  //     auto valuelog_filename=ValueLogFileName(dbname_,id);
-  //     Status s=env_->RemoveFile(valuelog_filename);
-  //     assert(s.ok());
-  //   }
-  // }
 
   delete input;
   input = nullptr;
@@ -1235,22 +1228,29 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
   if (imm != nullptr) imm->Unref();
   current->Unref();
   if(!s.ok())return s;
-  if(value->c_str()[0]==0x00){
-    *value=value->substr(1);
+  if(options.find_value_log_for_gc){
     return s;
   }
-  Slice value_log_slice=Slice(value->c_str()+1,value->length());
+
+  if (value->c_str()[0] == 0x00) {
+    *value = value->substr(1);
+    return s;
+  }
+  Slice value_log_slice = Slice(value->c_str() + 1, value->length() - 1);
+  Slice new_key;
   Slice new_value;
-  uint64_t file_id,valuelog_offset,valuelog_len;
-  bool res=GetVarint64(&value_log_slice,&file_id);
-  if(!res)return Status::Corruption("can't decode file id");
-  res=GetVarint64(&value_log_slice,&valuelog_offset);
-  if(!res)return Status::Corruption("can't decode valuelog offset");
-  res=GetVarint64(&value_log_slice,&valuelog_len);
-  if(!res)return Status::Corruption("can't decode valuelog len");
-  ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value);
-  *value=std::string(new_value.data(),new_value.size());
-  delete []new_value.data();
+  uint64_t file_id, valuelog_offset;
+  bool res = GetVarint64(&value_log_slice, &file_id);
+  if (!res) return Status::Corruption("can't decode file id");
+  res = GetVarint64(&value_log_slice, &valuelog_offset);
+  if (!res) return Status::Corruption("can't decode valuelog offset");
+  s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
+  if(!s.ok()){
+    return s;
+  }
+  *value = std::string(new_value.data(), new_value.size());
+  delete[] new_value.data();
   return s;
 }
 
@@ -1285,10 +1285,17 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
 
 // Convenience methods
 Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
+  if(!o.valuelog_write){
+    spj_mutex_.Lock();
+    while(key==valuelog_finding_key){
+      spj_mutex_cond_.Wait();
+    }
+    spj_mutex_.Unlock();
+  }
+
   return DB::Put(o, key, val);
 }
 
-
 Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
   return DB::Delete(options, key);
 }
@@ -1314,7 +1321,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   Writer* last_writer = &w;
   if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
     WriteBatch* write_batch = BuildBatchGroup(&last_writer);
-    WriteBatchInternal::ConverToValueLog(write_batch,this);
+    WriteBatchInternal::ConverToValueLog(write_batch, this);
     WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
     last_sequence += WriteBatchInternal::Count(write_batch);
 
@@ -1351,17 +1358,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
   while (true) {
     Writer* ready = writers_.front();
     writers_.pop_front();
-    if (ready != &w) {
-      ready->status = status;
-      ready->done = true;
-      ready->cv.Signal();
-    }
+    //if (ready != &w) {
+    ready->status = status;
+    ready->done = true;
+    ready->cv.SignalAll();
+    //}
     if (ready == last_writer) break;
   }
 
   // Notify new head of write queue
   if (!writers_.empty()) {
-    writers_.front()->cv.Signal();
+    writers_.front()->cv.SignalAll();
   }
 
   return status;
 }
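Put() above refuses to enqueue a write for whatever key GarbageCollect() is currently probing (valuelog_finding_key), and GC wakes the blocked writers once it has rewritten or skipped the record. Extracted into a self-contained helper, the protocol looks like this (a sketch; the patch keeps a raw Slice and two explicit Lock/Unlock pairs instead, and using std::string here sidesteps the Slice lifetime hazard):

```cpp
#include <string>

#include "port/port.h"

// Key gate between the GC thread and foreground writers (illustrative).
class KeyGate {
 public:
  KeyGate() : cv_(&mu_) {}

  // GC side: claim the key before probing/rewriting its record.
  void Claim(const std::string& key) {
    mu_.Lock();
    busy_key_ = key;
    mu_.Unlock();
  }

  // GC side: release the key and wake every blocked writer.
  void Release() {
    mu_.Lock();
    busy_key_.clear();
    cv_.SignalAll();
    mu_.Unlock();
  }

  // Writer side (cf. DBImpl::Put): wait while GC owns this key.
  void Await(const std::string& key) {
    mu_.Lock();
    while (!busy_key_.empty() && key == busy_key_) {
      cv_.Wait();
    }
    mu_.Unlock();
  }

 private:
  leveldb::port::Mutex mu_;
  leveldb::port::CondVar cv_;
  std::string busy_key_;
};
```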
 
@@ -1574,70 +1581,452 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
   v->Unref();
 }
 
-std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){
-  //lock
-  // std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
-  // for(int i=0;i<values.size();i++){
-  //   int len=values[i].size();
-  //   valuelogfile_->Append(values[i]);
-  //   res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
-  //   valuelogfile_offset+=len;
-  // }
-  // //unlock
-  // valuelogfile_->Flush();
-  // return res;
-  std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
-  std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
-  if (!valueFile.is_open()) {
-    assert(0);
-  }
-  uint64_t offset=valueFile.tellp();
-  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
-  for(int i=0;i<values.size();i++){
-    valueFile.write(values[i].data(),values[i].size());
-    res.push_back({valuelogfile_number_,{offset,values[i].size()}});
-    offset+=values[i].size();
-  }
-  valueFile.close();
-  return res;
-}
+// std::vector<std::pair<uint64_t, std::pair<uint64_t, uint64_t>>>
+// DBImpl::WriteValueLog(std::vector<Slice> values){
+
+//   std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
+//   std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
+//   if (!valueFile.is_open()) {
+//     assert(0);
+//   }
+//   uint64_t offset=valueFile.tellp();
+//   std::vector<std::pair<uint64_t, std::pair<uint64_t, uint64_t>>> res;
+//   for(int i=0;i<values.size();i++){
+//     valueFile.write(values[i].data(),values[i].size());
+//     res.push_back({valuelogfile_number_,{offset,values[i].size()}});
+//     offset+=values[i].size();
+//   }
+//   valueFile.close();
+//   return res;
+// }
+
+std::vector<std::pair<uint64_t, uint64_t>> DBImpl::WriteValueLog(
+    std::vector<std::pair<Slice, Slice>> kv) {
+  std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
+  std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary);
+  if (!valueFile.is_open()) {
+    assert(0);
+  }
+
+  uint64_t offset = valueFile.tellp();
+  std::vector<std::pair<uint64_t, uint64_t>> res;
+
+  for (const auto& [key_slice, value_slice] : kv) {
+    // Write the length of the key
+    uint64_t key_len = key_slice.size();
+    valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
+    if (!valueFile.good()) {
+      valueFile.close();
+      assert(0);
+    }
+
+    // Write the key itself
+    valueFile.write(key_slice.data(), key_len);
+    if (!valueFile.good()) {
+      valueFile.close();
+      assert(0);
+    }
+
+    // Write the length of the value
+    uint64_t value_len = value_slice.size();
+    valueFile.write(reinterpret_cast<const char*>(&value_len),
+                    sizeof(uint64_t));
+    if (!valueFile.good()) {
+      valueFile.close();
+      assert(0);
+    }
+
+    // Write the value itself
+    valueFile.write(value_slice.data(), value_len);
+    if (!valueFile.good()) {
+      valueFile.close();
+      assert(0);
+    }
+
+    // Record the offset at which this record starts, then advance it
+    res.push_back({valuelogfile_number_, offset});
+    offset += sizeof(uint64_t) * 2 + key_len + value_len;
+  }
+
+  valueFile.close();
+  return res;
+}
 
-void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){
-  for(int i=0;i<values.size();i++){
+void DBImpl::writeValueLogForCompaction(WritableFile* target_file,
+                                        std::vector<Slice> values) {
+  for (int i = 0; i < values.size(); i++) {
     target_file->Append(values[i]);
   }
 }
 
-void DBImpl::addNewValueLog(){
-  //lock
-  // if(valuelogfile_){
-  //   valuelogfile_->Sync();
-  //   valuelogfile_->Close();
-  //   delete valuelogfile_;
-  // }
-  valuelogfile_number_=versions_->NewFileNumber();
-  // valuelogfile_offset=0;
-  // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
-  // env_->NewWritableFile(file_name_,&valuelogfile_);
-  //unlock
-}
+void DBImpl::addNewValueLog() {
+  valuelogfile_number_ = versions_->NewFileNumber();
+}
 
-Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
-  //lock_shared
-  std::string file_name_=ValueLogFileName(dbname_,file_id);
-  //std::cout<<file_name_<<std::endl;
-  std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
-  if (!inFile.is_open()) {
-    std::cerr << "Failed to open file for writing!"<<std::endl;
-    assert(0);
-  }
-  inFile.seekg(offset);
-  char* value_buf=new char[len];
-  inFile.read(value_buf,len);
-  inFile.close();
-  *value=Slice(value_buf,len);
-  return Status::OK();
-}
+// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) {
+//   Status s = Status::OK();
+//   std::string file_name_ = ValueLogFileName(dbname_, file_id);
+//   std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
+//   if (!inFile.is_open()) {
+//     return Status::Corruption("Failed to open valuelog file!");
+//   }
+//   inFile.seekg(offset);
+//   // inFile.read(reinterpret_cast<char*>(&len), sizeof(uint64_t));
+//   char *value_buf_len=new char[sizeof(uint64_t)];
+//   inFile.read(value_buf_len,sizeof(uint64_t));
+//   uint64_t len=0;
+//   std::memcpy(&len, value_buf_len, sizeof(uint64_t));
+
+//   if (!inFile.good()) {
+//     inFile.close();
+//     return Status::Corruption("Failed to read length from file!");
+//   }
+
+//   // Now seek to the actual data position and read the value
+//   inFile.seekg(offset + sizeof(uint64_t));
+//   char* value_buf = new char[len];
+//   inFile.read(value_buf, len);
+//   if (!inFile.good()) {
+//     delete[] value_buf;
+//     inFile.close();
+//     return Status::Corruption("Failed to read value from file!");
+//   }
+
+//   // Close the file after reading
+//   inFile.close();
+
+//   // Assign the read data to the Slice
+//   *value = Slice(value_buf, len);
+
+//   return s;
+// }
+
+Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
+                            Slice* value) {
+  Status s = Status::OK();
+  std::string file_name_ = ValueLogFileName(dbname_, file_id);
+
+  // Open the file in binary mode for reading
+  std::ifstream inFile(file_name_, std::ios::in | std::ios::binary);
+  if (!inFile.is_open()) {
+    std::cerr << "Failed to open file for reading!" << std::endl;
+    return Status::Corruption("Failed to open valuelog file!");
+  }
+
+  // Read the key length
+  inFile.seekg(offset);
+  uint64_t key_len = 0;
+  inFile.read(reinterpret_cast<char*>(&key_len), sizeof(uint64_t));
+  if (!inFile.good()) {
+    inFile.close();
+    return Status::Corruption("Failed to read key length from file!");
+  }
+
+  // Read the key itself
+  char* key_buf = new char[key_len];
+  inFile.read(key_buf, key_len);
+  if (!inFile.good()) {
+    delete[] key_buf;
+    inFile.close();
+    return Status::Corruption("Failed to read key from file!");
+  }
+
+  // Read the value length
+  uint64_t value_len = 0;
+  inFile.read(reinterpret_cast<char*>(&value_len), sizeof(uint64_t));
+  if (!inFile.good()) {
+    delete[] key_buf;
+    inFile.close();
+    return Status::Corruption("Failed to read value length from file!");
+  }
+
+  // Read the value itself
+  char* value_buf = new char[value_len];
+  inFile.read(value_buf, value_len);
+  if (!inFile.good()) {
+    delete[] key_buf;
+    delete[] value_buf;
+    inFile.close();
+    return Status::Corruption("Failed to read value from file!");
+  }
+
+  // Close the file and hand the buffers to the caller
+  inFile.close();
+  *key = Slice(key_buf, key_len);
+  *value = Slice(value_buf, value_len);
+  return s;
+}
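For reference, the record layout WriteValueLog() appends and ReadValueLog() expects is key_len | key | value_len | value, with both lengths written as raw host-order uint64_t. A free-standing reader for a single record (my sketch, with error handling collapsed to a bool):

```cpp
#include <cstdint>
#include <fstream>
#include <string>

// Read one valuelog record starting at `offset` in the file at `path`.
bool ReadValueLogRecord(const std::string& path, uint64_t offset,
                        std::string* key, std::string* value) {
  std::ifstream in(path, std::ios::in | std::ios::binary);
  if (!in.is_open()) return false;
  in.seekg(offset);

  uint64_t key_len = 0;
  in.read(reinterpret_cast<char*>(&key_len), sizeof(key_len));
  if (!in.good()) return false;
  key->resize(key_len);
  in.read(&(*key)[0], key_len);
  if (!in.good()) return false;

  uint64_t value_len = 0;
  in.read(reinterpret_cast<char*>(&value_len), sizeof(value_len));
  if (!in.good()) return false;
  value->resize(value_len);
  in.read(&(*value)[0], value_len);
  return in.good();
}
```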
 
+// Check whether a file name carries the valuelog suffix
+bool IsValueLogFile(const std::string& filename) {
+  const std::string suffix = ".valuelog";
+  return filename.size() > suffix.size() &&
+         filename.substr(filename.size() - suffix.size()) == suffix;
+}
+
+
+// Parse the location metadata stored in the sstable
+void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
+                      uint64_t& offset) {
+  // stored_value is expected to hold: varint64 valuelog_id | varint64 offset
+  Slice tmp(stored_value.data(),stored_value.size());
+  GetVarint64(&tmp,&valuelog_id);
+  GetVarint64(&tmp,&offset);
+}
+
+// Extract the numeric ID of a ValueLog file from its name
+uint64_t GetValueLogID(const std::string& valuelog_name) {
+  // Use std::filesystem::path to pick out the file name component
+  std::filesystem::path file_path(valuelog_name);
+  std::string filename = file_path.filename().string();
+
+  // Find the '.' in the file name so the numeric prefix can be extracted
+  auto pos = filename.find('.');
+  if (pos == std::string::npos) {
+    assert(0);
+  }
+
+  // Extract the numeric part
+  std::string id_str = filename.substr(0, pos);
+  // Check that the extracted part is a valid number
+  for (char c : id_str) {
+    if (!isdigit(c)) {
+      assert(0);
+    }
+  }
+
+  return std::stoull(id_str);  // convert to uint64_t
+}
+
+
+// Garbage collection implementation
+void DBImpl::GarbageCollect() {
+  gc_mutex_.AssertHeld();
+  // Scan the database directory for all valuelog files
+  Log(options_.info_log, "start gc ");
+  auto files_set = fs::directory_iterator(dbname_);
+  std::set<std::string> valuelog_set;
+  std::string cur_valuelog_name=ValueLogFileName(dbname_,valuelogfile_number_);
+  for (const auto& cur_log_file : files_set) {
+    if (fs::exists(cur_log_file) &&
+        fs::is_regular_file(fs::status(cur_log_file)) &&
+        IsValueLogFile(cur_log_file.path().filename().string())) {
+      // Skip the live valuelog that new writes are still appending to
+      if(cur_valuelog_name==cur_log_file.path().string())continue;
+      valuelog_set.emplace(cur_log_file.path().filename().string());
+    }
+  }
+  for (std::string valuelog_name:valuelog_set) {
+    // std::cout << valuelog_name << std::endl;
+    uint64_t cur_log_number = GetValueLogID(valuelog_name);
+    valuelog_name=ValueLogFileName(dbname_,cur_log_number);
+
+    uint64_t current_offset = 0;
+    uint64_t tmp_offset = 0;
+
+    int cnt=0;
+
+    // Open the file in binary mode for reading
+    std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary);
+    if (!cur_valuelog.is_open()) {
+      std::cerr << "Failed to open file: " << valuelog_name
+                << " for reading cur_valuelog!" << std::endl;
+      continue;
+    }
+
+    while (true) {
+      tmp_offset=current_offset;
+      ++cnt;
+      // std::cout << cnt <<" "<<current_offset<<std::endl;
+
+      // Read one record: key_len | key | value_len | value
+      uint64_t key_len = 0;
+      cur_valuelog.seekg(current_offset);
+      cur_valuelog.read(reinterpret_cast<char*>(&key_len), sizeof(uint64_t));
+      if (!cur_valuelog.good()) break;  // reached the end of the file
+
+      std::string key(key_len, '\0');
+      cur_valuelog.read(&key[0], key_len);
+      if (!cur_valuelog.good()) break;
+
+      uint64_t value_len = 0;
+      cur_valuelog.read(reinterpret_cast<char*>(&value_len), sizeof(uint64_t));
+      if (!cur_valuelog.good()) break;
+
+      std::string value(value_len, '\0');
+      cur_valuelog.read(&value[0], value_len);
+      if (!cur_valuelog.good()) break;
+
+      current_offset += sizeof(uint64_t) * 2 + key_len + value_len;
+
+      // Publish the key being examined so foreground Put()s on it block
+      spj_mutex_.Lock();
+      valuelog_finding_key = key;
+      spj_mutex_.Unlock();
+
+      // Drain the writer queue before probing the LSM
+      mutex_.Lock();
+      if(writers_.size()>0){
+        auto last_writer=writers_.back();
+        while(!last_writer->done){
+          last_writer->cv.Wait();
+        }
+      }
+      mutex_.Unlock();
+
+      auto option=leveldb::ReadOptions();
+      option.find_value_log_for_gc = true;
+
+      std::string stored_value;
+      Status status = Get(option, key, &stored_value);
+
+      if (status.IsNotFound()) {
+        // The key no longer exists; skip this record
+        continue;
+      }
+
+      if (!status.ok()) {
+        std::cerr << "Error accessing sstable: " << status.ToString()
+                  << std::endl;
+        continue;
+      }
+
+      // Check whether the valuelog_id and offset still match
+      uint64_t stored_valuelog_id, stored_offset;
+      ParseStoredValue(stored_value.substr(1), stored_valuelog_id,
+                       stored_offset);  // parsing helper above
+      if (stored_valuelog_id != GetValueLogID(valuelog_name) ||
+          stored_offset != tmp_offset) {
+        // Stale record; skip it
+        continue;
+      }
+
+      auto write_op=leveldb::WriteOptions();
+      write_op.valuelog_write=true;
+      status = Put(write_op, key, value);
+
+      spj_mutex_.Lock();
+      valuelog_finding_key="";
+      spj_mutex_.Unlock();
+      spj_mutex_cond_.SignalAll();
+
+      if (!status.ok()) {
+        std::cerr << "Error accessing sstable: " << status.ToString()
+                  << std::endl;
+        continue;
+      }
+    }
+
+    // Clean up the old file
+    cur_valuelog.close();
+
+    std::remove(valuelog_name.c_str());  // delete the old ValueLog file
+    Log(options_.info_log, "remove file during gc %s", valuelog_name.c_str());
+  }
+}
 
 // Default implementations of convenience methods that subclasses of DB
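The heart of GarbageCollect() is its liveness test: a valuelog record is live only if a Get() with find_value_log_for_gc set still returns a 0x01 pointer naming exactly this file and offset; everything else in the old log is garbage. As a standalone predicate (sketch, assuming the pointer layout shown earlier):

```cpp
#include <cstdint>
#include <string>

#include "leveldb/slice.h"
#include "util/coding.h"

// True iff the sstable still points at the record (log_id, rec_offset).
// `stored` is the raw tagged value returned by a find_value_log_for_gc Get().
bool IsRecordLive(const std::string& stored, uint64_t log_id,
                  uint64_t rec_offset) {
  if (stored.empty() || stored[0] != 0x01) return false;  // inline or deleted
  leveldb::Slice p(stored.data() + 1, stored.size() - 1);
  uint64_t stored_id = 0, stored_off = 0;
  if (!leveldb::GetVarint64(&p, &stored_id) ||
      !leveldb::GetVarint64(&p, &stored_off)) {
    return false;  // corrupt pointer: treat as dead
  }
  return stored_id == log_id && stored_off == rec_offset;
}
```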
diff --git a/db/db_impl.h b/db/db_impl.h
index ab21773..9dda4dd 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -5,19 +5,20 @@
 #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
 #define STORAGE_LEVELDB_DB_DB_IMPL_H_
 
+#include "db/dbformat.h"
+#include "db/log_writer.h"
+#include "db/snapshot.h"
 #include
 #include
-#include
-#include
-#include
 #include
+#include
+#include
 #include
+#include
 
-#include "db/dbformat.h"
-#include "db/log_writer.h"
-#include "db/snapshot.h"
 #include "leveldb/db.h"
 #include "leveldb/env.h"
+
 #include "port/port.h"
 #include "port/thread_annotations.h"
 
@@ -44,12 +45,12 @@ class DBImpl : public DB {
   // // Deserialize into a field array
   // FieldArray ParseValue(const std::string& value_str)override;
-  // Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields)override;
+  // Status Put_with_fields(const WriteOptions& options, const Slice& key,const
+  // FieldArray& fields)override;
 
   // Status Get_with_fields(const ReadOptions& options, const Slice& key,
   //                        FieldArray* fields)override;
 
-
   // Implementations of the DB interface
   Status Put(const WriteOptions&, const Slice& key,
              const Slice& value) override;
@@ -63,11 +64,19 @@ class DBImpl : public DB {
   bool GetProperty(const Slice& property, std::string* value) override;
   void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
   void CompactRange(const Slice* begin, const Slice* end) override;
-  std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value)override;
-  void writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> value);
-  void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);;
-  std::pair getNewValuelog();//use for compaction
-  Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override;
+  // std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
+  // WriteValueLog(std::vector<Slice> value)override;
+  std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(
+      std::vector<std::pair<Slice, Slice>> value) override;
+  void writeValueLogForCompaction(WritableFile* target_file,
+                                  std::vector<Slice> value);
+  void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  std::pair getNewValuelog();  // used for compaction
+  // Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice*
+  // value)override;
+  Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
+                      Slice* value) override;
 
   // Extra methods (for testing) that are not in the public DB interface
 
@@ -77,6 +86,9 @@ class DBImpl : public DB {
   // Force current memtable contents to be compacted.
   Status TEST_CompactMemTable();
 
+  void TEST_GarbageCollect() override;
+
+
   // Return an internal iterator over the current state of the database.
   // The keys of this iterator are internal keys (see format.h).
   // The returned iterator should be deleted when no longer needed.
@@ -158,9 +170,15 @@ class DBImpl : public DB {
   void RecordBackgroundError(const Status& s);
 
   void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  void MaybeScheduleGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
   static void BGWork(void* db);
+  static void BGWorkGC(void* db);
+
   void BackgroundCall();
   void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  void BackgroundGarbageCollect();
+  void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(gc_mutex_);
+
   void CleanupCompaction(CompactionState* compact)
       EXCLUSIVE_LOCKS_REQUIRED(mutex_);
   Status DoCompactionWork(CompactionState* compact)
@@ -192,19 +210,28 @@ class DBImpl : public DB {
 
   // State below is protected by mutex_
   port::Mutex mutex_;
-  //std::shared_mutex value_log_mutex;
+  port::Mutex gc_mutex_;
+  port::Mutex spj_mutex_;
+  port::CondVar spj_mutex_cond_ GUARDED_BY(spj_mutex_);
+
+
+  // std::shared_mutex value_log_mutex;
   std::atomic<bool> shutting_down_;
   port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
+  port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_);
+
+  Slice valuelog_finding_key GUARDED_BY(spj_mutex_) = "";
+
   MemTable* mem_;
   MemTable* imm_ GUARDED_BY(mutex_);  // Memtable being compacted
   std::atomic<bool> has_imm_;         // So bg thread can detect non-null imm_
   WritableFile* logfile_;
   WritableFile* valuelogfile_;
-  int valuelogfile_offset=0;
+  int valuelogfile_offset = 0;
   uint64_t logfile_number_;
   uint64_t valuelogfile_number_;
   log::Writer* log_;
-  std::map<uint64_t,uint64_t> oldvaluelog_ids;
+  std::map<uint64_t, uint64_t> oldvaluelog_ids;
   uint32_t seed_ GUARDED_BY(mutex_);  // For sampling.
 
   // Queue of writers.
@@ -220,6 +247,10 @@ class DBImpl : public DB {
   // Has a background compaction been scheduled or is running?
   bool background_compaction_scheduled_ GUARDED_BY(mutex_);
 
+  // Has a background gc been scheduled or is running?
+  bool background_garbage_collect_scheduled_ GUARDED_BY(mutex_);
+
+
   ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
 
   VersionSet* const versions_ GUARDED_BY(mutex_);
diff --git a/db/db_iter.cc b/db/db_iter.cc
index 28d227e..e27f1a4 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -69,6 +69,7 @@ class DBIter : public Iterator {
   Slice value() const override {
     assert(valid_);
     auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_;
+    Slice key;
     if(tmp_value.data()[0]==0x00){
       tmp_value.remove_prefix(1);
       return tmp_value;
@@ -79,9 +80,10 @@ class DBIter : public Iterator {
     if(!res)assert(0);
     res=GetVarint64(&tmp_value,&valuelog_offset);
     if(!res)assert(0);
-    res=GetVarint64(&tmp_value,&valuelog_len);
-    if(!res)assert(0);
-    db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
+    // res=GetVarint64(&tmp_value,&valuelog_len);
+    // if(!res)assert(0);
+    // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
+    db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
     return tmp_value;
   }
   Status status() const override {
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 77af25a..6030f47 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -145,12 +145,13 @@ class ValueLogInserter : public WriteBatch::Handler {
     }
     else{
       buf+=(char)(0x01);
-      std::vector<Slice> v;
-      v.push_back(value);
-      auto res=db_->WriteValueLog(v);
+      std::vector<std::pair<Slice,Slice>> kv;
+      kv.push_back({key,value});
+      auto res=db_->WriteValueLog(kv);
       PutVarint64(&buf,res[0].first);
-      PutVarint64(&buf,res[0].second.first);
-      PutVarint64(&buf,res[0].second.second);
+      // PutVarint64(&buf,res[0].second.first);
+      // PutVarint64(&buf,res[0].second.second);
+      PutVarint64(&buf,res[0].second);
     }
     new_value=Slice(buf);
     writeBatch_.Put(key,new_value);
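ValueLogInserter's else-branch is where key/value separation actually happens on the write path: the pair is appended to the current valuelog and the batch keeps only the tagged pointer. In isolation (sketch; WriteValueLog is the virtual added to leveldb::DB in the next file):

```cpp
#include <string>
#include <utility>
#include <vector>

#include "leveldb/db.h"
#include "util/coding.h"

// Turn (key, value) into the 0x01-tagged pointer that is stored in the
// sstable in place of the real value.
std::string EncodeAsPointer(leveldb::DB* db, const leveldb::Slice& key,
                            const leveldb::Slice& value) {
  std::string buf;
  buf += (char)(0x01);
  std::vector<std::pair<leveldb::Slice, leveldb::Slice>> kv;
  kv.push_back({key, value});
  auto res = db->WriteValueLog(kv);           // one {file_id, offset} per pair
  leveldb::PutVarint64(&buf, res[0].first);   // valuelog file id
  leveldb::PutVarint64(&buf, res[0].second);  // record offset in that file
  return buf;
}
```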
diff --git a/include/leveldb/db.h b/include/leveldb/db.h
index 566fbee..cefa79c 100644
--- a/include/leveldb/db.h
+++ b/include/leveldb/db.h
@@ -102,19 +102,29 @@ class LEVELDB_EXPORT DB {
   // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys);
 
-  virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
+  // virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
+  //   assert(0);
+  //   std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
+  //   return v;
+  // }
+  virtual std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(std::vector<std::pair<Slice, Slice>> value){
     assert(0);
-    std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
+    std::vector<std::pair<uint64_t, uint64_t>> v;
     return v;
   }
   virtual void addNewValueLog(){assert(0);}
-  virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
+  // virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){
+  //   assert(0);  // Not implemented
+  //   return Status::Corruption("not imp");
+  // }
+  virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){
     assert(0);  // Not implemented
     return Status::Corruption("not imp");
   }
+
   // Return a heap-allocated iterator over the contents of the database.
   // The result of NewIterator() is initially invalid (caller must
   // call one of the Seek methods on the iterator before using it).
@@ -173,6 +183,8 @@ class LEVELDB_EXPORT DB {
   // Therefore the following call will compact the entire database:
   //    db->CompactRange(nullptr, nullptr);
   virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
+  virtual void TEST_GarbageCollect(){};
+
 };
 
 // Destroy the contents of the specified database.
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index d755f46..7bd79db 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -157,6 +157,8 @@ struct LEVELDB_EXPORT ReadOptions {
   // Callers may wish to set this field to false for bulk scans.
   bool fill_cache = true;
 
+  bool find_value_log_for_gc = false;
+
   // If "snapshot" is non-null, read as of the supplied snapshot
   // (which must belong to the DB that is being read and which must
   // not have been released).  If "snapshot" is null, use an implicit
@@ -183,6 +185,7 @@ struct LEVELDB_EXPORT WriteOptions {
   // with sync==true has similar crash semantics to a "write()"
   // system call followed by "fsync()".
   bool sync = false;
+  bool valuelog_write=false;
 };
 
 }  // namespace leveldb
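The two new option flags work as a pair on the GC path: find_value_log_for_gc makes Get() return the raw tagged value without dereferencing it, and valuelog_write lets the GC's rewriting Put() bypass the key gate in DBImpl::Put(). Usage as in GarbageCollect() (sketch; db is an open leveldb::DB*):

```cpp
#include <string>

#include "leveldb/db.h"
#include "leveldb/options.h"

// Probe a key's stored pointer, then rewrite the value through the GC path.
void GcProbeAndRewrite(leveldb::DB* db, const leveldb::Slice& key,
                       const leveldb::Slice& value) {
  leveldb::ReadOptions ropts;
  ropts.find_value_log_for_gc = true;  // return the tagged value as-is
  std::string stored;
  leveldb::Status s = db->Get(ropts, key, &stored);
  if (!s.ok()) return;  // key deleted or unreadable: nothing to relocate

  leveldb::WriteOptions wopts;
  wopts.valuelog_write = true;  // do not block on the GC key gate
  db->Put(wopts, key, value);
}
```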
diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h
index d225f34..d78c078 100644
--- a/include/leveldb/slice.h
+++ b/include/leveldb/slice.h
@@ -69,7 +69,7 @@ class LEVELDB_EXPORT Slice {
   // Drop the first "n" bytes from this slice.
   void remove_prefix(size_t n) {
-    if(n>size()){
+    if (n > size()) {
       assert(0);
     }
     assert(n <= size());
diff --git a/test/test.cpp b/test/test.cpp
index 78240d9..077e947 100644
--- a/test/test.cpp
+++ b/test/test.cpp
@@ -86,79 +86,111 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys){
   return Status::OK();
 }
 
-TEST(Test, CheckGetFields) {
-  DB *db;
-  WriteOptions writeOptions;
-  ReadOptions readOptions;
-  if(OpenDB("testdb_for_XOY", &db).ok() == false) {
-    std::cerr << "open db failed" << std::endl;
-    abort();
-  }
-  std::string key1 = "k_1";
-
-  FieldArray fields1 = {
-    {"name", "Customer#000000001"},
-    {"address", "IVhzIApeRb"},
-    {"phone", "25-989-741-2988"}
-  };
-
-  auto value1=SerializeValue(fields1);
-
-  db->Put(WriteOptions(), key1, value1);
-
-  // Read and deserialize
-  std::string value_ret;
-  FieldArray res1;
-
-  db->Get(ReadOptions(), key1, &value_ret);
-  DeserializeValue(value_ret, &res1);
-  for(auto pr:res1){
-    std::cout<<pr.first<<" "<<pr.second<<std::endl;
-  }
-
-  db->Delete(WriteOptions(),key1);
-
-  std::cout<<"get serialized value done"<<std::endl;
-
-  std::vector<std::string> keys;
-  std::vector<std::string> target_keys;
-  for(int i=0;i<10000;i++){
-    std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random prefix yields non-monotonic keys
-    FieldArray fields={
-      {"name", key},
-      {"address", std::to_string(rand()%7)},
-      {"phone", std::to_string(rand()%114514)}
-    };
-    if(rand()%5==0){
-      fields[0].second="special_key";
-      target_keys.push_back(key);
-    }
-    keys.push_back(key);
-    db->Put(WriteOptions(),key,SerializeValue(fields));
-  }
-  std::sort(target_keys.begin(),target_keys.end());
-  std::vector<std::string> key_res;
-  Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
-  ASSERT_TRUE(CompareKey(key_res, target_keys));
-  std::cout<<"get key by field done"<<std::endl;
-  for(auto s:keys){
-    db->Delete(WriteOptions(),s);
-  }
-  delete db;
-}
-
-TEST(Test, LARGE_DATA_COMPACT_TEST) {
+// TEST(Test, CheckGetFields) {
+//   DB *db;
+//   WriteOptions writeOptions;
+//   ReadOptions readOptions;
+//   if(OpenDB("testdb_for_XOY", &db).ok() == false) {
+//     std::cerr << "open db failed" << std::endl;
+//     abort();
+//   }
+//   std::string key1 = "k_1";
+
+//   FieldArray fields1 = {
+//     {"name", "Customer#000000001"},
+//     {"address", "IVhzIApeRb"},
+//     {"phone", "25-989-741-2988"}
+//   };
+
+//   auto value1=SerializeValue(fields1);
+
+//   db->Put(WriteOptions(), key1, value1);
+
+//   // Read and deserialize
+//   std::string value_ret;
+//   FieldArray res1;
+
+//   db->Get(ReadOptions(), key1, &value_ret);
+//   DeserializeValue(value_ret, &res1);
+//   for(auto pr:res1){
+//     std::cout<<pr.first<<" "<<pr.second<<std::endl;
+//   }
+
+//   db->Delete(WriteOptions(),key1);
+
+//   std::cout<<"get serialized value done"<<std::endl;
+
+//   std::vector<std::string> keys;
+//   std::vector<std::string> target_keys;
+//   for(int i=0;i<10000;i++){
+//     std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random prefix yields non-monotonic keys
+//     FieldArray fields={
+//       {"name", key},
+//       {"address", std::to_string(rand()%7)},
+//       {"phone", std::to_string(rand()%114514)}
+//     };
+//     if(rand()%5==0){
+//       fields[0].second="special_key";
+//       target_keys.push_back(key);
+//     }
+//     keys.push_back(key);
+//     db->Put(WriteOptions(),key,SerializeValue(fields));
+//   }
+//   std::sort(target_keys.begin(),target_keys.end());
+//   std::vector<std::string> key_res;
+//   Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res);
+//   ASSERT_TRUE(CompareKey(key_res, target_keys));
+//   std::cout<<"get key by field done"<<std::endl;
+//   for(auto s:keys){
+//     db->Delete(WriteOptions(),s);
+//   }
+//   delete db;
+// }
+
+// TEST(Test, LARGE_DATA_COMPACT_TEST) {
+//   DB *db;
+//   WriteOptions writeOptions;
+//   ReadOptions readOptions;
+//   if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
+//     std::cerr << "open db failed" << std::endl;
+//     abort();
+//   }
+//   std::vector<std::string> values;
+//   for(int i=0;i<500000;i++){
+//     std::string key=std::to_string(i);
+//     std::string value;
+//     for(int j=0;j<1000;j++){
+//       value+=std::to_string(i);
+//     }
+//     values.push_back(value);
+//     db->Put(writeOptions,key,value);
+//   }
+//   for(int i=0;i<500000;i++){
+//     std::string key=std::to_string(i);
+//     std::string value;
+//     Status s=db->Get(readOptions,key,&value);
+//     assert(s.ok());
+//     if(values[i]!=value){
+//       std::cout<<"wrong value"<<std::endl;
+//     }
+//   }
+//   delete db;
+// }
+
+TEST(Test, LARGE_DATA_GC_TEST) {
   DB *db;
   WriteOptions writeOptions;
   ReadOptions readOptions;
   if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
     std::cerr << "open db failed" << std::endl;
     abort();
   }
   std::vector<std::string> values;
   for(int i=0;i<500000;i++){
     std::string key=std::to_string(i);
     std::string value;
     for(int j=0;j<1000;j++){
       value+=std::to_string(i);
     }
     values.push_back(value);
     db->Put(writeOptions,key,value);
   }
+  std::cout<<"start gc"<<std::endl;
+  db->TEST_GarbageCollect();
+  std::cout<<"finish gc"<<std::endl;
   for(int i=0;i<500000;i++){
     std::string key=std::to_string(i);
     std::string value;
     Status s=db->Get(readOptions,key,&value);
@@ -191,7 +228,6 @@
 }
 
-
 int main(int argc, char** argv) {
   // All tests currently run with the same read-only file limits.
   testing::InitGoogleTest(&argc, argv);