LevelDB二级索引实现 姚凯文(kevinyao0901) 姜嘉祺
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

373 lines
13 KiB

#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <memory>
#include <numeric>
#include <regex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <leveldb/db.h>
using namespace std::chrono;
/// One `name:value` attribute of a stored record.
///
/// Used as the search criterion of FindKeysByField(): `first` is compared with
/// the attribute name and `second` with the attribute value.
struct Field {
  std::string first;   ///< Attribute name, e.g. "name".
  std::string second;  ///< Attribute value, e.g. "Customer#10000".
};
/// Splits a record value of the form `name1:value1|name2:value2|...` into
/// (name, value) pairs.
///
/// Segments are separated by '|'. Inside a segment the text before the first
/// ':' is the name and everything after it is the value; segments without a
/// ':' (including empty ones) are skipped.
///
/// The last segment, which is not followed by a '|', is parsed like every
/// other one. Previously it was dropped, so the final attribute of a record
/// (and the only attribute of a single-field record) was never returned.
///
/// @param value  Encoded record value.
/// @return The decoded pairs in the order they appear in `value`.
std::vector<std::pair<std::string, std::string>> ParseValue(const std::string& value) {
  std::vector<std::pair<std::string, std::string>> fields;
  size_t start = 0;
  // `start == value.size()` is still visited so that an empty trailing
  // segment terminates the loop through the same code path.
  while (start <= value.size()) {
    size_t end = value.find('|', start);
    if (end == std::string::npos) {
      end = value.size();  // final segment: runs to the end of the string
    }
    const std::string segment = value.substr(start, end - start);
    const size_t separator = segment.find(':');
    if (separator != std::string::npos) {
      fields.emplace_back(segment.substr(0, separator), segment.substr(separator + 1));
    }
    start = end + 1;
  }
  return fields;
}
/// Scans the whole database and returns every key whose value contains `field`.
///
/// Every stored value is decoded with ParseValue(); a key is reported once if
/// any decoded (name, value) pair equals (`field.first`, `field.second`).
///
/// @param db     Database to scan; must not be null.
/// @param field  Attribute name/value pair to look for.
/// @return The matching keys in iteration order. If the iterator ends with an
///         error, the error is printed to stderr and the keys collected so far
///         are returned.
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) {
  std::vector<std::string> keys;
  // The iterator is owned by a unique_ptr so it is released even when parsing
  // or push_back throws.
  const std::unique_ptr<leveldb::Iterator> it(db->NewIterator(leveldb::ReadOptions()));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    const std::vector<std::pair<std::string, std::string>> fields =
        ParseValue(it->value().ToString());
    const bool matches = std::any_of(
        fields.begin(), fields.end(),
        [&field](const std::pair<std::string, std::string>& f) {
          return f.first == field.first && f.second == field.second;
        });
    if (matches) {
      keys.push_back(it->key().ToString());
    }
  }
  // Valid() turning false can mean "end of data" or "read error"; tell them apart.
  if (!it->status().ok()) {
    std::cerr << "Iterator error while scanning for field '" << field.first
              << "': " << it->status().ToString() << std::endl;
  }
  return keys;
}
// Running total of key + value bytes requested for writing. Atomic because
// ConcurrentBenchmarkWritePerformance runs BenchmarkWritePerformance on several
// threads at once, and each of them adds to this counter.
std::atomic<size_t> total_size{0};
/// Fills the database with `num_entries` synthetic customer records.
///
/// Record i (counted from 1) is stored under key `k_<i>` with the value
/// `name:Customer#<i>|address:Address_<i>|phone:25-989-741-<1000+i>`.
/// The size of each key plus value is added to the global `total_size`, whose
/// value is printed once all records have been written.
///
/// @param db           Database to write to; must not be null.
/// @param num_entries  Number of records to insert.
void GenerateAndInsertData(leveldb::DB* db, int num_entries) {
  leveldb::WriteOptions write_options;

  for (int i = 1; i <= num_entries; ++i) {
    const std::string key = "k_" + std::to_string(i);

    // Assemble the three attributes of the record piece by piece.
    std::string value = "name:";
    value += "Customer#" + std::to_string(i);
    value += "|address:";
    value += "Address_" + std::to_string(i);
    value += "|phone:";
    value += "25-989-741-" + std::to_string(1000 + i);

    // Account for the bytes this record asks the database to store.
    total_size += key.size() + value.size();

    leveldb::Status status = db->Put(write_options, key, value);
    assert(status.ok() && "Failed to insert data");
  }

  std::cout << "Total data written (in bytes): " << total_size << " bytes" << std::endl;
}
/// Prints the mean, P75 and P99 of a set of latency samples (labelled as ms).
///
/// Nothing is printed for an empty sample set. A percentile p is read from a
/// sorted copy of the samples at index `size * p / 100` (integer division);
/// the caller's vector is left untouched.
///
/// @param latencies  Latency samples, in any order.
void CalculateLatencyStats(const std::vector<double>& latencies) {
  const size_t count = latencies.size();
  if (count == 0) {
    return;
  }

  // Mean over the samples in their original order.
  double sum = 0.0;
  for (const double sample : latencies) {
    sum = sum + sample;
  }
  const double mean = sum / count;

  // Percentiles come from an ascending copy.
  std::vector<double> ordered(latencies.begin(), latencies.end());
  std::sort(ordered.begin(), ordered.end());
  const double p75 = ordered[count * 75 / 100];
  const double p99 = ordered[count * 99 / 100];

  std::cout << "Average latency: " << mean << " ms" << std::endl;
  std::cout << "P75 latency: " << p75 << " ms" << std::endl;
  std::cout << "P99 latency: " << p99 << " ms" << std::endl;
}
/// Write benchmark: inserts `num_entries` records and reports throughput and latency.
///
/// Record i (counted from 1) uses key `k_<i>` and value
/// `name:Customer#<i>|address:Address_<i>|phone:25-989-741-<1000+i>`.
/// Each Put is timed on its own; the whole loop, including building the
/// strings, is timed for the throughput figure. The size of every key plus
/// value is added to the global `total_size`. Latency statistics are printed
/// through CalculateLatencyStats().
///
/// @param db           Database to write to; must not be null.
/// @param num_entries  Number of records to write.
void BenchmarkWritePerformance(leveldb::DB* db, int num_entries) {
  leveldb::WriteOptions write_options;
  std::vector<double> latencies;  // one sample per Put, in milliseconds

  const auto bench_begin = high_resolution_clock::now();
  for (int i = 1; i <= num_entries; ++i) {
    const std::string key = "k_" + std::to_string(i);

    std::string value = "name:Customer#";
    value += std::to_string(i);
    value += "|address:Address_";
    value += std::to_string(i);
    value += "|phone:25-989-741-";
    value += std::to_string(1000 + i);

    // Account for the bytes this record asks the database to store.
    total_size += key.size() + value.size();

    const auto put_begin = high_resolution_clock::now();
    leveldb::Status status = db->Put(write_options, key, value);
    const auto put_end = high_resolution_clock::now();
    assert(status.ok() && "Failed to insert data");

    // Whole microseconds, expressed in milliseconds.
    latencies.push_back(duration_cast<microseconds>(put_end - put_begin).count() / 1000.0);
  }
  const auto bench_end = high_resolution_clock::now();

  const auto elapsed_us = duration_cast<microseconds>(bench_end - bench_begin).count();
  const double elapsed_seconds = elapsed_us / 1000000.0;
  const double throughput = num_entries / elapsed_seconds;

  std::cout << "Total time for " << num_entries << " writes: " << elapsed_us / 1000.0 << " ms" << std::endl;
  std::cout << "Throughput: " << throughput << " OPS (operations per second)" << std::endl;

  CalculateLatencyStats(latencies);
}
// 获取写放大(Write Amplification)
void CalculateWriteAmplification(leveldb::DB* db) {
std::string property;
bool success = db->GetProperty("leveldb.stats", &property);
if (!success) {
std::cerr << "Failed to get db stats" << std::endl;
return;
}
// 获取日志文件中的合并信息
std::ifstream log_file("/home/kevin/leveldb_proj/build/testdb/LOG"); // 替换为实际日志路径
std::string log_line;
size_t total_compacted = 0;
std::regex compact_regex(R"(.*Compacted.*=>\s*([\d]+)\s*bytes)");
while (std::getline(log_file, log_line)) {
std::smatch match;
if (std::regex_search(log_line, match, compact_regex)) {
total_compacted += std::stoull(match[1]);
}
}
log_file.close();
double write_amplification = static_cast<double>(total_compacted) ;
std::cout << "Write Amplification: " << write_amplification << std::endl;
std::cout << "Total data written (in bytes): " << total_size << " bytes" << std::endl;
double write_amplification_factor = static_cast<double>(total_compacted) / total_size;
std::cout << "Write Amplification Factor: " << write_amplification_factor << std::endl;
}
/// Compares a full-scan lookup with a secondary-index lookup for
/// name=Customer#10000 and prints the time each step takes.
///
/// Steps: (1) full scan through FindKeysByField(), (2) CreateIndexOnField("name"),
/// (3) QueryByIndex("name:Customer#10000"). The keys returned by the index
/// query are printed on one line.
///
/// NOTE(review): CreateIndexOnField and QueryByIndex are called on leveldb::DB
/// here; presumably they come from this project's modified LevelDB — confirm.
///
/// @param db  Database to query; must not be null.
void BenchmarkFieldQueryWithIndex(leveldb::DB* db) {
  const Field field = {"name", "Customer#10000"};

  // 1) Baseline: look the field up by scanning every record.
  auto start = high_resolution_clock::now();
  const std::vector<std::string> keys_without_index = FindKeysByField(db, field);
  auto end = high_resolution_clock::now();
  auto duration = duration_cast<microseconds>(end - start);
  std::cout << "Time without index: " << duration.count() << " microseconds" << std::endl;
  std::cout << "Found " << keys_without_index.size() << " keys without index." << std::endl;

  // 2) Build the secondary index.
  start = high_resolution_clock::now();
  const leveldb::Status status = db->CreateIndexOnField("name");
  end = high_resolution_clock::now();
  duration = duration_cast<microseconds>(end - start);
  // Report a failure instead of dropping it. This is deliberately not an
  // assert: callers may invoke this function when the index already exists,
  // and SOURCE does not show what status that case returns.
  if (!status.ok()) {
    std::cerr << "CreateIndexOnField(\"name\") returned: " << status.ToString() << std::endl;
  }
  std::cout << "Time to create index: " << duration.count() << " microseconds" << std::endl;

  // 3) Same lookup through the index.
  start = high_resolution_clock::now();
  const std::vector<std::string> keys_with_index = db->QueryByIndex("name:Customer#10000");
  end = high_resolution_clock::now();
  duration = duration_cast<microseconds>(end - start);
  std::cout << "Time with index: " << duration.count() << " microseconds" << std::endl;

  std::cout << "Found " << keys_with_index.size() << " keys with index." << std::endl;
  assert(!keys_with_index.empty() && "Query by index returned no results");
  std::cout << "Query by index results for name=Customer#10000: ";
  for (const auto& result : keys_with_index) {
    std::cout << result << ", ";
  }
  // Finish the result line so later output does not run onto it.
  std::cout << std::endl;
}
// // 基准测试:记录插入时的性能影响
// void BenchmarkWritePerformance(leveldb::DB* db, int num_entries) {
// leveldb::WriteOptions write_options;
// auto start = high_resolution_clock::now();
// GenerateAndInsertData(db, num_entries); // 执行批量插入
// auto end = high_resolution_clock::now();
// auto duration = duration_cast<microseconds>(end - start);
// std::cout << "Insertion time for " << num_entries << " entries: " << duration.count() << " microseconds" << std::endl;
// }
/// Measures how the "name" index affects write performance for several data sizes.
///
/// For every entry count in `entry_sizes`, in order:
///  1. create the index on "name" and print how long that took;
///  2. run BenchmarkWritePerformance() and BenchmarkFieldQueryWithIndex();
///  3. delete the index and print how long that took;
///  4. run BenchmarkWritePerformance() again.
/// The status assertions sit inside the timed sections.
///
/// @param db           Database to benchmark; must not be null.
/// @param entry_sizes  Entry counts to test, one round per element.
void BenchmarkIndexImpactOnWrite(leveldb::DB* db, const std::vector<int>& entry_sizes) {
  // Span between two clock readings: whole microseconds, expressed in ms.
  const auto to_ms = [](high_resolution_clock::time_point from,
                        high_resolution_clock::time_point to) {
    return duration_cast<microseconds>(to - from).count() / 1000.0;
  };

  for (size_t round = 0; round < entry_sizes.size(); ++round) {
    const int num_entries = entry_sizes[round];
    std::cout << "\nTesting with " << num_entries << " entries..." << std::endl;

    // Index creation.
    const auto create_begin = high_resolution_clock::now();
    const leveldb::Status create_status = db->CreateIndexOnField("name");
    assert(create_status.ok() && "Failed to create index");
    const auto create_end = high_resolution_clock::now();
    std::cout << "Time to create index: " << to_ms(create_begin, create_end) << " ms" << std::endl;

    // Writes and queries while the index exists.
    std::cout << "Benchmarking write performance with index..." << std::endl;
    BenchmarkWritePerformance(db, num_entries);
    BenchmarkFieldQueryWithIndex(db);

    // Index removal.
    const auto delete_begin = high_resolution_clock::now();
    const leveldb::Status delete_status = db->DeleteIndex("name");
    assert(delete_status.ok() && "Failed to delete index");
    const auto delete_end = high_resolution_clock::now();
    std::cout << "Time to delete index: " << to_ms(delete_begin, delete_end) << " ms" << std::endl;

    // Writes after the index has been removed.
    std::cout << "Benchmarking write performance without index..." << std::endl;
    BenchmarkWritePerformance(db, num_entries);
  }
}
/// Runs BenchmarkWritePerformance() on `num_threads` threads at once and prints
/// the wall-clock time and combined throughput.
///
/// Every thread writes `num_entries` records; all threads are joined before
/// the totals are printed.
///
/// @param db           Database shared by all threads; must not be null.
/// @param num_entries  Records written by each thread.
/// @param num_threads  Number of threads to start.
void ConcurrentBenchmarkWritePerformance(leveldb::DB* db, int num_entries, int num_threads) {
  const auto start_time = std::chrono::high_resolution_clock::now();

  std::vector<std::thread> threads;
  if (num_threads > 0) {
    threads.reserve(static_cast<size_t>(num_threads));
  }
  for (int i = 0; i < num_threads; ++i) {
    // Build each thread in place instead of moving a temporary in.
    threads.emplace_back(BenchmarkWritePerformance, db, num_entries);
  }
  for (auto& t : threads) {
    t.join();
  }

  const auto end_time = std::chrono::high_resolution_clock::now();
  const std::chrono::duration<double> elapsed = end_time - start_time;

  // Multiply in double: the int product num_threads * num_entries can overflow,
  // which is undefined behaviour for signed integers.
  const double total_operations = static_cast<double>(num_threads) * num_entries;
  const double total_throughput = total_operations / elapsed.count();

  std::cout << "Total time for concurrent writes: " << elapsed.count() * 1000 << " ms\n";
  std::cout << "Total throughput: " << total_throughput << " OPS\n";
}
/// Times the removal of the secondary index on `field_name` and prints the
/// result in microseconds.
///
/// The status assertion sits inside the timed section.
///
/// @param db          Database that holds the index; must not be null.
/// @param field_name  Field whose index is deleted.
void BenchmarkDeleteIndex(leveldb::DB* db, const std::string& field_name) {
  const auto begin = high_resolution_clock::now();
  const leveldb::Status status = db->DeleteIndex(field_name);
  assert(status.ok() && "Failed to delete index");
  const auto finish = high_resolution_clock::now();

  const microseconds elapsed = duration_cast<microseconds>(finish - begin);
  std::cout << "Time to delete index on field '" << field_name << "': " << elapsed.count() << " microseconds" << std::endl;
}
/// Prints the database's "leveldb.stats" property, used here to judge how much
/// space the database (including any secondary index) occupies.
///
/// Writes an error to stderr if the property cannot be read.
///
/// @param db  Database to inspect; must not be null.
void GetDatabaseSize(leveldb::DB* db) {
  std::string stats;
  if (db->GetProperty("leveldb.stats", &stats)) {
    std::cout << "Database stats: " << std::endl;
    std::cout << stats << std::endl;
  } else {
    std::cerr << "Failed to get db stats" << std::endl;
  }
}
int main() {
leveldb::Options options;
options.create_if_missing = true;
// 打开数据库
leveldb::DB* db = nullptr;
leveldb::Status status = leveldb::DB::Open(options, "./testdb", &db);
assert(status.ok() && "Failed to open database");
// 测试写入性能
BenchmarkWritePerformance(db, 100001); // 插入 100001 条数据
// 测试二级索引对查询性能的提升
BenchmarkFieldQueryWithIndex(db);
// 获取数据库大小
GetDatabaseSize(db);
// 计算写放大
CalculateWriteAmplification(db);
// 测试删除二级索引的开销
BenchmarkDeleteIndex(db, "name");
// 定义不同的数据量
std::vector<int> entry_sizes = {10000, 50000, 100000};
// 测试索引对写入性能的影响
BenchmarkIndexImpactOnWrite(db, entry_sizes);
// 测试并发写入
int num_entries = 100000; // 每个线程的写入条目数
int num_threads = 8; // 并发线程数
ConcurrentBenchmarkWritePerformance(db, num_entries, num_threads);
// 关闭数据库
delete db;
std::cout << "Benchmark tests completed." << std::endl;
return 0;
}