ThreadEventQueue.cpp 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. /*
  2. * Copyright (c) 2023, Andreas Kling <kling@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include <AK/Vector.h>
  7. #include <LibCore/DeferredInvocationContext.h>
  8. #include <LibCore/EventLoopImplementation.h>
  9. #include <LibCore/EventReceiver.h>
  10. #include <LibCore/Promise.h>
  11. #include <LibCore/ThreadEventQueue.h>
  12. #include <LibThreading/Mutex.h>
  13. #include <errno.h>
  14. #include <pthread.h>
  15. namespace Core {
  16. struct ThreadEventQueue::Private {
  17. struct QueuedEvent {
  18. AK_MAKE_NONCOPYABLE(QueuedEvent);
  19. AK_MAKE_DEFAULT_MOVABLE(QueuedEvent);
  20. public:
  21. QueuedEvent(EventReceiver& receiver, NonnullOwnPtr<Event> event)
  22. : receiver(receiver)
  23. , event(move(event))
  24. {
  25. }
  26. ~QueuedEvent() = default;
  27. WeakPtr<EventReceiver> receiver;
  28. NonnullOwnPtr<Event> event;
  29. };
  30. Threading::Mutex mutex;
  31. Vector<QueuedEvent, 128> queued_events;
  32. Vector<NonnullRefPtr<Promise<NonnullRefPtr<EventReceiver>>>, 16> pending_promises;
  33. bool warned_promise_count { false };
  34. };
  35. static pthread_key_t s_current_thread_event_queue_key;
  36. static pthread_once_t s_current_thread_event_queue_key_once = PTHREAD_ONCE_INIT;
  37. ThreadEventQueue& ThreadEventQueue::current()
  38. {
  39. pthread_once(&s_current_thread_event_queue_key_once, [] {
  40. pthread_key_create(&s_current_thread_event_queue_key, [](void* value) {
  41. if (value)
  42. delete static_cast<ThreadEventQueue*>(value);
  43. });
  44. });
  45. auto* ptr = static_cast<ThreadEventQueue*>(pthread_getspecific(s_current_thread_event_queue_key));
  46. if (!ptr) {
  47. ptr = new ThreadEventQueue;
  48. pthread_setspecific(s_current_thread_event_queue_key, ptr);
  49. }
  50. return *ptr;
  51. }
  52. ThreadEventQueue::ThreadEventQueue()
  53. : m_private(make<Private>())
  54. {
  55. }
  56. ThreadEventQueue::~ThreadEventQueue() = default;
  57. void ThreadEventQueue::post_event(Core::EventReceiver& receiver, NonnullOwnPtr<Core::Event> event)
  58. {
  59. {
  60. Threading::MutexLocker lock(m_private->mutex);
  61. m_private->queued_events.empend(receiver, move(event));
  62. }
  63. Core::EventLoopManager::the().did_post_event();
  64. }
  65. void ThreadEventQueue::add_job(NonnullRefPtr<Promise<NonnullRefPtr<EventReceiver>>> promise)
  66. {
  67. Threading::MutexLocker lock(m_private->mutex);
  68. m_private->pending_promises.append(move(promise));
  69. }
  70. void ThreadEventQueue::cancel_all_pending_jobs()
  71. {
  72. Threading::MutexLocker lock(m_private->mutex);
  73. for (auto const& promise : m_private->pending_promises)
  74. promise->reject(Error::from_errno(ECANCELED));
  75. m_private->pending_promises.clear();
  76. }
  77. size_t ThreadEventQueue::process()
  78. {
  79. decltype(m_private->queued_events) events;
  80. {
  81. Threading::MutexLocker locker(m_private->mutex);
  82. events = move(m_private->queued_events);
  83. m_private->pending_promises.remove_all_matching([](auto& job) { return job->is_resolved() || job->is_rejected(); });
  84. }
  85. size_t processed_events = 0;
  86. for (size_t i = 0; i < events.size(); ++i) {
  87. auto& queued_event = events.at(i);
  88. auto receiver = queued_event.receiver.strong_ref();
  89. auto& event = *queued_event.event;
  90. if (!receiver) {
  91. switch (event.type()) {
  92. case Event::Quit:
  93. VERIFY_NOT_REACHED();
  94. default:
  95. // Receiver disappeared, drop the event on the floor.
  96. break;
  97. }
  98. } else if (event.type() == Event::Type::DeferredInvoke) {
  99. static_cast<DeferredInvocationEvent&>(event).m_invokee();
  100. } else {
  101. NonnullRefPtr<EventReceiver> protector(*receiver);
  102. receiver->dispatch_event(event);
  103. }
  104. ++processed_events;
  105. }
  106. {
  107. Threading::MutexLocker locker(m_private->mutex);
  108. if (m_private->pending_promises.size() > 30 && !m_private->warned_promise_count) {
  109. m_private->warned_promise_count = true;
  110. dbgln("ThreadEventQueue::process: Job queue wasn't designed for this load ({} promises)", m_private->pending_promises.size());
  111. }
  112. }
  113. return processed_events;
  114. }
  115. bool ThreadEventQueue::has_pending_events() const
  116. {
  117. Threading::MutexLocker locker(m_private->mutex);
  118. return !m_private->queued_events.is_empty();
  119. }
  120. }