From 3fe6636b2e50cad9782007d29a12aeee7bdc5c11 Mon Sep 17 00:00:00 2001 From: alexfisher <1823748191@qq.com> Date: Mon, 28 Oct 2024 20:26:52 +0800 Subject: [PATCH] complete ttl function for memtable read and write --- db/db_impl.cc | 17 ++++++- db/db_impl.h | 4 ++ db/db_test.cc | 3 ++ db/dbformat.cc | 6 ++- db/dbformat.h | 22 ++++++++- db/memtable.cc | 24 +++++++-- db/memtable.h | 3 +- db/write_batch.cc | 18 ++++++- include/leveldb/db.h | 1 + include/leveldb/write_batch.h | 5 +- test/ttl_test.cc | 110 ++++++++++++++++++++++-------------------- 11 files changed, 150 insertions(+), 63 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index f96d245..5897ee7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1198,6 +1198,10 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } +Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val,uint64_t ttl) { + return DB::Put(o, key, val,ttl); +} + Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -1487,7 +1491,18 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Put(key, value); + batch.Put_for_ttl(key, value,INT32_MAX); + return Write(opt, &batch); +} + + +Status DB::Put(const WriteOptions& opt, const Slice& key, + const Slice& value, uint64_t ttl) { + WriteBatch batch; + // 获取当前时间的时间戳 + time_t now = time(nullptr);//ttl单位为秒 + ttl+=static_cast(now); + batch.Put_for_ttl(key, value,ttl); return Write(opt, &batch); } diff --git a/db/db_impl.h b/db/db_impl.h index c7b0172..6a86427 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -38,6 +38,10 @@ class DBImpl : public DB { // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, const Slice& value) override; + + Status Put(const WriteOptions& opt, const Slice& key, + const Slice& value, uint64_t ttl) override; + Status Delete(const WriteOptions&, const Slice& key) override; Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Get(const ReadOptions& options, const Slice& key, diff --git a/db/db_test.cc b/db/db_test.cc index a4a84cd..4a05522 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2117,6 +2117,9 @@ class ModelDB : public DB { Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override { return DB::Put(o, k, v); } + Status Put(const WriteOptions& o, const Slice& k, const Slice& v,uint64_t ttl) override { + return DB::Put(o, k, v,ttl); + } Status Delete(const WriteOptions& o, const Slice& key) override { return DB::Delete(o, key); } diff --git a/db/dbformat.cc b/db/dbformat.cc index 2a5749f..20caea9 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -116,7 +116,7 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const { LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { size_t usize = user_key.size(); - size_t needed = usize + 13; // A conservative estimate + size_t needed = usize + 30; // A conservative estimate char* dst; if (needed <= sizeof(space_)) { dst = space_; @@ -130,6 +130,10 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { dst += usize; EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); dst += 8; + time_t now = time(nullptr); + uint64_t timestamp = static_cast(now); + EncodeFixed64(dst, timestamp);//大于等于当前时间的key才被视为有效? + dst += 8; end_ = dst; } diff --git a/db/dbformat.h b/db/dbformat.h index a1c30ed..f0b6d1a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -99,6 +99,23 @@ inline Slice ExtractUserKey(const Slice& internal_key) { // A comparator for internal keys that uses a specified comparator for // the user key portion and breaks ties by decreasing sequence number. + +class ttlInternalKeyComparator: public Comparator{ + private: + const Comparator* user_comparator_; + + public: + explicit ttlInternalKeyComparator(const Comparator* c) : user_comparator_(c) {} + const char* Name() const override; + int Compare(const Slice& a, const Slice& b) const override; + void FindShortestSeparator(std::string* start, + const Slice& limit) const override; + void FindShortSuccessor(std::string* key) const override; + + const Comparator* user_comparator() const { return user_comparator_; } + + int Compare(const InternalKey& a, const InternalKey& b) const; +}; class InternalKeyComparator : public Comparator { private: const Comparator* user_comparator_; @@ -199,13 +216,16 @@ class LookupKey { Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } // Return the user key - Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); } + Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 16); } + + uint64_t ttl() const {return *(uint64_t*)(end_-8);} private: // We construct a char array of the form: // klength varint32 <-- start_ // userkey char[klength] <-- kstart_ // tag uint64 + // ttl uint64 // <-- end_ // The array is a suitable MemTable key. // The suffix starting with "userkey" can be used as an InternalKey. diff --git a/db/memtable.cc b/db/memtable.cc index 4f09340..67287a6 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -74,7 +74,7 @@ class MemTableIterator : public Iterator { Iterator* MemTable::NewIterator() { return new MemTableIterator(&table_); } void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, - const Slice& value) { + const Slice& value,uint64_t ttl) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] @@ -82,17 +82,23 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); + + size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + - internal_key_size + VarintLength(val_size) + + internal_key_size + 8 + VarintLength(val_size) + val_size; char* buf = arena_.Allocate(encoded_len); + //internal_key_size(sizeof(key)+sizeof(sequence+type)+sizeof(ttl)) + //--key--(sequence+type)--ttl--sizeof(val)--val char* p = EncodeVarint32(buf, internal_key_size); std::memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; + EncodeFixed64(p,ttl); + p += 8; p = EncodeVarint32(p, val_size); std::memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); @@ -103,7 +109,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); - if (iter.Valid()) { + while(iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] @@ -114,7 +120,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char* entry = iter.key(); - uint32_t key_length; + uint32_t key_length;//internal_key_size const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { @@ -122,7 +128,12 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { case kTypeValue: { - Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + const uint64_t ttl = DecodeFixed64(key_ptr+key_length); + if(ttlassign(v.data(), v.size()); return true; } @@ -131,6 +142,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { return true; } } + else{ + break; + } } return false; } diff --git a/db/memtable.h b/db/memtable.h index 9d986b1..d1e1079 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -54,7 +54,7 @@ class MemTable { // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. void Add(SequenceNumber seq, ValueType type, const Slice& key, - const Slice& value); + const Slice& value,uint64_t ttl=INT32_MAX); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error @@ -66,6 +66,7 @@ class MemTable { friend class MemTableIterator; friend class MemTableBackwardIterator; + struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {} diff --git a/db/write_batch.cc b/db/write_batch.cc index b54313c..cd47607 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -54,12 +54,16 @@ Status WriteBatch::Iterate(Handler* handler) const { input.remove_prefix(1); switch (tag) { case kTypeValue: + { + uint64_t ttl; + GetVarint64(&input, &ttl); if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->Put(key, value); + handler->Put(key, value,ttl); } else { return Status::Corruption("bad WriteBatch Put"); } + } break; case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { @@ -102,6 +106,14 @@ void WriteBatch::Put(const Slice& key, const Slice& value) { PutLengthPrefixedSlice(&rep_, value); } +void WriteBatch::Put_for_ttl(const Slice& key, const Slice& value,uint64_t ttl){ + WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); + rep_.push_back(static_cast(kTypeValue)); + PutVarint64(&rep_,ttl); + PutLengthPrefixedSlice(&rep_, key); + PutLengthPrefixedSlice(&rep_, value); +} + void WriteBatch::Delete(const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeDeletion)); @@ -122,6 +134,10 @@ class MemTableInserter : public WriteBatch::Handler { mem_->Add(sequence_, kTypeValue, key, value); sequence_++; } + void Put(const Slice& key, const Slice& value,uint64_t ttl) override { + mem_->Add(sequence_, kTypeValue, key, value,ttl); + sequence_++; + } void Delete(const Slice& key) override { mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index bf4eec5..1c3d819 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -66,6 +66,7 @@ class LEVELDB_EXPORT DB { virtual Status Put(const WriteOptions& options, const Slice& key, const Slice& value) = 0; + // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" // did not exist in the database. diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 94d4115..536bb5d 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -36,9 +36,9 @@ class LEVELDB_EXPORT WriteBatch { public: virtual ~Handler(); virtual void Put(const Slice& key, const Slice& value) = 0; + virtual void Put(const Slice& key, const Slice& value,uint64_t ttl){}; virtual void Delete(const Slice& key) = 0; }; - WriteBatch(); // Intentionally copyable. @@ -50,6 +50,9 @@ class LEVELDB_EXPORT WriteBatch { // Store the mapping "key->value" in the database. void Put(const Slice& key, const Slice& value); + void Put_for_ttl(const Slice& key, const Slice& value,uint64_t ttl); + + // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(const Slice& key); diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 06f4cda..ba84866 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -10,7 +10,7 @@ using namespace leveldb; constexpr int value_size = 2048; constexpr int data_size = 128 << 20; - +std::vector> gen_v; Status OpenDB(std::string dbName, DB **db) { Options options; options.create_if_missing = true; @@ -19,34 +19,37 @@ Status OpenDB(std::string dbName, DB **db) { void InsertData(DB *db, uint64_t ttl/* second */) { WriteOptions writeOptions; - int key_num = data_size / value_size; + int key_num = 50; srand(static_cast(time(0))); for (int i = 0; i < key_num; i++) { - int key_ = rand() % key_num+1; + //int key_ = rand() % key_num+1; + int key_=i; std::string key = std::to_string(key_); std::string value(value_size, 'a'); - db->Put(writeOptions, key, value, ttl); + gen_v.push_back({key,value}); + auto status=db->Put(writeOptions, key, value, ttl); + assert(status.ok()); } } -void GetData(DB *db, int size = (1 << 30)) { - ReadOptions readOptions; - int key_num = data_size / value_size; +// void GetData(DB *db, int size = (1 << 30)) { +// ReadOptions readOptions; +// int key_num = data_size / value_size; - // 点查 - srand(static_cast(time(0))); - for (int i = 0; i < 100; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); - std::string value; - db->Get(readOptions, key, &value); - } -} +// // 点查 +// srand(static_cast(time(0))); +// for (int i = 0; i < 100; i++) { +// int key_ = rand() % key_num+1; +// std::string key = std::to_string(key_); +// std::string value; +// db->Get(readOptions, key, &value); +// } +// } TEST(TestTTL, ReadTTL) { DB *db; - if(OpenDB("testdb", &db).ok() == false) { + if(OpenDB("testdb_4", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } @@ -58,10 +61,9 @@ TEST(TestTTL, ReadTTL) { ReadOptions readOptions; Status status; int key_num = data_size / value_size; - srand(static_cast(time(0))); - for (int i = 0; i < 100; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); + //srand(static_cast(time(0))); + for (auto pr:gen_v) { + std::string key = pr.first; std::string value; status = db->Get(readOptions, key, &value); ASSERT_TRUE(status.ok()); @@ -69,42 +71,46 @@ TEST(TestTTL, ReadTTL) { Env::Default()->SleepForMicroseconds(ttl * 1000000); - for (int i = 0; i < 100; i++) { - int key_ = rand() % key_num+1; - std::string key = std::to_string(key_); + for (auto pr:gen_v) { + std::string key = pr.first; std::string value; - status = db->Get(readOptions, key, &value); + Slice slice_key= Slice(key); + status = db->Get(readOptions,slice_key, &value); ASSERT_FALSE(status.ok()); } } -TEST(TestTTL, CompactionTTL) { - DB *db; - - if(OpenDB("testdb", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - - uint64_t ttl = 20; - InsertData(db, ttl); - - leveldb::Range ranges[1]; - ranges[0] = leveldb::Range("-", "A"); - uint64_t sizes[1]; - db->GetApproximateSizes(ranges, 1, sizes); - ASSERT_GT(sizes[0], 0); - - Env::Default()->SleepForMicroseconds(ttl * 1000000); - - db->CompactRange(nullptr, nullptr); - - leveldb::Range ranges[1]; - ranges[0] = leveldb::Range("-", "A"); - uint64_t sizes[1]; - db->GetApproximateSizes(ranges, 1, sizes); - ASSERT_EQ(sizes[0], 0); -} +// TEST(TestTTL, CompactionTTL) { +// DB *db; + +// if(OpenDB("testdb", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } + +// uint64_t ttl = 20; +// InsertData(db, ttl); + +// { +// leveldb::Range ranges[1]; +// ranges[0] = leveldb::Range("-", "A"); +// uint64_t sizes[1]; +// db->GetApproximateSizes(ranges, 1, sizes); +// ASSERT_GT(sizes[0], 0); + +// } +// Env::Default()->SleepForMicroseconds(ttl * 1000000); + +// db->CompactRange(nullptr, nullptr); + +// { +// leveldb::Range ranges[1]; +// ranges[0] = leveldb::Range("-", "A"); +// uint64_t sizes[1]; +// db->GetApproximateSizes(ranges, 1, sizes); +// ASSERT_EQ(sizes[0], 0); +// } +// } int main(int argc, char** argv) {