您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

378 行
9.5 KiB

  1. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style license that can be
  3. // found in the LICENSE file. See the AUTHORS file for names of contributors.
  4. #include "db/skiplist.h"
  5. #include <set>
  6. #include "leveldb/env.h"
  7. #include "util/arena.h"
  8. #include "util/hash.h"
  9. #include "util/random.h"
  10. #include "util/testharness.h"
  11. namespace leveldb {
  // Keys stored in the skip list under test are plain 64-bit unsigned integers.
  typedef uint64_t Key;

  // Three-way ordering functor over Key. Returns -1 when the first argument is
  // smaller, +1 when it is larger, and 0 when both are equal.
  struct Comparator {
    int operator()(const Key& a, const Key& b) const {
      if (a == b) {
        return 0;
      }
      return (a < b) ? -1 : +1;
    }
  };
  // Empty tag type; it is the first argument of every TEST(SkipTest, ...) case
  // in this file and carries no state of its own.
  class SkipTest {};
  25. TEST(SkipTest, Empty) {
  26. Arena arena;
  27. Comparator cmp;
  28. SkipList<Key, Comparator> list(cmp, &arena);
  29. ASSERT_TRUE(!list.Contains(10));
  30. SkipList<Key, Comparator>::Iterator iter(&list);
  31. ASSERT_TRUE(!iter.Valid());
  32. iter.SeekToFirst();
  33. ASSERT_TRUE(!iter.Valid());
  34. iter.Seek(100);
  35. ASSERT_TRUE(!iter.Valid());
  36. iter.SeekToLast();
  37. ASSERT_TRUE(!iter.Valid());
  38. }
  39. TEST(SkipTest, InsertAndLookup) {
  40. const int N = 2000;
  41. const int R = 5000;
  42. Random rnd(1000);
  43. std::set<Key> keys;
  44. Arena arena;
  45. Comparator cmp;
  46. SkipList<Key, Comparator> list(cmp, &arena);
  47. for (int i = 0; i < N; i++) {
  48. Key key = rnd.Next() % R;
  49. if (keys.insert(key).second) {
  50. list.Insert(key);
  51. }
  52. }
  53. for (int i = 0; i < R; i++) {
  54. if (list.Contains(i)) {
  55. ASSERT_EQ(keys.count(i), 1);
  56. } else {
  57. ASSERT_EQ(keys.count(i), 0);
  58. }
  59. }
  60. // Simple iterator tests
  61. {
  62. SkipList<Key, Comparator>::Iterator iter(&list);
  63. ASSERT_TRUE(!iter.Valid());
  64. iter.Seek(0);
  65. ASSERT_TRUE(iter.Valid());
  66. ASSERT_EQ(*(keys.begin()), iter.key());
  67. iter.SeekToFirst();
  68. ASSERT_TRUE(iter.Valid());
  69. ASSERT_EQ(*(keys.begin()), iter.key());
  70. iter.SeekToLast();
  71. ASSERT_TRUE(iter.Valid());
  72. ASSERT_EQ(*(keys.rbegin()), iter.key());
  73. }
  74. // Forward iteration test
  75. for (int i = 0; i < R; i++) {
  76. SkipList<Key, Comparator>::Iterator iter(&list);
  77. iter.Seek(i);
  78. // Compare against model iterator
  79. std::set<Key>::iterator model_iter = keys.lower_bound(i);
  80. for (int j = 0; j < 3; j++) {
  81. if (model_iter == keys.end()) {
  82. ASSERT_TRUE(!iter.Valid());
  83. break;
  84. } else {
  85. ASSERT_TRUE(iter.Valid());
  86. ASSERT_EQ(*model_iter, iter.key());
  87. ++model_iter;
  88. iter.Next();
  89. }
  90. }
  91. }
  92. // Backward iteration test
  93. {
  94. SkipList<Key, Comparator>::Iterator iter(&list);
  95. iter.SeekToLast();
  96. // Compare against model iterator
  97. for (std::set<Key>::reverse_iterator model_iter = keys.rbegin();
  98. model_iter != keys.rend();
  99. ++model_iter) {
  100. ASSERT_TRUE(iter.Valid());
  101. ASSERT_EQ(*model_iter, iter.key());
  102. iter.Prev();
  103. }
  104. ASSERT_TRUE(!iter.Valid());
  105. }
  106. }
  107. // We want to make sure that with a single writer and multiple
  108. // concurrent readers (with no synchronization other than when a
  109. // reader's iterator is created), the reader always observes all the
  110. // data that was present in the skip list when the iterator was
  111. // constructed. Because insertions are happening concurrently, we may
  112. // also observe new values that were inserted since the iterator was
  113. // constructed, but we should never miss any values that were present
  114. // at iterator construction time.
  115. //
  116. // We generate multi-part keys:
  117. // <key,gen,hash>
  118. // where:
  119. // key is in range [0..K-1]
  120. // gen is a generation number for key
  121. // hash is hash(key,gen)
  122. //
  123. // The insertion code picks a random key, sets gen to be 1 + the last
  124. // generation number inserted for that key, and sets hash to Hash(key,gen).
  125. //
  126. // At the beginning of a read, we snapshot the last inserted
  127. // generation number for each key. We then iterate, including random
  128. // calls to Next() and Seek(). For every key we encounter, we
  129. // check that it is either expected given the initial snapshot or has
  130. // been concurrently added since the iterator started.
  131. class ConcurrentTest {
  132. private:
  133. static const uint32_t K = 4;
  134. static uint64_t key(Key key) { return (key >> 40); }
  135. static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
  136. static uint64_t hash(Key key) { return key & 0xff; }
  137. static uint64_t HashNumbers(uint64_t k, uint64_t g) {
  138. uint64_t data[2] = { k, g };
  139. return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
  140. }
  141. static Key MakeKey(uint64_t k, uint64_t g) {
  142. assert(sizeof(Key) == sizeof(uint64_t));
  143. assert(k <= K); // We sometimes pass K to seek to the end of the skiplist
  144. assert(g <= 0xffffffffu);
  145. return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
  146. }
  147. static bool IsValidKey(Key k) {
  148. return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
  149. }
  150. static Key RandomTarget(Random* rnd) {
  151. switch (rnd->Next() % 10) {
  152. case 0:
  153. // Seek to beginning
  154. return MakeKey(0, 0);
  155. case 1:
  156. // Seek to end
  157. return MakeKey(K, 0);
  158. default:
  159. // Seek to middle
  160. return MakeKey(rnd->Next() % K, 0);
  161. }
  162. }
  163. // Per-key generation
  164. struct State {
  165. port::AtomicPointer generation[K];
  166. void Set(int k, intptr_t v) {
  167. generation[k].Release_Store(reinterpret_cast<void*>(v));
  168. }
  169. intptr_t Get(int k) {
  170. return reinterpret_cast<intptr_t>(generation[k].Acquire_Load());
  171. }
  172. State() {
  173. for (int k = 0; k < K; k++) {
  174. Set(k, 0);
  175. }
  176. }
  177. };
  178. // Current state of the test
  179. State current_;
  180. Arena arena_;
  181. // SkipList is not protected by mu_. We just use a single writer
  182. // thread to modify it.
  183. SkipList<Key, Comparator> list_;
  184. public:
  185. ConcurrentTest() : list_(Comparator(), &arena_) { }
  186. // REQUIRES: External synchronization
  187. void WriteStep(Random* rnd) {
  188. const uint32_t k = rnd->Next() % K;
  189. const intptr_t g = current_.Get(k) + 1;
  190. const Key key = MakeKey(k, g);
  191. list_.Insert(key);
  192. current_.Set(k, g);
  193. }
  194. void ReadStep(Random* rnd) {
  195. // Remember the initial committed state of the skiplist.
  196. State initial_state;
  197. for (int k = 0; k < K; k++) {
  198. initial_state.Set(k, current_.Get(k));
  199. }
  200. Key pos = RandomTarget(rnd);
  201. SkipList<Key, Comparator>::Iterator iter(&list_);
  202. iter.Seek(pos);
  203. while (true) {
  204. Key current;
  205. if (!iter.Valid()) {
  206. current = MakeKey(K, 0);
  207. } else {
  208. current = iter.key();
  209. ASSERT_TRUE(IsValidKey(current)) << current;
  210. }
  211. ASSERT_LE(pos, current) << "should not go backwards";
  212. // Verify that everything in [pos,current) was not present in
  213. // initial_state.
  214. while (pos < current) {
  215. ASSERT_LT(key(pos), K) << pos;
  216. // Note that generation 0 is never inserted, so it is ok if
  217. // <*,0,*> is missing.
  218. ASSERT_TRUE((gen(pos) == 0) ||
  219. (gen(pos) > initial_state.Get(key(pos)))
  220. ) << "key: " << key(pos)
  221. << "; gen: " << gen(pos)
  222. << "; initgen: "
  223. << initial_state.Get(key(pos));
  224. // Advance to next key in the valid key space
  225. if (key(pos) < key(current)) {
  226. pos = MakeKey(key(pos) + 1, 0);
  227. } else {
  228. pos = MakeKey(key(pos), gen(pos) + 1);
  229. }
  230. }
  231. if (!iter.Valid()) {
  232. break;
  233. }
  234. if (rnd->Next() % 2) {
  235. iter.Next();
  236. pos = MakeKey(key(pos), gen(pos) + 1);
  237. } else {
  238. Key new_target = RandomTarget(rnd);
  239. if (new_target > pos) {
  240. pos = new_target;
  241. iter.Seek(new_target);
  242. }
  243. }
  244. }
  245. }
  246. };
  247. const uint32_t ConcurrentTest::K;
  248. // Simple test that does single-threaded testing of the ConcurrentTest
  249. // scaffolding.
  250. TEST(SkipTest, ConcurrentWithoutThreads) {
  251. ConcurrentTest test;
  252. Random rnd(test::RandomSeed());
  253. for (int i = 0; i < 10000; i++) {
  254. test.ReadStep(&rnd);
  255. test.WriteStep(&rnd);
  256. }
  257. }
  258. class TestState {
  259. public:
  260. ConcurrentTest t_;
  261. int seed_;
  262. port::AtomicPointer quit_flag_;
  263. enum ReaderState {
  264. STARTING,
  265. RUNNING,
  266. DONE
  267. };
  268. explicit TestState(int s)
  269. : seed_(s),
  270. quit_flag_(NULL),
  271. state_(STARTING),
  272. state_cv_(&mu_) {}
  273. void Wait(ReaderState s) {
  274. mu_.Lock();
  275. while (state_ != s) {
  276. state_cv_.Wait();
  277. }
  278. mu_.Unlock();
  279. }
  280. void Change(ReaderState s) {
  281. mu_.Lock();
  282. state_ = s;
  283. state_cv_.Signal();
  284. mu_.Unlock();
  285. }
  286. private:
  287. port::Mutex mu_;
  288. ReaderState state_;
  289. port::CondVar state_cv_;
  290. };
  291. static void ConcurrentReader(void* arg) {
  292. TestState* state = reinterpret_cast<TestState*>(arg);
  293. Random rnd(state->seed_);
  294. int64_t reads = 0;
  295. state->Change(TestState::RUNNING);
  296. while (!state->quit_flag_.Acquire_Load()) {
  297. state->t_.ReadStep(&rnd);
  298. ++reads;
  299. }
  300. state->Change(TestState::DONE);
  301. }
  302. static void RunConcurrent(int run) {
  303. const int seed = test::RandomSeed() + (run * 100);
  304. Random rnd(seed);
  305. const int N = 1000;
  306. const int kSize = 1000;
  307. for (int i = 0; i < N; i++) {
  308. if ((i % 100) == 0) {
  309. fprintf(stderr, "Run %d of %d\n", i, N);
  310. }
  311. TestState state(seed + 1);
  312. Env::Default()->Schedule(ConcurrentReader, &state);
  313. state.Wait(TestState::RUNNING);
  314. for (int i = 0; i < kSize; i++) {
  315. state.t_.WriteStep(&rnd);
  316. }
  317. state.quit_flag_.Release_Store(&state); // Any non-NULL arg will do
  318. state.Wait(TestState::DONE);
  319. }
  320. }
  321. TEST(SkipTest, Concurrent1) { RunConcurrent(1); }
  322. TEST(SkipTest, Concurrent2) { RunConcurrent(2); }
  323. TEST(SkipTest, Concurrent3) { RunConcurrent(3); }
  324. TEST(SkipTest, Concurrent4) { RunConcurrent(4); }
  325. TEST(SkipTest, Concurrent5) { RunConcurrent(5); }
  326. }
  327. int main(int argc, char** argv) {
  328. return leveldb::test::RunAllTests();
  329. }