#include "fielddb/request.h" #include #include "leveldb/slice.h" #include "leveldb/status.h" #include "leveldb/write_batch.h" #include "util/mutexlock.h" #include "util/serialize_value.h" #include "fielddb/encode_index.h" #include "fielddb/field_db.h" #include "fielddb/meta.h" #include "request.h" namespace fielddb { using namespace leveldb; //为虚函数提供最基本的实现 void Request::PendReq(Request *req) { assert(0); } //为虚函数提供最基本的实现 void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { assert(0); } void Request::Prepare(FieldDB *DB) { assert(0); } void Request::Finalize(FieldDB *DB) { assert(0); } //为虚函数提供最基本的实现 bool Request::isPending() { //pending中的请求的parent会指向所等待的请求(iCreate/iDelete) return parent != this; } /*******FieldsReq*******/ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { if (batchKeySet.find(Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { batchKeySet.insert(Key); } std::string val_str; Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); FieldArray *oldFields; if (s.IsNotFound()){ oldFields = nullptr; } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 oldFields = ParseValue(val_str); } else { assert(0); } KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); bool HasIndex = false; bool HasOldIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的put pend到对应的请求 for(auto [field_name,field_value] : *Fields) { if(field_name == "") break; if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this); return; } else if(index_status == IndexStatus::Exist) { HasIndex = true; } //assert(0); } } //冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 if (oldFields != nullptr){ for(auto [field_name,field_value] : *oldFields) { if(field_name == "") break; if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this); return; } else if(index_status == IndexStatus::Exist) { HasOldIndex = true; } //assert(0); } } } //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex || HasOldIndex) { Slice MetaKey,MetaValue; std::string serialized = SerializeValue(*Fields); MetaKV MKV = MetaKV(Key,&serialized); MKV.TransPut(MetaKey, MetaValue); MetaBatch.Put(MetaKey, MetaValue); //3.1对于含有索引的oldfield删除索引 if (HasOldIndex) { for(auto [field_name,field_value] : *oldFields) { if(field_name == "") continue; if(DB->index_.count(field_name)) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( *Key,field_name,field_value)); IndexBatch.Delete(indexKey); } } } //3.2对于含有索引的field建立索引 if (HasIndex) { for(auto [field_name,field_value] : *Fields) { if(field_name == "") continue; if(DB->index_.count(field_name)) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( *Key,field_name,field_value)); IndexBatch.Put(indexKey, Slice()); } } } } //优化:对于3.1,3.2中都有的索引只写一次 } } /*******DeleteReq*******/ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { if (batchKeySet.find(Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { batchKeySet.insert(Key); } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete //2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 //在kvDB和metaDB中写入对应的delete //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 std::string val_str; DB->kvDB_->Get(ReadOptions(), *Key, &val_str); FieldArray *Fields = ParseValue(val_str); KVBatch.Delete(Slice(*Key)); bool HasIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); //1.将存在冲突的delete pend到对应的请求 for(auto [field_name,field_value] : *Fields) { if(field_name == "") break; if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this); return; } else if(index_status == IndexStatus::Exist) { HasIndex = true; } //assert(0); } } //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex) { Slice MetaKey; MetaKV MKV = MetaKV(Key); MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value MetaBatch.Put(MetaKey, Slice()); //3.对于含有索引的field删除索引 for(auto [field_name,field_value] : *Fields) { if(field_name == "") continue; if(DB->index_.count(field_name)) { std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey( *Key,field_name,field_value)); IndexBatch.Delete(indexKey); } } } } } /*******iCreateReq*******/ void iCreateReq::Prepare(FieldDB *DB) { //在index_中完成索引状态更新,在这里可以避免重复创建 DB->index_mu.AssertHeld(); if(DB->index_.count(*Field)) { auto [istatus,parent] = DB->index_[*Field]; if(istatus == IndexStatus::Exist) { //如果已经完成建立索引,则返回成功 done = true; Existed = true; s = Status::OK(); } else { //如果正在创建或删除,那么进行等待 parent->PendReq(this); } return; } //如果索引状态表中没有,则表示尚未创建,更新相应的状态 //这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend DB->index_[*Field] = {IndexStatus::Creating,this}; done = true; } void iCreateReq::PendReq(Request *req) { req->parent = this; pending_list.push_back(req); } void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 std::vector> keysAndVal = DB->FindKeysAndValByFieldName(*Field); Slice value = Slice(); for (auto &kvPair : keysAndVal){ std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second)); IndexBatch.Put(indexKey, value); } } void iCreateReq::Finalize(FieldDB *DB) { //1. 写入完成后,更新index状态表,(并将metaDB的值改为Index类型的(Field,Existing)) MutexLock iL(&DB->index_mu); DB->index_[*Field] = {IndexStatus::Exist, nullptr}; DB->index_mu.Unlock(); if (pending_list.empty()) return; //2. 将所有的pendinglist重新入队 MutexLock L(&DB->mutex_); for (auto req : pending_list){ DB->taskqueue_.push_back(req); req->parent = req; //解绑 } if (pending_list[0] == DB->taskqueue_.front()) { pending_list[0]->cond_.Signal(); } this->s = Status::OK(); } /*******iDeleteReq*******/ void iDeleteReq::Prepare(FieldDB *DB) { DB->index_mu.AssertHeld(); if(DB->index_.count(*Field) == 0) { done = true; Deleted = true; s = Status::OK(); return ; } auto [istatus,parent] = DB->index_[*Field]; if(istatus == IndexStatus::Exist) { DB->index_[*Field] = {IndexStatus::Deleting,this}; done = true; } else { //如果正在创建或者删除,那么pend到对应的请求上 parent->PendReq(this); } } void iDeleteReq::PendReq(Request* req) { req->parent = this; pending_list.push_back(req); } void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { std::vector> keysAndVal = DB->FindKeysAndValByFieldName(*Field); Slice value = Slice(); for (auto &kvPair : keysAndVal){ std::string indexKey; AppendIndexKey(&indexKey, ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second)); IndexBatch.Delete(indexKey); } } void iDeleteReq::Finalize(FieldDB *DB) { MutexLock iL(&DB->index_mu); DB->index_.erase(*Field); DB->index_mu.Unlock(); if (pending_list.empty()) return; //2. 将所有的pendinglist重新入队 MutexLock L(&DB->mutex_); for (auto req : pending_list){ DB->taskqueue_.push_back(req); req->parent = req; //解绑 } if (pending_list[0] == DB->taskqueue_.front()) { pending_list[0]->cond_.Signal(); } this->s = Status::OK(); } } // namespace fielddb