diff --git a/db/db_impl.cc b/db/db_impl.cc index 3a5c225..06f8e61 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1249,7 +1249,11 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { // into mem_. { mutex_.Unlock(); - status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); + // 先写入vlog再写入memtable + // 写vlog日志 offset 表示这个 write_batch 在vlog中的偏移地址。 + uint64_t offset = 0; + status = vlog_->AddRecord(WriteBatchInternal::Contents(write_batch),offset); + // status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); @@ -1258,7 +1262,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(write_batch, mem_); + status = WriteBatchInternal::InsertInto(write_batch, mem_, logfile_number_, offset); } mutex_.Lock(); if (sync_error) { diff --git a/db/db_impl.h b/db/db_impl.h index 91f4fd4..81db57d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -12,6 +12,7 @@ #include "db/dbformat.h" #include "db/log_writer.h" +#include "db/vlog_writer.h" #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -207,6 +208,8 @@ class DBImpl : public DB { Status bg_error_ GUARDED_BY(mutex_); CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_); + + log::VlogWriter* vlog_; }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/vlog_writer.cc b/db/vlog_writer.cc new file mode 100644 index 0000000..b07e2bc --- /dev/null +++ b/db/vlog_writer.cc @@ -0,0 +1,45 @@ +#include "db/vlog_writer.h" + +#include + +#include "leveldb/env.h" + +#include "util/coding.h" +#include "util/crc32c.h" + +namespace leveldb { +namespace log { +VlogWriter::VlogWriter(WritableFile* dest) : dest_(dest), head_(0) {} + +VlogWriter::VlogWriter(WritableFile* dest, uint64_t dest_length) + : dest_(dest), head_(0) {} + +VlogWriter::~VlogWriter() = default; + +Status VlogWriter::AddRecord(const Slice& slice, uint64_t& offset) { + const char* ptr = slice.data(); + size_t left = slice.size(); + Status s; + s = EmitPhysicalRecord(ptr, left, offset); + return s; +} + +Status VlogWriter::EmitPhysicalRecord(const char* ptr, size_t length, + uint64_t& offset) { + assert(length <= 0xffff); + char buf[4]; + EncodeFixed32(buf, length); + Status s = dest_->Append(Slice(buf, 4)); + if (s.ok()) { + s = dest_->Append(Slice(ptr, length)); + if (s.ok()) { + s = dest_->Flush(); + offset = head_ + 4; + head_ += 4 + length; + } + } + return s; +} + +} // namespace log +} // namespace leveldb diff --git a/db/vlog_writer.h b/db/vlog_writer.h new file mode 100644 index 0000000..9cb2313 --- /dev/null +++ b/db/vlog_writer.h @@ -0,0 +1,44 @@ +#ifndef LEVELDB_DB_VLOG_WRITER_H_ +#define LEVELDB_DB_VLOG_WRITER_H_ + +#include "db/log_format.h" +#include + +#include "leveldb/slice.h" +#include "leveldb/status.h" + +namespace leveldb { + +class WritableFile; + +namespace log { + +class VlogWriter { + public: + // Create a writer that will append data to "*dest". + // "*dest" must be initially empty. + // "*dest" must remain live while this Writer is in use. + explicit VlogWriter(WritableFile* dest); + + // Create a writer that will append data to "*dest". + // "*dest" must have initial length "dest_length". + // "*dest" must remain live while this Writer is in use. + VlogWriter(WritableFile* dest, uint64_t dest_length); + + VlogWriter(const VlogWriter&) = delete; + VlogWriter& operator=(const VlogWriter&) = delete; + + VlogWriter(); + + Status AddRecord(const Slice& slice, uint64_t& offset); + + private: + Status EmitPhysicalRecord(const char* ptr, size_t length, uint64_t& offset); + size_t head_; + WritableFile* dest_; +}; + +} // namespace log +} // namespace leveldb + +#endif // LEVELDB_DB_VLOG_WRITER_H_ diff --git a/db/write_batch.cc b/db/write_batch.cc index b54313c..874bc34 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -18,7 +18,9 @@ #include "db/dbformat.h" #include "db/memtable.h" #include "db/write_batch_internal.h" + #include "leveldb/db.h" + #include "util/coding.h" namespace leveldb { @@ -79,6 +81,68 @@ Status WriteBatch::Iterate(Handler* handler) const { } } +Status WriteBatch::Iterate(Handler* handler, uint64_t fid, + uint64_t offset) const { + Slice input(rep_); + // 整个writebatch 的起始地址 + const char* begin = input.data(); + + if (input.size() < kHeader) { + return Status::Corruption("malformed WriteBatch (too small)"); + } + // 12个字节,8个字节用来表示sequence,4个字节用来表示 count,移除头 + input.remove_prefix(kHeader); + Slice key, value; + int found = 0; + while (!input.empty()) { + found++; + const uint64_t kv_offset = input.data() - begin + offset; + assert(kv_offset > 0); + + // record 记录为 1 个字节 是 kTypeValue ,剩下的字节是 key value + // record 记录为 1 个字节 是 kTypeDeletion, 剩下的字节是key + char tag = input[0]; + input.remove_prefix(1); + switch (tag) { + case kTypeValue: + if (GetLengthPrefixedSlice(&input, &key) && + GetLengthPrefixedSlice(&input, &value)) { + handler->Put(key, value); + } else { + return Status::Corruption("bad WriteBatch Put"); + } + break; + case kTypeDeletion: + if (GetLengthPrefixedSlice(&input, &key)) { + handler->Delete(key); + } else { + return Status::Corruption("bad WriteBatch Delete"); + } + break; + // case kTypeSeparate: + // if (GetLengthPrefixedSlice(&input, &key) && + // GetLengthPrefixedSlice(&input, &(value))) { + // // value = fileNumber + offset + valuesize 采用变长编码的方式 + // std::string dest; + // PutVarint64(&dest, fid); + // PutVarint64(&dest, kv_offset); + // PutVarint64(&dest, value.size()); + // Slice value_offset(dest); + // handler->Put(key, value_offset, kTypeSeparate); + // } else { + // return Status::Corruption("bad WriteBatch Put"); + // } + // break; + default: + return Status::Corruption("unknown WriteBatch tag"); + } + } + if (found != WriteBatchInternal::Count(this)) { + return Status::Corruption("WriteBatch has wrong count"); + } else { + return Status::OK(); + } +} int WriteBatchInternal::Count(const WriteBatch* b) { return DecodeFixed32(b->rep_.data() + 8); } @@ -136,6 +200,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) { return b->Iterate(&inserter); } +Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable, + uint64_t fid, size_t offset) { + MemTableInserter inserter; + // 一批 writeBatch中只有一个sequence,公用的,后续会自加。 + inserter.sequence_ = WriteBatchInternal::Sequence(b); + inserter.mem_ = memtable; + return b->Iterate(&inserter, fid, offset); +} + void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index fce86e3..dd08be8 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -37,6 +37,8 @@ class WriteBatchInternal { static Status InsertInto(const WriteBatch* batch, MemTable* memtable); + static Status InsertInto(const WriteBatch* batch, MemTable* memtable,uint64_t fid, size_t offset); + static void Append(WriteBatch* dst, const WriteBatch* src); }; diff --git a/include/leveldb/write_batch.h b/include/leveldb/write_batch.h index 94d4115..2af98a3 100644 --- a/include/leveldb/write_batch.h +++ b/include/leveldb/write_batch.h @@ -71,6 +71,7 @@ class LEVELDB_EXPORT WriteBatch { // Support for iterating over the contents of a batch. Status Iterate(Handler* handler) const; + Status Iterate(Handler* handler, uint64_t fid, uint64_t offset) const; private: friend class WriteBatchInternal;