diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 8015101..69fa03a 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -295,7 +295,7 @@ TEST(CorruptionTest, CompactionInputError) { Build(10); DBImpl* dbi = reinterpret_cast(db_); dbi->TEST_CompactMemTable(); - const int last = config::kNumLevels - 1; + const int last = config::kMaxMemCompactLevel; ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last))); Corrupt(kTableFile, 100, 1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 7556d5a..48056da 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -456,10 +456,13 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, // should not be added to the manifest. int level = 0; if (s.ok() && meta.file_size > 0) { - if (base != NULL && !base->OverlapInLevel(0, meta.smallest, meta.largest)) { - // Push to largest level we can without causing overlaps - while (level + 1 < config::kNumLevels && - !base->OverlapInLevel(level + 1, meta.smallest, meta.largest)) { + const Slice min_user_key = meta.smallest.user_key(); + const Slice max_user_key = meta.largest.user_key(); + if (base != NULL && !base->OverlapInLevel(0, min_user_key, max_user_key)) { + // Push the new sstable to a higher level if possible to reduce + // expensive manifest file ops. + while (level < config::kMaxMemCompactLevel && + !base->OverlapInLevel(level + 1, min_user_key, max_user_key)) { level++; } } @@ -1276,12 +1279,14 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } FileLock* lock; - Status result = env->LockFile(LockFileName(dbname), &lock); + const std::string lockname = LockFileName(dbname); + Status result = env->LockFile(lockname, &lock); if (result.ok()) { uint64_t number; FileType type; for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { + if (ParseFileName(filenames[i], &number, &type) && + filenames[i] != lockname) { // Lock file will be deleted at end Status del = env->DeleteFile(dbname + "/" + filenames[i]); if (result.ok() && !del.ok()) { result = del; @@ -1289,7 +1294,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } } env->UnlockFile(lock); // Ignore error since state is already gone - env->DeleteFile(LockFileName(dbname)); + env->DeleteFile(lockname); env->DeleteDir(dbname); // Ignore error in case dir contains other files } return result; diff --git a/db/db_test.cc b/db/db_test.cc index d5d60cd..0ac29e6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -650,6 +650,25 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) { } } +TEST(DBTest, RepeatedWritesToSameKey) { + Options options; + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + Reopen(&options); + + // We must have at most one file per level except for level-0, + // which may have up to kL0_StopWritesTrigger files. + const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger; + + Random rnd(301); + std::string value = RandomString(&rnd, 2 * options.write_buffer_size); + for (int i = 0; i < 5 * kMaxFiles; i++) { + Put("key", value); + ASSERT_LE(TotalTableFiles(), kMaxFiles); + fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles()); + } +} + TEST(DBTest, SparseMerge) { Options options; options.compression = kNoCompression; @@ -863,7 +882,7 @@ TEST(DBTest, HiddenValuesAreRemoved) { TEST(DBTest, DeletionMarkers1) { Put("foo", "v1"); ASSERT_OK(dbfull()->TEST_CompactMemTable()); - const int last = config::kNumLevels - 1; + const int last = config::kMaxMemCompactLevel; ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level // Place a table at level last-1 to prevent merging with preceding mutation @@ -891,7 +910,7 @@ TEST(DBTest, DeletionMarkers1) { TEST(DBTest, DeletionMarkers2) { Put("foo", "v1"); ASSERT_OK(dbfull()->TEST_CompactMemTable()); - const int last = config::kNumLevels - 1; + const int last = config::kMaxMemCompactLevel; ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level // Place a table at level last-1 to prevent merging with preceding mutation diff --git a/db/dbformat.h b/db/dbformat.h index 97491bc..ec1d193 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -29,6 +29,14 @@ static const int kL0_SlowdownWritesTrigger = 8; // Maximum number of level-0 files. We stop writes at this point. static const int kL0_StopWritesTrigger = 12; +// Maximum level to which a new compacted memtable is pushed if it +// does not create overlap. We try to push to level 2 to avoid the +// relatively expensive level 0=>1 compactions and to avoid some +// expensive manifest file operations. We do not push all the way to +// the largest level since that can generate a lot of wasted disk +// space if the same key space is being repeatedly overwritten. +static const int kMaxMemCompactLevel = 2; + } class InternalKey; diff --git a/db/log_reader.cc b/db/log_reader.cc index 8721071..fcb3aa7 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -4,6 +4,7 @@ #include "db/log_reader.h" +#include #include "leveldb/env.h" #include "util/coding.h" #include "util/crc32c.h" @@ -72,7 +73,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { Slice fragment; while (true) { uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); - switch (ReadPhysicalRecord(&fragment)) { + const unsigned int record_type = ReadPhysicalRecord(&fragment); + switch (record_type) { case kFullType: if (in_fragmented_record) { // Handle bug in earlier versions of log::Writer where @@ -144,13 +146,16 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) { } break; - default: + default: { + char buf[40]; + snprintf(buf, sizeof(buf), "unknown record type %u", record_type); ReportCorruption( (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), - "unknown record type"); + buf); in_fragmented_record = false; scratch->clear(); break; + } } } return false; @@ -212,16 +217,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { return kBadRecord; } + if (type == kZeroType && length == 0) { + // Skip zero length record without reporting any drops since + // such records are produced by the mmap based writing code in + // env_posix.cc that preallocates file regions. + buffer_.clear(); + return kBadRecord; + } + // Check crc if (checksum_) { - if (type == kZeroType && length == 0) { - // Skip zero length record without reporting any drops since - // such records are produced by the mmap based writing code in - // env_posix.cc that preallocates file regions. - buffer_.clear(); - return kBadRecord; - } - uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); if (actual_crc != expected_crc) { diff --git a/db/version_set.cc b/db/version_set.cc index 54342e4..816f189 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -99,11 +99,14 @@ int FindFile(const InternalKeyComparator& icmp, bool SomeFileOverlapsRange( const InternalKeyComparator& icmp, const std::vector& files, - const InternalKey& smallest, - const InternalKey& largest) { - const int index = FindFile(icmp, files, smallest.Encode()); + const Slice& smallest_user_key, + const Slice& largest_user_key) { + // Find the earliest possible internal key for smallest_user_key + InternalKey small(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek); + const int index = FindFile(icmp, files, small.Encode()); return ((index < files.size()) && - icmp.Compare(largest, files[index]->smallest) >= 0); + icmp.user_comparator()->Compare( + largest_user_key, files[index]->smallest.user_key()) >= 0); } // An internal iterator. For a given version/level pair, yields @@ -353,9 +356,11 @@ void Version::Unref() { } bool Version::OverlapInLevel(int level, - const InternalKey& smallest, - const InternalKey& largest) { - return SomeFileOverlapsRange(vset_->icmp_, files_[level], smallest, largest); + const Slice& smallest_user_key, + const Slice& largest_user_key) { + return SomeFileOverlapsRange(vset_->icmp_, files_[level], + smallest_user_key, + largest_user_key); } std::string Version::DebugString() const { diff --git a/db/version_set.h b/db/version_set.h index f00c35a..693fc6f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -42,13 +42,13 @@ extern int FindFile(const InternalKeyComparator& icmp, const std::vector& files, const Slice& key); -// Returns true iff some file in "files" overlaps some part of +// Returns true iff some file in "files" overlaps the user key range // [smallest,largest]. extern bool SomeFileOverlapsRange( const InternalKeyComparator& icmp, const std::vector& files, - const InternalKey& smallest, - const InternalKey& largest); + const Slice& smallest_user_key, + const Slice& largest_user_key); class Version { public: @@ -78,10 +78,10 @@ class Version { void Unref(); // Returns true iff some file in the specified level overlaps - // some part of [smallest,largest]. + // some part of [smallest_user_key,largest_user_key]. bool OverlapInLevel(int level, - const InternalKey& smallest, - const InternalKey& largest); + const Slice& smallest_user_key, + const Slice& largest_user_key); int NumFiles(int level) const { return files_[level].size(); } diff --git a/db/version_set_test.cc b/db/version_set_test.cc index eae2a80..ecfd62b 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -19,11 +19,13 @@ class FindFileTest { } } - void Add(const char* smallest, const char* largest) { + void Add(const char* smallest, const char* largest, + SequenceNumber smallest_seq = 100, + SequenceNumber largest_seq = 100) { FileMetaData* f = new FileMetaData; f->number = files_.size() + 1; - f->smallest = InternalKey(smallest, 100, kTypeValue); - f->largest = InternalKey(largest, 100, kTypeValue); + f->smallest = InternalKey(smallest, smallest_seq, kTypeValue); + f->largest = InternalKey(largest, largest_seq, kTypeValue); files_.push_back(f); } @@ -34,10 +36,8 @@ class FindFileTest { } bool Overlaps(const char* smallest, const char* largest) { - InternalKey s(smallest, 100, kTypeValue); - InternalKey l(largest, 100, kTypeValue); InternalKeyComparator cmp(BytewiseComparator()); - return SomeFileOverlapsRange(cmp, files_, s, l); + return SomeFileOverlapsRange(cmp, files_, smallest, largest); } }; @@ -108,6 +108,15 @@ TEST(FindFileTest, Multiple) { ASSERT_TRUE(Overlaps("450", "500")); } +TEST(FindFileTest, OverlapSequenceChecks) { + Add("200", "200", 5000, 3000); + ASSERT_TRUE(! Overlaps("199", "199")); + ASSERT_TRUE(! Overlaps("201", "300")); + ASSERT_TRUE(Overlaps("200", "200")); + ASSERT_TRUE(Overlaps("190", "200")); + ASSERT_TRUE(Overlaps("200", "210")); +} + } int main(int argc, char** argv) { diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index d975444..6d65eed 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -9,6 +9,8 @@ namespace leveldb { +class MemTable; + // WriteBatchInternal provides static methods for manipulating a // WriteBatch that we don't want in the public WriteBatch interface. class WriteBatchInternal { diff --git a/include/leveldb/options.h b/include/leveldb/options.h index a94651f..a0afbf2 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -75,6 +75,8 @@ struct Options { // Larger values increase performance, especially during bulk loads. // Up to two write buffers may be held in memory at the same time, // so you may wish to adjust this parameter to control memory usage. + // Also, a larger write buffer will result in a longer recovery time + // the next time the database is opened. // // Default: 4MB size_t write_buffer_size; diff --git a/util/env_posix.cc b/util/env_posix.cc index fec1599..46723e2 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -28,6 +28,10 @@ namespace leveldb { namespace { +static Status IOError(const std::string& context, int err_number) { + return Status::IOError(context, strerror(err_number)); +} + class PosixSequentialFile: public SequentialFile { private: std::string filename_; @@ -47,7 +51,7 @@ class PosixSequentialFile: public SequentialFile { // We leave status as ok if we hit the end of the file } else { // A partial read with an error: return a non-ok status - s = Status::IOError(filename_, strerror(errno)); + s = IOError(filename_, errno); } } return s; @@ -55,7 +59,7 @@ class PosixSequentialFile: public SequentialFile { virtual Status Skip(uint64_t n) { if (fseek(file_, n, SEEK_CUR)) { - return Status::IOError(filename_, strerror(errno)); + return IOError(filename_, errno); } return Status::OK(); } @@ -78,7 +82,7 @@ class PosixRandomAccessFile: public RandomAccessFile { *result = Slice(scratch, (r < 0) ? 0 : r); if (r < 0) { // An error: return a non-ok status - s = Status::IOError(filename_, strerror(errno)); + s = IOError(filename_, errno); } return s; } @@ -114,13 +118,16 @@ class PosixMmapFile : public WritableFile { return s; } - void UnmapCurrentRegion() { + bool UnmapCurrentRegion() { + bool result = true; if (base_ != NULL) { if (last_sync_ < limit_) { // Defer syncing this data until next Sync() call, if any pending_sync_ = true; } - munmap(base_, limit_ - base_); + if (munmap(base_, limit_ - base_) != 0) { + result = false; + } file_offset_ += limit_ - base_; base_ = NULL; limit_ = NULL; @@ -132,6 +139,7 @@ class PosixMmapFile : public WritableFile { map_size_ *= 2; } } + return result; } bool MapNewRegion() { @@ -181,8 +189,10 @@ class PosixMmapFile : public WritableFile { assert(dst_ <= limit_); size_t avail = limit_ - dst_; if (avail == 0) { - UnmapCurrentRegion(); - MapNewRegion(); + if (!UnmapCurrentRegion() || + !MapNewRegion()) { + return IOError(filename_, errno); + } } size_t n = (left <= avail) ? left : avail; @@ -197,17 +207,18 @@ class PosixMmapFile : public WritableFile { virtual Status Close() { Status s; size_t unused = limit_ - dst_; - UnmapCurrentRegion(); - if (unused > 0) { + if (!UnmapCurrentRegion()) { + s = IOError(filename_, errno); + } else if (unused > 0) { // Trim the extra space at the end of the file if (ftruncate(fd_, file_offset_ - unused) < 0) { - s = Status::IOError(filename_, strerror(errno)); + s = IOError(filename_, errno); } } if (close(fd_) < 0) { if (s.ok()) { - s = Status::IOError(filename_, strerror(errno)); + s = IOError(filename_, errno); } } @@ -228,7 +239,7 @@ class PosixMmapFile : public WritableFile { // Some unmapped data was not synced pending_sync_ = false; if (fdatasync(fd_) < 0) { - s = Status::IOError(filename_, strerror(errno)); + s = IOError(filename_, errno); } } @@ -239,7 +250,7 @@ class PosixMmapFile : public WritableFile { size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1); last_sync_ = dst_; if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) { - s = Status::IOError(filename_, strerror(errno)); + s = IOError(filename_, errno); } } @@ -276,7 +287,7 @@ class PosixEnv : public Env { FILE* f = fopen(fname.c_str(), "r"); if (f == NULL) { *result = NULL; - return Status::IOError(fname, strerror(errno)); + return IOError(fname, errno); } else { *result = new PosixSequentialFile(fname, f); return Status::OK(); @@ -288,7 +299,7 @@ class PosixEnv : public Env { int fd = open(fname.c_str(), O_RDONLY); if (fd < 0) { *result = NULL; - return Status::IOError(fname, strerror(errno)); + return IOError(fname, errno); } *result = new PosixRandomAccessFile(fname, fd); return Status::OK(); @@ -300,7 +311,7 @@ class PosixEnv : public Env { const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); if (fd < 0) { *result = NULL; - s = Status::IOError(fname, strerror(errno)); + s = IOError(fname, errno); } else { *result = new PosixMmapFile(fname, fd, page_size_); } @@ -316,7 +327,7 @@ class PosixEnv : public Env { result->clear(); DIR* d = opendir(dir.c_str()); if (d == NULL) { - return Status::IOError(dir, strerror(errno)); + return IOError(dir, errno); } struct dirent* entry; while ((entry = readdir(d)) != NULL) { @@ -329,7 +340,7 @@ class PosixEnv : public Env { virtual Status DeleteFile(const std::string& fname) { Status result; if (unlink(fname.c_str()) != 0) { - result = Status::IOError(fname, strerror(errno)); + result = IOError(fname, errno); } return result; }; @@ -337,7 +348,7 @@ class PosixEnv : public Env { virtual Status CreateDir(const std::string& name) { Status result; if (mkdir(name.c_str(), 0755) != 0) { - result = Status::IOError(name, strerror(errno)); + result = IOError(name, errno); } return result; }; @@ -345,7 +356,7 @@ class PosixEnv : public Env { virtual Status DeleteDir(const std::string& name) { Status result; if (rmdir(name.c_str()) != 0) { - result = Status::IOError(name, strerror(errno)); + result = IOError(name, errno); } return result; }; @@ -355,7 +366,7 @@ class PosixEnv : public Env { struct stat sbuf; if (stat(fname.c_str(), &sbuf) != 0) { *size = 0; - s = Status::IOError(fname, strerror(errno)); + s = IOError(fname, errno); } else { *size = sbuf.st_size; } @@ -365,7 +376,7 @@ class PosixEnv : public Env { virtual Status RenameFile(const std::string& src, const std::string& target) { Status result; if (rename(src.c_str(), target.c_str()) != 0) { - result = Status::IOError(src, strerror(errno)); + result = IOError(src, errno); } return result; } @@ -375,9 +386,9 @@ class PosixEnv : public Env { Status result; int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); if (fd < 0) { - result = Status::IOError(fname, strerror(errno)); + result = IOError(fname, errno); } else if (LockOrUnlock(fd, true) == -1) { - result = Status::IOError("lock " + fname, strerror(errno)); + result = IOError("lock " + fname, errno); close(fd); } else { PosixFileLock* my_lock = new PosixFileLock; @@ -391,7 +402,7 @@ class PosixEnv : public Env { PosixFileLock* my_lock = reinterpret_cast(lock); Status result; if (LockOrUnlock(my_lock->fd_, false) == -1) { - result = Status::IOError(strerror(errno)); + result = IOError("unlock", errno); } close(my_lock->fd_); delete my_lock;