diff --git a/CMakeLists.txt b/CMakeLists.txt
index 974f190..a3a5eb6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -117,7 +117,9 @@ endif(BUILD_SHARED_LIBS)
 # Must be included before CMAKE_INSTALL_INCLUDEDIR is used.
 include(GNUInstallDirs)
 
-add_library(leveldb)
+add_library(leveldb
+        db/vlog_reader.h
+        db/vlog_reader.cpp)
 target_sources(leveldb
   PRIVATE
     "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 3a5c225..eacde23 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -36,8 +36,12 @@
 #include "util/logging.h"
 #include "util/mutexlock.h"
 
+#include "db/vlog_reader.h"
+
 namespace leveldb {
 
+using namespace log;
+
 const int kNumNonTableCacheFiles = 10;
 
 // Information kept for every waiting writer
@@ -1118,6 +1122,31 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
   return versions_->MaxNextLevelOverlappingBytes();
 }
 
+// Decodes a vlog record laid out as
+//   [kTypeSeparation tag][length-prefixed key][length-prefixed value].
+// Succeeds only if the stored key equals `key` and the stored value is
+// exactly val_size bytes; the value is then copied into `value`.
+// Returns false, leaving `value` untouched, on any mismatch or short input.
+bool DBImpl::ParseVlogValue(Slice key_value, Slice key,
+                            std::string& value, uint64_t val_size) {
+  Slice k_v = key_value;
+  // Guard the tag read: indexing an empty Slice is out of bounds.
+  if (k_v.empty() || k_v[0] != kTypeSeparation) return false;
+  k_v.remove_prefix(1);
+
+  Slice vlog_key;
+  Slice vlog_value;
+  if (GetLengthPrefixedSlice(&k_v, &vlog_key)
+      && vlog_key == key
+      && GetLengthPrefixedSlice(&k_v, &vlog_value)
+      && vlog_value.size() == val_size) {
+    value = vlog_value.ToString();
+    return true;
+  } else {
+    return false;
+  }
+}
+
 Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                    std::string* value) {
   Status s;
@@ -1162,6 +1191,62 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
   mem->Unref();
   if (imm != nullptr) imm->Unref();
   current->Unref();
+
+  /* The lookup returned a vlog pointer rather than the value itself: fetch
+   * the real value from the vlog file. */
+  if (s.ok() && s.IsSeparated()) {
+    struct VlogReporter : public VlogReader::Reporter {
+      Status* status;
+      void Corruption(size_t bytes, const Status& s) override {
+        if (this->status->ok()) *this->status = s;
+      }
+    };
+
+    VlogReporter reporter;
+    // Corruption() dereferences this, so it must point at a live Status.
+    reporter.status = &s;
+
+    // *value holds the pointer: varint64 file number, offset and value size,
+    // in that order.
+    Slice vlog_ptr(*value);
+    uint64_t file_no;
+    uint64_t offset;
+    uint64_t val_size;
+    if (!GetVarint64(&vlog_ptr, &file_no) ||
+        !GetVarint64(&vlog_ptr, &offset) ||
+        !GetVarint64(&vlog_ptr, &val_size)) {
+      return Status::Corruption("bad vlog pointer");
+    }
+
+    // Record size: tag byte + varint(key size) + key + varint(value size) + value.
+    const size_t key_size = key.size();
+    const uint64_t encoded_len = 1 + VarintLength(key_size) + key_size +
+                                 VarintLength(val_size) + val_size;
+
+    std::string fname = LogFileName(dbname_, file_no);
+    RandomAccessFile* file;
+    s = env_->NewRandomAccessFile(fname, &file);
+    if (!s.ok()) {
+      return s;
+    }
+
+    VlogReader vlogReader(file, &reporter);
+    Slice key_value;
+    // std::string owns the read buffer, so it is released on every path.
+    std::string scratch(encoded_len, '\0');
+
+    if (vlogReader.ReadValue(offset, encoded_len, &key_value, &scratch[0])) {
+      value->clear();
+      if (!ParseVlogValue(key_value, key, *value, val_size)) {
+        s = Status::Corruption("value in vlog isn't match with given key");
+      }
+    } else {
+      s = Status::Corruption("read vlog error");
+    }
+
+    delete file;
+  }
+
   return s;
 }
 
