// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "leveldb/table_builder.h" #include #include "leveldb/comparator.h" #include "leveldb/env.h" #include "leveldb/filter_policy.h" #include "leveldb/options.h" #include "table/block_builder.h" #include "table/filter_block.h" #include "table/format.h" #include "util/coding.h" #include "util/crc32c.h" namespace leveldb { struct TableBuilder::Rep { Rep(const Options& opt, WritableFile* f) // Rep类的构造函数 : options(opt), // 使用初始化列表初始化成员变量options index_block_options(opt), // 同样使用初始化列表初始化index_block_options成员变量 file(f), // 使用初始化列表将file成员变量初始化为构造函数参数f的值,即指向可写文件的指针。 offset(0), // 初始化offset成员变量为0,用于跟踪当前写入文件的偏移量。 data_block(&options), // 初始化data_block成员变量,使用&options作为参数,表明数据块的配置将基于options。 index_block(&index_block_options), // 初始化index_block成员变量,与data_block类似,但它使用&index_block_options作为参数,这意味着索引块的配置与index_block_options一致。 num_entries(0), // 初始化num_entries成员变量为0,用于跟踪已写入的数据条目数量。 closed(false), // 初始化closed成员变量为false,用于跟踪文件或写入器是否已经被关闭。 filter_block(opt.filter_policy == nullptr // 用于构建或管理过滤器块,以优化数据的检索。 ? nullptr : new FilterBlockBuilder(opt.filter_policy)), pending_index_entry(false) { // 初始化pending_index_entry成员变量为false,用于跟踪是否有待处理的索引条目需要写入索引块。 index_block_options.block_restart_interval = 1; // 控制索引块中索引项的重启间隔,即每隔多少个索引项就进行一次“重启”或新的索引段开始 } Options options; // 包含了构建SSTable时所需的全局配置选项 Options index_block_options; // options的一个副本,用于构建索引块(Index Block)时的特殊配置 WritableFile* file; // 指向用于写入数据的WritableFile对象,该对象封装了对文件的写操作 uint64_t offset; // 记录当前写入位置在文件中的偏移量 Status status; // 记录构建过程中发生的任何错误或状态 BlockBuilder data_block; // 用于构建数据块(Data Block),数据块存储排序后的键值对 BlockBuilder index_block; // 用于构建索引块(Index Block),索引块存储指向数据块中键范围的指针,以支持快速查找 std::string last_key; // 存储最近添加到数据块中的键,用于与下一个键进行比较 int64_t num_entries; // 记录添加到表中的键值对总数 bool closed; // 标记TableBuilder是否已完成(调用Finish())或已放弃(调用Abandon()) FilterBlockBuilder* filter_block; // 如果配置了过滤器策略,用于构建过滤器块。过滤器块用于减少在查找键时必须读取的数据块数量 // Invariant: r->pending_index_entry is true only if data_block is empty. bool pending_index_entry; // 标记是否有一个待处理的索引条目需要添加到索引块中 BlockHandle pending_handle; // 存储待处理的索引条目的BlockHandle,BlockHandle包含了数据块在文件中的位置和大小信息 std::string compressed_output; // 用于临时存储压缩后的数据块或索引块内容 }; TableBuilder::TableBuilder(const Options& options, WritableFile* file) : rep_(new Rep(options, file)) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } } TableBuilder::~TableBuilder() { assert(rep_->closed); // Catch errors where caller forgot to call Finish() delete rep_->filter_block; delete rep_; } Status TableBuilder::ChangeOptions(const Options& options) { // Note: if more fields are added to Options, update // this function to catch changes that should not be allowed to // change in the middle of building a Table. if (options.comparator != rep_->options.comparator) { return Status::InvalidArgument("changing comparator while building table"); } // Note that any live BlockBuilders point to rep_->options and therefore // will automatically pick up the updated options. rep_->options = options; rep_->index_block_options = options; rep_->index_block_options.block_restart_interval = 1; return Status::OK(); } void TableBuilder::Add(const Slice& key, const Slice& value) { Rep* r = rep_; // 获取TableBuilder的Rep表示,Rep是TableBuilder的内部状态管理结构体 assert(!r->closed); // 断言确保当前 Build 过程没有结束 if (!ok()) return; // 如果状态不是OK,则直接返回 if (r->num_entries > 0) { // 如果这不是第一个键值对,则断言确保新添加的键大于(即排在后面)最后一个添加的键。 assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } // 如果有一个待处理的索引条目(pending_index_entry为true),则需要先处理它。 if (r->pending_index_entry) { assert(r->data_block.empty()); // 断言确保当前数据块为空,因为只有在添加新数据块前才会处理待处理的索引条目。 // 使用比较器找到当前最后一个键和新键之间的最短分隔符,并更新last_key r->options.comparator->FindShortestSeparator(&r->last_key, key); // 将待处理的索引条目的BlockHandle编码成字符串,并添加到索引块中 std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; // 处理完毕后,将pending_index_entry标记为false } // 如果配置了过滤器块,则将新键添加到过滤器块中。 if (r->filter_block != nullptr) { r->filter_block->AddKey(key); } // 更新last_key为新添加的键,并增加num_entries计数器 r->last_key.assign(key.data(), key.size()); r->num_entries++; // 将键值对添加到数据块中 r->data_block.Add(key, value); // 估计当前数据块的大小,并检查是否超过了配置的块大小限制 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); // 如果数据块大小超过了限制,则调用Flush()函数来处理数据块 if (estimated_block_size >= r->options.block_size) { Flush(); } } void TableBuilder::Flush() { Rep* r = rep_; // 获取TableBuilder的内部状态管理结构体Rep的指针 assert(!r->closed); // 断言确保TableBuilder没有被关闭 if (!ok()) return; // 如果TableBuilder的状态不是OK(可能由于之前的错误),则直接返回 if (r->data_block.empty()) return; // 如果数据块为空,则没有内容需要刷新,直接返回 assert(!r->pending_index_entry); // 断言确保没有待处理的索引条目,因为在此刻应该已经处理完上一个数据块的所有索引 // 将当前数据块写入 WriteBlock(&r->data_block, &r->pending_handle); // 检查WriteBlock操作是否成功,如果成功,则进行后续操作 if (ok()) { // 标记有一个待处理的索引条目,这是为了下一个数据块添加时能够正确设置索引 r->pending_index_entry = true; // 调用底层文件的Flush方法来确保所有数据都已经被正确写入并可能同步到磁盘 r->status = r->file->Flush(); } // 如果配置了过滤器块,并且数据块已经刷新到文件中 if (r->filter_block != nullptr) { // 通知过滤器块开始一个新的块,并传入当前数据块的偏移量 r->filter_block->StartBlock(r->offset); } } void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 // crc: uint32 assert(ok()); // 断言确保当前对象处于有效状态 Rep* r = rep_; // 获取TableBuilder的Rep表示,Rep是TableBuilder的内部状态管理结构体 Slice raw = block->Finish(); // Slice block_contents; // CompressionType type = r->options.compression; // TODO(postrelease): Support more compression options: zlib? // 通过switch语句根据r->options.compression的值(压缩类型)来决定是否压缩以及如何压缩数据 switch (type) { case kNoCompression: // 不进行压缩,直接将raw赋值给block_contents block_contents = raw; break; case kSnappyCompression: { // 尝试使用Snappy算法压缩数据,如果压缩成功且压缩后的数据大小小于原始数据的87.5%(raw.size() - (raw.size() / 8u)),则使用压缩后的数据;否则,回退到无压缩模式,并更新type为kNoCompression std::string* compressed = &r->compressed_output; if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && compressed->size() < raw.size() - (raw.size() / 8u)) { block_contents = *compressed; } else { // Snappy not supported, or compressed less than 12.5%, so just // store uncompressed form block_contents = raw; type = kNoCompression; } break; } case kZstdCompression: { // 类似于Snappy的处理,但使用Zstandard算法进行压缩 std::string* compressed = &r->compressed_output; if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(), raw.size(), compressed) && compressed->size() < raw.size() - (raw.size() / 8u)) { block_contents = *compressed; } else { // Zstd not supported, or compressed less than 12.5%, so just // store uncompressed form block_contents = raw; type = kNoCompression; } break; } } WriteRawBlock(block_contents, type, handle); // 调用WriteRawBlock来实际将数据块写入到底层存储,并记录其压缩类型和位置信息到handle中 r->compressed_output.clear(); // 清理用于存储压缩输出的字符串,以避免内存泄漏 block->Reset(); // 调用block的Reset方法,重置BlockBuilder的状态,以便它可以被重新用于构建下一个数据块 } void TableBuilder::WriteRawBlock(const Slice& block_contents, CompressionType type, BlockHandle* handle) { Rep* r = rep_; // 获取内部表示 handle->set_offset(r->offset); // 设置数据块在文件中的偏移量 handle->set_size(block_contents.size()); // 设置数据块的大小 r->status = r->file->Append(block_contents); // 尝试将数据块内容追加到文件中 if (r->status.ok()) { // 检查数据块内容是否成功追加到文件 char trailer[kBlockTrailerSize]; // 准备一个用于存储块尾信息的缓冲区 trailer[0] = type; // 设置块尾信息的第一个字节为压缩类型 uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); // 计算数据块内容的CRC校验和 crc = crc32c::Extend(crc, trailer, 1); // 将CRC校验和扩展到包括块类型字节 EncodeFixed32(trailer + 1, crc32c::Mask(crc)); // 将CRC校验和(经过掩码处理以符合特定格式)编码到块尾信息的剩余部分 r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); // 尝试将块尾信息追加到文件中 if (r->status.ok()) { // 检查块尾信息是否成功追加到文件 r->offset += block_contents.size() + kBlockTrailerSize; // 更新内部偏移量,以反映已写入的数据块内容和块尾信息的总大小 } } } Status TableBuilder::status() const { return rep_->status; } Status TableBuilder::Finish() { Rep* r = rep_; // 获取内部表示 Flush(); // 刷新缓冲区,确保所有待写入的数据都已写入文件 assert(!r->closed); // 断言检查,确保在调用Finish之前表构建器没有被关闭 r->closed = true; // 标记表构建器为已关闭 // 初始化几个BlockHandle对象,用于存储后续块的位置信息 BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; // 写入过滤器块(如果存在) if (ok() && r->filter_block != nullptr) { // 调用Finish()完成过滤器块的构建,并获取其内容 // 然后以无压缩方式写入过滤器块内容,并获取其BlockHandle WriteRawBlock(r->filter_block->Finish(), kNoCompression, &filter_block_handle); } // 写入 metaindex block if (ok()) { // 创建一个BlockBuilder对象用于构建metaindex block BlockBuilder meta_index_block(&r->options); // 如果存在过滤器块,则将其名称和位置信息添加到meta_index_block中 if (r->filter_block != nullptr) { // Add mapping from "filter.Name" to location of filter data std::string key = "filter."; key.append(r->options.filter_policy->Name()); // 过滤器策略的名称 std::string handle_encoding; filter_block_handle.EncodeTo(&handle_encoding); // 将过滤器块的BlockHandle编码为字符串 meta_index_block.Add(key, handle_encoding); // 将键值对添加到元索引块中 } // TODO(postrelease): Add stats and other meta blocks // 写入构建好的元索引块,并获取其BlockHandle WriteBlock(&meta_index_block, &metaindex_block_handle); } // 写入索引块 if (ok()) { // 如果存在待处理的索引项,则先处理它 if (r->pending_index_entry) { // 使用比较器找到最后一个键的短后继(用于索引的边界处理) r->options.comparator->FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); // 编码待处理的BlockHandle r->index_block.Add(r->last_key, Slice(handle_encoding)); // 将键值对添加到索引块中 r->pending_index_entry = false; // 标记待处理索引项为已处理 } // 写入构建好的索引块,并获取其BlockHandle WriteBlock(&r->index_block, &index_block_handle); } // 写入表尾 if (ok()) { // 创建一个Footer对象,并设置元索引块和索引块的BlockHandle Footer footer; footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); // 将Footer对象编码为字符串 std::string footer_encoding; footer.EncodeTo(&footer_encoding); // 将编码后的表尾追加到文件中 r->status = r->file->Append(footer_encoding); // 如果表尾成功写入,则更新内部偏移量 if (r->status.ok()) { r->offset += footer_encoding.size(); } } // 返回最终的状态,表示表构建是否成功完成 return r->status; } void TableBuilder::Abandon() { Rep* r = rep_; assert(!r->closed); r->closed = true; } uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; } uint64_t TableBuilder::FileSize() const { return rep_->offset; } } // namespace leveldb