#include "fielddb/field_db.h"

#include <cassert>
#include <climits>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "leveldb/c.h"
#include "leveldb/cache.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "leveldb/write_batch.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/serialize_value.h"
#include "fielddb/encode_index.h"
#include "fielddb/meta.h"
#include "field_db.h"

namespace fielddb {
using namespace leveldb;

namespace {

// A WriteBatch with no records still carries LevelDB's 12-byte header
// (8-byte sequence number + 4-byte record count); a batch of this size or
// smaller has nothing worth writing.
constexpr size_t kEmptyWriteBatchSize = 12;

// Set of user keys already claimed by the batch being constructed.
// NOTE(review): the element type was lost from this copy of the file; it must
// match the last parameter of Request::ConstructBatch — confirm against
// fielddb's request header.
using BatchKeySet = std::unordered_set<std::string>;

}  // namespace

// Opens the three LevelDB instances that back a FieldDB
// ("<name>_indexDB", "<name>_kvDB", "<name>_metaDB"), attaches them to the
// caller-allocated *dbptr and replays unfinished requests via Recover().
//
// *dbptr must already point to a FieldDB object, otherwise NotSupported is
// returned. `options` is stored by pointer in the FieldDB, so it must outlive
// the opened database. On an Open failure every DB opened so far is released.
Status FieldDB::OpenFieldDB(Options &options, const std::string &name,
                            FieldDB **dbptr) {
  if (dbptr == nullptr || *dbptr == nullptr) {
    return Status::NotSupported(name, "new a fieldDb first\n");
  }

  DB *indexdb = nullptr;
  DB *kvdb = nullptr;
  DB *metadb = nullptr;

  Status status = Open(options, name + "_indexDB", &indexdb);
  if (!status.ok()) return status;

  status = Open(options, name + "_kvDB", &kvdb);
  if (!status.ok()) {
    delete indexdb;  // do not leak the DB opened above
    return status;
  }

  status = Open(options, name + "_metaDB", &metadb);
  if (!status.ok()) {
    delete kvdb;
    delete indexdb;
    return status;
  }

  FieldDB *db = *dbptr;
  db->indexDB_ = indexdb;
  db->kvDB_ = kvdb;
  db->metaDB_ = metadb;
  db->dbname_ = name;
  // Recover() replays requests through HandleRequest(), which calls
  // env_->NowMicros(), so options_/env_ must be set before recovery runs.
  db->options_ = &options;
  db->env_ = options.env;
  return db->Recover();
}

// Rebuilds in-memory state after open:
//  1. Scans the index DB and marks every indexed field name as Exist in
//     index_ (one entry per field name).
//  2. Re-submits every KV request still recorded in the meta DB
//     (KV_Creating -> PutFields, KV_Deleting -> Delete). Completed requests
//     clean their own meta entries, so no separate clean-up pass is needed.
// Returns the first error encountered (malformed key, failed replay, or
// iterator error); OK otherwise.
Status FieldDB::Recover() {
  // 1. Rebuild index_ from the index DB.
  Iterator *iter = indexDB_->NewIterator(ReadOptions());
  std::string last_name;
  bool have_last_name = false;
  iter->SeekToFirst();
  while (iter->Valid()) {
    // Copy the key: parsed_index holds Slices into it.
    std::string index_key = iter->key().ToString();
    ParsedInternalIndexKey parsed_index;
    if (!ParseInternalIndexKey(Slice(index_key), &parsed_index)) {
      delete iter;
      return Status::Corruption("FieldDB::Recover: malformed index key");
    }
    std::string name = parsed_index.name_.ToString();
    if (have_last_name && name == last_name) {
      // The seek below did not move past this field (possible if the bytes
      // following the encoded name start with 0xff); step forward instead of
      // seeking to the same place forever.
      iter->Next();
      continue;
    }
    index_[name] = {Exist, nullptr};
    last_name = name;
    have_last_name = true;

    // Jump past every entry of this field: the length-prefixed name followed
    // by the largest byte value sorts after the field's entries.
    std::string seek_target;
    PutLengthPrefixedSlice(&seek_target, parsed_index.name_);
    seek_target.push_back(static_cast<char>(0xff));
    iter->Seek(Slice(seek_target));
  }
  Status status = iter->status();
  delete iter;
  if (!status.ok()) return status;

  // 2. Replay unfinished KV requests recorded in the meta DB.
  iter = metaDB_->NewIterator(ReadOptions());
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    Slice meta_key = iter->key();
    if (meta_key.size() < 4) {
      status = Status::Corruption("FieldDB::Recover: meta key too short");
      break;
    }
    MetaType type = MetaType(DecodeFixed32(meta_key.data()));
    meta_key.remove_prefix(4);  // drop the leading MetaType tag
    Slice user_key;
    if (!GetLengthPrefixedSlice(&meta_key, &user_key)) {
      status = Status::Corruption("FieldDB::Recover: malformed meta key");
      break;
    }
    if (type == KV_Creating) {
      FieldArray fields;
      ParseValue(iter->value().ToString(), &fields);
      status = PutFields(WriteOptions(), user_key, fields);
    } else if (type == KV_Deleting) {
      status = Delete(WriteOptions(), user_key);
    } else {
      assert(0 && "Invalid MetaType");
    }
    if (!status.ok()) break;
  }
  if (status.ok()) status = iter->status();
  delete iter;
  return status;
}

