Cancel handlers feature: replaced readers-writer lock with a spinlock implementation
This commit is contained in:
parent
632bb1ec2f
commit
35f835a67b
6 changed files with 138 additions and 231 deletions
|
|
@ -91,12 +91,10 @@ namespace SimpleWeb {
|
||||||
class Connection : public std::enable_shared_from_this<Connection> {
|
class Connection : public std::enable_shared_from_this<Connection> {
|
||||||
public:
|
public:
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
Connection(std::shared_ptr<bool> cancel_handlers, std::shared_ptr<SharedMutex> cancel_handlers_mutex, long timeout, Args &&... args)
|
Connection(std::shared_ptr<ContinueScopes> continue_handlers, long timeout, Args &&... args)
|
||||||
: cancel_handlers(std::move(cancel_handlers)), cancel_handlers_mutex(std::move(cancel_handlers_mutex)), timeout(timeout),
|
: continue_handlers(std::move(continue_handlers)), timeout(timeout), socket(new socket_type(std::forward<Args>(args)...)) {}
|
||||||
socket(new socket_type(std::forward<Args>(args)...)) {}
|
|
||||||
|
|
||||||
std::shared_ptr<bool> cancel_handlers;
|
std::shared_ptr<ContinueScopes> continue_handlers;
|
||||||
std::shared_ptr<SharedMutex> cancel_handlers_mutex;
|
|
||||||
long timeout;
|
long timeout;
|
||||||
|
|
||||||
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
|
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
|
||||||
|
|
@ -127,13 +125,6 @@ namespace SimpleWeb {
|
||||||
if(timer)
|
if(timer)
|
||||||
timer->cancel();
|
timer->cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<bool, std::unique_ptr<SharedMutex::SharedLock>> cancel_handlers_bool_and_lock() {
|
|
||||||
if(!cancel_handlers)
|
|
||||||
return {false, nullptr};
|
|
||||||
auto lock = cancel_handlers_mutex->shared_lock();
|
|
||||||
return {*cancel_handlers, std::move(lock)};
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class Session {
|
class Session {
|
||||||
|
|
@ -337,12 +328,7 @@ namespace SimpleWeb {
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~ClientBase() {
|
virtual ~ClientBase() {
|
||||||
{
|
continue_handlers->stop();
|
||||||
if(!internal_io_service) {
|
|
||||||
auto lock = cancel_handlers_mutex->unique_lock();
|
|
||||||
*cancel_handlers = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stop();
|
stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -357,13 +343,12 @@ namespace SimpleWeb {
|
||||||
std::unordered_set<std::shared_ptr<Connection>> connections;
|
std::unordered_set<std::shared_ptr<Connection>> connections;
|
||||||
std::mutex connections_mutex;
|
std::mutex connections_mutex;
|
||||||
|
|
||||||
std::shared_ptr<bool> cancel_handlers;
|
std::shared_ptr<ContinueScopes> continue_handlers;
|
||||||
std::shared_ptr<SharedMutex> cancel_handlers_mutex;
|
|
||||||
|
|
||||||
size_t concurrent_synchronous_requests = 0;
|
size_t concurrent_synchronous_requests = 0;
|
||||||
std::mutex concurrent_synchronous_requests_mutex;
|
std::mutex concurrent_synchronous_requests_mutex;
|
||||||
|
|
||||||
ClientBase(const std::string &host_port, unsigned short default_port) : cancel_handlers(new bool(false)), cancel_handlers_mutex(new SharedMutex()) {
|
ClientBase(const std::string &host_port, unsigned short default_port) : continue_handlers(new ContinueScopes()) {
|
||||||
auto parsed_host_port = parse_host_port(host_port, default_port);
|
auto parsed_host_port = parse_host_port(host_port, default_port);
|
||||||
host = parsed_host_port.first;
|
host = parsed_host_port.first;
|
||||||
port = parsed_host_port.second;
|
port = parsed_host_port.second;
|
||||||
|
|
@ -376,8 +361,6 @@ namespace SimpleWeb {
|
||||||
if(!io_service) {
|
if(!io_service) {
|
||||||
io_service = std::make_shared<asio::io_service>();
|
io_service = std::make_shared<asio::io_service>();
|
||||||
internal_io_service = true;
|
internal_io_service = true;
|
||||||
cancel_handlers = nullptr;
|
|
||||||
cancel_handlers_mutex = nullptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for(auto it = connections.begin(); it != connections.end(); ++it) {
|
for(auto it = connections.begin(); it != connections.end(); ++it) {
|
||||||
|
|
@ -442,8 +425,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout();
|
session->connection->set_timeout();
|
||||||
asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
this->read(session);
|
this->read(session);
|
||||||
|
|
@ -456,8 +439,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout();
|
session->connection->set_timeout();
|
||||||
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
|
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
session->connection->attempt_reconnect = true;
|
session->connection->attempt_reconnect = true;
|
||||||
|
|
@ -476,8 +459,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout();
|
session->connection->set_timeout();
|
||||||
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
session->callback(session->connection, ec);
|
session->callback(session->connection, ec);
|
||||||
|
|
@ -496,8 +479,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout();
|
session->connection->set_timeout();
|
||||||
asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
session->callback(session->connection, ec);
|
session->callback(session->connection, ec);
|
||||||
|
|
@ -536,8 +519,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout();
|
session->connection->set_timeout();
|
||||||
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) {
|
asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
std::string line;
|
std::string line;
|
||||||
|
|
@ -574,8 +557,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout();
|
session->connection->set_timeout();
|
||||||
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
post_process();
|
post_process();
|
||||||
|
|
@ -604,7 +587,7 @@ namespace SimpleWeb {
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::shared_ptr<Connection> create_connection() override {
|
std::shared_ptr<Connection> create_connection() override {
|
||||||
return std::make_shared<Connection>(cancel_handlers, cancel_handlers_mutex, config.timeout, *io_service);
|
return std::make_shared<Connection>(continue_handlers, config.timeout, *io_service);
|
||||||
}
|
}
|
||||||
|
|
||||||
void connect(const std::shared_ptr<Session> &session) override {
|
void connect(const std::shared_ptr<Session> &session) override {
|
||||||
|
|
@ -613,15 +596,15 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(config.timeout_connect);
|
session->connection->set_timeout(config.timeout_connect);
|
||||||
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
|
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
session->connection->set_timeout(config.timeout_connect);
|
session->connection->set_timeout(config.timeout_connect);
|
||||||
asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
|
asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
asio::ip::tcp::no_delay option(true);
|
asio::ip::tcp::no_delay option(true);
|
||||||
|
|
|
||||||
|
|
@ -41,22 +41,22 @@ namespace SimpleWeb {
|
||||||
asio::ssl::context context;
|
asio::ssl::context context;
|
||||||
|
|
||||||
std::shared_ptr<Connection> create_connection() override {
|
std::shared_ptr<Connection> create_connection() override {
|
||||||
return std::make_shared<Connection>(cancel_handlers, cancel_handlers_mutex, config.timeout, *io_service, context);
|
return std::make_shared<Connection>(continue_handlers, config.timeout, *io_service, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
void connect(const std::shared_ptr<Session> &session) override {
|
void connect(const std::shared_ptr<Session> &session) override {
|
||||||
if(!session->connection->socket->lowest_layer().is_open()) {
|
if(!session->connection->socket->lowest_layer().is_open()) {
|
||||||
auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service);
|
auto resolver = std::make_shared<asio::ip::tcp::resolver>(*io_service);
|
||||||
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
|
resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) {
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
session->connection->set_timeout(this->config.timeout_connect);
|
session->connection->set_timeout(this->config.timeout_connect);
|
||||||
asio::async_connect(session->connection->socket->lowest_layer(), it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
|
asio::async_connect(session->connection->socket->lowest_layer(), it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
asio::ip::tcp::no_delay option(true);
|
asio::ip::tcp::no_delay option(true);
|
||||||
|
|
@ -72,16 +72,16 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(this->config.timeout_connect);
|
session->connection->set_timeout(this->config.timeout_connect);
|
||||||
asio::async_write(session->connection->socket->next_layer(), *write_buffer, [this, session, write_buffer](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_write(session->connection->socket->next_layer(), *write_buffer, [this, session, write_buffer](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
std::shared_ptr<Response> response(new Response());
|
std::shared_ptr<Response> response(new Response());
|
||||||
session->connection->set_timeout(this->config.timeout_connect);
|
session->connection->set_timeout(this->config.timeout_connect);
|
||||||
asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, session, response](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, session, response](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
if(!ResponseMessage::parse(response->content, response->http_version, response->status_code, response->header))
|
if(!ResponseMessage::parse(response->content, response->http_version, response->status_code, response->header))
|
||||||
|
|
@ -120,8 +120,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(this->config.timeout_connect);
|
session->connection->set_timeout(this->config.timeout_connect);
|
||||||
session->connection->socket->async_handshake(asio::ssl::stream_base::client, [this, session](const error_code &ec) {
|
session->connection->socket->async_handshake(asio::ssl::stream_base::client, [this, session](const error_code &ec) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
this->write(session);
|
this->write(session);
|
||||||
|
|
|
||||||
|
|
@ -90,8 +90,8 @@ namespace SimpleWeb {
|
||||||
auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
|
auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
|
||||||
asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
self->session->connection->cancel_timeout();
|
self->session->connection->cancel_timeout();
|
||||||
auto cancel_pair = self->session->connection->cancel_handlers_bool_and_lock();
|
auto lock = self->session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(callback)
|
if(callback)
|
||||||
callback(ec);
|
callback(ec);
|
||||||
|
|
@ -202,11 +202,9 @@ namespace SimpleWeb {
|
||||||
class Connection : public std::enable_shared_from_this<Connection> {
|
class Connection : public std::enable_shared_from_this<Connection> {
|
||||||
public:
|
public:
|
||||||
template <typename... Args>
|
template <typename... Args>
|
||||||
Connection(std::shared_ptr<bool> cancel_handlers, std::shared_ptr<SharedMutex> cancel_handlers_mutex, Args &&... args)
|
Connection(std::shared_ptr<ContinueScopes> continue_handlers, Args &&... args) : continue_handlers(std::move(continue_handlers)), socket(new socket_type(std::forward<Args>(args)...)) {}
|
||||||
: cancel_handlers(std::move(cancel_handlers)), cancel_handlers_mutex(std::move(cancel_handlers_mutex)), socket(new socket_type(std::forward<Args>(args)...)) {}
|
|
||||||
|
|
||||||
std::shared_ptr<bool> cancel_handlers;
|
std::shared_ptr<ContinueScopes> continue_handlers;
|
||||||
std::shared_ptr<SharedMutex> cancel_handlers_mutex;
|
|
||||||
|
|
||||||
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
|
std::unique_ptr<socket_type> socket; // Socket must be unique_ptr since asio::ssl::stream<asio::ip::tcp::socket> is not movable
|
||||||
std::mutex socket_close_mutex;
|
std::mutex socket_close_mutex;
|
||||||
|
|
@ -239,13 +237,6 @@ namespace SimpleWeb {
|
||||||
if(timer)
|
if(timer)
|
||||||
timer->cancel();
|
timer->cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<bool, std::unique_ptr<SharedMutex::SharedLock>> cancel_handlers_bool_and_lock() {
|
|
||||||
if(!cancel_handlers)
|
|
||||||
return {false, nullptr};
|
|
||||||
auto lock = cancel_handlers_mutex->shared_lock();
|
|
||||||
return {*cancel_handlers, std::move(lock)};
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class Session {
|
class Session {
|
||||||
|
|
@ -318,8 +309,6 @@ namespace SimpleWeb {
|
||||||
if(!io_service) {
|
if(!io_service) {
|
||||||
io_service = std::make_shared<asio::io_service>();
|
io_service = std::make_shared<asio::io_service>();
|
||||||
internal_io_service = true;
|
internal_io_service = true;
|
||||||
cancel_handlers = nullptr;
|
|
||||||
cancel_handlers_mutex = nullptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(io_service->stopped())
|
if(io_service->stopped())
|
||||||
|
|
@ -378,12 +367,7 @@ namespace SimpleWeb {
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual ~ServerBase() {
|
virtual ~ServerBase() {
|
||||||
{
|
continue_handlers->stop();
|
||||||
if(!internal_io_service) {
|
|
||||||
auto lock = cancel_handlers_mutex->unique_lock();
|
|
||||||
*cancel_handlers = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stop();
|
stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -396,11 +380,9 @@ namespace SimpleWeb {
|
||||||
std::shared_ptr<std::unordered_set<Connection *>> connections;
|
std::shared_ptr<std::unordered_set<Connection *>> connections;
|
||||||
std::shared_ptr<std::mutex> connections_mutex;
|
std::shared_ptr<std::mutex> connections_mutex;
|
||||||
|
|
||||||
std::shared_ptr<bool> cancel_handlers;
|
std::shared_ptr<ContinueScopes> continue_handlers;
|
||||||
std::shared_ptr<SharedMutex> cancel_handlers_mutex;
|
|
||||||
|
|
||||||
ServerBase(unsigned short port) : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()),
|
ServerBase(unsigned short port) : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()), continue_handlers(new ContinueScopes()) {}
|
||||||
cancel_handlers(new bool(false)), cancel_handlers_mutex(new SharedMutex()) {}
|
|
||||||
|
|
||||||
virtual void accept() = 0;
|
virtual void accept() = 0;
|
||||||
|
|
||||||
|
|
@ -408,7 +390,7 @@ namespace SimpleWeb {
|
||||||
std::shared_ptr<Connection> create_connection(Args &&... args) {
|
std::shared_ptr<Connection> create_connection(Args &&... args) {
|
||||||
auto connections = this->connections;
|
auto connections = this->connections;
|
||||||
auto connections_mutex = this->connections_mutex;
|
auto connections_mutex = this->connections_mutex;
|
||||||
auto connection = std::shared_ptr<Connection>(new Connection(cancel_handlers, cancel_handlers_mutex, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) {
|
auto connection = std::shared_ptr<Connection>(new Connection(continue_handlers, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) {
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> lock(*connections_mutex);
|
std::unique_lock<std::mutex> lock(*connections_mutex);
|
||||||
auto it = connections->find(connection);
|
auto it = connections->find(connection);
|
||||||
|
|
@ -428,8 +410,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(config.timeout_request);
|
session->connection->set_timeout(config.timeout_request);
|
||||||
asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
|
asio::async_read_until(*session->connection->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
// request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs:
|
// request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs:
|
||||||
|
|
@ -461,8 +443,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(config.timeout_content);
|
session->connection->set_timeout(config.timeout_content);
|
||||||
asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
asio::async_read(*session->connection->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
this->find_resource(session);
|
this->find_resource(session);
|
||||||
|
|
@ -572,8 +554,8 @@ namespace SimpleWeb {
|
||||||
auto session = std::make_shared<Session>(create_connection(*io_service));
|
auto session = std::make_shared<Session>(create_connection(*io_service));
|
||||||
|
|
||||||
acceptor->async_accept(*session->connection->socket, [this, session](const error_code &ec) {
|
acceptor->async_accept(*session->connection->socket, [this, session](const error_code &ec) {
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// Immediately start accepting a new connection (unless io_service has been stopped)
|
// Immediately start accepting a new connection (unless io_service has been stopped)
|
||||||
|
|
|
||||||
|
|
@ -51,8 +51,8 @@ namespace SimpleWeb {
|
||||||
auto session = std::make_shared<Session>(create_connection(*io_service, context));
|
auto session = std::make_shared<Session>(create_connection(*io_service, context));
|
||||||
|
|
||||||
acceptor->async_accept(session->connection->socket->lowest_layer(), [this, session](const error_code &ec) {
|
acceptor->async_accept(session->connection->socket->lowest_layer(), [this, session](const error_code &ec) {
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if(ec != asio::error::operation_aborted)
|
if(ec != asio::error::operation_aborted)
|
||||||
|
|
@ -66,8 +66,8 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(config.timeout_request);
|
session->connection->set_timeout(config.timeout_request);
|
||||||
session->connection->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) {
|
session->connection->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) {
|
||||||
session->connection->cancel_timeout();
|
session->connection->cancel_timeout();
|
||||||
auto cancel_pair = session->connection->cancel_handlers_bool_and_lock();
|
auto lock = session->connection->continue_handlers->shared_lock();
|
||||||
if(cancel_pair.first)
|
if(!lock)
|
||||||
return;
|
return;
|
||||||
if(!ec)
|
if(!ec)
|
||||||
this->read_request_and_content(session);
|
this->read_request_and_content(session);
|
||||||
|
|
|
||||||
|
|
@ -13,36 +13,42 @@ typedef SimpleWeb::Server<SimpleWeb::HTTP> HttpServer;
|
||||||
typedef SimpleWeb::Client<SimpleWeb::HTTP> HttpClient;
|
typedef SimpleWeb::Client<SimpleWeb::HTTP> HttpClient;
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
|
// Test ContinueScopes
|
||||||
{
|
{
|
||||||
// Test SharedMutex
|
SimpleWeb::ContinueScopes continue_scopes;
|
||||||
SimpleWeb::SharedMutex mutex;
|
std::thread cancel_thread;
|
||||||
int count = 0;
|
|
||||||
{
|
{
|
||||||
thread t([&] {
|
assert(continue_scopes.count == 0);
|
||||||
auto lock = mutex.shared_lock();
|
auto lock = continue_scopes.shared_lock();
|
||||||
{
|
assert(continue_scopes.count == 1);
|
||||||
auto lock = mutex.shared_lock();
|
{
|
||||||
++count;
|
auto lock = continue_scopes.shared_lock();
|
||||||
}
|
assert(continue_scopes.count == 2);
|
||||||
|
}
|
||||||
|
assert(continue_scopes.count == 1);
|
||||||
|
cancel_thread = thread([&continue_scopes] {
|
||||||
|
continue_scopes.stop();
|
||||||
|
assert(continue_scopes.count == -1);
|
||||||
});
|
});
|
||||||
this_thread::sleep_for(chrono::milliseconds(100));
|
this_thread::sleep_for(chrono::milliseconds(500));
|
||||||
t.detach();
|
assert(continue_scopes.count == 1);
|
||||||
assert(count == 1);
|
|
||||||
}
|
}
|
||||||
thread t;
|
cancel_thread.join();
|
||||||
{
|
assert(continue_scopes.count == -1);
|
||||||
auto lock = mutex.unique_lock();
|
|
||||||
t = thread([&] {
|
|
||||||
auto lock = mutex.unique_lock();
|
|
||||||
++count;
|
|
||||||
});
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(100));
|
|
||||||
assert(count == 1);
|
|
||||||
}
|
|
||||||
t.join();
|
|
||||||
assert(count == 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
continue_scopes.count = 0;
|
||||||
|
|
||||||
|
vector<thread> threads;
|
||||||
|
for(size_t c = 0; c < 100; ++c) {
|
||||||
|
threads.emplace_back([&continue_scopes] {
|
||||||
|
auto lock = continue_scopes.shared_lock();
|
||||||
|
assert(continue_scopes.count > 0);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
for(auto &thread : threads)
|
||||||
|
thread.join();
|
||||||
|
assert(continue_scopes.count == 0);
|
||||||
|
}
|
||||||
|
|
||||||
HttpServer server;
|
HttpServer server;
|
||||||
server.config.port = 8080;
|
server.config.port = 8080;
|
||||||
|
|
|
||||||
174
utility.hpp
174
utility.hpp
|
|
@ -2,6 +2,7 @@
|
||||||
#define SIMPLE_WEB_UTILITY_HPP
|
#define SIMPLE_WEB_UTILITY_HPP
|
||||||
|
|
||||||
#include "status_code.hpp"
|
#include "status_code.hpp"
|
||||||
|
#include <atomic>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
@ -234,130 +235,65 @@ namespace SimpleWeb {
|
||||||
};
|
};
|
||||||
} // namespace SimpleWeb
|
} // namespace SimpleWeb
|
||||||
|
|
||||||
#ifdef PTHREAD_RWLOCK_INITIALIZER
|
#ifdef __SSE2__
|
||||||
|
#include <emmintrin.h>
|
||||||
namespace SimpleWeb {
|
namespace SimpleWeb {
|
||||||
/// Read-preferring R/W lock.
|
inline void spin_loop_pause() { _mm_pause(); }
|
||||||
/// Uses pthread_rwlock.
|
} // namespace SimpleWeb
|
||||||
class SharedMutex {
|
// TODO: need verification that the following checks are correct:
|
||||||
pthread_rwlock_t rwlock;
|
#elif defined(_MSC_VER) && _MSC_VER >= 1800 && (defined(_M_X64) || defined(_M_IX86))
|
||||||
|
#include <intrin.h>
|
||||||
public:
|
namespace SimpleWeb {
|
||||||
class SharedLock {
|
inline void spin_loop_pause() { _mm_pause(); }
|
||||||
friend class SharedMutex;
|
|
||||||
pthread_rwlock_t &rwlock;
|
|
||||||
|
|
||||||
SharedLock(pthread_rwlock_t &rwlock) : rwlock(rwlock) {
|
|
||||||
pthread_rwlock_rdlock(&rwlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
~SharedLock() {
|
|
||||||
pthread_rwlock_unlock(&rwlock);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
class UniqueLock {
|
|
||||||
friend class SharedMutex;
|
|
||||||
pthread_rwlock_t &rwlock;
|
|
||||||
|
|
||||||
UniqueLock(pthread_rwlock_t &rwlock) : rwlock(rwlock) {
|
|
||||||
pthread_rwlock_wrlock(&rwlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
~UniqueLock() {
|
|
||||||
pthread_rwlock_unlock(&rwlock);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public:
|
|
||||||
SharedMutex() {
|
|
||||||
|
|
||||||
pthread_rwlock_init(&rwlock, nullptr);
|
|
||||||
}
|
|
||||||
|
|
||||||
~SharedMutex() {
|
|
||||||
pthread_rwlock_destroy(&rwlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<SharedLock> shared_lock() {
|
|
||||||
return std::unique_ptr<SharedLock>(new SharedLock(rwlock));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<UniqueLock> unique_lock() {
|
|
||||||
return std::unique_ptr<UniqueLock>(new UniqueLock(rwlock));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} // namespace SimpleWeb
|
} // namespace SimpleWeb
|
||||||
#else
|
#else
|
||||||
#include <condition_variable>
|
|
||||||
#include <mutex>
|
|
||||||
namespace SimpleWeb {
|
namespace SimpleWeb {
|
||||||
/// Read-preferring R/W lock.
|
inline void spin_loop_pause() {}
|
||||||
/// Based on https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_a_condition_variable_and_a_mutex pseudocode.
|
|
||||||
/// TODO: Someone that uses Windows should implement Windows specific R/W locks here.
|
|
||||||
class SharedMutex {
|
|
||||||
std::mutex m;
|
|
||||||
std::condition_variable c;
|
|
||||||
int r = 0;
|
|
||||||
bool w = false;
|
|
||||||
|
|
||||||
public:
|
|
||||||
class SharedLock {
|
|
||||||
friend class SharedMutex;
|
|
||||||
std::condition_variable &c;
|
|
||||||
int &r;
|
|
||||||
std::unique_lock<std::mutex> lock;
|
|
||||||
|
|
||||||
SharedLock(std::mutex &m, std::condition_variable &c, int &r, bool &w) : c(c), r(r), lock(m) {
|
|
||||||
while(w)
|
|
||||||
c.wait(lock);
|
|
||||||
++r;
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
~SharedLock() {
|
|
||||||
lock.lock();
|
|
||||||
--r;
|
|
||||||
if(r == 0)
|
|
||||||
c.notify_all();
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
class UniqueLock {
|
|
||||||
friend class SharedMutex;
|
|
||||||
std::condition_variable &c;
|
|
||||||
bool &w;
|
|
||||||
std::unique_lock<std::mutex> lock;
|
|
||||||
|
|
||||||
UniqueLock(std::mutex &m, std::condition_variable &c, int &r, bool &w) : c(c), w(w), lock(m) {
|
|
||||||
while(w || r > 0)
|
|
||||||
c.wait(lock);
|
|
||||||
w = true;
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
~UniqueLock() {
|
|
||||||
lock.lock();
|
|
||||||
w = false;
|
|
||||||
c.notify_all();
|
|
||||||
lock.unlock();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
public:
|
|
||||||
std::unique_ptr<SharedLock> shared_lock() {
|
|
||||||
return std::unique_ptr<SharedLock>(new SharedLock(m, c, r, w));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::unique_ptr<UniqueLock> unique_lock() {
|
|
||||||
return std::unique_ptr<UniqueLock>(new UniqueLock(m, c, r, w));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} // namespace SimpleWeb
|
} // namespace SimpleWeb
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
namespace SimpleWeb {
|
||||||
|
/// Makes it possible to for instance cancel Asio handlers without stopping asio::io_service
|
||||||
|
class ContinueScopes {
|
||||||
|
/// Scope count that is set to -1 if scopes are to be canceled
|
||||||
|
std::atomic<long> count;
|
||||||
|
|
||||||
|
public:
|
||||||
|
class SharedLock {
|
||||||
|
std::atomic<long> &count;
|
||||||
|
SharedLock &operator=(const SharedLock &) = delete;
|
||||||
|
SharedLock(const SharedLock &) = delete;
|
||||||
|
|
||||||
|
public:
|
||||||
|
SharedLock(std::atomic<long> &count) : count(count) {}
|
||||||
|
~SharedLock() {
|
||||||
|
count.fetch_sub(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ContinueScopes() : count(0) {}
|
||||||
|
|
||||||
|
/// Returns nullptr if scope is to be cancelled, or a shared lock otherwise
|
||||||
|
std::unique_ptr<SharedLock> shared_lock() {
|
||||||
|
long expected = count;
|
||||||
|
while(expected >= 0 && !count.compare_exchange_weak(expected, expected + 1))
|
||||||
|
spin_loop_pause();
|
||||||
|
|
||||||
|
if(expected < 0)
|
||||||
|
return nullptr;
|
||||||
|
else
|
||||||
|
return std::unique_ptr<SharedLock>(new SharedLock(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
//// Blocks until all shared locks are released, then prevents future shared locks
|
||||||
|
void stop() {
|
||||||
|
long expected = 0;
|
||||||
|
while(!count.compare_exchange_weak(expected, -1)) {
|
||||||
|
expected = 0;
|
||||||
|
spin_loop_pause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} // namespace SimpleWeb
|
||||||
|
|
||||||
#endif // SIMPLE_WEB_UTILITY_HPP
|
#endif // SIMPLE_WEB_UTILITY_HPP
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue