Przeglądaj źródła

LibAudio: Create a playback class with a PulseAudio implementation

This adds an abstract `Audio::PlaybackStream` class to allow cross-
platform audio playback to be done in an opaque manner by applications
in both Serenity and Lagom.

Currently, the only supported audio API is PulseAudio, but a Serenity
implementation should be added shortly as well.
Zaggy1024 2 lat temu
rodzic
commit
bc4d4f0f95

+ 2 - 0
Meta/Lagom/CMakeLists.txt

@@ -143,6 +143,8 @@ if (ENABLE_LAGOM_LADYBIRD AND (ENABLE_FUZZERS OR ENABLE_COMPILER_EXPLORER_BUILD)
     )
 endif()
 
+CHECK_INCLUDE_FILE(pulse/pulseaudio.h HAVE_PULSEAUDIO)
+
 if (CMAKE_CXX_COMPILER_ID MATCHES "Clang$")
     add_compile_options(-Wno-overloaded-virtual)
     # FIXME: Re-enable this check when the warning stops triggering, or document why we can't stop it from triggering.

+ 13 - 0
Userland/Libraries/LibAudio/CMakeLists.txt

@@ -8,6 +8,7 @@ set(SOURCES
     WavWriter.cpp
     Metadata.cpp
     MP3Loader.cpp
+    PlaybackStream.cpp
     QOALoader.cpp
     QOATypes.cpp
     UserSampleQueue.cpp
@@ -25,5 +26,17 @@ if (SERENITYOS)
     )
 endif()
 
+if (HAVE_PULSEAUDIO)
+    list(APPEND SOURCES
+        PlaybackStreamPulseAudio.cpp
+        PulseAudioWrappers.cpp
+    )
+endif()
+
 serenity_lib(LibAudio audio)
 target_link_libraries(LibAudio PRIVATE LibCore LibIPC LibThreading LibUnicode LibCrypto)
+
+if (HAVE_PULSEAUDIO)
+    target_link_libraries(LibAudio PRIVATE pulse)
+    target_compile_definitions(LibAudio PRIVATE HAVE_PULSEAUDIO=1)
+endif()

+ 1 - 0
Userland/Libraries/LibAudio/Forward.h

