//
// Created by 马也驰 on 2024/12/29.
//

#include "vlog_set.h"
#include "../db/vlog_gc.h"
#include <thread>


// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
// vlog: | curr_size(size_t) | value_nums(size_t) | value1 | value2 | ... |
// value: { value_len(uint16_t) | slot_num(size_t) | value }


// no bugs
VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(vlog_gc) {
  config_file_name = get_config_file_name();
  auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out);
  if (!config_file_.is_open()) {
    // config 文件不存在,尝试创建
    config_file_ = std::fstream(config_file_name, std::ios::out);
    config_file_.close();
    // 重新以读写模式打开
    config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out);
    config_file_.seekp(0);
    size_t tmp = 0;
    config_file_.write(reinterpret_cast<const char*>(&tmp), sizeof(size_t));
    config_file_.flush();
    this->vlog_nums_ = 0;
    register_new_vlog();
  } else {
    // config 文件存在
    size_t _vlog_nums_;
    config_file_.seekp(0);
    config_file_.read(reinterpret_cast<char*>(&_vlog_nums_), sizeof(size_t));
    this->vlog_nums_ = _vlog_nums_;

    //  从 config file 中读取所有有效的vlog名字
    config_file_.seekp(sizeof(size_t));
    size_t tmp[_vlog_nums_];
    config_file_.read(reinterpret_cast<char*>(tmp), sizeof(tmp));
    for (auto i = 0; i < _vlog_nums_; i++) {
      size_t curr_vlog_num = tmp[i];
      if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK)) {
        auto curr_vlog = std::fstream(get_vlog_name(curr_vlog_num), std::ios::in | std::ios::out);
        size_t curr_vlog_header[2];
        curr_vlog.seekp(0);
        curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t));

        restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0]));
      }
    }
  }

  if (!config_file_.is_open()) {
    std::cerr << "Failed to open or create the db config file: " << config_file_name << std::endl;
    std::exit(EXIT_FAILURE);
  }

  config_file_.close();

  if (USING_VLOG_CACHE) {
    vlog_cache = new VlogCache();
  }
}

VlogSet::~VlogSet() {
  // wait for all gc threads to finish
  while (true) {
    finished_latch_.lock();
    if (!finished) {
      finished_latch_.unlock();
      std::this_thread::sleep_for(std::chrono::milliseconds(2));
    } else {
      finished_latch_.unlock();
      break;
    }
  }

  auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out);
  config_file_.seekp(0);
  config_file_.write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t));
  config_file_.flush();
  config_file_.close();

  mtx.lock();
  for (auto & it : vlog_info_map_) {
    struct vlog_info* vinfo = it.second;
    auto vlog_handler = get_vlog_handler(vinfo->vlog_num);

    vinfo->vlog_info_latch_.lock();
    // FIXME: SIGSEGV
    vlog_handler->vlog_latch_.hard_lock();

    // 所有操作都已经同步完成
    // 使用 vlog_info* 进行操作
    size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums};
    vinfo->vlog_info_latch_.unlock();

    auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out);
    vlog_file_handler.seekp(0);
    vlog_file_handler.write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
    vlog_file_handler.flush();
    vlog_file_handler.close();

    std::cout << "vlog_set.cpp line 102" << std::endl;
    vlog_handler->vlog_latch_.hard_unlock();
  }
  mtx.unlock();
  std::cout << "vlog_set.cpp line 106" << std::endl;
  delete vlog_gc;

  if (USING_VLOG_CACHE) {
    delete vlog_cache;
  }
}

// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::get_value(const struct slot_content &sc, std::string *value) {
  mtx.lock();
  auto vinfo = get_vlog_info(sc.vlog_num);
  auto vhandler = get_vlog_handler(sc.vlog_num);

  vinfo->vlog_info_latch_.lock();
  if (!vinfo->vlog_valid_) {
    vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
  }
  vhandler->vlog_latch_.soft_lock();
  vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
  mtx.unlock(); // for better performance
  vinfo->vlog_info_latch_.unlock();
  read_vlog_value(sc, value);
  vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
  vhandler->vlog_latch_.soft_unlock();
}

struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) {
  for (auto & it : vlog_info_map_) {
    struct vlog_info* vinfo = it.second;

    // 使用 vlog_info* 进行操作
    vinfo->vlog_info_latch_.lock();
    if (vinfo->vlog_valid_ && !vinfo->processing_gc && vinfo->curr_size+value_size <= VLOG_SIZE) {
      vinfo->vlog_info_latch_.unlock();
      return vinfo;
    }
    vinfo->vlog_info_latch_.unlock();
  }

  // 所有vlog已满,创建新vlog
  return nullptr;
}

// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
  mtx.lock();

  auto vinfo = get_writable_vlog_info(value.size());
  if (!vinfo) {
    // vlog全部已满,创建新的vlog
    auto _vlog_num_ = register_new_vlog();
    vinfo = get_vlog_info(_vlog_num_);
  }

  vinfo->vlog_info_latch_.lock();
  sc.vlog_num = vinfo->vlog_num;
  sc.value_offset = vinfo->curr_size;
  vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t);
  vinfo->value_nums ++;

  auto vhandler = get_vlog_handler(vinfo->vlog_num);
  if (!vinfo->vlog_valid_ || vinfo->processing_gc) {
    vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
  }

  vhandler->vlog_latch_.hard_lock();
  vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
  mtx.unlock(); // for better performance
  vinfo->vlog_info_latch_.unlock();
  write_vlog_value(sc, slot_num, value);
  vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
  vhandler->vlog_latch_.hard_unlock();
}

void VlogSet::del_value(const struct slot_content &sc) {
  mtx.lock();
  auto vinfo = get_vlog_info(sc.vlog_num);
  auto vhandler = get_vlog_handler(sc.vlog_num);

  vinfo->vlog_info_latch_.lock();
  if (!vinfo->vlog_valid_ || vinfo->processing_gc) {
    vhandler = get_vlog_handler(vinfo->vlog_num_for_gc);
  }

  vhandler->vlog_latch_.hard_lock();
  vhandler->incre_access_thread_nums(); // FIXME: increase thread nums
  mtx.unlock();  // for better performance
  vinfo->vlog_info_latch_.unlock();
  mark_del_value(sc);
  vhandler->decre_access_thread_nums(); // FIXME: decrease thread nums
  vhandler->vlog_latch_.hard_unlock();
}

size_t VlogSet::register_new_vlog() {
  // FIXME: concurrency on config file
  size_t vn = vlog_nums_;
  std::string vlog_name = get_vlog_name(vn);
  register_inconfig_file(vn);
  create_vlog(vn);
  register_vlog_inmaps(vn, vlog_name);
  vlog_nums_ ++;
  return vn;
}

// only used in vlog gc
void VlogSet::remove_old_vlog(size_t old_vlog_num) {
  // after gc, new_vlog has been created
  // new vlog should have been created here
  mtx.lock();
  auto vi_old = get_vlog_info(old_vlog_num);
  auto vh_old = get_vlog_handler(old_vlog_num);

  vi_old->vlog_info_latch_.lock();
  // FIXME: dead lock
  while (!vh_old->non_access_thread()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(2));
    std::cout << "waiting in remove_old_vlog" << std::endl;
  }

  std::string old_vlog_name = get_vlog_name(old_vlog_num);
  remove_from_config_file(old_vlog_num);
  remove_vlog_file(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!!
  vi_old->vlog_valid_ = false;
  vi_old->vlog_info_latch_.unlock();
  vlog_nums_ --;
  mtx.unlock();
}

bool VlogSet::vlog_need_gc(size_t vlog_num) {
  // TODO: need to judge whether vlog is full
  auto vi = get_vlog_info(vlog_num);
  if ((double)(vi->curr_size*1.0)/VLOG_SIZE >= VLOG_GC_VOLUM_THRESHOLD &&
      (double)(vi->curr_size*1.0)/VLOG_SIZE < VLOG_GC_THREHOLD) {
    return false;
  }
  bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD);
  return retval;
}

void VlogSet::register_inconfig_file(size_t vlog_num) {
  // config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... |
  // first size_t in config file indicates current vlog_nums_(size_t)
  // second size_t in config file indicates current vlog_count_(size_t)
  config_file_latch_.lock();
  auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out);
  config_file_.seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t));
  config_file_.write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t));
  config_file_.flush();
  config_file_.close();
  config_file_latch_.unlock();
}

void VlogSet::remove_from_config_file(size_t vlog_num) {
  // FIXME: concurrency on config file
  config_file_latch_.lock();

  size_t tmp[vlog_nums_];
  auto config_file_ = std::fstream(config_file_name, std::ios::in | std::ios::out);
  config_file_.seekp(sizeof(size_t));
  config_file_.read(reinterpret_cast<char*>(tmp), sizeof(tmp));

  for (auto i = 0; i < vlog_nums_; i++) {
    size_t curr_vlog_num = tmp[i];
    if (!(curr_vlog_num & CONFIG_FILE_DELE_MASK) && curr_vlog_num == vlog_num) {
      curr_vlog_num |= CONFIG_FILE_DELE_MASK;
      config_file_.seekp(sizeof(size_t) + i*sizeof(size_t));
      config_file_.write(reinterpret_cast<const char*>(&curr_vlog_num), sizeof(size_t));
      config_file_.flush();
      break;
    }
  }
  config_file_.close();

  config_file_latch_.unlock();
}

