diff --git a/db/NewDB.cc b/db/NewDB.cc index 0f10928..8a81616 100644 --- a/db/NewDB.cc +++ b/db/NewDB.cc @@ -19,6 +19,12 @@ NewDB::~NewDB() { } +// row lock +std::condition_variable cv; +std::mutex m; +bool ready = false; +// row lock + std::string NewDB::SerializeValue(const FieldArray& fields){ std::string value_str; for(const auto& pair : fields){ @@ -104,6 +110,53 @@ std::string NewDB::ExtractIndexKey(const Slice& key){ return fullKey.substr(pos1 + 1); // 提取 'Key' 部分 } +std::string NewDB::ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname){ + std::string s; + PutLengthPrefixedSlice(&s, Slice(UserOpID)); + PutLengthPrefixedSlice(&s, Slice(TinyOpID)); + PutLengthPrefixedSlice(&s, Slice(DBname)); + return s; +} + +std::string NewDB::ConstructRecoverValue(std::string TinyOp, std::string key, std::string value){ + std::string s; + PutLengthPrefixedSlice(&s, Slice(TinyOp)); + PutLengthPrefixedSlice(&s, Slice(key)); + PutLengthPrefixedSlice(&s, Slice(value)); + return s; +} + +std::string NewDB::ExtractRecoverKey(std::string s){ + Slice input(s); + Slice v; + GetLengthPrefixedSlice(&input, &v); + GetLengthPrefixedSlice(&input, &v); + GetLengthPrefixedSlice(&input, &v); + return v.ToString(); +} + +std::pair<std::string, std::string> NewDB::ExtractRecoverValue(std::string s, std::string* TinyOp){ + Slice input(s); + Slice v; + GetLengthPrefixedSlice(&input, &v); + *TinyOp = v.ToString(); + GetLengthPrefixedSlice(&input, &v); + std::string key = v.ToString(); + GetLengthPrefixedSlice(&input, &v); + std::string value = v.ToString(); + return std::make_pair(key, value); +} + +std::string NewDB::ConstructUserOpID(std::thread::id thread_id){ + auto now = std::chrono::system_clock::now(); + std::time_t now_t = std::chrono::system_clock::to_time_t(now); + + std::ostringstream oss; + oss << thread_id; + std::string UserOpID = std::to_string(now_t) + oss.str(); + return UserOpID; +} + Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbptr) { // 打开底层的数据库 DB* dataDB; @@ -133,6 +186,35 @@ Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbpt (*dbptr)->TinyOpID = 0; + // recover dataDB&indexDB using recoverDB + leveldb::Iterator* iter = recoverDB->NewIterator(leveldb::ReadOptions()); + Slice recover_key; + for(iter->SeekToFirst(); iter->Valid(); iter->Next()){ + recover_key = iter->key(); + std::string DBname = (*dbptr)->ExtractRecoverKey(recover_key.ToString()); + std::string TinyOp; + std::pair<std::string, std::string> k_v = (*dbptr)->ExtractRecoverValue(iter->value().ToString(), &TinyOp); + if(DBname == "dataDB"){ + if(TinyOp == "PUT"){ + dataDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); + } + else{ + dataDB->Delete(leveldb::WriteOptions(), k_v.first); + } + } + else{ + if(TinyOp == "PUT"){ + indexDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); + } + else{ + indexDB->Delete(leveldb::WriteOptions(), k_v.first); + } + } + recoverDB->Delete(leveldb::WriteOptions(), recover_key); + } + // std::cout << "recover " << i << " rows, end key is " << keystr << std::endl; + delete iter; + // 重新构造indexed_fields Iterator* it = (*dbptr)->index_db_->NewIterator(ReadOptions()); for (it->SeekToFirst(); it->Valid(); it->Next()) { @@ -153,11 +235,28 @@ Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbpt Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const FieldArray& fields){ + std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); + + // row lock + std::unique_lock<std::mutex> lock(db_mutex_, std::defer_lock); + std::unique_lock<std::mutex> row_lock(m, std::defer_lock); + ready = (putting_keys.find(key.ToString()) == putting_keys.end()); // no another thread putting the same key + + if (!ready) { + cv.wait(row_lock, []{return ready;}); + } + + lock.lock(); + putting_keys.insert(key.ToString()); + lock.unlock(); + // row lock + FieldArray current_fields; Status s = Get_fields(leveldb::ReadOptions(), key, ¤t_fields); leveldb::WriteBatch data_batch; leveldb::WriteBatch index_batch; + leveldb::WriteBatch recover_batch; // uint64_t TinyOpID = 0; @@ -168,10 +267,23 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi // 构造索引的key,结构:FieldName_FieldValue_Key std::string index_key = ConstructIndexKey(key, field); index_batch.Put(index_key, Slice()); + // prepare recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; } } std::string value = SerializeValue(fields); data_batch.Put(key.ToString(), value); + + // put dataDB's k-v into recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); + std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; } else{ @@ -183,6 +295,12 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi // put into index_batch std::string index_key = ConstructIndexKey(key, field); index_batch.Put(index_key, Slice()); + // put into recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; } } @@ -190,16 +308,35 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi std::string value = SerializeValue(fields); data_batch.Put(key.ToString(), value); + // put dataDB's k-v into recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); + std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); + recover_batch.Put(Recover_key, Recover_value); + TinyOpID = TinyOpID + 1; + // delete ops in indexdb for (const auto& field : fieldarray_pair.second) { if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { // delete in index_batch std::string index_key = ConstructIndexKey(key, field); index_batch.Delete(index_key); + // delete in recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; } } } + // write into RocoverDB + leveldb::WriteOptions write_options; + s = recover_db_->Write(write_options, &recover_batch); + if(!s.ok()){ + return s; + } + // write into indexDB Status status = index_db_->Write(write_options, &index_batch); if(!status.ok()){ @@ -212,6 +349,27 @@ Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const Fi return data_status; // 主数据写入失败,直接返回 } + //delete TinyOps of this UserOp in RecoverDB + leveldb::WriteBatch batch; + std::string prefix; + PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); + leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); + for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ + Slice Recover_key = iter->key(); + batch.Delete(Recover_key); + } + delete iter; + recover_db_->Write(leveldb::WriteOptions(), &batch); + + // row lock + lock.lock(); + putting_keys.erase(key.ToString()); + lock.unlock(); + + ready = true; + cv.notify_all(); + // row lock + return Status::OK(); } @@ -274,8 +432,11 @@ std::vector<std::string> NewDB::FindKeysByField(Field &field){ } bool NewDB::Delete(const WriteOptions& options, const Slice& key){ + std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); + leveldb::WriteBatch data_batch; leveldb::WriteBatch index_batch; + leveldb::WriteBatch recover_batch; // uint64_t TinyOpID = 0; FieldArray fields; @@ -285,6 +446,11 @@ bool NewDB::Delete(const WriteOptions& options, const Slice& key){ // delete in datadb data_batch.Delete(key.ToString()); + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); + std::string Recover_value = ConstructRecoverValue("DELETE", key.ToString(), ""); + recover_batch.Put(Recover_key, Recover_value); + TinyOpID = TinyOpID + 1; + // delete in indexdb for (const auto& field : fields) { // 如果字段名在 indexed_fields_ 中,才插入二级索引 @@ -292,9 +458,21 @@ bool NewDB::Delete(const WriteOptions& options, const Slice& key){ // 构造索引的key,结构:FieldName_FieldValue_Key std::string index_key = ConstructIndexKey(key, field); index_batch.Delete(index_key); + // prepare recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; } } + // write recoverDB + Status s = recover_db_->Write(leveldb::WriteOptions(), &recover_batch); + if(!s.ok()){ + return false; + } + // write dataDB s = data_db_->Write(leveldb::WriteOptions(), &data_batch); if(!s.ok()){ @@ -306,6 +484,18 @@ bool NewDB::Delete(const WriteOptions& options, const Slice& key){ if(!s.ok()){ return false; } + + //delete TinyOps of this UserOp in RecoverDB + leveldb::WriteBatch batch; + std::string prefix; + PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); + leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); + for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ + Slice Recover_key = iter->key(); + batch.Delete(Recover_key); + } + delete iter; + recover_db_->Write(leveldb::WriteOptions(), &batch); } else{ return false;