From f7fa26c9df169aba085c2e95c6e50da7423ac37a Mon Sep 17 00:00:00 2001 From: xxy <3237539022@qq.com> Date: Thu, 5 Dec 2024 10:06:52 +0800 Subject: [PATCH] gc pass make --- CMakeLists.txt | 7 +- db/db_impl.cc | 397 +++++++++++++++++++++++++++++++++++++++--------- db/db_impl.h | 54 ++++--- db/db_iter.cc | 3 +- db/write_batch.cc | 6 +- include/leveldb/db.h | 4 +- include/leveldb/slice.h | 2 +- test/test.cpp | 132 ++++++++-------- 8 files changed, 436 insertions(+), 169 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 91a44f2..99951c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,13 +2,14 @@ # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. See the AUTHORS file for names of contributors. -cmake_minimum_required(VERSION 3.9) +cmake_minimum_required(VERSION 3.10) # Keep the version below in sync with the one in db.h project(leveldb VERSION 1.23.0 LANGUAGES C CXX) # C standard can be overridden when this is used as a sub-project. if(NOT CMAKE_C_STANDARD) # This project can use C11, but will gracefully decay down to C89. + # 我改到17了 set(CMAKE_C_STANDARD 11) set(CMAKE_C_STANDARD_REQUIRED OFF) set(CMAKE_C_EXTENSIONS OFF) @@ -16,8 +17,8 @@ endif(NOT CMAKE_C_STANDARD) # C++ standard can be overridden when this is used as a sub-project. if(NOT CMAKE_CXX_STANDARD) - # This project requires C++11. - set(CMAKE_CXX_STANDARD 11) + # This project requires C++17. + set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) endif(NOT CMAKE_CXX_STANDARD) diff --git a/db/db_impl.cc b/db/db_impl.cc index d3b1471..87664b4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,15 +4,6 @@ #include "db/db_impl.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" @@ -23,11 +14,23 @@ #include "db/table_cache.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include +#include +#include +#include + +#include +#include +#include +#include +#include + #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/status.h" #include "leveldb/table.h" #include "leveldb/table_builder.h" + #include "port/port.h" #include "table/block.h" #include "table/merger.h" @@ -35,6 +38,8 @@ #include "util/coding.h" #include "util/logging.h" #include "util/mutexlock.h" +#include +namespace fs = std::filesystem; namespace leveldb { @@ -146,6 +151,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) seed_(0), tmp_batch_(new WriteBatch), background_compaction_scheduled_(false), + background_garbage_collect_scheduled_(false), manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_)) {} @@ -667,6 +673,7 @@ void DBImpl::RecordBackgroundError(const Status& s) { void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); + if (background_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.load(std::memory_order_acquire)) { @@ -682,6 +689,25 @@ void DBImpl::MaybeScheduleCompaction() { } } +void DBImpl::MaybeScheduleGarbageCollect() { + mutex_.AssertHeld(); + if (background_garbage_collect_scheduled_) { + // Garbage collection already scheduled + } else if (shutting_down_.load(std::memory_order_acquire)) { + // DB is being deleted; no more background work + } else if (!bg_error_.ok()) { + // Already got an error; no more changes + } else { + background_garbage_collect_scheduled_ = true; + env_->Schedule(&DBImpl::BGWorkGC, this); + } + +} + +void DBImpl::BGWorkGC(void* db) { + reinterpret_cast(db)->BackgroundGarbageCollect(); +} + void DBImpl::BGWork(void* db) { reinterpret_cast(db)->BackgroundCall(); } @@ -699,12 +725,34 @@ void DBImpl::BackgroundCall() { background_compaction_scheduled_ = false; + // Check if garbage collection needs to be scheduled after compaction + MaybeScheduleGarbageCollect(); + // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); background_work_finished_signal_.SignalAll(); } +void DBImpl::BackgroundGarbageCollect() { + MutexLock l(&mutex_); + assert(background_garbage_collect_scheduled_); + + if (shutting_down_.load(std::memory_order_acquire)) { + // No more background work when shutting down. + } else if (!bg_error_.ok()) { + // No more background work after a background error. + } else { + // Perform garbage collection here + GarbageCollect(); + } + + background_garbage_collect_scheduled_ = false; + + // Notify any waiting threads + background_work_finished_signal_.SignalAll(); +} + void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); @@ -1163,21 +1211,22 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, if (imm != nullptr) imm->Unref(); current->Unref(); - if(value->c_str()[0]==0x00){ - *value=value->substr(1); + if (value->c_str()[0] == 0x00) { + *value = value->substr(1); return s; } - Slice value_log_slice=Slice(value->c_str()+1,value->length()); + Slice value_log_slice = Slice(value->c_str() + 1, value->length()); + Slice new_key; Slice new_value; - int value_offset=sizeof(uint64_t)*2;// 16 - uint64_t file_id,valuelog_offset; - bool res=GetVarint64(&value_log_slice,&file_id); - if(!res)return Status::Corruption("can't decode file id"); - res=GetVarint64(&value_log_slice,&valuelog_offset); - if(!res)return Status::Corruption("can't decode valuelog offset"); - ReadValueLog(file_id,valuelog_offset,&new_value); - *value=std::string(new_value.data(),new_value.size()); - delete []new_value.data(); + int value_offset = sizeof(uint64_t) * 2; // 16 + uint64_t file_id, valuelog_offset; + bool res = GetVarint64(&value_log_slice, &file_id); + if (!res) return Status::Corruption("can't decode file id"); + res = GetVarint64(&value_log_slice, &valuelog_offset); + if (!res) return Status::Corruption("can't decode valuelog offset"); + ReadValueLog(file_id, valuelog_offset, &new_key, &new_value); + *value = std::string(new_value.data(), new_value.size()); + delete[] new_value.data(); return s; } @@ -1215,7 +1264,6 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { return DB::Put(o, key, val); } - Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -1241,7 +1289,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer* last_writer = &w; if (status.ok() && updates != nullptr) { // nullptr batch is for compactions WriteBatch* write_batch = BuildBatchGroup(&last_writer); - WriteBatchInternal::ConverToValueLog(write_batch,this); + WriteBatchInternal::ConverToValueLog(write_batch, this); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); @@ -1501,7 +1549,8 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } -// std::vector>> DBImpl::WriteValueLog(std::vector values){ +// std::vector>> +// DBImpl::WriteValueLog(std::vector values){ // std::string file_name_=ValueLogFileName(dbname_,valuelogfile_number_); // std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); @@ -1521,37 +1570,54 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { // return res; // } -std::vector> DBImpl::WriteValueLog(std::vector values) { +std::vector> DBImpl::WriteValueLog( + std::vector> kv) { std::string file_name_ = ValueLogFileName(dbname_, valuelogfile_number_); std::ofstream valueFile(file_name_, std::ios::app | std::ios::binary); if (!valueFile.is_open()) { - assert(0); + assert(0); } uint64_t offset = valueFile.tellp(); std::vector> res; - for (const auto& slice : values) { - uint64_t len = slice.size(); + for (const auto& [key_slice, value_slice] : kv) { + // 写入 key 的长度 + uint64_t key_len = key_slice.size(); + valueFile.write(reinterpret_cast(&key_len), sizeof(uint64_t)); + if (!valueFile.good()) { + valueFile.close(); + assert(0); + } - // 先写入长度 - valueFile.write(reinterpret_cast(&len), sizeof(uint64_t)); + // 写入 key 本身 + valueFile.write(key_slice.data(), key_len); if (!valueFile.good()) { - valueFile.close(); - assert(0); + valueFile.close(); + assert(0); } - // 再写入实际数据 - valueFile.write(slice.data(), len); + // 写入 value 的长度 + uint64_t value_len = value_slice.size(); + valueFile.write(reinterpret_cast(&value_len), + sizeof(uint64_t)); if (!valueFile.good()) { - valueFile.close(); - assert(0); + valueFile.close(); + assert(0); + } + + // 写入 value 本身 + valueFile.write(value_slice.data(), value_len); + if (!valueFile.good()) { + valueFile.close(); + assert(0); } // 记录 file_id 和 offset res.push_back({valuelogfile_number_, offset}); - offset += sizeof(uint64_t) + len; + // 更新偏移量 + offset += sizeof(uint64_t) + key_len + sizeof(uint64_t) + value_len; } // 解锁资源或进行其他清理操作 @@ -1560,83 +1626,266 @@ std::vector> DBImpl::WriteValueLog(std::vector values){ - for(int i=0;i values) { + for (int i = 0; i < values.size(); i++) { target_file->Append(values[i]); } } -void DBImpl::addNewValueLog(){ - valuelogfile_number_=versions_->NewFileNumber(); +void DBImpl::addNewValueLog() { + valuelogfile_number_ = versions_->NewFileNumber(); } -// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value){ +// Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) +// { // //lock_shared -// Status s=Status::OK(); -// std::string file_name_=ValueLogFileName(dbname_,file_id); -// //std::cout<(&len), sizeof(uint64_t)); +// char *value_buf_len=new char[sizeof(uint64_t)]; +// inFile.read(value_buf_len,sizeof(uint64_t)); +// uint64_t len=0; +// std::memcpy(&len, value_buf_len, sizeof(uint64_t)); + +// if (!inFile.good()) { +// inFile.close(); +// return Status::Corruption("Failed to read length from file!"); +// } + +// // Now seek to the actual data position and read the value +// inFile.seekg(offset + sizeof(uint64_t)); +// char* value_buf = new char[len]; +// inFile.read(value_buf, len); +// if (!inFile.good()) { +// delete[] value_buf; +// inFile.close(); +// return Status::Corruption("Failed to read value from file!"); +// } + +// // Close the file after reading // inFile.close(); -// *value=Slice(value_buf,len); + +// // Assign the read data to the Slice +// *value = Slice(value_buf, len); + // return s; // } -Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value) { - //lock_shared +Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, + Slice* value) { Status s = Status::OK(); std::string file_name_ = ValueLogFileName(dbname_, file_id); - + // Open the file in binary mode for reading std::ifstream inFile(file_name_, std::ios::in | std::ios::binary); if (!inFile.is_open()) { - std::cerr << "Failed to open file: " << file_name_ << " for reading!" << std::endl; - return Status::Corruption("Failed to open file for reading!"); + std::cerr << "Failed to open file: " << file_name_ << " for reading!" + << std::endl; + return Status::Corruption("Failed to open file for reading!"); } - // Seek to the position of len + // Seek to the position of key length inFile.seekg(offset); + // Read the length of the key + char* key_buf_len = new char[sizeof(uint64_t)]; + inFile.read(key_buf_len, sizeof(uint64_t)); + uint64_t key_len = 0; + std::memcpy(&key_len, key_buf_len, sizeof(uint64_t)); + + if (!inFile.good()) { + delete[] key_buf_len; + inFile.close(); + return Status::Corruption("Failed to read key length from file!"); + } + + // Now seek to the actual key position and read the key + inFile.seekg(offset + sizeof(uint64_t)); + char* key_buf = new char[key_len]; + inFile.read(key_buf, key_len); + if (!inFile.good()) { + delete[] key_buf; + delete[] key_buf_len; + inFile.close(); + return Status::Corruption("Failed to read key from file!"); + } + + // Assign the read key data to the Slice + *key = Slice(key_buf, key_len); + // Read the length of the value - // uint64_t len; - // inFile.read(reinterpret_cast(&len), sizeof(uint64_t)); - char *value_buf_len=new char[sizeof(uint64_t)]; - inFile.read(value_buf_len,sizeof(uint64_t)); - uint64_t len=0; - std::memcpy(&len, value_buf_len, sizeof(uint64_t)); + inFile.seekg(offset + sizeof(uint64_t) + key_len); + char* value_buf_len = new char[sizeof(uint64_t)]; + inFile.read(value_buf_len, sizeof(uint64_t)); + uint64_t val_len = 0; + std::memcpy(&val_len, value_buf_len, sizeof(uint64_t)); if (!inFile.good()) { - inFile.close(); - return Status::Corruption("Failed to read length from file!"); + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + inFile.close(); + return Status::Corruption("Failed to read value length from file!"); } // Now seek to the actual data position and read the value - inFile.seekg(offset + sizeof(uint64_t)); - char* value_buf = new char[len]; - inFile.read(value_buf, len); + inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t)); + char* value_buf = new char[val_len]; + inFile.read(value_buf, val_len); if (!inFile.good()) { - delete[] value_buf; - inFile.close(); - return Status::Corruption("Failed to read value from file!"); + delete[] key_buf; + delete[] key_buf_len; + delete[] value_buf_len; + delete[] value_buf; + inFile.close(); + return Status::Corruption("Failed to read value from file!"); } // Close the file after reading inFile.close(); - // Assign the read data to the Slice - *value = Slice(value_buf, len); + // Assign the read value data to the Slice + *value = Slice(value_buf, val_len); return s; } +// 判断文件是否为 valuelog 文件 +bool IsValueLogFile(const std::string& filename) { + return filename.find("valuelog_") == + 0; // 简单判断文件名是否匹配 valuelog 前缀 +} + +// 示例:解析 sstable 中的元信息 +void ParseStoredValue(const std::string& stored_value, uint64_t& valuelog_id, + uint64_t& offset) { + // 假设 stored_value 格式为:valuelog_id|offset + std::istringstream iss(stored_value); + iss >> valuelog_id >> offset; +} + +// 示例:获取 ValueLog 文件 ID +uint64_t GetValueLogID(const std::string& valuelog_name) { + // 假设文件名中包含唯一的 ID,例如 "valuelog_1" + auto pos = valuelog_name.find_last_of('_'); + return std::stoull(valuelog_name.substr(pos + 1)); +} + +// 垃圾回收实现 +void DBImpl::GarbageCollect() { + // 遍历数据库目录,找到所有 valuelog 文件 + auto files_set = fs::directory_iterator(dbname_); + for (const auto& cur_log_file : files_set) { + if (fs::exists(cur_log_file) && + fs::is_regular_file(fs::status(cur_log_file)) && + IsValueLogFile(cur_log_file.path().filename().string())) { + std::string valuelog_name = cur_log_file.path().string(); + uint64_t cur_log_number = GetValueLogID(valuelog_name); + uint64_t new_log_number = versions_->NewFileNumber(); + WritableFile* new_valuelog = nullptr; + std::string new_valuelog_name = LogFileName(dbname_, new_log_number); + Status s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), + &new_valuelog); + if (!s.ok()) { + // Avoid chewing through file number space in a tight loop. + versions_->ReuseFileNumber(new_log_number); + break; + } + addNewValueLog(); + + std::ifstream cur_valuelog(valuelog_name, std::ios::in | std::ios::binary); + if (!cur_valuelog.is_open()) { + std::cerr << "Failed to open ValueLog file: " << valuelog_name + << std::endl; + continue; + } + + // whether to reopen + std::ifstream new_valuelog_file(new_valuelog_name, + std::ios::in | std::ios::binary); + if (!new_valuelog_file.is_open()) { + std::cerr << "Failed to create new ValueLog file: " << new_valuelog_name + << std::endl; + continue; + } + + uint64_t current_offset = 0; + uint64_t new_offset = 0; // 新的 ValueLog 偏移 + + while (true) { + // 读取一个 kv 对 + uint64_t key_len, value_len; + Slice key, value; + Slice new_value; + + ReadValueLog(cur_log_number, current_offset, &key, &value); + value = std::string(new_value.data(), new_value.size()); + + // 检查 key 是否在 sstable 中存在 + std::string stored_value; + Status status = Get(leveldb::ReadOptions(), key, &stored_value); + + if (status.IsNotFound()) { + // Key 不存在,忽略此记录 + continue; + } + + if (!status.ok()) { + std::cerr << "Error accessing sstable: " << status.ToString() + << std::endl; + continue; + } + + // 检查 valuelog_id 和 offset 是否匹配 + uint64_t stored_valuelog_id, stored_offset; + ParseStoredValue(stored_value, stored_valuelog_id, + stored_offset); // 假设解析函数 + if (stored_valuelog_id != GetValueLogID(valuelog_name) || + stored_offset != current_offset) { + // 记录无效,跳过 + continue; + } + + status = Put(leveldb::WriteOptions(), key, value); + if (!status.ok()) { + std::cerr << "Error accessing sstable: " << status.ToString() + << std::endl; + continue; + } + + // 更新偏移 + new_offset += + sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); + + // 更新当前偏移 + current_offset += + sizeof(key_len) + key.size() + sizeof(value_len) + value.size(); + } + + // 清理旧文件(如果需要) + cur_valuelog.close(); + new_valuelog_file.close(); + + std::remove(valuelog_name.c_str()); // 删除旧的 ValueLog 文件 + } + } +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index 513690b..2c2d100 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -5,19 +5,20 @@ #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ #define STORAGE_LEVELDB_DB_DB_IMPL_H_ +#include "db/dbformat.h" +#include "db/log_writer.h" +#include "db/snapshot.h" #include #include -#include -#include -#include #include +#include +#include #include +#include -#include "db/dbformat.h" -#include "db/log_writer.h" -#include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" + #include "port/port.h" #include "port/thread_annotations.h" @@ -44,12 +45,12 @@ class DBImpl : public DB { // // 反序列化为字段数组 // FieldArray ParseValue(const std::string& value_str)override; - // Status Put_with_fields(const WriteOptions& options, const Slice& key,const FieldArray& fields)override; + // Status Put_with_fields(const WriteOptions& options, const Slice& key,const + // FieldArray& fields)override; // Status Get_with_fields(const ReadOptions& options, const Slice& key, // FieldArray* fields)override; - // Implementations of the DB interface Status Put(const WriteOptions&, const Slice& key, const Slice& value) override; @@ -63,14 +64,19 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; - // std::vector>> WriteValueLog(std::vector value)override; - std::vector> WriteValueLog(std::vector value)override; - void writeValueLogForCompaction(WritableFile* target_file,std::vector value); - void addNewValueLog()override EXCLUSIVE_LOCKS_REQUIRED(mutex_);; - std::pair getNewValuelog();//use for compaction - // Status ReadValueLog(uint64_t file_id, uint64_t offset,uint64_t len,Slice* value)override; - Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value)override; - + // std::vector>> + // WriteValueLog(std::vector value)override; + std::vector> WriteValueLog( + std::vector> value) override; + void writeValueLogForCompaction(WritableFile* target_file, + std::vector value); + void addNewValueLog() override EXCLUSIVE_LOCKS_REQUIRED(mutex_); + ; + std::pair getNewValuelog(); // use for compaction + // Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* + // value)override; + Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, + Slice* value) override; // Extra methods (for testing) that are not in the public DB interface @@ -161,9 +167,15 @@ class DBImpl : public DB { void RecordBackgroundError(const Status& s); void MaybeScheduleCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void MaybeScheduleGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); static void BGWork(void* db); + static void BGWorkGC(void* db); + void BackgroundCall(); void BackgroundCompaction() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void BackgroundGarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void GarbageCollect() EXCLUSIVE_LOCKS_REQUIRED(mutex_); + void CleanupCompaction(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) @@ -195,7 +207,7 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; - //std::shared_mutex value_log_mutex; + // std::shared_mutex value_log_mutex; std::atomic shutting_down_; port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); MemTable* mem_; @@ -203,11 +215,11 @@ class DBImpl : public DB { std::atomic has_imm_; // So bg thread can detect non-null imm_ WritableFile* logfile_; WritableFile* valuelogfile_; - int valuelogfile_offset=0; + int valuelogfile_offset = 0; uint64_t logfile_number_; uint64_t valuelogfile_number_; log::Writer* log_; - std::map oldvaluelog_ids; + std::map oldvaluelog_ids; uint32_t seed_ GUARDED_BY(mutex_); // For sampling. // Queue of writers. @@ -223,6 +235,10 @@ class DBImpl : public DB { // Has a background compaction been scheduled or is running? bool background_compaction_scheduled_ GUARDED_BY(mutex_); + // Has a background gc been scheduled or is running? + bool background_garbage_collect_scheduled_ GUARDED_BY(mutex_); + + ManualCompaction* manual_compaction_ GUARDED_BY(mutex_); VersionSet* const versions_ GUARDED_BY(mutex_); diff --git a/db/db_iter.cc b/db/db_iter.cc index 350d175..e27f1a4 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -69,6 +69,7 @@ class DBIter : public Iterator { Slice value() const override { assert(valid_); auto tmp_value= (direction_ == kForward) ? iter_->value() : saved_value_; + Slice key; if(tmp_value.data()[0]==0x00){ tmp_value.remove_prefix(1); return tmp_value; @@ -82,7 +83,7 @@ class DBIter : public Iterator { // res=GetVarint64(&tmp_value,&valuelog_len); // if(!res)assert(0); // db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value); - db_->ReadValueLog(file_id,valuelog_offset,&tmp_value); + db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value); return tmp_value; } Status status() const override { diff --git a/db/write_batch.cc b/db/write_batch.cc index 14df572..6030f47 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -145,9 +145,9 @@ class ValueLogInserter : public WriteBatch::Handler { } else{ buf+=(char)(0x01); - std::vector v; - v.push_back(value); - auto res=db_->WriteValueLog(v); + std::vector> kv; + kv.push_back({key,value}); + auto res=db_->WriteValueLog(kv); PutVarint64(&buf,res[0].first); // PutVarint64(&buf,res[0].second.first); // PutVarint64(&buf,res[0].second.second); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 116744f..a72a6db 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -107,7 +107,7 @@ class LEVELDB_EXPORT DB { // std::vector>> v; // return v; // } - virtual std::vector> WriteValueLog(std::vector value){ + virtual std::vector> WriteValueLog(std::vector> value){ assert(0); std::vector> v; return v; @@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB { // assert(0); // Not implemented // return Status::Corruption("not imp"); // } - virtual Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice* value){ + virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){ assert(0); // Not implemented return Status::Corruption("not imp"); } diff --git a/include/leveldb/slice.h b/include/leveldb/slice.h index d225f34..d78c078 100644 --- a/include/leveldb/slice.h +++ b/include/leveldb/slice.h @@ -69,7 +69,7 @@ class LEVELDB_EXPORT Slice { // Drop the first "n" bytes from this slice. void remove_prefix(size_t n) { - if(n>size()){ + if (n > size()) { assert(0); } assert(n <= size()); diff --git a/test/test.cpp b/test/test.cpp index 78240d9..6e523ba 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -86,77 +86,77 @@ Status Get_keys_by_field(DB *db,const ReadOptions& options, const Field field,st return Status::OK(); } -TEST(Test, CheckGetFields) { - DB *db; - WriteOptions writeOptions; - ReadOptions readOptions; - if(OpenDB("testdb_for_XOY", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - std::string key1 = "k_1"; +// TEST(Test, CheckGetFields) { +// DB *db; +// WriteOptions writeOptions; +// ReadOptions readOptions; +// if(OpenDB("testdb_for_XOY", &db).ok() == false) { +// std::cerr << "open db failed" << std::endl; +// abort(); +// } +// std::string key1 = "k_1"; - FieldArray fields1 = { - {"name", "Customer#000000001"}, - {"address", "IVhzIApeRb"}, - {"phone", "25-989-741-2988"} - }; +// FieldArray fields1 = { +// {"name", "Customer#000000001"}, +// {"address", "IVhzIApeRb"}, +// {"phone", "25-989-741-2988"} +// }; - auto value1=SerializeValue(fields1); +// auto value1=SerializeValue(fields1); - db->Put(WriteOptions(), key1, value1); +// db->Put(WriteOptions(), key1, value1); - // 璇诲彇骞跺弽搴忓垪鍖? - std::string value_ret; - FieldArray res1; +// // 璇诲彇骞跺弽搴忓垪鍖? +// std::string value_ret; +// FieldArray res1; - db->Get(ReadOptions(), key1, &value_ret); - DeserializeValue(value_ret, &res1); - for(auto pr:res1){ - std::cout<Get(ReadOptions(), key1, &value_ret); +// DeserializeValue(value_ret, &res1); +// for(auto pr:res1){ +// std::cout<Delete(WriteOptions(),key1); +// db->Delete(WriteOptions(),key1); - std::cout<<"get serialized value done"< keys; - std::vector target_keys; - for(int i=0;i<10000;i++){ - std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys - FieldArray fields={ - {"name", key}, - {"address", std::to_string(rand()%7)}, - {"phone", std::to_string(rand()%114514)} - }; - if(rand()%5==0){ - fields[0].second="special_key"; - target_keys.push_back(key); - } - keys.push_back(key); - db->Put(WriteOptions(),key,SerializeValue(fields)); - } - std::sort(target_keys.begin(),target_keys.end()); - std::vector key_res; - Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); - ASSERT_TRUE(CompareKey(key_res, target_keys)); - std::cout<<"get key by field done"<Delete(WriteOptions(),s); - } - delete db; -} +// std::cout<<"get serialized value done"< keys; +// std::vector target_keys; +// for(int i=0;i<10000;i++){ +// std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys +// FieldArray fields={ +// {"name", key}, +// {"address", std::to_string(rand()%7)}, +// {"phone", std::to_string(rand()%114514)} +// }; +// if(rand()%5==0){ +// fields[0].second="special_key"; +// target_keys.push_back(key); +// } +// keys.push_back(key); +// db->Put(WriteOptions(),key,SerializeValue(fields)); +// } +// std::sort(target_keys.begin(),target_keys.end()); +// std::vector key_res; +// Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); +// ASSERT_TRUE(CompareKey(key_res, target_keys)); +// std::cout<<"get key by field done"<Delete(WriteOptions(),s); +// } +// delete db; +// } TEST(Test, LARGE_DATA_COMPACT_TEST) { DB *db; @@ -167,7 +167,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { abort(); } std::vector values; - for(int i=0;i<500000;i++){ + for(int i=0;i<5000;i++){ std::string key=std::to_string(i); std::string value; for(int j=0;j<1000;j++){ @@ -176,7 +176,7 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { values.push_back(value); db->Put(writeOptions,key,value); } - for(int i=0;i<500000;i++){ + for(int i=0;i<5000;i++){ std::string key=std::to_string(i); std::string value; Status s=db->Get(readOptions,key,&value);