35 Commits

Author SHA1 Message Date
  VirgilZhu 9c3c34dda7 finish project 8 months ago
  VirgilZhu 3c1d0d49e6 add notes 8 months ago
  VirgilZhu c34e37ee0c fix some bugs 8 months ago
  VirgilZhu 5f7d8de5d3 commit for note 8 months ago
  GUJIEJASON 48eeab4efc finish version 8 months ago
  GUJIEJASON 5ca7f7f3d5 temp work 8 months ago
  GUJIEJASON 162194ea5c Merge branch 'zwq' into jie 8 months ago
  VirgilZhu accd7e616c roughly completed 8 months ago
  VirgilZhu b6143be065 fix ParseInternalKey bug 8 months ago
  GUJIEJASON 228b316445 temp work 8 months ago
  VirgilZhu eedabe8dc3 tmp work 8 months ago
  GUJIEJASON 29d0bdb93b fix some bugs 8 months ago
  VirgilZhu 77e7772487 commit for debug_kv_sep 8 months ago
  VirgilZhu 43b44e3167 kv separation v1.0 8 months ago
  GUJIEJASON 31658444b8 Add bench_test.cc 8 months ago
  GUJIEJASON 11fc9d3a44 recover and version control 8 months ago
  GUJIEJASON 750d5cd1c9 fix some bugs inGet 8 months ago
  VirgilZhu fcc67b09de vlog_reader/writer v2.1 8 months ago
  GUJIEJASON 01dd8e75fe Add KV test 8 months ago
  VirgilZhu 1f1a6de7b2 vlog_reader/writer v2.0 8 months ago
  VirgilZhu 5b4c09301c vlog reader/writer v1.1 8 months ago
  VirgilZhu 10baaeb199 vlog_reader/writer v1.0 8 months ago
  VirgilZhu 6df1a279e1 vlog_reader v1.0 8 months ago
  GUJIEJASON af6de4df42 Add VlogWriter 9 months ago
  GUJIEJASON 04e46dde27 kv start 9 months ago
  GUJIEJASON b858e53230 Merge branch 'zwq' into jie 9 months ago
  GUJIEJASON 1d7f56bbd8 Update README 9 months ago
  VirgilZhu 041bede9e4 kv sep v0.1: add vlog_manager/reader/writer interface 9 months ago
  GUJIEJASON cf8e758754 finish value_field_test.cc 9 months ago
  VirgilZhu 4507dc27d0 fields v2.2: complete FindKeysByFields function 9 months ago
  GUJIEJASON 805eaff781 fix some bugs 9 months ago
  GUJIEJASON a21a1cc28e update value_field_test 9 months ago
  VirgilZhu 075e6a0205 fields v2.1: add possibly feasible ktypevalue/ktypedeletion parse in FindKeysByFields function 9 months ago
  VirgilZhu 04d5574577 fields v2: add FindKeysByFields Function 9 months ago
  VirgilZhu 64fd7be5f8 fields v1: finish DBImpl interface and Class Fields definition 9 months ago
53 changed files with 5086 additions and 224 deletions
Unified View
  1. +36
    -3
      CMakeLists.txt
  2. +1482
    -130
      README.md
  3. +106
    -12
      benchmarks/db_bench.cc
  4. +1218
    -0
      benchmarks/db_bench_kv.cc
  5. +1
    -1
      db/c.cc
  6. +549
    -53
      db/db_impl.cc
  7. +48
    -2
      db/db_impl.h
  8. +1
    -0
      db/db_iter.cc
  9. +2
    -0
      db/db_test.cc
  10. +4
    -1
      db/dbformat.h
  11. +3
    -1
      db/dumpfile.cc
  12. +1
    -0
      db/fault_injection_test.cc
  13. +292
    -0
      db/fields.cpp
  14. +77
    -0
      db/fields.h
  15. +104
    -0
      db/kv_separate_management.cc
  16. +69
    -0
      db/kv_separate_management.h
  17. +1
    -0
      db/leveldbutil.cc
  18. +5
    -0
      db/log_format.h
  19. +1
    -0
      db/log_test.cc
  20. +8
    -1
      db/memtable.cc
  21. +7
    -0
      db/memtable.h
  22. +34
    -3
      db/version_edit.cc
  23. +27
    -0
      db/version_edit.h
  24. +45
    -3
      db/version_set.cc
  25. +17
    -0
      db/version_set.h
  26. +98
    -0
      db/vlog_reader.cc
  27. +59
    -0
      db/vlog_reader.h
  28. +42
    -0
      db/vlog_writer.cc
  29. +44
    -0
      db/vlog_writer.h
  30. +78
    -5
      db/write_batch.cc
  31. +2
    -0
      db/write_batch_internal.h
  32. +7
    -2
      helpers/memenv/memenv.cc
  33. BIN
      image/kv_sep.png
  34. BIN
      image/kv_test.png
  35. BIN
      image/test_1.jpg
  36. BIN
      image/test_2.jpg
  37. BIN
      image/value_field_test.png
  38. BIN
      image/version_1.jpg
  39. BIN
      image/version_2.jpg
  40. BIN
      image/version_3.jpg
  41. BIN
      image/vlog.png
  42. BIN
      image/write-badger.png
  43. +6
    -2
      include/leveldb/db.h
  44. +7
    -0
      include/leveldb/env.h
  45. +15
    -1
      include/leveldb/options.h
  46. +13
    -1
      include/leveldb/status.h
  47. +12
    -2
      include/leveldb/write_batch.h
  48. +1
    -0
      table/table_test.cc
  49. +169
    -0
      test/bench_test.cc
  50. +82
    -0
      test/kv_test.cc
  51. +1
    -1
      test/ttl_test.cc
  52. +253
    -0
      test/value_field_test.cc
  53. +59
    -0
      util/env_posix.cc

+ 36
- 3
CMakeLists.txt View File

@ -117,10 +117,18 @@ endif(BUILD_SHARED_LIBS)
# Must be included before CMAKE_INSTALL_INCLUDEDIR is used. # Must be included before CMAKE_INSTALL_INCLUDEDIR is used.
include(GNUInstallDirs) include(GNUInstallDirs)
add_library(leveldb "")
add_library(leveldb
db/vlog_reader.h
db/vlog_reader.cc
db/vlog_writer.h
db/vlog_writer.cc
db/kv_separate_management.cc
db/kv_separate_management.h)
target_sources(leveldb target_sources(leveldb
PRIVATE PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
"db/fields.cpp"
"db/fields.h"
"db/builder.cc" "db/builder.cc"
"db/builder.h" "db/builder.h"
"db/c.cc" "db/c.cc"
@ -310,7 +318,8 @@ if(LEVELDB_BUILD_TESTS)
APPEND PROPERTY COMPILE_OPTIONS -Wno-missing-field-initializers) APPEND PROPERTY COMPILE_OPTIONS -Wno-missing-field-initializers)
endif(LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS) endif(LEVELDB_HAVE_NO_MISSING_FIELD_INITIALIZERS)
add_executable(leveldb_tests "")
add_executable(leveldb_tests ""
test/kv_test.cc)
target_sources(leveldb_tests target_sources(leveldb_tests
PRIVATE PRIVATE
# "db/fault_injection_test.cc" # "db/fault_injection_test.cc"
@ -521,11 +530,35 @@ endif(LEVELDB_INSTALL)
add_executable(db_test2 add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc" "${PROJECT_SOURCE_DIR}/test/db_test2.cc"
test/value_field_test.cc
) )
target_link_libraries(db_test2 PRIVATE leveldb) target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc" "${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
test/value_field_test.cc
) )
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(value_field_test
"${PROJECT_SOURCE_DIR}/test/value_field_test.cc"
test/value_field_test.cc
)
target_link_libraries(value_field_test PRIVATE leveldb gtest)
add_executable(kv_test
"${PROJECT_SOURCE_DIR}/test/kv_test.cc"
test/kv_test.cc
db/kv_separate_management.cc
db/kv_separate_management.h
)
target_link_libraries(kv_test PRIVATE leveldb gtest)
add_executable(bench_test
"${PROJECT_SOURCE_DIR}/test/bench_test.cc"
test/bench_test.cc
db/kv_separate_management.cc
db/kv_separate_management.h
)
target_link_libraries(bench_test PRIVATE leveldb gtest)

+ 1482
- 130
README.md
File diff suppressed because it is too large
View File


+ 106
- 12
benchmarks/db_bench.cc View File

