PlaybackStreamPulseAudio.cpp 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include "PlaybackStreamPulseAudio.h"
  7. #include <LibCore/ThreadedPromise.h>
  8. namespace Audio {
  9. #define TRY_OR_EXIT_THREAD(expression) \
  10. ({ \
  11. auto&& __temporary_result = (expression); \
  12. if (__temporary_result.is_error()) [[unlikely]] { \
  13. warnln("Failure in PulseAudio control thread: {}", __temporary_result.error().string_literal()); \
  14. internal_state->exit(); \
  15. return 1; \
  16. } \
  17. __temporary_result.release_value(); \
  18. })
  19. ErrorOr<NonnullRefPtr<PlaybackStream>> PlaybackStreamPulseAudio::create(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback)
  20. {
  21. VERIFY(data_request_callback);
  22. // Create an internal state for the control thread to hold on to.
  23. auto internal_state = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) InternalState()));
  24. auto playback_stream = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) PlaybackStreamPulseAudio(internal_state)));
  25. // Create the control thread and start it.
  26. auto thread = TRY(Threading::Thread::try_create([=, data_request_callback = move(data_request_callback)]() mutable {
  27. auto context = TRY_OR_EXIT_THREAD(PulseAudioContext::instance());
  28. internal_state->set_stream(TRY_OR_EXIT_THREAD(context->create_stream(initial_state, sample_rate, channels, target_latency_ms, [data_request_callback = move(data_request_callback)](PulseAudioStream&, Bytes buffer, size_t sample_count) {
  29. return data_request_callback(buffer, PcmSampleFormat::Float32, sample_count);
  30. })));
  31. // PulseAudio retains the last volume it sets for an application. We want to consistently
  32. // start at 100% volume instead.
  33. TRY_OR_EXIT_THREAD(internal_state->stream()->set_volume(1.0));
  34. internal_state->thread_loop();
  35. return 0;
  36. },
  37. "Audio::PlaybackStream"sv));
  38. internal_state->set_thread(thread);
  39. thread->start();
  40. thread->detach();
  41. return playback_stream;
  42. }
  43. PlaybackStreamPulseAudio::PlaybackStreamPulseAudio(NonnullRefPtr<InternalState> state)
  44. : m_state(move(state))
  45. {
  46. }
  47. PlaybackStreamPulseAudio::~PlaybackStreamPulseAudio()
  48. {
  49. m_state->exit();
  50. }
  51. #define TRY_OR_REJECT(expression, ...) \
  52. ({ \
  53. auto&& __temporary_result = (expression); \
  54. if (__temporary_result.is_error()) [[unlikely]] { \
  55. promise->reject(__temporary_result.release_error()); \
  56. return __VA_ARGS__; \
  57. } \
  58. __temporary_result.release_value(); \
  59. })
  60. void PlaybackStreamPulseAudio::set_underrun_callback(Function<void()> callback)
  61. {
  62. m_state->enqueue([this, callback = move(callback)]() mutable {
  63. m_state->stream()->set_underrun_callback(move(callback));
  64. });
  65. }
  66. NonnullRefPtr<Core::ThreadedPromise<Duration>> PlaybackStreamPulseAudio::resume()
  67. {
  68. auto promise = Core::ThreadedPromise<Duration>::create();
  69. TRY_OR_REJECT(m_state->check_is_running(), promise);
  70. m_state->enqueue([this, promise]() {
  71. TRY_OR_REJECT(m_state->stream()->resume());
  72. promise->resolve(TRY_OR_REJECT(m_state->stream()->total_time_played()));
  73. });
  74. return promise;
  75. }
  76. NonnullRefPtr<Core::ThreadedPromise<void>> PlaybackStreamPulseAudio::drain_buffer_and_suspend()
  77. {
  78. auto promise = Core::ThreadedPromise<void>::create();
  79. TRY_OR_REJECT(m_state->check_is_running(), promise);
  80. m_state->enqueue([this, promise]() {
  81. TRY_OR_REJECT(m_state->stream()->drain_and_suspend());
  82. promise->resolve();
  83. });
  84. return promise;
  85. }
  86. NonnullRefPtr<Core::ThreadedPromise<void>> PlaybackStreamPulseAudio::discard_buffer_and_suspend()
  87. {
  88. auto promise = Core::ThreadedPromise<void>::create();
  89. TRY_OR_REJECT(m_state->check_is_running(), promise);
  90. m_state->enqueue([this, promise]() {
  91. TRY_OR_REJECT(m_state->stream()->flush_and_suspend());
  92. promise->resolve();
  93. });
  94. return promise;
  95. }
  96. ErrorOr<Duration> PlaybackStreamPulseAudio::total_time_played()
  97. {
  98. if (m_state->stream() != nullptr)
  99. return m_state->stream()->total_time_played();
  100. return Duration::zero();
  101. }
  102. NonnullRefPtr<Core::ThreadedPromise<void>> PlaybackStreamPulseAudio::set_volume(double volume)
  103. {
  104. auto promise = Core::ThreadedPromise<void>::create();
  105. TRY_OR_REJECT(m_state->check_is_running(), promise);
  106. m_state->enqueue([this, promise, volume]() {
  107. TRY_OR_REJECT(m_state->stream()->set_volume(volume));
  108. promise->resolve();
  109. });
  110. return promise;
  111. }
  112. ErrorOr<void> PlaybackStreamPulseAudio::InternalState::check_is_running()
  113. {
  114. if (m_exit)
  115. return Error::from_string_literal("PulseAudio control thread loop is not running");
  116. return {};
  117. }
  118. void PlaybackStreamPulseAudio::InternalState::set_thread(NonnullRefPtr<Threading::Thread> const& thread)
  119. {
  120. Threading::MutexLocker locker { m_mutex };
  121. m_thread = thread;
  122. }
  123. void PlaybackStreamPulseAudio::InternalState::set_stream(NonnullRefPtr<PulseAudioStream> const& stream)
  124. {
  125. m_stream = stream;
  126. }
  127. RefPtr<PulseAudioStream> PlaybackStreamPulseAudio::InternalState::stream()
  128. {
  129. return m_stream;
  130. }
  131. void PlaybackStreamPulseAudio::InternalState::enqueue(Function<void()>&& task)
  132. {
  133. Threading::MutexLocker locker { m_mutex };
  134. m_tasks.enqueue(forward<Function<void()>>(task));
  135. m_wake_condition.signal();
  136. }
  137. void PlaybackStreamPulseAudio::InternalState::thread_loop()
  138. {
  139. while (true) {
  140. auto task = [this]() -> Function<void()> {
  141. Threading::MutexLocker locker { m_mutex };
  142. while (m_tasks.is_empty() && !m_exit)
  143. m_wake_condition.wait();
  144. if (m_exit)
  145. return nullptr;
  146. return m_tasks.dequeue();
  147. }();
  148. if (!task) {
  149. VERIFY(m_exit);
  150. break;
  151. }
  152. task();
  153. }
  154. // Stop holding onto our thread so it can be deleted.
  155. Threading::MutexLocker locker { m_mutex };
  156. m_thread = nullptr;
  157. }
  158. void PlaybackStreamPulseAudio::InternalState::exit()
  159. {
  160. m_exit = true;
  161. m_wake_condition.signal();
  162. }
  163. }