瀏覽代碼

Rework threading in env_posix.cc.

This commit replaces the use of pthreads in the POSIX port with std::thread
and port::Mutex + port::CondVar. This is intended to simplify porting
the env to a different platform.

The indirect use of pthreads in PosixLogger is replaced with
std:🧵:id(), based on an approach prototyped by @cmumfordx@.

The pthreads dependency in CMakeFiles is not removed, because some C++
standard library implementations must be linked against pthreads for
std::thread use. Figuring out this dependency is left for future work.

Switching away from pthreads also fixes
https://github.com/google/leveldb/issues/381

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=212478311
main
costan 6 年之前
committed by Victor Costan
父節點
當前提交
73d5834ece
共有 2 個檔案被更改,包括 80 行新增91 行删除
  1. +59
    -81
      util/env_posix.cc
  2. +21
    -10
      util/posix_logger.h

+ 59
- 81
util/env_posix.cc 查看文件

@ -6,7 +6,6 @@
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
#include <pthread.h> #include <pthread.h>
#include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <sys/mman.h> #include <sys/mman.h>
@ -19,9 +18,10 @@
#include <atomic> #include <atomic>
#include <cstring> #include <cstring>
#include <deque>
#include <limits> #include <limits>
#include <queue>
#include <set> #include <set>
#include <thread>
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/slice.h" #include "leveldb/slice.h"
@ -600,20 +600,13 @@ class PosixEnv : public Env {
return Status::OK(); return Status::OK();
} }
static uint64_t gettid() {
pthread_t tid = pthread_self();
uint64_t thread_id = 0;
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
return thread_id;
}
virtual Status NewLogger(const std::string& fname, Logger** result) { virtual Status NewLogger(const std::string& fname, Logger** result) {
FILE* f = fopen(fname.c_str(), "w"); FILE* f = fopen(fname.c_str(), "w");
if (f == nullptr) { if (f == nullptr) {
*result = nullptr; *result = nullptr;
return PosixError(fname, errno); return PosixError(fname, errno);
} else { } else {
*result = new PosixLogger(f, &PosixEnv::gettid); *result = new PosixLogger(f);
return Status::OK(); return Status::OK();
} }
} }
@ -629,29 +622,33 @@ class PosixEnv : public Env {
} }
private: private:
void PthreadCall(const char* label, int result) { void BackgroundThreadMain();
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
abort();
}
}
// BGThread() is the body of the background thread static void BackgroundThreadEntryPoint(PosixEnv* env) {
void BGThread(); env->BackgroundThreadMain();
static void* BGThreadWrapper(void* arg) {
reinterpret_cast<PosixEnv*>(arg)->BGThread();
return nullptr;
} }
pthread_mutex_t mu_; // Stores the work item data in a Schedule() call.
pthread_cond_t bgsignal_; //
pthread_t bgthread_; // Instances are constructed on the thread calling Schedule() and used on the
bool started_bgthread_; // background thread.
//
// This structure is thread-safe beacuse it is immutable.
struct BackgroundWorkItem {
explicit BackgroundWorkItem(void (*function)(void* arg), void* arg)
: function(function), arg(arg) {}
void (* const function)(void*);
void* const arg;
};
port::Mutex background_work_mutex_;
port::CondVar background_work_cv_ GUARDED_BY(background_work_mutex_);
bool started_background_thread_ GUARDED_BY(background_work_mutex_);
// Entry per Schedule() call std::queue<BackgroundWorkItem> background_work_queue_
struct BGItem { void* arg; void (*function)(void*); }; GUARDED_BY(background_work_mutex_);
typedef std::deque<BGItem> BGQueue;
BGQueue queue_;
PosixLockTable locks_; PosixLockTable locks_;
Limiter mmap_limit_; Limiter mmap_limit_;
@ -687,79 +684,60 @@ static intptr_t MaxOpenFiles() {
} }
PosixEnv::PosixEnv() PosixEnv::PosixEnv()
: started_bgthread_(false), : background_work_cv_(&background_work_mutex_),
started_background_thread_(false),
mmap_limit_(MaxMmaps()), mmap_limit_(MaxMmaps()),
fd_limit_(MaxOpenFiles()) { fd_limit_(MaxOpenFiles()) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
} }
void PosixEnv::Schedule(void (*function)(void*), void* arg) { void PosixEnv::Schedule(
PthreadCall("lock", pthread_mutex_lock(&mu_)); void (*background_work_function)(void* background_work_arg),
void* background_work_arg) {
MutexLock lock(&background_work_mutex_);
// Start background thread if necessary // Start the background thread, if we haven't done so already.
if (!started_bgthread_) { if (!started_background_thread_) {
started_bgthread_ = true; started_background_thread_ = true;
PthreadCall( std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this);
"create thread", background_thread.detach();
pthread_create(&bgthread_, nullptr, &PosixEnv::BGThreadWrapper, this));
} }
// If the queue is currently empty, the background thread may currently be // If the queue is empty, the background thread may be waiting for work.
// waiting. if (background_work_queue_.empty()) {
if (queue_.empty()) { background_work_cv_.Signal();
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
} }
// Add to priority queue background_work_queue_.emplace(background_work_function, background_work_arg);
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
} }
void PosixEnv::BGThread() { void PosixEnv::BackgroundThreadMain() {
while (true) { while (true) {
// Wait until there is an item that is ready to run background_work_mutex_.Lock();
PthreadCall("lock", pthread_mutex_lock(&mu_)); // Wait until there is work to be done.
while (queue_.empty()) { while (background_work_queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); background_work_cv_.Wait();
} }
void (*function)(void*) = queue_.front().function; assert(!background_work_queue_.empty());
void* arg = queue_.front().arg; auto background_work_function =
queue_.pop_front(); background_work_queue_.front().function;
void* background_work_arg = background_work_queue_.front().arg;
background_work_queue_.pop();
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); background_work_mutex_.Unlock();
(*function)(arg); background_work_function(background_work_arg);
} }
} }
namespace { } // namespace
struct StartThreadState {
void (*user_function)(void*);
void* arg;
};
}
static void* StartThreadWrapper(void* arg) {
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
state->user_function(state->arg);
delete state;
return nullptr;
}
void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { void PosixEnv::StartThread(void (*thread_main)(void* thread_main_arg),
pthread_t t; void* thread_main_arg) {
StartThreadState* state = new StartThreadState; std::thread new_thread(thread_main, thread_main_arg);
state->user_function = function; new_thread.detach();
state->arg = arg;
PthreadCall("start thread",
pthread_create(&t, nullptr, &StartThreadWrapper, state));
} }
} // namespace
static pthread_once_t once = PTHREAD_ONCE_INIT; static pthread_once_t once = PTHREAD_ONCE_INIT;
static Env* default_env; static Env* default_env;
static void InitDefaultEnv() { default_env = new PosixEnv; } static void InitDefaultEnv() { default_env = new PosixEnv; }

