|
|
@ -4,6 +4,7 @@ |
|
|
|
|
|
|
|
#include "vlog_set.h"
|
|
|
|
#include "../db/vlog_gc.h"
|
|
|
|
#include <thread>
|
|
|
|
|
|
|
|
|
|
|
|
// config file: | vlog_nums_(size_t) | vlog_1_num(size_t) | vlog_2_num(size_t) | ... |
|
|
|
@ -53,7 +54,6 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (!this->config_file_->is_open()) { |
|
|
|
std::cerr << "Failed to open or create the db config file: " << cfname << std::endl; |
|
|
|
std::exit(EXIT_FAILURE); |
|
|
@ -61,6 +61,18 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc( |
|
|
|
} |
|
|
|
|
|
|
|
VlogSet::~VlogSet() { |
|
|
|
// wait for all gc threads to finish
|
|
|
|
while (true) { |
|
|
|
finished_latch_.lock(); |
|
|
|
if (!finished) { |
|
|
|
finished_latch_.unlock(); |
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2)); |
|
|
|
} else { |
|
|
|
finished_latch_.unlock(); |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
config_file_->seekp(0); |
|
|
|
config_file_->write(reinterpret_cast<const char*>(&vlog_nums_), sizeof(size_t)); |
|
|
|
config_file_->flush(); |
|
|
@ -69,6 +81,7 @@ VlogSet::~VlogSet() { |
|
|
|
for (auto & it : vlog_info_map_) { |
|
|
|
struct vlog_info* vinfo = it.second; |
|
|
|
|
|
|
|
// 所有操作都已经同步完成
|
|
|
|
// 使用 vlog_info* 进行操作
|
|
|
|
size_t tmp[2] = {vinfo->curr_size, vinfo->value_nums}; |
|
|
|
auto vlog_file_handler = std::fstream(get_vlog_name(vinfo->vlog_num), std::ios::in | std::ios::out); |
|
|
@ -82,20 +95,22 @@ VlogSet::~VlogSet() { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::get_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { |
|
|
|
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
|
|
|
|
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
|
|
|
|
void VlogSet::get_value(const struct slot_content &sc, std::string *value) { |
|
|
|
mtx.lock(); |
|
|
|
auto vinfo = get_vlog_info(vlog_num); |
|
|
|
auto vhandler = get_vlog_handler(vlog_num); |
|
|
|
auto vinfo = get_vlog_info(sc.vlog_num); |
|
|
|
auto vhandler = get_vlog_handler(sc.vlog_num); |
|
|
|
|
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
if (!vinfo->vlog_valid_) { |
|
|
|
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); |
|
|
|
} |
|
|
|
vhandler->vlog_latch_.soft_lock(); |
|
|
|
mtx.unlock(); // for better performance
|
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
read_vlog_value(vlog_numspan>class="p">, value_offset, value); |
|
|
|
read_vlog_value(sc, value); |
|
|
|
vhandler->vlog_latch_.soft_unlock(); |
|
|
|
mtx.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { |
|
|
@ -103,16 +118,21 @@ struct vlog_info * VlogSet::get_writable_vlog_info(size_t value_size) { |
|
|
|
struct vlog_info* vinfo = it->second; |
|
|
|
|
|
|
|
// 使用 vlog_info* 进行操作
|
|
|
|
if (vinfo->curr_size + value_size <= VLOG_SIZE) { |
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
if (vinfo->vlog_valid_ && !vinfo->processing_gc && vinfo->curr_size+value_size <= VLOG_SIZE) { |
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
return vinfo; |
|
|
|
} |
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
// 所有vlog已满,创建新vlog
|
|
|
|
return nullptr; |
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveldb::Slice &value) { |
|
|
|
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
|
|
|
|
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
|
|
|
|
void VlogSet::put_value(struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { |
|
|
|
mtx.lock(); |
|
|
|
|
|
|
|
auto vinfo = get_writable_vlog_info(value.size()); |
|
|
@ -123,82 +143,69 @@ void VlogSet::put_value(uint32_t *vlog_num, uint32_t *value_offset, const leveld |
|
|
|
} |
|
|
|
|
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
*vlog_num = vinfo->vlog_num; |
|
|
|
*value_offset = vinfo->curr_size; |
|
|
|
vinfo->curr_size += value.size(); |
|
|
|
sc.vlog_num = vinfo->vlog_num; |
|
|
|
sc.value_offset = vinfo->curr_size; |
|
|
|
vinfo->curr_size += value.size() + sizeof(uint16_t) + sizeof(size_t); |
|
|
|
vinfo->value_nums ++; |
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
|
|
|
|
auto vhandler = get_vlog_handler(*vlog_num); |
|
|
|
|
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
if (!vinfo->vlog_valid_) { |
|
|
|
auto vhandler = get_vlog_handler(vinfo->vlog_num); |
|
|
|
if (!vinfo->vlog_valid_ || vinfo->processing_gc) { |
|
|
|
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); |
|
|
|
} |
|
|
|
|
|
|
|
vhandler->vlog_latch_.hard_lock(); |
|
|
|
mtx.unlock(); // for better performance
|
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
write_vlog_value(*vlog_num, *value_offset, value); |
|
|
|
write_vlog_value(sc, slot_num, value); |
|
|
|
vhandler->vlog_latch_.hard_unlock(); |
|
|
|
mtx.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::del_value(uint32_t vlog_num, uint32_t value_offset) { |
|
|
|
void VlogSet::del_value(const struct slot_content &sc) { |
|
|
|
mtx.lock(); |
|
|
|
auto vinfo = get_vlog_info(vlog_num); |
|
|
|
auto vhandler = get_vlog_handler(vlog_num); |
|
|
|
auto vinfo = get_vlog_info(sc.vlog_num); |
|
|
|
auto vhandler = get_vlog_handler(sc.vlog_num); |
|
|
|
|
|
|
|
vinfo->vlog_info_latch_.lock(); |
|
|
|
if (!vinfo->vlog_valid_) { |
|
|
|
// auto vlog_num_gc = vinfo->vlog_num_for_gc;
|
|
|
|
if (!vinfo->vlog_valid_ || vinfo->processing_gc) { |
|
|
|
vhandler = get_vlog_handler(vinfo->vlog_num_for_gc); |
|
|
|
// vinfo->vlog_info_latch_.unlock();
|
|
|
|
// vinfo = get_vlog_info(vlog_num_gc);
|
|
|
|
// vinfo->vlog_info_latch_.lock();
|
|
|
|
} |
|
|
|
vhandler->vlog_latch_.hard_lock(); |
|
|
|
mark_del_value(vlog_num, value_offset); |
|
|
|
|
|
|
|
vhandler->vlog_latch_.hard_lock(); |
|
|
|
mtx.unlock(); // for better performance
|
|
|
|
vinfo->vlog_info_latch_.unlock(); |
|
|
|
mark_del_value(sc); |
|
|
|
vhandler->vlog_latch_.hard_unlock(); |
|
|
|
mtx.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
size_t VlogSet::register_new_vlog() { |
|
|
|
// FIXME: concurrency on config file
|
|
|
|
size_t vn = vlog_nums_; |
|
|
|
std::string vlog_name = get_vlog_name(vn); |
|
|
|
register_inconfig_file(vn); |
|
|
|
create_vlog(vn); |
|
|
|
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
|
|
|
|
// if (!vlog_new->is_open()) {
|
|
|
|
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl;
|
|
|
|
// std::exit(EXIT_FAILURE);
|
|
|
|
// }
|
|
|
|
register_vlog_inmaps(vn, vlog_name); |
|
|
|
vlog_nums_ ++; |
|
|
|
// vlog_count_ ++;
|
|
|
|
return vn; |
|
|
|
} |
|
|
|
|
|
|
|
// only used in vlog gc
|
|
|
|
void VlogSet::remove_old_vlog(size_t old_vlog_num) { |
|
|
|
// after gc, new_vlog has been created
|
|
|
|
// new vlog should have been created here !
|
|
|
|
// new vlog should have been created here
|
|
|
|
mtx.lock(); |
|
|
|
auto vi_old = get_vlog_info(old_vlog_num); |
|
|
|
vi_old->vlog_info_latch_.lock(); |
|
|
|
std::string old_vlog_name = get_vlog_name(old_vlog_num); |
|
|
|
remove_from_config_file(old_vlog_num); |
|
|
|
remove_vlog_from_maps(old_vlog_name); |
|
|
|
// remove_vlog_from_maps(old_vlog_name); // FIXME: this function should be called after all access on this vlog finished !!!
|
|
|
|
vi_old->vlog_valid_ = false; |
|
|
|
vi_old->vlog_info_latch_.unlock(); |
|
|
|
// vlog_count_ --;
|
|
|
|
mtx.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
bool VlogSet::vlog_need_gc(size_t vlog_num) { |
|
|
|
// FIXME: vlog应该已经满了才行
|
|
|
|
std::string vlog_name = get_vlog_name(vlog_num); |
|
|
|
auto vi = vlog_info_map_[vlog_name]; |
|
|
|
if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) { |
|
|
|
auto vi = get_vlog_info(vlog_num); |
|
|
|
if ((double)(vi->curr_size*1.0)/VLOG_SIZE < VLOG_GC_THREHOLD) { |
|
|
|
return false; |
|
|
|
} |
|
|
|
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); |
|
|
@ -209,12 +216,16 @@ void VlogSet::register_inconfig_file(size_t vlog_num) { |
|
|
|
// config file: | vlog_nums_(size_t) | vlog_count_(size_t) | vlog_1_name | ... |
|
|
|
|
// first size_t in config file indicates current vlog_nums_(size_t)
|
|
|
|
// second size_t in config file indicates current vlog_count_(size_t)
|
|
|
|
config_file_latch_.lock(); |
|
|
|
config_file_->seekp(sizeof(size_t) + vlog_nums_* sizeof(size_t)); |
|
|
|
config_file_->write(reinterpret_cast<const char*>(&vlog_num), sizeof(size_t)); |
|
|
|
config_file_->flush(); |
|
|
|
config_file_latch_.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::remove_from_config_file(size_t vlog_num) { |
|
|
|
// FIXME: concurrency on config file
|
|
|
|
config_file_latch_.lock(); |
|
|
|
char tmp[vlog_nums_*vlog_num_size]; |
|
|
|
config_file_->seekp(sizeof(size_t)); |
|
|
|
config_file_->read(tmp, sizeof(tmp)); |
|
|
@ -229,6 +240,7 @@ void VlogSet::remove_from_config_file(size_t vlog_num) { |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
|
config_file_latch_.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::create_vlog(size_t vlog_num) { |
|
|
@ -264,14 +276,8 @@ inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_nam |
|
|
|
} |
|
|
|
|
|
|
|
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { |
|
|
|
auto vi = vlog_info_map_[vlog_name]; |
|
|
|
// auto vh = vlog_handler_map_[vlog_name];
|
|
|
|
vi->vlog_info_latch_.lock(); |
|
|
|
vi->vlog_valid_ = false; // TODO: change the slotpage vlog_num when get/put thread read this flag
|
|
|
|
// vh->handler_->close();
|
|
|
|
assert(!std::remove(vlog_name.c_str())); |
|
|
|
vlog_handler_map_.erase(vlog_name); |
|
|
|
vi->vlog_info_latch_.unlock(); |
|
|
|
} |
|
|
|
|
|
|
|
inline std::string VlogSet::get_config_file_name() { |
|
|
@ -290,42 +296,57 @@ struct vlog_handler * VlogSet::get_vlog_handler(size_t vlog_num) { |
|
|
|
return vlog_handler_map_[get_vlog_name(vlog_num)]; |
|
|
|
} |
|
|
|
|
|
|
|
// value: | value_size(uint16_t) | slot_num(size_t) | field_nums(uint16_t) | value |
|
|
|
|
void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::string *value) { |
|
|
|
auto vlog_name = get_vlog_name(vlog_num); |
|
|
|
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
|
|
|
|
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
|
|
|
|
void VlogSet::read_vlog_value(const struct slot_content &sc, std::string *value) { |
|
|
|
auto vlog_name = get_vlog_name(sc.vlog_num); |
|
|
|
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); |
|
|
|
handler.seekp(value_offset); |
|
|
|
handler.seekp(sc.value_offset); |
|
|
|
char value_buff[VALUE_BUFF_SIZE]; |
|
|
|
handler.read(value_buff, VALUE_BUFF_SIZE); |
|
|
|
|
|
|
|
// FIXME: remove value size
|
|
|
|
uint16_t value_size; |
|
|
|
memcpy(&value_size, value_buff, sizeof(uint16_t)); |
|
|
|
if (value_size & VALUE_DELE_MASK) { |
|
|
|
*value = ""; |
|
|
|
return ; |
|
|
|
} |
|
|
|
value_size &= VALUE_SIZE_MASK; |
|
|
|
// *value = std::string(value_buff);
|
|
|
|
assert(value_size <= VALUE_BUFF_SIZE); |
|
|
|
value->assign(&value_buff[sizeof(uint16_t)], value_size); |
|
|
|
|
|
|
|
const size_t off = sizeof(uint16_t)+sizeof(size_t); |
|
|
|
*value = std::string(&value_buff[off], value_size-off); |
|
|
|
handler.close(); |
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const leveldb::Slice &value) { |
|
|
|
auto vlog_name = get_vlog_name(vlog_num); |
|
|
|
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
|
|
|
|
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
|
|
|
|
void VlogSet::write_vlog_value(const struct slot_content &sc, size_t slot_num, const leveldb::Slice &value) { |
|
|
|
auto vlog_name = get_vlog_name(sc.vlog_num); |
|
|
|
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); |
|
|
|
handler.seekp(value_offset); |
|
|
|
handler.seekp(sc.value_offset); |
|
|
|
const char *value_buff = value.data(); |
|
|
|
handler.write(value_buff, value.size()); |
|
|
|
|
|
|
|
auto vinfo = get_vlog_info(vlog_num); |
|
|
|
const size_t off = sizeof(uint16_t) + sizeof(size_t); |
|
|
|
const size_t value_size = off + value.size(); |
|
|
|
char data[value_size]; |
|
|
|
memcpy(data, &value_size, sizeof(uint16_t)); |
|
|
|
memcpy(data+sizeof(uint16_t), &slot_num, sizeof(size_t)); |
|
|
|
memcpy(data+off, value_buff, value.size()); |
|
|
|
|
|
|
|
handler.write(data, value_size); |
|
|
|
|
|
|
|
handler.flush(); |
|
|
|
handler.close(); |
|
|
|
} |
|
|
|
|
|
|
|
void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { |
|
|
|
auto vinfo = get_vlog_info(vlog_num); |
|
|
|
auto vlog_name = get_vlog_name(vlog_num); |
|
|
|
// single value: || value_size(uint16_t) | slot_num(size_t) || {field_nums(uint16_t), attr1, attr2, ... } |
|
|
|
|
// single attr: | attr1_name_len(uint8_t) | attr1_name | attr1_len(uint16_t) | attr1 |
|
|
|
|
void VlogSet::mark_del_value(const struct slot_content &sc) { |
|
|
|
auto vinfo = get_vlog_info(sc.vlog_num); |
|
|
|
auto vlog_name = get_vlog_name(sc.vlog_num); |
|
|
|
auto handler = std::fstream(vlog_name, std::ios::in | std::ios::out); |
|
|
|
handler.seekp(value_offset); |
|
|
|
handler.seekp(sc.value_offset); |
|
|
|
char value_buff[VALUE_BUFF_SIZE]; |
|
|
|
handler.read(value_buff, VALUE_BUFF_SIZE); |
|
|
|
|
|
|
@ -337,22 +358,22 @@ void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { |
|
|
|
return ; |
|
|
|
} |
|
|
|
assert(!(value_size & VALUE_DELE_MASK)); |
|
|
|
uint16_t masked_value_size = value_size | VALUE_DELE_MASK; |
|
|
|
uint16_t masked_value_size = value_size | (uint16_t)VALUE_DELE_MASK; |
|
|
|
memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); |
|
|
|
handler.write(value_buff, value_size); |
|
|
|
handler.seekp(sc.value_offset); |
|
|
|
handler.write(value_buff, sizeof(uint16_t)); |
|
|
|
handler.flush(); |
|
|
|
handler.close(); |
|
|
|
|
|
|
|
// handle gc, mtx is locked outside, vlog_info_latch and vlog hard lock is locked outside too
|
|
|
|
// auto vinfo = get_vlog_info(vlog_num);
|
|
|
|
vinfo->discard ++; |
|
|
|
vinfo->value_nums --; |
|
|
|
vinfo->curr_size -= value_size & VALUE_SIZE_MASK; |
|
|
|
if (vlog_need_gc(vlog_num)) { |
|
|
|
vinfo->curr_size -= value_size; |
|
|
|
// FIXME: gc process
|
|
|
|
if (vlog_need_gc(sc.vlog_num) && !vinfo->processing_gc) { |
|
|
|
// create new vlog
|
|
|
|
vinfo->vlog_valid_ = false; |
|
|
|
vinfo->processing_gc = true; |
|
|
|
vinfo->vlog_num_for_gc = register_new_vlog(); |
|
|
|
// vinfo->vlog_valid_ = false;
|
|
|
|
vlog_gc->do_gc(vlog_num, vinfo->vlog_num_for_gc); |
|
|
|
vlog_gc->do_gc(sc.vlog_num, vinfo->vlog_num_for_gc); |
|
|
|
} |
|
|
|
} |