- // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file. See the AUTHORS file for names of contributors.
-
- #include "db/builder.h"
-
- #include "db/dbformat.h"
- #include "db/filename.h"
- #include "db/table_cache.h"
- #include "db/version_edit.h"
- #include "leveldb/db.h"
- #include "leveldb/env.h"
- #include "leveldb/iterator.h"
- #include "table/vtable_builder.h"
-
- namespace leveldb {
-
- Status BuildTable(const std::string& dbname, Env* env, const Options& options,
- TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
- Status s;
- meta->file_size = 0;
- iter->SeekToFirst();
-
- std::string fname = TableFileName(dbname, meta->number);
- std::string vtb_name = VTableFileName(dbname, meta->number);
- if (iter->Valid()) {
- WritableFile* file;
- s = env->NewWritableFile(fname, &file);
- if (!s.ok()) {
- return s;
- }
-
- WritableFile* vtb_file;
- s = env->NewWritableFile(vtb_name, &vtb_file);
- if (!s.ok()) {
- return s;
- }
-
- TableBuilder* builder = new TableBuilder(options, file);
- VTableBuilder* vtb_builder = new VTableBuilder(options, vtb_file);
- meta->smallest.DecodeFrom(iter->key());
- Slice key;
- for (; iter->Valid(); iter->Next()) {
- key = iter->key();
- Slice value = iter->value();
- if (value.size() < options.kv_sep_size) {
- // No need to separate key and value
- builder->Add(key, value);
- }
- else {
- // Separate key value
- ParsedInternalKey parsed;
- if (!ParseInternalKey(key, &parsed)) {
- s = Status::Corruption("Fatal. Memtable Key Error");
- builder->Abandon();
- vtb_builder->Abandon();
- return s;
- }
- value.remove_prefix(1);
- VTableRecord record {parsed.user_key, value};
- VTableHandle handle;
- VTableIndex index;
- std::string value_index;
- vtb_builder->Add(record, &handle);
-
- index.file_number = meta->number;
- index.vtable_handle = handle;
- index.Encode(&value_index);
- builder->Add(key, Slice(value_index));
- }
- }
- if (!key.empty()) {
- meta->largest.DecodeFrom(key);
- }
-
- // Finish and check for builder errors
- s = builder->Finish();
- if (s.ok()) {
- meta->file_size = builder->FileSize();
- assert(meta->file_size > 0);
- }
- delete builder;
-
- // Finish and check for file errors
- if (s.ok()) {
- s = file->Sync();
- }
- if (s.ok()) {
- s = file->Close();
- }
- delete file;
- file = nullptr;
-
- if (s.ok()) {
- s = vtb_builder->Finish();
- }
- delete vtb_builder;
-
- if (s.ok()) {
- s = vtb_file->Sync();
- }
- if (s.ok()) {
- s = vtb_file->Close();
- }
- delete vtb_file;
- vtb_file = nullptr;
-
- if (s.ok()) {
- // Verify that the table is usable
- Iterator* it = table_cache->NewIterator(ReadOptions(), meta->number,
- meta->file_size);
- s = it->status();
- delete it;
- }
- }
-
- // Check for input iterator errors
- if (!iter->status().ok()) {
- s = iter->status();
- }
-
- if (s.ok() && meta->file_size > 0) {
- // Keep it
- } else {
- env->RemoveFile(fname);
- }
- return s;
- }
-
- } // namespace leveldb
|