Bladeren bron

gc bug fix

main
ArcueidType 4 maanden geleden
bovenliggende
commit
6a7aa874b1
8 gewijzigde bestanden met toevoegingen van 80 en 21 verwijderingen
  1. +9
    -5
      db/db_impl.cc
  2. +2
    -0
      db/dbformat.h
  3. +1
    -1
      db/filename.cc
  4. +7
    -1
      include/leveldb/status.h
  5. +25
    -11
      table/vtable_manager.cc
  6. +8
    -1
      table/vtable_manager.h
  7. +17
    -2
      table/vtable_reader.cc
  8. +11
    -0
      table/vtable_reader.h

+ 9
- 5
db/db_impl.cc Bestand weergeven

@ -1084,7 +1084,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
if (type == kVTableIndex) { if (type == kVTableIndex) {
if (compact->compaction->level() > config::kNumLevels - 3) { if (compact->compaction->level() >= config::kNumLevels - config::kLevelMergeLevel) {
if (compact->vtable_builder == nullptr) { if (compact->vtable_builder == nullptr) {
auto fname = VTableFileName(dbname_, compact->vtb_num); auto fname = VTableFileName(dbname_, compact->vtb_num);
status = env_->NewWritableFile(fname, &compact->vtb_file); status = env_->NewWritableFile(fname, &compact->vtb_file);
@ -1092,16 +1092,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->vtable_builder = vtable_builder; compact->vtable_builder = vtable_builder;
} }
VTableIndex index; VTableIndex index;
VTableReader reader;
VTableRecord record; VTableRecord record;
VTableHandle handle; VTableHandle handle;
status = index.Decode(&value); status = index.Decode(&value);
VTableReader reader(index.file_number, this->vtable_manager_);
std::string vtb_name = VTableFileName(this->dbname_, index.file_number); std::string vtb_name = VTableFileName(this->dbname_, index.file_number);
status = reader.Open(this->options_, vtb_name); status = reader.Open(this->options_, vtb_name);
status = reader.Get(index.vtable_handle, &record); status = reader.Get(index.vtable_handle, &record);
reader.Close();
vtable_manager_->AddInvalid(index.file_number); vtable_manager_->AddInvalid(index.file_number);
compact->vtable_builder->Add(record, &handle); compact->vtable_builder->Add(record, &handle);
VTableIndex new_index; VTableIndex new_index;
@ -1259,7 +1260,6 @@ Status DBImpl::DecodeValue(std::string* value) const {
} }
if (type == kVTableIndex) { if (type == kVTableIndex) {
VTableIndex index; VTableIndex index;
VTableReader reader;
VTableRecord record; VTableRecord record;
Status s = index.Decode(input); Status s = index.Decode(input);
@ -1267,17 +1267,21 @@ Status DBImpl::DecodeValue(std::string* value) const {
return s; return s;
} }
VTableReader reader(index.file_number, this->vtable_manager_);
std::string vtb_name = VTableFileName(this->dbname_, index.file_number); std::string vtb_name = VTableFileName(this->dbname_, index.file_number);
s = reader.Open(this->options_, vtb_name); s = reader.Open(this->options_, vtb_name);
if (!s.ok()) { if (!s.ok()) {
reader.Close();
return s; return s;
} }
s = reader.Get(index.vtable_handle, &record); s = reader.Get(index.vtable_handle, &record);
if (!s.ok()) { if (!s.ok()) {
reader.Close();
return s; return s;
} }
*value = record.value.ToString(); *value = record.value.ToString();
reader.Close();
return s; return s;
} }
return Status::Corruption("Unsupported value type"); return Status::Corruption("Unsupported value type");
@ -1287,7 +1291,7 @@ Status DBImpl::DecodeValue(std::string* value) const {
Status DBImpl::Get(const ReadOptions& options, const Slice& key, Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) { std::string* value) {
Status s; Status s = Status::TimeOutRead("");
MutexLock l(&mutex_); MutexLock l(&mutex_);
SequenceNumber snapshot; SequenceNumber snapshot;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {
@ -1343,7 +1347,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
Status DBImpl::Get(const ReadOptions& options, const Slice& key, Status DBImpl::Get(const ReadOptions& options, const Slice& key,
Fields* fields) { Fields* fields) {
Status s; Status s = Status::TimeOutRead("");
MutexLock l(&mutex_); MutexLock l(&mutex_);
SequenceNumber snapshot; SequenceNumber snapshot;
if (options.snapshot != nullptr) { if (options.snapshot != nullptr) {

+ 2
- 0
db/dbformat.h Bestand weergeven

@ -24,6 +24,8 @@ namespace leveldb {
namespace config { namespace config {
static const int kNumLevels = 7; static const int kNumLevels = 7;
static const int kLevelMergeLevel = 2;
// Level-0 compaction is started when we hit this many files. // Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4; static const int kL0_CompactionTrigger = 4;

+ 1
- 1
db/filename.cc Bestand weergeven

@ -36,7 +36,7 @@ std::string TableFileName(const std::string& dbname, uint64_t number) {
} }
std::string VTableFileName(const std::string& dbname, uint64_t number) { std::string VTableFileName(const std::string& dbname, uint64_t number) {
assert(number > 0); // assert(number > 0);
return MakeFileName(dbname, number, "vtb"); return MakeFileName(dbname, number, "vtb");
} }

+ 7
- 1
include/leveldb/status.h Bestand weergeven

@ -52,6 +52,9 @@ class LEVELDB_EXPORT Status {
static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, msg, msg2); return Status(kIOError, msg, msg2);
} }
static Status TimeOutRead(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kTimeOutRead, msg, msg2);
}
// Returns true iff the status indicates success. // Returns true iff the status indicates success.
bool ok() const { return (state_ == nullptr); } bool ok() const { return (state_ == nullptr); }
@ -71,6 +74,8 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument. // Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; } bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsTimeOutReadError() const { return code() == kTimeOutRead; }
// Return a string representation of this status suitable for printing. // Return a string representation of this status suitable for printing.
// Returns the string "OK" for success. // Returns the string "OK" for success.
std::string ToString() const; std::string ToString() const;
@ -82,7 +87,8 @@ class LEVELDB_EXPORT Status {
kCorruption = 2, kCorruption = 2,
kNotSupported = 3, kNotSupported = 3,
kInvalidArgument = 4, kInvalidArgument = 4,
kIOError = 5 kIOError = 5,
kTimeOutRead = 6
}; };
Code code() const { Code code() const {

+ 25
- 11
table/vtable_manager.cc Bestand weergeven

@ -1,5 +1,6 @@
#include "table/vtable_manager.h" #include "table/vtable_manager.h"
#include "db/dbformat.h"
#include "db/filename.h" #include "db/filename.h"
#include <iostream> #include <iostream>
#include <ostream> #include <ostream>
@ -13,8 +14,9 @@ namespace leveldb {
struct GCInfo { struct GCInfo {
std::string dbname; std::string dbname;
std::vector<uint64_t> file_list; std::set<uint64_t>* file_list;
Env* env = nullptr; Env* env = nullptr;
VTableManager* vtable_manager = nullptr;
}; };
void VTableMeta::Encode(std::string* target) const { void VTableMeta::Encode(std::string* target) const {
@ -50,7 +52,7 @@ Status VTableManager::AddInvalid(uint64_t file_num) {
vtables_[file_num].invalid_num += 1; vtables_[file_num].invalid_num += 1;
if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) { if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) {
invalid_.emplace_back(file_num); invalid_.insert(file_num);
} }
MaybeScheduleGarbageCollect(); MaybeScheduleGarbageCollect();
@ -127,7 +129,7 @@ Status VTableManager::LoadVTableMeta() {
} }
AddVTable(vtable_meta); AddVTable(vtable_meta);
if (vtable_meta.invalid_num >= vtable_meta.records_num) { if (vtable_meta.invalid_num >= vtable_meta.records_num) {
invalid_.emplace_back(vtable_meta.number); invalid_.insert(vtable_meta.number);
} }
} else { } else {
return s; return s;
@ -139,31 +141,43 @@ Status VTableManager::LoadVTableMeta() {
void VTableManager::MaybeScheduleGarbageCollect() { void VTableManager::MaybeScheduleGarbageCollect() {
size_t size = 0; size_t size = 0;
auto* delete_list = new std::set<uint64_t>();
for (auto & file_num : invalid_) { for (auto & file_num : invalid_) {
size += vtables_[file_num].table_size; size += vtables_[file_num].table_size;
} }
if (size >= gc_threshold_) { if (size >= gc_threshold_) {
auto* gc_info = new GCInfo; auto* gc_info = new GCInfo;
gc_info->dbname = dbname_; gc_info->dbname = dbname_;
gc_info->file_list = invalid_; gc_info->file_list = delete_list;
gc_info->env = env_; gc_info->env = env_;
gc_info->vtable_manager = this;
env_->StartThread(&VTableManager::BackgroudGC, gc_info); env_->StartThread(&VTableManager::BackgroudGC, gc_info);
for (auto & file_num : gc_info->file_list) { // for (auto & file_num : gc_info->file_list) {
RemoveVTable(file_num); // RemoveVTable(file_num);
auto it = std::remove(invalid_.begin(), invalid_.end(), file_num); // auto it = std::remove(invalid_.begin(), invalid_.end(), file_num);
} // }
} }
} }
void VTableManager::BackgroudGC(void* gc_info) { void VTableManager::BackgroudGC(void* gc_info) {
auto info = reinterpret_cast<GCInfo*>(gc_info); auto info = reinterpret_cast<GCInfo*>(gc_info);
for (auto & file_num : info->file_list) { for (auto & file_num : *info->file_list) {
// if (file_num <= 0) {continue;}
auto fname = VTableFileName(info->dbname, file_num); auto fname = VTableFileName(info->dbname, file_num);
info->env->RemoveFile(fname); if (info->vtable_manager->vtables_[file_num].ref <= 0) {
info->env->RemoveFile(fname);
info->vtable_manager->invalid_.erase(file_num);
}
} }
delete info;
} }
void VTableManager::RefVTable(uint64_t file_num) {
vtables_[file_num].ref += 1;
}
void VTableManager::UnrefVTable(uint64_t file_num) {
vtables_[file_num].ref -= 1;
}
} // namespace leveldb } // namespace leveldb

+ 8
- 1
table/vtable_manager.h Bestand weergeven

@ -2,6 +2,7 @@
#define VTABLE_MANAGER_H #define VTABLE_MANAGER_H
#include <map> #include <map>
#include <set>
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/slice.h" #include "leveldb/slice.h"
@ -18,6 +19,8 @@ struct VTableMeta {
uint64_t table_size; uint64_t table_size;
uint64_t ref = 0;
void Encode(std::string* target) const; void Encode(std::string* target) const;
Status Decode(Slice* input); Status Decode(Slice* input);
@ -43,6 +46,10 @@ class VTableManager {
Status LoadVTableMeta(); Status LoadVTableMeta();
void RefVTable(uint64_t file_num);
void UnrefVTable(uint64_t file_num);
void MaybeScheduleGarbageCollect(); void MaybeScheduleGarbageCollect();
static void BackgroudGC(void* gc_info); static void BackgroudGC(void* gc_info);
@ -51,7 +58,7 @@ class VTableManager {
std::string dbname_; std::string dbname_;
Env* env_; Env* env_;
std::map<uint64_t, VTableMeta> vtables_; std::map<uint64_t, VTableMeta> vtables_;
std::vector<uint64_t> invalid_; std::set<uint64_t> invalid_;
size_t gc_threshold_; size_t gc_threshold_;
}; };

+ 17
- 2
table/vtable_reader.cc Bestand weergeven

@ -7,14 +7,23 @@
namespace leveldb { namespace leveldb {
Status VTableReader::Open(const Options& options, std::string fname) { Status VTableReader::Open(const Options& options, std::string fname) {
options_ = options; options_ = options;
return options_.env->NewRandomAccessFile(fname, &file_); Status s = options_.env->NewRandomAccessFile(fname, &file_);
if (manager_ != nullptr) {
manager_->RefVTable(fnum_);
}
return s;
} }
Status VTableReader::Get(const VTableHandle& handle, Status VTableReader::Get(const VTableHandle& handle,
VTableRecord* record) const { VTableRecord* record) const {
auto buf = new char[handle.size]; auto buf = new char[handle.size];
Slice input; Slice input;
Status s = file_->Read(handle.offset, handle.size, &input, buf); Status s;
if (file_ != nullptr) {
s = file_->Read(handle.offset, handle.size, &input, buf);
} else {
s = Status::TimeOutRead("Get for another time");
}
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -35,5 +44,11 @@ namespace leveldb {
return s; return s;
} }
void VTableReader::Close() {
file_ = nullptr;
if (manager_ != nullptr) {
manager_->UnrefVTable(fnum_);
}
}
} // namespace leveldb } // namespace leveldb

+ 11
- 0
table/vtable_reader.h Bestand weergeven

@ -11,18 +11,29 @@
#include "util/coding.h" #include "util/coding.h"
#include "vtable_format.h" #include "vtable_format.h"
#include "vtable_manager.h"
namespace leveldb { namespace leveldb {
class VTableReader { class VTableReader {
public: public:
VTableReader() = default;
VTableReader(uint64_t fnum, VTableManager *manager) :
fnum_(fnum),
manager_(manager) {};
Status Open(const Options& options, std::string fname); Status Open(const Options& options, std::string fname);
Status Get(const VTableHandle& handle, Status Get(const VTableHandle& handle,
VTableRecord* record) const ; VTableRecord* record) const ;
void Close();
private: private:
Options options_; Options options_;
uint64_t fnum_;
RandomAccessFile* file_{nullptr}; RandomAccessFile* file_{nullptr};
VTableManager* manager_{nullptr};
}; };
} // namespace leveldb } // namespace leveldb

||||||
x
 
000:0
Laden…
Annuleren
Opslaan