#include "gtest/gtest.h"
#include "db/NewDB.h"  // NewDB header
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "db/write_batch_internal.h"

#include <algorithm>
#include <ctime>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <vector>

using namespace leveldb;

/// Opens the NewDB stored under `dbName`, creating it if it does not exist.
///
/// @param dbName  Name/path of the database to open.
/// @param db      Out-parameter handed to NewDB::Open; the tests below
///                `delete` the handle it receives once they are done.
/// @return The Status returned by NewDB::Open.
Status OpenNewDB(std::string dbName, NewDB** db) {
  Options options = Options();
  options.create_if_missing = true;
  return NewDB::Open(options, dbName, db);
}

// Global random engine. Worker threads draw from it concurrently, and
// std::default_random_engine is not safe for concurrent use, so every access
// must hold rng_mutex.
std::default_random_engine rng;
std::mutex rng_mutex;

/// Seeds the global random engine.
void SetGlobalSeed(unsigned seed) {
  std::lock_guard<std::mutex> lock(rng_mutex);
  rng.seed(seed);
}

/// Returns a random string of `length` characters drawn from [0-9A-Za-z].
/// Safe to call from several threads at once.
std::string GenerateRandomString(size_t length) {
  static const char alphanum[] =
      "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  // sizeof counts the trailing NUL, so the last usable index is sizeof - 2.
  std::uniform_int_distribution<int> dist(
      0, static_cast<int>(sizeof(alphanum)) - 2);

  std::string str(length, 0);
  std::lock_guard<std::mutex> lock(rng_mutex);
  for (char& ch : str) {
    ch = alphanum[dist(rng)];
  }
  return str;
}

/// Returns a uniformly distributed integer in the closed range [min, max].
/// Requires min <= max. Safe to call from several threads at once.
int getRandomInRange(int min, int max) {
  std::uniform_int_distribution<int> dist(min, max);
  std::lock_guard<std::mutex> lock(rng_mutex);
  return dist(rng);
}

// Per-worker consistency verdicts, appended by the worker threads.
// Guarded by results_mutex.
std::mutex results_mutex;
std::vector<bool> results;

/// Records one worker's consistency verdict (true = consistent).
void InsertResult(bool TRUEorFALSE) {
  std::lock_guard<std::mutex> lock(results_mutex);
  results.push_back(TRUEorFALSE);
}

// Expects every recorded verdict to be true, then empties `results` so that a
// later phase does not report the same failures a second time.
static void ExpectAllResultsTrueAndReset() {
  std::lock_guard<std::mutex> lock(results_mutex);
  for (const bool result : results) {
    EXPECT_TRUE(result);
  }
  results.clear();
}

std::string testdbname = "dbtest38";

/// Worker: drops the index on field "address".
void thread_task_deleteindex(NewDB* db, int thread_id) {
  db->DeleteIndex("address");
}

/// Worker: deletes a randomly chosen pre-loaded key ("k_0" .. "k_9999") and
/// records whether the data and the "address" index look consistent afterwards.
void thread_task_delete(NewDB* db, int thread_id) {
  int i = getRandomInRange(0, 9999);
  std::string key = "k_" + std::to_string(i);

  // Remember the record's address (second field) before deleting it. It stays
  // empty when the key cannot be read, e.g. because it was already deleted.
  std::string fieldvalue;
  FieldArray retrieved_fields;
  if (db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()) {
    fieldvalue = retrieved_fields[1].second;
  }

  // Disabled: wait until the index becomes writable.
  // while (db->indexed_fields_write.find("address") ==
  //        db->indexed_fields_write.end()) {
  //   std::this_thread::sleep_for(std::chrono::milliseconds(1));
  // }
  db->Delete(WriteOptions(), key);

  // Consistency check 1: the key must no longer be readable.
  if (db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()) {
    InsertResult(false);
    return;
  }

  // Consistency check 2: the index must not return anything for the address.
  Field field_to_query{"address", fieldvalue};
  if (db->indexed_fields_read.find("address") ==
      db->indexed_fields_read.end()) {
    // Index not ready for reads: an empty answer is accepted right away; a
    // non-empty one falls through to the check below.
    std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
    if (matching_keys.empty()) {
      InsertResult(true);
      return;
    }
  }

  // Index ready.
  std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
  if (!matching_keys.empty()) {
    InsertResult(false);
    return;
  }
  InsertResult(true);
}

/// Worker: creates the index on field "address".
void thread_task_createindex(NewDB* db, int thread_id) {
  db->CreateIndexOnField("address");
}

/// Worker: inserts one record under a fresh random key and records whether the
/// data and the "address" index look consistent afterwards.
void thread_task_put(NewDB* db, int thread_id) {
  std::string key = "k_" + GenerateRandomString(20);
  FieldArray fields = {{"name", GenerateRandomString(5)},
                       {"address", GenerateRandomString(15)},
                       {"phone", GenerateRandomString(11)}};

  // Disabled: wait until the index becomes writable.
  // while (db->indexed_fields_write.find("address") ==
  //        db->indexed_fields_write.end()) {
  //   std::this_thread::sleep_for(std::chrono::milliseconds(1));
  // }
  db->Put_fields(WriteOptions(), key, fields);

  // Consistency check 1: the record must be readable.
  FieldArray retrieved_fields;
  if (!db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()) {
    InsertResult(false);
    return;
  }

  // Consistency check 2: the index must return this key for its address.
  Field field_to_query{"address", fields[1].second};
  if (db->indexed_fields_read.find("address") ==
      db->indexed_fields_read.end()) {
    // Index not ready for reads: an empty answer is accepted right away; a
    // non-empty one falls through to the check below.
    std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
    if (matching_keys.empty()) {
      InsertResult(true);
      return;
    }
  }

  // Index ready: the freshly written key must be among the matches.
  std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
  const bool key_found = std::find(matching_keys.begin(), matching_keys.end(),
                                   key) != matching_keys.end();
  InsertResult(key_found);
}

