瀏覽代碼

recover and version control

main
GUJIEJASON 8 月之前
父節點
當前提交
11fc9d3a44
共有 6 個檔案被更改,包括 236 行新增19 行删除
  1. +127
    -14
      db/db_impl.cc
  2. +1
    -1
      db/db_impl.h
  3. +33
    -2
      db/version_edit.cc
  4. +32
    -0
      db/version_edit.h
  5. +26
    -2
      db/version_set.cc
  6. +17
    -0
      db/version_set.h

+ 127
- 14
db/db_impl.cc 查看文件

@ -354,11 +354,20 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
uint64_t max_number = 0;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
// expected.erase(number);
// if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
// logs.push_back(number);
//TODO begin
if(number > max_number) max_number = number;
//删除存在的文件
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
//存储当前已有的日志文件
if (type == kLogFile)
logs.push_back(number);
//TODO end
}
}
if (!expected.empty()) {
@ -370,18 +379,34 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
// for (size_t i = 0; i < logs.size(); i++) {
// s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
// &max_sequence);
// if (!s.ok()) {
// return s;
// }
// // The previous incarnation may not have written any MANIFEST
// // records after allocating this log number. So we manually
// // update the file number allocation counter in VersionSet.
// versions_->MarkFileNumberUsed(logs[i]);
// }
//TODO begin
bool found_sequence_pos = false;
for(int i = 0; i < logs.size(); ++i){
if( logs[i] < versions_->ImmLogFileNumber() ) {
continue;
}
Log(options_.info_log, "RecoverLogFile old log: %06llu \n", static_cast<unsigned long long>(logs[i]));
//重做日志操作
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence, found_sequence_pos);
if (!s.ok()) {
return s;
}
}
versions_->MarkFileNumberUsed(max_number);
//TODO end
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
@ -392,7 +417,8 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
SequenceNumber* max_sequence,
bool& found_sequence_pos) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@ -435,14 +461,45 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
std::string scratch;
Slice record;
WriteBatch batch;
uint64_t record_offset = 0;
int compactions = 0;
MemTable* mem = nullptr;
// TODO begin
uint64_t imm_last_sequence = versions_->ImmLastSequence();
// TODO end
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
// if (record.size() < 12) {
if (record.size() < 20) {
reporter.Corruption(record.size(),
Status::Corruption("log record too small"));
continue;
}
// TODO begin 如果 imm_last_sequence == 0 的话,那么整个说明没有进行一次imm 转sst的情况,
// 所有的log文件都需要进行回收,因为也有可能是manifest出问题了。
// 回收编号最大的log文件即可。note
if( !found_sequence_pos && imm_last_sequence != 0 ){
Slice tmp = record;
tmp.remove_prefix(8);
uint64_t seq = DecodeFixed64(tmp.data());
tmp.remove_prefix(8);
uint64_t kv_numbers = DecodeFixed32(tmp.data());
// 解析出来的seq不符合要求跳过。恢复时定位seq位置时候一定是要大于或者等于 versions_->LastSequence()
if( ( seq + kv_numbers - 1 ) < imm_last_sequence ) {
record_offset += record.size();
continue;
}else if( ( seq + kv_numbers - 1 ) == imm_last_sequence ){
// open db 落盘过sst,再一次打开db就是这个情况。
found_sequence_pos = true;
record_offset += record.size();
continue;
}else { // open db 之后没有落盘过sst,然后数据库就关闭了,第二次又恢复的时候就是这个情况
found_sequence_pos = true;
}
}
// 去除头部信息 crc 和length
record.remove_prefix(log::vHeaderSize);
// TODO end
WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) {
@ -463,6 +520,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
// TODO begin mem 落盘修改 imm_last_sequence,版本恢复
versions_->SetImmLastSequence(mem->GetTailSequence());
versions_->SetImmLogFileNumber(log_number);
// TODO end
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
mem = nullptr;
@ -501,6 +564,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
if (mem != nullptr) {
// mem did not get reused; compact it.
if (status.ok()) {
// TODO begin mem 落盘修改 imm_last_sequence,版本恢复
versions_->SetImmLastSequence(mem->GetTailSequence());
versions_->SetImmLogFileNumber(log_number);
// TODO end
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
@ -573,7 +641,15 @@ void DBImpl::CompactMemTable() {
if (s.ok()) {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
// s = versions_->LogAndApply(&edit, &mutex_);
// TODO begin
//构建新版本,并将其加入到 version_当中
versions_->StartImmLastSequence(true);
versions_->SetImmLastSequence(imm_->GetTailSequence());
versions_->SetImmLogFileNumber(imm_->GetLogFileNumber());
s = versions_->LogAndApply(&edit, &mutex_);
versions_->StartImmLastSequence(false);
// TODO end
}
if (s.ok()) {
@ -764,6 +840,12 @@ void DBImpl::BackgroundCompaction() {
if (!status.ok()) {
RecordBackgroundError(status);
}
// TODO begin conmpact 后需要考虑是否将 value log 文件进行 gc回收,如果需要将其加入到回收任务队列中。
// 不进行后台的gc回收,那么也不更新待分配sequence的log了。
if(!finish_back_garbage_collection_){
garbage_colletion_management_->UpdateQueue(versions_->ImmLogFileNumber() );
}
// TODO end
CleanupCompaction(compact);
c->ReleaseInputs();
RemoveObsoleteFiles();
@ -1599,6 +1681,13 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
// TODO begin
std::vector<uint64_t> logs;
s = impl->GetAllValueLog(dbname,logs);
sort(logs.begin(),logs.end());
// TODO end
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
@ -1617,12 +1706,36 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
// s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
// TODO begin 这里也要设置 imm_last_sequence 设置到新的maifest当中,不然下次就没有值了。
// 就是全盘恢复了。
impl->versions_->StartImmLastSequence(true);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
impl->versions_->StartImmLastSequence(false);
// TODO end
}
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
// TODO beigin 开始全盘的回收。
if( s.ok() && impl->options_.start_garbage_collection ){
if( s.ok() ){
int size = logs.size();
for( int i = 0; i < size ; i++){
uint64_t fid = logs[i];
uint64_t next_sequence = impl->versions_->LastSequence() + 1;
std::cout<<" collection file : "<<fid<<std::endl;
impl->mutex_.Unlock();
Status stmp = impl->CollectionValueLog( fid,next_sequence );
impl->mutex_.Lock();
if( !stmp.ok() ) s = stmp;
impl->versions_->SetLastSequence(next_sequence - 1);
}
}
}
// TODO end
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);

+ 1
- 1
db/db_impl.h 查看文件

@ -131,7 +131,7 @@ class DBImpl : public DB {
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
VersionEdit* edit, SequenceNumber* max_sequence)
VersionEdit* edit, SequenceNumber* max_sequence,bool& found_sequence_pos)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)

+ 33
- 2
db/version_edit.cc 查看文件

@ -20,7 +20,12 @@ enum Tag {
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
kPrevLogNumber = 9,
// TODO begin 在版本中记录 immemtable 转到 sst的时候的 sequence,主要用来恢复的时候 定位db关闭的时候
// imm 和 mem中的内容在恢复的时候应该从log文件中哪里开始恢复。
kImmLastSequence = 10,
kLogFile = 11
// TODO end
};
void VersionEdit::Clear() {
@ -29,12 +34,23 @@ void VersionEdit::Clear() {
prev_log_number_ = 0;
last_sequence_ = 0;
next_file_number_ = 0;
// TODO begin
imm_last_sequence_ = 0;
imm_log_file_number_ = 0;
// TODO end
has_comparator_ = false;
has_log_number_ = false;
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;
compact_pointers_.clear();
// TODO begin
has_imm_last_sequence_ = false;
// TODO end
// compact_pointers_.clear();
deleted_files_.clear();
new_files_.clear();
}
@ -60,6 +76,11 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_);
}
if (has_imm_last_sequence_) {
PutVarint32(dst, kImmLastSequence);
PutVarint64(dst, imm_last_sequence_);
PutVarint64(dst, imm_log_file_number_);
}
for (size_t i = 0; i < compact_pointers_.size(); i++) {
PutVarint32(dst, kCompactPointer);
@ -159,6 +180,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
// TODO begin
case kImmLastSequence:
if (GetVarint64(&input, &imm_last_sequence_) && GetVarint64(&input, &imm_log_file_number_)) {
has_imm_last_sequence_ = true;
} else {
msg = "imemtable last sequence number";
}
break;
// TODO end
case kCompactPointer:
if (GetLevel(&input, &level) && GetInternalKey(&input, &key)) {
compact_pointers_.push_back(std::make_pair(level, key));

+ 32
- 0
db/version_edit.h 查看文件

@ -26,6 +26,20 @@ struct FileMetaData {
InternalKey largest; // Largest internal key served by table
};
// TODO begin
struct LogMetaData {
LogMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
int refs; //
// Seeks allowed until compaction; 0,compaction操作了; allowed_seeks的值在sstable文件加入到version时确定
int allowed_seeks;
uint64_t number; //;sstable文件的名字是 number.ldb
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table key
InternalKey largest; // Largest internal key served by table key
};
// TODO end
class VersionEdit {
public:
VersionEdit() { Clear(); }
@ -53,6 +67,15 @@ class VersionEdit {
has_last_sequence_ = true;
last_sequence_ = seq;
}
// TODO begin
// imm_last_sequence_ imm sst的时候用
void SetImmLastSequence(SequenceNumber seq,uint64_t fid) {
has_imm_last_sequence_ = true;
imm_last_sequence_ = seq;
imm_log_file_number_ = fid;
}
// TODO end
void SetCompactPointer(int level, const InternalKey& key) {
compact_pointers_.push_back(std::make_pair(level, key));
}
@ -96,6 +119,15 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;
// TODO begin
// imm_last_sequence_
bool has_imm_last_sequence_;
// log的时候 memtable immemtabl中的位置
SequenceNumber imm_last_sequence_;
// imm_last_sequence log文件
uint64_t imm_log_file_number_;
// TODO end
std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;

+ 26
- 2
db/version_set.cc 查看文件

@ -809,6 +809,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// TODO begin
if( SaveImmLastSequence() ){
edit->SetImmLastSequence(imm_last_sequence_,imm_log_file_number_);
}
// TODO end
Version* v = new Version(this);
{
Builder builder(this, current_);
@ -912,6 +918,13 @@ Status VersionSet::Recover(bool* save_manifest) {
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
// TODO begin
bool have_imm_last_sequence = false;
uint64_t imm_last_sequence = 0;
uint64_t imm_log_file_number = 0;
// TODO end
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
@ -962,6 +975,13 @@ Status VersionSet::Recover(bool* save_manifest) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
// TODO begin
if (edit.has_imm_last_sequence_) {
imm_last_sequence = edit.imm_last_sequence_;
imm_log_file_number = edit.imm_log_file_number_;
have_imm_last_sequence = true;
}
// TODO end
}
}
delete file;
@ -995,6 +1015,10 @@ Status VersionSet::Recover(bool* save_manifest) {
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
// TODO begin
imm_last_sequence_ = imm_last_sequence;
imm_log_file_number_ = imm_log_file_number;
// TODO end
// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {
@ -1411,7 +1435,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// Get entire range covered by compaction
InternalKey all_start, all_limit;
@ -1434,7 +1458,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
// AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",

+ 17
- 0
db/version_set.h 查看文件

@ -269,6 +269,15 @@ class VersionSet {
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
// TODO begin
bool SaveImmLastSequence(){ return save_imm_last_sequence_; }
bool StartImmLastSequence(bool save ){ save_imm_last_sequence_ = save; }
void SetImmLastSequence( uint64_t seq ){ imm_last_sequence_ = seq; }
uint64_t ImmLastSequence() const { return imm_last_sequence_; }
uint64_t ImmLogFileNumber() const { return imm_log_file_number_; }
void SetImmLogFileNumber( uint64_t fid ){ imm_log_file_number_ = fid; }
// TODO end
private:
class Builder;
@ -304,6 +313,14 @@ class VersionSet {
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// TODO begin
uint64_t imm_last_sequence_;
// imm sst时候的sequence LogAndApply mior compact major compact的过程
bool save_imm_last_sequence_;
// imm_last_sequence log文件
uint64_t imm_log_file_number_;
// TODO end
// Opened lazily
WritableFile* descriptor_file_;
log::Writer* descriptor_log_;

Loading…
取消
儲存