diff --git a/db/db_impl.h b/db/db_impl.h
index 91f4fd4..27c09d2 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -53,6 +53,10 @@ class DBImpl : public DB {
   void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
   void CompactRange(const Slice* begin, const Slice* end) override;
 
+  // Extracts the value from a vlog record (key_value) after checking that it
+  // belongs to `key` and is val_size bytes long. Returns false on mismatch.
+  bool ParseVlogValue(Slice key_value, Slice key, std::string& value, uint64_t val_size);
+
   // Extra methods (for testing) that are not in the public DB interface
 
   // Compact any files in the named level that overlap [*begin,*end]
diff --git a/db/dbformat.h b/db/dbformat.h
index a1c30ed..e74af21 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -51,7 +51,7 @@ class InternalKey;
 // Value types encoded as the last component of internal keys.
 // DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
 // data structures.
-enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };
+enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1, kTypeSeparation = 0x2};
 // kValueTypeForSeek defines the ValueType that should be passed when
 // constructing a ParsedInternalKey object for seeking to a particular
 // sequence number (since we sort sequence numbers in decreasing order
diff --git a/db/fields.cpp b/db/fields.cpp
index 9fb5357..fb845eb 100644
--- a/db/fields.cpp
+++ b/db/fields.cpp
@@ -171,7 +171,7 @@ std::string& Fields::operator[](const std::string& field_name) {
 }
 
 /* 通过若干个字段查询 Key */
- std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) {
+std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) {
   Fields to_fields = Fields(fields);
   to_fields.Fields::SortFields();
   FieldArray search_fields_ = to_fields.fields_;
@@ -199,7 +199,7 @@ std::string& Fields::operator[](const std::string& field_name) {
   delete it;
 
   return find_keys;
- }
+}
 
 //std::vector Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) {
 //  Fields to_fields = Fields(fields);
diff --git a/db/log_format.h b/db/log_format.h
index 356e69f..1ea5429 100644
--- a/db/log_format.h
+++ b/db/log_format.h
@@ -29,6 +29,16 @@ static const int kBlockSize = 32768;
 // Header is checksum (4 bytes), length (2 bytes), type (1 byte).
 static const int kHeaderSize = 4 + 2 + 1;
 
+
+/* TODO: needs further study. */
+// 4M: size of each read from the vlog; used by GC and recovery.
+//static const int vBlockSize = 4*1024*1024;
+
+// The vlog header contains only the length (4 bytes).
+static const int vHeaderSize = 4;
+// write_batch Header
+static const int wHeaderSize = 8 + 4;
+
 }  // namespace log
 }  // namespace leveldb
 
