From 50e2ec65e06e6824d5509bfa5976af937c17eaf7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E4=BA=88=E6=9B=88?= <10222140454@stu.ecnu.edu.cn> Date: Sun, 5 Jan 2025 21:48:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=96=87=E4=BB=B6-=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E6=8E=A7=E5=88=B6=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/concurrency_test.cc | 328 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 328 insertions(+) create mode 100644 test/concurrency_test.cc diff --git a/test/concurrency_test.cc b/test/concurrency_test.cc new file mode 100644 index 0000000..6daf4cb --- /dev/null +++ b/test/concurrency_test.cc @@ -0,0 +1,328 @@ +#include "gtest/gtest.h" +#include "db/NewDB.h" // NewDB 的头文件 +#include "leveldb/env.h" +#include "leveldb/db.h" +#include "db/write_batch_internal.h" +#include +#include +#include +#include +#include + +using namespace leveldb; + +Status OpenNewDB(std::string dbName, NewDB** db) { + Options options = Options(); + options.create_if_missing = true; + return NewDB::Open(options, dbName, db); +} + +// 全局的随机数引擎 +std::default_random_engine rng; + +// 设置随机种子 +void SetGlobalSeed(unsigned seed) { + rng.seed(seed); +} + +// 生成随机字符串 +std::string GenerateRandomString(size_t length) { + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + std::uniform_int_distribution dist(0, sizeof(alphanum) - 2); + + std::string str(length, 0); + for (size_t i = 0; i < length; ++i) { + str[i] = alphanum[dist(rng)]; + } + return str; +} + +int getRandomInRange(int min, int max) { + return min + std::rand() % ((max + 1) - min); +} + +std::mutex results_mutex; +std::vector results; + +void InsertResult(bool TRUEorFALSE){ + std::unique_lock lock(results_mutex, std::defer_lock); + lock.lock(); + results.emplace_back(TRUEorFALSE); + lock.unlock(); +} + +std::string testdbname = "dbtest38"; + + +void thread_task_deleteindex(NewDB* db, int thread_id) { + db->DeleteIndex("address"); +} + +void thread_task_delete(NewDB* db, int thread_id) { + int i = getRandomInRange(0,9999); + + std::string key = "k_" + std::to_string(i); + std::string fieldvalue; + + FieldArray retrieved_fields; + if(db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()){ + fieldvalue = retrieved_fields[1].second; + } + + // while (db->indexed_fields_write.find("address") == db->indexed_fields_write.end()) { + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // } + db->Delete(WriteOptions(), key); + +// check the consistency + if(db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()){ + InsertResult(false); + return; + } + + Field field_to_query{"address", fieldvalue}; // field_value of field address + if (db->indexed_fields_read.find("address") == db->indexed_fields_read.end()) { // index not ready + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::vector matching_keys = db->QueryByIndex(field_to_query); + if(matching_keys.empty()){ + InsertResult(true); + return; + } + } + + // index ready + std::vector matching_keys = db->QueryByIndex(field_to_query); + if(!matching_keys.empty()){ + // std::cout << thread_id << std::endl; + InsertResult(false); + return; + } + + InsertResult(true); +} + +void thread_task_createindex(NewDB* db, int thread_id) { + db->CreateIndexOnField("address"); +} + +void thread_task_put(NewDB* db, int thread_id) { + std::string key = "k_" + GenerateRandomString(20); + FieldArray fields = { + {"name", GenerateRandomString(5)}, + {"address", GenerateRandomString(15)}, + {"phone", GenerateRandomString(11)} + }; + + // while (db->indexed_fields_write.find("address") == db->indexed_fields_write.end()) { + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + // } + db->Put_fields(WriteOptions(), key, fields); + +// check the consistency + FieldArray retrieved_fields; + if(!db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()){ + InsertResult(false); + return; + } + + Field field_to_query{"address", fields[1].second}; // field_value of field address + + if (db->indexed_fields_read.find("address") == db->indexed_fields_read.end()) { // index not ready + // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::vector matching_keys = db->QueryByIndex(field_to_query); + if(matching_keys.empty()){ + InsertResult(true); + return; + } + } + + // index ready + std::vector matching_keys = db->QueryByIndex(field_to_query); + if(matching_keys.empty()){ + // std::cout << thread_id << std::endl; + InsertResult(false); + return; + } + + int tag = 0; + for(auto& matching_key : matching_keys){ + if(key == matching_key){ + tag = 1; + break; + } + } + if(tag != 1){ + InsertResult(false); + return; + } + + InsertResult(true); +} + +void thread_task_put_key(NewDB* db, int thread_id, std::string key, std::string name) { + FieldArray fields = { + {"name", name}, + {"address", GenerateRandomString(15)}, + {"phone", GenerateRandomString(11)} + }; + + db->Put_fields(WriteOptions(), key, fields); +} + +TEST(TestNewDB, ConcurrencyALLTest) { + // 创建 NewDB 实例 + NewDB* db; + ASSERT_TRUE(OpenNewDB(testdbname, &db).ok()); + + // 批量插入数据 + const int num_records = 10000; + + for (int i = 0; i < num_records; ++i) { + // std::string key = "k_" + GenerateRandomString(20); + std::string key = "k_" + std::to_string(i); + FieldArray fields = { + {"name", GenerateRandomString(5)}, + {"address", GenerateRandomString(15)}, + {"phone", GenerateRandomString(11)} + }; + ASSERT_TRUE(db->Put_fields(WriteOptions(), key, fields).ok()); + } + + // 创建线程向量用于保存所有线程对象 + std::vector threads; + std::vector threads2; + + threads.emplace_back([=]() { thread_task_createindex(db, 0); }); + + // 启动100个线程,每个线程都运行thread_task函数 + int thread_num = 100; + for (int i = 1; i <= thread_num; ++i) { + // 创建随机设备用于种子 + std::random_device rd; + + // 使用随机设备创建一个均匀分布的随机数生成器 + std::uniform_int_distribution dist(0, 1); + + // 根据随机数选择执行哪个代码块 + if (dist(rd) == 0) { + threads.emplace_back([=]() { thread_task_delete(db, i); }); + } else { + threads.emplace_back([=]() { thread_task_put(db, i); }); + } + // threads.emplace_back([=]() { thread_task_put(db, i); }); + } + + // 等待所有线程完成 + for (auto& th : threads) { + if (th.joinable()) { + th.join(); + } + } + + // check if all the results are true + for(const auto& result : results){ + EXPECT_EQ(result, true); + } + + threads2.emplace_back([=]() { thread_task_deleteindex(db, 200); }); + + // 启动100个线程,每个线程都运行thread_task函数 + for (int i = 201; i <= thread_num+200; ++i) { + // 创建随机设备用于种子 + std::random_device rd; + + // 使用随机设备创建一个均匀分布的随机数生成器 + std::uniform_int_distribution dist(0, 1); + + // 根据随机数选择执行哪个代码块 + if (dist(rd) == 0) { + threads2.emplace_back([=]() { thread_task_delete(db, i); }); + } else { + threads2.emplace_back([=]() { thread_task_put(db, i); }); + } + // threads2.emplace_back([=]() { thread_task_put(db, i); }); + } + + // 等待所有线程完成 + for (auto& th : threads2) { + if (th.joinable()) { + th.join(); + } + } + + // check if all the results are true + for(const auto& result : results){ + EXPECT_EQ(result, true); + } + + delete db; +} + +TEST(TestNewDB, ConcurrencyPutSameKeyTest) { + // 创建 NewDB 实例 + NewDB* db; + ASSERT_TRUE(OpenNewDB(testdbname, &db).ok()); + + // 批量插入数据 + const int num_records = 1; + + for (int i = 0; i < num_records; ++i) { + // std::string key = "k_" + GenerateRandomString(20); + std::string key = "k_" + std::to_string(i); + FieldArray fields = { + {"name", GenerateRandomString(5)}, + {"address", GenerateRandomString(15)}, + {"phone", GenerateRandomString(11)} + }; + ASSERT_TRUE(db->Put_fields(WriteOptions(), key, fields).ok()); + } + + db->DeleteIndex("name"); + + db->CreateIndexOnField("name"); + + // 创建线程向量用于保存所有线程对象 + std::vector threads; + std::string name; + + for (int i = 1; i <= 1; ++i) { + std::string ikey = "k_" + std::to_string(i); + name = "bob"; + threads.emplace_back([=]() { thread_task_put_key(db, i, ikey, name); }); + name = "john"; + threads.emplace_back([=]() { thread_task_put_key(db, i, ikey, name); }); + + // 等待所有线程完成 + for (auto& th : threads) { + if (th.joinable()) { + th.join(); + } + } + + // check data and index + FieldArray retrieved_fields; + Status s = db->Get_fields(ReadOptions(), Slice(ikey), &retrieved_fields); + ASSERT_TRUE(s.ok()); + + std::string fieldvalue = retrieved_fields[0].second; + Field field_to_query{"name", fieldvalue}; + + // fieldvalue should be found by index + std::vector matching_keys = db->QueryByIndex(field_to_query); + ASSERT_FALSE(matching_keys.empty()); + ASSERT_TRUE(std::find(matching_keys.begin(), matching_keys.end(), ikey) != matching_keys.end()); + } + + delete db; +} + +int main(int argc, char** argv) { + // 设置全局随机种子 + SetGlobalSeed(static_cast(time(nullptr))); + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}