ThreadEventQueue.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include <AK/Vector.h>
  7. #include <LibCore/DeferredInvocationContext.h>
  8. #include <LibCore/EventLoopImplementation.h>
  9. #include <LibCore/Object.h>
  10. #include <LibCore/Promise.h>
  11. #include <LibCore/ThreadEventQueue.h>
  12. #include <LibThreading/Mutex.h>
  13. namespace Core {
  14. struct ThreadEventQueue::Private {
  15. struct QueuedEvent {
  16. AK_MAKE_NONCOPYABLE(QueuedEvent);
  17. public:
  18. QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
  19. : receiver(receiver)
  20. , event(move(event))
  21. {
  22. }
  23. QueuedEvent(QueuedEvent&& other)
  24. : receiver(other.receiver)
  25. , event(move(other.event))
  26. {
  27. }
  28. ~QueuedEvent() = default;
  29. WeakPtr<Object> receiver;
  30. NonnullOwnPtr<Event> event;
  31. };
  32. Threading::Mutex mutex;
  33. Vector<QueuedEvent, 128> queued_events;
  34. Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>, 16> pending_promises;
  35. bool warned_promise_count { false };
  36. };
  37. static thread_local ThreadEventQueue* s_current_thread_event_queue;
  38. ThreadEventQueue& ThreadEventQueue::current()
  39. {
  40. if (!s_current_thread_event_queue) {
  41. // FIXME: Don't leak these.
  42. s_current_thread_event_queue = new ThreadEventQueue;
  43. }
  44. return *s_current_thread_event_queue;
  45. }
  46. ThreadEventQueue::ThreadEventQueue()
  47. : m_private(make<Private>())
  48. {
  49. }
  50. ThreadEventQueue::~ThreadEventQueue() = default;
  51. void ThreadEventQueue::post_event(Core::Object& receiver, NonnullOwnPtr<Core::Event> event)
  52. {
  53. {
  54. Threading::MutexLocker lock(m_private->mutex);
  55. m_private->queued_events.empend(receiver, move(event));
  56. }
  57. Core::EventLoopManager::the().did_post_event();
  58. }
  59. void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> promise)
  60. {
  61. Threading::MutexLocker lock(m_private->mutex);
  62. m_private->pending_promises.append(move(promise));
  63. }
  64. size_t ThreadEventQueue::process()
  65. {
  66. decltype(m_private->queued_events) events;
  67. {
  68. Threading::MutexLocker locker(m_private->mutex);
  69. events = move(m_private->queued_events);
  70. m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
  71. }
  72. size_t processed_events = 0;
  73. for (size_t i = 0; i < events.size(); ++i) {
  74. auto& queued_event = events.at(i);
  75. auto receiver = queued_event.receiver.strong_ref();
  76. auto& event = *queued_event.event;
  77. if (!receiver) {
  78. switch (event.type()) {
  79. case Event::Quit:
  80. VERIFY_NOT_REACHED();
  81. default:
  82. // Receiver disappeared, drop the event on the floor.
  83. break;
  84. }
  85. } else if (event.type() == Event::Type::DeferredInvoke) {
  86. static_cast<DeferredInvocationEvent&>(event).m_invokee();
  87. } else {
  88. NonnullRefPtr<Object> protector(*receiver);
  89. receiver->dispatch_event(event);
  90. }
  91. ++processed_events;
  92. }
  93. {
  94. Threading::MutexLocker locker(m_private->mutex);
  95. if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
  96. m_private->warned_promise_count = true;
  97. dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
  98. }
  99. }
  100. return processed_events;
  101. }
  102. bool ThreadEventQueue::has_pending_events() const
  103. {
  104. Threading::MutexLocker locker(m_private->mutex);
  105. return !m_private->queued_events.is_empty();
  106. }
  107. }