diff --git a/db/db_impl.cc b/db/db_impl.cc index 3b9e38d..7eed3f8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1143,7 +1143,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). - LookupKey lkey(key, snapshot); + time_t nowTime; + time(&nowTime); + assert(nowTime > 0); + + LookupKey lkey(key, snapshot, nowTime); if (mem->Get(lkey, value, &s)) { printf("in mem\n"); // Done diff --git a/db/dbformat.cc b/db/dbformat.cc index d868270..686d04f 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -12,12 +12,14 @@ namespace leveldb { -static uint64_t PackSequenceAndTypeAndTtl(uint64_t seq, ValueType t, bool havettl) { +static uint64_t PackSequenceAndTypeAndTtlAndLookup( + uint64_t seq, ValueType t, bool havettl, bool islookup) { assert(seq <= kMaxSequenceNumber); assert(t <= kValueTypeForSeek); - return (seq << 8) | (havettl << 1) | t; + return (seq << 8) | (islookup << 2) | (havettl << 1) | t; } +//下面有两个调这个函数的没改,也许也要修改标志位? static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { assert(seq <= kMaxSequenceNumber); assert(t <= kValueTypeForSeek); @@ -28,8 +30,8 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { result->append(key.user_key.data(), key.user_key.size()); if(key.deadTime != 0) PutFixed64(result, key.deadTime); - PutFixed64(result, PackSequenceAndTypeAndTtl( - key.sequence, key.type, (key.deadTime != 0))); + PutFixed64(result, PackSequenceAndTypeAndTtlAndLookup( + key.sequence, key.type, (key.deadTime != 0), false)); } std::string ParsedInternalKey::DebugString() const { @@ -58,13 +60,33 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) + //目前看调用时都是a为node, b为key,万一有不是的,逻辑还得补充 + //for debug + // std::string a = ExtractUserKey(akey).ToString(); + // std::string b = ExtractUserKey(bkey).ToString(); int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { - const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) | 0b10; - const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) | 0b10; - if (anum > bnum) { + const uint64_t atag = DecodeFixed64(akey.data() + akey.size() - 8); + const uint64_t btag = DecodeFixed64(bkey.data() + bkey.size() - 8); + + const uint64_t aseq = atag >> 8; + const uint64_t bseq = btag >> 8; + if (aseq > bseq) { r = -1; - } else if (anum < bnum) { + return r; + } + + //原本应该找到了,新加判断 + if((btag & 0b100) && (atag & 0b10)){ //一个是查询键,另一个有ttl + const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16); + const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16); + if(atime <= btime){//过期了继续找 + r = -1; + return r; + } + } + + if (aseq < bseq) { r = +1; } } @@ -123,9 +145,9 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { return user_policy_->KeyMayMatch(ExtractUserKey(key), f); } -LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { +LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, uint64_t nowTime) { size_t usize = user_key.size(); - size_t needed = usize + 13; // A conservative estimate + size_t needed = usize + 21; // A conservative estimate char* dst; if (needed <= sizeof(space_)) { dst = space_; @@ -133,11 +155,13 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { dst = new char[needed]; } start_ = dst; - dst = EncodeVarint32(dst, usize + 8); + dst = EncodeVarint32(dst, usize + 16); kstart_ = dst; std::memcpy(dst, user_key.data(), usize); dst += usize; - EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); + EncodeFixed64(dst, nowTime); + dst += 8; + EncodeFixed64(dst, PackSequenceAndTypeAndTtlAndLookup(s, kValueTypeForSeek, 0, true)); dst += 8; end_ = dst; } diff --git a/db/dbformat.h b/db/dbformat.h index 93a4386..367322f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -98,9 +98,10 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result); inline Slice ExtractUserKey(const Slice& internal_key) { assert(internal_key.size() >= 8); uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8); - uint8_t havettl = (num & 0xff) >> 1; + uint8_t havettl = (num & 0b10) >> 1; + uint8_t islookup = (num & 0b100) >> 2; size_t klen = internal_key.size() - 8; - if(havettl) klen -= 8; + if(havettl || islookup) klen -= 8; Slice user_key = Slice(internal_key.data(), klen); return user_key; } @@ -179,14 +180,14 @@ inline int InternalKeyComparator::Compare(const InternalKey& a, inline bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result) { + //不确定需不需要标识islookup,先没改 const size_t n = internal_key.size(); if (n < 8) return false; uint64_t tag = DecodeFixed64(internal_key.data() + n - 8); uint8_t c = tag & 0xff; - uint8_t havettl = c >> 1; - assert(havettl <= 0b1); + uint8_t havettl = (c & 0b10) >> 1; result->sequence = tag >> 8; - result->type = static_cast(c); + result->type = static_cast(c & 0b1); if(havettl){ result->deadTime = DecodeFixed64(internal_key.data() + n - 16); result->user_key = Slice(internal_key.data(), n - 16); @@ -202,7 +203,7 @@ class LookupKey { public: // Initialize *this for looking up user_key at a snapshot with // the specified sequence number. - LookupKey(const Slice& user_key, SequenceNumber sequence); + LookupKey(const Slice& user_key, SequenceNumber sequence, uint64_t nowTime); LookupKey(const LookupKey&) = delete; LookupKey& operator=(const LookupKey&) = delete; @@ -216,14 +217,17 @@ class LookupKey { Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } // Return the user key - Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); } + Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 16); } private: // We construct a char array of the form: // klength varint32 <-- start_ // userkey char[klength] <-- kstart_ - // tag uint64 + // nowTime uint64 + // tag uint64 最后一个字节为0000 0101 // <-- end_ + // 同userkey下,原本(insert时)的比较器规则为seq优先,不考虑时间 + // 新增标识位(tag倒数第三位),使比较器考虑时间 // The array is a suitable MemTable key. // The suffix starting with "userkey" can be used as an InternalKey. const char* start_; diff --git a/db/memtable.cc b/db/memtable.cc index 4fd768a..e5c2d88 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -145,18 +145,18 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0x01)) { case kTypeValue: { - uint8_t havettl = (tag & 0xff) >> 1; - if(havettl){ - time_t nowTime; - time(&nowTime); - assert(nowTime > 0); - const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16); - if(static_cast(nowTime) >= deadTime){ //过期了 - std::cout << nowTime << "dead:" << deadTime << std::endl; - *s = Status::NotFound(Slice()); - return true; //todo:之前有没过期的key - } - } + // uint8_t havettl = (tag & 0xff) >> 1; + // if(havettl){ + // time_t nowTime; + // time(&nowTime); + // assert(nowTime > 0); + // const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16); + // if(static_cast(nowTime) >= deadTime){ //过期了 + // std::cout << nowTime << "dead:" << deadTime << std::endl; + // *s = Status::NotFound(Slice()); + // return true; //todo:之前有没过期的key + // } + // } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; diff --git a/test/ttl_mmtable_test.cc b/test/ttl_mmtable_test.cc index 8f2cd65..dccf9ed 100644 --- a/test/ttl_mmtable_test.cc +++ b/test/ttl_mmtable_test.cc @@ -3,12 +3,12 @@ #include "ctime" #include #include - +#include "gtest/gtest.h" using namespace leveldb; constexpr int value_size = 2048; -constexpr int data_size = 4096 << 1; +constexpr int data_size = 2048 << 2; Status OpenDB(std::string dbName, DB **db) { Options options; @@ -16,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) { return DB::Open(options, dbName, db); } -void InsertData(DB *db, uint64_t ttl/* second */) { +void InsertData(DB *db, uint64_t ttl/* second */, int vsize = 1/*插不同长度的value*/) { printf("-----inserting-----\n"); Status status; WriteOptions writeOptions; @@ -27,59 +27,87 @@ void InsertData(DB *db, uint64_t ttl/* second */) { //int key_ = rand() % key_num+1; int key_ = i+1; std::string key = std::to_string(key_); - std::string value(value_size, 'a'); + std::string value(vsize, 'a'); status = db->Put(writeOptions, key, value, ttl); assert(status.ok()); } } -void GetData(DB *db, int size = (1 << 30)) { +void GetData(DB *db, bool isTimeout) { + printf("-----seeking-----\n"); ReadOptions readOptions; + Status status; int key_num = data_size / value_size; - - // 点查 srand(static_cast(time(0))); - for (int i = 0; i < 100; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value; - db->Get(readOptions, key, &value); + for (int i = 0; i < key_num; i++) { + //int key_ = rand() % key_num+1; + int key_ = i+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + if(isTimeout) assert(status.IsNotFound()); + else{ + assert(status.ok()); + std::cout << value << std::endl; + } } } -int main(int argc, char** argv) { +void TimeOut() { DB *db; - if(OpenDB("testdb", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - - uint64_t ttl = 3; - - InsertData(db, ttl); - - printf("-----seeking-----\n"); - ReadOptions readOptions; - Status status; - int key_num = data_size / value_size; - srand(static_cast(time(0))); - for (int i = 0; i < key_num; i++) { - //int key_ = rand() % key_num+1; - int key_ = i+1; - std::string key = std::to_string(key_); - std::string value; - status = db->Get(readOptions, key, &value); - assert(status.ok()); - } - - Env::Default()->SleepForMicroseconds(ttl * 1000000); - - for (int i = 0; i < key_num; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value; - status = db->Get(readOptions, key, &value); - assert(status.IsNotFound()); - } - printf("success!\n"); + printf("-----opening-----\n"); + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 3; + + InsertData(db, ttl); + GetData(db, false); + + Env::Default()->SleepForMicroseconds(ttl * 1000000); + GetData(db, true); + + delete(db); + printf("-----closing-----\n"); + + // printf("-----recovery-----\n"); + // if(OpenDB("testdb", &db).ok() == false) { + // std::cerr << "open db failed" << std::endl; + // abort(); + // } + // GetData(db, true); + printf("success!\n"); +} + +void GetEarlierData() { +DB *db; + printf("-----opening-----\n"); + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl1 = 3; + uint64_t ttl2 = 5; + + InsertData(db, ttl2); + InsertData(db, ttl1, 2); + + //都没过期先找到后插的 + Env::Default()->SleepForMicroseconds(1 * 1000000); + GetData(db, false); + + //再找到前一次 + Env::Default()->SleepForMicroseconds(3 * 1000000); + GetData(db, false); + + delete(db); + printf("-----closing-----\n"); + printf("success!\n"); +} + +int main(int argc, char** argv) { + GetEarlierData(); }