Connection.h 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. * Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
  3. * Copyright (c) 2022, the SerenityOS developers.
  4. *
  5. * SPDX-License-Identifier: BSD-2-Clause
  6. */
  7. #pragma once
  8. #include <AK/ByteBuffer.h>
  9. #include <AK/NonnullOwnPtrVector.h>
  10. #include <AK/Try.h>
  11. #include <LibCore/Event.h>
  12. #include <LibCore/EventLoop.h>
  13. #include <LibCore/Notifier.h>
  14. #include <LibCore/Stream.h>
  15. #include <LibCore/Timer.h>
  16. #include <LibIPC/Forward.h>
  17. #include <LibIPC/Message.h>
  18. #include <errno.h>
  19. #include <stdint.h>
  20. #include <stdio.h>
  21. #include <stdlib.h>
  22. #include <sys/socket.h>
  23. #include <sys/types.h>
  24. #include <unistd.h>
  25. namespace IPC {
  26. // NOTE: This is an abstraction to allow using IPC::Connection without a Core::EventLoop.
  27. // FIXME: It's not particularly nice, think of something nicer.
  28. struct DeferredInvoker {
  29. virtual ~DeferredInvoker() = default;
  30. virtual void schedule(Function<void()>) = 0;
  31. };
  32. class ConnectionBase : public Core::Object {
  33. C_OBJECT_ABSTRACT(ConnectionBase);
  34. public:
  35. virtual ~ConnectionBase() override = default;
  36. void set_fd_passing_socket(NonnullOwnPtr<Core::Stream::LocalSocket>);
  37. void set_deferred_invoker(NonnullOwnPtr<DeferredInvoker>);
  38. DeferredInvoker& deferred_invoker() { return *m_deferred_invoker; }
  39. bool is_open() const { return m_socket->is_open(); }
  40. ErrorOr<void> post_message(Message const&);
  41. void shutdown();
  42. virtual void die() { }
  43. Core::Stream::LocalSocket& socket() { return *m_socket; }
  44. Core::Stream::LocalSocket& fd_passing_socket();
  45. protected:
  46. explicit ConnectionBase(IPC::Stub&, NonnullOwnPtr<Core::Stream::LocalSocket>, u32 local_endpoint_magic);
  47. virtual void may_have_become_unresponsive() { }
  48. virtual void did_become_responsive() { }
  49. virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) = 0;
  50. virtual void shutdown_with_error(Error const&);
  51. OwnPtr<IPC::Message> wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
  52. void wait_for_socket_to_become_readable();
  53. ErrorOr<Vector<u8>> read_as_much_as_possible_from_socket_without_blocking();
  54. ErrorOr<void> drain_messages_from_peer();
  55. ErrorOr<void> post_message(MessageBuffer);
  56. void handle_messages();
  57. IPC::Stub& m_local_stub;
  58. NonnullOwnPtr<Core::Stream::LocalSocket> m_socket;
  59. OwnPtr<Core::Stream::LocalSocket> m_fd_passing_socket;
  60. RefPtr<Core::Timer> m_responsiveness_timer;
  61. NonnullOwnPtrVector<Message> m_unprocessed_messages;
  62. ByteBuffer m_unprocessed_bytes;
  63. u32 m_local_endpoint_magic { 0 };
  64. NonnullOwnPtr<DeferredInvoker> m_deferred_invoker;
  65. };
  66. template<typename LocalEndpoint, typename PeerEndpoint>
  67. class Connection : public ConnectionBase {
  68. public:
  69. Connection(IPC::Stub& local_stub, NonnullOwnPtr<Core::Stream::LocalSocket> socket)
  70. : ConnectionBase(local_stub, move(socket), LocalEndpoint::static_magic())
  71. {
  72. m_socket->on_ready_to_read = [this] {
  73. NonnullRefPtr protect = *this;
  74. // FIXME: Do something about errors.
  75. (void)drain_messages_from_peer();
  76. handle_messages();
  77. };
  78. }
  79. template<typename MessageType>
  80. OwnPtr<MessageType> wait_for_specific_message()
  81. {
  82. return wait_for_specific_endpoint_message<MessageType, LocalEndpoint>();
  83. }
  84. template<typename RequestType, typename... Args>
  85. NonnullOwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
  86. {
  87. MUST(post_message(RequestType(forward<Args>(args)...)));
  88. auto response = wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
  89. VERIFY(response);
  90. return response.release_nonnull();
  91. }
  92. template<typename RequestType, typename... Args>
  93. OwnPtr<typename RequestType::ResponseType> send_sync_but_allow_failure(Args&&... args)
  94. {
  95. if (post_message(RequestType(forward<Args>(args)...)).is_error())
  96. return nullptr;
  97. return wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
  98. }
  99. protected:
  100. template<typename MessageType, typename Endpoint>
  101. OwnPtr<MessageType> wait_for_specific_endpoint_message()
  102. {
  103. if (auto message = wait_for_specific_endpoint_message_impl(Endpoint::static_magic(), MessageType::static_message_id()))
  104. return message.template release_nonnull<MessageType>();
  105. return {};
  106. }
  107. virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) override
  108. {
  109. u32 message_size = 0;
  110. for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
  111. memcpy(&message_size, bytes.data() + index, sizeof(message_size));
  112. if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
  113. break;
  114. index += sizeof(message_size);
  115. auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size };
  116. auto local_message = LocalEndpoint::decode_message(remaining_bytes, fd_passing_socket());
  117. if (!local_message.is_error()) {
  118. m_unprocessed_messages.append(local_message.release_value());
  119. continue;
  120. }
  121. auto peer_message = PeerEndpoint::decode_message(remaining_bytes, fd_passing_socket());
  122. if (!peer_message.is_error()) {
  123. m_unprocessed_messages.append(peer_message.release_value());
  124. continue;
  125. }
  126. dbgln("Failed to parse a message");
  127. dbgln("Local endpoint error: {}", local_message.error());
  128. dbgln("Peer endpoint error: {}", peer_message.error());
  129. break;
  130. }
  131. }
  132. };
  133. }
  134. template<typename LocalEndpoint, typename PeerEndpoint>
  135. struct AK::Formatter<IPC::Connection<LocalEndpoint, PeerEndpoint>> : Formatter<Core::Object> {
  136. };