diff --git a/db/log_reader.cc b/db/log_reader.cc
index e44b66c..6d4a5b2 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -25,7 +25,8 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
       eof_(false),
       last_record_offset_(0),
       end_of_buffer_offset_(0),
-      initial_offset_(initial_offset) {
+      initial_offset_(initial_offset),
+      resyncing_(initial_offset > 0) {
 }
 
 Reader::~Reader() {
@@ -74,6 +75,17 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
   while (true) {
     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
     const unsigned int record_type = ReadPhysicalRecord(&fragment);
+    if (resyncing_) {
+      if (record_type == kMiddleType) {
+        continue;
+      } else if (record_type == kLastType) {
+        resyncing_ = false;
+        continue;
+      } else {
+        resyncing_ = false;
+      }
+    }
+
     switch (record_type) {
       case kFullType:
         if (in_fragmented_record) {
diff --git a/db/log_reader.h b/db/log_reader.h
index 6aff791..8389d61 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -73,6 +73,11 @@ class Reader {
   // Offset at which to start looking for the first record to return
   uint64_t const initial_offset_;
 
+  // True if we are resynchronizing after a seek (initial_offset_ > 0). In
+  // particular, a run of kMiddleType and kLastType records can be silently
+  // skipped in this mode
+  bool resyncing_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,
diff --git a/db/log_test.cc b/db/log_test.cc
index 11e4a47..7b08c26 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -79,7 +79,7 @@ class LogTest {
     virtual Status Skip(uint64_t n) {
       if (n > contents_.size()) {
         contents_.clear();
-        return Status::NotFound("in-memory file skipepd past end");
+        return Status::NotFound("in-memory file skipped past end");
       }
 
       contents_.remove_prefix(n);
@@ -105,7 +105,7 @@ class LogTest {
   ReportCollector report_;
   bool reading_;
   Writer* writer_;
-  Reader reader_;
+  Reader* reader_;
 
   // Record metadata for testing initial offset functionality
   static size_t initial_offset_record_sizes_[];
@@ -114,12 +114,13 @@ class LogTest {
  public:
   LogTest() : reading_(false),
               writer_(new Writer(&dest_)),
-              reader_(&source_, &report_, true/*checksum*/,
-                      0/*initial_offset*/) {
+              reader_(new Reader(&source_, &report_, true/*checksum*/,
+                                 0/*initial_offset*/)) {
   }
 
   ~LogTest() {
     delete writer_;
+    delete reader_;
   }
 
   void ReopenForAppend() {
@@ -198,6 +199,11 @@ class LogTest {
     }
   }
 
+  void StartReadingAt(uint64_t initial_offset) {
+    delete reader_;
+    reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
+  }
+
   void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
     WriteInitialOffsetLog();
     reading_ = true;
@@ -227,7 +233,6 @@ class LogTest {
     ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
     delete offset_reader;
   }
-
 };
 
 size_t LogTest::initial_offset_record_sizes_[] =
@@ -463,6 +468,22 @@ TEST(LogTest, PartialLastIsIgnored) {
   ASSERT_EQ(0, DroppedBytes());
 }
 
+TEST(LogTest, SkipIntoMultiRecord) {
+  // Consider a fragmented record:
+  //    first(R1), middle(R1), last(R1), first(R2)
+  // If initial_offset points to a record after first(R1) but before first(R2)
+  // incomplete fragment errors are not actual errors, and must be suppressed
+  // until a new first or full record is encountered.
+  Write(BigString("foo", 3*kBlockSize));
+  Write("correct");
+  StartReadingAt(kBlockSize);
+
+  ASSERT_EQ("correct", Read());
+  ASSERT_EQ("", ReportMessage());
+  ASSERT_EQ(0, DroppedBytes());
+  ASSERT_EQ("EOF", Read());
+}
+
 TEST(LogTest, ErrorJoinsRecords) {
   // Consider two fragmented records:
   //    first(R1) last(R1) first(R2) last(R2)