Compare commits

...

8 Commits

Author SHA1 Message Date
  ArcueidType 9c689f2f8b naive version which fails TEST LastLevelCompaction 10 months ago
  ArcueidType c175e4ed40 comment for TEST LastLevelCompaction 10 months ago
  ArcueidType a75a5f5c4d handle last level 10 months ago
  ArcueidType 2d970272b0 add a TODO 10 months ago
  ArcueidType 8edba27b88 add delete db for all tests 10 months ago
  ArcueidType 57cd670dcc a naive version to pass ttl_test 10 months ago
  ArcueidType 78185e9e9c optimize codes structure and paa ReadTTL test 10 months ago
  ArcueidType a0cfce1188 Modify Put/Get methods for TTL support 10 months ago
9 changed files with 185 additions and 40 deletions
Split View
  1. +6
    -1
      CMakeLists.txt
  2. +9
    -2
      db/builder.cc
  3. +70
    -14
      db/db_impl.cc
  4. +2
    -0
      db/db_impl.h
  5. +1
    -1
      db/dbformat.h
  6. +9
    -8
      db/version_set.cc
  7. +27
    -0
      test/db_test1.cc
  8. +50
    -14
      test/ttl_test.cc
  9. +11
    -0
      util/coding.h

+ 6
- 1
CMakeLists.txt View File

@ -528,4 +528,9 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)

+ 9
- 2
db/builder.cc View File

@ -32,8 +32,15 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
meta->smallest.DecodeFrom(iter->key());
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// std::string value;
// uint64_t ddl;
// value = iter->value().ToString();
// DecodeDeadLineValue(&value, ddl);
// if (ddl > std::time(nullptr)) {
// TTL: only liver remains
key = iter->key();
builder->Add(key, iter->value());
// }
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 70
- 14
db/db_impl.cc View File

@ -4,14 +4,6 @@
#include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
@ -22,11 +14,22 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <ostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
@ -591,7 +594,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end);
}
}
@ -599,10 +602,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
assert(level < config::kNumLevels);
InternalKey begin_storage, end_storage;
if (level == config::kNumLevels - 1) {
return;
}
ManualCompaction manual;
manual.level = level;
manual.done = false;
@ -888,8 +895,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
if (level < config::kNumLevels - 1) {
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
} else {
// TTL: outputs of last level compaction should be writen to last level itself
compact->compaction->edit()->AddFile(level, out.number, out.file_size,
out.smallest, out.largest);
}
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@ -963,6 +976,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = kMaxSequenceNumber;
}
std::string value = input->value().ToString();
uint64_t ddl;
DecodeDeadLineValue(&value, ddl);
if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key
drop = true; // (A)
@ -977,6 +995,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
} else if (ddl <= std::time(nullptr)) {
// TTL: data expired
drop = true;
}
last_sequence_for_key = ikey.sequence;
@ -1042,7 +1063,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (compact->compaction->level() + 1 < config::kNumLevels) {
stats_[compact->compaction->level() + 1].Add(stats);
} else {
// TTL: compaction for last level
stats_[compact->compaction->level()].Add(stats);
}
if (status.ok()) {
status = InstallCompactionResults(compact);
@ -1152,6 +1178,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
// TTL: Get the true value and make sure the data is still living
if(!value->empty()) {
uint64_t dead_line;
DecodeDeadLineValue(value, dead_line);
if (dead_line != 0) {
// use TTL
if (std::time(nullptr) >= dead_line) {
// data expired
*value = "";
s = Status::NotFound("Data expired");
}
} else {
// TTL not set
}
}
mutex_.Lock();
}
@ -1198,6 +1239,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
return DB::Put(options, key, value, ttl);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key);
}
@ -1485,12 +1531,22 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
// TTL: Update TTL Encode
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
batch.Put(key, EncodeDeadLine(0, value));
return Write(opt, &batch);
}
// TTL: Put methods for ttl
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
WriteBatch batch;
auto dead_line = std::time(nullptr) + ttl;
batch.Put(key, EncodeDeadLine(dead_line, value));
return Write(options, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);

+ 2
- 0
db/db_impl.h View File

@ -38,6 +38,8 @@ class DBImpl : public DB {
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,

+ 1
- 1
db/dbformat.h View File

@ -22,7 +22,7 @@ namespace leveldb {
// Grouping of constants. We may want to make some of these
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
static const int kNumLevels = 3;
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;

+ 9
- 8
db/version_set.cc View File

@ -1104,13 +1104,11 @@ int VersionSet::NumLevelFiles(int level) const {
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes
static_assert(config::kNumLevels == 7, "");
static_assert(config::kNumLevels == 3, "");
std::snprintf(
scratch->buffer, sizeof(scratch->buffer), "files[ %d %d %d %d %d %d %d ]",
scratch->buffer, sizeof(scratch->buffer), "files[ %d %d %d ]",
int(current_->files_[0].size()), int(current_->files_[1].size()),
int(current_->files_[2].size()), int(current_->files_[3].size()),
int(current_->files_[4].size()), int(current_->files_[5].size()),
int(current_->files_[6].size()));
int(current_->files_[2].size()));
return scratch->buffer;
}
@ -1389,9 +1387,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// TTL: manual compaction for last level shouldn't have inputs[1]
if (level + 1 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
}
// Get entire range covered by compaction
InternalKey all_start, all_limit;

+ 27
- 0
test/db_test1.cc View File

@ -0,0 +1,27 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb", 5);
string s;
auto stat = db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 50
- 14
test/ttl_test.cc View File

@ -1,11 +1,7 @@
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
using namespace leveldb;
constexpr int value_size = 2048;
@ -20,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
@ -35,7 +31,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size;
// 点查
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -58,7 +54,7 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -76,6 +72,8 @@ TEST(TestTTL, ReadTTL) {
status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok());
}
delete db;
}
TEST(TestTTL, CompactionTTL) {
@ -94,16 +92,54 @@ TEST(TestTTL, CompactionTTL) {
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
delete db;
}
// Time-consuming when kNumLevels is 7, so run this test when kNumLevels is set to 3,
// the codes on branch light_ver is a version with kNumLevels set to 3.
// In this case level 2 is the last level with files.
TEST(TestTTL, LastLevelCompaction) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
std::string last_level_file_num;
std::string last_level = "2";
db->GetProperty("leveldb.num-files-at-level" + last_level, &last_level_file_num);
std::cout << "File nums in last level: " << last_level_file_num << std::endl;
ASSERT_GT(std::atoi(last_level_file_num.c_str()), 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
delete db;
}
@ -111,4 +147,4 @@ int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
}

+ 11
- 0
util/coding.h View File

@ -117,6 +117,17 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
return GetVarint32PtrFallback(p, limit, value);
}
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) {
return std::to_string(ddl) + "|" + value.ToString();
}
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) {
auto separator = value->find_first_of("|");
std::string ddl_str = value->substr(0, separator);
ddl = std::atoll(ddl_str.c_str());
*value = value->substr(separator + 1);
}
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_

Loading…
Cancel
Save