LevelDB secondary index implementation, 姚凯文 (kevinyao0901), 姜嘉祺

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <dirent.h>
#include <fcntl.h>
#include <sys/mman.h>
#ifndef __Fuchsia__
#include <sys/resource.h>
#endif
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>  // std::min
#include <atomic>
#include <cassert>  // assert
#include <cerrno>
#include <chrono>  // std::chrono::microseconds
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <limits>
#include <queue>
#include <set>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>  // std::vector

#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "port/port.h"
#include "port/thread_annotations.h"
#include "util/env_posix_test_helper.h"
#include "util/posix_logger.h"

namespace leveldb {

namespace {

// Set by EnvPosixTestHelper::SetReadOnlyFDLimit() and MaxOpenFiles().
int g_open_read_only_file_limit = -1;

// Up to 1000 mmap regions for 64-bit binaries; none for 32-bit.
constexpr const int kDefaultMmapLimit = (sizeof(void*) >= 8) ? 1000 : 0;

// Can be set using EnvPosixTestHelper::SetReadOnlyMMapLimit().
int g_mmap_limit = kDefaultMmapLimit;

// Common flags defined for all posix open operations
#if defined(HAVE_O_CLOEXEC)
constexpr const int kOpenBaseFlags = O_CLOEXEC;
#else
constexpr const int kOpenBaseFlags = 0;
#endif  // defined(HAVE_O_CLOEXEC)

constexpr const size_t kWritableFileBufferSize = 65536;

// Maps an errno value to a Status: ENOENT becomes NotFound, anything else
// becomes IOError.
Status PosixError(const std::string& context, int error_number) {
  if (error_number == ENOENT) {
    return Status::NotFound(context, std::strerror(error_number));
  } else {
    return Status::IOError(context, std::strerror(error_number));
  }
}

// Helper class to limit resource usage to avoid exhaustion.
// Currently used to limit read-only file descriptors and mmap file usage
// so that we do not run out of file descriptors or virtual memory, or run into
// kernel performance problems for very large databases.
class Limiter {
 public:
  // Limit maximum number of resources to |max_acquires|.
  Limiter(int max_acquires)
      :
#if !defined(NDEBUG)
        max_acquires_(max_acquires),
#endif  // !defined(NDEBUG)
        acquires_allowed_(max_acquires) {
    assert(max_acquires >= 0);
  }

  Limiter(const Limiter&) = delete;
  Limiter& operator=(const Limiter&) = delete;

  // If another resource is available, acquire it and return true.
  // Else return false.
  bool Acquire() {
    int old_acquires_allowed =
        acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);

    if (old_acquires_allowed > 0) return true;

    // No resource was available, so undo the decrement above.
    int pre_increment_acquires_allowed =
        acquires_allowed_.fetch_add(1, std::memory_order_relaxed);

    // Silence compiler warnings about unused variables when NDEBUG is defined.
    (void)pre_increment_acquires_allowed;
    // If the check below fails, Release() was called more times than
    // Acquire().
    assert(pre_increment_acquires_allowed < max_acquires_);

    return false;
  }

  // Release a resource acquired by a previous call to Acquire() that returned
  // true.
  void Release() {
    int old_acquires_allowed =
        acquires_allowed_.fetch_add(1, std::memory_order_relaxed);

    // Silence compiler warnings about unused variables when NDEBUG is defined.
    (void)old_acquires_allowed;
    // If the check below fails, Release() was called more times than
    // Acquire().
    assert(old_acquires_allowed < max_acquires_);
  }

 private:
#if !defined(NDEBUG)
  // Catches an excessive number of Release() calls.
  const int max_acquires_;
#endif  // !defined(NDEBUG)

  // The number of available resources.
  //
  // This is a counter and is not tied to the invariants of any other class, so
  // it can be operated on safely using std::memory_order_relaxed.
  std::atomic<int> acquires_allowed_;
};

// Implements sequential read access in a file using read().
//
// Instances of this class are thread-friendly but not thread-safe, as required
// by the SequentialFile API.
class PosixSequentialFile final : public SequentialFile {
 public:
  PosixSequentialFile(std::string filename, int fd)
      : fd_(fd), filename_(std::move(filename)) {}
  ~PosixSequentialFile() override { ::close(fd_); }

  Status Read(size_t n, Slice* result, char* scratch) override {
    Status status;
    while (true) {
      ::ssize_t read_size = ::read(fd_, scratch, n);
      if (read_size < 0) {  // Read error.
        if (errno == EINTR) {
          continue;  // Retry
        }
        status = PosixError(filename_, errno);
        break;
      }
      *result = Slice(scratch, read_size);
      break;
    }
    return status;
  }

  Status Skip(uint64_t n) override {
    if (::lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
      return PosixError(filename_, errno);
    }
    return Status::OK();
  }

 private:
  const int fd_;
  const std::string filename_;
};

// Implements random read access in a file using pread().
//
// Instances of this class are thread-safe, as required by the RandomAccessFile
// API. Instances are immutable and Read() only calls thread-safe library
// functions.
class PosixRandomAccessFile final : public RandomAccessFile {
 public:
  // The new instance takes ownership of |fd|. |fd_limiter| must outlive this
  // instance, and will be used to determine if the file descriptor is kept
  // open for the lifetime of the instance or reopened on every read.
  PosixRandomAccessFile(std::string filename, int fd, Limiter* fd_limiter)
      : has_permanent_fd_(fd_limiter->Acquire()),
        fd_(has_permanent_fd_ ? fd : -1),
        fd_limiter_(fd_limiter),
        filename_(std::move(filename)) {
    if (!has_permanent_fd_) {
      assert(fd_ == -1);
      ::close(fd);  // The file will be opened on every read.
    }
  }

  ~PosixRandomAccessFile() override {
    if (has_permanent_fd_) {
      assert(fd_ != -1);
      ::close(fd_);
      fd_limiter_->Release();
    }
  }

  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    int fd = fd_;
    if (!has_permanent_fd_) {
      fd = ::open(filename_.c_str(), O_RDONLY | kOpenBaseFlags);
      if (fd < 0) {
        return PosixError(filename_, errno);
      }
    }

    assert(fd != -1);

    Status status;
    ssize_t read_size = ::pread(fd, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (read_size < 0) ? 0 : read_size);
    if (read_size < 0) {
      // An error: return a non-ok status.
      status = PosixError(filename_, errno);
    }
    if (!has_permanent_fd_) {
      // Close the temporary file descriptor opened earlier.
      assert(fd != fd_);
      ::close(fd);
    }
    return status;
  }

 private:
  const bool has_permanent_fd_;  // If false, the file is opened on every read.
  const int fd_;                 // -1 if has_permanent_fd_ is false.
  Limiter* const fd_limiter_;
  const std::string filename_;
};

// Implements random read access in a file using mmap().
//
// Instances of this class are thread-safe, as required by the RandomAccessFile
// API. Instances are immutable and Read() only calls thread-safe library
// functions.
class PosixMmapReadableFile final : public RandomAccessFile {
 public:
  // mmap_base[0, length-1] points to the memory-mapped contents of the file. It
  // must be the result of a successful call to mmap(). This instance takes
  // over the ownership of the region.
  //
  // |mmap_limiter| must outlive this instance. The caller must have already
  // acquired the right to use one mmap region, which will be released when this
  // instance is destroyed.
  PosixMmapReadableFile(std::string filename, char* mmap_base, size_t length,
                        Limiter* mmap_limiter)
      : mmap_base_(mmap_base),
        length_(length),
        mmap_limiter_(mmap_limiter),
        filename_(std::move(filename)) {}

  ~PosixMmapReadableFile() override {
    ::munmap(static_cast<void*>(mmap_base_), length_);
    mmap_limiter_->Release();
  }

  // |scratch| is unused: the returned Slice points directly into the mapping.
  Status Read(uint64_t offset, size_t n, Slice* result,
              char* scratch) const override {
    if (offset + n > length_) {
      *result = Slice();
      return PosixError(filename_, EINVAL);
    }

    *result = Slice(mmap_base_ + offset, n);
    return Status::OK();
  }

 private:
  char* const mmap_base_;
  const size_t length_;
  Limiter* const mmap_limiter_;
  const std::string filename_;
};

class PosixWritableFile final : public WritableFile {
 public:
  PosixWritableFile(std::string filename, int fd)
      : pos_(0),
        fd_(fd),
        is_manifest_(IsManifest(filename)),
        filename_(std::move(filename)),
        dirname_(Dirname(filename_)) {}

  ~PosixWritableFile() override {
    if (fd_ >= 0) {
      // Ignoring any potential errors
      Close();
    }
  }

  Status Append(const Slice& data) override {
    size_t write_size = data.size();
    const char* write_data = data.data();

    // Fit as much as possible into buffer.
    size_t copy_size = std::min(write_size, kWritableFileBufferSize - pos_);
    std::memcpy(buf_ + pos_, write_data, copy_size);
    write_data += copy_size;
    write_size -= copy_size;
    pos_ += copy_size;
    if (write_size == 0) {
      return Status::OK();
    }

    // Can't fit in buffer, so need to do at least one write.
    Status status = FlushBuffer();
    if (!status.ok()) {
      return status;
    }

    // Small writes go to buffer, large writes are written directly.
    if (write_size < kWritableFileBufferSize) {
      std::memcpy(buf_, write_data, write_size);
      pos_ = write_size;
      return Status::OK();
    }
    return WriteUnbuffered(write_data, write_size);
  }

  Status Close() override {
    Status status = FlushBuffer();
    const int close_result = ::close(fd_);
    if (close_result < 0 && status.ok()) {
      status = PosixError(filename_, errno);
    }
    fd_ = -1;
    return status;
  }

  Status Flush() override { return FlushBuffer(); }

  Status Sync() override {
    // Ensure new files referred to by the manifest are in the filesystem.
    //
    // This needs to happen before the manifest file is flushed to disk, to
    // avoid crashing in a state where the manifest refers to files that are not
    // yet on disk.
    Status status = SyncDirIfManifest();
    if (!status.ok()) {
      return status;
    }

    status = FlushBuffer();
    if (!status.ok()) {
      return status;
    }

    return SyncFd(fd_, filename_);
  }

 private:
  Status FlushBuffer() {
    Status status = WriteUnbuffered(buf_, pos_);
    pos_ = 0;
    return status;
  }

  Status WriteUnbuffered(const char* data, size_t size) {
    while (size > 0) {
      ssize_t write_result = ::write(fd_, data, size);
      if (write_result < 0) {
        if (errno == EINTR) {
          continue;  // Retry
        }
        return PosixError(filename_, errno);
      }
      data += write_result;
      size -= write_result;
    }
    return Status::OK();
  }

  Status SyncDirIfManifest() {
    Status status;
    if (!is_manifest_) {
      return status;
    }

    int fd = ::open(dirname_.c_str(), O_RDONLY | kOpenBaseFlags);
    if (fd < 0) {
      status = PosixError(dirname_, errno);
    } else {
      status = SyncFd(fd, dirname_);
      ::close(fd);
    }
    return status;
  }

  // Ensures that all the caches associated with the given file descriptor's
  // data are flushed all the way to durable media, and can withstand power
  // failures.
  //
  // The path argument is only used to populate the description string in the
  // returned Status if an error occurs.
  static Status SyncFd(int fd, const std::string& fd_path) {
#if HAVE_FULLFSYNC
    // On macOS and iOS, fsync() doesn't guarantee durability past power
    // failures. fcntl(F_FULLFSYNC) is required for that purpose. Some
    // filesystems don't support fcntl(F_FULLFSYNC), and require a fallback to
    // fsync().
    if (::fcntl(fd, F_FULLFSYNC) == 0) {
      return Status::OK();
    }
#endif  // HAVE_FULLFSYNC

#if HAVE_FDATASYNC
    bool sync_success = ::fdatasync(fd) == 0;
#else
    bool sync_success = ::fsync(fd) == 0;
#endif  // HAVE_FDATASYNC

    if (sync_success) {
      return Status::OK();
    }
    return PosixError(fd_path, errno);
  }

  // Returns the directory name in a path pointing to a file.
  //
  // Returns "." if the path does not contain any directory separator.
  static std::string Dirname(const std::string& filename) {
    std::string::size_type separator_pos = filename.rfind('/');
    if (separator_pos == std::string::npos) {
      return std::string(".");
    }
    // The filename component should not contain a path separator. If it does,
    // the splitting was done incorrectly.
    assert(filename.find('/', separator_pos + 1) == std::string::npos);

    return filename.substr(0, separator_pos);
  }

  // Extracts the file name from a path pointing to a file.
  //
  // The returned Slice points to |filename|'s data buffer, so it is only valid
  // while |filename| is alive and unchanged.
  static Slice Basename(const std::string& filename) {
    std::string::size_type separator_pos = filename.rfind('/');
    if (separator_pos == std::string::npos) {
      return Slice(filename);
    }
    // The filename component should not contain a path separator. If it does,
    // the splitting was done incorrectly.
    assert(filename.find('/', separator_pos + 1) == std::string::npos);

    return Slice(filename.data() + separator_pos + 1,
                 filename.length() - separator_pos - 1);
  }

  // True if the given file is a manifest file.
  static bool IsManifest(const std::string& filename) {
    return Basename(filename).starts_with("MANIFEST");
  }

  // buf_[0, pos_ - 1] contains data to be written to fd_.
  char buf_[kWritableFileBufferSize];
  size_t pos_;
  int fd_;

  const bool is_manifest_;  // True if the file's name starts with MANIFEST.
  const std::string filename_;
  const std::string dirname_;  // The directory of filename_.
};

// Acquires (lock == true) or releases (lock == false) an fcntl() write lock
// covering the whole file. Returns the fcntl() result, so -1 on failure.
int LockOrUnlock(int fd, bool lock) {
  errno = 0;
  struct ::flock file_lock_info;
  std::memset(&file_lock_info, 0, sizeof(file_lock_info));
  file_lock_info.l_type = (lock ? F_WRLCK : F_UNLCK);
  file_lock_info.l_whence = SEEK_SET;
  file_lock_info.l_start = 0;
  file_lock_info.l_len = 0;  // Lock/unlock entire file.
  return ::fcntl(fd, F_SETLK, &file_lock_info);
}

// Instances are thread-safe because they are immutable.
class PosixFileLock : public FileLock {
 public:
  PosixFileLock(int fd, std::string filename)
      : fd_(fd), filename_(std::move(filename)) {}

  int fd() const { return fd_; }
  const std::string& filename() const { return filename_; }

 private:
  const int fd_;
  const std::string filename_;
};

// Tracks the files locked by PosixEnv::LockFile().
//
// We maintain a separate set instead of relying on fcntl(F_SETLK) because
// fcntl(F_SETLK) does not provide any protection against multiple uses from the
// same process.
//
// Instances are thread-safe because all member data is guarded by a mutex.
class PosixLockTable {
 public:
  bool Insert(const std::string& fname) LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    bool succeeded = locked_files_.insert(fname).second;
    mu_.Unlock();
    return succeeded;
  }
  void Remove(const std::string& fname) LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    locked_files_.erase(fname);
    mu_.Unlock();
  }

 private:
  port::Mutex mu_;
  std::set<std::string> locked_files_ GUARDED_BY(mu_);
};

class PosixEnv : public Env {
 public:
  PosixEnv();
  ~PosixEnv() override {
    static const char msg[] =
        "PosixEnv singleton destroyed. Unsupported behavior!\n";
    std::fwrite(msg, 1, sizeof(msg), stderr);
    std::abort();
  }

  Status NewSequentialFile(const std::string& filename,
                           SequentialFile** result) override {
    int fd = ::open(filename.c_str(), O_RDONLY | kOpenBaseFlags);
    if (fd < 0) {
      *result = nullptr;
      return PosixError(filename, errno);
    }

    *result = new PosixSequentialFile(filename, fd);
    return Status::OK();
  }

  Status NewRandomAccessFile(const std::string& filename,
                             RandomAccessFile** result) override {
    *result = nullptr;
    int fd = ::open(filename.c_str(), O_RDONLY | kOpenBaseFlags);
    if (fd < 0) {
      return PosixError(filename, errno);
    }

    // No mmap region available: fall back to pread()-based access.
    if (!mmap_limiter_.Acquire()) {
      *result = new PosixRandomAccessFile(filename, fd, &fd_limiter_);
      return Status::OK();
    }

    uint64_t file_size;
    Status status = GetFileSize(filename, &file_size);
    if (status.ok()) {
      void* mmap_base =
          ::mmap(/*addr=*/nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
      if (mmap_base != MAP_FAILED) {
        *result = new PosixMmapReadableFile(filename,
                                            reinterpret_cast<char*>(mmap_base),
                                            file_size, &mmap_limiter_);
      } else {
        status = PosixError(filename, errno);
      }
    }
    // The mapping stays valid after the descriptor is closed.
    ::close(fd);
    if (!status.ok()) {
      mmap_limiter_.Release();
    }
    return status;
  }

  Status NewWritableFile(const std::string& filename,
                         WritableFile** result) override {
    int fd = ::open(filename.c_str(),
                    O_TRUNC | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
    if (fd < 0) {
      *result = nullptr;
      return PosixError(filename, errno);
    }

    *result = new PosixWritableFile(filename, fd);
    return Status::OK();
  }

  Status NewAppendableFile(const std::string& filename,
                           WritableFile** result) override {
    int fd = ::open(filename.c_str(),
                    O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
    if (fd < 0) {
      *result = nullptr;
      return PosixError(filename, errno);
    }

    *result = new PosixWritableFile(filename, fd);
    return Status::OK();
  }

  bool FileExists(const std::string& filename) override {
    return ::access(filename.c_str(), F_OK) == 0;
  }

  Status GetChildren(const std::string& directory_path,
                     std::vector<std::string>* result) override {
    result->clear();
    ::DIR* dir = ::opendir(directory_path.c_str());
    if (dir == nullptr) {
      return PosixError(directory_path, errno);
    }
    struct ::dirent* entry;
    while ((entry = ::readdir(dir)) != nullptr) {
      result->emplace_back(entry->d_name);
    }
    ::closedir(dir);
    return Status::OK();
  }

  Status RemoveFile(const std::string& filename) override {
    if (::unlink(filename.c_str()) != 0) {
      return PosixError(filename, errno);
    }
    return Status::OK();
  }

  Status CreateDir(const std::string& dirname) override {
    if (::mkdir(dirname.c_str(), 0755) != 0) {
      return PosixError(dirname, errno);
    }
    return Status::OK();
  }

  Status RemoveDir(const std::string& dirname) override {
    if (::rmdir(dirname.c_str()) != 0) {
      return PosixError(dirname, errno);
    }
    return Status::OK();
  }

  Status GetFileSize(const std::string& filename, uint64_t* size) override {
    struct ::stat file_stat;
    if (::stat(filename.c_str(), &file_stat) != 0) {
      *size = 0;
      return PosixError(filename, errno);
    }
    *size = file_stat.st_size;
    return Status::OK();
  }

  Status RenameFile(const std::string& from, const std::string& to) override {
    if (std::rename(from.c_str(), to.c_str()) != 0) {
      return PosixError(from, errno);
    }
    return Status::OK();
  }

  Status LockFile(const std::string& filename, FileLock** lock) override {
    *lock = nullptr;

    int fd = ::open(filename.c_str(), O_RDWR | O_CREAT | kOpenBaseFlags, 0644);
    if (fd < 0) {
      return PosixError(filename, errno);
    }

    if (!locks_.Insert(filename)) {
      ::close(fd);
      return Status::IOError("lock " + filename, "already held by process");
    }

    if (LockOrUnlock(fd, true) == -1) {
      int lock_errno = errno;
      ::close(fd);
      locks_.Remove(filename);
      return PosixError("lock " + filename, lock_errno);
    }

    *lock = new PosixFileLock(fd, filename);
    return Status::OK();
  }

  Status UnlockFile(FileLock* lock) override {
    PosixFileLock* posix_file_lock = static_cast<PosixFileLock*>(lock);
    if (LockOrUnlock(posix_file_lock->fd(), false) == -1) {
      return PosixError("unlock " + posix_file_lock->filename(), errno);
    }
    locks_.Remove(posix_file_lock->filename());
    ::close(posix_file_lock->fd());
    delete posix_file_lock;
    return Status::OK();
  }

  void Schedule(void (*background_work_function)(void* background_work_arg),
                void* background_work_arg) override;

  void StartThread(void (*thread_main)(void* thread_main_arg),
                   void* thread_main_arg) override {
    std::thread new_thread(thread_main, thread_main_arg);
    new_thread.detach();
  }

  Status GetTestDirectory(std::string* result) override {
    const char* env = std::getenv("TEST_TMPDIR");
    if (env && env[0] != '\0') {
      *result = env;
    } else {
      char buf[100];
      std::snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d",
                    static_cast<int>(::geteuid()));
      *result = buf;
    }

    // The CreateDir status is ignored because the directory may already exist.
    CreateDir(*result);

    return Status::OK();
  }

  Status NewLogger(const std::string& filename, Logger** result) override {
    int fd = ::open(filename.c_str(),
                    O_APPEND | O_WRONLY | O_CREAT | kOpenBaseFlags, 0644);
    if (fd < 0) {
      *result = nullptr;
      return PosixError(filename, errno);
    }

    std::FILE* fp = ::fdopen(fd, "w");
    if (fp == nullptr) {
      ::close(fd);
      *result = nullptr;
      return PosixError(filename, errno);
    } else {
      *result = new PosixLogger(fp);
      return Status::OK();
    }
  }

  uint64_t NowMicros() override {
    static constexpr uint64_t kUsecondsPerSecond = 1000000;
    struct ::timeval tv;
    ::gettimeofday(&tv, nullptr);
    return static_cast<uint64_t>(tv.tv_sec) * kUsecondsPerSecond + tv.tv_usec;
  }

  void SleepForMicroseconds(int micros) override {
    std::this_thread::sleep_for(std::chrono::microseconds(micros));
  }

 private:
  void BackgroundThreadMain();

  static void BackgroundThreadEntryPoint(PosixEnv* env) {
    env->BackgroundThreadMain();
  }

  // Stores the work item data in a Schedule() call.
  //
  // Instances are constructed on the thread calling Schedule() and used on the
  // background thread.
  //
  // This structure is thread-safe because it is immutable.
  struct BackgroundWorkItem {
    explicit BackgroundWorkItem(void (*function)(void* arg), void* arg)
        : function(function), arg(arg) {}

    void (*const function)(void*);
    void* const arg;
  };

  port::Mutex background_work_mutex_;
  port::CondVar background_work_cv_ GUARDED_BY(background_work_mutex_);
  bool started_background_thread_ GUARDED_BY(background_work_mutex_);

  std::queue<BackgroundWorkItem> background_work_queue_
      GUARDED_BY(background_work_mutex_);

  PosixLockTable locks_;  // Thread-safe.
  Limiter mmap_limiter_;  // Thread-safe.
  Limiter fd_limiter_;    // Thread-safe.
};

// Return the maximum number of concurrent mmaps.
int MaxMmaps() { return g_mmap_limit; }

// Return the maximum number of read-only files to keep open.
int MaxOpenFiles() {
  if (g_open_read_only_file_limit >= 0) {
    return g_open_read_only_file_limit;
  }
#ifdef __Fuchsia__
  // Fuchsia doesn't implement getrlimit.
  g_open_read_only_file_limit = 50;
#else
  struct ::rlimit rlim;
  if (::getrlimit(RLIMIT_NOFILE, &rlim)) {
    // getrlimit failed, fall back to the hard-coded default.
    g_open_read_only_file_limit = 50;
  } else if (rlim.rlim_cur == RLIM_INFINITY) {
    g_open_read_only_file_limit = std::numeric_limits<int>::max();
  } else {
    // Allow use of 20% of available file descriptors for read-only files.
    g_open_read_only_file_limit = rlim.rlim_cur / 5;
  }
#endif
  return g_open_read_only_file_limit;
}

}  // namespace

PosixEnv::PosixEnv()
    : background_work_cv_(&background_work_mutex_),
      started_background_thread_(false),
      mmap_limiter_(MaxMmaps()),
      fd_limiter_(MaxOpenFiles()) {}

void PosixEnv::Schedule(
    void (*background_work_function)(void* background_work_arg),
    void* background_work_arg) {
  background_work_mutex_.Lock();

  // Start the background thread, if we haven't done so already.
  if (!started_background_thread_) {
    started_background_thread_ = true;
    std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
    background_thread.detach();
  }

  // If the queue is empty, the background thread may be waiting for work.
  if (background_work_queue_.empty()) {
    background_work_cv_.Signal();
  }

  background_work_queue_.emplace(background_work_function, background_work_arg);
  background_work_mutex_.Unlock();
}

void PosixEnv::BackgroundThreadMain() {
  while (true) {
    background_work_mutex_.Lock();

    // Wait until there is work to be done.
    while (background_work_queue_.empty()) {
      background_work_cv_.Wait();
    }

    assert(!background_work_queue_.empty());
    auto background_work_function = background_work_queue_.front().function;
    void* background_work_arg = background_work_queue_.front().arg;
    background_work_queue_.pop();

    // Run the work item without holding the mutex.
    background_work_mutex_.Unlock();
    background_work_function(background_work_arg);
  }
}

namespace {

// Wraps an Env instance whose destructor is never called.
//
// Intended usage:
//   using PlatformSingletonEnv = SingletonEnv<PlatformEnv>;
//   void ConfigurePosixEnv(int param) {
//     PlatformSingletonEnv::AssertEnvNotInitialized();
//     // set global configuration flags.
//   }
//   Env* Env::Default() {
//     static PlatformSingletonEnv default_env;
//     return default_env.env();
//   }
template <typename EnvType>
class SingletonEnv {
 public:
  SingletonEnv() {
#if !defined(NDEBUG)
    env_initialized_.store(true, std::memory_order_relaxed);
#endif  // !defined(NDEBUG)
    static_assert(sizeof(env_storage_) >= sizeof(EnvType),
                  "env_storage_ will not fit the Env");
    static_assert(alignof(decltype(env_storage_)) >= alignof(EnvType),
                  "env_storage_ does not meet the Env's alignment needs");
    // Construct the Env in place; it is intentionally never destroyed.
    new (&env_storage_) EnvType();
  }
  ~SingletonEnv() = default;

  SingletonEnv(const SingletonEnv&) = delete;
  SingletonEnv& operator=(const SingletonEnv&) = delete;

  Env* env() { return reinterpret_cast<Env*>(&env_storage_); }

  static void AssertEnvNotInitialized() {
#if !defined(NDEBUG)
    assert(!env_initialized_.load(std::memory_order_relaxed));
#endif  // !defined(NDEBUG)
  }

 private:
  typename std::aligned_storage<sizeof(EnvType), alignof(EnvType)>::type
      env_storage_;
#if !defined(NDEBUG)
  static std::atomic<bool> env_initialized_;
#endif  // !defined(NDEBUG)
};

#if !defined(NDEBUG)
template <typename EnvType>
std::atomic<bool> SingletonEnv<EnvType>::env_initialized_;
#endif  // !defined(NDEBUG)

using PosixDefaultEnv = SingletonEnv<PosixEnv>;

}  // namespace

void EnvPosixTestHelper::SetReadOnlyFDLimit(int limit) {
  PosixDefaultEnv::AssertEnvNotInitialized();
  g_open_read_only_file_limit = limit;
}

void EnvPosixTestHelper::SetReadOnlyMMapLimit(int limit) {
  PosixDefaultEnv::AssertEnvNotInitialized();
  g_mmap_limit = limit;
}

Env* Env::Default() {
  static PosixDefaultEnv env_container;
  return env_container.env();
}

}  // namespace leveldb
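
The Limiter class in this file hands out a fixed number of slots using a single atomic counter: Acquire() decrements optimistically and undoes the decrement when no slot was left. The standalone sketch below illustrates that pattern outside LevelDB. The names `ResourceLimiter`, `available()`, and `CountSuccessfulAcquires` are hypothetical and exist only for this illustration; the debug-only bookkeeping of the original (`max_acquires_`) is left out for brevity.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the Limiter class above; not part of LevelDB.
class ResourceLimiter {
 public:
  explicit ResourceLimiter(int max_acquires) : available_(max_acquires) {
    assert(max_acquires >= 0);
  }

  ResourceLimiter(const ResourceLimiter&) = delete;
  ResourceLimiter& operator=(const ResourceLimiter&) = delete;

  // Optimistically takes a slot. If none was left, the decrement is undone
  // and false is returned.
  bool Acquire() {
    int old_available = available_.fetch_sub(1, std::memory_order_relaxed);
    if (old_available > 0) return true;
    available_.fetch_add(1, std::memory_order_relaxed);
    return false;
  }

  // Returns a slot obtained by a successful Acquire().
  void Release() { available_.fetch_add(1, std::memory_order_relaxed); }

  // Snapshot of the counter; only meaningful when no other thread is active.
  int available() const { return available_.load(std::memory_order_relaxed); }

 private:
  std::atomic<int> available_;
};

// Calls Acquire() `attempts` times and returns how many calls succeeded.
int CountSuccessfulAcquires(ResourceLimiter& limiter, int attempts) {
  int granted = 0;
  for (int i = 0; i < attempts; ++i) {
    if (limiter.Acquire()) ++granted;
  }
  return granted;
}
```

With a limit of 2, five Acquire() attempts should grant exactly two slots, and one more becomes available after a single Release(). In this file the same pattern decides whether NewRandomAccessFile() can use an mmap region and whether PosixRandomAccessFile keeps its descriptor open.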