Parcourir la source

Merge commit '23b603dda024e20c6e7238ef5f132be2e3ad59d8' into cyq

gyf
cyq il y a 8 mois
Parent
révision
91eaddd125
10 fichiers modifiés avec 391 ajouts et 42 suppressions
  1. +5
    -0
      CMakeLists.txt
  2. +17
    -9
      fielddb/field_db.cpp
  3. +1
    -0
      fielddb/field_db.h
  4. +31
    -19
      fielddb/request.cpp
  5. +6
    -6
      fielddb/request.h
  6. +3
    -0
      test/basic_function_test.cc
  7. +68
    -6
      test/helper.cc
  8. +34
    -0
      test/helper.h
  9. +141
    -2
      test/parallel_test.cc
  10. +85
    -0
      test/recover_test.cc

+ 5
- 0
CMakeLists.txt Voir le fichier

@ -535,3 +535,8 @@ add_executable(parallel_test
"${PROJECT_SOURCE_DIR}/test/parallel_test.cc"
)
target_link_libraries(parallel_test PRIVATE leveldb gtest)
add_executable(recover_test
"${PROJECT_SOURCE_DIR}/test/recover_test.cc"
)
target_link_libraries(recover_test PRIVATE leveldb gtest)

+ 17
- 9
fielddb/field_db.cpp Voir le fichier

@ -60,11 +60,11 @@ Status FieldDB::Recover() {
std::string IndexKey;
Iter->SeekToFirst();
while(Iter->Valid()) {
IndexKey = Iter->value().ToString();
IndexKey = Iter->key().ToString();
ParsedInternalIndexKey ParsedIndex;
ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex);
index_[ParsedIndex.name_.ToString()] = {Exist,nullptr};
std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl;
//std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl;
//构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符)
//TODO:不知道这个做法有没有道理
@ -82,15 +82,18 @@ Status FieldDB::Recover() {
MetaValue = Iter->key();
MetaType type = MetaType(DecodeFixed32(MetaValue.data()));
MetaValue.remove_prefix(4);//移除头上的metaType的部分
Slice extractKey;
GetLengthPrefixedSlice(&MetaValue, &extractKey);
if(type == KV_Creating) {
FieldArray fields;
ParseValue(Iter->value().ToString(), &fields);
PutFields(WriteOptions(), MetaValue, fields);
PutFields(WriteOptions(), extractKey, fields);
} else if(type == KV_Deleting) {
Delete(WriteOptions(), MetaValue);
Delete(WriteOptions(), extractKey);
} else {
assert(0 && "Invalid MetaType");
}
Iter->Next();
}
delete Iter;
//在所有的请求完成后,会自动把metaDB的内容清空。
@ -126,7 +129,7 @@ Again:
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string *> batchKeySet;
std::unordered_set<std::string> batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
@ -183,8 +186,7 @@ Again:
// return status;
}
//这里把一个空串作为常规put的name
// 这里把一个空串作为常规put的name
Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
FieldArray FA = {{"",value.ToString()}};
return PutFields(options, key, FA);
@ -269,7 +271,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string *> useless;
std::unordered_set<std::string> useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
@ -285,7 +287,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string *> useless;
std::unordered_set<std::string> useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
@ -364,4 +366,10 @@ Status DestroyDB(const std::string& name, const Options& options) {
return s;
}
FieldDB::~FieldDB() {
delete indexDB_;
delete kvDB_;
delete metaDB_;
}
} // namespace fielddb

+ 1
- 0
fielddb/field_db.h Voir le fichier

@ -33,6 +33,7 @@ public:
//FieldDB *db = new FieldDB()openDB *db
FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {};
~FieldDB();
/*lab1的要求,作为db派生类要实现的虚函数*/
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override;
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override;

+ 31
- 19
fielddb/request.cpp Voir le fichier

