diff --git a/db/db_impl.cc b/db/db_impl.cc index 46f9221..93623d6 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -378,14 +378,14 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { // expected.erase(number); // if (type == kLogFile && ((number >= min_log) || (number == prev_log))) // logs.push_back(number); - //TODO begin - if(number > max_number) max_number = number; - //删除存在的文件 + // begin 注释: max_number 为要恢复的最新的文件编号 + if (number > max_number) max_number = number; + // expected 里的文件现在依然存在,可以删除,需要恢复的是 expected 里不存在的 vlog 文件 expected.erase(number); - //存储当前已有的日志文件 + // 保存当前已有的 vlog 文件 if (type == kLogFile) logs.push_back(number); - //TODO end + // end } } if (!expected.empty()) { @@ -491,9 +491,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, Status::Corruption("log record too small")); continue; } - // TODO begin 如果 imm_last_sequence == 0 的话,那么整个说明没有进行一次imm 转sst的情况, - // 所有的log文件都需要进行回收,因为也有可能是manifest出问题了。 - // 回收编号最大的log文件即可。note + // begin 如果 imm_last_sequence == 0 的话, + // 那么整个说明没有进行一次 imm 转 sst的情况,所有的log文件都需要进行回收 + // 回收编号最大的 log 文件即可 if( !found_sequence_pos && imm_last_sequence != 0 ){ Slice tmp = record; tmp.remove_prefix(8); @@ -501,22 +501,22 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, tmp.remove_prefix(8); uint64_t kv_numbers = DecodeFixed32(tmp.data()); - // 解析出来的seq不符合要求跳过。恢复时定位seq位置时候一定是要大于或者等于 versions_->LastSequence() + // 解析出来的 seq 不符合要求跳过。恢复时定位 seq 位置一定要大于等于 versions_->LastSequence() if( ( seq + kv_numbers - 1 ) < imm_last_sequence ) { record_offset += record.size(); continue; }else if( ( seq + kv_numbers - 1 ) == imm_last_sequence ){ - // open db 落盘过sst,再一次打开db就是这个情况。 + // open db 落盘过 sst,再一次打开 db found_sequence_pos = true; record_offset += record.size(); continue; - }else { // open db 之后没有落盘过sst,然后数据库就关闭了,第二次又恢复的时候就是这个情况 + } else { // open db 之后没有落盘过 sst,然后关闭 db,第二次恢复的时候 found_sequence_pos = true; } } // 去除头部信息 crc 和length record.remove_prefix(log::vHeaderSize); - // TODO end + // end WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { @@ -761,9 +761,8 @@ void DBImpl::RecordBackgroundError(const Status& s) { if (bg_error_.ok()) { bg_error_ = s; background_work_finished_signal_.SignalAll(); - // TODO begin - // garbage_collection_work_signal_.SignalAll(); - // TODO end + // 注释:唤醒后台 GC 线程 + garbage_collection_work_signal_.SignalAll(); } } @@ -784,12 +783,11 @@ void DBImpl::MaybeScheduleCompaction() { } } -// TODO begin 调度垃圾回收的相关的函数 不需要锁,仅仅是追加的方式。 -// 获得db库中所有的log文件,将其放入到vector中 -Status DBImpl::GetAllValueLog(std::string dir,std::vector& logs){ +// 注释:获取所有 VLogs +Status DBImpl::GetAllValueLog(std::string dir, std::vector& logs){ logs.clear(); std::vector filenames; - // 获取文件列表 + // 获取 VLogs 列表 Status s = env_->GetChildren(dir, &filenames); if (!s.ok()) { return s; @@ -798,7 +796,7 @@ Status DBImpl::GetAllValueLog(std::string dir,std::vector& logs){ FileType type; for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type)) { - //存储当前已有的日志文件 + // 获取所有 .log 文件 if (type == kLogFile) logs.push_back(number); } @@ -806,14 +804,11 @@ Status DBImpl::GetAllValueLog(std::string dir,std::vector& logs){ return s; } -// 手动进行离线回收、 -// 1. 如果管理类 separate_management中有的话,那么就按照这个类中的map的信息进行回收,主要用于删除快照后的一个回收 -// 2. 对db目录下的文件所有的文件进行回收,主要针对于open的时候。主线程中使用。 -// 返回的 status 如果是不ok的说明回收的时候出现一个log文件是有问题的。 +// 注释:手动进行离线回收 Status DBImpl::OutLineGarbageCollection(){ MutexLock l(&mutex_); Status s; - // map 中保存了文件的信息,那么就采用map来指导回收,否则对db下所有的log文件进行回收 + // map_file_info_ 非空,则根据它进行回收,否则进行所有 VLog 的回收 if (!garbage_collection_management_->EmptyMap()) { garbage_collection_management_->CollectionMap(); uint64_t last_sequence = versions_->LastSequence(); @@ -825,9 +820,8 @@ Status DBImpl::OutLineGarbageCollection(){ return s; } -// 读取回收一个log文件,不加锁 -// next_sequence : 只有在open的时候才会返回需要修改的值,在线gc是不需要的。 -// next_sequence 指的是第一个没有用到的sequence +// 注释:在线 GC,读取并回收一个 vlog 文件, +// next_sequence 指的是第一个没有用到的 sequence(由于是在线 GC ,所以需要提前指定) Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) { struct LogReporter : public log::VlogReader::Reporter { @@ -848,49 +842,45 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) { Slice record; std::string scratch; - // record_offset 每条record 相对文本开头的偏移。 + // record_offset 是每条 record 相对 VLog head 的偏移 uint64_t record_offset = 0; uint64_t size_offset = 0; WriteOptions opt(options_.background_garbage_collection_separate_); WriteBatch batch(opt.separate_threshold); batch.setGarbageColletion(true); WriteBatchInternal::SetSequence(&batch, next_sequence); - while( reader.ReadRecord(&record,&scratch) ){ + + while (reader.ReadRecord(&record, &scratch)) { const char* head_record_ptr = record.data(); record.remove_prefix(log::vHeaderSize + log::wHeaderSize); - while( record.size() > 0 ){ + while (record.size() > 0) { const char* head_kv_ptr = record.data(); - // kv对在文本中的偏移 uint64_t kv_offset = record_offset + head_kv_ptr - head_record_ptr; ValueType type = static_cast(record[0]); record.remove_prefix(1); Slice key; Slice value; std::string get_value; - GetLengthPrefixedSlice(&record,&key); - if( type != kTypeDeletion ){ + if (type != kTypeDeletion) { GetLengthPrefixedSlice(&record,&value); } - // 需要抛弃的值主要有以下三种情况:0,1,2 - // 0. log 中不是 kv 分离的都抛弃 - if(type != kTypeSeparation){ + if (type != kTypeSeparation) { continue; } status = this->GetLsm(key,&get_value); - // 1. 从LSM tree 中找不到值,说明这个值被删除了,log中要丢弃 - // 2. 找到了值,但是最新值不是kv分离的情况,所以也可以抛弃 - if (status.IsNotFound() || !status.IsSeparated() ) { + // 1. 从 LSM-tree 中找不到 key,说明这个 key 被删除了,vlog中要丢弃 + // 2. 找到了 key,但是最新的 kv 对不是 KV 分离的情况,也丢弃 + if (status.IsNotFound() || !status.IsSeparated()) { continue; } - // 读取错误,整个文件都不继续进行回收了 - if( !status.ok() ){ - + if (!status.ok()) { std::cout<<"read the file error "< config::gcWriteBatchSize ){ + if (kv_offset - size_offset > config::gcWriteBatchSize) { Write(opt, &batch); batch.Clear(); batch.setGarbageColletion(true); @@ -911,20 +902,19 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) { size_offset = kv_offset + kv_size; } } - } - record_offset += record.data() - head_record_ptr; } + Write(opt, &batch); status = env_->RemoveFile(logName); - if( status.ok() ){ + if (status.ok()) { garbage_collection_management_->RemoveFileFromMap(fid); } return status; } -// 回收任务 +// 注释:回收任务 void DBImpl::BackGroundGarbageCollection(){ uint64_t fid; uint64_t last_sequence; @@ -938,49 +928,50 @@ void DBImpl::BackGroundGarbageCollection(){ } } -// 可能调度后台线程进行压缩 +// 注释:可能调度后台线程进行 GC void DBImpl::MaybeScheduleGarbageCollection() { mutex_.AssertHeld(); if (background_GarbageCollection_scheduled_) { // Already scheduled - // 先检查线程是否已经被调度了,如果已经被调度了,就直接退出。 + // 先检查线程是否已经被调度了,如果已经被调度了,就直接退出 } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions - // 如果DB已经被关闭,那么就不调度了。 + // 如果 DB 已经被关闭,那么就不调度了。 } else if (!bg_error_.ok()) { // Already got an error; no more changes // 如果后台线程出错,也不调度。 } else { - //设置调度变量,通过detach线程调度;detach线程即使主线程退出,依然可以正常执行完成 -// background_GarbageCollection_scheduled_ = true; -// env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this); + // 设置调度变量,通过 detach 线程调度; detach 线程即使主线程退出,依然可以正常执行完成 + background_GarbageCollection_scheduled_ = true; + env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this); } } -// 后台gc线程中执行的任务 + +// 注释:后台 gc 线程中执行的任务 void DBImpl::GarbageCollectionBGWork(void* db) { reinterpret_cast(db)->GarbageCollectionBackgroundCall(); } +// 注释:后台 gc 线程执行 void DBImpl::GarbageCollectionBackgroundCall() { assert(background_GarbageCollection_scheduled_); if (shutting_down_.load(std::memory_order_acquire)) { // No more background work when shutting down. - // // 如果DB已经被关闭,那么就不调度了。 + // // 如果 DB 已经被关闭,那么就不调度了。 } else if (!bg_error_.ok()) { // No more background work after a background error. // 如果后台线程出错,也不调度。 } else { - // 开始后台GC回收线程 + // 开始后台 GC 线程 BackGroundGarbageCollection(); } background_GarbageCollection_scheduled_ = false; - //再调用 MaybeScheduleGarbageCollection 检查是否需要再次调度 - // MaybeScheduleGarbageCollection(); + // 再调用 MaybeScheduleGarbageCollection 检查是否需要再次调度 + MaybeScheduleGarbageCollection(); garbage_collection_work_signal_.SignalAll(); } - -// TODO end +// end void DBImpl::BGWork(void* db) { reinterpret_cast(db)->BackgroundCall(); @@ -1057,8 +1048,9 @@ void DBImpl::BackgroundCompaction() { if (!status.ok()) { RecordBackgroundError(status); } - // TODO begin conmpact 后需要考虑是否将 value log 文件进行 gc回收,如果需要将其加入到回收任务队列中。 - // 不进行后台的gc回收,那么也不更新待分配sequence的log了。 + // begin 注释: compact 后需要考虑是否将 vlog 文件进行 gc 回收, + // 如果需要则将其加入到 GC 任务队列中 + // 不进行后台的 gc 回收,那么也不用更新待分配 sequence 的 vlog if(!finish_back_garbage_collection_){ garbage_collection_management_->UpdateQueue(versions_->ImmLogFileNumber() ); } @@ -1320,10 +1312,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } - } else {// TODO begin - //fid ,key valuesize , + } else { + // begin 注释:drop 掉 LSM-tree 中的 kv 数值对了, + // 需要对属于 KV 分离的 kv 数值对进行 GC Slice drop_value = input->value(); - // 获得type类型 if( ikey.type == kTypeSeparation ){ uint64_t fid = 0; uint64_t offset = 0; @@ -1335,7 +1327,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { garbage_collection_management_->UpdateMap(fid,size); mutex_.Unlock(); } - }// TODO end + } // end input->Next(); } @@ -1619,20 +1611,20 @@ void DBImpl::RecordReadSample(Slice key) { const Snapshot* DBImpl::GetSnapshot() { MutexLock l(&mutex_); - // TODO begin 建立快照 对快照之后的信息不进行回收了。 + // begin 注释:建立快照,对快照之后的信息不用进行 GC finish_back_garbage_collection_ = true; - // TODO end + // end return snapshots_.New(versions_->LastSequence()); } void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { MutexLock l(&mutex_); snapshots_.Delete(static_cast(snapshot)); - // TODO begin 没有快照了重新进行后台回收 - if( snapshots_.empty() ){ + // begin 注释:没有快照,重新进行后台 GC + if (snapshots_.empty()) { finish_back_garbage_collection_ = false; } - // TODO end + // end } /*** DBImpl 类关于 Fields 类的 Put、Get 接口 ***/ @@ -1685,32 +1677,28 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); - // TODO begin gc中的batch全部都是设置好的。此时是不需要设置的。 - if( !write_batch->IsGarbageColletion() ){ - // 判断是否需要进行垃圾回收,如需要,腾出一块sequence的区域,触发垃圾回收将在makeroomforwrite当中。 - // 先进行判断是否要进行gc后台回收,如果建立了快照的话finish_back_garbage_collection_就是true, - // 此时不进行sequence分配了。 - // - if( !finish_back_garbage_collection_ - && garbage_collection_management_->ConvertQueue(last_sequence) ){ - // 尝试调度gc回收线程进行回收。 + // begin 注释:GC 流程中写回的 WriteBatch 在 CollectionValueLog 函数中已经设置好了 + if (!write_batch->IsGarbageColletion()) { + // 判断是否需要进行 GC + // 如需要,空出一块 sequence 区域, 触发 GC 将在 MakeRoomForWrite 里 + // 先判断是否要进行 gc 后台回收 + // 如果建立了快照,finish_back_garbage_collection_ 就为 true + // 此时不进行 sequence 分配 + if (!finish_back_garbage_collection_ + && garbage_collection_management_->ConvertQueue(last_sequence)) { MaybeScheduleGarbageCollection(); } - //SetSequence在write_batch中写入本次的sequence - last_sequence += WriteBatchInternal::Count(write_batch); + // SetSequence 在 write_batch 中写入本次的 sequence + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); + // last_sequence += WriteBatchInternal::Count(write_batch); } - // TODO 这里设置last_sequence 是为了照顾离线回收的时候,在map存在的时候需要调用 ConvertQueue 给回收任务分配sequence。 - // TODO 针对多线程调用put的时候,为了避免给gc回收的时候分配的sequence重叠。 + // 这里设置 last_sequence 是为了确保离线 GC 的时候, + // 在 map 存在的时候需要调用 ConvertQueue 给回收任务分配 sequence versions_->SetLastSequence(last_sequence); - // TODO end - - WriteBatchInternal::SetSequence(write_batch, last_sequence ); last_sequence += WriteBatchInternal::Count(write_batch); - - /* TODO */ vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch); - // TODO end + // end // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging @@ -1789,15 +1777,15 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { ++iter; // Advance past "first" for (; iter != writers_.end(); ++iter) { Writer* w = *iter; - // TODO begin 写队列中如果碰到是gc的write_batch 停止合并。 + // begin 注释:写队列中如果遍历到是 gc 的 WriteBatch,停止合并 if (w->sync && !first->sync || first->batch->IsGarbageColletion() || w->batch->IsGarbageColletion()) { - // 当前的Writer要求 Sync ,而第一个Writer不要求Sync,两个的磁盘写入策略不一致。不做合并操作 + // 当前 Writer要求 Sync ,而第一个 Writer 不要求 Sync,两个磁盘写入策略不一致。不做合并操作 // Do not include a sync write into a batch handled by a non-sync write. break; } - // TODO end + // end if (w->batch != nullptr) { size += WriteBatchInternal::ByteSize(w->batch); if (size > max_size) { @@ -1996,11 +1984,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { bool save_manifest = false; Status s = impl->Recover(&edit, &save_manifest); - // TODO begin + // begin 注释: Recover 之后,获取所有 VLogs std::vector logs; s = impl->GetAllValueLog(dbname, logs); sort(logs.begin(),logs.end()); - // TODO end + // end if (s.ok() && impl->mem_ == nullptr) { // Create new log and a corresponding memtable. @@ -2021,20 +2009,21 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { edit.SetPrevLogNumber(0); // No older logs needed after recovery. edit.SetLogNumber(impl->logfile_number_); // s = impl->versions_->LogAndApply(&edit, &impl->mutex_); - // TODO begin 这里也要设置 imm_last_sequence 设置到新的maifest当中,不然下次就没有值了。 - // 就是全盘恢复了。 + // begin 注释:把 imm_last_sequence 设置到新的 manifest 当中, + // 即 RecoverLogFile 中的 imm -> sst 的情况,是一次成功的全盘恢复 impl->versions_->StartImmLastSequence(true); s = impl->versions_->LogAndApply(&edit, &impl->mutex_); impl->versions_->StartImmLastSequence(false); - // TODO end + // end } if (s.ok()) { impl->RemoveObsoleteFiles(); impl->MaybeScheduleCompaction(); } - // TODO begin 开始全盘的回收。 - if( s.ok() && impl->options_.start_garbage_collection ){ + // begin 开始全盘回收 + + if (s.ok() && impl->options_.start_garbage_collection) { if( s.ok() ){ int size = logs.size(); for( int i = 0; i < size ; i++){ @@ -2044,12 +2033,12 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->mutex_.Unlock(); Status stmp = impl->CollectionValueLog(fid, next_sequence); impl->mutex_.Lock(); - if( !stmp.ok() ) s = stmp; + if (!stmp.ok()) s = stmp; impl->versions_->SetLastSequence(next_sequence - 1); } } } - // TODO end + // end impl->mutex_.Unlock(); if (s.ok()) { assert(impl->mem_ != nullptr); diff --git a/db/db_impl.h b/db/db_impl.h index 14a9b82..3e0fc4e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -84,10 +84,11 @@ class DBImpl : public DB { // bytes. void RecordReadSample(Slice key); - // TODO begin + // begin 注释:手动进行离线回收 Status OutLineGarbageCollection(); + // 注释:在线 GC,读取并回收一个 vlog 文件 Status GetAllValueLog(std::string dir, std::vector& logs); - // TODO end + // end private: friend class DB; @@ -157,13 +158,14 @@ class DBImpl : public DB { void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); - // TODO begin + // begin 注释:参考 Compaction 的调度机制,对于 GC 也声明: + // GCBGWork、GCBackgroundCall、MaybeScheduleGC、BackGroundGC static void GarbageCollectionBGWork(void* db); void GarbageCollectionBackgroundCall(); void MaybeScheduleGarbageCollection() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void BackGroundGarbageCollection(); Status CollectionValueLog(uint64_t fid, uint64_t& last_sequence); - // TODO end + // end static void BGWork(void* db); void BackgroundCall(); @@ -182,9 +184,10 @@ class DBImpl : public DB { return internal_comparator_.user_comparator(); } - // TODO begin - Status GetLsm( const Slice& key,std::string* value); - // TODO end + // 注释:GC 某个 VLog 文件时,每回收一条 record, + // 用于回查 LSM-tree 中该 record 里的所有 kv 是否依然存活, + // 再往 db 写回属于 GC 流程的写回 WriteBatch(只包含存活的有效 KV 数据对) + Status GetLsm( const Slice& key, std::string* value); // Constant after construction Env* const env_; @@ -210,7 +213,6 @@ class DBImpl : public DB { std::atomic has_imm_; // So bg thread can detect non-null imm_ WritableFile* logfile_; uint64_t logfile_number_ GUARDED_BY(mutex_); -// log::VlogWriter* log_; uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. @@ -239,16 +241,14 @@ class DBImpl : public DB { int vlog_kv_numbers_; - // TODO begin 用于gc回收的过程 - + // begin 注释:用于 gc 的线程互斥锁 port::CondVar garbage_collection_work_signal_ GUARDED_BY(mutex_); - // 表示后台gc线程是否已经被调度或者在运行 + // 表示后台 gc 线程是否正被调度 bool background_GarbageCollection_scheduled_ GUARDED_BY(mutex_); - + // 若为 true 则表示不允许后台 GC 线程继续进行 bool finish_back_garbage_collection_; + // end SeparateManagement* garbage_collection_management_; - - // TODO end }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/kv_separate_management.cc b/db/kv_separate_management.cc index 4bf6966..594a45b 100644 --- a/db/kv_separate_management.cc +++ b/db/kv_separate_management.cc @@ -5,8 +5,6 @@ namespace leveldb { -// 改变 db 的 last_sequence,给每一个需要进行gc回收的value log 文件分配 新的sequence的序号, -// 以便对value log 中的有效key的重新put进新的value log 中。返回值决定是否进行gc bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) { if (!need_updates_.empty()) { db_sequence++; @@ -25,7 +23,6 @@ bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) { return true; } -// 每一个vlog 罗盘的时候都会在map_file_info_ 中添加索引 ,这个在新建一个value log的时候会用到。 void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory) { assert(map_file_info_.find(fid) == map_file_info_.end()); ValueLogInfo* info = new ValueLogInfo(); @@ -38,8 +35,6 @@ void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_m map_file_info_.insert(std::make_pair(fid,info)); } -// map_file_info_ 存放了所有的value log 的信息,每次删除一个key的时候要对这个key对应的value log计算空间无效利用率 -// 所以要统计有多少空间是无效的,以便后面进行触发gc的过程。 void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) { if (map_file_info_.find(fid) != map_file_info_.end()) { ValueLogInfo* info = map_file_info_[fid]; @@ -48,7 +43,6 @@ void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) { } } -// 遍历 map_file_info_ 中所有的file 找到无效空间最大的log 进行gc回收 ,这个文件要不存在 delete_files_中 void SeparateManagement::UpdateQueue(uint64_t fid) { std::priority_queue, MapCmp> sort_priority_; @@ -57,29 +51,32 @@ void SeparateManagement::UpdateQueue(uint64_t fid) { sort_priority_.push(iter->second); } } + /* 默认每次只把一个 VLog 加入到 GC 队列 */ int num = 1; int threshold = garbage_collection_threshold_; if (!sort_priority_.empty() && sort_priority_.top()->invalid_memory_ >= garbage_collection_threshold_ * 1.2) { + /* 如果无效空间最多的 VLog 超过 GC 阈值 20%,这次会把 1~3 个 VLog 加入到 GC 队列 */ num = 3; threshold = garbage_collection_threshold_ * 1.2; } while (!sort_priority_.empty() && num > 0) { ValueLogInfo* info = sort_priority_.top(); sort_priority_.pop(); + /* 优先删除较旧的 VLog */ if (info->logfile_number_ > fid) { continue; } num--; if (info->invalid_memory_ >= threshold) { need_updates_.push_back(info); + /* 更新准备 GC(准备删除)的 VLog */ delete_files_.insert(info->logfile_number_); } } } -// gc回收线程用来获得需要回收的文件 -bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence){ +bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence) { if (garbage_collection_.empty()) { return false; } else { diff --git a/db/kv_separate_management.h b/db/kv_separate_management.h index b36af1c..4749cde 100644 --- a/db/kv_separate_management.h +++ b/db/kv_separate_management.h @@ -12,55 +12,56 @@ namespace leveldb { typedef struct ValueLogInfo { - uint64_t last_sequence_; - size_t file_size_; // 文件大小 - uint64_t logfile_number_; // 文件编号 - int left_kv_numbers_; // 剩下的 kv 数量 - uint64_t invalid_memory_; // value log 中无效的空间大小 + uint64_t last_sequence_; // VLog 中最后一个有效 kv 的序列号 + size_t file_size_; // VLog 文件大小 + uint64_t logfile_number_; // VLog 文件编号 + int left_kv_numbers_; // VLog 中剩下的 kv 数量 + uint64_t invalid_memory_; // VLog 中无效空间的大小 }ValueLogInfo; struct MapCmp{ bool operator ()(const ValueLogInfo* a, const ValueLogInfo* b) { - return a->invalid_memory_ < b->invalid_memory_; // 按照 value 从大到小排列 + return a->invalid_memory_ < b->invalid_memory_; } }; class SeparateManagement { - public: - SeparateManagement(uint64_t garbage_collection_threshold) - : garbage_collection_threshold_(garbage_collection_threshold) {} - - ~SeparateManagement() {} - - bool ConvertQueue(uint64_t& db_sequence); // 改变 db 的 last_sequence - - void UpdateMap(uint64_t fid,uint64_t abandon_memory); - - void UpdateQueue(uint64_t fid); - - bool GetGarbageCollectionQueue(uint64_t& fid,uint64_t& last_sequence); - - void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory); - - bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); } - - void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); } - - bool EmptyMap() { return map_file_info_.empty(); } - - void CollectionMap(); - - private: - uint64_t garbage_collection_threshold_; - // 当前版本的所有的vlog文件的索引。 - std::unordered_map map_file_info_; - // 垃圾回收队列,这个队列中表示的文件info 将来是要进行gc回收的 - std::deque garbage_collection_; - // 需要触发gc回收的,但是还没有进行分配sequencen的info - std::deque need_updates_; - - std::unordered_set delete_files_; + public: + SeparateManagement(uint64_t garbage_collection_threshold) + : garbage_collection_threshold_(garbage_collection_threshold) {} + + ~SeparateManagement() {} + /* 更新数据库的最后一个序列号,为需要 GC 的 VLog 分配新的序列号,返回是否触发 GC */ + bool ConvertQueue(uint64_t& db_sequence); + /* 更新指定 VLogInfo 的无效空间和剩余键值对数量 */ + void UpdateMap(uint64_t fid, uint64_t abandon_memory); + /* 选择无效空间最大的 VLog 加入到 need_updates_ 队列 */ + void UpdateQueue(uint64_t fid); + /* 从 garbage_collection_ 队列中取出一个需要 GC 的 VLog,返回最后(最新)序列号 */ + bool GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence); + /* 在 map_file_info_ 中添加新创建的 VLog 的 ValueLogInfo */ + void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory); + /* 检查是否有任何 VLog 需要 GC */ + bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); } + /* 从 map_file_info_ 中移除指定编号的 VLog */ + void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); } + /* 检查 map_file_info_ 是否为空 */ + bool EmptyMap() { return map_file_info_.empty(); } + /* 遍历 map_file_info_ 中的所有 VLog,更新 need_updates_ 以便后续 GC */ + void CollectionMap(); + + private: + // VLog 触发 GC 的阈值 + uint64_t garbage_collection_threshold_; + // 当前 version 的所有 VLog 索引 + std::unordered_map map_file_info_; + // 即将 GC 的 VLog + std::deque garbage_collection_; + // 需要 GC 但尚未更新 Sequence 的 VLog + std::deque need_updates_; + // 正在删除的 VLog 文件编号 + std::unordered_set delete_files_; }; } // namespace leveldb diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 155e0ef..7832b2b 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -147,14 +147,14 @@ struct LEVELDB_EXPORT Options { // NewBloomFilterPolicy() here. const FilterPolicy* filter_policy = nullptr; - /* 需要再研究下 */ + /* 注释:重要的 VLog 与 GC 设置 */ // value log 的文件大小 uint64_t max_value_log_size = 16 * 1024 * 1024; - // gc 的回收阈值。 + // VLog 的 gc 的回收阈值。 uint64_t garbage_collection_threshold = max_value_log_size / 4; - // gc 后台回收时候重新put的时候,默认的kv分离的值。 + // gc 后台回收时重新 put 的时候,默认的 kv 分离的值。 uint64_t background_garbage_collection_separate_ = 1024 * 1024 - 1; - // 在open 数据库的时候就进行全盘的log文件回收 + // 在 open 数据库的时候就进行全盘的 vlog 回收 bool start_garbage_collection = false; }; diff --git a/util/env_posix.cc b/util/env_posix.cc index a60616a..05bde2e 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -787,13 +787,13 @@ class PosixEnv : public Env { std::queue background_work_queue_ GUARDED_BY(background_work_mutex_); - // TODO begin gc 回收相关的变量 + // begin 注释:GC 线程互斥锁 port::Mutex background_GlobalCollection_work_mutex_; port::CondVar background_GlobalCollection_work_cv_ GUARDED_BY(background_GlobalCollection_work_mutex_); std::queue background_GlobalCollection_work_queue_ GUARDED_BY(background_GlobalCollection_work_mutex_); - // TODO end + // end PosixLockTable locks_; // Thread-safe. Limiter mmap_limiter_; // Thread-safe.