diff --git a/CMakeLists.txt b/CMakeLists.txt index af7c9c1..36d85fc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -528,7 +528,6 @@ endif(LEVELDB_INSTALL) add_executable(db_test2 "${PROJECT_SOURCE_DIR}/test/db_test2.cc" test/value_field_test.cc - db/vlog_writer.cc ) target_link_libraries(db_test2 PRIVATE leveldb) diff --git a/README.md b/README.md index 0692880..e15fab2 100755 --- a/README.md +++ b/README.md @@ -33,8 +33,15 @@ 3. 允许在存储 KV 键值对前修改字段内容; 4. 允许通过字段值查询所有对应的 key。 + **实现思路:** - 1. 设计类 `Fields` 来操作字段数组,与字段相关的字段截取、读写操作、序列化等函数均在 `Fields` 类中实现; - 2. 通过字段查询 Key :实现函数 `FindKeysByField`,传入若干字段名和字段值(即子字段数组),遍历查找 LSM-Tree 找到对应的若干 key。 + 1. 定义一个` Fields`类来管理`LevelDB`中的字段。其中`Field`是使用标准库中的`std::pair` 定义了单个字段的格式,而`FieldArray`则是使用了`std::vector` 来定义一组字段。 + 2. 定义`fields_`这一私有成员变量,它是一个 `FieldArray` 类型的向量,用来存储一组字段。 + 3. 定义一系列的构造函数来支持从不同类型的参数创建`Fields`对象。 + 4. 定义了`SortFields`方法来确保在创建`Fields`对象时,各个字段会根据`field_name`从小到大进行排序,进而减少后续更新删除操作中会出现的通过 `field_name` 遍历 `Fields` 的耗时。 + 5. 定义了`UpdateField` 和 `UpdateFields` 方法允许用户更新或插入单个或多个字段,以及`DeleteField` 和 `DeleteFields` 方法允许用户删除单个或多个字段。实现思路是通过遍历fields_来查找匹配的field_name并对其进行更新(若不存在则插入)以及删除操作(⭐在上述操作的实现中,由于字段列表的有序性,遍历时可通过比较field_name大小来提前判断该字段是否存在,对于小字段的查找尤其明显,进而有效减少搜索时间,提高搜索效率。) + 6. 定义`SerializeValue` 和 `ParseValue` 方法分别用于将字段序列化为字符串或将字符串反序列化为字段对象。实现时是调用了`conding.h`文件中的`PutLengthPrefixedSlice`函数以及`GetLengthPrefixedSlice`函数,它们的作用分别是对一个`string`进行编码并在其前面加入长度信息和将编码后的`string`中的长度信息去除,提取出原始`string`。这两个函数不仅可以完美实现我们对字段编码的最初设计,同时也和`lsm-tree`里原来所有的`kv`数据对的编码保持一致。 + 7. 定义`GetField`和`HasField`方法用于访问特定字段和检查字段是否存在,实现思路与更新删除操作类似,也是对`fields_`进行遍历。 + 8. 重载 `[]` 运算符,以提供类似字典的字段访问方式。对于常量对象,`operator[]` 返回字段值的副本,如果给定的字段名不存在,则返回一个空字符串,并输出错误信息。而对于非常量对象,`operator[]` 返回字段值的引用,并允许修改该字段值。如果给定的字段名不存在,则会插入一个新的字段,并返回新字段值的引用。 + 9. 定义了一个静态方法`FindKeysByFields` 用于根据若干个字段在数据库中查找对应的键。实现上是使用`LevelDB`提供的`API`创建一个`NewIterator`,从数据库的第一个条目遍历到最后一个条目。事先定义了`find_keys` 来存储找到的键,在遍历过程中,为了避免重复处理同一个键,会先检查当前键是否已经存在于 `find_keys` 中。如果存在,则跳过此条目。若不存在,则提取其`value`部分,利用 `ParseValue` 方法将字符串形式的值解析为 `Fields` 对象,进而获得该条目对应的字段数组。再对解析后的字段数组与`search_fields_`进行匹配,这里支持完全匹配和部分匹配,将匹配的`key`存入`find_keys`中,最后返回`find_keys`。(💡Tips:在使用`Iterator`进行遍历时,`it.key()`和`it.value()`获取的其实是`kv`字符串本身,不需要我们再解码`kv length`和考虑`tag`(`ktypevalue`、`ktypedeletion`)。我们在设计之初并未考虑到这一点,是在后续测试的`debug`中发现了这一情况) ### 2.2 KV 分离 @@ -75,7 +82,7 @@ class Fields { private: FieldArray fields_; - + public: /* 从 FieldArray 构造 */ explicit Fields(const FieldArray& fields); diff --git a/db/db_impl.cc b/db/db_impl.cc index eacde23..a5e7153 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1319,7 +1319,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { // into mem_. { mutex_.Unlock(); - status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); + // 先写入vlog再写入memtable + // 写vlog日志 offset 表示这个 write_batch 在vlog中的偏移地址。 + uint64_t offset = 0; + status = vlog_->AddRecord(WriteBatchInternal::Contents(write_batch),offset); + // status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); @@ -1328,7 +1332,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(write_batch, mem_); + status = WriteBatchInternal::InsertInto(write_batch, mem_, logfile_number_, offset); } mutex_.Lock(); if (sync_error) { diff --git a/db/db_impl.h b/db/db_impl.h index 27c09d2..2237a72 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/log_writer.h" +#include "db/vlog_writer.h" #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -209,6 +210,8 @@ class DBImpl : public DB { Status bg_error_ GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); + + log::VlogWriter* vlog_; }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/write_batch.cc b/db/write_batch.cc index b54313c..874bc34 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -18,7 +18,9 @@ #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" + #include "leveldb/db.h" + #include "util/coding.h" namespace leveldb { @@ -79,6 +81,68 @@ Status WriteBatch::Iterate(Handler* handler) const { } } +Status WriteBatch::Iterate(Handler* handler, uint64_t fid, + uint64_t offset) const { + Slice input(rep_); + // 整个writebatch 的起始地址 + const char* begin = input.data(); + + if (input.size() < kHeader) { + return Status::Corruption("malformed WriteBatch (too small)"); + } + // 12个字节,8个字节用来表示sequence,4个字节用来表示 count,移除头 + input.remove_prefix(kHeader); + Slice key, value; + int found = 0; + while (!input.empty()) { + found++; + const uint64_t kv_offset = input.data() - begin + offset; + assert(kv_offset > 0); + + // record 记录为 1 个字节 是 kTypeValue ,剩下的字节是 key value + // record 记录为 1 个字节 是 kTypeDeletion, 剩下的字节是key + char tag = input[0]; + input.remove_prefix(1); + switch (tag) { + case kTypeValue: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + handler->Put(key, value); + } else { + return Status::Corruption("bad WriteBatch Put"); + } + break; + case kTypeDeletion: + if (GetLengthPrefixedSlice(&input, &key)) { + handler->Delete(key); + } else { + return Status::Corruption("bad WriteBatch Delete"); + } + break; + // case kTypeSeparate: + // if (GetLengthPrefixedSlice(&input, &key) && + // GetLengthPrefixedSlice(&input, &(value))) { + // // value = fileNumber + offset + valuesize 采用变长编码的方式 + // std::string dest; + // PutVarint64(&dest, fid); + // PutVarint64(&dest, kv_offset); + // PutVarint64(&dest, value.size()); + // Slice value_offset(dest); + // handler->Put(key, value_offset, kTypeSeparate); + // } else { + // return Status::Corruption("bad WriteBatch Put"); + // } + // break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + if (found != WriteBatchInternal::Count(this)) { + return Status::Corruption("WriteBatch has wrong count"); + } else { + return Status::OK(); + } +} int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } @@ -136,6 +200,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { return b->Iterate(&inserter); } +Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable, + uint64_t fid, size_t offset) { + MemTableInserter inserter; + // 一批 writeBatch中只有一个sequence,公用的,后续会自加。 + inserter.sequence_ = WriteBatchInternal::Sequence(b); + inserter.mem_ = memtable; + return b->Iterate(&inserter, fid, offset); +} + void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index fce86e3..dd08be8 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -37,6 +37,8 @@ class WriteBatchInternal { static Status InsertInto(const WriteBatch* batch, MemTable* memtable); + static Status InsertInto(const WriteBatch* batch, MemTable* memtable,uint64_t fid, size_t offset); + static void Append(WriteBatch* dst, const WriteBatch* src); }; diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 94d4115..2af98a3 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -71,6 +71,7 @@ class LEVELDB_EXPORT WriteBatch { // Support for iterating over the contents of a batch. Status Iterate(Handler* handler) const; + Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const; private: friend class WriteBatchInternal;