Related to !248: completed synchronous request() fixes
This commit is contained in:
parent
49e2bb9261
commit
c6c7d0a6f2
5 changed files with 160 additions and 124 deletions
132
client_http.hpp
132
client_http.hpp
|
|
@ -3,8 +3,8 @@
|
|||
|
||||
#include "asio_compatibility.hpp"
|
||||
#include "mutex.hpp"
|
||||
#include <future>
|
||||
#include "utility.hpp"
|
||||
#include <future>
|
||||
#include <limits>
|
||||
#include <random>
|
||||
#include <unordered_set>
|
||||
|
|
@ -210,25 +210,45 @@ namespace SimpleWeb {
|
|||
Config config;
|
||||
|
||||
/// If you want to reuse an already created asio::io_service, store its pointer here before calling a request function.
|
||||
/// Do not set when using synchronous request functions.
|
||||
std::shared_ptr<io_context> io_service;
|
||||
|
||||
/// Convenience function to perform synchronous request. The io_service is run within this function.
|
||||
/// Convenience function to perform synchronous request. The io_service is started in this function.
|
||||
/// Should not be combined with asynchronous request functions.
|
||||
/// If you reuse the io_service for other tasks, use the asynchronous request functions instead.
|
||||
/// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead.
|
||||
std::shared_ptr<Response> request(const std::string &method, const std::string &path = {"/"}, string_view content = {}, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
|
||||
start_internal_io_service_handler_thread();
|
||||
std::promise<std::shared_ptr<Response>> response_promise;
|
||||
std::future<std::shared_ptr<Response>> response_future = response_promise.get_future();
|
||||
{
|
||||
LockGuard lock(synchronous_request_mutex);
|
||||
if(!synchronous_request_called) {
|
||||
if(io_service) // Throw if io_service already set
|
||||
throw make_error_code::make_error_code(errc::operation_not_permitted);
|
||||
io_service = std::make_shared<io_context>();
|
||||
internal_io_service = true;
|
||||
auto io_service_ = io_service;
|
||||
std::thread thread([io_service_] {
|
||||
auto work = make_work_guard(*io_service_);
|
||||
io_service_->run();
|
||||
});
|
||||
thread.detach();
|
||||
synchronous_request_called = true;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Response> response;
|
||||
error_code ec;
|
||||
request(method, path, content, header, [&response_promise, response, ec](std::shared_ptr<Response> response_, const error_code &ec_) mutable {
|
||||
if(ec_ && !ec)
|
||||
ec = ec_;
|
||||
std::promise<std::shared_ptr<Response>> response_promise;
|
||||
bool stop_future_handlers = false;
|
||||
request(method, path, content, header, [&response, &response_promise, &stop_future_handlers](std::shared_ptr<Response> response_, error_code ec) {
|
||||
if(stop_future_handlers)
|
||||
return;
|
||||
|
||||
if(!response)
|
||||
response = response_;
|
||||
else if(!ec) {
|
||||
if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size())
|
||||
if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) {
|
||||
ec = make_error_code::make_error_code(errc::message_size);
|
||||
response->close();
|
||||
}
|
||||
else {
|
||||
// Move partial response_ content to response:
|
||||
auto &source = response_->streambuf;
|
||||
|
|
@ -237,34 +257,54 @@ namespace SimpleWeb {
|
|||
source.consume(source.size());
|
||||
}
|
||||
}
|
||||
if(response_->content.end) {
|
||||
if(!ec)
|
||||
response_promise.set_value(response);
|
||||
else
|
||||
response_promise.set_exception(std::make_exception_ptr(system_error(ec)));
|
||||
|
||||
if(ec) {
|
||||
response_promise.set_exception(std::make_exception_ptr(system_error(ec)));
|
||||
stop_future_handlers = true;
|
||||
}
|
||||
else if(response_->content.end)
|
||||
response_promise.set_value(response);
|
||||
});
|
||||
|
||||
return response_future.get();
|
||||
return response_promise.get_future().get();
|
||||
}
|
||||
|
||||
/// Convenience function to perform synchronous request. The io_service is run within this function.
|
||||
/// Convenience function to perform synchronous request. The io_service is started in this function.
|
||||
/// Should not be combined with asynchronous request functions.
|
||||
/// If you reuse the io_service for other tasks, use the asynchronous request functions instead.
|
||||
/// When requesting Server-Sent Events: will throw on error::eof, please use asynchronous request functions instead.
|
||||
std::shared_ptr<Response> request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header = CaseInsensitiveMultimap()) {
|
||||
start_internal_io_service_handler_thread();
|
||||
std::promise<std::shared_ptr<Response>> response_promise;
|
||||
std::future<std::shared_ptr<Response>> response_future = response_promise.get_future();
|
||||
{
|
||||
LockGuard lock(synchronous_request_mutex);
|
||||
if(!synchronous_request_called) {
|
||||
if(io_service) // Throw if io_service already set
|
||||
throw make_error_code::make_error_code(errc::operation_not_permitted);
|
||||
io_service = std::make_shared<io_context>();
|
||||
internal_io_service = true;
|
||||
auto io_service_ = io_service;
|
||||
std::thread thread([io_service_] {
|
||||
auto work = make_work_guard(*io_service_);
|
||||
io_service_->run();
|
||||
});
|
||||
thread.detach();
|
||||
synchronous_request_called = 1;
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Response> response;
|
||||
error_code ec;
|
||||
request(method, path, content, header, [&response_promise, response, ec](std::shared_ptr<Response> response_, const error_code &ec_) mutable {
|
||||
if(ec_ && !ec)
|
||||
ec = ec_;
|
||||
std::promise<std::shared_ptr<Response>> response_promise;
|
||||
bool stop_future_handlers = false;
|
||||
request(method, path, content, header, [&response, &response_promise, &stop_future_handlers](std::shared_ptr<Response> response_, error_code ec) {
|
||||
if(stop_future_handlers)
|
||||
return;
|
||||
|
||||
if(!response)
|
||||
response = response_;
|
||||
else if(!ec) {
|
||||
if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size())
|
||||
if(response_->streambuf.size() + response->streambuf.size() > response->streambuf.max_size()) {
|
||||
ec = make_error_code::make_error_code(errc::message_size);
|
||||
response->close();
|
||||
}
|
||||
else {
|
||||
// Move partial response_ content to response:
|
||||
auto &source = response_->streambuf;
|
||||
|
|
@ -273,15 +313,16 @@ namespace SimpleWeb {
|
|||
source.consume(source.size());
|
||||
}
|
||||
}
|
||||
if(response_->content.end) {
|
||||
if(!ec)
|
||||
response_promise.set_value(response);
|
||||
else
|
||||
response_promise.set_exception(std::make_exception_ptr(system_error(ec)));
|
||||
|
||||
if(ec) {
|
||||
response_promise.set_exception(std::make_exception_ptr(system_error(ec)));
|
||||
stop_future_handlers = true;
|
||||
}
|
||||
else if(response_->content.end)
|
||||
response_promise.set_value(response);
|
||||
});
|
||||
|
||||
return response_future.get();
|
||||
return response_promise.get_future().get();
|
||||
}
|
||||
|
||||
/// Asynchronous request where running Client's io_service is required.
|
||||
|
|
@ -434,21 +475,17 @@ namespace SimpleWeb {
|
|||
(*it)->close();
|
||||
it = connections.erase(it);
|
||||
}
|
||||
if(internal_io_service_handler_thread) {
|
||||
io_service->stop();
|
||||
internal_io_service_handler_thread->join();
|
||||
internal_io_service_handler_thread.reset();
|
||||
io_service.reset();
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~ClientBase() noexcept {
|
||||
handler_runner->stop();
|
||||
stop();
|
||||
if(internal_io_service)
|
||||
io_service->stop();
|
||||
}
|
||||
|
||||
protected:
|
||||
std::unique_ptr<std::thread> internal_io_service_handler_thread GUARDED_BY(connections_mutex);
|
||||
bool internal_io_service = false;
|
||||
|
||||
std::string host;
|
||||
unsigned short port;
|
||||
|
|
@ -461,31 +498,22 @@ namespace SimpleWeb {
|
|||
|
||||
std::shared_ptr<ScopeRunner> handler_runner;
|
||||
|
||||
Mutex synchronous_request_mutex;
|
||||
bool synchronous_request_called GUARDED_BY(synchronous_request_mutex) = false;
|
||||
|
||||
ClientBase(const std::string &host_port, unsigned short default_port) noexcept : default_port(default_port), handler_runner(new ScopeRunner()) {
|
||||
auto parsed_host_port = parse_host_port(host_port, default_port);
|
||||
host = parsed_host_port.first;
|
||||
port = parsed_host_port.second;
|
||||
}
|
||||
|
||||
void start_internal_io_service_handler_thread() noexcept {
|
||||
LockGuard lock(connections_mutex);
|
||||
|
||||
if(!io_service) {
|
||||
io_service = std::make_shared<asio::io_service>();
|
||||
internal_io_service_handler_thread = std::unique_ptr<std::thread>(new std::thread([this]{
|
||||
// Ensure that the io_service does not stop
|
||||
asio::io_service::work dummy_work(*io_service);
|
||||
io_service->run();
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Connection> get_connection() noexcept {
|
||||
std::shared_ptr<Connection> connection;
|
||||
LockGuard lock(connections_mutex);
|
||||
|
||||
if(!io_service) {
|
||||
io_service = std::make_shared<asio::io_service>();
|
||||
io_service = std::make_shared<io_context>();
|
||||
internal_io_service = true;
|
||||
}
|
||||
|
||||
for(auto it = connections.begin(); it != connections.end(); ++it) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue