|
|
@ -58,14 +58,25 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
std::string val_str; |
|
|
|
Status s = Status::NotFound("test"); |
|
|
|
uint64_t start_ = DB->env_->NowMicros(); |
|
|
|
s = DB->kvDB_->Get(ReadOptions(), Key.ToString(), &val_str); |
|
|
|
s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); |
|
|
|
DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; |
|
|
|
FieldArray *oldFields; |
|
|
|
// FieldArray *oldFields;
|
|
|
|
FieldSliceArray oldFields; |
|
|
|
if (s.IsNotFound()){ |
|
|
|
oldFields = nullptr; |
|
|
|
// oldFields = nullptr;
|
|
|
|
} else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引
|
|
|
|
oldFields = new FieldArray; |
|
|
|
oldFields = ParseValue(val_str,oldFields); |
|
|
|
// oldFields = new FieldArray;
|
|
|
|
// oldFields = ParseValue(val_str,oldFields);
|
|
|
|
Slice nameSlice, valSlice; |
|
|
|
Slice Value(val_str); |
|
|
|
while(GetLengthPrefixedSlice(&Value, &nameSlice)) { |
|
|
|
if(GetLengthPrefixedSlice(&Value, &valSlice)) { |
|
|
|
oldFields.push_back({nameSlice,valSlice}); |
|
|
|
} else { |
|
|
|
std::cout << "name and val not match! From FieldsReq Init" << std::endl; |
|
|
|
} |
|
|
|
nameSlice.clear(), valSlice.clear(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
assert(0); |
|
|
|
} |
|
|
@ -76,8 +87,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
// MutexLock L(&DB->index_mu); //互斥访问索引状态表
|
|
|
|
DB->index_mu.AssertHeld(); |
|
|
|
//1.将存在冲突的put pend到对应的请求
|
|
|
|
for(auto [field_name,field_value] : SliceFields) { |
|
|
|
if(field_name == EMPTY) break; |
|
|
|
for(auto &[field_name,field_value] : SliceFields) { |
|
|
|
if(field_name.data() == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; |
|
|
|
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { |
|
|
@ -90,11 +101,11 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
} |
|
|
|
} |
|
|
|
//冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中
|
|
|
|
if (oldFields != nullptr){ |
|
|
|
for(auto [field_name,field_value] : *oldFields) { |
|
|
|
if(field_name == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name)) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name]; |
|
|
|
if (!oldFields.empty()){ |
|
|
|
for(auto &[field_name,field_value] : oldFields) { |
|
|
|
if(field_name.data() == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; |
|
|
|
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { |
|
|
|
parent_req->PendReq(this->parent); |
|
|
|
return; |
|
|
@ -118,9 +129,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
|
|
|
|
//3.1对于含有索引的oldfield删除索引
|
|
|
|
if (HasOldIndex) { |
|
|
|
for(auto [field_name,field_value] : *oldFields) { |
|
|
|
if(field_name == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name)) { |
|
|
|
for(auto &[field_name,field_value] : oldFields) { |
|
|
|
if(field_name.data() == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
std::string indexKey; |
|
|
|
AppendIndexKey(&indexKey, ParsedInternalIndexKey( |
|
|
|
Key,field_name,field_value)); |
|
|
@ -131,8 +142,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
|
|
|
|
//3.2对于含有索引的field建立索引
|
|
|
|
if (HasIndex) { |
|
|
|
for(auto [field_name,field_value] : SliceFields) { |
|
|
|
if(field_name == EMPTY) continue; |
|
|
|
for(auto &[field_name,field_value] : SliceFields) { |
|
|
|
if(field_name.data() == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
std::string indexKey; |
|
|
|
AppendIndexKey(&indexKey, ParsedInternalIndexKey( |
|
|
@ -146,7 +157,7 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
//优化:对于3.1,3.2中都有的索引只写一次
|
|
|
|
} |
|
|
|
|
|
|
|
if(oldFields) delete oldFields; |
|
|
|
// if(oldFields) delete oldFields;
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -168,18 +179,29 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
std::string val_str; |
|
|
|
Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); |
|
|
|
if (s.IsNotFound()) return; |
|
|
|
FieldArray *Fields = new FieldArray; |
|
|
|
ParseValue(val_str,Fields); |
|
|
|
// FieldArray *Fields = new FieldArray;
|
|
|
|
// ParseValue(val_str,Fields);
|
|
|
|
FieldSliceArray Fields; |
|
|
|
Slice nameSlice, valSlice; |
|
|
|
Slice Value(val_str); |
|
|
|
while(GetLengthPrefixedSlice(&Value, &nameSlice)) { |
|
|
|
if(GetLengthPrefixedSlice(&Value, &valSlice)) { |
|
|
|
Fields.push_back({nameSlice,valSlice}); |
|
|
|
} else { |
|
|
|
std::cout << "name and val not match! From FieldsReq Init" << std::endl; |
|
|
|
} |
|
|
|
nameSlice.clear(), valSlice.clear(); |
|
|
|
} |
|
|
|
KVBatch.Delete(Slice(Key)); |
|
|
|
bool HasIndex = false; |
|
|
|
{ |
|
|
|
// MutexLock L(&DB->index_mu); //互斥访问索引状态表
|
|
|
|
DB->index_mu.AssertHeld(); |
|
|
|
//1.将存在冲突的delete pend到对应的请求
|
|
|
|
for(auto [field_name,field_value] : *Fields) { |
|
|
|
if(field_name == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name)) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name]; |
|
|
|
for(auto &[field_name,field_value] : Fields) { |
|
|
|
if(field_name.data() == EMPTY) break; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; |
|
|
|
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { |
|
|
|
parent_req->PendReq(this->parent); |
|
|
|
return; |
|
|
@ -197,9 +219,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value
|
|
|
|
MetaBatch.Put(MetaKey, Slice()); |
|
|
|
//3.对于含有索引的field删除索引
|
|
|
|
for(auto [field_name,field_value] : *Fields) { |
|
|
|
if(field_name == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name)) { |
|
|
|
for(auto &[field_name,field_value] : Fields) { |
|
|
|
if(field_name.data() == EMPTY) continue; |
|
|
|
if(DB->index_.count(field_name.ToString())) { |
|
|
|
std::string indexKey; |
|
|
|
AppendIndexKey(&indexKey, ParsedInternalIndexKey( |
|
|
|
Key,field_name,field_value)); |
|
|
@ -208,7 +230,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
delete Fields; |
|
|
|
// delete Fields;
|
|
|
|
} |
|
|
|
|
|
|
|
/*******iCreateReq*******/ |
|
|
@ -384,13 +406,14 @@ BatchReq::~BatchReq() { |
|
|
|
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) |
|
|
|
{ |
|
|
|
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; |
|
|
|
// WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
|
|
|
|
std::unordered_set<std::string> Sub_batchKeySet; |
|
|
|
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
|
|
|
|
uint64_t start_ = DB->env_->NowMicros(); |
|
|
|
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { |
|
|
|
uint64_t start_sub = DB->env_->NowMicros(); |
|
|
|
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); |
|
|
|
// (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
|
|
|
|
(*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet); |
|
|
|
DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub; |
|
|
|
DB->count_Batch_Sub ++; |
|
|
|
//所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说,
|
|
|
@ -399,17 +422,17 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_; |
|
|
|
if(Sub_KVBatch.ApproximateSize() > 12) { |
|
|
|
KVBatch.Append(Sub_KVBatch); |
|
|
|
} |
|
|
|
if(Sub_IndexBatch.ApproximateSize() > 12) { |
|
|
|
IndexBatch.Append(Sub_IndexBatch); |
|
|
|
} |
|
|
|
if(Sub_MetaBatch.ApproximateSize() > 12) { |
|
|
|
MetaBatch.Append(Sub_MetaBatch); |
|
|
|
} |
|
|
|
batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); |
|
|
|
// DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_;
|
|
|
|
// if(Sub_KVBatch.ApproximateSize() > 12) {
|
|
|
|
// KVBatch.Append(Sub_KVBatch);
|
|
|
|
// }
|
|
|
|
// if(Sub_IndexBatch.ApproximateSize() > 12) {
|
|
|
|
// IndexBatch.Append(Sub_IndexBatch);
|
|
|
|
// }
|
|
|
|
// if(Sub_MetaBatch.ApproximateSize() > 12) {
|
|
|
|
// MetaBatch.Append(Sub_MetaBatch);
|
|
|
|
// }
|
|
|
|
// batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
|
|
|
|
DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_; |
|
|
|
} |
|
|
|
|
|
|
|