5 Commits

8 changed files with 167 additions and 29 deletions
Split View
  1. +4
    -1
      db/builder.cc
  2. +18
    -2
      db/db_impl.cc
  3. +5
    -1
      db/memtable.cc
  4. +29
    -4
      db/version_set.cc
  5. +1
    -0
      db/write_batch.cc
  6. +10
    -9
      table/block.cc
  7. +51
    -7
      table/table.cc
  8. +49
    -5
      test/ttl_test.cc

+ 4
- 1
db/builder.cc View File

@ -47,11 +47,14 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
if(parsed.deadTime == 0) parsed.deadTime = UINT64_MAX;
if(parsed.deadTime < nowTime) {
static int count = 0;
if(count % 1000 == 0) {
if(count % 10 == 0) {
std::cout<<"count "<<count++<<" drop dead in L0: "<<parsed.user_key.ToString()<<" "<<parsed.deadTime<<std::endl;
}
continue;
}
if(parsed.user_key.ToString() == "16636") {
printf("specific insert %s deadtime %lu file num %lu\n",parsed.user_key.ToString().c_str(),parsed.deadTime,meta->number);
}
meta->smallest_deadtime = std::min(meta->smallest_deadtime,parsed.deadTime);
meta->largest_deadtime = std::max(meta->largest_deadtime,parsed.deadTime);
}

+ 18
- 2
db/db_impl.cc View File

