作者: 韩晨旭 10225101440 李畅 10225102463
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

606 lines
16 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <dirent.h>
  6. #include <errno.h>
  7. #include <fcntl.h>
  8. #include <pthread.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/time.h>
  15. #include <sys/types.h>
  16. #include <time.h>
  17. #include <unistd.h>
  18. #if defined(LEVELDB_PLATFORM_ANDROID)
  19. #include <sys/stat.h>
  20. #endif
  21. #include "leveldb/env.h"
  22. #include "leveldb/slice.h"
  23. #include "port/port.h"
  24. #include "util/logging.h"
  25. namespace leveldb {
  26. namespace {
  27. class PosixSequentialFile: public SequentialFile {
  28. private:
  29. std::string filename_;
  30. FILE* file_;
  31. public:
  32. PosixSequentialFile(const std::string& fname, FILE* f)
  33. : filename_(fname), file_(f) { }
  34. virtual ~PosixSequentialFile() { fclose(file_); }
  35. virtual Status Read(size_t n, Slice* result, char* scratch) {
  36. Status s;
  37. size_t r = fread_unlocked(scratch, 1, n, file_);
  38. *result = Slice(scratch, r);
  39. if (r < n) {
  40. if (feof(file_)) {
  41. // We leave status as ok if we hit the end of the file
  42. } else {
  43. // A partial read with an error: return a non-ok status
  44. s = Status::IOError(filename_, strerror(errno));
  45. }
  46. }
  47. return s;
  48. }
  49. virtual Status Skip(uint64_t n) {
  50. if (fseek(file_, n, SEEK_CUR)) {
  51. return Status::IOError(filename_, strerror(errno));
  52. }
  53. return Status::OK();
  54. }
  55. };
  56. class PosixRandomAccessFile: public RandomAccessFile {
  57. private:
  58. std::string filename_;
  59. int fd_;
  60. public:
  61. PosixRandomAccessFile(const std::string& fname, int fd)
  62. : filename_(fname), fd_(fd) { }
  63. virtual ~PosixRandomAccessFile() { close(fd_); }
  64. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  65. char* scratch) const {
  66. Status s;
  67. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  68. *result = Slice(scratch, (r < 0) ? 0 : r);
  69. if (r < 0) {
  70. // An error: return a non-ok status
  71. s = Status::IOError(filename_, strerror(errno));
  72. }
  73. return s;
  74. }
  75. };
  76. // We preallocate up to an extra megabyte and use memcpy to append new
  77. // data to the file. This is safe since we either properly close the
  78. // file before reading from it, or for log files, the reading code
  79. // knows enough to skip zero suffixes.
  80. class PosixMmapFile : public WritableFile {
  81. private:
  82. std::string filename_;
  83. int fd_;
  84. size_t page_size_;
  85. size_t map_size_; // How much extra memory to map at a time
  86. char* base_; // The mapped region
  87. char* limit_; // Limit of the mapped region
  88. char* dst_; // Where to write next (in range [base_,limit_])
  89. char* last_sync_; // Where have we synced up to
  90. uint64_t file_offset_; // Offset of base_ in file
  91. // Have we done an munmap of unsynced data?
  92. bool pending_sync_;
  93. // Roundup x to a multiple of y
  94. static size_t Roundup(size_t x, size_t y) {
  95. return ((x + y - 1) / y) * y;
  96. }
  97. size_t TruncateToPageBoundary(size_t s) {
  98. s -= (s & (page_size_ - 1));
  99. assert((s % page_size_) == 0);
  100. return s;
  101. }
  102. void UnmapCurrentRegion() {
  103. if (base_ != NULL) {
  104. if (last_sync_ < limit_) {
  105. // Defer syncing this data until next Sync() call, if any
  106. pending_sync_ = true;
  107. }
  108. munmap(base_, limit_ - base_);
  109. file_offset_ += limit_ - base_;
  110. base_ = NULL;
  111. limit_ = NULL;
  112. last_sync_ = NULL;
  113. dst_ = NULL;
  114. // Increase the amount we map the next time, but capped at 1MB
  115. if (map_size_ < (1<<20)) {
  116. map_size_ *= 2;
  117. }
  118. }
  119. }
  120. bool MapNewRegion() {
  121. assert(base_ == NULL);
  122. if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
  123. return false;
  124. }
  125. void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
  126. fd_, file_offset_);
  127. if (ptr == MAP_FAILED) {
  128. return false;
  129. }
  130. base_ = reinterpret_cast<char*>(ptr);
  131. limit_ = base_ + map_size_;
  132. dst_ = base_;
  133. last_sync_ = base_;
  134. return true;
  135. }
  136. public:
  137. PosixMmapFile(const std::string& fname, int fd, size_t page_size)
  138. : filename_(fname),
  139. fd_(fd),
  140. page_size_(page_size),
  141. map_size_(Roundup(65536, page_size)),
  142. base_(NULL),
  143. limit_(NULL),
  144. dst_(NULL),
  145. last_sync_(NULL),
  146. file_offset_(0),
  147. pending_sync_(false) {
  148. assert((page_size & (page_size - 1)) == 0);
  149. }
  150. ~PosixMmapFile() {
  151. if (fd_ >= 0) {
  152. PosixMmapFile::Close();
  153. }
  154. }
  155. virtual Status Append(const Slice& data) {
  156. const char* src = data.data();
  157. size_t left = data.size();
  158. while (left > 0) {
  159. assert(base_ <= dst_);
  160. assert(dst_ <= limit_);
  161. size_t avail = limit_ - dst_;
  162. if (avail == 0) {
  163. UnmapCurrentRegion();
  164. MapNewRegion();
  165. }
  166. size_t n = (left <= avail) ? left : avail;
  167. memcpy(dst_, src, n);
  168. dst_ += n;
  169. src += n;
  170. left -= n;
  171. }
  172. return Status::OK();
  173. }
  174. virtual Status Close() {
  175. Status s;
  176. size_t unused = limit_ - dst_;
  177. UnmapCurrentRegion();
  178. if (unused > 0) {
  179. // Trim the extra space at the end of the file
  180. if (ftruncate(fd_, file_offset_ - unused) < 0) {
  181. s = Status::IOError(filename_, strerror(errno));
  182. }
  183. }
  184. if (close(fd_) < 0) {
  185. if (s.ok()) {
  186. s = Status::IOError(filename_, strerror(errno));
  187. }
  188. }
  189. fd_ = -1;
  190. base_ = NULL;
  191. limit_ = NULL;
  192. return s;
  193. }
  194. virtual Status Flush() {
  195. return Status::OK();
  196. }
  197. virtual Status Sync() {
  198. Status s;
  199. if (pending_sync_) {
  200. // Some unmapped data was not synced
  201. pending_sync_ = false;
  202. if (fdatasync(fd_) < 0) {
  203. s = Status::IOError(filename_, strerror(errno));
  204. }
  205. }
  206. if (dst_ > last_sync_) {
  207. // Find the beginnings of the pages that contain the first and last
  208. // bytes to be synced.
  209. size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  210. size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  211. last_sync_ = dst_;
  212. if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
  213. s = Status::IOError(filename_, strerror(errno));
  214. }
  215. }
  216. return s;
  217. }
  218. };
  219. static int LockOrUnlock(int fd, bool lock) {
  220. errno = 0;
  221. struct flock f;
  222. memset(&f, 0, sizeof(f));
  223. f.l_type = (lock ? F_WRLCK : F_UNLCK);
  224. f.l_whence = SEEK_SET;
  225. f.l_start = 0;
  226. f.l_len = 0; // Lock/unlock entire file
  227. return fcntl(fd, F_SETLK, &f);
  228. }
  229. class PosixFileLock : public FileLock {
  230. public:
  231. int fd_;
  232. };
  233. class PosixEnv : public Env {
  234. public:
  235. PosixEnv();
  236. virtual ~PosixEnv() {
  237. fprintf(stderr, "Destroying Env::Default()\n");
  238. exit(1);
  239. }
  240. virtual Status NewSequentialFile(const std::string& fname,
  241. SequentialFile** result) {
  242. FILE* f = fopen(fname.c_str(), "r");
  243. if (f == NULL) {
  244. *result = NULL;
  245. return Status::IOError(fname, strerror(errno));
  246. } else {
  247. *result = new PosixSequentialFile(fname, f);
  248. return Status::OK();
  249. }
  250. }
  251. virtual Status NewRandomAccessFile(const std::string& fname,
  252. RandomAccessFile** result) {
  253. int fd = open(fname.c_str(), O_RDONLY);
  254. if (fd < 0) {
  255. *result = NULL;
  256. return Status::IOError(fname, strerror(errno));
  257. }
  258. *result = new PosixRandomAccessFile(fname, fd);
  259. return Status::OK();
  260. }
  261. virtual Status NewWritableFile(const std::string& fname,
  262. WritableFile** result) {
  263. Status s;
  264. const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  265. if (fd < 0) {
  266. *result = NULL;
  267. s = Status::IOError(fname, strerror(errno));
  268. } else {
  269. *result = new PosixMmapFile(fname, fd, page_size_);
  270. }
  271. return s;
  272. }
  273. virtual bool FileExists(const std::string& fname) {
  274. return access(fname.c_str(), F_OK) == 0;
  275. }
  276. virtual Status GetChildren(const std::string& dir,
  277. std::vector<std::string>* result) {
  278. result->clear();
  279. DIR* d = opendir(dir.c_str());
  280. if (d == NULL) {
  281. return Status::IOError(dir, strerror(errno));
  282. }
  283. struct dirent* entry;
  284. while ((entry = readdir(d)) != NULL) {
  285. result->push_back(entry->d_name);
  286. }
  287. closedir(d);
  288. return Status::OK();
  289. }
  290. virtual Status DeleteFile(const std::string& fname) {
  291. Status result;
  292. if (unlink(fname.c_str()) != 0) {
  293. result = Status::IOError(fname, strerror(errno));
  294. }
  295. return result;
  296. };
  297. virtual Status CreateDir(const std::string& name) {
  298. Status result;
  299. if (mkdir(name.c_str(), 0755) != 0) {
  300. result = Status::IOError(name, strerror(errno));
  301. }
  302. return result;
  303. };
  304. virtual Status DeleteDir(const std::string& name) {
  305. Status result;
  306. if (rmdir(name.c_str()) != 0) {
  307. result = Status::IOError(name, strerror(errno));
  308. }
  309. return result;
  310. };
  311. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  312. Status s;
  313. struct stat sbuf;
  314. if (stat(fname.c_str(), &sbuf) != 0) {
  315. *size = 0;
  316. s = Status::IOError(fname, strerror(errno));
  317. } else {
  318. *size = sbuf.st_size;
  319. }
  320. return s;
  321. }
  322. virtual Status RenameFile(const std::string& src, const std::string& target) {
  323. Status result;
  324. if (rename(src.c_str(), target.c_str()) != 0) {
  325. result = Status::IOError(src, strerror(errno));
  326. }
  327. return result;
  328. }
  329. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  330. *lock = NULL;
  331. Status result;
  332. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  333. if (fd < 0) {
  334. result = Status::IOError(fname, strerror(errno));
  335. } else if (LockOrUnlock(fd, true) == -1) {
  336. result = Status::IOError("lock " + fname, strerror(errno));
  337. close(fd);
  338. } else {
  339. PosixFileLock* my_lock = new PosixFileLock;
  340. my_lock->fd_ = fd;
  341. *lock = my_lock;
  342. }
  343. return result;
  344. }
  345. virtual Status UnlockFile(FileLock* lock) {
  346. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  347. Status result;
  348. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  349. result = Status::IOError(strerror(errno));
  350. }
  351. close(my_lock->fd_);
  352. delete my_lock;
  353. return result;
  354. }
  355. virtual void Schedule(void (*function)(void*), void* arg);
  356. virtual void StartThread(void (*function)(void* arg), void* arg);
  357. virtual Status GetTestDirectory(std::string* result) {
  358. const char* env = getenv("TEST_TMPDIR");
  359. if (env && env[0] != '\0') {
  360. *result = env;
  361. } else {
  362. char buf[100];
  363. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  364. *result = buf;
  365. }
  366. // Directory may already exist
  367. CreateDir(*result);
  368. return Status::OK();
  369. }
  370. virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
  371. pthread_t tid = pthread_self();
  372. uint64_t thread_id = 0;
  373. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  374. // We try twice: the first time with a fixed-size stack allocated buffer,
  375. // and the second time with a much larger dynamically allocated buffer.
  376. char buffer[500];
  377. for (int iter = 0; iter < 2; iter++) {
  378. char* base;
  379. int bufsize;
  380. if (iter == 0) {
  381. bufsize = sizeof(buffer);
  382. base = buffer;
  383. } else {
  384. bufsize = 30000;
  385. base = new char[bufsize];
  386. }
  387. char* p = base;
  388. char* limit = base + bufsize;
  389. struct timeval now_tv;
  390. gettimeofday(&now_tv, NULL);
  391. const time_t seconds = now_tv.tv_sec;
  392. struct tm t;
  393. localtime_r(&seconds, &t);
  394. p += snprintf(p, limit - p,
  395. "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
  396. t.tm_year + 1900,
  397. t.tm_mon + 1,
  398. t.tm_mday,
  399. t.tm_hour,
  400. t.tm_min,
  401. t.tm_sec,
  402. static_cast<int>(now_tv.tv_usec),
  403. static_cast<long long unsigned int>(thread_id));
  404. // Print the message
  405. if (p < limit) {
  406. va_list backup_ap;
  407. va_copy(backup_ap, ap);
  408. p += vsnprintf(p, limit - p, format, backup_ap);
  409. va_end(backup_ap);
  410. }
  411. // Truncate to available space if necessary
  412. if (p >= limit) {
  413. if (iter == 0) {
  414. continue; // Try again with larger buffer
  415. } else {
  416. p = limit - 1;
  417. }
  418. }
  419. // Add newline if necessary
  420. if (p == base || p[-1] != '\n') {
  421. *p++ = '\n';
  422. }
  423. assert(p <= limit);
  424. info_log->Append(Slice(base, p - base));
  425. info_log->Flush();
  426. if (base != buffer) {
  427. delete[] base;
  428. }
  429. break;
  430. }
  431. }
  432. virtual uint64_t NowMicros() {
  433. struct timeval tv;
  434. gettimeofday(&tv, NULL);
  435. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  436. }
  437. virtual void SleepForMicroseconds(int micros) {
  438. usleep(micros);
  439. }
  440. private:
  441. void PthreadCall(const char* label, int result) {
  442. if (result != 0) {
  443. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  444. exit(1);
  445. }
  446. }
  447. // BGThread() is the body of the background thread
  448. void BGThread();
  449. static void* BGThreadWrapper(void* arg) {
  450. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  451. return NULL;
  452. }
  453. size_t page_size_;
  454. pthread_mutex_t mu_;
  455. pthread_cond_t bgsignal_;
  456. pthread_t bgthread_;
  457. bool started_bgthread_;
  458. // Entry per Schedule() call
  459. struct BGItem { void* arg; void (*function)(void*); };
  460. typedef std::deque<BGItem> BGQueue;
  461. BGQueue queue_;
  462. };
  463. PosixEnv::PosixEnv() : page_size_(getpagesize()),
  464. started_bgthread_(false) {
  465. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  466. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  467. }
  468. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  469. PthreadCall("lock", pthread_mutex_lock(&mu_));
  470. // Start background thread if necessary
  471. if (!started_bgthread_) {
  472. started_bgthread_ = true;
  473. PthreadCall(
  474. "create thread",
  475. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  476. }
  477. // If the queue is currently empty, the background thread may currently be
  478. // waiting.
  479. if (queue_.empty()) {
  480. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  481. }
  482. // Add to priority queue
  483. queue_.push_back(BGItem());
  484. queue_.back().function = function;
  485. queue_.back().arg = arg;
  486. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  487. }
  488. void PosixEnv::BGThread() {
  489. while (true) {
  490. // Wait until there is an item that is ready to run
  491. PthreadCall("lock", pthread_mutex_lock(&mu_));
  492. while (queue_.empty()) {
  493. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  494. }
  495. void (*function)(void*) = queue_.front().function;
  496. void* arg = queue_.front().arg;
  497. queue_.pop_front();
  498. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  499. (*function)(arg);
  500. }
  501. }
  502. namespace {
  503. struct StartThreadState {
  504. void (*user_function)(void*);
  505. void* arg;
  506. };
  507. }
  508. static void* StartThreadWrapper(void* arg) {
  509. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  510. state->user_function(state->arg);
  511. delete state;
  512. return NULL;
  513. }
  514. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  515. pthread_t t;
  516. StartThreadState* state = new StartThreadState;
  517. state->user_function = function;
  518. state->arg = arg;
  519. PthreadCall("start thread",
  520. pthread_create(&t, NULL, &StartThreadWrapper, state));
  521. }
  522. }
  523. static pthread_once_t once = PTHREAD_ONCE_INIT;
  524. static Env* default_env;
  525. static void InitDefaultEnv() { default_env = new PosixEnv; }
  526. Env* Env::Default() {
  527. pthread_once(&once, InitDefaultEnv);
  528. return default_env;
  529. }
  530. }