#include "gtest/gtest.h"

#include <atomic>
#include <cstdlib>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"

using namespace fielddb;

// In these tests, "read"/"write" always refers to reads/writes of data that
// carries indexed fields.

namespace {

// Destroys any database left over under `name`, opens a fresh one and returns
// it. Aborts the process if the database cannot be opened. The caller owns the
// returned pointer and must `delete` it.
FieldDB* OpenFreshDB(const char* name) {
  fielddb::DestroyDB(name, Options());
  FieldDB* db = new FieldDB();
  if (!OpenDB(name, &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  return db;
}

// Joins every still-joinable thread in `threads`.
void JoinAll(std::vector<std::thread>& threads) {
  for (auto& t : threads) {
    if (t.joinable()) {
      t.join();
    }
  }
}

// Blocks until the index on `field` is no longer reported as NotExist, i.e.
// its creation has visibly started, so the caller's work overlaps with it.
//
// `creator_finished` bounds the wait: once the creating thread has returned
// from CreateIndexOnField the status can no longer leave NotExist because of
// that call (for example when the index was created and deleted again before
// this thread looked), so waiting any longer would spin forever.
void WaitForIndexCreationStart(FieldDB* db, const char* field,
                               const std::atomic<bool>& creator_finished) {
  while (db->GetIndexStatus(field) == NotExist && !creator_finished.load()) {
    std::this_thread::yield();
  }
}

}  // namespace

// Concurrent reads and writes of indexed data.
TEST(TestReadPut, Parallel) {
  FieldDB* db = OpenFreshDB("testdb2.1");

  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);

  // Two writers, three readers.
  std::vector<std::thread> threads(5);
  for (size_t i = 0; i < threads.size(); ++i) {
    if (i == 0) {
      // Writer using random sequence 0.
      threads[i] = std::thread(InsertFieldData, db, 0);
    } else if (i == 1) {
      // Writer using random sequence 1.
      threads[i] = std::thread(InsertFieldData, db, 1);
    } else {
      // Reader; the writers may not have inserted the key yet.
      bool allowNotFound = true;
      threads[i] = std::thread(GetFieldData, db, allowNotFound, 0);
    }
  }
  JoinAll(threads);

  // All writes have completed, so both write sequences must be readable.
  bool allowNotFound = false;
  GetFieldData(db, allowNotFound);
  GetFieldData(db, allowNotFound, 1);
  findKeysByCity(db);
  checkDataInKVAndIndex(db);
  // db->CompactRange(nullptr, nullptr);

  delete db;
}

// Index creation concurrent with a write of data containing the indexed field.
TEST(TestPutCreatei, Parallel) {
  FieldDB* db = OpenFreshDB("testdb2.2");

  shanghaiKeys.clear();
  InsertFieldData(db);

  std::atomic<bool> creator_finished{false};
  std::vector<std::thread> threads(2);
  // Thread 0: create the index.
  threads[0] = std::thread([db, &creator_finished]() {
    db->CreateIndexOnField("address", op);
    std::cout << "finish create index\n";
    creator_finished.store(true);
  });
  // Thread 1: write one record, but only once index creation has begun so the
  // write actually overlaps with it.
  threads[1] = std::thread([db, &creator_finished]() {
    WaitForIndexCreationStart(db, "address", creator_finished);
    InsertOneField(db);
  });
  JoinAll(threads);

  // Check that the index was created.
  bool haveIndex = true;
  findKeysByCityIndex(db, haveIndex);
  // Check that the concurrent write succeeded.
  GetOneField(db);
  checkDataInKVAndIndex(db);

  delete db;
}

// Concurrent creation and deletion of different indexes.
TEST(TestCreateiCreatei, Parallel) {
  FieldDB* db = OpenFreshDB("testdb2.3");

  shanghaiKeys.clear();
  age20Keys.clear();
  InsertFieldData(db);

  std::vector<std::thread> threads(3);
  // Three threads concurrently create the same index "address".
  for (size_t i = 0; i < threads.size(); ++i) {
    threads[i] = std::thread([db]() {
      db->CreateIndexOnField("address", op);
      std::cout << "finish create index address\n";
    });
  }
  JoinAll(threads);

  // Check that "address" is indexed and "age" is not.
  bool haveIndex = true;
  findKeysByCityIndex(db, haveIndex);
  findKeysByAgeIndex(db, false);
  checkDataInKVAndIndex(db);

  for (size_t i = 0; i < threads.size(); ++i) {
    if (i == 0 || i == 1) {
      // Two threads delete the index "address".
      threads[i] = std::thread([db]() {
        db->DeleteIndex("address", op);
        std::cout << "finish delete index address\n";
      });
    } else {
      // One thread creates the index "age".
      threads[i] = std::thread([db]() {
        db->CreateIndexOnField("age", op);
        std::cout << "finish create index age\n";
      });
    }
  }
  JoinAll(threads);

  // Check that "address" is gone and "age" is now indexed.
  findKeysByCityIndex(db, false);
  findKeysByAgeIndex(db, true);
  checkDataInKVAndIndex(db, "age");

  delete db;
}

// With indexes present, many threads put and delete the same keys; the KV
// store and the index store must stay consistent.
TEST(TestPutDeleteOne, Parallel) {
  FieldDB* db = OpenFreshDB("testdb2.4");

  shanghaiKeys.clear();
  age20Keys.clear();
  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);

