|
|
@ -164,6 +164,8 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) { |
|
|
|
+ a. 将LevelDB的key-value存储结构进行扩展,分离存储key和value |
|
|
|
+ b. Key存储在一个LevelDB实例中,LSM-tree中的value为一个指向Value log文件和偏移地址的指针,用户Value存储在Value log中。 |
|
|
|
|
|
|
|
实现方法 |
|
|
|
|
|
|
|
**数据结构设计:** |
|
|
|
|
|
|
|
`memtable中:| key | slot_num | ` |
|
|
@ -174,7 +176,9 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) { |
|
|
|
2. 读取操作 |
|
|
|
+ a. KV分离后依然支持点查询与范围查询操作。 |
|
|
|
|
|
|
|
**读写操作:** |
|
|
|
实现方法 |
|
|
|
|
|
|
|
**读操作:** |
|
|
|
|
|
|
|
`Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields)` |
|
|
|
|
|
|
@ -184,7 +188,6 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) { |
|
|
|
|
|
|
|
**步骤:** |
|
|
|
1. 为当前 KV 对分配一个 size_t 类型的 slot_num; |
|
|
|
2. 将 slot_num 转化为字符串形式 slot_num_str; |
|
|
|
3. 调用 SerializeValue 函数将字段数组和 slot_num_str 序列化为字符串 serialized_value; |
|
|
|
4. 实例化 slot_content 结构体 sc; |
|
|
|
5. 调用 put_value 函数,以 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 为参数,将字符串 serialized_value 写入 vlog 中; |
|
|
@ -196,22 +199,181 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) { |
|
|
|
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, |
|
|
|
const FieldArray& fields) { |
|
|
|
std::string serialized_value; |
|
|
|
// 分配一个 slot 下标 |
|
|
|
// alloc_slot 函数作用:分配一个 slot_num |
|
|
|
size_t slot_num = slot_page_->alloc_slot(); |
|
|
|
// 将 fields 序列化为字符串 |
|
|
|
// 调用 SerializeValue 函数将字段数组和 slot_num_str 序列化为字符串 serialized_value |
|
|
|
SerializeValue(fields, serialized_value, slot_num); |
|
|
|
// 实例化 slot_content 结构体 sc |
|
|
|
struct slot_content sc; |
|
|
|
// 将序列化后的字符串插入 value_log 中 |
|
|
|
// put_value函数作用:将序列化后的字符串 serialized_value 插入 value_log 中 |
|
|
|
vlog_set_->put_value(sc, slot_num, serialized_value); |
|
|
|
// set_slot函数作用: 将 slot_num 写入到 缓冲块中 |
|
|
|
slot_page_->set_slot(slot_num, &sc); |
|
|
|
|
|
|
|
// 将 slot_num 作为 value 插入 memtable 中 |
|
|
|
char data[sizeof(size_t)]; |
|
|
|
memcpy(data, &slot_num, sizeof(size_t)); |
|
|
|
Slice slot_val(data, sizeof(data)); |
|
|
|
// 将 slot_num 作为 value 插入 memtable 中 |
|
|
|
return DB::Put(opt, key, slot_val); |
|
|
|
} |
|
|
|
```` |
|
|
|
`size_t alloc_slot()` |
|
|
|
|
|
|
|
**功能:** 分配一个 slot_num |
|
|
|
|
|
|
|
**实现步骤:** |
|
|
|
1. 获取互斥锁; |
|
|
|
2. 判断当前 bitmap 是否有空闲槽位,就是遍历 bitmap,找到第一个为 0 的位,然后设置该位为 1,返回该位对应的 slot_num。 |
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
size_t alloc_slot() { |
|
|
|
// 获取互斥锁 |
|
|
|
mtx.lock(); |
|
|
|
size_t target_slot = first_empty_slot; |
|
|
|
char *start_byte = get_bitmap_byte(slot2byte(first_empty_slot)); |
|
|
|
const size_t off = slot2offset(first_empty_slot); |
|
|
|
SETBIT(start_byte, off); |
|
|
|
// find the next free slot |
|
|
|
if (HASFREESLOT(*start_byte)) { |
|
|
|
auto bit_off = find_first_free_slot_inbyte(*start_byte); |
|
|
|
first_empty_slot += bit_off - off; |
|
|
|
if (slot2byte(first_empty_slot) >= size) { |
|
|
|
alloc_new_bitmap(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
size_t i; |
|
|
|
for (i = slot2byte(first_empty_slot)+1; i < size; i++) { |
|
|
|
char *byte = get_bitmap_byte(i); |
|
|
|
if (HASFREESLOT(*byte)) { |
|
|
|
// FIXME: pack four bytes to do free slot finding |
|
|
|
auto bit_off = find_first_free_slot_inbyte(*byte); |
|
|
|
first_empty_slot = byte2slot(i) + bit_off; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
// scale the bitmap |
|
|
|
if (i >= size) { |
|
|
|
alloc_new_bitmap(); |
|
|
|
// char *byte = get_bitmap_byte(i); |
|
|
|
// SETBIT(byte, 0); |
|
|
|
first_empty_slot = byte2slot(i) + 1; |
|
|
|
} |
|
|
|
} |
|
|
|
mtx.unlock(); |
|
|
|
return target_slot; |
|
|
|
} |
|
|
|
```` |
|
|
|
`void set_slot(size_t slot_num, struct slot_content *sc)` |
|
|
|
|
|
|
|
**功能:** 将一个槽位的内容设置到缓存块中 |
|
|
|
|
|
|
|
**实现步骤:** |
|
|
|
1. 计算块编号:通过 slotnum_hash2_blocknum 函数将槽位编号转换为块编号 |
|
|
|
2. 确定缓存块位置:使用块编号对 BLOCK_NUM 取模,得到缓存块的位置 |
|
|
|
3. 加锁:对目标缓存块加锁以确保线程安全 |
|
|
|
4. 检查和更新缓存块 |
|
|
|
5. 设置槽位内容:调用 set_slot 函数设置槽位内容 |
|
|
|
6. 更新访问时间和脏标志:增加访问时间并标记为脏数据 |
|
|
|
7. 解锁:释放锁 |
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
void set_slot(size_t slot_num, struct slot_content *sc) { |
|
|
|
auto block_num = slotnum_hash2_blocknum(slot_num); |
|
|
|
auto blockcache_num = block_num % BLOCK_NUM; |
|
|
|
latches_[blockcache_num].lock(); |
|
|
|
if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { |
|
|
|
if (info[blockcache_num].is_dirty) { |
|
|
|
write_back_block(blockcache_num); |
|
|
|
} |
|
|
|
read_in_block(blockcache_num, block_num); |
|
|
|
access_time[blockcache_num] = 0; |
|
|
|
info[blockcache_num] = block_info(block_num, false, true); |
|
|
|
} |
|
|
|
set_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num)); |
|
|
|
access_time[blockcache_num]++; |
|
|
|
info[blockcache_num].is_dirty = true; |
|
|
|
latches_[blockcache_num].unlock(); |
|
|
|
} |
|
|
|
```` |
|
|
|
|
|
|
|
`void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value)` |
|
|
|
|
|
|
|
**功能:** 将序列化后的字符串 serialized_value 插入 value_log 中,位置由 slot_content 确定 |
|
|
|
|
|
|
|
**输入:** 待插入的字符串 value,slot_num,slot_content |
|
|
|
|
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { |
|
|
|
mtx.lock(); |
|
|
|
// 根据值的大小获取可写入的vlog信息 |
|
|
|
auto vinfo = get_writable_vlog_info(value.size()); |
|
|
|
if (!vinfo) { |
|
|
|
// vlog全部已满,创建新的vlog |
|
|
|
auto _vlog_num_ = register_new_vlog(); |
|
|
|
vinfo = get_vlog_info(_vlog_num_); |
|
|
|
} |
|
|
|
// 锁定 vlog 信息,更新 slot_content 内容 |
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
sc.vlog_num = vinfo->vlog_num; |
|
|
|
sc.value_offset = vinfo->curr_size; |
|
|
|
// 更新 vlog 内容,包括当前大小 curr_size 和存储的 value 个数 |
|
|
|
vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t); |
|
|
|
vinfo->value_nums ++; |
|
|
|
// 根据 vlog 编号获取 vlog 处理器 |
|
|
|
auto vhandler = get_vlog_handler(vinfo->vlog_num); |
|
|
|
// 如果 vlog 无效或者正在进行GC,则使用 vlog_num_for_gc |
|
|
|
if (!vinfo->vlog_valid_ || vinfo->processing_gc) { |
|
|
|
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); |
|
|
|
} |
|
|
|
// 加锁 |
|
|
|
vhandler->vlog_latch_.hard_lock(); |
|
|
|
// 增加访问线程数 |
|
|
|
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums |
|
|
|
mtx.unlock(); // for better performance |
|
|
|
// vlog 信息写入完毕,解锁 |
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
// 调用 write_vlog_value 函数,将字符串 serialized_value 写入 vlog 中 |
|
|
|
write_vlog_value(sc, slot_num, value); |
|
|
|
// 写入完毕,减少访问线程数 |
|
|
|
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums |
|
|
|
// 解锁 |
|
|
|
vhandler->vlog_latch_.hard_unlock(); |
|
|
|
} |
|
|
|
```` |
|
|
|
`void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value)` |
|
|
|
|
|
|
|
**功能:** 将字符串 value 写入 vlog 中 |
|
|
|
|
|
|
|
**输入:** slot_content,slot_num,字符串 value |
|
|
|
|
|
|
|
**实现步骤:** |
|
|
|
|
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { |
|
|
|
// 函数 get_vlog_name 作用:获取 slot_content 中 vlog_num 对应的 vlog 名称 |
|
|
|
auto vlog_name = get_vlog_name(sc.vlog_num); |
|
|
|
// 打开文件:使用 fstream 打开文件,确保文件以读写模式打开 |
|
|
|
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); |
|
|
|
// 定位写入位置:通过 seekp 方法将文件指针移动到 slot_content 中的 value_offset 位置 |
|
|
|
handler.seekp(sc.value_offset); |
|
|
|
// 准备数据:构造要写入的数据,包括值大小(uint16_t)、slot_num(size_t)和实际值内容(Slice) |
|
|
|
const char *value_buff = value.data(); |
|
|
|
|
|
|
|
const size_t off = sizeof(uint16_t) + sizeof(size_t); |
|
|
|
const size_t value_size = off + value.size(); |
|
|
|
char data[value_size]; |
|
|
|
memcpy(data, &value_size, sizeof(uint16_t)); |
|
|
|
memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t)); |
|
|
|
memcpy(data+off, value_buff, value.size()); |
|
|
|
|
|
|
|
handler.write(data, value_size); |
|
|
|
// 刷新缓冲区:调用 flush 方法确保数据写入磁盘 |
|
|
|
handler.flush(); |
|
|
|
// 关闭文件 |
|
|
|
handler.close(); |
|
|
|
} |
|
|
|
```` |
|
|
|
|
|
|
|
`Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,FieldArray& fields)` |
|
|
|
|
|
|
@ -224,16 +386,16 @@ return DB::Put(opt, key, slot_val); |
|
|
|
读取流程 |
|
|
|
1. 读取 key 对应的 slot_num |
|
|
|
2. 实例化 slot_content 结构体 sc |
|
|
|
3. 根据 slot_num 从 slot_page_ 中读取 slot_content |
|
|
|
4. 利用 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 从 vlog 中读取字符串 |
|
|
|
5. 将字符串进行解码得到 value |
|
|
|
3. 调用 get_slot 函数,根据 slot_num 从缓存中获取 slot_content |
|
|
|
4. 调用 get_value 函数,根据 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 从 vlog 中读取字符串 |
|
|
|
5. 将字符串解码得到 value |
|
|
|
|
|
|
|
**代码实现:** |
|
|
|
```` |
|
|
|
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key, |
|
|
|
FieldArray& fields) { |
|
|
|
size_t slot_num; |
|
|
|
// 从 memtable 中读取 key 对应的 slot_num |
|
|
|
// get_slot_num 函数作用:从 memtable 中读取 key 对应的 slot_num |
|
|
|
auto s = get_slot_num(options, key, &slot_num); |
|
|
|
if (!s.ok()) { |
|
|
|
return s; |
|
|
@ -241,9 +403,9 @@ return s; |
|
|
|
|
|
|
|
struct slot_content sc; |
|
|
|
std::string vlog_value; |
|
|
|
// 根据 slot_num 获取 slot_page_ 中的信息 |
|
|
|
// get_slot 函数作用:根据 slot_num 从缓存中获取 slot_content,并存放到 sc 中 |
|
|
|
slot_page_->get_slot(slot_num, &sc); |
|
|
|
// 根据 slot_page_ 中的信息,从 value_log 中读取字符串并存放到 vlog_value |
|
|
|
// get_value 函数作用:根据 sc 中的信息,从 value_log 中读取字符串并存放到 vlog_value |
|
|
|
vlog_set_->get_value(sc, &vlog_value); |
|
|
|
if (vlog_value.empty()) { |
|
|
|
return Status::NotFound("value has been deleted"); |
|
|
@ -253,6 +415,111 @@ DeserializeValue(fields, vlog_value); |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
```` |
|
|
|
`void get_slot(size_t slot_num, struct slot_content *sc)` |
|
|
|
|
|
|
|
**功能:** 获取 slot_num 对应的 slot_content |
|
|
|
|
|
|
|
**实现步骤:** |
|
|
|
1. 计算块编号:根据槽位号计算出对应的块编号。 |
|
|
|
2. 锁定缓存块:通过哈希计算确定缓存块编号,并加锁以确保线程安全。 |
|
|
|
3. 检查缓存命中:如果缓存未使用或块编号不匹配,则处理缓存未命中情况。 |
|
|
|
4. 写回脏数据:如果缓存块是脏数据,先将其写回磁盘。 |
|
|
|
5. 读取新块:从磁盘读取新的块到缓存,并更新访问时间和块信息。 |
|
|
|
6. 读取槽位内容:从缓存块中读取指定槽位的内容。 |
|
|
|
7. 解锁缓存块:操作完成后解锁。 |
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
void get_slot(size_t slot_num, struct slot_content *sc) { |
|
|
|
auto block_num = slotnum_hash2_blocknum(slot_num); |
|
|
|
auto blockcache_num = block_num % BLOCK_NUM; |
|
|
|
latches_[blockcache_num].lock(); |
|
|
|
if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { // cache miss |
|
|
|
if (info[blockcache_num].is_dirty) { |
|
|
|
write_back_block(blockcache_num); |
|
|
|
} |
|
|
|
read_in_block(blockcache_num, block_num); |
|
|
|
access_time[blockcache_num] = 0; |
|
|
|
info[blockcache_num] = block_info(block_num, false, true); |
|
|
|
} |
|
|
|
read_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num)); |
|
|
|
access_time[blockcache_num]++; |
|
|
|
latches_[blockcache_num].unlock(); |
|
|
|
} |
|
|
|
```` |
|
|
|
|
|
|
|
`void VlogSet::get_value(const struct slot_content &sc, std::string *value)` |
|
|
|
|
|
|
|
**功能:** 从 vlog 中读取字符串 |
|
|
|
|
|
|
|
**实现步骤:** |
|
|
|
1. 获取 vlog_num 和 vlog_handler |
|
|
|
|
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
void VlogSet::get_value(const struct slot_content &sc, std::string *value) { |
|
|
|
// 获取互斥锁 |
|
|
|
mtx.lock(); |
|
|
|
// get_vlog_info 函数作用:根据 sc 中的 vlog_num 获取 vlog_info |
|
|
|
auto vinfo = get_vlog_info(sc.vlog_num); |
|
|
|
// get_vlog_handler 函数作用:根据 sc 中的 vlog_num 获取 vlog_handler |
|
|
|
auto vhandler = get_vlog_handler(sc.vlog_num); |
|
|
|
// 加 vlog 信息锁 |
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
// 根据 vinfo 检查 vlog 是否有效 |
|
|
|
if (!vinfo->vlog_valid_) { |
|
|
|
// 如果无效,则进行垃圾处理 |
|
|
|
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); |
|
|
|
} |
|
|
|
// 加锁 |
|
|
|
vhandler->vlog_latch_.soft_lock(); |
|
|
|
// 增加访问线程数 |
|
|
|
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums |
|
|
|
// 释放互斥锁和 vlog 信息锁 |
|
|
|
mtx.unlock(); // for better performance |
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
// read_vlog_value 函数作用:根据 sc 中的 vlog_num 和 value_offset 从 vlog 中读取字符串 |
|
|
|
read_vlog_value(sc, value); |
|
|
|
// 减少访问线程数 |
|
|
|
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums |
|
|
|
// 释放 vlog 锁 |
|
|
|
vhandler->vlog_latch_.soft_unlock(); |
|
|
|
} |
|
|
|
```` |
|
|
|
`void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value)` |
|
|
|
|
|
|
|
**功能:** 根据 sc 中的 vlog_num 和 value_offset 从 vlog 中读取字符串并存放到 value |
|
|
|
|
|
|
|
**实现步骤:** |
|
|
|
1. 根据 sc 中的 vlog_num 获取 vlog 文件名 |
|
|
|
**具体实现如下:** |
|
|
|
```` |
|
|
|
void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) { |
|
|
|
// 根据 sc 中的 vlog_num 获取 vlog 文件名 |
|
|
|
auto vlog_name = get_vlog_name(sc.vlog_num); |
|
|
|
// 打开 vlog 文件 |
|
|
|
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); |
|
|
|
// 使用 seekp 方法将文件指针定位到 value_offset 指定的位置 |
|
|
|
handler.seekp(sc.value_offset); |
|
|
|
// 从文件中读取固定大小的数据到缓冲区 value_buff |
|
|
|
char value_buff[VALUE_BUFF_SIZE]; |
|
|
|
handler.read(value_buff, VALUE_BUFF_SIZE); |
|
|
|
// 从缓冲区中提取值的大小,并检查是否被删除标记 |
|
|
|
uint16_t value_size; |
|
|
|
memcpy(&value_size, value_buff, sizeof(uint16_t)); |
|
|
|
// 如果值带有删除标记,则将结果字符串设置为空并返回 |
|
|
|
if (value_size & VALUE_DELE_MASK) { |
|
|
|
*value = ""; |
|
|
|
return ; |
|
|
|
} |
|
|
|
// 计算实际值的大小并从缓冲区中提取值,存储到结果字符串中 |
|
|
|
value_size &= VALUE_SIZE_MASK; |
|
|
|
assert(value_size <= VALUE_BUFF_SIZE); |
|
|
|
const size_t off = sizeof(uint16_t)+sizeof(size_t); |
|
|
|
*value = std::string(&value_buff[off], value_size-off); |
|
|
|
// 关闭文件 |
|
|
|
handler.close(); |
|
|
|
} |
|
|
|
```` |
|
|
|
`Status DBImpl::Delete(const WriteOptions& options, const Slice& key)` |
|
|
|
|
|
|
|
**功能:** 删除 key 对应的条目 |
|
|
@ -290,7 +557,7 @@ return DB::Delete(options, key); |
|
|
|
4. 确保操作的原子性 |
|
|
|
|
|
|
|
**锁机制:** |
|
|
|
|
|
|
|
[`/db/shared_lock.h`](./db/shared_lock.h) 定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。 |
|
|
|
#### 2.2.1 实验内容 |
|
|
|
+ 1) 不改变LevelDB原有的接口,实现KV分离。 |
|
|
|
+ 2) 编写测试点验证KV分离是否正确实现。 |
|
|
@ -443,4 +710,11 @@ int main(int argc, char** argv) { |
|
|
|
| 修改leveldb的接口实现字段功能 | 12.17 | 王雪飞 | |
|
|
|
| vlog的GC实现 | 12.29 | 马也驰 | |
|
|
|
| 性能测试 | 1.5 | 王雪飞, 马也驰 | |
|
|
|
| 功能测试 | 1.5 | 王雪飞, 马也驰 | |
|
|
|
| 功能测试 | 1.5 | 王雪飞, 马也驰 | |
|
|
|
|
|
|
|
报告待完成部分: |
|
|
|
+ alloc_slot() set_slot() get_slot() |
|
|
|
+ gc过程 |
|
|
|
+ slot_page 管理,value_log 管理 |
|
|
|
+ 性能测试 |
|
|
|
+ 功能测试 |