From 571fe8b0ad11b9f3508a7dab0ddcc0c51b52e818 Mon Sep 17 00:00:00 2001 From: Jens Moeller Date: Tue, 18 Feb 2020 13:05:43 -0600 Subject: [PATCH] Use futures in synchronous request() methods --- client_http.hpp | 124 ++++++++++++++++++++++-------------------------- 1 file changed, 56 insertions(+), 68 deletions(-) diff --git a/client_http.hpp b/client_http.hpp index 7cefe01..7c0b18c 100644 --- a/client_http.hpp +++ b/client_http.hpp @@ -3,6 +3,7 @@ #include "asio_compatibility.hpp" #include "mutex.hpp" +#include #include "utility.hpp" #include #include @@ -213,92 +214,72 @@ namespace SimpleWeb { /// Convenience function to perform synchronous request. The io_service is run within this function. /// If you reuse the io_service for other tasks, use the asynchronous request functions instead. - /// Do not use concurrently with the asynchronous request functions. /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. std::shared_ptr request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { + std::promise> response_promise; + std::future> response_future = response_promise.get_future(); std::shared_ptr response; error_code ec; - request(method, path, content, header, [&response, &ec](std::shared_ptr response_, const error_code &ec_) { + request(method, path, content, header, [&response_promise, response, ec](std::shared_ptr response_, const error_code &ec_) mutable { + if(ec_ && !ec) + ec = ec_; if(!response) response = response_; - else { + else if(!ec) { if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) - throw make_error_code::make_error_code(errc::message_size); - // Move partial response_ content to response: - auto &source = response_->streambuf; - auto &target = response->streambuf; - target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); - source.consume(source.size()); + ec = make_error_code::make_error_code(errc::message_size); + else { + // Move partial response_ content to response: + auto &source = response_->streambuf; + auto &target = response->streambuf; + target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); + source.consume(source.size()); + } + } + if(response_->content.end) { + if(!ec) + response_promise.set_value(response); + else + response_promise.set_exception(std::make_exception_ptr(system_error(ec))); } - ec = ec_; }); - { - LockGuard lock(concurrent_synchronous_requests_mutex); - ++concurrent_synchronous_requests; - } - try { - io_service->run(); - } - catch(const error_code &ec_) { - ec = ec_; - } - { - LockGuard lock(concurrent_synchronous_requests_mutex); - --concurrent_synchronous_requests; - if(!concurrent_synchronous_requests) - restart(*io_service); - } - - if(ec) - throw system_error(ec); - - return response; + return response_future.get(); } /// Convenience function to perform synchronous request. The io_service is run within this function. /// If you reuse the io_service for other tasks, use the asynchronous request functions instead. - /// Do not use concurrently with the asynchronous request functions. /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. std::shared_ptr request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { + std::promise> response_promise; + std::future> response_future = response_promise.get_future(); std::shared_ptr response; error_code ec; - request(method, path, content, header, [&response, &ec](std::shared_ptr response_, const error_code &ec_) { + request(method, path, content, header, [&response_promise, response, ec](std::shared_ptr response_, const error_code &ec_) mutable { + if(ec_ && !ec) + ec = ec_; if(!response) response = response_; - else { + else if(!ec) { if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) - throw make_error_code::make_error_code(errc::message_size); - // Move partial response_ content to response: - auto &source = response_->streambuf; - auto &target = response->streambuf; - target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); - source.consume(source.size()); + ec = make_error_code::make_error_code(errc::message_size); + else { + // Move partial response_ content to response: + auto &source = response_->streambuf; + auto &target = response->streambuf; + target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); + source.consume(source.size()); + } + } + if(response_->content.end) { + if(!ec) + response_promise.set_value(response); + else + response_promise.set_exception(std::make_exception_ptr(system_error(ec))); } - ec = ec_; }); - { - LockGuard lock(concurrent_synchronous_requests_mutex); - ++concurrent_synchronous_requests; - } - try { - io_service->run(); - } - catch(const error_code &ec_) { - ec = ec_; - } - { - LockGuard lock(concurrent_synchronous_requests_mutex); - --concurrent_synchronous_requests; - if(!concurrent_synchronous_requests) - restart(*io_service); - } - - if(ec) - throw system_error(ec); - - return response; + return response_future.get(); } /// Asynchronous request where running Client's io_service is required. @@ -451,6 +432,12 @@ namespace SimpleWeb { (*it)->close(); it = connections.erase(it); } + if(internal_io_service_handler_thread) { + io_service->stop(); + internal_io_service_handler_thread->join(); + internal_io_service_handler_thread.reset(); + io_service.reset(); + } } virtual ~ClientBase() noexcept { @@ -459,7 +446,7 @@ namespace SimpleWeb { } protected: - bool internal_io_service = false; + std::unique_ptr internal_io_service_handler_thread GUARDED_BY(connections_mutex); std::string host; unsigned short port; @@ -472,9 +459,6 @@ namespace SimpleWeb { std::shared_ptr handler_runner; - Mutex concurrent_synchronous_requests_mutex; - std::size_t concurrent_synchronous_requests GUARDED_BY(concurrent_synchronous_requests_mutex) = 0; - ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) { auto parsed_host_port = parse_host_port(host_port, default_port); host = parsed_host_port.first; @@ -486,8 +470,12 @@ namespace SimpleWeb { LockGuard lock(connections_mutex); if(!io_service) { - io_service = std::make_shared(); - internal_io_service = true; + io_service = std::make_shared(); + internal_io_service_handler_thread = std::unique_ptr(new std::thread([this]{ + // Ensure that the io_service does not stop + asio::io_service::work dummy_work(*io_service); + io_service->run(); + })); } for(auto it = connections.begin(); it != connections.end(); ++it) {