#include "leveldb/write_batch.h"

#include "db/dbformat.h"
#include "leveldb/db.h"
#include "util/coding.h"
#include "leveldb/my_leveldb.h"

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

namespace leveldb {

// Deserializes a "name:value,name:value,..." string into field pairs and
// appends them to `resFieldArray` (the array is not cleared first).
//
// Segments are split on ','; each segment is split on ':'. Only the text
// between the first and the second ':' is kept as the value, so values that
// contain ':' or ',' do not round-trip through SerializeValue. Segments with
// a missing or empty name/value are reported on stderr and skipped.
void MyLevelDB::ParseValue(const std::string& value_str,
                           FieldArray& resFieldArray) {
  std::stringstream segments(value_str);
  std::string segment;
  while (std::getline(segments, segment, ',')) {
    std::string key;
    std::string value;
    std::stringstream kv(segment);
    if (!(std::getline(kv, key, ':') && std::getline(kv, value, ':'))) {
      std::cerr << "Failed to parse segment: " << segment << '\n';
      continue;
    }
    if (key.empty() || value.empty()) {
      std::cerr << "Invalid key-value pair: " << segment << '\n';
      continue;
    }
    resFieldArray.push_back(std::make_pair(std::move(key), std::move(value)));
  }
}

// Serializes `fields` into `resString` as "name:value" pairs joined by ','.
// `resString` is overwritten. No escaping is performed.
void MyLevelDB::SerializeValue(const FieldArray& fields,
                               std::string& resString) {
  resString.clear();
  bool first = true;
  for (const auto& field : fields) {
    if (!first) {
      resString += ',';
    }
    first = false;
    resString += field.first;
    resString += ':';
    resString += field.second;
  }
}

// Stores `fields` under `key` together with one index entry
// ("<field value>:<key>" -> empty value) for every field whose name is in
// index_list_.
//
// The record and all of its index entries are committed in a single
// WriteBatch, so either everything is applied or nothing is. A failed write
// therefore needs no rollback and can never remove a value that was stored
// under `key` before the call.
//
// Returns the status of the underlying write.
//
// NOTE(review): index entries are written to _fields_db, whereas QueryByIndex
// reads from index_db[i] — confirm which store is meant to hold them.
Status MyLevelDB::PutWithFields(const WriteOptions& options,
                                const std::string& key,
                                const FieldArray& fields) {
  std::string value;
  SerializeValue(fields, value);

  WriteBatch batch;
  // Slice(std::string) keeps the full length; Slice(c_str()) would stop at
  // the first embedded NUL byte.
  batch.Put(Slice(key), Slice(value));

  // index_list_ is read under mutex_; the lock is held until the batch has
  // been written.
  std::unique_lock<decltype(mutex_)> lock(mutex_);
  for (const auto& field : fields) {
    const bool indexed = std::find(index_list_.begin(), index_list_.end(),
                                   field.first) != index_list_.end();
    if (!indexed) {
      continue;
    }
    const std::string composed_key = field.second + ":" + key;
    batch.Put(Slice(composed_key), Slice());
  }

  Status s = _fields_db->Write(options, &batch);
  if (!s.ok()) {
    std::cerr << "Failed to commit batch: " << s.ToString() << '\n';
  }
  return s;
}

// Scans every entry of _fields_db and collects into `*keys` the keys whose
// parsed value contains a field equal to `field` (same name and same value).
// `*keys` is cleared first.
//
// Returns InvalidArgument if `keys` is null, otherwise the iterator's final
// status, so that a read error during the scan is not reported as success.
Status MyLevelDB::FindKeysByField(const ReadOptions& options, const Field field,
                                  std::vector<std::string>* keys) {
  if (keys == nullptr) {
    return Status::InvalidArgument("FindKeysByField", "keys must not be null");
  }
  keys->clear();

  std::unique_ptr<Iterator> it(_fields_db->NewIterator(options));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    FieldArray record;
    ParseValue(it->value().ToString(), record);
    for (const auto& candidate : record) {
      if (candidate.first == field.first &&
          candidate.second == field.second) {
        keys->push_back(it->key().ToString());
        break;
      }
    }
  }
  return it->status();
}

// Creates an index for `field_name`: opens the index database
// "<_db_name>_index_<field_name>" and writes the marker key
// "index_<field_name>" into _fields_db.
//
// The in-memory registries (index_list_ and index_db) are updated only after
// both steps succeeded, so they always stay position-aligned. On failure the
// freshly opened database handle is released and nothing is registered.
//
// Returns InvalidArgument if the field is already indexed, otherwise the
// status of the failing step or OK.
//
// NOTE(review): index_list_ is accessed here without mutex_, while
// PutWithFields reads it under mutex_ — confirm the intended locking.
Status MyLevelDB::CreateIndexOnField(const std::string& field_name) {
  if (std::find(index_list_.begin(), index_list_.end(), field_name) !=
      index_list_.end()) {
    return Status::InvalidArgument(field_name,
                                   "Index already exists for this field");
  }

  Options op = _op;
  op.index_mode = true;
  DB* field_db = nullptr;
  Status status = DB::Open(op, _db_name + "_index_" + field_name, &field_db);
  if (!status.ok()) {
    std::cerr << "Failed to open index DB: " << status.ToString() << '\n';
    return status;
  }

  const std::string marker = "index_" + field_name;
  status = _fields_db->Put(WriteOptions(), Slice(marker), Slice());
  if (!status.ok()) {
    std::cerr << "Failed to commit index: " << status.ToString() << '\n';
    delete field_db;  // Not registered anywhere yet; do not leak the handle.
    return status;
  }

  index_list_.push_back(field_name);
  index_db.push_back(field_db);
  return Status::OK();
}

