diff --git a/CMakeLists.txt b/CMakeLists.txt index a3a5eb6..af7c9c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -119,7 +119,9 @@ include(GNUInstallDirs) add_library(leveldb db/vlog_reader.h - db/vlog_reader.cpp) + db/vlog_reader.cc + db/vlog_writer.h + db/vlog_writer.cc) target_sources(leveldb PRIVATE "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" @@ -526,6 +528,7 @@ endif(LEVELDB_INSTALL) add_executable(db_test2 "${PROJECT_SOURCE_DIR}/test/db_test2.cc" test/value_field_test.cc + db/vlog_writer.cc ) target_link_libraries(db_test2 PRIVATE leveldb) diff --git a/db/vlog_reader.cc b/db/vlog_reader.cc new file mode 100644 index 0000000..64ab4c4 --- /dev/null +++ b/db/vlog_reader.cc @@ -0,0 +1,96 @@ +#include "vlog_reader.h" +#include "leveldb/env.h" +#include "util/coding.h" + +namespace leveldb { + +namespace log { + +VlogReader::VlogReader(SequentialFile *file, Reporter* reporter) + : file_(file), + file_random_(nullptr), + reporter_(reporter), + backing_store_(new char[kBlockSize]), + buffer_(), + eof_(false), + last_record_offset_(0) {} + +VlogReader::VlogReader(RandomAccessFile *file, Reporter* reporter) + : file_(nullptr), + file_random_(file), + reporter_(reporter), + backing_store_(new char[kBlockSize]), + buffer_(), + eof_(false), + last_record_offset_(0) {} + +VlogReader::~VlogReader() { delete[] backing_store_; } + +bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice *key_value, char *scratch) { + if (file_random_ == nullptr) { + return false; + } + Status status = file_random_->Read(offset, length, key_value, scratch); + if (!status.ok()) { + return false; + } + return true; +} + +bool VlogReader::ReadRecord(Slice *record, std::string *scratch) { + if (ReadPhysicalRecord(scratch)) { + *record = *scratch; + return true; + } + return false; +} + +uint64_t VlogReader::LastRecordOffset() const { + return last_record_offset_; +} + +void VlogReader::ReportCorruption(uint64_t bytes, const Status &reason) { + if (reporter_ != nullptr) { + reporter_->Corruption(static_cast(bytes), reason); + } +} + +bool VlogReader::ReadPhysicalRecord(std::string *result) { + result->clear(); + buffer_.clear(); + + char* tmp_head = new char[vHeaderSize]; + Status status = file_->Read(vHeaderSize, &buffer_, tmp_head); + if (!status.ok()) { + buffer_.clear(); + ReportCorruption(kBlockSize, status); + eof_ = true; + return false; + } else if (buffer_.size() < vHeaderSize) { + eof_ = true; + } + + if (!eof_) { + result->assign(buffer_.data(),buffer_.size()); + uint32_t length = DecodeFixed32(buffer_.data()); + buffer_.clear(); + char* tmp = new char[length]; + status = file_->Read(length, &buffer_, tmp); + if (status.ok() && buffer_.size() == length) { + *result += buffer_.ToString(); + } else { + eof_ = true; + } + delete [] tmp; + } + delete [] tmp_head; + + if (eof_) { + result->clear(); + return false; + } + return true; +} + +} // namespace log +} // namespace leveldb diff --git a/db/vlog_reader.cpp b/db/vlog_reader.cpp deleted file mode 100644 index 4a928a3..0000000 --- a/db/vlog_reader.cpp +++ /dev/null @@ -1,96 +0,0 @@ -#include "vlog_reader.h" -#include "leveldb/env.h" -#include "util/coding.h" - -namespace leveldb { - -namespace log { - -VlogReader::VlogReader(SequentialFile *file, Reporter* reporter) - : file_(file), - file_random_(nullptr), - reporter_(reporter), - backing_store_(new char[kBlockSize]), - buffer_(), - eof_(false), - last_record_offset_(0) {} - -VlogReader::VlogReader(RandomAccessFile *file, Reporter* reporter) - : file_(nullptr), - file_random_(file), - reporter_(reporter), - backing_store_(new char[kBlockSize]), - buffer_(), - eof_(false), - last_record_offset_(0) {} - -VlogReader::~VlogReader() { delete[] backing_store_; } - -bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice *key_value, char *scratch) { - if (file_random_ == nullptr) { - return false; - } - Status status = file_random_->Read(offset, length, key_value, scratch); - if (!status.ok()) { - return false; - } - return true; -} - -bool VlogReader::ReadRecord(Slice *record, std::string *scratch) { - if (ReadPhysicalRecord(scratch)) { - *record = *scratch; - return true; - } - return false; -} - -uint64_t VlogReader::LastRecordOffset() const { - return last_record_offset_; -} - -void VlogReader::ReportCorruption(uint64_t bytes, const Status &reason) { - if (reporter_ != nullptr) { - reporter_->Corruption(static_cast(bytes), reason); - } -} - -bool VlogReader::ReadPhysicalRecord(std::string *result) { - result->clear(); - buffer_.clear(); - - char* tmp_head = new char[vHeaderSize]; - Status status = file_->Read(vHeaderSize, &buffer_, tmp_head); - if (!status.ok()) { - buffer_.clear(); - ReportCorruption(kBlockSize, status); - eof_ = true; - return false; - } else if (buffer_.size() < vHeaderSize) { - eof_ = true; - } - - if (!eof_) { - result->assign(buffer_.data(),buffer_.size()); - uint32_t length = DecodeFixed32(buffer_.data()); - buffer_.clear(); - char* tmp = new char[length]; - status = file_->Read(length, &buffer_, tmp); - if (status.ok() && buffer_.size() == length) { - *result += buffer_.ToString(); - } else { - eof_ = true; - } - delete [] tmp; - } - delete [] tmp_head; - - if (eof_) { - result->clear(); - return false; - } - return true; -} - -} // namespace log -} // namespace leveldb diff --git a/db/vlog_writer.cc b/db/vlog_writer.cc new file mode 100644 index 0000000..65d7572 --- /dev/null +++ b/db/vlog_writer.cc @@ -0,0 +1,42 @@ +#include "db/vlog_writer.h" + +#include + +#include "leveldb/env.h" + +#include "util/coding.h" + +namespace leveldb { +namespace log { +VlogWriter::VlogWriter(WritableFile* dest) : dest_(dest), head_(0) {} + +VlogWriter::VlogWriter(WritableFile* dest, uint64_t dest_length) + : dest_(dest), head_(0) {} + +Status VlogWriter::AddRecord(const Slice& slice, uint64_t& offset) { + const char* ptr = slice.data(); + size_t left = slice.size(); + Status s; + s = EmitPhysicalRecord(ptr, left, offset); + return s; +} + +Status VlogWriter::EmitPhysicalRecord(const char* ptr, size_t length, + uint64_t& offset) { + assert(length <= 0xffff); + char buf[4]; + EncodeFixed32(buf, length); + Status s = dest_->Append(Slice(buf, 4)); + if (s.ok()) { + s = dest_->Append(Slice(ptr, length)); + if (s.ok()) { + s = dest_->Flush(); + offset = head_ + 4; + head_ += 4 + length; + } + } + return s; +} + +} // namespace log +} // namespace leveldb diff --git a/db/vlog_writer.h b/db/vlog_writer.h new file mode 100644 index 0000000..0c16669 --- /dev/null +++ b/db/vlog_writer.h @@ -0,0 +1,44 @@ +#ifndef LEVELDB_DB_VLOG_WRITER_H_ +#define LEVELDB_DB_VLOG_WRITER_H_ + +#include "db/log_format.h" +#include + +#include "leveldb/slice.h" +#include "leveldb/status.h" + +namespace leveldb { + +class WritableFile; + +namespace log { + +class VlogWriter { + public: + // Create a writer that will append data to "*dest". + // "*dest" must be initially empty. + // "*dest" must remain live while this Writer is in use. + explicit VlogWriter(WritableFile* dest); + + // Create a writer that will append data to "*dest". + // "*dest" must have initial length "dest_length". + // "*dest" must remain live while this Writer is in use. + VlogWriter(WritableFile* dest, uint64_t dest_length); + + VlogWriter(const VlogWriter&) = delete; + VlogWriter& operator=(const VlogWriter&) = delete; + + ~VlogWriter() = default; + + Status AddRecord(const Slice& slice, uint64_t& offset); + + private: + Status EmitPhysicalRecord(const char* ptr, size_t length, uint64_t& offset); + size_t head_; + WritableFile* dest_; +}; + +} // namespace log +} // namespace leveldb + +#endif // LEVELDB_DB_VLOG_WRITER_H_