@ -20,6 +20,7 @@
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "db/fields.h"
// Comma-separated list of operations to run in the specified order // Comma-separated list of operations to run in the specified order
// Actual benchmarks: // Actual benchmarks:
@ -55,26 +56,36 @@ static const char* FLAGS_benchmarks =
"readreverse," "readreverse,"
"compact," "compact,"
"readrandom," "readrandom,"
"fillgivenseq,"
"fillgivenrandom,"
"findkeysbyfield,"
"readseq," "readseq,"
"readreverse," "readreverse,"
"fill100K,"
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"zstdcomp,"
"zstduncomp,";
"deleteseq,"
"deleterandom,"
"fill100K,";
// "crc32c,"
// "snappycomp,"
// "snappyuncomp,"
// "zstdcomp,"
// "zstduncomp,";
// Number of key/values to place in database // Number of key/values to place in database
static int FLAGS_num = 1000000; static int FLAGS_num = 1000000;
static int FLAGS_delete_num = 100000;
// Number of read operations to do. If negative, do FLAGS_num reads. // Number of read operations to do. If negative, do FLAGS_num reads.
static int FLAGS_reads = -1; static int FLAGS_reads = -1;
// Number of given fields used in FindKeysByField test. If negative, write in half of FLAGS_num targets with given field.
static int FLAGS_num_fields = 100000;
// Number of concurrent threads to run. // Number of concurrent threads to run.
static int FLAGS_threads = 1; static int FLAGS_threads = 1;
// Size of each value // Size of each value
static int FLAGS_value_size = 100;
static int FLAGS_value_size = 1024;
// Arrange to generate values that shrink to this fraction of // Arrange to generate values that shrink to this fraction of
// their original size after compression // their original size after compression
@ -124,7 +135,7 @@ static bool FLAGS_reuse_logs = false;
static bool FLAGS_compression = true; static bool FLAGS_compression = true;
// Use the db with the following name. // Use the db with the following name.
static const char* FLAGS_db = nullptr;
static const char* FLAGS_db = "benchmark_db";
// ZSTD compression level to try out // ZSTD compression level to try out
static int FLAGS_zstd_compression_level = 1; static int FLAGS_zstd_compression_level = 1;
@ -431,6 +442,7 @@ class Benchmark {
const FilterPolicy* filter_policy_; const FilterPolicy* filter_policy_;
DB* db_; DB* db_;
int num_; int num_;
int delete_num_;
int value_size_; int value_size_;
int entries_per_batch_; int entries_per_batch_;
WriteOptions write_options_; WriteOptions write_options_;
@ -438,6 +450,7 @@ class Benchmark {
int heap_counter_; int heap_counter_;
CountComparator count_comparator_; CountComparator count_comparator_;
int total_thread_count_; int total_thread_count_;
int num_fields; // 插入的fields数量
void PrintHeader() { void PrintHeader() {
const int kKeySize = 16 + FLAGS_key_prefix; const int kKeySize = 16 + FLAGS_key_prefix;
@ -525,12 +538,14 @@ class Benchmark {
: nullptr), : nullptr),
db_(nullptr), db_(nullptr),
num_(FLAGS_num), num_(FLAGS_num),
delete_num_(FLAGS_delete_num),
value_size_(FLAGS_value_size), value_size_(FLAGS_value_size),
entries_per_batch_(1), entries_per_batch_(1),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
heap_counter_(0), heap_counter_(0),
count_comparator_(BytewiseComparator()), count_comparator_(BytewiseComparator()),
total_thread_count_(0) {
total_thread_count_(0),
num_fields(FLAGS_num_fields < 0 ? FLAGS_num / 2 : FLAGS_num_fields) {
std::vector<std::string> files; std::vector<std::string> files;
g_env->GetChildren(FLAGS_db, &files); g_env->GetChildren(FLAGS_db, &files);
for (size_t i = 0; i < files.size(); i++) { for (size_t i = 0; i < files.size(); i++) {
@ -567,6 +582,7 @@ class Benchmark {
// Reset parameters that may be overridden below // Reset parameters that may be overridden below
num_ = FLAGS_num; num_ = FLAGS_num;
delete_num_ = FLAGS_delete_num;
reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads); reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
value_size_ = FLAGS_value_size; value_size_ = FLAGS_value_size;
entries_per_batch_ = 1; entries_per_batch_ = 1;
@ -615,6 +631,14 @@ class Benchmark {
method = &Benchmark::SeekRandom; method = &Benchmark::SeekRandom;
} else if (name == Slice("seekordered")) { } else if (name == Slice("seekordered")) {
method = &Benchmark::SeekOrdered; method = &Benchmark::SeekOrdered;
} else if (name == Slice("fillgivenseq")){ // wesley add
fresh_db = true;
method = &Benchmark::WriteTargetSeq;
} else if (name == Slice("fillgivenrandom")){
fresh_db = true;
method = &Benchmark::WriteTargetRandom;
} else if (name == Slice("findkeysbyfield")) {
method = &Benchmark::FindKeysByField;
} else if (name == Slice("readhot")) { } else if (name == Slice("readhot")) {
method = &Benchmark::ReadHot; method = &Benchmark::ReadHot;
} else if (name == Slice("readrandomsmall")) { } else if (name == Slice("readrandomsmall")) {
@ -852,8 +876,11 @@ class Benchmark {
for (int j = 0; j < entries_per_batch_; j++) { for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num); const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k); key.Set(k);
batch.Put(key.slice(), gen.Generate(value_size_));
bytes += value_size_ + key.slice().size();
FieldArray fields = {{"field1", "value1_" + std::to_string(i)}, {"field2", "value2_"}};
Fields ffields(fields);
db_->PutFields(WriteOptions(), key.slice(), ffields);
// batch.Put(key.slice(), gen.Generate(value_size_));
bytes += ffields.size() + key.slice().size();
thread->stats.FinishedSingleOp(); thread->stats.FinishedSingleOp();
} }
s = db_->Write(write_options_, &batch); s = db_->Write(write_options_, &batch);
@ -935,6 +962,69 @@ class Benchmark {
} }
} }
void WriteTargetSeq(ThreadState* thread) { WriteGiven(thread, true); }
void WriteTargetRandom(ThreadState* thread) { WriteGiven(thread, false); }
void WriteGiven(ThreadState* thread, bool seq) {
if (num_ != FLAGS_num) {
char msg[100];
std::snprintf(msg, sizeof(msg), "(%d ops)", num_);
thread->stats.AddMessage(msg);
}
RandomGenerator gen;
WriteBatch batch;
Status s;
int64_t bytes = 0;
KeyBuffer key;
for (int i = 0; i < num_; i += entries_per_batch_) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k);
FieldArray fields;
auto value = gen.Generate(value_size_);
if (i < num_fields) {
fields = {
{"field1", value.ToString()},
{"field2", "value2_"},
};
} else {
fields = {
{"field1", value.ToString()},
};
}
Fields ffields(fields);
db_->PutFields(WriteOptions(), key.slice(), ffields);
bytes += ffields.size() + key.slice().size();
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);
if (!s.ok()) {
std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
std::exit(1);
}
}
thread->stats.AddBytes(bytes);
}
void FindKeysByField(ThreadState* thread){
int found = 0;
Options options;
options.create_if_missing = true;
DBImpl* impl = new DBImpl(options, FLAGS_db);
FieldArray fields_to_find = {{"field2", "value2_"}};
std::vector<std::string> found_keys = Fields::FindKeysByFields(db_, fields_to_find, impl);
found = found_keys.size();
char msg[100];
snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_fields);
thread->stats.AddMessage(msg);
}
void SeekRandom(ThreadState* thread) { void SeekRandom(ThreadState* thread) {
ReadOptions options; ReadOptions options;
int found = 0; int found = 0;
@ -977,7 +1067,7 @@ class Benchmark {
WriteBatch batch; WriteBatch batch;
Status s; Status s;
KeyBuffer key; KeyBuffer key;
for (int i = 0; i < num_; i += entries_per_batch_) {
for (int i = 0; i < delete_num_; i += entries_per_batch_) {
batch.Clear(); batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) { for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num)); const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num));
@ -1095,8 +1185,12 @@ int main(int argc, char** argv) {
FLAGS_compression = n; FLAGS_compression = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n; FLAGS_num = n;
} else if (sscanf(argv[i], "--delete_num=%d%c", &n, &junk) == 1) {
FLAGS_delete_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
FLAGS_reads = n; FLAGS_reads = n;
} else if (sscanf(argv[i], "--num_fields=%d%c", &n, &junk) == 1) {
FLAGS_num_fields = n;
} else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
FLAGS_threads = n; FLAGS_threads = n;
} else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {

+ 1218
- 0
benchmarks/db_bench_kv.cc
File diff suppressed because it is too large
View File


+ 1
- 1
db/c.cc View File

@ -349,7 +349,7 @@ void leveldb_writebatch_iterate(const leveldb_writebatch_t* b, void* state,
void* state_; void* state_;
void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen); void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen);
void (*deleted_)(void*, const char* k, size_t klen); void (*deleted_)(void*, const char* k, size_t klen);
void Put(const Slice& key, const Slice& value) override {
void Put(const Slice& key, const Slice& value, leveldb::ValueType type = leveldb::kTypeValue) override {
(*put_)(state_, key.data(), key.size(), value.data(), value.size()); (*put_)(state_, key.data(), key.size(), value.data(), value.size());
} }
void Delete(const Slice& key) override { void Delete(const Slice& key) override {

+ 549
- 53
db/db_impl.cc View File

@ -11,7 +11,9 @@
#include <set> #include <set>
#include <string> #include <string>
#include <vector> #include <vector>
#include <iostream>
#include "fields.h"
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -35,8 +37,12 @@
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "db/vlog_reader.h"
namespace leveldb { namespace leveldb {
using namespace log;
const int kNumNonTableCacheFiles = 10; const int kNumNonTableCacheFiles = 10;
// Information kept for every waiting writer // Information kept for every waiting writer
@ -136,16 +142,25 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(nullptr), db_lock_(nullptr),
shutting_down_(false), shutting_down_(false),
background_work_finished_signal_(&mutex_), background_work_finished_signal_(&mutex_),
garbage_collection_work_signal_(&mutex_),
mem_(nullptr), mem_(nullptr),
imm_(nullptr), imm_(nullptr),
has_imm_(false), has_imm_(false),
logfile_(nullptr), logfile_(nullptr),
logfile_number_(0), logfile_number_(0),
log_(nullptr),
seed_(0), seed_(0),
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
background_compaction_scheduled_(false), background_compaction_scheduled_(false),
background_GarbageCollection_scheduled_(false),
finish_back_garbage_collection_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
vlog_(nullptr),
vlog_kv_numbers_(0),
garbage_collection_management_(new SeparateManagement(raw_options.garbage_collection_threshold) ),
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {} &internal_comparator_)) {}
@ -156,6 +171,9 @@ DBImpl::~DBImpl() {
while (background_compaction_scheduled_) { while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait(); background_work_finished_signal_.Wait();
} }
while(background_GarbageCollection_scheduled_){
garbage_collection_work_signal_.Wait();
}
mutex_.Unlock(); mutex_.Unlock();
if (db_lock_ != nullptr) { if (db_lock_ != nullptr) {
@ -166,7 +184,7 @@ DBImpl::~DBImpl() {
if (mem_ != nullptr) mem_->Unref(); if (mem_ != nullptr) mem_->Unref();
if (imm_ != nullptr) imm_->Unref(); if (imm_ != nullptr) imm_->Unref();
delete tmp_batch_; delete tmp_batch_;
delete log_;
delete vlog_;
delete logfile_; delete logfile_;
delete table_cache_; delete table_cache_;
@ -213,6 +231,14 @@ Status DBImpl::NewDB() {
return s; return s;
} }
Env* DBImpl::GetEnv() const {
return env_;
}
std::string DBImpl::GetDBName() const {
return dbname_;
}
void DBImpl::MaybeIgnoreError(Status* s) const { void DBImpl::MaybeIgnoreError(Status* s) const {
if (s->ok() || options_.paranoid_checks) { if (s->ok() || options_.paranoid_checks) {
// No change needed // No change needed
@ -346,11 +372,17 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
uint64_t number; uint64_t number;
FileType type; FileType type;
std::vector<uint64_t> logs; std::vector<uint64_t> logs;
uint64_t max_number = 0;
for (size_t i = 0; i < filenames.size(); i++) { for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) { if (ParseFileName(filenames[i], &number, &type)) {
// begin 注释: max_number 为要恢复的最新的文件编号
if (number > max_number) max_number = number;
// expected 里的文件现在依然存在,可以删除
expected.erase(number); expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
// 保存当前已有的 vlog 文件,基于它们进行恢复
if (type == kLogFile)
logs.push_back(number); logs.push_back(number);
// end
} }
} }
if (!expected.empty()) { if (!expected.empty()) {
@ -362,18 +394,34 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
// Recover in the order in which the logs were generated // Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end()); std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) {
return s;
}
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
assert( logs.size() == 0 || logs[logs.size() - 1] >= versions_->ImmLogFileNumber() );
// for (size_t i = 0; i < logs.size(); i++) {
// s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
// &max_sequence);
// if (!s.ok()) {
// return s;
// }
// // The previous incarnation may not have written any MANIFEST
// // records after allocating this log number. So we manually
// // update the file number allocation counter in VersionSet.
// versions_->MarkFileNumberUsed(logs[i]);
// }
//注释:逐个恢复日志的内容
bool found_sequence_pos = false;
for(int i = 0; i < logs.size(); ++i){
if( logs[i] < versions_->ImmLogFileNumber() ) {
continue;
}
Log(options_.info_log, "RecoverLogFile old log: %06llu \n", static_cast<unsigned long long>(logs[i]));
// 重做日志操作
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence, found_sequence_pos);
if (!s.ok()) {
return s;
}
} }
versions_->MarkFileNumberUsed(max_number);
if (versions_->LastSequence() < max_sequence) { if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence); versions_->SetLastSequence(max_sequence);
@ -382,9 +430,11 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
return Status::OK(); return Status::OK();
} }
/* 恢复内存中的数据,将 VLog 中记录的操作读取出来,重新写入到 memtable */
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
bool* save_manifest, VersionEdit* edit, bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) {
SequenceNumber* max_sequence,
bool& found_sequence_pos) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Env* env; Env* env;
Logger* info_log; Logger* info_log;
@ -427,21 +477,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
uint64_t record_offset = 0;
int compactions = 0; int compactions = 0;
MemTable* mem = nullptr; MemTable* mem = nullptr;
//注释:设置 imm_last_sequence
uint64_t imm_last_sequence = versions_->ImmLastSequence();
while (reader.ReadRecord(&record, &scratch) && status.ok()) { while (reader.ReadRecord(&record, &scratch) && status.ok()) {
if (record.size() < 12) {
// if (record.size() < 12) {
if (record.size() < 20) {
reporter.Corruption(record.size(), reporter.Corruption(record.size(),
Status::Corruption("log record too small")); Status::Corruption("log record too small"));
continue; continue;
} }
// begin 如果 imm_last_sequence == 0 的话,
// 那么整个说明没有进行一次 imm 转 sst的情况,所有的 log 文件都需要进行回收
// 回收编号最大的 log 文件即可
if( !found_sequence_pos && imm_last_sequence != 0 ){
Slice tmp = record;
tmp.remove_prefix(8);
uint64_t seq = DecodeFixed64(tmp.data());
tmp.remove_prefix(8);
uint64_t kv_numbers = DecodeFixed32(tmp.data());
// 解析出来的 seq 不符合要求跳过。恢复时定位 seq 位置一定要大于等于 versions_->LastSequence()
if( ( seq + kv_numbers - 1 ) < imm_last_sequence ) {
record_offset += record.size();
continue;
}else if( ( seq + kv_numbers - 1 ) == imm_last_sequence ){
// open db 落盘过 sst,再一次打开 db
found_sequence_pos = true;
record_offset += record.size();
continue;
} else { // open db 之后没有落盘过 sst,然后关闭 db,第二次恢复的时候
found_sequence_pos = true;
}
}
// 去除头部信息 crc 和length
record.remove_prefix(log::vHeaderSize);
// end
WriteBatchInternal::SetContents(&batch, record); WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) { if (mem == nullptr) {
mem = new MemTable(internal_comparator_); mem = new MemTable(internal_comparator_);
mem->Ref(); mem->Ref();
} }
status = WriteBatchInternal::InsertInto(&batch, mem);
// status = WriteBatchInternal::InsertInto(&batch, mem);
status = WriteBatchInternal::InsertInto(&batch, mem,log_number,record_offset + 4);
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
break; break;
@ -455,6 +536,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++; compactions++;
*save_manifest = true; *save_manifest = true;
// 注释:mem 落盘修改 imm_last_sequence,版本恢复
versions_->SetImmLastSequence(mem->GetTailSequence());
versions_->SetImmLogFileNumber(log_number);
status = WriteLevel0Table(mem, edit, nullptr); status = WriteLevel0Table(mem, edit, nullptr);
mem->Unref(); mem->Unref();
mem = nullptr; mem = nullptr;
@ -464,6 +550,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
break; break;
} }
} }
// 前面已经移除了一个头部了,所以偏移位置要个头部
record_offset += record.size() + log::vHeaderSize ;
} }
delete file; delete file;
@ -471,13 +559,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
// See if we should keep reusing the last log file. // See if we should keep reusing the last log file.
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == nullptr); assert(logfile_ == nullptr);
assert(log_ == nullptr);
assert(vlog_ == nullptr);
assert(mem_ == nullptr); assert(mem_ == nullptr);
uint64_t lfile_size; uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() && if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) { env_->NewAppendableFile(fname, &logfile_).ok()) {
Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
vlog_ = new log::VlogWriter(logfile_, lfile_size);
logfile_number_ = log_number; logfile_number_ = log_number;
if (mem != nullptr) { if (mem != nullptr) {
mem_ = mem; mem_ = mem;
@ -493,6 +581,10 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
if (mem != nullptr) { if (mem != nullptr) {
// mem did not get reused; compact it. // mem did not get reused; compact it.
if (status.ok()) { if (status.ok()) {
//注释: mem 落盘修改 imm_last_sequence,版本恢复
versions_->SetImmLastSequence(mem->GetTailSequence());
versions_->SetImmLogFileNumber(log_number);
*save_manifest = true; *save_manifest = true;
status = WriteLevel0Table(mem, edit, nullptr); status = WriteLevel0Table(mem, edit, nullptr);
} }
@ -565,7 +657,13 @@ void DBImpl::CompactMemTable() {
if (s.ok()) { if (s.ok()) {
edit.SetPrevLogNumber(0); edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
// s = versions_->LogAndApply(&edit, &mutex_);
//注释: 构建新版本,并将其加入到 version_当中
versions_->StartImmLastSequence(true);
versions_->SetImmLastSequence(imm_->GetTailSequence());
versions_->SetImmLogFileNumber(imm_->GetLogFileNumber());
s = versions_->LogAndApply(&edit, &mutex_); s = versions_->LogAndApply(&edit, &mutex_);
versions_->StartImmLastSequence(false);
} }
if (s.ok()) { if (s.ok()) {
@ -661,6 +759,8 @@ void DBImpl::RecordBackgroundError(const Status& s) {
if (bg_error_.ok()) { if (bg_error_.ok()) {
bg_error_ = s; bg_error_ = s;
background_work_finished_signal_.SignalAll(); background_work_finished_signal_.SignalAll();
// 注释:唤醒后台 GC 线程
garbage_collection_work_signal_.SignalAll();
} }
} }
@ -681,6 +781,190 @@ void DBImpl::MaybeScheduleCompaction() {
} }
} }
// 注释:获取所有 VLogs
Status DBImpl::GetAllValueLog(std::string dir, std::vector<uint64_t>& logs) {
logs.clear();
std::vector<std::string> filenames;
// 获取 VLogs 列表
Status s = env_->GetChildren(dir, &filenames);
if (!s.ok()) {
return s;
}
uint64_t number;
FileType type;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
// 获取所有 .log 文件
if (type == kLogFile)
logs.push_back(number);
}
}
return s;
}
// 注释:手动进行离线回收
Status DBImpl::OutLineGarbageCollection(){
MutexLock l(&mutex_);
Status s;
// map_file_info_ 非空,则根据它进行回收,否则进行所有 VLog 的回收
if (!garbage_collection_management_->EmptyMap()) {
garbage_collection_management_->CollectionMap();
uint64_t last_sequence = versions_->LastSequence();
garbage_collection_management_->ConvertQueue(last_sequence);
versions_->SetLastSequence(last_sequence);
MaybeScheduleGarbageCollection();
return Status();
}
return s;
}
// 注释:在线 GC,读取并回收一个 vlog 文件,
// next_sequence 指的是第一个没有用到的 sequence(由于是在线 GC ,所以需要提前指定)
Status DBImpl::CollectionValueLog(uint64_t fid, uint64_t& next_sequence) {
struct LogReporter : public log::VlogReader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
LogReporter report;
std::string logName = LogFileName(dbname_, fid);
SequentialFile* lfile;
Status status = env_->NewSequentialFile(logName, &lfile);
if (!status.ok()) {
Log(options_.info_log, "Garbage Collection Open file error: %s", status.ToString().c_str());
return status;
}
log::VlogReader reader(lfile, &report);
Slice record;
std::string scratch;
// record_offset 是每条 record 相对 VLog head 的偏移
uint64_t record_offset = 0;
uint64_t size_offset = 0;
WriteOptions opt(options_.background_garbage_collection_separate_);
WriteBatch batch(opt.separate_threshold);
batch.setGarbageCollection(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
while (reader.ReadRecord(&record, &scratch)) {
const char* head_record_ptr = record.data();
record.remove_prefix(log::vHeaderSize + log::wHeaderSize);
while (record.size() > 0) {
const char* head_kv_ptr = record.data();
uint64_t kv_offset = record_offset + head_kv_ptr - head_record_ptr;
ValueType type = static_cast<ValueType>(record[0]);
record.remove_prefix(1);
Slice key;
Slice value;
std::string get_value;
GetLengthPrefixedSlice(&record,&key);
if (type != kTypeDeletion) {
GetLengthPrefixedSlice(&record,&value);
}
if (type != kTypeSeparation) {
continue;
}
status = this->GetLsm(key,&get_value);
// 1. 从 LSM-tree 中找不到 key,说明这个 key 被删除了,vlog中要丢弃
// 2. 找到了 key,但是最新的 kv 对不是 KV 分离的情况,也丢弃
if (status.IsNotFound() || !status.IsSeparated()) {
continue;
}
if (!status.ok()) {
std::cout<<"read the file error "<<std::endl;
return status;
}
// 判断是否要丢弃旧值
Slice get_slice(get_value);
uint64_t lsm_fid;
uint64_t lsm_offset;
GetVarint64(&get_slice,&lsm_fid);
GetVarint64(&get_slice,&lsm_offset);
if (fid == lsm_fid && lsm_offset == kv_offset) {
batch.Put(key, value);
++next_sequence;
if (kv_offset - size_offset > config::gcWriteBatchSize) {
Write(opt, &batch);
batch.Clear();
batch.setGarbageCollection(true);
WriteBatchInternal::SetSequence(&batch, next_sequence);
uint64_t kv_size;
GetVarint64(&get_slice,&kv_size);
size_offset = kv_offset + kv_size;
}
}
}
record_offset += record.data() - head_record_ptr;
}
Write(opt, &batch);
status = env_->RemoveFile(logName);
if (status.ok()) {
garbage_collection_management_->RemoveFileFromMap(fid);
}
return status;
}
// Background GC task: drain the garbage-collection queue and reclaim
// each value-log file in turn.
void DBImpl::BackGroundGarbageCollection(){
  uint64_t fid;
  uint64_t last_sequence;
  while( true){
    // Fetch the next file to collect; stop when the queue is empty.
    if( !garbage_collection_management_->GetGarbageCollectionQueue(fid,last_sequence) ){
      return;
    }
    // Log only after fid has been assigned: the original logged an
    // uninitialized value on the first iteration. The cast keeps the
    // argument matched to %lu on platforms where uint64_t != unsigned long.
    Log(options_.info_log, "garbage collection file number: %lu",
        static_cast<unsigned long>(fid));
    // Sequence numbers consumed by online GC were reserved in advance.
    CollectionValueLog(fid,last_sequence);
  }
}
// Schedule a background GC pass if one is not already running and the
// DB is in a state where background work is allowed.
void DBImpl::MaybeScheduleGarbageCollection() {
  mutex_.AssertHeld();
  if (background_GarbageCollection_scheduled_) {
    return;  // Already scheduled.
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;  // DB is being deleted; no more background work.
  }
  if (!bg_error_.ok()) {
    return;  // Already got an error; no more changes.
  }
  // Mark as scheduled and hand the work to a detached GC thread; a
  // detached thread runs to completion even if the main thread exits.
  background_GarbageCollection_scheduled_ = true;
  env_->ScheduleForGarbageCollection(&DBImpl::GarbageCollectionBGWork, this);
}
// 注释:后台 gc 线程中执行的任务
void DBImpl::GarbageCollectionBGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->GarbageCollectionBackgroundCall();
}
// Body of the background GC thread (scheduled through
// MaybeScheduleGarbageCollection -> GarbageCollectionBGWork).
// NOTE(review): unlike DBImpl::BackgroundCall, this function never takes
// mutex_, yet it reads/writes background_GarbageCollection_scheduled_
// (declared GUARDED_BY(mutex_)) and calls MaybeScheduleGarbageCollection(),
// which asserts the lock is held. Simply taking the lock here would
// deadlock, because BackGroundGarbageCollection -> CollectionValueLog
// calls Write()/GetLsm(), which acquire mutex_ themselves. The locking
// protocol needs to be confirmed/redesigned — TODO confirm with authors.
void DBImpl::GarbageCollectionBackgroundCall() {
  assert(background_GarbageCollection_scheduled_);
  if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackGroundGarbageCollection();
  }
  background_GarbageCollection_scheduled_ = false;
  // Re-run the scheduler in case more GC work was queued while we ran.
  MaybeScheduleGarbageCollection();
  // Wake any thread waiting for GC to finish.
  garbage_collection_work_signal_.SignalAll();
}
// end
void DBImpl::BGWork(void* db) { void DBImpl::BGWork(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCall(); reinterpret_cast<DBImpl*>(db)->BackgroundCall();
} }
@ -756,6 +1040,13 @@ void DBImpl::BackgroundCompaction() {
if (!status.ok()) { if (!status.ok()) {
RecordBackgroundError(status); RecordBackgroundError(status);
} }
// begin 注释: compact 后需要考虑是否将 vlog 文件进行 gc 回收,
// 如果需要则将其加入到 GC 任务队列中
// 不进行后台的 gc 回收,那么也不用更新待分配 sequence 的 vlog
if(!finish_back_garbage_collection_){
garbage_collection_management_->UpdateQueue(versions_->ImmLogFileNumber() );
}
CleanupCompaction(compact); CleanupCompaction(compact);
c->ReleaseInputs(); c->ReleaseInputs();
RemoveObsoleteFiles(); RemoveObsoleteFiles();
@ -1013,7 +1304,22 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break; break;
} }
} }
}
} else {
// begin 注释:drop 掉 LSM-tree 中的 kv 数值对了,
// 对属于 KV 分离的 kv 数值对进行 GC
Slice drop_value = input->value();
if( ikey.type == kTypeSeparation ){
uint64_t fid = 0;
uint64_t offset = 0;
uint64_t size = 0;
GetVarint64(&drop_value,&fid);
GetVarint64(&drop_value,&offset);
GetVarint64(&drop_value,&size);
mutex_.Lock();
garbage_collection_management_->UpdateMap(fid,size);
mutex_.Unlock();
}
} // end
input->Next(); input->Next();
} }
@ -1117,6 +1423,68 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes(); return versions_->MaxNextLevelOverlappingBytes();
} }
/* Parse the value out of a VLog kv record fetched by VlogReader.
   Returns true iff the record is a kTypeSeparation entry whose embedded
   key equals `key` and whose value is exactly `val_size` bytes long;
   on success the value bytes are copied into `value`. */
