Renamed ScopesContinue to ScopeRunner, and some related cleanup

This commit is contained in:
eidheim 2017-07-20 17:21:19 +02:00
commit 474fab90f3
6 changed files with 58 additions and 56 deletions

View file

@ -91,10 +91,10 @@ namespace SimpleWeb {
class Connection : public std::enable_shared_from_this<Connection> { class Connection : public std::enable_shared_from_this<Connection> {
public: public:
template <typename... Args> template <typename... Args>
Connection(std::shared_ptr<ScopesContinue> handlers_continue, long timeout, Args &&... args) Connection(std::shared_ptr<ScopeRunner> handler_runner, long timeout, Args &&... args)
: handlers_continue(std::move(handlers_continue)), timeout(timeout), socket(new socket_type(std::forward<Args>(args)...)) {} : handler_runner(std::move(handler_runner)), timeout(timeout), socket(new socket_type(std::forward<Args>(args)...)) {}
std::shared_ptr<ScopesContinue> handlers_continue; std::shared_ptr<ScopeRunner> handler_runner;
long timeout; long timeout;
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
@ -328,7 +328,7 @@ namespace SimpleWeb {
} }
virtual ~ClientBase() { virtual ~ClientBase() {
handlers_continue->stop(); handler_runner->stop();
stop(); stop();
} }
@ -343,12 +343,12 @@ namespace SimpleWeb {
std::unordered_set<std::shared_ptr<Connection>> connections; std::unordered_set<std::shared_ptr<Connection>> connections;
std::mutex connections_mutex; std::mutex connections_mutex;
std::shared_ptr<ScopesContinue> handlers_continue; std::shared_ptr<ScopeRunner> handler_runner;
size_t concurrent_synchronous_requests = 0; size_t concurrent_synchronous_requests = 0;
std::mutex concurrent_synchronous_requests_mutex; std::mutex concurrent_synchronous_requests_mutex;
ClientBase(const std::string &host_port, unsigned short default_port) : handlers_continue(new ScopesContinue()) { ClientBase(const std::string &host_port, unsigned short default_port) : handler_runner(new ScopeRunner()) {
auto parsed_host_port = parse_host_port(host_port, default_port); auto parsed_host_port = parse_host_port(host_port, default_port);
host = parsed_host_port.first; host = parsed_host_port.first;
port = parsed_host_port.second; port = parsed_host_port.second;
@ -425,7 +425,7 @@ namespace SimpleWeb {
session->connection->set_timeout(); session->connection->set_timeout();
asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)
@ -439,7 +439,7 @@ namespace SimpleWeb {
session->connection->set_timeout(); session->connection->set_timeout();
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) { asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
@ -459,7 +459,7 @@ namespace SimpleWeb {
session->connection->set_timeout(); session->connection->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)
@ -479,7 +479,7 @@ namespace SimpleWeb {
session->connection->set_timeout(); session->connection->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)
@ -519,7 +519,7 @@ namespace SimpleWeb {
session->connection->set_timeout(); session->connection->set_timeout();
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) { asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
@ -557,7 +557,7 @@ namespace SimpleWeb {
session->connection->set_timeout(); session->connection->set_timeout();
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)
@ -587,7 +587,7 @@ namespace SimpleWeb {
protected: protected:
std::shared_ptr<Connection> create_connection() override { std::shared_ptr<Connection> create_connection() override {
return std::make_shared<Connection>(handlers_continue, config.timeout, *io_service); return std::make_shared<Connection>(handler_runner, config.timeout, *io_service);
} }
void connect(const std::shared_ptr<Session> &session) override { void connect(const std::shared_ptr<Session> &session) override {
@ -596,14 +596,14 @@ namespace SimpleWeb {
session->connection->set_timeout(config.timeout_connect); session->connection->set_timeout(config.timeout_connect);
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) { resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
session->connection->set_timeout(config.timeout_connect); session->connection->set_timeout(config.timeout_connect);
asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) { asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {

View file

@ -41,21 +41,21 @@ namespace SimpleWeb {
asio::ssl::context context; asio::ssl::context context;
std::shared_ptr<Connection> create_connection() override { std::shared_ptr<Connection> create_connection() override {
return std::make_shared<Connection>(handlers_continue, config.timeout, *io_service, context); return std::make_shared<Connection>(handler_runner, config.timeout, *io_service, context);
} }
void connect(const std::shared_ptr<Session> &session) override { void connect(const std::shared_ptr<Session> &session) override {
if(!session->connection->socket->lowest_layer().is_open()) { if(!session->connection->socket->lowest_layer().is_open()) {
auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service); auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service);
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) { resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
session->connection->set_timeout(this->config.timeout_connect); session->connection->set_timeout(this->config.timeout_connect);
asio::async_connect(session->connection->socket->lowest_layer(), it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) { asio::async_connect(session->connection->socket->lowest_layer(), it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
@ -72,7 +72,7 @@ namespace SimpleWeb {
session->connection->set_timeout(this->config.timeout_connect); session->connection->set_timeout(this->config.timeout_connect);
asio::async_write(session->connection->socket->next_layer(), *write_buffer, [this, session, write_buffer](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_write(session->connection->socket->next_layer(), *write_buffer, [this, session, write_buffer](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
@ -80,7 +80,7 @@ namespace SimpleWeb {
session->connection->set_timeout(this->config.timeout_connect); session->connection->set_timeout(this->config.timeout_connect);
asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, session, response](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, session, response](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
@ -120,7 +120,7 @@ namespace SimpleWeb {
session->connection->set_timeout(this->config.timeout_connect); session->connection->set_timeout(this->config.timeout_connect);
session->connection->socket->async_handshake(asio::ssl::stream_base::client, [this, session](const error_code &ec) { session->connection->socket->async_handshake(asio::ssl::stream_base::client, [this, session](const error_code &ec) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)

View file

@ -90,7 +90,7 @@ namespace SimpleWeb {
auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) {
self->session->connection->cancel_timeout(); self->session->connection->cancel_timeout();
auto lock = self->session->connection->handlers_continue->shared_lock(); auto lock = self->session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(callback) if(callback)
@ -202,9 +202,9 @@ namespace SimpleWeb {
class Connection : public std::enable_shared_from_this<Connection> { class Connection : public std::enable_shared_from_this<Connection> {
public: public:
template <typename... Args> template <typename... Args>
Connection(std::shared_ptr<ScopesContinue> handlers_continue, Args &&... args) : handlers_continue(std::move(handlers_continue)), socket(new socket_type(std::forward<Args>(args)...)) {} Connection(std::shared_ptr<ScopeRunner> handler_runner, Args &&... args) : handler_runner(std::move(handler_runner)), socket(new socket_type(std::forward<Args>(args)...)) {}
std::shared_ptr<ScopesContinue> handlers_continue; std::shared_ptr<ScopeRunner> handler_runner;
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
std::mutex socket_close_mutex; std::mutex socket_close_mutex;
@ -367,7 +367,7 @@ namespace SimpleWeb {
} }
virtual ~ServerBase() { virtual ~ServerBase() {
handlers_continue->stop(); handler_runner->stop();
stop(); stop();
} }
@ -380,9 +380,9 @@ namespace SimpleWeb {
std::shared_ptr<std::unordered_set<Connection *>> connections; std::shared_ptr<std::unordered_set<Connection *>> connections;
std::shared_ptr<std::mutex> connections_mutex; std::shared_ptr<std::mutex> connections_mutex;
std::shared_ptr<ScopesContinue> handlers_continue; std::shared_ptr<ScopeRunner> handler_runner;
ServerBase(unsigned short port) : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()), handlers_continue(new ScopesContinue()) {} ServerBase(unsigned short port) : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()), handler_runner(new ScopeRunner()) {}
virtual void accept() = 0; virtual void accept() = 0;
@ -390,7 +390,7 @@ namespace SimpleWeb {
std::shared_ptr<Connection> create_connection(Args &&... args) { std::shared_ptr<Connection> create_connection(Args &&... args) {
auto connections = this->connections; auto connections = this->connections;
auto connections_mutex = this->connections_mutex; auto connections_mutex = this->connections_mutex;
auto connection = std::shared_ptr<Connection>(new Connection(handlers_continue, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) { auto connection = std::shared_ptr<Connection>(new Connection(handler_runner, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) {
{ {
std::unique_lock<std::mutex> lock(*connections_mutex); std::unique_lock<std::mutex> lock(*connections_mutex);
auto it = connections->find(connection); auto it = connections->find(connection);
@ -410,7 +410,7 @@ namespace SimpleWeb {
session->connection->set_timeout(config.timeout_request); session->connection->set_timeout(config.timeout_request);
asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) { asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) { if(!ec) {
@ -443,7 +443,7 @@ namespace SimpleWeb {
session->connection->set_timeout(config.timeout_content); session->connection->set_timeout(config.timeout_content);
asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)
@ -554,7 +554,7 @@ namespace SimpleWeb {
auto session = std::make_shared<Session>(create_connection(*io_service)); auto session = std::make_shared<Session>(create_connection(*io_service));
acceptor->async_accept(*session->connection->socket, [this, session](const error_code &ec) { acceptor->async_accept(*session->connection->socket, [this, session](const error_code &ec) {
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;

View file

@ -51,7 +51,7 @@ namespace SimpleWeb {
auto session = std::make_shared<Session>(create_connection(*io_service, context)); auto session = std::make_shared<Session>(create_connection(*io_service, context));
acceptor->async_accept(session->connection->socket->lowest_layer(), [this, session](const error_code &ec) { acceptor->async_accept(session->connection->socket->lowest_layer(), [this, session](const error_code &ec) {
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
@ -66,7 +66,7 @@ namespace SimpleWeb {
session->connection->set_timeout(config.timeout_request); session->connection->set_timeout(config.timeout_request);
session->connection->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) { session->connection->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) {
session->connection->cancel_timeout(); session->connection->cancel_timeout();
auto lock = session->connection->handlers_continue->shared_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
if(!ec) if(!ec)

View file

@ -15,41 +15,43 @@ typedef SimpleWeb::Client<SimpleWeb::HTTP> HttpClient;
int main() { int main() {
// Test ScopesContinue // Test ScopesContinue
{ {
SimpleWeb::ScopesContinue scopes_continue; SimpleWeb::ScopeRunner scope_runner;
std::thread cancel_thread; std::thread cancel_thread;
{ {
assert(scopes_continue.count == 0); assert(scope_runner.count == 0);
auto lock = scopes_continue.shared_lock(); auto lock = scope_runner.continue_lock();
assert(scopes_continue.count == 1); assert(lock);
assert(scope_runner.count == 1);
{ {
auto lock = scopes_continue.shared_lock(); auto lock = scope_runner.continue_lock();
assert(scopes_continue.count == 2); assert(lock);
assert(scope_runner.count == 2);
} }
assert(scopes_continue.count == 1); assert(scope_runner.count == 1);
cancel_thread = thread([&scopes_continue] { cancel_thread = thread([&scope_runner] {
scopes_continue.stop(); scope_runner.stop();
assert(scopes_continue.count == -1); assert(scope_runner.count == -1);
}); });
this_thread::sleep_for(chrono::milliseconds(500)); this_thread::sleep_for(chrono::milliseconds(500));
assert(scopes_continue.count == 1); assert(scope_runner.count == 1);
} }
cancel_thread.join(); cancel_thread.join();
assert(scopes_continue.count == -1); assert(scope_runner.count == -1);
auto lock = scopes_continue.shared_lock(); auto lock = scope_runner.continue_lock();
assert(!lock); assert(!lock);
scopes_continue.count = 0; scope_runner.count = 0;
vector<thread> threads; vector<thread> threads;
for(size_t c = 0; c < 100; ++c) { for(size_t c = 0; c < 100; ++c) {
threads.emplace_back([&scopes_continue] { threads.emplace_back([&scope_runner] {
auto lock = scopes_continue.shared_lock(); auto lock = scope_runner.continue_lock();
assert(scopes_continue.count > 0); assert(scope_runner.count > 0);
}); });
} }
for(auto &thread : threads) for(auto &thread : threads)
thread.join(); thread.join();
assert(scopes_continue.count == 0); assert(scope_runner.count == 0);
} }
HttpServer server; HttpServer server;

View file

@ -253,13 +253,13 @@ namespace SimpleWeb {
namespace SimpleWeb { namespace SimpleWeb {
/// Makes it possible to for instance cancel Asio handlers without stopping asio::io_service /// Makes it possible to for instance cancel Asio handlers without stopping asio::io_service
class ScopesContinue { class ScopeRunner {
/// Scope count that is set to -1 if scopes are to be canceled /// Scope count that is set to -1 if scopes are to be canceled
std::atomic<long> count; std::atomic<long> count;
public: public:
class SharedLock { class SharedLock {
friend class ScopesContinue; friend class ScopeRunner;
std::atomic<long> &count; std::atomic<long> &count;
SharedLock(std::atomic<long> &count) : count(count) {} SharedLock(std::atomic<long> &count) : count(count) {}
SharedLock &operator=(const SharedLock &) = delete; SharedLock &operator=(const SharedLock &) = delete;
@ -271,10 +271,10 @@ namespace SimpleWeb {
} }
}; };
ScopesContinue() : count(0) {} ScopeRunner() : count(0) {}
/// Returns nullptr if scope is to be cancelled, or a shared lock otherwise /// Returns nullptr if scope should be exited, or a shared lock otherwise
std::unique_ptr<SharedLock> shared_lock() { std::unique_ptr<SharedLock> continue_lock() {
long expected = count; long expected = count;
while(expected >= 0 && !count.compare_exchange_weak(expected, expected + 1)) while(expected >= 0 && !count.compare_exchange_weak(expected, expected + 1))
spin_loop_pause(); spin_loop_pause();