Implemented send_file to reduce memory usage when sending files

This commit is contained in:
Pauli Nieminen 2008-05-28 20:31:44 +00:00
parent bace6dfb6c
commit 7c2e25f738
6 changed files with 108 additions and 30 deletions

View file

@ -87,6 +87,7 @@ Version 1.5.0+svn:
* dissallow_observers is on as default if side has controller=null
* Fixed null-pointer reference in network code
* Made networking code check system buffer size and limit send length
* Implemented send_file to reduce memory usage when sending files
* Fixed tokenizer not to strip CR from quoted string becaue it would
destroy images transfered over network
* Added possibility to use per fight EV statistics proposed by maboul

View file

@ -377,15 +377,11 @@ namespace {
if(campaign == NULL) {
network::send_data(construct_error("Add-on '" + (*req)["name"] + "'not found."), sock, gzipped);
} else {
size_t size = file_size((*campaign)["filename"]);
scoped_istream stream = istream_file((*campaign)["filename"]);
if (gzipped)
{
util::scoped_resource<char*,util::delete_array> buf(new char[size+1]);
stream->read(buf,size);
network::send_raw_data(buf, size, sock);
network::send_file((*campaign)["filename"], sock);
} else {
scoped_istream stream = istream_file((*campaign)["filename"]);
config cfg;
read_gz(cfg, *stream);
network::send_data(cfg, sock, false);
@ -498,8 +494,8 @@ namespace {
add_license(*data);
scoped_ostream campaign_file = ostream_file(filename);
{
scoped_ostream campaign_file = ostream_file(filename);
config_writer writer(*campaign_file, true, "",compress_level_);
writer.write(*data);
}
@ -575,8 +571,8 @@ namespace {
std::string scripts = validate_all_python_scripts(campaign_file);
if (!scripts.empty()) {
// Write the campaign with changed filenames back to disk
scoped_ostream ostream = ostream_file((*campaign)["filename"]);
{
scoped_ostream ostream = ostream_file((*campaign)["filename"]);
config_writer writer(*ostream, true, "",compress_level_);
writer.write(campaign_file);
}

View file

@ -819,6 +819,24 @@ connection receive_data(std::vector<char>& buf)
return result;
}
void send_file(const std::string& filename, connection connection_num)
{
assert(connection_num > 0);
if(bad_sockets.count(connection_num) || bad_sockets.count(0)) {
return;
}
const connection_map::iterator info = connections.find(connection_num);
if (info == connections.end()) {
ERR_NW << "Error: socket: " << connection_num
<< "\tnot found in connection_map. Not sending...\n";
return;
}
network_worker_pool::queue_file(info->second.sock, filename);
}
//! @todo Note the gzipped parameter should be removed later, we want to send
//! all data gzipped. This can be done once the campaign server is also updated
//! to work with gzipped data.

View file

@ -133,6 +133,8 @@ connection receive_data(config& cfg, connection connection_num=0, bool* gzipped
connection receive_data(config& cfg, connection connection_num, unsigned int timeout);
connection receive_data(std::vector<char>& buf);
void send_file(const std::string&, connection);
//! Function to send data down a given connection,
//! or broadcast to all peers if connection_num is 0.
//! Throws error.

View file

@ -24,6 +24,7 @@
#include "log.hpp"
#include "network_worker.hpp"
#include "network.hpp"
#include "filesystem.hpp"
#include "thread.hpp"
#include "serialization/binary_or_text.hpp"
#include "serialization/binary_wml.hpp"
@ -38,6 +39,10 @@
#include <map>
#include <vector>
#ifdef USE_SENDFILE
#include <sys/sendfile.h>
#endif
#include <boost/iostreams/filter/gzip.hpp>
#ifdef __AMIGAOS4__
@ -294,7 +299,7 @@ static void make_network_buffer(const char* input, int len, std::vector<char>& b
buf.back() = 0;
}
static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf)
static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf, int in_size = -1)
{
#ifdef __BEOS__
int timeout = 15000;
@ -302,6 +307,8 @@ static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf)
check_send_buffer_size(sock);
size_t upto = 0;
size_t size = buf.size();
if (in_size != -1)
size = in_size;
int send_len = 0;
if (!raw_data_only)
@ -387,6 +394,43 @@ static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf)
}
}
static SOCKET_STATE send_file(buffer* buf)
{
size_t upto = 0;
int send_size = 0;
size_t filesize = file_size(buf->config_error);
#ifdef USE_SENDFILE
std::vector<char>& buffer;
buffer.reserve(4);
SDLNet_Write32(filesize,&buffer[0]);
#else
SDLNet_Write32(filesize,&buf->raw_buffer[0]);
scoped_istream file_stream = istream_file(buf->config_error);
send_buffer(buf->sock, buf->raw_buffer, 4);
#endif
while (true)
{
#ifdef USE_SENDFILE
#else
// read data
file_stream->read(&buf->raw_buffer[0], buf->raw_buffer.size());
send_size = file_stream->gcount();
upto += send_size;
// send data to socket
send_buffer(buf->sock, buf->raw_buffer, send_size);
if (upto == filesize)
{
buf->raw_buffer[0] = 0;
send_buffer(buf->sock, buf->raw_buffer, 1);
break;
}
#endif
}
return SOCKET_READY;
}
static SOCKET_STATE receive_buf(TCPsocket sock, std::vector<char>& buf)
{
#ifdef __GNUC__
@ -529,12 +573,19 @@ static int process_queue(void* shard_num)
std::vector<char> buf;
if(sent_buf) {
if(sent_buf->raw_buffer.empty()) {
const std::string &value = sent_buf->stream.str();
make_network_buffer(value.c_str(), value.size(), sent_buf->raw_buffer);
}
result = send_buffer(sent_buf->sock, sent_buf->raw_buffer);
if(!sent_buf->config_error.empty())
{
// We have file to send over net
send_file(sent_buf);
} else {
if(sent_buf->raw_buffer.empty()) {
const std::string &value = sent_buf->stream.str();
make_network_buffer(value.c_str(), value.size(), sent_buf->raw_buffer);
}
result = send_buffer(sent_buf->sock, sent_buf->raw_buffer);
}
delete sent_buf;
} else {
result = receive_buf(sock,buf);
@ -748,11 +799,8 @@ TCPsocket get_received_data(std::vector<char>& out)
return res;
}
void queue_raw_data(TCPsocket sock, const char* buf, int len)
static void queue_buffer(TCPsocket sock, buffer* queued_buf)
{
buffer* queued_buf = new buffer(sock);
assert(*buf == 31);
make_network_buffer(buf, len, queued_buf->raw_buffer);
const size_t shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
outgoing_bufs[shard].push_back(queued_buf);
@ -763,6 +811,28 @@ void queue_raw_data(TCPsocket sock, const char* buf, int len)
}
void queue_raw_data(TCPsocket sock, const char* buf, int len)
{
buffer* queued_buf = new buffer(sock);
assert(*buf == 31);
make_network_buffer(buf, len, queued_buf->raw_buffer);
queue_buffer(sock, queued_buf);
}
void queue_file(TCPsocket sock, const std::string& filename)
{
buffer* queued_buf = new buffer(sock);
queued_buf->config_error = filename;
#ifndef USE_SENDFILE
// We reserve buffer in main thread
// this helps in memory problems with threads
// We use 8KB buffer
queued_buf->raw_buffer.resize(1024*8);
#endif
queue_buffer(sock, queued_buf);
}
void queue_data(TCPsocket sock,const config& buf, const bool gzipped)
{
DBG_NW << "queuing data...\n";
@ -770,18 +840,7 @@ void queue_data(TCPsocket sock,const config& buf, const bool gzipped)
buffer* queued_buf = new buffer(sock);
output_to_buffer(sock, buf, queued_buf->stream, gzipped);
queued_buf->gzipped = gzipped;
{
const size_t shard = get_shard(sock);
const threading::lock lock(*shard_mutexes[shard]);
outgoing_bufs[shard].push_back(queued_buf);
socket_state_map::const_iterator i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
cond[shard]->notify_one();
}
}
queue_buffer(sock, queued_buf);
}
namespace

View file

@ -46,6 +46,8 @@ void receive_data(TCPsocket sock);
TCPsocket get_received_data(TCPsocket sock, config& cfg, bool* gzipped= 0);
TCPsocket get_received_data(std::vector<char>& buf);
void queue_file(TCPsocket sock, const std::string&);
void queue_raw_data(TCPsocket sock, const char* buf, int len);
void queue_data(TCPsocket sock, const config& buf, const bool gzipped);
bool is_locked(const TCPsocket sock);