Browse Source

加了点测试,修了点bug

pull/2/head
augurier 8 months ago
parent
commit
462019353e
6 changed files with 175 additions and 28 deletions
  1. +3
    -3
      fielddb/field_db.cpp
  2. +18
    -14
      fielddb/request.cpp
  3. +5
    -5
      fielddb/request.h
  4. +29
    -6
      test/helper.cc
  5. +34
    -0
      test/helper.h
  6. +86
    -0
      test/parallel_test.cc

+ 3
- 3
fielddb/field_db.cpp View File

@ -82,7 +82,7 @@ Again:
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string *> batchKeySet;
std::unordered_set<std::string> batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
@ -220,7 +220,7 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string *> useless;
std::unordered_set<std::string> useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);
@ -236,7 +236,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name) {
return req.s;
}
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string *> useless;
std::unordered_set<std::string> useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(WriteOptions(), &IndexBatch);
req.Finalize(this);

+ 18
- 14
fielddb/request.cpp View File

@ -19,7 +19,8 @@ void Request::PendReq(Request *req) {
//为虚函数提供最基本的实现
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
assert(0);
}
@ -41,12 +42,13 @@ bool Request::isPending() {
/*******FieldsReq*******/
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
if (batchKeySet.find(Key) != batchKeySet.end()){
if (batchKeySet.find(*Key) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次
} else {
batchKeySet.insert(Key);
batchKeySet.insert(*Key);
}
std::string val_str;
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
@ -59,8 +61,6 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
assert(0);
}
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
bool HasIndex = false;
bool HasOldIndex = false;
{
@ -96,7 +96,8 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
}
}
}
KVBatch.Put(Slice(*Key), Slice(SerializeValue(*Fields)));
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex || HasOldIndex) {
Slice MetaKey,MetaValue;
@ -140,12 +141,13 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
/*******DeleteReq*******/
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
if (batchKeySet.find(Key) != batchKeySet.end()){
if (batchKeySet.find(*Key) != batchKeySet.end()){
return;//并发的被合并的put/delete请求只处理一次
} else {
batchKeySet.insert(Key);
batchKeySet.insert(*Key);
}
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field
//2.1 如果无,则正常构造delete
@ -153,9 +155,9 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//在kvDB和metaDB中写入对应的delete
//2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待
std::string val_str;
DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str);
if (s.IsNotFound()) return;
FieldArray *Fields = ParseValue(val_str);
KVBatch.Delete(Slice(*Key));
bool HasIndex = false;
{
// MutexLock L(&DB->index_mu); //互斥访问索引状态表
@ -174,6 +176,7 @@ void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
//assert(0);
}
}
KVBatch.Delete(Slice(*Key));
//2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB
if(HasIndex) {
Slice MetaKey;
@ -223,7 +226,8 @@ void iCreateReq::PendReq(Request *req) {
}
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,
std::unordered_set<std::string> &batchKeySet)
{
//遍历数据库,构建二级索引到indexbatch,(更新metaDB中的元数据为Index类型的(Field,Creating))
//一个indexwritebatch写入,那么索引创建删除应该和metadb没有交互
@ -282,7 +286,7 @@ void iDeleteReq::PendReq(Request* req) {
}
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet)
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet)
{
std::vector<std::pair<std::string, std::string>> keysAndVal =
DB->FindKeysAndValByFieldName(*Field);

+ 5
- 5
fielddb/request.h View File

@ -44,7 +44,7 @@ public:
//Fields的
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet);
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet);
//icreate和idelete在队列中的注册当前状态
virtual void Prepare(FieldDB *DB);
virtual void Finalize(FieldDB *DB);
@ -66,7 +66,7 @@ public:
Key(Key),Fields(Fields),Request(FieldsReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
std::string *Key;
FieldArray *Fields;
@ -91,7 +91,7 @@ public:
Field(Field),Request(iCreateReq_t, mu),Existed(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@ -108,7 +108,7 @@ public:
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
void Prepare(FieldDB *DB) override;
void Finalize(FieldDB *DB) override;
void PendReq(Request *req) override;
@ -125,7 +125,7 @@ public:
Key(Key),Request(DeleteReq_t,mu) { };
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string *> &batchKeySet) override;
WriteBatch &MetaBatch,fielddb::FieldDB *DB,std::unordered_set<std::string> &batchKeySet) override;
std::string *Key;
};

+ 29
- 6
test/helper.cc View File

