LibWeb: Post all MessagePort messages over their LocalSockets

This is to allow future changes to do cross-process MessagePorts in an
implementation-agnostic way. Add some tests for this behavior.

Delivering messages that were posted to a MessagePort just before it was
transferred is not yet implemented still.
This commit is contained in:
Andrew Kaster 2023-12-19 15:28:56 -07:00 committed by Andrew Kaster
parent 6e3b816763
commit c0f50b12a4
Notes: sideshowbarker 2024-07-16 22:11:09 +09:00
7 changed files with 301 additions and 52 deletions

View file

@ -0,0 +1,6 @@
Port1: "Hello"
Port1: {"foo":{}}
Port1: "DONE"
Port2: "Hello"
Port3: "Hello from the transferred port"
Port2: "DONE"

View file

@ -0,0 +1,32 @@
<script src="../include.js"></script>
<script>
asyncTest(done => {
let channel = new MessageChannel();
channel.port1.onmessage = (event) => {
println("Port1: " + JSON.stringify(event.data));
if (event.ports.length > 0) {
event.ports[0].postMessage("Hello from the transferred port");
return;
}
channel.port1.postMessage(event.data);
};
channel.port2.onmessage = (event) => {
println("Port2: " + JSON.stringify(event.data));
if (event.data === "DONE") {
done();
}
};
let channel2 = new MessageChannel();
channel2.port2.onmessage = (event) => {
println("Port3: " + JSON.stringify(event.data))
}
channel.port2.postMessage("Hello");
channel.port2.postMessage({ foo: channel2.port1 }, { transfer: [channel2.port1] });
channel.port2.postMessage("DONE");
});
</script>

View file

