diff --git a/db/db_impl.cc b/db/db_impl.cc index d9af970..6402fc0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1963,9 +1963,10 @@ void DBImpl::GarbageCollect() { status = Put(write_op, key, value); mutex_.Lock(); - valuelog_finding_key=""; - mutex_.Unlock(); + valuelog_finding_key=Slice(); lock_valuelog_key_mutex_cond_.SignalAll(); + mutex_.Unlock(); + if (!status.ok()) { std::cerr << "Error accessing sstable: " << status.ToString() diff --git a/db/write_batch.cc b/db/write_batch.cc index 74a4773..0757c39 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -131,8 +131,8 @@ class ValueLogInserter : public WriteBatch::Handler { public: WriteBatch writeBatch_; DB* db_; - ValueLogInserter(WriteBatch writeBatch,DB* db){ - writeBatch_=writeBatch; + std::vector> kvs; + ValueLogInserter(DB* db){ db_=db; } @@ -142,22 +142,28 @@ class ValueLogInserter : public WriteBatch::Handler { if(value.size()<100){ buf+=(char)(0x00);// should set in key buf.append(value.data(),value.size()); + writeBatch_.Put(key,Slice(buf)); } else{ - buf+=(char)(0x01); - std::vector> kv; - kv.push_back({key,value}); - auto res=db_->WriteValueLog(kv); - PutVarint64(&buf,res[0].first); - PutVarint64(&buf,res[0].second); + kvs.push_back({key,value}); } - new_value=Slice(buf); - writeBatch_.Put(key,new_value); } void Delete(const Slice& key) override { writeBatch_.Delete(key); } + + void batch_insert(){ + if(kvs.size()==0)return; + auto kv_res=db_->WriteValueLog(kvs); + for(int i=0;iempty()){ locked=false; - while(keys[l]==*lock_key_){ + while(!lock_key_->empty()&&keys[l]==*lock_key_){ cond_var_->Wait(); locked=true; } if(locked){ - r=l;//a full round to make sure no key = current lock_key + r=l-1;//a full round to make sure no key = current lock_key + if(r<0)r=len-1; } else if(l==r)break; if(++l==len)l=0; @@ -215,15 +222,17 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { Status WriteBatchInternal::checkValueLog(WriteBatch* b,DB* db_,Slice* lock_key,port::CondVar* cond_var_){ if(lock_key->size()>0){ ValueLogChecker checker(db_,lock_key,cond_var_); - b->Iterate(&checker); + auto res=b->Iterate(&checker); + if(!res.ok())return res; checker.CheckValid(); } return Status::OK(); } Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ - ValueLogInserter inserter(WriteBatch(),db_); + ValueLogInserter inserter(db_); auto res=b->Iterate(&inserter); + inserter.batch_insert(); *b=inserter.writeBatch_; return res; }