LibWeb: Implement AO readable_stream_pipe_to()

This is currently a naive implementation of readable_stream_pipe_to()
which will need some further iterations before it is par with the spec.
This commit is contained in:
Kenneth Myhra 2024-04-06 18:56:42 +02:00 committed by Andreas Kling
parent 12cfa08a09
commit 559d983fa1
Notes: sideshowbarker 2024-07-17 11:33:34 +09:00
2 changed files with 88 additions and 2 deletions

View file

@ -2,7 +2,7 @@
* Copyright (c) 2022, Linus Groh <linusg@serenityos.org>
* Copyright (c) 2023, Matthew Olsson <mattco@serenityos.org>
* Copyright (c) 2023-2024, Shannon Booth <shannon@serenityos.org>
* Copyright (c) 2023, Kenneth Myhra <kennethmyhra@serenityos.org>
* Copyright (c) 2023-2024, Kenneth Myhra <kennethmyhra@serenityos.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
@ -241,6 +241,90 @@ bool readable_stream_has_default_reader(ReadableStream const& stream)
return false;
}
// https://streams.spec.whatwg.org/#readable-stream-pipe-to
WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool, bool, bool, Optional<JS::Value> signal)
{
auto& realm = source.realm();
// 1. Assert: source implements ReadableStream.
// 2. Assert: dest implements WritableStream.
// 3. Assert: preventClose, preventAbort, and preventCancel are all booleans.
// 4. If signal was not given, let signal be undefined.
if (!signal.has_value())
signal = JS::js_undefined();
// 5. Assert: either signal is undefined, or signal implements AbortSignal.
VERIFY(signal->is_undefined() || (signal->is_object() && is<DOM::AbortSignal>(signal->as_object())));
// 6. Assert: ! IsReadableStreamLocked(source) is false.
VERIFY(!is_readable_stream_locked(source));
// 7. Assert: ! IsWritableStreamLocked(dest) is false.
VERIFY(!is_writable_stream_locked(dest));
// 8. If source.[[controller]] implements ReadableByteStreamController, let reader be either ! AcquireReadableStreamBYOBReader(source)
// or ! AcquireReadableStreamDefaultReader(source), at the user agents discretion.
// 9. Otherwise, let reader be ! AcquireReadableStreamDefaultReader(source).
auto reader = MUST(source.controller()->visit(
[](auto const& controller) {
return acquire_readable_stream_default_reader(*controller->stream());
}));
// 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
auto writer = MUST(acquire_writable_stream_default_writer(dest));
// 11. Set source.[[disturbed]] to true.
source.set_disturbed(true);
// FIXME: 12. Let shuttingDown be false.
// 13. Let promise be a new promise.
auto promise = WebIDL::create_promise(realm);
// FIXME 14. If signal is not undefined,
// 1. Let abortAlgorithm be the following steps:
// 1. Let error be signals abort reason.
// 2. Let actions be an empty ordered set.
// 3. If preventAbort is false, append the following action to actions:
// 1. If dest.[[state]] is "writable", return ! WritableStreamAbort(dest, error).
// 2. Otherwise, return a promise resolved with undefined.
// 4. If preventCancel is false, append the following action to actions:
// 1. If source.[[state]] is "readable", return ! ReadableStreamCancel(source, error).
// 2. Otherwise, return a promise resolved with undefined.
// 5. Shutdown with an action consisting of getting a promise to wait for all of the actions in actions, and with error.
// 2. If signal is aborted, perform abortAlgorithm and return promise.
// 3. Add abortAlgorithm to signal.
// 15. In parallel but not really; see #905, using reader and writer, read all chunks from source and write them to
// dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not
// observable to author code, and so there is flexibility in how this is done. The following constraints apply
// regardless of the exact algorithm used:
// - Public API must not be used: while reading or writing, or performing any of the operations below, the
// JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not
// be used. Instead, the streams must be manipulated directly.
// FIXME: Currently a naive implementation that uses ReadableStreamDefaultReader::read_all_bytes() to read all chunks
// from the source and then through the callback success_steps writes those chunks to the destination.
auto success_steps = [promise, &realm, writer](Vector<ByteBuffer> const& bytes) {
for (auto byte_buffer : bytes) {
auto buffer = JS::ArrayBuffer::create(realm, move(byte_buffer));
auto inner_promise = MUST(writable_stream_default_writer_write(writer, JS::Value { buffer }));
WebIDL::resolve_promise(realm, inner_promise, JS::js_undefined());
}
WebIDL::resolve_promise(realm, promise, JS::js_undefined());
};
auto failure_steps = [promise, &realm](JS::Value error) {
WebIDL::reject_promise(realm, promise, error);
};
TRY(reader->read_all_bytes(move(success_steps), move(failure_steps)));
// 16. Return promise.
return promise;
}
// https://streams.spec.whatwg.org/#readable-stream-tee
WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2)
{

View file

@ -2,7 +2,7 @@
* Copyright (c) 2022, Linus Groh <linusg@serenityos.org>
* Copyright (c) 2023, Matthew Olsson <mattco@serenityos.org>
* Copyright (c) 2023-2024, Shannon Booth <shannon@serenityos.org>
* Copyright (c) 2023, Kenneth Myhra <kennethmyhra@serenityos.org>
* Copyright (c) 2023-2024, Kenneth Myhra <kennethmyhra@serenityos.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
@ -47,6 +47,8 @@ size_t readable_stream_get_num_read_requests(ReadableStream const&);
bool readable_stream_has_byob_reader(ReadableStream const&);
bool readable_stream_has_default_reader(ReadableStream const&);
WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> readable_stream_pipe_to(ReadableStream& source, WritableStream& dest, bool prevent_close, bool prevent_abort, bool prevent_cancel, Optional<JS::Value> signal);
WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_tee(JS::Realm&, ReadableStream&, bool clone_for_branch2);
WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_default_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2);
WebIDL::ExceptionOr<ReadableStreamPair> readable_byte_stream_tee(JS::Realm& realm, ReadableStream& stream);