@ -23,7 +23,8 @@ void Request::PendReq(Request *req) {
//为虚函数提供最基本的实现
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
assert(0);
}
@ -45,12 +46,13 @@ bool Request::isPending() {
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
if (batchKeySet.find(Key) != batchKeySet.end()){
if (batchKeySet.find(*Key) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次
} else {
batchKeySet.insert(Key);
batchKeySet.insert(*Key);
}
std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
@ -64,8 +66,6 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
assert(0);
}
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
bool HasIndex = false;
bool HasOldIndex = false;
{
@ -101,7 +101,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
}
}
}
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex || HasOldIndex) {
std::string MetaKey,MetaValue;
@ -147,12 +148,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
if (batchKeySet.find(Key) != batchKeySet.end()){
if (batchKeySet.find(*Key) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次
} else {
batchKeySet.insert(Key);
batchKeySet.insert(*Key);
}
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field
//2.1 如果无,则正常构造delete
@ -160,10 +162,10 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//在kvDB和metaDB中写入对应的delete
//2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待
std::string val_str;
DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
if (s.IsNotFound()) return;
FieldArray *Fields = new FieldArray;
ParseValue(val_str,Fields);
KVBatch.Delete(Slice(*Key));
bool HasIndex = false;
{
@ -183,6 +185,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//assert(0);
}
}
KVBatch.Delete(Slice(*Key));
//2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex) {
std::string MetaKey;
@ -233,7 +236,8 @@ void iCreateReq::PendReq(Request *req) {
}
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
//遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating))
//一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互
@ -282,7 +286,7 @@ void iDeleteReq::Prepare(FieldDB *DB) {
done = true;
} else {
//如果正在创建或者删除,那么pend到对应的请求上
parent->PendReq(this);
parent->PendReq(this->parent);
}
}
@ -292,7 +296,7 @@ void iDeleteReq::PendReq(Request* req) {
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);
@ -330,9 +334,17 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
void Put(const Slice &key, const Slice &value) override {
//为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误
str_buf->push_back(key.ToString());
fa_buf->push_back({{"",value.ToString()}});
FieldArray *field = new FieldArray;
field = ParseValue(value.ToString(), field);
if (field == nullptr){ //batch中的value没有field
fa_buf->push_back({{"",value.ToString()}});
} else {
fa_buf->push_back(*field);
}
sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
sub_requests->back()->parent = req;
delete field;
}
void Delete(const Slice &key) override {
str_buf->push_back(key.ToString());
@ -366,10 +378,10 @@ BatchReq::~BatchReq() {
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string *> Sub_batchKeySet;
std::unordered_set<std::string> Sub_batchKeySet;
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
@ -382,7 +394,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
KVBatch.Append(Sub_KVBatch);
IndexBatch.Append(Sub_IndexBatch);
MetaBatch.Append(Sub_MetaBatch);
batchKeySet.insert(batchKeySet.begin(),batchKeySet.end());
batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
}

+ 6
- 6
fielddb/request.h Voir le fichier

@ -46,7 +46,7 @@ public:
//Fields的
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet);
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet);
//icreate和idelete在队列中的注册当前状态
virtual void Prepare(FieldDB *DB);
virtual void Finalize(FieldDB *DB);
@ -68,7 +68,7 @@ public:
Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
std::string *Key;
FieldArray *Fields;
@ -93,7 +93,7 @@ public:
Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@ -110,7 +110,7 @@ public:
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@ -127,7 +127,7 @@ public:
Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
std::string *Key;
};
@ -138,7 +138,7 @@ public:
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;

+ 3
- 0
test/basic_function_test.cc Voir le fichier

@ -57,6 +57,9 @@ TEST(TestLab2, Basic) {
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
ASSERT_EQ(resKeys.size(), 0);
WriteFieldData(db);
GetFieldData(db, false);
findKeysByAgeIndex(db, true);
delete db;
}

+ 68
- 6
test/helper.cc Voir le fichier