@@ -10,6 +10,7 @@ namespace Audio {
 
 class ConnectionToServer;
 class Loader;
+class PlaybackStream;
 struct Sample;
 
 template<typename SampleType>

+ 39 - 0
Userland/Libraries/LibAudio/PlaybackStream.cpp

@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include "PlaybackStream.h"
+
+#include <LibCore/ThreadedPromise.h>
+
+#if defined(HAVE_PULSEAUDIO)
+#    include <LibAudio/PlaybackStreamPulseAudio.h>
+#endif
+
+namespace Audio {
+
+#define TRY_OR_REJECT_AND_STOP(expression, promise)                \
+    ({                                                             \
+        auto&& __temporary_result = (expression);                  \
+        if (__temporary_result.is_error()) [[unlikely]] {          \
+            (promise)->reject(__temporary_result.release_error()); \
+            return 1;                                              \
+        }                                                          \
+        __temporary_result.release_value();                        \
+    })
+
+ErrorOr<NonnullRefPtr<PlaybackStream>> PlaybackStream::create(OutputState initial_output_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback)
+{
+    VERIFY(data_request_callback);
+    // Create the platform-specific implementation for this stream.
+#if defined(HAVE_PULSEAUDIO)
+    return PlaybackStreamPulseAudio::create(initial_output_state, sample_rate, channels, target_latency_ms, move(data_request_callback));
+#else
+    (void)initial_output_state, (void)sample_rate, (void)channels, (void)target_latency_ms;
+    return Error::from_string_literal("Audio output is not available for this platform");
+#endif
+}
+
+}

+ 72 - 0
Userland/Libraries/LibAudio/PlaybackStream.h

@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <AK/AtomicRefCounted.h>
+#include <AK/Function.h>
+#include <AK/Queue.h>
+#include <AK/Time.h>
+#include <LibAudio/SampleFormats.h>
+#include <LibCore/Forward.h>
+#include <LibThreading/ConditionVariable.h>
+#include <LibThreading/MutexProtected.h>
+#include <LibThreading/Thread.h>
+
+namespace Audio {
+
+enum class OutputState {
+    Playing,
+    Suspended,
+};
+
+// This class implements high-level audio playback behavior. It is primarily intended as an abstract cross-platform
+// interface to be used by Ladybird (and its dependent libraries) for playback.
+//
+// The interface is designed to be simple and robust. All control functions can be called safely from any thread.
+// Timing information provided by the class should allow audio timestamps to be tracked with the best accuracy possible.
+class PlaybackStream : public AtomicRefCounted<PlaybackStream> {
+public:
+    using AudioDataRequestCallback = Function<ReadonlyBytes(Bytes buffer, PcmSampleFormat format, size_t sample_count)>;
+
+    // Creates a new audio Output class.
+    //
+    // The initial_output_state parameter determines whether it will begin playback immediately.
+    //
+    // The AudioDataRequestCallback will be called when the Output needs more audio data to fill
+    // its buffers and continue playback.
+    static ErrorOr<NonnullRefPtr<PlaybackStream>> create(OutputState initial_output_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&&);
+
+    virtual ~PlaybackStream() = default;
+
+    // Sets the callback function that will be fired whenever the server consumes more data than is made available
+    // by the data request callback. It will fire when either the data request runs too long, or the data request
+    // returns no data. If all the input data has been exhausted and this event fires, that means that playback
+    // has ended.
+    virtual void set_underrun_callback(Function<void()>) = 0;
+
+    // Resume playback from the suspended state, requesting new data for audio buffers as soon as possible.
+    //
+    // The value provided to the promise resolution will match the `total_time_played()` at the exact moment that
+    // the stream was resumed.
+    virtual NonnullRefPtr<Core::ThreadedPromise<Duration>> resume() = 0;
+    // Completes playback of any buffered audio data and then suspends playback and buffering.
+    virtual NonnullRefPtr<Core::ThreadedPromise<void>> drain_buffer_and_suspend() = 0;
+    // Drops any buffered audio data and then suspends playback and buffering. This can used be to stop playback
+    // as soon as possible instead of waiting for remaining audio to play.
+    virtual NonnullRefPtr<Core::ThreadedPromise<void>> discard_buffer_and_suspend() = 0;
+
+    // Returns a accurate monotonically-increasing time duration that is based on the number of samples that have
+    // been played by the output device. The value is interpolated and takes into account latency to the speakers
+    // whenever possible.
+    //
+    // This function should be able to run from any thread safely.
+    virtual ErrorOr<Duration> total_time_played() = 0;
+
+    virtual NonnullRefPtr<Core::ThreadedPromise<void>> set_volume(double volume) = 0;
+};
+
+}

+ 192 - 0
Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.cpp

@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include "PlaybackStreamPulseAudio.h"
+
+#include <LibCore/ThreadedPromise.h>
+
+namespace Audio {
+
+#define TRY_OR_EXIT_THREAD(expression)                                                                       \
+    ({                                                                                                       \
+        auto&& __temporary_result = (expression);                                                            \
+        if (__temporary_result.is_error()) [[unlikely]] {                                                    \
+            warnln("Failure in PulseAudio control thread: {}", __temporary_result.error().string_literal()); \
+            internal_state->exit();                                                                          \
+            return 1;                                                                                        \
+        }                                                                                                    \
+        __temporary_result.release_value();                                                                  \
+    })
+
+ErrorOr<NonnullRefPtr<PlaybackStream>> PlaybackStreamPulseAudio::create(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback)
+{
+    VERIFY(data_request_callback);
+
+    // Create an internal state for the control thread to hold on to.
+    auto internal_state = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) InternalState()));
+    auto playback_stream = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) PlaybackStreamPulseAudio(internal_state)));
+
+    // Create the control thread and start it.
+    auto thread = TRY(Threading::Thread::try_create([=, data_request_callback = move(data_request_callback)]() mutable {
+        auto context = TRY_OR_EXIT_THREAD(PulseAudioContext::instance());
+        internal_state->set_stream(TRY_OR_EXIT_THREAD(context->create_stream(initial_state, sample_rate, channels, target_latency_ms, [data_request_callback = move(data_request_callback)](PulseAudioStream&, Bytes buffer, size_t sample_count) {
+            return data_request_callback(buffer, PcmSampleFormat::Float32, sample_count);
+        })));
+
+        // PulseAudio retains the last volume it sets for an application. We want to consistently
+        // start at 100% volume instead.
+        TRY_OR_EXIT_THREAD(internal_state->stream()->set_volume(1.0));
+
+        internal_state->thread_loop();
+        return 0;
+    },
+        "Audio::PlaybackStream"sv));
+
+    internal_state->set_thread(thread);
+    thread->start();
+    thread->detach();
+    return playback_stream;
+}
+
+PlaybackStreamPulseAudio::PlaybackStreamPulseAudio(NonnullRefPtr<InternalState> state)
+    : m_state(move(state))
+{
+}
+
+PlaybackStreamPulseAudio::~PlaybackStreamPulseAudio()
+{
+    m_state->exit();
+}
+
+#define TRY_OR_REJECT(expression, ...)                           \
+    ({                                                           \
+        auto&& __temporary_result = (expression);                \
+        if (__temporary_result.is_error()) [[unlikely]] {        \
+            promise->reject(__temporary_result.release_error()); \
+            return __VA_ARGS__;                                  \
+        }                                                        \
+        __temporary_result.release_value();                      \
+    })
+
+void PlaybackStreamPulseAudio::set_underrun_callback(Function<void()> callback)
+{
+    m_state->enqueue([this, callback = move(callback)]() mutable {
+        m_state->stream()->set_underrun_callback(move(callback));
+    });
+}
+
+NonnullRefPtr<Core::ThreadedPromise<Duration>> PlaybackStreamPulseAudio::resume()
+{
+    auto promise = Core::ThreadedPromise<Duration>::create();
+    TRY_OR_REJECT(m_state->check_is_running(), promise);
+    m_state->enqueue([this, promise]() {
+        TRY_OR_REJECT(m_state->stream()->resume());
+        promise->resolve(TRY_OR_REJECT(m_state->stream()->total_time_played()));
+    });
+    return promise;
+}
+
+NonnullRefPtr<Core::ThreadedPromise<void>> PlaybackStreamPulseAudio::drain_buffer_and_suspend()
+{
+    auto promise = Core::ThreadedPromise<void>::create();
+    TRY_OR_REJECT(m_state->check_is_running(), promise);
+    m_state->enqueue([this, promise]() {
+        TRY_OR_REJECT(m_state->stream()->drain_and_suspend());
+        promise->resolve();
+    });
+    return promise;
+}
+
+NonnullRefPtr<Core::ThreadedPromise<void>> PlaybackStreamPulseAudio::discard_buffer_and_suspend()
+{
+    auto promise = Core::ThreadedPromise<void>::create();
+    TRY_OR_REJECT(m_state->check_is_running(), promise);
+    m_state->enqueue([this, promise]() {
+        TRY_OR_REJECT(m_state->stream()->flush_and_suspend());
+        promise->resolve();
+    });
+    return promise;
+}
+
+ErrorOr<Duration> PlaybackStreamPulseAudio::total_time_played()
+{
+    if (m_state->stream() != nullptr)
+        return m_state->stream()->total_time_played();
+    return Duration::zero();
+}
+
+NonnullRefPtr<Core::ThreadedPromise<void>> PlaybackStreamPulseAudio::set_volume(double volume)
+{
+    auto promise = Core::ThreadedPromise<void>::create();
+    TRY_OR_REJECT(m_state->check_is_running(), promise);
+    m_state->enqueue([this, promise, volume]() {
+        TRY_OR_REJECT(m_state->stream()->set_volume(volume));
+        promise->resolve();
+    });
+    return promise;
+}
+
+ErrorOr<void> PlaybackStreamPulseAudio::InternalState::check_is_running()
+{
+    if (m_exit)
+        return Error::from_string_literal("PulseAudio control thread loop is not running");
+    return {};
+}
+
+void PlaybackStreamPulseAudio::InternalState::set_thread(NonnullRefPtr<Threading::Thread> const& thread)
+{
+    Threading::MutexLocker locker { m_mutex };
+    m_thread = thread;
+}
+
+void PlaybackStreamPulseAudio::InternalState::set_stream(NonnullRefPtr<PulseAudioStream> const& stream)
+{
+    m_stream = stream;
+}
+
+RefPtr<PulseAudioStream> PlaybackStreamPulseAudio::InternalState::stream()
+{
+    return m_stream;
+}
+
+void PlaybackStreamPulseAudio::InternalState::enqueue(Function<void()>&& task)
+{
+    Threading::MutexLocker locker { m_mutex };
+    m_tasks.enqueue(forward<Function<void()>>(task));
+    m_wake_condition.signal();
+}
+
+void PlaybackStreamPulseAudio::InternalState::thread_loop()
+{
+    while (true) {
+        auto task = [this]() -> Function<void()> {
+            Threading::MutexLocker locker { m_mutex };
+
+            while (m_tasks.is_empty() && !m_exit)
+                m_wake_condition.wait();
+            if (m_exit)
+                return nullptr;
+            return m_tasks.dequeue();
+        }();
+        if (!task) {
+            VERIFY(m_exit);
+            break;
+        }
+        task();
+    }
+
+    // Stop holding onto our thread so it can be deleted.
+    Threading::MutexLocker locker { m_mutex };
+    m_thread = nullptr;
+}
+
+void PlaybackStreamPulseAudio::InternalState::exit()
+{
+    m_exit = true;
+    m_wake_condition.signal();
+}
+
+}

