Browse Source

测试框架和并发测试,并跑通(没有实现delete和恢复)

gyf
augurier 8 months ago
parent
commit
32d8a45989
8 changed files with 496 additions and 181 deletions
  1. +11
    -2
      CMakeLists.txt
  2. +32
    -46
      fielddb/field_db.cpp
  3. +13
    -5
      fielddb/field_db.h
  4. +80
    -24
      fielddb/request.cpp
  5. +4
    -2
      fielddb/request.h
  6. +11
    -102
      test/basic_function_test.cc
  7. +182
    -0
      test/helper.cc
  8. +163
    -0
      test/parallel_test.cc

+ 11
- 2
CMakeLists.txt View File

@ -194,6 +194,10 @@ target_sources(leveldb
"util/serialize_value.cc"
"fielddb/field_db.cpp"
"fielddb/field_db.h"
"fielddb/meta.cpp"
"fielddb/meta.h"
"fielddb/request.cpp"
"fielddb/request.h"
# Only CMake 3.3+ supports PUBLIC sources in targets exported by "install".
$<$<VERSION_GREATER:CMAKE_VERSION,3.2>:PUBLIC>
@ -522,7 +526,12 @@ if(LEVELDB_INSTALL)
)
endif(LEVELDB_INSTALL)
add_executable(lab1_test
add_executable(basic_function_test
"${PROJECT_SOURCE_DIR}/test/basic_function_test.cc"
)
target_link_libraries(lab1_test PRIVATE leveldb gtest)
target_link_libraries(basic_function_test PRIVATE leveldb gtest)
add_executable(parallel_test
"${PROJECT_SOURCE_DIR}/test/parallel_test.cc"
)
target_link_libraries(parallel_test PRIVATE leveldb gtest)

+ 32
- 46
fielddb/field_db.cpp View File

@ -14,6 +14,7 @@
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
#include "field_db.h"
namespace fielddb {
using namespace leveldb;
@ -61,7 +62,7 @@ Request *FieldDB::GetHandleInterval() {
mutex_.AssertHeld(); //保证队列是互斥访问的
Request *tail = taskqueue_.front();
for(auto *req_ptr : taskqueue_) {
if(req_ptr->isDeleteReq() || req_ptr->isiCreateReq()) {
if(req_ptr->isiDeleteReq() || req_ptr->isiCreateReq()) {
return tail;
}
tail = req_ptr;
@ -83,6 +84,7 @@ Again:
WriteBatch KVBatch,IndexBatch,MetaBatch;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
// int debug = tail->type_;
//表明这一个区间并没有涉及index的创建删除
{
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
@ -97,9 +99,12 @@ Again:
mutex_.Unlock();
WriteOptions op;
status = metaDB_->Write(op, &MetaBatch);
assert(status.ok());
//TODO:index的写入需要在另外一个线程中同时完成
status = indexDB_->Write(op, &IndexBatch);
assert(status.ok());
status = kvDB_->Write(op, &KVBatch);
assert(status.ok());
//3. 将meta数据清除
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
@ -113,13 +118,13 @@ Again:
while(true) {
Request *ready = taskqueue_.front();
// int debug = tail->type_;
taskqueue_.pop_front();
//当前ready不是队首,不是和index的创建有关
if(ready != &req && !ready->isPending() &&
!req.isiCreateReq() && !req.isiDeleteReq()) {
if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
ready->cond_.Signal();
if (ready != &req) ready->cond_.Signal();
}
if (ready == tail) break;
}
@ -159,11 +164,11 @@ Status FieldDB::PutFields(const WriteOptions &Options,
// todo: 删除有索引的key时indexdb也要同步
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
//
std::string key_ = key.ToString();
DeleteReq req(&key_,&mutex_);
Status status = HandleRequest(req);
return status;
// return kvDB_->Delete(options, key);
// std::string key_ = key.ToString();
// DeleteReq req(&key_,&mutex_);
// Status status = HandleRequest(req);
// return status;
return kvDB_->Delete(options, key);
}
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
@ -207,25 +212,6 @@ std::vector> FieldDB::FindKeysAndValByFieldN
}
Status FieldDB::CreateIndexOnField(const std::string& field_name) {
//taskQueue相关
//写锁 是不是只需要给putfields设置一把锁就行
// std::vector<std::pair<std::string, std::string>> keysAndVal =
// FindKeysAndValByFieldName(field_name);
// WriteBatch writeBatch;
// Slice value = Slice();
// for (auto &kvPair : keysAndVal){
// std::string indexKey;
// AppendIndexKey(&indexKey,
// ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
// writeBatch.Put(indexKey, value);
// }
// Status s = indexDB_->Write(WriteOptions(), &writeBatch);
// if (!s.ok()) return s;
// index_[field_name].first = Exist;
// //唤醒taskqueue
// return s;
std::string Field = field_name;
iCreateReq req(&Field,&mutex_);
HandleRequest(req);
@ -241,23 +227,6 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) {
}
Status FieldDB::DeleteIndex(const std::string &field_name) {
//taskQueue相关
//写锁
// std::vector<std::pair<std::string, std::string>> keysAndVal =
// FindKeysAndValByFieldName(field_name);
// WriteBatch writeBatch;
// for (auto &kvPair : keysAndVal){
// std::string indexKey;
// AppendIndexKey(&indexKey,
// ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second));
// writeBatch.Delete(indexKey);
// }
// Status s = indexDB_->Write(WriteOptions(), &writeBatch);
// if (!s.ok()) return s;
// index_.erase(field_name);
// //唤醒taskqueue
// return s;
std::string Field = field_name;
iDeleteReq req(&Field,&mutex_);
HandleRequest(req);
@ -299,6 +268,12 @@ std::vector FieldDB::QueryByIndex(const Field &field, Status *s) {
return result;
}
IndexStatus FieldDB::GetIndexStatus(const std::string &fieldName){
if (index_.count(fieldName) == 0) return IndexStatus::NotExist;
IndexStatus idxs = index_[fieldName].first;
return idxs;
}
Iterator * FieldDB::NewIterator(const ReadOptions &options) {
return kvDB_->NewIterator(options);
}
@ -327,4 +302,15 @@ void FieldDB::CompactRange(const Slice *begin, const Slice *end) {
kvDB_->CompactRange(begin, end);
}
} // end of namespace
Status DestroyDB(const std::string& name, const Options& options) {
Status s;
s = leveldb::DestroyDB(name+"_kvDB", options);
assert(s.ok());
s = leveldb::DestroyDB(name+"_indexDB", options);
assert(s.ok());
s = leveldb::DestroyDB(name+"_metaDB", options);
assert(s.ok());
return s;
}
} // namespace fielddb

+ 13
- 5
fielddb/field_db.h View File

@ -15,6 +15,14 @@
# define FIELD_DB_H
namespace fielddb {
using namespace leveldb;
enum IndexStatus{
Creating,
Deleting,
Exist,
NotExist
};
class FieldDB : DB {
public:
friend class Request;
@ -43,6 +51,8 @@ public:
Status CreateIndexOnField(const std::string& field_name);
Status DeleteIndex(const std::string &field_name);
std::vector<std::string> QueryByIndex(const Field &field, Status *s);
//
IndexStatus GetIndexStatus(const std::string &fieldName);
static Status OpenFieldDB(const Options& options,const std::string& name,FieldDB** dbptr);
@ -59,11 +69,6 @@ private:
leveldb::DB *indexDB_;
leveldb::DB *kvDB_;
enum IndexStatus{
Creating,
Deleting,
Exist
};
using FieldName = std::string;
// index的状态,creating/deleting
std::map<FieldName, std::pair<IndexStatus,Request*>> index_;
@ -80,5 +85,8 @@ private:
Request *GetHandleInterval(); //
};
Status DestroyDB(const std::string& name,
const Options& options);
} // end of namespace
# endif

+ 80
- 24
fielddb/request.cpp View File

@ -8,6 +8,7 @@
#include "fielddb/encode_index.h"
#include "fielddb/field_db.h"
#include "fielddb/meta.h"
#include "request.h"
namespace fielddb {
using namespace leveldb;
@ -52,13 +53,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
if(field_name == "") break;
if(DB->index_.count(field_name)) {
auto [index_status,parent_req] = DB->index_[field_name];
if(index_status == FieldDB::Creating || index_status == FieldDB::Deleting) {
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) {
parent_req->PendReq(this);
return;
} else if(index_status == FieldDB::Exist) {
} else if(index_status == IndexStatus::Exist) {
HasIndex = true;
}
assert(0);
//assert(0);
}
}
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
@ -68,15 +69,16 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
MetaKV MKV = MetaKV(Key,&serialized);
MKV.Trans(MetaKey, MetaValue);
MetaBatch.Put(MetaKey, MetaValue);
}
//第三点是不是应该在这一分支中
//3.对于含有索引的field建立索引
for(auto [field_name,field_value] : *Fields) {
if(field_name == "") continue;
if(DB->index_.count(field_name)) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
IndexBatch.Put(indexKey, Slice());
for(auto [field_name,field_value] : *Fields) {
if(field_name == "") continue;
if(DB->index_.count(field_name)) {
std::string indexKey;
AppendIndexKey(&indexKey, ParsedInternalIndexKey(
*Key,field_name,field_value));
IndexBatch.Put(indexKey, Slice());
}
}
}
}
@ -101,7 +103,7 @@ void iCreateReq::Prepare(FieldDB *DB) {
DB->index_mu.AssertHeld();
if(DB->index_.count(*Field)) {
auto [istatus,parent] = DB->index_[*Field];
if(istatus == FieldDB::Exist) {
if(istatus == IndexStatus::Exist) {
//如果已经完成建立索引,则返回成功
done = true;
Existed = true;
@ -114,22 +116,48 @@ void iCreateReq::Prepare(FieldDB *DB) {
}
//如果索引状态表中没有,则表示尚未创建,更新相应的状态
//这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend
DB->index_[*Field] = {FieldDB::Creating,this};
DB->index_[*Field] = {IndexStatus::Creating,this};
done = true;
}
void iCreateReq::PendReq(Request *req) {
req->parent = this;
pending_list.push_back(req);
}
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB)
{
//TODO:遍历数据库,构建二级索引到indexbatch,并且更新metaDB中的元数据为Index类型的(Field,Creating)
//这里或许不需要在metaDB中先写一遍?
//遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating))
//一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second));
IndexBatch.Put(indexKey, value);
}
}
void iCreateReq::Finalize(FieldDB *DB) {
//TODO:
//1. 写入完成后,更新index状态表,并将metaDB的值改为Index类型的(Field,Existing)
//2. 将所有的pendinglist重新入队
//1. 写入完成后,更新index状态表,(并将metaDB的值改为Index类型的(Field,Existing))
MutexLock iL(&DB->index_mu);
DB->index_[*Field] = {IndexStatus::Exist, nullptr};
DB->index_mu.Unlock();
if (pending_list.empty()) return;
//2. 将所有的pendinglist重新入队
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //解绑
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
/*******iDeleteReq*******/
@ -142,8 +170,8 @@ void iDeleteReq::Prepare(FieldDB *DB) {
return ;
}
auto [istatus,parent] = DB->index_[*Field];
if(istatus == FieldDB::Exist) {
DB->index_[*Field] = {FieldDB::Creating,this};
if(istatus == IndexStatus::Exist) {
DB->index_[*Field] = {IndexStatus::Deleting,this};
done = true;
} else {
//如果正在创建或者删除,那么pend到对应的请求上
@ -151,14 +179,42 @@ void iDeleteReq::Prepare(FieldDB *DB) {
}
}
void iDeleteReq::PendReq(Request* req) {
req->parent = this;
pending_list.push_back(req);
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
Slice value = Slice();
for (auto &kvPair : keysAndVal){
std::string indexKey;
AppendIndexKey(&indexKey,
ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second));
IndexBatch.Delete(indexKey);
}
}
void iDeleteReq::Finalize(FieldDB *DB) {
MutexLock iL(&DB->index_mu);
DB->index_.erase(*Field);
DB->index_mu.Unlock();
if (pending_list.empty()) return;
//2. 将所有的pendinglist重新入队
MutexLock L(&DB->mutex_);
for (auto req : pending_list){
DB->taskqueue_.push_back(req);
req->parent = req; //解绑
}
if (pending_list[0] == DB->taskqueue_.front()) {
pending_list[0]->cond_.Signal();
}
this->s = Status::OK();
}
}
} // namespace fielddb

+ 4
- 2
fielddb/request.h View File

@ -19,7 +19,7 @@ public:
friend class FieldDB;
enum RequestType {
FieldsReq_t,
ValueReq_t,
//ValueReq_t,
iCreateReq_t,
iDeleteReq_t,
DeleteReq_t,
@ -33,7 +33,7 @@ public:
Request(RequestType type,port::Mutex *mu):
type_(type),cond_(mu),done(false) { parent = this; };
virtual ~Request();
//virtual ~Request();
inline bool isFieldsReq() { return type_ == FieldsReq_t; }
// inline bool isValueReq() { return type_ == ValueReq_t; }
@ -93,6 +93,7 @@ public:
WriteBatch &MetaBatch,fielddb::FieldDB *DB) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
bool Existed;
std::string *Field;
@ -109,6 +110,7 @@ public:
WriteBatch &MetaBatch,fielddb::FieldDB *DB) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
bool Deleted;
std::string *Field;

+ 11
- 102
test/basic_function_test.cc View File

@ -2,138 +2,47 @@
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"
using namespace fielddb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
std::vector<std::string> cities = {
"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou",
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin"
};
std::vector<std::string> shanghaiKeys;
Status OpenDB(std::string dbName, FieldDB **db) {
Options options;
options.create_if_missing = true;
return FieldDB::OpenFieldDB(options, dbName, db);
}
void ClearDB(FieldDB *db){
//destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染
WriteOptions writeOptions;
int key_num = data_size / value_size;
for (int i = 0; i < key_num; i++) {
int key_ = i+1;
std::string key = std::to_string(key_);
Status s = db->Delete(WriteOptions(), key);
ASSERT_TRUE(s.ok());
}
}
void InsertFieldData(FieldDB *db) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(0);
for (int i = 0; i < key_num; i++) {
int randThisTime = rand(); //确保读写一个循环只rand一次,否则随机序列会不一致
int key_ = randThisTime % key_num+1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
FieldArray fields = {
{"name", name},
{"address", address}
};
if (address == "Shanghai") {
shanghaiKeys.push_back(key);
}
Status s = db->PutFields(WriteOptions(), key, fields);
ASSERT_TRUE(s.ok());
}
}
void GetFieldData(FieldDB *db) {
ReadOptions readOptions;
int key_num = data_size / value_size;
// 点查
srand(0);
for (int i = 0; i < 100; i++) {
int randThisTime = rand();
int key_ = randThisTime % key_num+1;
std::string key = std::to_string(key_);
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
ASSERT_TRUE(s.ok());
for (const Field& pairs : fields_ret) {
if (pairs.first == "name"){
} else if (pairs.first == "address"){
std::string city = pairs.second;
ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end());
} else assert(false);
}
}
}
void findKeysByCity(FieldDB *db) {
Field field = {"address", "Shanghai"};
std::vector<std::string> resKeys = db->FindKeysByField(field);
std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
}
}
void findKeysByCityIndex(FieldDB *db, bool expect) {
Field field = {"address", "Shanghai"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
if (expect) ASSERT_TRUE(s.ok());
else {
ASSERT_TRUE(s.IsNotFound());
return;
}
std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
}
}
TEST(TestLab1, Basic) {
// DestroyDB("testdb",Options());
fielddb::DestroyDB("testdb1.1",Options()); //每个测试前,先把对应名称的之前的数据库删了
FieldDB *db = new FieldDB();
if(OpenDB("testdb", &db).ok() == false) {
if(OpenDB("testdb1.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
InsertFieldData(db);
GetFieldData(db);
bool allowNotFound = false;
GetFieldData(db, allowNotFound);
findKeysByCity(db);
delete db;
}
TEST(TestLab2, Basic) {
//destroy
fielddb::DestroyDB("testdb1.2",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2", &db).ok() == false) {
if(OpenDB("testdb1.2", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
InsertFieldData(db);
// GetFieldData(db);
// findKeysByCity(db);
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
findKeysByCityIndex(db, true);
findKeysByAgeIndex(db, true);
db->DeleteIndex("address");
findKeysByCityIndex(db, false);
findKeysByAgeIndex(db, true);
delete db;
}

+ 182
- 0
test/helper.cc View File

@ -0,0 +1,182 @@
#include "gtest/gtest.h"
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include <random>
using namespace fielddb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
#define AGE_RANGE 100
std::vector<std::string> cities = {
"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou",
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin"
};
//检查insert和queryByIndex的数据是否对应
std::set<std::string> shanghaiKeys;
std::set<std::string> age20Keys;
//复杂的测试要注意这两个全局变量,目前只有InsertFieldData和InsertOneField会往里加,并且没有清理
Status OpenDB(std::string dbName, FieldDB **db) {
Options options;
options.create_if_missing = true;
return FieldDB::OpenFieldDB(options, dbName, db);
}
// void ClearDB(FieldDB *db){
// //destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染
// WriteOptions writeOptions;
// int key_num = data_size / value_size;
// for (int i = 0; i < key_num; i++) {
// int key_ = i+1;
// std::string key = std::to_string(key_);
// Status s = db->Delete(WriteOptions(), key);
// ASSERT_TRUE(s.ok());
// }
// }
//只插一条特定数据的测试
void InsertOneField(FieldDB *db, std::string key = "0") {
WriteOptions writeOptions;
FieldArray fields = {
{"name", "special#" + key},
{"address", "Shanghai"},
{"age", "20"}
};
Status s = db->PutFields(WriteOptions(), key, fields);
ASSERT_TRUE(s.ok());
shanghaiKeys.insert(key);
age20Keys.insert(key);
}
//与上面对应
void GetOneField(FieldDB *db, std::string key = "0") {
ReadOptions readOptions;
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
ASSERT_TRUE(s.ok());
for (const Field& pairs : fields_ret) {
if (pairs.first == "name"){
ASSERT_EQ(pairs.second, "special#" + key);
} else if (pairs.first == "address"){
ASSERT_EQ(pairs.second, "Shanghai");
} else if (pairs.first == "age"){
ASSERT_EQ(pairs.second, "20");
} else assert(false);
}
}
void InsertFieldData(FieldDB *db, int seed = 0/*随机种子*/) {
std::cout << "-------inserting-------" << std::endl;
WriteOptions writeOptions;
int key_num = data_size / value_size;
// srand线程不安全,这种可以保证多线程时随机序列也一致
std::mt19937 rng(seed);
for (int i = 0; i < key_num; i++) {
int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致
//让批量写入的key>0, 单独写入的key<=0,方便测试观察
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE);
FieldArray fields = {
{"name", name},
{"address", address},
{"age", age}
};
if (address == "Shanghai") {
shanghaiKeys.insert(key);
}
if (age == "20") {
age20Keys.insert(key);
}
Status s = db->PutFields(WriteOptions(), key, fields);
ASSERT_TRUE(s.ok());
}
}
//并发时不一定能读到,加个参数控制
void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) {
std::cout << "-------getting-------" << std::endl;
ReadOptions readOptions;
int key_num = data_size / value_size;
// 点查
std::mt19937 rng(seed);
for (int i = 0; i < 100; i++) {
int randThisTime = rng();
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
FieldArray fields_ret;
Status s = db->GetFields(readOptions, key, &fields_ret);
if (!allowNotFound){ //必须读到
// if (!s.ok()){
// std::cout << key << std::endl;
// }
ASSERT_TRUE(s.ok());
} else { //不必须读到,但只要读到address必须正确
if(s.IsNotFound()) continue;
}
for (const Field& pairs : fields_ret) {
if (pairs.first == "name"){
} else if (pairs.first == "address"){
std::string city = pairs.second;
ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end());
} else if (pairs.first == "age"){
int age = std::stoi(pairs.second);
ASSERT_TRUE(age >= 0 && age < AGE_RANGE);
} else assert(false);
}
}
}
void findKeysByCity(FieldDB *db) {
std::cout << "-------getting field address-------" << std::endl;
Field field = {"address", "Shanghai"};
std::vector<std::string> resKeys = db->FindKeysByField(field);
//打印比较,因为shanghaikey可能被后写入的、其他address的key覆盖,打印出的后一个数应该小于前一个数
//如果随机种子相同,每次打印出的两个数也应该相同
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
}
}
// haveIndex表明数据库有没有该索引(address)
void findKeysByCityIndex(FieldDB *db, bool haveIndex) {
std::cout << "-------getting field address by index-------" << std::endl;
Field field = {"address", "Shanghai"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
if (haveIndex) ASSERT_TRUE(s.ok());
else {
ASSERT_TRUE(s.IsNotFound());
return;
}
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
}
}
void findKeysByAgeIndex(FieldDB *db, bool haveIndex) {
std::cout << "-------getting field age by index-------" << std::endl;
Field field = {"age", "20"};
Status s;
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
if (haveIndex) ASSERT_TRUE(s.ok());
else {
ASSERT_TRUE(s.IsNotFound());
return;
}
std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(age20Keys.begin(), age20Keys.end(), key), age20Keys.end());
}
}

