Browse Source

LibWeb+WebWorker: Implement a first cut of post_message for Workers

This implementation completely ignores MessagePorts, and manually plumbs
data through LocalSockets.
Andrew Kaster 1 year ago
parent
commit
1602663b9e

+ 2 - 2
Base/res/html/misc/worker.js

@@ -1,10 +1,10 @@
 onmessage = evt => {
     console.log("In Worker - Got message:", JSON.stringify(evt.data));
 
-    postMessage(JSON.stringify(evt.data));
+    postMessage(evt.data, null);
 };
 
 console.log("In Worker - Loaded", this);
 console.log("Keys: ", JSON.stringify(Object.keys(this)));
 
-postMessage("loaded");
+postMessage("loaded", null);

+ 1 - 1
Base/res/html/misc/worker_parent.html

@@ -21,7 +21,7 @@
                     .getElementById("btn_hello")
                     .addEventListener("click", function() {
                         console.log("Sending Message");
-                        work.postMessage("Hey buddy!");
+                        work.postMessage({ "msg": "Hey buddy!" });
                     });
             });
         </script>

+ 3 - 0
Tests/LibWeb/Text/expected/Worker/Worker-echo.txt

@@ -0,0 +1,3 @@
+Got message from worker: "loaded"
+Got message from worker: {"msg":"marco"}
+DONE

+ 18 - 0
Tests/LibWeb/Text/input/Worker/Worker-echo.html

@@ -0,0 +1,18 @@
+<script src="../include.js"></script>
+<script>
+    asyncTest((done) => {
+        let work = new Worker("worker.js");
+        let count = 0;
+        work.onmessage = (evt) => {
+            println("Got message from worker: " + JSON.stringify(evt.data));
+            count++;
+            work.postMessage({"msg": "marco"});
+            if (count === 2) {
+                println("DONE");
+                work.onmessage = null;
+                work.terminate();
+                done();
+            }
+        };
+    });
+</script>

+ 4 - 0
Tests/LibWeb/Text/input/Worker/worker.js

@@ -0,0 +1,4 @@
+onmessage = evt => {
+    postMessage(evt.data, null);
+};
+postMessage("loaded", null);

+ 68 - 6
Userland/Libraries/LibWeb/HTML/Worker.cpp

@@ -9,6 +9,7 @@
 #include <LibJS/Runtime/Realm.h>
 #include <LibWeb/Bindings/MainThreadVM.h>
 #include <LibWeb/HTML/Scripting/Environments.h>
+#include <LibWeb/HTML/Scripting/TemporaryExecutionContext.h>
 #include <LibWeb/HTML/Worker.h>
 #include <LibWeb/HTML/WorkerDebugConsoleClient.h>
 #include <LibWeb/WebIDL/ExceptionOr.h>
@@ -109,6 +110,54 @@ void Worker::run_a_worker(AK::URL& url, EnvironmentSettingsObject& outside_setti
 
     // Note: This spawns a new process to act as the 'agent' for the worker.
     m_agent = heap().allocate_without_realm<WorkerAgent>(url, options);
+
+    auto& socket = m_agent->socket();
+    // FIXME: Hide this logic in MessagePort
+    socket.set_notifications_enabled(true);
+    socket.on_ready_to_read = [this] {
+        auto& socket = this->m_agent->socket();
+        auto& vm = this->vm();
+        auto& realm = this->realm();
+
+        auto num_bytes_ready = MUST(socket.pending_bytes());
+        switch (m_outside_port_state) {
+        case PortState::Header: {
+            if (num_bytes_ready < 8)
+                break;
+            auto const magic = MUST(socket.read_value<u32>());
+            if (magic != 0xDEADBEEF) {
+                m_outside_port_state = PortState::Error;
+                break;
+            }
+            m_outside_port_incoming_message_size = MUST(socket.read_value<u32>());
+            num_bytes_ready -= 8;
+            m_outside_port_state = PortState::Data;
+        }
+            [[fallthrough]];
+        case PortState::Data: {
+            if (num_bytes_ready < m_outside_port_incoming_message_size)
+                break;
+            SerializationRecord rec; // FIXME: Keep in class scope
+            rec.resize(m_outside_port_incoming_message_size / sizeof(u32));
+
+            MUST(socket.read_until_filled(to_bytes(rec.span())));
+
+            TemporaryExecutionContext cxt(relevant_settings_object(*this));
+            VERIFY(&realm == vm.current_realm());
+            MessageEventInit event_init {};
+            event_init.data = MUST(structured_deserialize(vm, rec, realm, {}));
+            // FIXME: Fill in the rest of the info from MessagePort
+
+            this->dispatch_event(MessageEvent::create(realm, EventNames::message, event_init));
+
+            m_outside_port_state = PortState::Header;
+            break;
+        }
+        case PortState::Error:
+            VERIFY_NOT_REACHED();
+            break;
+        }
+    };
 }
 
 // https://html.spec.whatwg.org/multipage/workers.html#dom-worker-terminate
