You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

609 lines
16 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <dirent.h>
  6. #include <errno.h>
  7. #include <fcntl.h>
  8. #include <pthread.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <string.h>
  12. #include <sys/mman.h>
  13. #include <sys/stat.h>
  14. #include <sys/time.h>
  15. #include <sys/types.h>
  16. #include <time.h>
  17. #include <unistd.h>
  18. #if defined(LEVELDB_PLATFORM_ANDROID)
  19. #include <sys/stat.h>
  20. #endif
  21. #include "include/env.h"
  22. #include "include/slice.h"
  23. #include "port/port.h"
  24. #include "util/logging.h"
  25. namespace leveldb {
  26. namespace {
  27. class PosixSequentialFile: public SequentialFile {
  28. private:
  29. std::string filename_;
  30. FILE* file_;
  31. public:
  32. PosixSequentialFile(const std::string& fname, FILE* f)
  33. : filename_(fname), file_(f) { }
  34. virtual ~PosixSequentialFile() { fclose(file_); }
  35. virtual Status Read(size_t n, Slice* result, char* scratch) {
  36. Status s;
  37. size_t r = fread_unlocked(scratch, 1, n, file_);
  38. *result = Slice(scratch, r);
  39. if (r < n) {
  40. if (feof(file_)) {
  41. // We leave status as ok if we hit the end of the file
  42. } else {
  43. // A partial read with an error: return a non-ok status
  44. s = Status::IOError(filename_, strerror(errno));
  45. }
  46. }
  47. return s;
  48. }
  49. };
  50. class PosixRandomAccessFile: public RandomAccessFile {
  51. private:
  52. std::string filename_;
  53. uint64_t size_;
  54. int fd_;
  55. public:
  56. PosixRandomAccessFile(const std::string& fname, uint64_t size, int fd)
  57. : filename_(fname), size_(size), fd_(fd) { }
  58. virtual ~PosixRandomAccessFile() { close(fd_); }
  59. virtual uint64_t Size() const { return size_; }
  60. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  61. char* scratch) const {
  62. Status s;
  63. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  64. *result = Slice(scratch, (r < 0) ? 0 : r);
  65. if (r < 0) {
  66. // An error: return a non-ok status
  67. s = Status::IOError(filename_, strerror(errno));
  68. }
  69. return s;
  70. }
  71. };
  72. // We preallocate up to an extra megabyte and use memcpy to append new
  73. // data to the file. This is safe since we either properly close the
  74. // file before reading from it, or for log files, the reading code
  75. // knows enough to skip zero suffixes.
  76. class PosixMmapFile : public WritableFile {
  77. private:
  78. std::string filename_;
  79. int fd_;
  80. size_t page_size_;
  81. size_t map_size_; // How much extra memory to map at a time
  82. char* base_; // The mapped region
  83. char* limit_; // Limit of the mapped region
  84. char* dst_; // Where to write next (in range [base_,limit_])
  85. char* last_sync_; // Where have we synced up to
  86. uint64_t file_offset_; // Offset of base_ in file
  87. // Have we done an munmap of unsynced data?
  88. bool pending_sync_;
  89. // Roundup x to a multiple of y
  90. static size_t Roundup(size_t x, size_t y) {
  91. return ((x + y - 1) / y) * y;
  92. }
  93. size_t TruncateToPageBoundary(size_t s) {
  94. s -= (s & (page_size_ - 1));
  95. assert((s % page_size_) == 0);
  96. return s;
  97. }
  98. void UnmapCurrentRegion() {
  99. if (base_ != NULL) {
  100. if (last_sync_ < limit_) {
  101. // Defer syncing this data until next Sync() call, if any
  102. pending_sync_ = true;
  103. }
  104. munmap(base_, limit_ - base_);
  105. file_offset_ += limit_ - base_;
  106. base_ = NULL;
  107. limit_ = NULL;
  108. last_sync_ = NULL;
  109. dst_ = NULL;
  110. // Increase the amount we map the next time, but capped at 1MB
  111. if (map_size_ < (1<<20)) {
  112. map_size_ *= 2;
  113. }
  114. }
  115. }
  116. bool MapNewRegion() {
  117. assert(base_ == NULL);
  118. if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
  119. return false;
  120. }
  121. void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
  122. fd_, file_offset_);
  123. if (ptr == MAP_FAILED) {
  124. return false;
  125. }
  126. base_ = reinterpret_cast<char*>(ptr);
  127. limit_ = base_ + map_size_;
  128. dst_ = base_;
  129. last_sync_ = base_;
  130. return true;
  131. }
  132. public:
  133. PosixMmapFile(const std::string& fname, int fd, size_t page_size)
  134. : filename_(fname),
  135. fd_(fd),
  136. page_size_(page_size),
  137. map_size_(Roundup(65536, page_size)),
  138. base_(NULL),
  139. limit_(NULL),
  140. dst_(NULL),
  141. last_sync_(NULL),
  142. file_offset_(0),
  143. pending_sync_(false) {
  144. assert((page_size & (page_size - 1)) == 0);
  145. }
  146. ~PosixMmapFile() {
  147. if (fd_ >= 0) {
  148. PosixMmapFile::Close();
  149. }
  150. }
  151. virtual Status Append(const Slice& data) {
  152. const char* src = data.data();
  153. size_t left = data.size();
  154. while (left > 0) {
  155. assert(base_ <= dst_);
  156. assert(dst_ <= limit_);
  157. size_t avail = limit_ - dst_;
  158. if (avail == 0) {
  159. UnmapCurrentRegion();
  160. MapNewRegion();
  161. }
  162. size_t n = (left <= avail) ? left : avail;
  163. memcpy(dst_, src, n);
  164. dst_ += n;
  165. src += n;
  166. left -= n;
  167. }
  168. return Status::OK();
  169. }
  170. virtual Status Close() {
  171. Status s;
  172. size_t unused = limit_ - dst_;
  173. UnmapCurrentRegion();
  174. if (unused > 0) {
  175. // Trim the extra space at the end of the file
  176. if (ftruncate(fd_, file_offset_ - unused) < 0) {
  177. s = Status::IOError(filename_, strerror(errno));
  178. }
  179. }
  180. if (close(fd_) < 0) {
  181. if (s.ok()) {
  182. s = Status::IOError(filename_, strerror(errno));
  183. }
  184. }
  185. fd_ = -1;
  186. base_ = NULL;
  187. limit_ = NULL;
  188. return s;
  189. }
  190. virtual Status Flush() {
  191. return Status::OK();
  192. }
  193. virtual Status Sync() {
  194. Status s;
  195. if (pending_sync_) {
  196. // Some unmapped data was not synced
  197. pending_sync_ = false;
  198. if (fdatasync(fd_) < 0) {
  199. s = Status::IOError(filename_, strerror(errno));
  200. }
  201. }
  202. if (dst_ > last_sync_) {
  203. // Find the beginnings of the pages that contain the first and last
  204. // bytes to be synced.
  205. size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
  206. size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
  207. last_sync_ = dst_;
  208. if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
  209. s = Status::IOError(filename_, strerror(errno));
  210. }
  211. }
  212. return s;
  213. }
  214. };
  215. static int LockOrUnlock(int fd, bool lock) {
  216. errno = 0;
  217. struct flock f;
  218. memset(&f, 0, sizeof(f));
  219. f.l_type = (lock ? F_WRLCK : F_UNLCK);
  220. f.l_whence = SEEK_SET;
  221. f.l_start = 0;
  222. f.l_len = 0; // Lock/unlock entire file
  223. return fcntl(fd, F_SETLK, &f);
  224. }
  225. class PosixFileLock : public FileLock {
  226. public:
  227. int fd_;
  228. };
  229. class PosixEnv : public Env {
  230. public:
  231. PosixEnv();
  232. virtual ~PosixEnv() {
  233. fprintf(stderr, "Destroying Env::Default()\n");
  234. exit(1);
  235. }
  236. virtual Status NewSequentialFile(const std::string& fname,
  237. SequentialFile** result) {
  238. FILE* f = fopen(fname.c_str(), "r");
  239. if (f == NULL) {
  240. *result = NULL;
  241. return Status::IOError(fname, strerror(errno));
  242. } else {
  243. *result = new PosixSequentialFile(fname, f);
  244. return Status::OK();
  245. }
  246. }
  247. virtual Status NewRandomAccessFile(const std::string& fname,
  248. RandomAccessFile** result) {
  249. int fd = open(fname.c_str(), O_RDONLY);
  250. if (fd < 0) {
  251. *result = NULL;
  252. return Status::IOError(fname, strerror(errno));
  253. }
  254. struct stat sbuf;
  255. if (fstat(fd, &sbuf) != 0) {
  256. *result = NULL;
  257. Status s = Status::IOError(fname, strerror(errno));
  258. close(fd);
  259. return s;
  260. }
  261. *result = new PosixRandomAccessFile(fname, sbuf.st_size, fd);
  262. return Status::OK();
  263. }
  264. virtual Status NewWritableFile(const std::string& fname,
  265. WritableFile** result) {
  266. Status s;
  267. const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
  268. if (fd < 0) {
  269. *result = NULL;
  270. s = Status::IOError(fname, strerror(errno));
  271. } else {
  272. *result = new PosixMmapFile(fname, fd, page_size_);
  273. }
  274. return s;
  275. }
  276. virtual bool FileExists(const std::string& fname) {
  277. return access(fname.c_str(), F_OK) == 0;
  278. }
  279. virtual Status GetChildren(const std::string& dir,
  280. std::vector<std::string>* result) {
  281. result->clear();
  282. DIR* d = opendir(dir.c_str());
  283. if (d == NULL) {
  284. return Status::IOError(dir, strerror(errno));
  285. }
  286. struct dirent* entry;
  287. while ((entry = readdir(d)) != NULL) {
  288. result->push_back(entry->d_name);
  289. }
  290. closedir(d);
  291. return Status::OK();
  292. }
  293. virtual Status DeleteFile(const std::string& fname) {
  294. Status result;
  295. if (unlink(fname.c_str()) != 0) {
  296. result = Status::IOError(fname, strerror(errno));
  297. }
  298. return result;
  299. };
  300. virtual Status CreateDir(const std::string& name) {
  301. Status result;
  302. if (mkdir(name.c_str(), 0755) != 0) {
  303. result = Status::IOError(name, strerror(errno));
  304. }
  305. return result;
  306. };
  307. virtual Status DeleteDir(const std::string& name) {
  308. Status result;
  309. if (rmdir(name.c_str()) != 0) {
  310. result = Status::IOError(name, strerror(errno));
  311. }
  312. return result;
  313. };
  314. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  315. Status s;
  316. struct stat sbuf;
  317. if (stat(fname.c_str(), &sbuf) != 0) {
  318. *size = 0;
  319. s = Status::IOError(fname, strerror(errno));
  320. } else {
  321. *size = sbuf.st_size;
  322. }
  323. return s;
  324. }
  325. virtual Status RenameFile(const std::string& src, const std::string& target) {
  326. Status result;
  327. if (rename(src.c_str(), target.c_str()) != 0) {
  328. result = Status::IOError(src, strerror(errno));
  329. }
  330. return result;
  331. }
  332. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  333. *lock = NULL;
  334. Status result;
  335. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  336. if (fd < 0) {
  337. result = Status::IOError(fname, strerror(errno));
  338. } else if (LockOrUnlock(fd, true) == -1) {
  339. result = Status::IOError("lock " + fname, strerror(errno));
  340. close(fd);
  341. } else {
  342. PosixFileLock* my_lock = new PosixFileLock;
  343. my_lock->fd_ = fd;
  344. *lock = my_lock;
  345. }
  346. return result;
  347. }
  348. virtual Status UnlockFile(FileLock* lock) {
  349. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  350. Status result;
  351. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  352. result = Status::IOError(strerror(errno));
  353. }
  354. close(my_lock->fd_);
  355. delete my_lock;
  356. return result;
  357. }
  358. virtual void Schedule(void (*function)(void*), void* arg);
  359. virtual void StartThread(void (*function)(void* arg), void* arg);
  360. virtual Status GetTestDirectory(std::string* result) {
  361. const char* env = getenv("TEST_TMPDIR");
  362. if (env && env[0] != '\0') {
  363. *result = env;
  364. } else {
  365. char buf[100];
  366. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  367. *result = buf;
  368. }
  369. // Directory may already exist
  370. CreateDir(*result);
  371. return Status::OK();
  372. }
  373. virtual void Logv(WritableFile* info_log, const char* format, va_list ap) {
  374. pthread_t tid = pthread_self();
  375. uint64_t thread_id = 0;
  376. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  377. // We try twice: the first time with a fixed-size stack allocated buffer,
  378. // and the second time with a much larger dynamically allocated buffer.
  379. char buffer[500];
  380. for (int iter = 0; iter < 2; iter++) {
  381. char* base;
  382. int bufsize;
  383. if (iter == 0) {
  384. bufsize = sizeof(buffer);
  385. base = buffer;
  386. } else {
  387. bufsize = 30000;
  388. base = new char[bufsize];
  389. }
  390. char* p = base;
  391. char* limit = base + bufsize;
  392. struct timeval now_tv;
  393. gettimeofday(&now_tv, NULL);
  394. const time_t seconds = now_tv.tv_sec;
  395. struct tm t;
  396. localtime_r(&seconds, &t);
  397. p += snprintf(p, limit - p,
  398. "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
  399. t.tm_year + 1900,
  400. t.tm_mon + 1,
  401. t.tm_mday,
  402. t.tm_hour,
  403. t.tm_min,
  404. t.tm_sec,
  405. static_cast<int>(now_tv.tv_usec),
  406. static_cast<long long unsigned int>(thread_id));
  407. // Print the message
  408. if (p < limit) {
  409. va_list backup_ap;
  410. va_copy(backup_ap, ap);
  411. p += vsnprintf(p, limit - p, format, backup_ap);
  412. va_end(backup_ap);
  413. }
  414. // Truncate to available space if necessary
  415. if (p >= limit) {
  416. if (iter == 0) {
  417. continue; // Try again with larger buffer
  418. } else {
  419. p = limit - 1;
  420. }
  421. }
  422. // Add newline if necessary
  423. if (p == base || p[-1] != '\n') {
  424. *p++ = '\n';
  425. }
  426. assert(p <= limit);
  427. info_log->Append(Slice(base, p - base));
  428. info_log->Flush();
  429. if (base != buffer) {
  430. delete[] base;
  431. }
  432. break;
  433. }
  434. }
  435. virtual uint64_t NowMicros() {
  436. struct timeval tv;
  437. gettimeofday(&tv, NULL);
  438. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  439. }
  440. virtual void SleepForMicroseconds(int micros) {
  441. usleep(micros);
  442. }
  443. private:
  444. void PthreadCall(const char* label, int result) {
  445. if (result != 0) {
  446. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  447. exit(1);
  448. }
  449. }
  450. // BGThread() is the body of the background thread
  451. void BGThread();
  452. static void* BGThreadWrapper(void* arg) {
  453. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  454. return NULL;
  455. }
  456. size_t page_size_;
  457. pthread_mutex_t mu_;
  458. pthread_cond_t bgsignal_;
  459. pthread_t bgthread_;
  460. bool started_bgthread_;
  461. // Entry per Schedule() call
  462. struct BGItem { void* arg; void (*function)(void*); };
  463. typedef std::deque<BGItem> BGQueue;
  464. BGQueue queue_;
  465. };
  466. PosixEnv::PosixEnv() : page_size_(getpagesize()),
  467. started_bgthread_(false) {
  468. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  469. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  470. }
  471. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  472. PthreadCall("lock", pthread_mutex_lock(&mu_));
  473. // Start background thread if necessary
  474. if (!started_bgthread_) {
  475. started_bgthread_ = true;
  476. PthreadCall(
  477. "create thread",
  478. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  479. }
  480. // If the queue is currently empty, the background thread may currently be
  481. // waiting.
  482. if (queue_.empty()) {
  483. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  484. }
  485. // Add to priority queue
  486. queue_.push_back(BGItem());
  487. queue_.back().function = function;
  488. queue_.back().arg = arg;
  489. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  490. }
  491. void PosixEnv::BGThread() {
  492. while (true) {
  493. // Wait until there is an item that is ready to run
  494. PthreadCall("lock", pthread_mutex_lock(&mu_));
  495. while (queue_.empty()) {
  496. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  497. }
  498. void (*function)(void*) = queue_.front().function;
  499. void* arg = queue_.front().arg;
  500. queue_.pop_front();
  501. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  502. (*function)(arg);
  503. }
  504. }
  505. namespace {
  506. struct StartThreadState {
  507. void (*user_function)(void*);
  508. void* arg;
  509. };
  510. }
  511. static void* StartThreadWrapper(void* arg) {
  512. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  513. state->user_function(state->arg);
  514. delete state;
  515. return NULL;
  516. }
  517. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  518. pthread_t t;
  519. StartThreadState* state = new StartThreadState;
  520. state->user_function = function;
  521. state->arg = arg;
  522. PthreadCall("start thread",
  523. pthread_create(&t, NULL, &StartThreadWrapper, state));
  524. }
  525. }
  526. static pthread_once_t once = PTHREAD_ONCE_INIT;
  527. static Env* default_env;
  528. static void InitDefaultEnv() { default_env = new PosixEnv; }
  529. Env* Env::Default() {
  530. pthread_once(&once, InitDefaultEnv);
  531. return default_env;
  532. }
  533. }