浏览代码

Optimize leveldb block seeks to utilize the current iterator location.

This is beneficial when iterators are reused and seeks are not random
but increasing. It is additionally beneficial with larger block sizes and keys with common prefixes.

Add a benchmark "seekordered" to db_bench that reuses iterators across
increasing seeks.  Add support to the benchmark to count comparisons made and to support common key prefix length. Change benchmark random seeds to be reproducible for entire benchmark suite executions but unique for threads in different benchmarks runs.  This changes a benchmark suite of readrandom,seekrandom from having a 100% found ratio as previously it had the same seed used for fillrandom.

./db_bench --benchmarks=fillrandom,compact,seekordered --block_size=262144 --comparisons=1 --key_prefix=100

without this change (though with benchmark changes):
seekrandom   :      55.309 micros/op; (631820 of 1000000 found)
Comparisons: 27001049
seekordered  :       1.732 micros/op; (631882 of 1000000 found)
Comparisons: 26998402

with this change:
seekrandom   :      55.866 micros/op; (631820 of 1000000 found)
Comparisons: 26952143
seekordered  :       1.686 micros/op; (631882 of 1000000 found)
Comparisons: 25549369

For ordered seeking, this is a reduction of 5% comparisons and a 3% speedup. For random seeking (with single use iterators) the comparisons and speed are less than 1% and likely noise.

PiperOrigin-RevId: 351149832
main
leveldb Team 3 年前
committed by Victor Costan
父节点
当前提交
8cce47e450
共有 3 个文件被更改,包括 221 次插入35 次删除
  1. +134
    -34
      benchmarks/db_bench.cc
  2. +61
    -0
      db/db_test.cc
  3. +26
    -1
      table/block.cc

+ 134
- 34
benchmarks/db_bench.cc 查看文件

