Ver a proveniência

添加 MyLevelDB 类及相关功能,支持字段序列化、索引创建和查询

main
1021 há 8 meses
ascendente
cometimento
d7faa3a3bd
9 ficheiros alterados com 535 adições e 5 eliminações
  1. +14
    -1
      CMakeLists.txt
  2. +62
    -0
      include/leveldb/my_leveldb.h
  3. +7
    -2
      include/leveldb/options.h
  4. +1
    -1
      include/leveldb/slice.h
  5. +206
    -0
      myLevelDB/my_leveldb.cc
  6. +83
    -0
      test/db_test1.cc
  7. +122
    -0
      test/performancetest.cc
  8. +35
    -0
      util/coding.cc
  9. +5
    -1
      util/coding.h

+ 14
- 1
CMakeLists.txt Ver ficheiro

@ -190,7 +190,7 @@ target_sources(leveldb
"util/options.cc"
"util/random.h"
"util/status.cc"
"myLevelDB/my_leveldb.cc"
# Only CMake 3.3+ supports PUBLIC sources in targets exported by "install".
$<$<VERSION_GREATER:CMAKE_VERSION,3.2>:PUBLIC>
"${LEVELDB_PUBLIC_INCLUDE_DIR}/c.h"
@ -208,6 +208,7 @@ target_sources(leveldb
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/my_leveldb.h"
)
if (WIN32)
@ -492,6 +493,7 @@ if(LEVELDB_INSTALL)
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table_builder.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/table.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/write_batch.h"
"${LEVELDB_PUBLIC_INCLUDE_DIR}/my_leveldb.h"
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/leveldb"
)
@ -517,3 +519,14 @@ if(LEVELDB_INSTALL)
DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}"
)
endif(LEVELDB_INSTALL)
if(LEVELDB_INSTALL)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)
add_executable(performancetest
"${PROJECT_SOURCE_DIR}/test/performancetest.cc"
)
target_link_libraries(performancetest leveldb)
endif()

+ 62
- 0
include/leveldb/my_leveldb.h Ver ficheiro