@ -3,6 +3,7 @@
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include <random>
#include "helper.h"
using namespace fielddb;
constexpr int value_size = 2048;
@ -13,10 +14,13 @@ std::vector cities = {
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin"
};
//检查insert和queryByIndex的数据是否对应
std::set<std::string> shanghaiKeys;
std::set<std::string> age20Keys;
//封装了一个线程安全的全局set
ThreadSafeSet shanghaiKeys;
ThreadSafeSet age20Keys;
//复杂的测试要注意这两个全局变量,
//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData会全部清空,
//目前只有InsertFieldData和InsertOneField和writeFieldData会往里加,
//DeleteFieldData和InsertOneField会删除,
//其他测试之间有必要手动clear
Status OpenDB(std::string dbName, FieldDB **db) {
@ -51,6 +55,15 @@ void InsertOneField(FieldDB *db, std::string key = "0") {
age20Keys.insert(key);
}
//只删一条特定数据的测试
void DeleteOneField(FieldDB *db, std::string key = "0") {
WriteOptions writeOptions;
Status s = db->Delete(WriteOptions(), key);
ASSERT_TRUE(s.ok());
shanghaiKeys.erase(key);
age20Keys.erase(key);
}
//与上面对应
void GetOneField(FieldDB *db, std::string key = "0") {
ReadOptions readOptions;
@ -119,6 +132,41 @@ void DeleteFieldData(FieldDB *db, int seed = 0/*随机种子*/) {
}
}
void WriteFieldData(FieldDB *db, int seed = 0/*随机种子*/) {
std::cout << "-------writing-------" << std::endl;
WriteOptions writeOptions;
int key_num = data_size / value_size;
// srand线程不安全,这种可以保证多线程时随机序列也一致
std::mt19937 rng(seed);
WriteBatch wb;
for (int i = 0; i < key_num; i++) {
int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致
//让批量写入的key>0, 单独写入的key<=0,方便测试观察
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE);
FieldArray fields = {
{"name", name},
{"address", address},
{"age", age}
};
if (address == "Shanghai") {
shanghaiKeys.insert(key);
}
if (age == "20") {
age20Keys.insert(key);
}
wb.Put(key, SerializeValue(fields));
}
Status s = db->Write(writeOptions, &wb);
ASSERT_TRUE(s.ok());
}
//并发时不一定能读到,加个参数控制
void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) {
std::cout << "-------getting-------" << std::endl;
@ -183,7 +231,7 @@ void findKeysByCity(FieldDB *db) {
//如果随机种子相同,每次打印出的两个数也应该相同
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
ASSERT_TRUE(shanghaiKeys.haveKey(key));
}
}
@ -200,7 +248,7 @@ void findKeysByCityIndex(FieldDB *db, bool haveIndex) {
}
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
ASSERT_TRUE(shanghaiKeys.haveKey(key));
}
}
@ -216,6 +264,20 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) {
}
std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(age20Keys.begin(), age20Keys.end(), key), age20Keys.end());
ASSERT_TRUE(age20Keys.haveKey(key));
}
}
void checkDataInKVAndIndex(FieldDB *db, std::string fieldName = "address") {
Field field;
if (fieldName == "address") field = {"address", "Shanghai"};
else if (fieldName == "age") field = {"age", "20"};
else assert(0);//只支持这两个字段检查
Status s;
std::vector<std::string> resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据
std::vector<std::string> resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据
std::sort(resKeys1.begin(), resKeys1.end());
std::sort(resKeys2.begin(), resKeys2.end());
std::cout << resKeys1.size() << " " << resKeys2.size() << std::endl;
ASSERT_EQ(resKeys1, resKeys2);
}

+ 34
- 0
test/helper.h Voir le fichier

@ -0,0 +1,34 @@
#include "fielddb/field_db.h"
using namespace fielddb;
class ThreadSafeSet
{
private:
std::set<std::string> keys;
std::mutex setMutex;
public:
ThreadSafeSet(){}
void insert(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
keys.insert(key);
}
void erase(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
keys.erase(key);
}
void clear(){
std::lock_guard<std::mutex> lock(setMutex);
keys.clear();
}
size_t size(){
std::lock_guard<std::mutex> lock(setMutex);
return keys.size();
}
bool haveKey(std::string key){
return std::find(keys.begin(), keys.end(), key) != keys.end();
}
};

+ 141
- 2
test/parallel_test.cc Voir le fichier

