Added a network stats module to Ana,

...this will help remove the current implementation. Functional, but
design is in early stages.
This commit is contained in:
Guillermo Biset 2010-06-25 07:24:34 +00:00
parent ed8d74dc96
commit b3b203deed
11 changed files with 386 additions and 58 deletions

View file

@ -70,6 +70,7 @@
#define ANA_DETAIL_INTERNAL_HPP
#include "common.hpp" //Main definitions
#include "timers.hpp" //Timer related
#include "stats.hpp" //Network statistics
#include "predicates.hpp" //Client predicates, used for conditional sending
#include "binary_streams.hpp" //For serialization
#undef ANA_DETAIL_INTERNAL_HPP
@ -197,51 +198,26 @@ namespace ana
};
//@}
/** @name Time duration functions. */
//@{
/** @namespace time
*
* Time conversion functions.
*/
namespace time
struct network_stats_logger
{
/** Start logging network events for statistics collection. */
virtual void start_logging() = 0;
/** Stop logging network events (disables statistics collection.) */
virtual void stop_logging() = 0;
/**
* Create a time lapse from a given amount of milliseconds.
* Get the associated collected stats as per a stat_type.
*
* @param ms : Milliseconds of elapsed time, must be a positive integer value.
* @param type : stat_type to be collected ( ACCUMULATED, SECONDS, MINUTES, HOURS, DAYS )
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
* @returns A const pointer to a stats object holding the stats.
*
* \sa stats
* \sa stat_type
*/
inline size_t milliseconds(size_t ms) { return ms; }
/**
* Create a time lapse from a given amount of seconds.
*
* @param ms : Seconds of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t seconds(double s) { return size_t(s * 1000);}
/**
* Create a time lapse from a given amount of minutes.
*
* @param ms : Minutes of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t minutes(double m) { return seconds(m * 60); }
/**
* Create a time lapse from a given amount of hours.
*
* @param ms : Hours of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t hours(double h) { return minutes(h * 60); }
}
//@}
virtual const stats* get_stats( stat_type type ) const = 0;
};
/** @name Main classes.
*
@ -252,7 +228,8 @@ namespace ana
* A network server. An object of this type can handle several connected clients.
*/
struct server : public virtual detail::listener,
public detail::timed_sender
public detail::timed_sender,
public network_stats_logger
{
/**
* Creates an ana server.
@ -377,7 +354,7 @@ namespace ana
/** Returns the string representing the ip address of the connected client. */
virtual std::string ip_address() const = 0;
// Allow server objects to invoke run_listener directly.
using detail::listener::run_listener;
};
@ -390,7 +367,8 @@ namespace ana
* \sa timed_sender
*/
struct client : public virtual detail::listener,
public detail::timed_sender
public detail::timed_sender,
public network_stats_logger
{
/**
* Creates a client.
@ -441,7 +419,7 @@ namespace ana
* \sa send_handler
*/
virtual void send(boost::asio::const_buffer buffer, send_handler* handler, send_type type = COPY_BUFFER ) = 0;
/** Standard destructor. */
virtual ~client() {}
};

194
src/ana/api/stats.hpp Normal file
View file

@ -0,0 +1,194 @@
/* $Id$ */
/**
* @file stats.hpp
* @brief Implementation details for the ana project dealing with network statistics.
*
* ana: Asynchronous Network API.
* Copyright (C) 2010 Guillermo Biset.
*
* This file is part of the ana project.
*
* System: ana
* Language: C++
*
* Author: Guillermo Biset
* E-Mail: billybiset AT gmail DOT com
*
* ana is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* ana is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with ana. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef ANA_STATS_HPP
#define ANA_STATS_HPP
#include "timers.hpp"
#ifndef ANA_DETAIL_INTERNAL_HPP
#error "Private file, do not include directly."
#endif
namespace ana
{
enum stat_type
{
ACCUMULATED,
SECONDS,
MINUTES,
HOURS,
DAYS
};
struct stats
{
virtual size_t uptime() const = 0;
virtual size_t packets_in() const = 0;
virtual size_t packets_out() const = 0;
virtual size_t bytes_in() const = 0;
virtual size_t bytes_out() const = 0;
};
namespace detail
{
class stats_logger : public stats
{
public:
stats_logger(size_t ms_to_reset) :
ms_to_reset_(ms_to_reset),
timer_(),
start_time_( 0 ),
packets_in_( 0 ),
packets_out_( 0 ),
bytes_in_( 0 ),
bytes_out_( 0 )
{
if (ms_to_reset_ > 0 )
timer_.wait(ms_to_reset_, boost::bind( &stats_logger::reset, this, boost::asio::placeholders::error ) );
}
void log_send( detail::shared_buffer buffer )
{
++packets_out_;
bytes_out_ += buffer->size() + HEADER_LENGTH;
}
void log_receive( detail::read_buffer buffer )
{
++packets_in_;
bytes_in_ += buffer->size() + HEADER_LENGTH;
}
private:
void reset(boost::system::error_code& ec)
{
packets_in_ = 0;
packets_out_ = 0;
bytes_in_ = 0;
bytes_out_ = 0;
if (ms_to_reset_ > 0 )
timer_.wait(ms_to_reset_, boost::bind( &stats_logger::reset, this, boost::asio::placeholders::error ) );
}
virtual size_t uptime() const
{
return 0;
}
virtual size_t packets_in() const
{
return packets_in_;
}
virtual size_t packets_out() const
{
return packets_out_;
}
virtual size_t bytes_in() const
{
return bytes_in_;
}
virtual size_t bytes_out() const
{
return bytes_out_;
}
size_t ms_to_reset_;
timer timer_;
std::time_t start_time_;
size_t packets_in_;
size_t packets_out_;
size_t bytes_in_;
size_t bytes_out_;
};
}
class stats_collector
{
public:
stats_collector() :
accumulator_( 0 ),
seconds_stats_( time::seconds(1) ),
minutes_stats_( time::minutes(1) ),
hours_stats_( time::hours(1) ),
days_stats_( time::days(1) )
{
}
const stats* get_stats( stat_type type ) const
{
switch (type)
{
case ACCUMULATED : return &accumulator_;
case SECONDS : return &seconds_stats_;
case MINUTES : return &minutes_stats_;
case HOURS : return &hours_stats_;
case DAYS : return &days_stats_;
}
}
void log_send( detail::shared_buffer buffer )
{
accumulator_.log_send( buffer );
seconds_stats_.log_send( buffer );
minutes_stats_.log_send( buffer );
hours_stats_.log_send( buffer );
days_stats_.log_send( buffer );
}
void log_receive( detail::read_buffer buffer )
{
accumulator_.log_receive( buffer );
seconds_stats_.log_receive( buffer );
minutes_stats_.log_receive( buffer );
hours_stats_.log_receive( buffer );
days_stats_.log_receive( buffer );
}
private:
detail::stats_logger accumulator_;
detail::stats_logger seconds_stats_;
detail::stats_logger minutes_stats_;
detail::stats_logger hours_stats_;
detail::stats_logger days_stats_;
};
}
#endif

View file

@ -41,6 +41,61 @@
namespace ana
{
/** @name Time duration functions. */
//@{
/** @namespace time
*
* Time conversion functions.
*/
namespace time
{
/**
* Create a time lapse from a given amount of milliseconds.
*
* @param ms : Milliseconds of elapsed time, must be a positive integer value.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t milliseconds(size_t ms) { return ms; }
/**
* Create a time lapse from a given amount of seconds.
*
* @param ms : Seconds of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t seconds(double s) { return size_t(s * 1000);}
/**
* Create a time lapse from a given amount of minutes.
*
* @param ms : Minutes of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t minutes(double m) { return seconds(m * 60); }
/**
* Create a time lapse from a given amount of hours.
*
* @param ms : Hours of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t hours(double h) { return minutes(h * 60); }
/**
* Create a time lapse from a given amount of days.
*
* @param ms : Hours of elapsed time.
*
* @returns : A time duration amount (in milliseconds) to be used with timers.
*/
inline size_t days(double d) { return hours(d * 24); }
}
//@}
/**
* Timeout policies for send operations.
*
@ -93,7 +148,7 @@ namespace ana
template<class Handler>
void wait(size_t milliseconds, Handler handler)
{
timer_.expires_from_now(milliseconds / 1000);
timer_.expires_from_now( milliseconds / 1000.0); //conversion will use a double or float
timer_.async_wait(handler);
boost::thread t( boost::bind( &boost::asio::io_service::run_one, &io_service_ ) );
}

View file

@ -110,9 +110,29 @@ class ChatClient : public ana::listener_handler,
void parse_command(const std::string& msg)
{
if (msg[1] == 'n') //Lame: assume name command
{
name_ = get_name(msg);
}
else if (msg[1] == 's')
{
const ana::stats* acum_stats = client_->get_stats( ana::ACCUMULATED );
const ana::stats* sec_stats = client_->get_stats( ana::SECONDS );
const ana::stats* min_stats = client_->get_stats( ana::MINUTES );
const ana::stats* hour_stats = client_->get_stats( ana::HOURS );
const ana::stats* day_stats = client_->get_stats( ana::DAYS );
std::cout << "Network Statistics:\n"
<< "\tBytes Out:\n"
<< "\t\tTotal: " << acum_stats->bytes_out() << std::endl
<< "\t\tLast Second: " << sec_stats->bytes_out() << std::endl
<< "\t\tLast Minute: " << min_stats->bytes_out() << std::endl
<< "\t\tLast Hour: " << hour_stats->bytes_out() << std::endl
<< "\t\tLast Day: " << day_stats->bytes_out() << std::endl
<< "\tBytes In:\n"
<< "\t\tTotal: " << acum_stats->bytes_in() << std::endl
<< "\t\tLast Second: " << sec_stats->bytes_in() << std::endl
<< "\t\tLast Minute: " << min_stats->bytes_in() << std::endl
<< "\t\tLast Hour: " << hour_stats->bytes_in() << std::endl
<< "\t\tLast Day: " << day_stats->bytes_in() << std::endl;
}
}
void run_input()
@ -139,7 +159,6 @@ class ChatClient : public ana::listener_handler,
{
try
{
if ( ! conn_info_.use_proxy() )
client_->connect( this );
else
@ -151,6 +170,7 @@ class ChatClient : public ana::listener_handler,
client_->set_listener_handler( this );
client_->run();
client_->start_logging();
std::cout << "Available commands: \n" <<
" '/quit' : Quit. \n"

View file

@ -48,12 +48,14 @@ asio_client::asio_client(ana::address address, ana::port pt) :
address_(address),
port_(pt),
proxy_( NULL ),
use_proxy_( false )
use_proxy_( false ),
stats_collector_( NULL )
{
}
asio_client::~asio_client()
{
stop_logging();
}
ana::client* ana::client::create(ana::address address, ana::port pt)
@ -171,6 +173,31 @@ void asio_client::send(boost::asio::const_buffer buffer, ana::send_handler* hand
}
}
void asio_client::log_receive( ana::detail::read_buffer buffer )
{
if (stats_collector_ != NULL )
stats_collector_->log_receive( buffer );
}
void asio_client::start_logging()
{
stop_logging();
stats_collector_ = new ana::stats_collector();
}
void asio_client::stop_logging()
{
delete stats_collector_;
}
const ana::stats* asio_client::get_stats( ana::stat_type type ) const
{
if (stats_collector_ != NULL )
return stats_collector_->get_stats( type );
else
throw std::runtime_error("Logging is disabled. Use start_logging first.");
}
void asio_client::handle_sent_header(const boost::system::error_code& ec,
ana::serializer::bostream* bos, ana::detail::shared_buffer buffer,
ana::send_handler* handler)
@ -188,9 +215,12 @@ void asio_client::handle_sent_header(const boost::system::error_code& ec,
void asio_client::handle_send(const boost::system::error_code& ec,
ana::detail::shared_buffer /*buffer*/,
ana::detail::shared_buffer buffer,
ana::send_handler* handler)
{
if ( stats_collector_ != NULL )
stats_collector_->log_send( buffer );
handler->handle_send( ec, id() );
if ( ec )
@ -201,3 +231,4 @@ void asio_client::disconnect_listener()
{
io_service_.stop();
}

View file

@ -77,6 +77,13 @@ class asio_client : public ana::client,
virtual tcp::socket& socket();
virtual void log_receive( ana::detail::read_buffer buffer );
virtual void start_logging();
virtual void stop_logging();
virtual const ana::stats* get_stats( ana::stat_type type ) const;
void handle_sent_header(const boost::system::error_code& ec,
ana::serializer::bostream*, ana::detail::shared_buffer,
ana::send_handler*);
@ -102,6 +109,8 @@ class asio_client : public ana::client,
proxy_connection* proxy_;
bool use_proxy_;
ana::stats_collector* stats_collector_;
};
#endif

