Team members: 姚凯文 (kevinyao0901), 姜嘉琪
1721 lines
53 KiB

Add Env::Remove{File,Dir} which obsolete Env::Delete{File,Dir}.

The "DeleteFile" method name causes pain for Windows developers, because <windows.h> #defines a DeleteFile macro to DeleteFileW or DeleteFileA. Current code uses workarounds, like #undefining DeleteFile everywhere an Env is declared, implemented, or used.

This CL removes the need for workarounds by renaming Env::DeleteFile to Env::RemoveFile. For consistency, Env::DeleteDir is also renamed to Env::RemoveDir. A few internal methods are also renamed for consistency. Software that supports Windows is expected to migrate any Env implementations and usage to Remove{File,Dir}, and never use the name Env::Delete{File,Dir} in its code.

The renaming is done in a backwards-compatible way, at the risk of making it slightly more difficult to build a new correct Env implementation. The backwards compatibility is achieved using the following hacks:

1) Env::Remove{File,Dir} methods are added, with a default implementation that calls into Env::Delete{File,Dir}. This makes old Env implementations compatible with code that calls into the updated API.
2) The Env::Delete{File,Dir} methods are no longer pure virtuals. Instead, they gain a default implementation that calls into Env::Remove{File,Dir}. This makes updated Env implementations compatible with code that calls into the old API.

The cost of this approach is that it's possible to write an Env without overriding either Remove{File,Dir} or Delete{File,Dir}, without getting a compiler warning. However, attempting to run the test suite will immediately fail with an infinite call stack ending in {Remove,Delete}{File,Dir}, making developers aware of the problem.

PiperOrigin-RevId: 288710907
4 years ago
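The two compatibility hacks described in the commit message above can be sketched in a few lines. This is a minimal illustration, not the real `leveldb::Env`: the class names `MiniEnv`, `LegacyEnv`, and `UpdatedEnv` are hypothetical, and the methods return `bool` instead of `Status` to keep the sketch self-contained.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for leveldb::Env, reduced to one method pair.
// Assumption: bool replaces Status to keep the sketch self-contained.
class MiniEnv {
 public:
  virtual ~MiniEnv() = default;

  // New name: the default implementation forwards to the old name, so
  // implementations that only override DeleteFile keep working.
  virtual bool RemoveFile(const std::string& fname) {
    return DeleteFile(fname);
  }

  // Old name: no longer pure virtual. The default forwards to the new name,
  // so implementations that only override RemoveFile keep working.
  virtual bool DeleteFile(const std::string& fname) {
    return RemoveFile(fname);
  }
};

// An "old" implementation that overrides only the legacy method.
class LegacyEnv : public MiniEnv {
 public:
  bool DeleteFile(const std::string& fname) override {
    last_call = "Delete:" + fname;
    return true;
  }
  std::string last_call;
};

// An "updated" implementation that overrides only the new method.
class UpdatedEnv : public MiniEnv {
 public:
  bool RemoveFile(const std::string& fname) override {
    last_call = "Remove:" + fname;
    return true;
  }
  std::string last_call;
};
```

Calling `RemoveFile` on a `LegacyEnv` ends up in its `DeleteFile` override, and calling `DeleteFile` on an `UpdatedEnv` ends up in its `RemoveFile` override. A subclass that overrides neither would recurse between the two defaults without terminating, which is the cost the commit message points out.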
Release 1.18

Changes are:

* Update version number to 1.18
* Replace the basic fprintf call with a call to fwrite in order to work around the apparent compiler optimization/rewrite failure that we are seeing with the new toolchain/iOS SDKs provided with Xcode6 and iOS8.
* Fix ALL the header guards.
* Created a README.md with the LevelDB project description.
* A new CONTRIBUTING file.
* Don't implicitly convert uint64_t to size_t or int. Either preserve it as uint64_t, or explicitly cast. This fixes MSVC warnings about possible value truncation when compiling this code in Chromium.
* Added a DumpFile() library function that encapsulates the guts of the "leveldbutil dump" command. This will allow clients to dump data to their log files instead of stdout. It will also allow clients to supply their own environment.
* leveldb: Remove unused function 'ConsumeChar'.
* leveldbutil: Remove unused member variables from WriteBatchItemPrinter.
* OpenBSD, NetBSD and DragonflyBSD have _LITTLE_ENDIAN, so define PLATFORM_IS_LITTLE_ENDIAN like on FreeBSD. This fixes:
  * issue #143
  * issue #198
  * issue #249
* Switch from <cstdatomic> to <atomic>. The former never made it into the standard and doesn't exist in modern gcc versions at all. The latter contains everything that leveldb was using from the former. This problem was noticed when porting to Portable Native Client where no memory barrier is defined. The fact that <cstdatomic> is missing normally goes unnoticed since memory barriers are defined for most architectures.
* Make Hash() treat its input as unsigned. Before this change LevelDB files from platforms with different signedness of char were not compatible. This change fixes: issue #243
* Verify checksums of index/meta/filter blocks when paranoid_checks set.
* Invoke all tools for iOS with xcrun. (This was causing problems with the new XCode 5.1.1 image on pulse.)
* include <sys/stat.h> only once, and fix the following linter warning: "Found C system header after C++ system header"
* When encountering a corrupted table file, return Status::Corruption instead of Status::InvalidArgument.
* Support cygwin as build platform, patch is from https://code.google.com/p/leveldb/issues/detail?id=188
* Fix typo, merge patch from https://code.google.com/p/leveldb/issues/detail?id=159
* Fix typos and comments, and address the following two issues:
  * issue #166
  * issue #241
* Add missing db synchronize after "fillseq" in the benchmark.
* Removed unused variable in SeekRandom: value (issue #201)
10 years ago
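The release note about Hash() treating its input as unsigned is easier to see with a concrete byte. The sketch below is not LevelDB's hash function; the helper names `WidenAsSigned` and `WidenAsUnsigned` are hypothetical and only show how the same stored byte widens to different 32-bit values depending on the signedness of `char`, which is why a hash that mixes in widened bytes could differ between platforms.

```cpp
#include <cstdint>

// Widen a byte the way a platform with signed char would: values >= 0x80
// are sign-extended, so the upper 24 bits become all ones.
inline uint32_t WidenAsSigned(char raw) {
  return static_cast<uint32_t>(static_cast<signed char>(raw));
}

// Widen a byte the way a platform with unsigned char would: the upper
// 24 bits are always zero, regardless of the byte's value.
inline uint32_t WidenAsUnsigned(char raw) {
  return static_cast<uint32_t>(static_cast<unsigned char>(raw));
}
```

For ASCII bytes such as `'a'` the two helpers agree. For a byte like `0xE2` they differ, so treating every input byte as unsigned makes the result independent of the platform's `char` signedness.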
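The TTL helpers in the source that follows read an 8-byte expiration timestamp from the front of each stored value. As a hedged sketch of that layout, the block below encodes the timestamp explicitly in little-endian order so the stored bytes do not depend on the host's byte order. The names `EncodeWithExpiration`, `DecodeWithExpiration`, and `IsExpired` are hypothetical and are not part of this file or of LevelDB.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: returns an 8-byte little-endian expiration timestamp
// followed by the payload bytes.
std::string EncodeWithExpiration(const std::string& payload,
                                 uint64_t expiration_time) {
  std::string out;
  out.reserve(sizeof(uint64_t) + payload.size());
  for (int i = 0; i < 8; ++i) {
    // Emit the least significant byte first.
    out.push_back(static_cast<char>((expiration_time >> (8 * i)) & 0xff));
  }
  out.append(payload);
  return out;
}

// Hypothetical helper: splits a stored value back into timestamp and payload.
// Returns false instead of asserting when the value is too short.
bool DecodeWithExpiration(const std::string& stored, uint64_t* expiration_time,
                          std::string* payload) {
  if (stored.size() < sizeof(uint64_t)) return false;
  uint64_t t = 0;
  for (int i = 0; i < 8; ++i) {
    t |= static_cast<uint64_t>(static_cast<unsigned char>(stored[i]))
         << (8 * i);
  }
  *expiration_time = t;
  payload->assign(stored, sizeof(uint64_t), std::string::npos);
  return true;
}

// Assumption: an entry counts as expired once "now" reaches its timestamp.
bool IsExpired(uint64_t expiration_time, uint64_t now) {
  return now >= expiration_time;
}
```

Returning `false` for short inputs is a design choice of this sketch: values read back from disk may be truncated or may predate the TTL format, and an `assert` is compiled out in release builds.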
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/db_impl.h"

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <iostream>
#include <set>
#include <string>
#include <vector>

#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/status.h"
#include "leveldb/table.h"
#include "leveldb/table_builder.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/mutexlock.h"

namespace leveldb {

const int kNumNonTableCacheFiles = 10;

// TTL TODO: add helper functions for TTL Put.
void AppendExpirationTime(std::string* value, uint64_t expiration_time) {
  // Prepend the expiration timestamp (a 64-bit integer in host byte order,
  // which is little-endian on most platforms) to the value. This matches the
  // layout that ParseExpirationTime() and ParseActualValue() expect.
  value->insert(0, reinterpret_cast<const char*>(&expiration_time),
                sizeof(expiration_time));
}

uint64_t GetCurrentTime() {
  // Return the current Unix timestamp in seconds.
  return static_cast<uint64_t>(std::time(nullptr));
}

// Parse the expiration timestamp.
uint64_t ParseExpirationTime(const std::string& value) {
  // Assumes the expiration timestamp occupies the first 8 bytes of the value.
  assert(value.size() >= sizeof(uint64_t));
  uint64_t expiration_time;
  std::memcpy(&expiration_time, value.data(), sizeof(uint64_t));
  return expiration_time;  // Returned as stored (host byte order).
}

// Parse the actual value (strips the leading expiration timestamp).
std::string ParseActualValue(const std::string& value) {
  // Drop the first 8 bytes (the stored expiration timestamp) and return the
  // rest.
  return value.substr(sizeof(uint64_t));
}
// End of TTL modifications.

// Information kept for every waiting writer
struct DBImpl::Writer {
  explicit Writer(port::Mutex* mu)
      : batch(nullptr), sync(false), done(false), cv(mu) {}

  Status status;
  WriteBatch* batch;
  bool sync;
  bool done;
  port::CondVar cv;
};

struct DBImpl::CompactionState {
  // Files produced by compaction
  struct Output {
    uint64_t number;
    uint64_t file_size;
    InternalKey smallest, largest;
  };

  Output* current_output() { return &outputs[outputs.size() - 1]; }

  explicit CompactionState(Compaction* c)
      : compaction(c),
        smallest_snapshot(0),
        outfile(nullptr),
        builder(nullptr),
        total_bytes(0) {}

  Compaction* const compaction;

  // Sequence numbers < smallest_snapshot are not significant since we
  // will never have to service a snapshot below smallest_snapshot.
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
  // we can drop all entries for the same key with sequence numbers < S.
  SequenceNumber smallest_snapshot;

  std::vector<Output> outputs;

  // State kept for output being generated
  WritableFile* outfile;
  TableBuilder* builder;

  uint64_t total_bytes;
};

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
}

Options SanitizeOptions(const std::string& dbname,
                        const InternalKeyComparator* icmp,
                        const InternalFilterPolicy* ipolicy,
                        const Options& src) {
  Options result = src;
  result.comparator = icmp;
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
  ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
  ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
  ClipToRange(&result.block_size, 1 << 10, 4 << 20);
  if (result.info_log == nullptr) {
    // Open a log file in the same directory as the db
    src.env->CreateDir(dbname);  // In case it does not exist
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
    if (!s.ok()) {
      // No place suitable for logging
      result.info_log = nullptr;
    }
  }
  if (result.block_cache == nullptr) {
    result.block_cache = NewLRUCache(8 << 20);
  }
  return result;
}

static int TableCacheSize(const Options& sanitized_options) {
  // Reserve ten files or so for other uses and give the rest to TableCache.
  return sanitized_options.max_open_files - kNumNonTableCacheFiles;
}

DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env),
      internal_comparator_(raw_options.comparator),
      internal_filter_policy_(raw_options.filter_policy),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      db_lock_(nullptr),
      shutting_down_(false),
      background_work_finished_signal_(&mutex_),
      mem_(nullptr),
      imm_(nullptr),
      has_imm_(false),
      logfile_(nullptr),
      logfile_number_(0),
      log_(nullptr),
      seed_(0),
      tmp_batch_(new WriteBatch),
      background_compaction_scheduled_(false),
      manual_compaction_(nullptr),
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {}

DBImpl::~DBImpl() {
  // Wait for background work to finish.
  mutex_.Lock();
  shutting_down_.store(true, std::memory_order_release);
  while (background_compaction_scheduled_) {
    background_work_finished_signal_.Wait();
  }
  mutex_.Unlock();

  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  if (mem_ != nullptr) mem_->Unref();
  if (imm_ != nullptr) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}

Status DBImpl::NewDB() {
  VersionEdit new_db;
  new_db.SetComparatorName(user_comparator()->Name());
  new_db.SetLogNumber(0);
  new_db.SetNextFile(2);
  new_db.SetLastSequence(0);

  const std::string manifest = DescriptorFileName(dbname_, 1);
  WritableFile* file;
  Status s = env_->NewWritableFile(manifest, &file);
  if (!s.ok()) {
    return s;
  }
  {
    log::Writer log(file);
    std::string record;
    new_db.EncodeTo(&record);
    s = log.AddRecord(record);
    if (s.ok()) {
      s = file->Sync();
    }
    if (s.ok()) {
      s = file->Close();
    }
  }
  delete file;
  if (s.ok()) {
    // Make "CURRENT" file that points to the new manifest file.
    s = SetCurrentFile(env_, dbname_, 1);
  } else {
    env_->RemoveFile(manifest);
  }
  return s;
}

void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || options_.paranoid_checks) {
    // No change needed
  } else {
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
    *s = Status::OK();
  }
}

void DBImpl::RemoveObsoleteFiles() {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    // After a background error, we don't know whether a new version may
    // or may not have been committed, so we cannot safely garbage collect.
    return;
  }

  // Make a set of all of the live files
  std::set<uint64_t> live = pending_outputs_;
  versions_->AddLiveFiles(&live);

  std::vector<std::string> filenames;
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
  uint64_t number;
  FileType type;
  std::vector<std::string> files_to_delete;
  for (std::string& filename : filenames) {
    if (ParseFileName(filename, &number, &type)) {
      bool keep = true;
      switch (type) {
        case kLogFile:
          keep = ((number >= versions_->LogNumber()) ||
                  (number == versions_->PrevLogNumber()));
          break;
        case kDescriptorFile:
          // Keep my manifest file, and any newer incarnations'
          // (in case there is a race that allows other incarnations)
          keep = (number >= versions_->ManifestFileNumber());
          break;
        case kTableFile:
          keep = (live.find(number) != live.end());
          break;
        case kTempFile:
          // Any temp files that are currently being written to must
          // be recorded in pending_outputs_, which is inserted into "live"
          keep = (live.find(number) != live.end());
          break;
        case kCurrentFile:
        case kDBLockFile:
        case kInfoLogFile:
          keep = true;
          break;
      }

      if (!keep) {
        files_to_delete.push_back(std::move(filename));
        if (type == kTableFile) {
          table_cache_->Evict(number);
        }
        Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
            static_cast<unsigned long long>(number));
      }
    }
  }

  // While deleting all files unblock other threads. All files being deleted
  // have unique names which will not collide with newly created files and
  // are therefore safe to delete while allowing other threads to proceed.
  mutex_.Unlock();
  for (const std::string& filename : files_to_delete) {
    env_->RemoveFile(dbname_ + "/" + filename);
  }
  mutex_.Lock();
}

Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  mutex_.AssertHeld();

  // Ignore error from CreateDir since the creation of the DB is
  // committed only when the descriptor is created, and this directory
  // may already exist from a previous failed creation attempt.
  env_->CreateDir(dbname_);
  assert(db_lock_ == nullptr);
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
  if (!s.ok()) {
    return s;
  }

  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      Log(options_.info_log, "Creating DB %s since it was missing.",
          dbname_.c_str());
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    } else {
      return Status::InvalidArgument(
          dbname_, "does not exist (create_if_missing is false)");
    }
  } else {
    if (options_.error_if_exists) {
      return Status::InvalidArgument(dbname_,
                                     "exists (error_if_exists is true)");
    }
  }

  s = versions_->Recover(save_manifest);
  if (!s.ok()) {
    return s;
  }
  SequenceNumber max_sequence(0);

  // Recover from all newer log files than the ones named in the
  // descriptor (new log files may have been added by the previous
  // incarnation without registering them in the descriptor).
  //
  // Note that PrevLogNumber() is no longer used, but we pay
  // attention to it in case we are recovering a database
  // produced by an older version of leveldb.
  const uint64_t min_log = versions_->LogNumber();
  const uint64_t prev_log = versions_->PrevLogNumber();
  std::vector<std::string> filenames;
  s = env_->GetChildren(dbname_, &filenames);
  if (!s.ok()) {
    return s;
  }
  std::set<uint64_t> expected;
  versions_->AddLiveFiles(&expected);
  uint64_t number;
  FileType type;
  std::vector<uint64_t> logs;
  for (size_t i = 0; i < filenames.size(); i++) {
    if (ParseFileName(filenames[i], &number, &type)) {
      expected.erase(number);
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
        logs.push_back(number);
    }
  }
  if (!expected.empty()) {
    char buf[50];
    std::snprintf(buf, sizeof(buf), "%d missing files; e.g.",
                  static_cast<int>(expected.size()));
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
  }

  // Recover in the order in which the logs were generated
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
  359. // records after allocating this log number. So we manually
  360. // update the file number allocation counter in VersionSet.
  361. versions_->MarkFileNumberUsed(logs[i]);
  362. }
  363. if (versions_->LastSequence() < max_sequence) {
  364. versions_->SetLastSequence(max_sequence);
  365. }
  366. return Status::OK();
  367. }
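// Illustrative sketch, not part of db_impl.cc: the log-selection step of
// Recover() above, pulled out as a standalone function. SelectLogsToRecover
// is a hypothetical name; it assumes the caller has already parsed the
// directory listing down to the numbers of the log files it contains.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper: returns the log numbers that must be replayed,
// ordered by the time the logs were generated (ascending file number).
std::vector<uint64_t> SelectLogsToRecover(
    const std::vector<uint64_t>& log_numbers_on_disk, uint64_t min_log,
    uint64_t prev_log) {
  std::vector<uint64_t> logs;
  for (uint64_t number : log_numbers_on_disk) {
    // Keep logs at or after the one named in the descriptor, plus the
    // previous log recorded by databases from older versions.
    if (number >= min_log || number == prev_log) {
      logs.push_back(number);
    }
  }
  std::sort(logs.begin(), logs.end());
  return logs;
}
```

// With min_log = 7 and prev_log = 3, logs {9, 3, 7, 5} are replayed in the
// order 3, 7, 9; log 5 is older than the descriptor and is skipped.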
  368. Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
  369. bool* save_manifest, VersionEdit* edit,
  370. SequenceNumber* max_sequence) {
  371. struct LogReporter : public log::Reader::Reporter {
  372. Env* env;
  373. Logger* info_log;
  374. const char* fname;
  375. Status* status; // null if options_.paranoid_checks==false
  376. void Corruption(size_t bytes, const Status& s) override {
  377. Log(info_log, "%s%s: dropping %d bytes; %s",
  378. (this->status == nullptr ? "(ignoring error) " : ""), fname,
  379. static_cast<int>(bytes), s.ToString().c_str());
  380. if (this->status != nullptr && this->status->ok()) *this->status = s;
  381. }
  382. };
  383. mutex_.AssertHeld();
  384. // Open the log file
  385. std::string fname = LogFileName(dbname_, log_number);
  386. SequentialFile* file;
  387. Status status = env_->NewSequentialFile(fname, &file);
  388. if (!status.ok()) {
  389. MaybeIgnoreError(&status);
  390. return status;
  391. }
  392. // Create the log reader.
  393. LogReporter reporter;
  394. reporter.env = env_;
  395. reporter.info_log = options_.info_log;
  396. reporter.fname = fname.c_str();
  397. reporter.status = (options_.paranoid_checks ? &status : nullptr);
  398. // We intentionally make log::Reader do checksumming even if
  399. // paranoid_checks==false so that corruptions cause entire commits
  400. // to be skipped instead of propagating bad information (like overly
  401. // large sequence numbers).
  402. log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
  403. Log(options_.info_log, "Recovering log #%llu",
  404. (unsigned long long)log_number);
  405. // Read all the records and add to a memtable
  406. std::string scratch;
  407. Slice record;
  408. WriteBatch batch;
  409. int compactions = 0;
  410. MemTable* mem = nullptr;
  411. while (reader.ReadRecord(&record, &scratch) && status.ok()) {
  412. if (record.size() < 12) {
  413. reporter.Corruption(record.size(),
  414. Status::Corruption("log record too small"));
  415. continue;
  416. }
  417. WriteBatchInternal::SetContents(&batch, record);
  418. if (mem == nullptr) {
  419. mem = new MemTable(internal_comparator_);
  420. mem->Ref();
  421. }
  422. status = WriteBatchInternal::InsertInto(&batch, mem);
  423. MaybeIgnoreError(&status);
  424. if (!status.ok()) {
  425. break;
  426. }
  427. const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
  428. WriteBatchInternal::Count(&batch) - 1;
  429. if (last_seq > *max_sequence) {
  430. *max_sequence = last_seq;
  431. }
  432. if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
  433. compactions++;
  434. *save_manifest = true;
  435. status = WriteLevel0Table(mem, edit, nullptr);
  436. mem->Unref();
  437. mem = nullptr;
  438. if (!status.ok()) {
  439. // Reflect errors immediately so that conditions like full
  440. // file-systems cause the DB::Open() to fail.
  441. break;
  442. }
  443. }
  444. }
  445. delete file;
  446. // See if we should keep reusing the last log file.
  447. if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
  448. assert(logfile_ == nullptr);
  449. assert(log_ == nullptr);
  450. assert(mem_ == nullptr);
  451. uint64_t lfile_size;
  452. if (env_->GetFileSize(fname, &lfile_size).ok() &&
  453. env_->NewAppendableFile(fname, &logfile_).ok()) {
  454. Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
  455. log_ = new log::Writer(logfile_, lfile_size);
  456. logfile_number_ = log_number;
  457. if (mem != nullptr) {
  458. mem_ = mem;
  459. mem = nullptr;
  460. } else {
  461. // mem can be nullptr if lognum exists but was empty.
  462. mem_ = new MemTable(internal_comparator_);
  463. mem_->Ref();
  464. }
  465. }
  466. }
  467. if (mem != nullptr) {
  468. // mem did not get reused; compact it.
  469. if (status.ok()) {
  470. *save_manifest = true;
  471. status = WriteLevel0Table(mem, edit, nullptr);
  472. }
  473. mem->Unref();
  474. }
  475. return status;
  476. }
  477. Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
  478. Version* base) {
  479. mutex_.AssertHeld();
  480. const uint64_t start_micros = env_->NowMicros();
  481. FileMetaData meta;
  482. meta.number = versions_->NewFileNumber();
  483. pending_outputs_.insert(meta.number);
  484. Iterator* iter = mem->NewIterator();
  485. Log(options_.info_log, "Level-0 table #%llu: started",
  486. (unsigned long long)meta.number);
  487. Status s;
  488. {
  489. mutex_.Unlock();
  490. s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
  491. mutex_.Lock();
  492. }
  493. Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
  494. (unsigned long long)meta.number, (unsigned long long)meta.file_size,
  495. s.ToString().c_str());
  496. delete iter;
  497. pending_outputs_.erase(meta.number);
  498. // Note that if file_size is zero, the file has been deleted and
  499. // should not be added to the manifest.
  500. int level = 0;
  501. if (s.ok() && meta.file_size > 0) {
  502. const Slice min_user_key = meta.smallest.user_key();
  503. const Slice max_user_key = meta.largest.user_key();
  504. if (base != nullptr) {
  505. level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
  506. }
  507. edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
  508. meta.largest);
  509. }
  510. CompactionStats stats;
  511. stats.micros = env_->NowMicros() - start_micros;
  512. stats.bytes_written = meta.file_size;
  513. stats_[level].Add(stats);
  514. return s;
  515. }
  516. void DBImpl::CompactMemTable() {
  517. mutex_.AssertHeld();
  518. assert(imm_ != nullptr);
  519. // Save the contents of the memtable as a new Table
  520. VersionEdit edit;
  521. Version* base = versions_->current();
  522. base->Ref();
  523. Status s = WriteLevel0Table(imm_, &edit, base);
  524. base->Unref();
  525. if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
  526. s = Status::IOError("Deleting DB during memtable compaction");
  527. }
  528. // Replace immutable memtable with the generated Table
  529. if (s.ok()) {
  530. edit.SetPrevLogNumber(0);
  531. edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed
  532. s = versions_->LogAndApply(&edit, &mutex_);
  533. }
  534. if (s.ok()) {
  535. // Commit to the new state
  536. imm_->Unref();
  537. imm_ = nullptr;
  538. has_imm_.store(false, std::memory_order_release);
  539. RemoveObsoleteFiles();
  540. } else {
  541. RecordBackgroundError(s);
  542. }
  543. }
  544. void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
  545. int max_level_with_files = 1;
  546. {
  547. MutexLock l(&mutex_);
  548. Version* base = versions_->current();
  549. for (int level = 1; level < config::kNumLevels; level++) {
  550. if (base->OverlapInLevel(level, begin, end)) {
  551. max_level_with_files = level;
  552. }
  553. }
  554. }
  555. TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
  556. for (int level = 0; level <= max_level_with_files; level++) {
  557. TEST_CompactRange(level, begin, end);
  558. }
  559. }
  560. void DBImpl::TEST_CompactRange(int level, const Slice* begin,
  561. const Slice* end) {
  562. assert(level >= 0);
  563. assert(level + 1 < config::kNumLevels);
  564. InternalKey begin_storage, end_storage;
  565. ManualCompaction manual;
  566. manual.level = level;
  567. manual.done = false;
  568. if (begin == nullptr) {
  569. manual.begin = nullptr;
  570. } else {
  571. begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
  572. manual.begin = &begin_storage;
  573. }
  574. if (end == nullptr) {
  575. manual.end = nullptr;
  576. } else {
  577. end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
  578. manual.end = &end_storage;
  579. }
  580. MutexLock l(&mutex_);
  581. while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
  582. bg_error_.ok()) {
  583. if (manual_compaction_ == nullptr) { // Idle
  584. manual_compaction_ = &manual;
  585. MaybeScheduleCompaction();
  586. } else { // Running either my compaction or another compaction.
  587. background_work_finished_signal_.Wait();
  588. }
  589. }
  590. // Finish current background compaction in the case where
  591. // `background_work_finished_signal_` was signalled due to an error.
  592. while (background_compaction_scheduled_) {
  593. background_work_finished_signal_.Wait();
  594. }
  595. if (manual_compaction_ == &manual) {
  596. // Cancel my manual compaction since we aborted early for some reason.
  597. manual_compaction_ = nullptr;
  598. }
  599. }
  600. Status DBImpl::TEST_CompactMemTable() {
  601. // nullptr batch means just wait for earlier writes to be done
  602. Status s = Write(WriteOptions(), nullptr);
  603. if (s.ok()) {
  604. // Wait until the compaction completes
  605. MutexLock l(&mutex_);
  606. while (imm_ != nullptr && bg_error_.ok()) {
  607. background_work_finished_signal_.Wait();
  608. }
  609. if (imm_ != nullptr) {
  610. s = bg_error_;
  611. }
  612. }
  613. return s;
  614. }
  615. void DBImpl::RecordBackgroundError(const Status& s) {
  616. mutex_.AssertHeld();
  617. if (bg_error_.ok()) {
  618. bg_error_ = s;
  619. background_work_finished_signal_.SignalAll();
  620. }
  621. }
  622. void DBImpl::MaybeScheduleCompaction() {
  623. mutex_.AssertHeld();
  624. if (background_compaction_scheduled_) {
  625. // Already scheduled
  626. } else if (shutting_down_.load(std::memory_order_acquire)) {
  627. // DB is being deleted; no more background compactions
  628. } else if (!bg_error_.ok()) {
  629. // Already got an error; no more changes
  630. } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
  631. !versions_->NeedsCompaction()) {
  632. // No work to be done
  633. } else {
  634. background_compaction_scheduled_ = true;
  635. env_->Schedule(&DBImpl::BGWork, this);
  636. }
  637. }
  638. void DBImpl::BGWork(void* db) {
  639. reinterpret_cast<DBImpl*>(db)->BackgroundCall();
  640. }
  641. void DBImpl::BackgroundCall() {
  642. MutexLock l(&mutex_);
  643. assert(background_compaction_scheduled_);
  644. if (shutting_down_.load(std::memory_order_acquire)) {
  645. // No more background work when shutting down.
  646. } else if (!bg_error_.ok()) {
  647. // No more background work after a background error.
  648. } else {
  649. BackgroundCompaction();
  650. }
  651. background_compaction_scheduled_ = false;
  652. // Previous compaction may have produced too many files in a level,
  653. // so reschedule another compaction if needed.
  654. MaybeScheduleCompaction();
  655. background_work_finished_signal_.SignalAll();
  656. }
  657. void DBImpl::BackgroundCompaction() {
  658. mutex_.AssertHeld();
  659. if (imm_ != nullptr) {
  660. CompactMemTable();
  661. return;
  662. }
  663. Compaction* c;
  664. bool is_manual = (manual_compaction_ != nullptr);
  665. InternalKey manual_end;
  666. if (is_manual) {
  667. ManualCompaction* m = manual_compaction_;
  668. c = versions_->CompactRange(m->level, m->begin, m->end);
  669. m->done = (c == nullptr);
  670. if (c != nullptr) {
  671. manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
  672. }
  673. Log(options_.info_log,
  674. "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
  675. m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
  676. (m->end ? m->end->DebugString().c_str() : "(end)"),
  677. (m->done ? "(end)" : manual_end.DebugString().c_str()));
  678. } else {
  679. c = versions_->PickCompaction();
  680. }
  681. Status status;
  682. if (c == nullptr) {
  683. // Nothing to do
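  } else if (!is_manual && c->IsTrivialMove() && false) {
    // TTL: the trailing `false` deliberately disables this trivial-move
    // path, apparently so that every compaction goes through
    // DoCompactionWork(), where expired entries can be dropped.
    // Move file to next level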
  686. assert(c->num_input_files(0) == 1);
  687. FileMetaData* f = c->input(0, 0);
  688. c->edit()->RemoveFile(c->level(), f->number);
  689. c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
  690. f->largest);
  691. status = versions_->LogAndApply(c->edit(), &mutex_);
  692. if (!status.ok()) {
  693. RecordBackgroundError(status);
  694. }
  695. VersionSet::LevelSummaryStorage tmp;
  696. Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
  697. static_cast<unsigned long long>(f->number), c->level() + 1,
  698. static_cast<unsigned long long>(f->file_size),
  699. status.ToString().c_str(), versions_->LevelSummary(&tmp));
  700. } else {
  701. CompactionState* compact = new CompactionState(c);
  702. status = DoCompactionWork(compact);
  703. if (!status.ok()) {
  704. RecordBackgroundError(status);
  705. }
  706. CleanupCompaction(compact);
  707. c->ReleaseInputs();
  708. RemoveObsoleteFiles();
  709. }
  710. delete c;
  711. if (status.ok()) {
  712. // Done
  713. } else if (shutting_down_.load(std::memory_order_acquire)) {
  714. // Ignore compaction errors found during shutting down
  715. } else {
  716. Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
  717. }
  718. if (is_manual) {
  719. ManualCompaction* m = manual_compaction_;
  720. if (!status.ok()) {
  721. m->done = true;
  722. }
  723. if (!m->done) {
  724. // We only compacted part of the requested range. Update *m
  725. // to the range that is left to be compacted.
  726. m->tmp_storage = manual_end;
  727. m->begin = &m->tmp_storage;
  728. }
  729. manual_compaction_ = nullptr;
  730. }
  731. }
  732. void DBImpl::CleanupCompaction(CompactionState* compact) {
  733. mutex_.AssertHeld();
  734. if (compact->builder != nullptr) {
  735. // May happen if we get a shutdown call in the middle of compaction
  736. compact->builder->Abandon();
  737. delete compact->builder;
  738. } else {
  739. assert(compact->outfile == nullptr);
  740. }
  741. delete compact->outfile;
  742. for (size_t i = 0; i < compact->outputs.size(); i++) {
  743. const CompactionState::Output& out = compact->outputs[i];
  744. pending_outputs_.erase(out.number);
  745. }
  746. delete compact;
  747. }
  748. Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
  749. assert(compact != nullptr);
  750. assert(compact->builder == nullptr);
  751. uint64_t file_number;
  752. {
  753. mutex_.Lock();
  754. file_number = versions_->NewFileNumber();
  755. pending_outputs_.insert(file_number);
  756. CompactionState::Output out;
  757. out.number = file_number;
  758. out.smallest.Clear();
  759. out.largest.Clear();
  760. compact->outputs.push_back(out);
  761. mutex_.Unlock();
  762. }
  763. // Make the output file
  764. std::string fname = TableFileName(dbname_, file_number);
  765. Status s = env_->NewWritableFile(fname, &compact->outfile);
  766. if (s.ok()) {
  767. compact->builder = new TableBuilder(options_, compact->outfile);
  768. }
  769. return s;
  770. }
  771. Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
  772. Iterator* input) {
  773. assert(compact != nullptr);
  774. assert(compact->outfile != nullptr);
  775. assert(compact->builder != nullptr);
  776. const uint64_t output_number = compact->current_output()->number;
  777. assert(output_number != 0);
  778. // Check for iterator errors
  779. Status s = input->status();
  780. const uint64_t current_entries = compact->builder->NumEntries();
  781. if (s.ok()) {
  782. s = compact->builder->Finish();
  783. } else {
  784. compact->builder->Abandon();
  785. }
  786. const uint64_t current_bytes = compact->builder->FileSize();
  787. compact->current_output()->file_size = current_bytes;
  788. compact->total_bytes += current_bytes;
  789. delete compact->builder;
  790. compact->builder = nullptr;
  791. // Finish and check for file errors
  792. if (s.ok()) {
  793. s = compact->outfile->Sync();
  794. }
  795. if (s.ok()) {
  796. s = compact->outfile->Close();
  797. }
  798. delete compact->outfile;
  799. compact->outfile = nullptr;
  800. if (s.ok() && current_entries > 0) {
  801. // Verify that the table is usable
  802. Iterator* iter =
  803. table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
  804. s = iter->status();
  805. delete iter;
  806. if (s.ok()) {
  807. Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
  808. (unsigned long long)output_number, compact->compaction->level(),
  809. (unsigned long long)current_entries,
  810. (unsigned long long)current_bytes);
  811. }
  812. }
  813. return s;
  814. }
  815. Status DBImpl::InstallCompactionResults(CompactionState* compact) {
  816. mutex_.AssertHeld();
  817. Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
  818. compact->compaction->num_input_files(0), compact->compaction->level(),
  819. compact->compaction->num_input_files(1), compact->compaction->level() + 1,
  820. static_cast<long long>(compact->total_bytes));
  821. // Add compaction outputs
  822. compact->compaction->AddInputDeletions(compact->compaction->edit());
  823. const int level = compact->compaction->level();
  824. for (size_t i = 0; i < compact->outputs.size(); i++) {
  825. const CompactionState::Output& out = compact->outputs[i];
  826. compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
  827. out.smallest, out.largest);
  828. }
  829. return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
  830. }
  831. Status DBImpl::DoCompactionWork(CompactionState* compact) {
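  std::cout << "start compact" << std::endl;
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  // TTL ToDo
  // Debugging aid: the user key to watch for during compaction.
  const Slice target_key("10000");
  int dropped_keys_count = 0;  // Entries dropped because their TTL expired
  int total_keys_count = 0;    // Entries visited by the compaction loop

  // List the input files taken from the level being compacted (inputs[0]);
  // this is not necessarily level 0.
  std::cout << "Level " << compact->compaction->level() << " inputs: ";
  for (int i = 0; i < compact->compaction->num_input_files(0); i++) {
    const FileMetaData* f = compact->compaction->input(0, i);
    std::cout << f->number << " ";
  }
  std::cout << std::endl;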
  845. Log(options_.info_log, "Compacting %d@%d + %d@%d files",
  846. compact->compaction->num_input_files(0), compact->compaction->level(),
  847. compact->compaction->num_input_files(1),
  848. compact->compaction->level() + 1);
  849. assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  850. assert(compact->builder == nullptr);
  851. assert(compact->outfile == nullptr);
  852. //TTL ToDo
  853. // {
  854. // //MutexLock l(&mutex_);
  855. // if (has_imm_.load(std::memory_order_relaxed)) {
  856. // CompactMemTable();
  857. // background_work_finished_signal_.SignalAll();
  858. // }
  859. // }
  860. //finish modify
  861. if (snapshots_.empty()) {
  862. compact->smallest_snapshot = versions_->LastSequence();
  863. } else {
  864. compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  865. }
  866. Iterator* input = versions_->MakeInputIterator(compact->compaction);
  867. // Release mutex while we're actually doing the compaction work
  868. mutex_.Unlock();
  869. input->SeekToFirst();
  // key() may only be called on a valid iterator, so guard the debug output.
  if (input->Valid()) {
    std::cout << "Compaction first key: " << input->key().ToString()
              << std::endl;
  }
  871. Status status;
  872. ParsedInternalKey ikey;
  873. std::string current_user_key;
  874. bool has_current_user_key = false;
  875. SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  876. while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
  877. // Prioritize immutable compaction work
  878. if (has_imm_.load(std::memory_order_relaxed)) {
  879. const uint64_t imm_start = env_->NowMicros();
  880. mutex_.Lock();
  881. if (imm_ != nullptr) {
  882. CompactMemTable();
  883. // Wake up MakeRoomForWrite() if necessary.
  884. background_work_finished_signal_.SignalAll();
  885. }
  886. mutex_.Unlock();
  887. imm_micros += (env_->NowMicros() - imm_start);
  888. }
  889. Slice key = input->key();
  890. if (compact->compaction->ShouldStopBefore(key) &&
  891. compact->builder != nullptr) {
  892. status = FinishCompactionOutputFile(compact, input);
  893. if (!status.ok()) {
  894. break;
  895. }
  896. }
  897. // Handle key/value, add to state, etc.
  898. bool drop = false;
  899. if (!ParseInternalKey(key, &ikey)) {
  900. // Do not hide error keys
  901. current_user_key.clear();
  902. has_current_user_key = false;
  903. last_sequence_for_key = kMaxSequenceNumber;
  904. } else {
  905. if (!has_current_user_key ||
  906. user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
  907. 0) {
  908. // First occurrence of this user key
  909. current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
  910. has_current_user_key = true;
  911. last_sequence_for_key = kMaxSequenceNumber;
  912. }
  913. if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for same user key
  915. drop = true; // (A)
  916. } else if (ikey.type == kTypeDeletion &&
  917. ikey.sequence <= compact->smallest_snapshot &&
  918. compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
  919. // For this user key:
  920. // (1) there is no data in higher levels
  921. // (2) data in lower levels will have larger sequence numbers
  922. // (3) data in layers that are being compacted here and have
  923. // smaller sequence numbers will be dropped in the next
  924. // few iterations of this loop (by rule (A) above).
  925. // Therefore this deletion marker is obsolete and can be dropped.
  926. drop = true;
  927. }
      // TTL ToDo: add expiration time check
      // Debugging aid: report when the watched user key is seen. Compare
      // the parsed user key, because `key` is an internal key that also
      // carries the sequence number and value type and would never match.
      if (ikey.user_key == target_key) {
        Log(options_.info_log, "Found target key during compaction: %s\n",
            ikey.user_key.ToString().c_str());
      }

      // Values are assumed to start with a fixed 64-bit expiration
      // timestamp in seconds. Shorter values (such as the empty value of a
      // deletion marker) carry no TTL and are never treated as expired.
      Slice value = input->value();
      if (value.size() >= sizeof(uint64_t)) {
        const uint64_t expiration_time = DecodeFixed64(value.data());
        const uint64_t current_time = env_->NowMicros() / 1000000;
        if (current_time > expiration_time) {
          drop = true;  // Expired key/value pair: mark it as dropped
          dropped_keys_count++;
        }
      }

      // Alternative that only checks entries not already marked as dropped:
      // if (!drop) {
      //   Slice value = input->value();
      //   if (value.size() >= sizeof(uint64_t)) {
      //     const char* ptr = value.data();
      //     uint64_t expiration_time = DecodeFixed64(ptr);
      //     uint64_t current_time = env_->NowMicros() / 1000000;
      //     if (current_time > expiration_time) {
      //       drop = true;  // Expired key/value pair: mark it as dropped
      //     }
      //   }
      // }

      last_sequence_for_key = ikey.sequence;
    }

    // The iterator must not be repositioned inside this loop: calling
    // SeekToLast() here would invalidate `key` and end the compaction after
    // a single entry, discarding the rest of the input.
    total_keys_count++;
  965. #if 0
  966. Log(options_.info_log,
  967. " Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
  968. "%d smallest_snapshot: %d",
  969. ikey.user_key.ToString().c_str(),
  970. (int)ikey.sequence, ikey.type, kTypeValue, drop,
  971. compact->compaction->IsBaseLevelForKey(ikey.user_key),
  972. (int)last_sequence_for_key, (int)compact->smallest_snapshot);
  973. #endif
  974. if (!drop) {
  975. // Open output file if necessary
  976. if (compact->builder == nullptr) {
  977. status = OpenCompactionOutputFile(compact);
  978. if (!status.ok()) {
  979. break;
  980. }
  981. }
  982. if (compact->builder->NumEntries() == 0) {
  983. compact->current_output()->smallest.DecodeFrom(key);
  984. }
  985. compact->current_output()->largest.DecodeFrom(key);
  986. compact->builder->Add(key, input->value());
  987. // Close output file if it is big enough
  988. if (compact->builder->FileSize() >=
  989. compact->compaction->MaxOutputFileSize()) {
  990. status = FinishCompactionOutputFile(compact, input);
  991. if (!status.ok()) {
  992. break;
  993. }
  994. }
  995. }
  996. input->Next();
  997. }
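  std::cout << "Total dropped keys in compaction: " << dropped_keys_count
            << ", Total: " << total_keys_count << "\n";
  Log(options_.info_log, "Total dropped keys in compaction: %d of %d\n",
      dropped_keys_count, total_keys_count);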
  1000. if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
  1001. status = Status::IOError("Deleting DB during compaction");
  1002. }
  1003. if (status.ok() && compact->builder != nullptr) {
  1004. status = FinishCompactionOutputFile(compact, input);
  1005. }
  1006. if (status.ok()) {
  1007. status = input->status();
  1008. }
  1009. delete input;
  1010. input = nullptr;
  1011. CompactionStats stats;
  1012. stats.micros = env_->NowMicros() - start_micros - imm_micros;
  1013. for (int which = 0; which < 2; which++) {
  1014. for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
  1015. stats.bytes_read += compact->compaction->input(which, i)->file_size;
  1016. }
  1017. }
  1018. for (size_t i = 0; i < compact->outputs.size(); i++) {
  1019. stats.bytes_written += compact->outputs[i].file_size;
  1020. }
  1021. mutex_.Lock();
  1022. stats_[compact->compaction->level() + 1].Add(stats);
  1023. if (status.ok()) {
  1024. status = InstallCompactionResults(compact);
  1025. }
  1026. if (!status.ok()) {
  1027. RecordBackgroundError(status);
  1028. }
  1029. VersionSet::LevelSummaryStorage tmp;
  1030. Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
  1031. return status;
  1032. }
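// Illustrative sketch, not part of db_impl.cc: rule (A) from the loop in
// DoCompactionWork(), which drops an entry once a newer entry for the same
// user key is already visible to the oldest snapshot. Entry and
// MarkHiddenEntries are hypothetical names, and UINT64_MAX stands in for
// kMaxSequenceNumber. The input is assumed to be sorted the way the
// compaction iterator yields it: user key ascending, then sequence
// number descending.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

struct Entry {
  std::string user_key;
  uint64_t sequence;
};

// Returns one flag per entry; true means the entry is hidden by a newer
// entry for the same user key and can be dropped.
std::vector<bool> MarkHiddenEntries(const std::vector<Entry>& entries,
                                    uint64_t smallest_snapshot) {
  std::vector<bool> drop(entries.size(), false);
  std::string current_user_key;
  bool has_current_user_key = false;
  uint64_t last_sequence_for_key = UINT64_MAX;
  for (size_t i = 0; i < entries.size(); i++) {
    const Entry& e = entries[i];
    if (!has_current_user_key || e.user_key != current_user_key) {
      // First occurrence of this user key: nothing newer has been seen yet.
      current_user_key = e.user_key;
      has_current_user_key = true;
      last_sequence_for_key = UINT64_MAX;
    }
    if (last_sequence_for_key <= smallest_snapshot) {
      drop[i] = true;  // A newer entry is visible to every live snapshot.
    }
    last_sequence_for_key = e.sequence;
  }
  return drop;
}
```

// With smallest_snapshot = 6 and entries a@9, a@5, a@2, b@4, only a@2 is
// dropped: a@5 is the newest entry the snapshot can see, so it must stay.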
  1033. namespace {
  1034. struct IterState {
  1035. port::Mutex* const mu;
  1036. Version* const version GUARDED_BY(mu);
  1037. MemTable* const mem GUARDED_BY(mu);
  1038. MemTable* const imm GUARDED_BY(mu);
  1039. IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
  1040. : mu(mutex), version(version), mem(mem), imm(imm) {}
  1041. };
  1042. static void CleanupIteratorState(void* arg1, void* arg2) {
  1043. IterState* state = reinterpret_cast<IterState*>(arg1);
  1044. state->mu->Lock();
  1045. state->mem->Unref();
  1046. if (state->imm != nullptr) state->imm->Unref();
  1047. state->version->Unref();
  1048. state->mu->Unlock();
  1049. delete state;
  1050. }
  1051. } // anonymous namespace
  1052. Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
  1053. SequenceNumber* latest_snapshot,
  1054. uint32_t* seed) {
  1055. mutex_.Lock();
  1056. *latest_snapshot = versions_->LastSequence();
  1057. // Collect together all needed child iterators
  1058. std::vector<Iterator*> list;
  1059. list.push_back(mem_->NewIterator());
  1060. mem_->Ref();
  1061. if (imm_ != nullptr) {
  1062. list.push_back(imm_->NewIterator());
  1063. imm_->Ref();
  1064. }
  1065. versions_->current()->AddIterators(options, &list);
  1066. Iterator* internal_iter =
  1067. NewMergingIterator(&internal_comparator_, &list[0], list.size());
  1068. versions_->current()->Ref();
  1069. IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
  1070. internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
  1071. *seed = ++seed_;
  1072. mutex_.Unlock();
  1073. return internal_iter;
  1074. }
  1075. Iterator* DBImpl::TEST_NewInternalIterator() {
  1076. SequenceNumber ignored;
  1077. uint32_t ignored_seed;
  1078. return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
  1079. }
  1080. int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
  1081. MutexLock l(&mutex_);
  1082. return versions_->MaxNextLevelOverlappingBytes();
  1083. }
  1084. Status DBImpl::Get(const ReadOptions& options, const Slice& key,
  1085. std::string* value) {
  1086. Status s;
  1087. MutexLock l(&mutex_);
  1088. SequenceNumber snapshot;
  1089. if (options.snapshot != nullptr) {
  1090. snapshot =
  1091. static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  1092. } else {
  1093. snapshot = versions_->LastSequence();
  1094. }
  1095. MemTable* mem = mem_;
  1096. MemTable* imm = imm_;
  1097. Version* current = versions_->current();
  1098. mem->Ref();
  1099. if (imm != nullptr) imm->Ref();
  1100. current->Ref();
  1101. bool have_stat_update = false;
  1102. Version::GetStats stats;
  1103. // Unlock while reading from files and memtables
  1104. {
  1105. mutex_.Unlock();
  1106. // First look in the memtable, then in the immutable memtable (if any).
  1107. LookupKey lkey(key, snapshot);
  1108. if (mem->Get(lkey, value, &s)) {
  1109. // Done
  1110. } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
  1111. // Done
  1112. } else {
  1113. s = current->Get(options, lkey, value, &stats);
  1114. have_stat_update = true;
  1115. }
  1116. mutex_.Lock();
  1117. }
  // TTL ToDo: add check for TTL
  // If a value was found in the memtable, the immutable memtable, or an
  // SSTable, check its TTL before returning it.
  if (s.ok()) {
    // Parse the expiration timestamp from the stored value (assumed
    // layout: [expiration timestamp][actual value]).
    uint64_t expiration_time = ParseExpirationTime(*value);
    uint64_t current_time = GetCurrentTime();
    if (current_time >= expiration_time) {
      // The current time has reached the expiration time: treat the entry
      // as expired and report NotFound.
      s = Status::NotFound(Slice());
    } else {
      // Not expired: strip the timestamp and return the actual value.
      *value = ParseActualValue(*value);
    }
  }
  // finish modify
  1133. if (have_stat_update && current->UpdateStats(stats)) {
  1134. MaybeScheduleCompaction();
  1135. }
  1136. mem->Unref();
  1137. if (imm != nullptr) imm->Unref();
  1138. current->Unref();
  1139. return s;
  1140. }
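// Illustrative sketch, not part of db_impl.cc: one way the read-side TTL
// check in Get() could decode a stored value. It assumes the layout
// [8-byte little-endian expiration time in seconds][actual value], which
// matches the DecodeFixed64() call in DoCompactionWork(). The helpers
// below are hypothetical stand-ins; the real ParseExpirationTime() and
// ParseActualValue() are defined elsewhere and may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Decodes the leading expiration timestamp. Returns false if the stored
// value is too short to contain one.
bool DecodeExpirationSeconds(const std::string& stored, uint64_t* expiration) {
  if (stored.size() < sizeof(uint64_t)) return false;
  uint64_t result = 0;
  for (int i = 7; i >= 0; --i) {
    result = (result << 8) | static_cast<unsigned char>(stored[i]);
  }
  *expiration = result;
  return true;
}

// Returns the bytes after the timestamp, or an empty string if there is
// no room for a timestamp at all.
std::string PayloadOf(const std::string& stored) {
  if (stored.size() < sizeof(uint64_t)) return std::string();
  return stored.substr(sizeof(uint64_t));
}

// Same comparison as Get(): an entry expires once the clock reaches its
// expiration time.
bool IsExpiredForRead(uint64_t now_seconds, uint64_t expiration_seconds) {
  return now_seconds >= expiration_seconds;
}
```

// Note that Get() uses `>=` while the compaction loop uses `>`, so an entry
// read at exactly its expiration second is hidden from readers slightly
// before compaction would drop it.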
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  uint32_t seed;
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
  return NewDBIterator(this, user_comparator(), iter,
                       (options.snapshot != nullptr
                            ? static_cast<const SnapshotImpl*>(options.snapshot)
                                  ->sequence_number()
                            : latest_snapshot),
                       seed);
}
void DBImpl::RecordReadSample(Slice key) {
  MutexLock l(&mutex_);
  if (versions_->current()->RecordReadSample(key)) {
    MaybeScheduleCompaction();
  }
}
const Snapshot* DBImpl::GetSnapshot() {
  MutexLock l(&mutex_);
  return snapshots_.New(versions_->LastSequence());
}
void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
  MutexLock l(&mutex_);
  snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
}
// Convenience methods
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
  return DB::Put(o, key, val);
}
// TTL ToDo: add DBImpl for Put
// New Put overload with TTL support
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val,
                   uint64_t ttl) {
  return DB::Put(o, key, val, ttl);
}
// finish modify
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
  return DB::Delete(options, key);
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;
  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }
  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);
    // Add to log and apply to memtable. We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();
    versions_->SetLastSequence(last_sequence);
  }
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }
  return status;
}
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-null batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  Writer* first = writers_.front();
  WriteBatch* result = first->batch;
  assert(result != nullptr);
  size_t size = WriteBatchInternal::ByteSize(first->batch);
  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = 1 << 20;
  if (size <= (128 << 10)) {
    max_size = size + (128 << 10);
  }
  *last_writer = first;
  std::deque<Writer*>::iterator iter = writers_.begin();
  ++iter;  // Advance past "first"
  for (; iter != writers_.end(); ++iter) {
    Writer* w = *iter;
    if (w->sync && !first->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }
    if (w->batch != nullptr) {
      size += WriteBatchInternal::ByteSize(w->batch);
      if (size > max_size) {
        // Do not make batch too big
        break;
      }
      // Append to *result
      if (result == first->batch) {
        // Switch to temporary batch instead of disturbing caller's batch
        result = tmp_batch_;
        assert(WriteBatchInternal::Count(result) == 0);
        WriteBatchInternal::Append(result, first->batch);
      }
      WriteBatchInternal::Append(result, w->batch);
    }
    *last_writer = w;
  }
  return result;
}
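/*
The size cap in BuildBatchGroup can be read in isolation as a small pure
function. The sketch below is kept inside a comment so it does not affect
the build; it copies the two constants from the code above (1 << 20 and
128 << 10), and the name MaxGroupBytes is hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Mirrors the cap computed in BuildBatchGroup: a group may hold at most
// 1 MiB, but when the leading write is small (<= 128 KiB) the group may
// only grow by another 128 KiB beyond it.
std::size_t MaxGroupBytes(std::size_t first_batch_bytes) {
  std::size_t max_size = 1 << 20;
  if (first_batch_bytes <= (128 << 10)) {
    max_size = first_batch_bytes + (128 << 10);
  }
  return max_size;
}
```

For a 1000-byte leading write this gives 1000 + 131072 bytes, so a small
write is never held up behind a full 1 MiB group.
*/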
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files. Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance. Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      s = logfile_->Close();
      if (!s.ok()) {
        // We may have lost some data written to the previous log file.
        // Switch to the new log file anyway, but record as a background
        // error so we do not attempt any more writes.
        //
        // We could perhaps attempt to save the memtable corresponding
        // to log file and suppress the error if that works, but that
        // would add more complexity in a critical code path.
        RecordBackgroundError(s);
      }
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
  value->clear();
  MutexLock l(&mutex_);
  Slice in = property;
  Slice prefix("leveldb.");
  if (!in.starts_with(prefix)) return false;
  in.remove_prefix(prefix.size());
  if (in.starts_with("num-files-at-level")) {
    in.remove_prefix(strlen("num-files-at-level"));
    uint64_t level;
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
    if (!ok || level >= config::kNumLevels) {
      return false;
    } else {
      char buf[100];
      std::snprintf(buf, sizeof(buf), "%d",
                    versions_->NumLevelFiles(static_cast<int>(level)));
      *value = buf;
      return true;
    }
  } else if (in == "stats") {
    char buf[200];
    std::snprintf(buf, sizeof(buf),
                  " Compactions\n"
                  "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
                  "--------------------------------------------------\n");
    value->append(buf);
    for (int level = 0; level < config::kNumLevels; level++) {
      int files = versions_->NumLevelFiles(level);
      if (stats_[level].micros > 0 || files > 0) {
        std::snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n",
                      level, files, versions_->NumLevelBytes(level) / 1048576.0,
                      stats_[level].micros / 1e6,
                      stats_[level].bytes_read / 1048576.0,
                      stats_[level].bytes_written / 1048576.0);
        value->append(buf);
      }
    }
    return true;
  } else if (in == "sstables") {
    *value = versions_->current()->DebugString();
    return true;
  } else if (in == "approximate-memory-usage") {
    size_t total_usage = options_.block_cache->TotalCharge();
    if (mem_) {
      total_usage += mem_->ApproximateMemoryUsage();
    }
    if (imm_) {
      total_usage += imm_->ApproximateMemoryUsage();
    }
    char buf[50];
    std::snprintf(buf, sizeof(buf), "%llu",
                  static_cast<unsigned long long>(total_usage));
    value->append(buf);
    return true;
  }
  return false;
}
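/*
The "num-files-at-level" branch of GetProperty accepts a name only when the
text after the prefix is a complete decimal number below the level count.
The standalone sketch below illustrates that rule with std::string in place
of Slice; it is kept inside a comment so it does not affect the build. The
name ParseNumFilesAtLevel and the four-digit guard are hypothetical and are
not part of leveldb.

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Parses "leveldb.num-files-at-level<N>". Returns true and stores N in
// *level only when N is a decimal number smaller than num_levels and
// nothing follows it.
bool ParseNumFilesAtLevel(const std::string& property, int num_levels,
                          int* level) {
  assert(level != nullptr);
  const std::string prefix = "leveldb.num-files-at-level";
  if (property.compare(0, prefix.size(), prefix) != 0) return false;
  const std::string digits = property.substr(prefix.size());
  // Sketch-only guard: reject empty or very long numbers to avoid overflow.
  if (digits.empty() || digits.size() > 4) return false;
  int n = 0;
  for (char c : digits) {
    if (!std::isdigit(static_cast<unsigned char>(c))) return false;
    n = n * 10 + (c - '0');
  }
  if (n >= num_levels) return false;
  *level = n;
  return true;
}
```

With seven levels, "leveldb.num-files-at-level3" is accepted, while
"leveldb.num-files-at-level7" and "leveldb.num-files-at-level3x" are
rejected.
*/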
void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
  // TODO(opt): better implementation
  MutexLock l(&mutex_);
  Version* v = versions_->current();
  v->Ref();
  for (int i = 0; i < n; i++) {
    // Convert user_key into a corresponding internal key.
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
    sizes[i] = (limit >= start ? limit - start : 0);
  }
  v->Unref();
}
// Default implementations of convenience methods that subclasses of DB
// can call if they wish
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}
// TTL ToDo: add a func for TTL Put
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value,
               uint64_t ttl) {
  // Get the current time and compute the expiration timestamp.
  // NOTE: the sum wraps around if ttl is close to the uint64_t maximum.
  uint64_t expiration_time = GetCurrentTime() + ttl;
  // Store the expiration timestamp together with the value (the timestamp
  // is assumed to be prepended to the value).
  std::string new_value;
  AppendExpirationTime(&new_value, expiration_time);
  new_value.append(value.data(), value.size());
  // Build a WriteBatch and add the key-value pair to it.
  WriteBatch batch;
  batch.Put(key, new_value);
  // Perform the write.
  return Write(opt, &batch);
}
// finish modify
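/*
A minimal sketch of the write-side encoding used by the TTL Put, kept inside
a comment so it does not affect the build. AppendExpirationTime is not
defined in this file, so the 8-byte little-endian prefix is an assumption,
and the names SaturatingExpiration and EncodeTtlValue are hypothetical. The
sketch also clamps now + ttl instead of letting it wrap, which is a design
choice of the sketch rather than the behavior of the code above.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>

// Returns now + ttl, clamped to the largest uint64_t instead of wrapping.
uint64_t SaturatingExpiration(uint64_t now, uint64_t ttl) {
  const uint64_t max = std::numeric_limits<uint64_t>::max();
  return ttl > max - now ? max : now + ttl;
}

// Builds the stored form under the assumed layout:
// 8-byte little-endian expiration time, then the value bytes.
std::string EncodeTtlValue(uint64_t expiration, const std::string& value) {
  std::string stored;
  stored.reserve(8 + value.size());
  for (int i = 0; i < 8; i++) {
    stored.push_back(static_cast<char>((expiration >> (8 * i)) & 0xff));
  }
  stored.append(value);
  assert(stored.size() == 8 + value.size());
  return stored;
}
```

A fixed-width prefix keeps decoding trivial: the reader can always take the
first 8 bytes as the timestamp and the remainder as the value.
*/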
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
  WriteBatch batch;
  batch.Delete(key);
  return Write(opt, &batch);
}
DB::~DB() = default;
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;
  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->RemoveObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
Snapshot::~Snapshot() = default;
Status DestroyDB(const std::string& dbname, const Options& options) {
  Env* env = options.env;
  std::vector<std::string> filenames;
  Status result = env->GetChildren(dbname, &filenames);
  if (!result.ok()) {
    // Ignore error in case directory does not exist
    return Status::OK();
  }
  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  result = env->LockFile(lockname, &lock);
  if (result.ok()) {
    uint64_t number;
    FileType type;
    for (size_t i = 0; i < filenames.size(); i++) {
      if (ParseFileName(filenames[i], &number, &type) &&
          type != kDBLockFile) {  // Lock file will be deleted at end
        Status del = env->RemoveFile(dbname + "/" + filenames[i]);
        if (result.ok() && !del.ok()) {
          result = del;
        }
      }
    }
    env->UnlockFile(lock);  // Ignore error since state is already gone
    env->RemoveFile(lockname);
    env->RemoveDir(dbname);  // Ignore error in case dir contains other files
  }
  return result;
}
}  // namespace leveldb