diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc index 8e3f4e7..717fd77 100644 --- a/benchmarks/db_bench.cc +++ b/benchmarks/db_bench.cc @@ -74,7 +74,7 @@ static int FLAGS_reads = -1; static int FLAGS_threads = 1; // Size of each value -static int FLAGS_value_size = 100; +static int FLAGS_value_size = 1000; // Arrange to generate values that shrink to this fraction of // their original size after compression diff --git a/db/db_impl.cc b/db/db_impl.cc index 1f7ce83..d1572d8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -143,6 +143,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) shutting_down_(false), background_work_finished_signal_(&mutex_), background_gc_finished_signal_(&gc_mutex_), + spj_mutex_cond_(&spj_mutex_), mem_(nullptr), imm_(nullptr), has_imm_(false), @@ -1279,6 +1280,14 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // Convenience methods Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { + if(!o.valuelog_write){ + spj_mutex_.Lock(); + while(key==valuelog_finding_key){ + spj_mutex_cond_.Wait(); + } + spj_mutex_.Unlock(); + } + return DB::Put(o, key, val); } @@ -1344,17 +1353,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { while (true) { Writer* ready = writers_.front(); writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); - } + //if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.SignalAll(); + //} if (ready == last_writer) break; } // Notify new head of write queue if (!writers_.empty()) { - writers_.front()->cv.Signal(); + writers_.front()->cv.SignalAll(); } return status; @@ -1577,6 +1586,18 @@ std::vector> DBImpl::WriteValueLog( } uint64_t offset = valueFile.tellp(); + + if(offset>=config::value_log_size){ + addNewValueLog(); + valueFile.close(); + file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); + valueFile =std::ofstream(file_name_, 
std::ios::app | std::ios::binary); + if (!valueFile.is_open()) { + assert(0); + } + offset = valueFile.tellp(); + + } std::vector> res; for (const auto& [key_slice, value_slice] : kv) { @@ -1714,47 +1735,6 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, return s; } -// 判断文件是否为 valuelog 文件 -bool IsValueLogFile(const std::string& filename) { - // 检查文件是否以 ".valuelog" 结尾 - const std::string suffix = ".valuelog"; - return filename.size() > suffix.size() && - filename.substr(filename.size() - suffix.size()) == suffix; -} - -// 示例:解析 sstable 中的元信息 -void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, - uint64_t& offset) { - // 假设 stored_value 格式为:valuelog_id|offset - Slice tmp(stored_value.data(), stored_value.size()); - GetVarint64(&tmp, &valuelog_id); - GetVarint64(&tmp, &offset); -} - -// 示例:获取 ValueLog 文件 ID -uint64_t GetValueLogID(const std::string& valuelog_name) { - // 使用 std::filesystem::path 解析文件名 - std::filesystem::path file_path(valuelog_name); - std::string filename = file_path.filename().string(); // 获取文件名部分 - - // 查找文件名中的 '.' 
位置,提取数字部分 - auto pos = filename.find('.'); - if (pos == std::string::npos) { - assert(0); - } - - // 提取数字部分 - std::string id_str = filename.substr(0, pos); - // 检查提取的部分是否为有效数字 - for (char c : id_str) { - if (!isdigit(c)) { - assert(0); - } - } - - return std::stoull(id_str); // 转换为 uint64_t -} - // 垃圾回收实现 void DBImpl::GarbageCollect() { gc_mutex_.AssertHeld(); @@ -1762,14 +1742,14 @@ void DBImpl::GarbageCollect() { Log(options_.info_log, "start gc "); auto files_set = fs::directory_iterator(dbname_); std::set valuelog_set; - std::string cur_valuelog_name = - ValueLogFileName(dbname_, valuelogfile_number_); + // std::string cur_valuelog_name = + // ValueLogFileName(dbname_, valuelogfile_number_); for (const auto& cur_log_file : files_set) { if (fs::exists(cur_log_file) && fs::is_regular_file(fs::status(cur_log_file)) && IsValueLogFile(cur_log_file.path().filename().string())) { - if (cur_valuelog_name == cur_log_file.path().filename().string()) - continue; + // if (cur_valuelog_name == cur_log_file.path().filename().string()) + // continue; valuelog_set.emplace(cur_log_file.path().filename().string()); } } @@ -1777,6 +1757,9 @@ void DBImpl::GarbageCollect() { // std::cout << valuelog_name << std::endl; uint64_t cur_log_number = GetValueLogID(valuelog_name); valuelog_name = ValueLogFileName(dbname_, cur_log_number); + if (cur_log_number == valuelogfile_number_) { + continue; + } uint64_t current_offset = 0; uint64_t tmp_offset = 0; @@ -1880,7 +1863,22 @@ void DBImpl::GarbageCollect() { // 检查 key 是否在 sstable 中存在 std::string stored_value; - auto option = leveldb::ReadOptions(); + + //lock those thread who attempt to push "key" + spj_mutex_.Lock(); + valuelog_finding_key=key; + spj_mutex_.Unlock(); + //wait for current writer queue to do all their thing + mutex_.Lock(); + if(writers_.size()>0){ + auto last_writer=writers_.back(); + while(!last_writer->done){ + last_writer->cv.Wait(); + } + } + mutex_.Unlock(); + + auto option=leveldb::ReadOptions(); 
option.find_value_log_for_gc = true; Status status = Get(option, key, &stored_value); @@ -1905,8 +1903,16 @@ void DBImpl::GarbageCollect() { // 记录无效,跳过 continue; } + + auto write_op=leveldb::WriteOptions(); + write_op.valuelog_write=true; + status = Put(write_op, key, value); + + spj_mutex_.Lock(); + valuelog_finding_key=""; + spj_mutex_.Unlock(); + spj_mutex_cond_.SignalAll(); - status = Put(leveldb::WriteOptions(), key, value); if (!status.ok()) { std::cerr << "Error accessing sstable: " << status.ToString() << std::endl; diff --git a/db/db_impl.h b/db/db_impl.h index 530690b..9dda4dd 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -211,14 +211,16 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; port::Mutex gc_mutex_; - // port::Mutex spj_mutex_; + port::Mutex spj_mutex_; + port::CondVar spj_mutex_cond_ GUARDED_BY(spj_mutex_); + // std::shared_mutex value_log_mutex; std::atomic shutting_down_; port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); port::CondVar background_gc_finished_signal_ GUARDED_BY(gc_mutex_); - // Slice valuelog_finding_key GUARDED_BY(mutex_ ); + Slice valuelog_finding_key="" GUARDED_BY(spj_mutex_ ); MemTable* mem_; MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted diff --git a/db/dbformat.h b/db/dbformat.h index a1c30ed..a89056f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -44,6 +44,9 @@ static const int kMaxMemCompactLevel = 2; // Approximate gap in bytes between samples of data read during iteration. 
static const int kReadBytesPeriod = 1048576; +// maximum size of value_log file +static const int value_log_size=4<<14; + } // namespace config class InternalKey; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index a788c60..cefa79c 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -183,7 +183,7 @@ class LEVELDB_EXPORT DB { // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; - virtual void TEST_GarbageCollect() = 0; + virtual void TEST_GarbageCollect(){}; }; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index cad5032..7bd79db 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -185,7 +185,7 @@ struct LEVELDB_EXPORT WriteOptions { // with sync==true has similar crash semantics to a "write()" // system call followed by "fsync()". bool sync = false; - // bool valuelog_write=false; + bool valuelog_write=false; }; } // namespace leveldb diff --git a/test/benchmark_4leveldb.cpp b/test/benchmark_4leveldb.cpp index 7573a04..26a62e0 100644 --- a/test/benchmark_4leveldb.cpp +++ b/test/benchmark_4leveldb.cpp @@ -1,141 +1,200 @@ -#include -#include +#include +#include #include -#include -#include -#include +#include +#include +#include #include "leveldb/db.h" - -// 配置 -const int TEST_EXPONENT = 5; -const int TEST_FREQUENCY = static_cast(std::pow(10, TEST_EXPONENT)); -const int MIN_STR_LEN = 255; -const int MAX_STR_LEN = 1024; -const std::string DB_PATH = "db_benchmark"; -const std::string CHARSET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - -// 多语言 -const std::string BASE_VALUE = "こんにちは世界!Hello World! Привет, мир! ¡Hola Mundo! 你好,世界!Bonjour le monde! 
Hallo Welt!"; -// 莎士比亚 -// const std::string BASE_VALUE = "To be, or not to be, that is the question: Whether 'tis nobler in the mind to suffer the slings and arrows of outrageous fortune, or to take arms against a sea of troubles and by opposing end them."; -// 超长字符 -// const std::string BASE_VALUE = []() { -// std::string base = "壹贰叁肆伍陆柒捌玖拾"; -// std::string long_text; -// for (int i = 0; i < 100; ++i) { // 重复 100 次 -// long_text += base; -// } -// return long_text; -// }(); - - -// 随机字符串生成 -std::string randomStr() { - int len = rand() % (MAX_STR_LEN - MIN_STR_LEN + 1) + MIN_STR_LEN; - std::string str(len, '\0'); - for (int i = 0; i < len; ++i) { - str[i] = CHARSET[rand() % CHARSET.size()]; +#include "leveldb/write_batch.h" +#include "leveldb/iterator.h" +#include // For stat to get file size on Unix-like systems +#include // For directory reading on Unix-like systems + +#define THREAD_COUNT 16 // 线程数量 +#define PUT_THREAD_COUNT (THREAD_COUNT / 3) // Put线程数量 +#define DELETE_THREAD_COUNT (THREAD_COUNT / 3) // Delete线程数量 +#define ITERATE_THREAD_COUNT (THREAD_COUNT - PUT_THREAD_COUNT - DELETE_THREAD_COUNT) // Iterate线程数量 +#define VALUE_SIZE 1000 // Value的默认大小 +#define DATABASE_PATH "db_benchmark" // 数据库路径 + +std::mutex put_mutex; +std::mutex delete_mutex; +std::mutex iterate_mutex; + +std::pair put_time_count={0,0}; +std::pair delete_time_count={0,0}; +std::pair iterate_time_count={0,0}; + +// Helper function to generate a random string of a given length +std::string GenerateRandomString(size_t length) { + const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; + std::default_random_engine rng(std::random_device{}()); + std::uniform_int_distribution dist(0, sizeof(charset) - 2); + + std::string result; + result.reserve(length); + for (size_t i = 0; i < length; ++i) { + result += charset[dist(rng)]; } - return str; + return result; } -// 计算并输出耗时的模板函数 -template -void measureTime(const std::string& operation, Func func) { - auto start = 
std::chrono::system_clock::now(); - func(); - auto end = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast(end - start); - - double seconds = double(duration.count()) * std::chrono::microseconds::period::num / std::chrono::microseconds::period::den; - - // 输出格式化信息 - std::cout << "Operation: " << operation << "\n"; - std::cout << "Number of operations: " << TEST_FREQUENCY << "\n"; - std::cout << "Total time: " - << std::fixed << std::setprecision(6) << seconds << " seconds\n"; - std::cout << "Average time per operation: " - << std::fixed << std::setprecision(6) - << (seconds / TEST_FREQUENCY) * 1e6 << " microseconds\n"; - std::cout << "========================================\n"; +void PutData(leveldb::DB* db, int thread_id, int num_entries, size_t value_size) { + leveldb::WriteOptions write_options; + write_options.sync = false; + + auto start_time = std::chrono::high_resolution_clock::now(); // 记录开始时间 + + for (int i = 0; i < num_entries; ++i) { + std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i); + std::string value = GenerateRandomString(value_size); + + leveldb::WriteBatch batch; + batch.Put(key, value); + db->Write(write_options, &batch); + } + + auto end_time = std::chrono::high_resolution_clock::now(); // 记录结束时间 + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + put_mutex.lock(); + put_time_count.first+=duration; + put_time_count.second+=num_entries; + put_mutex.unlock(); } -int main() { - leveldb::DB* db; - leveldb::Options options; - options.create_if_missing = true; +void DeleteData(leveldb::DB* db, int thread_id, int num_entries) { + leveldb::WriteOptions write_options; + write_options.sync = false; - // 打开数据库 - leveldb::Status status = leveldb::DB::Open(options, DB_PATH, &db); - assert(status.ok()); - std::cout << "db open: " << DB_PATH << std::endl; + auto start_time = std::chrono::high_resolution_clock::now(); // 记录开始时间 + + for (int i = 0; i < num_entries; ++i) { + 
std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i); + + leveldb::WriteBatch batch; + batch.Delete(key); + db->Write(write_options, &batch); + } + auto end_time = std::chrono::high_resolution_clock::now(); // 记录结束时间 + auto duration = std::chrono::duration_cast(end_time - start_time).count(); + delete_mutex.lock(); + delete_time_count.first+=duration; + delete_time_count.second+=num_entries; + delete_mutex.unlock(); +} - srand(2017); +void IterateData(leveldb::DB* db, leveldb::ReadOptions& read_options) { + std::unique_ptr it(db->NewIterator(read_options)); - // 生成测试数据 - std::string keys[TEST_FREQUENCY]; - for (int i = 0; i < TEST_FREQUENCY; ++i) { - keys[i] = randomStr(); + auto start_time = std::chrono::high_resolution_clock::now(); // 记录开始时间 + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + // 这里可以选择是否打印键值对,或者仅遍历不做任何操作 + std::cout << "Key: " << it->key().ToString() << ", Value: " << it->value().ToString() << "\n"; } - std::string value = BASE_VALUE; - for (int i = 0; i < 4; ++i) { - value += value; // 扩展 base value + + if (!it->status().ok()) { + std::cerr << "Error during iteration: " << it->status().ToString() << "\n"; } - // 测试添加 - measureTime("ADD", [&]() { - for (int i = 0; i < TEST_FREQUENCY; ++i) { - status = db->Put(leveldb::WriteOptions(), keys[i], value); - assert(status.ok()); - } - }); - - // 测试获取 - measureTime("GET", [&]() { - std::string retrievedValues[TEST_FREQUENCY]; - for (int i = 0; i < TEST_FREQUENCY; ++i) { - status = db->Get(leveldb::ReadOptions(), keys[i], &retrievedValues[i]); - assert(status.ok()); - assert(retrievedValues[i] == value); // 验证获取结果 - } - }); - - // 测试修改 - measureTime("UPDATE", [&]() { - std::string newValue = value + value; - for (int i = 0; i < TEST_FREQUENCY; ++i) { - status = db->Put(leveldb::WriteOptions(), keys[i], newValue); - assert(status.ok()); - } - }); + auto end_time = std::chrono::high_resolution_clock::now(); // 记录结束时间 + auto duration = std::chrono::duration_cast(end_time - 
start_time).count();
+    iterate_mutex.lock();
+    iterate_time_count.first+=duration;
+    iterate_time_count.second++;
+    iterate_mutex.unlock();
+}
-    // 测试删除
-    measureTime("DELETE", [&]() {
-        for (int i = 0; i < TEST_FREQUENCY; ++i) {
-            status = db->Delete(leveldb::WriteOptions(), keys[i]);
-            assert(status.ok());
-        }
-    });
+// Returns the combined size in bytes of the regular files located directly in
+// `db_path` (sub-directories are not descended into).
+//
+// If the directory cannot be opened, an error is written to std::cerr and 0 is
+// returned. Entries that cannot be stat'ed are skipped.
+uint64_t CalculateDatabaseSize(const std::string& db_path) {
+    uint64_t total_size = 0;
+    DIR* dir = opendir(db_path.c_str());
+    if (dir == nullptr) {
+        std::cerr << "Failed to open directory: " << db_path << "\n";
+        return total_size;
+    }
-    if (db) {
-        delete db;
-        db = nullptr;
+    struct dirent* entry;
+    while ((entry = readdir(dir)) != nullptr) {
+        // d_type is only a hint: filesystems that do not fill it in report
+        // DT_UNKNOWN for every entry, and testing for DT_REG alone would then
+        // skip all files and report a size of 0.
+        if (entry->d_type != DT_REG && entry->d_type != DT_UNKNOWN) {
+            continue;
+        }
+        std::string full_path = db_path + "/" + entry->d_name;
+        struct stat file_stat;
+        // lstat() makes the final decision. It does not follow symlinks, so
+        // they stay excluded exactly as they were by the DT_REG test.
+        if (lstat(full_path.c_str(), &file_stat) == 0 &&
+            S_ISREG(file_stat.st_mode)) {
+            total_size += static_cast<uint64_t>(file_stat.st_size);
+        }
     }
-    std::cout << "Test completed, database has been closed." << std::endl;
-    // Delete database directory
+    closedir(dir);
+    return total_size;
+}
+
+void CleanupDatabase(const std::string& db_path) {
+    /// Delete database directory
 #ifdef _WIN32
-    std::string command = "rd /s /q \"" + DB_PATH + "\""; // Windows delete directory
+    std::string command = "rd /s /q \"" + db_path + "\""; // Windows delete directory
 #else
-    std::string command = "rm -rf \"" + DB_PATH + "\""; // Linux/macOS delete directory
+    std::string command = "rm -rf \"" + db_path + "\""; // Linux/macOS delete directory
 #endif
-
     if (std::system(command.c_str()) == 0) {
         std::cout << "Database directory has been successfully deleted" << std::endl;
     } else {
         std::cerr << "Warning: Failed to delete the database directory. Please check manually!"
<< std::endl; } +} + +int main() { + leveldb::DB* db; + leveldb::Options options; + options.create_if_missing = true; + leveldb::Status status = leveldb::DB::Open(options, DATABASE_PATH, &db); + if (!status.ok()) { + std::cerr << "Unable to open/create database: " << status.ToString() << "\n"; + return 1; + } + + const int entries_per_thread = 1000000; // 每个线程执行的操作次数 + std::vector threads; + + // Create snapshot for iterate threads + leveldb::ReadOptions read_options; + read_options.snapshot = db->GetSnapshot(); + + // Start threads for Put operations + for (int i = 0; i < PUT_THREAD_COUNT; ++i) { + threads.emplace_back(PutData, db, i, entries_per_thread, VALUE_SIZE); + } + + // Start threads for Delete operations + for (int i = 0; i < DELETE_THREAD_COUNT; ++i) { + threads.emplace_back(DeleteData, db, i, entries_per_thread); + } + std::this_thread::sleep_for(std::chrono::seconds(10)); + // Start threads for Iterate operations + for (int i = 0; i < ITERATE_THREAD_COUNT; ++i) { + threads.emplace_back(IterateData, db, std::ref(read_options)); + } + + // Wait for all threads to finish + for (auto& th : threads) { + if (th.joinable()) th.join(); + } + threads.clear(); + + // Release the snapshot after all threads have finished + db->ReleaseSnapshot(read_options.snapshot); + + // Close the database + delete db; + std::cout<<"Put average time(per second):"< values; - for(int i=0;i<500000;i++){ + for(int i=0;i<5000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -212,7 +212,7 @@ TEST(Test, Garbage_Collect_TEST) { db->TEST_GarbageCollect(); std::cout<<"finish gc"< + + namespace leveldb { @@ -153,4 +156,56 @@ bool GetLengthPrefixedSlice(Slice* input, Slice* result) { } } +// 判断文件是否为 valuelog 文件 +bool IsValueLogFile(const std::string& filename) { + // 检查文件是否以 ".valuelog" 结尾 + const std::string suffix = ".valuelog"; + return filename.size() > suffix.size() && + filename.substr(filename.size() - suffix.size()) == suffix; +} + +// 示例:解析 
sstable 中的元信息
+// ParseStoredValue: decodes two consecutive varint64 fields from
+// `stored_value`, first `valuelog_id` and then `offset`.
+//
+// NOTE(review): the results of the two GetVarint64 calls are not checked, so
+// this function cannot report malformed input to its caller - confirm that
+// every caller passes a well-formed value-log pointer.
+void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
+                      uint64_t& offset) {
+  // Expected layout of stored_value: valuelog_id | offset
+  Slice tmp(stored_value.data(), stored_value.size());
+  GetVarint64(&tmp, &valuelog_id);
+  GetVarint64(&tmp, &offset);
+}
+
+// GetValueLogID: extracts the numeric id from a value-log file name. The text
+// of the final path component before its first '.' is parsed as a decimal
+// uint64_t, e.g. "some/dir/12.valuelog" yields 12.
+//
+// A name without '.', with an empty prefix, or with a non-digit in the prefix
+// is a programming error and trips an assert.
+uint64_t GetValueLogID(const std::string& valuelog_name) {
+  // Drop any directory part; only the file name itself is examined.
+  const std::string filename =
+      std::filesystem::path(valuelog_name).filename().string();
+
+  // The id is everything in front of the first '.'.
+  const auto pos = filename.find('.');
+  assert(pos != std::string::npos);
+
+  const std::string id_str = filename.substr(0, pos);
+  // An empty prefix (e.g. ".valuelog") would pass the digit loop below
+  // untouched and only fail inside std::stoull, so reject it here.
+  assert(!id_str.empty());
+  for (char c : id_str) {
+    // isdigit has undefined behaviour for negative char values, so convert
+    // to unsigned char before the call.
+    if (!isdigit(static_cast<unsigned char>(c))) {
+      assert(0);
+    }
+  }
+
+  return std::stoull(id_str);  // decimal text -> uint64_t
+}
+
+// SplitIntoChunks: distributes `files` round-robin over `num_workers` chunks.
+// The i-th file in set order is appended to (*chunks)[i % num_workers].
+//
+// `chunks` is resized to `num_workers` entries; elements already stored in the
+// chunks that are kept are left in place. If `num_workers` is not positive
+// there is no chunk to distribute to, so `chunks` is emptied and the function
+// returns instead of dividing by zero or resizing to a wrapped-around size.
+void SplitIntoChunks(const std::set<std::string>& files, int num_workers,
+                     std::vector<std::vector<std::string>>* chunks) {
+  if (num_workers <= 0) {
+    chunks->clear();
+    return;
+  }
+  const size_t worker_count = static_cast<size_t>(num_workers);
+  chunks->resize(worker_count);
+  size_t index = 0;
+  for (const auto& file : files) {
+    (*chunks)[index % worker_count].push_back(file);
+    ++index;
+  }
+}
+
 }  // namespace leveldb
diff --git a/util/coding.h b/util/coding.h
index f0bb57b..5a5c503 100644
--- a/util/coding.h
+++ b/util/coding.h
@@ -13,6 +13,8 @@
 #include <cstdint>
 #include <cstring>
 #include <string>
+#include <set>
+#include <vector>
 
 #include "leveldb/slice.h"
 #include "port/port.h"
@@ -117,6 +119,13 @@ inline const char* GetVarint32Ptr(const char* p, const char* limit,
   return GetVarint32PtrFallback(p, limit, value);
 }
 
+bool IsValueLogFile(const std::string& filename);
+void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id,
+                      uint64_t& offset);
+uint64_t GetValueLogID(const std::string& valuelog_name);
+void SplitIntoChunks(const std::set<std::string>& files, int num_workers,
+                     std::vector<std::vector<std::string>>* chunks);
+
 }  // namespace leveldb
 
 #endif  // STORAGE_LEVELDB_UTIL_CODING_H_