Compare commits

...

13 Commits

Author SHA1 Message Date
  ArcueidType 28ae1b52c5 update new implementation 10 months ago
  ArcueidType 47e4063400 simplify SetOtherInputs 10 months ago
  ArcueidType 45c1b090ae newest implementation 10 months ago
  ArcueidType de6432f1e2 optimize last level handling 10 months ago
  ArcueidType c175e4ed40 comment for TEST LastLevelCompaction 10 months ago
  ArcueidType 2481ca24ba config kNumLevels for TEST LastLevelCompaction 10 months ago
  ArcueidType 830f61ddb7 config kNumLevels for TEST LastLevelCompaction 10 months ago
  ArcueidType a75a5f5c4d handle last level 10 months ago
  ArcueidType 2d970272b0 add a TODO 10 months ago
  ArcueidType 8edba27b88 add delete db for all tests 10 months ago
  ArcueidType 57cd670dcc a naive version to pass ttl_test 10 months ago
  ArcueidType 78185e9e9c optimize codes structure and paa ReadTTL test 10 months ago
  ArcueidType a0cfce1188 Modify Put/Get methods for TTL support 10 months ago
12 changed files with 229 additions and 69 deletions
Unified View
  1. +6
    -1
      CMakeLists.txt
  2. +9
    -2
      db/builder.cc
  3. +5
    -5
      db/corruption_test.cc
  4. +70
    -17
      db/db_impl.cc
  5. +5
    -1
      db/db_impl.h
  6. +17
    -17
      db/db_test.cc
  7. +1
    -1
      db/dbformat.h
  8. +25
    -10
      db/version_set.cc
  9. +5
    -1
      db/version_set.h
  10. +27
    -0
      test/db_test1.cc
  11. +48
    -14
      test/ttl_test.cc
  12. +11
    -0
      util/coding.h

+ 6
- 1
CMakeLists.txt View File

@ -528,4 +528,9 @@ target_link_libraries(db_test2 PRIVATE leveldb)
add_executable(ttl_test add_executable(ttl_test
"${PROJECT_SOURCE_DIR}/test/ttl_test.cc" "${PROJECT_SOURCE_DIR}/test/ttl_test.cc"
) )
target_link_libraries(ttl_test PRIVATE leveldb gtest)
target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(db_test1
"${PROJECT_SOURCE_DIR}/test/db_test1.cc"
)
target_link_libraries(db_test1 leveldb)

+ 9
- 2
db/builder.cc View File

@ -32,8 +32,15 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
meta->smallest.DecodeFrom(iter->key()); meta->smallest.DecodeFrom(iter->key());
Slice key; Slice key;
for (; iter->Valid(); iter->Next()) { for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// std::string value;
// uint64_t ddl;
// value = iter->value().ToString();
// DecodeDeadLineValue(&value, ddl);
// if (ddl > std::time(nullptr)) {
// TTL: only liver remains
key = iter->key();
builder->Add(key, iter->value());
// }
} }
if (!key.empty()) { if (!key.empty()) {
meta->largest.DecodeFrom(key); meta->largest.DecodeFrom(key);

+ 5
- 5
db/corruption_test.cc View File

@ -228,8 +228,8 @@ TEST_F(CorruptionTest, TableFile) {
Build(100); Build(100);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable(); dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
dbi->TEST_CompactRange(0, nullptr, nullptr, false);
dbi->TEST_CompactRange(1, nullptr, nullptr, false);
Corrupt(kTableFile, 100, 1); Corrupt(kTableFile, 100, 1);
Check(90, 99); Check(90, 99);
@ -242,8 +242,8 @@ TEST_F(CorruptionTest, TableFileRepair) {
Build(100); Build(100);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable(); dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(1, nullptr, nullptr);
dbi->TEST_CompactRange(0, nullptr, nullptr, false);
dbi->TEST_CompactRange(1, nullptr, nullptr, false);
Corrupt(kTableFile, 100, 1); Corrupt(kTableFile, 100, 1);
RepairDB(); RepairDB();
@ -293,7 +293,7 @@ TEST_F(CorruptionTest, CorruptedDescriptor) {
ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "hello")); ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "hello"));
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_); DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable(); dbi->TEST_CompactMemTable();
dbi->TEST_CompactRange(0, nullptr, nullptr);
dbi->TEST_CompactRange(0, nullptr, nullptr, false);
Corrupt(kDescriptorFile, 0, 1000); Corrupt(kDescriptorFile, 0, 1000);
Status s = TryReopen(); Status s = TryReopen();

