From 7d8e41e49b8fddda66a2c5f0a6a47f1a916e8d26 Mon Sep 17 00:00:00 2001
From: costan
Date: Mon, 11 Mar 2019 13:04:53 -0700
Subject: [PATCH] leveldb: Replace AtomicPointer with std::atomic.

This CL removes AtomicPointer from leveldb's port interface. Its usage is
replaced with std::atomic<> from the C++11 standard library.

AtomicPointer was used to wrap flags, numbers, and pointers, so its instances
are replaced with std::atomic<bool>, std::atomic<int>, std::atomic<size_t>
and std::atomic<Node*>.

This CL does not revise the memory ordering. AtomicPointer's methods are
replaced mechanically with their std::atomic equivalents, even when the
underlying usage is incorrect. (Example: DBImpl::has_imm_ is written using
release stores, even though it is always read using relaxed ordering.)
Revising the memory ordering is left for future CLs.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=237865146
---
 db/db_bench.cc        |  20 ------
 db/db_impl.cc         |  33 +++++-----
 db/db_impl.h          |   7 ++-
 db/db_test.cc         | 135 ++++++++++++++++++++-------------------
 db/skiplist.h         |  77 +++++++++++------------
 db/skiplist_test.cc   |  21 ++++---
 port/atomic_pointer.h | 171 --------------------------------------------------
 port/port_example.h   |  29 ---------
 port/port_stdcxx.h    |   1 -
 util/arena.cc         |   5 +-
 util/arena.h          |  17 ++---
 util/env_test.cc      |  57 +++++++++--------
 util/env_windows.cc   |  56 +++++++----------
 13 files changed, 210 insertions(+), 419 deletions(-)
 delete mode 100644 port/atomic_pointer.h

diff --git a/db/db_bench.cc b/db/db_bench.cc
index 115cf45..f9403f4 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -34,7 +34,6 @@
 //      seekrandom    -- N random seeks
 //      open          -- cost of opening a DB
 //      crc32c        -- repeated crc32c of 4K of data
-//      acquireload   -- load N*1000 times
 //   Meta operations:
 //      compact     -- Compact the entire DB
 //      stats       -- Print DB stats
@@ -57,7 +56,6 @@ static const char* FLAGS_benchmarks =
     "crc32c,"
     "snappycomp,"
     "snappyuncomp,"
-    "acquireload,"
     ;
 
 // Number of key/values to place in database
@@ -510,8 +508,6 @@ class Benchmark {
       method = &Benchmark::Compact;
     } else if (name == Slice("crc32c")) {
       method = &Benchmark::Crc32c;
-    } else if (name == Slice("acquireload")) {
-      method = &Benchmark::AcquireLoad;
     } else if (name == Slice("snappycomp")) {
       method = &Benchmark::SnappyCompress;
     } else if (name == Slice("snappyuncomp")) {
@@ -639,22 +635,6 @@ class Benchmark {
     thread->stats.AddMessage(label);
   }
 
-  void AcquireLoad(ThreadState* thread) {
-    int dummy;
-    port::AtomicPointer ap(&dummy);
-    int count = 0;
-    void *ptr = nullptr;
-    thread->stats.AddMessage("(each op is 1000 loads)");
-    while (count < 100000) {
-      for (int i = 0; i < 1000; i++) {
-        ptr = ap.Acquire_Load();
-      }
-      count++;
-      thread->stats.FinishedSingleOp();
-    }
-    if (ptr == nullptr) exit(1); // Disable unused variable warning.
- } - void SnappyCompress(ThreadState* thread) { RandomGenerator gen; Slice input = gen.Generate(Options().block_size); diff --git a/db/db_impl.cc b/db/db_impl.cc index fefb883..3468862 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -132,10 +133,11 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) dbname_(dbname), table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), db_lock_(nullptr), - shutting_down_(nullptr), + shutting_down_(false), background_work_finished_signal_(&mutex_), mem_(nullptr), imm_(nullptr), + has_imm_(false), logfile_(nullptr), logfile_number_(0), log_(nullptr), @@ -144,14 +146,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) background_compaction_scheduled_(false), manual_compaction_(nullptr), versions_(new VersionSet(dbname_, &options_, table_cache_, - &internal_comparator_)) { - has_imm_.Release_Store(nullptr); -} + &internal_comparator_)) {} DBImpl::~DBImpl() { - // Wait for background work to finish + // Wait for background work to finish. mutex_.Lock(); - shutting_down_.Release_Store(this); // Any non-null value is ok + shutting_down_.store(true, std::memory_order_release); while (background_compaction_scheduled_) { background_work_finished_signal_.Wait(); } @@ -547,7 +547,7 @@ void DBImpl::CompactMemTable() { Status s = WriteLevel0Table(imm_, &edit, base); base->Unref(); - if (s.ok() && shutting_down_.Acquire_Load()) { + if (s.ok() && shutting_down_.load(std::memory_order_acquire)) { s = Status::IOError("Deleting DB during memtable compaction"); } @@ -562,7 +562,7 @@ void DBImpl::CompactMemTable() { // Commit to the new state imm_->Unref(); imm_ = nullptr; - has_imm_.Release_Store(nullptr); + has_imm_.store(false, std::memory_order_release); DeleteObsoleteFiles(); } else { RecordBackgroundError(s); @@ -610,7 +610,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin, } MutexLock l(&mutex_); - while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { + while (!manual.done && !shutting_down_.load(std::memory_order_acquire) && + bg_error_.ok()) { if (manual_compaction_ == nullptr) { // Idle manual_compaction_ = &manual; MaybeScheduleCompaction(); @@ -652,7 +653,7 @@ void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (background_compaction_scheduled_) { // Already scheduled - } else if (shutting_down_.Acquire_Load()) { + } else if (shutting_down_.load(std::memory_order_acquire)) { // DB is being deleted; no more background compactions } else if (!bg_error_.ok()) { // Already got an error; no more changes @@ -673,7 +674,7 @@ void DBImpl::BGWork(void* db) { void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(background_compaction_scheduled_); - if (shutting_down_.Acquire_Load()) { + if (shutting_down_.load(std::memory_order_acquire)) { // No more background work when shutting down. } else if (!bg_error_.ok()) { // No more background work after a background error. 
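Note (illustration, not part of the patch): the conversion above is mechanical. For a flag such as DBImpl::shutting_down_, the old AtomicPointer idiom of storing any non-null value with Release_Store() and testing Acquire_Load() against nullptr becomes a std::atomic<bool> with explicit memory orders. A minimal self-contained sketch, with illustrative class and method names:

    #include <atomic>

    // Illustrative stand-in for the DBImpl shutdown flag; the names here
    // are not from leveldb.
    class ShutdownFlagExample {
     public:
      // Old code: shutting_down_.Release_Store(this);  // any non-null value
      void RequestShutdown() {
        shutting_down_.store(true, std::memory_order_release);
      }

      // Old code: shutting_down_.Acquire_Load() != nullptr
      bool IsShuttingDown() const {
        return shutting_down_.load(std::memory_order_acquire);
      }

     private:
      std::atomic<bool> shutting_down_{false};
    };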
@@ -752,7 +753,7 @@ void DBImpl::BackgroundCompaction() { if (status.ok()) { // Done - } else if (shutting_down_.Acquire_Load()) { + } else if (shutting_down_.load(std::memory_order_acquire)) { // Ignore compaction errors found during shutting down } else { Log(options_.info_log, @@ -919,9 +920,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber; - for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { + for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) { // Prioritize immutable compaction work - if (has_imm_.NoBarrier_Load() != nullptr) { + if (has_imm_.load(std::memory_order_relaxed)) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); if (imm_ != nullptr) { @@ -1014,7 +1015,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { input->Next(); } - if (status.ok() && shutting_down_.Acquire_Load()) { + if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { status = Status::IOError("Deleting DB during compaction"); } if (status.ok() && compact->builder != nullptr) { @@ -1378,7 +1379,7 @@ Status DBImpl::MakeRoomForWrite(bool force) { logfile_number_ = new_log_number; log_ = new log::Writer(lfile); imm_ = mem_; - has_imm_.Release_Store(imm_); + has_imm_.store(true, std::memory_order_release); mem_ = new MemTable(internal_comparator_); mem_->Ref(); force = false; // Do not force another compaction if have room diff --git a/db/db_impl.h b/db/db_impl.h index 00e800a..ca00d42 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -5,8 +5,11 @@ #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ #define STORAGE_LEVELDB_DB_DB_IMPL_H_ +#include #include #include +#include + #include "db/dbformat.h" #include "db/log_writer.h" #include "db/snapshot.h" @@ -136,11 +139,11 @@ class DBImpl : public DB { // State below is protected by mutex_ port::Mutex mutex_; - port::AtomicPointer shutting_down_; + std::atomic shutting_down_; port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_); MemTable* mem_; MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted - port::AtomicPointer has_imm_; // So bg thread can detect non-null imm_ + std::atomic has_imm_; // So bg thread can detect non-null imm_ WritableFile* logfile_; uint64_t logfile_number_ GUARDED_BY(mutex_); log::Writer* log_; diff --git a/db/db_test.cc b/db/db_test.cc index 894ed23..e889a74 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2,6 +2,9 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include + #include "leveldb/db.h" #include "leveldb/filter_policy.h" #include "db/db_impl.h" @@ -61,7 +64,7 @@ class AtomicCounter { void DelayMilliseconds(int millis) { Env::Default()->SleepForMicroseconds(millis * 1000); } -} +} // namespace // Test Env to override default Env behavior for testing. class TestEnv : public EnvWrapper { @@ -93,45 +96,45 @@ class TestEnv : public EnvWrapper { bool ignore_dot_files_; }; -// Special Env used to delay background operations +// Special Env used to delay background operations. class SpecialEnv : public EnvWrapper { public: // sstable/log Sync() calls are blocked while this pointer is non-null. - port::AtomicPointer delay_data_sync_; + std::atomic delay_data_sync_; // sstable/log Sync() calls return an error. 
- port::AtomicPointer data_sync_error_; + std::atomic data_sync_error_; // Simulate no-space errors while this pointer is non-null. - port::AtomicPointer no_space_; + std::atomic no_space_; // Simulate non-writable file system while this pointer is non-null. - port::AtomicPointer non_writable_; + std::atomic non_writable_; // Force sync of manifest files to fail while this pointer is non-null. - port::AtomicPointer manifest_sync_error_; + std::atomic manifest_sync_error_; // Force write to manifest files to fail while this pointer is non-null. - port::AtomicPointer manifest_write_error_; + std::atomic manifest_write_error_; bool count_random_reads_; AtomicCounter random_read_counter_; - explicit SpecialEnv(Env* base) : EnvWrapper(base) { - delay_data_sync_.Release_Store(nullptr); - data_sync_error_.Release_Store(nullptr); - no_space_.Release_Store(nullptr); - non_writable_.Release_Store(nullptr); - count_random_reads_ = false; - manifest_sync_error_.Release_Store(nullptr); - manifest_write_error_.Release_Store(nullptr); + explicit SpecialEnv(Env* base) : EnvWrapper(base), + delay_data_sync_(false), + data_sync_error_(false), + no_space_(false), + non_writable_(false), + manifest_sync_error_(false), + manifest_write_error_(false), + count_random_reads_(false) { } Status NewWritableFile(const std::string& f, WritableFile** r) { class DataFile : public WritableFile { private: - SpecialEnv* env_; - WritableFile* base_; + SpecialEnv* const env_; + WritableFile* const base_; public: DataFile(SpecialEnv* env, WritableFile* base) @@ -140,7 +143,7 @@ class SpecialEnv : public EnvWrapper { } ~DataFile() { delete base_; } Status Append(const Slice& data) { - if (env_->no_space_.Acquire_Load() != nullptr) { + if (env_->no_space_.load(std::memory_order_acquire)) { // Drop writes on the floor return Status::OK(); } else { @@ -150,10 +153,10 @@ class SpecialEnv : public EnvWrapper { Status Close() { return base_->Close(); } Status Flush() { return base_->Flush(); } Status Sync() { - if (env_->data_sync_error_.Acquire_Load() != nullptr) { + if (env_->data_sync_error_.load(std::memory_order_acquire)) { return Status::IOError("simulated data sync error"); } - while (env_->delay_data_sync_.Acquire_Load() != nullptr) { + while (env_->delay_data_sync_.load(std::memory_order_acquire)) { DelayMilliseconds(100); } return base_->Sync(); @@ -167,7 +170,7 @@ class SpecialEnv : public EnvWrapper { ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { } ~ManifestFile() { delete base_; } Status Append(const Slice& data) { - if (env_->manifest_write_error_.Acquire_Load() != nullptr) { + if (env_->manifest_write_error_.load(std::memory_order_acquire)) { return Status::IOError("simulated writer error"); } else { return base_->Append(data); @@ -176,7 +179,7 @@ class SpecialEnv : public EnvWrapper { Status Close() { return base_->Close(); } Status Flush() { return base_->Flush(); } Status Sync() { - if (env_->manifest_sync_error_.Acquire_Load() != nullptr) { + if (env_->manifest_sync_error_.load(std::memory_order_acquire)) { return Status::IOError("simulated sync error"); } else { return base_->Sync(); @@ -184,7 +187,7 @@ class SpecialEnv : public EnvWrapper { } }; - if (non_writable_.Acquire_Load() != nullptr) { + if (non_writable_.load(std::memory_order_acquire)) { return Status::IOError("simulated write error"); } @@ -424,7 +427,7 @@ class DBTest { ASSERT_TRUE( db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level), &property)); - return atoi(property.c_str()); + return 
std::stoi(property); } int TotalTableFiles() { @@ -587,11 +590,13 @@ TEST(DBTest, GetFromImmutableLayer) { ASSERT_OK(Put("foo", "v1")); ASSERT_EQ("v1", Get("foo")); - env_->delay_data_sync_.Release_Store(env_); // Block sync calls - Put("k1", std::string(100000, 'x')); // Fill memtable - Put("k2", std::string(100000, 'y')); // Trigger compaction + // Block sync calls. + env_->delay_data_sync_.store(true, std::memory_order_release); + Put("k1", std::string(100000, 'x')); // Fill memtable. + Put("k2", std::string(100000, 'y')); // Trigger compaction. ASSERT_EQ("v1", Get("foo")); - env_->delay_data_sync_.Release_Store(nullptr); // Release sync calls + // Release sync calls. + env_->delay_data_sync_.store(false, std::memory_order_release); } while (ChangeOptions()); } @@ -608,7 +613,7 @@ TEST(DBTest, GetMemUsage) { ASSERT_OK(Put("foo", "v1")); std::string val; ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val)); - int mem_usage = atoi(val.c_str()); + int mem_usage = std::stoi(val); ASSERT_GT(mem_usage, 0); ASSERT_LT(mem_usage, 5*1024*1024); } while (ChangeOptions()); @@ -1106,7 +1111,7 @@ TEST(DBTest, RepeatedWritesToSameKey) { for (int i = 0; i < 5 * kMaxFiles; i++) { Put("key", value); ASSERT_LE(TotalTableFiles(), kMaxFiles); - fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles()); + fprintf(stderr, "after %d: %d files\n", i + 1, TotalTableFiles()); } } @@ -1271,7 +1276,7 @@ TEST(DBTest, IteratorPinsRef) { // Write to force compactions Put("foo", "newvalue1"); for (int i = 0; i < 100; i++) { - ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values + ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values } Put("foo", "newvalue2"); @@ -1459,21 +1464,21 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) { TEST(DBTest, L0_CompactionBug_Issue44_b) { Reopen(); - Put("",""); + Put("", ""); Reopen(); Delete("e"); - Put("",""); + Put("", ""); Reopen(); Put("c", "cv"); Reopen(); - Put("",""); + Put("", ""); Reopen(); - Put("",""); + Put("", ""); DelayMilliseconds(1000); // Wait for compaction to finish Reopen(); - Put("d","dv"); + Put("d", "dv"); Reopen(); - Put("",""); + Put("", ""); Reopen(); Delete("d"); Delete("b"); @@ -1711,13 +1716,14 @@ TEST(DBTest, NoSpace) { ASSERT_EQ("v1", Get("foo")); Compact("a", "z"); const int num_files = CountFiles(); - env_->no_space_.Release_Store(env_); // Force out-of-space errors + // Force out-of-space errors. + env_->no_space_.store(true, std::memory_order_release); for (int i = 0; i < 10; i++) { for (int level = 0; level < config::kNumLevels-1; level++) { dbfull()->TEST_CompactRange(level, nullptr, nullptr); } } - env_->no_space_.Release_Store(nullptr); + env_->no_space_.store(false, std::memory_order_release); ASSERT_LT(CountFiles(), num_files + 3); } @@ -1727,7 +1733,8 @@ TEST(DBTest, NonWritableFileSystem) { options.env = env_; Reopen(&options); ASSERT_OK(Put("foo", "v1")); - env_->non_writable_.Release_Store(env_); // Force errors for new files + // Force errors for new files. 
+ env_->non_writable_.store(true, std::memory_order_release); std::string big(100000, 'x'); int errors = 0; for (int i = 0; i < 20; i++) { @@ -1738,7 +1745,7 @@ TEST(DBTest, NonWritableFileSystem) { } } ASSERT_GT(errors, 0); - env_->non_writable_.Release_Store(nullptr); + env_->non_writable_.store(false, std::memory_order_release); } TEST(DBTest, WriteSyncError) { @@ -1748,7 +1755,7 @@ TEST(DBTest, WriteSyncError) { Options options = CurrentOptions(); options.env = env_; Reopen(&options); - env_->data_sync_error_.Release_Store(env_); + env_->data_sync_error_.store(true, std::memory_order_release); // (b) Normal write should succeed WriteOptions w; @@ -1762,7 +1769,7 @@ TEST(DBTest, WriteSyncError) { ASSERT_EQ("NOT_FOUND", Get("k2")); // (d) make sync behave normally - env_->data_sync_error_.Release_Store(nullptr); + env_->data_sync_error_.store(false, std::memory_order_release); // (e) Do a non-sync write; should fail w.sync = false; @@ -1782,7 +1789,7 @@ TEST(DBTest, ManifestWriteError) { // We iterate twice. In the second iteration, everything is the // same except the log record never makes it to the MANIFEST file. for (int iter = 0; iter < 2; iter++) { - port::AtomicPointer* error_type = (iter == 0) + std::atomic* error_type = (iter == 0) ? &env_->manifest_sync_error_ : &env_->manifest_write_error_; @@ -1802,12 +1809,12 @@ TEST(DBTest, ManifestWriteError) { ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level // Merging compaction (will fail) - error_type->Release_Store(env_); + error_type->store(true, std::memory_order_release); dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail ASSERT_EQ("bar", Get("foo")); // Recovery: should not lose data - error_type->Release_Store(nullptr); + error_type->store(false, std::memory_order_release); Reopen(&options); ASSERT_EQ("bar", Get("foo")); } @@ -1878,7 +1885,7 @@ TEST(DBTest, BloomFilter) { dbfull()->TEST_CompactMemTable(); // Prevent auto compactions triggered by seeks - env_->delay_data_sync_.Release_Store(env_); + env_->delay_data_sync_.store(true, std::memory_order_release); // Lookup present keys. Should rarely read from small sstable. env_->random_read_counter_.Reset(); @@ -1899,7 +1906,7 @@ TEST(DBTest, BloomFilter) { fprintf(stderr, "%d missing => %d reads\n", N, reads); ASSERT_LE(reads, 3*N/100); - env_->delay_data_sync_.Release_Store(nullptr); + env_->delay_data_sync_.store(false, std::memory_order_release); Close(); delete options.block_cache; delete options.filter_policy; @@ -1914,9 +1921,9 @@ static const int kNumKeys = 1000; struct MTState { DBTest* test; - port::AtomicPointer stop; - port::AtomicPointer counter[kNumThreads]; - port::AtomicPointer thread_done[kNumThreads]; + std::atomic stop; + std::atomic counter[kNumThreads]; + std::atomic thread_done[kNumThreads]; }; struct MTThread { @@ -1928,13 +1935,13 @@ static void MTThreadBody(void* arg) { MTThread* t = reinterpret_cast(arg); int id = t->id; DB* db = t->state->test->db_; - uintptr_t counter = 0; + int counter = 0; fprintf(stderr, "... 
starting thread %d\n", id); Random rnd(1000 + id); std::string value; char valbuf[1500]; - while (t->state->stop.Acquire_Load() == nullptr) { - t->state->counter[id].Release_Store(reinterpret_cast(counter)); + while (!t->state->stop.load(std::memory_order_acquire)) { + t->state->counter[id].store(counter, std::memory_order_release); int key = rnd.Uniform(kNumKeys); char keybuf[20]; @@ -1959,14 +1966,13 @@ static void MTThreadBody(void* arg) { ASSERT_EQ(k, key); ASSERT_GE(w, 0); ASSERT_LT(w, kNumThreads); - ASSERT_LE(static_cast(c), reinterpret_cast( - t->state->counter[w].Acquire_Load())); + ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire)); } } counter++; } - t->state->thread_done[id].Release_Store(t); - fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter)); + t->state->thread_done[id].store(true, std::memory_order_release); + fprintf(stderr, "... stopping thread %d after %d ops\n", id, counter); } } // namespace @@ -1976,10 +1982,10 @@ TEST(DBTest, MultiThreaded) { // Initialize state MTState mt; mt.test = this; - mt.stop.Release_Store(0); + mt.stop.store(false, std::memory_order_release); for (int id = 0; id < kNumThreads; id++) { - mt.counter[id].Release_Store(0); - mt.thread_done[id].Release_Store(0); + mt.counter[id].store(false, std::memory_order_release); + mt.thread_done[id].store(false, std::memory_order_release); } // Start threads @@ -1994,9 +2000,9 @@ TEST(DBTest, MultiThreaded) { DelayMilliseconds(kTestSeconds * 1000); // Stop the threads and wait for them to finish - mt.stop.Release_Store(&mt); + mt.stop.store(true, std::memory_order_release); for (int id = 0; id < kNumThreads; id++) { - while (mt.thread_done[id].Acquire_Load() == nullptr) { + while (!mt.thread_done[id].load(std::memory_order_acquire)) { DelayMilliseconds(100); } } @@ -2100,6 +2106,7 @@ class ModelDB: public DB { virtual Slice key() const { return iter_->first; } virtual Slice value() const { return iter_->second; } virtual Status status() const { return Status::OK(); } + private: const KVMap* const map_; const bool owned_; // Do we own map_ diff --git a/db/skiplist.h b/db/skiplist.h index b806ce0..7ac914b 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -27,9 +27,10 @@ // // ... prev vs. next pointer ordering ... -#include -#include -#include "port/port.h" +#include +#include +#include + #include "util/arena.h" #include "util/random.h" @@ -105,11 +106,10 @@ class SkipList { // Modified only by Insert(). Read racily by readers, but stale // values are ok. - port::AtomicPointer max_height_; // Height of the entire list + std::atomic max_height_; // Height of the entire list inline int GetMaxHeight() const { - return static_cast( - reinterpret_cast(max_height_.NoBarrier_Load())); + return max_height_.load(std::memory_order_relaxed); } // Read/written only by Insert(). @@ -144,7 +144,7 @@ class SkipList { // Implementation details follow template -struct SkipList::Node { +struct SkipList::Node { explicit Node(const Key& k) : key(k) { } Key const key; @@ -155,63 +155,63 @@ struct SkipList::Node { assert(n >= 0); // Use an 'acquire load' so that we observe a fully initialized // version of the returned Node. - return reinterpret_cast(next_[n].Acquire_Load()); + return next_[n].load(std::memory_order_acquire); } void SetNext(int n, Node* x) { assert(n >= 0); // Use a 'release store' so that anybody who reads through this // pointer observes a fully initialized version of the inserted node. 
- next_[n].Release_Store(x); + next_[n].store(x, std::memory_order_release); } // No-barrier variants that can be safely used in a few locations. Node* NoBarrier_Next(int n) { assert(n >= 0); - return reinterpret_cast(next_[n].NoBarrier_Load()); + return next_[n].load(std::memory_order_relaxed); } void NoBarrier_SetNext(int n, Node* x) { assert(n >= 0); - next_[n].NoBarrier_Store(x); + next_[n].store(x, std::memory_order_relaxed); } private: // Array of length equal to the node height. next_[0] is lowest level link. - port::AtomicPointer next_[1]; + std::atomic next_[1]; }; template -typename SkipList::Node* -SkipList::NewNode(const Key& key, int height) { - char* mem = arena_->AllocateAligned( - sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1)); - return new (mem) Node(key); +typename SkipList::Node* +SkipList::NewNode(const Key& key, int height) { + char* const node_memory = arena_->AllocateAligned( + sizeof(Node) + sizeof(std::atomic) * (height - 1)); + return new (node_memory) Node(key); } template -inline SkipList::Iterator::Iterator(const SkipList* list) { +inline SkipList::Iterator::Iterator(const SkipList* list) { list_ = list; node_ = nullptr; } template -inline bool SkipList::Iterator::Valid() const { +inline bool SkipList::Iterator::Valid() const { return node_ != nullptr; } template -inline const Key& SkipList::Iterator::key() const { +inline const Key& SkipList::Iterator::key() const { assert(Valid()); return node_->key; } template -inline void SkipList::Iterator::Next() { +inline void SkipList::Iterator::Next() { assert(Valid()); node_ = node_->Next(0); } template -inline void SkipList::Iterator::Prev() { +inline void SkipList::Iterator::Prev() { // Instead of using explicit "prev" links, we just search for the // last node that falls before key. 
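Note (illustration, not part of the patch): the acquire/release pair on the next_ links is a publication pattern. The sketch below, with simplified illustrative types, shows why a reader that acquire-loads a pointer published by a release store is guaranteed to observe the fully initialized node, and why the list height only needs relaxed ordering, as the comment in Insert() further down explains.

    #include <atomic>
    #include <cstdio>

    // Simplified stand-in for SkipList<Key, Comparator>::Node; leveldb's
    // node actually stores one std::atomic<Node*> link per level.
    struct DemoNode {
      int key = 0;
      std::atomic<DemoNode*> next{nullptr};
    };

    // Writer: initialize the node completely, then publish it with a
    // release store (the analogue of Node::SetNext()).
    void Publish(std::atomic<DemoNode*>& head, DemoNode* node, int key) {
      node->key = key;                              // plain write first
      head.store(node, std::memory_order_release);  // then publish
    }

    // Reader: an acquire load (the analogue of Node::Next()) synchronizes
    // with the release store, so node->key is guaranteed to be initialized.
    void Visit(const std::atomic<DemoNode*>& head) {
      DemoNode* node = head.load(std::memory_order_acquire);
      if (node != nullptr) {
        std::printf("key = %d\n", node->key);
      }
    }

    // The list height can be read with relaxed ordering: a stale value only
    // makes a reader start one level too low or follow a still-null link,
    // both of which the search loops handle.
    int CurrentHeight(const std::atomic<int>& max_height) {
      return max_height.load(std::memory_order_relaxed);
    }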
assert(Valid()); @@ -222,17 +222,17 @@ inline void SkipList::Iterator::Prev() { } template -inline void SkipList::Iterator::Seek(const Key& target) { +inline void SkipList::Iterator::Seek(const Key& target) { node_ = list_->FindGreaterOrEqual(target, nullptr); } template -inline void SkipList::Iterator::SeekToFirst() { +inline void SkipList::Iterator::SeekToFirst() { node_ = list_->head_->Next(0); } template -inline void SkipList::Iterator::SeekToLast() { +inline void SkipList::Iterator::SeekToLast() { node_ = list_->FindLast(); if (node_ == list_->head_) { node_ = nullptr; @@ -240,7 +240,7 @@ inline void SkipList::Iterator::SeekToLast() { } template -int SkipList::RandomHeight() { +int SkipList::RandomHeight() { // Increase height with probability 1 in kBranching static const unsigned int kBranching = 4; int height = 1; @@ -253,14 +253,15 @@ int SkipList::RandomHeight() { } template -bool SkipList::KeyIsAfterNode(const Key& key, Node* n) const { +bool SkipList::KeyIsAfterNode(const Key& key, Node* n) const { // null n is considered infinite return (n != nullptr) && (compare_(n->key, key) < 0); } template -typename SkipList::Node* SkipList::FindGreaterOrEqual(const Key& key, Node** prev) - const { +typename SkipList::Node* +SkipList::FindGreaterOrEqual(const Key& key, + Node** prev) const { Node* x = head_; int level = GetMaxHeight() - 1; while (true) { @@ -281,8 +282,8 @@ typename SkipList::Node* SkipList::FindGreaterOr } template -typename SkipList::Node* -SkipList::FindLessThan(const Key& key) const { +typename SkipList::Node* +SkipList::FindLessThan(const Key& key) const { Node* x = head_; int level = GetMaxHeight() - 1; while (true) { @@ -302,7 +303,7 @@ SkipList::FindLessThan(const Key& key) const { } template -typename SkipList::Node* SkipList::FindLast() +typename SkipList::Node* SkipList::FindLast() const { Node* x = head_; int level = GetMaxHeight() - 1; @@ -322,11 +323,11 @@ typename SkipList::Node* SkipList::FindLast() } template -SkipList::SkipList(Comparator cmp, Arena* arena) +SkipList::SkipList(Comparator cmp, Arena* arena) : compare_(cmp), arena_(arena), head_(NewNode(0 /* any key will do */, kMaxHeight)), - max_height_(reinterpret_cast(1)), + max_height_(1), rnd_(0xdeadbeef) { for (int i = 0; i < kMaxHeight; i++) { head_->SetNext(i, nullptr); @@ -334,7 +335,7 @@ SkipList::SkipList(Comparator cmp, Arena* arena) } template -void SkipList::Insert(const Key& key) { +void SkipList::Insert(const Key& key) { // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual() // here since Insert() is externally synchronized. Node* prev[kMaxHeight]; @@ -348,8 +349,6 @@ void SkipList::Insert(const Key& key) { for (int i = GetMaxHeight(); i < height; i++) { prev[i] = head_; } - //fprintf(stderr, "Change height from %d to %d\n", max_height_, height); - // It is ok to mutate max_height_ without any synchronization // with concurrent readers. A concurrent reader that observes // the new value of max_height_ will see either the old value of @@ -357,7 +356,7 @@ void SkipList::Insert(const Key& key) { // the loop below. In the former case the reader will // immediately drop to the next level since nullptr sorts after all // keys. In the latter case the reader will use the new node. 
- max_height_.NoBarrier_Store(reinterpret_cast(height)); + max_height_.store(height, std::memory_order_relaxed); } x = NewNode(key, height); @@ -370,7 +369,7 @@ void SkipList::Insert(const Key& key) { } template -bool SkipList::Contains(const Key& key) const { +bool SkipList::Contains(const Key& key) const { Node* x = FindGreaterOrEqual(key, nullptr); if (x != nullptr && Equal(key, x->key)) { return true; diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc index 24e0887..c4cf146 100644 --- a/db/skiplist_test.cc +++ b/db/skiplist_test.cc @@ -3,7 +3,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/skiplist.h" + +#include #include + #include "leveldb/env.h" #include "port/port.h" #include "port/thread_annotations.h" @@ -188,12 +191,12 @@ class ConcurrentTest { // Per-key generation struct State { - port::AtomicPointer generation[K]; - void Set(int k, intptr_t v) { - generation[k].Release_Store(reinterpret_cast(v)); + std::atomic generation[K]; + void Set(int k, int v) { + generation[k].store(v, std::memory_order_release); } - intptr_t Get(int k) { - return reinterpret_cast(generation[k].Acquire_Load()); + int Get(int k) { + return generation[k].load(std::memory_order_acquire); } State() { @@ -300,7 +303,7 @@ class TestState { public: ConcurrentTest t_; int seed_; - port::AtomicPointer quit_flag_; + std::atomic quit_flag_; enum ReaderState { STARTING, @@ -310,7 +313,7 @@ class TestState { explicit TestState(int s) : seed_(s), - quit_flag_(nullptr), + quit_flag_(false), state_(STARTING), state_cv_(&mu_) {} @@ -340,7 +343,7 @@ static void ConcurrentReader(void* arg) { Random rnd(state->seed_); int64_t reads = 0; state->Change(TestState::RUNNING); - while (!state->quit_flag_.Acquire_Load()) { + while (!state->quit_flag_.load(std::memory_order_acquire)) { state->t_.ReadStep(&rnd); ++reads; } @@ -362,7 +365,7 @@ static void RunConcurrent(int run) { for (int i = 0; i < kSize; i++) { state.t_.WriteStep(&rnd); } - state.quit_flag_.Release_Store(&state); // Any non-null arg will do + state.quit_flag_.store(true, std::memory_order_release); state.Wait(TestState::DONE); } } diff --git a/port/atomic_pointer.h b/port/atomic_pointer.h deleted file mode 100644 index d906f63..0000000 --- a/port/atomic_pointer.h +++ /dev/null @@ -1,171 +0,0 @@ -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -// AtomicPointer provides storage for a lock-free pointer. -// Platform-dependent implementation of AtomicPointer: -// - If the platform provides a cheap barrier, we use it with raw pointers -// - If is present (on newer versions of gcc, it is), we use -// a -based AtomicPointer. However we prefer the memory -// barrier based version, because at least on a gcc 4.4 32-bit build -// on linux, we have encountered a buggy implementation. -// Also, some implementations are much slower than a memory-barrier -// based implementation (~16ns for based acquire-load vs. ~1ns for -// a barrier based acquire-load). 
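Note (illustration, not part of the patch): the file deleted below implemented Acquire_Load()/Release_Store() as a raw pointer access bracketed by a platform-specific MemoryBarrier(). Expressed portably, that approach corresponds roughly to relaxed atomic accesses combined with std::atomic_thread_fence, whereas this CL requests the ordering directly on each operation. A sketch of the assumed equivalence, using std::atomic<void*> to stay within defined behavior:

    #include <atomic>

    // Fence-based formulation, roughly what the deleted barrier-based
    // AtomicPointer approximated.
    inline void* FenceAcquireLoad(const std::atomic<void*>& rep) {
      void* result = rep.load(std::memory_order_relaxed);
      std::atomic_thread_fence(std::memory_order_acquire);
      return result;
    }

    inline void FenceReleaseStore(std::atomic<void*>& rep, void* value) {
      std::atomic_thread_fence(std::memory_order_release);
      rep.store(value, std::memory_order_relaxed);
    }

    // Per-operation orderings, as used throughout this CL.
    inline void* AcquireLoad(const std::atomic<void*>& rep) {
      return rep.load(std::memory_order_acquire);
    }

    inline void ReleaseStore(std::atomic<void*>& rep, void* value) {
      rep.store(value, std::memory_order_release);
    }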
-// This code is based on atomicops-internals-* in Google's perftools: -// http://code.google.com/p/google-perftools/source/browse/#svn%2Ftrunk%2Fsrc%2Fbase - -#ifndef PORT_ATOMIC_POINTER_H_ -#define PORT_ATOMIC_POINTER_H_ - -#include - -#include - -#if defined(_M_X64) || defined(__x86_64__) -#define ARCH_CPU_X86_FAMILY 1 -#elif defined(_M_IX86) || defined(__i386__) || defined(__i386) -#define ARCH_CPU_X86_FAMILY 1 -#elif defined(__ARMEL__) -#define ARCH_CPU_ARM_FAMILY 1 -#elif defined(__aarch64__) -#define ARCH_CPU_ARM64_FAMILY 1 -#elif defined(__ppc__) || defined(__powerpc__) || defined(__powerpc64__) -#define ARCH_CPU_PPC_FAMILY 1 -#elif defined(__mips__) -#define ARCH_CPU_MIPS_FAMILY 1 -#endif - -namespace leveldb { -namespace port { - -// Define MemoryBarrier() if available -// Windows on x86 -#if defined(OS_WIN) && defined(COMPILER_MSVC) && defined(ARCH_CPU_X86_FAMILY) -// windows.h already provides a MemoryBarrier(void) macro -// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx -#define LEVELDB_HAVE_MEMORY_BARRIER - -// Mac OS -#elif defined(__APPLE__) -inline void MemoryBarrier() { - std::atomic_thread_fence(std::memory_order_seq_cst); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -// Gcc on x86 -#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__) -inline void MemoryBarrier() { - // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on - // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. - __asm__ __volatile__("" : : : "memory"); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -// Sun Studio -#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC) -inline void MemoryBarrier() { - // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on - // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. - asm volatile("" : : : "memory"); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -// ARM Linux -#elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__) -typedef void (*LinuxKernelMemoryBarrierFunc)(void); -// The Linux ARM kernel provides a highly optimized device-specific memory -// barrier function at a fixed memory address that is mapped in every -// user-level process. -// -// This beats using CPU-specific instructions which are, on single-core -// devices, un-necessary and very costly (e.g. ARMv7-A "dmb" takes more -// than 180ns on a Cortex-A8 like the one on a Nexus One). Benchmarking -// shows that the extra function call cost is completely negligible on -// multi-core devices. -// -inline void MemoryBarrier() { - (*(LinuxKernelMemoryBarrierFunc)0xffff0fa0)(); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -// ARM64 -#elif defined(ARCH_CPU_ARM64_FAMILY) -inline void MemoryBarrier() { - asm volatile("dmb sy" : : : "memory"); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -// PPC -#elif defined(ARCH_CPU_PPC_FAMILY) && defined(__GNUC__) -inline void MemoryBarrier() { - // TODO for some powerpc expert: is there a cheaper suitable variant? - // Perhaps by having separate barriers for acquire and release ops. - asm volatile("sync" : : : "memory"); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -// MIPS -#elif defined(ARCH_CPU_MIPS_FAMILY) && defined(__GNUC__) -inline void MemoryBarrier() { - __asm__ __volatile__("sync" : : : "memory"); -} -#define LEVELDB_HAVE_MEMORY_BARRIER - -#endif - -// AtomicPointer built using platform-specific MemoryBarrier(). 
-#if defined(LEVELDB_HAVE_MEMORY_BARRIER) -class AtomicPointer { - private: - void* rep_; - public: - AtomicPointer() { } - explicit AtomicPointer(void* p) : rep_(p) {} - inline void* NoBarrier_Load() const { return rep_; } - inline void NoBarrier_Store(void* v) { rep_ = v; } - inline void* Acquire_Load() const { - void* result = rep_; - MemoryBarrier(); - return result; - } - inline void Release_Store(void* v) { - MemoryBarrier(); - rep_ = v; - } -}; - -// AtomicPointer based on C++11 . -#else -class AtomicPointer { - private: - std::atomic rep_; - public: - AtomicPointer() { } - explicit AtomicPointer(void* v) : rep_(v) { } - inline void* Acquire_Load() const { - return rep_.load(std::memory_order_acquire); - } - inline void Release_Store(void* v) { - rep_.store(v, std::memory_order_release); - } - inline void* NoBarrier_Load() const { - return rep_.load(std::memory_order_relaxed); - } - inline void NoBarrier_Store(void* v) { - rep_.store(v, std::memory_order_relaxed); - } -}; - -#endif - -#undef LEVELDB_HAVE_MEMORY_BARRIER -#undef ARCH_CPU_X86_FAMILY -#undef ARCH_CPU_ARM_FAMILY -#undef ARCH_CPU_ARM64_FAMILY -#undef ARCH_CPU_PPC_FAMILY - -} // namespace port -} // namespace leveldb - -#endif // PORT_ATOMIC_POINTER_H_ diff --git a/port/port_example.h b/port/port_example.h index 9c648c3..1a8fca2 100644 --- a/port/port_example.h +++ b/port/port_example.h @@ -62,35 +62,6 @@ class CondVar { void SignallAll(); }; -// A type that holds a pointer that can be read or written atomically -// (i.e., without word-tearing.) -class AtomicPointer { - private: - intptr_t rep_; - public: - // Initialize to arbitrary value - AtomicPointer(); - - // Initialize to hold v - explicit AtomicPointer(void* v) : rep_(v) { } - - // Read and return the stored pointer with the guarantee that no - // later memory access (read or write) by this thread can be - // reordered ahead of this read. - void* Acquire_Load() const; - - // Set v as the stored pointer with the guarantee that no earlier - // memory access (read or write) by this thread can be reordered - // after this store. - void Release_Store(void* v); - - // Read the stored pointer with no ordering guarantees. - void* NoBarrier_Load() const; - - // Set va as the stored pointer with no ordering guarantees. - void NoBarrier_Store(void* v); -}; - // ------------------ Compression ------------------- // Store the snappy compression of "input[0,input_length-1]" in *output. diff --git a/port/port_stdcxx.h b/port/port_stdcxx.h index 4713e26..e21fa70 100644 --- a/port/port_stdcxx.h +++ b/port/port_stdcxx.h @@ -35,7 +35,6 @@ #include // NOLINT #include // NOLINT #include -#include "port/atomic_pointer.h" #include "port/thread_annotations.h" namespace leveldb { diff --git a/util/arena.cc b/util/arena.cc index a0338bf..a496ad0 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -3,7 +3,6 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. 
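Note (reference, not part of the patch): the AtomicPointer contract removed from port_example.h above maps one-to-one onto std::atomic member functions. The shim below only summarizes the substitution applied throughout this CL; the patch itself calls std::atomic directly.

    #include <atomic>

    // AtomicPointer::Acquire_Load()     -> load(std::memory_order_acquire)
    // AtomicPointer::Release_Store(v)   -> store(v, std::memory_order_release)
    // AtomicPointer::NoBarrier_Load()   -> load(std::memory_order_relaxed)
    // AtomicPointer::NoBarrier_Store(v) -> store(v, std::memory_order_relaxed)
    template <typename T>
    class AtomicPointerShim {
     public:
      // Like the old class, a default-constructed instance holds an
      // arbitrary value.
      AtomicPointerShim() = default;
      explicit AtomicPointerShim(T v) : rep_(v) {}

      T Acquire_Load() const { return rep_.load(std::memory_order_acquire); }
      void Release_Store(T v) { rep_.store(v, std::memory_order_release); }
      T NoBarrier_Load() const { return rep_.load(std::memory_order_relaxed); }
      void NoBarrier_Store(T v) { rep_.store(v, std::memory_order_relaxed); }

     private:
      std::atomic<T> rep_;
    };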
#include "util/arena.h" -#include namespace leveldb { @@ -60,8 +59,8 @@ char* Arena::AllocateAligned(size_t bytes) { char* Arena::AllocateNewBlock(size_t block_bytes) { char* result = new char[block_bytes]; blocks_.push_back(result); - memory_usage_.NoBarrier_Store( - reinterpret_cast(MemoryUsage() + block_bytes + sizeof(char*))); + memory_usage_.fetch_add(block_bytes + sizeof(char*), + std::memory_order_relaxed); return result; } diff --git a/util/arena.h b/util/arena.h index 48bab33..624e262 100644 --- a/util/arena.h +++ b/util/arena.h @@ -5,11 +5,11 @@ #ifndef STORAGE_LEVELDB_UTIL_ARENA_H_ #define STORAGE_LEVELDB_UTIL_ARENA_H_ +#include +#include +#include +#include #include -#include -#include -#include -#include "port/port.h" namespace leveldb { @@ -21,13 +21,13 @@ class Arena { // Return a pointer to a newly allocated memory block of "bytes" bytes. char* Allocate(size_t bytes); - // Allocate memory with the normal alignment guarantees provided by malloc + // Allocate memory with the normal alignment guarantees provided by malloc. char* AllocateAligned(size_t bytes); // Returns an estimate of the total memory usage of data allocated // by the arena. size_t MemoryUsage() const { - return reinterpret_cast(memory_usage_.NoBarrier_Load()); + return memory_usage_.load(std::memory_order_relaxed); } private: @@ -42,7 +42,10 @@ class Arena { std::vector blocks_; // Total memory usage of the arena. - port::AtomicPointer memory_usage_; + // + // TODO(costan): This member is accessed via atomics, but the others are + // accessed without any locking. Is this OK? + std::atomic memory_usage_; // No copying allowed Arena(const Arena&); diff --git a/util/env_test.cc b/util/env_test.cc index 070109b..b204089 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -5,6 +5,7 @@ #include "leveldb/env.h" #include +#include #include "port/port.h" #include "port/thread_annotations.h" @@ -24,10 +25,15 @@ class EnvTest { EnvTest() : env_(Env::Default()) { } }; -static void SetBool(void* ptr) { - reinterpret_cast(ptr)->NoBarrier_Store(ptr); +namespace { + +static void SetAtomicBool(void* atomic_bool_ptr) { + std::atomic* atomic_bool = + reinterpret_cast*>(atomic_bool_ptr); + atomic_bool->store(true, std::memory_order_relaxed); } +} // namespace TEST(EnvTest, ReadWrite) { Random rnd(test::RandomSeed()); @@ -77,42 +83,41 @@ TEST(EnvTest, ReadWrite) { } TEST(EnvTest, RunImmediately) { - port::AtomicPointer called(nullptr); - env_->Schedule(&SetBool, &called); + std::atomic called(false); + env_->Schedule(&SetAtomicBool, &called); env_->SleepForMicroseconds(kDelayMicros); - ASSERT_TRUE(called.NoBarrier_Load() != nullptr); + ASSERT_TRUE(called.load(std::memory_order_relaxed)); } TEST(EnvTest, RunMany) { - port::AtomicPointer last_id(nullptr); + std::atomic last_id(0); - struct CB { - port::AtomicPointer* last_id_ptr; // Pointer to shared slot - uintptr_t id; // Order# for the execution of this callback + struct Callback { + std::atomic* const last_id_ptr_; // Pointer to shared state. + const int id_; // Order# for the execution of this callback. 
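Note (illustration, not part of the patch): the util/arena.cc and util/arena.h hunks above turn memory_usage_ into a relaxed statistics counter: writers use fetch_add and readers take a possibly slightly stale snapshot, which is fine for an estimate. A self-contained sketch with illustrative names:

    #include <atomic>
    #include <cstddef>

    // Counter in the style of Arena::memory_usage_ after this CL.
    class MemoryUsageCounter {
     public:
      void Add(std::size_t bytes) {
        usage_.fetch_add(bytes, std::memory_order_relaxed);
      }

      // May lag concurrent Add() calls slightly; acceptable for an estimate
      // like Arena::MemoryUsage().
      std::size_t Estimate() const {
        return usage_.load(std::memory_order_relaxed);
      }

     private:
      std::atomic<std::size_t> usage_{0};
    };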
- CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { } + Callback(std::atomic* last_id_ptr, int id) + : last_id_ptr_(last_id_ptr), id_(id) { } - static void Run(void* v) { - CB* cb = reinterpret_cast(v); - void* cur = cb->last_id_ptr->NoBarrier_Load(); - ASSERT_EQ(cb->id-1, reinterpret_cast(cur)); - cb->last_id_ptr->Release_Store(reinterpret_cast(cb->id)); + static void Run(void* arg) { + Callback* callback = reinterpret_cast(arg); + int current_id = callback->last_id_ptr_->load(std::memory_order_relaxed); + ASSERT_EQ(callback->id_ - 1, current_id); + callback->last_id_ptr_->store(callback->id_, std::memory_order_relaxed); } }; - // Schedule in different order than start time - CB cb1(&last_id, 1); - CB cb2(&last_id, 2); - CB cb3(&last_id, 3); - CB cb4(&last_id, 4); - env_->Schedule(&CB::Run, &cb1); - env_->Schedule(&CB::Run, &cb2); - env_->Schedule(&CB::Run, &cb3); - env_->Schedule(&CB::Run, &cb4); + Callback callback1(&last_id, 1); + Callback callback2(&last_id, 2); + Callback callback3(&last_id, 3); + Callback callback4(&last_id, 4); + env_->Schedule(&Callback::Run, &callback1); + env_->Schedule(&Callback::Run, &callback2); + env_->Schedule(&Callback::Run, &callback3); + env_->Schedule(&Callback::Run, &callback4); env_->SleepForMicroseconds(kDelayMicros); - void* cur = last_id.Acquire_Load(); - ASSERT_EQ(4, reinterpret_cast(cur)); + ASSERT_EQ(4, last_id.load(std::memory_order_relaxed)); } struct State { diff --git a/util/env_windows.cc b/util/env_windows.cc index 57932bb..3b4496b 100644 --- a/util/env_windows.cc +++ b/util/env_windows.cc @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -105,51 +106,42 @@ class ScopedHandle { }; // Helper class to limit resource usage to avoid exhaustion. -// Currently used to limit mmap file usage so that we do not end -// up running out virtual memory, or running into kernel performance -// problems for very large databases. +// Currently used to limit read-only file descriptors and mmap file usage +// so that we do not run out of file descriptors or virtual memory, or run into +// kernel performance problems for very large databases. class Limiter { public: - // Limit maximum number of resources to |n|. - Limiter(intptr_t n) { SetAllowed(n); } + // Limit maximum number of resources to |max_acquires|. + Limiter(int max_acquires) : acquires_allowed_(max_acquires) {} + + Limiter(const Limiter&) = delete; + Limiter operator=(const Limiter&) = delete; // If another resource is available, acquire it and return true. // Else return false. - bool Acquire() LOCKS_EXCLUDED(mu_) { - if (GetAllowed() <= 0) { - return false; - } - MutexLock l(&mu_); - intptr_t x = GetAllowed(); - if (x <= 0) { - return false; - } else { - SetAllowed(x - 1); + bool Acquire() { + int old_acquires_allowed = + acquires_allowed_.fetch_sub(1, std::memory_order_relaxed); + + if (old_acquires_allowed > 0) return true; - } + + acquires_allowed_.fetch_add(1, std::memory_order_relaxed); + return false; } // Release a resource acquired by a previous call to Acquire() that returned // true. 
- void Release() LOCKS_EXCLUDED(mu_) { - MutexLock l(&mu_); - SetAllowed(GetAllowed() + 1); + void Release() { + acquires_allowed_.fetch_add(1, std::memory_order_relaxed); } private: - port::Mutex mu_; - port::AtomicPointer allowed_; - - intptr_t GetAllowed() const { - return reinterpret_cast(allowed_.Acquire_Load()); - } - - void SetAllowed(intptr_t v) EXCLUSIVE_LOCKS_REQUIRED(mu_) { - allowed_.Release_Store(reinterpret_cast(v)); - } - - Limiter(const Limiter&); - void operator=(const Limiter&); + // The number of available resources. + // + // This is a counter and is not tied to the invariants of any other class, so + // it can be operated on safely using std::memory_order_relaxed. + std::atomic acquires_allowed_; }; class WindowsSequentialFile : public SequentialFile {
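Note (illustration, not part of the patch): the rewritten Limiter above drops the mutex and the AtomicPointer-based count in favor of a single lock-free counter. Acquire() optimistically decrements and, if the limit was already exhausted, adds the decrement back, so transient negative values are harmless. A usage sketch follows; the caller code is hypothetical, while the Limiter body mirrors the lines added above.

    #include <atomic>

    class Limiter {
     public:
      // Limit maximum number of resources to |max_acquires|.
      Limiter(int max_acquires) : acquires_allowed_(max_acquires) {}

      Limiter(const Limiter&) = delete;
      Limiter operator=(const Limiter&) = delete;

      // If another resource is available, acquire it and return true.
      // Else return false.
      bool Acquire() {
        int old_acquires_allowed =
            acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);
        if (old_acquires_allowed > 0) return true;
        // Limit already reached: undo the optimistic decrement.
        acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
        return false;
      }

      // Release a resource acquired by a successful Acquire().
      void Release() {
        acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
      }

     private:
      std::atomic<int> acquires_allowed_;
    };

    // Hypothetical caller: try to take a limited slot (e.g. an mmap region)
    // and fall back to a cheaper code path when the limit is reached.
    void UseOneResource(Limiter& mmap_limiter) {
      if (mmap_limiter.Acquire()) {
        // ... use the resource ...
        mmap_limiter.Release();
      } else {
        // ... fall back to a non-mmap read path ...
      }
    }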