Browse Source

complete compaction

ckx
jjhengxin 3 weeks ago
parent
commit
c7b28e6be8
9 changed files with 409 additions and 59 deletions
  1. +138
    -0
      db/1.cc
  2. +87
    -0
      db/1.h
  3. +31
    -6
      db/db_impl.cc
  4. +99
    -31
      db/memtable.cc
  5. +1
    -0
      db/memtable.h
  6. +1
    -1
      db/write_batch.cc
  7. +19
    -0
      test.cc
  8. +33
    -21
      test/ttl_test.cc
  9. BIN
      test1

+ 138
- 0
db/1.cc View File

@ -0,0 +1,138 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/memtable.h"
#include "db/dbformat.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"
namespace leveldb {
static Slice GetLengthPrefixedSlice(const char* data) {
uint32_t len;
const char* p = data;
p = GetVarint32Ptr(p, p + 5, &len); // +5: we assume "p" is not corrupted
return Slice(p, len);
}
MemTable::MemTable(const InternalKeyComparator& comparator)
: comparator_(comparator), refs_(0), table_(comparator_, &arena_) {}
MemTable::~MemTable() { assert(refs_ == 0); }
size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); }
int MemTable::KeyComparator::operator()(const char* aptr,
const char* bptr) const {
// Internal keys are encoded as length-prefixed strings.
Slice a = GetLengthPrefixedSlice(aptr);
Slice b = GetLengthPrefixedSlice(bptr);
return comparator.Compare(a, b);
}
// Encode a suitable internal key target for "target" and return it.
// Uses *scratch as scratch space, and the returned pointer will point
// into this scratch space.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
scratch->clear();
PutVarint32(scratch, target.size());
scratch->append(target.data(), target.size());
return scratch->data();
}
class MemTableIterator : public Iterator {
public:
explicit MemTableIterator(MemTable::Table* table) : iter_(table) {}
MemTableIterator(const MemTableIterator&) = delete;
MemTableIterator& operator=(const MemTableIterator&) = delete;
~MemTableIterator() override = default;
bool Valid() const override { return iter_.Valid(); }
void Seek(const Slice& k) override { iter_.Seek(EncodeKey(&tmp_, k)); }
void SeekToFirst() override { iter_.SeekToFirst(); }
void SeekToLast() override { iter_.SeekToLast(); }
void Next() override { iter_.Next(); }
void Prev() override { iter_.Prev(); }
Slice key() const override { return GetLengthPrefixedSlice(iter_.key()); }
Slice value() const override {
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
Status status() const override { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
};
Iterator* MemTable::NewIterator() { return new MemTableIterator(&table_); }
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) {
// Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char* buf = arena_.Allocate(encoded_len);
char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size;
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
} // namespace leveldb

+ 87
- 0
db/1.h View File

@ -0,0 +1,87 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_DB_MEMTABLE_H_
#define STORAGE_LEVELDB_DB_MEMTABLE_H_
#include <string>
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "leveldb/db.h"
#include "util/arena.h"
namespace leveldb {
class InternalKeyComparator;
class MemTableIterator;
class MemTable {
public:
// MemTables are reference counted. The initial reference count
// is zero and the caller must call Ref() at least once.
explicit MemTable(const InternalKeyComparator& comparator);
MemTable(const MemTable&) = delete;
MemTable& operator=(const MemTable&) = delete;
// Increase reference count.
void Ref() { ++refs_; }
// Drop reference count. Delete if no more references exist.
void Unref() {
--refs_;
assert(refs_ >= 0);
if (refs_ <= 0) {
delete this;
}
}
// Returns an estimate of the number of bytes of data in use by this
// data structure. It is safe to call when MemTable is being modified.
size_t ApproximateMemoryUsage();
// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
// while the returned iterator is live. The keys returned by this
// iterator are internal keys encoded by AppendInternalKey in the
// db/format.{h,cc} module.
Iterator* NewIterator();
// Add an entry into memtable that maps key to value at the
// specified sequence number and with the specified type.
// Typically value will be empty if type==kTypeDeletion.
void Add(SequenceNumber seq, ValueType type, const Slice& key,
const Slice& value);
// If memtable contains a value for key, store it in *value and return true.
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);
private:
friend class MemTableIterator;
friend class MemTableBackwardIterator;
struct KeyComparator {
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) {}
int operator()(const char* a, const char* b) const;
};
typedef SkipList<const char*, KeyComparator> Table;
~MemTable(); // Private since only Unref() should be used to delete it
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_;
};
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_MEMTABLE_H_

