Browse Source

Merge branch 'main' into wesley

main
wesley 4 months ago
parent
commit
ebecd08378
14 changed files with 397 additions and 20 deletions
  1. +2
    -0
      CMakeLists.txt
  2. +15
    -1
      db/builder.cc
  3. +4
    -1
      db/builder.h
  4. +122
    -15
      db/db_impl.cc
  5. +3
    -0
      db/db_impl.h
  6. +7
    -0
      db/filename.cc
  7. +4
    -1
      db/filename.h
  8. +3
    -1
      db/repair.cc
  9. +2
    -0
      include/leveldb/options.h
  10. +1
    -0
      table/vtable_builder.cc
  11. +5
    -0
      table/vtable_builder.h
  12. +169
    -0
      table/vtable_manager.cc
  13. +59
    -0
      table/vtable_manager.h
  14. +1
    -1
      table/vtable_reader.cc

+ 2
- 0
CMakeLists.txt View File

@ -176,6 +176,8 @@ target_sources(leveldb
"table/vtable_builder.h" "table/vtable_builder.h"
"table/vtable_format.cc" "table/vtable_format.cc"
"table/vtable_format.h" "table/vtable_format.h"
"table/vtable_manager.cc"
"table/vtable_manager.h"
"table/vtable_reader.cc" "table/vtable_reader.cc"
"table/vtable_reader.h" "table/vtable_reader.h"
"util/arena.cc" "util/arena.cc"

+ 15
- 1
db/builder.cc View File

@ -8,15 +8,19 @@
#include "db/filename.h" #include "db/filename.h"
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "table/vtable_builder.h" #include "table/vtable_builder.h"
#include "table/vtable_manager.h"
namespace leveldb { namespace leveldb {
Status BuildTable(const std::string& dbname, Env* env, const Options& options, Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) { TableCache* table_cache, Iterator* iter, FileMetaData* meta,
VTableMeta* vtable_meta) {
Status s; Status s;
meta->file_size = 0; meta->file_size = 0;
iter->SeekToFirst(); iter->SeekToFirst();
@ -94,6 +98,11 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
if (s.ok()) { if (s.ok()) {
s = vtb_builder->Finish(); s = vtb_builder->Finish();
} }
if (s.ok()) {
vtable_meta->number = meta->number;
vtable_meta->table_size = vtb_builder->FileSize();
vtable_meta->records_num = vtb_builder->RecordNumber();
}
delete vtb_builder; delete vtb_builder;
if (s.ok()) { if (s.ok()) {
@ -124,6 +133,11 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
} else { } else {
env->RemoveFile(fname); env->RemoveFile(fname);
} }
if (s.ok() && vtable_meta->table_size > 0) {
// Keep it
} else {
env->RemoveFile(vtb_name);
}
return s; return s;
} }

+ 4
- 1
db/builder.h View File

@ -7,6 +7,8 @@
#include "leveldb/status.h" #include "leveldb/status.h"
#include "table/vtable_manager.h"
namespace leveldb { namespace leveldb {
struct Options; struct Options;
@ -23,7 +25,8 @@ class VersionEdit;
// If no data is present in *iter, meta->file_size will be set to // If no data is present in *iter, meta->file_size will be set to
// zero, and no Table file will be produced. // zero, and no Table file will be produced.
Status BuildTable(const std::string& dbname, Env* env, const Options& options, Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta); TableCache* table_cache, Iterator* iter, FileMetaData* meta,
VTableMeta* vtable_meta);
} // namespace leveldb } // namespace leveldb

+ 122
- 15
db/db_impl.cc View File

@ -33,7 +33,9 @@
#include "table/block.h" #include "table/block.h"
#include "table/merger.h" #include "table/merger.h"
#include "table/two_level_iterator.h" #include "table/two_level_iterator.h"
#include "table/vtable_builder.h"
#include "table/vtable_format.h" #include "table/vtable_format.h"
#include "table/vtable_manager.h"
#include "table/vtable_reader.h" #include "table/vtable_reader.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/logging.h" #include "util/logging.h"
@ -69,7 +71,10 @@ struct DBImpl::CompactionState {
: compaction(c), : compaction(c),
smallest_snapshot(0), smallest_snapshot(0),
outfile(nullptr), outfile(nullptr),
vtb_file(nullptr),
builder(nullptr), builder(nullptr),
vtable_builder(nullptr),
vtb_num(0),
total_bytes(0) {} total_bytes(0) {}
Compaction* const compaction; Compaction* const compaction;
@ -84,8 +89,11 @@ struct DBImpl::CompactionState {
// State kept for output being generated // State kept for output being generated
WritableFile* outfile; WritableFile* outfile;
WritableFile* vtb_file;
TableBuilder* builder; TableBuilder* builder;
VTableBuilder* vtable_builder;
uint64_t vtb_num;
uint64_t total_bytes; uint64_t total_bytes;
}; };
@ -151,7 +159,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false), background_compaction_scheduled_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {} &internal_comparator_)),
vtable_manager_(new VTableManager(dbname, raw_options.env, raw_options.gc_size_threshold)){}
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
// Wait for background work to finish. // Wait for background work to finish.
@ -271,6 +280,7 @@ void DBImpl::RemoveObsoleteFiles() {
case kCurrentFile: case kCurrentFile:
case kDBLockFile: case kDBLockFile:
case kInfoLogFile: case kInfoLogFile:
case kVTableManagerFile:
keep = true; keep = true;
break; break;
} }
@ -386,6 +396,11 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
versions_->SetLastSequence(max_sequence); versions_->SetLastSequence(max_sequence);
} }
s = vtable_manager_->LoadVTableMeta();
if (!s.ok()) {
return Status::Corruption("LoadVTableMeta failed");
}
return Status::OK(); return Status::OK();
} }
@ -514,6 +529,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
mutex_.AssertHeld(); mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros(); const uint64_t start_micros = env_->NowMicros();
FileMetaData meta; FileMetaData meta;
VTableMeta vtable_meta;
meta.number = versions_->NewFileNumber(); meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number); pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator(); Iterator* iter = mem->NewIterator();
@ -523,7 +539,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status s; Status s;
{ {
mutex_.Unlock(); mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &vtable_meta);
mutex_.Lock(); mutex_.Lock();
} }
@ -544,6 +560,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
} }
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest); meta.largest);
if (vtable_meta.number > 0) {
vtable_manager_->AddVTable(vtable_meta);
}
} }
CompactionStats stats; CompactionStats stats;
@ -573,6 +592,9 @@ void DBImpl::CompactMemTable() {
edit.SetPrevLogNumber(0); edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_); s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) {
s = vtable_manager_->SaveVTableMeta();
}
} }
if (s.ok()) { if (s.ok()) {
@ -752,6 +774,12 @@ void DBImpl::BackgroundCompaction() {
if (!status.ok()) { if (!status.ok()) {
RecordBackgroundError(status); RecordBackgroundError(status);
} }
status = vtable_manager_->SaveVTableMeta();
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp; VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->number), c->level() + 1,
@ -825,6 +853,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
mutex_.Unlock(); mutex_.Unlock();
} }
if (compact->vtb_num == 0) {
compact->vtb_num = file_number;
}
// Make the output file // Make the output file
std::string fname = TableFileName(dbname_, file_number); std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile); Status s = env_->NewWritableFile(fname, &compact->outfile);
@ -867,6 +899,26 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
delete compact->outfile; delete compact->outfile;
compact->outfile = nullptr; compact->outfile = nullptr;
if (compact->vtable_builder != nullptr && s.ok()) {
VTableMeta meta;
meta.invalid_num = 0;
meta.number = compact->vtb_num;
meta.records_num = compact->vtable_builder->RecordNumber();
meta.table_size = compact->vtable_builder->FileSize();
s = compact->vtable_builder->Finish();
delete compact->vtable_builder;
compact->vtable_builder = nullptr;
if (s.ok()) {
s = compact->vtb_file->Sync();
}
if (s.ok()) {
s = compact->vtb_file->Close();
}
delete compact->vtb_file;
compact->vtb_file = nullptr;
}
if (s.ok() && current_entries > 0) { if (s.ok() && current_entries > 0) {
// Verify that the table is usable // Verify that the table is usable
Iterator* iter = Iterator* iter =
@ -901,6 +953,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
} }
bool GetValueType(const Slice& input, unsigned char* value) {
if (input.empty()) {
return false;
}
*value = *input.data();
return true;
}
Status DBImpl::DoCompactionWork(CompactionState* compact) { Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros(); const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions int64_t imm_micros = 0; // Micros spent doing imm_ compactions
@ -924,6 +984,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Release mutex while we're actually doing the compaction work // Release mutex while we're actually doing the compaction work
mutex_.Unlock(); mutex_.Unlock();
enum Type : unsigned char {
kVTableIndex = 1,
kNonIndexValue = 2,
};
input->SeekToFirst(); input->SeekToFirst();
Status status; Status status;
ParsedInternalKey ikey; ParsedInternalKey ikey;
@ -1010,7 +1074,43 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key); compact->current_output()->smallest.DecodeFrom(key);
} }
compact->current_output()->largest.DecodeFrom(key); compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value()); auto value = input->value();
std::string new_value = value.ToString();
unsigned char type;
if(!GetValueType(value, &type)) {
break;
}
if (type == kVTableIndex) {
if (compact->compaction->level() > config::kNumLevels - 3) {
if (compact->vtable_builder == nullptr) {
auto fname = VTableFileName(dbname_, compact->vtb_num);
status = env_->NewWritableFile(fname, &compact->vtb_file);
auto vtable_builder = new VTableBuilder(options_, compact->vtb_file);
compact->vtable_builder = vtable_builder;
}
VTableIndex index;
VTableReader reader;
VTableRecord record;
VTableHandle handle;
status = index.Decode(&value);
std::string vtb_name = VTableFileName(this->dbname_, index.file_number);
status = reader.Open(this->options_, vtb_name);
status = reader.Get(index.vtable_handle, &record);
vtable_manager_->AddInvalid(index.file_number);
compact->vtable_builder->Add(record, &handle);
VTableIndex new_index;
new_index.file_number = compact->vtb_num;
new_index.vtable_handle = handle;
new_index.Encode(&new_value);
}
}
compact->builder->Add(key, Slice(new_value));
// Close output file if it is big enough // Close output file if it is big enough
if (compact->builder->FileSize() >= if (compact->builder->FileSize() >=
@ -1020,6 +1120,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break; break;
} }
} }
} else {
unsigned char type;
auto value = input->value();
if(!GetValueType(value, &type)) {
break;
}
if (type == kVTableIndex) {
VTableIndex vtable_index;
vtable_index.Decode(&value);
vtable_manager_->AddInvalid(vtable_index.file_number);
}
} }
input->Next(); input->Next();
@ -1057,6 +1168,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (!status.ok()) { if (!status.ok()) {
RecordBackgroundError(status); RecordBackgroundError(status);
} }
status = vtable_manager_->SaveVTableMeta();
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp; VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
return status; return status;
@ -1124,18 +1240,6 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes(); return versions_->MaxNextLevelOverlappingBytes();
} }
namespace {
bool GetValueType(const Slice& input, unsigned char* value) {
if (input.empty()) {
return false;
}
*value = *input.data();
return true;
}
} // namespace
Status DBImpl::DecodeValue(std::string* value) const { Status DBImpl::DecodeValue(std::string* value) const {
enum Type : unsigned char { enum Type : unsigned char {
kVTableIndex = 1, kVTableIndex = 1,
@ -1695,6 +1799,9 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery. edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_); edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_); s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
if (s.ok()) {
s = impl->vtable_manager_->SaveVTableMeta();
}
} }
if (s.ok()) { if (s.ok()) {
impl->RemoveObsoleteFiles(); impl->RemoveObsoleteFiles();

+ 3
- 0
db/db_impl.h View File

@ -17,6 +17,7 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "port/port.h" #include "port/port.h"
#include "port/thread_annotations.h" #include "port/thread_annotations.h"
#include "table/vtable_manager.h"
namespace leveldb { namespace leveldb {
@ -210,6 +211,8 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_); Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
VTableManager* vtable_manager_ {};
}; };
// Sanitize db options. The caller should delete result.info_log if // Sanitize db options. The caller should delete result.info_log if

+ 7
- 0
db/filename.cc View File

@ -73,6 +73,10 @@ std::string OldInfoLogFileName(const std::string& dbname) {
return dbname + "/LOG.old"; return dbname + "/LOG.old";
} }
std::string VTableManagerFileName(const std::string& dbname) {
return dbname + "/VTableMeta";
}
// Owned filenames have the form: // Owned filenames have the form:
// dbname/CURRENT // dbname/CURRENT
// dbname/LOCK // dbname/LOCK
@ -89,6 +93,9 @@ bool ParseFileName(const std::string& filename, uint64_t* number,
} else if (rest == "LOCK") { } else if (rest == "LOCK") {
*number = 0; *number = 0;
*type = kDBLockFile; *type = kDBLockFile;
} else if (rest == "VTableMeta") {
*number = 0;
*type = kVTableManagerFile;
} else if (rest == "LOG" || rest == "LOG.old") { } else if (rest == "LOG" || rest == "LOG.old") {
*number = 0; *number = 0;
*type = kInfoLogFile; *type = kInfoLogFile;

+ 4
- 1
db/filename.h View File

@ -26,7 +26,8 @@ enum FileType {
kCurrentFile, kCurrentFile,
kTempFile, kTempFile,
kInfoLogFile, kInfoLogFile,
kVTableFile// Either the current one, or an old one kVTableFile,
kVTableManagerFile// Either the current one, or an old one
}; };
// Return the name of the log file with the specified number // Return the name of the log file with the specified number
@ -73,6 +74,8 @@ std::string InfoLogFileName(const std::string& dbname);
// Return the name of the old info log file for "dbname". // Return the name of the old info log file for "dbname".
std::string OldInfoLogFileName(const std::string& dbname); std::string OldInfoLogFileName(const std::string& dbname);
std::string VTableManagerFileName(const std::string& dbname);
// If filename is a leveldb file, store the type of the file in *type. // If filename is a leveldb file, store the type of the file in *type.
// The number encoded in the filename is stored in *number. If the // The number encoded in the filename is stored in *number. If the
// filename was successfully parsed, returns true. Else return false. // filename was successfully parsed, returns true. Else return false.

