|
@ -61,6 +61,7 @@ struct DBImpl::CompactionState { |
|
|
uint64_t number; |
|
|
uint64_t number; |
|
|
uint64_t file_size; |
|
|
uint64_t file_size; |
|
|
InternalKey smallest, largest; |
|
|
InternalKey smallest, largest; |
|
|
|
|
|
TIMESTAMP old_ts,new_ts; |
|
|
}; |
|
|
}; |
|
|
|
|
|
|
|
|
Output* current_output() { return &outputs[outputs.size() - 1]; } |
|
|
Output* current_output() { return &outputs[outputs.size() - 1]; } |
|
@ -520,7 +521,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, |
|
|
Status s; |
|
|
Status s; |
|
|
{ |
|
|
{ |
|
|
mutex_.Unlock(); |
|
|
mutex_.Unlock(); |
|
|
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); |
|
|
|
|
|
|
|
|
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//meta包含largest_key,此时未刷盘meta,但meta.number对应生成的file会刷盘
|
|
|
mutex_.Lock(); |
|
|
mutex_.Lock(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -536,11 +537,15 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, |
|
|
if (s.ok() && meta.file_size > 0) { |
|
|
if (s.ok() && meta.file_size > 0) { |
|
|
const Slice min_user_key = meta.smallest.user_key(); |
|
|
const Slice min_user_key = meta.smallest.user_key(); |
|
|
const Slice max_user_key = meta.largest.user_key(); |
|
|
const Slice max_user_key = meta.largest.user_key(); |
|
|
|
|
|
const TIMESTAMP new_ts = meta.newer_ts; |
|
|
|
|
|
const TIMESTAMP old_ts = meta.oldest_ts; |
|
|
if (base != nullptr) { |
|
|
if (base != nullptr) { |
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
|
|
|
|
|
|
|
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);// TODO :基于timestamp和size和seek的新的选择规则
|
|
|
} |
|
|
} |
|
|
|
|
|
// edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
|
|
|
|
|
|
// meta.largest);
|
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
|
|
meta.largest); |
|
|
|
|
|
|
|
|
meta.largest,old_ts,new_ts); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
CompactionStats stats; |
|
|
CompactionStats stats; |
|
@ -598,6 +603,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { |
|
|
for (int level = 0; level < max_level_with_files; level++) { |
|
|
for (int level = 0; level < max_level_with_files; level++) { |
|
|
TEST_CompactRange(level, begin, end); |
|
|
TEST_CompactRange(level, begin, end); |
|
|
} |
|
|
} |
|
|
|
|
|
TEST_CompactRange(max_level_with_files, begin, end); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void DBImpl::TEST_CompactRange(int level, const Slice* begin, |
|
|
void DBImpl::TEST_CompactRange(int level, const Slice* begin, |
|
@ -743,8 +749,10 @@ void DBImpl::BackgroundCompaction() { |
|
|
assert(c->num_input_files(0) == 1); |
|
|
assert(c->num_input_files(0) == 1); |
|
|
FileMetaData* f = c->input(0, 0); |
|
|
FileMetaData* f = c->input(0, 0); |
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
c->edit()->RemoveFile(c->level(), f->number); |
|
|
|
|
|
// c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
|
|
|
|
|
|
// f->largest);
|
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
|
|
f->largest); |
|
|
|
|
|
|
|
|
f->largest,f->oldest_ts,f->newer_ts); |
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
status = versions_->LogAndApply(c->edit(), &mutex_); |
|
|
if (!status.ok()) { |
|
|
if (!status.ok()) { |
|
|
RecordBackgroundError(status); |
|
|
RecordBackgroundError(status); |
|
@ -756,7 +764,7 @@ void DBImpl::BackgroundCompaction() { |
|
|
status.ToString().c_str(), versions_->LevelSummary(&tmp)); |
|
|
status.ToString().c_str(), versions_->LevelSummary(&tmp)); |
|
|
} else { |
|
|
} else { |
|
|
CompactionState* compact = new CompactionState(c); |
|
|
CompactionState* compact = new CompactionState(c); |
|
|
status = DoCompactionWork(compact); |
|
|
|
|
|
|
|
|
status = DoCompactionWork(compact);//
|
|
|
if (!status.ok()) { |
|
|
if (!status.ok()) { |
|
|
RecordBackgroundError(status); |
|
|
RecordBackgroundError(status); |
|
|
} |
|
|
} |
|
@ -818,6 +826,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { |
|
|
out.number = file_number; |
|
|
out.number = file_number; |
|
|
out.smallest.Clear(); |
|
|
out.smallest.Clear(); |
|
|
out.largest.Clear(); |
|
|
out.largest.Clear(); |
|
|
|
|
|
out.old_ts = UINT64_MAX; |
|
|
|
|
|
out.new_ts = 0; |
|
|
compact->outputs.push_back(out); |
|
|
compact->outputs.push_back(out); |
|
|
mutex_.Unlock(); |
|
|
mutex_.Unlock(); |
|
|
} |
|
|
} |
|
@ -892,8 +902,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { |
|
|
const int level = compact->compaction->level(); |
|
|
const int level = compact->compaction->level(); |
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
for (size_t i = 0; i < compact->outputs.size(); i++) { |
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
const CompactionState::Output& out = compact->outputs[i]; |
|
|
|
|
|
// compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
|
|
|
|
|
|
// out.smallest, out.largest);
|
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
|
|
out.smallest, out.largest); |
|
|
|
|
|
|
|
|
out.smallest, out.largest,out.old_ts,out.new_ts); |
|
|
} |
|
|
} |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
} |
|
|
} |
|
@ -917,13 +929,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Iterator* input = versions_->MakeInputIterator(compact->compaction); |
|
|
Iterator* input = versions_->MakeInputIterator(compact->compaction); |
|
|
|
|
|
|
|
|
// Release mutex while we're actually doing the compaction work
|
|
|
// Release mutex while we're actually doing the compaction work
|
|
|
mutex_.Unlock(); |
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
input->SeekToFirst(); |
|
|
input->SeekToFirst(); |
|
|
Status status; |
|
|
Status status; |
|
|
ParsedInternalKey ikey; |
|
|
ParsedInternalKey ikey; |
|
|
|
|
|
TIMESTAMP ts = 0; |
|
|
std::string current_user_key; |
|
|
std::string current_user_key; |
|
|
bool has_current_user_key = false; |
|
|
bool has_current_user_key = false; |
|
|
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; |
|
|
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; |
|
@ -949,7 +961,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
break; |
|
|
break; |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
//auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);//debug
|
|
|
// Handle key/value, add to state, etc.
|
|
|
// Handle key/value, add to state, etc.
|
|
|
bool drop = false; |
|
|
bool drop = false; |
|
|
if (!ParseInternalKey(key, &ikey)) { |
|
|
if (!ParseInternalKey(key, &ikey)) { |
|
@ -981,6 +993,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
// few iterations of this loop (by rule (A) above).
|
|
|
// few iterations of this loop (by rule (A) above).
|
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
drop = true; |
|
|
drop = true; |
|
|
|
|
|
}else if((ts = DecodeFixed64(input->value().data() + input->value().size() - kTSLength)) < env_->NowMicros()){ |
|
|
|
|
|
|
|
|
|
|
|
drop = true; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
last_sequence_for_key = ikey.sequence; |
|
|
last_sequence_for_key = ikey.sequence; |
|
@ -1007,6 +1022,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
compact->current_output()->smallest.DecodeFrom(key); |
|
|
} |
|
|
} |
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
compact->current_output()->largest.DecodeFrom(key); |
|
|
|
|
|
assert(ts != 0); |
|
|
|
|
|
//auto b = compact->current_output()->old_ts;
|
|
|
|
|
|
compact->current_output()->old_ts = std::min(compact->current_output()->old_ts,ts); |
|
|
|
|
|
compact->current_output()->new_ts = std::max(compact->current_output()->new_ts,ts); |
|
|
compact->builder->Add(key, input->value()); |
|
|
compact->builder->Add(key, input->value()); |
|
|
|
|
|
|
|
|
// Close output file if it is big enough
|
|
|
// Close output file if it is big enough
|
|
@ -1153,23 +1172,30 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { |
|
|
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { |
|
|
// Done
|
|
|
// Done
|
|
|
} else { |
|
|
} else { |
|
|
|
|
|
stats.now_ts = this->env_->NowMicros(); |
|
|
s = current->Get(options, lkey, value, &stats); |
|
|
s = current->Get(options, lkey, value, &stats); |
|
|
have_stat_update = true; |
|
|
have_stat_update = true; |
|
|
} |
|
|
} |
|
|
mutex_.Lock(); |
|
|
mutex_.Lock(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if (have_stat_update && current->UpdateStats(stats)) { |
|
|
|
|
|
MaybeScheduleCompaction(); |
|
|
|
|
|
|
|
|
if(s.ok()){ |
|
|
|
|
|
s = CheckIsExpire(value); |
|
|
|
|
|
} |
|
|
|
|
|
if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) { |
|
|
|
|
|
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
|
|
|
} |
|
|
} |
|
|
mem->Unref(); |
|
|
mem->Unref(); |
|
|
if (imm != nullptr) imm->Unref(); |
|
|
if (imm != nullptr) imm->Unref(); |
|
|
|
|
|
|
|
|
current->Unref(); |
|
|
current->Unref(); |
|
|
if(!s.ok()){ |
|
|
|
|
|
return s; |
|
|
|
|
|
} |
|
|
|
|
|
auto s2 = CheckIsExpire(value); |
|
|
|
|
|
return s2; |
|
|
|
|
|
|
|
|
// if(!s.ok()){
|
|
|
|
|
|
// return s;
|
|
|
|
|
|
// }
|
|
|
|
|
|
// auto s2 = CheckIsExpire(value);
|
|
|
|
|
|
// if(!s2.ok()){
|
|
|
|
|
|
// current->UpdateStats(stats);
|
|
|
|
|
|
// }
|
|
|
|
|
|
return s; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
Iterator* DBImpl::NewIterator(const ReadOptions& options) { |
|
|
Iterator* DBImpl::NewIterator(const ReadOptions& options) { |
|
@ -1209,26 +1235,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { |
|
|
|
|
|
|
|
|
Status DBImpl::Put(const WriteOptions& options, const Slice& key, |
|
|
Status DBImpl::Put(const WriteOptions& options, const Slice& key, |
|
|
const Slice& value, uint64_t ttl) { |
|
|
const Slice& value, uint64_t ttl) { |
|
|
//rocksdb的实现
|
|
|
|
|
|
// Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
|
|
|
|
|
|
// SystemClock* clock) {
|
|
|
|
|
|
// val_with_ts->reserve(kTSLength + val.size());
|
|
|
|
|
|
// char ts_string[kTSLength];
|
|
|
|
|
|
// int64_t curtime;
|
|
|
|
|
|
// Status st = clock->GetCurrentTime(&curtime);
|
|
|
|
|
|
// if (!st.ok()) {
|
|
|
|
|
|
// return st;
|
|
|
|
|
|
// }
|
|
|
|
|
|
// EncodeFixed32(ts_string, (int32_t)curtime);
|
|
|
|
|
|
// val_with_ts->append(val.data(), val.size());
|
|
|
|
|
|
// val_with_ts->append(ts_string, kTSLength);
|
|
|
|
|
|
// return st;
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
|
|
std::string val_with_ts; |
|
|
std::string val_with_ts; |
|
|
val_with_ts.reserve(value.size() + kTSLength); |
|
|
val_with_ts.reserve(value.size() + kTSLength); |
|
|
char ts_string[kTSLength]; |
|
|
char ts_string[kTSLength]; |
|
|
TIMESTAMP expiration_time = this->env_->GetCurrentTime() + ttl * 1000; |
|
|
|
|
|
|
|
|
TIMESTAMP expiration_time = this->env_->NowMicros() + ttl * 1000000; |
|
|
EncodeFixed64(ts_string,expiration_time); |
|
|
EncodeFixed64(ts_string,expiration_time); |
|
|
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP ));
|
|
|
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP ));
|
|
|
// 追加原始 value 到 val_with_ts
|
|
|
// 追加原始 value 到 val_with_ts
|
|
@ -1236,12 +1247,6 @@ Status DBImpl::Put(const WriteOptions& options, const Slice& key, |
|
|
|
|
|
|
|
|
// 将 expiration_time 追加到 val_with_ts
|
|
|
// 将 expiration_time 追加到 val_with_ts
|
|
|
val_with_ts.append(ts_string,kTSLength); |
|
|
val_with_ts.append(ts_string,kTSLength); |
|
|
// std::cout << "val_with_ts in hex: ";
|
|
|
|
|
|
// for (unsigned char c : val_with_ts) {
|
|
|
|
|
|
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
|
|
|
|
|
|
// }
|
|
|
|
|
|
// std::cout << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
return DB::Put(options, key, Slice(val_with_ts)); |
|
|
return DB::Put(options, key, Slice(val_with_ts)); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -1549,45 +1554,24 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) { |
|
|
* @return timestamp in val,and remove timestamp from val |
|
|
* @return timestamp in val,and remove timestamp from val |
|
|
*/ |
|
|
*/ |
|
|
uint64_t DBImpl::GetTS(std::string* val) { |
|
|
uint64_t DBImpl::GetTS(std::string* val) { |
|
|
//uint64_t expiration_time;
|
|
|
|
|
|
// 输出 val 的十六进制表示
|
|
|
|
|
|
// std::cout << "befor decode,val in hex: ";
|
|
|
|
|
|
// for (unsigned char c : *val) {
|
|
|
|
|
|
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
|
|
|
|
|
|
// }
|
|
|
|
|
|
// std::cout << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); |
|
|
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); |
|
|
//memcpy(&expiration_time, val->data() + val->size() - sizeof(TIMESTAMP), sizeof(TIMESTAMP));
|
|
|
|
|
|
|
|
|
|
|
|
val->resize(val->size() - kTSLength); |
|
|
val->resize(val->size() - kTSLength); |
|
|
// std::cout << "after decode,val in hex: ";
|
|
|
|
|
|
// for (unsigned char c : *val) {
|
|
|
|
|
|
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
|
|
|
|
|
|
// }
|
|
|
|
|
|
// std::cout << std::endl;
|
|
|
|
|
|
|
|
|
|
|
|
return expiration_time; |
|
|
return expiration_time; |
|
|
|
|
|
|
|
|
// Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
|
|
|
|
|
|
// if (pinnable_val->size() < kTSLength) {
|
|
|
|
|
|
// return Status::Corruption("Bad timestamp in key-value");
|
|
|
|
|
|
// }
|
|
|
|
|
|
// // Erasing characters which hold the TS
|
|
|
|
|
|
// pinnable_val->remove_suffix(kTSLength);
|
|
|
|
|
|
// return Status::OK();
|
|
|
|
|
|
// }
|
|
|
|
|
|
} |
|
|
} |
|
|
// Checks whether a value read from the database has passed its expiration
// time, and strips the timestamp suffix from *value as a side effect.
//
// Returns:
//   - Status::Corruption if value is null or shorter than kTSLength, i.e. it
//     cannot contain a timestamp suffix (in that case *value is not modified);
//   - Status::Expire if the current time (env_->NowMicros()) is greater than
//     the timestamp decoded from the value;
//   - an OK Status otherwise.
Status DBImpl::CheckIsExpire(std::string* value) {
  // Validate before GetTS() runs: it subtracts kTSLength from the value's
  // size, which would wrap around for a value shorter than the suffix.
  if (value == nullptr || value->size() < static_cast<size_t>(kTSLength)) {
    return Status::Corruption("Bad timestamp in key-value");
  }

  // Read the clock first, then decode; GetTS() also removes the suffix.
  const uint64_t now_micros = env_->NowMicros();
  const uint64_t expire_at_micros = GetTS(value);
  if (now_micros > expire_at_micros) {
    return Status::Expire("Expire", Slice());
  }
  return Status();
}
|
|
|
|
|
|
|
|
/**
|
|
|
/**
|
|
|