@ -1,18 +1,20 @@
/*
* Copyright (c) 2021, Andreas Kling <kling@serenityos.org>
* Copyright (c) 2023, Andrew Kaster <akaster@serenityos.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include <AK/MemoryStream.h>
#include <LibCore/Socket.h>
#include <LibCore/System.h>
#include <LibIPC/Decoder.h>
#include <LibIPC/Encoder.h>
#include <LibIPC/File.h>
#include <LibWeb/Bindings/ExceptionOrUtils.h>
#include <LibWeb/Bindings/Intrinsics.h>
#include <LibWeb/Bindings/MessagePortPrototype.h>
#include <LibWeb/DOM/EventDispatcher.h>
#include <LibWeb/HTML/EventHandler.h>
#include <LibWeb/HTML/EventLoop/EventLoop.h>
#include <LibWeb/HTML/EventNames.h>
#include <LibWeb/HTML/MessageEvent.h>
#include <LibWeb/HTML/MessagePort.h>
@ -34,7 +36,10 @@ MessagePort::MessagePort(JS::Realm& realm)
{
}
MessagePort::~MessagePort() = default;
MessagePort::~MessagePort()
{
disentangle();
}
void MessagePort::initialize(JS::Realm& realm)
{
@ -67,6 +72,11 @@ WebIDL::ExceptionOr<void> MessagePort::transfer_steps(HTML::TransferDataHolder&
m_socket = nullptr;
data_holder.fds.append(fd);
data_holder.data.append(IPC_FILE_TAG);
auto fd_passing_socket = MUST(m_fd_passing_socket->release_fd());
m_fd_passing_socket = nullptr;
data_holder.fds.append(fd_passing_socket);
data_holder.data.append(IPC_FILE_TAG);
}
// 4. Otherwise, set dataHolder.[[RemotePort]] to null.
@ -91,7 +101,12 @@ WebIDL::ExceptionOr<void> MessagePort::transfer_receiving_steps(HTML::TransferDa
auto fd_tag = data_holder.data.take_first();
if (fd_tag == IPC_FILE_TAG) {
auto fd = data_holder.fds.take_first();
m_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd()));
m_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd(), Core::LocalSocket::PreventSIGPIPE::Yes));
fd_tag = data_holder.data.take_first();
VERIFY(fd_tag == IPC_FILE_TAG);
fd = data_holder.fds.take_first();
m_fd_passing_socket = MUST(Core::LocalSocket::adopt_fd(fd.take_fd(), Core::LocalSocket::PreventSIGPIPE::Yes));
} else if (fd_tag != 0) {
dbgln("Unexpected byte {:x} in MessagePort transfer data", fd_tag);
VERIFY_NOT_REACHED();
@ -102,10 +117,12 @@ WebIDL::ExceptionOr<void> MessagePort::transfer_receiving_steps(HTML::TransferDa
void MessagePort::disentangle()
{
m_remote_port->m_remote_port = nullptr;
if (m_remote_port)
m_remote_port->m_remote_port = nullptr;
m_remote_port = nullptr;
m_socket = nullptr;
m_fd_passing_socket = nullptr;
}
// https://html.spec.whatwg.org/multipage/web-messaging.html#entangle
@ -125,17 +142,34 @@ void MessagePort::entangle_with(MessagePort& remote_port)
remote_port.m_remote_port = this;
m_remote_port = &remote_port;
int fds[2] = {};
MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds));
auto socket0 = MUST(Core::LocalSocket::adopt_fd(fds[0]));
MUST(socket0->set_blocking(false));
MUST(socket0->set_close_on_exec(true));
auto socket1 = MUST(Core::LocalSocket::adopt_fd(fds[1]));
MUST(socket1->set_blocking(false));
MUST(socket1->set_close_on_exec(true));
auto create_paired_sockets = []() -> Array<NonnullOwnPtr<Core::LocalSocket>, 2> {
int fds[2] = {};
MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds));
auto socket0 = MUST(Core::LocalSocket::adopt_fd(fds[0], Core::LocalSocket::PreventSIGPIPE::Yes));
MUST(socket0->set_blocking(false));
MUST(socket0->set_close_on_exec(true));
auto socket1 = MUST(Core::LocalSocket::adopt_fd(fds[1], Core::LocalSocket::PreventSIGPIPE::Yes));
MUST(socket1->set_blocking(false));
MUST(socket1->set_close_on_exec(true));
m_socket = move(socket0);
m_remote_port->m_socket = move(socket1);
return Array { move(socket0), move(socket1) };
};
auto sockets = create_paired_sockets();
m_socket = move(sockets[0]);
m_remote_port->m_socket = move(sockets[1]);
m_socket->on_ready_to_read = [strong_this = JS::make_handle(this)]() {
strong_this->read_from_socket();
};
m_remote_port->m_socket->on_ready_to_read = [remote_port = JS::make_handle(m_remote_port)]() {
remote_port->read_from_socket();
};
auto fd_sockets = create_paired_sockets();
m_fd_passing_socket = move(fd_sockets[0]);
m_remote_port->m_fd_passing_socket = move(fd_sockets[1]);
}
// https://html.spec.whatwg.org/multipage/web-messaging.html#dom-messageport-postmessage-options
@ -193,58 +227,167 @@ WebIDL::ExceptionOr<void> MessagePort::message_port_post_message_steps(JS::GCPtr
auto serialize_with_transfer_result = TRY(structured_serialize_with_transfer(vm, message, transfer));
// 6. If targetPort is null, or if doomed is true, then return.
if (!target_port || doomed)
// IMPLEMENTATION DEFINED: Actually check the socket here, not the target port.
// If there's no target message port in the same realm, we still want to send the message over IPC
if (!m_socket || doomed) {
return {};
}
// FIXME: 7. Add a task that runs the following steps to the port message queue of targetPort:
// FIXME: Implement this using the port message queue/unshipped port message queue concept
main_thread_event_loop().task_queue().add(HTML::Task::create(HTML::Task::Source::PostedMessage, nullptr, [target_port, serialize_with_transfer_result = move(serialize_with_transfer_result)]() mutable {
// FIXME: 1. Let finalTargetPort be the MessagePort in whose port message queue the task now finds itself.
// FIXME: NOTE: This can be different from targetPort, if targetPort itself was transferred and thus all its tasks moved along with it.
auto final_target_port = target_port;
// 7. Add a task that runs the following steps to the port message queue of targetPort:
post_port_message(move(serialize_with_transfer_result));
// 2. Let targetRealm be finalTargetPort's relevant realm.
auto& target_realm = relevant_realm(*final_target_port);
auto& target_vm = target_realm.vm();
return {};
}
// 3. Let deserializeRecord be StructuredDeserializeWithTransfer(serializeWithTransferResult, targetRealm).
TemporaryExecutionContext context { relevant_settings_object(*final_target_port) };
auto deserialize_record_or_error = structured_deserialize_with_transfer(target_vm, serialize_with_transfer_result);
if (deserialize_record_or_error.is_error()) {
// If this throws an exception, catch it, fire an event named messageerror at finalTargetPort, using MessageEvent, and then return.
auto exception = deserialize_record_or_error.release_error();
MessageEventInit event_init {};
final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::messageerror, event_init));
return;
ErrorOr<void> MessagePort::send_message_on_socket(SerializedTransferRecord const& serialize_with_transfer_result)
{
IPC::MessageBuffer buffer;
IPC::Encoder encoder(buffer);
MUST(encoder.encode<u32>(0)); // placeholder for total size
MUST(encoder.encode(serialize_with_transfer_result));
u32 buffer_size = buffer.data.size() - sizeof(u32); // size of *payload*
buffer.data[0] = buffer_size & 0xFF;
buffer.data[1] = (buffer_size >> 8) & 0xFF;
buffer.data[2] = (buffer_size >> 16) & 0xFF;
buffer.data[3] = (buffer_size >> 24) & 0xFF;
for (auto& fd : buffer.fds) {
if (auto result = m_fd_passing_socket->send_fd(fd->value()); result.is_error()) {
return Error::from_string_view("Can't send fd"sv);
}
auto deserialize_record = deserialize_record_or_error.release_value();
}
// 4. Let messageClone be deserializeRecord.[[Deserialized]].
auto message_clone = deserialize_record.deserialized;
// 5. Let newPorts be a new frozen array consisting of all MessagePort objects in deserializeRecord.[[TransferredValues]], if any, maintaining their relative order.
// FIXME: Use a FrozenArray
Vector<JS::Handle<JS::Object>> new_ports;
for (auto const& object : deserialize_record.transferred_values) {
if (is<HTML::MessagePort>(*object)) {
new_ports.append(object);
ReadonlyBytes bytes_to_write { buffer.data.span() };
int writes_done = 0;
size_t initial_size = bytes_to_write.size();
while (!bytes_to_write.is_empty()) {
auto maybe_nwritten = m_socket->write_some(bytes_to_write);
writes_done++;
if (maybe_nwritten.is_error()) {
auto error = maybe_nwritten.release_error();
if (error.is_errno()) {
// FIXME: This is a hacky way to at least not crash on large messages
// The limit of 100 writes is arbitrary, and there to prevent indefinite spinning on the EventLoop
if (error.code() == EAGAIN && writes_done < 100) {
sched_yield();
continue;
}
switch (error.code()) {
case EPIPE:
return Error::from_string_literal("IPC::Connection::post_message: Disconnected from peer");
case EAGAIN:
return Error::from_string_literal("IPC::Connection::post_message: Peer buffer overflowed");
default:
return Error::from_syscall("IPC::Connection::post_message write"sv, -error.code());
}
} else {
return error;
}
}
// 6. Fire an event named message at finalTargetPort, using MessageEvent, with the data attribute initialized to messageClone and the ports attribute initialized to newPorts.
MessageEventInit event_init {};
event_init.data = message_clone;
event_init.ports = move(new_ports);
final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::message, event_init));
}));
bytes_to_write = bytes_to_write.slice(maybe_nwritten.value());
}
if (writes_done > 1) {
dbgln("LibIPC::Connection FIXME Warning, needed {} writes needed to send message of size {}B, this is pretty bad, as it spins on the EventLoop", writes_done, initial_size);
}
return {};
}
void MessagePort::post_port_message(SerializedTransferRecord serialize_with_transfer_result)
{
// FIXME: Use the correct task source?
queue_global_task(Task::Source::PostedMessage, relevant_global_object(*this), [this, serialize_with_transfer_result = move(serialize_with_transfer_result)]() mutable {
if (!m_socket || !m_socket->is_open())
return;
if (auto result = send_message_on_socket(serialize_with_transfer_result); result.is_error()) {
dbgln("Failed to post message: {}", result.error());
disentangle();
}
});
}
void MessagePort::read_from_socket()
{
auto num_bytes_ready = MUST(m_socket->pending_bytes());
switch (m_socket_state) {
case SocketState::Header: {
if (num_bytes_ready < sizeof(u32))
break;
m_socket_incoming_message_size = MUST(m_socket->read_value<u32>());
num_bytes_ready -= sizeof(u32);
m_socket_state = SocketState::Data;
}
[[fallthrough]];
case SocketState::Data: {
if (num_bytes_ready < m_socket_incoming_message_size)
break;
Vector<u8, 1024> data;
data.resize(m_socket_incoming_message_size, true);
MUST(m_socket->read_until_filled(data));
FixedMemoryStream stream { data, FixedMemoryStream::Mode::ReadOnly };
IPC::Decoder decoder(stream, *m_fd_passing_socket);
auto serialize_with_transfer_result = MUST(decoder.decode<SerializedTransferRecord>());
post_message_task_steps(serialize_with_transfer_result);
m_socket_state = SocketState::Header;
break;
}
case SocketState::Error:
VERIFY_NOT_REACHED();
break;
}
}
void MessagePort::post_message_task_steps(SerializedTransferRecord& serialize_with_transfer_result)
{
// 1. Let finalTargetPort be the MessagePort in whose port message queue the task now finds itself.
// NOTE: This can be different from targetPort, if targetPort itself was transferred and thus all its tasks moved along with it.
auto* final_target_port = this;
// 2. Let targetRealm be finalTargetPort's relevant realm.
auto& target_realm = relevant_realm(*final_target_port);
auto& target_vm = target_realm.vm();
// 3. Let deserializeRecord be StructuredDeserializeWithTransfer(serializeWithTransferResult, targetRealm).
TemporaryExecutionContext context { relevant_settings_object(*final_target_port) };
auto deserialize_record_or_error = structured_deserialize_with_transfer(target_vm, serialize_with_transfer_result);
if (deserialize_record_or_error.is_error()) {
// If this throws an exception, catch it, fire an event named messageerror at finalTargetPort, using MessageEvent, and then return.
auto exception = deserialize_record_or_error.release_error();
MessageEventInit event_init {};
final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::messageerror, event_init));
return;
}
auto deserialize_record = deserialize_record_or_error.release_value();
// 4. Let messageClone be deserializeRecord.[[Deserialized]].
auto message_clone = deserialize_record.deserialized;
// 5. Let newPorts be a new frozen array consisting of all MessagePort objects in deserializeRecord.[[TransferredValues]], if any, maintaining their relative order.
// FIXME: Use a FrozenArray
Vector<JS::Handle<JS::Object>> new_ports;
for (auto const& object : deserialize_record.transferred_values) {
if (is<HTML::MessagePort>(*object)) {
new_ports.append(object);
}
}
// 6. Fire an event named message at finalTargetPort, using MessageEvent, with the data attribute initialized to messageClone and the ports attribute initialized to newPorts.
MessageEventInit event_init {};
event_init.data = message_clone;
event_init.ports = move(new_ports);
final_target_port->dispatch_event(MessageEvent::create(target_realm, HTML::EventNames::message, event_init));
}
// https://html.spec.whatwg.org/multipage/web-messaging.html#dom-messageport-start
void MessagePort::start()
{
VERIFY(m_socket);
VERIFY(m_fd_passing_socket);
// TODO: The start() method steps are to enable this's port message queue, if it is not already enabled.
}

View file

@ -1,5 +1,6 @@
/*
* Copyright (c) 2021, Andreas Kling <kling@serenityos.org>
* Copyright (c) 2023, Andrew Kaster <akaster@serenityos.org>
*
* SPDX-License-Identifier: BSD-2-Clause
*/
@ -70,6 +71,10 @@ private:
void disentangle();
WebIDL::ExceptionOr<void> message_port_post_message_steps(JS::GCPtr<MessagePort> target_port, JS::Value message, StructuredSerializeOptions const& options);
void post_message_task_steps(SerializedTransferRecord&);
void post_port_message(SerializedTransferRecord);
ErrorOr<void> send_message_on_socket(SerializedTransferRecord const&);
void read_from_socket();
// The HTML spec implies(!) that this is MessagePort.[[RemotePort]]
JS::GCPtr<MessagePort> m_remote_port;
@ -78,6 +83,14 @@ private:
bool m_has_been_shipped { false };
OwnPtr<Core::LocalSocket> m_socket;
OwnPtr<Core::LocalSocket> m_fd_passing_socket;
enum class SocketState : u8 {
Header,
Data,
Error,
} m_socket_state { SocketState::Header };
size_t m_socket_incoming_message_size { 0 };
};
}

