diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp index 2f500322d2d..4ccdc3738fd 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.cpp @@ -2,7 +2,7 @@ * Copyright (c) 2022, Linus Groh * Copyright (c) 2023, Matthew Olsson * Copyright (c) 2023-2024, Shannon Booth - * Copyright (c) 2023, Kenneth Myhra + * Copyright (c) 2023-2024, Kenneth Myhra * * SPDX-License-Identifier: BSD-2-Clause */ @@ -241,6 +241,90 @@ bool readable_stream_has_default_reader(ReadableStream const& stream) return false; } +// https://streams.spec.whatwg.org/#readable-stream-pipe-to +WebIDL::ExceptionOr> readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool, bool, bool, Optional signal) +{ + auto& realm = source.realm(); + + // 1. Assert: source implements ReadableStream. + // 2. Assert: dest implements WritableStream. + // 3. Assert: preventClose, preventAbort, and preventCancel are all booleans. + + // 4. If signal was not given, let signal be undefined. + if (!signal.has_value()) + signal = JS::js_undefined(); + + // 5. Assert: either signal is undefined, or signal implements AbortSignal. + VERIFY(signal->is_undefined() || (signal->is_object() && is(signal->as_object()))); + + // 6. Assert: ! IsReadableStreamLocked(source) is false. + VERIFY(!is_readable_stream_locked(source)); + + // 7. Assert: ! IsWritableStreamLocked(dest) is false. + VERIFY(!is_writable_stream_locked(dest)); + + // 8. If source.[[controller]] implements ReadableByteStreamController, let reader be either ! AcquireReadableStreamBYOBReader(source) + // or ! AcquireReadableStreamDefaultReader(source), at the user agent’s discretion. + // 9. Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source). + auto reader = MUST(source.controller()->visit( + [](auto const& controller) { + return acquire_readable_stream_default_reader(*controller->stream()); + })); + + // 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest). + auto writer = MUST(acquire_writable_stream_default_writer(dest)); + + // 11. Set source.[[disturbed]] to true. + source.set_disturbed(true); + + // FIXME: 12. Let shuttingDown be false. + + // 13. Let promise be a new promise. + auto promise = WebIDL::create_promise(realm); + + // FIXME 14. If signal is not undefined, + // 1. Let abortAlgorithm be the following steps: + // 1. Let error be signal’s abort reason. + // 2. Let actions be an empty ordered set. + // 3. If preventAbort is false, append the following action to actions: + // 1. If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error). + // 2. Otherwise, return a promise resolved with undefined. + // 4. If preventCancel is false, append the following action to actions: + // 1. If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error). + // 2. Otherwise, return a promise resolved with undefined. + // 5. Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error. + // 2. If signal is aborted, perform abortAlgorithm and return promise. + // 3. Add abortAlgorithm to signal. + + // 15. In parallel but not really; see #905, using reader and writer, read all chunks from source and write them to + // dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not + // observable to author code, and so there is flexibility in how this is done. The following constraints apply + // regardless of the exact algorithm used: + // - Public API must not be used: while reading or writing, or performing any of the operations below, the + // JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not + // be used. Instead, the streams must be manipulated directly. + + // FIXME: Currently a naive implementation that uses ReadableStreamDefaultReader::read_all_bytes() to read all chunks + // from the source and then through the callback success_steps writes those chunks to the destination. + auto success_steps = [promise, &realm, writer](Vector const& bytes) { + for (auto byte_buffer : bytes) { + auto buffer = JS::ArrayBuffer::create(realm, move(byte_buffer)); + auto inner_promise = MUST(writable_stream_default_writer_write(writer, JS::Value { buffer })); + WebIDL::resolve_promise(realm, inner_promise, JS::js_undefined()); + } + WebIDL::resolve_promise(realm, promise, JS::js_undefined()); + }; + + auto failure_steps = [promise, &realm](JS::Value error) { + WebIDL::reject_promise(realm, promise, error); + }; + + TRY(reader->read_all_bytes(move(success_steps), move(failure_steps))); + + // 16. Return promise. + return promise; +} + // https://streams.spec.whatwg.org/#readable-stream-tee WebIDL::ExceptionOr readable_stream_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2) { diff --git a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h index cba0df38c0f..85efcebdcf3 100644 --- a/Userland/Libraries/LibWeb/Streams/AbstractOperations.h +++ b/Userland/Libraries/LibWeb/Streams/AbstractOperations.h @@ -2,7 +2,7 @@ * Copyright (c) 2022, Linus Groh * Copyright (c) 2023, Matthew Olsson * Copyright (c) 2023-2024, Shannon Booth - * Copyright (c) 2023, Kenneth Myhra + * Copyright (c) 2023-2024, Kenneth Myhra * * SPDX-License-Identifier: BSD-2-Clause */ @@ -47,6 +47,8 @@ size_t readable_stream_get_num_read_requests(ReadableStream const&); bool readable_stream_has_byob_reader(ReadableStream const&); bool readable_stream_has_default_reader(ReadableStream const&); +WebIDL::ExceptionOr> readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool prevent_close, bool prevent_abort, bool prevent_cancel, Optional signal); + WebIDL::ExceptionOr readable_stream_tee(JS::Realm&, ReadableStream&, bool clone_for_branch2); WebIDL::ExceptionOr readable_stream_default_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2); WebIDL::ExceptionOr readable_byte_stream_tee(JS::Realm& realm, ReadableStream& stream);