#include "db/NewDB.h"

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

#include "db/db_impl.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"

namespace leveldb {

namespace {

// Keys under which index metadata is persisted in the index DB. A field is in
// the "write" set while writers must maintain its index entries, and is added
// to the "read" set only after CreateIndexOnField has finished back-filling.
const std::string kIndexFieldReadPrefix = "index_field_read:";
const std::string kIndexFieldWritePrefix = "index_field_write:";

// Invokes a callable when the enclosing scope is left, on every return path.
template <typename F>
class ScopeExit {
 public:
  explicit ScopeExit(F fn) : fn_(fn) {}
  ~ScopeExit() { fn_(); }
  ScopeExit(const ScopeExit&) = delete;
  ScopeExit& operator=(const ScopeExit&) = delete;

 private:
  F fn_;
};

// Re-applies one recover-log operation to `db`: "PUT" writes kv.first ->
// kv.second, any other op deletes kv.first.
Status ApplyRecoveredOp(DB* db, const std::string& tiny_op,
                        const std::pair<std::string, std::string>& kv) {
  if (tiny_op == "PUT") {
    return db->Put(WriteOptions(), kv.first, kv.second);
  }
  return db->Delete(WriteOptions(), kv.first);
}

// Deletes every recover-log record written by one user operation, i.e. every
// key that starts with the length-prefixed user_op_id. Best-effort: a failed
// cleanup only means the (idempotent) records are replayed at the next Open().
void DeleteRecoverRecords(DB* recover_db, const std::string& user_op_id) {
  std::string prefix;
  PutLengthPrefixedSlice(&prefix, Slice(user_op_id));
  WriteBatch batch;
  std::unique_ptr<Iterator> iter(recover_db->NewIterator(ReadOptions()));
  for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
       iter->Next()) {
    batch.Delete(iter->key());
  }
  recover_db->Write(WriteOptions(), &batch);
}

}  // namespace

NewDB::~NewDB() {}

// Row lock shared by all NewDB instances: `m` guards putting_keys (the keys
// currently being written by Put_fields) and the TinyOpID counter; `cv` wakes
// writers waiting for a key to be released.
std::condition_variable cv;
std::mutex m;

// Serializes fields as a sequence of records, each a 4-byte little-endian
// length followed by "name:value".
std::string NewDB::SerializeValue(const FieldArray& fields) {
  std::string value_str;
  for (const auto& pair : fields) {
    const std::string field = pair.first + ":" + pair.second;
    PutFixed32(&value_str, static_cast<uint32_t>(field.size()));
    value_str.append(field);
  }
  return value_str;
}

// Inverse of SerializeValue. The name ends at the first ':' (values may contain
// ':'); a record without ':' yields that text as the name with an empty value.
// Parsing stops at the first truncated record.
FieldArray NewDB::ParseValue(const std::string& value_str) {
  FieldArray fields;
  const char* data = value_str.data();
  size_t remaining = value_str.size();
  while (remaining >= 4) {
    const uint32_t field_size = DecodeFixed32(data);
    // Compare as size_t without computing 4 + field_size, which could wrap
    // in 32-bit arithmetic and bypass the bounds check on corrupt input.
    if (remaining - 4 < field_size) {
      break;
    }
    const std::string field(data + 4, field_size);
    const size_t colon_pos = field.find(':');
    if (colon_pos == std::string::npos) {
      fields.push_back(std::make_pair(field, std::string()));
    } else {
      fields.push_back(std::make_pair(field.substr(0, colon_pos),
                                      field.substr(colon_pos + 1)));
    }
    data += 4 + static_cast<size_t>(field_size);
    remaining -= 4 + static_cast<size_t>(field_size);
  }
  return fields;
}

// Builds an index-DB key of the form "FieldName:FieldValue_Key". With an empty
// key this is the scan prefix for all keys having that field value.
std::string NewDB::ConstructIndexKey(const Slice& key, const Field& field) {
  return field.first + ":" + field.second + "_" + key.ToString();
}

// Extracts the data-DB key from an index key: everything after the first '_'.
// NOTE: ambiguous when the field name or value itself contains '_'; callers
// that know the field (e.g. QueryByIndex) should strip the known prefix instead.
std::string NewDB::ExtractIndexKey(const Slice& key) {
  const std::string full_key(key.data(), key.size());
  const size_t pos = full_key.find('_');
  return full_key.substr(pos + 1);  // npos + 1 == 0 -> whole key
}