View file

@ -62,6 +62,8 @@ void asio_listener::handle_body( ana::detail::read_buffer buf,
disconnect(listener, ec);
else
{
log_receive( buf );
listener->handle_message( ec, id(), buf );
listen_one_message();

View file

@ -56,6 +56,8 @@ class asio_listener : public virtual ana::detail::listener
private:
virtual void disconnect_listener() {}
virtual void log_receive( ana::detail::read_buffer buffer ) {}
void listen_one_message();
void disconnect( ana::listener_handler* listener, boost::system::error_code error);

View file

@ -51,7 +51,8 @@ asio_server::asio_server() :
listening_(false),
listener_( NULL ),
connection_handler_( NULL ),
last_client_proxy_( NULL )
last_client_proxy_( NULL ),
stats_collector_( NULL )
{
}
@ -213,6 +214,32 @@ std::string asio_server::ip_address( net_id id ) const
return "";
}
void asio_server::log_receive( ana::detail::read_buffer buffer )
{
if (stats_collector_ != NULL )
stats_collector_->log_receive( buffer );
}
void asio_server::start_logging()
{
stop_logging();
stats_collector_ = new ana::stats_collector();
}
void asio_server::stop_logging()
{
delete stats_collector_;
}
const ana::stats* asio_server::get_stats( ana::stat_type type ) const
{
if (stats_collector_ != NULL )
return stats_collector_->get_stats( type );
else
throw std::runtime_error("Logging is disabled. Use start_logging first.");
}
asio_server::asio_client_proxy::asio_client_proxy(boost::asio::io_service& io_service, asio_proxy_manager* server) :
client_proxy(),
asio_listener(),

View file

@ -67,7 +67,7 @@ class asio_server : public ana::server,
virtual void send(ana::detail::shared_buffer, ana::send_handler*, ana::detail::timed_sender* );
virtual std::string ip_address( ) const;
void handle_sent_header(const boost::system::error_code& ec,
ana::serializer::bostream*, ana::detail::shared_buffer,
ana::send_handler*, ana::timer*);
@ -102,6 +102,13 @@ class asio_server : public ana::server,
virtual std::string ip_address( ana::net_id ) const;
virtual void log_receive( ana::detail::read_buffer buffer );
virtual void start_logging();
virtual void stop_logging();
virtual const ana::stats* get_stats( ana::stat_type type ) const;
void handle_accept (const boost::system::error_code& ec,asio_client_proxy* client, ana::connection_handler* );
void register_client(client_proxy* client);
@ -118,6 +125,8 @@ class asio_server : public ana::server,
ana::listener_handler* listener_;
ana::connection_handler* connection_handler_;
asio_client_proxy* last_client_proxy_;
ana::stats_collector* stats_collector_;
};
#endif

