|
@@ -7,6 +7,7 @@
|
|
|
#include <LibCore/CSyscallUtils.h>
|
|
|
#include <LibIPC/IMessage.h>
|
|
|
#include <stdio.h>
|
|
|
+#include <stdlib.h>
|
|
|
#include <sys/select.h>
|
|
|
#include <sys/socket.h>
|
|
|
#include <sys/types.h>
|
|
@@ -49,19 +50,19 @@ namespace Client {
|
|
|
class Connection : public CObject {
|
|
|
public:
|
|
|
Connection(const StringView& address)
|
|
|
- : m_connection(this)
|
|
|
- , m_notifier(CNotifier::create(m_connection.fd(), CNotifier::Read, this))
|
|
|
+ : m_connection(CLocalSocket::construct(this))
|
|
|
+ , m_notifier(CNotifier::create(m_connection->fd(), CNotifier::Read, this))
|
|
|
{
|
|
|
// We want to rate-limit our clients
|
|
|
- m_connection.set_blocking(true);
|
|
|
+ m_connection->set_blocking(true);
|
|
|
m_notifier->on_ready_to_read = [this] {
|
|
|
drain_messages_from_server();
|
|
|
- CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
|
|
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection->fd()));
|
|
|
};
|
|
|
|
|
|
int retries = 1000;
|
|
|
while (retries) {
|
|
|
- if (m_connection.connect(CSocketAddress::local(address))) {
|
|
|
+ if (m_connection->connect(CSocketAddress::local(address))) {
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -69,7 +70,7 @@ namespace Client {
|
|
|
sleep(1);
|
|
|
--retries;
|
|
|
}
|
|
|
- ASSERT(m_connection.is_connected());
|
|
|
+ ASSERT(m_connection->is_connected());
|
|
|
}
|
|
|
|
|
|
virtual void handshake() = 0;
|
|
@@ -97,20 +98,20 @@ namespace Client {
|
|
|
if (m_unprocessed_bundles[i].message.type == type) {
|
|
|
event = move(m_unprocessed_bundles[i].message);
|
|
|
m_unprocessed_bundles.remove(i);
|
|
|
- CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
|
|
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection->fd()));
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
for (;;) {
|
|
|
fd_set rfds;
|
|
|
FD_ZERO(&rfds);
|
|
|
- FD_SET(m_connection.fd(), &rfds);
|
|
|
- int rc = CSyscallUtils::safe_syscall(select, m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr);
|
|
|
+ FD_SET(m_connection->fd(), &rfds);
|
|
|
+ int rc = CSyscallUtils::safe_syscall(select, m_connection->fd() + 1, &rfds, nullptr, nullptr, nullptr);
|
|
|
if (rc < 0) {
|
|
|
perror("select");
|
|
|
}
|
|
|
ASSERT(rc > 0);
|
|
|
- ASSERT(FD_ISSET(m_connection.fd(), &rfds));
|
|
|
+ ASSERT(FD_ISSET(m_connection->fd(), &rfds));
|
|
|
bool success = drain_messages_from_server();
|
|
|
if (!success)
|
|
|
return false;
|
|
@@ -118,7 +119,7 @@ namespace Client {
|
|
|
if (m_unprocessed_bundles[i].message.type == type) {
|
|
|
event = move(m_unprocessed_bundles[i].message);
|
|
|
m_unprocessed_bundles.remove(i);
|
|
|
- CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
|
|
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection->fd()));
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
@@ -144,7 +145,7 @@ namespace Client {
|
|
|
++iov_count;
|
|
|
}
|
|
|
|
|
|
- int nwritten = writev(m_connection.fd(), iov, iov_count);
|
|
|
+ int nwritten = writev(m_connection->fd(), iov, iov_count);
|
|
|
if (nwritten < 0) {
|
|
|
perror("writev");
|
|
|
ASSERT_NOT_REACHED();
|
|
@@ -196,7 +197,7 @@ namespace Client {
|
|
|
{
|
|
|
for (;;) {
|
|
|
ServerMessage message;
|
|
|
- ssize_t nread = recv(m_connection.fd(), &message, sizeof(ServerMessage), MSG_DONTWAIT);
|
|
|
+ ssize_t nread = recv(m_connection->fd(), &message, sizeof(ServerMessage), MSG_DONTWAIT);
|
|
|
if (nread < 0) {
|
|
|
if (errno == EAGAIN) {
|
|
|
return true;
|
|
@@ -214,7 +215,7 @@ namespace Client {
|
|
|
ByteBuffer extra_data;
|
|
|
if (message.extra_size) {
|
|
|
extra_data = ByteBuffer::create_uninitialized(message.extra_size);
|
|
|
- int extra_nread = read(m_connection.fd(), extra_data.data(), extra_data.size());
|
|
|
+ int extra_nread = read(m_connection->fd(), extra_data.data(), extra_data.size());
|
|
|
if (extra_nread < 0) {
|
|
|
perror("read");
|
|
|
ASSERT_NOT_REACHED();
|
|
@@ -228,7 +229,7 @@ namespace Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- CLocalSocket m_connection;
|
|
|
+ ObjectPtr<CLocalSocket> m_connection;
|
|
|
ObjectPtr<CNotifier> m_notifier;
|
|
|
Vector<IncomingMessageBundle> m_unprocessed_bundles;
|
|
|
int m_server_pid { -1 };
|
|
@@ -239,19 +240,19 @@ namespace Client {
|
|
|
class ConnectionNG : public CObject {
|
|
|
public:
|
|
|
ConnectionNG(const StringView& address)
|
|
|
- : m_connection(this)
|
|
|
- , m_notifier(CNotifier::create(m_connection.fd(), CNotifier::Read, this))
|
|
|
+ : m_connection(CLocalSocket::construct(this))
|
|
|
+ , m_notifier(CNotifier::create(m_connection->fd(), CNotifier::Read, this))
|
|
|
{
|
|
|
// We want to rate-limit our clients
|
|
|
- m_connection.set_blocking(true);
|
|
|
+ m_connection->set_blocking(true);
|
|
|
m_notifier->on_ready_to_read = [this] {
|
|
|
drain_messages_from_server();
|
|
|
- CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
|
|
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection->fd()));
|
|
|
};
|
|
|
|
|
|
int retries = 1000;
|
|
|
while (retries) {
|
|
|
- if (m_connection.connect(CSocketAddress::local(address))) {
|
|
|
+ if (m_connection->connect(CSocketAddress::local(address))) {
|
|
|
break;
|
|
|
}
|
|
|
|
|
@@ -259,7 +260,7 @@ namespace Client {
|
|
|
sleep(1);
|
|
|
--retries;
|
|
|
}
|
|
|
- ASSERT(m_connection.is_connected());
|
|
|
+ ASSERT(m_connection->is_connected());
|
|
|
}
|
|
|
|
|
|
virtual void handshake() = 0;
|
|
@@ -287,20 +288,20 @@ namespace Client {
|
|
|
if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) {
|
|
|
auto message = move(m_unprocessed_messages[i]);
|
|
|
m_unprocessed_messages.remove(i);
|
|
|
- CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
|
|
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection->fd()));
|
|
|
return message;
|
|
|
}
|
|
|
}
|
|
|
for (;;) {
|
|
|
fd_set rfds;
|
|
|
FD_ZERO(&rfds);
|
|
|
- FD_SET(m_connection.fd(), &rfds);
|
|
|
- int rc = CSyscallUtils::safe_syscall(select, m_connection.fd() + 1, &rfds, nullptr, nullptr, nullptr);
|
|
|
+ FD_SET(m_connection->fd(), &rfds);
|
|
|
+ int rc = CSyscallUtils::safe_syscall(select, m_connection->fd() + 1, &rfds, nullptr, nullptr, nullptr);
|
|
|
if (rc < 0) {
|
|
|
perror("select");
|
|
|
}
|
|
|
ASSERT(rc > 0);
|
|
|
- ASSERT(FD_ISSET(m_connection.fd(), &rfds));
|
|
|
+ ASSERT(FD_ISSET(m_connection->fd(), &rfds));
|
|
|
bool success = drain_messages_from_server();
|
|
|
if (!success)
|
|
|
return nullptr;
|
|
@@ -308,7 +309,7 @@ namespace Client {
|
|
|
if (m_unprocessed_messages[i]->id() == MessageType::static_message_id()) {
|
|
|
auto message = move(m_unprocessed_messages[i]);
|
|
|
m_unprocessed_messages.remove(i);
|
|
|
- CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection.fd()));
|
|
|
+ CEventLoop::current().post_event(*this, make<PostProcessEvent>(m_connection->fd()));
|
|
|
return message;
|
|
|
}
|
|
|
}
|
|
@@ -318,7 +319,7 @@ namespace Client {
|
|
|
bool post_message_to_server(const IMessage& message)
|
|
|
{
|
|
|
auto buffer = message.encode();
|
|
|
- int nwritten = write(m_connection.fd(), buffer.data(), (size_t)buffer.size());
|
|
|
+ int nwritten = write(m_connection->fd(), buffer.data(), (size_t)buffer.size());
|
|
|
if (nwritten < 0) {
|
|
|
perror("write");
|
|
|
ASSERT_NOT_REACHED();
|
|
@@ -349,7 +350,7 @@ namespace Client {
|
|
|
{
|
|
|
for (;;) {
|
|
|
u8 buffer[4096];
|
|
|
- ssize_t nread = recv(m_connection.fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
|
|
|
+ ssize_t nread = recv(m_connection->fd(), buffer, sizeof(buffer), MSG_DONTWAIT);
|
|
|
if (nread < 0) {
|
|
|
if (errno == EAGAIN) {
|
|
|
return true;
|
|
@@ -371,7 +372,7 @@ namespace Client {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- CLocalSocket m_connection;
|
|
|
+ ObjectPtr<CLocalSocket> m_connection;
|
|
|
ObjectPtr<CNotifier> m_notifier;
|
|
|
Vector<OwnPtr<IMessage>> m_unprocessed_messages;
|
|
|
int m_server_pid { -1 };
|