Browse Source

除了kvlog回收外的所有

kvsep_cyq
cyq 8 months ago
parent
commit
ab67267dfd
19 changed files with 502 additions and 7 deletions
  1. +7
    -0
      CMakeLists.txt
  2. +71
    -3
      db/db_impl.cc
  3. +14
    -0
      db/db_impl.h
  4. +9
    -0
      db/filename.cc
  5. +3
    -0
      db/filename.h
  6. +6
    -2
      db/log_writer.cc
  7. +1
    -0
      db/log_writer.h
  8. +30
    -1
      db/version_edit.cc
  9. +16
    -0
      db/version_edit.h
  10. +43
    -0
      db/version_set.cc
  11. +4
    -0
      db/version_set.h
  12. +44
    -0
      db/write_batch.cc
  13. +6
    -0
      db/write_batch_internal.h
  14. +1
    -1
      fielddb/field_db.h
  15. +107
    -0
      kv_sep/kvlog.cc
  16. +67
    -0
      kv_sep/kvlog.h
  17. +0
    -0
      kv_sep/kvlog_cache.cc
  18. +0
    -0
      kv_sep/kvlog_cache.h
  19. +73
    -0
      test/test.cc

+ 7
- 0
CMakeLists.txt View File

@ -200,6 +200,8 @@ target_sources(leveldb
"fielddb/request.h"
"testdb/testdb.cc"
"testdb/testdb.h"
"kv_sep/kvlog.cc"
"kv_sep/kvlog.h"
# Only CMake 3.3+ supports PUBLIC sources in targets exported by "install".
$<$<VERSION_GREATER:CMAKE_VERSION,3.2>:PUBLIC>
@ -544,3 +546,8 @@ add_executable(recover_test
"${PROJECT_SOURCE_DIR}/test/recover_test.cc"
)
target_link_libraries(recover_test PRIVATE leveldb gtest)
add_executable(test1
"${PROJECT_SOURCE_DIR}/test/test.cc"
)
target_link_libraries(test1 PRIVATE leveldb gtest)

+ 71
- 3
db/db_impl.cc View File

@ -6,8 +6,10 @@
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <set>
#include <string>
#include <vector>
@ -24,9 +26,12 @@
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
@ -35,6 +40,7 @@
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
@ -145,6 +151,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
log_(nullptr),
seed_(0),
tmp_batch_(new WriteBatch),
tmp_kp_batch_(new WriteBatch),
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
@ -235,6 +242,8 @@ void DBImpl::RemoveObsoleteFiles() {
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);
//将所有的live的kvlog加入集合,防止被删除
versions_->AddLiveKVLogs(&live);
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
@ -540,6 +549,10 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
meta.largest);
}
edit->AddKVLogs(imm_kvlogfile_number);
imm_kvlogfile_number = 0;
delete imm_kvlogfile_;
CompactionStats stats;
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.file_size;
@ -1118,6 +1131,18 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
Slice DBImpl::GetValueFromFP(const FilePointer &fp,std::string *value) {
RandomAccessFile *file;
Status s = env_->NewRandomAccessFile(KVLogFileName(dbname_, fp.FileNumber), &file);
Slice slice;
char *buf = new char[fp.Size];
s = file->Read(fp.FileOffset, fp.Size, &slice, buf);
*value = slice.ToString();
delete[] buf;
delete file;
return slice;
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
@ -1159,6 +1184,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
if(!s.ok()) {
// printf( "not found : %s\n",key.ToString().c_str());
} else {
// printf("found key:%s value:%s\n",key.ToString().c_str(),value->c_str());
FilePointer fp;
DecodeFp(fp, value->data());
GetValueFromFP(fp, value);
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
@ -1252,7 +1286,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
uint64_t last_sequence = versions_->LastSequence(), temp_seq = last_sequence;
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
@ -1266,7 +1300,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
{
mutex_.Unlock();
// uint64_t start_write = env_->NowMicros();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
FilePointer fp;
//1. 将WriteBatch写入到kvlog中
status = kvlog_->AddRecord(WriteBatchInternal::Contents(write_batch), fp);
//2. 将writebatch的filepointer写入到log中
// status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
char rep[8 * 3];
EncodeFP(fp, rep);
status = log_->AddRecord(Slice(rep, 3 * 8));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
@ -1274,8 +1315,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
sync_error = true;
}
}
//3. 根据write_batch里面的内容,构建kp_batch
WriteBatchInternal::ConstructKPBatch(tmp_kp_batch_, write_batch, fp);
WriteBatchInternal::SetSequence(tmp_kp_batch_,temp_seq + 1);
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
status = WriteBatchInternal::InsertInto(tmp_kp_batch_, mem_);
}
// BatchSize += write_batch->ApproximateSize();
// write_elapsed += env_->NowMicros() - start_write;
@ -1412,6 +1456,22 @@ Status DBImpl::MakeRoomForWrite(bool force) {
versions_->ReuseFileNumber(new_log_number);
break;
}
/*更换新的kvlog*/
uint64_t new_kvlog_number = versions_->NewFileNumber();
WritableFile* kvlogfile = nullptr;
s = env_->NewWritableFile(KVLogFileName(dbname_,new_kvlog_number),&kvlogfile);
if(!s.ok()) {
versions_->ReuseFileNumber(new_kvlog_number);
break;
}
pending_outputs_.insert(new_kvlog_number);
kvlogfile_->Close();
imm_kvlogfile_ = kvlogfile_;
kvlogfile_ = kvlogfile;
imm_kvlogfile_number = kvlogfile_number_;
kvlogfile_number_ = new_kvlog_number;
delete kvlog_;
kvlog_ = new KVLog(kvlogfile,new_kvlog_number);
delete log_;
@ -1566,6 +1626,14 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
uint64_t new_kvlog_number = impl->versions_->NewFileNumber();
WritableFile* kvlogfile;
s = options.env->NewWritableFile(KVLogFileName(dbname, new_kvlog_number), &kvlogfile);
impl->pending_outputs_.insert(new_kvlog_number);
impl->kvlogfile_number_ = new_kvlog_number;
impl->kvlogfile_ = kvlogfile;
impl->kvlog_ = new KVLog(kvlogfile,new_kvlog_number);
}
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.

