You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

270 lines
7.9 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "leveldb/table_builder.h"
  5. #include <assert.h>
  6. #include "leveldb/comparator.h"
  7. #include "leveldb/env.h"
  8. #include "leveldb/filter_policy.h"
  9. #include "leveldb/options.h"
  10. #include "table/block_builder.h"
  11. #include "table/filter_block.h"
  12. #include "table/format.h"
  13. #include "util/coding.h"
  14. #include "util/crc32c.h"
  15. namespace leveldb {
  16. struct TableBuilder::Rep {
  17. Options options;
  18. Options index_block_options;
  19. WritableFile* file;
  20. uint64_t offset;
  21. Status status;
  22. BlockBuilder data_block;
  23. BlockBuilder index_block;
  24. std::string last_key;
  25. int64_t num_entries;
  26. bool closed; // Either Finish() or Abandon() has been called.
  27. FilterBlockBuilder* filter_block;
  28. // We do not emit the index entry for a block until we have seen the
  29. // first key for the next data block. This allows us to use shorter
  30. // keys in the index block. For example, consider a block boundary
  31. // between the keys "the quick brown fox" and "the who". We can use
  32. // "the r" as the key for the index block entry since it is >= all
  33. // entries in the first block and < all entries in subsequent
  34. // blocks.
  35. //
  36. // Invariant: r->pending_index_entry is true only if data_block is empty.
  37. bool pending_index_entry;
  38. BlockHandle pending_handle; // Handle to add to index block
  39. std::string compressed_output;
  40. Rep(const Options& opt, WritableFile* f)
  41. : options(opt),
  42. index_block_options(opt),
  43. file(f),
  44. offset(0),
  45. data_block(&options),
  46. index_block(&index_block_options),
  47. num_entries(0),
  48. closed(false),
  49. filter_block(opt.filter_policy == nullptr ? nullptr
  50. : new FilterBlockBuilder(opt.filter_policy)),
  51. pending_index_entry(false) {
  52. index_block_options.block_restart_interval = 1;
  53. }
  54. };
  55. TableBuilder::TableBuilder(const Options& options, WritableFile* file)
  56. : rep_(new Rep(options, file)) {
  57. if (rep_->filter_block != nullptr) {
  58. rep_->filter_block->StartBlock(0);
  59. }
  60. }
  61. TableBuilder::~TableBuilder() {
  62. assert(rep_->closed); // Catch errors where caller forgot to call Finish()
  63. delete rep_->filter_block;
  64. delete rep_;
  65. }
  66. Status TableBuilder::ChangeOptions(const Options& options) {
  67. // Note: if more fields are added to Options, update
  68. // this function to catch changes that should not be allowed to
  69. // change in the middle of building a Table.
  70. if (options.comparator != rep_->options.comparator) {
  71. return Status::InvalidArgument("changing comparator while building table");
  72. }
  73. // Note that any live BlockBuilders point to rep_->options and therefore
  74. // will automatically pick up the updated options.
  75. rep_->options = options;
  76. rep_->index_block_options = options;
  77. rep_->index_block_options.block_restart_interval = 1;
  78. return Status::OK();
  79. }
  80. void TableBuilder::Add(const Slice& key, const Slice& value) {
  81. Rep* r = rep_;
  82. assert(!r->closed);
  83. if (!ok()) return;
  84. if (r->num_entries > 0) {
  85. assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  86. }
  87. if (r->pending_index_entry) {
  88. assert(r->data_block.empty());
  89. r->options.comparator->FindShortestSeparator(&r->last_key, key);
  90. std::string handle_encoding;
  91. r->pending_handle.EncodeTo(&handle_encoding);
  92. r->index_block.Add(r->last_key, Slice(handle_encoding));
  93. r->pending_index_entry = false;
  94. }
  95. if (r->filter_block != nullptr) {
  96. r->filter_block->AddKey(key);
  97. }
  98. r->last_key.assign(key.data(), key.size());
  99. r->num_entries++;
  100. r->data_block.Add(key, value);
  101. const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
  102. if (estimated_block_size >= r->options.block_size) {
  103. Flush();
  104. }
  105. }
  106. void TableBuilder::Flush() {
  107. Rep* r = rep_;
  108. assert(!r->closed);
  109. if (!ok()) return;
  110. if (r->data_block.empty()) return;
  111. assert(!r->pending_index_entry);
  112. WriteBlock(&r->data_block, &r->pending_handle);
  113. if (ok()) {
  114. r->pending_index_entry = true;
  115. r->status = r->file->Flush();
  116. }
  117. if (r->filter_block != nullptr) {
  118. r->filter_block->StartBlock(r->offset);
  119. }
  120. }
  121. void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
  122. // File format contains a sequence of blocks where each block has:
  123. // block_data: uint8[n]
  124. // type: uint8
  125. // crc: uint32
  126. assert(ok());
  127. Rep* r = rep_;
  128. Slice raw = block->Finish();
  129. Slice block_contents;
  130. CompressionType type = r->options.compression;
  131. // TODO(postrelease): Support more compression options: zlib?
  132. switch (type) {
  133. case kNoCompression:
  134. block_contents = raw;
  135. break;
  136. case kSnappyCompression: {
  137. std::string* compressed = &r->compressed_output;
  138. if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
  139. compressed->size() < raw.size() - (raw.size() / 8u)) {
  140. block_contents = *compressed;
  141. } else {
  142. // Snappy not supported, or compressed less than 12.5%, so just
  143. // store uncompressed form
  144. block_contents = raw;
  145. type = kNoCompression;
  146. }
  147. break;
  148. }
  149. }
  150. WriteRawBlock(block_contents, type, handle);
  151. r->compressed_output.clear();
  152. block->Reset();
  153. }
  154. void TableBuilder::WriteRawBlock(const Slice& block_contents,
  155. CompressionType type,
  156. BlockHandle* handle) {
  157. Rep* r = rep_;
  158. handle->set_offset(r->offset);
  159. handle->set_size(block_contents.size());
  160. r->status = r->file->Append(block_contents);
  161. if (r->status.ok()) {
  162. char trailer[kBlockTrailerSize];
  163. trailer[0] = type;
  164. uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
  165. crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
  166. EncodeFixed32(trailer+1, crc32c::Mask(crc));
  167. r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
  168. if (r->status.ok()) {
  169. r->offset += block_contents.size() + kBlockTrailerSize;
  170. }
  171. }
  172. }
  173. Status TableBuilder::status() const {
  174. return rep_->status;
  175. }
  176. Status TableBuilder::Finish() {
  177. Rep* r = rep_;
  178. Flush();
  179. assert(!r->closed);
  180. r->closed = true;
  181. BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
  182. // Write filter block
  183. if (ok() && r->filter_block != nullptr) {
  184. WriteRawBlock(r->filter_block->Finish(), kNoCompression,
  185. &filter_block_handle);
  186. }
  187. // Write metaindex block
  188. if (ok()) {
  189. BlockBuilder meta_index_block(&r->options);
  190. if (r->filter_block != nullptr) {
  191. // Add mapping from "filter.Name" to location of filter data
  192. std::string key = "filter.";
  193. key.append(r->options.filter_policy->Name());
  194. std::string handle_encoding;
  195. filter_block_handle.EncodeTo(&handle_encoding);
  196. meta_index_block.Add(key, handle_encoding);
  197. }
  198. // TODO(postrelease): Add stats and other meta blocks
  199. WriteBlock(&meta_index_block, &metaindex_block_handle);
  200. }
  201. // Write index block
  202. if (ok()) {
  203. if (r->pending_index_entry) {
  204. r->options.comparator->FindShortSuccessor(&r->last_key);
  205. std::string handle_encoding;
  206. r->pending_handle.EncodeTo(&handle_encoding);
  207. r->index_block.Add(r->last_key, Slice(handle_encoding));
  208. r->pending_index_entry = false;
  209. }
  210. WriteBlock(&r->index_block, &index_block_handle);
  211. }
  212. // Write footer
  213. if (ok()) {
  214. Footer footer;
  215. footer.set_metaindex_handle(metaindex_block_handle);
  216. footer.set_index_handle(index_block_handle);
  217. std::string footer_encoding;
  218. footer.EncodeTo(&footer_encoding);
  219. r->status = r->file->Append(footer_encoding);
  220. if (r->status.ok()) {
  221. r->offset += footer_encoding.size();
  222. }
  223. }
  224. return r->status;
  225. }
  226. void TableBuilder::Abandon() {
  227. Rep* r = rep_;
  228. assert(!r->closed);
  229. r->closed = true;
  230. }
  231. uint64_t TableBuilder::NumEntries() const {
  232. return rep_->num_entries;
  233. }
  234. uint64_t TableBuilder::FileSize() const {
  235. return rep_->offset;
  236. }
  237. } // namespace leveldb