Compare commits

...

8 Commits

Author SHA1 Message Date
  ArcueidType 9c689f2f8b naive version which fails TEST LastLevelCompaction 10 months ago
  ArcueidType c175e4ed40 comment for TEST LastLevelCompaction 10 months ago
  ArcueidType a75a5f5c4d handle last level 10 months ago
  ArcueidType 2d970272b0 add a TODO 10 months ago
  ArcueidType 8edba27b88 add delete db for all tests 10 months ago
  ArcueidType 57cd670dcc a naive version to pass ttl_test 10 months ago
  ArcueidType 78185e9e9c optimize codes structure and paa ReadTTL test 10 months ago
  ArcueidType a0cfce1188 Modify Put/Get methods for TTL support 10 months ago
9 changed files with 185 additions and 40 deletions
Unified View
  1. +6
    -1
      CMakeLists.txt
  2. +9
    -2
      db/builder.cc
  3. +70
    -14
      db/db_impl.cc
  4. +2
    -0
      db/db_impl.h
  5. +1
    -1
      db/dbformat.h
  6. +9
    -8
      db/version_set.cc
  7. +27
    -0
      test/db_test1.cc
  8. +50
    -14
      test/ttl_test.cc
  9. +11
    -0
      util/coding.h

+ 6
- 1
CMakeLists.txt View File

@ -528,4 +528,9 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc" "${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
) )
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)

+ 9
- 2
db/builder.cc View File