// Returns the last request of the run that can be committed together,
// starting at the head of taskqueue_. The run stops before the first index
// create/delete request; if the head itself is such a request, the head is
// returned alone. Requires mutex_ held and a non-empty queue.
Request *FieldDB::GetHandleInterval() {
  mutex_.AssertHeld();  // the queue is only accessed under mutex_
  Request *tail = taskqueue_.front();
  for (auto *req_ptr : taskqueue_) {
    if (req_ptr->isiDeleteReq() || req_ptr->isiCreateReq()) {
      return tail;
    }
    tail = req_ptr;
  }
  return tail;
}

// Queue-based group commit. The request is appended to taskqueue_; the
// thread whose request reaches the front handles the run returned by
// GetHandleInterval():
//  - ordinary requests: build KV/index/meta batches under index_mu, release
//    mutex_ (write order is already fixed by the queue), write the meta batch,
//    then the index and KV batches, then clean the meta entries;
//  - an index create/delete request: only Prepare() it under index_mu.
// Handled requests are popped; non-pending ones are marked done and woken.
// Returns req.s once req is done. When several writes fail, the first
// failure is reported.
Status FieldDB::HandleRequest(Request &req, const WriteOptions &op) {
  uint64_t start_ = env_->NowMicros();
  MutexLock L(&mutex_);
  taskqueue_.push_back(&req);
  while (true) {
    // Sleep until another thread completed req or req reached the head.
    while (!req.done && &req != taskqueue_.front()) {
      req.cond_.Wait();
    }
    if (req.done) {
      return req.s;  // L releases mutex_ on return
    }

    Request *tail = GetHandleInterval();
    WriteBatch KVBatch, IndexBatch, MetaBatch;
    BatchKeySet batchKeySet;
    Status status;
    if (!tail->isiCreateReq() && !tail->isiDeleteReq()) {
      // The run contains no index create/delete requests.
      {
        // 1. Build the batches. index_mu keeps the index state consistent
        //    while the batches are being constructed.
        MutexLock iL(&index_mu);
        uint64_t start_construct = env_->NowMicros();
        for (auto *req_ptr : taskqueue_) {
          req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this,
                                  batchKeySet);
          if (req_ptr == tail) break;
        }
        construct_elapsed += env_->NowMicros() - start_construct;
      }

      // 2. Write meta first, then index and KV. mutex_ can be released here
      //    because the queue already serializes writers.
      mutex_.Unlock();
      uint64_t start_write = env_->NowMicros();
      if (MetaBatch.ApproximateSize() > kEmptyWriteBatchSize) {
        uint64_t start_meta = env_->NowMicros();
        Status s = metaDB_->Write(op, &MetaBatch);
        write_meta_elapsed += env_->NowMicros() - start_meta;
        write_bytes += MetaBatch.ApproximateSize();
        assert(s.ok());
        if (status.ok()) status = s;
      }
      // TODO: write the index batch concurrently on another thread.
      if (IndexBatch.ApproximateSize() > kEmptyWriteBatchSize) {
        uint64_t start_index = env_->NowMicros();
        Status s = indexDB_->Write(op, &IndexBatch);
        write_index_elapsed += env_->NowMicros() - start_index;
        write_bytes += IndexBatch.ApproximateSize();
        assert(s.ok());
        if (status.ok()) status = s;
      }
      if (KVBatch.ApproximateSize() > kEmptyWriteBatchSize) {
        uint64_t start_kv = env_->NowMicros();
        Status s = kvDB_->Write(op, &KVBatch);
        write_kv_elapsed += env_->NowMicros() - start_kv;
        write_bytes += KVBatch.ApproximateSize();
        assert(s.ok());
        if (status.ok()) status = s;
      }

      // 3. Clear the meta entries written in step 2.
      if (MetaBatch.ApproximateSize() > kEmptyWriteBatchSize) {
        uint64_t start_clean = env_->NowMicros();
        MetaCleaner cleaner;
        cleaner.Collect(MetaBatch);
        cleaner.CleanMetaBatch(metaDB_);
        write_clean_elapsed += env_->NowMicros() - start_clean;
      }
      write_elapsed += env_->NowMicros() - start_write;
      mutex_.Lock();
    } else {
      // Index create/delete: Prepare() updates the index state.
      MutexLock iL(&index_mu);
      req.Prepare(this);
    }

    // Pop the handled run. In the index branch the run is exactly req
    // (GetHandleInterval returns the head, which is req), so testing req
    // here is equivalent to testing ready.
    while (true) {
      Request *ready = taskqueue_.front();
      taskqueue_.pop_front();
      if (!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
        ready->s = status;
        ready->done = true;
        if (ready != &req) ready->cond_.Signal();
      }
      if (ready == tail) break;
    }
    elapsed += env_->NowMicros() - start_;
    count++;
    if (!taskqueue_.empty()) {
      taskqueue_.front()->cond_.Signal();
    }
    // If req is done the next iteration returns immediately; if it sits in
    // some request's pending list it keeps waiting to be re-queued.
  }
}

