2021-10-23 19:48:42 +00:00
/*
* Copyright (c) 2021, Andreas Kling <kling@serenityos.org>
2022-03-03 18:37:49 +00:00
* Copyright (c) 2022, the SerenityOS developers.
2021-10-23 19:48:42 +00:00
*
* SPDX-License-Identifier: BSD-2-Clause
*/
2021-11-28 08:59:36 +00:00
# include <LibCore/System.h>
2021-10-23 19:48:42 +00:00
# include <LibIPC/Connection.h>
# include <LibIPC/Stub.h>
2023-04-24 05:39:55 +00:00
# include <sched.h>
2021-10-23 19:48:42 +00:00
# include <sys/select.h>
namespace IPC {
2022-10-05 18:09:55 +00:00
struct CoreEventLoopDeferredInvoker final : public DeferredInvoker {
virtual ~ CoreEventLoopDeferredInvoker ( ) = default ;
virtual void schedule ( Function < void ( ) > callback ) override
{
Core : : deferred_invoke ( move ( callback ) ) ;
}
} ;
2023-02-08 22:05:44 +00:00
// Sets up the state for one end of an IPC connection.
//
// - local_stub: receives the incoming messages addressed to this endpoint.
// - socket: the connection takes ownership of it.
// - local_endpoint_magic: identifies which incoming messages belong to local_stub.
//
// A CoreEventLoopDeferredInvoker is installed as the initial deferred invoker,
// and a single-shot timer is created that calls may_have_become_unresponsive()
// when it fires.
ConnectionBase::ConnectionBase(IPC::Stub& local_stub, NonnullOwnPtr<Core::LocalSocket> socket, u32 local_endpoint_magic)
    : m_local_stub(local_stub)
    , m_socket(move(socket))
    , m_local_endpoint_magic(local_endpoint_magic)
    , m_deferred_invoker(make<CoreEventLoopDeferredInvoker>())
{
    constexpr int responsiveness_timeout_ms = 3000;
    auto on_timeout = [this] { may_have_become_unresponsive(); };
    // FIXME (carried over from the callee's name): timer creation errors are not propagated.
    m_responsiveness_timer = Core::Timer::create_single_shot(responsiveness_timeout_ms, move(on_timeout)).release_value_but_fixme_should_propagate_errors();
}
2022-10-05 18:09:55 +00:00
void ConnectionBase : : set_deferred_invoker ( NonnullOwnPtr < DeferredInvoker > deferred_invoker )
{
m_deferred_invoker = move ( deferred_invoker ) ;
}
2023-02-08 22:05:44 +00:00
void ConnectionBase : : set_fd_passing_socket ( NonnullOwnPtr < Core : : LocalSocket > socket )
2022-10-05 11:54:09 +00:00
{
m_fd_passing_socket = move ( socket ) ;
}
2023-02-08 22:05:44 +00:00
// Returns the socket that file descriptors should be sent over: the dedicated
// fd-passing socket when one has been set, otherwise the main socket.
Core::LocalSocket& ConnectionBase::fd_passing_socket()
{
    if (!m_fd_passing_socket)
        return *m_socket;
    return *m_fd_passing_socket;
}
2021-11-28 08:59:36 +00:00
// Encodes `message` and sends the resulting buffer to the peer.
// Returns the encoding error if encode() fails, otherwise whatever the
// MessageBuffer overload of post_message() returns.
ErrorOr<void> ConnectionBase::post_message(Message const& message)
{
    auto encoded_message = TRY(message.encode());
    return post_message(move(encoded_message));
}
2021-11-28 08:59:36 +00:00
// Sends an already-encoded message to the peer.
//
// The payload is prefixed with its size as a 32-bit integer (the raw bytes of a
// local uint32_t). Any file descriptors carried by the buffer are sent over
// fd_passing_socket() before the payload bytes are written to the main socket.
//
// Returns an error (without sending anything) if the socket is already closed
// or the payload does not fit the 32-bit size prefix. If sending a file
// descriptor fails, or a write fails with an errno-style error that is not
// retried, the connection is shut down via shutdown_with_error() and an error
// is returned. On success the responsiveness timer is started.
ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
{
    // NOTE: If this connection is being shut down, but has not yet been destroyed,
    //       the socket will be closed. Don't try to send more messages.
    if (!m_socket->is_open())
        return Error::from_string_literal(" Trying to post_message during IPC shutdown ");

    // The size prefix is only 32 bits wide. Reject payloads whose size would be
    // silently truncated by the narrowing conversion; a wrapped-around prefix
    // would corrupt the framing of the stream.
    auto const payload_size = buffer.data.size();
    uint32_t message_size = static_cast<uint32_t>(payload_size);
    if (message_size != payload_size)
        return Error::from_string_literal("IPC::Connection::post_message: Message too large for 32-bit size prefix");

    // Prepend the message size.
    TRY(buffer.data.try_prepend(reinterpret_cast<u8 const*>(&message_size), sizeof(message_size)));

    for (auto& fd : buffer.fds) {
        if (auto result = fd_passing_socket().send_fd(fd->value()); result.is_error()) {
            shutdown_with_error(result.error());
            return result;
        }
    }

    ReadonlyBytes bytes_to_write { buffer.data.span() };
    int writes_done = 0;
    size_t initial_size = bytes_to_write.size();
    while (!bytes_to_write.is_empty()) {
        auto maybe_nwritten = m_socket->write_some(bytes_to_write);
        // Every attempt counts, whether it succeeded or not.
        writes_done++;

        if (maybe_nwritten.is_error()) {
            auto error = maybe_nwritten.release_error();

            // Errors that do not carry an errno are passed through untouched,
            // without shutting the connection down.
            if (!error.is_errno())
                return error;

            // FIXME: This is a hacky way to at least not crash on large messages
            // The limit of 100 writes is arbitrary, and there to prevent indefinite spinning on the EventLoop
            if (error.code() == EAGAIN && writes_done < 100) {
                sched_yield();
                continue;
            }

            shutdown_with_error(error);
            switch (error.code()) {
            case EPIPE:
                return Error::from_string_literal(" IPC::Connection::post_message: Disconnected from peer ");
            case EAGAIN:
                return Error::from_string_literal(" IPC::Connection::post_message: Peer buffer overflowed ");
            default:
                return Error::from_syscall(" IPC::Connection::post_message write "sv, -error.code());
            }
        }

        // Drop the bytes that made it out and go around again for the rest.
        bytes_to_write = bytes_to_write.slice(maybe_nwritten.value());
    }

    if (writes_done > 1) {
        dbgln(" LibIPC::Connection FIXME Warning, needed {} writes needed to send message of size {}B, this is pretty bad, as it spins on the EventLoop ", writes_done, initial_size);
    }

    m_responsiveness_timer->start();
    return {};
}
void ConnectionBase : : shutdown ( )
{
m_socket - > close ( ) ;
die ( ) ;
}
2022-06-10 15:21:00 +00:00
void ConnectionBase : : shutdown_with_error ( Error const & error )
{
dbgln ( " IPC::ConnectionBase ({:p}) had an error ({}), disconnecting. " , this , error ) ;
shutdown ( ) ;
}
2021-10-23 20:59:04 +00:00
void ConnectionBase : : handle_messages ( )
2021-10-23 19:48:42 +00:00
{
auto messages = move ( m_unprocessed_messages ) ;
for ( auto & message : messages ) {
2023-03-06 16:16:25 +00:00
if ( message - > endpoint_magic ( ) = = m_local_endpoint_magic ) {
auto handler_result = m_local_stub . handle ( * message ) ;
2023-01-02 04:58:49 +00:00
if ( handler_result . is_error ( ) ) {
dbgln ( " IPC::ConnectionBase::handle_messages: {} " , handler_result . error ( ) ) ;
continue ;
}
if ( auto response = handler_result . release_value ( ) ) {
if ( auto post_result = post_message ( * response ) ; post_result . is_error ( ) ) {
dbgln ( " IPC::ConnectionBase::handle_messages: {} " , post_result . error ( ) ) ;
2021-11-28 08:59:36 +00:00
}
}
}
2021-10-23 19:48:42 +00:00
}
}
void ConnectionBase : : wait_for_socket_to_become_readable ( )
{
2022-01-14 13:12:49 +00:00
auto maybe_did_become_readable = m_socket - > can_read_without_blocking ( - 1 ) ;
if ( maybe_did_become_readable . is_error ( ) ) {
dbgln ( " ConnectionBase::wait_for_socket_to_become_readable: {} " , maybe_did_become_readable . error ( ) ) ;
warnln ( " ConnectionBase::wait_for_socket_to_become_readable: {} " , maybe_did_become_readable . error ( ) ) ;
VERIFY_NOT_REACHED ( ) ;
2021-10-23 19:48:42 +00:00
}
2022-01-14 13:12:49 +00:00
VERIFY ( maybe_did_become_readable . value ( ) ) ;
2021-10-23 19:48:42 +00:00
}
2021-11-07 10:59:43 +00:00
// Collects every byte currently available for this connection.
//
// The result starts with any bytes stashed in m_unprocessed_bytes (which is
// then cleared), followed by whatever can be read from the socket until the
// read reports EAGAIN, the socket closes, or the peer goes away.
//
// An empty read or ECONNRESET schedules a deferred shutdown(). If that happens
// and no bytes at all were collected, an error is returned instead of an empty
// vector. Any other read error is logged and treated as fatal.
//
// When bytes were collected, the responsiveness timer is stopped and
// did_become_responsive() is called.
ErrorOr<Vector<u8>> ConnectionBase::read_as_much_as_possible_from_socket_without_blocking()
{
    Vector<u8> bytes;
    if (!m_unprocessed_bytes.is_empty()) {
        bytes.append(m_unprocessed_bytes.data(), m_unprocessed_bytes.size());
        m_unprocessed_bytes.clear();
    }

    bool should_shut_down = false;
    // The shutdown itself goes through the deferred invoker; the strong
    // reference is held by the scheduled callback.
    auto schedule_shutdown = [this, &should_shut_down]() {
        should_shut_down = true;
        m_deferred_invoker->schedule([strong_this = NonnullRefPtr(*this)] {
            strong_this->shutdown();
        });
    };

    u8 buffer[4096];
    while (m_socket->is_open()) {
        auto read_result = m_socket->read_without_waiting({ buffer, 4096 });

        if (read_result.is_error()) {
            auto error = read_result.release_error();
            if (error.is_syscall()) {
                // Nothing more to read right now.
                if (error.code() == EAGAIN)
                    break;
                if (error.code() == ECONNRESET) {
                    schedule_shutdown();
                    break;
                }
            }

            dbgln(" ConnectionBase::read_as_much_as_possible_from_socket_without_blocking: {} ", error);
            warnln(" ConnectionBase::read_as_much_as_possible_from_socket_without_blocking: {} ", error);
            VERIFY_NOT_REACHED();
        }

        auto chunk = read_result.release_value();
        if (chunk.is_empty()) {
            // An empty read is handled as end-of-stream.
            schedule_shutdown();
            break;
        }

        bytes.append(chunk.data(), chunk.size());
    }

    if (bytes.is_empty()) {
        if (should_shut_down)
            return Error::from_string_literal(" IPC connection EOF ");
        return bytes;
    }

    m_responsiveness_timer->stop();
    did_become_responsive();
    return bytes;
}
2021-11-07 10:59:43 +00:00
// Reads everything currently available from the peer, hands it to
// try_parse_messages(), and schedules handle_messages() through the deferred
// invoker if any messages are queued afterwards.
//
// Bytes past `index` after parsing are stashed in m_unprocessed_bytes.
// Returns an error if reading fails, if copying the leftover bytes fails, or if
// leftover bytes are already stashed (in which case the connection is shut down).
ErrorOr<void> ConnectionBase::drain_messages_from_peer()
{
    auto bytes = TRY(read_as_much_as_possible_from_socket_without_blocking());

    size_t index = 0;
    try_parse_messages(bytes, index);

    bool const has_leftover_bytes = index < bytes.size();
    if (has_leftover_bytes) {
        // Sometimes we might receive a partial message. That's okay, just stash away
        // the unprocessed bytes and we'll prepend them to the next incoming message
        // in the next run of this function.
        auto remaining_bytes = TRY(ByteBuffer::copy(bytes.span().slice(index)));
        if (!m_unprocessed_bytes.is_empty()) {
            shutdown();
            return Error::from_string_literal(" drain_messages_from_peer: Already have unprocessed bytes ");
        }
        m_unprocessed_bytes = move(remaining_bytes);
    }

    if (m_unprocessed_messages.is_empty())
        return {};

    // The scheduled callback holds a strong reference to this connection.
    m_deferred_invoker->schedule([strong_this = NonnullRefPtr(*this)] {
        strong_this->handle_messages();
    });
    return {};
}
2021-10-23 20:59:04 +00:00
// Waits for a message with the given endpoint magic and message id.
//
// Returns the first matching message, removing it from the unprocessed queue.
// Returns null if the socket is closed or draining messages from the peer
// fails before a match shows up.
OwnPtr<IPC::Message> ConnectionBase::wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id)
{
    while (true) {
        // Double check we don't already have the event waiting for us.
        // Otherwise we might end up blocked for a while for no reason.
        for (size_t i = 0; i < m_unprocessed_messages.size(); ++i) {
            auto& candidate = m_unprocessed_messages[i];
            if (candidate->endpoint_magic() == endpoint_magic && candidate->message_id() == message_id)
                return m_unprocessed_messages.take(i);
        }

        if (!m_socket->is_open())
            break;

        wait_for_socket_to_become_readable();

        if (drain_messages_from_peer().is_error())
            break;
    }
    return {};
}
2021-10-23 19:48:42 +00:00
}