@ -32,8 +32,15 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
meta->smallest.DecodeFrom(iter->key()); meta->smallest.DecodeFrom(iter->key());
Slice key; Slice key;
for (; iter->Valid(); iter->Next()) { for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// std::string value;
// uint64_t ddl;
// value = iter->value().ToString();
// DecodeDeadLineValue(&value, ddl);
// if (ddl > std::time(nullptr)) {
// TTL: only liver remains
key = iter->key();
builder->Add(key, iter->value());
// }
} }
if (!key.empty()) { if (!key.empty()) {
meta->largest.DecodeFrom(key); meta->largest.DecodeFrom(key);

+ 70
- 14
db/db_impl.cc View File

@ -4,14 +4,6 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -22,11 +14,22 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <ostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "leveldb/table_builder.h" #include "leveldb/table_builder.h"
#include "port/port.h" #include "port/port.h"
#include "table/block.h" #include "table/block.h"
#include "table/merger.h" #include "table/merger.h"
@ -591,7 +594,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
} }
} }
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end); TEST_CompactRange(level, begin, end);
} }
} }
@ -599,10 +602,14 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
void DBImpl::TEST_CompactRange(int level, const Slice* begin, void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) { const Slice* end) {
assert(level >= 0); assert(level >= 0);
assert(level + 1 < config::kNumLevels);
assert(level < config::kNumLevels);
InternalKey begin_storage, end_storage; InternalKey begin_storage, end_storage;
if (level == config::kNumLevels - 1) {
return;
}
ManualCompaction manual; ManualCompaction manual;
manual.level = level; manual.level = level;
manual.done = false; manual.done = false;
@ -888,8 +895,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level(); const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) { for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i]; const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
if (level < config::kNumLevels - 1) {
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
} else {
// TTL: outputs of last level compaction should be writen to last level itself
compact->compaction->edit()->AddFile(level, out.number, out.file_size,
out.smallest, out.largest);
}
} }
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
} }
@ -963,6 +976,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = kMaxSequenceNumber; last_sequence_for_key = kMaxSequenceNumber;
} }
std::string value = input->value().ToString();
uint64_t ddl;
DecodeDeadLineValue(&value, ddl);
if (last_sequence_for_key <= compact->smallest_snapshot) { if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key // Hidden by an newer entry for same user key
drop = true; // (A) drop = true; // (A)
@ -977,6 +995,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above). // few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped. // Therefore this deletion marker is obsolete and can be dropped.
drop = true; drop = true;
} else if (ddl <= std::time(nullptr)) {
// TTL: data expired
drop = true;
} }
last_sequence_for_key = ikey.sequence; last_sequence_for_key = ikey.sequence;
@ -1042,7 +1063,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
mutex_.Lock(); mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (compact->compaction->level() + 1 < config::kNumLevels) {
stats_[compact->compaction->level() + 1].Add(stats);
} else {
// TTL: compaction for last level
stats_[compact->compaction->level()].Add(stats);
}
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(compact); status = InstallCompactionResults(compact);
@ -1152,6 +1178,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
s = current->Get(options, lkey, value, &stats); s = current->Get(options, lkey, value, &stats);
have_stat_update = true; have_stat_update = true;
} }
// TTL: Get the true value and make sure the data is still living
if(!value->empty()) {
uint64_t dead_line;
DecodeDeadLineValue(value, dead_line);
if (dead_line != 0) {
// use TTL
if (std::time(nullptr) >= dead_line) {
// data expired
*value = "";
s = Status::NotFound("Data expired");
}
} else {
// TTL not set
}
}
mutex_.Lock(); mutex_.Lock();
} }
@ -1198,6 +1239,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val); return DB::Put(o, key, val);
} }
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
return DB::Put(options, key, value, ttl);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key); return DB::Delete(options, key);
} }
@ -1485,12 +1531,22 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
// TTL: Update TTL Encode
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch; WriteBatch batch;
batch.Put(key, value);
batch.Put(key, EncodeDeadLine(0, value));
return Write(opt, &batch); return Write(opt, &batch);
} }
// TTL: Put methods for ttl
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
WriteBatch batch;
auto dead_line = std::time(nullptr) + ttl;
batch.Put(key, EncodeDeadLine(dead_line, value));
return Write(options, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) { Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch; WriteBatch batch;
batch.Delete(key); batch.Delete(key);

+ 2
- 0
db/db_impl.h View File

@ -38,6 +38,8 @@ class DBImpl : public DB {
// Implementations of the DB interface // Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key, Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override; const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override; Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key, Status Get(const ReadOptions& options, const Slice& key,

+ 1
- 1
db/dbformat.h View File

@ -22,7 +22,7 @@ namespace leveldb {
// Grouping of constants. We may want to make some of these // Grouping of constants. We may want to make some of these
// parameters set via options. // parameters set via options.
namespace config { namespace config {
static const int kNumLevels = 7;
static const int kNumLevels = 3;
// Level-0 compaction is started when we hit this many files. // Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4; static const int kL0_CompactionTrigger = 4;

+ 9
- 8
db/version_set.cc View File

@ -1104,13 +1104,11 @@ int VersionSet::NumLevelFiles(int level) const {
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes // Update code if kNumLevels changes
static_assert(config::kNumLevels == 7, "");
static_assert(config::kNumLevels == 3, "");
std::snprintf( std::snprintf(
scratch->buffer, sizeof(scratch->buffer), "files[ %d %d %d %d %d %d %d ]",
scratch->buffer, sizeof(scratch->buffer), "files[ %d %d %d ]",
int(current_->files_[0].size()), int(current_->files_[1].size()), int(current_->files_[0].size()), int(current_->files_[1].size()),
int(current_->files_[2].size()), int(current_->files_[3].size()),
int(current_->files_[4].size()), int(current_->files_[5].size()),
int(current_->files_[6].size()));
int(current_->files_[2].size()));
return scratch->buffer; return scratch->buffer;
} }
@ -1389,9 +1387,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]); AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
GetRange(c->inputs_[0], &smallest, &largest); GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// TTL: manual compaction for last level shouldn't have inputs[1]
if (level + 1 < config::kNumLevels) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
}
// Get entire range covered by compaction // Get entire range covered by compaction
InternalKey all_start, all_limit; InternalKey all_start, all_limit;

+ 27
- 0
test/db_test1.cc View File

@ -0,0 +1,27 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb", 5);
string s;
auto stat = db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 50
- 14
test/ttl_test.cc View File

@ -1,11 +1,7 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/db.h" #include "leveldb/db.h"
using namespace leveldb; using namespace leveldb;
constexpr int value_size = 2048; constexpr int value_size = 2048;
@ -20,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) { void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions; WriteOptions writeOptions;
int key_num = data_size / value_size; int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) { for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
@ -35,7 +31,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size; int key_num = data_size / value_size;
// 点查 // 点查
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
@ -58,7 +54,7 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions; ReadOptions readOptions;
Status status; Status status;
int key_num = data_size / value_size; int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
@ -76,6 +72,8 @@ TEST(TestTTL, ReadTTL) {
status = db->Get(readOptions, key, &value); status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok()); ASSERT_FALSE(status.ok());
} }
delete db;
} }
TEST(TestTTL, CompactionTTL) { TEST(TestTTL, CompactionTTL) {
@ -94,16 +92,54 @@ TEST(TestTTL, CompactionTTL) {
uint64_t sizes[1]; uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes); db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0); ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000); Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr); db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
delete db;
}
// Time-consuming when kNumLevels is 7, so run this test when kNumLevels is set to 3,
// the codes on branch light_ver is a version with kNumLevels set to 3.
// In this case level 2 is the last level with files.
TEST(TestTTL, LastLevelCompaction) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
std::string last_level_file_num;
std::string last_level = "2";
db->GetProperty("leveldb.num-files-at-level" + last_level, &last_level_file_num);
std::cout << "File nums in last level: " << last_level_file_num << std::endl;
ASSERT_GT(std::atoi(last_level_file_num.c_str()), 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
delete db;
} }
@ -111,4 +147,4 @@ int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits. // All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
}
}

+ 11
- 0
util/coding.h View File

@ -117,6 +117,17 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
return GetVarint32PtrFallback(p, limit, value); return GetVarint32PtrFallback(p, limit, value);
} }
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) {
return std::to_string(ddl) + "|" + value.ToString();
}
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) {
auto separator = value->find_first_of("|");
std::string ddl_str = value->substr(0, separator);
ddl = std::atoll(ddl_str.c_str());
*value = value->substr(separator + 1);
}
} // namespace leveldb } // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_ #endif // STORAGE_LEVELDB_UTIL_CODING_H_

Loading…
Cancel
Save