Browse Source

add some judge; modify decode method; merge lzj_version 241102

main
郭夏辉 2 weeks ago
parent
commit
7b1b5f8103
11 changed files with 127 additions and 67 deletions
  1. +7
    -1
      db/builder.cc
  2. +19
    -10
      db/db_impl.cc
  3. +1
    -1
      db/db_impl.h
  4. +3
    -3
      db/dbformat.h
  5. +19
    -2
      db/memtable.cc
  6. +10
    -9
      db/version_set.cc
  7. +1
    -0
      db/version_set.h
  8. +1
    -1
      db/write_batch.cc
  9. +21
    -1
      table/block.cc
  10. +7
    -1
      table/table_builder.cc
  11. +38
    -38
      util/comparator.cc

+ 7
- 1
db/builder.cc View File

@ -29,11 +29,17 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
}
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());
meta->smallest.DecodeFrom(iter->key());//这里是internal_key
// auto tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = tmp_ts;
// meta->newer_ts = tmp_ts;
Slice key;
for (; iter->Valid(); iter->Next()) {
key = iter->key();
builder->Add(key, iter->value());
// tmp_ts = DecodeFixed64(iter->value().data() + iter->value().size() - kTSLength);
// meta->oldest_ts = meta->oldest_ts > tmp_ts ? tmp_ts : meta->oldest_ts;
// meta->newer_ts = meta->newer_ts > tmp_ts ? meta->newer_ts : tmp_ts;
}
if (!key.empty()) {
meta->largest.DecodeFrom(key);

+ 19
- 10
db/db_impl.cc View File

@ -510,7 +510,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);//meta包含largest_key,此时未刷盘meta,但meta.number对应生成的file会刷盘
mutex_.Lock();
}
@ -588,6 +588,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange(level, begin, end);
}
TEST_CompactRange(max_level_with_files, begin, end);
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
@ -618,7 +619,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,
bg_error_.ok()) {
if (manual_compaction_ == nullptr) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
} else { // Running either my compaction or another compaction.
background_work_finished_signal_.Wait();
}
@ -1166,7 +1167,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
auto t2 = GetTS(value);
if(t1 >= t2){
// 过期
s = Status::Expire("Expire",Slice());
s = Status::NotFound("NotFound",Slice());
} else {
// 没过期
*value = value->substr(0, value->size() - sizeof(uint64_t));
@ -1515,11 +1516,15 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
* @param val
* @return timestamp in val,and remove timestamp from val
*/
uint64_t DBImpl::GetTS(const std::string* val) {
// 不用auto再写一下
uint64_t expiration_time;
memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t));
return expiration_time;
uint64_t DBImpl::GetTS(std::string* val) {
// 不用auto再写一下 老逻辑:
// uint64_t expiration_time;
// memcpy(&expiration_time, val->data() + val->size() - sizeof(uint64_t), sizeof(uint64_t));
// return expiration_time;
// 新逻辑:
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength);
val->resize(val->size() - kTSLength);
return expiration_time;
}
// Default implementations of convenience methods that subclasses of DB
@ -1535,14 +1540,18 @@ Status DB::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
// 将 value 和 expiration_time 合并到一起,形成带 TTL 的 value
std::string val_with_ts;
val_with_ts.reserve(value.size() + sizeof(uint64_t));
val_with_ts.reserve(value.size() + kTSLength);
uint64_t expiration_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count() + ttl * 1000;
char ts_string[kTSLength];
EncodeFixed64(ts_string, expiration_time);
// 追加原始 value 到 val_with_ts
val_with_ts.append(value.data(), value.size());
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
// val_with_ts.append(reinterpret_cast<const char*>(&expiration_time), sizeof(expiration_time));
val_with_ts.append(ts_string, kTSLength);
// std::cout<<"PUT"<<std::endl;
// std::cout << "timestamp: " << expiration_time << std::endl;
//"a\323='\277\222\001\000"

+ 1
- 1
db/db_impl.h View File

@ -49,7 +49,7 @@ class DBImpl : public DB {
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
void AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl);
static uint64_t GetTS(const std::string* val);
static uint64_t GetTS(std::string* val);
// Extra methods (for testing) that are not in the public DB interface
// Compact any files in the named level that overlap [*begin,*end]

+ 3
- 3
db/dbformat.h View File

@ -26,15 +26,15 @@ static const int kNumLevels = 7;
// Level-0 compaction is started when we hit this many files.
// 4
static const int kL0_CompactionTrigger = 128;
static const int kL0_CompactionTrigger = 64;
// Soft limit on number of level-0 files. We slow down writes at this point.
// 8
static const int kL0_SlowdownWritesTrigger = 128;
static const int kL0_SlowdownWritesTrigger = 64;
// Maximum number of level-0 files. We stop writes at this point.
// 12
static const int kL0_StopWritesTrigger = 128;
static const int kL0_StopWritesTrigger = 64;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the

+ 19
- 2
db/memtable.cc View File

@ -81,7 +81,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
// key bytes : char[internal_key.size()]
// tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size()
// value bytes : char[value.size()]
// value bytes : char[value.size()],[real_value,timestamp(uint64)]
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
@ -95,7 +95,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
EncodeFixed64(p, (s << 8) | type);
p += 8;
p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
std::memcpy(p, value.data(), val_size);//value包含timestamp,timestamp,8字节
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
@ -125,6 +125,23 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
// 这里新加了一些逻辑
if (value->size() >= sizeof(uint64_t)){
const char* ptr = value->data();
std::string temp_str(value->data(), value->size());
uint64_t expiration_time = DBImpl::GetTS(&temp_str);
uint64_t current_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (current_time > expiration_time) {
*s = Status::NotFound("NotFound",Slice());
}else {
*value = value->substr(0, value->size() - sizeof(uint64_t));
}
}
return true;
}
case kTypeDeletion:

+ 10
- 9
db/version_set.cc View File

@ -138,7 +138,7 @@ bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
return false;
}
// Binary search over file list
// Binary search over file list, 保证SSTable有序
uint32_t index = 0;
if (smallest_user_key != nullptr) {
// Find the earliest possible internal key for smallest_user_key
@ -351,6 +351,7 @@ Status Version::Get(const ReadOptions& options, const LookupKey& k,
state->last_file_read = f;
state->last_file_read_level = level;
// if(state->stats->now_ts > f->newer_ts)return false;
state->s = state->vset->table_cache_->Get(*state->options, f->number,
f->file_size, state->ikey,
&state->saver, SaveValue);
@ -460,7 +461,7 @@ void Version::Unref() {
delete this;
}
}
//files_[level] 中存在至少一个文件的键范围与 [smallest_user_key, largest_user_key] 有重叠,那么 OverlapInLevel 就会返回 true。
bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
@ -470,7 +471,7 @@ bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {//当与level0没有重叠时,直接选择压入其他层
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
@ -664,7 +665,7 @@ class VersionSet::Builder {
if (f->allowed_seeks < 100) f->allowed_seeks = 100;
levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
levels_[level].added_files->insert(f);//按照最小key排序
}
}
@ -795,7 +796,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
Finalize(v);//对于每层compact打分
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
@ -820,7 +821,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);
edit->EncodeTo(&record);// TODO:修改
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
@ -1386,7 +1387,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
const int level = c->level();
InternalKey smallest, largest;
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);
AddBoundaryInputs(icmp_, current_->files_[level], &c->inputs_[0]);//增加边界值,userkey相等的情况
GetRange(c->inputs_[0], &smallest, &largest);
current_->GetOverlappingInputs(level + 1, &smallest, &largest,
@ -1444,11 +1445,11 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
}
//得到基于当前level的所涉及的需要compact的文件,(level0可能会涉及到level1等)
Compaction* VersionSet::CompactRange(int level, const InternalKey* begin,
const InternalKey* end) {
std::vector<FileMetaData*> inputs;
current_->GetOverlappingInputs(level, begin, end, &inputs);
current_->GetOverlappingInputs(level, begin, end, &inputs);//得到一层的input
if (inputs.empty()) {
return nullptr;
}

+ 1
- 0
db/version_set.h View File

@ -62,6 +62,7 @@ class Version {
struct GetStats {
FileMetaData* seek_file;
int seek_file_level;
TIMESTAMP now_ts;
};
// Append to *iters a sequence of iterators that will

+ 1
- 1
db/write_batch.cc View File

@ -99,7 +99,7 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
PutLengthPrefixedSlice(&rep_, value);//默认value的最大字节长度不会超过变长32位的表示。
}
void WriteBatch::Delete(const Slice& key) {

+ 21
- 1
table/block.cc View File

@ -220,7 +220,27 @@ class Block::Iter : public Iterator {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
if (Compare(key_, target) >= 0) { // 找到了key_
// 这里新加了一些逻辑
std::string value(value_.data(), value_.size());
if (value.size() >= sizeof(uint64_t)){
const char* ptr = value.data();
std::string temp_str(value.data(), value.size());
uint64_t expiration_time = DecodeFixed64((&temp_str)->data() + (&temp_str)->size() - sizeof(uint64_t));
(&temp_str)->resize((&temp_str)->size() - sizeof(uint64_t));
uint64_t current_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (current_time > expiration_time) {
status_ = Status::NotFound("NotFound",Slice());
}else {
value_ = value.substr(0, value.size() - sizeof(uint64_t));
}
}
return;
}
}

+ 7
- 1
table/table_builder.cc View File

@ -31,7 +31,8 @@ struct TableBuilder::Rep {
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
pending_index_entry(false),
has_ttl_filter_(false){
index_block_options.block_restart_interval = 1;
}
@ -57,6 +58,7 @@ struct TableBuilder::Rep {
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
bool has_ttl_filter_;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
@ -110,8 +112,12 @@ void TableBuilder::Add(const Slice& key, const Slice& value) {
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
// if(r->has_ttl_filter_){
//
// }
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);

+ 38
- 38
util/comparator.cc View File

@ -18,53 +18,53 @@ namespace leveldb {
Comparator::~Comparator() = default;
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() = default;
const char* Name() const override { return "leveldb.BytewiseComparator"; }
const char* Name() const override { return "leveldb.BytewiseComparator"; }
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
}
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
int Compare(const Slice& a, const Slice& b) const override {
return a.compare(b);
}
//最终生成的 start 将是一个有效的、更短的键,仍然在 limit 范围内;能够这么运行的前提:数据按key有序输入
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
}
void FindShortSuccessor(std::string* key) const override {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
void FindShortSuccessor(std::string* key) const override {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
}
}
// *key is a run of 0xffs. Leave it alone.
}
// *key is a run of 0xffs. Leave it alone.
}
};
};
} // namespace
const Comparator* BytewiseComparator() {

Loading…
Cancel
Save