+ 70
- 17
db/db_impl.cc View File

@ -4,14 +4,6 @@
#include "db/db_impl.h" #include "db/db_impl.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <set>
#include <string>
#include <vector>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -22,11 +14,22 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <ostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/status.h" #include "leveldb/status.h"
#include "leveldb/table.h" #include "leveldb/table.h"
#include "leveldb/table_builder.h" #include "leveldb/table_builder.h"
#include "port/port.h" #include "port/port.h"
#include "table/block.h" #include "table/block.h"
#include "table/merger.h" #include "table/merger.h"
@ -591,20 +594,21 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
} }
} }
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
for (int level = 0; level < max_level_with_files + 1; level++) {
TEST_CompactRange(level, begin, end, level == max_level_with_files);
} }
} }
void DBImpl::TEST_CompactRange(int level, const Slice* begin, void DBImpl::TEST_CompactRange(int level, const Slice* begin,
const Slice* end) {
const Slice* end, bool is_last_level) {
assert(level >= 0); assert(level >= 0);
assert(level + 1 < config::kNumLevels);
assert(level < config::kNumLevels);
InternalKey begin_storage, end_storage; InternalKey begin_storage, end_storage;
ManualCompaction manual; ManualCompaction manual;
manual.level = level; manual.level = level;
manual.is_last_level = is_last_level;
manual.done = false; manual.done = false;
if (begin == nullptr) { if (begin == nullptr) {
manual.begin = nullptr; manual.begin = nullptr;
@ -717,7 +721,7 @@ void DBImpl::BackgroundCompaction() {
InternalKey manual_end; InternalKey manual_end;
if (is_manual) { if (is_manual) {
ManualCompaction* m = manual_compaction_; ManualCompaction* m = manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
c = versions_->CompactRange(m->level, m->begin, m->end, m->is_last_level);
m->done = (c == nullptr); m->done = (c == nullptr);
if (c != nullptr) { if (c != nullptr) {
manual_end = c->input(0, c->num_input_files(0) - 1)->largest; manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
@ -888,8 +892,14 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level(); const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) { for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i]; const CompactionState::Output& out = compact->outputs[i];
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
if (!compact->compaction->is_last_level()) {
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
} else {
// TTL: outputs of last level compaction should be writen to last level itself
compact->compaction->edit()->AddFile(level, out.number, out.file_size,
out.smallest, out.largest);
}
} }
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
} }
@ -963,6 +973,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
last_sequence_for_key = kMaxSequenceNumber; last_sequence_for_key = kMaxSequenceNumber;
} }
std::string value = input->value().ToString();
uint64_t ddl;
DecodeDeadLineValue(&value, ddl);
if (last_sequence_for_key <= compact->smallest_snapshot) { if (last_sequence_for_key <= compact->smallest_snapshot) {
// Hidden by an newer entry for same user key // Hidden by an newer entry for same user key
drop = true; // (A) drop = true; // (A)
@ -977,6 +992,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above). // few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped. // Therefore this deletion marker is obsolete and can be dropped.
drop = true; drop = true;
} else if (ddl <= std::time(nullptr)) {
// TTL: data expired
drop = true;
} }
last_sequence_for_key = ikey.sequence; last_sequence_for_key = ikey.sequence;
@ -1042,7 +1060,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
} }
mutex_.Lock(); mutex_.Lock();
stats_[compact->compaction->level() + 1].Add(stats);
if (!compact->compaction->is_last_level()) {
stats_[compact->compaction->level() + 1].Add(stats);
} else {
// TTL: compaction for last level
stats_[compact->compaction->level()].Add(stats);
}
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(compact); status = InstallCompactionResults(compact);
@ -1152,6 +1175,21 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
s = current->Get(options, lkey, value, &stats); s = current->Get(options, lkey, value, &stats);
have_stat_update = true; have_stat_update = true;
} }
// TTL: Get the true value and make sure the data is still living
if(!value->empty()) {
uint64_t dead_line;
DecodeDeadLineValue(value, dead_line);
if (dead_line != 0) {
// use TTL
if (std::time(nullptr) >= dead_line) {
// data expired
*value = "";
s = Status::NotFound("Data expired");
}
} else {
// TTL not set
}
}
mutex_.Lock(); mutex_.Lock();
} }
@ -1198,6 +1236,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val); return DB::Put(o, key, val);
} }
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
return DB::Put(options, key, value, ttl);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key); return DB::Delete(options, key);
} }
@ -1485,12 +1528,22 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB // Default implementations of convenience methods that subclasses of DB
// can call if they wish // can call if they wish
// TTL: Update TTL Encode
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch; WriteBatch batch;
batch.Put(key, value);
batch.Put(key, EncodeDeadLine(0, value));
return Write(opt, &batch); return Write(opt, &batch);
} }
// TTL: Put methods for ttl
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
WriteBatch batch;
auto dead_line = std::time(nullptr) + ttl;
batch.Put(key, EncodeDeadLine(dead_line, value));
return Write(options, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) { Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch; WriteBatch batch;
batch.Delete(key); batch.Delete(key);

+ 5
- 1
db/db_impl.h View File

@ -38,6 +38,8 @@ class DBImpl : public DB {
// Implementations of the DB interface // Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key, Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override; const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override; Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override; Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key, Status Get(const ReadOptions& options, const Slice& key,
@ -52,7 +54,8 @@ class DBImpl : public DB {
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end] // Compact any files in the named level that overlap [*begin,*end]
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);
void TEST_CompactRange(int level, const Slice* begin, const Slice* end,
bool is_last_level);
// Force current memtable contents to be compacted. // Force current memtable contents to be compacted.
Status TEST_CompactMemTable(); Status TEST_CompactMemTable();
@ -79,6 +82,7 @@ class DBImpl : public DB {
// Information for a manual compaction // Information for a manual compaction
struct ManualCompaction { struct ManualCompaction {
int level; int level;
bool is_last_level; // TTL: Used to check if last level with files
bool done; bool done;
const InternalKey* begin; // null means beginning of key range const InternalKey* begin; // null means beginning of key range
const InternalKey* end; // null means end of key range const InternalKey* end; // null means end of key range

+ 17
- 17
db/db_test.cc View File

@ -786,7 +786,7 @@ TEST_F(DBTest, GetEncountersEmptyLevel) {
} }
// Step 2: clear level 1 if necessary. // Step 2: clear level 1 if necessary.
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, false);
ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_EQ(NumTableFilesAtLevel(0), 1);
ASSERT_EQ(NumTableFilesAtLevel(1), 0); ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 1); ASSERT_EQ(NumTableFilesAtLevel(2), 1);
@ -1145,7 +1145,7 @@ TEST_F(DBTest, CompactionsGenerateMultipleFiles) {
// Reopening moves updates to level-0 // Reopening moves updates to level-0
Reopen(&options); Reopen(&options);
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GT(NumTableFilesAtLevel(1), 1); ASSERT_GT(NumTableFilesAtLevel(1), 1);
@ -1196,7 +1196,7 @@ TEST_F(DBTest, SparseMerge) {
} }
Put("C", "vc"); Put("C", "vc");
dbfull()->TEST_CompactMemTable(); dbfull()->TEST_CompactMemTable();
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
// Make sparse update // Make sparse update
Put("A", "va2"); Put("A", "va2");
@ -1207,9 +1207,9 @@ TEST_F(DBTest, SparseMerge) {
// Compactions should not cause us to create a situation where // Compactions should not cause us to create a situation where
// a file overlaps too much data at the next level. // a file overlaps too much data at the next level.
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576); ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576);
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576); ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576);
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, false);
ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576); ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20 * 1048576);
} }
@ -1273,7 +1273,7 @@ TEST_F(DBTest, ApproximateSizes) {
std::string cend_str = Key(compact_start + 9); std::string cend_str = Key(compact_start + 9);
Slice cstart = cstart_str; Slice cstart = cstart_str;
Slice cend = cend_str; Slice cend = cend_str;
dbfull()->TEST_CompactRange(0, &cstart, &cend);
dbfull()->TEST_CompactRange(0, &cstart, &cend, false);
} }
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
@ -1320,7 +1320,7 @@ TEST_F(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000)); ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000));
dbfull()->TEST_CompactRange(0, nullptr, nullptr);
dbfull()->TEST_CompactRange(0, nullptr, nullptr, false);
} }
} while (ChangeOptions()); } while (ChangeOptions());
} }
@ -1397,11 +1397,11 @@ TEST_F(DBTest, HiddenValuesAreRemoved) {
db_->ReleaseSnapshot(snapshot); db_->ReleaseSnapshot(snapshot);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]");
Slice x("x"); Slice x("x");
dbfull()->TEST_CompactRange(0, nullptr, &x);
dbfull()->TEST_CompactRange(0, nullptr, &x, false);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_GE(NumTableFilesAtLevel(1), 1); ASSERT_GE(NumTableFilesAtLevel(1), 1);
dbfull()->TEST_CompactRange(1, nullptr, &x);
dbfull()->TEST_CompactRange(1, nullptr, &x, false);
ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]");
ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000)); ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000));
@ -1427,11 +1427,11 @@ TEST_F(DBTest, DeletionMarkers1) {
ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2 ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
Slice z("z"); Slice z("z");
dbfull()->TEST_CompactRange(last - 2, nullptr, &z);
dbfull()->TEST_CompactRange(last - 2, nullptr, &z, false);
// DEL eliminated, but v1 remains because we aren't compacting that level // DEL eliminated, but v1 remains because we aren't compacting that level
// (DEL can be eliminated because v2 hides v1). // (DEL can be eliminated because v2 hides v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr);
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, false);
// Merging last-1 w/ last, so we are the base level for "foo", so // Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1). // DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
@ -1454,10 +1454,10 @@ TEST_F(DBTest, DeletionMarkers2) {
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2 ASSERT_LEVELDB_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last - 2, nullptr, nullptr);
dbfull()->TEST_CompactRange(last - 2, nullptr, nullptr, false);
// DEL kept: "last" file overlaps // DEL kept: "last" file overlaps
ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr);
dbfull()->TEST_CompactRange(last - 1, nullptr, nullptr, false);
// Merging last-1 w/ last, so we are the base level for "foo", so // Merging last-1 w/ last, so we are the base level for "foo", so
// DEL is removed. (as is v1). // DEL is removed. (as is v1).
ASSERT_EQ(AllEntriesFor("foo"), "[ ]"); ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
@ -1491,8 +1491,8 @@ TEST_F(DBTest, OverlapInLevel0) {
ASSERT_EQ("2,1,1", FilesPerLevel()); ASSERT_EQ("2,1,1", FilesPerLevel());
// Compact away the placeholder files we created initially // Compact away the placeholder files we created initially
dbfull()->TEST_CompactRange(1, nullptr, nullptr);
dbfull()->TEST_CompactRange(2, nullptr, nullptr);
dbfull()->TEST_CompactRange(1, nullptr, nullptr, false);
dbfull()->TEST_CompactRange(2, nullptr, nullptr, false);
ASSERT_EQ("2", FilesPerLevel()); ASSERT_EQ("2", FilesPerLevel());
// Do a memtable compaction. Before bug-fix, the compaction would // Do a memtable compaction. Before bug-fix, the compaction would
@ -1787,7 +1787,7 @@ TEST_F(DBTest, NoSpace) {
env_->no_space_.store(true, std::memory_order_release); env_->no_space_.store(true, std::memory_order_release);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels - 1; level++) { for (int level = 0; level < config::kNumLevels - 1; level++) {
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
dbfull()->TEST_CompactRange(level, nullptr, nullptr, false);
} }
} }
env_->no_space_.store(false, std::memory_order_release); env_->no_space_.store(false, std::memory_order_release);
@ -1876,7 +1876,7 @@ TEST_F(DBTest, ManifestWriteError) {
// Merging compaction (will fail) // Merging compaction (will fail)
error_type->store(true, std::memory_order_release); error_type->store(true, std::memory_order_release);
dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
dbfull()->TEST_CompactRange(last, nullptr, nullptr, false); // Should fail
ASSERT_EQ("bar", Get("foo")); ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data // Recovery: should not lose data

+ 1
- 1
db/dbformat.h View File

@ -22,7 +22,7 @@ namespace leveldb {
// Grouping of constants. We may want to make some of these // Grouping of constants. We may want to make some of these
// parameters set via options. // parameters set via options.
namespace config { namespace config {
static const int kNumLevels = 7;
static const int kNumLevels = 3;
// Level-0 compaction is started when we hit this many files. // Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4; static const int kL0_CompactionTrigger = 4;

+ 25
- 10
db/version_set.cc View File

@ -1104,13 +1104,11 @@ int VersionSet::NumLevelFiles(int level) const {
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes // Update code if kNumLevels changes
static_assert(config::kNumLevels == 7, "");
static_assert(config::kNumLevels == 3, "");
std::snprintf( std::snprintf(
scratch->buffer, sizeof(scratch->buffer), "files[ %d %d %d %d %d %d %d ]",
scratch->buffer, sizeof(scratch->buffer), "files[ %d %d %d ]",
int(current_->files_[0].size()), int(current_->files_[1].size()), int(current_->files_[0].size()), int(current_->files_[1].size()),
int(current_->files_[2].size()), int(current_->files_[3].size()),
int(current_->files_[4].size()), int(current_->files_[5].size()),
int(current_->files_[6].size()));
int(current_->files_[2].size()));
return scratch->buffer; return scratch->buffer;
} }
@ -1389,9 +1387,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]); AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
GetRange(c->inputs_[0], &smallest, &largest); GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
// TTL: manual compaction for last level shouldn't have inputs[1]
if (!c->is_last_level()) {
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
&c->inputs_[1]);
AddBoundaryInputs(icmp_, current_->files_[level + 1], &c->inputs_[1]);
}
// Get entire range covered by compaction // Get entire range covered by compaction
InternalKey all_start, all_limit; InternalKey all_start, all_limit;
@ -1446,7 +1447,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
} }
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin, Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
const InternalKey* end) {
const InternalKey* end, bool is_last_level) {
std::vector<FileMetaData*> inputs; std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs); current_->GetOverlappingInputs(level, begin, end, &inputs);
if (inputs.empty()) { if (inputs.empty()) {
@ -1470,7 +1471,7 @@ Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
} }
} }
Compaction* c = new Compaction(options_, level);
Compaction* c = new Compaction(options_, level, is_last_level);
c->input_version_ = current_; c->input_version_ = current_;
c->input_version_->Ref(); c->input_version_->Ref();
c->inputs_[0] = inputs; c->inputs_[0] = inputs;
@ -1478,8 +1479,22 @@ Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
return c; return c;
} }
Compaction::Compaction(const Options* options, int level, bool is_last_level)
: level_(level),
is_last_level_(is_last_level),
max_output_file_size_(MaxFileSizeForLevel(options, level)),
input_version_(nullptr),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0) {
for (int i = 0; i < config::kNumLevels; i++) {
level_ptrs_[i] = 0;
}
}
Compaction::Compaction(const Options* options, int level) Compaction::Compaction(const Options* options, int level)
: level_(level), : level_(level),
is_last_level_(false),
max_output_file_size_(MaxFileSizeForLevel(options, level)), max_output_file_size_(MaxFileSizeForLevel(options, level)),
input_version_(nullptr), input_version_(nullptr),
grandparent_index_(0), grandparent_index_(0),