@ -988,13 +988,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else if(ikey.deadTime != 0 && ikey.deadTime < nowTime){
} else if(ikey.deadTime != 0){
if(ikey.user_key.ToString() == "16636") {
printf("in compaction:specific key %s deadtime %lu nowTime %lu\n",ikey.user_key.ToString().c_str(),ikey.deadTime,nowTime);
}
if(ikey.deadTime < nowTime){
static int count = 0;
if(count % 1000 == 0) {
if(count % 10 == 0) {
std::cout<<"count "<<count<<" drop dead in Compaction: "<<ikey.user_key.ToString()<<" "<<ikey.deadTime<<std::endl;
count ++;
}
drop = true;
}
else {
static int count1 = 0;
if(count1 % 10 == 0) {
std::cout<<"count "<<count1<<" keep key with deadtime: "<<ikey.user_key.ToString()<<" "<<ikey.deadTime<<std::endl;
count1 ++;
}
drop = false;
}
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
@ -1053,6 +1066,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
smallest_deadtime = std::min(smallest_deadtime,parsed.deadTime);
largest_deadtime = std::max(largest_deadtime,parsed.deadTime);
if(parsed.user_key == "16636")
printf("in compaction:keep specific key %s deadtime %lu nowTime %lu\n",
parsed.user_key.ToString().c_str(),parsed.deadTime,nowTime);
compact->builder->Add(key, input->value());

+ 5
- 1
db/memtable.cc View File

@ -140,6 +140,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
auto user_comparator = comparator_.comparator.user_comparator();
while(iter.Valid()) {
Slice now = GetLengthPrefixedSlice(iter.key()); //迭代器所处的位置
MemTable::KeyComparator comp_ = iter.get_comparator();
@ -151,7 +152,10 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
if(deadtime_k == 0) deadtime_k = UINT64_MAX;
if(deadtime_now == 0) deadtime_now = UINT64_MAX;
std::cout<<"key :"<<parsed_k.user_key.ToString()<<" k time: "<<deadtime_k<<" now time: "<<deadtime_now<<std::endl;
std::cout<<"k&now"<<parsed_k.user_key.ToString()<<" "<<parsed_now.user_key.ToString()<<std::endl;
std::cout<<"k&now "<<parsed_k.user_key.ToString()<<" "<<parsed_now.user_key.ToString()<<std::endl;
if(user_comparator->Compare(parsed_k.user_key,parsed_now.user_key) < 0) {
break;
}
if(deadtime_k > deadtime_now) {
iter.Next();
continue;

+ 29
- 4
db/version_set.cc View File

@ -83,11 +83,15 @@ Version::~Version() {
}
}
}
//寻找文件的时候也要考虑生存期
int FindFile(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& files, const Slice& key) {
uint32_t left = 0;
uint32_t right = files.size();
ParsedInternalKey parsed;
ParseInternalKey(key,&parsed);
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
@ -101,6 +105,19 @@ int FindFile(const InternalKeyComparator& icmp,
right = mid;
}
}
while(right < files.size()) {
printf("file ind %d num %d largest deadtime %d parsed deadtime %d\n",
right,files[right]->number,files[right]->largest_deadtime,parsed.deadTime);
if(files[right]->largest_deadtime >= parsed.deadTime) {
break;
}
// if(icmp.InternalKeyComparator::Compare(files[right]->largest.Encode(), key) > 0) {
// break;
// }
right ++;
}
return right;
}
@ -266,7 +283,7 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
// std::cout<<"corrupt get"<<std::endl;
s->state = kCorrupt;
} else {
std::cout<<"tar&found"<<parsed_key.user_key.ToString()<<" "<<s->user_key.ToString()<<std::endl;
std::cout<<"found & target "<<parsed_key.user_key.ToString()<<" "<<s->user_key.ToString()<<std::endl;
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
if (s->state == kFound) {
@ -284,14 +301,18 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
bool (*func)(void*, int, FileMetaData*)) {
const Comparator* ucmp = vset_->icmp_.user_comparator();
ParsedInternalKey parsed;
ParseInternalKey(internal_key,&parsed);
printf("parsed lookup deadtime : %d\n",parsed.deadTime);
// Search level-0 in order from newest to oldest.
std::vector<FileMetaData*> tmp;
tmp.reserve(files_[0].size());
for (uint32_t i = 0; i < files_[0].size(); i++) {
FileMetaData* f = files_[0][i];
//除了key的范围之外,还要考虑文件含有kv的最大deadtime
if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
ucmp->Compare(user_key, f->largest.user_key()) <= 0 &&
f->largest_deadtime > parsed.deadTime) {
tmp.push_back(f);
}
}
@ -306,6 +327,7 @@ void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
// Search other levels.
for (int level = 1; level < config::kNumLevels; level++) {
std::cout<<"----------search in level "<<level<<"--------------\n";
size_t num_files = files_[level].size();
if (num_files == 0) continue;
@ -313,6 +335,7 @@ void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
if (index < num_files) {
FileMetaData* f = files_[level][index];
std::cout<<"userkey fsmallest "<<user_key.ToString()<<" "<<f->smallest.user_key().ToString()<<std::endl;
if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
// All of "f" is past any data for user_key
} else {
@ -357,6 +380,8 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
printf("file level %d num %d\n",level,f->number);
std::cout<<"state->s ->saver.state: "<<state->s.ok()<<" "<<state->saver.state<<std::endl;
if (!state->s.ok()) {
state->found = true;
return false;

+ 1
- 0
db/write_batch.cc View File

@ -125,6 +125,7 @@ void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) {
assert(nowTime > 0);
assert(ttl > 0);
uint64_t deadTime = static_cast<uint64_t>(nowTime) + ttl;
// std::cout<<"now and dead time : "<<nowTime<<" "<<deadTime<<std::endl;
PutFixed64(&rep_, deadTime);
}

+ 10
- 9
table/block.cc View File

@ -222,15 +222,16 @@ class Block::Iter : public Iterator {
if (!ParseNextKey()) {
return;
}
ParsedInternalKey parsed_target,parsed_key_;
ParseInternalKey(target,&parsed_target);
ParseInternalKey(key_,&parsed_key_);
uint64_t deadtime_tar = parsed_target.deadTime;
uint64_t deadtime_key_ = parsed_key_.deadTime;
if(deadtime_tar == 0) deadtime_tar = UINT64_MAX;
if(deadtime_key_ == 0) deadtime_key_ = UINT64_MAX;
std::cout<<"key :"<<parsed_target.user_key.ToString()<<" tar time: "<<deadtime_tar<<" key time: "<<deadtime_key_<<std::endl;
if(deadtime_tar > deadtime_key_) continue;
// ParsedInternalKey parsed_target,parsed_key_;
// ParseInternalKey(target,&parsed_target);
// ParseInternalKey(key_,&parsed_key_);
// uint64_t deadtime_tar = parsed_target.deadTime;
// uint64_t deadtime_key_ = parsed_key_.deadTime;
// if(deadtime_tar == 0) deadtime_tar = UINT64_MAX;
// if(deadtime_key_ == 0) deadtime_key_ = UINT64_MAX;
// std::cout<<"target key :"<<parsed_target.user_key.ToString()<<" now key: "<<parsed_key_.user_key.ToString()
// <<" tar time: "<<deadtime_tar<<" now time: "<<deadtime_key_<<std::endl;
// if(deadtime_tar > deadtime_key_) continue;
if (Compare(key_, target) >= 0) {
return;
}

+ 51
- 7
table/table.cc View File

@ -14,7 +14,8 @@
#include "table/format.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "db/dbformat.h"
#include <iostream>
namespace leveldb {
struct Table::Rep {
@ -216,6 +217,10 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
const Slice&)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
// const Comparator *comparator = rep_->options.comparator;
const InternalKeyComparator *comparator = static_cast<const InternalKeyComparator *>(rep_->options.comparator);
const Comparator *user_comparator = comparator->user_comparator();
std::cout<<"-----------iter on index-------------\n";
iiter->Seek(k);
if (iiter->Valid()) {
Slice handle_value = iiter->value();
@ -225,13 +230,52 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg,
!filter->KeyMayMatch(handle.offset(), k)) {
// Not found
} else {
Iterator* block_iter = BlockReader(this, options, iiter->value());
block_iter->Seek(k);
if (block_iter->Valid()) {
(*handle_result)(arg, block_iter->key(), block_iter->value());
// Iterator* block_iter = BlockReader(this, options, iiter->value());
std::cout<<"-----------iter on data-------------\n";
// block_iter->Seek(k);
// if (block_iter->Valid()) {
// (*handle_result)(arg, block_iter->key(), block_iter->value());
// }
// s = block_iter->status();
// delete block_iter;
bool found = false;
while(iiter->Valid()) {
std::cout<<"-----------iter on new block-------------\n";
Iterator* block_iter = BlockReader(this,options,iiter->value());
block_iter->Seek(k);
ParsedInternalKey target,now;
ParseInternalKey(k,&target);
ParseInternalKey(block_iter->key(),&now);
if(user_comparator->Compare(target.user_key,now.user_key) < 0) {
std::cout<<"target key :"<<target.user_key.ToString()<<" now key: "<<now.user_key.ToString()
<<" tar time: "<<target.deadTime<<" now time: "<<now.deadTime<<std::endl;
// (*handle_result)(arg,block_iter->key(),block_iter->value());
delete block_iter;
break;
}
while(block_iter->Valid()) {
ParseInternalKey(block_iter->key(),&now);
std::cout<<"target key :"<<target.user_key.ToString()<<" now key: "<<now.user_key.ToString()
<<" tar time: "<<target.deadTime<<" now time: "<<now.deadTime<<std::endl;
if(target.deadTime > now.deadTime) {
block_iter->Next();
continue;
}
if(user_comparator->Compare(target.user_key,now.user_key) < 0) {
break;
}
(*handle_result)(arg,block_iter->key(),block_iter->value());
found = true;
break;
// block_iter->Next();
}
s = block_iter->status();
delete block_iter;
if(found) break;
iiter->Next();
}
s = block_iter->status();
delete block_iter;
}
}
if (s.ok()) {

+ 49
- 5
test/ttl_test.cc View File

@ -17,15 +17,15 @@ Status OpenDB(std::string dbName, DB **db) {
return DB::Open(options, dbName, db);
}
void InsertData(DB *db, uint64_t ttl/* second */) {
void InsertData(DB *db, uint64_t ttl/* second */, int vsize = 0) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
std::string value(value_size + vsize, 'a');
db->Put(writeOptions, std::to_string(i+1), value, ttl);
// db->Put(writeOptions, key, value, ttl);
}
@ -36,7 +36,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -60,7 +60,7 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -83,6 +83,50 @@ TEST(TestTTL, ReadTTL) {
delete db;
}
TEST(TestTTL, GetEarlierData) {
DestroyDB("testdb",Options());
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl1 = 15;
uint64_t ttl2 = 115;
uint64_t extra_size = 1;
InsertData(db, ttl2);
// sleep(1);
InsertData(db, ttl1, extra_size); //后一个数据长度变化一下
//都没过期先找到后插的
Env::Default()->SleepForMicroseconds(1 * 1000000);
int key_num = data_size / value_size;
ReadOptions readOptions;
Status status;
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
ASSERT_TRUE(status.ok());
ASSERT_EQ(value.size(), value_size + extra_size);
}
//再找到前一次
Env::Default()->SleepForMicroseconds(ttl1 * 1000000);
// db->CompactRange(nullptr,nullptr);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
ASSERT_TRUE(status.ok());
ASSERT_EQ(value.size(), value_size);
}
delete db;
}
TEST(TestTTL, CompactionTTL) {
DestroyDB("testdb",Options());
DB *db;

Loading…
Cancel
Save