You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

383 lines
9.7 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/skiplist.h"
  5. #include <atomic>
  6. #include <set>
  7. #include "leveldb/env.h"
  8. #include "port/port.h"
  9. #include "port/thread_annotations.h"
  10. #include "util/arena.h"
  11. #include "util/hash.h"
  12. #include "util/random.h"
  13. #include "util/testharness.h"
  14. namespace leveldb {
  // Keys stored in the skip lists under test are plain 64-bit unsigned integers.
  using Key = uint64_t;

  // Three-way comparison functor over Key values.
  //
  // Returns 0 when the operands are equal, -1 when `a` orders before `b`,
  // and +1 when `a` orders after `b`.
  struct Comparator {
    int operator()(const Key& a, const Key& b) const {
      if (a == b) {
        return 0;
      }
      return (a < b) ? -1 : +1;
    }
  };
  // Empty type whose name is passed as the first argument of every
  // TEST(SkipTest, ...) in this file.
  class SkipTest {};
  28. TEST(SkipTest, Empty) {
  29. Arena arena;
  30. Comparator cmp;
  31. SkipList<Key, Comparator> list(cmp, &arena);
  32. ASSERT_TRUE(!list.Contains(10));
  33. SkipList<Key, Comparator>::Iterator iter(&list);
  34. ASSERT_TRUE(!iter.Valid());
  35. iter.SeekToFirst();
  36. ASSERT_TRUE(!iter.Valid());
  37. iter.Seek(100);
  38. ASSERT_TRUE(!iter.Valid());
  39. iter.SeekToLast();
  40. ASSERT_TRUE(!iter.Valid());
  41. }
  42. TEST(SkipTest, InsertAndLookup) {
  43. const int N = 2000;
  44. const int R = 5000;
  45. Random rnd(1000);
  46. std::set<Key> keys;
  47. Arena arena;
  48. Comparator cmp;
  49. SkipList<Key, Comparator> list(cmp, &arena);
  50. for (int i = 0; i < N; i++) {
  51. Key key = rnd.Next() % R;
  52. if (keys.insert(key).second) {
  53. list.Insert(key);
  54. }
  55. }
  56. for (int i = 0; i < R; i++) {
  57. if (list.Contains(i)) {
  58. ASSERT_EQ(keys.count(i), 1);
  59. } else {
  60. ASSERT_EQ(keys.count(i), 0);
  61. }
  62. }
  63. // Simple iterator tests
  64. {
  65. SkipList<Key, Comparator>::Iterator iter(&list);
  66. ASSERT_TRUE(!iter.Valid());
  67. iter.Seek(0);
  68. ASSERT_TRUE(iter.Valid());
  69. ASSERT_EQ(*(keys.begin()), iter.key());
  70. iter.SeekToFirst();
  71. ASSERT_TRUE(iter.Valid());
  72. ASSERT_EQ(*(keys.begin()), iter.key());
  73. iter.SeekToLast();
  74. ASSERT_TRUE(iter.Valid());
  75. ASSERT_EQ(*(keys.rbegin()), iter.key());
  76. }
  77. // Forward iteration test
  78. for (int i = 0; i < R; i++) {
  79. SkipList<Key, Comparator>::Iterator iter(&list);
  80. iter.Seek(i);
  81. // Compare against model iterator
  82. std::set<Key>::iterator model_iter = keys.lower_bound(i);
  83. for (int j = 0; j < 3; j++) {
  84. if (model_iter == keys.end()) {
  85. ASSERT_TRUE(!iter.Valid());
  86. break;
  87. } else {
  88. ASSERT_TRUE(iter.Valid());
  89. ASSERT_EQ(*model_iter, iter.key());
  90. ++model_iter;
  91. iter.Next();
  92. }
  93. }
  94. }
  95. // Backward iteration test
  96. {
  97. SkipList<Key, Comparator>::Iterator iter(&list);
  98. iter.SeekToLast();
  99. // Compare against model iterator
  100. for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
  101. model_iter != keys.rend();
  102. ++model_iter) {
  103. ASSERT_TRUE(iter.Valid());
  104. ASSERT_EQ(*model_iter, iter.key());
  105. iter.Prev();
  106. }
  107. ASSERT_TRUE(!iter.Valid());
  108. }
  109. }
  110. // We want to make sure that with a single writer and multiple
  111. // concurrent readers (with no synchronization other than when a
  112. // reader's iterator is created), the reader always observes all the
  113. // data that was present in the skip list when the iterator was
  114. // constructor. Because insertions are happening concurrently, we may
  115. // also observe new values that were inserted since the iterator was
  116. // constructed, but we should never miss any values that were present
  117. // at iterator construction time.
  118. //
  119. // We generate multi-part keys:
  120. // <key,gen,hash>
  121. // where:
  122. // key is in range [0..K-1]
  123. // gen is a generation number for key
  124. // hash is hash(key,gen)
  125. //
  126. // The insertion code picks a random key, sets gen to be 1 + the last
  127. // generation number inserted for that key, and sets hash to Hash(key,gen).
  128. //
  129. // At the beginning of a read, we snapshot the last inserted
  130. // generation number for each key. We then iterate, including random
  131. // calls to Next() and Seek(). For every key we encounter, we
  132. // check that it is either expected given the initial snapshot or has
  133. // been concurrently added since the iterator started.
  134. class ConcurrentTest {
  135. private:
  136. static const uint32_t K = 4;
  137. static uint64_t key(Key key) { return (key >> 40); }
  138. static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
  139. static uint64_t hash(Key key) { return key & 0xff; }
  140. static uint64_t HashNumbers(uint64_t k, uint64_t g) {
  141. uint64_t data[2] = { k, g };
  142. return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
  143. }
  144. static Key MakeKey(uint64_t k, uint64_t g) {
  145. assert(sizeof(Key) == sizeof(uint64_t));
  146. assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
  147. assert(g <= 0xffffffffu);
  148. return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
  149. }
  150. static bool IsValidKey(Key k) {
  151. return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
  152. }
  153. static Key RandomTarget(Random* rnd) {
  154. switch (rnd->Next() % 10) {
  155. case 0:
  156. // Seek to beginning
  157. return MakeKey(0, 0);
  158. case 1:
  159. // Seek to end
  160. return MakeKey(K, 0);
  161. default:
  162. // Seek to middle
  163. return MakeKey(rnd->Next() % K, 0);
  164. }
  165. }
  166. // Per-key generation
  167. struct State {
  168. std::atomic<int> generation[K];
  169. void Set(int k, int v) {
  170. generation[k].store(v, std::memory_order_release);
  171. }
  172. int Get(int k) {
  173. return generation[k].load(std::memory_order_acquire);
  174. }
  175. State() {
  176. for (int k = 0; k < K; k++) {
  177. Set(k, 0);
  178. }
  179. }
  180. };
  181. // Current state of the test
  182. State current_;
  183. Arena arena_;
  184. // SkipList is not protected by mu_. We just use a single writer
  185. // thread to modify it.
  186. SkipList<Key, Comparator> list_;
  187. public:
  188. ConcurrentTest() : list_(Comparator(), &arena_) { }
  189. // REQUIRES: External synchronization
  190. void WriteStep(Random* rnd) {
  191. const uint32_t k = rnd->Next() % K;
  192. const intptr_t g = current_.Get(k) + 1;
  193. const Key key = MakeKey(k, g);
  194. list_.Insert(key);
  195. current_.Set(k, g);
  196. }
  197. void ReadStep(Random* rnd) {
  198. // Remember the initial committed state of the skiplist.
  199. State initial_state;
  200. for (int k = 0; k < K; k++) {
  201. initial_state.Set(k, current_.Get(k));
  202. }
  203. Key pos = RandomTarget(rnd);
  204. SkipList<Key, Comparator>::Iterator iter(&list_);
  205. iter.Seek(pos);
  206. while (true) {
  207. Key current;
  208. if (!iter.Valid()) {
  209. current = MakeKey(K, 0);
  210. } else {
  211. current = iter.key();
  212. ASSERT_TRUE(IsValidKey(current)) << current;
  213. }
  214. ASSERT_LE(pos, current) << "should not go backwards";
  215. // Verify that everything in [pos,current) was not present in
  216. // initial_state.
  217. while (pos < current) {
  218. ASSERT_LT(key(pos), K) << pos;
  219. // Note that generation 0 is never inserted, so it is ok if
  220. // <*,0,*> is missing.
  221. ASSERT_TRUE((gen(pos) == 0) ||
  222. (gen(pos) > static_cast<Key>(initial_state.Get(key(pos))))
  223. ) << "key: " << key(pos)
  224. << "; gen: " << gen(pos)
  225. << "; initgen: "
  226. << initial_state.Get(key(pos));
  227. // Advance to next key in the valid key space
  228. if (key(pos) < key(current)) {
  229. pos = MakeKey(key(pos) + 1, 0);
  230. } else {
  231. pos = MakeKey(key(pos), gen(pos) + 1);
  232. }
  233. }
  234. if (!iter.Valid()) {
  235. break;
  236. }
  237. if (rnd->Next() % 2) {
  238. iter.Next();
  239. pos = MakeKey(key(pos), gen(pos) + 1);
  240. } else {
  241. Key new_target = RandomTarget(rnd);
  242. if (new_target > pos) {
  243. pos = new_target;
  244. iter.Seek(new_target);
  245. }
  246. }
  247. }
  248. }
  249. };
  250. const uint32_t ConcurrentTest::K;
  251. // Simple test that does single-threaded testing of the ConcurrentTest
  252. // scaffolding.
  253. TEST(SkipTest, ConcurrentWithoutThreads) {
  254. ConcurrentTest test;
  255. Random rnd(test::RandomSeed());
  256. for (int i = 0; i < 10000; i++) {
  257. test.ReadStep(&rnd);
  258. test.WriteStep(&rnd);
  259. }
  260. }
  261. class TestState {
  262. public:
  263. ConcurrentTest t_;
  264. int seed_;
  265. std::atomic<bool> quit_flag_;
  266. enum ReaderState {
  267. STARTING,
  268. RUNNING,
  269. DONE
  270. };
  271. explicit TestState(int s)
  272. : seed_(s),
  273. quit_flag_(false),
  274. state_(STARTING),
  275. state_cv_(&mu_) {}
  276. void Wait(ReaderState s) LOCKS_EXCLUDED(mu_) {
  277. mu_.Lock();
  278. while (state_ != s) {
  279. state_cv_.Wait();
  280. }
  281. mu_.Unlock();
  282. }
  283. void Change(ReaderState s) LOCKS_EXCLUDED(mu_) {
  284. mu_.Lock();
  285. state_ = s;
  286. state_cv_.Signal();
  287. mu_.Unlock();
  288. }
  289. private:
  290. port::Mutex mu_;
  291. ReaderState state_ GUARDED_BY(mu_);
  292. port::CondVar state_cv_ GUARDED_BY(mu_);
  293. };
  294. static void ConcurrentReader(void* arg) {
  295. TestState* state = reinterpret_cast<TestState*>(arg);
  296. Random rnd(state->seed_);
  297. int64_t reads = 0;
  298. state->Change(TestState::RUNNING);
  299. while (!state->quit_flag_.load(std::memory_order_acquire)) {
  300. state->t_.ReadStep(&rnd);
  301. ++reads;
  302. }
  303. state->Change(TestState::DONE);
  304. }
  305. static void RunConcurrent(int run) {
  306. const int seed = test::RandomSeed() + (run * 100);
  307. Random rnd(seed);
  308. const int N = 1000;
  309. const int kSize = 1000;
  310. for (int i = 0; i < N; i++) {
  311. if ((i % 100) == 0) {
  312. fprintf(stderr, "Run %d of %d\n", i, N);
  313. }
  314. TestState state(seed + 1);
  315. Env::Default()->Schedule(ConcurrentReader, &state);
  316. state.Wait(TestState::RUNNING);
  317. for (int i = 0; i < kSize; i++) {
  318. state.t_.WriteStep(&rnd);
  319. }
  320. state.quit_flag_.store(true, std::memory_order_release);
  321. state.Wait(TestState::DONE);
  322. }
  323. }
  324. TEST(SkipTest, Concurrent1) { RunConcurrent(1); }
  325. TEST(SkipTest, Concurrent2) { RunConcurrent(2); }
  326. TEST(SkipTest, Concurrent3) { RunConcurrent(3); }
  327. TEST(SkipTest, Concurrent4) { RunConcurrent(4); }
  328. TEST(SkipTest, Concurrent5) { RunConcurrent(5); }
  329. } // namespace leveldb
  330. int main(int argc, char** argv) {
  331. return leveldb::test::RunAllTests();
  332. }