//
// Created by 马也驰 on 2025/1/4.
//


#include "vlog_cache.h"

#include <cstring>
#include <fstream>
#include <mutex>


// Reads `len` bytes starting at byte `offset` of the vlog file `vlog_name`
// through the block cache and stores them in `value`.
//
// The requested range is walked block by block, so it may touch any number of
// BLOCK_SIZE-aligned blocks; every piece handed to read_data_inblock() stays
// inside one block. (The previous two-way split produced a first piece larger
// than a block as soon as the value touched three or more blocks.)
//
// With len == 0, `value` is left empty. Each piece is fetched by a separate
// read_data_inblock() call, so the read as a whole is not atomic with respect
// to concurrent writers.
void VlogCache::read_data(std::string &vlog_name, std::string& value, size_t offset, size_t len) {
  const size_t block_size = static_cast<size_t>(BLOCK_SIZE);

  std::string assembled;
  size_t cursor = offset;
  size_t remaining = len;

  while (remaining > 0) {
    // Bytes available between the cursor and the end of its block.
    const size_t room_in_block = block_size - (cursor % block_size);
    const size_t piece_len = remaining < room_in_block ? remaining : room_in_block;

    std::string piece;
    read_data_inblock(vlog_name, piece, cursor, piece_len);
    assembled += piece;

    cursor += piece_len;
    remaining -= piece_len;
  }

  value.swap(assembled);
}

// Writes the first `len` bytes of `value` to byte `offset` of the vlog file
// `vlog_name` through the block cache.
//
// The target range is walked block by block, so it may touch any number of
// BLOCK_SIZE-aligned blocks; every piece handed to write_data_inblock() stays
// inside one block. (The previous two-way split produced a first piece larger
// than a block as soon as the value touched three or more blocks.)
//
// The caller must make sure `value` holds at least `len` bytes; this is not
// checked here. With len == 0 nothing is written.
void VlogCache::write_data(std::string &vlog_name, std::string& value, size_t offset, size_t len) {
  const size_t block_size = static_cast<size_t>(BLOCK_SIZE);
  const char *src = value.data();

  size_t cursor = offset;
  size_t written = 0;

  while (written < len) {
    // Bytes available between the cursor and the end of its block.
    const size_t room_in_block = block_size - (cursor % block_size);
    const size_t left = len - written;
    const size_t piece_len = left < room_in_block ? left : room_in_block;

    std::string piece(src + written, piece_len);
    write_data_inblock(vlog_name, piece, cursor, piece_len);

    cursor += piece_len;
    written += piece_len;
  }
}

// Marks the record at `offset` in `vlog_name` as deleted by setting
// DATA_DELE_MASK in the uint16_t size field stored at that offset.
//
// Returns the size field with DATA_SIZE_MASK applied when the record was
// already marked deleted (nothing is written in that case); otherwise returns
// the size field as it was read, before the delete bit was set.
//
// The field is read and written back in two separate cache operations.
uint16_t VlogCache::delete_data(std::string &vlog_name, size_t offset) {
  constexpr size_t kHeaderLen = sizeof(uint16_t);

  std::string header;
  read_data(vlog_name, header, offset, kHeaderLen);

  uint16_t size_field;
  memcpy(&size_field, header.data(), kHeaderLen);

  const bool already_deleted = (size_field & DATA_DELE_MASK) != 0;
  if (already_deleted) {
    return size_field & DATA_SIZE_MASK;
  }

  // Set the delete bit and store the field back in place.
  const uint16_t tombstoned = size_field | (uint16_t)DATA_DELE_MASK;
  header.assign(reinterpret_cast<const char *>(&tombstoned), kHeaderLen);
  write_data(vlog_name, header, offset, kHeaderLen);

  return size_field;
}


void VlogCache::read_data_inblock(std::string &vlog_name, std::string& value, size_t offset, size_t len) {
  if (len == 0) return ;

  auto block_num = offset_2_blocknum(offset);
  auto block_offset = offset_2_inblock_offset(offset);
//  auto vlog_handler = std::fstream(vlog_name, std::ios::in | std::ios::out);

  auto blk_frame = block_frames[block_num];
  auto frame_info_ptr = &blk_frame->frame_info_;

  blk_frame->mtx.lock();
  frame_info_ptr->info_latch_.lock();

  if (!frame_info_ptr->cache_hit(vlog_name, block_num)) {
    // cache miss
    // swap old page
    if (frame_info_ptr->used && frame_info_ptr->is_dirty) {
      write_back_block(block_num);
    }
    // read new page
    read_in_block(block_num, vlog_name, offset/BLOCK_SIZE);
    // update frame info
    frame_info_ptr->used = true;
    frame_info_ptr->vlog_name = vlog_name;
    frame_info_ptr->vlog_block_num = offset / BLOCK_SIZE;
    frame_info_ptr->is_dirty = false;
  }

  value = std::string(&blk_frame->block_buff[block_offset], len);

  frame_info_ptr->info_latch_.unlock();
  blk_frame->mtx.unlock();
}


