Browse Source

Merge remote-tracking branch 'origin/master'

master
马也驰 8 months ago
parent
commit
0be1446e39
8 changed files with 89 additions and 120 deletions
  1. +1
    -1
      db/gc_executor.h
  2. +1
    -0
      db/slotpage.h
  3. +22
    -1
      db/vlog.h
  4. +14
    -1
      db/vlog_gc.h
  5. +6
    -0
      db/vlog_set.cpp
  6. +44
    -33
      report.md
  7. +1
    -1
      test/db_test1.cc
  8. +0
    -83
      test/db_test3.cc

+ 1
- 1
db/gc_executor.h View File

@ -5,10 +5,10 @@
#ifndef LEVELDB_GC_EXECUTOR_H
#define LEVELDB_GC_EXECUTOR_H
#include <cstdint>
#include <cstdlib>
#include <string>
class VlogGC;
class gc_executor {

+ 1
- 0
db/slotpage.h View File

@ -12,6 +12,7 @@
#include <iostream>
#include <fstream>
#include <mutex>
#include <cstring>
#include <iostream>
#include <unordered_map>

+ 22
- 1
db/vlog.h View File

@ -68,8 +68,29 @@ struct vlog_info {
};
struct vlog_handler {
std::mutex vlog_handler_latch_;
size_t curr_access_thread_nums;
SharedLock vlog_latch_; // vlog上的并发情况soft_lockhard_lock
vlog_handler() {}
vlog_handler() : curr_access_thread_nums(0) {}
inline void incre_access_thread_nums() {
vlog_handler_latch_.lock();
curr_access_thread_nums ++;
vlog_handler_latch_.unlock();
}
inline void decre_access_thread_nums() {
vlog_handler_latch_.lock();
curr_access_thread_nums --;
vlog_handler_latch_.unlock();
}
inline bool non_access_thread() {
bool flag = false;
vlog_handler_latch_.lock();
if (!curr_access_thread_nums) {
flag = true;
}
vlog_handler_latch_.unlock();
return flag;
}
};

+ 14
- 1
db/vlog_gc.h View File

@ -8,6 +8,8 @@
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include "../db/slotpage.h"
#include "../db/vlog.h"
#include "../db/gc_executor.h"
@ -22,7 +24,18 @@ friend class gc_executor;
public:
VlogGC(SlotPage *s, VlogSet *vs) : slot_page_(s), vlog_set(vs),max_thread_nums_(THREAD_NUM),
curr_thread_nums_(0), gc_num(0) {}
~VlogGC() {}
~VlogGC() {
while (true) {
curr_thread_nums_latch_.lock();
if (curr_thread_nums_) {
curr_thread_nums_latch_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(2));
} else {
curr_thread_nums_latch_.unlock();
break;
}
}
}
void do_gc(size_t old_vlog_num, size_t new_vlog_num);

+ 6
- 0
db/vlog_set.cpp View File

@ -107,9 +107,11 @@ void VlogSet::get_value(const struct slot_content &sc, std::string *value) {
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
}
vhandler->vlog_latch_.soft_lock();
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
read_vlog_value(sc, value);
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
vhandler->vlog_latch_.soft_unlock();
}
@ -154,9 +156,11 @@ void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb:
}
vhandler->vlog_latch_.hard_lock();
vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
write_vlog_value(sc, slot_num, value);
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
vhandler->vlog_latch_.hard_unlock();
}
@ -171,9 +175,11 @@ void VlogSet::del_value(const struct slot_content &sc) {
}
vhandler->vlog_latch_.hard_lock();
vhandler->decre_access_thread_nums(); // FIXME: increase thread nums
mtx.unlock(); // for better performance
vinfo->vlog_info_latch_.unlock();
mark_del_value(sc);
vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
vhandler->vlog_latch_.hard_unlock();
}

+ 44
- 33
report.md View File

