Kaynağa Gözat

二级索引-一致性

master
陈予曈 5 ay önce
ebeveyn
işleme
43fad69972
1 değiştirilmiş dosya ile 190 ekleme ve 0 silme
  1. +190
    -0
      db/NewDB.cc

+ 190
- 0
db/NewDB.cc Dosyayı Görüntüle

@ -19,6 +19,12 @@ NewDB::~NewDB() {
}
// row lock
std::condition_variable cv;
std::mutex m;
bool ready = false;
// row lock
std::string NewDB::SerializeValue(const FieldArray& fields){
std::string value_str;
for(const auto& pair : fields){
@ -104,6 +110,53 @@ std::string NewDB::ExtractIndexKey(const Slice& key){
return fullKey.substr(pos1 + 1); // 提取 'Key' 部分
}
std::string NewDB::ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname){
std::string s;
PutLengthPrefixedSlice(&s, Slice(UserOpID));
PutLengthPrefixedSlice(&s, Slice(TinyOpID));
PutLengthPrefixedSlice(&s, Slice(DBname));
return s;
}
std::string NewDB::ConstructRecoverValue(std::string TinyOp, std::string key, std::string value){
std::string s;
PutLengthPrefixedSlice(&s, Slice(TinyOp));
PutLengthPrefixedSlice(&s, Slice(key));
PutLengthPrefixedSlice(&s, Slice(value));
return s;
}
std::string NewDB::ExtractRecoverKey(std::string s){
Slice input(s);
Slice v;
GetLengthPrefixedSlice(&input, &v);
GetLengthPrefixedSlice(&input, &v);
GetLengthPrefixedSlice(&input, &v);
return v.ToString();
}
std::pair<std::string, std::string> NewDB::ExtractRecoverValue(std::string s, std::string* TinyOp){
Slice input(s);
Slice v;
GetLengthPrefixedSlice(&input, &v);
*TinyOp = v.ToString();
GetLengthPrefixedSlice(&input, &v);
std::string key = v.ToString();
GetLengthPrefixedSlice(&input, &v);
std::string value = v.ToString();
return std::make_pair(key, value);
}
std::string NewDB::ConstructUserOpID(std::thread::id thread_id){
auto now = std::chrono::system_clock::now();
std::time_t now_t = std::chrono::system_clock::to_time_t(now);
std::ostringstream oss;
oss << thread_id;
std::string UserOpID = std::to_string(now_t) + oss.str();
return UserOpID;
}
Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbptr) {
// 打开底层的数据库
DB* dataDB;
@ -133,6 +186,35 @@ Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbpt
(*dbptr)->TinyOpID = 0;
// recover dataDB&indexDB using recoverDB
leveldb::Iterator* iter = recoverDB->NewIterator(leveldb::ReadOptions());
Slice recover_key;
for(iter->SeekToFirst(); iter->Valid(); iter->Next()){
recover_key = iter->key();
std::string DBname = (*dbptr)->ExtractRecoverKey(recover_key.ToString());
std::string TinyOp;
std::pair<std::string, std::string> k_v = (*dbptr)->ExtractRecoverValue(iter->value().ToString(), &TinyOp);
if(DBname == "dataDB"){
if(TinyOp == "PUT"){
dataDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second);
}
else{
dataDB->Delete(leveldb::WriteOptions(), k_v.first);
}
}
else{
if(TinyOp == "PUT"){
indexDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second);
}
else{
indexDB->Delete(leveldb::WriteOptions(), k_v.first);
}
}
recoverDB->Delete(leveldb::WriteOptions(), recover_key);
}
// std::cout << "recover " << i << " rows, end key is " << keystr << std::endl;
delete iter;
// 重新构造indexed_fields
Iterator* it = (*dbptr)->index_db_->NewIterator(ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
@ -153,11 +235,28 @@ Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbpt
Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const FieldArray& fields){
std::string UserOpID = ConstructUserOpID(std::this_thread::get_id());
// row lock
std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock);
std::unique_lock<std::mutex> row_lock(m, std::defer_lock);
ready = (putting_keys.find(key.ToString()) == putting_keys.end()); // no another thread putting the same key
if (!ready) {
cv.wait(row_lock, []{return ready;});
}
lock.lock();
putting_keys.insert(key.ToString());
lock.unlock();
// row lock
FieldArray current_fields;
Status s = Get_fields(leveldb::ReadOptions(), key, &current_fields);
leveldb::WriteBatch data_batch;
leveldb::WriteBatch index_batch;
leveldb::WriteBatch recover_batch;
// uint64_t TinyOpID = 0;
@ -168,10 +267,23 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi
// 构造索引的key,结构:FieldName_FieldValue_Key
std::string index_key = ConstructIndexKey(key, field);
index_batch.Put(index_key, Slice());
// prepare recover_batch
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB");
std::string Recover_value = ConstructRecoverValue("PUT", index_key, "");
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
}
}
std::string value = SerializeValue(fields);
data_batch.Put(key.ToString(), value);
// put dataDB's k-v into recover_batch
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB");
std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value);
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
}
else{
@ -183,6 +295,12 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi
// put into index_batch
std::string index_key = ConstructIndexKey(key, field);
index_batch.Put(index_key, Slice());
// put into recover_batch
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB");
std::string Recover_value = ConstructRecoverValue("PUT", index_key, "");
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
}
}
@ -190,16 +308,35 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi
std::string value = SerializeValue(fields);
data_batch.Put(key.ToString(), value);
// put dataDB's k-v into recover_batch
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB");
std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value);
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
// delete ops in indexdb
for (const auto& field : fieldarray_pair.second) {
if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) {
// delete in index_batch
std::string index_key = ConstructIndexKey(key, field);
index_batch.Delete(index_key);
// delete in recover_batch
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB");
std::string Recover_value = ConstructRecoverValue("DELETE", index_key, "");
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
}
}
}
// write into RocoverDB
leveldb::WriteOptions write_options;
s = recover_db_->Write(write_options, &recover_batch);
if(!s.ok()){
return s;
}
// write into indexDB
Status status = index_db_->Write(write_options, &index_batch);
if(!status.ok()){
@ -212,6 +349,27 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi
return data_status; // 主数据写入失败,直接返回
}
//delete TinyOps of this UserOp in RecoverDB
leveldb::WriteBatch batch;
std::string prefix;
PutLengthPrefixedSlice(&prefix, Slice(UserOpID));
leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions());
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){
Slice Recover_key = iter->key();
batch.Delete(Recover_key);
}
delete iter;
recover_db_->Write(leveldb::WriteOptions(), &batch);
// row lock
lock.lock();
putting_keys.erase(key.ToString());
lock.unlock();
ready = true;
cv.notify_all();
// row lock
return Status::OK();
}
@ -274,8 +432,11 @@ std::vector NewDB::FindKeysByField(Field &field){
}
bool NewDB::Delete(const WriteOptions& options, const Slice& key){
std::string UserOpID = ConstructUserOpID(std::this_thread::get_id());
leveldb::WriteBatch data_batch;
leveldb::WriteBatch index_batch;
leveldb::WriteBatch recover_batch;
// uint64_t TinyOpID = 0;
FieldArray fields;
@ -285,6 +446,11 @@ bool NewDB::Delete(const WriteOptions& options, const Slice& key){
// delete in datadb
data_batch.Delete(key.ToString());
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB");
std::string Recover_value = ConstructRecoverValue("DELETE", key.ToString(), "");
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
// delete in indexdb
for (const auto& field : fields) {
// 如果字段名在 indexed_fields_ 中,才插入二级索引
@ -292,9 +458,21 @@ bool NewDB::Delete(const WriteOptions& options, const Slice& key){
// 构造索引的key,结构:FieldName_FieldValue_Key
std::string index_key = ConstructIndexKey(key, field);
index_batch.Delete(index_key);
// prepare recover_batch
std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB");
std::string Recover_value = ConstructRecoverValue("DELETE", index_key, "");
recover_batch.Put(Recover_key, Recover_value);
TinyOpID = TinyOpID + 1;
}
}
// write recoverDB
Status s = recover_db_->Write(leveldb::WriteOptions(), &recover_batch);
if(!s.ok()){
return false;
}
// write dataDB
s = data_db_->Write(leveldb::WriteOptions(), &data_batch);
if(!s.ok()){
@ -306,6 +484,18 @@ bool NewDB::Delete(const WriteOptions& options, const Slice& key){
if(!s.ok()){
return false;
}
//delete TinyOps of this UserOp in RecoverDB
leveldb::WriteBatch batch;
std::string prefix;
PutLengthPrefixedSlice(&prefix, Slice(UserOpID));
leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions());
for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){
Slice Recover_key = iter->key();
batch.Delete(Recover_key);
}
delete iter;
recover_db_->Write(leveldb::WriteOptions(), &batch);
}
else{
return false;

||||||
x
 
000:0
Yükleniyor…
İptal
Kaydet