|
@ -131,8 +131,8 @@ class ValueLogInserter : public WriteBatch::Handler { |
|
|
public: |
|
|
public: |
|
|
WriteBatch writeBatch_; |
|
|
WriteBatch writeBatch_; |
|
|
DB* db_; |
|
|
DB* db_; |
|
|
ValueLogInserter(WriteBatch writeBatch,DB* db){ |
|
|
|
|
|
writeBatch_=writeBatch; |
|
|
|
|
|
|
|
|
std::vector<std::pair<Slice,Slice>> kvs; |
|
|
|
|
|
ValueLogInserter(DB* db){ |
|
|
db_=db; |
|
|
db_=db; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -142,22 +142,28 @@ class ValueLogInserter : public WriteBatch::Handler { |
|
|
if(value.size()<100){ |
|
|
if(value.size()<100){ |
|
|
buf+=(char)(0x00);// should set in key
|
|
|
buf+=(char)(0x00);// should set in key
|
|
|
buf.append(value.data(),value.size()); |
|
|
buf.append(value.data(),value.size()); |
|
|
|
|
|
writeBatch_.Put(key,Slice(buf)); |
|
|
} |
|
|
} |
|
|
else{ |
|
|
else{ |
|
|
buf+=(char)(0x01); |
|
|
|
|
|
std::vector<std::pair<Slice,Slice>> kv; |
|
|
|
|
|
kv.push_back({key,value}); |
|
|
|
|
|
auto res=db_->WriteValueLog(kv); |
|
|
|
|
|
PutVarint64(&buf,res[0].first); |
|
|
|
|
|
PutVarint64(&buf,res[0].second); |
|
|
|
|
|
|
|
|
kvs.push_back({key,value}); |
|
|
} |
|
|
} |
|
|
new_value=Slice(buf); |
|
|
|
|
|
writeBatch_.Put(key,new_value); |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void Delete(const Slice& key) override { |
|
|
void Delete(const Slice& key) override { |
|
|
writeBatch_.Delete(key); |
|
|
writeBatch_.Delete(key); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void batch_insert(){ |
|
|
|
|
|
if(kvs.size()==0)return; |
|
|
|
|
|
auto kv_res=db_->WriteValueLog(kvs); |
|
|
|
|
|
for(int i=0;i<kvs.size();i++){ |
|
|
|
|
|
std::string buf; |
|
|
|
|
|
buf+=(char)(0x01); |
|
|
|
|
|
PutVarint64(&buf,kv_res[i].first); |
|
|
|
|
|
PutVarint64(&buf,kv_res[i].second); |
|
|
|
|
|
writeBatch_.Put(kvs[i].first,Slice(buf)); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
class ValueLogChecker : public WriteBatch::Handler { |
|
|
class ValueLogChecker : public WriteBatch::Handler { |
|
@ -186,14 +192,15 @@ class ValueLogChecker : public WriteBatch::Handler { |
|
|
int l=0; |
|
|
int l=0; |
|
|
int r=len-1; |
|
|
int r=len-1; |
|
|
bool locked=false; |
|
|
bool locked=false; |
|
|
while(1){ |
|
|
|
|
|
|
|
|
while(!lock_key_->empty()){ |
|
|
locked=false; |
|
|
locked=false; |
|
|
while(keys[l]==*lock_key_){ |
|
|
|
|
|
|
|
|
while(!lock_key_->empty()&&keys[l]==*lock_key_){ |
|
|
cond_var_->Wait(); |
|
|
cond_var_->Wait(); |
|
|
locked=true; |
|
|
locked=true; |
|
|
} |
|
|
} |
|
|
if(locked){ |
|
|
if(locked){ |
|
|
r=l;//a full round to make sure no key = current lock_key
|
|
|
|
|
|
|
|
|
r=l-1;//a full round to make sure no key = current lock_key
|
|
|
|
|
|
if(r<0)r=len-1; |
|
|
} |
|
|
} |
|
|
else if(l==r)break; |
|
|
else if(l==r)break; |
|
|
if(++l==len)l=0; |
|
|
if(++l==len)l=0; |
|
@ -215,15 +222,17 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { |
|
|
Status WriteBatchInternal::checkValueLog(WriteBatch* b,DB* db_,Slice* lock_key,port::CondVar* cond_var_){ |
|
|
Status WriteBatchInternal::checkValueLog(WriteBatch* b,DB* db_,Slice* lock_key,port::CondVar* cond_var_){ |
|
|
if(lock_key->size()>0){ |
|
|
if(lock_key->size()>0){ |
|
|
ValueLogChecker checker(db_,lock_key,cond_var_); |
|
|
ValueLogChecker checker(db_,lock_key,cond_var_); |
|
|
b->Iterate(&checker); |
|
|
|
|
|
|
|
|
auto res=b->Iterate(&checker); |
|
|
|
|
|
if(!res.ok())return res; |
|
|
checker.CheckValid(); |
|
|
checker.CheckValid(); |
|
|
} |
|
|
} |
|
|
return Status::OK(); |
|
|
return Status::OK(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ |
|
|
Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_){ |
|
|
ValueLogInserter inserter(WriteBatch(),db_); |
|
|
|
|
|
|
|
|
ValueLogInserter inserter(db_); |
|
|
auto res=b->Iterate(&inserter); |
|
|
auto res=b->Iterate(&inserter); |
|
|
|
|
|
inserter.batch_insert(); |
|
|
*b=inserter.writeBatch_; |
|
|
*b=inserter.writeBatch_; |
|
|
return res; |
|
|
return res; |
|
|
} |
|
|
} |
|
|