+ 163
- 0
test/parallel_test.cc View File

@ -0,0 +1,163 @@
#include "gtest/gtest.h"
#include <thread>
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"
using namespace fielddb;
// 测试中read/write都表示带索引的读写
//读写有索引数据的并发
TEST(TestReadWrite, Parallel) {
fielddb::DestroyDB("testdb2.1",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
//二写三读
for (size_t i = 0; i < thread_num_; i++)
{
if (i == 0) {//写随机序列0
threads[i] = std::thread(InsertFieldData, db, 0);
} else if (i == 1)
{//写随机序列1
threads[i] = std::thread(InsertFieldData, db, 1);
} else {//读
bool allowNotFound = true;
threads[i] = std::thread(GetFieldData, db, allowNotFound, 0);
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
// 此时写已完成,一定能读到两次写
bool allowNotFound = false;
GetFieldData(db, allowNotFound);
GetFieldData(db, allowNotFound, 1);
findKeysByCity(db);
delete db;
}
//创建索引与写有该索引数据的并发
TEST(TestWriteCreatei, Parallel) {
fielddb::DestroyDB("testdb2.2",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.2", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
InsertFieldData(db);
int thread_num_ = 2;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
if (i == 0) {//创建索引
threads[i] = std::thread([db](){
db->CreateIndexOnField("address");
std::cout << "finish create index\n";
});
} else {//写
threads[i] = std::thread([db](){
while (db->GetIndexStatus("address") == NotExist){
continue; //开始创建了再并发的写
}
InsertOneField(db); //先插一条
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查索引是否创建成功
bool haveIndex = true;
findKeysByCityIndex(db, haveIndex);
//检查写入是否成功
GetOneField(db);
delete db;
}
//创建删除不同索引的并发
TEST(TestCreateiCreatei, Parallel) {
fielddb::DestroyDB("testdb2.3",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.3", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
InsertFieldData(db);
int thread_num_ = 3;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
//3线程并发创建索引address
threads[i] = std::thread([db](){
db->CreateIndexOnField("address");
std::cout << "finish create index address\n";
});
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查索引是否创建成功
bool haveIndex = true;
findKeysByCityIndex(db, haveIndex);
findKeysByAgeIndex(db, false);
for (size_t i = 0; i < thread_num_; i++)
{
if (i == 0 || i == 1) {//2线程删除索引address
threads[i] = std::thread([db](){
db->DeleteIndex("address");
std::cout << "finish delete index address\n";
});
} else {//1线程创建索引age
threads[i] = std::thread([db](){
db->CreateIndexOnField("age");
std::cout << "finish create index age\n";
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
findKeysByCityIndex(db, false);
findKeysByAgeIndex(db, true);
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save