@@ -120,16 +169,29 @@ WebIDL::ExceptionOr<void> Worker::terminate()
 }
 
 // https://html.spec.whatwg.org/multipage/workers.html#dom-worker-postmessage
-void Worker::post_message(JS::Value message, JS::Value)
+WebIDL::ExceptionOr<void> Worker::post_message(JS::Value message, JS::Value)
 {
     dbgln_if(WEB_WORKER_DEBUG, "WebWorker: Post Message: {}", message.to_string_without_side_effects());
 
-    // 1. Let targetPort be the port with which this is entangled, if any; otherwise let it be null.
-    auto& target_port = m_outside_port;
+    // FIXME: 1. Let targetPort be the port with which this is entangled, if any; otherwise let it be null.
+    // FIXME: 2. Let options be «[ "transfer" → transfer ]».
+    // FIXME: 3. Run the message port post message steps providing this, targetPort, message and options.
+
+    auto& realm = this->realm();
+    auto& vm = this->vm();
+
+    // FIXME: Use the with-transfer variant, which should(?) prepend the magic + size at the front
+    auto data = TRY(structured_serialize(vm, message));
+
+    Array<u32, 2> header = { 0xDEADBEEF, static_cast<u32>(data.size() * sizeof(u32)) };
 
-    // 2. Let options be «[ "transfer" → transfer ]».
-    // 3. Run the message port post message steps providing this, targetPort, message and options.
-    target_port->post_message(message);
+    if (auto const err = m_agent->socket().write_until_depleted(to_readonly_bytes(header.span())); err.is_error())
+        return WebIDL::DataCloneError::create(realm, TRY_OR_THROW_OOM(vm, String::formatted("{}", err.error())));
+
+    if (auto const err = m_agent->socket().write_until_depleted(to_readonly_bytes(data.span())); err.is_error())
+        return WebIDL::DataCloneError::create(realm, TRY_OR_THROW_OOM(vm, String::formatted("{}", err.error())));
+
+    return {};
 }
 
 #undef __ENUMERATE

+ 8 - 1
Userland/Libraries/LibWeb/HTML/Worker.h

@@ -41,7 +41,7 @@ public:
 
     WebIDL::ExceptionOr<void> terminate();
 
-    void post_message(JS::Value message, JS::Value transfer);
+    WebIDL::ExceptionOr<void> post_message(JS::Value message, JS::Value transfer);
 
     virtual ~Worker() = default;
 
@@ -66,6 +66,13 @@ private:
 
     JS::GCPtr<DOM::Document> m_document;
     JS::GCPtr<MessagePort> m_outside_port;
+    // FIXME: Move tihs state into the message port (and actually use it :) )
+    enum class PortState : u8 {
+        Header,
+        Data,
+        Error,
+    } m_outside_port_state { PortState::Header };
+    size_t m_outside_port_incoming_message_size { 0 };
 
     JS::GCPtr<WorkerAgent> m_agent;
 

+ 2 - 1
Userland/Libraries/LibWeb/HTML/WorkerAgent.cpp

@@ -109,7 +109,8 @@ WorkerAgent::WorkerAgent(AK::URL url, WorkerOptions const& options)
 
     int fds[2] = {};
     MUST(Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, fds));
-    m_message_port_fd = fds[0];
+
+    m_socket = MUST(Core::BufferedLocalSocket::create(MUST(Core::LocalSocket::adopt_fd(fds[0]))));
 
     m_worker_ipc->async_start_dedicated_worker(m_url, options.type, options.credentials, options.name, fds[1]);
 }

+ 5 - 4
Userland/Libraries/LibWeb/HTML/WorkerAgent.h

@@ -6,11 +6,10 @@
 
 #pragma once
 
-#include <AK/RefCounted.h>
+#include <LibCore/Socket.h>
 #include <LibWeb/Forward.h>
 #include <LibWeb/HTML/MessageEvent.h>
 #include <LibWeb/HTML/MessagePort.h>