bool DBImpl::ParseVlogValue(Slice key_value, Slice key,
                            std::string& value, uint64_t val_size) {
  Slice rest = key_value;
  // First byte is the record type; only separated records carry a value.
  if (rest[0] != kTypeSeparation) {
    return false;
  }
  rest.remove_prefix(1);
  Slice parsed_key;
  if (!GetLengthPrefixedSlice(&rest, &parsed_key) || !(parsed_key == key)) {
    return false;
  }
  Slice parsed_value;
  if (!GetLengthPrefixedSlice(&rest, &parsed_value) ||
      parsed_value.size() != val_size) {
    return false;
  }
  value = parsed_value.ToString();
  return true;
}
// Read a key's newest entry straight from the LSM-tree (memtable,
// immutable memtable, then the current Version), without dereferencing
// any vlog pointer. GC uses this to decide whether a vlog record is
// still live: for a KV-separated key, *value receives the encoded vlog
// pointer rather than the user value.
Status DBImpl::GetLsm(const Slice& key, std::string* value) {
  MutexLock l(&mutex_);
  ReadOptions options;
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  // Read at the oldest live snapshot so GC never reclaims data that a
  // snapshot can still observe.
  if( !this->snapshots_.empty() ){
    options.snapshot = this->snapshots_.oldest();
  }
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot = static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }
  Status s;
  mem->Ref();
  // imm may be null, but mem always exists.
  if (imm != nullptr) imm->Ref();
  current->Ref(); // bump the Version's read refcount
  Version::GetStats stats;
  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s )) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      // Finally search the SSTables in the current Version.
      s = current->Get(options, lkey, value, &stats);
    }
    mutex_.Lock();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref(); // drop the Version's read refcount
  return s;
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key, Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) { std::string* value) {
Status s; Status s;
@ -1161,6 +1529,56 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref(); mem->Unref();
if (imm != nullptr) imm->Unref(); if (imm != nullptr) imm->Unref();
current->Unref(); current->Unref();
/* Vlog 读取 value */
if (s.ok() && s.IsSeparated()) {
struct VlogReporter : public VlogReader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
VlogReporter reporter;
Slice vlog_ptr(*value);
uint64_t file_no;
uint64_t offset;
uint64_t val_size;
size_t key_size = key.size();
/* 已知该 value 保存在 VLog,解码出 vlog_ptr(fid, offset, val_size)*/
GetVarint64(&vlog_ptr, &file_no);
GetVarint64(&vlog_ptr, &offset);
GetVarint64(&vlog_ptr, &val_size);
/* VLog 内部 kv 对的编码长度,1B 为 type */
uint64_t encoded_len = 1 + VarintLength(key_size) + key.size() + VarintLength(val_size) + val_size;
std::string fname = LogFileName(dbname_, file_no);
RandomAccessFile* file;
s = env_->NewRandomAccessFile(fname,&file);
if (!s.ok()) {
return s;
}
VlogReader vlogReader(file, &reporter);
Slice key_value;
char* scratch = new char[encoded_len];
if (vlogReader.ReadValue(offset, encoded_len, &key_value, scratch)) {
value->clear();
if (!ParseVlogValue(key_value, key, *value, val_size)) {
s = Status::Corruption("value in vlog isn't match with given key");
}
} else {
s = Status::Corruption("read vlog error");
}
delete file;
file = nullptr;
}
return s; return s;
} }
@ -1185,14 +1603,38 @@ void DBImpl::RecordReadSample(Slice key) {
const Snapshot* DBImpl::GetSnapshot() { const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_); MutexLock l(&mutex_);
// begin 注释:建立快照,对快照之后的信息不用进行 GC
finish_back_garbage_collection_ = true;
// end
return snapshots_.New(versions_->LastSequence()); return snapshots_.New(versions_->LastSequence());
} }
void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot)); snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
// begin 注释:没有快照,重新进行后台 GC
if (snapshots_.empty()) {
finish_back_garbage_collection_ = false;
}
// end
} }
/*** DBImpl 类关于 Fields 类的 Put、Get 接口 ***/
Status DBImpl::PutFields(const WriteOptions& o, const Slice& key, const Fields& fields) {
return DBImpl::Put(o, key, Slice(fields.SerializeValue()));
}
Status DBImpl::GetFields(const ReadOptions& o, const Slice& key, Fields& fields) {
std::string value_str;
Status s = DBImpl::Get(o, key, &value_str);
if (!s.ok()) return s;
fields = Fields::ParseValue(value_str);
return Status::OK();
}
/**************************************************/
// Convenience methods // Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val); return DB::Put(o, key, val);
@ -1221,10 +1663,30 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status = MakeRoomForWrite(updates == nullptr); Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w; Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// begin 注释:GC 流程中写回的 WriteBatch 在 CollectionValueLog 函数中已经设置好了
if (!write_batch->IsGarbageCollection()) {
// 判断是否需要进行 GC
// 如需要,空出一块 sequence 区域, 触发 GC 将在 MakeRoomForWrite 里
// 先判断是否要进行 gc 后台回收
// 如果建立了快照,finish_back_garbage_collection_ 就为 true
// 此时不进行 sequence 分配
if (!finish_back_garbage_collection_
&& garbage_collection_management_->ConvertQueue(last_sequence)) {
MaybeScheduleGarbageCollection();
}
// SetSequence 在 write_batch 中写入本次的 sequence
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
}
// 这里设置 last_sequence 是为了确保离线 GC 的时候,
// 在 map 存在的时候需要调用 ConvertQueue 给回收任务分配 sequence
versions_->SetLastSequence(last_sequence);
vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch);
// end
// Add to log and apply to memtable. We can release the lock // Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging // during this phase since &w is currently responsible for logging
@ -1232,7 +1694,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// into mem_. // into mem_.
{ {
mutex_.Unlock(); mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
// 先写入vlog再写入memtable
// 写vlog日志 offset 表示这个 write_batch 在vlog中的偏移地址。
uint64_t offset = 0;
status = vlog_->AddRecord(WriteBatchInternal::Contents(write_batch),offset);
// status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false; bool sync_error = false;
if (status.ok() && options.sync) { if (status.ok() && options.sync) {
status = logfile_->Sync(); status = logfile_->Sync();
@ -1241,7 +1707,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
} }
} }
if (status.ok()) { if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
status = WriteBatchInternal::InsertInto(write_batch, mem_, logfile_number_, offset);
} }
mutex_.Lock(); mutex_.Lock();
if (sync_error) { if (sync_error) {
@ -1299,11 +1765,15 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
++iter; // Advance past "first" ++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) { for (; iter != writers_.end(); ++iter) {
Writer* w = *iter; Writer* w = *iter;
if (w->sync && !first->sync) {
// begin 注释:写队列中如果遍历到是 gc 的 WriteBatch,停止合并
if (w->sync && !first->sync
|| first->batch->IsGarbageCollection()
|| w->batch->IsGarbageCollection()) {
// 当前 Writer要求 Sync ,而第一个 Writer 不要求 Sync,两个磁盘写入策略不一致。不做合并操作
// Do not include a sync write into a batch handled by a non-sync write. // Do not include a sync write into a batch handled by a non-sync write.
break; break;
} }
// end
if (w->batch != nullptr) { if (w->batch != nullptr) {
size += WriteBatchInternal::ByteSize(w->batch); size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) { if (size > max_size) {
@ -1332,6 +1802,23 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty()); assert(!writers_.empty());
bool allow_delay = !force; bool allow_delay = !force;
Status s; Status s;
if (logfile_->GetSize() > options_.max_value_log_size) {
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
versions_->ReuseFileNumber(new_log_number);
}
garbage_collection_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize());
vlog_kv_numbers_ = 0;
delete vlog_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
vlog_ = new log::VlogWriter(lfile);
}
while (true) { while (true) {
if (!bg_error_.ok()) { if (!bg_error_.ok()) {
// Yield previous error // Yield previous error
@ -1365,33 +1852,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else { } else {
// Attempt to switch to a new memtable and trigger compaction of old // Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0); assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
delete logfile_;
mem_->SetLogFileNumber(logfile_number_);
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_; imm_ = mem_;
has_imm_.store(true, std::memory_order_release); has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_); mem_ = new MemTable(internal_comparator_);
@ -1486,7 +1949,7 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
WriteBatch batch(opt.separate_threshold);
batch.Put(key, value); batch.Put(key, value);
return Write(opt, &batch); return Write(opt, &batch);
} }
@ -1508,6 +1971,13 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// Recover handles create_if_missing, error_if_exists // Recover handles create_if_missing, error_if_exists
bool save_manifest = false; bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest); Status s = impl->Recover(&edit, &save_manifest);
// begin 注释: Recover 之后,获取所有 VLogs
std::vector<uint64_t> logs;
s = impl->GetAllValueLog(dbname, logs);
sort(logs.begin(),logs.end());
// end
if (s.ok() && impl->mem_ == nullptr) { if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable. // Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber(); uint64_t new_log_number = impl->versions_->NewFileNumber();
@ -1518,7 +1988,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetLogNumber(new_log_number); edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile; impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->vlog_ = new log::VlogWriter(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref(); impl->mem_->Ref();
} }
@ -1526,12 +1996,38 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
if (s.ok() && save_manifest) { if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery. edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_); edit.SetLogNumber(impl->logfile_number_);
// s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
// begin 注释:把 imm_last_sequence 设置到新的 manifest 当中,
// 即 RecoverLogFile 中判断上一次断电时的数据库状态的 imm -> sst 的情况,
// 表示一次成功的全盘恢复
impl->versions_->StartImmLastSequence(true);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_); s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
impl->versions_->StartImmLastSequence(false);
// end
} }
if (s.ok()) { if (s.ok()) {
impl->RemoveObsoleteFiles(); impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction(); impl->MaybeScheduleCompaction();
} }
// begin 开始全盘回收
if (s.ok() && impl->options_.start_garbage_collection) {
if( s.ok() ){
int size = logs.size();
for( int i = 0; i < size ; i++){
uint64_t fid = logs[i];
uint64_t next_sequence = impl->versions_->LastSequence() + 1;
std::cout<<" collection file : "<<fid<<std::endl;
impl->mutex_.Unlock();
Status stmp = impl->CollectionValueLog(fid, next_sequence);
impl->mutex_.Lock();
if (!stmp.ok()) s = stmp;
impl->versions_->SetLastSequence(next_sequence - 1);
}
}
}
// end
impl->mutex_.Unlock(); impl->mutex_.Unlock();
if (s.ok()) { if (s.ok()) {
assert(impl->mem_ != nullptr); assert(impl->mem_ != nullptr);

+ 48
- 2
db/db_impl.h View File

@ -12,11 +12,15 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/vlog_writer.h"
#include "db/kv_separate_management.h"
#include "db/snapshot.h" #include "db/snapshot.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "port/port.h" #include "port/port.h"
#include "port/thread_annotations.h" #include "port/thread_annotations.h"
#include "port/port_stdcxx.h"
namespace leveldb { namespace leveldb {
@ -25,6 +29,7 @@ class TableCache;
class Version; class Version;
class VersionEdit; class VersionEdit;
class VersionSet; class VersionSet;
class Fields;
class DBImpl : public DB { class DBImpl : public DB {
public: public:
@ -36,6 +41,9 @@ class DBImpl : public DB {
~DBImpl() override; ~DBImpl() override;
// Implementations of the DB interface // Implementations of the DB interface
Status PutFields(const WriteOptions&, const Slice& key, const Fields& fields) override;
Status GetFields(const ReadOptions& options, const Slice& key, Fields& fields) override;
Status Put(const WriteOptions&, const Slice& key, Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override; const Slice& value) override;
Status Delete(const WriteOptions&, const Slice& key) override; Status Delete(const WriteOptions&, const Slice& key) override;
@ -49,6 +57,11 @@ class DBImpl : public DB {
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override; void CompactRange(const Slice* begin, const Slice* end) override;
static bool ParseVlogValue(Slice key_value, Slice key, std::string& value, uint64_t val_size);
Env* GetEnv() const;
std::string GetDBName() const;
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end] // Compact any files in the named level that overlap [*begin,*end]
@ -71,6 +84,12 @@ class DBImpl : public DB {
// bytes. // bytes.
void RecordReadSample(Slice key); void RecordReadSample(Slice key);
// begin 线
Status OutLineGarbageCollection();
// 线 GC vlog
Status GetAllValueLog(std::string dir, std::vector<uint64_t>& logs);
// end
private: private:
friend class DB; friend class DB;
struct CompactionState; struct CompactionState;
@ -124,7 +143,7 @@ class DBImpl : public DB {
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
VersionEdit* edit, SequenceNumber* max_sequence)
VersionEdit* edit, SequenceNumber* max_sequence,bool& found_sequence_pos)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
@ -138,6 +157,16 @@ class DBImpl : public DB {
void RecordBackgroundError(const Status& s); void RecordBackgroundError(const Status& s);
void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
// begin Compaction GC
// GCBGWorkGCBackgroundCallMaybeScheduleGCBackGroundGC
static void GarbageCollectionBGWork(void* db);
void GarbageCollectionBackgroundCall();
void MaybeScheduleGarbageCollection() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void BackGroundGarbageCollection();
Status CollectionValueLog(uint64_t fid, uint64_t& last_sequence);
// end
static void BGWork(void* db); static void BGWork(void* db);
void BackgroundCall(); void BackgroundCall();
void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
@ -155,6 +184,11 @@ class DBImpl : public DB {
return internal_comparator_.user_comparator(); return internal_comparator_.user_comparator();
} }
// GC VLog record
// LSM-tree record kv
// db GC WriteBatch KV
Status GetLsm( const Slice& key, std::string* value);
// Constant after construction // Constant after construction
Env* const env_; Env* const env_;
const InternalKeyComparator internal_comparator_; const InternalKeyComparator internal_comparator_;
@ -179,7 +213,6 @@ class DBImpl : public DB {
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_ std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_; WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_); uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling. uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers. // Queue of writers.
@ -203,6 +236,19 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_); Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
log::VlogWriter* vlog_;
int vlog_kv_numbers_;
// begin gc 线
port::CondVar garbage_collection_work_signal_ GUARDED_BY(mutex_);
// gc 线
bool background_GarbageCollection_scheduled_ GUARDED_BY(mutex_);
// true GC 线
bool finish_back_garbage_collection_;
// end
SeparateManagement* garbage_collection_management_;
}; };
// Sanitize db options. The caller should delete result.info_log if // Sanitize db options. The caller should delete result.info_log if

+ 1
- 0
db/db_iter.cc View File

@ -189,6 +189,7 @@ void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
skipping = true; skipping = true;
break; break;
case kTypeValue: case kTypeValue:
case kTypeSeparation:
if (skipping && if (skipping &&
user_comparator_->Compare(ikey.user_key, *skip) <= 0) { user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
// Entry hidden // Entry hidden

+ 2
- 0
db/db_test.cc View File

@ -190,6 +190,7 @@ class SpecialEnv : public EnvWrapper {
} }
return base_->Sync(); return base_->Sync();
} }
size_t GetSize() { return base_->GetSize(); }
}; };
class ManifestFile : public WritableFile { class ManifestFile : public WritableFile {
private: private:
@ -215,6 +216,7 @@ class SpecialEnv : public EnvWrapper {
return base_->Sync(); return base_->Sync();
} }
} }
size_t GetSize() { return base_->GetSize(); }
}; };
if (non_writable_.load(std::memory_order_acquire)) { if (non_writable_.load(std::memory_order_acquire)) {

+ 4
- 1
db/dbformat.h View File

@ -44,6 +44,9 @@ static const int kMaxMemCompactLevel = 2;
// Approximate gap in bytes between samples of data read during iteration. // Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576; static const int kReadBytesPeriod = 1048576;
// gc后台回收的时候进行 batch合并之后再写入的大小
static const uint64_t gcWriteBatchSize = 4*1024*1024;
} // namespace config } // namespace config
class InternalKey; class InternalKey;
@ -51,7 +54,7 @@ class InternalKey;
// Value types encoded as the last component of internal keys. // Value types encoded as the last component of internal keys.
// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk // DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
// data structures. // data structures.
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x2, kTypeSeparation = 0x1};
// kValueTypeForSeek defines the ValueType that should be passed when // kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular // constructing a ParsedInternalKey object for seeking to a particular
// sequence number (since we sort sequence numbers in decreasing order // sequence number (since we sort sequence numbers in decreasing order

+ 3
- 1
db/dumpfile.cc View File

