|
|
@ -0,0 +1,461 @@ |
|
|
|
# 实验报告:在LevelDB中实现TTL功能 |
|
|
|
|
|
|
|
10225501460 林子骥 |
|
|
|
|
|
|
|
10211900416 郭夏辉 |
|
|
|
|
|
|
|
## 1.实验目的 |
|
|
|
|
|
|
|
- 深入了解LevelDB的内部原理和数据结构。 |
|
|
|
- 掌握TTL(Time To Live,生存时间)功能的设计与实现方法。 |
|
|
|
- 学习如何在开源项目中添加新功能,提升代码阅读和修改能力。 |
|
|
|
|
|
|
|
## 2.实验要求 |
|
|
|
|
|
|
|
- 在LevelDB中实现键值对的TTL功能,使得过期的数据在**读取时自动失效**,并在适当的时候**被合并清理**。 |
|
|
|
- 修改LevelDB的源码,实现对TTL的支持,包括数据的写入、读取和过期数据的清理。 |
|
|
|
- 编写测试用例,验证TTL功能的正确性和稳定性。 |
|
|
|
|
|
|
|
## 3.实验背景 |
|
|
|
|
|
|
|
**TTL(Time To Live)**,即生存时间,是指数据在存储系统中的有效期。设置TTL可以使得过期的数据自动失效,减少存储空间占用,提高系统性能。**为什么需要TTL功能:** |
|
|
|
|
|
|
|
- **数据自动过期**:无需手动删除过期数据,简化数据管理。 |
|
|
|
- **节省存储空间**:定期清理无效数据,优化资源利用。 |
|
|
|
- **提高性能**:减少无效数据的干扰,提升读写效率。 |
|
|
|
|
|
|
|
**在LevelDB中添加TTL功能的方案:** |
|
|
|
|
|
|
|
1. **数据编码方式修改**:在键或值中增加过期时间的信息。 |
|
|
|
2. **读取时判断过期**:在Get操作时,检查数据是否过期,过期则返回NotFound。 |
|
|
|
3. **Compaction清理**:在数据压缩过程中,删除过期的数据。 |
|
|
|
|
|
|
|
## 4.实验步骤 |
|
|
|
|
|
|
|
### 4.1 数据格式调整及支持TTL的Put方法 |
|
|
|
|
|
|
|
原本的数据样貌大致是[key, value],为了方便进一步的操作,在样貌不变的前提下,我们直接将过期时间放在了实际数据后面,一起存储到 value 中。 |
|
|
|
|
|
|
|
```cpp |
|
|
|
Status DBImpl::Put(const WriteOptions& options, const Slice& key, |
|
|
|
const Slice& value, uint64_t ttl) { |
|
|
|
std::string val_with_ts; |
|
|
|
val_with_ts.reserve(value.size() + kTSLength); |
|
|
|
char ts_string[kTSLength]; |
|
|
|
TIMESTAMP expiration_time = this->env_->NowMicros() + ttl * 1000000; |
|
|
|
EncodeFixed64(ts_string,expiration_time); |
|
|
|
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP )); |
|
|
|
// 追加原始 value 到 val_with_ts |
|
|
|
val_with_ts.append(value.data(), value.size()); |
|
|
|
// 将 expiration_time 追加到 val_with_ts |
|
|
|
val_with_ts.append(ts_string,kTSLength); |
|
|
|
return DB::Put(options, key, Slice(val_with_ts)); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
最开始,我们拓展了`DBImpl::Put`使得它支持TTL。从前到后的逻辑很简单: |
|
|
|
|
|
|
|
1. 创建一个空的 `std::string val_with_ts`,存储要写入的带过期时间的value,并且为其预留足够的空间(value和时间戳的长度之和) |
|
|
|
2. 利用`env_`提供的`NowMicros()`方法,以微秒为单位计算过期时间(`当前时间+TTL*1000000`)这里我们尽量将所有底层操作交给`env_`完成,而DBImpl有该对象可以调用相关查找时间的函数 |
|
|
|
3. 利用`EncodeFixed64`编码函数将`expiration_time`编码为一个固定长度的字符串,并存储在`ts_string`中。利用LevelDB自带的而不是自己手写的编码函数可以提升一些运行效率。 |
|
|
|
4. 追加原始`value`到`val_with_ts`以及将`expiration_time` 追加到 `val_with_ts`只需要使用字符串的`append`方法从前往后拼接即可。 |
|
|
|
5. 最后将刚刚处理得到的`val_with_ts`打包成一个`Slice`,然后再调用原始的`DB::Put`方法。 |
|
|
|
|
|
|
|
为了适配之后的实验,提升兼容性,我们将上面这套逻辑重复了一遍,也完成了新的`Status DB::Put(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) `,使得它支持TTL。 |
|
|
|
|
|
|
|
```cpp |
|
|
|
Status DB::Put(const WriteOptions& options, const Slice& key, |
|
|
|
const Slice& value, uint64_t ttl) { |
|
|
|
// 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value |
|
|
|
std::string val_with_ts; |
|
|
|
|
|
|
|
val_with_ts.reserve(value.size() + sizeof(uint64_t)); |
|
|
|
|
|
|
|
uint64_t expiration_time = std::chrono::duration_cast<std::chrono::milliseconds>( |
|
|
|
std::chrono::system_clock::now().time_since_epoch()) |
|
|
|
.count() + ttl * 1000; |
|
|
|
|
|
|
|
// 追加原始 value 到 val_with_ts |
|
|
|
val_with_ts.append(value.data(), value.size()); |
|
|
|
|
|
|
|
// 将 expiration_time 追加到 val_with_ts |
|
|
|
val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time)); |
|
|
|
|
|
|
|
WriteBatch batch; |
|
|
|
batch.Put(key, Slice(val_with_ts)); |
|
|
|
return Write(options, &batch); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
只不过在这里计算当前时间不能再使用`env_`的相关方法了,使用的是`std::chrono::system_clock::now()`获取当前时间,其他的逻辑是一样的。 |
|
|
|
|
|
|
|
实现很简单,就是将TTL值转为到期的时间戳,加入value尾部,这样可以尽可能降低整个项目的耦合程度,我们无需对ExtractUserkey(提取具体key值相关的函数),FindShortSeperator(简化key长度,这里还可能导致时间戳被修改)等函数进行复杂的重写。此外,还可以节约空间。 |
|
|
|
|
|
|
|
这里基于原先的项目举几个例子来说明为什么不讲时间戳作为key的一部分写入,原先的index_block就是为了尽量缩短key的长度,而如果将时间戳加入key的尾部,其时间戳本身的长度就会大于key的长度。并且由于其内部的时间戳与key不同,是无序存储的,因此很难使用重启点来减少存储时间戳的空间开销。 |
|
|
|
|
|
|
|
![image-20241107080532982](assets/image-20241107080532982.png) |
|
|
|
|
|
|
|
### 4.2支持TTL的Get方法 |
|
|
|
|
|
|
|
Get操作的核心便是检查数据是否过期,只要实现这里就能让读取顺利进行了。 |
|
|
|
|
|
|
|
```cpp |
|
|
|
uint64_t DBImpl::GetTS(std::string* val) { |
|
|
|
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength); |
|
|
|
val->resize(val->size() - kTSLength);//保证返回值没有ttl |
|
|
|
return expiration_time; |
|
|
|
} |
|
|
|
|
|
|
|
Status DBImpl::CheckIsExpire(std::string* value) { |
|
|
|
//debug 用 |
|
|
|
auto a = env_->NowMicros(); |
|
|
|
auto b = GetTS(value); |
|
|
|
if(a > b){ |
|
|
|
return Status::Expire("Expire",Slice()); |
|
|
|
} |
|
|
|
return Status(); |
|
|
|
} |
|
|
|
|
|
|
|
Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
|
std::string* value) { |
|
|
|
// other code... |
|
|
|
// Unlock while reading from files and memtables |
|
|
|
// Unlock while reading from files and memtables |
|
|
|
{ |
|
|
|
mutex_.Unlock(); |
|
|
|
// First look in the memtable, then in the immutable memtable (if any). |
|
|
|
LookupKey lkey(key, snapshot); |
|
|
|
if (mem->Get(lkey, value, &s)) { |
|
|
|
// Done |
|
|
|
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { |
|
|
|
// Done |
|
|
|
} else { |
|
|
|
stats.now_ts = this->env_->NowMicros(); |
|
|
|
s = current->Get(options, lkey, value, &stats); |
|
|
|
have_stat_update = true; |
|
|
|
} |
|
|
|
mutex_.Lock(); |
|
|
|
} |
|
|
|
if(s.ok()){ |
|
|
|
s = CheckIsExpire(value); |
|
|
|
} |
|
|
|
if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) { |
|
|
|
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。 |
|
|
|
} |
|
|
|
//...... |
|
|
|
} |
|
|
|
|
|
|
|
bool Version::UpdateStats(const GetStats& stats,bool is_expire) { |
|
|
|
FileMetaData* f = stats.seek_file; |
|
|
|
if (f != nullptr) { |
|
|
|
f->allowed_seeks--; |
|
|
|
if(is_expire)f->allowed_seeks--; |
|
|
|
if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) { |
|
|
|
file_to_compact_ = f; |
|
|
|
file_to_compact_level_ = stats.seek_file_level; |
|
|
|
return true; |
|
|
|
} |
|
|
|
} |
|
|
|
return false; |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
与编码时候的`EncodeFixed64`相对应,我们使用`DecodeFixed64`将过期时间戳从value里面提取出来,并实现了`DBImpl::GetTS`方法。在此基础上我们检查value是否应该被丢弃了,实现了`DBImpl::CheckIsExpire`方法,如果当前时间已经超过了时间戳,那么直接返回过期状态`Status::Expire`。(其实这里也纠结了一下要不要按照实验文档中要求的那样返回状态`Status::NotFound`,经过测试两者都是可以的,然后严谨一些,实际情况是key已经存在,但由于某种原因不可再访问,所以最终还是使用了`Status::Expire`) |
|
|
|
|
|
|
|
在此我们进行一些时间分析,对于涉及内存(memtable内,缓存命中)的查找速度很快;而这里可能会想到,有许多过期数据都在缓存池内要怎么办,实际上,只要是在缓存池内的数据,即使过期了,它也是有信息价值的(提供了过期这一信息)。因此,我们实际需要解决的是,如何降低IO层面的对于过期数据的查找。这里我们在基于leveldb采用的自动合并优化上(即一个文件访问多次后,它将会作为input传给compaction,然后后续过滤过期数据),对于在缓存池中,读取到的所过期的数据所在的文件,其`allowed_seek`次数再次减一(也可以基于需求减去其他值),这样的目的是,期望尽快对于其key值所在的文件触发合并,减少底层过期的数据量。 |
|
|
|
|
|
|
|
当然,这样做对于此次的查找帮助不大,主要是为了以后再次查找时,能加快扫描速度。接下里我们可能会想,如何减少cache内减少IO操作的次数。先回忆整个cache的读取流程:获得布隆过滤器,若key值存在,从硬盘中读取IndexBlock,然后再从硬盘读取DataBlock。其中IndexBlock的设计在之前已经提到,为了节约空间开销,不能添加时间戳。因此我们只能依靠于bloomfilter类似的功能。这里我们以manifest的元数据作为过滤的基础。 |
|
|
|
|
|
|
|
我们为每个文件的元数据增加新字段 |
|
|
|
|
|
|
|
```cpp |
|
|
|
struct FileMetaData { |
|
|
|
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),oldest_ts(UINT64_MAX),newer_ts(0) {} |
|
|
|
|
|
|
|
int refs; |
|
|
|
int allowed_seeks; // Seeks allowed until compaction |
|
|
|
uint64_t number; |
|
|
|
uint64_t file_size; // File size in bytes |
|
|
|
InternalKey smallest; // Smallest internal key served by table |
|
|
|
InternalKey largest; // Largest internal key served by table |
|
|
|
TIMESTAMP oldest_ts; //新字段:该文件内最小的时间戳, |
|
|
|
TIMESTAMP newer_ts; //新字段:该文件内最大的时间戳 |
|
|
|
}; |
|
|
|
``` |
|
|
|
|
|
|
|
接下来就是对相关的MANIFEST文件的元数据写入进行修改以及在写入key,value对时,对时间戳更新。包括`VersionEdit::AddFile`,`VersionEdit::EncodeTo`Status `VersionEdit::DecodeFrom`,`builder.h:Status BuildTable`这里不在报告中举出。然后最后的过滤操作,则在`Status Version::Get`内的`match`函数 |
|
|
|
|
|
|
|
```cpp |
|
|
|
//1.调用Status Version::Get(const ReadOptions& options, const LookupKey& k, |
|
|
|
// std::string* value, GetStats* stats)接口,进入非memtable部分查找 |
|
|
|
//2调用void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg, |
|
|
|
// bool (*func)(void*, int, FileMetaData*))查找涉及到的key以及对应的文件元数据 |
|
|
|
//3.调用match函数,寻址 |
|
|
|
|
|
|
|
static bool Match(void* arg, int level, FileMetaData* f) { |
|
|
|
State* state = reinterpret_cast<State*>(arg); |
|
|
|
if (state->stats->seek_file == nullptr && state->last_file_read != nullptr) { |
|
|
|
// We have had more than one seek for this read. Charge the 1st file. |
|
|
|
state->stats->seek_file = state->last_file_read; |
|
|
|
state->stats->seek_file_level = state->last_file_read_level; |
|
|
|
} |
|
|
|
state->last_file_read = f; |
|
|
|
state->last_file_read_level = level; |
|
|
|
if(state->stats->now_ts > f->newer_ts)return false;//在访问缓存池前,判断是否过期 |
|
|
|
state->s = state->vset->table_cache_->Get(*state->options, f->number, f->file_size, state->ikey,&state->saver, SaveValue); |
|
|
|
// other code... |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
这里有尝试`if(state->stats->now_ts > f->newer_ts)return false;`根据文件内的最新时间戳来进行过滤。但是实际上,对于多次的批量插入以及一个batch内不同的TTL,在合并后,我们无法很好的预测出一个文件的新旧情况,即每一个文件的元数据的new_ts有可能都十分大,最终还是要进入cache内查找。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**经过刚刚的修改,我们初步完成了读取功能,第一个测试点经过一些修改也能顺利通过了。** |
|
|
|
|
|
|
|
在这时曾发现ttl_test启动后,无论如何修改ReadTTL都显示 failed,经过助教提醒发现是随机种子的问题,如果采用`static_cast<unsigned int>(time(0))`根据当前的时间戳生成随机数,那么`InsertData`插入数据的 key 和查找时所使用的 key 不一致导致读取失败。解决的办法就是`srand(0)`固定随机种子。 |
|
|
|
|
|
|
|
ReadTTL之后,CompactionTTL还未开始,直接显示 open DB failed,出现这个原因是ReadTTL之后打开的数据库没有清除,只需要在ReadTTL的末尾加上`delete db;`就解决了问题。 |
|
|
|
|
|
|
|
还有就是`delete db;`之后需要睡眠一秒,否则还是会出现read DB failed。这里的另外一个问题就是ReadTTL测试用例在我的电脑上一直出现open db fail,但是在另一位队友的电脑上就不会,这是因为在我这里程序跑的太慢了,超过它所设定的20s,这里酌情提高之后就能解决了。 |
|
|
|
|
|
|
|
### 4.3支持TTL的Compaction |
|
|
|
|
|
|
|
在数据Compaction过程中,需要删除过期的数据。大合并操作都调用`DoCompactionWork`进行,结合之前的授课我们可以知道它本身就会检查数据是否有效,因此我们只需要在此多加入一些关于TTL判断的逻辑就行。比较巧的是,`DoCompactionWork`自带drop变量来标记数据是否需要被丢弃,我们只需要利用好它就行。 |
|
|
|
|
|
|
|
```cpp |
|
|
|
Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
|
// other code... |
|
|
|
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { |
|
|
|
// other code... |
|
|
|
if (!ParseInternalKey(key, &ikey)) { |
|
|
|
// other code... |
|
|
|
} else { |
|
|
|
if (!has_current_user_key || |
|
|
|
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != |
|
|
|
0) { |
|
|
|
// other code... |
|
|
|
} |
|
|
|
if (last_sequence_for_key <= compact->smallest_snapshot) { |
|
|
|
// other code... |
|
|
|
} else if (ikey.type == kTypeDeletion && |
|
|
|
ikey.sequence <= compact->smallest_snapshot && |
|
|
|
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { |
|
|
|
// other code... |
|
|
|
}else if((ts = DecodeFixed64(input->value().data() + input->value().size() - kTSLength)) < env_->NowMicros()){ |
|
|
|
// TTL |
|
|
|
drop = true; |
|
|
|
} |
|
|
|
|
|
|
|
last_sequence_for_key = ikey.sequence; |
|
|
|
} |
|
|
|
// other code... |
|
|
|
return status; |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
其实判断的逻辑和之前`Get`和`Put`时候是一样的,如果当前时间超出了过期时间,则将 drop 标记为 true,表示丢弃该数据;若当前时间尚未超出过期时间则将数据视为有效的。 |
|
|
|
|
|
|
|
这里有一个小问题是TEST中无法完全删除所有过期数据。具体的例子如下,未触发手动合并前:level0文件数量:31个;level1数量:1;level2数量:1个;然后手动合并后,它会先尝试合并level0数据,然后在合并的过程中,它会将level0和level1的所有数据都当做input,放入一个mergeIterator。因为时间过期了,它会将这些文件全删掉,只剩下level2的一个文件。然后因为此时我们最大的遍历层只能到level1,level2的数据删除依赖于level1在合并时,其key的范围覆盖到level2,然后删除。此外,即使level1还剩下一个文件,也需要其所有。对于这个问题的解决,我们最开始是修改`kL0_CompactionTrigger `等参数,加大level0的文件数量,使得所有数据都放在level0中,这样可以确保所有数据都在compaction中作为input,但这会导致在今后正常运行时(非测试情况),其IO开销较大(因为level0层会覆盖很多key)。 |
|
|
|
|
|
|
|
在这里经过助教提示发现了一个小问题,就是手动合并可能无法保证合并所有数据,导致无法完全丢弃过期数据。解决方法就是修改 CompactRange 的流程使得所有数据得到合并。原始代码并没有去合并最大的含有文件的level(max_level_with_files)的文件,只合并了在此之前的level,我们的修改是这样的: |
|
|
|
|
|
|
|
```cpp |
|
|
|
void DBImpl::CompactRange(const Slice* begin, const Slice* end) { |
|
|
|
// other code... |
|
|
|
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap |
|
|
|
for (int level = 0; level < max_level_with_files; level++) { |
|
|
|
TEST_CompactRange(level, begin, end); |
|
|
|
} |
|
|
|
TEST_CompactRange(max_level_with_files, begin, end); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 5.测试和结果 |
|
|
|
|
|
|
|
### 5.1测试用例 |
|
|
|
|
|
|
|
```cpp |
|
|
|
#include "gtest/gtest.h" |
|
|
|
#include "leveldb/env.h" |
|
|
|
#include "leveldb/db.h" |
|
|
|
|
|
|
|
using namespace leveldb; |
|
|
|
|
|
|
|
constexpr int value_size = 2048; |
|
|
|
constexpr int data_size = 128 << 20; |
|
|
|
|
|
|
|
Status OpenDB(std::string dbName, DB **db) { |
|
|
|
Options options; |
|
|
|
options.create_if_missing = true; |
|
|
|
return DB::Open(options, dbName, db); |
|
|
|
} |
|
|
|
|
|
|
|
void InsertData(DB *db, uint64_t ttl/* second */) { |
|
|
|
WriteOptions writeOptions; |
|
|
|
int key_num = data_size / value_size; |
|
|
|
srand(0); |
|
|
|
|
|
|
|
for (int i = 0; i < key_num; i++) { |
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
std::string key = std::to_string(key_); |
|
|
|
std::string value(value_size, 'a'); |
|
|
|
db->Put(writeOptions, key, value, ttl); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void GetData(DB *db, int size = (1 << 30)) { |
|
|
|
ReadOptions readOptions; |
|
|
|
int key_num = data_size / value_size; |
|
|
|
|
|
|
|
// 点查 |
|
|
|
srand(0); |
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
std::string key = std::to_string(key_); |
|
|
|
std::string value; |
|
|
|
db->Get(readOptions, key, &value); |
|
|
|
} |
|
|
|
} |
|
|
|
// |
|
|
|
TEST(TestTTL, ReadTTL) { |
|
|
|
DB *db; |
|
|
|
if(OpenDB("testdb", &db).ok() == false) { |
|
|
|
std::cerr << "open db failed" << std::endl; |
|
|
|
abort(); |
|
|
|
} |
|
|
|
// 如果出现open db fail,请酌情提高这里 |
|
|
|
uint64_t ttl = 20; |
|
|
|
|
|
|
|
InsertData(db, ttl); |
|
|
|
|
|
|
|
ReadOptions readOptions; |
|
|
|
Status status; |
|
|
|
int key_num = data_size / value_size; |
|
|
|
srand(0); |
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
std::string key = std::to_string(key_); |
|
|
|
std::string value; |
|
|
|
status = db->Get(readOptions, key, &value); |
|
|
|
ASSERT_TRUE(status.ok()); |
|
|
|
} |
|
|
|
|
|
|
|
Env::Default()->SleepForMicroseconds(ttl * 1000000); |
|
|
|
|
|
|
|
for (int i = 0; i < 100; i++) { |
|
|
|
int key_ = rand() % key_num+1; |
|
|
|
std::string key = std::to_string(key_); |
|
|
|
std::string value; |
|
|
|
status = db->Get(readOptions, key, &value); |
|
|
|
ASSERT_FALSE(status.ok()); |
|
|
|
} |
|
|
|
delete db; |
|
|
|
Env::Default()->SleepForMicroseconds( 1000); |
|
|
|
} |
|
|
|
|
|
|
|
TEST(TestTTL, CompactionTTL) { |
|
|
|
DestroyDB("testdb", Options()); |
|
|
|
DB *db; |
|
|
|
if(OpenDB("testdb", &db).ok() == false) { |
|
|
|
std::cerr << "open db failed" << std::endl; |
|
|
|
abort(); |
|
|
|
} |
|
|
|
|
|
|
|
uint64_t ttl = 20; |
|
|
|
InsertData(db, ttl); |
|
|
|
|
|
|
|
leveldb::Range ranges[1]; |
|
|
|
ranges[0] = leveldb::Range("-", "A"); |
|
|
|
uint64_t sizes[1]; |
|
|
|
db->GetApproximateSizes(ranges, 1, sizes); |
|
|
|
ASSERT_GT(sizes[0], 0); |
|
|
|
|
|
|
|
Env::Default()->SleepForMicroseconds(ttl * 1000000); |
|
|
|
|
|
|
|
db->CompactRange(nullptr, nullptr); |
|
|
|
|
|
|
|
leveldb::Range ranges2[1]; |
|
|
|
ranges2[0] = leveldb::Range("-", "A"); |
|
|
|
uint64_t sizes2[1]; |
|
|
|
db->GetApproximateSizes(ranges2, 1, sizes2); |
|
|
|
ASSERT_EQ(sizes2[0], 0); |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
int main(int argc, char** argv) { |
|
|
|
// All tests currently run with the same read-only file limits. |
|
|
|
testing::InitGoogleTest(&argc, argv); |
|
|
|
return RUN_ALL_TESTS(); |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
### 5.2实验结果 |
|
|
|
|
|
|
|
在我的电脑上,我适当调大了ReadTTL中TTL之后(调整到了50),结果如下所示: |
|
|
|
|
|
|
|
![image-20241107082315854](assets/image-20241107082315854.png) |
|
|
|
|
|
|
|
## 5.3其他可能出现的问题: |
|
|
|
|
|
|
|
可能是因为test文件内的数据准备以及硬件的运行速度,有些问题有可能会发生。 |
|
|
|
|
|
|
|
```cpp |
|
|
|
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { |
|
|
|
Slice k = iter_->key(); |
|
|
|
|
|
|
|
size_t bytes_read = k.size() + iter_->value().size(); |
|
|
|
while (bytes_until_read_sampling_ < bytes_read) { |
|
|
|
bytes_until_read_sampling_ += RandomCompactionPeriod(); |
|
|
|
db_->RecordReadSample(k); |
|
|
|
} |
|
|
|
|
|
|
|
//...... |
|
|
|
void DBImpl::RecordReadSample(Slice key) { |
|
|
|
MutexLock l(&mutex_); |
|
|
|
if (versions_->current()->RecordReadSample(key)) { |
|
|
|
MaybeScheduleCompaction(); |
|
|
|
} |
|
|
|
} |
|
|
|
``` |
|
|
|
|
|
|
|
上述过程是在读取的过程中发生的,当发现key跨文件存在后,它也会触发合并。这就有可能导致,对于testRead文件,如果ttl设置的比较小,在读取key1(file1内)时,发现它跨文件存在于file2,然后触发合并,但是值key2也在file2内,此时有可能过期。这就导致在读取key2前,就已经触发合并,将过期数据删除,这样会导致test报错。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## 6.本实验ttl实现与已有ttl实现的比较 |
|
|
|
|
|
|
|
**RocksDB** : |
|
|
|
|
|
|
|
本身支持一种数据过期淘汰方案,该方案是通过特定的 API 打开 DB,对写入该 DB 的全部 key 都遵循一个 TTL 过期策略,例如 TTL 为 3 天,那么写入该 DB 的 key 都会在写入的三天后自动过期。但是该方案不灵活,无法针对每一条 key 设置 TTL。 |
|
|
|
|
|
|
|
此外,rocksdb该方案底层也是通过 compaction filter 实现的,也就是说过期数据虽然对用户不可见,但是磁盘空间并不会及时回收,根本原因就是数据堆积在某一层,而该层没有触发 compaction,那么我们可以手动调用 RocksDB 的 CompactionRange 函数,来触发 compaction filter,达到快速回收磁盘空间的目的。但是主动调用 CompactionRange 会导致 RocksDB 自身的 compaction 暂停,这会触发 Write Stall,造成非常严重的后果。 |
|
|
|
|
|
|
|
**Parker**: |
|
|
|
|
|
|
|
采用`Periodic compaction + dynamic compaction`。Periodic compaction 的主要原理是增加一个 periodic_compaction_seconds 参数,并记录每个SST 文件的创建时间,每隔 periodic_compaction_seconds 秒,主动对这个 SST 文件进行 compaction 操作,从而回收沉底的 SST 文件;而 dynamic compaction 则是通过设置level_compaction_dynamic_level_bytes 为 true,进行动态合并,而不是按 level 的顺序合并到下一层,这使得 compaction 更加频繁。 |
|
|
|
|
|
|
|
**业内有赞 KV :** |
|
|
|
|
|
|
|
具体做法是,保持现有存储列族不变,另外增加一个列族,专门存储 key 的 TTL,然后通过一个 goroutine 根据当前时间戳,按照过期数据删除策略可以是定时触发,例如凌晨1点,或者每个一段时间触发等。该列族中 key 的编码规则如下: |
|
|
|
|
|
|
|
- key:时间戳 + key 的类型 + key 值 |
|
|
|
- value:存储一个字节,代表不同数据类型 |
|
|
|
|
|
|
|
这个方案优点是回收速度快且回收时间可控,但是缺点就是实现复杂,极端情况下会降低 50% 的TPS。 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
**本实验的ttl** |
|
|
|
|
|
|
|
在数据的 value 的尾部添加八个字节的 TTL,实现灵活。 |
|
|
|
|
|
|
|
![屏幕截图 2024-11-07 110211](./assets/key_value_pair.jpg) |
|
|
|
|
|
|
|
基于ttl过期的case,显著降低allowed_seek次数,使得对于指定的过期文件,能够加快合并速度。 |
|
|
|
|
|
|
|
## 7.实验总结和思考 |
|
|
|
|
|
|
|
通过本次实验,我们对LevelDB有了更深入的理解。超脱于之前的理论讲解,我们透过代码掌握了各个部分是如何实现的,也对整体样貌再次有了一个概览,为日后的数据管理系统的学习打下了坚实基础。 |