// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/builder.h"

#include "db/dbformat.h"
#include "db/filename.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "table/vtable_builder.h"

namespace leveldb {

Status BuildTable(const std::string& dbname, Env* env, const Options& options,
                  TableCache* table_cache, Iterator* iter, FileMetaData* meta) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  std::string fname = TableFileName(dbname, meta->number);
  std::string vtb_name = VTableFileName(dbname, meta->number);
  if (iter->Valid()) {
    WritableFile* file;
    s = env->NewWritableFile(fname, &file);
    if (!s.ok()) {
      return s;
    }

    WritableFile* vtb_file;
    s = env->NewWritableFile(vtb_name, &vtb_file);
    if (!s.ok()) {
      return s;
    }

    TableBuilder* builder = new TableBuilder(options, file);
    VTableBuilder* vtb_builder = new VTableBuilder(options, vtb_file);
    meta->smallest.DecodeFrom(iter->key());
    Slice key;
    for (; iter->Valid(); iter->Next()) {
      key = iter->key();
      Slice value = iter->value();
      if (value.size() < options.kv_sep_size) {
        // No need to separate key and value
        builder->Add(key, value);
      }
      else {
        // Separate key value
        ParsedInternalKey parsed;
        if (!ParseInternalKey(key, &parsed)) {
          s = Status::Corruption("Fatal. Memtable Key Error");
          builder->Abandon();
          vtb_builder->Abandon();
          return s;
        }
        value.remove_prefix(1);
        VTableRecord record {parsed.user_key, value};
        VTableHandle handle;
        VTableIndex index;
        std::string value_index;
        vtb_builder->Add(record, &handle);

        index.file_number = meta->number;
        index.vtable_handle = handle;
        index.Encode(&value_index);
        builder->Add(key, Slice(value_index));
      }
    }
    if (!key.empty()) {
      meta->largest.DecodeFrom(key);
    }

    // Finish and check for builder errors
    s = builder->Finish();
    if (s.ok()) {
      meta->file_size = builder->FileSize();
      assert(meta->file_size > 0);
    }
    delete builder;

    // Finish and check for file errors
    if (s.ok()) {
      s = file->Sync();
    }
    if (s.ok()) {
      s = file->Close();
    }
    delete file;
    file = nullptr;

    if (s.ok()) {
      s = vtb_builder->Finish();
    }
    delete vtb_builder;

    if (s.ok()) {
      s = vtb_file->Sync();
    }
    if (s.ok()) {
      s = vtb_file->Close();
    }
    delete vtb_file;
    vtb_file = nullptr;

    if (s.ok()) {
      // Verify that the table is usable
      Iterator* it = table_cache->NewIterator(ReadOptions(), meta->number,
                                              meta->file_size);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it
  } else {
    env->RemoveFile(fname);
  }
  return s;
}

}  // namespace leveldb