Browse Source

leveldb: Replace AtomicPointer with std::atomic.

This CL removes AtomicPointer from leveldb's port interface. Its usage is replaced with std::atomic<> from the C++11 standard library.

AtomicPointer was used to wrap flags, numbers, and pointers, so its instances are replaced with std::atomic<bool>, std::atomic<int>, std::atomic<size_t> and std::atomic<Node*>.

This CL does not revise the memory ordering. AtomicPointer's methods are replaced mechanically with their std::atomic equivalents, even when the underlying usage is incorrect. (Example: DBImpl::has_imm_ is written using release stores, even though it is always read using relaxed ordering.) Revising the memory ordering is left for future CLs.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=237865146
pull/1/head
costan 5 years ago
committed by Chris Mumford
parent
commit
7d8e41e49b
13 changed files with 210 additions and 419 deletions
  1. +0
    -20
      db/db_bench.cc
  2. +17
    -16
      db/db_impl.cc
  3. +5
    -2
      db/db_impl.h
  4. +71
    -64
      db/db_test.cc
  5. +38
    -39
      db/skiplist.h
  6. +12
    -9
      db/skiplist_test.cc
  7. +0
    -171
      port/atomic_pointer.h
  8. +0
    -29
      port/port_example.h
  9. +0
    -1
      port/port_stdcxx.h
  10. +2
    -3
      util/arena.cc
  11. +10
    -7
      util/arena.h
  12. +31
    -26
      util/env_test.cc
  13. +24
    -32
      util/env_windows.cc

+ 0
- 20
db/db_bench.cc View File

@ -34,7 +34,6 @@
// seekrandom -- N random seeks
// open -- cost of opening a DB
// crc32c -- repeated crc32c of 4K of data
// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
@ -57,7 +56,6 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"acquireload,"
;
// Number of key/values to place in database
@ -510,8 +508,6 @@ class Benchmark {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
method = &Benchmark::Crc32c;
} else if (name == Slice("acquireload")) {
method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
@ -639,22 +635,6 @@ class Benchmark {
thread->stats.AddMessage(label);
}
void AcquireLoad(ThreadState* thread) {
int dummy;
port::AtomicPointer ap(&dummy);
int count = 0;
void *ptr = nullptr;
thread->stats.AddMessage("(each op is 1000 loads)");
while (count < 100000) {
for (int i = 0; i < 1000; i++) {
ptr = ap.Acquire_Load();
}
count++;
thread->stats.FinishedSingleOp();
}
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}
void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);

+ 17
- 16
db/db_impl.cc View File

