Explorar el Código

对于过期导致的value不命中,多减少一次allowed_seek的次数

lzj_version
林子骥 hace 3 semanas
padre
commit
881a751a29
Se han modificado 5 ficheros con 64 adiciones y 85 borrados
  1. +18
    -54
      db/db_impl.cc
  2. +15
    -1
      db/version_set.cc
  3. +3
    -3
      db/version_set.h
  4. +1
    -0
      include/leveldb/status.h
  5. +27
    -27
      test/ttl_test.cc

+ 18
- 54
db/db_impl.cc Ver fichero

@ -1178,18 +1178,24 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
if(s.ok()){
s = CheckIsExpire(value);
}
if (have_stat_update && current->UpdateStats(stats,s.IsExpire())) {
MaybeScheduleCompaction();//有可能寻址过多,导致allow_seek为0,触发合并。
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
if(!s.ok()){
return s;
}
auto s2 = CheckIsExpire(value);
return s2;
// if(!s.ok()){
// return s;
// }
// auto s2 = CheckIsExpire(value);
// if(!s2.ok()){
// current->UpdateStats(stats);
// }
return s;
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@ -1229,21 +1235,6 @@ Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
Status DBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t ttl) {
//rocksdb的实现
// Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
// SystemClock* clock) {
// val_with_ts->reserve(kTSLength + val.size());
// char ts_string[kTSLength];
// int64_t curtime;
// Status st = clock->GetCurrentTime(&curtime);
// if (!st.ok()) {
// return st;
// }
// EncodeFixed32(ts_string, (int32_t)curtime);
// val_with_ts->append(val.data(), val.size());
// val_with_ts->append(ts_string, kTSLength);
// return st;
// }
std::string val_with_ts;
val_with_ts.reserve(value.size() + kTSLength);
@ -1256,12 +1247,6 @@ Status DBImpl::Put(const WriteOptions& options, const Slice& key,
// 将 expiration_time 追加到 val_with_ts
val_with_ts.append(ts_string,kTSLength);
// std::cout << "val_with_ts in hex: ";
// for (unsigned char c : val_with_ts) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
return DB::Put(options, key, Slice(val_with_ts));
}
@ -1569,45 +1554,24 @@ void DBImpl::AppendTS(const Slice& val, std::string* val_with_ts,uint64_t ttl) {
* @return timestamp in val,and remove timestamp from val
*/
uint64_t DBImpl::GetTS(std::string* val) {
//uint64_t expiration_time;
// 输出 val 的十六进制表示
// std::cout << "befor decode,val in hex: ";
// for (unsigned char c : *val) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
auto expiration_time = DecodeFixed64(val->data() + val->size() - kTSLength);
//memcpy(&expiration_time, val->data() + val->size() - sizeof(TIMESTAMP), sizeof(TIMESTAMP));
val->resize(val->size() - kTSLength);
// std::cout << "after decode,val in hex: ";
// for (unsigned char c : *val) {
// std::cout << std::hex << std::setw(2) << std::setfill('0') << (int)c << " ";
// }
// std::cout << std::endl;
return expiration_time;
// Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
// if (pinnable_val->size() < kTSLength) {
// return Status::Corruption("Bad timestamp in key-value");
// }
// // Erasing characters which hold the TS
// pinnable_val->remove_suffix(kTSLength);
// return Status::OK();
// }
}
Status DBImpl::CheckIsExpire(std::string* value) {
//debug 用
auto a = env_->NowMicros();
auto b = GetTS(value);
// std::cout<<"get current time"<<a<<std::endl;
// std::cout << "get ts from val"<<b<<std::endl;
if(a > b){
return Status::Expire("Expire",Slice());
}
return Status();
// if(env_->GetCurrentTime() > GetTS(value)){
// return Status::Expire("Expire",Slice());
// }
}
/**

+ 15
- 1
db/version_set.cc Ver fichero

@ -413,6 +413,20 @@ bool Version::UpdateStats(const GetStats& stats) {
return false;
}
bool Version::UpdateStats(const GetStats& stats,bool is_expire) {
FileMetaData* f = stats.seek_file;
if (f != nullptr) {
f->allowed_seeks--;
if(is_expire)f->allowed_seeks--;
if (f->allowed_seeks <= 0 && file_to_compact_ == nullptr) {
file_to_compact_ = f;
file_to_compact_level_ = stats.seek_file_level;
return true;
}
}
return false;
}
bool Version::RecordReadSample(Slice internal_key) {
ParsedInternalKey ikey;
if (!ParseInternalKey(internal_key, &ikey)) {
@ -821,7 +835,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
edit->EncodeTo(&record);// TODO:修改
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();

+ 3
- 3
db/version_set.h Ver fichero

@ -80,8 +80,8 @@ class Version {
// compaction may need to be triggered, false otherwise.
// REQUIRES: lock is held
bool UpdateStats(const GetStats& stats);
// Record a sample of bytes read at the specified internal key.
bool UpdateStats(const GetStats& stats,bool is_expire);
// Record a sample of bytes read at the specified internal key.
// Samples are taken approximately once every config::kReadBytesPeriod
// bytes. Returns true if a new compaction may need to be triggered.
// REQUIRES: lock is held
@ -313,7 +313,7 @@ class VersionSet {
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
std::string compact_pointer_[config::kNumLevels];//
};
// A Compaction encapsulates information about a compaction.

+ 1
- 0
include/leveldb/status.h Ver fichero

@ -74,6 +74,7 @@ class LEVELDB_EXPORT Status {
// Returns true iff the status indicates an InvalidArgument.
bool IsInvalidArgument() const { return code() == kInvalidArgument; }
bool IsExpire()const{return code() == kExpire;}
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;

+ 27
- 27
test/ttl_test.cc Ver fichero

@ -75,33 +75,33 @@ TEST(TestTTL, ReadTTL) {
Env::Default()->SleepForMicroseconds( 1000);
}
//TEST(TestTTL, CompactionTTL) {
// DestroyDB("testdb", Options());
// DB *db;
// if(OpenDB("testdb", &db).ok() == false) {
// std::cerr << "open db failed" << std::endl;
// abort();
// }
//
// uint64_t ttl = 20;
// InsertData(db, ttl);
//
// leveldb::Range ranges[1];
// ranges[0] = leveldb::Range("-", "A");
// uint64_t sizes[1];
// db->GetApproximateSizes(ranges, 1, sizes);
// ASSERT_GT(sizes[0], 0);
//
// Env::Default()->SleepForMicroseconds(ttl * 1000000);
//
// db->CompactRange(nullptr, nullptr);
//
// leveldb::Range ranges2[1];
// ranges2[0] = leveldb::Range("-", "A");
// uint64_t sizes2[1];
// db->GetApproximateSizes(ranges2, 1, sizes2);
// ASSERT_EQ(sizes2[0], 0);
//}
TEST(TestTTL, CompactionTTL) {
DestroyDB("testdb", Options());
DB *db;
if(OpenDB("testdb", &db).ok() == false) {
std::cerr << "open db failed" << std::endl;
abort();
}
uint64_t ttl = 20;
InsertData(db, ttl);
leveldb::Range ranges[1];
ranges[0] = leveldb::Range("-", "A");
uint64_t sizes[1];
db->GetApproximateSizes(ranges, 1, sizes);
ASSERT_GT(sizes[0], 0);
Env::Default()->SleepForMicroseconds(ttl * 1000000);
db->CompactRange(nullptr, nullptr);
leveldb::Range ranges2[1];
ranges2[0] = leveldb::Range("-", "A");
uint64_t sizes2[1];
db->GetApproximateSizes(ranges2, 1, sizes2);
ASSERT_EQ(sizes2[0], 0);
}
int main(int argc, char** argv) {

Cargando…
Cancelar
Guardar