|
@ -0,0 +1,671 @@ |
|
|
|
|
|
# LEVELDB TTL |
|
|
|
|
|
|
|
|
|
|
|
#### 小组成员 |
|
|
|
|
|
|
|
|
|
|
|
谢瑞阳 徐翔宇 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 整体介绍 |
|
|
|
|
|
|
|
|
|
|
|
我们在整体实验过程中,经历了先后两版对于TLL功能的设计和开发过程,在开发第一版TTL功能的过程中,我们遇到了一些困难。经过思考讨论后,我们重构了全部代码,并基于新的设计快速的完成了TTL功能以及对应的垃圾回收功能。 |
|
|
|
|
|
|
|
|
|
|
|
我们仓库的地址为:https://gitea.shuishan.net.cn/10225101483/XOY-Leveldb |
|
|
|
|
|
|
|
|
|
|
|
其中每个分支对应的内容为: |
|
|
|
|
|
|
|
|
|
|
|
master分支:主分支:第二版TTL设计。 |
|
|
|
|
|
|
|
|
|
|
|
xry分支:第一版TTL设计,完成了在memtable中的带有TTL数据的存取(即在小数据量情况下可以正确运行测试脚本中的第一个测试)。 |
|
|
|
|
|
|
|
|
|
|
|
xxy分支、new_version分支:第二版TTL设计。其中new_version分支与master分支内容相同。 |
|
|
|
|
|
|
|
|
|
|
|
### 设计思路 |
|
|
|
|
|
|
|
|
|
|
|
要完成TTL功能,首先也是最重要的内容即是要在原本leveldb存储数据的格式上增加一块用于存储TTL的空间,并且想明白如何对其进行读取,以及需要对其他存储空间(key,sequence number,value)的存取需要作出什么改动。 |
|
|
|
|
|
|
|
|
|
|
|
在写入数据时,涉及到编码的主要函数有以下三个: |
|
|
|
|
|
|
|
|
|
|
|
leveldb通过WriteBatch::Put对传入的键值对进行编码,写入writebatch中。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
void WriteBatch::Put(const Slice& key, const Slice& value) { |
|
|
|
|
|
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); |
|
|
|
|
|
rep_.push_back(static_cast<char>(kTypeValue)); |
|
|
|
|
|
PutLengthPrefixedSlice(&rep_, key); |
|
|
|
|
|
PutLengthPrefixedSlice(&rep_, value); |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
之后,使用WriteBatch::Iterate将一个writebatch中的所有键值对依次解析并使用handler放入memtable。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
Slice input(rep_); |
|
|
|
|
|
if (input.size() < kHeader) { |
|
|
|
|
|
return Status::Corruption("malformed WriteBatch (too small)"); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
input.remove_prefix(kHeader); |
|
|
|
|
|
Slice key, value; |
|
|
|
|
|
int found = 0; |
|
|
|
|
|
while (!input.empty()) { |
|
|
|
|
|
found++; |
|
|
|
|
|
char tag = input[0]; |
|
|
|
|
|
input.remove_prefix(1); |
|
|
|
|
|
switch (tag) { |
|
|
|
|
|
case kTypeValue: |
|
|
|
|
|
if (GetLengthPrefixedSlice(&input, &key) && |
|
|
|
|
|
GetLengthPrefixedSlice(&input, &value)) { |
|
|
|
|
|
handler->Put(key, value); |
|
|
|
|
|
} else { |
|
|
|
|
|
return Status::Corruption("bad WriteBatch Put"); |
|
|
|
|
|
} |
|
|
|
|
|
break; |
|
|
|
|
|
case kTypeDeletion: |
|
|
|
|
|
if (GetLengthPrefixedSlice(&input, &key)) { |
|
|
|
|
|
handler->Delete(key); |
|
|
|
|
|
} else { |
|
|
|
|
|
return Status::Corruption("bad WriteBatch Delete"); |
|
|
|
|
|
} |
|
|
|
|
|
break; |
|
|
|
|
|
default: |
|
|
|
|
|
return Status::Corruption("unknown WriteBatch tag"); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
之后,使用MemTable::Add将解析出来的数据重新编码,并插入至memtable。对于SSTable来说,InternalKey中的SequenceNumber和Type被隐藏,但存储数据的形式未发生改变。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, |
|
|
|
|
|
const Slice& value) { |
|
|
|
|
|
// Format of an entry is concatenation of: |
|
|
|
|
|
// key_size : varint32 of internal_key.size() |
|
|
|
|
|
// key bytes : char[internal_key.size()] |
|
|
|
|
|
// tag : uint64((sequence << 8) | type) |
|
|
|
|
|
// value_size : varint32 of value.size() |
|
|
|
|
|
// value bytes : char[value.size()] |
|
|
|
|
|
size_t key_size = key.size(); |
|
|
|
|
|
size_t val_size = value.size(); |
|
|
|
|
|
size_t internal_key_size = key_size + 8; |
|
|
|
|
|
const size_t encoded_len = VarintLength(internal_key_size) + |
|
|
|
|
|
internal_key_size + VarintLength(val_size) + |
|
|
|
|
|
val_size; |
|
|
|
|
|
char* buf = arena_.Allocate(encoded_len); |
|
|
|
|
|
char* p = EncodeVarint32(buf, internal_key_size); |
|
|
|
|
|
std::memcpy(p, key.data(), key_size); |
|
|
|
|
|
p += key_size; |
|
|
|
|
|
EncodeFixed64(p, (s << 8) | type); |
|
|
|
|
|
p += 8; |
|
|
|
|
|
p = EncodeVarint32(p, val_size); |
|
|
|
|
|
std::memcpy(p, value.data(), val_size); |
|
|
|
|
|
assert(p + val_size == buf + encoded_len); |
|
|
|
|
|
table_.Insert(buf); |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
这是原本leveldb在memtable和sstable中存储单个键值对时使用的数据结构: |
|
|
|
|
|
|
|
|
|
|
|
![old_record](ttl.assets/old_record.png) |
|
|
|
|
|
|
|
|
|
|
|
其中,key和value对应着用户输入的键值对,而sequenceNumber是该键值对在leveldb内部的序列号(越大越新),type则标识该记录的类型。 |
|
|
|
|
|
|
|
|
|
|
|
#### 第一版设计:增加在中 |
|
|
|
|
|
|
|
|
|
|
|
我们的第一版TTL存储设计如下: |
|
|
|
|
|
|
|
|
|
|
|
![old_record-17307966031002](ttl.assets/old_record-17307966031002.png) |
|
|
|
|
|
|
|
|
|
|
|
在我们最初的思考中,TTL与SequenceNumber、Type一样,属于和主要的存储内容(key,value)分割开的存储数据,因此我们选择将TTL加在SequenceNumber和Type后面(不加在中间和前面,以免破坏原先InternalKey的结构)。 |
|
|
|
|
|
|
|
|
|
|
|
但在实现过程中,我们发现在中间增加TTL的形式会导致许多代码变动,因为对数据编码形式的修改会导致InternalKey和LookupKey等基础的编码类及函数需要进行改动,它们在一些与TTL无关的地方似乎也被复用(比如versionEdit和大量测试函数),因此我们认为对其进行修改会非常复杂,可能导致leveldb其他模块受到牵连,不符合我们对TTL轻量级改动的构想。因此我们进行了讨论,并产生了如下的新版设计 |
|
|
|
|
|
|
|
|
|
|
|
#### 第二版设计:放入Value |
|
|
|
|
|
|
|
|
|
|
|
我们的第二版TTL存储设计如下: |
|
|
|
|
|
|
|
|
|
|
|
![old_record-17308259248204](ttl.assets/old_record-17308259248204.png) |
|
|
|
|
|
|
|
|
|
|
|
由于 leveldb 本身对于 key、value、sequencenumber 的解析已经非常完善并且有些耦合,因此我们想要在不破坏 leveldb 原先的解析方式下完成 TTL 功能,我们想到的最好最轻量级的方式,就是将 TTL 数据作为 Value的一部分(在原 Value 后追加 8byte 作为 TTL )进行存储,仅当要解析 Value 时(即读取数据时或合并时),才会在同时对 TTL 进行解析。由此一来,我们无需改变原本的存储结构。事实证明,我们在很短时间内就完成了这一版 TTL的全部所需功能。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
### 实现流程 |
|
|
|
|
|
|
|
|
|
|
|
#### 第一版设计 |
|
|
|
|
|
|
|
|
|
|
|
首先,我们需要修改在 writebatch 中数据的存取格式(主要是对 Value 的读取需要向后偏移8位)。我们在 WriteBatch 中新增了 Put_for_ttl 函数,将 ttl 放置在 key,value 之前。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
void WriteBatch::Put_for_ttl(const Slice& key, const Slice& value,uint64_t ttl){ |
|
|
|
|
|
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); |
|
|
|
|
|
rep_.push_back(static_cast<char>(kTypeValue)); |
|
|
|
|
|
PutVarint64(&rep_,ttl); |
|
|
|
|
|
PutLengthPrefixedSlice(&rep_, key); |
|
|
|
|
|
PutLengthPrefixedSlice(&rep_, value); |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
同时,对 WriteBatch 的迭代器也要进行修改。 |
|
|
|
|
|
|
|
|
|
|
|
![image-20241106141245806](ttl.assets/image-20241106141245806.png) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
使用 Add 将键值对加入 memtable 时,也要在 key 和 value 之间插入参数 ttl。具体位置在 key 末尾的 tag 之后,使用 EncodeFixed64 放入大小为 8 的 ttl 。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, |
|
|
|
|
|
const Slice& value,uint64_t ttl) { |
|
|
|
|
|
// Format of an entry is concatenation of: |
|
|
|
|
|
// key_size : varint32 of internal_key.size() |
|
|
|
|
|
// key bytes : char[internal_key.size()] |
|
|
|
|
|
// tag : uint64((sequence << 8) | type) |
|
|
|
|
|
// ttl : uint64(now) |
|
|
|
|
|
// value_size : varint32 of value.size() |
|
|
|
|
|
// value bytes : char[value.size()] |
|
|
|
|
|
size_t key_size = key.size(); |
|
|
|
|
|
|
|
|
|
|
|
size_t val_size = value.size(); |
|
|
|
|
|
size_t internal_key_size = key_size + 8; |
|
|
|
|
|
const size_t encoded_len = VarintLength(internal_key_size) + |
|
|
|
|
|
internal_key_size + 8 + VarintLength(val_size) + |
|
|
|
|
|
val_size; |
|
|
|
|
|
char* buf = arena_.Allocate(encoded_len); |
|
|
|
|
|
//internal_key_size(sizeof(key)+sizeof(sequence+type)+sizeof(ttl)) |
|
|
|
|
|
//--key--(sequence+type)--ttl--sizeof(val)--val |
|
|
|
|
|
char* p = EncodeVarint32(buf, internal_key_size); |
|
|
|
|
|
std::memcpy(p, key.data(), key_size); |
|
|
|
|
|
p += key_size; |
|
|
|
|
|
EncodeFixed64(p, (s << 8) | type); |
|
|
|
|
|
p += 8; |
|
|
|
|
|
EncodeFixed64(p,ttl); |
|
|
|
|
|
p += 8; |
|
|
|
|
|
p = EncodeVarint32(p, val_size); |
|
|
|
|
|
std::memcpy(p, value.data(), val_size); |
|
|
|
|
|
assert(p + val_size == buf + encoded_len); |
|
|
|
|
|
table_.Insert(buf); |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
从 memtable 获取插入的数据时,需要读出 key, value 之间的 ttl 进行检查( line15 ) |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { |
|
|
|
|
|
Slice memkey = key.memtable_key(); |
|
|
|
|
|
Table::Iterator iter(&table_); |
|
|
|
|
|
iter.Seek(memkey.data()); |
|
|
|
|
|
while(iter.Valid()) { |
|
|
|
|
|
const char* entry = iter.key(); |
|
|
|
|
|
uint32_t key_length;//internal_key_size |
|
|
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
|
|
|
|
|
if (comparator_.comparator.user_comparator()->Compare( |
|
|
|
|
|
Slice(key_ptr, key_length - 8), key.user_key()) == 0) { |
|
|
|
|
|
// Correct user key |
|
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
|
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) { |
|
|
|
|
|
case kTypeValue: { |
|
|
|
|
|
const uint64_t ttl = DecodeFixed64(key_ptr+key_length); |
|
|
|
|
|
if(ttl<key.ttl()){ // data的ttl需要大于等于查找开始时的时间,否则查找直到下一个ttl有效的键值对 |
|
|
|
|
|
iter.Next();// drop dead data |
|
|
|
|
|
continue; |
|
|
|
|
|
} |
|
|
|
|
|
Slice v = GetLengthPrefixedSlice(key_ptr + key_length+8);// 指针再偏移ttl的长度 |
|
|
|
|
|
value->assign(v.data(), v.size()); |
|
|
|
|
|
return true; |
|
|
|
|
|
} |
|
|
|
|
|
case kTypeDeletion: |
|
|
|
|
|
*s = Status::NotFound(Slice()); |
|
|
|
|
|
return true; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
else{ |
|
|
|
|
|
break;// 当 key 不一致的时候返回 false |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在开始查找时,将当前的时间写入 LookupKey,如果查找到的数据 ttl 超过当前时间,那么就会抛弃该条数据。 |
|
|
|
|
|
|
|
|
|
|
|
![image-20241106152151687](ttl.assets/image-20241106152151687.png) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
通过 blockbuilder 向 sstable 插入数据时(新建 level0/1/2)时使用 add 函数,需要将 ttl 通过PutVarint64 编码成 `Varint64` 形式,放入 key 与 value 之间 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
void BlockBuilder::Add(const Slice& key, uint64_t ttl, const Slice& value) { |
|
|
|
|
|
··· |
|
|
|
|
|
··· |
|
|
|
|
|
|
|
|
|
|
|
// Add "<shared><non_shared><value_size>" to buffer_ |
|
|
|
|
|
PutVarint32(&buffer_, shared); |
|
|
|
|
|
PutVarint32(&buffer_, non_shared); |
|
|
|
|
|
PutVarint32(&buffer_, value.size()); |
|
|
|
|
|
|
|
|
|
|
|
// Add string delta to buffer_ followed by value |
|
|
|
|
|
buffer_.append(key.data() + shared, non_shared); |
|
|
|
|
|
PutVarint64(&buffer_, ttl);//加入ttl |
|
|
|
|
|
buffer_.append(value.data(), value.size()); |
|
|
|
|
|
|
|
|
|
|
|
// Update state |
|
|
|
|
|
last_key_.resize(shared); |
|
|
|
|
|
last_key_.append(key.data() + shared, non_shared); |
|
|
|
|
|
assert(Slice(last_key_) == key); |
|
|
|
|
|
counter_++; |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
通过 TableBuilder 将合并后的数据插入新的 sstable,使用 add 函数 |
|
|
|
|
|
|
|
|
|
|
|
![image-20241106154658482](ttl.assets/image-20241106154658482.png) |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
void TableBuilder::Add(const Slice& key, uint64_t ttl, const Slice& value) { |
|
|
|
|
|
time_t now = time(nullptr); |
|
|
|
|
|
if(ttl < static_cast<uint64_t>(now))return; // 再次检查,是否超时(理论上在上层DoCompactionWork时已经检查过一次,都是为了保障正确性,冗余的测试并不会造成很大的性能瓶颈) |
|
|
|
|
|
|
|
|
|
|
|
··· |
|
|
|
|
|
··· |
|
|
|
|
|
|
|
|
|
|
|
if (r->filter_block != nullptr) { |
|
|
|
|
|
r->filter_block->AddKey(key); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
r->last_key.assign(key.data(), key.size()); |
|
|
|
|
|
r->num_entries++; |
|
|
|
|
|
r->data_block.Add(key, ttl, value); //加入block时仍旧需要使用 BlockBuilder::Add |
|
|
|
|
|
|
|
|
|
|
|
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); |
|
|
|
|
|
if (estimated_block_size >= r->options.block_size) { |
|
|
|
|
|
Flush(); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
在 DoCompactionWork 中,合并是会检查数据 ttl 是否到期,如果到期则会丢弃。 |
|
|
|
|
|
|
|
|
|
|
|
![image-20241106173117857](ttl.assets/image-20241106173117857.png) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 然而,在处理Block中数据的存取时,我们遇到了问题: |
|
|
|
|
|
|
|
|
|
|
|
![image-20241106203303238](ttl.assets/image-20241106203303238.png) |
|
|
|
|
|
|
|
|
|
|
|
Leveldb使用Block::Iter对于IndexBlock和dataBlock进行统一的数据读取,在处理dataBlock时,我们期望Block::Iter对数据的处理能够适配ttl,即在key与value中间存有一份TTL。然而在debug过程中我们发现,在改动Block::Iter获取下一个键值对和获取当前键值对的Value的逻辑后(因为要多向后移8字节),使得IndexBlock获取数据出错。在经过一段时间的debug后,我们认为要完成解决这一bug,需要连带改动IndexBlock存取数据的逻辑,而这已经偏离了TTL本身的范围,使得项目的紧耦合度增强,既不轻量也不优雅,因此我们放弃解决这一bug,转而构思出了第二版设计。 |
|
|
|
|
|
|
|
|
|
|
|
#### 第二版设计 |
|
|
|
|
|
|
|
|
|
|
|
##### 原先的Put |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
// Convenience methods |
|
|
|
|
|
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { |
|
|
|
|
|
return DB::Put(o, key, val); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// 加入ttl功能后 |
|
|
|
|
|
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val,uint64_t ttl) { |
|
|
|
|
|
return DB::Put(o, key, val,ttl); |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 老版本的 `DB::Put` 方法仅接受键和值,并将值直接存入数据库。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { |
|
|
|
|
|
WriteBatch batch; |
|
|
|
|
|
batch.Put(key, value); |
|
|
|
|
|
return Write(opt, &batch); |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 新版本增加了一个可选参数 `ttl`(过期时间),允许用户指定存储的值的生存时间。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { |
|
|
|
|
|
WriteBatch batch; |
|
|
|
|
|
int len = value.size() + sizeof(uint64_t); |
|
|
|
|
|
char* new_data = new char[len]; |
|
|
|
|
|
time_t now = time(nullptr); // 获取当前时间,单位为秒 |
|
|
|
|
|
uint64_t ttl = INT64_MAX; // 设置默认的 TTL 为最大值(即永不过期) |
|
|
|
|
|
|
|
|
|
|
|
memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中 |
|
|
|
|
|
memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾 |
|
|
|
|
|
|
|
|
|
|
|
Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象 |
|
|
|
|
|
batch.Put(key, newValue); // 将键值对放入 WriteBatch 中 |
|
|
|
|
|
return Write(opt, &batch); // 写入数据库 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl) { |
|
|
|
|
|
WriteBatch batch; |
|
|
|
|
|
int len = value.size() + sizeof(uint64_t); |
|
|
|
|
|
char* new_data = new char[len]; |
|
|
|
|
|
time_t now = time(nullptr); // 获取当前时间,单位为秒 |
|
|
|
|
|
ttl += static_cast<uint64_t>(now); // 将当前时间加上 TTL,计算过期时间,以过期时间戳作为真正存储的TTL |
|
|
|
|
|
|
|
|
|
|
|
memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中 |
|
|
|
|
|
memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾 |
|
|
|
|
|
|
|
|
|
|
|
Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象 |
|
|
|
|
|
batch.Put(key, newValue); // 将键值对放入 WriteBatch 中 |
|
|
|
|
|
delete []new_data; // 释放分配的内存,防止内存泄漏 |
|
|
|
|
|
return Write(opt, &batch); // 写入数据库 |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 合并sstable内的数据 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
|
|
··· |
|
|
|
|
|
··· |
|
|
|
|
|
Iterator* input = versions_->MakeInputIterator(compact->compaction); |
|
|
|
|
|
|
|
|
|
|
|
// Release mutex while we're actually doing the compaction work |
|
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
|
|
|
|
|
|
input->SeekToFirst(); |
|
|
|
|
|
Status status; |
|
|
|
|
|
ParsedInternalKey ikey; |
|
|
|
|
|
std::string current_user_key; |
|
|
|
|
|
bool has_current_user_key = false; |
|
|
|
|
|
SequenceNumber last_sequence_for_key = kMaxSequenceNumber; |
|
|
|
|
|
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { |
|
|
|
|
|
auto x=input->value(); // 获取键值对 value |
|
|
|
|
|
uint64_t ttl=*(uint64_t*)(x.data()+x.size()-sizeof(uint64_t));// 将 TTL 从 new_data 的末尾取出 |
|
|
|
|
|
time_t now = time(nullptr); // 获得当前时间 |
|
|
|
|
|
// 如果 TTL 超过当前时间,说明数据已经过期 |
|
|
|
|
|
if(ttl < static_cast<uint64_t>(now)){ |
|
|
|
|
|
Log(options_.info_log, "delete record for ttl"); |
|
|
|
|
|
input->Next(); // 将 input 指向下一个键值对 |
|
|
|
|
|
continue; |
|
|
|
|
|
} |
|
|
|
|
|
// Prioritize immutable compaction work |
|
|
|
|
|
if (has_imm_.load(std::memory_order_relaxed)) { |
|
|
|
|
|
const uint64_t imm_start = env_->NowMicros(); |
|
|
|
|
|
mutex_.Lock(); |
|
|
|
|
|
if (imm_ != nullptr) { |
|
|
|
|
|
CompactMemTable(); |
|
|
|
|
|
// Wake up MakeRoomForWrite() if necessary. |
|
|
|
|
|
background_work_finished_signal_.SignalAll(); |
|
|
|
|
|
} |
|
|
|
|
|
mutex_.Unlock(); |
|
|
|
|
|
imm_micros += (env_->NowMicros() - imm_start); |
|
|
|
|
|
} |
|
|
|
|
|
··· |
|
|
|
|
|
··· |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 用于处理从sstable中读取的键值对 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { |
|
|
|
|
|
Saver* s = reinterpret_cast<Saver*>(arg); |
|
|
|
|
|
ParsedInternalKey parsed_key; |
|
|
|
|
|
if (!ParseInternalKey(ikey, &parsed_key)) { |
|
|
|
|
|
s->state = kCorrupt; |
|
|
|
|
|
} else { |
|
|
|
|
|
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { |
|
|
|
|
|
// kTypeValue 对应被插入的数据 |
|
|
|
|
|
if(parsed_key.type == kTypeValue){ |
|
|
|
|
|
time_t now = time(nullptr); |
|
|
|
|
|
uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出 |
|
|
|
|
|
if(ttl < static_cast<uint64_t>(now))return; // 如果 TTL 超过当前时间,说明数据已经过期,直接返回 |
|
|
|
|
|
} |
|
|
|
|
|
s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; |
|
|
|
|
|
if (s->state == kFound) { |
|
|
|
|
|
s->value->assign(v.data(), v.size()-sizeof(uint64_t)); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 读取memtable内的数据 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { |
|
|
|
|
|
Slice memkey = key.memtable_key(); |
|
|
|
|
|
Table::Iterator iter(&table_); |
|
|
|
|
|
iter.Seek(memkey.data()); |
|
|
|
|
|
while (iter.Valid()) { |
|
|
|
|
|
const char* entry = iter.key(); |
|
|
|
|
|
uint32_t key_length; |
|
|
|
|
|
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
|
|
|
|
|
if (comparator_.comparator.user_comparator()->Compare( |
|
|
|
|
|
Slice(key_ptr, key_length - 8), key.user_key()) == 0) { |
|
|
|
|
|
// Correct user key |
|
|
|
|
|
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
|
|
|
|
|
switch (static_cast<ValueType>(tag & 0xff)) { |
|
|
|
|
|
// kTypeValue 对应被插入的数据 |
|
|
|
|
|
case kTypeValue: { |
|
|
|
|
|
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
|
|
|
|
|
uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出 |
|
|
|
|
|
time_t now = time(nullptr); |
|
|
|
|
|
// 如果 TTL 超过当前时间,说明数据已经过期 |
|
|
|
|
|
if(ttl < static_cast<uint64_t>(now)){ |
|
|
|
|
|
iter.Next(); // 将 iter 指向下一个键值对 |
|
|
|
|
|
continue; |
|
|
|
|
|
} |
|
|
|
|
|
value->assign(v.data(), v.size()-sizeof(uint64_t)); |
|
|
|
|
|
|
|
|
|
|
|
return true; |
|
|
|
|
|
} |
|
|
|
|
|
case kTypeDeletion: |
|
|
|
|
|
*s = Status::NotFound(Slice()); |
|
|
|
|
|
return true; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
else break; |
|
|
|
|
|
} |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
##### 测试 |
|
|
|
|
|
|
|
|
|
|
|
1.将原有的两个测试的严格sleep ttl秒,改成sleep(ttl+1)秒,因为我们的设计是以秒为存储单位,可能存在一秒以内的读取误差,所以进行修改。(我们认为ttl无需毫秒/微妙级的数据准确性,一秒之内的误差可以接受,因此采取此种设计) |
|
|
|
|
|
|
|
|
|
|
|
2.在原有设计上增加了第三个测试,其目的在于:①测试新的数据过期后,是否能够重新查找到旧但未过期的数据。②测试无TTL的接口是否可以正确运行,并且数据不过期。 |
|
|
|
|
|
|
|
|
|
|
|
```c++ |
|
|
|
|
|
#include "gtest/gtest.h" |
|
|
|
|
|
#include "leveldb/env.h" |
|
|
|
|
|
#include "leveldb/db.h" |
|
|
|
|
|
|
|
|
|
|
|
using namespace leveldb; |
|
|
|
|
|
|
|
|
|
|
|
constexpr int value_size = 2048; |
|
|
|
|
|
constexpr int data_size = 128 << 20; |
|
|
|
|
|
|
|
|
|
|
|
Status OpenDB(std::string dbName, DB **db) { |
|
|
|
|
|
Options options; |
|
|
|
|
|
options.create_if_missing = true; |
|
|
|
|
|
return DB::Open(options, dbName, db); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void InsertData(DB *db, uint64_t ttl/* second */) { |
|
|
|
|
|
WriteOptions writeOptions; |
|
|
|
|
|
int key_num = data_size / value_size; |
|
|
|
|
|
srand(0); |
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < key_num; i++) { |
|
|
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
|
|
std::string key = std::to_string(key_); |
|
|
|
|
|
std::string value(value_size, 'a'); |
|
|
|
|
|
db->Put(writeOptions, key, value, ttl); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
void GetData(DB *db, int size = (1 << 30)) { |
|
|
|
|
|
ReadOptions readOptions; |
|
|
|
|
|
int key_num = data_size / value_size; |
|
|
|
|
|
|
|
|
|
|
|
// 点查 |
|
|
|
|
|
srand(0); |
|
|
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
|
|
std::string key = std::to_string(key_); |
|
|
|
|
|
std::string value; |
|
|
|
|
|
db->Get(readOptions, key, &value); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
TEST(TestTTL, ReadTTL) { |
|
|
|
|
|
DB *db; |
|
|
|
|
|
if(OpenDB("testdb", &db).ok() == false) { |
|
|
|
|
|
std::cerr << "open db failed" << std::endl; |
|
|
|
|
|
abort(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
uint64_t ttl = 20; |
|
|
|
|
|
|
|
|
|
|
|
InsertData(db, ttl); |
|
|
|
|
|
|
|
|
|
|
|
ReadOptions readOptions; |
|
|
|
|
|
Status status; |
|
|
|
|
|
int key_num = data_size / value_size; |
|
|
|
|
|
srand(0); |
|
|
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
|
|
std::string key = std::to_string(key_); |
|
|
|
|
|
std::string value; |
|
|
|
|
|
status = db->Get(readOptions, key, &value); |
|
|
|
|
|
ASSERT_TRUE(status.ok()); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Env::Default()->SleepForMicroseconds((ttl+1) * 1000000); |
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
|
|
std::string key = std::to_string(key_); |
|
|
|
|
|
std::string value; |
|
|
|
|
|
status = db->Get(readOptions, key, &value); |
|
|
|
|
|
ASSERT_FALSE(status.ok()); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
delete db; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
TEST(TestTTL, CompactionTTL) { |
|
|
|
|
|
DB *db; |
|
|
|
|
|
|
|
|
|
|
|
if(OpenDB("testdb", &db).ok() == false) { |
|
|
|
|
|
std::cerr << "open db failed" << std::endl; |
|
|
|
|
|
abort(); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
uint64_t ttl = 20; |
|
|
|
|
|
InsertData(db, ttl); |
|
|
|
|
|
|
|
|
|
|
|
leveldb::Range ranges[1]; |
|
|
|
|
|
ranges[0] = leveldb::Range("-", "A"); |
|
|
|
|
|
uint64_t sizes[1]; |
|
|
|
|
|
db->GetApproximateSizes(ranges, 1, sizes); |
|
|
|
|
|
ASSERT_GT(sizes[0], 0); |
|
|
|
|
|
|
|
|
|
|
|
Env::Default()->SleepForMicroseconds((ttl+1) * 1000000); |
|
|
|
|
|
|
|
|
|
|
|
db->CompactRange(nullptr, nullptr); |
|
|
|
|
|
|
|
|
|
|
|
leveldb::Range ranges_1[1]; |
|
|
|
|
|
ranges[0] = leveldb::Range("-", "A"); |
|
|
|
|
|
uint64_t sizes_1[1]; |
|
|
|
|
|
db->GetApproximateSizes(ranges_1, 1, sizes_1); |
|
|
|
|
|
ASSERT_EQ(sizes_1[0], 0); |
|
|
|
|
|
|
|
|
|
|
|
delete db; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TEST(TestTTL, OurTTL) { |
|
|
|
|
|
DB *db; |
|
|
|
|
|
WriteOptions writeOptions; |
|
|
|
|
|
ReadOptions readOptions; |
|
|
|
|
|
if(OpenDB("testdb_for_XOY", &db).ok() == false) { |
|
|
|
|
|
std::cerr << "open db failed" << std::endl; |
|
|
|
|
|
abort(); |
|
|
|
|
|
} |
|
|
|
|
|
for (int i = 0; i < 10000; i++) { |
|
|
|
|
|
std::string key = std::to_string(i); |
|
|
|
|
|
std::string value = std::to_string(i); |
|
|
|
|
|
db->Put(writeOptions, key, value);//插入永不过期的key value(value=key) |
|
|
|
|
|
} |
|
|
|
|
|
for (int i = 0; i < 10000; i++) { |
|
|
|
|
|
std::string key = std::to_string(i); |
|
|
|
|
|
std::string value = std::to_string(i*2); |
|
|
|
|
|
db->Put(writeOptions, key, value, 30);//插入30秒后过期的key value(value=key*2) |
|
|
|
|
|
} |
|
|
|
|
|
for (int i = 0; i < 10000; i++) { |
|
|
|
|
|
std::string key = std::to_string(i); |
|
|
|
|
|
std::string value = std::to_string(i*3); |
|
|
|
|
|
db->Put(writeOptions, key, value, 15);//插入15秒后过期的key value(value=key*3) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 10000; i++) { |
|
|
|
|
|
std::string key = std::to_string(i); |
|
|
|
|
|
std::string value; |
|
|
|
|
|
Status status = db->Get(readOptions, key, &value); |
|
|
|
|
|
ASSERT_TRUE(status.ok()); |
|
|
|
|
|
ASSERT_TRUE(value==std::to_string(i*3));//立刻读取,期望得到value=key*3 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Env::Default()->SleepForMicroseconds((15+1) * 1000000); |
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 10000; i++) { |
|
|
|
|
|
std::string key = std::to_string(i); |
|
|
|
|
|
std::string value; |
|
|
|
|
|
Status status = db->Get(readOptions, key, &value); |
|
|
|
|
|
ASSERT_TRUE(status.ok()); |
|
|
|
|
|
ASSERT_TRUE(value==std::to_string(i*2));//16秒后读取,期望得到value=key*2 |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Env::Default()->SleepForMicroseconds((15+1) * 1000000); |
|
|
|
|
|
|
|
|
|
|
|
for (int i = 0; i < 10000; i++) { |
|
|
|
|
|
std::string key = std::to_string(i); |
|
|
|
|
|
std::string value; |
|
|
|
|
|
Status status = db->Get(readOptions, key, &value); |
|
|
|
|
|
ASSERT_TRUE(status.ok()); |
|
|
|
|
|
ASSERT_TRUE(value==std::to_string(i));//32秒后读取,期望得到value=key |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
delete db; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int main(int argc, char** argv) { |
|
|
|
|
|
// All tests currently run with the same read-only file limits. |
|
|
|
|
|
testing::InitGoogleTest(&argc, argv); |
|
|
|
|
|
return RUN_ALL_TESTS(); |
|
|
|
|
|
} |
|
|
|
|
|
``` |