+ 14
- 0
db/db_impl.h View File

@ -18,10 +18,14 @@
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/write_batch.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/serialize_value.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
class MemTable;
@ -81,6 +85,8 @@ class DBImpl : public DB {
// bytes.
void RecordReadSample(Slice key);
Slice GetValueFromFP(const FilePointer &fp,std::string *value);
private:
friend class DB;
struct CompactionState;
@ -190,11 +196,19 @@ class DBImpl : public DB {
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
/*kvlog begin*/
WritableFile* kvlogfile_;
uint64_t kvlogfile_number_ GUARDED_BY(mutex_);
KVLog *kvlog_;
WritableFile* imm_kvlogfile_;
uint64_t imm_kvlogfile_number;
/*kvlog end*/
uint32_t seed_ GUARDED_BY(mutex_); // For sampling.
// Queue of writers.
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
WriteBatch* tmp_kp_batch_ GUARDED_BY(mutex_);//memtable和SSTable的key,pointer
SnapshotList snapshots_ GUARDED_BY(mutex_);

+ 9
- 0
db/filename.cc View File

@ -5,10 +5,12 @@
#include "db/filename.h"
#include <cassert>
#include <cstdint>
#include <cstdio>
#include "db/dbformat.h"
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "util/logging.h"
namespace leveldb {
@ -30,6 +32,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number) {
return MakeFileName(dbname, number, "log");
}
std::string KVLogFileName(const std::string& dbname, uint64_t number) {
assert(number > 0);
return MakeFileName(dbname, number, "kvlog");
}
std::string TableFileName(const std::string& dbname, uint64_t number) {
assert(number > 0);
return MakeFileName(dbname, number, "ldb");
@ -112,6 +119,8 @@ bool ParseFileName(const std::string& filename, uint64_t* number,
*type = kTableFile;
} else if (suffix == Slice(".dbtmp")) {
*type = kTempFile;
} else if (suffix == Slice(".kvlog")) {
*type = kKVLogFile;
} else {
return false;
}

+ 3
- 0
db/filename.h View File

@ -25,6 +25,7 @@ enum FileType {
kDescriptorFile,
kCurrentFile,
kTempFile,
kKVLogFile,
kInfoLogFile // Either the current one, or an old one
};
@ -33,6 +34,8 @@ enum FileType {
// "dbname".
std::string LogFileName(const std::string& dbname, uint64_t number);
std::string KVLogFileName(const std::string& dbname, uint64_t number);
// Return the name of the sstable with the specified number
// in the db named by "dbname". The result will be prefixed with
// "dbname".

+ 6
- 2
db/log_writer.cc View File

@ -4,6 +4,7 @@
#include "db/log_writer.h"
#include "db/log_format.h"
#include <cstdint>
#include "leveldb/env.h"
@ -20,12 +21,12 @@ static void InitTypeCrc(uint32_t* type_crc) {
}
}
Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {
Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0), pos_(0) {
InitTypeCrc(type_crc_);
}
Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
: dest_(dest), block_offset_(dest_length % kBlockSize), pos_(dest_length) {
InitTypeCrc(type_crc_);
}
@ -49,6 +50,7 @@ Status Writer::AddRecord(const Slice& slice) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
pos_ += leftover;
}
block_offset_ = 0;
}
@ -97,8 +99,10 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
pos_ += kHeaderSize;
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
pos_ += length;
if (s.ok()) {
s = dest_->Flush();
}

+ 1
- 0
db/log_writer.h View File

@ -41,6 +41,7 @@ class Writer {
WritableFile* dest_;
int block_offset_; // Current offset in block
uint64_t pos_;
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the

+ 30
- 1
db/version_edit.cc View File

@ -20,7 +20,9 @@ enum Tag {
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
kPrevLogNumber = 9,
kNewKVLog = 10,
kDeletedKVLog = 11,
};
void VersionEdit::Clear() {
@ -82,6 +84,17 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
}
for(int i = 0; i < new_kvlogs_.size(); i++) {
const FileMetaData& f = new_kvlogs_[i];
PutVarint32(dst, kNewKVLog);
PutVarint64(dst, f.number);
}
for(const auto& deleted_kvlog_number : deleted_kvlogs_) {
PutVarint32(dst,kDeletedKVLog);
PutVarint64(dst, deleted_kvlog_number);
}
}
static bool GetInternalKey(Slice* input, InternalKey* dst) {
@ -186,6 +199,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
}
break;
case kNewKVLog:
if(GetVarint64(&input, &f.number)) {
new_kvlogs_.push_back(f);
} else {
msg = "new-kvlog entry";
}
break;
case kDeletedKVLog:
if(GetVarint64(&input, &number)) {
deleted_kvlogs_.insert(number);
} else {
msg = "deleted kvlog";
}
break;
default:
msg = "unknown tag";
break;

+ 16
- 0
db/version_edit.h View File

@ -5,6 +5,7 @@
#ifndef STORAGE_LEVELDB_DB_VERSION_EDIT_H_
#define STORAGE_LEVELDB_DB_VERSION_EDIT_H_
#include <cstdint>
#include <set>
#include <utility>
#include <vector>
@ -75,6 +76,18 @@ class VersionEdit {
deleted_files_.insert(std::make_pair(level, file));
}
//TODO:
void AddKVLogs(uint64_t file) {
FileMetaData f;
f.number = file;
new_kvlogs_.push_back(f);
}
void RemoveKVLogs(uint64_t file) {
deleted_kvlogs_.insert(file);
}
void EncodeTo(std::string* dst) const;
Status DecodeFrom(const Slice& src);
@ -99,6 +112,9 @@ class VersionEdit {
std::vector<std::pair<int, InternalKey>> compact_pointers_;
DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_;
std::set<uint64_t> deleted_kvlogs_;
std::vector<FileMetaData> new_kvlogs_;
};
} // namespace leveldb

