作者: 谢瑞阳 10225101483 徐翔宇 10225101535
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 

23 KiB

LEVELDB TTL

小组成员

谢瑞阳 徐翔宇

整体介绍

我们在整体实验过程中,经历了先后两版对于TLL功能的设计和开发过程,在开发第一版TTL功能的过程中,我们遇到了一些困难。经过思考讨论后,我们重构了全部代码,并基于新的设计快速的完成了TTL功能以及对应的垃圾回收功能。

我们仓库的地址为:https://gitea.shuishan.net.cn/10225101483/XOY-Leveldb

其中每个分支对应的内容为:

master分支:主分支:第二版TTL设计。

xry分支:第一版TTL设计,完成了在memtable中的带有TTL数据的存取(即在小数据量情况下可以正确运行测试脚本中的第一个测试)。

xxy分支、new_version分支:第二版TTL设计。其中new_version分支与master分支内容相同。

设计思路

要完成TTL功能,首先也是最重要的内容即是要在原本leveldb存储数据的格式上增加一块用于存储TTL的空间,并且想明白如何对其进行读取,以及需要对其他存储空间(key,sequence number,value)的存取需要作出什么改动。

在写入数据时,涉及到编码的主要函数有以下三个:

leveldb通过WriteBatch::Put对传入的键值对进行编码,写入writebatch中。

