/* * Copyright (c) 2023, Andreas Kling * * SPDX-License-Identifier: BSD-2-Clause */ #include #include #include #include #include #include #include #include #include namespace Core { struct ThreadEventQueue::Private { struct QueuedEvent { AK_MAKE_NONCOPYABLE(QueuedEvent); AK_MAKE_DEFAULT_MOVABLE(QueuedEvent); public: QueuedEvent(EventReceiver& receiver, NonnullOwnPtr event) : receiver(receiver) , event(move(event)) { } ~QueuedEvent() = default; WeakPtr receiver; NonnullOwnPtr event; }; Threading::Mutex mutex; Vector queued_events; Vector>>, 16> pending_promises; bool warned_promise_count { false }; }; static pthread_key_t s_current_thread_event_queue_key; static pthread_once_t s_current_thread_event_queue_key_once = PTHREAD_ONCE_INIT; ThreadEventQueue& ThreadEventQueue::current() { pthread_once(&s_current_thread_event_queue_key_once, [] { pthread_key_create(&s_current_thread_event_queue_key, [](void* value) { if (value) delete static_cast(value); }); }); auto* ptr = static_cast(pthread_getspecific(s_current_thread_event_queue_key)); if (!ptr) { ptr = new ThreadEventQueue; pthread_setspecific(s_current_thread_event_queue_key, ptr); } return *ptr; } ThreadEventQueue::ThreadEventQueue() : m_private(make()) { } ThreadEventQueue::~ThreadEventQueue() = default; void ThreadEventQueue::post_event(Core::EventReceiver& receiver, NonnullOwnPtr event) { { Threading::MutexLocker lock(m_private->mutex); m_private->queued_events.empend(receiver, move(event)); } Core::EventLoopManager::the().did_post_event(); } void ThreadEventQueue::add_job(NonnullRefPtr>> promise) { Threading::MutexLocker lock(m_private->mutex); m_private->pending_promises.append(move(promise)); } void ThreadEventQueue::cancel_all_pending_jobs() { Threading::MutexLocker lock(m_private->mutex); for (auto const& promise : m_private->pending_promises) promise->reject(Error::from_errno(ECANCELED)); m_private->pending_promises.clear(); } size_t ThreadEventQueue::process() { decltype(m_private->queued_events) events; { Threading::MutexLocker locker(m_private->mutex); events = move(m_private->queued_events); m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_rejected(); }); } size_t processed_events = 0; for (size_t i = 0; i < events.size(); ++i) { auto& queued_event = events.at(i); auto receiver = queued_event.receiver.strong_ref(); auto& event = *queued_event.event; if (!receiver) { switch (event.type()) { case Event::Quit: VERIFY_NOT_REACHED(); default: // Receiver disappeared, drop the event on the floor. break; } } else if (event.type() == Event::Type::DeferredInvoke) { static_cast(event).m_invokee(); } else { NonnullRefPtr protector(*receiver); receiver->dispatch_event(event); } ++processed_events; } { Threading::MutexLocker locker(m_private->mutex); if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) { m_private->warned_promise_count = true; dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size()); } } return processed_events; } bool ThreadEventQueue::has_pending_events() const { Threading::MutexLocker locker(m_private->mutex); return !m_private->queued_events.is_empty(); } }