+ 43
- 0
db/version_set.cc View File

@ -4,7 +4,9 @@
#include "db/version_set.h"
#include "db/version_edit.h"
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include "db/filename.h"
@ -583,6 +585,12 @@ class VersionSet::Builder {
}
};
struct BySmallestFileNum {
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
return (f1->number < f2->number);
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
@ -593,6 +601,9 @@ class VersionSet::Builder {
Version* base_;
LevelState levels_[config::kNumLevels];
std::set<uint64_t> deleted_kvlogs;
std::set<FileMetaData*,BySmallestFileNum> added_kvlogs;
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
@ -666,6 +677,17 @@ class VersionSet::Builder {
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
//Delete KVLogs
for(const auto& deleted_kvlog : edit->deleted_kvlogs_) {
deleted_kvlogs.insert(deleted_kvlog);
}
//Add new KVLogs
for(const auto& added_kvlog : edit->new_kvlogs_) {
FileMetaData *f = new FileMetaData(added_kvlog);
f->refs = 1;
added_kvlogs.insert(f);
}
}
// Save the current state in *v.
@ -712,6 +734,18 @@ class VersionSet::Builder {
}
#endif
}
//合并一个版本的kvlog以及当前edit的新增,但是不包含被删除的kvlog
for(const auto &kvlog : base_->kvlogs_) {
if(!deleted_kvlogs.count(kvlog->number)) continue;
v->kvlogs_.push_back(kvlog);
kvlog->refs++;
}
for(const auto &kvlog : added_kvlogs) {
if(!deleted_kvlogs.count(kvlog->number)) continue;
v->kvlogs_.push_back(kvlog);
kvlog->refs++;
}
}
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
@ -1158,6 +1192,15 @@ void VersionSet::AddLiveFiles(std::set* live) {
}
}
void VersionSet::AddLiveKVLogs(std::set<uint64_t>* live_kvlogs_) {
for(Version* v = dummy_versions_.next_; v != &dummy_versions_;
v = v->next_) {
for(int i = 0; i < v->kvlogs_.size(); i++) {
live_kvlogs_->insert(v->kvlogs_[i]->number);
}
}
}
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);

