瀏覽代碼

插桩会影响恢复测试(env的问题)先注释了

pull/2/head
augurier 8 月之前
父節點
當前提交
75ff6b6c12
共有 3 個檔案被更改,包括 26 行新增26 行删除
  1. +18
    -18
      fielddb/field_db.cpp
  2. +7
    -8
      fielddb/request.cpp
  3. +1
    -0
      test/recover_test.cc

+ 18
- 18
fielddb/field_db.cpp 查看文件

@ -125,17 +125,17 @@ Request *FieldDB::GetHandleInterval() {
} }
Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
uint64_t start_ = env_->NowMicros();
//uint64_t start_ = env_->NowMicros();
MutexLock L(&mutex_); MutexLock L(&mutex_);
taskqueue_.push_back(&req); taskqueue_.push_back(&req);
while(true){ while(true){
uint64_t start_waiting = env_->NowMicros();
//uint64_t start_waiting = env_->NowMicros();
while(req.isPending() || !req.done && &req != taskqueue_.front()) { while(req.isPending() || !req.done && &req != taskqueue_.front()) {
req.cond_.Wait(); req.cond_.Wait();
} }
waiting_elasped += env_->NowMicros() - start_waiting;
//waiting_elasped += env_->NowMicros() - start_waiting;
if(req.done) { if(req.done) {
elapsed += env_->NowMicros() - start_;
//elapsed += env_->NowMicros() - start_;
count ++; count ++;
// dumpStatistics(); // dumpStatistics();
return req.s; //在返回时自动释放锁L return req.s; //在返回时自动释放锁L
@ -149,48 +149,48 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
{ {
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
MutexLock iL(&index_mu); MutexLock iL(&index_mu);
uint64_t start_construct = env_->NowMicros();
//uint64_t start_construct = env_->NowMicros();
for(auto *req_ptr : taskqueue_) { for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet);
if(req_ptr == tail) break; if(req_ptr == tail) break;
} }
construct_elapsed += env_->NowMicros() - start_construct;
//construct_elapsed += env_->NowMicros() - start_construct;
} }
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
//此处可以放锁是因为写入的有序性可以通过队列来保证 //此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock(); mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
//uint64_t start_write = env_->NowMicros();
if(MetaBatch.ApproximateSize() > 12) { if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_meta = env_->NowMicros();
//uint64_t start_meta = env_->NowMicros();
status = metaDB_->Write(op, &MetaBatch); status = metaDB_->Write(op, &MetaBatch);
write_meta_elapsed += env_->NowMicros() - start_meta;
//write_meta_elapsed += env_->NowMicros() - start_meta;
write_bytes += MetaBatch.ApproximateSize(); write_bytes += MetaBatch.ApproximateSize();
assert(status.ok()); assert(status.ok());
} }
//TODO:index的写入需要在另外一个线程中同时完成 //TODO:index的写入需要在另外一个线程中同时完成
if(IndexBatch.ApproximateSize() > 12) { if(IndexBatch.ApproximateSize() > 12) {
uint64_t start_index = env_->NowMicros();
//uint64_t start_index = env_->NowMicros();
status = indexDB_->Write(op, &IndexBatch); status = indexDB_->Write(op, &IndexBatch);
write_index_elapsed += env_->NowMicros() - start_index;
//write_index_elapsed += env_->NowMicros() - start_index;
write_bytes += IndexBatch.ApproximateSize(); write_bytes += IndexBatch.ApproximateSize();
assert(status.ok()); assert(status.ok());
} }
if(KVBatch.ApproximateSize() > 12) { if(KVBatch.ApproximateSize() > 12) {
uint64_t start_kv = env_->NowMicros();
//uint64_t start_kv = env_->NowMicros();
status = kvDB_->Write(op, &KVBatch); status = kvDB_->Write(op, &KVBatch);
write_kv_elapsed += env_->NowMicros() - start_kv;
//write_kv_elapsed += env_->NowMicros() - start_kv;
write_bytes += KVBatch.ApproximateSize(); write_bytes += KVBatch.ApproximateSize();
assert(status.ok()); assert(status.ok());
} }
//3. 将meta数据清除 //3. 将meta数据清除
if(MetaBatch.ApproximateSize() > 12) { if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_clean = env_->NowMicros();
//uint64_t start_clean = env_->NowMicros();
MetaCleaner cleaner; MetaCleaner cleaner;
cleaner.Collect(MetaBatch); cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_); cleaner.CleanMetaBatch(metaDB_);
write_clean_elapsed += env_->NowMicros() - start_clean;
//write_clean_elapsed += env_->NowMicros() - start_clean;
} }
write_elapsed += env_->NowMicros() - start_write;
//write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock(); mutex_.Lock();
} else { } else {
//对于创建和删除索引的请求,通过prepare完成索引状态的更新 //对于创建和删除索引的请求,通过prepare完成索引状态的更新
@ -263,9 +263,9 @@ Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
// dumpStatistics(); // dumpStatistics();
// return status; // return status;
// } // }
uint64_t start_ = env_->NowMicros();
//uint64_t start_ = env_->NowMicros();
BatchReq req(updates,&mutex_); BatchReq req(updates,&mutex_);
construct_BatchReq_init_elapsed += env_->NowMicros() - start_;
//construct_BatchReq_init_elapsed += env_->NowMicros() - start_;
Status status = HandleRequest(req, options); Status status = HandleRequest(req, options);
return status; return status;
} }

+ 7
- 8
fielddb/request.cpp 查看文件

@ -56,10 +56,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
batchKeySet.insert(Key); batchKeySet.insert(Key);
} }
std::string val_str; std::string val_str;
Status s = Status::NotFound("test");
uint64_t start_ = DB->env_->NowMicros();
//uint64_t start_ = DB->env_->NowMicros();
s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); s = DB->kvDB_->Get(ReadOptions(), Key, &val_str);
DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_;
//DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_;
// FieldArray *oldFields; // FieldArray *oldFields;
FieldSliceArray oldFields; FieldSliceArray oldFields;
if (s.IsNotFound()){ if (s.IsNotFound()){
@ -409,12 +408,12 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch;
SliceHashSet Sub_batchKeySet; SliceHashSet Sub_batchKeySet;
//由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代
uint64_t start_ = DB->env_->NowMicros();
//uint64_t start_ = DB->env_->NowMicros();
for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) {
uint64_t start_sub = DB->env_->NowMicros();
//uint64_t start_sub = DB->env_->NowMicros();
(*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet);
// (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet); // (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet);
DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub;
//DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub;
DB->count_Batch_Sub ++; DB->count_Batch_Sub ++;
//所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说, //所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说,
//pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突 //pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突
@ -422,7 +421,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
return; return;
} }
} }
DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_;
//DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_;
if(Sub_KVBatch.ApproximateSize() > 12) { if(Sub_KVBatch.ApproximateSize() > 12) {
KVBatch.Append(Sub_KVBatch); KVBatch.Append(Sub_KVBatch);
} }
@ -433,7 +432,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
MetaBatch.Append(Sub_MetaBatch); MetaBatch.Append(Sub_MetaBatch);
} }
batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end());
DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_;
//DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_;
} }

+ 1
- 0
test/recover_test.cc 查看文件

@ -80,6 +80,7 @@ TEST(TestParalRecover, Recover) {
} }
GetOneField(db); GetOneField(db);
checkDataInKVAndIndex(db); checkDataInKVAndIndex(db);
//这里会出现两个数字,如果>1说明除了线程3插入的一条数据,其他线程也有数据在崩溃前被正确恢复了
} }
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
取消
儲存