From c053ad80504b392a629c5698eddfc3d58f338f9a Mon Sep 17 00:00:00 2001 From: alexfisher <1823748191@qq.com> Date: Mon, 23 Dec 2024 09:59:41 +0800 Subject: [PATCH] fix lock_key bug --- db/db_impl.cc | 23 +++++++++----------- db/db_impl.h | 5 ++--- db/write_batch.cc | 53 +++++++++++++++++++++++++++++++++++++++++++++++ db/write_batch_internal.h | 2 ++ 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index f3e6646..be8f9c4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -142,7 +142,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) shutting_down_(false), background_work_finished_signal_(&mutex_), background_gc_finished_signal_(&gc_mutex_), - spj_mutex_cond_(&spj_mutex_), + lock_valuelog_key_mutex_cond_(&mutex_), mem_(nullptr), imm_(nullptr), has_imm_(false), @@ -1330,13 +1330,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // Convenience methods Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { - if(!o.valuelog_write){ - spj_mutex_.Lock(); - while(key==valuelog_finding_key){ - spj_mutex_cond_.Wait(); - } - spj_mutex_.Unlock(); - } return DB::Put(o, key, val); } @@ -1352,6 +1345,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { w.done = false; MutexLock l(&mutex_); + if(!options.valuelog_write&&valuelog_finding_key.size()>0){ + WriteBatchInternal::checkValueLog(updates, this,&valuelog_finding_key,&lock_valuelog_key_mutex_cond_); + } writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); @@ -1367,6 +1363,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatchInternal::ConverToValueLog(write_batch, this); + WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); @@ -1908,9 +1905,9 @@ void DBImpl::GarbageCollect() { std::string stored_value; //lock those thread who attempt to push "key" - spj_mutex_.Lock(); + mutex_.Lock(); valuelog_finding_key=key; - spj_mutex_.Unlock(); + mutex_.Unlock(); //wait for current writer queue to do all their thing { auto op=leveldb::WriteOptions(); @@ -1965,10 +1962,10 @@ void DBImpl::GarbageCollect() { write_op.valuelog_write=true; status = Put(write_op, key, value); - spj_mutex_.Lock(); + mutex_.Lock(); valuelog_finding_key=""; - spj_mutex_.Unlock(); - spj_mutex_cond_.SignalAll(); + mutex_.Unlock(); + lock_valuelog_key_mutex_cond_.SignalAll(); if (!status.ok()) { std::cerr << "Error accessing sstable: " << status.ToString() diff --git a/db/db_impl.h b/db/db_impl.h index 2b6be55..df723b7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -213,8 +213,7 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::Mutex gc_mutex_; - port::Mutex spj_mutex_; - port::CondVar spj_mutex_cond_ GUARDED_BY(spj_mutex_); + port::CondVar lock_valuelog_key_mutex_cond_ GUARDED_BY(mutex_); // std::shared_mutex value_log_mutex; @@ -222,7 +221,7 @@ class DBImpl : public DB { port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_); - Slice valuelog_finding_key="" GUARDED_BY(spj_mutex_ ); + Slice valuelog_finding_key="" GUARDED_BY(mutex_ ); MemTable* mem_; MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted diff --git a/db/write_batch.cc b/db/write_batch.cc index 6275892..74a4773 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -159,8 +159,52 @@ class ValueLogInserter : public WriteBatch::Handler { writeBatch_.Delete(key); } }; + +class ValueLogChecker : public WriteBatch::Handler { + public: + std::vector keys; + DB* db_; + Slice* lock_key_; + port::CondVar* cond_var_; + ValueLogChecker(DB* db,Slice* lock_key,port::CondVar* cond_var){ + db_=db; + lock_key_=lock_key; + cond_var_=cond_var; + } + + void Put(const Slice& key, const Slice& value) override { + keys.push_back(key); + } + + void Delete(const Slice& key) override { + keys.push_back(key); + } + + void CheckValid(){ + int len=keys.size(); + if(!len)return; + int l=0; + int r=len-1; + bool locked=false; + while(1){ + locked=false; + while(keys[l]==*lock_key_){ + cond_var_->Wait(); + locked=true; + } + if(locked){ + r=l;//a full round to make sure no key = current lock_key + } + else if(l==r)break; + if(++l==len)l=0; + } + } +}; } // namespace + + + Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { MemTableInserter inserter; inserter.sequence_ = WriteBatchInternal::Sequence(b); @@ -168,6 +212,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { return b->Iterate(&inserter); } +Status WriteBatchInternal::checkValueLog(WriteBatch* b,DB* db_,Slice* lock_key,port::CondVar* cond_var_){ + if(lock_key->size()>0){ + ValueLogChecker checker(db_,lock_key,cond_var_); + b->Iterate(&checker); + checker.CheckValid(); + } + return Status::OK(); +} + Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ ValueLogInserter inserter(WriteBatch(),db_); auto res=b->Iterate(&inserter); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 07869aa..404e0a0 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -36,6 +36,8 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); static Status InsertInto(const WriteBatch* batch, MemTable* memtable); + + static Status checkValueLog(WriteBatch* batch,DB* db_,Slice* lock_key,port::CondVar* cond_var_); static Status ConverToValueLog(WriteBatch* batch,DB* db_);