mirror of
https://github.com/LadybirdBrowser/ladybird.git
synced 2024-12-04 13:30:31 +00:00
LibWeb: Implement the ReadableByteStreamTee half of ReadableStreamTee
This commit is contained in:
parent
d7612969e0
commit
6981ddfe13
Notes:
sideshowbarker
2024-07-17 08:34:29 +09:00
Author: https://github.com/trflynn89 Commit: https://github.com/SerenityOS/serenity/commit/6981ddfe13 Pull-request: https://github.com/SerenityOS/serenity/pull/22977 Reviewed-by: https://github.com/shannonbooth ✅
6 changed files with 785 additions and 1 deletions
|
@ -0,0 +1,6 @@
|
|||
stream1: abcdefghijklmnopqrstuvwxyz
|
||||
stream1: ABCDEFGHIJKLMNOPQRSTUVWXYZ
|
||||
stream1: 0123456789!@#$%^&*()-=_+,<
|
||||
stream1: Done!
|
||||
stream2: abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()-=_+,<
|
||||
stream2: Done!
|
|
@ -0,0 +1,8 @@
|
|||
stream1: abcdefghijklmnopqrstuvwxyz
|
||||
stream1: ABCDEFGHIJKLMNOPQRSTUVWXYZ
|
||||
stream1: 0123456789!@#$%^&*()-=_+,<
|
||||
stream1: Done!
|
||||
stream2: abcdefghijklmnopqrstuvwxyz
|
||||
stream2: ABCDEFGHIJKLMNOPQRSTUVWXYZ
|
||||
stream2: 0123456789!@#$%^&*()-=_+,<
|
||||
stream2: Done!
|
|
@ -0,0 +1,84 @@
|
|||
<script src="../include.js"></script>
|
||||
<script>
|
||||
const CHUNK1 = "abcdefghijklmnopqrstuvwxyz";
|
||||
const CHUNK2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||
const CHUNK3 = "0123456789!@#$%^&*()-=_+,<";
|
||||
|
||||
const readStream = (stream, name) => {
|
||||
const reader = stream.getReader({ mode: "byob" });
|
||||
|
||||
let buffer = new ArrayBuffer(256);
|
||||
let offset = 0;
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const processText = ({ done, value }) => {
|
||||
if (done) {
|
||||
println(`${name}: Done!`);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
buffer = value.buffer;
|
||||
offset += value.byteLength;
|
||||
|
||||
value = new TextDecoder().decode(value);
|
||||
println(`${name}: ${value}`);
|
||||
|
||||
return reader
|
||||
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
|
||||
.then(processText);
|
||||
};
|
||||
|
||||
reader
|
||||
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
|
||||
.then(processText);
|
||||
});
|
||||
};
|
||||
|
||||
const writeStream = (controller, data) => {
|
||||
const view = controller.byobRequest.view;
|
||||
|
||||
const target = new Uint8Array(view.buffer, view.byteOffset, view.byteLength);
|
||||
const source = Uint8Array.from(Array.from(data).map(ch => ch.charCodeAt(0)));
|
||||
|
||||
for (let i = 0; i < source.length; ++i) {
|
||||
target[i] = source[i];
|
||||
}
|
||||
|
||||
controller.byobRequest.respond(source.length);
|
||||
};
|
||||
|
||||
asyncTest(done => {
|
||||
const stream = new ReadableStream({
|
||||
type: "bytes",
|
||||
|
||||
start(controller) {
|
||||
pullCount = 0;
|
||||
},
|
||||
|
||||
pull(controller) {
|
||||
const view = controller.byobRequest.view;
|
||||
++pullCount;
|
||||
|
||||
if (pullCount == 1) {
|
||||
writeStream(controller, CHUNK1);
|
||||
} else if (pullCount == 2) {
|
||||
writeStream(controller, CHUNK2);
|
||||
} else if (pullCount == 3) {
|
||||
writeStream(controller, CHUNK3);
|
||||
} else {
|
||||
controller.close();
|
||||
controller.byobRequest.respond(0);
|
||||
}
|
||||
},
|
||||
|
||||
cancel() {},
|
||||
});
|
||||
|
||||
const teed = stream.tee();
|
||||
|
||||
readStream(teed[0], "stream1").then(() => {
|
||||
readStream(teed[1], "stream2").then(done);
|
||||
});
|
||||
});
|
||||
</script>
|
|
@ -0,0 +1,64 @@
|
|||
<script src="../include.js"></script>
|
||||
<script>
|
||||
const CHUNK1 = "abcdefghijklmnopqrstuvwxyz";
|
||||
const CHUNK2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||
const CHUNK3 = "0123456789!@#$%^&*()-=_+,<";
|
||||
|
||||
const readStream = (stream, name) => {
|
||||
const reader = stream.getReader();
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const processText = ({ done, value }) => {
|
||||
if (done) {
|
||||
println(`${name}: Done!`);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
|
||||
value = new TextDecoder().decode(value);
|
||||
println(`${name}: ${value}`);
|
||||
|
||||
return reader.read().then(processText);
|
||||
};
|
||||
|
||||
reader.read().then(processText);
|
||||
});
|
||||
};
|
||||
|
||||
const writeStream = (controller, data) => {
|
||||
const source = Uint8Array.from(Array.from(data).map(ch => ch.charCodeAt(0)));
|
||||
controller.enqueue(source);
|
||||
};
|
||||
|
||||
asyncTest(done => {
|
||||
const stream = new ReadableStream({
|
||||
type: "bytes",
|
||||
|
||||
start(controller) {
|
||||
pullCount = 0;
|
||||
},
|
||||
|
||||
pull(controller) {
|
||||
++pullCount;
|
||||
|
||||
if (pullCount == 1) {
|
||||
writeStream(controller, CHUNK1);
|
||||
} else if (pullCount == 2) {
|
||||
writeStream(controller, CHUNK2);
|
||||
} else if (pullCount == 3) {
|
||||
writeStream(controller, CHUNK3);
|
||||
} else {
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
|
||||
cancel() {},
|
||||
});
|
||||
|
||||
const teed = stream.tee();
|
||||
|
||||
readStream(teed[0], "stream1").then(() => {
|
||||
readStream(teed[1], "stream2").then(done);
|
||||
});
|
||||
});
|
||||
</script>
|
|
@ -249,7 +249,7 @@ WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_tee(JS::Realm& realm, Re
|
|||
|
||||
// 3. If stream.[[controller]] implements ReadableByteStreamController, return ? ReadableByteStreamTee(stream).
|
||||
if (stream.controller()->has<JS::NonnullGCPtr<Streams::ReadableByteStreamController>>()) {
|
||||
return realm.vm().throw_completion<JS::InternalError>(JS::ErrorType::NotImplemented, "Byte stream teeing");
|
||||
return TRY(readable_byte_stream_tee(realm, stream));
|
||||
}
|
||||
|
||||
// 4. Return ? ReadableStreamDefaultTee(stream, cloneForBranch2).
|
||||
|
@ -557,6 +557,627 @@ WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_default_tee(JS::Realm& r
|
|||
return ReadableStreamPair { *params->branch1, *params->branch2 };
|
||||
}
|
||||
|
||||
struct ByteStreamTeeParams final : JS::Cell {
|
||||
JS_CELL(TeeParams, JS::Cell);
|
||||
JS_DECLARE_ALLOCATOR(ByteStreamTeeParams);
|
||||
|
||||
explicit ByteStreamTeeParams(ReadableStreamReader reader)
|
||||
: reader(move(reader))
|
||||
{
|
||||
}
|
||||
|
||||
virtual void visit_edges(Visitor& visitor) override
|
||||
{
|
||||
Base::visit_edges(visitor);
|
||||
visitor.visit(reason1);
|
||||
visitor.visit(reason2);
|
||||
visitor.visit(branch1);
|
||||
visitor.visit(branch2);
|
||||
visitor.visit(pull1_algorithm);
|
||||
visitor.visit(pull2_algorithm);
|
||||
reader.visit([&](auto const& underlying_reader) { visitor.visit(underlying_reader); });
|
||||
}
|
||||
|
||||
bool reading { false };
|
||||
bool read_again_for_branch1 { false };
|
||||
bool read_again_for_branch2 { false };
|
||||
bool canceled1 { false };
|
||||
bool canceled2 { false };
|
||||
JS::Value reason1 { JS::js_undefined() };
|
||||
JS::Value reason2 { JS::js_undefined() };
|
||||
JS::GCPtr<ReadableStream> branch1;
|
||||
JS::GCPtr<ReadableStream> branch2;
|
||||
JS::GCPtr<PullAlgorithm> pull1_algorithm;
|
||||
JS::GCPtr<PullAlgorithm> pull2_algorithm;
|
||||
ReadableStreamReader reader;
|
||||
};
|
||||
|
||||
JS_DEFINE_ALLOCATOR(ByteStreamTeeParams);
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-request④
|
||||
class ByteStreamTeeDefaultReadRequest final : public ReadRequest {
|
||||
JS_CELL(ByteStreamTeeDefaultReadRequest, Cell);
|
||||
JS_DECLARE_ALLOCATOR(ByteStreamTeeDefaultReadRequest);
|
||||
|
||||
public:
|
||||
ByteStreamTeeDefaultReadRequest(
|
||||
JS::Realm& realm,
|
||||
JS::NonnullGCPtr<ReadableStream> stream,
|
||||
JS::NonnullGCPtr<ByteStreamTeeParams> params,
|
||||
JS::NonnullGCPtr<WebIDL::Promise> cancel_promise)
|
||||
: m_realm(realm)
|
||||
, m_stream(stream)
|
||||
, m_params(params)
|
||||
, m_cancel_promise(cancel_promise)
|
||||
{
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps④
|
||||
virtual void on_chunk(JS::Value chunk) override
|
||||
{
|
||||
// 1. Queue a microtask to perform the following steps:
|
||||
HTML::queue_a_microtask(nullptr, [this, chunk]() mutable {
|
||||
HTML::TemporaryExecutionContext execution_context { Bindings::host_defined_environment_settings_object(m_realm) };
|
||||
|
||||
auto controller1 = m_params->branch1->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
auto controller2 = m_params->branch2->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. Set readAgainForBranch1 to false.
|
||||
m_params->read_again_for_branch1 = false;
|
||||
|
||||
// 2. Set readAgainForBranch2 to false.
|
||||
m_params->read_again_for_branch2 = false;
|
||||
|
||||
// 3. Let chunk1 and chunk2 be chunk.
|
||||
auto chunk1 = chunk;
|
||||
auto chunk2 = chunk;
|
||||
|
||||
// 4. If canceled1 is false and canceled2 is false,
|
||||
if (!m_params->canceled1 && !m_params->canceled2) {
|
||||
// 1. Let cloneResult be CloneAsUint8Array(chunk).
|
||||
auto chunk_view = m_realm->vm().heap().allocate<WebIDL::ArrayBufferView>(m_realm, chunk.as_object());
|
||||
auto clone_result = clone_as_uint8_array(m_realm, chunk_view);
|
||||
|
||||
// 2. If cloneResult is an abrupt completion,
|
||||
if (clone_result.is_exception()) {
|
||||
auto completion = Bindings::dom_exception_to_throw_completion(m_realm->vm(), clone_result.release_error());
|
||||
|
||||
// 1. Perform ! ReadableByteStreamControllerError(branch1.[[controller]], cloneResult.[[Value]]).
|
||||
readable_byte_stream_controller_error(controller1, completion.value().value());
|
||||
|
||||
// 2. Perform ! ReadableByteStreamControllerError(branch2.[[controller]], cloneResult.[[Value]]).
|
||||
readable_byte_stream_controller_error(controller2, completion.value().value());
|
||||
|
||||
// 3. Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
|
||||
auto cancel_result = MUST(readable_stream_cancel(m_stream, completion.value().value()));
|
||||
JS::NonnullGCPtr cancel_value = verify_cast<JS::Promise>(*cancel_result->promise().ptr());
|
||||
|
||||
WebIDL::resolve_promise(m_realm, m_cancel_promise, cancel_value);
|
||||
|
||||
// 4. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. Otherwise, set chunk2 to cloneResult.[[Value]].
|
||||
chunk2 = clone_result.release_value();
|
||||
}
|
||||
|
||||
// 5. If canceled1 is false, perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1).
|
||||
if (!m_params->canceled1) {
|
||||
MUST(readable_byte_stream_controller_enqueue(controller1, chunk1));
|
||||
}
|
||||
|
||||
// 6. If canceled2 is false, perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2).
|
||||
if (!m_params->canceled2) {
|
||||
MUST(readable_byte_stream_controller_enqueue(controller2, chunk2));
|
||||
}
|
||||
|
||||
// 7. Set reading to false.
|
||||
m_params->reading = false;
|
||||
|
||||
// 8. If readAgainForBranch1 is true, perform pull1Algorithm.
|
||||
if (m_params->read_again_for_branch1) {
|
||||
MUST(m_params->pull1_algorithm->function()());
|
||||
}
|
||||
// 9. Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
|
||||
else if (m_params->read_again_for_branch2) {
|
||||
MUST(m_params->pull2_algorithm->function()());
|
||||
}
|
||||
});
|
||||
|
||||
// NOTE: The microtask delay here is necessary because it takes at least a microtask to detect errors, when we
|
||||
// use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we
|
||||
// cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-request-close-steps③
|
||||
virtual void on_close() override
|
||||
{
|
||||
auto controller1 = m_params->branch1->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
auto controller2 = m_params->branch2->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. Set reading to false.
|
||||
m_params->reading = false;
|
||||
|
||||
// 2. If canceled1 is false, perform ! ReadableByteStreamControllerClose(branch1.[[controller]]).
|
||||
if (!m_params->canceled1) {
|
||||
MUST(readable_byte_stream_controller_close(controller1));
|
||||
}
|
||||
|
||||
// 3. If canceled2 is false, perform ! ReadableByteStreamControllerClose(branch2.[[controller]]).
|
||||
if (!m_params->canceled2) {
|
||||
MUST(readable_byte_stream_controller_close(controller2));
|
||||
}
|
||||
|
||||
// 4. If branch1.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0).
|
||||
if (!controller1->pending_pull_intos().is_empty()) {
|
||||
MUST(readable_byte_stream_controller_respond(controller1, 0));
|
||||
}
|
||||
|
||||
// 5. If branch2.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0).
|
||||
if (!controller2->pending_pull_intos().is_empty()) {
|
||||
MUST(readable_byte_stream_controller_respond(controller2, 0));
|
||||
}
|
||||
|
||||
// 6. If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
|
||||
if (!m_params->canceled1 || !m_params->canceled2) {
|
||||
WebIDL::resolve_promise(m_realm, m_cancel_promise, JS::js_undefined());
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-request-error-steps④
|
||||
virtual void on_error(JS::Value) override
|
||||
{
|
||||
// 1. Set reading to false.
|
||||
m_params->reading = false;
|
||||
}
|
||||
|
||||
private:
|
||||
virtual void visit_edges(Visitor& visitor) override
|
||||
{
|
||||
Base::visit_edges(visitor);
|
||||
visitor.visit(m_realm);
|
||||
visitor.visit(m_stream);
|
||||
visitor.visit(m_params);
|
||||
visitor.visit(m_cancel_promise);
|
||||
}
|
||||
|
||||
JS::NonnullGCPtr<JS::Realm> m_realm;
|
||||
JS::NonnullGCPtr<ReadableStream> m_stream;
|
||||
JS::NonnullGCPtr<ByteStreamTeeParams> m_params;
|
||||
JS::NonnullGCPtr<WebIDL::Promise> m_cancel_promise;
|
||||
};
|
||||
|
||||
JS_DEFINE_ALLOCATOR(ByteStreamTeeDefaultReadRequest);
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-into-request②
|
||||
class ByteStreamTeeBYOBReadRequest final : public ReadIntoRequest {
|
||||
JS_CELL(ByteStreamTeeBYOBReadRequest, Cell);
|
||||
JS_DECLARE_ALLOCATOR(ByteStreamTeeBYOBReadRequest);
|
||||
|
||||
public:
|
||||
ByteStreamTeeBYOBReadRequest(
|
||||
JS::Realm& realm,
|
||||
JS::NonnullGCPtr<ReadableStream> stream,
|
||||
JS::NonnullGCPtr<ByteStreamTeeParams> params,
|
||||
JS::NonnullGCPtr<WebIDL::Promise> cancel_promise,
|
||||
JS::NonnullGCPtr<ReadableStream> byob_branch,
|
||||
JS::NonnullGCPtr<ReadableStream> other_branch,
|
||||
bool for_branch2)
|
||||
: m_realm(realm)
|
||||
, m_stream(stream)
|
||||
, m_params(params)
|
||||
, m_cancel_promise(cancel_promise)
|
||||
, m_byob_branch(byob_branch)
|
||||
, m_other_branch(other_branch)
|
||||
, m_for_branch2(for_branch2)
|
||||
{
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-into-request-chunk-steps①
|
||||
virtual void on_chunk(JS::Value chunk) override
|
||||
{
|
||||
auto chunk_view = m_realm->vm().heap().allocate<WebIDL::ArrayBufferView>(m_realm, chunk.as_object());
|
||||
|
||||
// 1. Queue a microtask to perform the following steps:
|
||||
HTML::queue_a_microtask(nullptr, [this, chunk = chunk_view]() {
|
||||
HTML::TemporaryExecutionContext execution_context { Bindings::host_defined_environment_settings_object(m_realm) };
|
||||
|
||||
auto byob_controller = m_byob_branch->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
auto other_controller = m_other_branch->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. Set readAgainForBranch1 to false.
|
||||
m_params->read_again_for_branch1 = false;
|
||||
|
||||
// 2. Set readAgainForBranch2 to false.
|
||||
m_params->read_again_for_branch2 = false;
|
||||
|
||||
// 3. Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
|
||||
auto byob_cancelled = m_for_branch2 ? m_params->canceled2 : m_params->canceled1;
|
||||
|
||||
// 4. Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
|
||||
auto other_cancelled = !m_for_branch2 ? m_params->canceled2 : m_params->canceled1;
|
||||
|
||||
// 5. If otherCanceled is false,
|
||||
if (!other_cancelled) {
|
||||
// 1. Let cloneResult be CloneAsUint8Array(chunk).
|
||||
auto clone_result = clone_as_uint8_array(m_realm, chunk);
|
||||
|
||||
// 2. If cloneResult is an abrupt completion,
|
||||
if (clone_result.is_exception()) {
|
||||
auto completion = Bindings::dom_exception_to_throw_completion(m_realm->vm(), clone_result.release_error());
|
||||
|
||||
// 1. Perform ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]]).
|
||||
readable_byte_stream_controller_error(byob_controller, completion.value().value());
|
||||
|
||||
// 2. Perform ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]]).
|
||||
readable_byte_stream_controller_error(other_controller, completion.value().value());
|
||||
|
||||
// 3. Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]).
|
||||
auto cancel_result = MUST(readable_stream_cancel(m_stream, completion.value().value()));
|
||||
JS::NonnullGCPtr cancel_value = verify_cast<JS::Promise>(*cancel_result->promise().ptr());
|
||||
|
||||
WebIDL::resolve_promise(m_realm, m_cancel_promise, cancel_value);
|
||||
|
||||
// 4. Return.
|
||||
return;
|
||||
}
|
||||
|
||||
// 3. Otherwise, let clonedChunk be cloneResult.[[Value]].
|
||||
auto cloned_chunk = clone_result.release_value();
|
||||
|
||||
// 4. If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
|
||||
if (!byob_cancelled) {
|
||||
MUST(readable_byte_stream_controller_respond_with_new_view(m_realm, byob_controller, chunk));
|
||||
}
|
||||
|
||||
// 5. Perform ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk).
|
||||
MUST(readable_byte_stream_controller_enqueue(other_controller, cloned_chunk));
|
||||
}
|
||||
// 6. Otherwise, if byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
|
||||
else if (!byob_cancelled) {
|
||||
MUST(readable_byte_stream_controller_respond_with_new_view(m_realm, byob_controller, chunk));
|
||||
}
|
||||
|
||||
// 7. Set reading to false.
|
||||
m_params->reading = false;
|
||||
|
||||
// 8. If readAgainForBranch1 is true, perform pull1Algorithm.
|
||||
if (m_params->read_again_for_branch1) {
|
||||
MUST(m_params->pull1_algorithm->function()());
|
||||
}
|
||||
// 9. Otherwise, if readAgainForBranch2 is true, perform pull2Algorithm.
|
||||
else if (m_params->read_again_for_branch2) {
|
||||
MUST(m_params->pull2_algorithm->function()());
|
||||
}
|
||||
});
|
||||
|
||||
// NOTE: The microtask delay here is necessary because it takes at least a microtask to detect errors, when we
|
||||
// use reader.[[closedPromise]] below. We want errors in stream to error both branches immediately, so we
|
||||
// cannot let successful synchronously-available reads happen ahead of asynchronously-available errors.
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-into-request-close-steps②
|
||||
virtual void on_close(JS::Value chunk) override
|
||||
{
|
||||
auto byob_controller = m_byob_branch->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
auto other_controller = m_other_branch->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. Set reading to false.
|
||||
m_params->reading = false;
|
||||
|
||||
// 2. Let byobCanceled be canceled2 if forBranch2 is true, and canceled1 otherwise.
|
||||
auto byob_cancelled = m_for_branch2 ? m_params->canceled2 : m_params->canceled1;
|
||||
|
||||
// 3. Let otherCanceled be canceled2 if forBranch2 is false, and canceled1 otherwise.
|
||||
auto other_cancelled = !m_for_branch2 ? m_params->canceled2 : m_params->canceled1;
|
||||
|
||||
// 4. If byobCanceled is false, perform ! ReadableByteStreamControllerClose(byobBranch.[[controller]]).
|
||||
if (!byob_cancelled) {
|
||||
MUST(readable_byte_stream_controller_close(byob_controller));
|
||||
}
|
||||
|
||||
// 5. If otherCanceled is false, perform ! ReadableByteStreamControllerClose(otherBranch.[[controller]]).
|
||||
if (!other_cancelled) {
|
||||
MUST(readable_byte_stream_controller_close(other_controller));
|
||||
}
|
||||
|
||||
// 6. If chunk is not undefined,
|
||||
if (!chunk.is_undefined()) {
|
||||
// 1. Assert: chunk.[[ByteLength]] is 0.
|
||||
|
||||
// 2. If byobCanceled is false, perform ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk).
|
||||
if (!byob_cancelled) {
|
||||
auto array_buffer_view = m_realm->vm().heap().allocate<WebIDL::ArrayBufferView>(m_realm, chunk.as_object());
|
||||
MUST(readable_byte_stream_controller_respond_with_new_view(m_realm, byob_controller, array_buffer_view));
|
||||
}
|
||||
|
||||
// 3. If otherCanceled is false and otherBranch.[[controller]].[[pendingPullIntos]] is not empty,
|
||||
// perform ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0).
|
||||
if (!other_cancelled && !other_controller->pending_pull_intos().is_empty()) {
|
||||
MUST(readable_byte_stream_controller_respond(other_controller, 0));
|
||||
}
|
||||
}
|
||||
|
||||
// 7. If byobCanceled is false or otherCanceled is false, resolve cancelPromise with undefined.
|
||||
if (!byob_cancelled || !other_cancelled) {
|
||||
WebIDL::resolve_promise(m_realm, m_cancel_promise, JS::js_undefined());
|
||||
}
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#ref-for-read-into-request-error-steps①
|
||||
virtual void on_error(JS::Value) override
|
||||
{
|
||||
// 1. Set reading to false.
|
||||
m_params->reading = false;
|
||||
}
|
||||
|
||||
private:
|
||||
virtual void visit_edges(Visitor& visitor) override
|
||||
{
|
||||
Base::visit_edges(visitor);
|
||||
visitor.visit(m_realm);
|
||||
visitor.visit(m_stream);
|
||||
visitor.visit(m_params);
|
||||
visitor.visit(m_cancel_promise);
|
||||
visitor.visit(m_byob_branch);
|
||||
visitor.visit(m_other_branch);
|
||||
}
|
||||
|
||||
JS::NonnullGCPtr<JS::Realm> m_realm;
|
||||
JS::NonnullGCPtr<ReadableStream> m_stream;
|
||||
JS::NonnullGCPtr<ByteStreamTeeParams> m_params;
|
||||
JS::NonnullGCPtr<WebIDL::Promise> m_cancel_promise;
|
||||
JS::NonnullGCPtr<ReadableStream> m_byob_branch;
|
||||
JS::NonnullGCPtr<ReadableStream> m_other_branch;
|
||||
bool m_for_branch2 { false };
|
||||
};
|
||||
|
||||
JS_DEFINE_ALLOCATOR(ByteStreamTeeBYOBReadRequest);
|
||||
|
||||
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
|
||||
WebIDL::ExceptionOr<ReadableStreamPair> readable_byte_stream_tee(JS::Realm& realm, ReadableStream& stream)
|
||||
{
|
||||
// 1. Assert: stream implements ReadableStream.
|
||||
// 2. Assert: stream.[[controller]] implements ReadableByteStreamController.
|
||||
VERIFY(stream.controller().has_value() && stream.controller()->has<JS::NonnullGCPtr<ReadableByteStreamController>>());
|
||||
|
||||
// 3. Let reader be ? AcquireReadableStreamDefaultReader(stream).
|
||||
auto reader = TRY(acquire_readable_stream_default_reader(stream));
|
||||
|
||||
// 4. Let reading be false.
|
||||
// 5. Let readAgainForBranch1 be false.
|
||||
// 6. Let readAgainForBranch2 be false.
|
||||
// 7. Let canceled1 be false.
|
||||
// 8. Let canceled2 be false.
|
||||
// 9. Let reason1 be undefined.
|
||||
// 10. Let reason2 be undefined.
|
||||
// 11. Let branch1 be undefined.
|
||||
// 12. Let branch2 be undefined.
|
||||
auto params = realm.heap().allocate<ByteStreamTeeParams>(realm, reader);
|
||||
|
||||
// 13. Let cancelPromise be a new promise.
|
||||
auto cancel_promise = WebIDL::create_promise(realm);
|
||||
|
||||
// 14. Let forwardReaderError be the following steps, taking a thisReader argument:
|
||||
auto forward_reader_error = JS::create_heap_function(realm.heap(), [&realm, params, cancel_promise](ReadableStreamReader const& this_reader) {
|
||||
// 1. Upon rejection of thisReader.[[closedPromise]] with reason r,
|
||||
auto closed_promise = this_reader.visit([](auto const& underlying_reader) { return underlying_reader->closed_promise_capability(); });
|
||||
|
||||
WebIDL::upon_rejection(*closed_promise, [&realm, this_reader, params, cancel_promise](auto reason) -> WebIDL::ExceptionOr<JS::Value> {
|
||||
auto controller1 = params->branch1->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
auto controller2 = params->branch2->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. If thisReader is not reader, return.
|
||||
if (this_reader != params->reader) {
|
||||
return JS::js_undefined();
|
||||
}
|
||||
|
||||
// 2. Perform ! ReadableByteStreamControllerError(branch1.[[controller]], r).
|
||||
readable_byte_stream_controller_error(controller1, reason);
|
||||
|
||||
// 3. Perform ! ReadableByteStreamControllerError(branch2.[[controller]], r).
|
||||
readable_byte_stream_controller_error(controller2, reason);
|
||||
|
||||
// 4. If canceled1 is false or canceled2 is false, resolve cancelPromise with undefined.
|
||||
if (!params->canceled1 || !params->canceled2) {
|
||||
WebIDL::resolve_promise(realm, cancel_promise, JS::js_undefined());
|
||||
}
|
||||
|
||||
return JS::js_undefined();
|
||||
});
|
||||
});
|
||||
|
||||
// 15. Let pullWithDefaultReader be the following steps:
|
||||
auto pull_with_default_reader = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise, forward_reader_error]() mutable {
|
||||
// 1. If reader implements ReadableStreamBYOBReader,
|
||||
if (auto const* byob_reader = params->reader.get_pointer<JS::NonnullGCPtr<ReadableStreamBYOBReader>>()) {
|
||||
// 1. Assert: reader.[[readIntoRequests]] is empty.
|
||||
VERIFY((*byob_reader)->read_into_requests().is_empty());
|
||||
|
||||
// 2. Perform ! ReadableStreamBYOBReaderRelease(reader).
|
||||
readable_stream_byob_reader_release(*byob_reader);
|
||||
|
||||
// 3. Set reader to ! AcquireReadableStreamDefaultReader(stream).
|
||||
params->reader = MUST(acquire_readable_stream_default_reader(stream));
|
||||
|
||||
// 4. Perform forwardReaderError, given reader.
|
||||
forward_reader_error->function()(params->reader);
|
||||
}
|
||||
|
||||
// 2. Let readRequest be a read request with the following items:
|
||||
auto read_request = realm.heap().allocate_without_realm<ByteStreamTeeDefaultReadRequest>(realm, stream, params, cancel_promise);
|
||||
|
||||
// 3. Perform ! ReadableStreamDefaultReaderRead(reader, readRequest).
|
||||
MUST(readable_stream_default_reader_read(params->reader.get<JS::NonnullGCPtr<ReadableStreamDefaultReader>>(), read_request));
|
||||
});
|
||||
|
||||
// 16. Let pullWithBYOBReader be the following steps, given view and forBranch2:
|
||||
auto pull_with_byob_reader = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise, forward_reader_error](JS::NonnullGCPtr<WebIDL::ArrayBufferView> view, bool for_branch2) mutable {
|
||||
// 1. If reader implements ReadableStreamDefaultReader,
|
||||
if (auto const* default_reader = params->reader.get_pointer<JS::NonnullGCPtr<ReadableStreamDefaultReader>>()) {
|
||||
// 2. Assert: reader.[[readRequests]] is empty.
|
||||
VERIFY((*default_reader)->read_requests().is_empty());
|
||||
|
||||
// 3. Perform ! ReadableStreamDefaultReaderRelease(reader).
|
||||
MUST(readable_stream_default_reader_release(*default_reader));
|
||||
|
||||
// 4. Set reader to ! AcquireReadableStreamBYOBReader(stream).
|
||||
params->reader = MUST(acquire_readable_stream_byob_reader(stream));
|
||||
|
||||
// 5. Perform forwardReaderError, given reader.
|
||||
forward_reader_error->function()(params->reader);
|
||||
};
|
||||
|
||||
// 2. Let byobBranch be branch2 if forBranch2 is true, and branch1 otherwise.
|
||||
auto byob_branch = for_branch2 ? params->branch2 : params->branch1;
|
||||
|
||||
// 3. Let otherBranch be branch2 if forBranch2 is false, and branch1 otherwise.
|
||||
auto other_branch = !for_branch2 ? params->branch2 : params->branch1;
|
||||
|
||||
// 4. Let readIntoRequest be a read-into request with the following items:
|
||||
auto read_into_request = realm.heap().allocate_without_realm<ByteStreamTeeBYOBReadRequest>(realm, stream, params, cancel_promise, *byob_branch, *other_branch, for_branch2);
|
||||
|
||||
// 5. Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
|
||||
readable_stream_byob_reader_read(params->reader.get<JS::NonnullGCPtr<ReadableStreamBYOBReader>>(), view, read_into_request);
|
||||
});
|
||||
|
||||
// 17. Let pull1Algorithm be the following steps:
|
||||
auto pull1_algorithm = JS::create_heap_function(realm.heap(), [&realm, params, pull_with_default_reader, pull_with_byob_reader]() -> WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> {
|
||||
auto controller1 = params->branch1->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. If reading is true,
|
||||
if (params->reading) {
|
||||
// 1. Set readAgainForBranch1 to true.
|
||||
params->read_again_for_branch1 = true;
|
||||
|
||||
// 2. Return a promise resolved with undefined.
|
||||
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
|
||||
}
|
||||
|
||||
// 2. Set reading to true.
|
||||
params->reading = true;
|
||||
|
||||
// 3. Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
|
||||
auto byob_request = readable_byte_stream_controller_get_byob_request(controller1);
|
||||
|
||||
// 4. If byobRequest is null, perform pullWithDefaultReader.
|
||||
if (!byob_request) {
|
||||
pull_with_default_reader->function()();
|
||||
}
|
||||
// 5. Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and false.
|
||||
else {
|
||||
pull_with_byob_reader->function()(*byob_request->view(), false);
|
||||
}
|
||||
|
||||
// 6. Return a promise resolved with undefined.
|
||||
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
|
||||
});
|
||||
|
||||
// 18. Let pull2Algorithm be the following steps:
|
||||
auto pull2_algorithm = JS::create_heap_function(realm.heap(), [&realm, params, pull_with_default_reader, pull_with_byob_reader]() -> WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> {
|
||||
auto controller2 = params->branch2->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>();
|
||||
|
||||
// 1. If reading is true,
|
||||
if (params->reading) {
|
||||
// 1. Set readAgainForBranch2 to true.
|
||||
params->read_again_for_branch2 = true;
|
||||
|
||||
// 2. Return a promise resolved with undefined.
|
||||
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
|
||||
}
|
||||
|
||||
// 2. Set reading to true.
|
||||
params->reading = true;
|
||||
|
||||
// 3. Let byobRequest be ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]]).
|
||||
auto byob_request = readable_byte_stream_controller_get_byob_request(controller2);
|
||||
|
||||
// 4. If byobRequest is null, perform pullWithDefaultReader.
|
||||
if (!byob_request) {
|
||||
pull_with_default_reader->function()();
|
||||
}
|
||||
// 5. Otherwise, perform pullWithBYOBReader, given byobRequest.[[view]] and true.
|
||||
else {
|
||||
pull_with_byob_reader->function()(*byob_request->view(), true);
|
||||
}
|
||||
|
||||
// 6. Return a promise resolved with undefined.
|
||||
return WebIDL::create_resolved_promise(realm, JS::js_undefined());
|
||||
});
|
||||
|
||||
// AD-HOC: The read requests within the pull algorithms must be able to re-invoke the pull algorithms, so cache them here.
|
||||
params->pull1_algorithm = pull1_algorithm;
|
||||
params->pull2_algorithm = pull2_algorithm;
|
||||
|
||||
// 19. Let cancel1Algorithm be the following steps, taking a reason argument:
|
||||
auto cancel1_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise](JS::Value reason) -> WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> {
|
||||
// 1. Set canceled1 to true.
|
||||
params->canceled1 = true;
|
||||
|
||||
// 2. Set reason1 to reason.
|
||||
params->reason1 = reason;
|
||||
|
||||
// 3. If canceled2 is true,
|
||||
if (params->canceled2) {
|
||||
// 1. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
|
||||
auto composite_reason = JS::Array::create_from(realm, AK::Array { params->reason1, params->reason2 });
|
||||
|
||||
// 2. Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
|
||||
auto cancel_result = MUST(readable_stream_cancel(stream, composite_reason));
|
||||
|
||||
// 3. Resolve cancelPromise with cancelResult.
|
||||
JS::NonnullGCPtr cancel_value = verify_cast<JS::Promise>(*cancel_result->promise().ptr());
|
||||
WebIDL::resolve_promise(realm, cancel_promise, cancel_value);
|
||||
}
|
||||
|
||||
// 4. Return cancelPromise.
|
||||
return cancel_promise;
|
||||
});
|
||||
|
||||
// 20. Let cancel2Algorithm be the following steps, taking a reason argument:
|
||||
auto cancel2_algorithm = JS::create_heap_function(realm.heap(), [&realm, &stream, params, cancel_promise](JS::Value reason) -> WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> {
|
||||
// 1. Set canceled2 to true.
|
||||
params->canceled2 = true;
|
||||
|
||||
// 2. Set reason2 to reason.
|
||||
params->reason2 = reason;
|
||||
|
||||
// 3. If canceled1 is true,
|
||||
if (params->canceled1) {
|
||||
// 1. Let compositeReason be ! CreateArrayFromList(« reason1, reason2 »).
|
||||
auto composite_reason = JS::Array::create_from(realm, AK::Array { params->reason1, params->reason2 });
|
||||
|
||||
// 2. Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
|
||||
auto cancel_result = MUST(readable_stream_cancel(stream, composite_reason));
|
||||
|
||||
// 3. Resolve cancelPromise with cancelResult.
|
||||
JS::NonnullGCPtr cancel_value = verify_cast<JS::Promise>(*cancel_result->promise().ptr());
|
||||
WebIDL::resolve_promise(realm, cancel_promise, cancel_value);
|
||||
}
|
||||
|
||||
// 4. Return cancelPromise.
|
||||
return cancel_promise;
|
||||
});
|
||||
|
||||
// 21. Let startAlgorithm be an algorithm that returns undefined.
|
||||
auto start_algorithm = JS::create_heap_function(realm.heap(), []() -> WebIDL::ExceptionOr<JS::Value> {
|
||||
return JS::js_undefined();
|
||||
});
|
||||
|
||||
// 22. Set branch1 to ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm).
|
||||
params->branch1 = MUST(create_readable_byte_stream(realm, start_algorithm, pull1_algorithm, cancel1_algorithm));
|
||||
|
||||
// 23. Set branch2 to ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm).
|
||||
params->branch2 = MUST(create_readable_byte_stream(realm, start_algorithm, pull2_algorithm, cancel2_algorithm));
|
||||
|
||||
// 24. Perform forwardReaderError, given reader.
|
||||
forward_reader_error->function()(reader);
|
||||
|
||||
// 25. Return « branch1, branch2 ».
|
||||
return ReadableStreamPair { *params->branch1, *params->branch2 };
|
||||
}
|
||||
|
||||
// https://streams.spec.whatwg.org/#make-size-algorithm-from-size-function
|
||||
JS::NonnullGCPtr<SizeAlgorithm> extract_size_algorithm(JS::VM& vm, QueuingStrategy const& strategy)
|
||||
{
|
||||
|
|
|
@ -49,6 +49,7 @@ bool readable_stream_has_default_reader(ReadableStream const&);
|
|||
|
||||
WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_tee(JS::Realm&, ReadableStream&, bool clone_for_branch2);
|
||||
WebIDL::ExceptionOr<ReadableStreamPair> readable_stream_default_tee(JS::Realm& realm, ReadableStream& stream, bool clone_for_branch2);
|
||||
WebIDL::ExceptionOr<ReadableStreamPair> readable_byte_stream_tee(JS::Realm& realm, ReadableStream& stream);
|
||||
|
||||
WebIDL::ExceptionOr<JS::NonnullGCPtr<WebIDL::Promise>> readable_stream_reader_generic_cancel(ReadableStreamGenericReaderMixin&, JS::Value reason);
|
||||
void readable_stream_reader_generic_initialize(ReadableStreamReader, ReadableStream&);
|
||||
|
|
Loading…
Reference in a new issue