From 2ab4c1aa4768e804931e29b141e01f0a7db6b452 Mon Sep 17 00:00:00 2001 From: Estella <10225101469@stu.ecnu.edu.cn> Date: Sun, 5 Jan 2025 19:32:17 +0800 Subject: [PATCH] delete --- db/NewDB.cc | 663 ------------------------------------------------------------ db/NewDB.h | 79 -------- 2 files changed, 742 deletions(-) delete mode 100644 db/NewDB.cc delete mode 100644 db/NewDB.h diff --git a/db/NewDB.cc b/db/NewDB.cc deleted file mode 100644 index 1602ce3..0000000 --- a/db/NewDB.cc +++ /dev/null @@ -1,663 +0,0 @@ -#include "db/NewDB.h" -#include "util/coding.h" -#include "db/write_batch_internal.h" -#include "db/version_set.h" -#include "db/db_impl.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace leveldb{ -NewDB::~NewDB() { - -} - -// row lock -std::condition_variable cv; -std::mutex m; -bool ready = false; -// row lock - -std::string NewDB::SerializeValue(const FieldArray& fields){ - std::string value_str; - for(const auto& pair : fields){ - std::string field = pair.first + ":" + pair.second; - uint32_t field_size = field.size(); - char buffer[4]; - EncodeFixed32(buffer, field_size); - value_str.append(buffer, 4); - value_str.append(field); - } - return value_str; - - // std::string s; - // for(auto& field : fields){ - // PutLengthPrefixedSlice(&s, Slice(field.first)); - // PutLengthPrefixedSlice(&s, Slice(field.second)); - // } - // return s; -} - -FieldArray NewDB::ParseValue(const std::string& value_str){ - FieldArray fields; - const char* data = value_str.data(); - size_t length = value_str.size(); - - while (length >= 4) { - uint32_t field_size = DecodeFixed32(data); - if (length < 4 + field_size) { - break; - } - - std::string field(data + 4, field_size); - size_t colon_pos = field.find(':'); - - std::string field_name = field.substr(0, colon_pos); - std::string field_value = field.substr(colon_pos + 1); - - fields.push_back(std::make_pair(field_name,field_value)); - - data += 4 + field_size; - length -= 4 + field_size; - } - - return fields; - - // Slice input(value_str); - // Slice v1,v2; - // FieldArray fields; - // while(v1!=Slice()){ - // GetLengthPrefixedSlice(&input, &v1); - // GetLengthPrefixedSlice(&input, &v2); - // fields.push_back(std::make_pair(v1.ToString(), v2.ToString())); - // } - // return fields; -} - -// 构造索引的key,结构:FieldName_FieldValue_Key-橙 -std::string NewDB::ConstructIndexKey(const Slice& key, const Field& field){ - // std::string s; - // PutLengthPrefixedSlice(&s, Slice(field.first)); - // PutLengthPrefixedSlice(&s, Slice(field.second)); - // PutLengthPrefixedSlice(&s, key); - // return s; - - std::ostringstream oss; - oss << field.first << ":" << field.second << "_" << key.ToString(); - return oss.str(); -} - -// 从索引键中提取原数据的键 -std::string NewDB::ExtractIndexKey(const Slice& key){ - // Slice input(key.ToString()); - // Slice v; - // GetLengthPrefixedSlice(&input, &v); - // GetLengthPrefixedSlice(&input, &v); - // GetLengthPrefixedSlice(&input, &v); - // return v.ToString(); - - //extract key of dataDB from key of indexDB - std::string fullKey(key.data(), key.size()); - // size_t pos1 = fullKey.find('_'); - size_t pos1 = fullKey.find('_'); - return fullKey.substr(pos1 + 1); // 提取 'Key' 部分 -} - -std::string NewDB::ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname){ - std::string s; - PutLengthPrefixedSlice(&s, Slice(UserOpID)); - PutLengthPrefixedSlice(&s, Slice(TinyOpID)); - PutLengthPrefixedSlice(&s, Slice(DBname)); - return s; -} - -std::string NewDB::ConstructRecoverValue(std::string TinyOp, std::string key, std::string value){ - std::string s; - PutLengthPrefixedSlice(&s, Slice(TinyOp)); - PutLengthPrefixedSlice(&s, Slice(key)); - PutLengthPrefixedSlice(&s, Slice(value)); - return s; -} - -std::string NewDB::ExtractRecoverKey(std::string s){ - Slice input(s); - Slice v; - GetLengthPrefixedSlice(&input, &v); - GetLengthPrefixedSlice(&input, &v); - GetLengthPrefixedSlice(&input, &v); - return v.ToString(); -} - -std::pair NewDB::ExtractRecoverValue(std::string s, std::string* TinyOp){ - Slice input(s); - Slice v; - GetLengthPrefixedSlice(&input, &v); - *TinyOp = v.ToString(); - GetLengthPrefixedSlice(&input, &v); - std::string key = v.ToString(); - GetLengthPrefixedSlice(&input, &v); - std::string value = v.ToString(); - return std::make_pair(key, value); -} - -std::string NewDB::ConstructUserOpID(std::thread::id thread_id){ - auto now = std::chrono::system_clock::now(); - std::time_t now_t = std::chrono::system_clock::to_time_t(now); - - std::ostringstream oss; - oss << thread_id; - std::string UserOpID = std::to_string(now_t) + oss.str(); - return UserOpID; -} - - -Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbptr) { - // 打开底层的数据库 - DB* dataDB; - Status s1 = DB::Open(options, name + "_data", &dataDB); - if (!s1.ok()) { - return s1; // 如果打开失败,返回错误 - } - - DB* indexDB; - Status s2 = DB::Open(options, name + "_index", &indexDB); - if (!s2.ok()) { - return s2; // 如果打开失败,返回错误 - } - - DB* recoverDB; - Status s3 = DB::Open(options, name + "_recover", &recoverDB); - if (!s3.ok()) { - return s3; // 如果打开失败,返回错误 - } - // 创建一个 NewDB 实例 - *dbptr = new NewDB(); - - // 初始化 data_db_,index_db_和recover_db_ - (*dbptr)->data_db_ = std::unique_ptr(dataDB); // 将打开的数据库指针传递给 data_db_ - (*dbptr)->index_db_ = std::unique_ptr(indexDB); // 创建 IndexDB - (*dbptr)->recover_db_ = std::unique_ptr(recoverDB); - - (*dbptr)->TinyOpID = 0; - - // recover dataDB&indexDB using recoverDB - leveldb::Iterator* iter = recoverDB->NewIterator(leveldb::ReadOptions()); - Slice recover_key; - for(iter->SeekToFirst(); iter->Valid(); iter->Next()){ - recover_key = iter->key(); - std::string DBname = (*dbptr)->ExtractRecoverKey(recover_key.ToString()); - std::string TinyOp; - std::pair k_v = (*dbptr)->ExtractRecoverValue(iter->value().ToString(), &TinyOp); - if(DBname == "dataDB"){ - if(TinyOp == "PUT"){ - dataDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); - } - else{ - dataDB->Delete(leveldb::WriteOptions(), k_v.first); - } - } - else{ - if(TinyOp == "PUT"){ - indexDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); - } - else{ - indexDB->Delete(leveldb::WriteOptions(), k_v.first); - } - } - recoverDB->Delete(leveldb::WriteOptions(), recover_key); - } - // std::cout << "recover " << i << " rows, end key is " << keystr << std::endl; - delete iter; - - // 重新构造indexed_fields - Iterator* it = (*dbptr)->index_db_->NewIterator(ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - // 只关心以 "index_field" 为前缀的键 - if (it->key().starts_with("index_field_read:")) { - std::string field = it->key().ToString().substr(16); // 获取字段名 - (*dbptr)->indexed_fields_read.insert(field); - } - if (it->key().starts_with("index_field_write:")) { - std::string field = it->key().ToString().substr(17); // 获取字段名 - (*dbptr)->indexed_fields_write.insert(field); - } - } - delete it; - - return Status::OK(); // 返回成功 -} - - -Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const FieldArray& fields){ - std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); - - // row lock - std::unique_lock lock(db_mutex_, std::defer_lock); - std::unique_lock row_lock(m, std::defer_lock); - ready = (putting_keys.find(key.ToString()) == putting_keys.end()); // no another thread putting the same key - - if (!ready) { - cv.wait(row_lock, []{return ready;}); - } - - lock.lock(); - putting_keys.insert(key.ToString()); - lock.unlock(); - // row lock - - FieldArray current_fields; - Status s = Get_fields(leveldb::ReadOptions(), key, ¤t_fields); - - leveldb::WriteBatch data_batch; - leveldb::WriteBatch index_batch; - leveldb::WriteBatch recover_batch; - - // uint64_t TinyOpID = 0; - - if(!s.ok()){ - for (const auto& field : fields) { - // 如果字段名在 indexed_fields_ 中,才插入二级索引 - if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { - // 构造索引的key,结构:FieldName_FieldValue_Key - std::string index_key = ConstructIndexKey(key, field); - index_batch.Put(index_key, Slice()); - // prepare recover_batch - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); - std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); - recover_batch.Put(Recover_key, Recover_value); - - TinyOpID = TinyOpID + 1; - } - } - std::string value = SerializeValue(fields); - data_batch.Put(key.ToString(), value); - - // put dataDB's k-v into recover_batch - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); - std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); - recover_batch.Put(Recover_key, Recover_value); - - TinyOpID = TinyOpID + 1; - } - - else{ - std::pair fieldarray_pair = UpdateIndex(options, key, fields, current_fields); - - // put ops in indexdb - for (const auto& field : fieldarray_pair.first) { - if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { - // put into index_batch - std::string index_key = ConstructIndexKey(key, field); - index_batch.Put(index_key, Slice()); - // put into recover_batch - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); - std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); - recover_batch.Put(Recover_key, Recover_value); - - TinyOpID = TinyOpID + 1; - } - } - - // put ops in datadb - std::string value = SerializeValue(fields); - data_batch.Put(key.ToString(), value); - - // put dataDB's k-v into recover_batch - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); - std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); - recover_batch.Put(Recover_key, Recover_value); - TinyOpID = TinyOpID + 1; - - // delete ops in indexdb - for (const auto& field : fieldarray_pair.second) { - if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { - // delete in index_batch - std::string index_key = ConstructIndexKey(key, field); - index_batch.Delete(index_key); - // delete in recover_batch - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); - std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); - recover_batch.Put(Recover_key, Recover_value); - - TinyOpID = TinyOpID + 1; - } - } - } - - // write into RocoverDB - leveldb::WriteOptions write_options; - s = recover_db_->Write(write_options, &recover_batch); - if(!s.ok()){ - return s; - } - - // write into indexDB - Status status = index_db_->Write(write_options, &index_batch); - if(!status.ok()){ - return status; - } - - // write into dataDB - Status data_status = data_db_->Write(write_options, &data_batch); - if (!data_status.ok()) { - return data_status; // 主数据写入失败,直接返回 - } - - //delete TinyOps of this UserOp in RecoverDB - leveldb::WriteBatch batch; - std::string prefix; - PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); - leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); - for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ - Slice Recover_key = iter->key(); - batch.Delete(Recover_key); - } - delete iter; - recover_db_->Write(leveldb::WriteOptions(), &batch); - - // row lock - lock.lock(); - putting_keys.erase(key.ToString()); - lock.unlock(); - - ready = true; - cv.notify_all(); - // row lock - - return Status::OK(); -} - -std::pair NewDB::UpdateIndex(const WriteOptions& options, const Slice& key, const FieldArray& fields, const FieldArray& current_fields){ - // 构建current_fields的映射 - std::unordered_map current_map; - for (const auto& field : current_fields) { - current_map[field.first] = field.second; - } - - // 构建fields的映射 - std::unordered_map fields_map; - for (const auto& field : fields) { - fields_map[field.first] = field.second; - } - - // 删除的字段 - FieldArray deleted_fields; - // 新增的字段 - FieldArray insert_fields; - - // fields中的字段 - for (const auto& field : fields) { - // fields中存在但current_fields中不存在 - if (current_map.find(field.first) == current_map.end()) { - insert_fields.push_back(field); - } else if (current_map[field.first] != field.second) { - // current_fields中存在但是值不同 - insert_fields.push_back(field); - deleted_fields.push_back(std::make_pair(field.first, current_map[field.first])); - } - } - - // current_fields中的字段 - for (const auto& field : current_fields) { - if (fields_map.find(field.first) == fields_map.end()) { - deleted_fields.push_back(field); - } - } - - return std::make_pair(insert_fields, deleted_fields); -} - -Status NewDB::Get_fields(const ReadOptions& options, const Slice& key, FieldArray* fields){ - //get value(fields) with key - // 从 DataDB 获取数据 - Status s = data_db_->Get_fields(options, key, fields); - if (!s.ok()) { - return s; // 如果获取失败,返回状态 - } - return Status::OK(); -} - -std::vector NewDB::FindKeysByField(Field &field){ - //get keys with field - std::vector matching_keys; - leveldb::DB* db = data_db_.get(); - matching_keys = data_db_->FindKeysByField(db, field); - return matching_keys; -} - -bool NewDB::Delete(const WriteOptions& options, const Slice& key){ - std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); - - leveldb::WriteBatch data_batch; - leveldb::WriteBatch index_batch; - leveldb::WriteBatch recover_batch; - // uint64_t TinyOpID = 0; - - FieldArray fields; - Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields); - - if(s.ok()){ - // delete in datadb - data_batch.Delete(key.ToString()); - - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); - std::string Recover_value = ConstructRecoverValue("DELETE", key.ToString(), ""); - recover_batch.Put(Recover_key, Recover_value); - TinyOpID = TinyOpID + 1; - - // delete in indexdb - for (const auto& field : fields) { - // 如果字段名在 indexed_fields_ 中,才插入二级索引 - if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { - // 构造索引的key,结构:FieldName_FieldValue_Key - std::string index_key = ConstructIndexKey(key, field); - index_batch.Delete(index_key); - // prepare recover_batch - std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); - std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); - recover_batch.Put(Recover_key, Recover_value); - - TinyOpID = TinyOpID + 1; - } - } - - // write recoverDB - Status s = recover_db_->Write(leveldb::WriteOptions(), &recover_batch); - if(!s.ok()){ - return false; - } - - // write dataDB - s = data_db_->Write(leveldb::WriteOptions(), &data_batch); - if(!s.ok()){ - return false; - } - - // write indexDB - s = index_db_->Write(leveldb::WriteOptions(), &index_batch); - if(!s.ok()){ - return false; - } - - //delete TinyOps of this UserOp in RecoverDB - leveldb::WriteBatch batch; - std::string prefix; - PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); - leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); - for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ - Slice Recover_key = iter->key(); - batch.Delete(Recover_key); - } - delete iter; - recover_db_->Write(leveldb::WriteOptions(), &batch); - } - else{ - return false; - } - - return true; -} - - - -bool NewDB::CreateIndexOnField(const std::string& field_name) { - // 将索引字段元数据持久化到indexDB - index_db_->Put(WriteOptions(), "index_field_write:" + field_name, "1"); - std::unique_lock lock(db_mutex_, std::defer_lock); - lock.lock(); - indexed_fields_write.insert(field_name); - lock.unlock(); - - const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录 - size_t batch_count = 0; - - // 遍历 data_db_ 中的所有键值对 - leveldb::ReadOptions read_options; - leveldb::WriteBatch batch; - std::unique_ptr it(data_db_->NewIterator(read_options)); - - for (it->SeekToFirst(); it->Valid(); it->Next()) { - // 获取主数据库的键值对 - std::string key = it->key().ToString(); - std::string value = it->value().ToString(); - - // 解析值,提取字段数组 - FieldArray fields = ParseValue(value); - - // 查找指定字段 - for (const auto& field : fields) { - if (field.first == field_name) { - // 构造索引键 - std::string index_key = ConstructIndexKey(key, field); - - // 插入到索引数据库 - batch.Put(index_key, Slice()); - - batch_count++; - // 如果达到批次大小限制,执行写入 - if (batch_count >= BATCH_SIZE_LIMIT) { - leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &batch); - if (!status.ok()) { - return false; - } - batch.Clear(); // 清空批次,准备下一个批次 - batch_count = 0; - } - } - } - } - - // 如果迭代器出错 - if (!it->status().ok()) { - // 从头开始创建,覆盖掉原本失效的东西 - return false; - } - // 批量写入 - leveldb::WriteOptions write_options; - Status status = index_db_->Write(write_options, &batch); - if (!status.ok()) { - return false; - } - - // 将索引字段元数据持久化到indexDB - index_db_->Put(WriteOptions(), "index_field_read:" + field_name, "1"); - lock.lock(); - indexed_fields_read.insert(field_name); - lock.unlock(); - return true; -} - -std::vector NewDB::QueryByIndex(Field &field){ - std::vector matching_keys; - - if(indexed_fields_read.find(field.first) != indexed_fields_read.end()){ - std::string prefix = ConstructIndexKey(Slice(), field); //前缀-字段名+字段值 - leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions()); - for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ - Slice index_key = iter->key(); - std::string data_key = ExtractIndexKey(index_key); //解析索引键中的key - matching_keys.push_back(data_key); - } - delete iter; - - // check these matching_keys and remove keys not in datadb - for (auto& key:matching_keys) { - FieldArray fields; - Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields); - - // if field not in this k-v? - int tag = 0; - if(s.ok()){ - for(auto& each_field : fields){ - if(each_field.first == field.first){ - if(each_field.second == field.second){ - tag = 1; - } - } - } - } - if(tag == 0){ - matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end()); - } - // if(!s.ok()){ - // matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end()); - // } - } - } - - return matching_keys; -} - -bool NewDB::DeleteIndex(const std::string& field_name){ - // 将索引字段元数据持久化到indexDB - index_db_->Delete(WriteOptions(), "index_field_read:" + field_name); - index_db_->Delete(WriteOptions(), "index_field_write:" + field_name); - std::unique_lock lock(db_mutex_, std::defer_lock); - lock.lock(); - indexed_fields_read.erase(field_name); - indexed_fields_write.erase(field_name); - lock.unlock(); - - const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录 - size_t batch_count = 0; - - leveldb::WriteBatch delete_batch; - - std::string prefix = field_name; - leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions()); - for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ - Slice index_key = iter->key(); - delete_batch.Delete(index_key); - batch_count++; - if (batch_count >= BATCH_SIZE_LIMIT) { - leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &delete_batch); - if (!status.ok()) { - return false; - } - delete_batch.Clear(); // 清空批次,准备下一个批次 - batch_count = 0; - } - } - delete iter; - - Status s = index_db_->Write(leveldb::WriteOptions(), &delete_batch); - if(!s.ok()){ - return false; - } - - return true; -} - -} - - - - - diff --git a/db/NewDB.h b/db/NewDB.h deleted file mode 100644 index 21a51b6..0000000 --- a/db/NewDB.h +++ /dev/null @@ -1,79 +0,0 @@ -#ifndef NEWDB_H -#define NEWDB_H - -#include -#include -#include -#include -#include "leveldb/db.h" -#include "db/log_writer.h" - -namespace leveldb{ - -class LEVELDB_EXPORT NewDB { - public: - NewDB() = default; - - NewDB(const NewDB&) = delete; - NewDB& operator=(const NewDB&) = delete; - - virtual ~NewDB(); - - std::string SerializeValue(const FieldArray& fields); - FieldArray ParseValue(const std::string& value_str); - std::string ConstructIndexKey(const Slice& key, const Field& field); - std::string ExtractIndexKey(const Slice& key); - std::string ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname); - std::string ConstructRecoverValue(std::string TinyOp, std::string key, std::string value); - std::string ExtractRecoverKey(std::string s); - std::pair ExtractRecoverValue(std::string s, std::string* TinyOp); - std::string ConstructUserOpID(std::thread::id thread_id); - - static Status Open(const Options& options, const std::string& name, - NewDB** dbptr); // 改为返回 NewDB* 类型-橙 - - Status Put_fields(const WriteOptions& options, const Slice& key, - const FieldArray& fields); - - std::pair UpdateIndex(const WriteOptions& options, const Slice& key, - const FieldArray& fields, const FieldArray& current_fields); - - - Status Get_fields(const ReadOptions& options, const Slice& key, - FieldArray* fields); - - bool Delete(const WriteOptions& options, const Slice& key); - - std::vector FindKeysByField(Field &field); - - // std::string ConstructKey(const Slice& key, const Field& field); - - // std::string ExtractKey(const Slice& key); - - bool CreateIndexOnField(const std::string& field_name); - - std::vector QueryByIndex(Field &field); - - bool DeleteIndex(const std::string& field_name); - - // 用于存储已经为其创建索引的字段名称。只有当字段名在这个集合中时,才会在 indexDB 中插入对应的索引条目。 - std::unordered_set indexed_fields_read; - std::unordered_set indexed_fields_write; - - std::unordered_set putting_keys; - - private: - std::unique_ptr data_db_; - - std::unique_ptr index_db_; - - std::unique_ptr recover_db_; - - std::mutex db_mutex_; // 用于跨数据库操作的互斥锁 - - uint64_t TinyOpID; -}; - -} - -#endif \ No newline at end of file