Преглед на файлове

二级索引-基本功能

master
陈予曈 преди 8 месеца
родител
ревизия
53ec2fca2b
променени са 1 файла, в които са добавени 356 реда и са изтрити 2 реда
  1. +356
    -2
      db/NewDB.cc

+ 356
- 2
db/NewDB.cc Целия файл

@ -75,39 +75,393 @@ FieldArray NewDB::ParseValue(const std::string& value_str){
// return fields;
}
// 构造索引的key,结构:FieldName_FieldValue_Key-橙
std::string NewDB::ConstructIndexKey(const Slice& key, const Field& field){
// std::string s;
// PutLengthPrefixedSlice(&s, Slice(field.first));
// PutLengthPrefixedSlice(&s, Slice(field.second));
// PutLengthPrefixedSlice(&s, key);
// return s;
std::ostringstream oss;
oss << field.first << ":" << field.second << "_" << key.ToString();
return oss.str();
}
// 从索引键中提取原数据的键
std::string NewDB::ExtractIndexKey(const Slice& key){
// Slice input(key.ToString());
// Slice v;
// GetLengthPrefixedSlice(&input, &v);
// GetLengthPrefixedSlice(&input, &v);
// GetLengthPrefixedSlice(&input, &v);
// return v.ToString();
//extract key of dataDB from key of indexDB
std::string fullKey(key.data(), key.size());
// size_t pos1 = fullKey.find('_');
size_t pos1 = fullKey.find('_');
return fullKey.substr(pos1 + 1); // 提取 'Key' 部分
}
Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbptr) {
// 打开底层的数据库
DB* dataDB;
Status s1 = DB::Open(options, name + "_data", &dataDB);
if (!s1.ok()) {
return s1; // 如果打开失败,返回错误
}
DB* indexDB;
Status s2 = DB::Open(options, name + "_index", &indexDB);
if (!s2.ok()) {
return s2; // 如果打开失败,返回错误
}
DB* recoverDB;
Status s3 = DB::Open(options, name + "_recover", &recoverDB);
if (!s3.ok()) {
return s3; // 如果打开失败,返回错误
}
// 创建一个 NewDB 实例
*dbptr = new NewDB();
// 初始化 data_db_,index_db_和recover_db_
(*dbptr)->data_db_ = std::unique_ptr<DB>(dataDB); // 将打开的数据库指针传递给 data_db_
(*dbptr)->index_db_ = std::unique_ptr<DB>(indexDB); // 创建 IndexDB
(*dbptr)->recover_db_ = std::unique_ptr<DB>(recoverDB);
(*dbptr)->TinyOpID = 0;
// 重新构造indexed_fields
Iterator* it = (*dbptr)->index_db_->NewIterator(ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// 只关心以 "index_field" 为前缀的键
if (it->key().starts_with("index_field_read:")) {
std::string field = it->key().ToString().substr(16); // 获取字段名
(*dbptr)->indexed_fields_read.insert(field);
}
if (it->key().starts_with("index_field_write:")) {
std::string field = it->key().ToString().substr(17); // 获取字段名
(*dbptr)->indexed_fields_write.insert(field);
}
}
delete it;
return Status::OK(); // 返回成功
}
Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const FieldArray& fields){
FieldArray current_fields;
Status s = Get_fields(leveldb::ReadOptions(), key, &current_fields);
leveldb::WriteBatch data_batch;
leveldb::WriteBatch index_batch;
// uint64_t TinyOpID = 0;
if(!s.ok()){
for (const auto& field : fields) {
// 如果字段名在 indexed_fields_ 中,才插入二级索引
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) {
// 构造索引的key,结构:FieldName_FieldValue_Key
std::string index_key = ConstructIndexKey(key, field);
index_batch.Put(index_key, Slice());
}
}
std::string value = SerializeValue(fields);
data_batch.Put(key.ToString(), value);
}
else{
std::pair<FieldArray, FieldArray> fieldarray_pair = UpdateIndex(options, key, fields, current_fields);
// put ops in indexdb
for (const auto& field : fieldarray_pair.first) {
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) {
// put into index_batch
std::string index_key = ConstructIndexKey(key, field);
index_batch.Put(index_key, Slice());
}
}
// put ops in datadb
std::string value = SerializeValue(fields);
data_batch.Put(key.ToString(), value);
// delete ops in indexdb
for (const auto& field : fieldarray_pair.second) {
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) {
// delete in index_batch
std::string index_key = ConstructIndexKey(key, field);
index_batch.Delete(index_key);
}
}
}
// write into indexDB
Status status = index_db_->Write(write_options, &index_batch);
if(!status.ok()){
return status;
}
// write into dataDB
Status data_status = data_db_->Write(write_options, &data_batch);
if (!data_status.ok()) {
return data_status; // 主数据写入失败,直接返回
}
return Status::OK();
}
Status NewDB::Get_fields(const ReadOptions& options, const Slice& key, FieldArray* fields){
std::pair<FieldArray, FieldArray> NewDB::UpdateIndex(const WriteOptions& options, const Slice& key, const FieldArray& fields, const FieldArray& current_fields){
// 构建current_fields的映射
std::unordered_map<std::string, std::string> current_map;
for (const auto& field : current_fields) {
current_map[field.first] = field.second;
}
// 构建fields的映射
std::unordered_map<std::string, std::string> fields_map;
for (const auto& field : fields) {
fields_map[field.first] = field.second;
}
// 删除的字段
FieldArray deleted_fields;
// 新增的字段
FieldArray insert_fields;
// fields中的字段
for (const auto& field : fields) {
// fields中存在但current_fields中不存在
if (current_map.find(field.first) == current_map.end()) {
insert_fields.push_back(field);
} else if (current_map[field.first] != field.second) {
// current_fields中存在但是值不同
insert_fields.push_back(field);
deleted_fields.push_back(std::make_pair(field.first, current_map[field.first]));
}
}
// current_fields中的字段
for (const auto& field : current_fields) {
if (fields_map.find(field.first) == fields_map.end()) {
deleted_fields.push_back(field);
}
}
return std::make_pair(insert_fields, deleted_fields);
}
std::vector<std::string> NewDB::FindKeysByField(Field &field){
Status NewDB::Get_fields(const ReadOptions& options, const Slice& key, FieldArray* fields){
//get value(fields) with key
// 从 DataDB 获取数据
Status s = data_db_->Get_fields(options, key, fields);
if (!s.ok()) {
return s; // 如果获取失败,返回状态
}
return Status::OK();
}
std::vector<std::string> NewDB::FindKeysByField(Field &field){
//get keys with field
std::vector<std::string> matching_keys;
leveldb::DB* db = data_db_.get();
matching_keys = data_db_->FindKeysByField(db, field);
return matching_keys;
}
bool NewDB::Delete(const WriteOptions& options, const Slice& key){
leveldb::WriteBatch data_batch;
leveldb::WriteBatch index_batch;
// uint64_t TinyOpID = 0;
FieldArray fields;
Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields);
if(s.ok()){
// delete in datadb
data_batch.Delete(key.ToString());
// delete in indexdb
for (const auto& field : fields) {
// 如果字段名在 indexed_fields_ 中,才插入二级索引
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) {
// 构造索引的key,结构:FieldName_FieldValue_Key
std::string index_key = ConstructIndexKey(key, field);
index_batch.Delete(index_key);
}
}
// write dataDB
s = data_db_->Write(leveldb::WriteOptions(), &data_batch);
if(!s.ok()){
return false;
}
// write indexDB
s = index_db_->Write(leveldb::WriteOptions(), &index_batch);
if(!s.ok()){
return false;
}
}
else{
return false;
}
return true;
}
bool NewDB::CreateIndexOnField(const std::string& field_name) {
// 将索引字段元数据持久化到indexDB
index_db_->Put(WriteOptions(), "index_field_write:" + field_name, "1");
std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock);
lock.lock();
indexed_fields_write.insert(field_name);
lock.unlock();
const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录
size_t batch_count = 0;
// 遍历 data_db_ 中的所有键值对
leveldb::ReadOptions read_options;
leveldb::WriteBatch batch;
std::unique_ptr<leveldb::Iterator> it(data_db_->NewIterator(read_options));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// 获取主数据库的键值对
std::string key = it->key().ToString();
std::string value = it->value().ToString();
// 解析值,提取字段数组
FieldArray fields = ParseValue(value);
// 查找指定字段
for (const auto& field : fields) {
if (field.first == field_name) {
// 构造索引键
std::string index_key = ConstructIndexKey(key, field);
// 插入到索引数据库
batch.Put(index_key, Slice());
batch_count++;
// 如果达到批次大小限制,执行写入
if (batch_count >= BATCH_SIZE_LIMIT) {
leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &batch);
if (!status.ok()) {
return false;
}
batch.Clear(); // 清空批次,准备下一个批次
batch_count = 0;
}
}
}
}
// 如果迭代器出错
if (!it->status().ok()) {
// 从头开始创建,覆盖掉原本失效的东西
return false;
}
// 批量写入
leveldb::WriteOptions write_options;
Status status = index_db_->Write(write_options, &batch);
if (!status.ok()) {
return false;
}
// 将索引字段元数据持久化到indexDB
index_db_->Put(WriteOptions(), "index_field_read:" + field_name, "1");
lock.lock();
indexed_fields_read.insert(field_name);
lock.unlock();
return true;
}
std::vector<std::string> NewDB::QueryByIndex(Field &field){
std::vector<std::string> matching_keys;
if(indexed_fields_read.find(field.first) != indexed_fields_read.end()){
std::string prefix = ConstructIndexKey(Slice(), field); //前缀-字段名+字段值
leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions());
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){
Slice index_key = iter->key();
std::string data_key = ExtractIndexKey(index_key); //解析索引键中的key
matching_keys.push_back(data_key);
}
delete iter;
// check these matching_keys and remove keys not in datadb
for (auto& key:matching_keys) {
FieldArray fields;
Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields);
// if field not in this k-v?
int tag = 0;
if(s.ok()){
for(auto& each_field : fields){
if(each_field.first == field.first){
if(each_field.second == field.second){
tag = 1;
}
}
}
}
if(tag == 0){
matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end());
}
// if(!s.ok()){
// matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end());
// }
}
}
return matching_keys;
}
bool NewDB::DeleteIndex(const std::string& field_name){
// 将索引字段元数据持久化到indexDB
index_db_->Delete(WriteOptions(), "index_field_read:" + field_name);
index_db_->Delete(WriteOptions(), "index_field_write:" + field_name);
std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock);
lock.lock();
indexed_fields_read.erase(field_name);
indexed_fields_write.erase(field_name);
lock.unlock();
const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录
size_t batch_count = 0;
leveldb::WriteBatch delete_batch;
std::string prefix = field_name;
leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions());
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){
Slice index_key = iter->key();
delete_batch.Delete(index_key);
batch_count++;
if (batch_count >= BATCH_SIZE_LIMIT) {
leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &delete_batch);
if (!status.ok()) {
return false;
}
delete_batch.Clear(); // 清空批次,准备下一个批次
batch_count = 0;
}
}
delete iter;
Status s = index_db_->Write(leveldb::WriteOptions(), &delete_batch);
if(!s.ok()){
return false;
}
return true;
}
}

Зареждане…
Отказ
Запис