|
|
@ -36,6 +36,16 @@ namespace { |
|
|
|
// numbers, deletion markers, overwrites, etc.
|
|
|
|
class DBIter: public Iterator { |
|
|
|
public: |
|
|
|
// Which direction is the iterator currently moving?
|
|
|
|
// (1) When moving forward, the internal iterator is positioned at
|
|
|
|
// the exact entry that yields this->key(), this->value()
|
|
|
|
// (2) When moving backwards, the internal iterator is positioned
|
|
|
|
// just before all entries whose user key == this->key().
|
|
|
|
enum Direction { |
|
|
|
kForward, |
|
|
|
kReverse |
|
|
|
}; |
|
|
|
|
|
|
|
DBIter(const std::string* dbname, Env* env, |
|
|
|
const Comparator* cmp, Iterator* iter, SequenceNumber s) |
|
|
|
: dbname_(dbname), |
|
|
@ -44,6 +54,7 @@ class DBIter: public Iterator { |
|
|
|
iter_(iter), |
|
|
|
sequence_(s), |
|
|
|
large_(NULL), |
|
|
|
direction_(kForward), |
|
|
|
valid_(false) { |
|
|
|
} |
|
|
|
virtual ~DBIter() { |
|
|
@ -53,48 +64,21 @@ class DBIter: public Iterator { |
|
|
|
virtual bool Valid() const { return valid_; } |
|
|
|
virtual Slice key() const { |
|
|
|
assert(valid_); |
|
|
|
return key_; |
|
|
|
return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_; |
|
|
|
} |
|
|
|
virtual Slice value() const { |
|
|
|
assert(valid_); |
|
|
|
Slice raw_value = (direction_ == kForward) ? iter_->value() : saved_value_; |
|
|
|
if (large_ == NULL) { |
|
|
|
return value_; |
|
|
|
return raw_value; |
|
|
|
} else { |
|
|
|
MutexLock l(&large_->mutex); |
|
|
|
if (!large_->produced) { |
|
|
|
ReadIndirectValue(); |
|
|
|
ReadIndirectValue(raw_value); |
|
|
|
} |
|
|
|
return large_->value; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
virtual void Next() { |
|
|
|
assert(valid_); |
|
|
|
// iter_ is already positioned past DBIter::key()
|
|
|
|
FindNextUserEntry(); |
|
|
|
} |
|
|
|
|
|
|
|
virtual void Prev() { |
|
|
|
assert(valid_); |
|
|
|
bool ignored; |
|
|
|
ScanUntilBeforeCurrentKey(&ignored); |
|
|
|
FindPrevUserEntry(); |
|
|
|
} |
|
|
|
|
|
|
|
virtual void Seek(const Slice& target) { |
|
|
|
ParsedInternalKey ikey(target, sequence_, kValueTypeForSeek); |
|
|
|
std::string tmp; |
|
|
|
AppendInternalKey(&tmp, ikey); |
|
|
|
iter_->Seek(tmp); |
|
|
|
FindNextUserEntry(); |
|
|
|
} |
|
|
|
virtual void SeekToFirst() { |
|
|
|
iter_->SeekToFirst(); |
|
|
|
FindNextUserEntry(); |
|
|
|
} |
|
|
|
|
|
|
|
virtual void SeekToLast(); |
|
|
|
|
|
|
|
virtual Status status() const { |
|
|
|
if (status_.ok()) { |
|
|
|
if (large_ != NULL && !large_->status.ok()) return large_->status; |
|
|
@ -104,23 +88,13 @@ class DBIter: public Iterator { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private: |
|
|
|
void FindNextUserEntry(); |
|
|
|
void FindPrevUserEntry(); |
|
|
|
void SaveKey(const Slice& k) { key_.assign(k.data(), k.size()); } |
|
|
|
void SaveValue(const Slice& v) { |
|
|
|
if (value_.capacity() > v.size() + 1048576) { |
|
|
|
std::string empty; |
|
|
|
swap(empty, value_); |
|
|
|
} |
|
|
|
value_.assign(v.data(), v.size()); |
|
|
|
} |
|
|
|
bool ParseKey(ParsedInternalKey* key); |
|
|
|
void SkipPast(const Slice& k); |
|
|
|
void ScanUntilBeforeCurrentKey(bool* found_live); |
|
|
|
|
|
|
|
void ReadIndirectValue() const; |
|
|
|
virtual void Next(); |
|
|
|
virtual void Prev(); |
|
|
|
virtual void Seek(const Slice& target); |
|
|
|
virtual void SeekToFirst(); |
|
|
|
virtual void SeekToLast(); |
|
|
|
|
|
|
|
private: |
|
|
|
struct Large { |
|
|
|
port::Mutex mutex; |
|
|
|
std::string value; |
|
|
@ -128,19 +102,42 @@ class DBIter: public Iterator { |
|
|
|
Status status; |
|
|
|
}; |
|
|
|
|
|
|
|
void FindNextUserEntry(bool skipping, std::string* skip); |
|
|
|
void FindPrevUserEntry(); |
|
|
|
bool ParseKey(ParsedInternalKey* key); |
|
|
|
void ReadIndirectValue(Slice ref) const; |
|
|
|
|
|
|
|
inline void SaveKey(const Slice& k, std::string* dst) { |
|
|
|
dst->assign(k.data(), k.size()); |
|
|
|
} |
|
|
|
|
|
|
|
inline void ForgetLargeValue() { |
|
|
|
if (large_ != NULL) { |
|
|
|
delete large_; |
|
|
|
large_ = NULL; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
inline void ClearSavedValue() { |
|
|
|
if (saved_value_.capacity() > 1048576) { |
|
|
|
std::string empty; |
|
|
|
swap(empty, saved_value_); |
|
|
|
} else { |
|
|
|
saved_value_.clear(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
const std::string* const dbname_; |
|
|
|
Env* const env_; |
|
|
|
|
|
|
|
const Comparator* const user_comparator_; |
|
|
|
|
|
|
|
// iter_ is positioned just past current entry for DBIter if valid_
|
|
|
|
Iterator* const iter_; |
|
|
|
|
|
|
|
SequenceNumber const sequence_; |
|
|
|
|
|
|
|
Status status_; |
|
|
|
std::string key_; // Always a user key
|
|
|
|
std::string value_; |
|
|
|
Large* large_; // Non-NULL if value is an indirect reference
|
|
|
|
std::string saved_key_; // == current key when direction_==kReverse
|
|
|
|
std::string saved_value_; // == current raw value when direction_==kReverse
|
|
|
|
Large* large_; // Non-NULL if value is an indirect reference
|
|
|
|
Direction direction_; |
|
|
|
bool valid_; |
|
|
|
|
|
|
|
// No copying allowed
|
|
|
@ -157,204 +154,189 @@ inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void DBIter::FindNextUserEntry() { |
|
|
|
if (large_ != NULL) { |
|
|
|
if (status_.ok() && !large_->status.ok()) { |
|
|
|
status_ = large_->status; |
|
|
|
} |
|
|
|
delete large_; |
|
|
|
large_ = NULL; |
|
|
|
} |
|
|
|
while (iter_->Valid()) { |
|
|
|
ParsedInternalKey ikey; |
|
|
|
if (!ParseKey(&ikey)) { |
|
|
|
// Skip past corrupted entry
|
|
|
|
iter_->Next(); |
|
|
|
continue; |
|
|
|
} |
|
|
|
if (ikey.sequence > sequence_) { |
|
|
|
// Ignore entries newer than the snapshot
|
|
|
|
void DBIter::Next() { |
|
|
|
assert(valid_); |
|
|
|
ForgetLargeValue(); |
|
|
|
|
|
|
|
if (direction_ == kReverse) { // Switch directions?
|
|
|
|
direction_ = kForward; |
|
|
|
// iter_ is pointing just before the entries for this->key(),
|
|
|
|
// so advance into the range of entries for this->key() and then
|
|
|
|
// use the normal skipping code below.
|
|
|
|
if (!iter_->Valid()) { |
|
|
|
iter_->SeekToFirst(); |
|
|
|
} else { |
|
|
|
iter_->Next(); |
|
|
|
continue; |
|
|
|
} |
|
|
|
|
|
|
|
switch (ikey.type) { |
|
|
|
case kTypeDeletion: |
|
|
|
SaveKey(ikey.user_key); // Make local copy for use by SkipPast()
|
|
|
|
iter_->Next(); |
|
|
|
SkipPast(key_); |
|
|
|
// Do not return deleted entries. Instead keep looping.
|
|
|
|
break; |
|
|
|
|
|
|
|
case kTypeValue: |
|
|
|
SaveKey(ikey.user_key); |
|
|
|
SaveValue(iter_->value()); |
|
|
|
iter_->Next(); |
|
|
|
SkipPast(key_); |
|
|
|
// Yield the value we just found.
|
|
|
|
valid_ = true; |
|
|
|
return; |
|
|
|
|
|
|
|
case kTypeLargeValueRef: |
|
|
|
SaveKey(ikey.user_key); |
|
|
|
// Save the large value ref as value_, and read it lazily on a call
|
|
|
|
// to value()
|
|
|
|
SaveValue(iter_->value()); |
|
|
|
large_ = new Large; |
|
|
|
large_->produced = false; |
|
|
|
iter_->Next(); |
|
|
|
SkipPast(key_); |
|
|
|
// Yield the value we just found.
|
|
|
|
valid_ = true; |
|
|
|
return; |
|
|
|
if (!iter_->Valid()) { |
|
|
|
valid_ = false; |
|
|
|
saved_key_.clear(); |
|
|
|
return; |
|
|
|
} |
|
|
|
} |
|
|
|
valid_ = false; |
|
|
|
key_.clear(); |
|
|
|
value_.clear(); |
|
|
|
assert(large_ == NULL); |
|
|
|
|
|
|
|
// Temporarily use saved_key_ as storage for key to skip.
|
|
|
|
std::string* skip = &saved_key_; |
|
|
|
SaveKey(ExtractUserKey(iter_->key()), skip); |
|
|
|
FindNextUserEntry(true, skip); |
|
|
|
} |
|
|
|
|
|
|
|
void DBIter::SkipPast(const Slice& k) { |
|
|
|
while (iter_->Valid()) { |
|
|
|
void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { |
|
|
|
// Loop until we hit an acceptable entry to yield
|
|
|
|
assert(iter_->Valid()); |
|
|
|
assert(direction_ == kForward); |
|
|
|
assert(large_ == NULL); |
|
|
|
do { |
|
|
|
ParsedInternalKey ikey; |
|
|
|
// Note that if we cannot parse an internal key, we keep looping
|
|
|
|
// so that if we have a run like the following:
|
|
|
|
// <x,100,v> => value100
|
|
|
|
// <corrupted entry for user key x>
|
|
|
|
// <x,50,v> => value50
|
|
|
|
// we will skip over the corrupted entry as well as value50.
|
|
|
|
if (ParseKey(&ikey) && user_comparator_->Compare(ikey.user_key, k) != 0) { |
|
|
|
break; |
|
|
|
if (ParseKey(&ikey) && ikey.sequence <= sequence_) { |
|
|
|
switch (ikey.type) { |
|
|
|
case kTypeDeletion: |
|
|
|
// Arrange to skip all upcoming entries for this key since
|
|
|
|
// they are hidden by this deletion.
|
|
|
|
SaveKey(ikey.user_key, skip); |
|
|
|
skipping = true; |
|
|
|
break; |
|
|
|
case kTypeValue: |
|
|
|
case kTypeLargeValueRef: |
|
|
|
if (skipping && |
|
|
|
user_comparator_->Compare(ikey.user_key, *skip) <= 0) { |
|
|
|
// Entry hidden
|
|
|
|
} else { |
|
|
|
valid_ = true; |
|
|
|
saved_key_.clear(); |
|
|
|
if (ikey.type == kTypeLargeValueRef) { |
|
|
|
large_ = new Large; |
|
|
|
large_->produced = false; |
|
|
|
} |
|
|
|
return; |
|
|
|
} |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
iter_->Next(); |
|
|
|
} |
|
|
|
} while (iter_->Valid()); |
|
|
|
saved_key_.clear(); |
|
|
|
valid_ = false; |
|
|
|
} |
|
|
|
|
|
|
|
void DBIter::SeekToLast() { |
|
|
|
// Position iter_ at the last uncorrupted user key and then
|
|
|
|
// let FindPrevUserEntry() do the heavy lifting to find
|
|
|
|
// a user key that is live.
|
|
|
|
iter_->SeekToLast(); |
|
|
|
ParsedInternalKey current; |
|
|
|
while (iter_->Valid() && !ParseKey(¤t)) { |
|
|
|
iter_->Prev(); |
|
|
|
} |
|
|
|
if (iter_->Valid()) { |
|
|
|
SaveKey(current.user_key); |
|
|
|
} |
|
|
|
FindPrevUserEntry(); |
|
|
|
} |
|
|
|
void DBIter::Prev() { |
|
|
|
assert(valid_); |
|
|
|
ForgetLargeValue(); |
|
|
|
|
|
|
|
// Let X be the user key at which iter_ is currently positioned.
|
|
|
|
// Adjust DBIter to point at the last entry with a key <= X that
|
|
|
|
// has a live value.
|
|
|
|
void DBIter::FindPrevUserEntry() { |
|
|
|
// Consider the following example:
|
|
|
|
//
|
|
|
|
// A@540
|
|
|
|
// A@400
|
|
|
|
//
|
|
|
|
// B@300
|
|
|
|
// B@200
|
|
|
|
// B@100 <- iter_
|
|
|
|
//
|
|
|
|
// C@301
|
|
|
|
// C@201
|
|
|
|
//
|
|
|
|
// The comments marked "(first iteration)" below relate what happens
|
|
|
|
// for the preceding example in the first iteration of the while loop
|
|
|
|
// below. There may be more than one iteration either if there are
|
|
|
|
// no live values for B, or if there is a corruption.
|
|
|
|
while (iter_->Valid()) { |
|
|
|
std::string saved = key_; |
|
|
|
bool found_live; |
|
|
|
ScanUntilBeforeCurrentKey(&found_live); |
|
|
|
// (first iteration) iter_ at A@400
|
|
|
|
if (found_live) { |
|
|
|
// Step forward into range of entries with user key >= saved
|
|
|
|
if (direction_ == kForward) { // Switch directions?
|
|
|
|
// iter_ is pointing at the current entry. Scan backwards until
|
|
|
|
// the key changes so we can use the normal reverse scanning code.
|
|
|
|
assert(iter_->Valid()); // Otherwise valid_ would have been false
|
|
|
|
SaveKey(ExtractUserKey(iter_->key()), &saved_key_); |
|
|
|
while (true) { |
|
|
|
iter_->Prev(); |
|
|
|
if (!iter_->Valid()) { |
|
|
|
iter_->SeekToFirst(); |
|
|
|
} else { |
|
|
|
iter_->Next(); |
|
|
|
} |
|
|
|
// (first iteration) iter_ at B@300
|
|
|
|
|
|
|
|
FindNextUserEntry(); // Sets key_ to the key of the next value it found
|
|
|
|
if (valid_ && user_comparator_->Compare(key_, saved) == 0) { |
|
|
|
// (first iteration) iter_ at C@301
|
|
|
|
valid_ = false; |
|
|
|
saved_key_.clear(); |
|
|
|
ClearSavedValue(); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
// FindNextUserEntry() could not find any entries under the
|
|
|
|
// user key "saved". This is probably a corruption since
|
|
|
|
// ScanUntilBefore(saved) found a live value. So we skip
|
|
|
|
// backwards to an earlier key and ignore the corrupted
|
|
|
|
// entries for "saved".
|
|
|
|
//
|
|
|
|
// (first iteration) iter_ at C@301 and saved == "B"
|
|
|
|
key_ = saved; |
|
|
|
bool ignored; |
|
|
|
ScanUntilBeforeCurrentKey(&ignored); |
|
|
|
// (first iteration) iter_ at A@400
|
|
|
|
if (user_comparator_->Compare(ExtractUserKey(iter_->key()), |
|
|
|
saved_key_) < 0) { |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
direction_ = kReverse; |
|
|
|
} |
|
|
|
valid_ = false; |
|
|
|
key_.clear(); |
|
|
|
value_.clear(); |
|
|
|
|
|
|
|
FindPrevUserEntry(); |
|
|
|
} |
|
|
|
|
|
|
|
void DBIter::ScanUntilBeforeCurrentKey(bool* found_live) { |
|
|
|
*found_live = false; |
|
|
|
if (!iter_->Valid()) { |
|
|
|
iter_->SeekToLast(); |
|
|
|
} |
|
|
|
void DBIter::FindPrevUserEntry() { |
|
|
|
assert(direction_ == kReverse); |
|
|
|
assert(large_ == NULL); |
|
|
|
|
|
|
|
while (iter_->Valid()) { |
|
|
|
ParsedInternalKey current; |
|
|
|
if (!ParseKey(¤t)) { |
|
|
|
ValueType value_type = kTypeDeletion; |
|
|
|
if (iter_->Valid()) { |
|
|
|
SaveKey(ExtractUserKey(iter_->key()), &saved_key_); |
|
|
|
do { |
|
|
|
ParsedInternalKey ikey; |
|
|
|
if (ParseKey(&ikey) && ikey.sequence <= sequence_) { |
|
|
|
if ((value_type != kTypeDeletion) && |
|
|
|
user_comparator_->Compare(ikey.user_key, saved_key_) < 0) { |
|
|
|
// We encountered a non-deleted value in entries for previous keys,
|
|
|
|
break; |
|
|
|
} |
|
|
|
value_type = ikey.type; |
|
|
|
if (value_type == kTypeDeletion) { |
|
|
|
ClearSavedValue(); |
|
|
|
} else { |
|
|
|
Slice raw_value = iter_->value(); |
|
|
|
if (saved_value_.capacity() > raw_value.size() + 1048576) { |
|
|
|
std::string empty; |
|
|
|
swap(empty, saved_value_); |
|
|
|
} |
|
|
|
saved_value_.assign(raw_value.data(), raw_value.size()); |
|
|
|
} |
|
|
|
} |
|
|
|
iter_->Prev(); |
|
|
|
continue; |
|
|
|
} |
|
|
|
} while (iter_->Valid()); |
|
|
|
} |
|
|
|
|
|
|
|
if (current.sequence > sequence_) { |
|
|
|
// Ignore entries that are serialized after this read
|
|
|
|
iter_->Prev(); |
|
|
|
continue; |
|
|
|
if (value_type == kTypeDeletion) { |
|
|
|
// End
|
|
|
|
valid_ = false; |
|
|
|
saved_key_.clear(); |
|
|
|
ClearSavedValue(); |
|
|
|
direction_ = kForward; |
|
|
|
} else { |
|
|
|
valid_ = true; |
|
|
|
if (value_type == kTypeLargeValueRef) { |
|
|
|
large_ = new Large; |
|
|
|
large_->produced = false; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
const int cmp = user_comparator_->Compare(current.user_key, key_); |
|
|
|
if (cmp < 0) { |
|
|
|
SaveKey(current.user_key); |
|
|
|
return; |
|
|
|
} else if (cmp == 0) { |
|
|
|
switch (current.type) { |
|
|
|
case kTypeDeletion: |
|
|
|
*found_live = false; |
|
|
|
break; |
|
|
|
|
|
|
|
case kTypeValue: |
|
|
|
case kTypeLargeValueRef: |
|
|
|
*found_live = true; |
|
|
|
break; |
|
|
|
} |
|
|
|
} else { // cmp > 0
|
|
|
|
*found_live = false; |
|
|
|
} |
|
|
|
void DBIter::Seek(const Slice& target) { |
|
|
|
direction_ = kForward; |
|
|
|
ForgetLargeValue(); |
|
|
|
ClearSavedValue(); |
|
|
|
saved_key_.clear(); |
|
|
|
AppendInternalKey( |
|
|
|
&saved_key_, ParsedInternalKey(target, sequence_, kValueTypeForSeek)); |
|
|
|
iter_->Seek(saved_key_); |
|
|
|
if (iter_->Valid()) { |
|
|
|
FindNextUserEntry(false, &saved_key_ /* temporary storage */); |
|
|
|
} else { |
|
|
|
valid_ = false; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
iter_->Prev(); |
|
|
|
void DBIter::SeekToFirst() { |
|
|
|
direction_ = kForward; |
|
|
|
ForgetLargeValue(); |
|
|
|
ClearSavedValue(); |
|
|
|
iter_->SeekToFirst(); |
|
|
|
if (iter_->Valid()) { |
|
|
|
FindNextUserEntry(false, &saved_key_ /* temporary storage */); |
|
|
|
} else { |
|
|
|
valid_ = false; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void DBIter::ReadIndirectValue() const { |
|
|
|
void DBIter::SeekToLast() { |
|
|
|
direction_ = kReverse; |
|
|
|
ForgetLargeValue(); |
|
|
|
ClearSavedValue(); |
|
|
|
iter_->SeekToLast(); |
|
|
|
FindPrevUserEntry(); |
|
|
|
} |
|
|
|
|
|
|
|
void DBIter::ReadIndirectValue(Slice ref) const { |
|
|
|
assert(!large_->produced); |
|
|
|
large_->produced = true; |
|
|
|
LargeValueRef large_ref; |
|
|
|
if (value_.size() != LargeValueRef::ByteSize()) { |
|
|
|
if (ref.size() != LargeValueRef::ByteSize()) { |
|
|
|
large_->status = Status::Corruption("malformed large value reference"); |
|
|
|
return; |
|
|
|
} |
|
|
|
memcpy(large_ref.data, value_.data(), LargeValueRef::ByteSize()); |
|
|
|
memcpy(large_ref.data, ref.data(), LargeValueRef::ByteSize()); |
|
|
|
std::string fname = LargeValueFileName(*dbname_, large_ref); |
|
|
|
RandomAccessFile* file; |
|
|
|
Status s = env_->NewRandomAccessFile(fname, &file); |
|
|
|