10225501448 李度 10225101546 陈胤遒 10215501422 高宇菲
Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.

567 righe
14 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <dirent.h>
  6. #include <errno.h>
  7. #include <fcntl.h>
  8. #include <pthread.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/time.h>
  15. #include <sys/types.h>
  16. #include <time.h>
  17. #include <unistd.h>
  18. #if defined(LEVELDB_PLATFORM_ANDROID)
  19. #include <sys/stat.h>
  20. #endif
  21. #include "leveldb/env.h"
  22. #include "leveldb/slice.h"
  23. #include "port/port.h"
  24. #include "util/logging.h"
  25. #include "util/posix_logger.h"
  26. namespace leveldb {
  27. namespace {
  28. static Status IOError(const std::string& context, int err_number) {
  29. return Status::IOError(context, strerror(err_number));
  30. }
  31. class PosixSequentialFile: public SequentialFile {
  32. private:
  33. std::string filename_;
  34. FILE* file_;
  35. public:
  36. PosixSequentialFile(const std::string& fname, FILE* f)
  37. : filename_(fname), file_(f) { }
  38. virtual ~PosixSequentialFile() { fclose(file_); }
  39. virtual Status Read(size_t n, Slice* result, char* scratch) {
  40. Status s;
  41. size_t r = fread_unlocked(scratch, 1, n, file_);
  42. *result = Slice(scratch, r);
  43. if (r < n) {
  44. if (feof(file_)) {
  45. // We leave status as ok if we hit the end of the file
  46. } else {
  47. // A partial read with an error: return a non-ok status
  48. s = IOError(filename_, errno);
  49. }
  50. }
  51. return s;
  52. }
  53. virtual Status Skip(uint64_t n) {
  54. if (fseek(file_, n, SEEK_CUR)) {
  55. return IOError(filename_, errno);
  56. }
  57. return Status::OK();
  58. }
  59. };
  60. class PosixRandomAccessFile: public RandomAccessFile {
  61. private:
  62. std::string filename_;
  63. int fd_;
  64. public:
  65. PosixRandomAccessFile(const std::string& fname, int fd)
  66. : filename_(fname), fd_(fd) { }
  67. virtual ~PosixRandomAccessFile() { close(fd_); }
  68. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  69. char* scratch) const {
  70. Status s;
  71. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  72. *result = Slice(scratch, (r < 0) ? 0 : r);
  73. if (r < 0) {
  74. // An error: return a non-ok status
  75. s = IOError(filename_, errno);
  76. }
  77. return s;
  78. }
  79. };
  80. // We preallocate up to an extra megabyte and use memcpy to append new
  81. // data to the file. This is safe since we either properly close the
  82. // file before reading from it, or for log files, the reading code
  83. // knows enough to skip zero suffixes.
  84. class PosixMmapFile : public WritableFile {
  85. private:
  86. std::string filename_;
  87. int fd_;
  88. size_t page_size_;
  89. size_t map_size_; // How much extra memory to map at a time
  90. char* base_; // The mapped region
  91. char* limit_; // Limit of the mapped region
  92. char* dst_; // Where to write next (in range [base_,limit_])
  93. char* last_sync_; // Where have we synced up to
  94. uint64_t file_offset_; // Offset of base_ in file
  95. // Have we done an munmap of unsynced data?
  96. bool pending_sync_;
  97. // Roundup x to a multiple of y
  98. static size_t Roundup(size_t x, size_t y) {
  99. return ((x + y - 1) / y) * y;
  100. }
  101. size_t TruncateToPageBoundary(size_t s) {
  102. s -= (s & (page_size_ - 1));
  103. assert((s % page_size_) == 0);
  104. return s;
  105. }
  106. bool UnmapCurrentRegion() {
  107. bool result = true;
  108. if (base_ != NULL) {
  109. if (last_sync_ < limit_) {
  110. // Defer syncing this data until next Sync() call, if any
  111. pending_sync_ = true;
  112. }
  113. if (munmap(base_, limit_ - base_) != 0) {
  114. result = false;
  115. }
  116. file_offset_ += limit_ - base_;
  117. base_ = NULL;
  118. limit_ = NULL;
  119. last_sync_ = NULL;
  120. dst_ = NULL;
  121. // Increase the amount we map the next time, but capped at 1MB
  122. if (map_size_ < (1<<20)) {
  123. map_size_ *= 2;
  124. }
  125. }
  126. return result;
  127. }
  128. bool MapNewRegion() {
  129. assert(base_ == NULL);
  130. if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
  131. return false;
  132. }
  133. void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
  134. fd_, file_offset_);
  135. if (ptr == MAP_FAILED) {
  136. return false;
  137. }
  138. base_ = reinterpret_cast<char*>(ptr);
  139. limit_ = base_ + map_size_;
  140. dst_ = base_;
  141. last_sync_ = base_;
  142. return true;
  143. }
  144. public:
  145. PosixMmapFile(const std::string& fname, int fd, size_t page_size)
  146. : filename_(fname),
  147. fd_(fd),
  148. page_size_(page_size),
  149. map_size_(Roundup(65536, page_size)),
  150. base_(NULL),
  151. limit_(NULL),
  152. dst_(NULL),
  153. last_sync_(NULL),
  154. file_offset_(0),
  155. pending_sync_(false) {
  156. assert((page_size & (page_size - 1)) == 0);
  157. }
  158. ~PosixMmapFile() {
  159. if (fd_ >= 0) {
  160. PosixMmapFile::Close();
  161. }
  162. }
  163. virtual Status Append(const Slice& data) {
  164. const char* src = data.data();
  165. size_t left = data.size();
  166. while (left > 0) {
  167. assert(base_ <= dst_);
  168. assert(dst_ <= limit_);
  169. size_t avail = limit_ - dst_;
  170. if (avail == 0) {
  171. if (!UnmapCurrentRegion() ||
  172. !MapNewRegion()) {
  173. return IOError(filename_, errno);
  174. }
  175. }
  176. size_t n = (left <= avail) ? left : avail;
  177. memcpy(dst_, src, n);
  178. dst_ += n;
  179. src += n;
  180. left -= n;
  181. }
  182. return Status::OK();
  183. }
  184. virtual Status Close() {
  185. Status s;
  186. size_t unused = limit_ - dst_;
  187. if (!UnmapCurrentRegion()) {
  188. s = IOError(filename_, errno);
  189. } else if (unused > 0) {
  190. // Trim the extra space at the end of the file
  191. if (ftruncate(fd_, file_offset_ - unused) < 0) {
  192. s = IOError(filename_, errno);
  193. }
  194. }
  195. if (close(fd_) < 0) {
  196. if (s.ok()) {
  197. s = IOError(filename_, errno);
  198. }
  199. }
  200. fd_ = -1;
  201. base_ = NULL;
  202. limit_ = NULL;
  203. return s;
  204. }
  205. virtual Status Flush() {
  206. return Status::OK();
  207. }
  208. virtual Status Sync() {
  209. Status s;
  210. if (pending_sync_) {
  211. // Some unmapped data was not synced
  212. pending_sync_ = false;
  213. if (fdatasync(fd_) < 0) {
  214. s = IOError(filename_, errno);
  215. }
  216. }
  217. if (dst_ > last_sync_) {
  218. // Find the beginnings of the pages that contain the first and last
  219. // bytes to be synced.
  220. size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  221. size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  222. last_sync_ = dst_;
  223. if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
  224. s = IOError(filename_, errno);
  225. }
  226. }
  227. return s;
  228. }
  229. };
  230. static int LockOrUnlock(int fd, bool lock) {
  231. errno = 0;
  232. struct flock f;
  233. memset(&f, 0, sizeof(f));
  234. f.l_type = (lock ? F_WRLCK : F_UNLCK);
  235. f.l_whence = SEEK_SET;
  236. f.l_start = 0;
  237. f.l_len = 0; // Lock/unlock entire file
  238. return fcntl(fd, F_SETLK, &f);
  239. }
  240. class PosixFileLock : public FileLock {
  241. public:
  242. int fd_;
  243. };
  244. class PosixEnv : public Env {
  245. public:
  246. PosixEnv();
  247. virtual ~PosixEnv() {
  248. fprintf(stderr, "Destroying Env::Default()\n");
  249. exit(1);
  250. }
  251. virtual Status NewSequentialFile(const std::string& fname,
  252. SequentialFile** result) {
  253. FILE* f = fopen(fname.c_str(), "r");
  254. if (f == NULL) {
  255. *result = NULL;
  256. return IOError(fname, errno);
  257. } else {
  258. *result = new PosixSequentialFile(fname, f);
  259. return Status::OK();
  260. }
  261. }
  262. virtual Status NewRandomAccessFile(const std::string& fname,
  263. RandomAccessFile** result) {
  264. int fd = open(fname.c_str(), O_RDONLY);
  265. if (fd < 0) {
  266. *result = NULL;
  267. return IOError(fname, errno);
  268. }
  269. *result = new PosixRandomAccessFile(fname, fd);
  270. return Status::OK();
  271. }
  272. virtual Status NewWritableFile(const std::string& fname,
  273. WritableFile** result) {
  274. Status s;
  275. const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  276. if (fd < 0) {
  277. *result = NULL;
  278. s = IOError(fname, errno);
  279. } else {
  280. *result = new PosixMmapFile(fname, fd, page_size_);
  281. }
  282. return s;
  283. }
  284. virtual bool FileExists(const std::string& fname) {
  285. return access(fname.c_str(), F_OK) == 0;
  286. }
  287. virtual Status GetChildren(const std::string& dir,
  288. std::vector<std::string>* result) {
  289. result->clear();
  290. DIR* d = opendir(dir.c_str());
  291. if (d == NULL) {
  292. return IOError(dir, errno);
  293. }
  294. struct dirent* entry;
  295. while ((entry = readdir(d)) != NULL) {
  296. result->push_back(entry->d_name);
  297. }
  298. closedir(d);
  299. return Status::OK();
  300. }
  301. virtual Status DeleteFile(const std::string& fname) {
  302. Status result;
  303. if (unlink(fname.c_str()) != 0) {
  304. result = IOError(fname, errno);
  305. }
  306. return result;
  307. };
  308. virtual Status CreateDir(const std::string& name) {
  309. Status result;
  310. if (mkdir(name.c_str(), 0755) != 0) {
  311. result = IOError(name, errno);
  312. }
  313. return result;
  314. };
  315. virtual Status DeleteDir(const std::string& name) {
  316. Status result;
  317. if (rmdir(name.c_str()) != 0) {
  318. result = IOError(name, errno);
  319. }
  320. return result;
  321. };
  322. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  323. Status s;
  324. struct stat sbuf;
  325. if (stat(fname.c_str(), &sbuf) != 0) {
  326. *size = 0;
  327. s = IOError(fname, errno);
  328. } else {
  329. *size = sbuf.st_size;
  330. }
  331. return s;
  332. }
  333. virtual Status RenameFile(const std::string& src, const std::string& target) {
  334. Status result;
  335. if (rename(src.c_str(), target.c_str()) != 0) {
  336. result = IOError(src, errno);
  337. }
  338. return result;
  339. }
  340. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  341. *lock = NULL;
  342. Status result;
  343. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  344. if (fd < 0) {
  345. result = IOError(fname, errno);
  346. } else if (LockOrUnlock(fd, true) == -1) {
  347. result = IOError("lock " + fname, errno);
  348. close(fd);
  349. } else {
  350. PosixFileLock* my_lock = new PosixFileLock;
  351. my_lock->fd_ = fd;
  352. *lock = my_lock;
  353. }
  354. return result;
  355. }
  356. virtual Status UnlockFile(FileLock* lock) {
  357. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  358. Status result;
  359. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  360. result = IOError("unlock", errno);
  361. }
  362. close(my_lock->fd_);
  363. delete my_lock;
  364. return result;
  365. }
  366. virtual void Schedule(void (*function)(void*), void* arg);
  367. virtual void StartThread(void (*function)(void* arg), void* arg);
  368. virtual Status GetTestDirectory(std::string* result) {
  369. const char* env = getenv("TEST_TMPDIR");
  370. if (env && env[0] != '\0') {
  371. *result = env;
  372. } else {
  373. char buf[100];
  374. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  375. *result = buf;
  376. }
  377. // Directory may already exist
  378. CreateDir(*result);
  379. return Status::OK();
  380. }
  381. static uint64_t gettid() {
  382. pthread_t tid = pthread_self();
  383. uint64_t thread_id = 0;
  384. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  385. return thread_id;
  386. }
  387. virtual Status NewLogger(const std::string& fname, Logger** result) {
  388. FILE* f = fopen(fname.c_str(), "w");
  389. if (f == NULL) {
  390. *result = NULL;
  391. return IOError(fname, errno);
  392. } else {
  393. *result = new PosixLogger(f, &PosixEnv::gettid);
  394. return Status::OK();
  395. }
  396. }
  397. virtual uint64_t NowMicros() {
  398. struct timeval tv;
  399. gettimeofday(&tv, NULL);
  400. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  401. }
  402. virtual void SleepForMicroseconds(int micros) {
  403. usleep(micros);
  404. }
  405. private:
  406. void PthreadCall(const char* label, int result) {
  407. if (result != 0) {
  408. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  409. exit(1);
  410. }
  411. }
  412. // BGThread() is the body of the background thread
  413. void BGThread();
  414. static void* BGThreadWrapper(void* arg) {
  415. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  416. return NULL;
  417. }
  418. size_t page_size_;
  419. pthread_mutex_t mu_;
  420. pthread_cond_t bgsignal_;
  421. pthread_t bgthread_;
  422. bool started_bgthread_;
  423. // Entry per Schedule() call
  424. struct BGItem { void* arg; void (*function)(void*); };
  425. typedef std::deque<BGItem> BGQueue;
  426. BGQueue queue_;
  427. };
  428. PosixEnv::PosixEnv() : page_size_(getpagesize()),
  429. started_bgthread_(false) {
  430. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  431. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  432. }
  433. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  434. PthreadCall("lock", pthread_mutex_lock(&mu_));
  435. // Start background thread if necessary
  436. if (!started_bgthread_) {
  437. started_bgthread_ = true;
  438. PthreadCall(
  439. "create thread",
  440. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  441. }
  442. // If the queue is currently empty, the background thread may currently be
  443. // waiting.
  444. if (queue_.empty()) {
  445. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  446. }
  447. // Add to priority queue
  448. queue_.push_back(BGItem());
  449. queue_.back().function = function;
  450. queue_.back().arg = arg;
  451. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  452. }
  453. void PosixEnv::BGThread() {
  454. while (true) {
  455. // Wait until there is an item that is ready to run
  456. PthreadCall("lock", pthread_mutex_lock(&mu_));
  457. while (queue_.empty()) {
  458. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  459. }
  460. void (*function)(void*) = queue_.front().function;
  461. void* arg = queue_.front().arg;
  462. queue_.pop_front();
  463. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  464. (*function)(arg);
  465. }
  466. }
  467. namespace {
  468. struct StartThreadState {
  469. void (*user_function)(void*);
  470. void* arg;
  471. };
  472. }
  473. static void* StartThreadWrapper(void* arg) {
  474. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  475. state->user_function(state->arg);
  476. delete state;
  477. return NULL;
  478. }
  479. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  480. pthread_t t;
  481. StartThreadState* state = new StartThreadState;
  482. state->user_function = function;
  483. state->arg = arg;
  484. PthreadCall("start thread",
  485. pthread_create(&t, NULL, &StartThreadWrapper, state));
  486. }
  487. }
  488. static pthread_once_t once = PTHREAD_ONCE_INIT;
  489. static Env* default_env;
  490. static void InitDefaultEnv() { default_env = new PosixEnv; }
  491. Env* Env::Default() {
  492. pthread_once(&once, InitDefaultEnv);
  493. return default_env;
  494. }
  495. }