瀏覽代碼

fix bug

kvsep_cyq
cyq 8 月之前
父節點
當前提交
3b6087ad23
共有 6 個檔案被更改,包括 64 行新增33 行删除
  1. +20
    -19
      db/db_impl.cc
  2. +2
    -0
      db/db_impl.h
  3. +3
    -0
      db/db_iter.cc
  4. +16
    -3
      db/dbformat.cc
  5. +10
    -7
      db/dbformat.h
  6. +13
    -4
      db/memtable.cc

+ 20
- 19
db/db_impl.cc 查看文件

@ -736,8 +736,8 @@ bool DBImpl::CollectKVLogs() {
total_files += current->NumFiles(i); total_files += current->NumFiles(i);
} }
//这个判断保证了所有被选择回收的kvlog都是不对应于mem、imm、L0文件的 //这个判断保证了所有被选择回收的kvlog都是不对应于mem、imm、L0文件的
if(current->kvlogs_.size() < total_files + 10) {
std::cout << "kvlogs not enough : " << current->kvlogs_.size() << " " << total_files << std::endl;
if(current->kvlogs_.size() < total_files * 20) {
// std::cout << "kvlogs not enough : " << current->kvlogs_.size() << " " << total_files << std::endl;
current->Unref(); current->Unref();
return false; return false;
} }
@ -766,11 +766,7 @@ bool DBImpl::CollectKVLogs() {
Iterator *deep_iter = NewInternalDeepIterator(ro,&latest_sequence,&seed); Iterator *deep_iter = NewInternalDeepIterator(ro,&latest_sequence,&seed);
// Iterator *iter = NewDBIterator(this, user_comparator(), deep_iter, latest_sequence, seed); // Iterator *iter = NewDBIterator(this, user_comparator(), deep_iter, latest_sequence, seed);
uint64_t budget = 10000;
SequenceNumber limit = latest_sequence + budget;
SequenceNumber now_seq = latest_sequence;
// std::cout << limit << " " << latest_sequence << std::endl;
versions_->SetLastSequence(limit);
uint64_t budget = 100;
only_Level0 = true; //保证后续的小合并的结果只会在level0 only_Level0 = true; //保证后续的小合并的结果只会在level0
mutex_.Unlock(); mutex_.Unlock();
@ -783,12 +779,12 @@ bool DBImpl::CollectKVLogs() {
std::set<uint64_t> kvlogs_to_remove; std::set<uint64_t> kvlogs_to_remove;
bool has_collected = false; bool has_collected = false;
for(int i = 0; i < kvlogs.size(); i++) {
for(int i = 0; i < std::min(kvlogs.size(),20ul) && budget > 0; i++) {
FileMetaData *f = kvlogs[i]; FileMetaData *f = kvlogs[i];
SequentialFile *file; SequentialFile *file;
env_->NewSequentialFile(KVLogFileName(dbname_, f->number), &file); env_->NewSequentialFile(KVLogFileName(dbname_, f->number), &file);
KVLogReader reader(file); KVLogReader reader(file);
for(int cnt = 0; reader.Valid() && now_seq <= limit; reader.Next(), cnt ++) {
for(int cnt = 0; reader.Valid(); reader.Next(), cnt ++) {
// std::cout << "find KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n"; // std::cout << "find KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n";
if(reader.Type() == kTypeDeletion) { if(reader.Type() == kTypeDeletion) {
// std::cout << "is delete record\n"; // std::cout << "is delete record\n";
@ -796,7 +792,7 @@ bool DBImpl::CollectKVLogs() {
} }
saved_key_.clear(); saved_key_.clear();
AppendInternalKey(&saved_key_, AppendInternalKey(&saved_key_,
ParsedInternalKey(reader.Key(),latest_sequence,kValueTypeForSeek));
ParsedInternalKey(reader.Key(),latest_sequence,kValueTypeForSeek,metaKVLog.number));
deep_iter->Seek(saved_key_); deep_iter->Seek(saved_key_);
// ParseInternalKey(reader.Key(), &key_in_kvlog); // ParseInternalKey(reader.Key(), &key_in_kvlog);
@ -818,24 +814,29 @@ bool DBImpl::CollectKVLogs() {
continue; continue;
} }
if(i > 0) {
budget = 0;
break;
}
has_collected = true; has_collected = true;
// std::cout << "collected KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n"; // std::cout << "collected KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n";
if((limit - now_seq) % 100 == 0) {
std::cout << "collected KV: " << reader.Value().ToString() << " " << reader.Seq() << " " << cnt << "\n";
std::cout << "rest budget : " << limit - now_seq << "\n";
}
// if(budget % 100 == 0) {
// std::cout << "collected KV: " << reader.Value().ToString() << " " << reader.Seq() << " " << cnt << "\n";
// std::cout << "rest budget : " << budget << "\n";
// }
budget--;
write_batch.Clear(); write_batch.Clear();
kp_batch.Clear(); kp_batch.Clear();
write_batch.Put(reader.Key(), reader.Value()); write_batch.Put(reader.Key(), reader.Value());
now_seq += 1;
WriteBatchInternal::SetSequence(&write_batch, now_seq);
WriteBatchInternal::SetSequence(&write_batch, reader.Seq());
kvlog_writer.AddRecord(WriteBatchInternal::Contents(&write_batch), fp); kvlog_writer.AddRecord(WriteBatchInternal::Contents(&write_batch), fp);
WriteBatchInternal::ConstructKPBatch(&kp_batch, &write_batch, fp); WriteBatchInternal::ConstructKPBatch(&kp_batch, &write_batch, fp);
WriteBatchInternal::InsertInto(&kp_batch, tempMem); WriteBatchInternal::InsertInto(&kp_batch, tempMem);
} }
delete file;
if(reader.Valid()) break; if(reader.Valid()) break;
kvlogs_to_remove.insert(f->number); kvlogs_to_remove.insert(f->number);
delete file;
// break;//当前一次只回收一个kvlog // break;//当前一次只回收一个kvlog
} }
@ -1494,7 +1495,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch); last_sequence += WriteBatchInternal::Count(write_batch);
versions_->SetLastSequence(last_sequence);
// versions_->SetLastSequence(last_sequence);
// Add to log and apply to memtable. We can release the lock // Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging // during this phase since &w is currently responsible for logging
@ -1536,7 +1537,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
} }
if (write_batch == tmp_batch_) tmp_batch_->Clear(); if (write_batch == tmp_batch_) tmp_batch_->Clear();
// versions_->SetLastSequence(last_sequence);
versions_->SetLastSequence(last_sequence);
} }
while (true) { while (true) {

+ 2
- 0
db/db_impl.h 查看文件

@ -172,7 +172,9 @@ class DBImpl : public DB {
Status InstallCompactionResults(CompactionState* compact) Status InstallCompactionResults(CompactionState* compact)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
/*about kvlogs*/ /*about kvlogs*/
public:
Slice GetValueFromFP(const FilePointer &fp,std::string *value); Slice GetValueFromFP(const FilePointer &fp,std::string *value);
private:
int triggers = 0; int triggers = 0;
bool CollectKVLogs(); bool CollectKVLogs();

+ 3
- 0
db/db_iter.cc 查看文件

@ -13,6 +13,7 @@
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
#include "kv_sep/kvlog.h"
namespace leveldb { namespace leveldb {
@ -113,6 +114,8 @@ class DBIter : public Iterator {
Status status_; Status status_;
std::string saved_key_; // == current key when direction_==kReverse std::string saved_key_; // == current key when direction_==kReverse
std::string saved_value_; // == current raw value when direction_==kReverse std::string saved_value_; // == current raw value when direction_==kReverse
std::string buf;
FilePointer fp;
Direction direction_; Direction direction_;
bool valid_; bool valid_;
Random rnd_; Random rnd_;

+ 16
- 3
db/dbformat.cc 查看文件

@ -4,6 +4,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include <cstdint>
#include <cstdio> #include <cstdio>
#include <sstream> #include <sstream>
@ -20,13 +21,14 @@ static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) { void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size()); result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, key.FileNumber);
PutFixed64(result, PackSequenceAndType(key.sequence, key.type)); PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
} }
std::string ParsedInternalKey::DebugString() const { std::string ParsedInternalKey::DebugString() const {
std::ostringstream ss; std::ostringstream ss;
ss << '\'' << EscapeString(user_key.ToString()) << "' @ " << sequence << " : " ss << '\'' << EscapeString(user_key.ToString()) << "' @ " << sequence << " : "
<< static_cast<int>(type);
<< static_cast<int>(type) << " in file:" << FileNumber;
return ss.str(); return ss.str();
} }
@ -59,6 +61,15 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
r = +1; r = +1;
} }
} }
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8 - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8 - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r; return r;
} }
@ -116,7 +127,7 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) { LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size(); size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
size_t needed = usize + 13 + 8; // A conservative estimate
char* dst; char* dst;
if (needed <= sizeof(space_)) { if (needed <= sizeof(space_)) {
dst = space_; dst = space_;
@ -124,10 +135,12 @@ LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
dst = new char[needed]; dst = new char[needed];
} }
start_ = dst; start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
dst = EncodeVarint32(dst, usize + 8 + 8);
kstart_ = dst; kstart_ = dst;
std::memcpy(dst, user_key.data(), usize); std::memcpy(dst, user_key.data(), usize);
dst += usize; dst += usize;
EncodeFixed64(dst, UINT64_MAX);
dst += 8;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek)); EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8; dst += 8;
end_ = dst; end_ = dst;