void VlogSet::create_vlog(size_t vlog_num) {
  auto vlog_name = get_vlog_name(vlog_num);
  auto vlog_new = std::fstream(vlog_name, std::ios::out);
  vlog_new.close();
  // 重新以读写模式打开
  vlog_new = std::fstream(vlog_name, std::ios::in | std::ios::out);
  if (!vlog_new.is_open()) {
    std::cerr << "Failed to open or create the vlog file: " << vlog_name << std::endl;
    std::exit(EXIT_FAILURE);
  }
  size_t tmp[2] = {2*sizeof(size_t), 0};
  vlog_new.seekp(0);
  vlog_new.write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
  vlog_new.flush();
  vlog_new.close();
}

inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) {
  auto vlog_name = get_vlog_name(vi->vlog_num);
  vlog_info_map_[vlog_name] = vi;
  vlog_handler_map_[vlog_name] = new vlog_handler();
}

inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) {
  auto vinfo = new vlog_info(vlog_num);
  auto vhandler = new vlog_handler();
  vlog_info_map_[vlog_name] = vinfo;
  vlog_handler_map_[vlog_name] = vhandler;
}

void VlogSet::remove_vlog_file(std::string &vlog_name) {
  assert(!std::remove(vlog_name.c_str()));
//  vlog_handler_map_.erase(vlog_name);
}

inline std::string VlogSet::get_config_file_name() {
  return dbname + "_config_file";
}

std::string VlogSet::get_vlog_name(size_t vlog_num) {
  return dbname + "_vlog_" + std::to_string(vlog_num);
}

struct vlog_info * VlogSet::get_vlog_info(size_t vlog_num) {
  return vlog_info_map_[get_vlog_name(vlog_num)];
}

struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) {
  return vlog_handler_map_[get_vlog_name(vlog_num)];
}



size_t VlogSet::serialize_data(const leveldb::Slice &buff, size_t slot_num, std::string &value) {
  const char *value_buff = buff.data();
  const size_t off = sizeof(uint16_t) + sizeof(size_t);
  const size_t value_size = off + buff.size();
  char data[value_size];
  memcpy(data, &value_size, sizeof(uint16_t));
  memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t));
  memcpy(data+off, value_buff, buff.size());
  value = std::string(data, value_size);
  return value_size;
}

void VlogSet::deserialize_data(char *buff, std::string &value) {
  uint16_t value_size;
  memcpy(&value_size, buff, sizeof(uint16_t));
  if (value_size & VALUE_DELE_MASK) {
    value = "";
    return ;
  }
  value_size &= VALUE_SIZE_MASK;
  assert(value_size <= VALUE_BUFF_SIZE);

  const size_t off = sizeof(uint16_t)+sizeof(size_t);
  value = std::string(&buff[off], value_size-off);
}



void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) {
  if (USING_VLOG_CACHE) {
    read_vlog_value_cache(sc, value);
  } else {
    read_vlog_value_direct(sc, value);
  }
}

void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
  if (USING_VLOG_CACHE) {
    write_vlog_value_cache(sc, slot_num, value);
  } else {
    write_vlog_value_direct(sc, slot_num, value);
  }
}

uint16_t VlogSet::delete_vlog_value(const struct slot_content &sc) {
  uint16_t value_size;
  if (USING_VLOG_CACHE) {
    value_size = delete_vlog_value_cache(sc);
  } else {
    value_size = delete_vlog_value_direct(sc);
  }
  return value_size;
}



// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::read_vlog_value_direct(const struct slot_content &sc, std::string *value) {
  auto vlog_name = get_vlog_name(sc.vlog_num);
  auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
  handler.seekp(sc.value_offset);
  char value_buff[VALUE_BUFF_SIZE];
  handler.read(value_buff, VALUE_BUFF_SIZE);
  handler.close();

  std::string vlog_value;
  deserialize_data(value_buff, vlog_value);
  *value = vlog_value;

//  uint16_t value_size;
//  memcpy(&value_size, value_buff, sizeof(uint16_t));
//  if (value_size & VALUE_DELE_MASK) {
//    *value = "";
//    return ;
//  }
//  value_size &= VALUE_SIZE_MASK;
//  assert(value_size <= VALUE_BUFF_SIZE);
//
//  const size_t off = sizeof(uint16_t)+sizeof(size_t);
//  *value = std::string(&value_buff[off], value_size-off);
}

// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::write_vlog_value_direct(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
  auto vlog_name = get_vlog_name(sc.vlog_num);
  auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
  handler.seekp(sc.value_offset);

  std::string vlog_value;
  auto value_size = serialize_data(value, slot_num, vlog_value);
  const char *data = vlog_value.data();

//  const char *value_buff = value.data();
//
//  const size_t off = sizeof(uint16_t) + sizeof(size_t);
//  const size_t value_size = off + value.size();
//  char data[value_size];
//  memcpy(data, &value_size, sizeof(uint16_t));
//  memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t));
//  memcpy(data+off, value_buff, value.size());

  handler.write(data, value_size);

  handler.flush();
  handler.close();
}

uint16_t VlogSet::delete_vlog_value_direct(const struct slot_content &sc) {
  auto vinfo = get_vlog_info(sc.vlog_num);
  auto vlog_name = get_vlog_name(sc.vlog_num);
  auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
  handler.seekp(sc.value_offset);
  char value_buff[VALUE_BUFF_SIZE];
  handler.read(value_buff, VALUE_BUFF_SIZE);

  uint16_t value_size;
  memcpy(&value_size, value_buff, sizeof(uint16_t));
  if (value_size & VALUE_DELE_MASK) {
    // case when value has been deleted
    handler.close();
    return value_size & VALUE_SIZE_MASK;
  }
  assert(!(value_size & VALUE_DELE_MASK));
  uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK;
  memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
  handler.seekp(sc.value_offset);
  handler.write(value_buff, sizeof(uint16_t));
  handler.flush();
  handler.close();

  return value_size;
}



void VlogSet::read_vlog_value_cache(const struct slot_content &sc, std::string *value) {
  auto vlog_name = get_vlog_name(sc.vlog_num);
  std::string tmp_value;
  vlog_cache->read_data(vlog_name, tmp_value, sc.value_offset, sizeof(uint16_t));

  uint16_t value_len;
  memcpy(&value_len, tmp_value.data(), sizeof(uint16_t));
  if (value_len & VALUE_DELE_MASK) {
    *value = "";
    return ;
  }

  assert(!(value_len & VALUE_DELE_MASK));
  std::string value_str;
  vlog_cache->read_data(vlog_name, value_str, sc.value_offset, value_len);

  std::string vlog_value;
  char *value_buff = const_cast<char*>(value_str.data());
  deserialize_data(value_buff, vlog_value);
  *value = vlog_value;
}

void VlogSet::write_vlog_value_cache(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) {
  auto vlog_name = get_vlog_name(sc.vlog_num);
  auto value_str = value.ToString();
  std::string vlog_value;
  auto value_size = serialize_data(value.data(), slot_num, vlog_value);
  vlog_cache->write_data(vlog_name, vlog_value, sc.value_offset, value_size);
}

uint16_t VlogSet::delete_vlog_value_cache(const struct slot_content &sc) {
  auto vlog_name = get_vlog_name(sc.vlog_num);
  return vlog_cache->delete_data(vlog_name, sc.value_offset);
}



// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
void VlogSet::mark_del_value(const struct slot_content &sc) {
  auto vinfo = get_vlog_info(sc.vlog_num);
  auto value_size = delete_vlog_value(sc);
//  auto vlog_name = get_vlog_name(sc.vlog_num);
//  auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out);
//  handler.seekp(sc.value_offset);
//  char value_buff[VALUE_BUFF_SIZE];
//  handler.read(value_buff, VALUE_BUFF_SIZE);
//
//  uint16_t value_size;
//  memcpy(&value_size, value_buff, sizeof(uint16_t));
//  if (value_size & VALUE_DELE_MASK) {
//    // case when value has been deleted
//    handler.close();
//    return ;
//  }
//  assert(!(value_size & VALUE_DELE_MASK));
//  uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK;
//  memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
//  handler.seekp(sc.value_offset);
//  handler.write(value_buff, sizeof(uint16_t));
//  handler.flush();
//  handler.close();

  // handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
  vinfo->discard ++;
  vinfo->value_nums --;
  vinfo->curr_size -= value_size;
  // FIXME: gc process, avoid repeated gc
  if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) {
    // create new vlog
    vinfo->processing_gc = true;
    vinfo->vlog_num_for_gc = register_new_vlog();
    vlog_gc->do_gc(sc.vlog_num, vinfo->vlog_num_for_gc);
  }
}