Ana build connects fine but is bugged when trying to connect...

...to a non existing server.
This commit is contained in:
Guillermo Biset 2010-07-01 08:35:55 +00:00
parent 5d7ca5cff1
commit be403b9ff3
7 changed files with 149 additions and 30 deletions

View file

@ -150,7 +150,7 @@ namespace ana
{
timer_.expires_from_now( milliseconds / 1000.0); //conversion will use a double or float
timer_.async_wait(handler);
boost::thread t( boost::bind( &boost::asio::io_service::run_one, &io_service_ ) );
boost::thread t( boost::bind( &timer::run, this ) );
}
/** Cancel the timer if running. */
@ -167,6 +167,18 @@ namespace ana
}
private:
void run()
{
try
{
io_service_.run_one();
}
catch(const std::exception& e)
{
// Timer was cancelled. Don't propagate exception
}
}
/** Private class providing traits for the timer type. */
struct time_t_traits
{

View file

@ -58,7 +58,7 @@ class ChatServer : public listener_handler,
server_->run(pt);
server_->set_timeouts(ana::FixedTime, ana::time::seconds(10));
server_->set_timeouts(ana::FixedTime, ana::time::milliseconds(1));
std::cout << "Server running, Enter to quit." << std::endl;

View file

@ -127,7 +127,6 @@ void asio_client::connect( ana::connection_handler* handler )
catch (const std::exception& e)
{
handler->handle_connect( boost::system::error_code(1,boost::system::system_category ), 0 );
std::cerr << "Client: An error ocurred, " << e.what() << std::endl;
}
}

View file

