浏览代码

LibCore: Move event queueing to a per-thread event queue

Instead of juggling events between individual instances of
Core::EventLoop, move queueing and processing to a separate per-thread
queue (ThreadEventQueue).
Andreas Kling 2 年之前
父节点
当前提交
1587caef84

+ 1 - 0
Userland/Libraries/LibCore/CMakeLists.txt

@@ -30,6 +30,7 @@ set(SOURCES
     System.cpp
     System.cpp
     SystemServerTakeover.cpp
     SystemServerTakeover.cpp
     TCPServer.cpp
     TCPServer.cpp
+    ThreadEventQueue.cpp
     Timer.cpp
     Timer.cpp
     UDPServer.cpp
     UDPServer.cpp
     Version.cpp
     Version.cpp

+ 1 - 0
Userland/Libraries/LibCore/Event.h

@@ -49,6 +49,7 @@ private:
 
 
 class DeferredInvocationEvent : public Event {
 class DeferredInvocationEvent : public Event {
     friend class EventLoop;
     friend class EventLoop;
+    friend class ThreadEventQueue;
 
 
 public:
 public:
     DeferredInvocationEvent(NonnullRefPtr<DeferredInvocationContext> context, Function<void()> invokee)
     DeferredInvocationEvent(NonnullRefPtr<DeferredInvocationContext> context, Function<void()> invokee)

+ 15 - 83
Userland/Libraries/LibCore/EventLoop.cpp

@@ -25,6 +25,7 @@
 #include <LibCore/Promise.h>
 #include <LibCore/Promise.h>
 #include <LibCore/SessionManagement.h>
 #include <LibCore/SessionManagement.h>
 #include <LibCore/Socket.h>
 #include <LibCore/Socket.h>
+#include <LibCore/ThreadEventQueue.h>
 #include <LibThreading/Mutex.h>
 #include <LibThreading/Mutex.h>
 #include <LibThreading/MutexProtected.h>
 #include <LibThreading/MutexProtected.h>
 #include <errno.h>
 #include <errno.h>
@@ -65,6 +66,12 @@ struct EventLoopTimer {
 
 
 struct EventLoop::Private {
 struct EventLoop::Private {
     Threading::Mutex lock;
     Threading::Mutex lock;
+    ThreadEventQueue& thread_event_queue;
+
+    Private()
+        : thread_event_queue(ThreadEventQueue::current())
+    {
+    }
 };
 };
 
 
 static Threading::MutexProtected<NeverDestroyed<IDAllocator>> s_id_allocator;
 static Threading::MutexProtected<NeverDestroyed<IDAllocator>> s_id_allocator;
@@ -78,7 +85,6 @@ static thread_local HashTable<Notifier*>* s_notifiers;
 // While wake() pushes zero into the pipe, signal numbers (by defintion nonzero, see signal_numbers.h) are pushed into the pipe verbatim.
 // While wake() pushes zero into the pipe, signal numbers (by defintion nonzero, see signal_numbers.h) are pushed into the pipe verbatim.
 thread_local int EventLoop::s_wake_pipe_fds[2];
 thread_local int EventLoop::s_wake_pipe_fds[2];
 thread_local bool EventLoop::s_wake_pipe_initialized { false };
 thread_local bool EventLoop::s_wake_pipe_initialized { false };
-thread_local bool s_warned_promise_count { false };
 
 
 void EventLoop::initialize_wake_pipes()
 void EventLoop::initialize_wake_pipes()
 {
 {
@@ -421,7 +427,6 @@ public:
         : m_event_loop(event_loop)
         : m_event_loop(event_loop)
     {
     {
         if (EventLoop::has_been_instantiated()) {
         if (EventLoop::has_been_instantiated()) {
-            m_event_loop.take_pending_events_from(EventLoop::current());
             s_event_loop_stack->append(event_loop);
             s_event_loop_stack->append(event_loop);
         }
         }
     }
     }
@@ -429,12 +434,6 @@ public:
     {
     {
         if (EventLoop::has_been_instantiated()) {
         if (EventLoop::has_been_instantiated()) {
             s_event_loop_stack->take_last();
             s_event_loop_stack->take_last();
-            for (auto& job : m_event_loop.m_pending_promises) {
-                // When this event loop was not running below another event loop, the jobs may very well have finished in the meantime.
-                if (!job->is_resolved())
-                    job->cancel(Error::from_string_view("EventLoop is exiting"sv));
-            }
-            EventLoop::current().take_pending_events_from(m_event_loop);
         }
         }
     }
     }
 
 
@@ -462,72 +461,21 @@ void EventLoop::spin_until(Function<bool()> goal_condition)
 
 
 size_t EventLoop::pump(WaitMode mode)
 size_t EventLoop::pump(WaitMode mode)
 {
 {
-    wait_for_event(mode);
+    // Pumping the event loop from another thread is not allowed.
+    VERIFY(&m_private->thread_event_queue == &ThreadEventQueue::current());
 
 
-    decltype(m_queued_events) events;
-    {
-        Threading::MutexLocker locker(m_private->lock);
-        events = move(m_queued_events);
-    }
-
-    m_pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
-
-    size_t processed_events = 0;
-    for (size_t i = 0; i < events.size(); ++i) {
-        auto& queued_event = events.at(i);
-        auto receiver = queued_event.receiver.strong_ref();
-        auto& event = *queued_event.event;
-        if (receiver)
-            dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop: {} event {}", *receiver, event.type());
-
-        if (!receiver) {
-            switch (event.type()) {
-            case Event::Quit:
-                VERIFY_NOT_REACHED();
-            default:
-                dbgln_if(EVENTLOOP_DEBUG, "Event type {} with no receiver :(", event.type());
-                break;
-            }
-        } else if (event.type() == Event::Type::DeferredInvoke) {
-            dbgln_if(DEFERRED_INVOKE_DEBUG, "DeferredInvoke: receiver = {}", *receiver);
-            static_cast<DeferredInvocationEvent&>(event).m_invokee();
-        } else {
-            NonnullRefPtr<Object> protector(*receiver);
-            receiver->dispatch_event(event);
-        }
-        ++processed_events;
-
-        if (m_exit_requested) {
-            Threading::MutexLocker locker(m_private->lock);
-            dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop: Exit requested. Rejigging {} events.", events.size() - i);
-            decltype(m_queued_events) new_event_queue;
-            new_event_queue.ensure_capacity(m_queued_events.size() + events.size());
-            for (++i; i < events.size(); ++i)
-                new_event_queue.unchecked_append(move(events[i]));
-            new_event_queue.extend(move(m_queued_events));
-            m_queued_events = move(new_event_queue);
-            break;
-        }
-    }
-
-    if (m_pending_promises.size() > 30 && !s_warned_promise_count) {
-        s_warned_promise_count = true;
-        dbgln("EventLoop {:p} warning: Job queue wasn't designed for this load ({} promises). Please begin optimizing EventLoop::pump() -> m_pending_promises.remove_all_matching", this, m_pending_promises.size());
-    }
-
-    return processed_events;
+    wait_for_event(mode);
+    return m_private->thread_event_queue.process();
 }
 }
 
 
 void EventLoop::post_event(Object& receiver, NonnullOwnPtr<Event>&& event)
 void EventLoop::post_event(Object& receiver, NonnullOwnPtr<Event>&& event)
 {
 {
-    Threading::MutexLocker lock(m_private->lock);
-    dbgln_if(EVENTLOOP_DEBUG, "Core::EventLoop::post_event: ({}) << receiver={}, event={}", m_queued_events.size(), receiver, event);
-    m_queued_events.empend(receiver, move(event));
+    m_private->thread_event_queue.post_event(receiver, move(event));
 }
 }
 
 
 void EventLoop::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> job_promise)
 void EventLoop::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> job_promise)
 {
 {
-    m_pending_promises.append(move(job_promise));
+    ThreadEventQueue::current().add_job(move(job_promise));
 }
 }
 
 
 SignalHandlers::SignalHandlers(int signo, void (*handle_signal)(int))
 SignalHandlers::SignalHandlers(int signo, void (*handle_signal)(int))
@@ -711,18 +659,14 @@ retry:
             VERIFY_NOT_REACHED();
             VERIFY_NOT_REACHED();
     }
     }
 
 
-    bool queued_events_is_empty;
-    {
-        Threading::MutexLocker locker(m_private->lock);
-        queued_events_is_empty = m_queued_events.is_empty();
-    }
+    bool has_pending_events = m_private->thread_event_queue.has_pending_events();
 
 
     // Figure out how long to wait at maximum.
     // Figure out how long to wait at maximum.
     // This mainly depends on the WaitMode and whether we have pending events, but also the next expiring timer.
     // This mainly depends on the WaitMode and whether we have pending events, but also the next expiring timer.
     Time now;
     Time now;
     struct timeval timeout = { 0, 0 };
     struct timeval timeout = { 0, 0 };
     bool should_wait_forever = false;
     bool should_wait_forever = false;
-    if (mode == WaitMode::WaitForEvents && queued_events_is_empty) {
+    if (mode == WaitMode::WaitForEvents && !has_pending_events) {
         auto next_timer_expiration = get_next_timer_expiration();
         auto next_timer_expiration = get_next_timer_expiration();
         if (next_timer_expiration.has_value()) {
         if (next_timer_expiration.has_value()) {
             now = Time::now_monotonic_coarse();
             now = Time::now_monotonic_coarse();
@@ -905,16 +849,4 @@ void EventLoop::wake()
     }
     }
 }
 }
 
 
-EventLoop::QueuedEvent::QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
-    : receiver(receiver)
-    , event(move(event))
-{
-}
-
-EventLoop::QueuedEvent::QueuedEvent(QueuedEvent&& other)
-    : receiver(other.receiver)
-    , event(move(other.event))
-{
-}
-
 }
 }

