Pārlūkot izejas kodu

vtable manager

main
ArcueidType pirms 8 mēnešiem
vecāks
revīzija
568fe7ac2e
13 mainītis faili ar 255 papildinājumiem un 7 dzēšanām
  1. +2
    -0
      CMakeLists.txt
  2. +15
    -1
      db/builder.cc
  3. +4
    -1
      db/builder.h
  4. +29
    -2
      db/db_impl.cc
  5. +3
    -0
      db/db_impl.h
  6. +7
    -0
      db/filename.cc
  7. +4
    -1
      db/filename.h
  8. +3
    -1
      db/repair.cc
  9. +1
    -0
      table/vtable_builder.cc
  10. +5
    -0
      table/vtable_builder.h
  11. +128
    -0
      table/vtable_manager.cc
  12. +53
    -0
      table/vtable_manager.h
  13. +1
    -1
      table/vtable_reader.cc

+ 2
- 0
CMakeLists.txt Parādīt failu

@ -176,6 +176,8 @@ target_sources(leveldb
"table/vtable_builder.h"
"table/vtable_format.cc"
"table/vtable_format.h"
"table/vtable_manager.cc"
"table/vtable_manager.h"
"table/vtable_reader.cc"
"table/vtable_reader.h"
"util/arena.cc"

+ 15
- 1
db/builder.cc Parādīt failu

@ -8,15 +8,19 @@
#include "db/filename.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "table/vtable_builder.h"
#include "table/vtable_manager.h"
namespace leveldb {
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
TableCache* table_cache, Iterator* iter, FileMetaData* meta,
VTableMeta* vtable_meta) {
Status s;
meta->file_size = 0;
iter->SeekToFirst();
@ -94,6 +98,11 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
if (s.ok()) {
s = vtb_builder->Finish();
}
if (s.ok()) {
vtable_meta->number = meta->number;
vtable_meta->table_size = vtb_builder->FileSize();
vtable_meta->records_num = vtb_builder->RecordNumber();
}
delete vtb_builder;
if (s.ok()) {
@ -124,6 +133,11 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
} else {
env->RemoveFile(fname);
}
if (s.ok() && vtable_meta->table_size > 0) {
// Keep it
} else {
env->RemoveFile(vtb_name);
}
return s;
}

+ 4
- 1
db/builder.h Parādīt failu

@ -7,6 +7,8 @@
#include "leveldb/status.h"
#include "table/vtable_manager.h"
namespace leveldb {
struct Options;
@ -23,7 +25,8 @@ class VersionEdit;
// If no data is present in *iter, meta->file_size will be set to
// zero, and no Table file will be produced.
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
TableCache* table_cache, Iterator* iter, FileMetaData* meta);
TableCache* table_cache, Iterator* iter, FileMetaData* meta,
VTableMeta* vtable_meta);
} // namespace leveldb

+ 29
- 2
db/db_impl.cc Parādīt failu

