diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 346c8c9..63c9f2a 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -125,17 +125,17 @@ Request *FieldDB::GetHandleInterval() { } Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { - uint64_t start_ = env_->NowMicros(); + //uint64_t start_ = env_->NowMicros(); MutexLock L(&mutex_); taskqueue_.push_back(&req); while(true){ - uint64_t start_waiting = env_->NowMicros(); + //uint64_t start_waiting = env_->NowMicros(); while(req.isPending() || !req.done && &req != taskqueue_.front()) { req.cond_.Wait(); } - waiting_elasped += env_->NowMicros() - start_waiting; + //waiting_elasped += env_->NowMicros() - start_waiting; if(req.done) { - elapsed += env_->NowMicros() - start_; + //elapsed += env_->NowMicros() - start_; count ++; // dumpStatistics(); return req.s; //在返回时自动释放锁L @@ -149,48 +149,48 @@ Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) { { //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 MutexLock iL(&index_mu); - uint64_t start_construct = env_->NowMicros(); + //uint64_t start_construct = env_->NowMicros(); for(auto *req_ptr : taskqueue_) { req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); if(req_ptr == tail) break; } - construct_elapsed += env_->NowMicros() - start_construct; + //construct_elapsed += env_->NowMicros() - start_construct; } //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 //此处可以放锁是因为写入的有序性可以通过队列来保证 mutex_.Unlock(); - uint64_t start_write = env_->NowMicros(); + //uint64_t start_write = env_->NowMicros(); if(MetaBatch.ApproximateSize() > 12) { - uint64_t start_meta = env_->NowMicros(); + //uint64_t start_meta = env_->NowMicros(); status = metaDB_->Write(op, &MetaBatch); - write_meta_elapsed += env_->NowMicros() - start_meta; + //write_meta_elapsed += env_->NowMicros() - start_meta; write_bytes += MetaBatch.ApproximateSize(); assert(status.ok()); } //TODO:index的写入需要在另外一个线程中同时完成 if(IndexBatch.ApproximateSize() > 12) { - uint64_t start_index = env_->NowMicros(); + //uint64_t start_index = env_->NowMicros(); status = indexDB_->Write(op, &IndexBatch); - write_index_elapsed += env_->NowMicros() - start_index; + //write_index_elapsed += env_->NowMicros() - start_index; write_bytes += IndexBatch.ApproximateSize(); assert(status.ok()); } if(KVBatch.ApproximateSize() > 12) { - uint64_t start_kv = env_->NowMicros(); + //uint64_t start_kv = env_->NowMicros(); status = kvDB_->Write(op, &KVBatch); - write_kv_elapsed += env_->NowMicros() - start_kv; + //write_kv_elapsed += env_->NowMicros() - start_kv; write_bytes += KVBatch.ApproximateSize(); assert(status.ok()); } //3. 将meta数据清除 if(MetaBatch.ApproximateSize() > 12) { - uint64_t start_clean = env_->NowMicros(); + //uint64_t start_clean = env_->NowMicros(); MetaCleaner cleaner; cleaner.Collect(MetaBatch); cleaner.CleanMetaBatch(metaDB_); - write_clean_elapsed += env_->NowMicros() - start_clean; + //write_clean_elapsed += env_->NowMicros() - start_clean; } - write_elapsed += env_->NowMicros() - start_write; + //write_elapsed += env_->NowMicros() - start_write; mutex_.Lock(); } else { //对于创建和删除索引的请求,通过prepare完成索引状态的更新 @@ -263,9 +263,9 @@ Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { // dumpStatistics(); // return status; // } - uint64_t start_ = env_->NowMicros(); + //uint64_t start_ = env_->NowMicros(); BatchReq req(updates,&mutex_); - construct_BatchReq_init_elapsed += env_->NowMicros() - start_; + //construct_BatchReq_init_elapsed += env_->NowMicros() - start_; Status status = HandleRequest(req, options); return status; } diff --git a/fielddb/request.cpp b/fielddb/request.cpp index 9a53a6a..2fe585b 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -56,10 +56,9 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, batchKeySet.insert(Key); } std::string val_str; - Status s = Status::NotFound("test"); - uint64_t start_ = DB->env_->NowMicros(); + //uint64_t start_ = DB->env_->NowMicros(); s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); - DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; + //DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; // FieldArray *oldFields; FieldSliceArray oldFields; if (s.IsNotFound()){ @@ -409,12 +408,12 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; SliceHashSet Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 - uint64_t start_ = DB->env_->NowMicros(); + //uint64_t start_ = DB->env_->NowMicros(); for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { - uint64_t start_sub = DB->env_->NowMicros(); + //uint64_t start_sub = DB->env_->NowMicros(); (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); // (*subreq)->ConstructBatch(KVBatch, IndexBatch, MetaBatch, DB, batchKeySet); - DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub; + //DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub; DB->count_Batch_Sub ++; //所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说, //pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突 @@ -422,7 +421,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, return; } } - DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_; + //DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_; if(Sub_KVBatch.ApproximateSize() > 12) { KVBatch.Append(Sub_KVBatch); } @@ -433,7 +432,7 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, MetaBatch.Append(Sub_MetaBatch); } batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); - DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_; + //DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_; } diff --git a/test/recover_test.cc b/test/recover_test.cc index 3d42f80..d2b104d 100644 --- a/test/recover_test.cc +++ b/test/recover_test.cc @@ -80,6 +80,7 @@ TEST(TestParalRecover, Recover) { } GetOneField(db); checkDataInKVAndIndex(db); + //这里会出现两个数字,如果>1说明除了线程3插入的一条数据,其他线程也有数据在崩溃前被正确恢复了 } int main(int argc, char** argv) {