@ -74,7 +74,7 @@ Status PrintLogContents(Env* env, const std::string& fname,
// Called on every item found in a WriteBatch. // Called on every item found in a WriteBatch.
class WriteBatchItemPrinter : public WriteBatch::Handler { class WriteBatchItemPrinter : public WriteBatch::Handler {
public: public:
void Put(const Slice& key, const Slice& value) override {
void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) override {
std::string r = " put '"; std::string r = " put '";
AppendEscapedStringTo(&r, key); AppendEscapedStringTo(&r, key);
r += "' '"; r += "' '";
@ -189,6 +189,8 @@ Status DumpTable(Env* env, const std::string& fname, WritableFile* dst) {
r += "del"; r += "del";
} else if (key.type == kTypeValue) { } else if (key.type == kTypeValue) {
r += "val"; r += "val";
} else if (key.type == kTypeSeparation) {
r += "val";
} else { } else {
AppendNumberTo(&r, key.type); AppendNumberTo(&r, key.type);
} }

+ 1
- 0
db/fault_injection_test.cc View File

@ -114,6 +114,7 @@ class TestWritableFile : public WritableFile {
Status Close() override; Status Close() override;
Status Flush() override; Status Flush() override;
Status Sync() override; Status Sync() override;
size_t GetSize() override { return 0; };
private: private:
FileState state_; FileState state_;

+ 292
- 0
db/fields.cpp View File

@ -0,0 +1,292 @@
#include <algorithm>
#include <iostream>
#include <utility>

#include "db/filename.h"
#include "leveldb/env.h"
#include "util/coding.h"

#include "dbformat.h"
#include "fields.h"
#include "vlog_reader.h"
#include "db_impl.h"
namespace leveldb {
/* Constructors */
// Take ownership of `fields` and keep them sorted by field name, the
// invariant every later name-based lookup relies on.
Fields::Fields(FieldArray fields)
    : fields_(std::move(fields)) {
  SortFields();
}
// Single-field constructor; a one-element array is trivially sorted.
Fields::Fields(const Field& field)
    : fields_({field}) {}
// Build a Fields object containing the given names with empty values.
Fields::Fields(const std::vector<std::string>& field_names) {
  fields_.reserve(field_names.size());  // one allocation up front
  for (const auto& name : field_names) {
    fields_.emplace_back(name, "");
  }
  SortFields();
}
/* Keep fields_ ordered by ascending field_name so that name-based
   lookups over the array can stop early (or binary-search). */
void Fields::SortFields() {
  auto by_name = [](const Field& lhs, const Field& rhs) {
    return lhs.first < rhs.first;
  };
  std::sort(fields_.begin(), fields_.end(), by_name);
}
/* 更新/插入单个字段值 */
void Fields::UpdateField(const std::string& field_name, const std::string& field_value) {
for (auto iter = fields_.begin(); iter != fields_.end(); iter++) {
if ((*iter).first > field_name) {
fields_.insert(iter, {field_name, field_value});
return;
}
if ((*iter).first == field_name) {
(*iter).second = field_value;
return;
}
}
fields_.emplace_back(field_name, field_value);
}
// Convenience overload forwarding to the (name, value) version.
void Fields::UpdateField(const Field& field) {
  UpdateField(field.first, field.second);
}
/* Insert or overwrite several fields given parallel name/value lists.
   Mismatched list sizes are reported to stderr and the call is a no-op. */
void Fields::UpdateFields(const std::vector<std::string>& field_names, const std::vector<std::string>& field_values) {
  const size_t count = field_names.size();
  if (count != field_values.size()) {
    std::cerr << "UpdateFields Failed: field_name and field_values must have the same size." << std::endl;
    return;
  }
  size_t i = 0;
  while (i < count) {
    UpdateField(field_names[i], field_values[i]);
    ++i;
  }
}
void Fields::UpdateFields(const FieldArray& fields) {
for (const auto& field : fields) {
UpdateField(field);
}
}
/* 删除单个字段 */
void Fields::DeleteField(const std::string& field_name) {
for (auto iter = fields_.begin(); iter != fields_.end(); iter++) {
if ((*iter).first > field_name) return;
if ((*iter).first == field_name) {
fields_.erase(iter);
return;
}
}
}
/* Remove several fields by name. */
void Fields::DeleteFields(const std::vector<std::string>& field_names) {
  for (const auto& name : field_names) {
    if (fields_.empty()) return;  // nothing left to delete
    DeleteField(name);
  }
}
/* 序列化 Field 或 FieldArray 为 value 字符串 */
std::string Fields::SerializeValue(const FieldArray& fields) {
std::string value_str;
for (const auto& field : fields) {
std::string field_str = SerializeValue(field);
value_str += field_str;
}
return value_str;
}
// Encode one field as two length-prefixed slices: name, then value.
std::string Fields::SerializeValue(const Field& field) {
  std::string value_str;
  PutLengthPrefixedSlice(&value_str, Slice(field.first));
  PutLengthPrefixedSlice(&value_str, Slice(field.second));
  return value_str;
}
// Serialize this object's own field array.
std::string Fields::SerializeValue() const {
  return SerializeValue(fields_);
}
/* Deserialize a value string back into a Fields object. Parsing stops
   at the first malformed (truncated) entry. */
Fields Fields::ParseValue(const std::string& value_str) {
  Fields result;
  Slice input(value_str);
  for (;;) {
    if (input.empty()) break;
    Slice name;
    Slice val;
    if (!GetLengthPrefixedSlice(&input, &name)) break;
    if (!GetLengthPrefixedSlice(&input, &val)) break;
    result.UpdateField(name.ToString(), val.ToString());
  }
  return result;
}
/* Look up a field by name. Returns {} (and logs to stderr) when absent. */
Field Fields::GetField(const std::string& field_name) const {
  // fields_ is sorted by name: binary-search, and drop the original's
  // duplicated error emission and fragile `fields_.end() - 1` mid-loop
  // check (UB-adjacent on an empty container, though the loop guarded it).
  auto pos = std::lower_bound(
      fields_.begin(), fields_.end(), field_name,
      [](const Field& f, const std::string& name) { return f.first < name; });
  if (pos != fields_.end() && pos->first == field_name) return *pos;
  std::cerr << "GetField Failed: field name [" + field_name + "] doesn't exist, return {}." << std::endl;
  return {};
}
/* 检查字段是否存在 */
bool Fields::HasField(const std::string& field_name) const {
for (auto iter = fields_.begin(); iter != fields_.end(); iter++) {
if ((*iter).first == field_name) return true;
if ((*iter).first > field_name || iter == fields_.end() - 1) return false;
}
return false;
}
/* operator[] (const): read a field's value by name; returns an empty
   string (and logs to stderr) when the field does not exist. */
std::string Fields::operator[](const std::string& field_name) const {
  // Binary search replaces the linear scan with its duplicated error
  // branch; the observable behavior (value or "", one stderr line) is
  // unchanged.
  auto pos = std::lower_bound(
      fields_.begin(), fields_.end(), field_name,
      [](const Field& f, const std::string& name) { return f.first < name; });
  if (pos != fields_.end() && pos->first == field_name) return pos->second;
  std::cerr << "GetField Failed: field name [" + field_name + "] doesn't exist." << std::endl;
  return std::string();
}
/* operator[] (non-const): return a mutable reference to the field's
   value, inserting an empty-valued field in sorted position when the
   name is absent (same as std::map semantics). */
std::string& Fields::operator[](const std::string& field_name) {
  auto pos = std::lower_bound(
      fields_.begin(), fields_.end(), field_name,
      [](const Field& f, const std::string& name) { return f.first < name; });
  if (pos != fields_.end() && pos->first == field_name) return pos->second;
  // Insert keeps the sorted-by-name invariant; end() covers the
  // original's "append when larger than everything" branch.
  return fields_.insert(pos, {field_name, ""})->second;
}
/* 通过若干个字段查询 Key */
//std::vector<std::string> Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields, const std::string& dbname, Env* env) {
// Fields to_fields = Fields(fields);
// to_fields.Fields::SortFields();
// FieldArray search_fields_ = to_fields.fields_;
//
// std::vector<std::string> find_keys;
//
// Iterator* it = db->NewIterator(leveldb::ReadOptions());
// for (it->SeekToFirst(); it->Valid(); it->Next()) {
//
// std::string iter_key = it->key().ToString();
// if (std::find(find_keys.begin(), find_keys.end(), iter_key) != find_keys.end()){
// continue;
// }
//
// FieldArray iter_fields_ = Fields::ParseValue(it->value().ToString()).fields_;
//
// if (iter_fields_ == search_fields_ ||
// std::includes(iter_fields_.begin(), iter_fields_.end(),
// search_fields_.begin(), search_fields_.end())) {
// find_keys.emplace_back(iter_key);
// }
// }
//
// assert(it->status().ok());
// delete it;
//
// return find_keys;
//}
/* Scan the whole database and collect every key whose value contains all of
 * the given fields (exact match or superset).
 * Values that parse as empty are treated as separated vlog pointers: the
 * real value is read back from its vlog file before comparison.
 * Fixes: (1) the per-record `scratch` buffer was new[]'d and never freed;
 * (2) VlogReporter::status was left uninitialized (UB if Corruption fired);
 * (3) a failed/corrupt vlog read still fell through to the comparison,
 * where an empty search set would falsely match. */
std::vector<std::string> Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields, DBImpl* impl) {
  Fields to_fields = Fields(fields);
  to_fields.Fields::SortFields();
  FieldArray search_fields_ = to_fields.fields_;
  std::vector<std::string> find_keys;
  Iterator* it = db->NewIterator(leveldb::ReadOptions());
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    std::string iter_key = it->key().ToString();
    /* Report each key at most once. */
    if (std::find(find_keys.begin(), find_keys.end(), iter_key) != find_keys.end()) {
      continue;
    }
    FieldArray iter_value_ = Fields::ParseValue(it->value().ToString()).fields_;
    if (!iter_value_.empty()) {
      /* Inline value: compare the parsed fields directly.
       * std::includes needs both ranges sorted; search_fields_ was sorted
       * above, stored values are assumed sorted on write — TODO confirm. */
      if (iter_value_ == search_fields_ ||
          std::includes(iter_value_.begin(), iter_value_.end(),
                        search_fields_.begin(), search_fields_.end())) {
        find_keys.emplace_back(iter_key);
      }
      continue;
    }
    /* Separated value: decode the vlog pointer (fid, offset, value size). */
    uint64_t fid;
    uint64_t kv_offset;
    uint64_t val_size;
    Slice vlog_ptr = it->value();
    if (!(GetVarint64(&vlog_ptr, &fid)
        && GetVarint64(&vlog_ptr, &kv_offset)
        && GetVarint64(&vlog_ptr, &val_size))) {
      continue;
    }
    /* Record layout in the vlog:
     * tag(1) + varint(key len) + key + varint(val size) + value. */
    uint64_t encoded_len = 1 + VarintLength(it->key().size()) + it->key().size() + VarintLength(val_size) + val_size;
    Env* env = impl->GetEnv();
    std::string dbname = impl->GetDBName();
    std::string fname = LogFileName(dbname, fid);
    RandomAccessFile* file;
    Status s = env->NewRandomAccessFile(fname, &file);
    if (!s.ok()) {
      continue;
    }
    struct VlogReporter : public log::VlogReader::Reporter {
      Status* status;
      void Corruption(size_t bytes, const Status& s) override {
        if (this->status->ok()) *this->status = s;
      }
    };
    VlogReporter reporter;
    reporter.status = &s;  // was left uninitialized in the original
    log::VlogReader vlogReader(file, &reporter);
    Slice key_value;
    std::string vlog_value;
    /* RAII read buffer — the original new[]'d this and leaked it. */
    std::vector<char> scratch(encoded_len);
    if (vlogReader.ReadValue(kv_offset, encoded_len, &key_value, scratch.data())) {
      if (!DBImpl::ParseVlogValue(key_value, it->key(), vlog_value, val_size)) {
        s = Status::Corruption("value in vlog isn't match with given key");
      }
    } else {
      s = Status::Corruption("read vlog error");
    }
    delete file;
    file = nullptr;
    if (!s.ok()) {
      /* Never compare against a value we failed to read back. */
      continue;
    }
    iter_value_ = Fields::ParseValue(vlog_value).fields_;
    if (iter_value_ == search_fields_ ||
        std::includes(iter_value_.begin(), iter_value_.end(),
                      search_fields_.begin(), search_fields_.end())) {
      find_keys.emplace_back(iter_key);
    }
  }
  // assert(it->status().ok());
  delete it;
  return find_keys;
}
} // namespace leveldb

+ 77
- 0
db/fields.h View File

@ -0,0 +1,77 @@
#ifndef LEVELDB_FIELDS_H
#define LEVELDB_FIELDS_H
#include "leveldb/db.h"
#include "db_impl.h"
#include "vector"
namespace leveldb {
using Field = std::pair<std::string, std::string>;
using FieldArray = std::vector<Field>;
/* A sorted collection of (field_name, field_value) pairs that can be
 * serialized into / parsed from a LevelDB value string. */
class Fields {
private:
FieldArray fields_;
public:
/* Construct from a FieldArray */
explicit Fields(FieldArray fields);
/* Construct from a single Field */
explicit Fields(const Field& field);
/* Construct from field names only (values left empty) */
explicit Fields(const std::vector<std::string>& field_names);
Fields() = default;
~Fields() = default;
/* Sort fields by field_name ascending to reduce the cost of traversing
 * Fields by field_name */
void SortFields();
/* Update/insert a single field value; fields are re-sorted by field_name
 * after insertion */
void UpdateField(const std::string& field_name, const std::string& field_value);
void UpdateField(const Field& field);
/* Update/insert multiple field values */
void UpdateFields(const std::vector<std::string>& field_names, const std::vector<std::string>& field_values);
void UpdateFields(const FieldArray& fields);
/* Delete a single field */
void DeleteField(const std::string& field_name);
/* Delete multiple fields */
void DeleteFields(const std::vector<std::string>& field_names);
/* Serialize a Field or FieldArray into a value string */
static std::string SerializeValue(const FieldArray& fields);
static std::string SerializeValue(const Field& field);
std::string SerializeValue() const;
/* Parse a value string back into Fields */
static Fields ParseValue(const std::string& value_str);
/* Get a field by name */
Field GetField(const std::string& field_name) const;
/* Check whether a field exists */
bool HasField(const std::string& field_name) const;
/* operator[] overload for read-only access to a field value */
std::string operator[](const std::string& field_name) const;
/* operator[] overload for modifying (or inserting) a field value */
std::string& operator[](const std::string& field_name);
/* Query keys by a set of fields */
static std::vector<std::string> FindKeysByFields(leveldb::DB* db, const FieldArray& fields, leveldb::DBImpl* impl);
using iterator = std::vector<Field>::iterator;
using const_iterator = std::vector<Field>::const_iterator;
iterator begin() { return fields_.begin(); }
const_iterator begin() const { return fields_.cbegin(); }
iterator end() { return fields_.end(); }
const_iterator end() const { return fields_.cend(); }
size_t size() const { return fields_.size(); }
};
} // namespace leveldb
#endif // LEVELDB_FIELDS_H

+ 104
- 0
db/kv_separate_management.cc View File

@ -0,0 +1,104 @@
#include "kv_separate_management.h"
#include <queue>
#include <vector>
#include <db/dbformat.h>
namespace leveldb {
/* Move every VLog waiting in need_updates_ onto the garbage-collection
 * queue, assigning fresh sequence numbers as it goes.
 * db_sequence is advanced in place: one slot up front for the batch, then
 * left_kv_numbers_ slots per VLog (presumably so its surviving kv pairs can
 * be rewritten with new sequences — confirm against the GC writer).
 * Returns false when nothing was pending. */
bool SeparateManagement::ConvertQueue(uint64_t& db_sequence) {
  if (!need_updates_.empty()) {
    db_sequence++;
  } else {
    return false;
  }
  while (!need_updates_.empty()) {
    ValueLogInfo* info = need_updates_.front();
    need_updates_.pop_front();
    // The file leaves normal bookkeeping; from here it lives only in the
    // garbage_collection_ queue.
    map_file_info_.erase(info->logfile_number_);
    info->last_sequence_ = db_sequence;
    db_sequence += info->left_kv_numbers_;
    garbage_collection_.push_back(info);
  }
  assert(db_sequence <= kMaxSequenceNumber);
  return true;
}
/* Register a freshly created VLog in map_file_info_.
 * fid: vlog file number; kv_numbers: kv pairs written to it;
 * log_memory: current file size in bytes. The fid must not already be
 * tracked. */
void SeparateManagement::WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory) {
  assert(map_file_info_.find(fid) == map_file_info_.end());
  ValueLogInfo* info = new ValueLogInfo();
  info->logfile_number_ = fid;
  info->left_kv_numbers_ = kv_numbers;
  assert(kv_numbers <= kMaxSequenceNumber);
  info->invalid_memory_ = 0;
  // NOTE(review): -1 wraps to UINT64_MAX on this uint64_t — presumably a
  // "not yet assigned" sentinel for last_sequence_; confirm with ConvertQueue.
  info->last_sequence_ = -1;
  info->file_size_ = log_memory;
  map_file_info_.insert(std::make_pair(fid,info));
}
/* Record that one kv pair stored in VLog `fid` became obsolete, adding
 * `abandon_memory` bytes to that file's invalid space and decrementing its
 * live-kv count. Unknown fids are ignored (the file may already have moved
 * to the GC queue). */
void SeparateManagement::UpdateMap(uint64_t fid, uint64_t abandon_memory) {
  // Single hash lookup — the original did find() followed by operator[],
  // paying for the lookup twice.
  auto iter = map_file_info_.find(fid);
  if (iter != map_file_info_.end()) {
    ValueLogInfo* info = iter->second;
    info->left_kv_numbers_--;
    info->invalid_memory_ += abandon_memory;
  }
}
/* Select the VLog(s) with the most invalid space and queue them for GC.
 * Only files numbered <= fid (older files) are eligible; selected files are
 * pushed onto need_updates_ and marked in delete_files_. */
void SeparateManagement::UpdateQueue(uint64_t fid) {
  std::priority_queue<ValueLogInfo*, std::vector<ValueLogInfo*>, MapCmp> sort_priority_;
  for (auto iter = map_file_info_.begin(); iter != map_file_info_.end(); ++iter) {
    if (delete_files_.find(iter->first) == delete_files_.end()) {
      sort_priority_.push(iter->second);
    }
  }
  /* By default only one VLog is added to the GC queue per call. */
  int num = 1;
  // Keep the threshold in uint64_t: the original narrowed it into an int,
  // which overflows (and flips the comparison below) for thresholds
  // larger than INT_MAX.
  uint64_t threshold = garbage_collection_threshold_;
  if (!sort_priority_.empty()
      && sort_priority_.top()->invalid_memory_ >= garbage_collection_threshold_ * 1.2) {
    /* If the VLog with the most invalid space exceeds the GC threshold by
     * 20%, up to 3 VLogs may be queued for GC this round. */
    num = 3;
    threshold = static_cast<uint64_t>(garbage_collection_threshold_ * 1.2);
  }
  while (!sort_priority_.empty() && num > 0) {
    ValueLogInfo* info = sort_priority_.top();
    sort_priority_.pop();
    /* Prefer collecting older VLogs: skip files newer than fid. */
    if (info->logfile_number_ > fid) {
      continue;
    }
    num--;
    if (info->invalid_memory_ >= threshold) {
      need_updates_.push_back(info);
      /* Mark the VLog as pending GC (pending deletion). */
      delete_files_.insert(info->logfile_number_);
    }
  }
}
bool SeparateManagement::GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence) {
if (garbage_collection_.empty()) {
return false;
} else {
ValueLogInfo* info = garbage_collection_.front();
garbage_collection_.pop_front();
fid = info->logfile_number_;
last_sequence = info->last_sequence_;
return true;
}
}
/* Queue every tracked VLog for GC, skipping those already pending. */
void SeparateManagement::CollectionMap(){
  if (map_file_info_.empty()) return;
  for (const auto& entry : map_file_info_) {
    ValueLogInfo* info = entry.second;
    if (delete_files_.count(entry.first) == 0) {
      need_updates_.push_back(info);
      delete_files_.insert(info->logfile_number_);
    }
  }
}
}

