diff --git a/.gitignore b/.gitignore index 9e34c6c..c600ed3 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,6 @@ out/ # clangd .cache/ -compile_commands.json \ No newline at end of file +compile_commands.json + +benchmark-result/ diff --git a/CMakeLists.txt b/CMakeLists.txt index aadc1cf..d0ea7d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -198,6 +198,8 @@ target_sources(leveldb "fielddb/meta.h" "fielddb/request.cpp" "fielddb/request.h" + "testdb/testdb.cc" + "testdb/testdb.h" # Only CMake 3.3+ supports PUBLIC sources in targets exported by "install". $<$:PUBLIC> @@ -447,6 +449,8 @@ if(LEVELDB_BUILD_BENCHMARKS) if(NOT BUILD_SHARED_LIBS) leveldb_benchmark("benchmarks/db_bench.cc") + leveldb_benchmark("benchmarks/db_bench_FieldDB.cc") + leveldb_benchmark("benchmarks/db_bench_testDB.cc") endif(NOT BUILD_SHARED_LIBS) check_library_exists(sqlite3 sqlite3_open "" HAVE_SQLITE3) diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index 8e3f4e7..72d962c 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -325,8 +325,8 @@ class Stats { // elapsed times. double elapsed = (finish_ - start_) * 1e-6; char rate[100]; - std::snprintf(rate, sizeof(rate), "%6.1f MB/s", - (bytes_ / 1048576.0) / elapsed); + std::snprintf(rate, sizeof(rate), "%6.1f MB/s Bytes:%6.1f elapsed(s):%6.1f seconds:%6.1f ", + (bytes_ / 1048576.0) / elapsed,(bytes_ / 1048576.0),elapsed,seconds_); extra = rate; } AppendWithSpace(&extra, message_); @@ -737,6 +737,10 @@ class Benchmark { } shared.mu.Unlock(); + // for(int i = 0; i < n; i++) { + // arg[i].thread->stats.Report(name.ToString() + "thread:" + std::to_string(i)); + // } + for (int i = 1; i < n; i++) { arg[0].thread->stats.Merge(arg[i].thread->stats); } diff --git a/benchmarks/db_bench_FieldDB.cc b/benchmarks/db_bench_FieldDB.cc new file mode 100644 index 0000000..a0f9a21 --- /dev/null +++ b/benchmarks/db_bench_FieldDB.cc @@ -0,0 +1,1144 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include + +#include +#include +#include + +#include "leveldb/cache.h" +#include "leveldb/comparator.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/status.h" +#include "leveldb/write_batch.h" +#include "port/port.h" +#include "util/crc32c.h" +#include "util/histogram.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/testutil.h" + +#include "fielddb/field_db.h" +using namespace fielddb; + +// Comma-separated list of operations to run in the specified order +// Actual benchmarks: +// fillseq -- write N values in sequential key order in async mode +// fillrandom -- write N values in random key order in async mode +// overwrite -- overwrite N values in random key order in async mode +// fillsync -- write N/100 values in random key order in sync mode +// fill100K -- write N/1000 100K values in random order in async mode +// deleteseq -- delete N keys in sequential order +// deleterandom -- delete N keys in random order +// readseq -- read N times sequentially +// readreverse -- read N times in reverse order +// readrandom -- read N times in random order +// readmissing -- read N missing keys in random order +// readhot -- read N times in random order from 1% section of DB +// seekrandom -- N random seeks +// seekordered -- N ordered seeks +// open -- cost of opening a DB +// crc32c -- repeated crc32c of 4K of data +// Meta operations: +// compact -- Compact the entire DB +// stats -- Print DB stats +// sstables -- Print sstable info +// heapprofile -- Dump a heap profile (if supported by this port) +static const char* FLAGS_benchmarks = + "fillseq," + "fillsync," + "fillrandom," + "overwrite," + "readrandom," + "readrandom," // Extra run to allow previous compactions to quiesce + "readseq," + "readreverse," + "compact," + "readrandom," + "readseq," + "readreverse," + "fill100K," + "crc32c," + "snappycomp," + "snappyuncomp," + "zstdcomp," + "zstduncomp,"; + +// Number of key/values to place in database +static int FLAGS_num = 1000000; + +// Number of read operations to do. If negative, do FLAGS_num reads. +static int FLAGS_reads = -1; + +// Number of concurrent threads to run. +static int FLAGS_threads = 1; + +// Size of each value +static int FLAGS_value_size = 100; + +// Arrange to generate values that shrink to this fraction of +// their original size after compression +static double FLAGS_compression_ratio = 0.5; + +// Print histogram of operation timings +static bool FLAGS_histogram = false; + +// Count the number of string comparisons performed +static bool FLAGS_comparisons = false; + +// Number of bytes to buffer in memtable before compacting +// (initialized to default value by "main") +static int FLAGS_write_buffer_size = 0; + +// Number of bytes written to each file. +// (initialized to default value by "main") +static int FLAGS_max_file_size = 0; + +// Approximate size of user data packed per block (before compression. +// (initialized to default value by "main") +static int FLAGS_block_size = 0; + +// Number of bytes to use as a cache of uncompressed data. +// Negative means use default settings. +static int FLAGS_cache_size = -1; + +// Maximum number of files to keep open at the same time (use default if == 0) +static int FLAGS_open_files = 0; + +// Bloom filter bits per key. +// Negative means use default settings. +static int FLAGS_bloom_bits = -1; + +// Common key prefix length. +static int FLAGS_key_prefix = 0; + +// If true, do not destroy the existing database. If you set this +// flag and also specify a benchmark that wants a fresh database, that +// benchmark will fail. +static bool FLAGS_use_existing_db = false; + +// If true, reuse existing log/MANIFEST files when re-opening a database. +static bool FLAGS_reuse_logs = false; + +// If true, use compression. +static bool FLAGS_compression = true; + +// Use the db with the following name. +static const char* FLAGS_db = nullptr; + +// ZSTD compression level to try out +static int FLAGS_zstd_compression_level = 1; + +namespace leveldb { + +namespace { +leveldb::Env* g_env = nullptr; + +class CountComparator : public Comparator { + public: + CountComparator(const Comparator* wrapped) : wrapped_(wrapped) {} + ~CountComparator() override {} + int Compare(const Slice& a, const Slice& b) const override { + count_.fetch_add(1, std::memory_order_relaxed); + return wrapped_->Compare(a, b); + } + const char* Name() const override { return wrapped_->Name(); } + void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + wrapped_->FindShortestSeparator(start, limit); + } + + void FindShortSuccessor(std::string* key) const override { + return wrapped_->FindShortSuccessor(key); + } + + size_t comparisons() const { return count_.load(std::memory_order_relaxed); } + + void reset() { count_.store(0, std::memory_order_relaxed); } + + private: + mutable std::atomic count_{0}; + const Comparator* const wrapped_; +}; + +// Helper for quickly generating random data. +class RandomGenerator { + private: + std::string data_; + int pos_; + + public: + RandomGenerator() { + // We use a limited amount of data over and over again and ensure + // that it is larger than the compression window (32KB), and also + // large enough to serve all typical value sizes we want to write. + Random rnd(301); + std::string piece; + while (data_.size() < 1048576) { + // Add a short fragment that is as compressible as specified + // by FLAGS_compression_ratio. + test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece); + data_.append(piece); + } + pos_ = 0; + } + + Slice Generate(size_t len) { + if (pos_ + len > data_.size()) { + pos_ = 0; + assert(len < data_.size()); + } + pos_ += len; + return Slice(data_.data() + pos_ - len, len); + } +}; + +class KeyBuffer { + public: + KeyBuffer() { + assert(FLAGS_key_prefix < sizeof(buffer_)); + memset(buffer_, 'a', FLAGS_key_prefix); + } + KeyBuffer& operator=(KeyBuffer& other) = delete; + KeyBuffer(KeyBuffer& other) = delete; + + void Set(int k) { + std::snprintf(buffer_ + FLAGS_key_prefix, + sizeof(buffer_) - FLAGS_key_prefix, "%016d", k); + } + + Slice slice() const { return Slice(buffer_, FLAGS_key_prefix + 16); } + + private: + char buffer_[1024]; +}; + +#if defined(__linux) +static Slice TrimSpace(Slice s) { + size_t start = 0; + while (start < s.size() && isspace(s[start])) { + start++; + } + size_t limit = s.size(); + while (limit > start && isspace(s[limit - 1])) { + limit--; + } + return Slice(s.data() + start, limit - start); +} +#endif + +static void AppendWithSpace(std::string* str, Slice msg) { + if (msg.empty()) return; + if (!str->empty()) { + str->push_back(' '); + } + str->append(msg.data(), msg.size()); +} + +class Stats { + private: + double start_; + double finish_; + double seconds_; + int done_; + int next_report_; + int64_t bytes_; + double last_op_finish_; + Histogram hist_; + std::string message_; + + public: + Stats() { Start(); } + + void Start() { + next_report_ = 100; + hist_.Clear(); + done_ = 0; + bytes_ = 0; + seconds_ = 0; + message_.clear(); + start_ = finish_ = last_op_finish_ = g_env->NowMicros(); + } + + void Merge(const Stats& other) { + hist_.Merge(other.hist_); + done_ += other.done_; + bytes_ += other.bytes_; + seconds_ += other.seconds_; + if (other.start_ < start_) start_ = other.start_; + if (other.finish_ > finish_) finish_ = other.finish_; + + // Just keep the messages from one thread + if (message_.empty()) message_ = other.message_; + } + + void Stop() { + finish_ = g_env->NowMicros(); + seconds_ = (finish_ - start_) * 1e-6; + } + + void AddMessage(Slice msg) { AppendWithSpace(&message_, msg); } + + void FinishedSingleOp() { + if (FLAGS_histogram) { + double now = g_env->NowMicros(); + double micros = now - last_op_finish_; + hist_.Add(micros); + if (micros > 20000) { + std::fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); + std::fflush(stderr); + } + last_op_finish_ = now; + } + + done_++; + if (done_ >= next_report_) { + if (next_report_ < 1000) + next_report_ += 100; + else if (next_report_ < 5000) + next_report_ += 500; + else if (next_report_ < 10000) + next_report_ += 1000; + else if (next_report_ < 50000) + next_report_ += 5000; + else if (next_report_ < 100000) + next_report_ += 10000; + else if (next_report_ < 500000) + next_report_ += 50000; + else + next_report_ += 100000; + std::fprintf(stderr, "... finished %d ops%30s\r", done_, ""); + std::fflush(stderr); + } + } + + void AddBytes(int64_t n) { bytes_ += n; } + + void Report(const Slice& name) { + // Pretend at least one op was done in case we are running a benchmark + // that does not call FinishedSingleOp(). + if (done_ < 1) done_ = 1; + + std::string extra; + if (bytes_ > 0) { + // Rate is computed on actual elapsed time, not the sum of per-thread + // elapsed times. + double elapsed = (finish_ - start_) * 1e-6; + char rate[100]; + std::snprintf(rate, sizeof(rate), "%6.1f MB/s", + (bytes_ / 1048576.0) / elapsed); + extra = rate; + } + AppendWithSpace(&extra, message_); + + std::fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", + name.ToString().c_str(), seconds_ * 1e6 / done_, + (extra.empty() ? "" : " "), extra.c_str()); + if (FLAGS_histogram) { + std::fprintf(stdout, "Microseconds per op:\n%s\n", + hist_.ToString().c_str()); + } + std::fflush(stdout); + } +}; + +// State shared by all concurrent executions of the same benchmark. +struct SharedState { + port::Mutex mu; + port::CondVar cv GUARDED_BY(mu); + int total GUARDED_BY(mu); + + // Each thread goes through the following states: + // (1) initializing + // (2) waiting for others to be initialized + // (3) running + // (4) done + + int num_initialized GUARDED_BY(mu); + int num_done GUARDED_BY(mu); + bool start GUARDED_BY(mu); + + SharedState(int total) + : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) {} +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + int tid; // 0..n-1 when running in n threads + Random rand; // Has different seeds for different threads + Stats stats; + SharedState* shared; + + ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {} +}; + +void Compress( + ThreadState* thread, std::string name, + std::function compress_func) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); + int64_t bytes = 0; + int64_t produced = 0; + bool ok = true; + std::string compressed; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = compress_func(input.data(), input.size(), &compressed); + produced += compressed.size(); + bytes += input.size(); + thread->stats.FinishedSingleOp(); + } + + if (!ok) { + thread->stats.AddMessage("(" + name + " failure)"); + } else { + char buf[100]; + std::snprintf(buf, sizeof(buf), "(output: %.1f%%)", + (produced * 100.0) / bytes); + thread->stats.AddMessage(buf); + thread->stats.AddBytes(bytes); + } +} + +void Uncompress( + ThreadState* thread, std::string name, + std::function compress_func, + std::function uncompress_func) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); + std::string compressed; + bool ok = compress_func(input.data(), input.size(), &compressed); + int64_t bytes = 0; + char* uncompressed = new char[input.size()]; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = uncompress_func(compressed.data(), compressed.size(), uncompressed); + bytes += input.size(); + thread->stats.FinishedSingleOp(); + } + delete[] uncompressed; + + if (!ok) { + thread->stats.AddMessage("(" + name + " failure)"); + } else { + thread->stats.AddBytes(bytes); + } +} + +} // namespace + +class Benchmark { + private: + Cache* cache_; + const FilterPolicy* filter_policy_; + fielddb::FieldDB* db_; + int num_; + int value_size_; + int entries_per_batch_; + WriteOptions write_options_; + int reads_; + int heap_counter_; + CountComparator count_comparator_; + int total_thread_count_; + + void PrintHeader() { + const int kKeySize = 16 + FLAGS_key_prefix; + PrintEnvironment(); + std::fprintf(stdout, "Keys: %d bytes each\n", kKeySize); + std::fprintf( + stdout, "Values: %d bytes each (%d bytes after compression)\n", + FLAGS_value_size, + static_cast(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); + std::fprintf(stdout, "Entries: %d\n", num_); + std::fprintf(stdout, "RawSize: %.1f MB (estimated)\n", + ((static_cast(kKeySize + FLAGS_value_size) * num_) / + 1048576.0)); + std::fprintf( + stdout, "FileSize: %.1f MB (estimated)\n", + (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) / + 1048576.0)); + PrintWarnings(); + std::fprintf(stdout, "------------------------------------------------\n"); + } + + void PrintWarnings() { +#if defined(__GNUC__) && !defined(__OPTIMIZE__) + std::fprintf( + stdout, + "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"); +#endif +#ifndef NDEBUG + std::fprintf( + stdout, + "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); +#endif + + // See if snappy is working by attempting to compress a compressible string + const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"; + std::string compressed; + if (!port::Snappy_Compress(text, sizeof(text), &compressed)) { + std::fprintf(stdout, "WARNING: Snappy compression is not enabled\n"); + } else if (compressed.size() >= sizeof(text)) { + std::fprintf(stdout, "WARNING: Snappy compression is not effective\n"); + } + } + + void PrintEnvironment() { + std::fprintf(stderr, "LevelDB: version %d.%d\n", kMajorVersion, + kMinorVersion); + +#if defined(__linux) + time_t now = time(nullptr); + std::fprintf(stderr, "Date: %s", + ctime(&now)); // ctime() adds newline + + FILE* cpuinfo = std::fopen("/proc/cpuinfo", "r"); + if (cpuinfo != nullptr) { + char line[1000]; + int num_cpus = 0; + std::string cpu_type; + std::string cache_size; + while (fgets(line, sizeof(line), cpuinfo) != nullptr) { + const char* sep = strchr(line, ':'); + if (sep == nullptr) { + continue; + } + Slice key = TrimSpace(Slice(line, sep - 1 - line)); + Slice val = TrimSpace(Slice(sep + 1)); + if (key == "model name") { + ++num_cpus; + cpu_type = val.ToString(); + } else if (key == "cache size") { + cache_size = val.ToString(); + } + } + std::fclose(cpuinfo); + std::fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str()); + std::fprintf(stderr, "CPUCache: %s\n", cache_size.c_str()); + } +#endif + } + + public: + Benchmark() + : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : nullptr), + filter_policy_(FLAGS_bloom_bits >= 0 + ? NewBloomFilterPolicy(FLAGS_bloom_bits) + : nullptr), + db_(nullptr), + num_(FLAGS_num), + value_size_(FLAGS_value_size), + entries_per_batch_(1), + reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), + heap_counter_(0), + count_comparator_(BytewiseComparator()), + total_thread_count_(0) { + std::vector files; + g_env->GetChildren(FLAGS_db, &files); + for (size_t i = 0; i < files.size(); i++) { + if (Slice(files[i]).starts_with("heap-")) { + g_env->RemoveFile(std::string(FLAGS_db) + "/" + files[i]); + } + } + if (!FLAGS_use_existing_db) { + DestroyDB(FLAGS_db, Options()); + } + } + + ~Benchmark() { + delete db_; + delete cache_; + delete filter_policy_; + } + + void Run() { + PrintHeader(); + Open(); + + const char* benchmarks = FLAGS_benchmarks; + while (benchmarks != nullptr) { + const char* sep = strchr(benchmarks, ','); + Slice name; + if (sep == nullptr) { + name = benchmarks; + benchmarks = nullptr; + } else { + name = Slice(benchmarks, sep - benchmarks); + benchmarks = sep + 1; + } + + // Reset parameters that may be overridden below + num_ = FLAGS_num; + reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads); + value_size_ = FLAGS_value_size; + entries_per_batch_ = 1; + write_options_ = WriteOptions(); + + void (Benchmark::*method)(ThreadState*) = nullptr; + bool fresh_db = false; + int num_threads = FLAGS_threads; + + if (name == Slice("open")) { + method = &Benchmark::OpenBench; + num_ /= 10000; + if (num_ < 1) num_ = 1; + } else if (name == Slice("fillseq")) { + fresh_db = true; + method = &Benchmark::WriteSeq; + } else if (name == Slice("fillbatch")) { + fresh_db = true; + entries_per_batch_ = 1000; + method = &Benchmark::WriteSeq; + } else if (name == Slice("fillrandom")) { + fresh_db = true; + method = &Benchmark::WriteRandom; + } else if (name == Slice("overwrite")) { + fresh_db = false; + method = &Benchmark::WriteRandom; + } else if (name == Slice("fillsync")) { + fresh_db = true; + num_ /= 1000; + write_options_.sync = true; + method = &Benchmark::WriteRandom; + } else if (name == Slice("fill100K")) { + fresh_db = true; + num_ /= 1000; + value_size_ = 100 * 1000; + method = &Benchmark::WriteRandom; + } else if (name == Slice("readseq")) { + method = &Benchmark::ReadSequential; + } else if (name == Slice("readreverse")) { + method = &Benchmark::ReadReverse; + } else if (name == Slice("readrandom")) { + method = &Benchmark::ReadRandom; + } else if (name == Slice("readmissing")) { + method = &Benchmark::ReadMissing; + } else if (name == Slice("seekrandom")) { + method = &Benchmark::SeekRandom; + } else if (name == Slice("seekordered")) { + method = &Benchmark::SeekOrdered; + } else if (name == Slice("readhot")) { + method = &Benchmark::ReadHot; + } else if (name == Slice("readrandomsmall")) { + reads_ /= 1000; + method = &Benchmark::ReadRandom; + } else if (name == Slice("deleteseq")) { + method = &Benchmark::DeleteSeq; + } else if (name == Slice("deleterandom")) { + method = &Benchmark::DeleteRandom; + } else if (name == Slice("readwhilewriting")) { + num_threads++; // Add extra thread for writing + method = &Benchmark::ReadWhileWriting; + } else if (name == Slice("compact")) { + method = &Benchmark::Compact; + } else if (name == Slice("crc32c")) { + method = &Benchmark::Crc32c; + } else if (name == Slice("snappycomp")) { + method = &Benchmark::SnappyCompress; + } else if (name == Slice("snappyuncomp")) { + method = &Benchmark::SnappyUncompress; + } else if (name == Slice("zstdcomp")) { + method = &Benchmark::ZstdCompress; + } else if (name == Slice("zstduncomp")) { + method = &Benchmark::ZstdUncompress; + } else if (name == Slice("heapprofile")) { + HeapProfile(); + } else if (name == Slice("stats")) { + PrintStats("leveldb.stats"); + } else if (name == Slice("sstables")) { + PrintStats("leveldb.sstables"); + } else { + if (!name.empty()) { // No error message for empty name + std::fprintf(stderr, "unknown benchmark '%s'\n", + name.ToString().c_str()); + } + } + + if (fresh_db) { + if (FLAGS_use_existing_db) { + std::fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n", + name.ToString().c_str()); + method = nullptr; + } else { + delete db_; + db_ = nullptr; + DestroyDB(FLAGS_db, Options()); + Open(); + } + } + + if (method != nullptr) { + RunBenchmark(num_threads, name, method); + } + } + } + + private: + struct ThreadArg { + Benchmark* bm; + SharedState* shared; + ThreadState* thread; + void (Benchmark::*method)(ThreadState*); + }; + + static void ThreadBody(void* v) { + ThreadArg* arg = reinterpret_cast(v); + SharedState* shared = arg->shared; + ThreadState* thread = arg->thread; + { + MutexLock l(&shared->mu); + shared->num_initialized++; + if (shared->num_initialized >= shared->total) { + shared->cv.SignalAll(); + } + while (!shared->start) { + shared->cv.Wait(); + } + } + + thread->stats.Start(); + (arg->bm->*(arg->method))(thread); + thread->stats.Stop(); + + { + MutexLock l(&shared->mu); + shared->num_done++; + if (shared->num_done >= shared->total) { + shared->cv.SignalAll(); + } + } + } + + void RunBenchmark(int n, Slice name, + void (Benchmark::*method)(ThreadState*)) { + SharedState shared(n); + + ThreadArg* arg = new ThreadArg[n]; + for (int i = 0; i < n; i++) { + arg[i].bm = this; + arg[i].method = method; + arg[i].shared = &shared; + ++total_thread_count_; + // Seed the thread's random state deterministically based upon thread + // creation across all benchmarks. This ensures that the seeds are unique + // but reproducible when rerunning the same set of benchmarks. + arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_); + arg[i].thread->shared = &shared; + g_env->StartThread(ThreadBody, &arg[i]); + } + + shared.mu.Lock(); + while (shared.num_initialized < n) { + shared.cv.Wait(); + } + + shared.start = true; + shared.cv.SignalAll(); + while (shared.num_done < n) { + shared.cv.Wait(); + } + shared.mu.Unlock(); + + for (int i = 1; i < n; i++) { + arg[0].thread->stats.Merge(arg[i].thread->stats); + } + arg[0].thread->stats.Report(name); + if (FLAGS_comparisons) { + fprintf(stdout, "Comparisons: %zu\n", count_comparator_.comparisons()); + count_comparator_.reset(); + fflush(stdout); + } + + for (int i = 0; i < n; i++) { + delete arg[i].thread; + } + delete[] arg; + } + + void Crc32c(ThreadState* thread) { + // Checksum about 500MB of data total + const int size = 4096; + const char* label = "(4K per op)"; + std::string data(size, 'x'); + int64_t bytes = 0; + uint32_t crc = 0; + while (bytes < 500 * 1048576) { + crc = crc32c::Value(data.data(), size); + thread->stats.FinishedSingleOp(); + bytes += size; + } + // Print so result is not dead + std::fprintf(stderr, "... crc=0x%x\r", static_cast(crc)); + + thread->stats.AddBytes(bytes); + thread->stats.AddMessage(label); + } + + void SnappyCompress(ThreadState* thread) { + Compress(thread, "snappy", &port::Snappy_Compress); + } + + void SnappyUncompress(ThreadState* thread) { + Uncompress(thread, "snappy", &port::Snappy_Compress, + &port::Snappy_Uncompress); + } + + void ZstdCompress(ThreadState* thread) { + Compress(thread, "zstd", + [](const char* input, size_t length, std::string* output) { + return port::Zstd_Compress(FLAGS_zstd_compression_level, input, + length, output); + }); + } + + void ZstdUncompress(ThreadState* thread) { + Uncompress( + thread, "zstd", + [](const char* input, size_t length, std::string* output) { + return port::Zstd_Compress(FLAGS_zstd_compression_level, input, + length, output); + }, + &port::Zstd_Uncompress); + } + + void Open() { + assert(db_ == nullptr); + Options options; + options.env = g_env; + options.create_if_missing = !FLAGS_use_existing_db; + options.block_cache = cache_; + options.write_buffer_size = FLAGS_write_buffer_size; + options.max_file_size = FLAGS_max_file_size; + options.block_size = FLAGS_block_size; + if (FLAGS_comparisons) { + options.comparator = &count_comparator_; + } + options.max_open_files = FLAGS_open_files; + options.filter_policy = filter_policy_; + options.reuse_logs = FLAGS_reuse_logs; + options.compression = + FLAGS_compression ? kSnappyCompression : kNoCompression; + // Status s = DB::Open(options, FLAGS_db, &db_); + db_ = new FieldDB(); + Status s = FieldDB::OpenFieldDB(options, FLAGS_db, &db_); + if (!s.ok()) { + std::fprintf(stderr, "open error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + + void OpenBench(ThreadState* thread) { + for (int i = 0; i < num_; i++) { + delete db_; + Open(); + thread->stats.FinishedSingleOp(); + } + } + + void WriteSeq(ThreadState* thread) { DoWrite(thread, true); } + + void WriteRandom(ThreadState* thread) { DoWrite(thread, false); } + + void DoWrite(ThreadState* thread, bool seq) { + if (num_ != FLAGS_num) { + char msg[100]; + std::snprintf(msg, sizeof(msg), "(%d ops)", num_); + thread->stats.AddMessage(msg); + } + + RandomGenerator gen; + WriteBatch batch; + Status s; + int64_t bytes = 0; + KeyBuffer key; + for (int i = 0; i < num_; i += entries_per_batch_) { + batch.Clear(); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num); + key.Set(k); + batch.Put(key.slice(), gen.Generate(value_size_)); + bytes += value_size_ + key.slice().size(); + thread->stats.FinishedSingleOp(); + } + s = db_->Write(write_options_, &batch); + if (!s.ok()) { + std::fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + thread->stats.AddBytes(bytes); + } + + void ReadSequential(ThreadState* thread) { + Iterator* iter = db_->NewIterator(ReadOptions()); + int i = 0; + int64_t bytes = 0; + for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); + ++i; + } + delete iter; + thread->stats.AddBytes(bytes); + } + + void ReadReverse(ThreadState* thread) { + Iterator* iter = db_->NewIterator(ReadOptions()); + int i = 0; + int64_t bytes = 0; + for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); + ++i; + } + delete iter; + thread->stats.AddBytes(bytes); + } + + void ReadRandom(ThreadState* thread) { + ReadOptions options; + std::string value; + int found = 0; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + if (db_->Get(options, key.slice(), &value).ok()) { + found++; + } + thread->stats.FinishedSingleOp(); + } + char msg[100]; + std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void ReadMissing(ThreadState* thread) { + ReadOptions options; + std::string value; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + Slice s = Slice(key.slice().data(), key.slice().size() - 1); + db_->Get(options, s, &value); + thread->stats.FinishedSingleOp(); + } + } + + void ReadHot(ThreadState* thread) { + ReadOptions options; + std::string value; + const int range = (FLAGS_num + 99) / 100; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + const int k = thread->rand.Uniform(range); + key.Set(k); + db_->Get(options, key.slice(), &value); + thread->stats.FinishedSingleOp(); + } + } + + void SeekRandom(ThreadState* thread) { + ReadOptions options; + int found = 0; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + Iterator* iter = db_->NewIterator(options); + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + iter->Seek(key.slice()); + if (iter->Valid() && iter->key() == key.slice()) found++; + delete iter; + thread->stats.FinishedSingleOp(); + } + char msg[100]; + snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void SeekOrdered(ThreadState* thread) { + ReadOptions options; + Iterator* iter = db_->NewIterator(options); + int found = 0; + int k = 0; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + k = (k + (thread->rand.Uniform(100))) % FLAGS_num; + key.Set(k); + iter->Seek(key.slice()); + if (iter->Valid() && iter->key() == key.slice()) found++; + thread->stats.FinishedSingleOp(); + } + delete iter; + char msg[100]; + std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void DoDelete(ThreadState* thread, bool seq) { + RandomGenerator gen; + WriteBatch batch; + Status s; + KeyBuffer key; + for (int i = 0; i < num_; i += entries_per_batch_) { + batch.Clear(); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num)); + key.Set(k); + batch.Delete(key.slice()); + thread->stats.FinishedSingleOp(); + } + s = db_->Write(write_options_, &batch); + if (!s.ok()) { + std::fprintf(stderr, "del error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + } + + void DeleteSeq(ThreadState* thread) { DoDelete(thread, true); } + + void DeleteRandom(ThreadState* thread) { DoDelete(thread, false); } + + void ReadWhileWriting(ThreadState* thread) { + if (thread->tid > 0) { + ReadRandom(thread); + } else { + // Special thread that keeps writing until other threads are done. + RandomGenerator gen; + KeyBuffer key; + while (true) { + { + MutexLock l(&thread->shared->mu); + if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { + // Other threads have finished + break; + } + } + + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + Status s = + db_->Put(write_options_, key.slice(), gen.Generate(value_size_)); + if (!s.ok()) { + std::fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + + // Do not count any of the preceding work/delay in stats. + thread->stats.Start(); + } + } + + void Compact(ThreadState* thread) { db_->CompactRange(nullptr, nullptr); } + + void PrintStats(const char* key) { + std::string stats; + if (!db_->GetProperty(key, &stats)) { + stats = "(failed)"; + } + std::fprintf(stdout, "\n%s\n", stats.c_str()); + } + + static void WriteToFile(void* arg, const char* buf, int n) { + reinterpret_cast(arg)->Append(Slice(buf, n)); + } + + void HeapProfile() { + char fname[100]; + std::snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, + ++heap_counter_); + WritableFile* file; + Status s = g_env->NewWritableFile(fname, &file); + if (!s.ok()) { + std::fprintf(stderr, "%s\n", s.ToString().c_str()); + return; + } + bool ok = port::GetHeapProfile(WriteToFile, file); + delete file; + if (!ok) { + std::fprintf(stderr, "heap profiling not supported\n"); + g_env->RemoveFile(fname); + } + } +}; + +} // namespace leveldb + +int main(int argc, char** argv) { + FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; + FLAGS_max_file_size = leveldb::Options().max_file_size; + FLAGS_block_size = leveldb::Options().block_size; + FLAGS_open_files = leveldb::Options().max_open_files; + std::string default_db_path; + + for (int i = 1; i < argc; i++) { + double d; + int n; + char junk; + if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) { + FLAGS_benchmarks = argv[i] + strlen("--benchmarks="); + } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) { + FLAGS_compression_ratio = d; + } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_histogram = n; + } else if (sscanf(argv[i], "--comparisons=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_comparisons = n; + } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_use_existing_db = n; + } else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_reuse_logs = n; + } else if (sscanf(argv[i], "--compression=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_compression = n; + } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) { + FLAGS_num = n; + } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { + FLAGS_reads = n; + } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { + FLAGS_threads = n; + } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { + FLAGS_value_size = n; + } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { + FLAGS_write_buffer_size = n; + } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) { + FLAGS_max_file_size = n; + } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { + FLAGS_block_size = n; + } else if (sscanf(argv[i], "--key_prefix=%d%c", &n, &junk) == 1) { + FLAGS_key_prefix = n; + } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { + FLAGS_cache_size = n; + } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { + FLAGS_bloom_bits = n; + } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { + FLAGS_open_files = n; + } else if (strncmp(argv[i], "--db=", 5) == 0) { + FLAGS_db = argv[i] + 5; + } else { + std::fprintf(stderr, "Invalid flag '%s'\n", argv[i]); + std::exit(1); + } + } + + leveldb::g_env = leveldb::Env::Default(); + + // Choose a location for the test database if none given with --db= + if (FLAGS_db == nullptr) { + leveldb::g_env->GetTestDirectory(&default_db_path); + default_db_path += "/dbbench"; + FLAGS_db = default_db_path.c_str(); + } + + leveldb::Benchmark benchmark; + benchmark.Run(); + return 0; +} diff --git a/benchmarks/db_bench_testDB.cc b/benchmarks/db_bench_testDB.cc new file mode 100644 index 0000000..6191132 --- /dev/null +++ b/benchmarks/db_bench_testDB.cc @@ -0,0 +1,1145 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include + +#include +#include +#include + +#include "leveldb/cache.h" +#include "leveldb/comparator.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/status.h" +#include "leveldb/write_batch.h" +#include "port/port.h" +#include "util/crc32c.h" +#include "util/histogram.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/testutil.h" + +// #include "fielddb/field_db.h" +// using namespace fielddb; +#include "testdb/testdb.h" +using namespace testdb; +// Comma-separated list of operations to run in the specified order +// Actual benchmarks: +// fillseq -- write N values in sequential key order in async mode +// fillrandom -- write N values in random key order in async mode +// overwrite -- overwrite N values in random key order in async mode +// fillsync -- write N/100 values in random key order in sync mode +// fill100K -- write N/1000 100K values in random order in async mode +// deleteseq -- delete N keys in sequential order +// deleterandom -- delete N keys in random order +// readseq -- read N times sequentially +// readreverse -- read N times in reverse order +// readrandom -- read N times in random order +// readmissing -- read N missing keys in random order +// readhot -- read N times in random order from 1% section of DB +// seekrandom -- N random seeks +// seekordered -- N ordered seeks +// open -- cost of opening a DB +// crc32c -- repeated crc32c of 4K of data +// Meta operations: +// compact -- Compact the entire DB +// stats -- Print DB stats +// sstables -- Print sstable info +// heapprofile -- Dump a heap profile (if supported by this port) +static const char* FLAGS_benchmarks = + "fillseq," + "fillsync," + "fillrandom," + "overwrite," + "readrandom," + "readrandom," // Extra run to allow previous compactions to quiesce + "readseq," + "readreverse," + "compact," + "readrandom," + "readseq," + "readreverse," + "fill100K," + "crc32c," + "snappycomp," + "snappyuncomp," + "zstdcomp," + "zstduncomp,"; + +// Number of key/values to place in database +static int FLAGS_num = 1000000; + +// Number of read operations to do. If negative, do FLAGS_num reads. +static int FLAGS_reads = -1; + +// Number of concurrent threads to run. +static int FLAGS_threads = 1; + +// Size of each value +static int FLAGS_value_size = 100; + +// Arrange to generate values that shrink to this fraction of +// their original size after compression +static double FLAGS_compression_ratio = 0.5; + +// Print histogram of operation timings +static bool FLAGS_histogram = false; + +// Count the number of string comparisons performed +static bool FLAGS_comparisons = false; + +// Number of bytes to buffer in memtable before compacting +// (initialized to default value by "main") +static int FLAGS_write_buffer_size = 0; + +// Number of bytes written to each file. +// (initialized to default value by "main") +static int FLAGS_max_file_size = 0; + +// Approximate size of user data packed per block (before compression. +// (initialized to default value by "main") +static int FLAGS_block_size = 0; + +// Number of bytes to use as a cache of uncompressed data. +// Negative means use default settings. +static int FLAGS_cache_size = -1; + +// Maximum number of files to keep open at the same time (use default if == 0) +static int FLAGS_open_files = 0; + +// Bloom filter bits per key. +// Negative means use default settings. +static int FLAGS_bloom_bits = -1; + +// Common key prefix length. +static int FLAGS_key_prefix = 0; + +// If true, do not destroy the existing database. If you set this +// flag and also specify a benchmark that wants a fresh database, that +// benchmark will fail. +static bool FLAGS_use_existing_db = false; + +// If true, reuse existing log/MANIFEST files when re-opening a database. +static bool FLAGS_reuse_logs = false; + +// If true, use compression. +static bool FLAGS_compression = true; + +// Use the db with the following name. +static const char* FLAGS_db = nullptr; + +// ZSTD compression level to try out +static int FLAGS_zstd_compression_level = 1; + +namespace leveldb { + +namespace { +leveldb::Env* g_env = nullptr; + +class CountComparator : public Comparator { + public: + CountComparator(const Comparator* wrapped) : wrapped_(wrapped) {} + ~CountComparator() override {} + int Compare(const Slice& a, const Slice& b) const override { + count_.fetch_add(1, std::memory_order_relaxed); + return wrapped_->Compare(a, b); + } + const char* Name() const override { return wrapped_->Name(); } + void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + wrapped_->FindShortestSeparator(start, limit); + } + + void FindShortSuccessor(std::string* key) const override { + return wrapped_->FindShortSuccessor(key); + } + + size_t comparisons() const { return count_.load(std::memory_order_relaxed); } + + void reset() { count_.store(0, std::memory_order_relaxed); } + + private: + mutable std::atomic count_{0}; + const Comparator* const wrapped_; +}; + +// Helper for quickly generating random data. +class RandomGenerator { + private: + std::string data_; + int pos_; + + public: + RandomGenerator() { + // We use a limited amount of data over and over again and ensure + // that it is larger than the compression window (32KB), and also + // large enough to serve all typical value sizes we want to write. + Random rnd(301); + std::string piece; + while (data_.size() < 1048576) { + // Add a short fragment that is as compressible as specified + // by FLAGS_compression_ratio. + test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece); + data_.append(piece); + } + pos_ = 0; + } + + Slice Generate(size_t len) { + if (pos_ + len > data_.size()) { + pos_ = 0; + assert(len < data_.size()); + } + pos_ += len; + return Slice(data_.data() + pos_ - len, len); + } +}; + +class KeyBuffer { + public: + KeyBuffer() { + assert(FLAGS_key_prefix < sizeof(buffer_)); + memset(buffer_, 'a', FLAGS_key_prefix); + } + KeyBuffer& operator=(KeyBuffer& other) = delete; + KeyBuffer(KeyBuffer& other) = delete; + + void Set(int k) { + std::snprintf(buffer_ + FLAGS_key_prefix, + sizeof(buffer_) - FLAGS_key_prefix, "%016d", k); + } + + Slice slice() const { return Slice(buffer_, FLAGS_key_prefix + 16); } + + private: + char buffer_[1024]; +}; + +#if defined(__linux) +static Slice TrimSpace(Slice s) { + size_t start = 0; + while (start < s.size() && isspace(s[start])) { + start++; + } + size_t limit = s.size(); + while (limit > start && isspace(s[limit - 1])) { + limit--; + } + return Slice(s.data() + start, limit - start); +} +#endif + +static void AppendWithSpace(std::string* str, Slice msg) { + if (msg.empty()) return; + if (!str->empty()) { + str->push_back(' '); + } + str->append(msg.data(), msg.size()); +} + +class Stats { + private: + double start_; + double finish_; + double seconds_; + int done_; + int next_report_; + int64_t bytes_; + double last_op_finish_; + Histogram hist_; + std::string message_; + + public: + Stats() { Start(); } + + void Start() { + next_report_ = 100; + hist_.Clear(); + done_ = 0; + bytes_ = 0; + seconds_ = 0; + message_.clear(); + start_ = finish_ = last_op_finish_ = g_env->NowMicros(); + } + + void Merge(const Stats& other) { + hist_.Merge(other.hist_); + done_ += other.done_; + bytes_ += other.bytes_; + seconds_ += other.seconds_; + if (other.start_ < start_) start_ = other.start_; + if (other.finish_ > finish_) finish_ = other.finish_; + + // Just keep the messages from one thread + if (message_.empty()) message_ = other.message_; + } + + void Stop() { + finish_ = g_env->NowMicros(); + seconds_ = (finish_ - start_) * 1e-6; + } + + void AddMessage(Slice msg) { AppendWithSpace(&message_, msg); } + + void FinishedSingleOp() { + if (FLAGS_histogram) { + double now = g_env->NowMicros(); + double micros = now - last_op_finish_; + hist_.Add(micros); + if (micros > 20000) { + std::fprintf(stderr, "long op: %.1f micros%30s\r", micros, ""); + std::fflush(stderr); + } + last_op_finish_ = now; + } + + done_++; + if (done_ >= next_report_) { + if (next_report_ < 1000) + next_report_ += 100; + else if (next_report_ < 5000) + next_report_ += 500; + else if (next_report_ < 10000) + next_report_ += 1000; + else if (next_report_ < 50000) + next_report_ += 5000; + else if (next_report_ < 100000) + next_report_ += 10000; + else if (next_report_ < 500000) + next_report_ += 50000; + else + next_report_ += 100000; + std::fprintf(stderr, "... finished %d ops%30s\r", done_, ""); + std::fflush(stderr); + } + } + + void AddBytes(int64_t n) { bytes_ += n; } + + void Report(const Slice& name) { + // Pretend at least one op was done in case we are running a benchmark + // that does not call FinishedSingleOp(). + if (done_ < 1) done_ = 1; + + std::string extra; + if (bytes_ > 0) { + // Rate is computed on actual elapsed time, not the sum of per-thread + // elapsed times. + double elapsed = (finish_ - start_) * 1e-6; + char rate[100]; + std::snprintf(rate, sizeof(rate), "%6.1f MB/s", + (bytes_ / 1048576.0) / elapsed); + extra = rate; + } + AppendWithSpace(&extra, message_); + + std::fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", + name.ToString().c_str(), seconds_ * 1e6 / done_, + (extra.empty() ? "" : " "), extra.c_str()); + if (FLAGS_histogram) { + std::fprintf(stdout, "Microseconds per op:\n%s\n", + hist_.ToString().c_str()); + } + std::fflush(stdout); + } +}; + +// State shared by all concurrent executions of the same benchmark. +struct SharedState { + port::Mutex mu; + port::CondVar cv GUARDED_BY(mu); + int total GUARDED_BY(mu); + + // Each thread goes through the following states: + // (1) initializing + // (2) waiting for others to be initialized + // (3) running + // (4) done + + int num_initialized GUARDED_BY(mu); + int num_done GUARDED_BY(mu); + bool start GUARDED_BY(mu); + + SharedState(int total) + : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) {} +}; + +// Per-thread state for concurrent executions of the same benchmark. +struct ThreadState { + int tid; // 0..n-1 when running in n threads + Random rand; // Has different seeds for different threads + Stats stats; + SharedState* shared; + + ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {} +}; + +void Compress( + ThreadState* thread, std::string name, + std::function compress_func) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); + int64_t bytes = 0; + int64_t produced = 0; + bool ok = true; + std::string compressed; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = compress_func(input.data(), input.size(), &compressed); + produced += compressed.size(); + bytes += input.size(); + thread->stats.FinishedSingleOp(); + } + + if (!ok) { + thread->stats.AddMessage("(" + name + " failure)"); + } else { + char buf[100]; + std::snprintf(buf, sizeof(buf), "(output: %.1f%%)", + (produced * 100.0) / bytes); + thread->stats.AddMessage(buf); + thread->stats.AddBytes(bytes); + } +} + +void Uncompress( + ThreadState* thread, std::string name, + std::function compress_func, + std::function uncompress_func) { + RandomGenerator gen; + Slice input = gen.Generate(Options().block_size); + std::string compressed; + bool ok = compress_func(input.data(), input.size(), &compressed); + int64_t bytes = 0; + char* uncompressed = new char[input.size()]; + while (ok && bytes < 1024 * 1048576) { // Compress 1G + ok = uncompress_func(compressed.data(), compressed.size(), uncompressed); + bytes += input.size(); + thread->stats.FinishedSingleOp(); + } + delete[] uncompressed; + + if (!ok) { + thread->stats.AddMessage("(" + name + " failure)"); + } else { + thread->stats.AddBytes(bytes); + } +} + +} // namespace + +class Benchmark { + private: + Cache* cache_; + const FilterPolicy* filter_policy_; + testDB* db_; + int num_; + int value_size_; + int entries_per_batch_; + WriteOptions write_options_; + int reads_; + int heap_counter_; + CountComparator count_comparator_; + int total_thread_count_; + + void PrintHeader() { + const int kKeySize = 16 + FLAGS_key_prefix; + PrintEnvironment(); + std::fprintf(stdout, "Keys: %d bytes each\n", kKeySize); + std::fprintf( + stdout, "Values: %d bytes each (%d bytes after compression)\n", + FLAGS_value_size, + static_cast(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); + std::fprintf(stdout, "Entries: %d\n", num_); + std::fprintf(stdout, "RawSize: %.1f MB (estimated)\n", + ((static_cast(kKeySize + FLAGS_value_size) * num_) / + 1048576.0)); + std::fprintf( + stdout, "FileSize: %.1f MB (estimated)\n", + (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) / + 1048576.0)); + PrintWarnings(); + std::fprintf(stdout, "------------------------------------------------\n"); + } + + void PrintWarnings() { +#if defined(__GNUC__) && !defined(__OPTIMIZE__) + std::fprintf( + stdout, + "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"); +#endif +#ifndef NDEBUG + std::fprintf( + stdout, + "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n"); +#endif + + // See if snappy is working by attempting to compress a compressible string + const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"; + std::string compressed; + if (!port::Snappy_Compress(text, sizeof(text), &compressed)) { + std::fprintf(stdout, "WARNING: Snappy compression is not enabled\n"); + } else if (compressed.size() >= sizeof(text)) { + std::fprintf(stdout, "WARNING: Snappy compression is not effective\n"); + } + } + + void PrintEnvironment() { + std::fprintf(stderr, "LevelDB: version %d.%d\n", kMajorVersion, + kMinorVersion); + +#if defined(__linux) + time_t now = time(nullptr); + std::fprintf(stderr, "Date: %s", + ctime(&now)); // ctime() adds newline + + FILE* cpuinfo = std::fopen("/proc/cpuinfo", "r"); + if (cpuinfo != nullptr) { + char line[1000]; + int num_cpus = 0; + std::string cpu_type; + std::string cache_size; + while (fgets(line, sizeof(line), cpuinfo) != nullptr) { + const char* sep = strchr(line, ':'); + if (sep == nullptr) { + continue; + } + Slice key = TrimSpace(Slice(line, sep - 1 - line)); + Slice val = TrimSpace(Slice(sep + 1)); + if (key == "model name") { + ++num_cpus; + cpu_type = val.ToString(); + } else if (key == "cache size") { + cache_size = val.ToString(); + } + } + std::fclose(cpuinfo); + std::fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str()); + std::fprintf(stderr, "CPUCache: %s\n", cache_size.c_str()); + } +#endif + } + + public: + Benchmark() + : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : nullptr), + filter_policy_(FLAGS_bloom_bits >= 0 + ? NewBloomFilterPolicy(FLAGS_bloom_bits) + : nullptr), + db_(nullptr), + num_(FLAGS_num), + value_size_(FLAGS_value_size), + entries_per_batch_(1), + reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), + heap_counter_(0), + count_comparator_(BytewiseComparator()), + total_thread_count_(0) { + std::vector files; + g_env->GetChildren(FLAGS_db, &files); + for (size_t i = 0; i < files.size(); i++) { + if (Slice(files[i]).starts_with("heap-")) { + g_env->RemoveFile(std::string(FLAGS_db) + "/" + files[i]); + } + } + if (!FLAGS_use_existing_db) { + DestroyDB(FLAGS_db, Options()); + } + } + + ~Benchmark() { + delete db_; + delete cache_; + delete filter_policy_; + } + + void Run() { + PrintHeader(); + Open(); + + const char* benchmarks = FLAGS_benchmarks; + while (benchmarks != nullptr) { + const char* sep = strchr(benchmarks, ','); + Slice name; + if (sep == nullptr) { + name = benchmarks; + benchmarks = nullptr; + } else { + name = Slice(benchmarks, sep - benchmarks); + benchmarks = sep + 1; + } + + // Reset parameters that may be overridden below + num_ = FLAGS_num; + reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads); + value_size_ = FLAGS_value_size; + entries_per_batch_ = 1; + write_options_ = WriteOptions(); + + void (Benchmark::*method)(ThreadState*) = nullptr; + bool fresh_db = false; + int num_threads = FLAGS_threads; + + if (name == Slice("open")) { + method = &Benchmark::OpenBench; + num_ /= 10000; + if (num_ < 1) num_ = 1; + } else if (name == Slice("fillseq")) { + fresh_db = true; + method = &Benchmark::WriteSeq; + } else if (name == Slice("fillbatch")) { + fresh_db = true; + entries_per_batch_ = 1000; + method = &Benchmark::WriteSeq; + } else if (name == Slice("fillrandom")) { + fresh_db = true; + method = &Benchmark::WriteRandom; + } else if (name == Slice("overwrite")) { + fresh_db = false; + method = &Benchmark::WriteRandom; + } else if (name == Slice("fillsync")) { + fresh_db = true; + num_ /= 1000; + write_options_.sync = true; + method = &Benchmark::WriteRandom; + } else if (name == Slice("fill100K")) { + fresh_db = true; + num_ /= 1000; + value_size_ = 100 * 1000; + method = &Benchmark::WriteRandom; + } else if (name == Slice("readseq")) { + method = &Benchmark::ReadSequential; + } else if (name == Slice("readreverse")) { + method = &Benchmark::ReadReverse; + } else if (name == Slice("readrandom")) { + method = &Benchmark::ReadRandom; + } else if (name == Slice("readmissing")) { + method = &Benchmark::ReadMissing; + } else if (name == Slice("seekrandom")) { + method = &Benchmark::SeekRandom; + } else if (name == Slice("seekordered")) { + method = &Benchmark::SeekOrdered; + } else if (name == Slice("readhot")) { + method = &Benchmark::ReadHot; + } else if (name == Slice("readrandomsmall")) { + reads_ /= 1000; + method = &Benchmark::ReadRandom; + } else if (name == Slice("deleteseq")) { + method = &Benchmark::DeleteSeq; + } else if (name == Slice("deleterandom")) { + method = &Benchmark::DeleteRandom; + } else if (name == Slice("readwhilewriting")) { + num_threads++; // Add extra thread for writing + method = &Benchmark::ReadWhileWriting; + } else if (name == Slice("compact")) { + method = &Benchmark::Compact; + } else if (name == Slice("crc32c")) { + method = &Benchmark::Crc32c; + } else if (name == Slice("snappycomp")) { + method = &Benchmark::SnappyCompress; + } else if (name == Slice("snappyuncomp")) { + method = &Benchmark::SnappyUncompress; + } else if (name == Slice("zstdcomp")) { + method = &Benchmark::ZstdCompress; + } else if (name == Slice("zstduncomp")) { + method = &Benchmark::ZstdUncompress; + } else if (name == Slice("heapprofile")) { + HeapProfile(); + } else if (name == Slice("stats")) { + PrintStats("leveldb.stats"); + } else if (name == Slice("sstables")) { + PrintStats("leveldb.sstables"); + } else { + if (!name.empty()) { // No error message for empty name + std::fprintf(stderr, "unknown benchmark '%s'\n", + name.ToString().c_str()); + } + } + + if (fresh_db) { + if (FLAGS_use_existing_db) { + std::fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n", + name.ToString().c_str()); + method = nullptr; + } else { + delete db_; + db_ = nullptr; + DestroyDB(FLAGS_db, Options()); + Open(); + } + } + + if (method != nullptr) { + RunBenchmark(num_threads, name, method); + } + } + } + + private: + struct ThreadArg { + Benchmark* bm; + SharedState* shared; + ThreadState* thread; + void (Benchmark::*method)(ThreadState*); + }; + + static void ThreadBody(void* v) { + ThreadArg* arg = reinterpret_cast(v); + SharedState* shared = arg->shared; + ThreadState* thread = arg->thread; + { + MutexLock l(&shared->mu); + shared->num_initialized++; + if (shared->num_initialized >= shared->total) { + shared->cv.SignalAll(); + } + while (!shared->start) { + shared->cv.Wait(); + } + } + + thread->stats.Start(); + (arg->bm->*(arg->method))(thread); + thread->stats.Stop(); + + { + MutexLock l(&shared->mu); + shared->num_done++; + if (shared->num_done >= shared->total) { + shared->cv.SignalAll(); + } + } + } + + void RunBenchmark(int n, Slice name, + void (Benchmark::*method)(ThreadState*)) { + SharedState shared(n); + + ThreadArg* arg = new ThreadArg[n]; + for (int i = 0; i < n; i++) { + arg[i].bm = this; + arg[i].method = method; + arg[i].shared = &shared; + ++total_thread_count_; + // Seed the thread's random state deterministically based upon thread + // creation across all benchmarks. This ensures that the seeds are unique + // but reproducible when rerunning the same set of benchmarks. + arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_); + arg[i].thread->shared = &shared; + g_env->StartThread(ThreadBody, &arg[i]); + } + + shared.mu.Lock(); + while (shared.num_initialized < n) { + shared.cv.Wait(); + } + + shared.start = true; + shared.cv.SignalAll(); + while (shared.num_done < n) { + shared.cv.Wait(); + } + shared.mu.Unlock(); + + for (int i = 1; i < n; i++) { + arg[0].thread->stats.Merge(arg[i].thread->stats); + } + arg[0].thread->stats.Report(name); + if (FLAGS_comparisons) { + fprintf(stdout, "Comparisons: %zu\n", count_comparator_.comparisons()); + count_comparator_.reset(); + fflush(stdout); + } + + for (int i = 0; i < n; i++) { + delete arg[i].thread; + } + delete[] arg; + } + + void Crc32c(ThreadState* thread) { + // Checksum about 500MB of data total + const int size = 4096; + const char* label = "(4K per op)"; + std::string data(size, 'x'); + int64_t bytes = 0; + uint32_t crc = 0; + while (bytes < 500 * 1048576) { + crc = crc32c::Value(data.data(), size); + thread->stats.FinishedSingleOp(); + bytes += size; + } + // Print so result is not dead + std::fprintf(stderr, "... crc=0x%x\r", static_cast(crc)); + + thread->stats.AddBytes(bytes); + thread->stats.AddMessage(label); + } + + void SnappyCompress(ThreadState* thread) { + Compress(thread, "snappy", &port::Snappy_Compress); + } + + void SnappyUncompress(ThreadState* thread) { + Uncompress(thread, "snappy", &port::Snappy_Compress, + &port::Snappy_Uncompress); + } + + void ZstdCompress(ThreadState* thread) { + Compress(thread, "zstd", + [](const char* input, size_t length, std::string* output) { + return port::Zstd_Compress(FLAGS_zstd_compression_level, input, + length, output); + }); + } + + void ZstdUncompress(ThreadState* thread) { + Uncompress( + thread, "zstd", + [](const char* input, size_t length, std::string* output) { + return port::Zstd_Compress(FLAGS_zstd_compression_level, input, + length, output); + }, + &port::Zstd_Uncompress); + } + + void Open() { + assert(db_ == nullptr); + Options options; + options.env = g_env; + options.create_if_missing = !FLAGS_use_existing_db; + options.block_cache = cache_; + options.write_buffer_size = FLAGS_write_buffer_size; + options.max_file_size = FLAGS_max_file_size; + options.block_size = FLAGS_block_size; + if (FLAGS_comparisons) { + options.comparator = &count_comparator_; + } + options.max_open_files = FLAGS_open_files; + options.filter_policy = filter_policy_; + options.reuse_logs = FLAGS_reuse_logs; + options.compression = + FLAGS_compression ? kSnappyCompression : kNoCompression; + // Status s = DB::Open(options, FLAGS_db, &db_); + db_ = new testDB(); + Status s = testDB::OpentestDB(options, FLAGS_db, &db_); + if (!s.ok()) { + std::fprintf(stderr, "open error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + + void OpenBench(ThreadState* thread) { + for (int i = 0; i < num_; i++) { + delete db_; + Open(); + thread->stats.FinishedSingleOp(); + } + } + + void WriteSeq(ThreadState* thread) { DoWrite(thread, true); } + + void WriteRandom(ThreadState* thread) { DoWrite(thread, false); } + + void DoWrite(ThreadState* thread, bool seq) { + if (num_ != FLAGS_num) { + char msg[100]; + std::snprintf(msg, sizeof(msg), "(%d ops)", num_); + thread->stats.AddMessage(msg); + } + + RandomGenerator gen; + WriteBatch batch; + Status s; + int64_t bytes = 0; + KeyBuffer key; + for (int i = 0; i < num_; i += entries_per_batch_) { + batch.Clear(); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num); + key.Set(k); + batch.Put(key.slice(), gen.Generate(value_size_)); + bytes += value_size_ + key.slice().size(); + thread->stats.FinishedSingleOp(); + } + s = db_->Write(write_options_, &batch); + if (!s.ok()) { + std::fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + thread->stats.AddBytes(bytes); + } + + void ReadSequential(ThreadState* thread) { + Iterator* iter = db_->NewIterator(ReadOptions()); + int i = 0; + int64_t bytes = 0; + for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); + ++i; + } + delete iter; + thread->stats.AddBytes(bytes); + } + + void ReadReverse(ThreadState* thread) { + Iterator* iter = db_->NewIterator(ReadOptions()); + int i = 0; + int64_t bytes = 0; + for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { + bytes += iter->key().size() + iter->value().size(); + thread->stats.FinishedSingleOp(); + ++i; + } + delete iter; + thread->stats.AddBytes(bytes); + } + + void ReadRandom(ThreadState* thread) { + ReadOptions options; + std::string value; + int found = 0; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + if (db_->Get(options, key.slice(), &value).ok()) { + found++; + } + thread->stats.FinishedSingleOp(); + } + char msg[100]; + std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void ReadMissing(ThreadState* thread) { + ReadOptions options; + std::string value; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + Slice s = Slice(key.slice().data(), key.slice().size() - 1); + db_->Get(options, s, &value); + thread->stats.FinishedSingleOp(); + } + } + + void ReadHot(ThreadState* thread) { + ReadOptions options; + std::string value; + const int range = (FLAGS_num + 99) / 100; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + const int k = thread->rand.Uniform(range); + key.Set(k); + db_->Get(options, key.slice(), &value); + thread->stats.FinishedSingleOp(); + } + } + + void SeekRandom(ThreadState* thread) { + ReadOptions options; + int found = 0; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + Iterator* iter = db_->NewIterator(options); + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + iter->Seek(key.slice()); + if (iter->Valid() && iter->key() == key.slice()) found++; + delete iter; + thread->stats.FinishedSingleOp(); + } + char msg[100]; + snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void SeekOrdered(ThreadState* thread) { + ReadOptions options; + Iterator* iter = db_->NewIterator(options); + int found = 0; + int k = 0; + KeyBuffer key; + for (int i = 0; i < reads_; i++) { + k = (k + (thread->rand.Uniform(100))) % FLAGS_num; + key.Set(k); + iter->Seek(key.slice()); + if (iter->Valid() && iter->key() == key.slice()) found++; + thread->stats.FinishedSingleOp(); + } + delete iter; + char msg[100]; + std::snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void DoDelete(ThreadState* thread, bool seq) { + RandomGenerator gen; + WriteBatch batch; + Status s; + KeyBuffer key; + for (int i = 0; i < num_; i += entries_per_batch_) { + batch.Clear(); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num)); + key.Set(k); + batch.Delete(key.slice()); + thread->stats.FinishedSingleOp(); + } + s = db_->Write(write_options_, &batch); + if (!s.ok()) { + std::fprintf(stderr, "del error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + } + + void DeleteSeq(ThreadState* thread) { DoDelete(thread, true); } + + void DeleteRandom(ThreadState* thread) { DoDelete(thread, false); } + + void ReadWhileWriting(ThreadState* thread) { + if (thread->tid > 0) { + ReadRandom(thread); + } else { + // Special thread that keeps writing until other threads are done. + RandomGenerator gen; + KeyBuffer key; + while (true) { + { + MutexLock l(&thread->shared->mu); + if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { + // Other threads have finished + break; + } + } + + const int k = thread->rand.Uniform(FLAGS_num); + key.Set(k); + Status s = + db_->Put(write_options_, key.slice(), gen.Generate(value_size_)); + if (!s.ok()) { + std::fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + std::exit(1); + } + } + + // Do not count any of the preceding work/delay in stats. + thread->stats.Start(); + } + } + + void Compact(ThreadState* thread) { db_->CompactRange(nullptr, nullptr); } + + void PrintStats(const char* key) { + std::string stats; + if (!db_->GetProperty(key, &stats)) { + stats = "(failed)"; + } + std::fprintf(stdout, "\n%s\n", stats.c_str()); + } + + static void WriteToFile(void* arg, const char* buf, int n) { + reinterpret_cast(arg)->Append(Slice(buf, n)); + } + + void HeapProfile() { + char fname[100]; + std::snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, + ++heap_counter_); + WritableFile* file; + Status s = g_env->NewWritableFile(fname, &file); + if (!s.ok()) { + std::fprintf(stderr, "%s\n", s.ToString().c_str()); + return; + } + bool ok = port::GetHeapProfile(WriteToFile, file); + delete file; + if (!ok) { + std::fprintf(stderr, "heap profiling not supported\n"); + g_env->RemoveFile(fname); + } + } +}; + +} // namespace leveldb + +int main(int argc, char** argv) { + FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; + FLAGS_max_file_size = leveldb::Options().max_file_size; + FLAGS_block_size = leveldb::Options().block_size; + FLAGS_open_files = leveldb::Options().max_open_files; + std::string default_db_path; + + for (int i = 1; i < argc; i++) { + double d; + int n; + char junk; + if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) { + FLAGS_benchmarks = argv[i] + strlen("--benchmarks="); + } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) { + FLAGS_compression_ratio = d; + } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_histogram = n; + } else if (sscanf(argv[i], "--comparisons=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_comparisons = n; + } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_use_existing_db = n; + } else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_reuse_logs = n; + } else if (sscanf(argv[i], "--compression=%d%c", &n, &junk) == 1 && + (n == 0 || n == 1)) { + FLAGS_compression = n; + } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) { + FLAGS_num = n; + } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { + FLAGS_reads = n; + } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) { + FLAGS_threads = n; + } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) { + FLAGS_value_size = n; + } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { + FLAGS_write_buffer_size = n; + } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) { + FLAGS_max_file_size = n; + } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { + FLAGS_block_size = n; + } else if (sscanf(argv[i], "--key_prefix=%d%c", &n, &junk) == 1) { + FLAGS_key_prefix = n; + } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { + FLAGS_cache_size = n; + } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { + FLAGS_bloom_bits = n; + } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { + FLAGS_open_files = n; + } else if (strncmp(argv[i], "--db=", 5) == 0) { + FLAGS_db = argv[i] + 5; + } else { + std::fprintf(stderr, "Invalid flag '%s'\n", argv[i]); + std::exit(1); + } + } + + leveldb::g_env = leveldb::Env::Default(); + + // Choose a location for the test database if none given with --db= + if (FLAGS_db == nullptr) { + leveldb::g_env->GetTestDirectory(&default_db_path); + default_db_path += "/dbbench"; + FLAGS_db = default_db_path.c_str(); + } + + leveldb::Benchmark benchmark; + benchmark.Run(); + return 0; +} diff --git a/fielddb/field_db.cpp b/fielddb/field_db.cpp index 9bd93f1..c42a370 100644 --- a/fielddb/field_db.cpp +++ b/fielddb/field_db.cpp @@ -1,9 +1,14 @@ #include "fielddb/field_db.h" +#include #include #include +#include #include +#include #include #include +#include "leveldb/c.h" +#include "leveldb/cache.h" #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/iterator.h" @@ -22,7 +27,7 @@ namespace fielddb { using namespace leveldb; //TODO:打开fieldDB -Status FieldDB::OpenFieldDB(const Options& options, +Status FieldDB::OpenFieldDB(Options& options, const std::string& name, FieldDB** dbptr) { // options.env->CreateDir("./abc") if(*dbptr == nullptr){ @@ -32,11 +37,18 @@ Status FieldDB::OpenFieldDB(const Options& options, // Status status; DB *indexdb, *kvdb, *metadb; + // options.block_cache = NewLRUCache(ULONG_MAX); + // options.max_open_files = 1000; + // options.write_buffer_size = 512 * 1024 * 1024; + // options.env = getPosixEnv(); status = Open(options, name+"_indexDB", &indexdb); if(!status.ok()) return status; - + + // options.env = getPosixEnv(); status = Open(options, name+"_kvDB", &kvdb); if(!status.ok()) return status; + + // options.env = getPosixEnv(); status = Open(options, name+"_metaDB", &metadb); if(!status.ok()) return status; @@ -45,7 +57,7 @@ Status FieldDB::OpenFieldDB(const Options& options, (*dbptr)->metaDB_ = metadb; (*dbptr)->dbname_ = name; - status = (*dbptr)->Recover(); + // status = (*dbptr)->Recover(); (*dbptr)->options_ = &options; (*dbptr)->env_ = options.env; @@ -118,6 +130,7 @@ Request *FieldDB::GetHandleInterval() { } Status FieldDB::HandleRequest(Request &req) { + uint64_t start_ = env_->NowMicros(); MutexLock L(&mutex_); taskqueue_.push_back(&req); Again: @@ -136,33 +149,61 @@ Again: { //1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。 MutexLock iL(&index_mu); + uint64_t start_construct = env_->NowMicros(); for(auto *req_ptr : taskqueue_) { req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet); if(req_ptr == tail) break; } + construct_elapsed += env_->NowMicros() - start_construct; } //2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据 //此处可以放锁是因为写入的有序性可以通过队列来保证 mutex_.Unlock(); + uint64_t start_write = env_->NowMicros(); WriteOptions op; - status = metaDB_->Write(op, &MetaBatch); - assert(status.ok()); + if(MetaBatch.ApproximateSize() > 12) { + uint64_t start_meta = env_->NowMicros(); + status = metaDB_->Write(op, &MetaBatch); + write_meta_elapsed += env_->NowMicros() - start_meta; + write_bytes += MetaBatch.ApproximateSize(); + assert(status.ok()); + } //TODO:index的写入需要在另外一个线程中同时完成 - status = indexDB_->Write(op, &IndexBatch); - assert(status.ok()); - status = kvDB_->Write(op, &KVBatch); - assert(status.ok()); + if(IndexBatch.ApproximateSize() > 12) { + uint64_t start_index = env_->NowMicros(); + status = indexDB_->Write(op, &IndexBatch); + write_index_elapsed += env_->NowMicros() - start_index; + write_bytes += IndexBatch.ApproximateSize(); + assert(status.ok()); + } + if(KVBatch.ApproximateSize() > 12) { + uint64_t start_kv = env_->NowMicros(); + status = kvDB_->Write(op, &KVBatch); + write_kv_elapsed += env_->NowMicros() - start_kv; + write_bytes += KVBatch.ApproximateSize(); + assert(status.ok()); + } //3. 将meta数据清除 - MetaCleaner cleaner; - cleaner.Collect(MetaBatch); - cleaner.CleanMetaBatch(metaDB_); + if(MetaBatch.ApproximateSize() > 12) { + uint64_t start_clean = env_->NowMicros(); + MetaCleaner cleaner; + cleaner.Collect(MetaBatch); + cleaner.CleanMetaBatch(metaDB_); + write_clean_elapsed += env_->NowMicros() - start_clean; + } + write_elapsed += env_->NowMicros() - start_write; mutex_.Lock(); } else { //对于创建和删除索引的请求,通过prepare完成索引状态的更新 MutexLock iL(&index_mu); req.Prepare(this); } - + // { + // static int count = 0; + // if(count++ % 100000 == 0) { + // std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; + // } + // } while(true) { Request *ready = taskqueue_.front(); // int debug = tail->type_; @@ -175,6 +216,11 @@ Again: } if (ready == tail) break; } + + elapsed += env_->NowMicros() - start_; + count ++; + dumpStatistics(); + if(!taskqueue_.empty()) { taskqueue_.front()->cond_.Signal(); } @@ -218,8 +264,19 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) { } // TODO:根据updates里面的东西,要对是否需要更新index进行分别处理 Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) { + { + uint64_t start_ = env_->NowMicros(); + Status status = kvDB_->Write(options, updates); + temp_elapsed += env_->NowMicros() - start_; + count ++; + dumpStatistics(); + return status; + } + //或许应该再做一个接口?或者基于现有的接口进行改造 + uint64_t start_ = env_->NowMicros(); BatchReq req(updates,&mutex_); + construct_BatchReq_init_elapsed += env_->NowMicros() - start_; Status status = HandleRequest(req); return status; assert(0); diff --git a/fielddb/field_db.h b/fielddb/field_db.h index f0fe5f2..27e8a86 100644 --- a/fielddb/field_db.h +++ b/fielddb/field_db.h @@ -1,5 +1,7 @@ #include "port/port_stdcxx.h" #include "db/db_impl.h" +#include +#include #include #include #include @@ -30,6 +32,7 @@ public: friend class iCreateReq; friend class iDeleteReq; friend class DeleteReq; + friend class BatchReq; //用的时候必须FieldDB *db = new FieldDB()再open,不能像之前一样DB *db FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; @@ -55,20 +58,21 @@ public: //返回当前数据库中索引状态,用来测试,不过也可以作为一个功能? IndexStatus GetIndexStatus(const std::string &fieldName); - static Status OpenFieldDB(const Options& options,const std::string& name,FieldDB** dbptr); + static Status OpenFieldDB(Options& options,const std::string& name,FieldDB** dbptr); private: //根据metaDB的内容进行恢复 Status Recover(); private: + leveldb::DB *kvDB_; + leveldb::DB *metaDB_; + leveldb::DB *indexDB_; + std::string dbname_; const Options *options_; Env *env_; - leveldb::DB *metaDB_; - leveldb::DB *indexDB_; - leveldb::DB *kvDB_; using FieldName = std::string; // 标记index的状态,如果是creating/deleting,则会附带相应的请求 @@ -85,6 +89,56 @@ private: Status HandleRequest(Request &req); //每个请求自行构造请求后交由这个函数处理 Request *GetHandleInterval(); //获得任务队列中的待处理区间,区间划分规则和原因见文档 +private: + int count = 0; + int count_Batch = 0; + int count_Batch_Sub = 0; + uint64_t elapsed = 0; + + uint64_t construct_elapsed = 0; + uint64_t construct_BatchReq_init_elapsed = 0; + uint64_t construct_BatchReq_elapsed = 0; + uint64_t construct_BatchReq_Sub_elapsed = 0; + uint64_t construct_BatchReq_perSub_elapsed = 0; + uint64_t construct_FieldsReq_Read_elapsed = 0; + + uint64_t write_elapsed = 0; + uint64_t write_meta_elapsed = 0; + uint64_t write_index_elapsed = 0; + uint64_t write_kv_elapsed = 0; + uint64_t write_clean_elapsed = 0; + + uint64_t write_bytes = 0; + uint64_t write_bytes_lim = 20 * 1024 * 1024; + + uint64_t temp_elapsed = 0; + + inline void dumpStatistics() { + if(count && count % 500000 == 0 || write_bytes && write_bytes > write_bytes_lim) { + std::cout << "=====================================================\n"; + std::cout << "Total Count : " << count; + std::cout << "\tTotal Write Bytes(MB) : " << write_bytes / 1048576.0 << std::endl; + std::cout << "Average Time(ms) : " << elapsed * 1.0 / count; + std::cout << "\tAverage Write rates(MB/s) : " << write_bytes / 1048576.0 / elapsed * 1000000 << std::endl; + std::cout << "Construct Time(ms) : " << construct_elapsed * 1.0 / count << std::endl; + std::cout << "\tConstruct BatchReq Init Time(ms) : " << construct_BatchReq_init_elapsed * 1.0 / count << std::endl; + std::cout << "\tConstruct BatchReq Time(ms) : " << construct_BatchReq_elapsed * 1.0 / count << std::endl; + std::cout << "\tConstruct BatchReq Sub Time(ms) : " << construct_BatchReq_Sub_elapsed * 1.0 / count << std::endl; + std::cout << "\tConstruct BatchReq perSub Time(ms) : " << construct_BatchReq_perSub_elapsed * 1.0 / count_Batch_Sub << std::endl; + std::cout << "\tConstruct FieldsReq Read Time(ms) : " << construct_FieldsReq_Read_elapsed * 1.0 / count << std::endl; + std::cout << "Write Time(ms) : " << write_elapsed * 1.0 / count << std::endl; + std::cout << "\tWrite Meta Time(ms) : " << write_meta_elapsed * 1.0 / count << std::endl; + std::cout << "\tWrite Index Time(ms) : " << write_index_elapsed * 1.0 / count << std::endl; + std::cout << "\tWrite KV Time(ms) : " << write_kv_elapsed * 1.0 / count << std::endl; + std::cout << "\tWrite Clean Time(ms) : " << write_clean_elapsed * 1.0 / count << std::endl; + std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; + std::cout << "temp_elased : " << temp_elapsed * 1.0 / count<< std::endl; + // std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl; + std::cout << "=====================================================\n"; + write_bytes_lim = write_bytes + 20 * 1024 * 1024; + std::fflush(stdout); + } + } }; Status DestroyDB(const std::string& name, diff --git a/fielddb/meta.cpp b/fielddb/meta.cpp index 13ee09d..11e1241 100644 --- a/fielddb/meta.cpp +++ b/fielddb/meta.cpp @@ -56,13 +56,14 @@ public: }; void MetaCleaner::Collect(WriteBatch &MetaBatch) { + if(MetaBatch.ApproximateSize() <= 12) return; CleanerHandler Handler; Handler.NeedClean = &NeedClean; MetaBatch.Iterate(&Handler); } void MetaCleaner::CleanMetaBatch(DB *metaDB) { - if(NeedClean.ApproximateSize() == 0) return; + if(NeedClean.ApproximateSize() <= 12) return; metaDB->Write(WriteOptions(), &NeedClean); } } \ No newline at end of file diff --git a/fielddb/request.cpp b/fielddb/request.cpp index 35524ee..d7757f9 100644 --- a/fielddb/request.cpp +++ b/fielddb/request.cpp @@ -1,5 +1,6 @@ #include "fielddb/request.h" #include +#include #include #include #include @@ -55,7 +56,10 @@ void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, batchKeySet.insert(*Key); } std::string val_str; - Status s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + Status s = Status::NotFound("test"); + uint64_t start_ = DB->env_->NowMicros(); + s = DB->kvDB_->Get(ReadOptions(), *Key, &val_str); + DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; FieldArray *oldFields; if (s.IsNotFound()){ oldFields = nullptr; @@ -335,8 +339,8 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu): //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 str_buf->push_back(key.ToString()); FieldArray *field = new FieldArray; - field = ParseValue(value.ToString(), field); - if (field == nullptr){ //batch中的value没有field + // field = ParseValue(value.ToString(), field); + if (field->empty()){ //batch中的value没有field fa_buf->push_back({{"",value.ToString()}}); } else { fa_buf->push_back(*field); @@ -383,18 +387,30 @@ void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, WriteBatch Sub_KVBatch,Sub_IndexBatch,Sub_MetaBatch; std::unordered_set Sub_batchKeySet; //由于batch是有顺序的,根据我们现在的一个key只处理最开始的算法,这里需要反向迭代 + uint64_t start_ = DB->env_->NowMicros(); for(auto subreq = sub_requests.rbegin(); subreq != sub_requests.rend(); subreq++ ) { + uint64_t start_sub = DB->env_->NowMicros(); (*subreq)->ConstructBatch(Sub_KVBatch, Sub_IndexBatch, Sub_MetaBatch, DB, Sub_batchKeySet); + DB->construct_BatchReq_perSub_elapsed += DB->env_->NowMicros() - start_sub; + DB->count_Batch_Sub ++; //所有的对于pendreq的调用传入的参数被改成了this->parent,因此,对于subrequests来说, //pendreq的传参为对应的Batchreq,因此,此处判断batchreq是否pending可以得到subreq是否有冲突 if(isPending()) { return; } } - KVBatch.Append(Sub_KVBatch); - IndexBatch.Append(Sub_IndexBatch); - MetaBatch.Append(Sub_MetaBatch); + DB->construct_BatchReq_Sub_elapsed += DB->env_->NowMicros() - start_; + if(Sub_KVBatch.ApproximateSize() > 12) { + KVBatch.Append(Sub_KVBatch); + } + if(Sub_IndexBatch.ApproximateSize() > 12) { + IndexBatch.Append(Sub_IndexBatch); + } + if(Sub_MetaBatch.ApproximateSize() > 12) { + MetaBatch.Append(Sub_MetaBatch); + } batchKeySet.insert(Sub_batchKeySet.begin(),Sub_batchKeySet.end()); + DB->construct_BatchReq_elapsed += DB->env_->NowMicros() - start_; } diff --git a/include/leveldb/env.h b/include/leveldb/env.h index e00895a..c165487 100644 --- a/include/leveldb/env.h +++ b/include/leveldb/env.h @@ -218,6 +218,8 @@ class LEVELDB_EXPORT Env { virtual void SleepForMicroseconds(int micros) = 0; }; +Env* getPosixEnv(); + // A file abstraction for reading sequentially through a file class LEVELDB_EXPORT SequentialFile { public: diff --git a/testdb/testdb.cc b/testdb/testdb.cc new file mode 100644 index 0000000..6baa209 --- /dev/null +++ b/testdb/testdb.cc @@ -0,0 +1,111 @@ +#include "testdb/testdb.h" +#include "db/db_impl.h" +#include +#include "leveldb/status.h" +using namespace testdb; + +Status testDB::OpentestDB(Options& options, + const std::string& name, testDB** dbptr) { + // options.env->CreateDir("./abc") + if(*dbptr == nullptr){ + return Status::NotSupported(name, "new a testDb first\n"); + } + + // + Status status; + DB *indexdb, *kvdb, *metadb; + // options.block_cache = NewLRUCache(ULONG_MAX); + // options.max_open_files = 1000; + // options.write_buffer_size = 512 * 1024 * 1024; + // options.env = getPosixEnv(); + // status = Open(options, name+"_indexDB", &indexdb); + // if(!status.ok()) return status; + // (*dbptr)->indexDB_ = indexdb; + + // options.env = getPosixEnv(); + status = DB::Open(options, name+"_kvDB", &kvdb); + if(!status.ok()) return status; + (*dbptr)->kvDB_ = kvdb; + + // options.env = getPosixEnv(); + // status = Open(options, name+"_metaDB", &metadb); + // if(!status.ok()) return status; + // (*dbptr)->metaDB_ = metadb; + + (*dbptr)->dbname_ = name; + + // status = (*dbptr)->Recover(); + + (*dbptr)->options_ = &options; + (*dbptr)->env_ = options.env; + return status; +} + +Status testDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) { + return kvDB_->Put(options, key, value); +} + +Status testDB::PutFields(const WriteOptions &, const Slice &key, const FieldArray &tests) { + return Status::OK(); +} + +Status testDB::Delete(const WriteOptions &options, const Slice &key) { + return kvDB_->Delete(options, key); +} + +Status testDB::Write(const WriteOptions &options, WriteBatch *updates) { + return kvDB_->Write(options, updates); +} + +Status testDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { + return kvDB_->Get(options, key, value); +} + +Status testDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *tests) { + return Status::OK(); +} + +std::vector testDB::FindKeysByField(Field &test) { + return std::vector(); +} + +Iterator * testDB::NewIterator(const ReadOptions &options) { + return kvDB_->NewIterator(options); +} + +const Snapshot * testDB::GetSnapshot() { + return kvDB_->GetSnapshot(); +} + +void testDB::ReleaseSnapshot(const Snapshot *snapshot) { + kvDB_->ReleaseSnapshot(snapshot); +} + +bool testDB::GetProperty(const Slice &property, std::string *value) { + return kvDB_->GetProperty(property, value); +} + +void testDB::GetApproximateSizes(const Range *range, int n, uint64_t *sizes) { + kvDB_->GetApproximateSizes(range, n, sizes); +} + +void testDB::CompactRange(const Slice *begin, const Slice *end) { + kvDB_->CompactRange(begin, end); +} + +Status DestroyDB(const std::string& name, const Options& options) { + Status s; + s = leveldb::DestroyDB(name+"_kvDB", options); + assert(s.ok()); +// s = leveldb::DestroyDB(name+"_indexDB", options); +// assert(s.ok()); +// s = leveldb::DestroyDB(name+"_metaDB", options); +// assert(s.ok()); + return s; +} + +testDB::~testDB() { + delete kvDB_; + // delete indexDB_; + // delete metaDB_; +} \ No newline at end of file diff --git a/testdb/testdb.h b/testdb/testdb.h new file mode 100644 index 0000000..d51598b --- /dev/null +++ b/testdb/testdb.h @@ -0,0 +1,72 @@ +#include "port/port_stdcxx.h" +#include "db/db_impl.h" +#include +#include +#include +#include +#include +#include +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/options.h" +#include "leveldb/slice.h" +#include "leveldb/status.h" +#include +# ifndef test_DB_H +# define test_DB_H +namespace testdb { +using namespace leveldb; + +enum IndexStatus{ + Creating, + Deleting, + Exist, + NotExist + }; + +class testDB { +private: + leveldb::DB *kvDB_; + // leveldb::DB *metaDB_; + // leveldb::DB *indexDB_; + + std::string dbname_; + const Options *options_; + Env *env_; +public: + friend class Request; + friend class testsReq; + friend class iCreateReq; + friend class iDeleteReq; + friend class DeleteReq; + friend class BatchReq; + + //用的时候必须testDB *db = new testDB()再open,不能像之前一样DB *db + // testDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; + testDB() : kvDB_(nullptr) { } + ~testDB(); +/*lab1的要求,作为db派生类要实现的虚函数*/ + Status Put(const WriteOptions &options, const Slice &key, const Slice &value) ; + Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &tests) ; + Status Delete(const WriteOptions &options, const Slice &key) ; + Status Write(const WriteOptions &options, WriteBatch *updates) ; + Status Get(const ReadOptions &options, const Slice &key, std::string *value) ; + Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *tests) ; + std::vector FindKeysByField(Field &test) ; + Iterator * NewIterator(const ReadOptions &options) ; + const Snapshot * GetSnapshot() ; + void ReleaseSnapshot(const Snapshot *snapshot) ; + bool GetProperty(const Slice &property, std::string *value) ; + void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) ; + void CompactRange(const Slice *begin, const Slice *end) ; + + static Status OpentestDB(Options& options,const std::string& name,testDB** dbptr); + + + +}; + +Status DestroyDB(const std::string& name, + const Options& options); +} // end of namespace +# endif \ No newline at end of file diff --git a/util/env_posix.cc b/util/env_posix.cc index ffd06c4..ec18875 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -923,4 +923,8 @@ Env* Env::Default() { return env_container.env(); } +Env* getPosixEnv() { + return new PosixEnv; +} + } // namespace leveldb diff --git a/util/serialize_value.cc b/util/serialize_value.cc index 88aa844..73fb092 100644 --- a/util/serialize_value.cc +++ b/util/serialize_value.cc @@ -35,7 +35,7 @@ FieldArray *ParseValue(const std::string& value_str,FieldArray *fields){ valStr = valSlice.ToString(); res->emplace_back(nameStr, valStr); } else { - std::cout << "name and val not match!" << std::endl; + std::cout << "name and val not match! From ParseValue" << std::endl; } nameSlice.clear(); valSlice.clear(); diff --git a/util/serialize_value.h b/util/serialize_value.h index a337bc6..2405773 100644 --- a/util/serialize_value.h +++ b/util/serialize_value.h @@ -31,7 +31,7 @@ public: if(GetLengthPrefixedSlice(&valueSlice, &valSlice)) { map[nameSlice.ToString()] = valSlice.ToString(); } else { - std::cout << "name and val not match!" << std::endl; + std::cout << "name and val not match! From InternalFieldArray" << std::endl; } nameSlice.clear(); valSlice.clear();