@ -1,9 +1,407 @@ | |||||
# 在LevelDB中实现TTL功能 | |||||
**本仓库提供TTL基本的测试用例** | |||||
### 10225102463 李畅 | |||||
### 10225101440 韩晨旭@ArcueidType | |||||
克隆代码: | |||||
# 实验要求 | |||||
```bash | |||||
git clone --recurse-submodules https://gitea.shuishan.net.cn/building_data_management_systems.Xuanzhou.2024Fall.DaSE/leveldb_base.git | |||||
+ 在LevelDB中实现键值对的`TTL(Time-To-Live)`功能,使得过期的数据在**读取**时自动失效,并在适当的时候被**合并**清理。 | |||||
+ 修改LevelDB的源码,实现对`TTL`的支持,包括数据的写入、读取和过期数据的清理。 | |||||
+ 编写测试用例,验证`TTL`功能的正确性和稳定性。 | |||||
# 1. 设计思路和实现过程 | |||||
## 1.1 设计思路 | |||||
### Phase 0 | |||||
在LevelDB中实现`TTL`功能主要涉及数据的**读、写、合并**。 | |||||
在**写入**数据时,`TTL`功能是个可选项。从代码层面来说,LevelDB数据的写入调用了`Put`函数接口: | |||||
``` | |||||
// 假设增加一个新的Put接口,包含TTL参数, 单位(秒) | |||||
Status DB::Put(const WriteOptions& opt, const Slice& key, | |||||
const Slice& value, uint64_t ttl); | |||||
// 如果调用的是原本的Put接口,那么就不会失效 | |||||
Status DB::Put(const WriteOptions& opt, const Slice& key, | |||||
const Slice& value); | |||||
``` | |||||
这段代码中,如果存入`TTL`参数,则调用本实验中新实现的`Put`函数接口;否则直接调用原有的`Put`接口。 | |||||
### Phase 1 | |||||
本小组的思路很简明: | |||||
+ 直接将`TTL`信息在**写入**阶段添加到原有的数据结构中 | |||||
+ 在**读取**和**合并**时从得到的数据中 **解读** 出其存储的`TTL`信息,判断是否过期 | |||||
+ 在手动合并中对最后有文件的层(max_level_with_files)进行扫描,清除其中的过期数据 | |||||
因此,接下来需要思考的是如何将`TTL`信息巧妙地存入LevelDB的数据结构中。 | |||||
### Phase 2 | |||||
由于插入数据时调用的`Put`接口只有三个参数,其中`opt`是写入时的系统配置,实际插入的数据只有`key/value`,因此存储`ttl`信息时,最简单的方法就是存入`key`或`value`中。 | |||||
在这样的方法中,可以调用原有的`Put`函数,将含义`ttl`信息的`key/value`数据存入数据库。 | |||||
经过讨论后,本小组选择将`ttl`信息存入`value`中。 | |||||
这样做的优缺点是: | |||||
+ **优点**:由于LevelDB在合并数据的过程中,需要根据`SSTable`的`key`对数据进行有序化处理,将`ttl`信息存储在`value`中不会影响`key`的信息,因此对已有的合并过程不产生影响 | |||||
+ **缺点**:在获取到`SSTable`的`key`时,无法直接判断该文件是否因为`ttl`而成为过期数据,仍然需要读取对应的`value`才能判断,多了一步读取开销。***但是***,实际上在读取数据以及合并数据的过程中,其代码实际上都读取了对应`SSTable`中存储的`value`信息,因此获取`ttl`信息时必要的读取`value`过程并不是多余的,实际***几乎不***造成额外的读取开销影响 | |||||
### Phase 3 | |||||
在确定插入数据时选用的方法后,读取和合并的操作只需要在获取数据文件的`value`后解读其包含的`ttl`信息,并判断是否过期就可以了。 | |||||
## 1.2 实现过程 | |||||
### 1.2.1 写入 Put | |||||
首先需要实现的是将`ttl`信息存入`value`的方法。`Put`中获取的`ttl`参数是该数据文件的**生存时间(单位:秒)**。本小组对此进行处理方法是通过`ttl`计算过期时间的**时间戳**,转码为字符串类型后存入`value`的最前部,并用`|`符号与原来的值分隔开。 | |||||
对于不使用`ttl`的数据文件,存入`0`作为`ttl`,在读取数据时若读到`0`则表示不使用`TTL`功能。 | |||||
将此功能封装为**编码**和**解码**文件过期时间(DeadLine)的函数,存储在`/util/coding.h`文件中: | |||||
```c++ | |||||
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) { // 存储ttl信息 | |||||
return std::to_string(ddl) + "|" + value.ToString(); | |||||
} | |||||
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) { // 解读ttl信息 | |||||
auto separator = value->find_first_of("|"); | |||||
std::string ddl_str = value->substr(0, separator); | |||||
ddl = std::atoll(ddl_str.c_str()); | |||||
*value = value->substr(separator + 1); | |||||
} | |||||
``` | |||||
在写入数据调用`Put`接口时,分别启用`EncodeDeadLine`函数存储`ttl`信息: | |||||
```c++ | |||||
// Default implementations of convenience methods that subclasses of DB | |||||
// can call if they wish | |||||
// TTL: Update TTL Encode | |||||
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { // 不使用TTL | |||||
WriteBatch batch; | |||||
batch.Put(key, EncodeDeadLine(0, value)); | |||||
return Write(opt, &batch); | |||||
} | |||||
// TTL: Put methods for ttl | |||||
Status DB::Put(const WriteOptions& options, const Slice& key, | |||||
const Slice& value, uint64_t ttl) { // 使用TTL | |||||
WriteBatch batch; | |||||
auto dead_line = std::time(nullptr) + ttl; // 计算过期时间的时间戳 | |||||
batch.Put(key, EncodeDeadLine(dead_line, value)); | |||||
return Write(options, &batch); | |||||
} | |||||
``` | |||||
### 1.2.2 读取 Get | |||||
LevelDB在读取数据时,调用`Get`接口,获取文件的`key/value`,因此只需要在`Get`函数中加入使用`TTL`功能的相关代码: | |||||
```c++ | |||||
Status DBImpl::Get(const ReadOptions& options, const Slice& key, | |||||
std::string* value) { | |||||
(......) | |||||
// Unlock while reading from files and memtables | |||||
{ | |||||
mutex_.Unlock(); | |||||
// First look in the memtable, then in the immutable memtable (if any). | |||||
LookupKey lkey(key, snapshot); | |||||
if (mem->Get(lkey, value, &s)) { | |||||
// Done | |||||
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { | |||||
// Done | |||||
} else { | |||||
s = current->Get(options, lkey, value, &stats); | |||||
have_stat_update = true; | |||||
} | |||||
// TTL: Get the true value and make sure the data is still living | |||||
if(!value->empty()) { | |||||
uint64_t dead_line; | |||||
DecodeDeadLineValue(value, dead_line); | |||||
if (dead_line != 0) { | |||||
// use TTL | |||||
if (std::time(nullptr) >= dead_line) { | |||||
// data expired | |||||
*value = ""; | |||||
s = Status::NotFound("Data expired"); | |||||
} | |||||
} else { | |||||
// TTL not set | |||||
} | |||||
} | |||||
mutex_.Lock(); | |||||
} | |||||
(......) | |||||
} | |||||
``` | |||||
若使用了`TTL`功能,则当文件过期时,返回`NotFound("Data expired")`的信息,即“数据已清除”。 | |||||
**注意**:由于LevelDB对于数据的读取是只读`ReadOnly`的,因此只能返回`NotFound`的信息,而无法真正清理过期数据。 | |||||
### 1.2.3 合并 Compaction | |||||
在合并的过程中,需要做到清理掉过期的数据,释放空间。 | |||||
在大合并的过程中,需要调用`DoCompactionWorks`函数实现合并的操作,也是在这个过程中,LevelDB得以真正完成清理旧版本数据、已删除数据并释放空间的过程。 | |||||
其实现逻辑是在该函数的过程中引入一个布尔变量`drop`,对于需要清理的数据设置`drop`为`True`,而需要保留的数据则是`drop`为`False`,最后根据`drop`的值清理过期数据,并将需要保留的数据合并写入新的`SSTable`。 | |||||
因此,我们只需要在判断`drop`为`True`的条件中加入对过期时间(DeadLine)的判断就可以实现`TTL`功能的清理过期数据了: | |||||
```c++ | |||||
Status DBImpl::DoCompactionWork(CompactionState* compact) { | |||||
(......) | |||||
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { | |||||
(......) | |||||
// Handle key/value, add to state, etc. | |||||
bool drop = false; | |||||
if (!ParseInternalKey(key, &ikey)) { | |||||
// Do not hide error keys | |||||
current_user_key.clear(); | |||||
has_current_user_key = false; | |||||
last_sequence_for_key = kMaxSequenceNumber; | |||||
} else { | |||||
if (!has_current_user_key || | |||||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != | |||||
0) { | |||||
// First occurrence of this user key | |||||
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); | |||||
has_current_user_key = true; | |||||
last_sequence_for_key = kMaxSequenceNumber; | |||||
} | |||||
std::string value = input->value().ToString(); | |||||
uint64_t ddl; | |||||
DecodeDeadLineValue(&value, ddl); | |||||
if (last_sequence_for_key <= compact->smallest_snapshot) { | |||||
(......) | |||||
} else if (ddl <= std::time(nullptr)) { // 根据ttl判断是否为过期数据 | |||||
// TTL: data expired | |||||
drop = true; | |||||
} | |||||
last_sequence_for_key = ikey.sequence; | |||||
} | |||||
(......) | |||||
input->Next(); | |||||
} | |||||
(......) | |||||
} | |||||
``` | |||||
理论上,`Compaction`已经实现了合并中清除过期数据的功能,但由于原本leveldb是按照没有TTL功能的逻辑设计的,因此其对数据的清除并不完全,也会造成测试不通过。 | |||||
LevelDB中`Compaction`的逻辑是选中特定层(`level`)合并,假设为`level n`。在`level n`中找目标文件`SSTable A`(假设该次触发的合并从文件A开始),并确定`level n`以及`level n+1`中与`SSTable A`包含的数据(`key/value`)有`key`发生重复(`overlap`)的所有文件,合并后产生新的`SSTable B`放入`level n+1`层中。 | |||||
然而由`DoCompactionWorks`代码可知,不参与合并的文件即使过期了也无法被清理。 | |||||
例如,`level n+1`层中有含有过期数据的`SSTable C`,但由于`level n+1`是最后一个含有文件的层,即使是`CompactRange(nullptr, nullptr)`,对所有数据合并,其只将`level n`的所有文件与`level n+1`中与上层有重叠的文件合并,因此`SSTable C`不会被清理。 | |||||
`CompactRange(nullptr, nullptr)`应该能够合并所有数据,也就是可以清除所有过期数据,而无法被清理的`SSTable C`明显是个例外,是个错误。 | |||||
从代码层面看导致该错误的原因。原来的代码: | |||||
```c++ | |||||
void DBImpl::CompactRange(const Slice* begin, const Slice* end) { | |||||
int max_level_with_files = 1; | |||||
{ | |||||
MutexLock l(&mutex_); | |||||
Version* base = versions_->current(); | |||||
for (int level = 1; level < config::kNumLevels; level++) { | |||||
if (base->OverlapInLevel(level, begin, end)) { | |||||
max_level_with_files = level; | |||||
} | |||||
} | |||||
} | |||||
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap | |||||
for (int level = 0; level < max_level_with_files; level++) { | |||||
TEST_CompactRange(level, begin, end); | |||||
} | |||||
} | |||||
void DBImpl::TEST_CompactRange(int level, const Slice* begin, | |||||
const Slice* end) { | |||||
assert(level >= 0); | |||||
assert(level + 1 < config::kNumLevels); | |||||
(......) | |||||
} | |||||
``` | |||||
这里注意两个数:`config::kNumLevels`和`max_level_with_files`: | |||||
+ config::kNumLevels:是LevelDB在启动时设定的数,表示总共使用的`level`层数。默认值为7 | |||||
+ max_level_with_files:表示含有`SSTable`文件的最高层的编号。注意,这里的`level`编号是从0开始的,因此`max_level_with_files`理论最大值是`config::kNumLevels - 1` | |||||
可以看出与上述例子相符,对最后有文件的那一层不会进行全范围的合并,因此产生漏网之鱼,而最直接想到的解决办法就是让手动合并时也对这最后一层做一次全范围的合并,就可以清除所有的过期数据。 | |||||
这样只需要让循环多走一层: | |||||
```c++ | |||||
for (int level = 0; level < max_level_with_files + 1; level++) { | |||||
TEST_CompactRange(level, begin, end); | |||||
} | |||||
``` | |||||
但是这种方法仍然存在两个问题: | |||||
1. 最后有文件的一层(max_level_with_files)可能并没有达到合并的条件,也就是说其不应该被合并到下一层中,这也同样符合leveldb原本的逻辑,但是这样的做法会导致其合并至下一层。 | |||||
2. 当存储数据已经堆积到整个leveldb的最底层时(即已达到`kNumLevels`),按照上述代码执行会导致致命错误并使数据库崩溃。 | |||||
防止最底层的数据被合并的函数调用是唯一的解决办法,但是这个做法又回到了上一个问题,最底层的过期数据可能始终无法被完全清除。 | |||||
### 1.2.4 合并的最终解决方案 | |||||
为了完全解决这个问题并让`CompactRange`正常工作,我们提出了一个新的方案: | |||||
对于最后有文件的一层(即max_level_with_files),我们进行单独的处理,去除掉其中的过期数据,但不将其推向下一层。 | |||||
实现上,去除一层(e.g. level n)中的过期数据,实际上等价于将`level n`与一个空的层合并,再将合并的结果输出回`level n`。 | |||||
因此,我们可以直接利用leveldb中被我们修改过的合并功能(合并中drop过期数据)来实现清除一层中的过期数据。 | |||||
> CompactRange函数用于手动合并,对于leveldb的自动合并,我们并不需要强迫其清除其触碰不到的数据,因此只需要修改手动合并的部分 | |||||
在实现上,引入参数`is_last_level`表示当前是否在处理`max_level_with_files`,如果是则按照上述逻辑清除该层过期数据,反之按照原本的合并正常进行 | |||||
在结构体`ManualCompaction`和类`Compaction`中引入新的成员`is_max_level`: | |||||
```c++ | |||||
struct ManualCompaction { | |||||
int level; | |||||
bool is_last_level; // TTL: Used to check if last level with files | |||||
bool done; | |||||
const InternalKey* begin; // null means beginning of key range | |||||
const InternalKey* end; // null means end of key range | |||||
InternalKey tmp_storage; // Used to keep track of compaction progress | |||||
}; | |||||
class Compaction { | |||||
public: | |||||
(...) | |||||
bool is_last_level() const {return is_last_level_; } | |||||
(...) | |||||
private: | |||||
(...) | |||||
bool is_last_level_; | |||||
(...) | |||||
} | |||||
``` | |||||
给方法`TEST_CompactRange`和`VersionSet::CompactRange`增加一个参数`is_last_level`,用来传递构造结构体`ManualCompaction`和类`Compaction`的相应值。 | |||||
在`CompactRange`的上述循环中调用`TEST_CompactRange`时,会将`is_last_level`传给该方法 | |||||
```c++ | |||||
for (int level = 0; level < max_level_with_files + 1; level++) { | |||||
TEST_CompactRange(level, begin, end, level == max_level_with_files); | |||||
} | |||||
``` | |||||
在清除当前层过期数据的操作中,compaction中的inputs[1]应该为空,因此在`VersionSet::SetupOtherInputs`方法中,跳过对inputs[1]的填写: | |||||
```c++ | |||||
if (!c->is_last_level()) { | |||||
current_->GetOverlappingInputs(level + 1, &smallest, &largest, | |||||
&c->inputs_[1]); | |||||
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]); | |||||
} | |||||
``` | ``` | ||||
接下来我们要做的是让合并的结果输出到当前层`level`而不是`level+1` | |||||
需要修改两处: | |||||
1. DBImpl::DoCompactionWork中,stats_应该add在当前level上 | |||||
2. DBImpl::InstallCompactionResults中,将结果写入当前level中 | |||||
代码: | |||||
```c++ | |||||
Status DBImpl::DoCompactionWork(CompactionState* compact) { | |||||
(...) | |||||
if (!compact->compaction->is_last_level()) { | |||||
stats_[compact->compaction->level() + 1].Add(stats); | |||||
} else { | |||||
// TTL: compaction for last level | |||||
stats_[compact->compaction->level()].Add(stats); | |||||
} | |||||
(...) | |||||
} | |||||
Status DBImpl::InstallCompactionResults(CompactionState* compact) { | |||||
(...) | |||||
if (!compact->compaction->is_last_level()) { | |||||
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, | |||||
out.smallest, out.largest); | |||||
} else { | |||||
// TTL: outputs of last level compaction should be writen to last level itself | |||||
compact->compaction->edit()->AddFile(level, out.number, out.file_size, | |||||
out.smallest, out.largest); | |||||
} | |||||
(...) | |||||
} | |||||
``` | |||||
通过以上修改后即解决了`CompactRange`的问题,使其能够正常工作 | |||||
## 2. 测试用例和测试结果 | |||||
### TestTTL.ReadTTL | |||||
`TestTTL`中第一个测试样例,检测插入和读取数据时,读取过期数据能够正确返回`NotFound`。 | |||||
![ReadTTL](img/test1_succ.png) | |||||
### TestTTL.CompactionTTL | |||||
`TestTTL`中第二个测试样例,检测插入和合并数据,能够在手动触发所有数据的合并时清除所有过期数据。 | |||||
![CompactionTTL](img/test2_succ.png) | |||||
### TestTTL.LastLevelCompaction | |||||
在解决了上面遇到的问题之后,本小组添加了一个测试样例,用来检测在含义`SSTable`的最底层能否正确合并、清理过期数据。 | |||||
在该样例中将`kNumLevels`设置为`3`可以保证(同时Assert确认正确性)最后一层中存在数据,我们的实现仍然可以通过测试: | |||||
> main分支中为原始配置,即kNumLevels = 7, 其层数过大,导致要测试最底层含有数据非常耗时,因此我们在其他分支中提供了kNumLevels = 3配置的实现用于测试 | |||||
`light_ver`分支:正确的合并实现,能通过测试 | |||||
![LastLevelCompaction](img/test3_succ.png) | |||||
而采用上述直接添加一层循环并且跳过最底层避免崩溃的方法则无法通过测试: | |||||
`naive_version`分支:简单增加一层循环并跳过最底层合并,无法清除最底层过期数据,因此无法通过测试 | |||||
![LastLevelCompactionFail](img/test3_failure.png) | |||||
最后,展示三个测试样例一起运行的结果: | |||||
![All_pass](img/all_pass.jpg) |
@ -0,0 +1,307 @@ | |||||
<center><font size=7>在LevelDB中实现TTL功能</font></center> | |||||
<center><font size=4>10225102463 李畅 10225101___ 韩晨旭</font></center> | |||||
# 实验要求 | |||||
+ 在LevelDB中实现键值对的`TTL(Time-To-Live)`功能,使得过期的数据在**读取**时自动失效,并在适当的时候被**合并**清理。 | |||||
+ 修改LevelDB的源码,实现对`TTL`的支持,包括数据的写入、读取和过期数据的清理。 | |||||
+ 编写测试用例,验证`TTL`功能的正确性和稳定性。*(Optional)* | |||||
# 1. 设计思路和实现过程 | |||||
## 1.1 设计思路 | |||||
### Phase 0 | |||||
在LevelDB中实现`TTL`功能主要涉及数据的**读、写、合并**。 | |||||
在**写入**数据时,`TTL`功能是个可选项。从代码层面来说,LevelDB数据的写入调用了`Put`函数接口: | |||||
``` | |||||
// 假设增加一个新的Put接口,包含TTL参数, 单位(秒) | |||||
Status DB::Put(const WriteOptions& opt, const Slice& key, | |||||
const Slice& value, uint64_t ttl); | |||||
// 如果调用的是原本的Put接口,那么就不会失效 | |||||
Status DB::Put(const WriteOptions& opt, const Slice& key, | |||||
const Slice& value); | |||||
``` | |||||
这段代码中,如果存入`TTL`参数,则调用本实验中新实现的`Put`函数接口;否则直接调用原有的`Put`接口。 | |||||
### Phase 1 | |||||
本小组的思路很简明: | |||||
+ 直接将`TTL`信息在**写入**阶段添加到原有的数据结构中 | |||||
+ 在**读取**和**合并**时从得到的数据中 **解读** 出其存储的`TTL`信息,判断是否过期 | |||||
因此,接下来需要思考的是如何将`TTL`信息巧妙地存入LevelDB的数据结构中。 | |||||
### Phase 2 | |||||
由于插入数据时调用的`Put`接口只有三个参数,其中`opt`是写入时的系统配置,实际插入的数据只有`key/value`,因此存储`ttl`信息时,最简单的方法就是存入`key`或`value`中。 | |||||
在这样的方法中,可以调用原有的`Put`函数,将含义`ttl`信息的`key/value`数据存入数据库。 | |||||
经过讨论后,本小组选择将`ttl`信息存入`value`中。 | |||||
这样做的优缺点是: | |||||
+ **优点**:由于LevelDB在合并数据的过程中,需要根据`SSTable`的`key`对数据进行有序化处理,将`ttl`信息存储在`value`中不会影响`key`的信息,因此对已有的合并过程不产生影响 | |||||
+ **缺点**:在获取到`SSTable`的`key`时,无法直接判断该文件是否因为`ttl`而成为过期数据,仍然需要读取对应的`value`才能判断,多了一步读取开销。***但是***,实际上在读取数据以及合并数据的过程中,其代码实际上都读取了对应`SSTable`中存储的`value`信息,因此获取`ttl`信息时必要的读取`value`过程并不是多余的,实际***几乎不***造成额外的读取开销影响 | |||||
### Phase 3 | |||||
在确定插入数据时选用的方法后,读取和合并的操作只需要在获取数据文件的`value`后解读其包含的`ttl`信息,并判断是否过期就可以了。 | |||||
## 1.2 实现过程 | |||||
### 1.2.1 写入 Put | |||||
首先需要实现的是将`ttl`信息存入`value`的方法。`Put`中获取的`ttl`参数是该数据文件的**生存时间(单位:秒)**。本小组对此进行处理方法是通过`ttl`计算过期时间的**时间戳**,转码为字符串类型后存入`value`的最前部,并用`|`符号与原来的值分隔开。 | |||||
对于不使用`ttl`的数据文件,存入`0`作为`ttl`,在读取数据时若读到`0`则表示不使用`TTL`功能。 | |||||
将此功能封装为**编码**和**解码**文件过期时间(DeadLine)的函数,存储在`/util/coding.h`文件中: | |||||
``` | |||||
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) { // 存储ttl信息 | |||||
return std::to_string(ddl) + "|" + value.ToString(); | |||||
} | |||||
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) { // 解读ttl信息 | |||||
auto separator = value->find_first_of("|"); | |||||
std::string ddl_str = value->substr(0, separator); | |||||
ddl = std::atoll(ddl_str.c_str()); | |||||
*value = value->substr(separator + 1); | |||||
} | |||||
``` | |||||
在写入数据调用`Put`接口时,分别启用`EncodeDeadLine`函数存储`ttl`信息: | |||||
``` | |||||
// Default implementations of convenience methods that subclasses of DB | |||||
// can call if they wish | |||||
// TTL: Update TTL Encode | |||||
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { // 不使用TTL | |||||
WriteBatch batch; | |||||
batch.Put(key, EncodeDeadLine(0, value)); | |||||
return Write(opt, &batch); | |||||
} | |||||
// TTL: Put methods for ttl | |||||
Status DB::Put(const WriteOptions& options, const Slice& key, | |||||
const Slice& value, uint64_t ttl) { // 使用TTL | |||||
WriteBatch batch; | |||||
auto dead_line = std::time(nullptr) + ttl; // 计算过期时间的时间戳 | |||||
batch.Put(key, EncodeDeadLine(dead_line, value)); | |||||
return Write(options, &batch); | |||||
} | |||||
``` | |||||
### 1.2.2 读取 Get | |||||
LevelDB在读取数据时,调用`Get`接口,获取文件的`key/value`,因此只需要在`Get`函数中加入使用`TTL`功能的相关代码: | |||||
``` | |||||
Status DBImpl::Get(const ReadOptions& options, const Slice& key, | |||||
std::string* value) { | |||||
(......) | |||||
// Unlock while reading from files and memtables | |||||
{ | |||||
mutex_.Unlock(); | |||||
// First look in the memtable, then in the immutable memtable (if any). | |||||
LookupKey lkey(key, snapshot); | |||||
if (mem->Get(lkey, value, &s)) { | |||||
// Done | |||||
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { | |||||
// Done | |||||
} else { | |||||
s = current->Get(options, lkey, value, &stats); | |||||
have_stat_update = true; | |||||
} | |||||
// TTL: Get the true value and make sure the data is still living | |||||
if(!value->empty()) { | |||||
uint64_t dead_line; | |||||
DecodeDeadLineValue(value, dead_line); | |||||
if (dead_line != 0) { | |||||
// use TTL | |||||
if (std::time(nullptr) >= dead_line) { | |||||
// data expired | |||||
*value = ""; | |||||
s = Status::NotFound("Data expired"); | |||||
} | |||||
} else { | |||||
// TTL not set | |||||
} | |||||
} | |||||
mutex_.Lock(); | |||||
} | |||||
(......) | |||||
} | |||||
``` | |||||
若使用了`TTL`功能,则当文件过期时,返回`NotFound("Data expired")`的信息,即“数据已清除”。 | |||||
**注意**:由于LevelDB对于数据的读取是只读`ReadOnly`的,因此只能返回`NotFound`的信息,而无法真正清理过期数据。 | |||||
### 1.2.3 合并 Compaction | |||||
在合并的过程中,需要做到清理掉过期的数据,释放空间。 | |||||
在大合并的过程中,需要调用`DoCompactionWorks`函数实现合并的操作,也是在这个过程中,LevelDB得以真正完成清理旧版本数据、已删除数据并释放空间的过程。 | |||||
其实现逻辑是在该函数的过程中引入一个布尔变量`drop`,对于需要清理的数据设置`drop`为`True`,而需要保留的数据则是`drop`为`False`,最后根据`drop`的值清理过期数据,并将需要保留的数据合并写入新的`SSTable`。 | |||||
因此,我们只需要在判断`drop`为`True`的条件中加入对过期时间(DeadLine)的判断就可以实现`TTL`功能的清理过期数据了: | |||||
``` | |||||
Status DBImpl::DoCompactionWork(CompactionState* compact) { | |||||
(......) | |||||
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) { | |||||
(......) | |||||
// Handle key/value, add to state, etc. | |||||
bool drop = false; | |||||
if (!ParseInternalKey(key, &ikey)) { | |||||
// Do not hide error keys | |||||
current_user_key.clear(); | |||||
has_current_user_key = false; | |||||
last_sequence_for_key = kMaxSequenceNumber; | |||||
} else { | |||||
if (!has_current_user_key || | |||||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != | |||||
0) { | |||||
// First occurrence of this user key | |||||
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); | |||||
has_current_user_key = true; | |||||
last_sequence_for_key = kMaxSequenceNumber; | |||||
} | |||||
std::string value = input->value().ToString(); | |||||
uint64_t ddl; | |||||
DecodeDeadLineValue(&value, ddl); | |||||
if (last_sequence_for_key <= compact->smallest_snapshot) { | |||||
// Hidden by an newer entry for same user key | |||||
drop = true; // (A) | |||||
} else if (ikey.type == kTypeDeletion && | |||||
ikey.sequence <= compact->smallest_snapshot && | |||||
compact->compaction->IsBaseLevelForKey(ikey.user_key)) { | |||||
// For this user key: | |||||
// (1) there is no data in higher levels | |||||
// (2) data in lower levels will have larger sequence numbers | |||||
// (3) data in layers that are being compacted here and have | |||||
// smaller sequence numbers will be dropped in the next | |||||
// few iterations of this loop (by rule (A) above). | |||||
// Therefore this deletion marker is obsolete and can be dropped. | |||||
drop = true; | |||||
} else if (ddl <= std::time(nullptr)) { // 根据ttl判断是否为过期数据 | |||||
// TTL: data expired | |||||
drop = true; | |||||
} | |||||
last_sequence_for_key = ikey.sequence; | |||||
} | |||||
(......) | |||||
input->Next(); | |||||
} | |||||
(......) | |||||
} | |||||
``` | |||||
理论上,`Compaction`相关的代码也实现了,但实际上还会存在一些问题。具体问题和解决方法详见 [问题和解决方案](#3.-问题和解决方案)。 | |||||
## 2. 测试用例和测试结果 | |||||
## 3. 问题和解决方案 | |||||
本实验的问题和**合并**`Compaction`有关。 | |||||
LevelDB中`Compaction`的逻辑是选中特定层(`level`)合并,假设为`level n`。在`level n`中找目标文件`SSTable A`(假设该次触发的合并从文件A开始),并确定`level n`以及`level n+1`中与`SSTable A`包含的数据(`key/value`)有`key`发生重复(`overlap`)的所有文件,合并后产生新的`SSTable B`放入`level n+1`层中。 | |||||
这样引发的问题是:由`DoCompactionWorks`代码可知,不参与合并的文件无法被获取信息,因而即使过期了也无法被清理。在此例子中,若`level n+1`层中有含有过期数据的`SSTable C`,但`SSTable C`与`SSTable A`发生合并时触及的所有文件都没有`overlap`的话,则在该次合并中并没有被触及,因而无法被清理。 | |||||
再次强调,合并过程必须清理**所有**已经过期的数据(尽管这听起来有些让人困惑,因为正常使用时,没有被合并的过期数据即使未被清理也不影响正常使用),而无法被清理的`SSTable C`明显是个例外,是个错误。 | |||||
而在LevelDB提供的`CompactRange(nullptr,nullptr)`这个“合并所有数据”的功能中,同样会发生这样的错误,导致有部分数据文件并没有在合并过程中被触及——即使它声称合并了**所有**数据。 | |||||
从代码层面可以理解导致该错误的原因。原来的代码: | |||||
``` | |||||
void DBImpl::CompactRange(const Slice* begin, const Slice* end) { | |||||
int max_level_with_files = 1; | |||||
{ | |||||
MutexLock l(&mutex_); | |||||
Version* base = versions_->current(); | |||||
for (int level = 1; level < config::kNumLevels; level++) { | |||||
if (base->OverlapInLevel(level, begin, end)) { | |||||
max_level_with_files = level; | |||||
} | |||||
} | |||||
} | |||||
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap | |||||
for (int level = 0; level < max_level_with_files; level++) { | |||||
TEST_CompactRange(level, begin, end); | |||||
} | |||||
} | |||||
void DBImpl::TEST_CompactRange(int level, const Slice* begin, | |||||
const Slice* end) { | |||||
assert(level >= 0); | |||||
assert(level + 1 < config::kNumLevels); | |||||
(......) | |||||
} | |||||
``` | |||||
这里注意两个数:`config::kNumLevels`和`max_level_with_files`: | |||||
+ config::kNumLevels:是LevelDB在启动时设定的数,表示总共使用的`level`层数。默认值为7 | |||||
+ max_level_with_files:表示含有`SSTable`文件的最高层的编号。注意,这里的`level`编号是从0开始的,因此`max_level_with_files`理论最大值是`config::kNumLevels - 1` | |||||
在原来的代码中,`DBImpl::CompactRange`函数中的最后一个循环,选择的`level`无法到达`max_level_with_files`,因此即使“合并所有数据”,也没法触及`max_level_with_files`中的所有文件,而是对`max_level_with_files - 1`层的所有文件做了大合并。 | |||||
修改后的代码如下: | |||||
``` | |||||
void DBImpl::CompactRange(const Slice* begin, const Slice* end) { | |||||
int max_level_with_files = 1; | |||||
{ | |||||
MutexLock l(&mutex_); | |||||
Version* base = versions_->current(); | |||||
for (int level = 1; level < config::kNumLevels; level++) { | |||||
if (base->OverlapInLevel(level, begin, end)) { | |||||
max_level_with_files = level; | |||||
} | |||||
} | |||||
} | |||||
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap | |||||
for (int level = 0; level < max_level_with_files + 1; level++) { | |||||
TEST_CompactRange(level, begin, end); | |||||
} | |||||
} | |||||
void DBImpl::TEST_CompactRange(int level, const Slice* begin, | |||||
const Slice* end) { | |||||
assert(level >= 0); | |||||
assert(level < config::kNumLevels); | |||||
(......) | |||||
} | |||||
``` | |||||
简单直接地改了循环的条件和相关函数`TEST_CompactRange`的判断条件,使得`level`可以到达`max_level_with_files`,这样就可以真正地合并所有文件并清理过期数据。 |
@ -0,0 +1,27 @@ | |||||
#include "leveldb/db.h" | |||||
#include <iostream> | |||||
using namespace std; | |||||
using namespace leveldb; | |||||
int main() { | |||||
DB* db = nullptr; | |||||
Options op; | |||||
op.create_if_missing = true; | |||||
Status status = DB::Open(op, "testdb", &db); | |||||
assert(status.ok()); | |||||
db->Put(WriteOptions(), "001", "leveldb", 5); | |||||
string s; | |||||
auto stat = db->Get(ReadOptions(), "001", &s); | |||||
cout<<s<<endl; | |||||
db->Put(WriteOptions(), "002", "world"); | |||||
string s1; | |||||
db->Delete(WriteOptions(), "002"); | |||||
db->Get(ReadOptions(), "002", &s1); | |||||
cout<<s1<<endl; | |||||
delete db; | |||||
return 0; | |||||
} |