+ 62 - 0
Userland/Libraries/LibAudio/PlaybackStreamPulseAudio.h

@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <LibAudio/PlaybackStream.h>
+#include <LibAudio/PulseAudioWrappers.h>
+
+namespace Audio {
+
+class PlaybackStreamPulseAudio final
+    : public PlaybackStream {
+public:
+    static ErrorOr<NonnullRefPtr<PlaybackStream>> create(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, AudioDataRequestCallback&& data_request_callback);
+
+    virtual void set_underrun_callback(Function<void()>) override;
+
+    virtual NonnullRefPtr<Core::ThreadedPromise<Duration>> resume() override;
+    virtual NonnullRefPtr<Core::ThreadedPromise<void>> drain_buffer_and_suspend() override;
+    virtual NonnullRefPtr<Core::ThreadedPromise<void>> discard_buffer_and_suspend() override;
+
+    virtual ErrorOr<Duration> total_time_played() override;
+
+    virtual NonnullRefPtr<Core::ThreadedPromise<void>> set_volume(double) override;
+
+private:
+    // This struct is kept alive until the control thread exits to prevent a use-after-free without blocking on
+    // the UI thread.
+    class InternalState : public AtomicRefCounted<InternalState> {
+    public:
+        void set_thread(NonnullRefPtr<Threading::Thread> const&);
+
+        void set_stream(NonnullRefPtr<PulseAudioStream> const&);
+        RefPtr<PulseAudioStream> stream();
+
+        void enqueue(Function<void()>&&);
+        void thread_loop();
+        ErrorOr<void> check_is_running();
+        void exit();
+
+    private:
+        RefPtr<PulseAudioStream> m_stream { nullptr };
+
+        Queue<Function<void()>> m_tasks;
+        Threading::Mutex m_mutex;
+        Threading::ConditionVariable m_wake_condition { m_mutex };
+
+        Atomic<bool> m_exit { false };
+
+        RefPtr<Threading::Thread> m_thread { nullptr };
+    };
+
+    PlaybackStreamPulseAudio(NonnullRefPtr<InternalState>);
+    ~PlaybackStreamPulseAudio();
+
+    RefPtr<InternalState> m_state;
+};
+
+}