-#include <LibWeb/HTML/Scripting/ClassicScript.h>
 #include <LibWeb/HTML/Scripting/WorkerEnvironmentSettingsObject.h>
 #include <LibWeb/HTML/Window.h>
 #include <LibWeb/Worker/WebWorkerClient.h>
@@ -31,12 +30,14 @@ struct WorkerAgent : JS::Cell {
 
     RefPtr<Web::HTML::WebWorkerClient> m_worker_ipc;
 
+    Core::BufferedLocalSocket& socket() const { return *m_socket; }
+
 private:
     WorkerOptions m_worker_options;
     AK::URL m_url;
 
-    // TODO: associate with MessagePorts?
-    int m_message_port_fd;
+    // FIXME: associate with MessagePorts
+    OwnPtr<Core::BufferedLocalSocket> m_socket;
 };
 
 }

+ 72 - 1
Userland/Libraries/LibWeb/HTML/WorkerGlobalScope.cpp

@@ -4,14 +4,17 @@
  * SPDX-License-Identifier: BSD-2-Clause
  */
 
+#include <AK/Array.h>
 #include <AK/Vector.h>
 #include <LibWeb/Bindings/DedicatedWorkerExposedInterfaces.h>
-#include <LibWeb/Bindings/ExceptionOrUtils.h>
 #include <LibWeb/Bindings/Intrinsics.h>
 #include <LibWeb/Bindings/WorkerGlobalScopePrototype.h>
 #include <LibWeb/Forward.h>
 #include <LibWeb/HTML/EventHandler.h>
 #include <LibWeb/HTML/EventNames.h>
+#include <LibWeb/HTML/MessageEvent.h>
+#include <LibWeb/HTML/Scripting/TemporaryExecutionContext.h>
+#include <LibWeb/HTML/StructuredSerialize.h>
 #include <LibWeb/HTML/WorkerGlobalScope.h>
 #include <LibWeb/HTML/WorkerLocation.h>
 #include <LibWeb/HTML/WorkerNavigator.h>
@@ -52,6 +55,55 @@ void WorkerGlobalScope::visit_edges(Cell::Visitor& visitor)
     visitor.visit(m_navigator);
 }
 
+void WorkerGlobalScope::set_outside_port(NonnullOwnPtr<Core::BufferedLocalSocket> port)
+{
+    m_outside_port = move(port);
+
+    // FIXME: Hide this logic in MessagePort
+    m_outside_port->set_notifications_enabled(true);
+    m_outside_port->on_ready_to_read = [this] {
+        auto& vm = this->vm();
+        auto& realm = this->realm();
+
+        auto num_bytes_ready = MUST(m_outside_port->pending_bytes());
+        switch (m_outside_port_state) {
+        case PortState::Header: {
+            if (num_bytes_ready < 8)
+                break;
+            auto const magic = MUST(m_outside_port->read_value<u32>());
+            if (magic != 0xDEADBEEF) {
+                m_outside_port_state = PortState::Error;
+                break;
+            }
+            m_outside_port_incoming_message_size = MUST(m_outside_port->read_value<u32>());
+            num_bytes_ready -= 8;
+            m_outside_port_state = PortState::Data;
+        }
+            [[fallthrough]];
+        case PortState::Data: {
+            if (num_bytes_ready < m_outside_port_incoming_message_size)
+                break;
+            SerializationRecord rec; // FIXME: Keep in class scope
+            rec.resize(m_outside_port_incoming_message_size / sizeof(u32));
+            MUST(m_outside_port->read_until_filled(to_bytes(rec.span())));
+
+            TemporaryExecutionContext cxt(relevant_settings_object(*this));
+            MessageEventInit event_init {};
+            event_init.data = MUST(structured_deserialize(vm, rec, realm, {}));
+            // FIXME: Fill in the rest of the info from MessagePort
+
+            this->dispatch_event(MessageEvent::create(realm, EventNames::message, event_init));
+
+            m_outside_port_state = PortState::Header;
+            break;
+        }
+        case PortState::Error:
+            VERIFY_NOT_REACHED();
+            break;
+        }
+    };
+}
+
 // https://html.spec.whatwg.org/multipage/workers.html#importing-scripts-and-libraries
 WebIDL::ExceptionOr<void> WorkerGlobalScope::import_scripts(Vector<String> urls)
 {
@@ -94,6 +146,25 @@ JS::NonnullGCPtr<WorkerNavigator> WorkerGlobalScope::navigator() const
     return *m_navigator;
 }
 
