diff --git a/Makefile b/Makefile index 921b71c..84f77ab 100644 --- a/Makefile +++ b/Makefile @@ -28,9 +28,22 @@ PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -std=c++0x PORT_MODULE = port_posix.o endif # UNAME -CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT) +# Set 'SNAPPY' to 1 if you have the Snappy compression library +# installed and want to enable its use in LevelDB +# (see http://code.google.com/p/snappy/) +SNAPPY=0 + +ifeq ($(SNAPPY), 0) +SNAPPY_CFLAGS= +SNAPPY_LDFLAGS= +else +SNAPPY_CFLAGS=-DSNAPPY +SNAPPY_LDFLAGS=-lsnappy +endif -LDFLAGS=-lpthread +CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT) $(SNAPPY_CFLAGS) + +LDFLAGS=-lpthread $(SNAPPY_LDFLAGS) LIBOBJECTS = \ ./db/builder.o \ @@ -85,6 +98,7 @@ TESTS = \ skiplist_test \ table_test \ version_edit_test \ + version_set_test \ write_batch_test PROGRAMS = db_bench $(TESTS) @@ -151,17 +165,23 @@ skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS) version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CC) $(LDFLAGS) db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ +version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CC) $(LDFLAGS) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ + write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CC) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ ifeq ($(PLATFORM), IOS) # For iOS, create universal object files to be used on both the simulator and # a device. +SIMULATORROOT=/Developer/Platforms/iPhoneSimulator.platform/Developer +DEVICEROOT=/Developer/Platforms/iPhoneOS.platform/Developer +IOSVERSION=$(shell defaults read /Developer/Platforms/iPhoneOS.platform/version CFBundleShortVersionString) .cc.o: mkdir -p ios-x86/$(dir $@) - $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator4.3.sdk -arch i686 $< -o ios-x86/$@ + $(SIMULATORROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 $< -o ios-x86/$@ mkdir -p ios-arm/$(dir $@) - $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS4.3.sdk -arch armv6 -arch armv7 $< -o ios-arm/$@ + $(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 $< -o ios-arm/$@ lipo ios-x86/$@ ios-arm/$@ -create -output $@ else .cc.o: diff --git a/TODO b/TODO index ce81439..9130b6a 100644 --- a/TODO +++ b/TODO @@ -8,7 +8,6 @@ db object stores, etc. can be done in the background anyway, so probably not that important. -api changes: -- Make it wrappable - -Faster Get implementation +After a range is completely deleted, what gets rid of the +corresponding files if we do no future changes to that range. Make +the conditions for triggering compactions fire in more situations? diff --git a/db/builder.cc b/db/builder.cc index 9f132d7..34a7b87 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -19,8 +19,7 @@ Status BuildTable(const std::string& dbname, const Options& options, TableCache* table_cache, Iterator* iter, - FileMetaData* meta, - VersionEdit* edit) { + FileMetaData* meta) { Status s; meta->file_size = 0; iter->SeekToFirst(); @@ -79,8 +78,7 @@ Status BuildTable(const std::string& dbname, } if (s.ok() && meta->file_size > 0) { - edit->AddFile(0, meta->number, meta->file_size, - meta->smallest, meta->largest); + // Keep it } else { env->DeleteFile(fname); } diff --git a/db/builder.h b/db/builder.h index 5dd17b6..b2aeabf 100644 --- a/db/builder.h +++ b/db/builder.h @@ -19,17 +19,15 @@ class VersionEdit; // Build a Table file from the contents of *iter. The generated file // will be named according to meta->number. On success, the rest of -// *meta will be filled with metadata about the generated table, and -// the file information will be added to *edit. If no data is present -// in *iter, meta->file_size will be set to zero, and no Table file -// will be produced. +// *meta will be filled with metadata about the generated table. +// If no data is present in *iter, meta->file_size will be set to +// zero, and no Table file will be produced. extern Status BuildTable(const std::string& dbname, Env* env, const Options& options, TableCache* table_cache, Iterator* iter, - FileMetaData* meta, - VersionEdit* edit); + FileMetaData* meta); } diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 12d176e..8015101 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -27,13 +27,12 @@ static const int kValueSize = 1000; class CorruptionTest { public: test::ErrorEnv env_; - Random rnd_; std::string dbname_; Cache* tiny_cache_; Options options_; DB* db_; - CorruptionTest() : rnd_(test::RandomSeed()) { + CorruptionTest() { tiny_cache_ = NewLRUCache(100); options_.env = &env_; dbname_ = test::TmpDir() + "/db_test"; @@ -122,15 +121,17 @@ class CorruptionTest { ASSERT_OK(env_.GetChildren(dbname_, &filenames)); uint64_t number; FileType type; - std::vector candidates; + std::string fname; + int picked_number = -1; for (int i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && - type == filetype) { - candidates.push_back(dbname_ + "/" + filenames[i]); + type == filetype && + int(number) > picked_number) { // Pick latest file + fname = dbname_ + "/" + filenames[i]; + picked_number = number; } } - ASSERT_TRUE(!candidates.empty()) << filetype; - std::string fname = candidates[rnd_.Uniform(candidates.size())]; + ASSERT_TRUE(!fname.empty()) << filetype; struct stat sbuf; if (stat(fname.c_str(), &sbuf) != 0) { @@ -239,8 +240,6 @@ TEST(CorruptionTest, TableFileIndexData) { Build(10000); // Enough to build multiple Tables DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); - dbi->TEST_CompactRange(0, "", "~"); - dbi->TEST_CompactRange(1, "", "~"); Corrupt(kTableFile, -2000, 500); Reopen(); @@ -296,7 +295,8 @@ TEST(CorruptionTest, CompactionInputError) { Build(10); DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); - ASSERT_EQ(1, Property("leveldb.num-files-at-level0")); + const int last = config::kNumLevels - 1; + ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); Corrupt(kTableFile, 100, 1); Check(9, 9); @@ -304,8 +304,6 @@ TEST(CorruptionTest, CompactionInputError) { // Force compactions by writing lots of values Build(10000); Check(10000, 10000); - dbi->TEST_CompactRange(0, "", "~"); - ASSERT_EQ(0, Property("leveldb.num-files-at-level0")); } TEST(CorruptionTest, CompactionInputErrorParanoid) { @@ -313,9 +311,16 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) { options.paranoid_checks = true; options.write_buffer_size = 1048576; Reopen(&options); + DBImpl* dbi = reinterpret_cast(db_); + + // Fill levels >= 1 so memtable compaction outputs to level 1 + for (int level = 1; level < config::kNumLevels; level++) { + dbi->Put(WriteOptions(), "", "begin"); + dbi->Put(WriteOptions(), "~", "end"); + dbi->TEST_CompactMemTable(); + } Build(10); - DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); ASSERT_EQ(1, Property("leveldb.num-files-at-level0")); diff --git a/db/db_bench.cc b/db/db_bench.cc index b24179d..53b8c53 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -86,6 +86,9 @@ static int FLAGS_open_files = 0; // benchmark will fail. static bool FLAGS_use_existing_db = false; +// Use the db with the following name. +static const char* FLAGS_db = "/tmp/dbbench"; + namespace leveldb { // Helper for quickly generating random data. @@ -318,14 +321,14 @@ class Benchmark { bytes_(0), rand_(301) { std::vector files; - Env::Default()->GetChildren("/tmp/dbbench", &files); + Env::Default()->GetChildren(FLAGS_db, &files); for (int i = 0; i < files.size(); i++) { if (Slice(files[i]).starts_with("heap-")) { - Env::Default()->DeleteFile("/tmp/dbbench/" + files[i]); + Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]); } } if (!FLAGS_use_existing_db) { - DestroyDB("/tmp/dbbench", Options()); + DestroyDB(FLAGS_db, Options()); } } @@ -364,7 +367,7 @@ class Benchmark { Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1); } else if (name == Slice("fillsync")) { write_options.sync = true; - Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size, 1); + Write(write_options, RANDOM, FRESH, num_ / 1000, FLAGS_value_size, 1); } else if (name == Slice("fill100K")) { Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1); } else if (name == Slice("readseq")) { @@ -490,7 +493,7 @@ class Benchmark { options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; - Status s = DB::Open(options, "/tmp/dbbench", &db_); + Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); @@ -506,7 +509,7 @@ class Benchmark { } delete db_; db_ = NULL; - DestroyDB("/tmp/dbbench", Options()); + DestroyDB(FLAGS_db, Options()); Open(); Start(); // Do not count time taken to destroy/open } @@ -617,7 +620,7 @@ class Benchmark { void HeapProfile() { char fname[100]; - snprintf(fname, sizeof(fname), "/tmp/dbbench/heap-%04d", ++heap_counter_); + snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_); WritableFile* file; Status s = Env::Default()->NewWritableFile(fname, &file); if (!s.ok()) { @@ -665,6 +668,8 @@ int main(int argc, char** argv) { FLAGS_cache_size = n; } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { FLAGS_open_files = n; + } else if (strncmp(argv[i], "--db=", 5) == 0) { + FLAGS_db = argv[i] + 5; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index abcc761..7556d5a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -122,6 +122,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mem_(new MemTable(internal_comparator_)), imm_(NULL), logfile_(NULL), + logfile_number_(0), log_(NULL), bg_compaction_scheduled_(false), manual_compaction_(NULL) { @@ -219,7 +220,7 @@ void DBImpl::DeleteObsoleteFiles() { bool keep = true; switch (type) { case kLogFile: - keep = ((number == versions_->LogNumber()) || + keep = ((number >= versions_->LogNumber()) || (number == versions_->PrevLogNumber())); break; case kDescriptorFile: @@ -287,14 +288,39 @@ Status DBImpl::Recover(VersionEdit* edit) { s = versions_->Recover(); if (s.ok()) { - // Recover from the log files named in the descriptor SequenceNumber max_sequence(0); - if (versions_->PrevLogNumber() != 0) { // log#==0 means no prev log - s = RecoverLogFile(versions_->PrevLogNumber(), edit, &max_sequence); + + // Recover from all newer log files than the ones named in the + // descriptor (new log files may have been added by the previous + // incarnation without registering them in the descriptor). + // + // Note that PrevLogNumber() is no longer used, but we pay + // attention to it in case we are recovering a database + // produced by an older version of leveldb. + const uint64_t min_log = versions_->LogNumber(); + const uint64_t prev_log = versions_->PrevLogNumber(); + std::vector filenames; + s = env_->GetChildren(dbname_, &filenames); + if (!s.ok()) { + return s; + } + uint64_t number; + FileType type; + std::vector logs; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) + && type == kLogFile + && ((number >= min_log) || (number == prev_log))) { + logs.push_back(number); + } } - if (s.ok() && versions_->LogNumber() != 0) { // log#==0 for initial state - s = RecoverLogFile(versions_->LogNumber(), edit, &max_sequence); + + // Recover in the order in which the logs were generated + std::sort(logs.begin(), logs.end()); + for (size_t i = 0; i < logs.size(); i++) { + s = RecoverLogFile(logs[i], edit, &max_sequence); } + if (s.ok()) { if (versions_->LastSequence() < max_sequence) { versions_->SetLastSequence(max_sequence); @@ -378,7 +404,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { - status = WriteLevel0Table(mem, edit); + status = WriteLevel0Table(mem, edit, NULL); if (!status.ok()) { // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. @@ -390,7 +416,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, } if (status.ok() && mem != NULL) { - status = WriteLevel0Table(mem, edit); + status = WriteLevel0Table(mem, edit, NULL); // Reflect errors immediately so that conditions like full // file-systems cause the DB::Open() to fail. } @@ -400,7 +426,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, return status; } -Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) { +Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, + Version* base) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; @@ -413,7 +440,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) { Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, edit); + s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); mutex_.Lock(); } @@ -424,10 +451,26 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) { delete iter; pending_outputs_.erase(meta.number); + + // Note that if file_size is zero, the file has been deleted and + // should not be added to the manifest. + int level = 0; + if (s.ok() && meta.file_size > 0) { + if (base != NULL && !base->OverlapInLevel(0, meta.smallest, meta.largest)) { + // Push to largest level we can without causing overlaps + while (level + 1 < config::kNumLevels && + !base->OverlapInLevel(level + 1, meta.smallest, meta.largest)) { + level++; + } + } + edit->AddFile(level, meta.number, meta.file_size, + meta.smallest, meta.largest); + } + CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; - stats_[0].Add(stats); + stats_[level].Add(stats); return s; } @@ -437,11 +480,19 @@ Status DBImpl::CompactMemTable() { // Save the contents of the memtable as a new Table VersionEdit edit; - Status s = WriteLevel0Table(imm_, &edit); + Version* base = versions_->current(); + base->Ref(); + Status s = WriteLevel0Table(imm_, &edit, base); + base->Unref(); + + if (s.ok() && shutting_down_.Acquire_Load()) { + s = Status::IOError("Deleting DB during memtable compaction"); + } // Replace immutable memtable with the generated Table if (s.ok()) { edit.SetPrevLogNumber(0); + edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed s = versions_->LogAndApply(&edit); } @@ -460,6 +511,9 @@ void DBImpl::TEST_CompactRange( int level, const std::string& begin, const std::string& end) { + assert(level >= 0); + assert(level + 1 < config::kNumLevels); + MutexLock l(&mutex_); while (manual_compaction_ != NULL) { bg_cv_.Wait(); @@ -934,22 +988,38 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { - // TODO(opt): faster implementation - Iterator* iter = NewIterator(options); - iter->Seek(key); - bool found = false; - if (iter->Valid() && user_comparator()->Compare(key, iter->key()) == 0) { - Slice v = iter->value(); - value->assign(v.data(), v.size()); - found = true; - } - // Non-OK iterator status trumps everything else - Status result = iter->status(); - if (result.ok() && !found) { - result = Status::NotFound(Slice()); // Use an empty error message for speed + Status s; + MutexLock l(&mutex_); + SequenceNumber snapshot; + if (options.snapshot != NULL) { + snapshot = reinterpret_cast(options.snapshot)->number_; + } else { + snapshot = versions_->LastSequence(); } - delete iter; - return result; + + // First look in the memtable, then in the immutable memtable (if any). + LookupKey lkey(key, snapshot); + if (mem_->Get(lkey, value, &s)) { + return s; + } + if (imm_ != NULL && imm_->Get(lkey, value, &s)) { + return s; + } + + // Not in memtable(s); try live files in level order + Version* current = versions_->current(); + current->Ref(); + Version::GetStats stats; + { // Unlock while reading from files + mutex_.Unlock(); + s = current->Get(options, lkey, value, &stats); + mutex_.Lock(); + } + if (current->UpdateStats(stats)) { + MaybeScheduleCompaction(); + } + current->Unref(); + return s; } Iterator* DBImpl::NewIterator(const ReadOptions& options) { @@ -1050,18 +1120,10 @@ Status DBImpl::MakeRoomForWrite(bool force) { if (!s.ok()) { break; } - VersionEdit edit; - edit.SetPrevLogNumber(versions_->LogNumber()); - edit.SetLogNumber(new_log_number); - s = versions_->LogAndApply(&edit); - if (!s.ok()) { - delete lfile; - env_->DeleteFile(LogFileName(dbname_, new_log_number)); - break; - } delete log_; delete logfile_; logfile_ = lfile; + logfile_number_ = new_log_number; log_ = new log::Writer(lfile); imm_ = mem_; has_imm_.Release_Store(imm_); @@ -1183,6 +1245,7 @@ Status DB::Open(const Options& options, const std::string& dbname, if (s.ok()) { edit.SetLogNumber(new_log_number); impl->logfile_ = lfile; + impl->logfile_number_ = new_log_number; impl->log_ = new log::Writer(lfile); s = impl->versions_->LogAndApply(&edit); } diff --git a/db/db_impl.h b/db/db_impl.h index 84ce154..f11ea55 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -85,7 +85,7 @@ class DBImpl : public DB { VersionEdit* edit, SequenceNumber* max_sequence); - Status WriteLevel0Table(MemTable* mem, VersionEdit* edit); + Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base); Status MakeRoomForWrite(bool force /* compact even if there is room? */); @@ -124,6 +124,7 @@ class DBImpl : public DB { MemTable* imm_; // Memtable being compacted port::AtomicPointer has_imm_; // So bg thread can detect non-NULL imm_ WritableFile* logfile_; + uint64_t logfile_number_; log::Writer* log_; SnapshotList snapshots_; diff --git a/db/db_test.cc b/db/db_test.cc index 42e70cf..d5d60cd 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -21,15 +21,57 @@ static std::string RandomString(Random* rnd, int len) { return r; } +// Special Env used to delay background operations +class SpecialEnv : public EnvWrapper { + public: + // sstable Sync() calls are blocked while this pointer is non-NULL. + port::AtomicPointer delay_sstable_sync_; + + explicit SpecialEnv(Env* base) : EnvWrapper(base) { + delay_sstable_sync_.Release_Store(NULL); + } + + Status NewWritableFile(const std::string& f, WritableFile** r) { + class SSTableFile : public WritableFile { + private: + SpecialEnv* env_; + WritableFile* base_; + + public: + SSTableFile(SpecialEnv* env, WritableFile* base) + : env_(env), + base_(base) { + } + Status Append(const Slice& data) { return base_->Append(data); } + Status Close() { return base_->Close(); } + Status Flush() { return base_->Flush(); } + Status Sync() { + while (env_->delay_sstable_sync_.Acquire_Load() != NULL) { + env_->SleepForMicroseconds(100000); + } + return base_->Sync(); + } + }; + + Status s = target()->NewWritableFile(f, r); + if (s.ok()) { + if (strstr(f.c_str(), ".sst") != NULL) { + *r = new SSTableFile(this, *r); + } + } + return s; + } +}; + class DBTest { public: std::string dbname_; - Env* env_; + SpecialEnv* env_; DB* db_; Options last_options_; - DBTest() : env_(Env::Default()) { + DBTest() : env_(new SpecialEnv(Env::Default())) { dbname_ = test::TmpDir() + "/db_test"; DestroyDB(dbname_, Options()); db_ = NULL; @@ -39,6 +81,7 @@ class DBTest { ~DBTest() { delete db_; DestroyDB(dbname_, Options()); + delete env_; } DBImpl* dbfull() { @@ -142,6 +185,14 @@ class DBTest { return atoi(property.c_str()); } + int TotalTableFiles() { + int result = 0; + for (int level = 0; level < config::kNumLevels; level++) { + result += NumTableFilesAtLevel(level); + } + return result; + } + uint64_t Size(const Slice& start, const Slice& limit) { Range r(start, limit); uint64_t size; @@ -162,6 +213,16 @@ class DBTest { } } + // Prevent pushing of new sstables into deeper levels by adding + // tables that cover a specified range to all levels. + void FillLevels(const std::string& smallest, const std::string& largest) { + for (int level = 0; level < config::kNumLevels; level++) { + Put(smallest, "begin"); + Put(largest, "end"); + dbfull()->TEST_CompactMemTable(); + } + } + void DumpFileCounts(const char* label) { fprintf(stderr, "---\n%s:\n", label); fprintf(stderr, "maxoverlap: %lld\n", @@ -209,6 +270,80 @@ TEST(DBTest, PutDeleteGet) { ASSERT_EQ("NOT_FOUND", Get("foo")); } +TEST(DBTest, GetFromImmutableLayer) { + Options options; + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + Reopen(&options); + + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + + env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + Put("k1", std::string(100000, 'x')); // Fill memtable + Put("k2", std::string(100000, 'y')); // Trigger compaction + ASSERT_EQ("v1", Get("foo")); + env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls +} + +TEST(DBTest, GetFromVersions) { + ASSERT_OK(Put("foo", "v1")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v1", Get("foo")); +} + +TEST(DBTest, GetSnapshot) { + // Try with both a short key and a long key + for (int i = 0; i < 2; i++) { + std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x'); + ASSERT_OK(Put(key, "v1")); + const Snapshot* s1 = db_->GetSnapshot(); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_EQ("v1", Get(key, s1)); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get(key)); + ASSERT_EQ("v1", Get(key, s1)); + db_->ReleaseSnapshot(s1); + } +} + +TEST(DBTest, GetLevel0Ordering) { + // Check that we process level-0 files in correct order. The code + // below generates two level-0 files where the earlier one comes + // before the later one in the level-0 file list since the earlier + // one has a smaller "smallest" key. + ASSERT_OK(Put("bar", "b")); + ASSERT_OK(Put("foo", "v1")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Put("foo", "v2")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get("foo")); +} + +TEST(DBTest, GetOrderedByLevels) { + ASSERT_OK(Put("foo", "v1")); + Compact("a", "z"); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("foo", "v2")); + ASSERT_EQ("v2", Get("foo")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get("foo")); +} + +TEST(DBTest, GetPicksCorrectFile) { + // Arrange to have multiple files in a non-level-0 level. + ASSERT_OK(Put("a", "va")); + Compact("a", "b"); + ASSERT_OK(Put("x", "vx")); + Compact("x", "y"); + ASSERT_OK(Put("f", "vf")); + Compact("f", "g"); + ASSERT_EQ("va", Get("a")); + ASSERT_EQ("vf", Get("f")); + ASSERT_EQ("vx", Get("x")); +} + TEST(DBTest, IterEmpty) { Iterator* iter = db_->NewIterator(ReadOptions()); @@ -413,6 +548,27 @@ TEST(DBTest, RecoveryWithEmptyLog) { ASSERT_EQ("v3", Get("foo")); } +// Check that writes done during a memtable compaction are recovered +// if the database is shutdown during the memtable compaction. +TEST(DBTest, RecoverDuringMemtableCompaction) { + Options options; + options.env = env_; + options.write_buffer_size = 1000000; + Reopen(&options); + + // Trigger a long memtable compaction and reopen the database during it + ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file + ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable + ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction + ASSERT_OK(Put("bar", "v2")); // Goes to new log file + + Reopen(&options); + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + ASSERT_EQ(std::string(10000000, 'x'), Get("big1")); + ASSERT_EQ(std::string(1000, 'y'), Get("big2")); +} + static std::string Key(int i) { char buf[100]; snprintf(buf, sizeof(buf), "key%06d", i); @@ -426,11 +582,11 @@ TEST(DBTest, MinorCompactionsHappen) { const int N = 500; - int starting_num_tables = NumTableFilesAtLevel(0); + int starting_num_tables = TotalTableFiles(); for (int i = 0; i < N; i++) { ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v'))); } - int ending_num_tables = NumTableFilesAtLevel(0); + int ending_num_tables = TotalTableFiles(); ASSERT_GT(ending_num_tables, starting_num_tables); for (int i = 0; i < N; i++) { @@ -499,6 +655,8 @@ TEST(DBTest, SparseMerge) { options.compression = kNoCompression; Reopen(&options); + FillLevels("A", "Z"); + // Suppose there is: // small amount of data with prefix A // large amount of data with prefix B @@ -514,7 +672,8 @@ TEST(DBTest, SparseMerge) { Put(key, value); } Put("C", "vc"); - Compact("", "z"); + dbfull()->TEST_CompactMemTable(); + dbfull()->TEST_CompactRange(0, "A", "Z"); // Make sparse update Put("A", "va2"); @@ -675,6 +834,8 @@ TEST(DBTest, Snapshot) { TEST(DBTest, HiddenValuesAreRemoved) { Random rnd(301); + FillLevels("a", "z"); + std::string big = RandomString(&rnd, 50000); Put("foo", big); Put("pastfoo", "v"); @@ -702,40 +863,54 @@ TEST(DBTest, HiddenValuesAreRemoved) { TEST(DBTest, DeletionMarkers1) { Put("foo", "v1"); ASSERT_OK(dbfull()->TEST_CompactMemTable()); - dbfull()->TEST_CompactRange(0, "", "z"); - dbfull()->TEST_CompactRange(1, "", "z"); - ASSERT_EQ(NumTableFilesAtLevel(2), 1); // foo => v1 is now in level 2 file + const int last = config::kNumLevels - 1; + ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level + + // Place a table at level last-1 to prevent merging with preceding mutation + Put("a", "begin"); + Put("z", "end"); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(last), 1); + ASSERT_EQ(NumTableFilesAtLevel(last-1), 1); + Delete("foo"); Put("foo", "v2"); ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); - ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2 ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); - dbfull()->TEST_CompactRange(0, "", "z"); + dbfull()->TEST_CompactRange(last-2, "", "z"); // DEL eliminated, but v1 remains because we aren't compacting that level // (DEL can be eliminated because v2 hides v1). ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]"); - dbfull()->TEST_CompactRange(1, "", "z"); - // Merging L1 w/ L2, so we are the base level for "foo", so DEL is removed. - // (as is v1). + dbfull()->TEST_CompactRange(last-1, "", "z"); + // Merging last-1 w/ last, so we are the base level for "foo", so + // DEL is removed. (as is v1). ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]"); } TEST(DBTest, DeletionMarkers2) { Put("foo", "v1"); ASSERT_OK(dbfull()->TEST_CompactMemTable()); - dbfull()->TEST_CompactRange(0, "", "z"); - dbfull()->TEST_CompactRange(1, "", "z"); - ASSERT_EQ(NumTableFilesAtLevel(2), 1); // foo => v1 is now in level 2 file + const int last = config::kNumLevels - 1; + ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level + + // Place a table at level last-1 to prevent merging with preceding mutation + Put("a", "begin"); + Put("z", "end"); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(last), 1); + ASSERT_EQ(NumTableFilesAtLevel(last-1), 1); + Delete("foo"); ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]"); - ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2 ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]"); - dbfull()->TEST_CompactRange(0, "", "z"); - // DEL kept: L2 file overlaps + dbfull()->TEST_CompactRange(last-2, "", "z"); + // DEL kept: "last" file overlaps ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]"); - dbfull()->TEST_CompactRange(1, "", "z"); - // Merging L1 w/ L2, so we are the base level for "foo", so DEL is removed. - // (as is v1). + dbfull()->TEST_CompactRange(last-1, "", "z"); + // Merging last-1 w/ last, so we are the base level for "foo", so + // DEL is removed. (as is v1). ASSERT_EQ(AllEntriesFor("foo"), "[ ]"); } diff --git a/db/dbformat.cc b/db/dbformat.cc index c12c138..af2e077 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -84,4 +84,23 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const { } } +LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { + size_t usize = user_key.size(); + size_t needed = usize + 13; // A conservative estimate + char* dst; + if (needed <= sizeof(space_)) { + dst = space_; + } else { + dst = new char[needed]; + } + start_ = dst; + dst = EncodeVarint32(dst, usize + 8); + kstart_ = dst; + memcpy(dst, user_key.data(), usize); + dst += usize; + EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); + dst += 8; + end_ = dst; +} + } diff --git a/db/dbformat.h b/db/dbformat.h index 89c4afb..97491bc 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -160,6 +160,46 @@ inline bool ParseInternalKey(const Slice& internal_key, return (c <= static_cast(kTypeValue)); } +// A helper class useful for DBImpl::Get() +class LookupKey { + public: + // Initialize *this for looking up user_key at a snapshot with + // the specified sequence number. + LookupKey(const Slice& user_key, SequenceNumber sequence); + + ~LookupKey(); + + // Return a key suitable for lookup in a MemTable. + Slice memtable_key() const { return Slice(start_, end_ - start_); } + + // Return an internal key (suitable for passing to an internal iterator) + Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } + + // Return the user key + Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); } + + private: + // We construct a char array of the form: + // klength varint32 <-- start_ + // userkey char[klength] <-- kstart_ + // tag uint64 + // <-- end_ + // The array is a suitable MemTable key. + // The suffix starting with "userkey" can be used as an InternalKey. + const char* start_; + const char* kstart_; + const char* end_; + char space_[200]; // Avoid allocation for short keys + + // No copying allowed + LookupKey(const LookupKey&); + void operator=(const LookupKey&); +}; + +inline LookupKey::~LookupKey() { + if (start_ != space_) delete[] start_; +} + } #endif // STORAGE_LEVELDB_DB_FORMAT_H_ diff --git a/db/memtable.cc b/db/memtable.cc index 687900a..4555abb 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -105,4 +105,41 @@ void MemTable::Add(SequenceNumber s, ValueType type, table_.Insert(buf); } +bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { + Slice memkey = key.memtable_key(); + Table::Iterator iter(&table_); + iter.Seek(memkey.data()); + if (iter.Valid()) { + // entry format is: + // klength varint32 + // userkey char[klength] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. + const char* entry = iter.key(); + uint32_t key_length; + const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); + if (comparator_.comparator.user_comparator()->Compare( + Slice(key_ptr, key_length - 8), + key.user_key()) == 0) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(tag & 0xff)) { + case kTypeValue: { + Slice v = GetLengthPrefixedSlice(key_ptr + key_length); + value->assign(v.data(), v.size()); + return true; + } + case kTypeDeletion: + *s = Status::NotFound(Slice()); + return true; + } + } + } + return false; +} + } diff --git a/db/memtable.h b/db/memtable.h index 2e9bd61..1898b5e 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -57,6 +57,12 @@ class MemTable { const Slice& key, const Slice& value); + // If memtable contains a value for key, store it in *value and return true. + // If memtable contains a deletion for key, store a NotFound() error + // in *status and return true. + // Else, return false. + bool Get(const LookupKey& key, std::string* value, Status* s); + private: ~MemTable(); // Private since only Unref() should be used to delete it diff --git a/db/repair.cc b/db/repair.cc index 4b57169..ae1b136 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -212,14 +212,12 @@ class Repairer { } delete lfile; - // We ignore any version edits generated by the conversion to a Table + // Do not record a version edit for this conversion to a Table // since ExtractMetaData() will also generate edits. - VersionEdit skipped; FileMetaData meta; meta.number = next_file_number_++; Iterator* iter = mem->NewIterator(); - status = BuildTable(dbname_, env_, options_, table_cache_, iter, - &meta, &skipped); + status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); delete iter; mem->Unref(); mem = NULL; diff --git a/db/version_edit.h b/db/version_edit.h index ab874da..a069893 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -16,12 +16,13 @@ class VersionSet; struct FileMetaData { int refs; + int allowed_seeks; // Seeks allowed until compaction uint64_t number; uint64_t file_size; // File size in bytes InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table - FileMetaData() : refs(0), file_size(0) { } + FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { } }; class VersionEdit { diff --git a/db/version_set.cc b/db/version_set.cc index f64ac8d..54342e4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -75,6 +75,37 @@ Version::~Version() { } } +int FindFile(const InternalKeyComparator& icmp, + const std::vector& files, + const Slice& key) { + uint32_t left = 0; + uint32_t right = files.size(); + while (left < right) { + uint32_t mid = (left + right) / 2; + const FileMetaData* f = files[mid]; + if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) { + // Key at "mid.largest" is < "target". Therefore all + // files at or before "mid" are uninteresting. + left = mid + 1; + } else { + // Key at "mid.largest" is >= "target". Therefore all files + // after "mid" are uninteresting. + right = mid; + } + } + return right; +} + +bool SomeFileOverlapsRange( + const InternalKeyComparator& icmp, + const std::vector& files, + const InternalKey& smallest, + const InternalKey& largest) { + const int index = FindFile(icmp, files, smallest.Encode()); + return ((index < files.size()) && + icmp.Compare(largest, files[index]->smallest) >= 0); +} + // An internal iterator. For a given version/level pair, yields // information about the files in the level. For a given entry, key() // is the largest key that occurs in the file, and value() is an @@ -92,22 +123,7 @@ class Version::LevelFileNumIterator : public Iterator { return index_ < flist_->size(); } virtual void Seek(const Slice& target) { - uint32_t left = 0; - uint32_t right = flist_->size() - 1; - while (left < right) { - uint32_t mid = (left + right) / 2; - int cmp = icmp_.Compare((*flist_)[mid]->largest.Encode(), target); - if (cmp < 0) { - // Key at "mid.largest" is < than "target". Therefore all - // files at or before "mid" are uninteresting. - left = mid + 1; - } else { - // Key at "mid.largest" is >= "target". Therefore all files - // after "mid" are uninteresting. - right = mid; - } - } - index_ = left; + index_ = FindFile(icmp_, *flist_, target); } virtual void SeekToFirst() { index_ = 0; } virtual void SeekToLast() { @@ -185,6 +201,144 @@ void Version::AddIterators(const ReadOptions& options, } } +// If "*iter" points at a value or deletion for user_key, store +// either the value, or a NotFound error and return true. +// Else return false. +static bool GetValue(Iterator* iter, const Slice& user_key, + std::string* value, + Status* s) { + if (!iter->Valid()) { + return false; + } + ParsedInternalKey parsed_key; + if (!ParseInternalKey(iter->key(), &parsed_key)) { + *s = Status::Corruption("corrupted key for ", user_key); + return true; + } + if (parsed_key.user_key != user_key) { + return false; + } + switch (parsed_key.type) { + case kTypeDeletion: + *s = Status::NotFound(Slice()); // Use an empty error message for speed + break; + case kTypeValue: { + Slice v = iter->value(); + value->assign(v.data(), v.size()); + break; + } + } + return true; +} + +static bool NewestFirst(FileMetaData* a, FileMetaData* b) { + return a->number > b->number; +} + +Status Version::Get(const ReadOptions& options, + const LookupKey& k, + std::string* value, + GetStats* stats) { + Slice ikey = k.internal_key(); + Slice user_key = k.user_key(); + const Comparator* ucmp = vset_->icmp_.user_comparator(); + Status s; + + stats->seek_file = NULL; + stats->seek_file_level = -1; + FileMetaData* last_file_read = NULL; + + // We can search level-by-level since entries never hop across + // levels. Therefore we are guaranteed that if we find data + // in an smaller level, later levels are irrelevant. + std::vector tmp; + FileMetaData* tmp2; + for (int level = 0; level < config::kNumLevels; level++) { + size_t num_files = files_[level].size(); + if (num_files == 0) continue; + + // Get the list of files to search in this level + FileMetaData* const* files = &files_[level][0]; + if (level == 0) { + // Level-0 files may overlap each other. Find all files that + // overlap user_key and process them in order from newest to oldest. + tmp.reserve(num_files); + for (int i = 0; i < num_files; i++) { + FileMetaData* f = files[i]; + if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 && + ucmp->Compare(user_key, f->largest.user_key()) <= 0) { + tmp.push_back(f); + } + } + if (tmp.empty()) continue; + + std::sort(tmp.begin(), tmp.end(), NewestFirst); + files = &tmp[0]; + num_files = tmp.size(); + } else { + // Binary search to find earliest index whose largest key >= ikey. + uint32_t index = FindFile(vset_->icmp_, files_[level], ikey); + if (index >= num_files) { + files = NULL; + num_files = 0; + } else { + tmp2 = files[index]; + if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) { + // All of "tmp2" is past any data for user_key + files = NULL; + num_files = 0; + } else { + files = &tmp2; + num_files = 1; + } + } + } + + for (int i = 0; i < num_files; ++i) { + if (last_file_read != NULL && stats->seek_file == NULL) { + // We have had more than one seek for this read. Charge the 1st file. + stats->seek_file = last_file_read; + stats->seek_file_level = (i == 0 ? level - 1 : level); + } + + FileMetaData* f = files[i]; + last_file_read = f; + + Iterator* iter = vset_->table_cache_->NewIterator( + options, + f->number, + f->file_size); + iter->Seek(ikey); + const bool done = GetValue(iter, user_key, value, &s); + if (!iter->status().ok()) { + s = iter->status(); + delete iter; + return s; + } else { + delete iter; + if (done) { + return s; + } + } + } + } + + return Status::NotFound(Slice()); // Use an empty error message for speed +} + +bool Version::UpdateStats(const GetStats& stats) { + FileMetaData* f = stats.seek_file; + if (f != NULL) { + f->allowed_seeks--; + if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) { + file_to_compact_ = f; + file_to_compact_level_ = stats.seek_file_level; + return true; + } + } + return false; +} + void Version::Ref() { ++refs_; } @@ -198,13 +352,22 @@ void Version::Unref() { } } +bool Version::OverlapInLevel(int level, + const InternalKey& smallest, + const InternalKey& largest) { + return SomeFileOverlapsRange(vset_->icmp_, files_[level], smallest, largest); +} + std::string Version::DebugString() const { std::string r; for (int level = 0; level < config::kNumLevels; level++) { - // E.g., level 1: 17:123['a' .. 'd'] 20:43['e' .. 'g'] - r.append("level "); + // E.g., + // --- level 1 --- + // 17:123['a' .. 'd'] + // 20:43['e' .. 'g'] + r.append("--- level "); AppendNumberTo(&r, level); - r.push_back(':'); + r.append(" ---\n"); const std::vector& files = files_[level]; for (size_t i = 0; i < files.size(); i++) { r.push_back(' '); @@ -215,9 +378,8 @@ std::string Version::DebugString() const { AppendEscapedStringTo(&r, files[i]->smallest.Encode()); r.append("' .. '"); AppendEscapedStringTo(&r, files[i]->largest.Encode()); - r.append("']"); + r.append("']\n"); } - r.push_back('\n'); } return r; } @@ -305,6 +467,23 @@ class VersionSet::Builder { const int level = edit->new_files_[i].first; FileMetaData* f = new FileMetaData(edit->new_files_[i].second); f->refs = 1; + + // We arrange to automatically compact this file after + // a certain number of seeks. Let's assume: + // (1) One seek costs 10ms + // (2) Writing or reading 1MB costs 10ms (100MB/s) + // (3) A compaction of 1MB does 25MB of IO: + // 1MB read from this level + // 10-12MB read from next level (boundaries may be misaligned) + // 10-12MB written to next level + // This implies that 25 seeks cost the same as the compaction + // of 1MB of data. I.e., one seek costs approximately the + // same as the compaction of 40KB of data. We are a little + // conservative and allow approximately one seek for every 16KB + // of data before triggering a compaction. + f->allowed_seeks = (f->file_size / 16384); + if (f->allowed_seeks < 100) f->allowed_seeks = 100; + levels_[level].deleted_files.erase(f->number); levels_[level].added_files->insert(f); } @@ -363,8 +542,14 @@ class VersionSet::Builder { if (levels_[level].deleted_files.count(f->number) > 0) { // File is deleted: do nothing } else { + std::vector* files = &v->files_[level]; + if (level > 0 && !files->empty()) { + // Must not overlap + assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest, + f->smallest) < 0); + } f->refs++; - v->files_[level].push_back(f); + files->push_back(f); } } }; @@ -749,7 +934,7 @@ int64_t VersionSet::NumLevelBytes(int level) const { int64_t VersionSet::MaxNextLevelOverlappingBytes() { int64_t result = 0; std::vector overlaps; - for (int level = 0; level < config::kNumLevels - 1; level++) { + for (int level = 1; level < config::kNumLevels - 1; level++) { for (size_t i = 0; i < current_->files_[level].size(); i++) { const FileMetaData* f = current_->files_[level][i]; GetOverlappingInputs(level+1, f->smallest, f->largest, &overlaps); @@ -854,31 +1039,43 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { } Compaction* VersionSet::PickCompaction() { - if (!NeedsCompaction()) { + Compaction* c; + int level; + + // We prefer compactions triggered by too much data in a level over + // the compactions triggered by seeks. + const bool size_compaction = (current_->compaction_score_ >= 1); + const bool seek_compaction = (current_->file_to_compact_ != NULL); + if (size_compaction) { + level = current_->compaction_level_; + assert(level >= 0); + assert(level+1 < config::kNumLevels); + c = new Compaction(level); + + // Pick the first file that comes after compact_pointer_[level] + for (size_t i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + if (compact_pointer_[level].empty() || + icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { + c->inputs_[0].push_back(f); + break; + } + } + if (c->inputs_[0].empty()) { + // Wrap-around to the beginning of the key space + c->inputs_[0].push_back(current_->files_[level][0]); + } + } else if (seek_compaction) { + level = current_->file_to_compact_level_; + c = new Compaction(level); + c->inputs_[0].push_back(current_->file_to_compact_); + } else { return NULL; } - const int level = current_->compaction_level_; - assert(level >= 0); - assert(level+1 < config::kNumLevels); - Compaction* c = new Compaction(level); c->input_version_ = current_; c->input_version_->Ref(); - // Pick the first file that comes after compact_pointer_[level] - for (size_t i = 0; i < current_->files_[level].size(); i++) { - FileMetaData* f = current_->files_[level][i]; - if (compact_pointer_[level].empty() || - icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { - c->inputs_[0].push_back(f); - break; - } - } - if (c->inputs_[0].empty()) { - // Wrap-around to the beginning of the key space - c->inputs_[0].push_back(current_->files_[level][0]); - } - // Files in level 0 may overlap each other, so pick up all overlapping ones if (level == 0) { InternalKey smallest, largest; diff --git a/db/version_set.h b/db/version_set.h index 2bac5e2..f00c35a 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -35,6 +35,21 @@ class Version; class VersionSet; class WritableFile; +// Return the smallest index i such that files[i]->largest >= key. +// Return files.size() if there is no such file. +// REQUIRES: "files" contains a sorted list of non-overlapping files. +extern int FindFile(const InternalKeyComparator& icmp, + const std::vector& files, + const Slice& key); + +// Returns true iff some file in "files" overlaps some part of +// [smallest,largest]. +extern bool SomeFileOverlapsRange( + const InternalKeyComparator& icmp, + const std::vector& files, + const InternalKey& smallest, + const InternalKey& largest); + class Version { public: // Append to *iters a sequence of iterators that will @@ -42,11 +57,34 @@ class Version { // REQUIRES: This version has been saved (see VersionSet::SaveTo) void AddIterators(const ReadOptions&, std::vector* iters); + // Lookup the value for key. If found, store it in *val and + // return OK. Else return a non-OK status. Fills *stats. + // REQUIRES: lock is not held + struct GetStats { + FileMetaData* seek_file; + int seek_file_level; + }; + Status Get(const ReadOptions&, const LookupKey& key, std::string* val, + GetStats* stats); + + // Adds "stats" into the current state. Returns true if a new + // compaction may need to be triggered, false otherwise. + // REQUIRES: lock is held + bool UpdateStats(const GetStats& stats); + // Reference count management (so Versions do not disappear out from // under live iterators) void Ref(); void Unref(); + // Returns true iff some file in the specified level overlaps + // some part of [smallest,largest]. + bool OverlapInLevel(int level, + const InternalKey& smallest, + const InternalKey& largest); + + int NumFiles(int level) const { return files_[level].size(); } + // Return a human readable string that describes this version's contents. std::string DebugString() const; @@ -65,6 +103,10 @@ class Version { // List of files per level std::vector files_[config::kNumLevels]; + // Next file to compact based on seek stats. + FileMetaData* file_to_compact_; + int file_to_compact_level_; + // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields // are initialized by Finalize(). @@ -73,6 +115,8 @@ class Version { explicit Version(VersionSet* vset) : vset_(vset), next_(this), prev_(this), refs_(0), + file_to_compact_(NULL), + file_to_compact_level_(-1), compaction_score_(-1), compaction_level_(-1) { } @@ -158,7 +202,10 @@ class VersionSet { Iterator* MakeInputIterator(Compaction* c); // Returns true iff some level needs a compaction. - bool NeedsCompaction() const { return current_->compaction_score_ >= 1; } + bool NeedsCompaction() const { + Version* v = current_; + return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL); + } // Add all files listed in any live version to *live. // May also mutate some internal state. diff --git a/port/port_posix.h b/port/port_posix.h index c158db1..d0b0615 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -9,6 +9,9 @@ #include #include +#ifdef SNAPPY +#include +#endif #include #include #include @@ -72,15 +75,30 @@ class AtomicPointer { } }; -// TODO(gabor): Implement actual compress inline bool Snappy_Compress(const char* input, size_t input_length, std::string* output) { +#ifdef SNAPPY + output->resize(snappy::MaxCompressedLength(input_length)); + size_t outlen; + snappy::RawCompress(input, input_length, &(*output)[0], &outlen); + output->resize(outlen); + return true; +#endif + return false; } -// TODO(gabor): Implement actual uncompress inline bool Snappy_Uncompress(const char* input_data, size_t input_length, std::string* output) { +#ifdef SNAPPY + size_t ulength; + if (!snappy::GetUncompressedLength(input_data, ulength, &ulength)) { + return false; + } + output->resize(ulength); + return snappy::RawUncompress(input_data, input_length, &(*output)[0]); +#endif + return false; } diff --git a/table/table_test.cc b/table/table_test.cc index cf2bae0..10d08fc 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -727,11 +727,15 @@ TEST(Harness, RandomizedLongDB) { Test(&rnd); // We must have created enough data to force merging - std::string l0_files, l1_files; - ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files)); - ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files)); - ASSERT_GT(atoi(l0_files.c_str()) + atoi(l1_files.c_str()), 0); - + int files = 0; + for (int level = 0; level < config::kNumLevels; level++) { + std::string value; + char name[100]; + snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level); + ASSERT_TRUE(db()->GetProperty(name, &value)); + files += atoi(value.c_str()); + } + ASSERT_GT(files, 0); } class MemTableTest { };