Parcourir la source

merge lzj_version and little modification

main
郭夏辉 il y a 2 semaines
Parent
révision
74c80876cd
17 fichiers modifiés avec 229 ajouts et 126 suppressions
  1. +5
    -1
      README.md
  2. +18
    -6
      db/builder.cc
  3. +112
    -57
      db/db_impl.cc
  4. +8
    -2
      db/db_impl.h
  5. +3
    -6
      db/dbformat.h
  6. +8
    -18
      db/memtable.cc
  7. +9
    -0
      db/version_edit.cc
  8. +19
    -1
      db/version_edit.h
  9. +18
    -3
      db/version_set.cc
  10. +3
    -3
      db/version_set.h
  11. +6
    -1
      include/leveldb/db.h
  12. +3
    -0
      include/leveldb/slice.h
  13. +1
    -0
      include/leveldb/status.h
  14. +1
    -21
      table/block.cc
  15. +1
    -1
      test/simple_test.cc
  16. +7
    -6
      test/ttl_test.cc
  17. +7
    -0
      util/coding.cc

+ 5
- 1
README.md Voir le fichier

@ -2,4 +2,8 @@
LevelDB project 1
**本仓库提供TTL基本的测试用例**
10225501460 林子骥
10211900416 郭夏辉
如果出现open db failed,可能是运行地太慢了,请酌情调高ttl_test.cc中ttl的值。

+ 18
- 6
db/builder.cc Voir le fichier

@ -14,6 +14,16 @@
namespace leveldb {
/**
* description: SSTable
* @param dbname
* @param env
* @param options
* @param table_cache
* @param iter
* @param meta sstable的相关元数据
* @return
*/
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
@ -21,6 +31,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
iter->SeekToFirst();
std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
WritableFile* file;
s = env->NewWritableFile(fname, &file);
@ -30,16 +41,17 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());//这里是internal_key
// auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = tmp_ts;
// meta->newer_ts = tmp_ts;
auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
meta->oldest_ts = tmp_ts;
meta->newer_ts = tmp_ts;
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
// meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 112
- 57
db/db_impl.cc Voir le fichier

