Browse Source

fix lock_key bug

pull/3/head
alexfisher 9 months ago
parent
commit
c053ad8050
4 changed files with 67 additions and 16 deletions
  1. +10
    -13
      db/db_impl.cc
  2. +2
    -3
      db/db_impl.h
  3. +53
    -0
      db/write_batch.cc
  4. +2
    -0
      db/write_batch_internal.h

+ 10
- 13
db/db_impl.cc View File

@ -142,7 +142,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
shutting_down_(false), shutting_down_(false),
background_work_finished_signal_(&mutex_), background_work_finished_signal_(&mutex_),
background_gc_finished_signal_(&gc_mutex_), background_gc_finished_signal_(&gc_mutex_),
spj_mutex_cond_(&spj_mutex_),
lock_valuelog_key_mutex_cond_(&mutex_),
mem_(nullptr), mem_(nullptr),
imm_(nullptr), imm_(nullptr),
has_imm_(false), has_imm_(false),
@ -1330,13 +1330,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods // Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
if(!o.valuelog_write){
spj_mutex_.Lock();
while(key==valuelog_finding_key){
spj_mutex_cond_.Wait();
}
spj_mutex_.Unlock();
}
return DB::Put(o, key, val); return DB::Put(o, key, val);
} }
@ -1352,6 +1345,9 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
w.done = false; w.done = false;
MutexLock l(&mutex_); MutexLock l(&mutex_);
if(!options.valuelog_write&&valuelog_finding_key.size()>0){
WriteBatchInternal::checkValueLog(updates, this,&valuelog_finding_key,&lock_valuelog_key_mutex_cond_);
}
writers_.push_back(&w); writers_.push_back(&w);
while (!w.done && &w != writers_.front()) { while (!w.done && &w != writers_.front()) {
w.cv.Wait(); w.cv.Wait();
@ -1367,6 +1363,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::ConverToValueLog(write_batch, this); WriteBatchInternal::ConverToValueLog(write_batch, this);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch); last_sequence += WriteBatchInternal::Count(write_batch);
@ -1908,9 +1905,9 @@ void DBImpl::GarbageCollect() {
std::string stored_value; std::string stored_value;
//lock those thread who attempt to push "key" //lock those thread who attempt to push "key"
spj_mutex_.Lock();
mutex_.Lock();
valuelog_finding_key=key; valuelog_finding_key=key;
spj_mutex_.Unlock();
mutex_.Unlock();
//wait for current writer queue to do all their thing //wait for current writer queue to do all their thing
{ {
auto op=leveldb::WriteOptions(); auto op=leveldb::WriteOptions();
@ -1965,10 +1962,10 @@ void DBImpl::GarbageCollect() {
write_op.valuelog_write=true; write_op.valuelog_write=true;
status = Put(write_op, key, value); status = Put(write_op, key, value);
spj_mutex_.Lock();
mutex_.Lock();
valuelog_finding_key=""; valuelog_finding_key="";
spj_mutex_.Unlock();
spj_mutex_cond_.SignalAll();
mutex_.Unlock();
lock_valuelog_key_mutex_cond_.SignalAll();
if (!status.ok()) { if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString() std::cerr << "Error accessing sstable: " << status.ToString()

+ 2
- 3
db/db_impl.h View File

@ -213,8 +213,7 @@ class DBImpl : public DB {
// State below is protected by mutex_ // State below is protected by mutex_
port::Mutex mutex_; port::Mutex mutex_;
port::Mutex gc_mutex_; port::Mutex gc_mutex_;
port::Mutex spj_mutex_;
port::CondVar spj_mutex_cond_ GUARDED_BY(spj_mutex_);
port::CondVar lock_valuelog_key_mutex_cond_ GUARDED_BY(mutex_);
// std::shared_mutex value_log_mutex; // std::shared_mutex value_log_mutex;
@ -222,7 +221,7 @@ class DBImpl : public DB {
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_); port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_);
Slice valuelog_finding_key="" GUARDED_BY(spj_mutex_ );
Slice valuelog_finding_key="" GUARDED_BY(mutex_ );
MemTable* mem_; MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted

+ 53
- 0
db/write_batch.cc View File

@ -159,8 +159,52 @@ class ValueLogInserter : public WriteBatch::Handler {
writeBatch_.Delete(key); writeBatch_.Delete(key);
} }
}; };
class ValueLogChecker : public WriteBatch::Handler {
public:
std::vector<Slice> keys;
DB* db_;
Slice* lock_key_;
port::CondVar* cond_var_;
ValueLogChecker(DB* db,Slice* lock_key,port::CondVar* cond_var){
db_=db;
lock_key_=lock_key;
cond_var_=cond_var;
}
void Put(const Slice& key, const Slice& value) override {
keys.push_back(key);
}
void Delete(const Slice& key) override {
keys.push_back(key);
}
void CheckValid(){
int len=keys.size();
if(!len)return;
int l=0;
int r=len-1;
bool locked=false;
while(1){
locked=false;
while(keys[l]==*lock_key_){
cond_var_->Wait();
locked=true;
}
if(locked){
r=l;//a full round to make sure no key = current lock_key
}
else if(l==r)break;
if(++l==len)l=0;
}
}
};
} // namespace } // namespace
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
MemTableInserter inserter; MemTableInserter inserter;
inserter.sequence_ = WriteBatchInternal::Sequence(b); inserter.sequence_ = WriteBatchInternal::Sequence(b);
@ -168,6 +212,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
return b->Iterate(&inserter); return b->Iterate(&inserter);
} }
Status WriteBatchInternal::checkValueLog(WriteBatch* b,DB* db_,Slice* lock_key,port::CondVar* cond_var_){
if(lock_key->size()>0){
ValueLogChecker checker(db_,lock_key,cond_var_);
b->Iterate(&checker);
checker.CheckValid();
}
return Status::OK();
}
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){
ValueLogInserter inserter(WriteBatch(),db_); ValueLogInserter inserter(WriteBatch(),db_);
auto res=b->Iterate(&inserter); auto res=b->Iterate(&inserter);

+ 2
- 0
db/write_batch_internal.h View File

@ -36,6 +36,8 @@ class WriteBatchInternal {
static void SetContents(WriteBatch* batch, const Slice& contents); static void SetContents(WriteBatch* batch, const Slice& contents);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable); static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static Status checkValueLog(WriteBatch* batch,DB* db_,Slice* lock_key,port::CondVar* cond_var_);
static Status ConverToValueLog(WriteBatch* batch,DB* db_); static Status ConverToValueLog(WriteBatch* batch,DB* db_);

Loading…
Cancel
Save