diff --git a/db/db_impl.cc b/db/db_impl.cc index a5e7153..946f3b9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -146,11 +146,14 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) has_imm_(false), logfile_(nullptr), logfile_number_(0), - log_(nullptr), seed_(0), tmp_batch_(new WriteBatch), background_compaction_scheduled_(false), manual_compaction_(nullptr), + + vlog_(nullptr), + vlog_kv_numbers_(0), + versions_(new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_)) {} @@ -171,7 +174,7 @@ DBImpl::~DBImpl() { if (mem_ != nullptr) mem_->Unref(); if (imm_ != nullptr) imm_->Unref(); delete tmp_batch_; - delete log_; + delete vlog_; delete logfile_; delete table_cache_; @@ -476,13 +479,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log, // See if we should keep reusing the last log file. if (status.ok() && options_.reuse_logs && last_log && compactions == 0) { assert(logfile_ == nullptr); - assert(log_ == nullptr); + assert(vlog_ == nullptr); assert(mem_ == nullptr); uint64_t lfile_size; if (env_->GetFileSize(fname, &lfile_size).ok() && env_->NewAppendableFile(fname, &logfile_).ok()) { Log(options_.info_log, "Reusing old log %s \n", fname.c_str()); - log_ = new log::Writer(logfile_, lfile_size); + vlog_ = new log::VlogWriter(logfile_, lfile_size); logfile_number_ = log_number; if (mem != nullptr) { mem_ = mem; @@ -1313,6 +1316,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); + /* TODO */ + vlog_kv_numbers_ += WriteBatchInternal::Count(write_batch); + // Add to log and apply to memtable. 
We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes @@ -1423,6 +1429,23 @@ Status DBImpl::MakeRoomForWrite(bool force) { assert(!writers_.empty()); bool allow_delay = !force; Status s; + if (logfile_->GetSize() > options_.max_value_log_size) { + uint64_t new_log_number = versions_->NewFileNumber(); + WritableFile* lfile = nullptr; + s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); + if (!s.ok()) { + versions_->ReuseFileNumber(new_log_number); + return s; // Keep writing to the current log; no replacement file was opened. + } +// gc_management_->WriteFileMap(logfile_number_, vlog_kv_numbers_, logfile_->GetSize()); + vlog_kv_numbers_ = 0; + delete vlog_; + delete logfile_; + logfile_ = lfile; + logfile_number_ = new_log_number; + vlog_ = new log::VlogWriter(lfile); + } + while (true) { if (!bg_error_.ok()) { // Yield previous error @@ -1456,33 +1479,9 @@ Status DBImpl::MakeRoomForWrite(bool force) { } else { // Attempt to switch to a new memtable and trigger compaction of old assert(versions_->PrevLogNumber() == 0); - uint64_t new_log_number = versions_->NewFileNumber(); - WritableFile* lfile = nullptr; - s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); - if (!s.ok()) { - // Avoid chewing through file number space in a tight loop. - versions_->ReuseFileNumber(new_log_number); - break; - } - delete log_; + mem_->SetLogFileNumber(logfile_number_); - s = logfile_->Close(); - if (!s.ok()) { - // We may have lost some data written to the previous log file. - // Switch to the new log file anyway, but record as a background - // error so we do not attempt any more writes. - // - // We could perhaps attempt to save the memtable corresponding - // to log file and suppress the error if that works, but that - // would add more complexity in a critical code path. 
- RecordBackgroundError(s); - } - delete logfile_; - - logfile_ = lfile; - logfile_number_ = new_log_number; - log_ = new log::Writer(lfile); imm_ = mem_; has_imm_.store(true, std::memory_order_release); mem_ = new MemTable(internal_comparator_); @@ -1577,7 +1576,7 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { - WriteBatch batch; + WriteBatch batch(opt.separate_threshold); batch.Put(key, value); return Write(opt, &batch); } @@ -1609,7 +1608,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { edit.SetLogNumber(new_log_number); impl->logfile_ = lfile; impl->logfile_number_ = new_log_number; - impl->log_ = new log::Writer(lfile); + impl->vlog_ = new log::VlogWriter(lfile); impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_->Ref(); } diff --git a/db/db_impl.h b/db/db_impl.h index 2237a72..46a8993 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -186,7 +186,7 @@ class DBImpl : public DB { std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_ WritableFile* logfile_; uint64_t logfile_number_ GUARDED_BY(mutex_); - log::Writer* log_; +// log::VlogWriter* log_; uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. @@ -212,6 +212,10 @@ class DBImpl : public DB { CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); log::VlogWriter* vlog_; + + int vlog_kv_numbers_; + +// KVSepManagement* gc_management_; }; // Sanitize db options. 
The caller should delete result.info_log if diff --git a/db/db_iter.cc b/db/db_iter.cc index 532c2db..b83948b 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -189,6 +189,7 @@ void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { skipping = true; break; case kTypeValue: + case kTypeSeparation: if (skipping && user_comparator_->Compare(ikey.user_key, *skip) <= 0) { // Entry hidden diff --git a/db/db_test.cc b/db/db_test.cc index a4a84cd..98cac55 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -190,6 +190,7 @@ class SpecialEnv : public EnvWrapper { } return base_->Sync(); } + size_t GetSize() { return base_->GetSize(); } }; class ManifestFile : public WritableFile { private: @@ -215,6 +216,7 @@ class SpecialEnv : public EnvWrapper { return base_->Sync(); } } + size_t GetSize() { return base_->GetSize(); } }; if (non_writable_.load(std::memory_order_acquire)) { diff --git a/db/dumpfile.cc b/db/dumpfile.cc index 6085475..76a15c6 100644 --- a/db/dumpfile.cc +++ b/db/dumpfile.cc @@ -74,7 +74,7 @@ Status PrintLogContents(Env* env, const std::string& fname, // Called on every item found in a WriteBatch. 
class WriteBatchItemPrinter : public WriteBatch::Handler { public: - void Put(const Slice& key, const Slice& value) override { + void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) override { std::string r = " put '"; AppendEscapedStringTo(&r, key); r += "' '"; @@ -189,6 +189,8 @@ Status DumpTable(Env* env, const std::string& fname, WritableFile* dst) { r += "del"; } else if (key.type == kTypeValue) { r += "val"; + } else if (key.type == kTypeSeparation) { + r += "val"; } else { AppendNumberTo(&r, key.type); } diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index ef864a4..cbc5cfd 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -114,6 +114,7 @@ class TestWritableFile : public WritableFile { Status Close() override; Status Flush() override; Status Sync() override; + size_t GetSize() override { return 0; }; private: FileState state_; diff --git a/db/leveldbutil.cc b/db/leveldbutil.cc index 95ee897..9980c02 100644 --- a/db/leveldbutil.cc +++ b/db/leveldbutil.cc @@ -20,6 +20,7 @@ class StdoutPrinter : public WritableFile { Status Close() override { return Status::OK(); } Status Flush() override { return Status::OK(); } Status Sync() override { return Status::OK(); } + size_t GetSize() override { return 0; } }; bool HandleDumpCommand(Env* env, char** files, int num) { diff --git a/db/log_test.cc b/db/log_test.cc index d55d4dd..73e3844 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -164,6 +164,7 @@ class LogTest : public testing::Test { Status Close() override { return Status::OK(); } Status Flush() override { return Status::OK(); } Status Sync() override { return Status::OK(); } + size_t GetSize() override { return 0; } Status Append(const Slice& slice) override { contents_.append(slice.data(), slice.size()); return Status::OK(); diff --git a/db/memtable.cc b/db/memtable.cc index 4f09340..c09b51d 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -126,9 +126,16 @@ bool MemTable::Get(const 
LookupKey& key, std::string* value, Status* s) { value->assign(v.data(), v.size()); return true; } - case kTypeDeletion: + case kTypeDeletion: { *s = Status::NotFound(Slice()); return true; + } + case kTypeSeparation: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + value->assign(v.data(), v.size()); + s->SetSeparated(); + return true; + } } } } diff --git a/db/memtable.h b/db/memtable.h index 9d986b1..0ce2087 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -62,6 +62,10 @@ class MemTable { // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s); + uint64_t GetTailSequence() const { return tail_sequence_; } + uint64_t GetLogFileNumber() const { return log_file_number_; } + void SetLogFileNumber(uint64_t fid) { log_file_number_ = fid; } + private: friend class MemTableIterator; friend class MemTableBackwardIterator; @@ -76,6 +80,9 @@ class MemTable { ~MemTable(); // Private since only Unref() should be used to delete it + uint64_t tail_sequence_ = 0; + uint64_t log_file_number_ = 0; + KeyComparator comparator_; int refs_; Arena arena_; diff --git a/db/write_batch.cc b/db/write_batch.cc index 874bc34..ef95a35 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -9,20 +9,18 @@ // record := // kTypeValue varstring varstring | // kTypeDeletion varstring +// kTypeSeparation varstring varstring // varstring := // len: varint32 // data: uint8[len] #include "leveldb/write_batch.h" -#include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" -#include "util/coding.h" - namespace leveldb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. 
@@ -84,13 +82,11 @@ Status WriteBatch::Iterate(Handler* handler) const { Status WriteBatch::Iterate(Handler* handler, uint64_t fid, uint64_t offset) const { Slice input(rep_); - // 整个writebatch 的起始地址 const char* begin = input.data(); if (input.size() < kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); } - // 12个字节,8个字节用来表示sequence,4个字节用来表示 count,移除头 input.remove_prefix(kHeader); Slice key, value; int found = 0; @@ -99,8 +95,6 @@ Status WriteBatch::Iterate(Handler* handler, uint64_t fid, const uint64_t kv_offset = input.data() - begin + offset; assert(kv_offset > 0); - // record 记录为 1 个字节 是 kTypeValue ,剩下的字节是 key value - // record 记录为 1 个字节 是 kTypeDeletion, 剩下的字节是key char tag = input[0]; input.remove_prefix(1); switch (tag) { @@ -119,20 +113,20 @@ Status WriteBatch::Iterate(Handler* handler, uint64_t fid, return Status::Corruption("bad WriteBatch Delete"); } break; - // case kTypeSeparate: - // if (GetLengthPrefixedSlice(&input, &key) && - // GetLengthPrefixedSlice(&input, &(value))) { - // // value = fileNumber + offset + valuesize 采用变长编码的方式 - // std::string dest; - // PutVarint64(&dest, fid); - // PutVarint64(&dest, kv_offset); - // PutVarint64(&dest, value.size()); - // Slice value_offset(dest); - // handler->Put(key, value_offset, kTypeSeparate); - // } else { - // return Status::Corruption("bad WriteBatch Put"); - // } - // break; + case kTypeSeparation: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &(value))) { + // value = fileNumber + offset + valuesize 采用变长编码的方式 + std::string dest; + PutVarint64(&dest, fid); + PutVarint64(&dest, kv_offset); + PutVarint64(&dest, value.size()); + Slice value_offset(dest); + handler->Put(key, value_offset, kTypeSeparation); + } else { + return Status::Corruption("WriteBatch Put error"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -143,6 +137,7 @@ Status WriteBatch::Iterate(Handler* handler, uint64_t fid, return Status::OK(); } } + int 
WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } @@ -161,7 +156,11 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { void WriteBatch::Put(const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - rep_.push_back(static_cast<char>(kTypeValue)); + if (value.size() >= separate_threshold_) { + rep_.push_back(static_cast<char>(kTypeSeparation)); + } else { + rep_.push_back(static_cast<char>(kTypeValue)); + } PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } @@ -182,8 +181,8 @@ class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; MemTable* mem_; - void Put(const Slice& key, const Slice& value) override { - mem_->Add(sequence_, kTypeValue, key, value); + void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) override { + mem_->Add(sequence_, type, key, value); sequence_++; } void Delete(const Slice& key) override { @@ -203,7 +202,6 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable, uint64_t fid, size_t offset) { MemTableInserter inserter; - // 一批 writeBatch中只有一个sequence,公用的,后续会自加。 inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.mem_ = memtable; return b->Iterate(&inserter, fid, offset); diff --git a/helpers/memenv/memenv.cc b/helpers/memenv/memenv.cc index e476613..8407924 100644 --- a/helpers/memenv/memenv.cc +++ b/helpers/memenv/memenv.cc @@ -199,17 +199,22 @@ class RandomAccessFileImpl : public RandomAccessFile { class WritableFileImpl : public WritableFile { public: - WritableFileImpl(FileState* file) : file_(file) { file_->Ref(); } + WritableFileImpl(FileState* file) : file_(file), file_size_(0) { file_->Ref(); } ~WritableFileImpl() override { file_->Unref(); } - Status Append(const Slice& data) override { return file_->Append(data); } + Status Append(const 
Slice& data) override { + file_size_ += data.size(); + return file_->Append(data); + } Status Close() override { return Status::OK(); } Status Flush() override { return Status::OK(); } Status Sync() override { return Status::OK(); } + size_t GetSize() override { return file_size_; } private: FileState* file_; + size_t file_size_; // Bytes passed to Append() through this object. }; diff --git a/include/leveldb/env.h b/include/leveldb/env.h index e00895a..2f0dae3 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -287,6 +287,7 @@ class LEVELDB_EXPORT WritableFile { virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; + virtual size_t GetSize() = 0; }; // An interface for writing log messages. diff --git a/include/leveldb/options.h b/include/leveldb/options.h index e110c80..dd82879 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -148,7 +148,7 @@ struct LEVELDB_EXPORT Options { /* 需要再研究下 */ // value log 的文件大小 - uint64_t max_value_log_size = 500 * 1024 * 1024; + uint64_t max_value_log_size = 16 * 1024 * 1024; // gc 的回收阈值。 uint64_t garbage_collection_threshold = max_value_log_size / 4; // gc 后台回收时候重新put的时候,默认的kv分离的值。 @@ -176,7 +176,9 @@ struct LEVELDB_EXPORT ReadOptions { // Options that control write operations struct LEVELDB_EXPORT WriteOptions { - WriteOptions() = default; + explicit WriteOptions(size_t separateThreshold = 16) + : separate_threshold(separateThreshold) {} +// WriteOptions() = default; // If true, the write will be flushed from the operating system // buffer cache (by calling WritableFile::Sync()) before the write @@ -192,6 +194,7 @@ struct LEVELDB_EXPORT WriteOptions { // crash semantics as the "write()" system call. A DB write // with sync==true has similar crash semantics to a "write()" // system call followed by "fsync()". 
bool sync = false; + size_t separate_threshold; // DB::Put() tags values of at least this many bytes as kTypeSeparation. }; diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 2af98a3..c09ceb0 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -25,6 +25,7 @@ #include "leveldb/export.h" #include "leveldb/status.h" +#include "db/dbformat.h" namespace leveldb { @@ -35,11 +36,13 @@ class LEVELDB_EXPORT WriteBatch { class LEVELDB_EXPORT Handler { public: virtual ~Handler(); - virtual void Put(const Slice& key, const Slice& value) = 0; + virtual void Put(const Slice& key, const Slice& value, ValueType type = kTypeValue) = 0; virtual void Delete(const Slice& key) = 0; }; WriteBatch(); + explicit WriteBatch(size_t separate_threshold) + : separate_threshold_(separate_threshold) { Clear(); } // Intentionally copyable. WriteBatch(const WriteBatch&) = default; @@ -75,7 +78,7 @@ class LEVELDB_EXPORT WriteBatch { private: friend class WriteBatchInternal; - + size_t separate_threshold_ = static_cast<size_t>(-1); // NOTE(review): WriteBatch() does not appear to set this; default means "never separate" - confirm. std::string rep_; // See comment in write_batch.cc for the format of rep_ }; diff --git a/table/table_test.cc b/table/table_test.cc index aea0697..a924157 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -97,6 +97,7 @@ class StringSink : public WritableFile { Status Close() override { return Status::OK(); } Status Flush() override { return Status::OK(); } Status Sync() override { return Status::OK(); } + size_t GetSize() override { return contents_.size(); } Status Append(const Slice& data) override { contents_.append(data.data(), data.size()); diff --git a/util/env_posix.cc b/util/env_posix.cc index ffd06c4..eb26480 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -279,6 +279,7 @@ class PosixWritableFile final : public WritableFile { PosixWritableFile(std::string filename, int fd) : pos_(0), fd_(fd), + file_size_(0), is_manifest_(IsManifest(filename)), filename_(std::move(filename)), dirname_(Dirname(filename_)) {} @@ -290,10 +291,14 @@ class PosixWritableFile final : public WritableFile { } } +
size_t GetSize() override { return file_size_; } + Status Append(const Slice& data) override { size_t write_size = data.size(); const char* write_data = data.data(); + file_size_ += write_size; + // Fit as much as possible into buffer. size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_); std::memcpy(buf_ + pos_, write_data, copy_size); @@ -459,6 +464,8 @@ class PosixWritableFile final : public WritableFile { size_t pos_; int fd_; + size_t file_size_; // Bytes passed to Append() through this object; starts at 0. + const bool is_manifest_; // True if the file's name starts with MANIFEST. const std::string filename_; const std::string dirname_; // The directory of filename_.