@ -14,6 +14,15 @@
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <iomanip>
#include <iostream>
#include <set>
#include <string>
#include <vector>
#include "leveldb/db.h"
#include "leveldb/env.h"
@ -42,6 +51,7 @@ struct DBImpl::Writer {
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
};
@ -51,6 +61,7 @@ struct DBImpl::CompactionState {
uint64_t number;
uint64_t file_size;
InternalKey smallest, largest;
TIMESTAMP old_ts,new_ts;
};
Output* current_output() { return &outputs[outputs.size() - 1]; }
@ -526,11 +537,15 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
const TIMESTAMP new_ts = meta.newer_ts;
const TIMESTAMP old_ts = meta.oldest_ts;
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);// TODO :基于timestamp和size和seek的新的选择规则
}
// edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
// meta.largest);
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
meta.largest,old_ts,new_ts);
}
CompactionStats stats;
@ -619,7 +634,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,
bg_error_.ok()) {
if (manual_compaction_ == nullptr) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
MaybeScheduleCompaction();
} else { // Running either my compaction or another compaction.
background_work_finished_signal_.Wait();
}
@ -734,8 +749,10 @@ void DBImpl::BackgroundCompaction() {
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
// c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
// f->largest);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
f->largest,f->oldest_ts,f->newer_ts);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
@ -747,7 +764,7 @@ void DBImpl::BackgroundCompaction() {
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
status = DoCompactionWork(compact);//
if (!status.ok()) {
RecordBackgroundError(status);
}
@ -809,6 +826,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
out.old_ts = UINT64_MAX;
out.new_ts = 0;
compact->outputs.push_back(out);
mutex_.Unlock();
}
@ -883,8 +902,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
// compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
// out.smallest, out.largest);
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
out.smallest, out.largest,out.old_ts,out.new_ts);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@ -908,13 +929,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
Iterator* input = versions_->MakeInputIterator(compact->compaction);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
TIMESTAMP ts = 0;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
@ -940,7 +961,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
//auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);//debug
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
@ -972,19 +993,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}
Slice value = input->value();
if (value.size() >= sizeof(uint64_t)) {
const char* ptr = value.data();
std::string temp_str(value.data(), value.size());
uint64_t expiration_time = DBImpl::GetTS(&temp_str);
uint64_t current_time = env_->GetCurrentTime();
}else if((ts = DecodeFixed64(input->value().data() + input->value().size() - kTSLength)) < env_->NowMicros()){
if (current_time > expiration_time) {
drop = true;
}else {drop = false;}
}
drop = true;
}
last_sequence_for_key = ikey.sequence;
}
@ -1010,6 +1022,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
assert(ts != 0);
//auto b = compact->current_output()->old_ts;
compact->current_output()->old_ts = std::min(compact->current_output()->old_ts,ts);
compact->current_output()->new_ts = std::max(compact->current_output()->new_ts,ts);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
@ -1156,29 +1172,29 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
stats.now_ts = this->env_->NowMicros();
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (s.ok()) {
// 直接在这里判断是否过期
auto t1 = env_->GetCurrentTime();
auto t2 = GetTS(value);
if(t1 >= t2){
// 过期
s = Status::NotFound("NotFound",Slice());
} else {
// 没过期
*value = value->substr(0, value->size() - sizeof(uint64_t));
}
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
if(s.ok()){
s = CheckIsExpire(value);
}
if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) {
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
// if(!s.ok()){
// return s;
// }
// auto s2 = CheckIsExpire(value);
// if(!s2.ok()){
// current->UpdateStats(stats);
// }
return s;
}
@ -1215,10 +1231,23 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
return DB::Put(o, key, val);
}
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
// 扔掉了很复杂的逻辑
return DB::Put(options, key, value, ttl);
std::string val_with_ts;
val_with_ts.reserve(value.size() + kTSLength);
char ts_string[kTSLength];
TIMESTAMP expiration_time = this->env_->NowMicros() + ttl * 1000000;
EncodeFixed64(ts_string,expiration_time);
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP ));
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(ts_string,kTSLength);
return DB::Put(options, key, Slice(val_with_ts));
}
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
@ -1503,6 +1532,14 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
}
v->Unref();
}
/**
*
* @param val
* @param val_with_ts val后面连接上预计超时的timestamp
* @param ttl
*/
void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
val_with_ts->reserve(kTSLength + val.size());
char ts_string[kTSLength];
@ -1517,41 +1554,51 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
* @return timestamp in val,and remove timestamp from val
*/
uint64_t DBImpl::GetTS(std::string* val) {
// 不用auto再写一下 老逻辑:
// uint64_t expiration_time;
// memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t));
// return expiration_time;
// 新逻辑:
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength);
val->resize(val->size() - kTSLength);
return expiration_time;
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength);
val->resize(val->size() - kTSLength);
return expiration_time;
}
Status DBImpl::CheckIsExpire(std::string* value) {
//debug 用
auto a = env_->NowMicros();
auto b = GetTS(value);
if(a > b){
return Status::Expire("Expire",Slice());
}
return Status();
Status DB::Put(const WriteOptions& options, const Slice& key,
}
/**
*
* @param options
* @param key
* @param value
* @param ttl
* @return
*/
Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
// 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value
std::string val_with_ts;
val_with_ts.reserve(value.size() + kTSLength);
val_with_ts.reserve(value.size() + sizeof(uint64_t));
uint64_t expiration_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() + ttl * 1000;
char ts_string[kTSLength];
EncodeFixed64(ts_string, expiration_time);
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
// val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
val_with_ts.append(ts_string, kTSLength);
val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
// std::cout<<"PUT"<<std::endl;
// std::cout << "timestamp: " << expiration_time << std::endl;
//"a\323='\277\222\001\000"
@ -1560,6 +1607,14 @@ Status DB::Put(const WriteOptions& options, const Slice& key,
return Write(options, &batch);
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);

+ 8
- 2
db/db_impl.h Voir le fichier

@ -36,8 +36,11 @@ class DBImpl : public DB {
~DBImpl() override;
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,uint64_t ttl) override;
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
Status Put(const WriteOptions& options, const Slice& key, const Slice& value,
uint64_t ttl) override;
Status Delete(const WriteOptions&, const Slice& key) override;
Status Write(const WriteOptions& options, WriteBatch* updates) override;
Status Get(const ReadOptions& options, const Slice& key,
@ -71,6 +74,8 @@ class DBImpl : public DB {
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes.
void RecordReadSample(Slice key);
// Status Write(const WriteOptions& options, WriteBatch* updates,
// uint64_t ttl) override;
private:
friend class DB;
@ -204,6 +209,7 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
Status CheckIsExpire(std::string* value);
};
// Sanitize db options. The caller should delete result.info_log if

+ 3
- 6
db/dbformat.h Voir le fichier

@ -25,16 +25,13 @@ namespace config {
static const int kNumLevels = 7;
// Level-0 compaction is started when we hit this many files.
// 4
static const int kL0_CompactionTrigger = 64;
static const int kL0_CompactionTrigger = 4;
// Soft limit on number of level-0 files. We slow down writes at this point.
// 8
static const int kL0_SlowdownWritesTrigger = 64;
static const int kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files. We stop writes at this point.
// 12
static const int kL0_StopWritesTrigger = 64;
static const int kL0_StopWritesTrigger = 12;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the

+ 8
- 18
db/memtable.cc Voir le fichier

@ -8,7 +8,6 @@
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"
#include "db/db_impl.h"
namespace leveldb {
@ -82,6 +81,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()],[real_value,timestamp(uint64)]
//
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
@ -100,6 +100,13 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
table_.Insert(buf);
}
/**
* @descrption: ttl问题场景keyttl的相对大小于版本号的相对大小不同
* @param key
* @param value
* @param s
* @return
*/
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
@ -125,23 +132,6 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
// 这里新加了一些逻辑
if (value->size() >= sizeof(uint64_t)){
const char* ptr = value->data();
std::string temp_str(value->data(), value->size());
uint64_t expiration_time = DBImpl::GetTS(&temp_str);
uint64_t current_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (current_time > expiration_time) {
*s = Status::NotFound("NotFound",Slice());
}else {
*value = value->substr(0, value->size() - sizeof(uint64_t));
}
}
return true;
}
case kTypeDeletion:

+ 9
- 0
db/version_edit.cc Voir le fichier

@ -81,6 +81,8 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutFixed64(dst,f.oldest_ts);
PutFixed64(dst,f.newer_ts);
}
}
@ -180,6 +182,9 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
GetVarint64(&input, &f.file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) {
f.newer_ts = DecodeFixed64(input.data());
f.oldest_ts = DecodeFixed64(input.data() + kTSLength);
input.remove_prefix(2 * kTSLength);
new_files_.push_back(std::make_pair(level, f));
} else {
msg = "new-file entry";
@ -250,6 +255,10 @@ std::string VersionEdit::DebugString() const {
r.append(f.smallest.DebugString());
r.append(" .. ");
r.append(f.largest.DebugString());
r.append(" ");
AppendNumberTo(&r,f.oldest_ts);
r.append("..");
AppendNumberTo(&r,f.newer_ts);
}
r.append("\n}\n");
return r;

+ 19
- 1
db/version_edit.h Voir le fichier

@ -16,7 +16,7 @@ namespace leveldb {
class VersionSet;
struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),oldest_ts(UINT64_MAX),newer_ts(0) {}
int refs;
int allowed_seeks; // Seeks allowed until compaction
@ -24,6 +24,10 @@ struct FileMetaData {
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
TIMESTAMP oldest_ts;
TIMESTAMP newer_ts;
};
class VersionEdit {
@ -69,6 +73,20 @@ class VersionEdit {
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest,const TIMESTAMP old_ts,const TIMESTAMP new_ts) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
f.newer_ts = new_ts;
f.oldest_ts = old_ts;
new_files_.push_back(std::make_pair(level, f));
}
// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {

+ 18
- 3
db/version_set.cc Voir le fichier

@ -351,7 +351,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->last_file_read = f;
state->last_file_read_level = level;
// if(state->stats->now_ts > f->newer_ts)return false;
if(state->stats->now_ts > f->newer_ts)return false;
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
@ -413,6 +413,20 @@ bool Version::UpdateStats(const GetStats& stats) {
return false;
}
bool Version::UpdateStats(const GetStats& stats,bool is_expire) {
FileMetaData* f = stats.seek_file;
if (f != nullptr) {
f->allowed_seeks--;
if(is_expire)f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
bool Version::RecordReadSample(Slice internal_key) {
ParsedInternalKey ikey;
if (!ParseInternalKey(internal_key, &ikey)) {
@ -821,7 +835,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);// TODO:修改
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
@ -1088,7 +1102,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
//edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,f->oldest_ts,f->newer_ts);
}
}

+ 3
- 3
db/version_set.h Voir le fichier

@ -80,8 +80,8 @@ class Version {
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
// Record a sample of bytes read at the specified internal key.
bool UpdateStats(const GetStats& stats,bool is_expire);
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
@ -313,7 +313,7 @@ class VersionSet {
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
std::string compact_pointer_[config::kNumLevels];//
};
// A Compaction encapsulates information about a compaction.

+ 6
- 1
include/leveldb/db.h Voir le fichier

@ -148,7 +148,12 @@ class LEVELDB_EXPORT DB {
// ----------------------------For TTL-----------------------------
// key设置ttl
virtual Status Put(const WriteOptions& options, const Slice& key,const Slice& value, uint64_t ttl) = 0;
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) = 0;
// virtual Status Write(const WriteOptions& options, WriteBatch* updates,uint64_t ttl) = 0;
// virtual void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) = 0;
// virtual void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) = 0;
};
// Destroy the contents of the specified database.

+ 3
- 0
include/leveldb/slice.h Voir le fichier

@ -38,6 +38,7 @@ class LEVELDB_EXPORT Slice {
// Create a slice that refers to s[0,strlen(s)-1]
Slice(const char* s) : data_(s), size_(strlen(s)) {}
// Intentionally copyable.
Slice(const Slice&) = default;
Slice& operator=(const Slice&) = default;
@ -48,6 +49,7 @@ class LEVELDB_EXPORT Slice {
// Return the length (in bytes) of the referenced data
size_t size() const { return size_; }
// Return true iff the length of the referenced data is zero
bool empty() const { return size_ == 0; }
@ -74,6 +76,7 @@ class LEVELDB_EXPORT Slice {
size_ -= n;
}
// Return a string that contains the copy of the referenced data.
std::string ToString() const { return std::string(data_, size_); }

+ 1
- 0
include/leveldb/status.h Voir le fichier

@ -74,6 +74,7 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsExpire()const{return code() == kExpire;}
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;

+ 1
- 21
table/block.cc Voir le fichier

@ -220,27 +220,7 @@ class Block::Iter : public Iterator {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) { // 找到了key_
// 这里新加了一些逻辑
std::string value(value_.data(), value_.size());
if (value.size() >= sizeof(uint64_t)){
const char* ptr = value.data();
std::string temp_str(value.data(), value.size());
uint64_t expiration_time = DecodeFixed64((&temp_str)->data() + (&temp_str)->size() - sizeof(uint64_t));
(&temp_str)->resize((&temp_str)->size() - sizeof(uint64_t));
uint64_t current_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (current_time > expiration_time) {
status_ = Status::NotFound("NotFound",Slice());
}else {
value_ = value.substr(0, value.size() - sizeof(uint64_t));
}
}
if (Compare(key_, target) >= 0) {
return;
}
}

+ 1
- 1
test/simple_test.cc Voir le fichier

@ -113,7 +113,7 @@ int main() {
std::string key = std::to_string(key_);
std::string value(1, 'a');
db->Put(writeOptions, key, value, ttl);
std::cout << "time to alive" << ttl << std::endl;
//std::cout << "time to alive" << ttl << std::endl;
ReadOptions readOptions;
std::string value_read;

+ 7
- 6
test/ttl_test.cc Voir le fichier

@ -39,13 +39,14 @@ void GetData(DB *db, int size = (1 << 30)) {
db->Get(readOptions, key, &value);
}
}
//
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
// 如果出现open db fail,请酌情提高这里
uint64_t ttl = 20;
InsertData(db, ttl);
@ -72,11 +73,12 @@ TEST(TestTTL, ReadTTL) {
ASSERT_FALSE(status.ok());
}
delete db;
Env::Default()->SleepForMicroseconds( 1000);
}
TEST(TestTTL, CompactionTTL) {
DB *db;
DestroyDB("testdb", Options());
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
@ -95,12 +97,11 @@ TEST(TestTTL, CompactionTTL) {
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges1[1];
ranges[0] = leveldb::Range("-", "A");
leveldb::Range ranges2[1];
ranges2[0] = leveldb::Range("-", "A");
uint64_t sizes2[1];
db->GetApproximateSizes(ranges1, 1, sizes2);
db->GetApproximateSizes(ranges2, 1, sizes2);
ASSERT_EQ(sizes2[0], 0);
delete db;
}

+ 7
- 0
util/coding.cc Voir le fichier

@ -69,6 +69,7 @@ void PutVarint64(std::string* dst, uint64_t v) {
dst->append(buf, ptr - buf);
}
void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
PutVarint32(dst, value.size());
dst->append(value.data(), value.size());
@ -142,6 +143,12 @@ bool GetVarint64(Slice* input, uint64_t* value) {
}
}
//bool GetFixed64(const char* ptr,uint64_t* value){
// *value = DecodeFixed64(ptr);
// return
//}
bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
uint32_t len;
if (GetVarint32(input, &len) && input->size() >= len) {

Chargement…
Annuler
Enregistrer