+ 5
- 1
db/version_set.h View File

@ -238,7 +238,7 @@ class VersionSet {
// level that overlaps the specified range. Caller should delete // level that overlaps the specified range. Caller should delete
// the result. // the result.
Compaction* CompactRange(int level, const InternalKey* begin, Compaction* CompactRange(int level, const InternalKey* begin,
const InternalKey* end);
const InternalKey* end, bool is_last_level);
// Return the maximum overlapping data (in bytes) at next level for any // Return the maximum overlapping data (in bytes) at next level for any
// file at a level >= 1. // file at a level >= 1.
@ -324,6 +324,8 @@ class Compaction {
// and "level+1" will be merged to produce a set of "level+1" files. // and "level+1" will be merged to produce a set of "level+1" files.
int level() const { return level_; } int level() const { return level_; }
bool is_last_level() const {return is_last_level_; }
// Return the object that holds the edits to the descriptor done // Return the object that holds the edits to the descriptor done
// by this compaction. // by this compaction.
VersionEdit* edit() { return &edit_; } VersionEdit* edit() { return &edit_; }
@ -362,8 +364,10 @@ class Compaction {
friend class VersionSet; friend class VersionSet;
Compaction(const Options* options, int level); Compaction(const Options* options, int level);
Compaction(const Options* options, int level, bool is_last_level);
int level_; int level_;
bool is_last_level_; // TTL: info whether the last level to compact
uint64_t max_output_file_size_; uint64_t max_output_file_size_;
Version* input_version_; Version* input_version_;
VersionEdit edit_; VersionEdit edit_;

+ 27
- 0
test/db_test1.cc View File

@ -0,0 +1,27 @@
#include "leveldb/db.h"
#include <iostream>
using namespace std;
using namespace leveldb;
int main() {
DB* db = nullptr;
Options op;
op.create_if_missing = true;
Status status = DB::Open(op, "testdb", &db);
assert(status.ok());
db->Put(WriteOptions(), "001", "leveldb", 5);
string s;
auto stat = db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout<<s1<<endl;
delete db;
return 0;
}

+ 48
- 14
test/ttl_test.cc View File

@ -1,11 +1,7 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/db.h" #include "leveldb/db.h"
using namespace leveldb; using namespace leveldb;
constexpr int value_size = 2048; constexpr int value_size = 2048;
@ -20,7 +16,7 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) { void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions; WriteOptions writeOptions;
int key_num = data_size / value_size; int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) { for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
@ -35,7 +31,7 @@ void GetData(DB *db, int size = (1 << 30)) {
int key_num = data_size / value_size; int key_num = data_size / value_size;
// 点查 // 点查
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
@ -58,7 +54,7 @@ TEST(TestTTL, ReadTTL) {
ReadOptions readOptions; ReadOptions readOptions;
Status status; Status status;
int key_num = data_size / value_size; int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1; int key_ = rand() % key_num+1;
std::string key = std::to_string(key_); std::string key = std::to_string(key_);
@ -76,6 +72,8 @@ TEST(TestTTL, ReadTTL) {
status = db->Get(readOptions, key, &value); status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok()); ASSERT_FALSE(status.ok());
} }
delete db;
} }
TEST(TestTTL, CompactionTTL) { TEST(TestTTL, CompactionTTL) {
@ -94,16 +92,52 @@ TEST(TestTTL, CompactionTTL) {
uint64_t sizes[1]; uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes); db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0); ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000); Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr); db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
delete db;
}
// Test handling last level with kNumLevels = 3
TEST(TestTTL, LastLevelCompaction) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
std::string last_level_file_num;
std::string last_level = "2";
db->GetProperty("leveldb.num-files-at-level" + last_level, &last_level_file_num);
std::cout << "File nums in last level: " << last_level_file_num << std::endl;
ASSERT_GT(std::atoi(last_level_file_num.c_str()), 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges1[0] = leveldb::Range("-", "A");
uint64_t sizes1[1];
db->GetApproximateSizes(ranges1, 1, sizes1);
ASSERT_EQ(sizes1[0], 0);
delete db;
} }
@ -111,4 +145,4 @@ int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits. // All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv); testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
}
}

+ 11
- 0
util/coding.h View File

@ -117,6 +117,17 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
return GetVarint32PtrFallback(p, limit, value); return GetVarint32PtrFallback(p, limit, value);
} }
inline std::string EncodeDeadLine(uint64_t ddl, const Slice& value) {
return std::to_string(ddl) + "|" + value.ToString();
}
inline void DecodeDeadLineValue(std::string* value, uint64_t& ddl) {
auto separator = value->find_first_of("|");
std::string ddl_str = value->substr(0, separator);
ddl = std::atoll(ddl_str.c_str());
*value = value->substr(separator + 1);
}
} // namespace leveldb } // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_ #endif // STORAGE_LEVELDB_UTIL_CODING_H_

Loading…
Cancel
Save