#include "port/port_stdcxx.h" #include "db/db_impl.h" #include #include #include #include #include #include #include "leveldb/db.h" #include "leveldb/env.h" #include "leveldb/options.h" #include "leveldb/slice.h" #include "leveldb/status.h" #include "fielddb/request.h" #include # ifndef FIELD_DB_H # define FIELD_DB_H namespace fielddb { using namespace leveldb; const char EMPTY[1] = {0}; enum IndexStatus{ Creating, Deleting, Exist, NotExist }; class FieldDB : DB { public: friend class Request; friend class FieldsReq; friend class iCreateReq; friend class iDeleteReq; friend class DeleteReq; friend class BatchReq; //用的时候必须FieldDB *db = new FieldDB()再open,不能像之前一样DB *db FieldDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; ~FieldDB(); /*lab1的要求,作为db派生类要实现的虚函数*/ Status Put(const WriteOptions &options, const Slice &key, const Slice &value) override; Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &fields) override; Status Delete(const WriteOptions &options, const Slice &key) override; Status Write(const WriteOptions &options, WriteBatch *updates) override; Status Get(const ReadOptions &options, const Slice &key, std::string *value) override; Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *fields) override; std::vector FindKeysByField(Field &field) override; Iterator * NewIterator(const ReadOptions &options) override; const Snapshot * GetSnapshot() override; void ReleaseSnapshot(const Snapshot *snapshot) override; bool GetProperty(const Slice &property, std::string *value) override; void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) override; void CompactRange(const Slice *begin, const Slice *end) override; /*与索引相关*/ Status CreateIndexOnField(const std::string& field_name, const WriteOptions &op); Status DeleteIndex(const std::string &field_name, const WriteOptions &op); std::vector QueryByIndex(const Field &field, Status *s); //返回当前数据库中索引状态,用来测试,不过也可以作为一个功能? IndexStatus GetIndexStatus(const std::string &fieldName); static Status OpenFieldDB(Options& options,const std::string& name,FieldDB** dbptr); private: //根据metaDB的内容进行恢复 Status Recover(); private: leveldb::DB *kvDB_; leveldb::DB *metaDB_; leveldb::DB *indexDB_; std::string dbname_; const Options *options_; Env *env_; using FieldName = std::string; // 标记index的状态,如果是creating/deleting,则会附带相应的请求 std::map> index_; port::Mutex index_mu; leveldb::port::Mutex mutex_; // mutex for taskqueue std::deque taskqueue_; std::vector> FindKeysAndValByFieldName ( const Slice fieldName); /*For request handling*/ Status HandleRequest(Request &req, const WriteOptions &op); //每个请求自行构造请求后交由这个函数处理 Request *GetHandleInterval(); //获得任务队列中的待处理区间,区间划分规则和原因见文档 // private: // int count = 0; // int count_Batch = 0; // int count_Batch_Sub = 0; // uint64_t elapsed = 0; // uint64_t construct_elapsed = 0; // uint64_t construct_BatchReq_init_elapsed = 0; // uint64_t construct_BatchReq_elapsed = 0; // uint64_t construct_BatchReq_Sub_elapsed = 0; // uint64_t construct_BatchReq_perSub_elapsed = 0; // uint64_t construct_FieldsReq_Read_elapsed = 0; // uint64_t write_elapsed = 0; // uint64_t write_meta_elapsed = 0; // uint64_t write_index_elapsed = 0; // uint64_t write_kv_elapsed = 0; // uint64_t write_clean_elapsed = 0; // uint64_t write_bytes = 0; // uint64_t write_step = 500 * 1024 * 1024; // uint64_t write_bytes_lim = write_step; // uint64_t temp_elapsed = 0; // uint64_t waiting_elasped = 0; // inline void dumpStatistics() { // if(count && count % 500000 == 0 || write_bytes && write_bytes > write_bytes_lim) { // std::cout << "=====================================================\n"; // std::cout << "Total Count : " << count; // std::cout << "\tTotal Write Bytes(MB) : " << write_bytes / 1048576.0 << std::endl; // std::cout << "Average Time(ms) : " << elapsed * 1.0 / count; // std::cout << "\tAverage Write rates(MB/s) : " << write_bytes / 1048576.0 / elapsed * 1000000 << std::endl; // std::cout << "Construct Time(ms) : " << construct_elapsed * 1.0 / count << std::endl; // std::cout << "\tConstruct BatchReq Init Time(ms) : " << construct_BatchReq_init_elapsed * 1.0 / count << std::endl; // std::cout << "\tConstruct BatchReq Time(ms) : " << construct_BatchReq_elapsed * 1.0 / count << std::endl; // std::cout << "\tConstruct BatchReq Sub Time(ms) : " << construct_BatchReq_Sub_elapsed * 1.0 / count << std::endl; // std::cout << "\tConstruct BatchReq perSub Time(ms) : " << construct_BatchReq_perSub_elapsed * 1.0 / count_Batch_Sub << std::endl; // std::cout << "\tConstruct FieldsReq Read Time(ms) : " << construct_FieldsReq_Read_elapsed * 1.0 / count << std::endl; // std::cout << "Write Time(ms) : " << write_elapsed * 1.0 / count << std::endl; // std::cout << "\tWrite Meta Time(ms) : " << write_meta_elapsed * 1.0 / count << std::endl; // std::cout << "\tWrite Index Time(ms) : " << write_index_elapsed * 1.0 / count << std::endl; // std::cout << "\tWrite KV Time(ms) : " << write_kv_elapsed * 1.0 / count << std::endl; // std::cout << "\tWrite Clean Time(ms) : " << write_clean_elapsed * 1.0 / count << std::endl; // std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl; // std::cout << "temp_elased : " << temp_elapsed * 1.0 / count << std::endl; // std::cout << "waiting elapsed : " << waiting_elasped * 1.0 / count << std::endl; // // std::cout << MetaBatch.ApproximateSize() << " " << IndexBatch.ApproximateSize() << " " << KVBatch.ApproximateSize() << std::endl; // std::cout << "=====================================================\n"; // write_bytes_lim = write_bytes + write_step; // std::fflush(stdout); // } // } }; Status DestroyDB(const std::string& name, const Options& options); } // end of namespace # endif