本项目的背景是提升 LevelDB 在高写入负载场景下的性能。LevelDB 是一种轻量级的键值存储引擎,但在数据频繁更新或大值(Large Values)存储场景下,由于数据写入和合并(Compaction)过程的设计,其性能可能受到显著影响。为解决这一问题,项目目标是实现 KV(Key-Value)分离机制,以降低写放大现象并提高存储效率。
具体实现内容包括在 LevelDB 内部引入 KV 分离功能,即将键(Key)与值(Value)存储到不同的存储介质中。通过修改 SSTable 的结构设计,将键与指向值的指针存储在原有的文件中,而将实际值存储到单独的文件或存储介质中,从而减少 Compaction 操作对大值的处理负担。此外,项目还优化了数据访问逻辑,实现了值文件的高效读写支持。
该功能的应用场景主要包括:
本项目涵盖下面三个方面:
具体指:基于 levelDB扩展 value 的结构,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。
using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
void DBImpl::SerializeValue(const FieldArray& fields, std::string &value)
功能: 将传入的字段数组序列化为字符串,并存到 value
字符串形式:
single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
输入: 字段数组, slot_num, &value
具体实现如下:
void DBImpl::SerializeValue(const FieldArray& fields, std::string &value) {
// 先构建 slot_num 之后的字符串,存到 tmp_value 中
std::string tmp_value;
// slot_num 之后的总长度
uint16_t value_size = sizeof(uint16_t);
// 字段数目
uint16_t field_nums = 0;
// 遍历所有字段
for (const auto& field : fields) {
// 字段名的长度
const uint8_t attr_name_len = field.name.size();
// 字段属性的长度
const uint16_t attr_value_len = field.value.size();
const size_t attr_size = attr_name_len + attr_value_len + sizeof(uint8_t) + sizeof(uint16_t);
// 用 attr_data 存放:字段名长度 + 字段名 + 字段属性长度 + 字段属性
char attr_data[attr_size];
size_t off = 0;
memcpy(attr_data+off, &attr_name_len, sizeof(uint8_t));
off += sizeof(uint8_t);
memcpy(attr_data+off, field.name.c_str(), attr_name_len);
off += attr_name_len;
memcpy(attr_data+off, &attr_value_len, sizeof(uint16_t));
off += sizeof(uint16_t);
memcpy(attr_data+off, field.value.c_str(), attr_value_len);
off += attr_value_len;
assert(off == attr_size);
// 将 attr_data 添加到 tmp_value 中
tmp_value += std::string(attr_data, attr_size);
// 更新总长度和字段数目
value_size += attr_size;
field_nums ++;
}
// value_data 存放完整的字符串
char value_data[value_size];
// 将 value_size 添加到 value_data 中
memcpy(value_data, &value_size, sizeof(uint16_t));
// 将 tmp_value 添加到 value_data 中
memcpy(value_data+sizeof(uint16_t), tmp_value.c_str(), tmp_value.size());
assert(sizeof(uint16_t) + tmp_value.size() == value_size);
value = std::string(value_data, value_size);
}
void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str)
功能: 将传入的待解码字符串反序列化为字段数组并存到 fields
字符串形式:
single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
输入: 存放解码结果的字段数组,待解码的字符串
具体实现如下:
void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str) {
const char *value_data = value_str.c_str();
// value_len 为 value 的长度
const size_t value_len = value_str.size();
// 最前面为 value_size,大小为 uint16_t
size_t attr_off = sizeof(uint16_t);
// 当偏移小于 value 的长度时,继续解码
while (attr_off < value_len) {
// 下面的步骤为:读取属性后加偏移
// 读属性名长度,加偏移
uint8_t attr_name_len = *(uint8_t *)(value_data+attr_off);
attr_off += sizeof(uint8_t);
// 读属性名,加偏移
auto attr_name = std::string(value_data+attr_off, attr_name_len);
attr_off += attr_name_len;
// 读属性长度,加偏移
uint16_t attr_len = *(uint16_t *)(value_data+attr_off);
attr_off += sizeof(uint16_t);
// 读属性值,加偏移
auto attr_value = std::string(value_data+attr_off, attr_len);
attr_off += attr_len;
// 将 属性名 和 属性 添加到 fields 中
fields.push_back({attr_name, attr_value});
}
assert(attr_off == value_len);
}
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field)
功能: 根据传入的字段值 field 查找所有包含该字段的 key,由于一个字段值可能对应多个key,所以返回std::vector<std::string>
具体实现如下:
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) {
std::vector<std::string> keys;
// 遍历数据库中所有的 KV 对
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid() ; it->Next()) {
std::string key = it->key().ToString();
FieldArray fields;
// 调用 Get_Fields 函数,获取 key 对应的字段数组
db->Get_Fields(leveldb::ReadOptions(), key, fields);
// 遍历字段数组,如果字段数组中包含该字段,则将该 key 添加到 keys 中
for (const auto& f : fields) {
if (f.name == field.name && f.value == field.value) {
keys.push_back(key);
break; // 假设每个key中每个字段值唯一
}
}
}
delete it;
return keys;
}
在LevelDB中实现KV分离,即将键值对中的键和值存储在不同的存储区域,以优化写性能和点查询性能。
/db/db_impl.cc
: 修改函数 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete,添加函数 Put_fields, Get_fields, get_slot_num,SerializeValue, DeserializeValue/db/db_impl.h
: 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_ ,添加增加的相关函数的声明/db/shared_lock.h
: 定义了一个 SharedLock 类,用于实现读写锁机制。该类支持两种锁模式:软锁(soft_lock/soft_unlock)和硬锁(hard_lock/hard_unlock);/db/slotpage.h
:struct slot_content {
uint32_t vlog_num;
uint32_t value_offset;
slot_content() {}
slot_content(uint32_t vn, uint32_t vo) {
vlog_num = vn;
value_offset = vo;
}
};
struct vlog_info {
std::mutex vlog_info_latch_; // 保护对vlog_info本身的并发修改
size_t vlog_num;
size_t vlog_num_for_gc; // set when start gc
bool processing_gc;; // init to be false, set as true when processing gc
bool vlog_valid_; // init to be true, set as false after deleted
size_t discard;
size_t value_nums;
size_t curr_size;
vlog_info(size_t vlog_num) : processing_gc(false), discard(0), value_nums(0),
vlog_num(vlog_num), curr_size(2*sizeof(size_t)), vlog_valid_(true) {}
vlog_info(size_t vlog_num, size_t value_nums, size_t curr_size) : processing_gc(false),
discard(0), value_nums(value_nums),
vlog_num(vlog_num), curr_size(curr_size),
vlog_valid_(true) {}
};
struct vlog_handler {
std::mutex vlog_handler_latch_;
size_t curr_access_thread_nums;
SharedLock vlog_latch_; // 表明当前vlog上的并发情况,读上soft_lock,写上hard_lock
vlog_handler() : curr_access_thread_nums(0) {}
inline void incre_access_thread_nums() {
vlog_handler_latch_.lock();
curr_access_thread_nums ++;
vlog_handler_latch_.unlock();
}
inline void decre_access_thread_nums() {
vlog_handler_latch_.lock();
curr_access_thread_nums --;
vlog_handler_latch_.unlock();
}
inline bool non_access_thread() {
bool flag = false;
vlog_handler_latch_.lock();
if (!curr_access_thread_nums) {
flag = true;
}
vlog_handler_latch_.unlock();
return flag;
}
};
定义 VlogGC 类,声明类中函数:do_gc, exec_gc, gc_counter_increment, gc_counter_decrement, get_gc_num, vlog_in_gc, add_vlog_in_gc, del_vlog_in_gc
struct executor_param {
VlogGC *vg;
size_t old_vlog_num;
size_t new_vlog_num;
};
/db/vlog_set.h
:
定义 VlogSet 结构体/db/vlog_cache.h
:/db/vlog_cache.cpp
:实现 VlogCache 类中的函数/db/vlog_set.cpp
:
定义函数: get_value, get_writable_vlog_info, put_value, del_value, register_new_vlog, remove_old_vlog, vlog_need_gc, register_inconfig_file, remove_from_config_file, create_vlog, restore_vlog_inmaps, register_vlog_inmaps, remove_vlog_from_maps, read_vlog_value, write_vlog_value, mark_del_valueCMakeLists.txt
:添加可执行文件数据结构设计:
sstable 中:| key | slot_num |
slot_page 中: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... |
value_log 中:|value 长度 | slot_num | attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |
读操作:
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields)
功能:
将传入的字段数组插入数据库中
步骤:
代码实现:
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields) {
std::string serialized_value;
// alloc_slot 函数作用:分配一个 slot_num
size_t slot_num = slot_page_->alloc_slot();
// SerializeValue 函数作用:将字段数组和 slot_num_str 序列化为字符串 serialized_value
SerializeValue(fields, serialized_value, slot_num);
// 实例化 slot_content 结构体 sc
struct slot_content sc;
// put_value函数作用:将序列化后的字符串 serialized_value 插入 value_log 中
vlog_set_->put_value(sc, slot_num, serialized_value);
// set_slot函数作用: 将 slot_num 写入到 缓冲块中
slot_page_->set_slot(slot_num, &sc);
// 将 slot_num 作为 value 插入 memtable 中
char data[sizeof(size_t)];
memcpy(data, &slot_num, sizeof(size_t));
Slice slot_val(data, sizeof(data));
return DB::Put(opt, key, slot_val);
}
size_t alloc_slot()
功能: 分配一个 slot_num
实现步骤:
具体实现如下:
size_t alloc_slot() {
// 获取互斥锁
mtx.lock();
size_t target_slot = first_empty_slot;
char *start_byte = get_bitmap_byte(slot2byte(first_empty_slot));
const size_t off = slot2offset(first_empty_slot);
SETBIT(start_byte, off);
// find the next free slot
if (HASFREESLOT(*start_byte)) {
auto bit_off = find_first_free_slot_inbyte(*start_byte);
first_empty_slot += bit_off - off;
if (slot2byte(first_empty_slot) >= size) {
alloc_new_bitmap();
}
} else {
size_t i;
for (i = slot2byte(first_empty_slot)+1; i < size; i++) {
char *byte = get_bitmap_byte(i);
if (HASFREESLOT(*byte)) {
// FIXME: pack four bytes to do free slot finding
auto bit_off = find_first_free_slot_inbyte(*byte);
first_empty_slot = byte2slot(i) + bit_off;
break;
}
}
// scale the bitmap
if (i >= size) {
alloc_new_bitmap();
// char *byte = get_bitmap_byte(i);
// SETBIT(byte, 0);
first_empty_slot = byte2slot(i) + 1;
}
}
mtx.unlock();
return target_slot;
}
void set_slot(size_t slot_num, struct slot_content *sc)
功能: 将一个槽位的内容设置到缓存块中
实现步骤:
具体实现如下:
void set_slot(size_t slot_num, struct slot_content *sc) {
auto block_num = slotnum_hash2_blocknum(slot_num);
auto blockcache_num = block_num % BLOCK_NUM;
latches_[blockcache_num].lock();
if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) {
if (info[blockcache_num].is_dirty) {
write_back_block(blockcache_num);
}
read_in_block(blockcache_num, block_num);
access_time[blockcache_num] = 0;
info[blockcache_num] = block_info(block_num, false, true);
}
set_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num));
access_time[blockcache_num]++;
info[blockcache_num].is_dirty = true;
latches_[blockcache_num].unlock();
}
void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value)
功能: 将序列化后的字符串 serialized_value 插入 value_log 中,位置由 slot_content 确定
输入: 待插入的字符串 value,slot_num,slot_content
实现步骤:
具体实现如下:
void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
mtx.lock();
// 根据值的大小获取可写入的vlog信息
auto vinfo = get_writable_vlog_info(value.size());
if (!vinfo) {
// vlog全部已满,创建新的vlog
auto _vlog_num_ = register_new_vlog();
vinfo = get_vlog_info(_vlog_num_);
}
// 锁定 vlog 信息,更新 slot_content 内容
vinfo->vlog_info_latch_.lock();
sc.vlog_num = vinfo->vlog_num;
sc.value_offset = vinfo->curr_size;
// 更新 vlog 内容,包括当前大小 curr_size 和存储的 value 个数
vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t);
vinfo->value_nums ++;
// 根据 vlog 编号获取 vlog 处理器
auto vhandler = get_vlog_handler(vinfo->vlog_num);
// 如果 vlog 无效或者正在进行GC,则使用 vlog_num_for_gc
if (!vinfo->vlog_valid_ || vinfo->processing_gc) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
// 加锁
vhandler->vlog_latch_.hard_lock();
// 增加访问线程数
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
mtx.unlock(); // for better performance
// vlog 信息写入完毕,解锁
vinfo->vlog_info_latch_.unlock();
// write_vlog_value 函数功能:将字符串 serialized_value 写入 vlog 中
write_vlog_value(sc, slot_num, value);
// 写入完毕,减少访问线程数
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
// 解锁
vhandler->vlog_latch_.hard_unlock();
}
void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value)
功能: 将字符串 value 写入 vlog 中
输入: slot_content,slot_num,字符串 value
实现步骤:
具体实现如下:
void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
// 函数 get_vlog_name 作用:获取 slot_content 中 vlog_num 对应的 vlog 名称
auto vlog_name = get_vlog_name(sc.vlog_num);
// 打开文件:使用 fstream 打开文件,确保文件以读写模式打开
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
// 定位写入位置:通过 seekp 方法将文件指针移动到 slot_content 中的 value_offset 位置
handler.seekp(sc.value_offset);
// 准备数据:构造要写入的数据,包括值大小(uint16_t)、slot_num(size_t)和实际值内容(Slice)
const char *value_buff = value.data();
const size_t off = sizeof(uint16_t) + sizeof(size_t);
const size_t value_size = off + value.size();
char data[value_size];
memcpy(data, &value_size, sizeof(uint16_t));
memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t));
memcpy(data+off, value_buff, value.size());
handler.write(data, value_size);
// 刷新缓冲区:调用 flush 方法确保数据写入磁盘
handler.flush();
// 关闭文件
handler.close();
}
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,FieldArray& fields)
功能: 从数据库中读取 key 对应的字段数组并存放到 fields 中
实现步骤:
代码实现:
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray& fields) {
size_t slot_num;
// get_slot_num 函数作用:从 memtable 中读取 key 对应的 slot_num
auto s = get_slot_num(options, key, &slot_num);
if (!s.ok()) {
return s;
}
struct slot_content sc;
std::string vlog_value;
// get_slot 函数作用:根据 slot_num 从缓存中获取 slot_content,并存放到 sc 中
slot_page_->get_slot(slot_num, &sc);
// get_value 函数作用:根据 sc 中的信息,从 value_log 中读取字符串并存放到 vlog_value
vlog_set_->get_value(sc, &vlog_value);
if (vlog_value.empty()) {
return Status::NotFound("value has been deleted");
}
// 调用 DeserializeValue 函数,将 vlog_value 解码为字段数组 fields
DeserializeValue(fields, vlog_value);
return Status::OK();
}
void get_slot(size_t slot_num, struct slot_content *sc)
功能: 获取 slot_num 对应的 slot_content 并存放到 sc 中
实现步骤:
具体实现如下:
void get_slot(size_t slot_num, struct slot_content *sc) {
auto block_num = slotnum_hash2_blocknum(slot_num);
auto blockcache_num = block_num % BLOCK_NUM;
latches_[blockcache_num].lock();
if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { // cache miss
if (info[blockcache_num].is_dirty) {
write_back_block(blockcache_num);
}
read_in_block(blockcache_num, block_num);
access_time[blockcache_num] = 0;
info[blockcache_num] = block_info(block_num, false, true);
}
read_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num));
access_time[blockcache_num]++;
latches_[blockcache_num].unlock();
}
void VlogSet::get_value(const struct slot_content &sc, std::string *value)
功能: 根据 slot_content 从 vlog 中读取字符串并存放到 value 中
实现步骤:
void VlogSet::get_value(const struct slot_content &sc, std::string *value) {
// 获取互斥锁
mtx.lock();
// get_vlog_info 函数作用:根据 sc 中的 vlog_num 获取 vlog_info
auto vinfo = get_vlog_info(sc.vlog_num);
// get_vlog_handler 函数作用:根据 sc 中的 vlog_num 获取 vlog_handler
auto vhandler = get_vlog_handler(sc.vlog_num);
// 加 vlog 信息锁
vinfo->vlog_info_latch_.lock();
// 根据 vinfo 检查 vlog 是否有效
if (!vinfo->vlog_valid_) {
// 如果无效,则进行垃圾处理
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
// 加锁
vhandler->vlog_latch_.soft_lock();
// 增加访问线程数
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
// 释放互斥锁和 vlog 信息锁
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
// read_vlog_value 函数作用:根据 sc 中的 vlog_num 和 value_offset 从 vlog 中读取字符串
read_vlog_value(sc, value);
// 减少访问线程数
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
// 释放 vlog 锁
vhandler->vlog_latch_.soft_unlock();
}
void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value)
功能: 根据 sc 中的 vlog_num 和 value_offset 从 vlog 中读取字符串并存放到 value
实现步骤:
具体实现如下:
void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) {
// 根据 sc 中的 vlog_num 获取 vlog 文件名
auto vlog_name = get_vlog_name(sc.vlog_num);
// 打开 vlog 文件
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
// 使用 seekp 方法将文件指针定位到 value_offset 指定的位置
handler.seekp(sc.value_offset);
// 从文件中读取固定大小的数据到缓冲区 value_buff
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
// 从缓冲区中提取值的大小,并检查是否被标记为删除
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
// 如果值带有删除标记,则将结果字符串设置为空并返回
if (value_size & VALUE_DELE_MASK) {
*value = "";
return ;
}
// 计算实际值的大小并从缓冲区中提取值,存储到结果字符串中
value_size &= VALUE_SIZE_MASK;
assert(value_size <= VALUE_BUFF_SIZE);
const size_t off = sizeof(uint16_t)+sizeof(size_t);
// 减去 off 的长度,只读取出 value 中实际存放属性的部分
*value = std::string(&value_buff[off], value_size-off);
// 关闭文件
handler.close();
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key)
功能: 删除 key 对应的条目
步骤:
代码实现:
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
size_t slot_num;
// get_slot_num 函数的作用: 获取 key 对应的 slot_num
auto s = get_slot_num(ReadOptions(), key, &slot_num);
if (!s.ok()) {
return s;
}
struct slot_content sc;
// get_slot 函数作用: 获取 slot_num 对应的 slot_content
slot_page_->get_slot(slot_num, &sc);
// del_value 函数作用:删除 vlog 中 slot_content 对应的条目
vlog_set_->del_value(sc);
// dealloc_slot 函数作用: 释放 slot_num 对应的 slot
slot_page_->dealloc_slot(slot_num);
return DB::Delete(options, key);
}
void VlogSet::del_value(const struct slot_content &sc)
功能: 删除 vlog 中 slot_content 对应的条目
实现步骤:
具体实现如下:
void VlogSet::del_value(const struct slot_content &sc) {
mtx.lock();
auto vinfo = get_vlog_info(sc.vlog_num);
auto vhandler = get_vlog_handler(sc.vlog_num);
vinfo->vlog_info_latch_.lock();
if (!vinfo->vlog_valid_ || vinfo->processing_gc) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.hard_lock();
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
mark_del_value(sc);
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
vhandler->vlog_latch_.hard_unlock();
}
void VlogSet::mark_del_value(const struct slot_content &sc)
功能: 标记 slot_content 对应的条目为删除并判断是否需要调用 GC
实现步骤:
具体实现如下:
void VlogSet::mark_del_value(const struct slot_content &sc) {
// 根据 sc.vlog_num 获取 vlog 文件信息
auto vinfo = get_vlog_info(sc.vlog_num);
// 调用 delete_vlog_value 函数删除 vlog 中的条目,并返回被删除的项的大小
auto value_size = delete_vlog_value(sc);
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
// 更新统计信息,包括 增加丢弃计数 discard, 减少值的数量 value_nums, 减少当前大小 curr_size,减去被删除的日志项的大小 value_size
vinfo->discard ++;
vinfo->value_nums --;
vinfo->curr_size -= value_size;
// FIXME: gc process, avoid repeated gc
// 判断是否需要触发垃圾回收
if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) {
// create new vlog
vinfo->processing_gc = true;
vinfo->vlog_num_for_gc = register_new_vlog();
// 启动垃圾回收过程
vlog_gc->do_gc(sc.vlog_num, vinfo->vlog_num_for_gc);
}
}
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num)
功能: 启动垃圾回收过程
实现步骤:
具体实现如下:
void VlogGC::do_gc(size_t old_vlog_num, size_t new_vlog_num) {
// 判断old_vlog_num是否正在进行gc,如果是,直接返回
if (vlog_in_gc(old_vlog_num)) {
return ;
}
// 否则将当前old_vlog_num设置为正在gc
add_vlog_in_gc(old_vlog_num);
// 增加全局GC计数器,并获取当前的GC编号
gc_counter_increment();
size_t _gc_num_ = get_gc_num();
// 创建一个结构体 executor_param,包含当前对象指针、旧日志编号和新日志编号
struct executor_param ep = {this, old_vlog_num, new_vlog_num};
add_executor_params(_gc_num_, ep);
add_vlog_gc(_gc_num_, this);
// FIXME: 线程的信息必须被保存在函数栈之外,否则函数栈销毁之后,线程会报错exc_bad_access, 这里需要有一个gc_hanlder线程一直运行并处理各个gc请求
std::thread gc_thread([_gc_num_, this]() mutable {
auto _vlog_gc_ = get_vlog_gc(_gc_num_);
assert(_vlog_gc_ != nullptr);
_vlog_gc_->exec_gc(_gc_num_);
gc_counter_decrement();
});
gc_thread.detach();
}
void VlogGC::exec_gc(size_t gc_num_)
功能: 执行GC任务
实现步骤:
void VlogGC::exec_gc(size_t gc_num_) {
// FIXME: might break, due to unknown concurrency problem
// 线程数控制
curr_thread_nums_latch_.lock();
curr_thread_nums_ ++;
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.lock();
}
curr_thread_nums_latch_.unlock();
// start gc process
auto ep = get_executor_params(gc_num_);
gc_executor::exec_gc(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
// test_func(ep.vg, ep.old_vlog_num, ep.new_vlog_num);
curr_thread_nums_latch_.lock();
if (curr_thread_nums_ >= max_thread_nums_) {
full_latch_.unlock();
}
curr_thread_nums_ --;
curr_thread_nums_latch_.unlock();
// std::cout << "vlog_gc.cpp line 138" << std::endl;
del_executor_params(gc_num_);
// std::cout << "vlog_gc.cpp line 140" << std::endl;
del_vlog_gc(gc_num_);
// std::cout << "vlog_gc.cpp line 142" << std::endl;
del_vlog_in_gc(ep.old_vlog_num);
// FIXME: dead lock here (fixed, i think)
// FIXME: remove vlog physically
vlog_set->remove_old_vlog(ep.old_vlog_num);
}
void gc_executor::exec_gc(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num)
功能: 执行GC任务
具体实现如下:
void gc_executor::exec_gc(VlogGC *vlog_gc_, size_t old_vlog_num, size_t new_vlog_num) {
// 从 vlog_gc_ 对象中提取旧 vlog 和新 vlog 的相关信息,这些信息用于后续的操作
auto vlog_set = vlog_gc_->vlog_set;
auto slot_page_ = vlog_gc_->slot_page_;
// 锁定互斥锁
vlog_set->mtx.lock();
// 获取旧 vlog 和新 vlog 的信息
auto old_vlog_name = vlog_set->get_vlog_name(old_vlog_num);
auto new_vlog_name = vlog_set->get_vlog_name(new_vlog_num);
auto old_vlog_info = vlog_set->get_vlog_info(old_vlog_num);
auto new_vlog_info = vlog_set->get_vlog_info(new_vlog_num);
auto old_vlog_handler = vlog_set->get_vlog_handler(old_vlog_num);
auto new_vlog_handler = vlog_set->get_vlog_handler(new_vlog_num);
// 锁机制
old_vlog_info->vlog_info_latch_.lock();
old_vlog_handler->vlog_latch_.soft_lock();
new_vlog_info->vlog_info_latch_.lock();
new_vlog_handler->vlog_latch_.hard_lock();
vlog_set->mtx.unlock();
// 打开旧 vlog 和新 vlog 文件
auto old_vlog = std::fstream(old_vlog_name, std::ios::in | std::ios::out);
auto new_vlog = std::fstream(new_vlog_name, std::ios::in | std::ios::out);
// char old_vlog_buff[VLOG_SIZE];
// char new_vlog_buff[VLOG_SIZE];
// 动态分配内存用于存储旧 vlog 和新 vlog 的内容,可以在内存中高效地处理日志数据
char *old_vlog_buff = static_cast<char*>(malloc(VLOG_SIZE));
char *new_vlog_buff = static_cast<char*>(malloc(VLOG_SIZE));
// 读取旧日志文件内容
old_vlog.seekp(0);
old_vlog.read(old_vlog_buff, VLOG_SIZE);
// 初始化参数:初始化偏移量和新 vlog 中的条目计数,为遍历做准备
size_t value_nums = old_vlog_info->value_nums;
size_t ovb_off = 2 * sizeof(size_t);
size_t nvb_off = 2 * sizeof(size_t);
size_t new_vlog_value_nums = 0;
// 遍历旧 vlog 中的 value 并逐个处理
for (auto i = 0; i < value_nums; i++) {
char *value = &old_vlog_buff[ovb_off];
uint16_t value_len = get_value_len(value);
size_t slot_num = get_value_slotnum(value);
// 如果当前 value 未被设置为删除,则将其复制到新 vlog 的缓冲区中,并更新相关参数
if (!value_deleted(value_len)) {
memcpy(&new_vlog_buff[nvb_off], &old_vlog_buff[ovb_off], value_len);
memcpy(&new_vlog_buff[nvb_off+sizeof(uint16_t)], &(new_vlog_info->vlog_num), sizeof(size_t));
struct slot_content scn(new_vlog_info->vlog_num, nvb_off);
slot_page_->set_slot(slot_num, &scn);
nvb_off += value_len;
new_vlog_value_nums ++;
}
ovb_off += value_len;
}
// 更新新 vlog 信息, 包括条目数量和当前大小
new_vlog_info->value_nums = new_vlog_value_nums;
new_vlog_info->curr_size = nvb_off;
// 写入将新 vlog 缓冲区中的内容写入新 vlog 文件
memcpy(new_vlog_buff, &nvb_off, sizeof(size_t));
memcpy(&new_vlog_buff[sizeof(size_t)], &new_vlog_value_nums, sizeof(size_t));
new_vlog.seekp(0);
new_vlog.write(new_vlog_buff, VLOG_SIZE);
new_vlog.flush();
free(old_vlog_buff);
free(new_vlog_buff);
old_vlog.close();
new_vlog.close();
old_vlog_info->vlog_valid_ = false;
old_vlog_info->vlog_info_latch_.unlock();
old_vlog_handler->vlog_latch_.soft_unlock();
new_vlog_info->vlog_info_latch_.unlock();
new_vlog_handler->vlog_latch_.hard_unlock();
// vlog_gc_->gc_counter_decrement();
}
void SlotPage::dealloc_slot(size_t slot_num)
功能: 释放 slot_num 中对应的 slot
实现步骤:
具体实现如下:
void dealloc_slot(size_t slot_num) {
mtx.lock();
const size_t byte = slot2byte(slot_num);
const size_t off = slot2offset(slot_num);
char *target_byte = get_bitmap_byte(byte);
assert(*target_byte & POSMASK(off));
RESETBIT(target_byte, off);
// set_bitmap_byte(byte, target_byte);
first_empty_slot = first_empty_slot < slot_num ? first_empty_slot:slot_num;
mtx.unlock();
}
测试流程:
测试代码:
代码文件为 /test/db_test3.cc
TEST(TestSchema, Basic) {
DB* db;
WriteOptions writeOptions;
ReadOptions readOptions;
if (!OpenDB("testdb_function", &db).ok()) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key0 = "k_0";
std::string key1 = "k_1";
std::string key2 = "k_2";
std::string key3 = "k_3";
FieldArray fields0 = {{"name", "myc&wxf"}};
FieldArray fields1 = {
{"name", "Customer1"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
FieldArray fields2 = {
{"name", "Customer1"},
{"address", "ecnu"},
{"phone", "123456789"}
};
FieldArray fields3 = {
{"name", "Customer2"},
{"address", "ecnu"},
{"phone", "11111"}
};
db->Put_Fields(leveldb::WriteOptions(), key0, fields0);
db->Put_Fields(leveldb::WriteOptions(), key1, fields1);
db->Put_Fields(leveldb::WriteOptions(), key2, fields2);
db->Put_Fields(leveldb::WriteOptions(), key3, fields3);
FieldArray fields_ret_0;
FieldArray fields_ret_1;
FieldArray fields_ret_2;
FieldArray fields_ret_3;
db->Get_Fields(leveldb::ReadOptions(), key0, fields_ret_0);
db->Get_Fields(leveldb::ReadOptions(), key1, fields_ret_1);
db->Get_Fields(leveldb::ReadOptions(), key2, fields_ret_2);
db->Get_Fields(leveldb::ReadOptions(), key3, fields_ret_3);
// 检查反序列化结果
ASSERT_EQ(fields_ret_0.size(), fields0.size());
for (size_t i = 0; i < fields_ret_0.size(); ++i) {
ASSERT_EQ(fields_ret_0[i].name, fields0[i].name);
ASSERT_EQ(fields_ret_0[i].value, fields0[i].value);
}
ASSERT_EQ(fields_ret_1.size(), fields1.size());
for (size_t i = 0; i < fields_ret_1.size(); ++i) {
ASSERT_EQ(fields_ret_1[i].name, fields1[i].name);
// ASSERT_EQ(fields_ret_1[i].value, fields1[i].value);
}
// 测试查找功能
Field query_field = {"name", "Customer1"};
std::vector<std::string> found_keys = FindKeysByField(db, query_field);
// 删除查找到的第一个 key
const std::string& key = found_keys[0];
db->Delete(leveldb::WriteOptions(), key);
// 再次查找
std::vector<std::string> found_deleted_keys = FindKeysByField(db, query_field);
// 关闭数据库
delete db;
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
测试流程: 创建 10 个线程,5 个线程为写线程,5 个线程为读线程,每个写线程写入 100 条数据,每个读线程读取 100 条数据。
代码文件为 /test/db_test6.cc
leveldb结果:
KV 分离结果:
参数设置为:
constexpr int value_size = 2048;
constexpr int data_size = 512 << 20;`
CURRENT 内容为: MANIFEST-000920
写放大为:4232686 + 4236850 = 8465426
CURRENT 内容为: MANIFEST-000008
总结: 虽然我们的 KV 分离实现与原本的 leveldb 相比读写性能提升不大,甚至有一定幅度的下降,但我们的实现能大幅度降低数据库的写放大。
这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前slot_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。
功能 | 完成日期 | 分工 |
---|---|---|
Field相关接口实现 | 12.8 | 王雪飞 |
value_log中value的存储格式 | 12.8 | 王雪飞 |
slot_page 相关接口 | 12.8 | 马也驰 |
slot_page 实现 | 12.8 | 马也驰 |
修改leveldb的接口实现字段功能 | 12.17 | 王雪飞 |
vlog的GC实现 | 12.29 | 马也驰 |
vlog_cache 实现 | 12.29 | 马也驰 |
性能测试 | 1.5 | 王雪飞, 马也驰 |
功能测试 | 1.5 | 王雪飞, 马也驰 |
写实验报告 | 1.5 | 王雪飞, 马也驰 |