void WriteBatch::Put(const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

之后,使用WriteBatch::Iterate将一个writebatch中的所有键值对依次解析并使用handler放入memtable。

Slice input(rep_);
  if (input.size() < kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(kHeader);
  Slice key, value;
  int found = 0;
  while (!input.empty()) {
    found++;
    char tag = input[0];
    input.remove_prefix(1);
    switch (tag) {
      case kTypeValue:
        if (GetLengthPrefixedSlice(&input, &key) &&
            GetLengthPrefixedSlice(&input, &value)) {
          handler->Put(key, value);
        } else {
          return Status::Corruption("bad WriteBatch Put");
        }
        break;
      case kTypeDeletion:
        if (GetLengthPrefixedSlice(&input, &key)) {
          handler->Delete(key);
        } else {
          return Status::Corruption("bad WriteBatch Delete");
        }
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }

之后,使用MemTable::Add将解析出来的数据重新编码,并插入至memtable。对于SSTable来说,InternalKey中的SequenceNumber和Type被隐藏,但存储数据的形式未发生改变。

void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  tag          : uint64((sequence << 8) | type)
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  size_t key_size = key.size();
  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len = VarintLength(internal_key_size) +
                             internal_key_size + VarintLength(val_size) +
                             val_size;
  char* buf = arena_.Allocate(encoded_len);
  char* p = EncodeVarint32(buf, internal_key_size);
  std::memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  p = EncodeVarint32(p, val_size);
  std::memcpy(p, value.data(), val_size);
  assert(p + val_size == buf + encoded_len);
  table_.Insert(buf);
}

这是原本leveldb在memtable和sstable中存储单个键值对时使用的数据结构:

old_record

其中,key和value对应着用户输入的键值对,而sequenceNumber是该键值对在leveldb内部的序列号(越大越新),type则标识该记录的类型。

第一版设计:增加在中

我们的第一版TTL存储设计如下:

old_record-17307966031002

在我们最初的思考中,TTL与SequenceNumber、Type一样,属于和主要的存储内容(key,value)分割开的存储数据,因此我们选择将TTL加在SequenceNumber和Type后面(不加在中间和前面,以免破坏原先InternalKey的结构)。

但在实现过程中,我们发现在中间增加TTL的形式会导致许多代码变动,因为对数据编码形式的修改会导致InternalKey和LookupKey等基础的编码类及函数需要进行改动,它们在一些与TTL无关的地方似乎也被复用(比如versionEdit和大量测试函数),因此我们认为对其进行修改会非常复杂,可能导致leveldb其他模块受到牵连,不符合我们对TTL轻量级改动的构想。因此我们进行了讨论,并产生了如下的新版设计

第二版设计:放入Value

我们的第二版TTL存储设计如下:

old_record-17308259248204

由于 leveldb 本身对于 key、value、sequencenumber 的解析已经非常完善并且有些耦合,因此我们想要在不破坏 leveldb 原先的解析方式下完成 TTL 功能,我们想到的最好最轻量级的方式,就是将 TTL 数据作为 Value的一部分(在原 Value 后追加 8byte 作为 TTL )进行存储,仅当要解析 Value 时(即读取数据时或合并时),才会在同时对 TTL 进行解析。由此一来,我们无需改变原本的存储结构。事实证明,我们在很短时间内就完成了这一版 TTL的全部所需功能。

实现流程

第一版设计

首先,我们需要修改在 writebatch 中数据的存取格式(主要是对 Value 的读取需要向后偏移8位)。我们在 WriteBatch 中新增了 Put_for_ttl 函数,将 ttl 放置在 key,value 之前。

void WriteBatch::Put_for_ttl(const Slice& key, const Slice& value,uint64_t ttl){
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutVarint64(&rep_,ttl);
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

同时,对 WriteBatch 的迭代器也要进行修改。

image-20241106141245806

使用 Add 将键值对加入 memtable 时,也要在 key 和 value 之间插入参数 ttl。具体位置在 key 末尾的 tag 之后,使用 EncodeFixed64 放入大小为 8 的 ttl 。

void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value,uint64_t ttl) {
  // Format of an entry is concatenation of:
  //  key_size     : varint32 of internal_key.size()
  //  key bytes    : char[internal_key.size()]
  //  tag          : uint64((sequence << 8) | type)
  //  ttl          : uint64(now)
  //  value_size   : varint32 of value.size()
  //  value bytes  : char[value.size()]
  size_t key_size = key.size();

  size_t val_size = value.size();
  size_t internal_key_size = key_size + 8;
  const size_t encoded_len = VarintLength(internal_key_size) +
                             internal_key_size + 8 + VarintLength(val_size) +
                             val_size;
  char* buf = arena_.Allocate(encoded_len);
  //internal_key_size(sizeof(key)+sizeof(sequence+type)+sizeof(ttl))
  //--key--(sequence+type)--ttl--sizeof(val)--val
  char* p = EncodeVarint32(buf, internal_key_size);
  std::memcpy(p, key.data(), key_size);
  p += key_size;
  EncodeFixed64(p, (s << 8) | type);
  p += 8;
  EncodeFixed64(p,ttl);
  p += 8;
  p = EncodeVarint32(p, val_size);
  std::memcpy(p, value.data(), val_size);
  assert(p + val_size == buf + encoded_len);
  table_.Insert(buf);
}

从 memtable 获取插入的数据时,需要读出 key, value 之间的 ttl 进行检查( line15 )

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key();
  Table::Iterator iter(&table_);
  iter.Seek(memkey.data());
  while(iter.Valid()) {
    const char* entry = iter.key();
    uint32_t key_length;//internal_key_size
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        case kTypeValue: {
          const uint64_t ttl = DecodeFixed64(key_ptr+key_length);
          if(ttl<key.ttl()){ // data的ttl需要大于等于查找开始时的时间,否则查找直到下一个ttl有效的键值对
            iter.Next();// drop dead data
            continue;
          }
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length+8);// 指针再偏移ttl的长度
          value->assign(v.data(), v.size());
          return true;
        }
        case kTypeDeletion:
          *s = Status::NotFound(Slice());
          return true;
      }
    }
    else{
      break;// 当 key 不一致的时候返回 false
    }
  }
  return false;
}

在开始查找时,将当前的时间写入 LookupKey,如果查找到的数据 ttl 超过当前时间,那么就会抛弃该条数据。

image-20241106152151687

通过 blockbuilder 向 sstable 插入数据时(新建 level0/1/2)时使用 add 函数,需要将 ttl 通过PutVarint64 编码成 Varint64 形式,放入 key 与 value 之间

