Kaynağa Gözat

fix auto copaction problem , still need to deal compaction

main
kevinyao0901 3 hafta önce
ebeveyn
işleme
84186e0652
5 değiştirilmiş dosya ile 173 ekleme ve 42 silme
  1. +59
    -12
      db/db_impl.cc
  2. +4
    -3
      db/dbformat.h
  3. +14
    -14
      db/memtable.cc
  4. +2
    -0
      db/version_set.h
  5. +94
    -13
      test/ttl_test.cc

+ 59
- 12
db/db_impl.cc Dosyayı Görüntüle

@ -11,6 +11,7 @@
#include <set>
#include <string>
#include <vector>
#include <iostream>
#include "db/builder.h"
#include "db/db_iter.h"
@ -763,7 +764,7 @@ void DBImpl::BackgroundCompaction() {
Status status;
if (c == nullptr) {
// Nothing to do
} else if (!is_manual && c->IsTrivialMove()) {
} else if (!is_manual && c->IsTrivialMove() && 1==2) {
// Move file to next level
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
@ -924,9 +925,16 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
}
Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::cout<< "start compact" << std::endl;
const uint64_t start_micros = env_->NowMicros();
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
//TTL ToDo
// 定义要检测的目标键
Slice target_key = "10000";
int dropped_keys_count = 0; // 初始化计数器
int total_keys_count = 0;
Log(options_.info_log, "Compacting %d@%d + %d@%d files",
compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1),
@ -935,6 +943,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr);
assert(compact->outfile == nullptr);
//TTL ToDo
// {
// //MutexLock l(&mutex_);
// if (has_imm_.load(std::memory_order_relaxed)) {
// CompactMemTable();
// background_work_finished_signal_.SignalAll();
// }
// }
//finish modify
if (snapshots_.empty()) {
compact->smallest_snapshot = versions_->LastSequence();
} else {
@ -947,6 +966,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
mutex_.Unlock();
input->SeekToFirst();
std::cout << "Compation first key: " << input->key().ToString() << std::endl;
Status status;
ParsedInternalKey ikey;
std::string current_user_key;
@ -1009,21 +1029,45 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
// TTL ToDo: add expiration time check
if (!drop) { // 如果还未被标记为丢弃
Slice value = input->value();
if (value.size() >= sizeof(uint64_t)) {
const char* ptr = value.data();
uint64_t expiration_time = DecodeFixed64(ptr);
uint64_t current_time = env_->NowMicros() / 1000000;
if (current_time > expiration_time) {
drop = true; // 过期的键值对,标记为丢弃
}
// 检查是否为目标键
if (key == target_key) {
// 输出调试信息
Log(options_.info_log, "Found target key during compaction: %s\n", key.ToString().c_str());
}
Slice value = input->value();
if (value.size() >= sizeof(uint64_t)) {
const char* ptr = value.data();
uint64_t expiration_time = DecodeFixed64(ptr);
uint64_t current_time = env_->NowMicros() / 1000000;
if (current_time > expiration_time) {
drop = true; // 过期的键值对,标记为丢弃
dropped_keys_count ++; // 初始化计数器
}else{
bool flag = current_time > expiration_time;
}
}else{
bool bs = value.size() >= sizeof(uint64_t);
}
// if (!drop) { // 如果还未被标记为丢弃
// Slice value = input->value();
// if (value.size() >= sizeof(uint64_t)) {
// const char* ptr = value.data();
// uint64_t expiration_time = DecodeFixed64(ptr);
// uint64_t current_time = env_->NowMicros() / 1000000;
// if (current_time > expiration_time) {
// drop = true; // 过期的键值对,标记为丢弃
// }
// }
// }
last_sequence_for_key = ikey.sequence;
}
Log(options_.info_log, "Total dropped keys in compaction: %d\n", dropped_keys_count); // 输出统计结果
total_keys_count++;
#if 0
Log(options_.info_log,
" Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
@ -1061,6 +1105,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
input->Next();
}
std::cout << "Total dropped keys in compaction:" << dropped_keys_count
<< ", Total: " << total_keys_count << "\n";
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
@ -1206,7 +1253,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
uint64_t current_time = GetCurrentTime();
// 如果当前时间已经超过过期时间,则认为数据过期,返回 NotFound
if (current_time > expiration_time) {
if (current_time >= expiration_time) {
s = Status::NotFound(Slice());
} else {
// 数据未过期,解析出实际的值

+ 4
- 3
db/dbformat.h Dosyayı Görüntüle

@ -24,14 +24,15 @@ namespace leveldb {
namespace config {
static const int kNumLevels = 7;
//TTL ToDo
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
static const int kL0_CompactionTrigger = 4096;
// Soft limit on number of level-0 files. We slow down writes at this point.
static const int kL0_SlowdownWritesTrigger = 8;
static const int kL0_SlowdownWritesTrigger = 8192;
// Maximum number of level-0 files. We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;
static const int kL0_StopWritesTrigger = 1024*1024;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the

+ 14
- 14
db/memtable.cc Dosyayı Görüntüle

@ -127,20 +127,20 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
// TTL ToDo :检查:从 value 中解析出过期时间戳并与当前时间比较
assert(value->size() >= sizeof(uint64_t));
uint64_t expiration_time;
memcpy(&expiration_time, value->data(), sizeof(uint64_t)); // 解析出过期时间戳
uint64_t current_time = static_cast<uint64_t>(time(nullptr)); // 获取当前时间
// 如果当前时间已超过过期时间,设置状态为 NotFound 表示数据过期
if (current_time > expiration_time) {
*s = Status::NotFound(Slice());
return true;
} else {
// 数据未过期,解析出实际的值部分,去掉过期时间戳
*value = value->substr(sizeof(uint64_t));
}
// // TTL ToDo :检查:从 value 中解析出过期时间戳并与当前时间比较
// assert(value->size() >= sizeof(uint64_t));
// uint64_t expiration_time;
// memcpy(&expiration_time, value->data(), sizeof(uint64_t)); // 解析出过期时间戳
// uint64_t current_time = static_cast<uint64_t>(time(nullptr)); // 获取当前时间
// // 如果当前时间已超过过期时间,设置状态为 NotFound 表示数据过期
// if (current_time > expiration_time) {
// *s = Status::NotFound(Slice());
// return true;
// } else {
// // 数据未过期,解析出实际的值部分,去掉过期时间戳
// *value = value->substr(sizeof(uint64_t));
// }
//finish modify
return true;

+ 2
- 0
db/version_set.h Dosyayı Görüntüle

@ -252,6 +252,8 @@ class VersionSet {
bool NeedsCompaction() const {
Version* v = current_;
return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
//TTL ToDo
//return false;
}
// Add all files listed in any live version to *live.

+ 94
- 13
test/ttl_test.cc Dosyayı Görüntüle

@ -11,6 +11,33 @@ using namespace leveldb;
constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;
//--------------------------------------------------------------
void PrintAllKeys(DB *db) {
// 创建一个读选项对象
ReadOptions readOptions;
int LeftKeyCount = 0;
// 创建迭代器
std::unique_ptr<Iterator> it(db->NewIterator(readOptions));
// 遍历所有键
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// std::string key = it->key().ToString();
// std::string value = it->value().ToString();
// std::cout << "Key: " << key << std::endl;
LeftKeyCount++;
}
// 检查迭代器的有效性
if (!it->status().ok()) {
std::cerr << "Error iterating through keys: " << it->status().ToString() << std::endl;
}
std::cerr << "Key hasn't been deleted: " << LeftKeyCount << std::endl;
}
//------------------------------------------------------------------
Status OpenDB(std::string dbName, DB **db) {
Options options;
options.create_if_missing = true;
@ -26,19 +53,36 @@ void InsertData(DB *db, uint64_t ttl/* second */) {
std::unordered_set<std::string> unique_keys;
for (int i = 0; i < key_num; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
Status status = db->Put(writeOptions, key, value, ttl);
if (!status.ok()) {
// 输出失败的状态信息并退出循环
std::cerr << "Failed to write key: " << key
<< ", Status: " << status.ToString() << std::endl;
}else{
std::cerr << "Success to write key: " << key << std::endl;
unique_keys.insert(key); // 插入集合中,如果已经存在则不会重复插入
std::string key;
do {
int key_ = rand() % key_num + 1;
key = std::to_string(key_);
} while (unique_keys.find(key) != unique_keys.end()); // 检查是否已存在
std::string value(value_size, 'a');
// 判断 key 是否在范围内
if (key >= "-" && key < "A") {
//std::cout << "Key: " << key << " is within the range (-, A)" << std::endl;
} else {
std::cout << "Key: " << key << " is outside the range (-, A)" << std::endl;
return;
}
Status status = db->Put(writeOptions, key, value, ttl);
if (!status.ok()) {
// 输出失败的状态信息并退出循环
std::cerr << "Failed to write key: " << key
<< ", Status: " << status.ToString() << std::endl;
} else {
unique_keys.insert(key); // 插入集合中,如果已经存在则不会重复插入
}
}
}
Iterator* iter = db->NewIterator(ReadOptions());
iter->SeekToFirst();
std::cout << "Data base First key: " << iter->key().ToString() << std::endl;
delete iter;
// 打印成功写入的唯一键的数量
std::cout << "Total unique keys successfully written: " << unique_keys.size() << std::endl;
@ -56,8 +100,22 @@ void GetData(DB *db, int size = (1 << 30)) {
std::string value;
db->Get(readOptions, key, &value);
}
Iterator* iter = db->NewIterator(ReadOptions());
iter->SeekToFirst();
std::cout << "Data base First key: " << iter->key().ToString() << std::endl;
int cnt = 0;
while (iter->Valid())
{
cnt++;
iter->Next();
}
std::cout << "Total key cnt: " << cnt << "\n";
delete iter;
}
#if 0
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
@ -89,6 +147,7 @@ TEST(TestTTL, ReadTTL) {
Env::Default()->SleepForMicroseconds(ttl * 1000000);
srand(42);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
std::string key = std::to_string(key_);
@ -102,32 +161,54 @@ TEST(TestTTL, ReadTTL) {
ASSERT_FALSE(status.ok());
}
delete db;
}
#endif
TEST(TestTTL, CompactionTTL) {
DB *db;
leveldb::Options options;
// options.write_buffer_size = 1024*1024*1024;
// options.max_file_size = 1024*1024*1024;
leveldb::DestroyDB("testdb", options);
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
InsertData(db, ttl);
//leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
//uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
ttl += 10;
Env::Default()->SleepForMicroseconds(ttl * 1000000);
std::cout << "Start drop\n";
db->CompactRange(nullptr, nullptr);
ranges[0] = leveldb::Range("-", "A");
db->GetApproximateSizes(ranges, 1, sizes);
PrintAllKeys(db);
ASSERT_EQ(sizes[0], 0);
delete db;
}

Yükleniyor…
İptal
Kaydet