+ 10
- 7
db/dbformat.h 查看文件

@ -69,17 +69,18 @@ static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
struct ParsedInternalKey { struct ParsedInternalKey {
Slice user_key; Slice user_key;
SequenceNumber sequence; SequenceNumber sequence;
uint64_t FileNumber;
ValueType type; ValueType type;
ParsedInternalKey() {} // Intentionally left uninitialized (for speed) ParsedInternalKey() {} // Intentionally left uninitialized (for speed)
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) {}
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t, uint64_t file = UINT64_MAX)
: user_key(u), sequence(seq), type(t), FileNumber(file) {}
std::string DebugString() const; std::string DebugString() const;
}; };
// Return the length of the encoding of "key". // Return the length of the encoding of "key".
inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) { inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
return key.user_key.size() + 8;
return key.user_key.size() + 8 + 8;
} }
// Append the serialization of "key" to *result. // Append the serialization of "key" to *result.
@ -94,7 +95,7 @@ bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result);
// Returns the user key portion of an internal key. // Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) { inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8); assert(internal_key.size() >= 8);
return Slice(internal_key.data(), internal_key.size() - 8);
return Slice(internal_key.data(), internal_key.size() - 8 - 8);
} }
// A comparator for internal keys that uses a specified comparator for // A comparator for internal keys that uses a specified comparator for
@ -176,7 +177,8 @@ inline bool ParseInternalKey(const Slice& internal_key,
uint8_t c = num & 0xff; uint8_t c = num & 0xff;
result->sequence = num >> 8; result->sequence = num >> 8;
result->type = static_cast<ValueType>(c); result->type = static_cast<ValueType>(c);
result->user_key = Slice(internal_key.data(), n - 8);
result->user_key = Slice(internal_key.data(), n - 8 - 8);
result->FileNumber = DecodeFixed64(internal_key.data() + n - 8 - 8);
return (c <= static_cast<uint8_t>(kTypeValue)); return (c <= static_cast<uint8_t>(kTypeValue));
} }
@ -199,12 +201,13 @@ class LookupKey {
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); } Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
// Return the user key // Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8 - 8); }
private: private:
// We construct a char array of the form: // We construct a char array of the form:
// klength varint32 <-- start_ // klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// userkey char[klength - 8 - 8] <-- kstart_
// FileNumber uint64
// tag uint64 // tag uint64
// <-- end_ // <-- end_
// The array is a suitable MemTable key. // The array is a suitable MemTable key.