@ -0,0 +1,62 @@
#pragma
#include <iostream>
#include "leveldb/db.h"
#include <vector>
#include <mutex>
namespace leveldb {
class LEVELDB_EXPORT MyLevelDB : leveldb::DB {
public:
MyLevelDB(Options& options, std::string db_name)
: _op(options), _db_name(db_name) {
//
Status status = DB::Open(options, db_name + "_main", &_fields_db);
if (!status.ok()) {
std::cerr << "Open main database failure: " << status.ToString() << std::endl;
abort();
}
}
//
void SerializeValue(const FieldArray& fields, std::string& resString);
//
void ParseValue(const std::string& value_str, FieldArray& resFieldArray);
//
Status PutWithFields(const WriteOptions& options, const std::string& key,const FieldArray& fields);
//
Status FindKeysByField(const ReadOptions& options, const Field field,std::vector<std::string>* keys);
//
Status CreateIndexOnField(const std::string& field_name);
//
Status DeleteIndex(std::string& field_name);
void QueryByIndex(const ReadOptions& options, Field& field,
std::vector<std::string>& keys);
public:
Status Put(const WriteOptions& options, const Slice& key,
const Slice& value) override;
Status Delete(const WriteOptions& options, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Iterator* NewIterator(const ReadOptions& options) override;
const Snapshot* GetSnapshot() override;
void ReleaseSnapshot(const Snapshot* snapshot) override;
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
private:
Options _op;
std::string _db_name;
DB* _fields_db;
std::vector<std::string> index_list_;
std::mutex mutex_;
std::vector<DB*> index_db;
};
}

+ 7
- 2
include/leveldb/options.h Ver ficheiro

@ -6,9 +6,10 @@
#define STORAGE_LEVELDB_INCLUDE_OPTIONS_H_
#include <cstddef>
#include <iostream>
#include <vector>
#include "leveldb/export.h"
#include "leveldb/db.h"
namespace leveldb {
class Cache;
@ -17,6 +18,8 @@ class Env;
class FilterPolicy;
class Logger;
class Snapshot;
using Field = std::pair<std::string, std::string>;
using FieldArray = std::vector<std::pair<std::string, std::string>>;
// DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before
@ -145,6 +148,8 @@ struct LEVELDB_EXPORT Options {
// Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here.
const FilterPolicy* filter_policy = nullptr;
bool index_mode = false;
};
// Options that control read operations

+ 1
- 1
include/leveldb/slice.h Ver ficheiro

@ -19,7 +19,7 @@
#include <cstddef>
#include <cstring>
#include <string>
#include <iostream>
#include "leveldb/export.h"
namespace leveldb {

+ 206
- 0
myLevelDB/my_leveldb.cc Ver ficheiro

@ -0,0 +1,206 @@
#include "leveldb/write_batch.h"
#include "db/dbformat.h"
#include "leveldb/db.h"
#include "util/coding.h"
#include "leveldb/my_leveldb.h"
#include "unordered_map"
#include <sstream>
namespace leveldb {
//反序列化为字段数组
void MyLevelDB::ParseValue(const std::string& value_str,
FieldArray& resFieldArray) {
std::stringstream ss(value_str);
std::string segment;
// 按逗号分割字符串
while (std::getline(ss, segment, ',')) {
std::string key;
std::string value;
std::stringstream kv(segment);
if (std::getline(kv, key, ':') && std::getline(kv, value, ':')) {
if (!key.empty() && !value.empty()) {
resFieldArray.push_back(std::make_pair(key, value));
// std::cout << ((resFieldArray.back()).first).data() << std::endl;
} else {
std::cerr << "Invalid key-value pair: " << segment << std::endl;
}
} else {
std::cerr << "Failed to parse segment: " << segment << std::endl;
}
}
}
//序列化为字符串
void MyLevelDB::SerializeValue(const FieldArray& fields,
std::string& resString) {
resString.clear();
for (int i = 0; i < fields.size(); i++) {
const std::string& key = fields[i].first;
const std::string& value = fields[i].second;
resString += key + ":" +value;
if (i != fields.size() - 1) {
resString += ",";
}
}
}
Status MyLevelDB::PutWithFields(const WriteOptions& options,const std::string& key,const FieldArray& fields) {
std::string value;
SerializeValue(fields, value);
auto slice_key = Slice(key.c_str());
auto slice_value = Slice(value.c_str());
Status s = _fields_db->Put(options, slice_key, slice_value);
std::unordered_map<int, int> match;
std::unique_lock<std::mutex> l(mutex_);
for (int i = 0; i < fields.size(); i++) {
for (size_t idx = 0; idx < index_list_.size(); idx++) {
const auto& i_name = index_list_[idx];
if (fields[i].first == i_name) {
match[i] = idx;
break;
}
}
}
for (auto item : match) {
std::string composed_key;
composed_key += fields[item.first].second + ":" + key;
s = index_db[item.second]->Put(options, composed_key, Slice());
}
return s;
}
Status MyLevelDB::FindKeysByField(const ReadOptions& options, const Field field,
std::vector<std::string>* keys) {
auto it = _fields_db->NewIterator(options);
it->SeekToFirst();
keys->clear();
while (it->Valid()) {
auto val = it->value();
FieldArray arr;
auto str_val = std::string(val.data(), val.size());
ParseValue(str_val, arr);
for (auto pr : arr) {
if (pr.first == field.first && pr.second == field.second) {
Slice key = it->key();
keys->push_back(std::string(key.data(), key.size()));
break;
}
}
it->Next();
}
delete it;
return Status::OK();
}
Status MyLevelDB::CreateIndexOnField(const std::string& field_name) {
for (const auto& field : this->index_list_) {
if (field == field_name) {
return Status::InvalidArgument(field_name,
"Index already exists for this field");
}
}
index_list_.push_back(field_name);
Options op = _op;
DB* field_db;
op.index_mode = true;
Status status = DB::Open(op, _db_name + "_index_" + field_name, &field_db);
index_db.push_back(field_db);
if (!status.ok()) {
std::cerr << "Failed to open index DB: " << status.ToString() << std::endl;
abort();
}
return status;
}
Status MyLevelDB::DeleteIndex(std::string& field_name) {
auto it = std::find(index_list_.begin(), index_list_.end(), field_name);
if (it == index_list_.end()) {
return Status::NotFound("Index not found for this field");
}
// 从列表中移除该字段
index_list_.erase(it);
return Status::OK();
}
void MyLevelDB::QueryByIndex(const ReadOptions& options, Field& field,
std::vector<std::string>& keys) {
int i = 0;
for (; i < index_list_.size(); i++) {
if (index_list_[i] == field.first) {
break;
}
}
assert(i != index_list_.size());
auto it = index_db[i]->NewIterator(options);
it->SeekToFirst();
while (it->Valid()) {
auto val = it->key();
auto str_val = std::string(val.data(), val.size());
std::string key;
std::string value;
std::stringstream kv(str_val);
std::getline(kv, key, ':');
std::getline(kv, value, ':');
if (key == field.second) {
keys.push_back(value);
}
it->Next();
}
delete it;
}
Status MyLevelDB::Put(const WriteOptions& options, const Slice& key,
const Slice& value) {
return _fields_db->Put(options, key, value);
}
Status MyLevelDB::Delete(const WriteOptions& options, const Slice& key) {
return _fields_db->Delete(options, key);
}
Status MyLevelDB::Write(const WriteOptions& options, WriteBatch* updates) {
assert(0);
return Status();
}
Status MyLevelDB::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
return _fields_db->Get(options, key, value);
}
Iterator* MyLevelDB::NewIterator(const ReadOptions& options) {
return _fields_db->NewIterator(options);
}
const Snapshot* MyLevelDB::GetSnapshot() { return _fields_db->GetSnapshot(); }
void MyLevelDB::ReleaseSnapshot(const Snapshot* snapshot) {
return _fields_db->ReleaseSnapshot(snapshot);
}
bool MyLevelDB::GetProperty(const Slice& property, std::string* value) {
return false;
}
void MyLevelDB::GetApproximateSizes(const Range* range, int n,
uint64_t* sizes) {
/* uint64_t temp = 0;
_main_db->GetApproximateSizes(range, n, sizes);
for (auto& index_db : field_db_) {
index_db->GetApproximateSizes(range, n, &temp);
*sizes += temp;
}*/
}
void MyLevelDB::CompactRange(const Slice* begin, const Slice* end) {
_fields_db->CompactRange(begin, end);
}
}

+ 83
- 0
test/db_test1.cc Ver ficheiro

@ -0,0 +1,83 @@
#include "leveldb/db.h"
#include "leveldb/my_leveldb.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
Options op;
op.create_if_missing = true;
MyLevelDB db(op, "testMyDB");
//序列化测试
std::string res1;
FieldArray fields1 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
db.SerializeValue(fields1,res1);
std::cout << "序列化测试结果:" << std::endl << res1 << std::endl;
//反序列化测试
FieldArray fields2;
db.ParseValue(res1,fields2);
std::cout << "反序列化测试结果:" << std::endl ;
for (int i = 0; i < fields2.size(); i++) {
std::cout << fields2[i].first << ":" << fields2[i].second << std::endl;
}
//字段存储
std::cout << "字段存储和查找结果:" << std::endl;
std::string key2 = "k_1";
std::string key3 = "k_2";
std::string key4 = "k_3";
FieldArray field2 = {{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}};
FieldArray field3 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
FieldArray field4 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
db.PutWithFields(WriteOptions(), key2, field2);
db.PutWithFields(WriteOptions(), key3, field3);
db.PutWithFields(WriteOptions(), key4, field4);
//字段查找
FieldArray value_ret;
std::vector<std::string> v;
db.FindKeysByField(ReadOptions(), field2[1], &v);
for (auto s : v) std::cout << s << "\n";
//创建索引
WriteOptions writeOptions;
ReadOptions readOptions;
Options options;
options.create_if_missing = true;
auto db1 = new MyLevelDB(options, "testdb2");
db1->CreateIndexOnField("address");
std::string key8 = "k_8";
std::string key9 = "k_9";
FieldArray fields8 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
FieldArray fields9 = {{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}};
FieldArray fields10 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
FieldArray fields11 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
FieldArray fields12 = {
{"name", "Customer#000000001"}, {"address", "abc"}, {"phone", "def"}};
Field query = {"address", "abc"};
db1->PutWithFields(WriteOptions(), key8, fields8);
db1->PutWithFields(WriteOptions(), key9, fields9);
std::cout << "索引存储与查找:" << std::endl;
std::vector<std::string> keys;
db1->QueryByIndex(readOptions, query,keys);
for (int i = 0; i < keys.size();i++) {
std::cout << keys[i] << std::endl;
}
return 0;
}

+ 122
- 0
test/performancetest.cc Ver ficheiro

@ -0,0 +1,122 @@
#include "leveldb/db.h"
#include "leveldb/my_leveldb.h"
#include <iostream>
using namespace std;
using namespace leveldb;
// ÍÌÍÂÁ¿
void TestThroughput(int num_operations) {
WriteOptions writeOptions;
ReadOptions readOptions;
Options options;
options.create_if_missing = true;
auto db1 = new MyLevelDB(options, "testThroughput");
std::string key = "k_";
FieldArray fields = {{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}};
//д
auto start_time1 = std::chrono::steady_clock::now();
for (int i = 0; i < num_operations; ++i) {
db1->PutWithFields(WriteOptions(), key + to_string(i), fields);
}
auto end_time1 = std::chrono::steady_clock::now();
auto duration1 =
chrono::duration_cast<chrono::milliseconds>(end_time1 - start_time1)
.count();
cout << "Put Op Throughput: " << num_operations * 1000 / duration1 << " OPS" << endl;
//¶Á
string str;
auto start_time2 = std::chrono::steady_clock::now();
for (int i = 0; i < num_operations*100; ++i) {
db1->Get(ReadOptions(), key, &str);
}
auto end_time2 = std::chrono::steady_clock::now();
auto duration2 =
chrono::duration_cast<chrono::milliseconds>(end_time2 - start_time2)
.count();
//cout << duration2 << endl;
cout << "Get Op Throughput: " << num_operations*100 * 1000 / duration2 << " OPS"
<< endl;
//×ֶβéÕÒ
std::vector<std::string> keys;
auto start_time3 = std::chrono::steady_clock::now();
for (int i = 0; i < num_operations; ++i) {
db1->FindKeysByField(ReadOptions(), fields[0],&keys);
}
auto end_time3 = std::chrono::steady_clock::now();
auto duration3 =
chrono::duration_cast<chrono::milliseconds>(end_time3 - start_time3)
.count();
cout << "FindKeysByField Op Throughput: " << num_operations * 1000 / duration3 << " OPS"
<< endl;
}
// ÑÓ³Ù
void TestLatency(int num_operations) {
Options options;
options.create_if_missing = true;
auto db = new MyLevelDB(options, "testLatency");
std::string key = "k_";
FieldArray fields = {{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}};
//Put
int64_t latency1 = 0;
int64_t tollatency = 0;
auto end_time1 = std::chrono::steady_clock::now();
auto last_time1 = end_time1;
for (int i = 0; i < num_operations*100; ++i) {
// Operations
db->PutWithFields(WriteOptions(), key + to_string(i), fields);
end_time1 = std::chrono::steady_clock::now();
latency1 = std::chrono::duration_cast<std::chrono::milliseconds>(end_time1 -
last_time1)
.count();
last_time1 = end_time1;
tollatency += latency1;
}
std::cout << num_operations*100<<" put op averange latency:" << (double)tollatency / num_operations<< std::endl;
//Get
int64_t latency2 = 0;
tollatency = 0;
auto end_time2 = std::chrono::steady_clock::now();
auto last_time2 = end_time2;
std::string str;
for (int i = 0; i < num_operations*100; ++i) {
// Operations
db->Get(ReadOptions(), key + to_string(i),&str );
end_time2 = std::chrono::steady_clock::now();
latency2 = std::chrono::duration_cast<std::chrono::milliseconds>(end_time2 -
last_time2)
.count();
last_time2 = end_time2;
tollatency += latency2;
}
std::cout << num_operations*100
<< " Get operation averange latency:" << (double)tollatency / num_operations
<< std::endl;
//FindKeysByField
int64_t latency3 = 0;
tollatency = 0;
auto end_time3 = std::chrono::steady_clock::now();
auto last_time3 = end_time3;
std::vector<std::string> keys;
for (int i = 0; i < 50; ++i) {
// Operations
db->FindKeysByField(ReadOptions(), fields[0], &keys);
end_time3 = std::chrono::steady_clock::now();
latency3 = std::chrono::duration_cast<std::chrono::milliseconds>(end_time3 -
last_time3)
.count();
last_time3 = end_time3;
tollatency += latency3;
}
std::cout << 50 << " FindKeysByField operation averange latency:"
<< (double)tollatency / num_operations << "ms"
<< std::endl;
}
int main() {
TestThroughput(100);
TestLatency(1000);
}

+ 35
- 0
util/coding.cc Ver ficheiro

@ -153,4 +153,39 @@ bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
}
}
uint32_t GetIndexKey(const Slice& val, Slice* index_name, Slice* index_val) {
uint32_t tmp;
uint32_t num_index;
uint64_t key_offset;
Slice input = val;
auto limit = input.data() + input.size();
GetVarint32(&input, &tmp);
GetVarint32(&input, &num_index);
assert(num_index == 1);
GetLengthPrefixedSlice(&input, index_name);
const char* p = input.data();
GetLengthPrefixedSlice(&input, index_val);
return input.data() - p;
}
// val_len | val | key
Slice BuildComposedKey(const Slice& key, Slice& val, uint32_t val_hold_bi) {
std::string composed_key = "";
PutLengthPrefixedSlice(&composed_key, val);
composed_key.append(key.data(), key.size());
return Slice(composed_key);
}
void BuildComposedKey(const Slice& key, Slice& val, std::string* composed_key) {
PutLengthPrefixedSlice(composed_key, val);
composed_key->append(key.data(), key.size());
}
void BuildComposedKey(const Slice& key, const Slice& val,
std::string* composed_key) {
PutLengthPrefixedSlice(composed_key, val);
composed_key->append(key.data(), key.size());
}
} // namespace leveldb

+ 5
- 1
util/coding.h Ver ficheiro

@ -116,7 +116,11 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
}
return GetVarint32PtrFallback(p, limit, value);
}
uint32_t GetIndexKey(const Slice& val, Slice* index_name, Slice* index_val);
Slice BuildComposedKey(const Slice& key, Slice& val, uint32_t val_hold_bi);
void BuildComposedKey(const Slice& key, Slice& val, std::string* composed_key);
void BuildComposedKey(const Slice& key, const Slice& val,
std::string* composed_key);
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_

Carregando…
Cancelar
Guardar