// A plain Put stores the value as a single field whose name is "".
Status FieldDB::Put(const WriteOptions &options, const Slice &key,
                    const Slice &value) {
  FieldArray FA = {{"", value.ToString()}};
  return PutFields(options, key, FA);
}

// Writes a multi-field record; index maintenance happens while the request's
// batch is constructed in HandleRequest().
Status FieldDB::PutFields(const WriteOptions &Options, const Slice &key,
                          const FieldArray &fields) {
  std::string key_ = key.ToString();
  FieldArray fields_ = fields;
  FieldsReq req(&key_, &fields_, &mutex_);
  return HandleRequest(req, Options);
}

// Deletes a key; index entries for an indexed key are removed in the same
// request.
Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
  std::string key_ = key.ToString();
  DeleteReq req(&key_, &mutex_);
  return HandleRequest(req, options);
}

// Applies a WriteBatch through the request queue so that index updates
// implied by its contents are handled like individual requests.
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
  uint64_t start_ = env_->NowMicros();
  BatchReq req(updates, &mutex_);
  construct_BatchReq_init_elapsed += env_->NowMicros() - start_;
  return HandleRequest(req, options);
}

// Returns the value of the first stored field (for plain Put records this is
// the field named ""). A record stored with no fields yields an empty value.
Status FieldDB::Get(const ReadOptions &options, const Slice &key,
                    std::string *value) {
  FieldArray fields;
  Status s = GetFields(options, key, &fields);
  if (!s.ok()) {
    return s;
  }
  if (fields.empty()) {
    value->clear();  // avoid reading fields[0] past the end
    return s;
  }
  *value = fields[0].second;
  return s;
}

Status FieldDB::GetFields(const ReadOptions &options, const Slice &key,
                          FieldArray *fields) {
  return kvDB_->GetFields(options, key, fields);
}

std::vector<std::string> FieldDB::FindKeysByField(Field &field) {
  return kvDB_->FindKeysByField(field);
}

// Full scan of the KV DB returning (key, value) for every record whose field
// `fieldName` has a non-empty value.
std::vector<std::pair<std::string, std::string>>
FieldDB::FindKeysAndValByFieldName(const std::string &fieldName) {
  std::vector<std::pair<std::string, std::string>> result;
  Iterator *iter = kvDB_->NewIterator(ReadOptions());
  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    InternalFieldArray fields(iter->value());
    std::string val = fields.ValOfName(fieldName);
    if (!val.empty()) {
      result.emplace_back(iter->key().ToString(), std::move(val));
    }
  }
  delete iter;
  return result;
}

// Creates an index on `field_name`. Returns immediately if the index already
// exists; otherwise writes the index entries built by the request and
// finalizes it. An index-DB write failure is returned to the caller.
Status FieldDB::CreateIndexOnField(const std::string &field_name,
                                   const WriteOptions &op) {
  std::string Field = field_name;
  iCreateReq req(&Field, &mutex_);
  HandleRequest(req, op);
  if (req.Existed) {
    return req.s;
  }
  WriteBatch KVBatch, IndexBatch, MetaBatch;
  BatchKeySet useless;
  req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
  Status write_status = indexDB_->Write(op, &IndexBatch);
  // Finalize() runs unconditionally, as before, so the request's bookkeeping
  // completes even when the write failed.
  req.Finalize(this);
  if (!write_status.ok()) {
    return write_status;
  }
  return req.s;
}

