Kaynağa Gözat

just modify db_impl.cc but 2 test pass.

main
郭夏辉 3 hafta önce
ebeveyn
işleme
3d1080b2e8
7 değiştirilmiş dosya ile 58 ekleme ve 156 silme
  1. +40
    -120
      db/db_impl.cc
  2. +3
    -9
      db/db_impl.h
  3. +6
    -3
      db/dbformat.h
  4. +2
    -9
      db/memtable.cc
  5. +1
    -6
      include/leveldb/db.h
  6. +0
    -3
      include/leveldb/slice.h
  7. +6
    -6
      test/ttl_test.cc

+ 40
- 120
db/db_impl.cc Dosyayı Görüntüle

@ -14,15 +14,6 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iomanip>
#include <iostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
@ -51,7 +42,6 @@ struct DBImpl::Writer {
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};
@ -983,6 +973,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
drop = true;
}
Slice value = input->value();
if (value.size() >= sizeof(uint64_t)) {
const char* ptr = value.data();
std::string temp_str(value.data(), value.size());
uint64_t expiration_time = DBImpl::GetTS(&temp_str);
uint64_t current_time = env_->GetCurrentTime();
if (current_time > expiration_time) {
drop = true;
}else {drop = false;}
}
last_sequence_for_key = ikey.sequence;
}
#if 0
@ -1158,18 +1160,25 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
}
mutex_.Lock();
}
if (s.ok()) {
// 直接在这里判断是否过期
auto t1 = env_->GetCurrentTime();
auto t2 = GetTS(value);
if(t1 >= t2){
// 过期
s = Status::Expire("Expire",Slice());
} else {
// 没过期
*value = value->substr(0, value->size() - sizeof(uint64_t));
}
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
if(!s.ok()){
return s;
}
auto s2 = CheckIsExpire(value);
return s2;
return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@ -1205,44 +1214,10 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
//rocksdb的实现
// Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
// SystemClock* clock) {
// val_with_ts->reserve(kTSLength + val.size());
// char ts_string[kTSLength];
// int64_t curtime;
// Status st = clock->GetCurrentTime(&curtime);
// if (!st.ok()) {
// return st;
// }
// EncodeFixed32(ts_string, (int32_t)curtime);
// val_with_ts->append(val.data(), val.size());
// val_with_ts->append(ts_string, kTSLength);
// return st;
// }
std::string val_with_ts;
val_with_ts.reserve(value.size() + kTSLength);
char ts_string[kTSLength];
TIMESTAMP expiration_time = this->env_->GetCurrentTime() + ttl * 1000;
EncodeFixed64(ts_string,expiration_time);
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP ));
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(ts_string,kTSLength);
// std::cout << "val_with_ts in hex: ";
// for (unsigned char c : val_with_ts) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
return DB::Put(options, key, Slice(val_with_ts));
// 扔掉了很复杂的逻辑
return DB::Put(options, key, value, ttl);
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
@ -1527,14 +1502,6 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
}
v->Unref();
}
/**
*
* @param val
* @param val_with_ts val后面连接上预计超时的timestamp
* @param ttl
*/
void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
val_with_ts->reserve(kTSLength + val.size());
char ts_string[kTSLength];
@ -1548,71 +1515,32 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
* @param val
* @return timestamp in val,and remove timestamp from val
*/
uint64_t DBImpl::GetTS(std::string* val) {
//uint64_t expiration_time;
// 输出 val 的十六进制表示
// std::cout << "befor decode,val in hex: ";
// for (unsigned char c : *val) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength);
//memcpy(&expiration_time, val->data() + val->size() - sizeof(TIMESTAMP), sizeof(TIMESTAMP));
val->resize(val->size() - kTSLength);
// std::cout << "after decode,val in hex: ";
// for (unsigned char c : *val) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
uint64_t DBImpl::GetTS(const std::string* val) {
// 不用auto再写一下
uint64_t expiration_time;
memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t));
return expiration_time;
// Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
// if (pinnable_val->size() < kTSLength) {
// return Status::Corruption("Bad timestamp in key-value");
// }
// // Erasing characters which hold the TS
// pinnable_val->remove_suffix(kTSLength);
// return Status::OK();
// }
}
Status DBImpl::CheckIsExpire(std::string* value) {
//debug 用
auto a = env_->GetCurrentTime();
auto b = GetTS(value);
// std::cout<<"get current time"<<a<<std::endl;
// std::cout << "get ts from val"<<b<<std::endl;
if(a > b){
return Status::Expire("Expire",Slice());
}
return Status();
// if(env_->GetCurrentTime() > GetTS(value)){
// return Status::Expire("Expire",Slice());
// }
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
/**
*
* @param options
* @param key
* @param value
* @param ttl
* @return
*/
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
// 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value
std::string val_with_ts;
val_with_ts.reserve(value.size() + sizeof(uint64_t));
uint64_t expiration_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() + ttl * 1000;
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
// std::cout<<"PUT"<<std::endl;
@ -1623,14 +1551,6 @@ Status DB::Put(const WriteOptions& options, const Slice& key,
return Write(options, &batch);
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);

+ 3
- 9
db/db_impl.h Dosyayı Görüntüle

@ -36,11 +36,8 @@ class DBImpl : public DB {
~DBImpl() override;
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Put(const WriteOptions&, const Slice& key,const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
@ -52,7 +49,7 @@ class DBImpl : public DB {
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl);
static uint64_t GetTS(std::string* val);
static uint64_t GetTS(const std::string* val);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]
@ -74,8 +71,6 @@ class DBImpl : public DB {
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);
// Status Write(const WriteOptions& options, WriteBatch* updates,
// uint64_t ttl) override;
private:
friend class DB;
@ -209,7 +204,6 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
Status CheckIsExpire(std::string* value);
};
// Sanitize db options. The caller should delete result.info_log if

+ 6
- 3
db/dbformat.h Dosyayı Görüntüle

@ -25,13 +25,16 @@ namespace config {
static const int kNumLevels = 7;
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
// 4
static const int kL0_CompactionTrigger = 128;
// Soft limit on number of level-0 files. We slow down writes at this point.
static const int kL0_SlowdownWritesTrigger = 8;
// 8
static const int kL0_SlowdownWritesTrigger = 128;
// Maximum number of level-0 files. We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;
// 12
static const int kL0_StopWritesTrigger = 128;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the

+ 2
- 9
db/memtable.cc Dosyayı Görüntüle

@ -8,6 +8,7 @@
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"
#include "db/db_impl.h"
namespace leveldb {
@ -81,7 +82,6 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
// timestamp : uint64
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
@ -95,18 +95,11 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);//value包含timestamp
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
/**
* @descrption: ttl问题场景keyttl的相对大小于版本号的相对大小不同
* @param key
* @param value
* @param s
* @return
*/
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);

+ 1
- 6
include/leveldb/db.h Dosyayı Görüntüle

@ -148,12 +148,7 @@ class LEVELDB_EXPORT DB {
// ----------------------------For TTL-----------------------------
// key设置ttl
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) = 0;
// virtual Status Write(const WriteOptions& options, WriteBatch* updates,uint64_t ttl) = 0;
// virtual void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) = 0;
// virtual void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) = 0;
virtual Status Put(const WriteOptions& options, const Slice& key,const Slice& value, uint64_t ttl) = 0;
};
// Destroy the contents of the specified database.

