Browse Source

vlog_reader/writer v1.0

main
VirgilZhu 8 months ago
parent
commit
10baaeb199
5 changed files with 186 additions and 97 deletions
  1. +4
    -1
      CMakeLists.txt
  2. +96
    -0
      db/vlog_reader.cc
  3. +0
    -96
      db/vlog_reader.cpp
  4. +42
    -0
      db/vlog_writer.cc
  5. +44
    -0
      db/vlog_writer.h

+ 4
- 1
CMakeLists.txt View File

@ -119,7 +119,9 @@ include(GNUInstallDirs)
add_library(leveldb add_library(leveldb
db/vlog_reader.h db/vlog_reader.h
db/vlog_reader.cpp)
db/vlog_reader.cc
db/vlog_writer.h
db/vlog_writer.cc)
target_sources(leveldb target_sources(leveldb
PRIVATE PRIVATE
"${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h" "${PROJECT_BINARY_DIR}/${LEVELDB_PORT_CONFIG_DIR}/port_config.h"
@ -526,6 +528,7 @@ endif(LEVELDB_INSTALL)
add_executable(db_test2 add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/db_test2.cc" "${PROJECT_SOURCE_DIR}/test/db_test2.cc"
test/value_field_test.cc test/value_field_test.cc
db/vlog_writer.cc
) )
target_link_libraries(db_test2 PRIVATE leveldb) target_link_libraries(db_test2 PRIVATE leveldb)

+ 96
- 0
db/vlog_reader.cc View File

@ -0,0 +1,96 @@
#include "vlog_reader.h"
#include "leveldb/env.h"
#include "util/coding.h"
namespace leveldb {
namespace log {
VlogReader::VlogReader(SequentialFile *file, Reporter* reporter)
: file_(file),
file_random_(nullptr),
reporter_(reporter),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0) {}
VlogReader::VlogReader(RandomAccessFile *file, Reporter* reporter)
: file_(nullptr),
file_random_(file),
reporter_(reporter),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0) {}
VlogReader::~VlogReader() { delete[] backing_store_; }
bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice *key_value, char *scratch) {
if (file_random_ == nullptr) {
return false;
}
Status status = file_random_->Read(offset, length, key_value, scratch);
if (!status.ok()) {
return false;
}
return true;
}
bool VlogReader::ReadRecord(Slice *record, std::string *scratch) {
if (ReadPhysicalRecord(scratch)) {
*record = *scratch;
return true;
}
return false;
}
uint64_t VlogReader::LastRecordOffset() const {
return last_record_offset_;
}
void VlogReader::ReportCorruption(uint64_t bytes, const Status &reason) {
if (reporter_ != nullptr) {
reporter_->Corruption(static_cast<size_t>(bytes), reason);
}
}
bool VlogReader::ReadPhysicalRecord(std::string *result) {
result->clear();
buffer_.clear();
char* tmp_head = new char[vHeaderSize];
Status status = file_->Read(vHeaderSize, &buffer_, tmp_head);
if (!status.ok()) {
buffer_.clear();
ReportCorruption(kBlockSize, status);
eof_ = true;
return false;
} else if (buffer_.size() < vHeaderSize) {
eof_ = true;
}
if (!eof_) {
result->assign(buffer_.data(),buffer_.size());
uint32_t length = DecodeFixed32(buffer_.data());
buffer_.clear();
char* tmp = new char[length];
status = file_->Read(length, &buffer_, tmp);
if (status.ok() && buffer_.size() == length) {
*result += buffer_.ToString();
} else {
eof_ = true;
}
delete [] tmp;
}
delete [] tmp_head;
if (eof_) {
result->clear();
return false;
}
return true;
}
} // namespace log
} // namespace leveldb

+ 0
- 96
db/vlog_reader.cpp View File

