小组成员:谢瑞阳、徐翔宇
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

607 lines
15 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include <deque>
  5. #include <set>
  6. #include <dirent.h>
  7. #include <errno.h>
  8. #include <fcntl.h>
  9. #include <pthread.h>
  10. #include <stdio.h>
  11. #include <stdlib.h>
  12. #include <string.h>
  13. #include <sys/mman.h>
  14. #include <sys/stat.h>
  15. #include <sys/time.h>
  16. #include <sys/types.h>
  17. #include <time.h>
  18. #include <unistd.h>
  19. #if defined(LEVELDB_PLATFORM_ANDROID)
  20. #include <sys/stat.h>
  21. #endif
  22. #include "leveldb/env.h"
  23. #include "leveldb/slice.h"
  24. #include "port/port.h"
  25. #include "util/logging.h"
  26. #include "util/mutexlock.h"
  27. #include "util/posix_logger.h"
  28. namespace leveldb {
  29. namespace {
  30. static Status IOError(const std::string& context, int err_number) {
  31. return Status::IOError(context, strerror(err_number));
  32. }
  33. class PosixSequentialFile: public SequentialFile {
  34. private:
  35. std::string filename_;
  36. FILE* file_;
  37. public:
  38. PosixSequentialFile(const std::string& fname, FILE* f)
  39. : filename_(fname), file_(f) { }
  40. virtual ~PosixSequentialFile() { fclose(file_); }
  41. virtual Status Read(size_t n, Slice* result, char* scratch) {
  42. Status s;
  43. size_t r = fread_unlocked(scratch, 1, n, file_);
  44. *result = Slice(scratch, r);
  45. if (r < n) {
  46. if (feof(file_)) {
  47. // We leave status as ok if we hit the end of the file
  48. } else {
  49. // A partial read with an error: return a non-ok status
  50. s = IOError(filename_, errno);
  51. }
  52. }
  53. return s;
  54. }
  55. virtual Status Skip(uint64_t n) {
  56. if (fseek(file_, n, SEEK_CUR)) {
  57. return IOError(filename_, errno);
  58. }
  59. return Status::OK();
  60. }
  61. };
  62. // pread() based random-access
  63. class PosixRandomAccessFile: public RandomAccessFile {
  64. private:
  65. std::string filename_;
  66. int fd_;
  67. public:
  68. PosixRandomAccessFile(const std::string& fname, int fd)
  69. : filename_(fname), fd_(fd) { }
  70. virtual ~PosixRandomAccessFile() { close(fd_); }
  71. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  72. char* scratch) const {
  73. Status s;
  74. ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
  75. *result = Slice(scratch, (r < 0) ? 0 : r);
  76. if (r < 0) {
  77. // An error: return a non-ok status
  78. s = IOError(filename_, errno);
  79. }
  80. return s;
  81. }
  82. };
  83. // Helper class to limit mmap file usage so that we do not end up
  84. // running out virtual memory or running into kernel performance
  85. // problems for very large databases.
  86. class MmapLimiter {
  87. public:
  88. // Up to 1000 mmaps for 64-bit binaries; none for smaller pointer sizes.
  89. MmapLimiter() {
  90. SetAllowed(sizeof(void*) >= 8 ? 1000 : 0);
  91. }
  92. // If another mmap slot is available, acquire it and return true.
  93. // Else return false.
  94. bool Acquire() {
  95. if (GetAllowed() <= 0) {
  96. return false;
  97. }
  98. MutexLock l(&mu_);
  99. intptr_t x = GetAllowed();
  100. if (x <= 0) {
  101. return false;
  102. } else {
  103. SetAllowed(x - 1);
  104. return true;
  105. }
  106. }
  107. // Release a slot acquired by a previous call to Acquire() that returned true.
  108. void Release() {
  109. MutexLock l(&mu_);
  110. SetAllowed(GetAllowed() + 1);
  111. }
  112. private:
  113. port::Mutex mu_;
  114. port::AtomicPointer allowed_;
  115. intptr_t GetAllowed() const {
  116. return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
  117. }
  118. // REQUIRES: mu_ must be held
  119. void SetAllowed(intptr_t v) {
  120. allowed_.Release_Store(reinterpret_cast<void*>(v));
  121. }
  122. MmapLimiter(const MmapLimiter&);
  123. void operator=(const MmapLimiter&);
  124. };
  125. // mmap() based random-access
  126. class PosixMmapReadableFile: public RandomAccessFile {
  127. private:
  128. std::string filename_;
  129. void* mmapped_region_;
  130. size_t length_;
  131. MmapLimiter* limiter_;
  132. public:
  133. // base[0,length-1] contains the mmapped contents of the file.
  134. PosixMmapReadableFile(const std::string& fname, void* base, size_t length,
  135. MmapLimiter* limiter)
  136. : filename_(fname), mmapped_region_(base), length_(length),
  137. limiter_(limiter) {
  138. }
  139. virtual ~PosixMmapReadableFile() {
  140. munmap(mmapped_region_, length_);
  141. limiter_->Release();
  142. }
  143. virtual Status Read(uint64_t offset, size_t n, Slice* result,
  144. char* scratch) const {
  145. Status s;
  146. if (offset + n > length_) {
  147. *result = Slice();
  148. s = IOError(filename_, EINVAL);
  149. } else {
  150. *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
  151. }
  152. return s;
  153. }
  154. };
  155. class PosixWritableFile : public WritableFile {
  156. private:
  157. std::string filename_;
  158. FILE* file_;
  159. public:
  160. PosixWritableFile(const std::string& fname, FILE* f)
  161. : filename_(fname), file_(f) { }
  162. ~PosixWritableFile() {
  163. if (file_ != NULL) {
  164. // Ignoring any potential errors
  165. fclose(file_);
  166. }
  167. }
  168. virtual Status Append(const Slice& data) {
  169. size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
  170. if (r != data.size()) {
  171. return IOError(filename_, errno);
  172. }
  173. return Status::OK();
  174. }
  175. virtual Status Close() {
  176. Status result;
  177. if (fclose(file_) != 0) {
  178. result = IOError(filename_, errno);
  179. }
  180. file_ = NULL;
  181. return result;
  182. }
  183. virtual Status Flush() {
  184. if (fflush_unlocked(file_) != 0) {
  185. return IOError(filename_, errno);
  186. }
  187. return Status::OK();
  188. }
  189. Status SyncDirIfManifest() {
  190. const char* f = filename_.c_str();
  191. const char* sep = strrchr(f, '/');
  192. Slice basename;
  193. std::string dir;
  194. if (sep == NULL) {
  195. dir = ".";
  196. basename = f;
  197. } else {
  198. dir = std::string(f, sep - f);
  199. basename = sep + 1;
  200. }
  201. Status s;
  202. if (basename.starts_with("MANIFEST")) {
  203. int fd = open(dir.c_str(), O_RDONLY);
  204. if (fd < 0) {
  205. s = IOError(dir, errno);
  206. } else {
  207. if (fsync(fd) < 0) {
  208. s = IOError(dir, errno);
  209. }
  210. close(fd);
  211. }
  212. }
  213. return s;
  214. }
  215. virtual Status Sync() {
  216. // Ensure new files referred to by the manifest are in the filesystem.
  217. Status s = SyncDirIfManifest();
  218. if (!s.ok()) {
  219. return s;
  220. }
  221. if (fflush_unlocked(file_) != 0 ||
  222. fdatasync(fileno(file_)) != 0) {
  223. s = Status::IOError(filename_, strerror(errno));
  224. }
  225. return s;
  226. }
  227. };
  // Applies (lock == true) or removes (lock == false) a write lock covering
  // the whole file referred to by fd, using fcntl(F_SETLK). Clears errno
  // before the call and returns fcntl()'s result.
  static int LockOrUnlock(int fd, bool lock) {
    struct flock request;
    memset(&request, 0, sizeof(request));
    request.l_whence = SEEK_SET;
    request.l_start = 0;
    request.l_len = 0;  // Lock/unlock entire file
    if (lock) {
      request.l_type = F_WRLCK;
    } else {
      request.l_type = F_UNLCK;
    }
    errno = 0;
    return fcntl(fd, F_SETLK, &request);
  }
  238. class PosixFileLock : public FileLock {
  239. public:
  240. int fd_;
  241. std::string name_;
  242. };
  243. // Set of locked files. We keep a separate set instead of just
  244. // relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide
  245. // any protection against multiple uses from the same process.
  246. class PosixLockTable {
  247. private:
  248. port::Mutex mu_;
  249. std::set<std::string> locked_files_;
  250. public:
  251. bool Insert(const std::string& fname) {
  252. MutexLock l(&mu_);
  253. return locked_files_.insert(fname).second;
  254. }
  255. void Remove(const std::string& fname) {
  256. MutexLock l(&mu_);
  257. locked_files_.erase(fname);
  258. }
  259. };
  260. class PosixEnv : public Env {
  261. public:
  262. PosixEnv();
  263. virtual ~PosixEnv() {
  264. fprintf(stderr, "Destroying Env::Default()\n");
  265. abort();
  266. }
  267. virtual Status NewSequentialFile(const std::string& fname,
  268. SequentialFile** result) {
  269. FILE* f = fopen(fname.c_str(), "r");
  270. if (f == NULL) {
  271. *result = NULL;
  272. return IOError(fname, errno);
  273. } else {
  274. *result = new PosixSequentialFile(fname, f);
  275. return Status::OK();
  276. }
  277. }
  278. virtual Status NewRandomAccessFile(const std::string& fname,
  279. RandomAccessFile** result) {
  280. *result = NULL;
  281. Status s;
  282. int fd = open(fname.c_str(), O_RDONLY);
  283. if (fd < 0) {
  284. s = IOError(fname, errno);
  285. } else if (mmap_limit_.Acquire()) {
  286. uint64_t size;
  287. s = GetFileSize(fname, &size);
  288. if (s.ok()) {
  289. void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
  290. if (base != MAP_FAILED) {
  291. *result = new PosixMmapReadableFile(fname, base, size, &mmap_limit_);
  292. } else {
  293. s = IOError(fname, errno);
  294. }
  295. }
  296. close(fd);
  297. if (!s.ok()) {
  298. mmap_limit_.Release();
  299. }
  300. } else {
  301. *result = new PosixRandomAccessFile(fname, fd);
  302. }
  303. return s;
  304. }
  305. virtual Status NewWritableFile(const std::string& fname,
  306. WritableFile** result) {
  307. Status s;
  308. FILE* f = fopen(fname.c_str(), "w");
  309. if (f == NULL) {
  310. *result = NULL;
  311. s = IOError(fname, errno);
  312. } else {
  313. *result = new PosixWritableFile(fname, f);
  314. }
  315. return s;
  316. }
  317. virtual bool FileExists(const std::string& fname) {
  318. return access(fname.c_str(), F_OK) == 0;
  319. }
  320. virtual Status GetChildren(const std::string& dir,
  321. std::vector<std::string>* result) {
  322. result->clear();
  323. DIR* d = opendir(dir.c_str());
  324. if (d == NULL) {
  325. return IOError(dir, errno);
  326. }
  327. struct dirent* entry;
  328. while ((entry = readdir(d)) != NULL) {
  329. result->push_back(entry->d_name);
  330. }
  331. closedir(d);
  332. return Status::OK();
  333. }
  334. virtual Status DeleteFile(const std::string& fname) {
  335. Status result;
  336. if (unlink(fname.c_str()) != 0) {
  337. result = IOError(fname, errno);
  338. }
  339. return result;
  340. }
  341. virtual Status CreateDir(const std::string& name) {
  342. Status result;
  343. if (mkdir(name.c_str(), 0755) != 0) {
  344. result = IOError(name, errno);
  345. }
  346. return result;
  347. }
  348. virtual Status DeleteDir(const std::string& name) {
  349. Status result;
  350. if (rmdir(name.c_str()) != 0) {
  351. result = IOError(name, errno);
  352. }
  353. return result;
  354. }
  355. virtual Status GetFileSize(const std::string& fname, uint64_t* size) {
  356. Status s;
  357. struct stat sbuf;
  358. if (stat(fname.c_str(), &sbuf) != 0) {
  359. *size = 0;
  360. s = IOError(fname, errno);
  361. } else {
  362. *size = sbuf.st_size;
  363. }
  364. return s;
  365. }
  366. virtual Status RenameFile(const std::string& src, const std::string& target) {
  367. Status result;
  368. if (rename(src.c_str(), target.c_str()) != 0) {
  369. result = IOError(src, errno);
  370. }
  371. return result;
  372. }
  373. virtual Status LockFile(const std::string& fname, FileLock** lock) {
  374. *lock = NULL;
  375. Status result;
  376. int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  377. if (fd < 0) {
  378. result = IOError(fname, errno);
  379. } else if (!locks_.Insert(fname)) {
  380. close(fd);
  381. result = Status::IOError("lock " + fname, "already held by process");
  382. } else if (LockOrUnlock(fd, true) == -1) {
  383. result = IOError("lock " + fname, errno);
  384. close(fd);
  385. locks_.Remove(fname);
  386. } else {
  387. PosixFileLock* my_lock = new PosixFileLock;
  388. my_lock->fd_ = fd;
  389. my_lock->name_ = fname;
  390. *lock = my_lock;
  391. }
  392. return result;
  393. }
  394. virtual Status UnlockFile(FileLock* lock) {
  395. PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  396. Status result;
  397. if (LockOrUnlock(my_lock->fd_, false) == -1) {
  398. result = IOError("unlock", errno);
  399. }
  400. locks_.Remove(my_lock->name_);
  401. close(my_lock->fd_);
  402. delete my_lock;
  403. return result;
  404. }
  405. virtual void Schedule(void (*function)(void*), void* arg);
  406. virtual void StartThread(void (*function)(void* arg), void* arg);
  407. virtual Status GetTestDirectory(std::string* result) {
  408. const char* env = getenv("TEST_TMPDIR");
  409. if (env && env[0] != '\0') {
  410. *result = env;
  411. } else {
  412. char buf[100];
  413. snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid()));
  414. *result = buf;
  415. }
  416. // Directory may already exist
  417. CreateDir(*result);
  418. return Status::OK();
  419. }
  420. static uint64_t gettid() {
  421. pthread_t tid = pthread_self();
  422. uint64_t thread_id = 0;
  423. memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
  424. return thread_id;
  425. }
  426. virtual Status NewLogger(const std::string& fname, Logger** result) {
  427. FILE* f = fopen(fname.c_str(), "w");
  428. if (f == NULL) {
  429. *result = NULL;
  430. return IOError(fname, errno);
  431. } else {
  432. *result = new PosixLogger(f, &PosixEnv::gettid);
  433. return Status::OK();
  434. }
  435. }
  436. virtual uint64_t NowMicros() {
  437. struct timeval tv;
  438. gettimeofday(&tv, NULL);
  439. return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
  440. }
  441. virtual void SleepForMicroseconds(int micros) {
  442. usleep(micros);
  443. }
  444. private:
  445. void PthreadCall(const char* label, int result) {
  446. if (result != 0) {
  447. fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  448. abort();
  449. }
  450. }
  451. // BGThread() is the body of the background thread
  452. void BGThread();
  453. static void* BGThreadWrapper(void* arg) {
  454. reinterpret_cast<PosixEnv*>(arg)->BGThread();
  455. return NULL;
  456. }
  457. pthread_mutex_t mu_;
  458. pthread_cond_t bgsignal_;
  459. pthread_t bgthread_;
  460. bool started_bgthread_;
  461. // Entry per Schedule() call
  462. struct BGItem { void* arg; void (*function)(void*); };
  463. typedef std::deque<BGItem> BGQueue;
  464. BGQueue queue_;
  465. PosixLockTable locks_;
  466. MmapLimiter mmap_limit_;
  467. };
  468. PosixEnv::PosixEnv() : started_bgthread_(false) {
  469. PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  470. PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
  471. }
  472. void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  473. PthreadCall("lock", pthread_mutex_lock(&mu_));
  474. // Start background thread if necessary
  475. if (!started_bgthread_) {
  476. started_bgthread_ = true;
  477. PthreadCall(
  478. "create thread",
  479. pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
  480. }
  481. // If the queue is currently empty, the background thread may currently be
  482. // waiting.
  483. if (queue_.empty()) {
  484. PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  485. }
  486. // Add to priority queue
  487. queue_.push_back(BGItem());
  488. queue_.back().function = function;
  489. queue_.back().arg = arg;
  490. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  491. }
  492. void PosixEnv::BGThread() {
  493. while (true) {
  494. // Wait until there is an item that is ready to run
  495. PthreadCall("lock", pthread_mutex_lock(&mu_));
  496. while (queue_.empty()) {
  497. PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
  498. }
  499. void (*function)(void*) = queue_.front().function;
  500. void* arg = queue_.front().arg;
  501. queue_.pop_front();
  502. PthreadCall("unlock", pthread_mutex_unlock(&mu_));
  503. (*function)(arg);
  504. }
  505. }
  namespace {

  // Bundles a function pointer with the argument it is to be called with.
  struct StartThreadState {
    void (*user_function)(void*);  // Function to call
    void* arg;                     // Argument passed to user_function
  };

  }  // namespace
  512. static void* StartThreadWrapper(void* arg) {
  513. StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
  514. state->user_function(state->arg);
  515. delete state;
  516. return NULL;
  517. }
  518. void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
  519. pthread_t t;
  520. StartThreadState* state = new StartThreadState;
  521. state->user_function = function;
  522. state->arg = arg;
  523. PthreadCall("start thread",
  524. pthread_create(&t, NULL, &StartThreadWrapper, state));
  525. }
  526. } // namespace
  527. static pthread_once_t once = PTHREAD_ONCE_INIT;
  528. static Env* default_env;
  529. static void InitDefaultEnv() { default_env = new PosixEnv; }
  530. Env* Env::Default() {
  531. pthread_once(&once, InitDefaultEnv);
  532. return default_env;
  533. }
  534. } // namespace leveldb