Field structure: a std::pair<std::string, std::string> pairs a field name with its value, and a sequence of such pairs forms a field array, FieldArray, i.e. std::vector<std::pair<std::string, std::string>>.
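Serialization and deserialization:
- SerializeValue serializes a field array into a string so that it can be stored as a LevelDB value.
- ParseValue parses the field array back out of the stored string.
- db->Put() stores the serialized string as the value in LevelDB.
- TEST(TestSchema, Basic) demonstrates creating a value with several fields, serializing and storing it, then reading it back and deserializing it.
- FindKeysByField finds all keys whose value contains a given field name and field value. Because one field value can appear under many keys, the result type is std::vector<std::string>.

Approach: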
The field array FieldArray is defined as std::vector<std::pair<std::string, std::string>>, where each element is one name/value pair. To store it, the array is serialized into a string of the form key:value,key:value,...
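During serialization, each field's key and value are joined with ":", and consecutive fields are separated by ","; no separator follows the last field. Nothing is escaped, so a name or value that itself contains "," or ":" may not survive a round trip.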
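Code explanation:
- resString holds the final serialized result; it is cleared first.
- The loop walks the FieldArray, joins each pair into key:value and appends it to resString, adding "," between fields.
The result is compact (one separator character per pair and per field boundary) and cheap to produce, which suits storage as a LevelDB value.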
void MyLevelDB::SerializeValue(const FieldArray& fields,
                               std::string& resString) {
  resString.clear();
  for (size_t i = 0; i < fields.size(); i++) {
    const std::string& key = fields[i].first;
    const std::string& value = fields[i].second;
    resString += key + ":" + value;  // one field: key:value
    if (i != fields.size() - 1) {
      resString += ",";  // separator between fields, none after the last
    }
  }
}
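The same loop can be tried outside the MyLevelDB class as a free function. This is a minimal sketch under stated assumptions: the name SerializeFields and the standalone FieldArray alias are hypothetical, introduced only for illustration.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Standalone alias matching the document's FieldArray (assumed, for illustration)
using FieldArray = std::vector<std::pair<std::string, std::string>>;

// Hypothetical free-function version of SerializeValue: "k1:v1,k2:v2,..."
std::string SerializeFields(const FieldArray& fields) {
  std::string out;
  for (size_t i = 0; i < fields.size(); ++i) {
    if (i > 0) out += ',';  // separator only between fields
    out += fields[i].first;
    out += ':';
    out += fields[i].second;
  }
  return out;
}
```

With the sample fields used later in the test, SerializeFields({{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}}) returns name:Customer#000000001,address:abc,phone:def. Emitting "," before every field except the first is equivalent to the original's "after every field except the last" check and avoids computing fields.size() - 1.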
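Approach:
Deserialization splits the serialized string back into name/value pairs and stores them in a FieldArray:
- Split the string on "," to get substrings of the form key:value.
- Split each substring on ":" to extract key and value.
- Append key and value to the field array.
Code explanation: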
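- std::stringstream with std::getline splits the string in two levels: first on ",", then on ":".
- Only pairs whose key and value are both non-empty are appended to the FieldArray; malformed segments are reported on std::cerr and skipped.
This turns the stored bytes back into a structured field array for the field lookup and processing that follow.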
void MyLevelDB::ParseValue(const std::string& value_str,
                           FieldArray& resFieldArray) {
  resFieldArray.clear();  // mirror SerializeValue, which also clears its output
  std::stringstream ss(value_str);
  std::string segment;
  // Level 1: split on ','
  while (std::getline(ss, segment, ',')) {
    std::string key;
    std::string value;
    std::stringstream kv(segment);
    // Level 2: key up to the first ':', value = the rest of the segment
    // (reading the value only up to another ':' would silently truncate e.g. "12:30")
    if (std::getline(kv, key, ':') && std::getline(kv, value)) {
      if (!key.empty() && !value.empty()) {
        resFieldArray.push_back(std::make_pair(key, value));
      } else {
        std::cerr << "Invalid key-value pair: " << segment << std::endl;
      }
    } else {
      std::cerr << "Failed to parse segment: " << segment << std::endl;
    }
  }
}
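A standalone sketch of the same two-level split, assuming the key:value,key:value format above. The name ParseFields is hypothetical. It cuts each segment at the first ":" only, so a value such as 12:30 survives; a "," inside a value still cannot be represented.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

using FieldArray = std::vector<std::pair<std::string, std::string>>;

// Hypothetical free-function version of ParseValue.
// Splits on ',' first, then on the FIRST ':' of each segment.
FieldArray ParseFields(const std::string& value_str) {
  FieldArray result;
  std::stringstream ss(value_str);
  std::string segment;
  while (std::getline(ss, segment, ',')) {
    std::string::size_type colon = segment.find(':');
    if (colon == std::string::npos) continue;        // no ':' -> malformed, skip
    std::string key = segment.substr(0, colon);
    std::string value = segment.substr(colon + 1);   // keeps any later ':' in the value
    if (key.empty() || value.empty()) continue;      // same non-empty rule as ParseValue
    result.emplace_back(std::move(key), std::move(value));
  }
  return result;
}
```

For example, ParseFields("name:Customer#000000001,address:abc,phone:def") yields three pairs, and a segment without ":" is skipped, like the "Failed to parse segment" branch above but without the log message.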
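Approach:
To look up records by field, FindKeysByField walks every record in the database, deserializes it, and checks whether it contains the target field (both name and value must match). For every match, the record's key is added to the result set keys.
Code explanation:
- NewIterator walks every record in the database.
- ParseValue deserializes each stored string into a field array.
- On a match, the record's key is extracted and added to the result set.
Because every record is parsed and compared, this is a full scan whose cost grows linearly with the number of records. The comparison step is where more flexible queries, such as fuzzy matching or multi-condition filtering, could be plugged in; the version below only does exact matches.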
Status MyLevelDB::FindKeysByField(const ReadOptions& options, const Field field,
                                  std::vector<std::string>* keys) {
  auto it = _fields_db->NewIterator(options);
  keys->clear();
  // Full scan: deserialize every record and compare its fields
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    Slice val = it->value();
    FieldArray arr;
    ParseValue(std::string(val.data(), val.size()), arr);
    for (const auto& pr : arr) {
      if (pr.first == field.first && pr.second == field.second) {
        Slice key = it->key();
        keys->push_back(std::string(key.data(), key.size()));
        break;  // one match per record is enough
      }
    }
  }
  Status s = it->status();  // report iteration errors instead of always returning OK
  delete it;
  return s;
}
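To see the cost model without a LevelDB build, the sketch below replays the full scan over a std::map, which, like LevelDB, iterates its keys in sorted order. The std::map stand-in and the names Parse and FindKeysByFieldScan are assumptions made for illustration; the matching rule (name and value both equal, at most one hit per record) follows FindKeysByField.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

using Field = std::pair<std::string, std::string>;
using FieldArray = std::vector<Field>;

// Compact parser for "k1:v1,k2:v2" (split on ',', then on the first ':')
FieldArray Parse(const std::string& s) {
  FieldArray out;
  std::stringstream ss(s);
  std::string seg;
  while (std::getline(ss, seg, ',')) {
    auto colon = seg.find(':');
    if (colon != std::string::npos) {
      out.emplace_back(seg.substr(0, colon), seg.substr(colon + 1));
    }
  }
  return out;
}

// Full-scan lookup; std::map stands in for the LevelDB key space (sorted by key).
// Every record is deserialized, so the work grows linearly with the record count.
std::vector<std::string> FindKeysByFieldScan(
    const std::map<std::string, std::string>& db, const Field& field) {
  std::vector<std::string> keys;
  for (const auto& [key, value] : db) {
    for (const auto& f : Parse(value)) {
      if (f == field) {  // name and value must both match
        keys.push_back(key);
        break;           // one hit per record is enough
      }
    }
  }
  return keys;
}
```

Each call parses every stored record, whether or not it matches; avoiding that scan is the purpose of the secondary index described next.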
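Approach: to support fast queries, index entries are created for selected fields when data is inserted. PutWithFields does this in three steps:
- Serialize the field array and store it in the main database.
- Find the fields whose names appear in the list of indexed fields.
- Combine each such field's value with the record's key and store the result in the index database.
Code explanation:
- SerializeValue serializes the field array, which is then written to the _fields_db database.
- Each field name is compared with the names in index_list_ to decide which fields need an index entry.
- For each match, the key field_value:key is built and written, with an empty value, to the corresponding index_db database.
Index entries are produced at write time, so indexed fields can later be queried without scanning the main database. Note that nothing removes an old entry when a key is overwritten with a different field value, so the index may keep stale value:key entries.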
Status MyLevelDB::PutWithFields(const WriteOptions& options, const std::string& key,
                                const FieldArray& fields) {
  std::string value;
  SerializeValue(fields, value);
  // Slice(std::string) keeps the exact length; Slice(c_str()) would stop at an embedded '\0'
  Status s = _fields_db->Put(options, Slice(key), Slice(value));
  if (!s.ok()) {
    return s;  // do not write index entries for a record that was not stored
  }
  std::unique_lock<std::mutex> l(mutex_);
  // Position in fields -> position in index_list_ / index_db
  std::unordered_map<size_t, size_t> match;
  for (size_t i = 0; i < fields.size(); i++) {
    for (size_t idx = 0; idx < index_list_.size(); idx++) {
      if (fields[i].first == index_list_[idx]) {
        match[i] = idx;
        break;
      }
    }
  }
  // Index entry: key = "field_value:primary_key", value = empty
  for (const auto& item : match) {
    std::string composed_key = fields[item.first].second + ":" + key;
    s = index_db[item.second]->Put(options, composed_key, Slice());
    if (!s.ok()) {
      return s;  // surface the first failing index write
    }
  }
  return s;
}
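The write path can be modeled in memory to check which index entries a record produces. In this hypothetical sketch, ToyIndexedStore, its std::map main database and the std::set per index are stand-ins for _fields_db and index_db (an index needs only keys, since the stored value is empty); the parallel index_list/index_db layout follows the code above.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using FieldArray = std::vector<std::pair<std::string, std::string>>;

// In-memory stand-in for MyLevelDB: one "main DB" plus one "index DB" per indexed field.
// index_list[i] names the field whose entries live in index_db[i] (parallel arrays).
struct ToyIndexedStore {
  std::map<std::string, std::string> main_db;   // primary key -> serialized fields
  std::vector<std::string> index_list;          // indexed field names
  std::vector<std::set<std::string>> index_db;  // keys "field_value:primary_key"

  void PutWithFields(const std::string& key, const FieldArray& fields) {
    std::string value;
    for (size_t i = 0; i < fields.size(); ++i) {
      if (i > 0) value += ',';
      value += fields[i].first + ":" + fields[i].second;
    }
    main_db[key] = value;
    // For every field that has an index, add "field_value:primary_key"
    for (const auto& [name, field_value] : fields) {
      for (size_t idx = 0; idx < index_list.size(); ++idx) {
        if (index_list[idx] == name) {
          index_db[idx].insert(field_value + ":" + key);
          break;
        }
      }
    }
  }
};
```

After writing k_8 and k_9 with an index on address, the address index holds exactly abc:k_8 and IVhzIApeRb:k_9, while name and phone produce no index entries.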
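Approach:
CreateIndexOnField creates an index for the field field_name. The index maintains a mapping from field value to primary key: each index entry has the key value:key and an empty (null) value, where value is the field value and key is the primary key.
In the implementation, index_list_ records the names of the fields that have an index, and index_db holds the index databases, one per indexed field, at the same position as the field name in index_list_.
Implementation steps:
- Walk index_list_ to check whether the field is already indexed. If it is, return an error status instead of creating a duplicate.
- Otherwise, open a new database instance for the field and record the field name in index_list_. The index database is named _db_name + "_index_" + field_name, which makes it easy to tell apart and manage.
- Append the database handle to index_db for later use.
Code logic:
- index_list_ and index_db together manage indexes dynamically.
- DB::Open creates a separate index database per field, which keeps index data apart from record data.
This decouples index creation from base data storage and keeps the two modules independent. Only records written after the index exists get index entries; records already in _fields_db are not back-filled.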
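Status MyLevelDB::CreateIndexOnField(const std::string& field_name) {
  // PutWithFields reads index_list_ / index_db under the same lock
  std::unique_lock<std::mutex> l(mutex_);
  for (const auto& field : index_list_) {
    if (field == field_name) {
      return Status::InvalidArgument(field_name,
                                     "Index already exists for this field");
    }
  }
  Options op = _op;
  op.index_mode = true;  // project-specific option, not part of upstream LevelDB
  DB* field_db = nullptr;
  Status status = DB::Open(op, _db_name + "_index_" + field_name, &field_db);
  if (!status.ok()) {
    std::cerr << "Failed to open index DB: " << status.ToString() << std::endl;
    return status;  // leave index_list_ / index_db unchanged
  }
  // Register name and handle together so index_list_[i] always matches index_db[i]
  index_list_.push_back(field_name);
  index_db.push_back(field_db);
  return status;
}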
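Approach:
DeleteIndex removes the index for a given field. The core of the operation is removing the field's entries from index_list_ and index_db and releasing the associated resources.
Implementation steps:
- Search index_list_ for the field. If it is not found, return an error status saying the index does not exist.
- Remove the field name from index_list_ and release the corresponding database in index_db. Because the two vectors are indexed in parallel, both must lose the same position.
Code logic:
- std::find checks whether the field name exists, so deleting an index twice returns NotFound instead of removing something else.
This supports managing indexes dynamically, so the index configuration can follow business needs.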
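Status MyLevelDB::DeleteIndex(std::string& field_name) {
  std::unique_lock<std::mutex> l(mutex_);
  auto it = std::find(index_list_.begin(), index_list_.end(), field_name);
  if (it == index_list_.end()) {
    return Status::NotFound("Index not found for this field");
  }
  // index_list_ and index_db are parallel arrays: remove the same position from both,
  // otherwise PutWithFields / QueryByIndex would use the wrong index DB
  size_t pos = static_cast<size_t>(it - index_list_.begin());
  delete index_db[pos];  // close the index DB handle (its files stay on disk)
  index_db.erase(index_db.begin() + pos);
  index_list_.erase(it);
  return Status::OK();
}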
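Because index_list_ and index_db are parallel arrays, removing a name without removing its handle shifts every later name onto the wrong database. The sketch below, with a hypothetical FakeIndexDB standing in for leveldb::DB* and std::unique_ptr owning the handles, shows a deletion that keeps both vectors aligned.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for an index DB handle (the real code holds leveldb::DB*)
struct FakeIndexDB {
  std::string name;
};

// index_list[i] and index_db[i] describe the same index, so deletion must remove
// position i from both vectors; erasing only the name would pair every later
// name with the wrong handle.
bool DeleteIndex(std::vector<std::string>& index_list,
                 std::vector<std::unique_ptr<FakeIndexDB>>& index_db,
                 const std::string& field_name) {
  auto it = std::find(index_list.begin(), index_list.end(), field_name);
  if (it == index_list.end()) return false;  // corresponds to Status::NotFound
  auto pos = it - index_list.begin();
  index_db.erase(index_db.begin() + pos);    // unique_ptr frees the handle
  index_list.erase(it);
  return true;
}
```

After deleting address from [name, address, phone], phone still sits at the same position as its own handle, and a second delete of the same field reports not-found.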
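Approach:
QueryByIndex uses the secondary index to look up records by field value. The index database maps field values to primary keys, so the main database does not have to be scanned.
Implementation steps:
- Walk index_list_ to find the index database for the given field name. If none exists, an assertion fails.
- Iterate over the entries of that index database, parse each key in value:key form, and extract the field value and primary key.
- Collect the primary keys whose field value equals the query value.
Code logic:
- Index entries are stored as value:key with a null value, so the primary key can be recovered from the index key alone, without reading the main database.
Compared with FindKeysByField, the query no longer deserializes every record in the main database; it only reads the index keys of one field.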
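void MyLevelDB::QueryByIndex(const ReadOptions& options, Field& field,
                             std::vector<std::string>& keys) {
  std::unique_lock<std::mutex> l(mutex_);
  size_t i = 0;
  for (; i < index_list_.size(); i++) {
    if (index_list_[i] == field.first) {
      break;
    }
  }
  assert(i != index_list_.size());  // no index exists for this field
  if (i == index_list_.size()) {
    return;  // assert is compiled out under NDEBUG; never index past index_db
  }
  // Index keys are "field_value:primary_key". Assuming the index DB keeps LevelDB's
  // default bytewise key order, all entries for one value are contiguous:
  // seek to the prefix and stop at the first key that no longer starts with it.
  const std::string prefix = field.second + ":";
  auto it = index_db[i]->NewIterator(options);
  for (it->Seek(prefix); it->Valid() && it->key().starts_with(prefix); it->Next()) {
    Slice k = it->key();
    k.remove_prefix(prefix.size());  // what remains is the primary key
    keys.push_back(k.ToString());
  }
  delete it;
}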
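Because LevelDB stores keys in sorted order, all index entries for one field value share the prefix value: and are contiguous, so a lookup can seek to that prefix and stop at the first key that no longer matches, rather than walking the whole index. The sketch below illustrates this with std::set<std::string>, whose default ordering compares bytes the way LevelDB's default comparator does, with lower_bound playing the role of Iterator::Seek. QueryByPrefix is a hypothetical name, and the sketch assumes the index database uses the default bytewise comparator.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// std::set<std::string> stands in for an index DB with bytewise key order.
// Keys are "field_value:primary_key"; all entries for one value share the
// prefix "field_value:", so they are adjacent in sorted order.
std::vector<std::string> QueryByPrefix(const std::set<std::string>& index_keys,
                                       const std::string& field_value) {
  const std::string prefix = field_value + ":";
  std::vector<std::string> keys;
  // lower_bound plays the role of Iterator::Seek(prefix)
  for (auto it = index_keys.lower_bound(prefix);
       it != index_keys.end() && it->compare(0, prefix.size(), prefix) == 0; ++it) {
    keys.push_back(it->substr(prefix.size()));  // strip "field_value:" -> primary key
  }
  return keys;
}
```

Including the ":" in the prefix keeps a query for ab from returning abc entries. A field value that itself contains ":" can still produce false matches (a query for a would match a:b:k), so values containing the separator remain unsupported.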
Options op;
op.create_if_missing = true;
MyLevelDB db(op, "testMyDB");
// Serialization test
std::string res1;
FieldArray fields1 = {
    {"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
db.SerializeValue(fields1, res1);
std::cout << "Serialization result:" << std::endl << res1 << std::endl;
// Deserialization test
FieldArray fields2;
db.ParseValue(res1, fields2);
std::cout << "Deserialization result:" << std::endl;
for (size_t i = 0; i < fields2.size(); i++) {
  std::cout << fields2[i].first << ":" << fields2[i].second << std::endl;
}
// Field storage
std::cout << "Field storage and lookup result:" << std::endl;
std::string key2 = "k_1";
std::string key3 = "k_2";
std::string key4 = "k_3";
FieldArray field2 = {{"name", "Customer#000000001"},
                     {"address", "IVhzIApeRb"},
                     {"phone", "25-989-741-2988"}};
FieldArray field3 = {
    {"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
FieldArray field4 = {
    {"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
db.PutWithFields(WriteOptions(), key2, field2);
db.PutWithFields(WriteOptions(), key3, field3);
db.PutWithFields(WriteOptions(), key4, field4);
// Field lookup (full scan): address == "IVhzIApeRb"
std::vector<std::string> v;
db.FindKeysByField(ReadOptions(), field2[1], &v);
for (const auto& s : v) std::cout << s << "\n";
// Create an index on "address"
ReadOptions readOptions;
Options options;
options.create_if_missing = true;
auto db1 = new MyLevelDB(options, "testdb2");
db1->CreateIndexOnField("address");
std::string key8 = "k_8";
std::string key9 = "k_9";
FieldArray fields8 = {
    {"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
FieldArray fields9 = {{"name", "Customer#000000001"},
                      {"address", "IVhzIApeRb"},
                      {"phone", "25-989-741-2988"}};
Field query = {"address", "abc"};
db1->PutWithFields(WriteOptions(), key8, fields8);
db1->PutWithFields(WriteOptions(), key9, fields9);
std::cout << "Index storage and lookup:" << std::endl;
std::vector<std::string> keys;
db1->QueryByIndex(readOptions, query, keys);
for (size_t i = 0; i < keys.size(); i++) {
  std::cout << keys[i] << std::endl;
}
delete db1;
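Two benchmarks were run here: a throughput test and a latency test. Each one writes records with keys k_0, k_1, ... and their fields, then measures Get and FindKeysByField.
// Throughput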
void TestThroughput(int num_operations) {
  Options options;
  options.create_if_missing = true;
  auto db1 = new MyLevelDB(options, "testThroughput");
  std::string key = "k_";
  FieldArray fields = {{"name", "Customer#000000001"},
                       {"address", "IVhzIApeRb"},
                       {"phone", "25-989-741-2988"}};
  // ops/s from a duration in microseconds; double avoids int overflow, and the
  // guard avoids dividing by zero when a phase finishes in under 1 us
  auto ops_per_sec = [](long long ops, long long us) {
    return ops * 1e6 / (us > 0 ? us : 1);
  };
  // Write
  auto start_time1 = std::chrono::steady_clock::now();
  for (int i = 0; i < num_operations; ++i) {
    db1->PutWithFields(WriteOptions(), key + std::to_string(i), fields);
  }
  auto end_time1 = std::chrono::steady_clock::now();
  auto duration1 = std::chrono::duration_cast<std::chrono::microseconds>(
                       end_time1 - start_time1).count();
  std::cout << "Put Op Throughput: " << ops_per_sec(num_operations, duration1)
            << " OPS" << std::endl;
  // Read: cycle through the keys written above (k_0 ... k_{n-1})
  std::string str;
  const long long get_ops = 100LL * num_operations;
  auto start_time2 = std::chrono::steady_clock::now();
  for (long long i = 0; i < get_ops; ++i) {
    db1->Get(ReadOptions(), key + std::to_string(i % num_operations), &str);
  }
  auto end_time2 = std::chrono::steady_clock::now();
  auto duration2 = std::chrono::duration_cast<std::chrono::microseconds>(
                       end_time2 - start_time2).count();
  std::cout << "Get Op Throughput: " << ops_per_sec(get_ops, duration2) << " OPS"
            << std::endl;
  // Field lookup (each call is a full scan of the database)
  std::vector<std::string> keys;
  auto start_time3 = std::chrono::steady_clock::now();
  for (int i = 0; i < num_operations; ++i) {
    db1->FindKeysByField(ReadOptions(), fields[0], &keys);
  }
  auto end_time3 = std::chrono::steady_clock::now();
  auto duration3 = std::chrono::duration_cast<std::chrono::microseconds>(
                       end_time3 - start_time3).count();
  std::cout << "FindKeysByField Op Throughput: "
            << ops_per_sec(num_operations, duration3) << " OPS" << std::endl;
  delete db1;
}
// Latency
void TestLatency(int num_operations) {
  Options options;
  options.create_if_missing = true;
  auto db = new MyLevelDB(options, "testLatency");
  std::string key = "k_";
  FieldArray fields = {{"name", "Customer#000000001"},
                       {"address", "IVhzIApeRb"},
                       {"phone", "25-989-741-2988"}};
  using Clock = std::chrono::steady_clock;
  // Per-operation latency in microseconds; a millisecond cast truncates most
  // single Put/Get calls to 0
  auto micros = [](Clock::time_point from, Clock::time_point to) {
    return std::chrono::duration_cast<std::chrono::microseconds>(to - from).count();
  };
  const int put_ops = num_operations * 100;
  int64_t tollatency = 0;
  // Put
  for (int i = 0; i < put_ops; ++i) {
    auto t0 = Clock::now();
    db->PutWithFields(WriteOptions(), key + std::to_string(i), fields);
    tollatency += micros(t0, Clock::now());
  }
  // Average over the number of operations actually run
  std::cout << put_ops << " Put operations, average latency: "
            << static_cast<double>(tollatency) / put_ops << " us" << std::endl;
  // Get
  std::string str;
  tollatency = 0;
  for (int i = 0; i < put_ops; ++i) {
    auto t0 = Clock::now();
    db->Get(ReadOptions(), key + std::to_string(i), &str);
    tollatency += micros(t0, Clock::now());
  }
  std::cout << put_ops << " Get operations, average latency: "
            << static_cast<double>(tollatency) / put_ops << " us" << std::endl;
  // FindKeysByField: every call scans the whole database, so only 50 runs
  const int find_ops = 50;
  std::vector<std::string> keys;
  tollatency = 0;
  for (int i = 0; i < find_ops; ++i) {
    auto t0 = Clock::now();
    db->FindKeysByField(ReadOptions(), fields[0], &keys);
    tollatency += micros(t0, Clock::now());
  }
  std::cout << find_ops << " FindKeysByField operations, average latency: "
            << static_cast<double>(tollatency) / find_ops << " us" << std::endl;
  delete db;
}
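Both benchmark functions follow the same pattern: time each call with a monotonic clock, sum the per-call durations, and divide by the number of calls actually made. A small helper makes that pattern reusable; AverageLatencyMicros and OpsPerSecond are hypothetical names, not part of the project.

```cpp
#include <cassert>
#include <chrono>

// Runs op(i) for i = 0..n-1 and returns the average latency in microseconds.
// Each call is timed on its own with steady_clock (monotonic), the durations are
// summed in nanoseconds, and the average is computed in double over the number
// of calls actually made, so sub-millisecond calls are not truncated to 0.
template <typename Op>
double AverageLatencyMicros(int n, Op op) {
  using Clock = std::chrono::steady_clock;
  if (n <= 0) return 0.0;
  std::chrono::nanoseconds total{0};
  for (int i = 0; i < n; ++i) {
    auto t0 = Clock::now();
    op(i);
    total += std::chrono::duration_cast<std::chrono::nanoseconds>(Clock::now() - t0);
  }
  return std::chrono::duration<double, std::micro>(total).count() / n;
}

// Sequential, single-threaded throughput (operations per second) from the average latency.
inline double OpsPerSecond(double avg_latency_micros) {
  return avg_latency_micros > 0.0 ? 1e6 / avg_latency_micros : 0.0;
}
```

With MyLevelDB, a Put measurement might look like AverageLatencyMicros(100 * n, [&](int i) { db->PutWithFields(WriteOptions(), "k_" + std::to_string(i), fields); }). For a sequential single-threaded loop, throughput is the reciprocal of the average latency, which is what OpsPerSecond computes.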
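Problem encountered: after deserialization, every field in the field array except the last one was garbled. Debugging showed that when the deserialized fields were converted to Slice, every Slice pointed at the same memory address (a Slice only holds a pointer and a length; it does not own or copy the bytes), so each newly parsed field overwrote the previous ones. Once the function returned, that memory was released, yet the array still pointed to it, which caused a segmentation fault.
Fix: keep the fields as std::string during serialization and deserialization, and convert them to Slice only inside PutWithFields, when the data is actually written.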
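The bug comes down to object lifetime: a leveldb::Slice does not own its bytes. The sketch below uses std::string_view, which has the same non-owning (pointer, length) semantics, as a stand-in for Slice: it parses from a view but copies each name and value into std::string, so the result outlives the input buffer. ParseOwned is a hypothetical name.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// The result owns its bytes (std::string), unlike a vector of views/Slices.
using FieldArray = std::vector<std::pair<std::string, std::string>>;

// Parses "k1:v1,k2:v2" from a non-owning view, but stores *copies* in the result,
// so the result stays valid after the input buffer is overwritten or destroyed.
FieldArray ParseOwned(std::string_view s) {
  FieldArray out;
  while (!s.empty()) {
    size_t comma = s.find(',');
    std::string_view seg = s.substr(0, comma);  // whole rest if there is no ','
    size_t colon = seg.find(':');
    if (colon != std::string_view::npos) {
      out.emplace_back(std::string(seg.substr(0, colon)),    // copy, not a view
                       std::string(seg.substr(colon + 1)));
    }
    if (comma == std::string_view::npos) break;
    s.remove_prefix(comma + 1);
  }
  return out;
}
```

Storing the seg.substr(...) views directly in the result instead of copies would leave it pointing into a buffer that is later overwritten or destroyed, which is the failure described above.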