@ -25,14 +25,14 @@
具体指:基于 levelDB扩展 value 的结构,使其可以包含多个字段,并通过这些字段实现类似数据库列查询的功能。
#### 2.1.1 实验要求:
字段存储:
1. 将 LevelDB 中的 value 组织成字段数组,每个数组元素对应一个字段(字段名:字段值)。
2. 字段会被序列化为字符串,然后插入LevelDB。
3. 这些字段可以通过解析字符串得到,字段名与字段值都是字符串类型。
4. 允许任意调整字段。
1. 字段存储:
+ 将 LevelDB 中的 value 组织成字段数组,每个数组元素对应一个字段(字段名:字段值)。
+ 字段会被序列化为字符串,然后插入LevelDB。
+ 这些字段可以通过解析字符串得到,字段名与字段值都是字符串类型。
+ 允许任意调整字段。
查询功能:
实现通过字段值查询对应的 key。
2. 查询功能:
+ 实现通过字段值查询对应的 key。
#### 2.1.2 实验内容
1. 数据存储与解析: 每个 value 存储为一个字符串数组,数组中的每个元素代表一个字段。
@ -41,7 +41,7 @@
**设计思路:**
1. 使用 Field 存储属性和值,使用 FieldArray 存储多个 Field;
2. 函数 SerializeValue 把字段数组序列化为字符串;
3. 函数 ParseValue 把字符串反序列化为字段数组;
3. 函数 DeserializeValue 把字符串反序列化为字段数组;
4. 函数 FindKeysByField 根据传入的字段名和字段的值找到对应的key。
### 2.1.3 实验进度以及实验结果
#### 实验进度
@ -62,9 +62,9 @@
对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。
##### 2.2.1 相关代码文件
- [`/db/db_impl.cc`](./db/db_impl.cc): 修改 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete
- [`/db/db_impl.h`](./db/db_impl.h): 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_;
-
- [`/db/db_impl.cc`](./db/db_impl.cc): 修改函数 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete,添加函数 Put_fields, Get_fields, get_slot_num,SerializeValue, DeserializeValue
- [`/db/db_impl.h`](./db/db_impl.h): 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_ ,添加增加的相关函数的声明
-
- [`/db/shared_lock.h`](./db/shared_lock.h) 定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。
- [`/db/slotpage.h`](./db/slotpage.h)
- [`/db/threadpool.h`](./db/threadpool.h)
@ -73,37 +73,47 @@
- [`/db/vlog_gc.h`](./db/vlog_gc.h)
- [`/db/vlog_set.cpp`](./db/vlog_set.cpp)
- [`/db/vlog_set.h`](./db/vlog_set.h)
-
-
- [`/test/db_test3.cc`](./test/db_test3.cc):测试 value 的字段功能
- [`/test/db_test4.cc`](./test/db_test4.cc)
- [`/test/db_test5.cc`](./test/db_test5.cc)
-
-
- [`CMakeLists.txt`](CMakeLists.txt):添加可执行文件
##### 2.2.1 具体流程
写入流程
````
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields) {
// TODO(begin): allocate slot_num in slotpage and put value in vlog
// 将字段数组序列化
size_t slot_num = slot_page_->alloc_slot();
std::string slot_num_str((char *)&slot_num, sizeof(size_t));
// size_t slot_num_str_num;
// std::memcpy(&slot_num_str_num, slot_num_str.c_str(), sizeof(size_t));
// std::cout << "slot_num_str_num: " << slot_num_str_num << std::endl;
std::string serialized_value = SerializeValue(fields, slot_num_str);
// std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl;
struct slot_content sc;
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, val);
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, serialized_value);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
memcpy(data, &slot_num, sizeof(size_t));
Slice slot_val(data, sizeof(data));
return DB::Put(o, key, slot_val);
// return DB::Put(opt, key, slot_val);
return DB::Put(opt, key, serialized_value);
// TODO(end)
}
}
````
1. 调用编码函数,将 val 编码为字符串
2. 在 slot_page_ 中为 K-V 对分配一个 slot ,编号为 slot_num
3. 实例化 slot_content 结构体 sc
4. 以 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 为参数,将字符串写入 vlog 中
5. 将 slot_content 中的内容赋值给 slot_num
6. 将 slot_num 作为 key 的 value 写入数据库中
1. 为当前 KV 对分配一个 size_t 类型的 slot_num;
2. 将 slot_num 转化为字符串形式 slot_num_str;
3. 调用 SerializeValue 函数将字段数组和 slot_num_str 序列化为字符串 serialized_value;
4. 实例化 slot_content 结构体 sc;
5. 调用 put_value 函数,以 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 为参数,将字符串 serialized_value 写入 vlog 中;
6. 调用 set_slot 函数,将 slot_content 中的内容赋值给 slot_num;
7. 将 slot_num 作为 value 写入数据库中;
读取流程
````
@ -146,7 +156,7 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
`slot_page: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | `
`value 的格式:| attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |`
`value 的格式:|value 长度 | slot_num | attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |`
对于每一次读取,用户线程先读取lsm tree中key的slot_num下标,然后到slot_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。
@ -155,15 +165,15 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
### 4. 接口设计
#### 4.1 在 LevelDB 的 value 中实现字段功能
1. std::string SerializeValue(const FieldArray& fields)
1. std::string SerializeValue(const FieldArray& fields, std::string slot_num_str)
**功能:** 将字段数组序列化为字符串
**功能:** 将字段数组和 slot_num_str 序列化为字符串
**输入:** 字段名和字段的值组成的字段数组
**输入:** 字段名和字段的值组成的字段数组 和 slot_num_str,即为该 KV 对分配的 slot_num 的字符串形式
**输出:** 序列化后的字符串
2. FieldArray ParseValue(const std::string& value_str)
2. FieldArray DeserializeValue(const std::string& value_str)
**功能:** 将字符串反序列化为字段数组
@ -179,15 +189,16 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
**输出:** 包含该字段和字段数组的 key,由于可能不只有一个,所以返回值为 vector
4. Put_Fields (待实现)
4. Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields)
**功能:** 仿照Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value),通过调用序列化函数,实现以字段形式插入 value
**功能:** 仿照Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value),调用序列化函数,实现以字段形式插入 value
**输入:** 数据库名,字段名和字段的值
5. Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray* fields)
**输出:** 包含该字段和字段数组的 key,由于可能不只有一个,所以返回值为 vector
**功能:** 仿照Status DB::Get(const WriteOptions& opt, const Slice& key, const Slice& value),读取key对应的 value 之后,通过调用反序列化函数,将 value 反序列化为字段数组,并存到 fields 中
5. Get_Fields (待实现)
#### 4.2 实现KV分离
1. 搜索slot_page文件: Status find_slot(const Slice& key, Slot *slot);
2. 搜索vlog文件: Status find_value(Slot *slot);