+ 4
- 0
db/version_set.h View File

@ -15,6 +15,7 @@
#ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_
#define STORAGE_LEVELDB_DB_VERSION_SET_H_
#include <cstdint>
#include <map>
#include <set>
#include <vector>
@ -152,6 +153,7 @@ class Version {
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
std::vector<FileMetaData*> kvlogs_;
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
@ -258,6 +260,8 @@ class VersionSet {
// May also mutate some internal state.
void AddLiveFiles(std::set<uint64_t>* live);
void AddLiveKVLogs(std::set<uint64_t>* live_kvlogs);
// Return the approximate offset in the database of the data for
// "key" as of version "v".
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);

+ 44
- 0
db/write_batch.cc View File

@ -15,12 +15,18 @@
#include "leveldb/write_batch.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include <cstdint>
#include "leveldb/db.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "util/coding.h"
#include "kv_sep/kvlog.h"
namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
@ -147,4 +153,42 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}
class KPBatchConstructor : public WriteBatch::Handler {
public:
void Put(const Slice& key, const Slice& value) override {
int addend_key = EncodeVarint32(buf, key.size()) - buf;
int addend_value = EncodeVarint32(buf, value.size()) - buf;
fp.FileOffset += 1 + addend_key + key.size() + addend_value;
fp.Size = value.size();
EncodeFP(fp, rep);
kp_batch->Put(key, Slice(rep,3 * 8));
fp.FileOffset += fp.Size;
// printf("key:%s, file num:%ld,offset:%ld,size:%ld\n",key.ToString().c_str(),fp.FileNumber,fp.FileOffset,fp.Size);
}
void Delete(const Slice& key) override {
kp_batch->Delete(key);
int addend= EncodeVarint32(buf, key.size()) - buf;
fp.FileOffset += 1 + addend + key.size();
}
WriteBatch *kp_batch;
FilePointer fp;
char rep[8 * 3];
char buf[5];
};
Status WriteBatchInternal::ConstructKPBatch(WriteBatch *kp_batch,
const WriteBatch *write_batch, FilePointer fp) {
kp_batch->Clear();
KPBatchConstructor constructor;
constructor.kp_batch = kp_batch;
constructor.fp.FileNumber = fp.FileNumber;
constructor.fp.FileOffset = fp.FileOffset + kHeader;
constructor.fp.Size = 0;
return write_batch->Iterate(&constructor);
}
} // namespace leveldb

