ThreadEventQueue.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. /*
  2. * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include <AK/Vector.h>
  7. #include <LibCore/DeferredInvocationContext.h>
  8. #include <LibCore/Object.h>
  9. #include <LibCore/Promise.h>
  10. #include <LibCore/ThreadEventQueue.h>
  11. #include <LibThreading/Mutex.h>
  12. namespace Core {
  13. struct ThreadEventQueue::Private {
  14. struct QueuedEvent {
  15. AK_MAKE_NONCOPYABLE(QueuedEvent);
  16. public:
  17. QueuedEvent(Object& receiver, NonnullOwnPtr<Event> event)
  18. : receiver(receiver)
  19. , event(move(event))
  20. {
  21. }
  22. QueuedEvent(QueuedEvent&& other)
  23. : receiver(other.receiver)
  24. , event(move(other.event))
  25. {
  26. }
  27. ~QueuedEvent() = default;
  28. WeakPtr<Object> receiver;
  29. NonnullOwnPtr<Event> event;
  30. };
  31. Threading::Mutex mutex;
  32. Vector<QueuedEvent, 128> queued_events;
  33. Vector<NonnullRefPtr<Promise<NonnullRefPtr<Object>>>, 16> pending_promises;
  34. bool warned_promise_count { false };
  35. };
  36. static thread_local ThreadEventQueue* s_current_thread_event_queue;
  37. ThreadEventQueue& ThreadEventQueue::current()
  38. {
  39. if (!s_current_thread_event_queue) {
  40. // FIXME: Don't leak these.
  41. s_current_thread_event_queue = new ThreadEventQueue;
  42. }
  43. return *s_current_thread_event_queue;
  44. }
  45. ThreadEventQueue::ThreadEventQueue()
  46. : m_private(make<Private>())
  47. {
  48. }
  49. ThreadEventQueue::~ThreadEventQueue() = default;
  50. void ThreadEventQueue::post_event(Core::Object& receiver, NonnullOwnPtr<Core::Event> event)
  51. {
  52. {
  53. Threading::MutexLocker lock(m_private->mutex);
  54. m_private->queued_events.empend(receiver, move(event));
  55. }
  56. Core::EventLoop::current().did_post_event({});
  57. }
  58. void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<Object>>> promise)
  59. {
  60. Threading::MutexLocker lock(m_private->mutex);
  61. m_private->pending_promises.append(move(promise));
  62. }
  63. size_t ThreadEventQueue::process()
  64. {
  65. decltype(m_private->queued_events) events;
  66. {
  67. Threading::MutexLocker locker(m_private->mutex);
  68. events = move(m_private->queued_events);
  69. m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_canceled(); });
  70. }
  71. size_t processed_events = 0;
  72. for (size_t i = 0; i < events.size(); ++i) {
  73. auto& queued_event = events.at(i);
  74. auto receiver = queued_event.receiver.strong_ref();
  75. auto& event = *queued_event.event;
  76. if (!receiver) {
  77. switch (event.type()) {
  78. case Event::Quit:
  79. VERIFY_NOT_REACHED();
  80. default:
  81. // Receiver disappeared, drop the event on the floor.
  82. break;
  83. }
  84. } else if (event.type() == Event::Type::DeferredInvoke) {
  85. static_cast<DeferredInvocationEvent&>(event).m_invokee();
  86. } else {
  87. NonnullRefPtr<Object> protector(*receiver);
  88. receiver->dispatch_event(event);
  89. }
  90. ++processed_events;
  91. }
  92. {
  93. Threading::MutexLocker locker(m_private->mutex);
  94. if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
  95. m_private->warned_promise_count = true;
  96. dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
  97. }
  98. }
  99. return processed_events;
  100. }
  101. bool ThreadEventQueue::has_pending_events() const
  102. {
  103. Threading::MutexLocker locker(m_private->mutex);
  104. return !m_private->queued_events.is_empty();
  105. }
  106. }