diff --git a/db/db_impl.cc b/db/db_impl.cc index b552fb1..85015f4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -33,6 +33,7 @@ #include "table/block.h" #include "table/merger.h" #include "table/two_level_iterator.h" +#include "table/vtable_builder.h" #include "table/vtable_format.h" #include "table/vtable_manager.h" #include "table/vtable_reader.h" @@ -70,7 +71,10 @@ struct DBImpl::CompactionState { : compaction(c), smallest_snapshot(0), outfile(nullptr), + vtb_file(nullptr), builder(nullptr), + vtable_builder(nullptr), + vtb_num(0), total_bytes(0) {} Compaction* const compaction; @@ -85,8 +89,11 @@ struct DBImpl::CompactionState { // State kept for output being generated WritableFile* outfile; + WritableFile* vtb_file; TableBuilder* builder; + VTableBuilder* vtable_builder; + uint64_t vtb_num; uint64_t total_bytes; }; @@ -153,7 +160,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_)), - vtable_manager_(new VTableManager(dbname, raw_options.env)){} + vtable_manager_(new VTableManager(dbname, raw_options.env, raw_options.gc_size_threshold)){} DBImpl::~DBImpl() { // Wait for background work to finish. @@ -553,7 +560,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); - vtable_manager_->AddVTable(vtable_meta); + if (vtable_meta.number > 0) { + vtable_manager_->AddVTable(vtable_meta); + } } CompactionStats stats; @@ -844,6 +853,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { mutex_.Unlock(); } + if (compact->vtb_num == 0) { + compact->vtb_num = file_number; + } + // Make the output file std::string fname = TableFileName(dbname_, file_number); Status s = env_->NewWritableFile(fname, &compact->outfile); @@ -886,6 +899,26 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, delete compact->outfile; compact->outfile = nullptr; + if (compact->vtable_builder != nullptr && s.ok()) { + VTableMeta meta; + meta.invalid_num = 0; + meta.number = compact->vtb_num; + meta.records_num = compact->vtable_builder->RecordNumber(); + meta.table_size = compact->vtable_builder->FileSize(); + + s = compact->vtable_builder->Finish(); + delete compact->vtable_builder; + compact->vtable_builder = nullptr; + if (s.ok()) { + s = compact->vtb_file->Sync(); + } + if (s.ok()) { + s = compact->vtb_file->Close(); + } + delete compact->vtb_file; + compact->vtb_file = nullptr; + } + if (s.ok() && current_entries > 0) { // Verify that the table is usable Iterator* iter = @@ -920,6 +953,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } +bool GetValueType(const Slice& input, unsigned char* value) { + if (input.empty()) { + return false; + } + *value = *input.data(); + return true; +} + Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions @@ -943,6 +984,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Release mutex while we're actually doing the compaction work mutex_.Unlock(); + enum Type : unsigned char { + kVTableIndex = 1, + kNonIndexValue = 2, + }; input->SeekToFirst(); Status status; ParsedInternalKey ikey; @@ -1029,7 +1074,43 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, input->value()); + + auto value = input->value(); + std::string new_value = value.ToString(); + unsigned char type; + if(!GetValueType(value, &type)) { + break; + } + + if (type == kVTableIndex) { + if (compact->compaction->level() > config::kNumLevels - 3) { + if (compact->vtable_builder == nullptr) { + auto fname = VTableFileName(dbname_, compact->vtb_num); + status = env_->NewWritableFile(fname, &compact->vtb_file); + auto vtable_builder = new VTableBuilder(options_, compact->vtb_file); + compact->vtable_builder = vtable_builder; + } + VTableIndex index; + VTableReader reader; + VTableRecord record; + VTableHandle handle; + + status = index.Decode(&value); + + std::string vtb_name = VTableFileName(this->dbname_, index.file_number); + status = reader.Open(this->options_, vtb_name); + status = reader.Get(index.vtable_handle, &record); + + vtable_manager_->AddInvalid(index.file_number); + compact->vtable_builder->Add(record, &handle); + VTableIndex new_index; + new_index.file_number = compact->vtb_num; + new_index.vtable_handle = handle; + new_index.Encode(&new_value); + } + } + + compact->builder->Add(key, Slice(new_value)); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1039,6 +1120,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + } else { + unsigned char type; + auto value = input->value(); + if(!GetValueType(value, &type)) { + break; + } + if (type == kVTableIndex) { + VTableIndex vtable_index; + vtable_index.Decode(&value); + vtable_manager_->AddInvalid(vtable_index.file_number); + } } input->Next(); @@ -1148,18 +1240,6 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { return versions_->MaxNextLevelOverlappingBytes(); } -namespace { - -bool GetValueType(const Slice& input, unsigned char* value) { - if (input.empty()) { - return false; - } - *value = *input.data(); - return true; -} - -} // namespace - Status DBImpl::DecodeValue(std::string* value) const { enum Type : unsigned char { kVTableIndex = 1, diff --git a/db/db_impl.h b/db/db_impl.h index a8abbc0..c14b00a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -212,7 +212,7 @@ class DBImpl : public DB { CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); - VTableManager* vtable_manager_ GUARDED_BY(mutex_) {}; + VTableManager* vtable_manager_ {}; }; // Sanitize db options. The caller should delete result.info_log if diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 2185d86..cf7707f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -118,6 +118,8 @@ struct LEVELDB_EXPORT Options { // initially populating a large database. size_t max_file_size = 2 * 1024 * 1024; + size_t gc_size_threshold = 1024 * 1024 * 1024; + // Compress blocks using the specified compression algorithm. This // parameter can be changed dynamically. // diff --git a/table/vtable_manager.cc b/table/vtable_manager.cc index e2b118b..c02f98d 100644 --- a/table/vtable_manager.cc +++ b/table/vtable_manager.cc @@ -11,6 +11,12 @@ namespace leveldb { +struct GCInfo { + std::string dbname; + std::vector file_list; + Env* env = nullptr; +}; + void VTableMeta::Encode(std::string* target) const { PutVarint64(target, number); PutVarint64(target, records_num); @@ -46,6 +52,9 @@ Status VTableManager::AddInvalid(uint64_t file_num) { if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) { invalid_.emplace_back(file_num); } + + MaybeScheduleGarbageCollect(); + return Status::OK(); } @@ -113,6 +122,9 @@ Status VTableManager::LoadVTableMeta() { VTableMeta vtable_meta; s = vtable_meta.Decode(&input); if (s.ok()) { + if (vtable_meta.number == 0) { + continue; + } AddVTable(vtable_meta); if (vtable_meta.invalid_num >= vtable_meta.records_num) { invalid_.emplace_back(vtable_meta.number); @@ -125,4 +137,33 @@ Status VTableManager::LoadVTableMeta() { return s; } +void VTableManager::MaybeScheduleGarbageCollect() { + size_t size = 0; + for (auto & file_num : invalid_) { + size += vtables_[file_num].table_size; + } + if (size >= gc_threshold_) { + auto* gc_info = new GCInfo; + gc_info->dbname = dbname_; + gc_info->file_list = invalid_; + gc_info->env = env_; + env_->StartThread(&VTableManager::BackgroudGC, gc_info); + for (auto & file_num : gc_info->file_list) { + RemoveVTable(file_num); + auto it = std::remove(invalid_.begin(), invalid_.end(), file_num); + } + } +} + +void VTableManager::BackgroudGC(void* gc_info) { + auto info = reinterpret_cast(gc_info); + for (auto & file_num : info->file_list) { + auto fname = VTableFileName(info->dbname, file_num); + info->env->RemoveFile(fname); + } + delete info; +} + + + } // namespace leveldb diff --git a/table/vtable_manager.h b/table/vtable_manager.h index 1fa0e4a..7ad1eff 100644 --- a/table/vtable_manager.h +++ b/table/vtable_manager.h @@ -26,9 +26,10 @@ struct VTableMeta { class VTableManager { public: - explicit VTableManager(const std::string& dbname, Env* env) : + explicit VTableManager(const std::string& dbname, Env* env, size_t gc_threshold) : dbname_(dbname), - env_(env) {} + env_(env), + gc_threshold_(gc_threshold) {} ~VTableManager() = default; @@ -42,11 +43,16 @@ class VTableManager { Status LoadVTableMeta(); + void MaybeScheduleGarbageCollect(); + + static void BackgroudGC(void* gc_info); + private: std::string dbname_; Env* env_; std::map vtables_; std::vector invalid_; + size_t gc_threshold_; }; } // namespace leveldb