+WebIDL::ExceptionOr<void> WorkerGlobalScope::post_message(JS::Value message, JS::Value)
+{
+    auto& realm = this->realm();
+    auto& vm = this->vm();
+
+    // FIXME: Use the with-transfer variant, which should(?) prepend the magic + size at the front
+    auto data = TRY(structured_serialize(vm, message));
+
+    Array<u32, 2> header = { 0xDEADBEEF, static_cast<u32>(data.size() * sizeof(u32)) };
+
+    if (auto const err = m_outside_port->write_until_depleted(to_readonly_bytes(header.span())); err.is_error())
+        return WebIDL::DataCloneError::create(realm, TRY_OR_THROW_OOM(vm, String::formatted("{}", err.error())));
+
+    if (auto const err = m_outside_port->write_until_depleted(to_readonly_bytes(data.span())); err.is_error())
+        return WebIDL::DataCloneError::create(realm, TRY_OR_THROW_OOM(vm, String::formatted("{}", err.error())));
+
+    return {};
+}
+
 #undef __ENUMERATE
 #define __ENUMERATE(attribute_name, event_name)                               \
     void WorkerGlobalScope::set_##attribute_name(WebIDL::CallbackType* value) \

+ 23 - 7
Userland/Libraries/LibWeb/HTML/WorkerGlobalScope.h

@@ -9,6 +9,7 @@
 #include <AK/Optional.h>
 #include <AK/RefCounted.h>
 #include <AK/URL.h>
+#include <LibCore/Socket.h>
 #include <LibWeb/DOM/EventTarget.h>
 #include <LibWeb/Forward.h>
 #include <LibWeb/HTML/WindowOrWorkerGlobalScope.h>
@@ -16,13 +17,16 @@
 #include <LibWeb/HTML/WorkerNavigator.h>
 #include <LibWeb/WebIDL/ExceptionOr.h>
 
-#define ENUMERATE_WORKER_GLOBAL_SCOPE_EVENT_HANDLERS(E)       \
-    E(onerror, HTML::EventNames::error)                       \
-    E(onlanguagechange, HTML::EventNames::languagechange)     \
-    E(ononline, HTML::EventNames::online)                     \
-    E(onoffline, HTML::EventNames::offline)                   \
-    E(onrejectionhandled, HTML::EventNames::rejectionhandled) \
-    E(onunhandledrejection, HTML::EventNames::unhandledrejection)
+// FIXME: message/messageerror belong on subclasses only
+#define ENUMERATE_WORKER_GLOBAL_SCOPE_EVENT_HANDLERS(E)           \
+    E(onerror, HTML::EventNames::error)                           \
+    E(onlanguagechange, HTML::EventNames::languagechange)         \
+    E(ononline, HTML::EventNames::online)                         \
+    E(onoffline, HTML::EventNames::offline)                       \
+    E(onrejectionhandled, HTML::EventNames::rejectionhandled)     \
+    E(onunhandledrejection, HTML::EventNames::unhandledrejection) \
+    E(onmessage, HTML::EventNames::message)                       \
+    E(onmessageerror, HTML::EventNames::messageerror)
 
 namespace Web::HTML {
 
@@ -69,6 +73,8 @@ public:
     ENUMERATE_WORKER_GLOBAL_SCOPE_EVENT_HANDLERS(__ENUMERATE)
 #undef __ENUMERATE
 
+    WebIDL::ExceptionOr<void> post_message(JS::Value message, JS::Value transfer);
+
     // Non-IDL public methods
 
     AK::URL const& url() const { return m_url.value(); }
@@ -78,6 +84,8 @@ public:
     //            this is not problematic as it cannot be observed from script.
     void set_location(JS::NonnullGCPtr<WorkerLocation> loc) { m_location = move(loc); }
 
+    void set_outside_port(NonnullOwnPtr<Core::BufferedLocalSocket> port);
+
     void initialize_web_interfaces(Badge<WorkerEnvironmentSettingsObject>);
 
     Web::Page* page() { return &m_page; }
@@ -91,6 +99,14 @@ private:
     JS::GCPtr<WorkerLocation> m_location;
     JS::GCPtr<WorkerNavigator> m_navigator;
 
+    OwnPtr<Core::BufferedLocalSocket> m_outside_port;
+    enum class PortState : u8 {
+        Header,
+        Data,
+        Error,
+    } m_outside_port_state { PortState::Header };
+    size_t m_outside_port_incoming_message_size { 0 };
+
     // FIXME: Add all these internal slots
 
     // https://html.spec.whatwg.org/multipage/workers.html#concept-WorkerGlobalScope-owner-set

+ 5 - 0
Userland/Libraries/LibWeb/HTML/WorkerGlobalScope.idl

@@ -18,6 +18,11 @@ interface WorkerGlobalScope : EventTarget {
     attribute EventHandler ononline;
     attribute EventHandler onrejectionhandled;
     attribute EventHandler onunhandledrejection;
+
+    // FIXME: This belongs on the subclasses of WorkerGlobalScope
+    undefined postMessage(any message, any transfer);
+    attribute EventHandler onmessage;
+    attribute EventHandler onmessageerror;
 };
 
 WorkerGlobalScope includes WindowOrWorkerGlobalScope;

+ 2 - 2
Userland/Services/WebWorker/ConnectionFromClient.cpp

@@ -52,9 +52,9 @@ Web::Page const& ConnectionFromClient::page() const
     return m_page_host->page();
 }
 
