From 11fc9d3a4487bceb38b54516a86a1f0a9dfe6ed0 Mon Sep 17 00:00:00 2001 From: GUJIEJASON <1776127334@qq.com> Date: Thu, 2 Jan 2025 02:27:00 +0800 Subject: [PATCH] recover and version control --- db/db_impl.cc | 141 +++++++++++++++++++++++++++++++++++++++++++++++------ db/db_impl.h | 2 +- db/version_edit.cc | 35 ++++++++++++- db/version_edit.h | 32 ++++++++++++ db/version_set.cc | 28 ++++++++++- db/version_set.h | 17 +++++++ 6 files changed, 236 insertions(+), 19 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index cf4cec7..a27619c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -354,11 +354,20 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { uint64_t number; FileType type; std::vector logs; + uint64_t max_number = 0; for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type)) { + // expected.erase(number); + // if (type == kLogFile && ((number >= min_log) || (number == prev_log))) + // logs.push_back(number); + //TODO begin + if(number > max_number) max_number = number; + //删除存在的文件 expected.erase(number); - if (type == kLogFile && ((number >= min_log) || (number == prev_log))) + //存储当前已有的日志文件 + if (type == kLogFile) logs.push_back(number); + //TODO end } } if (!expected.empty()) { @@ -370,18 +379,34 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { // Recover in the order in which the logs were generated std::sort(logs.begin(), logs.end()); - for (size_t i = 0; i < logs.size(); i++) { - s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, - &max_sequence); - if (!s.ok()) { - return s; - } - - // The previous incarnation may not have written any MANIFEST - // records after allocating this log number. So we manually - // update the file number allocation counter in VersionSet. - versions_->MarkFileNumberUsed(logs[i]); + // for (size_t i = 0; i < logs.size(); i++) { + // s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, + // &max_sequence); + // if (!s.ok()) { + // return s; + // } + + // // The previous incarnation may not have written any MANIFEST + // // records after allocating this log number. So we manually + // // update the file number allocation counter in VersionSet. + // versions_->MarkFileNumberUsed(logs[i]); + // } + //TODO begin + bool found_sequence_pos = false; + for(int i = 0; i < logs.size(); ++i){ + if( logs[i] < versions_->ImmLogFileNumber() ) { + continue; + } + Log(options_.info_log, "RecoverLogFile old log: %06llu \n", static_cast(logs[i])); + //重做日志操作 + s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, + &max_sequence, found_sequence_pos); + if (!s.ok()) { + return s; + } } + versions_->MarkFileNumberUsed(max_number); + //TODO end if (versions_->LastSequence() < max_sequence) { versions_->SetLastSequence(max_sequence); @@ -392,7 +417,8 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, VersionEdit* edit, - SequenceNumber* max_sequence) { + SequenceNumber* max_sequence, + bool& found_sequence_pos) { struct LogReporter : public log::Reader::Reporter { Env* env; Logger* info_log; @@ -435,14 +461,45 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, std::string scratch; Slice record; WriteBatch batch; + uint64_t record_offset = 0; int compactions = 0; MemTable* mem = nullptr; + // TODO begin + uint64_t imm_last_sequence = versions_->ImmLastSequence(); + // TODO end while (reader.ReadRecord(&record, &scratch) && status.ok()) { - if (record.size() < 12) { + // if (record.size() < 12) { + if (record.size() < 20) { reporter.Corruption(record.size(), Status::Corruption("log record too small")); continue; } + // TODO begin 如果 imm_last_sequence == 0 的话,那么整个说明没有进行一次imm 转sst的情况, + // 所有的log文件都需要进行回收,因为也有可能是manifest出问题了。 + // 回收编号最大的log文件即可。note + if( !found_sequence_pos && imm_last_sequence != 0 ){ + Slice tmp = record; + tmp.remove_prefix(8); + uint64_t seq = DecodeFixed64(tmp.data()); + tmp.remove_prefix(8); + uint64_t kv_numbers = DecodeFixed32(tmp.data()); + + // 解析出来的seq不符合要求跳过。恢复时定位seq位置时候一定是要大于或者等于 versions_->LastSequence() + if( ( seq + kv_numbers - 1 ) < imm_last_sequence ) { + record_offset += record.size(); + continue; + }else if( ( seq + kv_numbers - 1 ) == imm_last_sequence ){ + // open db 落盘过sst,再一次打开db就是这个情况。 + found_sequence_pos = true; + record_offset += record.size(); + continue; + }else { // open db 之后没有落盘过sst,然后数据库就关闭了,第二次又恢复的时候就是这个情况 + found_sequence_pos = true; + } + } + // 去除头部信息 crc 和length + record.remove_prefix(log::vHeaderSize); + // TODO end WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { @@ -463,6 +520,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { compactions++; *save_manifest = true; + + // TODO begin mem 落盘修改 imm_last_sequence,版本恢复 + versions_->SetImmLastSequence(mem->GetTailSequence()); + versions_->SetImmLogFileNumber(log_number); + // TODO end + status = WriteLevel0Table(mem, edit, nullptr); mem->Unref(); mem = nullptr; @@ -501,6 +564,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, if (mem != nullptr) { // mem did not get reused; compact it. if (status.ok()) { + + // TODO begin mem 落盘修改 imm_last_sequence,版本恢复 + versions_->SetImmLastSequence(mem->GetTailSequence()); + versions_->SetImmLogFileNumber(log_number); + // TODO end *save_manifest = true; status = WriteLevel0Table(mem, edit, nullptr); } @@ -573,7 +641,15 @@ void DBImpl::CompactMemTable() { if (s.ok()) { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed + // s = versions_->LogAndApply(&edit, &mutex_); + // TODO begin + //构建新版本,并将其加入到 version_当中 + versions_->StartImmLastSequence(true); + versions_->SetImmLastSequence(imm_->GetTailSequence()); + versions_->SetImmLogFileNumber(imm_->GetLogFileNumber()); s = versions_->LogAndApply(&edit, &mutex_); + versions_->StartImmLastSequence(false); + // TODO end } if (s.ok()) { @@ -764,6 +840,12 @@ void DBImpl::BackgroundCompaction() { if (!status.ok()) { RecordBackgroundError(status); } + // TODO begin conmpact 后需要考虑是否将 value log 文件进行 gc回收,如果需要将其加入到回收任务队列中。 + // 不进行后台的gc回收,那么也不更新待分配sequence的log了。 + if(!finish_back_garbage_collection_){ + garbage_colletion_management_->UpdateQueue(versions_->ImmLogFileNumber() ); + } + // TODO end CleanupCompaction(compact); c->ReleaseInputs(); RemoveObsoleteFiles(); @@ -1599,6 +1681,13 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { // Recover handles create_if_missing, error_if_exists bool save_manifest = false; Status s = impl->Recover(&edit, &save_manifest); + + // TODO begin + std::vector logs; + s = impl->GetAllValueLog(dbname,logs); + sort(logs.begin(),logs.end()); + // TODO end + if (s.ok() && impl->mem_ == nullptr) { // Create new log and a corresponding memtable. uint64_t new_log_number = impl->versions_->NewFileNumber(); @@ -1617,12 +1706,36 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { if (s.ok() && save_manifest) { edit.SetPrevLogNumber(0); // No older logs needed after recovery. edit.SetLogNumber(impl->logfile_number_); + // s = impl->versions_->LogAndApply(&edit, &impl->mutex_); + // TODO begin 这里也要设置 imm_last_sequence 设置到新的maifest当中,不然下次就没有值了。 + // 就是全盘恢复了。 + impl->versions_->StartImmLastSequence(true); s = impl->versions_->LogAndApply(&edit, &impl->mutex_); + impl->versions_->StartImmLastSequence(false); + // TODO end } if (s.ok()) { impl->RemoveObsoleteFiles(); impl->MaybeScheduleCompaction(); } + // TODO beigin 开始全盘的回收。 + + if( s.ok() && impl->options_.start_garbage_collection ){ + if( s.ok() ){ + int size = logs.size(); + for( int i = 0; i < size ; i++){ + uint64_t fid = logs[i]; + uint64_t next_sequence = impl->versions_->LastSequence() + 1; + std::cout<<" collection file : "<mutex_.Unlock(); + Status stmp = impl->CollectionValueLog( fid,next_sequence ); + impl->mutex_.Lock(); + if( !stmp.ok() ) s = stmp; + impl->versions_->SetLastSequence(next_sequence - 1); + } + } + } + // TODO end impl->mutex_.Unlock(); if (s.ok()) { assert(impl->mem_ != nullptr); diff --git a/db/db_impl.h b/db/db_impl.h index 46a8993..d729107 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -131,7 +131,7 @@ class DBImpl : public DB { void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, - VersionEdit* edit, SequenceNumber* max_sequence) + VersionEdit* edit, SequenceNumber* max_sequence,bool& found_sequence_pos) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) diff --git a/db/version_edit.cc b/db/version_edit.cc index 356ce88..84e7649 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,12 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + // TODO begin 在版本中记录 immemtable 转到 sst的时候的 sequence,主要用来恢复的时候 定位db关闭的时候 + // imm 和 mem中的内容在恢复的时候应该从log文件中哪里开始恢复。 + kImmLastSequence = 10, + kLogFile = 11 + // TODO end }; void VersionEdit::Clear() { @@ -29,12 +34,23 @@ void VersionEdit::Clear() { prev_log_number_ = 0; last_sequence_ = 0; next_file_number_ = 0; + + // TODO begin + imm_last_sequence_ = 0; + imm_log_file_number_ = 0; + // TODO end + has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; has_next_file_number_ = false; has_last_sequence_ = false; - compact_pointers_.clear(); + + // TODO begin + has_imm_last_sequence_ = false; + // TODO end + + // compact_pointers_.clear(); deleted_files_.clear(); new_files_.clear(); } @@ -60,6 +76,11 @@ void VersionEdit::EncodeTo(std::string* dst) const { PutVarint32(dst, kLastSequence); PutVarint64(dst, last_sequence_); } + if (has_imm_last_sequence_) { + PutVarint32(dst, kImmLastSequence); + PutVarint64(dst, imm_last_sequence_); + PutVarint64(dst, imm_log_file_number_); + } for (size_t i = 0; i < compact_pointers_.size(); i++) { PutVarint32(dst, kCompactPointer); @@ -159,6 +180,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + // TODO begin + case kImmLastSequence: + if (GetVarint64(&input, &imm_last_sequence_) && GetVarint64(&input, &imm_log_file_number_)) { + has_imm_last_sequence_ = true; + } else { + msg = "imemtable last sequence number"; + } + break; + // TODO end + case kCompactPointer: if (GetLevel(&input, &level) && GetInternalKey(&input, &key)) { compact_pointers_.push_back(std::make_pair(level, key)); diff --git a/db/version_edit.h b/db/version_edit.h index 137b4b1..6467a2f 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -26,6 +26,20 @@ struct FileMetaData { InternalKey largest; // Largest internal key served by table }; +// TODO begin +struct LogMetaData { + LogMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {} + + int refs; // 引用计数 + // Seeks allowed until compaction; 当该值为0时,意味着需要进行compaction操作了; 变量allowed_seeks的值在sstable文件加入到version时确定 + int allowed_seeks; + uint64_t number; //文件名相关;sstable文件的名字是 number.ldb + uint64_t file_size; // File size in bytes 文件大小 + InternalKey smallest; // Smallest internal key served by table 最小的key + InternalKey largest; // Largest internal key served by table 最大的key +}; +// TODO end + class VersionEdit { public: VersionEdit() { Clear(); } @@ -53,6 +67,15 @@ class VersionEdit { has_last_sequence_ = true; last_sequence_ = seq; } + + // TODO begin + //设置序列号 imm_last_sequence_ imm 转 sst的时候用 + void SetImmLastSequence(SequenceNumber seq,uint64_t fid) { + has_imm_last_sequence_ = true; + imm_last_sequence_ = seq; + imm_log_file_number_ = fid; + } + // TODO end void SetCompactPointer(int level, const InternalKey& key) { compact_pointers_.push_back(std::make_pair(level, key)); } @@ -96,6 +119,15 @@ class VersionEdit { bool has_next_file_number_; bool has_last_sequence_; + // TODO begin + // 是否包含 imm_last_sequence_ + bool has_imm_last_sequence_; + // 恢复log的时候 用来定位memtable 和 immemtabl中的位置 + SequenceNumber imm_last_sequence_; + // imm_last_sequence 所处在的log文件 + uint64_t imm_log_file_number_; + // TODO end + std::vector> compact_pointers_; DeletedFileSet deleted_files_; std::vector> new_files_; diff --git a/db/version_set.cc b/db/version_set.cc index 5cd9244..6c7017e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -809,6 +809,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) { edit->SetNextFile(next_file_number_); edit->SetLastSequence(last_sequence_); + // TODO begin + if( SaveImmLastSequence() ){ + edit->SetImmLastSequence(imm_last_sequence_,imm_log_file_number_); + } + // TODO end + Version* v = new Version(this); { Builder builder(this, current_); @@ -912,6 +918,13 @@ Status VersionSet::Recover(bool* save_manifest) { bool have_prev_log_number = false; bool have_next_file = false; bool have_last_sequence = false; + + // TODO begin + bool have_imm_last_sequence = false; + uint64_t imm_last_sequence = 0; + uint64_t imm_log_file_number = 0; + // TODO end + uint64_t next_file = 0; uint64_t last_sequence = 0; uint64_t log_number = 0; @@ -962,6 +975,13 @@ Status VersionSet::Recover(bool* save_manifest) { last_sequence = edit.last_sequence_; have_last_sequence = true; } + // TODO begin + if (edit.has_imm_last_sequence_) { + imm_last_sequence = edit.imm_last_sequence_; + imm_log_file_number = edit.imm_log_file_number_; + have_imm_last_sequence = true; + } + // TODO end } } delete file; @@ -995,6 +1015,10 @@ Status VersionSet::Recover(bool* save_manifest) { last_sequence_ = last_sequence; log_number_ = log_number; prev_log_number_ = prev_log_number; + // TODO begin + imm_last_sequence_ = imm_last_sequence; + imm_log_file_number_ = imm_log_file_number; + // TODO end // See if we can reuse the existing MANIFEST file. if (ReuseManifest(dscname, current)) { @@ -1411,7 +1435,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) { current_->GetOverlappingInputs(level + 1, &smallest, &largest, &c->inputs_[1]); - AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]); + // AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]); // Get entire range covered by compaction InternalKey all_start, all_limit; @@ -1434,7 +1458,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) { std::vector expanded1; current_->GetOverlappingInputs(level + 1, &new_start, &new_limit, &expanded1); - AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1); + // AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1); if (expanded1.size() == c->inputs_[1].size()) { Log(options_->info_log, "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n", diff --git a/db/version_set.h b/db/version_set.h index ea0c925..e3da5e7 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -269,6 +269,15 @@ class VersionSet { }; const char* LevelSummary(LevelSummaryStorage* scratch) const; + // TODO begin + bool SaveImmLastSequence(){ return save_imm_last_sequence_; } + bool StartImmLastSequence(bool save ){ save_imm_last_sequence_ = save; } + void SetImmLastSequence( uint64_t seq ){ imm_last_sequence_ = seq; } + uint64_t ImmLastSequence() const { return imm_last_sequence_; } + uint64_t ImmLogFileNumber() const { return imm_log_file_number_; } + void SetImmLogFileNumber( uint64_t fid ){ imm_log_file_number_ = fid; } + // TODO end + private: class Builder; @@ -304,6 +313,14 @@ class VersionSet { uint64_t log_number_; uint64_t prev_log_number_; // 0 or backing store for memtable being compacted + // TODO begin + uint64_t imm_last_sequence_; + // 是否保存 imm 转 sst时候的sequence,主要用在 LogAndApply 这个函数当中,用于区分是mior compact 还是 major compact的过程。 + bool save_imm_last_sequence_; + // imm_last_sequence 所处在的log文件 + uint64_t imm_log_file_number_; + // TODO end + // Opened lazily WritableFile* descriptor_file_; log::Writer* descriptor_log_;