View file

@ -4,7 +4,8 @@
// https://html.spec.whatwg.org/multipage/web-messaging.html#messageport
[Exposed=(Window,Worker,AudioWorklet), Transferable]
interface MessagePort : EventTarget {
undefined postMessage(any message, sequence<object> transfer);
// FIXME: IDL Overload resolution fails here
// FIXME: undefined postMessage(any message, sequence<object> transfer);
undefined postMessage(any message, optional StructuredSerializeOptions options = {});
undefined start();
undefined close();

View file

@ -10,6 +10,8 @@
#include <AK/StdLibExtras.h>
#include <AK/String.h>
#include <AK/Vector.h>
#include <LibIPC/Decoder.h>
#include <LibIPC/Encoder.h>
#include <LibIPC/File.h>
#include <LibJS/Forward.h>
#include <LibJS/Runtime/Array.h>
@ -1129,3 +1131,39 @@ WebIDL::ExceptionOr<JS::Value> structured_deserialize(JS::VM& vm, SerializationR
}
}
namespace IPC {
template<>
ErrorOr<void> encode(Encoder& encoder, ::Web::HTML::TransferDataHolder const& data_holder)
{
TRY(encoder.encode(data_holder.data));
TRY(encoder.encode(data_holder.fds));
return {};
}
template<>
ErrorOr<void> encode(Encoder& encoder, ::Web::HTML::SerializedTransferRecord const& record)
{
TRY(encoder.encode(record.serialized));
TRY(encoder.encode(record.transfer_data_holders));
return {};
}
template<>
ErrorOr<::Web::HTML::TransferDataHolder> decode(Decoder& decoder)
{
auto data = TRY(decoder.decode<Vector<u8>>());
auto fds = TRY(decoder.decode<Vector<IPC::File>>());
return ::Web::HTML::TransferDataHolder { move(data), move(fds) };
}
template<>
ErrorOr<::Web::HTML::SerializedTransferRecord> decode(Decoder& decoder)
{
auto serialized = TRY(decoder.decode<Vector<u32>>());
auto transfer_data_holders = TRY(decoder.decode<Vector<::Web::HTML::TransferDataHolder>>());
return ::Web::HTML::SerializedTransferRecord { move(serialized), move(transfer_data_holders) };
}
}

View file

@ -54,3 +54,19 @@ WebIDL::ExceptionOr<SerializedTransferRecord> structured_serialize_with_transfer
WebIDL::ExceptionOr<DeserializedTransferRecord> structured_deserialize_with_transfer(JS::VM& vm, SerializedTransferRecord&);
}
namespace IPC {
template<>
ErrorOr<void> encode(Encoder&, ::Web::HTML::SerializedTransferRecord const&);
template<>
ErrorOr<void> encode(Encoder&, ::Web::HTML::TransferDataHolder const&);
template<>
ErrorOr<::Web::HTML::SerializedTransferRecord> decode(Decoder&);
template<>
ErrorOr<::Web::HTML::TransferDataHolder> decode(Decoder&);
}