Browse Source

update fixed file size

xxy
xxy 9 months ago
parent
commit
ced3c88035
10 changed files with 308 additions and 174 deletions
  1. +1
    -1
      benchmarks/db_bench.cc
  2. +59
    -53
      db/db_impl.cc
  3. +4
    -2
      db/db_impl.h
  4. +3
    -0
      db/dbformat.h
  5. +1
    -1
      include/leveldb/db.h
  6. +1
    -1
      include/leveldb/options.h
  7. +173
    -114
      test/benchmark_4leveldb.cpp
  8. +2
    -2
      test/test.cpp
  9. +55
    -0
      util/coding.cc
  10. +9
    -0
      util/coding.h

+ 1
- 1
benchmarks/db_bench.cc View File

@ -74,7 +74,7 @@ static int FLAGS_reads = -1;
static int FLAGS_threads = 1;
// Size of each value
static int FLAGS_value_size = 100;
static int FLAGS_value_size = 1000;
// Arrange to generate values that shrink to this fraction of
// their original size after compression

+ 59
- 53
db/db_impl.cc View File

@ -143,6 +143,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
shutting_down_(false),
background_work_finished_signal_(&mutex_),
background_gc_finished_signal_(&gc_mutex_),
spj_mutex_cond_(&spj_mutex_),
mem_(nullptr),
imm_(nullptr),
has_imm_(false),
@ -1279,6 +1280,14 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
// Convenience methods
// Stores `val` under `key`.
//
// Unless the caller sets o.valuelog_write, the call first blocks on
// spj_mutex_cond_ for as long as `key` compares equal to
// valuelog_finding_key, and only afterwards forwards to DB::Put.
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  const bool must_wait_for_key = !o.valuelog_write;
  if (must_wait_for_key) {
    spj_mutex_.Lock();
    for (;;) {
      // Re-check after every wake-up; leave once the key no longer matches.
      if (!(key == valuelog_finding_key)) break;
      spj_mutex_cond_.Wait();
    }
    spj_mutex_.Unlock();
  }
  return DB::Put(o, key, val);
}
@ -1344,17 +1353,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
//if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.SignalAll();
//}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
writers_.front()->cv.SignalAll();
}
return status;
@ -1577,6 +1586,18 @@ std::vector> DBImpl::WriteValueLog(
}
uint64_t offset = valueFile.tellp();
if(offset>=config::value_log_size){
addNewValueLog();
valueFile.close();
file_name_ = ValueLogFileName(dbname_, valuelogfile_number_);
valueFile =std::ofstream(file_name_, std::ios::app | std::ios::binary);
if (!valueFile.is_open()) {
assert(0);
}
offset = valueFile.tellp();
}
std::vector<std::pair<uint64_t, uint64_t>> res;
for (const auto& [key_slice, value_slice] : kv) {
@ -1714,47 +1735,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
return s;
}
// Returns true when `filename` ends in ".valuelog" and has at least one
// character in front of that suffix.
bool IsValueLogFile(const std::string& filename) {
  static const char kSuffix[] = ".valuelog";
  const std::string::size_type suffix_len = sizeof(kSuffix) - 1;
  // A name that is no longer than the suffix itself is rejected.
  if (filename.size() <= suffix_len) {
    return false;
  }
  return filename.compare(filename.size() - suffix_len, suffix_len, kSuffix) ==
         0;
}
// 示例:解析 sstable 中的元信息
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) {
// 假设 stored_value 格式为:valuelog_id|offset
Slice tmp(stored_value.data(), stored_value.size());
GetVarint64(&tmp, &valuelog_id);
GetVarint64(&tmp, &offset);
}
// Example: extracts the numeric value-log ID from a value-log file name or
// path.
//
// Only the final path component is examined; the ID is the run of decimal
// digits in front of its first '.', e.g. "dir/12.valuelog" yields 12. A name
// without a '.', with an empty prefix, or with a non-digit character in the
// prefix trips an assert.
uint64_t GetValueLogID(const std::string& valuelog_name) {
  // Strip any directory components so only the bare file name is parsed.
  const std::string filename =
      std::filesystem::path(valuelog_name).filename().string();

  // The ID is everything in front of the first '.'.
  const auto dot_pos = filename.find('.');
  if (dot_pos == std::string::npos) {
    assert(false);
  }

  const std::string id_str = filename.substr(0, dot_pos);
  // An empty prefix (e.g. ".valuelog") would pass the digit check below and
  // then make std::stoull throw, so reject it explicitly.
  if (id_str.empty()) {
    assert(false);
  }

  for (const char c : id_str) {
    // Convert to unsigned char first: passing a negative char value to
    // isdigit is undefined behavior.
    if (!isdigit(static_cast<unsigned char>(c))) {
      assert(false);
    }
  }
  return std::stoull(id_str);
}
// 垃圾回收实现
void DBImpl::GarbageCollect() {
gc_mutex_.AssertHeld();
@ -1762,14 +1742,14 @@ void DBImpl::GarbageCollect() {
Log(options_.info_log, "start gc ");
auto files_set = fs::directory_iterator(dbname_);
std::set<std::string> valuelog_set;
std::string cur_valuelog_name =
ValueLogFileName(dbname_, valuelogfile_number_);
// std::string cur_valuelog_name =
// ValueLogFileName(dbname_, valuelogfile_number_);
for (const auto& cur_log_file : files_set) {
if (fs::exists(cur_log_file) &&
fs::is_regular_file(fs::status(cur_log_file)) &&
IsValueLogFile(cur_log_file.path().filename().string())) {
if (cur_valuelog_name == cur_log_file.path().filename().string())
continue;
// if (cur_valuelog_name == cur_log_file.path().filename().string())
// continue;
valuelog_set.emplace(cur_log_file.path().filename().string());
}
}
@ -1777,6 +1757,9 @@ void DBImpl::GarbageCollect() {
// std::cout << valuelog_name << std::endl;
uint64_t cur_log_number = GetValueLogID(valuelog_name);
valuelog_name = ValueLogFileName(dbname_, cur_log_number);
if (cur_log_number == valuelogfile_number_) {
continue;
}
uint64_t current_offset = 0;
uint64_t tmp_offset = 0;
@ -1880,7 +1863,22 @@ void DBImpl::GarbageCollect() {
// 检查 key 是否在 sstable 中存在
std::string stored_value;
auto option = leveldb::ReadOptions();
//lock those thread who attempt to push "key"
spj_mutex_.Lock();
valuelog_finding_key=key;
spj_mutex_.Unlock();
//wait for current writer queue to do all their thing
mutex_.Lock();
if(writers_.size()>0){
auto last_writer=writers_.back();
while(!last_writer->done){
last_writer->cv.Wait();
}
}
mutex_.Unlock();
auto option=leveldb::ReadOptions();
option.find_value_log_for_gc = true;
Status status = Get(option, key, &stored_value);
@ -1905,8 +1903,16 @@ void DBImpl::GarbageCollect() {
// 记录无效,跳过
continue;
}
auto write_op=leveldb::WriteOptions();
write_op.valuelog_write=true;
status = Put(write_op, key, value);
spj_mutex_.Lock();
valuelog_finding_key="";
spj_mutex_.Unlock();
spj_mutex_cond_.SignalAll();
status = Put(leveldb::WriteOptions(), key, value);
if (!status.ok()) {
std::cerr << "Error accessing sstable: " << status.ToString()
<< std::endl;

+ 4
- 2
db/db_impl.h View File

@ -211,14 +211,16 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::Mutex gc_mutex_;
// port::Mutex spj_mutex_;
port::Mutex spj_mutex_;
port::CondVar spj_mutex_cond_ GUARDED_BY(spj_mutex_);
// std::shared_mutex value_log_mutex;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_);
// Slice valuelog_finding_key GUARDED_BY(mutex_ );
Slice valuelog_finding_key="" GUARDED_BY(spj_mutex_ );
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted

+ 3
- 0
db/dbformat.h View File

@ -44,6 +44,9 @@ static const int kMaxMemCompactLevel = 2;
// Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576;
// Maximum size of a value_log file, in bytes (4 << 14 == 65536).
// DBImpl::WriteValueLog compares the current write offset of the open value
// log against this value and switches to a new value log file once the offset
// has reached it.
static const int value_log_size=4<<14;
} // namespace config
class InternalKey;

+ 1
- 1
include/leveldb/db.h View File

@ -183,7 +183,7 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
virtual void TEST_GarbageCollect() = 0;
virtual void TEST_GarbageCollect(){};
};

+ 1
- 1
include/leveldb/options.h View File

@ -185,7 +185,7 @@ struct LEVELDB_EXPORT WriteOptions {
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
bool sync = false;
// bool valuelog_write=false;
bool valuelog_write=false;
};
} // namespace leveldb

+ 173
- 114
test/benchmark_4leveldb.cpp View File

@ -1,141 +1,200 @@
#include <cassert>
#include <string>
#include <thread>
#include <vector>
#include <iostream>
#include <iomanip>
#include <cmath>
#include <chrono>
#include <random>
#include <string>
#include <mutex>
#include "leveldb/db.h"
// 配置
const int TEST_EXPONENT = 5;
const int TEST_FREQUENCY = static_cast<int>(std::pow(10, TEST_EXPONENT));
const int MIN_STR_LEN = 255;
const int MAX_STR_LEN = 1024;
const std::string DB_PATH = "db_benchmark";
const std::string CHARSET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
// 多语言
const std::string BASE_VALUE = "こんにちは世界!Hello World! Привет, мир! ¡Hola Mundo! 你好,世界!Bonjour le monde! Hallo Welt!";
// 莎士比亚
// const std::string BASE_VALUE = "To be, or not to be, that is the question: Whether 'tis nobler in the mind to suffer the slings and arrows of outrageous fortune, or to take arms against a sea of troubles and by opposing end them.";
// 超长字符
// const std::string BASE_VALUE = []() {
// std::string base = "壹贰叁肆伍陆柒捌玖拾";
// std::string long_text;
// for (int i = 0; i < 100; ++i) { // 重复 100 次
// long_text += base;
// }
// return long_text;
// }();
// 随机字符串生成
std::string randomStr() {
int len = rand() % (MAX_STR_LEN - MIN_STR_LEN + 1) + MIN_STR_LEN;
std::string str(len, '\0');
for (int i = 0; i < len; ++i) {
str[i] = CHARSET[rand() % CHARSET.size()];
#include "leveldb/write_batch.h"
#include "leveldb/iterator.h"
#include <sys/stat.h> // For stat to get file size on Unix-like systems
#include <dirent.h> // For directory reading on Unix-like systems
#define THREAD_COUNT 16 // Total number of worker threads
#define PUT_THREAD_COUNT (THREAD_COUNT / 3) // Number of Put threads
#define DELETE_THREAD_COUNT (THREAD_COUNT / 3) // Number of Delete threads
#define ITERATE_THREAD_COUNT (THREAD_COUNT - PUT_THREAD_COUNT - DELETE_THREAD_COUNT) // Number of Iterate threads (whatever remains)
#define VALUE_SIZE 1000 // Default size of each value
#define DATABASE_PATH "db_benchmark" // Database path
// Each mutex is held while the worker of the same kind updates the matching
// *_time_count accumulator below.
std::mutex put_mutex;
std::mutex delete_mutex;
std::mutex iterate_mutex;
// Accumulators shared by the worker threads: .first sums elapsed
// milliseconds, .second counts operations (for iterate: completed passes).
std::pair<int,int> put_time_count={0,0};
std::pair<int,int> delete_time_count={0,0};
std::pair<int,int> iterate_time_count={0,0};
// Returns a string of `length` characters, each drawn uniformly at random
// from [a-zA-Z0-9].
std::string GenerateRandomString(size_t length) {
  static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
  // One engine per thread, seeded once. Building and seeding a fresh engine
  // from std::random_device on every call is costly and would be charged to
  // the timed Put loop that calls this helper.
  thread_local std::default_random_engine rng(std::random_device{}());
  // sizeof counts the terminating NUL, so the last usable index is sizeof - 2.
  std::uniform_int_distribution<int> dist(0, static_cast<int>(sizeof(charset)) - 2);

  std::string result;
  result.reserve(length);
  for (size_t i = 0; i < length; ++i) {
    result += charset[dist(rng)];
  }
  return result;
}
// 计算并输出耗时的模板函数
template<typename Func>
void measureTime(const std::string& operation, Func func) {
auto start = std::chrono::system_clock::now();
func();
auto end = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
double seconds = double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den;
// 输出格式化信息
std::cout << "Operation: " << operation << "\n";
std::cout << "Number of operations: " << TEST_FREQUENCY << "\n";
std::cout << "Total time: "
<< std::fixed << std::setprecision(6) << seconds << " seconds\n";
std::cout << "Average time per operation: "
<< std::fixed << std::setprecision(6)
<< (seconds / TEST_FREQUENCY) * 1e6 << " microseconds\n";
std::cout << "========================================\n";
// Benchmark worker: writes up to `num_entries` random values of `value_size`
// characters under keys "key_<thread_id>_<i>", each through its own
// single-entry WriteBatch. Stops at the first failed write. Afterwards adds
// the elapsed milliseconds and the number of writes that succeeded to
// put_time_count.
void PutData(leveldb::DB* db, int thread_id, int num_entries, size_t value_size) {
  leveldb::WriteOptions write_options;
  write_options.sync = false;

  int completed = 0;
  const auto start_time = std::chrono::high_resolution_clock::now();  // start of timed region
  for (int i = 0; i < num_entries; ++i) {
    const std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i);
    const std::string value = GenerateRandomString(value_size);

    leveldb::WriteBatch batch;
    batch.Put(key, value);
    const leveldb::Status status = db->Write(write_options, &batch);
    if (!status.ok()) {
      // Report once and stop so a failing database does not flood stderr and
      // failed writes are not counted as completed operations.
      std::cerr << "Put failed for key " << key << ": " << status.ToString() << "\n";
      break;
    }
    ++completed;
  }
  const auto end_time = std::chrono::high_resolution_clock::now();  // end of timed region
  const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();

  // Scoped lock: released automatically when the function returns.
  std::lock_guard<std::mutex> lock(put_mutex);
  put_time_count.first += duration;
  put_time_count.second += completed;
}
int main() {
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
// Benchmark worker: deletes up to `num_entries` keys named
// "key_<thread_id>_<i>", each through its own single-entry WriteBatch. Stops
// at the first failed write. Afterwards adds the elapsed milliseconds and the
// number of deletes that succeeded to delete_time_count.
void DeleteData(leveldb::DB* db, int thread_id, int num_entries) {
  leveldb::WriteOptions write_options;
  write_options.sync = false;

  int completed = 0;
  const auto start_time = std::chrono::high_resolution_clock::now();  // start of timed region
  for (int i = 0; i < num_entries; ++i) {
    const std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i);

    leveldb::WriteBatch batch;
    batch.Delete(key);
    const leveldb::Status status = db->Write(write_options, &batch);
    if (!status.ok()) {
      // Report once and stop so a failing database does not flood stderr and
      // failed deletes are not counted as completed operations.
      std::cerr << "Delete failed for key " << key << ": " << status.ToString() << "\n";
      break;
    }
    ++completed;
  }
  const auto end_time = std::chrono::high_resolution_clock::now();  // end of timed region
  const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();

  // Scoped lock: released automatically when the function returns.
  std::lock_guard<std::mutex> lock(delete_mutex);
  delete_time_count.first += duration;
  delete_time_count.second += completed;
}
srand(2017);
// Benchmark worker: walks the whole database once through an iterator created
// with `read_options`, printing every key/value pair to stdout, and reports
// an iterator error to stderr if one is recorded. Afterwards adds the elapsed
// milliseconds to iterate_time_count and increments its pass counter.
void IterateData(leveldb::DB* db, leveldb::ReadOptions& read_options) {
  std::unique_ptr<leveldb::Iterator> it(db->NewIterator(read_options));

  const auto start_time = std::chrono::high_resolution_clock::now();  // start of timed region
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // Printing each pair is optional; the loop could also just walk the
    // entries without doing anything with them.
    std::cout << "Key: " << it->key().ToString() << ", Value: " << it->value().ToString() << "\n";
  }
  if (!it->status().ok()) {
    std::cerr << "Error during iteration: " << it->status().ToString() << "\n";
  }
  const auto end_time = std::chrono::high_resolution_clock::now();  // end of timed region
  const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();

  // Scoped lock: released automatically when the function returns.
  std::lock_guard<std::mutex> lock(iterate_mutex);
  iterate_time_count.first += duration;
  iterate_time_count.second++;
}
// 测试删除
measureTime("DELETE", [&]() {
for (int i = 0; i < TEST_FREQUENCY; ++i) {
status = db->Delete(leveldb::WriteOptions(), keys[i]);
assert(status.ok());
}
});
// Returns the combined size in bytes of the regular files located directly
// inside `db_path`; subdirectories are not descended into. If the directory
// cannot be opened, a message is printed to stderr and 0 is returned.
uint64_t CalculateDatabaseSize(const std::string& db_path) {
  uint64_t total_size = 0;

  DIR* dir = opendir(db_path.c_str());
  if (dir == nullptr) {
    std::cerr << "Failed to open directory: " << db_path << "\n";
    return total_size;
  }

  struct dirent* entry;
  while ((entry = readdir(dir)) != nullptr) {
    const std::string full_path = db_path + "/" + entry->d_name;
    struct stat file_stat;
    // Classify through lstat instead of entry->d_type: some filesystems leave
    // d_type as DT_UNKNOWN, which would make every file be skipped. lstat
    // (not stat) keeps symbolic links out of the total, as before.
    if (lstat(full_path.c_str(), &file_stat) == 0 && S_ISREG(file_stat.st_mode)) {
      total_size += static_cast<uint64_t>(file_stat.st_size);
    }
  }

  closedir(dir);
  return total_size;
}
// Deletes the database directory `db_path` and everything in it by running a
// platform shell command (`rd /s /q` on Windows, `rm -rf` elsewhere), then
// reports success or failure on stdout/stderr.
//
// NOTE(review): `db_path` is placed inside double quotes in the shell command
// without escaping; callers should only pass trusted paths.
void CleanupDatabase(const std::string& db_path) {
#ifdef _WIN32
  const std::string command = "rd /s /q \"" + db_path + "\"";  // Windows delete directory
#else
  const std::string command = "rm -rf \"" + db_path + "\"";  // Linux/macOS delete directory
#endif
  if (std::system(command.c_str()) == 0) {
    std::cout << "Database directory has been successfully deleted" << std::endl;
  } else {
    std::cerr << "Warning: Failed to delete the database directory. Please check manually!" << std::endl;
  }
}
// Benchmark driver: opens (creating if needed) the database at DATABASE_PATH,
// runs Put and Delete workers, starts the Iterate workers ten seconds later,
// waits for all of them, prints the accumulated counters and the on-disk
// size, and finally removes the database directory.
int main() {
  leveldb::Options options;
  options.create_if_missing = true;

  leveldb::DB* db;
  leveldb::Status status = leveldb::DB::Open(options, DATABASE_PATH, &db);
  if (!status.ok()) {
    std::cerr << "Unable to open/create database: " << status.ToString() << "\n";
    return 1;
  }

  const int entries_per_thread = 1000000;  // operations performed by each thread
  std::vector<std::thread> threads;

  // The snapshot shared by all Iterate workers is taken here, before any
  // Put/Delete worker has been started.
  leveldb::ReadOptions read_options;
  read_options.snapshot = db->GetSnapshot();

  // Put workers.
  for (int worker = 0; worker < PUT_THREAD_COUNT; ++worker) {
    threads.emplace_back(PutData, db, worker, entries_per_thread, VALUE_SIZE);
  }

  // Delete workers.
  for (int worker = 0; worker < DELETE_THREAD_COUNT; ++worker) {
    threads.emplace_back(DeleteData, db, worker, entries_per_thread);
  }

  std::this_thread::sleep_for(std::chrono::seconds(10));

  // Iterate workers, all sharing the same ReadOptions by reference.
  for (int worker = 0; worker < ITERATE_THREAD_COUNT; ++worker) {
    threads.emplace_back(IterateData, db, std::ref(read_options));
  }

  // Wait for every worker before touching the shared state again.
  for (std::thread& worker_thread : threads) {
    if (!worker_thread.joinable()) continue;
    worker_thread.join();
  }
  threads.clear();

  // The snapshot is released only after all workers have finished.
  db->ReleaseSnapshot(read_options.snapshot);

  // Close the database.
  delete db;

  std::cout<<"Put average time(per second):"<<put_time_count.first<<" "<<put_time_count.second<<std::endl;
  std::cout<<"Delete average time(per second):"<<delete_time_count.first<<" "<<delete_time_count.second<<std::endl;
  std::cout<<"Iterate average time(per second):"<<iterate_time_count.first<<" "<<iterate_time_count.second<<std::endl;

  // Report the total size of the database files.
  const uint64_t db_size = CalculateDatabaseSize(DATABASE_PATH);
  std::cout << "Total size of database files: " << db_size << " bytes\n";

  // Remove the database directory.
  CleanupDatabase(DATABASE_PATH);
  return 0;
}
}

+ 2
- 2
test/test.cpp View File

@ -199,7 +199,7 @@ TEST(Test, Garbage_Collect_TEST) {
abort();
}
std::vector<std::string> values;
for(int i=0;i<500000;i++){
for(int i=0;i<5000;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<1000;j++){
@ -212,7 +212,7 @@ TEST(Test, Garbage_Collect_TEST) {
db->TEST_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<500000;i++){
for(int i=0;i<5000;i++){
// std::cout<<i<<std::endl;
std::string key=std::to_string(i);
std::string value;

+ 55
- 0
util/coding.cc View File

@ -3,6 +3,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/coding.h"
#include <filesystem>
namespace leveldb {
@ -153,4 +156,56 @@ bool GetLengthPrefixedSlice(Slice* input, Slice* result) {
}
}
// Returns true when `filename` ends in ".valuelog" and has at least one
// character in front of that suffix.
bool IsValueLogFile(const std::string& filename) {
  static const char kSuffix[] = ".valuelog";
  const std::string::size_type suffix_len = sizeof(kSuffix) - 1;
  // A name that is no longer than the suffix itself is rejected.
  if (filename.size() <= suffix_len) {
    return false;
  }
  return filename.compare(filename.size() - suffix_len, suffix_len, kSuffix) ==
         0;
}
// 示例:解析 sstable 中的元信息
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset) {
// 假设 stored_value 格式为:valuelog_id|offset
Slice tmp(stored_value.data(), stored_value.size());
GetVarint64(&tmp, &valuelog_id);
GetVarint64(&tmp, &offset);
}
// Example: extracts the numeric value-log ID from a value-log file name or
// path.
//
// Only the final path component is examined; the ID is the run of decimal
// digits in front of its first '.', e.g. "dir/12.valuelog" yields 12. A name
// without a '.', with an empty prefix, or with a non-digit character in the
// prefix trips an assert.
uint64_t GetValueLogID(const std::string& valuelog_name) {
  // Strip any directory components so only the bare file name is parsed.
  const std::string filename =
      std::filesystem::path(valuelog_name).filename().string();

  // The ID is everything in front of the first '.'.
  const auto dot_pos = filename.find('.');
  if (dot_pos == std::string::npos) {
    assert(false);
  }

  const std::string id_str = filename.substr(0, dot_pos);
  // An empty prefix (e.g. ".valuelog") would pass the digit check below and
  // then make std::stoull throw, so reject it explicitly.
  if (id_str.empty()) {
    assert(false);
  }

  for (const char c : id_str) {
    // Convert to unsigned char first: passing a negative char value to
    // isdigit is undefined behavior.
    if (!isdigit(static_cast<unsigned char>(c))) {
      assert(false);
    }
  }
  return std::stoull(id_str);
}
// Distributes `files` round-robin over `num_workers` chunks.
//
// `chunks` is resized to `num_workers` entries and the i-th file (in the
// set's iteration order) is appended to chunk i % num_workers. Entries that
// `chunks` already holds are kept, so pass an empty vector for a fresh split.
// A non-positive `num_workers` means "no workers": `chunks` is emptied and no
// file is assigned.
void SplitIntoChunks(const std::set<std::string>& files, int num_workers,
                     std::vector<std::vector<std::string>>* chunks) {
  // Guard against modulo-by-zero and against a negative count being converted
  // to an enormous unsigned size by resize().
  if (num_workers <= 0) {
    chunks->clear();
    return;
  }

  const size_t worker_count = static_cast<size_t>(num_workers);
  chunks->resize(worker_count);

  size_t next = 0;  // chunk that receives the next file
  for (const auto& file : files) {
    (*chunks)[next].push_back(file);
    if (++next == worker_count) {
      next = 0;
    }
  }
}
} // namespace leveldb

+ 9
- 0
util/coding.h View File

@ -13,6 +13,8 @@
#include <cstdint>
#include <cstring>
#include <string>
#include <set>
#include <vector>
#include "leveldb/slice.h"
#include "port/port.h"
@ -117,6 +119,13 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
return GetVarint32PtrFallback(p, limit, value);
}
// Returns true when `filename` ends in ".valuelog" and is longer than that
// suffix.
bool IsValueLogFile(const std::string& filename);
// Reads two consecutive varint64 fields from `stored_value` into
// `valuelog_id` and `offset`, in that order.
void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
uint64_t& offset);
// Returns the number formed by the digits in front of the first '.' of the
// last path component of `valuelog_name`.
uint64_t GetValueLogID(const std::string& valuelog_name);
// Resizes `chunks` to `num_workers` entries and appends the files to them
// round-robin, in the set's iteration order.
void SplitIntoChunks(const std::set<std::string>& files, int num_workers,
std::vector<std::vector<std::string>>* chunks);
} // namespace leveldb
#endif // STORAGE_LEVELDB_UTIL_CODING_H_

Loading…
Cancel
Save