// Recover-log key: length-prefixed UserOpID, TinyOpID and target DB name.
std::string NewDB::ConstructRecoverKey(std::string UserOpID,
                                       std::string TinyOpID,
                                       std::string DBname) {
  std::string s;
  PutLengthPrefixedSlice(&s, Slice(UserOpID));
  PutLengthPrefixedSlice(&s, Slice(TinyOpID));
  PutLengthPrefixedSlice(&s, Slice(DBname));
  return s;
}

// Recover-log value: length-prefixed operation ("PUT"/"DELETE"), key and value.
std::string NewDB::ConstructRecoverValue(std::string TinyOp, std::string key,
                                         std::string value) {
  std::string s;
  PutLengthPrefixedSlice(&s, Slice(TinyOp));
  PutLengthPrefixedSlice(&s, Slice(key));
  PutLengthPrefixedSlice(&s, Slice(value));
  return s;
}

// Returns the DB name (third component) of a recover-log key.
std::string NewDB::ExtractRecoverKey(std::string s) {
  Slice input(s);
  Slice v;
  GetLengthPrefixedSlice(&input, &v);  // UserOpID
  GetLengthPrefixedSlice(&input, &v);  // TinyOpID
  GetLengthPrefixedSlice(&input, &v);  // DBname
  return v.ToString();
}

// Decodes a recover-log value: stores the operation in *TinyOp and returns
// the (key, value) pair.
std::pair<std::string, std::string> NewDB::ExtractRecoverValue(
    std::string s, std::string* TinyOp) {
  Slice input(s);
  Slice v;
  GetLengthPrefixedSlice(&input, &v);
  *TinyOp = v.ToString();
  GetLengthPrefixedSlice(&input, &v);
  std::string key = v.ToString();
  GetLengthPrefixedSlice(&input, &v);
  std::string value = v.ToString();
  return std::make_pair(key, value);
}

// Builds an id for one user operation from the current time and thread id.
// Uses the clock's full resolution (not whole seconds) so back-to-back
// operations on one thread get distinct ids; otherwise one op's cleanup could
// delete another op's still-pending recover records.
std::string NewDB::ConstructUserOpID(std::thread::id thread_id) {
  std::ostringstream oss;
  oss << std::chrono::system_clock::now().time_since_epoch().count()
      << thread_id;
  return oss.str();
}

