|
@ -57,6 +57,7 @@ struct DBImpl::CompactionState { |
|
|
struct Output { |
|
|
struct Output { |
|
|
uint64_t number; |
|
|
uint64_t number; |
|
|
uint64_t file_size; |
|
|
uint64_t file_size; |
|
|
|
|
|
uint64_t valuelog_id; |
|
|
InternalKey smallest, largest; |
|
|
InternalKey smallest, largest; |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
@ -541,7 +542,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, |
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
|
|
} |
|
|
} |
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
meta.largest); |
|
|
|
|
|
|
|
|
meta.largest,meta.valuelog_id); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
CompactionStats stats; |
|
|
CompactionStats stats; |
|
@ -745,7 +746,7 @@ void DBImpl::BackgroundCompaction() { |
|
|
FileMetaData* f = c->input(0, 0); |
|
|
FileMetaData* f = c->input(0, 0); |
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
f->largest); |
|
|
|
|
|
|
|
|
f->largest,f->valuelog_id); |
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
if (!status.ok()) { |
|
|
if (!status.ok()) { |
|
|
RecordBackgroundError(status); |
|
|
RecordBackgroundError(status); |
|
@ -819,8 +820,11 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { |
|
|
out.number = file_number; |
|
|
out.number = file_number; |
|
|
out.smallest.Clear(); |
|
|
out.smallest.Clear(); |
|
|
out.largest.Clear(); |
|
|
out.largest.Clear(); |
|
|
compact->outputs.push_back(out); |
|
|
|
|
|
compact->valuelog_file_id=versions_->NewFileNumber(); |
|
|
compact->valuelog_file_id=versions_->NewFileNumber(); |
|
|
|
|
|
out.valuelog_id=compact->valuelog_file_id; |
|
|
|
|
|
|
|
|
|
|
|
compact->outputs.push_back(out); |
|
|
|
|
|
|
|
|
mutex_.Unlock(); |
|
|
mutex_.Unlock(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -913,7 +917,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { |
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
out.smallest, out.largest); |
|
|
|
|
|
|
|
|
out.smallest, out.largest,out.valuelog_id); |
|
|
} |
|
|
} |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
} |
|
|
} |
|
@ -947,6 +951,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
std::string current_user_key; |
|
|
std::string current_user_key; |
|
|
bool has_current_user_key = false; |
|
|
bool has_current_user_key = false; |
|
|
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; |
|
|
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; |
|
|
|
|
|
std::set<uint64_t> old_valuelog_ids; |
|
|
|
|
|
for (int which = 0; which < 2; which++) { |
|
|
|
|
|
for (int i = 0; i < compact->compaction->num_input_files(which); i++) { |
|
|
|
|
|
if(compact->compaction->input(which, i)->valuelog_id)old_valuelog_ids.emplace(compact->compaction->input(which, i)->valuelog_id); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { |
|
|
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { |
|
|
// Prioritize immutable compaction work
|
|
|
// Prioritize immutable compaction work
|
|
|
if (has_imm_.load(std::memory_order_relaxed)) { |
|
|
if (has_imm_.load(std::memory_order_relaxed)) { |
|
@ -1035,6 +1045,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
new_value=old_value; |
|
|
new_value=old_value; |
|
|
} |
|
|
} |
|
|
else{ |
|
|
else{ |
|
|
|
|
|
old_value.remove_prefix(1); |
|
|
uint64_t file_id,valuelog_offset,valuelog_len; |
|
|
uint64_t file_id,valuelog_offset,valuelog_len; |
|
|
bool res=GetVarint64(&old_value,&file_id); |
|
|
bool res=GetVarint64(&old_value,&file_id); |
|
|
if(!res)assert(0); |
|
|
if(!res)assert(0); |
|
@ -1078,6 +1089,14 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
if (status.ok()) { |
|
|
if (status.ok()) { |
|
|
status = input->status(); |
|
|
status = input->status(); |
|
|
} |
|
|
} |
|
|
|
|
|
//not completely correct, should be written in new function, related to removeabsol...
|
|
|
|
|
|
if(status.ok()){ |
|
|
|
|
|
for(auto id:old_valuelog_ids){ |
|
|
|
|
|
auto valuelog_filename=ValueLogFileName(dbname_,id); |
|
|
|
|
|
Status s=env_->RemoveFile(valuelog_filename); |
|
|
|
|
|
assert(s.ok()); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
delete input; |
|
|
delete input; |
|
|
input = nullptr; |
|
|
input = nullptr; |
|
|
|
|
|
|
|
@ -1607,7 +1626,7 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice |
|
|
//std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
|
|
|
//std::cout<<file_name_<<" "<<offset<<" "<<len<<std::endl;
|
|
|
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); |
|
|
std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); |
|
|
if (!inFile.is_open()) { |
|
|
if (!inFile.is_open()) { |
|
|
std::cerr << "Failed to open file for writing!" << std::endl; |
|
|
|
|
|
|
|
|
std::cerr << "Failed to open file for writing!" class="o"><<file_id<<" "<<offset<<" "<<len<< std::endl; |
|
|
return Status::Corruption("Failed to open file for writing!"); |
|
|
return Status::Corruption("Failed to open file for writing!"); |
|
|
} |
|
|
} |
|
|
inFile.seekg(offset); |
|
|
inFile.seekg(offset); |
|
|