diff --git a/db/corruption_test.cc b/db/corruption_test.cc index a59ab0e..1f4f26c 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -13,6 +13,7 @@ #include "include/write_batch.h" #include "db/db_impl.h" #include "db/filename.h" +#include "db/log_format.h" #include "db/version_set.h" #include "util/logging.h" #include "util/testharness.h" @@ -128,17 +129,17 @@ class CorruptionTest { std::string fname = candidates[rnd_.Uniform(candidates.size())]; struct stat sbuf; - if (stat(fname.c_str(), &sbuf) != 0) { - const char* msg = strerror(errno); - ASSERT_TRUE(false) << fname << ": " << msg; - } - - if (offset < 0) { - // Relative to end of file; make it absolute - if (-offset > sbuf.st_size) { - offset = 0; - } else { - offset = sbuf.st_size + offset; + if (stat(fname.c_str(), &sbuf) != 0) { + const char* msg = strerror(errno); + ASSERT_TRUE(false) << fname << ": " << msg; + } + + if (offset < 0) { + // Relative to end of file; make it absolute + if (-offset > sbuf.st_size) { + offset = 0; + } else { + offset = sbuf.st_size + offset; } } if (offset > sbuf.st_size) { @@ -183,12 +184,14 @@ class CorruptionTest { }; TEST(CorruptionTest, Recovery) { - Build(10); - Check(10, 10); + Build(100); + Check(100, 100); Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record - Corrupt(kLogFile, 2*kValueSize, 1); // Somewhere in second log record? + Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block Reopen(); - Check(8, 8); + + // The 64 records in the first two log blocks are completely lost. + Check(36, 36); } TEST(CorruptionTest, RecoverWriteError) { diff --git a/db/db_bench.cc b/db/db_bench.cc index 4ccdd5a..db8deea 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -35,9 +35,11 @@ static const char* FLAGS_benchmarks = "writerandom," "sync,tenth,tenth,writerandom,nosync,normal," "readseq," + "readreverse," "readrandom," "compact," "readseq," + "readreverse," "readrandom," "writebig"; @@ -167,7 +169,7 @@ class Benchmark { message_.append(rate); } - fprintf(stdout, "%-12s : %10.3f micros/op;%s%s\n", + fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n", name.ToString().c_str(), (finish - start_) * 1e6 / done_, (message_.empty() ? "" : " "), @@ -179,7 +181,11 @@ class Benchmark { } public: - enum Order { SEQUENTIAL, RANDOM }; + enum Order { + SEQUENTIAL, + REVERSE, // Currently only supported for reads + RANDOM + }; Benchmark() : cache_(NewLRUCache(200<<20)), db_(NULL), @@ -239,6 +245,8 @@ class Benchmark { Write(RANDOM, num_ / 1000, 100 * 1000); } else if (name == Slice("readseq")) { Read(SEQUENTIAL); + } else if (name == Slice("readreverse")) { + Read(REVERSE); } else if (name == Slice("readrandom")) { Read(RANDOM); } else if (name == Slice("compact")) { @@ -284,23 +292,39 @@ class Benchmark { void Read(Order order) { ReadOptions options; - if (order == SEQUENTIAL) { - Iterator* iter = db_->NewIterator(options); - int i = 0; - for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) { - bytes_ += iter->key().size() + iter->value().size(); - FinishedSingleOp(); - ++i; + switch (order) { + case SEQUENTIAL: { + Iterator* iter = db_->NewIterator(options); + int i = 0; + for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) { + bytes_ += iter->key().size() + iter->value().size(); + FinishedSingleOp(); + ++i; + } + delete iter; + break; } - delete iter; - } else { - std::string value; - for (int i = 0; i < num_; i++) { - char key[100]; - const int k = (order == SEQUENTIAL) ? i : (rand_.Next() % FLAGS_num); - snprintf(key, sizeof(key), "%012d", k); - db_->Get(options, key, &value); - FinishedSingleOp(); + case REVERSE: { + Iterator* iter = db_->NewIterator(options); + int i = 0; + for (iter->SeekToLast(); i < num_ && iter->Valid(); iter->Prev()) { + bytes_ += iter->key().size() + iter->value().size(); + FinishedSingleOp(); + ++i; + } + delete iter; + break; + } + case RANDOM: { + std::string value; + for (int i = 0; i < num_; i++) { + char key[100]; + const int k = (order == SEQUENTIAL) ? i : (rand_.Next() % FLAGS_num); + snprintf(key, sizeof(key), "%012d", k); + db_->Get(options, key, &value); + FinishedSingleOp(); + } + break; } } } diff --git a/db/log_reader.cc b/db/log_reader.cc index 243bd2c..39a6d2b 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -148,16 +148,22 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result) { // Check crc if (checksum_) { if (type == kZeroType && length == 0) { - // Skip zero length record - buffer_.remove_prefix(kHeaderSize + length); + // Skip zero length record without reporting any drops since + // such records are produced by the mmap based writing code in + // env_posix.cc that preallocates file regions. + buffer_.clear(); return kBadRecord; } uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); uint32_t actual_crc = crc32c::Value(header + 6, 1 + length); if (actual_crc != expected_crc) { - ReportDrop(length, "checksum mismatch"); - buffer_.remove_prefix(kHeaderSize + length); + // Drop the rest of the buffer since "length" itself may have + // been corrupted and if we trust it, we could find some + // fragment of a real log record that just happens to look + // like a valid log record. + ReportDrop(buffer_.size(), "checksum mismatch"); + buffer_.clear(); return kBadRecord; } } diff --git a/db/log_test.cc b/db/log_test.cc index 8c1915d..5fa20aa 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -286,7 +286,7 @@ TEST(LogTest, ChecksumMismatch) { Write("foo"); IncrementByte(0, 10); ASSERT_EQ("EOF", Read()); - ASSERT_EQ(3, DroppedBytes()); + ASSERT_EQ(10, DroppedBytes()); ASSERT_EQ("OK", MatchError("checksum mismatch")); } diff --git a/table/merger.cc b/table/merger.cc index 74c1aaa..afa8b77 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -17,7 +17,8 @@ class MergingIterator : public Iterator { : comparator_(comparator), children_(new IteratorWrapper[n]), n_(n), - current_(NULL) { + current_(NULL), + direction_(kForward) { for (int i = 0; i < n; i++) { children_[i].Set(children[i]); } @@ -36,6 +37,7 @@ class MergingIterator : public Iterator { children_[i].SeekToFirst(); } FindSmallest(); + direction_ = kForward; } virtual void SeekToLast() { @@ -43,6 +45,7 @@ class MergingIterator : public Iterator { children_[i].SeekToLast(); } FindLargest(); + direction_ = kReverse; } virtual void Seek(const Slice& target) { @@ -50,16 +53,60 @@ class MergingIterator : public Iterator { children_[i].Seek(target); } FindSmallest(); + direction_ = kForward; } virtual void Next() { assert(Valid()); + + // Ensure that all children are positioned after key(). + // If we are moving in the forward direction, it is already + // true for all of the non-current_ children since current_ is + // the smallest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. + if (direction_ != kForward) { + for (int i = 0; i < n_; i++) { + IteratorWrapper* child = &children_[i]; + if (child != current_) { + child->Seek(key()); + if (child->Valid() && + comparator_->Compare(key(), child->key()) == 0) { + child->Next(); + } + } + } + direction_ = kForward; + } + current_->Next(); FindSmallest(); } virtual void Prev() { assert(Valid()); + + // Ensure that all children are positioned before key(). + // If we are moving in the reverse direction, it is already + // true for all of the non-current_ children since current_ is + // the largest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. + if (direction_ != kReverse) { + for (int i = 0; i < n_; i++) { + IteratorWrapper* child = &children_[i]; + if (child != current_) { + child->Seek(key()); + if (child->Valid()) { + // Child is at first entry >= key(). Step back one to be < key() + child->Prev(); + } else { + // Child has no entries >= key(). Position at last entry. + child->SeekToLast(); + } + } + } + direction_ = kReverse; + } + current_->Prev(); FindLargest(); } @@ -96,6 +143,13 @@ class MergingIterator : public Iterator { IteratorWrapper* children_; int n_; IteratorWrapper* current_; + + // Which direction is the iterator moving? + enum Direction { + kForward, + kReverse + }; + Direction direction_; }; void MergingIterator::FindSmallest() { diff --git a/table/table_test.cc b/table/table_test.cc index f4bd7c7..d997454 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -169,6 +169,8 @@ class Constructor { virtual const KVMap& data() { return data_; } + virtual DB* db() const { return NULL; } // Overridden in DBConstructor + private: KVMap data_; }; @@ -381,6 +383,8 @@ class DBConstructor: public Constructor { return db_->NewIterator(ReadOptions()); } + virtual DB* db() const { return db_; } + private: void NewDB() { std::string name = test::TmpDir() + "/table_testdb"; @@ -392,6 +396,7 @@ class DBConstructor: public Constructor { options.create_if_missing = true; options.error_if_exists = true; + options.write_buffer_size = 10000; // Something small to force merging status = DB::Open(options, name, &db_); ASSERT_TRUE(status.ok()) << status.ToString(); } @@ -640,6 +645,9 @@ class Harness { } } + // Returns NULL if not running against a DB + DB* db() const { return constructor_->db(); } + private: Options options_; Constructor* constructor_; @@ -704,6 +712,26 @@ TEST(Harness, Randomized) { } } +TEST(Harness, RandomizedLongDB) { + Random rnd(test::RandomSeed()); + TestArgs args = { DB_TEST, false, 16 }; + Init(args); + int num_entries = 100000; + for (int e = 0; e < num_entries; e++) { + std::string v; + Add(test::RandomKey(&rnd, rnd.Skewed(4)), + test::RandomString(&rnd, rnd.Skewed(5), &v).ToString()); + } + Test(&rnd); + + // We must have created enough data to force merging + uint64_t l0_files, l1_files; + ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files)); + ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files)); + ASSERT_GT(l0_files + l1_files, 0); + +} + class MemTableTest { }; TEST(MemTableTest, Simple) { diff --git a/util/env_posix.cc b/util/env_posix.cc index b662f9c..f5174d3 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -422,7 +422,7 @@ class PosixEnv : public Env { virtual void Logv(WritableFile* info_log, const char* format, va_list ap) { pthread_t tid = pthread_self(); uint64_t thread_id = 0; - memcpy(&thread_id, &tid, min(sizeof(thread_id), sizeof(tid))); + memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); // We try twice: the first time with a fixed-size stack allocated buffer, // and the second time with a much larger dynamically allocated buffer.