+ 0 - 19
Userland/Libraries/LibCore/EventLoop.h

@@ -111,11 +111,6 @@ public:
     };
     };
     static void notify_forked(ForkEvent);
     static void notify_forked(ForkEvent);
 
 
-    void take_pending_events_from(EventLoop& other)
-    {
-        m_queued_events.extend(move(other.m_queued_events));
-    }
-
     static EventLoop& current();
     static EventLoop& current();
 
 
 private:
 private:
@@ -124,20 +119,6 @@ private:
     static void dispatch_signal(int);
     static void dispatch_signal(int);
     static void handle_signal(int);
     static void handle_signal(int);
 
 
-    struct QueuedEvent {
-        AK_MAKE_NONCOPYABLE(QueuedEvent);
-
-    public:
-        QueuedEvent(Object& receiver, NonnullOwnPtr<Event>);
-        QueuedEvent(QueuedEvent&&);
-        ~QueuedEvent() = default;
-
-        WeakPtr<Object> receiver;
-        NonnullOwnPtr<Event> event;
-    };
-
-    Vector<QueuedEvent, 64> m_queued_events;
-    Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>> m_pending_promises;
     static pid_t s_pid;
     static pid_t s_pid;
 
 
     bool m_exit_requested { false };
     bool m_exit_requested { false };

