
Final update for the code

version_3
alexfisher 8 months ago
parent
commit
b3bc7e515f
14 changed files with 551 additions and 730 deletions
  1. CMakeLists.txt (+2, -10)
  2. db/db_impl.cc (+311, -284)
  3. db/db_impl.h (+7, -16)
  4. db/fields.cc (+22, -19)
  5. db/true_iter.cc (+42, -52)
  6. db/unordered_iter.cc (+7, -11)
  7. db/version_set.cc (+2, -1)
  8. db/write_batch.cc (+5, -0)
  9. include/leveldb/db.h (+29, -31)
  10. include/leveldb/fields.h (+3, -1)
  11. test/benchmark_4leveldb.cpp (+0, -200)
  12. test/test.cpp (+119, -22)
  13. test/test2.cpp (+0, -81)
  14. 设计文档.md (+2, -2)

CMakeLists.txt (+2, -10)

@ -525,15 +525,7 @@ if(LEVELDB_INSTALL)
DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}"
)
endif(LEVELDB_INSTALL)
add_executable(db_test1
add_executable(valuelogTest
"${PROJECT_SOURCE_DIR}/test/test.cpp"
)
target_link_libraries(db_test1 PRIVATE leveldb gtest)
add_executable(db_test2
"${PROJECT_SOURCE_DIR}/test/test2.cpp"
)
target_link_libraries(db_test2 PRIVATE leveldb gtest)
add_executable(db_test_bench
"${PROJECT_SOURCE_DIR}/test/benchmark_4leveldb.cpp"
)
target_link_libraries(db_test_bench PRIVATE leveldb gtest)
target_link_libraries(valuelogTest PRIVATE leveldb gtest)

db/db_impl.cc (+311, -284)
File diff suppressed because it is too large


db/db_impl.h (+7, -16)

@ -41,18 +41,6 @@ class DBImpl : public DB {
~DBImpl() override;
// //
// std::string SerializeValue(const FieldArray& fields)override;
// //
// FieldArray ParseValue(const std::string& value_str)override;
// Status Put_with_fields(const WriteOptions& options, const Slice& key,const
// FieldArray& fields)override;
// Status Get_with_fields(const ReadOptions& options, const Slice& key,
// FieldArray* fields)override;
// Implementations of the DB interface
Status Put(const WriteOptions&, const Slice& key,
const Slice& value) override;
@ -68,8 +56,6 @@ class DBImpl : public DB {
bool GetProperty(const Slice& property, std::string* value) override;
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;
void CompactRange(const Slice* begin, const Slice* end) override;
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>>
// WriteValueLog(std::vector<Slice> value)override;
std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(
std::vector<std::pair<Slice, Slice>> value) override;
void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_);
@ -88,7 +74,7 @@ class DBImpl : public DB {
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
void TEST_GarbageCollect() override;
void manual_GarbageCollect() override;
// Return an internal iterator over the current state of the database.
@ -236,9 +222,12 @@ class DBImpl : public DB {
uint64_t valuelogfile_number_=0;
log::Writer* log_;
std::map<uint64_t, uint64_t> oldvaluelog_ids;
int mem_value_log_number_;//if =0, don't use cache
int mem_value_log_number_;//if =0, don't use cache for valuelog
Cache* valuelog_cache;
// count of live records in a valuelog
std::map<uint64_t, uint64_t> valuelog_usage;
// count of records written to a valuelog
std::map<uint64_t, uint64_t> valuelog_origin;
std::thread* gc_thread=nullptr;
@ -266,10 +255,12 @@ class DBImpl : public DB {
VersionSet* const versions_ GUARDED_BY(mutex_);
// better to be larger than 500.
int use_valuelog_length=5000;
int value_log_size_;
// if on, the database will use a crc check to protect the valuelog from corrupted bytes
bool valuelog_crc_;
// Have we encountered a background error in paranoid mode?
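As an aside, the two counters above (valuelog_usage for live records, valuelog_origin for records ever written) are exactly what a GC pass needs to estimate how much of a valuelog file is garbage. The sketch below is only an illustration of that ratio computation, assuming these two maps; the actual selection logic lives in db/db_impl.cc, whose diff is suppressed above, and PickMostGarbageFile is a hypothetical name.

```cpp
#include <cstdint>
#include <map>

// Hypothetical helper: rank valuelog files by dead-record fraction using the
// usage (live) and origin (written) counters kept in DBImpl.
uint64_t PickMostGarbageFile(const std::map<uint64_t, uint64_t>& usage,
                             const std::map<uint64_t, uint64_t>& origin) {
  uint64_t best_file = 0;
  double best_garbage = 0.0;
  for (const auto& kv : origin) {
    const uint64_t written = kv.second;
    if (written == 0) continue;
    const auto it = usage.find(kv.first);
    const uint64_t live = (it == usage.end()) ? 0 : it->second;
    const double garbage = 1.0 - static_cast<double>(live) / written;
    if (garbage > best_garbage) {
      best_garbage = garbage;
      best_file = kv.first;
    }
  }
  return best_file;  // 0 means no file looked worth collecting
}
```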

db/fields.cc (+22, -19)

@ -2,8 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef STORAGE_LEVELDB_INCLUDE_FIELDS_H_
#define STORAGE_LEVELDB_INCLUDE_FIELDS_H_
#include <string>
@ -21,43 +19,48 @@ namespace leveldb {
return res_;
}
void DeserializeValue(const std::string& value_str,FieldArray* res){
Status DeserializeValue(const std::string& value_str,FieldArray* res){
Slice slice=Slice(value_str.c_str());
uint64_t siz;
bool tmpres=GetVarint64(&slice,&siz);
assert(tmpres);
if(!tmpres)return Status::Corruption("Deserialize fail");
res->clear();
for(int i=0;i<siz;i++){
Slice value_name;
Slice value;
tmpres=GetLengthPrefixedSlice(&slice,&value_name);
assert(tmpres);
if(!tmpres)return Status::Corruption("Deserialize fail");
tmpres=GetLengthPrefixedSlice(&slice,&value);
assert(tmpres);
res->emplace_back(value_name,value);
if(!tmpres)return Status::Corruption("Deserialize fail");
res->emplace_back(std::string(value_name.data(),value_name.size()),std::string(value.data(),value.size()));
}
return Status::OK();
}
Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys){
auto it=db->NewUnorderedIterator(options,Slice(),Slice());
keys->clear();
while(it->Valid()){
while(it->Valid()&&it->status().ok()){
auto val=it->value();
FieldArray arr;
auto str_val=std::string(val.data(),val.size());
DeserializeValue(str_val,&arr);
for(auto pr:arr){
if(pr.first==field.first&&pr.second==field.second){
Slice key=it->key();
keys->push_back(std::string(key.data(),key.size()));
break;
}
}
auto res=DeserializeValue(str_val,&arr);
if(res.ok())
for(const auto &pr:arr){
if(pr.first==field.first&&pr.second==field.second){
Slice key=it->key();
keys->push_back(std::string(key.data(),key.size()));
break;
}
}
it->Next();
}
if(it->Valid()&&!it->status().ok()){
auto res=it->status();
delete it;
return res;
}
delete it;
return Status::OK();
}
}
#endif // STORAGE_LEVELDB_INCLUDE_FIELDS_H_
}
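A minimal usage sketch of the helpers above, assuming an already-open leveldb::DB* and placeholder key/field contents borrowed from the tests; PutAndFindByField is a hypothetical wrapper, not part of the API.

```cpp
#include <string>
#include <vector>

#include "leveldb/db.h"
#include "leveldb/fields.h"

// Store one multi-field record, read it back, then search by field.
leveldb::Status PutAndFindByField(leveldb::DB* db) {
  leveldb::FieldArray fields = {{"name", "Customer#000000001"},
                                {"address", "IVhzIApeRb"}};
  leveldb::Status s =
      db->Put(leveldb::WriteOptions(), "k_1", leveldb::SerializeValue(fields));
  if (!s.ok()) return s;

  std::string raw;
  s = db->Get(leveldb::ReadOptions(), "k_1", &raw);
  if (!s.ok()) return s;

  leveldb::FieldArray parsed;
  s = leveldb::DeserializeValue(raw, &parsed);  // now reports corruption via Status
  if (!s.ok()) return s;

  std::vector<std::string> keys;  // keys whose "name" field matches
  return leveldb::Get_keys_by_field(db, leveldb::ReadOptions(),
                                    {"name", "Customer#000000001"}, &keys);
}
```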

db/true_iter.cc (+42, -52)

@ -1,62 +1,52 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <iostream>
#include <fstream>
#include <thread>
#include <queue>
#include "db/true_iter.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include <fstream>
#include <iostream>
#include <queue>
#include <thread>
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "port/port.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "port/port.h"
namespace leveldb {
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. DBTrueIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
// DBTrueIter simply wraps the DBIter and parses the true value (possibly from a
// valuelog) for the user.
// If the crc check fails, DBTrueIter skips the bad record
// and reports it to the user through status().
// A bad status remains visible through status(), as with other iterators.
class DBTrueIter : public Iterator {
public:
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
DBTrueIter(DBImpl* db, Iterator* iter,bool check_crc)
:
db_(db),iter_(iter),check_crc_(check_crc){}
DBTrueIter(DBImpl* db, Iterator* iter, bool check_crc)
: db_(db), iter_(iter), check_crc_(check_crc) {}
DBTrueIter(const DBTrueIter&) = delete;
DBTrueIter& operator=(const DBTrueIter&) = delete;
~DBTrueIter() override {
delete iter_;
}
~DBTrueIter() override { delete iter_; }
bool Valid() const override { return iter_->Valid(); }
Slice key() const override {
return iter_->key();
}
Slice key() const override { return iter_->key(); }
Slice value() const override {
return Slice(buf_for_value.data(),buf_for_value.size());
return Slice(buf_for_value.data(), buf_for_value.size());
}
Status status() const override {
if(status_.ok())
if (status_.ok())
return iter_->status();
else return status_;
else
return status_;
}
void Next() override;
@ -66,60 +56,60 @@ class DBTrueIter : public Iterator {
void SeekToLast() override;
private:
Status GetAndParseTrueValue(Slice tmp_value){
Status status=db_->parseTrueValue(&tmp_value,&buf_for_value,check_crc_);
if(!status.ok())status_=status;
Status GetAndParseTrueValue(Slice tmp_value) {
Status status = db_->parseTrueValue(&tmp_value, &buf_for_value, check_crc_);
if (!status.ok()) status_ = status;
return status;
}
DBImpl* db_;
Iterator* const iter_;
std::string buf_for_value;
Status status_=Status::OK();
Status status_ = Status::OK();
bool check_crc_;
};
void DBTrueIter::Next() {
iter_->Next();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Next();
while (iter_->Valid()) {
Status res = GetAndParseTrueValue(iter_->value());
if (res.ok()) break;
iter_->Next();
}
}
void DBTrueIter::Prev() {
iter_->Prev();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Prev();
while (iter_->Valid()) {
Status res = GetAndParseTrueValue(iter_->value());
if (res.ok()) break;
iter_->Prev();
}
}
void DBTrueIter::Seek(const Slice& target) {
iter_->Seek(target);
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Next();//lowerbound
if (iter_->Valid()) {
Status res = GetAndParseTrueValue(iter_->value());
if (!res.ok()) Next(); // lowerbound
}
}
void DBTrueIter::SeekToFirst() {
iter_->SeekToFirst();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Next();
if (iter_->Valid()) {
Status res = GetAndParseTrueValue(iter_->value());
if (!res.ok()) Next();
}
}
void DBTrueIter::SeekToLast() {
iter_->SeekToLast();
if(iter_->Valid()){
Status res=GetAndParseTrueValue(iter_->value());
if(!res.ok())Prev();
if (iter_->Valid()) {
Status res = GetAndParseTrueValue(iter_->value());
if (!res.ok()) Prev();
}
}
} // anonymous namespace
Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter,bool check_crc) {
return new DBTrueIter(db,db_iter,check_crc);
Iterator* NewTrueIterator(DBImpl* db, Iterator* db_iter, bool check_crc) {
return new DBTrueIter(db, db_iter, check_crc);
}
} // namespace leveldb
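In practice the skip-on-corruption behaviour means a plain scan silently passes over unreadable valuelog records and only reports them afterwards through status(). A small sketch, assuming a DB opened on this branch; verify_checksums_for_valuelog is the ReadOptions field used in test/test.cpp, and ScanAndReportSkips is a hypothetical helper.

```cpp
#include <cstdio>

#include "leveldb/db.h"

// Count readable entries; a non-OK status afterwards means at least one
// corrupt valuelog record was skipped during the scan.
void ScanAndReportSkips(leveldb::DB* db) {
  leveldb::ReadOptions ro;
  ro.verify_checksums_for_valuelog = true;
  leveldb::Iterator* it = db->NewIterator(ro);
  int readable = 0;
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    readable++;  // only entries whose valuelog record parsed cleanly appear here
  }
  if (!it->status().ok()) {
    std::fprintf(stderr, "scan skipped bad records: %s\n",
                 it->status().ToString().c_str());
  }
  std::fprintf(stderr, "readable entries: %d\n", readable);
  delete it;
}
```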

db/unordered_iter.cc (+7, -11)

@ -26,19 +26,15 @@ namespace leveldb {
namespace {
// Memtables and sstables that make the DB representation contain
// (userkey,seq,type) => uservalue entries. UnorderedIter
// combines multiple entries for the same userkey found in the DB
// representation into a single entry while accounting for sequence
// numbers, deletion markers, overwrites, etc.
// UnorderedIter is similar to DBTrueIter in true_iter.cc.
// The unordered_iter differs in the following ways:
// 1. The data it returns is unordered.
// 2. It uses its own memory space instead of the valuelog cache.
// 3. In most cases it saves memory and is faster and more stable.
// 4. Seek, SeekToFirst and SeekToLast cannot be used; use lower_key and upper_key instead.
// 5. It returns all data in [lower_key, upper_key).
class UnorderedIter : public Iterator {
public:
// Which direction is the iterator currently moving?
// (1) When moving forward, the internal iterator is positioned at
// the exact entry that yields this->key(), this->value()
// (2) When moving backwards, the internal iterator is positioned
// just before all entries whose user key == this->key().
enum IterPos {Left,Mid,Right};
UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name,ReadOptions readOptions,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator)
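The points above translate into a very small usage pattern: construct the iterator with its bounds, loop with Valid()/Next(), and never call Seek. The sketch below mirrors Get_keys_by_field in db/fields.cc and assumes, as that code does, that the iterator is already positioned on creation; CollectKeysInRange is a hypothetical helper.

```cpp
#include <string>
#include <vector>

#include "leveldb/db.h"

// Collect every key in ["a", "m"); the results arrive in no particular order.
std::vector<std::string> CollectKeysInRange(leveldb::DB* db) {
  std::vector<std::string> keys;
  leveldb::ReadOptions ro;
  leveldb::Iterator* it =
      db->NewUnorderedIterator(ro, leveldb::Slice("a"), leveldb::Slice("m"));
  while (it->Valid() && it->status().ok()) {
    keys.push_back(it->key().ToString());
    it->Next();
  }
  // Any iteration error is left in it->status(), as in Get_keys_by_field.
  delete it;
  return keys;
}
```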

db/version_set.cc (+2, -1)

@ -83,8 +83,9 @@ Version::~Version() {
}
}
// update the refcounts of the related valuelogs
for(auto valuelog_name:old_valuelog_names){
vset_->valuelogmap_mutex.Lock();
vset_->valuelogmap_mutex.Lock();//lock to visit old_valuelog_map
int res=vset_->old_valuelog_map[valuelog_name]--;
if(res==1){
vset_->env_->RemoveFile(valuelog_name);

db/write_batch.cc (+5, -0)

@ -127,6 +127,9 @@ class MemTableInserter : public WriteBatch::Handler {
sequence_++;
}
};
// Note: this changes the order of entries in the writebatch!
// Uses batch_insert to improve performance.
class ValueLogInserter : public WriteBatch::Handler {
public:
WriteBatch writeBatch_;
@ -188,6 +191,8 @@ class ValueLogChecker : public WriteBatch::Handler {
keys.push_back(key);
}
// Check whether every key in the writebatch differs from the given key (the key GC is searching for).
// If a key matches the target key, all keys must be scanned again.
void CheckValid(){
int len=keys.size();
if(!len)return;

include/leveldb/db.h (+29, -31)

@ -7,19 +7,19 @@
#include <cstdint>
#include <cstdio>
#include <vector>
#include "leveldb/export.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include <vector>
namespace leveldb {
// Update CMakeLists.txt if you change these
static const int kMajorVersion = 1;
static const int kMinorVersion = 23;
using Field=std::pair<Slice,Slice>;
using FieldArray=std::vector<std::pair<Slice, Slice>>;
using Field = std::pair<std::string, std::string>;
using FieldArray = std::vector<std::pair<std::string, std::string>>;
struct Options;
struct ReadOptions;
@ -90,42 +90,33 @@ class LEVELDB_EXPORT DB {
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
// virtual std::string SerializeValue(const FieldArray& fields);
// //
// virtual void DeserializeValue(const std::string& value_str,FieldArray* res);
// virtual Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields);
// virtual Status Get_with_fields(const ReadOptions& options, const Slice& key,
// FieldArray* fields);
// virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector<std::string> *keys);
// virtual std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> WriteValueLog(std::vector<Slice> value){
// assert(0);
// std::vector<std::pair<uint64_t,std::pair<uint64_t,uint64_t>>> v;
// return v;
// }
virtual std::vector<std::pair<uint64_t,uint64_t>> WriteValueLog(std::vector<std::pair<Slice,Slice>> value){
// Write a batch of key-value pairs to a valuelog.
// Returns a (file_id, offset) pair within the valuelog for every key-value pair.
virtual std::vector<std::pair<uint64_t, uint64_t>> WriteValueLog(
std::vector<std::pair<Slice, Slice>> value) {
assert(0);
std::vector<std::pair<uint64_t,uint64_t>> v;
std::vector<std::pair<uint64_t, uint64_t>> v;
return v;
}
virtual void addNewValueLog(){assert(0);}
// Add a new valuelog to the database; should only be called while holding mutex_.
virtual void addNewValueLog() { assert(0); }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value,bool check_crc){
assert(0); // Not implemented
// Read a value from a valuelog.
// Should only be called while not holding mutex_.
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,
std::string* value, bool check_crc) {
assert(0);
return Status::Corruption("not imp");
}
virtual Status parseTrueValue(Slice* value,std::string* true_value,bool checkcrc){
// Parse the real value from a value stored in the LSM-tree.
virtual Status parseTrueValue(Slice* value, std::string* true_value,
bool checkcrc) {
assert(0);
return Status::Corruption("not imp");
}
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
@ -134,7 +125,13 @@ class LEVELDB_EXPORT DB {
// The returned iterator should be deleted before this db is deleted.
virtual Iterator* NewIterator(const ReadOptions& options) = 0;
virtual Iterator* NewUnorderedIterator(const ReadOptions&,const Slice &lower_key,const Slice &upper_key){
// Similar to NewIterator().
// Returns all data in [lower_key, upper_key).
// The data returned by NewUnorderedIterator() is unordered.
// Seek, SeekToFirst and SeekToLast are invalid for this iterator.
virtual Iterator* NewUnorderedIterator(const ReadOptions&,
const Slice& lower_key,
const Slice& upper_key) {
assert(0);
return nullptr;
};
@ -189,10 +186,11 @@ class LEVELDB_EXPORT DB {
// Therefore the following call will compact the entire database:
// db->CompactRange(nullptr, nullptr);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
virtual void TEST_GarbageCollect(){};
// Trigger a manual garbage collection.
// Returns only once the garbage collection has finished.
// May be a very time-consuming operation.
virtual void manual_GarbageCollect() {};
};
// Destroy the contents of the specified database.
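The new manual_GarbageCollect() hook is meant to be paired with a compaction, as the tests do: compaction turns overwritten values into valuelog garbage, and the manual GC then reclaims it synchronously. A minimal sketch, with ReclaimValuelogSpace as a hypothetical wrapper:

```cpp
#include "leveldb/db.h"

// Mirror of the pattern in test/test.cpp: compact, then block until GC is done.
void ReclaimValuelogSpace(leveldb::DB* db) {
  db->CompactRange(nullptr, nullptr);  // drop obsolete LSM-tree entries first
  db->manual_GarbageCollect();         // returns only when GC has finished
}
```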

include/leveldb/fields.h (+3, -1)

@ -10,9 +10,11 @@
#include "leveldb/db.h"
namespace leveldb {
// Serialize a FieldArray (a vector of string pairs) into a single string
std::string SerializeValue(const FieldArray& fields);
void DeserializeValue(const std::string& value_str,FieldArray* res);
// Deserialize a FieldArray from a single string
Status DeserializeValue(const std::string& value_str,FieldArray* res);
Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector<std::string> *keys);
}

test/benchmark_4leveldb.cpp (+0, -200)

@ -1,200 +0,0 @@
#include <thread>
#include <vector>
#include <iostream>
#include <random>
#include <string>
#include <mutex>
#include "leveldb/db.h"
#include "leveldb/write_batch.h"
#include "leveldb/iterator.h"
#include <sys/stat.h> // For stat to get file size on Unix-like systems
#include <dirent.h> // For directory reading on Unix-like systems
#define THREAD_COUNT 16 // number of threads
#define PUT_THREAD_COUNT (THREAD_COUNT / 3) // number of Put threads
#define DELETE_THREAD_COUNT (THREAD_COUNT / 3) // number of Delete threads
#define ITERATE_THREAD_COUNT (THREAD_COUNT - PUT_THREAD_COUNT - DELETE_THREAD_COUNT) // number of Iterate threads
#define VALUE_SIZE 1000 // default value size
#define DATABASE_PATH "db_benchmark" // database path
std::mutex put_mutex;
std::mutex delete_mutex;
std::mutex iterate_mutex;
std::pair<int,int> put_time_count={0,0};
std::pair<int,int> delete_time_count={0,0};
std::pair<int,int> iterate_time_count={0,0};
// Helper function to generate a random string of a given length
std::string GenerateRandomString(size_t length) {
const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
std::default_random_engine rng(std::random_device{}());
std::uniform_int_distribution<int> dist(0, sizeof(charset) - 2);
std::string result;
result.reserve(length);
for (size_t i = 0; i < length; ++i) {
result += charset[dist(rng)];
}
return result;
}
void PutData(leveldb::DB* db, int thread_id, int num_entries, size_t value_size) {
leveldb::WriteOptions write_options;
write_options.sync = false;
auto start_time = std::chrono::high_resolution_clock::now(); // record start time
for (int i = 0; i < num_entries; ++i) {
std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i);
std::string value = GenerateRandomString(value_size);
leveldb::WriteBatch batch;
batch.Put(key, value);
db->Write(write_options, &batch);
}
auto end_time = std::chrono::high_resolution_clock::now(); // record end time
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
put_mutex.lock();
put_time_count.first+=duration;
put_time_count.second+=num_entries;
put_mutex.unlock();
}
void DeleteData(leveldb::DB* db, int thread_id, int num_entries) {
leveldb::WriteOptions write_options;
write_options.sync = false;
auto start_time = std::chrono::high_resolution_clock::now(); // record start time
for (int i = 0; i < num_entries; ++i) {
std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i);
leveldb::WriteBatch batch;
batch.Delete(key);
db->Write(write_options, &batch);
}
auto end_time = std::chrono::high_resolution_clock::now(); // record end time
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
delete_mutex.lock();
delete_time_count.first+=duration;
delete_time_count.second+=num_entries;
delete_mutex.unlock();
}
void IterateData(leveldb::DB* db, leveldb::ReadOptions& read_options) {
std::unique_ptr<leveldb::Iterator> it(db->NewIterator(read_options));
auto start_time = std::chrono::high_resolution_clock::now(); // record start time
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// Optionally print the key-value pairs here, or simply iterate without doing anything
std::cout << "Key: " << it->key().ToString() << ", Value: " << it->value().ToString() << "\n";
}
if (!it->status().ok()) {
std::cerr << "Error during iteration: " << it->status().ToString() << "\n";
}
auto end_time = std::chrono::high_resolution_clock::now(); // record end time
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();
iterate_mutex.lock();
iterate_time_count.first+=duration;
iterate_time_count.second++;
iterate_mutex.unlock();
}
// Function to calculate the total size of all files in the database directory
uint64_t CalculateDatabaseSize(const std::string& db_path) {
uint64_t total_size = 0;
DIR* dir = opendir(db_path.c_str());
if (dir == nullptr) {
std::cerr << "Failed to open directory: " << db_path << "\n";
return total_size;
}
struct dirent* entry;
while ((entry = readdir(dir)) != nullptr) {
if (entry->d_type == DT_REG) { // Only consider regular files
std::string full_path = db_path + "/" + entry->d_name;
struct stat file_stat;
if (stat(full_path.c_str(), &file_stat) == 0) {
total_size += file_stat.st_size;
}
}
}
closedir(dir);
return total_size;
}
void CleanupDatabase(const std::string& db_path) {
/// Delete database directory
#ifdef _WIN32
std::string command = "rd /s /q \"" + db_path + "\""; // Windows delete directory
#else
std::string command = "rm -rf \"" + db_path + "\""; // Linux/macOS delete directory
#endif
if (std::system(command.c_str()) == 0) {
std::cout << "Database directory has been successfully deleted" << std::endl;
} else {
std::cerr << "Warning: Failed to delete the database directory. Please check manually!" << std::endl;
}
}
int main() {
leveldb::DB* db;
leveldb::Options options;
options.create_if_missing = true;
leveldb::Status status = leveldb::DB::Open(options, DATABASE_PATH, &db);
if (!status.ok()) {
std::cerr << "Unable to open/create database: " << status.ToString() << "\n";
return 1;
}
const int entries_per_thread = 1000000; // number of operations per thread
std::vector<std::thread> threads;
// Create snapshot for iterate threads
leveldb::ReadOptions read_options;
read_options.snapshot = db->GetSnapshot();
// Start threads for Put operations
for (int i = 0; i < PUT_THREAD_COUNT; ++i) {
threads.emplace_back(PutData, db, i, entries_per_thread, VALUE_SIZE);
}
// Start threads for Delete operations
for (int i = 0; i < DELETE_THREAD_COUNT; ++i) {
threads.emplace_back(DeleteData, db, i, entries_per_thread);
}
std::this_thread::sleep_for(std::chrono::seconds(10));
// Start threads for Iterate operations
for (int i = 0; i < ITERATE_THREAD_COUNT; ++i) {
threads.emplace_back(IterateData, db, std::ref(read_options));
}
// Wait for all threads to finish
for (auto& th : threads) {
if (th.joinable()) th.join();
}
threads.clear();
// Release the snapshot after all threads have finished
db->ReleaseSnapshot(read_options.snapshot);
// Close the database
delete db;
std::cout<<"Put average time(per second):"<<put_time_count.first<<" "<<put_time_count.second<<std::endl;
std::cout<<"Delete average time(per second):"<<delete_time_count.first<<" "<<delete_time_count.second<<std::endl;
std::cout<<"Iterate average time(per second):"<<iterate_time_count.first<<" "<<iterate_time_count.second<<std::endl;
// Calculate and print the total size of the database files
uint64_t db_size = CalculateDatabaseSize(DATABASE_PATH);
std::cout << "Total size of database files: " << db_size << " bytes\n";
// Cleanup the database
CleanupDatabase(DATABASE_PATH);
return 0;
}

test/test.cpp (+119, -22)

@ -245,28 +245,46 @@ TEST(Test, fields_simple_test) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::string key1 = "k_1";
FieldArray fields1 = {
{"name", "Customer#000000001"},
{"address", "IVhzIApeRb"},
{"phone", "25-989-741-2988"}
};
auto value1=SerializeValue(fields1);
db->Put(WriteOptions(), key1, value1);
std::string value_ret;
FieldArray res1;
std::vector<FieldArray> values;
for(int i=0;i<300;i++){
auto key=GenKeyByNum(i,300);
std::string true_value;
FieldArray value;
if(i%5){
value.push_back({key,std::to_string(rand()%10000)});
value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)});
value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)});
value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)});
value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)});
values.push_back(value);
true_value=SerializeValue(value);
{
FieldArray tmpvalue;
ASSERT_TRUE(DeserializeValue(true_value,&tmpvalue).ok());
ASSERT_TRUE(tmpvalue==value);
}
}
else true_value=std::to_string(rand()%10000);
db->Put(writeOptions,key,true_value);
}
db->Get(ReadOptions(), key1, &value_ret);
DeserializeValue(value_ret, &res1);
ASSERT_TRUE(CompareFieldArray(fields1, res1));
int cnt=0;
for(int i=0;i<300;i++){
auto key=GenKeyByNum(i,300);
std::string true_value;
FieldArray value;
db->Get(readOptions,key,&true_value);
db->Delete(WriteOptions(),key1);
if(i%5){
ASSERT_TRUE(DeserializeValue(true_value,&value).ok());
ASSERT_TRUE(values[cnt++]==value);
}
else{
ASSERT_FALSE(DeserializeValue(true_value,&value).ok());
}
}
delete db;
}
TEST(Test, get_keys_by_field_test) {
@ -280,6 +298,7 @@ TEST(Test, get_keys_by_field_test) {
std::vector<std::string> target_keys;
for(int i=0;i<10000;i++){
std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys
std::string value;
FieldArray fields={
{"name", key},
{"address", std::to_string(rand()%7)},
@ -288,9 +307,16 @@ TEST(Test, get_keys_by_field_test) {
if(rand()%5==0){
fields[0].second="special_key";
target_keys.push_back(key);
value=SerializeValue(fields);
}
else if(rand()%123==0){
value=std::to_string(rand()%10000);
}
else{
value=SerializeValue(fields);
}
keys.push_back(key);
db->Put(WriteOptions(),key,SerializeValue(fields));
db->Put(WriteOptions(),key,value);
}
std::sort(target_keys.begin(),target_keys.end());
std::vector<std::string> key_res;
@ -449,6 +475,69 @@ TEST(Test, valuelog_corruption_test) {
delete db;
}
TEST(Test, valuelog_whole_file_corruption_test) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
readOptions.verify_checksums_for_valuelog=true;
Options dbOptions;
dbOptions.use_valuelog_length=100;
dbOptions.valuelog_gc=false;
dbOptions.value_log_size=1<<26;
dbOptions.valuelog_crc=true;
//a record size: 8+4+8+4*5000+(4) = 20024 bytes
//64*1024*1024/20024 = 3351.42 records fit in one 64 MiB valuelog file
if(OpenDB(&db,dbOptions).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
//test Put
std::vector<std::string> values;
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
for(int j=0;j<5000;j++){
value+=key;
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
Status s=db->Get(readOptions,key,&value);
assert(s.ok());
ASSERT_TRUE(values[i]==value);
}
std::vector<std::string> files;
auto env_=Env::Default();
ASSERT_TRUE(env_->GetChildren(dbName, &files).ok());
for(auto file:files){
uint64_t number;
FileType fileType;
ParseFileName(file,&number,&fileType);
if(fileType==FileType::kValueLogFile){
env_->RemoveFile(dbName+ "/" + file);
}
}
//all valuelog files were removed, so every read below should fail
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
ASSERT_FALSE(db->Get(readOptions,key,&value).ok());
}
auto iter=db->NewIterator(readOptions);
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
delete iter;
delete db;
}
TEST(Test, garbage_collect_test) {
DB *db;
@ -493,7 +582,7 @@ TEST(Test, garbage_collect_test) {
ASSERT_TRUE(oldest_valuelog_id<1000);
db->CompactRange(nullptr,nullptr);//create garbage
db->TEST_GarbageCollect();
db->manual_GarbageCollect();
for(int i=0;i<50000;i++){
std::string key=std::to_string(i);
std::string value;
@ -592,7 +681,15 @@ TEST(Test, recovery_test){
}
ASSERT_TRUE(oldest_valuelog_id<1000);
db->CompactRange(nullptr,nullptr);//create garbage
db->TEST_GarbageCollect();
db->manual_GarbageCollect();
for(int i=0;i<5000;i++){
std::string key=GenKeyByNum(i,5000);
std::string value;
for(int j=0;j<5000;j++){
value+=key;
}
db->Put(writeOptions,key,value);
}
db->CompactRange(nullptr,nullptr);//update version
std::vector<std::string> new_filenames;

test/test2.cpp (+0, -81)

@ -1,81 +0,0 @@
#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"
#include "util/coding.h"
#include <iostream>
#include <chrono>
using namespace std::chrono;
using namespace leveldb;
using Field=std::pair<Slice,Slice>;
using FieldArray=std::vector<std::pair<Slice, Slice>>;
int data_number=100000;
Status OpenDB(std::string dbName, DB **db) {
Options options;
options.max_file_size=16*1024;
options.write_buffer_size=32*1024;
options.create_if_missing = true;
return DB::Open(options, dbName, db);
}
TEST(Test, Garbage_Collect_TEST) {
DB *db;
WriteOptions writeOptions;
ReadOptions readOptions;
if(OpenDB("testdb_for_XOY_large", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
std::vector<std::string> values;
for(int i=0;i<data_number;i++){
std::string key=std::to_string(i);
std::string value;
for(int j=0;j<1000;j++){
value+=std::to_string(i);
}
values.push_back(value);
db->Put(writeOptions,key,value);
}
// for(int i=0;i<data_number;i++){
// std::string key=std::to_string(i);
// std::string value;
// for(int j=0;j<1000;j++){
// value+=std::to_string(i);
// }
// values.push_back(value);
// db->Put(writeOptions,key,value);
// }
// Measure GC time
auto start_time = high_resolution_clock::now();
db->TEST_GarbageCollect();
auto end_time = high_resolution_clock::now();
auto duration = duration_cast<milliseconds>(end_time - start_time);
std::cout << "GC finished. Time taken: " << duration.count() << " ms" << std::endl;
for(int i=0;i<data_number;i++){
// std::cout<<i<<std::endl;
std::string key=std::to_string(i);
std::string value;
Status s=db->Get(readOptions,key,&value);
assert(s.ok());
if(values[i]!=value){
std::cout<<value.size()<<std::endl;
assert(0);
}
ASSERT_TRUE(values[i]==value);
}
delete db;
}
int main(int argc, char** argv) {
// All tests currently run with the same read-only file limits.
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

设计文档.md (+2, -2)

@ -393,7 +393,7 @@ Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value);
Calls `MaybeScheduleGarbageCollect()` to schedule a background thread to run the garbage-collection task. It then waits for all scheduled garbage-collection tasks to finish, implemented by looping on the `background_garbage_collect_scheduled_` flag and waiting on the `background_gc_finished_signal_` signal while that flag is true (a sketch of this wait loop follows the signature block below).
```cpp
void DBImpl::TEST_GarbageCollect()
void DBImpl::manual_GarbageCollect()
```
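A sketch of the wait loop described above; the function and member names come from this document, while the locking detail is an assumption rather than the actual body of the method.

```cpp
void DBImpl::manual_GarbageCollect() {
  MutexLock l(&mutex_);                            // assumed: guard the scheduling state
  MaybeScheduleGarbageCollect();                   // queue a background GC pass
  while (background_garbage_collect_scheduled_) {  // wait for every scheduled pass
    background_gc_finished_signal_.Wait();
  }
}
```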
@ -559,7 +559,7 @@ std::vector values;
db->Put(writeOptions,key,value);
}
std::cout<<"start gc"<<std::endl;
db->TEST_GarbageCollect();
db->manual_GarbageCollect();
std::cout<<"finish gc"<<std::endl;
for(int i=0;i<5000;i++){