+ 69
- 0
db/kv_separate_management.h View File

@ -0,0 +1,69 @@
#ifndef LEVELDB_KV_SEPARATE_MANAGEMENT_H
#define LEVELDB_KV_SEPARATE_MANAGEMENT_H
#include <unordered_map>
#include <unordered_set>
#include <deque>
#include "leveldb/slice.h"
#include <iterator>
namespace leveldb {
/* Per-VLog bookkeeping record. */
typedef struct ValueLogInfo {
uint64_t last_sequence_; // last (newest) sequence assigned to this VLog's kv pairs
size_t file_size_; // VLog file size in bytes
uint64_t logfile_number_; // VLog file number
int left_kv_numbers_; // live kv pairs remaining in this VLog
uint64_t invalid_memory_; // bytes of obsolete values in this VLog
}ValueLogInfo;
/* Comparator: the VLog with the most invalid space sits on top of a
 * priority_queue (max-heap on invalid_memory_). */
struct MapCmp{
bool operator ()(const ValueLogInfo* a, const ValueLogInfo* b)
{
return a->invalid_memory_ < b->invalid_memory_;
}
};
/* Tracks per-VLog invalid space and drives selection of VLog files for
 * garbage collection. */
class SeparateManagement {
public:
SeparateManagement(uint64_t garbage_collection_threshold)
: garbage_collection_threshold_(garbage_collection_threshold) {}
~SeparateManagement() {}
/* Advance the database's last sequence number, assigning new sequence
 * numbers to VLogs awaiting GC; returns whether GC was triggered. */
bool ConvertQueue(uint64_t& db_sequence);
/* Update the invalid space and remaining kv count of one VLogInfo. */
void UpdateMap(uint64_t fid, uint64_t abandon_memory);
/* Push the VLog(s) with the most invalid space onto the need_updates_
 * queue. */
void UpdateQueue(uint64_t fid);
/* Pop one VLog needing GC from garbage_collection_; last_sequence
 * receives its last (newest) sequence number. */
bool GetGarbageCollectionQueue(uint64_t& fid, uint64_t& last_sequence);
/* Register a newly created VLog's ValueLogInfo in map_file_info_. */
void WriteFileMap(uint64_t fid, int kv_numbers, size_t log_memory);
/* Whether any VLog is ready for GC. */
bool MayNeedGarbageCollection() { return !garbage_collection_.empty(); }
/* Drop the VLog with the given file number from map_file_info_. */
void RemoveFileFromMap(uint64_t fid) { map_file_info_.erase(fid); }
/* Whether map_file_info_ is empty. */
bool EmptyMap() { return map_file_info_.empty(); }
/* Queue every VLog in map_file_info_ onto need_updates_ for later GC. */
void CollectionMap();
private:
// invalid-space threshold (bytes) that makes a VLog a GC candidate
uint64_t garbage_collection_threshold_;
// per-VLog bookkeeping, keyed by VLog file number
std::unordered_map<uint64_t, ValueLogInfo*> map_file_info_;
// VLogs ready for garbage collection
std::deque<ValueLogInfo*> garbage_collection_;
// VLogs waiting for fresh sequence numbers before GC
std::deque<ValueLogInfo*> need_updates_;
// file numbers of VLogs already queued for GC / deletion
std::unordered_set<uint64_t> delete_files_;
};
} // namespace leveldb
#endif //LEVELDB_KV_SEPARATE_MANAGEMENT_H

+ 1
- 0
db/leveldbutil.cc View File

@ -20,6 +20,7 @@ class StdoutPrinter : public WritableFile {
Status Close() override { return Status::OK(); } Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); } Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); } Status Sync() override { return Status::OK(); }
size_t GetSize() override { return 0; }
}; };
bool HandleDumpCommand(Env* env, char** files, int num) { bool HandleDumpCommand(Env* env, char** files, int num) {

+ 5
- 0
db/log_format.h View File

@ -29,6 +29,11 @@ static const int kBlockSize = 32768;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte). // Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1; static const int kHeaderSize = 4 + 2 + 1;
// VlogHeader length (4 bytes).
static const int vHeaderSize = 4;
// write_batch Header
static const int wHeaderSize = 8 + 4 ;
} // namespace log } // namespace log
} // namespace leveldb } // namespace leveldb

+ 1
- 0
db/log_test.cc View File

@ -164,6 +164,7 @@ class LogTest : public testing::Test {
Status Close() override { return Status::OK(); } Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); } Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); } Status Sync() override { return Status::OK(); }
size_t GetSize() override { return 0; }
Status Append(const Slice& slice) override { Status Append(const Slice& slice) override {
contents_.append(slice.data(), slice.size()); contents_.append(slice.data(), slice.size());
return Status::OK(); return Status::OK();

+ 8
- 1
db/memtable.cc View File

@ -126,9 +126,16 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
value->assign(v.data(), v.size()); value->assign(v.data(), v.size());
return true; return true;
} }
case kTypeDeletion:
case kTypeDeletion: {
*s = Status::NotFound(Slice()); *s = Status::NotFound(Slice());
return true; return true;
}
case kTypeSeparation: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
s->SetSeparated();
return true;
}
} }
} }
} }

+ 7
- 0
db/memtable.h View File

@ -62,6 +62,10 @@ class MemTable {
// Else, return false. // Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s); bool Get(const LookupKey& key, std::string* value, Status* s);
uint64_t GetTailSequence() { return tail_sequence_; }
uint64_t GetLogFileNumber() { return log_file_number_; }
void SetLogFileNumber(uint64_t fid) { log_file_number_ = fid; }
private: private:
friend class MemTableIterator; friend class MemTableIterator;
friend class MemTableBackwardIterator; friend class MemTableBackwardIterator;
@ -76,6 +80,9 @@ class MemTable {
~MemTable(); // Private since only Unref() should be used to delete it ~MemTable(); // Private since only Unref() should be used to delete it
uint64_t tail_sequence_;
uint64_t log_file_number_;
KeyComparator comparator_; KeyComparator comparator_;
int refs_; int refs_;
Arena arena_; Arena arena_;

+ 34
- 3
db/version_edit.cc View File

@ -20,7 +20,11 @@ enum Tag {
kDeletedFile = 6, kDeletedFile = 6,
kNewFile = 7, kNewFile = 7,
// 8 was used for large value refs // 8 was used for large value refs
kPrevLogNumber = 9
kPrevLogNumber = 9,
// 注释: 用于记录immemtable到sst的sequence
kImmLastSequence = 10,
// 注释: 用于记录恢复immemtable和memtable时在log文件中对应的位置
kLogFile = 11
}; };
void VersionEdit::Clear() { void VersionEdit::Clear() {
@ -29,12 +33,23 @@ void VersionEdit::Clear() {
prev_log_number_ = 0; prev_log_number_ = 0;
last_sequence_ = 0; last_sequence_ = 0;
next_file_number_ = 0; next_file_number_ = 0;
// 注释:重置为0
imm_last_sequence_ = 0;
// 注释:重置为0
imm_log_file_number_ = 0;
has_comparator_ = false; has_comparator_ = false;
has_log_number_ = false; has_log_number_ = false;
has_prev_log_number_ = false; has_prev_log_number_ = false;
has_next_file_number_ = false;
has_next_file_number_ = false;
has_last_sequence_ = false; has_last_sequence_ = false;
compact_pointers_.clear();
// 注释:重置为false
has_imm_last_sequence_ = false;
// compact_pointers_.clear();
deleted_files_.clear(); deleted_files_.clear();
new_files_.clear(); new_files_.clear();
} }
@ -60,6 +75,12 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32(dst, kLastSequence); PutVarint32(dst, kLastSequence);
PutVarint64(dst, last_sequence_); PutVarint64(dst, last_sequence_);
} }
// 注释:若 imm_last_sequence_ 有效,则写入对应的标识符和数据
if (has_imm_last_sequence_) {
PutVarint32(dst, kImmLastSequence);
PutVarint64(dst, imm_last_sequence_);
PutVarint64(dst, imm_log_file_number_);
}
for (size_t i = 0; i < compact_pointers_.size(); i++) { for (size_t i = 0; i < compact_pointers_.size(); i++) {
PutVarint32(dst, kCompactPointer); PutVarint32(dst, kCompactPointer);
@ -159,6 +180,16 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
} }
break; break;
// 注释:如果是kImmLastSequence类型,则解析imm_last_sequence_和imm_log_file_number_并将has_imm_last_sequence_ 设为true
case kImmLastSequence:
if (GetVarint64(&input, &imm_last_sequence_) && GetVarint64(&input, &imm_log_file_number_)) {
has_imm_last_sequence_ = true;
} else {
msg = "imemtable last sequence number";
}
break;
case kCompactPointer: case kCompactPointer:
if (GetLevel(&input, &level) && GetInternalKey(&input, &key)) { if (GetLevel(&input, &level) && GetInternalKey(&input, &key)) {
compact_pointers_.push_back(std::make_pair(level, key)); compact_pointers_.push_back(std::make_pair(level, key));

+ 27
- 0
db/version_edit.h View File

@ -26,6 +26,18 @@ struct FileMetaData {
InternalKey largest; // Largest internal key served by table InternalKey largest; // Largest internal key served by table
}; };
// vlog文件的元数据
struct LogMetaData {
LogMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
int refs; //
int allowed_seeks; //0,compaction操作了; allowed_seeks的值在sstable文件加入到version时确定
uint64_t number; //;sstable文件的名字是 number.ldb
uint64_t file_size; //
InternalKey smallest; //key
InternalKey largest; //key
};
class VersionEdit { class VersionEdit {
public: public:
VersionEdit() { Clear(); } VersionEdit() { Clear(); }
@ -53,6 +65,14 @@ class VersionEdit {
has_last_sequence_ = true; has_last_sequence_ = true;
last_sequence_ = seq; last_sequence_ = seq;
} }
// imm_last_sequence_imm sst的时候用
void SetImmLastSequence(SequenceNumber seq,uint64_t fid) {
has_imm_last_sequence_ = true;
imm_last_sequence_ = seq;
imm_log_file_number_ = fid;
}
void SetCompactPointer(int level, const InternalKey& key) { void SetCompactPointer(int level, const InternalKey& key) {
compact_pointers_.push_back(std::make_pair(level, key)); compact_pointers_.push_back(std::make_pair(level, key));
} }
@ -96,6 +116,13 @@ class VersionEdit {
bool has_next_file_number_; bool has_next_file_number_;
bool has_last_sequence_; bool has_last_sequence_;
// imm_last_sequence_
bool has_imm_last_sequence_;
//log的时候 memtable immemtabl中的位置
SequenceNumber imm_last_sequence_;
//imm_last_sequence log文件
uint64_t imm_log_file_number_;
std::vector<std::pair<int, InternalKey>> compact_pointers_; std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_; DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_; std::vector<std::pair<int, FileMetaData>> new_files_;

+ 45
- 3
db/version_set.cc View File

@ -252,7 +252,15 @@ enum SaverState {
kDeleted, kDeleted,
kCorrupt, kCorrupt,
}; };
// 注释:Saver的kv对是否分离
enum SaverSeparate {
kNotSeparated,
kSeparated
};
struct Saver { struct Saver {
// 注释:初始设为不分离
SaverSeparate separate = kNotSeparated;
SaverState state; SaverState state;
const Comparator* ucmp; const Comparator* ucmp;
Slice user_key; Slice user_key;
@ -266,9 +274,12 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
s->state = kCorrupt; s->state = kCorrupt;
} else { } else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
// s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
s->state = (parsed_key.type == kTypeValue || parsed_key.type == kTypeSeparation) ? kFound : kDeleted;
if (s->state == kFound) { if (s->state == kFound) {
s->value->assign(v.data(), v.size()); s->value->assign(v.data(), v.size());
// 注释:如果key.type是kTypeSeparation,则设为kSeparated类型
s->separate = ( parsed_key.type == kTypeSeparation ) ? kSeparated : kNotSeparated;
} }
} }
} }
@ -354,6 +365,12 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->s = state->vset->table_cache_->Get(*state->options, f->number, state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey, f->file_size, state->ikey,
&state->saver, SaveValue); &state->saver, SaveValue);
// 注释:对于是否kv分离,调用不同的Set函数
if( state->saver.separate == kSeparated ){
state->s.SetSeparated();
} else{
state->s.SetNotSeparated();
}
if (!state->s.ok()) { if (!state->s.ok()) {
state->found = true; state->found = true;
return false; return false;
@ -741,6 +758,10 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
next_file_number_(2), next_file_number_(2),
manifest_file_number_(0), // Filled by Recover() manifest_file_number_(0), // Filled by Recover()
last_sequence_(0), last_sequence_(0),
//注释:加上version_edit中添加的参数
imm_last_sequence_(0),
imm_log_file_number_(0),
save_imm_last_sequence_(false),
log_number_(0), log_number_(0),
prev_log_number_(0), prev_log_number_(0),
descriptor_file_(nullptr), descriptor_file_(nullptr),
@ -789,6 +810,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
edit->SetNextFile(next_file_number_); edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_); edit->SetLastSequence(last_sequence_);
// 注释:设置imm_last_sequence_和imm_log_file_number_
if( SaveImmLastSequence() ){
edit->SetImmLastSequence(imm_last_sequence_,imm_log_file_number_);
}
Version* v = new Version(this); Version* v = new Version(this);
{ {
Builder builder(this, current_); Builder builder(this, current_);
@ -892,6 +918,13 @@ Status VersionSet::Recover(bool* save_manifest) {
bool have_prev_log_number = false; bool have_prev_log_number = false;
bool have_next_file = false; bool have_next_file = false;
bool have_last_sequence = false; bool have_last_sequence = false;
//注释:重置version_edit里添加的参数
bool have_imm_last_sequence = false;
uint64_t imm_last_sequence = 0;
uint64_t imm_log_file_number = 0;
uint64_t next_file = 0; uint64_t next_file = 0;
uint64_t last_sequence = 0; uint64_t last_sequence = 0;
uint64_t log_number = 0; uint64_t log_number = 0;
@ -942,6 +975,12 @@ Status VersionSet::Recover(bool* save_manifest) {
last_sequence = edit.last_sequence_; last_sequence = edit.last_sequence_;
have_last_sequence = true; have_last_sequence = true;
} }
//注释: 构建当前的Version 回放参数
if (edit.has_imm_last_sequence_) {
imm_last_sequence = edit.imm_last_sequence_;
imm_log_file_number = edit.imm_log_file_number_;
have_imm_last_sequence = true;
}
} }
} }
delete file; delete file;
@ -975,6 +1014,9 @@ Status VersionSet::Recover(bool* save_manifest) {
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
log_number_ = log_number; log_number_ = log_number;
prev_log_number_ = prev_log_number; prev_log_number_ = prev_log_number;
//注释:修改imm_last_sequence_和imm_log_file_number_
imm_last_sequence_ = imm_last_sequence;
imm_log_file_number_ = imm_log_file_number;
// See if we can reuse the existing MANIFEST file. // See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) { if (ReuseManifest(dscname, current)) {
@ -1391,7 +1433,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest, current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]); &c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// Get entire range covered by compaction // Get entire range covered by compaction
InternalKey all_start, all_limit; InternalKey all_start, all_limit;
@ -1414,7 +1456,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
std::vector<FileMetaData*> expanded1; std::vector<FileMetaData*> expanded1;
current_->GetOverlappingInputs(level + 1, &new_start, &new_limit, current_->GetOverlappingInputs(level + 1, &new_start, &new_limit,
&expanded1); &expanded1);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
// AddBoundaryInputs(icmp_, current_->files_[level + 1], &expanded1);
if (expanded1.size() == c->inputs_[1].size()) { if (expanded1.size() == c->inputs_[1].size()) {
Log(options_->info_log, Log(options_->info_log,
"Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n", "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",

+ 17
- 0
db/version_set.h View File

@ -269,6 +269,15 @@ class VersionSet {
}; };
const char* LevelSummary(LevelSummaryStorage* scratch) const; const char* LevelSummary(LevelSummaryStorage* scratch) const;
//
bool SaveImmLastSequence(){ return save_imm_last_sequence_; }
bool StartImmLastSequence(bool save ){ save_imm_last_sequence_ = save; }
void SetImmLastSequence( uint64_t seq ){ imm_last_sequence_ = seq; }
uint64_t ImmLastSequence() const { return imm_last_sequence_; }
uint64_t ImmLogFileNumber() const { return imm_log_file_number_; }
void SetImmLogFileNumber( uint64_t fid ){ imm_log_file_number_ = fid; }
private: private:
class Builder; class Builder;
@ -304,6 +313,14 @@ class VersionSet {
uint64_t log_number_; uint64_t log_number_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// immemtable转sst的sequence
uint64_t imm_last_sequence_;
// imm sst时候的sequence LogAndApply mior compact major compact的过程
bool save_imm_last_sequence_;
//imm_last_sequence log文件
uint64_t imm_log_file_number_;
// Opened lazily // Opened lazily
WritableFile* descriptor_file_; WritableFile* descriptor_file_;
log::Writer* descriptor_log_; log::Writer* descriptor_log_;

+ 98
- 0
db/vlog_reader.cc View File

@ -0,0 +1,98 @@
#include "vlog_reader.h"
#include "leveldb/env.h"
#include "util/coding.h"
namespace leveldb {
namespace log {
/* Sequential-read constructor: used for record iteration via ReadRecord(). */
VlogReader::VlogReader(SequentialFile *file, Reporter* reporter)
    : file_(file),
      file_random_(nullptr),
      reporter_(reporter),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      last_record_offset_(0) {}
/* Random-access constructor: used for point reads via ReadValue(). */
VlogReader::VlogReader(RandomAccessFile *file, Reporter* reporter)
    : file_(nullptr),
      file_random_(file),
      reporter_(reporter),
      backing_store_(new char[kBlockSize]),
      buffer_(),
      eof_(false),
      last_record_offset_(0) {}
// Frees only backing_store_; the underlying file objects are owned by the caller.
VlogReader::~VlogReader() { delete[] backing_store_; }
/* Random-access read of `length` bytes at `offset` into *key_value (backed
 * by `scratch`). Returns false when this reader was not built on a
 * RandomAccessFile, or when the read fails. */
bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice *key_value, char *scratch) {
  if (file_random_ == nullptr) {
    return false;
  }
  /* Use the RandomAccessFile read interface. */
  Status status = file_random_->Read(offset, length, key_value, scratch);
  return status.ok();
}
/* Read the next physical record into *scratch; on success *record points
 * at the scratch contents. */
bool VlogReader::ReadRecord(Slice *record, std::string *scratch) {
  if (!ReadPhysicalRecord(scratch)) {
    return false;
  }
  *record = *scratch;
  return true;
}
/* Offset of the last record returned by ReadRecord.
 * NOTE(review): nothing in this file updates last_record_offset_ after
 * construction — confirm whether callers rely on this value. */
uint64_t VlogReader::LastRecordOffset() const {
  return last_record_offset_;
}
/* Forward a corruption report of `bytes` dropped bytes to the reporter,
 * if one was supplied. */
void VlogReader::ReportCorruption(uint64_t bytes, const Status &reason) {
  if (reporter_ == nullptr) return;
  reporter_->Corruption(static_cast<size_t>(bytes), reason);
}
/* Read one physical vlog record — a 4-byte (vHeaderSize) length header
 * followed by the payload — from the sequential file into *result.
 * On success *result holds header + payload and true is returned; on a
 * short or failed read, eof_ is set, *result is cleared and false is
 * returned.
 * Fix: the original new[]'d its header buffer and leaked it on the
 * early-return error path; RAII std::string buffers remove both new[]s. */
bool VlogReader::ReadPhysicalRecord(std::string *result) {
  result->clear();
  buffer_.clear();
  std::string head_buf(vHeaderSize, '\0');
  /* Use the SequentialFile read interface. */
  Status status = file_->Read(vHeaderSize, &buffer_, &head_buf[0]);
  if (!status.ok()) {
    buffer_.clear();
    ReportCorruption(kBlockSize, status);
    eof_ = true;
    return false;
  } else if (buffer_.size() < vHeaderSize) {
    // Short header read: treat as end of file.
    eof_ = true;
  }
  if (!eof_) {
    result->assign(buffer_.data(),buffer_.size());
    uint32_t length = DecodeFixed32(buffer_.data());
    buffer_.clear();
    std::string data_buf(length, '\0');
    status = file_->Read(length, &buffer_, &data_buf[0]);
    if (status.ok() && buffer_.size() == length) {
      // Copy the payload out before the local buffer goes away.
      result->append(buffer_.data(), buffer_.size());
    } else {
      eof_ = true;
    }
  }
  if (eof_) {
    result->clear();
    return false;
  }
  return true;
}
} // namespace log
} // namespace leveldb

+ 59
- 0
db/vlog_reader.h View File

@ -0,0 +1,59 @@
#ifndef LEVELDB_VLOG_READER_H
#define LEVELDB_VLOG_READER_H
#include <cstdint>
#include "db/log_format.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
class SequentialFile;
class RandomAccessFile;
namespace log {
/* Reader for vlog files: sequential record iteration (SequentialFile) or
 * random-access point reads (RandomAccessFile), depending on which
 * constructor is used. */
class VlogReader {
public:
class Reporter {
public:
virtual ~Reporter() = default;
/* Called when `bytes` bytes were dropped due to `status`. */
virtual void Corruption(size_t bytes, const Status& status) = 0;
};
/* Sequential reader (ReadRecord) / random-access reader (ReadValue). */
explicit VlogReader(SequentialFile* file, Reporter* reporter);
explicit VlogReader(RandomAccessFile* file, Reporter* reporter);
VlogReader(const VlogReader&) = delete;
VlogReader& operator=(const VlogReader&) = delete;
~VlogReader();
/* Random-access read of `length` bytes at `offset` from the vlog file. */
bool ReadValue(uint64_t offset, size_t length, Slice* key_value, char* scratch);
/* Read the next record; *record points into *scratch on success. */
bool ReadRecord(Slice* record, std::string* scratch);
// Offset of the last record returned by ReadRecord.
uint64_t LastRecordOffset() const;
private:
/* Read one physical vlog record (length header + data). */
bool ReadPhysicalRecord(std::string* result);
void ReportCorruption(uint64_t bytes, const Status &reason);
SequentialFile* const file_;
RandomAccessFile* const file_random_;
Reporter* const reporter_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
};
} // namespace log
} // namespace leveldb
#endif // LEVELDB_VLOG_READER_H

+ 42
- 0
db/vlog_writer.cc View File

@ -0,0 +1,42 @@
#include "db/vlog_writer.h"
#include <cstdint>
#include "leveldb/env.h"
#include "util/coding.h"
namespace leveldb {
namespace log {
/* Writer appending length-prefixed records to *dest; offsets start at 0. */
VlogWriter::VlogWriter(WritableFile* dest) : dest_(dest), head_(0) {}
// NOTE(review): dest_length is ignored and head_ still starts at 0 even for
// a non-empty file — confirm that callers compensate for the base offset.
VlogWriter::VlogWriter(WritableFile* dest, uint64_t dest_length)
    : dest_(dest), head_(0) {}
/* Append `slice` as a single physical record; on success `offset` receives
 * the file offset of the payload (just past the length header). */
Status VlogWriter::AddRecord(const Slice& slice, uint64_t& offset) {
  return EmitPhysicalRecord(slice.data(), slice.size(), offset);
}
/* Write a 4-byte little-endian length header followed by `length` bytes
 * from `ptr`, then flush. On success, `offset` is set to the file offset
 * of the payload and head_ advances past the whole record. */
Status VlogWriter::EmitPhysicalRecord(const char* ptr, size_t length,
                                      uint64_t& offset) {
  // NOTE(review): the header carries a full 32-bit length, so this 16-bit
  // cap looks like a leftover from the block-log format — confirm intent.
  assert(length <= 0xffff);
  char buf[4];
  EncodeFixed32(buf, length);
  Status s = dest_->Append(Slice(buf, 4));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, length));
    if (s.ok()) {
      s = dest_->Flush();
      offset = head_ + 4;
      head_ += 4 + length;
    }
  }
  return s;
}
} // namespace log
} // namespace leveldb

