diff --git a/db/db_impl.cc b/db/db_impl.cc index b7bec04..6b5d90a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1268,9 +1268,9 @@ Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields) { // 将字段数组序列化 std::string serialized_value; - std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; size_t slot_num = slot_page_->alloc_slot(); SerializeValue(fields, serialized_value, slot_num); + std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; struct slot_content sc; vlog_set_->put_value(sc, slot_num, serialized_value); slot_page_->set_slot(slot_num, &sc); diff --git a/report.md b/report.md index 5350f13..6d9d75b 100644 --- a/report.md +++ b/report.md @@ -36,31 +36,173 @@ #### 2.1.2 实验内容 1. 数据存储与解析: 每个 value 存储为一个字符串数组,数组中的每个元素代表一个字段。 -2. 通过字段查询 Key: 实现函数FindKeysByField,传入字段名和字段的值就可以找到对应的key +```` +using Field = std::pair; // field_name:field_value +using FieldArray = std::vector>; + +// 编码函数,将传入的字段数组和 slot_num 序列化为字符串,并存到 value +void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num) { + // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | + // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | + std::string tmp_value; + uint16_t value_size = sizeof(uint16_t); + uint16_t field_nums = 0; + for (const auto& field : fields) { + const uint8_t attr_name_len = field.name.size(); + const uint16_t attr_value_len = field.value.size(); + const size_t attr_size = attr_name_len + attr_value_len + sizeof(uint8_t) + sizeof(uint16_t); + char attr_data[attr_size]; + + size_t off = 0; + memcpy(attr_data+off, &attr_name_len, sizeof(uint8_t)); + off += sizeof(uint8_t); + memcpy(attr_data+off, field.name.c_str(), attr_name_len); + off += attr_name_len; + memcpy(attr_data+off, &attr_value_len, sizeof(uint16_t)); + off += sizeof(uint16_t); + memcpy(attr_data+off, field.value.c_str(), attr_value_len); + off += attr_value_len; + + assert(off == attr_size); + tmp_value += std::string(attr_data, attr_size); + value_size += attr_size; + field_nums ++; + } -**设计思路:** -1. 使用 Field 存储属性和值,使用 FieldArray 存储多个 Field; -2. 函数 SerializeValue 把字段数组序列化为字符串; -3. 函数 DeserializeValue 把字符串反序列化为字段数组; -4. 函数 FindKeysByField 根据传入的字段名和字段的值找到对应的key。 -### 2.1.3 实验进度以及实验结果 -#### 实验进度 -已初步实现上述四个函数,后续会对查询函数 FindKeysByField 进行优化和完善,并将上述函数添加到 LevelDB 的代码之中。 -#### 实验结果 -通过测试 - -#### 2.2 KV分离 + char value_data[value_size]; + memcpy(value_data, &value_size, sizeof(uint16_t)); + memcpy(value_data+sizeof(uint16_t), tmp_value.c_str(), tmp_value.size()); + + assert(sizeof(uint16_t) + tmp_value.size() == value_size); + value = std::string(value_data, value_size); +} + +// 解码函数 将传入的 value_str 字符串反序列化为字段数组并存到 fields +void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str) { + // single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } | + // single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 | + const char *value_data = value_str.c_str(); + const size_t value_len = value_str.size(); + size_t attr_off = sizeof(uint16_t); + + while (attr_off < value_len) { + uint8_t attr_name_len = *(uint8_t *)(value_data+attr_off); + attr_off += sizeof(uint8_t); + auto attr_name = std::string(value_data+attr_off, attr_name_len); + attr_off += attr_name_len; + + uint16_t attr_len = *(uint16_t *)(value_data+attr_off); + attr_off += sizeof(uint16_t); + auto attr_value = std::string(value_data+attr_off, attr_len); + attr_off += attr_len; + + fields.push_back({attr_name, attr_value}); + } + + assert(attr_off == value_len); +} +```` +2. 通过字段查询 Key: 实现函数 FindKeysByField,传入字段名和字段的值就可以找到对应的key +```` +// 根据字段值查找所有包含该字段的 key +// 一个字段值可能对应多个key,所以返回std::vector +std::vector FindKeysByField(leveldb::DB* db, const Field& field) { + std::vector keys; + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + // 遍历数据库中所有的键值对 + for (it->SeekToFirst(); it->Valid() ; it->Next()) { + std::string key = it->key().ToString(); + FieldArray fields; + // 调用 Get_Fields 函数,获取 key 对应的字段数组 + db->Get_Fields(leveldb::ReadOptions(), key, fields); + // 便利字段数组,如果字段数组中包含该字段,则将该 key 添加到 keys 中 + for (const auto& f : fields) { + if (f.name == field.name && f.value == field.value) { + keys.push_back(key); + break; // 假设每个key中每个字段值唯一 + } + } + } + + delete it; + return keys; +} +```` + +#### 2.1.3 实验结果 + + +### 2.2 KV分离 **设计目标:** -将value的存储和key在lsm tree中的存储分离,降低lsm tree的GC开销 +在LevelDB中实现KV分离,即将键值对中的键和值存储在不同的存储区域,以优化写性能和点查询性能。 +#### 2.2.1 实验要求 +1. KV 分离设计 ++ a. 将LevelDB的key-value存储结构进行扩展,分离存储key和value ++ b. Key存储在一个LevelDB实例中,LSM-tree中的value为一个指向Value log文件和偏移地址的指针,用户Value存储在Value log中。 +2. 读取操作 ++ a. KV分离后依然支持点查询与范围查询操作。 +3. value_log 的管理 ++ a. 当Value log超过一定大小后通过后台GC操作释放Value log中的无效数据。 ++ b. GC能把旧Value log中没有失效的数据写入新的Value log,并更新LSM-tree里的键值对。 ++ c. 新旧Value log的管理功能。 +4. 确保操作的原子性 +#### 2.2.1 实验内容 ++ 1) 不改变LevelDB原有的接口,实现KV分离。 ++ 2) 编写测试点验证KV分离是否正确实现。 **设计思路:** 1. value的分离式存储 我们使用若干个vlog文件,为每一个vlog文件设置容量上限(比如16MiB),并在内存中为每一个vlog维护一个discard计数器,表示这个vlog中当前有多少value已经在lsm tree中被标记为删除。 2. 存储value所在vlog和偏移量的元数据 - 我们在key和vlog中添加一个slot_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层slot_page中。这个slot_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过slot_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改slot_page中的映射即可。 + 我们在 memtable 和vlog中添加一个slot_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层slot_page中。这个slot_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过slot_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改slot_page中的映射即可。 3. slot_page文件和vlog文件的GC 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 +```` +// 将传入的字段数组插入数据库中 +Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, +const FieldArray& fields) { +std::string serialized_value; +// 分配一个 slot 下标 +size_t slot_num = slot_page_->alloc_slot(); +// 将 fields 序列化为字符串 +SerializeValue(fields, serialized_value, slot_num); +struct slot_content sc; +// 将序列化后的字符串插入 value_log 中 +vlog_set_->put_value(sc, slot_num, serialized_value); +slot_page_->set_slot(slot_num, &sc); + +char data[sizeof(size_t)]; +memcpy(data, &slot_num, sizeof(size_t)); +Slice slot_val(data, sizeof(data)); +// 将 slot_num 作为 value 插入 memtable 中 +return DB::Put(opt, key, slot_val); +} + +// 从数据库中读取 key 对应的字段数组 +Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key, +FieldArray& fields) { +size_t slot_num; +// 从 memtable 中读取 key 对应的 slot_num +auto s = get_slot_num(options, key, &slot_num); +if (!s.ok()) { +return s; +} +struct slot_content sc; +std::string vlog_value; +// 根据 slot_num 获取 slot_page_ 中的信息 +slot_page_->get_slot(slot_num, &sc); +// 根据 slot_page_ 中的信息,从 value_log 中读取字符串并存放到 vlog_value +vlog_set_->get_value(sc, &vlog_value); +if (vlog_value.empty()) { +return Status::NotFound("value has been deleted"); +} + +// 调用 DeserializeValue 函数,将 vlog_value 解码为字段数组 fields +DeserializeValue(fields, vlog_value); +return Status::OK(); +} +```` ##### 2.2.1 相关代码文件 - [`/db/db_impl.cc`](./db/db_impl.cc): 修改函数 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete,添加函数 Put_fields, Get_fields, get_slot_num,SerializeValue, DeserializeValue - [`/db/db_impl.h`](./db/db_impl.h): 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_ ,添加增加的相关函数的声明 @@ -80,33 +222,7 @@ - - [`CMakeLists.txt`](CMakeLists.txt):添加可执行文件 ##### 2.2.1 具体流程 -写入流程 -```` -Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, - const FieldArray& fields) { - // TODO(begin): allocate slot_num in slotpage and put value in vlog - // 将字段数组序列化 - size_t slot_num = slot_page_->alloc_slot(); - std::string slot_num_str((char *)&slot_num, sizeof(size_t)); - // size_t slot_num_str_num; - // std::memcpy(&slot_num_str_num, slot_num_str.c_str(), sizeof(size_t)); - // std::cout << "slot_num_str_num: " << slot_num_str_num << std::endl; - std::string serialized_value = SerializeValue(fields, slot_num_str); - // std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; - struct slot_content sc; - vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, serialized_value); - slot_page_->set_slot(slot_num, &sc); - - char data[sizeof(size_t)]; - memcpy(data, &slot_num, sizeof(size_t)); - Slice slot_val(data, sizeof(data)); - // return DB::Put(opt, key, slot_val); - return DB::Put(opt, key, serialized_value); - // TODO(end) -} -} -```` 1. 为当前 KV 对分配一个 size_t 类型的 slot_num; 2. 将 slot_num 转化为字符串形式 slot_num_str; 3. 调用 SerializeValue 函数将字段数组和 slot_num_str 序列化为字符串 serialized_value; @@ -116,16 +232,6 @@ Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, 7. 将 slot_num 作为 value 写入数据库中; 读取流程 -```` -// TODO(begin): search the slotpage and get value from vlog - size_t slot_num = *(size_t *)value->c_str(); - struct slot_content sc; - std::string vlog_value; - slot_page_->get_slot(slot_num, &sc); - vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); - *value = vlog_value; - // TODO(end) -```` 1. 读取 key 对应的 value, 也就是 slot_num 2. 实例化 slot_content 结构体 sc 3. 根据 slot_num 从 slot_page_ 中读取 slot_content @@ -204,9 +310,9 @@ Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, 2. 搜索vlog文件: Status find_value(Slot *slot); 3. 分配新的slot: Status allocate_slot(Bitmap *map, uint64_t *s); 4. 释放slot: void deallocate_slot(Bitmap *map, uint64_t *s); - - - +5. +6. VlogSet::get_value(const struct slot_content &sc, std::string *value) +7. ### 5. 功能测试 ### 5.1 在 LevelDB 的 value 中实现字段功能 #### 5.1.1 功能测试 @@ -312,11 +418,4 @@ int main(int argc, char **argv) { | 修改leveldb的接口实现字段功能 | 12.17 | 王雪飞 | | vlog的GC实现 | 12.29 | 马也驰 | | 性能测试 | 1.5 | 王雪飞, 马也驰 | -| 功能测试 | 1.5 | 王雪飞, 马也驰 | - - - - - - - +| 功能测试 | 1.5 | 王雪飞, 马也驰 | \ No newline at end of file diff --git a/test/db_test3.cc b/test/db_test3.cc index a204f89..3861ec1 100644 --- a/test/db_test3.cc +++ b/test/db_test3.cc @@ -29,7 +29,7 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) { } } } - + std::cout << "Found " << keys.size() << " keys" << std::endl; delete it; return keys; } @@ -47,7 +47,10 @@ void TestThroughput(leveldb::DB* db, int num_operations) { for (int i = 0; i < num_operations; ++i) { std::string key = "key_" + std::to_string(i); - FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; + FieldArray fields = { + {"name", "Customer" + std::to_string(i)}, + {"address", "Address" + std::to_string(i)}, + {"phone", "1234567890"}}; db->Put_Fields(writeOptions, key, fields); } @@ -65,8 +68,10 @@ void TestLatency(leveldb::DB* db, int num_operations, std::vector& lat_ for (int i = 0; i < num_operations; ++i) { // 执行写入操作 std::string key = "key_" + std::to_string(i); - FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; - db->Put_Fields(writeOptions, key, fields); + FieldArray fields = { + {"name", "Customer" + std::to_string(i)}, + {"address", "Address" + std::to_string(i)}, + {"phone", "1234567890"}}; db->Get_Fields(leveldb::ReadOptions(), key, fields); end_time = std::chrono::steady_clock::now(); @@ -98,7 +103,7 @@ TEST(TestSchema, Basic) { std::string key2 = "k_2"; std::string key3 = "k_3"; // std::string value = "value"; - FieldArray fields0 = {{"name", "wxf"}}; + FieldArray fields0 = {{"name", "myc&wxf"}}; FieldArray fields1 = { {"name", "Customer1"}, {"address", "IVhzIApeRb"}, @@ -145,15 +150,15 @@ TEST(TestSchema, Basic) { } // 测试查找功能 - Field query_field = {"name", "Customer2"}; + Field query_field = {"name", "Customer1"}; std::vector found_keys = FindKeysByField(db, query_field); std::cout << "找到的key有:" << found_keys.size() << "个" << std::endl; - /*// 吞吐量测试 - TestThroughput(db, 10000);*/ + // 吞吐量测试 + TestThroughput(db, 10000); - /* // 延迟测试 + // 延迟测试 std::vector latency_results; - TestLatency(db, 1000, latency_results);*/ + TestLatency(db, 1000, latency_results); // 关闭数据库 delete db; }