  std::vector<std::thread> threads(20);
  for (size_t i = 0; i < threads.size(); ++i) {
    if (i % 2 == 0) {
      // Even threads insert keys "0".."99".
      threads[i] = std::thread([db]() {
        for (size_t j = 0; j < 100; ++j) {
          InsertOneField(db, std::to_string(j));
        }
      });
    } else {
      // Odd threads delete the same keys.
      threads[i] = std::thread([db]() {
        for (size_t j = 0; j < 100; ++j) {
          DeleteOneField(db, std::to_string(j));
        }
      });
    }
  }
  JoinAll(threads);

  // Check consistency.
  checkDataInKVAndIndex(db);

  delete db;
}

// With indexes present, bulk puts and deletes run concurrently; the KV store
// and the index store must stay consistent.
TEST(TestPutDelete, Parallel) {
  FieldDB* db = OpenFreshDB("testdb2.5");

  shanghaiKeys.clear();
  age20Keys.clear();
  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);

  std::vector<std::thread> threads(4);
  threads[0] = std::thread([db]() { InsertFieldData(db); });
  threads[1] = std::thread([db]() { InsertFieldData(db, 1); });
  threads[2] = std::thread([db]() { DeleteFieldData(db); });
  threads[3] = std::thread([db]() { DeleteFieldData(db, 1); });
  JoinAll(threads);

  // Check consistency.
  checkDataInKVAndIndex(db);

  delete db;
}

// Batch writes concurrent with the other operations (a mixed bag).
TEST(TestWrite, Parallel) {
  FieldDB* db = OpenFreshDB("testdb2.6");

  shanghaiKeys.clear();
  age20Keys.clear();
  db->CreateIndexOnField("address", op);
  // Pre-populate some data so that creating the index takes a little longer.
  InsertFieldData(db, 2);

  std::atomic<bool> creator_finished{false};
  std::vector<std::thread> threads(4);
  threads[0] = std::thread([db, &creator_finished]() {
    db->CreateIndexOnField("age", op);
    creator_finished.store(true);
  });
  // The remaining threads start only once index creation has begun, so their
  // work overlaps with it.
  threads[1] = std::thread([db, &creator_finished]() {
    WaitForIndexCreationStart(db, "age", creator_finished);
    InsertFieldData(db);
  });
  threads[2] = std::thread([db, &creator_finished]() {
    WaitForIndexCreationStart(db, "age", creator_finished);
    WriteFieldData(db, 1);
  });
  // threads[3] = std::thread([db](){
  //   while (db->GetIndexStatus("age") == NotExist){
  //     continue;
  //   }
  //   DeleteFieldData(db, 0);});
  threads[3] = std::thread([db, &creator_finished]() {
    WaitForIndexCreationStart(db, "age", creator_finished);
    db->DeleteIndex("age", op);
  });
  JoinAll(threads);

  // Check results.
  // If the (commented-out) delete-data thread were enabled, the reads below
  // could no longer be guaranteed to find their keys.
  GetFieldData(db, false, 0);
  GetFieldData(db, false, 1);
  GetFieldData(db, false, 2);
  checkDataInKVAndIndex(db);
  // The delete-index request is expected to be queued behind the creation, so
  // the index must not exist afterwards.
  ASSERT_EQ(db->GetIndexStatus("age"), NotExist);
  // Removing the last thread turns this into a test of whether writes that
  // run concurrently with creating the "age" index keep that index consistent:
  // checkDataInKVAndIndex(db, "age");

  delete db;
}

int main(int argc, char** argv) {
  // All tests currently run with the same read-only file limits.
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}