+ 1
- 1
test/db_test1.cc View File

@ -37,7 +37,7 @@ int test2() {
assert(status.ok());
const std::string value_prefix = "value_";
const size_t loop_times = 1000;
const size_t loop_times = 100000;
for (auto i = 0; i < loop_times; i++) {
auto key = std::to_string(i);
auto value = value_prefix + key;

+ 0
- 83
test/db_test3.cc View File

@ -13,89 +13,6 @@
#include "gtest/gtest.h"
using namespace leveldb;
//// 序列化函数,将字段数组编码为字符串
//std::string SerializeValue(const FieldArray& fields) {
// // 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串
// std::ostringstream oss_temp;
// std::string slot_num = "slot_num";
// oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num;
// // 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中
// oss_temp << std::setw(16) << std::setfill('0') << fields.size();
// for (const auto& field : fields) {
// // 写入属性名长度(定长,16比特)
// oss_temp << std::setw(16) << std::setfill('0') << field.name.size();
// // 写入属性名(变长)
// oss_temp << field.name;
// // 写入属性值长度(定长,16比特)
// oss_temp << std::setw(16) << std::setfill('0') << field.value.size();
// // 写入属性值(变长)
// oss_temp << field.value;
// }
// std::string temp_str = oss_temp.str();
// size_t value_length = temp_str.size();
//
// std::ostringstream oss;
// oss << std::setw(16) << std::setfill('0') << value_length;
// oss << temp_str;
//
// std::cout << "value 的长度为: " << value_length << std::endl;
// std::cout << "总长度为: " << oss.str().size() << std::endl;
// return oss.str();
//}
//// 反序列化函数,将字符串解码为字段数组
//FieldArray ParseValue(const std::string& value_str) {
// // 存放解析后的字段数组
// FieldArray fields;
// // 将输入字符串转换为输入流 iss, 方便读取
// std::istringstream iss(value_str);
// std::string content;
// // 临时存放读取的数据
// char buffer[100];
// // 读取长度(定长,16比特)
// iss.read(buffer, 16);
// buffer[16] = '\0';
// size_t total_length = std::stoi(buffer);
// // std::cout << "读取到的总长度为: " << total_length << std::endl;
// std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length);
// // std::cout << value_content << std::endl;
// std::istringstream iss_content(value_content);
// iss_content.read(buffer, sizeof(size_t));
// buffer[sizeof(size_t)] = '\0';
// std::string slot_num = buffer;
// // 读取属性个数
// iss_content.read(buffer, 16);
// // 在第17个比特位处添加终结符,确保字符串以终结符结尾
// buffer[16] = '\0';
// // 将 buffer 中的内容转化为整数并赋值给 field_count
// int field_count = std::stoi(buffer);
// // std::cout << "读取到的字段个数为: " << field_count << std::endl;
//
// for (int i = 0; i < field_count; ++i) {
// Field field;
// // 读取属性名长度(定长,16比特)
// iss_content.read(buffer, 16);
// buffer[16] = '\0';
// int name_length = std::stoi(buffer);
// // std::cout << "读取到的属性名长度为: " << name_length << std::endl;
// // 读取属性名(变长)
// field.name.resize(name_length);
// iss_content.read(&field.name[0], name_length);
// // std::cout << "读取到的属性名为: " << field.name << std::endl;
// // 读取属性值长度(定长,16比特)
// iss_content.read(buffer, 16);
// buffer[16] = '\0';
// int value_length = std::stoi(buffer);
// // std::cout << "读取到的属性值长度为: " << value_length << std::endl;
// // 读取属性值(变长)
// field.value.resize(value_length);
// iss_content.read(&field.value[0], value_length);
// // std::cout << "读取到的属性值为: " << field.value << std::endl;
// fields.push_back(field);
// }
// return fields;
//}
// 根据字段值查找所有包含该字段的 key,遍历
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) {
std::vector<std::string> keys;

Loading…
Cancel
Save