changes to network protocol to support framework for recovering connections
This commit is contained in:
parent
b858d93f21
commit
40e4b42aaa
2 changed files with 198 additions and 31 deletions
227
src/network.cpp
227
src/network.cpp
|
@ -12,6 +12,66 @@
|
|||
|
||||
namespace {
|
||||
|
||||
//We store the details of a connection in a map that must be looked up by its handle.
|
||||
//This allows a connection to be disconnected and then recovered, but the handle remains
|
||||
//the same, so it's all seamless to the user
|
||||
struct connection_details {
|
||||
connection_details(TCPsocket sock, const std::string& host, int port)
|
||||
: sock(sock), disconnected_at(0), host(host), port(port), remote_handle(0)
|
||||
{}
|
||||
|
||||
TCPsocket sock;
|
||||
int disconnected_at;
|
||||
std::string host;
|
||||
int port;
|
||||
|
||||
//the remote handle is the handle assigned to this connection by the remote host.
|
||||
//is 0 before a handle has been assigned.
|
||||
int remote_handle;
|
||||
};
|
||||
|
||||
typedef std::map<network::connection,connection_details> connection_map;
|
||||
connection_map connections;
|
||||
|
||||
network::connection connection_id = 1;
|
||||
|
||||
int create_connection(TCPsocket sock, const std::string& host, int port)
|
||||
{
|
||||
connections.insert(std::pair<network::connection,connection_details>(connection_id,connection_details(sock,host,port)));
|
||||
return connection_id++;
|
||||
}
|
||||
|
||||
connection_details& get_connection_details(network::connection handle)
|
||||
{
|
||||
const connection_map::iterator i = connections.find(handle);
|
||||
if(i == connections.end()) {
|
||||
throw network::error("invalid network handle");
|
||||
}
|
||||
|
||||
return i->second;
|
||||
}
|
||||
|
||||
TCPsocket get_socket(network::connection handle)
|
||||
{
|
||||
return get_connection_details(handle).sock;
|
||||
}
|
||||
|
||||
void remove_connection(network::connection handle)
|
||||
{
|
||||
connections.erase(handle);
|
||||
}
|
||||
|
||||
bool is_pending_remote_handle(network::connection handle)
|
||||
{
|
||||
const connection_details& details = get_connection_details(handle);
|
||||
return details.host != "" && details.remote_handle == 0;
|
||||
}
|
||||
|
||||
void set_remote_handle(network::connection handle, int remote_handle)
|
||||
{
|
||||
get_connection_details(handle).remote_handle = remote_handle;
|
||||
}
|
||||
|
||||
SDLNet_SocketSet socket_set = 0;
|
||||
typedef std::vector<network::connection> sockets_list;
|
||||
sockets_list sockets;
|
||||
|
@ -88,7 +148,7 @@ server_manager::server_manager(int port, bool create_server)
|
|||
: free_(false)
|
||||
{
|
||||
if(create_server && !server_socket) {
|
||||
server_socket = connect("",port);
|
||||
server_socket = get_socket(connect("",port));
|
||||
std::cerr << "server socket initialized: " << server_socket << "\n";
|
||||
free_ = true;
|
||||
}
|
||||
|
@ -125,42 +185,122 @@ connection connect(const std::string& host, int port)
|
|||
throw error("Could not connect to host");
|
||||
}
|
||||
|
||||
//if this is not a server socket, then add it to the list
|
||||
//of sockets we listen to
|
||||
if(hostname != NULL) {
|
||||
const int res = SDLNet_TCP_AddSocket(socket_set,sock);
|
||||
if(res == -1) {
|
||||
SDLNet_TCP_Close(sock);
|
||||
throw network::error("Could not add socket to socket set");
|
||||
}
|
||||
|
||||
assert(sock != server_socket);
|
||||
sockets.push_back(sock);
|
||||
schemas.insert(std::pair<network::connection,schema_pair>(sock,schema_pair()));
|
||||
//if this is a server socket
|
||||
if(hostname == NULL) {
|
||||
return create_connection(sock,"",port);
|
||||
}
|
||||
|
||||
return sock;
|
||||
std::cerr << "sending handshake...\n";
|
||||
//send data telling the remote host that this is a new connection
|
||||
char buf[4];
|
||||
SDLNet_Write32(0,buf);
|
||||
const int nbytes = SDLNet_TCP_Send(sock,buf,4);
|
||||
if(nbytes != 4) {
|
||||
SDLNet_TCP_Close(sock);
|
||||
throw network::error("Could not send initial handshake");
|
||||
}
|
||||
std::cerr << "sent handshake...\n";
|
||||
|
||||
//allocate this connection a connection handle
|
||||
const network::connection connect = create_connection(sock,host,port);
|
||||
|
||||
const int res = SDLNet_TCP_AddSocket(socket_set,sock);
|
||||
if(res == -1) {
|
||||
SDLNet_TCP_Close(sock);
|
||||
throw network::error("Could not add socket to socket set");
|
||||
}
|
||||
|
||||
assert(sock != server_socket);
|
||||
sockets.push_back(connect);
|
||||
schemas.insert(std::pair<network::connection,schema_pair>(connect,schema_pair()));
|
||||
|
||||
return connect;
|
||||
}
|
||||
|
||||
connection accept_connection()
|
||||
{
|
||||
if(!server_socket)
|
||||
if(!server_socket) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const connection sock = SDLNet_TCP_Accept(server_socket);
|
||||
//a connection isn't considered 'accepted' until it has sent its initial handshake.
|
||||
//The initial handshake is a 4 byte value, which is 0 for a new connection, or the
|
||||
//handle of the connection if it's trying to recover a lost connection.
|
||||
|
||||
//a list of all the sockets which have connected, but haven't had their initial
|
||||
//handshake received
|
||||
static std::vector<TCPsocket> pending_sockets;
|
||||
static SDLNet_SocketSet pending_socket_set = 0;
|
||||
|
||||
const TCPsocket sock = SDLNet_TCP_Accept(server_socket);
|
||||
if(sock) {
|
||||
std::cerr << "received connection. Pending handshake...\n";
|
||||
pending_sockets.push_back(sock);
|
||||
if(pending_socket_set == 0) {
|
||||
pending_socket_set = SDLNet_AllocSocketSet(64);
|
||||
}
|
||||
|
||||
if(pending_socket_set != 0) {
|
||||
SDLNet_TCP_AddSocket(pending_socket_set,sock);
|
||||
}
|
||||
}
|
||||
|
||||
if(pending_socket_set == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const int set_res = SDLNet_CheckSockets(pending_socket_set,0);
|
||||
if(set_res <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
for(std::vector<TCPsocket>::iterator i = pending_sockets.begin(); i != pending_sockets.end(); ++i) {
|
||||
if(!SDLNet_SocketReady(*i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
//receive the 4 bytes telling us if they're a new connection or trying to
|
||||
//recover a connection
|
||||
char buf[4];
|
||||
|
||||
const TCPsocket sock = *i;
|
||||
SDLNet_TCP_DelSocket(pending_socket_set,sock);
|
||||
pending_sockets.erase(i);
|
||||
|
||||
const size_t len = SDLNet_TCP_Recv(sock,buf,4);
|
||||
if(len != 4) {
|
||||
SDLNet_TCP_Close(sock);
|
||||
return 0;
|
||||
}
|
||||
|
||||
const int handle = SDLNet_Read32(buf);
|
||||
|
||||
std::cerr << "received handshake from client: '" << handle << "'\n";
|
||||
|
||||
const int res = SDLNet_TCP_AddSocket(socket_set,sock);
|
||||
if(res == -1) {
|
||||
SDLNet_TCP_Close(sock);
|
||||
|
||||
throw network::error("Could not add socket to socket set");
|
||||
}
|
||||
|
||||
const connection connect = create_connection(sock,"",0);
|
||||
|
||||
//send back their connection number
|
||||
SDLNet_Write32(connect,buf);
|
||||
const int nbytes = SDLNet_TCP_Send(sock,buf,4);
|
||||
if(nbytes != 4) {
|
||||
SDLNet_TCP_Close(sock);
|
||||
throw network::error("Could not send initial handshake");
|
||||
}
|
||||
|
||||
assert(sock != server_socket);
|
||||
sockets.push_back(sock);
|
||||
schemas.insert(std::pair<network::connection,schema_pair>(sock,schema_pair()));
|
||||
sockets.push_back(connect);
|
||||
schemas.insert(std::pair<network::connection,schema_pair>(connect,schema_pair()));
|
||||
return connect;
|
||||
}
|
||||
|
||||
return sock;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void disconnect(connection s)
|
||||
|
@ -169,7 +309,7 @@ void disconnect(connection s)
|
|||
while(sockets.empty() == false) {
|
||||
assert(sockets.back() != 0);
|
||||
disconnect(sockets.back());
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -187,8 +327,13 @@ void disconnect(connection s)
|
|||
const sockets_list::iterator i = std::find(sockets.begin(),sockets.end(),s);
|
||||
if(i != sockets.end()) {
|
||||
sockets.erase(i);
|
||||
SDLNet_TCP_DelSocket(socket_set,s);
|
||||
SDLNet_TCP_Close(s);
|
||||
|
||||
const TCPsocket sock = get_socket(s);
|
||||
|
||||
SDLNet_TCP_DelSocket(socket_set,sock);
|
||||
SDLNet_TCP_Close(sock);
|
||||
|
||||
remove_connection(s);
|
||||
} else {
|
||||
if(sockets.size() == 1) {
|
||||
std::cerr << "valid socket: " << (int)*sockets.begin() << "\n";
|
||||
|
@ -225,11 +370,29 @@ connection receive_data(config& cfg, connection connection_num, int timeout)
|
|||
}
|
||||
|
||||
for(sockets_list::const_iterator i = sockets.begin(); i != sockets.end(); ++i) {
|
||||
if(SDLNet_SocketReady(*i)) {
|
||||
const TCPsocket sock = get_socket(*i);
|
||||
if(SDLNet_SocketReady(sock)) {
|
||||
|
||||
//see if this socket is still waiting for it to be assigned its remote handle
|
||||
//if it is, then the first 4 bytes must be the remote handle.
|
||||
if(is_pending_remote_handle(*i)) {
|
||||
char buf[4];
|
||||
size_t len = SDLNet_TCP_Recv(sock,buf,4);
|
||||
if(len != 4) {
|
||||
throw error("Remote host disconnected",*i);
|
||||
}
|
||||
|
||||
const int remote_handle = SDLNet_Read32(buf);
|
||||
set_remote_handle(*i,remote_handle);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
std::map<connection,partial_buffer>::iterator part_received = received_data.find(*i);
|
||||
if(part_received == received_data.end()) {
|
||||
char num_buf[4];
|
||||
size_t len = SDLNet_TCP_Recv(*i,num_buf,4);
|
||||
size_t len = SDLNet_TCP_Recv(sock,num_buf,4);
|
||||
|
||||
if(len != 4) {
|
||||
throw error("Remote host disconnected",*i);
|
||||
|
@ -252,7 +415,7 @@ connection receive_data(config& cfg, connection connection_num, int timeout)
|
|||
partial_buffer& buf = part_received->second;
|
||||
|
||||
const size_t expected = buf.buf.size() - buf.upto;
|
||||
const size_t nbytes = SDLNet_TCP_Recv(*i,&buf.buf[buf.upto],expected);
|
||||
const size_t nbytes = SDLNet_TCP_Recv(sock,&buf.buf[buf.upto],expected);
|
||||
if(nbytes > expected) {
|
||||
std::cerr << "received " << nbytes << "/" << expected << "\n";
|
||||
throw error(std::string("network error: received wrong number of bytes: ") + SDLNet_GetError(),*i);
|
||||
|
@ -306,8 +469,9 @@ void set_default_send_size(size_t max_size)
|
|||
|
||||
void send_data(const config& cfg, connection connection_num, size_t max_size)
|
||||
{
|
||||
if(bad_sockets.count(connection_num) || bad_sockets.count(0))
|
||||
if(bad_sockets.count(connection_num) || bad_sockets.count(0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if(max_size == 0) {
|
||||
max_size = default_max_send_size;
|
||||
|
@ -350,7 +514,7 @@ void send_data(const config& cfg, connection connection_num, size_t max_size)
|
|||
//to this host, then send all data now
|
||||
if((max_size == 0 || value.size()+1 <= max_size) && send_queue.count(connection_num) == 0) {
|
||||
std::cerr << "sending " << (value.size()+1) << " bytes\n";
|
||||
const int res = SDLNet_TCP_Send(connection_num,
|
||||
const int res = SDLNet_TCP_Send(get_socket(connection_num),
|
||||
const_cast<char*>(value.c_str()),
|
||||
value.size()+1);
|
||||
|
||||
|
@ -390,6 +554,8 @@ void process_send_queue(connection connection_num, size_t max_size)
|
|||
max_size = 8;
|
||||
}
|
||||
|
||||
const TCPsocket sock = get_socket(connection_num);
|
||||
|
||||
std::pair<send_queue_map::iterator,send_queue_map::iterator> itor = send_queue.equal_range(connection_num);
|
||||
if(itor.first != itor.second) {
|
||||
std::vector<char>& buf = itor.first->second.buf;
|
||||
|
@ -402,7 +568,7 @@ void process_send_queue(connection connection_num, size_t max_size)
|
|||
|
||||
std::cerr << "sending " << bytes_to_send << " from send queue\n";
|
||||
|
||||
const int res = SDLNet_TCP_Send(connection_num,&buf[upto],bytes_to_send);
|
||||
const int res = SDLNet_TCP_Send(sock,&buf[upto],bytes_to_send);
|
||||
if(res < int(bytes_to_send)) {
|
||||
std::cerr << "sending data failed: " << res << "/" << bytes_to_send << "\n";
|
||||
throw error("Sending queued data failed",connection_num);
|
||||
|
@ -427,8 +593,9 @@ void process_send_queue(connection connection_num, size_t max_size)
|
|||
void send_data_all_except(const config& cfg, connection connection_num, size_t max_size)
|
||||
{
|
||||
for(sockets_list::const_iterator i = sockets.begin(); i != sockets.end(); ++i) {
|
||||
if(*i == connection_num)
|
||||
if(*i == connection_num) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(*i && *i != server_socket);
|
||||
send_data(cfg,*i,max_size);
|
||||
|
@ -438,7 +605,7 @@ void send_data_all_except(const config& cfg, connection connection_num, size_t m
|
|||
std::string ip_address(connection connection_num)
|
||||
{
|
||||
std::stringstream str;
|
||||
const IPaddress* const ip = SDLNet_TCP_GetPeerAddress(connection_num);
|
||||
const IPaddress* const ip = SDLNet_TCP_GetPeerAddress(get_socket(connection_num));
|
||||
if(ip != NULL) {
|
||||
const unsigned char* buf = reinterpret_cast<const unsigned char*>(&ip->host);
|
||||
for(int i = 0; i != sizeof(ip->host); ++i) {
|
||||
|
|
|
@ -33,7 +33,7 @@ private:
|
|||
bool free_;
|
||||
};
|
||||
|
||||
typedef TCPsocket connection;
|
||||
typedef int connection;
|
||||
|
||||
//the number of peers we are connected to
|
||||
size_t nconnections();
|
||||
|
|
Loading…
Add table
Reference in a new issue