小组成员:10215300402-朱维清 & 10222140408 谷杰
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

617 lines
16 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <dirent.h>
  6. #include <errno.h>
  7. #include <fcntl.h>
  8. #include <pthread.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/time.h>
  15. #include <sys/types.h>
  16. #include <time.h>
  17. #include <unistd.h>
  18. #if defined(LEVELDB_PLATFORM_ANDROID)
  19. #include <sys/stat.h>
  20. #endif
  21. #include "leveldb/env.h"
  22. #include "leveldb/slice.h"
  23. #include "port/port.h"
  24. #include "util/logging.h"
  25. namespace leveldb {
  26. namespace {
  27. static Status IOError(const std::string& context, int err_number) {
  28. return Status::IOError(context, strerror(err_number));
  29. }
  30. class PosixSequentialFile: public SequentialFile {
  31. private:
  32. std::string filename_;
  33. FILE* file_;
  34. public:
  35. PosixSequentialFile(const std::string& fname, FILE* f)
  36. : filename_(fname), file_(f) { }
  37. virtual ~PosixSequentialFile() { fclose(file_); }
  38. virtual Status Read(size_t n, Slice* result, char* scratch) {
  39. Status s;
  40. size_t r = fread_unlocked(scratch, 1, n, file_);
  41. *result = Slice(scratch, r);
  42. if (r < n) {
  43. if (feof(file_)) {
  44. // We leave status as ok if we hit the end of the file
  45. } else {
  46. // A partial read with an error: return a non-ok status
  47. s = IOError(filename_, errno);
  48. }
  49. }
  50. return s;
  51. }
  52. virtual Status Skip(uint64_t n) {
  53. if (fseek(file_, n, SEEK_CUR)) {
  54. return IOError(filename_, errno);
  55. }
  56. return Status::OK();
  57. }
  58. };
  59. class PosixRandomAccessFile: public RandomAccessFile {
  60. private:
  61. std::string filename_;
  62. int fd_;
  63. public:
  64. PosixRandomAccessFile(const std::string& fname, int fd)
  65. : filename_(fname), fd_(fd) { }
  66. virtual ~PosixRandomAccessFile() { close(fd_); }
  67. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  68. char* scratch) const {
  69. Status s;
  70. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  71. *result = Slice(scratch, (r < 0) ? 0 : r);
  72. if (r < 0) {
  73. // An error: return a non-ok status
  74. s = IOError(filename_, errno);
  75. }
  76. return s;
  77. }
  78. };
  79. // We preallocate up to an extra megabyte and use memcpy to append new
  80. // data to the file. This is safe since we either properly close the
  81. // file before reading from it, or for log files, the reading code
  82. // knows enough to skip zero suffixes.
  83. class PosixMmapFile : public WritableFile {
  84. private:
  85. std::string filename_;
  86. int fd_;
  87. size_t page_size_;
  88. size_t map_size_; // How much extra memory to map at a time
  89. char* base_; // The mapped region
  90. char* limit_; // Limit of the mapped region
  91. char* dst_; // Where to write next (in range [base_,limit_])
  92. char* last_sync_; // Where have we synced up to
  93. uint64_t file_offset_; // Offset of base_ in file
  94. // Have we done an munmap of unsynced data?
  95. bool pending_sync_;
  96. // Roundup x to a multiple of y
  97. static size_t Roundup(size_t x, size_t y) {
  98. return ((x + y - 1) / y) * y;
  99. }
  100. size_t TruncateToPageBoundary(size_t s) {
  101. s -= (s & (page_size_ - 1));
  102. assert((s % page_size_) == 0);
  103. return s;
  104. }
  105. bool UnmapCurrentRegion() {
  106. bool result = true;
  107. if (base_ != NULL) {
  108. if (last_sync_ < limit_) {
  109. // Defer syncing this data until next Sync() call, if any
  110. pending_sync_ = true;
  111. }
  112. if (munmap(base_, limit_ - base_) != 0) {
  113. result = false;
  114. }
  115. file_offset_ += limit_ - base_;
  116. base_ = NULL;
  117. limit_ = NULL;
  118. last_sync_ = NULL;
  119. dst_ = NULL;
  120. // Increase the amount we map the next time, but capped at 1MB
  121. if (map_size_ < (1<<20)) {
  122. map_size_ *= 2;
  123. }
  124. }
  125. return result;
  126. }
  127. bool MapNewRegion() {
  128. assert(base_ == NULL);
  129. if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
  130. return false;
  131. }
  132. void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
  133. fd_, file_offset_);
  134. if (ptr == MAP_FAILED) {
  135. return false;
  136. }
  137. base_ = reinterpret_cast<char*>(ptr);
  138. limit_ = base_ + map_size_;
  139. dst_ = base_;
  140. last_sync_ = base_;
  141. return true;
  142. }
  143. public:
  144. PosixMmapFile(const std::string& fname, int fd, size_t page_size)
  145. : filename_(fname),
  146. fd_(fd),
  147. page_size_(page_size),
  148. map_size_(Roundup(65536, page_size)),
  149. base_(NULL),
  150. limit_(NULL),
  151. dst_(NULL),
  152. last_sync_(NULL),
  153. file_offset_(0),
  154. pending_sync_(false) {
  155. assert((page_size & (page_size - 1)) == 0);
  156. }
  157. ~PosixMmapFile() {
  158. if (fd_ >= 0) {
  159. PosixMmapFile::Close();
  160. }
  161. }
  162. virtual Status Append(const Slice& data) {
  163. const char* src = data.data();
  164. size_t left = data.size();
  165. while (left > 0) {
  166. assert(base_ <= dst_);
  167. assert(dst_ <= limit_);
  168. size_t avail = limit_ - dst_;
  169. if (avail == 0) {
  170. if (!UnmapCurrentRegion() ||
  171. !MapNewRegion()) {
  172. return IOError(filename_, errno);
  173. }
  174. }
  175. size_t n = (left <= avail) ? left : avail;
  176. memcpy(dst_, src, n);
  177. dst_ += n;
  178. src += n;
  179. left -= n;
  180. }
  181. return Status::OK();
  182. }
  183. virtual Status Close() {
  184. Status s;
  185. size_t unused = limit_ - dst_;
  186. if (!UnmapCurrentRegion()) {
  187. s = IOError(filename_, errno);
  188. } else if (unused > 0) {
  189. // Trim the extra space at the end of the file
  190. if (ftruncate(fd_, file_offset_ - unused) < 0) {
  191. s = IOError(filename_, errno);
  192. }
  193. }
  194. if (close(fd_) < 0) {
  195. if (s.ok()) {
  196. s = IOError(filename_, errno);
  197. }
  198. }
  199. fd_ = -1;
  200. base_ = NULL;
  201. limit_ = NULL;
  202. return s;
  203. }
  204. virtual Status Flush() {
  205. return Status::OK();
  206. }
  207. virtual Status Sync() {
  208. Status s;
  209. if (pending_sync_) {
  210. // Some unmapped data was not synced
  211. pending_sync_ = false;
  212. if (fdatasync(fd_) < 0) {
  213. s = IOError(filename_, errno);
  214. }
  215. }
  216. if (dst_ > last_sync_) {
  217. // Find the beginnings of the pages that contain the first and last
  218. // bytes to be synced.
  219. size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  220. size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  221. last_sync_ = dst_;
  222. if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
  223. s = IOError(filename_, errno);
  224. }
  225. }
  226. return s;
  227. }
  228. };
  // Applies (lock == true) or releases (lock == false) a write lock covering
  // the whole file referred to by fd, using a non-blocking fcntl(F_SETLK)
  // request. Clears errno first and returns fcntl()'s result.
  static int LockOrUnlock(int fd, bool lock) {
    errno = 0;
    struct flock request;
    memset(&request, 0, sizeof(request));
    if (lock) {
      request.l_type = F_WRLCK;
    } else {
      request.l_type = F_UNLCK;
    }
    request.l_whence = SEEK_SET;
    request.l_start = 0;
    request.l_len = 0;  // Zero length means "through end of file"
    return fcntl(fd, F_SETLK, &request);
  }
  239. class PosixFileLock : public FileLock {
  240. public:
  241. int fd_;
  242. };
  243. class PosixEnv : public Env {
  244. public:
  245. PosixEnv();
  246. virtual ~PosixEnv() {
  247. fprintf(stderr, "Destroying Env::Default()\n");
  248. exit(1);
  249. }
  250. virtual Status NewSequentialFile(const std::string& fname,
  251. SequentialFile** result) {
  252. FILE* f = fopen(fname.c_str(), "r");
  253. if (f == NULL) {
  254. *result = NULL;
  255. return IOError(fname, errno);
  256. } else {
  257. *result = new PosixSequentialFile(fname, f);
  258. return Status::OK();
  259. }
  260. }
  261. virtual Status NewRandomAccessFile(const std::string& fname,
  262. RandomAccessFile** result) {
  263. int fd = open(fname.c_str(), O_RDONLY);
  264. if (fd < 0) {
  265. *result = NULL;
  266. return IOError(fname, errno);
  267. }
  268. *result = new PosixRandomAccessFile(fname, fd);
  269. return Status::OK();
  270. }
  271. virtual Status NewWritableFile(const std::string& fname,
  272. WritableFile** result) {
  273. Status s;
  274. const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  275. if (fd < 0) {
  276. *result = NULL;
  277. s = IOError(fname, errno);
  278. } else {
  279. *result = new PosixMmapFile(fname, fd, page_size_);
  280. }
  281. return s;
  282. }
  283. virtual bool FileExists(const std::string& fname) {
  284. return access(fname.c_str(), F_OK) == 0;
  285. }
  286. virtual Status GetChildren(const std::string& dir,
  287. std::vector<std::string>* result) {
  288. result->clear();
  289. DIR* d = opendir(dir.c_str());
  290. if (d == NULL) {
  291. return IOError(dir, errno);
  292. }
  293. struct dirent* entry;
  294. while ((entry = readdir(d)) != NULL) {
  295. result->push_back(entry->d_name);
  296. }
  297. closedir(d);
  298. return Status::OK();
  299. }
  300. virtual Status DeleteFile(const std::string& fname) {
  301. Status result;
  302. if (unlink(fname.c_str()) != 0) {
  303. result = IOError(fname, errno);
  304. }
  305. return result;
  306. };
  307. virtual Status CreateDir(const std::string& name) {
  308. Status result;
  309. if (mkdir(name.c_str(), 0755) != 0) {
  310. result = IOError(name, errno);
  311. }
  312. return result;
  313. };
  314. virtual Status DeleteDir(const std::string& name) {
  315. Status result;
  316. if (rmdir(name.c_str()) != 0) {
  317. result = IOError(name, errno);
  318. }
  319. return result;
  320. };
  321. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  322. Status s;
  323. struct stat sbuf;
  324. if (stat(fname.c_str(), &sbuf) != 0) {
  325. *size = 0;
  326. s = IOError(fname, errno);
  327. } else {
  328. *size = sbuf.st_size;
  329. }
  330. return s;
  331. }
  332. virtual Status RenameFile(const std::string& src, const std::string& target) {
  333. Status result;
  334. if (rename(src.c_str(), target.c_str()) != 0) {
  335. result = IOError(src, errno);
  336. }
  337. return result;
  338. }
  339. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  340. *lock = NULL;
  341. Status result;
  342. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  343. if (fd < 0) {
  344. result = IOError(fname, errno);
  345. } else if (LockOrUnlock(fd, true) == -1) {
  346. result = IOError("lock " + fname, errno);
  347. close(fd);
  348. } else {
  349. PosixFileLock* my_lock = new PosixFileLock;
  350. my_lock->fd_ = fd;
  351. *lock = my_lock;
  352. }
  353. return result;
  354. }
  355. virtual Status UnlockFile(FileLock* lock) {
  356. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  357. Status result;
  358. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  359. result = IOError("unlock", errno);
  360. }
  361. close(my_lock->fd_);
  362. delete my_lock;
  363. return result;
  364. }
  365. virtual void Schedule(void (*function)(void*), void* arg);
  366. virtual void StartThread(void (*function)(void* arg), void* arg);
  367. virtual Status GetTestDirectory(std::string* result) {
  368. const char* env = getenv("TEST_TMPDIR");
  369. if (env && env[0] != '\0') {
  370. *result = env;
  371. } else {
  372. char buf[100];
  373. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  374. *result = buf;
  375. }
  376. // Directory may already exist
  377. CreateDir(*result);
  378. return Status::OK();
  379. }
  380. virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
  381. pthread_t tid = pthread_self();
  382. uint64_t thread_id = 0;
  383. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  384. // We try twice: the first time with a fixed-size stack allocated buffer,
  385. // and the second time with a much larger dynamically allocated buffer.
  386. char buffer[500];
  387. for (int iter = 0; iter < 2; iter++) {
  388. char* base;
  389. int bufsize;
  390. if (iter == 0) {
  391. bufsize = sizeof(buffer);
  392. base = buffer;
  393. } else {
  394. bufsize = 30000;
  395. base = new char[bufsize];
  396. }
  397. char* p = base;
  398. char* limit = base + bufsize;
  399. struct timeval now_tv;
  400. gettimeofday(&now_tv, NULL);
  401. const time_t seconds = now_tv.tv_sec;
  402. struct tm t;
  403. localtime_r(&seconds, &t);
  404. p += snprintf(p, limit - p,
  405. "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
  406. t.tm_year + 1900,
  407. t.tm_mon + 1,
  408. t.tm_mday,
  409. t.tm_hour,
  410. t.tm_min,
  411. t.tm_sec,
  412. static_cast<int>(now_tv.tv_usec),
  413. static_cast<long long unsigned int>(thread_id));
  414. // Print the message
  415. if (p < limit) {
  416. va_list backup_ap;
  417. va_copy(backup_ap, ap);
  418. p += vsnprintf(p, limit - p, format, backup_ap);
  419. va_end(backup_ap);
  420. }
  421. // Truncate to available space if necessary
  422. if (p >= limit) {
  423. if (iter == 0) {
  424. continue; // Try again with larger buffer
  425. } else {
  426. p = limit - 1;
  427. }
  428. }
  429. // Add newline if necessary
  430. if (p == base || p[-1] != '\n') {
  431. *p++ = '\n';
  432. }
  433. assert(p <= limit);
  434. info_log->Append(Slice(base, p - base));
  435. info_log->Flush();
  436. if (base != buffer) {
  437. delete[] base;
  438. }
  439. break;
  440. }
  441. }
  442. virtual uint64_t NowMicros() {
  443. struct timeval tv;
  444. gettimeofday(&tv, NULL);
  445. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  446. }
  447. virtual void SleepForMicroseconds(int micros) {
  448. usleep(micros);
  449. }
  450. private:
  451. void PthreadCall(const char* label, int result) {
  452. if (result != 0) {
  453. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  454. exit(1);
  455. }
  456. }
  457. // BGThread() is the body of the background thread
  458. void BGThread();
  459. static void* BGThreadWrapper(void* arg) {
  460. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  461. return NULL;
  462. }
  463. size_t page_size_;
  464. pthread_mutex_t mu_;
  465. pthread_cond_t bgsignal_;
  466. pthread_t bgthread_;
  467. bool started_bgthread_;
  468. // Entry per Schedule() call
  469. struct BGItem { void* arg; void (*function)(void*); };
  470. typedef std::deque<BGItem> BGQueue;
  471. BGQueue queue_;
  472. };
  473. PosixEnv::PosixEnv() : page_size_(getpagesize()),
  474. started_bgthread_(false) {
  475. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  476. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  477. }
  478. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  479. PthreadCall("lock", pthread_mutex_lock(&mu_));
  480. // Start background thread if necessary
  481. if (!started_bgthread_) {
  482. started_bgthread_ = true;
  483. PthreadCall(
  484. "create thread",
  485. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  486. }
  487. // If the queue is currently empty, the background thread may currently be
  488. // waiting.
  489. if (queue_.empty()) {
  490. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  491. }
  492. // Add to priority queue
  493. queue_.push_back(BGItem());
  494. queue_.back().function = function;
  495. queue_.back().arg = arg;
  496. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  497. }
  498. void PosixEnv::BGThread() {
  499. while (true) {
  500. // Wait until there is an item that is ready to run
  501. PthreadCall("lock", pthread_mutex_lock(&mu_));
  502. while (queue_.empty()) {
  503. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  504. }
  505. void (*function)(void*) = queue_.front().function;
  506. void* arg = queue_.front().arg;
  507. queue_.pop_front();
  508. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  509. (*function)(arg);
  510. }
  511. }
  namespace {

  // Bundle handed to a new thread through pthread_create()'s void* argument:
  // the function to run and the argument to pass to it.
  struct StartThreadState {
    void (*user_function)(void*);  // Function the new thread runs
    void* arg;                     // Argument passed to user_function
  };

  }  // namespace
  518. static void* StartThreadWrapper(void* arg) {
  519. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  520. state->user_function(state->arg);
  521. delete state;
  522. return NULL;
  523. }
  524. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  525. pthread_t t;
  526. StartThreadState* state = new StartThreadState;
  527. state->user_function = function;
  528. state->arg = arg;
  529. PthreadCall("start thread",
  530. pthread_create(&t, NULL, &StartThreadWrapper, state));
  531. }
  532. }
  533. static pthread_once_t once = PTHREAD_ONCE_INIT;
  534. static Env* default_env;
  535. static void InitDefaultEnv() { default_env = new PosixEnv; }
  536. Env* Env::Default() {
  537. pthread_once(&once, InitDefaultEnv);
  538. return default_env;
  539. }
  540. }