林子骥 преди 3 седмици
родител
ревизия
4219142a3c
променени са 9 файла, в които са добавени 72 реда и са изтрити 51 реда
  1. +9
    -1
      db/builder.cc
  2. +3
    -2
      db/db_impl.cc
  3. +3
    -3
      db/memtable.cc
  4. +4
    -0
      db/version_edit.h
  5. +6
    -5
      db/version_set.cc
  6. +1
    -0
      db/version_set.h
  7. +1
    -1
      db/write_batch.cc
  8. +7
    -1
      table/table_builder.cc
  9. +38
    -38
      util/comparator.cc

+ 9
- 1
db/builder.cc Целия файл

@ -21,6 +21,7 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
iter->SeekToFirst();
std::string fname = TableFileName(dbname, meta->number);
if (iter->Valid()) {
WritableFile* file;
s = env->NewWritableFile(fname, &file);
@ -29,11 +30,18 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
}
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());
meta->smallest.DecodeFrom(iter->key());//这里是internal_key
// auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = tmp_ts;
// meta->newer_ts = tmp_ts;
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
// meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 3
- 2
db/db_impl.cc Целия файл

@ -520,7 +520,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//meta包含largest_key,此时未刷盘meta,但meta.number对应生成的file会刷盘
mutex_.Lock();
}
@ -1156,6 +1156,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
//stats.now_ts = this->env_->NowMicros();
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
@ -1163,7 +1164,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
}
mem->Unref();
if (imm != nullptr) imm->Unref();

+ 3
- 3
db/memtable.cc Целия файл

@ -80,8 +80,8 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
// timestamp : uint64
// value bytes : char[value.size()],[real_value,timestamp(uint64)]
//
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
@ -95,7 +95,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);//value包含timestamp
std::memcpy(p, value.data(), val_size);//value包含timestamp,timestamp,8字节
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}

+ 4
- 0
db/version_edit.h Целия файл

@ -24,6 +24,10 @@ struct FileMetaData {
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
// TIMESTAMP oldest_ts;
// TIMESTAMP newer_ts;
};
class VersionEdit {

+ 6
- 5
db/version_set.cc Целия файл

@ -351,6 +351,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->last_file_read = f;
state->last_file_read_level = level;
// if(state->stats->now_ts > f->newer_ts)return false;
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
@ -460,7 +461,7 @@ void Version::Unref() {
delete this;
}
}
//files_[level] 中存在至少一个文件的键范围与 [smallest_user_key, largest_user_key] 有重叠,那么 OverlapInLevel 就会返回 true。
bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
@ -470,7 +471,7 @@ bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {//当与level0没有重叠时,直接选择压入其他层
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
@ -664,7 +665,7 @@ class VersionSet::Builder {
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
levels_[level].added_files->insert(f);//按照最小key排序
}
}
@ -795,7 +796,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
Finalize(v);//对于每层compact打分
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
@ -820,7 +821,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
edit->EncodeTo(&record);// TODO:修改
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();

+ 1
- 0
db/version_set.h Целия файл

@ -62,6 +62,7 @@ class Version {
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
TIMESTAMP now_ts;
};
// Append to *iters a sequence of iterators that will

+ 1
- 1
db/write_batch.cc Целия файл

@ -99,7 +99,7 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
PutLengthPrefixedSlice(&rep_, value);//默认value的最大字节长度不会超过变长32位的表示。
}
void WriteBatch::Delete(const Slice& key) {

+ 7
- 1
table/table_builder.cc Целия файл

@ -31,7 +31,8 @@ struct TableBuilder::Rep {
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
pending_index_entry(false),
has_ttl_filter_(false){
index_block_options.block_restart_interval = 1;
}
@ -57,6 +58,7 @@ struct TableBuilder::Rep {
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
bool has_ttl_filter_;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
@ -110,8 +112,12 @@ void TableBuilder::Add(const Slice& key, const Slice& value) {
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
// if(r->has_ttl_filter_){
//
// }
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);

+ 38
- 38
util/comparator.cc Целия файл

@ -18,53 +18,53 @@ namespace leveldb {
Comparator::~Comparator() = default;
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;
const char* Name() const override { return "leveldb.BytewiseComparator"; }
const char* Name() const override { return "leveldb.BytewiseComparator"; }
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
}
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
}
//最终生成的 start 将是一个有效的、更短的键,仍然在 limit 范围内;能够这么运行的前提:数据按key有序输入
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
}
void FindShortSuccessor(std::string* key) const override {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
void FindShortSuccessor(std::string* key) const override {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
}
}
// *key is a run of 0xffs. Leave it alone.
}
// *key is a run of 0xffs. Leave it alone.
}
};
};
} // namespace
const Comparator* BytewiseComparator() {

Зареждане…
Отказ
Запис