作者: 韩晨旭 10225101440 李畅 10225102463
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

609 lines
15 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <dirent.h>
  6. #include <errno.h>
  7. #include <fcntl.h>
  8. #include <pthread.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/time.h>
  15. #include <sys/types.h>
  16. #include <time.h>
  17. #include <unistd.h>
  18. #if defined(LEVELDB_PLATFORM_ANDROID)
  19. #include <sys/stat.h>
  20. #endif
  21. #include "leveldb/env.h"
  22. #include "leveldb/slice.h"
  23. #include "port/port.h"
  24. #include "util/logging.h"
  25. #include "util/posix_logger.h"
  26. namespace leveldb {
  27. namespace {
  28. static Status IOError(const std::string& context, int err_number) {
  29. return Status::IOError(context, strerror(err_number));
  30. }
  31. class PosixSequentialFile: public SequentialFile {
  32. private:
  33. std::string filename_;
  34. FILE* file_;
  35. public:
  36. PosixSequentialFile(const std::string& fname, FILE* f)
  37. : filename_(fname), file_(f) { }
  38. virtual ~PosixSequentialFile() { fclose(file_); }
  39. virtual Status Read(size_t n, Slice* result, char* scratch) {
  40. Status s;
  41. size_t r = fread_unlocked(scratch, 1, n, file_);
  42. *result = Slice(scratch, r);
  43. if (r < n) {
  44. if (feof(file_)) {
  45. // We leave status as ok if we hit the end of the file
  46. } else {
  47. // A partial read with an error: return a non-ok status
  48. s = IOError(filename_, errno);
  49. }
  50. }
  51. return s;
  52. }
  53. virtual Status Skip(uint64_t n) {
  54. if (fseek(file_, n, SEEK_CUR)) {
  55. return IOError(filename_, errno);
  56. }
  57. return Status::OK();
  58. }
  59. };
  60. // pread() based random-access
  61. class PosixRandomAccessFile: public RandomAccessFile {
  62. private:
  63. std::string filename_;
  64. int fd_;
  65. public:
  66. PosixRandomAccessFile(const std::string& fname, int fd)
  67. : filename_(fname), fd_(fd) { }
  68. virtual ~PosixRandomAccessFile() { close(fd_); }
  69. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  70. char* scratch) const {
  71. Status s;
  72. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  73. *result = Slice(scratch, (r < 0) ? 0 : r);
  74. if (r < 0) {
  75. // An error: return a non-ok status
  76. s = IOError(filename_, errno);
  77. }
  78. return s;
  79. }
  80. };
  81. // mmap() based random-access
  82. class PosixMmapReadableFile: public RandomAccessFile {
  83. private:
  84. std::string filename_;
  85. void* mmapped_region_;
  86. size_t length_;
  87. public:
  88. // base[0,length-1] contains the mmapped contents of the file.
  89. PosixMmapReadableFile(const std::string& fname, void* base, size_t length)
  90. : filename_(fname), mmapped_region_(base), length_(length) { }
  91. virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }
  92. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  93. char* scratch) const {
  94. Status s;
  95. if (offset + n > length_) {
  96. *result = Slice();
  97. s = IOError(filename_, EINVAL);
  98. } else {
  99. *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
  100. }
  101. return s;
  102. }
  103. };
  104. // We preallocate up to an extra megabyte and use memcpy to append new
  105. // data to the file. This is safe since we either properly close the
  106. // file before reading from it, or for log files, the reading code
  107. // knows enough to skip zero suffixes.
  108. class PosixMmapFile : public WritableFile {
  109. private:
  110. std::string filename_;
  111. int fd_;
  112. size_t page_size_;
  113. size_t map_size_; // How much extra memory to map at a time
  114. char* base_; // The mapped region
  115. char* limit_; // Limit of the mapped region
  116. char* dst_; // Where to write next (in range [base_,limit_])
  117. char* last_sync_; // Where have we synced up to
  118. uint64_t file_offset_; // Offset of base_ in file
  119. // Have we done an munmap of unsynced data?
  120. bool pending_sync_;
  121. // Roundup x to a multiple of y
  122. static size_t Roundup(size_t x, size_t y) {
  123. return ((x + y - 1) / y) * y;
  124. }
  125. size_t TruncateToPageBoundary(size_t s) {
  126. s -= (s & (page_size_ - 1));
  127. assert((s % page_size_) == 0);
  128. return s;
  129. }
  130. bool UnmapCurrentRegion() {
  131. bool result = true;
  132. if (base_ != NULL) {
  133. if (last_sync_ < limit_) {
  134. // Defer syncing this data until next Sync() call, if any
  135. pending_sync_ = true;
  136. }
  137. if (munmap(base_, limit_ - base_) != 0) {
  138. result = false;
  139. }
  140. file_offset_ += limit_ - base_;
  141. base_ = NULL;
  142. limit_ = NULL;
  143. last_sync_ = NULL;
  144. dst_ = NULL;
  145. // Increase the amount we map the next time, but capped at 1MB
  146. if (map_size_ < (1<<20)) {
  147. map_size_ *= 2;
  148. }
  149. }
  150. return result;
  151. }
  152. bool MapNewRegion() {
  153. assert(base_ == NULL);
  154. if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
  155. return false;
  156. }
  157. void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
  158. fd_, file_offset_);
  159. if (ptr == MAP_FAILED) {
  160. return false;
  161. }
  162. base_ = reinterpret_cast<char*>(ptr);
  163. limit_ = base_ + map_size_;
  164. dst_ = base_;
  165. last_sync_ = base_;
  166. return true;
  167. }
  168. public:
  169. PosixMmapFile(const std::string& fname, int fd, size_t page_size)
  170. : filename_(fname),
  171. fd_(fd),
  172. page_size_(page_size),
  173. map_size_(Roundup(65536, page_size)),
  174. base_(NULL),
  175. limit_(NULL),
  176. dst_(NULL),
  177. last_sync_(NULL),
  178. file_offset_(0),
  179. pending_sync_(false) {
  180. assert((page_size & (page_size - 1)) == 0);
  181. }
  182. ~PosixMmapFile() {
  183. if (fd_ >= 0) {
  184. PosixMmapFile::Close();
  185. }
  186. }
  187. virtual Status Append(const Slice& data) {
  188. const char* src = data.data();
  189. size_t left = data.size();
  190. while (left > 0) {
  191. assert(base_ <= dst_);
  192. assert(dst_ <= limit_);
  193. size_t avail = limit_ - dst_;
  194. if (avail == 0) {
  195. if (!UnmapCurrentRegion() ||
  196. !MapNewRegion()) {
  197. return IOError(filename_, errno);
  198. }
  199. }
  200. size_t n = (left <= avail) ? left : avail;
  201. memcpy(dst_, src, n);
  202. dst_ += n;
  203. src += n;
  204. left -= n;
  205. }
  206. return Status::OK();
  207. }
  208. virtual Status Close() {
  209. Status s;
  210. size_t unused = limit_ - dst_;
  211. if (!UnmapCurrentRegion()) {
  212. s = IOError(filename_, errno);
  213. } else if (unused > 0) {
  214. // Trim the extra space at the end of the file
  215. if (ftruncate(fd_, file_offset_ - unused) < 0) {
  216. s = IOError(filename_, errno);
  217. }
  218. }
  219. if (close(fd_) < 0) {
  220. if (s.ok()) {
  221. s = IOError(filename_, errno);
  222. }
  223. }
  224. fd_ = -1;
  225. base_ = NULL;
  226. limit_ = NULL;
  227. return s;
  228. }
  229. virtual Status Flush() {
  230. return Status::OK();
  231. }
  232. virtual Status Sync() {
  233. Status s;
  234. if (pending_sync_) {
  235. // Some unmapped data was not synced
  236. pending_sync_ = false;
  237. if (fdatasync(fd_) < 0) {
  238. s = IOError(filename_, errno);
  239. }
  240. }
  241. if (dst_ > last_sync_) {
  242. // Find the beginnings of the pages that contain the first and last
  243. // bytes to be synced.
  244. size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  245. size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  246. last_sync_ = dst_;
  247. if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
  248. s = IOError(filename_, errno);
  249. }
  250. }
  251. return s;
  252. }
  253. };
  254. static int LockOrUnlock(int fd, bool lock) {
  255. errno = 0;
  256. struct flock f;
  257. memset(&f, 0, sizeof(f));
  258. f.l_type = (lock ? F_WRLCK : F_UNLCK);
  259. f.l_whence = SEEK_SET;
  260. f.l_start = 0;
  261. f.l_len = 0; // Lock/unlock entire file
  262. return fcntl(fd, F_SETLK, &f);
  263. }
  264. class PosixFileLock : public FileLock {
  265. public:
  266. int fd_;
  267. };
  268. class PosixEnv : public Env {
  269. public:
  270. PosixEnv();
  271. virtual ~PosixEnv() {
  272. fprintf(stderr, "Destroying Env::Default()\n");
  273. exit(1);
  274. }
  275. virtual Status NewSequentialFile(const std::string& fname,
  276. SequentialFile** result) {
  277. FILE* f = fopen(fname.c_str(), "r");
  278. if (f == NULL) {
  279. *result = NULL;
  280. return IOError(fname, errno);
  281. } else {
  282. *result = new PosixSequentialFile(fname, f);
  283. return Status::OK();
  284. }
  285. }
  286. virtual Status NewRandomAccessFile(const std::string& fname,
  287. RandomAccessFile** result) {
  288. *result = NULL;
  289. Status s;
  290. int fd = open(fname.c_str(), O_RDONLY);
  291. if (fd < 0) {
  292. s = IOError(fname, errno);
  293. } else if (sizeof(void*) >= 8) {
  294. // Use mmap when virtual address-space is plentiful.
  295. uint64_t size;
  296. s = GetFileSize(fname, &size);
  297. if (s.ok()) {
  298. void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
  299. if (base != MAP_FAILED) {
  300. *result = new PosixMmapReadableFile(fname, base, size);
  301. } else {
  302. s = IOError(fname, errno);
  303. }
  304. }
  305. close(fd);
  306. } else {
  307. *result = new PosixRandomAccessFile(fname, fd);
  308. }
  309. return s;
  310. }
  311. virtual Status NewWritableFile(const std::string& fname,
  312. WritableFile** result) {
  313. Status s;
  314. const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  315. if (fd < 0) {
  316. *result = NULL;
  317. s = IOError(fname, errno);
  318. } else {
  319. *result = new PosixMmapFile(fname, fd, page_size_);
  320. }
  321. return s;
  322. }
  323. virtual bool FileExists(const std::string& fname) {
  324. return access(fname.c_str(), F_OK) == 0;
  325. }
  326. virtual Status GetChildren(const std::string& dir,
  327. std::vector<std::string>* result) {
  328. result->clear();
  329. DIR* d = opendir(dir.c_str());
  330. if (d == NULL) {
  331. return IOError(dir, errno);
  332. }
  333. struct dirent* entry;
  334. while ((entry = readdir(d)) != NULL) {
  335. result->push_back(entry->d_name);
  336. }
  337. closedir(d);
  338. return Status::OK();
  339. }
  340. virtual Status DeleteFile(const std::string& fname) {
  341. Status result;
  342. if (unlink(fname.c_str()) != 0) {
  343. result = IOError(fname, errno);
  344. }
  345. return result;
  346. };
  347. virtual Status CreateDir(const std::string& name) {
  348. Status result;
  349. if (mkdir(name.c_str(), 0755) != 0) {
  350. result = IOError(name, errno);
  351. }
  352. return result;
  353. };
  354. virtual Status DeleteDir(const std::string& name) {
  355. Status result;
  356. if (rmdir(name.c_str()) != 0) {
  357. result = IOError(name, errno);
  358. }
  359. return result;
  360. };
  361. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  362. Status s;
  363. struct stat sbuf;
  364. if (stat(fname.c_str(), &sbuf) != 0) {
  365. *size = 0;
  366. s = IOError(fname, errno);
  367. } else {
  368. *size = sbuf.st_size;
  369. }
  370. return s;
  371. }
  372. virtual Status RenameFile(const std::string& src, const std::string& target) {
  373. Status result;
  374. if (rename(src.c_str(), target.c_str()) != 0) {
  375. result = IOError(src, errno);
  376. }
  377. return result;
  378. }
  379. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  380. *lock = NULL;
  381. Status result;
  382. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  383. if (fd < 0) {
  384. result = IOError(fname, errno);
  385. } else if (LockOrUnlock(fd, true) == -1) {
  386. result = IOError("lock " + fname, errno);
  387. close(fd);
  388. } else {
  389. PosixFileLock* my_lock = new PosixFileLock;
  390. my_lock->fd_ = fd;
  391. *lock = my_lock;
  392. }
  393. return result;
  394. }
  395. virtual Status UnlockFile(FileLock* lock) {
  396. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  397. Status result;
  398. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  399. result = IOError("unlock", errno);
  400. }
  401. close(my_lock->fd_);
  402. delete my_lock;
  403. return result;
  404. }
  405. virtual void Schedule(void (*function)(void*), void* arg);
  406. virtual void StartThread(void (*function)(void* arg), void* arg);
  407. virtual Status GetTestDirectory(std::string* result) {
  408. const char* env = getenv("TEST_TMPDIR");
  409. if (env && env[0] != '\0') {
  410. *result = env;
  411. } else {
  412. char buf[100];
  413. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  414. *result = buf;
  415. }
  416. // Directory may already exist
  417. CreateDir(*result);
  418. return Status::OK();
  419. }
  420. static uint64_t gettid() {
  421. pthread_t tid = pthread_self();
  422. uint64_t thread_id = 0;
  423. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  424. return thread_id;
  425. }
  426. virtual Status NewLogger(const std::string& fname, Logger** result) {
  427. FILE* f = fopen(fname.c_str(), "w");
  428. if (f == NULL) {
  429. *result = NULL;
  430. return IOError(fname, errno);
  431. } else {
  432. *result = new PosixLogger(f, &PosixEnv::gettid);
  433. return Status::OK();
  434. }
  435. }
  436. virtual uint64_t NowMicros() {
  437. struct timeval tv;
  438. gettimeofday(&tv, NULL);
  439. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  440. }
  441. virtual void SleepForMicroseconds(int micros) {
  442. usleep(micros);
  443. }
  444. private:
  445. void PthreadCall(const char* label, int result) {
  446. if (result != 0) {
  447. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  448. exit(1);
  449. }
  450. }
  451. // BGThread() is the body of the background thread
  452. void BGThread();
  453. static void* BGThreadWrapper(void* arg) {
  454. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  455. return NULL;
  456. }
  457. size_t page_size_;
  458. pthread_mutex_t mu_;
  459. pthread_cond_t bgsignal_;
  460. pthread_t bgthread_;
  461. bool started_bgthread_;
  462. // Entry per Schedule() call
  463. struct BGItem { void* arg; void (*function)(void*); };
  464. typedef std::deque<BGItem> BGQueue;
  465. BGQueue queue_;
  466. };
  467. PosixEnv::PosixEnv() : page_size_(getpagesize()),
  468. started_bgthread_(false) {
  469. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  470. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  471. }
  472. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  473. PthreadCall("lock", pthread_mutex_lock(&mu_));
  474. // Start background thread if necessary
  475. if (!started_bgthread_) {
  476. started_bgthread_ = true;
  477. PthreadCall(
  478. "create thread",
  479. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  480. }
  481. // If the queue is currently empty, the background thread may currently be
  482. // waiting.
  483. if (queue_.empty()) {
  484. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  485. }
  486. // Add to priority queue
  487. queue_.push_back(BGItem());
  488. queue_.back().function = function;
  489. queue_.back().arg = arg;
  490. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  491. }
  492. void PosixEnv::BGThread() {
  493. while (true) {
  494. // Wait until there is an item that is ready to run
  495. PthreadCall("lock", pthread_mutex_lock(&mu_));
  496. while (queue_.empty()) {
  497. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  498. }
  499. void (*function)(void*) = queue_.front().function;
  500. void* arg = queue_.front().arg;
  501. queue_.pop_front();
  502. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  503. (*function)(arg);
  504. }
  505. }
  506. namespace {
  507. struct StartThreadState {
  508. void (*user_function)(void*);
  509. void* arg;
  510. };
  511. }
  512. static void* StartThreadWrapper(void* arg) {
  513. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  514. state->user_function(state->arg);
  515. delete state;
  516. return NULL;
  517. }
  518. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  519. pthread_t t;
  520. StartThreadState* state = new StartThreadState;
  521. state->user_function = function;
  522. state->arg = arg;
  523. PthreadCall("start thread",
  524. pthread_create(&t, NULL, &StartThreadWrapper, state));
  525. }
  526. } // namespace
  527. static pthread_once_t once = PTHREAD_ONCE_INIT;
  528. static Env* default_env;
  529. static void InitDefaultEnv() { default_env = new PosixEnv; }
  530. Env* Env::Default() {
  531. pthread_once(&once, InitDefaultEnv);
  532. return default_env;
  533. }
  534. } // namespace leveldb