@ -0,0 +1,24 @@ | |||||
# ifndef SLICE_HASH_SET_H | |||||
# define SLICE_HASH_SET_H | |||||
#include "leveldb/slice.h" | |||||
#include "util/hash.h" | |||||
#include <unordered_set> | |||||
using namespace leveldb; | |||||
class SliceHash { | |||||
public: | |||||
uint32_t operator()(const Slice &lhs) const { | |||||
return Hash(lhs.data(),lhs.size(),0x1234); | |||||
} | |||||
}; | |||||
class SliceEq { | |||||
public: | |||||
bool operator()(const Slice &lhs, const Slice &rhs) const { | |||||
return lhs == rhs; | |||||
} | |||||
}; | |||||
// Unordered set of Slice that hashes with SliceHash and compares with SliceEq.
// NOTE(review): the set stores Slice objects, presumably without owning the
// referenced bytes, so the backing storage must outlive the set - confirm.
using SliceHashSet = std::unordered_set<Slice,SliceHash,SliceEq>; | |||||
#endif |
@ -0,0 +1,69 @@ | |||||
#include "fielddb/meta.h" | |||||
#include "util/coding.h" | |||||
#include <string> | |||||
#include "leveldb/options.h" | |||||
#include "leveldb/slice.h" | |||||
#include "leveldb/write_batch.h" | |||||
namespace fielddb { | |||||
using namespace leveldb; | |||||
// Slice MetaKV::metaKey() { | |||||
// std::string buf; | |||||
// PutLengthPrefixedSlice(&buf, Key); | |||||
// PutFixed64(&buf, meta_seq); | |||||
// PutFixed32(&buf, tag); | |||||
// return Slice(buf); | |||||
// } | |||||
// Slice MetaKV::metaValue() { | |||||
// return Slice(SerializeValue(Fields)); | |||||
// } | |||||
//对于含有index field的put/delete的meta编码为 (KV|Key,Value) | |||||
void MetaKV::TransPut(std::string &MetaKey,std::string &MetaValue) { | |||||
MetaKey.clear(); | |||||
MetaValue.clear(); | |||||
//这里的改动是为了防止潜在的段错误。原来的写法中,slice(buf)对应的buf是局部的,在函数返回后,buf被销毁 | |||||
//但是slice中的指针指向的是析构的string对象的部分内存 | |||||
std::string &buf = MetaKey; | |||||
PutFixed32(&buf, KV_Creating); | |||||
PutLengthPrefixedSlice(&buf, Slice(name)); | |||||
// MetaKey = Slice(buf); | |||||
// MetaValue = Slice(*value); | |||||
} | |||||
void MetaKV::TransDelete(std::string &MetaKey) { | |||||
MetaKey.clear(); | |||||
std::string &buf = MetaKey; | |||||
PutFixed32(&buf, KV_Deleting); | |||||
PutLengthPrefixedSlice(&buf, Slice(name)); | |||||
// MetaKey = Slice(buf); | |||||
} | |||||
class CleanerHandler : public WriteBatch::Handler { | |||||
public: | |||||
WriteBatch *NeedClean; | |||||
void Put(const Slice& key, const Slice& value) override { | |||||
//将所有之前put的meta数据进行delete | |||||
NeedClean->Delete(key); | |||||
} | |||||
void Delete(const Slice& key) override { | |||||
//所有的传入的MetaBatch都是Put的 | |||||
assert(0); | |||||
} | |||||
}; | |||||
// Records a Delete in NeedClean for every Put contained in MetaBatch.
// Batches whose ApproximateSize() is at most 12 are skipped
// (presumably 12 bytes is the size of a batch without records - confirm).
void MetaCleaner::Collect(WriteBatch &MetaBatch) {
  const bool hasRecords = MetaBatch.ApproximateSize() > 12;
  if (!hasRecords) {
    return;
  }
  CleanerHandler collector;
  collector.NeedClean = &NeedClean;
  MetaBatch.Iterate(&collector);
}
// Applies the collected deletes to metaDB.
// Nothing is written when NeedClean holds no records (size <= 12).
// After a successful write the collected deletes are dropped so that a reused
// MetaCleaner does not re-issue them (and keep growing) on the next call; on
// failure they are kept so the caller can try again.
void MetaCleaner::CleanMetaBatch(DB *metaDB) {
  if (NeedClean.ApproximateSize() <= 12) return;
  const Status status = metaDB->Write(WriteOptions(), &NeedClean);
  if (status.ok()) {
    NeedClean.Clear();
  }
}
} |
@ -0,0 +1,56 @@ | |||||
#pragma once | |||||
#include <cstdint> | |||||
#include <cstdio> | |||||
#include "leveldb/slice.h" | |||||
#include "leveldb/write_batch.h" | |||||
#include "util/serialize_value.h" | |||||
#include "fielddb/field_db.h" | |||||
namespace fielddb { | |||||
using namespace leveldb; | |||||
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ | |||||
// class MetaKV { | |||||
// MetaKV(Slice &Key,FieldArray Fields): | |||||
// Key(Key),Fields(Fields),tag(0),meta_seq(0) { } | |||||
// inline int get_seq() { return meta_seq; } | |||||
// inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } | |||||
// inline void setPut() { tag = PUT; } | |||||
// inline void setDelete() { tag = DELETE; } | |||||
// Slice metaKey(); | |||||
// Slice metaValue(); | |||||
// private: | |||||
// enum {PUT = 0x0,DELETE = 0x1}; | |||||
// uint64_t meta_seq; | |||||
// uint8_t tag; | |||||
// Slice &Key; | |||||
// FieldArray Fields; | |||||
// }; | |||||
// Tag written at the front of every metaDB key (see MetaKV).
enum MetaType {
  // Index,        // meta that records the state of an index (not used)
  KV_Creating = 0,  // meta for a put that contains an indexed field
  KV_Deleting = 1,  // meta for a delete
};
//将一对(field_name,field_value)转换到metaDB中的KV表示 | |||||
class MetaKV { | |||||
public: | |||||
MetaKV(Slice field_name,Slice field_value = Slice()): | |||||
name(field_name),value(field_value) { } | |||||
void TransPut(std::string &MetaKey,std::string &MetaValue); | |||||
void TransDelete(std::string &MetaKey); | |||||
private: | |||||
Slice name; | |||||
Slice value; | |||||
}; | |||||
class MetaCleaner { | |||||
public: | |||||
MetaCleaner() = default; | |||||
void Collect(WriteBatch &MetaBatch); | |||||
void CleanMetaBatch(DB *metaDB); | |||||
private: | |||||
WriteBatch NeedClean; | |||||
}; | |||||
} |
@ -1,20 +0,0 @@ | |||||
#include "fielddb/metakv.h" | |||||
#include "util/coding.h" | |||||
#include <string> | |||||
namespace fielddb { | |||||
using namespace leveldb; | |||||
// Builds the meta key as: length-prefixed Key, fixed64 meta_seq, fixed32 tag.
// NOTE(review): the returned Slice is constructed from the local `buf`, which
// is destroyed when this function returns, so the Slice presumably refers to
// freed memory. This cannot be fixed without changing the return type.
Slice MetaKV::metaKey() { | |||||
std::string buf; | |||||
PutLengthPrefixedSlice(&buf, Key); | |||||
PutFixed64(&buf, meta_seq); | |||||
PutFixed32(&buf, tag); | |||||
return Slice(buf); | |||||
} | |||||
// Returns the serialized Fields wrapped in a Slice.
// NOTE(review): the Slice is built from the temporary returned by
// SerializeValue(); if that is a std::string by value, the Slice dangles as
// soon as the statement ends - confirm.
Slice MetaKV::metaValue() { | |||||
return Slice(SerializeValue(Fields)); | |||||
} | |||||
} |
@ -1,26 +0,0 @@ | |||||
#pragma once | |||||
#include <cstdint> | |||||
#include <cstdio> | |||||
#include "leveldb/slice.h" | |||||
#include "util/serialize_value.h" | |||||
namespace fielddb { | |||||
using namespace leveldb; | |||||
/*根据写入的流程可以推断,需要存在metaDB中的数据其实都是带索引的数据,也就是FieldArray*/ | |||||
// Earlier MetaKV design: pairs a key with its FieldArray plus a sequence
// number and a PUT/DELETE tag.
// Everything before `private:` is also private, because this is a `class`
// and no `public:` label precedes the constructor and accessors.
class MetaKV { | |||||
// Stores a reference to Key (the referenced Slice must outlive this object)
// and a copy of Fields; tag and meta_seq start at 0.
MetaKV(Slice &Key,FieldArray Fields): | |||||
Key(Key),Fields(Fields),tag(0),meta_seq(0) { } | |||||
// Note: meta_seq is uint64_t but is read and written through int here.
inline int get_seq() { return meta_seq; } | |||||
inline void set_seq(int meta_seq) { this->meta_seq = meta_seq; } | |||||
inline void setPut() { tag = PUT; } | |||||
inline void setDelete() { tag = DELETE; } | |||||
Slice metaKey(); | |||||
Slice metaValue(); | |||||
private: | |||||
enum {PUT = 0x0,DELETE = 0x1}; | |||||
uint64_t meta_seq; | |||||
uint8_t tag; | |||||
Slice &Key; | |||||
FieldArray Fields; | |||||
}; | |||||
} |
@ -0,0 +1,439 @@ | |||||
#include "fielddb/request.h" | |||||
#include <cassert> | |||||
#include <cstdint> | |||||
#include <deque> | |||||
#include <string> | |||||
#include <unordered_set> | |||||
#include "leveldb/slice.h" | |||||
#include "leveldb/status.h" | |||||
#include "leveldb/write_batch.h" | |||||
#include "port/port_stdcxx.h" | |||||
#include "util/mutexlock.h" | |||||
#include "util/serialize_value.h" | |||||
#include "fielddb/encode_index.h" | |||||
#include "fielddb/field_db.h" | |||||
#include "fielddb/meta.h" | |||||
#include "request.h" | |||||
namespace fielddb { | |||||
using namespace leveldb; | |||||
//为虚函数提供最基本的实现 | |||||
void Request::PendReq(Request *req) { | |||||
assert(0); | |||||
} | |||||
//为虚函数提供最基本的实现 | |||||
void Request::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB, | |||||
SliceHashSet &batchKeySet) | |||||
{ | |||||
assert(0); | |||||
} | |||||
void Request::Prepare(FieldDB *DB) { | |||||
assert(0); | |||||
} | |||||
void Request::Finalize(FieldDB *DB) { | |||||
assert(0); | |||||
} | |||||
//为虚函数提供最基本的实现 | |||||
bool Request::isPending() { | |||||
//pending中的请求的parent会指向所等待的请求(iCreate/iDelete) | |||||
return parent != this; | |||||
} | |||||
/*******FieldsReq*******/ | |||||
void FieldsReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB, | |||||
SliceHashSet &batchKeySet) | |||||
{ | |||||
if (batchKeySet.find(Key) != batchKeySet.end()){ | |||||
return;//并发的被合并的put/delete请求只处理一次 | |||||
} else { | |||||
batchKeySet.insert(Key); | |||||
} | |||||
std::string val_str; | |||||
//uint64_t start_ = DB->env_->NowMicros(); | |||||
s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); | |||||
//DB->construct_FieldsReq_Read_elapsed += DB->env_->NowMicros() - start_; | |||||
// FieldArray *oldFields; | |||||
FieldSliceArray oldFields; | |||||
if (s.IsNotFound()){ | |||||
// oldFields = nullptr; | |||||
} else if (s.ok()) { //得到数据库之前key的fields, 判断需不需要删除其中潜在的索引 | |||||
// oldFields = new FieldArray; | |||||
// oldFields = ParseValue(val_str,oldFields); | |||||
Slice nameSlice, valSlice; | |||||
Slice Value(val_str); | |||||
while(GetLengthPrefixedSlice(&Value, &nameSlice)) { | |||||
if(GetLengthPrefixedSlice(&Value, &valSlice)) { | |||||
oldFields.push_back({nameSlice,valSlice}); | |||||
} else { | |||||
std::cout << "name and val not match! From FieldsReq Init" << std::endl; | |||||
} | |||||
nameSlice.clear(), valSlice.clear(); | |||||
} | |||||
} else { | |||||
assert(0); | |||||
} | |||||
bool HasIndex = false; | |||||
bool HasOldIndex = false; | |||||
{ | |||||
// MutexLock L(&DB->index_mu); //互斥访问索引状态表 | |||||
DB->index_mu.AssertHeld(); | |||||
//1.将存在冲突的put pend到对应的请求 | |||||
for(auto &[field_name,field_value] : SliceFields) { | |||||
if(field_name.data() == EMPTY) break; | |||||
if(DB->index_.count(field_name.ToString())) { | |||||
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; | |||||
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { | |||||
parent_req->PendReq(this->parent); | |||||
return; | |||||
} else if(index_status == IndexStatus::Exist) { | |||||
HasIndex = true; | |||||
} | |||||
//assert(0); | |||||
} | |||||
} | |||||
//冲突也可能存在于,需要删除旧数据的索引,但该索引正在创删中 | |||||
if (!oldFields.empty()){ | |||||
for(auto &[field_name,field_value] : oldFields) { | |||||
if(field_name.data() == EMPTY) break; | |||||
if(DB->index_.count(field_name.ToString())) { | |||||
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; | |||||
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { | |||||
parent_req->PendReq(this->parent); | |||||
return; | |||||
} else if(index_status == IndexStatus::Exist) { | |||||
HasOldIndex = true; | |||||
} | |||||
//assert(0); | |||||
} | |||||
} | |||||
} | |||||
std::string scrach = SerializeValue(SliceFields); | |||||
KVBatch.Put(Slice(Key), Slice(scrach)); | |||||
//2.对于没有冲突但含有索引操作的put,构建metaKV,这里直接将KV对简单编码后写入metaDB | |||||
if(HasIndex || HasOldIndex) { | |||||
std::string MetaKey,MetaValue; | |||||
std::string serialized = SerializeValue(SliceFields); | |||||
MetaKV MKV = MetaKV(Key,serialized); | |||||
MKV.TransPut(MetaKey, MetaValue); | |||||
MetaBatch.Put(MetaKey, serialized); | |||||
//3.1对于含有索引的oldfield删除索引 | |||||
if (HasOldIndex) { | |||||
for(auto &[field_name,field_value] : oldFields) { | |||||
if(field_name.data() == EMPTY) continue; | |||||
if(DB->index_.count(field_name.ToString())) { | |||||
std::string indexKey; | |||||
AppendIndexKey(&indexKey, ParsedInternalIndexKey( | |||||
Key,field_name,field_value)); | |||||
IndexBatch.Delete(indexKey); | |||||
} | |||||
} | |||||
} | |||||
//3.2对于含有索引的field建立索引 | |||||
if (HasIndex) { | |||||
for(auto &[field_name,field_value] : SliceFields) { | |||||
if(field_name.data() == EMPTY) continue; | |||||
if(DB->index_.count(field_name.ToString())) { | |||||
std::string indexKey; | |||||
AppendIndexKey(&indexKey, ParsedInternalIndexKey( | |||||
Key,field_name,field_value)); | |||||
IndexBatch.Put(indexKey, Slice()); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
//优化:对于3.1,3.2中都有的索引只写一次 | |||||
} | |||||
// if(oldFields) delete oldFields; | |||||
} | |||||
/*******DeleteReq*******/ | |||||
void DeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB, | |||||
SliceHashSet &batchKeySet) | |||||
{ | |||||
if (batchKeySet.find(Key) != batchKeySet.end()){ | |||||
return;//并发的被合并的put/delete请求只处理一次 | |||||
} else { | |||||
batchKeySet.insert(Key); | |||||
} | |||||
//1. 读取当前的最新的键值对,判断是否存在含有键值对的field | |||||
//2.1 如果无,则正常构造delete | |||||
//2.2 如果是有的field的索引状态都是exist,则在meta中写KV_Deleting类型的记录 | |||||
//在kvDB和metaDB中写入对应的delete | |||||
//2.3 如果存在field的索引状态是Creating或者Deleting,那么在那个队列上面进行等待 | |||||
std::string val_str; | |||||
Status s = DB->kvDB_->Get(ReadOptions(), Key, &val_str); | |||||
if (s.IsNotFound()) return; | |||||
// FieldArray *Fields = new FieldArray; | |||||
// ParseValue(val_str,Fields); | |||||
FieldSliceArray Fields; | |||||
Slice nameSlice, valSlice; | |||||
Slice Value(val_str); | |||||
while(GetLengthPrefixedSlice(&Value, &nameSlice)) { | |||||
if(GetLengthPrefixedSlice(&Value, &valSlice)) { | |||||
Fields.push_back({nameSlice,valSlice}); | |||||
} else { | |||||
std::cout << "name and val not match! From FieldsReq Init" << std::endl; | |||||
} | |||||
nameSlice.clear(), valSlice.clear(); | |||||
} | |||||
KVBatch.Delete(Slice(Key)); | |||||
bool HasIndex = false; | |||||
{ | |||||
// MutexLock L(&DB->index_mu); //互斥访问索引状态表 | |||||
DB->index_mu.AssertHeld(); | |||||
//1.将存在冲突的delete pend到对应的请求 | |||||
for(auto &[field_name,field_value] : Fields) { | |||||
if(field_name.data() == EMPTY) break; | |||||
if(DB->index_.count(field_name.ToString())) { | |||||
auto [index_status,parent_req] = DB->index_[field_name.ToString()]; | |||||
if(index_status == IndexStatus::Creating || index_status == IndexStatus::Deleting) { | |||||
parent_req->PendReq(this->parent); | |||||
return; | |||||
} else if(index_status == IndexStatus::Exist) { | |||||
HasIndex = true; | |||||
} | |||||
//assert(0); | |||||
} | |||||
} | |||||
KVBatch.Delete(Slice(Key)); | |||||
//2.对于没有冲突但含有索引操作的delete,构建metaKV,这里直接将KV对简单编码后写入metaDB | |||||
if(HasIndex) { | |||||
std::string MetaKey; | |||||
MetaKV MKV = MetaKV(Key); | |||||
MKV.TransDelete(MetaKey); //meta中写入一个delete不需要value | |||||
MetaBatch.Put(MetaKey, Slice()); | |||||
//3.对于含有索引的field删除索引 | |||||
for(auto &[field_name,field_value] : Fields) { | |||||
if(field_name.data() == EMPTY) continue; | |||||
if(DB->index_.count(field_name.ToString())) { | |||||
std::string indexKey; | |||||
AppendIndexKey(&indexKey, ParsedInternalIndexKey( | |||||
Key,field_name,field_value)); | |||||
IndexBatch.Delete(indexKey); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
// delete Fields; | |||||
} | |||||
/*******iCreateReq*******/ | |||||
void iCreateReq::Prepare(FieldDB *DB) { | |||||
//在index_中完成索引状态更新,在这里可以避免重复创建 | |||||
DB->index_mu.AssertHeld(); | |||||
if(DB->index_.count(Field.ToString())) { | |||||
auto [istatus,parent] = DB->index_[Field.ToString()]; | |||||
if(istatus == IndexStatus::Exist) { | |||||
//如果已经完成建立索引,则返回成功 | |||||
done = true; | |||||
Existed = true; | |||||
s = Status::OK(); | |||||
} else { | |||||
//如果正在创建或删除,那么进行等待 | |||||
parent->PendReq(this->parent); | |||||
} | |||||
return; | |||||
} | |||||
//如果索引状态表中没有,则表示尚未创建,更新相应的状态 | |||||
//这里将done设置为true表示在taskqueue中需要完成的部分已经完成,不需要pend | |||||
DB->index_[Field.ToString()] = {IndexStatus::Creating,this}; | |||||
done = true; | |||||
} | |||||
// Queues `req` behind this index creation and marks it as pending by
// pointing its parent at this request.
void iCreateReq::PendReq(Request *req) {
  pending_list.push_back(req);
  req->parent = this;
}
// Builds the secondary index for Field: for every (key, field value) pair
// returned by DB->FindKeysAndValByFieldName an index key with an empty value
// is put into IndexBatch. KVBatch, MetaBatch and batchKeySet are not touched.
void iCreateReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
    WriteBatch &MetaBatch,fielddb::FieldDB *DB,
    SliceHashSet &batchKeySet)
{
  std::vector<std::pair<std::string, std::string>> matches =
      DB->FindKeysAndValByFieldName(Field.ToString());
  const Slice emptyValue = Slice();
  for (auto &[userKey, fieldValue] : matches) {
    std::string indexKey;
    AppendIndexKey(&indexKey,
                   ParsedInternalIndexKey(userKey, Field, fieldValue));
    IndexBatch.Put(indexKey, emptyValue);
  }
}
// Called once the index has been written.
//  1. Marks the index as Exist in DB->index_.
//  2. Re-enqueues every request that was pended on this creation into
//     DB->taskqueue_, detaches it (parent = itself) and, if the first one is
//     now at the front of the queue, signals it.
// `s` is set to OK only when pended requests were re-enqueued.
void iCreateReq::Finalize(FieldDB *DB) {
  {
    // Scoped lock: MutexLock releases index_mu in its destructor. The former
    // explicit index_mu.Unlock() on top of the guard unlocked the mutex twice.
    MutexLock iL(&DB->index_mu);
    DB->index_[Field.ToString()] = {IndexStatus::Exist, nullptr};
  }

  if (pending_list.empty()) return;

  MutexLock L(&DB->mutex_);
  for (auto req : pending_list) {
    DB->taskqueue_.push_back(req);
    req->parent = req;  // detach from this request
  }
  if (pending_list[0] == DB->taskqueue_.front()) {
    pending_list[0]->cond_.Signal();
  }
  this->s = Status::OK();
}
/*******iDeleteReq*******/ | |||||
void iDeleteReq::Prepare(FieldDB *DB) { | |||||
DB->index_mu.AssertHeld(); | |||||
if(DB->index_.count(Field.ToString()) == 0) { | |||||
done = true; | |||||
Deleted = true; | |||||
s = Status::OK(); | |||||
return ; | |||||
} | |||||
auto [istatus,parent] = DB->index_[Field.ToString()]; | |||||
if(istatus == IndexStatus::Exist) { | |||||
DB->index_[Field.ToString()] = {IndexStatus::Deleting,this}; | |||||
done = true; | |||||
} else { | |||||
//如果正在创建或者删除,那么pend到对应的请求上 | |||||
parent->PendReq(this->parent); | |||||
} | |||||
} | |||||
// Queues `req` behind this index deletion and marks it as pending by
// pointing its parent at this request.
void iDeleteReq::PendReq(Request* req) {
  pending_list.push_back(req);
  req->parent = this;
}
// Removes the secondary index for Field: for every (key, field value) pair
// returned by DB->FindKeysAndValByFieldName the matching index key is deleted
// in IndexBatch. KVBatch, MetaBatch and batchKeySet are not touched.
void iDeleteReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
    WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
  std::vector<std::pair<std::string, std::string>> matches =
      DB->FindKeysAndValByFieldName(Field);
  for (auto &[userKey, fieldValue] : matches) {
    std::string indexKey;
    AppendIndexKey(&indexKey,
                   ParsedInternalIndexKey(userKey, Field, fieldValue));
    IndexBatch.Delete(indexKey);
  }
}
// Called once the index entries have been removed.
//  1. Erases the index from DB->index_.
//  2. Re-enqueues every request that was pended on this deletion into
//     DB->taskqueue_, detaches it (parent = itself) and, if the first one is
//     now at the front of the queue, signals it.
// `s` is set to OK only when pended requests were re-enqueued.
void iDeleteReq::Finalize(FieldDB *DB) {
  {
    // Scoped lock: MutexLock releases index_mu in its destructor. The former
    // explicit index_mu.Unlock() on top of the guard unlocked the mutex twice.
    MutexLock iL(&DB->index_mu);
    DB->index_.erase(Field.ToString());
  }

  if (pending_list.empty()) return;

  MutexLock L(&DB->mutex_);
  for (auto req : pending_list) {
    DB->taskqueue_.push_back(req);
    req->parent = req;  // detach from this request
  }
  if (pending_list[0] == DB->taskqueue_.front()) {
    pending_list[0]->cond_.Signal();
  }
  this->s = Status::OK();
}
// Splits a user WriteBatch into sub-requests: every Put becomes a FieldsReq
// and every Delete a DeleteReq, appended to sub_requests in batch order.
// Each sub-request's parent is this BatchReq.
// All values in the WriteBatch are assumed to be field-encoded.
BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu)
    : Request(BatchReq_t, mu), Batch(Batch) {
  struct SubRequestCollector : WriteBatch::Handler {
    SubRequestCollector(BatchReq *owner, port::Mutex *mutex,
                        std::deque<Request*> *out)
        : owner_(owner), mutex_(mutex), out_(out) {}

    void Put(const Slice &key, const Slice &value) override {
      out_->emplace_back(new FieldsReq(key, value, mutex_));
      out_->back()->parent = owner_;
    }
    void Delete(const Slice &key) override {
      out_->emplace_back(new DeleteReq(key, mutex_));
      out_->back()->parent = owner_;
    }

    BatchReq *owner_;
    port::Mutex *mutex_;
    std::deque<Request*> *out_;
  };

  SubRequestCollector collector(this, mu, &sub_requests);
  Batch->Iterate(&collector);
}
// Frees the sub-requests created by the constructor, front to back.
BatchReq::~BatchReq() {
  for (Request *sub : sub_requests) {
    delete sub;
  }
  sub_requests.clear();
}
// Builds the batches for all sub-requests into private scratch batches and
// merges them into the caller's batches only if no sub-request had to pend.
//
// Sub-requests are visited in reverse because only the first request seen
// for a key is applied, and within a batch the last write must win.
// PendReq() is always invoked with this->parent, which for a sub-request is
// this BatchReq; so isPending() on the BatchReq tells whether the sub-request
// just processed hit a conflict. In that case the scratch batches are dropped.
void BatchReq::ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch,
    WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet)
{
  WriteBatch scratchKV, scratchIndex, scratchMeta;
  SliceHashSet scratchKeys;

  for (auto it = sub_requests.rbegin(); it != sub_requests.rend(); ++it) {
    Request *sub = *it;
    sub->ConstructBatch(scratchKV, scratchIndex, scratchMeta, DB, scratchKeys);
    if (isPending()) {
      return;
    }
  }

  // Append only batches that hold records (size above 12).
  const auto mergeInto = [](WriteBatch &dst, const WriteBatch &src) {
    if (src.ApproximateSize() > 12) {
      dst.Append(src);
    }
  };
  mergeInto(KVBatch, scratchKV);
  mergeInto(IndexBatch, scratchIndex);
  mergeInto(MetaBatch, scratchMeta);

  batchKeySet.insert(scratchKeys.begin(), scratchKeys.end());
}
} // namespace fielddb |
@ -1,25 +1,171 @@ | |||||
#include <deque> | |||||
#include <string> | #include <string> | ||||
#include "leveldb/slice.h" | |||||
#include "leveldb/status.h" | |||||
#include "leveldb/write_batch.h" | |||||
#include "port/port_stdcxx.h" | #include "port/port_stdcxx.h" | ||||
#include "util/coding.h" | |||||
#include "util/mutexlock.h" | #include "util/mutexlock.h" | ||||
#include "util/serialize_value.h" | #include "util/serialize_value.h" | ||||
#include <unordered_set> | |||||
// #include "fielddb/field_db.h" | |||||
#include "fielddb/SliceHashSet.h" | |||||
#ifndef REQUEST_H | |||||
#define REQUEST_H | |||||
namespace fielddb { | namespace fielddb { | ||||
using namespace leveldb; | using namespace leveldb; | ||||
// 在taskqueue中的Request,由taskqueue最开始的线程处理一批Request | // 在taskqueue中的Request,由taskqueue最开始的线程处理一批Request | ||||
// 这个思路与write写入的思路类似 | // 这个思路与write写入的思路类似 | ||||
class FieldDB; | |||||
class Request { | class Request { | ||||
public: | public: | ||||
Request(std::string *Key,std::string *Value,port::Mutex *mu): | |||||
Key(Key),Value(Value),hasFields(false),_cond(mu) { } | |||||
Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): | |||||
Key(Key),Fields(Fields),hasFields(false),_cond(mu) { } | |||||
friend class FieldDB; | |||||
enum RequestType { | |||||
FieldsReq_t, | |||||
//ValueReq_t, | |||||
iCreateReq_t, | |||||
iDeleteReq_t, | |||||
DeleteReq_t, | |||||
BatchReq_t, | |||||
}; | |||||
public: | |||||
// Request(std::string *Key,std::string *Value,port::Mutex *mu): | |||||
// Key(Key),Value(Value),hasFields(false),cond_(mu) { } | |||||
// Request(std::string *Key,FieldArray *Fields,port::Mutex *mu): | |||||
// Key(Key),Fields(Fields),hasFields(true),cond_(mu) { } | |||||
Request(RequestType type,port::Mutex *mu): | |||||
type_(type),cond_(mu),done(false) { parent = this; }; | |||||
//virtual ~Request(); | |||||
inline bool isFieldsReq() { return type_ == FieldsReq_t; } | |||||
// inline bool isValueReq() { return type_ == ValueReq_t; } | |||||
inline bool isiCreateReq() { return type_ == iCreateReq_t; } | |||||
inline bool isiDeleteReq() { return type_ == iDeleteReq_t; } | |||||
inline bool isDeleteReq() { return type_ == DeleteReq_t; } | |||||
inline bool isBatchReq() { return type_ == BatchReq_t; } | |||||
private: | |||||
//用于含有Fields的 | |||||
virtual void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet); | |||||
//主要用于icreate和idelete在队列中的注册当前状态 | |||||
virtual void Prepare(FieldDB *DB); | |||||
virtual void Finalize(FieldDB *DB); | |||||
virtual void PendReq(Request *req); | |||||
bool isPending(); | |||||
// protected: | |||||
bool done; | bool done; | ||||
port::CondVar _cond; | |||||
Status s; | |||||
port::CondVar cond_; | |||||
RequestType type_; | |||||
Request *parent; | |||||
}; | |||||
//含有field的put | |||||
class FieldsReq : public Request { | |||||
public: | |||||
FieldsReq(Slice Key,const FieldArray &Fields,port::Mutex *mu): | |||||
Key(Key),Request(FieldsReq_t,mu) { | |||||
for(auto &[name,value] : Fields) { | |||||
SliceFields.push_back({name,value}); | |||||
} | |||||
}; | |||||
FieldsReq(Slice Key, Slice Value,port::Mutex *mu): | |||||
Key(Key),Request(FieldsReq_t,mu) { | |||||
Slice nameSlice, valSlice; | |||||
while(GetLengthPrefixedSlice(&Value, &nameSlice)) { | |||||
if(GetLengthPrefixedSlice(&Value, &valSlice)) { | |||||
SliceFields.push_back({nameSlice,valSlice}); | |||||
} else { | |||||
std::cout << "name and val not match! From FieldsReq Init" << std::endl; | |||||
} | |||||
nameSlice.clear(), valSlice.clear(); | |||||
} | |||||
} | |||||
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; | |||||
Slice Key; | |||||
FieldSliceArray SliceFields; | |||||
}; | |||||
//不含有field的put,但是计划被弃用了 | |||||
// class ValueReq : public Request { | |||||
// public: | |||||
// ValueReq(std::string *Key,std::string *Value,port::Mutex *mu): | |||||
// Key(Key),Value(Value),Request(ValueReq_t,mu) { }; | |||||
// std::string *Key; | |||||
// std::string *Value; | |||||
// }; | |||||
// (The hasFields / Key / Value / Fields declarations of the previous Request
// layout were left here at namespace scope; they are not members of any class
// and would define non-inline globals in this header, so they are removed.)
//TODO:下面的Field什么的可能通过传引用的方式会更加好? | |||||
//创建索引的request | |||||
class iCreateReq : public Request { | |||||
public: | |||||
iCreateReq(Slice Field,port::Mutex *mu): | |||||
Field(Field),Request(iCreateReq_t, mu),Existed(false) { }; | |||||
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; | |||||
void Prepare(FieldDB *DB) override; | |||||
void Finalize(FieldDB *DB) override; | |||||
void PendReq(Request *req) override; | |||||
bool Existed; | |||||
Slice Field; | |||||
std::deque<Request *> pending_list; | |||||
}; | }; | ||||
//删除索引的request | |||||
class iDeleteReq : public Request { | |||||
public: | |||||
iDeleteReq(Slice Field,port::Mutex *mu): | |||||
Field(Field),Request(iDeleteReq_t, mu),Deleted(false) { }; | |||||
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; | |||||
void Prepare(FieldDB *DB) override; | |||||
void Finalize(FieldDB *DB) override; | |||||
void PendReq(Request *req) override; | |||||
bool Deleted; | |||||
Slice Field; | |||||
std::deque<Request *> pending_list; | |||||
}; | |||||
//删除key的request | |||||
class DeleteReq : public Request { | |||||
public: | |||||
DeleteReq(Slice Key,port::Mutex *mu): | |||||
Key(Key),Request(DeleteReq_t,mu) { }; | |||||
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; | |||||
Slice Key; | |||||
}; | |||||
class BatchReq : public Request { | |||||
public: | |||||
BatchReq(WriteBatch *Batch,port::Mutex *mu); | |||||
~BatchReq(); | |||||
void ConstructBatch(WriteBatch &KVBatch,WriteBatch &IndexBatch, | |||||
WriteBatch &MetaBatch,fielddb::FieldDB *DB,SliceHashSet &batchKeySet) override; | |||||
WriteBatch *Batch; | |||||
std::deque<Request *> sub_requests; | |||||
// std::deque<std::string> str_buf; | |||||
// std::deque<FieldArray> fa_buf; | |||||
}; | |||||
} | } | ||||
#endif |
@ -0,0 +1,284 @@ | |||||
#include "gtest/gtest.h" | |||||
// #include "leveldb/env.h" | |||||
// #include "leveldb/db.h" | |||||
#include "fielddb/field_db.h" | |||||
#include <random> | |||||
#include "helper.h" | |||||
using namespace fielddb; | |||||
constexpr int value_size = 2048; | |||||
constexpr int data_size = 128 << 20; | |||||
#define AGE_RANGE 100 | |||||
std::vector<std::string> cities = { | |||||
"Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou", | |||||
"Chengdu", "Chongqing", "Wuhan", "Suzhou", "Tianjin" | |||||
}; | |||||
// Used to check that inserted data and queryByIndex results agree.
// Both are thread-safe global sets.
ThreadSafeSet shanghaiKeys; | |||||
ThreadSafeSet age20Keys; | |||||
// Complex tests must keep these two global sets in mind:
// currently only InsertFieldData, InsertOneField and WriteFieldData add to them,
// DeleteFieldData and DeleteOneField remove from them,
// and between other tests they have to be cleared manually.
const WriteOptions op; | |||||
// Opens (creating it if missing) the FieldDB named dbName and stores it in *db.
Status OpenDB(std::string dbName, FieldDB **db) {
  Options openOptions;
  openOptions.create_if_missing = true;
  Status result = FieldDB::OpenFieldDB(openOptions, dbName, db);
  return result;
}
// void ClearDB(FieldDB *db){ | |||||
// //destroy和恢复没做前先用这个清理数据库,否则跑不同的数据多做几次测试会污染 | |||||
// WriteOptions writeOptions; | |||||
// int key_num = data_size / value_size; | |||||
// for (int i = 0; i < key_num; i++) { | |||||
// int key_ = i+1; | |||||
// std::string key = std::to_string(key_); | |||||
// Status s = db->Delete(WriteOptions(), key); | |||||
// ASSERT_TRUE(s.ok()); | |||||
// } | |||||
// } | |||||
// Inserts one fixed record (name "special#<key>", address "Shanghai", age "20")
// under `key` and, once the put succeeded, records the key in both global sets.
void InsertOneField(FieldDB *db, std::string key = "0") {
  WriteOptions writeOptions;
  FieldArray record = {
    {"name", "special#" + key},
    {"address", "Shanghai"},
    {"age", "20"}
  };
  Status putStatus = db->PutFields(WriteOptions(), key, record);
  ASSERT_TRUE(putStatus.ok());
  shanghaiKeys.insert(key);
  age20Keys.insert(key);
}
// Deletes `key` and, once the delete succeeded, removes it from both global sets.
void DeleteOneField(FieldDB *db, std::string key = "0") {
  WriteOptions writeOptions;
  Status deleteStatus = db->Delete(WriteOptions(), key);
  ASSERT_TRUE(deleteStatus.ok());
  shanghaiKeys.erase(key);
  age20Keys.erase(key);
}
// Counterpart of InsertOneField: reads `key` and checks every returned field
// against the fixed record written there.
void GetOneField(FieldDB *db, std::string key = "0") {
  ReadOptions readOptions;
  FieldArray fields_ret;
  Status getStatus = db->GetFields(readOptions, key, &fields_ret);
  ASSERT_TRUE(getStatus.ok());
  for (const Field& entry : fields_ret) {
    const auto &fieldName = entry.first;
    const auto &fieldValue = entry.second;
    if (fieldName == "name") {
      ASSERT_EQ(fieldValue, "special#" + key);
    } else if (fieldName == "address") {
      ASSERT_EQ(fieldValue, "Shanghai");
    } else if (fieldName == "age") {
      ASSERT_EQ(fieldValue, "20");
    } else {
      assert(false);
    }
  }
}
// Writes data_size / value_size records with PutFields. Key, city and age are
// all derived from ONE random draw per iteration so that readers replaying the
// same seed see the same sequence (std::mt19937 is used because srand is not
// thread-safe). Keys generated here are > 0; single inserts use keys <= 0.
// Keys whose address is "Shanghai" / whose age is "20" are recorded in the
// global sets before the put is issued.
// NOTE(review): a key drawn again later with another address/age is not
// removed from the sets - confirm that the checks tolerate this.
void InsertFieldData(FieldDB *db, int seed = 0/*random seed*/) {
  std::cout << "-------inserting-------" << std::endl;
  WriteOptions writeOptions;
  int key_num = data_size / value_size;
  std::mt19937 rng(seed);
  for (int i = 0; i < key_num; i++) {
    int draw = rng();  // exactly one draw per iteration
    int keyId = std::abs(draw) % key_num + 1;
    std::string key = std::to_string(keyId);
    std::string name = "customer#" + std::to_string(keyId);
    std::string address = cities[draw % cities.size()];
    std::string age = std::to_string(std::abs(draw) % AGE_RANGE);
    FieldArray record = {
      {"name", name},
      {"address", address},
      {"age", age}
    };
    if (address == "Shanghai") shanghaiKeys.insert(key);
    if (age == "20") age20Keys.insert(key);
    Status putStatus = db->PutFields(WriteOptions(), key, record);
    ASSERT_TRUE(putStatus.ok());
  }
}
// Deletes the keys that InsertFieldData / WriteFieldData generate for the same
// seed (same single draw per iteration) and empties both global sets first.
void DeleteFieldData(FieldDB *db, int seed = 0/*random seed*/) {
  std::cout << "-------deleting-------" << std::endl;
  WriteOptions writeOptions;
  int key_num = data_size / value_size;
  std::mt19937 rng(seed);
  shanghaiKeys.clear();
  age20Keys.clear();
  for (int i = 0; i < key_num; i++) {
    int draw = rng();  // exactly one draw per iteration
    int keyId = std::abs(draw) % key_num + 1;
    std::string key = std::to_string(keyId);
    Status deleteStatus = db->Delete(WriteOptions(), key);
    ASSERT_TRUE(deleteStatus.ok());
  }
}
// Builds one WriteBatch holding key_num (= data_size / value_size)
// pseudo-random customer records and applies it to `db` with a single Write.
//
// db:   database that receives the batch.
// seed: seed of the local std::mt19937 engine; the same seed always produces
//       the same sequence of keys and field values.
//
// Keys whose address is "Shanghai" are also recorded in shanghaiKeys, and keys
// whose age is "20" in age20Keys. A failing Write leaves the function through
// ASSERT_TRUE.
void WriteFieldData(FieldDB *db, int seed = 0 /* random seed */) {
  std::cout << "-------writing-------" << std::endl;
  WriteOptions writeOptions;
  const int key_num = data_size / value_size;
  // srand is not thread-safe; a local engine keeps the sequence identical for a
  // given seed even when several threads run at the same time.
  std::mt19937 rng(seed);
  WriteBatch wb;
  for (int round = 0; round < key_num; ++round) {
    // Draw exactly once per iteration so the sequence stays reproducible.
    const int draw = rng();
    const int magnitude = std::abs(draw);
    // Keys produced here are `magnitude % key_num + 1`.
    const int keyNumber = magnitude % key_num + 1;
    const std::string key = std::to_string(keyNumber);
    const std::string name = "customer#" + key;
    const std::string address = cities[draw % cities.size()];
    const std::string age = std::to_string(magnitude % AGE_RANGE);

    // Remember which keys should later be found by the address / age checks.
    if (address == "Shanghai") {
      shanghaiKeys.insert(key);
    }
    if (age == "20") {
      age20Keys.insert(key);
    }

    FieldArray fields = {
      {"name", name},
      {"address", address},
      {"age", age}
    };
    // The batch stores the serialized field array as the value.
    wb.Put(key, SerializeValue(fields));
  }
  Status s = db->Write(writeOptions, &wb);
  ASSERT_TRUE(s.ok());
}
// Point-reads 100 keys generated from `seed` and validates the returned fields.
//
// db:            database to read from.
// allowNotFound: under concurrency a key may not be readable yet. When true, a
//                NotFound result is skipped; when false, every lookup must
//                succeed.
// seed:          seed of the local std::mt19937 engine used to derive the keys.
//
// For every record that is checked, "address" must be one of `cities` and
// "age" must lie in [0, AGE_RANGE); "name" is not checked. Any other field
// name trips assert(false).
void GetFieldData(FieldDB *db, bool allowNotFound, int seed = 0) {
  std::cout << "-------getting-------" << std::endl;
  ReadOptions readOptions;
  const int key_num = data_size / value_size;
  // Point lookups.
  std::mt19937 rng(seed);
  for (int lookup = 0; lookup < 100; ++lookup) {
    const int draw = rng();
    const int keyNumber = std::abs(draw) % key_num + 1;
    const std::string key = std::to_string(keyNumber);
    FieldArray fields_ret;
    Status s = db->GetFields(readOptions, key, &fields_ret);
    if (!allowNotFound) {
      // The key must be readable. (Debug aid: print `key` here when it is not.)
      ASSERT_TRUE(s.ok());
    } else if (s.IsNotFound()) {
      // Missing keys are tolerated, but whatever is read must still be valid.
      continue;
    }
    for (const Field& pairs : fields_ret) {
      if (pairs.first == "name") {
        continue;  // name is not validated
      }
      if (pairs.first == "address") {
        std::string city = pairs.second;
        ASSERT_NE(std::find(cities.begin(), cities.end(), city), cities.end());
        continue;
      }
      if (pairs.first == "age") {
        int age = std::stoi(pairs.second);
        ASSERT_TRUE(age >= 0 && age < AGE_RANGE);
        continue;
      }
      assert(false);  // unexpected field name
    }
  }
}
// Checks that the keys belonging to `seed` were completely deleted: the first
// 100 keys of that seed's sequence must all read back as NotFound.
//
// No other seed may be written concurrently while the deletion is verified,
// because different seeds can still generate the same key.
//
// db:   database to read from.
// seed: seed of the local std::mt19937 engine used to derive the keys.
void GetDeleteData(FieldDB *db, int seed = 0) {
  std::cout << "-------getting-------" << std::endl;
  ReadOptions readOptions;
  const int key_num = data_size / value_size;
  std::mt19937 rng(seed);
  for (int lookup = 0; lookup < 100; ++lookup) {
    const int draw = rng();
    const int keyNumber = std::abs(draw) % key_num + 1;
    const std::string key = std::to_string(keyNumber);
    FieldArray fields_ret;
    Status s = db->GetFields(readOptions, key, &fields_ret);
    ASSERT_TRUE(s.IsNotFound());
  }
}
// Looks up every key whose address is "Shanghai" through FindKeysByField and
// asserts that each returned key was recorded in shanghaiKeys.
//
// The two printed numbers are the size of shanghaiKeys and the number of keys
// returned. A Shanghai key may have been overwritten later by a record with a
// different address, so the second number should be smaller than the first;
// with the same random seed both numbers should repeat from run to run.
void findKeysByCity(FieldDB *db) {
  std::cout << "-------getting field address-------" << std::endl;
  Field field = {"address", "Shanghai"};
  std::vector<std::string> resKeys = db->FindKeysByField(field);
  std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
  for (size_t idx = 0; idx < resKeys.size(); ++idx) {
    const std::string &key = resKeys[idx];
    ASSERT_TRUE(shanghaiKeys.haveKey(key));
  }
}
// Queries the "address" index for "Shanghai" and validates the answer.
//
// db:        database to query.
// haveIndex: whether the database is expected to have the address index.
//            When false, QueryByIndex must report NotFound and nothing else is
//            checked. When true, the query must succeed and every returned key
//            must have been recorded in shanghaiKeys.
void findKeysByCityIndex(FieldDB *db, bool haveIndex) {
  std::cout << "-------getting field address by index-------" << std::endl;
  Field field = {"address", "Shanghai"};
  Status s;
  std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
  if (!haveIndex) {
    ASSERT_TRUE(s.IsNotFound());
    return;
  }
  ASSERT_TRUE(s.ok());
  // Print both sizes for comparison.
  std::cout << "address: " << shanghaiKeys.size() << " " << resKeys.size() << std::endl;
  for (size_t idx = 0; idx < resKeys.size(); ++idx) {
    const std::string &key = resKeys[idx];
    ASSERT_TRUE(shanghaiKeys.haveKey(key));
  }
}
// Queries the "age" index for "20" and validates the answer.
//
// db:        database to query.
// haveIndex: whether the database is expected to have the age index. When
//            false, QueryByIndex must report NotFound and nothing else is
//            checked. When true, the query must succeed and every returned key
//            must have been recorded in age20Keys.
void findKeysByAgeIndex(FieldDB *db, bool haveIndex) {
  std::cout << "-------getting field age by index-------" << std::endl;
  Field field = {"age", "20"};
  Status s;
  std::vector<std::string> resKeys = db->QueryByIndex(field, &s);
  if (!haveIndex) {
    ASSERT_TRUE(s.IsNotFound());
    return;
  }
  ASSERT_TRUE(s.ok());
  // Print both sizes for comparison.
  std::cout << "age: " << age20Keys.size() << " " << resKeys.size() << std::endl;
  for (size_t idx = 0; idx < resKeys.size(); ++idx) {
    const std::string &key = resKeys[idx];
    ASSERT_TRUE(age20Keys.haveKey(key));
  }
}
// Verifies that the index and the key-value data agree for one probe value:
// the keys returned by QueryByIndex must equal, after sorting, the keys
// returned by FindKeysByField.
//
// db:        database to check.
// fieldName: "address" (probe value "Shanghai") or "age" (probe value "20");
//            any other name trips assert(0).
void checkDataInKVAndIndex(FieldDB *db, std::string fieldName = "address") {
  Field field;
  if (fieldName == "age") {
    field = {"age", "20"};
  } else if (fieldName == "address") {
    field = {"address", "Shanghai"};
  } else {
    assert(0);  // only these two fields are supported by this check
  }
  Status s;
  // Keys found by indexdb through the index.
  std::vector<std::string> resKeys1 = db->QueryByIndex(field, &s);
  // Keys found by a brute-force traversal of kvdb.
  std::vector<std::string> resKeys2 = db->FindKeysByField(field);
  std::sort(resKeys1.begin(), resKeys1.end());
  std::sort(resKeys2.begin(), resKeys2.end());
  std::cout << resKeys1.size() << " " << resKeys2.size() << std::endl;
  ASSERT_EQ(resKeys1, resKeys2);
}
@ -0,0 +1,34 @@ | |||||
#include "fielddb/field_db.h" | |||||
using namespace fielddb; | |||||
// A std::set<std::string> whose every operation runs under one mutex, so the
// set can be shared between the test threads that record and check keys.
class ThreadSafeSet
{
private:
  std::set<std::string> keys;
  std::mutex setMutex;  // guards every access to `keys`

public:
  ThreadSafeSet(){}

