diff --git a/db/db_impl.cc b/db/db_impl.cc index 2353227..d6fda6a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1129,11 +1129,12 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { return versions_->MaxNextLevelOverlappingBytes(); } -Status DBImpl::Get(const ReadOptions& options, const Slice& key, - std::string* value) { +Status DBImpl::get_slot_num(const ReadOptions& options, const Slice& key, + size_t *slot_num) { Status s; MutexLock l(&mutex_); SequenceNumber snapshot; + std::string value; if (options.snapshot != nullptr) { snapshot = static_cast(options.snapshot)->sequence_number(); @@ -1156,12 +1157,12 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, mutex_.Unlock(); // First look in the memtable, then in the immutable memtable (if any). LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s)) { + if (mem->Get(lkey, &value, &s)) { // Done - } else if (imm != nullptr && imm->Get(lkey, value, &s)) { + } else if (imm != nullptr && imm->Get(lkey, &value, &s)) { // Done } else { - s = current->Get(options, lkey, value, &stats); + s = current->Get(options, lkey, &value, &stats); have_stat_update = true; } mutex_.Lock(); @@ -1174,8 +1175,17 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (imm != nullptr) imm->Unref(); current->Unref(); + *slot_num = *(size_t *)(&value)->c_str(); + + return s; +} + +Status DBImpl::Get(const ReadOptions& options, const Slice& key, + std::string* value) { + size_t slot_num; + auto s = get_slot_num(options, key, &slot_num); + // TODO: search the slotpage and get value from vlog - size_t slot_num = *(size_t *)value->c_str(); struct slot_content sc; std::string vlog_value; slot_page_->get_slot(slot_num, &sc); @@ -1237,15 +1247,10 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { // slot_page_->get_slot(slot_num, &sc); // vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value); // *value = vlog_value; - ReadOptions ro; - ro.verify_checksums = true; - ro.fill_cache = false; - ro.snapshot = nullptr; - std::string value; - Get(ro, key, &value); - size_t slot_num = *(size_t *)value.c_str(); + size_t slot_num; + auto s = get_slot_num(ReadOptions(), key, &slot_num); + struct slot_content sc; - std::string vlog_value; slot_page_->get_slot(slot_num, &sc); vlog_set_->del_value(sc.vlog_num, sc.value_offset); diff --git a/db/db_impl.h b/db/db_impl.h index 9b74668..2383872 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -107,6 +107,9 @@ class DBImpl : public DB { int64_t bytes_written; }; + Status get_slot_num(const ReadOptions& options, const Slice& key, + size_t *slot_num); + Iterator* NewInternalIterator(const ReadOptions&, SequenceNumber* latest_snapshot, uint32_t* seed); diff --git a/db/vlog_set.cpp b/db/vlog_set.cpp index 90ba2a9..506dc79 100644 --- a/db/vlog_set.cpp +++ b/db/vlog_set.cpp @@ -22,7 +22,12 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc( delete this->config_file_; // 重新以读写模式打开 this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out); + this->config_file_->seekp(0); + size_t tmp = 0; + this->config_file_->write(reinterpret_cast(&tmp), sizeof(size_t)); + this->config_file_->flush(); this->vlog_nums_ = 0; + register_new_vlog(); } else { // config 文件存在 size_t _vlog_nums_; @@ -43,7 +48,7 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc( curr_vlog.read(reinterpret_cast(curr_vlog_header), 2*sizeof(size_t)); curr_vlog.close(); - restore_vlog_inmaps(new vlog_info(curr_vlog_num, tmp[1], tmp[0])); + restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0])); } } } @@ -162,7 +167,7 @@ size_t VlogSet::register_new_vlog() { size_t vn = vlog_nums_; std::string vlog_name = get_vlog_name(vn); register_inconfig_file(vn); - create_vlog(vlog_name); + create_vlog(vn); // auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); // if (!vlog_new->is_open()) { // std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl; @@ -190,8 +195,12 @@ void VlogSet::remove_old_vlog(size_t old_vlog_num) { } bool VlogSet::vlog_need_gc(size_t vlog_num) { + // FIXME: vlog应该已经满了才行 std::string vlog_name = get_vlog_name(vlog_num); auto vi = vlog_info_map_[vlog_name]; + if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) { + return false; + } bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD); return retval; } @@ -223,13 +232,23 @@ void VlogSet::remove_from_config_file(size_t vlog_num) { } } -void VlogSet::create_vlog(std::string &vlog_name) { +void VlogSet::create_vlog(size_t vlog_num) { + auto vlog_name = get_vlog_name(vlog_num); std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out); - char tmp[2*sizeof(size_t)]; - memset(tmp, 0, sizeof(tmp)); - vlog_new->write(tmp, sizeof(tmp)); + vlog_new->close(); + delete vlog_new; + // 重新以读写模式打开 + vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out); + if (!vlog_new->is_open()) { + std::cerr << "Failed to open or create the vlog file: " << vlog_name << std::endl; + std::exit(EXIT_FAILURE); + } + size_t tmp[2] = {2*sizeof(size_t), vlog_num}; + vlog_new->seekp(0); + vlog_new->write(reinterpret_cast(tmp), sizeof(tmp)); vlog_new->flush(); vlog_new->close(); + delete vlog_new; } inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) { @@ -239,8 +258,10 @@ inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) { } inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) { - vlog_info_map_[vlog_name] = new vlog_info(vlog_num); - vlog_handler_map_[vlog_name] = new vlog_handler(); + auto vinfo = new vlog_info(vlog_num); + auto vhandler = new vlog_handler(); + vlog_info_map_[vlog_name] = vinfo; + vlog_handler_map_[vlog_name] = vhandler; } inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) { @@ -277,10 +298,12 @@ void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::str char value_buff[VALUE_BUFF_SIZE]; handler.read(value_buff, VALUE_BUFF_SIZE); + // FIXME: remove value size uint16_t value_size; memcpy(&value_size, value_buff, sizeof(uint16_t)); value_size &= VALUE_SIZE_MASK; - value->assign(&value_buff[sizeof(uint16_t)], value_size); + *value = std::string(value_buff); +// value->assign(&value_buff[sizeof(uint16_t)], value_size); handler.close(); } @@ -292,8 +315,6 @@ void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const l handler.write(value_buff, value.size()); auto vinfo = get_vlog_info(vlog_num); - vinfo->value_nums ++; - vinfo->curr_size += value.size(); handler.flush(); handler.close(); @@ -315,7 +336,7 @@ void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) { return ; } assert(!(value_size & VALUE_DELE_MASK)); - uint16_t masked_value_size = value_size & 0xffff; + uint16_t masked_value_size = value_size | VALUE_DELE_MASK; memcpy(value_buff, &masked_value_size, sizeof(uint16_t)); handler.write(value_buff, value_size); handler.flush(); diff --git a/db/vlog_set.h b/db/vlog_set.h index 918c37c..a4f031c 100644 --- a/db/vlog_set.h +++ b/db/vlog_set.h @@ -21,6 +21,7 @@ friend class VlogGC; #define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1)) #define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK) +#define VLOG_GC_THREHOLD 0.8 public: VlogSet(std::string dbname, VlogGC *vlog_gc); ~VlogSet(); @@ -37,7 +38,7 @@ friend class VlogGC; void register_inconfig_file(size_t vlog_num); void remove_from_config_file(size_t vlog_num); - void create_vlog(std::string &vlog_name); + void create_vlog(size_t vlog_num); struct vlog_info *get_writable_vlog_info(size_t value_size); inline void restore_vlog_inmaps(struct vlog_info *vi); inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name); diff --git a/test/db_test1.cc b/test/db_test1.cc index 424406c..ade9372 100644 --- a/test/db_test1.cc +++ b/test/db_test1.cc @@ -14,11 +14,13 @@ int main() { string s; db->Get(ReadOptions(), "001", &s); cout<Put(WriteOptions(), "002", "world"); string s1; db->Delete(WriteOptions(), "002"); db->Get(ReadOptions(), "002", &s1); + cout << s1.size() << endl; cout< +#include +#include #include -#include "leveldb/env.h" #include -#include -#include +#include #include -#include +#include +#include + +#include "leveldb/env.h" + #include "gtest/gtest.h" using namespace leveldb; -using Field = std::pair; // field_name:field_value -using FieldArray = std::vector>; -// 序列化为字符串 +// 字段信息结构体 +struct Field { + std::string name; + std::string value; +}; +using FieldArray = std::vector; + +// 序列化函数,将字段数组编码为字符串 std::string SerializeValue(const FieldArray& fields) { - std::ostringstream oss; + // 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串 + std::ostringstream oss_temp; + std::string slot_num = "slot_num"; + oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num; + // 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中 + oss_temp << std::setw(16) << std::setfill('0') << fields.size(); for (const auto& field : fields) { - oss << field.first << ":" << field.second << ";"; + // 写入属性名长度(定长,16比特) + oss_temp << std::setw(16) << std::setfill('0') << field.name.size(); + // 写入属性名(变长) + oss_temp << field.name; + // 写入属性值长度(定长,16比特) + oss_temp << std::setw(16) << std::setfill('0') << field.value.size(); + // 写入属性值(变长) + oss_temp << field.value; } + std::string temp_str = oss_temp.str(); + size_t value_length = temp_str.size(); + + std::ostringstream oss; + oss << std::setw(16) << std::setfill('0') << value_length; + oss << temp_str; + + std::cout << "value 的长度为: " << value_length << std::endl; + std::cout << "总长度为: " << oss.str().size() << std::endl; return oss.str(); } -// 反序列化为字段数组 +// 反序列化函数,将字符串解码为字段数组 FieldArray ParseValue(const std::string& value_str) { + // 存放解析后的字段数组 FieldArray fields; + // 将输入字符串转换为输入流 iss, 方便读取 std::istringstream iss(value_str); - std::string field_str; - while (std::getline(iss, field_str, ';')) { - size_t delimiter_pos = field_str.find(':'); - if (delimiter_pos != std::string::npos) { - std::string field_name = field_str.substr(0, delimiter_pos); - std::string field_value = field_str.substr(delimiter_pos + 1); - fields.emplace_back(field_name, field_value); - } + std::string content; + // 临时存放读取的数据 + char buffer[100]; + // 读取长度(定长,16比特) + iss.read(buffer, 16); + buffer[16] = '\0'; + size_t total_length = std::stoi(buffer); + // std::cout << "读取到的总长度为: " << total_length << std::endl; + std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length); + // std::cout << value_content << std::endl; + std::istringstream iss_content(value_content); + iss_content.read(buffer, sizeof(size_t)); + buffer[sizeof(size_t)] = '\0'; + std::string slot_num = buffer; + // 读取属性个数 + iss_content.read(buffer, 16); + // 在第17个比特位处添加终结符,确保字符串以终结符结尾 + buffer[16] = '\0'; + // 将 buffer 中的内容转化为整数并赋值给 field_count + int field_count = std::stoi(buffer); + // std::cout << "读取到的字段个数为: " << field_count << std::endl; + + for (int i = 0; i < field_count; ++i) { + Field field; + // 读取属性名长度(定长,16比特) + iss_content.read(buffer, 16); + buffer[16] = '\0'; + int name_length = std::stoi(buffer); + // std::cout << "读取到的属性名长度为: " << name_length << std::endl; + // 读取属性名(变长) + field.name.resize(name_length); + iss_content.read(&field.name[0], name_length); + // std::cout << "读取到的属性名为: " << field.name << std::endl; + // 读取属性值长度(定长,16比特) + iss_content.read(buffer, 16); + buffer[16] = '\0'; + int value_length = std::stoi(buffer); + // std::cout << "读取到的属性值长度为: " << value_length << std::endl; + // 读取属性值(变长) + field.value.resize(value_length); + iss_content.read(&field.value[0], value_length); + // std::cout << "读取到的属性值为: " << field.value << std::endl; + fields.push_back(field); } return fields; } -// 根据字段值查找所有包含该字段的 key -std::vector FindKeysByField(leveldb::DB* db, Field &field) { +// 根据字段值查找所有包含该字段的 key,遍历 +std::vector FindKeysByField(leveldb::DB* db, const Field& field) { std::vector keys; leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { + for (it->SeekToFirst(); it->Valid() ; it->Next()) { std::string key = it->key().ToString(); std::string value; db->Get(leveldb::ReadOptions(), key, &value); FieldArray fields = ParseValue(value); for (const auto& f : fields) { - if (f.first == field.first && f.second == field.second) { + if (f.name == field.name && f.value == field.value) { keys.push_back(key); - break; // 假设每个key中每个字段值唯一,如果允许重复,可以移除这行 + break; // 假设每个key中每个字段值唯一 } } } @@ -59,38 +127,92 @@ std::vector FindKeysByField(leveldb::DB* db, Field &field) { return keys; } -Status OpenDB(std::string dbName, DB **db) { +Status OpenDB(std::string dbName, DB** db) { Options options; options.create_if_missing = true; return DB::Open(options, dbName, db); } +// 吞吐量测试函数 +void TestThroughput(leveldb::DB* db, int num_operations) { + WriteOptions writeOptions; + auto start_time = std::chrono::steady_clock::now(); + + for (int i = 0; i < num_operations; ++i) { + std::string key = "key_" + std::to_string(i); + FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; + std::string value = SerializeValue(fields); + db->Put(writeOptions, key, value); + } + + auto end_time = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + std::cout << "Throughput: " << num_operations * 1000 / duration << " OPS" << std::endl; +} + +// 延迟测试函数 +void TestLatency(leveldb::DB* db, int num_operations, std::vector& lat_res) { + WriteOptions writeOptions; + int64_t latency = 0; + auto end_time = std::chrono::steady_clock::now(); + auto last_time = end_time; + for (int i = 0; i < num_operations; ++i) { + // 执行写入操作 + std::string key = "key_" + std::to_string(i); + FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}}; + std::string value = SerializeValue(fields); + db->Put(writeOptions, key, value); + db->Get(leveldb::ReadOptions(), key, &value); + + end_time = std::chrono::steady_clock::now(); + latency = std::chrono::duration_cast( + end_time - last_time).count(); + last_time = end_time; + + lat_res.emplace_back(latency); + } + + // 输出延迟统计信息(可选) + double avg_latency = std::accumulate(lat_res.begin(), lat_res.end(), 0.0) / lat_res.size(); + std::cout << "Average Latency: " << std::fixed << std::setprecision(2) << avg_latency << " ms" << std::endl; + std::cout << "Max Latency: " << *std::max_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; + std::cout << "Min Latency: " << *std::min_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl; +} + TEST(TestSchema, Basic) { - DB *db; + DB* db; WriteOptions writeOptions; ReadOptions readOptions; - if(OpenDB("testdb", &db).ok() == false) { + if (!OpenDB("testdb", &db).ok()) { std::cerr << "open db failed" << std::endl; abort(); } std::string key1 = "k_1"; std::string key2 = "k_2"; + std::string key3 = "k_3"; FieldArray fields1 = { - {"name", "Customer#000000001"}, + {"name", "Customer1"}, {"address", "IVhzIApeRb"}, {"phone", "25-989-741-2988"} }; FieldArray fields2 = { - {"name", "Customer#000000001"}, + {"name", "Customer1"}, {"address", "ecnu"}, {"phone", "123456789"} }; + FieldArray fields3 = { + {"name", "Customer2"}, + {"address", "ecnu"}, + {"phone", "11111"} + }; // 序列化并插入 std::string value1 = SerializeValue(fields1); std::string value2 = SerializeValue(fields2); + std::string value3 = SerializeValue(fields3); db->Put(leveldb::WriteOptions(), key1, value1); db->Put(leveldb::WriteOptions(), key2, value2); + db->Put(leveldb::WriteOptions(), key3, value3); // 读取并反序列化 std::string value_ret; @@ -100,21 +222,25 @@ TEST(TestSchema, Basic) { // 检查反序列化结果 ASSERT_EQ(fields_ret.size(), fields1.size()); for (size_t i = 0; i < fields_ret.size(); ++i) { - ASSERT_EQ(fields_ret[i].first, fields1[i].first); - ASSERT_EQ(fields_ret[i].second, fields1[i].second); + ASSERT_EQ(fields_ret[i].name, fields1[i].name); + ASSERT_EQ(fields_ret[i].value, fields1[i].value); } // 测试查找功能 - Field query_field = {"name", "Customer#000000001"}; + Field query_field = {"name", "Customer2"}; std::vector found_keys = FindKeysByField(db, query_field); std::cout << "找到的key有:" << found_keys.size() << "个" << std::endl; - ASSERT_EQ(found_keys[0], key1); + /*// 吞吐量测试 + TestThroughput(db, 10000);*/ + /* // 延迟测试 + std::vector latency_results; + TestLatency(db, 1000, latency_results);*/ // 关闭数据库 delete db; } -int main(int argc, char **argv) { +int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } \ No newline at end of file