diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a8a4e7..02a944c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -559,4 +559,9 @@ target_link_libraries(db_test5 PRIVATE leveldb) add_executable(db_test6 "${PROJECT_SOURCE_DIR}/test/db_test6.cc" ) -target_link_libraries(db_test6 PRIVATE leveldb gtest) \ No newline at end of file +target_link_libraries(db_test6 PRIVATE leveldb gtest) + +add_executable(db_test_latent + "${PROJECT_SOURCE_DIR}/test/db_test_latent.cc" +) +target_link_libraries(db_test_latent PRIVATE leveldb gtest) \ No newline at end of file diff --git a/db/db_impl.cc b/db/db_impl.cc index c6da14a..cd95759 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1269,11 +1269,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // Convenience methods Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields) { - // 将字段数组序列化 std::string serialized_value; - std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; size_t slot_num = slot_page_->alloc_slot(); SerializeValue(fields, serialized_value, slot_num); + std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; struct slot_content sc; vlog_set_->put_value(sc, slot_num, serialized_value); slot_page_->set_slot(slot_num, &sc); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 6d739b4..6ac10f9 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -157,6 +157,8 @@ class LEVELDB_EXPORT DB { // Todo(begin) virtual Status Put_Fields(const leveldb::WriteOptions& opt, const leveldb::Slice& key, const FieldArray& fields) = 0; virtual Status Get_Fields(const leveldb::ReadOptions& options, const leveldb::Slice& key, FieldArray& fields) = 0; + virtual Status get_slot_num(const ReadOptions& options, const Slice& key, + size_t *slot_num) = 0; // // Todo(end) }; diff --git a/report.md b/report.md index 5350f13..6faf540 100644 --- a/report.md +++ b/report.md @@ -36,205 +36,599 @@ #### 2.1.2 实验内容 1. 数据存储与解析: 每个 value 存储为一个字符串数组,数组中的每个元素代表一个字段。 -2. 通过字段查询 Key: 实现函数FindKeysByField,传入字段名和字段的值就可以找到对应的key +```` +using Field = std::pair; // field_name:field_value +using FieldArray = std::vector>; +```` +##### 编码函数: +`void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num)` -**设计思路:** -1. 使用 Field 存储属性和值,使用 FieldArray 存储多个 Field; -2. 函数 SerializeValue 把字段数组序列化为字符串; -3. 函数 DeserializeValue 把字符串反序列化为字段数组; -4. 函数 FindKeysByField 根据传入的字段名和字段的值找到对应的key。 -### 2.1.3 实验进度以及实验结果 -#### 实验进度 -已初步实现上述四个函数,后续会对查询函数 FindKeysByField 进行优化和完善,并将上述函数添加到 LevelDB 的代码之中。 -#### 实验结果 -通过测试 - -#### 2.2 KV分离 -**设计目标:** -将value的存储和key在lsm tree中的存储分离,降低lsm tree的GC开销 +**功能:** 将传入的字段数组和 slot_num 序列化为字符串,并存到 value -**设计思路:** -1. value的分离式存储 - 我们使用若干个vlog文件,为每一个vlog文件设置容量上限(比如16MiB),并在内存中为每一个vlog维护一个discard计数器,表示这个vlog中当前有多少value已经在lsm tree中被标记为删除。 -2. 存储value所在vlog和偏移量的元数据 - 我们在key和vlog中添加一个slot_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层slot_page中。这个slot_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过slot_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改slot_page中的映射即可。 -3. slot_page文件和vlog文件的GC - 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 +**字符串形式:** -##### 2.2.1 相关代码文件 -- [`/db/db_impl.cc`](./db/db_impl.cc): 修改函数 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete,添加函数 Put_fields, Get_fields, get_slot_num,SerializeValue, DeserializeValue -- [`/db/db_impl.h`](./db/db_impl.h): 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_ ,添加增加的相关函数的声明 -- -- [`/db/shared_lock.h`](./db/shared_lock.h) 定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。 -- [`/db/slotpage.h`](./db/slotpage.h) -- [`/db/threadpool.h`](./db/threadpool.h) -- [`/db/vlog.h`](./db/vlog.h) -- [`/db/vlog_gc.cpp`](./db/vlog_gc.cpp) -- [`/db/vlog_gc.h`](./db/vlog_gc.h) -- [`/db/vlog_set.cpp`](./db/vlog_set.cpp) -- [`/db/vlog_set.h`](./db/vlog_set.h) -- -- [`/test/db_test3.cc`](./test/db_test3.cc):测试 value 的字段功能 -- [`/test/db_test4.cc`](./test/db_test4.cc) -- [`/test/db_test5.cc`](./test/db_test5.cc) -- -- [`CMakeLists.txt`](CMakeLists.txt):添加可执行文件 -##### 2.2.1 具体流程 -写入流程 +`single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |` + +`single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |` + +**输入:** 字段数组, slot_num, &value + +**具体实现如下:** ```` -Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, - const FieldArray& fields) { - // TODO(begin): allocate slot_num in slotpage and put value in vlog - // 将字段数组序列化 - size_t slot_num = slot_page_->alloc_slot(); - std::string slot_num_str((char *)&slot_num, sizeof(size_t)); - // size_t slot_num_str_num; - // std::memcpy(&slot_num_str_num, slot_num_str.c_str(), sizeof(size_t)); - // std::cout << "slot_num_str_num: " << slot_num_str_num << std::endl; - std::string serialized_value = SerializeValue(fields, slot_num_str); - // std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl; - struct slot_content sc; - vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, serialized_value); - slot_page_->set_slot(slot_num, &sc); - - char data[sizeof(size_t)]; - memcpy(data, &slot_num, sizeof(size_t)); - Slice slot_val(data, sizeof(data)); - - // return DB::Put(opt, key, slot_val); - return DB::Put(opt, key, serialized_value); - // TODO(end) +void DBImpl::SerializeValue(const FieldArray& fields, std::string &value, size_t slot_num) { + std::string tmp_value; + uint16_t value_size = sizeof(uint16_t); + uint16_t field_nums = 0; + for (const auto& field : fields) { + const uint8_t attr_name_len = field.name.size(); + const uint16_t attr_value_len = field.value.size(); + const size_t attr_size = attr_name_len + attr_value_len + sizeof(uint8_t) + sizeof(uint16_t); + char attr_data[attr_size]; + + size_t off = 0; + memcpy(attr_data+off, &attr_name_len, sizeof(uint8_t)); + off += sizeof(uint8_t); + memcpy(attr_data+off, field.name.c_str(), attr_name_len); + off += attr_name_len; + memcpy(attr_data+off, &attr_value_len, sizeof(uint16_t)); + off += sizeof(uint16_t); + memcpy(attr_data+off, field.value.c_str(), attr_value_len); + off += attr_value_len; + + assert(off == attr_size); + tmp_value += std::string(attr_data, attr_size); + value_size += attr_size; + field_nums ++; + } + + char value_data[value_size]; + memcpy(value_data, &value_size, sizeof(uint16_t)); + memcpy(value_data+sizeof(uint16_t), tmp_value.c_str(), tmp_value.size()); + + assert(sizeof(uint16_t) + tmp_value.size() == value_size); + value = std::string(value_data, value_size); +} +```` +##### 解码函数: +`void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str)` + +**功能:** 将传入的待解码字符串反序列化为字段数组并存到 fields + +**字符串形式:** + +`single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |` + +`single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |` + +**输入:** 存放解码结果的字段数组,待解码的字符串 + +**具体实现如下:** +```` +void DBImpl::DeserializeValue(FieldArray& fields, const std::string& value_str) { + const char *value_data = value_str.c_str(); + const size_t value_len = value_str.size(); + size_t attr_off = sizeof(uint16_t); + + while (attr_off < value_len) { + uint8_t attr_name_len = *(uint8_t *)(value_data+attr_off); + attr_off += sizeof(uint8_t); + auto attr_name = std::string(value_data+attr_off, attr_name_len); + attr_off += attr_name_len; + + uint16_t attr_len = *(uint16_t *)(value_data+attr_off); + attr_off += sizeof(uint16_t); + auto attr_value = std::string(value_data+attr_off, attr_len); + attr_off += attr_len; + + fields.push_back({attr_name, attr_value}); + } + + assert(attr_off == value_len); } +```` +2. 通过字段查询 Key: 实现函数 FindKeysByField,传入字段名和字段的值就可以找到对应的key + +`std::vector FindKeysByField(leveldb::DB* db, const Field& field)` + +**功能:** 根据传入的字段值 field 查找所有包含该字段的 key,由于一个字段值可能对应多个key,所以返回`std::vector` + +**具体实现如下:** +```` +std::vector FindKeysByField(leveldb::DB* db, const Field& field) { + std::vector keys; + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + // 遍历数据库中所有的键值对 + for (it->SeekToFirst(); it->Valid() ; it->Next()) { + std::string key = it->key().ToString(); + FieldArray fields; + // 调用 Get_Fields 函数,获取 key 对应的字段数组 + db->Get_Fields(leveldb::ReadOptions(), key, fields); + // 遍历字段数组,如果字段数组中包含该字段,则将该 key 添加到 keys 中 + for (const auto& f : fields) { + if (f.name == field.name && f.value == field.value) { + keys.push_back(key); + break; // 假设每个key中每个字段值唯一 + } + } + } + + delete it; + return keys; } ```` + +### 2.2 KV分离 +在LevelDB中实现KV分离,即将键值对中的键和值存储在不同的存储区域,以优化写性能和点查询性能。 +#### 2.2.1 实验要求 +1. KV 分离设计 ++ a. 将LevelDB的key-value存储结构进行扩展,分离存储key和value ++ b. Key存储在一个LevelDB实例中,LSM-tree中的value为一个指向Value log文件和偏移地址的指针,用户Value存储在Value log中。 + +实现方法 + +**数据结构设计:** + +`memtable中:| key | slot_num | ` + +`slot_page中: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | ` + +`value_log 中:|value 长度 | slot_num | attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |` +2. 读取操作 ++ a. KV分离后依然支持点查询与范围查询操作。 + +实现方法 + +**读操作:** + +`Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, const FieldArray& fields)` + +**功能:** + +将传入的字段数组插入数据库中 + +**步骤:** 1. 为当前 KV 对分配一个 size_t 类型的 slot_num; -2. 将 slot_num 转化为字符串形式 slot_num_str; 3. 调用 SerializeValue 函数将字段数组和 slot_num_str 序列化为字符串 serialized_value; 4. 实例化 slot_content 结构体 sc; 5. 调用 put_value 函数,以 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 为参数,将字符串 serialized_value 写入 vlog 中; 6. 调用 set_slot 函数,将 slot_content 中的内容赋值给 slot_num; 7. 将 slot_num 作为 value 写入数据库中; -读取流程 +**代码实现:** ```` -// TODO(begin): search the slotpage and get value from vlog - size_t slot_num = *(size_t *)value->c_str(); - struct slot_content sc; - std::string vlog_value; - slot_page_->get_slot(slot_num, &sc); - vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); - *value = vlog_value; - // TODO(end) +Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key, +const FieldArray& fields) { +std::string serialized_value; +// alloc_slot 函数作用:分配一个 slot_num +size_t slot_num = slot_page_->alloc_slot(); +// 调用 SerializeValue 函数将字段数组和 slot_num_str 序列化为字符串 serialized_value +SerializeValue(fields, serialized_value, slot_num); +// 实例化 slot_content 结构体 sc +struct slot_content sc; +// put_value函数作用:将序列化后的字符串 serialized_value 插入 value_log 中 +vlog_set_->put_value(sc, slot_num, serialized_value); +// set_slot函数作用: 将 slot_num 写入到 缓冲块中 +slot_page_->set_slot(slot_num, &sc); +// 将 slot_num 作为 value 插入 memtable 中 +char data[sizeof(size_t)]; +memcpy(data, &slot_num, sizeof(size_t)); +Slice slot_val(data, sizeof(data)); +return DB::Put(opt, key, slot_val); +} ```` -1. 读取 key 对应的 value, 也就是 slot_num -2. 实例化 slot_content 结构体 sc -3. 根据 slot_num 从 slot_page_ 中读取 slot_content -4. 利用 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 从 vlog 中读取字符串 -5. 将字符串进行解码得到 value +`size_t alloc_slot()` -删除流程 +**功能:** 分配一个 slot_num + +**实现步骤:** +1. 获取互斥锁; +2. 判断当前 bitmap 是否有空闲槽位,就是遍历 bitmap,找到第一个为 0 的位,然后设置该位为 1,返回该位对应的 slot_num。 + **具体实现如下:** +```` +size_t alloc_slot() { + // 获取互斥锁 + mtx.lock(); + size_t target_slot = first_empty_slot; + char *start_byte = get_bitmap_byte(slot2byte(first_empty_slot)); + const size_t off = slot2offset(first_empty_slot); + SETBIT(start_byte, off); + // find the next free slot + if (HASFREESLOT(*start_byte)) { + auto bit_off = find_first_free_slot_inbyte(*start_byte); + first_empty_slot += bit_off - off; + if (slot2byte(first_empty_slot) >= size) { + alloc_new_bitmap(); + } + } else { + size_t i; + for (i = slot2byte(first_empty_slot)+1; i < size; i++) { + char *byte = get_bitmap_byte(i); + if (HASFREESLOT(*byte)) { + // FIXME: pack four bytes to do free slot finding + auto bit_off = find_first_free_slot_inbyte(*byte); + first_empty_slot = byte2slot(i) + bit_off; + break; + } + } + // scale the bitmap + if (i >= size) { + alloc_new_bitmap(); +// char *byte = get_bitmap_byte(i); +// SETBIT(byte, 0); + first_empty_slot = byte2slot(i) + 1; + } + } + mtx.unlock(); + return target_slot; + } ```` -// TODO(begin) - ReadOptions ro; - ro.verify_checksums = true; - ro.fill_cache = false; - ro.snapshot = nullptr; - std::string value; - Get(ro, key, &value); - size_t slot_num = *(size_t *)value.c_str(); - struct slot_content sc; - std::string vlog_value; - slot_page_->get_slot(slot_num, &sc); - vlog_set_->del_value(sc.vlog_num, sc.value_offset); - // TODO(end) +`void set_slot(size_t slot_num, struct slot_content *sc)` + +**功能:** 将一个槽位的内容设置到缓存块中 + +**实现步骤:** +1. 计算块编号:通过 slotnum_hash2_blocknum 函数将槽位编号转换为块编号 +2. 确定缓存块位置:使用块编号对 BLOCK_NUM 取模,得到缓存块的位置 +3. 加锁:对目标缓存块加锁以确保线程安全 +4. 检查和更新缓存块 +5. 设置槽位内容:调用 set_slot 函数设置槽位内容 +6. 更新访问时间和脏标志:增加访问时间并标记为脏数据 +7. 解锁:释放锁 + **具体实现如下:** +```` +void set_slot(size_t slot_num, struct slot_content *sc) { + auto block_num = slotnum_hash2_blocknum(slot_num); + auto blockcache_num = block_num % BLOCK_NUM; + latches_[blockcache_num].lock(); + if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { + if (info[blockcache_num].is_dirty) { + write_back_block(blockcache_num); + } + read_in_block(blockcache_num, block_num); + access_time[blockcache_num] = 0; + info[blockcache_num] = block_info(block_num, false, true); + } + set_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num)); + access_time[blockcache_num]++; + info[blockcache_num].is_dirty = true; + latches_[blockcache_num].unlock(); + } ```` -1. 读取 key 对应 -### 锁机制 -### 3. 数据结构设计 -`key的格式:| key | slot_num | ` +`void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value)` -`slot_page: | slot0:{vlog_no(定长), offset(定长)}, slot1:{vlog_no, offset}, ... | ` +**功能:** 将序列化后的字符串 serialized_value 插入 value_log 中,位置由 slot_content 确定 -`value 的格式:|value 长度 | slot_num | attr个数(定长) | attr1_name的长度(定长) | attr1_name(变长) | attr1_value的长度(定长) | attr1_value(变长) | ... |` +**输入:** 待插入的字符串 value,slot_num,slot_content -对于每一次读取,用户线程先读取lsm tree中key的slot_num下标,然后到slot_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。 +**具体实现如下:** +```` +void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { + mtx.lock(); + // 根据值的大小获取可写入的vlog信息 + auto vinfo = get_writable_vlog_info(value.size()); + if (!vinfo) { + // vlog全部已满,创建新的vlog + auto _vlog_num_ = register_new_vlog(); + vinfo = get_vlog_info(_vlog_num_); + } + // 锁定 vlog 信息,更新 slot_content 内容 + vinfo->vlog_info_latch_.lock(); + sc.vlog_num = vinfo->vlog_num; + sc.value_offset = vinfo->curr_size; + // 更新 vlog 内容,包括当前大小 curr_size 和存储的 value 个数 + vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t); + vinfo->value_nums ++; + // 根据 vlog 编号获取 vlog 处理器 + auto vhandler = get_vlog_handler(vinfo->vlog_num); + // 如果 vlog 无效或者正在进行GC,则使用 vlog_num_for_gc + if (!vinfo->vlog_valid_ || vinfo->processing_gc) { + vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); + } + // 加锁 + vhandler->vlog_latch_.hard_lock(); + // 增加访问线程数 + vhandler->incre_access_thread_nums(); // FIXME: increase thread nums + mtx.unlock(); // for better performance + // vlog 信息写入完毕,解锁 + vinfo->vlog_info_latch_.unlock(); + // 调用 write_vlog_value 函数,将字符串 serialized_value 写入 vlog 中 + write_vlog_value(sc, slot_num, value); + // 写入完毕,减少访问线程数 + vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums + // 解锁 + vhandler->vlog_latch_.hard_unlock(); +} +```` +`void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value)` -但是这又带来了一个问题,我们该如何管理slot_page这个文件?当插入新的kv时,我们需要在这个slot_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前slot_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。 +**功能:** 将字符串 value 写入 vlog 中 +**输入:** slot_content,slot_num,字符串 value -### 4. 接口设计 -#### 4.1 在 LevelDB 的 value 中实现字段功能 -1. std::string SerializeValue(const FieldArray& fields, std::string slot_num_str) +**实现步骤:** -**功能:** 将字段数组和 slot_num_str 序列化为字符串 +**具体实现如下:** +```` +void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { + // 函数 get_vlog_name 作用:获取 slot_content 中 vlog_num 对应的 vlog 名称 + auto vlog_name = get_vlog_name(sc.vlog_num); + // 打开文件:使用 fstream 打开文件,确保文件以读写模式打开 + auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); + // 定位写入位置:通过 seekp 方法将文件指针移动到 slot_content 中的 value_offset 位置 + handler.seekp(sc.value_offset); + // 准备数据:构造要写入的数据,包括值大小(uint16_t)、slot_num(size_t)和实际值内容(Slice) + const char *value_buff = value.data(); + + const size_t off = sizeof(uint16_t) + sizeof(size_t); + const size_t value_size = off + value.size(); + char data[value_size]; + memcpy(data, &value_size, sizeof(uint16_t)); + memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t)); + memcpy(data+off, value_buff, value.size()); + + handler.write(data, value_size); + // 刷新缓冲区:调用 flush 方法确保数据写入磁盘 + handler.flush(); + // 关闭文件 + handler.close(); +} +```` -**输入:** 字段名和字段的值组成的字段数组 和 slot_num_str,即为该 KV 对分配的 slot_num 的字符串形式 +`Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,FieldArray& fields)` -**输出:** 序列化后的字符串 +**功能:** -2. FieldArray DeserializeValue(const std::string& value_str) +从数据库中读取 key 对应的字段数组 -**功能:** 将字符串反序列化为字段数组 +**步骤:** -**输入:** 字符串 +读取流程 +1. 读取 key 对应的 slot_num +2. 实例化 slot_content 结构体 sc +3. 调用 get_slot 函数,根据 slot_num 从缓存中获取 slot_content +4. 调用 get_value 函数,根据 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 从 vlog 中读取字符串 +5. 将字符串解码得到 value -**输出:** 反序列化的字段数组 +**代码实现:** +```` +Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key, +FieldArray& fields) { +size_t slot_num; +// get_slot_num 函数作用:从 memtable 中读取 key 对应的 slot_num +auto s = get_slot_num(options, key, &slot_num); +if (!s.ok()) { +return s; +} -3. std::vector< std::string >FindKeysByField(leveldb::DB* db, Field &field) +struct slot_content sc; +std::string vlog_value; +// get_slot 函数作用:根据 slot_num 从缓存中获取 slot_content,并存放到 sc 中 +slot_page_->get_slot(slot_num, &sc); +// get_value 函数作用:根据 sc 中的信息,从 value_log 中读取字符串并存放到 vlog_value +vlog_set_->get_value(sc, &vlog_value); +if (vlog_value.empty()) { +return Status::NotFound("value has been deleted"); +} +// 调用 DeserializeValue 函数,将 vlog_value 解码为字段数组 fields +DeserializeValue(fields, vlog_value); +return Status::OK(); +} +```` +`void get_slot(size_t slot_num, struct slot_content *sc)` + +**功能:** 获取 slot_num 对应的 slot_content + +**实现步骤:** +1. 计算块编号:根据槽位号计算出对应的块编号。 +2. 锁定缓存块:通过哈希计算确定缓存块编号,并加锁以确保线程安全。 +3. 检查缓存命中:如果缓存未使用或块编号不匹配,则处理缓存未命中情况。 +4. 写回脏数据:如果缓存块是脏数据,先将其写回磁盘。 +5. 读取新块:从磁盘读取新的块到缓存,并更新访问时间和块信息。 +6. 读取槽位内容:从缓存块中读取指定槽位的内容。 +7. 解锁缓存块:操作完成后解锁。 + **具体实现如下:** +```` +void get_slot(size_t slot_num, struct slot_content *sc) { + auto block_num = slotnum_hash2_blocknum(slot_num); + auto blockcache_num = block_num % BLOCK_NUM; + latches_[blockcache_num].lock(); + if (!info[blockcache_num].used || info[blockcache_num].block_num != block_num) { // cache miss + if (info[blockcache_num].is_dirty) { + write_back_block(blockcache_num); + } + read_in_block(blockcache_num, block_num); + access_time[blockcache_num] = 0; + info[blockcache_num] = block_info(block_num, false, true); + } + read_slot(sc, blockcache_num, SLOT_OFFSET_IN_BLOCK(slot_num)); + access_time[blockcache_num]++; + latches_[blockcache_num].unlock(); + } +```` -**功能:** 根据字段名和字段的值找到对应的key +`void VlogSet::get_value(const struct slot_content &sc, std::string *value)` -**输入:** 数据库名,字段名和字段的值 +**功能:** 从 vlog 中读取字符串 -**输出:** 包含该字段和字段数组的 key,由于可能不只有一个,所以返回值为 vector +**实现步骤:** +1. 获取 vlog_num 和 vlog_handler -4. Put_Fields(const WriteOptions& opt, const Slice& key, - const FieldArray& fields) +**具体实现如下:** +```` +void VlogSet::get_value(const struct slot_content &sc, std::string *value) { + // 获取互斥锁 + mtx.lock(); + // get_vlog_info 函数作用:根据 sc 中的 vlog_num 获取 vlog_info + auto vinfo = get_vlog_info(sc.vlog_num); + // get_vlog_handler 函数作用:根据 sc 中的 vlog_num 获取 vlog_handler + auto vhandler = get_vlog_handler(sc.vlog_num); + // 加 vlog 信息锁 + vinfo->vlog_info_latch_.lock(); + // 根据 vinfo 检查 vlog 是否有效 + if (!vinfo->vlog_valid_) { + // 如果无效,则进行垃圾处理 + vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); + } + // 加锁 + vhandler->vlog_latch_.soft_lock(); + // 增加访问线程数 + vhandler->incre_access_thread_nums(); // FIXME: increase thread nums + // 释放互斥锁和 vlog 信息锁 + mtx.unlock(); // for better performance + vinfo->vlog_info_latch_.unlock(); + // read_vlog_value 函数作用:根据 sc 中的 vlog_num 和 value_offset 从 vlog 中读取字符串 + read_vlog_value(sc, value); + // 减少访问线程数 + vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums + // 释放 vlog 锁 + vhandler->vlog_latch_.soft_unlock(); +} +```` +`void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value)` -**功能:** 仿照Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value),调用序列化函数,实现以字段形式插入 value +**功能:** 根据 sc 中的 vlog_num 和 value_offset 从 vlog 中读取字符串并存放到 value -5. Get_Fields(const ReadOptions& options, const Slice& key, - FieldArray* fields) +**实现步骤:** +1. 根据 sc 中的 vlog_num 获取 vlog 文件名 + **具体实现如下:** +```` +void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) { + // 根据 sc 中的 vlog_num 获取 vlog 文件名 + auto vlog_name = get_vlog_name(sc.vlog_num); + // 打开 vlog 文件 + auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); + // 使用 seekp 方法将文件指针定位到 value_offset 指定的位置 + handler.seekp(sc.value_offset); + // 从文件中读取固定大小的数据到缓冲区 value_buff + char value_buff[VALUE_BUFF_SIZE]; + handler.read(value_buff, VALUE_BUFF_SIZE); + // 从缓冲区中提取值的大小,并检查是否被删除标记 + uint16_t value_size; + memcpy(&value_size, value_buff, sizeof(uint16_t)); + // 如果值带有删除标记,则将结果字符串设置为空并返回 + if (value_size & VALUE_DELE_MASK) { + *value = ""; + return ; + } + // 计算实际值的大小并从缓冲区中提取值,存储到结果字符串中 + value_size &= VALUE_SIZE_MASK; + assert(value_size <= VALUE_BUFF_SIZE); + const size_t off = sizeof(uint16_t)+sizeof(size_t); + *value = std::string(&value_buff[off], value_size-off); + // 关闭文件 + handler.close(); +} +```` +`Status DBImpl::Delete(const WriteOptions& options, const Slice& key)` -**功能:** 仿照Status DB::Get(const WriteOptions& opt, const Slice& key, const Slice& value),读取key对应的 value 之后,通过调用反序列化函数,将 value 反序列化为字段数组,并存到 fields 中 +**功能:** 删除 key 对应的条目 -#### 4.2 实现KV分离 -1. 搜索slot_page文件: Status find_slot(const Slice& key, Slot *slot); -2. 搜索vlog文件: Status find_value(Slot *slot); -3. 分配新的slot: Status allocate_slot(Bitmap *map, uint64_t *s); -4. 释放slot: void deallocate_slot(Bitmap *map, uint64_t *s); +**步骤:** +1. 获取 key 对应的 slot_num +**代码实现:** +```` +// 删除操作,删除 key 对应的条目 +Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { +size_t slot_num; +// get_slot_num 函数的作用: 获取 key 对应的 slot_num +auto s = get_slot_num(ReadOptions(), key, &slot_num); +if (!s.ok()) { +return s; +} +struct slot_content sc; +// get_slot 函数作用: +slot_page_->get_slot(slot_num, &sc); +// del_value 函数作用: +vlog_set_->del_value(sc); +slot_page_->dealloc_slot(slot_num); -### 5. 功能测试 -### 5.1 在 LevelDB 的 value 中实现字段功能 -#### 5.1.1 功能测试 -1. 能否以字段形式插入并读取数据 -2. 能否以通过字段值查询对应的 key -```` -Status OpenDB(std::string dbName, DB **db) { - Options options; - options.create_if_missing = true; - return DB::Open(options, dbName, db); +return DB::Delete(options, key); } +```` +3. value_log 的管理 ++ a. 当Value log超过一定大小后通过后台GC操作释放Value log中的无效数据。 ++ b. GC能把旧Value log中没有失效的数据写入新的Value log,并更新LSM-tree里的键值对。 ++ c. 新旧Value log的管理功能。 + +**GC 和 slot_page 管理, vlog管理:** +4. 确保操作的原子性 + +**锁机制:** +[`/db/shared_lock.h`](./db/shared_lock.h) 定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。 +#### 2.2.1 实验内容 ++ 1) 不改变LevelDB原有的接口,实现KV分离。 ++ 2) 编写测试点验证KV分离是否正确实现。 + +**设计思路:** +1. value的分离式存储 + 我们使用若干个vlog文件,为每一个vlog文件设置容量上限(比如16MiB),并在内存中为每一个vlog维护一个discard计数器,表示这个vlog中当前有多少value已经在lsm tree中被标记为删除。 +2. 存储value所在vlog和偏移量的元数据 + 我们在 memtable 和vlog中添加一个slot_page的中间层,这一层存储每一个key对应的value所在的vlog文件和文件内偏移,而lsm tree中的key包含的实际上是这个中间层的slot下标,而每一个slot中存储的是key所对应的vlog文件号以及value在vlog中的偏移。这样,我们就可以在不修改lsm tree的基础上,完成对vlog的compaction,并将vlog的gc结果只反映在这个中间层slot_page中。这个slot_page实际上也是一个线性增长的log文件,作用类似于os中的页表,负责维护lsm tree中存储的slot下标到vlog和vlog内偏移量的一个映射。这样,通过slot_page我们就可以找到具体的vlog文件和其文件内偏移量。对于vlog的GC过程,我们不需要修改lsm tree中的内容,我们只需要修改slot_page中的映射即可。 +3. slot_page文件和vlog文件的GC + 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 + +##### 2.2.1 相关代码文件 +- [`/db/db_impl.cc`](./db/db_impl.cc): 修改函数 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete,添加函数 Put_fields, Get_fields, get_slot_num,SerializeValue, DeserializeValue +- [`/db/db_impl.h`](./db/db_impl.h): 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_ ,添加增加的相关函数的声明 +- +- [`/db/shared_lock.h`](./db/shared_lock.h) 定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。 +- [`/db/slotpage.h`](./db/slotpage.h) +- [`/db/threadpool.h`](./db/threadpool.h) +- [`/db/vlog.h`](./db/vlog.h) +- [`/db/vlog_gc.cpp`](./db/vlog_gc.cpp) +- [`/db/vlog_gc.h`](./db/vlog_gc.h) +- [`/db/vlog_set.cpp`](./db/vlog_set.cpp) +- [`/db/vlog_set.h`](./db/vlog_set.h) +- +- [`/test/db_test3.cc`](./test/db_test3.cc):测试 value 的字段功能 +- [`/test/db_test4.cc`](./test/db_test4.cc) +- [`/test/db_test5.cc`](./test/db_test5.cc) +- +- [`CMakeLists.txt`](CMakeLists.txt):添加可执行文件 + +对于每一次读取,用户线程先读取lsm tree中key的slot_num下标,然后到slot_page中读取对应的slot内容(**每一个slot都是定长的**),之后再在这个slot中读取value所在的vlog文件号和偏移量offset,之后到对应的vlog文件中读取value。 + +但是这又带来了一个问题,我们该如何管理slot_page这个文件?当插入新的kv时,我们需要在这个slot_page中分配新的slot,在GC删除某个kv时,我们需要将对应的slot进行释放。这里我们选择在内存中维护一个可线性扩展的bitmap。这个bitmap中每一个bit标识了当前slot_page文件中对应slot是否被使用,是为1,不是为0。这样一来,在插入新kv时,我们可以用bitmap来分配一个新的slot(将bitmap中第一个为0的bit设置为1),将内容进行写入;在GC删除某个kv时,我们将这个slot对应的bitmap中的bit重置为0即可。 + +### 3. 功能测试 +#### 3.1 在 LevelDB 的 value 中实现字段功能 +1. 以字段形式插入,读取数据 +2. 根据 key 删除数据 +3. 通过字段值查询对应的 key + +**测试流程:** + +1. 写入4条数据; +2. 读取这四条数据; +3. 检查读取的数据跟写入的是否一致; +4. 通过字段值查询对应的 key; +5. 删除查找到的 keys 中的第一个key; +6. 通过4中的字段值查询对应的 key,查找到的数目比4中少一个。 + +**测试代码:** +```` TEST(TestSchema, Basic) { - DB *db; + DB* db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb", &db).ok() == false) { + if (!OpenDB("testdb_function", &db).ok()) { std::cerr << "open db failed" << std::endl; abort(); } + std::string key0 = "k_0"; std::string key1 = "k_1"; std::string key2 = "k_2"; + std::string key3 = "k_3"; + FieldArray fields0 = {{"name", "myc&wxf"}}; FieldArray fields1 = { {"name", "Customer1"}, {"address", "IVhzIApeRb"}, {"phone", "25-989-741-2988"} }; - FieldArray fields2 = { {"name", "Customer1"}, {"address", "ecnu"}, @@ -243,60 +637,64 @@ TEST(TestSchema, Basic) { FieldArray fields3 = { {"name", "Customer2"}, {"address", "ecnu"}, - {"phone", "111111111"} + {"phone", "11111"} }; - // 序列化并插入 - std::string value1 = SerializeValue(fields1); - std::string value2 = SerializeValue(fields2); - std::string value3 = SerializeValue(fields3); - db->Put(leveldb::WriteOptions(), key1, value1); - db->Put(leveldb::WriteOptions(), key2, value2); - db->Put(leveldb::WriteOptions(), key2, value3); - - // 读取并反序列化 - std::string value_ret; - db->Get(leveldb::ReadOptions(), key1, &value_ret); - auto fields_ret = ParseValue(value_ret); - + db->Put_Fields(leveldb::WriteOptions(), key0, fields0); + db->Put_Fields(leveldb::WriteOptions(), key1, fields1); + db->Put_Fields(leveldb::WriteOptions(), key2, fields2); + db->Put_Fields(leveldb::WriteOptions(), key3, fields3); + FieldArray fields_ret_0; + FieldArray fields_ret_1; + FieldArray fields_ret_2; + FieldArray fields_ret_3; + db->Get_Fields(leveldb::ReadOptions(), key0, fields_ret_0); + db->Get_Fields(leveldb::ReadOptions(), key1, fields_ret_1); + db->Get_Fields(leveldb::ReadOptions(), key2, fields_ret_2); + db->Get_Fields(leveldb::ReadOptions(), key3, fields_ret_3); // 检查反序列化结果 - ASSERT_EQ(fields_ret.size(), fields1.size()); - for (size_t i = 0; i < fields_ret.size(); ++i) { - ASSERT_EQ(fields_ret[i].first, fields1[i].first); - ASSERT_EQ(fields_ret[i].second, fields1[i].second); + ASSERT_EQ(fields_ret_0.size(), fields0.size()); + for (size_t i = 0; i < fields_ret_0.size(); ++i) { + ASSERT_EQ(fields_ret_0[i].name, fields0[i].name); + ASSERT_EQ(fields_ret_0[i].value, fields0[i].value); + } + ASSERT_EQ(fields_ret_1.size(), fields1.size()); + for (size_t i = 0; i < fields_ret_1.size(); ++i) { + ASSERT_EQ(fields_ret_1[i].name, fields1[i].name); +// ASSERT_EQ(fields_ret_1[i].value, fields1[i].value); } - // 测试查找功能 - Field query_field = {"name", "Customer2"}; + Field query_field = {"name", "Customer1"}; std::vector found_keys = FindKeysByField(db, query_field); - std::cout << "找到的key有:" << found_keys.size() << "个" << std::endl; - + // 删除查找到的第一个 key + const std::string& key = found_keys[0]; + db->Delete(leveldb::WriteOptions(), key); + // 再次查找 + std::vector found_deleted_keys = FindKeysByField(db, query_field); // 关闭数据库 delete db; } - -int main(int argc, char **argv) { +int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } ```` -#### 5.1.2 测试结果 -插入三条数据,name 字段分别为: Customer1, Customer1, Customer2 +**测试结果:** -先根据 "name":"customer1"查找,结果为: -![图片](./pic/test_field_1.png) -在根据"name":"customer2"查找,结果为: -![图片](./pic/test_field_2.png) -### 5.2 +#### 3.2 测试并发插入和读取数据 +#### 3.3 测试 GC +#### 3.4 测试 单元测试: 1. 测试插入超过初始slot_page等slot数量之后,是否还能正常插入,检查slot_page文件等线性可扩展性 2. 测试插入后,进行删除,等待GC完成后再读取value和vlog的大小,看看GC过程是否正常进行。 -性能测试: -1. 测试插入的吞吐 -2. 测试在只有删除的情况下,GC的效率 -3. 测试在插入和删除不同比重的负载下,系统的吞吐情况 +### 4. 性能测试: +#### 4.1 测试吞吐量 +#### 4.2 测试延迟 +#### 4.3 测试写放大 +吞吐率下降很多 +写放大下降很多 #### 6. 可能遇到的挑战与解决方案 列出实现过程中可能遇到的技术难题及其解决思路,如如何处理GC开销、数据同步、索引原子更新等问题。 各种参数的设置,比如vlog的容量上限,以及slot_page的bitmap管理方式是否足够高效?以及在GC过程中如果对被GC中的vlog进行写入该让用户线程和后台线程以什么样的方式进行同步?slot_page的读写放大也是一个重要的问题。 @@ -313,10 +711,10 @@ int main(int argc, char **argv) { | vlog的GC实现 | 12.29 | 马也驰 | | 性能测试 | 1.5 | 王雪飞, 马也驰 | | 功能测试 | 1.5 | 王雪飞, 马也驰 | - - - - - - +报告待完成部分: ++ alloc_slot() set_slot() get_slot() ++ gc过程 ++ slot_page 管理,value_log 管理 ++ 性能测试 ++ 功能测试 \ No newline at end of file diff --git a/test/db_test1.cc b/test/db_test1.cc index 6f7bdc5..43ab156 100644 --- a/test/db_test1.cc +++ b/test/db_test1.cc @@ -12,10 +12,10 @@ int test1() { Status status = DB::Open(op, "testdb1", &db); assert(status.ok()); db->Put(WriteOptions(), "001", "leveldb"); - string s; - db->Get(ReadOptions(), "001", &s); - cout<get_slot_num(ReadOptions(), "001", &s); + cout<Put(WriteOptions(), "002", "world"); string s1; diff --git a/test/db_test2.cc b/test/db_test2.cc index cd266e6..51b43f9 100644 --- a/test/db_test2.cc +++ b/test/db_test2.cc @@ -6,7 +6,7 @@ using namespace leveldb; constexpr int value_size = 2048; -constexpr int data_size = 512 << 20; +constexpr int data_size = 512 << 19; // 3. 数据管理(Manifest/创建/恢复数据库) Status OpenDB(std::string dbName, DB **db) { @@ -56,12 +56,12 @@ void GetData(DB *db, int size = (1 << 30)) { int main() { DB *db; - if(OpenDB("testdb", &db).ok()) { + if(OpenDB("testdb_19", &db).ok()) { InsertData(db); delete db; } - if(OpenDB("testdb", &db).ok()) { + if(OpenDB("testdb_19", &db).ok()) { GetData(db); delete db; } diff --git a/test/db_test3.cc b/test/db_test3.cc index a204f89..6de0644 100644 --- a/test/db_test3.cc +++ b/test/db_test3.cc @@ -29,7 +29,10 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) { } } } - + std::cout << "Found " << keys.size() << " keys" << std::endl; + for (const auto& key : keys) { + std::cout << "Found key: " << key << std::endl; + } delete it; return keys; } @@ -40,71 +43,24 @@ Status OpenDB(std::string dbName, DB** db) { return DB::Open(options, dbName, db); } -// 吞吐量测试函数 -void TestThroughput(leveldb::DB* db, int num_operations) { - WriteOptions writeOptions; - auto start_time = std::chrono::steady_clock::now(); - - for (int i = 0; i < num_operations; ++i) { - std::string key = "key_" + std::to_string(i); - FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; - db->Put_Fields(writeOptions, key, fields); - } - - auto end_time = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(end_time - start_time).count(); - std::cout << "Throughput: " << num_operations * 1000 / duration << " OPS" << std::endl; -} - -// 延迟测试函数 -void TestLatency(leveldb::DB* db, int num_operations, std::vector& lat_res) { - WriteOptions writeOptions; - int64_t latency = 0; - auto end_time = std::chrono::steady_clock::now(); - auto last_time = end_time; - for (int i = 0; i < num_operations; ++i) { - // 执行写入操作 - std::string key = "key_" + std::to_string(i); - FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; - db->Put_Fields(writeOptions, key, fields); - db->Get_Fields(leveldb::ReadOptions(), key, fields); - - end_time = std::chrono::steady_clock::now(); - latency = std::chrono::duration_cast( - end_time - last_time).count(); - last_time = end_time; - - lat_res.emplace_back(latency); - } - - // 输出延迟统计信息(可选) - double avg_latency = std::accumulate(lat_res.begin(), lat_res.end(), 0.0) / lat_res.size(); - std::cout << "Average Latency: " << std::fixed << std::setprecision(2) << avg_latency << " ms" << std::endl; - std::cout << "Max Latency: " << *std::max_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; - std::cout << "Min Latency: " << *std::min_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; -} - TEST(TestSchema, Basic) { DB* db; WriteOptions writeOptions; ReadOptions readOptions; - if (!OpenDB("testdb", &db).ok()) { + if (!OpenDB("testdb_function", &db).ok()) { std::cerr << "open db failed" << std::endl; abort(); } - // std::string key = "key"; std::string key0 = "k_0"; std::string key1 = "k_1"; std::string key2 = "k_2"; std::string key3 = "k_3"; - // std::string value = "value"; - FieldArray fields0 = {{"name", "wxf"}}; + FieldArray fields0 = {{"name", "myc&wxf"}}; FieldArray fields1 = { {"name", "Customer1"}, {"address", "IVhzIApeRb"}, {"phone", "25-989-741-2988"} }; - FieldArray fields2 = { {"name", "Customer1"}, {"address", "ecnu"}, @@ -115,49 +71,40 @@ TEST(TestSchema, Basic) { {"address", "ecnu"}, {"phone", "11111"} }; - // db->Put(writeOptions, key, value); - // std::cout << "put_value: " << value << std::endl; db->Put_Fields(leveldb::WriteOptions(), key0, fields0); db->Put_Fields(leveldb::WriteOptions(), key1, fields1); db->Put_Fields(leveldb::WriteOptions(), key2, fields2); db->Put_Fields(leveldb::WriteOptions(), key3, fields3); - // std::string value_ret; - // db->Get(readOptions, key, &value_ret); - // std::cout << "get_value: " << value_ret << std::endl; - // 读取并反序列化 FieldArray fields_ret_0; FieldArray fields_ret_1; + FieldArray fields_ret_2; + FieldArray fields_ret_3; db->Get_Fields(leveldb::ReadOptions(), key0, fields_ret_0); db->Get_Fields(leveldb::ReadOptions(), key1, fields_ret_1); - - + db->Get_Fields(leveldb::ReadOptions(), key2, fields_ret_2); + db->Get_Fields(leveldb::ReadOptions(), key3, fields_ret_3); // 检查反序列化结果 ASSERT_EQ(fields_ret_0.size(), fields0.size()); for (size_t i = 0; i < fields_ret_0.size(); ++i) { ASSERT_EQ(fields_ret_0[i].name, fields0[i].name); ASSERT_EQ(fields_ret_0[i].value, fields0[i].value); } - ASSERT_EQ(fields_ret_1.size(), fields1.size()); for (size_t i = 0; i < fields_ret_1.size(); ++i) { ASSERT_EQ(fields_ret_1[i].name, fields1[i].name); // ASSERT_EQ(fields_ret_1[i].value, fields1[i].value); } - // 测试查找功能 - Field query_field = {"name", "Customer2"}; + Field query_field = {"name", "Customer1"}; std::vector found_keys = FindKeysByField(db, query_field); - std::cout << "找到的key有:" << found_keys.size() << "个" << std::endl; - /*// 吞吐量测试 - TestThroughput(db, 10000);*/ - - /* // 延迟测试 - std::vector latency_results; - TestLatency(db, 1000, latency_results);*/ + // 删除查找到的第一个 key + const std::string& key = found_keys[0]; + db->Delete(leveldb::WriteOptions(), key); + // 再次查找 + std::vector found_deleted_keys = FindKeysByField(db, query_field); // 关闭数据库 delete db; } - int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/test/db_test4.cc b/test/db_test4.cc index b7c04d8..afdd62e 100644 --- a/test/db_test4.cc +++ b/test/db_test4.cc @@ -2,9 +2,11 @@ // Created by 马也驰 on 2024/12/20. // -#include "../db/slotpage.h" +#include #include +#include "../db/slotpage.h" + #define SLOTNUM 8192 void printVector(const std::vector& vec) { diff --git a/test/db_test5.cc b/test/db_test5.cc index 2151825..1f458fd 100644 --- a/test/db_test5.cc +++ b/test/db_test5.cc @@ -2,6 +2,7 @@ // Created by 马也驰 on 2024/12/22. // +#include #include #include #include diff --git a/test/db_test_latent.cc b/test/db_test_latent.cc new file mode 100644 index 0000000..6ef7373 --- /dev/null +++ b/test/db_test_latent.cc @@ -0,0 +1,122 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "leveldb/env.h" + +#include "gtest/gtest.h" + +using namespace leveldb; +// 根据字段值查找所有包含该字段的 key,遍历 +std::vector FindKeysByField(leveldb::DB* db, const Field& field) { + std::vector keys; + leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); + + for (it->SeekToFirst(); it->Valid() ; it->Next()) { + std::string key = it->key().ToString(); + FieldArray fields; + db->Get_Fields(leveldb::ReadOptions(), key, fields); + for (const auto& f : fields) { + if (f.name == field.name && f.value == field.value) { + keys.push_back(key); + break; // 假设每个key中每个字段值唯一 + } + } + } + std::cout << "Found " << keys.size() << " keys" << std::endl; + delete it; + return keys; +} + +Status OpenDB(std::string dbName, DB** db) { + Options options; + options.create_if_missing = true; + return DB::Open(options, dbName, db); +} + +// 吞吐量测试函数 +void TestThroughput(leveldb::DB* db, int num_operations) { + WriteOptions writeOptions; + auto start_time = std::chrono::steady_clock::now(); + + for (int i = 0; i < num_operations; ++i) { + std::string key = "key_" + std::to_string(i); + std::string value = "value_" + std::to_string(i); + db->Put(writeOptions, key, value); + // FieldArray fields = { + // {"name", "Customer" + std::to_string(i)}, + // {"address", "Address" + std::to_string(i)}, + // {"phone", "1234567890"}}; + // db->Put_Fields(writeOptions, key, fields); + } + for (int i = 0; i < num_operations; ++i) { + std::string key = "key_" + std::to_string(i); + std::string value; + db->Get(ReadOptions(), key, &value); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Throughput: " << num_operations * 1000 / duration << " OPS" << std::endl; +} + +// 延迟测试函数 +void TestLatency(leveldb::DB* db, int num_operations, std::vector& lat_res) { + WriteOptions writeOptions; + int64_t latency = 0; + auto end_time = std::chrono::steady_clock::now(); + auto last_time = end_time; + for (int i = 0; i < num_operations; ++i) { + // 执行写入操作 + std::string key = "key_" + std::to_string(i); + // FieldArray fields = { + // {"name", "Customer" + std::to_string(i)}, + // {"address", "Address" + std::to_string(i)}, + // {"phone", "1234567890"}}; + // db->Get_Fields(leveldb::ReadOptions(), key, fields); + std::string value = "value_" + std::to_string(i); + db->Put(writeOptions, key, value); + + end_time = std::chrono::steady_clock::now(); + latency = std::chrono::duration_cast( + end_time - last_time).count(); + last_time = end_time; + + lat_res.emplace_back(latency); + } + + // 输出延迟统计信息(可选) + double avg_latency = std::accumulate(lat_res.begin(), lat_res.end(), 0.0) / lat_res.size(); + std::cout << "Average Latency: " << std::fixed << std::setprecision(2) << avg_latency << " ms" << std::endl; + std::cout << "Max Latency: " << *std::max_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; + std::cout << "Min Latency: " << *std::min_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; +} + +TEST(TestSchema, Basic) { + DB* db; + WriteOptions writeOptions; + ReadOptions readOptions; + if (!OpenDB("testdb", &db).ok()) { + std::cerr << "open db failed" << std::endl; + abort(); + } + // 吞吐量测试 + TestThroughput(db, 1000); + + // 延迟测试 + std::vector latency_results; + TestLatency(db, 1000, latency_results); + // 关闭数据库 + delete db; +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file