diff --git a/db/db_impl.cc b/db/db_impl.cc index 6879b82..122760c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1183,6 +1183,7 @@ std::vector DBImpl::FindKeysByField(Field &field){ result.push_back(iter->key().ToString()); } } + delete iter; return result; } diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index c42a370..121f991 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -57,16 +57,14 @@ Status FieldDB::OpenFieldDB(Options& options, (*dbptr)->metaDB_ = metadb; (*dbptr)->dbname_ = name; - // status = (*dbptr)->Recover(); + status = (*dbptr)->Recover(); (*dbptr)->options_ = &options; (*dbptr)->env_ = options.env; return status; } -// TODO:Recover Status FieldDB::Recover() { - //TODO: //1. 遍历所有Index类型的meta,重建内存中的index_状态表 Iterator *Iter = indexDB_->NewIterator(ReadOptions()); std::string IndexKey; @@ -76,10 +74,8 @@ Status FieldDB::Recover() { ParsedInternalIndexKey ParsedIndex; ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex); index_[ParsedIndex.name_.ToString()] = {Exist,nullptr}; - //std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl; //构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符) - //TODO:不知道这个做法有没有道理 std::string Seek; PutLengthPrefixedSlice(&Seek, ParsedIndex.name_); Seek.push_back(0xff); @@ -133,103 +129,100 @@ Status FieldDB::HandleRequest(Request &req) { uint64_t start_ = env_->NowMicros(); MutexLock L(&mutex_); taskqueue_.push_back(&req); -Again: - while(!req.done && &req != taskqueue_.front()) { - req.cond_.Wait(); - } - if(req.done) { - return req.s; //在返回时自动释放锁L - } - Request *tail = GetHandleInterval(); - WriteBatch KVBatch,IndexBatch,MetaBatch; - std::unordered_set batchKeySet; - Status status; - if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { - //表明这一个区间并没有涉及index的创建删除 - { - //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 - MutexLock iL(&index_mu); - uint64_t start_construct = env_->NowMicros(); - for(auto *req_ptr : taskqueue_) { - req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); - if(req_ptr == tail) break; - } - construct_elapsed += env_->NowMicros() - start_construct; - } - //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 - //此处可以放锁是因为写入的有序性可以通过队列来保证 - mutex_.Unlock(); - uint64_t start_write = env_->NowMicros(); - WriteOptions op; - if(MetaBatch.ApproximateSize() > 12) { - uint64_t start_meta = env_->NowMicros(); - status = metaDB_->Write(op, &MetaBatch); - write_meta_elapsed += env_->NowMicros() - start_meta; - write_bytes += MetaBatch.ApproximateSize(); - assert(status.ok()); - } - //TODO:index的写入需要在另外一个线程中同时完成 - if(IndexBatch.ApproximateSize() > 12) { - uint64_t start_index = env_->NowMicros(); - status = indexDB_->Write(op, &IndexBatch); - write_index_elapsed += env_->NowMicros() - start_index; - write_bytes += IndexBatch.ApproximateSize(); - assert(status.ok()); + while(true){ + while(!req.done && &req != taskqueue_.front()) { + req.cond_.Wait(); } - if(KVBatch.ApproximateSize() > 12) { - uint64_t start_kv = env_->NowMicros(); - status = kvDB_->Write(op, &KVBatch); - write_kv_elapsed += env_->NowMicros() - start_kv; - write_bytes += KVBatch.ApproximateSize(); - assert(status.ok()); + if(req.done) { + return req.s; //在返回时自动释放锁L } - //3. 将meta数据清除 - if(MetaBatch.ApproximateSize() > 12) { - uint64_t start_clean = env_->NowMicros(); - MetaCleaner cleaner; - cleaner.Collect(MetaBatch); - cleaner.CleanMetaBatch(metaDB_); - write_clean_elapsed += env_->NowMicros() - start_clean; + Request *tail = GetHandleInterval(); + WriteBatch KVBatch,IndexBatch,MetaBatch; + std::unordered_set batchKeySet; + Status status; + if(!tail->isiCreateReq() && !tail->isiDeleteReq()) { + //表明这一个区间并没有涉及index的创建删除 + { + //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 + MutexLock iL(&index_mu); + uint64_t start_construct = env_->NowMicros(); + for(auto *req_ptr : taskqueue_) { + req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); + if(req_ptr == tail) break; + } + construct_elapsed += env_->NowMicros() - start_construct; + } + //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 + //此处可以放锁是因为写入的有序性可以通过队列来保证 + mutex_.Unlock(); + uint64_t start_write = env_->NowMicros(); + WriteOptions op; + if(MetaBatch.ApproximateSize() > 12) { + uint64_t start_meta = env_->NowMicros(); + status = metaDB_->Write(op, &MetaBatch); + write_meta_elapsed += env_->NowMicros() - start_meta; + write_bytes += MetaBatch.ApproximateSize(); + assert(status.ok()); + } + //TODO:index的写入需要在另外一个线程中同时完成 + if(IndexBatch.ApproximateSize() > 12) { + uint64_t start_index = env_->NowMicros(); + status = indexDB_->Write(op, &IndexBatch); + write_index_elapsed += env_->NowMicros() - start_index; + write_bytes += IndexBatch.ApproximateSize(); + assert(status.ok()); + } + if(KVBatch.ApproximateSize() > 12) { + uint64_t start_kv = env_->NowMicros(); + status = kvDB_->Write(op, &KVBatch); + write_kv_elapsed += env_->NowMicros() - start_kv; + write_bytes += KVBatch.ApproximateSize(); + assert(status.ok()); + } + //3. 将meta数据清除 + if(MetaBatch.ApproximateSize() > 12) { + uint64_t start_clean = env_->NowMicros(); + MetaCleaner cleaner; + cleaner.Collect(MetaBatch); + cleaner.CleanMetaBatch(metaDB_); + write_clean_elapsed += env_->NowMicros() - start_clean; + } + write_elapsed += env_->NowMicros() - start_write; + mutex_.Lock(); + } else { + //对于创建和删除索引的请求,通过prepare完成索引状态的更新 + MutexLock iL(&index_mu); + req.Prepare(this); } - write_elapsed += env_->NowMicros() - start_write; - mutex_.Lock(); - } else { - //对于创建和删除索引的请求,通过prepare完成索引状态的更新 - MutexLock iL(&index_mu); - req.Prepare(this); - } - // { - // static int count = 0; - // if(count++ % 100000 == 0) { - // std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; - // } - // } - while(true) { - Request *ready = taskqueue_.front(); - // int debug = tail->type_; - taskqueue_.pop_front(); - //当前ready不是队首,不是和index的创建有关 - if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) { - ready->s = status; - ready->done = true; - if (ready != &req) ready->cond_.Signal(); + // { + // static int count = 0; + // if(count++ % 100000 == 0) { + // std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; + // } + // } + while(true) { + Request *ready = taskqueue_.front(); + // int debug = tail->type_; + taskqueue_.pop_front(); + //当前ready不是队首,不是和index的创建有关 + if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) { + ready->s = status; + ready->done = true; + if (ready != &req) ready->cond_.Signal(); + } + if (ready == tail) break; } - if (ready == tail) break; - } - elapsed += env_->NowMicros() - start_; - count ++; - dumpStatistics(); + elapsed += env_->NowMicros() - start_; + count ++; + dumpStatistics(); - if(!taskqueue_.empty()) { - taskqueue_.front()->cond_.Signal(); + if(!taskqueue_.empty()) { + taskqueue_.front()->cond_.Signal(); + } + //如果done==true,那么就不会继续等待直接退出 + //如果处于某个请求的pending list里面,那么就会继续等待重新入队 } - //如果done==true,那么就不会继续等待直接退出 - //如果处于某个请求的pending list里面,那么就会继续等待重新入队 - //这里用了万恶的goto,蛤蛤 - goto Again; - - // return status; } // 这里把一个空串作为常规put的name @@ -242,7 +235,6 @@ Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice & // 需要对是否进行index更新做处理 Status FieldDB::PutFields(const WriteOptions &Options, const Slice &key, const FieldArray &fields) { - //这里是为了const和slice-string的转换被迫搞得 std::string key_ = key.ToString(); FieldArray fields_ = fields; @@ -250,7 +242,6 @@ Status FieldDB::PutFields(const WriteOptions &Options, Status status = HandleRequest(req); return status; - // return kvDB_->PutFields(Options, key, fields); } // 删除有索引的key时indexdb也要同步 @@ -260,31 +251,26 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { DeleteReq req(&key_,&mutex_); Status status = HandleRequest(req); return status; - // return kvDB_->Delete(options, key); } -// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 -Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { - { - uint64_t start_ = env_->NowMicros(); - Status status = kvDB_->Write(options, updates); - temp_elapsed += env_->NowMicros() - start_; - count ++; - dumpStatistics(); - return status; - } - //或许应该再做一个接口?或者基于现有的接口进行改造 +// 根据updates里面的东西,要对是否需要更新index进行分别处理 +Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { + // { + // uint64_t start_ = env_->NowMicros(); + // Status status = kvDB_->Write(options, updates); + // temp_elapsed += env_->NowMicros() - start_; + // count ++; + // dumpStatistics(); + // return status; + // } uint64_t start_ = env_->NowMicros(); BatchReq req(updates,&mutex_); construct_BatchReq_init_elapsed += env_->NowMicros() - start_; Status status = HandleRequest(req); return status; - assert(0); - return Status::OK(); } //由于常规put将空串作为name,这里也需要适当修改 Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { - // return kvDB_->Get(options, key, value); FieldArray fields; Status s = GetFields(options, key, &fields); if(!s.ok()) { diff --git a/fielddb/request.cpp b/fielddb/request.cpp index d7757f9..b07ab2d 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -339,7 +339,7 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 str_buf->push_back(key.ToString()); FieldArray *field = new FieldArray; - // field = ParseValue(value.ToString(), field); + field = ParseValue(value.ToString(), field); if (field->empty()){ //batch中的value没有field fa_buf->push_back({{"",value.ToString()}}); } else { diff --git a/test/recover_test.cc b/test/recover_test.cc index 47cc731..36480b7 100644 --- a/test/recover_test.cc +++ b/test/recover_test.cc @@ -36,7 +36,7 @@ TEST(TestNormalRecover, Recover) { findKeysByAgeIndex(db, true); } -TEST(TestParalPutRecover, Recover) { +TEST(TestParalRecover, Recover) { //第一次运行 // fielddb::DestroyDB("testdb3.2",Options()); // FieldDB *db = new FieldDB(); @@ -47,24 +47,28 @@ TEST(TestParalPutRecover, Recover) { // } // db->CreateIndexOnField("address"); // db->CreateIndexOnField("age"); - // shanghaiKeys.clear(); - // age20Keys.clear(); - // int thread_num_ = 2; + // int thread_num_ = 4; // std::vector threads(thread_num_); // threads[0] = std::thread([db](){ // InsertFieldData(db); // }); // threads[1] = std::thread([db](){ + // WriteFieldData(db); + // }); + // threads[2] = std::thread([db](){ + // DeleteFieldData(db); + // }); + // threads[3] = std::thread([db](){ // InsertOneField(db); // delete db; // }); - // for (auto& t : threads) { + // for (auto& t : threads) { // if (t.joinable()) { // t.join(); // } // } - //线程1导致了线程0错误,测试会终止(模拟数据库崩溃) - //这会导致线程0在写入的各种奇怪的时间点崩溃 + //线程3导致了其他线程错误,测试会终止(模拟数据库崩溃) + //这会导致各线程在各种奇怪的时间点崩溃 //第二次运行注释掉上面的代码,运行下面的代码测试恢复