本项目的背景是提升 LevelDB 在高写入负载场景下的性能。LevelDB 是一种轻量级的键值存储引擎,但在数据频繁更新或大值(Large Values)存储场景下,由于数据写入和合并(Compaction)过程的设计,其性能可能受到显著影响。为解决这一问题,项目目标是实现 KV(Key-Value)分离机制,以降低写放大现象并提高存储效率。
具体实现内容包括在 LevelDB 内部引入 KV 分离功能,即将键(Key)与值(Value)存储到不同的存储介质中。通过修改 SSTable 的结构设计,将键与指向值的指针存储在原有的文件中,而将实际值存储到单独的文件或存储介质中,从而减少 Compaction 操作对大值的处理负担。此外,项目还优化了数据访问逻辑,实现了值文件的高效读写支持。
该功能的应用场景主要包括:
本项目涵盖下面三个方面:
具体指:基于 levelDB扩展 value 的结构,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。
using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num)
功能: 将传入的字段数组和 slot_num 序列化为字符串,并存到 value
字符串形式:
single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
输入: 字段数组, slot_num, &value
具体实现如下:
void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num) {
std::string tmp_value;
uint16_t value_size = sizeof(uint16_t);
uint16_t field_nums = 0;
for (const auto& field : fields) {
const uint8_t attr_name_len = field.name.size();
const uint16_t attr_value_len = field.value.size();
const size_t attr_size = attr_name_len + attr_value_len + sizeof(uint8_t) + sizeof(uint16_t);
char attr_data[attr_size];
size_t off = 0;
memcpy(attr_data+off, &attr_name_len, sizeof(uint8_t));
off += sizeof(uint8_t);
memcpy(attr_data+off, field.name.c_str(), attr_name_len);
off += attr_name_len;
memcpy(attr_data+off, &attr_value_len, sizeof(uint16_t));
off += sizeof(uint16_t);
memcpy(attr_data+off, field.value.c_str(), attr_value_len);
off += attr_value_len;
assert(off == attr_size);
tmp_value += std::string(attr_data, attr_size);
value_size += attr_size;
field_nums ++;
}
char value_data[value_size];
memcpy(value_data, &value_size, sizeof(uint16_t));
memcpy(value_data+sizeof(uint16_t), tmp_value.c_str(), tmp_value.size());
assert(sizeof(uint16_t) + tmp_value.size() == value_size);
value = std::string(value_data, value_size);
}
void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str)
功能: 将传入的待解码字符串反序列化为字段数组并存到 fields
字符串形式:
single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
输入: 存放解码结果的字段数组,待解码的字符串
具体实现如下:
void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str) {
const char *value_data = value_str.c_str();
const size_t value_len = value_str.size();
size_t attr_off = sizeof(uint16_t);
while (attr_off < value_len) {
uint8_t attr_name_len = *(uint8_t *)(value_data+attr_off);
attr_off += sizeof(uint8_t);
auto attr_name = std::string(value_data+attr_off, attr_name_len);
attr_off += attr_name_len;
uint16_t attr_len = *(uint16_t *)(value_data+attr_off);
attr_off += sizeof(uint16_t);
auto attr_value = std::string(value_data+attr_off, attr_len);
attr_off += attr_len;
fields.push_back({attr_name, attr_value});
}
assert(attr_off == value_len);
}
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field)
功能: 根据传入的字段值 field 查找所有包含该字段的 key,由于一个字段值可能对应多个key,所以返回std::vector<std::string>
具体实现如下:
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) {
std::vector<std::string> keys;
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
// 遍历数据库中所有的键值对
for (it->SeekToFirst(); it->Valid() ; it->Next()) {
std::string key = it->key().ToString();
FieldArray fields;
// 调用 Get_Fields 函数,获取 key 对应的字段数组
db->Get_Fields(leveldb::ReadOptions(), key, fields);
// 遍历字段数组,如果字段数组中包含该字段,则将该 key 添加到 keys 中
for (const auto& f : fields) {
if (f.name == field.name && f.value == field.value) {
keys.push_back(key);
break; // 假设每个key中每个字段值唯一
}
}
}
delete it;
return keys;
}
在LevelDB中实现KV分离,即将键值对中的键和值存储在不同的存储区域,以优化写性能和点查询性能。
数据结构设计:
memtable中:| key | slot_num |
slot_page中: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... |
value_log 中:|value 长度 | slot_num | attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |
2. 读取操作
读写操作:
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields)
功能:
将传入的字段数组插入数据库中
步骤:
代码实现:
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields) {
std::string serialized_value;
// 分配一个 slot 下标
size_t slot_num = slot_page_->alloc_slot();
// 将 fields 序列化为字符串
SerializeValue(fields, serialized_value, slot_num);
struct slot_content sc;
// 将序列化后的字符串插入 value_log 中
vlog_set_->put_value(sc, slot_num, serialized_value);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
memcpy(data, &slot_num, sizeof(size_t));
Slice slot_val(data, sizeof(data));
// 将 slot_num 作为 value 插入 memtable 中
return DB::Put(opt, key, slot_val);
}
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,FieldArray& fields)
功能:
从数据库中读取 key 对应的字段数组
步骤:
读取流程
代码实现:
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray& fields) {
size_t slot_num;
// 从 memtable 中读取 key 对应的 slot_num
auto s = get_slot_num(options, key, &slot_num);
if (!s.ok()) {
return s;
}
struct slot_content sc;
std::string vlog_value;
// 根据 slot_num 获取 slot_page_ 中的信息
slot_page_->get_slot(slot_num, &sc);
// 根据 slot_page_ 中的信息,从 value_log 中读取字符串并存放到 vlog_value
vlog_set_->get_value(sc, &vlog_value);
if (vlog_value.empty()) {
return Status::NotFound("value has been deleted");
}
// 调用 DeserializeValue 函数,将 vlog_value 解码为字段数组 fields
DeserializeValue(fields, vlog_value);
return Status::OK();
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key)
功能: 删除 key 对应的条目
步骤:
代码实现:
// 删除操作,删除 key 对应的条目
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
size_t slot_num;
// get_slot_num 函数的作用: 获取 key 对应的 slot_num
auto s = get_slot_num(ReadOptions(), key, &slot_num);
if (!s.ok()) {
return s;
}
struct slot_content sc;
// get_slot 函数作用:
slot_page_->get_slot(slot_num, &sc);
// del_value 函数作用:
vlog_set_->del_value(sc);
slot_page_->dealloc_slot(slot_num);
return DB::Delete(options, key);
}
GC 和 slot_page 管理, vlog管理: 4. 确保操作的原子性
锁机制:
设计思路:
/db/db_impl.cc
: 修改函数 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete,添加函数 Put_fields, Get_fields, get_slot_num,SerializeValue, DeserializeValue/db/db_impl.h
: 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_ ,添加增加的相关函数的声明/db/shared_lock.h
定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。/db/slotpage.h
/db/threadpool.h
/db/vlog.h
/db/vlog_gc.cpp
/db/vlog_gc.h
/db/vlog_set.cpp
/db/vlog_set.h
/test/db_test3.cc
:测试 value 的字段功能/test/db_test4.cc
/test/db_test5.cc
CMakeLists.txt
:添加可执行文件对于每一次读取,用户线程先读取lsm tree中key的slot_num下标,然后到slot_page中读取对应的slot内容(每一个slot都是定长的),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。
但是这又带来了一个问题,我们该如何管理slot_page这个文件?当插入新的kv时,我们需要在这个slot_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前slot_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。
测试流程:
测试代码:
TEST(TestSchema, Basic) {
DB* db;
WriteOptions writeOptions;
ReadOptions readOptions;
if (!OpenDB("testdb_function", &db).ok()) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key0 = "k_0";
std::string key1 = "k_1";
std::string key2 = "k_2";
std::string key3 = "k_3";
FieldArray fields0 = {{"name", "myc&wxf"}};
FieldArray fields1 = {
{"name", "Customer1"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
FieldArray fields2 = {
{"name", "Customer1"},
{"address", "ecnu"},
{"phone", "123456789"}
};
FieldArray fields3 = {
{"name", "Customer2"},
{"address", "ecnu"},
{"phone", "11111"}
};
db->Put_Fields(leveldb::WriteOptions(), key0, fields0);
db->Put_Fields(leveldb::WriteOptions(), key1, fields1);
db->Put_Fields(leveldb::WriteOptions(), key2, fields2);
db->Put_Fields(leveldb::WriteOptions(), key3, fields3);
FieldArray fields_ret_0;
FieldArray fields_ret_1;
FieldArray fields_ret_2;
FieldArray fields_ret_3;
db->Get_Fields(leveldb::ReadOptions(), key0, fields_ret_0);
db->Get_Fields(leveldb::ReadOptions(), key1, fields_ret_1);
db->Get_Fields(leveldb::ReadOptions(), key2, fields_ret_2);
db->Get_Fields(leveldb::ReadOptions(), key3, fields_ret_3);
// 检查反序列化结果
ASSERT_EQ(fields_ret_0.size(), fields0.size());
for (size_t i = 0; i < fields_ret_0.size(); ++i) {
ASSERT_EQ(fields_ret_0[i].name, fields0[i].name);
ASSERT_EQ(fields_ret_0[i].value, fields0[i].value);
}
ASSERT_EQ(fields_ret_1.size(), fields1.size());
for (size_t i = 0; i < fields_ret_1.size(); ++i) {
ASSERT_EQ(fields_ret_1[i].name, fields1[i].name);
// ASSERT_EQ(fields_ret_1[i].value, fields1[i].value);
}
// 测试查找功能
Field query_field = {"name", "Customer1"};
std::vector<std::string> found_keys = FindKeysByField(db, query_field);
// 删除查找到的第一个 key
const std::string& key = found_keys[0];
db->Delete(leveldb::WriteOptions(), key);
// 再次查找
std::vector<std::string> found_deleted_keys = FindKeysByField(db, query_field);
// 关闭数据库
delete db;
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
测试结果:
单元测试:
吞吐率下降很多 写放大下降很多
列出实现过程中可能遇到的技术难题及其解决思路,如如何处理GC开销、数据同步、索引原子更新等问题。 各种参数的设置,比如vlog的容量上限,以及slot_page的bitmap管理方式是否足够高效?以及在GC过程中如果对被GC中的vlog进行写入该让用户线程和后台线程以什么样的方式进行同步?slot_page的读写放大也是一个重要的问题。
功能 | 完成日期 | 分工 |
---|---|---|
Field相关接口实现 | 12.8 | 王雪飞 |
value_log中value的存储格式 | 12.8 | 王雪飞 |
slot_page 相关接口 | 12.8 | 马也驰 |
slot_page实现 | 12.8 | 马也驰 |
修改leveldb的接口实现字段功能 | 12.17 | 王雪飞 |
vlog的GC实现 | 12.29 | 马也驰 |
性能测试 | 1.5 | 王雪飞, 马也驰 |
功能测试 | 1.5 | 王雪飞, 马也驰 |