Connection.h 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. /*
  2. * Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
  3. * Copyright (c) 2022, the SerenityOS developers.
  4. *
  5. * SPDX-License-Identifier: BSD-2-Clause
  6. */
  7. #pragma once
  8. #include <AK/ByteBuffer.h>
  9. #include <AK/NonnullOwnPtrVector.h>
  10. #include <AK/Try.h>
  11. #include <LibCore/Event.h>
  12. #include <LibCore/EventLoop.h>
  13. #include <LibCore/Notifier.h>
  14. #include <LibCore/Stream.h>
  15. #include <LibCore/Timer.h>
  16. #include <LibIPC/Forward.h>
  17. #include <LibIPC/Message.h>
  18. #include <errno.h>
  19. #include <stdint.h>
  20. #include <stdio.h>
  21. #include <stdlib.h>
  22. #include <sys/socket.h>
  23. #include <sys/types.h>
  24. #include <unistd.h>
  25. namespace IPC {
  26. class ConnectionBase : public Core::Object {
  27. C_OBJECT_ABSTRACT(ConnectionBase);
  28. public:
  29. virtual ~ConnectionBase() override = default;
  30. bool is_open() const { return m_socket->is_open(); }
  31. ErrorOr<void> post_message(Message const&);
  32. void shutdown();
  33. virtual void die() { }
  34. protected:
  35. explicit ConnectionBase(IPC::Stub&, NonnullOwnPtr<Core::Stream::LocalSocket>, u32 local_endpoint_magic);
  36. Core::Stream::LocalSocket& socket() { return *m_socket; }
  37. virtual void may_have_become_unresponsive() { }
  38. virtual void did_become_responsive() { }
  39. virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) = 0;
  40. OwnPtr<IPC::Message> wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
  41. void wait_for_socket_to_become_readable();
  42. ErrorOr<Vector<u8>> read_as_much_as_possible_from_socket_without_blocking();
  43. ErrorOr<void> drain_messages_from_peer();
  44. ErrorOr<void> post_message(MessageBuffer);
  45. void handle_messages();
  46. IPC::Stub& m_local_stub;
  47. NonnullOwnPtr<Core::Stream::LocalSocket> m_socket;
  48. RefPtr<Core::Timer> m_responsiveness_timer;
  49. NonnullOwnPtrVector<Message> m_unprocessed_messages;
  50. ByteBuffer m_unprocessed_bytes;
  51. u32 m_local_endpoint_magic { 0 };
  52. };
  53. template<typename LocalEndpoint, typename PeerEndpoint>
  54. class Connection : public ConnectionBase {
  55. public:
  56. Connection(IPC::Stub& local_stub, NonnullOwnPtr<Core::Stream::LocalSocket> socket)
  57. : ConnectionBase(local_stub, move(socket), LocalEndpoint::static_magic())
  58. {
  59. m_socket->on_ready_to_read = [this] {
  60. NonnullRefPtr protect = *this;
  61. // FIXME: Do something about errors.
  62. (void)drain_messages_from_peer();
  63. handle_messages();
  64. };
  65. }
  66. template<typename MessageType>
  67. OwnPtr<MessageType> wait_for_specific_message()
  68. {
  69. return wait_for_specific_endpoint_message<MessageType, LocalEndpoint>();
  70. }
  71. template<typename RequestType, typename... Args>
  72. NonnullOwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
  73. {
  74. MUST(post_message(RequestType(forward<Args>(args)...)));
  75. auto response = wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
  76. VERIFY(response);
  77. return response.release_nonnull();
  78. }
  79. template<typename RequestType, typename... Args>
  80. OwnPtr<typename RequestType::ResponseType> send_sync_but_allow_failure(Args&&... args)
  81. {
  82. if (post_message(RequestType(forward<Args>(args)...)).is_error())
  83. return nullptr;
  84. return wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
  85. }
  86. protected:
  87. template<typename MessageType, typename Endpoint>
  88. OwnPtr<MessageType> wait_for_specific_endpoint_message()
  89. {
  90. if (auto message = wait_for_specific_endpoint_message_impl(Endpoint::static_magic(), MessageType::static_message_id()))
  91. return message.template release_nonnull<MessageType>();
  92. return {};
  93. }
  94. virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) override
  95. {
  96. u32 message_size = 0;
  97. for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
  98. memcpy(&message_size, bytes.data() + index, sizeof(message_size));
  99. if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
  100. break;
  101. index += sizeof(message_size);
  102. auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size };
  103. if (auto message = LocalEndpoint::decode_message(remaining_bytes, *m_socket)) {
  104. m_unprocessed_messages.append(message.release_nonnull());
  105. } else if (auto message = PeerEndpoint::decode_message(remaining_bytes, *m_socket)) {
  106. m_unprocessed_messages.append(message.release_nonnull());
  107. } else {
  108. dbgln("Failed to parse a message");
  109. break;
  110. }
  111. }
  112. }
  113. };
  114. }
  115. template<typename LocalEndpoint, typename PeerEndpoint>
  116. struct AK::Formatter<IPC::Connection<LocalEndpoint, PeerEndpoint>> : Formatter<Core::Object> {
  117. };