瀏覽代碼

add notes

main
VirgilZhu 8 月之前
父節點
當前提交
3c1d0d49e6
共有 6 個文件被更改,包括 159 次插入172 次删除
  1. +93
    -104
      db/db_impl.cc
  2. +14
    -14
      db/db_impl.h
  3. +5
    -8
      db/kv_separate_management.cc
  4. +41
    -40
      db/kv_separate_management.h
  5. +4
    -4
      include/leveldb/options.h
  6. +2
    -2
      util/env_posix.cc

+ 93
- 104
db/db_impl.cc 查看文件

@ -378,14 +378,14 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
// expected.erase(number);
// if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
// logs.push_back(number);
//TODO begin
if(number > max_number) max_number = number;
//删除存在的文件
// begin 注释: max_number 为要恢复的最新的文件编号
if (number > max_number) max_number = number;
// expected 里的文件现在依然存在,可以删除,需要恢复的是 expected 里不存在的 vlog 文件
expected.erase(number);
//存储当前已有的日志文件
// 保存当前已有的 vlog 文件
if (type == kLogFile)
logs.push_back(number);
//TODO end
// end
}
}
if (!expected.empty()) {
@ -491,9 +491,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
Status::Corruption("log record too small"));
continue;
}
// TODO begin 如果 imm_last_sequence == 0 的话,那么整个说明没有进行一次imm 转sst的情况
// 所有的log文件都需要进行回收,因为也有可能是manifest出问题了。
// 回收编号最大的log文件即可。note
// begin 如果 imm_last_sequence == 0 的话,
// 那么整个说明没有进行一次 imm 转 sst的情况,所有的log文件都需要进行回收
// 回收编号最大的 log 文件即可
if( !found_sequence_pos && imm_last_sequence != 0 ){
Slice tmp = record;
tmp.remove_prefix(8);
@ -501,22 +501,22 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
tmp.remove_prefix(8);
uint64_t kv_numbers = DecodeFixed32(tmp.data());
// 解析出来的seq不符合要求跳过。恢复时定位seq位置时候一定要大于或者等于 versions_->LastSequence()
// 解析出来的 seq 不符合要求跳过。恢复时定位 seq 位置一定要大于等于 versions_->LastSequence()
if( ( seq + kv_numbers - 1 ) < imm_last_sequence ) {
record_offset += record.size();
continue;
}else if( ( seq + kv_numbers - 1 ) == imm_last_sequence ){
// open db 落盘过sst,再一次打开db就是这个情况。
// open db 落盘过 sst,再一次打开 db
found_sequence_pos = true;
record_offset += record.size();
continue;
}else { // open db 之后没有落盘过sst,然后数据库就关闭了,第二次又恢复的时候就是这个情况
} else { // open db 之后没有落盘过 sst,然后关闭 db,第二次恢复的时候
found_sequence_pos = true;
}
}
// 去除头部信息 crc 和length
record.remove_prefix(log::vHeaderSize);
// TODO end
// end
WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) {
@ -761,9 +761,8 @@ void DBImpl::RecordBackgroundError(const Status& s) {
if (bg_error_.ok()) {
bg_error_ = s;
background_work_finished_signal_.SignalAll();
// TODO begin
// garbage_collection_work_signal_.SignalAll();
// TODO end
// 注释:唤醒后台 GC 线程
garbage_collection_work_signal_.SignalAll();
}
}
@ -784,12 +783,11 @@ void DBImpl::MaybeScheduleCompaction() {
}
}
// TODO begin 调度垃圾回收的相关的函数 不需要锁,仅仅是追加的方式。
// 获得db库中所有的log文件,将其放入到vector中
Status DBImpl::GetAllValueLog(std::string dir,std::vector<uint64_t>& logs){
// 注释:获取所有 VLogs
Status DBImpl::GetAllValueLog(std::string dir, std::vector<uint64_t>& logs){
logs.clear();
std::vector<std::string> filenames;
// 获取文件列表
// 获取 VLogs 列表
Status s = env_->GetChildren(dir, &filenames);
if (!s.ok()) {
return s;
@ -798,7 +796,7 @@ Status DBImpl::GetAllValueLog(std::string dir,std::vector& logs){
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
//存储当前已有的日志文件
// 获取所有 .log 文件
if (type == kLogFile)
logs.push_back(number);
}
@ -806,14 +804,11 @@ Status DBImpl::GetAllValueLog(std::string dir,std::vector& logs){
return s;
}
// 手动进行离线回收、
// 1. 如果管理类 separate_management中有的话,那么就按照这个类中的map的信息进行回收,主要用于删除快照后的一个回收
// 2. 对db目录下的文件所有的文件进行回收,主要针对于open的时候。主线程中使用。
// 返回的 status 如果是不ok的说明回收的时候出现一个log文件是有问题的。
// 注释:手动进行离线回收
Status DBImpl::OutLineGarbageCollection(){
MutexLock l(&mutex_);
Status s;
// map 中保存了文件的信息,那么就采用map来指导回收,否则对db下所有的log文件进行回收
// map_file_info_ 非空,则根据它进行回收,否则进行所有 VLog 的回收
if (!garbage_collection_management_->EmptyMap()) {
garbage_collection_management_->CollectionMap();
uint64_t last_sequence = versions_->LastSequence();
@ -825,9 +820,8 @@ Status DBImpl::OutLineGarbageCollection(){
return s;
}
// 读取回收一个log文件,不加锁
// next_sequence : 只有在open的时候才会返回需要修改的值,在线gc是不需要的。
// next_sequence 指的是第一个没有用到的sequence
// 注释:在线 GC,读取并回收一个 vlog 文件,
// next_sequence 指的是第一个没有用到的 sequence(由于是在线 GC ,所以需要提前指定)
Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
struct LogReporter : public log::VlogReader::Reporter {
@ -848,49 +842,45 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
Slice record;
std::string scratch;
// record_offset 每条record 相对文本开头的偏移。
// record_offset 是每条 record 相对 VLog head 的偏移
uint64_t record_offset = 0;
uint64_t size_offset = 0;
WriteOptions opt(options_.background_garbage_collection_separate_);
WriteBatch batch(opt.separate_threshold);
batch.setGarbageColletion(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
while( reader.ReadRecord(&record,&scratch) ){
while (reader.ReadRecord(&record, &scratch)) {
const char* head_record_ptr = record.data();
record.remove_prefix(log::vHeaderSize + log::wHeaderSize);
while( record.size() > 0 ){
while (record.size() > 0) {
const char* head_kv_ptr = record.data();
// kv对在文本中的偏移
uint64_t kv_offset = record_offset + head_kv_ptr - head_record_ptr;
ValueType type = static_cast<ValueType>(record[0]);
record.remove_prefix(1);
Slice key;
Slice value;
std::string get_value;
GetLengthPrefixedSlice(&record,&key);
if( type != kTypeDeletion ){
if (type != kTypeDeletion) {
GetLengthPrefixedSlice(&record,&value);
}
// 需要抛弃的值主要有以下三种情况:0,1,2
// 0. log 中不是 kv 分离的都抛弃
if(type != kTypeSeparation){
if (type != kTypeSeparation) {
continue;
}
status = this->GetLsm(key,&get_value);
// 1. 从LSM tree 中找不到值,说明这个值被删除了,log中要丢弃
// 2. 找到了值,但是最新值不是kv分离的情况,所以也可以抛
if (status.IsNotFound() || !status.IsSeparated() ) {
// 1. 从 LSM-tree 中找不到 key,说明这个 key 被删除了,vlog中要丢弃
// 2. 找到了 key,但是最新的 kv 对不是 KV 分离的情况,也丢
if (status.IsNotFound() || !status.IsSeparated()) {
continue;
}
// 读取错误,整个文件都不继续进行回收了
if( !status.ok() ){
if (!status.ok()) {
std::cout<<"read the file error "<<std::endl;
return status;
}
// 判断是否要丢弃旧值
Slice get_slice(get_value);
uint64_t lsm_fid;
@ -898,10 +888,11 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
GetVarint64(&get_slice,&lsm_fid);
GetVarint64(&get_slice,&lsm_offset);
if( fid == lsm_fid && lsm_offset == kv_offset ){
if (fid == lsm_fid && lsm_offset == kv_offset) {
batch.Put(key, value);
++next_sequence;
if( kv_offset - size_offset > config::gcWriteBatchSize ){
if (kv_offset - size_offset > config::gcWriteBatchSize) {
Write(opt, &batch);
batch.Clear();
batch.setGarbageColletion(true);
@ -911,20 +902,19 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
size_offset = kv_offset + kv_size;
}
}
}
record_offset += record.data() - head_record_ptr;
}
Write(opt, &batch);
status = env_->RemoveFile(logName);
if( status.ok() ){
if (status.ok()) {
garbage_collection_management_->RemoveFileFromMap(fid);
}
return status;
}
// 回收任务
// 注释:回收任务
void DBImpl::BackGroundGarbageCollection(){
uint64_t fid;
uint64_t last_sequence;
@ -938,49 +928,50 @@ void DBImpl::BackGroundGarbageCollection(){
}
}
// 可能调度后台线程进行压缩
// 注释:可能调度后台线程进行 GC
void DBImpl::MaybeScheduleGarbageCollection() {
mutex_.AssertHeld();
if (background_GarbageCollection_scheduled_) {
// Already scheduled
// 先检查线程是否已经被调度了,如果已经被调度了,就直接退出
// 先检查线程是否已经被调度了,如果已经被调度了,就直接退出
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
// 如果DB已经被关闭,那么就不调度了。
// 如果 DB 已经被关闭,那么就不调度了。
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
// 如果后台线程出错,也不调度。
} else {
//设置调度变量,通过detach线程调度;detach线程即使主线程退出,依然可以正常执行完成
// background_GarbageCollection_scheduled_ = true;
// env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this);
// 设置调度变量,通过 detach 线程调度; detach 线程即使主线程退出,依然可以正常执行完成
background_GarbageCollection_scheduled_ = true;
env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this);
}
}
// 后台gc线程中执行的任务
// 注释:后台 gc 线程中执行的任务
void DBImpl::GarbageCollectionBGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->GarbageCollectionBackgroundCall();
}
// 注释:后台 gc 线程执行
void DBImpl::GarbageCollectionBackgroundCall() {
assert(background_GarbageCollection_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
// // 如果DB已经被关闭,那么就不调度了。
// // 如果 DB 已经被关闭,那么就不调度了。
} else if (!bg_error_.ok()) {
// No more background work after a background error.
// 如果后台线程出错,也不调度。
} else {
// 开始后台GC回收线程
// 开始后台 GC 线程
BackGroundGarbageCollection();
}
background_GarbageCollection_scheduled_ = false;
//再调用 MaybeScheduleGarbageCollection 检查是否需要再次调度
// MaybeScheduleGarbageCollection();
// 再调用 MaybeScheduleGarbageCollection 检查是否需要再次调度
MaybeScheduleGarbageCollection();
garbage_collection_work_signal_.SignalAll();
}
// TODO end
// end
void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall();
@ -1057,8 +1048,9 @@ void DBImpl::BackgroundCompaction() {
if (!status.ok()) {
RecordBackgroundError(status);
}
// TODO begin conmpact 后需要考虑是否将 value log 文件进行 gc回收,如果需要将其加入到回收任务队列中。
// 不进行后台的gc回收,那么也不更新待分配sequence的log了。
// begin 注释: compact 后需要考虑是否将 vlog 文件进行 gc 回收,
// 如果需要则将其加入到 GC 任务队列中
// 不进行后台的 gc 回收,那么也不用更新待分配 sequence 的 vlog
if(!finish_back_garbage_collection_){
garbage_collection_management_->UpdateQueue(versions_->ImmLogFileNumber() );
}
@ -1320,10 +1312,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
} else {// TODO begin
//fid ,key valuesize ,
} else {
// begin 注释:drop 掉 LSM-tree 中的 kv 数值对了,
// 需要对属于 KV 分离的 kv 数值对进行 GC
Slice drop_value = input->value();
// 获得type类型
if( ikey.type == kTypeSeparation ){
uint64_t fid = 0;
uint64_t offset = 0;
@ -1335,7 +1327,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
garbage_collection_management_->UpdateMap(fid,size);
mutex_.Unlock();
}
}// TODO end
} // end
input->Next();
}
@ -1619,20 +1611,20 @@ void DBImpl::RecordReadSample(Slice key) {
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
// TODO begin 建立快照 对快照之后的信息不进行回收了。
// begin 注释:建立快照,对快照之后的信息不用进行 GC
finish_back_garbage_collection_ = true;
// TODO end
// end
return snapshots_.New(versions_->LastSequence());
}
void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
MutexLock l(&mutex_);
snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
// TODO begin 没有快照了重新进行后台回收
if( snapshots_.empty() ){
// begin 注释:没有快照,重新进行后台 GC
if (snapshots_.empty()) {
finish_back_garbage_collection_ = false;
}
// TODO end
// end
}
/*** DBImpl 类关于 Fields 类的 Put、Get 接口 ***/
@ -1685,32 +1677,28 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// TODO begin gc中的batch全部都是设置好的。此时是不需要设置的。
if( !write_batch->IsGarbageColletion() ){
// 判断是否需要进行垃圾回收,如需要,腾出一块sequence的区域,触发垃圾回收将在makeroomforwrite当中。
// 先进行判断是否要进行gc后台回收,如果建立了快照的话finish_back_garbage_collection_就是true,
// 此时不进行sequence分配了。
//
if( !finish_back_garbage_collection_
&& garbage_collection_management_->ConvertQueue(last_sequence) ){
// 尝试调度gc回收线程进行回收。
// begin 注释:GC 流程中写回的 WriteBatch 在 CollectionValueLog 函数中已经设置好了
if (!write_batch->IsGarbageColletion()) {
// 判断是否需要进行 GC
// 如需要,空出一块 sequence 区域, 触发 GC 将在 MakeRoomForWrite 里
// 先判断是否要进行 gc 后台回收
// 如果建立了快照,finish_back_garbage_collection_ 就为 true
// 此时不进行 sequence 分配
if (!finish_back_garbage_collection_
&& garbage_collection_management_->ConvertQueue(last_sequence)) {
MaybeScheduleGarbageCollection();
}
//SetSequence在write_batch中写入本次的sequence
last_sequence += WriteBatchInternal::Count(write_batch);
// SetSequence 在 write_batch 中写入本次的 sequence
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
// last_sequence += WriteBatchInternal::Count(write_batch);
}
// TODO 这里设置last_sequence 是为了照顾离线回收的时候,在map存在的时候需要调用 ConvertQueue 给回收任务分配sequence。
// TODO 针对多线程调用put的时候,为了避免给gc回收的时候分配的sequence重叠。
// 这里设置 last_sequence 是为了确保离线 GC 的时候,
// 在 map 存在的时候需要调用 ConvertQueue 给回收任务分配 sequence
versions_->SetLastSequence(last_sequence);
// TODO end
WriteBatchInternal::SetSequence(write_batch, last_sequence );
last_sequence += WriteBatchInternal::Count(write_batch);
/* TODO */
vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch);
// TODO end
// end
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
@ -1789,15 +1777,15 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
// TODO begin 写队列中如果碰到是gc的write_batch 停止合并。
// begin 注释:写队列中如果遍历到是 gc 的 WriteBatch,停止合并
if (w->sync && !first->sync
|| first->batch->IsGarbageColletion()
|| w->batch->IsGarbageColletion()) {
// 当前的Writer要求 Sync ,而第一个Writer不要求Sync,两个的磁盘写入策略不一致。不做合并操作
// 当前 Writer要求 Sync ,而第一个 Writer 不要求 Sync,两个磁盘写入策略不一致。不做合并操作
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
// TODO end
// end
if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
@ -1996,11 +1984,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
// TODO begin
// begin 注释: Recover 之后,获取所有 VLogs
std::vector<uint64_t> logs;
s = impl->GetAllValueLog(dbname, logs);
sort(logs.begin(),logs.end());
// TODO end
// end
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
@ -2021,20 +2009,21 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
// s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
// TODO begin 这里也要设置 imm_last_sequence 设置到新的maifest当中,不然下次就没有值了。
// 就是全盘恢复了。
// begin 注释:把 imm_last_sequence 设置到新的 manifest 当中,
// 即 RecoverLogFile 中的 imm -> sst 的情况,是一次成功的全盘恢复
impl->versions_->StartImmLastSequence(true);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
impl->versions_->StartImmLastSequence(false);
// TODO end
// end
}
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
// TODO begin 开始全盘的回收。
if( s.ok() && impl->options_.start_garbage_collection ){
// begin 开始全盘回收
if (s.ok() && impl->options_.start_garbage_collection) {
if( s.ok() ){
int size = logs.size();
for( int i = 0; i < size ; i++){
@ -2044,12 +2033,12 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->mutex_.Unlock();
Status stmp = impl->CollectionValueLog(fid, next_sequence);
impl->mutex_.Lock();
if( !stmp.ok() ) s = stmp;
if (!stmp.ok()) s = stmp;
impl->versions_->SetLastSequence(next_sequence - 1);
}
}
}
// TODO end
// end
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);

+ 14
- 14
db/db_impl.h 查看文件

@ -84,10 +84,11 @@ class DBImpl : public DB {
// bytes.
void RecordReadSample(Slice key);
// TODO begin
// begin 线
Status OutLineGarbageCollection();
// 线 GC vlog
Status GetAllValueLog(std::string dir, std::vector<uint64_t>& logs);
// TODO end
// end
private:
friend class DB;
@ -157,13 +158,14 @@ class DBImpl : public DB {
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// TODO begin
// begin Compaction GC
// GCBGWorkGCBackgroundCallMaybeScheduleGCBackGroundGC
static void GarbageCollectionBGWork(void* db);
void GarbageCollectionBackgroundCall();
void MaybeScheduleGarbageCollection() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackGroundGarbageCollection();
Status CollectionValueLog(uint64_t fid, uint64_t& last_sequence);
// TODO end
// end
static void BGWork(void* db);
void BackgroundCall();
@ -182,9 +184,10 @@ class DBImpl : public DB {
return internal_comparator_.user_comparator();
}
// TODO begin
Status GetLsm( const Slice& key,std::string* value);
// TODO end
// GC VLog record
// LSM-tree record kv
// db GC WriteBatch KV
Status GetLsm( const Slice& key, std::string* value);
// Constant after construction
Env* const env_;
@ -210,7 +213,6 @@ class DBImpl : public DB {
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
// log::VlogWriter* log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
@ -239,16 +241,14 @@ class DBImpl : public DB {
int vlog_kv_numbers_;
// TODO begin gc回收的过程
// begin gc 线
port::CondVar garbage_collection_work_signal_ GUARDED_BY(mutex_);
// gc线程是否已经被调度或者在运行
// gc 线
bool background_GarbageCollection_scheduled_ GUARDED_BY(mutex_);
// true GC 线
bool finish_back_garbage_collection_;
// end
SeparateManagement* garbage_collection_management_;
// TODO end
};
// Sanitize db options. The caller should delete result.info_log if

+ 5
- 8
db/kv_separate_management.cc 查看文件

@ -5,8 +5,6 @@
namespace leveldb {
// 改变 db 的 last_sequence,给每一个需要进行gc回收的value log 文件分配 新的sequence的序号,
// 以便对value log 中的有效key的重新put进新的value log 中。返回值决定是否进行gc
bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) {
if (!need_updates_.empty()) {
db_sequence++;
@ -25,7 +23,6 @@ bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) {
return true;
}
// 每一个vlog 罗盘的时候都会在map_file_info_ 中添加索引 ,这个在新建一个value log的时候会用到。
void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory) {
assert(map_file_info_.find(fid) == map_file_info_.end());
ValueLogInfo* info = new ValueLogInfo();
@ -38,8 +35,6 @@ void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_m
map_file_info_.insert(std::make_pair(fid,info));
}
// map_file_info_ 存放了所有的value log 的信息,每次删除一个key的时候要对这个key对应的value log计算空间无效利用率
// 所以要统计有多少空间是无效的,以便后面进行触发gc的过程。
void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) {
if (map_file_info_.find(fid) != map_file_info_.end()) {
ValueLogInfo* info = map_file_info_[fid];
@ -48,7 +43,6 @@ void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) {
}
}
// 遍历 map_file_info_ 中所有的file 找到无效空间最大的log 进行gc回收 ,这个文件要不存在 delete_files_中
void SeparateManagement::UpdateQueue(uint64_t fid) {
std::priority_queue<ValueLogInfo*, std::vector<ValueLogInfo*>, MapCmp> sort_priority_;
@ -57,29 +51,32 @@ void SeparateManagement::UpdateQueue(uint64_t fid) {
sort_priority_.push(iter->second);
}
}
/* 默认每次只把一个 VLog 加入到 GC 队列 */
int num = 1;
int threshold = garbage_collection_threshold_;
if (!sort_priority_.empty()
&& sort_priority_.top()->invalid_memory_ >= garbage_collection_threshold_ * 1.2) {
/* 如果无效空间最多的 VLog 超过 GC 阈值 20%,这次会把 1~3 个 VLog 加入到 GC 队列 */
num = 3;
threshold = garbage_collection_threshold_ * 1.2;
}
while (!sort_priority_.empty() && num > 0) {
ValueLogInfo* info = sort_priority_.top();
sort_priority_.pop();
/* 优先删除较旧的 VLog */
if (info->logfile_number_ > fid) {
continue;
}
num--;
if (info->invalid_memory_ >= threshold) {
need_updates_.push_back(info);
/* 更新准备 GC(准备删除)的 VLog */
delete_files_.insert(info->logfile_number_);
}
}
}
// gc回收线程用来获得需要回收的文件
bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence){
bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence) {
if (garbage_collection_.empty()) {
return false;
} else {

+ 41
- 40
db/kv_separate_management.h 查看文件

@ -12,55 +12,56 @@
namespace leveldb {
typedef struct ValueLogInfo {
uint64_t last_sequence_;
size_t file_size_; //
uint64_t logfile_number_; //
int left_kv_numbers_; // kv
uint64_t invalid_memory_; // value log
uint64_t last_sequence_; // VLog kv
size_t file_size_; // VLog
uint64_t logfile_number_; // VLog
int left_kv_numbers_; // VLog kv
uint64_t invalid_memory_; // VLog
}ValueLogInfo;
struct MapCmp{
bool operator ()(const ValueLogInfo* a, const ValueLogInfo* b)
{
return a->invalid_memory_ < b->invalid_memory_; // value
return a->invalid_memory_ < b->invalid_memory_;
}
};
class SeparateManagement {
public:
SeparateManagement(uint64_t garbage_collection_threshold)
: garbage_collection_threshold_(garbage_collection_threshold) {}
~SeparateManagement() {}
bool ConvertQueue(uint64_t& db_sequence); // db last_sequence
void UpdateMap(uint64_t fid,uint64_t abandon_memory);
void UpdateQueue(uint64_t fid);
bool GetGarbageCollectionQueue(uint64_t& fid,uint64_t& last_sequence);
void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory);
bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); }
void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); }
bool EmptyMap() { return map_file_info_.empty(); }
void CollectionMap();
private:
uint64_t garbage_collection_threshold_;
// vlog文件的索引
std::unordered_map<uint64_t, ValueLogInfo*> map_file_info_;
// info gc回收的
std::deque<ValueLogInfo*> garbage_collection_;
// gc回收的sequencen的info
std::deque<ValueLogInfo*> need_updates_;
std::unordered_set<uint64_t> delete_files_;
public:
SeparateManagement(uint64_t garbage_collection_threshold)
: garbage_collection_threshold_(garbage_collection_threshold) {}
~SeparateManagement() {}
/* 更新数据库的最后一个序列号,为需要 GC 的 VLog 分配新的序列号,返回是否触发 GC */
bool ConvertQueue(uint64_t& db_sequence);
/* 更新指定 VLogInfo 的无效空间和剩余键值对数量 */
void UpdateMap(uint64_t fid, uint64_t abandon_memory);
/* 选择无效空间最大的 VLog 加入到 need_updates_ 队列 */
void UpdateQueue(uint64_t fid);
/* 从 garbage_collection_ 队列中取出一个需要 GC 的 VLog,返回最后(最新)序列号 */
bool GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence);
/* 在 map_file_info_ 中添加新创建的 VLog 的 ValueLogInfo */
void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory);
/* 检查是否有任何 VLog 需要 GC */
bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); }
/* 从 map_file_info_ 中移除指定编号的 VLog */
void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); }
/* 检查 map_file_info_ 是否为空 */
bool EmptyMap() { return map_file_info_.empty(); }
/* 遍历 map_file_info_ 中的所有 VLog,更新 need_updates_ 以便后续 GC */
void CollectionMap();
private:
// VLog GC
uint64_t garbage_collection_threshold_;
// version VLog
std::unordered_map<uint64_t, ValueLogInfo*> map_file_info_;
// GC VLog
std::deque<ValueLogInfo*> garbage_collection_;
// GC Sequence VLog
std::deque<ValueLogInfo*> need_updates_;
// VLog
std::unordered_set<uint64_t> delete_files_;
};
} // namespace leveldb

+ 4
- 4
include/leveldb/options.h 查看文件

@ -147,14 +147,14 @@ struct LEVELDB_EXPORT Options {
// NewBloomFilterPolicy() here.
const FilterPolicy* filter_policy = nullptr;
/* 需要再研究下 */
/* 注释:重要的 VLog 与 GC 设置 */
// value log
uint64_t max_value_log_size = 16 * 1024 * 1024;
// gc
// VLog gc
uint64_t garbage_collection_threshold = max_value_log_size / 4;
// gc put的时kv分离的值
// gc put kv
uint64_t background_garbage_collection_separate_ = 1024 * 1024 - 1;
// open log文件回
// open vlog
bool start_garbage_collection = false;
};

+ 2
- 2
util/env_posix.cc 查看文件

@ -787,13 +787,13 @@ class PosixEnv : public Env {
std::queue<BackgroundWorkItem> background_work_queue_
GUARDED_BY(background_work_mutex_);
// TODO begin gc 回收相关的变量
// begin 注释:GC 线程互斥锁
port::Mutex background_GlobalCollection_work_mutex_;
port::CondVar background_GlobalCollection_work_cv_ GUARDED_BY(background_GlobalCollection_work_mutex_);
std::queue<BackgroundWorkItem> background_GlobalCollection_work_queue_
GUARDED_BY(background_GlobalCollection_work_mutex_);
// TODO end
// end
PosixLockTable locks_; // Thread-safe.
Limiter mmap_limiter_; // Thread-safe.

Loading…
取消
儲存