Compare commits

...

5 Комити

14 измењених фајлова са 185 додато и 125 уклоњено
  1. +19
    -1
      db/builder.cc
  2. +49
    -65
      db/db_impl.cc
  3. +3
    -3
      db/memtable.cc
  4. +9
    -0
      db/version_edit.cc
  5. +19
    -1
      db/version_edit.h
  6. +25
    -9
      db/version_set.cc
  7. +4
    -3
      db/version_set.h
  8. +1
    -1
      db/write_batch.cc
  9. +1
    -0
      include/leveldb/status.h
  10. +7
    -1
      table/table_builder.cc
  11. +1
    -1
      test/simple_test.cc
  12. +2
    -2
      test/ttl_test.cc
  13. +7
    -0
      util/coding.cc
  14. +38
    -38
      util/comparator.cc

+ 19
- 1
db/builder.cc Прегледај датотеку

@ -14,6 +14,16 @@
namespace leveldb {
/**
* description: SSTable
* @param dbname
* @param env
* @param options
* @param table_cache
* @param iter
* @param meta sstable的相关元数据
* @return
*/
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
Status s;
@ -21,6 +31,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
iter->SeekToFirst();
std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
WritableFile* file;
s = env->NewWritableFile(fname, &file);
@ -29,11 +40,18 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
}
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());
meta->smallest.DecodeFrom(iter->key());//这里是internal_key
auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
meta->oldest_ts = tmp_ts;
meta->newer_ts = tmp_ts;
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 49
- 65
db/db_impl.cc Прегледај датотеку