+ 44
- 0
db/vlog_writer.h View File

@ -0,0 +1,44 @@
#ifndef LEVELDB_DB_VLOG_WRITER_H_
#define LEVELDB_DB_VLOG_WRITER_H_
#include "db/log_format.h"
#include <cstdint>
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
class WritableFile;
namespace log {
/* Appends length-prefixed records to a vlog file. */
class VlogWriter {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit VlogWriter(WritableFile* dest);
// Create a writer that will append data to "*dest".
// "*dest" must have initial length "dest_length".
// "*dest" must remain live while this Writer is in use.
VlogWriter(WritableFile* dest, uint64_t dest_length);
VlogWriter(const VlogWriter&) = delete;
VlogWriter& operator=(const VlogWriter&) = delete;
~VlogWriter() = default;
/* Append `slice` as one record; `offset` receives the payload's file
 * offset on success. */
Status AddRecord(const Slice& slice, uint64_t& offset);
private:
/* Write the length header plus payload and advance head_. */
Status EmitPhysicalRecord(const char* ptr, size_t length, uint64_t& offset);
size_t head_; // running file offset just past the last record written
WritableFile* dest_;
};
} // namespace log
} // namespace leveldb
#endif // LEVELDB_DB_VLOG_WRITER_H_

+ 78
- 5
db/write_batch.cc View File

@ -9,17 +9,17 @@
// record := // record :=
// kTypeValue varstring varstring | // kTypeValue varstring varstring |
// kTypeDeletion varstring // kTypeDeletion varstring
// kTypeSeparation varstring varstring
// varstring := // varstring :=
// len: varint32 // len: varint32
// data: uint8[len] // data: uint8[len]
#include "leveldb/write_batch.h" #include "leveldb/write_batch.h"
#include "db/dbformat.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "util/coding.h"
namespace leveldb { namespace leveldb {
@ -33,6 +33,8 @@ WriteBatch::~WriteBatch() = default;
WriteBatch::Handler::~Handler() = default; WriteBatch::Handler::~Handler() = default;
void WriteBatch::Clear() { void WriteBatch::Clear() {
belong_to_gc = false;
// belong_to_gc = true;
rep_.clear(); rep_.clear();
rep_.resize(kHeader); rep_.resize(kHeader);
} }
@ -79,6 +81,65 @@ Status WriteBatch::Iterate(Handler* handler) const {
} }
} }
Status WriteBatch::Iterate(Handler* handler, uint64_t fid,
uint64_t offset) const {
Slice input(rep_);
const char* begin = input.data();
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
const uint64_t kv_offset = input.data() - begin + offset;
assert(kv_offset > 0);
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
case kTypeSeparation:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &(value))) {
// value = fileNumber + offset + valuesize 采用变长编码的方式
std::string dest;
PutVarint64(&dest, fid);
PutVarint64(&dest, kv_offset);
PutVarint64(&dest, value.size());
Slice value_offset(dest);
handler->Put(key, value_offset, kTypeSeparation);
} else {
return Status::Corruption("WriteBatch Put error");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) { int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8); return DecodeFixed32(b->rep_.data() + 8);
} }
@ -97,7 +158,11 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
void WriteBatch::Put(const Slice& key, const Slice& value) { void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
if (value.size() >= separate_threshold_) {
rep_.push_back(static_cast<char>(kTypeSeparation));
} else {
rep_.push_back(static_cast<char>(kTypeValue));
}
PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value); PutLengthPrefixedSlice(&rep_, value);
} }
@ -118,8 +183,8 @@ class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_; SequenceNumber sequence_;
MemTable* mem_; MemTable* mem_;
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) override {
mem_->Add(sequence_, type, key, value);
sequence_++; sequence_++;
} }
void Delete(const Slice& key) override { void Delete(const Slice& key) override {
@ -136,6 +201,14 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
return b->Iterate(&inserter); return b->Iterate(&inserter);
} }
// Replays batch "b" into "memtable", tagging separated values with vlog file
// "fid" and base offset "offset" (see WriteBatch::Iterate(handler, fid, offset)).
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable,
                                      uint64_t fid, size_t offset) {
  MemTableInserter batch_replayer;
  batch_replayer.mem_ = memtable;
  batch_replayer.sequence_ = WriteBatchInternal::Sequence(b);
  return b->Iterate(&batch_replayer, fid, offset);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader); assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());

+ 2
- 0
db/write_batch_internal.h View File

@ -37,6 +37,8 @@ class WriteBatchInternal {
static Status InsertInto(const WriteBatch* batch, MemTable* memtable); static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable,uint64_t fid, size_t offset);
static void Append(WriteBatch* dst, const WriteBatch* src); static void Append(WriteBatch* dst, const WriteBatch* src);
}; };

+ 7
- 2
helpers/memenv/memenv.cc View File

@ -199,17 +199,22 @@ class RandomAccessFileImpl : public RandomAccessFile {
class WritableFileImpl : public WritableFile { class WritableFileImpl : public WritableFile {
public: public:
WritableFileImpl(FileState* file) : file_(file) { file_->Ref(); }
WritableFileImpl(FileState* file) : file_(file), file_size_(0) { file_->Ref(); }
~WritableFileImpl() override { file_->Unref(); } ~WritableFileImpl() override { file_->Unref(); }
Status Append(const Slice& data) override { return file_->Append(data); }
Status Append(const Slice& data) override {
file_size_+= data.size();
return file_->Append(data);
}
Status Close() override { return Status::OK(); } Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); } Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); } Status Sync() override { return Status::OK(); }
size_t GetSize() override { return file_size_; }
private: private:
int file_size_;
FileState* file_; FileState* file_;
}; };

BIN
image/kv_sep.png View File

Before After
Width: 1010  |  Height: 711  |  Size: 93 KiB

BIN
image/kv_test.png View File

Before After
Width: 503  |  Height: 29  |  Size: 2.5 KiB

BIN
image/test_1.jpg View File

Before After
Width: 501  |  Height: 103  |  Size: 8.8 KiB

BIN
image/test_2.jpg View File

Before After
Width: 1104  |  Height: 631  |  Size: 83 KiB

BIN
image/value_field_test.png View File

Before After
Width: 498  |  Height: 32  |  Size: 2.6 KiB

BIN
image/version_1.jpg View File

Before After
Width: 756  |  Height: 453  |  Size: 38 KiB

BIN
image/version_2.jpg View File

Before After
Width: 733  |  Height: 176  |  Size: 9.9 KiB

BIN
image/version_3.jpg View File

Before After
Width: 750  |  Height: 435  |  Size: 28 KiB

BIN
image/vlog.png View File

Before After
Width: 902  |  Height: 357  |  Size: 45 KiB

BIN
image/write-badger.png View File

Before After
Width: 3220  |  Height: 1695  |  Size: 252 KiB

+ 6
- 2
include/leveldb/db.h View File

@ -22,6 +22,7 @@ struct Options;
struct ReadOptions; struct ReadOptions;
struct WriteOptions; struct WriteOptions;
class WriteBatch; class WriteBatch;
class Fields;
// Abstract handle to particular state of a DB. // Abstract handle to particular state of a DB.
// A Snapshot is an immutable object and can therefore be safely // A Snapshot is an immutable object and can therefore be safely
@ -60,6 +61,9 @@ class LEVELDB_EXPORT DB {
virtual ~DB(); virtual ~DB();
virtual Status PutFields(const WriteOptions&, const Slice& key, const Fields& fields) = 0;
virtual Status GetFields(const ReadOptions& options, const Slice& key, Fields& fields) = 0;
// Set the database entry for "key" to "value". Returns OK on success, // Set the database entry for "key" to "value". Returns OK on success,
// and a non-OK status on error. // and a non-OK status on error.
// Note: consider setting options.sync = true. // Note: consider setting options.sync = true.
@ -148,8 +152,8 @@ class LEVELDB_EXPORT DB {
// ----------------------------For TTL----------------------------- // ----------------------------For TTL-----------------------------
// key设置ttl // key设置ttl
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) = 0;
// virtual Status Put(const WriteOptions& options, const Slice& key,
// const Slice& value, uint64_t ttl) = 0;
}; };
// Destroy the contents of the specified database. // Destroy the contents of the specified database.

+ 7
- 0
include/leveldb/env.h View File

