浏览代码

vlog_reader/writer v2.0

main
VirgilZhu 8 个月前
父节点
当前提交
1f1a6de7b2
共有 17 个文件被更改,包括 108 次插入65 次删除
  1. +30
    -31
      db/db_impl.cc
  2. +5
    -1
      db/db_impl.h
  3. +1
    -0
      db/db_iter.cc
  4. +2
    -0
      db/db_test.cc
  5. +3
    -1
      db/dumpfile.cc
  6. +1
    -0
      db/fault_injection_test.cc
  7. +1
    -0
      db/leveldbutil.cc
  8. +1
    -0
      db/log_test.cc
  9. +8
    -1
      db/memtable.cc
  10. +7
    -0
      db/memtable.h
  11. +23
    -25
      db/write_batch.cc
  12. +7
    -2
      helpers/memenv/memenv.cc
  13. +1
    -0
      include/leveldb/env.h
  14. +5
    -2
      include/leveldb/options.h
  15. +5
    -2
      include/leveldb/write_batch.h
  16. +1
    -0
      table/table_test.cc
  17. +7
    -0
      util/env_posix.cc

+ 30
- 31
db/db_impl.cc 查看文件

@ -146,11 +146,14 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
has_imm_(false),
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
seed_(0),
tmp_batch_(new WriteBatch),
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
vlog_(nullptr),
vlog_kv_numbers_(0),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
@ -171,7 +174,7 @@ DBImpl::~DBImpl() {
if (mem_ != nullptr) mem_->Unref();
if (imm_ != nullptr) imm_->Unref();
delete tmp_batch_;
delete log_;
delete vlog_;
delete logfile_;
delete table_cache_;
@ -476,13 +479,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
// See if we should keep reusing the last log file.
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == nullptr);
assert(log_ == nullptr);
assert(vlog_ == nullptr);
assert(mem_ == nullptr);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
vlog_ = new log::VlogWriter(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != nullptr) {
mem_ = mem;
@ -1313,6 +1316,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
/* TODO */
vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
@ -1423,6 +1429,23 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
if (logfile_->GetSize() > options_.max_value_log_size) {
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
versions_->ReuseFileNumber(new_log_number);
}
// gc_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize());
vlog_kv_numbers_ = 0;
delete vlog_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
vlog_ = new log::VlogWriter(lfile);
}
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
@ -1456,33 +1479,9 @@ Status DBImpl::MakeRoomForWrite(bool force) {
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = nullptr;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
mem_->SetLogFileNumber(logfile_number_);
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
@ -1577,7 +1576,7 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
WriteBatch batch(opt.separate_threshold);
batch.Put(key, value);
return Write(opt, &batch);
}
@ -1609,7 +1608,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->vlog_ = new log::VlogWriter(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}

+ 5
- 1
db/db_impl.h 查看文件

