Browse Source

Merge remote-tracking branch 'origin/master'

master
马也驰 8 months ago
parent
commit
20ff00ebac
6 changed files with 218 additions and 60 deletions
  1. +19
    -14
      db/db_impl.cc
  2. +3
    -0
      db/db_impl.h
  3. +33
    -12
      db/vlog_set.cpp
  4. +2
    -1
      db/vlog_set.h
  5. +2
    -0
      test/db_test1.cc
  6. +159
    -33
      test/db_test3.cc

+ 19
- 14
db/db_impl.cc View File

@ -1129,11 +1129,12 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_->MaxNextLevelOverlappingBytes();
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status DBImpl::get_slot_num(const ReadOptions& options, const Slice& key,
size_t *slot_num) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
std::string value;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
@ -1156,12 +1157,12 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
if (mem->Get(lkey, &value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
} else if (imm != nullptr && imm->Get(lkey, &value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
s = current->Get(options, lkey, &value, &stats);
have_stat_update = true;
}
mutex_.Lock();
@ -1174,8 +1175,17 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
if (imm != nullptr) imm->Unref();
current->Unref();
*slot_num = *(size_t *)(&value)->c_str();
return s;
}
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
size_t slot_num;
auto s = get_slot_num(options, key, &slot_num);
// TODO: search the slotpage and get value from vlog
size_t slot_num = *(size_t *)value->c_str();
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
@ -1237,15 +1247,10 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
// slot_page_->get_slot(slot_num, &sc);
// vlog_set_->get_value(sc.vlog_num, sc.value_offset, &vlog_value);
// *value = vlog_value;
ReadOptions ro;
ro.verify_checksums = true;
ro.fill_cache = false;
ro.snapshot = nullptr;
std::string value;
Get(ro, key, &value);
size_t slot_num = *(size_t *)value.c_str();
size_t slot_num;
auto s = get_slot_num(ReadOptions(), key, &slot_num);
struct slot_content sc;
std::string vlog_value;
slot_page_->get_slot(slot_num, &sc);
vlog_set_->del_value(sc.vlog_num, sc.value_offset);

+ 3
- 0
db/db_impl.h View File

@ -107,6 +107,9 @@ class DBImpl : public DB {
int64_t bytes_written;
};
Status get_slot_num(const ReadOptions& options, const Slice& key,
size_t *slot_num);
Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot,
uint32_t* seed);

+ 33
- 12
db/vlog_set.cpp View File

