better handling of network threads dying

This commit is contained in:
Jérémy Rosen 2007-01-08 20:20:03 +00:00
parent c0307c3572
commit 623adc5b27
3 changed files with 31 additions and 12 deletions

View file

@ -696,10 +696,8 @@ void send_data(const config& cfg, connection connection_num)
// std::cerr << "--- SEND DATA to " << ((int)connection_num) << ": '"
// << cfg.write() << "'\n--- END SEND DATA\n";
std::vector<char> buf(4 + value.size() + 1);
SDLNet_Write32(value.size()+1,&buf[0]);
std::copy(value.begin(),value.end(),buf.begin()+4);
buf.back() = 0;
std::vector<char> buf( value.size());
std::copy(value.begin(),value.end(),buf.begin());
const connection_map::iterator info = connections.find(connection_num);
wassert(info != connections.end());

View file

@ -111,9 +111,15 @@ int socket_errors = 0;
threading::mutex* global_mutex = NULL;
threading::condition* cond = NULL;
std::vector<threading::thread*> threads;
std::map<Uint32,threading::thread*> threads;
std::vector<Uint32> to_clear;
SOCKET_STATE send_buf(TCPsocket sock, std::vector<char>& buf2) {
std::vector<char> buf(4 + buf2.size() + 1);
SDLNet_Write32(buf2.size()+1,&buf[0]);
std::copy(buf2.begin(),buf2.end(),buf.begin()+4);
buf.back() = 0;
SOCKET_STATE send_buf(TCPsocket sock, std::vector<char>& buf) {
#ifdef __BEOS__
int timeout = 15000;
#endif
@ -288,8 +294,17 @@ int process_queue(void*)
{
const threading::lock lock(*global_mutex);
while(managed && !to_clear.empty()) {
Uint32 tmp = to_clear.back();
to_clear.pop_back();
threading::thread *zombie = threads[tmp];
threads.erase(tmp);
delete zombie;
}
if(waiting_threads >= thread_pool) {
LOG_NW << "worker thread exiting... not enough job\n";
to_clear.push_back(threading::get_current_thread_id());
return 0;
}
waiting_threads++;
@ -330,6 +345,7 @@ int process_queue(void*)
if(managed == false) {
LOG_NW << "worker thread exiting...\n";
waiting_threads--;
to_clear.push_back(threading::get_current_thread_id());
return 0;
}
@ -337,8 +353,9 @@ int process_queue(void*)
}
waiting_threads--;
// if we are the last thread in the pool, create a new one
if(!waiting_threads) {
threads.push_back(new threading::thread(process_queue,NULL));
if(!waiting_threads && managed == true) {
threading::thread * tmp = new threading::thread(process_queue,NULL);
threads[tmp->get_id()] =tmp;
}
}
@ -390,7 +407,8 @@ manager::manager(size_t nthreads) : active_(!managed)
thread_pool = nthreads;
for(size_t n = 0; n != nthreads; ++n) {
threads.push_back(new threading::thread(process_queue,NULL));
threading::thread * tmp = new threading::thread(process_queue,NULL);
threads[tmp->get_id()] =tmp;
}
}
}
@ -405,9 +423,9 @@ manager::~manager()
}
cond->notify_all();
for(std::vector<threading::thread*>::const_iterator i = threads.begin(); i != threads.end(); ++i) {
LOG_NW << "waiting for thread " << int(i - threads.begin()) << " to exit...\n";
delete *i;
for(std::map<Uint32,threading::thread*>::const_iterator i = threads.begin(); i != threads.end(); ++i) {
LOG_NW << "waiting for thread " << i->first << " to exit...\n";
delete i->second;
LOG_NW << "thread exited...\n";
}

View file

@ -63,6 +63,8 @@ public:
void join();
void detach();
Uint32 get_id() { return SDL_GetThreadID(thread_); }
private:
thread(const thread&);
void operator=(const thread&);
@ -70,6 +72,7 @@ private:
SDL_Thread* thread_;
};
inline Uint32 get_current_thread_id() { return SDL_ThreadID(); }
// Binary mutexes.
//
// Implements an interface to binary mutexes. This class only defines the