Explorar el Código

添加文件的ttl元数据管理

lzj_version
林子骥 hace 2 semanas
padre
commit
439e59f973
Se han modificado 7 ficheros con 97 adiciones y 43 borrados
  1. +16
    -6
      db/builder.cc
  2. +20
    -6
      db/db_impl.cc
  3. +9
    -0
      db/version_edit.cc
  4. +17
    -3
      db/version_edit.h
  5. +1
    -1
      db/version_set.cc
  6. +27
    -27
      test/ttl_test.cc
  7. +7
    -0
      util/coding.cc

+ 16
- 6
db/builder.cc Ver fichero

@ -14,6 +14,16 @@
namespace leveldb {
/**
* description: SSTable
* @param dbname
* @param env
* @param options
* @param table_cache
* @param iter
* @param meta sstable的相关元数据
* @return
*/
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
@ -31,17 +41,17 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());//这里是internal_key
// auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = tmp_ts;
// meta->newer_ts = tmp_ts;
auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
meta->oldest_ts = tmp_ts;
meta->newer_ts = tmp_ts;
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
// meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 20
- 6
db/db_impl.cc Ver fichero

@ -61,6 +61,7 @@ struct DBImpl::CompactionState {
uint64_t number;
uint64_t file_size;
InternalKey smallest, largest;
TIMESTAMP old_ts,new_ts;
};
Output* current_output() { return &outputs[outputs.size() - 1]; }
@ -536,11 +537,15 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
const TIMESTAMP new_ts = meta.newer_ts;
const TIMESTAMP old_ts = meta.oldest_ts;
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);// TODO :基于timestamp和size和seek的新的选择规则
}
// edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
// meta.largest);
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
meta.largest,old_ts,new_ts);
}
CompactionStats stats;
@ -744,8 +749,10 @@ void DBImpl::BackgroundCompaction() {
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
// c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
// f->largest);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
f->largest,f->oldest_ts,f->newer_ts);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
@ -819,6 +826,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
out.old_ts = UINT64_MAX;
out.new_ts = 0;
compact->outputs.push_back(out);
mutex_.Unlock();
}
@ -924,6 +933,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
TIMESTAMP ts = 0;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
@ -949,7 +959,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);
//auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);//debug
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
@ -981,7 +991,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}else if(DecodeFixed64(input->value().data() + input->value().size() - kTSLength) < env_->NowMicros()){
}else if((ts = DecodeFixed64(input->value().data() + input->value().size() - kTSLength)) < env_->NowMicros()){
drop = true;
}
@ -1010,6 +1020,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
assert(ts != 0);
//auto b = compact->current_output()->old_ts;
compact->current_output()->old_ts = std::min(compact->current_output()->old_ts,ts);
compact->current_output()->new_ts = std::max(compact->current_output()->new_ts,ts);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
@ -1156,7 +1170,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
//stats.now_ts = this->env_->NowMicros();
stats.now_ts = this->env_->NowMicros();
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}

+ 9
- 0
db/version_edit.cc Ver fichero

@ -81,6 +81,8 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutFixed64(dst,f.oldest_ts);
PutFixed64(dst,f.newer_ts);
}
}
@ -180,6 +182,9 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
GetVarint64(&input, &f.file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) {
f.newer_ts = DecodeFixed64(input.data());
f.oldest_ts = DecodeFixed64(input.data() + kTSLength);
input.remove_prefix(2 * kTSLength);
new_files_.push_back(std::make_pair(level, f));
} else {
msg = "new-file entry";
@ -250,6 +255,10 @@ std::string VersionEdit::DebugString() const {
r.append(f.smallest.DebugString());
r.append(" .. ");
r.append(f.largest.DebugString());
r.append(" ");
AppendNumberTo(&r,f.oldest_ts);
r.append("..");
AppendNumberTo(&r,f.newer_ts);
}
r.append("\n}\n");
return r;

+ 17
- 3
db/version_edit.h Ver fichero

@ -16,7 +16,7 @@ namespace leveldb {
class VersionSet;
struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),oldest_ts(UINT64_MAX),newer_ts(0) {}
int refs;
int allowed_seeks; // Seeks allowed until compaction
@ -24,8 +24,8 @@ struct FileMetaData {
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
// TIMESTAMP oldest_ts;
// TIMESTAMP newer_ts;
TIMESTAMP oldest_ts;
TIMESTAMP newer_ts;
};
@ -73,6 +73,20 @@ class VersionEdit {
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest,const TIMESTAMP old_ts,const TIMESTAMP new_ts) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
f.newer_ts = new_ts;
f.oldest_ts = old_ts;
new_files_.push_back(std::make_pair(level, f));
}
// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {

+ 1
- 1
db/version_set.cc Ver fichero

@ -351,7 +351,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->last_file_read = f;
state->last_file_read_level = level;
// if(state->stats->now_ts > f->newer_ts)return false;
if(state->stats->now_ts > f->newer_ts)return false;
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);

+ 27
- 27
test/ttl_test.cc Ver fichero

@ -75,33 +75,33 @@ TEST(TestTTL, ReadTTL) {
Env::Default()->SleepForMicroseconds( 1000);
}
TEST(TestTTL, CompactionTTL) {
DestroyDB("testdb", Options());
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges2[1];
ranges2[0] = leveldb::Range("-", "A");
uint64_t sizes2[1];
db->GetApproximateSizes(ranges2, 1, sizes2);
ASSERT_EQ(sizes2[0], 0);
}
//TEST(TestTTL, CompactionTTL) {
// DestroyDB("testdb", Options());
// DB *db;
// if(OpenDB("testdb", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
//
// uint64_t ttl = 20;
// InsertData(db, ttl);
//
// leveldb::Range ranges[1];
// ranges[0] = leveldb::Range("-", "A");
// uint64_t sizes[1];
// db->GetApproximateSizes(ranges, 1, sizes);
// ASSERT_GT(sizes[0], 0);
//
// Env::Default()->SleepForMicroseconds(ttl * 1000000);
//
// db->CompactRange(nullptr, nullptr);
//
// leveldb::Range ranges2[1];
// ranges2[0] = leveldb::Range("-", "A");
// uint64_t sizes2[1];
// db->GetApproximateSizes(ranges2, 1, sizes2);
// ASSERT_EQ(sizes2[0], 0);
//}
int main(int argc, char** argv) {

+ 7
- 0
util/coding.cc Ver fichero

@ -69,6 +69,7 @@ void PutVarint64(std::string* dst, uint64_t v) {
dst->append(buf, ptr - buf);
}
void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
PutVarint32(dst, value.size());
dst->append(value.data(), value.size());
@ -142,6 +143,12 @@ bool GetVarint64(Slice* input, uint64_t* value) {
}
}
//bool GetFixed64(const char* ptr,uint64_t* value){
// *value = DecodeFixed64(ptr);
// return
//}
bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
uint32_t len;
if (GetVarint32(input, &len) && input->size() >= len) {

Cargando…
Cancelar
Guardar