@ -3,6 +3,7 @@
// #include "leveldb/db.h"
#include "fielddb/field_db.h"
#include <random>
#include "helper.h"
using namespace fielddb;
constexpr int value_size = 2048;
@ -13,10 +14,12 @@ std::vector cities = {
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin"
};
//检查insert和queryByIndex的数据是否对应
std::set<std::string> shanghaiKeys;
std::set<std::string> age20Keys;
//封装了一个线程安全的全局set
ThreadSafeSet shanghaiKeys;
ThreadSafeSet age20Keys;
//复杂的测试要注意这两个全局变量,
//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData会全部清空,
//目前只有InsertFieldData和InsertOneField会往里加,DeleteFieldData和InsertOneField会删除
//其他测试之间有必要手动clear
Status OpenDB(std::string dbName, FieldDB **db) {
@ -51,6 +54,15 @@ void InsertOneField(FieldDB *db, std::string key = "0") {
age20Keys.insert(key);
}
//只删一条特定数据的测试
void DeleteOneField(FieldDB *db, std::string key = "0") {
WriteOptions writeOptions;
Status s = db->Delete(WriteOptions(), key);
ASSERT_TRUE(s.ok());
shanghaiKeys.erase(key);
age20Keys.erase(key);
}
//与上面对应
void GetOneField(FieldDB *db, std::string key = "0") {
ReadOptions readOptions;
@ -183,7 +195,7 @@ void findKeysByCity(FieldDB *db) {
//如果随机种子相同,每次打印出的两个数也应该相同
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
ASSERT_TRUE(shanghaiKeys.haveKey(key));
}
}
@ -200,7 +212,7 @@ void findKeysByCityIndex(FieldDB *db, bool haveIndex) {
}
std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;//打印比较
for (const std::string &key : resKeys){
ASSERT_NE(std::find(shanghaiKeys.begin(), shanghaiKeys.end(), key), shanghaiKeys.end());
ASSERT_TRUE(shanghaiKeys.haveKey(key));
}
}
@ -216,6 +228,17 @@ void findKeysByAgeIndex(FieldDB *db, bool haveIndex) {
}
std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl;
for (const std::string &key : resKeys){
ASSERT_NE(std::find(age20Keys.begin(), age20Keys.end(), key), age20Keys.end());
ASSERT_TRUE(age20Keys.haveKey(key));
}
}
void checkDataInKVAndIndex(FieldDB *db) {
Field field = {"address", "Shanghai"};
Status s;
std::vector<std::string> resKeys1 = db->QueryByIndex(field, &s); //indexdb根据索引查到的数据
std::vector<std::string> resKeys2 = db->FindKeysByField(field); //kvdb强行遍历查到的数据
std::sort(resKeys1.begin(), resKeys1.end());
std::sort(resKeys2.begin(), resKeys2.end());
std::cout << resKeys1.size() << " " << resKeys2.size() << std::endl;
ASSERT_EQ(resKeys1, resKeys2);
}

+ 34
- 0
test/helper.h View File

@ -0,0 +1,34 @@
#include "fielddb/field_db.h"
using namespace fielddb;
class ThreadSafeSet
{
private:
std::set<std::string> keys;
std::mutex setMutex;
public:
ThreadSafeSet(){}
void insert(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
keys.insert(key);
}
void erase(std::string key){
std::lock_guard<std::mutex> lock(setMutex);
keys.erase(key);
}
void clear(){
std::lock_guard<std::mutex> lock(setMutex);
keys.clear();
}
size_t size(){
std::lock_guard<std::mutex> lock(setMutex);
return keys.size();
}
bool haveKey(std::string key){
return std::find(keys.begin(), keys.end(), key) != keys.end();
}
};

+ 86
- 0
test/parallel_test.cc View File

@ -18,6 +18,8 @@ TEST(TestReadWrite, Parallel) {
abort();
}
// ClearDB(db);
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
//二写三读
@ -45,6 +47,7 @@ TEST(TestReadWrite, Parallel) {
GetFieldData(db, allowNotFound);
GetFieldData(db, allowNotFound, 1);
findKeysByCity(db);
checkDataInKVAndIndex(db);
delete db;
}
@ -90,6 +93,7 @@ TEST(TestWriteCreatei, Parallel) {
findKeysByCityIndex(db, haveIndex);
//检查写入是否成功
GetOneField(db);
checkDataInKVAndIndex(db);
delete db;
}
@ -128,6 +132,7 @@ TEST(TestCreateiCreatei, Parallel) {
bool haveIndex = true;
findKeysByCityIndex(db, haveIndex);
findKeysByAgeIndex(db, false);
checkDataInKVAndIndex(db);
for (size_t i = 0; i < thread_num_; i++)
@ -156,6 +161,87 @@ TEST(TestCreateiCreatei, Parallel) {
delete db;
}
//有索引时,大量并发put与delete相同key,确保kvdb和indexdb的一致性
TEST(TestPutDeleteOne, Parallel) {
fielddb::DestroyDB("testdb2.4",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.4", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
int thread_num_ = 20;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
{
if (i % 2 == 0) {//2线程删除索引address
threads[i] = std::thread([db](){
for (size_t j = 0; j < 100; j++)
{
InsertOneField(db, std::to_string(j));
}
});
} else {//1线程创建索引age
threads[i] = std::thread([db](){
for (size_t j = 0; j < 100; j++)
{
DeleteOneField(db, std::to_string(j));
}
});
}
}
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
checkDataInKVAndIndex(db);
delete db;
}
//有索引时,put与delete的并发,确保kvdb和indexdb的一致性
TEST(TestPutDelete, Parallel) {
fielddb::DestroyDB("testdb2.5",Options());
FieldDB *db = new FieldDB();
if(OpenDB("testdb2.5", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
int thread_num_ = 4;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){InsertFieldData(db);});
threads[1] = std::thread([db](){InsertFieldData(db, 1);});
threads[2] = std::thread([db](){DeleteFieldData(db);});
threads[3] = std::thread([db](){DeleteFieldData(db, 1);});
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
//检查
checkDataInKVAndIndex(db);
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);

Loading…
Cancel
Save