CoreIPCServer.h 11 KB


  1. #pragma once
  2. #include <AK/Queue.h>
  3. #include <LibCore/CEvent.h>
  4. #include <LibCore/CEventLoop.h>
  5. #include <LibCore/CIODevice.h>
  6. #include <LibCore/CLocalSocket.h>
  7. #include <LibCore/CNotifier.h>
  8. #include <LibCore/CObject.h>
  9. #include <LibIPC/IEndpoint.h>
  10. #include <LibIPC/IMessage.h>
  11. #include <errno.h>
  12. #include <sched.h>
  13. #include <stdio.h>
  14. #include <sys/socket.h>
  15. #include <sys/types.h>
  16. #include <sys/uio.h>
  17. #include <unistd.h>
  18. //#define CIPC_DEBUG
  19. namespace IPC {
  20. namespace Server {
  21. class Event : public CEvent {
  22. public:
  23. enum Type {
  24. Invalid = 2000,
  25. Disconnected,
  26. };
  27. Event() {}
  28. explicit Event(Type type)
  29. : CEvent(type)
  30. {
  31. }
  32. };
  33. class DisconnectedEvent : public Event {
  34. public:
  35. explicit DisconnectedEvent(int client_id)
  36. : Event(Disconnected)
  37. , m_client_id(client_id)
  38. {
  39. }
  40. int client_id() const { return m_client_id; }
  41. private:
  42. int m_client_id { 0 };
  43. };
  44. template<typename T, class... Args>
  45. NonnullRefPtr<T> new_connection_for_client(Args&&... args)
  46. {
  47. auto conn = T::construct(forward<Args>(args)...);
  48. conn->send_greeting();
  49. return conn;
  50. }
  51. template<typename T, class... Args>
  52. NonnullRefPtr<T> new_connection_ng_for_client(Args&&... args)
  53. {
  54. return T::construct(forward<Args>(args)...) /* arghs */;
  55. }
  56. template<typename ServerMessage, typename ClientMessage>
  57. class Connection : public CObject {
  58. protected:
  59. Connection(CLocalSocket& socket, int client_id)
  60. : m_socket(socket)
  61. , m_client_id(client_id)
  62. {
  63. add_child(socket);
  64. m_socket->on_ready_to_read = [this] {
  65. drain_client();
  66. flush_outgoing_messages();
  67. };
  68. #if defined(CIPC_DEBUG)
  69. dbg() << "S: Created new Connection " << fd << client_id << " and said hello";
  70. #endif
  71. }
  72. public:
  73. ~Connection()
  74. {
  75. #if defined(CIPC_DEBUG)
  76. dbg() << "S: Destroyed Connection " << m_socket->fd() << client_id();
  77. #endif
  78. }
  79. void post_message(const ServerMessage& message, const ByteBuffer& extra_data = {})
  80. {
  81. #if defined(CIPC_DEBUG)
  82. dbg() << "S: -> C " << int(message.type) << " extra " << extra_data.size();
  83. #endif
  84. flush_outgoing_messages();
  85. if (try_send_message(message, extra_data))
  86. return;
  87. if (m_queue.size() >= max_queued_messages) {
  88. dbg() << "Connection::post_message: Client has too many queued messages already, disconnecting it.";
  89. shutdown();
  90. return;
  91. }
  92. QueuedMessage queued_message { message, extra_data };
  93. if (!extra_data.is_empty())
  94. queued_message.message.extra_size = extra_data.size();
  95. m_queue.enqueue(move(queued_message));
  96. }
  97. bool try_send_message(const ServerMessage& message, const ByteBuffer& extra_data)
  98. {
  99. struct iovec iov[2];
  100. int iov_count = 1;
  101. iov[0].iov_base = const_cast<ServerMessage*>(&message);
  102. iov[0].iov_len = sizeof(message);
  103. if (!extra_data.is_empty()) {
  104. iov[1].iov_base = const_cast<u8*>(extra_data.data());
  105. iov[1].iov_len = extra_data.size();
  106. ++iov_count;
  107. }
  108. int nwritten = writev(m_socket->fd(), iov, iov_count);
  109. if (nwritten < 0) {
  110. switch (errno) {
  111. case EPIPE:
  112. dbgprintf("Connection::post_message: Disconnected from peer.\n");
  113. shutdown();
  114. return false;
  115. case EAGAIN:
  116. #ifdef CIPC_DEBUG
  117. dbg() << "EAGAIN when trying to send WindowServer message, queue size: " << m_queue.size();
  118. #endif
  119. return false;
  120. default:
  121. perror("Connection::post_message writev");
  122. ASSERT_NOT_REACHED();
  123. }
  124. }
  125. ASSERT(nwritten == (int)(sizeof(message) + extra_data.size()));
  126. return true;
  127. }
  128. void flush_outgoing_messages()
  129. {
  130. while (!m_queue.is_empty()) {
  131. auto& queued_message = m_queue.head();
  132. if (!try_send_message(queued_message.message, queued_message.extra_data))
  133. break;
  134. m_queue.dequeue();
  135. }
  136. }
  137. void drain_client()
  138. {
  139. unsigned messages_received = 0;
  140. for (;;) {
  141. ClientMessage message;
  142. // FIXME: Don't go one message at a time, that's so much context switching, oof.
  143. ssize_t nread = recv(m_socket->fd(), &message, sizeof(ClientMessage), MSG_DONTWAIT);
  144. if (nread == 0 || (nread == -1 && errno == EAGAIN)) {
  145. if (!messages_received) {
  146. CEventLoop::current().post_event(*this, make<DisconnectedEvent>(client_id()));
  147. }
  148. break;
  149. }
  150. if (nread < 0) {
  151. perror("recv");
  152. ASSERT_NOT_REACHED();
  153. }
  154. ByteBuffer extra_data;
  155. if (message.extra_size) {
  156. if (message.extra_size >= 32768) {
  157. dbgprintf("message.extra_size is way too large\n");
  158. return did_misbehave();
  159. }
  160. extra_data = ByteBuffer::create_uninitialized(message.extra_size);
  161. // FIXME: We should allow this to time out. Maybe use a socket timeout?
  162. int extra_nread = read(m_socket->fd(), extra_data.data(), extra_data.size());
  163. if (extra_nread != (int)message.extra_size) {
  164. dbgprintf("extra_nread(%d) != extra_size(%d)\n", extra_nread, extra_data.size());
  165. if (extra_nread < 0)
  166. perror("read");
  167. return did_misbehave();
  168. }
  169. }
  170. #if defined(CIPC_DEBUG)
  171. dbg() << "S: <- C " << int(message.type) << " extra " << extra_data.size();
  172. #endif
  173. if (!handle_message(message, move(extra_data)))
  174. return;
  175. ++messages_received;
  176. }
  177. }
  178. void did_misbehave()
  179. {
  180. dbgprintf("Connection{%p} (id=%d, pid=%d) misbehaved, disconnecting.\n", this, client_id(), m_client_pid);
  181. shutdown();
  182. }
  183. void shutdown()
  184. {
  185. m_socket->close();
  186. die();
  187. }
  188. int client_id() const { return m_client_id; }
  189. pid_t client_pid() const { return m_client_pid; }
  190. void set_client_pid(pid_t pid) { m_client_pid = pid; }
  191. // ### having this public is sad
  192. virtual void send_greeting() = 0;
  193. virtual void die() = 0;
  194. protected:
  195. void event(CEvent& event)
  196. {
  197. if (event.type() == Event::Disconnected) {
  198. int client_id = static_cast<const DisconnectedEvent&>(event).client_id();
  199. dbgprintf("Connection: Client disconnected: %d\n", client_id);
  200. die();
  201. return;
  202. }
  203. CObject::event(event);
  204. }
  205. virtual bool handle_message(const ClientMessage&, const ByteBuffer&& = {}) = 0;
  206. private:
  207. RefPtr<CLocalSocket> m_socket;
  208. struct QueuedMessage {
  209. ServerMessage message;
  210. ByteBuffer extra_data;
  211. };
  212. static const int max_queued_messages = 200;
  213. Queue<QueuedMessage, 16> m_queue;
  214. int m_client_id { -1 };
  215. int m_client_pid { -1 };
  216. };
  217. template<typename Endpoint>
  218. class ConnectionNG : public CObject {
  219. public:
  220. ConnectionNG(Endpoint& endpoint, CLocalSocket& socket, int client_id)
  221. : m_endpoint(endpoint)
  222. , m_socket(socket)
  223. , m_client_id(client_id)
  224. {
  225. add_child(socket);
  226. m_socket->on_ready_to_read = [this] { drain_client(); };
  227. }
  228. virtual ~ConnectionNG() override
  229. {
  230. }
  231. void post_message(const IMessage& message)
  232. {
  233. auto buffer = message.encode();
  234. int nwritten = write(m_socket->fd(), buffer.data(), (size_t)buffer.size());
  235. if (nwritten < 0) {
  236. switch (errno) {
  237. case EPIPE:
  238. dbg() << "Connection::post_message: Disconnected from peer";
  239. shutdown();
  240. return;
  241. case EAGAIN:
  242. dbg() << "Connection::post_message: Client buffer overflowed.";
  243. did_misbehave();
  244. return;
  245. default:
  246. perror("Connection::post_message write");
  247. ASSERT_NOT_REACHED();
  248. }
  249. }
  250. ASSERT(nwritten == buffer.size());
  251. }
  252. void drain_client()
  253. {
  254. unsigned messages_received = 0;
  255. for (;;) {
  256. u8 buffer[4096];
  257. ssize_t nread = recv(m_socket->fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
  258. if (nread == 0 || (nread == -1 && errno == EAGAIN)) {
  259. if (!messages_received) {
  260. CEventLoop::current().post_event(*this, make<DisconnectedEvent>(client_id()));
  261. }
  262. break;
  263. }
  264. if (nread < 0) {
  265. perror("recv");
  266. ASSERT_NOT_REACHED();
  267. }
  268. auto message = m_endpoint.decode_message(ByteBuffer::wrap(buffer, nread));
  269. if (!message) {
  270. dbg() << "drain_client: Endpoint didn't recognize message";
  271. did_misbehave();
  272. return;
  273. }
  274. ++messages_received;
  275. auto response = m_endpoint.handle(*message);
  276. if (response)
  277. post_message(*response);
  278. }
  279. }
  280. void did_misbehave()
  281. {
  282. dbg() << "Connection{" << this << "} (id=" << m_client_id << ", pid=" << m_client_pid << ") misbehaved, disconnecting.";
  283. shutdown();
  284. }
  285. void shutdown()
  286. {
  287. m_socket->close();
  288. die();
  289. }
  290. int client_id() const { return m_client_id; }
  291. pid_t client_pid() const { return m_client_pid; }
  292. void set_client_pid(pid_t pid) { m_client_pid = pid; }
  293. virtual void die() = 0;
  294. protected:
  295. void event(CEvent& event) override
  296. {
  297. if (event.type() == Event::Disconnected) {
  298. int client_id = static_cast<const DisconnectedEvent&>(event).client_id();
  299. dbgprintf("Connection: Client disconnected: %d\n", client_id);
  300. die();
  301. return;
  302. }
  303. CObject::event(event);
  304. }
  305. private:
  306. Endpoint& m_endpoint;
  307. RefPtr<CLocalSocket> m_socket;
  308. int m_client_id { -1 };
  309. int m_client_pid { -1 };
  310. };
  311. } // Server
  312. } // IPC