@ -61,6 +61,7 @@ struct DBImpl::CompactionState {
uint64_t number;
uint64_t file_size;
InternalKey smallest, largest;
TIMESTAMP old_ts,new_ts;
};
Output* current_output() { return &outputs[outputs.size() - 1]; }
@ -520,7 +521,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//meta包含largest_key,此时未刷盘meta,但meta.number对应生成的file会刷盘
mutex_.Lock();
}
@ -536,11 +537,15 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
const TIMESTAMP new_ts = meta.newer_ts;
const TIMESTAMP old_ts = meta.oldest_ts;
if (base != nullptr) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);// TODO :基于timestamp和size和seek的新的选择规则
}
// edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
// meta.largest);
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
meta.largest,old_ts,new_ts);
}
CompactionStats stats;
@ -598,6 +603,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
TEST_CompactRange(max_level_with_files, begin, end);
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
@ -743,8 +749,10 @@ void DBImpl::BackgroundCompaction() {
assert(c->num_input_files(0) == 1);
FileMetaData* f = c->input(0, 0);
c->edit()->RemoveFile(c->level(), f->number);
// c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
// f->largest);
c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
f->largest);
f->largest,f->oldest_ts,f->newer_ts);
status = versions_->LogAndApply(c->edit(), &mutex_);
if (!status.ok()) {
RecordBackgroundError(status);
@ -756,7 +764,7 @@ void DBImpl::BackgroundCompaction() {
status.ToString().c_str(), versions_->LevelSummary(&tmp));
} else {
CompactionState* compact = new CompactionState(c);
status = DoCompactionWork(compact);
status = DoCompactionWork(compact);//
if (!status.ok()) {
RecordBackgroundError(status);
}
@ -818,6 +826,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
out.number = file_number;
out.smallest.Clear();
out.largest.Clear();
out.old_ts = UINT64_MAX;
out.new_ts = 0;
compact->outputs.push_back(out);
mutex_.Unlock();
}
@ -892,8 +902,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
const int level = compact->compaction->level();
for (size_t i = 0; i < compact->outputs.size(); i++) {
const CompactionState::Output& out = compact->outputs[i];
// compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
// out.smallest, out.largest);
compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
out.smallest, out.largest);
out.smallest, out.largest,out.old_ts,out.new_ts);
}
return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
}
@ -917,13 +929,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}
Iterator* input = versions_->MakeInputIterator(compact->compaction);
// Release mutex while we're actually doing the compaction work
mutex_.Unlock();
input->SeekToFirst();
Status status;
ParsedInternalKey ikey;
TIMESTAMP ts = 0;
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
@ -949,7 +961,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
break;
}
}
//auto a = DecodeFixed64(input->value().data() + input->value().size() - kTSLength);//debug
// Handle key/value, add to state, etc.
bool drop = false;
if (!ParseInternalKey(key, &ikey)) {
@ -981,6 +993,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
drop = true;
}else if((ts = DecodeFixed64(input->value().data() + input->value().size() - kTSLength)) < env_->NowMicros()){
drop = true;
}
last_sequence_for_key = ikey.sequence;
@ -1007,6 +1022,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
assert(ts != 0);
//auto b = compact->current_output()->old_ts;
compact->current_output()->old_ts = std::min(compact->current_output()->old_ts,ts);
compact->current_output()->new_ts = std::max(compact->current_output()->new_ts,ts);
compact->builder->Add(key, input->value());
// Close output file if it is big enough
@ -1153,23 +1172,30 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
stats.now_ts = this->env_->NowMicros();
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
if(s.ok()){
s = CheckIsExpire(value);
}
if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) {
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
if(!s.ok()){
return s;
}
auto s2 = CheckIsExpire(value);
return s2;
// if(!s.ok()){
// return s;
// }
// auto s2 = CheckIsExpire(value);
// if(!s2.ok()){
// current->UpdateStats(stats);
// }
return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@ -1209,26 +1235,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
//rocksdb的实现
// Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
// SystemClock* clock) {
// val_with_ts->reserve(kTSLength + val.size());
// char ts_string[kTSLength];
// int64_t curtime;
// Status st = clock->GetCurrentTime(&curtime);
// if (!st.ok()) {
// return st;
// }
// EncodeFixed32(ts_string, (int32_t)curtime);
// val_with_ts->append(val.data(), val.size());
// val_with_ts->append(ts_string, kTSLength);
// return st;
// }
std::string val_with_ts;
val_with_ts.reserve(value.size() + kTSLength);
char ts_string[kTSLength];
TIMESTAMP expiration_time = this->env_->GetCurrentTime() + ttl * 1000;
TIMESTAMP expiration_time = this->env_->NowMicros() + ttl * 1000000;
EncodeFixed64(ts_string,expiration_time);
//assert(sizeof(expiration_time) == sizeof(TIMESTAMP ));
// 追加原始 value 到 val_with_ts
@ -1236,12 +1247,6 @@ Status DBImpl::Put(const WriteOptions& options, const Slice& key,
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(ts_string,kTSLength);
// std::cout << "val_with_ts in hex: ";
// for (unsigned char c : val_with_ts) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
return DB::Put(options, key, Slice(val_with_ts));
}
@ -1549,45 +1554,24 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
* @return timestamp in val,and remove timestamp from val
*/
uint64_t DBImpl::GetTS(std::string* val) {
//uint64_t expiration_time;
// 输出 val 的十六进制表示
// std::cout << "befor decode,val in hex: ";
// for (unsigned char c : *val) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength);
//memcpy(&expiration_time, val->data() + val->size() - sizeof(TIMESTAMP), sizeof(TIMESTAMP));
val->resize(val->size() - kTSLength);
// std::cout << "after decode,val in hex: ";
// for (unsigned char c : *val) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
return expiration_time;
// Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
// if (pinnable_val->size() < kTSLength) {
// return Status::Corruption("Bad timestamp in key-value");
// }
// // Erasing characters which hold the TS
// pinnable_val->remove_suffix(kTSLength);
// return Status::OK();
// }
}
Status DBImpl::CheckIsExpire(std::string* value) {
//debug 用
auto a = env_->GetCurrentTime();
auto a = env_->NowMicros();
auto b = GetTS(value);
// std::cout<<"get current time"<<a<<std::endl;
// std::cout << "get ts from val"<<b<<std::endl;
if(a > b){
return Status::Expire("Expire",Slice());
}
return Status();
// if(env_->GetCurrentTime() > GetTS(value)){
// return Status::Expire("Expire",Slice());
// }
}
/**

+ 3
- 3
db/memtable.cc Прегледај датотеку

@ -80,8 +80,8 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
// timestamp : uint64
// value bytes : char[value.size()],[real_value,timestamp(uint64)]
//
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
@ -95,7 +95,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);//value包含timestamp
std::memcpy(p, value.data(), val_size);//value包含timestamp,timestamp,8字节
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}

+ 9
- 0
db/version_edit.cc Прегледај датотеку

@ -81,6 +81,8 @@ void VersionEdit::EncodeTo(std::string* dst) const {
PutVarint64(dst, f.file_size);
PutLengthPrefixedSlice(dst, f.smallest.Encode());
PutLengthPrefixedSlice(dst, f.largest.Encode());
PutFixed64(dst,f.oldest_ts);
PutFixed64(dst,f.newer_ts);
}
}
@ -180,6 +182,9 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
GetVarint64(&input, &f.file_size) &&
GetInternalKey(&input, &f.smallest) &&
GetInternalKey(&input, &f.largest)) {
f.newer_ts = DecodeFixed64(input.data());
f.oldest_ts = DecodeFixed64(input.data() + kTSLength);
input.remove_prefix(2 * kTSLength);
new_files_.push_back(std::make_pair(level, f));
} else {
msg = "new-file entry";
@ -250,6 +255,10 @@ std::string VersionEdit::DebugString() const {
r.append(f.smallest.DebugString());
r.append(" .. ");
r.append(f.largest.DebugString());
r.append(" ");
AppendNumberTo(&r,f.oldest_ts);
r.append("..");
AppendNumberTo(&r,f.newer_ts);
}
r.append("\n}\n");
return r;

+ 19
- 1
db/version_edit.h Прегледај датотеку

@ -16,7 +16,7 @@ namespace leveldb {
class VersionSet;
struct FileMetaData {
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {}
FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),oldest_ts(UINT64_MAX),newer_ts(0) {}
int refs;
int allowed_seeks; // Seeks allowed until compaction
@ -24,6 +24,10 @@ struct FileMetaData {
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
TIMESTAMP oldest_ts;
TIMESTAMP newer_ts;
};
class VersionEdit {
@ -69,6 +73,20 @@ class VersionEdit {
f.largest = largest;
new_files_.push_back(std::make_pair(level, f));
}
void AddFile(int level, uint64_t file, uint64_t file_size,
const InternalKey& smallest, const InternalKey& largest,const TIMESTAMP old_ts,const TIMESTAMP new_ts) {
FileMetaData f;
f.number = file;
f.file_size = file_size;
f.smallest = smallest;
f.largest = largest;
f.newer_ts = new_ts;
f.oldest_ts = old_ts;
new_files_.push_back(std::make_pair(level, f));
}
// Delete the specified "file" from the specified "level".
void RemoveFile(int level, uint64_t file) {

+ 25
- 9
db/version_set.cc Прегледај датотеку

@ -138,7 +138,7 @@ bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
return false;
}
// Binary search over file list
// Binary search over file list, 保证SSTable有序
uint32_t index = 0;
if (smallest_user_key != nullptr) {
// Find the earliest possible internal key for smallest_user_key
@ -351,6 +351,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->last_file_read = f;
state->last_file_read_level = level;
if(state->stats->now_ts > f->newer_ts)return false;
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
@ -412,6 +413,20 @@ bool Version::UpdateStats(const GetStats& stats) {
return false;
}
bool Version::UpdateStats(const GetStats& stats,bool is_expire) {
FileMetaData* f = stats.seek_file;
if (f != nullptr) {
f->allowed_seeks--;
if(is_expire)f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
bool Version::RecordReadSample(Slice internal_key) {
ParsedInternalKey ikey;
if (!ParseInternalKey(internal_key, &ikey)) {
@ -460,7 +475,7 @@ void Version::Unref() {
delete this;
}
}
//files_[level] 中存在至少一个文件的键范围与 [smallest_user_key, largest_user_key] 有重叠,那么 OverlapInLevel 就会返回 true。
bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
@ -470,7 +485,7 @@ bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {//当与level0没有重叠时,直接选择压入其他层
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
@ -664,7 +679,7 @@ class VersionSet::Builder {
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
levels_[level].added_files->insert(f);//按照最小key排序
}
}
@ -795,7 +810,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
Finalize(v);//对于每层compact打分
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
@ -1087,7 +1102,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
//edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest);
edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest,f->oldest_ts,f->newer_ts);
}
}
@ -1386,7 +1402,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);//增加边界值,userkey相等的情况
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
@ -1444,11 +1460,11 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}
//得到基于当前level的所涉及的需要compact的文件,(level0可能会涉及到level1等)
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
const InternalKey* end) {
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs);
current_->GetOverlappingInputs(level, begin, end, &inputs);//得到一层的input
if (inputs.empty()) {
return nullptr;
}

+ 4
- 3
db/version_set.h Прегледај датотеку

@ -62,6 +62,7 @@ class Version {
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
TIMESTAMP now_ts;
};
// Append to *iters a sequence of iterators that will
@ -79,8 +80,8 @@ class Version {
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
// Record a sample of bytes read at the specified internal key.
bool UpdateStats(const GetStats& stats,bool is_expire);
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
@ -312,7 +313,7 @@ class VersionSet {
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
std::string compact_pointer_[config::kNumLevels];//
};
// A Compaction encapsulates information about a compaction.

+ 1
- 1
db/write_batch.cc Прегледај датотеку

@ -99,7 +99,7 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
PutLengthPrefixedSlice(&rep_, value);//默认value的最大字节长度不会超过变长32位的表示。
}
void WriteBatch::Delete(const Slice& key) {

+ 1
- 0
include/leveldb/status.h Прегледај датотеку

@ -74,6 +74,7 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsExpire()const{return code() == kExpire;}
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;

+ 7
- 1
table/table_builder.cc Прегледај датотеку

@ -31,7 +31,8 @@ struct TableBuilder::Rep {
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
pending_index_entry(false),
has_ttl_filter_(false){
index_block_options.block_restart_interval = 1;
}
@ -57,6 +58,7 @@ struct TableBuilder::Rep {
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
bool has_ttl_filter_;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
@ -110,8 +112,12 @@ void TableBuilder::Add(const Slice& key, const Slice& value) {
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
// if(r->has_ttl_filter_){
//
// }
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);

+ 1
- 1
test/simple_test.cc Прегледај датотеку

@ -113,7 +113,7 @@ int main() {
std::string key = std::to_string(key_);
std::string value(1, 'a');
db->Put(writeOptions, key, value, ttl);
std::cout << "time to alive" << ttl << std::endl;
//std::cout << "time to alive" << ttl << std::endl;
ReadOptions readOptions;
std::string value_read;

+ 2
- 2
test/ttl_test.cc Прегледај датотеку

@ -39,7 +39,7 @@ void GetData(DB *db, int size = (1 << 30)) {
db->Get(readOptions, key, &value);
}
}
//
TEST(TestTTL, ReadTTL) {
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
@ -100,7 +100,7 @@ TEST(TestTTL, CompactionTTL) {
ranges2[0] = leveldb::Range("-", "A");
uint64_t sizes2[1];
db->GetApproximateSizes(ranges2, 1, sizes2);
ASSERT_EQ(sizes[0], 0);
ASSERT_EQ(sizes2[0], 0);
}

+ 7
- 0
util/coding.cc Прегледај датотеку

@ -69,6 +69,7 @@ void PutVarint64(std::string* dst, uint64_t v) {
dst->append(buf, ptr - buf);
}
void PutLengthPrefixedSlice(std::string* dst, const Slice& value) {
PutVarint32(dst, value.size());
dst->append(value.data(), value.size());
@ -142,6 +143,12 @@ bool GetVarint64(Slice* input, uint64_t* value) {
}
}
//bool GetFixed64(const char* ptr,uint64_t* value){
// *value = DecodeFixed64(ptr);
// return
//}
bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
uint32_t len;
if (GetVarint32(input, &len) && input->size() >= len) {

+ 38
- 38
util/comparator.cc Прегледај датотеку

@ -18,53 +18,53 @@ namespace leveldb {
Comparator::~Comparator() = default;
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;
const char* Name() const override { return "leveldb.BytewiseComparator"; }
const char* Name() const override { return "leveldb.BytewiseComparator"; }
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
}
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
}
//最终生成的 start 将是一个有效的、更短的键,仍然在 limit 范围内;能够这么运行的前提:数据按key有序输入
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
}
void FindShortSuccessor(std::string* key) const override {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
void FindShortSuccessor(std::string* key) const override {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
}
}
// *key is a run of 0xffs. Leave it alone.
}
// *key is a run of 0xffs. Leave it alone.
}
};
};
} // namespace
const Comparator* BytewiseComparator() {

Loading…
Откажи
Сачувај