/// Worker: writes a record under the given `key` with the given `name` and
/// random address/phone values.
void thread_task_put_key(NewDB* db, int thread_id, std::string key,
                         std::string name) {
  FieldArray fields = {{"name", name},
                       {"address", GenerateRandomString(15)},
                       {"phone", GenerateRandomString(11)}};
  db->Put_fields(WriteOptions(), key, fields);
}

// Runs one concurrent phase: a single thread executing `index_task` with id
// `index_thread_id`, plus `worker_count` workers (ids index_thread_id + 1 ..
// index_thread_id + worker_count), each randomly chosen to be either a delete
// worker or a put worker. Returns once every thread has been joined.
static void RunConcurrentPhase(NewDB* db, void (*index_task)(NewDB*, int),
                               int index_thread_id, int worker_count) {
  std::vector<std::thread> threads;
  threads.reserve(static_cast<size_t>(worker_count) + 1);
  threads.emplace_back(index_task, db, index_thread_id);

  // One entropy source for the whole phase instead of one per iteration.
  std::random_device rd;
  std::uniform_int_distribution<int> coin(0, 1);
  for (int i = 1; i <= worker_count; ++i) {
    const int thread_id = index_thread_id + i;
    if (coin(rd) == 0) {
      threads.emplace_back(thread_task_delete, db, thread_id);
    } else {
      threads.emplace_back(thread_task_put, db, thread_id);
    }
  }

  // Wait for every thread to finish.
  for (auto& th : threads) {
    if (th.joinable()) {
      th.join();
    }
  }
}

TEST(TestNewDB, ConcurrencyALLTest) {
  // Create the NewDB instance. The unique_ptr releases it even when an
  // ASSERT_* below returns from the test early.
  NewDB* raw_db = nullptr;
  ASSERT_TRUE(OpenNewDB(testdbname, &raw_db).ok());
  std::unique_ptr<NewDB> db_owner(raw_db);
  NewDB* db = db_owner.get();

  // Bulk-load the initial records k_0 .. k_9999.
  const int num_records = 10000;
  for (int i = 0; i < num_records; ++i) {
    std::string key = "k_" + std::to_string(i);
    FieldArray fields = {{"name", GenerateRandomString(5)},
                         {"address", GenerateRandomString(15)},
                         {"phone", GenerateRandomString(11)}};
    ASSERT_TRUE(db->Put_fields(WriteOptions(), key, fields).ok());
  }

  const int thread_num = 100;

  // Phase 1: index creation racing with 100 random put/delete workers.
  RunConcurrentPhase(db, thread_task_createindex, 0, thread_num);
  ExpectAllResultsTrueAndReset();

  // Phase 2: index deletion racing with 100 random put/delete workers.
  RunConcurrentPhase(db, thread_task_deleteindex, 200, thread_num);
  ExpectAllResultsTrueAndReset();
}

TEST(TestNewDB, ConcurrencyPutSameKeyTest) {
  // Create the NewDB instance. The unique_ptr releases it even when an
  // ASSERT_* below returns from the test early.
  NewDB* raw_db = nullptr;
  ASSERT_TRUE(OpenNewDB(testdbname, &raw_db).ok());
  std::unique_ptr<NewDB> db_owner(raw_db);
  NewDB* db = db_owner.get();

  // Bulk-load the initial data (a single record, k_0).
  const int num_records = 1;
  for (int i = 0; i < num_records; ++i) {
    std::string key = "k_" + std::to_string(i);
    FieldArray fields = {{"name", GenerateRandomString(5)},
                         {"address", GenerateRandomString(15)},
                         {"phone", GenerateRandomString(11)}};
    ASSERT_TRUE(db->Put_fields(WriteOptions(), key, fields).ok());
  }

  // Rebuild the index on "name" before the concurrent writes start.
  db->DeleteIndex("name");
  db->CreateIndexOnField("name");

  for (int i = 1; i <= 1; ++i) {
    const std::string ikey = "k_" + std::to_string(i);

    // Two threads write the same key with different names at the same time.
    std::vector<std::thread> threads;
    threads.emplace_back(thread_task_put_key, db, i, ikey, std::string("bob"));
    threads.emplace_back(thread_task_put_key, db, i, ikey,
                         std::string("john"));

    // Wait for both writers; they must be joined before any ASSERT_* can
    // return from the test.
    for (auto& th : threads) {
      if (th.joinable()) {
        th.join();
      }
    }

    // Check data and index: whichever name is stored must be found through
    // the index, and the match list must contain this key.
    FieldArray retrieved_fields;
    Status s = db->Get_fields(ReadOptions(), Slice(ikey), &retrieved_fields);
    ASSERT_TRUE(s.ok());
    std::string fieldvalue = retrieved_fields[0].second;
    Field field_to_query{"name", fieldvalue};
    std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
    ASSERT_FALSE(matching_keys.empty());
    ASSERT_TRUE(std::find(matching_keys.begin(), matching_keys.end(), ikey) !=
                matching_keys.end());
  }
}

int main(int argc, char** argv) {
  // Seed the global random engine from the wall clock.
  SetGlobalSeed(static_cast<unsigned>(std::time(nullptr)));
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}