Browse Source

gc function

main
ArcueidType 8 months ago
parent
commit
096f6fddfb
5 changed files with 147 additions and 18 deletions
  1. +95
    -15
      db/db_impl.cc
  2. +1
    -1
      db/db_impl.h
  3. +2
    -0
      include/leveldb/options.h
  4. +41
    -0
      table/vtable_manager.cc
  5. +8
    -2
      table/vtable_manager.h

+ 95
- 15
db/db_impl.cc View File

@ -33,6 +33,7 @@
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "table/vtable_builder.h"
#include "table/vtable_format.h"
#include "table/vtable_manager.h"
#include "table/vtable_reader.h"
@ -70,7 +71,10 @@ struct DBImpl::CompactionState {
: compaction(c),
smallest_snapshot(0),
outfile(nullptr),
vtb_file(nullptr),
builder(nullptr),
vtable_builder(nullptr),
vtb_num(0),
total_bytes(0) {}
Compaction* const compaction;
@ -85,8 +89,11 @@ struct DBImpl::CompactionState {
// State kept for output being generated
WritableFile* outfile;
WritableFile* vtb_file;
TableBuilder* builder;
VTableBuilder* vtable_builder;
uint64_t vtb_num;
uint64_t total_bytes;
};
@ -153,7 +160,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)),
vtable_manager_(new VTableManager(dbname, raw_options.env)){}
vtable_manager_(new VTableManager(dbname, raw_options.env, raw_options.gc_size_threshold)){}
DBImpl::~DBImpl() {
// Wait for background work to finish.
@ -553,7 +560,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
vtable_manager_->AddVTable(vtable_meta);
if (vtable_meta.number > 0) {
vtable_manager_->AddVTable(vtable_meta);
}
}
CompactionStats stats;
@ -844,6 +853,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
mutex_.Unlock();
}
if (compact->vtb_num == 0) {
compact->vtb_num = file_number;
}
// Make the output file
std::string fname = TableFileName(dbname_, file_number);
Status s = env_->NewWritableFile(fname, &compact->outfile);
@ -886,6 +899,26 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
delete compact->outfile;
compact->outfile = nullptr;
if (compact->vtable_builder != nullptr && s.ok()) {
VTableMeta meta;
meta.invalid_num = 0;
meta.number = compact->vtb_num;
meta.records_num = compact->vtable_builder->RecordNumber();
meta.table_size = compact->vtable_builder->FileSize();
s = compact->vtable_builder->Finish();
delete compact->vtable_builder;
compact->vtable_builder = nullptr;
if (s.ok()) {
s = compact->vtb_file->Sync();
}
if (s.ok()) {
s = compact->vtb_file->Close();
}
delete compact->vtb_file;
compact->vtb_file = nullptr;
}
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
Iterator* iter =
@ -920,6 +953,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
bool GetValueType(const Slice& input, unsigned char* value) {
if (input.empty()) {
return false;
}
*value = *input.data();
return true;
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
@ -943,6 +984,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
enum Type : unsigned char {
kVTableIndex = 1,
kNonIndexValue = 2,
};
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
@ -1029,7 +1074,43 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());
auto value = input->value();
std::string new_value = value.ToString();
unsigned char type;
if(!GetValueType(value, &type)) {
break;
}
if (type == kVTableIndex) {
if (compact->compaction->level() > config::kNumLevels - 3) {
if (compact->vtable_builder == nullptr) {
auto fname = VTableFileName(dbname_, compact->vtb_num);
status = env_->NewWritableFile(fname, &compact->vtb_file);
auto vtable_builder = new VTableBuilder(options_, compact->vtb_file);
compact->vtable_builder = vtable_builder;
}
VTableIndex index;
VTableReader reader;
VTableRecord record;
VTableHandle handle;
status = index.Decode(&value);
std::string vtb_name = VTableFileName(this->dbname_, index.file_number);
status = reader.Open(this->options_, vtb_name);
status = reader.Get(index.vtable_handle, &record);
vtable_manager_->AddInvalid(index.file_number);
compact->vtable_builder->Add(record, &handle);
VTableIndex new_index;
new_index.file_number = compact->vtb_num;
new_index.vtable_handle = handle;
new_index.Encode(&new_value);
}
}
compact->builder->Add(key, Slice(new_value));
// Close output file if it is big enough
if (compact->builder->FileSize() >=
@ -1039,6 +1120,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
} else {
unsigned char type;
auto value = input->value();
if(!GetValueType(value, &type)) {
break;
}
if (type == kVTableIndex) {
VTableIndex vtable_index;
vtable_index.Decode(&value);
vtable_manager_->AddInvalid(vtable_index.file_number);
}
}
input->Next();
@ -1148,18 +1240,6 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
namespace {
bool GetValueType(const Slice& input, unsigned char* value) {
if (input.empty()) {
return false;
}
*value = *input.data();
return true;
}
} // namespace
Status DBImpl::DecodeValue(std::string* value) const {
enum Type : unsigned char {
kVTableIndex = 1,

+ 1
- 1
db/db_impl.h View File

@ -212,7 +212,7 @@ class DBImpl : public DB {
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
VTableManager* vtable_manager_ GUARDED_BY(mutex_) {};
VTableManager* vtable_manager_ {};
};
// Sanitize db options. The caller should delete result.info_log if

+ 2
- 0
include/leveldb/options.h View File

@ -118,6 +118,8 @@ struct LEVELDB_EXPORT Options {
// initially populating a large database.
size_t max_file_size = 2 * 1024 * 1024;
size_t gc_size_threshold = 1024 * 1024 * 1024;
// Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically.
//

+ 41
- 0
table/vtable_manager.cc View File

@ -11,6 +11,12 @@
namespace leveldb {
struct GCInfo {
std::string dbname;
std::vector<uint64_t> file_list;
Env* env = nullptr;
};
void VTableMeta::Encode(std::string* target) const {
PutVarint64(target, number);
PutVarint64(target, records_num);
@ -46,6 +52,9 @@ Status VTableManager::AddInvalid(uint64_t file_num) {
if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) {
invalid_.emplace_back(file_num);
}
MaybeScheduleGarbageCollect();
return Status::OK();
}
@ -113,6 +122,9 @@ Status VTableManager::LoadVTableMeta() {
VTableMeta vtable_meta;
s = vtable_meta.Decode(&input);
if (s.ok()) {
if (vtable_meta.number == 0) {
continue;
}
AddVTable(vtable_meta);
if (vtable_meta.invalid_num >= vtable_meta.records_num) {
invalid_.emplace_back(vtable_meta.number);
@ -125,4 +137,33 @@ Status VTableManager::LoadVTableMeta() {
return s;
}
void VTableManager::MaybeScheduleGarbageCollect() {
size_t size = 0;
for (auto & file_num : invalid_) {
size += vtables_[file_num].table_size;
}
if (size >= gc_threshold_) {
auto* gc_info = new GCInfo;
gc_info->dbname = dbname_;
gc_info->file_list = invalid_;
gc_info->env = env_;
env_->StartThread(&VTableManager::BackgroudGC, gc_info);
for (auto & file_num : gc_info->file_list) {
RemoveVTable(file_num);
auto it = std::remove(invalid_.begin(), invalid_.end(), file_num);
}
}
}
void VTableManager::BackgroudGC(void* gc_info) {
auto info = reinterpret_cast<GCInfo*>(gc_info);
for (auto & file_num : info->file_list) {
auto fname = VTableFileName(info->dbname, file_num);
info->env->RemoveFile(fname);
}
delete info;
}
} // namespace leveldb

+ 8
- 2
table/vtable_manager.h View File

@ -26,9 +26,10 @@ struct VTableMeta {
class VTableManager {
public:
explicit VTableManager(const std::string& dbname, Env* env) :
explicit VTableManager(const std::string& dbname, Env* env, size_t gc_threshold) :
dbname_(dbname),
env_(env) {}
env_(env),
gc_threshold_(gc_threshold) {}
~VTableManager() = default;
@ -42,11 +43,16 @@ class VTableManager {
Status LoadVTableMeta();
void MaybeScheduleGarbageCollect();
static void BackgroudGC(void* gc_info);
private:
std::string dbname_;
Env* env_;
std::map<uint64_t, VTableMeta> vtables_;
std::vector<uint64_t> invalid_;
size_t gc_threshold_;
};
} // namespace leveldb

Loading…
Cancel
Save