+ 0
- 3
include/leveldb/slice.h Dosyayı Görüntüle

@ -38,7 +38,6 @@ class LEVELDB_EXPORT Slice {
// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) {}
// Intentionally copyable.
Slice(const Slice&) = default;
Slice& operator=(const Slice&) = default;
@ -49,7 +48,6 @@ class LEVELDB_EXPORT Slice {
// Return the length (in bytes) of the referenced data
size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; }
@ -76,7 +74,6 @@ class LEVELDB_EXPORT Slice {
size_ -= n;
}
// Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); }

+ 6
- 6
test/ttl_test.cc Dosyayı Görüntüle

@ -72,12 +72,11 @@ TEST(TestTTL, ReadTTL) {
ASSERT_FALSE(status.ok());
}
delete db;
Env::Default()->SleepForMicroseconds( 1000);
}
TEST(TestTTL, CompactionTTL) {
DestroyDB("testdb", Options());
DB *db;
DestroyDB("testdb", Options());
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
@ -96,11 +95,12 @@ TEST(TestTTL, CompactionTTL) {
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges2[1];
ranges2[0] = leveldb::Range("-", "A");
leveldb::Range ranges1[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes2[1];
db->GetApproximateSizes(ranges2, 1, sizes2);
ASSERT_EQ(sizes[0], 0);
db->GetApproximateSizes(ranges1, 1, sizes2);
ASSERT_EQ(sizes2[0], 0);
delete db;
}

Yükleniyor…
İptal
Kaydet