Sfoglia il codice sorgente

vlog_reader v1.0

main
VirgilZhu 8 mesi fa
parent
commit
6df1a279e1
10 ha cambiato i file con 265 aggiunte e 5 eliminazioni
  1. +3
    -1
      CMakeLists.txt
  2. +70
    -0
      db/db_impl.cc
  3. +2
    -0
      db/db_impl.h
  4. +1
    -1
      db/dbformat.h
  5. +2
    -2
      db/fields.cpp
  6. +10
    -0
      db/log_format.h
  7. +96
    -0
      db/vlog_reader.cpp
  8. +58
    -0
      db/vlog_reader.h
  9. +10
    -0
      include/leveldb/options.h
  10. +13
    -1
      include/leveldb/status.h

+ 3
- 1
CMakeLists.txt Vedi File

@ -117,7 +117,9 @@ endif(BUILD_SHARED_LIBS)
# Must be included before CMAKE_INSTALL_INCLUDEDIR is used.
include(GNUInstallDirs)
add_library(leveldb)
add_library(leveldb
db/vlog_reader.h
db/vlog_reader.cpp)
target_sources(leveldb
PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"

+ 70
- 0
db/db_impl.cc Vedi File

@ -36,8 +36,12 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "db/vlog_reader.h"
namespace leveldb {
using namespace log;
const int kNumNonTableCacheFiles = 10;
// Information kept for every waiting writer
@ -1118,6 +1122,25 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
bool DBImpl::ParseVlogValue(Slice key_value, Slice key,
std::string& value, uint64_t val_size) {
Slice k_v = key_value;
if (k_v[0] != kTypeSeparation) return false;
k_v.remove_prefix(1);
Slice vlog_key;
Slice vlog_value;
if (GetLengthPrefixedSlice(&k_v, &vlog_key)
&& vlog_key == key
&& GetLengthPrefixedSlice(&k_v, &vlog_value)
&& vlog_value.size() == val_size) {
value = vlog_value.ToString();
return true;
} else {
return false;
}
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
@ -1162,6 +1185,53 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
/* Vlog 读取 value */
if (s.ok() && s.IsSeparated()) {
struct VlogReporter : public VlogReader::Reporter {
Status* status;
void Corruption(size_t bytes, const Status& s) override {
if (this->status->ok()) *this->status = s;
}
};
VlogReporter reporter;
Slice vlog_ptr(*value);
uint64_t file_no;
uint64_t offset;
uint64_t val_size;
size_t key_size = key.size();
GetVarint64(&vlog_ptr, &file_no);
GetVarint64(&vlog_ptr, &offset);
GetVarint64(&vlog_ptr, &val_size);
uint64_t encoded_len = 1 + VarintLength(key_size) + key.size() + VarintLength(val_size) + val_size;
std::string fname = LogFileName(dbname_, file_no);
RandomAccessFile* file;
s = env_->NewRandomAccessFile(fname,&file);
if (!s.ok()) {
return s;
}
VlogReader vlogReader(file, &reporter);
Slice key_value;
Slice ret_value;
char* scratch = new char[encoded_len];
if (vlogReader.ReadValue(offset, encoded_len, &key_value, scratch)) {
value->clear();
if (!ParseVlogValue(key_value, key, *value, val_size)) {
s = Status::Corruption("value in vlog isn't match with given key");
}
} else {
s = Status::Corruption("read vlog error");
}
delete file;
}
return s;
}

+ 2
- 0
db/db_impl.h Vedi File

@ -53,6 +53,8 @@ class DBImpl : public DB {
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
bool ParseVlogValue(Slice key_value, Slice key, std::string& value, uint64_t val_size);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]

+ 1
- 1
db/dbformat.h Vedi File

@ -51,7 +51,7 @@ class InternalKey;
// Value types encoded as the last component of internal keys.
// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
// data structures.
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 class="p">, kTypeSeparation = 0x2};
// kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular
// sequence number (since we sort sequence numbers in decreasing order

+ 2
- 2
db/fields.cpp Vedi File

@ -171,7 +171,7 @@ std::string& Fields::operator[](const std::string& field_name) {
}
/* 通过若干个字段查询 Key */
std::vector<std::string> Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) {
std::vector<std::string> Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) {
Fields to_fields = Fields(fields);
to_fields.Fields::SortFields();
FieldArray search_fields_ = to_fields.fields_;
@ -199,7 +199,7 @@ std::string& Fields::operator[](const std::string& field_name) {
delete it;
return find_keys;
}
}
//std::vector<std::string> Fields::FindKeysByFields(leveldb::DB* db, const FieldArray& fields) {
// Fields to_fields = Fields(fields);

+ 10
- 0
db/log_format.h Vedi File

@ -29,6 +29,16 @@ static const int kBlockSize = 32768;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;
/* 需要再研究下 */
//4M vlog GC
//static const int vBlockSize = 4*1024*1024;
// VlogHeader length (4 bytes).
static const int vHeaderSize = 4;
// write_batch Header
static const int wHeaderSize = 8 + 4 ;
} // namespace log
} // namespace leveldb

+ 96
- 0
db/vlog_reader.cpp Vedi File