@ -197,6 +197,7 @@ class LEVELDB_EXPORT Env {
// serialized. // serialized.
virtual void Schedule(void (*function)(void* arg), void* arg) = 0; virtual void Schedule(void (*function)(void* arg), void* arg) = 0;
virtual void ScheduleForGarbageCollection(void (*function)(void* arg), void* arg) = 0;
// Start a new thread, invoking "function(arg)" within the new thread. // Start a new thread, invoking "function(arg)" within the new thread.
// When "function(arg)" returns, the thread will be destroyed. // When "function(arg)" returns, the thread will be destroyed.
virtual void StartThread(void (*function)(void* arg), void* arg) = 0; virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
@ -287,6 +288,7 @@ class LEVELDB_EXPORT WritableFile {
virtual Status Close() = 0; virtual Status Close() = 0;
virtual Status Flush() = 0; virtual Status Flush() = 0;
virtual Status Sync() = 0; virtual Status Sync() = 0;
virtual size_t GetSize() = 0;
}; };
// An interface for writing log messages. // An interface for writing log messages.
@ -384,6 +386,11 @@ class LEVELDB_EXPORT EnvWrapper : public Env {
void Schedule(void (*f)(void*), void* a) override { void Schedule(void (*f)(void*), void* a) override {
return target_->Schedule(f, a); return target_->Schedule(f, a);
} }
void ScheduleForGarbageCollection(void (*f)(void*), void* a) override{
return target_->ScheduleForGarbageCollection(f, a);
}
void StartThread(void (*f)(void*), void* a) override { void StartThread(void (*f)(void*), void* a) override {
return target_->StartThread(f, a); return target_->StartThread(f, a);
} }

+ 15
- 1
include/leveldb/options.h View File

@ -6,6 +6,7 @@
#define STORAGE_LEVELDB_INCLUDE_OPTIONS_H_ #define STORAGE_LEVELDB_INCLUDE_OPTIONS_H_
#include <cstddef> #include <cstddef>
#include <cstdint>
#include "leveldb/export.h" #include "leveldb/export.h"
@ -145,6 +146,16 @@ struct LEVELDB_EXPORT Options {
// Many applications will benefit from passing the result of // Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here. // NewBloomFilterPolicy() here.
const FilterPolicy* filter_policy = nullptr; const FilterPolicy* filter_policy = nullptr;
/* 注释:重要的 VLog 与 GC 设置 */
// value log
uint64_t max_value_log_size = 16 * 1024 * 1024;
// VLog gc
uint64_t garbage_collection_threshold = max_value_log_size / 4;
// gc put kv
uint64_t background_garbage_collection_separate_ = 1024 * 1024 - 1;
// open vlog
bool start_garbage_collection = false;
}; };
// Options that control read operations // Options that control read operations
@ -166,7 +177,9 @@ struct LEVELDB_EXPORT ReadOptions {
// Options that control write operations // Options that control write operations
struct LEVELDB_EXPORT WriteOptions { struct LEVELDB_EXPORT WriteOptions {
WriteOptions() = default;
explicit WriteOptions(size_t separateThreshold = 32)
: separate_threshold(separateThreshold) {}
// WriteOptions() = default;
// If true, the write will be flushed from the operating system // If true, the write will be flushed from the operating system
// buffer cache (by calling WritableFile::Sync()) before the write // buffer cache (by calling WritableFile::Sync()) before the write
@ -182,6 +195,7 @@ struct LEVELDB_EXPORT WriteOptions {
// crash semantics as the "write()" system call. A DB write // crash semantics as the "write()" system call. A DB write
// with sync==true has similar crash semantics to a "write()" // with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()". // system call followed by "fsync()".
size_t separate_threshold ;
bool sync = false; bool sync = false;
}; };

+ 13
- 1
include/leveldb/status.h View File

@ -71,6 +71,12 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument. // Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; } bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsSeparated() const { return code_ == kSeparated; }
void SetSeparated() { code_ = kSeparated; }
void SetNotSeparated() { code_ = kNotSeparated; }
// Return a string representation of this status suitable for printing. // Return a string representation of this status suitable for printing.
// Returns the string "OK" for success. // Returns the string "OK" for success.
std::string ToString() const; std::string ToString() const;
@ -82,9 +88,12 @@ class LEVELDB_EXPORT Status {
kCorruption = 2, kCorruption = 2,
kNotSupported = 3, kNotSupported = 3,
kInvalidArgument = 4, kInvalidArgument = 4,
kIOError = 5
kIOError = 5,
kSeparated = 6,
kNotSeparated = 7
}; };
Code code_;
Code code() const { Code code() const {
return (state_ == nullptr) ? kOk : static_cast<Code>(state_[4]); return (state_ == nullptr) ? kOk : static_cast<Code>(state_[4]);
} }
@ -101,6 +110,7 @@ class LEVELDB_EXPORT Status {
}; };
inline Status::Status(const Status& rhs) { inline Status::Status(const Status& rhs) {
code_ = rhs.code_;
state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_); state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
} }
inline Status& Status::operator=(const Status& rhs) { inline Status& Status::operator=(const Status& rhs) {
@ -108,12 +118,14 @@ inline Status& Status::operator=(const Status& rhs) {
// and the common case where both rhs and *this are ok. // and the common case where both rhs and *this are ok.
if (state_ != rhs.state_) { if (state_ != rhs.state_) {
delete[] state_; delete[] state_;
code_ = rhs.code_;
state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_); state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
} }
return *this; return *this;
} }
inline Status& Status::operator=(Status&& rhs) noexcept { inline Status& Status::operator=(Status&& rhs) noexcept {
std::swap(state_, rhs.state_); std::swap(state_, rhs.state_);
std::swap(code_ , rhs.code_);
return *this; return *this;
} }

+ 12
- 2
include/leveldb/write_batch.h View File

@ -25,6 +25,7 @@
#include "leveldb/export.h" #include "leveldb/export.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "db/dbformat.h"
namespace leveldb { namespace leveldb {
@ -35,11 +36,13 @@ class LEVELDB_EXPORT WriteBatch {
class LEVELDB_EXPORT Handler { class LEVELDB_EXPORT Handler {
public: public:
virtual ~Handler(); virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) = 0;
virtual void Delete(const Slice& key) = 0; virtual void Delete(const Slice& key) = 0;
}; };
WriteBatch(); WriteBatch();
explicit WriteBatch(size_t separate_threshold)
: separate_threshold_(separate_threshold) { Clear(); }
// Intentionally copyable. // Intentionally copyable.
WriteBatch(const WriteBatch&) = default; WriteBatch(const WriteBatch&) = default;
@ -71,11 +74,18 @@ class LEVELDB_EXPORT WriteBatch {
// Support for iterating over the contents of a batch. // Support for iterating over the contents of a batch.
Status Iterate(Handler* handler) const; Status Iterate(Handler* handler) const;
Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const;
bool IsGarbageCollection() { return belong_to_gc; }
void setGarbageCollection(bool is_gc) { belong_to_gc = is_gc; }
private: private:
friend class WriteBatchInternal; friend class WriteBatchInternal;
size_t separate_threshold_;
std::string rep_; // See comment in write_batch.cc for the format of rep_ std::string rep_; // See comment in write_batch.cc for the format of rep_
bool belong_to_gc;
}; };
} // namespace leveldb } // namespace leveldb

+ 1
- 0
table/table_test.cc View File