void VlogCache::write_data_inblock(std::string &vlog_name, std::string& value, size_t offset, size_t len) {
  if (len == 0) return ;

  auto block_num = offset_2_blocknum(offset);
  auto block_offset = offset_2_inblock_offset(offset);
  //  auto vlog_handler = std::fstream(vlog_name, std::ios::in | std::ios::out);

  auto blk_frame = block_frames[block_num];
  auto frame_info_ptr = &blk_frame->frame_info_;

  blk_frame->mtx.lock();
  frame_info_ptr->info_latch_.lock();

  if (!frame_info_ptr->cache_hit(vlog_name, block_num)) {
    // cache miss
    // swap old page
    if (frame_info_ptr->used && frame_info_ptr->is_dirty) {
      write_back_block(block_num);
    }
    // read new page
    read_in_block(block_num, vlog_name, offset/BLOCK_SIZE);
    // update frame info
    frame_info_ptr->used = true;
    frame_info_ptr->vlog_name = vlog_name;
    frame_info_ptr->vlog_block_num = offset / BLOCK_SIZE;
  }

  memcpy(&blk_frame->block_buff[block_offset], value.c_str(), len);
  frame_info_ptr->is_dirty = true;

  frame_info_ptr->info_latch_.unlock();
  blk_frame->mtx.unlock();
}


void VlogCache::flush_all_blocks() {
  for (auto i = 0; i < BLOCK_NUMS; i++) {
    auto blk_frame = block_frames[i];
    auto frame_info_ptr = &blk_frame->frame_info_;

    blk_frame->mtx.lock();
    frame_info_ptr->info_latch_.lock();

    if (frame_info_ptr->used && frame_info_ptr->is_dirty) {
      write_back_block(i);
    }

    blk_frame->mtx.unlock();
    frame_info_ptr->info_latch_.unlock();
  }
}


// Writes the BLOCK_SIZE-byte buffer of cache frame `block_num` to the file
// and block position recorded in that frame's info (vlog_name,
// vlog_block_num).
//
// The file is opened in binary mode so the raw bytes are stored without any
// text-mode translation. If the file cannot be opened the function returns
// without writing; the caller is not informed. No locks are taken here.
void VlogCache::write_back_block(size_t block_num) {
  auto frame_info_ptr = &block_frames[block_num]->frame_info_;

  std::fstream vlog_handler(frame_info_ptr->vlog_name,
                            std::ios::in | std::ios::out | std::ios::binary);
  if (!vlog_handler.is_open()) return;

  // Compute the byte position in the stream offset type so it is not
  // truncated by a narrower operand type.
  const std::streamoff file_pos =
      static_cast<std::streamoff>(frame_info_ptr->vlog_block_num) *
      static_cast<std::streamoff>(BLOCK_SIZE);

  vlog_handler.seekp(file_pos);
  vlog_handler.write(block_frames[block_num]->block_buff, BLOCK_SIZE);
  vlog_handler.flush();
  // The stream is closed by its destructor.
}


// Loads block `vlog_block_num` of the file `vlog_name` into the buffer of
// cache frame `block_num`.
//
// The file is opened in binary mode and positioned with seekg (the read
// position). Whatever part of the buffer the read does not fill - a block
// that extends past the end of the file, or a file that cannot be opened -
// is zeroed, so bytes left over from the frame's previous content are never
// served or written back as part of this block. No locks are taken here.
void VlogCache::read_in_block(size_t block_num, std::string& vlog_name,
                              size_t vlog_block_num) {
  char *buff = block_frames[block_num]->block_buff;
  const std::streamsize block_bytes = BLOCK_SIZE;
  std::streamsize loaded = 0;

  std::fstream vlog_handler(vlog_name,
                            std::ios::in | std::ios::out | std::ios::binary);
  if (vlog_handler.is_open()) {
    // Compute the byte position in the stream offset type so it is not
    // truncated by a narrower operand type.
    const std::streamoff file_pos =
        static_cast<std::streamoff>(vlog_block_num) *
        static_cast<std::streamoff>(BLOCK_SIZE);

    vlog_handler.seekg(file_pos);
    vlog_handler.read(buff, block_bytes);
    loaded = vlog_handler.gcount();
  }

  if (loaded < block_bytes) {
    memset(buff + loaded, 0, static_cast<size_t>(block_bytes - loaded));
  }
  // The stream is closed by its destructor.
}