  // Adds `key`; inserting a key that is already present has no effect.
  void insert(std::string key){
    std::lock_guard<std::mutex> lock(setMutex);
    keys.insert(key);
  }

  // Removes `key` if it is present.
  void erase(std::string key){
    std::lock_guard<std::mutex> lock(setMutex);
    keys.erase(key);
  }

  // Removes all keys.
  void clear(){
    std::lock_guard<std::mutex> lock(setMutex);
    keys.clear();
  }

  // Returns the number of stored keys.
  size_t size(){
    std::lock_guard<std::mutex> lock(setMutex);
    return keys.size();
  }

  // Returns true when `key` is stored.
  // Takes the lock like the other members: reading the set while another
  // thread inserts, erases or clears is a data race. Uses the set's own
  // lookup instead of a linear std::find over all elements.
  bool haveKey(std::string key){
    std::lock_guard<std::mutex> lock(setMutex);
    return keys.find(key) != keys.end();
  }
};
@ -0,0 +1,302 @@ | |||||
#include "gtest/gtest.h" | |||||
#include <thread> | |||||
// #include "leveldb/env.h" | |||||
// #include "leveldb/db.h" | |||||
#include "fielddb/field_db.h" | |||||
#include "test/helper.cc" | |||||
using namespace fielddb; | |||||
// 测试中read/write都表示带索引的读写 | |||||
// Concurrent reads and writes of indexed data: two writer threads (seeds 0 and
// 1) run alongside three reader threads, then the result is verified.
TEST(TestReadPut, Parallel) {
  fielddb::DestroyDB("testdb2.1",Options());
  FieldDB *db = new FieldDB();
  if (!OpenDB("testdb2.1", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);

  // Two writers, three readers.
  std::vector<std::thread> workers;
  workers.reserve(5);
  workers.emplace_back([db]() { InsertFieldData(db, 0); });  // writes sequence 0
  workers.emplace_back([db]() { InsertFieldData(db, 1); });  // writes sequence 1
  for (int reader = 0; reader < 3; ++reader) {
    // Readers may run before the writers finish, so NotFound is tolerated.
    workers.emplace_back([db]() { GetFieldData(db, true, 0); });
  }
  for (std::thread &worker : workers) {
    worker.join();
  }

  // Both writers are done, so both sequences must now be readable.
  bool allowNotFound = false;
  GetFieldData(db, allowNotFound);
  GetFieldData(db, allowNotFound, 1);
  findKeysByCity(db);
  checkDataInKVAndIndex(db);
  delete db;
}
// Index creation racing with a write of data that carries the indexed field.
TEST(TestPutCreatei, Parallel) {
  fielddb::DestroyDB("testdb2.2",Options());
  FieldDB *db = new FieldDB();
  if (!OpenDB("testdb2.2", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  shanghaiKeys.clear();
  InsertFieldData(db);

  // Thread 1: create the index.
  std::thread indexBuilder([db]() {
    db->CreateIndexOnField("address", op);
    std::cout << "finish create index\n";
  });
  // Thread 2: spin until the index has left the NotExist state, then insert
  // one record concurrently with the creation.
  std::thread writer([db]() {
    while (db->GetIndexStatus("address") == NotExist) {
    }
    InsertOneField(db);
  });
  indexBuilder.join();
  writer.join();

  // The index must have been created.
  bool haveIndex = true;
  findKeysByCityIndex(db, haveIndex);
  // The concurrent write must have succeeded.
  GetOneField(db);
  checkDataInKVAndIndex(db);
  delete db;
}
// Concurrent creation and deletion of indexes, including different indexes at
// the same time.
TEST(TestCreateiCreatei, Parallel) {
  fielddb::DestroyDB("testdb2.3",Options());
  FieldDB *db = new FieldDB();
  if (!OpenDB("testdb2.3", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  shanghaiKeys.clear();
  age20Keys.clear();
  InsertFieldData(db);

  // Round 1: three threads create the address index concurrently.
  std::vector<std::thread> workers;
  workers.reserve(3);
  for (int t = 0; t < 3; ++t) {
    workers.emplace_back([db]() {
      db->CreateIndexOnField("address", op);
      std::cout << "finish create index address\n";
    });
  }
  for (std::thread &worker : workers) {
    worker.join();
  }

  // The address index must exist; the age index must not.
  bool haveIndex = true;
  findKeysByCityIndex(db, haveIndex);
  findKeysByAgeIndex(db, false);
  checkDataInKVAndIndex(db);

  // Round 2: two threads delete the address index while a third creates the
  // age index.
  workers.clear();
  for (int t = 0; t < 2; ++t) {
    workers.emplace_back([db]() {
      db->DeleteIndex("address", op);
      std::cout << "finish delete index address\n";
    });
  }
  workers.emplace_back([db]() {
    db->CreateIndexOnField("age", op);
    std::cout << "finish create index age\n";
  });
  for (std::thread &worker : workers) {
    worker.join();
  }

  // Now the address index must be gone and the age index must exist.
  findKeysByCityIndex(db, false);
  findKeysByAgeIndex(db, true);
  delete db;
}
// With indexes present, many threads put and delete the same keys at once;
// kvdb and indexdb must stay consistent.
TEST(TestPutDeleteOne, Parallel) {
  fielddb::DestroyDB("testdb2.4",Options());
  FieldDB *db = new FieldDB();
  if (!OpenDB("testdb2.4", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  shanghaiKeys.clear();
  age20Keys.clear();
  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);

  // 20 threads: even-numbered ones insert keys "0".."99", odd-numbered ones
  // delete the same keys.
  const int thread_num_ = 20;
  std::vector<std::thread> workers;
  workers.reserve(thread_num_);
  for (int t = 0; t < thread_num_; ++t) {
    const bool inserts = (t % 2 == 0);
    workers.emplace_back([db, inserts]() {
      for (size_t j = 0; j < 100; j++) {
        if (inserts) {
          InsertOneField(db, std::to_string(j));
        } else {
          DeleteOneField(db, std::to_string(j));
        }
      }
    });
  }
  for (std::thread &worker : workers) {
    worker.join();
  }

  // Verify.
  checkDataInKVAndIndex(db);
  delete db;
}
// With indexes present, bulk puts and bulk deletes run concurrently; kvdb and
// indexdb must stay consistent.
TEST(TestPutDelete, Parallel) {
  fielddb::DestroyDB("testdb2.5",Options());
  FieldDB *db = new FieldDB();
  if (!OpenDB("testdb2.5", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  shanghaiKeys.clear();
  age20Keys.clear();
  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);

  // Two inserters and two deleters, one of each per seed (0 and 1).
  std::thread insertSeed0([db]() { InsertFieldData(db); });
  std::thread insertSeed1([db]() { InsertFieldData(db, 1); });
  std::thread deleteSeed0([db]() { DeleteFieldData(db); });
  std::thread deleteSeed1([db]() { DeleteFieldData(db, 1); });
  insertSeed0.join();
  insertSeed1.join();
  deleteSeed0.join();
  deleteSeed1.join();

  // Verify.
  checkDataInKVAndIndex(db);
  delete db;
}
// Batch Write racing with the other operations (a mixed bag): index creation,
// single puts, deletes and index deletion all run together.
TEST(TestWrite, Parallel) {
  fielddb::DestroyDB("testdb2.6",Options());
  FieldDB *db = new FieldDB();
  if (!OpenDB("testdb2.6", &db).ok()) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // ClearDB(db);
  shanghaiKeys.clear();
  age20Keys.clear();
  db->CreateIndexOnField("address", op);
  // Pre-fill some data so that creating the index takes a little longer.
  InsertFieldData(db, 2);

  // Spins until the age index has left the NotExist state, i.e. until its
  // creation has started; only then do the concurrent operations begin.
  auto waitForAgeIndex = [db]() {
    while (db->GetIndexStatus("age") == NotExist) {
    }
  };

  std::vector<std::thread> workers;
  workers.reserve(5);
  workers.emplace_back([db]() { db->CreateIndexOnField("age", op); });
  workers.emplace_back([db, waitForAgeIndex]() {
    waitForAgeIndex();
    InsertFieldData(db);
  });
  workers.emplace_back([db, waitForAgeIndex]() {
    waitForAgeIndex();
    WriteFieldData(db, 1);
  });
  workers.emplace_back([db, waitForAgeIndex]() {
    waitForAgeIndex();
    DeleteFieldData(db, 0);
  });
  workers.emplace_back([db, waitForAgeIndex]() {
    waitForAgeIndex();
    db->DeleteIndex("age", op);
  });
  for (std::thread &worker : workers) {
    worker.join();
  }

  // Verify.
  checkDataInKVAndIndex(db);
  // The delete-index request is expected to be queued behind the creation.
  ASSERT_EQ(db->GetIndexStatus("age"), NotExist);
  // Removing the last thread (the DeleteIndex one) makes it possible to test
  // whether writes concurrent with the creation of the age index keep the age
  // data consistent:
  //checkDataInKVAndIndex(db, "age");
  delete db;
}
int main(int argc, char** argv) { | |||||
// All tests currently run with the same read-only file limits. | |||||
testing::InitGoogleTest(&argc, argv); | |||||
return RUN_ALL_TESTS(); | |||||
} |
@ -0,0 +1,90 @@ | |||||
#include "gtest/gtest.h" | |||||
// #include "leveldb/env.h" | |||||
// #include "leveldb/db.h" | |||||
#include "fielddb/field_db.h" | |||||
#include "test/helper.cc" | |||||
#include <thread> | |||||
#include <csignal> | |||||
#include <exception> | |||||
using namespace fielddb; | |||||
// Clean shutdown and reopen: data and indexes written before the database is
// closed must still be readable after it is opened again.
TEST(TestNormalRecover, Recover) {
  fielddb::DestroyDB("testdb3.1",Options());
  FieldDB *db = new FieldDB();
  if(OpenDB("testdb3.1", &db).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  db->CreateIndexOnField("address", op);
  db->CreateIndexOnField("age", op);
  InsertFieldData(db);
  bool allowNotFound = false;
  GetFieldData(db, allowNotFound);
  findKeysByCityIndex(db, true);
  findKeysByAgeIndex(db, true);
  delete db;

  // Reopen the same database.
  db = new FieldDB();
  if(OpenDB("testdb3.1", &db).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  // The data and indexes written earlier must still be readable.
  GetFieldData(db, allowNotFound);
  findKeysByCityIndex(db, true);
  findKeysByAgeIndex(db, true);
  // Release the reopened instance as well; previously it was leaked and the
  // database stayed open for the rest of the test run.
  delete db;
}
// Crash recovery, run manually in two passes.
//
// First pass: enable the commented-out block below. Thread 3 deletes the
// database while the other threads are still using it, which makes them fail
// and terminates the test (simulating a database crash), so every thread dies
// at some arbitrary point.
// Second pass: comment that block out again and run the code after it, which
// checks what was recovered.
TEST(TestParalRecover, Recover) {
  // First pass
  // fielddb::DestroyDB("testdb3.2",Options());
  // FieldDB *db = new FieldDB();
  // if(OpenDB("testdb3.2", &db).ok() == false) {
  //   std::cerr << "open db failed" << std::endl;
  //   abort();
  // }
  // db->CreateIndexOnField("address", op);
  // db->CreateIndexOnField("age", op);
  // int thread_num_ = 4;
  // std::vector<std::thread> threads(thread_num_);
  // threads[0] = std::thread([db](){
  //   InsertFieldData(db);
  // });
  // threads[1] = std::thread([db](){
  //   WriteFieldData(db);
  // });
  // threads[2] = std::thread([db](){
  //   DeleteFieldData(db);
  // });
  // threads[3] = std::thread([db](){
  //   InsertOneField(db);
  //   delete db;
  // });
  // for (auto& t : threads) {
  //   if (t.joinable()) {
  //     t.join();
  //   }
  // }

  // Second pass
  FieldDB *db = new FieldDB();
  if(OpenDB("testdb3.2", &db).ok() == false) {
    std::cerr << "open db failed" << std::endl;
    abort();
  }
  GetOneField(db);
  // Prints two numbers; a value greater than 1 means that, besides the single
  // record inserted by thread 3, data written by the other threads before the
  // crash was recovered correctly too.
  checkDataInKVAndIndex(db);
  // Release the instance; previously it was leaked and the database stayed
  // open until the process exited.
  delete db;
}
int main(int argc, char** argv) { | |||||
// All tests currently run with the same read-only file limits. | |||||
testing::InitGoogleTest(&argc, argv); | |||||
return RUN_ALL_TESTS(); | |||||
} |
@ -0,0 +1,112 @@ | |||||
#include "testdb/testdb.h" | |||||
#include "db/db_impl.h" | |||||
#include <memory> | |||||
#include "leveldb/status.h" | |||||
#include "testdb.h" | |||||
using namespace testdb; | |||||
// Opens the LevelDB store "<name>_kvDB" and attaches it to an already
// allocated testDB.
//
// options: options passed to DB::Open. The testDB keeps a pointer to this
//          object (options_), so it must stay alive for as long as the testDB
//          may use it.
// name:    base name of the database; "_kvDB" is appended for the store.
// dbptr:   must point to a non-null testDB created by the caller beforehand.
//
// Returns NotSupported when no testDB object was supplied, the DB::Open error
// when opening fails (the testDB is left unmodified in both cases), and the OK
// status from DB::Open otherwise.
//
// Only the key-value store is opened; the index and meta stores and the
// recovery step that appeared here as commented-out code remain disabled.
Status testDB::OpentestDB(Options& options,
                          const std::string& name, testDB** dbptr) {
  // Guard the out-pointer itself as well as the object it refers to.
  if (dbptr == nullptr || *dbptr == nullptr) {
    return Status::NotSupported(name, "new a testDb first\n");
  }

  DB *kvdb = nullptr;
  Status status = DB::Open(options, name+"_kvDB", &kvdb);
  if (!status.ok()) return status;

  testDB *target = *dbptr;
  target->kvDB_ = kvdb;
  target->dbname_ = name;
  target->options_ = &options;
  target->env_ = options.env;
  return status;
}
// Stores `value` under `key` by forwarding the call to kvDB_ and returns its
// status.
Status testDB::Put(const WriteOptions &options, const Slice &key, const Slice &value) {
  Status result = kvDB_->Put(options, key, value);
  return result;
}
// Stub: ignores all arguments, writes nothing and always reports OK.
Status testDB::PutFields(const WriteOptions &, const Slice &key, const FieldArray &tests) {
  Status result = Status::OK();
  return result;
}
// Deletes `key` by forwarding the call to kvDB_ and returns its status.
Status testDB::Delete(const WriteOptions &options, const Slice &key) {
  Status result = kvDB_->Delete(options, key);
  return result;
}
// Applies the batch `updates` by forwarding the call to kvDB_ and returns its
// status.
Status testDB::Write(const WriteOptions &options, WriteBatch *updates) {
  Status result = kvDB_->Write(options, updates);
  return result;
}
// Reads `key` by forwarding the call, including the output string `value`, to
// kvDB_ and returns its status.
Status testDB::Get(const ReadOptions &options, const Slice &key, std::string *value) {
  Status result = kvDB_->Get(options, key, value);
  return result;
}
// Stub: ignores all arguments, leaves `tests` untouched and always reports OK.
Status testDB::GetFields(const ReadOptions &options, const Slice &key, FieldArray *tests) {
  Status result = Status::OK();
  return result;
}
// Stub: ignores `test` and always returns an empty key list.
std::vector<std::string> testDB::FindKeysByField(Field &test) {
  std::vector<std::string> noKeys;
  return noKeys;
}
// Returns the iterator that kvDB_ creates for `options`.
Iterator * testDB::NewIterator(const ReadOptions &options) {
  Iterator *iter = kvDB_->NewIterator(options);
  return iter;
}
// Returns the snapshot handle obtained from kvDB_.
const Snapshot * testDB::GetSnapshot() {
  const Snapshot *snapshot = kvDB_->GetSnapshot();
  return snapshot;
}
void testDB::ReleaseSnapshot(const Snapshot *snapshot) { | |||||
kvDB_->ReleaseSnapshot(snapshot); | |||||
} | |||||
bool testDB::GetProperty(const Slice &property, std::string *value) { | |||||
return kvDB_->GetProperty(property, value); | |||||
} | |||||
void testDB::GetApproximateSizes(const Range *range, int n, uint64_t *sizes) { | |||||
kvDB_->GetApproximateSizes(range, n, sizes); | |||||
} | |||||
void testDB::CompactRange(const Slice *begin, const Slice *end) { | |||||
kvDB_->CompactRange(begin, end); | |||||
} | |||||
// Destroys the LevelDB store "<name>_kvDB" and returns the resulting status.
// In builds with assertions enabled, a failure trips the assert before the
// status is returned. Destruction of the index and meta stores is disabled
// (it only existed as commented-out code).
Status testdb::DestroyDB(const std::string& name, const Options& options) {
  Status s = leveldb::DestroyDB(name+"_kvDB", options);
  assert(s.ok());
  return s;
}
// Releases the key-value store owned by this object. kvDB_ is nullptr when the
// testDB was never opened, in which case the delete does nothing.
testDB::~testDB() {
  leveldb::DB *owned = kvDB_;
  delete owned;
}
@ -0,0 +1,72 @@ | |||||
#include "port/port_stdcxx.h" | |||||
#include "db/db_impl.h" | |||||
#include <cstdint> | |||||
#include <cstdio> | |||||
#include <deque> | |||||
#include <map> | |||||
#include <set> | |||||
#include <string> | |||||
#include "leveldb/db.h" | |||||
#include "leveldb/env.h" | |||||
#include "leveldb/options.h" | |||||
#include "leveldb/slice.h" | |||||
#include "leveldb/status.h" | |||||
#include <shared_mutex> | |||||
# ifndef test_DB_H | |||||
# define test_DB_H | |||||
namespace testdb { | |||||
using namespace leveldb; | |||||
enum IndexStatus{ | |||||
Creating, | |||||
Deleting, | |||||
Exist, | |||||
NotExist | |||||
}; | |||||
class testDB { | |||||
private: | |||||
leveldb::DB *kvDB_; | |||||
// leveldb::DB *metaDB_; | |||||
// leveldb::DB *indexDB_; | |||||
std::string dbname_; | |||||
const Options *options_; | |||||
Env *env_; | |||||
public: | |||||
friend class Request; | |||||
friend class testsReq; | |||||
friend class iCreateReq; | |||||
friend class iDeleteReq; | |||||
friend class DeleteReq; | |||||
friend class BatchReq; | |||||
//用的时候必须testDB *db = new testDB()再open,不能像之前一样DB *db | |||||
// testDB() : indexDB_(nullptr), kvDB_(nullptr), metaDB_(nullptr) {}; | |||||
testDB() : kvDB_(nullptr) { } | |||||
~testDB(); | |||||
/*lab1的要求,作为db派生类要实现的虚函数*/ | |||||
Status Put(const WriteOptions &options, const Slice &key, const Slice &value) ; | |||||
Status PutFields(const WriteOptions &, const Slice &key, const FieldArray &tests) ; | |||||
Status Delete(const WriteOptions &options, const Slice &key) ; | |||||
Status Write(const WriteOptions &options, WriteBatch *updates) ; | |||||
Status Get(const ReadOptions &options, const Slice &key, std::string *value) ; | |||||
Status GetFields(const ReadOptions &options, const Slice &key, FieldArray *tests) ; | |||||
std::vector<std::string> FindKeysByField(Field &test) ; | |||||
Iterator * NewIterator(const ReadOptions &options) ; | |||||
const Snapshot * GetSnapshot() ; | |||||
void ReleaseSnapshot(const Snapshot *snapshot) ; | |||||
bool GetProperty(const Slice &property, std::string *value) ; | |||||
void GetApproximateSizes(const Range *range, int n, uint64_t *sizes) ; | |||||
void CompactRange(const Slice *begin, const Slice *end) ; | |||||
static Status OpentestDB(Options& options,const std::string& name,testDB** dbptr); | |||||
}; | |||||
Status DestroyDB(const std::string& name, | |||||
const Options& options); | |||||
} // end of namespace | |||||
# endif |