Browse Source

Merge branch 'pzy' into mainaa

main
jiyeoniya 9 months ago
parent
commit
21e23e7d9c
12 changed files with 282 additions and 9 deletions
  1. +6
    -1
      CMakeLists.txt
  2. +19
    -2
      db/db_impl.cc
  3. +4
    -0
      db/db_impl.h
  4. +28
    -0
      db/write_batch.cc
  5. +8
    -0
      include/leveldb/options.h
  6. +26
    -0
      table/blob_file.cc
  7. +25
    -0
      table/blob_file.h
  8. +46
    -3
      table/table_builder.cc
  9. +119
    -0
      test/kv_seperate_test.cc
  10. +0
    -1
      third_party/benchmark
  11. +0
    -1
      third_party/googletest
  12. +1
    -1
      util/coding.h

+ 6
- 1
CMakeLists.txt View File

@ -533,4 +533,9 @@ target_link_libraries(ttl_test PRIVATE leveldb gtest)
add_executable(field_test
"${PROJECT_SOURCE_DIR}/test/field_test.cc"
)
target_link_libraries(field_test PRIVATE leveldb gtest)
target_link_libraries(field_test PRIVATE leveldb gtest)
# add_executable(kv_seperate_test
# "${PROJECT_SOURCE_DIR}/test/kv_seperate_test.cc"
# )
# target_link_libraries(kv_seperate_test PRIVATE leveldb gtest)

+ 19
- 2
db/db_impl.cc View File

@ -149,6 +149,8 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {}
// Flag for whether KV separation is enabled (added 12.07 by Piao).
// NOTE(review): this is a file-scope static, i.e. a different variable from
// the DBImpl::key_value_separated_ static member declared in db/db_impl.h.
// No out-of-class definition of that member is visible in this change --
// confirm one exists, otherwise uses of the member will fail to link.
bool static key_value_separated_;
DBImpl::~DBImpl() {
// Wait for background work to finish.
@ -1196,10 +1198,21 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
// Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
// return DB::Put(o, key, val);
// }
// Stores `val` under `key` using the default write path.
//
// The KV-separated write path (Piao, 12.07) is not implemented yet, so the
// call is forwarded to DB::Put regardless of key_value_separated_. Forwarding
// unconditionally guarantees that every call returns a valid Status.
//
// TODO: branch on key_value_separated_ once the separated write path exists.
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
// Put overload that carries a TTL (new Put interface, Xin). It adds no logic
// of its own and hands all four arguments to the base-class DB::Put.
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val,
                   uint64_t ttl) {
  Status forwarded = DB::Put(o, key, val, ttl);
  return forwarded;
}
@ -1491,7 +1504,7 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { //朴
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
@ -1504,6 +1517,10 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, ui
return Write(opt, &batch);
} // 这里应该是新的PUT接口的真正实现的地方,还是由本来的DB类实现,怪?心
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
WriteBatch batch;
batch.Delete(key);

+ 4
- 0
db/db_impl.h View File

