From a2fb086d07b7dbd9c4a59fe57646bd465841edd5 Mon Sep 17 00:00:00 2001
From: corrado <corrado@google.com>
Date: Tue, 27 Sep 2016 04:50:38 -0700
Subject: [PATCH] Add option for max file size. The current hard-coded value of
 2MB is inefficient on Colossus.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=134391640
---
 db/db_bench.cc            | 38 +++++++++++++++++++++++--------
 db/db_impl.cc             |  1 +
 db/version_set.cc         | 57 +++++++++++++++++++++++++++++------------------
 db/version_set.h          |  2 +-
 include/leveldb/options.h | 12 ++++++++++
 util/options.cc           |  1 +
 6 files changed, 79 insertions(+), 32 deletions(-)

diff --git a/db/db_bench.cc b/db/db_bench.cc
index 7a0f5e0..3ad19a5 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -84,6 +84,14 @@ static bool FLAGS_histogram = false;
 // (initialized to default value by "main")
 static int FLAGS_write_buffer_size = 0;
 
+// Number of bytes written to each file.
+// (initialized to default value by "main")
+static int FLAGS_max_file_size = 0;
+
+// Approximate size of user data packed per block (before compression).
+// (initialized to default value by "main")
+static int FLAGS_block_size = 0;
+
 // Number of bytes to use as a cache of uncompressed data.
 // Negative means use default settings.
 static int FLAGS_cache_size = -1;
@@ -109,6 +117,7 @@ static const char* FLAGS_db = NULL;
 namespace leveldb {
 
 namespace {
+leveldb::Env* g_env = NULL;
 
 // Helper for quickly generating random data.
 class RandomGenerator {
@@ -186,7 +195,7 @@ class Stats {
     done_ = 0;
     bytes_ = 0;
     seconds_ = 0;
-    start_ = Env::Default()->NowMicros();
+    start_ = g_env->NowMicros();
     finish_ = start_;
     message_.clear();
   }
@@ -204,7 +213,7 @@ class Stats {
   }
 
   void Stop() {
-    finish_ = Env::Default()->NowMicros();
+    finish_ = g_env->NowMicros();
     seconds_ = (finish_ - start_) * 1e-6;
   }
 
@@ -214,7 +223,7 @@ class Stats {
 
   void FinishedSingleOp() {
     if (FLAGS_histogram) {
-      double now = Env::Default()->NowMicros();
+      double now = g_env->NowMicros();
       double micros = now - last_op_finish_;
       hist_.Add(micros);
       if (micros > 20000) {
@@ -404,10 +413,10 @@ class Benchmark {
     reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
     heap_counter_(0) {
     std::vector<std::string> files;
-    Env::Default()->GetChildren(FLAGS_db, &files);
+    g_env->GetChildren(FLAGS_db, &files);
     for (size_t i = 0; i < files.size(); i++) {
       if (Slice(files[i]).starts_with("heap-")) {
-        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
+        g_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
       }
     }
     if (!FLAGS_use_existing_db) {
@@ -589,7 +598,7 @@ class Benchmark {
       arg[i].shared = &shared;
       arg[i].thread = new ThreadState(i);
       arg[i].thread->shared = &shared;
-      Env::Default()->StartThread(ThreadBody, &arg[i]);
+      g_env->StartThread(ThreadBody, &arg[i]);
     }
 
     shared.mu.Lock();
@@ -700,9 +709,12 @@ class Benchmark {
   void Open() {
     assert(db_ == NULL);
     Options options;
+    options.env = g_env;
     options.create_if_missing = !FLAGS_use_existing_db;
     options.block_cache = cache_;
     options.write_buffer_size = FLAGS_write_buffer_size;
+    options.max_file_size = FLAGS_max_file_size;
+    options.block_size = FLAGS_block_size;
     options.max_open_files = FLAGS_open_files;
     options.filter_policy = filter_policy_;
     options.reuse_logs = FLAGS_reuse_logs;
@@ -925,7 +937,7 @@ class Benchmark {
     char fname[100];
     snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
     WritableFile* file;
-    Status s = Env::Default()->NewWritableFile(fname, &file);
+    Status s = g_env->NewWritableFile(fname, &file);
     if (!s.ok()) {
       fprintf(stderr, "%s\n", s.ToString().c_str());
       return;
@@ -934,7 +946,7 @@ class Benchmark {
     delete file;
     if (!ok) {
       fprintf(stderr, "heap profiling not supported\n");
-      Env::Default()->DeleteFile(fname);
+      g_env->DeleteFile(fname);
     }
   }
 };
@@ -943,6 +955,8 @@ class Benchmark {
 
 int main(int argc, char** argv) {
   FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
+  FLAGS_max_file_size = leveldb::Options().max_file_size;
+  FLAGS_block_size = leveldb::Options().block_size;
   FLAGS_open_files = leveldb::Options().max_open_files;
   std::string default_db_path;
 
@@ -973,6 +987,10 @@ int main(int argc, char** argv) {
       FLAGS_value_size = n;
     } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
       FLAGS_write_buffer_size = n;
+    } else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
+      FLAGS_max_file_size = n;
+    } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
+      FLAGS_block_size = n;
     } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
       FLAGS_cache_size = n;
     } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
@@ -987,9 +1005,11 @@ int main(int argc, char** argv) {
     }
   }
 
+  leveldb::g_env = leveldb::Env::Default();
+
   // Choose a location for the test database if none given with --db=<path>
   if (FLAGS_db == NULL) {
-      leveldb::Env::Default()->GetTestDirectory(&default_db_path);
+      leveldb::g_env->GetTestDirectory(&default_db_path);
       default_db_path += "/dbbench";
       FLAGS_db = default_db_path.c_str();
   }
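
With the new flags wired through to Options, a run against a non-default
file size looks like this (the values are illustrative):

    ./db_bench --max_file_size=16777216 --block_size=8192 \
        --benchmarks=fillseq,readrandom

Both flags start at 0 and are reset to the leveldb::Options() defaults at
the top of main(), so omitting them preserves the previous behavior.
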
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 60f4e66..f43ad76 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -96,6 +96,7 @@ Options SanitizeOptions(const std::string& dbname,
   result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
   ClipToRange(&result.max_open_files,    64 + kNumNonTableCacheFiles, 50000);
   ClipToRange(&result.write_buffer_size, 64<<10,                      1<<30);
+  ClipToRange(&result.max_file_size,     1<<20,                       1<<30);
   ClipToRange(&result.block_size,        1<<10,                       4<<20);
   if (result.info_log == NULL) {
     // Open a log file in the same directory as the db
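
SanitizeOptions now clamps max_file_size into [1MB, 1GB] alongside the
existing limits. For context, ClipToRange is the small helper defined
earlier in db_impl.cc; it behaves roughly like this sketch:

    template <class T, class V>
    static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
      // Clamp *ptr into the inclusive range [minvalue, maxvalue].
      if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
      if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
    }
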
diff --git a/db/version_set.cc b/db/version_set.cc
index a5e0f77..b1256f9 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -20,21 +20,29 @@
 
 namespace leveldb {
 
-static const int kTargetFileSize = 2 * 1048576;
+static int TargetFileSize(const Options* options) {
+  return options->max_file_size;
+}
 
 // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
 // stop building a single file in a level->level+1 compaction.
-static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
+static int64_t MaxGrandParentOverlapBytes(const Options* options) {
+  return 10 * TargetFileSize(options);
+}
 
 // Maximum number of bytes in all compacted files.  We avoid expanding
 // the lower level file set of a compaction if it would make the
 // total compaction cover more than this many bytes.
-static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
+static int64_t ExpandedCompactionByteSizeLimit(const Options* options) {
+  return 25 * TargetFileSize(options);
+}
 
-static double MaxBytesForLevel(int level) {
+static double MaxBytesForLevel(const Options* options, int level) {
   // Note: the result for level zero is not really used since we set
   // the level-0 compaction threshold based on number of files.
-  double result = 10 * 1048576.0;  // Result for both level-0 and level-1
+
+  // Result for both level-0 and level-1
+  double result = 10.0 * 1048576.0;
   while (level > 1) {
     result *= 10;
     level--;
@@ -42,8 +50,9 @@ static double MaxBytesForLevel(int level) {
   return result;
 }
 
-static uint64_t MaxFileSizeForLevel(int level) {
-  return kTargetFileSize;  // We could vary per level to reduce number of files?
+static uint64_t MaxFileSizeForLevel(const Options* options, int level) {
+  // We could vary per level to reduce number of files?
+  return TargetFileSize(options);
 }
 
 static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
@@ -508,7 +517,7 @@ int Version::PickLevelForMemTableOutput(
         // Check that file does not overlap too many grandparent bytes.
         GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
         const int64_t sum = TotalFileSize(overlaps);
-        if (sum > kMaxGrandParentOverlapBytes) {
+        if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
           break;
         }
       }
@@ -1027,7 +1036,7 @@ bool VersionSet::ReuseManifest(const std::string& dscname,
       manifest_type != kDescriptorFile ||
       !env_->GetFileSize(dscname, &manifest_size).ok() ||
       // Make new compacted MANIFEST if old one is too big
-      manifest_size >= kTargetFileSize) {
+      manifest_size >= TargetFileSize(options_)) {
     return false;
   }
 
@@ -1076,7 +1085,8 @@ void VersionSet::Finalize(Version* v) {
     } else {
       // Compute the ratio of current size to size limit.
       const uint64_t level_bytes = TotalFileSize(v->files_[level]);
-      score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
+      score =
+          static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
     }
 
     if (score > best_score) {
@@ -1290,7 +1300,7 @@ Compaction* VersionSet::PickCompaction() {
     level = current_->compaction_level_;
     assert(level >= 0);
     assert(level+1 < config::kNumLevels);
-    c = new Compaction(level);
+    c = new Compaction(options_, level);
 
     // Pick the first file that comes after compact_pointer_[level]
     for (size_t i = 0; i < current_->files_[level].size(); i++) {
@@ -1307,7 +1317,7 @@ Compaction* VersionSet::PickCompaction() {
     }
   } else if (seek_compaction) {
     level = current_->file_to_compact_level_;
-    c = new Compaction(level);
+    c = new Compaction(options_, level);
     c->inputs_[0].push_back(current_->file_to_compact_);
   } else {
     return NULL;
@@ -1352,7 +1362,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
     const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
     const int64_t expanded0_size = TotalFileSize(expanded0);
     if (expanded0.size() > c->inputs_[0].size() &&
-        inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
+        inputs1_size + expanded0_size <
+            ExpandedCompactionByteSizeLimit(options_)) {
       InternalKey new_start, new_limit;
       GetRange(expanded0, &new_start, &new_limit);
       std::vector<FileMetaData*> expanded1;
@@ -1414,7 +1425,7 @@ Compaction* VersionSet::CompactRange(
   // and we must not pick one file and drop another older file if the
   // two files overlap.
   if (level > 0) {
-    const uint64_t limit = MaxFileSizeForLevel(level);
+    const uint64_t limit = MaxFileSizeForLevel(options_, level);
     uint64_t total = 0;
     for (size_t i = 0; i < inputs.size(); i++) {
       uint64_t s = inputs[i]->file_size;
@@ -1426,7 +1437,7 @@ Compaction* VersionSet::CompactRange(
     }
   }
 
-  Compaction* c = new Compaction(level);
+  Compaction* c = new Compaction(options_, level);
   c->input_version_ = current_;
   c->input_version_->Ref();
   c->inputs_[0] = inputs;
@@ -1434,9 +1445,9 @@ Compaction* VersionSet::CompactRange(
   return c;
 }
 
-Compaction::Compaction(int level)
+Compaction::Compaction(const Options* options, int level)
     : level_(level),
-      max_output_file_size_(MaxFileSizeForLevel(level)),
+      max_output_file_size_(MaxFileSizeForLevel(options, level)),
       input_version_(NULL),
       grandparent_index_(0),
       seen_key_(false),
@@ -1453,12 +1464,13 @@ Compaction::~Compaction() {
 }
 
 bool Compaction::IsTrivialMove() const {
+  const VersionSet* vset = input_version_->vset_;
   // Avoid a move if there is lots of overlapping grandparent data.
   // Otherwise, the move could create a parent file that will require
   // a very expensive merge later on.
-  return (num_input_files(0) == 1 &&
-          num_input_files(1) == 0 &&
-          TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
+  return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
+          TotalFileSize(grandparents_) <=
+              MaxGrandParentOverlapBytes(vset->options_));
 }
 
 void Compaction::AddInputDeletions(VersionEdit* edit) {
@@ -1491,8 +1503,9 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
 }
 
 bool Compaction::ShouldStopBefore(const Slice& internal_key) {
+  const VersionSet* vset = input_version_->vset_;
   // Scan to find earliest grandparent file that contains key.
-  const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
+  const InternalKeyComparator* icmp = &vset->icmp_;
   while (grandparent_index_ < grandparents_.size() &&
       icmp->Compare(internal_key,
                     grandparents_[grandparent_index_]->largest.Encode()) > 0) {
@@ -1503,7 +1516,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
   }
   seen_key_ = true;
 
-  if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
+  if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
     // Too much overlap for current output; start new output
     overlapped_bytes_ = 0;
     return true;
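
The compaction limits that were compile-time constants now scale with the
configured file size. With the 2MB default nothing changes:

    MaxGrandParentOverlapBytes      = 10 * 2MB = 20MB
    ExpandedCompactionByteSizeLimit = 25 * 2MB = 50MB

while a 64MB max_file_size raises them to 640MB and 1.6GB, the "longer
compactions" trade-off noted in the options.h comment below. One caveat:
TargetFileSize returns int, so near the 1GB clamp the 10x/25x products are
computed in int and can overflow before widening to int64_t; casting the
operand to int64_t first would avoid that.
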
diff --git a/db/version_set.h b/db/version_set.h
index 1dec745..c4e7ac3 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -366,7 +366,7 @@ class Compaction {
   friend class Version;
   friend class VersionSet;
 
-  explicit Compaction(int level);
+  Compaction(const Options* options, int level);
 
   int level_;
   uint64_t max_output_file_size_;
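
(The explicit keyword is dropped because the constructor now takes two
arguments, so there is no single-argument implicit conversion left to
guard against.)
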
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 83a1ef3..976e381 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -112,6 +112,18 @@ struct Options {
   // Default: 16
   int block_restart_interval;
 
+  // Leveldb will write up to this many bytes to a file before
+  // switching to a new one.
+  // Most clients should leave this parameter alone.  However, if your
+  // filesystem is more efficient with larger files, you could
+  // consider increasing the value.  The downside will be longer
+  // compactions and hence longer latency/performance hiccups.
+  // Another reason to increase this parameter might be when you are
+  // initially populating a large database.
+  //
+  // Default: 2MB
+  size_t max_file_size;
+
   // Compress blocks using the specified compression algorithm.  This
   // parameter can be changed dynamically.
   //
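
A minimal caller-side sketch of the new option (path and size are
illustrative):

    #include <assert.h>
    #include "leveldb/db.h"

    int main() {
      leveldb::Options options;
      options.create_if_missing = true;
      // Ask for 64MB table files; SanitizeOptions clamps to [1MB, 1GB].
      options.max_file_size = 64 * 1048576;
      leveldb::DB* db = NULL;
      leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);
      assert(s.ok());
      delete db;
      return 0;
    }
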
diff --git a/util/options.cc b/util/options.cc
index 8b618fb..b5e6227 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -21,6 +21,7 @@ Options::Options()
       block_cache(NULL),
       block_size(4096),
       block_restart_interval(16),
+      max_file_size(2<<20),
       compression(kSnappyCompression),
       reuse_logs(false),
       filter_policy(NULL) {
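
The new default, 2<<20 = 2097152 bytes, matches the 2MB that the old
kTargetFileSize constant (2 * 1048576) hard-coded, so databases that leave
the option untouched see identical file-size behavior.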