@ -34,6 +34,7 @@
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "table/vtable_format.h"
#include "table/vtable_manager.h"
#include "table/vtable_reader.h"
#include "util/coding.h"
#include "util/logging.h"
@ -151,7 +152,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
&internal_comparator_)),
vtable_manager_(new VTableManager(dbname, raw_options.env)){}
DBImpl::~DBImpl() {
// Wait for background work to finish.
@ -271,6 +273,7 @@ void DBImpl::RemoveObsoleteFiles() {
case kCurrentFile:
case kDBLockFile:
case kInfoLogFile:
case kVTableManagerFile:
keep = true;
break;
}
@ -386,6 +389,11 @@ Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
versions_->SetLastSequence(max_sequence);
}
s = vtable_manager_->LoadVTableMeta();
if (!s.ok()) {
return Status::Corruption("LoadVTableMeta failed");
}
return Status::OK();
}
@ -514,6 +522,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
VTableMeta vtable_meta;
meta.number = versions_->NewFileNumber();
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
@ -523,7 +532,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status s;
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &vtable_meta);
mutex_.Lock();
}
@ -544,6 +553,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
}
edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
meta.largest);
vtable_manager_->AddVTable(vtable_meta);
}
CompactionStats stats;
@ -573,6 +583,9 @@ void DBImpl::CompactMemTable() {
edit.SetPrevLogNumber(0);
edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
s = versions_->LogAndApply(&edit, &mutex_);
if (s.ok()) {
s = vtable_manager_->SaveVTableMeta();
}
}
if (s.ok()) {
@ -752,6 +765,12 @@ void DBImpl::BackgroundCompaction() {
if (!status.ok()) {
RecordBackgroundError(status);
}
status = vtable_manager_->SaveVTableMeta();
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number), c->level() + 1,
@ -1057,6 +1076,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (!status.ok()) {
RecordBackgroundError(status);
}
status = vtable_manager_->SaveVTableMeta();
if (!status.ok()) {
RecordBackgroundError(status);
}
VersionSet::LevelSummaryStorage tmp;
Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
return status;
@ -1695,6 +1719,9 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
if (s.ok()) {
s = impl->vtable_manager_->SaveVTableMeta();
}
}
if (s.ok()) {
impl->RemoveObsoleteFiles();

+ 3
- 0
db/db_impl.h Parādīt failu

@ -17,6 +17,7 @@
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "table/vtable_manager.h"
namespace leveldb {
@ -210,6 +211,8 @@ class DBImpl : public DB {
Status bg_error_ GUARDED_BY(mutex_);
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
VTableManager* vtable_manager_ GUARDED_BY(mutex_) {};
};
// Sanitize db options. The caller should delete result.info_log if

+ 7
- 0
db/filename.cc Parādīt failu

@ -73,6 +73,10 @@ std::string OldInfoLogFileName(const std::string& dbname) {
return dbname + "/LOG.old";
}
std::string VTableManagerFileName(const std::string& dbname) {
return dbname + "/VTableMeta";
}
// Owned filenames have the form:
// dbname/CURRENT
// dbname/LOCK
@ -89,6 +93,9 @@ bool ParseFileName(const std::string& filename, uint64_t* number,
} else if (rest == "LOCK") {
*number = 0;
*type = kDBLockFile;
} else if (rest == "VTableMeta") {
*number = 0;
*type = kVTableManagerFile;
} else if (rest == "LOG" || rest == "LOG.old") {
*number = 0;
*type = kInfoLogFile;

+ 4
- 1
db/filename.h Parādīt failu

@ -26,7 +26,8 @@ enum FileType {
kCurrentFile,
kTempFile,
kInfoLogFile,
kVTableFile// Either the current one, or an old one
kVTableFile,
kVTableManagerFile// Either the current one, or an old one
};
// Return the name of the log file with the specified number
@ -73,6 +74,8 @@ std::string InfoLogFileName(const std::string& dbname);
// Return the name of the old info log file for "dbname".
std::string OldInfoLogFileName(const std::string& dbname);
std::string VTableManagerFileName(const std::string& dbname);
// If filename is a leveldb file, store the type of the file in *type.
// The number encoded in the filename is stored in *number. If the
// filename was successfully parsed, returns true. Else return false.

+ 3
- 1
db/repair.cc Parādīt failu

@ -201,9 +201,10 @@ class Repairer {
// Do not record a version edit for this conversion to a Table
// since ExtractMetaData() will also generate edits.
FileMetaData meta;
VTableMeta vtable_meta;
meta.number = next_file_number_++;
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, &vtable_meta);
delete iter;
mem->Unref();
mem = nullptr;
@ -212,6 +213,7 @@ class Repairer {
table_numbers_.push_back(meta.number);
}
}
Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s",
(unsigned long long)log, counter, (unsigned long long)meta.number,
status.ToString().c_str());

+ 1
- 0
table/vtable_builder.cc Parādīt failu

@ -19,6 +19,7 @@ void VTableBuilder::Add(const VTableRecord& record, VTableHandle* handle) {
status_ = file_->Append(encoder_.GetHeader().ToString() +
encoder_.GetRecord().ToString());
record_number_ += 1;
assert(ok());
//TODO: meta info support in the future
}

+ 5
- 0
table/vtable_builder.h Parādīt failu

@ -23,11 +23,16 @@ class VTableBuilder {
// Abandon building the vTable
void Abandon();
uint64_t FileSize() const { return file_size_; }
uint64_t RecordNumber() const { return record_number_; }
private:
bool ok() const { return status().ok(); }
WritableFile* file_;
uint64_t file_size_{0};
uint64_t record_number_{0};
Status status_;

+ 128
- 0
table/vtable_manager.cc Parādīt failu

@ -0,0 +1,128 @@
#include "table/vtable_manager.h"
#include "db/filename.h"
#include <iostream>
#include <ostream>
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "util/coding.h"
namespace leveldb {
void VTableMeta::Encode(std::string* target) const {
PutVarint64(target, number);
PutVarint64(target, records_num);
PutVarint64(target, invalid_num);
PutVarint64(target, table_size);
}
Status VTableMeta::Decode(Slice* input) {
if (!GetVarint64(input, &number) || !GetVarint64(input, &records_num) ||
!GetVarint64(input, &invalid_num) || !GetVarint64(input, &table_size)) {
return Status::Corruption("Error Decode VTable meta");
}
return Status::OK();
}
void VTableManager::AddVTable(const VTableMeta& vtable_meta) {
vtables_[vtable_meta.number] = vtable_meta;
}
void VTableManager::RemoveVTable(uint64_t file_num) {
vtables_.erase(file_num);
}
Status VTableManager::AddInvalid(uint64_t file_num) {
const auto it = vtables_.find(file_num);
if (it == vtables_.end()) {
return Status::Corruption("Invalid VTable number");
}
vtables_[file_num].invalid_num += 1;
if (vtables_[file_num].invalid_num >= vtables_[file_num].records_num) {
invalid_.emplace_back(file_num);
}
return Status::OK();
}
Status VTableManager::SaveVTableMeta() const {
auto fname = VTableManagerFileName(dbname_);
WritableFile* file;
Status s = env_->NewWritableFile(fname, &file);
if (!s.ok()) {
return Status::Corruption("Failed to open vTable manager file");
}
const auto vtable_num = vtables_.size();
std::string target;
PutVarint64(&target, vtable_num);
for (auto & vtable : vtables_) {
vtable.second.Encode(&target);
}
s = file->Append(target);
if (!s.ok()) {
return Status::Corruption("Failed to append vTable manager file");
}
s = file->Flush();
if (s.ok()) {
s = file->Sync();
}
if (s.ok()) {
s = file->Close();
}
delete file;
file = nullptr;
if (!s.ok()) {
return Status::Corruption("Failed to write vTable meta file");
}
return Status::OK();
}
Status VTableManager::LoadVTableMeta() {
auto fname = VTableManagerFileName(dbname_);
if (!env_->FileExists(fname)) {
return Status::OK();
}
SequentialFile* file;
Status s = env_->NewSequentialFile(fname, &file);
if (!s.ok()) {
return Status::Corruption("Failed to open vTable manager file");
}
uint64_t file_size;
s = env_->GetFileSize(fname, &file_size);
if (!s.ok()) {
return Status::Corruption("Failed to get vTable manager file size");
}
auto buf = new char[file_size];
Slice input;
s = file->Read(file_size, &input, buf);
if (!s.ok()) {
return Status::Corruption("Failed to read vTable manager file");
}
uint64_t vtable_num;
if(!GetVarint64(&input, &vtable_num)) {
return Status::Corruption("Failed to get vTable num");
}
for (int i = 0; i < vtable_num; i++) {
VTableMeta vtable_meta;
s = vtable_meta.Decode(&input);
if (s.ok()) {
AddVTable(vtable_meta);
if (vtable_meta.invalid_num >= vtable_meta.records_num) {
invalid_.emplace_back(vtable_meta.number);
}
} else {
return s;
}
}
return s;
}
} // namespace leveldb

+ 53
- 0
table/vtable_manager.h Parādīt failu

@ -0,0 +1,53 @@
#ifndef VTABLE_MANAGER_H
#define VTABLE_MANAGER_H
#include <map>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
namespace leveldb {
struct VTableMeta {
uint64_t number;
uint64_t records_num;
uint64_t invalid_num;
uint64_t table_size;
void Encode(std::string* target) const;
Status Decode(Slice* input);
VTableMeta() : number(0), records_num(0), invalid_num(0), table_size(0) {}
};
class VTableManager {
public:
explicit VTableManager(const std::string& dbname, Env* env) :
dbname_(dbname),
env_(env) {}
~VTableManager() = default;
void AddVTable(const VTableMeta& vtable_meta);
void RemoveVTable(uint64_t file_num);
Status AddInvalid(uint64_t file_num);
Status SaveVTableMeta() const;
Status LoadVTableMeta();
private:
std::string dbname_;
Env* env_;
std::map<uint64_t, VTableMeta> vtables_;
std::vector<uint64_t> invalid_;
};
} // namespace leveldb
#endif //VTABLE_MANAGER_H

+ 1
- 1
table/vtable_reader.cc Parādīt failu

@ -2,7 +2,7 @@
#include "leveldb/env.h"
#include &lt;table/vtable_reader.h>;
#include &#34;table/vtable_reader.h";
namespace leveldb {
Status VTableReader::Open(const Options& options, std::string fname) {

Notiek ielāde…
Atcelt
Saglabāt