@ -1,96 +0,0 @@
#include "vlog_reader.h"
#include "leveldb/env.h"
#include "util/coding.h"
namespace leveldb {
namespace log {
VlogReader::VlogReader(SequentialFile *file, Reporter* reporter)
: file_(file),
file_random_(nullptr),
reporter_(reporter),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0) {}
VlogReader::VlogReader(RandomAccessFile *file, Reporter* reporter)
: file_(nullptr),
file_random_(file),
reporter_(reporter),
backing_store_(new char[kBlockSize]),
buffer_(),
eof_(false),
last_record_offset_(0) {}
VlogReader::~VlogReader() { delete[] backing_store_; }
bool VlogReader::ReadValue(uint64_t offset, size_t length, Slice *key_value, char *scratch) {
if (file_random_ == nullptr) {
return false;
}
Status status = file_random_->Read(offset, length, key_value, scratch);
if (!status.ok()) {
return false;
}
return true;
}
bool VlogReader::ReadRecord(Slice *record, std::string *scratch) {
if (ReadPhysicalRecord(scratch)) {
*record = *scratch;
return true;
}
return false;
}
uint64_t VlogReader::LastRecordOffset() const {
return last_record_offset_;
}
void VlogReader::ReportCorruption(uint64_t bytes, const Status &reason) {
if (reporter_ != nullptr) {
reporter_->Corruption(static_cast<size_t>(bytes), reason);
}
}
bool VlogReader::ReadPhysicalRecord(std::string *result) {
result->clear();
buffer_.clear();
char* tmp_head = new char[vHeaderSize];
Status status = file_->Read(vHeaderSize, &buffer_, tmp_head);
if (!status.ok()) {
buffer_.clear();
ReportCorruption(kBlockSize, status);
eof_ = true;
return false;
} else if (buffer_.size() < vHeaderSize) {
eof_ = true;
}
if (!eof_) {
result->assign(buffer_.data(),buffer_.size());
uint32_t length = DecodeFixed32(buffer_.data());
buffer_.clear();
char* tmp = new char[length];
status = file_->Read(length, &buffer_, tmp);
if (status.ok() && buffer_.size() == length) {
*result += buffer_.ToString();
} else {
eof_ = true;
}
delete [] tmp;
}
delete [] tmp_head;
if (eof_) {
result->clear();
return false;
}
return true;
}
} // namespace log
} // namespace leveldb

+ 42
- 0
db/vlog_writer.cc View File

@ -0,0 +1,42 @@
#include "db/vlog_writer.h"
#include <cstdint>
#include "leveldb/env.h"
#include "util/coding.h"
namespace leveldb {
namespace log {
VlogWriter::VlogWriter(WritableFile* dest) : dest_(dest), head_(0) {}
VlogWriter::VlogWriter(WritableFile* dest, uint64_t dest_length)
: dest_(dest), head_(0) {}
Status VlogWriter::AddRecord(const Slice& slice, uint64_t& offset) {
const char* ptr = slice.data();
size_t left = slice.size();
Status s;
s = EmitPhysicalRecord(ptr, left, offset);
return s;
}
Status VlogWriter::EmitPhysicalRecord(const char* ptr, size_t length,
uint64_t& offset) {
assert(length <= 0xffff);
char buf[4];
EncodeFixed32(buf, length);
Status s = dest_->Append(Slice(buf, 4));
if (s.ok()) {
s = dest_->Append(Slice(ptr, length));
if (s.ok()) {
s = dest_->Flush();
offset = head_ + 4;
head_ += 4 + length;
}
}
return s;
}
} // namespace log
} // namespace leveldb

+ 44
- 0
db/vlog_writer.h View File

@ -0,0 +1,44 @@
#ifndef LEVELDB_DB_VLOG_WRITER_H_
#define LEVELDB_DB_VLOG_WRITER_H_
#include "db/log_format.h"
#include <cstdint>
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
class WritableFile;
namespace log {
class VlogWriter {
public:
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use.
explicit VlogWriter(WritableFile* dest);
// Create a writer that will append data to "*dest".
// "*dest" must have initial length "dest_length".
// "*dest" must remain live while this Writer is in use.
VlogWriter(WritableFile* dest, uint64_t dest_length);
VlogWriter(const VlogWriter&) = delete;
VlogWriter& operator=(const VlogWriter&) = delete;
~VlogWriter() = default;
Status AddRecord(const Slice& slice, uint64_t& offset);
private:
Status EmitPhysicalRecord(const char* ptr, size_t length, uint64_t& offset);
size_t head_;
WritableFile* dest_;
};
} // namespace log
} // namespace leveldb
#endif // LEVELDB_DB_VLOG_WRITER_H_

Loading…
Cancel
Save