Browse Source

Merge commit '4c938eb956ae880d70d4a835f5d5f93db9231d9b' into cyq

pull/2/head
cyq 8 months ago
parent
commit
cac71dedbd
7 changed files with 56 additions and 44 deletions
  1. +22
    -9
      benchmarks/db_bench_FieldDB.cc
  2. +12
    -13
      fielddb/field_db.cpp
  3. +3
    -3
      fielddb/field_db.h
  4. +3
    -4
      test/basic_function_test.cc
  5. +1
    -0
      test/helper.cc
  6. +13
    -13
      test/parallel_test.cc
  7. +2
    -2
      test/recover_test.cc

+ 22
- 9
benchmarks/db_bench_FieldDB.cc View File

@ -664,7 +664,7 @@ class Benchmark {
} else {
delete db_;
db_ = nullptr;
DestroyDB(FLAGS_db, Options());
//DestroyDB(FLAGS_db, Options());
Open();
}
}
@ -821,6 +821,7 @@ class Benchmark {
options.compression =
FLAGS_compression ? kSnappyCompression : kNoCompression;
// Status s = DB::Open(options, FLAGS_db, &db_);
fielddb::DestroyDB(FLAGS_db, options);
db_ = new FieldDB();
Status s = FieldDB::OpenFieldDB(options, FLAGS_db, &db_);
if (!s.ok()) {
@ -858,8 +859,20 @@ class Benchmark {
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k);
batch.Put(key.slice(), gen.Generate(value_size_));
bytes += value_size_ + key.slice().size();
std::string name = "customer#" + std::to_string(k);
//这个字段用来查找
std::string age = std::to_string(thread->rand.Uniform(FLAGS_num) % 100);
//这个字段填充长度
std::string tag = gen.Generate(value_size_).ToString();
FieldArray fields = {
{"name", name},
{"age", age},
{"tag", tag}
};
Slice value = SerializeValue(fields);
batch.Put(key.slice(), value);
bytes += value.size() + key.slice().size();
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);
@ -899,13 +912,13 @@ class Benchmark {
void ReadRandom(ThreadState* thread) {
ReadOptions options;
std::string value;
int found = 0;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
if (db_->Get(options, key.slice(), &value).ok()) {
FieldArray fields_ret;
if (db_->GetFields(options, key.slice(), &fields_ret).ok()) {
found++;
}
thread->stats.FinishedSingleOp();
@ -917,26 +930,26 @@ class Benchmark {
void ReadMissing(ThreadState* thread) {
ReadOptions options;
std::string value;
FieldArray fields_ret;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
Slice s = Slice(key.slice().data(), key.slice().size() - 1);
db_->Get(options, s, &value);
db_->GetFields(options, s, &fields_ret);
thread->stats.FinishedSingleOp();
}
}
void ReadHot(ThreadState* thread) {
ReadOptions options;
std::string value;
FieldArray fields_ret;
const int range = (FLAGS_num + 99) / 100;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
const int k = thread->rand.Uniform(range);
key.Set(k);
db_->Get(options, key.slice(), &value);
db_->GetFields(options, key.slice(), &fields_ret);
thread->stats.FinishedSingleOp();
}
}

+ 12
- 13
fielddb/field_db.cpp View File

@ -107,7 +107,7 @@ Status FieldDB::Recover() {
//在所有的请求完成后,会自动把metaDB的内容清空。
Iter = metaDB_->NewIterator(ReadOptions());
Iter->SeekToFirst();
std::cout << "Iter Valid : " << Iter->Valid() << std::endl;
//std::cout << "Iter Valid : " << Iter->Valid() << std::endl;
delete Iter;
//3. 等待所有请求完成
return Status::OK();
@ -125,7 +125,7 @@ Request *FieldDB::GetHandleInterval() {
return tail;
}
Status FieldDB::HandleRequest(Request &req) {
Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
uint64_t start_ = env_->NowMicros();
MutexLock L(&mutex_);
taskqueue_.push_back(&req);
@ -156,7 +156,6 @@ Status FieldDB::HandleRequest(Request &req) {
//此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
WriteOptions op;
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_meta = env_->NowMicros();
status = metaDB_->Write(op, &MetaBatch);
@ -215,7 +214,7 @@ Status FieldDB::HandleRequest(Request &req) {
elapsed += env_->NowMicros() - start_;
count ++;
dumpStatistics();
//dumpStatistics();
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
@ -240,7 +239,7 @@ Status FieldDB::PutFields(const WriteOptions &Options,
FieldsReq req(&key_,&fields_,&mutex_);
Status status = HandleRequest(req);
Status status = HandleRequest(req, Options);
return status;
}
@ -249,7 +248,7 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
std::string key_ = key.ToString();
DeleteReq req(&key_,&mutex_);
Status status = HandleRequest(req);
Status status = HandleRequest(req, options);
return status;
}
@ -266,7 +265,7 @@ Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
uint64_t start_ = env_->NowMicros();
BatchReq req(updates,&mutex_);
construct_BatchReq_init_elapsed += env_->NowMicros() - start_;
Status status = HandleRequest(req);
Status status = HandleRequest(req, options);
return status;
}
//由于常规put将空串作为name,这里也需要适当修改
@ -305,10 +304,10 @@ std::vector> FieldDB::FindKeysAndValByFieldN
return result;
}
Status FieldDB::CreateIndexOnField(const std::string& field_name) {
Status FieldDB::CreateIndexOnField(const std::string& field_name, const WriteOptions &op) {
std::string Field = field_name;
iCreateReq req(&Field,&mutex_);
HandleRequest(req);
HandleRequest(req, op);
//如果已经存在索引,那么直接返回
if(req.Existed) {
return req.s;
@ -316,15 +315,15 @@ Status FieldDB::CreateIndexOnField(const std::string& field_name) {
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(WriteOptionspan>(), &IndexBatch);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
return req.s;
}
Status FieldDB::DeleteIndex(const std::string &field_name) {
Status FieldDB::DeleteIndex(const std::string &field_name, const WriteOptions &op) {
std::string Field = field_name;
iDeleteReq req(&Field,&mutex_);
HandleRequest(req);
HandleRequest(req, op);
//如果已经被删除或者不存在,那么可以直接返回
if(req.Deleted) {
return req.s;
@ -332,7 +331,7 @@ Status FieldDB::DeleteIndex(const std::string &field_name) {
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> useless;
req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
indexDB_->Write(WriteOptionspan>(), &IndexBatch);
indexDB_->Write(op, &IndexBatch);
req.Finalize(this);
return req.s;
}

+ 3
- 3
fielddb/field_db.h View File

@ -54,8 +54,8 @@ public:
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override;
void CompactRange(const Slice *begin, const Slice *end) override;
/*与索引相关*/
Status CreateIndexOnField(const std::string& field_name);
Status DeleteIndex(const std::string &field_name);
Status CreateIndexOnField(const std::string& field_name, const WriteOptions &op);
Status DeleteIndex(const std::string &field_name, const WriteOptions &op);
std::vector<std::string> QueryByIndex(const Field &field, Status *s);
//
IndexStatus GetIndexStatus(const std::string &fieldName);
@ -88,7 +88,7 @@ private:
const std::string &fieldName);
/*For request handling*/
Status HandleRequest(Request &req); //
Status HandleRequest(Request &req, const WriteOptions &op); //
Request *GetHandleInterval(); //
private:

+ 3
- 4
test/basic_function_test.cc View File

@ -34,18 +34,17 @@ TEST(TestLab2, Basic) {
std::cerr << "open db failed" << std::endl;
abort();
}
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
InsertFieldData(db);
// GetFieldData(db);
// findKeysByCity(db);
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
findKeysByCityIndex(db, true);
findKeysByAgeIndex(db, true);
db->DeleteIndex("address");
db->DeleteIndex("address", op);
findKeysByCityIndex(db, false);
findKeysByAgeIndex(db, true);

+ 1
- 0
test/helper.cc View File

@ -22,6 +22,7 @@ ThreadSafeSet age20Keys;
//目前只有InsertFieldData和InsertOneField和writeFieldData会往里加,
//DeleteFieldData和InsertOneField会删除,
//其他测试之间有必要手动clear
const WriteOptions op;
Status OpenDB(std::string dbName, FieldDB **db) {
Options options;

+ 13
- 13
test/parallel_test.cc View File

@ -18,8 +18,8 @@ TEST(TestReadPut, Parallel) {
abort();
}
// ClearDB(db);
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
//二写三读
@ -70,7 +70,7 @@ TEST(TestPutCreatei, Parallel) {
{
if (i == 0) {//创建索引
threads[i] = std::thread([db](){
db->CreateIndexOnField("address");
db->CreateIndexOnField("address", op);
std::cout << "finish create index\n";
});
} else {//写
@ -118,7 +118,7 @@ TEST(TestCreateiCreatei, Parallel) {
{
//3线程并发创建索引address
threads[i] = std::thread([db](){
db->CreateIndexOnField("address");
db->CreateIndexOnField("address", op);
std::cout << "finish create index address\n";
});
}
@ -139,12 +139,12 @@ TEST(TestCreateiCreatei, Parallel) {
{
if (i == 0 || i == 1) {//2线程删除索引address
threads[i] = std::thread([db](){
db->DeleteIndex("address");
db->DeleteIndex("address", op);
std::cout << "finish delete index address\n";
});
} else {//1线程创建索引age
threads[i] = std::thread([db](){
db->CreateIndexOnField("age");
db->CreateIndexOnField("age", op);
std::cout << "finish create index age\n";
});
}
@ -175,8 +175,8 @@ TEST(TestPutDeleteOne, Parallel) {
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
int thread_num_ = 20;
std::vector<std::thread> threads(thread_num_);
for (size_t i = 0; i < thread_num_; i++)
@ -222,8 +222,8 @@ TEST(TestPutDelete, Parallel) {
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
int thread_num_ = 4;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){InsertFieldData(db);});
@ -255,11 +255,11 @@ TEST(TestWrite, Parallel) {
// ClearDB(db);
shanghaiKeys.clear();
age20Keys.clear();
db->CreateIndexOnField("address");
db->CreateIndexOnField("address", op);
InsertFieldData(db, 2); //先填点数据,让创建索引的时间久一点
int thread_num_ = 5;
std::vector<std::thread> threads(thread_num_);
threads[0] = std::thread([db](){db->CreateIndexOnField("age");});
threads[0] = std::thread([db](){db->CreateIndexOnField("age", op);});
threads[1] = std::thread([db](){
while (db->GetIndexStatus("age") == NotExist){
continue; //开始创建了再并发的写
@ -279,7 +279,7 @@ TEST(TestWrite, Parallel) {
while (db->GetIndexStatus("age") == NotExist){
continue;
}
db->DeleteIndex("age");});
db->DeleteIndex("age", op);});
for (auto& t : threads) {
if (t.joinable()) {

+ 2
- 2
test/recover_test.cc View File

@ -16,8 +16,8 @@ TEST(TestNormalRecover, Recover) {
std::cerr << "open db failed" << std::endl;
abort();
}
db->CreateIndexOnField("address");
db->CreateIndexOnField("age");
db->CreateIndexOnField("address", op);
db->CreateIndexOnField("age", op);
InsertFieldData(db);
bool allowNotFound = false;
GetFieldData(db, allowNotFound);

Loading…
Cancel
Save