Ver a proveniência

修正了创删索引时间算在并发读上的问题

pull/2/head
augurier há 8 meses
ascendente
cometimento
7f6ccefce0
2 ficheiros alterados com 85 adições e 100 eliminações
  1. +83
    -98
      benchmarks/db_bench_FieldDB.cc
  2. +2
    -2
      test/recover_test.cc

+ 83
- 98
benchmarks/db_bench_FieldDB.cc Ver ficheiro

@ -63,7 +63,6 @@ static const char* FLAGS_benchmarks =
"readreverse,"
"fill100K,"
"crc32c,"
"readwhilewriting,"
"CreateIndex,"
"FindKeysByField,"
"QueryByIndex,"
@ -78,11 +77,7 @@ static const char* FLAGS_benchmarks =
"ReadSeqWhileCreating,"
"ReadSeqWhileDeleting,"
"ReadRandomWhileCreating,"
"ReadRandomWhileDeleting,"
"snappycomp,"
"snappyuncomp,"
"zstdcomp,"
"zstduncomp,";
"ReadRandomWhileDeleting,";
// Number of key/values to place in database
static int FLAGS_num = 1000000;
@ -1155,17 +1150,7 @@ class Benchmark {
std::fprintf(stderr, "index status error in WriteWhileCreating\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::Exist) {
break;
}
db_->CreateIndexOnField("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->CreateIndexOnField("age", write_options_);
}
}
@ -1178,17 +1163,7 @@ class Benchmark {
std::fprintf(stderr, "index status error in WriteWhileDeleting\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::NotExist) {
break;
}
db_->DeleteIndex("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->DeleteIndex("age", write_options_);
}
}
@ -1201,17 +1176,7 @@ class Benchmark {
std::fprintf(stderr, "index status error in WriteWhileCreating\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::Exist) {
break;
}
db_->CreateIndexOnField("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->CreateIndexOnField("age", write_options_);
}
}
@ -1224,109 +1189,129 @@ class Benchmark {
std::fprintf(stderr, "index status error in WriteWhileDeleting\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::NotExist) {
break;
}
db_->DeleteIndex("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->DeleteIndex("age", write_options_);
}
}
void ReadSeqWhileCreating(ThreadState* thread) {
if (thread->tid > 0) {
ReadSequential(thread);
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
int64_t bytes = 0;
while (true) {
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done == 1) {
// 创删索引完成
delete iter;
thread->stats.AddBytes(bytes);
break;
}
}
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp();
iter->Next();
if (!iter->Valid()) iter->SeekToFirst();
}
} else {
// Special thread that keeps creating index until other threads are done.
if (db_->GetIndexStatus("age") != IndexStatus::NotExist) {
std::fprintf(stderr, "index status error in WriteWhileCreating\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::Exist) {
break;
}
db_->CreateIndexOnField("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->CreateIndexOnField("age", write_options_);
}
}
void ReadSeqWhileDeleting(ThreadState* thread) {
if (thread->tid > 0) {
ReadSequential(thread);
Iterator* iter = db_->NewIterator(ReadOptions());
iter->SeekToFirst();
int64_t bytes = 0;
while (true) {
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done == 1) {
// 创删索引完成
delete iter;
thread->stats.AddBytes(bytes);
break;
}
}
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp();
iter->Next();
if (!iter->Valid()) iter->SeekToFirst();
}
} else {
// Special thread that keeps creating index until other threads are done.
if (db_->GetIndexStatus("age") != IndexStatus::Exist) {
std::fprintf(stderr, "index status error in WriteWhileDeleting\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::NotExist) {
break;
}
db_->DeleteIndex("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->DeleteIndex("age", write_options_);
}
}
void ReadRandomWhileCreating(ThreadState* thread) {
if (thread->tid > 0) {
ReadRandom(thread);
ReadOptions options;
int found = 0;
KeyBuffer key;
while (true) {
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done == 1) {
// 创删索引完成
break;
}
}
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
FieldArray fields_ret;
if (db_->GetFields(options, key.slice(), &fields_ret).ok()) {
found++;
}
thread->stats.FinishedSingleOp();
}
} else {
// Special thread that keeps creating index until other threads are done.
if (db_->GetIndexStatus("age") != IndexStatus::NotExist) {
std::fprintf(stderr, "index status error in WriteWhileCreating\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::Exist) {
break;
}
db_->CreateIndexOnField("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->CreateIndexOnField("age", write_options_);
}
}
void ReadRandomWhileDeleting(ThreadState* thread) {
if (thread->tid > 0) {
ReadRandom(thread);
ReadOptions options;
int found = 0;
KeyBuffer key;
while (true) {
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done == 1) {
// 创删索引完成
break;
}
}
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
FieldArray fields_ret;
if (db_->GetFields(options, key.slice(), &fields_ret).ok()) {
found++;
}
thread->stats.FinishedSingleOp();
}
} else {
// Special thread that keeps creating index until other threads are done.
if (db_->GetIndexStatus("age") != IndexStatus::Exist) {
std::fprintf(stderr, "index status error in WriteWhileDeleting\n");
std::exit(1);
}
while (true) {
if (db_->GetIndexStatus("age") == IndexStatus::NotExist) {
break;
}
db_->DeleteIndex("age", write_options_);
}
// Do not count any of the preceding work/delay in stats.
thread->stats.Start();
db_->DeleteIndex("age", write_options_);
}
}
};

+ 2
- 2
test/recover_test.cc Ver ficheiro

@ -45,8 +45,8 @@ TEST(TestParalRecover, Recover) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// db->CreateIndexOnField("address");
// db->CreateIndexOnField("age");
// db->CreateIndexOnField("address", op);
// db->CreateIndexOnField("age", op);
// int thread_num_ = 4;
// std::vector<std::thread> threads(thread_num_);
// threads[0] = std::thread([db](){

Carregando…
Cancelar
Guardar