Bläddra i källkod

LibIPC: Move non-templated parts of IPC::Connection out of line

This patch splits IPC::Connection into Connection and ConnectionBase.
ConnectionBase moves into Connection.cpp so we don't have to inline it
for every single templated subclass.
Andreas Kling 3 år sedan
förälder
incheckning
f3c4a357ea

+ 1 - 0
Userland/Libraries/LibIPC/CMakeLists.txt

@@ -1,4 +1,5 @@
 set(SOURCES
+    Connection.cpp
     Decoder.cpp
     Encoder.cpp
     Message.cpp

+ 152 - 0
Userland/Libraries/LibIPC/Connection.cpp

@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2021, Andreas Kling <kling@serenityos.org>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#include <LibIPC/Connection.h>
+#include <LibIPC/Stub.h>
+#include <sys/select.h>
+
+namespace IPC {
+
+ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullRefPtr<Core::LocalSocket> socket)
+    : m_local_stub(local_stub)
+    , m_socket(move(socket))
+    , m_notifier(Core::Notifier::construct(m_socket->fd(), Core::Notifier::Read, this))
+{
+    m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); });
+}
+
+ConnectionBase::~ConnectionBase()
+{
+}
+
+void ConnectionBase::post_message(Message const& message)
+{
+    post_message(message.encode());
+}
+
+void ConnectionBase::post_message(MessageBuffer buffer)
+{
+    // NOTE: If this connection is being shut down, but has not yet been destroyed,
+    //       the socket will be closed. Don't try to send more messages.
+    if (!m_socket->is_open())
+        return;
+
+    // Prepend the message size.
+    uint32_t message_size = buffer.data.size();
+    buffer.data.prepend(reinterpret_cast<const u8*>(&message_size), sizeof(message_size));
+
+#ifdef __serenity__
+    for (auto& fd : buffer.fds) {
+        auto rc = sendfd(m_socket->fd(), fd->value());
+        if (rc < 0) {
+            perror("sendfd");
+            shutdown();
+        }
+    }
+#else
+    if (!buffer.fds.is_empty())
+        warnln("fd passing is not supported on this platform, sorry :(");
+#endif
+
+    size_t total_nwritten = 0;
+    while (total_nwritten < buffer.data.size()) {
+        auto nwritten = write(m_socket->fd(), buffer.data.data() + total_nwritten, buffer.data.size() - total_nwritten);
+        if (nwritten < 0) {
+            switch (errno) {
+            case EPIPE:
+                dbgln("{}::post_message: Disconnected from peer", static_cast<Core::Object const&>(*this));
+                shutdown();
+                return;
+            case EAGAIN:
+                dbgln("{}::post_message: Peer buffer overflowed", static_cast<Core::Object const&>(*this));
+                shutdown();
+                return;
+            default:
+                perror("Connection::post_message write");
+                shutdown();
+                return;
+            }
+        }
+        total_nwritten += nwritten;
+    }
+
+    m_responsiveness_timer->start();
+}
+
+void ConnectionBase::shutdown()
+{
+    m_notifier->close();
+    m_socket->close();
+    die();
+}
+
+void ConnectionBase::handle_messages(u32 local_endpoint_magic)
+{
+    auto messages = move(m_unprocessed_messages);
+    for (auto& message : messages) {
+        if (message.endpoint_magic() == local_endpoint_magic)
+            if (auto response = m_local_stub.handle(message))
+                post_message(*response);
+    }
+}
+
+void ConnectionBase::wait_for_socket_to_become_readable()
+{
+    fd_set read_fds;
+    FD_ZERO(&read_fds);
+    FD_SET(m_socket->fd(), &read_fds);
+    for (;;) {
+        if (auto rc = select(m_socket->fd() + 1, &read_fds, nullptr, nullptr, nullptr); rc < 0) {
+            if (errno == EINTR)
+                continue;
+            perror("wait_for_specific_endpoint_message: select");
+            VERIFY_NOT_REACHED();
+        } else {
+            VERIFY(rc > 0);
+            VERIFY(FD_ISSET(m_socket->fd(), &read_fds));
+            break;
+        }
+    }
+}
+
+Result<Vector<u8>, bool> ConnectionBase::read_as_much_as_possible_from_socket_without_blocking()
+{
+    Vector<u8> bytes;
+
+    if (!m_unprocessed_bytes.is_empty()) {
+        bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size());
+        m_unprocessed_bytes.clear();
+    }
+
+    while (m_socket->is_open()) {
+        u8 buffer[4096];
+        ssize_t nread = recv(m_socket->fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
+        if (nread < 0) {
+            if (errno == EAGAIN)
+                break;
+            perror("recv");
+            exit(1);
+            return false;
+        }
+        if (nread == 0) {
+            if (bytes.is_empty()) {
+                deferred_invoke([this] { shutdown(); });
+                return false;
+            }
+            break;
+        }
+        bytes.append(buffer, nread);
+    }
+
+    if (!bytes.is_empty()) {
+        m_responsiveness_timer->stop();
+        did_become_responsive();
+    }
+
+    return bytes;
+}
+
+}

