diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 2e00ca9..c5c5ad4 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -82,15 +82,15 @@ Again: } Request *tail = GetHandleInterval(); WriteBatch KVBatch,IndexBatch,MetaBatch; + std::unordered_set batchKeySet; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { - // int debug = tail->type_; //表明这一个区间并没有涉及index的创建删除 { //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 MutexLock iL(&index_mu); for(auto *req_ptr : taskqueue_) { - req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this); + req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); if(req_ptr == tail) break; } } @@ -147,7 +147,7 @@ Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice & // return kvDB_->Put(options, key, value); } -// TODO:需要对是否进行index更新做处理 +// 需要对是否进行index更新做处理 Status FieldDB::PutFields(const WriteOptions &Options, const Slice &key, const FieldArray &fields) { //这里是为了const和slice-string的转换被迫搞得 @@ -161,14 +161,14 @@ Status FieldDB::PutFields(const WriteOptions &Options, // return kvDB_->PutFields(Options, key, fields); } -// todo: 删除有索引的key时indexdb也要同步 +// 删除有索引的key时indexdb也要同步 Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { - // - // std::string key_ = key.ToString(); - // DeleteReq req(&key_,&mutex_); - // Status status = HandleRequest(req); - // return status; - return kvDB_->Delete(options, key); + + std::string key_ = key.ToString(); + DeleteReq req(&key_,&mutex_); + Status status = HandleRequest(req); + return status; + // return kvDB_->Delete(options, key); } // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { @@ -220,7 +220,8 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this); + std::unordered_set useless; + req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(WriteOptions(), &IndexBatch); req.Finalize(this); return req.s; @@ -235,7 +236,8 @@ Status FieldDB::DeleteIndex(const std::string &field_name) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this); + std::unordered_set useless; + req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(WriteOptions(), &IndexBatch); req.Finalize(this); return req.s; diff --git a/fielddb/meta.cpp b/fielddb/meta.cpp index 970d1fb..a02f585 100644 --- a/fielddb/meta.cpp +++ b/fielddb/meta.cpp @@ -21,8 +21,8 @@ using namespace leveldb; // } -//对于含有index field的put的meta编码为 (KV|Key,Value) -void MetaKV::Trans(Slice &MetaKey,Slice &MetaValue) { +//对于含有index field的put/delete的meta编码为 (KV|Key,Value) +void MetaKV::TransPut(Slice &MetaKey,Slice &MetaValue) { MetaKey.clear(); MetaValue.clear(); std::string buf; @@ -32,6 +32,14 @@ void MetaKV::Trans(Slice &MetaKey,Slice &MetaValue) { MetaValue = Slice(*value); } +void MetaKV::TransDelete(Slice &MetaKey) { + MetaKey.clear(); + std::string buf; + PutFixed32(&buf, KV_Deleting); + PutLengthPrefixedSlice(&buf, Slice(*name)); + MetaKey = Slice(buf); +} + class CleanerHandler : public WriteBatch::Handler { public: WriteBatch *NeedClean; diff --git a/fielddb/meta.h b/fielddb/meta.h index eb3a927..3dba911 100644 --- a/fielddb/meta.h +++ b/fielddb/meta.h @@ -27,7 +27,7 @@ using namespace leveldb; // }; enum MetaType { - Index, //记录index状态的meta + //Index, //记录index状态的meta KV_Creating, //记录含有index field的put的meta KV_Deleting, }; @@ -35,9 +35,10 @@ enum MetaType { //将一对(field_name,field_value)转换到metaDB中的KV表示 class MetaKV { public: - MetaKV(std::string *field_name,std::string *field_value): + MetaKV(std::string *field_name,std::string *field_value = nullptr): name(field_name),value(field_value) { } - void Trans(Slice &MetaKey,Slice &MetaValue); + void TransPut(Slice &MetaKey,Slice &MetaValue); + void TransDelete(Slice &MetaKey); private: std::string *name; std::string *value; diff --git a/fielddb/request.cpp b/fielddb/request.cpp index 6d36caf..ce2f18e 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -19,7 +19,7 @@ void Request::PendReq(Request *req) { //为虚函数提供最基本的实现 void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { assert(0); } @@ -41,10 +41,28 @@ bool Request::isPending() { /*******FieldsReq*******/ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { + if (batchKeySet.find(Key) != batchKeySet.end()){ + return;//并发的被合并的put/delete请求只处理一次 + } else { + batchKeySet.insert(Key); + } + std::string val_str; + Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + FieldArray *oldFields; + if (s.IsNotFound()){ + oldFields = nullptr; + } else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 + oldFields = ParseValue(val_str); + } else { + assert(0); + } + + KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); bool HasIndex = false; + bool HasOldIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 DB->index_mu.AssertHeld(); @@ -62,39 +80,118 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //assert(0); } } + //冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 + if (oldFields != nullptr){ + for(auto [field_name,field_value] : *oldFields) { + if(field_name == "") break; + if(DB->index_.count(field_name)) { + auto [index_status,parent_req] = DB->index_[field_name]; + if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { + parent_req->PendReq(this); + return; + } else if(index_status == IndexStatus::Exist) { + HasOldIndex = true; + } + //assert(0); + } + } + } + //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB - if(HasIndex) { + if(HasIndex || HasOldIndex) { Slice MetaKey,MetaValue; std::string serialized = SerializeValue(*Fields); MetaKV MKV = MetaKV(Key,&serialized); - MKV.Trans(MetaKey, MetaValue); + MKV.TransPut(MetaKey, MetaValue); MetaBatch.Put(MetaKey, MetaValue); - //第三点是不是应该在这一分支中 - //3.对于含有索引的field建立索引 - for(auto [field_name,field_value] : *Fields) { - if(field_name == "") continue; - if(DB->index_.count(field_name)) { - std::string indexKey; - AppendIndexKey(&indexKey, ParsedInternalIndexKey( - *Key,field_name,field_value)); - IndexBatch.Put(indexKey, Slice()); + + + //3.1对于含有索引的oldfield删除索引 + if (HasOldIndex) { + for(auto [field_name,field_value] : *oldFields) { + if(field_name == "") continue; + if(DB->index_.count(field_name)) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + *Key,field_name,field_value)); + IndexBatch.Delete(indexKey); + } } } + + //3.2对于含有索引的field建立索引 + if (HasIndex) { + for(auto [field_name,field_value] : *Fields) { + if(field_name == "") continue; + if(DB->index_.count(field_name)) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + *Key,field_name,field_value)); + IndexBatch.Put(indexKey, Slice()); + } + } + } + } + //优化:对于3.1,3.2中都有的索引只写一次 } } /*******DeleteReq*******/ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { - //TODO: + if (batchKeySet.find(Key) != batchKeySet.end()){ + return;//并发的被合并的put/delete请求只处理一次 + } else { + batchKeySet.insert(Key); + } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete //2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 //在kvDB和metaDB中写入对应的delete //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 + std::string val_str; + DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + FieldArray *Fields = ParseValue(val_str); + KVBatch.Delete(Slice(*Key)); + bool HasIndex = false; + { + // MutexLock L(&DB->index_mu); //互斥访问索引状态表 + DB->index_mu.AssertHeld(); + //1.将存在冲突的delete pend到对应的请求 + for(auto [field_name,field_value] : *Fields) { + if(field_name == "") break; + if(DB->index_.count(field_name)) { + auto [index_status,parent_req] = DB->index_[field_name]; + if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { + parent_req->PendReq(this); + return; + } else if(index_status == IndexStatus::Exist) { + HasIndex = true; + } + //assert(0); + } + } + //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB + if(HasIndex) { + Slice MetaKey; + MetaKV MKV = MetaKV(Key); + MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value + MetaBatch.Put(MetaKey, Slice()); + //3.对于含有索引的field删除索引 + for(auto [field_name,field_value] : *Fields) { + if(field_name == "") continue; + if(DB->index_.count(field_name)) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + *Key,field_name,field_value)); + IndexBatch.Delete(indexKey); + } + } + } + } } /*******iCreateReq*******/ @@ -126,7 +223,7 @@ void iCreateReq::PendReq(Request *req) { } void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 @@ -185,7 +282,7 @@ void iDeleteReq::PendReq(Request* req) { } void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { std::vector> keysAndVal = DB->FindKeysAndValByFieldName(*Field); diff --git a/fielddb/request.h b/fielddb/request.h index e68c4f1..1ebebfd 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -5,6 +5,7 @@ #include "port/port_stdcxx.h" #include "util/mutexlock.h" #include "util/serialize_value.h" +#include // #include "fielddb/field_db.h" #ifndef REQUEST_H @@ -43,7 +44,7 @@ public: //用于含有Fields的 virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB); + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); //主要用于icreate和idelete在队列中的注册当前状态 virtual void Prepare(FieldDB *DB); virtual void Finalize(FieldDB *DB); @@ -65,7 +66,7 @@ public: Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; std::string *Key; FieldArray *Fields; @@ -90,7 +91,7 @@ public: Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -107,7 +108,7 @@ public: Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -124,7 +125,7 @@ public: Key(Key),Request(DeleteReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; std::string *Key; }; diff --git a/test/basic_function_test.cc b/test/basic_function_test.cc index 28d9fc7..2918b15 100644 --- a/test/basic_function_test.cc +++ b/test/basic_function_test.cc @@ -19,6 +19,10 @@ TEST(TestLab1, Basic) { bool allowNotFound = false; GetFieldData(db, allowNotFound); findKeysByCity(db); + + DeleteFieldData(db); + GetDeleteData(db); + delete db; } @@ -40,10 +44,19 @@ TEST(TestLab2, Basic) { db->CreateIndexOnField("age"); findKeysByCityIndex(db, true); findKeysByAgeIndex(db, true); + db->DeleteIndex("address"); findKeysByCityIndex(db, false); findKeysByAgeIndex(db, true); + DeleteFieldData(db); + // GetDeleteData(db); + //helper太长不再封装函数了,这里因为数据都被delete了,但索引还在,所以能QueryByIndex但返回key数量0 + Field field = {"age", "20"}; + Status s; + std::vector resKeys = db->QueryByIndex(field, &s); + ASSERT_EQ(resKeys.size(), 0); + delete db; } diff --git a/test/helper.cc b/test/helper.cc index 9de92bc..b053871 100644 --- a/test/helper.cc +++ b/test/helper.cc @@ -15,7 +15,9 @@ std::vector cities = { //检查insert和queryByIndex的数据是否对应 std::set shanghaiKeys; std::set age20Keys; -//复杂的测试要注意这两个全局变量,目前只有InsertFieldData和InsertOneField会往里加,并且没有清理 +//复杂的测试要注意这两个全局变量, +//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData会全部清空, +//其他测试之间有必要手动clear Status OpenDB(std::string dbName, FieldDB **db) { Options options; @@ -98,6 +100,25 @@ void InsertFieldData(FieldDB *db, int seed = 0/*随机种子*/) { } } +void DeleteFieldData(FieldDB *db, int seed = 0/*随机种子*/) { + std::cout << "-------deleting-------" << std::endl; + WriteOptions writeOptions; + int key_num = data_size / value_size; + // srand线程不安全,这种可以保证多线程时随机序列也一致 + std::mt19937 rng(seed); + + shanghaiKeys.clear(); + age20Keys.clear(); + for (int i = 0; i < key_num; i++) { + int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致 + int key_ = std::abs(randThisTime) % key_num + 1; + std::string key = std::to_string(key_); + + Status s = db->Delete(WriteOptions(), key); + ASSERT_TRUE(s.ok()); + } +} + //并发时不一定能读到,加个参数控制 void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) { std::cout << "-------getting-------" << std::endl; @@ -136,6 +157,24 @@ void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) { } } +//检查对应种子有没有删除干净 +//删除期间即使其他种子也不能并发写,因为即使种子不同,随机出的key可能相同 +void GetDeleteData(FieldDB *db, int seed = 0) { + std::cout << "-------getting-------" << std::endl; + ReadOptions readOptions; + int key_num = data_size / value_size; + + std::mt19937 rng(seed); + for (int i = 0; i < 100; i++) { + int randThisTime = rng(); + int key_ = std::abs(randThisTime) % key_num + 1; + std::string key = std::to_string(key_); + FieldArray fields_ret; + Status s = db->GetFields(readOptions, key, &fields_ret); + ASSERT_TRUE(s.IsNotFound()); + } +} + void findKeysByCity(FieldDB *db) { std::cout << "-------getting field address-------" << std::endl; Field field = {"address", "Shanghai"};