You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

714 lines
21 KiB

  1. // Copyright (c) 2018 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. // Prevent Windows headers from defining min/max macros and instead
  5. // use STL.
  6. #define NOMINMAX
  7. #include <windows.h>
  8. #include <algorithm>
  9. #include <atomic>
  10. #include <chrono>
  11. #include <condition_variable>
  12. #include <deque>
  13. #include <memory>
  14. #include <mutex>
  15. #include <sstream>
  16. #include <string>
  17. #include <vector>
  18. #include "leveldb/env.h"
  19. #include "leveldb/slice.h"
  20. #include "port/port.h"
  21. #include "port/thread_annotations.h"
  22. #include "util/env_windows_test_helper.h"
  23. #include "util/logging.h"
  24. #include "util/mutexlock.h"
  25. #include "util/windows_logger.h"
  26. #if defined(DeleteFile)
  27. #undef DeleteFile
  28. #endif // defined(DeleteFile)
  29. namespace leveldb {
  30. namespace {
  31. constexpr const size_t kWritableFileBufferSize = 65536;
  32. // Up to 1000 mmaps for 64-bit binaries; none for 32-bit.
  33. constexpr int kDefaultMmapLimit = sizeof(void*) >= 8 ? 1000 : 0;
  34. // Modified by EnvWindowsTestHelper::SetReadOnlyMMapLimit().
  35. int g_mmap_limit = kDefaultMmapLimit;
  36. std::string GetWindowsErrorMessage(DWORD error_code) {
  37. std::string message;
  38. char* error_text = nullptr;
  39. // Use MBCS version of FormatMessage to match return value.
  40. size_t error_text_size = ::FormatMessageA(
  41. FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER |
  42. FORMAT_MESSAGE_IGNORE_INSERTS,
  43. nullptr, error_code, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
  44. reinterpret_cast<char*>(&error_text), 0, nullptr);
  45. if (!error_text) {
  46. return message;
  47. }
  48. message.assign(error_text, error_text_size);
  49. ::LocalFree(error_text);
  50. return message;
  51. }
  52. Status WindowsError(const std::string& context, DWORD error_code) {
  53. if (error_code == ERROR_FILE_NOT_FOUND || error_code == ERROR_PATH_NOT_FOUND)
  54. return Status::NotFound(context, GetWindowsErrorMessage(error_code));
  55. return Status::IOError(context, GetWindowsErrorMessage(error_code));
  56. }
  57. class ScopedHandle {
  58. public:
  59. ScopedHandle(HANDLE handle) : handle_(handle) {}
  60. ScopedHandle(ScopedHandle&& other) noexcept : handle_(other.Release()) {}
  61. ~ScopedHandle() { Close(); }
  62. ScopedHandle& operator=(ScopedHandle&& rhs) noexcept {
  63. if (this != &rhs) handle_ = rhs.Release();
  64. return *this;
  65. }
  66. bool Close() {
  67. if (!is_valid()) {
  68. return true;
  69. }
  70. HANDLE h = handle_;
  71. handle_ = INVALID_HANDLE_VALUE;
  72. return ::CloseHandle(h);
  73. }
  74. bool is_valid() const {
  75. return handle_ != INVALID_HANDLE_VALUE && handle_ != nullptr;
  76. }
  77. HANDLE get() const { return handle_; }
  78. HANDLE Release() {
  79. HANDLE h = handle_;
  80. handle_ = INVALID_HANDLE_VALUE;
  81. return h;
  82. }
  83. private:
  84. HANDLE handle_;
  85. };
  86. // Helper class to limit resource usage to avoid exhaustion.
  87. // Currently used to limit read-only file descriptors and mmap file usage
  88. // so that we do not run out of file descriptors or virtual memory, or run into
  89. // kernel performance problems for very large databases.
  90. class Limiter {
  91. public:
  92. // Limit maximum number of resources to |max_acquires|.
  93. Limiter(int max_acquires) : acquires_allowed_(max_acquires) {}
  94. Limiter(const Limiter&) = delete;
  95. Limiter operator=(const Limiter&) = delete;
  96. // If another resource is available, acquire it and return true.
  97. // Else return false.
  98. bool Acquire() {
  99. int old_acquires_allowed =
  100. acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);
  101. if (old_acquires_allowed > 0)
  102. return true;
  103. acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
  104. return false;
  105. }
  106. // Release a resource acquired by a previous call to Acquire() that returned
  107. // true.
  108. void Release() {
  109. acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
  110. }
  111. private:
  112. // The number of available resources.
  113. //
  114. // This is a counter and is not tied to the invariants of any other class, so
  115. // it can be operated on safely using std::memory_order_relaxed.
  116. std::atomic<int> acquires_allowed_;
  117. };
  118. class WindowsSequentialFile : public SequentialFile {
  119. public:
  120. WindowsSequentialFile(std::string fname, ScopedHandle file)
  121. : filename_(fname), file_(std::move(file)) {}
  122. ~WindowsSequentialFile() override {}
  123. Status Read(size_t n, Slice* result, char* scratch) override {
  124. Status s;
  125. DWORD bytes_read;
  126. // DWORD is 32-bit, but size_t could technically be larger. However leveldb
  127. // files are limited to leveldb::Options::max_file_size which is clamped to
  128. // 1<<30 or 1 GiB.
  129. assert(n <= std::numeric_limits<DWORD>::max());
  130. if (!::ReadFile(file_.get(), scratch, static_cast<DWORD>(n), &bytes_read,
  131. nullptr)) {
  132. s = WindowsError(filename_, ::GetLastError());
  133. } else {
  134. *result = Slice(scratch, bytes_read);
  135. }
  136. return s;
  137. }
  138. Status Skip(uint64_t n) override {
  139. LARGE_INTEGER distance;
  140. distance.QuadPart = n;
  141. if (!::SetFilePointerEx(file_.get(), distance, nullptr, FILE_CURRENT)) {
  142. return WindowsError(filename_, ::GetLastError());
  143. }
  144. return Status::OK();
  145. }
  146. private:
  147. std::string filename_;
  148. ScopedHandle file_;
  149. };
  150. class WindowsRandomAccessFile : public RandomAccessFile {
  151. public:
  152. WindowsRandomAccessFile(std::string fname, ScopedHandle handle)
  153. : filename_(fname), handle_(std::move(handle)) {}
  154. ~WindowsRandomAccessFile() override = default;
  155. Status Read(uint64_t offset, size_t n, Slice* result,
  156. char* scratch) const override {
  157. DWORD bytes_read = 0;
  158. OVERLAPPED overlapped = {0};
  159. overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
  160. overlapped.Offset = static_cast<DWORD>(offset);
  161. if (!::ReadFile(handle_.get(), scratch, static_cast<DWORD>(n), &bytes_read,
  162. &overlapped)) {
  163. DWORD error_code = ::GetLastError();
  164. if (error_code != ERROR_HANDLE_EOF) {
  165. *result = Slice(scratch, 0);
  166. return Status::IOError(filename_, GetWindowsErrorMessage(error_code));
  167. }
  168. }
  169. *result = Slice(scratch, bytes_read);
  170. return Status::OK();
  171. }
  172. private:
  173. std::string filename_;
  174. ScopedHandle handle_;
  175. };
  176. class WindowsMmapReadableFile : public RandomAccessFile {
  177. public:
  178. // base[0,length-1] contains the mmapped contents of the file.
  179. WindowsMmapReadableFile(std::string fname, void* base, size_t length,
  180. Limiter* limiter)
  181. : filename_(std::move(fname)),
  182. mmapped_region_(base),
  183. length_(length),
  184. limiter_(limiter) {}
  185. ~WindowsMmapReadableFile() override {
  186. ::UnmapViewOfFile(mmapped_region_);
  187. limiter_->Release();
  188. }
  189. Status Read(uint64_t offset, size_t n, Slice* result,
  190. char* scratch) const override {
  191. Status s;
  192. if (offset + n > length_) {
  193. *result = Slice();
  194. s = WindowsError(filename_, ERROR_INVALID_PARAMETER);
  195. } else {
  196. *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
  197. }
  198. return s;
  199. }
  200. private:
  201. std::string filename_;
  202. void* mmapped_region_;
  203. size_t length_;
  204. Limiter* limiter_;
  205. };
  206. class WindowsWritableFile : public WritableFile {
  207. public:
  208. WindowsWritableFile(std::string fname, ScopedHandle handle)
  209. : filename_(std::move(fname)), handle_(std::move(handle)), pos_(0) {}
  210. ~WindowsWritableFile() override = default;
  211. Status Append(const Slice& data) override {
  212. size_t n = data.size();
  213. const char* p = data.data();
  214. // Fit as much as possible into buffer.
  215. size_t copy = std::min(n, kWritableFileBufferSize - pos_);
  216. memcpy(buf_ + pos_, p, copy);
  217. p += copy;
  218. n -= copy;
  219. pos_ += copy;
  220. if (n == 0) {
  221. return Status::OK();
  222. }
  223. // Can't fit in buffer, so need to do at least one write.
  224. Status s = FlushBuffered();
  225. if (!s.ok()) {
  226. return s;
  227. }
  228. // Small writes go to buffer, large writes are written directly.
  229. if (n < kWritableFileBufferSize) {
  230. memcpy(buf_, p, n);
  231. pos_ = n;
  232. return Status::OK();
  233. }
  234. return WriteRaw(p, n);
  235. }
  236. Status Close() override {
  237. Status result = FlushBuffered();
  238. if (!handle_.Close() && result.ok()) {
  239. result = WindowsError(filename_, ::GetLastError());
  240. }
  241. return result;
  242. }
  243. Status Flush() override { return FlushBuffered(); }
  244. Status Sync() override {
  245. // On Windows no need to sync parent directory. It's metadata will be
  246. // updated via the creation of the new file, without an explicit sync.
  247. return FlushBuffered();
  248. }
  249. private:
  250. Status FlushBuffered() {
  251. Status s = WriteRaw(buf_, pos_);
  252. pos_ = 0;
  253. return s;
  254. }
  255. Status WriteRaw(const char* p, size_t n) {
  256. DWORD bytes_written;
  257. if (!::WriteFile(handle_.get(), p, static_cast<DWORD>(n), &bytes_written,
  258. nullptr)) {
  259. return Status::IOError(filename_,
  260. GetWindowsErrorMessage(::GetLastError()));
  261. }
  262. return Status::OK();
  263. }
  264. // buf_[0, pos_-1] contains data to be written to handle_.
  265. const std::string filename_;
  266. ScopedHandle handle_;
  267. char buf_[kWritableFileBufferSize];
  268. size_t pos_;
  269. };
  270. // Lock or unlock the entire file as specified by |lock|. Returns true
  271. // when successful, false upon failure. Caller should call ::GetLastError()
  272. // to determine cause of failure
  273. bool LockOrUnlock(HANDLE handle, bool lock) {
  274. if (lock) {
  275. return ::LockFile(handle,
  276. /*dwFileOffsetLow=*/0, /*dwFileOffsetHigh=*/0,
  277. /*nNumberOfBytesToLockLow=*/MAXDWORD,
  278. /*nNumberOfBytesToLockHigh=*/MAXDWORD);
  279. } else {
  280. return ::UnlockFile(handle,
  281. /*dwFileOffsetLow=*/0, /*dwFileOffsetHigh=*/0,
  282. /*nNumberOfBytesToLockLow=*/MAXDWORD,
  283. /*nNumberOfBytesToLockHigh=*/MAXDWORD);
  284. }
  285. }
  286. class WindowsFileLock : public FileLock {
  287. public:
  288. WindowsFileLock(ScopedHandle handle, std::string name)
  289. : handle_(std::move(handle)), name_(std::move(name)) {}
  290. ScopedHandle& handle() { return handle_; }
  291. const std::string& name() const { return name_; }
  292. private:
  293. ScopedHandle handle_;
  294. std::string name_;
  295. };
  296. class WindowsEnv : public Env {
  297. public:
  298. WindowsEnv();
  299. ~WindowsEnv() override {
  300. static char msg[] = "Destroying Env::Default()\n";
  301. fwrite(msg, 1, sizeof(msg), stderr);
  302. abort();
  303. }
  304. Status NewSequentialFile(const std::string& fname,
  305. SequentialFile** result) override {
  306. *result = nullptr;
  307. DWORD desired_access = GENERIC_READ;
  308. DWORD share_mode = FILE_SHARE_READ;
  309. ScopedHandle handle =
  310. ::CreateFileA(fname.c_str(), desired_access, share_mode, nullptr,
  311. OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
  312. if (!handle.is_valid()) {
  313. return WindowsError(fname, ::GetLastError());
  314. }
  315. *result = new WindowsSequentialFile(fname, std::move(handle));
  316. return Status::OK();
  317. }
  318. Status NewRandomAccessFile(const std::string& fname,
  319. RandomAccessFile** result) override {
  320. *result = nullptr;
  321. DWORD desired_access = GENERIC_READ;
  322. DWORD share_mode = FILE_SHARE_READ;
  323. DWORD file_flags = FILE_ATTRIBUTE_READONLY;
  324. ScopedHandle handle =
  325. ::CreateFileA(fname.c_str(), desired_access, share_mode, nullptr,
  326. OPEN_EXISTING, file_flags, nullptr);
  327. if (!handle.is_valid()) {
  328. return WindowsError(fname, ::GetLastError());
  329. }
  330. if (!mmap_limiter_.Acquire()) {
  331. *result = new WindowsRandomAccessFile(fname, std::move(handle));
  332. return Status::OK();
  333. }
  334. LARGE_INTEGER file_size;
  335. if (!::GetFileSizeEx(handle.get(), &file_size)) {
  336. return WindowsError(fname, ::GetLastError());
  337. }
  338. ScopedHandle mapping =
  339. ::CreateFileMappingA(handle.get(),
  340. /*security attributes=*/nullptr, PAGE_READONLY,
  341. /*dwMaximumSizeHigh=*/0,
  342. /*dwMaximumSizeLow=*/0, nullptr);
  343. if (mapping.is_valid()) {
  344. void* base = MapViewOfFile(mapping.get(), FILE_MAP_READ, 0, 0, 0);
  345. if (base) {
  346. *result = new WindowsMmapReadableFile(
  347. fname, base, static_cast<size_t>(file_size.QuadPart),
  348. &mmap_limiter_);
  349. return Status::OK();
  350. }
  351. }
  352. Status s = WindowsError(fname, ::GetLastError());
  353. if (!s.ok()) {
  354. mmap_limiter_.Release();
  355. }
  356. return s;
  357. }
  358. Status NewWritableFile(const std::string& fname,
  359. WritableFile** result) override {
  360. DWORD desired_access = GENERIC_WRITE;
  361. DWORD share_mode = 0;
  362. ScopedHandle handle =
  363. ::CreateFileA(fname.c_str(), desired_access, share_mode, nullptr,
  364. CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, nullptr);
  365. if (!handle.is_valid()) {
  366. *result = nullptr;
  367. return WindowsError(fname, ::GetLastError());
  368. }
  369. *result = new WindowsWritableFile(fname, std::move(handle));
  370. return Status::OK();
  371. }
  372. Status NewAppendableFile(const std::string& fname,
  373. WritableFile** result) override {
  374. ScopedHandle handle =
  375. ::CreateFileA(fname.c_str(), FILE_APPEND_DATA, 0, nullptr, OPEN_ALWAYS,
  376. FILE_ATTRIBUTE_NORMAL, nullptr);
  377. if (!handle.is_valid()) {
  378. *result = nullptr;
  379. return WindowsError(fname, ::GetLastError());
  380. }
  381. *result = new WindowsWritableFile(fname, std::move(handle));
  382. return Status::OK();
  383. }
  384. bool FileExists(const std::string& fname) override {
  385. return GetFileAttributesA(fname.c_str()) != INVALID_FILE_ATTRIBUTES;
  386. }
  387. Status GetChildren(const std::string& dir,
  388. std::vector<std::string>* result) override {
  389. const std::string find_pattern = dir + "\\*";
  390. WIN32_FIND_DATAA find_data;
  391. HANDLE dir_handle = ::FindFirstFileA(find_pattern.c_str(), &find_data);
  392. if (dir_handle == INVALID_HANDLE_VALUE) {
  393. DWORD last_error = ::GetLastError();
  394. if (last_error == ERROR_FILE_NOT_FOUND) {
  395. return Status::OK();
  396. }
  397. return WindowsError(dir, last_error);
  398. }
  399. do {
  400. char base_name[_MAX_FNAME];
  401. char ext[_MAX_EXT];
  402. if (!_splitpath_s(find_data.cFileName, nullptr, 0, nullptr, 0, base_name,
  403. ARRAYSIZE(base_name), ext, ARRAYSIZE(ext))) {
  404. result->emplace_back(std::string(base_name) + ext);
  405. }
  406. } while (::FindNextFileA(dir_handle, &find_data));
  407. DWORD last_error = ::GetLastError();
  408. ::FindClose(dir_handle);
  409. if (last_error != ERROR_NO_MORE_FILES) {
  410. return WindowsError(dir, last_error);
  411. }
  412. return Status::OK();
  413. }
  414. Status DeleteFile(const std::string& fname) override {
  415. if (!::DeleteFileA(fname.c_str())) {
  416. return WindowsError(fname, ::GetLastError());
  417. }
  418. return Status::OK();
  419. }
  420. Status CreateDir(const std::string& name) override {
  421. if (!::CreateDirectoryA(name.c_str(), nullptr)) {
  422. return WindowsError(name, ::GetLastError());
  423. }
  424. return Status::OK();
  425. }
  426. Status DeleteDir(const std::string& name) override {
  427. if (!::RemoveDirectoryA(name.c_str())) {
  428. return WindowsError(name, ::GetLastError());
  429. }
  430. return Status::OK();
  431. }
  432. Status GetFileSize(const std::string& fname, uint64_t* size) override {
  433. WIN32_FILE_ATTRIBUTE_DATA attrs;
  434. if (!::GetFileAttributesExA(fname.c_str(), GetFileExInfoStandard, &attrs)) {
  435. return WindowsError(fname, ::GetLastError());
  436. }
  437. ULARGE_INTEGER file_size;
  438. file_size.HighPart = attrs.nFileSizeHigh;
  439. file_size.LowPart = attrs.nFileSizeLow;
  440. *size = file_size.QuadPart;
  441. return Status::OK();
  442. }
  443. Status RenameFile(const std::string& src,
  444. const std::string& target) override {
  445. // Try a simple move first. It will only succeed when |to_path| doesn't
  446. // already exist.
  447. if (::MoveFileA(src.c_str(), target.c_str())) {
  448. return Status::OK();
  449. }
  450. DWORD move_error = ::GetLastError();
  451. // Try the full-blown replace if the move fails, as ReplaceFile will only
  452. // succeed when |to_path| does exist. When writing to a network share, we
  453. // may not be able to change the ACLs. Ignore ACL errors then
  454. // (REPLACEFILE_IGNORE_MERGE_ERRORS).
  455. if (::ReplaceFileA(target.c_str(), src.c_str(), nullptr,
  456. REPLACEFILE_IGNORE_MERGE_ERRORS, nullptr, nullptr)) {
  457. return Status::OK();
  458. }
  459. DWORD replace_error = ::GetLastError();
  460. // In the case of FILE_ERROR_NOT_FOUND from ReplaceFile, it is likely
  461. // that |to_path| does not exist. In this case, the more relevant error
  462. // comes from the call to MoveFile.
  463. if (replace_error == ERROR_FILE_NOT_FOUND ||
  464. replace_error == ERROR_PATH_NOT_FOUND) {
  465. return WindowsError(src, move_error);
  466. } else {
  467. return WindowsError(src, replace_error);
  468. }
  469. }
  470. Status LockFile(const std::string& fname, FileLock** lock) override {
  471. *lock = nullptr;
  472. Status result;
  473. ScopedHandle handle = ::CreateFileA(
  474. fname.c_str(), GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ,
  475. /*lpSecurityAttributes=*/nullptr, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL,
  476. nullptr);
  477. if (!handle.is_valid()) {
  478. result = WindowsError(fname, ::GetLastError());
  479. } else if (!LockOrUnlock(handle.get(), true)) {
  480. result = WindowsError("lock " + fname, ::GetLastError());
  481. } else {
  482. *lock = new WindowsFileLock(std::move(handle), std::move(fname));
  483. }
  484. return result;
  485. }
  486. Status UnlockFile(FileLock* lock) override {
  487. std::unique_ptr<WindowsFileLock> my_lock(
  488. reinterpret_cast<WindowsFileLock*>(lock));
  489. Status result;
  490. if (!LockOrUnlock(my_lock->handle().get(), false)) {
  491. result = WindowsError("unlock", ::GetLastError());
  492. }
  493. return result;
  494. }
  495. void Schedule(void (*function)(void*), void* arg) override;
  496. void StartThread(void (*function)(void* arg), void* arg) override {
  497. std::thread t(function, arg);
  498. t.detach();
  499. }
  500. Status GetTestDirectory(std::string* result) override {
  501. const char* env = getenv("TEST_TMPDIR");
  502. if (env && env[0] != '\0') {
  503. *result = env;
  504. return Status::OK();
  505. }
  506. char tmp_path[MAX_PATH];
  507. if (!GetTempPathA(ARRAYSIZE(tmp_path), tmp_path)) {
  508. return WindowsError("GetTempPath", ::GetLastError());
  509. }
  510. std::stringstream ss;
  511. ss << tmp_path << "leveldbtest-" << std::this_thread::get_id();
  512. *result = ss.str();
  513. // Directory may already exist
  514. CreateDir(*result);
  515. return Status::OK();
  516. }
  517. Status NewLogger(const std::string& filename, Logger** result) override {
  518. std::FILE* fp = std::fopen(filename.c_str(), "w");
  519. if (fp == nullptr) {
  520. *result = nullptr;
  521. return WindowsError("NewLogger", ::GetLastError());
  522. } else {
  523. *result = new WindowsLogger(fp);
  524. return Status::OK();
  525. }
  526. }
  527. uint64_t NowMicros() override {
  528. // GetSystemTimeAsFileTime typically has a resolution of 10-20 msec.
  529. // TODO(cmumford): Switch to GetSystemTimePreciseAsFileTime which is
  530. // available in Windows 8 and later.
  531. FILETIME ft;
  532. ::GetSystemTimeAsFileTime(&ft);
  533. // Each tick represents a 100-nanosecond intervals since January 1, 1601
  534. // (UTC).
  535. uint64_t num_ticks =
  536. (static_cast<uint64_t>(ft.dwHighDateTime) << 32) + ft.dwLowDateTime;
  537. return num_ticks / 10;
  538. }
  539. void SleepForMicroseconds(int micros) override {
  540. std::this_thread::sleep_for(std::chrono::microseconds(micros));
  541. }
  542. private:
  543. // BGThread() is the body of the background thread
  544. void BGThread();
  545. std::mutex mu_;
  546. std::condition_variable bgsignal_;
  547. bool started_bgthread_;
  548. // Entry per Schedule() call
  549. struct BGItem {
  550. void* arg;
  551. void (*function)(void*);
  552. };
  553. typedef std::deque<BGItem> BGQueue;
  554. BGQueue queue_;
  555. Limiter mmap_limiter_;
  556. };
  557. // Return the maximum number of concurrent mmaps.
  558. int MaxMmaps() {
  559. if (g_mmap_limit >= 0) {
  560. return g_mmap_limit;
  561. }
  562. // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
  563. g_mmap_limit = sizeof(void*) >= 8 ? 1000 : 0;
  564. return g_mmap_limit;
  565. }
  566. WindowsEnv::WindowsEnv()
  567. : started_bgthread_(false), mmap_limiter_(MaxMmaps()) {}
  568. void WindowsEnv::Schedule(void (*function)(void*), void* arg) {
  569. std::lock_guard<std::mutex> guard(mu_);
  570. // Start background thread if necessary
  571. if (!started_bgthread_) {
  572. started_bgthread_ = true;
  573. std::thread t(&WindowsEnv::BGThread, this);
  574. t.detach();
  575. }
  576. // If the queue is currently empty, the background thread may currently be
  577. // waiting.
  578. if (queue_.empty()) {
  579. bgsignal_.notify_one();
  580. }
  581. // Add to priority queue
  582. queue_.push_back(BGItem());
  583. queue_.back().function = function;
  584. queue_.back().arg = arg;
  585. }
  586. void WindowsEnv::BGThread() {
  587. while (true) {
  588. // Wait until there is an item that is ready to run
  589. std::unique_lock<std::mutex> lk(mu_);
  590. bgsignal_.wait(lk, [this] { return !queue_.empty(); });
  591. void (*function)(void*) = queue_.front().function;
  592. void* arg = queue_.front().arg;
  593. queue_.pop_front();
  594. lk.unlock();
  595. (*function)(arg);
  596. }
  597. }
  598. } // namespace
  599. static std::once_flag once;
  600. static Env* default_env;
  601. static void InitDefaultEnv() { default_env = new WindowsEnv(); }
  602. void EnvWindowsTestHelper::SetReadOnlyMMapLimit(int limit) {
  603. assert(default_env == nullptr);
  604. g_mmap_limit = limit;
  605. }
  606. Env* Env::Default() {
  607. std::call_once(once, InitDefaultEnv);
  608. return default_env;
  609. }
  610. } // namespace leveldb