|
|
@ -1,663 +0,0 @@ |
|
|
|
#include "db/NewDB.h"
|
|
|
|
#include "util/coding.h"
|
|
|
|
#include "db/write_batch_internal.h"
|
|
|
|
#include "db/version_set.h"
|
|
|
|
#include "db/db_impl.h"
|
|
|
|
#include <string>
|
|
|
|
#include <sstream>
|
|
|
|
#include <unordered_map>
|
|
|
|
#include <map>
|
|
|
|
#include <iostream>
|
|
|
|
#include <mutex>
|
|
|
|
#include <thread>
|
|
|
|
#include <ctime>
|
|
|
|
#include <condition_variable>
|
|
|
|
#include <unordered_set>
|
|
|
|
|
|
|
|
namespace leveldb{ |
|
|
|
NewDB::~NewDB() { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// row lock
|
|
|
|
std::condition_variable cv; |
|
|
|
std::mutex m; |
|
|
|
bool ready = false; |
|
|
|
// row lock
|
|
|
|
|
|
|
|
std::string NewDB::SerializeValue(const FieldArray& fields){ |
|
|
|
std::string value_str; |
|
|
|
for(const auto& pair : fields){ |
|
|
|
std::string field = pair.first + ":" + pair.second; |
|
|
|
uint32_t field_size = field.size(); |
|
|
|
char buffer[4]; |
|
|
|
EncodeFixed32(buffer, field_size); |
|
|
|
value_str.append(buffer, 4); |
|
|
|
value_str.append(field); |
|
|
|
} |
|
|
|
return value_str; |
|
|
|
|
|
|
|
// std::string s;
|
|
|
|
// for(auto& field : fields){
|
|
|
|
// PutLengthPrefixedSlice(&s, Slice(field.first));
|
|
|
|
// PutLengthPrefixedSlice(&s, Slice(field.second));
|
|
|
|
// }
|
|
|
|
// return s;
|
|
|
|
} |
|
|
|
|
|
|
|
FieldArray NewDB::ParseValue(const std::string& value_str){ |
|
|
|
FieldArray fields; |
|
|
|
const char* data = value_str.data(); |
|
|
|
size_t length = value_str.size(); |
|
|
|
|
|
|
|
while (length >= 4) { |
|
|
|
uint32_t field_size = DecodeFixed32(data); |
|
|
|
if (length < 4 + field_size) { |
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
std::string field(data + 4, field_size); |
|
|
|
size_t colon_pos = field.find(':'); |
|
|
|
|
|
|
|
std::string field_name = field.substr(0, colon_pos); |
|
|
|
std::string field_value = field.substr(colon_pos + 1); |
|
|
|
|
|
|
|
fields.push_back(std::make_pair(field_name,field_value)); |
|
|
|
|
|
|
|
data += 4 + field_size; |
|
|
|
length -= 4 + field_size; |
|
|
|
} |
|
|
|
|
|
|
|
return fields; |
|
|
|
|
|
|
|
// Slice input(value_str);
|
|
|
|
// Slice v1,v2;
|
|
|
|
// FieldArray fields;
|
|
|
|
// while(v1!=Slice()){
|
|
|
|
// GetLengthPrefixedSlice(&input, &v1);
|
|
|
|
// GetLengthPrefixedSlice(&input, &v2);
|
|
|
|
// fields.push_back(std::make_pair(v1.ToString(), v2.ToString()));
|
|
|
|
// }
|
|
|
|
// return fields;
|
|
|
|
} |
|
|
|
|
|
|
|
// 构造索引的key,结构:FieldName_FieldValue_Key-橙
|
|
|
|
std::string NewDB::ConstructIndexKey(const Slice& key, const Field& field){ |
|
|
|
// std::string s;
|
|
|
|
// PutLengthPrefixedSlice(&s, Slice(field.first));
|
|
|
|
// PutLengthPrefixedSlice(&s, Slice(field.second));
|
|
|
|
// PutLengthPrefixedSlice(&s, key);
|
|
|
|
// return s;
|
|
|
|
|
|
|
|
std::ostringstream oss; |
|
|
|
oss << field.first << ":" << field.second << "_" << key.ToString(); |
|
|
|
return oss.str(); |
|
|
|
} |
|
|
|
|
|
|
|
// 从索引键中提取原数据的键
|
|
|
|
std::string NewDB::ExtractIndexKey(const Slice& key){ |
|
|
|
// Slice input(key.ToString());
|
|
|
|
// Slice v;
|
|
|
|
// GetLengthPrefixedSlice(&input, &v);
|
|
|
|
// GetLengthPrefixedSlice(&input, &v);
|
|
|
|
// GetLengthPrefixedSlice(&input, &v);
|
|
|
|
// return v.ToString();
|
|
|
|
|
|
|
|
//extract key of dataDB from key of indexDB
|
|
|
|
std::string fullKey(key.data(), key.size()); |
|
|
|
// size_t pos1 = fullKey.find('_');
|
|
|
|
size_t pos1 = fullKey.find('_'); |
|
|
|
return fullKey.substr(pos1 + 1); // 提取 'Key' 部分
|
|
|
|
} |
|
|
|
|
|
|
|
std::string NewDB::ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname){ |
|
|
|
std::string s; |
|
|
|
PutLengthPrefixedSlice(&s, Slice(UserOpID)); |
|
|
|
PutLengthPrefixedSlice(&s, Slice(TinyOpID)); |
|
|
|
PutLengthPrefixedSlice(&s, Slice(DBname)); |
|
|
|
return s; |
|
|
|
} |
|
|
|
|
|
|
|
std::string NewDB::ConstructRecoverValue(std::string TinyOp, std::string key, std::string value){ |
|
|
|
std::string s; |
|
|
|
PutLengthPrefixedSlice(&s, Slice(TinyOp)); |
|
|
|
PutLengthPrefixedSlice(&s, Slice(key)); |
|
|
|
PutLengthPrefixedSlice(&s, Slice(value)); |
|
|
|
return s; |
|
|
|
} |
|
|
|
|
|
|
|
std::string NewDB::ExtractRecoverKey(std::string s){ |
|
|
|
Slice input(s); |
|
|
|
Slice v; |
|
|
|
GetLengthPrefixedSlice(&input, &v); |
|
|
|
GetLengthPrefixedSlice(&input, &v); |
|
|
|
GetLengthPrefixedSlice(&input, &v); |
|
|
|
return v.ToString(); |
|
|
|
} |
|
|
|
|
|
|
|
std::pair<std::string, std::string> NewDB::ExtractRecoverValue(std::string s, std::string* TinyOp){ |
|
|
|
Slice input(s); |
|
|
|
Slice v; |
|
|
|
GetLengthPrefixedSlice(&input, &v); |
|
|
|
*TinyOp = v.ToString(); |
|
|
|
GetLengthPrefixedSlice(&input, &v); |
|
|
|
std::string key = v.ToString(); |
|
|
|
GetLengthPrefixedSlice(&input, &v); |
|
|
|
std::string value = v.ToString(); |
|
|
|
return std::make_pair(key, value); |
|
|
|
} |
|
|
|
|
|
|
|
std::string NewDB::ConstructUserOpID(std::thread::id thread_id){ |
|
|
|
auto now = std::chrono::system_clock::now(); |
|
|
|
std::time_t now_t = std::chrono::system_clock::to_time_t(now); |
|
|
|
|
|
|
|
std::ostringstream oss; |
|
|
|
oss << thread_id; |
|
|
|
std::string UserOpID = std::to_string(now_t) + oss.str(); |
|
|
|
return UserOpID; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbptr) { |
|
|
|
// 打开底层的数据库
|
|
|
|
DB* dataDB; |
|
|
|
Status s1 = DB::Open(options, name + "_data", &dataDB); |
|
|
|
if (!s1.ok()) { |
|
|
|
return s1; // 如果打开失败,返回错误
|
|
|
|
} |
|
|
|
|
|
|
|
DB* indexDB; |
|
|
|
Status s2 = DB::Open(options, name + "_index", &indexDB); |
|
|
|
if (!s2.ok()) { |
|
|
|
return s2; // 如果打开失败,返回错误
|
|
|
|
} |
|
|
|
|
|
|
|
DB* recoverDB; |
|
|
|
Status s3 = DB::Open(options, name + "_recover", &recoverDB); |
|
|
|
if (!s3.ok()) { |
|
|
|
return s3; // 如果打开失败,返回错误
|
|
|
|
} |
|
|
|
// 创建一个 NewDB 实例
|
|
|
|
*dbptr = new NewDB(); |
|
|
|
|
|
|
|
// 初始化 data_db_,index_db_和recover_db_
|
|
|
|
(*dbptr)->data_db_ = std::unique_ptr<DB>(dataDB); // 将打开的数据库指针传递给 data_db_
|
|
|
|
(*dbptr)->index_db_ = std::unique_ptr<DB>(indexDB); // 创建 IndexDB
|
|
|
|
(*dbptr)->recover_db_ = std::unique_ptr<DB>(recoverDB); |
|
|
|
|
|
|
|
(*dbptr)->TinyOpID = 0; |
|
|
|
|
|
|
|
// recover dataDB&indexDB using recoverDB
|
|
|
|
leveldb::Iterator* iter = recoverDB->NewIterator(leveldb::ReadOptions()); |
|
|
|
Slice recover_key; |
|
|
|
for(iter->SeekToFirst(); iter->Valid(); iter->Next()){ |
|
|
|
recover_key = iter->key(); |
|
|
|
std::string DBname = (*dbptr)->ExtractRecoverKey(recover_key.ToString()); |
|
|
|
std::string TinyOp; |
|
|
|
std::pair<std::string, std::string> k_v = (*dbptr)->ExtractRecoverValue(iter->value().ToString(), &TinyOp); |
|
|
|
if(DBname == "dataDB"){ |
|
|
|
if(TinyOp == "PUT"){ |
|
|
|
dataDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); |
|
|
|
} |
|
|
|
else{ |
|
|
|
dataDB->Delete(leveldb::WriteOptions(), k_v.first); |
|
|
|
} |
|
|
|
} |
|
|
|
else{ |
|
|
|
if(TinyOp == "PUT"){ |
|
|
|
indexDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); |
|
|
|
} |
|
|
|
else{ |
|
|
|
indexDB->Delete(leveldb::WriteOptions(), k_v.first); |
|
|
|
} |
|
|
|
} |
|
|
|
recoverDB->Delete(leveldb::WriteOptions(), recover_key); |
|
|
|
} |
|
|
|
// std::cout << "recover " << i << " rows, end key is " << keystr << std::endl;
|
|
|
|
delete iter; |
|
|
|
|
|
|
|
// 重新构造indexed_fields
|
|
|
|
Iterator* it = (*dbptr)->index_db_->NewIterator(ReadOptions()); |
|
|
|
for (it->SeekToFirst(); it->Valid(); it->Next()) { |
|
|
|
// 只关心以 "index_field" 为前缀的键
|
|
|
|
if (it->key().starts_with("index_field_read:")) { |
|
|
|
std::string field = it->key().ToString().substr(16); // 获取字段名
|
|
|
|
(*dbptr)->indexed_fields_read.insert(field); |
|
|
|
} |
|
|
|
if (it->key().starts_with("index_field_write:")) { |
|
|
|
std::string field = it->key().ToString().substr(17); // 获取字段名
|
|
|
|
(*dbptr)->indexed_fields_write.insert(field); |
|
|
|
} |
|
|
|
} |
|
|
|
delete it; |
|
|
|
|
|
|
|
return Status::OK(); // 返回成功
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const FieldArray& fields){ |
|
|
|
std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); |
|
|
|
|
|
|
|
// row lock
|
|
|
|
std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock); |
|
|
|
std::unique_lock<std::mutex> row_lock(m, std::defer_lock); |
|
|
|
ready = (putting_keys.find(key.ToString()) == putting_keys.end()); // no another thread putting the same key
|
|
|
|
|
|
|
|
if (!ready) { |
|
|
|
cv.wait(row_lock, []{return ready;}); |
|
|
|
} |
|
|
|
|
|
|
|
lock.lock(); |
|
|
|
putting_keys.insert(key.ToString()); |
|
|
|
lock.unlock(); |
|
|
|
// row lock
|
|
|
|
|
|
|
|
FieldArray current_fields; |
|
|
|
Status s = Get_fields(leveldb::ReadOptions(), key, ¤t_fields); |
|
|
|
|
|
|
|
leveldb::WriteBatch data_batch; |
|
|
|
leveldb::WriteBatch index_batch; |
|
|
|
leveldb::WriteBatch recover_batch; |
|
|
|
|
|
|
|
// uint64_t TinyOpID = 0;
|
|
|
|
|
|
|
|
if(!s.ok()){ |
|
|
|
for (const auto& field : fields) { |
|
|
|
// 如果字段名在 indexed_fields_ 中,才插入二级索引
|
|
|
|
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { |
|
|
|
// 构造索引的key,结构:FieldName_FieldValue_Key
|
|
|
|
std::string index_key = ConstructIndexKey(key, field); |
|
|
|
index_batch.Put(index_key, Slice()); |
|
|
|
// prepare recover_batch
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
|
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
} |
|
|
|
} |
|
|
|
std::string value = SerializeValue(fields); |
|
|
|
data_batch.Put(key.ToString(), value); |
|
|
|
|
|
|
|
// put dataDB's k-v into recover_batch
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
|
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
} |
|
|
|
|
|
|
|
else{ |
|
|
|
std::pair<FieldArray, FieldArray> fieldarray_pair = UpdateIndex(options, key, fields, current_fields); |
|
|
|
|
|
|
|
// put ops in indexdb
|
|
|
|
for (const auto& field : fieldarray_pair.first) { |
|
|
|
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { |
|
|
|
// put into index_batch
|
|
|
|
std::string index_key = ConstructIndexKey(key, field); |
|
|
|
index_batch.Put(index_key, Slice()); |
|
|
|
// put into recover_batch
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
|
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// put ops in datadb
|
|
|
|
std::string value = SerializeValue(fields); |
|
|
|
data_batch.Put(key.ToString(), value); |
|
|
|
|
|
|
|
// put dataDB's k-v into recover_batch
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
|
|
|
|
// delete ops in indexdb
|
|
|
|
for (const auto& field : fieldarray_pair.second) { |
|
|
|
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { |
|
|
|
// delete in index_batch
|
|
|
|
std::string index_key = ConstructIndexKey(key, field); |
|
|
|
index_batch.Delete(index_key); |
|
|
|
// delete in recover_batch
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
|
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// write into RocoverDB
|
|
|
|
leveldb::WriteOptions write_options; |
|
|
|
s = recover_db_->Write(write_options, &recover_batch); |
|
|
|
if(!s.ok()){ |
|
|
|
return s; |
|
|
|
} |
|
|
|
|
|
|
|
// write into indexDB
|
|
|
|
Status status = index_db_->Write(write_options, &index_batch); |
|
|
|
if(!status.ok()){ |
|
|
|
return status; |
|
|
|
} |
|
|
|
|
|
|
|
// write into dataDB
|
|
|
|
Status data_status = data_db_->Write(write_options, &data_batch); |
|
|
|
if (!data_status.ok()) { |
|
|
|
return data_status; // 主数据写入失败,直接返回
|
|
|
|
} |
|
|
|
|
|
|
|
//delete TinyOps of this UserOp in RecoverDB
|
|
|
|
leveldb::WriteBatch batch; |
|
|
|
std::string prefix; |
|
|
|
PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); |
|
|
|
leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); |
|
|
|
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ |
|
|
|
Slice Recover_key = iter->key(); |
|
|
|
batch.Delete(Recover_key); |
|
|
|
} |
|
|
|
delete iter; |
|
|
|
recover_db_->Write(leveldb::WriteOptions(), &batch); |
|
|
|
|
|
|
|
// row lock
|
|
|
|
lock.lock(); |
|
|
|
putting_keys.erase(key.ToString()); |
|
|
|
lock.unlock(); |
|
|
|
|
|
|
|
ready = true; |
|
|
|
cv.notify_all(); |
|
|
|
// row lock
|
|
|
|
|
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
std::pair<FieldArray, FieldArray> NewDB::UpdateIndex(const WriteOptions& options, const Slice& key, const FieldArray& fields, const FieldArray& current_fields){ |
|
|
|
// 构建current_fields的映射
|
|
|
|
std::unordered_map<std::string, std::string> current_map; |
|
|
|
for (const auto& field : current_fields) { |
|
|
|
current_map[field.first] = field.second; |
|
|
|
} |
|
|
|
|
|
|
|
// 构建fields的映射
|
|
|
|
std::unordered_map<std::string, std::string> fields_map; |
|
|
|
for (const auto& field : fields) { |
|
|
|
fields_map[field.first] = field.second; |
|
|
|
} |
|
|
|
|
|
|
|
// 删除的字段
|
|
|
|
FieldArray deleted_fields; |
|
|
|
// 新增的字段
|
|
|
|
FieldArray insert_fields; |
|
|
|
|
|
|
|
// fields中的字段
|
|
|
|
for (const auto& field : fields) { |
|
|
|
// fields中存在但current_fields中不存在
|
|
|
|
if (current_map.find(field.first) == current_map.end()) { |
|
|
|
insert_fields.push_back(field); |
|
|
|
} else if (current_map[field.first] != field.second) { |
|
|
|
// current_fields中存在但是值不同
|
|
|
|
insert_fields.push_back(field); |
|
|
|
deleted_fields.push_back(std::make_pair(field.first, current_map[field.first])); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// current_fields中的字段
|
|
|
|
for (const auto& field : current_fields) { |
|
|
|
if (fields_map.find(field.first) == fields_map.end()) { |
|
|
|
deleted_fields.push_back(field); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return std::make_pair(insert_fields, deleted_fields); |
|
|
|
} |
|
|
|
|
|
|
|
Status NewDB::Get_fields(const ReadOptions& options, const Slice& key, FieldArray* fields){ |
|
|
|
//get value(fields) with key
|
|
|
|
// 从 DataDB 获取数据
|
|
|
|
Status s = data_db_->Get_fields(options, key, fields); |
|
|
|
if (!s.ok()) { |
|
|
|
return s; // 如果获取失败,返回状态
|
|
|
|
} |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
std::vector<std::string> NewDB::FindKeysByField(Field &field){ |
|
|
|
//get keys with field
|
|
|
|
std::vector<std::string> matching_keys; |
|
|
|
leveldb::DB* db = data_db_.get(); |
|
|
|
matching_keys = data_db_->FindKeysByField(db, field); |
|
|
|
return matching_keys; |
|
|
|
} |
|
|
|
|
|
|
|
bool NewDB::Delete(const WriteOptions& options, const Slice& key){ |
|
|
|
std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); |
|
|
|
|
|
|
|
leveldb::WriteBatch data_batch; |
|
|
|
leveldb::WriteBatch index_batch; |
|
|
|
leveldb::WriteBatch recover_batch; |
|
|
|
// uint64_t TinyOpID = 0;
|
|
|
|
|
|
|
|
FieldArray fields; |
|
|
|
Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields); |
|
|
|
|
|
|
|
if(s.ok()){ |
|
|
|
// delete in datadb
|
|
|
|
data_batch.Delete(key.ToString()); |
|
|
|
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("DELETE", key.ToString(), ""); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
|
|
|
|
// delete in indexdb
|
|
|
|
for (const auto& field : fields) { |
|
|
|
// 如果字段名在 indexed_fields_ 中,才插入二级索引
|
|
|
|
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { |
|
|
|
// 构造索引的key,结构:FieldName_FieldValue_Key
|
|
|
|
std::string index_key = ConstructIndexKey(key, field); |
|
|
|
index_batch.Delete(index_key); |
|
|
|
// prepare recover_batch
|
|
|
|
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); |
|
|
|
std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); |
|
|
|
recover_batch.Put(Recover_key, Recover_value); |
|
|
|
|
|
|
|
TinyOpID = TinyOpID + 1; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// write recoverDB
|
|
|
|
Status s = recover_db_->Write(leveldb::WriteOptions(), &recover_batch); |
|
|
|
if(!s.ok()){ |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
// write dataDB
|
|
|
|
s = data_db_->Write(leveldb::WriteOptions(), &data_batch); |
|
|
|
if(!s.ok()){ |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
// write indexDB
|
|
|
|
s = index_db_->Write(leveldb::WriteOptions(), &index_batch); |
|
|
|
if(!s.ok()){ |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
//delete TinyOps of this UserOp in RecoverDB
|
|
|
|
leveldb::WriteBatch batch; |
|
|
|
std::string prefix; |
|
|
|
PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); |
|
|
|
leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); |
|
|
|
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ |
|
|
|
Slice Recover_key = iter->key(); |
|
|
|
batch.Delete(Recover_key); |
|
|
|
} |
|
|
|
delete iter; |
|
|
|
recover_db_->Write(leveldb::WriteOptions(), &batch); |
|
|
|
} |
|
|
|
else{ |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
bool NewDB::CreateIndexOnField(const std::string& field_name) { |
|
|
|
// 将索引字段元数据持久化到indexDB
|
|
|
|
index_db_->Put(WriteOptions(), "index_field_write:" + field_name, "1"); |
|
|
|
std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock); |
|
|
|
lock.lock(); |
|
|
|
indexed_fields_write.insert(field_name); |
|
|
|
lock.unlock(); |
|
|
|
|
|
|
|
const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录
|
|
|
|
size_t batch_count = 0; |
|
|
|
|
|
|
|
// 遍历 data_db_ 中的所有键值对
|
|
|
|
leveldb::ReadOptions read_options; |
|
|
|
leveldb::WriteBatch batch; |
|
|
|
std::unique_ptr<leveldb::Iterator> it(data_db_->NewIterator(read_options)); |
|
|
|
|
|
|
|
for (it->SeekToFirst(); it->Valid(); it->Next()) { |
|
|
|
// 获取主数据库的键值对
|
|
|
|
std::string key = it->key().ToString(); |
|
|
|
std::string value = it->value().ToString(); |
|
|
|
|
|
|
|
// 解析值,提取字段数组
|
|
|
|
FieldArray fields = ParseValue(value); |
|
|
|
|
|
|
|
// 查找指定字段
|
|
|
|
for (const auto& field : fields) { |
|
|
|
if (field.first == field_name) { |
|
|
|
// 构造索引键
|
|
|
|
std::string index_key = ConstructIndexKey(key, field); |
|
|
|
|
|
|
|
// 插入到索引数据库
|
|
|
|
batch.Put(index_key, Slice()); |
|
|
|
|
|
|
|
batch_count++; |
|
|
|
// 如果达到批次大小限制,执行写入
|
|
|
|
if (batch_count >= BATCH_SIZE_LIMIT) { |
|
|
|
leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &batch); |
|
|
|
if (!status.ok()) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
batch.Clear(); // 清空批次,准备下一个批次
|
|
|
|
batch_count = 0; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// 如果迭代器出错
|
|
|
|
if (!it->status().ok()) { |
|
|
|
// 从头开始创建,覆盖掉原本失效的东西
|
|
|
|
return false; |
|
|
|
} |
|
|
|
// 批量写入
|
|
|
|
leveldb::WriteOptions write_options; |
|
|
|
Status status = index_db_->Write(write_options, &batch); |
|
|
|
if (!status.ok()) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
// 将索引字段元数据持久化到indexDB
|
|
|
|
index_db_->Put(WriteOptions(), "index_field_read:" + field_name, "1"); |
|
|
|
lock.lock(); |
|
|
|
indexed_fields_read.insert(field_name); |
|
|
|
lock.unlock(); |
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
std::vector<std::string> NewDB::QueryByIndex(Field &field){ |
|
|
|
std::vector<std::string> matching_keys; |
|
|
|
|
|
|
|
if(indexed_fields_read.find(field.first) != indexed_fields_read.end()){ |
|
|
|
std::string prefix = ConstructIndexKey(Slice(), field); //前缀-字段名+字段值
|
|
|
|
leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions()); |
|
|
|
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ |
|
|
|
Slice index_key = iter->key(); |
|
|
|
std::string data_key = ExtractIndexKey(index_key); //解析索引键中的key
|
|
|
|
matching_keys.push_back(data_key); |
|
|
|
} |
|
|
|
delete iter; |
|
|
|
|
|
|
|
// check these matching_keys and remove keys not in datadb
|
|
|
|
for (auto& key:matching_keys) { |
|
|
|
FieldArray fields; |
|
|
|
Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields); |
|
|
|
|
|
|
|
// if field not in this k-v?
|
|
|
|
int tag = 0; |
|
|
|
if(s.ok()){ |
|
|
|
for(auto& each_field : fields){ |
|
|
|
if(each_field.first == field.first){ |
|
|
|
if(each_field.second == field.second){ |
|
|
|
tag = 1; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
if(tag == 0){ |
|
|
|
matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end()); |
|
|
|
} |
|
|
|
// if(!s.ok()){
|
|
|
|
// matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end());
|
|
|
|
// }
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
return matching_keys; |
|
|
|
} |
|
|
|
|
|
|
|
bool NewDB::DeleteIndex(const std::string& field_name){ |
|
|
|
// 将索引字段元数据持久化到indexDB
|
|
|
|
index_db_->Delete(WriteOptions(), "index_field_read:" + field_name); |
|
|
|
index_db_->Delete(WriteOptions(), "index_field_write:" + field_name); |
|
|
|
std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock); |
|
|
|
lock.lock(); |
|
|
|
indexed_fields_read.erase(field_name); |
|
|
|
indexed_fields_write.erase(field_name); |
|
|
|
lock.unlock(); |
|
|
|
|
|
|
|
const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录
|
|
|
|
size_t batch_count = 0; |
|
|
|
|
|
|
|
leveldb::WriteBatch delete_batch; |
|
|
|
|
|
|
|
std::string prefix = field_name; |
|
|
|
leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions()); |
|
|
|
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ |
|
|
|
Slice index_key = iter->key(); |
|
|
|
delete_batch.Delete(index_key); |
|
|
|
batch_count++; |
|
|
|
if (batch_count >= BATCH_SIZE_LIMIT) { |
|
|
|
leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &delete_batch); |
|
|
|
if (!status.ok()) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
delete_batch.Clear(); // 清空批次,准备下一个批次
|
|
|
|
batch_count = 0; |
|
|
|
} |
|
|
|
} |
|
|
|
delete iter; |
|
|
|
|
|
|
|
Status s = index_db_->Write(leveldb::WriteOptions(), &delete_batch); |
|
|
|
if(!s.ok()){ |
|
|
|
return false; |
|
|
|
} |
|
|
|
|
|
|
|
return true; |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|