// Opens <name>_data, <name>_index and <name>_recover, replays any operations
// left in the recover log by an interrupted write, and rebuilds the in-memory
// sets of indexed fields from the metadata stored in the index DB.
Status NewDB::Open(const Options& options, const std::string& name,
                   NewDB** dbptr) {
  // unique_ptr owners close already-opened DBs if a later open fails.
  DB* raw = nullptr;
  Status s = DB::Open(options, name + "_data", &raw);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<DB> data_db(raw);

  raw = nullptr;
  s = DB::Open(options, name + "_index", &raw);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<DB> index_db(raw);

  raw = nullptr;
  s = DB::Open(options, name + "_recover", &raw);
  if (!s.ok()) {
    return s;
  }
  std::unique_ptr<DB> recover_db(raw);

  NewDB* db = new NewDB();
  db->data_db_.reset(data_db.release());
  db->index_db_.reset(index_db.release());
  db->recover_db_.reset(recover_db.release());
  db->TinyOpID = 0;
  *dbptr = db;

  // Replay the recover log. A record is removed only once it has been applied
  // successfully, so a failed replay is retried at the next Open().
  {
    DB* recover = db->recover_db_.get();
    std::unique_ptr<Iterator> iter(recover->NewIterator(ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      const std::string db_name = db->ExtractRecoverKey(iter->key().ToString());
      std::string tiny_op;
      const std::pair<std::string, std::string> kv =
          db->ExtractRecoverValue(iter->value().ToString(), &tiny_op);
      DB* target =
          (db_name == "dataDB") ? db->data_db_.get() : db->index_db_.get();
      if (ApplyRecoveredOp(target, tiny_op, kv).ok()) {
        recover->Delete(WriteOptions(), iter->key());
      }
    }
  }

  // Rebuild indexed_fields_read / indexed_fields_write. Both metadata
  // prefixes share "index_field_", so only that key range is scanned.
  {
    const std::string metadata_prefix = "index_field_";
    std::unique_ptr<Iterator> it(db->index_db_->NewIterator(ReadOptions()));
    for (it->Seek(metadata_prefix);
         it->Valid() && it->key().starts_with(metadata_prefix); it->Next()) {
      const std::string k = it->key().ToString();
      if (it->key().starts_with(kIndexFieldReadPrefix)) {
        db->indexed_fields_read.insert(k.substr(kIndexFieldReadPrefix.size()));
      }
      if (it->key().starts_with(kIndexFieldWritePrefix)) {
        db->indexed_fields_write.insert(
            k.substr(kIndexFieldWritePrefix.size()));
      }
    }
  }
  return Status::OK();
}

// Writes `fields` as the value of `key` and maintains index entries for every
// field in indexed_fields_write. All tiny operations are first logged to the
// recover DB, then applied to the index DB and the data DB, and finally the
// log records are removed. Concurrent Put_fields calls on the same key are
// serialized by the row lock.
Status NewDB::Put_fields(const WriteOptions& options, const Slice& key,
                         const FieldArray& fields) {
  const std::string user_op_id = ConstructUserOpID(std::this_thread::get_id());
  const std::string key_str = key.ToString();

  // Row lock: wait (with `m` held while checking) until no other writer owns
  // this key, then claim it atomically.
  {
    std::unique_lock<std::mutex> row_lock(m);
    cv.wait(row_lock, [&]() -> bool {
      return putting_keys.find(key_str) == putting_keys.end();
    });
    putting_keys.insert(key_str);
  }
  // Release the claim on every exit path, including early error returns;
  // otherwise later writers of this key would block forever.
  auto release_row = [&]() {
    {
      std::lock_guard<std::mutex> guard(m);
      putting_keys.erase(key_str);
    }
    cv.notify_all();
  };
  ScopeExit<decltype(release_row)> row_guard(release_row);

  // TinyOpID is shared by all writers; hand out ids under `m`.
  auto next_tiny_op_id = [this]() -> std::string {
    std::lock_guard<std::mutex> guard(m);
    return std::to_string(TinyOpID++);
  };
  // indexed_fields_write is modified under db_mutex_ by Create/DeleteIndex.
  auto is_indexed = [this](const std::string& field_name) -> bool {
    std::lock_guard<std::mutex> guard(db_mutex_);
    return indexed_fields_write.find(field_name) != indexed_fields_write.end();
  };

  WriteBatch data_batch;
  WriteBatch index_batch;
  WriteBatch recover_batch;
  // Records one tiny operation in the recover batch so Open() can replay it
  // if the process dies between the writes below.
  auto log_tiny_op = [&](const std::string& db_name, const std::string& tiny_op,
                         const std::string& k, const std::string& v) {
    recover_batch.Put(ConstructRecoverKey(user_op_id, next_tiny_op_id(), db_name),
                      ConstructRecoverValue(tiny_op, k, v));
  };

  // Decide which index entries to add and remove.
  FieldArray current_fields;
  FieldArray inserted_fields;
  FieldArray deleted_fields;
  if (Get_fields(ReadOptions(), key, &current_fields).ok()) {
    std::pair<FieldArray, FieldArray> diff =
        UpdateIndex(options, key, fields, current_fields);
    inserted_fields.swap(diff.first);
    deleted_fields.swap(diff.second);
  } else {
    // No readable existing record: every field is new.
    inserted_fields = fields;
  }

  for (const auto& field : inserted_fields) {
    if (!is_indexed(field.first)) {
      continue;
    }
    const std::string index_key = ConstructIndexKey(key, field);
    index_batch.Put(index_key, Slice());
    log_tiny_op("indexDB", "PUT", index_key, "");
  }

  const std::string value = SerializeValue(fields);
  data_batch.Put(key_str, value);
  log_tiny_op("dataDB", "PUT", key_str, value);

  for (const auto& field : deleted_fields) {
    if (!is_indexed(field.first)) {
      continue;
    }
    const std::string index_key = ConstructIndexKey(key, field);
    index_batch.Delete(index_key);
    log_tiny_op("indexDB", "DELETE", index_key, "");
  }

  // Recover log first: if a later write fails, its records stay behind and
  // are replayed by the next Open().
  Status s = recover_db_->Write(options, &recover_batch);
  if (!s.ok()) {
    return s;
  }
  s = index_db_->Write(options, &index_batch);
  if (!s.ok()) {
    return s;
  }
  s = data_db_->Write(options, &data_batch);
  if (!s.ok()) {
    return s;
  }

  DeleteRecoverRecords(recover_db_.get(), user_op_id);
  return Status::OK();
}

// Compares the new fields with the currently stored ones. Returns
// (fields needing a new index entry, fields whose old entry must be removed):
// a field with a changed value appears in both, with its new and old value.
std::pair<FieldArray, FieldArray> NewDB::UpdateIndex(
    const WriteOptions& options, const Slice& key, const FieldArray& fields,
    const FieldArray& current_fields) {
  std::unordered_map<std::string, std::string> current_map;
  for (const auto& field : current_fields) {
    current_map[field.first] = field.second;
  }
  std::unordered_map<std::string, std::string> fields_map;
  for (const auto& field : fields) {
    fields_map[field.first] = field.second;
  }

  FieldArray insert_fields;
  FieldArray deleted_fields;
  for (const auto& field : fields) {
    const auto it = current_map.find(field.first);
    if (it == current_map.end()) {
      insert_fields.push_back(field);  // new field
    } else if (it->second != field.second) {
      insert_fields.push_back(field);  // value changed
      deleted_fields.push_back(std::make_pair(field.first, it->second));
    }
  }
  for (const auto& field : current_fields) {
    if (fields_map.find(field.first) == fields_map.end()) {
      deleted_fields.push_back(field);  // field removed
    }
  }
  return std::make_pair(std::move(insert_fields), std::move(deleted_fields));
}

// Reads the fields stored under `key` in the data DB.
Status NewDB::Get_fields(const ReadOptions& options, const Slice& key,
                         FieldArray* fields) {
  return data_db_->Get_fields(options, key, fields);
}

// Finds keys whose value contains `field` by delegating to the data DB
// (no secondary index involved).
std::vector<std::string> NewDB::FindKeysByField(Field& field) {
  DB* db = data_db_.get();
  return data_db_->FindKeysByField(db, field);
}

// Deletes `key` and its index entries through the recover log. Returns false
// if the key could not be read or any write failed.
bool NewDB::Delete(const WriteOptions& options, const Slice& key) {
  const std::string user_op_id = ConstructUserOpID(std::this_thread::get_id());
  const std::string key_str = key.ToString();

  FieldArray fields;
  if (!data_db_->Get_fields(ReadOptions(), key, &fields).ok()) {
    return false;
  }

  auto next_tiny_op_id = [this]() -> std::string {
    std::lock_guard<std::mutex> guard(m);
    return std::to_string(TinyOpID++);
  };
  auto is_indexed = [this](const std::string& field_name) -> bool {
    std::lock_guard<std::mutex> guard(db_mutex_);
    return indexed_fields_write.find(field_name) != indexed_fields_write.end();
  };

  WriteBatch data_batch;
  WriteBatch index_batch;
  WriteBatch recover_batch;

  data_batch.Delete(key_str);
  recover_batch.Put(ConstructRecoverKey(user_op_id, next_tiny_op_id(), "dataDB"),
                    ConstructRecoverValue("DELETE", key_str, ""));

  for (const auto& field : fields) {
    if (!is_indexed(field.first)) {
      continue;
    }
    const std::string index_key = ConstructIndexKey(key, field);
    index_batch.Delete(index_key);
    recover_batch.Put(
        ConstructRecoverKey(user_op_id, next_tiny_op_id(), "indexDB"),
        ConstructRecoverValue("DELETE", index_key, ""));
  }

  if (!recover_db_->Write(options, &recover_batch).ok()) {
    return false;
  }
  if (!data_db_->Write(options, &data_batch).ok()) {
    return false;
  }
  if (!index_db_->Write(options, &index_batch).ok()) {
    return false;
  }

  DeleteRecoverRecords(recover_db_.get(), user_op_id);
  return true;
}

// Creates a secondary index on `field_name`. Writers start maintaining the
// index first (write metadata); existing records are back-filled in batches;
// only then is the field published for queries (read metadata).
bool NewDB::CreateIndexOnField(const std::string& field_name) {
  if (!index_db_->Put(WriteOptions(), kIndexFieldWritePrefix + field_name, "1")
           .ok()) {
    return false;
  }
  {
    std::lock_guard<std::mutex> guard(db_mutex_);
    indexed_fields_write.insert(field_name);
  }

  const size_t kBatchSizeLimit = 1000;  // index entries per write batch
  size_t batch_count = 0;
  WriteBatch batch;
  std::unique_ptr<Iterator> it(data_db_->NewIterator(ReadOptions()));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    const std::string key = it->key().ToString();
    for (const auto& field : ParseValue(it->value().ToString())) {
      if (field.first != field_name) {
        continue;
      }
      batch.Put(ConstructIndexKey(key, field), Slice());
      if (++batch_count >= kBatchSizeLimit) {
        if (!index_db_->Write(WriteOptions(), &batch).ok()) {
          return false;
        }
        batch.Clear();
        batch_count = 0;
      }
    }
  }
  if (!it->status().ok()) {
    return false;
  }
  if (!index_db_->Write(WriteOptions(), &batch).ok()) {
    return false;
  }

  if (!index_db_->Put(WriteOptions(), kIndexFieldReadPrefix + field_name, "1")
           .ok()) {
    return false;
  }
  {
    std::lock_guard<std::mutex> guard(db_mutex_);
    indexed_fields_read.insert(field_name);
  }
  return true;
}