diff --git a/db/vlog_reader.cpp b/db/vlog_reader.cpp
new file mode 100644
index 0000000..4a928a3
--- /dev/null
+++ b/db/vlog_reader.cpp
@@ -0,0 +1,112 @@
+#include "vlog_reader.h"
+
+#include <string>
+
+#include "leveldb/env.h"
+#include "util/coding.h"
+
+namespace leveldb {
+
+namespace log {
+
+VlogReader::VlogReader(SequentialFile* file, Reporter* reporter)
+    : file_(file),
+      file_random_(nullptr),
+      reporter_(reporter),
+      backing_store_(new char[kBlockSize]),
+      buffer_(),
+      eof_(false),
+      last_record_offset_(0) {}
+
+VlogReader::VlogReader(RandomAccessFile* file, Reporter* reporter)
+    : file_(nullptr),
+      file_random_(file),
+      reporter_(reporter),
+      backing_store_(new char[kBlockSize]),
+      buffer_(),
+      eof_(false),
+      last_record_offset_(0) {}
+
+VlogReader::~VlogReader() { delete[] backing_store_; }
+
+// Reads `length` bytes at `offset` from the random-access file. Fails if the
+// reader was built on a SequentialFile or if the underlying read fails.
+bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice* key_value,
+                           char* scratch) {
+  if (file_random_ == nullptr) {
+    return false;
+  }
+  return file_random_->Read(offset, length, key_value, scratch).ok();
+}
+
+// Reads the next sequential record into *scratch and points *record at it.
+bool VlogReader::ReadRecord(Slice* record, std::string* scratch) {
+  if (!ReadPhysicalRecord(scratch)) {
+    return false;
+  }
+  *record = *scratch;
+  return true;
+}
+
+uint64_t VlogReader::LastRecordOffset() const { return last_record_offset_; }
+
+// Forwards a read error to the reporter, if one was supplied.
+void VlogReader::ReportCorruption(uint64_t bytes, const Status& reason) {
+  if (reporter_ != nullptr) {
+    reporter_->Corruption(static_cast<size_t>(bytes), reason);
+  }
+}
+
+// Reads one record laid out as [fixed32 length][length bytes of data].
+// On success *result holds the 4-byte header followed by the data. On a read
+// error or a short read, eof_ is set, *result is left empty and false is
+// returned.
+bool VlogReader::ReadPhysicalRecord(std::string* result) {
+  result->clear();
+  buffer_.clear();
+
+  // A reader built on a RandomAccessFile has no sequential stream.
+  if (file_ == nullptr) {
+    return false;
+  }
+
+  // Stack storage for the header: nothing to free on any exit path.
+  char header[vHeaderSize];
+  Status status = file_->Read(vHeaderSize, &buffer_, header);
+  if (!status.ok()) {
+    buffer_.clear();
+    ReportCorruption(kBlockSize, status);
+    eof_ = true;
+    return false;
+  }
+  if (eof_ || buffer_.size() < static_cast<size_t>(vHeaderSize)) {
+    // EOF was seen earlier, or fewer than vHeaderSize bytes were available.
+    eof_ = true;
+    buffer_.clear();
+    return false;
+  }
+
+  const uint32_t length = DecodeFixed32(buffer_.data());
+  result->assign(buffer_.data(), buffer_.size());
+
+  // std::string owns the data buffer, so it is released on every exit path.
+  std::string payload(length, '\0');
+  buffer_.clear();
+  status = file_->Read(length, &buffer_, &payload[0]);
+  const bool complete = status.ok() && buffer_.size() == length;
+  if (complete) {
+    result->append(buffer_.data(), buffer_.size());
+  } else {
+    if (!status.ok()) {
+      ReportCorruption(length, status);
+    }
+    eof_ = true;
+    result->clear();
+  }
+  // buffer_ may refer to `payload`, which is destroyed on return.
+  buffer_.clear();
+  return complete;
+}
+
+}  // namespace log
+}  // namespace leveldb
diff --git a/db/vlog_reader.h b/db/vlog_reader.h
new file mode 100644
index 0000000..b1d9484
--- /dev/null
+++ b/db/vlog_reader.h
@@ -0,0 +1,80 @@
+#ifndef LEVELDB_VLOG_READER_H
+#define LEVELDB_VLOG_READER_H
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+
+#include "db/log_format.h"
+#include "leveldb/slice.h"
+#include "leveldb/status.h"
+
+namespace leveldb {
+
+class SequentialFile;
+class RandomAccessFile;
+
+namespace log {
+
+// Reads records from a value log (vlog) file, either sequentially
+// (ReadRecord) or by absolute offset (ReadValue), depending on the kind of
+// file it was constructed with. The reader never deletes the file.
+class VlogReader {
+ public:
+  // Interface for reporting read errors.
+  class Reporter {
+   public:
+    virtual ~Reporter() = default;
+    // Called with the underlying error when a sequential read fails.
+    virtual void Corruption(size_t bytes, const Status& status) = 0;
+  };
+
+  // Sequential mode: ReadRecord() is usable, ReadValue() returns false.
+  // `reporter` may be null.
+  explicit VlogReader(SequentialFile* file, Reporter* reporter);
+  // Random-access mode: ReadValue() is usable, ReadRecord() returns false.
+  explicit VlogReader(RandomAccessFile* file, Reporter* reporter);
+
+  VlogReader(const VlogReader&) = delete;
+  VlogReader& operator=(const VlogReader&) = delete;
+
+  ~VlogReader();
+
+  // Reads `length` bytes at `offset` of the vlog into *key_value, using
+  // `scratch` (at least `length` bytes) as backing storage. Returns false if
+  // the read fails or the reader is in sequential mode.
+  bool ReadValue(uint64_t offset, size_t length, Slice* key_value, char* scratch);
+
+  // Reads the next record in sequential mode. On success *record refers to
+  // *scratch, which holds the 4-byte length header followed by the data.
+  // Returns false at end of file or on error.
+  bool ReadRecord(Slice* record, std::string* scratch);
+
+  // Returns last_record_offset_, meant to be the offset of the last record
+  // returned by ReadRecord.
+  // NOTE(review): nothing in this class updates it after construction, so it
+  // is always 0 for now -- confirm before relying on it.
+  uint64_t LastRecordOffset() const;
+
+ private:
+  // Reads one physical vlog record (length header + data) into *result.
+  bool ReadPhysicalRecord(std::string* result);
+
+  // Forwards a read error to reporter_, if one was supplied.
+  void ReportCorruption(uint64_t bytes, const Status& reason);
+
+  SequentialFile* const file_;           // Null in random-access mode.
+  RandomAccessFile* const file_random_;  // Null in sequential mode.
+  Reporter* const reporter_;
+  char* const backing_store_;  // kBlockSize bytes, freed by the destructor.
+  Slice buffer_;               // Scratch view used by sequential reads.
+  bool eof_;  // Set once a sequential read fails or comes up short.
+
+  // Offset of the last record returned by ReadRecord.
+  uint64_t last_record_offset_;
+};
+
+}  // namespace log
+}  // namespace leveldb
+
+#endif  // LEVELDB_VLOG_READER_H
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index d755f46..e110c80 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -145,6 +145,16 @@ struct LEVELDB_EXPORT Options {
   // Many applications will benefit from passing the result of
   // NewBloomFilterPolicy() here.
   const FilterPolicy* filter_policy = nullptr;
+
+  /* TODO: the defaults below need further study. */
+  // Size of a value log file.
+  uint64_t max_value_log_size = 500 * 1024 * 1024;
+  // Garbage-collection reclamation threshold.
+  uint64_t garbage_collection_threshold = max_value_log_size / 4;
+  // Default key/value separation value used when background GC re-puts data.
+  uint64_t background_garbage_collection_separate_ = 1024 * 1024 - 1;
+  // Garbage-collect all log files as soon as the database is opened.
+  bool start_garbage_collection = true;
 };
 
 // Options that control read operations
