Kaynağa Gözat

Merge commit '11e70db68095ee11b35310db3912f56802a74343' into cyq

pull/1/head
cyq 3 hafta önce
ebeveyn
işleme
6664512419
5 değiştirilmiş dosya ile 140 ekleme ve 84 silme
  1. +5
    -1
      db/db_impl.cc
  2. +36
    -16
      db/dbformat.cc
  3. +12
    -8
      db/dbformat.h
  4. +13
    -13
      db/memtable.cc
  5. +74
    -46
      test/ttl_mmtable_test.cc

+ 5
- 1
db/db_impl.cc Dosyayı Görüntüle

@ -1143,7 +1143,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
time_t nowTime;
time(&nowTime);
assert(nowTime > 0);
LookupKey lkey(key, snapshot, nowTime);
if (mem->Get(lkey, value, &s)) {
printf("in mem\n");
// Done

+ 36
- 16
db/dbformat.cc Dosyayı Görüntüle

@ -12,12 +12,14 @@
namespace leveldb {
static uint64_t PackSequenceAndTypeAndTtl(uint64_t seq, ValueType t, bool havettl) {
static uint64_t PackSequenceAndTypeAndTtlAndLookup(
uint64_t seq, ValueType t, bool havettl, bool islookup) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
return (seq << 8) | (havettl << 1) | t;
return (seq << 8) | (islookup << 2) | (havettl << 1) | t;
}
//下面有两个调这个函数的没改,也许也要修改标志位?
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
@ -28,8 +30,8 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
if(key.deadTime != 0)
PutFixed64(result, key.deadTime);
PutFixed64(result, PackSequenceAndTypeAndTtl(
key.sequence, key.type, (key.deadTime != 0)));
PutFixed64(result, PackSequenceAndTypeAndTtlAndLookup(
key.sequence, key.type, (key.deadTime != 0), false));
}
std::string ParsedInternalKey::DebugString() const {
@ -58,17 +60,33 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
Slice user_akey = ExtractUserKey(akey);
Slice user_bkey = ExtractUserKey(bkey);
std::string a = user_akey.ToString();
std::string b = user_bkey.ToString();
//目前看调用时都是a为node, b为key,万一有不是的,逻辑还得补充
//for debug
// std::string a = ExtractUserKey(akey).ToString();
// std::string b = ExtractUserKey(bkey).ToString();
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
const uint64_t atag = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t btag = DecodeFixed64(bkey.data() + bkey.size() - 8);
const uint64_t aseq = atag >> 8;
const uint64_t bseq = btag >> 8;
if (aseq > bseq) {
r = -1;
} else if (anum < bnum) {
return r;
}
//原本应该找到了,新加判断
if((btag & 0b100) && (atag & 0b10)){ //一个是查询键,另一个有ttl
const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16);
const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16);
if(atime <= btime){//过期了继续找
r = -1;
return r;
}
}
if (aseq < bseq) {
r = +1;
}
}
@ -127,9 +145,9 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
}
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, uint64_t nowTime) {
size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
size_t needed = usize + 21; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
@ -137,11 +155,13 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
dst = EncodeVarint32(dst, usize + 16);
kstart_ = dst;
std::memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
EncodeFixed64(dst, nowTime);
dst += 8;
EncodeFixed64(dst, PackSequenceAndTypeAndTtlAndLookup(s, kValueTypeForSeek, 0, true));
dst += 8;
end_ = dst;
}

+ 12
- 8
db/dbformat.h Dosyayı Görüntüle