View file

@ -148,7 +148,7 @@ class ana_network_manager : public ana::listener_handler,
return connected_clients_.size();
}
void send_all( const config& cfg, bool zipped )
size_t send_all( const config& cfg, bool zipped )
{
std::stringstream out;
config_writer cfg_writer(out, zipped);
@ -163,9 +163,10 @@ class ana_network_manager : public ana::listener_handler,
it->second->send_all( ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
}
mutex.lock(); // this won't work with multiple sends
return out.str().size();
}
void send( network::connection id, const config& cfg, bool zipped )
size_t send( network::connection id, const config& cfg, bool zipped )
{
std::stringstream out;
config_writer cfg_writer(out, zipped);
@ -180,6 +181,7 @@ class ana_network_manager : public ana::listener_handler,
it->second->send_one( ana::net_id( id ), ana::buffer( out.str() ), &handler, ana::ZERO_COPY);
}
mutex.lock();
return out.str().size();
}
private:
@ -578,11 +580,9 @@ namespace network {
return 0;
if( connection_num == 0 )
ana_manager.send_all( cfg, gzipped );
return ana_manager.send_all( cfg, gzipped );
else
ana_manager.send( connection_num, cfg, gzipped );
throw std::runtime_error("TODO:Not implemented send_data");
return ana_manager.send( connection_num, cfg, gzipped );
}
void send_raw_data(const char* /*buf*/,
@ -615,6 +615,7 @@ namespace network {
{
throw std::runtime_error("TODO:Not implemented get_send_stats");
}
statistics get_receive_stats(connection /*handle*/)
{
throw std::runtime_error("TODO:Not implemented get_receive_stats");