From 8472b2b1fb98eb3695a77939bce0089f7643e711 Mon Sep 17 00:00:00 2001 From: Timothy Flynn Date: Sun, 22 Sep 2024 13:41:31 -0400 Subject: [PATCH] LibThreading: Remove the thread pool The thread pool test is currently flakey and takes over 2 minutes to run on CI. It also currently has no users now that RequestServer uses curl, so let's just remove it for now. If we need it in the future, we can revive it from git history. --- Tests/LibThreading/CMakeLists.txt | 1 - Tests/LibThreading/TestThreadPool.cpp | 57 ------- Userland/Libraries/LibThreading/ThreadPool.h | 154 ------------------- 3 files changed, 212 deletions(-) delete mode 100644 Tests/LibThreading/TestThreadPool.cpp delete mode 100644 Userland/Libraries/LibThreading/ThreadPool.h diff --git a/Tests/LibThreading/CMakeLists.txt b/Tests/LibThreading/CMakeLists.txt index b505bfe17d5..5f4914ab4fd 100644 --- a/Tests/LibThreading/CMakeLists.txt +++ b/Tests/LibThreading/CMakeLists.txt @@ -1,6 +1,5 @@ set(TEST_SOURCES TestThread.cpp - TestThreadPool.cpp ) foreach(source IN LISTS TEST_SOURCES) diff --git a/Tests/LibThreading/TestThreadPool.cpp b/Tests/LibThreading/TestThreadPool.cpp deleted file mode 100644 index 2d12d0970fb..00000000000 --- a/Tests/LibThreading/TestThreadPool.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (c) 2024, Braydn Moore - * - * SPDX-License-Identifier: BSD-2-Clause - */ - -#include -#include -#include -#include - -using namespace AK::TimeLiterals; - -TEST_CASE(thread_pool_deadlock) -{ - static constexpr auto RUN_TIMEOUT = 120_sec; - static constexpr u64 NUM_RUNS = 1000; - static constexpr u64 MAX_VALUE = 1 << 15; - - for (u64 i = 0; i < NUM_RUNS; ++i) { - u64 expected_value = (MAX_VALUE * (MAX_VALUE + 1)) / 2; - Atomic sum; - - // heap allocate the ThreadPool in case it deadlocks. Exiting in the - // case of a deadlock will purposefully leak memory to avoid calling the - // destructor and hanging the test - auto* thread_pool = new Threading::ThreadPool( - [&sum](u64 current_val) { - sum += current_val; - }); - - for (u64 j = 0; j <= MAX_VALUE; ++j) { - thread_pool->submit(j); - } - - auto join_thread = Threading::Thread::construct([thread_pool]() -> intptr_t { - thread_pool->wait_for_all(); - delete thread_pool; - return 0; - }); - - join_thread->start(); - auto timer = Core::ElapsedTimer::start_new(Core::TimerType::Precise); - while (!join_thread->has_exited() && timer.elapsed_milliseconds() < RUN_TIMEOUT.to_milliseconds()) - ; - EXPECT(join_thread->has_exited()); - // exit since the current pool is deadlocked and we have no way of - // unblocking the pool other than having the OS teardown the process - // struct - if (!join_thread->has_exited()) { - return; - } - - (void)join_thread->join(); - EXPECT_EQ(sum.load(), expected_value); - } -} diff --git a/Userland/Libraries/LibThreading/ThreadPool.h b/Userland/Libraries/LibThreading/ThreadPool.h deleted file mode 100644 index 91e6af99dc5..00000000000 --- a/Userland/Libraries/LibThreading/ThreadPool.h +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2024, Ali Mohammad Pur - * - * SPDX-License-Identifier: BSD-2-Clause - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace Threading { - -template -struct ThreadPoolLooper { - IterationDecision next(Pool& pool, bool wait) - { - Optional entry; - while (true) { - pool.m_busy_count++; - entry = pool.m_work_queue.with_locked([&](auto& queue) -> Optional { - if (queue.is_empty()) - return {}; - return queue.dequeue(); - }); - if (entry.has_value()) - break; - - pool.m_busy_count--; - if (pool.m_should_exit) - return IterationDecision::Break; - - if (!wait) - return IterationDecision::Continue; - - pool.m_mutex.lock(); - // broadcast on m_work_done here since it is possible the - // wait_for_all loop missed the previous broadcast when work was - // actually done. Without this broadcast the ThreadPool could - // deadlock as there is no remaining work to be done, so this thread - // never resumes and the wait_for_all loop never wakes as there is no - // more work to be completed. - pool.m_work_done.broadcast(); - pool.m_work_available.wait(); - pool.m_mutex.unlock(); - } - - pool.m_handler(entry.release_value()); - pool.m_busy_count--; - pool.m_work_done.signal(); - return IterationDecision::Continue; - } -}; - -template class Looper = ThreadPoolLooper> -class ThreadPool { - AK_MAKE_NONCOPYABLE(ThreadPool); - AK_MAKE_NONMOVABLE(ThreadPool); - -public: - using Work = TWork; - friend struct ThreadPoolLooper; - - ThreadPool(Optional concurrency = {}) - requires(IsFunction) - : m_handler([](Work work) { return work(); }) - , m_work_available(m_mutex) - , m_work_done(m_mutex) - { - initialize_workers(concurrency.value_or(Core::System::hardware_concurrency())); - } - - explicit ThreadPool(Function handler, Optional concurrency = {}) - : m_handler(move(handler)) - , m_work_available(m_mutex) - , m_work_done(m_mutex) - { - initialize_workers(concurrency.value_or(Core::System::hardware_concurrency())); - } - - ~ThreadPool() - { - m_should_exit.store(true, AK::MemoryOrder::memory_order_release); - for (auto& worker : m_workers) { - while (!worker->has_exited()) { - m_work_available.broadcast(); - } - (void)worker->join(); - } - } - - void submit(Work work) - { - m_work_queue.with_locked([&](auto& queue) { - queue.enqueue({ move(work) }); - }); - m_work_available.broadcast(); - } - - void wait_for_all() - { - { - MutexLocker lock(m_mutex); - m_work_done.wait_while([this]() { - return m_work_queue.with_locked([](auto& queue) { - return !queue.is_empty(); - }); - }); - } - { - MutexLocker lock(m_mutex); - m_work_done.wait_while([this] { - return m_busy_count.load(AK::MemoryOrder::memory_order_acquire) > 0; - }); - } - } - -private: - void initialize_workers(size_t concurrency) - { - for (size_t i = 0; i < concurrency; ++i) { - m_workers.append(Thread::construct([this]() -> intptr_t { - Looper thread_looper; - for (; !m_should_exit;) { - auto result = thread_looper.next(*this, true); - if (result == IterationDecision::Break) - break; - } - - return 0; - }, - "ThreadPool worker"sv)); - } - - for (auto& worker : m_workers) - worker->start(); - } - - Vector> m_workers; - MutexProtected> m_work_queue; - Function m_handler; - Mutex m_mutex; - ConditionVariable m_work_available; - ConditionVariable m_work_done; - Atomic m_should_exit { false }; - Atomic m_busy_count { 0 }; -}; - -}