void BlockBuilder::Add(const Slice& key, uint64_t ttl, const Slice& value) {
  ···
  ···

  // Add "<shared><non_shared><value_size>" to buffer_
  PutVarint32(&buffer_, shared);
  PutVarint32(&buffer_, non_shared);
  PutVarint32(&buffer_, value.size());

  // Add string delta to buffer_ followed by value
  buffer_.append(key.data() + shared, non_shared);
  PutVarint64(&buffer_, ttl);//加入ttl
  buffer_.append(value.data(), value.size());

  // Update state
  last_key_.resize(shared);
  last_key_.append(key.data() + shared, non_shared);
  assert(Slice(last_key_) == key);
  counter_++;
}

通过 TableBuilder 将合并后的数据插入新的 sstable,使用 add 函数

image-20241106154658482

void TableBuilder::Add(const Slice& key, uint64_t ttl, const Slice& value) {
  time_t now = time(nullptr);
  if(ttl < static_cast<uint64_t>(now))return; // 再次检查,是否超时(理论上在上层DoCompactionWork时已经检查过一次,都是为了保障正确性,冗余的测试并不会造成很大的性能瓶颈)

  ···
  ···
      
  if (r->filter_block != nullptr) {
    r->filter_block->AddKey(key);
  }

  r->last_key.assign(key.data(), key.size());
  r->num_entries++;
  r->data_block.Add(key, ttl, value); //加入block时仍旧需要使用 BlockBuilder::Add

  const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  if (estimated_block_size >= r->options.block_size) {
    Flush();
  }
}

在 DoCompactionWork 中,合并是会检查数据 ttl 是否到期,如果到期则会丢弃。

image-20241106173117857

然而,在处理Block中数据的存取时,我们遇到了问题:

image-20241106203303238

Leveldb使用Block::Iter对于IndexBlock和dataBlock进行统一的数据读取,在处理dataBlock时,我们期望Block::Iter对数据的处理能够适配ttl,即在key与value中间存有一份TTL。然而在debug过程中我们发现,在改动Block::Iter获取下一个键值对和获取当前键值对的Value的逻辑后(因为要多向后移8字节),使得IndexBlock获取数据出错。在经过一段时间的debug后,我们认为要完成解决这一bug,需要连带改动IndexBlock存取数据的逻辑,而这已经偏离了TTL本身的范围,使得项目的紧耦合度增强,既不轻量也不优雅,因此我们放弃解决这一bug,转而构思出了第二版设计。

第二版设计

原先的Put
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}