+ 31
- 6
db/db_impl.cc View File

@ -591,7 +591,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
for (int level = 0; level < max_level_with_files; level++) {
for (int level = 0; level <= max_level_with_files; level++) { //心
TEST_CompactRange(level, begin, end);
}
}
@ -949,10 +949,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
// Do not hide error keys
current_user_key.clear();
has_current_user_key = false;
last_sequence_for_key = kMaxSequenceNumber;
} else {
if (!has_current_user_key ||
user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
@ -978,6 +978,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
else{
Slice value = input->value();
std::string combined_str = value.ToString();
// 从右往左提取固定长度的字符串
std::string time_part = combined_str.substr(combined_str.length() - 19, 19);
// 解析时间字符串
std::tm parsed_tm = {};
const char* result = strptime(time_part.c_str(), "%Y-%m-%d %H:%M:%S", &parsed_tm);
// 将解析出的时间转为 time_t
std::time_t parsed_time_t = std::mktime(&parsed_tm);
// 获取当前时间
auto now = std::chrono::system_clock::now();
// 转换为 time_t
std::time_t now_time_t = std::chrono::system_clock::to_time_t(now);
if (parsed_time_t <= now_time_t)
{
drop = true;
std::string s = key.ToString();
}
// 心
}
last_sequence_for_key = ikey.sequence;
}
@ -1500,7 +1525,7 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, ui
WriteBatch batch;
batch.Put(key, value, ttl);
return Write(opt, &batch);
} // 这里应该是新的PUT接口的真正实现的地方,还是由本来的DB类实现,怪?心
} // 这里应该是新的PUT接口的真正实现的地方,还是由本来的DB类实现,怪?心
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;

+ 99
- 31
db/memtable.cc View File

@ -8,6 +8,7 @@
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"
#include "include/leveldb/write_batch.h"
namespace leveldb {
@ -99,40 +100,107 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
table_.Insert(buf);
}
// bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { //燕改
// Slice memkey = key.memtable_key();
// Table::Iterator iter(&table_);
// iter.Seek(memkey.data());
// if (iter.Valid()) {// 获取跳表项的内容
// // entry format is:
// // klength varint32
// // userkey char[klength]
// // tag uint64
// // vlength varint32
// // value char[vlength]
// // Check that it belongs to same user key. We do not check the
// // sequence number since the Seek() call above should have skipped
// // all entries with overly large sequence numbers.
// const char* entry = iter.key();
// uint32_t key_length;
// const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
// if (comparator_.comparator.user_comparator()->Compare(// 比较键值
// Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// // Correct user key
// const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
// switch (static_cast<ValueType>(tag & 0xff)) {
// case kTypeValue: {
// // Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
// // value->assign(v.data(), v.size());
// // return true;
// // 获取过期时间戳,燕改
// Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
// uint64_t expire_time = DecodeFixed64(v.data() + v.size() - sizeof(uint64_t));
// // 检查是否已过期
// uint64_t current_time = Env::Default()->NowMicros() / 1000000; // 当前时间(秒)
// if (expire_time > 0 && expire_time < current_time) {
// *s = Status::NotFound("Key has expired"); // 已过期
// return false;
// }
// // 未过期,返回值
// value->assign(v.data() + 8, v.size() - 8); // 去除前8字节的时间戳
// return true;
// }
// case kTypeDeletion:
// *s = Status::NotFound(Slice());
// return true;
// }
// }
// }
// return false;
// }
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
if (iter.Valid()) { // 获取跳表项的内容
const char* entry = iter.key();
uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
// 获取存储的值和时间戳
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
std::string combined_str(v.data(), v.size());
// 根据存储格式分离原始值和时间戳
std::string actual_value = combined_str.substr(0, combined_str.size() - 20);
std::string time_str = combined_str.substr(combined_str.size() - 19, 19);
// 获取当前时间(字符串格式)
auto now = std::chrono::system_clock::now();
auto now_time_t = std::chrono::system_clock::to_time_t(now);
std::tm* now_tm = std::localtime(&now_time_t);
char buffer[20];
std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", now_tm);
std::string current_time_str(buffer);
// 检查过期
if (time_str <= current_time_str) {
*s = Status::NotFound("Key has expired"); // 已过期
return true;
}
// 未过期,返回实际值
value->assign(actual_value);
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
return false;
}
} // namespace leveldb

