Browse Source

Merge branch 'main' into wesley

main
wesley 4 months ago
parent
commit
3d9eddf44f
8 changed files with 80 additions and 21 deletions
  1. +9
    -5
      db/db_impl.cc
  2. +2
    -0
      db/dbformat.h
  3. +1
    -1
      db/filename.cc
  4. +7
    -1
      include/leveldb/status.h
  5. +25
    -11
      table/vtable_manager.cc
  6. +8
    -1
      table/vtable_manager.h
  7. +17
    -2
      table/vtable_reader.cc
  8. +11
    -0
      table/vtable_reader.h

+ 9
- 5
db/db_impl.cc View File

@ -1084,7 +1084,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
if (type == kVTableIndex) {
if (compact->compaction->level() > config::kNumLevels - 3) {
if (compact->compaction->level() >= config::kNumLevels - config::kLevelMergeLevel) {
if (compact->vtable_builder == nullptr) {
auto fname = VTableFileName(dbname_, compact->vtb_num);
status = env_->NewWritableFile(fname, &compact->vtb_file);
@ -1092,16 +1092,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->vtable_builder = vtable_builder;
}
VTableIndex index;
VTableReader reader;
VTableRecord record;
VTableHandle handle;
status = index.Decode(&value);
VTableReader reader(index.file_number, this->vtable_manager_);
std::string vtb_name = VTableFileName(this->dbname_, index.file_number);
status = reader.Open(this->options_, vtb_name);
status = reader.Get(index.vtable_handle, &record);
reader.Close();
vtable_manager_->AddInvalid(index.file_number);
compact->vtable_builder->Add(record, &handle);
VTableIndex new_index;
@ -1259,7 +1260,6 @@ Status DBImpl::DecodeValue(std::string* value) const {
}
if (type == kVTableIndex) {
VTableIndex index;
VTableReader reader;
VTableRecord record;
Status s = index.Decode(input);
@ -1267,17 +1267,21 @@ Status DBImpl::DecodeValue(std::string* value) const {
return s;
}
VTableReader reader(index.file_number, this->vtable_manager_);
std::string vtb_name = VTableFileName(this->dbname_, index.file_number);
s = reader.Open(this->options_, vtb_name);
if (!s.ok()) {
reader.Close();
return s;
}
s = reader.Get(index.vtable_handle, &record);
if (!s.ok()) {
reader.Close();
return s;
}
*value = record.value.ToString();
reader.Close();
return s;
}
return Status::Corruption("Unsupported value type");
@ -1287,7 +1291,7 @@ Status DBImpl::DecodeValue(std::string* value) const {
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
Status s = Status::TimeOutRead("");
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
@ -1343,7 +1347,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
Fields* fields) {
Status s;
Status s = Status::TimeOutRead("");
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {

+ 2
- 0
db/dbformat.h View File

@ -24,6 +24,8 @@ namespace leveldb {
namespace config {
static const int kNumLevels = 7;
static const int kLevelMergeLevel = 2;
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;

+ 1
- 1
db/filename.cc View File

@ -36,7 +36,7 @@ std::string TableFileName(const std::string& dbname, uint64_t number) {
}
std::string VTableFileName(const std::string& dbname, uint64_t number) {
assert(number > 0);
// assert(number > 0);
return MakeFileName(dbname, number, "vtb");
}

+ 7
- 1
include/leveldb/status.h View File

@ -52,6 +52,9 @@ class LEVELDB_EXPORT Status {
static Status IOError(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, msg, msg2);
}
static Status TimeOutRead(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kTimeOutRead, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return (state_ == nullptr); }
@ -71,6 +74,8 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsTimeOutReadError() const { return code() == kTimeOutRead; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -82,7 +87,8 @@ class LEVELDB_EXPORT Status {
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
kIOError = 5,
kTimeOutRead = 6
};
Code code() const {

+ 25
- 11
table/vtable_manager.cc View File

@ -1,5 +1,6 @@
#include "table/vtable_manager.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include <iostream>
#include <ostream>
@ -13,8 +14,9 @@ namespace leveldb {
struct GCInfo {
std::string dbname;
std::vector<uint64_t> file_list;
std::set<uint64_t>* file_list;
Env* env = nullptr;
VTableManager* vtable_manager = nullptr;
};
void VTableMeta::Encode(std::string* target) const {
@ -50,7 +52,7 @@ Status VTableManager::AddInvalid(uint64_t file_num) {
vtables_[file_num].invalid_num += 1;
if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) {
invalid_.emplace_back(file_num);
invalid_.insert(file_num);
}
MaybeScheduleGarbageCollect();
@ -127,7 +129,7 @@ Status VTableManager::LoadVTableMeta() {
}
AddVTable(vtable_meta);
if (vtable_meta.invalid_num >= vtable_meta.records_num) {
invalid_.emplace_back(vtable_meta.number);
invalid_.insert(vtable_meta.number);
}
} else {
return s;
@ -139,31 +141,43 @@ Status VTableManager::LoadVTableMeta() {
void VTableManager::MaybeScheduleGarbageCollect() {
size_t size = 0;
auto* delete_list = new std::set<uint64_t>();
for (auto & file_num : invalid_) {
size += vtables_[file_num].table_size;
}
if (size >= gc_threshold_) {
auto* gc_info = new GCInfo;
gc_info->dbname = dbname_;
gc_info->file_list = invalid_;
gc_info->file_list = delete_list;
gc_info->env = env_;
gc_info->vtable_manager = this;
env_->StartThread(&VTableManager::BackgroudGC, gc_info);
for (auto & file_num : gc_info->file_list) {
RemoveVTable(file_num);
auto it = std::remove(invalid_.begin(), invalid_.end(), file_num);
}
// for (auto & file_num : gc_info->file_list) {
// RemoveVTable(file_num);
// auto it = std::remove(invalid_.begin(), invalid_.end(), file_num);
// }
}
}
void VTableManager::BackgroudGC(void* gc_info) {
auto info = reinterpret_cast<GCInfo*>(gc_info);
for (auto & file_num : info->file_list) {
for (auto & file_num : *info->file_list) {
// if (file_num <= 0) {continue;}
auto fname = VTableFileName(info->dbname, file_num);
info->env->RemoveFile(fname);
if (info->vtable_manager->vtables_[file_num].ref <= 0) {
info->env->RemoveFile(fname);
info->vtable_manager->invalid_.erase(file_num);
}
}
delete info;
}
void VTableManager::RefVTable(uint64_t file_num) {
vtables_[file_num].ref += 1;
}
void VTableManager::UnrefVTable(uint64_t file_num) {
vtables_[file_num].ref -= 1;
}
} // namespace leveldb

+ 8
- 1
table/vtable_manager.h View File

@ -2,6 +2,7 @@
#define VTABLE_MANAGER_H
#include <map>
#include <set>
#include "leveldb/env.h"
#include "leveldb/slice.h"
@ -18,6 +19,8 @@ struct VTableMeta {
uint64_t table_size;
uint64_t ref = 0;
void Encode(std::string* target) const;
Status Decode(Slice* input);
@ -43,6 +46,10 @@ class VTableManager {
Status LoadVTableMeta();
void RefVTable(uint64_t file_num);
void UnrefVTable(uint64_t file_num);
void MaybeScheduleGarbageCollect();
static void BackgroudGC(void* gc_info);
@ -51,7 +58,7 @@ class VTableManager {
std::string dbname_;
Env* env_;
std::map<uint64_t, VTableMeta> vtables_;
std::vector<uint64_t> invalid_;
std::set<uint64_t> invalid_;
size_t gc_threshold_;
};

+ 17
- 2
table/vtable_reader.cc View File

@ -7,14 +7,23 @@
namespace leveldb {
Status VTableReader::Open(const Options& options, std::string fname) {
options_ = options;
return options_.env->NewRandomAccessFile(fname, &file_);
Status s = options_.env->NewRandomAccessFile(fname, &file_);
if (manager_ != nullptr) {
manager_->RefVTable(fnum_);
}
return s;
}
Status VTableReader::Get(const VTableHandle& handle,
VTableRecord* record) const {
auto buf = new char[handle.size];
Slice input;
Status s = file_->Read(handle.offset, handle.size, &input, buf);
Status s;
if (file_ != nullptr) {
s = file_->Read(handle.offset, handle.size, &input, buf);
} else {
s = Status::TimeOutRead("Get for another time");
}
if (!s.ok()) {
return s;
@ -35,5 +44,11 @@ namespace leveldb {
return s;
}
void VTableReader::Close() {
file_ = nullptr;
if (manager_ != nullptr) {
manager_->UnrefVTable(fnum_);
}
}
} // namespace leveldb

+ 11
- 0
table/vtable_reader.h View File

@ -11,18 +11,29 @@
#include "util/coding.h"
#include "vtable_format.h"
#include "vtable_manager.h"
namespace leveldb {
class VTableReader {
public:
VTableReader() = default;
VTableReader(uint64_t fnum, VTableManager *manager) :
fnum_(fnum),
manager_(manager) {};
Status Open(const Options& options, std::string fname);
Status Get(const VTableHandle& handle,
VTableRecord* record) const ;
void Close();
private:
Options options_;
uint64_t fnum_;
RandomAccessFile* file_{nullptr};
VTableManager* manager_{nullptr};
};
} // namespace leveldb

||||||
x
 
000:0
Loading…
Cancel
Save