diff --git a/CMakeLists.txt b/CMakeLists.txt index b70e461..9529fb0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -194,6 +194,10 @@ target_sources(leveldb "util/serialize_value.cc" "fielddb/field_db.cpp" "fielddb/field_db.h" + "fielddb/meta.cpp" + "fielddb/meta.h" + "fielddb/request.cpp" + "fielddb/request.h" # Only CMake 3.3+ supports PUBLIC sources in targets exported by "install". $<$:PUBLIC> @@ -522,7 +526,12 @@ if(LEVELDB_INSTALL) ) endif(LEVELDB_INSTALL) -add_executable(lab1_test +add_executable(basic_function_test "${PROJECT_SOURCE_DIR}/test/basic_function_test.cc" ) -target_link_libraries(lab1_test PRIVATE leveldb gtest) +target_link_libraries(basic_function_test PRIVATE leveldb gtest) + +add_executable(parallel_test + "${PROJECT_SOURCE_DIR}/test/parallel_test.cc" +) +target_link_libraries(parallel_test PRIVATE leveldb gtest) diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index ad9e72d..2e00ca9 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -14,6 +14,7 @@ #include "util/serialize_value.h" #include "fielddb/encode_index.h" #include "fielddb/meta.h" +#include "field_db.h" namespace fielddb { using namespace leveldb; @@ -61,7 +62,7 @@ Request *FieldDB::GetHandleInterval() { mutex_.AssertHeld(); //保证队列是互斥访问的 Request *tail = taskqueue_.front(); for(auto *req_ptr : taskqueue_) { - if(req_ptr->isDeleteReq() || req_ptr->isiCreateReq()) { + if(req_ptr->isiDeleteReq() || req_ptr->isiCreateReq()) { return tail; } tail = req_ptr; @@ -83,6 +84,7 @@ Again: WriteBatch KVBatch,IndexBatch,MetaBatch; Status status; if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { + // int debug = tail->type_; //表明这一个区间并没有涉及index的创建删除 { //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 @@ -97,9 +99,12 @@ Again: mutex_.Unlock(); WriteOptions op; status = metaDB_->Write(op, &MetaBatch); + assert(status.ok()); //TODO:index的写入需要在另外一个线程中同时完成 status = indexDB_->Write(op, &IndexBatch); + assert(status.ok()); status = kvDB_->Write(op, &KVBatch); + assert(status.ok()); //3. 将meta数据清除 MetaCleaner cleaner; cleaner.Collect(MetaBatch); @@ -113,13 +118,13 @@ Again: while(true) { Request *ready = taskqueue_.front(); + // int debug = tail->type_; taskqueue_.pop_front(); //当前ready不是队首,不是和index的创建有关 - if(ready != &req && !ready->isPending() && - !req.isiCreateReq() && !req.isiDeleteReq()) { + if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) { ready->s = status; ready->done = true; - ready->cond_.Signal(); + if (ready != &req) ready->cond_.Signal(); } if (ready == tail) break; } @@ -159,11 +164,11 @@ Status FieldDB::PutFields(const WriteOptions &Options, // todo: 删除有索引的key时indexdb也要同步 Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { // - std::string key_ = key.ToString(); - DeleteReq req(&key_,&mutex_); - Status status = HandleRequest(req); - return status; - // return kvDB_->Delete(options, key); + // std::string key_ = key.ToString(); + // DeleteReq req(&key_,&mutex_); + // Status status = HandleRequest(req); + // return status; + return kvDB_->Delete(options, key); } // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { @@ -207,25 +212,6 @@ std::vector> FieldDB::FindKeysAndValByFieldN } Status FieldDB::CreateIndexOnField(const std::string& field_name) { - //taskQueue相关 - //写锁 是不是只需要给putfields设置一把锁就行 - - // std::vector> keysAndVal = - // FindKeysAndValByFieldName(field_name); - // WriteBatch writeBatch; - // Slice value = Slice(); - // for (auto &kvPair : keysAndVal){ - // std::string indexKey; - // AppendIndexKey(&indexKey, - // ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); - // writeBatch.Put(indexKey, value); - // } - // Status s = indexDB_->Write(WriteOptions(), &writeBatch); - // if (!s.ok()) return s; - - // index_[field_name].first = Exist; - // //唤醒taskqueue - // return s; std::string Field = field_name; iCreateReq req(&Field,&mutex_); HandleRequest(req); @@ -241,23 +227,6 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) { } Status FieldDB::DeleteIndex(const std::string &field_name) { - //taskQueue相关 - //写锁 - // std::vector> keysAndVal = - // FindKeysAndValByFieldName(field_name); - // WriteBatch writeBatch; - // for (auto &kvPair : keysAndVal){ - // std::string indexKey; - // AppendIndexKey(&indexKey, - // ParsedInternalIndexKey(kvPair.first, field_name, kvPair.second)); - // writeBatch.Delete(indexKey); - // } - // Status s = indexDB_->Write(WriteOptions(), &writeBatch); - // if (!s.ok()) return s; - - // index_.erase(field_name); - // //唤醒taskqueue - // return s; std::string Field = field_name; iDeleteReq req(&Field,&mutex_); HandleRequest(req); @@ -299,6 +268,12 @@ std::vector FieldDB::QueryByIndex(const Field &field, Status *s) { return result; } +IndexStatus FieldDB::GetIndexStatus(const std::string &fieldName){ + if (index_.count(fieldName) == 0) return IndexStatus::NotExist; + IndexStatus idxs = index_[fieldName].first; + return idxs; +} + Iterator * FieldDB::NewIterator(const ReadOptions &options) { return kvDB_->NewIterator(options); } @@ -327,4 +302,15 @@ void FieldDB::CompactRange(const Slice *begin, const Slice *end) { kvDB_->CompactRange(begin, end); } -} // end of namespace \ No newline at end of file +Status DestroyDB(const std::string& name, const Options& options) { + Status s; + s = leveldb::DestroyDB(name+"_kvDB", options); + assert(s.ok()); + s = leveldb::DestroyDB(name+"_indexDB", options); + assert(s.ok()); + s = leveldb::DestroyDB(name+"_metaDB", options); + assert(s.ok()); + return s; +} + +} // namespace fielddb diff --git a/fielddb/field_db.h b/fielddb/field_db.h index c54b525..684a820 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -15,6 +15,14 @@ # define FIELD_DB_H namespace fielddb { using namespace leveldb; + +enum IndexStatus{ + Creating, + Deleting, + Exist, + NotExist + }; + class FieldDB : DB { public: friend class Request; @@ -43,6 +51,8 @@ public: Status CreateIndexOnField(const std::string& field_name); Status DeleteIndex(const std::string &field_name); std::vector QueryByIndex(const Field &field, Status *s); + //返回当前数据库中索引状态,用来测试,不过也可以作为一个功能? + IndexStatus GetIndexStatus(const std::string &fieldName); static Status OpenFieldDB(const Options& options,const std::string& name,FieldDB** dbptr); @@ -59,11 +69,6 @@ private: leveldb::DB *indexDB_; leveldb::DB *kvDB_; - enum IndexStatus{ - Creating, - Deleting, - Exist - }; using FieldName = std::string; // 标记index的状态,如果是creating/deleting,则会附带相应的请求 std::map> index_; @@ -80,5 +85,8 @@ private: Request *GetHandleInterval(); //获得任务队列中的待处理区间,区间划分规则和原因见文档 }; + +Status DestroyDB(const std::string& name, + const Options& options); } // end of namespace # endif \ No newline at end of file diff --git a/fielddb/request.cpp b/fielddb/request.cpp index c1e926e..6d36caf 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -8,6 +8,7 @@ #include "fielddb/encode_index.h" #include "fielddb/field_db.h" #include "fielddb/meta.h" +#include "request.h" namespace fielddb { using namespace leveldb; @@ -52,13 +53,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, if(field_name == "") break; if(DB->index_.count(field_name)) { auto [index_status,parent_req] = DB->index_[field_name]; - if(index_status == FieldDB::Creating || index_status == FieldDB::Deleting) { + if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { parent_req->PendReq(this); return; - } else if(index_status == FieldDB::Exist) { + } else if(index_status == IndexStatus::Exist) { HasIndex = true; } - assert(0); + //assert(0); } } //2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB @@ -68,15 +69,16 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, MetaKV MKV = MetaKV(Key,&serialized); MKV.Trans(MetaKey, MetaValue); MetaBatch.Put(MetaKey, MetaValue); - } + //第三点是不是应该在这一分支中 //3.对于含有索引的field建立索引 - for(auto [field_name,field_value] : *Fields) { - if(field_name == "") continue; - if(DB->index_.count(field_name)) { - std::string indexKey; - AppendIndexKey(&indexKey, ParsedInternalIndexKey( - *Key,field_name,field_value)); - IndexBatch.Put(indexKey, Slice()); + for(auto [field_name,field_value] : *Fields) { + if(field_name == "") continue; + if(DB->index_.count(field_name)) { + std::string indexKey; + AppendIndexKey(&indexKey, ParsedInternalIndexKey( + *Key,field_name,field_value)); + IndexBatch.Put(indexKey, Slice()); + } } } } @@ -101,7 +103,7 @@ void iCreateReq::Prepare(FieldDB *DB) { DB->index_mu.AssertHeld(); if(DB->index_.count(*Field)) { auto [istatus,parent] = DB->index_[*Field]; - if(istatus == FieldDB::Exist) { + if(istatus == IndexStatus::Exist) { //如果已经完成建立索引,则返回成功 done = true; Existed = true; @@ -114,22 +116,48 @@ void iCreateReq::Prepare(FieldDB *DB) { } //如果索引状态表中没有,则表示尚未创建,更新相应的状态 //这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend - DB->index_[*Field] = {FieldDB::Creating,this}; + DB->index_[*Field] = {IndexStatus::Creating,this}; done = true; } +void iCreateReq::PendReq(Request *req) { + req->parent = this; + pending_list.push_back(req); +} + void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB) { - //TODO:遍历数据库,构建二级索引到indexbatch,并且更新metaDB中的元数据为Index类型的(Field,Creating) - //这里或许不需要在metaDB中先写一遍? + //遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating)) + //一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互 + std::vector> keysAndVal = + DB->FindKeysAndValByFieldName(*Field); + Slice value = Slice(); + for (auto &kvPair : keysAndVal){ + std::string indexKey; + AppendIndexKey(&indexKey, + ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second)); + IndexBatch.Put(indexKey, value); + } } void iCreateReq::Finalize(FieldDB *DB) { - //TODO: - //1. 写入完成后,更新index状态表,并将metaDB的值改为Index类型的(Field,Existing) - //2. 将所有的pendinglist重新入队 - + //1. 写入完成后,更新index状态表,(并将metaDB的值改为Index类型的(Field,Existing)) + MutexLock iL(&DB->index_mu); + DB->index_[*Field] = {IndexStatus::Exist, nullptr}; + DB->index_mu.Unlock(); + + if (pending_list.empty()) return; + //2. 将所有的pendinglist重新入队 + MutexLock L(&DB->mutex_); + for (auto req : pending_list){ + DB->taskqueue_.push_back(req); + req->parent = req; //解绑 + } + if (pending_list[0] == DB->taskqueue_.front()) { + pending_list[0]->cond_.Signal(); + } + this->s = Status::OK(); } /*******iDeleteReq*******/ @@ -142,8 +170,8 @@ void iDeleteReq::Prepare(FieldDB *DB) { return ; } auto [istatus,parent] = DB->index_[*Field]; - if(istatus == FieldDB::Exist) { - DB->index_[*Field] = {FieldDB::Creating,this}; + if(istatus == IndexStatus::Exist) { + DB->index_[*Field] = {IndexStatus::Deleting,this}; done = true; } else { //如果正在创建或者删除,那么pend到对应的请求上 @@ -151,14 +179,42 @@ void iDeleteReq::Prepare(FieldDB *DB) { } } +void iDeleteReq::PendReq(Request* req) { + req->parent = this; + pending_list.push_back(req); +} + void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch &MetaBatch,fielddb::FieldDB *DB) { - + std::vector> keysAndVal = + DB->FindKeysAndValByFieldName(*Field); + Slice value = Slice(); + for (auto &kvPair : keysAndVal){ + std::string indexKey; + AppendIndexKey(&indexKey, + ParsedInternalIndexKey(kvPair.first, *Field, kvPair.second)); + IndexBatch.Delete(indexKey); + } } void iDeleteReq::Finalize(FieldDB *DB) { - + MutexLock iL(&DB->index_mu); + DB->index_.erase(*Field); + DB->index_mu.Unlock(); + + if (pending_list.empty()) return; + //2. 将所有的pendinglist重新入队 + MutexLock L(&DB->mutex_); + for (auto req : pending_list){ + DB->taskqueue_.push_back(req); + req->parent = req; //解绑 + } + if (pending_list[0] == DB->taskqueue_.front()) { + pending_list[0]->cond_.Signal(); + } + this->s = Status::OK(); } -} \ No newline at end of file + +} // namespace fielddb \ No newline at end of file diff --git a/fielddb/request.h b/fielddb/request.h index 3346c1f..e68c4f1 100644 --- a/fielddb/request.h +++ b/fielddb/request.h @@ -19,7 +19,7 @@ public: friend class FieldDB; enum RequestType { FieldsReq_t, - ValueReq_t, + //ValueReq_t, iCreateReq_t, iDeleteReq_t, DeleteReq_t, @@ -33,7 +33,7 @@ public: Request(RequestType type,port::Mutex *mu): type_(type),cond_(mu),done(false) { parent = this; }; - virtual ~Request(); + //virtual ~Request(); inline bool isFieldsReq() { return type_ == FieldsReq_t; } // inline bool isValueReq() { return type_ == ValueReq_t; } @@ -93,6 +93,7 @@ public: WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; + void PendReq(Request *req) override; bool Existed; std::string *Field; @@ -109,6 +110,7 @@ public: WriteBatch &MetaBatch,fielddb::FieldDB *DB) override; void Prepare(FieldDB *DB) override; void Finalize(FieldDB *DB) override; + void PendReq(Request *req) override; bool Deleted; std::string *Field; diff --git a/test/basic_function_test.cc b/test/basic_function_test.cc index 4915fe4..28d9fc7 100644 --- a/test/basic_function_test.cc +++ b/test/basic_function_test.cc @@ -2,138 +2,47 @@ // #include "leveldb/env.h" // #include "leveldb/db.h" #include "fielddb/field_db.h" +#include "test/helper.cc" using namespace fielddb; -constexpr int value_size = 2048; -constexpr int data_size = 128 << 20; -std::vector cities = { - "Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou", - "Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" - }; -std::vector shanghaiKeys; - -Status OpenDB(std::string dbName, FieldDB **db) { - Options options; - options.create_if_missing = true; - return FieldDB::OpenFieldDB(options, dbName, db); -} - -void ClearDB(FieldDB *db){ - //destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染 - WriteOptions writeOptions; - int key_num = data_size / value_size; - for (int i = 0; i < key_num; i++) { - int key_ = i+1; - std::string key = std::to_string(key_); - Status s = db->Delete(WriteOptions(), key); - ASSERT_TRUE(s.ok()); - } -} - -void InsertFieldData(FieldDB *db) { - WriteOptions writeOptions; - int key_num = data_size / value_size; - srand(0); - - for (int i = 0; i < key_num; i++) { - int randThisTime = rand(); //确保读写一个循环只rand一次,否则随机序列会不一致 - int key_ = randThisTime % key_num+1; - std::string key = std::to_string(key_); - - std::string name = "customer#" + std::to_string(key_); - std::string address = cities[randThisTime % cities.size()]; - FieldArray fields = { - {"name", name}, - {"address", address} - }; - if (address == "Shanghai") { - shanghaiKeys.push_back(key); - } - Status s = db->PutFields(WriteOptions(), key, fields); - ASSERT_TRUE(s.ok()); - } -} - -void GetFieldData(FieldDB *db) { - ReadOptions readOptions; - int key_num = data_size / value_size; - - // 点查 - srand(0); - for (int i = 0; i < 100; i++) { - int randThisTime = rand(); - int key_ = randThisTime % key_num+1; - std::string key = std::to_string(key_); - FieldArray fields_ret; - Status s = db->GetFields(readOptions, key, &fields_ret); - ASSERT_TRUE(s.ok()); - for (const Field& pairs : fields_ret) { - if (pairs.first == "name"){ - - } else if (pairs.first == "address"){ - std::string city = pairs.second; - ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end()); - } else assert(false); - } - } -} - -void findKeysByCity(FieldDB *db) { - Field field = {"address", "Shanghai"}; - std::vector resKeys = db->FindKeysByField(field); - std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl; - for (const std::string &key : resKeys){ - ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); - } -} - -void findKeysByCityIndex(FieldDB *db, bool expect) { - Field field = {"address", "Shanghai"}; - Status s; - std::vector resKeys = db->QueryByIndex(field, &s); - if (expect) ASSERT_TRUE(s.ok()); - else { - ASSERT_TRUE(s.IsNotFound()); - return; - } - std::cout << shanghaiKeys.size() << " " << resKeys.size() << std::endl; - for (const std::string &key : resKeys){ - ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); - } -} TEST(TestLab1, Basic) { - // DestroyDB("testdb",Options()); + fielddb::DestroyDB("testdb1.1",Options()); //每个测试前,先把对应名称的之前的数据库删了 FieldDB *db = new FieldDB(); - if(OpenDB("testdb", &db).ok() == false) { + if(OpenDB("testdb1.1", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } // ClearDB(db); InsertFieldData(db); - GetFieldData(db); + bool allowNotFound = false; + GetFieldData(db, allowNotFound); findKeysByCity(db); delete db; } TEST(TestLab2, Basic) { - //destroy + fielddb::DestroyDB("testdb1.2",Options()); FieldDB *db = new FieldDB(); - if(OpenDB("testdb2", &db).ok() == false) { + if(OpenDB("testdb1.2", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } // ClearDB(db); shanghaiKeys.clear(); + age20Keys.clear(); InsertFieldData(db); // GetFieldData(db); // findKeysByCity(db); db->CreateIndexOnField("address"); + db->CreateIndexOnField("age"); findKeysByCityIndex(db, true); + findKeysByAgeIndex(db, true); db->DeleteIndex("address"); findKeysByCityIndex(db, false); + findKeysByAgeIndex(db, true); delete db; } diff --git a/test/helper.cc b/test/helper.cc new file mode 100644 index 0000000..9de92bc --- /dev/null +++ b/test/helper.cc @@ -0,0 +1,182 @@ +#include "gtest/gtest.h" +// #include "leveldb/env.h" +// #include "leveldb/db.h" +#include "fielddb/field_db.h" +#include +using namespace fielddb; + +constexpr int value_size = 2048; +constexpr int data_size = 128 << 20; +#define AGE_RANGE 100 +std::vector cities = { + "Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou", + "Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" + }; +//检查insert和queryByIndex的数据是否对应 +std::set shanghaiKeys; +std::set age20Keys; +//复杂的测试要注意这两个全局变量,目前只有InsertFieldData和InsertOneField会往里加,并且没有清理 + +Status OpenDB(std::string dbName, FieldDB **db) { + Options options; + options.create_if_missing = true; + return FieldDB::OpenFieldDB(options, dbName, db); +} + +// void ClearDB(FieldDB *db){ +// //destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染 +// WriteOptions writeOptions; +// int key_num = data_size / value_size; +// for (int i = 0; i < key_num; i++) { +// int key_ = i+1; +// std::string key = std::to_string(key_); +// Status s = db->Delete(WriteOptions(), key); +// ASSERT_TRUE(s.ok()); +// } +// } + +//只插一条特定数据的测试 +void InsertOneField(FieldDB *db, std::string key = "0") { + WriteOptions writeOptions; + FieldArray fields = { + {"name", "special#" + key}, + {"address", "Shanghai"}, + {"age", "20"} + }; + Status s = db->PutFields(WriteOptions(), key, fields); + ASSERT_TRUE(s.ok()); + shanghaiKeys.insert(key); + age20Keys.insert(key); +} + +//与上面对应 +void GetOneField(FieldDB *db, std::string key = "0") { + ReadOptions readOptions; + FieldArray fields_ret; + Status s = db->GetFields(readOptions, key, &fields_ret); + ASSERT_TRUE(s.ok()); + for (const Field& pairs : fields_ret) { + if (pairs.first == "name"){ + ASSERT_EQ(pairs.second, "special#" + key); + } else if (pairs.first == "address"){ + ASSERT_EQ(pairs.second, "Shanghai"); + } else if (pairs.first == "age"){ + ASSERT_EQ(pairs.second, "20"); + } else assert(false); + } +} + +void InsertFieldData(FieldDB *db, int seed = 0/*随机种子*/) { + std::cout << "-------inserting-------" << std::endl; + WriteOptions writeOptions; + int key_num = data_size / value_size; + // srand线程不安全,这种可以保证多线程时随机序列也一致 + std::mt19937 rng(seed); + + for (int i = 0; i < key_num; i++) { + int randThisTime = rng(); //确保读写一个循环只rand一次,否则随机序列会不一致 + //让批量写入的key>0, 单独写入的key<=0,方便测试观察 + int key_ = std::abs(randThisTime) % key_num + 1; + std::string key = std::to_string(key_); + + std::string name = "customer#" + std::to_string(key_); + std::string address = cities[randThisTime % cities.size()]; + std::string age = std::to_string(std::abs(randThisTime) % AGE_RANGE); + FieldArray fields = { + {"name", name}, + {"address", address}, + {"age", age} + }; + if (address == "Shanghai") { + shanghaiKeys.insert(key); + } + if (age == "20") { + age20Keys.insert(key); + } + Status s = db->PutFields(WriteOptions(), key, fields); + ASSERT_TRUE(s.ok()); + } +} + +//并发时不一定能读到,加个参数控制 +void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) { + std::cout << "-------getting-------" << std::endl; + ReadOptions readOptions; + int key_num = data_size / value_size; + + // 点查 + std::mt19937 rng(seed); + for (int i = 0; i < 100; i++) { + int randThisTime = rng(); + int key_ = std::abs(randThisTime) % key_num + 1; + std::string key = std::to_string(key_); + FieldArray fields_ret; + Status s = db->GetFields(readOptions, key, &fields_ret); + if (!allowNotFound){ //必须读到 + // if (!s.ok()){ + // std::cout << key << std::endl; + // } + ASSERT_TRUE(s.ok()); + } else { //不必须读到,但只要读到address必须正确 + if(s.IsNotFound()) continue; + } + for (const Field& pairs : fields_ret) { + if (pairs.first == "name"){ + + } else if (pairs.first == "address"){ + std::string city = pairs.second; + ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end()); + + } else if (pairs.first == "age"){ + int age = std::stoi(pairs.second); + ASSERT_TRUE(age >= 0 && age < AGE_RANGE); + + } else assert(false); + } + } +} + +void findKeysByCity(FieldDB *db) { + std::cout << "-------getting field address-------" << std::endl; + Field field = {"address", "Shanghai"}; + std::vector resKeys = db->FindKeysByField(field); + //打印比较,因为shanghaikey可能被后写入的、其他address的key覆盖,打印出的后一个数应该小于前一个数 + //如果随机种子相同,每次打印出的两个数也应该相同 + std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl; + for (const std::string &key : resKeys){ + ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + } +} + +// haveIndex表明数据库有没有该索引(address) +void findKeysByCityIndex(FieldDB *db, bool haveIndex) { + std::cout << "-------getting field address by index-------" << std::endl; + Field field = {"address", "Shanghai"}; + Status s; + std::vector resKeys = db->QueryByIndex(field, &s); + if (haveIndex) ASSERT_TRUE(s.ok()); + else { + ASSERT_TRUE(s.IsNotFound()); + return; + } + std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较 + for (const std::string &key : resKeys){ + ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end()); + } +} + +void findKeysByAgeIndex(FieldDB *db, bool haveIndex) { + std::cout << "-------getting field age by index-------" << std::endl; + Field field = {"age", "20"}; + Status s; + std::vector resKeys = db->QueryByIndex(field, &s); + if (haveIndex) ASSERT_TRUE(s.ok()); + else { + ASSERT_TRUE(s.IsNotFound()); + return; + } + std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl; + for (const std::string &key : resKeys){ + ASSERT_NE(std::find(age20Keys.begin(), age20Keys.end(), key), age20Keys.end()); + } +} \ No newline at end of file diff --git a/test/parallel_test.cc b/test/parallel_test.cc new file mode 100644 index 0000000..98bf457 --- /dev/null +++ b/test/parallel_test.cc @@ -0,0 +1,163 @@ +#include "gtest/gtest.h" +#include +// #include "leveldb/env.h" +// #include "leveldb/db.h" +#include "fielddb/field_db.h" +#include "test/helper.cc" +using namespace fielddb; + +// 测试中read/write都表示带索引的读写 + +//读写有索引数据的并发 +TEST(TestReadWrite, Parallel) { + fielddb::DestroyDB("testdb2.1",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.1", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + // ClearDB(db); + int thread_num_ = 5; + std::vector threads(thread_num_); + //二写三读 + for (size_t i = 0; i < thread_num_; i++) + { + if (i == 0) {//写随机序列0 + threads[i] = std::thread(InsertFieldData, db, 0); + } else if (i == 1) + {//写随机序列1 + threads[i] = std::thread(InsertFieldData, db, 1); + } else {//读 + bool allowNotFound = true; + threads[i] = std::thread(GetFieldData, db, allowNotFound, 0); + } + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + + // 此时写已完成,一定能读到两次写 + bool allowNotFound = false; + GetFieldData(db, allowNotFound); + GetFieldData(db, allowNotFound, 1); + findKeysByCity(db); + delete db; +} + +//创建索引与写有该索引数据的并发 +TEST(TestWriteCreatei, Parallel) { + fielddb::DestroyDB("testdb2.2",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.2", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + InsertFieldData(db); + int thread_num_ = 2; + std::vector threads(thread_num_); + for (size_t i = 0; i < thread_num_; i++) + { + if (i == 0) {//创建索引 + threads[i] = std::thread([db](){ + db->CreateIndexOnField("address"); + std::cout << "finish create index\n"; + }); + } else {//写 + threads[i] = std::thread([db](){ + while (db->GetIndexStatus("address") == NotExist){ + continue; //开始创建了再并发的写 + } + InsertOneField(db); //先插一条 + }); + } + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + //检查索引是否创建成功 + bool haveIndex = true; + findKeysByCityIndex(db, haveIndex); + //检查写入是否成功 + GetOneField(db); + + delete db; +} + +//创建删除不同索引的并发 +TEST(TestCreateiCreatei, Parallel) { + fielddb::DestroyDB("testdb2.3",Options()); + FieldDB *db = new FieldDB(); + + if(OpenDB("testdb2.3", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + // ClearDB(db); + shanghaiKeys.clear(); + age20Keys.clear(); + InsertFieldData(db); + int thread_num_ = 3; + std::vector threads(thread_num_); + for (size_t i = 0; i < thread_num_; i++) + { + //3线程并发创建索引address + threads[i] = std::thread([db](){ + db->CreateIndexOnField("address"); + std::cout << "finish create index address\n"; + }); + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + //检查索引是否创建成功 + bool haveIndex = true; + findKeysByCityIndex(db, haveIndex); + findKeysByAgeIndex(db, false); + + + for (size_t i = 0; i < thread_num_; i++) + { + if (i == 0 || i == 1) {//2线程删除索引address + threads[i] = std::thread([db](){ + db->DeleteIndex("address"); + std::cout << "finish delete index address\n"; + }); + } else {//1线程创建索引age + threads[i] = std::thread([db](){ + db->CreateIndexOnField("age"); + std::cout << "finish create index age\n"; + }); + } + } + + for (auto& t : threads) { + if (t.joinable()) { + t.join(); + } + } + //检查 + findKeysByCityIndex(db, false); + findKeysByAgeIndex(db, true); + + delete db; +} +int main(int argc, char** argv) { + // All tests currently run with the same read-only file limits. + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file