diff --git a/db/db_impl.cc b/db/db_impl.cc index 1abcff3..320b445 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1084,7 +1084,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } if (type == kVTableIndex) { - if (compact->compaction->level() > config::kNumLevels - 3) { + if (compact->compaction->level() >= config::kNumLevels - config::kLevelMergeLevel) { if (compact->vtable_builder == nullptr) { auto fname = VTableFileName(dbname_, compact->vtb_num); status = env_->NewWritableFile(fname, &compact->vtb_file); @@ -1092,16 +1092,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->vtable_builder = vtable_builder; } VTableIndex index; - VTableReader reader; VTableRecord record; VTableHandle handle; status = index.Decode(&value); + VTableReader reader(index.file_number, this->vtable_manager_); std::string vtb_name = VTableFileName(this->dbname_, index.file_number); status = reader.Open(this->options_, vtb_name); status = reader.Get(index.vtable_handle, &record); + reader.Close(); vtable_manager_->AddInvalid(index.file_number); compact->vtable_builder->Add(record, &handle); VTableIndex new_index; @@ -1259,7 +1260,6 @@ Status DBImpl::DecodeValue(std::string* value) const { } if (type == kVTableIndex) { VTableIndex index; - VTableReader reader; VTableRecord record; Status s = index.Decode(input); @@ -1267,17 +1267,21 @@ Status DBImpl::DecodeValue(std::string* value) const { return s; } + VTableReader reader(index.file_number, this->vtable_manager_); std::string vtb_name = VTableFileName(this->dbname_, index.file_number); s = reader.Open(this->options_, vtb_name); if (!s.ok()) { + reader.Close(); return s; } s = reader.Get(index.vtable_handle, &record); if (!s.ok()) { + reader.Close(); return s; } *value = record.value.ToString(); + reader.Close(); return s; } return Status::Corruption("Unsupported value type"); @@ -1287,7 +1291,7 @@ Status DBImpl::DecodeValue(std::string* value) const { Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { - Status s; + Status s = Status::TimeOutRead(""); MutexLock l(&mutex_); SequenceNumber snapshot; if (options.snapshot != nullptr) { @@ -1343,7 +1347,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, Status DBImpl::Get(const ReadOptions& options, const Slice& key, Fields* fields) { - Status s; + Status s = Status::TimeOutRead(""); MutexLock l(&mutex_); SequenceNumber snapshot; if (options.snapshot != nullptr) { diff --git a/db/dbformat.h b/db/dbformat.h index a1c30ed..5884716 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -24,6 +24,8 @@ namespace leveldb { namespace config { static const int kNumLevels = 7; +static const int kLevelMergeLevel = 2; + // Level-0 compaction is started when we hit this many files. static const int kL0_CompactionTrigger = 4; diff --git a/db/filename.cc b/db/filename.cc index e98d8e4..51ee9ce 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -36,7 +36,7 @@ std::string TableFileName(const std::string& dbname, uint64_t number) { } std::string VTableFileName(const std::string& dbname, uint64_t number) { - assert(number > 0); + // assert(number > 0); return MakeFileName(dbname, number, "vtb"); } diff --git a/include/leveldb/status.h b/include/leveldb/status.h index e327314..2ed8d44 100644 --- a/include/leveldb/status.h +++ b/include/leveldb/status.h @@ -52,6 +52,9 @@ class LEVELDB_EXPORT Status { static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kIOError, msg, msg2); } + static Status TimeOutRead(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kTimeOutRead, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return (state_ == nullptr); } @@ -71,6 +74,8 @@ class LEVELDB_EXPORT Status { // Returns true iff the status indicates an InvalidArgument. bool IsInvalidArgument() const { return code() == kInvalidArgument; } + bool IsTimeOutReadError() const { return code() == kTimeOutRead; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -82,7 +87,8 @@ class LEVELDB_EXPORT Status { kCorruption = 2, kNotSupported = 3, kInvalidArgument = 4, - kIOError = 5 + kIOError = 5, + kTimeOutRead = 6 }; Code code() const { diff --git a/table/vtable_manager.cc b/table/vtable_manager.cc index c02f98d..21906ed 100644 --- a/table/vtable_manager.cc +++ b/table/vtable_manager.cc @@ -1,5 +1,6 @@ #include "table/vtable_manager.h" +#include "db/dbformat.h" #include "db/filename.h" #include <iostream> #include <ostream> @@ -13,8 +14,9 @@ namespace leveldb { struct GCInfo { std::string dbname; - std::vector<uint64_t> file_list; + std::set<uint64_t>* file_list; Env* env = nullptr; + VTableManager* vtable_manager = nullptr; }; void VTableMeta::Encode(std::string* target) const { @@ -50,7 +52,7 @@ Status VTableManager::AddInvalid(uint64_t file_num) { vtables_[file_num].invalid_num += 1; if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) { - invalid_.emplace_back(file_num); + invalid_.insert(file_num); } MaybeScheduleGarbageCollect(); @@ -127,7 +129,7 @@ Status VTableManager::LoadVTableMeta() { } AddVTable(vtable_meta); if (vtable_meta.invalid_num >= vtable_meta.records_num) { - invalid_.emplace_back(vtable_meta.number); + invalid_.insert(vtable_meta.number); } } else { return s; @@ -139,31 +141,43 @@ Status VTableManager::LoadVTableMeta() { void VTableManager::MaybeScheduleGarbageCollect() { size_t size = 0; + auto* delete_list = new std::set<uint64_t>(); for (auto & file_num : invalid_) { size += vtables_[file_num].table_size; } if (size >= gc_threshold_) { auto* gc_info = new GCInfo; gc_info->dbname = dbname_; - gc_info->file_list = invalid_; + gc_info->file_list = delete_list; gc_info->env = env_; + gc_info->vtable_manager = this; env_->StartThread(&VTableManager::BackgroudGC, gc_info); - for (auto & file_num : gc_info->file_list) { - RemoveVTable(file_num); - auto it = std::remove(invalid_.begin(), invalid_.end(), file_num); - } + // for (auto & file_num : gc_info->file_list) { + // RemoveVTable(file_num); + // auto it = std::remove(invalid_.begin(), invalid_.end(), file_num); + // } } } void VTableManager::BackgroudGC(void* gc_info) { auto info = reinterpret_cast<GCInfo*>(gc_info); - for (auto & file_num : info->file_list) { + for (auto & file_num : *info->file_list) { + // if (file_num <= 0) {continue;} auto fname = VTableFileName(info->dbname, file_num); - info->env->RemoveFile(fname); + + if (info->vtable_manager->vtables_[file_num].ref <= 0) { + info->env->RemoveFile(fname); + info->vtable_manager->invalid_.erase(file_num); + } } - delete info; } +void VTableManager::RefVTable(uint64_t file_num) { + vtables_[file_num].ref += 1; +} +void VTableManager::UnrefVTable(uint64_t file_num) { + vtables_[file_num].ref -= 1; +} } // namespace leveldb diff --git a/table/vtable_manager.h b/table/vtable_manager.h index 7ad1eff..8199202 100644 --- a/table/vtable_manager.h +++ b/table/vtable_manager.h @@ -2,6 +2,7 @@ #define VTABLE_MANAGER_H #include <map> +#include <set> #include "leveldb/env.h" #include "leveldb/slice.h" @@ -18,6 +19,8 @@ struct VTableMeta { uint64_t table_size; + uint64_t ref = 0; + void Encode(std::string* target) const; Status Decode(Slice* input); @@ -43,6 +46,10 @@ class VTableManager { Status LoadVTableMeta(); + void RefVTable(uint64_t file_num); + + void UnrefVTable(uint64_t file_num); + void MaybeScheduleGarbageCollect(); static void BackgroudGC(void* gc_info); @@ -51,7 +58,7 @@ class VTableManager { std::string dbname_; Env* env_; std::map<uint64_t, VTableMeta> vtables_; - std::vector<uint64_t> invalid_; + std::set<uint64_t> invalid_; size_t gc_threshold_; }; diff --git a/table/vtable_reader.cc b/table/vtable_reader.cc index 2cd1478..d02a0b7 100644 --- a/table/vtable_reader.cc +++ b/table/vtable_reader.cc @@ -7,14 +7,23 @@ namespace leveldb { Status VTableReader::Open(const Options& options, std::string fname) { options_ = options; - return options_.env->NewRandomAccessFile(fname, &file_); + Status s = options_.env->NewRandomAccessFile(fname, &file_); + if (manager_ != nullptr) { + manager_->RefVTable(fnum_); + } + return s; } Status VTableReader::Get(const VTableHandle& handle, VTableRecord* record) const { auto buf = new char[handle.size]; Slice input; - Status s = file_->Read(handle.offset, handle.size, &input, buf); + Status s; + if (file_ != nullptr) { + s = file_->Read(handle.offset, handle.size, &input, buf); + } else { + s = Status::TimeOutRead("Get for another time"); + } if (!s.ok()) { return s; @@ -35,5 +44,11 @@ namespace leveldb { return s; } + void VTableReader::Close() { + file_ = nullptr; + if (manager_ != nullptr) { + manager_->UnrefVTable(fnum_); + } + } } // namespace leveldb \ No newline at end of file diff --git a/table/vtable_reader.h b/table/vtable_reader.h index bad18f5..24e32fa 100644 --- a/table/vtable_reader.h +++ b/table/vtable_reader.h @@ -11,18 +11,29 @@ #include "util/coding.h" #include "vtable_format.h" +#include "vtable_manager.h" namespace leveldb { class VTableReader { public: + VTableReader() = default; + + VTableReader(uint64_t fnum, VTableManager *manager) : + fnum_(fnum), + manager_(manager) {}; + Status Open(const Options& options, std::string fname); Status Get(const VTableHandle& handle, VTableRecord* record) const ; + + void Close(); private: Options options_; + uint64_t fnum_; RandomAccessFile* file_{nullptr}; + VTableManager* manager_{nullptr}; }; } // namespace leveldb