Browse Source

修改了lookupkey和比较器逻辑,能在最新的key过期后查之前有无存在的key

pull/2/head
augurier 1 year ago
parent
commit
bbc97df611
5 changed files with 139 additions and 79 deletions
  1. +5
    -1
      db/db_impl.cc
  2. +36
    -12
      db/dbformat.cc
  3. +12
    -8
      db/dbformat.h
  4. +12
    -12
      db/memtable.cc
  5. +74
    -46
      test/ttl_mmtable_test.cc

+ 5
- 1
db/db_impl.cc View File

@ -1143,7 +1143,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
{ {
mutex_.Unlock(); mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any). // First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
time_t nowTime;
time(&nowTime);
assert(nowTime > 0);
LookupKey lkey(key, snapshot, nowTime);
if (mem->Get(lkey, value, &s)) { if (mem->Get(lkey, value, &s)) {
printf("in mem\n"); printf("in mem\n");
// Done // Done

+ 36
- 12
db/dbformat.cc View File

@ -12,12 +12,14 @@
namespace leveldb { namespace leveldb {
static uint64_t PackSequenceAndTypeAndTtl(uint64_t seq, ValueType t, bool havettl) {
static uint64_t PackSequenceAndTypeAndTtlAndLookup(
uint64_t seq, ValueType t, bool havettl, bool islookup) {
assert(seq <= kMaxSequenceNumber); assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek); assert(t <= kValueTypeForSeek);
return (seq << 8) | (havettl << 1) | t;
return (seq << 8) | (islookup << 2) | (havettl << 1) | t;
} }
//下面有两个调这个函数的没改,也许也要修改标志位?
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber); assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek); assert(t <= kValueTypeForSeek);
@ -28,8 +30,8 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size()); result->append(key.user_key.data(), key.user_key.size());
if(key.deadTime != 0) if(key.deadTime != 0)
PutFixed64(result, key.deadTime); PutFixed64(result, key.deadTime);
PutFixed64(result, PackSequenceAndTypeAndTtl(
key.sequence, key.type, (key.deadTime != 0)));
PutFixed64(result, PackSequenceAndTypeAndTtlAndLookup(
key.sequence, key.type, (key.deadTime != 0), false));
} }
std::string ParsedInternalKey::DebugString() const { std::string ParsedInternalKey::DebugString() const {
@ -58,13 +60,33 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// increasing user key (according to user-supplied comparator) // increasing user key (according to user-supplied comparator)
// decreasing sequence number // decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate) // decreasing type (though sequence# should be enough to disambiguate)
//目前看调用时都是a为node, b为key,万一有不是的,逻辑还得补充
//for debug
// std::string a = ExtractUserKey(akey).ToString();
// std::string b = ExtractUserKey(bkey).ToString();
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) { if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) | 0b10;
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) | 0b10;
if (anum > bnum) {
const uint64_t atag = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t btag = DecodeFixed64(bkey.data() + bkey.size() - 8);
const uint64_t aseq = atag >> 8;
const uint64_t bseq = btag >> 8;
if (aseq > bseq) {
r = -1; r = -1;
} else if (anum < bnum) {
return r;
}
//原本应该找到了,新加判断
if((btag & 0b100) && (atag & 0b10)){ //一个是查询键,另一个有ttl
const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16);
const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16);
if(atime <= btime){//过期了继续找
r = -1;
return r;
}
}
if (aseq < bseq) {
r = +1; r = +1;
} }
} }
@ -123,9 +145,9 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
return user_policy_->KeyMayMatch(ExtractUserKey(key), f); return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
} }
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, uint64_t nowTime) {
size_t usize = user_key.size(); size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
size_t needed = usize + 21; // A conservative estimate
char* dst; char* dst;
if (needed <= sizeof(space_)) { if (needed <= sizeof(space_)) {
dst = space_; dst = space_;
@ -133,11 +155,13 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
dst = new char[needed]; dst = new char[needed];
} }
start_ = dst; start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
dst = EncodeVarint32(dst, usize + 16);
kstart_ = dst; kstart_ = dst;
std::memcpy(dst, user_key.data(), usize); std::memcpy(dst, user_key.data(), usize);
dst += usize; dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
EncodeFixed64(dst, nowTime);
dst += 8;
EncodeFixed64(dst, PackSequenceAndTypeAndTtlAndLookup(s, kValueTypeForSeek, 0, true));
dst += 8; dst += 8;
end_ = dst; end_ = dst;
} }