// Drops the index on `field_name`. Returns immediately if it was already
// deleted or never existed; otherwise removes the index entries and
// finalizes. An index-DB write failure is returned to the caller.
Status FieldDB::DeleteIndex(const std::string &field_name,
                            const WriteOptions &op) {
  std::string Field = field_name;
  iDeleteReq req(&Field, &mutex_);
  HandleRequest(req, op);
  if (req.Deleted) {
    return req.s;
  }
  WriteBatch KVBatch, IndexBatch, MetaBatch;
  BatchKeySet useless;
  req.ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, useless);
  Status write_status = indexDB_->Write(op, &IndexBatch);
  req.Finalize(this);
  if (!write_status.ok()) {
    return write_status;
  }
  return req.s;
}

// Returns the user keys whose field `field.first` equals `field.second`,
// using the index DB. *s is NotFound if no ready index exists for the field,
// otherwise the index iterator's status.
std::vector<std::string> FieldDB::QueryByIndex(const Field &field, Status *s) {
  auto it = index_.find(field.first);
  if (it == index_.end() || it->second.first != Exist) {
    *s = Status::NotFound(Slice());
    return std::vector<std::string>();
  }
  std::string indexKey;
  AppendIndexKey(&indexKey,
                 ParsedInternalIndexKey(Slice(), field.first, field.second));
  Iterator *indexIterator = indexDB_->NewIterator(ReadOptions());
  indexIterator->Seek(indexKey);
  std::vector<std::string> result;
  for (; indexIterator->Valid(); indexIterator->Next()) {
    ParsedInternalIndexKey iterKey;
    if (!ParseInternalIndexKey(indexIterator->key(), &iterKey) ||
        iterKey.name_ != field.first || iterKey.val_ != field.second) {
      break;  // left the (name, value) range
    }
    result.push_back(iterKey.user_key_.ToString());
  }
  *s = indexIterator->status();
  delete indexIterator;
  return result;
}

// Returns the recorded index state for `fieldName`, or NotExist.
IndexStatus FieldDB::GetIndexStatus(const std::string &fieldName) {
  auto it = index_.find(fieldName);
  if (it == index_.end()) return IndexStatus::NotExist;
  return it->second.first;
}

Iterator *FieldDB::NewIterator(const ReadOptions &options) {
  return kvDB_->NewIterator(options);
}

// TODO: manage snapshots with a sequence number shared by all three DBs.
const Snapshot *FieldDB::GetSnapshot() { return kvDB_->GetSnapshot(); }

// TODO: same as GetSnapshot().
void FieldDB::ReleaseSnapshot(const Snapshot *snapshot) {
  kvDB_->ReleaseSnapshot(snapshot);
}

// Reports the property from the KV DB, falling back to the index DB. (The
// previous `a | b` form always ran the index-DB query second, so its result
// overwrote the KV DB's in *value.)
bool FieldDB::GetProperty(const Slice &property, std::string *value) {
  if (kvDB_->GetProperty(property, value)) {
    return true;
  }
  return indexDB_->GetProperty(property, value);
}

// sizes[i] = KV-DB size + index-DB size for range[i], i in [0, n).
void FieldDB::GetApproximateSizes(const Range *range, int n,
                                  uint64_t *sizes) {
  kvDB_->GetApproximateSizes(range, n, sizes);
  if (n <= 0) return;
  // One slot per range: the index DB writes n values.
  std::vector<uint64_t> index_sizes(static_cast<size_t>(n), 0);
  indexDB_->GetApproximateSizes(range, n, index_sizes.data());
  for (int i = 0; i < n; ++i) {
    sizes[i] += index_sizes[static_cast<size_t>(i)];
  }
}

// Only the KV DB is compacted.
void FieldDB::CompactRange(const Slice *begin, const Slice *end) {
  kvDB_->CompactRange(begin, end);
}

// Destroys all three underlying DBs. Every destroy is attempted; the first
// failure (if any) is returned.
Status DestroyDB(const std::string &name, const Options &options) {
  Status result;
  Status s = leveldb::DestroyDB(name + "_kvDB", options);
  assert(s.ok());
  if (result.ok()) result = s;
  s = leveldb::DestroyDB(name + "_indexDB", options);
  assert(s.ok());
  if (result.ok()) result = s;
  s = leveldb::DestroyDB(name + "_metaDB", options);
  assert(s.ok());
  if (result.ok()) result = s;
  return result;
}

FieldDB::~FieldDB() {
  delete indexDB_;
  delete kvDB_;
  delete metaDB_;
}

}  // namespace fielddb