123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- /*
- * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
- *
- * SPDX-License-Identifier: BSD-2-Clause
- */
- #include <LibCore/SharedCircularQueue.h>
- #include <LibTest/TestCase.h>
- #include <LibThreading/Thread.h>
- #include <sched.h>
- using TestQueue = Core::SharedSingleProducerCircularQueue<int>;
- using QueueError = ErrorOr<int, TestQueue::QueueStatus>;
- Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t test_count);
- // These first two cases don't multithread at all.
- TEST_CASE(simple_enqueue)
- {
- auto queue = MUST(TestQueue::try_create());
- for (size_t i = 0; i < queue.size() - 1; ++i)
- EXPECT(!queue.try_enqueue((int)i).is_error());
- auto result = queue.try_enqueue(0);
- EXPECT(result.is_error());
- EXPECT_EQ(result.release_error(), TestQueue::QueueStatus::Full);
- }
- TEST_CASE(simple_dequeue)
- {
- auto queue = MUST(TestQueue::try_create());
- auto const test_count = 10;
- for (int i = 0; i < test_count; ++i)
- (void)queue.try_enqueue(i);
- for (int i = 0; i < test_count; ++i) {
- auto const element = queue.try_dequeue();
- EXPECT(!element.is_error());
- EXPECT_EQ(element.value(), i);
- }
- }
- // There is one parallel consumer, but nobody is producing at the same time.
- TEST_CASE(simple_multithread)
- {
- auto queue = MUST(TestQueue::try_create());
- auto const test_count = 10;
- for (int i = 0; i < test_count; ++i)
- (void)queue.try_enqueue(i);
- auto second_thread = Threading::Thread::construct([&queue]() {
- auto copied_queue = queue;
- for (int i = 0; i < test_count; ++i) {
- QueueError result = TestQueue::QueueStatus::Invalid;
- do {
- result = copied_queue.try_dequeue();
- if (!result.is_error())
- EXPECT_EQ(result.value(), i);
- } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
- if (result.is_error())
- FAIL("Unexpected error while dequeueing.");
- }
- return 0;
- });
- second_thread->start();
- (void)second_thread->join();
- EXPECT_EQ(queue.weak_used(), (size_t)0);
- }
- // There is one parallel consumer and one parallel producer.
- TEST_CASE(producer_consumer_multithread)
- {
- auto queue = MUST(TestQueue::try_create());
- // Ensure that we have the possibility of filling the queue up.
- auto const test_count = queue.size() * 4;
- Atomic<bool> other_thread_running { false };
- auto second_thread = Threading::Thread::construct([&queue, &other_thread_running]() {
- auto copied_queue = queue;
- other_thread_running.store(true);
- for (size_t i = 0; i < test_count; ++i) {
- QueueError result = TestQueue::QueueStatus::Invalid;
- do {
- result = copied_queue.try_dequeue();
- if (!result.is_error())
- EXPECT_EQ(result.value(), (int)i);
- } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
- if (result.is_error())
- FAIL("Unexpected error while dequeueing.");
- }
- return 0;
- });
- second_thread->start();
- while (!other_thread_running.load())
- ;
- for (size_t i = 0; i < test_count; ++i) {
- ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
- do {
- result = queue.try_enqueue((int)i);
- } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
- if (result.is_error())
- FAIL("Unexpected error while enqueueing.");
- }
- (void)second_thread->join();
- EXPECT_EQ(queue.weak_used(), (size_t)0);
- }
- // There are multiple parallel consumers, but nobody is producing at the same time.
- TEST_CASE(multi_consumer)
- {
- auto queue = MUST(TestQueue::try_create());
- // This needs to be divisible by 4!
- size_t const test_count = queue.size() - 4;
- Atomic<size_t> dequeue_count = 0;
- auto threads = {
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- };
- for (size_t i = 0; i < test_count; ++i)
- (void)queue.try_enqueue((int)i);
- for (auto thread : threads)
- thread->start();
- for (auto thread : threads)
- (void)thread->join();
- EXPECT_EQ(queue.weak_used(), (size_t)0);
- EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
- }
- // There are multiple parallel consumers and one parallel producer.
- TEST_CASE(single_producer_multi_consumer)
- {
- auto queue = MUST(TestQueue::try_create());
- // Choose a higher number to provoke possible race conditions.
- size_t const test_count = queue.size() * 8;
- Atomic<size_t> dequeue_count = 0;
- auto threads = {
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- Threading::Thread::construct(dequeuer(queue, dequeue_count, test_count)),
- };
- for (auto thread : threads)
- thread->start();
- for (size_t i = 0; i < test_count; ++i) {
- ErrorOr<void, TestQueue::QueueStatus> result = TestQueue::QueueStatus::Invalid;
- do {
- result = queue.try_enqueue((int)i);
- // After we put something in the first time, let's wait while nobody has dequeued yet.
- while (dequeue_count.load() == 0)
- ;
- // Give others time to do something.
- sched_yield();
- } while (result.is_error() && result.error() == TestQueue::QueueStatus::Full);
- if (result.is_error())
- FAIL("Unexpected error while enqueueing.");
- }
- for (auto thread : threads)
- (void)thread->join();
- EXPECT_EQ(queue.weak_used(), (size_t)0);
- EXPECT_EQ(dequeue_count.load(), (size_t)test_count);
- }
- Function<intptr_t()> dequeuer(TestQueue& queue, Atomic<size_t>& dequeue_count, size_t const test_count)
- {
- return [&queue, &dequeue_count, test_count]() {
- auto copied_queue = queue;
- for (size_t i = 0; i < test_count / 4; ++i) {
- QueueError result = TestQueue::QueueStatus::Invalid;
- do {
- result = copied_queue.try_dequeue();
- if (!result.is_error())
- dequeue_count.fetch_add(1);
- // Give others time to do something.
- sched_yield();
- } while (result.is_error() && result.error() == TestQueue::QueueStatus::Empty);
- if (result.is_error())
- FAIL("Unexpected error while dequeueing.");
- }
- return (intptr_t)0;
- };
- }
|