diff --git a/db/db_impl.cc b/db/db_impl.cc index 22ab80d..9ee6db4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -924,10 +924,15 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { - auto x=input->value(); - uint64_t ttl=*(uint64_t*)(x.data()+x.size()-sizeof(uint64_t)); - time_t now = time(nullptr); - if(ttl < static_cast(now))return; + auto x=input->value(); // 获取键值对 value + uint64_t ttl=*(uint64_t*)(x.data()+x.size()-sizeof(uint64_t));// 将 TTL 从 new_data 的末尾取出 + time_t now = time(nullptr); // 获得当前时间 + // 如果 TTL 超过当前时间,说明数据已经过期 + if(ttl < static_cast(now)){ + Log(options_.info_log, "delete record for ttl"); + input->Next(); // 将 input 指向下一个键值对 + continue; + } // Prioritize immutable compaction work if (has_imm_.load(std::memory_order_relaxed)) { const uint64_t imm_start = env_->NowMicros(); @@ -1497,26 +1502,30 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { WriteBatch batch; int len=value.size()+sizeof(uint64_t); char* new_data=new char[len]; - time_t now = time(nullptr);//ttl单位为秒 - uint64_t ttl=INT64_MAX; - memcpy(new_data,value.data(),value.size()); - memcpy(new_data+len-sizeof(uint64_t),(char*)(&ttl),sizeof(uint64_t)); - Slice newValue=Slice(new_data,len); - batch.Put(key, newValue); - return Write(opt, &batch); + time_t now = time(nullptr); // 获取当前时间,单位为秒 + uint64_t ttl = INT64_MAX; // 设置默认的 TTL 为最大值(即永不过期) + memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中 + memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾 + + Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象 + batch.Put(key, newValue); // 将键值对放入 WriteBatch 中 + return Write(opt, &batch); // 写入数据库 } -Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value,uint64_t ttl) { +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl) { WriteBatch batch; - int len=value.size()+sizeof(uint64_t); - char* new_data=new char[len]; - time_t now = time(nullptr);//ttl单位为秒 - ttl+=static_cast(now); - memcpy(new_data,value.data(),value.size()); - memcpy(new_data+len-sizeof(uint64_t),(char*)(&ttl),sizeof(uint64_t)); - Slice newValue=Slice(new_data,len); - batch.Put(key, newValue); - return Write(opt, &batch); + int len = value.size() + sizeof(uint64_t); + char* new_data = new char[len]; + time_t now = time(nullptr); // 获取当前时间,单位为秒 + ttl += static_cast(now); // 将当前时间加上 TTL,计算过期时间 + + memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中 + memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾 + + Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象 + batch.Put(key, newValue); // 将键值对放入 WriteBatch 中 + delete []new_data; // 释放分配的内存,防止内存泄漏 + return Write(opt, &batch); // 写入数据库 } Status DB::Delete(const WriteOptions& opt, const Slice& key) { diff --git a/db/db_impl.h b/db/db_impl.h index f9ae1ab..771dbba 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -143,6 +143,8 @@ class DBImpl : public DB { static void BGWork(void* db); void BackgroundCall(); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void ttl_ManualCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) diff --git a/db/memtable.cc b/db/memtable.cc index 1cf4b33..7638bb6 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -81,6 +81,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, // tag : uint64((sequence << 8) | type) // value_size : varint32 of value.size() // value bytes : char[value.size()] + // ttl : uint64(now) size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; @@ -121,12 +122,14 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { + // kTypeValue 对应被插入的数据 case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); + uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出 time_t now = time(nullptr); + // 如果 TTL 超过当前时间,说明数据已经过期 if(ttl < static_cast(now)){ - iter.Next(); + iter.Next(); // 将 iter 指向下一个键值对 continue; } value->assign(v.data(), v.size()-sizeof(uint64_t)); diff --git a/db/version_set.cc b/db/version_set.cc index 109e5c9..2b25849 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -259,6 +259,7 @@ struct Saver { std::string* value; }; } // namespace + static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { Saver* s = reinterpret_cast(arg); ParsedInternalKey parsed_key; @@ -266,15 +267,11 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { s->state = kCorrupt; } else { if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + // kTypeValue 对应被插入的数据 if(parsed_key.type == kTypeValue){ time_t now = time(nullptr); - uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); - if(ttl < static_cast(now))return; - } - if(parsed_key.type == kTypeValue){ - time_t now = time(nullptr); - uint64_t ttl=*(uint64_t*)(v.data()+v.size()-8); - if(ttl < static_cast(now))return; + uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出 + if(ttl < static_cast(now))return; // 如果 TTL 超过当前时间,说明数据已经过期,直接返回 } s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; if (s->state == kFound) { diff --git a/ttl.assets/image-20241105215210467.png b/ttl.assets/image-20241105215210467.png new file mode 100644 index 0000000..79a2017 Binary files /dev/null and b/ttl.assets/image-20241105215210467.png differ diff --git a/ttl.assets/image-20241106141245806.png b/ttl.assets/image-20241106141245806.png new file mode 100644 index 0000000..40c6f67 Binary files /dev/null and b/ttl.assets/image-20241106141245806.png differ diff --git a/ttl.assets/image-20241106152151687.png b/ttl.assets/image-20241106152151687.png new file mode 100644 index 0000000..72a79bd Binary files /dev/null and b/ttl.assets/image-20241106152151687.png differ diff --git a/ttl.assets/image-20241106154658482.png b/ttl.assets/image-20241106154658482.png new file mode 100644 index 0000000..463c460 Binary files /dev/null and b/ttl.assets/image-20241106154658482.png differ diff --git a/ttl.assets/image-20241106173117857.png b/ttl.assets/image-20241106173117857.png new file mode 100644 index 0000000..dcc9f6a Binary files /dev/null and b/ttl.assets/image-20241106173117857.png differ diff --git a/ttl.assets/old_record-17307966031002.png b/ttl.assets/old_record-17307966031002.png new file mode 100644 index 0000000..3510b52 Binary files /dev/null and b/ttl.assets/old_record-17307966031002.png differ diff --git a/ttl.assets/old_record-17308259248204.png b/ttl.assets/old_record-17308259248204.png new file mode 100644 index 0000000..5f71dec Binary files /dev/null and b/ttl.assets/old_record-17308259248204.png differ diff --git a/ttl.assets/old_record.png b/ttl.assets/old_record.png new file mode 100644 index 0000000..c99b5b6 Binary files /dev/null and b/ttl.assets/old_record.png differ diff --git a/ttl.md b/ttl.md new file mode 100644 index 0000000..9680945 --- /dev/null +++ b/ttl.md @@ -0,0 +1,666 @@ + + +# LEVELDB TTL + +#### 小组成员 + +谢瑞阳 徐翔宇 + + + +## 整体介绍 + +我们在整体实验过程中,经历了先后两版对于TLL功能的设计和开发过程,在开发第一版TTL功能的过程中,我们遇到了一些困难。经过思考讨论后,我们重构了全部代码,并基于新的设计快速的完成了TTL功能以及对应的垃圾回收功能。 + +我们仓库的地址为:https://gitea.shuishan.net.cn/10225101483/XOY-Leveldb + +其中每个分支对应的内容为: + +master分支:主分支:第二版TTL设计。 + +xry分支:第一版TTL设计,完成了在memtable中的带有TTL数据的存取(即在小数据量情况下可以正确运行测试脚本中的第一个测试)。 + +xxy分支、new_version分支:第二版TTL设计。其中new_version分支与master分支内容相同。 + +### 设计思路 + +要完成TTL功能,首先也是最重要的内容即是要在原本leveldb存储数据的格式上增加一块用于存储TTL的空间,并且想明白如何对其进行读取,以及需要对其他存储空间(key,sequence number,value)的存取需要作出什么改动。 + +在写入数据时,涉及到编码的主要函数有以下三个: + +leveldb通过WriteBatch::Put对传入的键值对进行编码,写入writebatch中。 + +```c++ +void WriteBatch::Put(const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); + rep_.push_back(static_cast(kTypeValue)); + PutLengthPrefixedSlice(&rep_, key); + PutLengthPrefixedSlice(&rep_, value); +} +``` + +之后,使用WriteBatch::Iterate将一个writebatch中的所有键值对依次解析并使用handler放入memtable。 + +```c++ +Slice input(rep_); + if (input.size() < kHeader) { + return Status::Corruption("malformed WriteBatch (too small)"); + } + + input.remove_prefix(kHeader); + Slice key, value; + int found = 0; + while (!input.empty()) { + found++; + char tag = input[0]; + input.remove_prefix(1); + switch (tag) { + case kTypeValue: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + handler->Put(key, value); + } else { + return Status::Corruption("bad WriteBatch Put"); + } + break; + case kTypeDeletion: + if (GetLengthPrefixedSlice(&input, &key)) { + handler->Delete(key); + } else { + return Status::Corruption("bad WriteBatch Delete"); + } + break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } +``` + +之后,使用MemTable::Add将解析出来的数据重新编码,并插入至memtable,这种新的编码形式同样适用于SSTable。 + +```c++ +void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, + const Slice& value) { + // Format of an entry is concatenation of: + // key_size : varint32 of internal_key.size() + // key bytes : char[internal_key.size()] + // tag : uint64((sequence << 8) | type) + // value_size : varint32 of value.size() + // value bytes : char[value.size()] + size_t key_size = key.size(); + size_t val_size = value.size(); + size_t internal_key_size = key_size + 8; + const size_t encoded_len = VarintLength(internal_key_size) + + internal_key_size + VarintLength(val_size) + + val_size; + char* buf = arena_.Allocate(encoded_len); + char* p = EncodeVarint32(buf, internal_key_size); + std::memcpy(p, key.data(), key_size); + p += key_size; + EncodeFixed64(p, (s << 8) | type); + p += 8; + p = EncodeVarint32(p, val_size); + std::memcpy(p, value.data(), val_size); + assert(p + val_size == buf + encoded_len); + table_.Insert(buf); +} +``` + +这是原本leveldb在memtable和sstable中存储单个键值对时使用的数据结构: + +![old_record](ttl.assets/old_record.png) + +其中,key和value对应着用户输入的键值对,而sequenceNumber是该键值对在leveldb内部的序列号(越大越新),type则标识该记录的类型。 + +#### 第一版设计:增加在中 + +我们的第一版TTL存储设计如下: + +![old_record](ttl.assets/old_record-17307966031002.png) + +在我们最初的思考中,TTL与SequenceNumber、Type一样,属于和主要的存储内容(key,value)分割开的存储数据,因此我们选择将TTL加在SequenceNumber和Type后面(不加在中间和前面,以免破坏原先InternalKey的结构)。 + +但在实现过程中,我们发现在中间增加TTL的形式会导致许多代码变动,因为对数据编码形式的修改会导致InternalKey和LookupKey等基础的编码类及函数需要进行改动,它们在一些与TTL无关的地方似乎也被复用(比如versionEdit和大量测试函数),因此我们认为对其进行修改会非常复杂,可能导致leveldb其他模块受到牵连,不符合我们对TTL轻量级改动的构想。因此我们进行了讨论,并产生了如下的新版设计 + +#### 第二版设计:放入Value + +我们的第二版TTL存储设计如下: + +![old_record](ttl.assets/old_record-17308259248204.png) + +由于 leveldb 本身对于 key、value、sequencenumber 的解析已经非常完善并且有些耦合,因此我们想要在不破坏 leveldb 原先的解析方式下完成 TTL 功能,我们想到的最好最轻量级的方式,就是将 TTL 数据作为 Value的一部分(在原 Value 后追加 8byte 作为 TTL )进行存储,仅当要解析 Value 时(即读取数据时或合并时),才会在同时对 TTL 进行解析。由此一来,我们无需改变原本的存储结构。事实证明,我们在很短时间内就完成了这一版 TTL的全部所需功能。 + + + +### 实现流程 + +### 第一版设计 + +首先,我们需要修改在 writebatch 中数据的存取格式(主要是对 Value 的读取需要向后偏移8位)。我们在 WriteBatch 中新增了 Put_for_ttl 函数,将 ttl 放置在 key,value 之前。 + +```c++ +void WriteBatch::Put_for_ttl(const Slice& key, const Slice& value,uint64_t ttl){ + WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); + rep_.push_back(static_cast(kTypeValue)); + PutVarint64(&rep_,ttl); + PutLengthPrefixedSlice(&rep_, key); + PutLengthPrefixedSlice(&rep_, value); +} +``` + + + +同时,对 WriteBatch 的迭代器也要进行修改。 + +![image-20241106141245806](ttl.assets/image-20241106141245806.png) + + + +使用 Add 将键值对加入 memtable 时,也要在 key 和 value 之间插入参数 ttl。具体位置在 key 末尾的 tag 之后,使用 EncodeFixed64 放入大小为 8 的 ttl 。 + +```c++ +void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, + const Slice& value,uint64_t ttl) { + // Format of an entry is concatenation of: + // key_size : varint32 of internal_key.size() + // key bytes : char[internal_key.size()] + // tag : uint64((sequence << 8) | type) + // ttl : uint64(now) + // value_size : varint32 of value.size() + // value bytes : char[value.size()] + size_t key_size = key.size(); + + size_t val_size = value.size(); + size_t internal_key_size = key_size + 8; + const size_t encoded_len = VarintLength(internal_key_size) + + internal_key_size + 8 + VarintLength(val_size) + + val_size; + char* buf = arena_.Allocate(encoded_len); + //internal_key_size(sizeof(key)+sizeof(sequence+type)+sizeof(ttl)) + //--key--(sequence+type)--ttl--sizeof(val)--val + char* p = EncodeVarint32(buf, internal_key_size); + std::memcpy(p, key.data(), key_size); + p += key_size; + EncodeFixed64(p, (s << 8) | type); + p += 8; + EncodeFixed64(p,ttl); + p += 8; + p = EncodeVarint32(p, val_size); + std::memcpy(p, value.data(), val_size); + assert(p + val_size == buf + encoded_len); + table_.Insert(buf); +} +``` + + + +从 memtable 获取插入的数据时,需要读出 key, value 之间的 ttl 进行检查( line15 ) + +```c++ +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { + Slice memkey = key.memtable_key(); + Table::Iterator iter(&table_); + iter.Seek(memkey.data()); + while(iter.Valid()) { + const char* entry = iter.key(); + uint32_t key_length;//internal_key_size + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (comparator_.comparator.user_comparator()->Compare( + Slice(key_ptr, key_length - 8), key.user_key()) == 0) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(tag & 0xff)) { + case kTypeValue: { + const uint64_t ttl = DecodeFixed64(key_ptr+key_length); + if(ttlassign(v.data(), v.size()); + return true; + } + case kTypeDeletion: + *s = Status::NotFound(Slice()); + return true; + } + } + else{ + break;// 当 key 不一致的时候返回 false + } + } + return false; +} +``` + + + +在开始查找时,将当前的时间写入 LookupKey,如果查找到的数据 ttl 超过当前时间,那么就会抛弃该条数据。 + +![image-20241106152151687](ttl.assets/image-20241106152151687.png) + + + +通过 blockbuilder 向 sstable 插入数据时(新建 level0/1/2)时使用 add 函数,需要将 ttl 通过PutVarint64 编码成 `Varint64` 形式,放入 key 与 value 之间 + +```c++ +void BlockBuilder::Add(const Slice& key, uint64_t ttl, const Slice& value) { + ··· + ··· + + // Add "" to buffer_ + PutVarint32(&buffer_, shared); + PutVarint32(&buffer_, non_shared); + PutVarint32(&buffer_, value.size()); + + // Add string delta to buffer_ followed by value + buffer_.append(key.data() + shared, non_shared); + PutVarint64(&buffer_, ttl);//加入ttl + buffer_.append(value.data(), value.size()); + + // Update state + last_key_.resize(shared); + last_key_.append(key.data() + shared, non_shared); + assert(Slice(last_key_) == key); + counter_++; +} +``` + + + +通过 TableBuilder 将合并后的数据插入新的 sstable,使用 add 函数 + +![image-20241106154658482](ttl.assets/image-20241106154658482.png) + +```c++ +void TableBuilder::Add(const Slice& key, uint64_t ttl, const Slice& value) { + time_t now = time(nullptr); + if(ttl < static_cast(now))return; // 再次检查,是否超时(理论上在上层DoCompactionWork时已经检查过一次,都是为了保障正确性,冗余的测试并不会造成很大的性能瓶颈) + + ··· + ··· + + if (r->filter_block != nullptr) { + r->filter_block->AddKey(key); + } + + r->last_key.assign(key.data(), key.size()); + r->num_entries++; + r->data_block.Add(key, ttl, value); //加入block时仍旧需要使用 BlockBuilder::Add + + const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); + if (estimated_block_size >= r->options.block_size) { + Flush(); + } +} +``` + + + +在 DoCompactionWork 中,合并是会检查数据 ttl 是否到期,如果到期则会丢弃。 + +![image-20241106173117857](ttl.assets/image-20241106173117857.png) + + + + + +#### 第二版设计 + +##### 原先的Put + +```c++ +// Convenience methods +Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { + return DB::Put(o, key, val); +} + +// 加入ttl功能后 +Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val,uint64_t ttl) { + return DB::Put(o, key, val,ttl); +} +``` + + + +##### 老版本的 `DB::Put` 方法仅接受键和值,并将值直接存入数据库。 + +```c++ +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { + WriteBatch batch; + batch.Put(key, value); + return Write(opt, &batch); +} +``` + + + +##### 新版本增加了一个可选参数 `ttl`(过期时间),允许用户指定存储的值的生存时间。 + +```c++ +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { + WriteBatch batch; + int len = value.size() + sizeof(uint64_t); + char* new_data = new char[len]; + time_t now = time(nullptr); // 获取当前时间,单位为秒 + uint64_t ttl = INT64_MAX; // 设置默认的 TTL 为最大值(即永不过期) + + memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中 + memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾 + + Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象 + batch.Put(key, newValue); // 将键值对放入 WriteBatch 中 + return Write(opt, &batch); // 写入数据库 +} + + +Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl) { + WriteBatch batch; + int len = value.size() + sizeof(uint64_t); + char* new_data = new char[len]; + time_t now = time(nullptr); // 获取当前时间,单位为秒 + ttl += static_cast(now); // 将当前时间加上 TTL,计算过期时间 + + memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中 + memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾 + + Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象 + batch.Put(key, newValue); // 将键值对放入 WriteBatch 中 + delete []new_data; // 释放分配的内存,防止内存泄漏 + return Write(opt, &batch); // 写入数据库 +} +``` + + + +##### 合并sstable内的数据 + +```c++ +Status DBImpl::DoCompactionWork(CompactionState* compact) { +··· +··· + Iterator* input = versions_->MakeInputIterator(compact->compaction); + + // Release mutex while we're actually doing the compaction work + mutex_.Unlock(); + + input->SeekToFirst(); + Status status; + ParsedInternalKey ikey; + std::string current_user_key; + bool has_current_user_key = false; + SequenceNumber last_sequence_for_key = kMaxSequenceNumber; + while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { + auto x=input->value(); // 获取键值对 value + uint64_t ttl=*(uint64_t*)(x.data()+x.size()-sizeof(uint64_t));// 将 TTL 从 new_data 的末尾取出 + time_t now = time(nullptr); // 获得当前时间 + // 如果 TTL 超过当前时间,说明数据已经过期 + if(ttl < static_cast(now)){ + Log(options_.info_log, "delete record for ttl"); + input->Next(); // 将 input 指向下一个键值对 + continue; + } + // Prioritize immutable compaction work + if (has_imm_.load(std::memory_order_relaxed)) { + const uint64_t imm_start = env_->NowMicros(); + mutex_.Lock(); + if (imm_ != nullptr) { + CompactMemTable(); + // Wake up MakeRoomForWrite() if necessary. + background_work_finished_signal_.SignalAll(); + } + mutex_.Unlock(); + imm_micros += (env_->NowMicros() - imm_start); + } +··· +··· +} +``` + + + +##### 用于处理从sstable中读取的键值对 + +```c++ +static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { + Saver* s = reinterpret_cast(arg); + ParsedInternalKey parsed_key; + if (!ParseInternalKey(ikey, &parsed_key)) { + s->state = kCorrupt; + } else { + if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + // kTypeValue 对应被插入的数据 + if(parsed_key.type == kTypeValue){ + time_t now = time(nullptr); + uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出 + if(ttl < static_cast(now))return; // 如果 TTL 超过当前时间,说明数据已经过期,直接返回 + } + s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; + if (s->state == kFound) { + s->value->assign(v.data(), v.size()-sizeof(uint64_t)); + } + } + } +} +``` + + + +##### 读取memtable内的数据 + +```c++ +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { + Slice memkey = key.memtable_key(); + Table::Iterator iter(&table_); + iter.Seek(memkey.data()); + while (iter.Valid()) { + const char* entry = iter.key(); + uint32_t key_length; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (comparator_.comparator.user_comparator()->Compare( + Slice(key_ptr, key_length - 8), key.user_key()) == 0) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(tag & 0xff)) { + // kTypeValue 对应被插入的数据 + case kTypeValue: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出 + time_t now = time(nullptr); + // 如果 TTL 超过当前时间,说明数据已经过期 + if(ttl < static_cast(now)){ + iter.Next(); // 将 iter 指向下一个键值对 + continue; + } + value->assign(v.data(), v.size()-sizeof(uint64_t)); + + return true; + } + case kTypeDeletion: + *s = Status::NotFound(Slice()); + return true; + } + } + else break; + } + return false; +} +``` + + + +##### 测试 + +```c++ +#include "gtest/gtest.h" +#include "leveldb/env.h" +#include "leveldb/db.h" + +using namespace leveldb; + +constexpr int value_size = 2048; +constexpr int data_size = 128 << 20; + +Status OpenDB(std::string dbName, DB **db) { + Options options; + options.create_if_missing = true; + return DB::Open(options, dbName, db); +} + +void InsertData(DB *db, uint64_t ttl/* second */) { + WriteOptions writeOptions; + int key_num = data_size / value_size; + srand(0); + + for (int i = 0; i < key_num; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value(value_size, 'a'); + db->Put(writeOptions, key, value, ttl); + } +} + +void GetData(DB *db, int size = (1 << 30)) { + ReadOptions readOptions; + int key_num = data_size / value_size; + + // 点查 + srand(0); + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + db->Get(readOptions, key, &value); + } +} + +TEST(TestTTL, ReadTTL) { + DB *db; + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 20; + + InsertData(db, ttl); + + ReadOptions readOptions; + Status status; + int key_num = data_size / value_size; + srand(0); + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + ASSERT_TRUE(status.ok()); + } + + Env::Default()->SleepForMicroseconds((ttl+1) * 1000000); + + for (int i = 0; i < 100; i++) { + int key_ = rand() % key_num+1; + std::string key = std::to_string(key_); + std::string value; + status = db->Get(readOptions, key, &value); + ASSERT_FALSE(status.ok()); + } + + delete db; +} + +TEST(TestTTL, CompactionTTL) { + DB *db; + + if(OpenDB("testdb", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + + uint64_t ttl = 20; + InsertData(db, ttl); + + leveldb::Range ranges[1]; + ranges[0] = leveldb::Range("-", "A"); + uint64_t sizes[1]; + db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_GT(sizes[0], 0); + + Env::Default()->SleepForMicroseconds((ttl+1) * 1000000); + + db->CompactRange(nullptr, nullptr); + + leveldb::Range ranges_1[1]; + ranges[0] = leveldb::Range("-", "A"); + uint64_t sizes_1[1]; + db->GetApproximateSizes(ranges_1, 1, sizes_1); + ASSERT_EQ(sizes_1[0], 0); + + delete db; +} + + +TEST(TestTTL, OurTTL) { + DB *db; + WriteOptions writeOptions; + ReadOptions readOptions; + if(OpenDB("testdb_for_XOY", &db).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + for (int i = 0; i < 10000; i++) { + std::string key = std::to_string(i); + std::string value = std::to_string(i); + db->Put(writeOptions, key, value); + } + for (int i = 0; i < 10000; i++) { + std::string key = std::to_string(i); + std::string value = std::to_string(i*2); + db->Put(writeOptions, key, value, 30); + } + for (int i = 0; i < 10000; i++) { + std::string key = std::to_string(i); + std::string value = std::to_string(i*3); + db->Put(writeOptions, key, value, 15); + } + + for (int i = 0; i < 10000; i++) { + std::string key = std::to_string(i); + std::string value; + Status status = db->Get(readOptions, key, &value); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(value==std::to_string(i*3)); + } + + Env::Default()->SleepForMicroseconds((15+1) * 1000000); + + for (int i = 0; i < 10000; i++) { + std::string key = std::to_string(i); + std::string value; + Status status = db->Get(readOptions, key, &value); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(value==std::to_string(i*2)); + } + + Env::Default()->SleepForMicroseconds((15+1) * 1000000); + + for (int i = 0; i < 10000; i++) { + std::string key = std::to_string(i); + std::string value; + Status status = db->Get(readOptions, key, &value); + ASSERT_TRUE(status.ok()); + ASSERT_TRUE(value==std::to_string(i)); + } + + delete db; +} + + +int main(int argc, char** argv) { + // All tests currently run with the same read-only file limits. + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} +``` +