LevelDB project 1 10225501460 林子骥 10211900416 郭夏辉
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

286 lines
8.5 KiB

1 month ago
1 month ago
1 month ago
1 month ago
1 month ago
  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "leveldb/table_builder.h"
  5. #include <cassert>
  6. #include "leveldb/comparator.h"
  7. #include "leveldb/env.h"
  8. #include "leveldb/filter_policy.h"
  9. #include "leveldb/options.h"
  10. #include "table/block_builder.h"
  11. #include "table/filter_block.h"
  12. #include "table/format.h"
  13. #include "util/coding.h"
  14. #include "util/crc32c.h"
  15. namespace leveldb {
  16. struct TableBuilder::Rep {
  17. Rep(const Options& opt, WritableFile* f)
  18. : options(opt),
  19. index_block_options(opt),
  20. file(f),
  21. offset(0),
  22. data_block(&options),
  23. index_block(&index_block_options),
  24. num_entries(0),
  25. closed(false),
  26. filter_block(opt.filter_policy == nullptr
  27. ? nullptr
  28. : new FilterBlockBuilder(opt.filter_policy)),
  29. pending_index_entry(false),
  30. has_ttl_filter_(false){
  31. index_block_options.block_restart_interval = 1;
  32. }
  33. Options options;
  34. Options index_block_options;
  35. WritableFile* file;
  36. uint64_t offset;
  37. Status status;
  38. BlockBuilder data_block;
  39. BlockBuilder index_block;
  40. std::string last_key;
  41. int64_t num_entries;
  42. bool closed; // Either Finish() or Abandon() has been called.
  43. FilterBlockBuilder* filter_block;
  44. // We do not emit the index entry for a block until we have seen the
  45. // first key for the next data block. This allows us to use shorter
  46. // keys in the index block. For example, consider a block boundary
  47. // between the keys "the quick brown fox" and "the who". We can use
  48. // "the r" as the key for the index block entry since it is >= all
  49. // entries in the first block and < all entries in subsequent
  50. // blocks.
  51. //
  52. // Invariant: r->pending_index_entry is true only if data_block is empty.
  53. bool pending_index_entry;
  54. bool has_ttl_filter_;
  55. BlockHandle pending_handle; // Handle to add to index block
  56. std::string compressed_output;
  57. };
  58. TableBuilder::TableBuilder(const Options& options, WritableFile* file)
  59. : rep_(new Rep(options, file)) {
  60. if (rep_->filter_block != nullptr) {
  61. rep_->filter_block->StartBlock(0);
  62. }
  63. }
  64. TableBuilder::~TableBuilder() {
  65. assert(rep_->closed); // Catch errors where caller forgot to call Finish()
  66. delete rep_->filter_block;
  67. delete rep_;
  68. }
  69. Status TableBuilder::ChangeOptions(const Options& options) {
  70. // Note: if more fields are added to Options, update
  71. // this function to catch changes that should not be allowed to
  72. // change in the middle of building a Table.
  73. if (options.comparator != rep_->options.comparator) {
  74. return Status::InvalidArgument("changing comparator while building table");
  75. }
  76. // Note that any live BlockBuilders point to rep_->options and therefore
  77. // will automatically pick up the updated options.
  78. rep_->options = options;
  79. rep_->index_block_options = options;
  80. rep_->index_block_options.block_restart_interval = 1;
  81. return Status::OK();
  82. }
  83. void TableBuilder::Add(const Slice& key, const Slice& value) {
  84. Rep* r = rep_;
  85. assert(!r->closed);
  86. if (!ok()) return;
  87. if (r->num_entries > 0) {
  88. assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  89. }
  90. if (r->pending_index_entry) {
  91. assert(r->data_block.empty());
  92. r->options.comparator->FindShortestSeparator(&r->last_key, key);
  93. std::string handle_encoding;
  94. r->pending_handle.EncodeTo(&handle_encoding);
  95. r->index_block.Add(r->last_key, Slice(handle_encoding));
  96. r->pending_index_entry = false;
  97. }
  98. if (r->filter_block != nullptr) {
  99. r->filter_block->AddKey(key);
  100. // if(r->has_ttl_filter_){
  101. //
  102. // }
  103. }
  104. r->last_key.assign(key.data(), key.size());
  105. r->num_entries++;
  106. r->data_block.Add(key, value);
  107. const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  108. if (estimated_block_size >= r->options.block_size) {
  109. Flush();
  110. }
  111. }
  112. void TableBuilder::Flush() {
  113. Rep* r = rep_;
  114. assert(!r->closed);
  115. if (!ok()) return;
  116. if (r->data_block.empty()) return;
  117. assert(!r->pending_index_entry);
  118. WriteBlock(&r->data_block, &r->pending_handle);
  119. if (ok()) {
  120. r->pending_index_entry = true;
  121. r->status = r->file->Flush();
  122. }
  123. if (r->filter_block != nullptr) {
  124. r->filter_block->StartBlock(r->offset);
  125. }
  126. }
  127. void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  128. // File format contains a sequence of blocks where each block has:
  129. // block_data: uint8[n]
  130. // type: uint8
  131. // crc: uint32
  132. assert(ok());
  133. Rep* r = rep_;
  134. Slice raw = block->Finish();
  135. Slice block_contents;
  136. CompressionType type = r->options.compression;
  137. // TODO(postrelease): Support more compression options: zlib?
  138. switch (type) {
  139. case kNoCompression:
  140. block_contents = raw;
  141. break;
  142. case kSnappyCompression: {
  143. std::string* compressed = &r->compressed_output;
  144. if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
  145. compressed->size() < raw.size() - (raw.size() / 8u)) {
  146. block_contents = *compressed;
  147. } else {
  148. // Snappy not supported, or compressed less than 12.5%, so just
  149. // store uncompressed form
  150. block_contents = raw;
  151. type = kNoCompression;
  152. }
  153. break;
  154. }
  155. case kZstdCompression: {
  156. std::string* compressed = &r->compressed_output;
  157. if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
  158. raw.size(), compressed) &&
  159. compressed->size() < raw.size() - (raw.size() / 8u)) {
  160. block_contents = *compressed;
  161. } else {
  162. // Zstd not supported, or compressed less than 12.5%, so just
  163. // store uncompressed form
  164. block_contents = raw;
  165. type = kNoCompression;
  166. }
  167. break;
  168. }
  169. }
  170. WriteRawBlock(block_contents, type, handle);
  171. r->compressed_output.clear();
  172. block->Reset();
  173. }
  174. void TableBuilder::WriteRawBlock(const Slice& block_contents,
  175. CompressionType type, BlockHandle* handle) {
  176. Rep* r = rep_;
  177. handle->set_offset(r->offset);
  178. handle->set_size(block_contents.size());
  179. r->status = r->file->Append(block_contents);
  180. if (r->status.ok()) {
  181. char trailer[kBlockTrailerSize];
  182. trailer[0] = type;
  183. uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
  184. crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
  185. EncodeFixed32(trailer + 1, crc32c::Mask(crc));
  186. r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
  187. if (r->status.ok()) {
  188. r->offset += block_contents.size() + kBlockTrailerSize;
  189. }
  190. }
  191. }
  192. Status TableBuilder::status() const { return rep_->status; }
  193. Status TableBuilder::Finish() {
  194. Rep* r = rep_;
  195. Flush();
  196. assert(!r->closed);
  197. r->closed = true;
  198. BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
  199. // Write filter block
  200. if (ok() && r->filter_block != nullptr) {
  201. WriteRawBlock(r->filter_block->Finish(), kNoCompression,
  202. &filter_block_handle);
  203. }
  204. // Write metaindex block
  205. if (ok()) {
  206. BlockBuilder meta_index_block(&r->options);
  207. if (r->filter_block != nullptr) {
  208. // Add mapping from "filter.Name" to location of filter data
  209. std::string key = "filter.";
  210. key.append(r->options.filter_policy->Name());
  211. std::string handle_encoding;
  212. filter_block_handle.EncodeTo(&handle_encoding);
  213. meta_index_block.Add(key, handle_encoding);
  214. }
  215. // TODO(postrelease): Add stats and other meta blocks
  216. WriteBlock(&meta_index_block, &metaindex_block_handle);
  217. }
  218. // Write index block
  219. if (ok()) {
  220. if (r->pending_index_entry) {
  221. r->options.comparator->FindShortSuccessor(&r->last_key);
  222. std::string handle_encoding;
  223. r->pending_handle.EncodeTo(&handle_encoding);
  224. r->index_block.Add(r->last_key, Slice(handle_encoding));
  225. r->pending_index_entry = false;
  226. }
  227. WriteBlock(&r->index_block, &index_block_handle);
  228. }
  229. // Write footer
  230. if (ok()) {
  231. Footer footer;
  232. footer.set_metaindex_handle(metaindex_block_handle);
  233. footer.set_index_handle(index_block_handle);
  234. std::string footer_encoding;
  235. footer.EncodeTo(&footer_encoding);
  236. r->status = r->file->Append(footer_encoding);
  237. if (r->status.ok()) {
  238. r->offset += footer_encoding.size();
  239. }
  240. }
  241. return r->status;
  242. }
  243. void TableBuilder::Abandon() {
  244. Rep* r = rep_;
  245. assert(!r->closed);
  246. r->closed = true;
  247. }
  248. uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
  249. uint64_t TableBuilder::FileSize() const { return rep_->offset; }
  250. } // namespace leveldb