diff --git a/CMakeLists.txt b/CMakeLists.txt index 705abce..a9b1c5e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -107,6 +107,7 @@ configure_file( include_directories( "${PROJECT_BINARY_DIR}/include" "." + "./third_party/googletest/googletest/include/" ) if(BUILD_SHARED_LIBS) diff --git a/db/builder.cc b/db/builder.cc index e6329e0..3d1ca0e 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,12 +28,32 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, return s; } + time_t nowTime; + time(&nowTime); + assert(nowTime > 0); + TableBuilder* builder = new TableBuilder(options, file); meta->smallest.DecodeFrom(iter->key()); Slice key; + ParsedInternalKey parsed; + meta->smallest_deadtime = UINT64_MAX; + meta->largest_deadtime = 0; for (; iter->Valid(); iter->Next()) { key = iter->key(); builder->Add(key, iter->value()); + //在构建sstable文件的时候,记录当前文件的生存期最大和最小值, + //这里要注意internalkey和metadata中对于没有生存期的表示的转换 + ParseInternalKey(key,&parsed); + if(parsed.deadTime == 0) parsed.deadTime = UINT64_MAX; + if(parsed.deadTime < nowTime) { + static int count = 0; + if(count % 1000 == 0) { + std::cout<<"count "<smallest_deadtime = std::min(meta->smallest_deadtime,parsed.deadTime); + meta->largest_deadtime = std::max(meta->largest_deadtime,parsed.deadTime); } if (!key.empty()) { meta->largest.DecodeFrom(key); diff --git a/db/db_impl.cc b/db/db_impl.cc index 7eed3f8..5a56c16 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -52,11 +52,12 @@ struct DBImpl::Writer { }; struct DBImpl::CompactionState { - // Files produced by compaction + // Files produced by compaction 这里的改动和filemetadata对应 struct Output { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + uint64_t smallest_deadtime,largest_deadtime; }; Output* current_output() { return &outputs[outputs.size() - 1]; } @@ -536,7 +537,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, - meta.largest); + meta.largest,meta.smallest_deadtime,meta.largest_deadtime); } CompactionStats stats; @@ -590,6 +591,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { } } } + // max_level_with_files = config::kNumLevels - 1; //TODO:强制合并所有level中的sst,但是这么做不是很优雅 TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); @@ -704,6 +706,31 @@ void DBImpl::BackgroundCall() { background_work_finished_signal_.SignalAll(); } +bool DBImpl::RemoveExpireTable() { + bool remove = false; + VersionEdit edit; + time_t nowTime; + time(&nowTime); + Version *base = versions_->current(); + base->Ref(); + for(int level = 0; level < config::kNumLevels; level ++) { + const std::vector &files = versions_->current()->Files(level); + for(auto meta:files) { + if(meta->largest_deadtime < nowTime) { + remove = true; + edit.RemoveFile(level,meta->number); + std::cout<<"remove file : "<number<<" from level : "<largest_deadtime<LogAndApply(&edit,&mutex_); + // RemoveObsoleteFiles(); + } + base->Unref(); + return remove; +} + void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); @@ -712,6 +739,10 @@ void DBImpl::BackgroundCompaction() { return; } + if(RemoveExpireTable()) { + return; + } + Compaction* c; bool is_manual = (manual_compaction_ != nullptr); InternalKey manual_end; @@ -740,7 +771,7 @@ void DBImpl::BackgroundCompaction() { FileMetaData* f = c->input(0, 0); c->edit()->RemoveFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, - f->largest); + f->largest,f->smallest_deadtime,f->largest_deadtime); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); @@ -814,6 +845,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.smallest_deadtime = UINT64_MAX; + out.largest_deadtime = 0; compact->outputs.push_back(out); mutex_.Unlock(); } @@ -889,7 +922,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, - out.smallest, out.largest); + out.smallest, out.largest,out.smallest_deadtime,out.largest_deadtime); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -948,11 +981,20 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Handle key/value, add to state, etc. bool drop = false; + time_t nowTime; + time(&nowTime); if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; + } else if(ikey.deadTime != 0 && ikey.deadTime < nowTime){ + static int count = 0; + if(count % 1000 == 0) { + std::cout<<"count "<Compare(ikey.user_key, Slice(current_user_key)) != @@ -978,7 +1020,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // Therefore this deletion marker is obsolete and can be dropped. drop = true; } - last_sequence_for_key = ikey.sequence; } #if 0 @@ -990,7 +1031,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->compaction->IsBaseLevelForKey(ikey.user_key), (int)last_sequence_for_key, (int)compact->smallest_snapshot); #endif - if (!drop) { // Open output file if necessary if (compact->builder == nullptr) { @@ -1003,6 +1043,17 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); + + ParsedInternalKey parsed; + ParseInternalKey(key,&parsed); + uint64_t &smallest_deadtime = compact->current_output()->smallest_deadtime; + uint64_t &largest_deadtime = compact->current_output()->largest_deadtime; + if(parsed.deadTime == 0) { + smallest_deadtime = UINT64_MAX; + } + smallest_deadtime = std::min(smallest_deadtime,parsed.deadTime); + largest_deadtime = std::max(largest_deadtime,parsed.deadTime); + compact->builder->Add(key, input->value()); // Close output file if it is big enough @@ -1206,6 +1257,11 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, return DB::Put(o, key, val, ttl); } +// Status DBImpl::Put(const WriteOptions& options, const Slice& key, +// const Slice& value, uint64_t ttl) { +// return DB::Put(options,key,value,ttl); +// } + Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } @@ -1500,6 +1556,14 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, return Write(opt, &batch); } +// //为了通过编译,忽略ttl +// Status DB::Put(const WriteOptions& options, const Slice& key, +// const Slice& value, uint64_t ttl) { +// WriteBatch batch; +// batch.Put(key, value); +// return Write(options, &batch); +// } + Status DB::Delete(const WriteOptions& opt, const Slice& key) { WriteBatch batch; batch.Delete(key); diff --git a/db/db_impl.h b/db/db_impl.h index ef81411..92ea869 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -145,6 +145,7 @@ class DBImpl : public DB { EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status DoCompactionWork(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); + bool RemoveExpireTable(); //if remove some table thne return true otherwise return false Status OpenCompactionOutputFile(CompactionState* compact); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); diff --git a/db/db_test.cc b/db/db_test.cc index 4f570a1..79478b0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2117,6 +2117,9 @@ class ModelDB : public DB { Status Put(const WriteOptions& o, const Slice& k, const Slice& v, uint64_t ttl = 0) override { return DB::Put(o, k, v, ttl); } + // Status Put(const WriteOptions& o, const Slice& k,const Slice& v,uint64_t ttl) { + // return DB::Put(o,k,v); + // } Status Delete(const WriteOptions& o, const Slice& key) override { return DB::Delete(o, key); } diff --git a/db/dbformat.cc b/db/dbformat.cc index 686d04f..2ef75e2 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -75,16 +75,18 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { r = -1; return r; } - + const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16); + const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16); //原本应该找到了,新加判断 - if((btag & 0b100) && (atag & 0b10)){ //一个是查询键,另一个有ttl - const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16); - const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16); - if(atime <= btime){//过期了继续找 - r = -1; - return r; - } - } + // if((btag & 0b100) && (atag & 0b10)){ //一个是查询键,另一个有ttl + // const uint64_t atime = DecodeFixed64(akey.data() + akey.size() - 16); + // const uint64_t btime = DecodeFixed64(bkey.data() + bkey.size() - 16); + // std::cout<<"atime:"<= 8); uint64_t num = DecodeFixed64(internal_key.data() + internal_key.size() - 8); uint8_t havettl = (num & 0b10) >> 1; @@ -195,7 +198,8 @@ inline bool ParseInternalKey(const Slice& internal_key, result->deadTime = 0; result->user_key = Slice(internal_key.data(), n - 8); } - return (c <= static_cast(kTypeValue)); + // return c <= 0b111; + return ((c & 0b1) <= static_cast(kTypeValue)); } // A helper class useful for DBImpl::Get() diff --git a/db/memtable.cc b/db/memtable.cc index e5c2d88..f27dca3 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -55,7 +55,23 @@ class MemTableIterator : public Iterator { ~MemTableIterator() override = default; bool Valid() const override { return iter_.Valid(); } - void Seek(const Slice& k) override { iter_.Seek(EncodeKey(&tmp_, k)); } + void Seek(const Slice& k) override { + iter_.Seek(EncodeKey(&tmp_, k)); + MemTable::KeyComparator comp_ = iter_.get_comparator(); + while(Valid()) { + Slice now = key(); + ParsedInternalKey parsed_k,parsed_now; + ParseInternalKey(k,&parsed_k); + ParseInternalKey(now,&parsed_now); + uint64_t deadtime_k = parsed_k.deadTime; + uint64_t deadtime_now = parsed_now.deadTime; + if(deadtime_k == 0) deadtime_k = UINT64_MAX; + if(deadtime_now == 0) deadtime_now = UINT64_MAX; + if(deadtime_k > deadtime_now) {Next();continue;}; + if(comp_.comparator.Compare(k,now) <= 0) return; + Next(); + } + } void SeekToFirst() override { iter_.SeekToFirst(); } void SeekToLast() override { iter_.SeekToLast(); } void Next() override { iter_.Next(); } @@ -115,13 +131,35 @@ void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key, std::memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); table_.Insert(buf); - std::cout << "insert:" << key.ToString() << std::endl; + static int count = 0; + if(count++ % 1000 == 0) + std::cout<<"count: "< deadtime_now) { + iter.Next(); + continue; + } + std::cout<<"size : "<compare_;} + private: const SkipList* list_; Node* node_; diff --git a/db/version_edit.cc b/db/version_edit.cc index 356ce88..4ee796e 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -81,6 +81,8 @@ void VersionEdit::EncodeTo(std::string* dst) const { PutVarint64(dst, f.file_size); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); + PutVarint64(dst,f.smallest_deadtime); + PutVarint64(dst,f.largest_deadtime); } } @@ -179,7 +181,9 @@ Status VersionEdit::DecodeFrom(const Slice& src) { if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) && GetVarint64(&input, &f.file_size) && GetInternalKey(&input, &f.smallest) && - GetInternalKey(&input, &f.largest)) { + GetInternalKey(&input, &f.largest) && + GetVarint64(&input,&f.smallest_deadtime) && + GetVarint64(&input,&f.largest_deadtime)) { new_files_.push_back(std::make_pair(level, f)); } else { msg = "new-file entry"; @@ -250,6 +254,10 @@ std::string VersionEdit::DebugString() const { r.append(f.smallest.DebugString()); r.append(" .. "); r.append(f.largest.DebugString()); + r.append(" "); + AppendNumberTo(&r,f.smallest_deadtime); + r.append(" .. "); + AppendNumberTo(&r,f.largest_deadtime); } r.append("\n}\n"); return r; diff --git a/db/version_edit.h b/db/version_edit.h index 137b4b1..f965c3e 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -16,7 +16,8 @@ namespace leveldb { class VersionSet; struct FileMetaData { - FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) {} + FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), + smallest_deadtime(UINT64_MAX),largest_deadtime(UINT64_MAX) {} int refs; int allowed_seeks; // Seeks allowed until compaction @@ -24,6 +25,9 @@ struct FileMetaData { uint64_t file_size; // File size in bytes InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table + //在FileMetaData中,使用Uint64MAX作为没有生存期的标志(表示生存期无限长) + uint64_t smallest_deadtime; //smallest deadtime + uint64_t largest_deadtime; //largest deadtime }; class VersionEdit { @@ -61,12 +65,15 @@ class VersionEdit { // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file void AddFile(int level, uint64_t file, uint64_t file_size, - const InternalKey& smallest, const InternalKey& largest) { + const InternalKey& smallest, const InternalKey& largest, + uint64_t smallest_deadtime = UINT64_MAX, uint64_t largest_deadtime = UINT64_MAX) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; + f.smallest_deadtime = smallest_deadtime; + f.largest_deadtime = largest_deadtime; new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_set.cc b/db/version_set.cc index 4e37bf9..1bf9ddd 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -263,8 +263,10 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { Saver* s = reinterpret_cast(arg); ParsedInternalKey parsed_key; if (!ParseInternalKey(ikey, &parsed_key)) { + // std::cout<<"corrupt get"<state = kCorrupt; } else { + std::cout<<"tar&found"<user_key.ToString()<ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; if (s->state == kFound) { @@ -272,6 +274,7 @@ static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { } } } + std::cout<<"state : "<state<& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_deadtime,f->largest_deadtime); } } diff --git a/db/version_set.h b/db/version_set.h index ea0c925..6b804f5 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -111,6 +111,8 @@ class Version { int NumFiles(int level) const { return files_[level].size(); } + const std::vector& Files(int level) const {return files_[level]; } + // Return a human readable string that describes this version's contents. std::string DebugString() const; diff --git a/port/port.h b/port/port.h index 4b247f7..c2073f3 100644 --- a/port/port.h +++ b/port/port.h @@ -7,6 +7,9 @@ #include +#ifndef LEVELDB_PLATFORM_POSIX + #define LEVELDB_PLATFORM_POSIX +#endif // Include the appropriate platform specific file below. If you are // porting to a new platform, see "port_example.h" for documentation // of what the new port_.h file must provide. diff --git a/table/block.cc b/table/block.cc index 3b15257..0a041e3 100644 --- a/table/block.cc +++ b/table/block.cc @@ -14,6 +14,7 @@ #include "table/format.h" #include "util/coding.h" #include "util/logging.h" +#include "db/dbformat.h" namespace leveldb { @@ -216,10 +217,20 @@ class Block::Iter : public Iterator { SeekToRestartPoint(left); } // Linear search (within restart block) for first key >= target + //处理deadtime:从当前位置向后找到最新的未死亡的key while (true) { if (!ParseNextKey()) { return; } + ParsedInternalKey parsed_target,parsed_key_; + ParseInternalKey(target,&parsed_target); + ParseInternalKey(key_,&parsed_key_); + uint64_t deadtime_tar = parsed_target.deadTime; + uint64_t deadtime_key_ = parsed_key_.deadTime; + if(deadtime_tar == 0) deadtime_tar = UINT64_MAX; + if(deadtime_key_ == 0) deadtime_key_ = UINT64_MAX; + std::cout<<"key :"< deadtime_key_) continue; if (Compare(key_, target) >= 0) { return; } diff --git a/test/db_test2.cc b/test/db_test2.cc index 49e26ab..4b2b33c 100644 --- a/test/db_test2.cc +++ b/test/db_test2.cc @@ -68,6 +68,7 @@ int main() { GetData(db); delete db; } + DestroyDB("testdb",Options()); return 0; } diff --git a/test/ttl_mmtable_test.cc b/test/ttl_mmtable_test.cc index dccf9ed..5b44033 100644 --- a/test/ttl_mmtable_test.cc +++ b/test/ttl_mmtable_test.cc @@ -8,7 +8,7 @@ using namespace leveldb; constexpr int value_size = 2048; -constexpr int data_size = 2048 << 2; +constexpr int data_size = 2048 << 15; Status OpenDB(std::string dbName, DB **db) { Options options; @@ -92,7 +92,7 @@ DB *db; uint64_t ttl1 = 3; uint64_t ttl2 = 5; - InsertData(db, ttl2); + // InsertData(db, ttl2); InsertData(db, ttl1, 2); //都没过期先找到后插的 @@ -101,8 +101,8 @@ DB *db; //再找到前一次 Env::Default()->SleepForMicroseconds(3 * 1000000); - GetData(db, false); - + GetData(db, true); + DestroyDB("testdb",Options()); delete(db); printf("-----closing-----\n"); printf("success!\n"); diff --git a/test/ttl_test.cc b/test/ttl_test.cc index 12cd583..40d38d4 100644 --- a/test/ttl_test.cc +++ b/test/ttl_test.cc @@ -26,7 +26,8 @@ void InsertData(DB *db, uint64_t ttl/* second */) { int key_ = rand() % key_num+1; std::string key = std::to_string(key_); std::string value(value_size, 'a'); - db->Put(writeOptions, key, value, ttl); + db->Put(writeOptions, std::to_string(i+1), value, ttl); + // db->Put(writeOptions, key, value, ttl); } } @@ -45,13 +46,14 @@ void GetData(DB *db, int size = (1 << 30)) { } TEST(TestTTL, ReadTTL) { + DestroyDB("testdb",Options()); DB *db; if(OpenDB("testdb", &db).ok() == false) { std::cerr << "open db failed" << std::endl; abort(); } - uint64_t ttl = 20; + uint64_t ttl = 15; InsertData(db, ttl); @@ -64,47 +66,53 @@ TEST(TestTTL, ReadTTL) { std::string key = std::to_string(key_); std::string value; status = db->Get(readOptions, key, &value); + std::cout<SleepForMicroseconds(ttl * 1000000); + Env::Default()->SleepForMicroseconds((ttl+1) * 1000000); for (int i = 0; i < 100; i++) { int key_ = rand() % key_num+1; std::string key = std::to_string(key_); std::string value; status = db->Get(readOptions, key, &value); + std::cout<GetApproximateSizes(ranges, 1, sizes); -// ASSERT_GT(sizes[0], 0); + leveldb::Range ranges[1]; + ranges[0] = leveldb::Range("-", "A"); + uint64_t sizes[1]; + db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_GT(sizes[0], 0); -// Env::Default()->SleepForMicroseconds(ttl * 1000000); + Env::Default()->SleepForMicroseconds((ttl+1) * 1000000); + // Env::Default()->SleepForMicroseconds(ttl * 1000000); -// db->CompactRange(nullptr, nullptr); + db->CompactRange(nullptr, nullptr); -// leveldb::Range ranges[1]; -// ranges[0] = leveldb::Range("-", "A"); -// uint64_t sizes[1]; -// db->GetApproximateSizes(ranges, 1, sizes); -// ASSERT_EQ(sizes[0], 0); -// } + // leveldb::Range ranges[1]; + ranges[0] = leveldb::Range("-", "A"); + // uint64_t sizes[1]; + db->GetApproximateSizes(ranges, 1, sizes); + ASSERT_EQ(sizes[0], 0); + delete db; +} int main(int argc, char** argv) {