More work on the ana server, it now handshakes properly...

...with connecting clients. Work remains to set things up for further
communication.
This commit is contained in:
Guillermo Biset 2010-07-12 08:16:07 +00:00
parent e8acda4077
commit 1a5cc1437e
5 changed files with 137 additions and 36 deletions

View file

@ -386,7 +386,7 @@ namespace ana
* \sa send_handler
*/
virtual void send_if(boost::asio::const_buffer,
send_handler*, const client_predicate&, send_type = COPY_BUFFER ) = 0;
send_handler*, const client_predicate&, send_type = COPY_BUFFER ) = 0;
/**
* Send a buffer to every connected client except one. Equals a send_all if the client doesn't exist.
@ -435,6 +435,15 @@ namespace ana
*/
virtual void run(port pt) = 0;
/**
* Disconnect a connected client by force.
*
* @param id : The net_id of the connected client.
*
* \sa net_id
*/
virtual void disconnect( net_id id ) = 0;
/** Returns the string representing the ip address of the connected client with id net_id. */
virtual std::string ip_address( net_id ) const = 0;

View file

@ -137,11 +137,14 @@ void asio_server::deregister_client(client_proxy* client)
}
void asio_server::handle_accept(const boost::system::error_code& ec,
asio_client_proxy* client,
connection_handler* handler )
asio_client_proxy* client,
connection_handler* handler )
{
if (! ec)
{
if ( raw_mode() ) // only test for the non default setting
client->set_raw_data_mode();
register_client(client);
handler->handle_connect( ec, client->id() );
}
@ -291,6 +294,36 @@ void asio_server::asio_client_proxy::stop_logging()
stats_collector_ = NULL;
}
void asio_server::disconnect( ana::net_id id )
{
std::list<ana::server::client_proxy*>::const_iterator it;
it = std::find_if( client_proxies_.begin(), client_proxies_.end(),
boost::bind( &client_proxy::id, _1) == id );
if ( it != client_proxies_.end() )
delete *it;
}
void asio_server::disconnect()
{
io_service_.stop();
io_thread_.join();
for (std::list<client_proxy*>::iterator it = client_proxies_.begin(); it != client_proxies_.end(); ++it)
delete *it;
client_proxies_.clear();
io_service_.reset();
}
void asio_server::set_raw_buffer_max_size( size_t size)
{
for (std::list<client_proxy*>::iterator it = client_proxies_.begin(); it != client_proxies_.end(); ++it)
(*it)->set_raw_buffer_max_size( size );
}
const ana::stats* asio_server::asio_client_proxy::get_stats( ana::stat_type type ) const
{
if (stats_collector_ != NULL )

View file

@ -119,9 +119,13 @@ class asio_server : public ana::server,
virtual const ana::stats* get_stats( ana::stat_type type ) const;
virtual void disconnect() {}
virtual void disconnect( ana::net_id );
virtual void disconnect();
virtual void set_raw_buffer_max_size( size_t );
// TODO: implement this, or improve OO design with 2nd param (vote for 2)
virtual void wait_raw_object(ana::serializer::bistream& , size_t ) {}
virtual void set_raw_buffer_max_size( size_t ) {}
virtual ana::stats_collector* stats_collector() { return stats_collector_; }

View file

@ -473,7 +473,7 @@ const ana::stats* ana_component::get_stats() const
if ( is_server_)
return server()->get_stats();
else
return client()->get_stats();
return client()->get_stats(); // TODO: listener()->get_stats(); ?
}
void ana_component::add_buffer(ana::detail::read_buffer buffer, ana::net_id id)
@ -518,9 +518,11 @@ network::connection ana_component::oldest_sender_id_still_pending()
// Begin clients_manager implementation ---------------------------------------------------------------
clients_manager::clients_manager() :
clients_manager::clients_manager( ana::server* server) :
server_( server ),
ids_(),
pending_ids_()
pending_ids_(),
pending_handshakes_()
{
}
@ -531,10 +533,12 @@ size_t clients_manager::client_amount() const
void clients_manager::handle_connect(ana::error_code error, ana::net_id client)
{
std::cout << "New client connected with id " << client << "\n";
if (! error )
{
ids_.insert(client);
pending_ids_.insert( network::connection( client ) );
ids_.insert( client );
pending_handshakes_.insert( client );
}
}
@ -544,11 +548,21 @@ void clients_manager::handle_disconnect(ana::error_code /*error*/, ana::net_id c
pending_ids_.erase( network::connection( client ) );
}
void clients_manager::has_connected( network::connection id )
{
pending_ids_.insert( id );
}
bool clients_manager::has_connection_pending() const
{
return ! pending_ids_.empty();
}
bool clients_manager::is_pending_handshake( ana::net_id id ) const
{
return pending_handshakes_.find( id ) != pending_handshakes_.end();
}
network::connection clients_manager::get_pending_connection_id()
{
const network::connection result = *pending_ids_.begin();
@ -574,7 +588,7 @@ ana::net_id ana_network_manager::create_server( )
ana::server* server = new_component->server();
clients_manager* manager = new clients_manager();
clients_manager* manager = new clients_manager( server );
server_manager_[ server ] = manager;
server->set_connection_handler( manager );
@ -602,7 +616,7 @@ network::connection ana_network_manager::create_client_and_connect(std::string h
ana_connect_handler handler(connect_timer_);
connect_timer_->wait( ana::time::seconds(10), // 10 seconds to connection timeout, will be configurable
connect_timer_->wait( ana::time::seconds(10), // 10 seconds to connection timeout, TODO: configurable
boost::bind(&ana_connect_handler::handle_timeout, &handler, ana::timeout_error) );
client->set_raw_data_mode();
@ -850,34 +864,29 @@ size_t ana_network_manager::send_raw_data( const char* base_char, size_t size, n
return 0;
}
void ana_network_manager::send_all_except(const config& /*cfg*/, network::connection /*connection_num*/)
void ana_network_manager::send_all_except(const config& cfg, network::connection connection_num)
{
throw std::runtime_error("send_all_except is not finished.");
/*
std::cout << "DEBUG: send_all_except " << connection_num << "\n";
std::stringstream out;
config_writer cfg_writer(out, true);
cfg_writer.write(cfg);
std::ostringstream out;
compress_config(cfg, out );
std::set<ana_component*>::iterator it;
ana_component_set::iterator it;
for (it = components_.begin(); it != components_.end(); ++it)
ana::net_id id_to_avoid( connection_num ); //I should have issued this id earlier
for ( it = components_.begin(); it != components_.end(); ++it)
{
std::cout << "DEBUG: found component with wesnoth id " << (*it)->get_wesnoth_id() << "\n";
if ( (*it)->get_wesnoth_id() != connection_num )
{
ana_send_handler handler( *it, out.str().size() );
(*it)->get_id();
if ( (*it)->is_server() )
(*it)->server()->send_all( ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
else
(*it)->client()->send( ana::buffer( out.str() ), &handler, ana::ZERO_COPY );
if ((*it)->is_client())
throw std::runtime_error("send_all_except shouldn't be used on clients.");
handler.wait_completion();
}
ana_send_handler handler( out.str().size() );
(*it)->server()->send_all_except( id_to_avoid, ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
handler.wait_completion();
}
*/
}
network::connection ana_network_manager::read_from_ready_buffer( const ana_component_set::iterator& it, config& cfg)
@ -1079,16 +1088,54 @@ void ana_network_manager::handle_message( ana::error_code error,
std::set< ana_component* >::iterator it;
for (it = components_.begin(); it != components_.end(); ++it)
std::cout << "DEBUG: Component id : " << (*it)->get_id() << "\n";
it = std::find_if( components_.begin(), components_.end(),
boost::bind(&ana_component::get_id, _1) == client );
if ( it != components_.end() )
(*it)->add_buffer( buffer, client );
else
throw std::runtime_error("Received message from a non connected component.");
{
// I received info from something not directly in components_, it must be the id of a client in a server
it = components_.begin();
if ( ( ! components_.empty() ) && (*it)->is_server() )
{
clients_manager* clients_mgr = server_manager_[ (*it)->server() ];
if ( clients_mgr->is_pending_handshake( client ) )
{
if ( buffer->size() != sizeof(uint32_t) ) // all handshakes are 4 bytes long
(*it)->server()->disconnect( client );
uint32_t handshake;
{
ana::serializer::bistream bis( buffer->string() );
bis >> handshake;
ana::network_to_host_long( handshake ); //not necessary since I'm expecting a 0 anyway
}
if ( handshake != 0 )
(*it)->server()->disconnect( client );
else
{
clients_mgr->has_connected( network::connection( client ) );
//send back it's id
ana::serializer::bostream bos;
uint32_t network_byte_order_id = client;
ana::host_to_network_long( network_byte_order_id );
bos << network_byte_order_id;
send_raw_data( bos.str().c_str(), bos.str().size(), client );
}
}
else
(*it)->add_buffer( buffer, client );
}
}
}
}

View file

@ -103,6 +103,7 @@ class ana_component
/** Returns the network id of the oldest sender of a pending buffer. */
network::connection oldest_sender_id_still_pending();
/** Returns true iff. the component has a read buffer ready that hasn't been returned. */
bool new_buffer_ready(); // non const due to mutex blockage
private:
@ -328,13 +329,17 @@ class clients_manager : public ana::connection_handler
{
public:
/** Constructor. */
clients_manager();
clients_manager( ana::server* );
/** Returns the amount of components connected to this server. */
size_t client_amount() const;
void has_connected( network::connection id );
bool has_connection_pending() const;
bool is_pending_handshake( ana::net_id ) const;
network::connection get_pending_connection_id();
private:
@ -342,8 +347,11 @@ class clients_manager : public ana::connection_handler
virtual void handle_disconnect(ana::error_code /*error*/, ana::net_id client);
ana::server* server_; // the server managing these clients
std::set< ana::net_id > ids_;
std::set< network::connection > pending_ids_;
std::set< ana::net_id > pending_handshakes_;
};
/**