// 加入ttl功能后
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val,uint64_t ttl) {
  return DB::Put(o, key, val,ttl);
}
老版本的 DB::Put 方法仅接受键和值,并将值直接存入数据库。
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
新版本增加了一个可选参数 ttl(过期时间),允许用户指定存储的值的生存时间。
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  int len = value.size() + sizeof(uint64_t);
  char* new_data = new char[len];
  time_t now = time(nullptr); // 获取当前时间,单位为秒
  uint64_t ttl = INT64_MAX; // 设置默认的 TTL 为最大值(即永不过期)
  
  memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中
  memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾
  
  Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象
  batch.Put(key, newValue); // 将键值对放入 WriteBatch 中
  return Write(opt, &batch); // 写入数据库
}


Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value, uint64_t ttl) {
  WriteBatch batch;
  int len = value.size() + sizeof(uint64_t);
  char* new_data = new char[len];
  time_t now = time(nullptr); // 获取当前时间,单位为秒
  ttl += static_cast<uint64_t>(now); // 将当前时间加上 TTL,计算过期时间,以过期时间戳作为真正存储的TTL

  memcpy(new_data, value.data(), value.size()); // 将实际的值复制到 new_data 中
  memcpy(new_data + len - sizeof(uint64_t), (char*)(&ttl), sizeof(uint64_t)); // 将 TTL 复制到 new_data 的末尾

  Slice newValue = Slice(new_data, len); // 创建一个新的 Slice 对象
  batch.Put(key, newValue); // 将键值对放入 WriteBatch 中
  delete []new_data; // 释放分配的内存,防止内存泄漏
  return Write(opt, &batch); // 写入数据库
}
合并sstable内的数据
Status DBImpl::DoCompactionWork(CompactionState* compact) {
···
···
  Iterator* input = versions_->MakeInputIterator(compact->compaction);

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
    auto x=input->value(); // 获取键值对 value
    uint64_t ttl=*(uint64_t*)(x.data()+x.size()-sizeof(uint64_t));// 将 TTL 从 new_data 的末尾取出
    time_t now = time(nullptr); // 获得当前时间
    // 如果 TTL 超过当前时间,说明数据已经过期
    if(ttl < static_cast<uint64_t>(now)){
      Log(options_.info_log, "delete record for ttl");
      input->Next(); // 将 input 指向下一个键值对
      continue;
    } 
    // Prioritize immutable compaction work
    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      if (imm_ != nullptr) {
        CompactMemTable();
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      imm_micros += (env_->NowMicros() - imm_start);
    }
···
···
}
用于处理从sstable中读取的键值对
static void SaveValue(void* arg, const Slice& ikey, const Slice& v) {
  Saver* s = reinterpret_cast<Saver*>(arg);
  ParsedInternalKey parsed_key;
  if (!ParseInternalKey(ikey, &parsed_key)) {
    s->state = kCorrupt;
  } else {
    if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
      //  kTypeValue 对应被插入的数据
      if(parsed_key.type == kTypeValue){
        time_t now = time(nullptr);
        uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出
        if(ttl < static_cast<uint64_t>(now))return; // 如果 TTL 超过当前时间,说明数据已经过期,直接返回
      }
      s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted;
      if (s->state == kFound) {
        s->value->assign(v.data(), v.size()-sizeof(uint64_t));
      }
    }
  }
}
读取memtable内的数据
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
  Slice memkey = key.memtable_key();
  Table::Iterator iter(&table_);
  iter.Seek(memkey.data());
  while (iter.Valid()) {
    const char* entry = iter.key();
    uint32_t key_length;
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
    if (comparator_.comparator.user_comparator()->Compare(
            Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
      // Correct user key
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
      switch (static_cast<ValueType>(tag & 0xff)) {
        //  kTypeValue 对应被插入的数据
        case kTypeValue: {
          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
          uint64_t ttl=*(uint64_t*)(v.data()+v.size()-sizeof(uint64_t)); // 将 TTL 从 new_data 的末尾取出
          time_t now = time(nullptr);
          // 如果 TTL 超过当前时间,说明数据已经过期
          if(ttl < static_cast<uint64_t>(now)){
            iter.Next(); // 将 iter 指向下一个键值对
            continue;
          }
          value->assign(v.data(), v.size()-sizeof(uint64_t));

          return true;
        }
        case kTypeDeletion:
          *s = Status::NotFound(Slice());
          return true;
      }
    }
    else break;
  }
  return false;
}
测试

1.将原有的两个测试的严格sleep ttl秒,改成sleep(ttl+1)秒,因为我们的设计是以秒为存储单位,可能存在一秒以内的读取误差,所以进行修改。(我们认为ttl无需毫秒/微妙级的数据准确性,一秒之内的误差可以接受,因此采取此种设计)

2.在原有设计上增加了第三个测试,其目的在于:①测试新的数据过期后,是否能够重新查找到旧但未过期的数据。②测试无TTL的接口是否可以正确运行,并且数据不过期。

#include "gtest/gtest.h"
#include "leveldb/env.h"
#include "leveldb/db.h"

using namespace leveldb;

constexpr int value_size = 2048;
constexpr int data_size = 128 << 20;

Status OpenDB(std::string dbName, DB **db) {
  Options options;
  options.create_if_missing = true;
  return DB::Open(options, dbName, db);
}

void InsertData(DB *db, uint64_t ttl/* second */) {
  WriteOptions writeOptions;
  int key_num = data_size / value_size;
  srand(0);

  for (int i = 0; i < key_num; i++) {
    int key_ = rand() % key_num+1;
    std::string key = std::to_string(key_);
    std::string value(value_size, 'a');
    db->Put(writeOptions, key, value, ttl);
  }
}

void GetData(DB *db, int size = (1 << 30)) {
  ReadOptions readOptions;
  int key_num = data_size / value_size;
  
  // 点查
  srand(0);
  for (int i = 0; i < 100; i++) {
    int key_ = rand() % key_num+1;
    std::string key = std::to_string(key_);
    std::string value;
    db->Get(readOptions, key, &value);
  }
}