+ 1
- 0
db/memtable.h View File

@ -60,6 +60,7 @@ class MemTable {
// If memtable contains a deletion for key, store a NotFound() error
// in *status and return true.
// Else, return false.
std::chrono::system_clock::time_point insert_time; //
bool Get(const LookupKey& key, std::string* value, Status* s);
private:

+ 1
- 1
db/write_batch.cc View File

@ -104,7 +104,7 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
PutLengthPrefixedSlice(&rep_, value);
}
void WriteBatch::Put(const Slice& key, const Slice& value, std::uint64_t ttl) {
void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);

+ 19
- 0
test.cc View File

@ -0,0 +1,19 @@
#include<bits/stdc++.h>
using namespace std;
int main() {
srand(0);
for (int i = 0; i <= 10;i++)
{
int k = rand() % 1000;
printf("%d", k);
}
printf("\n----------------\n");
srand(0);
for (int i = 0; i <= 10;i++)
{
int k = rand() % 1000;
printf("%d", k);
}
return 0;
}

+ 33
- 21
test/ttl_test.cc View File

@ -20,7 +20,7 @@ Status OpenDB(std::string dbName, DB **db) {
void InsertData(DB *db, uint64_t ttl/* second */) {
WriteOptions writeOptions;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
srand(0);
for (int i = 0; i < key_num; i++) {
//int key_ = rand() % key_num+1;
@ -28,6 +28,7 @@ void InsertData(DB *db, uint64_t ttl/* second */) {
std::string key = std::to_string(key_);
std::string value(value_size, 'a');
db->Put(writeOptions, key, value, ttl);
if (i<100) printf("%d ", key_);
}
}
@ -35,8 +36,8 @@ void GetData(DB *db, int size = (1 << 30)) {
ReadOptions readOptions;
int key_num = data_size / value_size;
// 使用随机种子生成随机键进行点查(单次查询),燕
srand(static_cast<unsigned int>(time(0)));
// 点查
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
//int key_ = i % key_num+1;
@ -53,33 +54,44 @@ TEST(TestTTL, ReadTTL) {
abort();
}
uint64_t ttl = 20;
uint64_t ttl = 40;
InsertData(db, ttl);
printf("\n-------------\n");
ReadOptions readOptions;
Status status;
int key_num = data_size / value_size;
srand(static_cast<unsigned int>(time(0)));
for (int i = 0; i < 100; i++) {
//int key_ = rand() % key_num+1;
int key_ = i % key_num+1;
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
ASSERT_TRUE(status.ok());
}
// 等待TTL过期,使插入的数据变为“过期”状态,燕
Env::Default()->SleepForMicroseconds(ttl * 1000000);
srand(0);
for (int i = 0; i < 100; i++) {
int key_ = rand() % key_num+1;
// int key_ = i % key_num+1;
printf("%d ", key_);
std::string key = std::to_string(key_);
std::string value;
status = db->Get(readOptions, key, &value);
ASSERT_FALSE(status.ok());
if (!status.ok())
{
status = db->Get(readOptions, key, &value);
printf("\n%d\n", key_);
}
// ASSERT_TRUE(status.ok());
}
// Env::Default()->SleepForMicroseconds(ttl * 1000000);
// srand(0);
// for (int i = 0; i < 100; i++) {
// int key_ = rand() % key_num+1;
// std::string key = std::to_string(key_);
// std::string value;
// status = db->Get(readOptions, key, &value);
// if (!status.ok())
// {
// status = db->Get(readOptions, key, &value);
// printf("\n%d\n", key_);
// }
// // ASSERT_FALSE(status.ok()); // 经过超长时间之后所有的键值对应该都过期了,心
// }
delete db;
}
TEST(TestTTL, CompactionTTL) {
@ -99,16 +111,16 @@ TEST(TestTTL, CompactionTTL) {
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
Env::Default()->SleepForMicroseconds(100 * 1000000);
db->CompactRange(nullptr, nullptr);
// leveldb::Range ranges[1]; // 这里为什么要重复定义?心
ranges[0] = leveldb::Range("-", "A");
// uint64_t sizes[1]; // 心
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_EQ(sizes[0], 0);
delete db;
}

BIN
test1 View File


Loading…
Cancel
Save