You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

380 lines
9.7 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/skiplist.h"
  5. #include <set>
  6. #include "leveldb/env.h"
  7. #include "port/port.h"
  8. #include "port/thread_annotations.h"
  9. #include "util/arena.h"
  10. #include "util/hash.h"
  11. #include "util/random.h"
  12. #include "util/testharness.h"
  13. namespace leveldb {
  14. typedef uint64_t Key;
  15. struct Comparator {
  16. int operator()(const Key& a, const Key& b) const {
  17. if (a < b) {
  18. return -1;
  19. } else if (a > b) {
  20. return +1;
  21. } else {
  22. return 0;
  23. }
  24. }
  25. };
  26. class SkipTest { };
  27. TEST(SkipTest, Empty) {
  28. Arena arena;
  29. Comparator cmp;
  30. SkipList<Key, Comparator> list(cmp, &arena);
  31. ASSERT_TRUE(!list.Contains(10));
  32. SkipList<Key, Comparator>::Iterator iter(&list);
  33. ASSERT_TRUE(!iter.Valid());
  34. iter.SeekToFirst();
  35. ASSERT_TRUE(!iter.Valid());
  36. iter.Seek(100);
  37. ASSERT_TRUE(!iter.Valid());
  38. iter.SeekToLast();
  39. ASSERT_TRUE(!iter.Valid());
  40. }
  41. TEST(SkipTest, InsertAndLookup) {
  42. const int N = 2000;
  43. const int R = 5000;
  44. Random rnd(1000);
  45. std::set<Key> keys;
  46. Arena arena;
  47. Comparator cmp;
  48. SkipList<Key, Comparator> list(cmp, &arena);
  49. for (int i = 0; i < N; i++) {
  50. Key key = rnd.Next() % R;
  51. if (keys.insert(key).second) {
  52. list.Insert(key);
  53. }
  54. }
  55. for (int i = 0; i < R; i++) {
  56. if (list.Contains(i)) {
  57. ASSERT_EQ(keys.count(i), 1);
  58. } else {
  59. ASSERT_EQ(keys.count(i), 0);
  60. }
  61. }
  62. // Simple iterator tests
  63. {
  64. SkipList<Key, Comparator>::Iterator iter(&list);
  65. ASSERT_TRUE(!iter.Valid());
  66. iter.Seek(0);
  67. ASSERT_TRUE(iter.Valid());
  68. ASSERT_EQ(*(keys.begin()), iter.key());
  69. iter.SeekToFirst();
  70. ASSERT_TRUE(iter.Valid());
  71. ASSERT_EQ(*(keys.begin()), iter.key());
  72. iter.SeekToLast();
  73. ASSERT_TRUE(iter.Valid());
  74. ASSERT_EQ(*(keys.rbegin()), iter.key());
  75. }
  76. // Forward iteration test
  77. for (int i = 0; i < R; i++) {
  78. SkipList<Key, Comparator>::Iterator iter(&list);
  79. iter.Seek(i);
  80. // Compare against model iterator
  81. std::set<Key>::iterator model_iter = keys.lower_bound(i);
  82. for (int j = 0; j < 3; j++) {
  83. if (model_iter == keys.end()) {
  84. ASSERT_TRUE(!iter.Valid());
  85. break;
  86. } else {
  87. ASSERT_TRUE(iter.Valid());
  88. ASSERT_EQ(*model_iter, iter.key());
  89. ++model_iter;
  90. iter.Next();
  91. }
  92. }
  93. }
  94. // Backward iteration test
  95. {
  96. SkipList<Key, Comparator>::Iterator iter(&list);
  97. iter.SeekToLast();
  98. // Compare against model iterator
  99. for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
  100. model_iter != keys.rend();
  101. ++model_iter) {
  102. ASSERT_TRUE(iter.Valid());
  103. ASSERT_EQ(*model_iter, iter.key());
  104. iter.Prev();
  105. }
  106. ASSERT_TRUE(!iter.Valid());
  107. }
  108. }
  109. // We want to make sure that with a single writer and multiple
  110. // concurrent readers (with no synchronization other than when a
  111. // reader's iterator is created), the reader always observes all the
  112. // data that was present in the skip list when the iterator was
  113. // constructed. Because insertions are happening concurrently, we may
  114. // also observe new values that were inserted since the iterator was
  115. // constructed, but we should never miss any values that were present
  116. // at iterator construction time.
  117. //
  118. // We generate multi-part keys:
  119. // <key,gen,hash>
  120. // where:
  121. // key is in range [0..K-1]
  122. // gen is a generation number for key
  123. // hash is hash(key,gen)
  124. //
  125. // The insertion code picks a random key, sets gen to be 1 + the last
  126. // generation number inserted for that key, and sets hash to Hash(key,gen).
  127. //
  128. // At the beginning of a read, we snapshot the last inserted
  129. // generation number for each key. We then iterate, including random
  130. // calls to Next() and Seek(). For every key we encounter, we
  131. // check that it is either expected given the initial snapshot or has
  132. // been concurrently added since the iterator started.
  133. class ConcurrentTest {
  134. private:
  135. static const uint32_t K = 4;
  136. static uint64_t key(Key key) { return (key >> 40); }
  137. static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
  138. static uint64_t hash(Key key) { return key & 0xff; }
  139. static uint64_t HashNumbers(uint64_t k, uint64_t g) {
  140. uint64_t data[2] = { k, g };
  141. return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
  142. }
  143. static Key MakeKey(uint64_t k, uint64_t g) {
  144. assert(sizeof(Key) == sizeof(uint64_t));
  145. assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
  146. assert(g <= 0xffffffffu);
  147. return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
  148. }
  149. static bool IsValidKey(Key k) {
  150. return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
  151. }
  152. static Key RandomTarget(Random* rnd) {
  153. switch (rnd->Next() % 10) {
  154. case 0:
  155. // Seek to beginning
  156. return MakeKey(0, 0);
  157. case 1:
  158. // Seek to end
  159. return MakeKey(K, 0);
  160. default:
  161. // Seek to middle
  162. return MakeKey(rnd->Next() % K, 0);
  163. }
  164. }
  165. // Per-key generation
  166. struct State {
  167. port::AtomicPointer generation[K];
  168. void Set(int k, intptr_t v) {
  169. generation[k].Release_Store(reinterpret_cast<void*>(v));
  170. }
  171. intptr_t Get(int k) {
  172. return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
  173. }
  174. State() {
  175. for (int k = 0; k < K; k++) {
  176. Set(k, 0);
  177. }
  178. }
  179. };
  180. // Current state of the test
  181. State current_;
  182. Arena arena_;
  183. // SkipList is not protected by mu_. We just use a single writer
  184. // thread to modify it.
  185. SkipList<Key, Comparator> list_;
  186. public:
  187. ConcurrentTest() : list_(Comparator(), &arena_) { }
  188. // REQUIRES: External synchronization
  189. void WriteStep(Random* rnd) {
  190. const uint32_t k = rnd->Next() % K;
  191. const intptr_t g = current_.Get(k) + 1;
  192. const Key key = MakeKey(k, g);
  193. list_.Insert(key);
  194. current_.Set(k, g);
  195. }
  196. void ReadStep(Random* rnd) {
  197. // Remember the initial committed state of the skiplist.
  198. State initial_state;
  199. for (int k = 0; k < K; k++) {
  200. initial_state.Set(k, current_.Get(k));
  201. }
  202. Key pos = RandomTarget(rnd);
  203. SkipList<Key, Comparator>::Iterator iter(&list_);
  204. iter.Seek(pos);
  205. while (true) {
  206. Key current;
  207. if (!iter.Valid()) {
  208. current = MakeKey(K, 0);
  209. } else {
  210. current = iter.key();
  211. ASSERT_TRUE(IsValidKey(current)) << current;
  212. }
  213. ASSERT_LE(pos, current) << "should not go backwards";
  214. // Verify that everything in [pos,current) was not present in
  215. // initial_state.
  216. while (pos < current) {
  217. ASSERT_LT(key(pos), K) << pos;
  218. // Note that generation 0 is never inserted, so it is ok if
  219. // <*,0,*> is missing.
  220. ASSERT_TRUE((gen(pos) == 0) ||
  221. (gen(pos) > static_cast<Key>(initial_state.Get(key(pos))))
  222. ) << "key: " << key(pos)
  223. << "; gen: " << gen(pos)
  224. << "; initgen: "
  225. << initial_state.Get(key(pos));
  226. // Advance to next key in the valid key space
  227. if (key(pos) < key(current)) {
  228. pos = MakeKey(key(pos) + 1, 0);
  229. } else {
  230. pos = MakeKey(key(pos), gen(pos) + 1);
  231. }
  232. }
  233. if (!iter.Valid()) {
  234. break;
  235. }
  236. if (rnd->Next() % 2) {
  237. iter.Next();
  238. pos = MakeKey(key(pos), gen(pos) + 1);
  239. } else {
  240. Key new_target = RandomTarget(rnd);
  241. if (new_target > pos) {
  242. pos = new_target;
  243. iter.Seek(new_target);
  244. }
  245. }
  246. }
  247. }
  248. };
  249. const uint32_t ConcurrentTest::K;
  250. // Simple test that does single-threaded testing of the ConcurrentTest
  251. // scaffolding.
  252. TEST(SkipTest, ConcurrentWithoutThreads) {
  253. ConcurrentTest test;
  254. Random rnd(test::RandomSeed());
  255. for (int i = 0; i < 10000; i++) {
  256. test.ReadStep(&rnd);
  257. test.WriteStep(&rnd);
  258. }
  259. }
  260. class TestState {
  261. public:
  262. ConcurrentTest t_;
  263. int seed_;
  264. port::AtomicPointer quit_flag_;
  265. enum ReaderState {
  266. STARTING,
  267. RUNNING,
  268. DONE
  269. };
  270. explicit TestState(int s)
  271. : seed_(s),
  272. quit_flag_(nullptr),
  273. state_(STARTING),
  274. state_cv_(&mu_) {}
  275. void Wait(ReaderState s) LOCKS_EXCLUDED(mu_) {
  276. mu_.Lock();
  277. while (state_ != s) {
  278. state_cv_.Wait();
  279. }
  280. mu_.Unlock();
  281. }
  282. void Change(ReaderState s) LOCKS_EXCLUDED(mu_) {
  283. mu_.Lock();
  284. state_ = s;
  285. state_cv_.Signal();
  286. mu_.Unlock();
  287. }
  288. private:
  289. port::Mutex mu_;
  290. ReaderState state_ GUARDED_BY(mu_);
  291. port::CondVar state_cv_ GUARDED_BY(mu_);
  292. };
  293. static void ConcurrentReader(void* arg) {
  294. TestState* state = reinterpret_cast<TestState*>(arg);
  295. Random rnd(state->seed_);
  296. int64_t reads = 0;
  297. state->Change(TestState::RUNNING);
  298. while (!state->quit_flag_.Acquire_Load()) {
  299. state->t_.ReadStep(&rnd);
  300. ++reads;
  301. }
  302. state->Change(TestState::DONE);
  303. }
  304. static void RunConcurrent(int run) {
  305. const int seed = test::RandomSeed() + (run * 100);
  306. Random rnd(seed);
  307. const int N = 1000;
  308. const int kSize = 1000;
  309. for (int i = 0; i < N; i++) {
  310. if ((i % 100) == 0) {
  311. fprintf(stderr, "Run %d of %d\n", i, N);
  312. }
  313. TestState state(seed + 1);
  314. Env::Default()->Schedule(ConcurrentReader, &state);
  315. state.Wait(TestState::RUNNING);
  316. for (int i = 0; i < kSize; i++) {
  317. state.t_.WriteStep(&rnd);
  318. }
  319. state.quit_flag_.Release_Store(&state); // Any non-null arg will do
  320. state.Wait(TestState::DONE);
  321. }
  322. }
  323. TEST(SkipTest, Concurrent1) { RunConcurrent(1); }
  324. TEST(SkipTest, Concurrent2) { RunConcurrent(2); }
  325. TEST(SkipTest, Concurrent3) { RunConcurrent(3); }
  326. TEST(SkipTest, Concurrent4) { RunConcurrent(4); }
  327. TEST(SkipTest, Concurrent5) { RunConcurrent(5); }
  328. } // namespace leveldb
  329. int main(int argc, char** argv) {
  330. return leveldb::test::RunAllTests();
  331. }