Browse Source

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	db/db_impl.cc
master
马也驰 8 months ago
parent
commit
fe583e0de3
5 changed files with 357 additions and 120 deletions
  1. +136
    -2
      db/db_impl.cc
  2. +15
    -3
      db/db_impl.h
  3. +13
    -0
      include/leveldb/db.h
  4. +78
    -1
      report.md
  5. +115
    -114
      test/db_test3.cc

+ 136
- 2
db/db_impl.cc View File

@ -11,6 +11,10 @@
#include <set> #include <set>
#include <string> #include <string>
#include <vector> #include <vector>
#include <iomanip>
#include <iostream>
#include <numeric>
#include <sstream>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
@ -146,12 +150,14 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
background_compaction_scheduled_(false), background_compaction_scheduled_(false),
manual_compaction_(nullptr), manual_compaction_(nullptr),
// TODO(begin)
versions_(new VersionSet(dbname_, &options_, table_cache_, versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)), &internal_comparator_)),
slot_page_(new SlotPage(dbname)) { slot_page_(new SlotPage(dbname)) {
vlog_set_ = new VlogSet(dbname, nullptr); vlog_set_ = new VlogSet(dbname, nullptr);
vlog_set_->set_vlog_gc(new VlogGC(slot_page_, vlog_set_)); vlog_set_->set_vlog_gc(new VlogGC(slot_page_, vlog_set_));
// TODO(end)
} }
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
@ -181,9 +187,10 @@ DBImpl::~DBImpl() {
if (owns_cache_) { if (owns_cache_) {
delete options_.block_cache; delete options_.block_cache;
} }
// TODO(begin)
delete slot_page_; delete slot_page_;
delete vlog_set_; delete vlog_set_;
// TODO(end)
} }
Status DBImpl::NewDB() { Status DBImpl::NewDB() {
@ -1180,6 +1187,32 @@ Status DBImpl::get_slot_num(const ReadOptions& options, const Slice& key,
return s; return s;
} }
Status DBImpl::Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray* fields) {
// Todo(begin)
std::string value;
Status s = Get(options, key, &value);
if (!s.ok()) {
return s;
}
// 从value中提取slot_num
size_t slot_num;
slot_num = *(size_t *)value.c_str(); // 这里假设value的前几个字节存储了slot_num
struct slot_content sc;
std::string vlog_value;
// 从slot_page中获取slot内容
slot_page_->get_slot(slot_num, &sc);
// 从vlog_set中获取实际的日志值
vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
// 更新value为从vlog获取的值
value = vlog_value;
std::cout << "value from value_log: " << key.ToString() << value << std::endl;
*fields = DeserializeValue(value);
return Status::OK();
// TODO(end)
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key, Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) { std::string* value) {
size_t slot_num; size_t slot_num;
@ -1225,6 +1258,26 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
} }
// Convenience methods // Convenience methods
Status DBImpl::Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields) {
// TODO(begin): allocate slot_num in slotpage and put value in vlog
// 将字段数组序列化
std::string serialized_value = SerializeValue(fields);
std::cout << "Put_Fields: " << key.ToString() << " " << serialized_value << std::endl;
size_t slot_num = slot_page_->alloc_slot();
struct slot_content sc;
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, serialized_value);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
memcpy(data, &slot_num, sizeof(size_t));
Slice slot_val(data, sizeof(data));
return DB::Put(opt, key, slot_val);
// TODO(end)
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
// TODO: allocate slot_num in slotpage and put value in vlog // TODO: allocate slot_num in slotpage and put value in vlog
@ -1537,7 +1590,88 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
v->Unref(); v->Unref();
} }
// Todo(begin)
// 反序列化函数,将字符串解码为字段数组
FieldArray DBImpl::DeserializeValue(const std::string& value_str) {
// 存放解析后的字段数组
FieldArray fields;
// 将输入字符串转换为输入流 iss, 方便读取
std::istringstream iss(value_str);
std::string content;
// 临时存放读取的数据
char buffer[100];
// 读取长度(定长,16比特)
iss.read(buffer, 16);
buffer[16] = '\0';
size_t total_length = std::stoi(buffer);
// std::cout << "读取到的总长度为: " << total_length << std::endl;
std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length);
// std::cout << value_content << std::endl;
std::istringstream iss_content(value_content);
iss_content.read(buffer, sizeof(size_t));
buffer[sizeof(size_t)] = '\0';
std::string slot_num = buffer;
// 读取属性个数
iss_content.read(buffer, 16);
// 在第17个比特位处添加终结符,确保字符串以终结符结尾
buffer[16] = '\0';
// 将 buffer 中的内容转化为整数并赋值给 field_count
int field_count = std::stoi(buffer);
// std::cout << "读取到的字段个数为: " << field_count << std::endl;
for (int i = 0; i < field_count; ++i) {
Field field;
// 读取属性名长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int name_length = std::stoi(buffer);
// std::cout << "读取到的属性名长度为: " << name_length << std::endl;
// 读取属性名(变长)
field.name.resize(name_length);
iss_content.read(&field.name[0], name_length);
// std::cout << "读取到的属性名为: " << field.name << std::endl;
// 读取属性值长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int value_length = std::stoi(buffer);
// std::cout << "读取到的属性值长度为: " << value_length << std::endl;
// 读取属性值(变长)
field.value.resize(value_length);
iss_content.read(&field.value[0], value_length);
// std::cout << "读取到的属性值为: " << field.value << std::endl;
fields.push_back(field);
}
return fields;
}
// Todo(end)
// Todo(begin)
// 序列化函数,将字段数组序列化为字符串
std::string DBImpl::SerializeValue(const FieldArray& fields) {
// 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串
std::ostringstream oss_temp;
std::string slot_num = "slot_num";
oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num;
// 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中
oss_temp << std::setw(16) << std::setfill('0') << fields.size();
for (const auto& field : fields) {
// 写入属性名长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.name.size();
// 写入属性名(变长)
oss_temp << field.name;
// 写入属性值长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.value.size();
// 写入属性值(变长)
oss_temp << field.value;
}
std::string temp_str = oss_temp.str();
size_t value_length = temp_str.size();
std::ostringstream oss;
oss << std::setw(16) << std::setfill('0') << value_length;
oss << temp_str;
return oss.str();
}
// Todo(end)
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {

+ 15
- 3
db/db_impl.h View File

@ -37,14 +37,24 @@ class DBImpl : public DB {
DBImpl& operator=(const DBImpl&) = delete; DBImpl& operator=(const DBImpl&) = delete;
~DBImpl() override; ~DBImpl() override;
// Todo(begin)
using FieldArray = std::vector<Field>;
// Todo(end)
// Implementations of the DB interface // Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key, Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override; const Slice& value) override;
// Todo(begin)
Status Put_Fields(const WriteOptions& opt, const Slice& key,
const FieldArray& fields) override;
// Todo(end)
Status Delete(const WriteOptions&, const Slice& key) override; Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key, Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override; std::string* value) override;
// Todo(begin)
Status Get_Fields(const ReadOptions& options, const Slice& key,
FieldArray* fields) override;
// Todo(end)
Iterator* NewIterator(const ReadOptions&) override; Iterator* NewIterator(const ReadOptions&) override;
const Snapshot* GetSnapshot() override; const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override; void ReleaseSnapshot(const Snapshot* snapshot) override;
@ -78,10 +88,12 @@ class DBImpl : public DB {
friend class DB; friend class DB;
struct CompactionState; struct CompactionState;
struct Writer; struct Writer;
// TODO(begin)
SlotPage *slot_page_; SlotPage *slot_page_;
VlogSet *vlog_set_; VlogSet *vlog_set_;
static std::string SerializeValue(const FieldArray& fields);
static FieldArray DeserializeValue(const std::string& value_str);
// TODO(end)
// Information for a manual compaction // Information for a manual compaction
struct ManualCompaction { struct ManualCompaction {
int level; int level;

+ 13
- 0
include/leveldb/db.h View File

@ -7,6 +7,7 @@
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include <vector>
#include "leveldb/export.h" #include "leveldb/export.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
@ -21,6 +22,13 @@ static const int kMinorVersion = 23;
struct Options; struct Options;
struct ReadOptions; struct ReadOptions;
struct WriteOptions; struct WriteOptions;
// Todo(begin)
struct Field {
std::string name;
std::string value;
};
using FieldArray = std::vector<Field>;
// Todo(end)
class WriteBatch; class WriteBatch;
// Abstract handle to particular state of a DB. // Abstract handle to particular state of a DB.
@ -145,6 +153,11 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database: // Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr); // db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0; virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
//
// Todo(begin)
virtual Status Put_Fields(const leveldb::WriteOptions& opt, const leveldb::Slice& key, const FieldArray& fields) = 0;
virtual Status Get_Fields(const leveldb::ReadOptions& options, const leveldb::Slice& key, FieldArray* fields) = 0;
// // Todo(end)
}; };
// Destroy the contents of the specified database. // Destroy the contents of the specified database.

+ 78
- 1
report.md View File

@ -61,7 +61,85 @@
3. slot_page文件和vlog文件的GC 3. slot_page文件和vlog文件的GC
对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。 对于vlog文件,我们在内存中维护一个bitmap,用来表示每一个slot的使用情况,并在插入和GC删除kv时进行动态的分配和释放。对于vlog文件的GC,我们用一个后台线程来扫描所有vlog的discard计数器。当某些vlog的discard计数器超过某个阈值(比如1024),我们就对这些vlog文件进行GC过程,当GC完成之后将slot_page中的slot元数据进行更新,再将原来的vlog文件进行删除,GC过程就完成了。
##### 2.2.1 相关代码文件
- [`/db/db_impl.cc`](./db/db_impl.cc): 修改 DBImpl::Get, DBImpl::Put 和 DBImpl::Delete
- [`/db/db_impl.h`](./db/db_impl.h): 添加两个结构体 SlotPage *slot_page_; VlogSet *vlog_set_;
-
- [`/db/shared_lock.h`](./db/shared_lock.h) 定义了一个 SharedLock 类,用于实现读写锁机制,包含四种操作:soft_lock():获取共享读锁,确保在没有写操作时允许多个读操作并发进行;soft_unlock():释放共享读锁;hard_lock():获取独占写锁,确保只有当没有其他读写操作时,允许写入操作进行;hard_unlock():释放独占写锁。
- [`/db/slotpage.h`](./db/slotpage.h)
- [`/db/threadpool.h`](./db/threadpool.h)
- [`/db/vlog.h`](./db/vlog.h)
- [`/db/vlog_gc.cpp`](./db/vlog_gc.cpp)
- [`/db/vlog_gc.h`](./db/vlog_gc.h)
- [`/db/vlog_set.cpp`](./db/vlog_set.cpp)
- [`/db/vlog_set.h`](./db/vlog_set.h)
-
- [`/test/db_test3.cc`](./test/db_test3.cc):测试 value 的字段功能
- [`/test/db_test4.cc`](./test/db_test4.cc)
- [`/test/db_test5.cc`](./test/db_test5.cc)
-
- [`CMakeLists.txt`](CMakeLists.txt):添加可执行文件
##### 2.2.1 具体流程
写入流程
````
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
// TODO(begin): allocate slot_num in slotpage and put value in vlog
size_t slot_num = slot_page_->alloc_slot();
struct slot_content sc;
vlog_set_->put_value(&sc.vlog_num, &sc.value_offset, val);
slot_page_->set_slot(slot_num, &sc);
char data[sizeof(size_t)];
memcpy(data, &slot_num, sizeof(size_t));
Slice slot_val(data, sizeof(data));
return DB::Put(o, key, slot_val);
// TODO(end)
}
````
1. 调用编码函数,将 val 编码为字符串
2. 在 slot_page_ 中为 K-V 对分配一个 slot ,编号为 slot_num
3. 实例化 slot_content 结构体 sc
4. 以 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 为参数,将字符串写入 vlog 中
5. 将 slot_content 中的内容赋值给 slot_num
6. 将 slot_num 作为 key 的 value 写入数据库中
读取流程
````
// TODO(begin): search the slotpage and get value from vlog
size_t slot_num = *(size_t *)value->c_str();
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
*value = vlog_value;
// TODO(end)
````
1. 读取 key 对应的 value, 也就是 slot_num
2. 实例化 slot_content 结构体 sc
3. 根据 slot_num 从 slot_page_ 中读取 slot_content
4. 利用 sc 中的 vlog_num(vlog编号) 和 value_offset(在vlog中的偏移量) 从 vlog 中读取字符串
5. 将字符串进行解码得到 value
删除流程
````
// TODO(begin)
ReadOptions ro;
ro.verify_checksums = true;
ro.fill_cache = false;
ro.snapshot = nullptr;
std::string value;
Get(ro, key, &value);
size_t slot_num = *(size_t *)value.c_str();
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->del_value(sc.vlog_num, sc.value_offset);
// TODO(end)
````
1. 读取 key 对应
### 锁机制
### 3. 数据结构设计 ### 3. 数据结构设计
`key的格式:| key | slot_num | ` `key的格式:| key | slot_num | `
@ -111,7 +189,6 @@
5. Get_Fields (待实现) 5. Get_Fields (待实现)
#### 4.2 实现KV分离 #### 4.2 实现KV分离
这里只展示和vlog以及GC无关的接口,vlog的创建,管理以及后台线程的GC设计到vlog等新数据结构的实现,较为复杂和庞大,这里不做展示。我们只列出与kv的插入有关的新接口:
1. 搜索slot_page文件: Status find_slot(const Slice& key, Slot *slot); 1. 搜索slot_page文件: Status find_slot(const Slice& key, Slot *slot);
2. 搜索vlog文件: Status find_value(Slot *slot); 2. 搜索vlog文件: Status find_value(Slot *slot);
3. 分配新的slot: Status allocate_slot(Bitmap *map, uint64_t *s); 3. 分配新的slot: Status allocate_slot(Bitmap *map, uint64_t *s);

+ 115
- 114
test/db_test3.cc View File

@ -13,96 +13,88 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
using namespace leveldb; using namespace leveldb;
// 字段信息结构体
struct Field {
std::string name;
std::string value;
};
using FieldArray = std::vector<Field>;
// 序列化函数,将字段数组编码为字符串
std::string SerializeValue(const FieldArray& fields) {
// 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串
std::ostringstream oss_temp;
std::string slot_num = "slot_num";
oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num;
// 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中
oss_temp << std::setw(16) << std::setfill('0') << fields.size();
for (const auto& field : fields) {
// 写入属性名长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.name.size();
// 写入属性名(变长)
oss_temp << field.name;
// 写入属性值长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.value.size();
// 写入属性值(变长)
oss_temp << field.value;
}
std::string temp_str = oss_temp.str();
size_t value_length = temp_str.size();
std::ostringstream oss;
oss << std::setw(16) << std::setfill('0') << value_length;
oss << temp_str;
std::cout << "value 的长度为: " << value_length << std::endl;
std::cout << "总长度为: " << oss.str().size() << std::endl;
return oss.str();
}
// 反序列化函数,将字符串解码为字段数组
FieldArray ParseValue(const std::string& value_str) {
// 存放解析后的字段数组
FieldArray fields;
// 将输入字符串转换为输入流 iss, 方便读取
std::istringstream iss(value_str);
std::string content;
// 临时存放读取的数据
char buffer[100];
// 读取长度(定长,16比特)
iss.read(buffer, 16);
buffer[16] = '\0';
size_t total_length = std::stoi(buffer);
// std::cout << "读取到的总长度为: " << total_length << std::endl;
std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length);
// std::cout << value_content << std::endl;
std::istringstream iss_content(value_content);
iss_content.read(buffer, sizeof(size_t));
buffer[sizeof(size_t)] = '\0';
std::string slot_num = buffer;
// 读取属性个数
iss_content.read(buffer, 16);
// 在第17个比特位处添加终结符,确保字符串以终结符结尾
buffer[16] = '\0';
// 将 buffer 中的内容转化为整数并赋值给 field_count
int field_count = std::stoi(buffer);
// std::cout << "读取到的字段个数为: " << field_count << std::endl;
for (int i = 0; i < field_count; ++i) {
Field field;
// 读取属性名长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int name_length = std::stoi(buffer);
// std::cout << "读取到的属性名长度为: " << name_length << std::endl;
// 读取属性名(变长)
field.name.resize(name_length);
iss_content.read(&field.name[0], name_length);
// std::cout << "读取到的属性名为: " << field.name << std::endl;
// 读取属性值长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int value_length = std::stoi(buffer);
// std::cout << "读取到的属性值长度为: " << value_length << std::endl;
// 读取属性值(变长)
field.value.resize(value_length);
iss_content.read(&field.value[0], value_length);
// std::cout << "读取到的属性值为: " << field.value << std::endl;
fields.push_back(field);
}
return fields;
}
//// 序列化函数,将字段数组编码为字符串
//std::string SerializeValue(const FieldArray& fields) {
// // 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串
// std::ostringstream oss_temp;
// std::string slot_num = "slot_num";
// oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num;
// // 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中
// oss_temp << std::setw(16) << std::setfill('0') << fields.size();
// for (const auto& field : fields) {
// // 写入属性名长度(定长,16比特)
// oss_temp << std::setw(16) << std::setfill('0') << field.name.size();
// // 写入属性名(变长)
// oss_temp << field.name;
// // 写入属性值长度(定长,16比特)
// oss_temp << std::setw(16) << std::setfill('0') << field.value.size();
// // 写入属性值(变长)
// oss_temp << field.value;
// }
// std::string temp_str = oss_temp.str();
// size_t value_length = temp_str.size();
//
// std::ostringstream oss;
// oss << std::setw(16) << std::setfill('0') << value_length;
// oss << temp_str;
//
// std::cout << "value 的长度为: " << value_length << std::endl;
// std::cout << "总长度为: " << oss.str().size() << std::endl;
// return oss.str();
//}
//// 反序列化函数,将字符串解码为字段数组
//FieldArray ParseValue(const std::string& value_str) {
// // 存放解析后的字段数组
// FieldArray fields;
// // 将输入字符串转换为输入流 iss, 方便读取
// std::istringstream iss(value_str);
// std::string content;
// // 临时存放读取的数据
// char buffer[100];
// // 读取长度(定长,16比特)
// iss.read(buffer, 16);
// buffer[16] = '\0';
// size_t total_length = std::stoi(buffer);
// // std::cout << "读取到的总长度为: " << total_length << std::endl;
// std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length);
// // std::cout << value_content << std::endl;
// std::istringstream iss_content(value_content);
// iss_content.read(buffer, sizeof(size_t));
// buffer[sizeof(size_t)] = '\0';
// std::string slot_num = buffer;
// // 读取属性个数
// iss_content.read(buffer, 16);
// // 在第17个比特位处添加终结符,确保字符串以终结符结尾
// buffer[16] = '\0';
// // 将 buffer 中的内容转化为整数并赋值给 field_count
// int field_count = std::stoi(buffer);
// // std::cout << "读取到的字段个数为: " << field_count << std::endl;
//
// for (int i = 0; i < field_count; ++i) {
// Field field;
// // 读取属性名长度(定长,16比特)
// iss_content.read(buffer, 16);
// buffer[16] = '\0';
// int name_length = std::stoi(buffer);
// // std::cout << "读取到的属性名长度为: " << name_length << std::endl;
// // 读取属性名(变长)
// field.name.resize(name_length);
// iss_content.read(&field.name[0], name_length);
// // std::cout << "读取到的属性名为: " << field.name << std::endl;
// // 读取属性值长度(定长,16比特)
// iss_content.read(buffer, 16);
// buffer[16] = '\0';
// int value_length = std::stoi(buffer);
// // std::cout << "读取到的属性值长度为: " << value_length << std::endl;
// // 读取属性值(变长)
// field.value.resize(value_length);
// iss_content.read(&field.value[0], value_length);
// // std::cout << "读取到的属性值为: " << field.value << std::endl;
// fields.push_back(field);
// }
// return fields;
//}
// 根据字段值查找所有包含该字段的 key,遍历 // 根据字段值查找所有包含该字段的 key,遍历
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) { std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) {
@ -111,10 +103,8 @@ std::vector FindKeysByField(leveldb::DB* db, const Field& field) {
for (it->SeekToFirst(); it->Valid() ; it->Next()) { for (it->SeekToFirst(); it->Valid() ; it->Next()) {
std::string key = it->key().ToString(); std::string key = it->key().ToString();
std::string value;
db->Get(leveldb::ReadOptions(), key, &value);
FieldArray fields = ParseValue(value);
FieldArray fields;
db->Get_Fields(leveldb::ReadOptions(), key, &fields);
for (const auto& f : fields) { for (const auto& f : fields) {
if (f.name == field.name && f.value == field.value) { if (f.name == field.name && f.value == field.value) {
keys.push_back(key); keys.push_back(key);
@ -141,8 +131,7 @@ void TestThroughput(leveldb::DB* db, int num_operations) {
for (int i = 0; i < num_operations; ++i) { for (int i = 0; i < num_operations; ++i) {
std::string key = "key_" + std::to_string(i); std::string key = "key_" + std::to_string(i);
FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}};
std::string value = SerializeValue(fields);
db->Put(writeOptions, key, value);
db->Put_Fields(writeOptions, key, fields);
} }
auto end_time = std::chrono::steady_clock::now(); auto end_time = std::chrono::steady_clock::now();
@ -160,9 +149,8 @@ void TestLatency(leveldb::DB* db, int num_operations, std::vector& lat_
// 执行写入操作 // 执行写入操作
std::string key = "key_" + std::to_string(i); std::string key = "key_" + std::to_string(i);
FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}};
std::string value = SerializeValue(fields);
db->Put(writeOptions, key, value);
db->Get(leveldb::ReadOptions(), key, &value);
db->Put_Fields(writeOptions, key, fields);
db->Get_Fields(leveldb::ReadOptions(), key, &fields);
end_time = std::chrono::steady_clock::now(); end_time = std::chrono::steady_clock::now();
latency = std::chrono::duration_cast<std::chrono::milliseconds>( latency = std::chrono::duration_cast<std::chrono::milliseconds>(
@ -187,9 +175,13 @@ TEST(TestSchema, Basic) {
std::cerr << "open db failed" << std::endl; std::cerr << "open db failed" << std::endl;
abort(); abort();
} }
std::string key = "key";
std::string key0 = "k_0";
std::string key1 = "k_1"; std::string key1 = "k_1";
std::string key2 = "k_2"; std::string key2 = "k_2";
std::string key3 = "k_3"; std::string key3 = "k_3";
std::string value = "value";
FieldArray fields0 = {{"name", "wxf"}};
FieldArray fields1 = { FieldArray fields1 = {
{"name", "Customer1"}, {"name", "Customer1"},
{"address", "IVhzIApeRb"}, {"address", "IVhzIApeRb"},
@ -206,24 +198,33 @@ TEST(TestSchema, Basic) {
{"address", "ecnu"}, {"address", "ecnu"},
{"phone", "11111"} {"phone", "11111"}
}; };
// 序列化并插入
std::string value1 = SerializeValue(fields1);
std::string value2 = SerializeValue(fields2);
std::string value3 = SerializeValue(fields3);
db->Put(leveldb::WriteOptions(), key1, value1);
db->Put(leveldb::WriteOptions(), key2, value2);
db->Put(leveldb::WriteOptions(), key3, value3);
// 读取并反序列化
db->Put(writeOptions, key, value);
std::cout << "put_value: " << value << std::endl;
db->Put_Fields(leveldb::WriteOptions(), key0, fields0);
db->Put_Fields(leveldb::WriteOptions(), key1, fields1);
db->Put_Fields(leveldb::WriteOptions(), key2, fields2);
db->Put_Fields(leveldb::WriteOptions(), key3, fields3);
std::string value_ret; std::string value_ret;
db->Get(leveldb::ReadOptions(), key1, &value_ret);
auto fields_ret = ParseValue(value_ret);
db->Get(readOptions, key, &value_ret);
std::cout << "get_value: " << value_ret << std::endl;
// 读取并反序列化
FieldArray fields_ret_0;
FieldArray fields_ret_1;
db->Get_Fields(leveldb::ReadOptions(), key0, &fields_ret_0);
db->Get_Fields(leveldb::ReadOptions(), key1, &fields_ret_1);
// 检查反序列化结果 // 检查反序列化结果
ASSERT_EQ(fields_ret.size(), fields1.size());
for (size_t i = 0; i < fields_ret.size(); ++i) {
ASSERT_EQ(fields_ret[i].name, fields1[i].name);
ASSERT_EQ(fields_ret[i].value, fields1[i].value);
ASSERT_EQ(fields_ret_0.size(), fields0.size());
for (size_t i = 0; i < fields_ret_0.size(); ++i) {
ASSERT_EQ(fields_ret_0[i].name, fields1[i].name);
ASSERT_EQ(fields_ret_0[i].value, fields1[i].value);
}
ASSERT_EQ(fields_ret_1.size(), fields1.size());
for (size_t i = 0; i < fields_ret_1.size(); ++i) {
ASSERT_EQ(fields_ret_1[i].name, fields1[i].name);
ASSERT_EQ(fields_ret_1[i].value, fields1[i].value);
} }
// 测试查找功能 // 测试查找功能

Loading…
Cancel
Save