ThreadEventQueue.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. /*
  2. * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include <AK/Vector.h>
  7. #include <LibCore/DeferredInvocationContext.h>
  8. #include <LibCore/EventLoopImplementation.h>
  9. #include <LibCore/Object.h>
  10. #include <LibCore/Promise.h>
  11. #include <LibCore/ThreadEventQueue.h>
  12. #include <LibThreading/Mutex.h>
  13. #include <errno.h>
  14. namespace Core {
  15. struct ThreadEventQueue::Private {
  16. struct QueuedEvent {
  17. AK_MAKE_NONCOPYABLE(QueuedEvent);
  18. public:
  19. QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
  20. : receiver(receiver)
  21. , event(move(event))
  22. {
  23. }
  24. QueuedEvent(QueuedEvent&& other)
  25. : receiver(other.receiver)
  26. , event(move(other.event))
  27. {
  28. }
  29. ~QueuedEvent() = default;
  30. WeakPtr<Object> receiver;
  31. NonnullOwnPtr<Event> event;
  32. };
  33. Threading::Mutex mutex;
  34. Vector<QueuedEvent, 128> queued_events;
  35. Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>, 16> pending_promises;
  36. bool warned_promise_count { false };
  37. };
  38. static thread_local ThreadEventQueue* s_current_thread_event_queue;
  39. ThreadEventQueue& ThreadEventQueue::current()
  40. {
  41. if (!s_current_thread_event_queue) {
  42. // FIXME: Don't leak these.
  43. s_current_thread_event_queue = new ThreadEventQueue;
  44. }
  45. return *s_current_thread_event_queue;
  46. }
  47. ThreadEventQueue::ThreadEventQueue()
  48. : m_private(make<Private>())
  49. {
  50. }
  51. ThreadEventQueue::~ThreadEventQueue() = default;
  52. void ThreadEventQueue::post_event(Core::Object& receiver, NonnullOwnPtr<Core::Event> event)
  53. {
  54. {
  55. Threading::MutexLocker lock(m_private->mutex);
  56. m_private->queued_events.empend(receiver, move(event));
  57. }
  58. Core::EventLoopManager::the().did_post_event();
  59. }
  60. void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> promise)
  61. {
  62. Threading::MutexLocker lock(m_private->mutex);
  63. m_private->pending_promises.append(move(promise));
  64. }
  65. void ThreadEventQueue::cancel_all_pending_jobs()
  66. {
  67. Threading::MutexLocker lock(m_private->mutex);
  68. for (auto const& promise : m_private->pending_promises)
  69. promise->cancel(Error::from_errno(ECANCELED));
  70. m_private->pending_promises.clear();
  71. }
  72. size_t ThreadEventQueue::process()
  73. {
  74. decltype(m_private->queued_events) events;
  75. {
  76. Threading::MutexLocker locker(m_private->mutex);
  77. events = move(m_private->queued_events);
  78. m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
  79. }
  80. size_t processed_events = 0;
  81. for (size_t i = 0; i < events.size(); ++i) {
  82. auto& queued_event = events.at(i);
  83. auto receiver = queued_event.receiver.strong_ref();
  84. auto& event = *queued_event.event;
  85. if (!receiver) {
  86. switch (event.type()) {
  87. case Event::Quit:
  88. VERIFY_NOT_REACHED();
  89. default:
  90. // Receiver disappeared, drop the event on the floor.
  91. break;
  92. }
  93. } else if (event.type() == Event::Type::DeferredInvoke) {
  94. static_cast<DeferredInvocationEvent&>(event).m_invokee();
  95. } else {
  96. NonnullRefPtr<Object> protector(*receiver);
  97. receiver->dispatch_event(event);
  98. }
  99. ++processed_events;
  100. }
  101. {
  102. Threading::MutexLocker locker(m_private->mutex);
  103. if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
  104. m_private->warned_promise_count = true;
  105. dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
  106. }
  107. }
  108. return processed_events;
  109. }
  110. bool ThreadEventQueue::has_pending_events() const
  111. {
  112. Threading::MutexLocker locker(m_private->mutex);
  113. return !m_private->queued_events.is_empty();
  114. }
  115. }