+ 12
- 8
db/dbformat.h View File

@ -98,9 +98,10 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result);
inline Slice ExtractUserKey(const Slice& internal_key) { inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8); assert(internal_key.size() >= 8);
uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8); uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8);
uint8_t havettl = (num & 0xff) >> 1;
uint8_t havettl = (num & 0b10) >> 1;
uint8_t islookup = (num & 0b100) >> 2;
size_t klen = internal_key.size() - 8; size_t klen = internal_key.size() - 8;
if(havettl) klen -= 8;
if(havettl || islookup) klen -= 8;
Slice user_key = Slice(internal_key.data(), klen); Slice user_key = Slice(internal_key.data(), klen);
return user_key; return user_key;
} }
@ -179,14 +180,14 @@ inline int InternalKeyComparator::Compare(const InternalKey& a,
inline bool ParseInternalKey(const Slice& internal_key, inline bool ParseInternalKey(const Slice& internal_key,
ParsedInternalKey* result) { ParsedInternalKey* result) {
//islookup
const size_t n = internal_key.size(); const size_t n = internal_key.size();
if (n < 8) return false; if (n < 8) return false;
uint64_t tag = DecodeFixed64(internal_key.data() + n - 8); uint64_t tag = DecodeFixed64(internal_key.data() + n - 8);
uint8_t c = tag & 0xff; uint8_t c = tag & 0xff;
uint8_t havettl = c >> 1;
assert(havettl <= 0b1);
uint8_t havettl = (c & 0b10) >> 1;
result->sequence = tag >> 8; result->sequence = tag >> 8;
result->type = static_cast<ValueType>(c);
result->type = static_cast<ValueType>(c & 0b1);
if(havettl){ if(havettl){
result->deadTime = DecodeFixed64(internal_key.data() + n - 16); result->deadTime = DecodeFixed64(internal_key.data() + n - 16);
result->user_key = Slice(internal_key.data(), n - 16); result->user_key = Slice(internal_key.data(), n - 16);
@ -202,7 +203,7 @@ class LookupKey {
public: public:
// Initialize *this for looking up user_key at a snapshot with // Initialize *this for looking up user_key at a snapshot with
// the specified sequence number. // the specified sequence number.
LookupKey(const Slice& user_key, SequenceNumber sequence);
LookupKey(const Slice& user_key, SequenceNumber sequence, uint64_t nowTime);
LookupKey(const LookupKey&) = delete; LookupKey(const LookupKey&) = delete;
LookupKey& operator=(const LookupKey&) = delete; LookupKey& operator=(const LookupKey&) = delete;
@ -216,14 +217,17 @@ class LookupKey {
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
// Return the user key // Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 16); }
private: private:
// We construct a char array of the form: // We construct a char array of the form:
// klength varint32 <-- start_ // klength varint32 <-- start_
// userkey char[klength] <-- kstart_ // userkey char[klength] <-- kstart_
// tag uint64
// nowTime uint64
// tag uint64 0000 0101
// <-- end_ // <-- end_
// userkey下insert时seq优先
// tag倒数第三位使
// The array is a suitable MemTable key. // The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey. // The suffix starting with "userkey" can be used as an InternalKey.
const char* start_; const char* start_;

+ 12
- 12
db/memtable.cc View File

@ -145,18 +145,18 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0x01)) { switch (static_cast<ValueType>(tag & 0x01)) {
case kTypeValue: { case kTypeValue: {
uint8_t havettl = (tag & 0xff) >> 1;
if(havettl){
time_t nowTime;
time(&nowTime);
assert(nowTime > 0);
const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16);
if(static_cast<uint64_t>(nowTime) >= deadTime){ //过期了
std::cout << nowTime << "dead:" << deadTime << std::endl;
*s = Status::NotFound(Slice());
return true; //todo:之前有没过期的key
}
}
// uint8_t havettl = (tag & 0xff) >> 1;
// if(havettl){
// time_t nowTime;
// time(&nowTime);
// assert(nowTime > 0);
// const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16);
// if(static_cast<uint64_t>(nowTime) >= deadTime){ //过期了
// std::cout << nowTime << "dead:" << deadTime << std::endl;
// *s = Status::NotFound(Slice());
// return true; //todo:之前有没过期的key
// }
// }
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size()); value->assign(v.data(), v.size());
return true; return true;

+ 74
- 46
test/ttl_mmtable_test.cc View File

@ -3,12 +3,12 @@
#include "ctime" #include "ctime"
#include <iostream> #include <iostream>
#include <cstdlib> #include <cstdlib>
#include "gtest/gtest.h"
using namespace leveldb; using namespace leveldb;
constexpr int value_size = 2048; constexpr int value_size = 2048;
constexpr int data_size = 4096 << 1;
constexpr int data_size = 2048 << 2;
Status OpenDB(std::string dbName, DB **db) { Status OpenDB(std::string dbName, DB **db) {
Options options; Options options;
@ -16,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) {
return DB::Open(options, dbName, db); return DB::Open(options, dbName, db);
} }
void InsertData(DB *db, uint64_t ttl/* second */) {
void InsertData(DB *db, uint64_t ttl/* second */, int vsize = 1/*插不同长度的value*/) {
printf("-----inserting-----\n"); printf("-----inserting-----\n");
Status status; Status status;
WriteOptions writeOptions; WriteOptions writeOptions;
@ -27,59 +27,87 @@ void InsertData(DB *db, uint64_t ttl/* second */) {
//int key_ = rand() % key_num+1; //int key_ = rand() % key_num+1;
int key_ = i+1; int key_ = i+1;
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
std::string value(value_size, 'a');
std::string value(vsize, 'a');
status = db->Put(writeOptions, key, value, ttl); status = db->Put(writeOptions, key, value, ttl);
assert(status.ok()); assert(status.ok());
} }
} }
void GetData(DB *db, int size = (1 << 30)) {
void GetData(DB *db, bool isTimeout) {
printf("-----seeking-----\n");
ReadOptions readOptions; ReadOptions readOptions;
Status status;
int key_num = data_size / value_size; int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0))); srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
db->Get(readOptions, key, &value);
for (int i = 0; i < key_num; i++) {
//int key_ = rand() % key_num+1;
int key_ = i+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
if(isTimeout) assert(status.IsNotFound());
else{
assert(status.ok());
std::cout << value << std::endl;
}
} }
} }
int main(int argc, char** argv) {
void TimeOut() {
DB *db; DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 3;
InsertData(db, ttl);
printf("-----seeking-----\n");
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < key_num; i++) {
//int key_ = rand() % key_num+1;
int key_ = i+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
assert(status.ok());
}
Env::Default()->SleepForMicroseconds(ttl * 1000000);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
assert(status.IsNotFound());
}
printf("success!\n");
printf("-----opening-----\n");
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 3;
InsertData(db, ttl);
GetData(db, false);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
GetData(db, true);
delete(db);
printf("-----closing-----\n");
// printf("-----recovery-----\n");
// if(OpenDB("testdb", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// GetData(db, true);
printf("success!\n");
}
void GetEarlierData() {
DB *db;
printf("-----opening-----\n");
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl1 = 3;
uint64_t ttl2 = 5;
InsertData(db, ttl2);
InsertData(db, ttl1, 2);
//都没过期先找到后插的
Env::Default()->SleepForMicroseconds(1 * 1000000);
GetData(db, false);
//再找到前一次
Env::Default()->SleepForMicroseconds(3 * 1000000);
GetData(db, false);
delete(db);
printf("-----closing-----\n");
printf("success!\n");
}
int main(int argc, char** argv) {
GetEarlierData();
} }

Loading…
Cancel
Save