ConnectionCache.cpp 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. * Copyright (c) 2021-2022, Ali Mohammad Pur <mpfard@serenityos.org>
  3. *
  4. * SPDX-License-Identifier: BSD-2-Clause
  5. */
  6. #include "ConnectionCache.h"
  7. #include <AK/Debug.h>
  8. #include <AK/Find.h>
  9. #include <LibCore/EventLoop.h>
  10. namespace RequestServer::ConnectionCache {
  11. Threading::RWLockProtected<HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<Core::TCPSocket, Core::Socket>>>>>> g_tcp_connection_cache {};
  12. Threading::RWLockProtected<HashMap<ConnectionKey, NonnullOwnPtr<Vector<NonnullOwnPtr<Connection<TLS::WolfTLS>>>>>> g_tls_connection_cache {};
  13. Threading::RWLockProtected<HashMap<ByteString, InferredServerProperties>> g_inferred_server_properties;
  14. void request_did_finish(URL::URL const& url, Core::Socket const* socket)
  15. {
  16. if (!socket) {
  17. dbgln("Request with a null socket finished for URL {}", url);
  18. return;
  19. }
  20. dbgln_if(REQUESTSERVER_DEBUG, "Request for {} finished", url);
  21. ConnectionKey partial_key { url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(), url.port_or_default() };
  22. auto fire_off_next_job = [&](auto& cache) {
  23. using CacheType = typename RemoveCVReference<decltype(cache)>::ProtectedType;
  24. auto [it, end] = cache.with_read_locked([&](auto const& cache) {
  25. struct Result {
  26. decltype(cache.begin()) it;
  27. decltype(cache.end()) end;
  28. };
  29. return Result {
  30. find_if(cache.begin(), cache.end(), [&](auto& connection) {
  31. return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port;
  32. }),
  33. cache.end(),
  34. };
  35. });
  36. if (it == end) {
  37. dbgln("Request for URL {} finished, but we don't own that!", url);
  38. return;
  39. }
  40. auto connection_it = it->value->find_if([&](auto& connection) { return connection->socket == socket; });
  41. if (connection_it.is_end()) {
  42. dbgln("Request for URL {} finished, but we don't have a socket for that!", url);
  43. return;
  44. }
  45. auto& connection = *connection_it;
  46. if constexpr (REQUESTSERVER_DEBUG) {
  47. connection->job_data->timing_info.performing_request = Duration::from_milliseconds(connection->job_data->timing_info.timer.elapsed_milliseconds());
  48. connection->job_data->timing_info.timer.start();
  49. }
  50. auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(partial_key.hostname); });
  51. if (!connection->socket->is_open())
  52. properties.requests_served_per_connection = min(properties.requests_served_per_connection, connection->max_queue_length + 1);
  53. if (connection->request_queue.with_read_locked([](auto const& queue) { return queue.is_empty(); })) {
  54. // Immediately mark the connection as finished, as new jobs will never be run if they are queued
  55. // before the deferred_invoke() below runs otherwise.
  56. connection->has_started = false;
  57. connection->socket->set_notifications_enabled(false);
  58. Core::deferred_invoke([&connection, &cache_entry = *it->value, key = it->key, &cache] {
  59. if (connection->has_started)
  60. return;
  61. connection->current_url = {};
  62. connection->job_data = {};
  63. connection->removal_timer->on_timeout = [ptr = connection.ptr(), &cache_entry, key = move(key), &cache]() mutable {
  64. Core::deferred_invoke([&, key = move(key), ptr] {
  65. if (ptr->has_started)
  66. return;
  67. dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket.ptr());
  68. cache.with_write_locked([&](CacheType& cache) {
  69. auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; });
  70. VERIFY(did_remove);
  71. if (cache_entry.is_empty())
  72. cache.remove(key);
  73. });
  74. });
  75. };
  76. connection->removal_timer->start();
  77. });
  78. } else {
  79. auto timer = Core::ElapsedTimer::start_new();
  80. if (auto result = recreate_socket_if_needed(*connection, url); result.is_error()) {
  81. if constexpr (REQUESTSERVER_DEBUG) {
  82. connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds());
  83. }
  84. cache.with_read_locked([&](auto&) {
  85. dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error());
  86. connection->job_data->fail(Core::NetworkJob::Error::ConnectionFailed);
  87. });
  88. return;
  89. }
  90. if constexpr (REQUESTSERVER_DEBUG) {
  91. connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds());
  92. }
  93. connection->has_started = true;
  94. Core::deferred_invoke([&connection = *connection, url, &cache] {
  95. cache.with_read_locked([&](auto&) {
  96. dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection);
  97. connection.timer.start();
  98. connection.current_url = url;
  99. connection.job_data = connection.request_queue.with_write_locked([](auto& queue) { return queue.take_first(); });
  100. if constexpr (REQUESTSERVER_DEBUG) {
  101. connection.job_data->timing_info.waiting_in_queue = Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds() - connection.job_data->timing_info.performing_request.to_milliseconds());
  102. connection.job_data->timing_info.timer.start();
  103. }
  104. connection.socket->set_notifications_enabled(true);
  105. connection.job_data->start(*connection.socket);
  106. });
  107. });
  108. }
  109. };
  110. if (is<Core::BufferedSocket<TLS::WolfTLS>>(socket))
  111. fire_off_next_job(g_tls_connection_cache);
  112. else if (is<Core::BufferedSocket<Core::Socket>>(socket))
  113. fire_off_next_job(g_tcp_connection_cache);
  114. else
  115. dbgln("Unknown socket {} finished for URL {}", socket, url);
  116. }
  117. void dump_jobs()
  118. {
  119. g_tls_connection_cache.with_read_locked([](auto& cache) {
  120. dbgln("=========== TLS Connection Cache ==========");
  121. for (auto& connection : cache) {
  122. dbgln(" - {}:{}", connection.key.hostname, connection.key.port);
  123. for (auto& entry : *connection.value) {
  124. dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr());
  125. dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0);
  126. dbgln(" Request Queue:");
  127. entry->request_queue.for_each_locked([](auto const& job) {
  128. dbgln(" - {}", &job);
  129. });
  130. }
  131. }
  132. });
  133. g_tcp_connection_cache.with_read_locked([](auto& cache) {
  134. dbgln("=========== TCP Connection Cache ==========");
  135. for (auto& connection : cache) {
  136. dbgln(" - {}:{}", connection.key.hostname, connection.key.port);
  137. for (auto& entry : *connection.value) {
  138. dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr());
  139. dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0);
  140. dbgln(" Request Queue:");
  141. entry->request_queue.for_each_locked([](auto const& job) {
  142. dbgln(" - {}", &job);
  143. });
  144. }
  145. }
  146. });
  147. }
  // Namespace-scope counters, zero on start-up. Nothing in this file updates them.
  // NOTE(review): presumably connection-cache hit/miss statistics maintained elsewhere; confirm.
  size_t hits { 0 };
  size_t misses { 0 };
  150. }