|
@ -2,12 +2,16 @@ |
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
|
|
|
|
|
|
|
|
#include <chrono>
|
|
|
|
|
|
#include <iostream>
|
|
|
|
|
|
|
|
|
#include "db/db_impl.h"
|
|
|
#include "db/db_impl.h"
|
|
|
|
|
|
|
|
|
#include <algorithm>
|
|
|
#include <algorithm>
|
|
|
#include <atomic>
|
|
|
#include <atomic>
|
|
|
#include <cstdint>
|
|
|
#include <cstdint>
|
|
|
#include <cstdio>
|
|
|
#include <cstdio>
|
|
|
|
|
|
#include <cctype>
|
|
|
#include <set>
|
|
|
#include <set>
|
|
|
#include <string>
|
|
|
#include <string>
|
|
|
#include <vector>
|
|
|
#include <vector>
|
|
@ -591,7 +595,8 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
|
|
|
TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
|
|
|
for (int level = 0; level < max_level_with_files; level++) { |
|
|
|
|
|
|
|
|
for (int level = 0; level <= max_level_with_files; level++) { |
|
|
|
|
|
// for (int level = 0; level < max_level_with_files; level++) {
|
|
|
TEST_CompactRange(level, begin, end); |
|
|
TEST_CompactRange(level, begin, end); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -894,6 +899,17 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: Check if a string consists entirely of digits */ |
|
|
|
|
|
bool isAllDigits(const std::string& str) { |
|
|
|
|
|
for (char c : str) { |
|
|
|
|
|
if (!isdigit(c)) { |
|
|
|
|
|
return false; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return true; |
|
|
|
|
|
} |
|
|
|
|
|
/* --------------------------- */ |
|
|
|
|
|
|
|
|
Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
const uint64_t start_micros = env_->NowMicros(); |
|
|
const uint64_t start_micros = env_->NowMicros(); |
|
|
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
|
|
int64_t imm_micros = 0; // Micros spent doing imm_ compactions
|
|
@ -978,7 +994,31 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { |
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
// Therefore this deletion marker is obsolete and can be dropped.
|
|
|
drop = true; |
|
|
drop = true; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: Add TTL Version Compaction Drop Condition */ |
|
|
|
|
|
else { |
|
|
|
|
|
std::string user_value = input->value().ToString(); |
|
|
|
|
|
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); |
|
|
|
|
|
// if (user_value.find("_ts_") == std::string::npos) {
|
|
|
|
|
|
// input->value() = user_value + "_ts_" + std::to_string(now + ttl);
|
|
|
|
|
|
// }
|
|
|
|
|
|
// else {
|
|
|
|
|
|
// uint64_t deadtime = std::stoi(user_value.substr(user_value.find("_ts_") + 4));
|
|
|
|
|
|
// if (now >= deadtime) {
|
|
|
|
|
|
// drop = true;
|
|
|
|
|
|
// }
|
|
|
|
|
|
// }
|
|
|
|
|
|
size_t pos = user_value.rfind("_ts_"); |
|
|
|
|
|
if (pos != std::string::npos){ |
|
|
|
|
|
std::string timestampStr = user_value.substr(pos + 4); |
|
|
|
|
|
if (isAllDigits(timestampStr)) { |
|
|
|
|
|
uint64_t deadtime = std::stoull(timestampStr); |
|
|
|
|
|
if (now >= deadtime) { |
|
|
|
|
|
drop = true; |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
/* ----------------------------------------------- */ |
|
|
last_sequence_for_key = ikey.sequence; |
|
|
last_sequence_for_key = ikey.sequence; |
|
|
} |
|
|
} |
|
|
#if 0
|
|
|
#if 0
|
|
@ -1117,6 +1157,44 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { |
|
|
return versions_->MaxNextLevelOverlappingBytes(); |
|
|
return versions_->MaxNextLevelOverlappingBytes(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: Add TTL Version isLive() */ |
|
|
|
|
|
// Status isLive(const Slice& key, std::string* value, Status& s, uint64_t ttl) {
|
|
|
|
|
|
// if (value->empty()) {
|
|
|
|
|
|
// s = Status::NotFound(key);
|
|
|
|
|
|
// return s;
|
|
|
|
|
|
// }
|
|
|
|
|
|
// uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
|
|
|
|
|
|
// if (value->find("_ts_") == std::string::npos) {
|
|
|
|
|
|
// *value = *value + "_ts_" + std::to_string(now + ttl);
|
|
|
|
|
|
// }
|
|
|
|
|
|
// else {
|
|
|
|
|
|
// uint64_t deadtime = std::stoi(value->substr(value->find("_ts_") + 4));
|
|
|
|
|
|
// if (now >= deadtime) {
|
|
|
|
|
|
// s = Status::NotFound(key);
|
|
|
|
|
|
// }
|
|
|
|
|
|
// }
|
|
|
|
|
|
// return s;
|
|
|
|
|
|
// }
|
|
|
|
|
|
/* --------------------------- */ |
|
|
|
|
|
Status isLive(const Slice& key, std::string* value, Status& s) { |
|
|
|
|
|
if (value->empty()) { |
|
|
|
|
|
s = Status::NotFound(key); |
|
|
|
|
|
return s; |
|
|
|
|
|
} |
|
|
|
|
|
uint64_t now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); |
|
|
|
|
|
size_t pos = value->rfind("_ts_"); |
|
|
|
|
|
if (pos != std::string::npos){ |
|
|
|
|
|
std::string timestampStr = value->substr(pos + 4); |
|
|
|
|
|
if (isAllDigits(timestampStr)) { |
|
|
|
|
|
uint64_t deadtime = std::stoull(timestampStr); |
|
|
|
|
|
if (now >= deadtime) { |
|
|
|
|
|
s = Status::NotFound(key); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
return s; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
std::string* value) { |
|
|
std::string* value) { |
|
|
Status s; |
|
|
Status s; |
|
@ -1144,14 +1222,24 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key, |
|
|
mutex_.Unlock(); |
|
|
mutex_.Unlock(); |
|
|
// First look in the memtable, then in the immutable memtable (if any).
|
|
|
// First look in the memtable, then in the immutable memtable (if any).
|
|
|
LookupKey lkey(key, snapshot); |
|
|
LookupKey lkey(key, snapshot); |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: Add TTL Version Get() */ |
|
|
if (mem->Get(lkey, value, &s)) { |
|
|
if (mem->Get(lkey, value, &s)) { |
|
|
|
|
|
// isLive(key, value, s, ttl);
|
|
|
|
|
|
isLive(key, value, s); |
|
|
// Done
|
|
|
// Done
|
|
|
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { |
|
|
} else if (imm != nullptr && imm->Get(lkey, value, &s)) { |
|
|
|
|
|
// isLive(key, value, s, ttl);
|
|
|
|
|
|
isLive(key, value, s); |
|
|
// Done
|
|
|
// Done
|
|
|
} else { |
|
|
} else { |
|
|
s = current->Get(options, lkey, value, &stats); |
|
|
s = current->Get(options, lkey, value, &stats); |
|
|
|
|
|
// if (s.ok()) isLive(key, value, s, ttl);
|
|
|
|
|
|
if (s.ok()) isLive(key, value, s); |
|
|
have_stat_update = true; |
|
|
have_stat_update = true; |
|
|
} |
|
|
} |
|
|
|
|
|
/* --------------------------- */ |
|
|
|
|
|
|
|
|
mutex_.Lock(); |
|
|
mutex_.Lock(); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -1491,6 +1579,17 @@ Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { |
|
|
return Write(opt, &batch); |
|
|
return Write(opt, &batch); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* TODO: Add TTL Version Put() */ |
|
|
|
|
|
Status DBImpl::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl){ |
|
|
|
|
|
WriteBatch batch; |
|
|
|
|
|
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); |
|
|
|
|
|
auto end = now + ttl; |
|
|
|
|
|
Slice value_timestamp = Slice(value.ToString() + "_ts_" + std::to_string(end)); |
|
|
|
|
|
batch.Put(key, value_timestamp); |
|
|
|
|
|
return Write(opt, &batch); |
|
|
|
|
|
} |
|
|
|
|
|
/* --------------------------- */ |
|
|
|
|
|
|
|
|
Status DB::Delete(const WriteOptions& opt, const Slice& key) { |
|
|
Status DB::Delete(const WriteOptions& opt, const Slice& key) { |
|
|
WriteBatch batch; |
|
|
WriteBatch batch; |
|
|
batch.Delete(key); |
|
|
batch.Delete(key); |
|
@ -1508,12 +1607,22 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { |
|
|
// Recover handles create_if_missing, error_if_exists
|
|
|
// Recover handles create_if_missing, error_if_exists
|
|
|
bool save_manifest = false; |
|
|
bool save_manifest = false; |
|
|
Status s = impl->Recover(&edit, &save_manifest); |
|
|
Status s = impl->Recover(&edit, &save_manifest); |
|
|
|
|
|
|
|
|
|
|
|
if (!s.ok()) { |
|
|
|
|
|
std::cerr << "Recover failed: " << s.ToString() << std::endl; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if (s.ok() && impl->mem_ == nullptr) { |
|
|
if (s.ok() && impl->mem_ == nullptr) { |
|
|
// Create new log and a corresponding memtable.
|
|
|
// Create new log and a corresponding memtable.
|
|
|
uint64_t new_log_number = impl->versions_->NewFileNumber(); |
|
|
uint64_t new_log_number = impl->versions_->NewFileNumber(); |
|
|
WritableFile* lfile; |
|
|
WritableFile* lfile; |
|
|
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), |
|
|
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), |
|
|
&lfile); |
|
|
&lfile); |
|
|
|
|
|
|
|
|
|
|
|
if (!s.ok()) { |
|
|
|
|
|
std::cerr << "NewWritableFile failed: " << s.ToString() << std::endl; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
if (s.ok()) { |
|
|
if (s.ok()) { |
|
|
edit.SetLogNumber(new_log_number); |
|
|
edit.SetLogNumber(new_log_number); |
|
|
impl->logfile_ = lfile; |
|
|
impl->logfile_ = lfile; |
|
@ -1527,6 +1636,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { |
|
|
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
|
|
|
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
|
|
|
edit.SetLogNumber(impl->logfile_number_); |
|
|
edit.SetLogNumber(impl->logfile_number_); |
|
|
s = impl->versions_->LogAndApply(&edit, &impl->mutex_); |
|
|
s = impl->versions_->LogAndApply(&edit, &impl->mutex_); |
|
|
|
|
|
|
|
|
|
|
|
if (!s.ok()) { |
|
|
|
|
|
std::cerr << "LogAndApply failed: " << s.ToString() << std::endl; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
} |
|
|
} |
|
|
if (s.ok()) { |
|
|
if (s.ok()) { |
|
|
impl->RemoveObsoleteFiles(); |
|
|
impl->RemoveObsoleteFiles(); |
|
|