// Returns the data keys whose value currently has `field` (name and value),
// using the secondary index. Returns an empty vector when the field is not
// indexed for reads. Index hits are re-checked against the data DB so stale
// index entries are never returned.
std::vector<std::string> NewDB::QueryByIndex(Field& field) {
  std::vector<std::string> matching_keys;
  {
    std::lock_guard<std::mutex> guard(db_mutex_);
    if (indexed_fields_read.find(field.first) == indexed_fields_read.end()) {
      return matching_keys;
    }
  }

  // Prefix "FieldName:FieldValue_"; the data key is what follows it. Stripping
  // the known prefix (rather than splitting at the first '_') stays correct
  // when the field name or value contains '_'.
  const std::string prefix = ConstructIndexKey(Slice(), field);
  std::vector<std::string> candidates;
  {
    std::unique_ptr<Iterator> iter(index_db_->NewIterator(ReadOptions()));
    for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
         iter->Next()) {
      const Slice index_key = iter->key();
      candidates.push_back(std::string(index_key.data() + prefix.size(),
                                       index_key.size() - prefix.size()));
    }
  }

  // Keep only candidates that still exist and still carry this field value.
  // Filtering into a new vector avoids erasing while iterating.
  for (const std::string& key : candidates) {
    FieldArray fields;
    if (!data_db_->Get_fields(ReadOptions(), key, &fields).ok()) {
      continue;
    }
    bool still_matches = false;
    for (const auto& each_field : fields) {
      if (each_field.first == field.first &&
          each_field.second == field.second) {
        still_matches = true;
        break;
      }
    }
    if (still_matches) {
      matching_keys.push_back(key);
    }
  }
  return matching_keys;
}

