From ab67267dfd9f1e95f87f07ebc0314157a88abf6e Mon Sep 17 00:00:00 2001 From: cyq <1056374449@qq.com> Date: Sun, 5 Jan 2025 08:15:20 +0800 Subject: [PATCH] =?UTF-8?q?=E9=99=A4=E4=BA=86kvlog=E5=9B=9E=E6=94=B6?= =?UTF-8?q?=E5=A4=96=E7=9A=84=E6=89=80=E6=9C=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 7 +++ db/db_impl.cc | 74 ++++++++++++++++++++++++++++++-- db/db_impl.h | 14 ++++++ db/filename.cc | 9 ++++ db/filename.h | 3 ++ db/log_writer.cc | 8 +++- db/log_writer.h | 1 + db/version_edit.cc | 31 +++++++++++++- db/version_edit.h | 16 +++++++ db/version_set.cc | 43 +++++++++++++++++++ db/version_set.h | 4 ++ db/write_batch.cc | 44 +++++++++++++++++++ db/write_batch_internal.h | 6 +++ fielddb/field_db.h | 2 +- kv_sep/kvlog.cc | 107 ++++++++++++++++++++++++++++++++++++++++++++++ kv_sep/kvlog.h | 67 +++++++++++++++++++++++++++++ kv_sep/kvlog_cache.cc | 0 kv_sep/kvlog_cache.h | 0 test/test.cc | 73 +++++++++++++++++++++++++++++++ 19 files changed, 502 insertions(+), 7 deletions(-) create mode 100644 kv_sep/kvlog.cc create mode 100644 kv_sep/kvlog.h create mode 100644 kv_sep/kvlog_cache.cc create mode 100644 kv_sep/kvlog_cache.h create mode 100644 test/test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index d0ea7d5..7674ff0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -200,6 +200,8 @@ target_sources(leveldb "fielddb/request.h" "testdb/testdb.cc" "testdb/testdb.h" + "kv_sep/kvlog.cc" + "kv_sep/kvlog.h" # Only CMake 3.3+ supports PUBLIC sources in targets exported by "install". $<$:PUBLIC> @@ -544,3 +546,8 @@ add_executable(recover_test "${PROJECT_SOURCE_DIR}/test/recover_test.cc" ) target_link_libraries(recover_test PRIVATE leveldb gtest) + +add_executable(test1 + "${PROJECT_SOURCE_DIR}/test/test.cc" +) +target_link_libraries(test1 PRIVATE leveldb gtest) diff --git a/db/db_impl.cc b/db/db_impl.cc index 6b28260..5f87425 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6,8 +6,10 @@ #include #include +#include #include #include +#include #include #include #include @@ -24,9 +26,12 @@ #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/slice.h" #include "leveldb/status.h" #include "leveldb/table.h" #include "leveldb/table_builder.h" +#include "leveldb/write_batch.h" #include "port/port.h" #include "table/block.h" #include "table/merger.h" @@ -35,6 +40,7 @@ #include "util/logging.h" #include "util/mutexlock.h" #include "util/serialize_value.h" +#include "kv_sep/kvlog.h" namespace leveldb { @@ -145,6 +151,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) log_(nullptr), seed_(0), tmp_batch_(new WriteBatch), + tmp_kp_batch_(new WriteBatch), background_compaction_scheduled_(false), manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, @@ -235,6 +242,8 @@ void DBImpl::RemoveObsoleteFiles() { // Make a set of all of the live files std::set live = pending_outputs_; versions_->AddLiveFiles(&live); + //将所有的live的kvlog加入集合,防止被删除 + versions_->AddLiveKVLogs(&live); std::vector filenames; env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose @@ -540,6 +549,10 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, meta.largest); } + edit->AddKVLogs(imm_kvlogfile_number); + imm_kvlogfile_number = 0; + delete imm_kvlogfile_; + CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; @@ -1118,6 +1131,18 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { return versions_->MaxNextLevelOverlappingBytes(); } +Slice DBImpl::GetValueFromFP(const FilePointer &fp,std::string *value) { + RandomAccessFile *file; + Status s = env_->NewRandomAccessFile(KVLogFileName(dbname_, fp.FileNumber), &file); + Slice slice; + char *buf = new char[fp.Size]; + s = file->Read(fp.FileOffset, fp.Size, &slice, buf); + *value = slice.ToString(); + delete[] buf; + delete file; + return slice; +} + Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; @@ -1159,6 +1184,15 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } + if(!s.ok()) { + // printf( "not found : %s\n",key.ToString().c_str()); + } else { + // printf("found key:%s value:%s\n",key.ToString().c_str(),value->c_str()); + FilePointer fp; + DecodeFp(fp, value->data()); + GetValueFromFP(fp, value); + } + mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); @@ -1252,7 +1286,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { // May temporarily unlock and wait. Status status = MakeRoomForWrite(updates == nullptr); - uint64_t last_sequence = versions_->LastSequence(); + uint64_t last_sequence = versions_->LastSequence(), temp_seq = last_sequence; Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); @@ -1266,7 +1300,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { { mutex_.Unlock(); // uint64_t start_write = env_->NowMicros(); - status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); + FilePointer fp; + //1. 将WriteBatch写入到kvlog中 + status = kvlog_->AddRecord(WriteBatchInternal::Contents(write_batch), fp); + //2. 将writebatch的filepointer写入到log中 + // status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); + char rep[8 * 3]; + EncodeFP(fp, rep); + status = log_->AddRecord(Slice(rep, 3 * 8)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); @@ -1274,8 +1315,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { sync_error = true; } } + //3. 根据write_batch里面的内容,构建kp_batch + WriteBatchInternal::ConstructKPBatch(tmp_kp_batch_, write_batch, fp); + WriteBatchInternal::SetSequence(tmp_kp_batch_,temp_seq + 1); if (status.ok()) { - status = WriteBatchInternal::InsertInto(write_batch, mem_); + status = WriteBatchInternal::InsertInto(tmp_kp_batch_, mem_); } // BatchSize += write_batch->ApproximateSize(); // write_elapsed += env_->NowMicros() - start_write; @@ -1412,6 +1456,22 @@ Status DBImpl::MakeRoomForWrite(bool force) { versions_->ReuseFileNumber(new_log_number); break; } + /*更换新的kvlog*/ + uint64_t new_kvlog_number = versions_->NewFileNumber(); + WritableFile* kvlogfile = nullptr; + s = env_->NewWritableFile(KVLogFileName(dbname_,new_kvlog_number),&kvlogfile); + if(!s.ok()) { + versions_->ReuseFileNumber(new_kvlog_number); + break; + } + pending_outputs_.insert(new_kvlog_number); + kvlogfile_->Close(); + imm_kvlogfile_ = kvlogfile_; + kvlogfile_ = kvlogfile; + imm_kvlogfile_number = kvlogfile_number_; + kvlogfile_number_ = new_kvlog_number; + delete kvlog_; + kvlog_ = new KVLog(kvlogfile,new_kvlog_number); delete log_; @@ -1566,6 +1626,14 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->mem_ = new MemTable(impl->internal_comparator_); impl->mem_->Ref(); } + + uint64_t new_kvlog_number = impl->versions_->NewFileNumber(); + WritableFile* kvlogfile; + s = options.env->NewWritableFile(KVLogFileName(dbname, new_kvlog_number), &kvlogfile); + impl->pending_outputs_.insert(new_kvlog_number); + impl->kvlogfile_number_ = new_kvlog_number; + impl->kvlogfile_ = kvlogfile; + impl->kvlog_ = new KVLog(kvlogfile,new_kvlog_number); } if (s.ok() && save_manifest) { edit.SetPrevLogNumber(0); // No older logs needed after recovery. diff --git a/db/db_impl.h b/db/db_impl.h index 241125e..be4abc2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -18,10 +18,14 @@ #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/slice.h" +#include "leveldb/write_batch.h" #include "port/port.h" #include "port/thread_annotations.h" #include "util/serialize_value.h" +#include "kv_sep/kvlog.h" + namespace leveldb { class MemTable; @@ -81,6 +85,8 @@ class DBImpl : public DB { // bytes. void RecordReadSample(Slice key); + Slice GetValueFromFP(const FilePointer &fp,std::string *value); + private: friend class DB; struct CompactionState; @@ -190,11 +196,19 @@ class DBImpl : public DB { WritableFile* logfile_; uint64_t logfile_number_ GUARDED_BY(mutex_); log::Writer* log_; +/*kvlog begin*/ + WritableFile* kvlogfile_; + uint64_t kvlogfile_number_ GUARDED_BY(mutex_); + KVLog *kvlog_; + WritableFile* imm_kvlogfile_; + uint64_t imm_kvlogfile_number; +/*kvlog end*/ uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. std::deque writers_ GUARDED_BY(mutex_); WriteBatch* tmp_batch_ GUARDED_BY(mutex_); + WriteBatch* tmp_kp_batch_ GUARDED_BY(mutex_);//表示实际写入memtable和SSTable的(key,pointer)对 SnapshotList snapshots_ GUARDED_BY(mutex_); diff --git a/db/filename.cc b/db/filename.cc index e526249..9a6c5e1 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -5,10 +5,12 @@ #include "db/filename.h" #include +#include #include #include "db/dbformat.h" #include "leveldb/env.h" +#include "leveldb/slice.h" #include "util/logging.h" namespace leveldb { @@ -30,6 +32,11 @@ std::string LogFileName(const std::string& dbname, uint64_t number) { return MakeFileName(dbname, number, "log"); } +std::string KVLogFileName(const std::string& dbname, uint64_t number) { + assert(number > 0); + return MakeFileName(dbname, number, "kvlog"); +} + std::string TableFileName(const std::string& dbname, uint64_t number) { assert(number > 0); return MakeFileName(dbname, number, "ldb"); @@ -112,6 +119,8 @@ bool ParseFileName(const std::string& filename, uint64_t* number, *type = kTableFile; } else if (suffix == Slice(".dbtmp")) { *type = kTempFile; + } else if (suffix == Slice(".kvlog")) { + *type = kKVLogFile; } else { return false; } diff --git a/db/filename.h b/db/filename.h index 563c6d8..e563045 100644 --- a/db/filename.h +++ b/db/filename.h @@ -25,6 +25,7 @@ enum FileType { kDescriptorFile, kCurrentFile, kTempFile, + kKVLogFile, kInfoLogFile // Either the current one, or an old one }; @@ -33,6 +34,8 @@ enum FileType { // "dbname". std::string LogFileName(const std::string& dbname, uint64_t number); +std::string KVLogFileName(const std::string& dbname, uint64_t number); + // Return the name of the sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". diff --git a/db/log_writer.cc b/db/log_writer.cc index ad66bfb..cda7722 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -4,6 +4,7 @@ #include "db/log_writer.h" +#include "db/log_format.h" #include #include "leveldb/env.h" @@ -20,12 +21,12 @@ static void InitTypeCrc(uint32_t* type_crc) { } } -Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) { +Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0), pos_(0) { InitTypeCrc(type_crc_); } Writer::Writer(WritableFile* dest, uint64_t dest_length) - : dest_(dest), block_offset_(dest_length % kBlockSize) { + : dest_(dest), block_offset_(dest_length % kBlockSize), pos_(dest_length) { InitTypeCrc(type_crc_); } @@ -49,6 +50,7 @@ Status Writer::AddRecord(const Slice& slice) { // Fill the trailer (literal below relies on kHeaderSize being 7) static_assert(kHeaderSize == 7, ""); dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover)); + pos_ += leftover; } block_offset_ = 0; } @@ -97,8 +99,10 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, // Write the header and the payload Status s = dest_->Append(Slice(buf, kHeaderSize)); + pos_ += kHeaderSize; if (s.ok()) { s = dest_->Append(Slice(ptr, length)); + pos_ += length; if (s.ok()) { s = dest_->Flush(); } diff --git a/db/log_writer.h b/db/log_writer.h index ad36794..84cee54 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -41,6 +41,7 @@ class Writer { WritableFile* dest_; int block_offset_; // Current offset in block + uint64_t pos_; // crc32c values for all supported record types. These are // pre-computed to reduce the overhead of computing the crc of the diff --git a/db/version_edit.cc b/db/version_edit.cc index 356ce88..f010182 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,9 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + kNewKVLog = 10, + kDeletedKVLog = 11, }; void VersionEdit::Clear() { @@ -82,6 +84,17 @@ void VersionEdit::EncodeTo(std::string* dst) const { PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); } + + for(int i = 0; i < new_kvlogs_.size(); i++) { + const FileMetaData& f = new_kvlogs_[i]; + PutVarint32(dst, kNewKVLog); + PutVarint64(dst, f.number); + } + + for(const auto& deleted_kvlog_number : deleted_kvlogs_) { + PutVarint32(dst,kDeletedKVLog); + PutVarint64(dst, deleted_kvlog_number); + } } static bool GetInternalKey(Slice* input, InternalKey* dst) { @@ -186,6 +199,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kNewKVLog: + if(GetVarint64(&input, &f.number)) { + new_kvlogs_.push_back(f); + } else { + msg = "new-kvlog entry"; + } + break; + + case kDeletedKVLog: + if(GetVarint64(&input, &number)) { + deleted_kvlogs_.insert(number); + } else { + msg = "deleted kvlog"; + } + break; + default: msg = "unknown tag"; break; diff --git a/db/version_edit.h b/db/version_edit.h index 137b4b1..07d88ff 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -5,6 +5,7 @@ #ifndef STORAGE_LEVELDB_DB_VERSION_EDIT_H_ #define STORAGE_LEVELDB_DB_VERSION_EDIT_H_ +#include #include #include #include @@ -75,6 +76,18 @@ class VersionEdit { deleted_files_.insert(std::make_pair(level, file)); } + + //TODO:扩展更多的属性 + void AddKVLogs(uint64_t file) { + FileMetaData f; + f.number = file; + new_kvlogs_.push_back(f); + } + + void RemoveKVLogs(uint64_t file) { + deleted_kvlogs_.insert(file); + } + void EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); @@ -99,6 +112,9 @@ class VersionEdit { std::vector> compact_pointers_; DeletedFileSet deleted_files_; std::vector> new_files_; + + std::set deleted_kvlogs_; + std::vector new_kvlogs_; }; } // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 4e37bf9..8a9fceb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4,7 +4,9 @@ #include "db/version_set.h" +#include "db/version_edit.h" #include +#include #include #include "db/filename.h" @@ -583,6 +585,12 @@ class VersionSet::Builder { } }; + struct BySmallestFileNum { + bool operator()(FileMetaData* f1, FileMetaData* f2) const { + return (f1->number < f2->number); + } + }; + typedef std::set FileSet; struct LevelState { std::set deleted_files; @@ -593,6 +601,9 @@ class VersionSet::Builder { Version* base_; LevelState levels_[config::kNumLevels]; + std::set deleted_kvlogs; + std::set added_kvlogs; + public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) { @@ -666,6 +677,17 @@ class VersionSet::Builder { levels_[level].deleted_files.erase(f->number); levels_[level].added_files->insert(f); } + + //Delete KVLogs + for(const auto& deleted_kvlog : edit->deleted_kvlogs_) { + deleted_kvlogs.insert(deleted_kvlog); + } + //Add new KVLogs + for(const auto& added_kvlog : edit->new_kvlogs_) { + FileMetaData *f = new FileMetaData(added_kvlog); + f->refs = 1; + added_kvlogs.insert(f); + } } // Save the current state in *v. @@ -712,6 +734,18 @@ class VersionSet::Builder { } #endif } + + //合并一个版本的kvlog以及当前edit的新增,但是不包含被删除的kvlog + for(const auto &kvlog : base_->kvlogs_) { + if(!deleted_kvlogs.count(kvlog->number)) continue; + v->kvlogs_.push_back(kvlog); + kvlog->refs++; + } + for(const auto &kvlog : added_kvlogs) { + if(!deleted_kvlogs.count(kvlog->number)) continue; + v->kvlogs_.push_back(kvlog); + kvlog->refs++; + } } void MaybeAddFile(Version* v, int level, FileMetaData* f) { @@ -1158,6 +1192,15 @@ void VersionSet::AddLiveFiles(std::set* live) { } } +void VersionSet::AddLiveKVLogs(std::set* live_kvlogs_) { + for(Version* v = dummy_versions_.next_; v != &dummy_versions_; + v = v->next_) { + for(int i = 0; i < v->kvlogs_.size(); i++) { + live_kvlogs_->insert(v->kvlogs_[i]->number); + } + } +} + int64_t VersionSet::NumLevelBytes(int level) const { assert(level >= 0); assert(level < config::kNumLevels); diff --git a/db/version_set.h b/db/version_set.h index ea0c925..b70117e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -15,6 +15,7 @@ #ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_ #define STORAGE_LEVELDB_DB_VERSION_SET_H_ +#include #include #include #include @@ -152,6 +153,7 @@ class Version { // List of files per level std::vector files_[config::kNumLevels]; + std::vector kvlogs_; // Next file to compact based on seek stats. FileMetaData* file_to_compact_; @@ -258,6 +260,8 @@ class VersionSet { // May also mutate some internal state. void AddLiveFiles(std::set* live); + void AddLiveKVLogs(std::set* live_kvlogs); + // Return the approximate offset in the database of the data for // "key" as of version "v". uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key); diff --git a/db/write_batch.cc b/db/write_batch.cc index b54313c..28bf437 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -15,12 +15,18 @@ #include "leveldb/write_batch.h" +#include "db/db_impl.h" #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include #include "leveldb/db.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" #include "util/coding.h" +#include "kv_sep/kvlog.h" + namespace leveldb { // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. @@ -147,4 +153,42 @@ void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); } + +class KPBatchConstructor : public WriteBatch::Handler { +public: + void Put(const Slice& key, const Slice& value) override { + int addend_key = EncodeVarint32(buf, key.size()) - buf; + int addend_value = EncodeVarint32(buf, value.size()) - buf; + fp.FileOffset += 1 + addend_key + key.size() + addend_value; + fp.Size = value.size(); + EncodeFP(fp, rep); + kp_batch->Put(key, Slice(rep,3 * 8)); + fp.FileOffset += fp.Size; + + // printf("key:%s, file num:%ld,offset:%ld,size:%ld\n",key.ToString().c_str(),fp.FileNumber,fp.FileOffset,fp.Size); + } + + void Delete(const Slice& key) override { + kp_batch->Delete(key); + int addend= EncodeVarint32(buf, key.size()) - buf; + fp.FileOffset += 1 + addend + key.size(); + } + + WriteBatch *kp_batch; + FilePointer fp; + char rep[8 * 3]; + char buf[5]; +}; + +Status WriteBatchInternal::ConstructKPBatch(WriteBatch *kp_batch, + const WriteBatch *write_batch, FilePointer fp) { + kp_batch->Clear(); + KPBatchConstructor constructor; + constructor.kp_batch = kp_batch; + constructor.fp.FileNumber = fp.FileNumber; + constructor.fp.FileOffset = fp.FileOffset + kHeader; + constructor.fp.Size = 0; + return write_batch->Iterate(&constructor); +} + } // namespace leveldb diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index fce86e3..7faf405 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -5,7 +5,10 @@ #ifndef STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_ #define STORAGE_LEVELDB_DB_WRITE_BATCH_INTERNAL_H_ +#include "db/db_impl.h" #include "db/dbformat.h" +#include +#include "leveldb/status.h" #include "leveldb/write_batch.h" namespace leveldb { @@ -38,6 +41,9 @@ class WriteBatchInternal { static Status InsertInto(const WriteBatch* batch, MemTable* memtable); static void Append(WriteBatch* dst, const WriteBatch* src); + + static Status ConstructKPBatch(WriteBatch* kp_batch, + const WriteBatch* write_batch,FilePointer fp); }; } // namespace leveldb diff --git a/fielddb/field_db.h b/fielddb/field_db.h index ef981e9..2b7f0d9 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -145,7 +145,7 @@ private: // std::fflush(stdout); // } // } -// }; +}; Status DestroyDB(const std::string& name, const Options& options); diff --git a/kv_sep/kvlog.cc b/kv_sep/kvlog.cc new file mode 100644 index 0000000..e08e7b4 --- /dev/null +++ b/kv_sep/kvlog.cc @@ -0,0 +1,107 @@ +#include "kv_sep/kvlog.h" +#include "db/dbformat.h" +#include "db/filename.h" +#include "db/write_batch_internal.h" +#include +#include "leveldb/env.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include "util/coding.h" + +namespace leveldb { +void EncodeFP(const struct FilePointer &fp, char *scratch) { + EncodeFixed64(scratch, fp.FileNumber); + EncodeFixed64(scratch + 8, fp.FileOffset); + EncodeFixed64(scratch + 16, fp.Size); + +} + +void DecodeFp(struct FilePointer &fp, char *src) { + + fp.FileNumber = DecodeFixed64(src); + fp.FileOffset = DecodeFixed64(src + 8); + fp.Size = DecodeFixed64(src + 16); +} + +KVLog::KVLog(WritableFile *dest,uint64_t file_number): + dest_(dest),pos_(0),file_number(file_number) {} + +KVLog::KVLog(WritableFile *dest, uint64_t dest_length,uint64_t file_number): + dest_(dest),pos_(dest_length),file_number(file_number) {} + +Status KVLog::AddRecord(const Slice &slice, FilePointer &fp) { + //写入slice大小 + EncodeFixed64(buf, slice.size()); + Status s = dest_->Append(Slice(buf,8)); + pos_ += 8; + //写入slice + fp.FileNumber = file_number; + fp.FileOffset = pos_; + fp.Size = slice.size(); + s = dest_->Append(slice); + pos_ += slice.size(); + if(s.ok()) { + s = dest_->Flush(); + } + return s; +} + +void KVLogReader::Next() { + if(input.empty()) { + NextWriteBatch(); + } + NextKV(); +} + +void KVLogReader::NextWriteBatch() { + Slice num; + file->Read(8, &num, number); + if(num.size() !=8) { + valid = false; + return; + } + uint64_t batch_size = DecodeFixed64(number); + if(batch_size > rep_size) { + delete[] rep_; + rep_ = new char[batch_size]; + rep_size = batch_size; + } + input.clear(); + file->Read(batch_size,&input,rep_); + if(input.size() != batch_size) { + valid = false; + return; + } + seq = DecodeFixed64(input.data()); + input.remove_prefix(12); //remove writebatch header +} + +void KVLogReader::NextKV() { + type = (ValueType)input[0]; + input.remove_prefix(1); + switch (type) { + case kTypeValue: + if(GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + + } else { + valid = false; + return; + } + break; + + case kTypeDeletion: + if(GetLengthPrefixedSlice(&input, &key)) { + + } else { + valid = false; + return; + } + break; + default: + valid = false; + return; + } +} + +} diff --git a/kv_sep/kvlog.h b/kv_sep/kvlog.h new file mode 100644 index 0000000..cce73bb --- /dev/null +++ b/kv_sep/kvlog.h @@ -0,0 +1,67 @@ +#pragma once +#include "db/dbformat.h" +#include +#include +#include "leveldb/env.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include "leveldb/write_batch.h" +namespace leveldb { + +struct FilePointer { + uint64_t FileNumber; + uint64_t FileOffset; + uint64_t Size; +}; + +void EncodeFP(const struct FilePointer &fp,char *scratch); +void DecodeFp(struct FilePointer &fp, char *src); + +class KVLog { +public: + explicit KVLog(WritableFile *dest,uint64_t file_number); + KVLog(WritableFile *dest, uint64_t dest_length,uint64_t file_number); + + KVLog(const KVLog&) = delete; + KVLog& operator=(const KVLog&) = delete; + + ~KVLog() = default; + + Status AddRecord(const Slice& slice,FilePointer &fp); +private: + WritableFile* dest_; + uint64_t pos_; + uint64_t file_number; + char buf[8]; +}; + + +class KVLogReader { +public: + KVLogReader(SequentialFile *file):file(file),rep_(nullptr),rep_size(0),valid(true) + { Next();}; + ValueType Type() {return type;} + Slice Key() {return key;} + Slice Value() {return value;} + SequenceNumber Seq() {return seq;} + void Next(); + void Valid(); +private: + void NextWriteBatch(); + void NextKV(); + +private: + Slice key; + Slice value; + SequenceNumber seq; + ValueType type; + bool valid; + + SequentialFile *file; + Slice input; + uint64_t rep_size; + char *rep_; + char number[8]; +}; + +} \ No newline at end of file diff --git a/kv_sep/kvlog_cache.cc b/kv_sep/kvlog_cache.cc new file mode 100644 index 0000000..e69de29 diff --git a/kv_sep/kvlog_cache.h b/kv_sep/kvlog_cache.h new file mode 100644 index 0000000..e69de29 diff --git a/test/test.cc b/test/test.cc new file mode 100644 index 0000000..fe15fde --- /dev/null +++ b/test/test.cc @@ -0,0 +1,73 @@ +#include "leveldb/db.h" +#include "leveldb/options.h" +#include +#include + +using namespace std; +using namespace leveldb; + +int main() { + DB* db = nullptr; + Options op; + op.create_if_missing = true; + DestroyDB("testdb", Options()); + Status status = DB::Open(op, "testdb", &db); + // assert(status.ok()); + + // string key = "leveldb",value = to_string(0); + // string res; + // for(int i = 0; i < 5; i++) { + // db->Put(WriteOptions(),key,to_string(i)); + // } + // db->Put(WriteOptions(),key+key,to_string(0)); + // db->Put(WriteOptions(),key+key,to_string(1)); + // sleep(1); + // auto snapshot = db->GetSnapshot(); + // auto readopts = ReadOptions(); + // readopts.snapshot = snapshot; + // db->Put(WriteOptions(),key,to_string(10)); + // db->CompactRange(nullptr,nullptr); + // db->Get(readopts,key,&res); + // cout<<"with snapshot:"<Get(ReadOptions(),key,&res); + // cout<<"without snapshot:"<Delete(WriteOptions(),key); + // db->Put(WriteOptions(),key,value); + // const Snapshot *snapshot = db->GetSnapshot(); + // ReadOptions read_op = ReadOptions(); + // read_op.snapshot = snapshot; + // string result; + // db->Get(read_op,key,&result); + // cout<Delete(WriteOptions(),key); + // result.clear(); + // db->Get(read_op,key,&result); + // cout<<"with snapshot:"<Get(ReadOptions(),key,&result); + // cout<<"without snapshot:"<ReleaseSnapshot(snapshot); + + string key = "abc",value = "leveldb"; + for(int i = 0; i < 20; i++) { + db->Put(WriteOptions(),key + std::to_string(i),value + std::to_string(i)); + } + std::string res; + for(int i = 0; i < 10; i++) { + db->Get(ReadOptions(), key + std::to_string(i), &res); + std::cout << i << " " << res << std::endl; + } + delete db; + // DB::Open(op,"testdb",&db); + // key = "abc", value = ""; + // db->Get(ReadOptions(),key,&value); + // cout<