@ -186,7 +186,7 @@ class DBImpl : public DB {
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
// log::VlogWriter* log_;
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
@ -212,6 +212,10 @@ class DBImpl : public DB {
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
log::VlogWriter* vlog_;
int vlog_kv_numbers_;
// KVSepManagement* gc_management_;
};
// Sanitize db options. The caller should delete result.info_log if

+ 1
- 0
db/db_iter.cc 查看文件

@ -189,6 +189,7 @@ void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
skipping = true;
break;
case kTypeValue:
case kTypeSeparation:
if (skipping &&
user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
// Entry hidden

+ 2
- 0
db/db_test.cc 查看文件

@ -190,6 +190,7 @@ class SpecialEnv : public EnvWrapper {
}
return base_->Sync();
}
size_t GetSize() { return base_->GetSize(); }
};
class ManifestFile : public WritableFile {
private:
@ -215,6 +216,7 @@ class SpecialEnv : public EnvWrapper {
return base_->Sync();
}
}
size_t GetSize() { return base_->GetSize(); }
};
if (non_writable_.load(std::memory_order_acquire)) {

+ 3
- 1
db/dumpfile.cc 查看文件

@ -74,7 +74,7 @@ Status PrintLogContents(Env* env, const std::string& fname,
// Called on every item found in a WriteBatch.
class WriteBatchItemPrinter : public WriteBatch::Handler {
public:
void Put(const Slice& key, const Slice& value) override {
void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) override {
std::string r = " put '";
AppendEscapedStringTo(&r, key);
r += "' '";
@ -189,6 +189,8 @@ Status DumpTable(Env* env, const std::string& fname, WritableFile* dst) {
r += "del";
} else if (key.type == kTypeValue) {
r += "val";
} else if (key.type == kTypeSeparation) {
r += "val";
} else {
AppendNumberTo(&r, key.type);
}

+ 1
- 0
db/fault_injection_test.cc 查看文件

@ -114,6 +114,7 @@ class TestWritableFile : public WritableFile {
Status Close() override;
Status Flush() override;
Status Sync() override;
size_t GetSize() override { return 0; };
private:
FileState state_;

+ 1
- 0
db/leveldbutil.cc 查看文件

@ -20,6 +20,7 @@ class StdoutPrinter : public WritableFile {
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
size_t GetSize() override { return 0; }
};
bool HandleDumpCommand(Env* env, char** files, int num) {

+ 1
- 0
db/log_test.cc 查看文件

@ -164,6 +164,7 @@ class LogTest : public testing::Test {
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
size_t GetSize() override { return 0; }
Status Append(const Slice& slice) override {
contents_.append(slice.data(), slice.size());
return Status::OK();

+ 8
- 1
db/memtable.cc 查看文件

@ -126,9 +126,16 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
case kTypeDeletion: {
*s = Status::NotFound(Slice());
return true;
}
case kTypeSeparation: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
s->SetSeparated();
return true;
}
}
}
}

+ 7
- 0
db/memtable.h 查看文件

@ -62,6 +62,10 @@ class MemTable {
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);
uint64_t GetTailSequence() { return tail_sequence_; }
uint64_t GetLogFileNumber() { return log_file_number_; }
uint64_t SetLogFileNumber(uint64_t fid) { log_file_number_ = fid; }
private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;
@ -76,6 +80,9 @@ class MemTable {
~MemTable(); // Private since only Unref() should be used to delete it
uint64_t tail_sequence_;
uint64_t log_file_number_;
KeyComparator comparator_;
int refs_;
Arena arena_;

+ 23
- 25
db/write_batch.cc 查看文件

@ -9,20 +9,18 @@
// record :=
// kTypeValue varstring varstring |
// kTypeDeletion varstring
// kTypeSeparation varstring varstring
// varstring :=
// len: varint32
// data: uint8[len]
#include "leveldb/write_batch.h"
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "util/coding.h"
namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
@ -84,13 +82,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
Status WriteBatch::Iterate(Handler* handler, uint64_t fid,
uint64_t offset) const {
Slice input(rep_);
// 整个writebatch 的起始地址
const char* begin = input.data();
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
// 12个字节,8个字节用来表示sequence,4个字节用来表示 count,移除头
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
@ -99,8 +95,6 @@ Status WriteBatch::Iterate(Handler* handler, uint64_t fid,
const uint64_t kv_offset = input.data() - begin + offset;
assert(kv_offset > 0);
// record 记录为 1 个字节 是 kTypeValue ,剩下的字节是 key value
// record 记录为 1 个字节 是 kTypeDeletion, 剩下的字节是key
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
@ -119,20 +113,20 @@ Status WriteBatch::Iterate(Handler* handler, uint64_t fid,
return Status::Corruption("bad WriteBatch Delete");
}
break;
// case kTypeSeparate:
// if (GetLengthPrefixedSlice(&input, &key) &&
// GetLengthPrefixedSlice(&input, &(value))) {
// // value = fileNumber + offset + valuesize 采用变长编码的方式
// std::string dest;
// PutVarint64(&dest, fid);
// PutVarint64(&dest, kv_offset);
// PutVarint64(&dest, value.size());
// Slice value_offset(dest);
// handler->Put(key, value_offset, kTypeSeparate);
// } else {
// return Status::Corruption("bad WriteBatch Put");
// }
// break;
case kTypeSeparation:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &(value))) {
// value = fileNumber + offset + valuesize 采用变长编码的方式
std::string dest;
PutVarint64(&dest, fid);
PutVarint64(&dest, kv_offset);
PutVarint64(&dest, value.size());
Slice value_offset(dest);
handler->Put(key, value_offset, kTypeSeparation);
} else {
return Status::Corruption("WriteBatch Put error");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
@ -143,6 +137,7 @@ Status WriteBatch::Iterate(Handler* handler, uint64_t fid,
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
@ -161,7 +156,11 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
if (value.size() >= separate_threshold_) {
rep_.push_back(static_cast<char>(kTypeSeparation));
} else {
rep_.push_back(static_cast<char>(kTypeValue));
}
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
@ -182,8 +181,8 @@ class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;
MemTable* mem_;
void Put(const Slice& key, const Slice& value) override {
mem_->Add(sequence_, kTypeValue, key, value);
void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) override {
mem_->Add(sequence_, type, key, value);
sequence_++;
}
void Delete(const Slice& key) override {
@ -203,7 +202,6 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable,
uint64_t fid, size_t offset) {
MemTableInserter inserter;
// 一批 writeBatch中只有一个sequence,公用的,后续会自加。
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter, fid, offset);

+ 7
- 2
helpers/memenv/memenv.cc 查看文件

@ -199,17 +199,22 @@ class RandomAccessFileImpl : public RandomAccessFile {
class WritableFileImpl : public WritableFile {
public:
WritableFileImpl(FileState* file) : file_(file) { file_->Ref(); }
WritableFileImpl(FileState* file) : file_(file), file_size_(0) { file_->Ref(); }
~WritableFileImpl() override { file_->Unref(); }
Status Append(const Slice& data) override { return file_->Append(data); }
Status Append(const Slice& data) override {
file_size_+= data.size();
return file_->Append(data);
}
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
size_t GetSize() override { return file_size_; }
private:
int file_size_;
FileState* file_;
};

+ 1
- 0
include/leveldb/env.h 查看文件

@ -287,6 +287,7 @@ class LEVELDB_EXPORT WritableFile {
virtual Status Close() = 0;
virtual Status Flush() = 0;
virtual Status Sync() = 0;
virtual size_t GetSize() = 0;
};
// An interface for writing log messages.

+ 5
- 2
include/leveldb/options.h 查看文件

@ -148,7 +148,7 @@ struct LEVELDB_EXPORT Options {
/* 需要再研究下 */
// value log
uint64_t max_value_log_size = 500 * 1024 * 1024;
uint64_t max_value_log_size = 16 * 1024 * 1024;
// gc
uint64_t garbage_collection_threshold = max_value_log_size / 4;
// gc put的时候kv分离的值
@ -176,7 +176,9 @@ struct LEVELDB_EXPORT ReadOptions {
// Options that control write operations
struct LEVELDB_EXPORT WriteOptions {
WriteOptions() = default;
explicit WriteOptions(size_t separateThreshold = 16)
: separate_threshold(separateThreshold) {}
// WriteOptions() = default;
// If true, the write will be flushed from the operating system
// buffer cache (by calling WritableFile::Sync()) before the write
@ -192,6 +194,7 @@ struct LEVELDB_EXPORT WriteOptions {
// crash semantics as the "write()" system call. A DB write
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
size_t separate_threshold ;
bool sync = false;
};

+ 5
- 2
include/leveldb/write_batch.h 查看文件

@ -25,6 +25,7 @@
#include "leveldb/export.h"
#include "leveldb/status.h"
#include "db/dbformat.h"
namespace leveldb {
@ -35,11 +36,13 @@ class LEVELDB_EXPORT WriteBatch {
class LEVELDB_EXPORT Handler {
public:
virtual ~Handler();
virtual void Put(const Slice& key, const Slice& value) = 0;
virtual void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) = 0;
virtual void Delete(const Slice& key) = 0;
};
WriteBatch();
explicit WriteBatch(size_t separate_threshold)
: separate_threshold_(separate_threshold) { Clear(); }
// Intentionally copyable.
WriteBatch(const WriteBatch&) = default;
@ -75,7 +78,7 @@ class LEVELDB_EXPORT WriteBatch {
private:
friend class WriteBatchInternal;
size_t separate_threshold_;
std::string rep_; // See comment in write_batch.cc for the format of rep_
};

+ 1
- 0
table/table_test.cc 查看文件

@ -97,6 +97,7 @@ class StringSink : public WritableFile {
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }
size_t GetSize(){ return contents_.size(); }
Status Append(const Slice& data) override {
contents_.append(data.data(), data.size());

+ 7
- 0
util/env_posix.cc 查看文件

@ -279,6 +279,7 @@ class PosixWritableFile final : public WritableFile {
PosixWritableFile(std::string filename, int fd)
: pos_(0),
fd_(fd),
file_size_(0),
is_manifest_(IsManifest(filename)),
filename_(std::move(filename)),
dirname_(Dirname(filename_)) {}
@ -290,10 +291,14 @@ class PosixWritableFile final : public WritableFile {
}
}
size_t GetSize() { return file_size_; }
Status Append(const Slice& data) override {
size_t write_size = data.size();
const char* write_data = data.data();
file_size_ += write_size;
// Fit as much as possible into buffer.
size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
std::memcpy(buf_ + pos_, write_data, copy_size);
@ -459,6 +464,8 @@ class PosixWritableFile final : public WritableFile {
size_t pos_;
int fd_;
int file_size_;
const bool is_manifest_; // True if the file's name starts with MANIFEST.
const std::string filename_;
const std::string dirname_; // The directory of filename_.

正在加载...
取消
保存