|
@@ -146,7 +146,10 @@ void Job::register_on_ready_to_read(Function<void()> callback)
|
|
|
|
|
|
// As `m_socket` is a buffered object, we might not get notifications for data in the buffer
|
|
|
// so exhaust the buffer to ensure we don't end up waiting forever.
|
|
|
- if (MUST(m_socket->can_read_without_blocking()) && m_state != State::Finished && !has_error()) {
|
|
|
+ auto can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (can_read_without_blocking.value() && m_state != State::Finished && !has_error()) {
|
|
|
deferred_invoke([this] {
|
|
|
if (m_socket && m_socket->on_ready_to_read)
|
|
|
m_socket->on_ready_to_read();
|
|
@@ -155,29 +158,25 @@ void Job::register_on_ready_to_read(Function<void()> callback)
|
|
|
};
|
|
|
}
|
|
|
|
|
|
-String Job::read_line(size_t size)
|
|
|
+ErrorOr<String> Job::read_line(size_t size)
|
|
|
{
|
|
|
- auto buffer = ByteBuffer::create_uninitialized(size).release_value_but_fixme_should_propagate_errors();
|
|
|
- auto nread = m_socket->read_until(buffer, "\r\n"sv).release_value_but_fixme_should_propagate_errors();
|
|
|
+ auto buffer = TRY(ByteBuffer::create_uninitialized(size));
|
|
|
+ auto nread = TRY(m_socket->read_until(buffer, "\r\n"sv));
|
|
|
return String::copy(buffer.span().slice(0, nread));
|
|
|
}
|
|
|
|
|
|
-ByteBuffer Job::receive(size_t size)
|
|
|
+ErrorOr<ByteBuffer> Job::receive(size_t size)
|
|
|
{
|
|
|
if (size == 0)
|
|
|
- return {};
|
|
|
+ return ByteBuffer {};
|
|
|
|
|
|
- auto buffer = ByteBuffer::create_uninitialized(size).release_value_but_fixme_should_propagate_errors();
|
|
|
+ auto buffer = TRY(ByteBuffer::create_uninitialized(size));
|
|
|
size_t nread;
|
|
|
do {
|
|
|
auto result = m_socket->read(buffer);
|
|
|
if (result.is_error() && result.error().is_errno() && result.error().code() == EINTR)
|
|
|
continue;
|
|
|
- if (result.is_error()) {
|
|
|
- dbgln_if(JOB_DEBUG, "Failed while reading: {}", result.error());
|
|
|
- VERIFY_NOT_REACHED();
|
|
|
- }
|
|
|
- nread = MUST(result);
|
|
|
+ nread = TRY(result);
|
|
|
break;
|
|
|
} while (true);
|
|
|
return buffer.slice(0, nread);
|
|
@@ -214,13 +213,31 @@ void Job::on_socket_connected()
|
|
|
}
|
|
|
|
|
|
while (m_state == State::InStatus) {
|
|
|
- if (!MUST(m_socket->can_read_line())) {
|
|
|
+ auto can_read_line = m_socket->can_read_line();
|
|
|
+ if (can_read_line.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Job {} could not figure out whether we could read a line", m_request.url());
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!can_read_line.value()) {
|
|
|
dbgln_if(JOB_DEBUG, "Job {} cannot read line", m_request.url());
|
|
|
- auto buf = receive(64);
|
|
|
- dbgln_if(JOB_DEBUG, "{} bytes was read", buf.bytes().size());
|
|
|
+ auto maybe_buf = receive(64);
|
|
|
+ if (maybe_buf.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Job {} cannot read any bytes!", m_request.url());
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+
|
|
|
+ dbgln_if(JOB_DEBUG, "{} bytes was read", maybe_buf.value().bytes().size());
|
|
|
return;
|
|
|
}
|
|
|
- auto line = read_line(PAGE_SIZE);
|
|
|
+ auto maybe_line = read_line(PAGE_SIZE);
|
|
|
+ if (maybe_line.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Job {} could not read line: {}", m_request.url(), maybe_line.error());
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+
|
|
|
+ auto line = maybe_line.release_value();
|
|
|
+
|
|
|
dbgln_if(JOB_DEBUG, "Job {} read line of length {}", m_request.url(), line.length());
|
|
|
if (line.is_null()) {
|
|
|
dbgln("Job: Expected HTTP status");
|
|
@@ -238,16 +255,34 @@ void Job::on_socket_connected()
|
|
|
}
|
|
|
m_code = code.value();
|
|
|
m_state = State::InHeaders;
|
|
|
- if (!MUST(m_socket->can_read_without_blocking()))
|
|
|
+
|
|
|
+ auto can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+
|
|
|
+ if (!can_read_without_blocking.value())
|
|
|
return;
|
|
|
}
|
|
|
while (m_state == State::InHeaders || m_state == State::Trailers) {
|
|
|
- if (!MUST(m_socket->can_read_line())) {
|
|
|
+ auto can_read_line = m_socket->can_read_line();
|
|
|
+ if (can_read_line.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Job {} could not figure out whether we could read a line", m_request.url());
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!can_read_line.value()) {
|
|
|
dbgln_if(JOB_DEBUG, "Can't read lines anymore :(");
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
// There's no max limit defined on headers, but for our sanity, let's limit it to 32K.
|
|
|
- auto line = read_line(32 * KiB);
|
|
|
+ auto maybe_line = read_line(32 * KiB);
|
|
|
+ if (maybe_line.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Job {} could not read a header line: {}", m_request.url(), maybe_line.error());
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+ auto line = maybe_line.release_value();
|
|
|
+
|
|
|
if (line.is_null()) {
|
|
|
if (m_state == State::Trailers) {
|
|
|
// Some servers like to send two ending chunks
|
|
@@ -276,7 +311,12 @@ void Job::on_socket_connected()
|
|
|
if (result.value() == 0 && !m_headers.get("Transfer-Encoding"sv).value_or(""sv).view().trim_whitespace().equals_ignoring_case("chunked"sv))
|
|
|
return finish_up();
|
|
|
}
|
|
|
- if (!MUST(m_socket->can_read_line()))
|
|
|
+
|
|
|
+ can_read_line = m_socket->can_read_line();
|
|
|
+ if (can_read_line.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+
|
|
|
+ if (!can_read_line.value())
|
|
|
return;
|
|
|
break;
|
|
|
}
|
|
@@ -306,7 +346,11 @@ void Job::on_socket_connected()
|
|
|
if (name.equals_ignoring_case("Set-Cookie")) {
|
|
|
dbgln_if(JOB_DEBUG, "Job: Received Set-Cookie header: '{}'", value);
|
|
|
m_set_cookie_headers.append(move(value));
|
|
|
- if (!MUST(m_socket->can_read_without_blocking()))
|
|
|
+
|
|
|
+ auto can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (!can_read_without_blocking.value())
|
|
|
return;
|
|
|
} else if (auto existing_value = m_headers.get(name); existing_value.has_value()) {
|
|
|
StringBuilder builder;
|
|
@@ -327,23 +371,42 @@ void Job::on_socket_connected()
|
|
|
m_content_length = length.value();
|
|
|
}
|
|
|
dbgln_if(JOB_DEBUG, "Job: [{}] = '{}'", name, value);
|
|
|
- if (!MUST(m_socket->can_read_without_blocking())) {
|
|
|
+
|
|
|
+ auto can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (!can_read_without_blocking.value()) {
|
|
|
dbgln_if(JOB_DEBUG, "Can't read headers anymore, byebye :(");
|
|
|
return;
|
|
|
}
|
|
|
}
|
|
|
VERIFY(m_state == State::InBody);
|
|
|
- if (!MUST(m_socket->can_read_without_blocking()))
|
|
|
+
|
|
|
+ auto can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (!can_read_without_blocking.value())
|
|
|
return;
|
|
|
|
|
|
- while (MUST(m_socket->can_read_without_blocking())) {
|
|
|
+ while (true) {
|
|
|
+ auto can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (!can_read_without_blocking.value())
|
|
|
+ break;
|
|
|
+
|
|
|
auto read_size = 64 * KiB;
|
|
|
if (m_current_chunk_remaining_size.has_value()) {
|
|
|
read_chunk_size:;
|
|
|
auto remaining = m_current_chunk_remaining_size.value();
|
|
|
if (remaining == -1) {
|
|
|
// read size
|
|
|
- auto size_data = read_line(PAGE_SIZE);
|
|
|
+ auto maybe_size_data = read_line(PAGE_SIZE);
|
|
|
+ if (maybe_size_data.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Job: Could not receive chunk: {}", maybe_size_data.error());
|
|
|
+ }
|
|
|
+ auto size_data = maybe_size_data.release_value();
|
|
|
+
|
|
|
if (m_should_read_chunk_ending_line) {
|
|
|
VERIFY(size_data.is_empty());
|
|
|
m_should_read_chunk_ending_line = false;
|
|
@@ -410,11 +473,21 @@ void Job::on_socket_connected()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!MUST(m_socket->can_read_without_blocking()))
|
|
|
+ can_read_without_blocking = m_socket->can_read_without_blocking();
|
|
|
+ if (can_read_without_blocking.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (!can_read_without_blocking.value())
|
|
|
break;
|
|
|
|
|
|
dbgln_if(JOB_DEBUG, "Waiting for payload for {}", m_request.url());
|
|
|
- auto payload = receive(read_size);
|
|
|
+ auto maybe_payload = receive(read_size);
|
|
|
+ if (maybe_payload.is_error()) {
|
|
|
+ dbgln_if(JOB_DEBUG, "Could not read the payload: {}", maybe_payload.error());
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+
|
|
|
+ auto payload = maybe_payload.release_value();
|
|
|
+
|
|
|
if (payload.is_empty() && m_socket->is_eof()) {
|
|
|
finish_up();
|
|
|
break;
|
|
@@ -456,9 +529,17 @@ void Job::on_socket_connected()
|
|
|
|
|
|
// we've read everything, now let's get the next chunk
|
|
|
size = -1;
|
|
|
- if (MUST(m_socket->can_read_line())) {
|
|
|
- auto line = read_line(PAGE_SIZE);
|
|
|
- VERIFY(line.is_empty());
|
|
|
+
|
|
|
+ auto can_read_line = m_socket->can_read_line();
|
|
|
+ if (can_read_line.is_error())
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ if (can_read_line.value()) {
|
|
|
+ auto maybe_line = read_line(PAGE_SIZE);
|
|
|
+ if (maybe_line.is_error()) {
|
|
|
+ return deferred_invoke([this] { did_fail(Core::NetworkJob::Error::TransmissionFailed); });
|
|
|
+ }
|
|
|
+
|
|
|
+ VERIFY(maybe_line.value().is_empty());
|
|
|
} else {
|
|
|
m_should_read_chunk_ending_line = true;
|
|
|
}
|
|
@@ -487,7 +568,11 @@ void Job::finish_up()
|
|
|
VERIFY(!m_has_scheduled_finish);
|
|
|
m_state = State::Finished;
|
|
|
if (!m_can_stream_response) {
|
|
|
- auto flattened_buffer = ByteBuffer::create_uninitialized(m_buffered_size).release_value_but_fixme_should_propagate_errors(); // FIXME: Handle possible OOM situation.
|
|
|
+ auto maybe_flattened_buffer = ByteBuffer::create_uninitialized(m_buffered_size);
|
|
|
+ if (maybe_flattened_buffer.is_error())
|
|
|
+ return did_fail(Core::NetworkJob::Error::TransmissionFailed);
|
|
|
+ auto flattened_buffer = maybe_flattened_buffer.release_value();
|
|
|
+
|
|
|
u8* flat_ptr = flattened_buffer.data();
|
|
|
for (auto& received_buffer : m_received_buffers) {
|
|
|
memcpy(flat_ptr, received_buffer.data(), received_buffer.size());
|