Pārlūkot izejas kodu

Suppress error reporting after seeking but before a valid First or Full record is encountered.

Fix a spelling mistake.
pull/1/head
Mike Wiacek pirms 9 gadiem
revīziju iesūtīja Chris Mumford
vecāks
revīzija
ce45404bba
3 mainītis faili ar 45 papildinājumiem un 7 dzēšanām
  1. +13
    -1
      db/log_reader.cc
  2. +5
    -0
      db/log_reader.h
  3. +27
    -6
      db/log_test.cc

+ 13
- 1
db/log_reader.cc Parādīt failu

@ -25,7 +25,8 @@ Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,
eof_(false), eof_(false),
last_record_offset_(0), last_record_offset_(0),
end_of_buffer_offset_(0), end_of_buffer_offset_(0),
initial_offset_(initial_offset) {
initial_offset_(initial_offset),
resyncing_(initial_offset > 0) {
} }
Reader::~Reader() { Reader::~Reader() {
@ -74,6 +75,17 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
while (true) { while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
const unsigned int record_type = ReadPhysicalRecord(&fragment); const unsigned int record_type = ReadPhysicalRecord(&fragment);
if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}
switch (record_type) { switch (record_type) {
case kFullType: case kFullType:
if (in_fragmented_record) { if (in_fragmented_record) {

+ 5
- 0
db/log_reader.h Parādīt failu

@ -73,6 +73,11 @@ class Reader {
// Offset at which to start looking for the first record to return // Offset at which to start looking for the first record to return
uint64_t const initial_offset_; uint64_t const initial_offset_;
// True if we are resynchronizing after a seek (initial_offset_ > 0). In
// particular, a run of kMiddleType and kLastType records can be silently
// skipped in this mode
bool resyncing_;
// Extend record types with the following special values // Extend record types with the following special values
enum { enum {
kEof = kMaxRecordType + 1, kEof = kMaxRecordType + 1,

+ 27
- 6
db/log_test.cc Parādīt failu

@ -79,7 +79,7 @@ class LogTest {
virtual Status Skip(uint64_t n) { virtual Status Skip(uint64_t n) {
if (n > contents_.size()) { if (n > contents_.size()) {
contents_.clear(); contents_.clear();
return Status::NotFound("in-memory file skipepd past end");
return Status::NotFound("in-memory file skipped past end");
} }
contents_.remove_prefix(n); contents_.remove_prefix(n);
@ -105,7 +105,7 @@ class LogTest {
ReportCollector report_; ReportCollector report_;
bool reading_; bool reading_;
Writer* writer_; Writer* writer_;
Reader reader_;
Reader* reader_;
// Record metadata for testing initial offset functionality // Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[]; static size_t initial_offset_record_sizes_[];
@ -114,12 +114,13 @@ class LogTest {
public: public:
LogTest() : reading_(false), LogTest() : reading_(false),
writer_(new Writer(&dest_)), writer_(new Writer(&dest_)),
reader_(&source_, &report_, true/*checksum*/,
0/*initial_offset*/) {
reader_(new Reader(&source_, &report_, true/*checksum*/,
0/*initial_offset*/)) {
} }
~LogTest() { ~LogTest() {
delete writer_; delete writer_;
delete reader_;
} }
void ReopenForAppend() { void ReopenForAppend() {
@ -143,7 +144,7 @@ class LogTest {
} }
std::string scratch; std::string scratch;
Slice record; Slice record;
if (reader_.ReadRecord(&record, &scratch)) {
if (reader_->ReadRecord(&record, &scratch)) {
return record.ToString(); return record.ToString();
} else { } else {
return "EOF"; return "EOF";
@ -198,6 +199,11 @@ class LogTest {
} }
} }
void StartReadingAt(uint64_t initial_offset) {
delete reader_;
reader_ = new Reader(&source_, &report_, true/*checksum*/, initial_offset);
}
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
WriteInitialOffsetLog(); WriteInitialOffsetLog();
reading_ = true; reading_ = true;
@ -227,7 +233,6 @@ class LogTest {
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]); ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
delete offset_reader; delete offset_reader;
} }
}; };
size_t LogTest::initial_offset_record_sizes_[] = size_t LogTest::initial_offset_record_sizes_[] =
@ -463,6 +468,22 @@ TEST(LogTest, PartialLastIsIgnored) {
ASSERT_EQ(0, DroppedBytes()); ASSERT_EQ(0, DroppedBytes());
} }
TEST(LogTest, SkipIntoMultiRecord) {
// Consider a fragmented record:
// first(R1), middle(R1), last(R1), first(R2)
// If initial_offset points to a record after first(R1) but before first(R2)
// incomplete fragment errors are not actual errors, and must be suppressed
// until a new first or full record is encountered.
Write(BigString("foo", 3*kBlockSize));
Write("correct");
StartReadingAt(kBlockSize);
ASSERT_EQ("correct", Read());
ASSERT_EQ("", ReportMessage());
ASSERT_EQ(0, DroppedBytes());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, ErrorJoinsRecords) { TEST(LogTest, ErrorJoinsRecords) {
// Consider two fragmented records: // Consider two fragmented records:
// first(R1) last(R1) first(R2) last(R2) // first(R1) last(R1) first(R2) last(R2)

Notiek ielāde…
Atcelt
Saglabāt