+ 123 - 0
Userland/Libraries/LibCore/ThreadEventQueue.cpp

@@ -0,0 +1,123 @@
+/*
+ * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include <AK/Vector.h>
+#include <LibCore/DeferredInvocationContext.h>
+#include <LibCore/Object.h>
+#include <LibCore/Promise.h>
+#include <LibCore/ThreadEventQueue.h>
+#include <LibThreading/Mutex.h>
+
+namespace Core {
+
+struct ThreadEventQueue::Private {
+    struct QueuedEvent {
+        AK_MAKE_NONCOPYABLE(QueuedEvent);
+
+    public:
+        QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
+            : receiver(receiver)
+            , event(move(event))
+        {
+        }
+
+        QueuedEvent(QueuedEvent&& other)
+            : receiver(other.receiver)
+            , event(move(other.event))
+        {
+        }
+
+        ~QueuedEvent() = default;
+
+        WeakPtr<Object> receiver;
+        NonnullOwnPtr<Event> event;
+    };
+
+    Threading::Mutex mutex;
+    Vector<QueuedEvent, 128> queued_events;
+    Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>, 16> pending_promises;
+    bool warned_promise_count { false };
+};
+
+static thread_local ThreadEventQueue* s_current_thread_event_queue;
+
+ThreadEventQueue& ThreadEventQueue::current()
+{
+    if (!s_current_thread_event_queue) {
+        // FIXME: Don't leak these.
+        s_current_thread_event_queue = new ThreadEventQueue;
+    }
+    return *s_current_thread_event_queue;
+}
+
+ThreadEventQueue::ThreadEventQueue()
+    : m_private(make<Private>())
+{
+}
+
+ThreadEventQueue::~ThreadEventQueue() = default;
+
+void ThreadEventQueue::post_event(Core::Object& receiver, NonnullOwnPtr<Core::Event> event)
+{
+    Threading::MutexLocker lock(m_private->mutex);
+    m_private->queued_events.empend(receiver, move(event));
+}
+
+void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> promise)
+{
+    Threading::MutexLocker lock(m_private->mutex);
+    m_private->pending_promises.append(move(promise));
+}
+
+size_t ThreadEventQueue::process()
+{
+    decltype(m_private->queued_events) events;
+    {
+        Threading::MutexLocker locker(m_private->mutex);
+        events = move(m_private->queued_events);
+        m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
+    }
+
+    size_t processed_events = 0;
+    for (size_t i = 0; i < events.size(); ++i) {
+        auto& queued_event = events.at(i);
+        auto receiver = queued_event.receiver.strong_ref();
+        auto& event = *queued_event.event;
+
+        if (!receiver) {
+            switch (event.type()) {
+            case Event::Quit:
+                VERIFY_NOT_REACHED();
+            default:
+                dbgln("ThreadEventQueue::process: Event of type {} with no receiver", event.type());
+                break;
+            }
+        } else if (event.type() == Event::Type::DeferredInvoke) {
+            static_cast<DeferredInvocationEvent&>(event).m_invokee();
+        } else {
+            NonnullRefPtr<Object> protector(*receiver);
+            receiver->dispatch_event(event);
+        }
+        ++processed_events;
+    }
+
+    {
+        Threading::MutexLocker locker(m_private->mutex);
+        if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
+            m_private->warned_promise_count = true;
+            dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
+        }
+    }
+    return processed_events;
+}
+
+bool ThreadEventQueue::has_pending_events() const
+{
+    Threading::MutexLocker locker(m_private->mutex);
+    return !m_private->queued_events.is_empty();
+}
+
+}

+ 44 - 0
Userland/Libraries/LibCore/ThreadEventQueue.h

@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <AK/NonnullOwnPtr.h>
+#include <AK/OwnPtr.h>
+
+namespace Core {
+
+// Per-thread global event queue. This is where events are queued for the EventLoop to process.
+// There is only one ThreadEventQueue per thread, and it is accessed via ThreadEventQueue::current().
+// It is allowed to post events to other threads' event queues.
+class ThreadEventQueue {
+    AK_MAKE_NONCOPYABLE(ThreadEventQueue);
+    AK_MAKE_NONMOVABLE(ThreadEventQueue);
+
+public:
+    static ThreadEventQueue& current();
+
+    // Process all queued events. Returns the number of events that were processed.
+    size_t process();
+
+    // Posts an event to the event queue.
+    void post_event(Object& receiver, NonnullOwnPtr<Event>);
+
+    // Used by Threading::BackgroundAction.
+    void add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>>);
+
+    // Returns true if there are events waiting to be flushed.
+    bool has_pending_events() const;
+
+private:
+    ThreadEventQueue();
+    ~ThreadEventQueue();
+
+    struct Private;
+    OwnPtr<Private> m_private;
+};
+
+}