+ 21
- 10
util/posix_logger.h 查看文件

@ -11,10 +11,11 @@
#include <sys/time.h> #include <sys/time.h>
#include <cassert> #include <cassert>
#include <cinttypes>
#include <cstdarg> #include <cstdarg>
#include <cstdio> #include <cstdio>
#include <ctime> #include <ctime>
#include <sstream>
#include <thread>
#include "leveldb/env.h" #include "leveldb/env.h"
@ -22,7 +23,10 @@ namespace leveldb {
class PosixLogger final : public Logger { class PosixLogger final : public Logger {
public: public:
PosixLogger(FILE* fp, uint64_t (*gettid)()) : fp_(fp), gettid_(gettid) { // Creates a logger that writes to the given file.
//
// The PosixLogger instance takes ownership of the file handle.
explicit PosixLogger(std::FILE* fp) : fp_(fp) {
assert(fp != nullptr); assert(fp != nullptr);
} }
@ -38,7 +42,14 @@ class PosixLogger final : public Logger {
struct std::tm now_components; struct std::tm now_components;
::localtime_r(&now_seconds, &now_components); ::localtime_r(&now_seconds, &now_components);
const uint64_t thread_id = (*gettid_)(); // Record the thread ID.
constexpr const int kMaxThreadIdSize = 32;
std::ostringstream thread_stream;
thread_stream << std::this_thread::get_id();
std::string thread_id = thread_stream.str();
if (thread_id.size() > kMaxThreadIdSize) {
thread_id.resize(kMaxThreadIdSize);
}
// We first attempt to print into a stack-allocated buffer. If this attempt // We first attempt to print into a stack-allocated buffer. If this attempt
// fails, we make a second attempt with a dynamically allocated buffer. // fails, we make a second attempt with a dynamically allocated buffer.
@ -57,7 +68,7 @@ class PosixLogger final : public Logger {
// Print the header into the buffer. // Print the header into the buffer.
int buffer_offset = snprintf( int buffer_offset = snprintf(
buffer, buffer_size, buffer, buffer_size,
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %span>" PRIx64 " ", "%04d/%02d/%02d-%02d:%02d:%02d.%06d %s",
now_components.tm_year + 1900, now_components.tm_year + 1900,
now_components.tm_mon + 1, now_components.tm_mon + 1,
now_components.tm_mday, now_components.tm_mday,
@ -65,12 +76,13 @@ class PosixLogger final : public Logger {
now_components.tm_min, now_components.tm_min,
now_components.tm_sec, now_components.tm_sec,
static_cast<int>(now_timeval.tv_usec), static_cast<int>(now_timeval.tv_usec),
thread_id); thread_id.c_str());
// The header can be at most 48 characters (10 date + 15 time + 3 spacing // The header can be at most 28 characters (10 date + 15 time +
// + 20 thread ID), which should fit comfortably into the static buffer. // 3 spacing) plus the thread ID, which should fit comfortably into the
assert(buffer_offset <= 48); // static buffer.
static_assert(48 < kStackBufferSize, assert(buffer_offset <= 28 + kMaxThreadIdSize);
static_assert(28 + kMaxThreadIdSize < kStackBufferSize,
"stack-allocated buffer may not fit the message header"); "stack-allocated buffer may not fit the message header");
assert(buffer_offset < buffer_size); assert(buffer_offset < buffer_size);
@ -120,7 +132,6 @@ class PosixLogger final : public Logger {
private: private:
std::FILE* const fp_; std::FILE* const fp_;
uint64_t (* const gettid_)(); // Return the thread id for the current thread.
}; };
} // namespace leveldb } // namespace leveldb

||||||
x
 
000:0
Loading…
取消
儲存