Przeglądaj źródła

Bugfix for issue 33; reduce lock contention in Get(), parallel benchmarks.

- Fix for issue 33 (non-null-terminated result from
  leveldb_property_value())

- Support for running multiple instances of a benchmark in parallel.

- Reduce lock contention on Get():
  (1) Do not hold the lock while searching memtables.
  (2) Shard block and table caches 16-ways.

  Benchmark for evaluating this change:
  $ db_bench --benchmarks=fillseq1,readrandom --threads=$n
  (fillseq1 is a small hack to make sure fillseq runs once regardless
  of number of threads specified on the command line).



git-svn-id: https://leveldb.googlecode.com/svn/trunk@49 62dab493-f737-651d-591e-8d6aee1b9529
main
gabor@google.com 13 lat temu
rodzic
commit
e3584f9c28
7 zmienionych plików z 510 dodań i 259 usunięć
  1. +2
    -1
      db/c.cc
  2. +351
    -179
      db/db_bench.cc
  3. +23
    -13
      db/db_impl.cc
  4. +94
    -55
      util/cache.cc
  5. +28
    -11
      util/cache_test.cc
  6. +11
    -0
      util/histogram.cc
  7. +1
    -0
      util/histogram.h

+ 2
- 1
db/c.cc Wyświetl plik