@ -9,7 +9,7 @@ using namespace fielddb;
// 测试中read/write都表示带索引的读写
//读写有索引数据的并发
TEST(TestReadWrite, Parallel) {
TEST(TestReadPut, Parallel) {
fielddb::DestroyDB("testdb2.1",Options());
FieldDB *db = new FieldDB();
@ -18,6 +18,8 @@ TEST(TestReadWrite, Parallel) {
abort();
}
// ClearDB(db);
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
//二写三读
@ -45,11 +47,12 @@ TEST(TestReadWrite, Parallel) {
GetFieldData(db, allowNotFound);
GetFieldData(db, allowNotFound, 1);
findKeysByCity(db);
checkDataInKVAndIndex(db);
delete db;
}
//创建索引与写有该索引数据的并发
TEST(TestWriteCreatei, Parallel) {
TEST(TestPutCreatei, Parallel) {
fielddb::DestroyDB("testdb2.2",Options());
FieldDB *db = new FieldDB();
@ -90,6 +93,7 @@ TEST(TestWriteCreatei, Parallel) {
findKeysByCityIndex(db, haveIndex);
//检查写入是否成功
GetOneField(db);
checkDataInKVAndIndex(db);
delete db;
}
@ -128,6 +132,7 @@ TEST(TestCreateiCreatei, Parallel) {
bool haveIndex = true;
findKeysByCityIndex(db, haveIndex);
findKeysByAgeIndex(db, false);
checkDataInKVAndIndex(db);
for (size_t i = 0; i < thread_num_; i++)
@ -156,6 +161,140 @@ TEST(TestCreateiCreatei, Parallel) {
delete db;
}
//有索引时,大量并发put与delete相同key,确保kvdb和indexdb的一致性
TEST(TestPutDeleteOne, Parallel) {
fielddb::DestroyDB("testdb2.4",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.4", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
int thread_num_ = 20;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
if (i % 2 == 0) {//2线程删除索引address
threads[i] = std::thread([db](){
for (size_t j = 0; j < 100; j++)
{
InsertOneField(db, std::to_string(j));
}
});
} else {//1线程创建索引age
threads[i] = std::thread([db](){
for (size_t j = 0; j < 100; j++)
{
DeleteOneField(db, std::to_string(j));
}
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
checkDataInKVAndIndex(db);
delete db;
}
//有索引时,put与delete的并发,确保kvdb和indexdb的一致性
TEST(TestPutDelete, Parallel) {
fielddb::DestroyDB("testdb2.5",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.5", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
int thread_num_ = 4;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){InsertFieldData(db);});
threads[1] = std::thread([db](){InsertFieldData(db, 1);});
threads[2] = std::thread([db](){DeleteFieldData(db);});
threads[3] = std::thread([db](){DeleteFieldData(db, 1);});
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
checkDataInKVAndIndex(db);
delete db;
}
//write和其他功能的并发(大杂烩
TEST(TestWrite, Parallel) {
fielddb::DestroyDB("testdb2.6",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.6", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
InsertFieldData(db, 2); //先填点数据,让创建索引的时间久一点
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){db->CreateIndexOnField("age");});
threads[1] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue; //开始创建了再并发的写
}
InsertFieldData(db);});
threads[2] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
WriteFieldData(db, 1);});
threads[3] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
DeleteFieldData(db, 0);});
threads[4] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
db->DeleteIndex("age");});
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
checkDataInKVAndIndex(db);
ASSERT_EQ(db->GetIndexStatus("age"), NotExist); //删除索引的请求应该被pend在创建之上
//删掉最后一个线程,可以测试创建age索引时并发的写入能不能保持age的一致性
//checkDataInKVAndIndex(db, "age");
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);

+ 85
- 0
test/recover_test.cc Voir le fichier

@ -0,0 +1,85 @@
#include "gtest/gtest.h"
// #include "leveldb/env.h"
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include "test/helper.cc"
#include <thread>
#include <csignal>
#include <exception>
using namespace fielddb;
TEST(TestNormalRecover, Recover) {
fielddb::DestroyDB("testdb3.1",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb3.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
InsertFieldData(db);
bool allowNotFound = false;
GetFieldData(db, allowNotFound);
findKeysByCityIndex(db, true);
findKeysByAgeIndex(db, true);
delete db;
db = new FieldDB();
if(OpenDB("testdb3.1", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
//仍然能读到之前写入的数据和索引
GetFieldData(db, allowNotFound);
findKeysByCityIndex(db, true);
findKeysByAgeIndex(db, true);
}
TEST(TestParalPutRecover, Recover) {
//第一次运行
// fielddb::DestroyDB("testdb3.2",Options());
// FieldDB *db = new FieldDB();
// if(OpenDB("testdb3.2", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// db->CreateIndexOnField("address");
// db->CreateIndexOnField("age");
// shanghaiKeys.clear();
// age20Keys.clear();
// int thread_num_ = 2;
// std::vector<std::thread> threads(thread_num_);
// threads[0] = std::thread([db](){
// InsertFieldData(db);
// });
// threads[1] = std::thread([db](){
// InsertOneField(db);
// delete db;
// });
// for (auto& t : threads) {
// if (t.joinable()) {
// t.join();
// }
// }
//线程1导致了线程0错误,测试会终止(模拟数据库崩溃)
//这会导致线程0在写入的各种奇怪的时间点崩溃
//第二次运行注释掉上面的代码,运行下面的代码测试恢复
//第二次运行
FieldDB *db = new FieldDB();
if(OpenDB("testdb3.2", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
GetOneField(db);
checkDataInKVAndIndex(db);
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Chargement…
Annuler
Enregistrer