// Removes the index for `field_name`: deletes the marker key
// "index_<field_name>" from _fields_db and then unregisters the index.
//
// In-memory state is modified only after the marker was deleted
// successfully, so a failed write leaves the index fully intact. The entry
// is removed from both index_list_ and index_db; QueryByIndex relies on the
// two containers being position-aligned.
//
// The index database handle is closed here, so the caller must not hold
// iterators or snapshots of that index database when calling this.
//
// Returns NotFound if the field has no index, otherwise the status of the
// marker deletion.
Status MyLevelDB::DeleteIndex(std::string& field_name) {
  const auto it =
      std::find(index_list_.begin(), index_list_.end(), field_name);
  if (it == index_list_.end()) {
    return Status::NotFound("Index not found for this field");
  }

  const std::string marker = "index_" + field_name;
  Status s = _fields_db->Delete(WriteOptions(), Slice(marker));
  if (!s.ok()) {
    std::cerr << "Failed to delete index: " << s.ToString() << '\n';
    return s;
  }

  const std::size_t pos = static_cast<std::size_t>(it - index_list_.begin());
  index_list_.erase(it);
  if (pos < index_db.size()) {
    delete index_db[pos];
    index_db.erase(index_db.begin() + static_cast<std::ptrdiff_t>(pos));
  }
  return Status::OK();
}

// Appends to `keys` the primary keys recorded in the index of field
// `field.first` for the field value `field.second`.
//
// Index keys are read as "<field value>:<primary key>"; only the text between
// the first and the second ':' is returned as the primary key. `keys` is not
// cleared.
//
// Querying a field without an index is a programming error (asserted in
// debug builds); in release builds the function returns without touching
// `keys` instead of indexing index_db out of bounds.
void MyLevelDB::QueryByIndex(const ReadOptions& options, Field& field,
                             std::vector<std::string>& keys) {
  const auto pos =
      std::find(index_list_.begin(), index_list_.end(), field.first);
  assert(pos != index_list_.end());
  if (pos == index_list_.end()) {
    return;
  }
  const std::size_t slot = static_cast<std::size_t>(pos - index_list_.begin());
  if (slot >= index_db.size()) {
    return;
  }

  std::unique_ptr<Iterator> it(index_db[slot]->NewIterator(options));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    std::string indexed_value;
    std::string primary_key;
    std::stringstream entry(it->key().ToString());
    std::getline(entry, indexed_value, ':');
    std::getline(entry, primary_key, ':');
    if (indexed_value == field.second) {
      keys.push_back(primary_key);
    }
  }
}

// Plain key/value write, forwarded to _fields_db. No index entries are
// written by this call.
Status MyLevelDB::Put(const WriteOptions& options, const Slice& key,
                      const Slice& value) {
  return _fields_db->Put(options, key, value);
}

// Plain delete, forwarded to _fields_db. No index entries are removed by
// this call.
Status MyLevelDB::Delete(const WriteOptions& options, const Slice& key) {
  return _fields_db->Delete(options, key);
}

// Batch writes are not supported by this wrapper. An explicit NotSupported
// status is returned so that builds with assertions disabled do not report
// success while silently discarding the batch.
Status MyLevelDB::Write(const WriteOptions& options, WriteBatch* updates) {
  (void)options;
  (void)updates;
  return Status::NotSupported("MyLevelDB::Write",
                              "batch writes are not supported");
}

// Point lookup, forwarded to _fields_db.
Status MyLevelDB::Get(const ReadOptions& options, const Slice& key,
                      std::string* value) {
  return _fields_db->Get(options, key, value);
}

// Returns a new iterator over _fields_db; the caller owns it and must delete
// it.
Iterator* MyLevelDB::NewIterator(const ReadOptions& options) {
  return _fields_db->NewIterator(options);
}

// Returns a snapshot of _fields_db.
const Snapshot* MyLevelDB::GetSnapshot() { return _fields_db->GetSnapshot(); }

// Releases a snapshot previously obtained from GetSnapshot().
void MyLevelDB::ReleaseSnapshot(const Snapshot* snapshot) {
  _fields_db->ReleaseSnapshot(snapshot);
}

// No properties are exposed by this wrapper; always returns false and leaves
// `*value` untouched.
bool MyLevelDB::GetProperty(const Slice& property, std::string* value) {
  (void)property;
  (void)value;
  return false;
}

// Forwards to _fields_db so that `sizes[0..n-1]` is always filled in; the
// previous empty body left the caller's array uninitialised.
void MyLevelDB::GetApproximateSizes(const Range* range, int n,
                                    uint64_t* sizes) {
  _fields_db->GetApproximateSizes(range, n, sizes);
}

// Compacts the given key range of _fields_db.
void MyLevelDB::CompactRange(const Slice* begin, const Slice* end) {
  _fields_db->CompactRange(begin, end);
}

}  // namespace leveldb