-void ConnectionFromClient::start_dedicated_worker(AK::URL const& url, String const& type, String const&, String const&, IPC::File const&)
+void ConnectionFromClient::start_dedicated_worker(AK::URL const& url, String const& type, String const&, String const&, IPC::File const& implicit_port)
 {
-    m_worker_host = make_ref_counted<DedicatedWorkerHost>(page(), url, type);
+    m_worker_host = make_ref_counted<DedicatedWorkerHost>(page(), url, type, implicit_port.take_fd());
 
     m_worker_host->run();
 }

+ 9 - 3
Userland/Services/WebWorker/DedicatedWorkerHost.cpp

@@ -17,14 +17,18 @@
 
 namespace WebWorker {
 
-DedicatedWorkerHost::DedicatedWorkerHost(Web::Page& page, AK::URL url, String type)
+DedicatedWorkerHost::DedicatedWorkerHost(Web::Page& page, AK::URL url, String type, int outside_port)
     : m_page(page)
     , m_url(move(url))
     , m_type(move(type))
+    , m_outside_port(outside_port)
 {
 }
 
-DedicatedWorkerHost::~DedicatedWorkerHost() = default;
+DedicatedWorkerHost::~DedicatedWorkerHost()
+{
+    ::close(m_outside_port);
+}
 
 // https://html.spec.whatwg.org/multipage/workers.html#run-a-worker
 // FIXME: Extract out into a helper for both shared and dedicated workers
@@ -128,7 +132,7 @@ void DedicatedWorkerHost::run()
     };
     auto perform_fetch = Web::HTML::create_perform_the_fetch_hook(inner_settings->heap(), move(perform_fetch_function));
 
-    auto on_complete_function = [inner_settings, worker_global_scope](JS::GCPtr<Web::HTML::Script> script) {
+    auto on_complete_function = [inner_settings, worker_global_scope, outside_port = m_outside_port](JS::GCPtr<Web::HTML::Script> script) {
         auto& realm = inner_settings->realm();
         // 1. If script is null or if script's error to rethrow is non-null, then:
         if (!script || !script->error_to_rethrow().is_null()) {
@@ -147,6 +151,8 @@ void DedicatedWorkerHost::run()
         // FIXME: 3. Let inside port be a new MessagePort object in inside settings's Realm.
         // FIXME: 4. Associate inside port with worker global scope.
         // FIXME: 5. Entangle outside port and inside port.
+        // This is a hack, move to a real MessagePort object per above FIXMEs.
+        worker_global_scope->set_outside_port(MUST(Core::BufferedLocalSocket::create(MUST(Core::LocalSocket::adopt_fd(outside_port)))));
 
         // 6. Create a new WorkerLocation object and associate it with worker global scope.
         worker_global_scope->set_location(realm.heap().allocate<Web::HTML::WorkerLocation>(realm, *worker_global_scope));

+ 3 - 1
Userland/Services/WebWorker/DedicatedWorkerHost.h

@@ -15,7 +15,7 @@ namespace WebWorker {
 
 class DedicatedWorkerHost : public RefCounted<DedicatedWorkerHost> {
 public:
-    explicit DedicatedWorkerHost(Web::Page&, AK::URL url, String type);
+    explicit DedicatedWorkerHost(Web::Page&, AK::URL url, String type, int outside_port);
     ~DedicatedWorkerHost();
 
     void run();
@@ -26,6 +26,8 @@ private:
 
     AK::URL m_url;
     String m_type;
+
+    int m_outside_port { -1 };
 };
 
 }