diff --git a/asio_compatibility.hpp b/asio_compatibility.hpp index 5a7007d..cbb16a0 100644 --- a/asio_compatibility.hpp +++ b/asio_compatibility.hpp @@ -31,6 +31,7 @@ namespace SimpleWeb { using io_context = asio::io_context; using resolver_results = asio::ip::tcp::resolver::results_type; using async_connect_endpoint = asio::ip::tcp::endpoint; + using strand = asio::strand; template inline void post(io_context &context, handler_type &&handler) { @@ -43,25 +44,38 @@ namespace SimpleWeb { return asio::ip::make_address(str); } template - std::unique_ptr make_steady_timer(socket_type &socket, std::chrono::duration duration) { + inline std::unique_ptr make_steady_timer(socket_type &socket, std::chrono::duration duration) { return std::unique_ptr(new asio::steady_timer(socket.get_executor(), duration)); } template - void async_resolve(asio::ip::tcp::resolver &resolver, const std::pair &host_port, handler_type &&handler) { + inline void async_resolve(asio::ip::tcp::resolver &resolver, const std::pair &host_port, handler_type &&handler) { resolver.async_resolve(host_port.first, host_port.second, std::forward(handler)); } inline asio::executor_work_guard make_work_guard(io_context &context) { return asio::make_work_guard(context); } + template + inline asio::basic_socket::executor_type get_executor(socket_type &socket) { + return socket.get_executor(); + } + template + inline asio::executor_binder::type, typename execution_context::executor_type> bind_executor(strand &strand, handler_type &&handler) { + return asio::bind_executor(strand, std::forward(handler)); + } #else using io_context = asio::io_service; using resolver_results = asio::ip::tcp::resolver::iterator; using async_connect_endpoint = asio::ip::tcp::resolver::iterator; + using strand = asio::io_service::strand; template inline void post(io_context &context, handler_type &&handler) { context.post(std::forward(handler)); } + template + inline void post(strand &strand, handler_type &&handler) { + strand.post(std::forward(handler)); + } inline void restart(io_context &context) noexcept { context.reset(); } @@ -69,16 +83,24 @@ namespace SimpleWeb { return asio::ip::address::from_string(str); } template - std::unique_ptr make_steady_timer(socket_type &socket, std::chrono::duration duration) { + inline std::unique_ptr make_steady_timer(socket_type &socket, std::chrono::duration duration) { return std::unique_ptr(new asio::steady_timer(socket.get_io_service(), duration)); } template - void async_resolve(asio::ip::tcp::resolver &resolver, const std::pair &host_port, handler_type &&handler) { + inline void async_resolve(asio::ip::tcp::resolver &resolver, const std::pair &host_port, handler_type &&handler) { resolver.async_resolve(asio::ip::tcp::resolver::query(host_port.first, host_port.second), std::forward(handler)); } inline io_context::work make_work_guard(io_context &context) { return io_context::work(context); } + template + inline io_context &get_executor(socket_type &socket) { + return socket.get_io_service(); + } + template + inline asio::detail::wrapped_handler bind_executor(strand &strand, handler_type &&handler) { + return strand.wrap(std::forward(handler)); + } #endif } // namespace SimpleWeb diff --git a/server_http.hpp b/server_http.hpp index 5729a4f..c07543d 100644 --- a/server_http.hpp +++ b/server_http.hpp @@ -76,49 +76,61 @@ namespace SimpleWeb { } void send_from_queue() REQUIRES(send_queue_mutex) { + auto buffer = send_queue.begin()->first->data(); auto self = this->shared_from_this(); - asio::async_write(*self->session->connection->socket, *send_queue.begin()->first, [self](const error_code &ec, std::size_t /*bytes_transferred*/) { + post(session->connection->read_write_strand, [self, buffer] { auto lock = self->session->connection->handler_runner->continue_lock(); if(!lock) return; - { - LockGuard lock(self->send_queue_mutex); - if(!ec) { - auto it = self->send_queue.begin(); - auto callback = std::move(it->second); - self->send_queue.erase(it); - if(self->send_queue.size() > 0) - self->send_from_queue(); + asio::async_write(*self->session->connection->socket, buffer, [self](const error_code &ec, std::size_t /*bytes_transferred*/) { + auto lock = self->session->connection->handler_runner->continue_lock(); + if(!lock) + return; + { + LockGuard lock(self->send_queue_mutex); + if(!ec) { + auto it = self->send_queue.begin(); + auto callback = std::move(it->second); + self->send_queue.erase(it); + if(self->send_queue.size() > 0) + self->send_from_queue(); - lock.unlock(); - if(callback) - callback(ec); - } - else { - // All handlers in the queue is called with ec: - std::vector> callbacks; - for(auto &pair : self->send_queue) { - if(pair.second) - callbacks.emplace_back(std::move(pair.second)); + lock.unlock(); + if(callback) + callback(ec); } - self->send_queue.clear(); + else { + // All handlers in the queue is called with ec: + std::vector> callbacks; + for(auto &pair : self->send_queue) { + if(pair.second) + callbacks.emplace_back(std::move(pair.second)); + } + self->send_queue.clear(); - lock.unlock(); - for(auto &callback : callbacks) - callback(ec); + lock.unlock(); + for(auto &callback : callbacks) + callback(ec); + } } - } + }); }); } void send_on_delete(const std::function &callback = nullptr) noexcept { + auto buffer = streambuf->data(); auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write - asio::async_write(*session->connection->socket, *streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) { + post(session->connection->read_write_strand, [self, buffer, callback] { auto lock = self->session->connection->handler_runner->continue_lock(); if(!lock) return; - if(callback) - callback(ec); + asio::async_write(*self->session->connection->socket, buffer, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) { + auto lock = self->session->connection->handler_runner->continue_lock(); + if(!lock) + return; + if(callback) + callback(ec); + }); }); } @@ -286,12 +298,18 @@ namespace SimpleWeb { class Connection : public std::enable_shared_from_this { public: template - Connection(std::shared_ptr handler_runner_, Args &&...args) noexcept : handler_runner(std::move(handler_runner_)), socket(new socket_type(std::forward(args)...)) {} + Connection(std::shared_ptr handler_runner_, Args &&...args) noexcept : handler_runner(std::move(handler_runner_)), socket(new socket_type(std::forward(args)...)), read_write_strand(get_executor(socket->lowest_layer())) {} std::shared_ptr handler_runner; std::unique_ptr socket; // Socket must be unique_ptr since asio::ssl::stream is not movable + /** + * Needed for TLS communication where async_write could be called outside of the io_context runners. + * For more information see https://stackoverflow.com/a/12801042. + */ + strand read_write_strand; + std::unique_ptr timer; void close() noexcept {