diff --git a/include/leveldb/status.h b/include/leveldb/status.h
index e327314..e4e6897 100644
--- a/include/leveldb/status.h
+++ b/include/leveldb/status.h
@@ -71,6 +71,17 @@ class LEVELDB_EXPORT Status {
   // Returns true iff the status indicates an InvalidArgument.
   bool IsInvalidArgument() const { return code() == kInvalidArgument; }
 
+  // Returns true iff SetSeparated() was the last flag update. The flag is
+  // stored in code_, not in state_, so it can accompany an OK status.
+  bool IsSeparated() const { return code_ == kSeparated; }
+
+  // Flags the result as separated; DBImpl::Get then treats the returned value
+  // as a vlog pointer.
+  void SetSeparated() { code_ = kSeparated; }
+
+  // Flags the result as not separated.
+  void SetNotSeparated() { code_ = kNotSeparated; }
+
   // Return a string representation of this status suitable for printing.
   // Returns the string "OK" for success.
   std::string ToString() const;
@@ -82,9 +93,14 @@ class LEVELDB_EXPORT Status {
     kCorruption = 2,
     kNotSupported = 3,
     kInvalidArgument = 4,
-    kIOError = 5
+    kIOError = 5,
+    kSeparated = 6,
+    kNotSeparated = 7
   };
 
+  // Separation flag (kSeparated / kNotSeparated); kOk until explicitly set.
+  // Initialised here so constructors that do not mention it leave it defined.
+  Code code_ = kOk;
   Code code() const {
     return (state_ == nullptr) ? kOk : static_cast<Code>(state_[4]);
   }
@@ -101,6 +117,7 @@ class LEVELDB_EXPORT Status {
 };
 
 inline Status::Status(const Status& rhs) {
+  code_ = rhs.code_;
   state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
 }
 inline Status& Status::operator=(const Status& rhs) {
@@ -108,12 +125,16 @@ inline Status& Status::operator=(const Status& rhs) {
   // and the common case where both rhs and *this are ok.
   if (state_ != rhs.state_) {
     delete[] state_;
     state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
   }
+  // Copy the flag unconditionally: two OK statuses share state_ == nullptr
+  // yet may differ in code_.
+  code_ = rhs.code_;
   return *this;
 }
 inline Status& Status::operator=(Status&& rhs) noexcept {
   std::swap(state_, rhs.state_);
+  std::swap(code_, rhs.code_);
   return *this;
 }
 