@ -0,0 +1,96 @@
#include "vlog_reader.h"
#include "leveldb/env.h"
#include "util/coding.h"
namespace leveldb {
namespace log {
VlogReader::VlogReader(SequentialFile *file, Reporter* reporter)
: file_(file),
file_random_(nullptr),
reporter_(reporter),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0) {}
VlogReader::VlogReader(RandomAccessFile *file, Reporter* reporter)
: file_(nullptr),
file_random_(file),
reporter_(reporter),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0) {}
VlogReader::~VlogReader() { delete[] backing_store_; }
bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice *key_value, char *scratch) {
if (file_random_ == nullptr) {
return false;
}
Status status = file_random_->Read(offset, length, key_value, scratch);
if (!status.ok()) {
return false;
}
return true;
}
bool VlogReader::ReadRecord(Slice *record, std::string *scratch) {
if (ReadPhysicalRecord(scratch)) {
*record = *scratch;
return true;
}
return false;
}
uint64_t VlogReader::LastRecordOffset() const {
return last_record_offset_;
}
void VlogReader::ReportCorruption(uint64_t bytes, const Status &reason) {
if (reporter_ != nullptr) {
reporter_->Corruption(static_cast<size_t>(bytes), reason);
}
}
bool VlogReader::ReadPhysicalRecord(std::string *result) {
result->clear();
buffer_.clear();
char* tmp_head = new char[vHeaderSize];
Status status = file_->Read(vHeaderSize, &buffer_, tmp_head);
if (!status.ok()) {
buffer_.clear();
ReportCorruption(kBlockSize, status);
eof_ = true;
return false;
} else if (buffer_.size() < vHeaderSize) {
eof_ = true;
}
if (!eof_) {
result->assign(buffer_.data(),buffer_.size());
uint32_t length = DecodeFixed32(buffer_.data());
buffer_.clear();
char* tmp = new char[length];
status = file_->Read(length, &buffer_, tmp);
if (status.ok() && buffer_.size() == length) {
*result += buffer_.ToString();
} else {
eof_ = true;
}
delete [] tmp;
}
delete [] tmp_head;
if (eof_) {
result->clear();
return false;
}
return true;
}
} // namespace log
} // namespace leveldb

+ 58
- 0
db/vlog_reader.h Vedi File

@ -0,0 +1,58 @@
#ifndef LEVELDB_VLOG_READER_H
#define LEVELDB_VLOG_READER_H
#include <cstdint>
#include "db/log_format.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
class SequentialFile;
class RandomAccessFile;
namespace log {
class VlogReader {
public:
class Reporter {
public:
virtual ~Reporter() = default;
virtual void Corruption(size_t bytes, const Status& status) = 0;
};
explicit VlogReader(SequentialFile* file, Reporter* reporter);
explicit VlogReader(RandomAccessFile* file, Reporter* reporter);
VlogReader(const VlogReader&) = delete;
VlogReader& operator=(const VlogReader&) = delete;
~VlogReader();
// vlog
bool ReadValue(uint64_t offset, size_t length, Slice* key_value, char* scratch);
bool ReadRecord(Slice* record, std::string* scratch);
// ReadRecord record last_record_offset_
uint64_t LastRecordOffset() const;
private:
/* 读取 vlog 物理记录( data 部分) */
bool ReadPhysicalRecord(std::string* result);
void ReportCorruption(uint64_t bytes, const Status &reason);
SequentialFile* const file_;
RandomAccessFile* const file_random_;
Reporter* const reporter_;
char* const backing_store_;
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
// Offset of the last record returned by ReadRecord.
uint64_t last_record_offset_;
};
} // namespace log
} // namespace leveldb
#endif // LEVELDB_VLOG_READER_H

+ 10
- 0
include/leveldb/options.h Vedi File

@ -145,6 +145,16 @@ struct LEVELDB_EXPORT Options {
// Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here.
const FilterPolicy* filter_policy = nullptr;
/* 需要再研究下 */
// value log
uint64_t max_value_log_size = 500 * 1024 * 1024;
// gc
uint64_t garbage_collection_threshold = max_value_log_size / 4;
// gc put的时候kv分离的值
uint64_t background_garbage_collection_separate_ = 1024 * 1024 - 1;
// open log文件回收
bool start_garbage_collection = true;
};
// Options that control read operations

+ 13
- 1
include/leveldb/status.h Vedi File

@ -71,6 +71,12 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsSeparated() const { return code() == kSeparated; }
void SetSeparated() { code_ = kSeparated; }
void SetNotSeparated() { code_ = kNotSeparated; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -82,9 +88,12 @@ class LEVELDB_EXPORT Status {
kCorruption = 2,
kNotSupported = 3,
kInvalidArgument = 4,
kIOError = 5
kIOError = 5,
kSeparated = 6,
kNotSeparated = 7
};
Code code_;
Code code() const {
return (state_ == nullptr) ? kOk : static_cast<Code>(state_[4]);
}
@ -101,6 +110,7 @@ class LEVELDB_EXPORT Status {
};
inline Status::Status(const Status& rhs) {
code_ = rhs.code_;
state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
}
inline Status& Status::operator=(const Status& rhs) {
@ -108,12 +118,14 @@ inline Status& Status::operator=(const Status& rhs) {
// and the common case where both rhs and *this are ok.
if (state_ != rhs.state_) {
delete[] state_;
code_ = rhs.code_;
state_ = (rhs.state_ == nullptr) ? nullptr : CopyState(rhs.state_);
}
return *this;
}
inline Status& Status::operator=(Status&& rhs) noexcept {
std::swap(state_, rhs.state_);
std::swap(code_ , rhs.code_);
return *this;
}

Caricamento…
Annulla
Salva