@ -4,10 +4,12 @@
#include <sys/types.h>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include "leveldb/cache.h"
#include "leveldb/comparator.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
@ -34,6 +36,7 @@
// readmissing -- read N missing keys in random order
// readhot -- read N times in random order from 1% section of DB
// seekrandom -- N random seeks
// seekordered -- N ordered seeks
// open -- cost of opening a DB
// crc32c -- repeated crc32c of 4K of data
// Meta operations:
@ -78,6 +81,9 @@ static double FLAGS_compression_ratio = 0.5;
// Print histogram of operation timings
static bool FLAGS_histogram = false;
// Count the number of string comparisons performed
static bool FLAGS_comparisons = false;
// Number of bytes to buffer in memtable before compacting
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;
@ -101,6 +107,9 @@ static int FLAGS_open_files = 0;
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;
// Common key prefix length.
static int FLAGS_key_prefix = 0;
// If true, do not destroy the existing database. If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
@ -117,6 +126,33 @@ namespace leveldb {
namespace {
leveldb::Env* g_env = nullptr;
class CountComparator : public Comparator {
public:
CountComparator(const Comparator* wrapped) : wrapped_(wrapped) {}
~CountComparator() override {}
int Compare(const Slice& a, const Slice& b) const {
count_.fetch_add(1, std::memory_order_relaxed);
return wrapped_->Compare(a, b);
}
const char* Name() const override { return wrapped_->Name(); }
void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
wrapped_->FindShortestSeparator(start, limit);
}
void FindShortSuccessor(std::string* key) const override {
return wrapped_->FindShortSuccessor(key);
}
size_t comparisons() const { return count_.load(std::memory_order_relaxed); }
void reset() { count_.store(0, std::memory_order_relaxed); }
private:
mutable std::atomic<size_t> count_ = 0;
const Comparator* const wrapped_;
};
// Helper for quickly generating random data.
class RandomGenerator {
private:
@ -149,6 +185,26 @@ class RandomGenerator {
}
};
class KeyBuffer {
public:
KeyBuffer() {
assert(FLAGS_key_prefix < sizeof(buffer_));
memset(buffer_, 'a', FLAGS_key_prefix);
}
KeyBuffer& operator=(KeyBuffer& other) = delete;
KeyBuffer(KeyBuffer& other) = delete;
void Set(int k) {
std::snprintf(buffer_ + FLAGS_key_prefix,
sizeof(buffer_) - FLAGS_key_prefix, "%016d", k);
}
Slice slice() const { return Slice(buffer_, FLAGS_key_prefix + 16); }
private:
char buffer_[1024];
};
#if defined(__linux)
static Slice TrimSpace(Slice s) {
size_t start = 0;
@ -305,7 +361,7 @@ struct ThreadState {
Stats stats;
SharedState* shared;
ThreadState(int index) : tid(index), rand(1000 + index), shared(nullptr) {}
ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
};
} // namespace
@ -321,9 +377,11 @@ class Benchmark {
WriteOptions write_options_;
int reads_;
int heap_counter_;
CountComparator count_comparator_;
int total_thread_count_;
void PrintHeader() {
const int kKeySize = 16;
const int kKeySize = 16 + FLAGS_key_prefix;
PrintEnvironment();
std::fprintf(stdout, "Keys: %d bytes each\n", kKeySize);
std::fprintf(
@ -411,7 +469,9 @@ class Benchmark {
value_size_(FLAGS_value_size),
entries_per_batch_(1),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
heap_counter_(0) {
heap_counter_(0),
count_comparator_(BytewiseComparator()),
total_thread_count_(0) {
std::vector<std::string> files;
g_env->GetChildren(FLAGS_db, &files);
for (size_t i = 0; i < files.size(); i++) {
@ -494,6 +554,8 @@ class Benchmark {
method = &Benchmark::ReadMissing;
} else if (name == Slice("seekrandom")) {
method = &Benchmark::SeekRandom;
} else if (name == Slice("seekordered")) {
method = &Benchmark::SeekOrdered;
} else if (name == Slice("readhot")) {
method = &Benchmark::ReadHot;
} else if (name == Slice("readrandomsmall")) {
@ -591,7 +653,11 @@ class Benchmark {
arg[i].bm = this;
arg[i].method = method;
arg[i].shared = &shared;
arg[i].thread = new ThreadState(i);
++total_thread_count_;
// Seed the thread's random state deterministically based upon thread
// creation across all benchmarks. This ensures that the seeds are unique
// but reproducible when rerunning the same set of benchmarks.
arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_);
arg[i].thread->shared = &shared;
g_env->StartThread(ThreadBody, &arg[i]);
}
@ -612,6 +678,11 @@ class Benchmark {
arg[0].thread->stats.Merge(arg[i].thread->stats);
}
arg[0].thread->stats.Report(name);
if (FLAGS_comparisons) {
fprintf(stdout, "Comparisons: %ld\n", count_comparator_.comparisons());
count_comparator_.reset();
fflush(stdout);
}
for (int i = 0; i < n; i++) {
delete arg[i].thread;
@ -694,6 +765,9 @@ class Benchmark {
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_file_size = FLAGS_max_file_size;
options.block_size = FLAGS_block_size;
if (FLAGS_comparisons) {
options.comparator = &count_comparator_;
}
options.max_open_files = FLAGS_open_files;
options.filter_policy = filter_policy_;
options.reuse_logs = FLAGS_reuse_logs;
@ -727,14 +801,14 @@ class Benchmark {
WriteBatch batch;
Status s;
int64_t bytes = 0;
KeyBuffer key;
for (int i = 0; i < num_; i += entries_per_batch_) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
char key[100];
std::snprintf(key, sizeof(key), "%016d", k);
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + strlen(key);
const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
key.Set(k);
batch.Put(key.slice(), gen.Generate(value_size_));
bytes += value_size_ + key.slice().size();
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);
@ -776,11 +850,11 @@ class Benchmark {
ReadOptions options;
std::string value;
int found = 0;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = thread->rand.Next() % FLAGS_num;
std::snprintf(key, sizeof(key), "%016d", k);
if (db_->Get(options, key, &value).ok()) {
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
if (db_->Get(options, key.slice(), &value).ok()) {
found++;
}
thread->stats.FinishedSingleOp();
@ -793,11 +867,12 @@ class Benchmark {
void ReadMissing(ThreadState* thread) {
ReadOptions options;
std::string value;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = thread->rand.Next() % FLAGS_num;
std::snprintf(key, sizeof(key), "%016d.", k);
db_->Get(options, key, &value);
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
Slice s = Slice(key.slice().data(), key.slice().size() - 1);
db_->Get(options, s, &value);
thread->stats.FinishedSingleOp();
}
}
@ -806,11 +881,11 @@ class Benchmark {
ReadOptions options;
std::string value;
const int range = (FLAGS_num + 99) / 100;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = thread->rand.Next() % range;
std::snprintf(key, sizeof(key), "%016d", k);
db_->Get(options, key, &value);
const int k = thread->rand.Uniform(range);
key.Set(k);
db_->Get(options, key.slice(), &value);
thread->stats.FinishedSingleOp();
}
}
@ -818,17 +893,36 @@ class Benchmark {
void SeekRandom(ThreadState* thread) {
ReadOptions options;
int found = 0;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
Iterator* iter = db_->NewIterator(options);
char key[100];
const int k = thread->rand.Next() % FLAGS_num;
std::snprintf(key, sizeof(key), "%016d", k);
iter->Seek(key);
if (iter->Valid() && iter->key() == key) found++;
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
iter->Seek(key.slice());
if (iter->Valid() && iter->key() == key.slice()) found++;
delete iter;
thread->stats.FinishedSingleOp();
}
char msg[100];
snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
thread->stats.AddMessage(msg);
}
void SeekOrdered(ThreadState* thread) {
ReadOptions options;
Iterator* iter = db_->NewIterator(options);
int found = 0;
int k = 0;
KeyBuffer key;
for (int i = 0; i < reads_; i++) {
k = (k + (thread->rand.Uniform(100))) % FLAGS_num;
key.Set(k);
iter->Seek(key.slice());
if (iter->Valid() && iter->key() == key.slice()) found++;
thread->stats.FinishedSingleOp();
}
delete iter;
char msg[100];
std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_);
thread->stats.AddMessage(msg);
}
@ -837,13 +931,13 @@ class Benchmark {
RandomGenerator gen;
WriteBatch batch;
Status s;
KeyBuffer key;
for (int i = 0; i < num_; i += entries_per_batch_) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
char key[100];
std::snprintf(key, sizeof(key), "%016d", k);
batch.Delete(key);
const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num));
key.Set(k);
batch.Delete(key.slice());
thread->stats.FinishedSingleOp();
}
s = db_->Write(write_options_, &batch);
@ -864,6 +958,7 @@ class Benchmark {
} else {
// Special thread that keeps writing until other threads are done.
RandomGenerator gen;
KeyBuffer key;
while (true) {
{
MutexLock l(&thread->shared->mu);
@ -873,10 +968,10 @@ class Benchmark {
}
}
const int k = thread->rand.Next() % FLAGS_num;
char key[100];
std::snprintf(key, sizeof(key), "%016d", k);
class="n">Status class="n">s = db_->Put(write_options_, key, gen.Generate(value_size_));
const int k = thread->rand.Uniform(FLAGS_num);
key.Set(k);
Status s =
db_->Put(write_options_, key.slice(), gen.Generate(value_size_));
if (!s.ok()) {
std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
std::exit(1);
@ -941,6 +1036,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_histogram = n;
} else if (sscanf(argv[i], "--comparisons=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_comparisons = n;
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_use_existing_db = n;
@ -961,6 +1059,8 @@ int main(int argc, char** argv) {
FLAGS_max_file_size = n;
} else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
FLAGS_block_size = n;
} else if (sscanf(argv[i], "--key_prefix=%d%c", &n, &junk) == 1) {
FLAGS_key_prefix = n;
} else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
FLAGS_cache_size = n;
} else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {

+ 61
- 0
db/db_test.cc 查看文件

@ -965,6 +965,26 @@ TEST_F(DBTest, IterMultiWithDelete) {
} while (ChangeOptions());
}
TEST_F(DBTest, IterMultiWithDeleteAndCompaction) {
do {
ASSERT_LEVELDB_OK(Put("b", "vb"));
ASSERT_LEVELDB_OK(Put("c", "vc"));
ASSERT_LEVELDB_OK(Put("a", "va"));
dbfull()->TEST_CompactMemTable();
ASSERT_LEVELDB_OK(Delete("b"));
ASSERT_EQ("NOT_FOUND", Get("b"));
Iterator* iter = db_->NewIterator(ReadOptions());
iter->Seek("c");
ASSERT_EQ(IterStatus(iter), "c->vc");
iter->Prev();
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("b");
ASSERT_EQ(IterStatus(iter), "c->vc");
delete iter;
} while (ChangeOptions());
}
TEST_F(DBTest, Recover) {
do {
ASSERT_LEVELDB_OK(Put("foo", "v1"));
@ -2132,6 +2152,9 @@ static bool CompareIterators(int step, DB* model, DB* db,
Iterator* dbiter = db->NewIterator(options);
bool ok = true;
int count = 0;
std::vector<std::string> seek_keys;
// Compare equality of all elements using Next(). Save some of the keys for
// comparing Seek equality.
for (miter->SeekToFirst(), dbiter->SeekToFirst();
ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) {
count++;
@ -2150,6 +2173,11 @@ static bool CompareIterators(int step, DB* model, DB* db,
EscapeString(miter->value()).c_str(),
EscapeString(miter->value()).c_str());
ok = false;
break;
}
if (count % 10 == 0) {
seek_keys.push_back(miter->key().ToString());
}
}
@ -2160,6 +2188,39 @@ static bool CompareIterators(int step, DB* model, DB* db,
ok = false;
}
}
if (ok) {
// Validate iterator equality when performing seeks.
for (auto kiter = seek_keys.begin(); ok && kiter != seek_keys.end();
++kiter) {
miter->Seek(*kiter);
dbiter->Seek(*kiter);
if (!miter->Valid() || !dbiter->Valid()) {
std::fprintf(stderr, "step %d: Seek iterators invalid: %d vs. %d\n",
step, miter->Valid(), dbiter->Valid());
ok = false;
}
if (miter->key().compare(dbiter->key()) != 0) {
std::fprintf(stderr, "step %d: Seek key mismatch: '%s' vs. '%s'\n",
step, EscapeString(miter->key()).c_str(),
EscapeString(dbiter->key()).c_str());
ok = false;
break;
}
if (miter->value().compare(dbiter->value()) != 0) {
std::fprintf(
stderr,
"step %d: Seek value mismatch for key '%s': '%s' vs. '%s'\n", step,
EscapeString(miter->key()).c_str(),
EscapeString(miter->value()).c_str(),
EscapeString(miter->value()).c_str());
ok = false;
break;
}
}
}
std::fprintf(stderr, "%d entries compared: ok=%d\n", count, ok);
delete miter;
delete dbiter;

+ 26
- 1
table/block.cc 查看文件

@ -166,6 +166,24 @@ class Block::Iter : public Iterator {
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
int current_key_compare = 0;
if (Valid()) {
// If we're already scanning, use the current position as a starting
// point. This is beneficial if the key we're seeking to is ahead of the
// current position.
current_key_compare = Compare(key_, target);
if (current_key_compare < 0) {
// key_ is smaller than target
left = restart_index_;
} else if (current_key_compare > 0) {
right = restart_index_;
} else {
// We're seeking to the key we're already at.
return;
}
}
while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
@ -189,8 +207,15 @@ class Block::Iter : public Iterator {
}
}
// We might be able to use our current position within the restart block.
// This is true if we determined the key we desire is in the current block
// and is after than the current key.
assert(current_key_compare == 0 || Valid());
bool skip_seek = left == restart_index_ && current_key_compare < 0;
if (!skip_seek) {
SeekToRestartPoint(left);
}
// Linear search (within restart block) for first key >= target
SeekToRestartPoint(left);
while (true) {
if (!ParseNextKey()) {
return;

正在加载...
取消
保存