Przeglądaj źródła

增加并通过了write相关测试

pull/2/head
augurier 8 miesięcy temu
rodzic
commit
d5e46b56b3
5 zmienionych plików z 114 dodań i 11 usunięć
  1. +13
    -5
      fielddb/request.cpp
  2. +1
    -1
      fielddb/request.h
  3. +3
    -0
      test/basic_function_test.cc
  4. +42
    -3
      test/helper.cc
  5. +55
    -2
      test/parallel_test.cc

+ 13
- 5
fielddb/request.cpp Wyświetl plik

@ -286,7 +286,7 @@ void iDeleteReq::Prepare(FieldDB *DB) {
done = true;
} else {
//如果正在创建或者删除,那么pend到对应的请求上
parent->PendReq(this);
parent->PendReq(this->parent);
}
}
@ -334,9 +334,17 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
void Put(const Slice &key, const Slice &value) override {
//为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误
str_buf->push_back(key.ToString());
fa_buf->push_back({{"",value.ToString()}});
FieldArray *field = new FieldArray;
field = ParseValue(value.ToString(), field);
if (field == nullptr){ //batch中的value没有field
fa_buf->push_back({{"",value.ToString()}});
} else {
fa_buf->push_back(*field);
}
sub_requests->emplace_back(new FieldsReq(&str_buf->back(),&fa_buf->back(),mu));
sub_requests->back()->parent = req;
delete field;
}
void Delete(const Slice &key) override {
str_buf->push_back(key.ToString());
@ -370,10 +378,10 @@ BatchReq::~BatchReq() {
}
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
std::unordered_set<std::string *> Sub_batchKeySet;
std::unordered_set<std::string> Sub_batchKeySet;
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
@ -386,7 +394,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
KVBatch.Append(Sub_KVBatch);
IndexBatch.Append(Sub_IndexBatch);
MetaBatch.Append(Sub_MetaBatch);
batchKeySet.insert(batchKeySet.begin(),batchKeySet.end());
batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
}

+ 1
- 1
fielddb/request.h Wyświetl plik

@ -138,7 +138,7 @@ public:
~BatchReq();
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
WriteBatch *Batch;
std::deque<Request *> sub_requests;

+ 3
- 0
test/basic_function_test.cc Wyświetl plik

@ -57,6 +57,9 @@ TEST(TestLab2, Basic) {
std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
ASSERT_EQ(resKeys.size(), 0);
WriteFieldData(db);
GetFieldData(db, false);
findKeysByAgeIndex(db, true);
delete db;
}

+ 42
- 3
test/helper.cc Wyświetl plik

@ -19,7 +19,8 @@ std::vector cities = {
ThreadSafeSet shanghaiKeys;
ThreadSafeSet age20Keys;
//复杂的测试要注意这两个全局变量,
//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData和InsertOneField会删除,
//目前只有InsertFieldData和InsertOneField和writeFieldData会往里加,
//DeleteFieldData和InsertOneField会删除,
//其他测试之间有必要手动clear
Status OpenDB(std::string dbName, FieldDB **db) {
@ -131,6 +132,41 @@ void DeleteFieldData(FieldDB *db, int seed = 0/*随机种子*/) {
}
}
void WriteFieldData(FieldDB *db, int seed = 0/*随机种子*/) {
std::cout << "-------writing-------" << std::endl;
WriteOptions writeOptions;
int key_num = data_size / value_size;
// srand线程不安全,这种可以保证多线程时随机序列也一致
std::mt19937 rng(seed);
WriteBatch wb;
for (int i = 0; i < key_num; i++) {
int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致
//让批量写入的key>0, 单独写入的key<=0,方便测试观察
int key_ = std::abs(randThisTime) % key_num + 1;
std::string key = std::to_string(key_);
std::string name = "customer#" + std::to_string(key_);
std::string address = cities[randThisTime % cities.size()];
std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE);
FieldArray fields = {
{"name", name},
{"address", address},
{"age", age}
};
if (address == "Shanghai") {
shanghaiKeys.insert(key);
}
if (age == "20") {
age20Keys.insert(key);
}
wb.Put(key, SerializeValue(fields));
}
Status s = db->Write(writeOptions, &wb);
ASSERT_TRUE(s.ok());
}
//并发时不一定能读到,加个参数控制
void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) {
std::cout << "-------getting-------" << std::endl;
@ -232,8 +268,11 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) {
}
}
void checkDataInKVAndIndex(FieldDB *db) {
Field field = {"address", "Shanghai"};
void checkDataInKVAndIndex(FieldDB *db, std::string fieldName = "address") {
Field field;
if (fieldName == "address") field = {"address", "Shanghai"};
else if (fieldName == "age") field = {"age", "20"};
else assert(0);//只支持这两个字段检查
Status s;
std::vector<std::string> resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据
std::vector<std::string> resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据

+ 55
- 2
test/parallel_test.cc Wyświetl plik

@ -9,7 +9,7 @@ using namespace fielddb;
// 测试中read/write都表示带索引的读写
//读写有索引数据的并发
TEST(TestReadWrite, Parallel) {
TEST(TestReadPut, Parallel) {
fielddb::DestroyDB("testdb2.1",Options());
FieldDB *db = new FieldDB();
@ -52,7 +52,7 @@ TEST(TestReadWrite, Parallel) {
}
//创建索引与写有该索引数据的并发
TEST(TestWriteCreatei, Parallel) {
TEST(TestPutCreatei, Parallel) {
fielddb::DestroyDB("testdb2.2",Options());
FieldDB *db = new FieldDB();
@ -242,6 +242,59 @@ TEST(TestPutDelete, Parallel) {
delete db;
}
//write和其他功能的并发(大杂烩
TEST(TestWrite, Parallel) {
fielddb::DestroyDB("testdb2.6",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.6", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
InsertFieldData(db, 2); //先填点数据,让创建索引的时间久一点
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){db->CreateIndexOnField("age");});
threads[1] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue; //开始创建了再并发的写
}
InsertFieldData(db);});
threads[2] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
WriteFieldData(db, 1);});
threads[3] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
DeleteFieldData(db, 0);});
threads[4] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue;
}
db->DeleteIndex("age");});
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
checkDataInKVAndIndex(db);
ASSERT_EQ(db->GetIndexStatus("age"), NotExist); //删除索引的请求应该被pend在创建之上
//删掉最后一个线程,可以测试创建age索引时并发的写入能不能保持age的一致性
//checkDataInKVAndIndex(db, "age");
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);

Ładowanie…
Anuluj
Zapisz