Browse Source

value ahead of key

xxy
xxy 9 months ago
parent
commit
b1e59a336b
4 changed files with 52 additions and 84 deletions
  1. +49
    -81
      db/db_impl.cc
  2. +1
    -1
      db/db_impl.h
  3. +1
    -1
      db/db_iter.cc
  4. +1
    -1
      include/leveldb/db.h

+ 49
- 81
db/db_impl.cc View File

@ -1241,7 +1241,6 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
return s;
}
Slice value_log_slice = Slice(value->c_str() + 1, value->length());
Slice new_key;
Slice new_value;
int value_offset = sizeof(uint64_t) * 2; // 16
uint64_t file_id, valuelog_offset;
@ -1250,7 +1249,7 @@ Status DBImpl::Get(const ReadOptions& options, const Slice& key,
res = GetVarint64(&value_log_slice, &valuelog_offset);
if (!res) return Status::Corruption("can't decode valuelog offset");
// std::cout<<"file_id: "<<file_id<<", valuelog_offset: "<<valuelog_offset<<std::endl;
s = ReadValueLog(file_id, valuelog_offset, &new_key, &new_value);
s = ReadValueLog(file_id, valuelog_offset, &new_value);
if (!s.ok()) {
return s;
}
@ -1619,32 +1618,34 @@ std::vector> DBImpl::WriteValueLog(
std::vector<std::pair<uint64_t, uint64_t>> res;
for (const auto& [key_slice, value_slice] : kv) {
// 写入 key 的长度
uint64_t key_len = key_slice.size();
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
// 写入 value 的长度
uint64_t value_len = value_slice.size();
valueFile.write(reinterpret_cast<const char*>(&value_len),
sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// 写入 key 本身
valueFile.write(key_slice.data(), key_len);
// 写入 value 本身
valueFile.write(value_slice.data(), value_len);
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// 写入 value 的长度
uint64_t value_len = value_slice.size();
valueFile.write(reinterpret_cast<const char*>(&value_len),
sizeof(uint64_t));
// 写入 key 的长度
uint64_t key_len = key_slice.size();
valueFile.write(reinterpret_cast<const char*>(&key_len), sizeof(uint64_t));
if (!valueFile.good()) {
valueFile.close();
assert(0);
}
// 写入 value 本身
valueFile.write(value_slice.data(), value_len);
// 写入 key 本身
valueFile.write(key_slice.data(), key_len);
if (!valueFile.good()) {
valueFile.close();
assert(0);
@ -1705,7 +1706,7 @@ void DBImpl::addNewValueLog() {
}
}
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset,
Slice* value) {
Status s = Status::OK();
std::string file_name_ = ValueLogFileName(dbname_, file_id);
@ -1718,63 +1719,33 @@ Status DBImpl::ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
return Status::Corruption("Failed to open file for reading!");
}
// Seek to the position of key length
inFile.seekg(offset);
// Read the length of the key
char* key_buf_len = new char[sizeof(uint64_t)];
inFile.read(key_buf_len, sizeof(uint64_t));
uint64_t key_len = 0;
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key length from file!");
}
// Now seek to the actual key position and read the key
inFile.seekg(offset + sizeof(uint64_t));
char* key_buf = new char[key_len];
inFile.read(key_buf, key_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
inFile.close();
return Status::Corruption("Failed to read key from file!");
}
// Assign the read key data to the Slice
*key = Slice(key_buf, key_len);
// Read the length of the value
inFile.seekg(offset + sizeof(uint64_t) + key_len);
inFile.seekg(offset);
char* value_buf_len = new char[sizeof(uint64_t)];
inFile.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
inFile.close();
return Status::Corruption("Failed to read value length from file!");
}
// Now seek to the actual data position and read the value
inFile.seekg(offset + sizeof(uint64_t) + key_len + sizeof(uint64_t));
inFile.seekg(offset + sizeof(uint64_t));
char* value_buf = new char[val_len];
inFile.read(value_buf, val_len);
if (!inFile.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
delete[] value_buf;
inFile.close();
return Status::Corruption("Failed to read value from file!");
}
// Seek to the position of key length
inFile.seekg(offset + sizeof(uint64_t) + val_len);
// Close the file after reading
inFile.close();
@ -1841,6 +1812,33 @@ void DBImpl::GarbageCollect() {
Status s = Status::OK();
// Read the length of the value
cur_valuelog.seekg(current_offset);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
if (cur_valuelog.eof()) {
delete[] value_buf_len;
break; // 正常退出条件:到达文件末尾
}
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "3Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// std::cout << cnt <<" "<<current_offset<< std::endl;
current_offset += val_len;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// Seek to the position of key length
cur_valuelog.seekg(current_offset);
@ -1848,11 +1846,6 @@ void DBImpl::GarbageCollect() {
char* key_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(key_buf_len, sizeof(uint64_t));
if (cur_valuelog.eof()) {
delete[] key_buf_len;
break; // 正常退出条件:到达文件末尾
}
std::memcpy(&key_len, key_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
@ -1882,31 +1875,6 @@ void DBImpl::GarbageCollect() {
// Assign the read key data to the Slice
key = Slice(key_buf, key_len);
// Read the length of the value
cur_valuelog.seekg(current_offset);
char* value_buf_len = new char[sizeof(uint64_t)];
cur_valuelog.read(value_buf_len, sizeof(uint64_t));
uint64_t val_len = 0;
std::memcpy(&val_len, value_buf_len, sizeof(uint64_t));
if (!cur_valuelog.good()) {
delete[] key_buf;
delete[] key_buf_len;
delete[] value_buf_len;
cur_valuelog.close();
std::cerr << "3Failed to read file: " << valuelog_name << std::endl;
break;
}
// 更新当前偏移
current_offset += sizeof(uint64_t);
// std::cout << cnt <<" "<<current_offset<< std::endl;
current_offset += val_len;
// std::cout << cnt <<" "<<current_offset<< std::endl;
// // Assign the read value data to the Slice
// value = Slice(value_buf, val_len);
// 检查 key 是否在 sstable 中存在
std::string stored_value;
@ -1953,7 +1921,7 @@ void DBImpl::GarbageCollect() {
}
// Now seek to the actual data position and read the value
cur_valuelog.seekg(current_offset-val_len);
cur_valuelog.seekg(tmp_offset+sizeof(uint64_t));
char* value_buf = new char[val_len];
cur_valuelog.read(value_buf, val_len);
if (!cur_valuelog.good()) {

+ 1
- 1
db/db_impl.h View File

@ -73,7 +73,7 @@ class DBImpl : public DB {
std::pair<WritableFile*, uint64_t> getNewValuelog(); // use for compaction
// Status ReadValueLog(uint64_t file_id, uint64_t offset,Slice*
// value)override;
Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key,
Status ReadValueLog(uint64_t file_id, uint64_t offset,
Slice* value) override;
// Extra methods (for testing) that are not in the public DB interface

+ 1
- 1
db/db_iter.cc View File

@ -83,7 +83,7 @@ class DBIter : public Iterator {
// res=GetVarint64(&tmp_value,&valuelog_len);
// if(!res)assert(0);
// db_->ReadValueLog(file_id,valuelog_offset,valuelog_len,&tmp_value);
db_->ReadValueLog(file_id,valuelog_offset, &key, &tmp_value);
db_->ReadValueLog(file_id,valuelog_offset, &tmp_value);
return tmp_value;
}
Status status() const override {

+ 1
- 1
include/leveldb/db.h View File

@ -119,7 +119,7 @@ class LEVELDB_EXPORT DB {
// assert(0); // Not implemented
// return Status::Corruption("not imp");
// }
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* key, Slice* value){
virtual Status ReadValueLog(uint64_t file_id, uint64_t offset, Slice* value){
assert(0); // Not implemented
return Status::Corruption("not imp");
}

Loading…
Cancel
Save