+ 46 - 147
Userland/Libraries/LibIPC/Connection.h

@@ -8,38 +8,70 @@
 
 #include <AK/ByteBuffer.h>
 #include <AK/NonnullOwnPtrVector.h>
+#include <AK/Result.h>
+#include <AK/Try.h>
 #include <LibCore/Event.h>
 #include <LibCore/EventLoop.h>
 #include <LibCore/LocalSocket.h>
 #include <LibCore/Notifier.h>
 #include <LibCore/Timer.h>
+#include <LibIPC/Forward.h>
 #include <LibIPC/Message.h>
 #include <errno.h>
 #include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <sys/select.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <unistd.h>
 
 namespace IPC {
 
-template<typename LocalEndpoint, typename PeerEndpoint>
-class Connection : public Core::Object {
+class ConnectionBase : public Core::Object {
+    C_OBJECT_ABSTRACT(ConnectionBase);
+
 public:
-    using LocalStub = typename LocalEndpoint::Stub;
+    virtual ~ConnectionBase() override;
+
+    bool is_open() const { return m_socket->is_open(); }
+    void post_message(Message const&);
+
+    void shutdown();
+    virtual void die() { }
+
+protected:
+    explicit ConnectionBase(IPC::Stub&, NonnullRefPtr<Core::LocalSocket>);
+
+    Core::LocalSocket& socket() { return *m_socket; }
+
+    virtual void may_have_become_unresponsive() { }
+    virtual void did_become_responsive() { }
+
+    void wait_for_socket_to_become_readable();
+    Result<Vector<u8>, bool> read_as_much_as_possible_from_socket_without_blocking();
+    void post_message(MessageBuffer);
+    void handle_messages(u32 local_endpoint_magic);
+
+    IPC::Stub& m_local_stub;
+
+    NonnullRefPtr<Core::LocalSocket> m_socket;
+    RefPtr<Core::Timer> m_responsiveness_timer;
+
+    RefPtr<Core::Notifier> m_notifier;
+    NonnullOwnPtrVector<Message> m_unprocessed_messages;
+    ByteBuffer m_unprocessed_bytes;
+};
 
-    Connection(LocalStub& local_stub, NonnullRefPtr<Core::LocalSocket> socket)
-        : m_local_stub(local_stub)
-        , m_socket(move(socket))
-        , m_notifier(Core::Notifier::construct(m_socket->fd(), Core::Notifier::Read, this))
+template<typename LocalEndpoint, typename PeerEndpoint>
+class Connection : public ConnectionBase {
+public:
+    Connection(IPC::Stub& local_stub, NonnullRefPtr<Core::LocalSocket> socket)
+        : ConnectionBase(local_stub, move(socket))
     {
-        m_responsiveness_timer = Core::Timer::create_single_shot(3000, [this] { may_have_become_unresponsive(); });
         m_notifier->on_ready_to_read = [this] {
             NonnullRefPtr protect = *this;
             drain_messages_from_peer();
-            handle_messages();
+            handle_messages(LocalEndpoint::static_magic());
         };
     }
 
@@ -49,61 +81,6 @@ public:
         return wait_for_specific_endpoint_message<MessageType, LocalEndpoint>();
     }
 
-    void post_message(const Message& message)
-    {
-        post_message(message.encode());
-    }
-
-    // FIXME: unnecessary copy
-    void post_message(MessageBuffer buffer)
-    {
-        // NOTE: If this connection is being shut down, but has not yet been destroyed,
-        //       the socket will be closed. Don't try to send more messages.
-        if (!m_socket->is_open())
-            return;
-
-        // Prepend the message size.
-        uint32_t message_size = buffer.data.size();
-        buffer.data.prepend(reinterpret_cast<const u8*>(&message_size), sizeof(message_size));
-
-#ifdef __serenity__
-        for (auto& fd : buffer.fds) {
-            auto rc = sendfd(m_socket->fd(), fd->value());
-            if (rc < 0) {
-                perror("sendfd");
-                shutdown();
-            }
-        }
-#else
-        if (!buffer.fds.is_empty())
-            warnln("fd passing is not supported on this platform, sorry :(");
-#endif
-
-        size_t total_nwritten = 0;
-        while (total_nwritten < buffer.data.size()) {
-            auto nwritten = write(m_socket->fd(), buffer.data.data() + total_nwritten, buffer.data.size() - total_nwritten);
-            if (nwritten < 0) {
-                switch (errno) {
-                case EPIPE:
-                    dbgln("{}::post_message: Disconnected from peer", *this);
-                    shutdown();
-                    return;
-                case EAGAIN:
-                    dbgln("{}::post_message: Peer buffer overflowed", *this);
-                    shutdown();
-                    return;
-                default:
-                    perror("Connection::post_message write");
-                    shutdown();
-                    return;
-                }
-            }
-            total_nwritten += nwritten;
-        }
-
-        m_responsiveness_timer->start();
-    }
-
     template<typename RequestType, typename... Args>
     NonnullOwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
     {
@@ -120,23 +97,7 @@ public:
         return wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
     }
 
-    virtual void may_have_become_unresponsive() { }
-    virtual void did_become_responsive() { }
-
-    void shutdown()
-    {
-        m_notifier->close();
-        m_socket->close();
-        die();
-    }
-
-    virtual void die() { }
-
-    bool is_open() const { return m_socket->is_open(); }
-
 protected:
-    Core::LocalSocket& socket() { return *m_socket; }
-
     template<typename MessageType, typename Endpoint>
     OwnPtr<MessageType> wait_for_specific_endpoint_message()
     {
@@ -153,21 +114,8 @@ protected:
 
             if (!m_socket->is_open())
                 break;
-            fd_set rfds;
-            FD_ZERO(&rfds);
-            FD_SET(m_socket->fd(), &rfds);
-            for (;;) {
-                if (auto rc = select(m_socket->fd() + 1, &rfds, nullptr, nullptr, nullptr); rc < 0) {
-                    if (errno == EINTR)
-                        continue;
-                    perror("wait_for_specific_endpoint_message: select");
-                    VERIFY_NOT_REACHED();
-                } else {
-                    VERIFY(rc > 0);
-                    VERIFY(FD_ISSET(m_socket->fd(), &rfds));
-                    break;
-                }
-            }
+
+            wait_for_socket_to_become_readable();
 
             if (!drain_messages_from_peer())
                 break;
@@ -177,37 +125,7 @@ protected:
 
     bool drain_messages_from_peer()
     {
-        Vector<u8> bytes;
-
-        if (!m_unprocessed_bytes.is_empty()) {
-            bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size());
-            m_unprocessed_bytes.clear();
-        }
-
-        while (m_socket->is_open()) {
-            u8 buffer[4096];
-            ssize_t nread = recv(m_socket->fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
-            if (nread < 0) {
-                if (errno == EAGAIN)
-                    break;
-                perror("recv");
-                exit(1);
-                return false;
-            }
-            if (nread == 0) {
-                if (bytes.is_empty()) {
-                    deferred_invoke([this] { shutdown(); });
-                    return false;
-                }
-                break;
-            }
-            bytes.append(buffer, nread);
-        }
-
-        if (!bytes.is_empty()) {
-            m_responsiveness_timer->stop();
-            did_become_responsive();
-        }
+        auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking());
 
         size_t index = 0;
         u32 message_size = 0;
@@ -246,30 +164,11 @@ protected:
 
         if (!m_unprocessed_messages.is_empty()) {
             deferred_invoke([this] {
-                handle_messages();
+                handle_messages(LocalEndpoint::static_magic());
             });
         }
         return true;
     }
-
-    void handle_messages()
-    {
-        auto messages = move(m_unprocessed_messages);
-        for (auto& message : messages) {
-            if (message.endpoint_magic() == LocalEndpoint::static_magic())
-                if (auto response = m_local_stub.handle(message))
-                    post_message(*response);
-        }
-    }
-
-protected:
-    LocalStub& m_local_stub;
-    NonnullRefPtr<Core::LocalSocket> m_socket;
-    RefPtr<Core::Timer> m_responsiveness_timer;
-
-    RefPtr<Core::Notifier> m_notifier;
-    NonnullOwnPtrVector<Message> m_unprocessed_messages;
-    ByteBuffer m_unprocessed_bytes;
 };
 
 }