@ -22,7 +22,12 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(
delete this->config_file_;
// 重新以读写模式打开
this->config_file_ = new std::fstream(cfname, std::ios::in | std::ios::out);
this->config_file_->seekp(0);
size_t tmp = 0;
this->config_file_->write(reinterpret_cast<const char*>(&tmp), sizeof(size_t));
this->config_file_->flush();
this->vlog_nums_ = 0;
register_new_vlog();
} else {
// config 文件存在
size_t _vlog_nums_;
@ -43,7 +48,7 @@ VlogSet::VlogSet(std::string dbname, VlogGC *vlog_gc) : dbname(dbname), vlog_gc(
curr_vlog.read(reinterpret_cast<char*>(curr_vlog_header), 2*sizeof(size_t));
curr_vlog.close();
restore_vlog_inmaps(new vlog_info(curr_vlog_num, tmp[1], tmp[0]));
restore_vlog_inmaps(new vlog_info(curr_vlog_num, curr_vlog_header[1], curr_vlog_header[0]));
}
}
}
@ -162,7 +167,7 @@ size_t VlogSet::register_new_vlog() {
size_t vn = vlog_nums_;
std::string vlog_name = get_vlog_name(vn);
register_inconfig_file(vn);
create_vlog(vlog_name);
create_vlog(vn);
// auto vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
// if (!vlog_new->is_open()) {
// std::cerr << "Failed to open or create the vlog file: " << vlog_new << std::endl;
@ -190,8 +195,12 @@ void VlogSet::remove_old_vlog(size_t old_vlog_num) {
}
bool VlogSet::vlog_need_gc(size_t vlog_num) {
// FIXME: vlog应该已经满了才行
std::string vlog_name = get_vlog_name(vlog_num);
auto vi = vlog_info_map_[vlog_name];
if ((double)vi->curr_size/VLOG_SIZE < VLOG_GC_THREHOLD) {
return false;
}
bool retval = vi->vlog_valid_ && (vi->discard/vi->value_nums >= GC_THREDHOLD);
return retval;
}
@ -223,13 +232,23 @@ void VlogSet::remove_from_config_file(size_t vlog_num) {
}
}
void VlogSet::create_vlog(std::string &vlog_name) {
void VlogSet::create_vlog(size_t vlog_num) {
auto vlog_name = get_vlog_name(vlog_num);
std::fstream *vlog_new = new std::fstream(vlog_name, std::ios::out);
char tmp[2*sizeof(size_t)];
memset(tmp, 0, sizeof(tmp));
vlog_new->write(tmp, sizeof(tmp));
vlog_new->close();
delete vlog_new;
// 重新以读写模式打开
vlog_new = new std::fstream(vlog_name, std::ios::in | std::ios::out);
if (!vlog_new->is_open()) {
std::cerr << "Failed to open or create the vlog file: " << vlog_name << std::endl;
std::exit(EXIT_FAILURE);
}
size_t tmp[2] = {2*sizeof(size_t), vlog_num};
vlog_new->seekp(0);
vlog_new->write(reinterpret_cast<const char*>(tmp), sizeof(tmp));
vlog_new->flush();
vlog_new->close();
delete vlog_new;
}
inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) {
@ -239,8 +258,10 @@ inline void VlogSet::restore_vlog_inmaps(struct vlog_info *vi) {
}
inline void VlogSet::register_vlog_inmaps(size_t vlog_num, std::string &vlog_name) {
vlog_info_map_[vlog_name] = new vlog_info(vlog_num);
vlog_handler_map_[vlog_name] = new vlog_handler();
auto vinfo = new vlog_info(vlog_num);
auto vhandler = new vlog_handler();
vlog_info_map_[vlog_name] = vinfo;
vlog_handler_map_[vlog_name] = vhandler;
}
inline void VlogSet::remove_vlog_from_maps(std::string &vlog_name) {
@ -277,10 +298,12 @@ void VlogSet::read_vlog_value(uint32_t vlog_num, uint32_t value_offset, std::str
char value_buff[VALUE_BUFF_SIZE];
handler.read(value_buff, VALUE_BUFF_SIZE);
// FIXME: remove value size
uint16_t value_size;
memcpy(&value_size, value_buff, sizeof(uint16_t));
value_size &= VALUE_SIZE_MASK;
value->assign(&value_buff[sizeof(uint16_t)], value_size);
*value = std::string(value_buff);
// value->assign(&value_buff[sizeof(uint16_t)], value_size);
handler.close();
}
@ -292,8 +315,6 @@ void VlogSet::write_vlog_value(uint32_t vlog_num, uint32_t value_offset, const l
handler.write(value_buff, value.size());
auto vinfo = get_vlog_info(vlog_num);
vinfo->value_nums ++;
vinfo->curr_size += value.size();
handler.flush();
handler.close();
@ -315,7 +336,7 @@ void VlogSet::mark_del_value(uint32_t vlog_num, uint32_t value_offset) {
return ;
}
assert(!(value_size & VALUE_DELE_MASK));
uint16_t masked_value_size = value_size & 0xffff;
uint16_t masked_value_size = value_size | VALUE_DELE_MASK;
memcpy(value_buff, &masked_value_size, sizeof(uint16_t));
handler.write(value_buff, value_size);
handler.flush();

+ 2
- 1
db/vlog_set.h View File

@ -21,6 +21,7 @@ friend class VlogGC;
#define CONFIG_FILE_DELE_MASK (0x1 << (sizeof(size_t)-1))
#define CONFIG_FILE_VLOG_NUM(v) ((v) & ~CONFIG_FILE_DELE_MASK)
#define VLOG_GC_THREHOLD 0.8
public:
VlogSet(std::string dbname, VlogGC *vlog_gc);
~VlogSet();
@ -37,7 +38,7 @@ friend class VlogGC;
void register_inconfig_file(size_t vlog_num);
void remove_from_config_file(size_t vlog_num);
void create_vlog(std::string &vlog_name);
void create_vlog(size_t vlog_num);
struct vlog_info *get_writable_vlog_info(size_t value_size);
inline void restore_vlog_inmaps(struct vlog_info *vi);
inline void register_vlog_inmaps(size_t vlog_num, std::string &vlog_name);

+ 2
- 0
test/db_test1.cc View File

@ -14,11 +14,13 @@ int main() {
string s;
db->Get(ReadOptions(), "001", &s);
cout<<s<<endl;
cout << s.size() << endl;
db->Put(WriteOptions(), "002", "world");
string s1;
db->Delete(WriteOptions(), "002");
db->Get(ReadOptions(), "002", &s1);
cout << s1.size() << endl;
cout<<s1<<endl;
delete db;

+ 159
- 33
test/db_test3.cc View File

@ -1,56 +1,124 @@
#include <chrono>
#include <iomanip>
#include <iostream>
#include <leveldb/db.h>
#include "leveldb/env.h"
#include <leveldb/options.h>
#include <vector>
#include <string>
#include <numeric>
#include <sstream>
#include <iostream>
#include <string>
#include <vector>
#include "leveldb/env.h"
#include "gtest/gtest.h"
using namespace leveldb;
using Field = std::pair<std::string, std::string>; // field_name:field_value
using FieldArray = std::vector<std::pair<std::string, std::string>>;
// 序列化为字符串
// 字段信息结构体
struct Field {
std::string name;
std::string value;
};
using FieldArray = std::vector<Field>;
// 序列化函数,将字段数组编码为字符串
std::string SerializeValue(const FieldArray& fields) {
std::ostringstream oss;
// 创建并初始化一个字符串流 oss,用于逐步构建最终的序列化字符串
std::ostringstream oss_temp;
std::string slot_num = "slot_num";
oss_temp << std::setw(sizeof(size_t)) << std::setfill('0') << slot_num;
// 写入属性个数(定长,16比特),使用std::setw(16)设置宽度,使用std::setfull(0)设置填充字符,将字段数组的大小写入oss中
oss_temp << std::setw(16) << std::setfill('0') << fields.size();
for (const auto& field : fields) {
oss << field.first << ":" << field.second << ";";
// 写入属性名长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.name.size();
// 写入属性名(变长)
oss_temp << field.name;
// 写入属性值长度(定长,16比特)
oss_temp << std::setw(16) << std::setfill('0') << field.value.size();
// 写入属性值(变长)
oss_temp << field.value;
}
std::string temp_str = oss_temp.str();
size_t value_length = temp_str.size();
std::ostringstream oss;
oss << std::setw(16) << std::setfill('0') << value_length;
oss << temp_str;
std::cout << "value 的长度为: " << value_length << std::endl;
std::cout << "总长度为: " << oss.str().size() << std::endl;
return oss.str();
}
// 反序列化为字段数组
// 反序列化函数,将字符串解码为字段数组
FieldArray ParseValue(const std::string& value_str) {
// 存放解析后的字段数组
FieldArray fields;
// 将输入字符串转换为输入流 iss, 方便读取
std::istringstream iss(value_str);
std::string field_str;
while (std::getline(iss, field_str, ';')) {
size_t delimiter_pos = field_str.find(':');
if (delimiter_pos != std::string::npos) {
std::string field_name = field_str.substr(0, delimiter_pos);
std::string field_value = field_str.substr(delimiter_pos + 1);
fields.emplace_back(field_name, field_value);
}
std::string content;
// 临时存放读取的数据
char buffer[100];
// 读取长度(定长,16比特)
iss.read(buffer, 16);
buffer[16] = '\0';
size_t total_length = std::stoi(buffer);
// std::cout << "读取到的总长度为: " << total_length << std::endl;
std::string value_content(value_str.begin() + 16, value_str.begin() + 16 + total_length);
// std::cout << value_content << std::endl;
std::istringstream iss_content(value_content);
iss_content.read(buffer, sizeof(size_t));
buffer[sizeof(size_t)] = '\0';
std::string slot_num = buffer;
// 读取属性个数
iss_content.read(buffer, 16);
// 在第17个比特位处添加终结符,确保字符串以终结符结尾
buffer[16] = '\0';
// 将 buffer 中的内容转化为整数并赋值给 field_count
int field_count = std::stoi(buffer);
// std::cout << "读取到的字段个数为: " << field_count << std::endl;
for (int i = 0; i < field_count; ++i) {
Field field;
// 读取属性名长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int name_length = std::stoi(buffer);
// std::cout << "读取到的属性名长度为: " << name_length << std::endl;
// 读取属性名(变长)
field.name.resize(name_length);
iss_content.read(&field.name[0], name_length);
// std::cout << "读取到的属性名为: " << field.name << std::endl;
// 读取属性值长度(定长,16比特)
iss_content.read(buffer, 16);
buffer[16] = '\0';
int value_length = std::stoi(buffer);
// std::cout << "读取到的属性值长度为: " << value_length << std::endl;
// 读取属性值(变长)
field.value.resize(value_length);
iss_content.read(&field.value[0], value_length);
// std::cout << "读取到的属性值为: " << field.value << std::endl;
fields.push_back(field);
}
return fields;
}
// 根据字段值查找所有包含该字段的 key
std::vector<std::string> FindKeysByField(leveldb::DB* db, Field &field) {
// 根据字段值查找所有包含该字段的 key,遍历
std::vector<std::string> FindKeysByField(leveldb::DB* db, const Field& field) {
std::vector<std::string> keys;
leveldb::Iterator* it = db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
for (it->SeekToFirst(); it->Valid() ; it->Next()) {
std::string key = it->key().ToString();
std::string value;
db->Get(leveldb::ReadOptions(), key, &value);
FieldArray fields = ParseValue(value);
for (const auto& f : fields) {
if (f.first == field.first && f.second == field.second) {
if (f.name == field.name && f.value == field.value) {
keys.push_back(key);
break; // 假设每个key中每个字段值唯一,如果允许重复,可以移除这行
break; // 假设每个key中每个字段值唯一
}
}
}
@ -59,38 +127,92 @@ std::vector FindKeysByField(leveldb::DB* db, Field &field) {
return keys;
}
Status OpenDB(std::string dbName, DB **db) {
Status OpenDB(std::string dbName, DB** db) {
Options options;
options.create_if_missing = true;
return DB::Open(options, dbName, db);
}
// 吞吐量测试函数
void TestThroughput(leveldb::DB* db, int num_operations) {
WriteOptions writeOptions;
auto start_time = std::chrono::steady_clock::now();
for (int i = 0; i < num_operations; ++i) {
std::string key = "key_" + std::to_string(i);
FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}};
std::string value = SerializeValue(fields);
db->Put(writeOptions, key, value);
}
auto end_time = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
std::cout << "Throughput: " << num_operations * 1000 / duration << " OPS" << std::endl;
}
// 延迟测试函数
void TestLatency(leveldb::DB* db, int num_operations, std::vector<int64_t>& lat_res) {
WriteOptions writeOptions;
int64_t latency = 0;
auto end_time = std::chrono::steady_clock::now();
auto last_time = end_time;
for (int i = 0; i < num_operations; ++i) {
// 执行写入操作
std::string key = "key_" + std::to_string(i);
FieldArray fields = {{"name", "Customer" + std::to_string(i)}, {"address", "Address" + std::to_string(i)}, {"phone", "1234567890"}};
std::string value = SerializeValue(fields);
db->Put(writeOptions, key, value);
db->Get(leveldb::ReadOptions(), key, &value);
end_time = std::chrono::steady_clock::now();
latency = std::chrono::duration_cast<std::chrono::milliseconds>(
end_time - last_time).count();
last_time = end_time;
lat_res.emplace_back(latency);
}
// 输出延迟统计信息(可选)
double avg_latency = std::accumulate(lat_res.begin(), lat_res.end(), 0.0) / lat_res.size();
std::cout << "Average Latency: " << std::fixed << std::setprecision(2) << avg_latency << " ms" << std::endl;
std::cout << "Max Latency: " << *std::max_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl;
std::cout << "Min Latency: " << *std::min_element(lat_res.begin(), lat_res.end()) << " ms" << std::endl;
}
TEST(TestSchema, Basic) {
DB *db;
DB* db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb", &db).ok() == false) {
if (!OpenDB("testdb", &db).ok()) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key1 = "k_1";
std::string key2 = "k_2";
std::string key3 = "k_3";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"name", "Customer1"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
FieldArray fields2 = {
{"name", "Customer#000000001"},
{"name", "Customer1"},
{"address", "ecnu"},
{"phone", "123456789"}
};
FieldArray fields3 = {
{"name", "Customer2"},
{"address", "ecnu"},
{"phone", "11111"}
};
// 序列化并插入
std::string value1 = SerializeValue(fields1);
std::string value2 = SerializeValue(fields2);
std::string value3 = SerializeValue(fields3);
db->Put(leveldb::WriteOptions(), key1, value1);
db->Put(leveldb::WriteOptions(), key2, value2);
db->Put(leveldb::WriteOptions(), key3, value3);
// 读取并反序列化
std::string value_ret;
@ -100,21 +222,25 @@ TEST(TestSchema, Basic) {
// 检查反序列化结果
ASSERT_EQ(fields_ret.size(), fields1.size());
for (size_t i = 0; i < fields_ret.size(); ++i) {
ASSERT_EQ(fields_ret[i].first, fields1[i].first);
ASSERT_EQ(fields_ret[i].second, fields1[i].second);
ASSERT_EQ(fields_ret[i].name, fields1[i].name);
ASSERT_EQ(fields_ret[i].value, fields1[i].value);
}
// 测试查找功能
Field query_field = {"name", "Customer#000000001"};
Field query_field = {"name", "Customer2"};
std::vector<std::string> found_keys = FindKeysByField(db, query_field);
std::cout << "找到的key有:" << found_keys.size() << "" << std::endl;
ASSERT_EQ(found_keys[0], key1);
/*// 吞吐量测试
TestThroughput(db, 10000);*/
/* // 延迟测试
std::vector<int64_t> latency_results;
TestLatency(db, 1000, latency_results);*/
// 关闭数据库
delete db;
}
int main(int argc, char **argv) {
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

Loading…
Cancel
Save