Fixed socket locking problem in network_worker

This commit is contained in:
Pauli Nieminen 2008-01-28 08:37:15 +00:00
parent 1a68096fce
commit dc98a6e1fb

View file

@ -38,6 +38,7 @@
#include <iostream>
#include <map>
#include <vector>
#include <boost/shared_ptr.hpp>
#include <boost/iostreams/filter/gzip.hpp>
@ -114,8 +115,10 @@ struct buffer {
};
typedef boost::shared_ptr<buffer> bufferPtr;
bool managed = false; // management_mutex
typedef std::vector< buffer * > buffer_set;
typedef std::vector< bufferPtr > buffer_set;
buffer_set bufs; // management_mutex
struct schema_pair
@ -131,7 +134,7 @@ schema_map schemas; //schemas_mutex
typedef std::vector<TCPsocket> receive_list;
receive_list pending_receives; // management_mutex
typedef std::deque<buffer *> received_queue;
typedef std::deque<bufferPtr> received_queue;
received_queue received_data_queue; // receive_mutex
enum SOCKET_STATE { SOCKET_READY, SOCKET_LOCKED, SOCKET_ERRORED, SOCKET_INTERRUPT };
@ -196,7 +199,7 @@ bool receive_with_timeout(TCPsocket s, char* buf, size_t nbytes,
int poll_res;
do {
time_used = SDL_GetTicks() - startTicks;
poll_res = poll(&fd, 1, minimum<int>(15000,timeout_ms - time_used));
poll_res = poll(&fd, 1, timeout_ms - time_used);
} while(poll_res == -1 && errno == EINTR);
#elif defined(USE_SELECT)
@ -209,7 +212,7 @@ bool receive_with_timeout(TCPsocket s, char* buf, size_t nbytes,
do {
time_used = SDL_GetTicks() - startTicks;
time_left = minimum<int>(15000, timeout_ms - time_used);
time_left = timeout_ms - time_used;
tv.tv_sec = time_left/1000;
tv.tv_usec = time_left % 1000;
retval = select(((_TCPsocket*)s)->channel + 1, &readfds, NULL, NULL, &tv);
@ -374,6 +377,17 @@ static SOCKET_STATE receive_buf(TCPsocket sock, std::vector<char>& buf)
return SOCKET_READY;
}
inline void check_socket_result(TCPsocket& sock, SOCKET_STATE& result)
{
const threading::lock lock(*management_mutex);
socket_state_map::iterator lock_it = sockets_locked.find(sock);
assert(lock_it != sockets_locked.end());
lock_it->second = result;
if(result == SOCKET_ERRORED) {
++socket_errors;
}
}
static int process_queue(void*)
{
DBG_NW << "thread started...\n";
@ -383,7 +397,7 @@ static int process_queue(void*)
//to receive data from, sent_buf will be NULL. 'sock' will always refer to the socket
//that data is being sent to/received from
TCPsocket sock = NULL;
buffer *sent_buf = NULL;
bufferPtr sent_buf;
{
const threading::lock lock(*management_mutex);
@ -462,27 +476,21 @@ static int process_queue(void*)
SOCKET_STATE result = SOCKET_READY;
std::vector<char> buf;
if(sent_buf != NULL) {
if(sent_buf) {
result = send_buffer(sent_buf->sock, sent_buf->config_buf, sent_buf->gzipped);
delete sent_buf;
sent_buf = NULL;
sent_buf.reset();
} else {
result = receive_buf(sock,buf);
}
{
const threading::lock lock(*management_mutex);
socket_state_map::iterator lock_it = sockets_locked.find(sock);
assert(lock_it != sockets_locked.end());
lock_it->second = result;
if(result == SOCKET_ERRORED) {
++socket_errors;
}
if(result != SOCKET_READY || buf.empty()) continue;
if(result != SOCKET_READY || buf.empty())
{
check_socket_result(sock,result);
continue;
}
//if we received data, add it to the queue
buffer *received_data = new buffer(sock);
bufferPtr received_data(new buffer(sock));
std::string buffer(buf.begin(), buf.end());
std::istringstream stream(buffer);
// Binary wml starts with a char < 4, the first char of a gzip header is 31
@ -508,6 +516,7 @@ static int process_queue(void*)
const threading::lock lock_received(*received_mutex);
received_data_queue.push_back(received_data);
}
check_socket_result(sock,result);
}
// unreachable
}
@ -610,16 +619,12 @@ TCPsocket get_received_data(TCPsocket sock, config& cfg)
} else if (!(*itor)->config_error.empty()){
// throw the error in parent thread
std::string error = (*itor)->config_error;
buffer *buf = *itor;
received_data_queue.erase(itor);
delete buf;
throw config::error(error);
} else {
cfg = (*itor)->config_buf;
const TCPsocket res = (*itor)->sock;
buffer *buf = *itor;
received_data_queue.erase(itor);
delete buf;
return res;
}
}
@ -628,7 +633,7 @@ void queue_data(TCPsocket sock,const config& buf, const bool gzipped)
{
DBG_NW << "queuing data...\n";
buffer *queued_buf = new buffer(sock);
bufferPtr queued_buf(new buffer(sock));
queued_buf->config_buf = buf;
queued_buf->gzipped = gzipped;
{
@ -649,12 +654,14 @@ namespace
static void remove_buffers(TCPsocket sock)
{
{
for(buffer_set::iterator i = bufs.begin(), i_end = bufs.end(); i != i_end; ++i) {
for(buffer_set::iterator i = bufs.begin(), i_end = bufs.end(); i != i_end;) {
if ((*i)->sock == sock)
{
buffer *buf = *i;
bufs.erase(i);
delete buf;
i = bufs.erase(i);
}
else
{
++i;
}
}
}
@ -664,9 +671,7 @@ static void remove_buffers(TCPsocket sock)
for(received_queue::iterator j = received_data_queue.begin(); j != received_data_queue.end(); ) {
if((*j)->sock == sock) {
buffer *buf = *j;
j = received_data_queue.erase(j);
delete buf;
} else {
++j;
}
@ -717,17 +722,21 @@ TCPsocket detect_error()
{
const threading::lock lock(*management_mutex);
if(socket_errors > 0) {
for(socket_state_map::iterator i = sockets_locked.begin(); i != sockets_locked.end(); ++i) {
for(socket_state_map::iterator i = sockets_locked.begin(); i != sockets_locked.end();) {
if(i->second == SOCKET_ERRORED) {
--socket_errors;
const TCPsocket sock = i->first;
sockets_locked.erase(i);
sockets_locked.erase(++i);
pending_receives.erase(std::remove(pending_receives.begin(),pending_receives.end(),sock),pending_receives.end());
remove_buffers(sock);
const threading::lock lock_schema(*schemas_mutex);
schemas.erase(sock);
return sock;
}
else
{
++i;
}
}
}