From 2a8b78d43bb8dcd03e2b26a42b97ee9c12f88f04 Mon Sep 17 00:00:00 2001 From: Yechi Ma <2662511702@qq.com> Date: Thu, 12 Dec 2024 15:05:23 +0800 Subject: [PATCH 1/2] modify report.md --- report.md | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/report.md b/report.md index c81b0dc..351a904 100644 --- a/report.md +++ b/report.md @@ -57,22 +57,22 @@ 1. value的分离式存储 我们使用若干个vlog文件,为每一个vlog文件设置容量上限(比如16MiB),并在内存中为每一个vlog维护一个discard计数器,表示这个vlog中当前有多少value已经在lsm tree中被标记为删除。 2. 存储value所在vlog和偏移量的元数据 - 我们在key和vlog中添加一个vlog_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层vlog_page中。这个vlog_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过vlog_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改vlog_page中的映射即可。 -3. vlog_page文件和vlog文件的GC - 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将vlog_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 + 我们在key和vlog中添加一个slot_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层slot_page中。这个slot_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过slot_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改slot_page中的映射即可。 +3. slot_page文件和vlog文件的GC + 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 ### 3. 数据结构设计 -`key的格式:| key | vlog_page_slot | ` +`key的格式:| key | slot_num | ` -`vlog_page: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | ` +`slot_page: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | ` `value 的格式:| attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |` -对于每一次读取,用户线程先读取lsm tree中key的vlog_page_slot下标,然后到vlog_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。 +对于每一次读取,用户线程先读取lsm tree中key的slot_num下标,然后到slot_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。 -但是这又带来了一个问题,我们该如何管理vlog_page这个文件?当插入新的kv时,我们需要在这个vlog_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前vlog_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。 +但是这又带来了一个问题,我们该如何管理slot_page这个文件?当插入新的kv时,我们需要在这个slot_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前slot_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。 ### 4. 接口设计 @@ -112,7 +112,7 @@ 5. Get_Fields (待实现) #### 4.2 实现KV分离 这里只展示和vlog以及GC无关的接口,vlog的创建,管理以及后台线程的GC设计到vlog等新数据结构的实现,较为复杂和庞大,这里不做展示。我们只列出与kv的插入有关的新接口: -1. 搜索vlog_page文件: Status find_slot(const Slice& key, Slot *slot); +1. 搜索slot_page文件: Status find_slot(const Slice& key, Slot *slot); 2. 搜索vlog文件: Status find_value(Slot *slot); 3. 分配新的slot: Status allocate_slot(Bitmap *map, uint64_t *s); 4. 释放slot: void deallocate_slot(Bitmap *map, uint64_t *s); @@ -201,7 +201,7 @@ int main(int argc, char **argv) { ![图片](./pic/test_field_2.png) ### 5.2 单元测试: -1. 测试插入超过初始vlog_page等slot数量之后,是否还能正常插入,检查vlog_page文件等线性可扩展性 +1. 测试插入超过初始slot_page等slot数量之后,是否还能正常插入,检查slot_page文件等线性可扩展性 2. 测试插入后,进行删除,等待GC完成后再读取value和vlog的大小,看看GC过程是否正常进行。 性能测试: @@ -211,7 +211,7 @@ int main(int argc, char **argv) { #### 6. 可能遇到的挑战与解决方案 列出实现过程中可能遇到的技术难题及其解决思路,如如何处理GC开销、数据同步、索引原子更新等问题。 -各种参数的设置,比如vlog的容量上限,以及vlog_page的bitmap管理方式是否足够高效?以及在GC过程中如果对被GC中的vlog进行写入该让用户线程和后台线程以什么样的方式进行同步?vlog_page的读写放大也是一个重要的问题。 +各种参数的设置,比如vlog的容量上限,以及slot_page的bitmap管理方式是否足够高效?以及在GC过程中如果对被GC中的vlog进行写入该让用户线程和后台线程以什么样的方式进行同步?slot_page的读写放大也是一个重要的问题。 #### 7. 分工和进度安排 @@ -219,8 +219,8 @@ int main(int argc, char **argv) { |----------------------|-------|----------| | Field相关接口实现 | 12.8 | 王雪飞 | | value_log中value的存储格式 | 12.8 | 王雪飞 | -| vlog_page 相关接口 | 12.8 | 马也驰 | -| vlog_page实现 | 12.8 | 马也驰 | +| slot_page 相关接口 | 12.8 | 马也驰 | +| slot_page实现 | 12.8 | 马也驰 | | 修改leveldb的接口实现字段功能 | 12.17 | 王雪飞 | | vlog的GC实现 | 12.29 | 马也驰 | | 性能测试 | 1.5 | 王雪飞, 马也驰 | From 966c248003b8b6b58a4de78caa1f33a5d2bf1e83 Mon Sep 17 00:00:00 2001 From: wangxuefei <10225501435@stu.ecnu.edu.cn> Date: Mon, 30 Dec 2024 21:49:21 +0800 Subject: [PATCH 2/2] modify db_test3.cc --- test/db_test3.cc | 192 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 159 insertions(+), 33 deletions(-) diff --git a/test/db_test3.cc b/test/db_test3.cc index d51d2b8..9e584fa 100644 --- a/test/db_test3.cc +++ b/test/db_test3.cc @@ -1,56 +1,124 @@ +#include +#include +#include #include -#include "leveldb/env.h" #include -#include -#include +#include #include -#include +#include +#include + +#include "leveldb/env.h" + #include "gtest/gtest.h" using namespace leveldb; -using Field = std::pair; // field_name:field_value -using FieldArray = std::vector>; -// 序列化为字符串 +// 字段信息结构体 +struct Field { + std::string name; + std::string value; +}; +using FieldArray = std::vector; + +// 序列化函数,将字段数组编码为字符串 std::string SerializeValue(const FieldArray& fields) { - std::ostringstream oss; + // 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串 + std::ostringstream oss_temp; + std::string slot_num = "slot_num"; + oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num; + // 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中 + oss_temp << std::setw(16) << std::setfill('0') << fields.size(); for (const auto& field : fields) { - oss << field.first << ":" << field.second << ";"; + // 写入属性名长度(定长,16比特) + oss_temp << std::setw(16) << std::setfill('0') << field.name.size(); + // 写入属性名(变长) + oss_temp << field.name; + // 写入属性值长度(定长,16比特) + oss_temp << std::setw(16) << std::setfill('0') << field.value.size(); + // 写入属性值(变长) + oss_temp << field.value; } + std::string temp_str = oss_temp.str(); + size_t value_length = temp_str.size(); + + std::ostringstream oss; + oss << std::setw(16) << std::setfill('0') << value_length; + oss << temp_str; + + std::cout << "value 的长度为: " << value_length << std::endl; + std::cout << "总长度为: " << oss.str().size() << std::endl; return oss.str(); } -// 反序列化为字段数组 +// 反序列化函数,将字符串解码为字段数组 FieldArray ParseValue(const std::string& value_str) { + // 存放解析后的字段数组 FieldArray fields; + // 将输入字符串转换为输入流 iss, 方便读取 std::istringstream iss(value_str); - std::string field_str; - while (std::getline(iss, field_str, ';')) { - size_t delimiter_pos = field_str.find(':'); - if (delimiter_pos != std::string::npos) { - std::string field_name = field_str.substr(0, delimiter_pos); - std::string field_value = field_str.substr(delimiter_pos + 1); - fields.emplace_back(field_name, field_value); - } + std::string content; + // 临时存放读取的数据 + char buffer[100]; + // 读取长度(定长,16比特) + iss.read(buffer, 16); + buffer[16] = '\0'; + size_t total_length = std::stoi(buffer); + // std::cout << "读取到的总长度为: " << total_length << std::endl; + std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length); + // std::cout << value_content << std::endl; + std::istringstream iss_content(value_content); + iss_content.read(buffer, sizeof(size_t)); + buffer[sizeof(size_t)] = '\0'; + std::string slot_num = buffer; + // 读取属性个数 + iss_content.read(buffer, 16); + // 在第17个比特位处添加终结符,确保字符串以终结符结尾 + buffer[16] = '\0'; + // 将 buffer 中的内容转化为整数并赋值给 field_count + int field_count = std::stoi(buffer); + // std::cout << "读取到的字段个数为: " << field_count << std::endl; + + for (int i = 0; i < field_count; ++i) { + Field field; + // 读取属性名长度(定长,16比特) + iss_content.read(buffer, 16); + buffer[16] = '\0'; + int name_length = std::stoi(buffer); + // std::cout << "读取到的属性名长度为: " << name_length << std::endl; + // 读取属性名(变长) + field.name.resize(name_length); + iss_content.read(&field.name[0], name_length); + // std::cout << "读取到的属性名为: " << field.name << std::endl; + // 读取属性值长度(定长,16比特) + iss_content.read(buffer, 16); + buffer[16] = '\0'; + int value_length = std::stoi(buffer); + // std::cout << "读取到的属性值长度为: " << value_length << std::endl; + // 读取属性值(变长) + field.value.resize(value_length); + iss_content.read(&field.value[0], value_length); + // std::cout << "读取到的属性值为: " << field.value << std::endl; + fields.push_back(field); } return fields; } -// 根据字段值查找所有包含该字段的 key -std::vector FindKeysByField(leveldb::DB* db, Field &field) { +// 根据字段值查找所有包含该字段的 key,遍历 +std::vector FindKeysByField(leveldb::DB* db, const Field& field) { std::vector keys; leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { + for (it->SeekToFirst(); it->Valid() ; it->Next()) { std::string key = it->key().ToString(); std::string value; db->Get(leveldb::ReadOptions(), key, &value); FieldArray fields = ParseValue(value); for (const auto& f : fields) { - if (f.first == field.first && f.second == field.second) { + if (f.name == field.name && f.value == field.value) { keys.push_back(key); - break; // 假设每个key中每个字段值唯一,如果允许重复,可以移除这行 + break; // 假设每个key中每个字段值唯一 } } } @@ -59,38 +127,92 @@ std::vector FindKeysByField(leveldb::DB* db, Field &field) { return keys; } -Status OpenDB(std::string dbName, DB **db) { +Status OpenDB(std::string dbName, DB** db) { Options options; options.create_if_missing = true; return DB::Open(options, dbName, db); } +// 吞吐量测试函数 +void TestThroughput(leveldb::DB* db, int num_operations) { + WriteOptions writeOptions; + auto start_time = std::chrono::steady_clock::now(); + + for (int i = 0; i < num_operations; ++i) { + std::string key = "key_" + std::to_string(i); + FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; + std::string value = SerializeValue(fields); + db->Put(writeOptions, key, value); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Throughput: " << num_operations * 1000 / duration << " OPS" << std::endl; +} + +// 延迟测试函数 +void TestLatency(leveldb::DB* db, int num_operations, std::vector& lat_res) { + WriteOptions writeOptions; + int64_t latency = 0; + auto end_time = std::chrono::steady_clock::now(); + auto last_time = end_time; + for (int i = 0; i < num_operations; ++i) { + // 执行写入操作 + std::string key = "key_" + std::to_string(i); + FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; + std::string value = SerializeValue(fields); + db->Put(writeOptions, key, value); + db->Get(leveldb::ReadOptions(), key, &value); + + end_time = std::chrono::steady_clock::now(); + latency = std::chrono::duration_cast( + end_time - last_time).count(); + last_time = end_time; + + lat_res.emplace_back(latency); + } + + // 输出延迟统计信息(可选) + double avg_latency = std::accumulate(lat_res.begin(), lat_res.end(), 0.0) / lat_res.size(); + std::cout << "Average Latency: " << std::fixed << std::setprecision(2) << avg_latency << " ms" << std::endl; + std::cout << "Max Latency: " << *std::max_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; + std::cout << "Min Latency: " << *std::min_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; +} + TEST(TestSchema, Basic) { - DB *db; + DB* db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb", &db).ok() == false) { + if (!OpenDB("testdb", &db).ok()) { std::cerr << "open db failed" << std::endl; abort(); } std::string key1 = "k_1"; std::string key2 = "k_2"; + std::string key3 = "k_3"; FieldArray fields1 = { - {"name", "Customer#000000001"}, + {"name", "Customer1"}, {"address", "IVhzIApeRb"}, {"phone", "25-989-741-2988"} }; FieldArray fields2 = { - {"name", "Customer#000000001"}, + {"name", "Customer1"}, {"address", "ecnu"}, {"phone", "123456789"} }; + FieldArray fields3 = { + {"name", "Customer2"}, + {"address", "ecnu"}, + {"phone", "11111"} + }; // 序列化并插入 std::string value1 = SerializeValue(fields1); std::string value2 = SerializeValue(fields2); + std::string value3 = SerializeValue(fields3); db->Put(leveldb::WriteOptions(), key1, value1); db->Put(leveldb::WriteOptions(), key2, value2); + db->Put(leveldb::WriteOptions(), key3, value3); // 读取并反序列化 std::string value_ret; @@ -100,21 +222,25 @@ TEST(TestSchema, Basic) { // 检查反序列化结果 ASSERT_EQ(fields_ret.size(), fields1.size()); for (size_t i = 0; i < fields_ret.size(); ++i) { - ASSERT_EQ(fields_ret[i].first, fields1[i].first); - ASSERT_EQ(fields_ret[i].second, fields1[i].second); + ASSERT_EQ(fields_ret[i].name, fields1[i].name); + ASSERT_EQ(fields_ret[i].value, fields1[i].value); } // 测试查找功能 - Field query_field = {"name", "Customer#000000001"}; + Field query_field = {"name", "Customer2"}; std::vector found_keys = FindKeysByField(db, query_field); std::cout << "找到的key有:" << found_keys.size() << "个" << std::endl; - ASSERT_EQ(found_keys[0], key1); + /*// 吞吐量测试 + TestThroughput(db, 10000);*/ + /* // 延迟测试 + std::vector latency_results; + TestLatency(db, 1000, latency_results);*/ // 关闭数据库 delete db; } -int main(int argc, char **argv) { +int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } \ No newline at end of file