为了保证可变的字段数目和字段名,除了使用leveldb中提供的变长整数编码外,我们还提出了两种新的方法,但它们存在各自的局限性。
如将 fields{{"name", "Customer#000000001"},{"address", "IVhzIApeRb"}, {"phone", "25-989-741-2988"}} 转化为字符串 "nameCustomer#000000001addressIVhzIApeRbphone*25-989-741-2988"
局限性:如何选择特殊符号才能保证字段名和字段值中不包含特殊符号
如将 fields{{"name", "Customer#000000001"},{"address", "IVhzIApeRb"}, {"phone", "25-989-741-2988"}} 转化为字符串 "22name0address18phone28Customer#000000001IVhzIApeRb25-989-741-2988"
局限性:字段名如果包含数字可能会导致解析错误
这种方法依赖于特殊的编码方式Varint32/Varint64,在获得解码后的属性长度后可以直接获得属性值,节省空间且简洁高效,能够较好的进行数据压缩,但在大值处理上可能存在某些局限。具体结构见本文档3.1部分。
我们将 key 和 value 对应的索引还是存储在 SST 文件中,将真正的 value 存储在 blob 文件中。
key 和 value 分离的时机是在内存数据 flush 到磁盘的时候。
BlobFile 中包含了按key顺序的有序存储的 KV 对,KV 对按单个记录压缩。因此,在 Flush 的过程中,大 value 在 LSM 树中的存储形式为 <key, <fileno, offset>>。
我们知道 compaction 会删除 SST 文件中的垃圾数据,但并不会删除 blob 中真正的 value,那我们如何删除 blob 文件中的垃圾数据呢,这就要引入 blob 文件的 GC 机制。blob 文件的 GC 要解决两个问题:
那么问题又来了,如何判断文件的垃圾数据大小?
RocksDB 允许用户使用自定义的TablePropertiesCollector(文件信息搜集器)来搜集 SST 文件上的用户所关心的数据。我们可以通过这个特性来搜集 SST 文件上的 Blob 文件信息。如下图:
左边 SST 中 Index 的格式为: 第一列代表 BlobFile 的文件 ID,第二列代表 blob record 在 BlobFile 中的 offset,第三列代表 blob record 的 size。
右边 BlobFileSizeProperties 中的每一行代表一个 BlobFile 以及 SST 中有多少数据保存在这个 BlobFile 中,第一列代表 BlobFile 的文件 ID,第二列代表数据大小。
每次 compact 都会记录输入文件和输出文件
KV 分离存储除了 GC 的问题,还有很多问题需要解决,如 blob 文件的多版本并发访问、服务重启后如何重新计算 blob 文件的垃圾数据量等。
可能存在的问题
将字段个数、每个字段名与字段值的长度都作为无符号32位整数进行变长编码(Varint32),与字段名、字段值按顺序排列在一起作为value。
借助函数PutLengthPrefixedSlice(),该函数对参数字符串的长度进行编码,同时将编码后的长度和字符串追加到目标字符串中,从而形成value。
将 blob 文件构建卸载到 DB 的后台作业(即flush和compaction)有几个优点。它使 BlobDB 能够提供与 DB 本身相同的一致性保证。还有一些性能优势:
如上所述,结合示意图,我们可以看到blob file在一开始的时候(memtable flush成SST的时候)跟SST是一一映射的。在后续的SST compaction的时候,blob file有两种选择:
在用户删除或更新 key 后,常见 LSM 引擎会在 compaction 过程中把旧的记录删除,如下图所示:如果在做 compaction 时,发现上下层有相同的 key,或者上层有 delete tombstone,引擎则会将下层的 key 删除,在新生成的 SST 中只保留一份 KV。
将大 value 分离出 LSM 树后,我们需要处理大 value 的垃圾回收,减少盘上存在的垃圾数据,减少 KV 分离导致的空间放大。
普通垃圾回收 (Regular GC) ,用统计信息确定要回收的 BlobFile,而后重写对应的 BlobFile,将新的 vptr 写回 LSM 树。整个流程如下图所示。
由于没有类似内置 MVCC 多版本并发控制功能,在回写 LSM 树时,需要注册 WriteCallback,在 callback 中检测当前回写的 key 是否已经被删除或更新。这会对引擎 GC 过程中的用户写入吞吐造成巨大的影响。
为了解决这一问题,引入了一种新的 GC 方案,名为“Level Merge”,如下图所示,在 LSM 树做 compaction 的过程中,将对应的 BlobFile 重写,并顺便更新 SST 中的 vptr。由此减少了对前台用户写入的影响。
Level Merge 仅在 LSM 树的最后两层启用。
具体的数据结构可能需要等BlobFile落实才能进一步设计。
重点是第二步,也就是想要GC的话会在compaction过程中直接将过期的blob-value直接回收,compaction完成之后 LSM-Tree的sst 以及 blob都会被更新到,只需要维护后续的旧的blob回收即可。
BlobDB通过参数blob_garbage_collection_age_cutoff来判定哪些old blob file需要被回收,例如,默认值 0.25 向 RocksDB 发出信号,表明 GC 应该重新定位最旧 25% 的 Blob 文件中的 Blob。可以调整该参数来调整写入放大和空间放大之间的权衡。
也就是说blob file的物理删除是异步进行,根据blob_garbage_collection_age_cutoff来决定哪些blob file足够老,可以被删除。
描述实现功能要新增的函数及其功能说明。例如Value log的GC,二级索引的建立、查询、删除等操作。
用于将字段数组序列化为字符串。实现方法:
std::string SerializeValue(const FieldArray& fields)
{
定义空字符串dst;
获取字段数组的长度size;
将size编码为Varint32并加入dst;
遍历fields中的field_name, field_value
{
将field_name编码为Varint32并加入dst;
将field_value编码为Varint32并加入dst;
}
返回dst;
}
用于将字符串反序列化为字段数组,主要借助函数GetLengthPrefixedSlice解码字符串前缀的变长整数同时移除相应长度的字符串。实现方法:
FieldArray ParseValue(const std::string& value_str)
{
定义空的FieldArray fields;
解码得到字段数组的长度size;
循环size次
{
解码得到字段名;
解码得到字段值;
将字段名和字段值组成pair加入到fields中;
}
返回fields;
}
用于通过指定的字段名和字段值查找key。实现方法:
std::vector<std::string> FindKeysByField(leveldb::DB* db, Field &field)
{
新建一个std::vector<std::string> ret存储找到的key;
新建一个DBIter it;
it指向数据库中第一个key;
while it 有效
{
取出it对应的value;
对value进行解析,获得字段数组fields;
遍历fields
{
如果字段名等于field的字段名且字段值等于field的字段值
{
将it的key加入ret中;
}
}
}
返回ret;
}
是在刷盘flush的时候进入kv分离,有flush函数的文件table/table_builder.cc:
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return; //朴,正常判断
assert(!r->pending_index_entry);
if (DBImpl::key_value_separated_) {
// 这里获取数据块内容并初始化 Block 对象,朴
Slice block_content = r->data_block.Finish();
BlockContents contents;
contents.data = block_content;
contents.heap_allocated = false;
contents.cachable = false;
// 初始化 Block
Block data_block(contents);
std::unique_ptr<Iterator> iter(data_block.NewIterator(Options().comparator));
// 遍历数据块中的键值对
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
// 检查值是否大于阈值
if (value.size() > min_blob_size) {
// 将值存储到 blobfile 中
Status status = blobfile->Put(key, value);
if (!status.ok()) {
r->status = status;
}
}
}
}
WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush(); //刷新
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}
kv分离相关配置参数,均支持动态调节。在和有flush函数的文件table_builder.cc想同文件夹下创建blob_file.cc和.h文件,创建相关数据结构:
enable_blob_files: 是否开启KV分离。
min_blob_size: KV分离阈值,大于等于该阈值的value在Flush/Compaction时写到Blob文件。
blob_file_size: Blob文件大小。
blob_compression_type: blob文件压缩算法,每个blobfile使用相同的压缩算法。
enable_blob_garbage_collection: 设置该值之后,引擎会在compaction时重写遇到的位于
最老的一批Blob文件中的value到新的Blob文件。
blob_garbage_collection_age_cutoff: 定义旧的Blob文件的阈值,默认为0.25,
表示所有Blob文件中最先生成的25% Blob文件即为旧的Blob文件。
blob_garbage_collection_force_threshold: 引擎主动GC的一个阈值。除了在compaction过程中
重写老的blob value以外,引擎还支持主动发起GC。当最旧的一批blob文件garbage所占比值>=该值,
会触发一次compaction清理blob文件。
blob_compaction_readahead_size: 从blob文件中预读数据大小。如果设置了该值,会在compaction
中预读blob文件,预读大小为blob_compaction_readahead_size。
具体实现:
key_value_separated_: 是否开启KV分离。
min_blob_size: KV分离阈值,大于等于该阈值的value在Flush/Compaction时写到Blob文件。
blob_num: Blob文件大小。
bfid:文件id
offset:偏移量
blob_file.h :
#ifndef LEVELDB_BLOB_FILE_H_
#define LEVELDB_BLOB_FILE_H_
#include <string>
#include "leveldb/status.h"
#include "leveldb/slice.h"
#include "leveldb/env.h"
namespace leveldb {
namespace blob {
class BlobFile {
public:
explicit BlobFile(WritableFile* dest);
BlobFile(WritableFile* dest, uint64_t dest_length);
~BlobFile();
// 添加一条记录,记录写入的偏移量
Status AddRecord(const Slice& key, const Slice& value, uint64_t& offset);
private:
WritableFile* dest_; // 用于写入数据的目标文件
uint64_t head_; // 当前写入位置的偏移量
uint64_t bfid_; // 用于标识 BlobFile 的唯一 ID
// uint64_t head_; // 当前写入文件的偏移量
Status EmitDynamicRecord(const Slice& key, const Slice& value, uint64_t& offset);
};
} // namespace blob
} // namespace leveldb
#endif // LEVELDB_BLOB_FILE_H_
实现TableBuilder
的Flush
写操作,当DBImpl::key_value_separated_
开启时,遍历数据块键值对,大于min_blob_size
的value
存入BlobFile
并记录偏移量和文件 ID,更新value
为编码后的偏移量和 ID,小于阈值的value
直接处理,最后将数据块写入文件并执行相关刷新及过滤操作,BlobFile
通过特定格式记录键值对并更新偏移量。
table_builder.cc :
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return; //朴,正常判断
assert(!r->pending_index_entry);
if (DBImpl::key_value_separated_) {
// 这里获取数据块内容并初始化 Block 对象,朴
Slice block_content = r->data_block.Finish();
BlockContents contents;
contents.data = block_content;
contents.heap_allocated = false;
contents.cachable = false;
Rep* new_rep = new Rep(r->options, r->file); // 创建一个新的 Rep 实例
new_rep->offset = r->offset; // 新的 offset 初始化为当前的 offset
new_rep->num_entries = r->num_entries;
// 初始化 Block
Block data_block(contents);
leveldb::WritableFile* dest = nullptr;
leveldb::blob::BlobFile blobfile(dest); // 可以动态生成文件名以防止重复 // 初始化 BlobFile 对象,朴
leveldb::WritableFile* file;
int bfid = DBImpl::NewBlobNum(); // 生成唯一的 blobfile id
std::unique_ptr<Iterator> iter(data_block.NewIterator(Options().comparator));
// 遍历数据块中的键值对
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
// 检查值是否大于阈值
if (value.size() > min_blob_size) {
// 将值存储到 blobfile 中
uint64_t offset; // 局部变量存储偏移量
Status status = blobfile.AddRecord(key, value, offset);
if (!status.ok()) {
r->status = status;
}
// 这里修改 value,存储 Blob 的 offset 和 bfid
std::string new_value = EncodeBlobValue(offset, bfid);
new_rep->data_block.Add(key, Slice(new_value));
}
else{
// 不需要 Blob 存储,直接处理普通值
new_rep->data_block.Add(key, value);
}
}
}
WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush(); //刷新
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}
std::string TableBuilder::EncodeBlobValue(uint64_t offset, int bfid) {
// 自定义方法:编码新的 Blob 值
std::string result;
// 为 result 分配空间
result.resize(8 + 4); // 64位 (8字节) + 32位 (4字节)
// 将 offset 和 bfid 编码成一个新的值
std::string result;
EncodeFixed64(&result[0], offset); // 编码 offset
EncodeFixed32(&result[8], bfid); // 编码 bfid
return result;
}
blob_file.cc :
#include "table/blob_file.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include <cassert>
namespace leveldb {
namespace blob {
BlobFile::BlobFile(WritableFile* dest) : dest_(dest), head_(0) {}
BlobFile::BlobFile(WritableFile* dest, uint64_t dest_length)
: dest_(dest), head_(dest_length) {}
BlobFile::~BlobFile() = default;
Status BlobFile::AddRecord(const Slice& key, const Slice& value, uint64_t& offset) {
// 动态写入记录,返回写入的偏移量
return EmitDynamicRecord(key, value, offset);
}
Status BlobFile::EmitDynamicRecord(const Slice& key, const Slice& value, uint64_t& offset) {
// 记录头部,包括 key 和 value 的长度
char header[8]; // 4 字节 key 长度 + 4 字节 value 长度
uint32_t key_size = static_cast<uint32_t>(key.size());
uint32_t value_size = static_cast<uint32_t>(value.size());
// 编码 key 和 value 长度
EncodeFixed32(header, key_size);
EncodeFixed32(header + 4, value_size);
// 写入头部
Status s = dest_->Append(Slice(header, sizeof(header)));
if (!s.ok()) {
return s;
}
// 写入 key 和 value 数据
s = dest_->Append(key);
if (!s.ok()) {
return s;
}
s = dest_->Append(value);
if (!s.ok()) {
return s;
}
// 刷新文件到磁盘
s = dest_->Flush();
if (!s.ok()) {
return s;
}
// 更新偏移量
offset = head_;
head_ += sizeof(header) + key_size + value_size;
return Status::OK();
}
} // namespace blob
} // namespace leveldb
仿照写过程,实际可能有别的问题,例如增加blob的参数。读操作需要从 SST 文件和 Blob 文件中获取数据,考虑到持久化、恢复和并发等问题。在实现中,要确保数据的一致性和完整性,同时尽量优化性能。
在table_builder.cc
中添加读取函数:
class DBImpl {
public:
static bool key_value_separated_;
static int NewBlobNum();
// 其他成员变量和函数...
private:
static std::mutex read_mutex_;
};
std::string TableBuilder::DecodeBlobValue(const Slice& stored_value) {
uint64_t offset;
int bfid;
DecodeFixed64(stored_value.data(), &offset);
DecodeFixed32(stored_value.data() + 8, &bfid);
// 从Blob文件中读取实际的value
leveldb::SequentialFile* blob_file;
Status s = env_->NewSequentialFile(BlobFileName(bfid), &blob_file);
if (!s.ok()) {
return "";
}
s = blob_file->Skip(offset);
if (!s.ok()) {
delete blob_file;
return "";
}
char header[8];
s = blob_file->Read(header, sizeof(header), nullptr);
if (!s.ok()) {
delete blob_file;
return "";
}
uint32_t key_size, value_size;
DecodeFixed32(header, &key_size);
DecodeFixed32(header + 4, &value_size);
std::string blob_value;
blob_value.resize(value_size);
s = blob_file->Read(blob_value.data(), value_size, nullptr);
if (!s.ok()) {
delete blob_file;
return "";
}
delete blob_file;
return blob_value;
}
Status TableBuilder::Read(const Slice& key, std::string* value) {
Rep* r = rep_;
assert(!r->closed);
{
std::lock_guard<std::mutex> guard(DBImpl::read_mutex_);
if (DBImpl::key_value_separated_) {
// 从SST文件中查找key
BlockContents contents;
// 从SST文件中获取对应key的数据块内容并初始化Block对象
//contents = 获取对应key的数据块内容();
Block data_block(contents);
std::unique_ptr<Iterator> iter(data_block.NewIterator(Options().comparator));
iter->Seek(key);
if (iter->Valid() && iter->key() == key) {
Slice stored_value = iter->value();
if (stored_value.size() == 12) {
*value = DecodeBlobValue(stored_value);
} else {
*value = std::string(stored_value.data(), stored_value.size());
}
} else {
return Status::NotFound("Key not found");
}
} else {
// 未开启KV分离,直接从SST文件中读取
}
}
return Status::OK();
}
在 LevelDB 的常规 SST 文件合并过程中,会涉及到多个 SST 文件的整合。在 KV 分离的情况下,需要额外处理 Blob 文件的引用。
在table_builder.cc中添加合并相关代码
void TableBuilder::CompactSSTAndBlobFiles() {
// 假设这里有获取参与合并的SST文件列表的逻辑
std::vector<SSTFile*> inputSSTFiles = GetInputSSTFilesForCompaction();
std::vector<SSTFile*> outputSSTFiles;
std::vector<BlobFile*> inputBlobFiles;
std::vector<BlobFile*> outputBlobFiles;
// 初始化输出Blob文件
for (size_t i = 0; i < inputBlobFiles.size(); ++i) {
leveldb::WritableFile* outputBlobDest = nullptr;
// 生成唯一的Blob文件名
std::string outputBlobFileName = GenerateUniqueBlobFileName();
env_->NewWritableFile(outputBlobFileName, &outputBlobDest);
BlobFile* outputBlobFile = new BlobFile(outputBlobDest);
outputBlobFiles.push_back(outputBlobFile);
}
// 遍历每个输入SST文件
for (SSTFile* inputSST : inputSSTFiles) {
std::unique_ptr<Iterator> iter(inputSST->NewIterator(Options().comparator));
std::vector<std::pair<Slice, Slice>> newKeyValues;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
if (DBImpl::key_value_separated_ && value.size() == 12) { // 假设Blob引用长度为12
uint64_t offset;
int bfid;
DecodeFixed64(value.data(), &offset);
DecodeFixed32(value.data() + 8, &bfid);
// 从旧的Blob文件中读取实际值
BlobFile* inputBlobFile = inputBlobFiles[bfid];
std::string actualValue = ReadValueFromBlobFile(inputBlobFile, offset);
// 判断是否需要保留该键值对
if (ShouldKeepKeyValue(key, actualValue)) {
// 将实际值写入新的Blob文件
uint64_t newOffset;
BlobFile* outputBlobFile = outputBlobFiles[bfid];
Status status = outputBlobFile->AddRecord(key, Slice(actualValue), newOffset);
if (!status.ok()) {
// 处理错误
continue;
}
// 更新SST文件中的Blob引用
std::string newBlobValue = EncodeBlobValue(newOffset, bfid);
newKeyValues.emplace_back(key, Slice(newBlobValue));
}
} else {
// 非Blob引用,直接处理
if (ShouldKeepKeyValue(key, value)) {
newKeyValues.emplace_back(key, value);
}
}
}
// 创建新的SST文件并写入更新后的键值对
SSTFile* newSSTFile = CreateNewSSTFile();
for (const auto& pair : newKeyValues) {
newSSTFile->Add(pair.first, pair.second);
}
outputSSTFiles.push_back(newSSTFile);
}
// 清理旧的Blob文件(假设无其他引用)
for (BlobFile* inputBlobFile : inputBlobFiles) {
delete inputBlobFile;
}
}
std::string TableBuilder::ReadValueFromBlobFile(BlobFile* blobFile, uint64_t offset) {
char header[8];
Status s = blobFile->Seek(offset);
if (!s.ok()) {
return "";
}
s = blobFile->Read(header, sizeof(header), nullptr);
if (!s.ok()) {
return "";
}
uint32_t key_size, value_size;
DecodeFixed32(header, &key_size);
DecodeFixed32(header + 4, &value_size);
std::string actualValue;
actualValue.resize(value_size);
s = blobFile->Read(actualValue.data(), value_size, nullptr);
if (!s.ok()) {
return "";
}
return actualValue;
}
bool TableBuilder::ShouldKeepKeyValue(const Slice& key, const Slice& value) {
// 判断键值对是否需要保留的逻辑,例如根据删除标记或版本号等
return true;
}
TablePropertiesCollector
特性(虽然 LevelDB 原生没有直接类似的,但可自行实现类似功能),在每次 SST 文件 Compaction 完成后,搜集 SST 文件上的 Blob 文件信息。table_builder.cc:
void TableBuilder::GarbageCollectBlobFiles() {
// 获取所有Blob文件及其垃圾数据大小
std::map<int, uint64_t> blobFileGarbageSizes = CalculateBlobFileGarbageSizes();
// 按垃圾数据大小排序
std::vector<std::pair<int, uint64_t>> sortedBlobFiles(blobFileGarbageSizes.begin(), blobFileGarbageSizes.end());
std::sort(sortedBlobFiles.begin(), sortedBlobFiles.end(), [](const auto& a, const auto& b) {
return a.second > b.second;
});
// 选择垃圾数据最多的Blob文件进行GC
if (!sortedBlobFiles.empty()) {
int bfidToGC = sortedBlobFiles[0].first;
BlobFile* inputBlobFile = GetBlobFile(bfidToGC);
// 创建新的Blob文件
leveldb::WritableFile* outputBlobDest = nullptr;
std::string outputBlobFileName = GenerateUniqueBlobFileName();
env_->NewWritableFile(outputBlobFileName, &outputBlobDest);
BlobFile* outputBlobFile = new BlobFile(outputBlobDest);
// 遍历所有SST文件,重新写入需要保留的数据
std::vector<SSTFile*> allSSTFiles = GetAllSSTFiles();
for (SSTFile* sstFile : allSSTFiles) {
std::unique_ptr<Iterator> iter(sstFile->NewIterator(Options().comparator));
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
if (DBImpl::key_value_separated_ && value.size() == 12) { // 假设Blob引用长度为12
uint64_t offset;
int bfid;
DecodeFixed64(value.data(), &offset);
DecodeFixed32(value.data() + 8, &bfid);
if (bfid == bfidToGC) {
std::string actualValue = ReadValueFromBlobFile(inputBlobFile, offset);
if (ShouldKeepKeyValue(key, actualValue)) {
uint64_t newOffset;
Status status = outputBlobFile->AddRecord(key, Slice(actualValue), newOffset);
if (!status.ok()) {
// 处理错误
continue;
}
// 更新SST文件中的Blob引用
std::string newBlobValue = EncodeBlobValue(newOffset, bfid);
sstFile->UpdateBlobReference(key, Slice(newBlobValue));
}
}
}
}
}
// 清理旧的Blob文件
delete inputBlobFile;
}
}
std::map<int, uint64_t> TableBuilder::CalculateBlobFileGarbageSizes() {
// 这里实现计算每个Blob文件垃圾数据大小的逻辑,类似之前提到的对比输入输出SST文件的Blob信息
std::map<int, uint64_t> blobFileGarbageSizes;
return blobFileGarbageSizes;
}
BlobFile* TableBuilder::GetBlobFile(int bfid) {
// 根据bfid获取BlobFile对象
return nullptr;
}
在实现第一版之后,我们发现有很多困难,这个策略虽然适合大规模分布式数据存储与计算,能支持高并发、分布式事务等,但存在复杂度高、小规模存储内存使用高及可定制性低、需精细参数配置与优化等问题。于是我们转变了策略,重新考虑WiscKey 这篇论文的思路和实现方式。
而 WiscKey 策略的提出,旨在优化 LSM - Tree 的写放大问题,这与我们的实验目标高度契合。相较于其他如 LSM - trie 和 PebblesDB 等优化方案,WiscKey 具有通用性强、效果显著且原理简单易懂的特点,有效解决了 LSM - Tree 因 compaction 时重写大量数据(key 和 value)导致的写放大问题,尤其在现实中常见的 ksize 远小于 vsize 场景下,优化效果更为突出。
此外,这种策略还有诸多好处。例如,LSM - Tree Compaction 无需重写 value,大幅减小写放大;LSM - Tree 不存储 value,体积变小,一个 block 能存储更多 key,有利于减少读 I/O;体积变小后,cache 效果更佳,LSM - Tree 上层基本可缓存于内存。
但是这种策略会造成一些其他影响,如 range query 转化为多次 vlog 随机读,需依赖后台线程预读缓存来优化性能,且不同场景下效果有差异;key 和 value 分开存储后的一致性保障,虽通过先写 vlog 再写 LSM - Tree 及刷盘策略解决,但存在两次刷盘开销(可通过 WAL 和 vlog 合并优化);vlog 的垃圾回收也需谨慎处理,无论是扫 LSM - Tree 还是扫 vlog 都各有优劣。然而,我们认为这些问题均可通过合理的设计与优化解决,且相较于其带来的显著优势,这些挑战是值得面对与克服的。
向当前 vlog 的尾端插入 key 和 value ,将 vlog 的编号 (file_no) 和新条目在 vlog 中的偏移量 (file_offset) 整合成 vptr,向 memtable 中写入 key 和 vptr 。当 current vlog 的大小达到一定阈值时,创建新的 vlog 。
向当前 vlog 的尾端插入 key 和 value ,将 vlog 的编号 (file_no) 和新条目在 vlog 中的偏移量 (file_offset) 整合成 vptr,向 sstable 中写入 key 和 vptr 。当 current vlog 的大小达到一定阈值时,创建新的 vlog 。
在进行 compaction 的时候会遍历 sstable 中的每一个键值对,当 drop 变量为 true 时表明该键值对已失效。根据 vptr 到 vlog_manager 中查询存放相应的 value 所在的 vlog 文件,修改其 invalid_offsets_ 集合。同时,在后台有一个线程 BackgroundCleanup 定时扫描所有 vlog ,当 vlog 中的所有条目都已失效时,会删除该 vlog 文件,从而实现垃圾回收。
VWriter::AddRecord:向Vlog写入一个Record
VWriter::Flush:实现WritableFile的刷盘
VReader::ReadRecord:从文件指定偏移位置读取条目
VReader::ReadKV:从文件指定位置读取value
管理vlog文件,维护存在的文件映射表
VlogManager::AddVlogFile:新建一个vlog文件
VlogManager::GetVlogFile:获得指定vlogfile_number的vlog文件
VlogManager::MarkVlogValueInvalid:标记指定vlog文件指定偏移量的value无效
VlogManager::GetSequentialFile:通过映射获得writablefile对应的sequentialfile
VlogManager::IncrementTotalValueCount:增加文件条目计数器
VlogManager::CleanupInvalidVlogFiles:检查所有的vlog文件是否需要被回收
VlogManager::RemoveVlogFile:删除无效的vlog文件
VlogConverter::GetVptr:根据file_no和file_offset实现vptr的包装
VlogConverter::DecodeVptr:实现vptr的解析
增加若干辅助函数
DBImpl::StartBackgroundCleanupTask:后台一个自动GC的线程
DBImpl::GetKVSepType:获取KV分离的参数类型
DBImpl::WriteValueIntoVlog:写入Vlog
DBImpl::ReadValueFromVlog:从Vlog中读
DBImpl::IsDiskBusy:判断磁盘IO是否繁忙
增加KV分离时机选项。
主要围绕根据不同的键值分离类型,灵活处理键值对的存储方式,以优化存储性能和资源利用。
首先,在 Put
方法中,添加根据 options_.kvSepType
来决定键值对存储逻辑的入口。
对于 noKVSep
和 kVSepBeforeSSD
这两种情况,意味着不需要在当前逻辑下进行特殊的键值分离操作,所以直接复用原有的 DB::Put
方法,将键值对以常规方式存入数据库。这样做既保证了代码的简洁性,又能在不需要键值分离时维持原有的存储性能和兼容性。
而当 kvSepType
为 kVSepBeforeMem
时,就需要执行键值分离逻辑。这是因为我们希望在数据进入内存之前,就将值分离存储到 VLog 中,以达到优化存储结构的目的。此时,先调用 WriteValueIntoVlog
方法,将键值对写入 VLog 并获取值在 VLog 中的指针 vptr
。这个指针将替代原始值,用于后续在数据库中的存储。写入 VLog 后,立即调用 vlog_->Flush()
确保数据及时持久化,防止数据丢失。最后,再使用 DB::Put
方法将键和 vptr
插入数据库,完成整个存储过程。
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
// Convert value to vptr if need.
if(this->options_.kvSepType == noKVSep || this->options_.kvSepType == kVSepBeforeSSD){
return DB::Put(o, key, val);
}
else if(this->options_.kvSepType == kVSepBeforeMem){
char buf[20];
Slice vptr;
Status s = WriteValueIntoVlog(key, val, buf, vptr); //朴,写
if(!s.ok()) return s;
s = vlog_-> Flush();
if(!s.ok()) return s;
return DB::Put(o, key, vptr);
}
return Status::Corruption("Invalid kvSepType.");
}
在 WriteValueIntoVlog
方法中,首要任务是检查 VLog 文件的使用情况。通过 vlogfile_offset_
和 options_.vlog_file_size
的比较,判断当前 VLog 文件是否已满。如果已满,就需要创建新的 VLog 文件。这涉及到使用环境对象 options_.env
创建新的可写文件和顺序读文件,更新相关的文件管理模块,包括删除旧的写入器和文件对象,将新的读文件添加到文件管理器 vmanager_
中,重置写入器和文件偏移量等操作,确保 VLog 文件的连续性和可管理性。
接下来,为了将键值对写入 VLog,先通过 PutLengthPrefixedSlice
函数将键和值分别添加到临时字符串 tmp_vrec
中,并在前面加上它们的长度前缀。这样的编码方式有助于后续在读取时准确解析键值对。然后,调用 vlog_->AddRecord
方法将这个临时字符串作为一条记录写入 VLog,并获取写入的大小。
最后,利用 vconverter_
的 GetVptr
方法,根据当前的 VLog 文件编号、偏移量以及提供的缓冲区 buf
,生成值在 VLog 中的指针 vptr
。同时,更新 vlogfile_offset_
,记录下本次写入导致的偏移量增加。
Status DBImpl::WriteValueIntoVlog(const Slice& key, const Slice& val, char* buf, Slice& vptr){ //朴,写
//写VLog
Status s;
if(vlogfile_offset_ >= options_.vlog_file_size){
WritableFile* newfile;
SequentialFile* readfile;
s = options_.env->NewWritableFile(VlogFileName(dbname_, vlogfile_number_ + 1), &newfile);
if(!s.ok()) return s;
s = options_.env->NewSequentialFile(VlogFileName(dbname_, vlogfile_number_ + 1), &readfile);
if(!s.ok()) return s;
//更新相应的模块
delete vlog_;
delete vlogfile_;
vmanager_->AddVlogFile(vlogfile_number_ + 1, readfile);
vlogfile_ = newfile;
vlog_ = new vlog::VWriter(vlogfile_);
++vlogfile_number_;
vlogfile_offset_ = 0;
}
int write_size = 0;
std::string tmp_vrec;
PutLengthPrefixedSlice(&tmp_vrec, key);
PutLengthPrefixedSlice(&tmp_vrec, val);
s = vlog_-> AddRecord(Slice(tmp_vrec), write_size);
if(!s.ok()) return s;
//将val替换为vptr.
vptr = vconverter_->GetVptr(vlogfile_number_, vlogfile_offset_, buf);
vlogfile_offset_ += write_size; //朴
return s;
}
在 Get
方法里处理 KV 分离相关逻辑时,核心目标是确保在不同的 KV 分离配置下,都能准确获取到完整的键值对数据。
在获取键值对之后,针对不同的 options_.kvSepType
进行特殊处理。
kVSepBeforeSSD
情况:当 options_.kvSepType
为 kVSepBeforeSSD
时,意味着数据在落盘到 SSD 之前就进行了键值分离操作。在这种情况下,从常规存储结构中获取到的值实际上可能只是一个指向 VLog 中真实值的指针(vptr)。所以,需要调用 ReadValueFromVlog
方法,利用这个 vptr
从 VLog 中读取实际的键值对数据,从而更新 value
,使其包含完整的用户所需值。这样做是因为在这种 KV 分离策略下,数据的存储结构发生了变化,必须通过额外的步骤从 VLog 中还原出完整数据。kVSepBeforeMem
情况:当 options_.kvSepType
为 kVSepBeforeMem
时,同样面临从 VLog 中获取真实值的需求。即使在前面的查找过程中已经从存储结构中获取到了一些数据,但由于这种 KV 分离策略在数据进入内存之前就将值分离到了 VLog 中,所以获取到的值依然可能只是 vptr
。因此,再次调用 ReadValueFromVlog
方法,依据 vptr
从 VLog 中读取实际的键值对并更新 value
。这一步骤保证了无论数据在哪个阶段进行了 KV 分离,都能正确获取到完整的键值对。在整个过程中,对 kvSepType
的判断以及相应的从 VLog 读取数据的操作,都是紧密围绕 KV 分离的存储结构变化而设计的,旨在确保系统在不同的 KV 分离配置下,读取操作的准确性和一致性。
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
//if KVSeq, Need Decode
if(options_.kvSepType == kVSepBeforeSSD){
std::string tmp_key;
Status s = ReadValueFromVlog(&tmp_key, value, value);
if(!s.ok()) return s;
}
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
//Decode vptr if Need Kvsep
if(options_.kvSepType == kVSepBeforeMem){
std::string tmp_key;
Status s = ReadValueFromVlog(&tmp_key, value, value);
if(!s.ok()) return s;
}
return s;
}
对于 ReadValueFromVlog
方法,其设计是为了配合 KV 分离机制,准确地从 VLog 中读取被分离存储的值。
解析指针:
vptr
转换为 Slice
类型 Slice encoded_vptr = Slice(*vptr)
,这是为后续解析操作做准备。Slice
类型提供了方便的字节序列处理方式,使得我们能够更灵活地操作 vptr
所包含的信息,为解析出 VLog 文件编号和偏移量奠定基础。vconverter_->DecodeVptr
方法,从 encoded_vptr
中提取出 VLog 文件编号 vlogfile_number
和文件偏移量 vlogfile_offset
。在 KV 分离的存储体系中,vptr
承载了定位 VLog 中实际值的关键信息,通过 vconverter_
模块的解码功能,我们能够获取到这些关键位置信息。若解析过程失败,说明 vptr
可能存在错误或损坏,此时直接返回错误状态,以避免后续基于错误信息的无效操作。获取 VLog 文件并读取:
vmanager_->GetVlogFile(vlogfile_number)
获取对应的 VLog 文件 SequentialFile* vlog_file
。在 KV 分离的架构下,vmanager_
负责管理所有的 VLog 文件,通过文件编号可以准确找到存储实际值的目标 VLog 文件。若获取文件失败,这很可能意味着 VLog 文件系统出现问题,比如文件丢失或损坏,此时返回数据损坏的错误状态,以提示系统可能存在严重的数据问题。vlog::VReader
对象 vreader
,该对象专门用于从 VLog 文件中读取数据。它封装了针对 VLog 文件格式的读取逻辑,为从 VLog 中准确读取键值对提供了统一且便捷的接口,使得代码结构更加清晰,易于维护和扩展。vreader.ReadKV
方法,依据之前解析得到的偏移量 vlogfile_offset
从 VLog 文件中读取键值对,并分别存储到 key
和 val
中。ReadKV
方法依据 VLog 文件特定的存储格式,从指定偏移位置准确读取数据,并将其解析为键值对形式,从而完成从 VLog 中获取被分离存储的值的关键步骤。返回读取操作的状态 s
,如果前面的所有操作都成功执行,状态为 OK
,表示成功从 VLog 中读取到了所需的键值对数据。若在任何一个步骤中出现错误,如指针解析失败、VLog 文件获取失败等,状态将包含相应的错误信息。通过返回状态,上层调用函数(如 Get
方法)能够根据具体情况进行处理,比如向用户反馈错误、进行重试逻辑或者采取其他应对措施,确保整个系统在 KV 分离存储模式下的健壮性和稳定性。
Status DBImpl::ReadValueFromVlog(std::string* key, std::string* val, std::string* vptr){ //读,朴
Status s;
Slice encoded_vptr = Slice(*vptr);
uint64_t vlogfile_number;
uint64_t vlogfile_offset;
s = vconverter_->DecodeVptr(&vlogfile_number, &vlogfile_offset, &encoded_vptr);
if(!s.ok()) return s;
SequentialFile* vlog_file = vmanager_->GetVlogFile(vlogfile_number);
if(vlog_file == nullptr) return Status::Corruption("Failed to find vlog files.");
vlog::VReader vreader = vlog::VReader(vlog_file);
vreader.ReadKV(vlogfile_offset, key, val);
return s;
}
在 drop
为 true
时标记 vlog
文件中对应的 value
为无效
合并过程中检测到 drop == true
时,检查 options_.kvseptype
是否启用 KV 分离,并根据 ikey
的 vptr
(对应的 VLog 文件编号和偏移量)将其标记为无效。
修改 DBImpl::DoCompactionWork
中的相关代码:
if (drop) {
// 标记为无效的逻辑
if (options_.kvseptype) { // 如果启用了 KV 分离
Slice vptr = input->value(); // 获取指向 VLog 的 vptr
uint64_t vlogfile_number, vlogfile_offset;
Status s = vconverter_->DecodeVptr(&vlogfile_number, &vlogfile_offset, &vptr);
if (s.ok()) {
// 更新 vmanager_,标记该 offset 无效
vmanager_->MarkVlogValueInvalid(vlogfile_number, vlogfile_offset);
}
}
continue; // 跳过当前记录
}
在 vmanager_
中新增方法:
void MarkVlogValueInvalid(uint64_t vlogfile_number, uint64_t offset) {
auto vlog_file = GetVlogFile(vlogfile_number);
if (vlog_file) {
vlog_file->MarkValueInvalid(offset); // 调用具体文件的标记逻辑
}
}
设计一个后台任务周期性扫描所有 VLog 文件,检查是否所有 value
都已标记为无效。如果是,则删除对应的文件。
启动后台线程:
void DBImpl::StartBackgroundCleanupTask() {
std::thread([this]() {
while (!shutting_down_.load(std::memory_order_acquire)) {
vmanager_->CleanupInvalidVlogFiles();
std::this_thread::sleep_for(std::chrono::seconds(60)); // 每分钟检查一次
}
}).detach();
}
在 vmanager_
中新增清理逻辑:
void CleanupInvalidVlogFiles() {
for (const auto& vlog_pair : vlog_files_) {
uint64_t vlogfile_number = vlog_pair.first;
auto vlog_file = vlog_pair.second;
if (vlog_file->AllValuesInvalid()) { // 检查文件内所有值是否无效
RemoveVlogFile(vlogfile_number); // 删除 VLog 文件
}
}
}
在删除文件时,还需要在 vmanager_
中更新状态:
void RemoveVlogFile(uint64_t vlogfile_number) {
auto it = vlog_files_.find(vlogfile_number);
if (it != vlog_files_.end()) {
delete it->second; // 删除对应的 SequentialFile
vlog_files_.erase(it); // 从管理器中移除
options_.env->DeleteFile(VlogFileName(dbname_, vlogfile_number)); // 删除实际文件
}
}
考虑到每次对磁盘的读写操作都会受到操作系统的 I/O 调度策略影响,比如是否等待某个时间片(轮转到当前请求的 I/O 操作),合理选择新建文件的时机可能减少元数据操作对当前 I/O 的干扰,有如下优势:
提高吞吐量避免高 I/O 繁忙期进行文件切换,减少因调度冲突导致的性能抖动。
优化资源使用在磁盘空闲时执行高代价的文件切换,充分利用 I/O 带宽。
降低延迟避免不必要的磁盘等待时间,提升写入操作的实时性。
因此,我们期望实现当磁盘繁忙时,推迟创建新文件,优先将数据写入现有文件的逻辑。
bool IsDiskBusy(const std::string& device) { // 判断磁盘繁忙程度是否超过指定阈值
std::ifstream io_stat("/sys/block/" + device + "/stat");
if (!io_stat.is_open()) return false;
unsigned long read_requests, write_requests, io_in_progress;
io_stat >> read_requests >> write_requests >> io_in_progress;
io_stat.close();
// 如果当前正在处理的 I/O 请求数大于阈值,则认为磁盘繁忙
return io_in_progress > 2; // 设置一个合理的阈值
}
Status DBImpl::WriteValueIntoVlog(const Slice& key, const Slice& val, char* buf, Slice& vptr) {
Status s;
// 检查是否需要新建 VLog 文件
if (vlogfile_offset_ >= options_.vlog_file_size) {
// 判断当前 I/O 是否繁忙
if (IsDiskBusy("sda")) { // 替换 "sda" 为实际设备名
// 如果 I/O 繁忙,可以选择等待或继续写入当前文件
} else {
// 如果 I/O 空闲,立即创建新文件
// 更新相应模块
}
}
// 正常写入流程
...
}
在考虑垃圾回收的机制时,我们发现同样可以使用考虑磁盘IO繁忙程度的策略。在后台线程定期对vlog_manager中的vlog文件进行扫描检查的同时,为了避免在IO过度繁忙时进行垃圾回收影响前台写入和读取的速度,我们可以考虑只在磁盘较为空闲时进行垃圾回收。但由于垃圾回收频率会影响存储空间的利用率,如果推迟进行回收,就会导致无效的空间得不到利用,这实质上仍是一个时间和空间权衡的问题。
由于存在对vlog文件的创建、修改和删除,为了确保在并发环境下的线程安全,防止多个线程同时访问共享资源(例如 vlogfile_
、vlogfile_offset_
等),从而引发数据竞争或不一致性问题,在部分操作中加入锁。
文件切换的原子性:
if (vlogfile_offset_ >= options_.vlog_file_size) {
std::unique_lock<std::mutex> lock(vlog_mutex_);
WritableFile* newfile;
SequentialFile* readfile;
...
vlogfile_offset_ = 0;
}
如果没有加锁,多个线程可能同时检测到文件满的条件,导致:
vlogfile_
,导致后续操作的混乱。写入操作的原子性
{
std::unique_lock<std::mutex> lock(vlog_mutex_);
Status s = vlog_->AddRecord(Slice(tmp_vrec), write_size);
if (!s.ok()) return s;
vlogfile_offset_ += write_size;
}
vlog_->AddRecord
是一个写入操作,需要对当前 vlogfile_
文件追加记录。如果多个线程同时调用 AddRecord
,会导致写入的内容交叉,产生数据损坏或覆盖。
vlogfile_offset_
是一个共享变量,记录当前 VLog 文件的偏移。在没有加锁的情况下,多个线程可能同时更新 vlogfile_offset_
,导致偏移值不正确,最终可能会:
插入含有字段的key value对,然后直接使用原本的Get函数获取该key对应的value,验证是否正确执行插入操作。
TEST(TestSchema, Insert) {
std::string key = "k_1";
FieldArray fields = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
// 序列化并插入
std::string value = SerializeValue(fields);
db->Put(WriteOptions(), key, value);
// 读取并反序列化
std::string value_ret;
db->Get(ReadOptions(), key, &value_ret);
auto fields_ret = ParseValue(value_ret);
}
插入多个含有字段的key value对,然后通过value中的某一字段查询匹配的key值,验证反向查询功能。
TEST(TestSchema, Find) {
std::vector<std::string> keys = {"s_1", "s_2", "s_3", "s_4"};
// 构造一组字段数组
std::vector<FieldArray> FieldArrays = {
{
{"name", "Sarah"},{"sex", "f"},{"age", "20"}
},
{
{"name", "Mike"},{"sex", "m"},{"age", "19"},{"hobby", "badminton"}
},
{
{ name", "Amy"},{"sex", "f"},{"age", "21"},{"talent", "sing"}
},
{
{ name", "John"}, {"sex", "m"},{"age", "20"}
}
};
// 序列化并插入
for (int i=0; i<FieldArrays.length(); i++)
{
key = keys[i];
fields = FieldArrays[i];
std::string value = SerializeValue(fields);
db->Put(WriteOptions(), key, value);
}
// 构建目标字段
Field field = {"sex", "f"};
std::vector<std::string> key_ret;
// 查询得到对应的key
key_ret = FindKeysByField(leveldb::DB* db, Field &field)
}
顺利通过测试,测试结果如下:
先配置leveldb
的Options
,设置create_if_missing
为true
且kvSepType
为kVSepBeforeMem
来开启kv分离的功能并打开数据库,之后将DB
指针转换为DBImpl
指针,利用该指针执行Put
操作插入键值对,接着再通过此指针执行Get
操作读取刚插入的键值对,随后执行Delete
操作删除键值对并验证在KV
分离设置下删除后无法读取,最后清理数据库资源防止资源泄漏,以此测试leveldb
在KV
分离设置下的写和读功能。
#include <cassert>
#include <iostream>
#include "leveldb/db.h"
#include "db/db_impl.h"
int main() {
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
options.kvSepType = leveldb::kVSepBeforeMem;
leveldb::Status status = leveldb::DB::Open(options, "/tmp/testdb", &db);
if (!status.ok()) {
std::cerr << "Failed to open DB: " << status.ToString() << '\n';
return 1;
}
// 使用 dynamic_cast 将基类指针转换为 DBImpl
auto* dbimpl = static_cast<leveldb::DBImpl*>(db);
if (dbimpl == nullptr) {
std::cerr << "Failed to cast to DBImpl\n";
delete db;
return 1;
}
status = dbimpl->Put(leveldb::WriteOptions(), "key1", "val1");
if (status.ok()) {
std::string val;
status = dbimpl->Get(leveldb::ReadOptions(), "key1", &val);
std::cout << "Find value of 'key1' From db: " << val << "\n";
}
if (status.ok()) {
std::string val;
dbimpl->Delete(leveldb::WriteOptions(), "key1");
status = dbimpl->Get(leveldb::ReadOptions(), "key1", &val);
// Not found.
std::cout << status.ToString() << '\n';
}
delete db;
return 0;
}
顺利通过测试,测试结果如下:
在之前已经完成点查询测试的基础上,之所以还要进行范围查询测试,是因为点查询只能验证单个键值对的读写操作,而实际应用场景中,经常需要获取某个范围内的所有数据。例如在数据库用于存储用户信息时,按用户名的字母区间范围查询一批用户,或在时间序列数据中查询某个时间段内的数据。
代码中,TestRangeQuery
测试用例先插入一组键值对,接着定义范围查询的起始和结束键,通过NewIterator
获取迭代器,利用Seek
方法定位到范围起始位置,在迭代器有效且键值小于等于范围结束键时,将遍历到的键值对存入actual_data
。最后验证actual_data
与预期结果expected_data
是否一致,并输出范围查询结果,以此完成对范围查询功能的测试。
#include <iostream>
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "gtest/gtest.h"
class RangeQueryTest : public ::testing::Test {
protected:
leveldb::DB* db;
leveldb::Options options;
std::string db_path = "/tmp/range_testdb";
void SetUp() override {
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, db_path, &db);
ASSERT_TRUE(status.ok()) << "Failed to open DB: " << status.ToString();
}
void TearDown() override {
delete db;
}
};
TEST_F(RangeQueryTest, TestRangeQuery) {
// 插入一组键值对
std::vector<std::pair<std::string, std::string>> data = {
{"a_key1", "value1"},
{"a_key2", "value2"},
{"b_key3", "value3"},
{"b_key4", "value4"},
{"c_key5", "value5"}
};
for (const auto& pair : data) {
const std::string& key = pair.first;
const std::string& value = pair.second;
leveldb::Status status = db->Put(leveldb::WriteOptions(), key, value);
ASSERT_TRUE(status.ok()) << "Failed to put data: " << status.ToString();
}
// 查询范围内的键值对
std::string range_start = "a_key1";
std::string range_end = "b_key4";
std::vector<std::pair<std::string, std::string>> expected_data = {
{"a_key1", "value1"},
{"a_key2", "value2"},
{"b_key3", "value3"},
{"b_key4", "value4"}
};
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
it->Seek(range_start); // 从范围起始位置开始
std::vector<std::pair<std::string, std::string>> actual_data;
while (it->Valid() && it->key().ToString() <= range_end) {
actual_data.emplace_back(it->key().ToString(), it->value().ToString());
it->Next();
}
delete it;
// 验证范围查询结果是否符合预期
ASSERT_EQ(actual_data.size(), expected_data.size()) << "Range query results size mismatch.";
for (size_t i = 0; i < actual_data.size(); ++i) {
EXPECT_EQ(actual_data[i].first, expected_data[i].first) << "Key mismatch at index " << i;
EXPECT_EQ(actual_data[i].second, expected_data[i].second) << "Value mismatch at index " << i;
}
// 输出范围查询结果
for (const auto& pair : actual_data) {
const std::string& key = pair.first;
const std::string& value = pair.second;
std::cout << "Key: " << key << ", Value: " << value << std::endl;
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
顺利通过测试,测试结果如下:
在LevelDB中,Major Compaction还可以按照触发条件分为三类:
void CompactRange(const Slice* begin, const Slice* end)
手动触发。垃圾回收的实现虽然是在DoCompactionWork
里实现,但是这是 LevelDB 的内部实现,它并不会直接暴露为一个可调用的接口。它通常是通过触发 CompactRange
或后台自动压缩机制间接调用的。
添加一些验证步骤来检查垃圾回收的效果,比如在 CompactRange
之前插入数据并记录状态,调用 CompactRange
后验证数据是否被正确回收(标记为无效或从存储中删除)。
#include <iostream>
#include "leveldb/db.h"
#include "leveldb/options.h"
#include "gtest/gtest.h"
class LevelDBTest : public ::testing::Test {
protected:
leveldb::DB* db;
leveldb::Options options;
std::string db_path = "/tmp/testdb";
void SetUp() override {
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, db_path, &db);
ASSERT_TRUE(status.ok()) << "Failed to open DB: " << status.ToString();
}
void TearDown() override {
delete db;
}
};
TEST_F(LevelDBTest, CompactionTest) {
// 插入数据
db->Put(leveldb::WriteOptions(), "start", "value1");
db->Put(leveldb::WriteOptions(), "end", "value2");
db->Put(leveldb::WriteOptions(), "key_to_delete", "value3");
// 删除一个键,模拟删除标记
db->Delete(leveldb::WriteOptions(), "key_to_delete");
// 触发压缩
leveldb::Slice begin_key("start");
leveldb::Slice end_key("end");
db->CompactRange(&begin_key, &end_key);
// 验证压缩后的数据
std::string value;
leveldb::Status status = db->Get(leveldb::ReadOptions(), "key_to_delete", &value);
if (!status.ok()) {
std::cout << "'key_to_delete' was successfully removed during compaction." << std::endl;
} else {
FAIL() << "Unexpected: 'key_to_delete' still exists: " << value;
}
}
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
顺利通过测试,测试结果如下:
使用leveldb自带的性能测试程序在不同键值对大小下的性能:
图14 性能指标释义 图15 VALUE_SIZE=100,leveldb_KVsep_GC性能测试结果 图16 VALUE_SIZE=1000,leveldb_KVsep_GC性能测试结果 图17 VALUE_SIZE=3072,leveldb_KVsep_GC(左)与leveldb(右)性能测试结果 图18 VALUE_SIZE=3072,leveldb_KVsep性能测试结果 图19 VALUE_SIZE=3072,leveldb_KVsep*性能测试结果 (考虑时间片优化)在大value情形下,加入KV分离后,数据库在顺序写入、同步写入、随机写入和重写上性能均有显著提升,在随机读上延迟性能降低,在顺序读和反方向读上,延迟性能提高,吞吐量性能降低。与我们的预期有所不同的是,由于我们的KV分离时机在写入memtable之前,vlog中数据的顺序是按照写入顺序的,因此我们认为顺序读的性能应当相较于原本的leveldb有所下降。
在标准的 LevelDB 中,key 和 value 通常存储在同一 LSM 树中,范围查询可以通过顺序扫描 SST 文件高效完成,数据的访问模式大部分是顺序 I/O。
在 KV 分离中,key 和 value 被分开存储:Key 存在 LSM 树中,范围查询可以快速获取目标 key 范围。Value 存在分离的 VLog 文件中,而这些文件是基于写入顺序存储的,范围查询需要对每个 key 执行一次随机读取以获取对应的 value。
随机 I/O 的开销是导致吞吐量下降的主要原因,尤其是对于大范围查询,读取 value 的 I/O 次数显著增加。
GC 对写性能的影响
主要影响:写入吞吐量下降、写入延迟增加。
原因:资源争用增加了额外的写入操作。
GC 对读性能的影响
主要影响:随机读取延迟增加、范围读取吞吐量下降。
原因:磁盘资源争用、有效条目迁移、缓存失效。
磁盘资源争用:GC 会进行大量的磁盘扫描和数据重写,与范围查询的 I/O 请求产生竞争。
VLog 位置变化:如果范围查询读取的数据正好涉及 GC 清理或重写的 VLog 文件,可能引发读取失败或重试操作,进一步增加延迟。
同步写的性能大幅提升的原因:
新文件创建是一个高开销操作:
延迟创建新文件意味着同步写操作可以直接写入现有文件,避免这些开销,进而提升性能。
顺序写性能变差的原因:当旧 VLog 文件接近满容量时,延迟创建新文件会导致顺序写操作被阻塞,等待当前文件完成写入。
读取性能的下降的原因
热点文件的竞争:延迟文件创建可能导致更多数据集中在少量 VLog 文件中,增加这些文件的访问竞争。随机读取需要频繁定位条目,而热点文件的频繁访问会导致磁盘队列拥塞,从而增加随机读取的延迟。
无效条目比例增加:延迟新文件创建会让当前 VLog 文件积累更多的无效条目,这些无效数据可能干扰随机读取路径,导致读取性能下降
延迟新 VLog 文件的创建是一个写优化策略,它通过避免磁盘繁忙时的额外元数据开销来提升写入性能(尤其是同步写)。然而,这种策略会带来以下权衡:
同步写性能显著提升:
其他读写性能下降的原因:
文件布局的复杂性增加:文件容量接近上限会导致碎片化分布。
GC 干扰:较大的文件会增加 GC 操作的复杂性。
读路径的热点问题:热点文件的访问竞争拖慢了随机和顺序读取。
VLog 文件预分配:在系统初始化或创建新 VLog 文件时,预先分配一定大小的连续磁盘空间。这样可以减少文件写入过程中的磁盘碎片,提高 I/O 性能。特别是在写入大量数据时,预分配空间可以避免因频繁的空间分配和扩展操作导致的性能开销。
目前实现的是清理全部Value都无效的vlog文件,但在存储空间紧张或频繁写入更新时,文件中的无效值累计快,当无效值达到一定比例时清理 VLog 文件,相比于只清理全部无效的文件,可以更早地释放存储空间,避免系统运行时碰膨胀。但这种方法也有一定的弊端,除了要考虑到有效value的地址改变导致的LSM-树性质受到影响的问题,这种方法还在清理时会增加磁盘IO,同时当清理任务与前台写入并发时,可能导致锁争用或性能下降的问题。
细粒度锁:目前在部分操作中加入了锁来确保线程安全,但可以进一步细化锁的粒度。例如,对于不同的 VLog 文件或不同的内存区域,使用不同的锁进行保护。这样可以减少锁争用的范围,提高并发性能。在高并发场景下,细粒度锁可以允许更多的线程同时访问不同的资源,从而提升整体的系统吞吐量。
功能 | 完成日期 | 分工 |
---|---|---|
讨论并敲定KV分离的实现方式,产出初版设计文档 | 11.25 | 曹可心 & 朴祉燕 |
初步实现Value的字段功能 | 11.26 | 曹可心 |
对字段的正确性进行测试 | 11.28 | 曹可心 |
实现ValueLog的组织结构 | 11.28-11.31 | 朴祉燕 |
实现ValueLog的写功能 | 11.31-12.07 | 朴祉燕 |
实现ValueLog的读功能 | 12.07-12.16 | 曹可心 |
对ValueLog读写正确性进行测试 | 12.16-12.20 | 朴祉燕 |
实现GC功能 | 12.16-12.26 | 曹可心 |
对垃圾回收策略进行测试 | 12.26-12.28 | 朴祉燕 |
对kv分离的性能进行测试 | 12.26-12.28 | 曹可心 & 朴祉燕 |
根据性能测试结果进行优化并撰写报告 | 12.28-1.04 | 曹可心 & 朴祉燕 |