@ -98,9 +98,10 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result);
inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8);
uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8);
uint8_t havettl = (num & 0xff) >> 1;
uint8_t havettl = (num & 0b10) >> 1;
uint8_t islookup = (num & 0b100) >> 2;
size_t klen = internal_key.size() - 8;
if(havettl) klen -= 8;
if(havettl || islookup) klen -= 8;
Slice user_key = Slice(internal_key.data(), klen);
return user_key;
}
@ -179,14 +180,14 @@ inline int InternalKeyComparator::Compare(const InternalKey& a,
inline bool ParseInternalKey(const Slice& internal_key,
ParsedInternalKey* result) {
//islookup
const size_t n = internal_key.size();
if (n < 8) return false;
uint64_t tag = DecodeFixed64(internal_key.data() + n - 8);
uint8_t c = tag & 0xff;
uint8_t havettl = c >> 1;
assert(havettl <= 0b1);
uint8_t havettl = (c & 0b10) >> 1;
result->sequence = tag >> 8;
result->type = static_cast<ValueType>(c);
result->type = static_cast<ValueType>(c & 0b1);
if(havettl){
result->deadTime = DecodeFixed64(internal_key.data() + n - 16);
result->user_key = Slice(internal_key.data(), n - 16);
@ -202,7 +203,7 @@ class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& user_key, SequenceNumber sequence);
LookupKey(const Slice& user_key, SequenceNumber sequence, uint64_t nowTime);
LookupKey(const LookupKey&) = delete;
LookupKey& operator=(const LookupKey&) = delete;
@ -216,14 +217,17 @@ class LookupKey {
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 16); }
private:
// We construct a char array of the form:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// nowTime uint64
// tag uint64 0000 0101
// <-- end_
// userkey下insert时seq优先
// tag倒数第三位使
// The array is a suitable MemTable key.
// The suffix starting with "userkey" can be used as an InternalKey.
const char* start_;

+ 13
- 13
db/memtable.cc Dosyayı Görüntüle

@ -138,25 +138,25 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
std::cout << " get:" << ExtractUserKey(Slice(key_ptr, key_length)).ToString();
std::cout << " get:" << ExtractUserKey(Slice(key_ptr, key_length)).ToString() << std::endl;
if (comparator_.comparator.user_comparator()->Compare(
ExtractUserKey(Slice(key_ptr, key_length)), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0x01)) {
case kTypeValue: {
uint8_t havettl = (tag & 0xff) >> 1;
if(havettl){
time_t nowTime;
time(&nowTime);
assert(nowTime > 0);
const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16);
if(static_cast<uint64_t>(nowTime) > deadTime){ //过期了
std::cout << nowTime << "dead:" << deadTime << std::endl;
*s = Status::NotFound(Slice());
return true; //todo:之前有没过期的key
}
}
// uint8_t havettl = (tag & 0xff) >> 1;
// if(havettl){
// time_t nowTime;
// time(&nowTime);
// assert(nowTime > 0);
// const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16);
// if(static_cast<uint64_t>(nowTime) >= deadTime){ //过期了
// std::cout << nowTime << "dead:" << deadTime << std::endl;
// *s = Status::NotFound(Slice());
// return true; //todo:之前有没过期的key
// }
// }
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;

+ 74
- 46
test/ttl_mmtable_test.cc Dosyayı Görüntüle

@ -3,12 +3,12 @@
#include "ctime"
#include <iostream>
#include <cstdlib>
#include "gtest/gtest.h"
using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 4096 << 1;
constexpr int data_size = 2048 << 2;
Status OpenDB(std::string dbName, DB **db) {
Options options;
@ -16,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) {
return DB::Open(options, dbName, db);
}
void InsertData(DB *db, uint64_t ttl/* second */) {
void InsertData(DB *db, uint64_t ttl/* second */, int vsize = 1/*插不同长度的value*/) {
printf("-----inserting-----\n");
Status status;
WriteOptions writeOptions;
@ -27,59 +27,87 @@ void InsertData(DB *db, uint64_t ttl/* second */) {
//int key_ = rand() % key_num+1;
int key_ = i+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
std::string value(vsize, 'a');
status = db->Put(writeOptions, key, value, ttl);
assert(status.ok());
}
}
void GetData(DB *db, int size = (1 << 30)) {
void GetData(DB *db, bool isTimeout) {
printf("-----seeking-----\n");
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
db->Get(readOptions, key, &value);
for (int i = 0; i < key_num; i++) {
//int key_ = rand() % key_num+1;
int key_ = i+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
if(isTimeout) assert(status.IsNotFound());
else{
assert(status.ok());
std::cout << value << std::endl;
}
}
}
int main(int argc, char** argv) {
void TimeOut() {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 3;
InsertData(db, ttl);
printf("-----seeking-----\n");
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < key_num; i++) {
//int key_ = rand() % key_num+1;
int key_ = i+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
assert(status.ok());
}
Env::Default()->SleepForMicroseconds(ttl * 1000000);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
assert(status.IsNotFound());
}
printf("success!\n");
printf("-----opening-----\n");
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 3;
InsertData(db, ttl);
GetData(db, false);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
GetData(db, true);
delete(db);
printf("-----closing-----\n");
// printf("-----recovery-----\n");
// if(OpenDB("testdb", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
// GetData(db, true);
printf("success!\n");
}
void GetEarlierData() {
DB *db;
printf("-----opening-----\n");
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl1 = 3;
uint64_t ttl2 = 5;
InsertData(db, ttl2);
InsertData(db, ttl1, 2);
//都没过期先找到后插的
Env::Default()->SleepForMicroseconds(1 * 1000000);
GetData(db, false);
//再找到前一次
Env::Default()->SleepForMicroseconds(3 * 1000000);
GetData(db, false);
delete(db);
printf("-----closing-----\n");
printf("success!\n");
}
int main(int argc, char** argv) {
GetEarlierData();
}

Yükleniyor…
İptal
Kaydet