Session and Connection cleanup

This commit is contained in:
eidheim 2017-07-09 12:54:56 +02:00
commit d8b8716a17
5 changed files with 172 additions and 151 deletions

View file

@ -114,39 +114,30 @@ namespace SimpleWeb {
};
protected:
class Connection {
class Connection : public std::enable_shared_from_this<Connection> {
public:
template <typename... Args>
Connection(Args &&... args) : socket(new socket_type(std::forward<Args>(args)...)) {}
Connection(std::shared_ptr<bool> cancel_handlers, std::shared_ptr<SharedMutex> cancel_handlers_mutex, long timeout, Args &&... args)
: cancel_handlers(std::move(cancel_handlers)), cancel_handlers_mutex(std::move(cancel_handlers_mutex)), timeout(timeout),
socket(new socket_type(std::forward<Args>(args)...)) {}
std::shared_ptr<bool> cancel_handlers;
std::shared_ptr<SharedMutex> cancel_handlers_mutex;
long timeout;
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
std::mutex socket_close_mutex;
bool in_use = false;
bool attempt_reconnect = true;
std::unique_ptr<asio::deadline_timer> timer;
void close() {
error_code ec;
std::unique_lock<std::mutex> lock(socket_close_mutex); // the following operations seems to be needed to run sequentially
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
socket->lowest_layer().close(ec);
}
};
class Session {
public:
Session(std::shared_ptr<bool> cancel_handlers, std::shared_ptr<SharedMutex> cancel_handlers_mutex, long timeout,
std::shared_ptr<Connection> connection, std::unique_ptr<asio::streambuf> request_buffer)
: cancel_handlers(std::move(cancel_handlers)), cancel_handlers_mutex(std::move(cancel_handlers_mutex)), timeout(timeout),
connection(std::move(connection)), request_buffer(std::move(request_buffer)), response(new Response()) {}
std::shared_ptr<bool> cancel_handlers;
std::shared_ptr<SharedMutex> cancel_handlers_mutex;
long timeout;
std::shared_ptr<Connection> connection;
std::unique_ptr<asio::streambuf> request_buffer;
std::shared_ptr<Response> response;
std::function<void(const error_code &)> callback;
std::unique_ptr<asio::deadline_timer> timer;
void set_timeout(long seconds = 0) {
if(seconds == 0)
@ -155,13 +146,12 @@ namespace SimpleWeb {
timer = nullptr;
return;
}
auto timer = std::unique_ptr<asio::deadline_timer>(new asio::deadline_timer(connection->socket->get_io_service()));
timer = std::unique_ptr<asio::deadline_timer>(new asio::deadline_timer(socket->get_io_service()));
timer->expires_from_now(boost::posix_time::seconds(seconds));
auto connection = this->connection;
timer->async_wait([connection](const error_code &ec) {
auto self = this->shared_from_this();
timer->async_wait([self](const error_code &ec) {
if(!ec)
connection->close();
self->close();
});
}
@ -169,6 +159,24 @@ namespace SimpleWeb {
if(timer)
timer->cancel();
}
std::pair<bool, std::unique_ptr<SharedMutex::SharedLock>> cancel_handlers_bool_and_lock() {
if(!cancel_handlers)
return {false, nullptr};
auto lock = cancel_handlers_mutex->shared_lock();
return {*cancel_handlers, std::move(lock)};
}
};
class Session {
public:
Session(std::shared_ptr<Connection> connection, std::unique_ptr<asio::streambuf> request_buffer)
: connection(std::move(connection)), request_buffer(std::move(request_buffer)), response(new Response()) {}
std::shared_ptr<Connection> connection;
std::unique_ptr<asio::streambuf> request_buffer;
std::shared_ptr<Response> response;
std::function<void(const error_code &)> callback;
};
public:
@ -243,7 +251,7 @@ namespace SimpleWeb {
/// Do not use concurrently with the synchronous request functions.
void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
auto session = std::make_shared<Session>(cancel_handlers, cancel_handlers_mutex, config.timeout, get_connection(), create_request_header(method, path, header));
auto session = std::make_shared<Session>(get_connection(), create_request_header(method, path, header));
auto connection = session->connection;
auto response = session->response;
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
@ -301,7 +309,7 @@ namespace SimpleWeb {
/// Asynchronous request where setting and/or running Client's io_service is required.
void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
auto session = std::make_shared<Session>(cancel_handlers, cancel_handlers_mutex, config.timeout, get_connection(), create_request_header(method, path, header));
auto session = std::make_shared<Session>(get_connection(), create_request_header(method, path, header));
auto connection = session->connection;
auto response = session->response;
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
@ -352,21 +360,27 @@ namespace SimpleWeb {
void stop() {
std::unique_lock<std::mutex> lock(connections_mutex);
for(auto it = connections.begin(); it != connections.end();) {
(*it)->attempt_reconnect = false;
(*it)->close();
if(!internal_io_service) {
(*it)->attempt_reconnect = false;
(*it)->close();
}
it = connections.erase(it);
}
}
virtual ~ClientBase() {
{
auto lock = cancel_handlers_mutex->unique_lock();
*cancel_handlers = true;
if(!internal_io_service) {
auto lock = cancel_handlers_mutex->unique_lock();
*cancel_handlers = true;
}
}
stop();
}
protected:
bool internal_io_service = false;
std::string host;
unsigned short port;
@ -381,7 +395,7 @@ namespace SimpleWeb {
size_t concurrent_synchronous_requests = 0;
std::mutex concurrent_synchronous_requests_mutex;
ClientBase(const std::string &host_port, unsigned short default_port) : io_service(new asio::io_service()), cancel_handlers(new bool(false)), cancel_handlers_mutex(new SharedMutex()) {
ClientBase(const std::string &host_port, unsigned short default_port) : cancel_handlers(new bool(false)), cancel_handlers_mutex(new SharedMutex()) {
auto parsed_host_port = parse_host_port(host_port, default_port);
host = parsed_host_port.first;
port = parsed_host_port.second;
@ -390,6 +404,14 @@ namespace SimpleWeb {
std::shared_ptr<Connection> get_connection() {
std::shared_ptr<Connection> connection;
std::unique_lock<std::mutex> lock(connections_mutex);
if(!io_service) {
io_service = std::make_shared<asio::io_service>();
internal_io_service = true;
cancel_handlers = nullptr;
cancel_handlers_mutex = nullptr;
}
for(auto it = connections.begin(); it != connections.end(); ++it) {
if(!(*it)->in_use && !connection) {
connection = *it;
@ -449,11 +471,11 @@ namespace SimpleWeb {
}
void write(const std::shared_ptr<Session> &session) {
session->set_timeout();
session->connection->set_timeout();
asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec)
this->read(session);
@ -465,11 +487,11 @@ namespace SimpleWeb {
}
void read(const std::shared_ptr<Session> &session) {
session->set_timeout();
session->connection->set_timeout();
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec) {
session->connection->attempt_reconnect = true;
@ -482,11 +504,11 @@ namespace SimpleWeb {
if(header_it != session->response->header.end()) {
auto content_length = stoull(header_it->second);
if(content_length > num_additional_bytes) {
session->set_timeout();
session->connection->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec)
session->callback(ec);
@ -504,11 +526,11 @@ namespace SimpleWeb {
this->read_chunked(session, tmp_streambuf);
}
else if(session->response->http_version < "1.1" || ((header_it = session->response->header.find("Session")) != session->response->header.end() && header_it->second == "close")) {
session->set_timeout();
session->connection->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec)
session->callback(ec);
@ -541,11 +563,11 @@ namespace SimpleWeb {
}
void read_chunked(const std::shared_ptr<Session> &session, const std::shared_ptr<asio::streambuf> &tmp_streambuf) {
session->set_timeout();
session->connection->set_timeout();
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec) {
std::string line;
@ -579,11 +601,11 @@ namespace SimpleWeb {
};
if((2 + length) > num_additional_bytes) {
session->set_timeout();
session->connection->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec)
post_process();
@ -616,24 +638,24 @@ namespace SimpleWeb {
protected:
std::shared_ptr<Connection> create_connection() override {
return std::make_shared<Connection>(*io_service);
return std::make_shared<Connection>(cancel_handlers, cancel_handlers_mutex, config.timeout, *io_service);
}
void connect(const std::shared_ptr<Session> &session) override {
if(!session->connection->socket->lowest_layer().is_open()) {
auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service);
session->set_timeout(config.timeout_connect);
session->connection->set_timeout(config.timeout_connect);
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec) {
session->set_timeout(config.timeout_connect);
session->connection->set_timeout(config.timeout_connect);
asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
session->cancel_timeout();
auto lock = session->cancel_handlers_mutex->shared_lock();
if(*session->cancel_handlers)
session->connection->cancel_timeout();
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
if(cancel_pair.first)
return;
if(!ec) {
asio::ip::tcp::no_delay option(true);