ThreadedPromise.h 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. /*
  2. * Copyright (c) 2021, Kyle Pereira <hey@xylepereira.me>
  3. * Copyright (c) 2022, kleines Filmröllchen <filmroellchen@serenityos.org>
  4. * Copyright (c) 2021-2023, Ali Mohammad Pur <mpfard@serenityos.org>
  5. * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
  6. *
  7. * SPDX-License-Identifier: BSD-2-Clause
  8. */
  9. #pragma once
  10. #include <AK/AtomicRefCounted.h>
  11. #include <AK/Concepts.h>
  12. #include <LibCore/EventLoop.h>
  13. #include <LibCore/EventReceiver.h>
  14. #include <LibThreading/Mutex.h>
  15. namespace Core {
  16. template<typename TResult, typename TError>
  17. class ThreadedPromise
  18. : public AtomicRefCounted<ThreadedPromise<TResult, TError>> {
  19. public:
  20. static NonnullRefPtr<ThreadedPromise<TResult, TError>> create()
  21. {
  22. return adopt_ref(*new ThreadedPromise<TResult, TError>());
  23. }
  24. using ResultType = Conditional<IsSame<TResult, void>, Empty, TResult>;
  25. using ErrorType = TError;
  26. void resolve(ResultType&& result)
  27. {
  28. when_error_handler_is_ready([self = NonnullRefPtr(*this), result = move(result)]() mutable {
  29. if (self->m_resolution_handler) {
  30. auto handler_result = self->m_resolution_handler(forward<ResultType>(result));
  31. if (handler_result.is_error())
  32. self->m_rejection_handler(handler_result.release_error());
  33. self->m_has_completed = true;
  34. }
  35. });
  36. }
  37. void resolve()
  38. requires IsSame<ResultType, Empty>
  39. {
  40. resolve(Empty());
  41. }
  42. void reject(ErrorType&& error)
  43. {
  44. when_error_handler_is_ready([this, error = move(error)]() mutable {
  45. m_rejection_handler(forward<ErrorType>(error));
  46. m_has_completed = true;
  47. });
  48. }
  49. void reject(ErrorType const& error)
  50. requires IsTriviallyCopyable<ErrorType>
  51. {
  52. reject(ErrorType(error));
  53. }
  54. bool has_completed()
  55. {
  56. Threading::MutexLocker locker { m_mutex };
  57. return m_has_completed;
  58. }
  59. void await()
  60. {
  61. while (!has_completed())
  62. Core::EventLoop::current().pump(EventLoop::WaitMode::PollForEvents);
  63. }
  64. // Set the callback to be called when the promise is resolved. A rejection callback
  65. // must also be provided before any callback will be called.
  66. template<CallableAs<ErrorOr<void>, ResultType&&> ResolvedHandler>
  67. ThreadedPromise& when_resolved(ResolvedHandler handler)
  68. {
  69. Threading::MutexLocker locker { m_mutex };
  70. VERIFY(!m_resolution_handler);
  71. m_resolution_handler = move(handler);
  72. return *this;
  73. }
  74. template<CallableAs<void, ResultType&&> ResolvedHandler>
  75. ThreadedPromise& when_resolved(ResolvedHandler handler)
  76. {
  77. return when_resolved([handler = move(handler)](ResultType&& result) -> ErrorOr<void> {
  78. handler(forward<ResultType>(result));
  79. return {};
  80. });
  81. }
  82. template<CallableAs<ErrorOr<void>> ResolvedHandler>
  83. ThreadedPromise& when_resolved(ResolvedHandler handler)
  84. {
  85. return when_resolved([handler = move(handler)](ResultType&&) -> ErrorOr<void> {
  86. return handler();
  87. });
  88. }
  89. template<CallableAs<void> ResolvedHandler>
  90. ThreadedPromise& when_resolved(ResolvedHandler handler)
  91. {
  92. return when_resolved([handler = move(handler)](ResultType&&) -> ErrorOr<void> {
  93. handler();
  94. return {};
  95. });
  96. }
  97. // Set the callback to be called when the promise is rejected. Setting this callback
  98. // will cause the promise fulfillment to be ready to be handled.
  99. template<CallableAs<void, ErrorType&&> RejectedHandler>
  100. ThreadedPromise& when_rejected(RejectedHandler when_rejected = [](ErrorType&) {})
  101. {
  102. Threading::MutexLocker locker { m_mutex };
  103. VERIFY(!m_rejection_handler);
  104. m_rejection_handler = move(when_rejected);
  105. return *this;
  106. }
  107. template<typename T, CallableAs<NonnullRefPtr<ThreadedPromise<T, ErrorType>>, ResultType&&> ChainedResolution>
  108. NonnullRefPtr<ThreadedPromise<T, ErrorType>> chain_promise(ChainedResolution chained_resolution)
  109. {
  110. auto new_promise = ThreadedPromise<T, ErrorType>::create();
  111. when_resolved([=, chained_resolution = move(chained_resolution)](ResultType&& result) mutable -> ErrorOr<void> {
  112. chained_resolution(forward<ResultType>(result))
  113. ->when_resolved([=](auto&& new_result) { new_promise->resolve(move(new_result)); })
  114. .when_rejected([=](ErrorType&& error) { new_promise->reject(move(error)); });
  115. return {};
  116. });
  117. when_rejected([=](ErrorType&& error) { new_promise->reject(move(error)); });
  118. return new_promise;
  119. }
  120. template<typename T, CallableAs<ErrorOr<T, ErrorType>, ResultType&&> MappingFunction>
  121. NonnullRefPtr<ThreadedPromise<T, ErrorType>> map(MappingFunction mapping_function)
  122. {
  123. auto new_promise = ThreadedPromise<T, ErrorType>::create();
  124. when_resolved([=, mapping_function = move(mapping_function)](ResultType&& result) -> ErrorOr<void> {
  125. new_promise->resolve(TRY(mapping_function(forward<ResultType>(result))));
  126. return {};
  127. });
  128. when_rejected([=](ErrorType&& error) { new_promise->reject(move(error)); });
  129. return new_promise;
  130. }
  131. private:
  132. template<typename F>
  133. static void deferred_handler_check(NonnullRefPtr<ThreadedPromise> self, F&& function)
  134. {
  135. Threading::MutexLocker locker { self->m_mutex };
  136. if (self->m_rejection_handler) {
  137. function();
  138. return;
  139. }
  140. EventLoop::current().deferred_invoke([self, function = forward<F>(function)]() mutable {
  141. deferred_handler_check(self, move(function));
  142. });
  143. }
  144. template<typename F>
  145. void when_error_handler_is_ready(F function)
  146. {
  147. if (EventLoop::is_running()) {
  148. deferred_handler_check(NonnullRefPtr(*this), move(function));
  149. } else {
  150. // NOTE: Handlers should always be set almost immediately, so we can expect this
  151. // to spin extremely briefly. Therefore, sleeping the thread should not be
  152. // necessary.
  153. while (true) {
  154. Threading::MutexLocker locker { m_mutex };
  155. if (m_rejection_handler)
  156. break;
  157. }
  158. VERIFY(m_rejection_handler);
  159. function();
  160. }
  161. }
  162. ThreadedPromise() = default;
  163. ThreadedPromise(EventReceiver* parent)
  164. : EventReceiver(parent)
  165. {
  166. }
  167. Function<ErrorOr<void>(ResultType&&)> m_resolution_handler;
  168. Function<void(ErrorType&&)> m_rejection_handler;
  169. Threading::Mutex m_mutex;
  170. bool m_has_completed;
  171. };
  172. }