TestLibCoreSharedSingleProducerCircularQueue.cpp 6.8 KB


  1. /*
  2. * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include <LibCore/SharedCircularQueue.h>
  7. #include <LibTest/TestCase.h>
  8. #include <LibThreading/Thread.h>
  9. #include <sched.h>
  10. using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
  11. using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
  12. Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
  13. // These first two cases don't multithread at all.
  14. TEST_CASE(simple_enqueue)
  15. {
  16. auto queue = MUST(TestQueue::try_create());
  17. for (size_t i = 0; i < queue.size() - 1; ++i)
  18. EXPECT(!queue.try_enqueue((int)i).is_error());
  19. auto result = queue.try_enqueue(0);
  20. EXPECT(result.is_error());
  21. EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
  22. }
  23. TEST_CASE(simple_dequeue)
  24. {
  25. auto queue = MUST(TestQueue::try_create());
  26. auto const test_count = 10;
  27. for (int i = 0; i < test_count; ++i)
  28. (void)queue.try_enqueue(i);
  29. for (int i = 0; i < test_count; ++i) {
  30. auto const element = queue.try_dequeue();
  31. EXPECT(!element.is_error());
  32. EXPECT_EQ(element.value(), i);
  33. }
  34. }
  35. // There is one parallel consumer, but nobody is producing at the same time.
  36. TEST_CASE(simple_multithread)
  37. {
  38. auto queue = MUST(TestQueue::try_create());
  39. auto const test_count = 10;
  40. for (int i = 0; i < test_count; ++i)
  41. (void)queue.try_enqueue(i);
  42. auto second_thread = Threading::Thread::construct([&queue]() {
  43. auto copied_queue = queue;
  44. for (int i = 0; i < test_count; ++i) {
  45. QueueError result = TestQueue::QueueStatus::Invalid;
  46. do {
  47. result = copied_queue.try_dequeue();
  48. if (!result.is_error())
  49. EXPECT_EQ(result.value(), i);
  50. } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
  51. if (result.is_error())
  52. FAIL("Unexpected error while dequeueing.");
  53. }
  54. return 0;
  55. });
  56. second_thread->start();
  57. (void)second_thread->join();
  58. EXPECT_EQ(queue.weak_used(), (size_t)0);
  59. }
  60. // There is one parallel consumer and one parallel producer.
  61. TEST_CASE(producer_consumer_multithread)
  62. {
  63. auto queue = MUST(TestQueue::try_create());
  64. // Ensure that we have the possibility of filling the queue up.
  65. auto const test_count = queue.size() * 4;
  66. Atomic<bool> other_thread_running { false };
  67. auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() {
  68. auto copied_queue = queue;
  69. other_thread_running.store(true);
  70. for (size_t i = 0; i < test_count; ++i) {
  71. QueueError result = TestQueue::QueueStatus::Invalid;
  72. do {
  73. result = copied_queue.try_dequeue();
  74. if (!result.is_error())
  75. EXPECT_EQ(result.value(), (int)i);
  76. } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
  77. if (result.is_error())
  78. FAIL("Unexpected error while dequeueing.");
  79. }
  80. return 0;
  81. });
  82. second_thread->start();
  83. while (!other_thread_running.load())
  84. ;
  85. for (size_t i = 0; i < test_count; ++i) {
  86. ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
  87. do {
  88. result = queue.try_enqueue((int)i);
  89. } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
  90. if (result.is_error())
  91. FAIL("Unexpected error while enqueueing.");
  92. }
  93. (void)second_thread->join();
  94. EXPECT_EQ(queue.weak_used(), (size_t)0);
  95. }
  96. // There are multiple parallel consumers, but nobody is producing at the same time.
  97. TEST_CASE(multi_consumer)
  98. {
  99. auto queue = MUST(TestQueue::try_create());
  100. // This needs to be divisible by 4!
  101. size_t const test_count = queue.size() - 4;
  102. Atomic<size_t> dequeue_count = 0;
  103. auto threads = {
  104. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  105. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  106. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  107. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  108. };
  109. for (size_t i = 0; i < test_count; ++i)
  110. (void)queue.try_enqueue((int)i);
  111. for (auto thread : threads)
  112. thread->start();
  113. for (auto thread : threads)
  114. (void)thread->join();
  115. EXPECT_EQ(queue.weak_used(), (size_t)0);
  116. EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
  117. }
  118. // There are multiple parallel consumers and one parallel producer.
  119. TEST_CASE(single_producer_multi_consumer)
  120. {
  121. auto queue = MUST(TestQueue::try_create());
  122. // Choose a higher number to provoke possible race conditions.
  123. size_t const test_count = queue.size() * 8;
  124. Atomic<size_t> dequeue_count = 0;
  125. auto threads = {
  126. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  127. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  128. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  129. Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
  130. };
  131. for (auto thread : threads)
  132. thread->start();
  133. for (size_t i = 0; i < test_count; ++i) {
  134. ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
  135. do {
  136. result = queue.try_enqueue((int)i);
  137. // After we put something in the first time, let's wait while nobody has dequeued yet.
  138. while (dequeue_count.load() == 0)
  139. ;
  140. // Give others time to do something.
  141. sched_yield();
  142. } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
  143. if (result.is_error())
  144. FAIL("Unexpected error while enqueueing.");
  145. }
  146. for (auto thread : threads)
  147. (void)thread->join();
  148. EXPECT_EQ(queue.weak_used(), (size_t)0);
  149. EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
  150. }
  151. Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
  152. {
  153. return [&queue, &dequeue_count, test_count]() {
  154. auto copied_queue = queue;
  155. for (size_t i = 0; i < test_count / 4; ++i) {
  156. QueueError result = TestQueue::QueueStatus::Invalid;
  157. do {
  158. result = copied_queue.try_dequeue();
  159. if (!result.is_error())
  160. dequeue_count.fetch_add(1);
  161. // Give others time to do something.
  162. sched_yield();
  163. } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
  164. if (result.is_error())
  165. FAIL("Unexpected error while dequeueing.");
  166. }
  167. return (intptr_t)0;
  168. };
  169. }