|
@ -52,11 +52,12 @@ struct DBImpl::Writer { |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
struct DBImpl::CompactionState { |
|
|
struct DBImpl::CompactionState { |
|
|
// Files produced by compaction
|
|
|
|
|
|
|
|
|
// Files produced by compaction 这里的改动和filemetadata对应
|
|
|
struct Output { |
|
|
struct Output { |
|
|
uint64_t number; |
|
|
uint64_t number; |
|
|
uint64_t file_size; |
|
|
uint64_t file_size; |
|
|
InternalKey smallest, largest; |
|
|
InternalKey smallest, largest; |
|
|
|
|
|
uint64_t smallest_deadtime,largest_deadtime; |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
Output* current_output() { return &outputs[outputs.size() - 1]; } |
|
|
Output* current_output() { return &outputs[outputs.size() - 1]; } |
|
@ -536,7 +537,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, |
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
|
|
} |
|
|
} |
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
meta.largest); |
|
|
|
|
|
|
|
|
meta.largest,meta.smallest_deadtime,meta.largest_deadtime); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
CompactionStats stats; |
|
|
CompactionStats stats; |
|
@ -590,6 +591,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
// max_level_with_files = config::kNumLevels - 1; //TODO:强制合并所有level中的sst,但是这么做不是很优雅
|
|
|
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
|
|
|
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
|
|
|
for (int level = 0; level < max_level_with_files; level++) { |
|
|
for (int level = 0; level < max_level_with_files; level++) { |
|
|
TEST_CompactRange(level, begin, end); |
|
|
TEST_CompactRange(level, begin, end); |
|
@ -704,6 +706,31 @@ void DBImpl::BackgroundCall() { |
|
|
background_work_finished_signal_.SignalAll(); |
|
|
background_work_finished_signal_.SignalAll(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool DBImpl::RemoveExpireTable() { |
|
|
|
|
|
bool remove = false; |
|
|
|
|
|
VersionEdit edit; |
|
|
|
|
|
time_t nowTime; |
|
|
|
|
|
time(&nowTime); |
|
|
|
|
|
Version *base = versions_->current(); |
|
|
|
|
|
base->Ref(); |
|
|
|
|
|
for(int level = 0; level < config::kNumLevels; level ++) { |
|
|
|
|
|
const std::vector<FileMetaData*> &files = versions_->current()->Files(level); |
|
|
|
|
|
for(auto meta:files) { |
|
|
|
|
|
if(meta->largest_deadtime < nowTime) { |
|
|
|
|
|
remove = true; |
|
|
|
|
|
edit.RemoveFile(level,meta->number); |
|
|
|
|
|
std::cout<<"remove file : "<<meta->number<<" from level : "<<level<<" deadtime : "<<meta->largest_deadtime<<std::endl; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
if(remove) { |
|
|
|
|
|
versions_->LogAndApply(&edit,&mutex_); |
|
|
|
|
|
// RemoveObsoleteFiles();
|
|
|
|
|
|
} |
|
|
|
|
|
base->Unref(); |
|
|
|
|
|
return remove; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
void DBImpl::BackgroundCompaction() { |
|
|
void DBImpl::BackgroundCompaction() { |
|
|
mutex_.AssertHeld(); |
|
|
mutex_.AssertHeld(); |
|
|
|
|
|
|
|
@ -712,6 +739,10 @@ void DBImpl::BackgroundCompaction() { |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if(RemoveExpireTable()) { |
|
|
|
|
|
return; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Compaction* c; |
|
|
Compaction* c; |
|
|
bool is_manual = (manual_compaction_ != nullptr); |
|
|
bool is_manual = (manual_compaction_ != nullptr); |
|
|
InternalKey manual_end; |
|
|
InternalKey manual_end; |
|
@ -740,7 +771,7 @@ void DBImpl::BackgroundCompaction() { |
|
|
FileMetaData* f = c->input(0, 0); |
|
|
FileMetaData* f = c->input(0, 0); |
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
f->largest); |
|
|
|
|
|
|
|
|
f->largest,f->smallest_deadtime,f->largest_deadtime); |
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
if (!status.ok()) { |
|
|
if (!status.ok()) { |
|
|
RecordBackgroundError(status); |
|
|
RecordBackgroundError(status); |
|
@ -814,6 +845,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { |
|
|
out.number = file_number; |
|
|
out.number = file_number; |
|
|
out.smallest.Clear(); |
|
|
out.smallest.Clear(); |
|
|
out.largest.Clear(); |
|
|
out.largest.Clear(); |
|
|
|
|
|
out.smallest_deadtime = UINT64_MAX; |
|
|
|
|
|
out.largest_deadtime = 0; |
|
|
compact->outputs.push_back(out); |
|
|
compact->outputs.push_back(out); |
|
|
mutex_.Unlock(); |
|
|
mutex_.Unlock(); |
|
|
} |
|
|
} |
|
@ -889,7 +922,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { |
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
out.smallest, out.largest); |
|
|
|
|
|
|
|
|
out.smallest, out.largest,out.smallest_deadtime,out.largest_deadtime); |
|
|
} |
|
|
} |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
} |
|
|
} |
|
@ -948,11 +981,20 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
|
|
|
|
|
// Handle key/value, add to state, etc.
|
|
|
// Handle key/value, add to state, etc.
|
|
|
bool drop = false; |
|
|
bool drop = false; |
|
|
|
|
|
time_t nowTime; |
|
|
|
|
|
time(&nowTime); |
|
|
if (!ParseInternalKey(key, &ikey)) { |
|
|
if (!ParseInternalKey(key, &ikey)) { |
|
|
// Do not hide error keys
|
|
|
// Do not hide error keys
|
|
|
current_user_key.clear(); |
|
|
current_user_key.clear(); |
|
|
has_current_user_key = false; |
|
|
has_current_user_key = false; |
|
|
last_sequence_for_key = kMaxSequenceNumber; |
|
|
last_sequence_for_key = kMaxSequenceNumber; |
|
|
|
|
|
} else if(ikey.deadTime != 0 && ikey.deadTime < nowTime){ |
|
|
|
|
|
static int count = 0; |
|
|
|
|
|
if(count % 1000 == 0) { |
|
|
|
|
|
std::cout<<"count "<<count<<" drop dead in Compaction: "<<ikey.user_key.ToString()<<" "<<ikey.deadTime<<std::endl; |
|
|
|
|
|
count ++; |
|
|
|
|
|
} |
|
|
|
|
|
drop = true; |
|
|
} else { |
|
|
} else { |
|
|
if (!has_current_user_key || |
|
|
if (!has_current_user_key || |
|
|
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != |
|
|
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != |
|
@ -978,7 +1020,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
drop = true; |
|
|
drop = true; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
last_sequence_for_key = ikey.sequence; |
|
|
last_sequence_for_key = ikey.sequence; |
|
|
} |
|
|
} |
|
|
#if 0
|
|
|
#if 0
|
|
@ -990,7 +1031,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key), |
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key), |
|
|
(int)last_sequence_for_key, (int)compact->smallest_snapshot); |
|
|
(int)last_sequence_for_key, (int)compact->smallest_snapshot); |
|
|
#endif
|
|
|
#endif
|
|
|
|
|
|
|
|
|
if (!drop) { |
|
|
if (!drop) { |
|
|
// Open output file if necessary
|
|
|
// Open output file if necessary
|
|
|
if (compact->builder == nullptr) { |
|
|
if (compact->builder == nullptr) { |
|
@ -1003,6 +1043,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
} |
|
|
} |
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
|
|
|
|
|
|
|
|
|
ParsedInternalKey parsed; |
|
|
|
|
|
ParseInternalKey(key,&parsed); |
|
|
|
|
|
uint64_t &smallest_deadtime = compact->current_output()->smallest_deadtime; |
|
|
|
|
|
uint64_t &largest_deadtime = compact->current_output()->largest_deadtime; |
|
|
|
|
|
if(parsed.deadTime == 0) { |
|
|
|
|
|
smallest_deadtime = UINT64_MAX; |
|
|
|
|
|
} |
|
|
|
|
|
smallest_deadtime = std::min(smallest_deadtime,parsed.deadTime); |
|
|
|
|
|
largest_deadtime = std::max(largest_deadtime,parsed.deadTime); |
|
|
|
|
|
|
|
|
compact->builder->Add(key, input->value()); |
|
|
compact->builder->Add(key, input->value()); |
|
|
|
|
|
|
|
|
// Close output file if it is big enough
|
|
|
// Close output file if it is big enough
|
|
@ -1206,6 +1257,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, |
|
|
return DB::Put(o, key, val, ttl); |
|
|
return DB::Put(o, key, val, ttl); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Status DBImpl::Put(const WriteOptions& options, const Slice& key,
|
|
|
|
|
|
// const Slice& value, uint64_t ttl) {
|
|
|
|
|
|
// return DB::Put(options,key,value,ttl);
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { |
|
|
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { |
|
|
return DB::Delete(options, key); |
|
|
return DB::Delete(options, key); |
|
|
} |
|
|
} |
|
@ -1500,6 +1556,14 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, |
|
|
return Write(opt, &batch); |
|
|
return Write(opt, &batch); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// //为了通过编译,忽略ttl
|
|
|
|
|
|
// Status DB::Put(const WriteOptions& options, const Slice& key,
|
|
|
|
|
|
// const Slice& value, uint64_t ttl) {
|
|
|
|
|
|
// WriteBatch batch;
|
|
|
|
|
|
// batch.Put(key, value);
|
|
|
|
|
|
// return Write(options, &batch);
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
Status DB::Delete(const WriteOptions& opt, const Slice& key) { |
|
|
Status DB::Delete(const WriteOptions& opt, const Slice& key) { |
|
|
WriteBatch batch; |
|
|
WriteBatch batch; |
|
|
batch.Delete(key); |
|
|
batch.Delete(key); |
|
|