diff --git a/db/NewDB.cc b/db/NewDB.cc new file mode 100644 index 0000000..1602ce3 --- /dev/null +++ b/db/NewDB.cc @@ -0,0 +1,663 @@ +#include "db/NewDB.h" +#include "util/coding.h" +#include "db/write_batch_internal.h" +#include "db/version_set.h" +#include "db/db_impl.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace leveldb{ +NewDB::~NewDB() { + +} + +// row lock +std::condition_variable cv; +std::mutex m; +bool ready = false; +// row lock + +std::string NewDB::SerializeValue(const FieldArray& fields){ + std::string value_str; + for(const auto& pair : fields){ + std::string field = pair.first + ":" + pair.second; + uint32_t field_size = field.size(); + char buffer[4]; + EncodeFixed32(buffer, field_size); + value_str.append(buffer, 4); + value_str.append(field); + } + return value_str; + + // std::string s; + // for(auto& field : fields){ + // PutLengthPrefixedSlice(&s, Slice(field.first)); + // PutLengthPrefixedSlice(&s, Slice(field.second)); + // } + // return s; +} + +FieldArray NewDB::ParseValue(const std::string& value_str){ + FieldArray fields; + const char* data = value_str.data(); + size_t length = value_str.size(); + + while (length >= 4) { + uint32_t field_size = DecodeFixed32(data); + if (length < 4 + field_size) { + break; + } + + std::string field(data + 4, field_size); + size_t colon_pos = field.find(':'); + + std::string field_name = field.substr(0, colon_pos); + std::string field_value = field.substr(colon_pos + 1); + + fields.push_back(std::make_pair(field_name,field_value)); + + data += 4 + field_size; + length -= 4 + field_size; + } + + return fields; + + // Slice input(value_str); + // Slice v1,v2; + // FieldArray fields; + // while(v1!=Slice()){ + // GetLengthPrefixedSlice(&input, &v1); + // GetLengthPrefixedSlice(&input, &v2); + // fields.push_back(std::make_pair(v1.ToString(), v2.ToString())); + // } + // return fields; +} + +// 构造索引的key,结构:FieldName_FieldValue_Key-橙 +std::string NewDB::ConstructIndexKey(const Slice& key, const Field& field){ + // std::string s; + // PutLengthPrefixedSlice(&s, Slice(field.first)); + // PutLengthPrefixedSlice(&s, Slice(field.second)); + // PutLengthPrefixedSlice(&s, key); + // return s; + + std::ostringstream oss; + oss << field.first << ":" << field.second << "_" << key.ToString(); + return oss.str(); +} + +// 从索引键中提取原数据的键 +std::string NewDB::ExtractIndexKey(const Slice& key){ + // Slice input(key.ToString()); + // Slice v; + // GetLengthPrefixedSlice(&input, &v); + // GetLengthPrefixedSlice(&input, &v); + // GetLengthPrefixedSlice(&input, &v); + // return v.ToString(); + + //extract key of dataDB from key of indexDB + std::string fullKey(key.data(), key.size()); + // size_t pos1 = fullKey.find('_'); + size_t pos1 = fullKey.find('_'); + return fullKey.substr(pos1 + 1); // 提取 'Key' 部分 +} + +std::string NewDB::ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname){ + std::string s; + PutLengthPrefixedSlice(&s, Slice(UserOpID)); + PutLengthPrefixedSlice(&s, Slice(TinyOpID)); + PutLengthPrefixedSlice(&s, Slice(DBname)); + return s; +} + +std::string NewDB::ConstructRecoverValue(std::string TinyOp, std::string key, std::string value){ + std::string s; + PutLengthPrefixedSlice(&s, Slice(TinyOp)); + PutLengthPrefixedSlice(&s, Slice(key)); + PutLengthPrefixedSlice(&s, Slice(value)); + return s; +} + +std::string NewDB::ExtractRecoverKey(std::string s){ + Slice input(s); + Slice v; + GetLengthPrefixedSlice(&input, &v); + GetLengthPrefixedSlice(&input, &v); + GetLengthPrefixedSlice(&input, &v); + return v.ToString(); +} + +std::pair NewDB::ExtractRecoverValue(std::string s, std::string* TinyOp){ + Slice input(s); + Slice v; + GetLengthPrefixedSlice(&input, &v); + *TinyOp = v.ToString(); + GetLengthPrefixedSlice(&input, &v); + std::string key = v.ToString(); + GetLengthPrefixedSlice(&input, &v); + std::string value = v.ToString(); + return std::make_pair(key, value); +} + +std::string NewDB::ConstructUserOpID(std::thread::id thread_id){ + auto now = std::chrono::system_clock::now(); + std::time_t now_t = std::chrono::system_clock::to_time_t(now); + + std::ostringstream oss; + oss << thread_id; + std::string UserOpID = std::to_string(now_t) + oss.str(); + return UserOpID; +} + + +Status NewDB::Open(const Options& options, const std::string& name, NewDB** dbptr) { + // 打开底层的数据库 + DB* dataDB; + Status s1 = DB::Open(options, name + "_data", &dataDB); + if (!s1.ok()) { + return s1; // 如果打开失败,返回错误 + } + + DB* indexDB; + Status s2 = DB::Open(options, name + "_index", &indexDB); + if (!s2.ok()) { + return s2; // 如果打开失败,返回错误 + } + + DB* recoverDB; + Status s3 = DB::Open(options, name + "_recover", &recoverDB); + if (!s3.ok()) { + return s3; // 如果打开失败,返回错误 + } + // 创建一个 NewDB 实例 + *dbptr = new NewDB(); + + // 初始化 data_db_,index_db_和recover_db_ + (*dbptr)->data_db_ = std::unique_ptr(dataDB); // 将打开的数据库指针传递给 data_db_ + (*dbptr)->index_db_ = std::unique_ptr(indexDB); // 创建 IndexDB + (*dbptr)->recover_db_ = std::unique_ptr(recoverDB); + + (*dbptr)->TinyOpID = 0; + + // recover dataDB&indexDB using recoverDB + leveldb::Iterator* iter = recoverDB->NewIterator(leveldb::ReadOptions()); + Slice recover_key; + for(iter->SeekToFirst(); iter->Valid(); iter->Next()){ + recover_key = iter->key(); + std::string DBname = (*dbptr)->ExtractRecoverKey(recover_key.ToString()); + std::string TinyOp; + std::pair k_v = (*dbptr)->ExtractRecoverValue(iter->value().ToString(), &TinyOp); + if(DBname == "dataDB"){ + if(TinyOp == "PUT"){ + dataDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); + } + else{ + dataDB->Delete(leveldb::WriteOptions(), k_v.first); + } + } + else{ + if(TinyOp == "PUT"){ + indexDB->Put(leveldb::WriteOptions(), k_v.first, k_v.second); + } + else{ + indexDB->Delete(leveldb::WriteOptions(), k_v.first); + } + } + recoverDB->Delete(leveldb::WriteOptions(), recover_key); + } + // std::cout << "recover " << i << " rows, end key is " << keystr << std::endl; + delete iter; + + // 重新构造indexed_fields + Iterator* it = (*dbptr)->index_db_->NewIterator(ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + // 只关心以 "index_field" 为前缀的键 + if (it->key().starts_with("index_field_read:")) { + std::string field = it->key().ToString().substr(16); // 获取字段名 + (*dbptr)->indexed_fields_read.insert(field); + } + if (it->key().starts_with("index_field_write:")) { + std::string field = it->key().ToString().substr(17); // 获取字段名 + (*dbptr)->indexed_fields_write.insert(field); + } + } + delete it; + + return Status::OK(); // 返回成功 +} + + +Status NewDB::Put_fields(const WriteOptions& options, const Slice& key, const FieldArray& fields){ + std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); + + // row lock + std::unique_lock lock(db_mutex_, std::defer_lock); + std::unique_lock row_lock(m, std::defer_lock); + ready = (putting_keys.find(key.ToString()) == putting_keys.end()); // no another thread putting the same key + + if (!ready) { + cv.wait(row_lock, []{return ready;}); + } + + lock.lock(); + putting_keys.insert(key.ToString()); + lock.unlock(); + // row lock + + FieldArray current_fields; + Status s = Get_fields(leveldb::ReadOptions(), key, ¤t_fields); + + leveldb::WriteBatch data_batch; + leveldb::WriteBatch index_batch; + leveldb::WriteBatch recover_batch; + + // uint64_t TinyOpID = 0; + + if(!s.ok()){ + for (const auto& field : fields) { + // 如果字段名在 indexed_fields_ 中,才插入二级索引 + if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { + // 构造索引的key,结构:FieldName_FieldValue_Key + std::string index_key = ConstructIndexKey(key, field); + index_batch.Put(index_key, Slice()); + // prepare recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; + } + } + std::string value = SerializeValue(fields); + data_batch.Put(key.ToString(), value); + + // put dataDB's k-v into recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); + std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; + } + + else{ + std::pair fieldarray_pair = UpdateIndex(options, key, fields, current_fields); + + // put ops in indexdb + for (const auto& field : fieldarray_pair.first) { + if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { + // put into index_batch + std::string index_key = ConstructIndexKey(key, field); + index_batch.Put(index_key, Slice()); + // put into recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("PUT", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; + } + } + + // put ops in datadb + std::string value = SerializeValue(fields); + data_batch.Put(key.ToString(), value); + + // put dataDB's k-v into recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); + std::string Recover_value = ConstructRecoverValue("PUT", key.ToString(), value); + recover_batch.Put(Recover_key, Recover_value); + TinyOpID = TinyOpID + 1; + + // delete ops in indexdb + for (const auto& field : fieldarray_pair.second) { + if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { + // delete in index_batch + std::string index_key = ConstructIndexKey(key, field); + index_batch.Delete(index_key); + // delete in recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; + } + } + } + + // write into RocoverDB + leveldb::WriteOptions write_options; + s = recover_db_->Write(write_options, &recover_batch); + if(!s.ok()){ + return s; + } + + // write into indexDB + Status status = index_db_->Write(write_options, &index_batch); + if(!status.ok()){ + return status; + } + + // write into dataDB + Status data_status = data_db_->Write(write_options, &data_batch); + if (!data_status.ok()) { + return data_status; // 主数据写入失败,直接返回 + } + + //delete TinyOps of this UserOp in RecoverDB + leveldb::WriteBatch batch; + std::string prefix; + PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); + leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); + for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ + Slice Recover_key = iter->key(); + batch.Delete(Recover_key); + } + delete iter; + recover_db_->Write(leveldb::WriteOptions(), &batch); + + // row lock + lock.lock(); + putting_keys.erase(key.ToString()); + lock.unlock(); + + ready = true; + cv.notify_all(); + // row lock + + return Status::OK(); +} + +std::pair NewDB::UpdateIndex(const WriteOptions& options, const Slice& key, const FieldArray& fields, const FieldArray& current_fields){ + // 构建current_fields的映射 + std::unordered_map current_map; + for (const auto& field : current_fields) { + current_map[field.first] = field.second; + } + + // 构建fields的映射 + std::unordered_map fields_map; + for (const auto& field : fields) { + fields_map[field.first] = field.second; + } + + // 删除的字段 + FieldArray deleted_fields; + // 新增的字段 + FieldArray insert_fields; + + // fields中的字段 + for (const auto& field : fields) { + // fields中存在但current_fields中不存在 + if (current_map.find(field.first) == current_map.end()) { + insert_fields.push_back(field); + } else if (current_map[field.first] != field.second) { + // current_fields中存在但是值不同 + insert_fields.push_back(field); + deleted_fields.push_back(std::make_pair(field.first, current_map[field.first])); + } + } + + // current_fields中的字段 + for (const auto& field : current_fields) { + if (fields_map.find(field.first) == fields_map.end()) { + deleted_fields.push_back(field); + } + } + + return std::make_pair(insert_fields, deleted_fields); +} + +Status NewDB::Get_fields(const ReadOptions& options, const Slice& key, FieldArray* fields){ + //get value(fields) with key + // 从 DataDB 获取数据 + Status s = data_db_->Get_fields(options, key, fields); + if (!s.ok()) { + return s; // 如果获取失败,返回状态 + } + return Status::OK(); +} + +std::vector NewDB::FindKeysByField(Field &field){ + //get keys with field + std::vector matching_keys; + leveldb::DB* db = data_db_.get(); + matching_keys = data_db_->FindKeysByField(db, field); + return matching_keys; +} + +bool NewDB::Delete(const WriteOptions& options, const Slice& key){ + std::string UserOpID = ConstructUserOpID(std::this_thread::get_id()); + + leveldb::WriteBatch data_batch; + leveldb::WriteBatch index_batch; + leveldb::WriteBatch recover_batch; + // uint64_t TinyOpID = 0; + + FieldArray fields; + Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields); + + if(s.ok()){ + // delete in datadb + data_batch.Delete(key.ToString()); + + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "dataDB"); + std::string Recover_value = ConstructRecoverValue("DELETE", key.ToString(), ""); + recover_batch.Put(Recover_key, Recover_value); + TinyOpID = TinyOpID + 1; + + // delete in indexdb + for (const auto& field : fields) { + // 如果字段名在 indexed_fields_ 中,才插入二级索引 + if (indexed_fields_write.find(field.first) != indexed_fields_write.end()) { + // 构造索引的key,结构:FieldName_FieldValue_Key + std::string index_key = ConstructIndexKey(key, field); + index_batch.Delete(index_key); + // prepare recover_batch + std::string Recover_key = ConstructRecoverKey(UserOpID, std::to_string(TinyOpID), "indexDB"); + std::string Recover_value = ConstructRecoverValue("DELETE", index_key, ""); + recover_batch.Put(Recover_key, Recover_value); + + TinyOpID = TinyOpID + 1; + } + } + + // write recoverDB + Status s = recover_db_->Write(leveldb::WriteOptions(), &recover_batch); + if(!s.ok()){ + return false; + } + + // write dataDB + s = data_db_->Write(leveldb::WriteOptions(), &data_batch); + if(!s.ok()){ + return false; + } + + // write indexDB + s = index_db_->Write(leveldb::WriteOptions(), &index_batch); + if(!s.ok()){ + return false; + } + + //delete TinyOps of this UserOp in RecoverDB + leveldb::WriteBatch batch; + std::string prefix; + PutLengthPrefixedSlice(&prefix, Slice(UserOpID)); + leveldb::Iterator* iter = recover_db_->NewIterator(leveldb::ReadOptions()); + for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ + Slice Recover_key = iter->key(); + batch.Delete(Recover_key); + } + delete iter; + recover_db_->Write(leveldb::WriteOptions(), &batch); + } + else{ + return false; + } + + return true; +} + + + +bool NewDB::CreateIndexOnField(const std::string& field_name) { + // 将索引字段元数据持久化到indexDB + index_db_->Put(WriteOptions(), "index_field_write:" + field_name, "1"); + std::unique_lock lock(db_mutex_, std::defer_lock); + lock.lock(); + indexed_fields_write.insert(field_name); + lock.unlock(); + + const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录 + size_t batch_count = 0; + + // 遍历 data_db_ 中的所有键值对 + leveldb::ReadOptions read_options; + leveldb::WriteBatch batch; + std::unique_ptr it(data_db_->NewIterator(read_options)); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + // 获取主数据库的键值对 + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + + // 解析值,提取字段数组 + FieldArray fields = ParseValue(value); + + // 查找指定字段 + for (const auto& field : fields) { + if (field.first == field_name) { + // 构造索引键 + std::string index_key = ConstructIndexKey(key, field); + + // 插入到索引数据库 + batch.Put(index_key, Slice()); + + batch_count++; + // 如果达到批次大小限制,执行写入 + if (batch_count >= BATCH_SIZE_LIMIT) { + leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &batch); + if (!status.ok()) { + return false; + } + batch.Clear(); // 清空批次,准备下一个批次 + batch_count = 0; + } + } + } + } + + // 如果迭代器出错 + if (!it->status().ok()) { + // 从头开始创建,覆盖掉原本失效的东西 + return false; + } + // 批量写入 + leveldb::WriteOptions write_options; + Status status = index_db_->Write(write_options, &batch); + if (!status.ok()) { + return false; + } + + // 将索引字段元数据持久化到indexDB + index_db_->Put(WriteOptions(), "index_field_read:" + field_name, "1"); + lock.lock(); + indexed_fields_read.insert(field_name); + lock.unlock(); + return true; +} + +std::vector NewDB::QueryByIndex(Field &field){ + std::vector matching_keys; + + if(indexed_fields_read.find(field.first) != indexed_fields_read.end()){ + std::string prefix = ConstructIndexKey(Slice(), field); //前缀-字段名+字段值 + leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions()); + for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ + Slice index_key = iter->key(); + std::string data_key = ExtractIndexKey(index_key); //解析索引键中的key + matching_keys.push_back(data_key); + } + delete iter; + + // check these matching_keys and remove keys not in datadb + for (auto& key:matching_keys) { + FieldArray fields; + Status s = data_db_->Get_fields(leveldb::ReadOptions(), key, &fields); + + // if field not in this k-v? + int tag = 0; + if(s.ok()){ + for(auto& each_field : fields){ + if(each_field.first == field.first){ + if(each_field.second == field.second){ + tag = 1; + } + } + } + } + if(tag == 0){ + matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end()); + } + // if(!s.ok()){ + // matching_keys.erase(std::remove(matching_keys.begin(),matching_keys.end(),key),matching_keys.end()); + // } + } + } + + return matching_keys; +} + +bool NewDB::DeleteIndex(const std::string& field_name){ + // 将索引字段元数据持久化到indexDB + index_db_->Delete(WriteOptions(), "index_field_read:" + field_name); + index_db_->Delete(WriteOptions(), "index_field_write:" + field_name); + std::unique_lock lock(db_mutex_, std::defer_lock); + lock.lock(); + indexed_fields_read.erase(field_name); + indexed_fields_write.erase(field_name); + lock.unlock(); + + const size_t BATCH_SIZE_LIMIT = 1000; // 每个批次限制1000条记录 + size_t batch_count = 0; + + leveldb::WriteBatch delete_batch; + + std::string prefix = field_name; + leveldb::Iterator* iter = index_db_->NewIterator(leveldb::ReadOptions()); + for(iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()){ + Slice index_key = iter->key(); + delete_batch.Delete(index_key); + batch_count++; + if (batch_count >= BATCH_SIZE_LIMIT) { + leveldb::Status status = index_db_->Write(leveldb::WriteOptions(), &delete_batch); + if (!status.ok()) { + return false; + } + delete_batch.Clear(); // 清空批次,准备下一个批次 + batch_count = 0; + } + } + delete iter; + + Status s = index_db_->Write(leveldb::WriteOptions(), &delete_batch); + if(!s.ok()){ + return false; + } + + return true; +} + +} + + + + + diff --git a/db/NewDB.h b/db/NewDB.h new file mode 100644 index 0000000..21a51b6 --- /dev/null +++ b/db/NewDB.h @@ -0,0 +1,79 @@ +#ifndef NEWDB_H +#define NEWDB_H + +#include +#include +#include +#include +#include "leveldb/db.h" +#include "db/log_writer.h" + +namespace leveldb{ + +class LEVELDB_EXPORT NewDB { + public: + NewDB() = default; + + NewDB(const NewDB&) = delete; + NewDB& operator=(const NewDB&) = delete; + + virtual ~NewDB(); + + std::string SerializeValue(const FieldArray& fields); + FieldArray ParseValue(const std::string& value_str); + std::string ConstructIndexKey(const Slice& key, const Field& field); + std::string ExtractIndexKey(const Slice& key); + std::string ConstructRecoverKey(std::string UserOpID, std::string TinyOpID, std::string DBname); + std::string ConstructRecoverValue(std::string TinyOp, std::string key, std::string value); + std::string ExtractRecoverKey(std::string s); + std::pair ExtractRecoverValue(std::string s, std::string* TinyOp); + std::string ConstructUserOpID(std::thread::id thread_id); + + static Status Open(const Options& options, const std::string& name, + NewDB** dbptr); // 改为返回 NewDB* 类型-橙 + + Status Put_fields(const WriteOptions& options, const Slice& key, + const FieldArray& fields); + + std::pair UpdateIndex(const WriteOptions& options, const Slice& key, + const FieldArray& fields, const FieldArray& current_fields); + + + Status Get_fields(const ReadOptions& options, const Slice& key, + FieldArray* fields); + + bool Delete(const WriteOptions& options, const Slice& key); + + std::vector FindKeysByField(Field &field); + + // std::string ConstructKey(const Slice& key, const Field& field); + + // std::string ExtractKey(const Slice& key); + + bool CreateIndexOnField(const std::string& field_name); + + std::vector QueryByIndex(Field &field); + + bool DeleteIndex(const std::string& field_name); + + // 用于存储已经为其创建索引的字段名称。只有当字段名在这个集合中时,才会在 indexDB 中插入对应的索引条目。 + std::unordered_set indexed_fields_read; + std::unordered_set indexed_fields_write; + + std::unordered_set putting_keys; + + private: + std::unique_ptr data_db_; + + std::unique_ptr index_db_; + + std::unique_ptr recover_db_; + + std::mutex db_mutex_; // 用于跨数据库操作的互斥锁 + + uint64_t TinyOpID; +}; + +} + +#endif \ No newline at end of file