+ 3
- 1
db/repair.cc View File

@ -201,9 +201,10 @@ class Repairer {
// Do not record a version edit for this conversion to a Table // Do not record a version edit for this conversion to a Table
// since ExtractMetaData() will also generate edits. // since ExtractMetaData() will also generate edits.
FileMetaData meta; FileMetaData meta;
VTableMeta vtable_meta;
meta.number = next_file_number_++; meta.number = next_file_number_++;
Iterator* iter = mem->NewIterator(); Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &vtable_meta);
delete iter; delete iter;
mem->Unref(); mem->Unref();
mem = nullptr; mem = nullptr;
@ -212,6 +213,7 @@ class Repairer {
table_numbers_.push_back(meta.number); table_numbers_.push_back(meta.number);
} }
} }
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
(unsigned long long)log, counter, (unsigned long long)meta.number, (unsigned long long)log, counter, (unsigned long long)meta.number,
status.ToString().c_str()); status.ToString().c_str());

+ 2
- 0
include/leveldb/options.h View File

@ -118,6 +118,8 @@ struct LEVELDB_EXPORT Options {
// initially populating a large database. // initially populating a large database.
size_t max_file_size = 2 * 1024 * 1024; size_t max_file_size = 2 * 1024 * 1024;
size_t gc_size_threshold = 1024 * 1024 * 1024;
// Compress blocks using the specified compression algorithm. This // Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically. // parameter can be changed dynamically.
// //

+ 1
- 0
table/vtable_builder.cc View File

@ -19,6 +19,7 @@ void VTableBuilder::Add(const VTableRecord& record, VTableHandle* handle) {
status_ = file_->Append(encoder_.GetHeader().ToString() + status_ = file_->Append(encoder_.GetHeader().ToString() +
encoder_.GetRecord().ToString()); encoder_.GetRecord().ToString());
record_number_ += 1;
assert(ok()); assert(ok());
//TODO: meta info support in the future //TODO: meta info support in the future
} }

