GUJIEJASON пре 8 месеци
родитељ
комит
48eeab4efc
10 измењених фајлова са 1129 додато и 993 уклоњено
  1. +24
    -4
      benchmarks/db_bench.cc
  2. +1049
    -921
      benchmarks/db_bench_kv.cc
  3. +6
    -12
      db/db_impl.cc
  4. +10
    -10
      db/version_edit.cc
  5. +10
    -15
      db/version_edit.h
  6. +14
    -16
      db/version_set.cc
  7. +6
    -6
      db/version_set.h
  8. +1
    -0
      db/write_batch.cc
  9. +8
    -8
      test/bench_test.cc
  10. +1
    -1
      test/value_field_test.cc

+ 24
- 4
benchmarks/db_bench.cc Прегледај датотеку

@ -56,9 +56,13 @@ static const char* FLAGS_benchmarks =
"readreverse,"
"compact,"
"readrandom,"
"fillgivenseq,"
"fillgivenrandom,"
"findkeysbyfield,"
"readseq,"
"readreverse,"
"deleteseq,"
"deleterandom,"
"fill100K,";
// "crc32c,"
// "snappycomp,"
@ -69,17 +73,19 @@ static const char* FLAGS_benchmarks =
// Number of key/values to place in database
static int FLAGS_num = 1000000;
static int FLAGS_delete_num = 100000;
// Number of read operations to do. If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;
// Number of entries written with the given field for the FindKeysByField test. If negative, half of the FLAGS_num entries are written with the given field.
static int FLAGS_num_fields = 80000;
static int FLAGS_num_fields = 100000;
// Number of concurrent threads to run.
static int FLAGS_threads = 1;
// Size of each value
static int FLAGS_value_size = 100;
static int FLAGS_value_size = 1024;
// Arrange to generate values that shrink to this fraction of
// their original size after compression
@ -436,6 +442,7 @@ class Benchmark {
const FilterPolicy* filter_policy_;
DB* db_;
int num_;
int delete_num_;
int value_size_;
int entries_per_batch_;
WriteOptions write_options_;
@ -531,6 +538,7 @@ class Benchmark {
: nullptr),
db_(nullptr),
num_(FLAGS_num),
delete_num_(FLAGS_delete_num),
value_size_(FLAGS_value_size),
entries_per_batch_(1),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
@ -574,6 +582,7 @@ class Benchmark {
// Reset parameters that may be overridden below
num_ = FLAGS_num;
delete_num_ = FLAGS_delete_num;
reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
value_size_ = FLAGS_value_size;
entries_per_batch_ = 1;
@ -622,6 +631,12 @@ class Benchmark {
method = &Benchmark::SeekRandom;
} else if (name == Slice("seekordered")) {
method = &Benchmark::SeekOrdered;
} else if (name == Slice("fillgivenseq")){ // wesley add
fresh_db = true;
method = &Benchmark::WriteTargetSeq;
} else if (name == Slice("fillgivenrandom")){
fresh_db = true;
method = &Benchmark::WriteTargetRandom;
} else if (name == Slice("findkeysbyfield")) {
method = &Benchmark::FindKeysByField;
} else if (name == Slice("readhot")) {
@ -999,8 +1014,11 @@ class Benchmark {
void FindKeysByField(ThreadState* thread){
int found = 0;
Options options;
options.create_if_missing = true;
DBImpl* impl = new DBImpl(options, FLAGS_db);
FieldArray fields_to_find = {{"field2", "value2_"}};
std::vector<std::string> found_keys = Fields::FindKeysByFields(db_, fields_to_find);
std::vector<std::string> found_keys = Fields::FindKeysByFields(db_, fields_to_find, impl);
found = found_keys.size();
char msg[100];
snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_fields);
@ -1049,7 +1067,7 @@ class Benchmark {
WriteBatch batch;
Status s;
KeyBuffer key;
for (int i = 0; i < num_; i += entries_per_batch_) {
for (int i = 0; i < delete_num_; i += entries_per_batch_) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num));
@ -1167,6 +1185,8 @@ int main(int argc, char** argv) {
FLAGS_compression = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n;
} else if (sscanf(argv[i], "--delete_num=%d%c", &n, &junk) == 1) {
FLAGS_delete_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
FLAGS_reads = n;
} else if (sscanf(argv[i], "--num_fields=%d%c", &n, &junk) == 1) {

+ 1049
- 921
benchmarks/db_bench_kv.cc
Разлика између датотеке није приказан због своје велике величине
Прегледај датотеку


+ 6
- 12
db/db_impl.cc Прегледај датотеку

@ -410,7 +410,7 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
// // update the file number allocation counter in VersionSet.
// versions_->MarkFileNumberUsed(logs[i]);
// }
//TODO begin
//注释:逐个恢复日志的内容
bool found_sequence_pos = false;
for(int i = 0; i < logs.size(); ++i){
if( logs[i] < versions_->ImmLogFileNumber() ) {
@ -424,8 +424,7 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
return s;
}
}
versions_->MarkFileNumberUsed(max_number);
//TODO end
versions_->MarkFileNumberUsed(max_number);
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
@ -483,9 +482,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
uint64_t record_offset = 0;
int compactions = 0;
MemTable* mem = nullptr;
// TODO begin
//注释:设置 imm_last_sequence
uint64_t imm_last_sequence = versions_->ImmLastSequence();
// TODO end
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
// if (record.size() < 12) {
if (record.size() < 20) {
@ -541,10 +539,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
compactions++;
*save_manifest = true;
// TODO begin mem 落盘修改 imm_last_sequence,版本恢复
// 注释:mem 落盘修改 imm_last_sequence,版本恢复
versions_->SetImmLastSequence(mem->GetTailSequence());
versions_->SetImmLogFileNumber(log_number);
// TODO end
status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref();
@ -587,10 +584,9 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
// mem did not get reused; compact it.
if (status.ok()) {
// TODO begin mem 落盘修改 imm_last_sequence,版本恢复
//注释: mem 落盘修改 imm_last_sequence,版本恢复
versions_->SetImmLastSequence(mem->GetTailSequence());
versions_->SetImmLogFileNumber(log_number);
// TODO end
*save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr);
}
@ -664,14 +660,12 @@ void DBImpl::CompactMemTable() {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
// s = versions_->LogAndApply(&edit, &mutex_);
// TODO begin
//构建新版本,并将其加入到 version_当中
//注释: 构建新版本,并将其加入到 version_当中
versions_->StartImmLastSequence(true);
versions_->SetImmLastSequence(imm_->GetTailSequence());
versions_->SetImmLogFileNumber(imm_->GetLogFileNumber());
s = versions_->LogAndApply(&edit, &mutex_);
versions_->StartImmLastSequence(false);
// TODO end
}
if (s.ok()) {

+ 10
- 10
db/version_edit.cc Прегледај датотеку

@ -21,11 +21,10 @@ enum Tag {
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9,
// TODO begin 在版本中记录 immemtable 转到 sst的时候的 sequence,主要用来恢复的时候 定位db关闭的时候
// imm 和 mem中的内容在恢复的时候应该从log文件中哪里开始恢复。
// 注释: 用于记录immemtable到sst的sequence
kImmLastSequence = 10,
// 注释: 用于记录恢复immemtable和memtable时在log文件中对应的位置
kLogFile = 11
// TODO end
};
void VersionEdit::Clear() {
@ -35,20 +34,20 @@ void VersionEdit::Clear() {
last_sequence_ = 0;
next_file_number_ = 0;
// TODO begin
// 注释:重置为0
imm_last_sequence_ = 0;
// 注释:重置为0
imm_log_file_number_ = 0;
// TODO end
has_comparator_ = false;
has_log_number_ = false;
has_prev_log_number_ = false;
has_next_file_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false;
// TODO begin
// 注释:重置为false
has_imm_last_sequence_ = false;
// TODO end
// compact_pointers_.clear();
deleted_files_.clear();
@ -76,6 +75,7 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_);
}
// 注释:若 imm_last_sequence_ 有效,则写入对应的标识符和数据
if (has_imm_last_sequence_) {
PutVarint32(dst, kImmLastSequence);
PutVarint64(dst, imm_last_sequence_);
@ -180,7 +180,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
// TODO begin
// 注释:如果是kImmLastSequence类型,则解析imm_last_sequence_和imm_log_file_number_并将has_imm_last_sequence_ 设为true
case kImmLastSequence:
if (GetVarint64(&input, &imm_last_sequence_) && GetVarint64(&input, &imm_log_file_number_)) {
has_imm_last_sequence_ = true;
@ -188,7 +188,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
msg = "imemtable last sequence number";
}
break;
// TODO end
case kCompactPointer:
if (GetLevel(&input, &level) && GetInternalKey(&input, &key)) {

+ 10
- 15
db/version_edit.h Прегледај датотеку

@ -26,19 +26,17 @@ struct FileMetaData {
InternalKey largest; // Largest internal key served by table
};
// TODO begin
// vlog文件的元数据
struct LogMetaData {
LogMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
int refs; //
// Seeks allowed until compaction; 0,compaction操作了; allowed_seeks的值在sstable文件加入到version时确定
int allowed_seeks;
int allowed_seeks; //0,compaction操作了; allowed_seeks的值在sstable文件加入到version时确定
uint64_t number; //;sstable文件的名字是 number.ldb
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table key
InternalKey largest; // Largest internal key served by table key
uint64_t file_size; //
InternalKey smallest; //key
InternalKey largest; //key
};
// TODO end
class VersionEdit {
public:
@ -68,14 +66,13 @@ class VersionEdit {
last_sequence_ = seq;
}
// TODO begin
// imm_last_sequence_ imm sst的时候用
// Stores the sequence number `seq` and the log file number `fid` in this edit
// and marks the pair as present via has_imm_last_sequence_.
// NOTE(review): the original note says this is used when the immutable
// memtable is written out as an SST -- confirm against the callers.
void SetImmLastSequence(SequenceNumber seq, uint64_t fid) {
imm_log_file_number_ = fid;
imm_last_sequence_ = seq;
has_imm_last_sequence_ = true;
}
// TODO end
// Appends the (level, key) pair to compact_pointers_.
void SetCompactPointer(int level, const InternalKey& key) {
compact_pointers_.emplace_back(level, key);
}
@ -119,14 +116,12 @@ class VersionEdit {
bool has_next_file_number_;
bool has_last_sequence_;
// TODO begin
// imm_last_sequence_
// imm_last_sequence_
bool has_imm_last_sequence_;
// log的时候 memtable immemtabl中的位置
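// Sequence number stored with the kImmLastSequence tag.
// NOTE(review): appears to mark where log replay resumes for the memtable / immutable memtable on recovery -- confirm.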
SequenceNumber imm_last_sequence_;
// imm_last_sequence log文件
// Number of the log file associated with imm_last_sequence_.
uint64_t imm_log_file_number_;
// TODO end
std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;

+ 14
- 16
db/version_set.cc Прегледај датотеку

@ -252,16 +252,15 @@ enum SaverState {
kDeleted,
kCorrupt,
};
// TODO begin
// Tells whether the entry captured by a Saver is a separated key/value record.
// For found entries, SaveValue stores kSeparated when the parsed key type is
// kTypeSeparation and kNotSeparated otherwise.
enum SaverSeparate {
kNotSeparated,
kSeparated
};
// TODO end
struct Saver {
// TODO begin
// 注释:初始设为不分离
SaverSeparate separate = kNotSeparated;
// TODO end
SaverState state;
const Comparator* ucmp;
Slice user_key;
@ -279,9 +278,8 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
s->state = (parsed_key.type == kTypeValue || parsed_key.type == kTypeSeparation) ? kFound : kDeleted;
if (s->state == kFound) {
s->value->assign(v.data(), v.size());
// TODO begin
// 注释:如果key.type是kTypeSeparation,则设为kSeparated类型
s->separate = ( parsed_key.type == kTypeSeparation ) ? kSeparated : kNotSeparated;
// TODO end
}
}
}
@ -367,13 +365,12 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
// TODO begin
// 注释:对于是否kv分离,调用不同的Set函数
if( state->saver.separate == kSeparated ){
state->s.SetSeparated();
} else{
state->s.SetNotSeparated();
}
// TODO end
if (!state->s.ok()) {
state->found = true;
return false;
@ -761,6 +758,10 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
next_file_number_(2),
manifest_file_number_(0), // Filled by Recover()
last_sequence_(0),
//注释:加上version_edit中添加的参数
imm_last_sequence_(0),
imm_log_file_number_(0),
save_imm_last_sequence_(false),
log_number_(0),
prev_log_number_(0),
descriptor_file_(nullptr),
@ -809,11 +810,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);
// TODO begin
// 注释:设置imm_last_sequence_和imm_log_file_number_
if( SaveImmLastSequence() ){
edit->SetImmLastSequence(imm_last_sequence_,imm_log_file_number_);
}
// TODO end
Version* v = new Version(this);
{
@ -919,11 +919,11 @@ Status VersionSet::Recover(bool* save_manifest) {
bool have_next_file = false;
bool have_last_sequence = false;
// TODO begin
//注释:重置version_edit里添加的参数
bool have_imm_last_sequence = false;
uint64_t imm_last_sequence = 0;
uint64_t imm_log_file_number = 0;
// TODO end
uint64_t next_file = 0;
uint64_t last_sequence = 0;
@ -975,13 +975,12 @@ Status VersionSet::Recover(bool* save_manifest) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
// TODO begin
//注释: 构建当前的Version 回放参数
if (edit.has_imm_last_sequence_) {
imm_last_sequence = edit.imm_last_sequence_;
imm_log_file_number = edit.imm_log_file_number_;
have_imm_last_sequence = true;
}
// TODO end
}
}
delete file;
@ -1015,10 +1014,9 @@ Status VersionSet::Recover(bool* save_manifest) {
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
// TODO begin
//注释:修改imm_last_sequence_和imm_log_file_number_
imm_last_sequence_ = imm_last_sequence;
imm_log_file_number_ = imm_log_file_number;
// TODO end
// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {

+ 6
- 6
db/version_set.h Прегледај датотеку

@ -269,14 +269,14 @@ class VersionSet {
};
const char* LevelSummary(LevelSummaryStorage* scratch) const;
// TODO begin
//
// Returns the flag that LogAndApply() checks before copying
// imm_last_sequence_ / imm_log_file_number_ into the edit.
bool SaveImmLastSequence(){ return save_imm_last_sequence_; }
// Sets the flag read by SaveImmLastSequence() and returns the value just
// stored. The function is declared to return bool, so it must return a value:
// flowing off the end of a non-void function is undefined behaviour.
bool StartImmLastSequence(bool save ){
save_imm_last_sequence_ = save;
return save_imm_last_sequence_;
}
// Overwrites the stored imm_last_sequence_.
void SetImmLastSequence( uint64_t seq ){ imm_last_sequence_ = seq; }
// Returns the stored imm_last_sequence_.
uint64_t ImmLastSequence() const { return imm_last_sequence_; }
// Returns the stored imm_log_file_number_.
uint64_t ImmLogFileNumber() const { return imm_log_file_number_; }
// Overwrites the stored imm_log_file_number_.
void SetImmLogFileNumber( uint64_t fid ){ imm_log_file_number_ = fid; }
// TODO end
private:
class Builder;
@ -313,13 +313,13 @@ class VersionSet {
uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// TODO begin
// immemtable转sst的sequence
uint64_t imm_last_sequence_;
// imm sst时候的sequence LogAndApply mior compact major compact的过程
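// When true, LogAndApply() writes imm_last_sequence_ and imm_log_file_number_ into the edit (see SaveImmLastSequence()).
// NOTE(review): the original note suggests this separates the minor-compaction path from major compaction -- confirm.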
bool save_imm_last_sequence_;
// imm_last_sequence log文件
// Number of the log file associated with imm_last_sequence_.
uint64_t imm_log_file_number_;
// TODO end
// Opened lazily
WritableFile* descriptor_file_;

+ 1
- 0
db/write_batch.cc Прегледај датотеку

@ -34,6 +34,7 @@ WriteBatch::Handler::~Handler() = default;
// Resets the batch: clears the belong_to_gc flag, empties rep_, then resizes
// rep_ to kHeader elements (presumably the fixed batch header -- confirm).
void WriteBatch::Clear() {
belong_to_gc = false;
// NOTE(review): commented-out alternative left by the author; drop it if it is no longer needed.
// belong_to_gc = true;
rep_.clear();
rep_.resize(kHeader);
}

+ 8
- 8
test/bench_test.cc Прегледај датотеку

@ -10,11 +10,11 @@
using namespace leveldb;
// Number of key/values to operate in database
constexpr int num_ = 100000;
constexpr int num_ = 500000;
// Size of each value
constexpr int value_size_ = 1000;
constexpr int value_size_ = 1024;
// Number of read operations
constexpr int reads_ = 100000;
constexpr int reads_ = 500000;
Status OpenDB(std::string dbName, DB **db) {
Options options;
@ -85,7 +85,7 @@ void FindKeys(DB *db, std::vector &lats) {
FieldArray fields_to_find = {{"field", "old_value_" }};
auto start_time = std::chrono::steady_clock::now();
std::string dbname_ = "benchmark_db";
std::string dbname_ = "bench_resr_db";
Options options;
options.create_if_missing = true;
DBImpl* impl = new DBImpl(options, dbname_);
@ -153,11 +153,11 @@ void RunBenchmark(const char* name, Func func, bool setup_data = true, bool setu
delete db;
}
// TEST(BenchTest, PutLatency) { RunBenchmark("Put", InsertData, false, false); }
// TEST(BenchTest, PutFieldsLatency) { RunBenchmark("PutFields", InsertFields, false, false); }
TEST(BenchTest, PutLatency) { RunBenchmark("Put", InsertData, false, false); }
TEST(BenchTest, PutFieldsLatency) { RunBenchmark("PutFields", InsertFields, false, false); }
// TEST(BenchTest, GetLatency) { RunBenchmark("Get", GetData, true, false); }
// TEST(BenchTest, IteratorLatency) { RunBenchmark("Iterator", ReadOrdered, true, false); }
TEST(BenchTest, GetLatency) { RunBenchmark("Get", GetData, true, false); }
TEST(BenchTest, IteratorLatency) { RunBenchmark("Iterator", ReadOrdered, true, false); }
TEST(BenchTest, FindKeysByFieldLatency) {
RunBenchmark("FindKeysByFields", FindKeys, false, true);

+ 1
- 1
test/value_field_test.cc Прегледај датотеку

@ -28,7 +28,7 @@ class FieldsTest : public ::testing::Test {
}
DB* db_ = nullptr; // 数据库实例指针。
std::string dbname_ = "testdb"; // 记录数据库路径
std::string dbname_ = "testdb_field"; // 记录数据库路径
};
// 测试各种构造函数

Loading…
Откажи
Сачувај