Ver código fonte

修改compactRange函数,实现对最后一层的ttl过滤

lzj_version
林子骥 3 semanas atrás
pai
commit
940b4f10e6
4 arquivos alterados com 15 adições e 12 exclusões
  1. +8
    -5
      db/db_impl.cc
  2. +4
    -4
      db/version_set.cc
  3. +1
    -1
      test/simple_test.cc
  4. +2
    -2
      test/ttl_test.cc

+ 8
- 5
db/db_impl.cc Ver arquivo

@ -598,6 +598,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
TEST_CompactRange(max_level_with_files, begin, end);
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
@ -756,7 +757,7 @@ void DBImpl::BackgroundCompaction() {
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
status = DoCompactionWork(compact);//
if (!status.ok()) {
RecordBackgroundError(status);
}
@ -917,7 +918,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
Iterator* input = versions_->MakeInputIterator(compact->compaction);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
@ -949,7 +949,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
@ -981,6 +981,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}else if(DecodeFixed64(input->value().data() + input->value().size() - kTSLength) < env_->NowMicros()){
drop = true;
}
last_sequence_for_key = ikey.sequence;
@ -1228,7 +1231,7 @@ Status DBImpl::Put(const WriteOptions& options, const Slice& key,
std::string val_with_ts;
val_with_ts.reserve(value.size() + kTSLength);
char ts_string[kTSLength];
TIMESTAMP expiration_time = this->env_->GetCurrentTime() + ttl * 1000;
TIMESTAMP expiration_time = this->env_->NowMicros() + ttl * 1000000;
EncodeFixed64(ts_string,expiration_time);
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP ));
// 追加原始 value 到 val_with_ts
@ -1577,7 +1580,7 @@ uint64_t DBImpl::GetTS(std::string* val) {
}
Status DBImpl::CheckIsExpire(std::string* value) {
//debug 用
auto a = env_->GetCurrentTime();
auto a = env_->NowMicros();
auto b = GetTS(value);
// std::cout<<"get current time"<<a<<std::endl;
// std::cout << "get ts from val"<<b<<std::endl;

+ 4
- 4
db/version_set.cc Ver arquivo

@ -138,7 +138,7 @@ bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
return false;
}
// Binary search over file list
// Binary search over file list, 保证SSTable有序
uint32_t index = 0;
if (smallest_user_key != nullptr) {
// Find the earliest possible internal key for smallest_user_key
@ -1386,7 +1386,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);//增加边界值,userkey相等的情况
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
@ -1444,11 +1444,11 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}
//得到基于当前level的所涉及的需要compact的文件,(level0可能会涉及到level1等)
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
const InternalKey* end) {
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs);
current_->GetOverlappingInputs(level, begin, end, &inputs);//得到一层的input
if (inputs.empty()) {
return nullptr;
}

+ 1
- 1
test/simple_test.cc Ver arquivo

@ -113,7 +113,7 @@ int main() {
std::string key = std::to_string(key_);
std::string value(1, 'a');
db->Put(writeOptions, key, value, ttl);
std::cout << "time to alive" << ttl << std::endl;
//std::cout << "time to alive" << ttl << std::endl;
ReadOptions readOptions;
std::string value_read;

+ 2
- 2
test/ttl_test.cc Ver arquivo

@ -39,7 +39,7 @@ void GetData(DB *db, int size = (1 << 30)) {
db->Get(readOptions, key, &value);
}
}
//
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
@ -100,7 +100,7 @@ TEST(TestTTL, CompactionTTL) {
ranges2[0] = leveldb::Range("-", "A");
uint64_t sizes2[1];
db->GetApproximateSizes(ranges2, 1, sizes2);
ASSERT_EQ(sizes[0], 0);
ASSERT_EQ(sizes2[0], 0);
}

Carregando…
Cancelar
Salvar