+ 6
- 0
db/write_batch_internal.h View File

@ -5,7 +5,10 @@
#ifndef STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_
#define STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_
#include "db/db_impl.h"
#include "db/dbformat.h"
#include <cstdint>
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
namespace leveldb {
@ -38,6 +41,9 @@ class WriteBatchInternal {
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static void Append(WriteBatch* dst, const WriteBatch* src);
static Status ConstructKPBatch(WriteBatch* kp_batch,
const WriteBatch* write_batch,FilePointer fp);
};
} // namespace leveldb

+ 1
- 1
fielddb/field_db.h View File

@ -145,7 +145,7 @@ private:
// std::fflush(stdout);
// }
// }
// };
};
Status DestroyDB(const std::string& name,
const Options& options);

+ 107
- 0
kv_sep/kvlog.cc View File

@ -0,0 +1,107 @@
#include "kv_sep/kvlog.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include <cstdint>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "util/coding.h"
namespace leveldb {
void EncodeFP(const struct FilePointer &fp, char *scratch) {
EncodeFixed64(scratch, fp.FileNumber);
EncodeFixed64(scratch + 8, fp.FileOffset);
EncodeFixed64(scratch + 16, fp.Size);
}
void DecodeFp(struct FilePointer &fp, char *src) {
fp.FileNumber = DecodeFixed64(src);
fp.FileOffset = DecodeFixed64(src + 8);
fp.Size = DecodeFixed64(src + 16);
}
KVLog::KVLog(WritableFile *dest,uint64_t file_number):
dest_(dest),pos_(0),file_number(file_number) {}
KVLog::KVLog(WritableFile *dest, uint64_t dest_length,uint64_t file_number):
dest_(dest),pos_(dest_length),file_number(file_number) {}
Status KVLog::AddRecord(const Slice &slice, FilePointer &fp) {
//写入slice大小
EncodeFixed64(buf, slice.size());
Status s = dest_->Append(Slice(buf,8));
pos_ += 8;
//写入slice
fp.FileNumber = file_number;
fp.FileOffset = pos_;
fp.Size = slice.size();
s = dest_->Append(slice);
pos_ += slice.size();
if(s.ok()) {
s = dest_->Flush();
}
return s;
}
void KVLogReader::Next() {
if(input.empty()) {
NextWriteBatch();
}
NextKV();
}
void KVLogReader::NextWriteBatch() {
Slice num;
file->Read(8, &num, number);
if(num.size() !=8) {
valid = false;
return;
}
uint64_t batch_size = DecodeFixed64(number);
if(batch_size > rep_size) {
delete[] rep_;
rep_ = new char[batch_size];
rep_size = batch_size;
}
input.clear();
file->Read(batch_size,&input,rep_);
if(input.size() != batch_size) {
valid = false;
return;
}
seq = DecodeFixed64(input.data());
input.remove_prefix(12); //remove writebatch header
}
void KVLogReader::NextKV() {
type = (ValueType)input[0];
input.remove_prefix(1);
switch (type) {
case kTypeValue:
if(GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
} else {
valid = false;
return;
}
break;
case kTypeDeletion:
if(GetLengthPrefixedSlice(&input, &key)) {
} else {
valid = false;
return;
}
break;
default:
valid = false;
return;
}
}
}

+ 67
- 0
kv_sep/kvlog.h View File