+ 476 - 0
Userland/Libraries/LibAudio/PulseAudioWrappers.cpp

@@ -0,0 +1,476 @@
+/*
+ * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include "PulseAudioWrappers.h"
+
+#include <AK/WeakPtr.h>
+#include <LibThreading/Mutex.h>
+
+namespace Audio {
+
+ErrorOr<NonnullRefPtr<PulseAudioContext>> PulseAudioContext::instance()
+{
+    // Use a weak pointer to allow the context to be shut down if we stop outputting audio.
+    static WeakPtr<PulseAudioContext> the_instance;
+    static Threading::Mutex instantiation_mutex;
+    auto instantiation_locker = Threading::MutexLocker(instantiation_mutex);
+
+    RefPtr<PulseAudioContext> strong_instance_pointer = the_instance.strong_ref();
+
+    if (strong_instance_pointer == nullptr) {
+        auto* main_loop = pa_threaded_mainloop_new();
+        if (main_loop == nullptr)
+            return Error::from_string_literal("Failed to create PulseAudio main loop");
+
+        auto* api = pa_threaded_mainloop_get_api(main_loop);
+        if (api == nullptr)
+            return Error::from_string_literal("Failed to get PulseAudio API");
+
+        auto* context = pa_context_new(api, "Ladybird");
+        if (context == nullptr)
+            return Error::from_string_literal("Failed to get PulseAudio connection context");
+
+        strong_instance_pointer = make_ref_counted<PulseAudioContext>(main_loop, api, context);
+
+        // Set a callback to signal ourselves to wake when the state changes, so that we can
+        // synchronously wait for the connection.
+        pa_context_set_state_callback(
+            context, [](pa_context*, void* user_data) {
+                static_cast<PulseAudioContext*>(user_data)->signal_to_wake();
+            },
+            strong_instance_pointer.ptr());
+
+        if (auto error = pa_context_connect(context, nullptr, PA_CONTEXT_NOFLAGS, nullptr); error < 0) {
+            warnln("Starting PulseAudio context connection failed with error: {}", pulse_audio_error_to_string(static_cast<PulseAudioErrorCode>(-error)));
+            return Error::from_string_literal("Error while starting PulseAudio daemon connection");
+        }
+
+        if (auto error = pa_threaded_mainloop_start(main_loop); error < 0) {
+            warnln("Starting PulseAudio main loop failed with error: {}", pulse_audio_error_to_string(static_cast<PulseAudioErrorCode>(-error)));
+            return Error::from_string_literal("Failed to start PulseAudio main loop");
+        }
+
+        {
+            auto locker = strong_instance_pointer->main_loop_locker();
+            while (true) {
+                bool is_ready = false;
+                switch (strong_instance_pointer->get_connection_state()) {
+                case PulseAudioContextState::Connecting:
+                case PulseAudioContextState::Authorizing:
+                case PulseAudioContextState::SettingName:
+                    break;
+                case PulseAudioContextState::Ready:
+                    is_ready = true;
+                    break;
+                case PulseAudioContextState::Failed:
+                    warnln("PulseAudio server connection failed with error: {}", pulse_audio_error_to_string(strong_instance_pointer->get_last_error()));
+                    return Error::from_string_literal("Failed to connect to PulseAudio server");
+                case PulseAudioContextState::Unconnected:
+                case PulseAudioContextState::Terminated:
+                    VERIFY_NOT_REACHED();
+                    break;
+                }
+
+                if (is_ready)
+                    break;
+
+                strong_instance_pointer->wait_for_signal();
+            }
+
+            pa_context_set_state_callback(context, nullptr, nullptr);
+        }
+
+        the_instance = strong_instance_pointer;
+    }
+
+    return strong_instance_pointer.release_nonnull();
+}
+
+PulseAudioContext::PulseAudioContext(pa_threaded_mainloop* main_loop, pa_mainloop_api* api, pa_context* context)
+    : m_main_loop(main_loop)
+    , m_api(api)
+    , m_context(context)
+{
+}
+
+PulseAudioContext::~PulseAudioContext()
+{
+    pa_context_disconnect(m_context);
+    pa_context_unref(m_context);
+    pa_threaded_mainloop_stop(m_main_loop);
+    pa_threaded_mainloop_free(m_main_loop);
+}
+
+bool PulseAudioContext::current_thread_is_main_loop_thread()
+{
+    return static_cast<bool>(pa_threaded_mainloop_in_thread(m_main_loop));
+}
+
+void PulseAudioContext::lock_main_loop()
+{
+    if (!current_thread_is_main_loop_thread())
+        pa_threaded_mainloop_lock(m_main_loop);
+}
+
+void PulseAudioContext::unlock_main_loop()
+{
+    if (!current_thread_is_main_loop_thread())
+        pa_threaded_mainloop_unlock(m_main_loop);
+}
+
+void PulseAudioContext::wait_for_signal()
+{
+    pa_threaded_mainloop_wait(m_main_loop);
+}
+
+void PulseAudioContext::signal_to_wake()
+{
+    pa_threaded_mainloop_signal(m_main_loop, 0);
+}
+
+PulseAudioContextState PulseAudioContext::get_connection_state()
+{
+    return static_cast<PulseAudioContextState>(pa_context_get_state(m_context));
+}
+
+bool PulseAudioContext::connection_is_good()
+{
+    return PA_CONTEXT_IS_GOOD(pa_context_get_state(m_context));
+}
+
+PulseAudioErrorCode PulseAudioContext::get_last_error()
+{
+    return static_cast<PulseAudioErrorCode>(pa_context_errno(m_context));
+}
+
+#define STREAM_SIGNAL_CALLBACK(stream)                                          \
+    [](auto*, int, void* user_data) {                                           \
+        static_cast<PulseAudioStream*>(user_data)->m_context->signal_to_wake(); \
+    },                                                                          \
+        (stream)
+
+ErrorOr<NonnullRefPtr<PulseAudioStream>> PulseAudioContext::create_stream(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, PulseAudioDataRequestCallback write_callback)
+{
+    auto locker = main_loop_locker();
+
+    VERIFY(get_connection_state() == PulseAudioContextState::Ready);
+    pa_sample_spec sample_specification {
+        // FIXME: Support more audio sample types.
+        __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ ? PA_SAMPLE_FLOAT32LE : PA_SAMPLE_FLOAT32BE,
+        sample_rate,
+        channels,
+    };
+
+    // Check the sample specification and channel map here. These are also checked by stream_new(),
+    // but we can return a more accurate error if we check beforehand.
+    if (pa_sample_spec_valid(&sample_specification) == 0)
+        return Error::from_string_literal("PulseAudio sample specification is invalid");
+    pa_channel_map channel_map;
+    if (pa_channel_map_init_auto(&channel_map, sample_specification.channels, PA_CHANNEL_MAP_DEFAULT) == 0) {
+        warnln("Getting default PulseAudio channel map failed with error: {}", pulse_audio_error_to_string(get_last_error()));
+        return Error::from_string_literal("Failed to get default PulseAudio channel map");
+    }
+
+    // Create the stream object and set a callback to signal ourselves to wake when the stream changes states,
+    // allowing us to wait synchronously for it to become Ready or Failed.
+    auto* stream = pa_stream_new_with_proplist(m_context, "Audio Stream", &sample_specification, &channel_map, nullptr);
+    if (stream == nullptr) {
+        warnln("Instantiating PulseAudio stream failed with error: {}", pulse_audio_error_to_string(get_last_error()));
+        return Error::from_string_literal("Failed to create PulseAudio stream");
+    }
+    pa_stream_set_state_callback(
+        stream, [](pa_stream*, void* user_data) {
+            static_cast<PulseAudioContext*>(user_data)->signal_to_wake();
+        },
+        this);
+
+    auto stream_wrapper = TRY(adopt_nonnull_ref_or_enomem(new (nothrow) PulseAudioStream(NonnullRefPtr(*this), stream)));
+
+    stream_wrapper->m_write_callback = move(write_callback);
+    pa_stream_set_write_callback(
+        stream, [](pa_stream* stream, size_t bytes_to_write, void* user_data) {
+            auto& stream_wrapper = *static_cast<PulseAudioStream*>(user_data);
+            VERIFY(stream_wrapper.m_stream == stream);
+            stream_wrapper.on_write_requested(bytes_to_write);
+        },
+        stream_wrapper.ptr());
+
+    // Borrowing logic from cubeb to set reasonable buffer sizes for a target latency:
+    // https://searchfox.org/mozilla-central/rev/3b707c8fd7e978eebf24279ee51ccf07895cfbcb/third_party/rust/cubeb-sys/libcubeb/src/cubeb_pulse.c#910-927
+    pa_buffer_attr buffer_attributes;
+    buffer_attributes.maxlength = -1;
+    buffer_attributes.prebuf = -1;
+    buffer_attributes.tlength = target_latency_ms * sample_rate / 1000;
+    buffer_attributes.minreq = buffer_attributes.tlength / 4;
+    buffer_attributes.fragsize = buffer_attributes.minreq;
+    auto flags = static_cast<pa_stream_flags>(PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_RELATIVE_VOLUME);
+
+    if (initial_state == OutputState::Suspended) {
+        stream_wrapper->m_suspended = true;
+        flags = static_cast<pa_stream_flags>(static_cast<u32>(flags) | PA_STREAM_START_CORKED);
+    }
+
+    // This is a workaround for an issue with starting the stream corked, see PulseAudioPlaybackStream::total_time_played().
+    pa_stream_set_started_callback(
+        stream, [](pa_stream* stream, void* user_data) {
+            static_cast<PulseAudioStream*>(user_data)->m_started_playback = true;
+            pa_stream_set_started_callback(stream, nullptr, nullptr);
+        },
+        stream_wrapper.ptr());
+
+    pa_stream_set_underflow_callback(
+        stream, [](pa_stream*, void* user_data) {
+            auto& stream = *static_cast<PulseAudioStream*>(user_data);
+            if (stream.m_underrun_callback)
+                stream.m_underrun_callback();
+        },
+        stream_wrapper.ptr());
+
+    if (auto error = pa_stream_connect_playback(stream, nullptr, &buffer_attributes, flags, nullptr, nullptr); error != 0) {
+        warnln("PulseAudio stream connection failed with error: {}", pulse_audio_error_to_string(static_cast<PulseAudioErrorCode>(error)));
+        return Error::from_string_literal("Error while connecting the PulseAudio stream");
+    }
+
+    // FIXME: This should be asynchronous if connection can take longer than a fraction of a second.
+    while (true) {
+        bool is_ready = false;
+        switch (stream_wrapper->get_connection_state()) {
+        case PulseAudioStreamState::Creating:
+            break;
+        case PulseAudioStreamState::Ready:
+            is_ready = true;
+            break;
+        case PulseAudioStreamState::Failed:
+            return Error::from_string_literal("Failed to connect to PulseAudio daemon");
+        case PulseAudioStreamState::Unconnected:
+        case PulseAudioStreamState::Terminated:
+            VERIFY_NOT_REACHED();
+            break;
+        }
+        if (is_ready)
+            break;
+
+        wait_for_signal();
+    }
+
+    return stream_wrapper;
+}
+
+PulseAudioStream::~PulseAudioStream()
+{
+    pa_stream_unref(m_stream);
+}
+
+PulseAudioStreamState PulseAudioStream::get_connection_state()
+{
+    return static_cast<PulseAudioStreamState>(pa_stream_get_state(m_stream));
+}
+
+bool PulseAudioStream::connection_is_good()
+{
+    return PA_STREAM_IS_GOOD(pa_stream_get_state(m_stream));
+}
+
+void PulseAudioStream::set_underrun_callback(Function<void()> callback)
+{
+    auto locker = m_context->main_loop_locker();
+    m_underrun_callback = move(callback);
+}
+
+u32 PulseAudioStream::sample_rate()
+{
+    return pa_stream_get_sample_spec(m_stream)->rate;
+}
+
+size_t PulseAudioStream::sample_size()
+{
+    return pa_sample_size(pa_stream_get_sample_spec(m_stream));
+}
+
+size_t PulseAudioStream::frame_size()
+{
+    return pa_frame_size(pa_stream_get_sample_spec(m_stream));
+}
+
+u8 PulseAudioStream::channel_count()
+{
+    return pa_stream_get_sample_spec(m_stream)->channels;
+}
+
+void PulseAudioStream::on_write_requested(size_t bytes_to_write)
+{
+    VERIFY(m_write_callback);
+    if (m_suspended)
+        return;
+    while (bytes_to_write > 0) {
+        auto buffer = begin_write(bytes_to_write).release_value_but_fixme_should_propagate_errors();
+        auto frame_size = this->frame_size();
+        VERIFY(buffer.size() % frame_size == 0);
+        auto written_buffer = m_write_callback(*this, buffer, buffer.size() / frame_size);
+        if (written_buffer.size() == 0) {
+            cancel_write().release_value_but_fixme_should_propagate_errors();
+            break;
+        }
+        bytes_to_write -= written_buffer.size();
+        write(written_buffer).release_value_but_fixme_should_propagate_errors();
+    }
+}
+
+ErrorOr<Bytes> PulseAudioStream::begin_write(size_t bytes_to_write)
+{
+    void* data_pointer;
+    size_t data_size = bytes_to_write;
+    if (pa_stream_begin_write(m_stream, &data_pointer, &data_size) != 0 || data_pointer == nullptr)
+        return Error::from_string_literal("Failed to get the playback stream's write buffer from PulseAudio");
+    return Bytes { data_pointer, data_size };
+}
+
+ErrorOr<void> PulseAudioStream::write(ReadonlyBytes data)
+{
+    if (pa_stream_write(m_stream, data.data(), data.size(), nullptr, 0, PA_SEEK_RELATIVE) != 0)
+        return Error::from_string_literal("Failed to write data to PulseAudio playback stream");
+    return {};
+}
+
+ErrorOr<void> PulseAudioStream::cancel_write()
+{
+    if (pa_stream_cancel_write(m_stream) != 0)
+        return Error::from_string_literal("Failed to get the playback stream's write buffer from PulseAudio");
+    return {};
+}
+
+bool PulseAudioStream::is_suspended() const
+{
+    return m_suspended;
+}
+
+StringView pulse_audio_error_to_string(PulseAudioErrorCode code)
+{
+    if (code < PulseAudioErrorCode::OK || code >= PulseAudioErrorCode::Sentinel)
+        return "Unknown error code"sv;
+
+    char const* string = pa_strerror(static_cast<int>(code));
+    return StringView { string, strlen(string) };
+}
+
+ErrorOr<void> PulseAudioStream::wait_for_operation(pa_operation* operation, StringView error_message)
+{
+    while (pa_operation_get_state(operation) == PA_OPERATION_RUNNING)
+        m_context->wait_for_signal();
+    if (!m_context->connection_is_good() || !this->connection_is_good()) {
+        auto pulse_audio_error_name = pulse_audio_error_to_string(m_context->get_last_error());
+        warnln("Encountered stream error: {}", pulse_audio_error_name);
+        return Error::from_string_view(error_message);
+    }
+    pa_operation_unref(operation);
+    return {};
+}
+
+ErrorOr<void> PulseAudioStream::drain_and_suspend()
+{
+    auto locker = m_context->main_loop_locker();
+
+    if (m_suspended)
+        return {};
+    m_suspended = true;
+
+    if (pa_stream_is_corked(m_stream) > 0)
+        return {};
+
+    TRY(wait_for_operation(pa_stream_drain(m_stream, STREAM_SIGNAL_CALLBACK(this)), "Draining PulseAudio stream failed"sv));
+    TRY(wait_for_operation(pa_stream_cork(m_stream, 1, STREAM_SIGNAL_CALLBACK(this)), "Corking PulseAudio stream after drain failed"sv));
+    return {};
+}
+
+ErrorOr<void> PulseAudioStream::flush_and_suspend()
+{
+    auto locker = m_context->main_loop_locker();
+
+    if (m_suspended)
+        return {};
+    m_suspended = true;
+
+    if (pa_stream_is_corked(m_stream) > 0)
+        return {};
+
+    TRY(wait_for_operation(pa_stream_flush(m_stream, STREAM_SIGNAL_CALLBACK(this)), "Flushing PulseAudio stream failed"sv));
+    TRY(wait_for_operation(pa_stream_cork(m_stream, 1, STREAM_SIGNAL_CALLBACK(this)), "Corking PulseAudio stream after flush failed"sv));
+    return {};
+}
+
+ErrorOr<void> PulseAudioStream::resume()
+{
+    auto locker = m_context->main_loop_locker();
+
+    if (!m_suspended)
+        return {};
+    m_suspended = false;
+
+    TRY(wait_for_operation(pa_stream_cork(m_stream, 0, STREAM_SIGNAL_CALLBACK(this)), "Uncorking PulseAudio stream failed"sv));
+
+    // Defer a write to the playback buffer on the PulseAudio main loop. Otherwise, playback will not
+    // begin again, despite the fact that we uncorked.
+    // NOTE: We ref here and then unref in the callback so that this stream will not be deleted until
+    //       it finishes.
+    ref();
+    pa_mainloop_api_once(
+        m_context->m_api, [](pa_mainloop_api*, void* user_data) {
+            auto& stream = *static_cast<PulseAudioStream*>(user_data);
+            // NOTE: writable_size() returns -1 in case of an error. However, the value is still safe
+            //       since begin_write() will interpret -1 as a default parameter and choose a good size.
+            auto bytes_to_write = pa_stream_writable_size(stream.m_stream);
+            stream.on_write_requested(bytes_to_write);
+            stream.unref();
+        },
+        this);
+    return {};
+}
+
+ErrorOr<Duration> PulseAudioStream::total_time_played()
+{
+    auto locker = m_context->main_loop_locker();
+
+    // NOTE: This is a workaround for a PulseAudio issue. When a stream is started corked,
+    //       the time smoother doesn't seem to be aware of it, so it will return the time
+    //       since the stream was connected. Once the playback actually starts, the time
+    //       resets back to zero. However, since we request monotonically-increasing time,
+    //       this means that the smoother will register that it had a larger time before,
+    //       and return that time instead, until we reach a timestamp greater than the
+    //       last-returned time. If we never call pa_stream_get_time() until after giving
+    //       the stream its first samples, the issue never occurs.
+    if (!m_started_playback)
+        return Duration::zero();
+
+    pa_usec_t time = 0;
+    auto error = pa_stream_get_time(m_stream, &time);
+    if (error == -PA_ERR_NODATA)
+        return Duration::zero();
+    if (error != 0)
+        return Error::from_string_literal("Failed to get time from PulseAudio stream");
+    if (time > NumericLimits<i64>::max()) {
+        warnln("WARNING: Audio time is too large!");
+        time -= NumericLimits<i64>::max();
+    }
+    return Duration::from_microseconds(static_cast<i64>(time));
+}
+
+ErrorOr<void> PulseAudioStream::set_volume(double volume)
+{
+    auto locker = m_context->main_loop_locker();
+
+    auto index = pa_stream_get_index(m_stream);
+    if (index == PA_INVALID_INDEX)
+        return Error::from_string_literal("Failed to get PulseAudio stream index while setting volume");
+
+    auto pulse_volume = pa_sw_volume_from_linear(volume);
+    pa_cvolume per_channel_volumes;
+    pa_cvolume_set(&per_channel_volumes, channel_count(), pulse_volume);
+
+    auto* operation = pa_context_set_sink_input_volume(m_context->m_context, index, &per_channel_volumes, STREAM_SIGNAL_CALLBACK(this));
+    return wait_for_operation(operation, "Failed to set PulseAudio stream volume"sv);
+}
+
+}

+ 184 - 0
Userland/Libraries/LibAudio/PulseAudioWrappers.h

@@ -0,0 +1,184 @@
+/*
+ * Copyright (c) 2023, Gregory Bertilson <zaggy1024@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <AK/AtomicRefCounted.h>
+#include <AK/Error.h>
+#include <AK/NonnullRefPtr.h>
+#include <AK/Time.h>
+#include <LibAudio/Forward.h>
+#include <LibAudio/PlaybackStream.h>
+#include <LibAudio/SampleFormats.h>
+#include <LibThreading/Thread.h>
+#include <pulse/pulseaudio.h>
+
+namespace Audio {
+
+class PulseAudioStream;
+
+enum class PulseAudioContextState {
+    Unconnected = PA_CONTEXT_UNCONNECTED,
+    Connecting = PA_CONTEXT_CONNECTING,
+    Authorizing = PA_CONTEXT_AUTHORIZING,
+    SettingName = PA_CONTEXT_SETTING_NAME,
+    Ready = PA_CONTEXT_READY,
+    Failed = PA_CONTEXT_FAILED,
+    Terminated = PA_CONTEXT_TERMINATED,
+};
+
+enum class PulseAudioErrorCode;
+
+using PulseAudioDataRequestCallback = Function<ReadonlyBytes(PulseAudioStream&, Bytes buffer, size_t sample_count)>;
+
+// A wrapper around the PulseAudio main loop and context structs.
+// Generally, only one instance of this should be needed for a single process.
+class PulseAudioContext
+    : public AtomicRefCounted<PulseAudioContext>
+    , public Weakable<PulseAudioContext> {
+public:
+    static ErrorOr<NonnullRefPtr<PulseAudioContext>> instance();
+
+    explicit PulseAudioContext(pa_threaded_mainloop*, pa_mainloop_api*, pa_context*);
+    PulseAudioContext(PulseAudioContext const& other) = delete;
+    ~PulseAudioContext();
+
+    bool current_thread_is_main_loop_thread();
+    void lock_main_loop();
+    void unlock_main_loop();
+    [[nodiscard]] auto main_loop_locker()
+    {
+        lock_main_loop();
+        return ScopeGuard([this]() { unlock_main_loop(); });
+    }
+    // Waits for signal_to_wake() to be called.
+    // This must be called with the main loop locked.
+    void wait_for_signal();
+    // Signals to wake all threads from calls to signal_to_wake()
+    void signal_to_wake();
+
+    PulseAudioContextState get_connection_state();
+    bool connection_is_good();
+    PulseAudioErrorCode get_last_error();
+
+    ErrorOr<NonnullRefPtr<PulseAudioStream>> create_stream(OutputState initial_state, u32 sample_rate, u8 channels, u32 target_latency_ms, PulseAudioDataRequestCallback write_callback);
+
+private:
+    friend class PulseAudioStream;
+
+    pa_threaded_mainloop* m_main_loop { nullptr };
+    pa_mainloop_api* m_api { nullptr };
+    pa_context* m_context;
+};
+
+enum class PulseAudioStreamState {
+    Unconnected = PA_STREAM_UNCONNECTED,
+    Creating = PA_STREAM_CREATING,
+    Ready = PA_STREAM_READY,
+    Failed = PA_STREAM_FAILED,
+    Terminated = PA_STREAM_TERMINATED,
+};
+
+class PulseAudioStream : public AtomicRefCounted<PulseAudioStream> {
+public:
+    static constexpr bool start_corked = true;
+
+    ~PulseAudioStream();
+
+    PulseAudioStreamState get_connection_state();
+    bool connection_is_good();
+
+    // Sets the callback to be run when the server consumes more of the buffer than
+    // has been written yet.
+    void set_underrun_callback(Function<void()>);
+
+    u32 sample_rate();
+    size_t sample_size();
+    size_t frame_size();
+    u8 channel_count();
+    // Gets a data buffer that can be written to and then passed back to PulseAudio through
+    // the write() function. This avoids a copy vs directly calling write().
+    ErrorOr<Bytes> begin_write(size_t bytes_to_write = NumericLimits<size_t>::max());
+    // Writes a data buffer to the playback stream.
+    ErrorOr<void> write(ReadonlyBytes data);
+    // Cancels the previous begin_write() call.
+    ErrorOr<void> cancel_write();
+
+    bool is_suspended() const;
+    // Plays back all buffered data and corks the stream. Until resume() is called, no data
+    // will be written to the stream.
+    ErrorOr<void> drain_and_suspend();
+    // Drops all buffered data and corks the stream. Until resume() is called, no data will
+    // be written to the stream.
+    ErrorOr<void> flush_and_suspend();
+    // Uncorks the stream and forces data to be written to the buffers to force playback to
+    // resume as soon as possible.
+    ErrorOr<void> resume();
+    ErrorOr<Duration> total_time_played();
+
+    ErrorOr<void> set_volume(double volume);
+
+    PulseAudioContext& context() { return *m_context; }
+
+private:
+    friend class PulseAudioContext;
+
+    explicit PulseAudioStream(NonnullRefPtr<PulseAudioContext>&& context, pa_stream* stream)
+        : m_context(context)
+        , m_stream(stream)
+    {
+    }
+    PulseAudioStream(PulseAudioStream const& other) = delete;
+
+    ErrorOr<void> wait_for_operation(pa_operation*, StringView error_message);
+
+    void on_write_requested(size_t bytes_to_write);
+
+    NonnullRefPtr<PulseAudioContext> m_context;
+    pa_stream* m_stream { nullptr };
+    bool m_started_playback { false };
+    PulseAudioDataRequestCallback m_write_callback { nullptr };
+    // Determines whether we will allow the write callback to run. This should only be true
+    // if the stream is becoming or is already corked.
+    bool m_suspended { false };
+
+    Function<void()> m_underrun_callback;
+};
+
+enum class PulseAudioErrorCode {
+    OK = 0,
+    AccessFailure,
+    UnknownCommand,
+    InvalidArgument,
+    EntityExists,
+    NoSuchEntity,
+    ConnectionRefused,
+    ProtocolError,
+    Timeout,
+    NoAuthenticationKey,
+    InternalError,
+    ConnectionTerminated,
+    EntityKilled,
+    InvalidServer,
+    NoduleInitFailed,
+    BadState,
+    NoData,
+    IncompatibleProtocolVersion,
+    DataTooLarge,
+    NotSupported,
+    Unknown,
+    NoExtension,
+    Obsolete,
+    NotImplemented,
+    CalledFromFork,
+    IOError,
+    Busy,
+    Sentinel
+};
+
+StringView pulse_audio_error_to_string(PulseAudioErrorCode code);
+
+}