// Drops the index on `field_name`: removes its metadata and in-memory
// registration, then deletes all of its index entries in batches.
bool NewDB::DeleteIndex(const std::string& field_name) {
  if (!index_db_->Delete(WriteOptions(), kIndexFieldReadPrefix + field_name)
           .ok() ||
      !index_db_->Delete(WriteOptions(), kIndexFieldWritePrefix + field_name)
           .ok()) {
    return false;
  }
  {
    std::lock_guard<std::mutex> guard(db_mutex_);
    indexed_fields_read.erase(field_name);
    indexed_fields_write.erase(field_name);
  }

  // Index keys look like "FieldName:FieldValue_Key". Including ':' keeps
  // fields that merely share a name prefix (e.g. "age" vs "age2") intact.
  const std::string prefix = field_name + ":";
  const size_t kBatchSizeLimit = 1000;  // deletions per write batch
  size_t batch_count = 0;
  WriteBatch delete_batch;
  std::unique_ptr<Iterator> iter(index_db_->NewIterator(ReadOptions()));
  for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
       iter->Next()) {
    delete_batch.Delete(iter->key());
    if (++batch_count >= kBatchSizeLimit) {
      if (!index_db_->Write(WriteOptions(), &delete_batch).ok()) {
        return false;
      }
      delete_batch.Clear();
      batch_count = 0;
    }
  }
  return index_db_->Write(WriteOptions(), &delete_batch).ok();
}

}  // namespace leveldb