First attempt at solving Quit/reconnect issue in ana,

...behavior is undefined in most cases. Needs debugging.
This commit is contained in:
Guillermo Biset 2010-07-09 19:27:57 +00:00
parent 5acc6aeb91
commit c720ee800b
2 changed files with 330 additions and 91 deletions

View file

@ -164,6 +164,142 @@ void ana_receive_handler::handle_timeout(ana::error_code error_code)
timeout_called_mutex_.unlock();
}
// Begin ana_multiple_receive_handler implementation ------------------------------------------------------------
ana_multiple_receive_handler::ana_multiple_receive_handler( ana_component_set& components ) :
components_( components ),
mutex_(),
handler_mutex_(),
timeout_called_mutex_(),
error_code_(),
buffer_(),
wesnoth_id_(0),
receive_timer_( NULL ),
finished_( false )
{
std::cout << "DEBUG: Constructing a new ana_multiple_receive_handler...\n";
ana_component_set::iterator it;
for (it = components_.begin(); it != components_.end(); ++it )
{
if ( (*it)->is_server() )
(*it)->server()->set_listener_handler( this );
else
(*it)->client()->set_listener_handler( this );
}
mutex_.lock();
timeout_called_mutex_.lock();
}
ana_multiple_receive_handler::~ana_multiple_receive_handler()
{
timeout_called_mutex_.lock();
timeout_called_mutex_.unlock();
handler_mutex_.lock();
handler_mutex_.unlock();
}
void ana_multiple_receive_handler::wait_completion(size_t timeout_ms )
{
ana_component_set::iterator it;
{
boost::mutex::scoped_lock lock( handler_mutex_);
it = components_.begin();
ana::detail::timed_sender* component;
if ( (*it)->is_server())
component = (*it)->server();
else
component = (*it)->client();
if ( finished_ )
{
mutex_.unlock();
timeout_called_mutex_.unlock();
}
else if ( timeout_ms > 0 )
{
receive_timer_ = component->create_timer();
receive_timer_->wait( ana::time::milliseconds(timeout_ms),
boost::bind(&ana_multiple_receive_handler::handle_timeout, this, ana::timeout_error ) );
}
}
mutex_.lock();
mutex_.unlock();
}
void ana_multiple_receive_handler::handle_message(ana::error_code error_c,
ana::net_id id,
ana::detail::read_buffer read_buffer)
{
boost::mutex::scoped_lock lock( handler_mutex_);
delete receive_timer_;
receive_timer_ = NULL;
buffer_ = read_buffer;
error_code_ = error_c;
ana_component_set::iterator it;
it = std::find_if( components_.begin(), components_.end(),
boost::bind(&ana_component::get_id, _1) == id );
if ( it != components_.end())
wesnoth_id_ = (*it)->get_wesnoth_id();
else
throw std::runtime_error("Wrong read.");
if (! finished_ )
{
finished_ = true;
mutex_.unlock();
}
}
void ana_multiple_receive_handler::handle_disconnect(ana::error_code error_c, ana::net_id)
{
boost::mutex::scoped_lock lock( handler_mutex_);
delete receive_timer_;
receive_timer_ = NULL;
error_code_ = error_c;
if (! finished_ )
{
finished_ = true;
mutex_.unlock();
}
}
void ana_multiple_receive_handler::handle_timeout(ana::error_code error_code)
{
boost::mutex::scoped_lock lock( handler_mutex_ );
delete receive_timer_;
receive_timer_ = NULL;
if (! finished_ )
{
if (error_code)
std::cout << "DEBUG: Receive attempt timed out\n";
else
std::cout << "DEBUG: Shouldn't reach here\n";
error_code_ = error_code;
finished_ = true;
mutex_.unlock();
}
timeout_called_mutex_.unlock();
}
// Begin ana_connect_handler implementation ------------------------------------------------------------
ana_connect_handler::ana_connect_handler( ana::timer* timer ) :
@ -558,7 +694,7 @@ size_t ana_network_manager::send_all( const config& cfg, bool zipped )
ana_send_handler handler( *it, out.str().size() );
(*it)->client()->send( ana::buffer( out.str() ), &handler, ana::ZERO_COPY );
handler.wait_completion(); // the handler will release the mutex after necessary_calls calls
handler.wait_completion();
}
}
std::cout << "Sent data.\n";
@ -659,7 +795,43 @@ network::connection ana_network_manager::read_from( network::connection connecti
}
}
else
throw std::runtime_error("Global Buffer Queue here?");
{
//Check first if there is an available buffer
for (it = components_.begin(); it != components_.end(); ++it)
{
if ( (*it)->new_buffer_ready() )
{
buffer = (*it)->wait_for_element();
return (*it)->get_wesnoth_id();
}
}
// If no timeout was requested, return
if (timeout_ms == 0 )
return 0;
// Wait timeout_ms milliseconds to see if any component will receive something
ana_multiple_receive_handler handler( components_ );
for (it = components_.begin(); it != components_.end(); ++it )
{
if ( (*it)->is_server() )
(*it)->server()->set_listener_handler( this );
else
(*it)->client()->set_listener_handler( this );
}
handler.wait_completion( timeout_ms );
if ( handler.error() )
return 0;
else
{
buffer = handler.buffer();
return handler.get_wesnoth_id();
}
// throw std::runtime_error("Global Buffer Queue here?");
}
}
else
{

View file

@ -35,6 +35,97 @@ struct send_stats_logger
virtual void update_send_stats( size_t ) = 0;
};
/**
* A representative of a network component to the application.
*/
class ana_component : public send_stats_logger
{
public:
/** Constructs a server component. */
ana_component( );
/**
* Constructs a client component.
*
* @param host : The hostname to which it is supposed to connect to.
* @param port : The port it is supposed to connect to.
*/
ana_component( const std::string& host, const std::string& port);
/** Get network upload statistics for this component. */
network::statistics get_send_stats() const;
/** Get network download statistics for this component. */
network::statistics get_receive_stats() const;
/**
* Get the pointer to an ana::server object for this component.
*
* @Pre : This component is a server.
*/
ana::server* server() const;
/**
* Get the pointer to an ana::client object for this component.
*
* @Pre : This component is a client.
*/
ana::client* client() const;
/** Returns true iff this component is a server. */
bool is_server() const;
/** Returns true iff this component is a client. */
bool is_client() const;
/** Returns this component's id. */
ana::net_id get_id() const;
network::connection get_wesnoth_id() const;
void set_wesnoth_id( network::connection ) ;
/** Returns a pointer to the ana::stats object for accumulated network stats. */
const ana::stats* get_stats() const;
/** Push a buffer to the queue of incoming messages. */
void add_buffer(ana::detail::read_buffer buffer);
/**
* Blocking operation to wait for a message in a component.
*
* @returns The buffer that was received first from all pending buffers.
*/
ana::detail::read_buffer wait_for_element();
bool new_buffer_ready(); // non const due to mutex blockage
/** Log an incoming buffer. */
void update_receive_stats( size_t buffer_size );
private:
virtual void update_send_stats( size_t buffer_size);
boost::variant<ana::server*, ana::client*> base_;
bool is_server_;
ana::net_id id_;
network::connection wesnoth_id_;
network::statistics send_stats_;
network::statistics receive_stats_;
//Buffer queue attributes
boost::mutex mutex_;
boost::condition_variable condition_;
std::queue< ana::detail::read_buffer > buffers_;
};
typedef std::set<ana_component*> ana_component_set;
/**
* To use the asynchronous library synchronously, objects of this
* type lock a mutex until enough calls have been made to the
@ -127,6 +218,70 @@ class ana_receive_handler : public ana::listener_handler
bool finished_;
};
/**
* To use the asynchronous library synchronously, objects of this
* type lock a mutex until enough calls have been made to the
* associated handler.
*/
class ana_multiple_receive_handler : public ana::listener_handler
{
public:
/**
* Constructs a reader handler object.
*/
ana_multiple_receive_handler( ana_component_set& components );
/** Destructor. */
~ana_multiple_receive_handler();
/**
* Attempts to read from those network components associated with this
* handler object up until timeout_ms milliseconds.
*
* If the timeout parameter is 0, it will lock the current thread until
* one of these components has received a message.
*
* @param component : A network component running an io_service which supports timeout capabilities.
* @param timeout_ms : Amount of milliseconds to timeout the operation.
*/
void wait_completion(size_t timeout_ms = 0);
/** Returns the error_code from the operation. */
const ana::error_code& error() const
{
return error_code_;
}
/** Returns the buffer from the operation. */
ana::detail::read_buffer buffer() const
{
return buffer_;
}
network::connection get_wesnoth_id() const
{
return wesnoth_id_;
}
private:
virtual void handle_message (ana::error_code, ana::net_id, ana::detail::read_buffer);
virtual void handle_disconnect(ana::error_code, ana::net_id);
void handle_timeout(ana::error_code error_code);
ana_component_set& components_;
boost::mutex mutex_;
boost::mutex handler_mutex_;
boost::mutex timeout_called_mutex_;
ana::error_code error_code_;
ana::detail::read_buffer buffer_;
network::connection wesnoth_id_;
ana::timer* receive_timer_;
bool finished_;
};
/**
* To use the asynchronous library synchronously, objects of this
@ -170,94 +325,6 @@ class ana_connect_handler : public ana::connection_handler
bool connected_;
};
/**
* A representative of a network component to the application.
*/
class ana_component : public send_stats_logger
{
public:
/** Constructs a server component. */
ana_component( );
/**
* Constructs a client component.
*
* @param host : The hostname to which it is supposed to connect to.
* @param port : The port it is supposed to connect to.
*/
ana_component( const std::string& host, const std::string& port);
/** Get network upload statistics for this component. */
network::statistics get_send_stats() const;
/** Get network download statistics for this component. */
network::statistics get_receive_stats() const;
/**
* Get the pointer to an ana::server object for this component.
*
* @Pre : This component is a server.
*/
ana::server* server() const;
/**
* Get the pointer to an ana::client object for this component.
*
* @Pre : This component is a client.
*/
ana::client* client() const;
/** Returns true iff this component is a server. */
bool is_server() const;
/** Returns true iff this component is a client. */
bool is_client() const;
/** Returns this component's id. */
ana::net_id get_id() const;
network::connection get_wesnoth_id() const;
void set_wesnoth_id( network::connection ) ;
/** Returns a pointer to the ana::stats object for accumulated network stats. */
const ana::stats* get_stats() const;
/** Push a buffer to the queue of incoming messages. */
void add_buffer(ana::detail::read_buffer buffer);
/**
* Blocking operation to wait for a message in a component.
*
* @returns The buffer that was received first from all pending buffers.
*/
ana::detail::read_buffer wait_for_element();
bool new_buffer_ready(); // non const due to mutex blockage
/** Log an incoming buffer. */
void update_receive_stats( size_t buffer_size );
private:
virtual void update_send_stats( size_t buffer_size);
boost::variant<ana::server*, ana::client*> base_;
bool is_server_;
ana::net_id id_;
network::connection wesnoth_id_;
network::statistics send_stats_;
network::statistics receive_stats_;
//Buffer queue attributes
boost::mutex mutex_;
boost::condition_variable condition_;
std::queue< ana::detail::read_buffer > buffers_;
};
/**
* Manages connected client ids for a given server.
*/
@ -354,7 +421,7 @@ class ana_network_manager : public ana::listener_handler,
virtual void handle_disconnect(ana::error_code /*error_code*/, ana::net_id client);
ana::timer* connect_timer_;
std::set< ana_component* > components_;
ana_component_set components_;
std::map< ana::server*, const clients_manager* > server_manager_;
};