Sharded network worker threads to reduce lock contention

This commit is contained in:
David White 2008-03-11 04:40:20 +00:00
parent 75effab510
commit 12da898f72
4 changed files with 133 additions and 123 deletions

View file

@ -787,12 +787,6 @@ std::string ip_address(connection connection_num)
return str.str();
}
//! Returns the worker-pool thread state as a pair:
//! first = number of waiting (idle) worker threads,
//! second = total number of threads in the pool.
std::pair<unsigned int,size_t> get_thread_state()
{
// Thin forwarder; the bookkeeping lives in network_worker_pool.
return network_worker_pool::thread_state();
}
statistics get_send_stats(connection handle)
{
return network_worker_pool::get_current_transfer_stats(handle == 0 ? get_socket(sockets.back()) : get_socket(handle)).first;

View file

@ -137,10 +137,6 @@ void send_data_all_except(const config& cfg, connection connection_num, const bo
//! Function to get the remote ip address of a socket.
std::string ip_address(connection connection_num);
//! Returns the number of idle worker threads and the total number of threads.
std::pair<unsigned int,size_t> get_thread_state();
struct connection_stats
{
connection_stats(int sent, int received, int connected_at);

View file

@ -92,9 +92,17 @@ struct _TCPsocket {
IPaddress localAddress;
int sflag;
};
unsigned int waiting_threads = 0; // management_mutex
size_t min_threads = 0; // management_mutex
size_t max_threads = 0; // management_mutex
#ifndef NUM_SHARDS
#define NUM_SHARDS 1
#endif
unsigned int waiting_threads[NUM_SHARDS];
size_t min_threads = 0;
size_t max_threads = 0;
//! Maps a socket to its worker-pool shard index in [0, NUM_SHARDS).
//! Cast through uintptr_t rather than intptr_t: if the pointer value
//! converts to a negative intptr_t (high bit set), C++ '%' keeps the
//! dividend's sign and would yield a negative — out-of-range — shard
//! index. uintptr_t (same header as the intptr_t already used here)
//! guarantees a non-negative result.
int get_shard(TCPsocket sock) { return static_cast<int>(reinterpret_cast<uintptr_t>(sock) % NUM_SHARDS); }
struct buffer {
explicit buffer(TCPsocket sock) :
@ -115,9 +123,9 @@ struct buffer {
};
bool managed = false; // management_mutex
bool managed = false;
typedef std::vector< buffer* > buffer_set;
buffer_set bufs; // management_mutex
buffer_set bufs[NUM_SHARDS];
struct schema_pair
{
@ -130,7 +138,7 @@ schema_map schemas; //schemas_mutex
//a queue of sockets that we are waiting to receive on
typedef std::vector<TCPsocket> receive_list;
receive_list pending_receives; // management_mutex
receive_list pending_receives[NUM_SHARDS];
typedef std::deque<buffer*> received_queue;
received_queue received_data_queue; // receive_mutex
@ -139,18 +147,18 @@ enum SOCKET_STATE { SOCKET_READY, SOCKET_LOCKED, SOCKET_ERRORED, SOCKET_INTERRUP
typedef std::map<TCPsocket,SOCKET_STATE> socket_state_map;
typedef std::map<TCPsocket, std::pair<network::statistics,network::statistics> > socket_stats_map;
socket_state_map sockets_locked; // management_mutex
socket_state_map sockets_locked[NUM_SHARDS];
socket_stats_map transfer_stats; // stats_mutex
int socket_errors = 0; // management_mutex
threading::mutex* management_mutex = NULL;
int socket_errors[NUM_SHARDS];
threading::mutex* shard_mutexes[NUM_SHARDS];
threading::mutex* stats_mutex = NULL;
threading::mutex* schemas_mutex = NULL;
threading::mutex* received_mutex = NULL;
threading::condition* cond = NULL;
threading::condition* cond[NUM_SHARDS];
std::map<Uint32,threading::thread*> threads; // management_mutex
std::vector<Uint32> to_clear; // management_mutex
std::map<Uint32,threading::thread*> threads[NUM_SHARDS];
std::vector<Uint32> to_clear[NUM_SHARDS];
int receive_bytes(TCPsocket s, char* buf, size_t nbytes)
{
@ -278,9 +286,10 @@ static SOCKET_STATE send_buffer(TCPsocket sock, config& config_in, const bool gz
while(true) {
#endif
{
const int shard = get_shard(sock);
// check if the socket is still locked
const threading::lock lock(*management_mutex);
if(sockets_locked[sock] != SOCKET_LOCKED)
const threading::lock lock(*shard_mutexes[shard]);
if(sockets_locked[shard][sock] != SOCKET_LOCKED)
return SOCKET_ERRORED;
}
const int res = SDLNet_TCP_Send(sock, &buf[upto], static_cast<int>(size - upto));
@ -380,17 +389,19 @@ static SOCKET_STATE receive_buf(TCPsocket sock, std::vector<char>& buf)
inline void check_socket_result(TCPsocket& sock, SOCKET_STATE& result)
{
const threading::lock lock(*management_mutex);
socket_state_map::iterator lock_it = sockets_locked.find(sock);
assert(lock_it != sockets_locked.end());
const int shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
socket_state_map::iterator lock_it = sockets_locked[shard].find(sock);
assert(lock_it != sockets_locked[shard].end());
lock_it->second = result;
if(result == SOCKET_ERRORED) {
++socket_errors;
++socket_errors[shard];
}
}
static int process_queue(void*)
static int process_queue(void* shard_num)
{
int shard = static_cast<int>(reinterpret_cast<intptr_t>(shard_num));
DBG_NW << "thread started...\n";
for(;;) {
@ -401,46 +412,45 @@ static int process_queue(void*)
buffer* sent_buf = 0;
{
const threading::lock lock(*management_mutex);
while(managed && !to_clear.empty()) {
Uint32 tmp = to_clear.back();
to_clear.pop_back();
threading::thread *zombie = threads[tmp];
threads.erase(tmp);
const threading::lock lock(*shard_mutexes[shard]);
while(managed && !to_clear[shard].empty()) {
Uint32 tmp = to_clear[shard].back();
to_clear[shard].pop_back();
threading::thread *zombie = threads[shard][tmp];
threads[shard].erase(tmp);
delete zombie;
}
if(min_threads && waiting_threads >= min_threads) {
if(min_threads && waiting_threads[shard] >= min_threads) {
DBG_NW << "worker thread exiting... not enough jobs\n";
to_clear.push_back(threading::get_current_thread_id());
to_clear[shard].push_back(threading::get_current_thread_id());
return 0;
}
waiting_threads++;
waiting_threads[shard]++;
for(;;) {
buffer_set::iterator itor = bufs.begin(), itor_end = bufs.end();
buffer_set::iterator itor = bufs[shard].begin(), itor_end = bufs[shard].end();
for(; itor != itor_end; ++itor) {
socket_state_map::iterator lock_it = sockets_locked.find((*itor)->sock);
assert(lock_it != sockets_locked.end());
socket_state_map::iterator lock_it = sockets_locked[shard].find((*itor)->sock);
assert(lock_it != sockets_locked[shard].end());
if(lock_it->second == SOCKET_READY) {
lock_it->second = SOCKET_LOCKED;
sent_buf = *itor;
sock = sent_buf->sock;
bufs.erase(itor);
bufs[shard].erase(itor);
break;
}
}
if(sock == NULL) {
receive_list::iterator itor = pending_receives.begin(), itor_end = pending_receives.end();
receive_list::iterator itor = pending_receives[shard].begin(), itor_end = pending_receives[shard].end();
for(; itor != itor_end; ++itor) {
socket_state_map::iterator lock_it = sockets_locked.find(*itor);
assert(lock_it != sockets_locked.end());
socket_state_map::iterator lock_it = sockets_locked[shard].find(*itor);
assert(lock_it != sockets_locked[shard].end());
if(lock_it->second == SOCKET_READY) {
lock_it->second = SOCKET_LOCKED;
sock = *itor;
pending_receives.erase(itor);
pending_receives[shard].erase(itor);
break;
}
}
@ -452,20 +462,20 @@ static int process_queue(void*)
if(managed == false) {
DBG_NW << "worker thread exiting...\n";
waiting_threads--;
to_clear.push_back(threading::get_current_thread_id());
waiting_threads[shard]--;
to_clear[shard].push_back(threading::get_current_thread_id());
return 0;
}
cond->wait(*management_mutex); // temporarily release the mutex and wait for a buffer
cond[shard]->wait(*shard_mutexes[shard]); // temporarily release the mutex and wait for a buffer
}
waiting_threads--;
waiting_threads[shard]--;
// if we are the last thread in the pool, create a new one
if(!waiting_threads && managed == true) {
if(!waiting_threads[shard] && managed == true) {
// max_threads of 0 is unlimited
if(!max_threads || max_threads >threads.size()) {
threading::thread * tmp = new threading::thread(process_queue,NULL);
threads[tmp->get_id()] =tmp;
if(!max_threads || max_threads >threads[shard].size()) {
threading::thread * tmp = new threading::thread(process_queue,shard_num);
threads[shard][tmp->get_id()] =tmp;
}
}
}
@ -531,19 +541,23 @@ manager::manager(size_t p_min_threads,size_t p_max_threads) : active_(!managed)
{
if(active_) {
managed = true;
management_mutex = new threading::mutex();
for(int i = 0; i != NUM_SHARDS; ++i) {
shard_mutexes[i] = new threading::mutex();
cond[i] = new threading::condition();
}
stats_mutex = new threading::mutex();
schemas_mutex = new threading::mutex();
received_mutex = new threading::mutex();
cond = new threading::condition();
const threading::lock lock(*management_mutex);
min_threads = p_min_threads;
max_threads = p_max_threads;
for(size_t n = 0; n != min_threads; ++n) {
threading::thread * tmp = new threading::thread(process_queue,NULL);
threads[tmp->get_id()] =tmp;
for(int shard = 0; shard != NUM_SHARDS; ++shard) {
const threading::lock lock(*shard_mutexes[shard]);
for(size_t n = 0; n != min_threads; ++n) {
threading::thread * tmp = new threading::thread(process_queue,(void*)intptr_t(shard));
threads[shard][tmp->get_id()] = tmp;
}
}
}
}
@ -551,55 +565,56 @@ manager::manager(size_t p_min_threads,size_t p_max_threads) : active_(!managed)
manager::~manager()
{
if(active_) {
{
const threading::lock lock(*management_mutex);
managed = false;
socket_errors = 0;
}
cond->notify_all();
managed = false;
for(std::map<Uint32,threading::thread*>::const_iterator i = threads.begin(); i != threads.end(); ++i) {
DBG_NW << "waiting for thread " << i->first << " to exit...\n";
delete i->second;
DBG_NW << "thread exited...\n";
for(int shard = 0; shard != NUM_SHARDS; ++shard) {
{
const threading::lock lock(*shard_mutexes[shard]);
socket_errors[shard] = 0;
}
cond[shard]->notify_all();
for(std::map<Uint32,threading::thread*>::const_iterator i = threads[shard].begin(); i != threads[shard].end(); ++i) {
DBG_NW << "waiting for thread " << i->first << " to exit...\n";
delete i->second;
DBG_NW << "thread exited...\n";
}
threads[shard].clear();
delete shard_mutexes[shard];
shard_mutexes[shard] = NULL;
delete cond[shard];
cond[shard] = NULL;
}
threads.clear();
delete cond;
delete management_mutex;
delete stats_mutex;
delete schemas_mutex;
delete received_mutex;
management_mutex = NULL;
stats_mutex = 0;
schemas_mutex = 0;
received_mutex = 0;
cond = NULL;
sockets_locked.clear();
for(int i = 0; i != NUM_SHARDS; ++i) {
sockets_locked[i].clear();
}
transfer_stats.clear();
DBG_NW << "exiting manager::~manager()\n";
}
}
//! Returns (number of waiting worker threads, total number of pool threads).
std::pair<unsigned int,size_t> thread_state()
{
// Take the management lock so the two counters form a consistent snapshot.
const threading::lock lock(*management_mutex);
return std::pair<unsigned int,size_t>(waiting_threads,threads.size());
}
void receive_data(TCPsocket sock)
{
{
const threading::lock lock(*management_mutex);
pending_receives.push_back(sock);
const int shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
pending_receives[shard].push_back(sock);
socket_state_map::const_iterator i = sockets_locked.insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
socket_state_map::const_iterator i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
cond->notify_one();
cond[shard]->notify_one();
}
}
}
@ -643,13 +658,14 @@ void queue_data(TCPsocket sock,const config& buf, const bool gzipped)
queued_buf->config_buf = buf;
queued_buf->gzipped = gzipped;
{
const threading::lock lock(*management_mutex);
const int shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
bufs.push_back(queued_buf);
bufs[shard].push_back(queued_buf);
socket_state_map::const_iterator i = sockets_locked.insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
socket_state_map::const_iterator i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
cond->notify_one();
cond[shard]->notify_one();
}
}
@ -658,15 +674,16 @@ void queue_data(TCPsocket sock,const config& buf, const bool gzipped)
namespace
{
//! Caller has to make sure to own management_mutex
//! Caller has to make sure to own the mutex for this shard
void remove_buffers(TCPsocket sock)
{
{
for(buffer_set::iterator i = bufs.begin(); i != bufs.end();) {
const int shard = get_shard(sock);
for(buffer_set::iterator i = bufs[shard].begin(); i != bufs[shard].end();) {
if ((*i)->sock == sock)
{
buffer* buf = *i;
i = bufs.erase(i);
i = bufs[shard].erase(i);
delete buf;
}
else
@ -694,20 +711,22 @@ void remove_buffers(TCPsocket sock)
} // anonymous namespace
bool is_locked(const TCPsocket sock) {
const threading::lock lock(*management_mutex);
const socket_state_map::iterator lock_it = sockets_locked.find(sock);
if (lock_it == sockets_locked.end()) return false;
const int shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
const socket_state_map::iterator lock_it = sockets_locked[shard].find(sock);
if (lock_it == sockets_locked[shard].end()) return false;
return (lock_it->second == SOCKET_LOCKED);
}
bool close_socket(TCPsocket sock, bool force)
{
{
const threading::lock lock(*management_mutex);
const int shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
pending_receives.erase(std::remove(pending_receives.begin(),pending_receives.end(),sock),pending_receives.end());
const socket_state_map::iterator lock_it = sockets_locked.find(sock);
if(lock_it == sockets_locked.end()) {
pending_receives[shard].erase(std::remove(pending_receives[shard].begin(),pending_receives[shard].end(),sock),pending_receives[shard].end());
const socket_state_map::iterator lock_it = sockets_locked[shard].find(sock);
if(lock_it == sockets_locked[shard].end()) {
remove_buffers(sock);
return true;
}
@ -717,7 +736,7 @@ bool close_socket(TCPsocket sock, bool force)
}
if (!(lock_it->second == SOCKET_LOCKED || lock_it->second == SOCKET_INTERRUPT) || force) {
sockets_locked.erase(lock_it);
sockets_locked[shard].erase(lock_it);
remove_buffers(sock);
return true;
} else {
@ -732,27 +751,29 @@ bool close_socket(TCPsocket sock, bool force)
TCPsocket detect_error()
{
const threading::lock lock(*management_mutex);
if(socket_errors > 0) {
for(socket_state_map::iterator i = sockets_locked.begin(); i != sockets_locked.end();) {
if(i->second == SOCKET_ERRORED) {
--socket_errors;
const TCPsocket sock = i->first;
sockets_locked.erase(i++);
pending_receives.erase(std::remove(pending_receives.begin(),pending_receives.end(),sock),pending_receives.end());
remove_buffers(sock);
const threading::lock lock_schema(*schemas_mutex);
schemas.erase(sock);
return sock;
}
else
{
++i;
for(int shard = 0; shard != NUM_SHARDS; ++shard) {
const threading::lock lock(*shard_mutexes[shard]);
if(socket_errors[shard] > 0) {
for(socket_state_map::iterator i = sockets_locked[shard].begin(); i != sockets_locked[shard].end();) {
if(i->second == SOCKET_ERRORED) {
--socket_errors[shard];
const TCPsocket sock = i->first;
sockets_locked[shard].erase(i++);
pending_receives[shard].erase(std::remove(pending_receives[shard].begin(),pending_receives[shard].end(),sock),pending_receives[shard].end());
remove_buffers(sock);
const threading::lock lock_schema(*schemas_mutex);
schemas.erase(sock);
return sock;
}
else
{
++i;
}
}
}
}
socket_errors = 0;
socket_errors[shard] = 0;
}
return 0;
}

View file

@ -44,7 +44,6 @@ TCPsocket get_received_data(TCPsocket sock, config& cfg);
void queue_data(TCPsocket sock, const config& buf, const bool gzipped);
bool is_locked(const TCPsocket sock);
bool close_socket(TCPsocket sock, bool force=false);
std::pair<unsigned int,size_t> thread_state();
TCPsocket detect_error();
std::pair<network::statistics,network::statistics> get_current_transfer_stats(TCPsocket sock);