Use futures in synchronous request() methods

This commit is contained in:
Jens Moeller 2020-02-18 13:05:43 -06:00
commit 571fe8b0ad

View file

@ -3,6 +3,7 @@
#include "asio_compatibility.hpp" #include "asio_compatibility.hpp"
#include "mutex.hpp" #include "mutex.hpp"
#include <future>
#include "utility.hpp" #include "utility.hpp"
#include <limits> #include <limits>
#include <random> #include <random>
@ -213,92 +214,72 @@ namespace SimpleWeb {
/// Convenience function to perform synchronous request. The io_service is run within this function. /// Convenience function to perform synchronous request. The io_service is run within this function.
/// If you reuse the io_service for other tasks, use the asynchronous request functions instead. /// If you reuse the io_service for other tasks, use the asynchronous request functions instead.
/// Do not use concurrently with the asynchronous request functions.
/// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead.
std::shared_ptr<Response> request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { std::shared_ptr<Response> request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
std::promise<std::shared_ptr<Response>> response_promise;
std::future<std::shared_ptr<Response>> response_future = response_promise.get_future();
std::shared_ptr<Response> response; std::shared_ptr<Response> response;
error_code ec; error_code ec;
request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_, const error_code &ec_) { request(method, path, content, header, [&response_promise, response, ec](std::shared_ptr<Response> response_, const error_code &ec_) mutable {
if(ec_ && !ec)
ec = ec_;
if(!response) if(!response)
response = response_; response = response_;
else { else if(!ec) {
if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size())
throw make_error_code::make_error_code(errc::message_size); ec = make_error_code::make_error_code(errc::message_size);
// Move partial response_ content to response: else {
auto &source = response_->streambuf; // Move partial response_ content to response:
auto &target = response->streambuf; auto &source = response_->streambuf;
target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); auto &target = response->streambuf;
source.consume(source.size()); target.commit(asio::buffer_copy(target.prepare(source.size()), source.data()));
source.consume(source.size());
}
}
if(response_->content.end) {
if(!ec)
response_promise.set_value(response);
else
response_promise.set_exception(std::make_exception_ptr(system_error(ec)));
} }
ec = ec_;
}); });
{ return response_future.get();
LockGuard lock(concurrent_synchronous_requests_mutex);
++concurrent_synchronous_requests;
}
try {
io_service->run();
}
catch(const error_code &ec_) {
ec = ec_;
}
{
LockGuard lock(concurrent_synchronous_requests_mutex);
--concurrent_synchronous_requests;
if(!concurrent_synchronous_requests)
restart(*io_service);
}
if(ec)
throw system_error(ec);
return response;
} }
/// Convenience function to perform synchronous request. The io_service is run within this function. /// Convenience function to perform synchronous request. The io_service is run within this function.
/// If you reuse the io_service for other tasks, use the asynchronous request functions instead. /// If you reuse the io_service for other tasks, use the asynchronous request functions instead.
/// Do not use concurrently with the asynchronous request functions.
/// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead. /// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead.
std::shared_ptr<Response> request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) { std::shared_ptr<Response> request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
std::promise<std::shared_ptr<Response>> response_promise;
std::future<std::shared_ptr<Response>> response_future = response_promise.get_future();
std::shared_ptr<Response> response; std::shared_ptr<Response> response;
error_code ec; error_code ec;
request(method, path, content, header, [&response, &ec](std::shared_ptr<Response> response_, const error_code &ec_) { request(method, path, content, header, [&response_promise, response, ec](std::shared_ptr<Response> response_, const error_code &ec_) mutable {
if(ec_ && !ec)
ec = ec_;
if(!response) if(!response)
response = response_; response = response_;
else { else if(!ec) {
if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size())
throw make_error_code::make_error_code(errc::message_size); ec = make_error_code::make_error_code(errc::message_size);
// Move partial response_ content to response: else {
auto &source = response_->streambuf; // Move partial response_ content to response:
auto &target = response->streambuf; auto &source = response_->streambuf;
target.commit(asio::buffer_copy(target.prepare(source.size()), source.data())); auto &target = response->streambuf;
source.consume(source.size()); target.commit(asio::buffer_copy(target.prepare(source.size()), source.data()));
source.consume(source.size());
}
}
if(response_->content.end) {
if(!ec)
response_promise.set_value(response);
else
response_promise.set_exception(std::make_exception_ptr(system_error(ec)));
} }
ec = ec_;
}); });
{ return response_future.get();
LockGuard lock(concurrent_synchronous_requests_mutex);
++concurrent_synchronous_requests;
}
try {
io_service->run();
}
catch(const error_code &ec_) {
ec = ec_;
}
{
LockGuard lock(concurrent_synchronous_requests_mutex);
--concurrent_synchronous_requests;
if(!concurrent_synchronous_requests)
restart(*io_service);
}
if(ec)
throw system_error(ec);
return response;
} }
/// Asynchronous request where running Client's io_service is required. /// Asynchronous request where running Client's io_service is required.
@ -451,6 +432,12 @@ namespace SimpleWeb {
(*it)->close(); (*it)->close();
it = connections.erase(it); it = connections.erase(it);
} }
if(internal_io_service_handler_thread) {
io_service->stop();
internal_io_service_handler_thread->join();
internal_io_service_handler_thread.reset();
io_service.reset();
}
} }
virtual ~ClientBase() noexcept { virtual ~ClientBase() noexcept {
@ -459,7 +446,7 @@ namespace SimpleWeb {
} }
protected: protected:
bool internal_io_service = false; std::unique_ptr<std::thread> internal_io_service_handler_thread GUARDED_BY(connections_mutex);
std::string host; std::string host;
unsigned short port; unsigned short port;
@ -472,9 +459,6 @@ namespace SimpleWeb {
std::shared_ptr<ScopeRunner> handler_runner; std::shared_ptr<ScopeRunner> handler_runner;
Mutex concurrent_synchronous_requests_mutex;
std::size_t concurrent_synchronous_requests GUARDED_BY(concurrent_synchronous_requests_mutex) = 0;
ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) { ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) {
auto parsed_host_port = parse_host_port(host_port, default_port); auto parsed_host_port = parse_host_port(host_port, default_port);
host = parsed_host_port.first; host = parsed_host_port.first;
@ -486,8 +470,12 @@ namespace SimpleWeb {
LockGuard lock(connections_mutex); LockGuard lock(connections_mutex);
if(!io_service) { if(!io_service) {
io_service = std::make_shared<io_context>(); io_service = std::make_shared<asio::io_service>();
internal_io_service = true; internal_io_service_handler_thread = std::unique_ptr<std::thread>(new std::thread([this]{
// Ensure that the io_service does not stop
asio::io_service::work dummy_work(*io_service);
io_service->run();
}));
} }
for(auto it = connections.begin(); it != connections.end(); ++it) { for(auto it = connections.begin(); it != connections.end(); ++it) {