Browse Source

update Put & delete with writebatch to ensuring Atomicity

main
kevinyao0901 1 month ago
parent
commit
6a7b6eadaf
2 changed files with 164 additions and 49 deletions
  1. +103
    -0
      README.md
  2. +61
    -49
      db/db_impl.cc

+ 103
- 0
README.md View File

@ -165,6 +165,109 @@ Status DBImpl::DeleteIndex(const std::string& fieldName) {
---
### 5. **对 `Put``Delete` 方法的内容更新描述**
为了在 `Put``Delete` 操作中同步更新二级索引,我们对代码进行了以下扩展:
#### **Put 方法**
`Put` 方法中,新增逻辑检查并更新字段索引:
1. **字段值提取与检查**
- 遍历所有已创建索引的字段列表 (`fieldWithIndex_`)。
- 检查待插入数据值 (`val`) 中是否包含当前字段。
- 如果字段存在,提取该字段的值 (`fieldValue`)。
2. **构建索引键与插入索引数据库**
- 使用字段名和字段值组合构建索引键 (`field:fieldValue`)。
- 将该索引键与原始键 (`key`) 写入二级索引数据库 `indexDb_`
- 如果写入操作失败,立即返回错误状态。
此逻辑保证在 `Put` 方法中,对 `fieldWithIndex_` 中的每个字段都可以维护最新的索引关系。
#### **Delete 方法**
`Delete` 方法中,新增逻辑检查并移除相关字段索引:
1. **字段值提取与检查**
- 遍历所有已创建索引的字段列表 (`fieldWithIndex_`)。
- 检查待删除数据键 (`key`) 中是否包含当前字段。
- 如果字段存在,提取该字段的值 (`fieldValue`)。
2. **构建索引键与删除索引条目**
- 使用字段名和字段值组合构建索引键 (`field:fieldValue`)。
- 从二级索引数据库 `indexDb_` 中删除该索引键。
- 如果删除操作失败,立即返回错误状态。
此逻辑确保在 `Delete` 操作中能够正确移除已删除记录对应的二级索引条目。
---
### 6.**数据插入与删除原子性的实现**
为确保主数据库 (`DBImpl`) 和二级索引数据库 (`indexDb_`) 的一致性,我们在 `Put``Delete` 方法中采用了事务处理机制 (`WriteBatch`),以实现原子性操作。具体实现如下:
#### **插入数据的实现**
1. **主数据写入**
`Put` 方法中,首先将主数据写入操作 (`key` 和 `val`) 添加到事务批次 (`WriteBatch`) 中。
2. **解析字段值更新索引**
遍历 `fieldWithIndex_`(即需要建立索引的字段列表),在 `val` 中提取对应字段的值。如果字段值非空,则构建索引键值对,例如 `fieldName:fieldValue -> key`,并将该索引插入到 `indexDb_` 中。
3. **事务提交**
利用 `WriteBatch` 将主数据库写入操作和索引更新操作一并提交。通过 `this->Write` 确保事务的原子性,即所有写入操作成功或全部失败。
```cpp
WriteBatch batch; // 创建事务
batch.Put(key, val); // 写入主数据库
for (const auto& field : fieldWithIndex_) {
std::string fieldValue = 提取字段值(val, field);
if (!fieldValue.empty()) {
std::string indexKey
std::string indexValue
batch.Put(indexKey, indexValue); // 添加索引写入操作
}
}
Status s = this->Write(o, &batch); // 事务提交
```
#### **删除数据的实现**
1. **主数据删除**
`Delete` 方法中,将主数据库的删除操作加入事务。
2. **解析字段值删除索引**
遍历 `fieldWithIndex_`,根据 `key` 提取字段值,并构建对应的索引键。例如,从 `key` 提取 `fieldValue`,构建索引键 `fieldName:fieldValue`,将其从 `indexDb_` 中删除。
3. **事务提交**
将主数据库删除操作和索引删除操作合并为一个事务,通过 `this->Write` 一并提交。
```cpp
WriteBatch batch; // 创建事务
batch.Delete(key); // 删除主数据库记录
for (const auto& field : fieldWithIndex_) {
std::string fieldValue = 提取字段值(key, field);
if (!fieldValue.empty()) {
std::string indexKey ;
batch.Delete(indexKey); // 添加索引删除操作
}
}
Status s = this->Write(options, &batch); // 事务提交
```
#### **事务处理的优点**
- **原子性**:通过 `WriteBatch`,可以将主数据的更新和索引的更新/删除操作捆绑为一个原子事务,避免因系统崩溃导致的不一致性问题。
- **简化代码逻辑**:事务批次的使用使得多步骤操作整合到统一的提交过程,降低了代码的复杂性。
- **一致性保障**:如果某个步骤失败,整个事务会回滚,保证数据库状态的一致性。
通过这种设计,我们实现了主数据库和二级索引的紧密联动,确保了在插入和删除操作中的数据一致性。
---
### 示例流程
1. 插入原始数据:
```

+ 61
- 49
db/db_impl.cc View File

@ -1236,67 +1236,79 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
//ToDo
WriteBatch batch; // 创建事务
Status s;
// 遍历fieldWithIndex_,检查是否需要更新索引
for (const auto& field : fieldWithIndex_) {
// 提取字段值
size_t field_pos = val.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1; // 跳过 "fieldName:"
size_t value_end = val.ToString().find("|", value_start); // 查找下一个分隔符
if (value_end == std::string::npos) {
value_end = val.ToString().size();
}
std::string fieldValue = val.ToString().substr(value_start, value_end - value_start);
// 在主数据库写入数据
batch.Put(key, val);
// 遍历fieldWithIndex_,检查是否需要更新索引
for (const auto& field : fieldWithIndex_) {
size_t field_pos = val.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1; // 跳过 "fieldName:"
size_t value_end = val.ToString().find("|", value_start); // 查找下一个分隔符
if (value_end == std::string::npos) {
value_end = val.ToString().size();
}
std::string fieldValue = val.ToString().substr(value_start, value_end - value_start);
if (!fieldValue.empty()) {
// 构建索引键 (例如:fieldValue -> key)
std::string indexKey = field + ":" + fieldValue;
std::string indexValue = key.ToString();
// 在indexDb_中插入二级索引
s = indexDb_->Put(o, Slice(indexKey), Slice(indexValue));
if (!s.ok()) {
return s;
}
}
}
}
if (!fieldValue.empty()) {
std::string indexKey = field + ":" + fieldValue;
std::string indexValue = key.ToString();
// 将索引插入操作加入事务
batch.Put(Slice(indexKey), Slice(indexValue));
}
}
}
// 使用 `this->Write` 提交事务
s = this->Write(o, &batch);
if (!s.ok()) {
return s;
}
return Status::OK();
//ToDo end
return DB::Put(o, key, val);
//return DB::Put(o, key, val);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
//ToDo
WriteBatch batch; // 创建事务
Status s;
// 遍历fieldWithIndex_,检查是否需要删除索引
for (const auto& field : fieldWithIndex_) {
// 假设key是包含字段的格式,提取字段值
size_t field_pos = key.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1;
size_t value_end = key.ToString().find("|", value_start);
if (value_end == std::string::npos) {
value_end = key.ToString().size();
}
std::string fieldValue = key.ToString().substr(value_start, value_end - value_start);
// 在主数据库删除数据
batch.Delete(key);
// 遍历fieldWithIndex_,检查是否需要删除索引
for (const auto& field : fieldWithIndex_) {
size_t field_pos = key.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1;
size_t value_end = key.ToString().find("|", value_start);
if (value_end == std::string::npos) {
value_end = key.ToString().size();
}
std::string fieldValue = key.ToString().substr(value_start, value_end - value_start);
if (!fieldValue.empty()) {
// 构建索引键 (例如:fieldValue -> key)
std::string indexKey = field + ":" + fieldValue;
// 从indexDb_中删除二级索引
s = indexDb_->Delete(options, Slice(indexKey));
if (!s.ok()) {
return s;
}
}
}
}
if (!fieldValue.empty()) {
std::string indexKey = field + ":" + fieldValue;
// 将索引删除操作加入事务
batch.Delete(Slice(indexKey));
}
}
}
// 使用 `this->Write` 提交事务
s = this->Write(options, &batch);
if (!s.ok()) {
return s;
}
return Status::OK();
//ToDo end
return DB::Delete(options, key);
//return DB::Delete(options, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {

Loading…
Cancel
Save