Compare commits

...

31 Commits
main ... main

Author SHA1 Message Date
  ArcueidType 6c2151942a update README 2 weeks ago
  ArcueidType a69859e497 update README file 2 weeks ago
  ArcueidType 6da20e1b7f update README file 3 weeks ago
  ArcueidType bce01d8930 Complete Report in README 3 weeks ago
  ArcueidType c295a4a490 Complete Report in README 3 weeks ago
  ArcueidType ff59f3e731 update README file 3 weeks ago
  李畅 47343f975b report update again 3 weeks ago
  李畅 e5aaef2f4c modify test 3 weeks ago
  李畅 07f2b8206f add report 3 weeks ago
  李畅 f0a085c9a2 Merge branch 'main' of https://gitea.shuishan.net.cn/10225101440/leveldb_ttl into optimize 3 weeks ago
  李畅 d6199efe75 modify new test 3 weeks ago
  ArcueidType 308af20f79 update TEST LastLevelCompaction 3 weeks ago
  李畅 7ce859c2ce Merge branch 'main' of https://gitea.shuishan.net.cn/10225101440/leveldb_ttl into optimize 3 weeks ago
  ArcueidType 47e4063400 simplify SetOtherInputs 3 weeks ago
  李畅 8562e5a189 Merge branch 'optimize' of https://gitea.shuishan.net.cn/10225101440/leveldb_ttl into optimize 3 weeks ago
  李畅 4ada9d0897 modify modeldb 3 weeks ago
  ArcueidType 45c1b090ae newest implementation 3 weeks ago
  ArcueidType de6432f1e2 optimize last level handling 3 weeks ago
  李畅 9676d35ddc Merge branch 'light_ver' of https://gitea.shuishan.net.cn/10225101440/leveldb_ttl into light_ver 3 weeks ago
  李畅 5de63e747e report ver1 3 weeks ago
  ArcueidType c175e4ed40 comment for TEST LastLevelCompaction 3 weeks ago
  ArcueidType 2481ca24ba config kNumLevels for TEST LastLevelCompaction 3 weeks ago
  ArcueidType 830f61ddb7 config kNumLevels for TEST LastLevelCompaction 3 weeks ago
  ArcueidType a75a5f5c4d handle last level 3 weeks ago
  李畅 43fe2f30ed !NOT for USE! Compact last level 3 weeks ago
  李畅 b3f7582be3 slight change 3 weeks ago
  ArcueidType 2d970272b0 add a TODO 3 weeks ago
  ArcueidType 8edba27b88 add delete db for all tests 3 weeks ago
  ArcueidType 57cd670dcc a naive version to pass ttl_test 3 weeks ago
  ArcueidType 78185e9e9c optimize codes structure and paa ReadTTL test 3 weeks ago
  ArcueidType a0cfce1188 Modify Put/Get methods for TTL support 3 weeks ago
18 changed files with 951 additions and 68 deletions
Split View
  1. +6
    -1
      CMakeLists.txt
  2. +402
    -4
      README.md
  3. +307
    -0
      Report/Report.md
  4. +9
    -2
      db/builder.cc
  5. +5
    -5
      db/corruption_test.cc
  6. +70
    -17
      db/db_impl.cc
  7. +5
    -1
      db/db_impl.h
  8. +20
    -17
      db/db_test.cc
  9. +23
    -5
      db/version_set.cc
  10. +6
    -2
      db/version_set.h
  11. BIN
      img/all_pass.jpg
  12. BIN
      img/test1_succ.png
  13. BIN
      img/test2_succ.png
  14. BIN
      img/test3_failure.png
  15. BIN
      img/test3_succ.png
  16. +27
    -0
      test/db_test1.cc
  17. +60
    -14
      test/ttl_test.cc
  18. +11
    -0
      util/coding.h

+ 6
- 1
CMakeLists.txt View File

@ -528,4 +528,9 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)

+ 402
- 4
README.md View File