@ -97,6 +97,7 @@ class StringSink : public WritableFile {
Status Close() override { return Status::OK(); } Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); } Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); } Status Sync() override { return Status::OK(); }
size_t GetSize(){ return contents_.size(); }
Status Append(const Slice& data) override { Status Append(const Slice& data) override {
contents_.append(data.data(), data.size()); contents_.append(data.data(), data.size());

+ 169
- 0
test/bench_test.cc View File

@ -0,0 +1,169 @@
#include <iostream>
#include <gtest/gtest.h>
#include <chrono>
#include <vector>
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "db/fields.h"
#include "leveldb/write_batch.h"
using namespace leveldb;
// Number of key/values to operate in database
constexpr int num_ = 500000;
// Size of each value
constexpr int value_size_ = 1024;
// Number of read operations
constexpr int reads_ = 500000;
// Opens (creating if missing) the database at "dbName" and stores the handle
// in *db. Returns the status of DB::Open.
Status OpenDB(std::string dbName, DB **db) {
  Options options;
  options.create_if_missing = true;
  return DB::Open(options, dbName, db);
}
// Writes num_ entries with random keys in [1, num_] and value_size_-byte
// values into "db", appending each Put's latency (microseconds) to "lats".
void InsertData(DB *db, std::vector<int64_t> &lats) {
  WriteOptions write_options;
  srand(0);  // Fixed seed so every run touches the same key sequence.
  for (int i = 0; i < num_; ++i) {
    const std::string key = std::to_string(rand() % num_ + 1);
    const std::string value(value_size_, 'a');
    const auto begin = std::chrono::steady_clock::now();
    db->Put(write_options, key, value);
    const auto end = std::chrono::steady_clock::now();
    lats.emplace_back(
        std::chrono::duration_cast<std::chrono::microseconds>(end - begin)
            .count());
  }
}
// Writes num_ single-field entries ({"field", "old_value_"}) with random keys
// in [1, num_] into "db" via PutFields, appending each call's latency
// (microseconds) to "lats". Uses the same seed as InsertData so the key
// distribution matches.
void InsertFields(DB *db, std::vector<int64_t> &lats) {
  WriteOptions writeOptions;
  srand(0);
  for (int i = 0; i < num_; ++i) {
    int key_ = rand() % num_ + 1;
    std::string key = std::to_string(key_);
    FieldArray fields = {{"field", "old_value_"}};
    Fields f(fields);
    auto start_time = std::chrono::steady_clock::now();
    db->PutFields(writeOptions, Slice(key), f);
    auto end_time = std::chrono::steady_clock::now();
    lats.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count());
  }
}
// Performs reads_ random Gets against "db", appending each Get's latency
// (microseconds) to "lats". Reuses seed 0 so lookups hit the key range that
// InsertData populated.
void GetData(DB *db, std::vector<int64_t> &lats) {
  ReadOptions readOptions;
  srand(0);
  for (int i = 0; i < reads_; ++i) {
    int key_ = rand() % num_ + 1;
    std::string key = std::to_string(key_);
    std::string value;
    auto start_time = std::chrono::steady_clock::now();
    db->Get(readOptions, key, &value);
    auto end_time = std::chrono::steady_clock::now();
    lats.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count());
  }
}
// Iterates over the first reads_ entries of "db" in key order.
// NOTE(review): the timed region between start_time and end_time contains no
// work, so each sample records only timer overhead -- iter->Next() runs
// outside the measurement. Confirm this is intentional.
void ReadOrdered(DB *db, std::vector<int64_t> &lats) {
  Iterator* iter = db->NewIterator(ReadOptions());
  int i = 0;
  for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
    ++i;
    auto start_time = std::chrono::steady_clock::now();
    // Just iterating over the data without performing any operation.
    auto end_time = std::chrono::steady_clock::now();
    lats.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count());
  }
  delete iter;
}
// Benchmarks Fields::FindKeysByFields over "db": reads_ lookups for a fixed
// field value, appending each call's latency (microseconds) to "lats".
//
// Fix: the original allocated a new DBImpl inside the timed region on every
// iteration and never freed it -- leaking reads_ instances and folding DBImpl
// construction time into the measured latency. The helper is now built once
// before the loop and deleted afterwards, and the loop-invariant query is
// hoisted out as well.
void FindKeys(DB *db, std::vector<int64_t> &lats) {
  srand(0);
  FieldArray fields_to_find = {{"field", "old_value_" }};
  // NOTE(review): "bench_resr_db" looks like a typo (bench_test_db?), kept
  // as-is because it is a runtime path the benchmark may depend on.
  std::string dbname_ = "bench_resr_db";
  Options options;
  options.create_if_missing = true;
  DBImpl* impl = new DBImpl(options, dbname_);
  for (int i = 0; i < reads_; ++i) {
    auto start_time = std::chrono::steady_clock::now();
    Fields::FindKeysByFields(db, fields_to_find, impl);
    auto end_time = std::chrono::steady_clock::now();
    lats.emplace_back(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count());
  }
  delete impl;
}
// Returns the latency value at the given percentile (0.0 .. 1.0).
// An empty input yields 0.0; the computed rank is clamped to the last
// element so percentile == 1.0 returns the maximum.
double CalculatePercentile(const std::vector<int64_t>& latencies, double percentile) {
  if (latencies.empty()) return 0.0;
  std::vector<int64_t> ordered(latencies.begin(), latencies.end());
  std::sort(ordered.begin(), ordered.end());
  size_t rank = static_cast<size_t>(percentile * ordered.size());
  rank = std::min(rank, ordered.size() - 1);
  return static_cast<double>(ordered[rank]);
}
// Pre-populates "db" with plain KV data; latency samples are discarded.
void SetupData(DB *db) {
  std::vector<int64_t> lats;
  InsertData(db, lats);
}
// Pre-populates "db" with field data; latency samples are discarded.
void SetupFields(DB *db) {
  std::vector<int64_t> lats;
  InsertFields(db, lats);
}
// Runs benchmark "func" against a fresh database at "testdb_bench",
// optionally pre-populating it with plain KV data (setup_data) and/or field
// data (setup_fields), then reports avg/P75/P99 latency and throughput.
//
// Fix: guard the two divisions -- an empty latency vector previously made
// "avg" NaN (0.0 / 0), and a sub-millisecond run made "duration" zero and
// crashed on integer divide-by-zero in the throughput line.
template<typename Func>
void RunBenchmark(const char* name, Func func, bool setup_data = true, bool setup_fields = false) {
  DB *db;
  std::string rm_command = "rm -rf testdb_bench";
  system(rm_command.c_str());
  if (!OpenDB("testdb_bench", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  if (setup_data) SetupData(db);
  if (setup_fields) SetupFields(db);
  std::vector<int64_t> lats;
  auto start_time = std::chrono::steady_clock::now();
  func(db, lats);
  auto end_time = std::chrono::steady_clock::now();
  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
  double avg = 0.0;
  for (auto latency : lats) {
    avg += latency;
  }
  if (!lats.empty()) {
    avg /= lats.size();  // Avoid 0/0 -> NaN when func recorded nothing.
  }
  double p75 = CalculatePercentile(lats, 0.75);
  double p99 = CalculatePercentile(lats, 0.99);
  std::cout << name << " Latency (avg, P75, P99): " << avg << " micros/op, " << p75 << " micros/op, " << p99 << " micros/op" << std::endl;
  if (duration > 0) {
    std::cout << name << " Throughput: " << lats.size() / duration << " ops/ms" << std::endl;
  } else {
    // Run finished in under 1 ms; an ops/ms figure would divide by zero.
    std::cout << name << " Throughput: n/a (completed in <1 ms)" << std::endl;
  }
  delete db;
}
// Benchmark entry points. The two boolean flags select whether the fresh DB
// is pre-populated with plain KV data and/or field data before timing.
TEST(BenchTest, PutLatency) { RunBenchmark("Put", InsertData, false, false); }
TEST(BenchTest, PutFieldsLatency) { RunBenchmark("PutFields", InsertFields, false, false); }
TEST(BenchTest, GetLatency) { RunBenchmark("Get", GetData, true, false); }
TEST(BenchTest, IteratorLatency) { RunBenchmark("Iterator", ReadOrdered, true, false); }
TEST(BenchTest, FindKeysByFieldLatency) {
  RunBenchmark("FindKeysByFields", FindKeys, false, true);
}
int main(int argc, char **argv) {
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

+ 82
- 0
test/kv_test.cc View File

@ -0,0 +1,82 @@
#include <chrono>
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int short_value_size = 4;
constexpr int long_value_size = 64;
constexpr int data_size = 512;
// Opens (creating if missing) the database at "dbName" into *db.
Status OpenDB(std::string dbName, DB **db) {
  Options options;
  options.create_if_missing = true;
  return DB::Open(options, dbName, db);
}
// Fills "db" with key_num = data_size / value_size sequential keys
// ("0" .. "key_num-1"), each mapped to value_size bytes of 'a'.
void InsertData(DB *db, int value_size) {
  WriteOptions writeOptions;
  int key_num = data_size / value_size;
  srand(42);  // NOTE(review): rand() is never called below; seed has no effect.
  for (int i = 0; i < key_num; i++) {
    int key_ = i;
    std::string key = std::to_string(key_);
    std::string value(value_size, 'a');
    db->Put(writeOptions, key, value);
  }
}
// Inserts short (4-byte) values -- below the key/value separation threshold --
// and verifies every key reads back exactly.
TEST(TestKV, GetValue) {
  DB *db;
  if(OpenDB("testdb_TestKV_short_value", &db).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  InsertData(db, short_value_size);
  ReadOptions readOptions;
  Status status;
  int key_num = data_size / short_value_size;
  srand(42);
  for (int i = 0; i < key_num; i++) {
    // int key_ = rand() % key_num+1;
    std::string key = std::to_string(i);
    std::string value;
    std::string expected_value(short_value_size, 'a');
    status = db->Get(readOptions, key, &value);
    // std::cout << key << std::endl;
    ASSERT_TRUE(status.ok());
    EXPECT_EQ(expected_value, value);
  }
}
// Inserts long (64-byte) values -- large enough to be separated into the
// value log -- and verifies every key still reads back the full value.
TEST(TestKV, GetLongValue) {
  DB *db;
  if(OpenDB("testdb_TestKV_long_value", &db).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  InsertData(db, long_value_size);
  ReadOptions readOptions;
  Status status;
  int key_num = data_size / long_value_size;
  for (int i = 0; i < key_num; i++) {
    std::string key = std::to_string(i);
    std::string value;
    std::string expected_value(long_value_size, 'a');
    status = db->Get(readOptions, key, &value);
    ASSERT_TRUE(status.ok());
    EXPECT_EQ(expected_value, value);
  }
}
// Test runner entry point.
int main(int argc, char** argv) {
  // All tests currently run with the same read-only file limits.
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

+ 1
- 1
test/ttl_test.cc View File

@ -59,7 +59,7 @@ TEST(TestTTL, ReadTTL) {
Status status; Status status;
int key_num = data_size / value_size; int key_num = data_size / value_size;
srand(0); srand(0);
for (int i = 0; i < 100; i++) {
for (int i = 14; i < 100; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
std::string value; std::string value;

+ 253
- 0
test/value_field_test.cc View File

@ -0,0 +1,253 @@
#include "gtest/gtest.h"
#include "leveldb/db.h"
#include "db/fields.h"
#include "leveldb/write_batch.h"
using namespace leveldb;
// Opens (creating if missing) the database at "dbName" into *db.
Status OpenDB(const std::string& dbName, DB** db) {
  // Optionally wipe an existing database first (disabled).
  // std::string rm_command = "rm -rf " + dbName;
  // system(rm_command.c_str());
  Options options;
  options.create_if_missing = true;
  return DB::Open(options, dbName, db);
}
// Fixture that opens the shared "testdb_field" database before each test and
// closes it afterwards.
class FieldsTest : public ::testing::Test {
protected:
    void SetUp() override {
        Status s = OpenDB(dbname_ , &db_);
        EXPECT_TRUE(s.ok()) << "Failed to open database: " << s.ToString();
    }
    void TearDown() override {
        delete db_;
        db_ = nullptr;
    }
    DB* db_ = nullptr;                    // Database handle under test.
    std::string dbname_ = "testdb_field"; // Database path.
};
// Exercises each Fields constructor overload.
TEST_F(FieldsTest, TestConstructors) {
    // Construction from a single Field.
    Fields f_single(Field("single", "value"));
    EXPECT_EQ(f_single.size(), 1);
    EXPECT_TRUE(f_single.HasField("single"));
    // Construction from a FieldArray.
    FieldArray fields = {{"array1", "value1"}, {"array2", "value2"}};
    Fields f_array(fields);
    EXPECT_EQ(f_array.size(), 2);
    EXPECT_TRUE(f_array.HasField("array1"));
    EXPECT_TRUE(f_array.HasField("array2"));
    // Construction from a list of field names only.
    std::vector<std::string> field_names = {"name1", "name2"};
    Fields f_names(field_names);
    EXPECT_EQ(f_names.size(), 2);
}
// Verifies that the Fields constructor sorts its fields (SortFields).
TEST_F(FieldsTest, TestSortFields) {
    // An intentionally unsorted set of fields.
    FieldArray unsorted_fields = {
        {"field3", "value3"},
        {"field1", "value1"},
        {"field2", "value2"},
        {"field5", "value5"},
        {"field4", "value4"}
    };
    // The constructor is expected to call SortFields internally.
    Fields f(unsorted_fields);
    // The stored fields must be ordered by field name.
    EXPECT_TRUE(std::is_sorted(f.begin(), f.end(),
        [](const Field& lhs, const Field& rhs) {
            return lhs.first < rhs.first;
        })) << "Fields are not sorted after constructor.";
    // And the order must match the expected name sequence exactly.
    std::vector<std::string> expected_order = {"field1", "field2", "field3", "field4", "field5"};
    size_t index = 0;
    for (const auto& field : f) {
        EXPECT_EQ(field.first, expected_order[index++]) << "Field order is incorrect after constructor sorting.";
    }
}
// Verifies read access through operator[].
TEST_F(FieldsTest, TestOperatorBracketAccess) {
    FieldArray fields = {{"field1", "value1"}, {"field2", "value2"}};
    Fields f(fields);
    // Existing fields return their stored values.
    EXPECT_EQ(f["field1"], "value1");
    EXPECT_EQ(f["field2"], "value2");
    // A missing field returns an empty string.
    // NOTE(review): CaptureStderr() is never paired with GetCapturedStderr(),
    // so stderr stays captured for the remainder of the process -- confirm.
    testing::internal::CaptureStderr();
    EXPECT_EQ(f["nonexistent_field"], "");
}
// Verifies insert/update semantics of operator[].
TEST_F(FieldsTest, TestOperatorBracketUpdate) {
    Fields f;
    // Assigning through operator[] inserts a missing field.
    f["field1"] = "value1";
    EXPECT_EQ(f["field1"], "value1");
    // Assigning again overwrites the existing value.
    f["field1"] = "new_value1";
    EXPECT_EQ(f["field1"], "new_value1");
    // Insert several new fields.
    f["field2"] = "value2";
    f["field3"] = "value3";
    // All fields present with the latest values.
    EXPECT_EQ(f.size(), 3);
    EXPECT_EQ(f["field1"], "new_value1");
    EXPECT_EQ(f["field2"], "value2");
    EXPECT_EQ(f["field3"], "value3");
}
// Inserts many field records, deletes half, and verifies both halves.
TEST_F(FieldsTest, TestBulkDelete) {
    const size_t num_fields = 1000;
    // NOTE(review): "batch" is never used; consider removing it.
    leveldb::WriteBatch batch;
    // Insert num_fields keys, each with one uniquely named field.
    for (size_t i = 0; i < num_fields; ++i) {
        std::string key = "key_" + std::to_string(i);
        FieldArray fields = {{"field" + std::to_string(i), "value_" + std::to_string(i)}};
        Fields f(fields);
        Status status = db_->PutFields(WriteOptions(), Slice(key), f);
        EXPECT_TRUE(status.ok()) << "Failed to put fields for key: " << key;
    }
    // Delete the first half of the keys.
    for (size_t i = 0; i < num_fields / 2; ++i) {
        std::string key = "key_" + std::to_string(i);
        Status status = db_->Delete(WriteOptions(), key);
        EXPECT_TRUE(status.ok()) << "Failed to delete key: " << key;
    }
    // Deleted keys must be gone; surviving keys must keep their values.
    for (size_t i = 0; i < num_fields; ++i) {
        std::string key = "key_" + std::to_string(i);
        Fields fields;
        Status status = db_->GetFields(ReadOptions(), Slice(key), fields);
        if (i < num_fields / 2) {
            EXPECT_FALSE(status.ok()) << "Deleted key still exists: " << key;
        } else {
            EXPECT_TRUE(status.ok()) << "Missing non-deleted key: " << key;
            auto field_value = fields.GetField("field" + std::to_string(i));
            EXPECT_EQ(field_value.second, "value_" + std::to_string(i)) << "Incorrect value for non-deleted field: " << key;
        }
    }
}
// Inserts many field records, overwrites half, and verifies all values.
TEST_F(FieldsTest, TestBulkUpdate) {
    const size_t num_fields = 500;
    // NOTE(review): "batch" is never used; consider removing it.
    leveldb::WriteBatch batch;
    // Insert num_fields keys with "old" values.
    for (size_t i = 0; i < num_fields; ++i) {
        std::string key = "key_" + std::to_string(i);
        FieldArray fields = {{"field" + std::to_string(i), "old_value_" + std::to_string(i)}};
        Fields f(fields);
        Status status = db_->PutFields(WriteOptions(), Slice(key), f);
        EXPECT_TRUE(status.ok()) << "Failed to put fields for key: " << key;
    }
    // Overwrite the first half with "new" values.
    for (size_t i = 0; i < num_fields / 2; ++i) {
        std::string key = "key_" + std::to_string(i);
        FieldArray update_fields = {{"field" + std::to_string(i), "new_value_" + std::to_string(i)}};
        Fields f(update_fields);
        Status status = db_->PutFields(WriteOptions(), Slice(key), f);
        EXPECT_TRUE(status.ok()) << "Failed to update fields for key: " << key;
    }
    // Updated keys must read "new"; untouched keys must still read "old".
    for (size_t i = 0; i < num_fields; ++i) {
        std::string key = "key_" + std::to_string(i);
        Fields fields;
        Status status = db_->GetFields(ReadOptions(), Slice(key), fields);
        EXPECT_TRUE(status.ok()) << "Failed to read key: " << key;
        auto field_value = fields.GetField("field" + std::to_string(i));
        auto expected_value = (i < num_fields / 2) ? ("new_value_" + std::to_string(i)) : ("old_value_" + std::to_string(i));
        EXPECT_EQ(field_value.second, expected_value) << "Incorrect value for updated field: " << key;
    }
}
// Covers bulk insert, serialize/deserialize round-trip, delete, and
// FindKeysByFields.
TEST_F(FieldsTest, TestBulkInsertSerializeDeleteAndFindKeys) {
    const size_t num_entries = 500;
    // Insert num_entries keys (in reverse order) via PutFields.
    for (size_t i = num_entries; i > 0; --i) {
        std::string key = "key_" + std::to_string(i);
        FieldArray fields = {{"field1", "value1_" + std::to_string(i)}, {"field2", "value2_"}};
        Fields ffields(fields);
        Status status = db_->PutFields(WriteOptions(), Slice(key), ffields);
        EXPECT_TRUE(status.ok()) << "Failed to put fields for key: " << key << ", error: " << status.ToString();
    }
    // Every inserted key must round-trip through GetFields/GetField.
    for (size_t i = 1; i <= num_entries; ++i) {
        std::string key = "key_" + std::to_string(i);
        Fields fields;
        Status status = db_->GetFields(ReadOptions(), Slice(key), fields);
        EXPECT_TRUE(status.ok()) << "Failed to read key: " << key << ", error: " << status.ToString();
        auto field1_value = fields.GetField("field1");
        auto field2_value = fields.GetField("field2");
        EXPECT_EQ(field1_value.second, "value1_" + std::to_string(i)) << "Incorrect value for field1 in key: " << key;
        EXPECT_EQ(field2_value.second, "value2_") << "Incorrect value for field2 in key: " << key;
    }
    // Delete the first key.
    Status status = db_->Delete(WriteOptions(), "key_1");
    EXPECT_TRUE(status.ok()) << "Failed to delete key: key_1, error: " << status.ToString();
    // Find all keys whose field2 matches; key_1 was deleted so it must not appear.
    FieldArray fields_to_find = {{"field2", "value2_"}};
    Options options;
    options.create_if_missing = true;
    // NOTE(review): "impl" is never deleted -- this leaks a DBImpl per run;
    // confirm whether deleting it here is safe given db_ is still open.
    DBImpl* impl = new DBImpl(options, dbname_);
    std::vector<std::string> found_keys = Fields::FindKeysByFields(db_, fields_to_find, impl);
    EXPECT_EQ(found_keys.size(), num_entries - 1) << "Expected " << num_entries - 1 << " keys but found " << found_keys.size();
    // for (size_t i = 2; i <= num_entries; ++i) {
    //     std::string expected_key = "key_" + std::to_string(i);
    //     EXPECT_TRUE(std::find(found_keys.begin(), found_keys.end(), expected_key) != found_keys.end())
    //         << "Key not found: " << expected_key;
    // }
    // A query on a field nothing matches must return an empty result.
    FieldArray no_match_fields = {{"nonexistent_field", ""}};
    found_keys = Fields::FindKeysByFields(db_, no_match_fields, impl);
    EXPECT_TRUE(found_keys.empty()) << "Expected an empty result for non-matching fields.";
}
// Test runner entry point.
int main(int argc, char** argv) {
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}

+ 59
- 0
util/env_posix.cc View File

@ -279,6 +279,7 @@ class PosixWritableFile final : public WritableFile {
PosixWritableFile(std::string filename, int fd) PosixWritableFile(std::string filename, int fd)
: pos_(0), : pos_(0),
fd_(fd), fd_(fd),
file_size_(0),
is_manifest_(IsManifest(filename)), is_manifest_(IsManifest(filename)),
filename_(std::move(filename)), filename_(std::move(filename)),
dirname_(Dirname(filename_)) {} dirname_(Dirname(filename_)) {}
@ -290,10 +291,14 @@ class PosixWritableFile final : public WritableFile {
} }
} }
size_t GetSize() { return file_size_; }
Status Append(const Slice& data) override { Status Append(const Slice& data) override {
size_t write_size = data.size(); size_t write_size = data.size();
const char* write_data = data.data(); const char* write_data = data.data();
file_size_ += write_size;
// Fit as much as possible into buffer. // Fit as much as possible into buffer.
size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_); size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
std::memcpy(buf_ + pos_, write_data, copy_size); std::memcpy(buf_ + pos_, write_data, copy_size);
@ -459,6 +464,8 @@ class PosixWritableFile final : public WritableFile {
size_t pos_; size_t pos_;
int fd_; int fd_;
int file_size_;
const bool is_manifest_; // True if the file's name starts with MANIFEST. const bool is_manifest_; // True if the file's name starts with MANIFEST.
const std::string filename_; const std::string filename_;
const std::string dirname_; // The directory of filename_. const std::string dirname_; // The directory of filename_.
@ -691,6 +698,9 @@ class PosixEnv : public Env {
void Schedule(void (*background_work_function)(void* background_work_arg), void Schedule(void (*background_work_function)(void* background_work_arg),
void* background_work_arg) override; void* background_work_arg) override;
void ScheduleForGarbageCollection(void (*background_work_function)(void* background_work_arg),
void* background_work_arg) override;
void StartThread(void (*thread_main)(void* thread_main_arg), void StartThread(void (*thread_main)(void* thread_main_arg),
void* thread_main_arg) override { void* thread_main_arg) override {
std::thread new_thread(thread_main, thread_main_arg); std::thread new_thread(thread_main, thread_main_arg);
@ -746,11 +756,16 @@ class PosixEnv : public Env {
private: private:
void BackgroundThreadMain(); void BackgroundThreadMain();
void BackgroundThreadMainGarbageCollection();
static void BackgroundThreadEntryPoint(PosixEnv* env) { static void BackgroundThreadEntryPoint(PosixEnv* env) {
env->BackgroundThreadMain(); env->BackgroundThreadMain();
} }
// Thunk used as a std::thread entry point: forwards to the garbage-collection
// worker loop on |env|. Mirrors BackgroundThreadEntryPoint above, but for the
// dedicated GC background thread.
static void BackgroundThreadEntryPointforGlobalCollection(PosixEnv* env) {
env->BackgroundThreadMainGarbageCollection();
}
// Stores the work item data in a Schedule() call. // Stores the work item data in a Schedule() call.
// //
// Instances are constructed on the thread calling Schedule() and used on the // Instances are constructed on the thread calling Schedule() and used on the
@ -772,6 +787,14 @@ class PosixEnv : public Env {
std::queue<BackgroundWorkItem> background_work_queue_ std::queue<BackgroundWorkItem> background_work_queue_
GUARDED_BY(background_work_mutex_); GUARDED_BY(background_work_mutex_);
// begin note: synchronization state for the dedicated GC thread (mutex, condvar, work queue)
port::Mutex background_GlobalCollection_work_mutex_;
port::CondVar background_GlobalCollection_work_cv_ GUARDED_BY(background_GlobalCollection_work_mutex_);
std::queue<BackgroundWorkItem> background_GlobalCollection_work_queue_
GUARDED_BY(background_GlobalCollection_work_mutex_);
// end
PosixLockTable locks_; // Thread-safe. PosixLockTable locks_; // Thread-safe.
Limiter mmap_limiter_; // Thread-safe. Limiter mmap_limiter_; // Thread-safe.
Limiter fd_limiter_; // Thread-safe. Limiter fd_limiter_; // Thread-safe.
@ -807,6 +830,7 @@ int MaxOpenFiles() {
PosixEnv::PosixEnv() PosixEnv::PosixEnv()
: background_work_cv_(&background_work_mutex_), : background_work_cv_(&background_work_mutex_),
background_GlobalCollection_work_cv_(&background_GlobalCollection_work_mutex_),
started_background_thread_(false), started_background_thread_(false),
mmap_limiter_(MaxMmaps()), mmap_limiter_(MaxMmaps()),
fd_limiter_(MaxOpenFiles()) {} fd_limiter_(MaxOpenFiles()) {}
@ -851,6 +875,41 @@ void PosixEnv::BackgroundThreadMain() {
} }
} }
// Enqueues a work item for the dedicated garbage-collection background
// thread. Thread-safe; may be called from any thread.
//
// background_work_function: callback to run on the GC thread.
// background_work_arg: opaque argument forwarded to the callback.
void PosixEnv::ScheduleForGarbageCollection(
    void (*background_work_function)(void* background_work_arg),
    void* background_work_arg) {
  background_GlobalCollection_work_mutex_.Lock();

  // The GC thread only sleeps on the condition variable when its queue is
  // empty, so a wake-up is needed exactly when this item is the first one.
  const bool queue_was_empty = background_GlobalCollection_work_queue_.empty();

  background_GlobalCollection_work_queue_.emplace(background_work_function,
                                                  background_work_arg);

  if (queue_was_empty) {
    // Signaling while still holding the mutex is safe: the waiter cannot
    // proceed until the mutex is released below.
    background_GlobalCollection_work_cv_.Signal();
  }

  background_GlobalCollection_work_mutex_.Unlock();
}
// Main loop of the dedicated garbage-collection background thread: drains
// queued GC work items forever, sleeping while the queue is empty. Mirrors
// BackgroundThreadMain, but services the GC work queue. Never returns.
void PosixEnv::BackgroundThreadMainGarbageCollection() {
  for (;;) {
    background_GlobalCollection_work_mutex_.Lock();

    // Block until a producer enqueues work. The while-loop (rather than a
    // single if) guards against spurious condition-variable wake-ups.
    while (background_GlobalCollection_work_queue_.empty()) {
      background_GlobalCollection_work_cv_.Wait();
    }
    assert(!background_GlobalCollection_work_queue_.empty());

    // Copy the item out before popping, then drop the lock so producers are
    // not blocked while the (potentially long) GC work runs.
    auto work_function = background_GlobalCollection_work_queue_.front().function;
    void* work_arg = background_GlobalCollection_work_queue_.front().arg;
    background_GlobalCollection_work_queue_.pop();

    background_GlobalCollection_work_mutex_.Unlock();

    work_function(work_arg);
  }
}
namespace { namespace {
// Wraps an Env instance whose destructor is never created. // Wraps an Env instance whose destructor is never created.

Loading…
Cancel
Save