diff --git a/CMakeLists.txt b/CMakeLists.txt index 32edf3b..b56bb39 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -176,6 +176,8 @@ target_sources(leveldb "table/vtable_builder.h" "table/vtable_format.cc" "table/vtable_format.h" + "table/vtable_manager.cc" + "table/vtable_manager.h" "table/vtable_reader.cc" "table/vtable_reader.h" "util/arena.cc" diff --git a/db/builder.cc b/db/builder.cc index a8b9780..a4457f5 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -8,15 +8,19 @@ #include "db/filename.h" #include "db/table_cache.h" #include "db/version_edit.h" + #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/iterator.h" + #include "table/vtable_builder.h" +#include "table/vtable_manager.h" namespace leveldb { Status BuildTable(const std::string& dbname, Env* env, const Options& options, - TableCache* table_cache, Iterator* iter, FileMetaData* meta) { + TableCache* table_cache, Iterator* iter, FileMetaData* meta, + VTableMeta* vtable_meta) { Status s; meta->file_size = 0; iter->SeekToFirst(); @@ -94,6 +98,11 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, if (s.ok()) { s = vtb_builder->Finish(); } + if (s.ok()) { + vtable_meta->number = meta->number; + vtable_meta->table_size = vtb_builder->FileSize(); + vtable_meta->records_num = vtb_builder->RecordNumber(); + } delete vtb_builder; if (s.ok()) { @@ -124,6 +133,11 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, } else { env->RemoveFile(fname); } + if (s.ok() && vtable_meta->table_size > 0) { + // Keep it + } else { + env->RemoveFile(vtb_name); + } return s; } diff --git a/db/builder.h b/db/builder.h index 7bd0b80..2e40c5e 100644 --- a/db/builder.h +++ b/db/builder.h @@ -7,6 +7,8 @@ #include "leveldb/status.h" +#include "table/vtable_manager.h" + namespace leveldb { struct Options; @@ -23,7 +25,8 @@ class VersionEdit; // If no data is present in *iter, meta->file_size will be set to // zero, and no Table file will be produced. 
Status BuildTable(const std::string& dbname, Env* env, const Options& options, - TableCache* table_cache, Iterator* iter, FileMetaData* meta); + TableCache* table_cache, Iterator* iter, FileMetaData* meta, + VTableMeta* vtable_meta); } // namespace leveldb diff --git a/db/db_impl.cc b/db/db_impl.cc index 299b249..85015f4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -33,7 +33,9 @@ #include "table/block.h" #include "table/merger.h" #include "table/two_level_iterator.h" +#include "table/vtable_builder.h" #include "table/vtable_format.h" +#include "table/vtable_manager.h" #include "table/vtable_reader.h" #include "util/coding.h" #include "util/logging.h" @@ -69,7 +71,10 @@ struct DBImpl::CompactionState { : compaction(c), smallest_snapshot(0), outfile(nullptr), + vtb_file(nullptr), builder(nullptr), + vtable_builder(nullptr), + vtb_num(0), total_bytes(0) {} Compaction* const compaction; @@ -84,8 +89,11 @@ struct DBImpl::CompactionState { // State kept for output being generated WritableFile* outfile; + WritableFile* vtb_file; TableBuilder* builder; + VTableBuilder* vtable_builder; + uint64_t vtb_num; uint64_t total_bytes; }; @@ -151,7 +159,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) background_compaction_scheduled_(false), manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_)) {} + &internal_comparator_)), + vtable_manager_(new VTableManager(dbname, raw_options.env, raw_options.gc_size_threshold)){} DBImpl::~DBImpl() { // Wait for background work to finish. 
@@ -271,6 +280,7 @@ void DBImpl::RemoveObsoleteFiles() { case kCurrentFile: case kDBLockFile: case kInfoLogFile: + case kVTableManagerFile: keep = true; break; } @@ -386,6 +396,11 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { versions_->SetLastSequence(max_sequence); } + s = vtable_manager_->LoadVTableMeta(); + if (!s.ok()) { + return Status::Corruption("LoadVTableMeta failed"); + } + return Status::OK(); } @@ -514,6 +529,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; + VTableMeta vtable_meta; meta.number = versions_->NewFileNumber(); pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); @@ -523,7 +539,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); + s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &vtable_meta); mutex_.Lock(); } @@ -544,6 +560,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); + if (vtable_meta.number > 0) { + vtable_manager_->AddVTable(vtable_meta); + } } CompactionStats stats; @@ -573,6 +592,9 @@ void DBImpl::CompactMemTable() { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed s = versions_->LogAndApply(&edit, &mutex_); + if (s.ok()) { + s = vtable_manager_->SaveVTableMeta(); + } } if (s.ok()) { @@ -752,6 +774,12 @@ void DBImpl::BackgroundCompaction() { if (!status.ok()) { RecordBackgroundError(status); } + + status = vtable_manager_->SaveVTableMeta(); + + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), c->level() + 1, @@ -825,6 +853,10 @@ Status 
DBImpl::OpenCompactionOutputFile(CompactionState* compact) { mutex_.Unlock(); } + if (compact->vtb_num == 0) { + compact->vtb_num = file_number; + } + // Make the output file std::string fname = TableFileName(dbname_, file_number); Status s = env_->NewWritableFile(fname, &compact->outfile); @@ -867,6 +899,26 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, delete compact->outfile; compact->outfile = nullptr; + if (compact->vtable_builder != nullptr && s.ok()) { + VTableMeta meta; + meta.invalid_num = 0; + meta.number = compact->vtb_num; + meta.records_num = compact->vtable_builder->RecordNumber(); + meta.table_size = compact->vtable_builder->FileSize(); + + s = compact->vtable_builder->Finish(); + delete compact->vtable_builder; + compact->vtable_builder = nullptr; + if (s.ok()) { + s = compact->vtb_file->Sync(); + } + if (s.ok()) { + s = compact->vtb_file->Close(); + } + delete compact->vtb_file; + compact->vtb_file = nullptr; + } + if (s.ok() && current_entries > 0) { // Verify that the table is usable Iterator* iter = @@ -901,6 +953,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } +bool GetValueType(const Slice& input, unsigned char* value) { + if (input.empty()) { + return false; + } + *value = *input.data(); + return true; +} + Status DBImpl::DoCompactionWork(CompactionState* compact) { const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions @@ -924,6 +984,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Release mutex while we're actually doing the compaction work mutex_.Unlock(); + enum Type : unsigned char { + kVTableIndex = 1, + kNonIndexValue = 2, + }; input->SeekToFirst(); Status status; ParsedInternalKey ikey; @@ -1010,7 +1074,43 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } 
compact->current_output()->largest.DecodeFrom(key); - compact->builder->Add(key, input->value()); + + auto value = input->value(); + std::string new_value = value.ToString(); + unsigned char type; + if(!GetValueType(value, &type)) { + break; + } + + if (type == kVTableIndex) { + if (compact->compaction->level() > config::kNumLevels - 3) { + if (compact->vtable_builder == nullptr) { + auto fname = VTableFileName(dbname_, compact->vtb_num); + status = env_->NewWritableFile(fname, &compact->vtb_file); + auto vtable_builder = new VTableBuilder(options_, compact->vtb_file); + compact->vtable_builder = vtable_builder; + } + VTableIndex index; + VTableReader reader; + VTableRecord record; + VTableHandle handle; + + status = index.Decode(&value); + + std::string vtb_name = VTableFileName(this->dbname_, index.file_number); + status = reader.Open(this->options_, vtb_name); + status = reader.Get(index.vtable_handle, &record); + + vtable_manager_->AddInvalid(index.file_number); + compact->vtable_builder->Add(record, &handle); + VTableIndex new_index; + new_index.file_number = compact->vtb_num; + new_index.vtable_handle = handle; + new_index.Encode(&new_value); + } + } + + compact->builder->Add(key, Slice(new_value)); // Close output file if it is big enough if (compact->builder->FileSize() >= @@ -1020,6 +1120,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + } else { + unsigned char type; + auto value = input->value(); + if(!GetValueType(value, &type)) { + break; + } + if (type == kVTableIndex) { + VTableIndex vtable_index; + vtable_index.Decode(&value); + vtable_manager_->AddInvalid(vtable_index.file_number); + } } input->Next(); @@ -1057,6 +1168,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (!status.ok()) { RecordBackgroundError(status); } + + status = vtable_manager_->SaveVTableMeta(); + if (!status.ok()) { + RecordBackgroundError(status); + } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", 
versions_->LevelSummary(&tmp)); return status; @@ -1124,18 +1240,6 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { return versions_->MaxNextLevelOverlappingBytes(); } -namespace { - -bool GetValueType(const Slice& input, unsigned char* value) { - if (input.empty()) { - return false; - } - *value = *input.data(); - return true; -} - -} // namespace - Status DBImpl::DecodeValue(std::string* value) const { enum Type : unsigned char { kVTableIndex = 1, @@ -1695,6 +1799,9 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { edit.SetPrevLogNumber(0); // No older logs needed after recovery. edit.SetLogNumber(impl->logfile_number_); s = impl->versions_->LogAndApply(&edit, &impl->mutex_); + if (s.ok()) { + s = impl->vtable_manager_->SaveVTableMeta(); + } } if (s.ok()) { impl->RemoveObsoleteFiles(); diff --git a/db/db_impl.h b/db/db_impl.h index 4d6fc29..c14b00a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -17,6 +17,7 @@ #include "leveldb/env.h" #include "port/port.h" #include "port/thread_annotations.h" +#include "table/vtable_manager.h" namespace leveldb { @@ -210,6 +211,8 @@ class DBImpl : public DB { Status bg_error_ GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); + + VTableManager* vtable_manager_ {}; }; // Sanitize db options. 
The caller should delete result.info_log if diff --git a/db/filename.cc b/db/filename.cc index baaa512..e98d8e4 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -73,6 +73,10 @@ std::string OldInfoLogFileName(const std::string& dbname) { return dbname + "/LOG.old"; } +std::string VTableManagerFileName(const std::string& dbname) { + return dbname + "/VTableMeta"; +} + // Owned filenames have the form: // dbname/CURRENT // dbname/LOCK @@ -89,6 +93,9 @@ bool ParseFileName(const std::string& filename, uint64_t* number, } else if (rest == "LOCK") { *number = 0; *type = kDBLockFile; + } else if (rest == "VTableMeta") { + *number = 0; + *type = kVTableManagerFile; } else if (rest == "LOG" || rest == "LOG.old") { *number = 0; *type = kInfoLogFile; diff --git a/db/filename.h b/db/filename.h index f777104..3d21c6d 100644 --- a/db/filename.h +++ b/db/filename.h @@ -26,7 +26,8 @@ enum FileType { kCurrentFile, kTempFile, kInfoLogFile, - kVTableFile// Either the current one, or an old one + kVTableFile, + kVTableManagerFile// Either the current one, or an old one }; // Return the name of the log file with the specified number @@ -73,6 +74,8 @@ std::string InfoLogFileName(const std::string& dbname); // Return the name of the old info log file for "dbname". std::string OldInfoLogFileName(const std::string& dbname); +std::string VTableManagerFileName(const std::string& dbname); + // If filename is a leveldb file, store the type of the file in *type. // The number encoded in the filename is stored in *number. If the // filename was successfully parsed, returns true. Else return false. diff --git a/db/repair.cc b/db/repair.cc index 97a27c6..f840d77 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -201,9 +201,10 @@ class Repairer { // Do not record a version edit for this conversion to a Table // since ExtractMetaData() will also generate edits. 
FileMetaData meta; + VTableMeta vtable_meta; meta.number = next_file_number_++; Iterator* iter = mem->NewIterator(); - status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); + status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &vtable_meta); delete iter; mem->Unref(); mem = nullptr; @@ -212,6 +213,7 @@ class Repairer { table_numbers_.push_back(meta.number); } } + Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", (unsigned long long)log, counter, (unsigned long long)meta.number, status.ToString().c_str()); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 2185d86..cf7707f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -118,6 +118,8 @@ struct LEVELDB_EXPORT Options { // initially populating a large database. size_t max_file_size = 2 * 1024 * 1024; + size_t gc_size_threshold = 1024 * 1024 * 1024; + // Compress blocks using the specified compression algorithm. This // parameter can be changed dynamically. 
// diff --git a/table/vtable_builder.cc b/table/vtable_builder.cc index 3833e57..843cfab 100644 --- a/table/vtable_builder.cc +++ b/table/vtable_builder.cc @@ -19,6 +19,7 @@ void VTableBuilder::Add(const VTableRecord& record, VTableHandle* handle) { status_ = file_->Append(encoder_.GetHeader().ToString() + encoder_.GetRecord().ToString()); + record_number_ += 1; assert(ok()); //TODO: meta info support in the future } diff --git a/table/vtable_builder.h b/table/vtable_builder.h index 0eedb84..5b814ee 100644 --- a/table/vtable_builder.h +++ b/table/vtable_builder.h @@ -23,11 +23,16 @@ class VTableBuilder { // Abandon building the vTable void Abandon(); + + uint64_t FileSize() const { return file_size_; } + + uint64_t RecordNumber() const { return record_number_; } private: bool ok() const { return status().ok(); } WritableFile* file_; uint64_t file_size_{0}; + uint64_t record_number_{0}; Status status_;
diff --git a/table/vtable_manager.cc b/table/vtable_manager.cc
new file mode 100644
index 0000000..c02f98d
--- /dev/null
+++ b/table/vtable_manager.cc
@@ -0,0 +1,210 @@
+#include "table/vtable_manager.h"
+
+#include "db/filename.h"
+#include <algorithm>
+#include <iostream>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include "leveldb/env.h"
+#include "leveldb/status.h"
+
+#include "util/coding.h"
+
+namespace leveldb {
+
+// Work item handed to the background GC thread. It holds its own copies of
+// the database name and file list, so the thread never reads the manager.
+struct GCInfo {
+  std::string dbname;
+  std::vector<uint64_t> file_list;
+  Env* env = nullptr;
+};
+
+// Appends number, records_num, invalid_num and table_size (in that order)
+// to *target as varint64 values.
+void VTableMeta::Encode(std::string* target) const {
+  PutVarint64(target, number);
+  PutVarint64(target, records_num);
+  PutVarint64(target, invalid_num);
+  PutVarint64(target, table_size);
+}
+
+// Reads the four fields back from *input in the order Encode() wrote them.
+// Returns Corruption if any of the varints cannot be parsed.
+Status VTableMeta::Decode(Slice* input) {
+  if (!GetVarint64(input, &number) || !GetVarint64(input, &records_num) ||
+      !GetVarint64(input, &invalid_num) || !GetVarint64(input, &table_size)) {
+    return Status::Corruption("Error Decode VTable meta");
+  }
+  return Status::OK();
+}
+
+// Registers the entry under vtable_meta.number, replacing any existing one.
+void VTableManager::AddVTable(const VTableMeta& vtable_meta) {
+  vtables_[vtable_meta.number] = vtable_meta;
+}
+
+// Forgets the entry for file_num, if there is one.
+void VTableManager::RemoveVTable(uint64_t file_num) {
+  vtables_.erase(file_num);
+}
+
+// Counts one more invalid record against vTable file_num. When the invalid
+// count reaches the record count the file is queued for deletion, and
+// MaybeScheduleGarbageCollect() is given a chance to run.
+// Returns Corruption if file_num is not tracked.
+Status VTableManager::AddInvalid(uint64_t file_num) {
+  const auto it = vtables_.find(file_num);
+  if (it == vtables_.end()) {
+    return Status::Corruption("Invalid VTable number");
+  }
+
+  VTableMeta& meta = it->second;
+  meta.invalid_num += 1;
+  // Queue each file only once; a duplicate entry would make the size check
+  // in MaybeScheduleGarbageCollect() count the same file again.
+  if (meta.invalid_num >= meta.records_num &&
+      std::find(invalid_.begin(), invalid_.end(), file_num) ==
+          invalid_.end()) {
+    invalid_.emplace_back(file_num);
+  }
+
+  MaybeScheduleGarbageCollect();
+
+  return Status::OK();
+}
+
+// Writes a varint64 entry count followed by every encoded VTableMeta to the
+// file named by VTableManagerFileName(dbname_), then flushes, syncs and
+// closes it. Every failure is reported as Corruption.
+Status VTableManager::SaveVTableMeta() const {
+  auto fname = VTableManagerFileName(dbname_);
+  WritableFile* file = nullptr;
+  Status s = env_->NewWritableFile(fname, &file);
+  if (!s.ok()) {
+    return Status::Corruption("Failed to open vTable manager file");
+  }
+
+  std::string target;
+  PutVarint64(&target, vtables_.size());
+  for (const auto& vtable : vtables_) {
+    vtable.second.Encode(&target);
+  }
+  s = file->Append(target);
+  if (!s.ok()) {
+    // Free the handle on this path too; returning without it leaks the file.
+    delete file;
+    return Status::Corruption("Failed to append vTable manager file");
+  }
+  s = file->Flush();
+  if (s.ok()) {
+    s = file->Sync();
+  }
+  if (s.ok()) {
+    s = file->Close();
+  }
+  delete file;
+  if (!s.ok()) {
+    return Status::Corruption("Failed to write vTable meta file");
+  }
+  return Status::OK();
+}
+
+// Rebuilds vtables_ and invalid_ from the file written by SaveVTableMeta().
+// A missing file is not an error. Entries whose number is 0 are skipped.
+Status VTableManager::LoadVTableMeta() {
+  auto fname = VTableManagerFileName(dbname_);
+  if (!env_->FileExists(fname)) {
+    return Status::OK();
+  }
+  SequentialFile* file = nullptr;
+  Status s = env_->NewSequentialFile(fname, &file);
+  if (!s.ok()) {
+    return Status::Corruption("Failed to open vTable manager file");
+  }
+  uint64_t file_size = 0;
+  s = env_->GetFileSize(fname, &file_size);
+  if (!s.ok()) {
+    delete file;
+    return Status::Corruption("Failed to get vTable manager file size");
+  }
+  // A std::string owns the scratch space so it is freed on every return
+  // path; input may point into it and must not outlive it.
+  std::string buf(file_size, '\0');
+  Slice input;
+  s = file->Read(file_size, &input, &buf[0]);
+  // Parsing only needs the bytes already read, so release the handle now.
+  delete file;
+  if (!s.ok()) {
+    return Status::Corruption("Failed to read vTable manager file");
+  }
+
+  uint64_t vtable_num = 0;
+  if (!GetVarint64(&input, &vtable_num)) {
+    return Status::Corruption("Failed to get vTable num");
+  }
+
+  // The counter has the same type as vtable_num; an int counter would
+  // overflow for large counts.
+  for (uint64_t i = 0; i < vtable_num; ++i) {
+    VTableMeta vtable_meta;
+    s = vtable_meta.Decode(&input);
+    if (!s.ok()) {
+      return s;
+    }
+    if (vtable_meta.number == 0) {
+      continue;
+    }
+    AddVTable(vtable_meta);
+    if (vtable_meta.invalid_num >= vtable_meta.records_num) {
+      invalid_.emplace_back(vtable_meta.number);
+    }
+  }
+
+  return s;
+}
+
+// Starts a background deletion of all queued files once their combined
+// table_size reaches gc_threshold_. The queued files are dropped from
+// vtables_ and invalid_ is left empty.
+void VTableManager::MaybeScheduleGarbageCollect() {
+  uint64_t size = 0;
+  for (const auto& file_num : invalid_) {
+    // find() rather than operator[]: the latter inserts a zeroed entry for
+    // an untracked number, and SaveVTableMeta() would then persist it.
+    const auto it = vtables_.find(file_num);
+    if (it != vtables_.end()) {
+      size += it->second.table_size;
+    }
+  }
+  if (invalid_.empty() || size < gc_threshold_) {
+    return;
+  }
+
+  auto* gc_info = new GCInfo;
+  gc_info->dbname = dbname_;
+  gc_info->env = env_;
+  // Move the whole queue into the work item; invalid_ is empty afterwards.
+  gc_info->file_list.swap(invalid_);
+  // Update vtables_ before the thread starts: BackgroudGC() deletes gc_info,
+  // so it must not be read after StartThread() has been called.
+  for (const auto& file_num : gc_info->file_list) {
+    RemoveVTable(file_num);
+  }
+  env_->StartThread(&VTableManager::BackgroudGC, gc_info);
+}
+
+// Thread entry point. gc_info must be a heap-allocated GCInfo; its files are
+// removed on a best-effort basis (RemoveFile() results are ignored) and the
+// GCInfo is deleted before returning.
+void VTableManager::BackgroudGC(void* gc_info) {
+  auto* info = static_cast<GCInfo*>(gc_info);
+  for (const auto& file_num : info->file_list) {
+    auto fname = VTableFileName(info->dbname, file_num);
+    info->env->RemoveFile(fname);
+  }
+  delete info;
+}
+
+}  // namespace leveldb
diff --git a/table/vtable_manager.h b/table/vtable_manager.h
new file mode 100644
index 0000000..7ad1eff
--- /dev/null
+++ b/table/vtable_manager.h
@@ -0,0 +1,77 @@
+#ifndef VTABLE_MANAGER_H
+#define VTABLE_MANAGER_H
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <string>
+#include <vector>
+
+#include "leveldb/env.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+
+namespace leveldb {
+
+// Bookkeeping record for a single vTable file.
+struct VTableMeta {
+  // File number of the vTable.
+  uint64_t number;
+
+  // Number of records the file was built with.
+  uint64_t records_num;
+
+  // Number of those records reported through VTableManager::AddInvalid().
+  uint64_t invalid_num;
+
+  // Size of the file as reported by its builder.
+  uint64_t table_size;
+
+  void Encode(std::string* target) const;
+  Status Decode(Slice* input);
+
+  VTableMeta() : number(0), records_num(0), invalid_num(0), table_size(0) {}
+};
+
+// Tracks every vTable file of one database, persists that bookkeeping, and
+// deletes files whose records have all been invalidated.
+// NOTE(review): the class does no locking of its own; callers presumably
+// have to serialize access - confirm against DBImpl.
+class VTableManager {
+ public:
+  // gc_threshold is the number of queued bytes at which
+  // MaybeScheduleGarbageCollect() starts a deletion pass.
+  explicit VTableManager(const std::string& dbname, Env* env,
+                         size_t gc_threshold)
+      : dbname_(dbname), env_(env), gc_threshold_(gc_threshold) {}
+
+  ~VTableManager() = default;
+
+  void AddVTable(const VTableMeta& vtable_meta);
+
+  void RemoveVTable(uint64_t file_num);
+
+  // Returns Corruption if file_num is not tracked.
+  Status AddInvalid(uint64_t file_num);
+
+  Status SaveVTableMeta() const;
+
+  Status LoadVTableMeta();
+
+  void MaybeScheduleGarbageCollect();
+
+  // Thread entry point; gc_info is deleted by this function.
+  static void BackgroudGC(void* gc_info);
+
+ private:
+  std::string dbname_;
+  Env* env_;
+  // Keyed by VTableMeta::number.
+  std::map<uint64_t, VTableMeta> vtables_;
+  // File numbers queued for deletion.
+  std::vector<uint64_t> invalid_;
+  size_t gc_threshold_;
+};
+
+}  // namespace leveldb
+#endif  // VTABLE_MANAGER_H
diff --git a/table/vtable_reader.cc b/table/vtable_reader.cc index cb56f12..2cd1478 100644 --- a/table/vtable_reader.cc +++ b/table/vtable_reader.cc @@ -2,7 +2,7 @@ #include "leveldb/env.h" -#include <table/vtable_reader.h> +#include "table/vtable_reader.h" namespace leveldb { Status VTableReader::Open(const Options& options, std::string fname) {