Kaynağa Gözat

Add VlogWriter

main
GUJIEJASON 8 ay önce
ebeveyn
işleme
af6de4df42
7 değiştirilmiş dosya ile 174 ekleme ve 2 silme
  1. +6
    -2
      db/db_impl.cc
  2. +3
    -0
      db/db_impl.h
  3. +45
    -0
      db/vlog_writer.cc
  4. +44
    -0
      db/vlog_writer.h
  5. +73
    -0
      db/write_batch.cc
  6. +2
    -0
      db/write_batch_internal.h
  7. +1
    -0
      include/leveldb/write_batch.h

+ 6
- 2
db/db_impl.cc Dosyayı Görüntüle

@ -1249,7 +1249,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
// 先写入vlog再写入memtable
// 写vlog日志 offset 表示这个 write_batch 在vlog中的偏移地址。
uint64_t offset = 0;
status = vlog_->AddRecord(WriteBatchInternal::Contents(write_batch),offset);
// status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
@ -1258,7 +1262,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
status = WriteBatchInternal::InsertInto(write_batch, mem_, logfile_number_, offset);
}
mutex_.Lock();
if (sync_error) {

+ 3
- 0
db/db_impl.h Dosyayı Görüntüle

@ -12,6 +12,7 @@
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/vlog_writer.h"
#include "db/snapshot.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
@ -207,6 +208,8 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
log::VlogWriter* vlog_;
};
// Sanitize db options. The caller should delete result.info_log if

+ 45
- 0
db/vlog_writer.cc Dosyayı Görüntüle

@ -0,0 +1,45 @@
#include "db/vlog_writer.h"
#include <cstdint>
#include "leveldb/env.h"
#include "util/coding.h"
#include "util/crc32c.h"
namespace leveldb {
namespace log {
VlogWriter::VlogWriter(WritableFile* dest) : dest_(dest), head_(0) {}
VlogWriter::VlogWriter(WritableFile* dest, uint64_t dest_length)
: dest_(dest), head_(0) {}
VlogWriter::~VlogWriter() = default;
Status VlogWriter::AddRecord(const Slice& slice, uint64_t& offset) {
const char* ptr = slice.data();
size_t left = slice.size();
Status s;
s = EmitPhysicalRecord(ptr, left, offset);
return s;
}
Status VlogWriter::EmitPhysicalRecord(const char* ptr, size_t length,
uint64_t& offset) {
assert(length <= 0xffff);
char buf[4];
EncodeFixed32(buf, length);
Status s = dest_->Append(Slice(buf, 4));
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
if (s.ok()) {
s = dest_->Flush();
offset = head_ + 4;
head_ += 4 + length;
}
}
return s;
}
} // namespace log
} // namespace leveldb

+ 44
- 0
db/vlog_writer.h Dosyayı Görüntüle

@ -0,0 +1,44 @@
#ifndef LEVELDB_DB_VLOG_WRITER_H_
#define LEVELDB_DB_VLOG_WRITER_H_
#include "db/log_format.h"
#include <cstdint>
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
class WritableFile;
namespace log {
class VlogWriter {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit VlogWriter(WritableFile* dest);
// Create a writer that will append data to "*dest".
// "*dest" must have initial length "dest_length".
// "*dest" must remain live while this Writer is in use.
VlogWriter(WritableFile* dest, uint64_t dest_length);
VlogWriter(const VlogWriter&) = delete;
VlogWriter& operator=(const VlogWriter&) = delete;
VlogWriter();
Status AddRecord(const Slice& slice, uint64_t& offset);
private:
Status EmitPhysicalRecord(const char* ptr, size_t length, uint64_t& offset);
size_t head_;
WritableFile* dest_;
};
} // namespace log
} // namespace leveldb
#endif // LEVELDB_DB_VLOG_WRITER_H_

+ 73
- 0
db/write_batch.cc Dosyayı Görüntüle

@ -18,7 +18,9 @@
#include "db/dbformat.h"
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "util/coding.h"
namespace leveldb {
@ -79,6 +81,68 @@ Status WriteBatch::Iterate(Handler* handler) const {
}
}
Status WriteBatch::Iterate(Handler* handler, uint64_t fid,
uint64_t offset) const {
Slice input(rep_);
// 整个writebatch 的起始地址
const char* begin = input.data();
if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
// 12个字节,8个字节用来表示sequence,4个字节用来表示 count,移除头
input.remove_prefix(kHeader);
Slice key, value;
int found = 0;
while (!input.empty()) {
found++;
const uint64_t kv_offset = input.data() - begin + offset;
assert(kv_offset > 0);
// record 记录为 1 个字节 是 kTypeValue ,剩下的字节是 key value
// record 记录为 1 个字节 是 kTypeDeletion, 剩下的字节是key
char tag = input[0];
input.remove_prefix(1);
switch (tag) {
case kTypeValue:
if (GetLengthPrefixedSlice(&input, &key) &&
GetLengthPrefixedSlice(&input, &value)) {
handler->Put(key, value);
} else {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeDeletion:
if (GetLengthPrefixedSlice(&input, &key)) {
handler->Delete(key);
} else {
return Status::Corruption("bad WriteBatch Delete");
}
break;
// case kTypeSeparate:
// if (GetLengthPrefixedSlice(&input, &key) &&
// GetLengthPrefixedSlice(&input, &(value))) {
// // value = fileNumber + offset + valuesize 采用变长编码的方式
// std::string dest;
// PutVarint64(&dest, fid);
// PutVarint64(&dest, kv_offset);
// PutVarint64(&dest, value.size());
// Slice value_offset(dest);
// handler->Put(key, value_offset, kTypeSeparate);
// } else {
// return Status::Corruption("bad WriteBatch Put");
// }
// break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
@ -136,6 +200,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
return b->Iterate(&inserter);
}
Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable,
uint64_t fid, size_t offset) {
MemTableInserter inserter;
// 一批 writeBatch中只有一个sequence,公用的,后续会自加。
inserter.sequence_ = WriteBatchInternal::Sequence(b);
inserter.mem_ = memtable;
return b->Iterate(&inserter, fid, offset);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size());

+ 2
- 0
db/write_batch_internal.h Dosyayı Görüntüle

@ -37,6 +37,8 @@ class WriteBatchInternal {
static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable,uint64_t fid, size_t offset);
static void Append(WriteBatch* dst, const WriteBatch* src);
};

+ 1
- 0
include/leveldb/write_batch.h Dosyayı Görüntüle

@ -71,6 +71,7 @@ class LEVELDB_EXPORT WriteBatch {
// Support for iterating over the contents of a batch.
Status Iterate(Handler* handler) const;
Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const;
private:
friend class WriteBatchInternal;

Yükleniyor…
İptal
Kaydet