From 6b91839cb603d3e480b75e396446c56981ef62f5 Mon Sep 17 00:00:00 2001 From: Alex Fraser Date: Fri, 18 May 2018 15:01:01 +1000 Subject: [PATCH 1/2] Allow calling Response::send multiple times: copy the buffer before async_write --- server_http.hpp | 10 +++++++++- tests/io_test.cpp | 23 +++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/server_http.hpp b/server_http.hpp index 235dcb8..a62a542 100644 --- a/server_http.hpp +++ b/server_http.hpp @@ -89,9 +89,17 @@ namespace SimpleWeb { /// Use this function if you need to recursively send parts of a longer message void send(const std::function &callback = nullptr) noexcept { + // Take a snapshot of the stream buffer. Otherwise async_write may not + // have consumed the buffer before the next call, in which case the + // data from the first call may be sent twice. Use a captured shared + // pointer to keep the snapshot alive during the async call. + auto stream_snapshot = std::make_shared(); + std::ostream snapshot_writer(stream_snapshot.get()); + snapshot_writer << &streambuf; + session->connection->set_timeout(timeout_content); auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write - asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) { + asio::async_write(*session->connection->socket, *stream_snapshot, [self, callback, stream_snapshot](const error_code &ec, std::size_t /*bytes_transferred*/) { self->session->connection->cancel_timeout(); auto lock = self->session->connection->handler_runner->continue_lock(); if(!lock) diff --git a/tests/io_test.cpp b/tests/io_test.cpp index 66b0e39..106d4df 100644 --- a/tests/io_test.cpp +++ b/tests/io_test.cpp @@ -69,6 +69,20 @@ int main() { assert(request->remote_endpoint_port() != 0); }; + server.resource["^/string/dup$"]["POST"] = [](shared_ptr response, shared_ptr request) { + auto content = request->content.string(); + + // Send content twice, before it has a chance to be written to the socket. + *response << "HTTP/1.1 200 OK\r\nContent-Length: " << (content.length() * 2) << "\r\n\r\n" + << content; + response->send(); + *response << content; + response->send(); + + assert(!request->remote_endpoint_address().empty()); + assert(request->remote_endpoint_port() != 0); + }; + server.resource["^/string2$"]["POST"] = [](shared_ptr response, shared_ptr request) { response->write(request->content.string()); }; @@ -202,6 +216,15 @@ int main() { assert(output.str() == "A string"); } + { + // Test rapid calls to Response::send + stringstream output; + stringstream content("A string\n"); + auto r = client.request("POST", "/string/dup", content); + output << r->content.rdbuf(); + assert(output.str() == "A string\nA string\n"); + } + { stringstream output; auto r = client.request("GET", "/info", "", {{"Test Parameter", "test value"}}); From cc234506b181af17125faa97e9caa6202695082d Mon Sep 17 00:00:00 2001 From: eidheim Date: Sun, 3 Jun 2018 12:07:02 +0200 Subject: [PATCH 2/2] Completed support for multiple sequential Server::Response::send operations --- server_http.hpp | 73 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 22 deletions(-) diff --git a/server_http.hpp b/server_http.hpp index a62a542..1032766 100644 --- a/server_http.hpp +++ b/server_http.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -57,12 +58,17 @@ namespace SimpleWeb { friend class ServerBase; friend class Server; - asio::streambuf streambuf; + std::unique_ptr streambuf = std::unique_ptr(new asio::streambuf()); std::shared_ptr session; long timeout_content; - Response(std::shared_ptr session, long timeout_content) noexcept : std::ostream(&streambuf), session(std::move(session)), timeout_content(timeout_content) {} + asio::io_service::strand strand; + std::list, std::function>> send_queue; + + Response(std::shared_ptr session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content), strand(session->connection->socket->get_io_service()) { + rdbuf(streambuf.get()); + } template void write_header(const CaseInsensitiveMultimap &header, size_type size) { @@ -82,24 +88,31 @@ namespace SimpleWeb { *this << "\r\n"; } - public: - std::size_t size() noexcept { - return streambuf.size(); + void send_from_queue() { + auto self = this->shared_from_this(); + strand.post([self]() { + asio::async_write(*self->session->connection->socket, *self->send_queue.begin()->first, self->strand.wrap([self](const error_code &ec, std::size_t /*bytes_transferred*/) { + auto lock = self->session->connection->handler_runner->continue_lock(); + if(!lock) + return; + auto it = self->send_queue.begin(); + if(it->second) + it->second(ec); + if(!ec) { + self->send_queue.erase(it); + if(self->send_queue.size() > 0) + self->send_from_queue(); + } + else + self->send_queue.clear(); + })); + }); } - /// Use this function if you need to recursively send parts of a longer message - void send(const std::function &callback = nullptr) noexcept { - // Take a snapshot of the stream buffer. Otherwise async_write may not - // have consumed the buffer before the next call, in which case the - // data from the first call may be sent twice. Use a captured shared - // pointer to keep the snapshot alive during the async call. - auto stream_snapshot = std::make_shared(); - std::ostream snapshot_writer(stream_snapshot.get()); - snapshot_writer << &streambuf; - + void send_on_delete(const std::function &callback = nullptr) noexcept { session->connection->set_timeout(timeout_content); auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write - asio::async_write(*session->connection->socket, *stream_snapshot, [self, callback, stream_snapshot](const error_code &ec, std::size_t /*bytes_transferred*/) { + asio::async_write(*session->connection->socket, *streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) { self->session->connection->cancel_timeout(); auto lock = self->session->connection->handler_runner->continue_lock(); if(!lock) @@ -109,6 +122,27 @@ namespace SimpleWeb { }); } + public: + std::size_t size() noexcept { + return streambuf->size(); + } + + /// Use this function if you need to recursively send parts of a longer message, or when using server-sent events (SSE). + void send(const std::function &callback = nullptr) noexcept { + session->connection->set_timeout(timeout_content); + + std::shared_ptr streambuf = std::move(this->streambuf); + this->streambuf = std::unique_ptr(new asio::streambuf()); + rdbuf(this->streambuf.get()); + + auto self = this->shared_from_this(); + strand.post([self, streambuf, callback]() { + self->send_queue.emplace_back(streambuf, callback); + if(self->send_queue.size() == 1) + self->send_from_queue(); + }); + } + /// Write directly to stream buffer using std::ostream::write void write(const char_type *ptr, std::streamsize n) { std::ostream::write(ptr, n); @@ -473,7 +507,6 @@ namespace SimpleWeb { if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) { auto response = std::shared_ptr(new Response(session, this->config.timeout_content)); response->write(StatusCode::client_error_payload_too_large); - response->send(); if(this->on_error) this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); return; @@ -515,7 +548,6 @@ namespace SimpleWeb { if(session->request->streambuf.size() == session->request->streambuf.max_size()) { auto response = std::shared_ptr(new Response(session, this->config.timeout_content)); response->write(StatusCode::client_error_payload_too_large); - response->send(); if(this->on_error) this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); return; @@ -551,7 +583,6 @@ namespace SimpleWeb { if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) { auto response = std::shared_ptr(new Response(session, this->config.timeout_content)); response->write(StatusCode::client_error_payload_too_large); - response->send(); if(this->on_error) this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); return; @@ -584,7 +615,6 @@ namespace SimpleWeb { if(session->request->streambuf.size() == session->request->streambuf.max_size()) { auto response = std::shared_ptr(new Response(session, this->config.timeout_content)); response->write(StatusCode::client_error_payload_too_large); - response->send(); if(this->on_error) this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); return; @@ -612,7 +642,6 @@ namespace SimpleWeb { if(chunks_streambuf->size() == chunks_streambuf->max_size()) { auto response = std::shared_ptr(new Response(session, this->config.timeout_content)); response->write(StatusCode::client_error_payload_too_large); - response->send(); if(this->on_error) this->on_error(session->request, make_error_code::make_error_code(errc::message_size)); return; @@ -673,7 +702,7 @@ namespace SimpleWeb { session->connection->set_timeout(config.timeout_content); auto response = std::shared_ptr(new Response(session, config.timeout_content), [this](Response *response_ptr) { auto response = std::shared_ptr(response_ptr); - response->send([this, response](const error_code &ec) { + response->send_on_delete([this, response](const error_code &ec) { if(!ec) { if(response->close_connection_after_response) return;