ReadableStream.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. /*
  2. * Copyright (c) 2022, Linus Groh <linusg@serenityos.org>
  3. * Copyright (c) 2023-2024, Shannon Booth <shannon@serenityos.org>
  4. * Copyright (c) 2024, Kenneth Myhra <kennethmyhra@serenityos.org>
  5. *
  6. * SPDX-License-Identifier: BSD-2-Clause
  7. */
  8. #include <LibJS/Runtime/PromiseCapability.h>
  9. #include <LibJS/Runtime/TypedArray.h>
  10. #include <LibWeb/Bindings/Intrinsics.h>
  11. #include <LibWeb/Bindings/ReadableStreamPrototype.h>
  12. #include <LibWeb/DOM/AbortSignal.h>
  13. #include <LibWeb/Streams/AbstractOperations.h>
  14. #include <LibWeb/Streams/ReadableByteStreamController.h>
  15. #include <LibWeb/Streams/ReadableStream.h>
  16. #include <LibWeb/Streams/ReadableStreamBYOBReader.h>
  17. #include <LibWeb/Streams/ReadableStreamDefaultController.h>
  18. #include <LibWeb/Streams/ReadableStreamDefaultReader.h>
  19. #include <LibWeb/Streams/UnderlyingSource.h>
  20. #include <LibWeb/WebIDL/ExceptionOr.h>
  21. namespace Web::Streams {
  22. GC_DEFINE_ALLOCATOR(ReadableStream);
  23. // https://streams.spec.whatwg.org/#rs-constructor
  24. WebIDL::ExceptionOr<GC::Ref<ReadableStream>> ReadableStream::construct_impl(JS::Realm& realm, Optional<GC::Root<JS::Object>> const& underlying_source_object, QueuingStrategy const& strategy)
  25. {
  26. auto& vm = realm.vm();
  27. auto readable_stream = realm.create<ReadableStream>(realm);
  28. // 1. If underlyingSource is missing, set it to null.
  29. auto underlying_source = underlying_source_object.has_value() ? JS::Value(underlying_source_object.value()) : JS::js_null();
  30. // 2. Let underlyingSourceDict be underlyingSource, converted to an IDL value of type UnderlyingSource.
  31. auto underlying_source_dict = TRY(UnderlyingSource::from_value(vm, underlying_source));
  32. // 3. Perform ! InitializeReadableStream(this).
  33. // 4. If underlyingSourceDict["type"] is "bytes":
  34. if (underlying_source_dict.type.has_value() && underlying_source_dict.type.value() == ReadableStreamType::Bytes) {
  35. // 1. If strategy["size"] exists, throw a RangeError exception.
  36. if (strategy.size)
  37. return WebIDL::SimpleException { WebIDL::SimpleExceptionType::RangeError, "Size strategy not allowed for byte stream"sv };
  38. // 2. Let highWaterMark be ? ExtractHighWaterMark(strategy, 0).
  39. auto high_water_mark = TRY(extract_high_water_mark(strategy, 0));
  40. // 3. Perform ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark).
  41. TRY(set_up_readable_byte_stream_controller_from_underlying_source(*readable_stream, underlying_source, underlying_source_dict, high_water_mark));
  42. }
  43. // 5. Otherwise,
  44. else {
  45. // 1. Assert: underlyingSourceDict["type"] does not exist.
  46. VERIFY(!underlying_source_dict.type.has_value());
  47. // 2. Let sizeAlgorithm be ! ExtractSizeAlgorithm(strategy).
  48. auto size_algorithm = extract_size_algorithm(vm, strategy);
  49. // 3. Let highWaterMark be ? ExtractHighWaterMark(strategy, 1).
  50. auto high_water_mark = TRY(extract_high_water_mark(strategy, 1));
  51. // 4. Perform ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm).
  52. TRY(set_up_readable_stream_default_controller_from_underlying_source(*readable_stream, underlying_source, underlying_source_dict, high_water_mark, size_algorithm));
  53. }
  54. return readable_stream;
  55. }
  56. // https://streams.spec.whatwg.org/#rs-from
  57. WebIDL::ExceptionOr<GC::Ref<ReadableStream>> ReadableStream::from(JS::VM& vm, JS::Value async_iterable)
  58. {
  59. // 1. Return ? ReadableStreamFromIterable(asyncIterable).
  60. return TRY(readable_stream_from_iterable(vm, async_iterable));
  61. }
  62. ReadableStream::ReadableStream(JS::Realm& realm)
  63. : PlatformObject(realm)
  64. {
  65. }
  66. ReadableStream::~ReadableStream() = default;
  67. // https://streams.spec.whatwg.org/#rs-locked
  68. bool ReadableStream::locked() const
  69. {
  70. // 1. Return ! IsReadableStreamLocked(this).
  71. return is_readable_stream_locked(*this);
  72. }
  73. // https://streams.spec.whatwg.org/#rs-cancel
  74. GC::Ref<WebIDL::Promise> ReadableStream::cancel(JS::Value reason)
  75. {
  76. auto& realm = this->realm();
  77. // 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
  78. if (is_readable_stream_locked(*this)) {
  79. auto exception = JS::TypeError::create(realm, "Cannot cancel a locked stream"sv);
  80. return WebIDL::create_rejected_promise(realm, exception);
  81. }
  82. // 2. Return ! ReadableStreamCancel(this, reason).
  83. return readable_stream_cancel(*this, reason);
  84. }
  85. // https://streams.spec.whatwg.org/#rs-get-reader
  86. WebIDL::ExceptionOr<ReadableStreamReader> ReadableStream::get_reader(ReadableStreamGetReaderOptions const& options)
  87. {
  88. // 1. If options["mode"] does not exist, return ? AcquireReadableStreamDefaultReader(this).
  89. if (!options.mode.has_value())
  90. return ReadableStreamReader { TRY(acquire_readable_stream_default_reader(*this)) };
  91. // 2. Assert: options["mode"] is "byob".
  92. VERIFY(*options.mode == Bindings::ReadableStreamReaderMode::Byob);
  93. // 3. Return ? AcquireReadableStreamBYOBReader(this).
  94. return ReadableStreamReader { TRY(acquire_readable_stream_byob_reader(*this)) };
  95. }
  96. // https://streams.spec.whatwg.org/#rs-pipe-through
  97. WebIDL::ExceptionOr<GC::Ref<ReadableStream>> ReadableStream::pipe_through(ReadableWritablePair transform, StreamPipeOptions const& options)
  98. {
  99. // 1. If ! IsReadableStreamLocked(this) is true, throw a TypeError exception.
  100. if (is_readable_stream_locked(*this))
  101. return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Failed to execute 'pipeThrough' on 'ReadableStream': Cannot pipe a locked stream"sv };
  102. // 2. If ! IsWritableStreamLocked(transform["writable"]) is true, throw a TypeError exception.
  103. if (is_writable_stream_locked(*transform.writable))
  104. return WebIDL::SimpleException { WebIDL::SimpleExceptionType::TypeError, "Failed to execute 'pipeThrough' on 'ReadableStream': parameter 1's 'writable' is locked"sv };
  105. // 3. Let signal be options["signal"] if it exists, or undefined otherwise.
  106. auto signal = options.signal ? JS::Value(options.signal) : JS::js_undefined();
  107. // 4. Let promise be ! ReadableStreamPipeTo(this, transform["writable"], options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
  108. auto promise = readable_stream_pipe_to(*this, *transform.writable, options.prevent_close, options.prevent_abort, options.prevent_cancel, signal);
  109. // 5. Set promise.[[PromiseIsHandled]] to true.
  110. WebIDL::mark_promise_as_handled(*promise);
  111. // 6. Return transform["readable"].
  112. return GC::Ref { *transform.readable };
  113. }
  114. // https://streams.spec.whatwg.org/#rs-pipe-to
  115. GC::Ref<WebIDL::Promise> ReadableStream::pipe_to(WritableStream& destination, StreamPipeOptions const& options)
  116. {
  117. auto& realm = this->realm();
  118. auto& vm = realm.vm();
  119. // 1. If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
  120. if (is_readable_stream_locked(*this)) {
  121. return WebIDL::create_rejected_promise_from_exception(realm, vm.throw_completion<JS::TypeError>("Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe a locked stream"sv));
  122. }
  123. // 2. If ! IsWritableStreamLocked(destination) is true, return a promise rejected with a TypeError exception.
  124. if (is_writable_stream_locked(destination)) {
  125. return WebIDL::create_rejected_promise_from_exception(realm, vm.throw_completion<JS::TypeError>("Failed to execute 'pipeTo' on 'ReadableStream': Cannot pipe to a locked stream"sv));
  126. }
  127. // 3. Let signal be options["signal"] if it exists, or undefined otherwise.
  128. auto signal = options.signal ? JS::Value(options.signal) : JS::js_undefined();
  129. // 4. Return ! ReadableStreamPipeTo(this, destination, options["preventClose"], options["preventAbort"], options["preventCancel"], signal).
  130. return readable_stream_pipe_to(*this, destination, options.prevent_close, options.prevent_abort, options.prevent_cancel, signal);
  131. }
  132. // https://streams.spec.whatwg.org/#readablestream-tee
  133. WebIDL::ExceptionOr<ReadableStreamPair> ReadableStream::tee()
  134. {
  135. // To tee a ReadableStream stream, return ? ReadableStreamTee(stream, true).
  136. return TRY(readable_stream_tee(realm(), *this, true));
  137. }
  138. // https://streams.spec.whatwg.org/#readablestream-close
  139. void ReadableStream::close()
  140. {
  141. controller()->visit(
  142. // 1. If stream.[[controller]] implements ReadableByteStreamController
  143. [&](GC::Ref<ReadableByteStreamController> controller) {
  144. // 1. Perform ! ReadableByteStreamControllerClose(stream.[[controller]]).
  145. MUST(readable_byte_stream_controller_close(controller));
  146. // 2. If stream.[[controller]].[[pendingPullIntos]] is not empty, perform ! ReadableByteStreamControllerRespond(stream.[[controller]], 0).
  147. if (!controller->pending_pull_intos().is_empty())
  148. MUST(readable_byte_stream_controller_respond(controller, 0));
  149. },
  150. // 2. Otherwise, perform ! ReadableStreamDefaultControllerClose(stream.[[controller]]).
  151. [&](GC::Ref<ReadableStreamDefaultController> controller) {
  152. readable_stream_default_controller_close(*controller);
  153. });
  154. }
  155. // https://streams.spec.whatwg.org/#readablestream-error
  156. void ReadableStream::error(JS::Value error)
  157. {
  158. controller()->visit(
  159. // 1. If stream.[[controller]] implements ReadableByteStreamController, then perform
  160. // ! ReadableByteStreamControllerError(stream.[[controller]], e).
  161. [&](GC::Ref<ReadableByteStreamController> controller) {
  162. readable_byte_stream_controller_error(controller, error);
  163. },
  164. // 2. Otherwise, perform ! ReadableStreamDefaultControllerError(stream.[[controller]], e).
  165. [&](GC::Ref<ReadableStreamDefaultController> controller) {
  166. readable_stream_default_controller_error(controller, error);
  167. });
  168. }
  169. void ReadableStream::initialize(JS::Realm& realm)
  170. {
  171. Base::initialize(realm);
  172. WEB_SET_PROTOTYPE_FOR_INTERFACE(ReadableStream);
  173. }
  174. void ReadableStream::visit_edges(Cell::Visitor& visitor)
  175. {
  176. Base::visit_edges(visitor);
  177. if (m_controller.has_value())
  178. m_controller->visit([&](auto& controller) { visitor.visit(controller); });
  179. visitor.visit(m_stored_error);
  180. if (m_reader.has_value())
  181. m_reader->visit([&](auto& reader) { visitor.visit(reader); });
  182. }
  183. // https://streams.spec.whatwg.org/#readablestream-locked
  184. bool ReadableStream::is_readable() const
  185. {
  186. // A ReadableStream stream is readable if stream.[[state]] is "readable".
  187. return m_state == State::Readable;
  188. }
  189. // https://streams.spec.whatwg.org/#readablestream-closed
  190. bool ReadableStream::is_closed() const
  191. {
  192. // A ReadableStream stream is closed if stream.[[state]] is "closed".
  193. return m_state == State::Closed;
  194. }
  195. // https://streams.spec.whatwg.org/#readablestream-errored
  196. bool ReadableStream::is_errored() const
  197. {
  198. // A ReadableStream stream is errored if stream.[[state]] is "errored".
  199. return m_state == State::Errored;
  200. }
  201. // https://streams.spec.whatwg.org/#readablestream-locked
  202. bool ReadableStream::is_locked() const
  203. {
  204. // A ReadableStream stream is locked if ! IsReadableStreamLocked(stream) returns true.
  205. return is_readable_stream_locked(*this);
  206. }
  207. // https://streams.spec.whatwg.org/#is-readable-stream-disturbed
  208. bool ReadableStream::is_disturbed() const
  209. {
  210. // A ReadableStream stream is disturbed if stream.[[disturbed]] is true.
  211. return m_disturbed;
  212. }
  213. // https://streams.spec.whatwg.org/#readablestream-pull-from-bytes
  214. WebIDL::ExceptionOr<void> ReadableStream::pull_from_bytes(ByteBuffer bytes)
  215. {
  216. auto& realm = this->realm();
  217. // 1. Assert: stream.[[controller]] implements ReadableByteStreamController.
  218. auto& controller = this->controller()->get<GC::Ref<ReadableByteStreamController>>();
  219. // 2. Let available be bytes’s length.
  220. auto available = bytes.size();
  221. // 3. Let desiredSize be available.
  222. auto desired_size = available;
  223. // FIXME: 4. If stream’s current BYOB request view is non-null, then set desiredSize to stream’s current BYOB request
  224. // view's byte length.
  225. // 5. Let pullSize be the smaller value of available and desiredSize.
  226. auto pull_size = min(available, desired_size);
  227. // 6. Let pulled be the first pullSize bytes of bytes.
  228. auto pulled = pull_size == available ? move(bytes) : MUST(bytes.slice(0, pull_size));
  229. // 7. Remove the first pullSize bytes from bytes.
  230. if (pull_size != available)
  231. bytes = MUST(bytes.slice(pull_size, available - pull_size));
  232. // FIXME: 8. If stream’s current BYOB request view is non-null, then:
  233. // 1. Write pulled into stream’s current BYOB request view.
  234. // 2. Perform ? ReadableByteStreamControllerRespond(stream.[[controller]], pullSize).
  235. // 9. Otherwise,
  236. {
  237. // 1. Set view to the result of creating a Uint8Array from pulled in stream’s relevant Realm.
  238. auto array_buffer = JS::ArrayBuffer::create(realm, move(pulled));
  239. auto view = JS::Uint8Array::create(realm, array_buffer->byte_length(), *array_buffer);
  240. // 2. Perform ? ReadableByteStreamControllerEnqueue(stream.[[controller]], view).
  241. TRY(readable_byte_stream_controller_enqueue(controller, view));
  242. }
  243. return {};
  244. }
  245. // https://streams.spec.whatwg.org/#readablestream-enqueue
  246. WebIDL::ExceptionOr<void> ReadableStream::enqueue(JS::Value chunk)
  247. {
  248. VERIFY(m_controller.has_value());
  249. // 1. If stream.[[controller]] implements ReadableStreamDefaultController,
  250. if (m_controller->has<GC::Ref<ReadableStreamDefaultController>>()) {
  251. // 1. Perform ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk).
  252. return readable_stream_default_controller_enqueue(m_controller->get<GC::Ref<ReadableStreamDefaultController>>(), chunk);
  253. }
  254. // 2. Otherwise,
  255. else {
  256. // 1. Assert: stream.[[controller]] implements ReadableByteStreamController.
  257. VERIFY(m_controller->has<GC::Ref<ReadableByteStreamController>>());
  258. auto readable_byte_controller = m_controller->get<GC::Ref<ReadableByteStreamController>>();
  259. // FIXME: 2. Assert: chunk is an ArrayBufferView.
  260. // 3. Let byobView be the current BYOB request view for stream.
  261. // FIXME: This is not what the spec means by 'current BYOB request view'
  262. auto byob_view = readable_byte_controller->raw_byob_request();
  263. // 4. If byobView is non-null, and chunk.[[ViewedArrayBuffer]] is byobView.[[ViewedArrayBuffer]], then:
  264. if (byob_view) {
  265. // FIXME: 1. Assert: chunk.[[ByteOffset]] is byobView.[[ByteOffset]].
  266. // FIXME: 2. Assert: chunk.[[ByteLength]] ≤ byobView.[[ByteLength]].
  267. // FIXME: 3. Perform ? ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]]).
  268. TODO();
  269. }
  270. // 5. Otherwise, perform ? ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk).
  271. return readable_byte_stream_controller_enqueue(readable_byte_controller, chunk);
  272. }
  273. }
  274. // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
  275. void ReadableStream::set_up_with_byte_reading_support(GC::Ptr<PullAlgorithm> pull_algorithm, GC::Ptr<CancelAlgorithm> cancel_algorithm, double high_water_mark)
  276. {
  277. auto& realm = this->realm();
  278. // 1. Let startAlgorithm be an algorithm that returns undefined.
  279. auto start_algorithm = GC::create_function(realm.heap(), []() -> WebIDL::ExceptionOr<JS::Value> { return JS::js_undefined(); });
  280. // 2. Let pullAlgorithmWrapper be an algorithm that runs these steps:
  281. auto pull_algorithm_wrapper = GC::create_function(realm.heap(), [&realm, pull_algorithm]() {
  282. // 1. Let result be the result of running pullAlgorithm, if pullAlgorithm was given, or null otherwise. If this throws an exception e, return a promise rejected with e.
  283. GC::Ptr<JS::PromiseCapability> result = nullptr;
  284. if (pull_algorithm)
  285. result = pull_algorithm->function()();
  286. // 2. If result is a Promise, then return result.
  287. if (result != nullptr)
  288. return GC::Ref(*result);
  289. // 3. Return a promise resolved with undefined.
  290. return WebIDL::create_resolved_promise(realm, JS::js_undefined());
  291. });
  292. // 3. Let cancelAlgorithmWrapper be an algorithm that runs these steps:
  293. auto cancel_algorithm_wrapper = GC::create_function(realm.heap(), [&realm, cancel_algorithm](JS::Value c) {
  294. // 1. Let result be the result of running cancelAlgorithm, if cancelAlgorithm was given, or null otherwise. If this throws an exception e, return a promise rejected with e.
  295. GC::Ptr<JS::PromiseCapability> result = nullptr;
  296. if (cancel_algorithm)
  297. result = cancel_algorithm->function()(c);
  298. // 2. If result is a Promise, then return result.
  299. if (result != nullptr)
  300. return GC::Ref(*result);
  301. // 3. Return a promise resolved with undefined.
  302. return WebIDL::create_resolved_promise(realm, JS::js_undefined());
  303. });
  304. // 4. Perform ! InitializeReadableStream(stream).
  305. // 5. Let controller be a new ReadableByteStreamController.
  306. auto controller = realm.create<ReadableByteStreamController>(realm);
  307. // 6. Perform ! SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, undefined).
  308. MUST(set_up_readable_byte_stream_controller(*this, controller, start_algorithm, pull_algorithm_wrapper, cancel_algorithm_wrapper, high_water_mark, JS::js_undefined()));
  309. }
  310. // https://streams.spec.whatwg.org/#readablestream-pipe-through
  311. GC::Ref<WebIDL::Promise> ReadableStream::piped_through(GC::Ref<WritableStream> writable, bool prevent_close, bool prevent_abort, bool prevent_cancel, JS::Value signal)
  312. {
  313. // 1. Assert: ! IsReadableStreamLocked(readable) is false.
  314. VERIFY(!is_readable_stream_locked(*this));
  315. // 2. Assert: ! IsWritableStreamLocked(writable) is false.
  316. VERIFY(!is_writable_stream_locked(writable));
  317. // 3. Let signalArg be signal if signal was given, or undefined otherwise.
  318. // NOTE: Done by default arguments.
  319. // 4. Return ! ReadableStreamPipeTo(readable, writable, preventClose, preventAbort, preventCancel, signalArg).
  320. return readable_stream_pipe_to(*this, writable, prevent_close, prevent_abort, prevent_cancel, signal);
  321. }
  322. }