浏览代码

Merge commit '8dfdd9b9e92b8c26d4e4b37b24e3c90a47d49762' into cyq

gyf
cyq 8 个月前
父节点
当前提交
dc056b92b6
共有 4 个文件被更改,包括 111 次插入120 次删除
  1. +1
    -0
      db/db_impl.cc
  2. +98
    -112
      fielddb/field_db.cpp
  3. +1
    -1
      fielddb/request.cpp
  4. +11
    -7
      test/recover_test.cc

+ 1
- 0
db/db_impl.cc 查看文件

@ -1183,6 +1183,7 @@ std::vector DBImpl::FindKeysByField(Field &field){
result.push_back(iter->key().ToString()); result.push_back(iter->key().ToString());
} }
} }
delete iter;
return result; return result;
} }

+ 98
- 112
fielddb/field_db.cpp 查看文件

@ -57,16 +57,14 @@ Status FieldDB::OpenFieldDB(Options& options,
(*dbptr)->metaDB_ = metadb; (*dbptr)->metaDB_ = metadb;
(*dbptr)->dbname_ = name; (*dbptr)->dbname_ = name;
// status = (*dbptr)->Recover();
status = (*dbptr)->Recover();
(*dbptr)->options_ = &options; (*dbptr)->options_ = &options;
(*dbptr)->env_ = options.env; (*dbptr)->env_ = options.env;
return status; return status;
} }
// TODO:Recover
Status FieldDB::Recover() { Status FieldDB::Recover() {
//TODO:
//1. 遍历所有Index类型的meta,重建内存中的index_状态表 //1. 遍历所有Index类型的meta,重建内存中的index_状态表
Iterator *Iter = indexDB_->NewIterator(ReadOptions()); Iterator *Iter = indexDB_->NewIterator(ReadOptions());
std::string IndexKey; std::string IndexKey;
@ -76,10 +74,8 @@ Status FieldDB::Recover() {
ParsedInternalIndexKey ParsedIndex; ParsedInternalIndexKey ParsedIndex;
ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex); ParseInternalIndexKey(Slice(IndexKey),&ParsedIndex);
index_[ParsedIndex.name_.ToString()] = {Exist,nullptr}; index_[ParsedIndex.name_.ToString()] = {Exist,nullptr};
//std::cout << "Existed Index : " << ParsedIndex.name_.ToString() << std::endl;
//构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符) //构建下一个搜索的对象,在原来的fieldname的基础上加一个最大的ascii字符(不可见字符)
//TODO:不知道这个做法有没有道理
std::string Seek; std::string Seek;
PutLengthPrefixedSlice(&Seek, ParsedIndex.name_); PutLengthPrefixedSlice(&Seek, ParsedIndex.name_);
Seek.push_back(0xff); Seek.push_back(0xff);
@ -133,103 +129,100 @@ Status FieldDB::HandleRequest(Request &req) {
uint64_t start_ = env_->NowMicros(); uint64_t start_ = env_->NowMicros();
MutexLock L(&mutex_); MutexLock L(&mutex_);
taskqueue_.push_back(&req); taskqueue_.push_back(&req);
Again:
while(!req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
}
if(req.done) {
return req.s; //在返回时自动释放锁L
}
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
{
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
MutexLock iL(&index_mu);
uint64_t start_construct = env_->NowMicros();
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet);
if(req_ptr == tail) break;
}
construct_elapsed += env_->NowMicros() - start_construct;
}
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
//此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
WriteOptions op;
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_meta = env_->NowMicros();
status = metaDB_->Write(op, &MetaBatch);
write_meta_elapsed += env_->NowMicros() - start_meta;
write_bytes += MetaBatch.ApproximateSize();
assert(status.ok());
}
//TODO:index的写入需要在另外一个线程中同时完成
if(IndexBatch.ApproximateSize() > 12) {
uint64_t start_index = env_->NowMicros();
status = indexDB_->Write(op, &IndexBatch);
write_index_elapsed += env_->NowMicros() - start_index;
write_bytes += IndexBatch.ApproximateSize();
assert(status.ok());
while(true){
while(!req.done && &req != taskqueue_.front()) {
req.cond_.Wait();
} }
if(KVBatch.ApproximateSize() > 12) {
uint64_t start_kv = env_->NowMicros();
status = kvDB_->Write(op, &KVBatch);
write_kv_elapsed += env_->NowMicros() - start_kv;
write_bytes += KVBatch.ApproximateSize();
assert(status.ok());
if(req.done) {
return req.s; //在返回时自动释放锁L
} }
//3. 将meta数据清除
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_clean = env_->NowMicros();
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
write_clean_elapsed += env_->NowMicros() - start_clean;
Request *tail = GetHandleInterval();
WriteBatch KVBatch,IndexBatch,MetaBatch;
std::unordered_set<std::string> batchKeySet;
Status status;
if(!tail->isiCreateReq() && !tail->isiDeleteReq()) {
//表明这一个区间并没有涉及index的创建删除
{
//1. 构建各个Batch。构建的过程中要保证索引状态的一致性,需要上锁。
MutexLock iL(&index_mu);
uint64_t start_construct = env_->NowMicros();
for(auto *req_ptr : taskqueue_) {
req_ptr->ConstructBatch(KVBatch, IndexBatch, MetaBatch, this, batchKeySet);
if(req_ptr == tail) break;
}
construct_elapsed += env_->NowMicros() - start_construct;
}
//2. 首先写入meta,再并发写入index和kv,完成之后清除meta数据
//此处可以放锁是因为写入的有序性可以通过队列来保证
mutex_.Unlock();
uint64_t start_write = env_->NowMicros();
WriteOptions op;
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_meta = env_->NowMicros();
status = metaDB_->Write(op, &MetaBatch);
write_meta_elapsed += env_->NowMicros() - start_meta;
write_bytes += MetaBatch.ApproximateSize();
assert(status.ok());
}
//TODO:index的写入需要在另外一个线程中同时完成
if(IndexBatch.ApproximateSize() > 12) {
uint64_t start_index = env_->NowMicros();
status = indexDB_->Write(op, &IndexBatch);
write_index_elapsed += env_->NowMicros() - start_index;
write_bytes += IndexBatch.ApproximateSize();
assert(status.ok());
}
if(KVBatch.ApproximateSize() > 12) {
uint64_t start_kv = env_->NowMicros();
status = kvDB_->Write(op, &KVBatch);
write_kv_elapsed += env_->NowMicros() - start_kv;
write_bytes += KVBatch.ApproximateSize();
assert(status.ok());
}
//3. 将meta数据清除
if(MetaBatch.ApproximateSize() > 12) {
uint64_t start_clean = env_->NowMicros();
MetaCleaner cleaner;
cleaner.Collect(MetaBatch);
cleaner.CleanMetaBatch(metaDB_);
write_clean_elapsed += env_->NowMicros() - start_clean;
}
write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock();
} else {
//对于创建和删除索引的请求,通过prepare完成索引状态的更新
MutexLock iL(&index_mu);
req.Prepare(this);
} }
write_elapsed += env_->NowMicros() - start_write;
mutex_.Lock();
} else {
//对于创建和删除索引的请求,通过prepare完成索引状态的更新
MutexLock iL(&index_mu);
req.Prepare(this);
}
// {
// static int count = 0;
// if(count++ % 100000 == 0) {
// std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
// }
// }
while(true) {
Request *ready = taskqueue_.front();
// int debug = tail->type_;
taskqueue_.pop_front();
//当前ready不是队首,不是和index的创建有关
if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
if (ready != &req) ready->cond_.Signal();
// {
// static int count = 0;
// if(count++ % 100000 == 0) {
// std::cout << "TaskQueue Size : " << taskqueue_.size() << std::endl;
// }
// }
while(true) {
Request *ready = taskqueue_.front();
// int debug = tail->type_;
taskqueue_.pop_front();
//当前ready不是队首,不是和index的创建有关
if(!ready->isPending() && !req.isiCreateReq() && !req.isiDeleteReq()) {
ready->s = status;
ready->done = true;
if (ready != &req) ready->cond_.Signal();
}
if (ready == tail) break;
} }
if (ready == tail) break;
}
elapsed += env_->NowMicros() - start_;
count ++;
dumpStatistics();
elapsed += env_->NowMicros() - start_;
count ++;
dumpStatistics();
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
if(!taskqueue_.empty()) {
taskqueue_.front()->cond_.Signal();
}
//如果done==true,那么就不会继续等待直接退出
//如果处于某个请求的pending list里面,那么就会继续等待重新入队
} }
//如果done==true,那么就不会继续等待直接退出
//如果处于某个请求的pending list里面,那么就会继续等待重新入队
//这里用了万恶的goto,蛤蛤
goto Again;
// return status;
} }
// 这里把一个空串作为常规put的name // 这里把一个空串作为常规put的name
@ -242,7 +235,6 @@ Status FieldDB::Put(const WriteOptions &options, const Slice &key, const Slice &
// 需要对是否进行index更新做处理 // 需要对是否进行index更新做处理
Status FieldDB::PutFields(const WriteOptions &Options, Status FieldDB::PutFields(const WriteOptions &Options,
const Slice &key, const FieldArray &fields) { const Slice &key, const FieldArray &fields) {
//这里是为了const和slice-string的转换被迫搞得
std::string key_ = key.ToString(); std::string key_ = key.ToString();
FieldArray fields_ = fields; FieldArray fields_ = fields;
@ -250,7 +242,6 @@ Status FieldDB::PutFields(const WriteOptions &Options,
Status status = HandleRequest(req); Status status = HandleRequest(req);
return status; return status;
// return kvDB_->PutFields(Options, key, fields);
} }
// 删除有索引的key时indexdb也要同步 // 删除有索引的key时indexdb也要同步
@ -260,31 +251,26 @@ Status FieldDB::Delete(const WriteOptions &options, const Slice &key) {
DeleteReq req(&key_,&mutex_); DeleteReq req(&key_,&mutex_);
Status status = HandleRequest(req); Status status = HandleRequest(req);
return status; return status;
// return kvDB_->Delete(options, key);
} }
// TODO:根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
{
uint64_t start_ = env_->NowMicros();
Status status = kvDB_->Write(options, updates);
temp_elapsed += env_->NowMicros() - start_;
count ++;
dumpStatistics();
return status;
}
//或许应该再做一个接口?或者基于现有的接口进行改造
// 根据updates里面的东西,要对是否需要更新index进行分别处理
Status FieldDB::Write(const WriteOptions &options, WriteBatch *updates) {
// {
// uint64_t start_ = env_->NowMicros();
// Status status = kvDB_->Write(options, updates);
// temp_elapsed += env_->NowMicros() - start_;
// count ++;
// dumpStatistics();
// return status;
// }
uint64_t start_ = env_->NowMicros(); uint64_t start_ = env_->NowMicros();
BatchReq req(updates,&mutex_); BatchReq req(updates,&mutex_);
construct_BatchReq_init_elapsed += env_->NowMicros() - start_; construct_BatchReq_init_elapsed += env_->NowMicros() - start_;
Status status = HandleRequest(req); Status status = HandleRequest(req);
return status; return status;
assert(0);
return Status::OK();
} }
//由于常规put将空串作为name,这里也需要适当修改 //由于常规put将空串作为name,这里也需要适当修改
Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) { Status FieldDB::Get(const ReadOptions &options, const Slice &key, std::string *value) {
// return kvDB_->Get(options, key, value);
FieldArray fields; FieldArray fields;
Status s = GetFields(options, key, &fields); Status s = GetFields(options, key, &fields);
if(!s.ok()) { if(!s.ok()) {

+ 1
- 1
fielddb/request.cpp 查看文件

@ -339,7 +339,7 @@ BatchReq::BatchReq(WriteBatch *Batch,port::Mutex *mu):
//为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误 //为key和value构造存储的地方,防止由于string的析构造成可能得内存访问错误
str_buf->push_back(key.ToString()); str_buf->push_back(key.ToString());
FieldArray *field = new FieldArray; FieldArray *field = new FieldArray;
// field = ParseValue(value.ToString(), field);
field = ParseValue(value.ToString(), field);
if (field->empty()){ //batch中的value没有field if (field->empty()){ //batch中的value没有field
fa_buf->push_back({{"",value.ToString()}}); fa_buf->push_back({{"",value.ToString()}});
} else { } else {

+ 11
- 7
test/recover_test.cc 查看文件

@ -36,7 +36,7 @@ TEST(TestNormalRecover, Recover) {
findKeysByAgeIndex(db, true); findKeysByAgeIndex(db, true);
} }
TEST(TestParalPutRecover, Recover) {
TEST(TestParalRecover, Recover) {
//第一次运行 //第一次运行
// fielddb::DestroyDB("testdb3.2",Options()); // fielddb::DestroyDB("testdb3.2",Options());
// FieldDB *db = new FieldDB(); // FieldDB *db = new FieldDB();
@ -47,24 +47,28 @@ TEST(TestParalPutRecover, Recover) {
// } // }
// db->CreateIndexOnField("address"); // db->CreateIndexOnField("address");
// db->CreateIndexOnField("age"); // db->CreateIndexOnField("age");
// shanghaiKeys.clear();
// age20Keys.clear();
// int thread_num_ = 2;
// int thread_num_ = 4;
// std::vector<std::thread> threads(thread_num_); // std::vector<std::thread> threads(thread_num_);
// threads[0] = std::thread([db](){ // threads[0] = std::thread([db](){
// InsertFieldData(db); // InsertFieldData(db);
// }); // });
// threads[1] = std::thread([db](){ // threads[1] = std::thread([db](){
// WriteFieldData(db);
// });
// threads[2] = std::thread([db](){
// DeleteFieldData(db);
// });
// threads[3] = std::thread([db](){
// InsertOneField(db); // InsertOneField(db);
// delete db; // delete db;
// }); // });
// for (auto& t : threads) {
// for (auto& t : threads) {
// if (t.joinable()) { // if (t.joinable()) {
// t.join(); // t.join();
// } // }
// } // }
//线程1导致了线程0错误,测试会终止(模拟数据库崩溃)
//这会导致线程0写入的各种奇怪的时间点崩溃
//线程3导致了其他线程错误,测试会终止(模拟数据库崩溃)
//这会导致线程在各种奇怪的时间点崩溃
//第二次运行注释掉上面的代码,运行下面的代码测试恢复 //第二次运行注释掉上面的代码,运行下面的代码测试恢复

正在加载...
取消
保存