From 462019353ea5085f55a34bed27187a5291e2242f Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sat, 21 Dec 2024 12:14:46 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=8A=A0=E4=BA=86=E7=82=B9=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=EF=BC=8C=E4=BF=AE=E4=BA=86=E7=82=B9bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fielddb/field_db.cpp | 6 ++-- fielddb/request.cpp | 32 ++++++++++--------- fielddb/request.h | 10 +++--- test/helper.cc | 35 +++++++++++++++++---- test/helper.h | 34 ++++++++++++++++++++ test/parallel_test.cc | 86 +++++++++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 175 insertions(+), 28 deletions(-) create mode 100644 test/helper.h diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index c5c5ad4..4c1cf81 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -82,7 +82,7 @@ Again: } Request *tail = GetHandleInterval(); WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set batchKeySet; + std::unordered_set batchKeySet; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { //表明这一个区间并没有涉及index的创建删除 @@ -220,7 +220,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + std::unordered_set useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(WriteOptions(), &IndexBatch); req.Finalize(this); @@ -236,7 +236,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name) { return req.s; } WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set useless; + std::unordered_set useless; req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless); indexDB_->Write(WriteOptions(), &IndexBatch); req.Finalize(this); diff --git a/fielddb/request.cpp b/fielddb/request.cpp index ce2f18e..d29d668 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -19,7 +19,8 @@ void Request::PendReq(Request *req) { //为虚函数提供最基本的实现 void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { assert(0); } @@ -41,12 +42,13 @@ bool Request::isPending() { /*******FieldsReq*******/ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { - if (batchKeySet.find(Key) != batchKeySet.end()){ + if (batchKeySet.find(*Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key); + batchKeySet.insert(*Key); } std::string val_str; Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); @@ -59,8 +61,6 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, assert(0); } - - KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); bool HasIndex = false; bool HasOldIndex = false; { @@ -96,7 +96,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, } } } - + + KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields))); //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex || HasOldIndex) { Slice MetaKey,MetaValue; @@ -140,12 +141,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, /*******DeleteReq*******/ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { - if (batchKeySet.find(Key) != batchKeySet.end()){ + if (batchKeySet.find(*Key) != batchKeySet.end()){ return;//并发的被合并的put/delete请求只处理一次 } else { - batchKeySet.insert(Key); + batchKeySet.insert(*Key); } //1. 读取当前的最新的键值对,判断是否存在含有键值对的field //2.1 如果无,则正常构造delete @@ -153,9 +155,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //在kvDB和metaDB中写入对应的delete //2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 std::string val_str; - DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + if (s.IsNotFound()) return; FieldArray *Fields = ParseValue(val_str); - KVBatch.Delete(Slice(*Key)); bool HasIndex = false; { // MutexLock L(&DB->index_mu); //互斥访问索引状态表 @@ -174,6 +176,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, //assert(0); } } + KVBatch.Delete(Slice(*Key)); //2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB if(HasIndex) { Slice MetaKey; @@ -223,7 +226,8 @@ void iCreateReq::PendReq(Request *req) { } void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB, + std::unordered_set &batchKeySet) { //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 @@ -282,7 +286,7 @@ void iDeleteReq::PendReq(Request* req) { } void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { std::vector> keysAndVal = DB->FindKeysAndValByFieldName(*Field); diff --git a/fielddb/request.h b/fielddb/request.h index 1ebebfd..e1ab1e8 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -44,7 +44,7 @@ public: //用于含有Fields的 virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet); //主要用于icreate和idelete在队列中的注册当前状态 virtual void Prepare(FieldDB *DB); virtual void Finalize(FieldDB *DB); @@ -66,7 +66,7 @@ public: Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; std::string *Key; FieldArray *Fields; @@ -91,7 +91,7 @@ public: Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -108,7 +108,7 @@ public: Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; void PendReq(Request *req) override; @@ -125,7 +125,7 @@ public: Key(Key),Request(DeleteReq_t,mu) { }; void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; std::string *Key; }; diff --git a/test/helper.cc b/test/helper.cc index b053871..105c05b 100644 --- a/test/helper.cc +++ b/test/helper.cc @@ -3,6 +3,7 @@ // #include "leveldb/db.h" #include "fielddb/field_db.h" #include +#include "helper.h" using namespace fielddb; constexpr int value_size = 2048; @@ -13,10 +14,12 @@ std::vector cities = { "Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" }; //检查insert和queryByIndex的数据是否对应 -std::set shanghaiKeys; -std::set age20Keys; +//封装了一个线程安全的全局set + +ThreadSafeSet shanghaiKeys; +ThreadSafeSet age20Keys; //复杂的测试要注意这两个全局变量, -//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData会全部清空, +//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData和InsertOneField会删除, //其他测试之间有必要手动clear Status OpenDB(std::string dbName, FieldDB **db) { @@ -51,6 +54,15 @@ void InsertOneField(FieldDB *db, std::string key = "0") { age20Keys.insert(key); } +//只删一条特定数据的测试 +void DeleteOneField(FieldDB *db, std::string key = "0") { + WriteOptions writeOptions; + Status s = db->Delete(WriteOptions(), key); + ASSERT_TRUE(s.ok()); + shanghaiKeys.erase(key); + age20Keys.erase(key); +} + //与上面对应 void GetOneField(FieldDB *db, std::string key = "0") { ReadOptions readOptions; @@ -183,7 +195,7 @@ void findKeysByCity(FieldDB *db) { //如果随机种子相同,每次打印出的两个数也应该相同 std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl; for (const std::string &key : resKeys){ - ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + ASSERT_TRUE(shanghaiKeys.haveKey(key)); } } @@ -200,7 +212,7 @@ void findKeysByCityIndex(FieldDB *db, bool haveIndex) { } std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较 for (const std::string &key : resKeys){ - ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + ASSERT_TRUE(shanghaiKeys.haveKey(key)); } } @@ -216,6 +228,17 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) { } std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl; for (const std::string &key : resKeys){ - ASSERT_NE(std::find(age20Keys.begin(), age20Keys.end(), key), age20Keys.end()); + ASSERT_TRUE(age20Keys.haveKey(key)); } +} + +void checkDataInKVAndIndex(FieldDB *db) { + Field field = {"address", "Shanghai"}; + Status s; + std::vector resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据 + std::vector resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据 + std::sort(resKeys1.begin(), resKeys1.end()); + std::sort(resKeys2.begin(), resKeys2.end()); + std::cout << resKeys1.size() << " " << resKeys2.size() << std::endl; + ASSERT_EQ(resKeys1, resKeys2); } \ No newline at end of file diff --git a/test/helper.h b/test/helper.h new file mode 100644 index 0000000..76752cb --- /dev/null +++ b/test/helper.h @@ -0,0 +1,34 @@ +#include "fielddb/field_db.h" +using namespace fielddb; +class ThreadSafeSet +{ +private: + std::set keys; + std::mutex setMutex; +public: + ThreadSafeSet(){} + + void insert(std::string key){ + std::lock_guard lock(setMutex); + keys.insert(key); + } + + void erase(std::string key){ + std::lock_guard lock(setMutex); + keys.erase(key); + } + + void clear(){ + std::lock_guard lock(setMutex); + keys.clear(); + } + + size_t size(){ + std::lock_guard lock(setMutex); + return keys.size(); + } + + bool haveKey(std::string key){ + return std::find(keys.begin(), keys.end(), key) != keys.end(); + } +}; \ No newline at end of file diff --git a/test/parallel_test.cc b/test/parallel_test.cc index 98bf457..80c28ea 100644 --- a/test/parallel_test.cc +++ b/test/parallel_test.cc @@ -18,6 +18,8 @@ TEST(TestReadWrite, Parallel) { abort(); } // ClearDB(db); + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); int thread_num_ = 5; std::vector threads(thread_num_); //二写三读 @@ -45,6 +47,7 @@ TEST(TestReadWrite, Parallel) { GetFieldData(db, allowNotFound); GetFieldData(db, allowNotFound, 1); findKeysByCity(db); + checkDataInKVAndIndex(db); delete db; } @@ -90,6 +93,7 @@ TEST(TestWriteCreatei, Parallel) { findKeysByCityIndex(db, haveIndex); //检查写入是否成功 GetOneField(db); + checkDataInKVAndIndex(db); delete db; } @@ -128,6 +132,7 @@ TEST(TestCreateiCreatei, Parallel) { bool haveIndex = true; findKeysByCityIndex(db, haveIndex); findKeysByAgeIndex(db, false); + checkDataInKVAndIndex(db); for (size_t i = 0; i < thread_num_; i++) @@ -156,6 +161,87 @@ TEST(TestCreateiCreatei, Parallel) { delete db; } + +//有索引时,大量并发put与delete相同key,确保kvdb和indexdb的一致性 +TEST(TestPutDeleteOne, Parallel) { + fielddb::DestroyDB("testdb2.4",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.4", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); + int thread_num_ = 20; + std::vector threads(thread_num_); + for (size_t i = 0; i < thread_num_; i++) + { + if (i % 2 == 0) {//2线程删除索引address + threads[i] = std::thread([db](){ + for (size_t j = 0; j < 100; j++) + { + InsertOneField(db, std::to_string(j)); + } + }); + } else {//1线程创建索引age + threads[i] = std::thread([db](){ + for (size_t j = 0; j < 100; j++) + { + DeleteOneField(db, std::to_string(j)); + } + }); + } + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + //检查 + checkDataInKVAndIndex(db); + delete db; +} + +//有索引时,put与delete的并发,确保kvdb和indexdb的一致性 +TEST(TestPutDelete, Parallel) { + fielddb::DestroyDB("testdb2.5",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.5", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); + int thread_num_ = 4; + std::vector threads(thread_num_); + threads[0] = std::thread([db](){InsertFieldData(db);}); + threads[1] = std::thread([db](){InsertFieldData(db, 1);}); + threads[2] = std::thread([db](){DeleteFieldData(db);}); + threads[3] = std::thread([db](){DeleteFieldData(db, 1);}); + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + //检查 + checkDataInKVAndIndex(db); + delete db; +} + int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. testing::InitGoogleTest(&argc, argv); From d5e46b56b375e6f974f6b817bfa193a3fea97869 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sun, 22 Dec 2024 15:51:27 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B9=B6=E9=80=9A?= =?UTF-8?q?=E8=BF=87=E4=BA=86write=E7=9B=B8=E5=85=B3=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fielddb/request.cpp | 18 ++++++++++---- fielddb/request.h | 2 +- test/basic_function_test.cc | 3 +++ test/helper.cc | 45 ++++++++++++++++++++++++++++++++--- test/parallel_test.cc | 57 +++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 114 insertions(+), 11 deletions(-) diff --git a/fielddb/request.cpp b/fielddb/request.cpp index f08937b..35524ee 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -286,7 +286,7 @@ void iDeleteReq::Prepare(FieldDB *DB) { done = true; } else { //如果正在创建或者删除,那么pend到对应的请求上 - parent->PendReq(this); + parent->PendReq(this->parent); } } @@ -334,9 +334,17 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): void Put(const Slice &key, const Slice &value) override { //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 str_buf->push_back(key.ToString()); - fa_buf->push_back({{"",value.ToString()}}); + FieldArray *field = new FieldArray; + field = ParseValue(value.ToString(), field); + if (field == nullptr){ //batch中的value没有field + fa_buf->push_back({{"",value.ToString()}}); + } else { + fa_buf->push_back(*field); + } + sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu)); sub_requests->back()->parent = req; + delete field; } void Delete(const Slice &key) override { str_buf->push_back(key.ToString()); @@ -370,10 +378,10 @@ BatchReq::~BatchReq() { } void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; - std::unordered_set Sub_batchKeySet; + std::unordered_set Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); @@ -386,7 +394,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, KVBatch.Append(Sub_KVBatch); IndexBatch.Append(Sub_IndexBatch); MetaBatch.Append(Sub_MetaBatch); - batchKeySet.insert(batchKeySet.begin(),batchKeySet.end()); + batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); } diff --git a/fielddb/request.h b/fielddb/request.h index de391e0..19faf49 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -138,7 +138,7 @@ public: ~BatchReq(); void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; WriteBatch *Batch; std::deque sub_requests; diff --git a/test/basic_function_test.cc b/test/basic_function_test.cc index 2918b15..729fb73 100644 --- a/test/basic_function_test.cc +++ b/test/basic_function_test.cc @@ -57,6 +57,9 @@ TEST(TestLab2, Basic) { std::vector resKeys = db->QueryByIndex(field, &s); ASSERT_EQ(resKeys.size(), 0); + WriteFieldData(db); + GetFieldData(db, false); + findKeysByAgeIndex(db, true); delete db; } diff --git a/test/helper.cc b/test/helper.cc index 105c05b..a0b9e79 100644 --- a/test/helper.cc +++ b/test/helper.cc @@ -19,7 +19,8 @@ std::vector cities = { ThreadSafeSet shanghaiKeys; ThreadSafeSet age20Keys; //复杂的测试要注意这两个全局变量, -//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData和InsertOneField会删除, +//目前只有InsertFieldData和InsertOneField和writeFieldData会往里加, +//DeleteFieldData和InsertOneField会删除, //其他测试之间有必要手动clear Status OpenDB(std::string dbName, FieldDB **db) { @@ -131,6 +132,41 @@ void DeleteFieldData(FieldDB *db, int seed = 0/*随机种子*/) { } } +void WriteFieldData(FieldDB *db, int seed = 0/*随机种子*/) { + std::cout << "-------writing-------" << std::endl; + WriteOptions writeOptions; + int key_num = data_size / value_size; + // srand线程不安全,这种可以保证多线程时随机序列也一致 + std::mt19937 rng(seed); + + WriteBatch wb; + for (int i = 0; i < key_num; i++) { + int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致 + //让批量写入的key>0, 单独写入的key<=0,方便测试观察 + int key_ = std::abs(randThisTime) % key_num + 1; + std::string key = std::to_string(key_); + + std::string name = "customer#" + std::to_string(key_); + std::string address = cities[randThisTime % cities.size()]; + std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE); + FieldArray fields = { + {"name", name}, + {"address", address}, + {"age", age} + }; + if (address == "Shanghai") { + shanghaiKeys.insert(key); + } + if (age == "20") { + age20Keys.insert(key); + } + wb.Put(key, SerializeValue(fields)); + + } + Status s = db->Write(writeOptions, &wb); + ASSERT_TRUE(s.ok()); +} + //并发时不一定能读到,加个参数控制 void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) { std::cout << "-------getting-------" << std::endl; @@ -232,8 +268,11 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) { } } -void checkDataInKVAndIndex(FieldDB *db) { - Field field = {"address", "Shanghai"}; +void checkDataInKVAndIndex(FieldDB *db, std::string fieldName = "address") { + Field field; + if (fieldName == "address") field = {"address", "Shanghai"}; + else if (fieldName == "age") field = {"age", "20"}; + else assert(0);//只支持这两个字段检查 Status s; std::vector resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据 std::vector resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据 diff --git a/test/parallel_test.cc b/test/parallel_test.cc index 80c28ea..6925df7 100644 --- a/test/parallel_test.cc +++ b/test/parallel_test.cc @@ -9,7 +9,7 @@ using namespace fielddb; // 测试中read/write都表示带索引的读写 //读写有索引数据的并发 -TEST(TestReadWrite, Parallel) { +TEST(TestReadPut, Parallel) { fielddb::DestroyDB("testdb2.1",Options()); FieldDB *db = new FieldDB(); @@ -52,7 +52,7 @@ TEST(TestReadWrite, Parallel) { } //创建索引与写有该索引数据的并发 -TEST(TestWriteCreatei, Parallel) { +TEST(TestPutCreatei, Parallel) { fielddb::DestroyDB("testdb2.2",Options()); FieldDB *db = new FieldDB(); @@ -242,6 +242,59 @@ TEST(TestPutDelete, Parallel) { delete db; } +//write和其他功能的并发(大杂烩 +TEST(TestWrite, Parallel) { + fielddb::DestroyDB("testdb2.6",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.6", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + db->CreateIndexOnField("address"); + InsertFieldData(db, 2); //先填点数据,让创建索引的时间久一点 + int thread_num_ = 5; + std::vector threads(thread_num_); + threads[0] = std::thread([db](){db->CreateIndexOnField("age");}); + threads[1] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; //开始创建了再并发的写 + } + InsertFieldData(db);}); + threads[2] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; + } + WriteFieldData(db, 1);}); + threads[3] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; + } + DeleteFieldData(db, 0);}); + threads[4] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; + } + db->DeleteIndex("age");}); + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + //检查 + checkDataInKVAndIndex(db); + ASSERT_EQ(db->GetIndexStatus("age"), NotExist); //删除索引的请求应该被pend在创建之上 + //删掉最后一个线程,可以测试创建age索引时并发的写入能不能保持age的一致性 + //checkDataInKVAndIndex(db, "age"); + delete db; +} + int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. testing::InitGoogleTest(&argc, argv); From 6c0b64cfa391ac5b17d2ea3ecd4d6a94c8947d02 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sun, 22 Dec 2024 20:13:50 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=AD=A3=E5=B8=B8=E9=80=80=E5=87=BA?= =?UTF-8?q?=E7=9A=84=E6=81=A2=E5=A4=8D=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 5 +++ fielddb/field_db.cpp | 13 +++++--- fielddb/field_db.h | 1 + test/recover_test.cc | 86 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 4 deletions(-) create mode 100644 test/recover_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index fa7468f..aadc1cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -535,3 +535,8 @@ add_executable(parallel_test "${PROJECT_SOURCE_DIR}/test/parallel_test.cc" ) target_link_libraries(parallel_test PRIVATE leveldb gtest) + +add_executable(recover_test + "${PROJECT_SOURCE_DIR}/test/recover_test.cc" +) +target_link_libraries(recover_test PRIVATE leveldb gtest) diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index c7e094e..a83960f 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -60,11 +60,11 @@ Status FieldDB::Recover() { std::string IndexKey; Iter->SeekToFirst(); while(Iter->Valid()) { - IndexKey = Iter->value().ToString(); + IndexKey = Iter->key().ToString(); ParsedInternalIndexKey ParsedIndex; ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex); index_[ParsedIndex.name_.ToString()] = {Exist,nullptr}; - std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl; + //std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl; //构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符) //TODO:不知道这个做法有没有道理 @@ -183,8 +183,7 @@ Again: // return status; } - -//这里把一个空串作为常规put的name +// 这里把一个空串作为常规put的name Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { FieldArray FA = {{"",value.ToString()}}; return PutFields(options, key, FA); @@ -364,4 +363,10 @@ Status DestroyDB(const std::string& name, const Options& options) { return s; } +FieldDB::~FieldDB() { + delete indexDB_; + delete kvDB_; + delete metaDB_; +} + } // namespace fielddb diff --git a/fielddb/field_db.h b/fielddb/field_db.h index 684a820..f0fe5f2 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -33,6 +33,7 @@ public: //用的时候必须FieldDB *db = new FieldDB()再open,不能像之前一样DB *db FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; + ~FieldDB(); /*lab1的要求,作为db派生类要实现的虚函数*/ Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; diff --git a/test/recover_test.cc b/test/recover_test.cc new file mode 100644 index 0000000..4b05c1d --- /dev/null +++ b/test/recover_test.cc @@ -0,0 +1,86 @@ +#include "gtest/gtest.h" +// #include "leveldb/env.h" +// #include "leveldb/db.h" +#include "fielddb/field_db.h" +#include "test/helper.cc" +#include +#include +#include +using namespace fielddb; + +// std::atomic thread_has_error(false); + +// void signalHandler(int signum) { +// // 捕捉段错误 +// } + +TEST(TestNormalRecover, Recover) { + fielddb::DestroyDB("testdb3.1",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb3.1", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); + InsertFieldData(db); + bool allowNotFound = false; + GetFieldData(db, allowNotFound); + findKeysByCityIndex(db, true); + findKeysByAgeIndex(db, true); + + delete db; + db = new FieldDB(); + if(OpenDB("testdb3.1", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + //仍然能读到之前写入的数据和索引 + GetFieldData(db, allowNotFound); + findKeysByCityIndex(db, true); + findKeysByAgeIndex(db, true); +} + +// TEST(TestParalPutRecover, Recover) { +// signal(SIGSEGV, signalHandler); +// fielddb::DestroyDB("testdb3.2",Options()); +// FieldDB *db = new FieldDB(); + +// if(OpenDB("testdb3.2", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// db->CreateIndexOnField("address"); +// db->CreateIndexOnField("age"); +// shanghaiKeys.clear(); +// age20Keys.clear(); +// int thread_num_ = 2; +// std::vector threads(thread_num_); +// threads[0] = std::thread([db](){ +// InsertFieldData(db); +// }); +// threads[1] = std::thread([db](){ +// InsertOneField(db); +// delete db; +// }); + + +// if (threads[1].joinable()) { +// threads[1].join(); +// } + +// db = new FieldDB(); +// if(OpenDB("testdb3.2", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// GetOneField(db); +// checkDataInKVAndIndex(db); +// } + +int main(int argc, char** argv) { + // All tests currently run with the same read-only file limits. + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file From 23b603dda024e20c6e7238ef5f132be2e3ad59d8 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Mon, 23 Dec 2024 13:54:01 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=A8=A1=E6=8B=9F=E6=8F=92=E5=85=A5?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=97=B6=E5=B4=A9=E6=BA=83=E7=9A=84=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fielddb/field_db.cpp | 7 +++-- test/recover_test.cc | 79 ++++++++++++++++++++++++++-------------------------- 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index a83960f..9bd93f1 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -82,15 +82,18 @@ Status FieldDB::Recover() { MetaValue = Iter->key(); MetaType type = MetaType(DecodeFixed32(MetaValue.data())); MetaValue.remove_prefix(4);//移除头上的metaType的部分 + Slice extractKey; + GetLengthPrefixedSlice(&MetaValue, &extractKey); if(type == KV_Creating) { FieldArray fields; ParseValue(Iter->value().ToString(), &fields); - PutFields(WriteOptions(), MetaValue, fields); + PutFields(WriteOptions(), extractKey, fields); } else if(type == KV_Deleting) { - Delete(WriteOptions(), MetaValue); + Delete(WriteOptions(), extractKey); } else { assert(0 && "Invalid MetaType"); } + Iter->Next(); } delete Iter; //在所有的请求完成后,会自动把metaDB的内容清空。 diff --git a/test/recover_test.cc b/test/recover_test.cc index 4b05c1d..47cc731 100644 --- a/test/recover_test.cc +++ b/test/recover_test.cc @@ -8,12 +8,6 @@ #include using namespace fielddb; -// std::atomic thread_has_error(false); - -// void signalHandler(int signum) { -// // 捕捉段错误 -// } - TEST(TestNormalRecover, Recover) { fielddb::DestroyDB("testdb3.1",Options()); FieldDB *db = new FieldDB(); @@ -42,42 +36,47 @@ TEST(TestNormalRecover, Recover) { findKeysByAgeIndex(db, true); } -// TEST(TestParalPutRecover, Recover) { -// signal(SIGSEGV, signalHandler); -// fielddb::DestroyDB("testdb3.2",Options()); -// FieldDB *db = new FieldDB(); +TEST(TestParalPutRecover, Recover) { + //第一次运行 + // fielddb::DestroyDB("testdb3.2",Options()); + // FieldDB *db = new FieldDB(); -// if(OpenDB("testdb3.2", &db).ok() == false) { -// std::cerr << "open db failed" << std::endl; -// abort(); -// } -// db->CreateIndexOnField("address"); -// db->CreateIndexOnField("age"); -// shanghaiKeys.clear(); -// age20Keys.clear(); -// int thread_num_ = 2; -// std::vector threads(thread_num_); -// threads[0] = std::thread([db](){ -// InsertFieldData(db); -// }); -// threads[1] = std::thread([db](){ -// InsertOneField(db); -// delete db; -// }); + // if(OpenDB("testdb3.2", &db).ok() == false) { + // std::cerr << "open db failed" << std::endl; + // abort(); + // } + // db->CreateIndexOnField("address"); + // db->CreateIndexOnField("age"); + // shanghaiKeys.clear(); + // age20Keys.clear(); + // int thread_num_ = 2; + // std::vector threads(thread_num_); + // threads[0] = std::thread([db](){ + // InsertFieldData(db); + // }); + // threads[1] = std::thread([db](){ + // InsertOneField(db); + // delete db; + // }); + // for (auto& t : threads) { + // if (t.joinable()) { + // t.join(); + // } + // } + //线程1导致了线程0错误,测试会终止(模拟数据库崩溃) + //这会导致线程0在写入的各种奇怪的时间点崩溃 + //第二次运行注释掉上面的代码,运行下面的代码测试恢复 - -// if (threads[1].joinable()) { -// threads[1].join(); -// } - -// db = new FieldDB(); -// if(OpenDB("testdb3.2", &db).ok() == false) { -// std::cerr << "open db failed" << std::endl; -// abort(); -// } -// GetOneField(db); -// checkDataInKVAndIndex(db); -// } + + //第二次运行 + FieldDB *db = new FieldDB(); + if(OpenDB("testdb3.2", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + GetOneField(db); + checkDataInKVAndIndex(db); +} int main(int argc, char** argv) { // All tests currently run with the same read-only file limits.