Parcourir la source

Add option for max file size. The currend hard-coded value of 2M is inefficient in colossus.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=134391640
main
corrado il y a 8 ans
committed by Chris Mumford
Parent
révision
a2fb086d07
6 fichiers modifiés avec 79 ajouts et 32 suppressions
  1. +29
    -9
      db/db_bench.cc
  2. +1
    -0
      db/db_impl.cc
  3. +35
    -22
      db/version_set.cc
  4. +1
    -1
      db/version_set.h
  5. +12
    -0
      include/leveldb/options.h
  6. +1
    -0
      util/options.cc

+ 29
- 9
db/db_bench.cc Voir le fichier

@ -84,6 +84,14 @@ static bool FLAGS_histogram = false;
// (initialized to default value by "main") // (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0; static int FLAGS_write_buffer_size = 0;
// Number of bytes written to each file.
// (initialized to default value by "main")
static int FLAGS_max_file_size = 0;
// Approximate size of user data packed per block (before compression.
// (initialized to default value by "main")
static int FLAGS_block_size = 0;
// Number of bytes to use as a cache of uncompressed data. // Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings. // Negative means use default settings.
static int FLAGS_cache_size = -1; static int FLAGS_cache_size = -1;
@ -109,6 +117,7 @@ static const char* FLAGS_db = NULL;
namespace leveldb { namespace leveldb {
namespace { namespace {
leveldb::Env* g_env = NULL;
// Helper for quickly generating random data. // Helper for quickly generating random data.
class RandomGenerator { class RandomGenerator {
@ -186,7 +195,7 @@ class Stats {
done_ = 0; done_ = 0;
bytes_ = 0; bytes_ = 0;
seconds_ = 0; seconds_ = 0;
start_ = Env::Default()->NowMicros();
start_ = g_env->NowMicros();
finish_ = start_; finish_ = start_;
message_.clear(); message_.clear();
} }
@ -204,7 +213,7 @@ class Stats {
} }
void Stop() { void Stop() {
finish_ = Env::Default()->NowMicros();
finish_ = g_env->NowMicros();
seconds_ = (finish_ - start_) * 1e-6; seconds_ = (finish_ - start_) * 1e-6;
} }
@ -214,7 +223,7 @@ class Stats {
void FinishedSingleOp() { void FinishedSingleOp() {
if (FLAGS_histogram) { if (FLAGS_histogram) {
double now = Env::Default()->NowMicros();
double now = g_env->NowMicros();
double micros = now - last_op_finish_; double micros = now - last_op_finish_;
hist_.Add(micros); hist_.Add(micros);
if (micros > 20000) { if (micros > 20000) {
@ -404,10 +413,10 @@ class Benchmark {
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
heap_counter_(0) { heap_counter_(0) {
std::vector<std::string> files; std::vector<std::string> files;
Env::Default()->GetChildren(FLAGS_db, &files);
g_env->GetChildren(FLAGS_db, &files);
for (size_t i = 0; i < files.size(); i++) { for (size_t i = 0; i < files.size(); i++) {
if (Slice(files[i]).starts_with("heap-")) { if (Slice(files[i]).starts_with("heap-")) {
Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
g_env->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
} }
} }
if (!FLAGS_use_existing_db) { if (!FLAGS_use_existing_db) {
@ -589,7 +598,7 @@ class Benchmark {
arg[i].shared = &shared; arg[i].shared = &shared;
arg[i].thread = new ThreadState(i); arg[i].thread = new ThreadState(i);
arg[i].thread->shared = &shared; arg[i].thread->shared = &shared;
Env::Default()->StartThread(ThreadBody, &arg[i]);
g_env->StartThread(ThreadBody, &arg[i]);
} }
shared.mu.Lock(); shared.mu.Lock();
@ -700,9 +709,12 @@ class Benchmark {
void Open() { void Open() {
assert(db_ == NULL); assert(db_ == NULL);
Options options; Options options;
options.env = g_env;
options.create_if_missing = !FLAGS_use_existing_db; options.create_if_missing = !FLAGS_use_existing_db;
options.block_cache = cache_; options.block_cache = cache_;
options.write_buffer_size = FLAGS_write_buffer_size; options.write_buffer_size = FLAGS_write_buffer_size;
options.max_file_size = FLAGS_max_file_size;
options.block_size = FLAGS_block_size;
options.max_open_files = FLAGS_open_files; options.max_open_files = FLAGS_open_files;
options.filter_policy = filter_policy_; options.filter_policy = filter_policy_;
options.reuse_logs = FLAGS_reuse_logs; options.reuse_logs = FLAGS_reuse_logs;
@ -925,7 +937,7 @@ class Benchmark {
char fname[100]; char fname[100];
snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_); snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
WritableFile* file; WritableFile* file;
Status s = Env::Default()->NewWritableFile(fname, &file);
Status s = g_env->NewWritableFile(fname, &file);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str()); fprintf(stderr, "%s\n", s.ToString().c_str());
return; return;
@ -934,7 +946,7 @@ class Benchmark {
delete file; delete file;
if (!ok) { if (!ok) {
fprintf(stderr, "heap profiling not supported\n"); fprintf(stderr, "heap profiling not supported\n");
Env::Default()->DeleteFile(fname);
g_env->DeleteFile(fname);
} }
} }
}; };
@ -943,6 +955,8 @@ class Benchmark {
int main(int argc, char** argv) { int main(int argc, char** argv) {
FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
FLAGS_max_file_size = leveldb::Options().max_file_size;
FLAGS_block_size = leveldb::Options().block_size;
FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_open_files = leveldb::Options().max_open_files;
std::string default_db_path; std::string default_db_path;
@ -973,6 +987,10 @@ int main(int argc, char** argv) {
FLAGS_value_size = n; FLAGS_value_size = n;
} else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
FLAGS_write_buffer_size = n; FLAGS_write_buffer_size = n;
} else if (sscanf(argv[i], "--max_file_size=%d%c", &n, &junk) == 1) {
FLAGS_max_file_size = n;
} else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
FLAGS_block_size = n;
} else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
FLAGS_cache_size = n; FLAGS_cache_size = n;
} else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) {
@ -987,9 +1005,11 @@ int main(int argc, char** argv) {
} }
} }
leveldb::g_env = leveldb::Env::Default();
// Choose a location for the test database if none given with --db=<path> // Choose a location for the test database if none given with --db=<path>
if (FLAGS_db == NULL) { if (FLAGS_db == NULL) {
leveldb::Env::Default()->GetTestDirectory(&default_db_path);
leveldb::g_env->GetTestDirectory(&default_db_path);
default_db_path += "/dbbench"; default_db_path += "/dbbench";
FLAGS_db = default_db_path.c_str(); FLAGS_db = default_db_path.c_str();
} }

+ 1
- 0
db/db_impl.cc Voir le fichier

@ -96,6 +96,7 @@ Options SanitizeOptions(const std::string& dbname,
result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL;
ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); ClipToRange(&result.write_buffer_size, 64<<10, 1<<30);
ClipToRange(&result.max_file_size, 1<<20, 1<<30);
ClipToRange(&result.block_size, 1<<10, 4<<20); ClipToRange(&result.block_size, 1<<10, 4<<20);
if (result.info_log == NULL) { if (result.info_log == NULL) {
// Open a log file in the same directory as the db // Open a log file in the same directory as the db

+ 35
- 22
db/version_set.cc Voir le fichier

@ -20,21 +20,29 @@
namespace leveldb { namespace leveldb {
static const int kTargetFileSize = 2 * 1048576;
static int TargetFileSize(const Options* options) {
return options->max_file_size;
}
// Maximum bytes of overlaps in grandparent (i.e., level+2) before we // Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction. // stop building a single file in a level->level+1 compaction.
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
static int64_t MaxGrandParentOverlapBytes(const Options* options) {
return 10 * TargetFileSize(options);
}
// Maximum number of bytes in all compacted files. We avoid expanding // Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the // the lower level file set of a compaction if it would make the
// total compaction cover more than this many bytes. // total compaction cover more than this many bytes.
static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
static int64_t ExpandedCompactionByteSizeLimit(const Options* options) {
return 25 * TargetFileSize(options);
}
static double MaxBytesForLevel(int level) {
static double MaxBytesForLevel(const Options* options, int level) {
// Note: the result for level zero is not really used since we set // Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files. // the level-0 compaction threshold based on number of files.
double result = 10 * 1048576.0; // Result for both level-0 and level-1
// Result for both level-0 and level-1
double result = 10. * 1048576.0;
while (level > 1) { while (level > 1) {
result *= 10; result *= 10;
level--; level--;
@ -42,8 +50,9 @@ static double MaxBytesForLevel(int level) {
return result; return result;
} }
static uint64_t MaxFileSizeForLevel(int level) {
return kTargetFileSize; // We could vary per level to reduce number of files?
static uint64_t MaxFileSizeForLevel(const Options* options, int level) {
// We could vary per level to reduce number of files?
return TargetFileSize(options);
} }
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) { static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
@ -508,7 +517,7 @@ int Version::PickLevelForMemTableOutput(
// Check that file does not overlap too many grandparent bytes. // Check that file does not overlap too many grandparent bytes.
GetOverlappingInputs(level + 2, &start, &limit, &overlaps); GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps); const int64_t sum = TotalFileSize(overlaps);
if (sum > kMaxGrandParentOverlapBytes) {
if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
break; break;
} }
} }
@ -1027,7 +1036,7 @@ bool VersionSet::ReuseManifest(const std::string& dscname,
manifest_type != kDescriptorFile || manifest_type != kDescriptorFile ||
!env_->GetFileSize(dscname, &manifest_size).ok() || !env_->GetFileSize(dscname, &manifest_size).ok() ||
// Make new compacted MANIFEST if old one is too big // Make new compacted MANIFEST if old one is too big
manifest_size >= kTargetFileSize) {
manifest_size >= TargetFileSize(options_)) {
return false; return false;
} }
@ -1076,7 +1085,8 @@ void VersionSet::Finalize(Version* v) {
} else { } else {
// Compute the ratio of current size to size limit. // Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]); const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
} }
if (score > best_score) { if (score > best_score) {
@ -1290,7 +1300,7 @@ Compaction* VersionSet::PickCompaction() {
level = current_->compaction_level_; level = current_->compaction_level_;
assert(level >= 0); assert(level >= 0);
assert(level+1 < config::kNumLevels); assert(level+1 < config::kNumLevels);
c = new Compaction(level);
c = new Compaction(options_, level);
// Pick the first file that comes after compact_pointer_[level] // Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) { for (size_t i = 0; i < current_->files_[level].size(); i++) {
@ -1307,7 +1317,7 @@ Compaction* VersionSet::PickCompaction() {
} }
} else if (seek_compaction) { } else if (seek_compaction) {
level = current_->file_to_compact_level_; level = current_->file_to_compact_level_;
c = new Compaction(level);
c = new Compaction(options_, level);
c->inputs_[0].push_back(current_->file_to_compact_); c->inputs_[0].push_back(current_->file_to_compact_);
} else { } else {
return NULL; return NULL;
@ -1352,7 +1362,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]); const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0); const int64_t expanded0_size = TotalFileSize(expanded0);
if (expanded0.size() > c->inputs_[0].size() && if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
inputs1_size + expanded0_size <
ExpandedCompactionByteSizeLimit(options_)) {
InternalKey new_start, new_limit; InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit); GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1; std::vector<FileMetaData*> expanded1;
@ -1414,7 +1425,7 @@ Compaction* VersionSet::CompactRange(
// and we must not pick one file and drop another older file if the // and we must not pick one file and drop another older file if the
// two files overlap. // two files overlap.
if (level > 0) { if (level > 0) {
const uint64_t limit = MaxFileSizeForLevel(level);
const uint64_t limit = MaxFileSizeForLevel(options_, level);
uint64_t total = 0; uint64_t total = 0;
for (size_t i = 0; i < inputs.size(); i++) { for (size_t i = 0; i < inputs.size(); i++) {
uint64_t s = inputs[i]->file_size; uint64_t s = inputs[i]->file_size;
@ -1426,7 +1437,7 @@ Compaction* VersionSet::CompactRange(
} }
} }
Compaction* c = new Compaction(level);
Compaction* c = new Compaction(options_, level);
c->input_version_ = current_; c->input_version_ = current_;
c->input_version_->Ref(); c->input_version_->Ref();
c->inputs_[0] = inputs; c->inputs_[0] = inputs;
@ -1434,9 +1445,9 @@ Compaction* VersionSet::CompactRange(
return c; return c;
} }
Compaction::Compaction(int level)
Compaction::Compaction(const Options* options, int level)
: level_(level), : level_(level),
max_output_file_size_(MaxFileSizeForLevel(level)),
max_output_file_size_(MaxFileSizeForLevel(options, level)),
input_version_(NULL), input_version_(NULL),
grandparent_index_(0), grandparent_index_(0),
seen_key_(false), seen_key_(false),
@ -1453,12 +1464,13 @@ Compaction::~Compaction() {
} }
bool Compaction::IsTrivialMove() const { bool Compaction::IsTrivialMove() const {
const VersionSet* vset = input_version_->vset_;
// Avoid a move if there is lots of overlapping grandparent data. // Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require // Otherwise, the move could create a parent file that will require
// a very expensive merge later on. // a very expensive merge later on.
return (num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
return (num_input_files(0) == 1 && num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <=
MaxGrandParentOverlapBytes(vset->options_));
} }
void Compaction::AddInputDeletions(VersionEdit* edit) { void Compaction::AddInputDeletions(VersionEdit* edit) {
@ -1491,8 +1503,9 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
} }
bool Compaction::ShouldStopBefore(const Slice& internal_key) { bool Compaction::ShouldStopBefore(const Slice& internal_key) {
const VersionSet* vset = input_version_->vset_;
// Scan to find earliest grandparent file that contains key. // Scan to find earliest grandparent file that contains key.
const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
const InternalKeyComparator* icmp = &vset->icmp_;
while (grandparent_index_ < grandparents_.size() && while (grandparent_index_ < grandparents_.size() &&
icmp->Compare(internal_key, icmp->Compare(internal_key,
grandparents_[grandparent_index_]->largest.Encode()) > 0) { grandparents_[grandparent_index_]->largest.Encode()) > 0) {
@ -1503,7 +1516,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
} }
seen_key_ = true; seen_key_ = true;
if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
if (overlapped_bytes_ > MaxGrandParentOverlapBytes(vset->options_)) {
// Too much overlap for current output; start new output // Too much overlap for current output; start new output
overlapped_bytes_ = 0; overlapped_bytes_ = 0;
return true; return true;

+ 1
- 1
db/version_set.h Voir le fichier

@ -366,7 +366,7 @@ class Compaction {
friend class Version; friend class Version;
friend class VersionSet; friend class VersionSet;
explicit Compaction(int level);
Compaction(const Options* options, int level);
int level_; int level_;
uint64_t max_output_file_size_; uint64_t max_output_file_size_;

+ 12
- 0
include/leveldb/options.h Voir le fichier

@ -112,6 +112,18 @@ struct Options {
// Default: 16 // Default: 16
int block_restart_interval; int block_restart_interval;
// Leveldb will write up to this amount of bytes to a file before
// switching to a new one.
// Most clients should leave this parameter alone. However if your
// filesystem is more efficient with larger files, you could
// consider increasing the value. The downside will be longer
// compactions and hence longer latency/performance hiccups.
// Another reason to increase this parameter might be when you are
// initially populating a large database.
//
// Default: 2MB
size_t max_file_size;
// Compress blocks using the specified compression algorithm. This // Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically. // parameter can be changed dynamically.
// //

+ 1
- 0
util/options.cc Voir le fichier

@ -21,6 +21,7 @@ Options::Options()
block_cache(NULL), block_cache(NULL),
block_size(4096), block_size(4096),
block_restart_interval(16), block_restart_interval(16),
max_file_size(2<<20),
compression(kSnappyCompression), compression(kSnappyCompression),
reuse_logs(false), reuse_logs(false),
filter_policy(NULL) { filter_policy(NULL) {

Chargement…
Annuler
Enregistrer