@ -196,7 +196,8 @@ char* leveldb_property_value(
const char* propname) {
std::string tmp;
if (db->rep->GetProperty(Slice(propname), &tmp)) {
return CopyString(tmp);
// We use strdup() since we expect human readable output.
return strdup(tmp.c_str());
} else {
return NULL;
}

+ 351
- 179
db/db_bench.cc Wyświetl plik

@ -14,6 +14,7 @@
#include "port/port.h"
#include "util/crc32c.h"
#include "util/histogram.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/testutil.h"
@ -60,6 +61,9 @@ static int FLAGS_num = 1000000;
// Number of read operations to do. If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;
// Number of concurrent threads to run.
static int FLAGS_threads = 1;
// Size of each value
static int FLAGS_value_size = 100;
@ -91,8 +95,9 @@ static const char* FLAGS_db = "/tmp/dbbench";
namespace leveldb {
// Helper for quickly generating random data.
namespace {
// Helper for quickly generating random data.
class RandomGenerator {
private:
std::string data_;
@ -136,6 +141,152 @@ static Slice TrimSpace(Slice s) {
return Slice(s.data() + start, limit - start);
}
static void AppendWithSpace(std::string* str, Slice msg) {
if (msg.empty()) return;
if (!str->empty()) {
str->push_back(' ');
}
str->append(msg.data(), msg.size());
}
class Stats {
private:
double start_;
double finish_;
double seconds_;
int done_;
int next_report_;
int64_t bytes_;
double last_op_finish_;
Histogram hist_;
std::string message_;
public:
Stats() { Start(); }
void Start() {
next_report_ = 100;
last_op_finish_ = start_;
hist_.Clear();
done_ = 0;
bytes_ = 0;
seconds_ = 0;
start_ = Env::Default()->NowMicros();
finish_ = start_;
message_.clear();
}
void Merge(const Stats& other) {
hist_.Merge(other.hist_);
done_ += other.done_;
bytes_ += other.bytes_;
seconds_ += other.seconds_;
if (other.start_ < start_) start_ = other.start_;
if (other.finish_ > finish_) finish_ = other.finish_;
// Just keep the messages from one thread
if (message_.empty()) message_ = other.message_;
}
void Stop() {
finish_ = Env::Default()->NowMicros();
seconds_ = (finish_ - start_) * 1e-6;
}
void AddMessage(Slice msg) {
AppendWithSpace(&message_, msg);
}
void FinishedSingleOp() {
if (FLAGS_histogram) {
double now = Env::Default()->NowMicros();
double micros = now - last_op_finish_;
hist_.Add(micros);
if (micros > 20000) {
fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
fflush(stderr);
}
last_op_finish_ = now;
}
done_++;
if (done_ >= next_report_) {
if (next_report_ < 1000) next_report_ += 100;
else if (next_report_ < 5000) next_report_ += 500;
else if (next_report_ < 10000) next_report_ += 1000;
else if (next_report_ < 50000) next_report_ += 5000;
else if (next_report_ < 100000) next_report_ += 10000;
else if (next_report_ < 500000) next_report_ += 50000;
else next_report_ += 100000;
fprintf(stderr, "... finished %d ops%30s\r", done_, "");
fflush(stderr);
}
}
void AddBytes(int64_t n) {
bytes_ += n;
}
void Report(const Slice& name) {
// Pretend at least one op was done in case we are running a benchmark
// that does not call FinishedSingleOp().
if (done_ < 1) done_ = 1;
std::string extra;
if (bytes_ > 0) {
// Rate is computed on actual elapsed time, not the sum of per-thread
// elapsed times.
double elapsed = (finish_ - start_) * 1e-6;
char rate[100];
snprintf(rate, sizeof(rate), "%6.1f MB/s",
(bytes_ / 1048576.0) / elapsed);
extra = rate;
}
AppendWithSpace(&extra, message_);
fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
name.ToString().c_str(),
seconds_ * 1e6 / done_,
(extra.empty() ? "" : " "),
extra.c_str());
if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
}
fflush(stdout);
}
};
// State shared by all concurrent executions of the same benchmark.
struct SharedState {
port::Mutex mu;
port::CondVar cv;
int total;
// Each thread goes through the following states:
// (1) initializing
// (2) waiting for others to be initialized
// (3) running
// (4) done
int num_initialized;
int num_done;
bool start;
SharedState() : cv(&mu) { }
};
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
int tid; // 0..n-1 when running in n threads
Random rand; // Has different seeds for different threads
Stats stats;
ThreadState(int index)
: tid(index),
rand(1000 + index) {
}
};
}
class Benchmark {
@ -143,20 +294,11 @@ class Benchmark {
Cache* cache_;
DB* db_;
int num_;
int value_size_;
int entries_per_batch_;
WriteOptions write_options_;
int reads_;
int heap_counter_;
double start_;
double last_op_finish_;
int64_t bytes_;
std::string message_;
std::string post_message_;
Histogram hist_;
RandomGenerator gen_;
Random rand_;
// State kept for progress messages
int done_;
int next_report_; // When to report next
void PrintHeader() {
const int kKeySize = 16;
@ -232,94 +374,15 @@ class Benchmark {
#endif
}
void Start() {
start_ = Env::Default()->NowMicros() * 1e-6;
bytes_ = 0;
message_.clear();
last_op_finish_ = start_;
hist_.Clear();
done_ = 0;
next_report_ = 100;
}
void FinishedSingleOp() {
if (FLAGS_histogram) {
double now = Env::Default()->NowMicros() * 1e-6;
double micros = (now - last_op_finish_) * 1e6;
hist_.Add(micros);
if (micros > 20000) {
fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
fflush(stderr);
}
last_op_finish_ = now;
}
done_++;
if (done_ >= next_report_) {
if (next_report_ < 1000) next_report_ += 100;
else if (next_report_ < 5000) next_report_ += 500;
else if (next_report_ < 10000) next_report_ += 1000;
else if (next_report_ < 50000) next_report_ += 5000;
else if (next_report_ < 100000) next_report_ += 10000;
else if (next_report_ < 500000) next_report_ += 50000;
else next_report_ += 100000;
fprintf(stderr, "... finished %d ops%30s\r", done_, "");
fflush(stderr);
}
}
void Stop(const Slice& name) {
double finish = Env::Default()->NowMicros() * 1e-6;
// Pretend at least one op was done in case we are running a benchmark
// that does nto call FinishedSingleOp().
if (done_ < 1) done_ = 1;
if (bytes_ > 0) {
char rate[100];
snprintf(rate, sizeof(rate), "%6.1f MB/s",
(bytes_ / 1048576.0) / (finish - start_));
if (!message_.empty()) {
message_ = std::string(rate) + " " + message_;
} else {
message_ = rate;
}
}
fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
name.ToString().c_str(),
(finish - start_) * 1e6 / done_,
(message_.empty() ? "" : " "),
message_.c_str());
if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
}
fflush(stdout);
if (!post_message_.empty()) {
fprintf(stdout, "\n%s\n", post_message_.c_str());
post_message_.clear();
}
}
public:
enum Order {
SEQUENTIAL,
RANDOM
};
enum DBState {
FRESH,
EXISTING
};
Benchmark()
: cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
db_(NULL),
num_(FLAGS_num),
value_size_(FLAGS_value_size),
entries_per_batch_(1),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
heap_counter_(0),
bytes_(0),
rand_(301) {
heap_counter_(0) {
std::vector<std::string> files;
Env::Default()->GetChildren(FLAGS_db, &files);
for (int i = 0; i < files.size(); i++) {
@ -353,98 +416,203 @@ class Benchmark {
benchmarks = sep + 1;
}
Start();
// Reset parameters that may be overriddden bwlow
num_ = FLAGS_num;
reads_ = num_;
value_size_ = FLAGS_value_size;
entries_per_batch_ = 1;
write_options_ = WriteOptions();
void (Benchmark::*method)(ThreadState*) = NULL;
bool fresh_db = false;
WriteOptions write_options;
bool known = true;
if (name == Slice("fillseq")) {
Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1);
fresh_db = true;
method = &Benchmark::WriteSeq;
} else if (name == Slice("fillbatch")) {
Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size, 1000);
fresh_db = true;
entries_per_batch_ = 1000;
method = &Benchmark::WriteSeq;
} else if (name == Slice("fillrandom")) {
Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size, 1);
fresh_db = true;
method = &Benchmark::WriteRandom;
} else if (name == Slice("overwrite")) {
Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1);
fresh_db = false;
method = &Benchmark::WriteRandom;
} else if (name == Slice("fillsync")) {
write_options.sync = true;
Write(write_options, RANDOM, FRESH, num_ / 1000, FLAGS_value_size, 1);
fresh_db = true;
num_ /= 1000;
write_options_.sync = true;
method = &Benchmark::WriteRandom;
} else if (name == Slice("fill100K")) {
Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1);
fresh_db = true;
num_ /= 1000;
value_size_ = 100 * 1000;
method = &Benchmark::WriteRandom;
} else if (name == Slice("readseq")) {
ReadSequential();
method = &Benchmark::ReadSequential;
} else if (name == Slice("readreverse")) {
ReadReverse();
method = &Benchmark::ReadReverse;
} else if (name == Slice("readrandom")) {
ReadRandom();
method = &Benchmark::ReadRandom;
} else if (name == Slice("readhot")) {
ReadHot();
method = &Benchmark::ReadHot;
} else if (name == Slice("readrandomsmall")) {
int n = reads_;
reads_ /= 1000;
ReadRandom();
reads_ = n;
method = &Benchmark::ReadRandom;
} else if (name == Slice("compact")) {
Compact();
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
Crc32c(4096, "(4K per op)");
method = &Benchmark::Crc32c;
} else if (name == Slice("acquireload")) {
AcquireLoad();
method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
SnappyCompress();
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
SnappyUncompress();
method = &Benchmark::SnappyUncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
PrintStats();
} else {
known = false;
if (name != Slice()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
}
}
if (known) {
Stop(name);
if (fresh_db) {
if (FLAGS_use_existing_db) {
fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
name.ToString().c_str());
method = NULL;
} else {
delete db_;
db_ = NULL;
DestroyDB(FLAGS_db, Options());
Open();
}
}
if (method != NULL) {
RunBenchmark(name, method);
}
}
}
private:
void Crc32c(int size, const char* label) {
struct ThreadArg {
Benchmark* bm;
SharedState* shared;
ThreadState* thread;
void (Benchmark::*method)(ThreadState*);
};
static void ThreadBody(void* v) {
ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
SharedState* shared = arg->shared;
ThreadState* thread = arg->thread;
{
MutexLock l(&shared->mu);
shared->num_initialized++;
if (shared->num_initialized >= shared->total) {
shared->cv.SignalAll();
}
while (!shared->start) {
shared->cv.Wait();
}
}
thread->stats.Start();
(arg->bm->*(arg->method))(thread);
thread->stats.Stop();
{
MutexLock l(&shared->mu);
shared->num_done++;
if (shared->num_done >= shared->total) {
shared->cv.SignalAll();
}
}
}
void RunBenchmark(Slice name, void (Benchmark::*method)(ThreadState*)) {
const int n = FLAGS_threads;
SharedState shared;
shared.total = n;
shared.num_initialized = 0;
shared.num_done = 0;
shared.start = false;
ThreadArg* arg = new ThreadArg[n];
for (int i = 0; i < n; i++) {
arg[i].bm = this;
arg[i].method = method;
arg[i].shared = &shared;
arg[i].thread = new ThreadState(i);
Env::Default()->StartThread(ThreadBody, &arg[i]);
}
shared.mu.Lock();
while (shared.num_initialized < n) {
shared.cv.Wait();
}
shared.start = true;
shared.cv.SignalAll();
while (shared.num_done < n) {
shared.cv.Wait();
}
shared.mu.Unlock();
for (int i = 1; i < n; i++) {
arg[0].thread->stats.Merge(arg[i].thread->stats);
}
arg[0].thread->stats.Report(name);
for (int i = 0; i < n; i++) {
delete arg[i].thread;
}
delete[] arg;
}
void Crc32c(ThreadState* thread) {
// Checksum about 500MB of data total
const int size = 4096;
const char* label = "(4K per op)";
std::string data(size, 'x');
int64_t bytes = 0;
uint32_t crc = 0;
while (bytes < 500 * 1048576) {
crc = crc32c::Value(data.data(), size);
FinishedSingleOp();
thread->stats.FinishedSingleOp();
bytes += size;
}
// Print so result is not dead
fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
bytes_ = bytes;
message_ = label;
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(label);
}
void AcquireLoad() {
void AcquireLoad(ThreadState* thread) {
int dummy;
port::AtomicPointer ap(&dummy);
int count = 0;
void *ptr = NULL;
message_ = "(each op is 1000 loads)";
thread->stats.AddMessage("(each op is 1000 loads)");
while (count < 100000) {
for (int i = 0; i < 1000; i++) {
ptr = ap.Acquire_Load();
}
count++;
FinishedSingleOp();
thread->stats.FinishedSingleOp();
}
if (ptr == NULL) exit(1); // Disable unused variable warning.
}
void SnappyCompress() {
Slice input = gen_.Generate(Options().block_size);
void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
int64_t bytes = 0;
int64_t produced = 0;
bool ok = true;
@ -453,22 +621,23 @@ class Benchmark {
ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
produced += compressed.size();
bytes += input.size();
FinishedSingleOp();
thread->stats.FinishedSingleOp();
}
if (!ok) {
message_ = "(snappy failure)";
thread->stats.AddMessage("(snappy failure)");
} else {
char buf[100];
snprintf(buf, sizeof(buf), "(output: %.1f%%)",
(produced * 100.0) / bytes);
message_ = buf;
bytes_ = bytes;
thread->stats.AddMessage(buf);
thread->stats.AddBytes(bytes);
}
}
void SnappyUncompress() {
Slice input = gen_.Generate(Options().block_size);
void SnappyUncompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
std::string compressed;
bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
int64_t bytes = 0;
@ -477,14 +646,14 @@ class Benchmark {
ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed);
bytes += input.size();
FinishedSingleOp();
thread->stats.FinishedSingleOp();
}
delete[] uncompressed;
if (!ok) {
message_ = "(snappy failure)";
thread->stats.AddMessage("(snappy failure)");
} else {
bytes_ = bytes;
thread->stats.AddBytes(bytes);
}
}
@ -501,95 +670,97 @@ class Benchmark {
}
}
void Write(const WriteOptions& options, Order order, DBState state,
int num_entries, int value_size, int entries_per_batch) {
if (state == FRESH) {
if (FLAGS_use_existing_db) {
message_ = "skipping (--use_existing_db is true)";
return;
}
delete db_;
db_ = NULL;
DestroyDB(FLAGS_db, Options());
Open();
Start(); // Do not count time taken to destroy/open
}
void WriteSeq(ThreadState* thread) {
DoWrite(thread, true);
}
if (num_entries != num_) {
void WriteRandom(ThreadState* thread) {
DoWrite(thread, false);
}
void DoWrite(ThreadState* thread, bool seq) {
if (num_ != FLAGS_num) {
char msg[100];
snprintf(msg, sizeof(msg), "(%d ops)", num_entries);
message_ = msg;
snprintf(msg, sizeof(msg), "(%d ops)", num_);
thread->stats.AddMessage(msg);
}
RandomGenerator gen;
WriteBatch batch;
Status s;
std::string val;
for (int i = 0; i < num_entries; i += entries_per_batch) {
int64_t bytes = 0;
for (int i = 0; i < num_; i += entries_per_batch_) {
batch.Clear();
for (int j = 0; j < entries_per_batch; j++) {
const int k = (order == SEQUENTIAL) ? i+j : (rand_.Next() % FLAGS_num);
for (int j = 0; j < entries_per_batch_; j++) {
const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
char key[100];
snprintf(key, sizeof(key), "%016d", k);
batch.Put(key, gen_.Generate(value_size));
bytes_ += value_size + strlen(key);
FinishedSingleOp();
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + strlen(key);
thread->stats.FinishedSingleOp();
}
s = db_->Write(options, &batch);
s = db_->Write(write_options_, &batch);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
}
thread->stats.AddBytes(bytes);
}
void ReadSequential() {
void ReadSequential(ThreadState* thread) {
Iterator* iter = db_->NewIterator(ReadOptions());
int i = 0;
int64_t bytes = 0;
for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
bytes_ += iter->key().size() + iter->value().size();
FinishedSingleOp();
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp();
++i;
}
delete iter;
thread->stats.AddBytes(bytes);
}
void ReadReverse() {
void ReadReverse(ThreadState* thread) {
Iterator* iter = db_->NewIterator(ReadOptions());
int i = 0;
int64_t bytes = 0;
for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
bytes_ += iter->key().size() + iter->value().size();
FinishedSingleOp();
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedSingleOp();
++i;
}
delete iter;
thread->stats.AddBytes(bytes);
}
void ReadRandom() {
void ReadRandom(ThreadState* thread) {
ReadOptions options;
std::string value;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = rand_.Next() % FLAGS_num;
const int k = thread->rand.Next() % FLAGS_num;
snprintf(key, sizeof(key), "%016d", k);
db_->Get(options, key, &value);
FinishedSingleOp();
thread->stats.FinishedSingleOp();
}
}
void ReadHot() {
void ReadHot(ThreadState* thread) {
ReadOptions options;
std::string value;
const int range = (FLAGS_num + 99) / 100;
for (int i = 0; i < reads_; i++) {
char key[100];
const int k = rand_.Next() % range;
const int k = thread->rand.Next() % range;
snprintf(key, sizeof(key), "%016d", k);
db_->Get(options, key, &value);
FinishedSingleOp();
thread->stats.FinishedSingleOp();
}
}
void Compact() {
void Compact(ThreadState* thread) {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
int max_level_with_files = 1;
@ -609,10 +780,9 @@ class Benchmark {
void PrintStats() {
std::string stats;
if (!db_->GetProperty("leveldb.stats", &stats)) {
message_ = "(failed)";
} else {
post_message_ = stats;
stats = "(failed)";
}
fprintf(stdout, "\n%s\n", stats.c_str());
}
static void WriteToFile(void* arg, const char* buf, int n) {
@ -625,13 +795,13 @@ class Benchmark {
WritableFile* file;
Status s = Env::Default()->NewWritableFile(fname, &file);
if (!s.ok()) {
message_ = s.ToString();
fprintf(stderr, "%s\n", s.ToString().c_str());
return;
}
bool ok = port::GetHeapProfile(WriteToFile, file);
delete file;
if (!ok) {
message_ = "not supported";
fprintf(stderr, "heap profiling not supported\n");
Env::Default()->DeleteFile(fname);
}
}
@ -661,6 +831,8 @@ int main(int argc, char** argv) {
FLAGS_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
FLAGS_reads = n;
} else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
FLAGS_threads = n;
} else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
FLAGS_value_size = n;
} else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {

+ 23
- 13
db/db_impl.cc Wyświetl plik

@ -989,27 +989,37 @@ Status DBImpl::Get(const ReadOptions& options,
snapshot = versions_->LastSequence();
}
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem_->Get(lkey, value, &s)) {
return s;
}
if (imm_ != NULL && imm_->Get(lkey, value, &s)) {
return s;
}
// Not in memtable(s); try live files in level order
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
{ // Unlock while reading from files
// Unlock while reading from files and memtables
{
mutex_.Unlock();
s = current->Get(options, lkey, value, &stats);
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem_->Get(lkey, value, &s)) {
// Done
} else if (imm_ != NULL && imm_->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (current->UpdateStats(stats)) {
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL) imm->Unref();
current->Unref();
return s;
}

+ 94
- 55
util/cache.cc Wyświetl plik

@ -30,7 +30,8 @@ struct LRUHandle {
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
size_t refs; // TODO(opt): Pack with "key_length"?
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key
Slice key() const {
@ -54,12 +55,12 @@ class HandleTable {
HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
~HandleTable() { delete[] list_; }
LRUHandle* Lookup(LRUHandle* h) {
return *FindPointer(h);
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h);
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == NULL ? NULL : old->next_hash);
*ptr = h;
@ -74,8 +75,8 @@ class HandleTable {
return old;
}
LRUHandle* Remove(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h);
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != NULL) {
*ptr = result->next_hash;
@ -92,13 +93,12 @@ class HandleTable {
LRUHandle** list_;
// Return a pointer to slot that points to a cache entry that
// matches *h. If there is no such cache entry, return a pointer to
// the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(LRUHandle* h) {
Slice key = h->key();
uint32_t hash = Hash(key.data(), key.size(), 0);
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
LRUHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != NULL && key != (*ptr)->key()) {
while (*ptr != NULL &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
@ -117,7 +117,7 @@ class HandleTable {
while (h != NULL) {
LRUHandle* next = h->next_hash;
Slice key = h->key();
uint32_t hash = Hash(key.data(), key.size(), 0);
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
@ -132,26 +132,30 @@ class HandleTable {
}
};
class LRUCache : public Cache {
// A single shard of sharded cache.
class LRUCache {
public:
explicitn> LRUCache(size_t capacity);
virtual ~LRUCache();
LRUCache();
~LRUCache();
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
virtual Handle* Lookup(const Slice& key);
virtual void Release(Handle* handle);
virtual void* Value(Handle* handle);
virtual void Erase(const Slice& key);
virtual uint64_t NewId();
// Separate from constructor so caller can easily make an array of LRUCache
void SetCapacity(size_t capacity) { capacity_ = capacity; }
// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* Insert(const Slice& key, uint32_t hash,
void* value, size_t charge,
void (*deleter)(const Slice& key, void* value));
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash);
private:
void LRU_Remove(LRUHandle* e);
void LRU_Append(LRUHandle* e);
void Unref(LRUHandle* e);
// Constructor parameters
const size_t capacity_;
// Initialized before use.
size_t capacity_;
// mutex_ protects the following state.
port::Mutex mutex_;
@ -165,9 +169,8 @@ class LRUCache : public Cache {
HandleTable table_;
};
LRUCache::LRUCache(size_t capacity)
: capacity_(capacity),
usage_(0),
LRUCache::LRUCache()
: usage_(0),
last_id_(0) {
// Make empty circular linked list
lru_.next = &lru_;
@ -206,32 +209,25 @@ void LRUCache::LRU_Append(LRUHandle* e) {
e->next->prev = e;
}
Cache::Handle* LRUCache::Lookup(const Slice& key) {
Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle dummy;
dummy.next = &dummy;
dummy.value = const_cast<Slice*>(&key);
LRUHandle* e = table_.Lookup(&dummy);
LRUHandle* e = table_.Lookup(key, hash);
if (e != NULL) {
e->refs++;
LRU_Remove(e);
LRU_Append(e);
}
return reinterpret_cast<Handle*>(e);
return reinterpret_cast<Cache::Handle*>(e);
}
void* LRUCache::Value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
void LRUCache::Release(Handle* handle) {
void LRUCache::Release(Cache::Handle* handle) {
MutexLock l(&mutex_);
Unref(reinterpret_cast<LRUHandle*>(handle));
}
Cache::Handle* LRUCache::Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
Cache::Handle* LRUCache::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
MutexLock l(&mutex_);
LRUHandle* e = reinterpret_cast<LRUHandle*>(
@ -240,6 +236,7 @@ Cache::Handle* LRUCache::Insert(const Slice& key, void* value, size_t charge,
e->deleter = deleter;
e->charge = charge;
e->key_length = key.size();
e->hash = hash;
e->refs = 2; // One from LRUCache, one for the returned handle
memcpy(e->key_data, key.data(), key.size());
LRU_Append(e);
@ -254,35 +251,77 @@ Cache::Handle* LRUCache::Insert(const Slice& key, void* value, size_t charge,
while (usage_ > capacity_ && lru_.next != &lru_) {
LRUHandle* old = lru_.next;
LRU_Remove(old);
table_.Remove(old);
table_.Remove(old->key(), old->hash);
Unref(old);
}
return reinterpret_cast<Handle*>(e);
return reinterpret_cast<Cache::Handle*>(e);
}
void LRUCache::Erase(const Slice& key) {
void LRUCache::Erase(const Slice& key, uint32_t hash) {
MutexLock l(&mutex_);
LRUHandle dummy;
dummy.next = &dummy;
dummy.value = const_cast<Slice*>(&key);
LRUHandle* e = table_.Remove(&dummy);
LRUHandle* e = table_.Remove(key, hash);
if (e != NULL) {
LRU_Remove(e);
Unref(e);
}
}
uint64_t LRUCache::NewId() {
MutexLock l(&mutex_);
return ++(last_id_);
}
static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;
class ShardedLRUCache : public Cache {
private:
LRUCache shard_[kNumShards];
port::Mutex id_mutex_;
uint64_t last_id_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
static uint32_t Shard(uint32_t hash) {
return hash >> (32 - kNumShardBits);
}
public:
explicit ShardedLRUCache(size_t capacity) {
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
virtual ~ShardedLRUCache() { }
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
virtual void Release(Handle* handle) {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shard_[Shard(h->hash)].Release(handle);
}
virtual void Erase(const Slice& key) {
const uint32_t hash = HashSlice(key);
shard_[Shard(hash)].Erase(key, hash);
}
virtual void* Value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
virtual uint64_t NewId() {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
};
} // end anonymous namespace
Cache* NewLRUCache(size_t capacity) {
return new LRUCache(capacity);
return new ShardedLRUCache(capacity);
}
}

+ 28
- 11
util/cache_test.cc Wyświetl plik

@ -32,7 +32,7 @@ class CacheTest {
current_->deleted_values_.push_back(DecodeValue(v));
}
static const int kCacheSize = 100;
static const int kCacheSize = 1000;
std::vector<int> deleted_keys_;
std::vector<int> deleted_values_;
Cache* cache_;
@ -137,23 +137,40 @@ TEST(CacheTest, EvictionPolicy) {
Insert(200, 201);
// Frequently used entry must be kept around
for (int i = 0; i < kCacheSize; i++) {
for (int i = 0; i < kCacheSize + 100; i++) {
Insert(1000+i, 2000+i);
ASSERT_EQ(2000+i, Lookup(1000+i));
ASSERT_EQ(101, Lookup(100));
}
ASSERT_EQ(101, Lookup(100));
ASSERT_EQ(2, deleted_keys_.size());
ASSERT_EQ(200, deleted_keys_[0]);
ASSERT_EQ(201, deleted_values_[0]);
ASSERT_EQ(-1, Lookup(200));
}
TEST(CacheTest, HeavyEntry) {
Insert(100, 101);
Insert(200, 201, kCacheSize);
ASSERT_EQ(1, deleted_keys_.size());
ASSERT_EQ(100, deleted_keys_[0]);
ASSERT_EQ(101, deleted_values_[0]);
TEST(CacheTest, HeavyEntries) {
// Add a bunch of light and heavy entries and then count the combined
// size of items still in the cache, which must be approximately the
// same as the total capacity.
const int kLight = 1;
const int kHeavy = 10;
int added = 0;
int index = 0;
while (added < 2*kCacheSize) {
const int weight = (index & 1) ? kLight : kHeavy;
Insert(index, 1000+index, weight);
added += weight;
index++;
}
int cached_weight = 0;
for (int i = 0; i < index; i++) {
const int weight = (i & 1 ? kLight : kHeavy);
int r = Lookup(i);
if (r >= 0) {
cached_weight += weight;
ASSERT_EQ(1000+i, r);
}
}
ASSERT_LE(cached_weight, kCacheSize + kCacheSize/10);
}
TEST(CacheTest, NewId) {

+ 11
- 0
util/histogram.cc Wyświetl plik

@ -55,6 +55,17 @@ void Histogram::Add(double value) {
sum_squares_ += (value * value);
}
void Histogram::Merge(const Histogram& other) {
if (other.min_ < min_) min_ = other.min_;
if (other.max_ > max_) max_ = other.max_;
num_ += other.num_;
sum_ += other.sum_;
sum_squares_ += other.sum_squares_;
for (int b = 0; b < kNumBuckets; b++) {
buckets_[b] += other.buckets_[b];
}
}
double Histogram::Median() const {
return Percentile(50.0);
}

+ 1
- 0
util/histogram.h Wyświetl plik

@ -16,6 +16,7 @@ class Histogram {
void Clear();
void Add(double value);
void Merge(const Histogram& other);
std::string ToString() const;

Ładowanie…
Anuluj
Zapisz