From d5e46b56b375e6f974f6b817bfa193a3fea97869 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Sun, 22 Dec 2024 15:51:27 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B9=B6=E9=80=9A=E8=BF=87?= =?UTF-8?q?=E4=BA=86write=E7=9B=B8=E5=85=B3=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fielddb/request.cpp | 18 ++++++++++---- fielddb/request.h | 2 +- test/basic_function_test.cc | 3 +++ test/helper.cc | 45 ++++++++++++++++++++++++++++++++--- test/parallel_test.cc | 57 +++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 114 insertions(+), 11 deletions(-) diff --git a/fielddb/request.cpp b/fielddb/request.cpp index f08937b..35524ee 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -286,7 +286,7 @@ void iDeleteReq::Prepare(FieldDB *DB) { done = true; } else { //如果正在创建或者删除,那么pend到对应的请求上 - parent->PendReq(this); + parent->PendReq(this->parent); } } @@ -334,9 +334,17 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): void Put(const Slice &key, const Slice &value) override { //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 str_buf->push_back(key.ToString()); - fa_buf->push_back({{"",value.ToString()}}); + FieldArray *field = new FieldArray; + field = ParseValue(value.ToString(), field); + if (field == nullptr){ //batch中的value没有field + fa_buf->push_back({{"",value.ToString()}}); + } else { + fa_buf->push_back(*field); + } + sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu)); sub_requests->back()->parent = req; + delete field; } void Delete(const Slice &key) override { str_buf->push_back(key.ToString()); @@ -370,10 +378,10 @@ BatchReq::~BatchReq() { } void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) { WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; - std::unordered_set Sub_batchKeySet; + std::unordered_set Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); @@ -386,7 +394,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, KVBatch.Append(Sub_KVBatch); IndexBatch.Append(Sub_IndexBatch); MetaBatch.Append(Sub_MetaBatch); - batchKeySet.insert(batchKeySet.begin(),batchKeySet.end()); + batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); } diff --git a/fielddb/request.h b/fielddb/request.h index de391e0..19faf49 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -138,7 +138,7 @@ public: ~BatchReq(); void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, - WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; + WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set &batchKeySet) override; WriteBatch *Batch; std::deque sub_requests; diff --git a/test/basic_function_test.cc b/test/basic_function_test.cc index 2918b15..729fb73 100644 --- a/test/basic_function_test.cc +++ b/test/basic_function_test.cc @@ -57,6 +57,9 @@ TEST(TestLab2, Basic) { std::vector resKeys = db->QueryByIndex(field, &s); ASSERT_EQ(resKeys.size(), 0); + WriteFieldData(db); + GetFieldData(db, false); + findKeysByAgeIndex(db, true); delete db; } diff --git a/test/helper.cc b/test/helper.cc index 105c05b..a0b9e79 100644 --- a/test/helper.cc +++ b/test/helper.cc @@ -19,7 +19,8 @@ std::vector cities = { ThreadSafeSet shanghaiKeys; ThreadSafeSet age20Keys; //复杂的测试要注意这两个全局变量, -//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData和InsertOneField会删除, +//目前只有InsertFieldData和InsertOneField和writeFieldData会往里加, +//DeleteFieldData和InsertOneField会删除, //其他测试之间有必要手动clear Status OpenDB(std::string dbName, FieldDB **db) { @@ -131,6 +132,41 @@ void DeleteFieldData(FieldDB *db, int seed = 0/*随机种子*/) { } } +void WriteFieldData(FieldDB *db, int seed = 0/*随机种子*/) { + std::cout << "-------writing-------" << std::endl; + WriteOptions writeOptions; + int key_num = data_size / value_size; + // srand线程不安全,这种可以保证多线程时随机序列也一致 + std::mt19937 rng(seed); + + WriteBatch wb; + for (int i = 0; i < key_num; i++) { + int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致 + //让批量写入的key>0, 单独写入的key<=0,方便测试观察 + int key_ = std::abs(randThisTime) % key_num + 1; + std::string key = std::to_string(key_); + + std::string name = "customer#" + std::to_string(key_); + std::string address = cities[randThisTime % cities.size()]; + std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE); + FieldArray fields = { + {"name", name}, + {"address", address}, + {"age", age} + }; + if (address == "Shanghai") { + shanghaiKeys.insert(key); + } + if (age == "20") { + age20Keys.insert(key); + } + wb.Put(key, SerializeValue(fields)); + + } + Status s = db->Write(writeOptions, &wb); + ASSERT_TRUE(s.ok()); +} + //并发时不一定能读到,加个参数控制 void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) { std::cout << "-------getting-------" << std::endl; @@ -232,8 +268,11 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) { } } -void checkDataInKVAndIndex(FieldDB *db) { - Field field = {"address", "Shanghai"}; +void checkDataInKVAndIndex(FieldDB *db, std::string fieldName = "address") { + Field field; + if (fieldName == "address") field = {"address", "Shanghai"}; + else if (fieldName == "age") field = {"age", "20"}; + else assert(0);//只支持这两个字段检查 Status s; std::vector resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据 std::vector resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据 diff --git a/test/parallel_test.cc b/test/parallel_test.cc index 80c28ea..6925df7 100644 --- a/test/parallel_test.cc +++ b/test/parallel_test.cc @@ -9,7 +9,7 @@ using namespace fielddb; // 测试中read/write都表示带索引的读写 //读写有索引数据的并发 -TEST(TestReadWrite, Parallel) { +TEST(TestReadPut, Parallel) { fielddb::DestroyDB("testdb2.1",Options()); FieldDB *db = new FieldDB(); @@ -52,7 +52,7 @@ TEST(TestReadWrite, Parallel) { } //创建索引与写有该索引数据的并发 -TEST(TestWriteCreatei, Parallel) { +TEST(TestPutCreatei, Parallel) { fielddb::DestroyDB("testdb2.2",Options()); FieldDB *db = new FieldDB(); @@ -242,6 +242,59 @@ TEST(TestPutDelete, Parallel) { delete db; } +//write和其他功能的并发(大杂烩 +TEST(TestWrite, Parallel) { + fielddb::DestroyDB("testdb2.6",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.6", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + db->CreateIndexOnField("address"); + InsertFieldData(db, 2); //先填点数据,让创建索引的时间久一点 + int thread_num_ = 5; + std::vector threads(thread_num_); + threads[0] = std::thread([db](){db->CreateIndexOnField("age");}); + threads[1] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; //开始创建了再并发的写 + } + InsertFieldData(db);}); + threads[2] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; + } + WriteFieldData(db, 1);}); + threads[3] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; + } + DeleteFieldData(db, 0);}); + threads[4] = std::thread([db](){ + while (db->GetIndexStatus("age") == NotExist){ + continue; + } + db->DeleteIndex("age");}); + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + //检查 + checkDataInKVAndIndex(db); + ASSERT_EQ(db->GetIndexStatus("age"), NotExist); //删除索引的请求应该被pend在创建之上 + //删掉最后一个线程,可以测试创建age索引时并发的写入能不能保持age的一致性 + //checkDataInKVAndIndex(db, "age"); + delete db; +} + int main(int argc, char** argv) { // All tests currently run with the same read-only file limits. testing::InitGoogleTest(&argc, argv);