Now close connections when Server::stop is called
This commit is contained in:
parent
490e33e2d1
commit
f3f527467f
4 changed files with 72 additions and 32 deletions
|
|
@ -121,7 +121,7 @@ namespace SimpleWeb {
|
||||||
public:
|
public:
|
||||||
Connection(std::unique_ptr<socket_type> &&socket) : socket(std::move(socket)) {}
|
Connection(std::unique_ptr<socket_type> &&socket) : socket(std::move(socket)) {}
|
||||||
|
|
||||||
std::unique_ptr<socket_type> socket;
|
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
|
||||||
std::mutex socket_close_mutex;
|
std::mutex socket_close_mutex;
|
||||||
bool in_use = false;
|
bool in_use = false;
|
||||||
bool attempt_reconnect = true;
|
bool attempt_reconnect = true;
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
#ifdef USE_STANDALONE_ASIO
|
#ifdef USE_STANDALONE_ASIO
|
||||||
#include <asio.hpp>
|
#include <asio.hpp>
|
||||||
|
|
@ -92,7 +93,7 @@ namespace SimpleWeb {
|
||||||
return;
|
return;
|
||||||
auto self = this->shared_from_this();
|
auto self = this->shared_from_this();
|
||||||
session->set_timeout(session->server->config.timeout_content);
|
session->set_timeout(session->server->config.timeout_content);
|
||||||
asio::async_write(*session->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
self->session->cancel_timeout();
|
self->session->cancel_timeout();
|
||||||
auto lock = self->session->cancel_callbacks_mutex->shared_lock();
|
auto lock = self->session->cancel_callbacks_mutex->shared_lock();
|
||||||
if(*self->session->cancel_callbacks)
|
if(*self->session->cancel_callbacks)
|
||||||
|
|
@ -265,17 +266,31 @@ namespace SimpleWeb {
|
||||||
};
|
};
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
class Connection {
|
||||||
|
public:
|
||||||
|
Connection(std::unique_ptr<socket_type> &&socket) : socket(std::move(socket)) {}
|
||||||
|
|
||||||
|
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
|
||||||
|
std::mutex socket_close_mutex;
|
||||||
|
|
||||||
|
void close() {
|
||||||
|
error_code ec;
|
||||||
|
std::unique_lock<std::mutex> lock(socket_close_mutex); // the following operations seems to be needed to run sequentially
|
||||||
|
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
|
||||||
|
socket->lowest_layer().close(ec);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
class Session {
|
class Session {
|
||||||
public:
|
public:
|
||||||
Session(ServerBase<socket_type> *server, const std::shared_ptr<socket_type> &socket)
|
Session(ServerBase<socket_type> *server, const std::shared_ptr<Connection> &connection)
|
||||||
: server(server), cancel_callbacks(server->cancel_callbacks), cancel_callbacks_mutex(server->cancel_callbacks_mutex),
|
: server(server), cancel_callbacks(server->cancel_callbacks), cancel_callbacks_mutex(server->cancel_callbacks_mutex),
|
||||||
socket(socket), socket_close_mutex(new std::mutex()), request(new Request(*this->socket)) {}
|
connection(connection), request(new Request(*connection->socket)) {}
|
||||||
|
|
||||||
ServerBase<socket_type> *server;
|
ServerBase<socket_type> *server;
|
||||||
std::shared_ptr<bool> cancel_callbacks;
|
std::shared_ptr<bool> cancel_callbacks;
|
||||||
std::shared_ptr<SharedMutex> cancel_callbacks_mutex;
|
std::shared_ptr<SharedMutex> cancel_callbacks_mutex;
|
||||||
std::shared_ptr<socket_type> socket;
|
std::shared_ptr<Connection> connection;
|
||||||
std::shared_ptr<std::mutex> socket_close_mutex;
|
|
||||||
std::shared_ptr<Request> request;
|
std::shared_ptr<Request> request;
|
||||||
|
|
||||||
std::unique_ptr<asio::deadline_timer> timer;
|
std::unique_ptr<asio::deadline_timer> timer;
|
||||||
|
|
@ -286,17 +301,12 @@ namespace SimpleWeb {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
timer = std::unique_ptr<asio::deadline_timer>(new asio::deadline_timer(socket->get_io_service()));
|
timer = std::unique_ptr<asio::deadline_timer>(new asio::deadline_timer(connection->socket->get_io_service()));
|
||||||
timer->expires_from_now(boost::posix_time::seconds(seconds));
|
timer->expires_from_now(boost::posix_time::seconds(seconds));
|
||||||
auto socket = this->socket;
|
auto connection = this->connection;
|
||||||
auto socket_close_mutex = this->socket_close_mutex;
|
timer->async_wait([connection](const error_code &ec) {
|
||||||
timer->async_wait([socket, socket_close_mutex](const error_code &ec) {
|
if(!ec)
|
||||||
if(!ec) {
|
connection->close();
|
||||||
error_code ec;
|
|
||||||
std::unique_lock<std::mutex> lock(*socket_close_mutex); // the following operations seems to be needed to run sequentially
|
|
||||||
socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec);
|
|
||||||
socket->lowest_layer().close(ec);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -351,7 +361,7 @@ namespace SimpleWeb {
|
||||||
|
|
||||||
std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>, const error_code &)> on_error;
|
std::function<void(std::shared_ptr<typename ServerBase<socket_type>::Request>, const error_code &)> on_error;
|
||||||
|
|
||||||
std::function<void(std::shared_ptr<socket_type>, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade;
|
std::function<void(std::unique_ptr<socket_type> &, std::shared_ptr<typename ServerBase<socket_type>::Request>)> on_upgrade;
|
||||||
|
|
||||||
/// If you have your own asio::io_service, store its pointer here before running start().
|
/// If you have your own asio::io_service, store its pointer here before running start().
|
||||||
std::shared_ptr<asio::io_service> io_service;
|
std::shared_ptr<asio::io_service> io_service;
|
||||||
|
|
@ -399,12 +409,19 @@ namespace SimpleWeb {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop accepting new requests, and close current sessions.
|
/// Stop accepting new requests, and close current connections.
|
||||||
void stop() {
|
void stop() {
|
||||||
if(acceptor) {
|
if(acceptor) {
|
||||||
acceptor->close();
|
acceptor->close();
|
||||||
if(internal_io_service)
|
if(internal_io_service)
|
||||||
io_service->stop();
|
io_service->stop();
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lock(*connections_mutex);
|
||||||
|
if(!internal_io_service) {
|
||||||
|
for(auto &connection : *connections)
|
||||||
|
connection->close();
|
||||||
|
}
|
||||||
|
connections->clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -422,16 +439,39 @@ namespace SimpleWeb {
|
||||||
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
|
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
|
|
||||||
|
std::shared_ptr<std::unordered_set<Connection *>> connections;
|
||||||
|
std::shared_ptr<std::mutex> connections_mutex;
|
||||||
|
|
||||||
std::shared_ptr<bool> cancel_callbacks;
|
std::shared_ptr<bool> cancel_callbacks;
|
||||||
std::shared_ptr<SharedMutex> cancel_callbacks_mutex;
|
std::shared_ptr<SharedMutex> cancel_callbacks_mutex;
|
||||||
|
|
||||||
ServerBase(unsigned short port) : config(port), cancel_callbacks(new bool(false)), cancel_callbacks_mutex(new SharedMutex()) {}
|
ServerBase(unsigned short port) : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()),
|
||||||
|
cancel_callbacks(new bool(false)), cancel_callbacks_mutex(new SharedMutex()) {}
|
||||||
|
|
||||||
virtual void accept() = 0;
|
virtual void accept() = 0;
|
||||||
|
|
||||||
|
std::shared_ptr<Connection> create_connection(socket_type *socket) {
|
||||||
|
auto connections = this->connections;
|
||||||
|
auto connections_mutex = this->connections_mutex;
|
||||||
|
auto connection = std::shared_ptr<Connection>(new Connection(std::unique_ptr<socket_type>(socket)), [connections, connections_mutex](Connection *connection) {
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(*connections_mutex);
|
||||||
|
auto it = connections->find(connection);
|
||||||
|
if(it != connections->end())
|
||||||
|
connections->erase(it);
|
||||||
|
}
|
||||||
|
delete connection;
|
||||||
|
});
|
||||||
|
{
|
||||||
|
std::unique_lock<std::mutex> lock(*connections_mutex);
|
||||||
|
connections->emplace(connection.get());
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
void read_request_and_content(const std::shared_ptr<Session> &session) {
|
void read_request_and_content(const std::shared_ptr<Session> &session) {
|
||||||
session->set_timeout(config.timeout_request);
|
session->set_timeout(config.timeout_request);
|
||||||
asio::async_read_until(*session->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
|
asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
|
||||||
session->cancel_timeout();
|
session->cancel_timeout();
|
||||||
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
||||||
if(*session->cancel_callbacks)
|
if(*session->cancel_callbacks)
|
||||||
|
|
@ -460,7 +500,7 @@ namespace SimpleWeb {
|
||||||
}
|
}
|
||||||
if(content_length > num_additional_bytes) {
|
if(content_length > num_additional_bytes) {
|
||||||
session->set_timeout(config.timeout_content);
|
session->set_timeout(config.timeout_content);
|
||||||
asio::async_read(*session->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->cancel_timeout();
|
session->cancel_timeout();
|
||||||
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
||||||
if(*session->cancel_callbacks)
|
if(*session->cancel_callbacks)
|
||||||
|
|
@ -487,7 +527,7 @@ namespace SimpleWeb {
|
||||||
if(on_upgrade) {
|
if(on_upgrade) {
|
||||||
auto it = session->request->header.find("Upgrade");
|
auto it = session->request->header.find("Upgrade");
|
||||||
if(it != session->request->header.end()) {
|
if(it != session->request->header.end()) {
|
||||||
on_upgrade(session->socket, session->request);
|
on_upgrade(session->connection->socket, session->request);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -523,13 +563,13 @@ namespace SimpleWeb {
|
||||||
if(case_insensitive_equal(it->second, "close"))
|
if(case_insensitive_equal(it->second, "close"))
|
||||||
return;
|
return;
|
||||||
else if(case_insensitive_equal(it->second, "keep-alive")) {
|
else if(case_insensitive_equal(it->second, "keep-alive")) {
|
||||||
auto new_session = std::make_shared<Session>(response->session->server, response->session->socket);
|
auto new_session = std::make_shared<Session>(response->session->server, response->session->connection);
|
||||||
this->read_request_and_content(new_session);
|
this->read_request_and_content(new_session);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if(response->session->request->http_version >= "1.1") {
|
if(response->session->request->http_version >= "1.1") {
|
||||||
auto new_session = std::make_shared<Session>(response->session->server, response->session->socket);
|
auto new_session = std::make_shared<Session>(response->session->server, response->session->connection);
|
||||||
this->read_request_and_content(new_session);
|
this->read_request_and_content(new_session);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
@ -565,9 +605,9 @@ namespace SimpleWeb {
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void accept() override {
|
void accept() override {
|
||||||
auto session = std::make_shared<Session>(this, std::make_shared<HTTP>(*io_service));
|
auto session = std::make_shared<Session>(this, create_connection(new HTTP(*io_service)));
|
||||||
|
|
||||||
acceptor->async_accept(*session->socket, [this, session](const error_code &ec) {
|
acceptor->async_accept(*session->connection->socket, [this, session](const error_code &ec) {
|
||||||
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
||||||
if(*session->cancel_callbacks)
|
if(*session->cancel_callbacks)
|
||||||
return;
|
return;
|
||||||
|
|
@ -579,7 +619,7 @@ namespace SimpleWeb {
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
asio::ip::tcp::no_delay option(true);
|
asio::ip::tcp::no_delay option(true);
|
||||||
error_code ec;
|
error_code ec;
|
||||||
session->socket->set_option(option, ec);
|
session->connection->socket->set_option(option, ec);
|
||||||
|
|
||||||
this->read_request_and_content(session);
|
this->read_request_and_content(session);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -51,9 +51,9 @@ namespace SimpleWeb {
|
||||||
asio::ssl::context context;
|
asio::ssl::context context;
|
||||||
|
|
||||||
void accept() override {
|
void accept() override {
|
||||||
auto session = std::make_shared<Session>(this, std::make_shared<HTTPS>(*io_service, context));
|
auto session = std::make_shared<Session>(this, create_connection(new HTTPS(*io_service, context)));
|
||||||
|
|
||||||
acceptor->async_accept(session->socket->lowest_layer(), [this, session](const error_code &ec) {
|
acceptor->async_accept(session->connection->socket->lowest_layer(), [this, session](const error_code &ec) {
|
||||||
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
||||||
if(*session->cancel_callbacks)
|
if(*session->cancel_callbacks)
|
||||||
return;
|
return;
|
||||||
|
|
@ -64,10 +64,10 @@ namespace SimpleWeb {
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
asio::ip::tcp::no_delay option(true);
|
asio::ip::tcp::no_delay option(true);
|
||||||
error_code ec;
|
error_code ec;
|
||||||
session->socket->lowest_layer().set_option(option, ec);
|
session->connection->socket->lowest_layer().set_option(option, ec);
|
||||||
|
|
||||||
session->set_timeout(config.timeout_request);
|
session->set_timeout(config.timeout_request);
|
||||||
session->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) {
|
session->connection->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) {
|
||||||
session->cancel_timeout();
|
session->cancel_timeout();
|
||||||
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
auto lock = session->cancel_callbacks_mutex->shared_lock();
|
||||||
if(*session->cancel_callbacks)
|
if(*session->cancel_callbacks)
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ public:
|
||||||
void accept() override {}
|
void accept() override {}
|
||||||
|
|
||||||
void parse_request_test() {
|
void parse_request_test() {
|
||||||
auto session = std::make_shared<Session>(this, std::make_shared<HTTP>(*io_service));
|
auto session = std::make_shared<Session>(this, create_connection(new HTTP(*io_service)));
|
||||||
|
|
||||||
std::ostream stream(&session->request->content.streambuf);
|
std::ostream stream(&session->request->content.streambuf);
|
||||||
stream << "GET /test/ HTTP/1.1\r\n";
|
stream << "GET /test/ HTTP/1.1\r\n";
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue