diff --git a/CMakeLists.txt b/CMakeLists.txt index 5647580..d33c5df 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -121,7 +121,9 @@ add_library(leveldb db/vlog_reader.h db/vlog_reader.cc db/vlog_writer.h - db/vlog_writer.cc) + db/vlog_writer.cc + db/kv_separate_management.cc + db/kv_separate_management.h) target_sources(leveldb PRIVATE "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" @@ -548,11 +550,15 @@ target_link_libraries(value_field_test PRIVATE leveldb gtest) add_executable(kv_test "${PROJECT_SOURCE_DIR}/test/kv_test.cc" test/kv_test.cc + db/kv_separate_management.cc + db/kv_separate_management.h ) target_link_libraries(kv_test PRIVATE leveldb gtest) add_executable(bench_test "${PROJECT_SOURCE_DIR}/test/bench_test.cc" test/bench_test.cc + db/kv_separate_management.cc + db/kv_separate_management.h ) target_link_libraries(bench_test PRIVATE leveldb gtest) \ No newline at end of file diff --git a/db/db_impl.cc b/db/db_impl.cc index a27619c..47e212e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include "fields.h" #include "db/builder.h" @@ -141,6 +142,9 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) db_lock_(nullptr), shutting_down_(false), background_work_finished_signal_(&mutex_), + + garbage_collection_work_signal_(&mutex_), + mem_(nullptr), imm_(nullptr), has_imm_(false), @@ -148,12 +152,15 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) logfile_number_(0), seed_(0), tmp_batch_(new WriteBatch), + background_compaction_scheduled_(false), + background_GarbageCollection_scheduled_(false), + finish_back_garbage_collection_(false), manual_compaction_(nullptr), vlog_(nullptr), vlog_kv_numbers_(0), - + garbage_collection_management_(new SeparateManagement(raw_options.garbage_collection_threshold) ), versions_(new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_)) {} @@ -164,6 +171,9 @@ DBImpl::~DBImpl() { while (background_compaction_scheduled_) { background_work_finished_signal_.Wait(); } + while(background_GarbageCollection_scheduled_){ + garbage_collection_work_signal_.Wait(); + } mutex_.Unlock(); if (db_lock_ != nullptr) { @@ -745,6 +755,9 @@ void DBImpl::RecordBackgroundError(const Status& s) { if (bg_error_.ok()) { bg_error_ = s; background_work_finished_signal_.SignalAll(); + // TODO begin + // garbage_collection_work_signal_.SignalAll(); + // TODO end } } @@ -765,6 +778,204 @@ void DBImpl::MaybeScheduleCompaction() { } } +// TODO begin 调度垃圾回收的相关的函数 不需要锁,仅仅是追加的方式。 +// 获得db库中所有的log文件,将其放入到vector中 +Status DBImpl::GetAllValueLog(std::string dir,std::vector& logs){ + logs.clear(); + std::vector filenames; + // 获取文件列表 + Status s = env_->GetChildren(dir, &filenames); + if (!s.ok()) { + return s; + } + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + //存储当前已有的日志文件 + if (type == kLogFile) + logs.push_back(number); + } + } + return s; +} + +// 手动进行离线回收、 +// 1. 如果管理类 separate_management中有的话,那么就按照这个类中的map的信息进行回收,主要用于删除快照后的一个回收 +// 2. 对db目录下的文件所有的文件进行回收,主要针对于open的时候。主线程中使用。 +// 返回的 status 如果是不ok的说明回收的时候出现一个log文件是有问题的。 +Status DBImpl::OutLineGarbageCollection(){ + MutexLock l(&mutex_); + Status s; + // map 中保存了文件的信息,那么就采用map来指导回收,否则对db下所有的log文件进行回收 + if (!garbage_collection_management_->EmptyMap()) { + garbage_collection_management_->CollectionMap(); + uint64_t last_sequence = versions_->LastSequence(); + garbage_collection_management_->ConvertQueue(last_sequence); + versions_->SetLastSequence(last_sequence); + MaybeScheduleGarbageCollection(); + return Status(); + } + return s; +} + +// 读取回收一个log文件,不加锁 +// next_sequence : 只有在open的时候才会返回需要修改的值,在线gc是不需要的。 +// next_sequence 指的是第一个没有用到的sequence +Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) { + + struct LogReporter : public log::VlogReader::Reporter { + Status* status; + void Corruption(size_t bytes, const Status& s) override { + if (this->status->ok()) *this->status = s; + } + }; + LogReporter report; + std::string logName = LogFileName(dbname_, fid); + SequentialFile* lfile; + Status status = env_->NewSequentialFile(logName, &lfile); + if (!status.ok()) { + Log(options_.info_log, "Garbage Collection Open file error: %s", status.ToString().c_str()); + return status; + } + log::VlogReader reader(lfile, &report); + + Slice record; + std::string scratch; + // record_offset 每条record 相对文本开头的偏移。 + uint64_t record_offset = 0; + uint64_t size_offset = 0; + WriteOptions opt(options_.background_garbage_collection_separate_); + WriteBatch batch(opt.separate_threshold); + batch.setGarbageColletion(true); + WriteBatchInternal::SetSequence(&batch, next_sequence); + while( reader.ReadRecord(&record,&scratch) ){ + const char* head_record_ptr = record.data(); + record.remove_prefix(log::vHeaderSize + log::wHeaderSize); + + while( record.size() > 0 ){ + const char* head_kv_ptr = record.data(); + // kv对在文本中的偏移 + uint64_t kv_offset = record_offset + head_kv_ptr - head_record_ptr; + ValueType type = static_cast(record[0]); + record.remove_prefix(1); + Slice key; + Slice value; + std::string get_value; + + GetLengthPrefixedSlice(&record,&key); + if( type != kTypeDeletion ){ + GetLengthPrefixedSlice(&record,&value); + } + // 需要抛弃的值主要有以下三种情况:0,1,2 + // 0. log 中不是 kv 分离的都抛弃 + if(type != kTypeSeparation){ + continue; + } + + status = this->GetLsm(key,&get_value); + // 1. 从LSM tree 中找不到值,说明这个值被删除了,log中要丢弃 + // 2. 找到了值,但是最新值不是kv分离的情况,所以也可以抛弃 + if (status.IsNotFound() || !status.IsSeparated() ) { + continue; + } + // 读取错误,整个文件都不继续进行回收了 + if( !status.ok() ){ + + std::cout<<"read the file error "< config::gcWriteBatchSize ){ + Write(opt, &batch); + batch.Clear(); + batch.setGarbageColletion(true); + WriteBatchInternal::SetSequence(&batch, next_sequence); + uint64_t kv_size; + GetVarint64(&get_slice,&kv_size); + size_offset = kv_offset + kv_size; + } + } + + } + + record_offset += record.data() - head_record_ptr; + } + Write(opt, &batch); + status = env_->RemoveFile(logName); + if( status.ok() ){ + garbage_collection_management_->RemoveFileFromMap(fid); + } + return status; +} + +// 回收任务 +void DBImpl::BackGroundGarbageCollection(){ + uint64_t fid; + uint64_t last_sequence; + while( true){ + Log(options_.info_log, "garbage collection file number: %lu", fid); + if( !garbage_collection_management_->GetGarbageCollectionQueue(fid,last_sequence) ){ + return; + } + // 在线的gc回收的sequence是要提前就分配好的。 + CollectionValueLog(fid,last_sequence); + } +} + +// 可能调度后台线程进行压缩 +void DBImpl::MaybeScheduleGarbageCollection() { + mutex_.AssertHeld(); + if (background_GarbageCollection_scheduled_) { + // Already scheduled + // 先检查线程是否已经被调度了,如果已经被调度了,就直接退出。 + } else if (shutting_down_.load(std::memory_order_acquire)) { + // DB is being deleted; no more background compactions + // 如果DB已经被关闭,那么就不调度了。 + } else if (!bg_error_.ok()) { + // Already got an error; no more changes + // 如果后台线程出错,也不调度。 + } else { + //设置调度变量,通过detach线程调度;detach线程即使主线程退出,依然可以正常执行完成 + background_GarbageCollection_scheduled_ = true; + env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this); + } +} +// 后台gc线程中执行的任务 +void DBImpl::GarbageCollectionBGWork(void* db) { + reinterpret_cast(db)->GarbageCollectionBackgroundCall(); +} + +void DBImpl::GarbageCollectionBackgroundCall() { + assert(background_GarbageCollection_scheduled_); + if (shutting_down_.load(std::memory_order_acquire)) { + // No more background work when shutting down. + // // 如果DB已经被关闭,那么就不调度了。 + } else if (!bg_error_.ok()) { + // No more background work after a background error. + // 如果后台线程出错,也不调度。 + } else { + // 开始后台GC回收线程 + BackGroundGarbageCollection(); + } + + background_GarbageCollection_scheduled_ = false; + //再调用 MaybeScheduleGarbageCollection 检查是否需要再次调度 + // MaybeScheduleGarbageCollection(); + garbage_collection_work_signal_.SignalAll(); +} + +// TODO end + void DBImpl::BGWork(void* db) { reinterpret_cast(db)->BackgroundCall(); } @@ -843,9 +1054,9 @@ void DBImpl::BackgroundCompaction() { // TODO begin conmpact 后需要考虑是否将 value log 文件进行 gc回收,如果需要将其加入到回收任务队列中。 // 不进行后台的gc回收,那么也不更新待分配sequence的log了。 if(!finish_back_garbage_collection_){ - garbage_colletion_management_->UpdateQueue(versions_->ImmLogFileNumber() ); + garbage_collection_management_->UpdateQueue(versions_->ImmLogFileNumber() ); } - // TODO end + CleanupCompaction(compact); c->ReleaseInputs(); RemoveObsoleteFiles(); @@ -1103,7 +1314,22 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } - } + } else {// TODO begin + //fid ,key valuesize , + Slice drop_value = input->value(); + // 获得type类型 + if( ikey.type == kTypeSeparation ){ + uint64_t fid = 0; + uint64_t offset = 0; + uint64_t size = 0; + GetVarint64(&drop_value,&fid); + GetVarint64(&drop_value,&offset); + GetVarint64(&drop_value,&size); + mutex_.Lock(); + garbage_collection_management_->UpdateMap(fid,size); + mutex_.Unlock(); + } + }// TODO end input->Next(); } @@ -1226,6 +1452,48 @@ bool DBImpl::ParseVlogValue(Slice key_value, Slice key, } } +Status DBImpl::GetLsm(const Slice& key, std::string* value) { + MutexLock l(&mutex_); + ReadOptions options; + MemTable* mem = mem_; + MemTable* imm = imm_; + Version* current = versions_->current(); + if( !this->snapshots_.empty() ){ + options.snapshot = this->snapshots_.oldest(); + } + SequenceNumber snapshot; + if (options.snapshot != nullptr) { + snapshot = static_cast(options.snapshot)->sequence_number(); + } else { + snapshot = versions_->LastSequence(); + } + Status s; + mem->Ref(); + // imm 不一定存在,但是 mem 是一定存在的。 + if (imm != nullptr) imm->Ref(); + current->Ref(); // Version 读引用计数增一 + Version::GetStats stats; + // Unlock while reading from files and memtables + { + mutex_.Unlock(); + // First look in the memtable, then in the immutable memtable (if any). + LookupKey lkey(key, snapshot); + if (mem->Get(lkey, value, &s )) { + // Done + } else if (imm != nullptr && imm->Get(lkey, value, &s)) { + // Done + } else { + //在Version中查找是否包含指定key值 + s = current->Get(options, lkey, value, &stats); + } + mutex_.Lock(); + } + mem->Unref(); + if (imm != nullptr) imm->Unref(); + current->Unref(); //Version 读引用计数减一 + return s; +} + Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; @@ -1341,12 +1609,20 @@ void DBImpl::RecordReadSample(Slice key) { const Snapshot* DBImpl::GetSnapshot() { MutexLock l(&mutex_); + // TODO begin 建立快照 对快照之后的信息不进行回收了。 + finish_back_garbage_collection_ = true; + // TODO end return snapshots_.New(versions_->LastSequence()); } void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { MutexLock l(&mutex_); snapshots_.Delete(static_cast(snapshot)); + // TODO begin 没有快照了重新进行后台回收 + if( snapshots_.empty() ){ + finish_back_garbage_collection_ = false; + } + // TODO end } /*** DBImpl 类关于 Fields 类的 Put、Get 接口 ***/ @@ -1396,6 +1672,30 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); + + // TODO begin gc中的batch全部都是设置好的。此时是不需要设置的。 + if( !write_batch->IsGarbageColletion() ){ + // 判断是否需要进行垃圾回收,如需要,腾出一块sequence的区域,触发垃圾回收将在makeroomforwrite当中。 + // 先进行判断是否要进行gc后台回收,如果建立了快照的话finish_back_garbage_collection_就是true, + // 此时不进行sequence分配了。 + // + if( !finish_back_garbage_collection_ + && garbage_collection_management_->ConvertQueue(last_sequence) ){ + // 尝试调度gc回收线程进行回收。 + MaybeScheduleGarbageCollection(); + } + //SetSequence在write_batch中写入本次的sequence + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); + // Count返回write_batch中的key-value个数 + last_sequence += WriteBatchInternal::Count(write_batch); + } + vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch); + // TODO 这里设置last_sequence 是为了照顾离线回收的时候,在map存在的时候需要调用 ConvertQueue 给回收任务分配sequence。 + // TODO 针对多线程调用put的时候,为了避免给gc回收的时候分配的sequence重叠。 + versions_->SetLastSequence(last_sequence); + // TODO end + + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); @@ -1479,11 +1779,15 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { ++iter; // Advance past "first" for (; iter != writers_.end(); ++iter) { Writer* w = *iter; - if (w->sync && !first->sync) { + // TODO begin 写队列中如果碰到是gc的write_batch 停止合并。 + if (w->sync && !first->sync + || first->batch->IsGarbageColletion() + || w->batch->IsGarbageColletion()) { + // 当前的Writer要求 Sync ,而第一个Writer不要求Sync,两个的磁盘写入策略不一致。不做合并操作 // Do not include a sync write into a batch handled by a non-sync write. break; } - + // TODO end if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { @@ -1520,7 +1824,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { if (!s.ok()) { versions_->ReuseFileNumber(new_log_number); } -// gc_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize()); + garbage_collection_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize()); vlog_kv_numbers_ = 0; delete vlog_; delete logfile_; @@ -1684,7 +1988,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { // TODO begin std::vector logs; - s = impl->GetAllValueLog(dbname,logs); + s = impl->GetAllValueLog(dbname, logs); sort(logs.begin(),logs.end()); // TODO end @@ -1728,7 +2032,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { uint64_t next_sequence = impl->versions_->LastSequence() + 1; std::cout<<" collection file : "<mutex_.Unlock(); - Status stmp = impl->CollectionValueLog( fid,next_sequence ); + Status stmp = impl->CollectionValueLog(fid, next_sequence); impl->mutex_.Lock(); if( !stmp.ok() ) s = stmp; impl->versions_->SetLastSequence(next_sequence - 1); diff --git a/db/db_impl.h b/db/db_impl.h index d729107..5d0b274 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -13,6 +13,8 @@ #include "db/dbformat.h" #include "db/log_writer.h" #include "db/vlog_writer.h" +#include "db/kv_separate_management.h" + #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -78,6 +80,11 @@ class DBImpl : public DB { // bytes. void RecordReadSample(Slice key); + // TODO begin + Status OutLineGarbageCollection(); + Status GetAllValueLog(std::string dir, std::vector& logs); + // TODO end + private: friend class DB; struct CompactionState; @@ -145,6 +152,15 @@ class DBImpl : public DB { void RecordBackgroundError(const Status& s); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + + // TODO begin + static void GarbageCollectionBGWork(void* db); + void GarbageCollectionBackgroundCall(); + void MaybeScheduleGarbageCollection() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackGroundGarbageCollection(); + Status CollectionValueLog(uint64_t fid, uint64_t& last_sequence); + // TODO end + static void BGWork(void* db); void BackgroundCall(); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); @@ -162,6 +178,10 @@ class DBImpl : public DB { return internal_comparator_.user_comparator(); } + // TODO begin + Status GetLsm( const Slice& key,std::string* value); + // TODO end + // Constant after construction Env* const env_; const InternalKeyComparator internal_comparator_; @@ -215,7 +235,16 @@ class DBImpl : public DB { int vlog_kv_numbers_; -// KVSepManagement* gc_management_; + // TODO begin 用于gc回收的过程 + + port::CondVar garbage_collection_work_signal_ GUARDED_BY(mutex_); + // 表示后台gc线程是否已经被调度或者在运行 + bool background_GarbageCollection_scheduled_ GUARDED_BY(mutex_); + + bool finish_back_garbage_collection_; + SeparateManagement* garbage_collection_management_; + + // TODO end }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/dbformat.h b/db/dbformat.h index e74af21..0b5e23f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -44,6 +44,9 @@ static const int kMaxMemCompactLevel = 2; // Approximate gap in bytes between samples of data read during iteration. static const int kReadBytesPeriod = 1048576; +// gc后台回收的时候进行 batch合并之后再写入的大小。 +static const uint64_t gcWriteBatchSize = 4*1024*1024; + } // namespace config class InternalKey; diff --git a/db/kv_separate_management.cc b/db/kv_separate_management.cc new file mode 100644 index 0000000..4bf6966 --- /dev/null +++ b/db/kv_separate_management.cc @@ -0,0 +1,107 @@ +#include "kv_separate_management.h" +#include +#include +#include + +namespace leveldb { + +// 改变 db 的 last_sequence,给每一个需要进行gc回收的value log 文件分配 新的sequence的序号, +// 以便对value log 中的有效key的重新put进新的value log 中。返回值决定是否进行gc +bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) { + if (!need_updates_.empty()) { + db_sequence++; + } else { + return false; + } + while (!need_updates_.empty()) { + ValueLogInfo* info = need_updates_.front(); + need_updates_.pop_front(); + map_file_info_.erase(info->logfile_number_); + info->last_sequence_ = db_sequence; + db_sequence += info->left_kv_numbers_; + garbage_collection_.push_back(info); + } + assert(db_sequence <= kMaxSequenceNumber); + return true; +} + +// 每一个vlog 罗盘的时候都会在map_file_info_ 中添加索引 ,这个在新建一个value log的时候会用到。 +void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory) { + assert(map_file_info_.find(fid) == map_file_info_.end()); + ValueLogInfo* info = new ValueLogInfo(); + info->logfile_number_ = fid; + info->left_kv_numbers_ = kv_numbers; + assert(kv_numbers <= kMaxSequenceNumber); + info->invalid_memory_ = 0; + info->last_sequence_ = -1; + info->file_size_ = log_memory; + map_file_info_.insert(std::make_pair(fid,info)); +} + +// map_file_info_ 存放了所有的value log 的信息,每次删除一个key的时候要对这个key对应的value log计算空间无效利用率 +// 所以要统计有多少空间是无效的,以便后面进行触发gc的过程。 +void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) { + if (map_file_info_.find(fid) != map_file_info_.end()) { + ValueLogInfo* info = map_file_info_[fid]; + info->left_kv_numbers_--; + info->invalid_memory_ += abandon_memory; + } +} + +// 遍历 map_file_info_ 中所有的file 找到无效空间最大的log 进行gc回收 ,这个文件要不存在 delete_files_中 +void SeparateManagement::UpdateQueue(uint64_t fid) { + std::priority_queue, MapCmp> sort_priority_; + + for (auto iter = map_file_info_.begin(); iter != map_file_info_.end(); ++iter) { + if (delete_files_.find( iter->first) == delete_files_.end()) { + sort_priority_.push(iter->second); + } + } + int num = 1; + int threshold = garbage_collection_threshold_; + if (!sort_priority_.empty() + && sort_priority_.top()->invalid_memory_ >= garbage_collection_threshold_ * 1.2) { + num = 3; + threshold = garbage_collection_threshold_ * 1.2; + } + while (!sort_priority_.empty() && num > 0) { + ValueLogInfo* info = sort_priority_.top(); + sort_priority_.pop(); + if (info->logfile_number_ > fid) { + continue; + } + num--; + if (info->invalid_memory_ >= threshold) { + need_updates_.push_back(info); + delete_files_.insert(info->logfile_number_); + } + } +} + +// gc回收线程用来获得需要回收的文件 +bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence){ + if (garbage_collection_.empty()) { + return false; + } else { + ValueLogInfo* info = garbage_collection_.front(); + garbage_collection_.pop_front(); + fid = info->logfile_number_; + last_sequence = info->last_sequence_; + return true; + } +} + +void SeparateManagement::CollectionMap(){ + if (map_file_info_.empty()) return; + + for (auto iter : map_file_info_) { + uint64_t fid = iter.first; + ValueLogInfo* info = iter.second; + if (delete_files_.find(fid) == delete_files_.end()) { + need_updates_.push_back(info); + delete_files_.insert(info->logfile_number_); + } + } +} + +} \ No newline at end of file diff --git a/db/kv_separate_management.h b/db/kv_separate_management.h new file mode 100644 index 0000000..b36af1c --- /dev/null +++ b/db/kv_separate_management.h @@ -0,0 +1,68 @@ +#ifndef LEVELDB_KV_SEPARATE_MANAGEMENT_H +#define LEVELDB_KV_SEPARATE_MANAGEMENT_H + +#include +#include + +#include +#include "leveldb/slice.h" +#include + + +namespace leveldb { + +typedef struct ValueLogInfo { + uint64_t last_sequence_; + size_t file_size_; // 文件大小 + uint64_t logfile_number_; // 文件编号 + int left_kv_numbers_; // 剩下的 kv 数量 + uint64_t invalid_memory_; // value log 中无效的空间大小 +}ValueLogInfo; + +struct MapCmp{ + bool operator ()(const ValueLogInfo* a, const ValueLogInfo* b) + { + return a->invalid_memory_ < b->invalid_memory_; // 按照 value 从大到小排列 + } +}; + +class SeparateManagement { + public: + SeparateManagement(uint64_t garbage_collection_threshold) + : garbage_collection_threshold_(garbage_collection_threshold) {} + + ~SeparateManagement() {} + + bool ConvertQueue(uint64_t& db_sequence); // 改变 db 的 last_sequence + + void UpdateMap(uint64_t fid,uint64_t abandon_memory); + + void UpdateQueue(uint64_t fid); + + bool GetGarbageCollectionQueue(uint64_t& fid,uint64_t& last_sequence); + + void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory); + + bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); } + + void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); } + + bool EmptyMap() { return map_file_info_.empty(); } + + void CollectionMap(); + + private: + uint64_t garbage_collection_threshold_; + // 当前版本的所有的vlog文件的索引。 + std::unordered_map map_file_info_; + // 垃圾回收队列,这个队列中表示的文件info 将来是要进行gc回收的 + std::deque garbage_collection_; + // 需要触发gc回收的,但是还没有进行分配sequencen的info + std::deque need_updates_; + + std::unordered_set delete_files_; +}; + +} // namespace leveldb + +#endif //LEVELDB_KV_SEPARATE_MANAGEMENT_H \ No newline at end of file diff --git a/include/leveldb/env.h b/include/leveldb/env.h index 2f0dae3..ff3ae58 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -197,6 +197,7 @@ class LEVELDB_EXPORT Env { // serialized. virtual void Schedule(void (*function)(void* arg), void* arg) = 0; + virtual void ScheduleForGarbageCollection(void (*function)(void* arg), void* arg) = 0; // Start a new thread, invoking "function(arg)" within the new thread. // When "function(arg)" returns, the thread will be destroyed. virtual void StartThread(void (*function)(void* arg), void* arg) = 0; @@ -385,6 +386,11 @@ class LEVELDB_EXPORT EnvWrapper : public Env { void Schedule(void (*f)(void*), void* a) override { return target_->Schedule(f, a); } + + void ScheduleForGarbageCollection(void (*f)(void*), void* a) override{ + return target_->ScheduleForGarbageCollection(f, a); + } + void StartThread(void (*f)(void*), void* a) override { return target_->StartThread(f, a); } diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index c09ceb0..687444e 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -76,10 +76,16 @@ class LEVELDB_EXPORT WriteBatch { Status Iterate(Handler* handler) const; Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const; + bool IsGarbageColletion() { return belong_to_gc; } + + void setGarbageColletion(bool is_gc) { belong_to_gc = is_gc; } + private: friend class WriteBatchInternal; size_t separate_threshold_; std::string rep_; // See comment in write_batch.cc for the format of rep_ + + bool belong_to_gc; }; } // namespace leveldb diff --git a/util/env_posix.cc b/util/env_posix.cc index eb26480..a60616a 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -698,6 +698,9 @@ class PosixEnv : public Env { void Schedule(void (*background_work_function)(void* background_work_arg), void* background_work_arg) override; + void ScheduleForGarbageCollection(void (*background_work_function)(void* background_work_arg), + void* background_work_arg) override; + void StartThread(void (*thread_main)(void* thread_main_arg), void* thread_main_arg) override { std::thread new_thread(thread_main, thread_main_arg); @@ -753,11 +756,16 @@ class PosixEnv : public Env { private: void BackgroundThreadMain(); + void BackgroundThreadMainGarbageCollection(); static void BackgroundThreadEntryPoint(PosixEnv* env) { env->BackgroundThreadMain(); } + static void BackgroundThreadEntryPointforGlobalCollection(PosixEnv* env) { + env->BackgroundThreadMainGarbageCollection(); + } + // Stores the work item data in a Schedule() call. // // Instances are constructed on the thread calling Schedule() and used on the @@ -779,6 +787,14 @@ class PosixEnv : public Env { std::queue background_work_queue_ GUARDED_BY(background_work_mutex_); + // TODO begin gc 回收相关的变量 + port::Mutex background_GlobalCollection_work_mutex_; + port::CondVar background_GlobalCollection_work_cv_ GUARDED_BY(background_GlobalCollection_work_mutex_); + + std::queue background_GlobalCollection_work_queue_ + GUARDED_BY(background_GlobalCollection_work_mutex_); + // TODO end + PosixLockTable locks_; // Thread-safe. Limiter mmap_limiter_; // Thread-safe. Limiter fd_limiter_; // Thread-safe. @@ -814,6 +830,7 @@ int MaxOpenFiles() { PosixEnv::PosixEnv() : background_work_cv_(&background_work_mutex_), + background_GlobalCollection_work_cv_(&background_GlobalCollection_work_mutex_), started_background_thread_(false), mmap_limiter_(MaxMmaps()), fd_limiter_(MaxOpenFiles()) {} @@ -858,6 +875,41 @@ void PosixEnv::BackgroundThreadMain() { } } +void PosixEnv::ScheduleForGarbageCollection( + void (*background_work_function)(void* background_work_arg), + void* background_work_arg) { + background_GlobalCollection_work_mutex_.Lock(); + + + // If the queue is empty, the background thread may be waiting for work. + if (background_GlobalCollection_work_queue_.empty()) { + background_GlobalCollection_work_cv_.Signal(); + } + // 因为是锁住了 所以可以先 signal 再 emplace。 + background_GlobalCollection_work_queue_.emplace(background_work_function, background_work_arg); + background_GlobalCollection_work_mutex_.Unlock(); +} + +// gc 的后台回收任务 +void PosixEnv::BackgroundThreadMainGarbageCollection() { + while (true) { + background_GlobalCollection_work_mutex_.Lock(); + + // Wait until there is work to be done. + while (background_GlobalCollection_work_queue_.empty()) { + background_GlobalCollection_work_cv_.Wait(); + } + + assert(!background_GlobalCollection_work_queue_.empty()); + auto background_work_function = background_GlobalCollection_work_queue_.front().function; + void* background_work_arg = background_GlobalCollection_work_queue_.front().arg; + background_GlobalCollection_work_queue_.pop(); + + background_GlobalCollection_work_mutex_.Unlock(); + background_work_function(background_work_arg); + } +} + namespace { // Wraps an Env instance whose destructor is never created.