diff --git a/CMakeLists.txt b/CMakeLists.txt index 568040d..226c517 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -525,15 +525,7 @@ if(LEVELDB_INSTALL) DESTINATION "${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}" ) endif(LEVELDB_INSTALL) -add_executable(db_test1 +add_executable(valuelogTest "${PROJECT_SOURCE_DIR}/test/test.cpp" ) -target_link_libraries(db_test1 PRIVATE leveldb gtest) -add_executable(db_test2 -"${PROJECT_SOURCE_DIR}/test/test2.cpp" - ) -target_link_libraries(db_test2 PRIVATE leveldb gtest) -add_executable(db_test_bench -"${PROJECT_SOURCE_DIR}/test/benchmark_4leveldb.cpp" - ) -target_link_libraries(db_test_bench PRIVATE leveldb gtest) \ No newline at end of file +target_link_libraries(valuelogTest PRIVATE leveldb gtest) \ No newline at end of file diff --git a/db/db_impl.cc b/db/db_impl.cc index b61fd3d..3b7cd89 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -6,17 +6,16 @@ #include "db/builder.h" #include "db/db_iter.h" -#include "db/true_iter.h" -#include "db/unordered_iter.h" #include "db/dbformat.h" #include "db/filename.h" #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" #include "db/table_cache.h" +#include "db/true_iter.h" +#include "db/unordered_iter.h" #include "db/version_set.h" #include "db/write_batch_internal.h" -#include "util/crc32c.h" #include #include #include @@ -39,6 +38,7 @@ #include "table/merger.h" #include "table/two_level_iterator.h" #include "util/coding.h" +#include "util/crc32c.h" #include "util/logging.h" #include "util/mutexlock.h" @@ -162,19 +162,18 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) &internal_comparator_)), use_valuelog_length(raw_options.use_valuelog_length), value_log_size_(raw_options.value_log_size), - valuelog_crc_(raw_options.valuelog_crc){ - } - + valuelog_crc_(raw_options.valuelog_crc) {} DBImpl::~DBImpl() { // Wait for background work to finish. gc_mutex_.Lock(); - while(background_garbage_collect_scheduled_){ - background_gc_finished_signal_.Wait(); + while (background_garbage_collect_scheduled_) { + background_gc_finished_signal_.Wait(); // wait for gc thread finish } - background_garbage_collect_scheduled_=true; + background_garbage_collect_scheduled_ = true; + // avoid gc thread to be trigger again gc_mutex_.Unlock(); - if(gc_thread&&gc_thread->joinable())gc_thread->join(); + if (gc_thread && gc_thread->joinable()) gc_thread->join(); // join gc thread mutex_.Lock(); shutting_down_.store(true, std::memory_order_release); while (background_compaction_scheduled_) { @@ -182,7 +181,7 @@ DBImpl::~DBImpl() { } mutex_.Unlock(); gc_mutex_.Lock(); - background_garbage_collect_scheduled_=false; + background_garbage_collect_scheduled_ = false; gc_mutex_.Unlock(); if (db_lock_ != nullptr) { env_->UnlockFile(db_lock_); @@ -288,6 +287,7 @@ void DBImpl::RemoveObsoleteFiles() { // be recorded in pending_outputs_, which is inserted into "live" keep = (live.find(number) != live.end()); break; + // valuelog file use garbagecollect to delete, not here case kValueLogFile: case kCurrentFile: case kDBLockFile: @@ -684,9 +684,9 @@ Status DBImpl::TEST_CompactMemTable() { return s; } -void DBImpl::TEST_GarbageCollect() { +// this function will trigger and wait for a manual gc +void DBImpl::manual_GarbageCollect() { MaybeScheduleGarbageCollect(); - // Finish current background gc in the case where while (background_garbage_collect_scheduled_) { background_gc_finished_signal_.Wait(); } @@ -731,7 +731,7 @@ void DBImpl::MaybeScheduleGarbageCollect() { gc_mutex_.Lock(); background_garbage_collect_scheduled_ = true; gc_mutex_.Unlock(); - gc_thread =new std::thread(&DBImpl::BGWorkGC, this); + gc_thread = new std::thread(&DBImpl::BGWorkGC, this); } } @@ -840,7 +840,7 @@ void DBImpl::BackgroundCompaction() { CleanupCompaction(compact); c->ReleaseInputs(); RemoveObsoleteFiles(); - if(options_.valuelog_gc)MaybeScheduleGarbageCollect(); + if (options_.valuelog_gc) MaybeScheduleGarbageCollect(); } delete c; @@ -1049,18 +1049,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Hidden by an newer entry for same user key drop = true; // (A) // Parse the value based on its first character - if(ikey.type != kTypeDeletion){ + if (ikey.type != kTypeDeletion) { Slice value = input->value(); char type = value[0]; if (type == 0x00) { // Value is less than 100 bytes, use it directly } else { - // Value is >= 100 bytes, read from external file + // Value is >= 100 bytes, update valuelog meta info for gc uint64_t file_id, valuelog_offset; value.remove_prefix(1); - - status=ParseFakeValueForValuelog(value,file_id,valuelog_offset); - if(!status.ok())break; + + status = ParseFakeValueForValuelog(value, file_id, valuelog_offset); + if (!status.ok()) break; valuelog_usage[file_id]--; } @@ -1101,7 +1101,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } - compact->current_output()->largest.DecodeFrom(key); + compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value()); // Close output file if it is big enough @@ -1263,52 +1263,58 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, } Slice value_log_slice = Slice(value->c_str(), value->length()); mutex_.Unlock(); - s=parseTrueValue(&value_log_slice,value,options.verify_checksums_for_valuelog); + s = parseTrueValue(&value_log_slice, value, + options.verify_checksums_for_valuelog); mutex_.Lock(); return s; } -Iterator *DBImpl::NewOriginalIterator(const ReadOptions& options) { +Iterator* DBImpl::NewOriginalIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; - int iter_num=24; + int iter_num = 24; mutex_.Lock(); Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); - auto db_iter=NewDBIterator(this, user_comparator(), iter, - (options.snapshot != nullptr - ? static_cast(options.snapshot) - ->sequence_number() - : latest_snapshot), - seed); - + auto db_iter = + NewDBIterator(this, user_comparator(), iter, + (options.snapshot != nullptr + ? static_cast(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); + mutex_.Unlock(); return db_iter; } -Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options,const Slice &lower_key,const Slice &upper_key) { - auto iter=NewOriginalIterator(options); - return NewUnorderedIter(this,iter,dbname_,options,lower_key,upper_key,user_comparator()); +Iterator* DBImpl::NewUnorderedIterator(const ReadOptions& options, + const Slice& lower_key, + const Slice& upper_key) { + auto iter = NewOriginalIterator(options); + return NewUnorderedIter(this, iter, dbname_, options, lower_key, upper_key, + user_comparator()); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { SequenceNumber latest_snapshot; uint32_t seed; - int iter_num=24; + int iter_num = 24; mutex_.Lock(); Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); - auto db_iter=NewDBIterator(this, user_comparator(), iter, - (options.snapshot != nullptr - ? static_cast(options.snapshot) - ->sequence_number() - : latest_snapshot), - seed); + auto db_iter = + NewDBIterator(this, user_comparator(), iter, + (options.snapshot != nullptr + ? static_cast(options.snapshot) + ->sequence_number() + : latest_snapshot), + seed); mutex_.Unlock(); - return NewTrueIterator(this,db_iter,options.verify_checksums_for_valuelog); + return NewTrueIterator(this, db_iter, options.verify_checksums_for_valuelog); } void DBImpl::RecordReadSample(Slice key) { @@ -1330,7 +1336,6 @@ void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // Convenience methods Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { - return DB::Put(o, key, val); } @@ -1345,8 +1350,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { w.done = false; MutexLock l(&mutex_); - if(!options.valuelog_write&&updates&&valuelog_finding_key.size()>0){ - WriteBatchInternal::checkValueLog(updates, this,&valuelog_finding_key,&lock_valuelog_key_mutex_cond_); + // condition: hold mutex_, haven't been in writer queue, gc thread is + // searching a key check if the updates have the same key gc thread is + // searching, if true then wait until condition is false + if (!options.valuelog_write && updates && valuelog_finding_key.size() > 0) { + WriteBatchInternal::checkValueLog(updates, this, &valuelog_finding_key, + &lock_valuelog_key_mutex_cond_); } writers_.push_back(&w); while (!w.done && &w != writers_.front()) { @@ -1357,15 +1366,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { } // May temporarily unlock and wait. - Status status = MakeRoomForWrite(updates == nullptr&&!options.valuelog_write); + Status status = + MakeRoomForWrite(updates == nullptr && !options.valuelog_write); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); - WriteBatchInternal::ConverToValueLog(write_batch, this,use_valuelog_length);//need lock! to protect valuelog_number + WriteBatchInternal::ConverToValueLog( + write_batch, this, + use_valuelog_length); // write data to valuelog WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); - // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging @@ -1374,7 +1385,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { { mutex_.Unlock(); - status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { @@ -1627,91 +1637,96 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { std::vector> DBImpl::WriteValueLog( std::vector> kv) { - - if(valuelogfile_number_==0){ + if (valuelogfile_number_ == 0) { + // only reached when the first time this db use a valuelog since open. + // use this to avoid memenv problem addNewValueLog(); } std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); - std::fstream valueFile(file_name_, std::ios::in | std::ios::out | std::ios::binary); + std::fstream valueFile(file_name_, + std::ios::in | std::ios::out | std::ios::binary); assert(valueFile.is_open()); - valueFile.seekg(0, std::ios::end); // 移动到文件末尾 + valueFile.seekg(0, std::ios::end); uint64_t init_offset = valueFile.tellg(); - // 如果超出fixed_size - if(init_offset>=value_log_size_){ + // if larger then fixed_size + if (init_offset >= value_log_size_) { addNewValueLog(); valueFile.close(); file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); - valueFile =std::fstream(file_name_, std::ios::in | std::ios::out | std::ios::binary); - valueFile.seekg(0, std::ios::end); // 移动到文件末尾 + valueFile = std::fstream(file_name_, + std::ios::in | std::ios::out | std::ios::binary); + valueFile.seekg(0, std::ios::end); init_offset = 0; - valuelog_usage[valuelogfile_number_]=0; - valuelog_origin[valuelogfile_number_]=0; } std::vector> res; - int total_size=0; - total_size+=sizeof(uint64_t)*2*kv.size(); - for(const auto &pr:kv){ - total_size+=pr.first.size()+pr.second.size(); + int total_size = 0; + total_size += sizeof(uint64_t) * 2 * kv.size(); + for (const auto& pr : kv) { + total_size += pr.first.size() + pr.second.size(); } - if(valuelog_crc_){ - total_size+=sizeof(uint32_t)*kv.size(); + if (valuelog_crc_) { + total_size += sizeof(uint32_t) * kv.size(); } - char* buf= new char[total_size];//write all data with one fstream.write using this buf + char* buf = new char[total_size]; // write all data with one fstream.write + // using this buf - uint64_t offset=0; - for (const auto& pr:kv) { + uint64_t offset = 0; + for (const auto& pr : kv) { + res.push_back({valuelogfile_number_, init_offset + offset}); - // 记录 file_id 和 offset - res.push_back({valuelogfile_number_, init_offset+offset}); + auto key = pr.first, value = pr.second; - auto key=pr.first,value=pr.second; + int head_offset = offset; // use for crc - int head_offset=offset;//use for crc - - // 写入 value 的长度 + // format: + // valuelen (uint64_t) + // value (valuelen) + // keylen (uint64_t) + // key (keylen) + // crcvalue (uint32_t) (use if options include valuelog crc) uint64_t value_len = value.size(); - memcpy(buf+offset,&value_len,sizeof(uint64_t)); - offset+=sizeof(uint64_t); + memcpy(buf + offset, &value_len, sizeof(uint64_t)); + offset += sizeof(uint64_t); // 写入 value 本身 - memcpy(buf+offset,value.data(),value_len); - offset+=value_len; - + memcpy(buf + offset, value.data(), value_len); + offset += value_len; + // 写入 key 的长度 uint64_t key_len = key.size(); - memcpy(buf+offset,&key_len,sizeof(uint64_t)); - offset+=sizeof(uint64_t); + memcpy(buf + offset, &key_len, sizeof(uint64_t)); + offset += sizeof(uint64_t); // 写入 key 本身 - memcpy(buf+offset,key.data(),key_len); - offset+=key_len; - - - if(valuelog_crc_){ - uint32_t crc = crc32c::Value(buf+head_offset+sizeof(uint64_t),value_len); - crc=crc32c::Extend(crc,buf+head_offset+value_len+2*sizeof(uint64_t),key_len); - memcpy(buf+offset,&crc,sizeof(uint32_t)); - offset+=sizeof(uint32_t); + memcpy(buf + offset, key.data(), key_len); + offset += key_len; + + if (valuelog_crc_) { + uint32_t crc = + crc32c::Value(buf + head_offset + sizeof(uint64_t), value_len); + crc = crc32c::Extend( + crc, buf + head_offset + value_len + 2 * sizeof(uint64_t), key_len); + memcpy(buf + offset, &crc, sizeof(uint32_t)); + offset += sizeof(uint32_t); } } - valueFile.write(buf,total_size); + valueFile.write(buf, total_size); assert(valueFile.good()); - // 解锁资源或进行其他清理操作 - //valueFile.flush(); // 确保所有缓冲区的数据都被写入文件 - valueFile.close(); - valuelog_usage[valuelogfile_number_]+=res.size(); - valuelog_origin[valuelogfile_number_]+=res.size(); + valueFile.close(); // close includ flush + if (valuelog_origin.count(valuelogfile_number_)) { + valuelog_usage[valuelogfile_number_] += res.size(); + valuelog_origin[valuelogfile_number_] += res.size(); + } delete buf; return res; } - void DBImpl::addNewValueLog() { mutex_.AssertHeld(); valuelogfile_number_ = versions_->NewFileNumber(); @@ -1720,146 +1735,142 @@ void DBImpl::addNewValueLog() { std::fstream valueFile(file_name_, std::ios::app | std::ios::binary); assert(valueFile.is_open()); valueFile.close(); + valuelog_usage[valuelogfile_number_] = 0; // init meta info + valuelog_origin[valuelogfile_number_] = 0; } -static void valuelog_cache_deleter(const leveldb::Slice &key, void *value){ +static void valuelog_cache_deleter(const leveldb::Slice& key, void* value) { delete (RandomAccessFile*)value; } -Status DBImpl::parseTrueValue(Slice* value,std::string *true_value,bool checkcrc){ - if(value->empty()){ - *true_value=""; - } - else if(value->data()[0]==0x00){ +Status DBImpl::parseTrueValue(Slice* value, std::string* true_value, + bool checkcrc) { + if (value->empty()) { // maybe a delete op. return empty string is fine + *true_value = ""; + } else if (value->data()[0] == 0x00) { // a value which don't use valuelog. value->remove_prefix(1); - std::string new_str=std::string(value->data(),value->size()); - *true_value=std::move(new_str); - } - else{ - uint64_t value_id,value_offset; + std::string new_str = std::string(value->data(), value->size()); + *true_value = std::move(new_str); + } else { // a value use valuelog. + uint64_t value_id, value_offset; value->remove_prefix(1); - Status s=ParseFakeValueForValuelog(*value,value_id,value_offset); - if(!s.ok())return s; - return ReadValueLog(value_id,value_offset,true_value,checkcrc); + Status s = ParseFakeValueForValuelog(*value, value_id, value_offset); + if (!s.ok()) return s; + return ReadValueLog(value_id, value_offset, true_value, checkcrc); } return Status::OK(); } Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, - std::string* value,bool check_crc) { - + std::string* value, bool check_crc) { std::string file_name_ = ValueLogFileName(dbname_, file_id); mutex_.Lock(); - if(file_id==valuelogfile_number_||mem_value_log_number_==0){ + if (file_id == valuelogfile_number_ || mem_value_log_number_ == 0) { mutex_.Unlock(); - + std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); + if (!inFile.is_open()) { + return Status::Corruption("valuelog not exist"); + } - inFile.seekg(0, std::ios::end); // get total length + inFile.seekg(0, std::ios::end); // get total length uint64_t totalSize = inFile.tellg(); - if(totalSizeLookup(file_name_)){ + Cache::Handle* handler = nullptr; + if (handler = valuelog_cache->Lookup(file_name_)) { // - } - else{ + } else { RandomAccessFile* new_file; - s=env_->NewRandomAccessFile(file_name_,&new_file); + s = env_->NewRandomAccessFile(file_name_, &new_file); assert(s.ok()); - handler=valuelog_cache->Insert(file_name_,new_file,1,&valuelog_cache_deleter); + handler = valuelog_cache->Insert(file_name_, new_file, 1, + &valuelog_cache_deleter); } - - leveldb::RandomAccessFile* valuelog_file=(RandomAccessFile*)(valuelog_cache->Value(handler)); + + leveldb::RandomAccessFile* valuelog_file = + (RandomAccessFile*)(valuelog_cache->Value(handler)); char buf[sizeof(uint64_t)]; Slice res; - s=valuelog_file->Read(offset,sizeof(uint64_t),&res,buf); + s = valuelog_file->Read(offset, sizeof(uint64_t), &res, buf); assert(s.ok()); - uint64_t value_len=*(uint64_t*)(res.data()); + uint64_t value_len = *(uint64_t*)(res.data()); char value_buf[value_len]; - s=valuelog_file->Read(offset+sizeof(uint64_t),value_len,&res,value_buf); + s = valuelog_file->Read(offset + sizeof(uint64_t), value_len, &res, + value_buf); assert(s.ok()); valuelog_cache->Release(handler); - *value=std::string(res.data(),res.size()); + *value = std::string(res.data(), res.size()); return s; } -// 垃圾回收实现 +// valuelog garbage collection void DBImpl::GarbageCollect() { - // 遍历数据库目录,找到所有 valuelog 文件 + // scan all file to find all valuelog file std::vector filenames; Status s = env_->GetChildren(dbname_, &filenames); Log(options_.info_log, "start gc "); assert(s.ok()); std::vector gc_valuelog_id_vector; - mutex_.Lock();// for visit valuelog_origin/usage - for(const auto&pr:valuelog_origin){ - if( - ((float)valuelog_usage[pr.first])/pr.secondAddOldValueLogFile(valuelog_name); mutex_.Unlock(); - } Log(options_.info_log, "end gc "); } @@ -2090,7 +2109,7 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { impl->RemoveObsoleteFiles(); impl->MaybeScheduleCompaction(); impl->InitializeExistingLogs(); - //impl->addNewValueLog(); + // impl->addNewValueLog(); } impl->mutex_.Unlock(); if (s.ok()) { @@ -2137,115 +2156,123 @@ Status DestroyDB(const std::string& dbname, const Options& options) { // recover for valuelog void DBImpl::InitializeExistingLogs() { + // step1: find all valuelog files std::vector filenames; Status s = env_->GetChildren(dbname_, &filenames); Log(options_.info_log, "start recover for valuelog"); assert(s.ok()); std::set all_valuelog_ids; std::set live_valuelog_ids; - uint64_t latest_valuelog_id=0; - uint64_t latest_valuelog_offset=0; + uint64_t latest_valuelog_id = 0; + uint64_t latest_valuelog_offset = 0; for (const auto& filename : filenames) { uint64_t valuelog_number; FileType type; - ParseFileName(filename,&valuelog_number,&type); - if(type==FileType::kValueLogFile)all_valuelog_ids.emplace(valuelog_number); + ParseFileName(filename, &valuelog_number, &type); + if (type == FileType::kValueLogFile) + all_valuelog_ids.emplace(valuelog_number); } + // this useful iterator do 3 thing: + // 1: get all living valuelog + // 2: recover valuelog_usage + // 3: find the latest valuelog and the lastest living record in it mutex_.Unlock(); - auto db_iter=NewOriginalIterator(ReadOptions()); - for(db_iter->SeekToFirst();db_iter->Valid();db_iter->Next()){ - auto value=db_iter->value(); - if(value.size()&&value[0]==0x01){ + auto db_iter = NewOriginalIterator(ReadOptions()); + for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { + auto value = db_iter->value(); + if (value.size() && value[0] == 0x01) { value.remove_prefix(1); - uint64_t valuelog_id,valuelog_offset; + uint64_t valuelog_id, valuelog_offset; - Status status=ParseFakeValueForValuelog(value,valuelog_id,valuelog_offset); - if(!status.ok()){//handle error:skip this value, correct? + Status status = + ParseFakeValueForValuelog(value, valuelog_id, valuelog_offset); + if (!status.ok()) { // handle error:skip this value, correct? continue; } valuelog_usage[valuelog_id]++; - if(valuelog_id>=latest_valuelog_id){ - - if(valuelog_id==latest_valuelog_id){ - latest_valuelog_offset=std::max(latest_valuelog_offset,valuelog_offset); - } - else{ - latest_valuelog_id=valuelog_id; - latest_valuelog_offset=valuelog_offset; + if (valuelog_id >= latest_valuelog_id) { + if (valuelog_id == latest_valuelog_id) { + latest_valuelog_offset = + std::max(latest_valuelog_offset, valuelog_offset); + } else { + latest_valuelog_id = valuelog_id; + latest_valuelog_offset = valuelog_offset; } } } } delete db_iter; mutex_.Lock(); - for(const auto& pr:valuelog_usage){ + for (const auto& pr : valuelog_usage) { live_valuelog_ids.emplace(pr.first); } - - for(const auto &id:all_valuelog_ids){ - if(!live_valuelog_ids.count(id)){ - //useless valuelog, delete directly - auto valuelog_name=ValueLogFileName(dbname_,id); - s=env_->RemoveFile(valuelog_name); + // step2: delete all dead valuelog + // a useful step if auto-gc option is disabled + for (const auto& id : all_valuelog_ids) { + if (!live_valuelog_ids.count(id)) { + // useless valuelog, delete directly + auto valuelog_name = ValueLogFileName(dbname_, id); + s = env_->RemoveFile(valuelog_name); assert(s.ok()); } } - if(latest_valuelog_id>0){//delete data that was written in valuelog but not written in WAL - auto valuelog_name=ValueLogFileName(dbname_,latest_valuelog_id); + // step3: find the latest valuelog and the lastest living record in it. + // delete all data after that record (these data probably made by an + // unfinished write). delete these data so gc can delete this valuelog + if (latest_valuelog_id > 0) { + auto valuelog_name = ValueLogFileName(dbname_, latest_valuelog_id); std::ifstream inFile(valuelog_name, std::ios::in | std::ios::binary); - uint64_t value_len,key_len; + uint64_t value_len, key_len; inFile.seekg(latest_valuelog_offset); - inFile.read((char*)(&value_len),sizeof(uint64_t)); - latest_valuelog_offset+=value_len+sizeof(uint64_t); + inFile.read((char*)(&value_len), sizeof(uint64_t)); + latest_valuelog_offset += value_len + sizeof(uint64_t); inFile.seekg(latest_valuelog_offset); - inFile.read((char*)(&key_len),sizeof(uint64_t)); - latest_valuelog_offset+=key_len+sizeof(uint64_t); - if(options_.valuelog_crc)latest_valuelog_offset+=sizeof(uint32_t); - + inFile.read((char*)(&key_len), sizeof(uint64_t)); + latest_valuelog_offset += key_len + sizeof(uint64_t); + if (options_.valuelog_crc) latest_valuelog_offset += sizeof(uint32_t); - char* buf=new char[latest_valuelog_offset]; + char* buf = new char[latest_valuelog_offset]; inFile.seekg(0); - inFile.read(buf,latest_valuelog_offset); + inFile.read(buf, latest_valuelog_offset); inFile.close(); - - std::ofstream trunc_file(valuelog_name, std::ios::out | std::ios::binary | std::ios::trunc); - trunc_file.write(buf,latest_valuelog_offset); + std::ofstream trunc_file( + valuelog_name, std::ios::out | std::ios::binary | std::ios::trunc); + trunc_file.write(buf, latest_valuelog_offset); trunc_file.close(); delete buf; } - for(const auto&id:live_valuelog_ids){//update valuelog_origin - - auto valuelog_name=ValueLogFileName(dbname_,id); + // step 4: update valuelog_origin by scan every valuelog + for (const auto& id : live_valuelog_ids) { + auto valuelog_name = ValueLogFileName(dbname_, id); std::ifstream inFile(valuelog_name, std::ios::in | std::ios::binary); - int data_cnt=0; + int data_cnt = 0; - - uint64_t value_len,key_len; - int cur_offset=0; - while(1){ + uint64_t value_len, key_len; + int cur_offset = 0; + while (1) { inFile.seekg(cur_offset); - inFile.read((char*)(&value_len),sizeof(uint64_t)); + inFile.read((char*)(&value_len), sizeof(uint64_t)); if (inFile.eof()) { - break; // 正常退出条件:到达文件末尾 + break; } - - cur_offset+=value_len+sizeof(uint64_t); + + cur_offset += value_len + sizeof(uint64_t); inFile.seekg(cur_offset); - inFile.read((char*)(&key_len),sizeof(uint64_t)); - cur_offset+=key_len+sizeof(uint64_t); - if(options_.valuelog_crc)cur_offset+=sizeof(uint32_t); + inFile.read((char*)(&key_len), sizeof(uint64_t)); + cur_offset += key_len + sizeof(uint64_t); + if (options_.valuelog_crc) cur_offset += sizeof(uint32_t); data_cnt++; } - valuelog_origin[id]=data_cnt; + valuelog_origin[id] = data_cnt; } } diff --git a/db/db_impl.h b/db/db_impl.h index c198c6b..85af892 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -41,18 +41,6 @@ class DBImpl : public DB { ~DBImpl() override; - // // 序列化为字符串 - // std::string SerializeValue(const FieldArray& fields)override; - - // // 反序列化为字段数组 - // FieldArray ParseValue(const std::string& value_str)override; - - // Status Put_with_fields(const WriteOptions& options, const Slice& key,const - // FieldArray& fields)override; - - // Status Get_with_fields(const ReadOptions& options, const Slice& key, - // FieldArray* fields)override; - // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, const Slice& value) override; @@ -68,8 +56,6 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - // std::vector>> - // WriteValueLog(std::vector value)override; std::vector> WriteValueLog( std::vector> value) override; void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_); @@ -88,7 +74,7 @@ class DBImpl : public DB { // Force current memtable contents to be compacted. Status TEST_CompactMemTable(); - void TEST_GarbageCollect() override; + void manual_GarbageCollect() override; // Return an internal iterator over the current state of the database. @@ -236,9 +222,12 @@ class DBImpl : public DB { uint64_t valuelogfile_number_=0; log::Writer* log_; std::map oldvaluelog_ids; - int mem_value_log_number_;//if =0, don't use cache + int mem_value_log_number_;//if =0, don't use cache for valuelog Cache* valuelog_cache; + + // count of record live in a valuelog std::map valuelog_usage; + // count of record written in a valuelog std::map valuelog_origin; std::thread* gc_thread=nullptr; @@ -266,10 +255,12 @@ class DBImpl : public DB { VersionSet* const versions_ GUARDED_BY(mutex_); + //better to be larger then 500. int use_valuelog_length=5000; int value_log_size_; + // if on, the database will use crc check to protect valuelog from any abnormal byte bool valuelog_crc_; // Have we encountered a background error in paranoid mode? diff --git a/db/fields.cc b/db/fields.cc index 858e417..c8fb3d0 100644 --- a/db/fields.cc +++ b/db/fields.cc @@ -2,8 +2,6 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#ifndef STORAGE_LEVELDB_INCLUDE_FIELDS_H_ -#define STORAGE_LEVELDB_INCLUDE_FIELDS_H_ #include @@ -21,43 +19,48 @@ namespace leveldb { return res_; } - void DeserializeValue(const std::string& value_str,FieldArray* res){ + Status DeserializeValue(const std::string& value_str,FieldArray* res){ Slice slice=Slice(value_str.c_str()); uint64_t siz; bool tmpres=GetVarint64(&slice,&siz); - assert(tmpres); + if(!tmpres)return Status::Corruption("Deserialize fail"); res->clear(); for(int i=0;iemplace_back(value_name,value); + if(!tmpres)return Status::Corruption("Deserialize fail"); + res->emplace_back(std::string(value_name.data(),value_name.size()),std::string(value.data(),value.size())); } + return Status::OK(); } Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys){ auto it=db->NewUnorderedIterator(options,Slice(),Slice()); keys->clear(); - while(it->Valid()){ + while(it->Valid()&&it->status().ok()){ auto val=it->value(); FieldArray arr; auto str_val=std::string(val.data(),val.size()); - DeserializeValue(str_val,&arr); - for(auto pr:arr){ - if(pr.first==field.first&&pr.second==field.second){ - Slice key=it->key(); - keys->push_back(std::string(key.data(),key.size())); - break; - } - } + auto res=DeserializeValue(str_val,&arr); + if(res.ok()) + for(const auto &pr:arr){ + if(pr.first==field.first&&pr.second==field.second){ + Slice key=it->key(); + keys->push_back(std::string(key.data(),key.size())); + break; + } + } it->Next(); } + if(it->Valid()&&!it->status().ok()){ + auto res=it->status(); + delete it; + return res; + } delete it; return Status::OK(); } -} - -#endif // STORAGE_LEVELDB_INCLUDE_FIELDS_H_ \ No newline at end of file +} \ No newline at end of file diff --git a/db/true_iter.cc b/db/true_iter.cc index 9b6fb9a..a8c9d97 100644 --- a/db/true_iter.cc +++ b/db/true_iter.cc @@ -1,62 +1,52 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include -#include -#include -#include #include "db/true_iter.h" #include "db/db_impl.h" #include "db/dbformat.h" #include "db/filename.h" +#include +#include +#include +#include + #include "leveldb/env.h" #include "leveldb/iterator.h" + #include "port/port.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/random.h" -#include "port/port.h" namespace leveldb { namespace { - - -// Memtables and sstables that make the DB representation contain -// (userkey,seq,type) => uservalue entries. DBTrueIter -// combines multiple entries for the same userkey found in the DB -// representation into a single entry while accounting for sequence -// numbers, deletion markers, overwrites, etc. +// DBTrueIter simply wrap the DbIter and will parse the true value (maybe from +// valuelog) for user. +// if the crc check fail, DBTrueIter will skip the bad record +// and show it to user by status(). +// bad status will remain showed by status(), like other iterator. class DBTrueIter : public Iterator { public: - // Which direction is the iterator currently moving? - // (1) When moving forward, the internal iterator is positioned at - // the exact entry that yields this->key(), this->value() - // (2) When moving backwards, the internal iterator is positioned - // just before all entries whose user key == this->key(). - DBTrueIter(DBImpl* db, Iterator* iter,bool check_crc) - : - db_(db),iter_(iter),check_crc_(check_crc){} + DBTrueIter(DBImpl* db, Iterator* iter, bool check_crc) + : db_(db), iter_(iter), check_crc_(check_crc) {} DBTrueIter(const DBTrueIter&) = delete; DBTrueIter& operator=(const DBTrueIter&) = delete; - ~DBTrueIter() override { - delete iter_; - } + ~DBTrueIter() override { delete iter_; } bool Valid() const override { return iter_->Valid(); } - Slice key() const override { - return iter_->key(); - } + Slice key() const override { return iter_->key(); } Slice value() const override { - return Slice(buf_for_value.data(),buf_for_value.size()); + return Slice(buf_for_value.data(), buf_for_value.size()); } Status status() const override { - if(status_.ok()) + if (status_.ok()) return iter_->status(); - else return status_; + else + return status_; } void Next() override; @@ -66,60 +56,60 @@ class DBTrueIter : public Iterator { void SeekToLast() override; private: - Status GetAndParseTrueValue(Slice tmp_value){ - Status status=db_->parseTrueValue(&tmp_value,&buf_for_value,check_crc_); - if(!status.ok())status_=status; + Status GetAndParseTrueValue(Slice tmp_value) { + Status status = db_->parseTrueValue(&tmp_value, &buf_for_value, check_crc_); + if (!status.ok()) status_ = status; return status; } - DBImpl* db_; Iterator* const iter_; std::string buf_for_value; - Status status_=Status::OK(); + Status status_ = Status::OK(); bool check_crc_; }; void DBTrueIter::Next() { iter_->Next(); - if(iter_->Valid()){ - Status res=GetAndParseTrueValue(iter_->value()); - if(!res.ok())Next(); + while (iter_->Valid()) { + Status res = GetAndParseTrueValue(iter_->value()); + if (res.ok()) break; + iter_->Next(); } - } void DBTrueIter::Prev() { iter_->Prev(); - if(iter_->Valid()){ - Status res=GetAndParseTrueValue(iter_->value()); - if(!res.ok())Prev(); + while (iter_->Valid()) { + Status res = GetAndParseTrueValue(iter_->value()); + if (res.ok()) break; + iter_->Prev(); } } void DBTrueIter::Seek(const Slice& target) { iter_->Seek(target); - if(iter_->Valid()){ - Status res=GetAndParseTrueValue(iter_->value()); - if(!res.ok())Next();//lowerbound + if (iter_->Valid()) { + Status res = GetAndParseTrueValue(iter_->value()); + if (!res.ok()) Next(); // lowerbound } } void DBTrueIter::SeekToFirst() { iter_->SeekToFirst(); - if(iter_->Valid()){ - Status res=GetAndParseTrueValue(iter_->value()); - if(!res.ok())Next(); + if (iter_->Valid()) { + Status res = GetAndParseTrueValue(iter_->value()); + if (!res.ok()) Next(); } } void DBTrueIter::SeekToLast() { iter_->SeekToLast(); - if(iter_->Valid()){ - Status res=GetAndParseTrueValue(iter_->value()); - if(!res.ok())Prev(); + if (iter_->Valid()) { + Status res = GetAndParseTrueValue(iter_->value()); + if (!res.ok()) Prev(); } } } // anonymous namespace -Iterator* NewTrueIterator(DBImpl* db,Iterator* db_iter,bool check_crc) { - return new DBTrueIter(db,db_iter,check_crc); +Iterator* NewTrueIterator(DBImpl* db, Iterator* db_iter, bool check_crc) { + return new DBTrueIter(db, db_iter, check_crc); } } // namespace leveldb diff --git a/db/unordered_iter.cc b/db/unordered_iter.cc index d22926e..8e33413 100644 --- a/db/unordered_iter.cc +++ b/db/unordered_iter.cc @@ -26,19 +26,15 @@ namespace leveldb { namespace { - -// Memtables and sstables that make the DB representation contain -// (userkey,seq,type) => uservalue entries. UnorderedIter -// combines multiple entries for the same userkey found in the DB -// representation into a single entry while accounting for sequence -// numbers, deletion markers, overwrites, etc. +// DBTrueIter is similar to true_iter. +// The following are the unique points of unordered_iter +// 1.the data return by it will be unordered. +// 2.use its own memory space instead of valuelog cache. +// 3.In most cases, it can save more memory, be faster, and more stable. +// 4.Can not use Seek, SeekToFirst and SeekToLast, use lower_key and upper_key instead. +// 5.return all data in: [lower_key, upper_key) class UnorderedIter : public Iterator { public: - // Which direction is the iterator currently moving? - // (1) When moving forward, the internal iterator is positioned at - // the exact entry that yields this->key(), this->value() - // (2) When moving backwards, the internal iterator is positioned - // just before all entries whose user key == this->key(). enum IterPos {Left,Mid,Right}; UnorderedIter(DBImpl* db, Iterator* iter,std::string db_name,ReadOptions readOptions,const Slice &lower_key,const Slice &upper_key,const Comparator* user_comparator) diff --git a/db/version_set.cc b/db/version_set.cc index 57a3b56..51d89bf 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -83,8 +83,9 @@ Version::~Version() { } } + //update related valuelog's ref for(auto valuelog_name:old_valuelog_names){ - vset_->valuelogmap_mutex.Lock(); + vset_->valuelogmap_mutex.Lock();//lock to visit old_valuelog_map int res=vset_->old_valuelog_map[valuelog_name]--; if(res==1){ vset_->env_->RemoveFile(valuelog_name); diff --git a/db/write_batch.cc b/db/write_batch.cc index 4ce6bc9..efaa4b3 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -127,6 +127,9 @@ class MemTableInserter : public WriteBatch::Handler { sequence_++; } }; + +// will change the order in the writebatch! +// use batch_insert to improve performance class ValueLogInserter : public WriteBatch::Handler { public: WriteBatch writeBatch_; @@ -188,6 +191,8 @@ class ValueLogChecker : public WriteBatch::Handler { keys.push_back(key); } + //check if all data in the writebatch is different from the given key (the key gc is searching) + //if find a key was the same to target key, then all keys must be scaned again. void CheckValid(){ int len=keys.size(); if(!len)return; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 672ee2c..9b2d06b 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -7,19 +7,19 @@ #include #include +#include #include "leveldb/export.h" #include "leveldb/iterator.h" #include "leveldb/options.h" -#include namespace leveldb { // Update CMakeLists.txt if you change these static const int kMajorVersion = 1; static const int kMinorVersion = 23; -using Field=std::pair; -using FieldArray=std::vector>; +using Field = std::pair; +using FieldArray = std::vector>; struct Options; struct ReadOptions; @@ -90,42 +90,33 @@ class LEVELDB_EXPORT DB { virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0; - // virtual std::string SerializeValue(const FieldArray& fields); - - // // 反序列化为字段数组 - // virtual void DeserializeValue(const std::string& value_str,FieldArray* res); - - // virtual Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields); - - // virtual Status Get_with_fields(const ReadOptions& options, const Slice& key, - // FieldArray* fields); - - // virtual Status Get_keys_by_field(const ReadOptions& options, const Field field,std::vector *keys); - - // virtual std::vector>> WriteValueLog(std::vector value){ - // assert(0); - // std::vector>> v; - // return v; - // } - virtual std::vector> WriteValueLog(std::vector> value){ + // write a batch of k-v to a valuelog + // return the file_id and offset in valuelog as a pair for every k-v + virtual std::vector> WriteValueLog( + std::vector> value) { assert(0); - std::vector> v; + std::vector> v; return v; } - virtual void addNewValueLog(){assert(0);} + // add a new valuelog for database, should be only used while holding mutex_ + virtual void addNewValueLog() { assert(0); } - virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, std::string* value,bool check_crc){ - assert(0); // Not implemented + // read value from a valuelog + // should be only used while not holding mutex_ + virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, + std::string* value, bool check_crc) { + assert(0); return Status::Corruption("not imp"); } - virtual Status parseTrueValue(Slice* value,std::string* true_value,bool checkcrc){ + // parse the real value from a value provided by the lsm-tree + virtual Status parseTrueValue(Slice* value, std::string* true_value, + bool checkcrc) { assert(0); return Status::Corruption("not imp"); } - // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). @@ -134,7 +125,13 @@ class LEVELDB_EXPORT DB { // The returned iterator should be deleted before this db is deleted. virtual Iterator* NewIterator(const ReadOptions& options) = 0; - virtual Iterator* NewUnorderedIterator(const ReadOptions&,const Slice &lower_key,const Slice &upper_key){ + // Similar to NewIterator(). + // return all data in: [lower_key, upper_key) + // the data returned by NewUnorderedIterator() is unordered. + // Seek, SeekToLast, SeekToFirst is invalid for this. + virtual Iterator* NewUnorderedIterator(const ReadOptions&, + const Slice& lower_key, + const Slice& upper_key) { assert(0); return nullptr; }; @@ -189,10 +186,11 @@ class LEVELDB_EXPORT DB { // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; - virtual void TEST_GarbageCollect(){}; - - + // trigger a manual garbagecollection. + // it will only return when the garbagecollection is finished. + // might be a very time-consuming operation + virtual void manual_GarbageCollect() {}; }; // Destroy the contents of the specified database. diff --git a/include/leveldb/fields.h b/include/leveldb/fields.h index a9d2b2a..886d197 100644 --- a/include/leveldb/fields.h +++ b/include/leveldb/fields.h @@ -10,9 +10,11 @@ #include "leveldb/db.h" namespace leveldb { + //Serialize vector to a single string std::string SerializeValue(const FieldArray& fields); - void DeserializeValue(const std::string& value_str,FieldArray* res); + //Deserialize vector from a single string + Status DeserializeValue(const std::string& value_str,FieldArray* res); Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,std::vector *keys); } diff --git a/test/benchmark_4leveldb.cpp b/test/benchmark_4leveldb.cpp deleted file mode 100644 index 26a62e0..0000000 --- a/test/benchmark_4leveldb.cpp +++ /dev/null @@ -1,200 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include "leveldb/db.h" -#include "leveldb/write_batch.h" -#include "leveldb/iterator.h" -#include // For stat to get file size on Unix-like systems -#include // For directory reading on Unix-like systems - -#define THREAD_COUNT 16 // 线程数量 -#define PUT_THREAD_COUNT (THREAD_COUNT / 3) // Put线程数量 -#define DELETE_THREAD_COUNT (THREAD_COUNT / 3) // Delete线程数量 -#define ITERATE_THREAD_COUNT (THREAD_COUNT - PUT_THREAD_COUNT - DELETE_THREAD_COUNT) // Iterate线程数量 -#define VALUE_SIZE 1000 // Value的默认大小 -#define DATABASE_PATH "db_benchmark" // 数据库路径 - -std::mutex put_mutex; -std::mutex delete_mutex; -std::mutex iterate_mutex; - -std::pair put_time_count={0,0}; -std::pair delete_time_count={0,0}; -std::pair iterate_time_count={0,0}; - -// Helper function to generate a random string of a given length -std::string GenerateRandomString(size_t length) { - const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; - std::default_random_engine rng(std::random_device{}()); - std::uniform_int_distribution dist(0, sizeof(charset) - 2); - - std::string result; - result.reserve(length); - for (size_t i = 0; i < length; ++i) { - result += charset[dist(rng)]; - } - return result; -} - -void PutData(leveldb::DB* db, int thread_id, int num_entries, size_t value_size) { - leveldb::WriteOptions write_options; - write_options.sync = false; - - auto start_time = std::chrono::high_resolution_clock::now(); // 记录开始时间 - - for (int i = 0; i < num_entries; ++i) { - std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i); - std::string value = GenerateRandomString(value_size); - - leveldb::WriteBatch batch; - batch.Put(key, value); - db->Write(write_options, &batch); - } - - auto end_time = std::chrono::high_resolution_clock::now(); // 记录结束时间 - auto duration = std::chrono::duration_cast(end_time - start_time).count(); - put_mutex.lock(); - put_time_count.first+=duration; - put_time_count.second+=num_entries; - put_mutex.unlock(); -} - -void DeleteData(leveldb::DB* db, int thread_id, int num_entries) { - leveldb::WriteOptions write_options; - write_options.sync = false; - - auto start_time = std::chrono::high_resolution_clock::now(); // 记录开始时间 - - for (int i = 0; i < num_entries; ++i) { - std::string key = "key_" + std::to_string(thread_id) + "_" + std::to_string(i); - - leveldb::WriteBatch batch; - batch.Delete(key); - db->Write(write_options, &batch); - } - auto end_time = std::chrono::high_resolution_clock::now(); // 记录结束时间 - auto duration = std::chrono::duration_cast(end_time - start_time).count(); - delete_mutex.lock(); - delete_time_count.first+=duration; - delete_time_count.second+=num_entries; - delete_mutex.unlock(); -} - -void IterateData(leveldb::DB* db, leveldb::ReadOptions& read_options) { - std::unique_ptr it(db->NewIterator(read_options)); - - auto start_time = std::chrono::high_resolution_clock::now(); // 记录开始时间 - - for (it->SeekToFirst(); it->Valid(); it->Next()) { - // 这里可以选择是否打印键值对,或者仅遍历不做任何操作 - std::cout << "Key: " << it->key().ToString() << ", Value: " << it->value().ToString() << "\n"; - } - - if (!it->status().ok()) { - std::cerr << "Error during iteration: " << it->status().ToString() << "\n"; - } - - auto end_time = std::chrono::high_resolution_clock::now(); // 记录结束时间 - auto duration = std::chrono::duration_cast(end_time - start_time).count(); - iterate_mutex.lock(); - iterate_time_count.first+=duration; - iterate_time_count.second++; - iterate_mutex.unlock(); -} - -// Function to calculate the total size of all files in the database directory -uint64_t CalculateDatabaseSize(const std::string& db_path) { - uint64_t total_size = 0; - DIR* dir = opendir(db_path.c_str()); - if (dir == nullptr) { - std::cerr << "Failed to open directory: " << db_path << "\n"; - return total_size; - } - - struct dirent* entry; - while ((entry = readdir(dir)) != nullptr) { - if (entry->d_type == DT_REG) { // Only consider regular files - std::string full_path = db_path + "/" + entry->d_name; - struct stat file_stat; - if (stat(full_path.c_str(), &file_stat) == 0) { - total_size += file_stat.st_size; - } - } - } - - closedir(dir); - return total_size; -} - -void CleanupDatabase(const std::string& db_path) { - /// Delete database directory - #ifdef _WIN32 - std::string command = "rd /s /q \"" + db_path + "\""; // Windows delete directory - #else - std::string command = "rm -rf \"" + db_path + "\""; // Linux/macOS delete directory - #endif - if (std::system(command.c_str()) == 0) { - std::cout << "Database directory has been successfully deleted" << std::endl; - } else { - std::cerr << "Warning: Failed to delete the database directory. Please check manually!" << std::endl; - } -} - -int main() { - leveldb::DB* db; - leveldb::Options options; - options.create_if_missing = true; - leveldb::Status status = leveldb::DB::Open(options, DATABASE_PATH, &db); - if (!status.ok()) { - std::cerr << "Unable to open/create database: " << status.ToString() << "\n"; - return 1; - } - - const int entries_per_thread = 1000000; // 每个线程执行的操作次数 - std::vector threads; - - // Create snapshot for iterate threads - leveldb::ReadOptions read_options; - read_options.snapshot = db->GetSnapshot(); - - // Start threads for Put operations - for (int i = 0; i < PUT_THREAD_COUNT; ++i) { - threads.emplace_back(PutData, db, i, entries_per_thread, VALUE_SIZE); - } - - // Start threads for Delete operations - for (int i = 0; i < DELETE_THREAD_COUNT; ++i) { - threads.emplace_back(DeleteData, db, i, entries_per_thread); - } - std::this_thread::sleep_for(std::chrono::seconds(10)); - // Start threads for Iterate operations - for (int i = 0; i < ITERATE_THREAD_COUNT; ++i) { - threads.emplace_back(IterateData, db, std::ref(read_options)); - } - - // Wait for all threads to finish - for (auto& th : threads) { - if (th.joinable()) th.join(); - } - threads.clear(); - - // Release the snapshot after all threads have finished - db->ReleaseSnapshot(read_options.snapshot); - - // Close the database - delete db; - std::cout<<"Put average time(per second):"<Put(WriteOptions(), key1, value1); - - std::string value_ret; - FieldArray res1; + std::vector values; + + for(int i=0;i<300;i++){ + auto key=GenKeyByNum(i,300); + std::string true_value; + FieldArray value; + if(i%5){ + value.push_back({key,std::to_string(rand()%10000)}); + value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)}); + value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)}); + value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)}); + value.push_back({std::to_string(rand()%10000),std::to_string(rand()%10000)}); + values.push_back(value); + true_value=SerializeValue(value); + { + FieldArray tmpvalue; + ASSERT_TRUE(DeserializeValue(true_value,&tmpvalue).ok()); + ASSERT_TRUE(tmpvalue==value); + } + } + else true_value=std::to_string(rand()%10000); + db->Put(writeOptions,key,true_value); + } - db->Get(ReadOptions(), key1, &value_ret); - DeserializeValue(value_ret, &res1); - ASSERT_TRUE(CompareFieldArray(fields1, res1)); + int cnt=0; + for(int i=0;i<300;i++){ + auto key=GenKeyByNum(i,300); + std::string true_value; + FieldArray value; + db->Get(readOptions,key,&true_value); - db->Delete(WriteOptions(),key1); + if(i%5){ + ASSERT_TRUE(DeserializeValue(true_value,&value).ok()); + ASSERT_TRUE(values[cnt++]==value); + } + else{ + ASSERT_FALSE(DeserializeValue(true_value,&value).ok()); + } + } delete db; - } TEST(Test, get_keys_by_field_test) { @@ -280,6 +298,7 @@ TEST(Test, get_keys_by_field_test) { std::vector target_keys; for(int i=0;i<10000;i++){ std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys + std::string value; FieldArray fields={ {"name", key}, {"address", std::to_string(rand()%7)}, @@ -288,9 +307,16 @@ TEST(Test, get_keys_by_field_test) { if(rand()%5==0){ fields[0].second="special_key"; target_keys.push_back(key); + value=SerializeValue(fields); + } + else if(rand()%123==0){ + value=std::to_string(rand()%10000); + } + else{ + value=SerializeValue(fields); } keys.push_back(key); - db->Put(WriteOptions(),key,SerializeValue(fields)); + db->Put(WriteOptions(),key,value); } std::sort(target_keys.begin(),target_keys.end()); std::vector key_res; @@ -449,6 +475,69 @@ TEST(Test, valuelog_corruption_test) { delete db; } +TEST(Test, valuelog_whole_file_corruption_test) { + DB *db; + WriteOptions writeOptions; + ReadOptions readOptions; + readOptions.verify_checksums_for_valuelog=true; + Options dbOptions; + dbOptions.use_valuelog_length=100; + dbOptions.valuelog_gc=false; + dbOptions.value_log_size=1<<26; + dbOptions.valuelog_crc=true; + //a record size:8+4+8+4*5000+(4)=20024 + //64*1024*1024/20024=3351.42 + if(OpenDB(&db,dbOptions).ok() == false) { + std::cerr << "open db failed" << std::endl; + abort(); + } + //test Put + std::vector values; + for(int i=0;i<5000;i++){ + std::string key=GenKeyByNum(i,5000); + std::string value; + for(int j=0;j<5000;j++){ + value+=key; + } + values.push_back(value); + db->Put(writeOptions,key,value); + } + for(int i=0;i<5000;i++){ + std::string key=GenKeyByNum(i,5000); + std::string value; + Status s=db->Get(readOptions,key,&value); + assert(s.ok()); + ASSERT_TRUE(values[i]==value); + } + + std::vector files; + auto env_=Env::Default(); + ASSERT_TRUE(env_->GetChildren(dbName, &files).ok()); + for(auto file:files){ + uint64_t number; + FileType fileType; + ParseFileName(file,&number,&fileType); + if(fileType==FileType::kValueLogFile){ + env_->RemoveFile(dbName+ "/" + file); + } + } + + //the second record is corrupt, + for(int i=0;i<5000;i++){ + std::string key=GenKeyByNum(i,5000); + std::string value; + ASSERT_FALSE(db->Get(readOptions,key,&value).ok()); + } + + auto iter=db->NewIterator(readOptions); + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + + delete iter; + delete db; +} + + TEST(Test, garbage_collect_test) { DB *db; @@ -493,7 +582,7 @@ TEST(Test, garbage_collect_test) { ASSERT_TRUE(oldest_valuelog_id<1000); db->CompactRange(nullptr,nullptr);//create garbage - db->TEST_GarbageCollect(); + db->manual_GarbageCollect(); for(int i=0;i<50000;i++){ std::string key=std::to_string(i); std::string value; @@ -592,7 +681,15 @@ TEST(Test, recovery_test){ } ASSERT_TRUE(oldest_valuelog_id<1000); db->CompactRange(nullptr,nullptr);//create garbage - db->TEST_GarbageCollect(); + db->manual_GarbageCollect(); + for(int i=0;i<5000;i++){ + std::string key=GenKeyByNum(i,5000); + std::string value; + for(int j=0;j<5000;j++){ + value+=key; + } + db->Put(writeOptions,key,value); + } db->CompactRange(nullptr,nullptr);//update version std::vector new_filenames; diff --git a/test/test2.cpp b/test/test2.cpp deleted file mode 100644 index 0d2dfda..0000000 --- a/test/test2.cpp +++ /dev/null @@ -1,81 +0,0 @@ -#include "gtest/gtest.h" -#include "leveldb/env.h" -#include "leveldb/db.h" -#include "util/coding.h" -#include -#include - -using namespace std::chrono; -using namespace leveldb; - -using Field=std::pair; -using FieldArray=std::vector>; - -int data_number=100000; - -Status OpenDB(std::string dbName, DB **db) { - Options options; - options.max_file_size=16*1024; - options.write_buffer_size=32*1024; - options.create_if_missing = true; - return DB::Open(options, dbName, db); -} - - -TEST(Test, Garbage_Collect_TEST) { - DB *db; - WriteOptions writeOptions; - ReadOptions readOptions; - if(OpenDB("testdb_for_XOY_large", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - std::vector values; - for(int i=0;iPut(writeOptions,key,value); - } - // for(int i=0;iPut(writeOptions,key,value); - // } - // Measure GC time - auto start_time = high_resolution_clock::now(); - db->TEST_GarbageCollect(); - auto end_time = high_resolution_clock::now(); - - auto duration = duration_cast(end_time - start_time); - std::cout << "GC finished. Time taken: " << duration.count() << " ms" << std::endl; - - - for(int i=0;iGet(readOptions,key,&value); - assert(s.ok()); - if(values[i]!=value){ - std::cout< values; db->Put(writeOptions,key,value); } std::cout<<"start gc"<TEST_GarbageCollect(); + db->manual_GarbageCollect(); std::cout<<"finish gc"<