@ -2,12 +2,16 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
# include <chrono>
# include <iostream>
# include "db/db_impl.h"
# include <algorithm>
# include <atomic>
# include <cstdint>
# include <cstdio>
# include <cctype>
# include <set>
# include <string>
# include <vector>
@ -591,7 +595,8 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
TEST_CompactMemTable ( ) ; // TODO(sanjay): Skip if memtable does not overlap
for ( int level = 0 ; level < max_level_with_files ; level + + ) {
for ( int level = 0 ; level < = max_level_with_files ; level + + ) {
// for (int level = 0; level < max_level_with_files; level++) {
TEST_CompactRange ( level , begin , end ) ;
}
}
@ -894,6 +899,17 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return versions_ - > LogAndApply ( compact - > compaction - > edit ( ) , & mutex_ ) ;
}
/* TODO: Check if a string consists entirely of digits */
bool isAllDigits ( const std : : string & str ) {
for ( char c : str ) {
if ( ! isdigit ( c ) ) {
return false ;
}
}
return true ;
}
/* --------------------------- */
Status DBImpl : : DoCompactionWork ( CompactionState * compact ) {
const uint64_t start_micros = env_ - > NowMicros ( ) ;
int64_t imm_micros = 0 ; // Micros spent doing imm_ compactions
@ -978,7 +994,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
// Therefore this deletion marker is obsolete and can be dropped.
drop = true ;
}
/* TODO: Add TTL Version Compaction Drop Condition */
else {
std : : string user_value = input - > value ( ) . ToString ( ) ;
uint64_t now = std : : chrono : : duration_cast < std : : chrono : : seconds > ( std : : chrono : : steady_clock : : now ( ) . time_since_epoch ( ) ) . count ( ) ;
// if (user_value.find("_ts_") == std::string::npos) {
// input->value() = user_value + "_ts_" + std::to_string(now + ttl);
// }
// else {
// uint64_t deadtime = std::stoi(user_value.substr(user_value.find("_ts_") + 4));
// if (now >= deadtime) {
// drop = true;
// }
// }
size_t pos = user_value . rfind ( " _ts_ " ) ;
if ( pos ! = std : : string : : npos ) {
std : : string timestampStr = user_value . substr ( pos + 4 ) ;
if ( isAllDigits ( timestampStr ) ) {
uint64_t deadtime = std : : stoull ( timestampStr ) ;
if ( now > = deadtime ) {
drop = true ;
}
}
}
}
/* ----------------------------------------------- */
last_sequence_for_key = ikey . sequence ;
}
#if 0
@ -1117,6 +1157,44 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
return versions_ - > MaxNextLevelOverlappingBytes ( ) ;
}
/* TODO: Add TTL Version isLive() */
// Status isLive(const Slice& key, std::string* value, Status& s, uint64_t ttl) {
// if (value->empty()) {
// s = Status::NotFound(key);
// return s;
// }
// uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
// if (value->find("_ts_") == std::string::npos) {
// *value = *value + "_ts_" + std::to_string(now + ttl);
// }
// else {
// uint64_t deadtime = std::stoi(value->substr(value->find("_ts_") + 4));
// if (now >= deadtime) {
// s = Status::NotFound(key);
// }
// }
// return s;
// }
/* --------------------------- */
Status isLive ( const Slice & key , std : : string * value , Status & s ) {
if ( value - > empty ( ) ) {
s = Status : : NotFound ( key ) ;
return s ;
}
uint64_t now = std : : chrono : : duration_cast < std : : chrono : : seconds > ( std : : chrono : : steady_clock : : now ( ) . time_since_epoch ( ) ) . count ( ) ;
size_t pos = value - > rfind ( " _ts_ " ) ;
if ( pos ! = std : : string : : npos ) {
std : : string timestampStr = value - > substr ( pos + 4 ) ;
if ( isAllDigits ( timestampStr ) ) {
uint64_t deadtime = std : : stoull ( timestampStr ) ;
if ( now > = deadtime ) {
s = Status : : NotFound ( key ) ;
}
}
}
return s ;
}
Status DBImpl : : Get ( const ReadOptions & options , const Slice & key ,
std : : string * value ) {
Status s ;
@ -1144,14 +1222,24 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
mutex_ . Unlock ( ) ;
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey ( key , snapshot ) ;
/* TODO: Add TTL Version Get() */
if ( mem - > Get ( lkey , value , & s ) ) {
// isLive(key, value, s, ttl);
isLive ( key , value , s ) ;
// Done
} else if ( imm ! = nullptr & & imm - > Get ( lkey , value , & s ) ) {
// isLive(key, value, s, ttl);
isLive ( key , value , s ) ;
// Done
} else {
s = current - > Get ( options , lkey , value , & stats ) ;
// if (s.ok()) isLive(key, value, s, ttl);
if ( s . ok ( ) ) isLive ( key , value , s ) ;
have_stat_update = true ;
}
/* --------------------------- */
mutex_ . Lock ( ) ;
}
@ -1491,6 +1579,17 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
return Write ( opt , & batch ) ;
}
/* TODO: Add TTL Version Put() */
Status DBImpl : : Put ( const WriteOptions & opt , const Slice & key , const Slice & value , uint64_t ttl ) {
WriteBatch batch ;
auto now = std : : chrono : : duration_cast < std : : chrono : : seconds > ( std : : chrono : : steady_clock : : now ( ) . time_since_epoch ( ) ) . count ( ) ;
auto end = now + ttl ;
Slice value_timestamp = Slice ( value . ToString ( ) + " _ts_ " + std : : to_string ( end ) ) ;
batch . Put ( key , value_timestamp ) ;
return Write ( opt , & batch ) ;
}
/* --------------------------- */
Status DB : : Delete ( const WriteOptions & opt , const Slice & key ) {
WriteBatch batch ;
batch . Delete ( key ) ;
@ -1508,12 +1607,22 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false ;
Status s = impl - > Recover ( & edit , & save_manifest ) ;
if ( ! s . ok ( ) ) {
std : : cerr < < " Recover failed: " < < s . ToString ( ) < < std : : endl ;
}
if ( s . ok ( ) & & impl - > mem_ = = nullptr ) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl - > versions_ - > NewFileNumber ( ) ;
WritableFile * lfile ;
s = options . env - > NewWritableFile ( LogFileName ( dbname , new_log_number ) ,
& lfile ) ;
if ( ! s . ok ( ) ) {
std : : cerr < < " NewWritableFile failed: " < < s . ToString ( ) < < std : : endl ;
}
if ( s . ok ( ) ) {
edit . SetLogNumber ( new_log_number ) ;
impl - > logfile_ = lfile ;
@ -1527,6 +1636,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
edit . SetPrevLogNumber ( 0 ) ; // No older logs needed after recovery.
edit . SetLogNumber ( impl - > logfile_number_ ) ;
s = impl - > versions_ - > LogAndApply ( & edit , & impl - > mutex_ ) ;
if ( ! s . ok ( ) ) {
std : : cerr < < " LogAndApply failed: " < < s . ToString ( ) < < std : : endl ;
}
}
if ( s . ok ( ) ) {
impl - > RemoveObsoleteFiles ( ) ;