@ -8,6 +8,7 @@
#include <stdio.h>
#include <algorithm>
#include <atomic>
#include <set>
#include <string>
#include <vector>
@ -132,10 +133,11 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(nullptr),
shutting_down_(nullptr),
shutting_down_(false),
background_work_finished_signal_(&mutex_),
mem_(nullptr),
imm_(nullptr),
has_imm_(false),
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
@ -144,14 +146,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
has_imm_.Release_Store(nullptr);
}
&internal_comparator_)) {}
DBImpl::~DBImpl() {
// Wait for background work to finish
// Wait for background work to finish.
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-null value is ok
shutting_down_.store(true, std::memory_order_release);
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
}
@ -547,7 +547,7 @@ void DBImpl::CompactMemTable() {
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();
if (s.ok() && shutting_down_.Acquire_Load()) {
if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}
@ -562,7 +562,7 @@ void DBImpl::CompactMemTable() {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
has_imm_.Release_Store(nullptr);
has_imm_.store(false, std::memory_order_release);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
@ -610,7 +610,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,
}
MutexLock l(&mutex_);
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
bg_error_.ok()) {
if (manual_compaction_ == nullptr) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
@ -652,7 +653,7 @@ void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
@ -673,7 +674,7 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
@ -752,7 +753,7 @@ void DBImpl::BackgroundCompaction() {
if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
@ -919,9 +920,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != nullptr) {
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
@ -1014,7 +1015,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
input->Next();
}
if (status.ok() && shutting_down_.Acquire_Load()) {
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != nullptr) {
@ -1378,7 +1379,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room

+ 5
- 2
db/db_impl.h View File

@ -5,8 +5,11 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <atomic>
#include <deque>
#include <set>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
@ -136,11 +139,11 @@ class DBImpl : public DB {
// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-null imm_
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;

+ 71
- 64
db/db_test.cc View File

@ -2,6 +2,9 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include <string>
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "db/db_impl.h"
@ -61,7 +64,7 @@ class AtomicCounter {
void DelayMilliseconds(int millis) {
Env::Default()->SleepForMicroseconds(millis * 1000);
}
}
} // namespace
// Test Env to override default Env behavior for testing.
class TestEnv : public EnvWrapper {
@ -93,45 +96,45 @@ class TestEnv : public EnvWrapper {
bool ignore_dot_files_;
};
// Special Env used to delay background operations
// Special Env used to delay background operations.
class SpecialEnv : public EnvWrapper {
public:
// sstable/log Sync() calls are blocked while this pointer is non-null.
port::AtomicPointer delay_data_sync_;
std::atomic<bool> delay_data_sync_;
// sstable/log Sync() calls return an error.
port::AtomicPointer data_sync_error_;
std::atomic<bool> data_sync_error_;
// Simulate no-space errors while this pointer is non-null.
port::AtomicPointer no_space_;
std::atomic<bool> no_space_;
// Simulate non-writable file system while this pointer is non-null.
port::AtomicPointer non_writable_;
std::atomic<bool> non_writable_;
// Force sync of manifest files to fail while this pointer is non-null.
port::AtomicPointer manifest_sync_error_;
std::atomic<bool> manifest_sync_error_;
// Force write to manifest files to fail while this pointer is non-null.
port::AtomicPointer manifest_write_error_;
std::atomic<bool> manifest_write_error_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
delay_data_sync_.Release_Store(nullptr);
data_sync_error_.Release_Store(nullptr);
no_space_.Release_Store(nullptr);
non_writable_.Release_Store(nullptr);
count_random_reads_ = false;
manifest_sync_error_.Release_Store(nullptr);
manifest_write_error_.Release_Store(nullptr);
explicit SpecialEnv(Env* base) : EnvWrapper(base),
delay_data_sync_(false),
data_sync_error_(false),
no_space_(false),
non_writable_(false),
manifest_sync_error_(false),
manifest_write_error_(false),
count_random_reads_(false) {
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
class DataFile : public WritableFile {
private:
SpecialEnv* env_;
WritableFile* base_;
SpecialEnv* const env_;
WritableFile* const base_;
public:
DataFile(SpecialEnv* env, WritableFile* base)
@ -140,7 +143,7 @@ class SpecialEnv : public EnvWrapper {
}
~DataFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->no_space_.Acquire_Load() != nullptr) {
if (env_->no_space_.load(std::memory_order_acquire)) {
// Drop writes on the floor
return Status::OK();
} else {
@ -150,10 +153,10 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->data_sync_error_.Acquire_Load() != nullptr) {
if (env_->data_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated data sync error");
}
while (env_->delay_data_sync_.Acquire_Load() != nullptr) {
while (env_->delay_data_sync_.load(std::memory_order_acquire)) {
DelayMilliseconds(100);
}
return base_->Sync();
@ -167,7 +170,7 @@ class SpecialEnv : public EnvWrapper {
ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
~ManifestFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->manifest_write_error_.Acquire_Load() != nullptr) {
if (env_->manifest_write_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated writer error");
} else {
return base_->Append(data);
@ -176,7 +179,7 @@ class SpecialEnv : public EnvWrapper {
Status Close() { return base_->Close(); }
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->manifest_sync_error_.Acquire_Load() != nullptr) {
if (env_->manifest_sync_error_.load(std::memory_order_acquire)) {
return Status::IOError("simulated sync error");
} else {
return base_->Sync();
@ -184,7 +187,7 @@ class SpecialEnv : public EnvWrapper {
}
};
if (non_writable_.Acquire_Load() != nullptr) {
if (non_writable_.load(std::memory_order_acquire)) {
return Status::IOError("simulated write error");
}
@ -424,7 +427,7 @@ class DBTest {
ASSERT_TRUE(
db_->GetProperty("leveldb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
return std::stoi(property);
}
int TotalTableFiles() {
@ -587,11 +590,13 @@ TEST(DBTest, GetFromImmutableLayer) {
ASSERT_OK(Put("foo", "v1"));
ASSERT_EQ("v1", Get("foo"));
env_->delay_data_sync_.Release_Store(env_); // Block sync calls
Put("k1", std::string(100000, 'x')); // Fill memtable
Put("k2", std::string(100000, 'y')); // Trigger compaction
// Block sync calls.
env_->delay_data_sync_.store(true, std::memory_order_release);
Put("k1", std::string(100000, 'x')); // Fill memtable.
Put("k2", std::string(100000, 'y')); // Trigger compaction.
ASSERT_EQ("v1", Get("foo"));
env_->delay_data_sync_.Release_Store(nullptr); // Release sync calls
// Release sync calls.
env_->delay_data_sync_.store(false, std::memory_order_release);
} while (ChangeOptions());
}
@ -608,7 +613,7 @@ TEST(DBTest, GetMemUsage) {
ASSERT_OK(Put("foo", "v1"));
std::string val;
ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val));
int mem_usage = atoi(val.c_str());
int mem_usage = std::stoi(val);
ASSERT_GT(mem_usage, 0);
ASSERT_LT(mem_usage, 5*1024*1024);
} while (ChangeOptions());
@ -1106,7 +1111,7 @@ TEST(DBTest, RepeatedWritesToSameKey) {
for (int i = 0; i < 5 * kMaxFiles; i++) {
Put("key", value);
ASSERT_LE(TotalTableFiles(), kMaxFiles);
fprintf(stderr, "after %d: %d files\n", int(i+1), TotalTableFiles());
fprintf(stderr, "after %d: %d files\n", i + 1, TotalTableFiles());
}
}
@ -1271,7 +1276,7 @@ TEST(DBTest, IteratorPinsRef) {
// Write to force compactions
Put("foo", "newvalue1");
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
ASSERT_OK(Put(Key(i), Key(i) + std::string(100000, 'v'))); // 100K values
}
Put("foo", "newvalue2");
@ -1459,21 +1464,21 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
TEST(DBTest, L0_CompactionBug_Issue44_b) {
Reopen();
Put("","");
Put("", "");
Reopen();
Delete("e");
Put("","");
Put("", "");
Reopen();
Put("c", "cv");
Reopen();
Put("","");
Put("", "");
Reopen();
Put("","");
Put("", "");
DelayMilliseconds(1000); // Wait for compaction to finish
Reopen();
Put("d","dv");
Put("d", "dv");
Reopen();
Put("","");
Put("", "");
Reopen();
Delete("d");
Delete("b");
@ -1711,13 +1716,14 @@ TEST(DBTest, NoSpace) {
ASSERT_EQ("v1", Get("foo"));
Compact("a", "z");
const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors
// Force out-of-space errors.
env_->no_space_.store(true, std::memory_order_release);
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
dbfull()->TEST_CompactRange(level, nullptr, nullptr);
}
}
env_->no_space_.Release_Store(nullptr);
env_->no_space_.store(false, std::memory_order_release);
ASSERT_LT(CountFiles(), num_files + 3);
}
@ -1727,7 +1733,8 @@ TEST(DBTest, NonWritableFileSystem) {
options.env = env_;
Reopen(&options);
ASSERT_OK(Put("foo", "v1"));
env_->non_writable_.Release_Store(env_); // Force errors for new files
// Force errors for new files.
env_->non_writable_.store(true, std::memory_order_release);
std::string big(100000, 'x');
int errors = 0;
for (int i = 0; i < 20; i++) {
@ -1738,7 +1745,7 @@ TEST(DBTest, NonWritableFileSystem) {
}
}
ASSERT_GT(errors, 0);
env_->non_writable_.Release_Store(nullptr);
env_->non_writable_.store(false, std::memory_order_release);
}
TEST(DBTest, WriteSyncError) {
@ -1748,7 +1755,7 @@ TEST(DBTest, WriteSyncError) {
Options options = CurrentOptions();
options.env = env_;
Reopen(&options);
env_->data_sync_error_.Release_Store(env_);
env_->data_sync_error_.store(true, std::memory_order_release);
// (b) Normal write should succeed
WriteOptions w;
@ -1762,7 +1769,7 @@ TEST(DBTest, WriteSyncError) {
ASSERT_EQ("NOT_FOUND", Get("k2"));
// (d) make sync behave normally
env_->data_sync_error_.Release_Store(nullptr);
env_->data_sync_error_.store(false, std::memory_order_release);
// (e) Do a non-sync write; should fail
w.sync = false;
@ -1782,7 +1789,7 @@ TEST(DBTest, ManifestWriteError) {
// We iterate twice. In the second iteration, everything is the
// same except the log record never makes it to the MANIFEST file.
for (int iter = 0; iter < 2; iter++) {
port::AtomicPointer* error_type = (iter == 0)
std::atomic<bool>* error_type = (iter == 0)
? &env_->manifest_sync_error_
: &env_->manifest_write_error_;
@ -1802,12 +1809,12 @@ TEST(DBTest, ManifestWriteError) {
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
// Merging compaction (will fail)
error_type->Release_Store(env_);
error_type->store(true, std::memory_order_release);
dbfull()->TEST_CompactRange(last, nullptr, nullptr); // Should fail
ASSERT_EQ("bar", Get("foo"));
// Recovery: should not lose data
error_type->Release_Store(nullptr);
error_type->store(false, std::memory_order_release);
Reopen(&options);
ASSERT_EQ("bar", Get("foo"));
}
@ -1878,7 +1885,7 @@ TEST(DBTest, BloomFilter) {
dbfull()->TEST_CompactMemTable();
// Prevent auto compactions triggered by seeks
env_->delay_data_sync_.Release_Store(env_);
env_->delay_data_sync_.store(true, std::memory_order_release);
// Lookup present keys. Should rarely read from small sstable.
env_->random_read_counter_.Reset();
@ -1899,7 +1906,7 @@ TEST(DBTest, BloomFilter) {
fprintf(stderr, "%d missing => %d reads\n", N, reads);
ASSERT_LE(reads, 3*N/100);
env_->delay_data_sync_.Release_Store(nullptr);
env_->delay_data_sync_.store(false, std::memory_order_release);
Close();
delete options.block_cache;
delete options.filter_policy;
@ -1914,9 +1921,9 @@ static const int kNumKeys = 1000;
struct MTState {
DBTest* test;
port::AtomicPointer stop;
port::AtomicPointer counter[kNumThreads];
port::AtomicPointer thread_done[kNumThreads];
std::atomic<bool> stop;
std::atomic<int> counter[kNumThreads];
std::atomic<bool> thread_done[kNumThreads];
};
struct MTThread {
@ -1928,13 +1935,13 @@ static void MTThreadBody(void* arg) {
MTThread* t = reinterpret_cast<MTThread*>(arg);
int id = t->id;
DB* db = t->state->test->db_;
uintptr_t counter = 0;
int counter = 0;
fprintf(stderr, "... starting thread %d\n", id);
Random rnd(1000 + id);
std::string value;
char valbuf[1500];
while (t->state->stop.Acquire_Load() == nullptr) {
t->state->counter[id].Release_Store(reinterpret_cast<void*>(counter));
while (!t->state->stop.load(std::memory_order_acquire)) {
t->state->counter[id].store(counter, std::memory_order_release);
int key = rnd.Uniform(kNumKeys);
char keybuf[20];
@ -1959,14 +1966,13 @@ static void MTThreadBody(void* arg) {
ASSERT_EQ(k, key);
ASSERT_GE(w, 0);
ASSERT_LT(w, kNumThreads);
ASSERT_LE(static_cast<uintptr_t>(c), reinterpret_cast<uintptr_t>(
t->state->counter[w].Acquire_Load()));
ASSERT_LE(c, t->state->counter[w].load(std::memory_order_acquire));
}
}
counter++;
}
t->state->thread_done[id].Release_Store(t);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, int(counter));
t->state->thread_done[id].store(true, std::memory_order_release);
fprintf(stderr, "... stopping thread %d after %d ops\n", id, counter);
}
} // namespace
@ -1976,10 +1982,10 @@ TEST(DBTest, MultiThreaded) {
// Initialize state
MTState mt;
mt.test = this;
mt.stop.Release_Store(0);
mt.stop.store(false, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
mt.counter[id].Release_Store(0);
mt.thread_done[id].Release_Store(0);
mt.counter[id].store(false, std::memory_order_release);
mt.thread_done[id].store(false, std::memory_order_release);
}
// Start threads
@ -1994,9 +2000,9 @@ TEST(DBTest, MultiThreaded) {
DelayMilliseconds(kTestSeconds * 1000);
// Stop the threads and wait for them to finish
mt.stop.Release_Store(&mt);
mt.stop.store(true, std::memory_order_release);
for (int id = 0; id < kNumThreads; id++) {
while (mt.thread_done[id].Acquire_Load() == nullptr) {
while (!mt.thread_done[id].load(std::memory_order_acquire)) {
DelayMilliseconds(100);
}
}
@ -2100,6 +2106,7 @@ class ModelDB: public DB {
virtual Slice key() const { return iter_->first; }
virtual Slice value() const { return iter_->second; }
virtual Status status() const { return Status::OK(); }
private:
const KVMap* const map_;
const bool owned_; // Do we own map_

+ 38
- 39
db/skiplist.h View File

@ -27,9 +27,10 @@
//
// ... prev vs. next pointer ordering ...
#include <assert.h>
#include <stdlib.h>
#include "port/port.h"
#include <atomic>
#include <cassert>
#include <cstdlib>
#include "util/arena.h"
#include "util/random.h"
@ -105,11 +106,10 @@ class SkipList {
// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
port::AtomicPointer max_height_; // Height of the entire list
std::atomic<int> max_height_; // Height of the entire list
inline int GetMaxHeight() const {
return static_cast<int>(
reinterpret_cast<intptr_t>(max_height_.NoBarrier_Load()));
return max_height_.load(std::memory_order_relaxed);
}
// Read/written only by Insert().
@ -144,7 +144,7 @@ class SkipList {
// Implementation details follow
template<typename Key, class Comparator>
struct SkipList<Key,Comparator>::Node {
struct SkipList<Key, Comparator>::Node {
explicit Node(const Key& k) : key(k) { }
Key const key;
@ -155,63 +155,63 @@ struct SkipList::Node {
assert(n >= 0);
// Use an 'acquire load' so that we observe a fully initialized
// version of the returned Node.
return reinterpret_cast<Node*>(next_[n].Acquire_Load());
return next_[n].load(std::memory_order_acquire);
}
void SetNext(int n, Node* x) {
assert(n >= 0);
// Use a 'release store' so that anybody who reads through this
// pointer observes a fully initialized version of the inserted node.
next_[n].Release_Store(x);
next_[n].store(x, std::memory_order_release);
}
// No-barrier variants that can be safely used in a few locations.
Node* NoBarrier_Next(int n) {
assert(n >= 0);
return reinterpret_cast<Node*>(next_[n].NoBarrier_Load());
return next_[n].load(std::memory_order_relaxed);
}
void NoBarrier_SetNext(int n, Node* x) {
assert(n >= 0);
next_[n].NoBarrier_Store(x);
next_[n].store(x, std::memory_order_relaxed);
}
private:
// Array of length equal to the node height. next_[0] is lowest level link.
port::AtomicPointer next_[1];
std::atomic<Node*> next_[1];
};
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::NewNode(const Key& key, int height) {
char* mem = arena_->AllocateAligned(
sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));
return new (mem) Node(key);
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::NewNode(const Key& key, int height) {
char* const node_memory = arena_->AllocateAligned(
sizeof(Node) + sizeof(std::atomic<Node*>) * (height - 1));
return new (node_memory) Node(key);
}
template<typename Key, class Comparator>
inline SkipList<Key,Comparator>::Iterator::Iterator(const SkipList* list) {
inline SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
list_ = list;
node_ = nullptr;
}
template<typename Key, class Comparator>
inline bool SkipList<Key,Comparator>::Iterator::Valid() const {
inline bool SkipList<Key, Comparator>::Iterator::Valid() const {
return node_ != nullptr;
}
template<typename Key, class Comparator>
inline const Key& SkipList<Key,Comparator>::Iterator::key() const {
inline const Key& SkipList<Key, Comparator>::Iterator::key() const {
assert(Valid());
return node_->key;
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::Next() {
inline void SkipList<Key, Comparator>::Iterator::Next() {
assert(Valid());
node_ = node_->Next(0);
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::Prev() {
inline void SkipList<Key, Comparator>::Iterator::Prev() {
// Instead of using explicit "prev" links, we just search for the
// last node that falls before key.
assert(Valid());
@ -222,17 +222,17 @@ inline void SkipList::Iterator::Prev() {
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::Seek(const Key& target) {
inline void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
node_ = list_->FindGreaterOrEqual(target, nullptr);
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::SeekToFirst() {
inline void SkipList<Key, Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
}
template<typename Key, class Comparator>
inline void SkipList<Key,Comparator>::Iterator::SeekToLast() {
inline void SkipList<Key, Comparator>::Iterator::SeekToLast() {
node_ = list_->FindLast();
if (node_ == list_->head_) {
node_ = nullptr;
@ -240,7 +240,7 @@ inline void SkipList::Iterator::SeekToLast() {
}
template<typename Key, class Comparator>
int SkipList<Key,Comparator>::RandomHeight() {
int SkipList<Key, Comparator>::RandomHeight() {
// Increase height with probability 1 in kBranching
static const unsigned int kBranching = 4;
int height = 1;
@ -253,14 +253,15 @@ int SkipList::RandomHeight() {
}
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
// null n is considered infinite
return (n != nullptr) && (compare_(n->key, key) < 0);
}
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindGreaterOrEqual(const Key& key, Node** prev)
const {
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindGreaterOrEqual(const Key& key,
Node** prev) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@ -281,8 +282,8 @@ typename SkipList::Node* SkipList::FindGreaterOr
}
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node*
SkipList<Key,Comparator>::FindLessThan(const Key& key) const {
typename SkipList<Key, Comparator>::Node*
SkipList<Key, Comparator>::FindLessThan(const Key& key) const {
Node* x = head_;
int level = GetMaxHeight() - 1;
while (true) {
@ -302,7 +303,7 @@ SkipList::FindLessThan(const Key& key) const {
}
template<typename Key, class Comparator>
typename SkipList<Key,Comparator>::Node* SkipList<Key,Comparator>::FindLast()
typename SkipList<Key, Comparator>::Node* SkipList<Key, Comparator>::FindLast()
const {
Node* x = head_;
int level = GetMaxHeight() - 1;
@ -322,11 +323,11 @@ typename SkipList::Node* SkipList::FindLast()
}
template<typename Key, class Comparator>
SkipList<Key,Comparator>::SkipList(Comparator cmp, Arena* arena)
SkipList<Key, Comparator>::SkipList(Comparator cmp, Arena* arena)
: compare_(cmp),
arena_(arena),
head_(NewNode(0 /* any key will do */, kMaxHeight)),
max_height_(reinterpret_cast<void*>(1)),
max_height_(1),
rnd_(0xdeadbeef) {
for (int i = 0; i < kMaxHeight; i++) {
head_->SetNext(i, nullptr);
@ -334,7 +335,7 @@ SkipList::SkipList(Comparator cmp, Arena* arena)
}
template<typename Key, class Comparator>
void SkipList<Key,Comparator>::Insert(const Key& key) {
void SkipList<Key, Comparator>::Insert(const Key& key) {
// TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
// here since Insert() is externally synchronized.
Node* prev[kMaxHeight];
@ -348,8 +349,6 @@ void SkipList::Insert(const Key& key) {
for (int i = GetMaxHeight(); i < height; i++) {
prev[i] = head_;
}
//fprintf(stderr, "Change height from %d to %d\n", max_height_, height);
// It is ok to mutate max_height_ without any synchronization
// with concurrent readers. A concurrent reader that observes
// the new value of max_height_ will see either the old value of
@ -357,7 +356,7 @@ void SkipList::Insert(const Key& key) {
// the loop below. In the former case the reader will
// immediately drop to the next level since nullptr sorts after all
// keys. In the latter case the reader will use the new node.
max_height_.NoBarrier_Store(reinterpret_cast<void*>(height));
max_height_.store(height, std::memory_order_relaxed);
}
x = NewNode(key, height);
@ -370,7 +369,7 @@ void SkipList::Insert(const Key& key) {
}
template<typename Key, class Comparator>
bool SkipList<Key,Comparator>::Contains(const Key& key) const {
bool SkipList<Key, Comparator>::Contains(const Key& key) const {
Node* x = FindGreaterOrEqual(key, nullptr);
if (x != nullptr && Equal(key, x->key)) {
return true;

+ 12
- 9
db/skiplist_test.cc View File

@ -3,7 +3,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/skiplist.h"
#include <atomic>
#include <set>
#include "leveldb/env.h"
#include "port/port.h"
#include "port/thread_annotations.h"
@ -188,12 +191,12 @@ class ConcurrentTest {
// Per-key generation
struct State {
port::AtomicPointer generation[K];
void Set(int k, intptr_t v) {
generation[k].Release_Store(reinterpret_cast<void*>(v));
std::atomic<int> generation[K];
void Set(int k, int v) {
generation[k].store(v, std::memory_order_release);
}
intptr_t Get(int k) {
return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
int Get(int k) {
return generation[k].load(std::memory_order_acquire);
}
State() {
@ -300,7 +303,7 @@ class TestState {
public:
ConcurrentTest t_;
int seed_;
port::AtomicPointer quit_flag_;
std::atomic<bool> quit_flag_;
enum ReaderState {
STARTING,
@ -310,7 +313,7 @@ class TestState {
explicit TestState(int s)
: seed_(s),
quit_flag_(nullptr),
quit_flag_(false),
state_(STARTING),
state_cv_(&mu_) {}
@ -340,7 +343,7 @@ static void ConcurrentReader(void* arg) {
Random rnd(state->seed_);
int64_t reads = 0;
state->Change(TestState::RUNNING);
while (!state->quit_flag_.Acquire_Load()) {
while (!state->quit_flag_.load(std::memory_order_acquire)) {
state->t_.ReadStep(&rnd);
++reads;
}
@ -362,7 +365,7 @@ static void RunConcurrent(int run) {
for (int i = 0; i < kSize; i++) {
state.t_.WriteStep(&rnd);
}
state.quit_flag_.Release_Store(&state); // Any non-null arg will do
state.quit_flag_.store(true, std::memory_order_release);
state.Wait(TestState::DONE);
}
}

+ 0
- 171
port/atomic_pointer.h View File

@ -1,171 +0,0 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// AtomicPointer provides storage for a lock-free pointer.
// Platform-dependent implementation of AtomicPointer:
// - If the platform provides a cheap barrier, we use it with raw pointers
// - If <atomic> is present (on newer versions of gcc, it is), we use
// a <atomic>-based AtomicPointer. However we prefer the memory
// barrier based version, because at least on a gcc 4.4 32-bit build
// on linux, we have encountered a buggy <atomic> implementation.
// Also, some <atomic> implementations are much slower than a memory-barrier
// based implementation (~16ns for <atomic> based acquire-load vs. ~1ns for
// a barrier based acquire-load).
// This code is based on atomicops-internals-* in Google's perftools:
// http://code.google.com/p/google-perftools/source/browse/#svn%2Ftrunk%2Fsrc%2Fbase
#ifndef PORT_ATOMIC_POINTER_H_
#define PORT_ATOMIC_POINTER_H_
#include <stdint.h>
#include <atomic>
#if defined(_M_X64) || defined(__x86_64__)
#define ARCH_CPU_X86_FAMILY 1
#elif defined(_M_IX86) || defined(__i386__) || defined(__i386)
#define ARCH_CPU_X86_FAMILY 1
#elif defined(__ARMEL__)
#define ARCH_CPU_ARM_FAMILY 1
#elif defined(__aarch64__)
#define ARCH_CPU_ARM64_FAMILY 1
#elif defined(__ppc__) || defined(__powerpc__) || defined(__powerpc64__)
#define ARCH_CPU_PPC_FAMILY 1
#elif defined(__mips__)
#define ARCH_CPU_MIPS_FAMILY 1
#endif
namespace leveldb {
namespace port {
// Define MemoryBarrier() if available
// Windows on x86
#if defined(OS_WIN) && defined(COMPILER_MSVC) && defined(ARCH_CPU_X86_FAMILY)
// windows.h already provides a MemoryBarrier(void) macro
// http://msdn.microsoft.com/en-us/library/ms684208(v=vs.85).aspx
#define LEVELDB_HAVE_MEMORY_BARRIER
// Mac OS
#elif defined(__APPLE__)
inline void MemoryBarrier() {
std::atomic_thread_fence(std::memory_order_seq_cst);
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// Gcc on x86
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() {
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
__asm__ __volatile__("" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// Sun Studio
#elif defined(ARCH_CPU_X86_FAMILY) && defined(__SUNPRO_CC)
inline void MemoryBarrier() {
// See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on
// this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering.
asm volatile("" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// ARM Linux
#elif defined(ARCH_CPU_ARM_FAMILY) && defined(__linux__)
typedef void (*LinuxKernelMemoryBarrierFunc)(void);
// The Linux ARM kernel provides a highly optimized device-specific memory
// barrier function at a fixed memory address that is mapped in every
// user-level process.
//
// This beats using CPU-specific instructions which are, on single-core
// devices, un-necessary and very costly (e.g. ARMv7-A "dmb" takes more
// than 180ns on a Cortex-A8 like the one on a Nexus One). Benchmarking
// shows that the extra function call cost is completely negligible on
// multi-core devices.
//
inline void MemoryBarrier() {
(*(LinuxKernelMemoryBarrierFunc)0xffff0fa0)();
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// ARM64
#elif defined(ARCH_CPU_ARM64_FAMILY)
inline void MemoryBarrier() {
asm volatile("dmb sy" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// PPC
#elif defined(ARCH_CPU_PPC_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() {
// TODO for some powerpc expert: is there a cheaper suitable variant?
// Perhaps by having separate barriers for acquire and release ops.
asm volatile("sync" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER
// MIPS
#elif defined(ARCH_CPU_MIPS_FAMILY) && defined(__GNUC__)
inline void MemoryBarrier() {
__asm__ __volatile__("sync" : : : "memory");
}
#define LEVELDB_HAVE_MEMORY_BARRIER
#endif
// AtomicPointer built using platform-specific MemoryBarrier().
#if defined(LEVELDB_HAVE_MEMORY_BARRIER)
class AtomicPointer {
private:
void* rep_;
public:
AtomicPointer() { }
explicit AtomicPointer(void* p) : rep_(p) {}
inline void* NoBarrier_Load() const { return rep_; }
inline void NoBarrier_Store(void* v) { rep_ = v; }
inline void* Acquire_Load() const {
void* result = rep_;
MemoryBarrier();
return result;
}
inline void Release_Store(void* v) {
MemoryBarrier();
rep_ = v;
}
};
// AtomicPointer based on C++11 <atomic>.
#else
class AtomicPointer {
private:
std::atomic<void*> rep_;
public:
AtomicPointer() { }
explicit AtomicPointer(void* v) : rep_(v) { }
inline void* Acquire_Load() const {
return rep_.load(std::memory_order_acquire);
}
inline void Release_Store(void* v) {
rep_.store(v, std::memory_order_release);
}
inline void* NoBarrier_Load() const {
return rep_.load(std::memory_order_relaxed);
}
inline void NoBarrier_Store(void* v) {
rep_.store(v, std::memory_order_relaxed);
}
};
#endif
#undef LEVELDB_HAVE_MEMORY_BARRIER
#undef ARCH_CPU_X86_FAMILY
#undef ARCH_CPU_ARM_FAMILY
#undef ARCH_CPU_ARM64_FAMILY
#undef ARCH_CPU_PPC_FAMILY
} // namespace port
} // namespace leveldb
#endif // PORT_ATOMIC_POINTER_H_

+ 0
- 29
port/port_example.h View File

@ -62,35 +62,6 @@ class CondVar {
void SignallAll();
};
// A type that holds a pointer that can be read or written atomically
// (i.e., without word-tearing.)
class AtomicPointer {
private:
intptr_t rep_;
public:
// Initialize to arbitrary value
AtomicPointer();
// Initialize to hold v
explicit AtomicPointer(void* v) : rep_(v) { }
// Read and return the stored pointer with the guarantee that no
// later memory access (read or write) by this thread can be
// reordered ahead of this read.
void* Acquire_Load() const;
// Set v as the stored pointer with the guarantee that no earlier
// memory access (read or write) by this thread can be reordered
// after this store.
void Release_Store(void* v);
// Read the stored pointer with no ordering guarantees.
void* NoBarrier_Load() const;
// Set va as the stored pointer with no ordering guarantees.
void NoBarrier_Store(void* v);
};
// ------------------ Compression -------------------
// Store the snappy compression of "input[0,input_length-1]" in *output.

+ 0
- 1
port/port_stdcxx.h View File

@ -35,7 +35,6 @@
#include <condition_variable> // NOLINT
#include <mutex> // NOLINT
#include <string>
#include "port/atomic_pointer.h"
#include "port/thread_annotations.h"
namespace leveldb {

+ 2
- 3
util/arena.cc View File

@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/arena.h"
#include <assert.h>
namespace leveldb {
@ -60,8 +59,8 @@ char* Arena::AllocateAligned(size_t bytes) {
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_.push_back(result);
memory_usage_.NoBarrier_Store(
reinterpret_cast<void*>(MemoryUsage() + block_bytes + sizeof(char*)));
memory_usage_.fetch_add(block_bytes + sizeof(char*),
std::memory_order_relaxed);
return result;
}

+ 10
- 7
util/arena.h View File

@ -5,11 +5,11 @@
#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_
#define STORAGE_LEVELDB_UTIL_ARENA_H_
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include "port/port.h"
namespace leveldb {
@ -21,13 +21,13 @@ class Arena {
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);
// Allocate memory with the normal alignment guarantees provided by malloc
// Allocate memory with the normal alignment guarantees provided by malloc.
char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
// by the arena.
size_t MemoryUsage() const {
return reinterpret_cast<uintptr_t>(memory_usage_.NoBarrier_Load());
return memory_usage_.load(std::memory_order_relaxed);
}
private:
@ -42,7 +42,10 @@ class Arena {
std::vector<char*> blocks_;
// Total memory usage of the arena.
port::AtomicPointer memory_usage_;
//
// TODO(costan): This member is accessed via atomics, but the others are
// accessed without any locking. Is this OK?
std::atomic<size_t> memory_usage_;
// No copying allowed
Arena(const Arena&);

+ 31
- 26
util/env_test.cc View File

@ -5,6 +5,7 @@
#include "leveldb/env.h"
#include <algorithm>
#include <atomic>
#include "port/port.h"
#include "port/thread_annotations.h"
@ -24,10 +25,15 @@ class EnvTest {
EnvTest() : env_(Env::Default()) { }
};
static void SetBool(void* ptr) {
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
namespace {
static void SetAtomicBool(void* atomic_bool_ptr) {
std::atomic<bool>* atomic_bool =
reinterpret_cast<std::atomic<bool>*>(atomic_bool_ptr);
atomic_bool->store(true, std::memory_order_relaxed);
}
} // namespace
TEST(EnvTest, ReadWrite) {
Random rnd(test::RandomSeed());
@ -77,42 +83,41 @@ TEST(EnvTest, ReadWrite) {
}
TEST(EnvTest, RunImmediately) {
port::AtomicPointer called(nullptr);
env_->Schedule(&SetBool, &called);
std::atomic<bool> called(false);
env_->Schedule(&SetAtomicBool, &called);
env_->SleepForMicroseconds(kDelayMicros);
ASSERT_TRUE(called.NoBarrier_Load() != nullptr);
ASSERT_TRUE(called.load(std::memory_order_relaxed));
}
TEST(EnvTest, RunMany) {
port::AtomicPointer last_id(nullptr);
std::atomic<int> last_id(0);
struct CB {
port::AtomicPointer* last_id_ptr; // Pointer to shared slot
uintptr_t id; // Order# for the execution of this callback
struct Callback {
std::atomic<int>* const last_id_ptr_; // Pointer to shared state.
const int id_; // Order# for the execution of this callback.
CB(port::AtomicPointer* p, int i) : last_id_ptr(p), id(i) { }
Callback(std::atomic<int>* last_id_ptr, int id)
: last_id_ptr_(last_id_ptr), id_(id) { }
static void Run(void* v) {
CB* cb = reinterpret_cast<CB*>(v);
void* cur = cb->last_id_ptr->NoBarrier_Load();
ASSERT_EQ(cb->id-1, reinterpret_cast<uintptr_t>(cur));
cb->last_id_ptr->Release_Store(reinterpret_cast<void*>(cb->id));
static void Run(void* arg) {
Callback* callback = reinterpret_cast<Callback*>(arg);
int current_id = callback->last_id_ptr_->load(std::memory_order_relaxed);
ASSERT_EQ(callback->id_ - 1, current_id);
callback->last_id_ptr_->store(callback->id_, std::memory_order_relaxed);
}
};
// Schedule in different order than start time
CB cb1(&last_id, 1);
CB cb2(&last_id, 2);
CB cb3(&last_id, 3);
CB cb4(&last_id, 4);
env_->Schedule(&CB::Run, &cb1);
env_->Schedule(&CB::Run, &cb2);
env_->Schedule(&CB::Run, &cb3);
env_->Schedule(&CB::Run, &cb4);
Callback callback1(&last_id, 1);
Callback callback2(&last_id, 2);
Callback callback3(&last_id, 3);
Callback callback4(&last_id, 4);
env_->Schedule(&Callback::Run, &callback1);
env_->Schedule(&Callback::Run, &callback2);
env_->Schedule(&Callback::Run, &callback3);
env_->Schedule(&Callback::Run, &callback4);
env_->SleepForMicroseconds(kDelayMicros);
void* cur = last_id.Acquire_Load();
ASSERT_EQ(4, reinterpret_cast<uintptr_t>(cur));
ASSERT_EQ(4, last_id.load(std::memory_order_relaxed));
}
struct State {

+ 24
- 32
util/env_windows.cc View File

@ -8,6 +8,7 @@
#include <windows.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
@ -105,51 +106,42 @@ class ScopedHandle {
};
// Helper class to limit resource usage to avoid exhaustion.
// Currently used to limit mmap file usage so that we do not end
// up running out virtual memory, or running into kernel performance
// problems for very large databases.
// Currently used to limit read-only file descriptors and mmap file usage
// so that we do not run out of file descriptors or virtual memory, or run into
// kernel performance problems for very large databases.
class Limiter {
public:
// Limit maximum number of resources to |n|.
Limiter(intptr_t n) { SetAllowed(n); }
// Limit maximum number of resources to |max_acquires|.
Limiter(int max_acquires) : acquires_allowed_(max_acquires) {}
Limiter(const Limiter&) = delete;
Limiter operator=(const Limiter&) = delete;
// If another resource is available, acquire it and return true.
// Else return false.
bool Acquire() LOCKS_EXCLUDED(mu_) {
if (GetAllowed() <= 0) {
return false;
}
MutexLock l(&mu_);
intptr_t x = GetAllowed();
if (x <= 0) {
return false;
} else {
SetAllowed(x - 1);
bool Acquire() {
int old_acquires_allowed =
acquires_allowed_.fetch_sub(1, std::memory_order_relaxed);
if (old_acquires_allowed > 0)
return true;
}
acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
return false;
}
// Release a resource acquired by a previous call to Acquire() that returned
// true.
void Release() LOCKS_EXCLUDED(mu_) {
MutexLock l(&mu_);
SetAllowed(GetAllowed() + 1);
void Release() {
acquires_allowed_.fetch_add(1, std::memory_order_relaxed);
}
private:
port::Mutex mu_;
port::AtomicPointer allowed_;
intptr_t GetAllowed() const {
return reinterpret_cast<intptr_t>(allowed_.Acquire_Load());
}
void SetAllowed(intptr_t v) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
allowed_.Release_Store(reinterpret_cast<void*>(v));
}
Limiter(const Limiter&);
void operator=(const Limiter&);
// The number of available resources.
//
// This is a counter and is not tied to the invariants of any other class, so
// it can be operated on safely using std::memory_order_relaxed.
std::atomic<int> acquires_allowed_;
};
class WindowsSequentialFile : public SequentialFile {

Loading…
Cancel
Save