From 35f835a67bf603348c4e985b95cb00ccfe01464e Mon Sep 17 00:00:00 2001 From: eidheim Date: Wed, 19 Jul 2017 13:03:48 +0200 Subject: [PATCH] Cancel handlers feature: replaced readers-writer lock with a spinlock implementation --- client_http.hpp | 63 ++++++----------- client_https.hpp | 22 +++--- server_http.hpp | 46 ++++-------- server_https.hpp | 8 +-- tests/io_test.cpp | 56 ++++++++------- utility.hpp | 174 +++++++++++++++------------------------------- 6 files changed, 138 insertions(+), 231 deletions(-) diff --git a/client_http.hpp b/client_http.hpp index e4bf516..a9c2ebd 100644 --- a/client_http.hpp +++ b/client_http.hpp @@ -91,12 +91,10 @@ namespace SimpleWeb { class Connection : public std::enable_shared_from_this { public: template - Connection(std::shared_ptr cancel_handlers, std::shared_ptr cancel_handlers_mutex, long timeout, Args &&... args) - : cancel_handlers(std::move(cancel_handlers)), cancel_handlers_mutex(std::move(cancel_handlers_mutex)), timeout(timeout), - socket(new socket_type(std::forward(args)...)) {} + Connection(std::shared_ptr continue_handlers, long timeout, Args &&... args) + : continue_handlers(std::move(continue_handlers)), timeout(timeout), socket(new socket_type(std::forward(args)...)) {} - std::shared_ptr cancel_handlers; - std::shared_ptr cancel_handlers_mutex; + std::shared_ptr continue_handlers; long timeout; std::unique_ptr socket; // Socket must be unique_ptr since asio::ssl::stream is not movable @@ -127,13 +125,6 @@ namespace SimpleWeb { if(timer) timer->cancel(); } - - std::pair> cancel_handlers_bool_and_lock() { - if(!cancel_handlers) - return {false, nullptr}; - auto lock = cancel_handlers_mutex->shared_lock(); - return {*cancel_handlers, std::move(lock)}; - } }; class Session { @@ -337,12 +328,7 @@ namespace SimpleWeb { } virtual ~ClientBase() { - { - if(!internal_io_service) { - auto lock = cancel_handlers_mutex->unique_lock(); - *cancel_handlers = true; - } - } + continue_handlers->stop(); stop(); } @@ -357,13 +343,12 @@ namespace SimpleWeb { std::unordered_set> connections; std::mutex connections_mutex; - std::shared_ptr cancel_handlers; - std::shared_ptr cancel_handlers_mutex; + std::shared_ptr continue_handlers; size_t concurrent_synchronous_requests = 0; std::mutex concurrent_synchronous_requests_mutex; - ClientBase(const std::string &host_port, unsigned short default_port) : cancel_handlers(new bool(false)), cancel_handlers_mutex(new SharedMutex()) { + ClientBase(const std::string &host_port, unsigned short default_port) : continue_handlers(new ContinueScopes()) { auto parsed_host_port = parse_host_port(host_port, default_port); host = parsed_host_port.first; port = parsed_host_port.second; @@ -376,8 +361,6 @@ namespace SimpleWeb { if(!io_service) { io_service = std::make_shared(); internal_io_service = true; - cancel_handlers = nullptr; - cancel_handlers_mutex = nullptr; } for(auto it = connections.begin(); it != connections.end(); ++it) { @@ -442,8 +425,8 @@ namespace SimpleWeb { session->connection->set_timeout(); asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) this->read(session); @@ -456,8 +439,8 @@ namespace SimpleWeb { session->connection->set_timeout(); asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { session->connection->attempt_reconnect = true; @@ -476,8 +459,8 @@ namespace SimpleWeb { session->connection->set_timeout(); asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) session->callback(session->connection, ec); @@ -496,8 +479,8 @@ namespace SimpleWeb { session->connection->set_timeout(); asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) session->callback(session->connection, ec); @@ -536,8 +519,8 @@ namespace SimpleWeb { session->connection->set_timeout(); asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { std::string line; @@ -574,8 +557,8 @@ namespace SimpleWeb { session->connection->set_timeout(); asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) post_process(); @@ -604,7 +587,7 @@ namespace SimpleWeb { protected: std::shared_ptr create_connection() override { - return std::make_shared(cancel_handlers, cancel_handlers_mutex, config.timeout, *io_service); + return std::make_shared(continue_handlers, config.timeout, *io_service); } void connect(const std::shared_ptr &session) override { @@ -613,15 +596,15 @@ namespace SimpleWeb { session->connection->set_timeout(config.timeout_connect); resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { session->connection->set_timeout(config.timeout_connect); asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { asio::ip::tcp::no_delay option(true); diff --git a/client_https.hpp b/client_https.hpp index 9d1fe1b..064acfa 100644 --- a/client_https.hpp +++ b/client_https.hpp @@ -41,22 +41,22 @@ namespace SimpleWeb { asio::ssl::context context; std::shared_ptr create_connection() override { - return std::make_shared(cancel_handlers, cancel_handlers_mutex, config.timeout, *io_service, context); + return std::make_shared(continue_handlers, config.timeout, *io_service, context); } void connect(const std::shared_ptr &session) override { if(!session->connection->socket->lowest_layer().is_open()) { auto resolver = std::make_shared(*io_service); resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) { - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { session->connection->set_timeout(this->config.timeout_connect); asio::async_connect(session->connection->socket->lowest_layer(), it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { asio::ip::tcp::no_delay option(true); @@ -72,16 +72,16 @@ namespace SimpleWeb { session->connection->set_timeout(this->config.timeout_connect); asio::async_write(session->connection->socket->next_layer(), *write_buffer, [this, session, write_buffer](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { std::shared_ptr response(new Response()); session->connection->set_timeout(this->config.timeout_connect); asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, session, response](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { if(!ResponseMessage::parse(response->content, response->http_version, response->status_code, response->header)) @@ -120,8 +120,8 @@ namespace SimpleWeb { session->connection->set_timeout(this->config.timeout_connect); session->connection->socket->async_handshake(asio::ssl::stream_base::client, [this, session](const error_code &ec) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) this->write(session); diff --git a/server_http.hpp b/server_http.hpp index b42b835..648ce0a 100644 --- a/server_http.hpp +++ b/server_http.hpp @@ -90,8 +90,8 @@ namespace SimpleWeb { auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) { self->session->connection->cancel_timeout(); - auto cancel_pair = self->session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = self->session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(callback) callback(ec); @@ -202,11 +202,9 @@ namespace SimpleWeb { class Connection : public std::enable_shared_from_this { public: template - Connection(std::shared_ptr cancel_handlers, std::shared_ptr cancel_handlers_mutex, Args &&... args) - : cancel_handlers(std::move(cancel_handlers)), cancel_handlers_mutex(std::move(cancel_handlers_mutex)), socket(new socket_type(std::forward(args)...)) {} + Connection(std::shared_ptr continue_handlers, Args &&... args) : continue_handlers(std::move(continue_handlers)), socket(new socket_type(std::forward(args)...)) {} - std::shared_ptr cancel_handlers; - std::shared_ptr cancel_handlers_mutex; + std::shared_ptr continue_handlers; std::unique_ptr socket; // Socket must be unique_ptr since asio::ssl::stream is not movable std::mutex socket_close_mutex; @@ -239,13 +237,6 @@ namespace SimpleWeb { if(timer) timer->cancel(); } - - std::pair> cancel_handlers_bool_and_lock() { - if(!cancel_handlers) - return {false, nullptr}; - auto lock = cancel_handlers_mutex->shared_lock(); - return {*cancel_handlers, std::move(lock)}; - } }; class Session { @@ -318,8 +309,6 @@ namespace SimpleWeb { if(!io_service) { io_service = std::make_shared(); internal_io_service = true; - cancel_handlers = nullptr; - cancel_handlers_mutex = nullptr; } if(io_service->stopped()) @@ -378,12 +367,7 @@ namespace SimpleWeb { } virtual ~ServerBase() { - { - if(!internal_io_service) { - auto lock = cancel_handlers_mutex->unique_lock(); - *cancel_handlers = true; - } - } + continue_handlers->stop(); stop(); } @@ -396,11 +380,9 @@ namespace SimpleWeb { std::shared_ptr> connections; std::shared_ptr connections_mutex; - std::shared_ptr cancel_handlers; - std::shared_ptr cancel_handlers_mutex; + std::shared_ptr continue_handlers; - ServerBase(unsigned short port) : config(port), connections(new std::unordered_set()), connections_mutex(new std::mutex()), - cancel_handlers(new bool(false)), cancel_handlers_mutex(new SharedMutex()) {} + ServerBase(unsigned short port) : config(port), connections(new std::unordered_set()), connections_mutex(new std::mutex()), continue_handlers(new ContinueScopes()) {} virtual void accept() = 0; @@ -408,7 +390,7 @@ namespace SimpleWeb { std::shared_ptr create_connection(Args &&... args) { auto connections = this->connections; auto connections_mutex = this->connections_mutex; - auto connection = std::shared_ptr(new Connection(cancel_handlers, cancel_handlers_mutex, std::forward(args)...), [connections, connections_mutex](Connection *connection) { + auto connection = std::shared_ptr(new Connection(continue_handlers, std::forward(args)...), [connections, connections_mutex](Connection *connection) { { std::unique_lock lock(*connections_mutex); auto it = connections->find(connection); @@ -428,8 +410,8 @@ namespace SimpleWeb { session->connection->set_timeout(config.timeout_request); asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) { // request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs: @@ -461,8 +443,8 @@ namespace SimpleWeb { session->connection->set_timeout(config.timeout_content); asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) this->find_resource(session); @@ -572,8 +554,8 @@ namespace SimpleWeb { auto session = std::make_shared(create_connection(*io_service)); acceptor->async_accept(*session->connection->socket, [this, session](const error_code &ec) { - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; // Immediately start accepting a new connection (unless io_service has been stopped) diff --git a/server_https.hpp b/server_https.hpp index f247308..9f3e5fc 100644 --- a/server_https.hpp +++ b/server_https.hpp @@ -51,8 +51,8 @@ namespace SimpleWeb { auto session = std::make_shared(create_connection(*io_service, context)); acceptor->async_accept(session->connection->socket->lowest_layer(), [this, session](const error_code &ec) { - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(ec != asio::error::operation_aborted) @@ -66,8 +66,8 @@ namespace SimpleWeb { session->connection->set_timeout(config.timeout_request); session->connection->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) { session->connection->cancel_timeout(); - auto cancel_pair = session->connection->cancel_handlers_bool_and_lock(); - if(cancel_pair.first) + auto lock = session->connection->continue_handlers->shared_lock(); + if(!lock) return; if(!ec) this->read_request_and_content(session); diff --git a/tests/io_test.cpp b/tests/io_test.cpp index 30895dd..e33241e 100644 --- a/tests/io_test.cpp +++ b/tests/io_test.cpp @@ -13,36 +13,42 @@ typedef SimpleWeb::Server HttpServer; typedef SimpleWeb::Client HttpClient; int main() { + // Test ContinueScopes { - // Test SharedMutex - SimpleWeb::SharedMutex mutex; - int count = 0; + SimpleWeb::ContinueScopes continue_scopes; + std::thread cancel_thread; { - thread t([&] { - auto lock = mutex.shared_lock(); - { - auto lock = mutex.shared_lock(); - ++count; - } + assert(continue_scopes.count == 0); + auto lock = continue_scopes.shared_lock(); + assert(continue_scopes.count == 1); + { + auto lock = continue_scopes.shared_lock(); + assert(continue_scopes.count == 2); + } + assert(continue_scopes.count == 1); + cancel_thread = thread([&continue_scopes] { + continue_scopes.stop(); + assert(continue_scopes.count == -1); }); - this_thread::sleep_for(chrono::milliseconds(100)); - t.detach(); - assert(count == 1); + this_thread::sleep_for(chrono::milliseconds(500)); + assert(continue_scopes.count == 1); } - thread t; - { - auto lock = mutex.unique_lock(); - t = thread([&] { - auto lock = mutex.unique_lock(); - ++count; - }); - this_thread::sleep_for(chrono::milliseconds(100)); - assert(count == 1); - } - t.join(); - assert(count == 2); - } + cancel_thread.join(); + assert(continue_scopes.count == -1); + continue_scopes.count = 0; + + vector threads; + for(size_t c = 0; c < 100; ++c) { + threads.emplace_back([&continue_scopes] { + auto lock = continue_scopes.shared_lock(); + assert(continue_scopes.count > 0); + }); + } + for(auto &thread : threads) + thread.join(); + assert(continue_scopes.count == 0); + } HttpServer server; server.config.port = 8080; diff --git a/utility.hpp b/utility.hpp index 70f81db..3068444 100644 --- a/utility.hpp +++ b/utility.hpp @@ -2,6 +2,7 @@ #define SIMPLE_WEB_UTILITY_HPP #include "status_code.hpp" +#include #include #include #include @@ -234,130 +235,65 @@ namespace SimpleWeb { }; } // namespace SimpleWeb -#ifdef PTHREAD_RWLOCK_INITIALIZER +#ifdef __SSE2__ +#include namespace SimpleWeb { - /// Read-preferring R/W lock. - /// Uses pthread_rwlock. - class SharedMutex { - pthread_rwlock_t rwlock; - - public: - class SharedLock { - friend class SharedMutex; - pthread_rwlock_t &rwlock; - - SharedLock(pthread_rwlock_t &rwlock) : rwlock(rwlock) { - pthread_rwlock_rdlock(&rwlock); - } - - public: - ~SharedLock() { - pthread_rwlock_unlock(&rwlock); - } - }; - - class UniqueLock { - friend class SharedMutex; - pthread_rwlock_t &rwlock; - - UniqueLock(pthread_rwlock_t &rwlock) : rwlock(rwlock) { - pthread_rwlock_wrlock(&rwlock); - } - - public: - ~UniqueLock() { - pthread_rwlock_unlock(&rwlock); - } - }; - - public: - SharedMutex() { - - pthread_rwlock_init(&rwlock, nullptr); - } - - ~SharedMutex() { - pthread_rwlock_destroy(&rwlock); - } - - std::unique_ptr shared_lock() { - return std::unique_ptr(new SharedLock(rwlock)); - } - - std::unique_ptr unique_lock() { - return std::unique_ptr(new UniqueLock(rwlock)); - } - }; + inline void spin_loop_pause() { _mm_pause(); } +} // namespace SimpleWeb +// TODO: need verification that the following checks are correct: +#elif defined(_MSC_VER) && _MSC_VER >= 1800 && (defined(_M_X64) || defined(_M_IX86)) +#include +namespace SimpleWeb { + inline void spin_loop_pause() { _mm_pause(); } } // namespace SimpleWeb #else -#include -#include namespace SimpleWeb { - /// Read-preferring R/W lock. - /// Based on https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_a_condition_variable_and_a_mutex pseudocode. - /// TODO: Someone that uses Windows should implement Windows specific R/W locks here. - class SharedMutex { - std::mutex m; - std::condition_variable c; - int r = 0; - bool w = false; - - public: - class SharedLock { - friend class SharedMutex; - std::condition_variable &c; - int &r; - std::unique_lock lock; - - SharedLock(std::mutex &m, std::condition_variable &c, int &r, bool &w) : c(c), r(r), lock(m) { - while(w) - c.wait(lock); - ++r; - lock.unlock(); - } - - public: - ~SharedLock() { - lock.lock(); - --r; - if(r == 0) - c.notify_all(); - lock.unlock(); - } - }; - - class UniqueLock { - friend class SharedMutex; - std::condition_variable &c; - bool &w; - std::unique_lock lock; - - UniqueLock(std::mutex &m, std::condition_variable &c, int &r, bool &w) : c(c), w(w), lock(m) { - while(w || r > 0) - c.wait(lock); - w = true; - lock.unlock(); - } - - public: - ~UniqueLock() { - lock.lock(); - w = false; - c.notify_all(); - lock.unlock(); - } - }; - - public: - std::unique_ptr shared_lock() { - return std::unique_ptr(new SharedLock(m, c, r, w)); - } - - std::unique_ptr unique_lock() { - return std::unique_ptr(new UniqueLock(m, c, r, w)); - } - }; + inline void spin_loop_pause() {} } // namespace SimpleWeb #endif +namespace SimpleWeb { + /// Makes it possible to for instance cancel Asio handlers without stopping asio::io_service + class ContinueScopes { + /// Scope count that is set to -1 if scopes are to be canceled + std::atomic count; + + public: + class SharedLock { + std::atomic &count; + SharedLock &operator=(const SharedLock &) = delete; + SharedLock(const SharedLock &) = delete; + + public: + SharedLock(std::atomic &count) : count(count) {} + ~SharedLock() { + count.fetch_sub(1); + } + }; + + ContinueScopes() : count(0) {} + + /// Returns nullptr if scope is to be cancelled, or a shared lock otherwise + std::unique_ptr shared_lock() { + long expected = count; + while(expected >= 0 && !count.compare_exchange_weak(expected, expected + 1)) + spin_loop_pause(); + + if(expected < 0) + return nullptr; + else + return std::unique_ptr(new SharedLock(count)); + } + + //// Blocks until all shared locks are released, then prevents future shared locks + void stop() { + long expected = 0; + while(!count.compare_exchange_weak(expected, -1)) { + expected = 0; + spin_loop_pause(); + } + } + }; +} // namespace SimpleWeb + #endif // SIMPLE_WEB_UTILITY_HPP