diff --git a/db/db_impl.cc b/db/db_impl.cc index ba5ef9e..99b98d6 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -820,7 +820,7 @@ void DBImpl::BackgroundCompaction() { CleanupCompaction(compact); c->ReleaseInputs(); RemoveObsoleteFiles(); - MaybeScheduleGarbageCollect(); + //MaybeScheduleGarbageCollect(); } delete c; @@ -1354,17 +1354,17 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { while (true) { Writer* ready = writers_.front(); writers_.pop_front(); - //if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.SignalAll(); - //} + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); + } if (ready == last_writer) break; } // Notify new head of write queue if (!writers_.empty()) { - writers_.front()->cv.SignalAll(); + writers_.front()->cv.Signal(); } return status; @@ -1857,14 +1857,7 @@ void DBImpl::GarbageCollect() { valuelog_finding_key=key; spj_mutex_.Unlock(); //wait for current writer queue to do all their thing - mutex_.Lock(); - if(writers_.size()>0){ - auto last_writer=writers_.back(); - while(!last_writer->done){ - last_writer->cv.Wait(); - } - } - mutex_.Unlock(); + Write(leveldb::WriteOptions(), nullptr); auto option=leveldb::ReadOptions(); option.find_value_log_for_gc = true; diff --git a/test/test.cpp b/test/test.cpp index 7b1e506..b0d17c3 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -37,7 +37,6 @@ TEST(Test, CheckGetFields) { db->Put(WriteOptions(), key1, value1); - // 璇诲彇骞跺弽搴忓垪鍖? std::string value_ret; FieldArray res1; @@ -101,23 +100,23 @@ TEST(Test, LARGE_DATA_COMPACT_TEST) { for(int i=0;i<500000;i++){ std::string key=std::to_string(i); std::string value; - for(int j=0;j<1000;j++){ + for(int j=0;j<5000;j++){ value+=std::to_string(i); } values.push_back(value); db->Put(writeOptions,key,value); } - // for(int i=0;i<500000;i++){ - // std::string key=std::to_string(i); - // std::string value; - // Status s=db->Get(readOptions,key,&value); - // assert(s.ok()); - // if(values[i]!=value){ - // std::cout<Get(readOptions,key,&value); + assert(s.ok()); + if(values[i]!=value){ + std::cout<> WriteValueLog(std::vector value); @@ -349,7 +369,7 @@ std::vector> WriteValueLog(std::vector [!NOTE] > -> 在第三版设计中,valuelog中会存储key,所以有部分改动。 +> 在第三版设计中,valuelog中会存储key,所以还需要传入key @@ -357,8 +377,6 @@ std::vector> WriteValueLog(std::vector [!NOTE] > -> 在第三版设计中,valuelog中会存储key,所以有部分改动。 - - +> 在第三版设计中,valuelog中会存储key,所以还会多一个返回参数Key ##### **4.2.3 测试GC** @@ -384,7 +400,7 @@ void DBImpl::TEST_GarbageCollect() ##### **4.2.4 调用线程进行GC** -启动一个新的后台线程执行`BGWorkGC`方法。这里使用了`gc_mutex_.Lock()`来确保线程安全。 +启动一个新的后台线程执行`BGWorkGC`方法。使用互斥锁gc_mutex_确保同时最多只有一个GC后台线程会进行。 ```cpp void DBImpl::MaybeScheduleGarbageCollect() @@ -404,7 +420,9 @@ void DBImpl::BGWorkGC(void* db) ##### **4.2.6 后台GC函数** -负责执行后台垃圾回收任务。它确保在进行垃圾回收时,只有一个线程能够访问共享资源,并且在完成任务后通知等待的线程。 +负责执行后台垃圾回收任务(即调用GarbageCollect)。确保在完成任务后通知等待的线程。 + +(当前等待的函数仅两个:Test与DBImpl的析构函数) ```cpp void DBImpl::BackgroundGarbageCollect() @@ -422,6 +440,28 @@ void DBImpl::GarbageCollect() +##### 4.2.8 改动TableMeta(用于valuelog per table) + +对TableMeta新增一个uint64_t的属性valuelog_id,表示该SSTable所对应的valuelog id。如果该SSTable内所有Value长度均小于ValueLog要求长度,则该属性值为0。 + +这个设计对versionEdit以及compact的各个函数均有影响。 + + + +##### 4.2.9 class ValueLogInserter : public WriteBatch::Handler{} + +用于将一个WriteBatch里的所有键值对插入到ValueLog里,并生成将Value指向ValueLog的新WriteBatch。 + + + +##### 4.2.10 Status WriteBatchInternal::ConverToValueLog(WriteBatch* b,DB* db_) + +使用ValueLogInserter将b中的键值对更新至ValueLog,并将更新后的WriteBatch放回b。 + +该函数用于Write中的队首线程build batch后且写WAL日志/写Memtable前。 + + + ------ @@ -432,7 +472,7 @@ void DBImpl::GarbageCollect() #### 依据我们的设计,每周的工作内容完成后,都将对当前完成的功能进行正确性检验。以下以第一周我们完成的功能为例: -#### 第一周 +#### 第一周(已完成) **字段数组的存储与读取:** @@ -446,80 +486,95 @@ void DBImpl::GarbageCollect() 并未额外设计,通过上两个功能的正确运行能够证明Key Value分离的初步实现大体是正确的。 -```c++ -#include "gtest/gtest.h" -#include "leveldb/env.h" -#include "leveldb/db.h" -using namespace leveldb; - -constexpr int value_size = 2048; -constexpr int data_size = 128 << 20; - -Status OpenDB(std::string dbName, DB **db) { - Options options; - options.create_if_missing = true; - return DB::Open(options, dbName, db); -} - -TEST(TestTTL, OurTTL) { - DB *db; - WriteOptions writeOptions; - ReadOptions readOptions; - if(OpenDB("testdb_for_XOY", &db).ok() == false) { - std::cerr << "open db failed" << std::endl; - abort(); - } - std::string key = "k_1"; - - std::string key1 = "k_2"; - - FieldArray fields = { - {"name", "Customer#000000001"}, - {"address", "IVhzIApeRb"}, - {"phone", "25-989-741-2988"} - }; - - FieldArray fields1 = { - {"name", "Customer#000000001"}, - {"address", "abc"}, - {"phone", "def"} - }; - - db->Put_with_fields(WriteOptions(), key, fields); - - db->Put_with_fields(WriteOptions(), key1, fields1); - - // 读取并反序列化 - FieldArray value_ret; - db->Get_with_fields(ReadOptions(), key, &value_ret);; - for(auto pr:value_ret){ - std::cout< v; - db->Get_keys_by_field(ReadOptions(),fields[0],&v); - for(auto s:v)std::cout< keys; + std::vector target_keys; + for(int i=0;i<10000;i++){ + std::string key=std::to_string(rand()%10000)+"_"+std::to_string(i);//random for generate nonincreasing keys + FieldArray fields={ + {"name", key}, + {"address", std::to_string(rand()%7)}, + {"phone", std::to_string(rand()%114514)} + }; + if(rand()%5==0){ + fields[0].second="special_key"; + target_keys.push_back(key); + } + keys.push_back(key); + db->Put(WriteOptions(),key,SerializeValue(fields)); + } + std::sort(target_keys.begin(),target_keys.end()); + std::vector key_res; + Get_keys_by_field(db,ReadOptions(),{"name", "special_key"},&key_res); + ASSERT_TRUE(CompareKey(key_res, target_keys)); +``` -**对KV分离实现更细粒度的测试,以及对KV分离GC操作实现测试** +2.向表内插入大量value较大的键值对后检验正确性(该测试被leveldb原benchmark完美取代。使用leveldb原benchmark获得了正确性保障) -1.向表内插入一些value较小的键值对以及value较大的键值对,随后通过检查ValueLog内部数据(也可以是ValueLog文件长度)来判断是否对长短数据各自进行了处理。 +对于for(int j=0;j<5000;j++)这一段,我们也尝试过将5000改成rand()%1000,同样通过了测试。 -2.向表内插入大量value较大的键值对后,查询ValueLog文件总数,删除其中绝大多数键值对,然后再查一次ValueLog文件总数,期望文件总数变少。 +``` +std::vector values; + for(int i=0;i<500000;i++){ + std::string key=std::to_string(i); + std::string value; + for(int j=0;j<5000;j++){ + value+=std::to_string(i); + } + values.push_back(value); + db->Put(writeOptions,key,value); + } + for(int i=0;i<500000;i++){ + std::string key=std::to_string(i); + std::string value; + Status s=db->Get(readOptions,key,&value); + assert(s.ok()); + if(values[i]!=value){ + std::cout< values; + for(int i=0;i<5000;i++){ + std::string key=std::to_string(i); + std::string value; + for(int j=0;j<1000;j++){ + value+=std::to_string(i); + } + values.push_back(value); + db->Put(writeOptions,key,value); + } + std::cout<<"start gc"<TEST_GarbageCollect(); + std::cout<<"finish gc"<Get(readOptions,key,&value); + assert(s.ok()); + if(values[i]!=value){ + std::cout<