@ -50,6 +50,8 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
// KV-separation switch (added 12.07). Read by DBImpl::Put and by
// TableBuilder::Flush. Being a static member, it is one flag shared by every
// DBImpl instance rather than a per-database setting.
// NOTE(review): nothing visible in this change copies
// Options::key_value_separated into this flag -- confirm where it is set.
bool static key_value_separated_;
// Extra methods (for testing) that are not in the public DB interface
@ -79,6 +81,8 @@ class DBImpl : public DB {
struct CompactionState;
struct Writer;
// Information for a manual compaction
struct ManualCompaction {
int level;

+ 28
- 0
db/write_batch.cc View File

@ -19,10 +19,12 @@
#include "db/memtable.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "db/db_impl.h" //朴
#include "util/coding.h"
#include <sstream> // For std::ostringstream 心
#include <cstdint>
#include <string>
namespace leveldb {
@ -105,6 +107,32 @@ void WriteBatch::Put(const Slice& key, const Slice& value) {
PutLengthPrefixedSlice(&rep_, value);
}
// void WriteBatch::Put(const Slice& key, const Slice& value) { // 朴,kv分离,12.07
// if (DBImpl::key_value_separated_) {
// // 分离key和value的逻辑
// // 例如,你可以将key和value分别存储在不同的容器中
// // 这里需要根据你的具体需求来实现
// //...
// if (value.size() > max_value_size_) {
// // 分离key和value的逻辑
// // 将value存进新的数据结构blobfile
// //...
// // 例如,你可以使用以下代码将value写入blobfile
// std::ofstream blobfile("blobfile.dat", std::ios::binary | std::ios::app);
// blobfile.write(value.data(), value.size());
// blobfile.close();
// }
// }
// else {
// // 不分离key和value的逻辑
// WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
// rep_.push_back(static_cast<char>(kTypeValue));
// PutLengthPrefixedSlice(&rep_, key);
// PutLengthPrefixedSlice(&rep_, value);
// }
// }
void WriteBatch::Put(const Slice& key, const Slice& value, uint64_t ttl) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));

+ 8
- 0
include/leveldb/options.h View File

@ -145,6 +145,14 @@ struct LEVELDB_EXPORT Options {
// Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here.
const FilterPolicy* filter_policy = nullptr;
// KV separation switch. The KV-separation test sets this to true to turn the
// feature on.
// Default: false
bool key_value_separated = false;
Options() {
// Repeats the in-class default initializer above; the value is already false.
key_value_separated = false;
}
};
// Options that control read operations

+ 26
- 0
table/blob_file.cc View File

@ -0,0 +1,26 @@
#include "blob_file.h"
#include <fstream>
namespace leveldb {
// Remembers the path of the blob file in filename_. The file is not opened
// here.
BlobFile::BlobFile(const std::string& filename) : filename_(filename) {
// Placeholder: BlobFile initialization (e.g. opening the file) would go here.
}
// Currently does nothing: the class holds only the file name, not an open
// file handle.
BlobFile::~BlobFile() {
// Placeholder: closing the file would go here.
}
// Appends the raw bytes of `key` followed by the raw bytes of `value` to the
// blob file named by filename_. The file is opened in binary append mode and
// closed again on every call.
//
// Returns Status::IOError if the file cannot be opened, or if writing or
// flushing the data fails; Status::OK otherwise.
//
// NOTE(review): records are written back to back with no length prefix or
// separator, so individual entries cannot be recovered from the file. A framed
// record format is needed before a reader can be implemented.
Status BlobFile::Put(const Slice& key, const Slice& value) {
  std::ofstream file(filename_, std::ios::app | std::ios::binary);
  if (!file.is_open()) {
    return Status::IOError("Failed to open blob file");
  }

  file.write(key.data(), static_cast<std::streamsize>(key.size()));
  file.write(value.data(), static_cast<std::streamsize>(value.size()));

  // close() flushes buffered data. A failed write sets badbit and a failed
  // close sets failbit, so one check after close() covers both.
  file.close();
  if (file.fail()) {
    return Status::IOError("Failed to write blob file");
  }
  return Status::OK();
}
} // namespace leveldb

+ 25
- 0
table/blob_file.h View File

@ -0,0 +1,25 @@
#ifndef LEVELDB_BLOB_FILE_H_
#define LEVELDB_BLOB_FILE_H_
#include <string>
#include "leveldb/status.h"
#include "leveldb/slice.h"
namespace leveldb {
// Append-only file that receives key/value bytes stored outside the table
// data blocks (KV separation). TableBuilder::Flush writes entries whose value
// exceeds the blob-size threshold through Put().
class BlobFile {
 public:
  // `filename` is the path of the blob file. The constructor is explicit so
  // that a string is never silently converted into a BlobFile.
  explicit BlobFile(const std::string& filename);
  ~BlobFile();

  // Appends `key` and then `value` to the blob file. Returns an IOError
  // status when the file cannot be opened.
  Status Put(const Slice& key, const Slice& value);

 private:
  // Path of the blob file, as passed to the constructor.
  std::string filename_;
};
} // namespace leveldb
#endif // LEVELDB_BLOB_FILE_H_

+ 46
- 3
table/table_builder.cc View File

@ -15,9 +15,23 @@
#include "table/format.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "db/db_impl.h" //朴
#include "table/blob_file.h" //朴
#include "table/block.h" //朴
// Value-size threshold of 1 KB (Piao): TableBuilder::Flush copies values
// larger than this many bytes into the blob file.
const size_t min_blob_size = 1024;
namespace leveldb {
// Global BlobFile for the path "blob_data" (Piao); TableBuilder::Flush writes
// large values through it.
// NOTE(review): allocated with `new` and no matching delete is visible;
// BlobFileManager::GetInstance() also constructs a BlobFile for the same path
// -- confirm which of the two is meant to be used.
BlobFile* blobfile = new BlobFile("blob_data");
// Accessor for a single BlobFile bound to the path "blob_data".
class BlobFileManager {
 public:
  // Returns a pointer to the one BlobFile held as a function-local static;
  // the object is constructed the first time this function runs.
  static BlobFile* GetInstance() {
    static BlobFile shared_blob("blob_data");
    BlobFile* const handle = &shared_blob;
    return handle;
  }
};
struct TableBuilder::Rep {
Rep(const Options& opt, WritableFile* f)
: options(opt),
@ -126,12 +140,41 @@ void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
if (r->data_block.empty()) return; //朴,正常判断
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (DBImpl::key_value_separated_) {
// 这里获取数据块内容并初始化 Block 对象,朴
Slice block_content = r->data_block.Finish();
BlockContents contents;
contents.data = block_content;
contents.heap_allocated = false;
contents.cachable = false;
// 初始化 Block
Block data_block(contents);
std::unique_ptr<Iterator> iter(data_block.NewIterator(Options().comparator));
// 遍历数据块中的键值对
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
const Slice& key = iter->key();
const Slice& value = iter->value();
// 检查值是否大于阈值
if (value.size() > min_blob_size) {
// 将值存储到 blobfile 中
Status status = blobfile->Put(key, value);
if (!status.ok()) {
r->status = status;
}
}
}
}
WriteBlock(&r->data_block, &r->pending_handle); //将数据块写入文件,并获取数据块的句柄。
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
r->status = r->file->Flush(); //刷新
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);

+ 119
- 0
test/kv_seperate_test.cc View File

@ -0,0 +1,119 @@
#include <cstddef>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <memory>
#include <string>

#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "table/blob_file.h" // assumed location of the BlobFile header
using namespace leveldb;
// Size in bytes of every value written by the tests.
constexpr int value_size = 2048;
// Total number of value bytes written per test (128 MiB).
constexpr int data_size = 128 << 20;
// KV-separation threshold in bytes. Unsigned, so that comparisons against
// std::string / Slice sizes do not mix signed and unsigned operands.
constexpr std::size_t min_blob_size = 1024;
// Opens the database `dbName` into `*db`, creating it when it does not exist,
// with the key_value_separated option switched on. Returns the status reported
// by DB::Open.
Status OpenDB(std::string dbName, DB** db) {
  Options kv_options;
  kv_options.key_value_separated = true;  // enable KV separation
  kv_options.create_if_missing = true;
  Status open_result = DB::Open(kv_options, dbName, db);
  return open_result;
}
// Writes data_size / value_size entries through the standard Put interface to
// exercise KV separation. Keys are random integers in [1, key_num] rendered as
// decimal strings, so keys may repeat and some keys in the range may never be
// written. Every value is value_size bytes of 'a'.
//
// A failing Put stops the insertion and is reported as a test failure instead
// of being silently ignored.
void InsertData(DB* db) {
  WriteOptions writeOptions;
  const int key_num = data_size / value_size;
  // The value is identical for every entry, so build it once.
  const std::string value(value_size, 'a');
  srand(static_cast<unsigned int>(time(0)));

  for (int i = 0; i < key_num; i++) {
    const int key_ = rand() % key_num + 1;
    const std::string key = std::to_string(key_);
    const Status put_status = db->Put(writeOptions, key, value);
    ASSERT_TRUE(put_status.ok()) << put_status.ToString();
  }
}
// Checks the contents of the blob file at blob_file_path: every entry's value
// must be larger than min_blob_size, and the number of entries must equal
// expected_entries.
// NOTE(review): the BlobFile class in table/blob_file.h, as shown in this
// change, declares only a one-argument constructor and Put(). kReadMode,
// Open(), Close(), Iterator and NewIterator() used below are not declared
// there -- confirm this API exists before enabling the test.
void VerifyBlobFile(const std::string& blob_file_path, int expected_entries) {
BlobFile blobfile(blob_file_path, BlobFile::kReadMode);
Status status = blobfile.Open();
ASSERT_TRUE(status.ok());
int entry_count = 0;
BlobFile::Iterator it = blobfile.NewIterator();
for (it.SeekToFirst(); it.Valid(); it.Next()) {
++entry_count;
const Slice& key = it.key();
const Slice& value = it.value();
ASSERT_GT(value.size(), min_blob_size); // value must exceed the separation threshold
}
ASSERT_EQ(entry_count, expected_entries); // entry count must match the expectation
blobfile.Close();
}
// KV-separation read/write test: inserts data, checks the blob file, then
// performs 100 random point lookups and validates every value that is found.
TEST(TestKVSeparation, WriteAndRead) {
  DB* db = nullptr;
  if (OpenDB("testdb", &db).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // Owns the database so it is closed even when an ASSERT_* returns early.
  std::unique_ptr<DB> db_guard(db);

  // Insert data.
  InsertData(db);

  // Verify the blob file contents.
  VerifyBlobFile("blob_data", data_size / value_size);

  // Random point lookups.
  ReadOptions readOptions;
  srand(static_cast<unsigned int>(time(0)));
  const int key_num = data_size / value_size;
  for (int i = 0; i < 100; i++) {
    const int key_ = rand() % key_num + 1;
    const std::string key = std::to_string(key_);
    std::string value;
    const Status status = db->Get(readOptions, key, &value);
    // InsertData draws its keys at random, so a key in range is not
    // guaranteed to have been written; a miss is not an error.
    if (status.IsNotFound()) {
      continue;
    }
    ASSERT_TRUE(status.ok()) << status.ToString();
    if (value.size() > static_cast<std::size_t>(min_blob_size)) {
      // Large values must round-trip unchanged.
      ASSERT_EQ(value, std::string(value_size, 'a'));
    }
  }
}
// KV-separation compaction test: inserts data, checks the approximate size of
// the key range ["-", "A") before and after a full compaction, then re-checks
// the blob file.
TEST(TestKVSeparation, Compaction) {
  DB* db;
  const bool opened = OpenDB("testdb", &db).ok();
  if (!opened) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }

  // Insert data.
  InsertData(db);

  uint64_t approx_before[1];
  leveldb::Range span_before[1];
  span_before[0] = leveldb::Range("-", "A");
  db->GetApproximateSizes(span_before, 1, approx_before);
  ASSERT_GT(approx_before[0], 0);

  // Run the compaction over the whole key space.
  db->CompactRange(nullptr, nullptr);

  // Check the size of the main data area after compaction.
  uint64_t approx_after[1];
  leveldb::Range span_after[1];
  span_after[0] = leveldb::Range("-", "A");
  db->GetApproximateSizes(span_after, 1, approx_after);
  ASSERT_EQ(approx_after[0], 0);

  // The blob file contents must still be valid.
  VerifyBlobFile("blob_data", data_size / value_size);

  delete db;
}
// Test entry point: initializes GoogleTest from the command line and returns
// the result of running all registered tests.
int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

+ 0
- 1
third_party/benchmark

@ -1 +0,0 @@
Subproject commit f7547e29ccaed7b64ef4f7495ecfff1c9f6f3d03

+ 0
- 1
third_party/googletest

@ -1 +0,0 @@
Subproject commit 662fe38e44900c007eccb65a5d2ea19df7bd520e

+ 1
- 1
util/coding.h View File

@ -13,7 +13,7 @@
#include <cstdint>
#include <cstring>
#include <string>
#include<vector>
#include <vector>
#include "util/coding.h"
#include "leveldb/db.h"
#include <iostream>

Loading…
Cancel
Save