作者: 谢瑞阳 10225101483 徐翔宇 10225101535
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

599 lines
15 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <dirent.h>
  6. #include <errno.h>
  7. #include <fcntl.h>
  8. #include <pthread.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/time.h>
  15. #include <sys/types.h>
  16. #include <time.h>
  17. #include <unistd.h>
  18. #if defined(LEVELDB_PLATFORM_ANDROID)
  19. #include <sys/stat.h>
  20. #endif
  21. #include "leveldb/env.h"
  22. #include "leveldb/slice.h"
  23. #include "port/port.h"
  24. #include "util/logging.h"
  25. namespace leveldb {
  26. namespace {
  27. class PosixSequentialFile: public SequentialFile {
  28. private:
  29. std::string filename_;
  30. FILE* file_;
  31. public:
  32. PosixSequentialFile(const std::string& fname, FILE* f)
  33. : filename_(fname), file_(f) { }
  34. virtual ~PosixSequentialFile() { fclose(file_); }
  35. virtual Status Read(size_t n, Slice* result, char* scratch) {
  36. Status s;
  37. size_t r = fread_unlocked(scratch, 1, n, file_);
  38. *result = Slice(scratch, r);
  39. if (r < n) {
  40. if (feof(file_)) {
  41. // We leave status as ok if we hit the end of the file
  42. } else {
  43. // A partial read with an error: return a non-ok status
  44. s = Status::IOError(filename_, strerror(errno));
  45. }
  46. }
  47. return s;
  48. }
  49. };
  50. class PosixRandomAccessFile: public RandomAccessFile {
  51. private:
  52. std::string filename_;
  53. int fd_;
  54. public:
  55. PosixRandomAccessFile(const std::string& fname, int fd)
  56. : filename_(fname), fd_(fd) { }
  57. virtual ~PosixRandomAccessFile() { close(fd_); }
  58. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  59. char* scratch) const {
  60. Status s;
  61. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  62. *result = Slice(scratch, (r < 0) ? 0 : r);
  63. if (r < 0) {
  64. // An error: return a non-ok status
  65. s = Status::IOError(filename_, strerror(errno));
  66. }
  67. return s;
  68. }
  69. };
  70. // We preallocate up to an extra megabyte and use memcpy to append new
  71. // data to the file. This is safe since we either properly close the
  72. // file before reading from it, or for log files, the reading code
  73. // knows enough to skip zero suffixes.
  74. class PosixMmapFile : public WritableFile {
  75. private:
  76. std::string filename_;
  77. int fd_;
  78. size_t page_size_;
  79. size_t map_size_; // How much extra memory to map at a time
  80. char* base_; // The mapped region
  81. char* limit_; // Limit of the mapped region
  82. char* dst_; // Where to write next (in range [base_,limit_])
  83. char* last_sync_; // Where have we synced up to
  84. uint64_t file_offset_; // Offset of base_ in file
  85. // Have we done an munmap of unsynced data?
  86. bool pending_sync_;
  87. // Roundup x to a multiple of y
  88. static size_t Roundup(size_t x, size_t y) {
  89. return ((x + y - 1) / y) * y;
  90. }
  91. size_t TruncateToPageBoundary(size_t s) {
  92. s -= (s & (page_size_ - 1));
  93. assert((s % page_size_) == 0);
  94. return s;
  95. }
  96. void UnmapCurrentRegion() {
  97. if (base_ != NULL) {
  98. if (last_sync_ < limit_) {
  99. // Defer syncing this data until next Sync() call, if any
  100. pending_sync_ = true;
  101. }
  102. munmap(base_, limit_ - base_);
  103. file_offset_ += limit_ - base_;
  104. base_ = NULL;
  105. limit_ = NULL;
  106. last_sync_ = NULL;
  107. dst_ = NULL;
  108. // Increase the amount we map the next time, but capped at 1MB
  109. if (map_size_ < (1<<20)) {
  110. map_size_ *= 2;
  111. }
  112. }
  113. }
  114. bool MapNewRegion() {
  115. assert(base_ == NULL);
  116. if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
  117. return false;
  118. }
  119. void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
  120. fd_, file_offset_);
  121. if (ptr == MAP_FAILED) {
  122. return false;
  123. }
  124. base_ = reinterpret_cast<char*>(ptr);
  125. limit_ = base_ + map_size_;
  126. dst_ = base_;
  127. last_sync_ = base_;
  128. return true;
  129. }
  130. public:
  131. PosixMmapFile(const std::string& fname, int fd, size_t page_size)
  132. : filename_(fname),
  133. fd_(fd),
  134. page_size_(page_size),
  135. map_size_(Roundup(65536, page_size)),
  136. base_(NULL),
  137. limit_(NULL),
  138. dst_(NULL),
  139. last_sync_(NULL),
  140. file_offset_(0),
  141. pending_sync_(false) {
  142. assert((page_size & (page_size - 1)) == 0);
  143. }
  144. ~PosixMmapFile() {
  145. if (fd_ >= 0) {
  146. PosixMmapFile::Close();
  147. }
  148. }
  149. virtual Status Append(const Slice& data) {
  150. const char* src = data.data();
  151. size_t left = data.size();
  152. while (left > 0) {
  153. assert(base_ <= dst_);
  154. assert(dst_ <= limit_);
  155. size_t avail = limit_ - dst_;
  156. if (avail == 0) {
  157. UnmapCurrentRegion();
  158. MapNewRegion();
  159. }
  160. size_t n = (left <= avail) ? left : avail;
  161. memcpy(dst_, src, n);
  162. dst_ += n;
  163. src += n;
  164. left -= n;
  165. }
  166. return Status::OK();
  167. }
  168. virtual Status Close() {
  169. Status s;
  170. size_t unused = limit_ - dst_;
  171. UnmapCurrentRegion();
  172. if (unused > 0) {
  173. // Trim the extra space at the end of the file
  174. if (ftruncate(fd_, file_offset_ - unused) < 0) {
  175. s = Status::IOError(filename_, strerror(errno));
  176. }
  177. }
  178. if (close(fd_) < 0) {
  179. if (s.ok()) {
  180. s = Status::IOError(filename_, strerror(errno));
  181. }
  182. }
  183. fd_ = -1;
  184. base_ = NULL;
  185. limit_ = NULL;
  186. return s;
  187. }
  188. virtual Status Flush() {
  189. return Status::OK();
  190. }
  191. virtual Status Sync() {
  192. Status s;
  193. if (pending_sync_) {
  194. // Some unmapped data was not synced
  195. pending_sync_ = false;
  196. if (fdatasync(fd_) < 0) {
  197. s = Status::IOError(filename_, strerror(errno));
  198. }
  199. }
  200. if (dst_ > last_sync_) {
  201. // Find the beginnings of the pages that contain the first and last
  202. // bytes to be synced.
  203. size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  204. size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  205. last_sync_ = dst_;
  206. if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
  207. s = Status::IOError(filename_, strerror(errno));
  208. }
  209. }
  210. return s;
  211. }
  212. };
  // Applies (lock == true) or releases (lock == false) a whole-file write
  // lock on "fd" using a non-blocking fcntl(F_SETLK) request.
  // Returns the fcntl() result: -1 on failure with errno set.
  static int LockOrUnlock(int fd, bool lock) {
    errno = 0;

    struct flock request;
    memset(&request, 0, sizeof(request));
    if (lock) {
      request.l_type = F_WRLCK;
    } else {
      request.l_type = F_UNLCK;
    }
    request.l_whence = SEEK_SET;
    request.l_start = 0;
    request.l_len = 0;  // A zero length covers the entire file

    return fcntl(fd, F_SETLK, &request);
  }
  223. class PosixFileLock : public FileLock {
  224. public:
  225. int fd_;
  226. };
  227. class PosixEnv : public Env {
  228. public:
  229. PosixEnv();
  230. virtual ~PosixEnv() {
  231. fprintf(stderr, "Destroying Env::Default()\n");
  232. exit(1);
  233. }
  234. virtual Status NewSequentialFile(const std::string& fname,
  235. SequentialFile** result) {
  236. FILE* f = fopen(fname.c_str(), "r");
  237. if (f == NULL) {
  238. *result = NULL;
  239. return Status::IOError(fname, strerror(errno));
  240. } else {
  241. *result = new PosixSequentialFile(fname, f);
  242. return Status::OK();
  243. }
  244. }
  245. virtual Status NewRandomAccessFile(const std::string& fname,
  246. RandomAccessFile** result) {
  247. int fd = open(fname.c_str(), O_RDONLY);
  248. if (fd < 0) {
  249. *result = NULL;
  250. return Status::IOError(fname, strerror(errno));
  251. }
  252. *result = new PosixRandomAccessFile(fname, fd);
  253. return Status::OK();
  254. }
  255. virtual Status NewWritableFile(const std::string& fname,
  256. WritableFile** result) {
  257. Status s;
  258. const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  259. if (fd < 0) {
  260. *result = NULL;
  261. s = Status::IOError(fname, strerror(errno));
  262. } else {
  263. *result = new PosixMmapFile(fname, fd, page_size_);
  264. }
  265. return s;
  266. }
  267. virtual bool FileExists(const std::string& fname) {
  268. return access(fname.c_str(), F_OK) == 0;
  269. }
  270. virtual Status GetChildren(const std::string& dir,
  271. std::vector<std::string>* result) {
  272. result->clear();
  273. DIR* d = opendir(dir.c_str());
  274. if (d == NULL) {
  275. return Status::IOError(dir, strerror(errno));
  276. }
  277. struct dirent* entry;
  278. while ((entry = readdir(d)) != NULL) {
  279. result->push_back(entry->d_name);
  280. }
  281. closedir(d);
  282. return Status::OK();
  283. }
  284. virtual Status DeleteFile(const std::string& fname) {
  285. Status result;
  286. if (unlink(fname.c_str()) != 0) {
  287. result = Status::IOError(fname, strerror(errno));
  288. }
  289. return result;
  290. };
  291. virtual Status CreateDir(const std::string& name) {
  292. Status result;
  293. if (mkdir(name.c_str(), 0755) != 0) {
  294. result = Status::IOError(name, strerror(errno));
  295. }
  296. return result;
  297. };
  298. virtual Status DeleteDir(const std::string& name) {
  299. Status result;
  300. if (rmdir(name.c_str()) != 0) {
  301. result = Status::IOError(name, strerror(errno));
  302. }
  303. return result;
  304. };
  305. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  306. Status s;
  307. struct stat sbuf;
  308. if (stat(fname.c_str(), &sbuf) != 0) {
  309. *size = 0;
  310. s = Status::IOError(fname, strerror(errno));
  311. } else {
  312. *size = sbuf.st_size;
  313. }
  314. return s;
  315. }
  316. virtual Status RenameFile(const std::string& src, const std::string& target) {
  317. Status result;
  318. if (rename(src.c_str(), target.c_str()) != 0) {
  319. result = Status::IOError(src, strerror(errno));
  320. }
  321. return result;
  322. }
  323. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  324. *lock = NULL;
  325. Status result;
  326. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  327. if (fd < 0) {
  328. result = Status::IOError(fname, strerror(errno));
  329. } else if (LockOrUnlock(fd, true) == -1) {
  330. result = Status::IOError("lock " + fname, strerror(errno));
  331. close(fd);
  332. } else {
  333. PosixFileLock* my_lock = new PosixFileLock;
  334. my_lock->fd_ = fd;
  335. *lock = my_lock;
  336. }
  337. return result;
  338. }
  339. virtual Status UnlockFile(FileLock* lock) {
  340. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  341. Status result;
  342. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  343. result = Status::IOError(strerror(errno));
  344. }
  345. close(my_lock->fd_);
  346. delete my_lock;
  347. return result;
  348. }
  349. virtual void Schedule(void (*function)(void*), void* arg);
  350. virtual void StartThread(void (*function)(void* arg), void* arg);
  351. virtual Status GetTestDirectory(std::string* result) {
  352. const char* env = getenv("TEST_TMPDIR");
  353. if (env && env[0] != '\0') {
  354. *result = env;
  355. } else {
  356. char buf[100];
  357. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  358. *result = buf;
  359. }
  360. // Directory may already exist
  361. CreateDir(*result);
  362. return Status::OK();
  363. }
  364. virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
  365. pthread_t tid = pthread_self();
  366. uint64_t thread_id = 0;
  367. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  368. // We try twice: the first time with a fixed-size stack allocated buffer,
  369. // and the second time with a much larger dynamically allocated buffer.
  370. char buffer[500];
  371. for (int iter = 0; iter < 2; iter++) {
  372. char* base;
  373. int bufsize;
  374. if (iter == 0) {
  375. bufsize = sizeof(buffer);
  376. base = buffer;
  377. } else {
  378. bufsize = 30000;
  379. base = new char[bufsize];
  380. }
  381. char* p = base;
  382. char* limit = base + bufsize;
  383. struct timeval now_tv;
  384. gettimeofday(&now_tv, NULL);
  385. const time_t seconds = now_tv.tv_sec;
  386. struct tm t;
  387. localtime_r(&seconds, &t);
  388. p += snprintf(p, limit - p,
  389. "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
  390. t.tm_year + 1900,
  391. t.tm_mon + 1,
  392. t.tm_mday,
  393. t.tm_hour,
  394. t.tm_min,
  395. t.tm_sec,
  396. static_cast<int>(now_tv.tv_usec),
  397. static_cast<long long unsigned int>(thread_id));
  398. // Print the message
  399. if (p < limit) {
  400. va_list backup_ap;
  401. va_copy(backup_ap, ap);
  402. p += vsnprintf(p, limit - p, format, backup_ap);
  403. va_end(backup_ap);
  404. }
  405. // Truncate to available space if necessary
  406. if (p >= limit) {
  407. if (iter == 0) {
  408. continue; // Try again with larger buffer
  409. } else {
  410. p = limit - 1;
  411. }
  412. }
  413. // Add newline if necessary
  414. if (p == base || p[-1] != '\n') {
  415. *p++ = '\n';
  416. }
  417. assert(p <= limit);
  418. info_log->Append(Slice(base, p - base));
  419. info_log->Flush();
  420. if (base != buffer) {
  421. delete[] base;
  422. }
  423. break;
  424. }
  425. }
  426. virtual uint64_t NowMicros() {
  427. struct timeval tv;
  428. gettimeofday(&tv, NULL);
  429. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  430. }
  431. virtual void SleepForMicroseconds(int micros) {
  432. usleep(micros);
  433. }
  434. private:
  435. void PthreadCall(const char* label, int result) {
  436. if (result != 0) {
  437. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  438. exit(1);
  439. }
  440. }
  441. // BGThread() is the body of the background thread
  442. void BGThread();
  443. static void* BGThreadWrapper(void* arg) {
  444. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  445. return NULL;
  446. }
  447. size_t page_size_;
  448. pthread_mutex_t mu_;
  449. pthread_cond_t bgsignal_;
  450. pthread_t bgthread_;
  451. bool started_bgthread_;
  452. // Entry per Schedule() call
  453. struct BGItem { void* arg; void (*function)(void*); };
  454. typedef std::deque<BGItem> BGQueue;
  455. BGQueue queue_;
  456. };
  457. PosixEnv::PosixEnv() : page_size_(getpagesize()),
  458. started_bgthread_(false) {
  459. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  460. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  461. }
  462. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  463. PthreadCall("lock", pthread_mutex_lock(&mu_));
  464. // Start background thread if necessary
  465. if (!started_bgthread_) {
  466. started_bgthread_ = true;
  467. PthreadCall(
  468. "create thread",
  469. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  470. }
  471. // If the queue is currently empty, the background thread may currently be
  472. // waiting.
  473. if (queue_.empty()) {
  474. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  475. }
  476. // Add to priority queue
  477. queue_.push_back(BGItem());
  478. queue_.back().function = function;
  479. queue_.back().arg = arg;
  480. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  481. }
  482. void PosixEnv::BGThread() {
  483. while (true) {
  484. // Wait until there is an item that is ready to run
  485. PthreadCall("lock", pthread_mutex_lock(&mu_));
  486. while (queue_.empty()) {
  487. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  488. }
  489. void (*function)(void*) = queue_.front().function;
  490. void* arg = queue_.front().arg;
  491. queue_.pop_front();
  492. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  493. (*function)(arg);
  494. }
  495. }
  namespace {

  // Bundle handed to a newly created thread: the callback to invoke and
  // the single argument to pass to it.
  struct StartThreadState {
    void (*user_function)(void*);  // Callback run on the new thread
    void* arg;                     // Passed unchanged to user_function
  };

  }  // namespace
  502. static void* StartThreadWrapper(void* arg) {
  503. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  504. state->user_function(state->arg);
  505. delete state;
  506. return NULL;
  507. }
  508. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  509. pthread_t t;
  510. StartThreadState* state = new StartThreadState;
  511. state->user_function = function;
  512. state->arg = arg;
  513. PthreadCall("start thread",
  514. pthread_create(&t, NULL, &StartThreadWrapper, state));
  515. }
  516. }
  517. static pthread_once_t once = PTHREAD_ONCE_INIT;
  518. static Env* default_env;
  519. static void InitDefaultEnv() { default_env = new PosixEnv; }
  520. Env* Env::Default() {
  521. pthread_once(&once, InitDefaultEnv);
  522. return default_env;
  523. }
  524. }