+ 13
- 4
db/memtable.cc 查看文件

@ -4,6 +4,7 @@
#include "db/memtable.h" #include "db/memtable.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include <climits>
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
@ -77,13 +78,14 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
const Slice& value) { const Slice& value) {
// Format of an entry is concatenation of: // Format of an entry is concatenation of:
// key_size : varint32 of internal_key.size() // key_size : varint32 of internal_key.size()
// key bytes : char[internal_key.size()]
// key bytes : char[internal_key.size() - 8 - 8]
// FileNumber uint64
// tag : uint64((sequence << 8) | type) // tag : uint64((sequence << 8) | type)
// value_size : varint32 of value.size() // value_size : varint32 of value.size()
// value bytes : char[value.size()] // value bytes : char[value.size()]
size_t key_size = key.size(); size_t key_size = key.size();
size_t val_size = value.size(); size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
size_t internal_key_size = key_size + 8 + 8;
const size_t encoded_len = VarintLength(internal_key_size) + const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) + internal_key_size + VarintLength(val_size) +
val_size; val_size;
@ -91,6 +93,12 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
char* p = EncodeVarint32(buf, internal_key_size); char* p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size); std::memcpy(p, key.data(), key_size);
p += key_size; p += key_size;
if(value.size()) {
std::memcpy(p, value.data(), 8);
} else {
EncodeFixed64(p, UINT64_MAX);
}
p += 8;
EncodeFixed64(p, (s << 8) | type); EncodeFixed64(p, (s << 8) | type);
p += 8; p += 8;
p = EncodeVarint32(p, val_size); p = EncodeVarint32(p, val_size);
@ -106,7 +114,8 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
if (iter.Valid()) { if (iter.Valid()) {
// entry format is: // entry format is:
// klength varint32 // klength varint32
// userkey char[klength]
// userkey char[klength - 8]
// FileNumber uint64
// tag uint64 // tag uint64
// vlength varint32 // vlength varint32
// value char[vlength] // value char[vlength]
@ -117,7 +126,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
uint32_t key_length; uint32_t key_length;
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare( if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
Slice(key_ptr, key_length - 8 - 8), key.user_key()) == 0) {
// Correct user key // Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) { switch (static_cast<ValueType>(tag & 0xff)) {

Loading…
取消
儲存