// Group members: 陈予曈, 朱陈媛
 
 

// 328 lines
// 9.5 KiB

#include <algorithm>
#include <ctime>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "gtest/gtest.h"

#include "db/NewDB.h" // NewDB header
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "db/write_batch_internal.h"
using namespace leveldb;
// Opens the NewDB named `dbName` with `create_if_missing` enabled and stores
// the resulting instance in `*db`. Returns whatever Status NewDB::Open reports.
Status OpenNewDB(std::string dbName, NewDB** db) {
  Options open_options{};
  open_options.create_if_missing = true;
  const Status open_status = NewDB::Open(open_options, dbName, db);
  return open_status;
}
// Global random engine shared by the helpers below.
std::default_random_engine rng;
// Serializes every use of `rng`. The worker threads in this file call
// GenerateRandomString concurrently, and advancing one engine from several
// threads without synchronization is a data race.
static std::mutex rng_mutex;

// Sets the seed of the global engine.
void SetGlobalSeed(unsigned seed) {
  std::lock_guard<std::mutex> lock(rng_mutex);
  rng.seed(seed);
}

// Generates a random string of `length` characters, each drawn uniformly from
// the 62 ASCII digits and letters. Safe to call from multiple threads.
std::string GenerateRandomString(size_t length) {
  static const char alphanum[] =
      "0123456789"
      "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
      "abcdefghijklmnopqrstuvwxyz";
  // sizeof counts the trailing NUL, so the last usable index is sizeof - 2.
  std::uniform_int_distribution<int> dist(
      0, static_cast<int>(sizeof(alphanum)) - 2);
  std::string str(length, '\0');
  // Hold the lock for the whole string so one call consumes a contiguous run
  // of engine output.
  std::lock_guard<std::mutex> lock(rng_mutex);
  for (char& ch : str) {
    ch = alphanum[dist(rng)];
  }
  return str;
}
// Returns a uniformly distributed integer in the closed range [min, max].
// If the bounds are passed in reverse order they are swapped, so the call
// never divides by zero or overflows on `max + 1`.
// Each thread owns its own engine (seeded from std::random_device), because
// this helper is called from concurrently running worker threads.
int getRandomInRange(int min, int max) {
  if (min > max) {
    std::swap(min, max);
  }
  thread_local std::mt19937 engine{std::random_device{}()};
  std::uniform_int_distribution<int> dist(min, max);
  return dist(engine);
}
// Protects `results`, which the worker thread tasks append to.
std::mutex results_mutex;
// One pass/fail verdict per consistency check performed by a worker.
std::vector<bool> results;

