From 22f704c537c4c21b4c9b195a680db07678fde463 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Mon, 21 Oct 2024 14:56:53 +0800 Subject: [PATCH 1/3] only focus on mmtable, still some bugs --- CMakeLists.txt | 4 ++ db/c.cc | 2 +- db/db_impl.cc | 15 +++++--- db/db_impl.h | 2 +- db/db_test.cc | 6 +-- db/dbformat.cc | 15 +++++++- db/dbformat.h | 37 ++++++++++++++----- db/dumpfile.cc | 2 +- db/memtable.cc | 43 ++++++++++++++++++++-- db/memtable.h | 2 +- db/write_batch.cc | 42 ++++++++++++++++++--- include/leveldb/db.h | 6 +-- include/leveldb/write_batch.h | 5 ++- test/ttl_mmtable_test.cc | 85 +++++++++++++++++++++++++++++++++++++++++++ test/ttl_test.cc | 42 ++++++++++----------- 15 files changed, 249 insertions(+), 59 deletions(-) create mode 100644 test/ttl_mmtable_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 54b14a1..705abce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -524,6 +524,10 @@ add_executable(db_test2 ) target_link_libraries(db_test2 PRIVATE leveldb) +add_executable(ttl_mmtable_test + "${PROJECT_SOURCE_DIR}/test/ttl_mmtable_test.cc" +) +target_link_libraries(ttl_mmtable_test PRIVATE leveldb) add_executable(ttl_test "${PROJECT_SOURCE_DIR}/test/ttl_test.cc" diff --git a/db/c.cc b/db/c.cc index 8bdde38..d8feb81 100644 --- a/db/c.cc +++ b/db/c.cc @@ -349,7 +349,7 @@ void leveldb_writebatch_iterate(const leveldb_writebatch_t* b, void* state, void* state_; void (*put_)(void*, const char* k, size_t klen, const char* v, size_t vlen); void (*deleted_)(void*, const char* k, size_t klen); - void Put(const Slice& key, const Slice& value) override { + void Put(const Slice& key, const Slice& value, uint64_t deadTime) override { (*put_)(state_, key.data(), key.size(), value.data(), value.size()); } void Delete(const Slice& key) override { diff --git a/db/db_impl.cc b/db/db_impl.cc index f96d245..3b9e38d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1145,10 +1145,13 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { + printf("in mem\n"); // Done } else if (imm != nullptr && imm->Get(lkey, value, &s)) { + printf("in immem\n"); // Done } else { + printf("in current\n"); s = current->Get(options, lkey, value, &stats); have_stat_update = true; } @@ -1194,8 +1197,9 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { } // Convenience methods -Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { - return DB::Put(o, key, val); +Status DBImpl::Put(const WriteOptions& o, const Slice& key, + const Slice& val, uint64_t ttl) { + return DB::Put(o, key, val, ttl); } Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { @@ -1232,7 +1236,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { // into mem_. { mutex_.Unlock(); - status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); + status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); //这里会有影响吗 bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); @@ -1485,9 +1489,10 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { // Default implementations of convenience methods that subclasses of DB // can call if they wish -Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { +Status DB::Put(const WriteOptions& opt, const Slice& key, + const Slice& value, uint64_t ttl) { WriteBatch batch; - batch.Put(key, value); + batch.Put(key, value, ttl); return Write(opt, &batch); } diff --git a/db/db_impl.h b/db/db_impl.h index c7b0172..ef81411 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -37,7 +37,7 @@ class DBImpl : public DB { // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, - const Slice& value) override; + const Slice& value, uint64_t ttl) override; Status Delete(const WriteOptions&, const Slice& key) override; Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Get(const ReadOptions& options, const Slice& key, diff --git a/db/db_test.cc b/db/db_test.cc index a4a84cd..4f570a1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2114,8 +2114,8 @@ class ModelDB : public DB { explicit ModelDB(const Options& options) : options_(options) {} ~ModelDB() override = default; - Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override { - return DB::Put(o, k, v); + Status Put(const WriteOptions& o, const Slice& k, const Slice& v, uint64_t ttl = 0) override { + return DB::Put(o, k, v, ttl); } Status Delete(const WriteOptions& o, const Slice& key) override { return DB::Delete(o, key); @@ -2149,7 +2149,7 @@ class ModelDB : public DB { class Handler : public WriteBatch::Handler { public: KVMap* map_; - void Put(const Slice& key, const Slice& value) override { + void Put(const Slice& key, const Slice& value, uint64_t deadTime = 0) override { (*map_)[key.ToString()] = value.ToString(); } void Delete(const Slice& key) override { map_->erase(key.ToString()); } diff --git a/db/dbformat.cc b/db/dbformat.cc index 2a5749f..8efd3e9 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -12,6 +12,12 @@ namespace leveldb { +static uint64_t PackSequenceAndTypeAndTtl(uint64_t seq, ValueType t, bool havettl) { + assert(seq <= kMaxSequenceNumber); + assert(t <= kValueTypeForSeek); + return (seq << 8) | (havettl << 1) | t; +} + static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { assert(seq <= kMaxSequenceNumber); assert(t <= kValueTypeForSeek); @@ -20,7 +26,10 @@ static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { result->append(key.user_key.data(), key.user_key.size()); - PutFixed64(result, PackSequenceAndType(key.sequence, key.type)); + if(key.deadTime != 0) + PutFixed64(result, key.deadTime); + PutFixed64(result, PackSequenceAndTypeAndTtl( + key.sequence, key.type, (key.deadTime != 0))); } std::string ParsedInternalKey::DebugString() const { @@ -49,6 +58,10 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) + Slice user_akey = ExtractUserKey(akey); + Slice user_bkey = ExtractUserKey(bkey); + std::string a = user_akey.ToString(); + std::string b = user_bkey.ToString(); int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8); diff --git a/db/dbformat.h b/db/dbformat.h index a1c30ed..93a4386 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -16,6 +16,7 @@ #include "leveldb/table_builder.h" #include "util/coding.h" #include "util/logging.h" +#include "iostream" namespace leveldb { @@ -69,17 +70,19 @@ static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1); struct ParsedInternalKey { Slice user_key; SequenceNumber sequence; + uint64_t deadTime; ValueType type; ParsedInternalKey() {} // Intentionally left uninitialized (for speed) - ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t) - : user_key(u), sequence(seq), type(t) {} + ParsedInternalKey(const Slice& u, const SequenceNumber& seq, + ValueType t, uint64_t d = 0) + : user_key(u), sequence(seq), type(t), deadTime(d) {} std::string DebugString() const; }; // Return the length of the encoding of "key". inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) { - return key.user_key.size() + 8; + return key.user_key.size() + 8 + (key.deadTime != 0) * 8; } // Append the serialization of "key" to *result. @@ -94,7 +97,12 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result); // Returns the user key portion of an internal key. inline Slice ExtractUserKey(const Slice& internal_key) { assert(internal_key.size() >= 8); - return Slice(internal_key.data(), internal_key.size() - 8); + uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8); + uint8_t havettl = (num & 0xff) >> 1; + size_t klen = internal_key.size() - 8; + if(havettl) klen -= 8; + Slice user_key = Slice(internal_key.data(), klen); + return user_key; } // A comparator for internal keys that uses a specified comparator for @@ -137,8 +145,9 @@ class InternalKey { public: InternalKey() {} // Leave rep_ as empty to indicate it is invalid - InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) { - AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t)); + InternalKey(const Slice& user_key, SequenceNumber s, + ValueType t, uint64_t deadTime = 0) { + AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t, deadTime)); } bool DecodeFrom(const Slice& s) { @@ -172,11 +181,19 @@ inline bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result) { const size_t n = internal_key.size(); if (n < 8) return false; - uint64_t num = DecodeFixed64(internal_key.data() + n - 8); - uint8_t c = num & 0xff; - result->sequence = num >> 8; + uint64_t tag = DecodeFixed64(internal_key.data() + n - 8); + uint8_t c = tag & 0xff; + uint8_t havettl = c >> 1; + assert(havettl <= 0b1); + result->sequence = tag >> 8; result->type = static_cast(c); - result->user_key = Slice(internal_key.data(), n - 8); + if(havettl){ + result->deadTime = DecodeFixed64(internal_key.data() + n - 16); + result->user_key = Slice(internal_key.data(), n - 16); + } else { + result->deadTime = 0; + result->user_key = Slice(internal_key.data(), n - 8); + } return (c <= static_cast(kTypeValue)); } diff --git a/db/dumpfile.cc b/db/dumpfile.cc index 6085475..0d4a24b 100644 --- a/db/dumpfile.cc +++ b/db/dumpfile.cc @@ -74,7 +74,7 @@ Status PrintLogContents(Env* env, const std::string& fname, // Called on every item found in a WriteBatch. class WriteBatchItemPrinter : public WriteBatch::Handler { public: - void Put(const Slice& key, const Slice& value) override { + void Put(const Slice& key, const Slice& value, uint64_t deadTime) override { std::string r = " put '"; AppendEscapedStringTo(&r, key); r += "' '"; diff --git a/db/memtable.cc b/db/memtable.cc index 4f09340..6563e8c 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -8,6 +8,8 @@ #include "leveldb/env.h" #include "leveldb/iterator.h" #include "util/coding.h" +#include "ctime" +#include "iostream" namespace leveldb { @@ -74,16 +76,24 @@ class MemTableIterator : public Iterator { Iterator* MemTable::NewIterator() { return new MemTableIterator(&table_); } void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, - const Slice& value) { + const Slice& value, uint64_t deadTime) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // tag : uint64((sequence << 8) | type) // value_size : varint32 of value.size() // value bytes : char[value.size()] + //变成 + // key_size : varint32 of internal_key.size() + // key bytes : char[internal_key.size()] + // (deadTime) : uint64(可能有) + // tag : uint64((sequence << 8) | havettl << 1 | type) + // value_size : varint32 of value.size() + // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; + if(deadTime != 0) internal_key_size += 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; @@ -91,22 +101,33 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, char* p = EncodeVarint32(buf, internal_key_size); std::memcpy(p, key.data(), key_size); p += key_size; - EncodeFixed64(p, (s << 8) | type); + + if(deadTime == 0){ + EncodeFixed64(p, (s << 8) | type); + } else { + EncodeFixed64(p, deadTime); + p += 8; + EncodeFixed64(p, (s << 8) | 0b10 | type); + } p += 8; + p = EncodeVarint32(p, val_size); std::memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); table_.Insert(buf); + std::cout << "insert:" << key.ToString() << std::endl; } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); + std::cout << "search:" << key.user_key().ToString() << " valid?" << iter.Valid(); if (iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] + // (deadTime) uint64 // tag uint64 // vlength varint32 // value char[vlength] @@ -116,12 +137,26 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { const char* entry = iter.key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + + std::cout << " get:" << ExtractUserKey(Slice(key_ptr, key_length)).ToString(); if (comparator_.comparator.user_comparator()->Compare( - Slice(key_ptr, key_length - 8), key.user_key()) == 0) { + ExtractUserKey(Slice(key_ptr, key_length)), key.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); - switch (static_cast(tag & 0xff)) { + switch (static_cast(tag & 0x01)) { case kTypeValue: { + uint8_t havettl = (tag & 0xff) >> 1; + if(havettl){ + time_t nowTime; + time(&nowTime); + assert(nowTime > 0); + const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16); + if(static_cast(nowTime) > deadTime){ //过期了 + std::cout << nowTime << "dead:" << deadTime << std::endl; + *s = Status::NotFound(Slice()); + return true; //todo:之前有没过期的key + } + } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; diff --git a/db/memtable.h b/db/memtable.h index 9d986b1..f5dbdcc 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -54,7 +54,7 @@ class MemTable { // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. void Add(SequenceNumber seq, ValueType type, const Slice& key, - const Slice& value); + const Slice& value, uint64_t deadTime = 0); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error diff --git a/db/write_batch.cc b/db/write_batch.cc index b54313c..89c5421 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -6,13 +6,20 @@ // sequence: fixed64 // count: fixed32 // data: record[count] -// record := +// 原来record := // kTypeValue varstring varstring | // kTypeDeletion varstring // varstring := // len: varint32 // data: uint8[len] +// 增加ttl后record := +// kTypeValue havettl (deadtime) varstring varstring | +// kTypeDeletion varstring +// varstring := +// len: varint32 +// data: uint8[len] + #include "leveldb/write_batch.h" #include "db/dbformat.h" @@ -20,6 +27,8 @@ #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "util/coding.h" +#include "ctime" +#include namespace leveldb { @@ -54,12 +63,22 @@ Status WriteBatch::Iterate(Handler* handler) const { input.remove_prefix(1); switch (tag) { case kTypeValue: + { + char havettl = input[0]; + input.remove_prefix(1); + uint64_t deadTime = 0; + if(havettl){ + deadTime = DecodeFixed64(input.data()); + input.remove_prefix(8); + } + if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->Put(key, value); + handler->Put(key, value, deadTime); } else { return Status::Corruption("bad WriteBatch Put"); } + } break; case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { @@ -95,9 +114,20 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -void WriteBatch::Put(const Slice& key, const Slice& value) { +void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); + + rep_.push_back(static_cast(ttl != 0)); //1:havettl + if(ttl != 0){ + time_t nowTime; + time(&nowTime); + assert(nowTime > 0); + assert(ttl > 0); + uint64_t deadTime = static_cast(nowTime) + ttl; + PutFixed64(&rep_, deadTime); + } + PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } @@ -118,12 +148,12 @@ class MemTableInserter : public WriteBatch::Handler { SequenceNumber sequence_; MemTable* mem_; - void Put(const Slice& key, const Slice& value) override { - mem_->Add(sequence_, kTypeValue, key, value); + void Put(const Slice& key, const Slice& value, uint64_t deadTime) override { + mem_->Add(sequence_, kTypeValue, key, value, deadTime); sequence_++; } void Delete(const Slice& key) override { - mem_->Add(sequence_, kTypeDeletion, key, Slice()); + mem_->Add(sequence_, kTypeDeletion, key, Slice(), 0); sequence_++; } }; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index bf4eec5..b60a2ad 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -63,8 +63,8 @@ class LEVELDB_EXPORT DB { // Set the database entry for "key" to "value". Returns OK on success, // and a non-OK status on error. // Note: consider setting options.sync = true. - virtual Status Put(const WriteOptions& options, const Slice& key, - const Slice& value) = 0; + // virtual Status Put(const WriteOptions& options, const Slice& key, + // const Slice& value) = 0; // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" @@ -149,7 +149,7 @@ class LEVELDB_EXPORT DB { // ----------------------------For TTL----------------------------- // 为当前key设置ttl,过期后自动失效 virtual Status Put(const WriteOptions& options, const Slice& key, - const Slice& value, uint64_t ttl) = 0; + const Slice& value, uint64_t ttl = 0) = 0; }; // Destroy the contents of the specified database. diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 94d4115..a0c99d5 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -25,6 +25,7 @@ #include "leveldb/export.h" #include "leveldb/status.h" +#include namespace leveldb { @@ -35,7 +36,7 @@ class LEVELDB_EXPORT WriteBatch { class LEVELDB_EXPORT Handler { public: virtual ~Handler(); - virtual void Put(const Slice& key, const Slice& value) = 0; + virtual void Put(const Slice& key, const Slice& value, uint64_t deadTime = 0) = 0; virtual void Delete(const Slice& key) = 0; }; @@ -48,7 +49,7 @@ class LEVELDB_EXPORT WriteBatch { ~WriteBatch(); // Store the mapping "key->value" in the database. - void Put(const Slice& key, const Slice& value); + void Put(const Slice& key, const Slice& value, uint64_t ttl = 0); // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); diff --git a/test/ttl_mmtable_test.cc b/test/ttl_mmtable_test.cc new file mode 100644 index 0000000..8f2cd65 --- /dev/null +++ b/test/ttl_mmtable_test.cc @@ -0,0 +1,85 @@ +#include "leveldb/env.h" +#include "leveldb/db.h" +#include "ctime" +#include +#include + + +using namespace leveldb; + +constexpr int value_size = 2048; +constexpr int data_size = 4096 << 1; + +Status OpenDB(std::string dbName, DB **db) { + Options options; + options.create_if_missing = true; + return DB::Open(options, dbName, db); +} + +void InsertData(DB *db, uint64_t ttl/* second */) { + printf("-----inserting-----\n"); + Status status; + WriteOptions writeOptions; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + + for (int i = 0; i < key_num; i++) { + //int key_ = rand() % key_num+1; + int key_ = i+1; + std::string key = std::to_string(key_); + std::string value(value_size, 'a'); + status = db->Put(writeOptions, key, value, ttl); + assert(status.ok()); + } +} + +void GetData(DB *db, int size = (1 << 30)) { + ReadOptions readOptions; + int key_num = data_size / value_size; + + // 点查 + srand(static_cast(time(0))); + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + db->Get(readOptions, key, &value); + } +} + +int main(int argc, char** argv) { +DB *db; + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 3; + + InsertData(db, ttl); + + printf("-----seeking-----\n"); + ReadOptions readOptions; + Status status; + int key_num = data_size / value_size; + srand(static_cast(time(0))); + for (int i = 0; i < key_num; i++) { + //int key_ = rand() % key_num+1; + int key_ = i+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + assert(status.ok()); + } + + Env::Default()->SleepForMicroseconds(ttl * 1000000); + + for (int i = 0; i < key_num; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + assert(status.IsNotFound()); + } + printf("success!\n"); +} diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 06f4cda..12cd583 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -78,33 +78,33 @@ TEST(TestTTL, ReadTTL) { } } -TEST(TestTTL, CompactionTTL) { - DB *db; +// TEST(TestTTL, CompactionTTL) { +// DB *db; - if(OpenDB("testdb", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } +// if(OpenDB("testdb", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } - uint64_t ttl = 20; - InsertData(db, ttl); +// uint64_t ttl = 20; +// InsertData(db, ttl); - leveldb::Range ranges[1]; - ranges[0] = leveldb::Range("-", "A"); - uint64_t sizes[1]; - db->GetApproximateSizes(ranges, 1, sizes); - ASSERT_GT(sizes[0], 0); +// leveldb::Range ranges[1]; +// ranges[0] = leveldb::Range("-", "A"); +// uint64_t sizes[1]; +// db->GetApproximateSizes(ranges, 1, sizes); +// ASSERT_GT(sizes[0], 0); - Env::Default()->SleepForMicroseconds(ttl * 1000000); +// Env::Default()->SleepForMicroseconds(ttl * 1000000); - db->CompactRange(nullptr, nullptr); +// db->CompactRange(nullptr, nullptr); - leveldb::Range ranges[1]; - ranges[0] = leveldb::Range("-", "A"); - uint64_t sizes[1]; - db->GetApproximateSizes(ranges, 1, sizes); - ASSERT_EQ(sizes[0], 0); -} +// leveldb::Range ranges[1]; +// ranges[0] = leveldb::Range("-", "A"); +// uint64_t sizes[1]; +// db->GetApproximateSizes(ranges, 1, sizes); +// ASSERT_EQ(sizes[0], 0); +// } int main(int argc, char** argv) { -- 2.8.3 From 885d1dfcb0d507fee1f72a2ad23f82e53a700431 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Tue, 22 Oct 2024 13:31:51 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E8=B7=91=E9=80=9A=E4=BA=86mmtable=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/dbformat.cc | 8 ++------ db/memtable.cc | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/db/dbformat.cc b/db/dbformat.cc index 8efd3e9..d868270 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -58,14 +58,10 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) - Slice user_akey = ExtractUserKey(akey); - Slice user_bkey = ExtractUserKey(bkey); - std::string a = user_akey.ToString(); - std::string b = user_bkey.ToString(); int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { - const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8); - const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8); + const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) | 0b10; + const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) | 0b10; if (anum > bnum) { r = -1; } else if (anum < bnum) { diff --git a/db/memtable.cc b/db/memtable.cc index 6563e8c..4fd768a 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -138,7 +138,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); - std::cout << " get:" << ExtractUserKey(Slice(key_ptr, key_length)).ToString(); + std::cout << " get:" << ExtractUserKey(Slice(key_ptr, key_length)).ToString() << std::endl; if (comparator_.comparator.user_comparator()->Compare( ExtractUserKey(Slice(key_ptr, key_length)), key.user_key()) == 0) { // Correct user key @@ -151,7 +151,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { time(&nowTime); assert(nowTime > 0); const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16); - if(static_cast(nowTime) > deadTime){ //过期了 + if(static_cast(nowTime) >= deadTime){ //过期了 std::cout << nowTime << "dead:" << deadTime << std::endl; *s = Status::NotFound(Slice()); return true; //todo:之前有没过期的key -- 2.8.3 From bbc97df611f0a66824ca1be4e2d006a41ce58584 Mon Sep 17 00:00:00 2001 From: augurier <14434658+augurier@user.noreply.gitee.com> Date: Fri, 25 Oct 2024 16:55:47 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=86lookupkey?= =?UTF-8?q?=E5=92=8C=E6=AF=94=E8=BE=83=E5=99=A8=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E8=83=BD=E5=9C=A8=E6=9C=80=E6=96=B0=E7=9A=84key=E8=BF=87?= =?UTF-8?q?=E6=9C=9F=E5=90=8E=E6=9F=A5=E4=B9=8B=E5=89=8D=E6=9C=89=E6=97=A0?= =?UTF-8?q?=E5=AD=98=E5=9C=A8=E7=9A=84key?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/db_impl.cc | 6 ++- db/dbformat.cc | 48 ++++++++++++++----- db/dbformat.h | 20 ++++---- db/memtable.cc | 24 +++++----- test/ttl_mmtable_test.cc | 120 +++++++++++++++++++++++++++++------------------ 5 files changed, 139 insertions(+), 79 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 3b9e38d..7eed3f8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1143,7 +1143,11 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, { mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). - LookupKey lkey(key, snapshot); + time_t nowTime; + time(&nowTime); + assert(nowTime > 0); + + LookupKey lkey(key, snapshot, nowTime); if (mem->Get(lkey, value, &s)) { printf("in mem\n"); // Done diff --git a/db/dbformat.cc b/db/dbformat.cc index d868270..686d04f 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -12,12 +12,14 @@ namespace leveldb { -static uint64_t PackSequenceAndTypeAndTtl(uint64_t seq, ValueType t, bool havettl) { +static uint64_t PackSequenceAndTypeAndTtlAndLookup( + uint64_t seq, ValueType t, bool havettl, bool islookup) { assert(seq <= kMaxSequenceNumber); assert(t <= kValueTypeForSeek); - return (seq << 8) | (havettl << 1) | t; + return (seq << 8) | (islookup << 2) | (havettl << 1) | t; } +//下面有两个调这个函数的没改,也许也要修改标志位? static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) { assert(seq <= kMaxSequenceNumber); assert(t <= kValueTypeForSeek); @@ -28,8 +30,8 @@ void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { result->append(key.user_key.data(), key.user_key.size()); if(key.deadTime != 0) PutFixed64(result, key.deadTime); - PutFixed64(result, PackSequenceAndTypeAndTtl( - key.sequence, key.type, (key.deadTime != 0))); + PutFixed64(result, PackSequenceAndTypeAndTtlAndLookup( + key.sequence, key.type, (key.deadTime != 0), false)); } std::string ParsedInternalKey::DebugString() const { @@ -58,13 +60,33 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { // increasing user key (according to user-supplied comparator) // decreasing sequence number // decreasing type (though sequence# should be enough to disambiguate) + //目前看调用时都是a为node, b为key,万一有不是的,逻辑还得补充 + //for debug + // std::string a = ExtractUserKey(akey).ToString(); + // std::string b = ExtractUserKey(bkey).ToString(); int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey)); if (r == 0) { - const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) | 0b10; - const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) | 0b10; - if (anum > bnum) { + const uint64_t atag = DecodeFixed64(akey.data() + akey.size() - 8); + const uint64_t btag = DecodeFixed64(bkey.data() + bkey.size() - 8); + + const uint64_t aseq = atag >> 8; + const uint64_t bseq = btag >> 8; + if (aseq > bseq) { r = -1; - } else if (anum < bnum) { + return r; + } + + //原本应该找到了,新加判断 + if((btag & 0b100) && (atag & 0b10)){ //一个是查询键,另一个有ttl + const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16); + const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16); + if(atime <= btime){//过期了继续找 + r = -1; + return r; + } + } + + if (aseq < bseq) { r = +1; } } @@ -123,9 +145,9 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { return user_policy_->KeyMayMatch(ExtractUserKey(key), f); } -LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { +LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, uint64_t nowTime) { size_t usize = user_key.size(); - size_t needed = usize + 13; // A conservative estimate + size_t needed = usize + 21; // A conservative estimate char* dst; if (needed <= sizeof(space_)) { dst = space_; @@ -133,11 +155,13 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { dst = new char[needed]; } start_ = dst; - dst = EncodeVarint32(dst, usize + 8); + dst = EncodeVarint32(dst, usize + 16); kstart_ = dst; std::memcpy(dst, user_key.data(), usize); dst += usize; - EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); + EncodeFixed64(dst, nowTime); + dst += 8; + EncodeFixed64(dst, PackSequenceAndTypeAndTtlAndLookup(s, kValueTypeForSeek, 0, true)); dst += 8; end_ = dst; } diff --git a/db/dbformat.h b/db/dbformat.h index 93a4386..367322f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -98,9 +98,10 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result); inline Slice ExtractUserKey(const Slice& internal_key) { assert(internal_key.size() >= 8); uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8); - uint8_t havettl = (num & 0xff) >> 1; + uint8_t havettl = (num & 0b10) >> 1; + uint8_t islookup = (num & 0b100) >> 2; size_t klen = internal_key.size() - 8; - if(havettl) klen -= 8; + if(havettl || islookup) klen -= 8; Slice user_key = Slice(internal_key.data(), klen); return user_key; } @@ -179,14 +180,14 @@ inline int InternalKeyComparator::Compare(const InternalKey& a, inline bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result) { + //不确定需不需要标识islookup,先没改 const size_t n = internal_key.size(); if (n < 8) return false; uint64_t tag = DecodeFixed64(internal_key.data() + n - 8); uint8_t c = tag & 0xff; - uint8_t havettl = c >> 1; - assert(havettl <= 0b1); + uint8_t havettl = (c & 0b10) >> 1; result->sequence = tag >> 8; - result->type = static_cast(c); + result->type = static_cast(c & 0b1); if(havettl){ result->deadTime = DecodeFixed64(internal_key.data() + n - 16); result->user_key = Slice(internal_key.data(), n - 16); @@ -202,7 +203,7 @@ class LookupKey { public: // Initialize *this for looking up user_key at a snapshot with // the specified sequence number. - LookupKey(const Slice& user_key, SequenceNumber sequence); + LookupKey(const Slice& user_key, SequenceNumber sequence, uint64_t nowTime); LookupKey(const LookupKey&) = delete; LookupKey& operator=(const LookupKey&) = delete; @@ -216,14 +217,17 @@ class LookupKey { Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } // Return the user key - Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); } + Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 16); } private: // We construct a char array of the form: // klength varint32 <-- start_ // userkey char[klength] <-- kstart_ - // tag uint64 + // nowTime uint64 + // tag uint64 最后一个字节为0000 0101 // <-- end_ + // 同userkey下,原本(insert时)的比较器规则为seq优先,不考虑时间 + // 新增标识位(tag倒数第三位),使比较器考虑时间 // The array is a suitable MemTable key. // The suffix starting with "userkey" can be used as an InternalKey. const char* start_; diff --git a/db/memtable.cc b/db/memtable.cc index 4fd768a..e5c2d88 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -145,18 +145,18 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0x01)) { case kTypeValue: { - uint8_t havettl = (tag & 0xff) >> 1; - if(havettl){ - time_t nowTime; - time(&nowTime); - assert(nowTime > 0); - const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16); - if(static_cast(nowTime) >= deadTime){ //过期了 - std::cout << nowTime << "dead:" << deadTime << std::endl; - *s = Status::NotFound(Slice()); - return true; //todo:之前有没过期的key - } - } + // uint8_t havettl = (tag & 0xff) >> 1; + // if(havettl){ + // time_t nowTime; + // time(&nowTime); + // assert(nowTime > 0); + // const uint64_t deadTime = DecodeFixed64(key_ptr + key_length - 16); + // if(static_cast(nowTime) >= deadTime){ //过期了 + // std::cout << nowTime << "dead:" << deadTime << std::endl; + // *s = Status::NotFound(Slice()); + // return true; //todo:之前有没过期的key + // } + // } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; diff --git a/test/ttl_mmtable_test.cc b/test/ttl_mmtable_test.cc index 8f2cd65..dccf9ed 100644 --- a/test/ttl_mmtable_test.cc +++ b/test/ttl_mmtable_test.cc @@ -3,12 +3,12 @@ #include "ctime" #include #include - +#include "gtest/gtest.h" using namespace leveldb; constexpr int value_size = 2048; -constexpr int data_size = 4096 << 1; +constexpr int data_size = 2048 << 2; Status OpenDB(std::string dbName, DB **db) { Options options; @@ -16,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) { return DB::Open(options, dbName, db); } -void InsertData(DB *db, uint64_t ttl/* second */) { +void InsertData(DB *db, uint64_t ttl/* second */, int vsize = 1/*插不同长度的value*/) { printf("-----inserting-----\n"); Status status; WriteOptions writeOptions; @@ -27,59 +27,87 @@ void InsertData(DB *db, uint64_t ttl/* second */) { //int key_ = rand() % key_num+1; int key_ = i+1; std::string key = std::to_string(key_); - std::string value(value_size, 'a'); + std::string value(vsize, 'a'); status = db->Put(writeOptions, key, value, ttl); assert(status.ok()); } } -void GetData(DB *db, int size = (1 << 30)) { +void GetData(DB *db, bool isTimeout) { + printf("-----seeking-----\n"); ReadOptions readOptions; + Status status; int key_num = data_size / value_size; - - // 点查 srand(static_cast(time(0))); - for (int i = 0; i < 100; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value; - db->Get(readOptions, key, &value); + for (int i = 0; i < key_num; i++) { + //int key_ = rand() % key_num+1; + int key_ = i+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + if(isTimeout) assert(status.IsNotFound()); + else{ + assert(status.ok()); + std::cout << value << std::endl; + } } } -int main(int argc, char** argv) { +void TimeOut() { DB *db; - if(OpenDB("testdb", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - - uint64_t ttl = 3; - - InsertData(db, ttl); - - printf("-----seeking-----\n"); - ReadOptions readOptions; - Status status; - int key_num = data_size / value_size; - srand(static_cast(time(0))); - for (int i = 0; i < key_num; i++) { - //int key_ = rand() % key_num+1; - int key_ = i+1; - std::string key = std::to_string(key_); - std::string value; - status = db->Get(readOptions, key, &value); - assert(status.ok()); - } - - Env::Default()->SleepForMicroseconds(ttl * 1000000); - - for (int i = 0; i < key_num; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value; - status = db->Get(readOptions, key, &value); - assert(status.IsNotFound()); - } - printf("success!\n"); + printf("-----opening-----\n"); + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 3; + + InsertData(db, ttl); + GetData(db, false); + + Env::Default()->SleepForMicroseconds(ttl * 1000000); + GetData(db, true); + + delete(db); + printf("-----closing-----\n"); + + // printf("-----recovery-----\n"); + // if(OpenDB("testdb", &db).ok() == false) { + // std::cerr << "open db failed" << std::endl; + // abort(); + // } + // GetData(db, true); + printf("success!\n"); +} + +void GetEarlierData() { +DB *db; + printf("-----opening-----\n"); + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl1 = 3; + uint64_t ttl2 = 5; + + InsertData(db, ttl2); + InsertData(db, ttl1, 2); + + //都没过期先找到后插的 + Env::Default()->SleepForMicroseconds(1 * 1000000); + GetData(db, false); + + //再找到前一次 + Env::Default()->SleepForMicroseconds(3 * 1000000); + GetData(db, false); + + delete(db); + printf("-----closing-----\n"); + printf("success!\n"); +} + +int main(int argc, char** argv) { + GetEarlierData(); } -- 2.8.3