+ 5
- 0
table/vtable_builder.h View File

@ -23,11 +23,16 @@ class VTableBuilder {
// Abandon building the vTable // Abandon building the vTable
void Abandon(); void Abandon();
uint64_t FileSize() const { return file_size_; }
uint64_t RecordNumber() const { return record_number_; }
private: private:
bool ok() const { return status().ok(); } bool ok() const { return status().ok(); }
WritableFile* file_; WritableFile* file_;
uint64_t file_size_{0}; uint64_t file_size_{0};
uint64_t record_number_{0};
Status status_; Status status_;

+ 169
- 0
table/vtable_manager.cc View File

@ -0,0 +1,169 @@
#include "table/vtable_manager.h"
#include "db/filename.h"
#include <iostream>
#include <ostream>
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "util/coding.h"
namespace leveldb {
struct GCInfo {
std::string dbname;
std::vector<uint64_t> file_list;
Env* env = nullptr;
};
void VTableMeta::Encode(std::string* target) const {
PutVarint64(target, number);
PutVarint64(target, records_num);
PutVarint64(target, invalid_num);
PutVarint64(target, table_size);
}
Status VTableMeta::Decode(Slice* input) {
if (!GetVarint64(input, &number) || !GetVarint64(input, &records_num) ||
!GetVarint64(input, &invalid_num) || !GetVarint64(input, &table_size)) {
return Status::Corruption("Error Decode VTable meta");
}
return Status::OK();
}
void VTableManager::AddVTable(const VTableMeta& vtable_meta) {
vtables_[vtable_meta.number] = vtable_meta;
}
void VTableManager::RemoveVTable(uint64_t file_num) {
vtables_.erase(file_num);
}
Status VTableManager::AddInvalid(uint64_t file_num) {
const auto it = vtables_.find(file_num);
if (it == vtables_.end()) {
return Status::Corruption("Invalid VTable number");
}
vtables_[file_num].invalid_num += 1;
if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) {
invalid_.emplace_back(file_num);
}
MaybeScheduleGarbageCollect();
return Status::OK();
}
Status VTableManager::SaveVTableMeta() const {
auto fname = VTableManagerFileName(dbname_);
WritableFile* file;
Status s = env_->NewWritableFile(fname, &file);
if (!s.ok()) {
return Status::Corruption("Failed to open vTable manager file");
}
const auto vtable_num = vtables_.size();
std::string target;
PutVarint64(&target, vtable_num);
for (auto & vtable : vtables_) {
vtable.second.Encode(&target);
}
s = file->Append(target);
if (!s.ok()) {
return Status::Corruption("Failed to append vTable manager file");
}
s = file->Flush();
if (s.ok()) {
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
delete file;
file = nullptr;
if (!s.ok()) {
return Status::Corruption("Failed to write vTable meta file");
}
return Status::OK();
}
Status VTableManager::LoadVTableMeta() {
auto fname = VTableManagerFileName(dbname_);
if (!env_->FileExists(fname)) {
return Status::OK();
}
SequentialFile* file;
Status s = env_->NewSequentialFile(fname, &file);
if (!s.ok()) {
return Status::Corruption("Failed to open vTable manager file");
}
uint64_t file_size;
s = env_->GetFileSize(fname, &file_size);
if (!s.ok()) {
return Status::Corruption("Failed to get vTable manager file size");
}
auto buf = new char[file_size];
Slice input;
s = file->Read(file_size, &input, buf);
if (!s.ok()) {
return Status::Corruption("Failed to read vTable manager file");
}
uint64_t vtable_num;
if(!GetVarint64(&input, &vtable_num)) {
return Status::Corruption("Failed to get vTable num");
}
for (int i = 0; i < vtable_num; i++) {
VTableMeta vtable_meta;
s = vtable_meta.Decode(&input);
if (s.ok()) {
if (vtable_meta.number == 0) {
continue;
}
AddVTable(vtable_meta);
if (vtable_meta.invalid_num >= vtable_meta.records_num) {
invalid_.emplace_back(vtable_meta.number);
}
} else {
return s;
}
}
return s;
}
void VTableManager::MaybeScheduleGarbageCollect() {
size_t size = 0;
for (auto & file_num : invalid_) {
size += vtables_[file_num].table_size;
}
if (size >= gc_threshold_) {
auto* gc_info = new GCInfo;
gc_info->dbname = dbname_;
gc_info->file_list = invalid_;
gc_info->env = env_;
env_->StartThread(&VTableManager::BackgroudGC, gc_info);
for (auto & file_num : gc_info->file_list) {
RemoveVTable(file_num);
auto it = std::remove(invalid_.begin(), invalid_.end(), file_num);
}
}
}
void VTableManager::BackgroudGC(void* gc_info) {
auto info = reinterpret_cast<GCInfo*>(gc_info);
for (auto & file_num : info->file_list) {
auto fname = VTableFileName(info->dbname, file_num);
info->env->RemoveFile(fname);
}
delete info;
}
} // namespace leveldb

+ 59
- 0
table/vtable_manager.h View File

@ -0,0 +1,59 @@
#ifndef VTABLE_MANAGER_H
#define VTABLE_MANAGER_H
#include <map>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
struct VTableMeta {
uint64_t number;
uint64_t records_num;
uint64_t invalid_num;
uint64_t table_size;
void Encode(std::string* target) const;
Status Decode(Slice* input);
VTableMeta() : number(0), records_num(0), invalid_num(0), table_size(0) {}
};
class VTableManager {
public:
explicit VTableManager(const std::string& dbname, Env* env, size_t gc_threshold) :
dbname_(dbname),
env_(env),
gc_threshold_(gc_threshold) {}
~VTableManager() = default;
void AddVTable(const VTableMeta& vtable_meta);
void RemoveVTable(uint64_t file_num);
Status AddInvalid(uint64_t file_num);
Status SaveVTableMeta() const;
Status LoadVTableMeta();
void MaybeScheduleGarbageCollect();
static void BackgroudGC(void* gc_info);
private:
std::string dbname_;
Env* env_;
std::map<uint64_t, VTableMeta> vtables_;
std::vector<uint64_t> invalid_;
size_t gc_threshold_;
};
} // namespace leveldb
#endif //VTABLE_MANAGER_H

+ 1
- 1
table/vtable_reader.cc View File

@ -2,7 +2,7 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include &lt;table/vtable_reader.h>; #include &#34;table/vtable_reader.h";
namespace leveldb { namespace leveldb {
Status VTableReader::Open(const Options& options, std::string fname) { Status VTableReader::Open(const Options& options, std::string fname) {

||||||
x
 
000:0
Loading…
Cancel
Save