/*
 * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
 *
 * SPDX-License-Identifier: BSD-2-Clause
 */
- #include <AK/Vector.h>
- #include <LibCore/DeferredInvocationContext.h>
- #include <LibCore/EventLoopImplementation.h>
- #include <LibCore/EventReceiver.h>
- #include <LibCore/Promise.h>
- #include <LibCore/ThreadEventQueue.h>
- #include <LibThreading/Mutex.h>
- #include <errno.h>
- namespace Core {
- struct ThreadEventQueue::Private {
- struct QueuedEvent {
- AK_MAKE_NONCOPYABLE(QueuedEvent);
- AK_MAKE_DEFAULT_MOVABLE(QueuedEvent);
- public:
- QueuedEvent(EventReceiver& receiver, NonnullOwnPtr<Event> event)
- : receiver(receiver)
- , event(move(event))
- {
- }
- ~QueuedEvent() = default;
- WeakPtr<EventReceiver> receiver;
- NonnullOwnPtr<Event> event;
- };
- Threading::Mutex mutex;
- Vector<QueuedEvent, 128> queued_events;
- Vector<NonnullRefPtr<Promise<NonnullRefPtr<EventReceiver>>>, 16> pending_promises;
- bool warned_promise_count { false };
- };
- static thread_local ThreadEventQueue* s_current_thread_event_queue;
- ThreadEventQueue& ThreadEventQueue::current()
- {
- if (!s_current_thread_event_queue) {
- // FIXME: Don't leak these.
- s_current_thread_event_queue = new ThreadEventQueue;
- }
- return *s_current_thread_event_queue;
- }
- ThreadEventQueue::ThreadEventQueue()
- : m_private(make<Private>())
- {
- }
- ThreadEventQueue::~ThreadEventQueue() = default;
- void ThreadEventQueue::post_event(Core::EventReceiver& receiver, NonnullOwnPtr<Core::Event> event)
- {
- {
- Threading::MutexLocker lock(m_private->mutex);
- m_private->queued_events.empend(receiver, move(event));
- }
- Core::EventLoopManager::the().did_post_event();
- }
- void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<EventReceiver>>> promise)
- {
- Threading::MutexLocker lock(m_private->mutex);
- m_private->pending_promises.append(move(promise));
- }
- void ThreadEventQueue::cancel_all_pending_jobs()
- {
- Threading::MutexLocker lock(m_private->mutex);
- for (auto const& promise : m_private->pending_promises)
- promise->reject(Error::from_errno(ECANCELED));
- m_private->pending_promises.clear();
- }
- size_t ThreadEventQueue::process()
- {
- decltype(m_private->queued_events) events;
- {
- Threading::MutexLocker locker(m_private->mutex);
- events = move(m_private->queued_events);
- m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_rejected(); });
- }
- size_t processed_events = 0;
- for (size_t i = 0; i < events.size(); ++i) {
- auto& queued_event = events.at(i);
- auto receiver = queued_event.receiver.strong_ref();
- auto& event = *queued_event.event;
- if (!receiver) {
- switch (event.type()) {
- case Event::Quit:
- VERIFY_NOT_REACHED();
- default:
- // Receiver disappeared, drop the event on the floor.
- break;
- }
- } else if (event.type() == Event::Type::DeferredInvoke) {
- static_cast<DeferredInvocationEvent&>(event).m_invokee();
- } else {
- NonnullRefPtr<EventReceiver> protector(*receiver);
- receiver->dispatch_event(event);
- }
- ++processed_events;
- }
- {
- Threading::MutexLocker locker(m_private->mutex);
- if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
- m_private->warned_promise_count = true;
- dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
- }
- }
- return processed_events;
- }
- bool ThreadEventQueue::has_pending_events() const
- {
- Threading::MutexLocker locker(m_private->mutex);
- return !m_private->queued_events.is_empty();
- }
- }
|