@ -38,6 +38,7 @@
using boost::asio::ip::tcp;
asio_listener::asio_listener( ) :
disconnected_( false ),
listener_( NULL )
{
}
@ -48,8 +49,12 @@ asio_listener::~asio_listener()
void asio_listener::disconnect( ana::listener_handler* listener, boost::system::error_code error)
{
listener->handle_disconnect( error, id() );
disconnect_listener();
if ( ! disconnected_ )
{
listener->handle_disconnect( error, id() );
disconnect_listener();
disconnected_ = true;
}
}
@ -63,7 +68,7 @@ void asio_listener::handle_body( ana::detail::read_buffer buf,
else
{
log_receive( buf );
listener->handle_message( ec, id(), buf );
listen_one_message();

View file

@ -67,6 +67,7 @@ class asio_listener : public virtual ana::detail::listener
void handle_body( ana::detail::read_buffer , const boost::system::error_code& , ana::listener_handler* );
/*attr*/
bool disconnected_;
ana::listener_handler* listener_;
char header_[ana::HEADER_LENGTH];
};

View file

@ -314,7 +314,7 @@ void asio_server::asio_client_proxy::handle_sent_header(const boost::system::err
else
{
disconnect_listener();
running_timer->cancel();
delete running_timer;
}
}
@ -323,7 +323,7 @@ void asio_server::asio_client_proxy::handle_send(const boost::system::error_code
ana::detail::shared_buffer /*buffer*/,
send_handler* handler, timer* running_timer)
{
running_timer->cancel();
delete running_timer;
handler->handle_send( ec, id() );
@ -361,7 +361,7 @@ void asio_server::asio_client_proxy::send(ana::detail::shared_buffer buffer,
catch(std::exception& e)
{
disconnect_listener();
running_timer->cancel();
delete running_timer;
}
}

View file

@ -82,12 +82,14 @@ class ana_handler : public ana::send_handler
logger_( logger ),
buf_size_( buf_size )
{
std::cout << "DEBUG: Constructing a new ana_handler...\n";
if ( calls > 0 )
mutex_.lock();
}
~ana_handler()
{
std::cout << "DEBUG: Terminating an ana_handler...\n";
if ( target_calls_ > 0 )
throw std::runtime_error("Handler wasn't called enough times.");
}
@ -111,6 +113,64 @@ class ana_handler : public ana::send_handler
size_t buf_size_;
};
class ana_connect_handler : public ana::connection_handler
{
public:
ana_connect_handler( boost::mutex& mutex, ana::timer* timer) :
mutex_(mutex),
timer_(timer),
error_code_(),
connected_(false)
{
std::cout << "DEBUG: Constructing a new ana_connect_handler...\n";
mutex_.lock();
}
void handle_timeout(ana::error_code error_code)
{
if ( ! connected_ ) // disregard this call after a connect termination (regardless of result)
{
if (error_code)
std::cout << "DEBUG: Connection attempt timed out\n";
else
std::cout << "DEBUG: Shouldn't reach here\n";
error_code_ = error_code;
mutex_.unlock();
}
}
~ana_connect_handler()
{
std::cout << "DEBUG: Terminating an ana_connect_handler...\n";
}
const ana::error_code& error() const
{
return error_code_;
}
private:
virtual void handle_connect(ana::error_code error_code, ana::net_id /*client*/)
{
connected_ = true;
if (! error_code)
std::cout << "DEBUG: Connected.\n";
else
std::cout << "DEBUG: Can't connect.\n";
error_code_ = error_code;
mutex_.unlock();
}
boost::mutex& mutex_;
ana::timer* timer_;
ana::error_code error_code_;
bool connected_;
};
class ana_component : public send_stats_logger
{
public:
@ -234,11 +294,11 @@ class clients_manager : public ana::connection_handler
std::set<ana::net_id> ids_;
};
class ana_network_manager : public ana::listener_handler,
public ana::connection_handler
class ana_network_manager : public ana::listener_handler
{
public:
ana_network_manager() :
connect_timer_( NULL ),
components_(),
server_manager_()
{
@ -246,6 +306,8 @@ class ana_network_manager : public ana::listener_handler,
ana::net_id create_server( )
{
std::cout << "DEBUG: Creating server.\n";
ana_component* new_component = new ana_component( );
components_.insert( new_component );
@ -256,21 +318,56 @@ class ana_network_manager : public ana::listener_handler,
server->set_connection_handler( manager );
server->set_listener_handler( this );
server->start_logging();
return server->id();
}
network::connection create_client_and_connect(std::string host, int port)
{
std::stringstream ss;
ss << port;
std::cout << "DEBUG: Creating client and connecting...\n";
ana_component* client = new ana_component( host, ss.str() );
components_.insert(client);
try
{
std::stringstream ss;
ss << port;
client->client()->connect( this );
ana_component* new_component = new ana_component( host, ss.str() );
components_.insert( new_component );
return network::connection( client->client()->id() );
ana::client* const client = new_component->client();
boost::mutex mutex;
connect_timer_ = new ana::timer();
ana_connect_handler handler(mutex, connect_timer_);
connect_timer_->wait( ana::time::seconds(3), // 3 seconds to connection timeout
boost::bind(&ana_connect_handler::handle_timeout, &handler,
boost::asio::error::make_error_code( boost::asio::error::timed_out ) ) );
client->connect( &handler );
client->set_listener_handler( this );
client->run();
client->start_logging();
mutex.lock(); // just wait for handler to release it
mutex.unlock(); // unlock for destruction
delete connect_timer_;
if( ! handler.error() )
return network::connection( client->id() );
else
return 0;
}
catch( const std::exception& e )
{
std::cout << "DEBUG: Caught an exception while trying to connect.\n";
return 0;
}
}
const ana::stats* get_stats( network::connection connection_num )
@ -325,6 +422,7 @@ class ana_network_manager : public ana::listener_handler,
size_t send_all( const config& cfg, bool zipped )
{
std::cout << "DEBUG: Sending to everybody...\n";
std::stringstream out;
config_writer cfg_writer(out, zipped);
cfg_writer.write(cfg);
@ -350,6 +448,7 @@ class ana_network_manager : public ana::listener_handler,
size_t send( network::connection connection_num , const config& cfg, bool zipped )
{
std::cout << "DEBUG: Single send...\n";
ana::net_id id( connection_num );
std::stringstream out;
@ -438,18 +537,25 @@ class ana_network_manager : public ana::listener_handler,
}
}
// Only for client connection/disconnection
virtual void handle_connect(ana::error_code error, ana::net_id /*client*/)
{
if ( error )
throw std::runtime_error("Error connecting to server.");
}
virtual void handle_disconnect(ana::error_code /*error*/, ana::net_id /*client*/)
virtual void handle_disconnect(ana::error_code /*error_code*/, ana::net_id client)
{
std::cout << "DEBUG: Disconnected from server.\n";
std::set< ana_component* >::iterator it;
it = std::find_if( components_.begin(), components_.end(),
boost::bind(&ana_component::get_id, _1) == client );
if ( it != components_.end() )
{
std::cout << "DEBUG: Removing bad component.\n";
components_.erase(it);
}
}
ana::timer* connect_timer_;
std::set< ana_component* > components_;
std::map< ana::server*, const clients_manager* > server_manager_;
};
@ -603,7 +709,6 @@ namespace network {
size_t nconnections()
{
return ana_manager.number_of_connections();
// return sockets.size();
}
bool is_server()
@ -614,13 +719,11 @@ namespace network {
connection connect(const std::string& host, int port)
{
return ana_manager.create_client_and_connect( host, port );
// throw std::runtime_error("TODO:Not implemented");
}
connection connect(const std::string& host, int port, threading::waiter& /*waiter*/)
{
return connect( host, port );
// throw std::runtime_error("TODO:Not implemented");
}
namespace {
@ -645,7 +748,7 @@ namespace network {
void queue_disconnect(network::connection sock)
{
// throw("TODO:Not implemented");
// throw("TODO:Not implemented queue_disconnect");
disconnection_queue.push_back(sock);
}
@ -655,7 +758,6 @@ namespace network {
bandwidth_in_ptr* /*bandwidth_in*/)
{
// connection conn = ana_manager.receive_data( connection_num );
throw std::runtime_error("TODO:Not implemented receive_data0");
}