diff --git a/db/db_impl.cc b/db/db_impl.cc index 5f87425..33c95c6 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4,6 +4,7 @@ #include "db/db_impl.h" +#include "db/version_edit.h" #include #include #include @@ -27,12 +28,16 @@ #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/filter_policy.h" +#include "leveldb/iterator.h" +#include "leveldb/options.h" #include "leveldb/slice.h" #include "leveldb/status.h" #include "leveldb/table.h" #include "leveldb/table_builder.h" #include "leveldb/write_batch.h" #include "port/port.h" +#include "port/port_stdcxx.h" +#include "port/thread_annotations.h" #include "table/block.h" #include "table/merger.h" #include "table/two_level_iterator.h" @@ -271,6 +276,9 @@ void DBImpl::RemoveObsoleteFiles() { // be recorded in pending_outputs_, which is inserted into "live" keep = (live.find(number) != live.end()); break; + case kKVLogFile: + keep = (live.find(number) != live.end()); + break; case kCurrentFile: case kDBLockFile: case kInfoLogFile: @@ -542,7 +550,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); - if (base != nullptr) { + if (base != nullptr && !only_Level0) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, @@ -550,6 +558,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, } edit->AddKVLogs(imm_kvlogfile_number); + pending_outputs_.erase(imm_kvlogfile_number); imm_kvlogfile_number = 0; delete imm_kvlogfile_; @@ -718,6 +727,154 @@ void DBImpl::BackgroundCall() { background_work_finished_signal_.SignalAll(); } +bool DBImpl::CollectKVLogs() { /* Garbage-collects kvlog files. Walks the current version's kvlogs, copies each record that the deep iterator still reports as current into a new kvlog, indexes the copies through a temporary memtable that is built into a level-0 table, and installs the result (plus removal of every fully drained kvlog) through one VersionEdit. Returns false, doing nothing, when the version holds fewer than total_files + 10 kvlogs; returns true otherwise. mutex_ must be held on entry; it is released while records are copied and re-acquired before the edit is applied. */ + mutex_.AssertHeld(); + Version *current = versions_->current(); + current->Ref(); + int total_files = 0; + for(int i = 0; i < config::kNumLevels; i++) { + total_files += current->NumFiles(i); + } + 
//This check guarantees that no kvlog selected for collection corresponds to mem, imm, or an L0 file + if(current->kvlogs_.size() < total_files + 10) { + std::cout << "kvlogs not enough : " << current->kvlogs_.size() << " " << total_files << std::endl; + current->Unref(); + return false; + } + Log(options_.info_log,"CollectKVLogs Start\n"); + std::vector &kvlogs = current->kvlogs_; + + FileMetaData metaSST, metaKVLog; + metaSST.number = versions_->NewFileNumber(); + metaKVLog.number = versions_->NewFileNumber(); + pending_outputs_.insert(metaSST.number); + pending_outputs_.insert(metaKVLog.number); + + // WritableFile *fileSST; + // env_->NewWritableFile(TableFileName(dbname_, metaSST.number),&fileSST); + // TableBuilder *builder = new TableBuilder(options_,fileSST); + + WritableFile *fileKVLog; + env_->NewWritableFile(KVLogFileName(dbname_, metaKVLog.number), &fileKVLog); /* NOTE(review): the returned Status is discarded — confirm a failed open cannot reach kvlog_writer */ + KVLog kvlog_writer(fileKVLog,metaKVLog.number); + FilePointer fp; + + uint32_t seed; + SequenceNumber latest_sequence; + ReadOptions ro; + ro.decode = false; + Iterator *deep_iter = NewInternalDeepIterator(ro,&latest_sequence,&seed); + // Iterator *iter = NewDBIterator(this, user_comparator(), deep_iter, latest_sequence, seed); + + uint64_t budget = 10000; + SequenceNumber limit = latest_sequence + budget; + SequenceNumber now_seq = latest_sequence; + // std::cout << limit << " " << latest_sequence << std::endl; + versions_->SetLastSequence(limit); /* NOTE(review): the record loop below tests now_seq <= limit before incrementing, so the last record can be stamped limit + 1, one past the value published here — confirm */ + only_Level0 = true; //ensures the outputs of subsequent minor compactions stay in level0 + mutex_.Unlock(); + + std::string saved_key_; + // ParsedInternalKey key_in_kvlog; + ParsedInternalKey key_in_sst; + WriteBatch write_batch,kp_batch; + MemTable *tempMem = new MemTable(internal_comparator_); + tempMem->Ref(); + std::set kvlogs_to_remove; + bool has_collected = false; + + for(int i = 0; i < kvlogs.size(); i++) { + FileMetaData *f = kvlogs[i]; + SequentialFile *file; + env_->NewSequentialFile(KVLogFileName(dbname_, f->number), &file); + KVLogReader reader(file); + for(int cnt = 0; reader.Valid() && now_seq <= limit; reader.Next(), cnt ++) { + // 
std::cout << "find KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n"; + if(reader.Type() == kTypeDeletion) { + // std::cout << "is delete record\n"; + continue; + } + saved_key_.clear(); + AppendInternalKey(&saved_key_, + ParsedInternalKey(reader.Key(),latest_sequence,kValueTypeForSeek)); + deep_iter->Seek(saved_key_); + + // ParseInternalKey(reader.Key(), &key_in_kvlog); + ParseInternalKey(deep_iter->key(), &key_in_sst); /* NOTE(review): deep_iter->Valid() is not checked between Seek() and key() — confirm Seek cannot land past the end */ + //the key no longer exists (it was dropped by compaction), so the kvlog record is stale + if(internal_comparator_.user_comparator()->Compare(reader.Key(), key_in_sst.user_key) != 0) { + // std::cout << "has been compacted\n"; + continue; + } + //if the key found is a deletion, or its sequence is larger than the one in the kvlog, the kvlog record is stale + //a sequence smaller than the one in the kvlog cannot be found here + if(key_in_sst.type == kTypeDeletion) { + // std::cout << "key delete \n"; + continue; + } + + if(key_in_sst.sequence > reader.Seq()) { + // printf("sst seq larger: sst:%ld kvlog:%ld\n",key_in_sst.sequence,reader.Seq()); + continue; + } + + has_collected = true; + // std::cout << "collected KV: " << reader.Key().ToString() << " " << reader.Value().ToString() << " " << reader.Seq() << "\n"; + if((limit - now_seq) % 100 == 0) { + std::cout << "collected KV: " << reader.Value().ToString() << " " << reader.Seq() << " " << cnt << "\n"; + std::cout << "rest budget : " << limit - now_seq << "\n"; + } + write_batch.Clear(); + kp_batch.Clear(); + write_batch.Put(reader.Key(), reader.Value()); + now_seq += 1; + WriteBatchInternal::SetSequence(&write_batch, now_seq); + kvlog_writer.AddRecord(WriteBatchInternal::Contents(&write_batch), fp); + WriteBatchInternal::ConstructKPBatch(&kp_batch, &write_batch, fp); + WriteBatchInternal::InsertInto(&kp_batch, tempMem); + } + if(reader.Valid()) break; /* NOTE(review): this break skips the "delete file" two statements below, so the SequentialFile of a partially read kvlog appears to leak — confirm */ + kvlogs_to_remove.insert(f->number); + delete file; + // break;//collect only one kvlog per pass for now + } + + VersionEdit edit; + Iterator *miter = nullptr; + if(has_collected) { + miter = tempMem->NewIterator(); + Status s = BuildTable(dbname_, env_, options_, + 
table_cache_, miter, &metaSST); /* NOTE(review): s is never inspected after BuildTable — confirm a failed build may be ignored */ + } + mutex_.Lock(); + if(has_collected) { + edit.AddFile(0, metaSST.number, metaSST.file_size, + metaSST.smallest, metaSST.largest); + edit.AddKVLogs(metaKVLog.number); + } + for(auto num : kvlogs_to_remove) { + edit.RemoveKVLogs(num); + } + Log(options_.info_log,"Add SST at Level0 : %ld\n",metaSST.number); + for(auto num : kvlogs_to_remove) { + Log(options_.info_log,"Collect kvlog : %ld\n",num); + printf("Collect kvlog : %ld\n",num); + } + Log(options_.info_log,"Merge to kvlog : %ld\n",metaKVLog.number); + + versions_->LogAndApply(&edit, &mutex_); + + delete miter; + tempMem->Unref(); + delete deep_iter; + delete fileKVLog; + pending_outputs_.erase(metaSST.number); + pending_outputs_.erase(metaKVLog.number); + only_Level0 = false; + current->Unref(); + return true; +} + void DBImpl::BackgroundCompaction() { mutex_.AssertHeld(); @@ -726,6 +883,10 @@ void DBImpl::BackgroundCompaction() { return; } + if(CollectKVLogs()) { /* when a kvlog collection pass ran, this background call returns without picking a compaction */ + return; + } + Compaction* c; bool is_manual = (manual_compaction_ != nullptr); InternalKey manual_end; @@ -1091,6 +1252,20 @@ static void CleanupIteratorState(void* arg1, void* arg2) { delete state; } +struct DeepIterState { /* cleanup payload registered by NewInternalDeepIterator: the mutex to assert on and the Version to unref */ + port::Mutex* const mu; + Version* const version GUARDED_BY(mu); + DeepIterState(port::Mutex* mutex, Version* version) + : mu(mutex),version(version) {} +}; + +static void CleanupDeepIteratorState(void* arg1, void* args) { /* iterator cleanup callback: asserts the mutex is held, drops the Version reference and frees the state; the second argument is unused */ + DeepIterState* state = reinterpret_cast(arg1); + state->mu->AssertHeld(); + state->version->Unref(); + delete state; +} + } // anonymous namespace Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, @@ -1120,6 +1295,33 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, return internal_iter; } + +//mutex_ must be held (mutex_.AssertHeld()) both when the iterator is created and when it is destroyed +Iterator* DBImpl::NewInternalDeepIterator(const ReadOptions& options, + SequenceNumber* latest_snapshot, + uint32_t* seed) { /* returns a merging iterator over the child iterators supplied by Version::AddDeepIterators for the current version; stores the last sequence in *latest_snapshot and the next seed_ value in *seed; the version is Ref'ed here and Unref'ed by the registered cleanup */ + mutex_.AssertHeld(); + // mutex_.Lock(); + *latest_snapshot = 
versions_->LastSequence(); + + // Collect together all needed child iterators + std::vector list; + + versions_->current()->AddDeepIterators(options, &list); + Iterator* internal_iter = + NewMergingIterator(&internal_comparator_, &list[0], list.size()); /* NOTE(review): list is empty when no level >= 1 holds files, and &list[0] is then taken on an empty vector — confirm */ + versions_->current()->Ref(); + + // IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current()); + // internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); + + DeepIterState *cleanup = new DeepIterState(&mutex_,versions_->current()); + internal_iter->RegisterCleanup(CleanupDeepIteratorState, cleanup, nullptr); + *seed = ++seed_; + // mutex_.Unlock(); + return internal_iter; +} + Iterator* DBImpl::TEST_NewInternalIterator() { SequenceNumber ignored; uint32_t ignored_seed; @@ -1292,6 +1494,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch); + versions_->SetLastSequence(last_sequence); /* now published before the log and memtable write; the call that used to follow the write is commented out in a later hunk */ // Add to log and apply to memtable. 
We can release the lock // during this phase since &w is currently responsible for logging @@ -1333,7 +1536,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { } if (write_batch == tmp_batch_) tmp_batch_->Clear(); - versions_->SetLastSequence(last_sequence); + // versions_->SetLastSequence(last_sequence); } while (true) { @@ -1355,6 +1558,15 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { // NoWaiting_elapsed += env_->NowMicros() - start_; // Nowaited_count ++; // dumpStatistics(); + // SequentialFile *file; + // env_->NewSequentialFile(KVLogFileName(dbname_, kvlogfile_number_), &file); + // KVLogReader reader(file); + // std::cout << "==============tail check begin==============\n"; + // for( ;reader.Valid(); reader.Next()) { + // printf("key:%s, value:%s, seq:%ld\n",reader.Key().ToString().c_str(),reader.Value().ToString().c_str(),reader.Seq()); + // } + // std::cout << "==============tail check end==============\n"; + // delete file; return status; } @@ -1675,6 +1887,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) { if (ParseFileName(filenames[i], &number, &type) && type != kDBLockFile) { // Lock file will be deleted at end Status del = env->RemoveFile(dbname + "/" + filenames[i]); + // std::cout << "remove file : " << filenames[i] << std::endl; if (result.ok() && !del.ok()) { result = del; } diff --git a/db/db_impl.h b/db/db_impl.h index be4abc2..cbced56 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -18,6 +18,7 @@ #include "db/snapshot.h" #include "leveldb/db.h" #include "leveldb/env.h" +#include "leveldb/options.h" #include "leveldb/slice.h" #include "leveldb/write_batch.h" #include "port/port.h" @@ -85,7 +86,7 @@ class DBImpl : public DB { // bytes. 
void RecordReadSample(Slice key); - Slice GetValueFromFP(const FilePointer &fp,std::string *value); + private: friend class DB; @@ -121,6 +122,10 @@ class DBImpl : public DB { SequenceNumber* latest_snapshot, uint32_t* seed); + Iterator* NewInternalDeepIterator(const ReadOptions&, + SequenceNumber* latest_snapshot, + uint32_t* seed); + Status NewDB(); // Recover the descriptor from persistent storage. May do a significant @@ -166,6 +171,10 @@ class DBImpl : public DB { Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status InstallCompactionResults(CompactionState* compact) EXCLUSIVE_LOCKS_REQUIRED(mutex_); +/*about kvlogs*/ + Slice GetValueFromFP(const FilePointer &fp,std::string *value); + int triggers = 0; + bool CollectKVLogs(); const Comparator* user_comparator() const { return internal_comparator_.user_comparator(); @@ -202,6 +211,7 @@ class DBImpl : public DB { KVLog *kvlog_; WritableFile* imm_kvlogfile_; uint64_t imm_kvlogfile_number; + bool only_Level0 GUARDED_BY(mutex_) = false; /* while true, WriteLevel0Table skips PickLevelForMemTableOutput; set and cleared by CollectKVLogs */ /*kvlog end*/ uint32_t seed_ GUARDED_BY(mutex_); // For sampling. diff --git a/db/version_set.cc b/db/version_set.cc index 8a9fceb..ce7df85 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -246,6 +246,18 @@ void Version::AddIterators(const ReadOptions& options, } } +void Version::AddDeepIterators(const ReadOptions& options, + std::vector* iters) { /* appends one concatenating iterator per non-empty level to *iters; level 0 is not visited because the loop starts at level 1 */ + // For levels > 0, we can use a concatenating iterator that sequentially + // walks through the non-overlapping files in the level, opening them + // lazily. 
+ for (int level = 1; level < config::kNumLevels; level++) { + if (!files_[level].empty()) { + iters->push_back(NewConcatenatingIterator(options, level)); + } + } +} + // Callback from TableCache::Get() namespace { enum SaverState { @@ -737,12 +749,12 @@ class VersionSet::Builder { //merge the base version's kvlogs with the ones added by the current edit, excluding deleted kvlogs for(const auto &kvlog : base_->kvlogs_) { - if(!deleted_kvlogs.count(kvlog->number)) continue; + if(deleted_kvlogs.count(kvlog->number)) continue; /* skip kvlogs that the edit deleted */ v->kvlogs_.push_back(kvlog); kvlog->refs++; } for(const auto &kvlog : added_kvlogs) { - if(!deleted_kvlogs.count(kvlog->number)) continue; + if(deleted_kvlogs.count(kvlog->number)) continue; v->kvlogs_.push_back(kvlog); kvlog->refs++; } diff --git a/db/version_set.h b/db/version_set.h index b70117e..0affea7 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -15,6 +15,7 @@ #ifndef STORAGE_LEVELDB_DB_VERSION_SET_H_ #define STORAGE_LEVELDB_DB_VERSION_SET_H_ +#include "db/db_impl.h" #include #include #include @@ -69,6 +70,7 @@ class Version { // yield the contents of this Version when merged together. // REQUIRES: This version has been saved (see VersionSet::SaveTo) void AddIterators(const ReadOptions&, std::vector* iters); + void AddDeepIterators(const ReadOptions&, std::vector* iters); // Lookup the value for key. If found, store it in *val and // return OK. Else return a non-OK status. Fills *stats. @@ -118,6 +120,7 @@ class Version { private: friend class Compaction; friend class VersionSet; + friend class DBImpl; class LevelFileNumIterator; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index d755f46..d3a76e2 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -162,6 +162,8 @@ struct LEVELDB_EXPORT ReadOptions { // not have been released). If "snapshot" is null, use an implicit // snapshot of the state at the beginning of this read operation. 
const Snapshot* snapshot = nullptr; + + bool decode = true; //resolve the FilePointer into the actual value }; // Options that control write operations diff --git a/kv_sep/kvlog.cc b/kv_sep/kvlog.cc index e08e7b4..be09aa4 100644 --- a/kv_sep/kvlog.cc +++ b/kv_sep/kvlog.cc @@ -49,6 +49,7 @@ Status KVLog::AddRecord(const Slice &slice, FilePointer &fp) { void KVLogReader::Next() { if(input.empty()) { NextWriteBatch(); + if(!Valid()) return; /* NextWriteBatch can clear valid; do not fall through to NextKV in that case */ } NextKV(); } @@ -72,11 +73,12 @@ void KVLogReader::NextWriteBatch() { valid = false; return; } - seq = DecodeFixed64(input.data()); + seq = DecodeFixed64(input.data()) - 1;//-1 to line up with NextKV below, which increments seq before each record input.remove_prefix(12); //remove writebatch header } void KVLogReader::NextKV() { + seq++; type = (ValueType)input[0]; input.remove_prefix(1); switch (type) { diff --git a/kv_sep/kvlog.h b/kv_sep/kvlog.h index cce73bb..8b8dffc 100644 --- a/kv_sep/kvlog.h +++ b/kv_sep/kvlog.h @@ -40,12 +40,17 @@ class KVLogReader { public: KVLogReader(SequentialFile *file):file(file),rep_(nullptr),rep_size(0),valid(true) { Next();}; + + ~KVLogReader() { + delete rep_; + } + ValueType Type() {return type;} Slice Key() {return key;} Slice Value() {return value;} SequenceNumber Seq() {return seq;} + bool Valid() {return valid;} void Next(); - void Valid(); private: void NextWriteBatch(); void NextKV(); diff --git a/test/test.cc b/test/test.cc index fe15fde..9ba1444 100644 --- a/test/test.cc +++ b/test/test.cc @@ -1,5 +1,6 @@ #include "leveldb/db.h" #include "leveldb/options.h" +#include "util/random.h" #include #include @@ -10,8 +11,8 @@ int main() { DB* db = nullptr; Options op; op.create_if_missing = true; - DestroyDB("testdb", Options()); - Status status = DB::Open(op, "testdb", &db); + DestroyDB("testdb_kv", Options()); + Status status = DB::Open(op, "testdb_kv", &db); // assert(status.ok()); // string key = "leveldb",value = to_string(0); @@ -52,9 +53,21 @@ int main() { // cout<<"without snapshot:"<ReleaseSnapshot(snapshot); - string key = "abc",value = "leveldb"; - for(int 
i = 0; i < 20; i++) { - db->Put(WriteOptions(),key + std::to_string(i),value + std::to_string(i)); + string key,value = "leveldb"; + key.resize(1000,'a'); + // value.resize(10,'b'); + value += "index"; + Random ran(0); + for(int i = 0; i < 100000; i++) { + for(int j = 0; j < 20; j++) { + int rand = ran.Uniform(200 * 10); + string tk = key + std::to_string(rand); + string tv = value + std::to_string(rand) + "_" + std::to_string(i); + db->Put(WriteOptions(),tk,tv); + } + if(i && i%1000 ==0) { + std::cout << "iteration:" << i << std::endl; + } } std::string res; for(int i = 0; i < 10; i++) {