VirgilZhu преди 8 месеца
родител
ревизия
9c3c34dda7
променени са 14 файла, в които са добавени 1450 реда и са изтрити 149 реда
  1. +1429
    -112
      README.md
  2. +17
    -28
      db/db_impl.cc
  3. Двоични данни
      image/kv_sep.png
  4. Двоични данни
      image/kv_test.png
  5. Двоични данни
      image/test_1.jpg
  6. Двоични данни
      image/test_2.jpg
  7. Двоични данни
      image/value_field_test.png
  8. Двоични данни
      image/version_1.jpg
  9. Двоични данни
      image/version_2.jpg
  10. Двоични данни
      image/version_3.jpg
  11. Двоични данни
      image/vlog.png
  12. Двоични данни
      image/write-badger.png
  13. +2
    -2
      include/leveldb/write_batch.h
  14. +2
    -7
      test/kv_test.cc

+ 1429
- 112
README.md
Файловите разлики са ограничени, защото са твърде много
Целия файл


+ 17
- 28
db/db_impl.cc Целия файл

@ -375,14 +375,11 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
uint64_t max_number = 0;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
// expected.erase(number);
// if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
// logs.push_back(number);
// begin 注释: max_number 为要恢复的最新的文件编号
if (number > max_number) max_number = number;
// expected 里的文件现在依然存在,可以删除,需要恢复的是 expected 里不存在的 vlog 文件
// expected 里的文件现在依然存在,可以删除
expected.erase(number);
// 保存当前已有的 vlog 文件
// 保存当前已有的 vlog 文件,基于它们进行恢复
if (type == kLogFile)
logs.push_back(number);
// end
@ -417,7 +414,7 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
continue;
}
Log(options_.info_log, "RecoverLogFile old log: %06llu \n", static_cast<unsigned long long>(logs[i]));
//重做日志操作
// 重做日志操作
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence, found_sequence_pos);
if (!s.ok()) {
@ -433,6 +430,7 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
return Status::OK();
}
/* 恢复内存中的数据,将 VLog 中记录的操作读取出来,重新写入到 memtable */
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence,
@ -492,7 +490,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
continue;
}
// begin 如果 imm_last_sequence == 0 的话,
// 那么整个说明没有进行一次 imm 转 sst的情况,所有的log文件都需要进行回收
// 那么整个说明没有进行一次 imm 转 sst的情况,所有的 log 文件都需要进行回收
// 回收编号最大的 log 文件即可
if( !found_sequence_pos && imm_last_sequence != 0 ){
Slice tmp = record;
@ -784,7 +782,7 @@ void DBImpl::MaybeScheduleCompaction() {
}
// 注释:获取所有 VLogs
Status DBImpl::GetAllValueLog(std::string dir, std::vector<uint64_t>& logs){
Status DBImpl::GetAllValueLog(std::string dir, std::vector<uint64_t>& logs) {
logs.clear();
std::vector<std::string> filenames;
// 获取 VLogs 列表
@ -847,7 +845,7 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
uint64_t size_offset = 0;
WriteOptions opt(options_.background_garbage_collection_separate_);
WriteBatch batch(opt.separate_threshold);
batch.setGarbageColletion(true);
batch.setGarbageCollection(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
while (reader.ReadRecord(&record, &scratch)) {
@ -895,7 +893,7 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
if (kv_offset - size_offset > config::gcWriteBatchSize) {
Write(opt, &batch);
batch.Clear();
batch.setGarbageColletion(true);
batch.setGarbageCollection(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
uint64_t kv_size;
GetVarint64(&get_slice,&kv_size);
@ -914,7 +912,7 @@ Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
return status;
}
// 注释:回收任务
// 注释:后台 GC 任务
void DBImpl::BackGroundGarbageCollection(){
uint64_t fid;
uint64_t last_sequence;
@ -923,7 +921,7 @@ void DBImpl::BackGroundGarbageCollection(){
if( !garbage_collection_management_->GetGarbageCollectionQueue(fid,last_sequence) ){
return;
}
// 在线的gc回收的sequence是要提前就分配好的。
// 在线的 gc 回收的 sequence 是要提前就分配好的。
CollectionValueLog(fid,last_sequence);
}
}
@ -933,13 +931,10 @@ void DBImpl::MaybeScheduleGarbageCollection() {
mutex_.AssertHeld();
if (background_GarbageCollection_scheduled_) {
// Already scheduled
// 先检查线程是否已经被调度了,如果已经被调度了,就直接退出
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
// 如果 DB 已经被关闭,那么就不调度了。
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
// 如果后台线程出错,也不调度。
} else {
// 设置调度变量,通过 detach 线程调度; detach 线程即使主线程退出,依然可以正常执行完成
background_GarbageCollection_scheduled_ = true;
@ -957,12 +952,9 @@ void DBImpl::GarbageCollectionBackgroundCall() {
assert(background_GarbageCollection_scheduled_);
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
// // 如果 DB 已经被关闭,那么就不调度了。
} else if (!bg_error_.ok()) {
// No more background work after a background error.
// 如果后台线程出错,也不调度。
} else {
// 开始后台 GC 线程
BackGroundGarbageCollection();
}
@ -1314,7 +1306,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
} else {
// begin 注释:drop 掉 LSM-tree 中的 kv 数值对了,
// 需要对属于 KV 分离的 kv 数值对进行 GC
// 对属于 KV 分离的 kv 数值对进行 GC
Slice drop_value = input->value();
if( ikey.type == kTypeSeparation ){
uint64_t fid = 0;
@ -1674,11 +1666,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// begin 注释:GC 流程中写回的 WriteBatch 在 CollectionValueLog 函数中已经设置好了
if (!write_batch->IsGarbageColletion()) {
if (!write_batch->IsGarbageCollection()) {
// 判断是否需要进行 GC
// 如需要,空出一块 sequence 区域, 触发 GC 将在 MakeRoomForWrite 里
// 先判断是否要进行 gc 后台回收
@ -1690,13 +1680,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
}
// SetSequence 在 write_batch 中写入本次的 sequence
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
// last_sequence += WriteBatchInternal::Count(write_batch);
last_sequence += WriteBatchInternal::Count(write_batch);
}
// 这里设置 last_sequence 是为了确保离线 GC 的时候,
// 在 map 存在的时候需要调用 ConvertQueue 给回收任务分配 sequence
versions_->SetLastSequence(last_sequence);
last_sequence += WriteBatchInternal::Count(write_batch);
vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch);
// end
@ -1779,8 +1767,8 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
Writer* w = *iter;
// begin 注释:写队列中如果遍历到是 gc 的 WriteBatch,停止合并
if (w->sync && !first->sync
|| first->batch->IsGarbageColletion()
|| w->batch->IsGarbageColletion()) {
|| first->batch->IsGarbageCollection()
|| w->batch->IsGarbageCollection()) {
// 当前 Writer要求 Sync ,而第一个 Writer 不要求 Sync,两个磁盘写入策略不一致。不做合并操作
// Do not include a sync write into a batch handled by a non-sync write.
break;
@ -2010,7 +1998,8 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetLogNumber(impl->logfile_number_);
// s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
// begin 注释:把 imm_last_sequence 设置到新的 manifest 当中,
// 即 RecoverLogFile 中的 imm -> sst 的情况,是一次成功的全盘恢复
// 即 RecoverLogFile 中判断上一次断电时的数据库状态的 imm -> sst 的情况,
// 表示一次成功的全盘恢复
impl->versions_->StartImmLastSequence(true);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
impl->versions_->StartImmLastSequence(false);

Двоични данни
image/kv_sep.png Целия файл

Преди След
Ширина: 1010  |  Височина: 711  |  Големина: 93 KiB

Двоични данни
image/kv_test.png Целия файл

Преди След
Ширина: 503  |  Височина: 29  |  Големина: 2.5 KiB

Двоични данни
image/test_1.jpg Целия файл

Преди След
Ширина: 501  |  Височина: 103  |  Големина: 8.8 KiB

Двоични данни
image/test_2.jpg Целия файл

Преди След
Ширина: 1104  |  Височина: 631  |  Големина: 83 KiB

Двоични данни
image/value_field_test.png Целия файл

Преди След
Ширина: 498  |  Височина: 32  |  Големина: 2.6 KiB

Двоични данни
image/version_1.jpg Целия файл

Преди След
Ширина: 756  |  Височина: 453  |  Големина: 38 KiB

Двоични данни
image/version_2.jpg Целия файл

Преди След
Ширина: 733  |  Височина: 176  |  Големина: 9.9 KiB

Двоични данни
image/version_3.jpg Целия файл

Преди След
Ширина: 750  |  Височина: 435  |  Големина: 28 KiB

Двоични данни
image/vlog.png Целия файл

Преди След
Ширина: 902  |  Височина: 357  |  Големина: 45 KiB

Двоични данни
image/write-badger.png Целия файл

Преди След
Ширина: 3220  |  Височина: 1695  |  Големина: 252 KiB

+ 2
- 2
include/leveldb/write_batch.h Целия файл

@ -76,9 +76,9 @@ class LEVELDB_EXPORT WriteBatch {
Status Iterate(Handler* handler) const;
Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const;
bool IsGarbageColletion() { return belong_to_gc; }
bool IsGarbageCollection() { return belong_to_gc; }
void setGarbageColletion(bool is_gc) { belong_to_gc = is_gc; }
void setGarbageCollection(bool is_gc) { belong_to_gc = is_gc; }
private:
friend class WriteBatchInternal;

+ 2
- 7
test/kv_test.cc Целия файл

@ -7,7 +7,7 @@
using namespace leveldb;
constexpr int short_value_size = 4;
constexpr int long_value_size = 32;
constexpr int long_value_size = 64;
constexpr int data_size = 512;
Status OpenDB(std::string dbName, DB **db) {
@ -65,16 +65,11 @@ TEST(TestKV, GetLongValue) {
ReadOptions readOptions;
Status status;
int key_num = data_size / long_value_size;
// for (int i = 0; i < key_num; i++) {
for (int i = key_num-1; i > -1; i--) {
// for (int i = 0; i < key_num - 1; i++) {
// int key_ = rand() % key_num+1;
for (int i = 0; i < key_num; i++) {
std::string key = std::to_string(i);
std::string value;
std::string expected_value(long_value_size, 'a');
status = db->Get(readOptions, key, &value);
// std::cout << key << std::endl;
// std::cout << status.ToString() << std::endl;
ASSERT_TRUE(status.ok());
EXPECT_EQ(expected_value, value);
}

Зареждане…
Отказ
Запис