@ -1,9 +1,407 @@
# 在LevelDB中实现TTL功能
**本仓库提供TTL基本的测试用例**
### 10225102463 李畅
### 10225101440 韩晨旭@ArcueidType
克隆代码:
# 实验要求
```bash
git clone --recurse-submodules https://gitea.shuishan.net.cn/building_data_management_systems.Xuanzhou.2024Fall.DaSE/leveldb_base.git
+ 在LevelDB中实现键值对的`TTL(Time-To-Live)`功能,使得过期的数据在**读取**时自动失效,并在适当的时候被**合并**清理。
+ 修改LevelDB的源码,实现对`TTL`的支持,包括数据的写入、读取和过期数据的清理。
+ 编写测试用例,验证`TTL`功能的正确性和稳定性。
# 1. 设计思路和实现过程
## 1.1 设计思路
### Phase 0
在LevelDB中实现`TTL`功能主要涉及数据的**读、写、合并**。
在**写入**数据时,`TTL`功能是个可选项。从代码层面来说,LevelDB数据的写入调用了`Put`函数接口:
```
// 假设增加一个新的Put接口,包含TTL参数, 单位(秒)
Status DB::Put(const WriteOptions& opt, const Slice& key,
const Slice& value, uint64_t ttl);
// 如果调用的是原本的Put接口,那么就不会失效
Status DB::Put(const WriteOptions& opt, const Slice& key,
const Slice& value);
```
这段代码中,如果存入`TTL`参数,则调用本实验中新实现的`Put`函数接口;否则直接调用原有的`Put`接口。
### Phase 1
本小组的思路很简明:
+ 直接将`TTL`信息在**写入**阶段添加到原有的数据结构中
+ 在**读取**和**合并**时从得到的数据中 **解读** 出其存储的`TTL`信息,判断是否过期
+ 在手动合并中对最后有文件的层(max_level_with_files)进行扫描,清除其中的过期数据
因此,接下来需要思考的是如何将`TTL`信息巧妙地存入LevelDB的数据结构中。
### Phase 2
由于插入数据时调用的`Put`接口只有三个参数,其中`opt`是写入时的系统配置,实际插入的数据只有`key/value`,因此存储`ttl`信息时,最简单的方法就是存入`key`或`value`中。
在这样的方法中,可以调用原有的`Put`函数,将含义`ttl`信息的`key/value`数据存入数据库。
经过讨论后,本小组选择将`ttl`信息存入`value`中。
这样做的优缺点是:
+ **优点**:由于LevelDB在合并数据的过程中,需要根据`SSTable`的`key`对数据进行有序化处理,将`ttl`信息存储在`value`中不会影响`key`的信息,因此对已有的合并过程不产生影响
+ **缺点**:在获取到`SSTable`的`key`时,无法直接判断该文件是否因为`ttl`而成为过期数据,仍然需要读取对应的`value`才能判断,多了一步读取开销。***但是***,实际上在读取数据以及合并数据的过程中,其代码实际上都读取了对应`SSTable`中存储的`value`信息,因此获取`ttl`信息时必要的读取`value`过程并不是多余的,实际***几乎不***造成额外的读取开销影响
### Phase 3
在确定插入数据时选用的方法后,读取和合并的操作只需要在获取数据文件的`value`后解读其包含的`ttl`信息,并判断是否过期就可以了。
## 1.2 实现过程
### 1.2.1 写入 Put
首先需要实现的是将`ttl`信息存入`value`的方法。`Put`中获取的`ttl`参数是该数据文件的**生存时间(单位:秒)**。本小组对此进行处理方法是通过`ttl`计算过期时间的**时间戳**,转码为字符串类型后存入`value`的最前部,并用`|`符号与原来的值分隔开。
对于不使用`ttl`的数据文件,存入`0`作为`ttl`,在读取数据时若读到`0`则表示不使用`TTL`功能。
将此功能封装为**编码**和**解码**文件过期时间(DeadLine)的函数,存储在`/util/coding.h`文件中:
```c++
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) { // 存储ttl信息
return std::to_string(ddl) + "|" + value.ToString();
}
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) { // 解读ttl信息
auto separator = value->find_first_of("|");
std::string ddl_str = value->substr(0, separator);
ddl = std::atoll(ddl_str.c_str());
*value = value->substr(separator + 1);
}
```
在写入数据调用`Put`接口时,分别启用`EncodeDeadLine`函数存储`ttl`信息:
```c++
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
// TTL: Update TTL Encode
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { // 不使用TTL
WriteBatch batch;
batch.Put(key, EncodeDeadLine(0, value));
return Write(opt, &batch);
}
// TTL: Put methods for ttl
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) { // 使用TTL
WriteBatch batch;
auto dead_line = std::time(nullptr) + ttl; // 计算过期时间的时间戳
batch.Put(key, EncodeDeadLine(dead_line, value));
return Write(options, &batch);
}
```
### 1.2.2 读取 Get
LevelDB在读取数据时,调用`Get`接口,获取文件的`key/value`,因此只需要在`Get`函数中加入使用`TTL`功能的相关代码:
```c++
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
(......)
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
// TTL: Get the true value and make sure the data is still living
if(!value->empty()) {
uint64_t dead_line;
DecodeDeadLineValue(value, dead_line);
if (dead_line != 0) {
// use TTL
if (std::time(nullptr) >= dead_line) {
// data expired
*value = "";
s = Status::NotFound("Data expired");
}
} else {
// TTL not set
}
}
mutex_.Lock();
}
(......)
}
```
若使用了`TTL`功能,则当文件过期时,返回`NotFound("Data expired")`的信息,即“数据已清除”。
**注意**:由于LevelDB对于数据的读取是只读`ReadOnly`的,因此只能返回`NotFound`的信息,而无法真正清理过期数据。
### 1.2.3 合并 Compaction
在合并的过程中,需要做到清理掉过期的数据,释放空间。
在大合并的过程中,需要调用`DoCompactionWorks`函数实现合并的操作,也是在这个过程中,LevelDB得以真正完成清理旧版本数据、已删除数据并释放空间的过程。
其实现逻辑是在该函数的过程中引入一个布尔变量`drop`,对于需要清理的数据设置`drop`为`True`,而需要保留的数据则是`drop`为`False`,最后根据`drop`的值清理过期数据,并将需要保留的数据合并写入新的`SSTable`。
因此,我们只需要在判断`drop`为`True`的条件中加入对过期时间(DeadLine)的判断就可以实现`TTL`功能的清理过期数据了:
```c++
Status DBImpl::DoCompactionWork(CompactionState* compact) {
(......)
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
(......)
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
std::string value = input->value().ToString();
uint64_t ddl;
DecodeDeadLineValue(&value, ddl);
if (last_sequence_for_key <= compact->smallest_snapshot) {
(......)
} else if (ddl <= std::time(nullptr)) { // 根据ttl判断是否为过期数据
// TTL: data expired
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
(......)
input->Next();
}
(......)
}
```
理论上,`Compaction`已经实现了合并中清除过期数据的功能,但由于原本leveldb是按照没有TTL功能的逻辑设计的,因此其对数据的清除并不完全,也会造成测试不通过。
LevelDB中`Compaction`的逻辑是选中特定层(`level`)合并,假设为`level n`。在`level n`中找目标文件`SSTable A`(假设该次触发的合并从文件A开始),并确定`level n`以及`level n+1`中与`SSTable A`包含的数据(`key/value`)有`key`发生重复(`overlap`)的所有文件,合并后产生新的`SSTable B`放入`level n+1`层中。
然而由`DoCompactionWorks`代码可知,不参与合并的文件即使过期了也无法被清理。
例如,`level n+1`层中有含有过期数据的`SSTable C`,但由于`level n+1`是最后一个含有文件的层,即使是`CompactRange(nullptr, nullptr)`,对所有数据合并,其只将`level n`的所有文件与`level n+1`中与上层有重叠的文件合并,因此`SSTable C`不会被清理。
`CompactRange(nullptr, nullptr)`应该能够合并所有数据,也就是可以清除所有过期数据,而无法被清理的`SSTable C`明显是个例外,是个错误。
从代码层面看导致该错误的原因。原来的代码:
```c++
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
(......)
}
```
这里注意两个数:`config::kNumLevels`和`max_level_with_files`:
+ config::kNumLevels:是LevelDB在启动时设定的数,表示总共使用的`level`层数。默认值为7
+ max_level_with_files:表示含有`SSTable`文件的最高层的编号。注意,这里的`level`编号是从0开始的,因此`max_level_with_files`理论最大值是`config::kNumLevels - 1`
可以看出与上述例子相符,对最后有文件的那一层不会进行全范围的合并,因此产生漏网之鱼,而最直接想到的解决办法就是让手动合并时也对这最后一层做一次全范围的合并,就可以清除所有的过期数据。
这样只需要让循环多走一层:
```c++
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end);
}
```
但是这种方法仍然存在两个问题:
1. 最后有文件的一层(max_level_with_files)可能并没有达到合并的条件,也就是说其不应该被合并到下一层中,这也同样符合leveldb原本的逻辑,但是这样的做法会导致其合并至下一层。
2. 当存储数据已经堆积到整个leveldb的最底层时(即已达到`kNumLevels`),按照上述代码执行会导致致命错误并使数据库崩溃。
防止最底层的数据被合并的函数调用是唯一的解决办法,但是这个做法又回到了上一个问题,最底层的过期数据可能始终无法被完全清除。
### 1.2.4 合并的最终解决方案
为了完全解决这个问题并让`CompactRange`正常工作,我们提出了一个新的方案:
对于最后有文件的一层(即max_level_with_files),我们进行单独的处理,去除掉其中的过期数据,但不将其推向下一层。
实现上,去除一层(e.g. level n)中的过期数据,实际上等价于将`level n`与一个空的层合并,再将合并的结果输出回`level n`。
因此,我们可以直接利用leveldb中被我们修改过的合并功能(合并中drop过期数据)来实现清除一层中的过期数据。
> CompactRange函数用于手动合并,对于leveldb的自动合并,我们并不需要强迫其清除其触碰不到的数据,因此只需要修改手动合并的部分
在实现上,引入参数`is_last_level`表示当前是否在处理`max_level_with_files`,如果是则按照上述逻辑清除该层过期数据,反之按照原本的合并正常进行
在结构体`ManualCompaction`和类`Compaction`中引入新的成员`is_max_level`:
```c++
struct ManualCompaction {
int level;
bool is_last_level; // TTL: Used to check if last level with files
bool done;
const InternalKey* begin; // null means beginning of key range
const InternalKey* end; // null means end of key range
InternalKey tmp_storage; // Used to keep track of compaction progress
};
class Compaction {
public:
(...)
bool is_last_level() const {return is_last_level_; }
(...)
private:
(...)
bool is_last_level_;
(...)
}
```
给方法`TEST_CompactRange`和`VersionSet::CompactRange`增加一个参数`is_last_level`,用来传递构造结构体`ManualCompaction`和类`Compaction`的相应值。
在`CompactRange`的上述循环中调用`TEST_CompactRange`时,会将`is_last_level`传给该方法
```c++
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end, level == max_level_with_files);
}
```
在清除当前层过期数据的操作中,compaction中的inputs[1]应该为空,因此在`VersionSet::SetupOtherInputs`方法中,跳过对inputs[1]的填写:
```c++
if (!c->is_last_level()) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
}
```
接下来我们要做的是让合并的结果输出到当前层`level`而不是`level+1`
需要修改两处:
1. DBImpl::DoCompactionWork中,stats_应该add在当前level上
2. DBImpl::InstallCompactionResults中,将结果写入当前level中
代码:
```c++
Status DBImpl::DoCompactionWork(CompactionState* compact) {
(...)
if (!compact->compaction->is_last_level()) {
stats_[compact->compaction->level() + 1].Add(stats);
} else {
// TTL: compaction for last level
stats_[compact->compaction->level()].Add(stats);
}
(...)
}
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
(...)
if (!compact->compaction->is_last_level()) {
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
} else {
// TTL: outputs of last level compaction should be writen to last level itself
compact->compaction->edit()->AddFile(level, out.number, out.file_size,
out.smallest, out.largest);
}
(...)
}
```
通过以上修改后即解决了`CompactRange`的问题,使其能够正常工作
## 2. 测试用例和测试结果
### TestTTL.ReadTTL
`TestTTL`中第一个测试样例,检测插入和读取数据时,读取过期数据能够正确返回`NotFound`。
![ReadTTL](img/test1_succ.png)
### TestTTL.CompactionTTL
`TestTTL`中第二个测试样例,检测插入和合并数据,能够在手动触发所有数据的合并时清除所有过期数据。
![CompactionTTL](img/test2_succ.png)
### TestTTL.LastLevelCompaction
在解决了上面遇到的问题之后,本小组添加了一个测试样例,用来检测在含义`SSTable`的最底层能否正确合并、清理过期数据。
在该样例中将`kNumLevels`设置为`3`可以保证(同时Assert确认正确性)最后一层中存在数据,我们的实现仍然可以通过测试:
> main分支中为原始配置,即kNumLevels = 7, 其层数过大,导致要测试最底层含有数据非常耗时,因此我们在其他分支中提供了kNumLevels = 3配置的实现用于测试
`light_ver`分支:正确的合并实现,能通过测试
![LastLevelCompaction](img/test3_succ.png)
而采用上述直接添加一层循环并且跳过最底层避免崩溃的方法则无法通过测试:
`naive_version`分支:简单增加一层循环并跳过最底层合并,无法清除最底层过期数据,因此无法通过测试
![LastLevelCompactionFail](img/test3_failure.png)
最后,展示三个测试样例一起运行的结果:
![All_pass](img/all_pass.jpg)

+ 307
- 0
Report/Report.md View File

@ -0,0 +1,307 @@
<center><font size=7>在LevelDB中实现TTL功能</font></center>
<center><font size=4>10225102463 李畅 10225101___ 韩晨旭</font></center>
# 实验要求
+ 在LevelDB中实现键值对的`TTL(Time-To-Live)`功能,使得过期的数据在**读取**时自动失效,并在适当的时候被**合并**清理。
+ 修改LevelDB的源码,实现对`TTL`的支持,包括数据的写入、读取和过期数据的清理。
+ 编写测试用例,验证`TTL`功能的正确性和稳定性。*(Optional)*
# 1. 设计思路和实现过程
## 1.1 设计思路
### Phase 0
在LevelDB中实现`TTL`功能主要涉及数据的**读、写、合并**。
在**写入**数据时,`TTL`功能是个可选项。从代码层面来说,LevelDB数据的写入调用了`Put`函数接口:
```
// 假设增加一个新的Put接口,包含TTL参数, 单位(秒)
Status DB::Put(const WriteOptions& opt, const Slice& key,
const Slice& value, uint64_t ttl);
// 如果调用的是原本的Put接口,那么就不会失效
Status DB::Put(const WriteOptions& opt, const Slice& key,
const Slice& value);
```
这段代码中,如果存入`TTL`参数,则调用本实验中新实现的`Put`函数接口;否则直接调用原有的`Put`接口。
### Phase 1
本小组的思路很简明:
+ 直接将`TTL`信息在**写入**阶段添加到原有的数据结构中
+ 在**读取**和**合并**时从得到的数据中 **解读** 出其存储的`TTL`信息,判断是否过期
因此,接下来需要思考的是如何将`TTL`信息巧妙地存入LevelDB的数据结构中。
### Phase 2
由于插入数据时调用的`Put`接口只有三个参数,其中`opt`是写入时的系统配置,实际插入的数据只有`key/value`,因此存储`ttl`信息时,最简单的方法就是存入`key`或`value`中。
在这样的方法中,可以调用原有的`Put`函数,将含义`ttl`信息的`key/value`数据存入数据库。
经过讨论后,本小组选择将`ttl`信息存入`value`中。
这样做的优缺点是:
+ **优点**:由于LevelDB在合并数据的过程中,需要根据`SSTable`的`key`对数据进行有序化处理,将`ttl`信息存储在`value`中不会影响`key`的信息,因此对已有的合并过程不产生影响
+ **缺点**:在获取到`SSTable`的`key`时,无法直接判断该文件是否因为`ttl`而成为过期数据,仍然需要读取对应的`value`才能判断,多了一步读取开销。***但是***,实际上在读取数据以及合并数据的过程中,其代码实际上都读取了对应`SSTable`中存储的`value`信息,因此获取`ttl`信息时必要的读取`value`过程并不是多余的,实际***几乎不***造成额外的读取开销影响
### Phase 3
在确定插入数据时选用的方法后,读取和合并的操作只需要在获取数据文件的`value`后解读其包含的`ttl`信息,并判断是否过期就可以了。
## 1.2 实现过程
### 1.2.1 写入 Put
首先需要实现的是将`ttl`信息存入`value`的方法。`Put`中获取的`ttl`参数是该数据文件的**生存时间(单位:秒)**。本小组对此进行处理方法是通过`ttl`计算过期时间的**时间戳**,转码为字符串类型后存入`value`的最前部,并用`|`符号与原来的值分隔开。
对于不使用`ttl`的数据文件,存入`0`作为`ttl`,在读取数据时若读到`0`则表示不使用`TTL`功能。
将此功能封装为**编码**和**解码**文件过期时间(DeadLine)的函数,存储在`/util/coding.h`文件中:
```
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) { // 存储ttl信息
return std::to_string(ddl) + "|" + value.ToString();
}
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) { // 解读ttl信息
auto separator = value->find_first_of("|");
std::string ddl_str = value->substr(0, separator);
ddl = std::atoll(ddl_str.c_str());
*value = value->substr(separator + 1);
}
```
在写入数据调用`Put`接口时,分别启用`EncodeDeadLine`函数存储`ttl`信息:
```
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
// TTL: Update TTL Encode
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { // 不使用TTL
WriteBatch batch;
batch.Put(key, EncodeDeadLine(0, value));
return Write(opt, &batch);
}
// TTL: Put methods for ttl
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) { // 使用TTL
WriteBatch batch;
auto dead_line = std::time(nullptr) + ttl; // 计算过期时间的时间戳
batch.Put(key, EncodeDeadLine(dead_line, value));
return Write(options, &batch);
}
```
### 1.2.2 读取 Get
LevelDB在读取数据时,调用`Get`接口,获取文件的`key/value`,因此只需要在`Get`函数中加入使用`TTL`功能的相关代码:
```
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
(......)
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
// TTL: Get the true value and make sure the data is still living
if(!value->empty()) {
uint64_t dead_line;
DecodeDeadLineValue(value, dead_line);
if (dead_line != 0) {
// use TTL
if (std::time(nullptr) >= dead_line) {
// data expired
*value = "";
s = Status::NotFound("Data expired");
}
} else {
// TTL not set
}
}
mutex_.Lock();
}
(......)
}
```
若使用了`TTL`功能,则当文件过期时,返回`NotFound("Data expired")`的信息,即“数据已清除”。
**注意**:由于LevelDB对于数据的读取是只读`ReadOnly`的,因此只能返回`NotFound`的信息,而无法真正清理过期数据。
### 1.2.3 合并 Compaction
在合并的过程中,需要做到清理掉过期的数据,释放空间。
在大合并的过程中,需要调用`DoCompactionWorks`函数实现合并的操作,也是在这个过程中,LevelDB得以真正完成清理旧版本数据、已删除数据并释放空间的过程。
其实现逻辑是在该函数的过程中引入一个布尔变量`drop`,对于需要清理的数据设置`drop`为`True`,而需要保留的数据则是`drop`为`False`,最后根据`drop`的值清理过期数据,并将需要保留的数据合并写入新的`SSTable`。
因此,我们只需要在判断`drop`为`True`的条件中加入对过期时间(DeadLine)的判断就可以实现`TTL`功能的清理过期数据了:
```
Status DBImpl::DoCompactionWork(CompactionState* compact) {
(......)
while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
(......)
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
std::string value = input->value().ToString();
uint64_t ddl;
DecodeDeadLineValue(&value, ddl);
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
} else if (ikey.type == kTypeDeletion &&
ikey.sequence <= compact->smallest_snapshot &&
compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
// For this user key:
// (1) there is no data in higher levels
// (2) data in lower levels will have larger sequence numbers
// (3) data in layers that are being compacted here and have
// smaller sequence numbers will be dropped in the next
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
} else if (ddl <= std::time(nullptr)) { // 根据ttl判断是否为过期数据
// TTL: data expired
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
(......)
input->Next();
}
(......)
}
```
理论上,`Compaction`相关的代码也实现了,但实际上还会存在一些问题。具体问题和解决方法详见 [问题和解决方案](#3.-问题和解决方案)。
## 2. 测试用例和测试结果
## 3. 问题和解决方案
本实验的问题和**合并**`Compaction`有关。
LevelDB中`Compaction`的逻辑是选中特定层(`level`)合并,假设为`level n`。在`level n`中找目标文件`SSTable A`(假设该次触发的合并从文件A开始),并确定`level n`以及`level n+1`中与`SSTable A`包含的数据(`key/value`)有`key`发生重复(`overlap`)的所有文件,合并后产生新的`SSTable B`放入`level n+1`层中。
这样引发的问题是:由`DoCompactionWorks`代码可知,不参与合并的文件无法被获取信息,因而即使过期了也无法被清理。在此例子中,若`level n+1`层中有含有过期数据的`SSTable C`,但`SSTable C`与`SSTable A`发生合并时触及的所有文件都没有`overlap`的话,则在该次合并中并没有被触及,因而无法被清理。
再次强调,合并过程必须清理**所有**已经过期的数据(尽管这听起来有些让人困惑,因为正常使用时,没有被合并的过期数据即使未被清理也不影响正常使用),而无法被清理的`SSTable C`明显是个例外,是个错误。
而在LevelDB提供的`CompactRange(nullptr,nullptr)`这个“合并所有数据”的功能中,同样会发生这样的错误,导致有部分数据文件并没有在合并过程中被触及——即使它声称合并了**所有**数据。
从代码层面可以理解导致该错误的原因。原来的代码:
```
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
(......)
}
```
这里注意两个数:`config::kNumLevels`和`max_level_with_files`:
+ config::kNumLevels:是LevelDB在启动时设定的数,表示总共使用的`level`层数。默认值为7
+ max_level_with_files:表示含有`SSTable`文件的最高层的编号。注意,这里的`level`编号是从0开始的,因此`max_level_with_files`理论最大值是`config::kNumLevels - 1`
在原来的代码中,`DBImpl::CompactRange`函数中的最后一个循环,选择的`level`无法到达`max_level_with_files`,因此即使“合并所有数据”,也没法触及`max_level_with_files`中的所有文件,而是对`max_level_with_files - 1`层的所有文件做了大合并。
修改后的代码如下:
```
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
int max_level_with_files = 1;
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
assert(level >= 0);
assert(level < config::kNumLevels);
(......)
}
```
简单直接地改了循环的条件和相关函数`TEST_CompactRange`的判断条件,使得`level`可以到达`max_level_with_files`,这样就可以真正地合并所有文件并清理过期数据。

+ 9
- 2
db/builder.cc View File

@ -32,8 +32,15 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
meta->smallest.DecodeFrom(iter->key());
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// std::string value;
// uint64_t ddl;
// value = iter->value().ToString();
// DecodeDeadLineValue(&value, ddl);
// if (ddl > std::time(nullptr)) {
// TTL: only liver remains
key = iter->key();
builder->Add(key, iter->value());
// }
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 5
- 5
db/corruption_test.cc View File

@ -228,8 +228,8 @@ TEST_F(CorruptionTest, TableFile) {
Build(100);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
dbi->TEST_CompactRange(0, nullptr, nullptr, false);
dbi->TEST_CompactRange(1, nullptr, nullptr, false);
Corrupt(kTableFile, 100, 1);
Check(90, 99);
@ -242,8 +242,8 @@ TEST_F(CorruptionTest, TableFileRepair) {
Build(100);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
dbi->TEST_CompactRange(0, nullptr, nullptr, false);
dbi->TEST_CompactRange(1, nullptr, nullptr, false);
Corrupt(kTableFile, 100, 1);
RepairDB();
@ -293,7 +293,7 @@ TEST_F(CorruptionTest, CorruptedDescriptor) {
ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "hello"));
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(0, nullptr, nullptr, false);
Corrupt(kDescriptorFile, 0, 1000);
Status s = TryReopen();

+ 70
- 17
db/db_impl.cc View File

@ -4,14 +4,6 @@
#include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@ -22,11 +14,22 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <ostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
@ -591,20 +594,21 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end, level == max_level_with_files);
}
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
const Slice* end, bool is_last_level) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
assert(level < config::kNumLevels);
InternalKey begin_storage, end_storage;
ManualCompaction manual;
manual.level = level;
manual.is_last_level = is_last_level;
manual.done = false;
if (begin == nullptr) {
manual.begin = nullptr;
@ -717,7 +721,7 @@ void DBImpl::BackgroundCompaction() {
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
c = versions_->CompactRange(m->level, m->begin, m->end, m->is_last_level);
m->done = (c == nullptr);
if (c != nullptr) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
@ -888,8 +892,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
if (!compact->compaction->is_last_level()) {
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
} else {
// TTL: outputs of last level compaction should be writen to last level itself
compact->compaction->edit()->AddFile(level, out.number, out.file_size,
out.smallest, out.largest);
}
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@ -963,6 +973,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = kMaxSequenceNumber;
}
std::string value = input->value().ToString();
uint64_t ddl;
DecodeDeadLineValue(&value, ddl);
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
@ -977,6 +992,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
} else if (ddl <= std::time(nullptr)) {
// TTL: data expired
drop = true;
}
last_sequence_for_key = ikey.sequence;
@ -1042,7 +1060,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (!compact->compaction->is_last_level()) {
stats_[compact->compaction->level() + 1].Add(stats);
} else {
// TTL: compaction for last level
stats_[compact->compaction->level()].Add(stats);
}
if (status.ok()) {
status = InstallCompactionResults(compact);
@ -1152,6 +1175,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
// TTL: Get the true value and make sure the data is still living
if(!value->empty()) {
uint64_t dead_line;
DecodeDeadLineValue(value, dead_line);
if (dead_line != 0) {
// use TTL
if (std::time(nullptr) >= dead_line) {
// data expired
*value = "";
s = Status::NotFound("Data expired");
}
} else {
// TTL not set
}
}
mutex_.Lock();
}
@ -1198,6 +1236,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
return DB::Put(options, key, value, ttl);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
@ -1485,12 +1528,22 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
// TTL: Update TTL Encode
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
batch.Put(key, EncodeDeadLine(0, value));
return Write(opt, &batch);
}
// TTL: Put methods for ttl
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
WriteBatch batch;
auto dead_line = std::time(nullptr) + ttl;
batch.Put(key, EncodeDeadLine(dead_line, value));
return Write(options, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);

+ 5
- 1
db/db_impl.h View File

@ -38,6 +38,8 @@ class DBImpl : public DB {
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
@ -52,7 +54,8 @@ class DBImpl : public DB {
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
void TEST_CompactRange(int level, const Slice* begin, const Slice* end,
bool is_last_level);
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
@ -79,6 +82,7 @@ class DBImpl : public DB {
// Information for a manual compaction
struct ManualCompaction {
int level;
bool is_last_level; // TTL: Used to check if last level with files
bool done;
const InternalKey* begin; // null means beginning of key range
const InternalKey* end; // null means end of key range

+ 20
- 17
db/db_test.cc View File

@ -786,7 +786,7 @@ TEST_F(DBTest, GetEncountersEmptyLevel) {
}
// Step 2: clear level 1 if necessary.
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, false);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 1);
@ -1145,7 +1145,7 @@ TEST_F(DBTest, CompactionsGenerateMultipleFiles) {
// Reopening moves updates to level-0
Reopen(&options);
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 1);
@ -1196,7 +1196,7 @@ TEST_F(DBTest, SparseMerge) {
}
Put("C", "vc");
dbfull()->TEST_CompactMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
// Make sparse update
Put("A", "va2");
@ -1207,9 +1207,9 @@ TEST_F(DBTest, SparseMerge) {
// Compactions should not cause us to create a situation where
// a file overlaps too much data at the next level.
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576);
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, false);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576);
}
@ -1273,7 +1273,7 @@ TEST_F(DBTest, ApproximateSizes) {
std::string cend_str = Key(compact_start + 9);
Slice cstart = cstart_str;
Slice cend = cend_str;
dbfull()->TEST_CompactRange(0, &cstart, &cend);
dbfull()->TEST_CompactRange(0, &cstart, &cend, false);
}
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
@ -1320,7 +1320,7 @@ TEST_F(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
}
} while (ChangeOptions());
}
@ -1397,11 +1397,11 @@ TEST_F(DBTest, HiddenValuesAreRemoved) {
db_->ReleaseSnapshot(snapshot);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
Slice x("x");
dbfull()->TEST_CompactRange(0, nullptr, &x);
dbfull()->TEST_CompactRange(0, nullptr, &x, false);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GE(NumTableFilesAtLevel(1), 1);
dbfull()->TEST_CompactRange(1, nullptr, &x);
dbfull()->TEST_CompactRange(1, nullptr, &x, false);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
@ -1427,11 +1427,11 @@ TEST_F(DBTest, DeletionMarkers1) {
ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
Slice z("z");
dbfull()->TEST_CompactRange(last - 2, nullptr, &z);
dbfull()->TEST_CompactRange(last - 2, nullptr, &z, false);
// DEL eliminated, but v1 remains because we aren't compacting that level
// (DEL can be eliminated because v2 hides v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr);
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, false);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
@ -1454,10 +1454,10 @@ TEST_F(DBTest, DeletionMarkers2) {
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last - 2, nullptr, nullptr);
dbfull()->TEST_CompactRange(last - 2, nullptr, nullptr, false);
// DEL kept: "last" file overlaps
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr);
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, false);
// Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
@ -1491,8 +1491,8 @@ TEST_F(DBTest, OverlapInLevel0) {
ASSERT_EQ("2,1,1", FilesPerLevel());
// Compact away the placeholder files we created initially
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
dbfull()->TEST_CompactRange(2, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, false);
dbfull()->TEST_CompactRange(2, nullptr, nullptr, false);
ASSERT_EQ("2", FilesPerLevel());
// Do a memtable compaction. Before bug-fix, the compaction would
@ -1787,7 +1787,7 @@ TEST_F(DBTest, NoSpace) {
env_->no_space_.store(true, std::memory_order_release);
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels - 1; level++) {
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
dbfull()->TEST_CompactRange(level, nullptr, nullptr, false);
}
}
env_->no_space_.store(false, std::memory_order_release);
@ -1876,7 +1876,7 @@ TEST_F(DBTest, ManifestWriteError) {
// Merging compaction (will fail)
error_type->store(true, std::memory_order_release);
dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
dbfull()->TEST_CompactRange(last, nullptr, nullptr, false); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
@ -2117,6 +2117,9 @@ class ModelDB : public DB {
Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override {
return DB::Put(o, k, v);
}
Status Put(const WriteOptions& o, const Slice& k, const Slice& v, uint64_t ttl) override {
return DB::Put(o, k, v, ttl);
}
Status Delete(const WriteOptions& o, const Slice& key) override {
return DB::Delete(o, key);
}

+ 23
- 5
db/version_set.cc View File

@ -1389,9 +1389,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// TTL: manual compaction for last level shouldn't have inputs[1]
if (!c->is_last_level()) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
}
// Get entire range covered by compaction
InternalKey all_start, all_limit;
@ -1446,7 +1449,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
}
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
const InternalKey* end) {
const InternalKey* end, bool is_last_level) {
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs);
if (inputs.empty()) {
@ -1470,16 +1473,31 @@ Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
}
}
Compaction* c = new Compaction(options_, level);
Compaction* c = new Compaction(options_, level, is_last_level);
c->input_version_ = current_;
c->input_version_->Ref();
c->inputs_[0] = inputs;
SetupOtherInputs(c);
return c;
}
Compaction::Compaction(const Options* options, int level, bool is_last_level)
: level_(level),
is_last_level_(is_last_level),
max_output_file_size_(MaxFileSizeForLevel(options, level)),
input_version_(nullptr),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0) {
for (int i = 0; i < config::kNumLevels; i++) {
level_ptrs_[i] = 0;
}
}
Compaction::Compaction(const Options* options, int level)
: level_(level),
is_last_level_(false),
max_output_file_size_(MaxFileSizeForLevel(options, level)),
input_version_(nullptr),
grandparent_index_(0),

+ 6
- 2
db/version_set.h View File

@ -238,7 +238,7 @@ class VersionSet {
// level that overlaps the specified range. Caller should delete
// the result.
Compaction* CompactRange(int level, const InternalKey* begin,
const InternalKey* end);
const InternalKey* end, bool is_last_level);
// Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1.
@ -286,7 +286,7 @@ class VersionSet {
const std::vector<FileMetaData*>& inputs2,
InternalKey* smallest, InternalKey* largest);
void SetupOtherInputs(Compaction* c);
void SetupOtherInputs(Compaction* c);
// Save current contents to *log
Status WriteSnapshot(log::Writer* log);
@ -324,6 +324,8 @@ class Compaction {
// and "level+1" will be merged to produce a set of "level+1" files.
int level() const { return level_; }
bool is_last_level() const {return is_last_level_; }
// Return the object that holds the edits to the descriptor done
// by this compaction.
VersionEdit* edit() { return &edit_; }
@ -362,8 +364,10 @@ class Compaction {
friend class VersionSet;
Compaction(const Options* options, int level);
Compaction(const Options* options, int level, bool is_last_level);
int level_;
bool is_last_level_; // TTL: info whether the last level to compact
uint64_t max_output_file_size_;
Version* input_version_;
VersionEdit edit_;

BIN
img/all_pass.jpg View File

Before After
Width: 2050  |  Height: 318  |  Size: 80 KiB

BIN
img/test1_succ.png View File

Before After
Width: 343  |  Height: 37  |  Size: 7.2 KiB

BIN
img/test2_succ.png View File

Before After
Width: 386  |  Height: 39  |  Size: 8.7 KiB

BIN
img/test3_failure.png View File

Before After
Width: 1556  |  Height: 492  |  Size: 114 KiB

BIN
img/test3_succ.png View File

Before After
Width: 1152  |  Height: 360  |  Size: 75 KiB

+ 27
- 0
test/db_test1.cc View File

@ -0,0 +1,27 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb", 5);
string s;
auto stat = db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 60
- 14
test/ttl_test.cc View File

@ -1,11 +1,7 @@
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int value_size = 2048;
@ -20,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
@ -35,7 +31,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -58,7 +54,7 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -76,6 +72,8 @@ TEST(TestTTL, ReadTTL) {
status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok());
}
delete db;
}
TEST(TestTTL, CompactionTTL) {
@ -94,16 +92,64 @@ TEST(TestTTL, CompactionTTL) {
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
delete db;
}
// Time-consuming when kNumLevels is 7, so run this test when kNumLevels is set to 3,
// the codes on branch light_ver is a version with kNumLevels set to 3.
// In this case level 2 is the last level with files.
TEST(TestTTL, LastLevelCompaction) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
int last_level_file_num;
std::string file_num;
std::string last_level = "2";
for (int level = 0; level < 7; level++){
db->GetProperty("leveldb.num-files-at-level" + std::to_string(level), &file_num);
if (std::atoi(file_num.c_str()) > 0){
last_level = std::to_string(level);
last_level_file_num = std::atoi(file_num.c_str());
}
else{
break;
}
}
std::cout << "File nums in last level: " << last_level_file_num << std::endl;
ASSERT_GT(last_level_file_num, 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
delete db;
}
@ -111,4 +157,4 @@ int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}

+ 11
- 0
util/coding.h View File

@ -117,6 +117,17 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
return GetVarint32PtrFallback(p, limit, value);
}
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) {
return std::to_string(ddl) + "|" + value.ToString();
}
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) {
auto separator = value->find_first_of("|");
std::string ddl_str = value->substr(0, separator);
ddl = std::atoll(ddl_str.c_str());
*value = value->substr(separator + 1);
}
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_

Loading…
Cancel
Save