TEST(TestTTL, ReadTTL) {
    DB *db;
    if(OpenDB("testdb", &db).ok() == false) {
        std::cerr << "open db failed" << std::endl;
        abort();
    }

    uint64_t ttl = 20;

    InsertData(db, ttl);

    ReadOptions readOptions;
    Status status;
    int key_num = data_size / value_size;
    srand(0);
    for (int i = 0; i < 100; i++) {
        int key_ = rand() % key_num+1;
        std::string key = std::to_string(key_);
        std::string value;
        status = db->Get(readOptions, key, &value);
        ASSERT_TRUE(status.ok());
    }

    Env::Default()->SleepForMicroseconds((ttl+1) * 1000000);

    for (int i = 0; i < 100; i++) {
        int key_ = rand() % key_num+1;
        std::string key = std::to_string(key_);
        std::string value;
        status = db->Get(readOptions, key, &value);
        ASSERT_FALSE(status.ok());
    }

    delete db;
}

TEST(TestTTL, CompactionTTL) {
    DB *db;

    if(OpenDB("testdb", &db).ok() == false) {
        std::cerr << "open db failed" << std::endl;
        abort();
    }

    uint64_t ttl = 20;
    InsertData(db, ttl);

    leveldb::Range ranges[1];
    ranges[0] = leveldb::Range("-", "A");
    uint64_t sizes[1];
    db->GetApproximateSizes(ranges, 1, sizes);
    ASSERT_GT(sizes[0], 0);

    Env::Default()->SleepForMicroseconds((ttl+1) * 1000000);

    db->CompactRange(nullptr, nullptr);

    leveldb::Range  ranges_1[1];
    ranges[0] = leveldb::Range("-", "A");
    uint64_t sizes_1[1];
    db->GetApproximateSizes(ranges_1, 1, sizes_1);
    ASSERT_EQ(sizes_1[0], 0);

    delete db;
}


TEST(TestTTL, OurTTL) {
    DB *db;
    WriteOptions writeOptions;
    ReadOptions readOptions;
    if(OpenDB("testdb_for_XOY", &db).ok() == false) {
        std::cerr << "open db failed" << std::endl;
        abort();
    }
    for (int i = 0; i < 10000; i++) {
      std::string key = std::to_string(i);
      std::string value = std::to_string(i);
      db->Put(writeOptions, key, value);//插入永不过期的key value(value=key)
    }
    for (int i = 0; i < 10000; i++) {
      std::string key = std::to_string(i);
      std::string value = std::to_string(i*2);
      db->Put(writeOptions, key, value, 30);//插入30秒后过期的key value(value=key*2)
    }
    for (int i = 0; i < 10000; i++) {
      std::string key = std::to_string(i);
      std::string value = std::to_string(i*3);
      db->Put(writeOptions, key, value, 15);//插入15秒后过期的key value(value=key*3)
    }

    for (int i = 0; i < 10000; i++) {
        std::string key = std::to_string(i);
        std::string value;
        Status status = db->Get(readOptions, key, &value);
        ASSERT_TRUE(status.ok());
        ASSERT_TRUE(value==std::to_string(i*3));//立刻读取,期望得到value=key*3
    }

    Env::Default()->SleepForMicroseconds((15+1) * 1000000);

    for (int i = 0; i < 10000; i++) {
        std::string key = std::to_string(i);
        std::string value;
        Status status = db->Get(readOptions, key, &value);
        ASSERT_TRUE(status.ok());
        ASSERT_TRUE(value==std::to_string(i*2));//16秒后读取,期望得到value=key*2
    }

    Env::Default()->SleepForMicroseconds((15+1) * 1000000);

    for (int i = 0; i < 10000; i++) {
        std::string key = std::to_string(i);
        std::string value;
        Status status = db->Get(readOptions, key, &value);
        ASSERT_TRUE(status.ok());
        ASSERT_TRUE(value==std::to_string(i));//32秒后读取,期望得到value=key
    }

    delete db;
}


int main(int argc, char** argv) {
  // All tests currently run with the same read-only file limits.
  testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}