From 462019353ea5085f55a34bed27187a5291e2242f Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 21 Dec 2024 12:14:46 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E4=BA=86=E7=82=B9=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E4=BA=86=E7=82=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fielddb/field_db.cpp | 6 ++-- fielddb/request.cpp | 32 ++++++++++--------- fielddb/request.h | 10 +++--- test/helper.cc | 35 +++++++++++++++++---- test/helper.h | 34 ++++++++++++++++++++ test/parallel_test.cc | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 175 insertions(+), 28 deletions(-) create mode 100644 test/helper.h diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index c5c5ad4..4c1cf81 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -82,7 +82,7 @@ Again: } Request *tail = GetHandleInterval(); WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set batchKeySet; + std::unordered_set batchKeySet; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { //表明这一个区间并没有涉及index的创建删除 @@ -220,7 +220,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + std::unordered_set useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(WriteOptions(), &IndexBatch); req.Finalize(this); @@ -236,7 +236,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + std::unordered_set useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(WriteOptions(), &IndexBatch); req.Finalize(this); diff --git a/fielddb/request.cpp b/fielddb/request.cpp index ce2f18e..d29d668 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -19,7 +19,8 @@ void Request::PendReq(Request *req) { //为虚函数提供最基本的实现 void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { assert(0); } @@ -41,12 +42,13 @@ bool Request::isPending() { /*******FieldsReq*******/ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { - if (batchKeySet.find(Key) != batchKeySet.end()){ + if (batchKeySet.find(*Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key); + batchKeySet.insert(*Key); } std::string val_str; Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); @@ -59,8 +61,6 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, assert(0); } - - KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); bool HasIndex = false; bool HasOldIndex = false; { @@ -96,7 +96,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } } - + + KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex || HasOldIndex) { Slice MetaKey,MetaValue; @@ -140,12 +141,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, /*******DeleteReq*******/ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { - if (batchKeySet.find(Key) != batchKeySet.end()){ + if (batchKeySet.find(*Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key); + batchKeySet.insert(*Key); } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete @@ -153,9 +155,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //在kvDB和metaDB中写入对应的delete //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 std::string val_str; - DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + if (s.IsNotFound()) return; FieldArray *Fields = ParseValue(val_str); - KVBatch.Delete(Slice(*Key)); bool HasIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 @@ -174,6 +176,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //assert(0); } } + KVBatch.Delete(Slice(*Key)); //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex) { Slice MetaKey; @@ -223,7 +226,8 @@ void iCreateReq::PendReq(Request *req) { } void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 @@ -282,7 +286,7 @@ void iDeleteReq::PendReq(Request* req) { } void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { std::vector> keysAndVal = DB->FindKeysAndValByFieldName(*Field); diff --git a/fielddb/request.h b/fielddb/request.h index 1ebebfd..e1ab1e8 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -44,7 +44,7 @@ public: //用于含有Fields的 virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); //主要用于icreate和idelete在队列中的注册当前状态 virtual void Prepare(FieldDB *DB); virtual void Finalize(FieldDB *DB); @@ -66,7 +66,7 @@ public: Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; std::string *Key; FieldArray *Fields; @@ -91,7 +91,7 @@ public: Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -108,7 +108,7 @@ public: Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -125,7 +125,7 @@ public: Key(Key),Request(DeleteReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; std::string *Key; }; diff --git a/test/helper.cc b/test/helper.cc index b053871..105c05b 100644 --- a/test/helper.cc +++ b/test/helper.cc @@ -3,6 +3,7 @@ // #include "leveldb/db.h" #include "fielddb/field_db.h" #include +#include "helper.h" using namespace fielddb; constexpr int value_size = 2048; @@ -13,10 +14,12 @@ std::vector cities = { "Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" }; //检查insert和queryByIndex的数据是否对应 -std::set shanghaiKeys; -std::set age20Keys; +//封装了一个线程安全的全局set + +ThreadSafeSet shanghaiKeys; +ThreadSafeSet age20Keys; //复杂的测试要注意这两个全局变量, -//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData会全部清空, +//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData和InsertOneField会删除, //其他测试之间有必要手动clear Status OpenDB(std::string dbName, FieldDB **db) { @@ -51,6 +54,15 @@ void InsertOneField(FieldDB *db, std::string key = "0") { age20Keys.insert(key); } +//只删一条特定数据的测试 +void DeleteOneField(FieldDB *db, std::string key = "0") { + WriteOptions writeOptions; + Status s = db->Delete(WriteOptions(), key); + ASSERT_TRUE(s.ok()); + shanghaiKeys.erase(key); + age20Keys.erase(key); +} + //与上面对应 void GetOneField(FieldDB *db, std::string key = "0") { ReadOptions readOptions; @@ -183,7 +195,7 @@ void findKeysByCity(FieldDB *db) { //如果随机种子相同,每次打印出的两个数也应该相同 std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl; for (const std::string &key : resKeys){ - ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + ASSERT_TRUE(shanghaiKeys.haveKey(key)); } } @@ -200,7 +212,7 @@ void findKeysByCityIndex(FieldDB *db, bool haveIndex) { } std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较 for (const std::string &key : resKeys){ - ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + ASSERT_TRUE(shanghaiKeys.haveKey(key)); } } @@ -216,6 +228,17 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) { } std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl; for (const std::string &key : resKeys){ - ASSERT_NE(std::find(age20Keys.begin(), age20Keys.end(), key), age20Keys.end()); + ASSERT_TRUE(age20Keys.haveKey(key)); } +} + +void checkDataInKVAndIndex(FieldDB *db) { + Field field = {"address", "Shanghai"}; + Status s; + std::vector resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据 + std::vector resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据 + std::sort(resKeys1.begin(), resKeys1.end()); + std::sort(resKeys2.begin(), resKeys2.end()); + std::cout << resKeys1.size() << " " << resKeys2.size() << std::endl; + ASSERT_EQ(resKeys1, resKeys2); } \ No newline at end of file diff --git a/test/helper.h b/test/helper.h new file mode 100644 index 0000000..76752cb --- /dev/null +++ b/test/helper.h @@ -0,0 +1,34 @@ +#include "fielddb/field_db.h" +using namespace fielddb; +class ThreadSafeSet +{ +private: + std::set keys; + std::mutex setMutex; +public: + ThreadSafeSet(){} + + void insert(std::string key){ + std::lock_guard lock(setMutex); + keys.insert(key); + } + + void erase(std::string key){ + std::lock_guard lock(setMutex); + keys.erase(key); + } + + void clear(){ + std::lock_guard lock(setMutex); + keys.clear(); + } + + size_t size(){ + std::lock_guard lock(setMutex); + return keys.size(); + } + + bool haveKey(std::string key){ + return std::find(keys.begin(), keys.end(), key) != keys.end(); + } +}; \ No newline at end of file diff --git a/test/parallel_test.cc b/test/parallel_test.cc index 98bf457..80c28ea 100644 --- a/test/parallel_test.cc +++ b/test/parallel_test.cc @@ -18,6 +18,8 @@ TEST(TestReadWrite, Parallel) { abort(); } // ClearDB(db); + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); int thread_num_ = 5; std::vector threads(thread_num_); //二写三读 @@ -45,6 +47,7 @@ TEST(TestReadWrite, Parallel) { GetFieldData(db, allowNotFound); GetFieldData(db, allowNotFound, 1); findKeysByCity(db); + checkDataInKVAndIndex(db); delete db; } @@ -90,6 +93,7 @@ TEST(TestWriteCreatei, Parallel) { findKeysByCityIndex(db, haveIndex); //检查写入是否成功 GetOneField(db); + checkDataInKVAndIndex(db); delete db; } @@ -128,6 +132,7 @@ TEST(TestCreateiCreatei, Parallel) { bool haveIndex = true; findKeysByCityIndex(db, haveIndex); findKeysByAgeIndex(db, false); + checkDataInKVAndIndex(db); for (size_t i = 0; i < thread_num_; i++) @@ -156,6 +161,87 @@ TEST(TestCreateiCreatei, Parallel) { delete db; } + +//有索引时,大量并发put与delete相同key,确保kvdb和indexdb的一致性 +TEST(TestPutDeleteOne, Parallel) { + fielddb::DestroyDB("testdb2.4",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.4", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); + int thread_num_ = 20; + std::vector threads(thread_num_); + for (size_t i = 0; i < thread_num_; i++) + { + if (i % 2 == 0) {//2线程删除索引address + threads[i] = std::thread([db](){ + for (size_t j = 0; j < 100; j++) + { + InsertOneField(db, std::to_string(j)); + } + }); + } else {//1线程创建索引age + threads[i] = std::thread([db](){ + for (size_t j = 0; j < 100; j++) + { + DeleteOneField(db, std::to_string(j)); + } + }); + } + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + //检查 + checkDataInKVAndIndex(db); + delete db; +} + +//有索引时,put与delete的并发,确保kvdb和indexdb的一致性 +TEST(TestPutDelete, Parallel) { + fielddb::DestroyDB("testdb2.5",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.5", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); + int thread_num_ = 4; + std::vector threads(thread_num_); + threads[0] = std::thread([db](){InsertFieldData(db);}); + threads[1] = std::thread([db](){InsertFieldData(db, 1);}); + threads[2] = std::thread([db](){DeleteFieldData(db);}); + threads[3] = std::thread([db](){DeleteFieldData(db, 1);}); + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + //检查 + checkDataInKVAndIndex(db); + delete db; +} + int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. testing::InitGoogleTest(&argc, argv);