@ -0,0 +1,67 @@
#pragma once
#include "db/dbformat.h"
#include <cstddef>
#include <cstdint>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
namespace leveldb {
struct FilePointer {
uint64_t FileNumber;
uint64_t FileOffset;
uint64_t Size;
};
void EncodeFP(const struct FilePointer &fp,char *scratch);
void DecodeFp(struct FilePointer &fp, char *src);
class KVLog {
public:
explicit KVLog(WritableFile *dest,uint64_t file_number);
KVLog(WritableFile *dest, uint64_t dest_length,uint64_t file_number);
KVLog(const KVLog&) = delete;
KVLog& operator=(const KVLog&) = delete;
~KVLog() = default;
Status AddRecord(const Slice& slice,FilePointer &fp);
private:
WritableFile* dest_;
uint64_t pos_;
uint64_t file_number;
char buf[8];
};
class KVLogReader {
public:
KVLogReader(SequentialFile *file):file(file),rep_(nullptr),rep_size(0),valid(true)
{ Next();};
ValueType Type() {return type;}
Slice Key() {return key;}
Slice Value() {return value;}
SequenceNumber Seq() {return seq;}
void Next();
void Valid();
private:
void NextWriteBatch();
void NextKV();
private:
Slice key;
Slice value;
SequenceNumber seq;
ValueType type;
bool valid;
SequentialFile *file;
Slice input;
uint64_t rep_size;
char *rep_;
char number[8];
};
}

+ 0
- 0
kv_sep/kvlog_cache.cc View File


+ 0
- 0
kv_sep/kvlog_cache.h View File


+ 73
- 0
test/test.cc View File

@ -0,0 +1,73 @@
#include "leveldb/db.h"
#include "leveldb/options.h"
#include <bits/stdc++.h>
#include <string>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
DestroyDB("testdb", Options());
Status status = DB::Open(op, "testdb", &db);
// assert(status.ok());
// string key = "leveldb",value = to_string(0);
// string res;
// for(int i = 0; i < 5; i++) {
// db->Put(WriteOptions(),key,to_string(i));
// }
// db->Put(WriteOptions(),key+key,to_string(0));
// db->Put(WriteOptions(),key+key,to_string(1));
// sleep(1);
// auto snapshot = db->GetSnapshot();
// auto readopts = ReadOptions();
// readopts.snapshot = snapshot;
// db->Put(WriteOptions(),key,to_string(10));
// db->CompactRange(nullptr,nullptr);
// db->Get(readopts,key,&res);
// cout<<"with snapshot:"<<res<<endl;
// res.clear();
// db->Get(ReadOptions(),key,&res);
// cout<<"without snapshot:"<<res<<endl;
// string value = "leveldb",key = "abc";
// db->Delete(WriteOptions(),key);
// db->Put(WriteOptions(),key,value);
// const Snapshot *snapshot = db->GetSnapshot();
// ReadOptions read_op = ReadOptions();
// read_op.snapshot = snapshot;
// string result;
// db->Get(read_op,key,&result);
// cout<<result<<endl;
// db->Delete(WriteOptions(),key);
// result.clear();
// db->Get(read_op,key,&result);
// cout<<"with snapshot:"<<result<<endl;
// result.clear();
// db->Get(ReadOptions(),key,&result);
// cout<<"without snapshot:"<<result<<endl;
// db->ReleaseSnapshot(snapshot);
string key = "abc",value = "leveldb";
for(int i = 0; i < 20; i++) {
db->Put(WriteOptions(),key + std::to_string(i),value + std::to_string(i));
}
std::string res;
for(int i = 0; i < 10; i++) {
db->Get(ReadOptions(), key + std::to_string(i), &res);
std::cout << i << " " << res << std::endl;
}
delete db;
// DB::Open(op,"testdb",&db);
// key = "abc", value = "";
// db->Get(ReadOptions(),key,&value);
// cout<<value;
// // DestroyDB("testdb",op);
// delete db;
return 0;
}

Loading…
Cancel
Save