From 54eca45e5de484342f0bd07d16adfdd38819d471 Mon Sep 17 00:00:00 2001 From: brodo Date: Fri, 1 Sep 2023 11:13:39 +0200 Subject: [PATCH] Fix pipelined requests handling --- server_http.hpp | 63 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/server_http.hpp b/server_http.hpp index 1db0368..2e94be8 100644 --- a/server_http.hpp +++ b/server_http.hpp @@ -228,10 +228,16 @@ namespace SimpleWeb { friend class Session; asio::streambuf streambuf; + asio::streambuf content_streambuf; std::weak_ptr connection; std::string optimization = std::to_string(0); // TODO: figure out what goes wrong in gcc optimization without this line - Request(std::size_t max_request_streambuf_size, const std::shared_ptr &connection_) noexcept : streambuf(max_request_streambuf_size), connection(connection_), content(streambuf) {} + Request(std::size_t max_request_streambuf_size, const std::shared_ptr &connection_) noexcept + : streambuf(max_request_streambuf_size), content_streambuf(max_request_streambuf_size), connection(connection_), content(content_streambuf) {} + Request(std::size_t max_request_streambuf_size, const std::shared_ptr &connection_, asio::streambuf &&pending) noexcept + : streambuf(max_request_streambuf_size), content_streambuf(max_request_streambuf_size), connection(connection_), content(content_streambuf) { + std::ostream(&streambuf) << &pending; + } public: std::string method, path, query_string, http_version; @@ -348,6 +354,8 @@ namespace SimpleWeb { class Session { public: Session(std::size_t max_request_streambuf_size, std::shared_ptr connection_) noexcept : connection(std::move(connection_)), request(new Request(max_request_streambuf_size, connection)) {} + Session(std::size_t max_request_streambuf_size, std::shared_ptr connection_, asio::streambuf &&s) noexcept + : connection(std::move(connection_)), request(new Request(max_request_streambuf_size, connection, std::move(s))) {} std::shared_ptr connection; std::shared_ptr request; @@ -572,7 +580,8 @@ namespace SimpleWeb { // streambuf (maybe some bytes of the content) is appended to in the async_read-function below (for retrieving content). std::size_t num_additional_bytes = session->request->streambuf.size() - bytes_transferred; - if(!RequestMessage::parse(session->request->content, session->request->method, session->request->path, + std::istream istream(&session->request->streambuf); + if(!RequestMessage::parse(istream, session->request->method, session->request->path, session->request->query_string, session->request->http_version, session->request->header)) { if(this->on_error) this->on_error(session->request, make_error_code::make_error_code(errc::protocol_error)); @@ -598,8 +607,17 @@ namespace SimpleWeb { this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); return; } + + if(num_additional_bytes > 0) { + auto content_bytes = std::min(content_length, (unsigned long long)num_additional_bytes); + auto &source = session->request->streambuf; + auto &target = session->request->content_streambuf; + target.commit(asio::buffer_copy(target.prepare(content_bytes), source.data(), content_bytes)); + source.consume(content_bytes); + } + if(content_length > num_additional_bytes) { - asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) { + asio::async_read(*session->connection->socket, session->request->content_streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, std::size_t /*bytes_transferred*/) { auto lock = session->connection->handler_runner->continue_lock(); if(!lock) return; @@ -610,8 +628,9 @@ namespace SimpleWeb { this->on_error(session->request, ec); }); } - else + else { this->find_resource(session); + } } else if((header_it = session->request->header.find("Transfer-Encoding")) != session->request->header.end() && header_it->second == "chunked") { // Expect hex number to not exceed 16 bytes (64-bit number), but take into account previous additional read bytes @@ -654,12 +673,7 @@ namespace SimpleWeb { return; } - if(chunk_size == 0) { - this->find_resource(session); - return; - } - - if(chunk_size + session->request->streambuf.size() > session->request->streambuf.max_size()) { + if(chunk_size + session->request->content_streambuf.size() > session->request->content_streambuf.max_size()) { auto response = std::shared_ptr(new Response(session, this->config.timeout_content)); response->write(StatusCode::client_error_payload_too_large); if(this->on_error) @@ -667,19 +681,30 @@ namespace SimpleWeb { return; } + auto next_step = [this, chunk_size](const std::shared_ptr &session, const std::shared_ptr &chunk_size_streambuf) { + if(chunk_size == 0) { + // remaining bytes are the beginning of the next request + std::istream(chunk_size_streambuf.get()) >> &session->request->streambuf; + this->find_resource(session); + } + else { + this->read_chunked_transfer_encoded(session, chunk_size_streambuf); + } + }; + auto num_additional_bytes = chunk_size_streambuf->size() - bytes_transferred; auto bytes_to_move = std::min(chunk_size, num_additional_bytes); if(bytes_to_move > 0) { // Move leftover bytes auto &source = *chunk_size_streambuf; - auto &target = session->request->streambuf; + auto &target = session->request->content_streambuf; target.commit(asio::buffer_copy(target.prepare(bytes_to_move), source.data(), bytes_to_move)); source.consume(bytes_to_move); } if(chunk_size > num_additional_bytes) { - asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf](const error_code &ec, size_t /*bytes_transferred*/) { + asio::async_read(*session->connection->socket, session->request->content_streambuf, asio::transfer_exactly(chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf, next_step](const error_code &ec, size_t /*bytes_transferred*/) { auto lock = session->connection->handler_runner->continue_lock(); if(!lock) return; @@ -687,12 +712,12 @@ namespace SimpleWeb { if(!ec) { // Remove "\r\n" auto null_buffer = std::make_shared(2); - asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2), [this, session, chunk_size_streambuf, null_buffer](const error_code &ec, size_t /*bytes_transferred*/) { + asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2), [this, session, chunk_size_streambuf, null_buffer, next_step](const error_code &ec, size_t /*bytes_transferred*/) { auto lock = session->connection->handler_runner->continue_lock(); if(!lock) return; if(!ec) - read_chunked_transfer_encoded(session, chunk_size_streambuf); + next_step(session, chunk_size_streambuf); else this->on_error(session->request, ec); }); @@ -706,12 +731,12 @@ namespace SimpleWeb { if(2 + chunk_size - num_additional_bytes == 1) istream.get(); auto null_buffer = std::make_shared(2); - asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2 + chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf, null_buffer](const error_code &ec, size_t /*bytes_transferred*/) { + asio::async_read(*session->connection->socket, *null_buffer, asio::transfer_exactly(2 + chunk_size - num_additional_bytes), [this, session, chunk_size_streambuf, null_buffer, next_step](const error_code &ec, size_t /*bytes_transferred*/) { auto lock = session->connection->handler_runner->continue_lock(); if(!lock) return; if(!ec) - read_chunked_transfer_encoded(session, chunk_size_streambuf); + next_step(session, chunk_size_streambuf); else this->on_error(session->request, ec); }); @@ -721,7 +746,7 @@ namespace SimpleWeb { istream.get(); istream.get(); - read_chunked_transfer_encoded(session, chunk_size_streambuf); + next_step(session, chunk_size_streambuf); } } else if(this->on_error) @@ -778,13 +803,13 @@ namespace SimpleWeb { if(case_insensitive_equal(it->second, "close")) return; else if(case_insensitive_equal(it->second, "keep-alive")) { - auto new_session = std::make_shared(this->config.max_request_streambuf_size, response->session->connection); + auto new_session = std::make_shared(this->config.max_request_streambuf_size, response->session->connection, std::move(response->session->request->streambuf)); this->read(new_session); return; } } if(response->session->request->http_version >= "1.1") { - auto new_session = std::make_shared(this->config.max_request_streambuf_size, response->session->connection); + auto new_session = std::make_shared(this->config.max_request_streambuf_size, response->session->connection, std::move(response->session->request->streambuf)); this->read(new_session); return; }