// Appends a verdict to `results` while holding `results_mutex`.
void InsertResult(bool TRUEorFALSE) {
  const std::lock_guard<std::mutex> guard(results_mutex);
  results.push_back(TRUEorFALSE);
}
// Database name passed to OpenNewDB by the tests below.
std::string testdbname{"dbtest38"};
// Thread task: asks the database to delete the index on the "address" field.
// `thread_id` is not used by this task.
void thread_task_deleteindex(NewDB* db, int thread_id) {
  static_cast<void>(thread_id);
  db->DeleteIndex("address");
}
// Thread task: deletes a randomly chosen key from "k_0".."k_9999" and records
// through InsertResult whether the database looks consistent afterwards:
//   * the key must no longer be readable, and
//   * querying the "address" index for the deleted record's address must
//     return nothing.
// `thread_id` is not used by this task.
void thread_task_delete(NewDB* db, int thread_id) {
  static_cast<void>(thread_id);
  const std::string key = "k_" + std::to_string(getRandomInRange(0, 9999));

  // Capture the record's address before deleting it. The field is looked up
  // by name rather than by a fixed position, so a record with fewer fields or
  // a different field order cannot cause an out-of-range access. If the key
  // is already gone, or has no address, the value stays empty.
  std::string fieldvalue;
  FieldArray retrieved_fields;
  if (db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()) {
    for (size_t i = 0; i < retrieved_fields.size(); ++i) {
      if (retrieved_fields[i].first == "address") {
        fieldvalue = retrieved_fields[i].second;
        break;
      }
    }
  }

  db->Delete(WriteOptions(), key);

  // Consistency check 1: the key must not be readable any more.
  if (db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok()) {
    InsertResult(false);
    return;
  }

  // Consistency check 2: the index must not return anything for the old
  // address.
  Field field_to_query{"address", fieldvalue};
  const bool index_ready =
      db->indexed_fields_read.find("address") != db->indexed_fields_read.end();
  if (!index_ready) {
    // Index not registered for reads yet: an empty answer is accepted as-is.
    if (db->QueryByIndex(field_to_query).empty()) {
      InsertResult(true);
      return;
    }
    // A non-empty answer falls through and is re-checked below.
  }
  InsertResult(db->QueryByIndex(field_to_query).empty());
}
// Thread task: asks the database to create an index on the "address" field.
// `thread_id` is not used by this task.
void thread_task_createindex(NewDB* db, int thread_id) {
  static_cast<void>(thread_id);
  db->CreateIndexOnField("address");
}
// Thread task: writes one record under a fresh random key and records through
// InsertResult whether it can be read back and, once the "address" index is
// visible, whether the index returns the new key for the record's address.
// `thread_id` is not used by this task.
void thread_task_put(NewDB* db, int thread_id) {
  static_cast<void>(thread_id);
  std::string key = "k_" + GenerateRandomString(20);
  FieldArray fields = {
      {"name", GenerateRandomString(5)},
      {"address", GenerateRandomString(15)},
      {"phone", GenerateRandomString(11)}
  };
  db->Put_fields(WriteOptions(), key, fields);

  // The record just written must be readable.
  FieldArray retrieved_fields;
  const bool readable =
      db->Get_fields(ReadOptions(), Slice(key), &retrieved_fields).ok();
  if (!readable) {
    InsertResult(false);
    return;
  }

  // fields[1] is the "address" entry built above.
  Field field_to_query{"address", fields[1].second};
  const bool index_missing =
      db->indexed_fields_read.find("address") == db->indexed_fields_read.end();
  // While the index is not registered for reads, an empty answer is accepted;
  // a non-empty one is re-queried and checked below.
  if (index_missing && db->QueryByIndex(field_to_query).empty()) {
    InsertResult(true);
    return;
  }

  // The index answer must contain the key that was just written. An empty
  // answer leaves `found` false.
  std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
  bool found = false;
  for (const std::string& candidate : matching_keys) {
    if (candidate == key) {
      found = true;
      break;
    }
  }
  InsertResult(found);
}
// Thread task: writes a record under the caller-supplied `key`, using `name`
// for the "name" field and random values for "address" and "phone".
// `thread_id` is not used by this task.
void thread_task_put_key(NewDB* db, int thread_id, std::string key, std::string name) {
  static_cast<void>(thread_id);
  // Generated in the same order as the fields are listed: address, then phone.
  std::string address = GenerateRandomString(15);
  std::string phone = GenerateRandomString(11);
  FieldArray record = {
      {"name", name},
      {"address", address},
      {"phone", phone}
  };
  db->Put_fields(WriteOptions(), key, record);
}
TEST(TestNewDB, ConcurrencyALLTest) {
// 创建 NewDB 实例
NewDB* db;
ASSERT_TRUE(OpenNewDB(testdbname, &db).ok());
// 批量插入数据
const int num_records = 10000;
for (int i = 0; i < num_records; ++i) {
// std::string key = "k_" + GenerateRandomString(20);
std::string key = "k_" + std::to_string(i);
FieldArray fields = {
{"name", GenerateRandomString(5)},
{"address", GenerateRandomString(15)},
{"phone", GenerateRandomString(11)}
};
ASSERT_TRUE(db->Put_fields(WriteOptions(), key, fields).ok());
}
// 创建线程向量用于保存所有线程对象
std::vector<std::thread> threads;
std::vector<std::thread> threads2;
threads.emplace_back([=]() { thread_task_createindex(db, 0); });
// 启动100个线程,每个线程都运行thread_task函数
int thread_num = 100;
for (int i = 1; i <= thread_num; ++i) {
// 创建随机设备用于种子
std::random_device rd;
// 使用随机设备创建一个均匀分布的随机数生成器
std::uniform_int_distribution<int> dist(0, 1);
// 根据随机数选择执行哪个代码块
if (dist(rd) == 0) {
threads.emplace_back([=]() { thread_task_delete(db, i); });
} else {
threads.emplace_back([=]() { thread_task_put(db, i); });
}
// threads.emplace_back([=]() { thread_task_put(db, i); });
}
// 等待所有线程完成
for (auto& th : threads) {
if (th.joinable()) {
th.join();
}
}
// check if all the results are true
for(const auto& result : results){
EXPECT_EQ(result, true);
}
threads2.emplace_back([=]() { thread_task_deleteindex(db, 200); });
// 启动100个线程,每个线程都运行thread_task函数
for (int i = 201; i <= thread_num+200; ++i) {
// 创建随机设备用于种子
std::random_device rd;
// 使用随机设备创建一个均匀分布的随机数生成器
std::uniform_int_distribution<int> dist(0, 1);
// 根据随机数选择执行哪个代码块
if (dist(rd) == 0) {
threads2.emplace_back([=]() { thread_task_delete(db, i); });
} else {
threads2.emplace_back([=]() { thread_task_put(db, i); });
}
// threads2.emplace_back([=]() { thread_task_put(db, i); });
}
// 等待所有线程完成
for (auto& th : threads2) {
if (th.joinable()) {
th.join();
}
}
// check if all the results are true
for(const auto& result : results){
EXPECT_EQ(result, true);
}
delete db;
}
// Two threads write the same key concurrently with different "name" values.
// Whichever value ends up stored, the "name" index must return the key for it.
TEST(TestNewDB, ConcurrencyPutSameKeyTest) {
  // Open the NewDB instance. Ownership goes to a unique_ptr so the database is
  // released even when an ASSERT_* below returns early.
  NewDB* raw_db = nullptr;
  ASSERT_TRUE(OpenNewDB(testdbname, &raw_db).ok());
  const std::unique_ptr<NewDB> db_owner(raw_db);
  NewDB* db = db_owner.get();

  // Bulk-insert seed data.
  const int num_records = 1;
  for (int i = 0; i < num_records; ++i) {
    std::string key = "k_" + std::to_string(i);
    FieldArray fields = {
        {"name", GenerateRandomString(5)},
        {"address", GenerateRandomString(15)},
        {"phone", GenerateRandomString(11)}
    };
    ASSERT_TRUE(db->Put_fields(WriteOptions(), key, fields).ok());
  }

  // Delete and recreate the "name" index before the concurrent writes.
  db->DeleteIndex("name");
  db->CreateIndexOnField("name");

  for (int i = 1; i <= 1; ++i) {
    const std::string ikey = "k_" + std::to_string(i);

    // Two writers race on the same key with different names.
    std::vector<std::thread> threads;
    threads.emplace_back(thread_task_put_key, db, i, ikey, std::string("bob"));
    threads.emplace_back(thread_task_put_key, db, i, ikey, std::string("john"));
    // Join both writers before any ASSERT_* can return from the test.
    for (auto& th : threads) {
      if (th.joinable()) {
        th.join();
      }
    }

    // Check the data and the index.
    FieldArray retrieved_fields;
    Status s = db->Get_fields(ReadOptions(), Slice(ikey), &retrieved_fields);
    ASSERT_TRUE(s.ok());
    // Guard the [0] access below.
    ASSERT_FALSE(retrieved_fields.empty());
    std::string fieldvalue = retrieved_fields[0].second;
    Field field_to_query{"name", fieldvalue};

    // The stored name must be found through the index, and the answer must
    // contain this key.
    std::vector<std::string> matching_keys = db->QueryByIndex(field_to_query);
    ASSERT_FALSE(matching_keys.empty());
    ASSERT_TRUE(std::find(matching_keys.begin(), matching_keys.end(), ikey) != matching_keys.end());
  }
}
// Test entry point: seeds the global random engine from the current time,
// initializes GoogleTest with the command-line arguments, and runs all tests.
int main(int argc, char** argv) {
  const unsigned seed = static_cast<unsigned>(time(nullptr));
  SetGlobalSeed(seed);
  testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}