Replaced Server/Client::shared_from_this with asio callback checks if the Server/Client has been destroyed. Also added SharedMutex to utility.hpp.

This commit is contained in:
eidheim 2017-07-05 10:16:31 +02:00
commit 9b5063f422
9 changed files with 500 additions and 163 deletions

View file

@ -2,6 +2,7 @@
#define CLIENT_HTTP_HPP
#include "utility.hpp"
#include <condition_variable>
#include <mutex>
#include <random>
#include <vector>
@ -33,7 +34,7 @@ namespace SimpleWeb {
class Client;
template <class socket_type>
class ClientBase : public std::enable_shared_from_this<ClientBase<socket_type>> {
class ClientBase {
ClientBase(const ClientBase &) = delete;
ClientBase &operator=(const ClientBase &) = delete;
@ -121,11 +122,13 @@ namespace SimpleWeb {
Connection(std::unique_ptr<socket_type> &&socket) : socket(std::move(socket)) {}
std::unique_ptr<socket_type> socket;
std::mutex socket_close_mutex;
bool in_use = false;
bool reconnecting = false;
bool attempt_reconnect = true;
void close() {
error_code ec;
std::unique_lock<std::mutex> lock(socket_close_mutex); // the following operations seems to be needed to run sequentially
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket->lowest_layer().close(ec);
}
@ -133,9 +136,12 @@ namespace SimpleWeb {
class Session {
public:
Session(const std::shared_ptr<ClientBase<socket_type>> &client, const std::shared_ptr<Connection> &connection, std::unique_ptr<asio::streambuf> &&request_buffer)
: client(client), connection(connection), request_buffer(std::move(request_buffer)), response(new Response()) {}
std::shared_ptr<ClientBase<socket_type>> client;
Session(ClientBase<socket_type> *client, const std::shared_ptr<Connection> &connection, std::unique_ptr<asio::streambuf> &&request_buffer)
: client(client), cancel_callbacks(client->cancel_callbacks), cancel_callbacks_mutex(client->cancel_callbacks_mutex),
connection(connection), request_buffer(std::move(request_buffer)), response(new Response()) {}
ClientBase<socket_type> *client;
std::shared_ptr<bool> cancel_callbacks;
std::shared_ptr<SharedMutex> cancel_callbacks_mutex;
std::shared_ptr<Connection> connection;
std::unique_ptr<asio::streambuf> request_buffer;
std::shared_ptr<Response> response;
@ -174,8 +180,6 @@ namespace SimpleWeb {
/// When using asynchronous requests, running the io_service is up to the programmer.
std::shared_ptr<asio::io_service> io_service;
virtual ~ClientBase() {}
/// Convenience function to perform synchronous request. The io_service is run within this function.
/// If reusing the io_service for other tasks, please use the asynchronous request functions instead.
std::shared_ptr<Response> request(const std::string &method, const std::string &path = std::string("/"),
@ -213,14 +217,14 @@ namespace SimpleWeb {
/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
auto session = std::make_shared<Session>(this->shared_from_this(), get_connection(), create_request_header(method, path, header));
auto session = std::make_shared<Session>(this, get_connection(), create_request_header(method, path, header));
auto client = session->client;
auto connection = session->connection;
auto response = session->response;
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
session->callback = [client, connection, response, request_callback](const error_code &ec) {
{
std::lock_guard<std::mutex> lock(client->connections_mutex);
std::unique_lock<std::mutex> lock(client->connections_mutex);
connection->in_use = false;
// Remove unused connections, but keep one open for HTTP persistent connection:
@ -271,14 +275,14 @@ namespace SimpleWeb {
/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
auto session = std::make_shared<Session>(this->shared_from_this(), get_connection(), create_request_header(method, path, header));
auto session = std::make_shared<Session>(this, get_connection(), create_request_header(method, path, header));
auto client = session->client;
auto connection = session->connection;
auto response = session->response;
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
session->callback = [client, connection, response, request_callback](const error_code &ec) {
{
std::lock_guard<std::mutex> lock(client->connections_mutex);
std::unique_lock<std::mutex> lock(client->connections_mutex);
connection->in_use = false;
// Remove unused connections, but keep one open for HTTP persistent connection:
@ -321,13 +325,22 @@ namespace SimpleWeb {
/// Close connections
void close() {
std::lock_guard<std::mutex> lock(connections_mutex);
std::unique_lock<std::mutex> lock(connections_mutex);
for(auto it = connections.begin(); it != connections.end();) {
(*it)->attempt_reconnect = false;
(*it)->close();
it = connections.erase(it);
}
}
virtual ~ClientBase() {
{
auto lock = cancel_callbacks_mutex->unique_lock();
*cancel_callbacks = true;
}
close();
}
protected:
std::string host;
unsigned short port;
@ -337,7 +350,10 @@ namespace SimpleWeb {
std::vector<std::shared_ptr<Connection>> connections;
std::mutex connections_mutex;
ClientBase(const std::string &host_port, unsigned short default_port) : io_service(new asio::io_service()) {
std::shared_ptr<bool> cancel_callbacks;
std::shared_ptr<SharedMutex> cancel_callbacks_mutex;
ClientBase(const std::string &host_port, unsigned short default_port) : io_service(new asio::io_service()), cancel_callbacks(new bool(false)), cancel_callbacks_mutex(new SharedMutex()) {
auto parsed_host_port = parse_host_port(host_port, default_port);
host = parsed_host_port.first;
port = parsed_host_port.second;
@ -345,7 +361,7 @@ namespace SimpleWeb {
std::shared_ptr<Connection> get_connection() {
std::shared_ptr<Connection> connection;
std::lock_guard<std::mutex> lock(connections_mutex);
std::unique_lock<std::mutex> lock(connections_mutex);
for(auto it = connections.begin(); it != connections.end(); ++it) {
if(!(*it)->in_use && !connection) {
connection = *it;
@ -356,7 +372,7 @@ namespace SimpleWeb {
connection = create_connection();
connections.emplace_back(connection);
}
connection->reconnecting = false;
connection->attempt_reconnect = true;
connection->in_use = true;
if(!query) {
@ -408,6 +424,9 @@ namespace SimpleWeb {
session->set_timeout();
asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec)
this->read(session);
else {
@ -421,8 +440,11 @@ namespace SimpleWeb {
session->set_timeout();
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec) {
session->connection->reconnecting = false;
session->connection->attempt_reconnect = true;
size_t num_additional_bytes = session->response->content_buffer.size() - bytes_transferred;
@ -435,6 +457,9 @@ namespace SimpleWeb {
session->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec)
session->callback(ec);
else {
@ -454,6 +479,9 @@ namespace SimpleWeb {
session->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec)
session->callback(ec);
else {
@ -471,8 +499,8 @@ namespace SimpleWeb {
session->callback(ec);
}
else {
if(!session->connection->reconnecting) {
session->connection->reconnecting = true;
if(session->connection->attempt_reconnect) {
session->connection->attempt_reconnect = false;
session->connection->close();
this->connect(session);
}
@ -488,6 +516,9 @@ namespace SimpleWeb {
session->set_timeout();
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec) {
std::string line;
getline(session->response->content, line);
@ -523,6 +554,9 @@ namespace SimpleWeb {
session->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec)
post_process();
else {
@ -554,13 +588,9 @@ namespace SimpleWeb {
Client &operator=(const Client &) = delete;
public:
static std::shared_ptr<Client> create(const std::string &server_port_path) {
return std::shared_ptr<Client>(new Client(server_port_path));
}
protected:
Client(const std::string &server_port_path) : ClientBase<HTTP>::ClientBase(server_port_path, 80) {}
protected:
std::shared_ptr<Connection> create_connection() override {
return std::make_shared<Connection>(std::unique_ptr<HTTP>(new HTTP(*io_service)));
}
@ -571,13 +601,20 @@ namespace SimpleWeb {
session->set_timeout(config.timeout_connect);
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec) {
session->set_timeout(config.timeout_connect);
asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
session->cancel_timeout();
auto lock = session->cancel_callbacks_mutex->shared_lock();
if(*session->cancel_callbacks)
return;
if(!ec) {
asio::ip::tcp::no_delay option(true);
session->connection->socket->set_option(option);
error_code ec;
session->connection->socket->set_option(option, ec);
this->write(session);
}
else {