merged send_file from trunk

This commit is contained in:
Pauli Nieminen 2008-06-05 13:33:40 +00:00
parent ce6239df48
commit 1dddfba779

View file

@ -42,6 +42,9 @@
#ifdef USE_SENDFILE
#include <sys/sendfile.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#define HAVE_POLL_H
#endif
#include <boost/iostreams/filter/gzip.hpp>
@ -313,10 +316,12 @@ static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf, int in_s
// check if the socket is still locked
const threading::lock lock(*shard_mutexes[shard]);
if(sockets_locked[shard][sock] != SOCKET_LOCKED)
{
return SOCKET_ERRORED;
}
}
const int res = SDLNet_TCP_Send(sock, &buf[upto], static_cast<int>(size - upto));
if(res == static_cast<int>(size - upto)) {
if (!raw_data_only)
{
@ -325,11 +330,7 @@ static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf, int in_s
}
return SOCKET_READY;
}
#if defined(EAGAIN) && !defined(__BEOS__) && !defined(_WIN32)
if(errno == EAGAIN)
#elif defined(EWOULDBLOCK)
if(errno == EWOULDBLOCK)
#endif
{
// update how far we are
upto += static_cast<size_t>(res);
@ -346,6 +347,7 @@ static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf, int in_s
poll_res = poll(&fd, 1, 15000);
} while(poll_res == -1 && errno == EINTR);
if(poll_res > 0)
continue;
#elif defined(USE_SELECT) && !defined(__BEOS__)
@ -385,36 +387,93 @@ static SOCKET_STATE send_file(buffer* buf)
size_t upto = 0;
int send_size = 0;
size_t filesize = file_size(buf->config_error);
#ifdef USE_SENDFILE
std::vector<char>& buffer;
std::vector<char> buffer;
buffer.reserve(4);
SDLNet_Write32(filesize,&buffer[0]);
int socket = reinterpret_cast<_TCPsocket*>(buf->sock)->channel;
int in_file = open(buf->config_error.c_str(),O_NOATIME | O_RDONLY);
int cock = 1;
int poll_res;
struct pollfd fd = {socket, POLLOUT, 0 };
setsockopt(socket, IPPROTO_TCP, TCP_CORK, &cock, sizeof(cock));;
do {
poll_res = poll(&fd, 1, 600000);
} while(poll_res == -1 && errno == EINTR);
SOCKET_STATE result = send_buffer(buf->sock, buffer, 4);
if (result != SOCKET_READY)
{
close(in_file);
cock = 0;
setsockopt(socket, IPPROTO_TCP, TCP_CORK, &cock, sizeof(cock));
return result;
}
result = SOCKET_READY;
#else
SDLNet_Write32(filesize,&buf->raw_buffer[0]);
scoped_istream file_stream = istream_file(buf->config_error);
send_buffer(buf->sock, buf->raw_buffer, 4);
SOCKET_STATE result = send_buffer(buf->sock, buf->raw_buffer, 4);
if (result != SOCKET_READY)
{
return result;
}
#endif
while (true)
{
#ifdef USE_SENDFILE
do {
poll_res = poll(&fd, 1, 600000);
} while(poll_res == -1 && errno == EINTR);
if (poll_res <= 0 )
{
result = SOCKET_ERRORED;
break;
}
int bytes = ::sendfile(socket, in_file, 0, filesize);
upto += bytes;
if (upto == filesize)
{
buf->raw_buffer.push_back(0);
result = send_buffer(buf->sock, buf->raw_buffer, 1);
break;
}
#else
// read data
file_stream->read(&buf->raw_buffer[0], buf->raw_buffer.size());
send_size = file_stream->gcount();
upto += send_size;
// send data to socket
send_buffer(buf->sock, buf->raw_buffer, send_size);
result = send_buffer(buf->sock, buf->raw_buffer, send_size);
if (result != SOCKET_READY)
{
break;
}
if (upto == filesize)
{
buf->raw_buffer[0] = 0;
send_buffer(buf->sock, buf->raw_buffer, 1);
result = send_buffer(buf->sock, buf->raw_buffer, 1);
break;
}
#endif
}
return SOCKET_READY;
#ifdef USE_SENDFILE
close(in_file);
cock = 0;
setsockopt(socket, IPPROTO_TCP, TCP_CORK, &cock, sizeof(cock));
#endif
return result;
}
static SOCKET_STATE receive_buf(TCPsocket sock, std::vector<char>& buf)
@ -559,10 +618,11 @@ static int process_queue(void* shard_num)
std::vector<char> buf;
if(sent_buf) {
if(!sent_buf->config_error.empty())
{
// We have file to send over net
send_file(sent_buf);
if(!sent_buf->config_error.empty())
{
// We have file to send over net
result = send_file(sent_buf);
} else {
if(sent_buf->raw_buffer.empty()) {
output_to_buffer(sent_buf->sock, sent_buf->config_buf, sent_buf->stream, sent_buf->gzipped);
@ -750,16 +810,16 @@ TCPsocket get_received_data(TCPsocket sock, config& cfg, bool* gzipped)
std::string error = (*itor)->config_error;
buffer* buf = *itor;
received_data_queue.erase(itor);
if (gzipped)
*gzipped = buf->gzipped;
delete buf;
throw config::error(error);
} else {
cfg.swap((*itor)->config_buf);
const TCPsocket res = (*itor)->sock;
if (gzipped)
{
*gzipped = (*itor)->gzipped;
}
buffer* buf = *itor;
if (gzipped)
*gzipped = buf->gzipped;
received_data_queue.erase(itor);
delete buf;
return res;