Connection.h 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. /*
  2. * Copyright (c) 2018-2020, Andreas Kling <kling@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #pragma once
  7. #include <AK/ByteBuffer.h>
  8. #include <AK/NonnullOwnPtrVector.h>
  9. #include <AK/Result.h>
  10. #include <AK/Try.h>
  11. #include <LibCore/Event.h>
  12. #include <LibCore/EventLoop.h>
  13. #include <LibCore/LocalSocket.h>
  14. #include <LibCore/Notifier.h>
  15. #include <LibCore/Timer.h>
  16. #include <LibIPC/Forward.h>
  17. #include <LibIPC/Message.h>
  18. #include <errno.h>
  19. #include <stdint.h>
  20. #include <stdio.h>
  21. #include <stdlib.h>
  22. #include <sys/socket.h>
  23. #include <sys/types.h>
  24. #include <unistd.h>
  25. namespace IPC {
  26. class ConnectionBase : public Core::Object {
  27. C_OBJECT_ABSTRACT(ConnectionBase);
  28. public:
  29. virtual ~ConnectionBase() override;
  30. bool is_open() const { return m_socket->is_open(); }
  31. void post_message(Message const&);
  32. void shutdown();
  33. virtual void die() { }
  34. protected:
  35. explicit ConnectionBase(IPC::Stub&, NonnullRefPtr<Core::LocalSocket>, u32 local_endpoint_magic);
  36. Core::LocalSocket& socket() { return *m_socket; }
  37. virtual void may_have_become_unresponsive() { }
  38. virtual void did_become_responsive() { }
  39. virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) = 0;
  40. OwnPtr<IPC::Message> wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
  41. void wait_for_socket_to_become_readable();
  42. Result<Vector<u8>, bool> read_as_much_as_possible_from_socket_without_blocking();
  43. bool drain_messages_from_peer();
  44. void post_message(MessageBuffer);
  45. void handle_messages();
  46. IPC::Stub& m_local_stub;
  47. NonnullRefPtr<Core::LocalSocket> m_socket;
  48. RefPtr<Core::Timer> m_responsiveness_timer;
  49. RefPtr<Core::Notifier> m_notifier;
  50. NonnullOwnPtrVector<Message> m_unprocessed_messages;
  51. ByteBuffer m_unprocessed_bytes;
  52. u32 m_local_endpoint_magic { 0 };
  53. };
  54. template<typename LocalEndpoint, typename PeerEndpoint>
  55. class Connection : public ConnectionBase {
  56. public:
  57. Connection(IPC::Stub& local_stub, NonnullRefPtr<Core::LocalSocket> socket)
  58. : ConnectionBase(local_stub, move(socket), LocalEndpoint::static_magic())
  59. {
  60. m_notifier->on_ready_to_read = [this] {
  61. NonnullRefPtr protect = *this;
  62. drain_messages_from_peer();
  63. handle_messages();
  64. };
  65. }
  66. template<typename MessageType>
  67. OwnPtr<MessageType> wait_for_specific_message()
  68. {
  69. return wait_for_specific_endpoint_message<MessageType, LocalEndpoint>();
  70. }
  71. template<typename RequestType, typename... Args>
  72. NonnullOwnPtr<typename RequestType::ResponseType> send_sync(Args&&... args)
  73. {
  74. post_message(RequestType(forward<Args>(args)...));
  75. auto response = wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
  76. VERIFY(response);
  77. return response.release_nonnull();
  78. }
  79. template<typename RequestType, typename... Args>
  80. OwnPtr<typename RequestType::ResponseType> send_sync_but_allow_failure(Args&&... args)
  81. {
  82. post_message(RequestType(forward<Args>(args)...));
  83. return wait_for_specific_endpoint_message<typename RequestType::ResponseType, PeerEndpoint>();
  84. }
  85. protected:
  86. template<typename MessageType, typename Endpoint>
  87. OwnPtr<MessageType> wait_for_specific_endpoint_message()
  88. {
  89. if (auto message = wait_for_specific_endpoint_message_impl(Endpoint::static_magic(), MessageType::static_message_id()))
  90. return message.template release_nonnull<MessageType>();
  91. return {};
  92. }
  93. virtual void try_parse_messages(Vector<u8> const& bytes, size_t& index) override
  94. {
  95. u32 message_size = 0;
  96. for (; index + sizeof(message_size) < bytes.size(); index += message_size) {
  97. memcpy(&message_size, bytes.data() + index, sizeof(message_size));
  98. if (message_size == 0 || bytes.size() - index - sizeof(uint32_t) < message_size)
  99. break;
  100. index += sizeof(message_size);
  101. auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size };
  102. if (auto message = LocalEndpoint::decode_message(remaining_bytes, m_socket->fd())) {
  103. m_unprocessed_messages.append(message.release_nonnull());
  104. } else if (auto message = PeerEndpoint::decode_message(remaining_bytes, m_socket->fd())) {
  105. m_unprocessed_messages.append(message.release_nonnull());
  106. } else {
  107. dbgln("Failed to parse a message");
  108. break;
  109. }
  110. }
  111. }
  112. };
  113. }
  114. template<typename LocalEndpoint, typename PeerEndpoint>
  115. struct AK::Formatter<IPC::Connection<LocalEndpoint, PeerEndpoint>> : Formatter<Core::Object> {
  116. };