|
|
@ -57,6 +57,7 @@ struct DBImpl::CompactionState { |
|
|
|
struct Output { |
|
|
|
uint64_t number; |
|
|
|
uint64_t file_size; |
|
|
|
uint64_t valuelog_id; |
|
|
|
InternalKey smallest, largest; |
|
|
|
}; |
|
|
|
|
|
|
@ -83,6 +84,10 @@ struct DBImpl::CompactionState { |
|
|
|
WritableFile* outfile; |
|
|
|
TableBuilder* builder; |
|
|
|
|
|
|
|
WritableFile* valuelogfile; |
|
|
|
uint64_t valuelog_offset=0; |
|
|
|
uint64_t valuelog_file_id=0; |
|
|
|
|
|
|
|
uint64_t total_bytes; |
|
|
|
}; |
|
|
|
|
|
|
@ -537,7 +542,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, |
|
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
|
|
|
} |
|
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
|
meta.largest); |
|
|
|
meta.largest,meta.valuelog_id); |
|
|
|
} |
|
|
|
|
|
|
|
CompactionStats stats; |
|
|
@ -741,7 +746,7 @@ void DBImpl::BackgroundCompaction() { |
|
|
|
FileMetaData* f = c->input(0, 0); |
|
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
|
f->largest); |
|
|
|
f->largest,f->valuelog_id); |
|
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
|
if (!status.ok()) { |
|
|
|
RecordBackgroundError(status); |
|
|
@ -815,7 +820,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { |
|
|
|
out.number = file_number; |
|
|
|
out.smallest.Clear(); |
|
|
|
out.largest.Clear(); |
|
|
|
compact->valuelog_file_id=versions_->NewFileNumber(); |
|
|
|
out.valuelog_id=compact->valuelog_file_id; |
|
|
|
|
|
|
|
compact->outputs.push_back(out); |
|
|
|
|
|
|
|
mutex_.Unlock(); |
|
|
|
} |
|
|
|
|
|
|
@ -825,6 +834,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { |
|
|
|
if (s.ok()) { |
|
|
|
compact->builder = new TableBuilder(options_, compact->outfile); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
compact->valuelog_offset=0; |
|
|
|
s = env_->NewWritableFile(ValueLogFileName(dbname_,compact->valuelog_file_id), &compact->valuelogfile); |
|
|
|
|
|
|
|
return s; |
|
|
|
} |
|
|
|
|
|
|
@ -860,6 +874,19 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, |
|
|
|
} |
|
|
|
delete compact->outfile; |
|
|
|
compact->outfile = nullptr; |
|
|
|
if (s.ok()) { |
|
|
|
s = compact->valuelogfile->Flush(); |
|
|
|
} |
|
|
|
if (s.ok()) { |
|
|
|
s = compact->valuelogfile->Sync(); |
|
|
|
} |
|
|
|
if (s.ok()) { |
|
|
|
s = compact->valuelogfile->Close(); |
|
|
|
} |
|
|
|
delete compact->valuelogfile; |
|
|
|
compact->valuelogfile=nullptr; |
|
|
|
compact->valuelog_file_id=0; |
|
|
|
compact->valuelog_offset=0; |
|
|
|
|
|
|
|
if (s.ok() && current_entries > 0) { |
|
|
|
// Verify that the table is usable
|
|
|
@ -890,7 +917,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { |
|
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
|
out.smallest, out.largest); |
|
|
|
out.smallest, out.largest,out.valuelog_id); |
|
|
|
} |
|
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
|
} |
|
|
@ -924,6 +951,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
std::string current_user_key; |
|
|
|
bool has_current_user_key = false; |
|
|
|
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; |
|
|
|
std::set<uint64_t> old_valuelog_ids; |
|
|
|
for (int which = 0; which < 2; which++) { |
|
|
|
for (int i = 0; i < compact->compaction->num_input_files(which); i++) { |
|
|
|
if(compact->compaction->input(which, i)->valuelog_id)old_valuelog_ids.emplace(compact->compaction->input(which, i)->valuelog_id); |
|
|
|
} |
|
|
|
} |
|
|
|
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { |
|
|
|
// Prioritize immutable compaction work
|
|
|
|
if (has_imm_.load(std::memory_order_relaxed)) { |
|
|
@ -1004,7 +1037,35 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
|
} |
|
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
|
compact->builder->Add(key, input->value()); |
|
|
|
|
|
|
|
Slice old_value=input->value(); |
|
|
|
Slice new_value; |
|
|
|
std::string buf=""; |
|
|
|
if(old_value.data()[0]==(char)(0x00)){ |
|
|
|
new_value=old_value; |
|
|
|
} |
|
|
|
else{ |
|
|
|
old_value.remove_prefix(1); |
|
|
|
uint64_t file_id,valuelog_offset,valuelog_len; |
|
|
|
bool res=GetVarint64(&old_value,&file_id); |
|
|
|
if(!res)assert(0); |
|
|
|
res=GetVarint64(&old_value,&valuelog_offset); |
|
|
|
if(!res)assert(0); |
|
|
|
res=GetVarint64(&old_value,&valuelog_len); |
|
|
|
if(!res)assert(0); |
|
|
|
Status s=ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); |
|
|
|
assert(s.ok()); |
|
|
|
writeValueLogForCompaction(compact->valuelogfile,{new_value}); |
|
|
|
buf+=(char)(0x01); |
|
|
|
PutVarint64(&buf,compact->valuelog_file_id); |
|
|
|
PutVarint64(&buf,compact->valuelog_offset); |
|
|
|
PutVarint64(&buf,valuelog_len); |
|
|
|
compact->valuelog_offset+=valuelog_len; |
|
|
|
delete []new_value.data(); |
|
|
|
new_value=Slice(buf); |
|
|
|
} |
|
|
|
|
|
|
|
compact->builder->Add(key, new_value); |
|
|
|
|
|
|
|
// Close output file if it is big enough
|
|
|
|
if (compact->builder->FileSize() >= |
|
|
@ -1028,6 +1089,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
if (status.ok()) { |
|
|
|
status = input->status(); |
|
|
|
} |
|
|
|
//not completely correct, should be written in new function, related to removeabsol...
|
|
|
|
if(status.ok()){ |
|
|
|
for(auto id:old_valuelog_ids){ |
|
|
|
auto valuelog_filename=ValueLogFileName(dbname_,id); |
|
|
|
Status s=env_->RemoveFile(valuelog_filename); |
|
|
|
assert(s.ok()); |
|
|
|
} |
|
|
|
} |
|
|
|
delete input; |
|
|
|
input = nullptr; |
|
|
|
|
|
|
@ -1168,6 +1237,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
|
return s; |
|
|
|
} |
|
|
|
Slice value_log_slice=Slice(value->c_str()+1,value->length()); |
|
|
|
Slice new_value; |
|
|
|
uint64_t file_id,valuelog_offset,valuelog_len; |
|
|
|
bool res=GetVarint64(&value_log_slice,&file_id); |
|
|
|
if(!res)return Status::Corruption("can't decode file id"); |
|
|
@ -1175,8 +1245,9 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
|
if(!res)return Status::Corruption("can't decode valuelog offset"); |
|
|
|
res=GetVarint64(&value_log_slice,&valuelog_len); |
|
|
|
if(!res)return Status::Corruption("can't decode valuelog len"); |
|
|
|
ReadValueLog(file_id,valuelog_offset,valuelog_len,&value_log_slice); |
|
|
|
*value=std::string(value_log_slice.data(),value_log_slice.size()); |
|
|
|
ReadValueLog(file_id,valuelog_offset,valuelog_len,&new_value); |
|
|
|
*value=std::string(new_value.data(),new_value.size()); |
|
|
|
delete []new_value.data(); |
|
|
|
return s; |
|
|
|
} |
|
|
|
|
|
|
@ -1406,7 +1477,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { |
|
|
|
RecordBackgroundError(s); |
|
|
|
} |
|
|
|
delete logfile_; |
|
|
|
|
|
|
|
addNewValueLog(); |
|
|
|
logfile_ = lfile; |
|
|
|
logfile_number_ = new_log_number; |
|
|
|
log_ = new log::Writer(lfile); |
|
|
@ -1502,7 +1573,17 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { |
|
|
|
} |
|
|
|
std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> DBImpl::WriteValueLog(std::vector<Slice> values){ |
|
|
|
//lock
|
|
|
|
std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG"; |
|
|
|
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> res;
|
|
|
|
// for(int i=0;i<values.size();i++){
|
|
|
|
// int len=values[i].size();
|
|
|
|
// valuelogfile_->Append(values[i]);
|
|
|
|
// res.push_back({valuelogfile_number_,{valuelogfile_offset,len}});
|
|
|
|
// valuelogfile_offset+=len;
|
|
|
|
// }
|
|
|
|
// //unlock
|
|
|
|
// valuelogfile_->Flush();
|
|
|
|
// return res;
|
|
|
|
std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); |
|
|
|
std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); |
|
|
|
if (!valueFile.is_open()) { |
|
|
|
assert(0); |
|
|
@ -1519,28 +1600,41 @@ std::vector>> DBImpl::WriteValue |
|
|
|
valueFile.close(); |
|
|
|
return res; |
|
|
|
} |
|
|
|
void DBImpl::writeValueLogForCompaction(WritableFile* target_file,std::vector<Slice> values){ |
|
|
|
for(int i=0;i<values.size();i++){ |
|
|
|
target_file->Append(values[i]); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void DBImpl::addNewValueLog(){ |
|
|
|
//lock
|
|
|
|
// if(valuelogfile_){
|
|
|
|
// valuelogfile_->Sync();
|
|
|
|
// valuelogfile_->Close();
|
|
|
|
// delete valuelogfile_;
|
|
|
|
// }
|
|
|
|
valuelogfile_number_=versions_->NewFileNumber(); |
|
|
|
// valuelogfile_offset=0;
|
|
|
|
// std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_);
|
|
|
|
// env_->NewWritableFile(file_name_,&valuelogfile_);
|
|
|
|
//unlock
|
|
|
|
} |
|
|
|
|
|
|
|
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ |
|
|
|
//lock_shared
|
|
|
|
std::string file_name_=std::to_string(valuelogfile_number_)+".VALUELOG"; |
|
|
|
std::string file_name_=ValueLogFileName(dbname_,file_id); |
|
|
|
//std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
|
|
|
|
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); |
|
|
|
if (!inFile.is_open()) { |
|
|
|
std::cerr << "Failed to open file for writing!" << std::endl; |
|
|
|
std::cerr << "Failed to open file for writing!" class="o"><<file_id<<" "<<offset<<" "<<len<< std::endl; |
|
|
|
return Status::Corruption("Failed to open file for writing!"); |
|
|
|
} |
|
|
|
inFile.seekg(offset); |
|
|
|
char *value_buf=new char[len]; |
|
|
|
inFile.read(value_buf,len); |
|
|
|
inFile.close(); |
|
|
|
value=new Slice(value_buf,len); |
|
|
|
*value=Slice(value_buf,len); |
|
|
|
return Status::OK(); |
|
|
|
//unlock_shared
|
|
|
|
} |
|
|
|
|
|
|
|
// Default implementations of convenience methods that subclasses of DB
|
|
|
@ -1581,6 +1675,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { |
|
|
|
impl->log_ = new log::Writer(lfile); |
|
|
|
impl->mem_ = new MemTable(impl->internal_comparator_); |
|
|
|
impl->mem_->Ref(); |
|
|
|
impl->addNewValueLog(); |
|
|
|
} |
|
|
|
} |
|
|
|
if (s.ok() && save_manifest) { |
|
|
|