Explorar el Código

add rollback policy for Put and get & Update QueryByIndex

main
kevinyao0901 hace 3 semanas
padre
commit
c5361fc9dc
Se han modificado 2 ficheros con 167 adiciones y 103 borrados
  1. +67
    -41
      README.md
  2. +100
    -62
      db/db_impl.cc

+ 67
- 41
README.md Ver fichero

@ -101,30 +101,57 @@ Status DBImpl::CreateIndexOnField(const std::string& fieldName) {
---
### 3. **二级索引的查询**
`DBImpl` 中实现 `QueryByIndex` 方法,通过目标字段值查找对应的原始键:
- 在索引数据库中遍历 `fieldName:field_value` 开头的条目。
- 收集结果并返回。
在查询二级索引时,我们不再使用遍历所有索引的方式,而是直接利用 `Get` 方法根据索引键查找对应的值。这种方法避免了遍历所有索引的开销,提高了查询效率。
#### 核心代码:
```cpp
// 查询通过字段名索引的所有值
std::vector<std::string> DBImpl::QueryByIndex(const std::string& fieldName) {
std::vector<std::string> results;
leveldb::ReadOptions read_options;
leveldb::Iterator* it = indexDb_->NewIterator(read_options);
for (it->Seek(fieldName); it->Valid(); it->Next()) {
std::string value = it->value().ToString();
if (!value.empty()) {
results.push_back(value);
}
// 假设我们有一个存储索引的数据库 indexDb_
leveldb::ReadOptions read_options;
// 直接通过 Get 方法查找与 fieldName 相关的索引
std::string indexKey = fieldName; // fieldName 就是索引键
std::string value;
Status s = indexDb_->Get(read_options, Slice(indexKey), &value);
// 如果成功找到对应的值,将其加入结果中
if (s.ok()) {
results.push_back(value);
} else if (s.IsNotFound()) {
// 如果没有找到,返回空结果
std::cerr << "Index key not found: " << indexKey << std::endl;
} else {
// 处理其他错误
std::cerr << "Error querying index: " << s.ToString() << std::endl;
}
delete it;
return results;
}
```
#### 关键点说明:
- **`Get` 查询**:直接通过 `Get` 方法查找 `indexDb_` 中与 `fieldName` 对应的索引键。这样我们避免了遍历整个索引数据库,直接根据给定的键获取对应的值,查询效率得到了显著提高。
- **查询逻辑**:我们使用 `fieldName` 作为索引键,执行查询操作。如果查询成功,我们将返回的值(即主数据库的键)添加到结果集中。如果查询失败(例如索引键不存在),我们会输出相应的错误信息。
- **结果处理**:如果查询成功,返回的值将作为结果添加到返回的 `vector` 中;如果查询失败,系统会输出相应的错误信息,确保在查询操作中的透明度和可维护性。
#### 优点:
- **提高查询效率**:通过直接使用 `Get` 方法,我们不再需要遍历所有索引,这大大提升了查询的效率。
- **简化代码**:由于查询逻辑简化为一次 `Get` 调用,代码更加简洁易懂,减少了不必要的复杂性。
- **一致性保障**:通过 `Get` 查询,直接从索引数据库中获取与特定字段名相关的索引值,确保了结果的准确性。
通过这种方式,我们优化了二级索引的查询方法,提高了系统在处理索引查询时的性能,并且保证了查询结果的一致性和准确性。
---
### 4. **二级索引的删除**
@ -203,67 +230,66 @@ Status DBImpl::DeleteIndex(const std::string& fieldName) {
---
### 6.**数据插入与删除原子性的实现**
为确保主数据库 (`DBImpl`) 和二级索引数据库 (`indexDb_`) 的一致性,我们在 `Put``Delete` 方法中采用了事务处理机制 (`WriteBatch`),以实现原子性操作。具体实现如下:
### 6. **数据插入与删除原子性的实现**
#### **插入数据的实现**
1. **主数据写入**
`Put` 方法中,首先将主数据写入操作 (`key` 和 `val`) 添加到事务批次 (`WriteBatch`) 中。
2. **解析字段值更新索引**
遍历 `fieldWithIndex_`(即需要建立索引的字段列表),在 `val` 中提取对应字段的值。如果字段值非空,则构建索引键值对,例如 `fieldName:fieldValue -> key`,并将该索引插入到 `indexDb_` 中。
3. **事务提交**
利用 `WriteBatch` 将主数据库写入操作和索引更新操作一并提交。通过 `this->Write` 确保事务的原子性,即所有写入操作成功或全部失败。
在数据插入操作中,我们使用 `WriteBatch` 来确保对主数据库 (`DBImpl`) 和二级索引数据库 (`indexDb_`) 的操作能够作为一个原子事务提交。首先,将数据插入主数据库,然后根据预定义的字段集合 (`fieldWithIndex_`) 生成索引信息,并将这些索引数据插入到二级索引数据库中。如果插入操作中的任何一个步骤失败(例如,主数据库或索引数据库插入失败),整个事务将回滚,确保数据库的一致性。
```cpp
WriteBatch batch; // 创建事务
batch.Put(key, val); // 写入主数据库
// 在主数据库插入数据
batch.Put(key, val);
// 遍历并生成二级索引数据
for (const auto& field : fieldWithIndex_) {
std::string fieldValue = 提取字段值(val, field);
if (!fieldValue.empty()) {
std::string indexKey
std::string indexValue
batch.Put(indexKey, indexValue); // 添加索引写入操作
std::string indexKey = field + ":" + fieldValue;
std::string indexValue = key.ToString();
batch.Put(Slice(indexKey), Slice(indexValue)); // 将索引数据加入事务
}
}
Status s = this->Write(o, &batch); // 事务提交
// 提交事务
Status s = this->Write(o, &batch);
```
通过上述方式,插入操作确保了主数据库与二级索引数据库的一致性。如果插入过程中任何一个数据库发生错误,整个事务会回滚,不会进行部分写入,从而避免了数据不一致的情况。
#### **删除数据的实现**
1. **主数据删除**
`Delete` 方法中,将主数据库的删除操作加入事务。
2. **解析字段值删除索引**
遍历 `fieldWithIndex_`,根据 `key` 提取字段值,并构建对应的索引键。例如,从 `key` 提取 `fieldValue`,构建索引键 `fieldName:fieldValue`,将其从 `indexDb_` 中删除。
3. **事务提交**
将主数据库删除操作和索引删除操作合并为一个事务,通过 `this->Write` 一并提交。
删除数据的操作也采取了类似的原子性保证。首先,删除主数据库中的数据,然后检查是否存在与该数据相关的索引字段,如果存在,则删除二级索引数据库中的对应索引。与插入操作类似,如果在删除过程中出现任何错误(主数据库或索引数据库的删除操作失败),则回滚整个事务,确保两者的操作要么同时成功,要么同时失败。
```cpp
WriteBatch batch; // 创建事务
batch.Delete(key); // 删除主数据库记录
// 在主数据库删除数据
batch.Delete(key);
// 遍历并删除二级索引数据
for (const auto& field : fieldWithIndex_) {
std::string fieldValue = 提取字段值(key, field);
std::string fieldValue = 提取字段值(key, field);
if (!fieldValue.empty()) {
std::string indexKey ;
batch.Delete(indexKey); // 添加索引删除操作
std::string indexKey = field + ":" + fieldValue;
batch.Delete(Slice(indexKey)); // 删除索引数据
}
}
Status s = this->Write(options, &batch); // 事务提交
// 提交事务
Status s = this->Write(options, &batch);
```
这样,我们确保了无论是在主数据库还是在索引数据库中的删除操作都能保持一致性。如果删除操作中的任何一步失败,整个事务会被回滚,避免不一致的状态。
#### **事务处理的优点**
- **原子性**:通过 `WriteBatch`可以将主数据的更新和索引的更新/删除操作捆绑为一个原子事务,避免因系统崩溃导致的不一致性问题
- **简化代码逻辑**:事务批次的使用使得多步骤操作整合到统一的提交过程,降低了代码的复杂性。
- **一致性保障**如果某个步骤失败,整个事务会回滚,保证数据库状态的一致性
- **原子性**:通过 `WriteBatch`我们将主数据库的更新和索引的更新/删除操作捆绑成一个原子事务,避免了因系统崩溃或其他错误导致的数据库不一致性
- **简化代码逻辑**:事务批处理的使用使得多步骤操作能够统一提交,减少了代码的复杂性并提升了可维护性。
- **一致性保障**若事务中的任何步骤失败,整个事务会回滚,确保在数据库操作中的一致性,即主数据库和索引数据库的状态始终保持一致
通过这种设计,我们实现了主数据库和二级索引的紧密联动,确保在插入和删除操作中的数据一致性。
通过这种设计,我们实现了主数据库和二级索引数据库之间的紧密联动,确保在插入和删除操作中的数据一致性,提升了系统的健壮性和可靠性。
---

+ 100
- 62
db/db_impl.cc Ver fichero

@ -1236,39 +1236,73 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
//ToDo
WriteBatch batch; // 创建事务
WriteBatch batch; // 主数据库的事务
WriteBatch indexBatch; // 二级索引数据库的事务
Status s;
// 在主数据库写入数据
batch.Put(key, val);
// 遍历fieldWithIndex_,检查是否需要更新索引
for (const auto& field : fieldWithIndex_) {
size_t field_pos = val.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1; // 跳过 "fieldName:"
size_t value_end = val.ToString().find("|", value_start); // 查找下一个分隔符
if (value_end == std::string::npos) {
value_end = val.ToString().size();
// 如果当前是 indexDb_ 的 Put 操作,只提交主数据库的事务
if (indexDb_ != nullptr) {
// 记录已插入的主数据库的键
std::vector<Slice> keysInserted;
// 在主数据库写入数据
batch.Put(key, val);
keysInserted.push_back(key);
// 遍历 fieldWithIndex_,检查是否需要更新索引
for (const auto& field : fieldWithIndex_) {
size_t field_pos = val.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1; // 跳过 "fieldName:"
size_t value_end = val.ToString().find("|", value_start); // 查找下一个分隔符
if (value_end == std::string::npos) {
value_end = val.ToString().size();
}
std::string fieldValue = val.ToString().substr(value_start, value_end - value_start);
if (!fieldValue.empty()) {
std::string indexKey = field + ":" + fieldValue;
std::string indexValue = key.ToString();
// 将索引插入操作加入二级索引数据库的事务
indexBatch.Put(Slice(indexKey), Slice(indexValue));
}
}
}
// 提交主数据库的 WriteBatch
s = this->Write(o, &batch);
if (!s.ok()) {
return s; // 如果主数据库写入失败,则直接返回错误
}
// 提交二级索引数据库的 WriteBatch
s = indexDb_->Write(o, &indexBatch);
if (!s.ok()) {
// 如果二级索引数据库写入失败,回滚主数据库的写入
// 回滚操作:删除已经写入主数据库的键
for (const auto& insertedKey : keysInserted) {
batch.Delete(insertedKey); // 撤销主数据库的插入操作
}
std::string fieldValue = val.ToString().substr(value_start, value_end - value_start);
if (!fieldValue.empty()) {
std::string indexKey = field + ":" + fieldValue;
std::string indexValue = key.ToString();
// 将索引插入操作加入事务
batch.Put(Slice(indexKey), Slice(indexValue));
// 执行回滚:将主数据库的删除操作写入
Status rollbackStatus = this->Write(o, &batch);
if (!rollbackStatus.ok()) {
return rollbackStatus; // 如果回滚操作失败,返回回滚错误
}
return s; // 返回二级索引数据库写入失败的状态
}
} else {
// 如果是 indexDb_ 调用 Put,则只提交主数据库的 WriteBatch
batch.Put(key, val);
s = this->Write(o, &batch);
if (!s.ok()) {
return s;
}
}
// 使用 `this->Write` 提交事务
s = this->Write(o, &batch);
if (!s.ok()) {
return s;
}
return Status::OK();
return Status::OK(); // 成功时返回 OK
//ToDo end
//return DB::Put(o, key, val);
}
@ -1281,31 +1315,42 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
// 在主数据库删除数据
batch.Delete(key);
// 遍历fieldWithIndex_,检查是否需要删除索引
for (const auto& field : fieldWithIndex_) {
size_t field_pos = key.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1;
size_t value_end = key.ToString().find("|", value_start);
if (value_end == std::string::npos) {
value_end = key.ToString().size();
}
std::string fieldValue = key.ToString().substr(value_start, value_end - value_start);
if (!fieldValue.empty()) {
std::string indexKey = field + ":" + fieldValue;
// 将索引删除操作加入事务
batch.Delete(Slice(indexKey));
// 如果不是在 indexDb_ 上调用 Delete,则需要处理索引删除
if (indexDb_ != nullptr) {
// 遍历fieldWithIndex_,检查是否需要删除索引
for (const auto& field : fieldWithIndex_) {
size_t field_pos = key.ToString().find(field + ":");
if (field_pos != std::string::npos) {
size_t value_start = field_pos + field.size() + 1;
size_t value_end = key.ToString().find("|", value_start);
if (value_end == std::string::npos) {
value_end = key.ToString().size();
}
std::string fieldValue = key.ToString().substr(value_start, value_end - value_start);
if (!fieldValue.empty()) {
std::string indexKey = field + ":" + fieldValue;
// 将索引删除操作加入事务
batch.Delete(Slice(indexKey));
}
}
}
}
// 使用 `this->Write` 提交事务
s = this->Write(options, &batch);
// 如果是 indexDb_ 调用,则仅提交该数据库的删除操作
if (this == indexDb_) {
s = indexDb_->Write(options, &batch); // 删除操作直接提交给 indexDb_
} else {
// 使用 `this->Write` 提交事务
s = this->Write(options, &batch); // 提交给主数据库和索引数据库的事务
}
if (!s.ok()) {
// 如果操作失败,则返回错误状态
return s;
}
return Status::OK();
//ToDo end
//return DB::Delete(options, key);
@ -1679,28 +1724,21 @@ Status DBImpl::DeleteIndex(const std::string& fieldName) {
// Query by index (retrieve all values indexed by a field)
std::vector<std::string> DBImpl::QueryByIndex(const std::string& fieldName) {
std::vector<std::string> results;
// 假设您有一个存储索引的数据库 indexDb_
// 例如:leveldb::DB* indexDb_;,并且它存储字段名称到值的映射
leveldb::ReadOptions read_options;
leveldb::Iterator* it = indexDb_->NewIterator(read_options);
for (it->Seek(fieldName); it->Valid(); it->Next()) {
std::string key = it->key().ToString();
std::string value = it->value().ToString();
// 仅将非空值添加到结果中
if (key == fieldName && !value.empty()) {
results.push_back(value);
}
std::string value;
// 使用 Get 方法从 indexDb_ 中获取与 fieldName 对应的值
leveldb::Status s = indexDb_->Get(read_options, fieldName, &value);
if (s.ok() && !value.empty()) {
// 如果获取的值非空,则将其添加到结果列表
results.push_back(value);
} else if (!s.ok()) {
// 如果查询发生错误,则处理错误
std::cerr << "Error querying index for field: " << fieldName
<< ". Status: " << s.ToString() << std::endl;
}
if (!it->status().ok()) {
// 处理查询错误
std::cerr << "Error querying index: " << it->status().ToString() << std::endl;
}
delete it;
return results;
}

Cargando…
Cancelar
Guardar