diff --git a/client_http.hpp b/client_http.hpp index a14c2c8..9700657 100644 --- a/client_http.hpp +++ b/client_http.hpp @@ -2,6 +2,7 @@ #define CLIENT_HTTP_HPP #include "utility.hpp" +#include #include #include #include @@ -33,7 +34,7 @@ namespace SimpleWeb { class Client; template - class ClientBase : public std::enable_shared_from_this> { + class ClientBase { ClientBase(const ClientBase &) = delete; ClientBase &operator=(const ClientBase &) = delete; @@ -121,11 +122,13 @@ namespace SimpleWeb { Connection(std::unique_ptr &&socket) : socket(std::move(socket)) {} std::unique_ptr socket; + std::mutex socket_close_mutex; bool in_use = false; - bool reconnecting = false; + bool attempt_reconnect = true; void close() { error_code ec; + std::unique_lock lock(socket_close_mutex); // the following operations seems to be needed to run sequentially socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); socket->lowest_layer().close(ec); } @@ -133,9 +136,12 @@ namespace SimpleWeb { class Session { public: - Session(const std::shared_ptr> &client, const std::shared_ptr &connection, std::unique_ptr &&request_buffer) - : client(client), connection(connection), request_buffer(std::move(request_buffer)), response(new Response()) {} - std::shared_ptr> client; + Session(ClientBase *client, const std::shared_ptr &connection, std::unique_ptr &&request_buffer) + : client(client), cancel_callbacks(client->cancel_callbacks), cancel_callbacks_mutex(client->cancel_callbacks_mutex), + connection(connection), request_buffer(std::move(request_buffer)), response(new Response()) {} + ClientBase *client; + std::shared_ptr cancel_callbacks; + std::shared_ptr cancel_callbacks_mutex; std::shared_ptr connection; std::unique_ptr request_buffer; std::shared_ptr response; @@ -174,8 +180,6 @@ namespace SimpleWeb { /// When using asynchronous requests, running the io_service is up to the programmer. std::shared_ptr io_service; - virtual ~ClientBase() {} - /// Convenience function to perform synchronous request. The io_service is run within this function. /// If reusing the io_service for other tasks, please use the asynchronous request functions instead. std::shared_ptr request(const std::string &method, const std::string &path = std::string("/"), @@ -213,14 +217,14 @@ namespace SimpleWeb { /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header, std::function, const error_code &)> &&request_callback_) { - auto session = std::make_shared(this->shared_from_this(), get_connection(), create_request_header(method, path, header)); + auto session = std::make_shared(this, get_connection(), create_request_header(method, path, header)); auto client = session->client; auto connection = session->connection; auto response = session->response; auto request_callback = std::make_shared, const error_code &)>>(std::move(request_callback_)); session->callback = [client, connection, response, request_callback](const error_code &ec) { { - std::lock_guard lock(client->connections_mutex); + std::unique_lock lock(client->connections_mutex); connection->in_use = false; // Remove unused connections, but keep one open for HTTP persistent connection: @@ -271,14 +275,14 @@ namespace SimpleWeb { /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header, std::function, const error_code &)> &&request_callback_) { - auto session = std::make_shared(this->shared_from_this(), get_connection(), create_request_header(method, path, header)); + auto session = std::make_shared(this, get_connection(), create_request_header(method, path, header)); auto client = session->client; auto connection = session->connection; auto response = session->response; auto request_callback = std::make_shared, const error_code &)>>(std::move(request_callback_)); session->callback = [client, connection, response, request_callback](const error_code &ec) { { - std::lock_guard lock(client->connections_mutex); + std::unique_lock lock(client->connections_mutex); connection->in_use = false; // Remove unused connections, but keep one open for HTTP persistent connection: @@ -321,13 +325,22 @@ namespace SimpleWeb { /// Close connections void close() { - std::lock_guard lock(connections_mutex); + std::unique_lock lock(connections_mutex); for(auto it = connections.begin(); it != connections.end();) { + (*it)->attempt_reconnect = false; (*it)->close(); it = connections.erase(it); } } + virtual ~ClientBase() { + { + auto lock = cancel_callbacks_mutex->unique_lock(); + *cancel_callbacks = true; + } + close(); + } + protected: std::string host; unsigned short port; @@ -337,7 +350,10 @@ namespace SimpleWeb { std::vector> connections; std::mutex connections_mutex; - ClientBase(const std::string &host_port, unsigned short default_port) : io_service(new asio::io_service()) { + std::shared_ptr cancel_callbacks; + std::shared_ptr cancel_callbacks_mutex; + + ClientBase(const std::string &host_port, unsigned short default_port) : io_service(new asio::io_service()), cancel_callbacks(new bool(false)), cancel_callbacks_mutex(new SharedMutex()) { auto parsed_host_port = parse_host_port(host_port, default_port); host = parsed_host_port.first; port = parsed_host_port.second; @@ -345,7 +361,7 @@ namespace SimpleWeb { std::shared_ptr get_connection() { std::shared_ptr connection; - std::lock_guard lock(connections_mutex); + std::unique_lock lock(connections_mutex); for(auto it = connections.begin(); it != connections.end(); ++it) { if(!(*it)->in_use && !connection) { connection = *it; @@ -356,7 +372,7 @@ namespace SimpleWeb { connection = create_connection(); connections.emplace_back(connection); } - connection->reconnecting = false; + connection->attempt_reconnect = true; connection->in_use = true; if(!query) { @@ -408,6 +424,9 @@ namespace SimpleWeb { session->set_timeout(); asio::async_write(*session->connection->socket, session->request_buffer->data(), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) this->read(session); else { @@ -421,8 +440,11 @@ namespace SimpleWeb { session->set_timeout(); asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { - session->connection->reconnecting = false; + session->connection->attempt_reconnect = true; size_t num_additional_bytes = session->response->content_buffer.size() - bytes_transferred; @@ -435,6 +457,9 @@ namespace SimpleWeb { session->set_timeout(); asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) session->callback(ec); else { @@ -454,6 +479,9 @@ namespace SimpleWeb { session->set_timeout(); asio::async_read(*session->connection->socket, session->response->content_buffer, [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) session->callback(ec); else { @@ -471,8 +499,8 @@ namespace SimpleWeb { session->callback(ec); } else { - if(!session->connection->reconnecting) { - session->connection->reconnecting = true; + if(session->connection->attempt_reconnect) { + session->connection->attempt_reconnect = false; session->connection->close(); this->connect(session); } @@ -488,6 +516,9 @@ namespace SimpleWeb { session->set_timeout(); asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [this, session, tmp_streambuf](const error_code &ec, size_t bytes_transferred) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { std::string line; getline(session->response->content, line); @@ -523,6 +554,9 @@ namespace SimpleWeb { session->set_timeout(); asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2 + length - num_additional_bytes), [this, session, post_process](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) post_process(); else { @@ -554,13 +588,9 @@ namespace SimpleWeb { Client &operator=(const Client &) = delete; public: - static std::shared_ptr create(const std::string &server_port_path) { - return std::shared_ptr(new Client(server_port_path)); - } - - protected: Client(const std::string &server_port_path) : ClientBase::ClientBase(server_port_path, 80) {} + protected: std::shared_ptr create_connection() override { return std::make_shared(std::unique_ptr(new HTTP(*io_service))); } @@ -571,13 +601,20 @@ namespace SimpleWeb { session->set_timeout(config.timeout_connect); resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { session->set_timeout(config.timeout_connect); asio::async_connect(*session->connection->socket, it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { asio::ip::tcp::no_delay option(true); - session->connection->socket->set_option(option); + error_code ec; + session->connection->socket->set_option(option, ec); this->write(session); } else { diff --git a/client_https.hpp b/client_https.hpp index 950eeee..28af427 100644 --- a/client_https.hpp +++ b/client_https.hpp @@ -19,14 +19,8 @@ namespace SimpleWeb { Client &operator=(const Client &) = delete; public: - static std::shared_ptr create(const std::string &server_port_path, bool verify_certificate = true, const std::string &cert_file = std::string(), - const std::string &private_key_file = std::string(), const std::string &verify_file = std::string()) { - return std::shared_ptr(new Client(server_port_path, verify_certificate, cert_file, private_key_file, verify_file)); - } - - protected: - Client(const std::string &server_port_path, bool verify_certificate, const std::string &cert_file, - const std::string &private_key_file, const std::string &verify_file) + Client(const std::string &server_port_path, bool verify_certificate = true, const std::string &cert_file = std::string(), + const std::string &private_key_file = std::string(), const std::string &verify_file = std::string()) : ClientBase::ClientBase(server_port_path, 443), context(asio::ssl::context::tlsv12) { if(cert_file.size() > 0 && private_key_file.size() > 0) { context.use_certificate_chain_file(cert_file); @@ -47,6 +41,7 @@ namespace SimpleWeb { context.set_verify_mode(asio::ssl::verify_none); } + protected: asio::ssl::context context; std::shared_ptr create_connection() override { @@ -57,13 +52,20 @@ namespace SimpleWeb { if(!session->connection->socket->lowest_layer().is_open()) { auto resolver = std::make_shared(*io_service); resolver->async_resolve(*query, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it) { + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { session->set_timeout(this->config.timeout_connect); asio::async_connect(session->connection->socket->lowest_layer(), it, [this, session, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { asio::ip::tcp::no_delay option(true); - session->connection->socket->lowest_layer().set_option(option); + error_code ec; + session->connection->socket->lowest_layer().set_option(option, ec); if(!this->config.proxy_server.empty()) { auto write_buffer = std::make_shared(); @@ -74,11 +76,17 @@ namespace SimpleWeb { session->set_timeout(this->config.timeout_connect); asio::async_write(session->connection->socket->next_layer(), *write_buffer, [this, session, write_buffer](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { std::shared_ptr response(new Response()); session->set_timeout(this->config.timeout_connect); asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, session, response](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { response->parse_header(); if(response->status_code.empty() || response->status_code.compare(0, 3, "200") != 0) { @@ -123,6 +131,9 @@ namespace SimpleWeb { session->set_timeout(this->config.timeout_connect); session->connection->socket->async_handshake(asio::ssl::stream_base::client, [this, session](const error_code &ec) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) this->write(session); else { diff --git a/http_examples.cpp b/http_examples.cpp index 833f25e..77ed8cb 100644 --- a/http_examples.cpp +++ b/http_examples.cpp @@ -26,12 +26,12 @@ int main() { //HTTP-server at port 8080 using 1 thread //Unless you do more heavy non-threaded processing in the resources, //1 thread is usually faster than several threads - auto server = HttpServer::create(); - server->config.port = 8080; + HttpServer server; + server.config.port = 8080; //Add resources using path-regex and method-string, and an anonymous function //POST-example for the path /string, responds the posted string - server->resource["^/string$"]["POST"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/string$"]["POST"] = [](shared_ptr response, shared_ptr request) { //Retrieve string: auto content = request->content.string(); //request->content.string() is a convenience function for: @@ -55,7 +55,7 @@ int main() { // "lastName": "Smith", // "age": 25 //} - server->resource["^/json$"]["POST"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/json$"]["POST"] = [](shared_ptr response, shared_ptr request) { try { ptree pt; read_json(request->content, pt); @@ -87,7 +87,7 @@ int main() { //GET-example for the path /info //Responds with request-information - server->resource["^/info$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/info$"]["GET"] = [](shared_ptr response, shared_ptr request) { stringstream stream; stream << "

Request from " << request->remote_endpoint_address << " (" << request->remote_endpoint_port << ")

"; stream << request->method << " " << request->path << " HTTP/" << request->http_version << "
"; @@ -112,7 +112,7 @@ int main() { //GET-example for the path /match/[number], responds with the matched string in path (number) //For instance a request GET /match/123 will receive: 123 - server->resource["^/match/([0-9]+)$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/match/([0-9]+)$"]["GET"] = [](shared_ptr response, shared_ptr request) { string number = request->path_match[1]; *response << "HTTP/1.1 200 OK\r\nContent-Length: " << number.length() << "\r\n\r\n" << number; @@ -123,7 +123,7 @@ int main() { }; //Get example simulating heavy work in a separate thread - server->resource["^/work$"]["GET"] = [](shared_ptr response, shared_ptr /*request*/) { + server.resource["^/work$"]["GET"] = [](shared_ptr response, shared_ptr /*request*/) { thread work_thread([response] { this_thread::sleep_for(chrono::seconds(5)); response->write("Work done"); @@ -135,7 +135,7 @@ int main() { //Will respond with content in the web/-directory, and its subdirectories. //Default file: index.html //Can for instance be used to retrieve an HTML 5 client that uses REST-resources on this server - server->default_resource["GET"] = [](shared_ptr response, shared_ptr request) { + server.default_resource["GET"] = [](shared_ptr response, shared_ptr request) { try { auto web_root_path = boost::filesystem::canonical("web"); auto path = boost::filesystem::canonical(web_root_path / request->path); @@ -210,36 +210,36 @@ int main() { } }; - server->on_error = [](shared_ptr /*request*/, const SimpleWeb::error_code & /*ec*/) { + server.on_error = [](shared_ptr /*request*/, const SimpleWeb::error_code & /*ec*/) { // handle errors here }; thread server_thread([&server]() { //Start server - server->start(); + server.start(); }); //Wait for server to start so that the client can connect this_thread::sleep_for(chrono::seconds(1)); //Client examples - auto client = HttpClient::create("localhost:8080"); + HttpClient client("localhost:8080"); // synchronous request examples - auto r1 = client->request("GET", "/match/123"); + auto r1 = client.request("GET", "/match/123"); cout << r1->content.rdbuf() << endl; // Alternatively, use the convenience function r1->content.string() string json_string = "{\"firstName\": \"John\",\"lastName\": \"Smith\",\"age\": 25}"; - auto r2 = client->request("POST", "/string", json_string); + auto r2 = client.request("POST", "/string", json_string); cout << r2->content.rdbuf() << endl; // asynchronous request example - client->request("POST", "/json", json_string, [](shared_ptr response, const SimpleWeb::error_code &ec) { + client.request("POST", "/json", json_string, [](shared_ptr response, const SimpleWeb::error_code &ec) { if(!ec) cout << response->content.rdbuf() << endl; }); - client->io_service->reset(); // needed because the io_service has been run already in the synchronous examples - client->io_service->run(); + client.io_service->reset(); // needed because the io_service has been run already in the synchronous examples + client.io_service->run(); server_thread.join(); } diff --git a/https_examples.cpp b/https_examples.cpp index b1237c0..d2a6384 100644 --- a/https_examples.cpp +++ b/https_examples.cpp @@ -24,12 +24,12 @@ int main() { //HTTPS-server at port 8080 using 1 thread //Unless you do more heavy non-threaded processing in the resources, //1 thread is usually faster than several threads - auto server = HttpsServer::create("server.crt", "server.key"); - server->config.port = 8080; + HttpsServer server("server.crt", "server.key"); + server.config.port = 8080; //Add resources using path-regex and method-string, and an anonymous function //POST-example for the path /string, responds the posted string - server->resource["^/string$"]["POST"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/string$"]["POST"] = [](shared_ptr response, shared_ptr request) { //Retrieve string: auto content = request->content.string(); //request->content.string() is a convenience function for: @@ -53,7 +53,7 @@ int main() { // "lastName": "Smith", // "age": 25 //} - server->resource["^/json$"]["POST"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/json$"]["POST"] = [](shared_ptr response, shared_ptr request) { try { ptree pt; read_json(request->content, pt); @@ -85,7 +85,7 @@ int main() { //GET-example for the path /info //Responds with request-information - server->resource["^/info$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/info$"]["GET"] = [](shared_ptr response, shared_ptr request) { stringstream stream; stream << "

Request from " << request->remote_endpoint_address << " (" << request->remote_endpoint_port << ")

"; stream << request->method << " " << request->path << " HTTP/" << request->http_version << "
"; @@ -110,7 +110,7 @@ int main() { //GET-example for the path /match/[number], responds with the matched string in path (number) //For instance a request GET /match/123 will receive: 123 - server->resource["^/match/([0-9]+)$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/match/([0-9]+)$"]["GET"] = [](shared_ptr response, shared_ptr request) { string number = request->path_match[1]; *response << "HTTP/1.1 200 OK\r\nContent-Length: " << number.length() << "\r\n\r\n" << number; @@ -121,7 +121,7 @@ int main() { }; //Get example simulating heavy work in a separate thread - server->resource["^/work$"]["GET"] = [](shared_ptr response, shared_ptr /*request*/) { + server.resource["^/work$"]["GET"] = [](shared_ptr response, shared_ptr /*request*/) { thread work_thread([response] { this_thread::sleep_for(chrono::seconds(5)); response->write("Work done"); @@ -133,7 +133,7 @@ int main() { //Will respond with content in the web/-directory, and its subdirectories. //Default file: index.html //Can for instance be used to retrieve an HTML 5 client that uses REST-resources on this server - server->default_resource["GET"] = [](shared_ptr response, shared_ptr request) { + server.default_resource["GET"] = [](shared_ptr response, shared_ptr request) { try { auto web_root_path = boost::filesystem::canonical("web"); auto path = boost::filesystem::canonical(web_root_path / request->path); @@ -208,13 +208,13 @@ int main() { } }; - server->on_error = [](shared_ptr /*request*/, const SimpleWeb::error_code & /*ec*/) { + server.on_error = [](shared_ptr /*request*/, const SimpleWeb::error_code & /*ec*/) { // handle errors here }; thread server_thread([&server]() { //Start server - server->start(); + server.start(); }); //Wait for server to start so that the client can connect @@ -222,23 +222,23 @@ int main() { //Client examples //Second create() parameter set to false: no certificate verification - auto client = HttpsClient::create("localhost:8080", false); + HttpsClient client("localhost:8080", false); // synchronous request examples - auto r1 = client->request("GET", "/match/123"); + auto r1 = client.request("GET", "/match/123"); cout << r1->content.rdbuf() << endl; // Alternatively, use the convenience function r1->content.string() string json_string = "{\"firstName\": \"John\",\"lastName\": \"Smith\",\"age\": 25}"; - auto r2 = client->request("POST", "/string", json_string); + auto r2 = client.request("POST", "/string", json_string); cout << r2->content.rdbuf() << endl; // asynchronous request example - client->request("POST", "/json", json_string, [](shared_ptr response, const SimpleWeb::error_code &ec) { + client.request("POST", "/json", json_string, [](shared_ptr response, const SimpleWeb::error_code &ec) { if(!ec) cout << response->content.rdbuf() << endl; }); - client->io_service->reset(); // needed because the io_service has been run already in the synchronous examples - client->io_service->run(); + client.io_service->reset(); // needed because the io_service has been run already in the synchronous examples + client.io_service->run(); server_thread.join(); } diff --git a/server_http.hpp b/server_http.hpp index c17a178..5d52336 100644 --- a/server_http.hpp +++ b/server_http.hpp @@ -2,6 +2,7 @@ #define SERVER_HTTP_HPP #include "utility.hpp" +#include #include #include #include @@ -43,7 +44,7 @@ namespace SimpleWeb { class Server; template - class ServerBase : public std::enable_shared_from_this> { + class ServerBase { ServerBase(const ServerBase &) = delete; ServerBase &operator=(const ServerBase &) = delete; @@ -51,8 +52,6 @@ namespace SimpleWeb { class Session; public: - virtual ~ServerBase() {} - class Response : public std::enable_shared_from_this, public std::ostream { friend class ServerBase; friend class Server; @@ -88,9 +87,16 @@ namespace SimpleWeb { /// Use this function if you need to recursively send parts of a longer message void send(const std::function &callback = nullptr) { + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; auto self = this->shared_from_this(); session->set_timeout(session->server->config.timeout_content); asio::async_write(*session->socket, streambuf, [self, callback](const error_code &ec, size_t /*bytes_transferred*/) { + self->session->cancel_timeout(); + auto lock = self->session->cancel_callbacks_mutex->shared_lock(); + if(*self->session->cancel_callbacks) + return; if(callback) callback(ec); }); @@ -261,11 +267,15 @@ namespace SimpleWeb { protected: class Session { public: - Session(const std::shared_ptr> &server, const std::shared_ptr &socket) - : server(server), socket(socket), request(new Request(*this->socket)) {} + Session(ServerBase *server, const std::shared_ptr &socket) + : server(server), cancel_callbacks(server->cancel_callbacks), cancel_callbacks_mutex(server->cancel_callbacks_mutex), + socket(socket), socket_close_mutex(new std::mutex()), request(new Request(*this->socket)) {} - std::shared_ptr> server; + ServerBase *server; + std::shared_ptr cancel_callbacks; + std::shared_ptr cancel_callbacks_mutex; std::shared_ptr socket; + std::shared_ptr socket_close_mutex; std::shared_ptr request; std::unique_ptr timer; @@ -279,11 +289,13 @@ namespace SimpleWeb { timer = std::unique_ptr(new asio::deadline_timer(socket->get_io_service())); timer->expires_from_now(boost::posix_time::seconds(seconds)); auto socket = this->socket; - timer->async_wait([socket](const error_code &ec) { + auto socket_close_mutex = this->socket_close_mutex; + timer->async_wait([socket, socket_close_mutex](const error_code &ec) { if(!ec) { error_code ec; + std::unique_lock lock(*socket_close_mutex); // the following operations seems to be needed to run sequentially socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); - socket->lowest_layer().close(); + socket->lowest_layer().close(ec); } }); } @@ -341,15 +353,15 @@ namespace SimpleWeb { std::function, std::shared_ptr::Request>)> on_upgrade; + /// If you have your own asio::io_service, store its pointer here before running start(). + std::shared_ptr io_service; + virtual void start() { if(!io_service) { io_service = std::make_shared(); internal_io_service = true; } - if(io_service->stopped()) - io_service->reset(); - asio::ip::tcp::endpoint endpoint; if(config.address.size() > 0) endpoint = asio::ip::tcp::endpoint(asio::ip::address::from_string(config.address), config.port); @@ -384,14 +396,26 @@ namespace SimpleWeb { } } + /// Stop accepting new requests, and close current sessions. void stop() { - acceptor->close(); - if(internal_io_service) - io_service->stop(); + if(acceptor) { + acceptor->close(); + if(internal_io_service) { + io_service->stop(); + while(!io_service->stopped()) + std::this_thread::yield(); + io_service->reset(); + } + } } - /// If you have your own asio::io_service, store its pointer here before running start(). - std::shared_ptr io_service; + virtual ~ServerBase() { + { + auto lock = cancel_callbacks_mutex->unique_lock(); + *cancel_callbacks = true; + } + stop(); + } protected: bool internal_io_service = false; @@ -399,7 +423,10 @@ namespace SimpleWeb { std::unique_ptr acceptor; std::vector threads; - ServerBase(unsigned short port) : config(port) {} + std::shared_ptr cancel_callbacks; + std::shared_ptr cancel_callbacks_mutex; + + ServerBase(unsigned short port) : config(port), cancel_callbacks(new bool(false)), cancel_callbacks_mutex(new SharedMutex()) {} virtual void accept() = 0; @@ -407,6 +434,9 @@ namespace SimpleWeb { session->set_timeout(config.timeout_request); asio::async_read_until(*session->socket, session->request->streambuf, "\r\n\r\n", [this, session](const error_code &ec, size_t bytes_transferred) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) { //request->streambuf.size() is not necessarily the same as bytes_transferred, from Boost-docs: //"After a successful async_read_until operation, the streambuf may contain additional data beyond the delimiter" @@ -433,6 +463,9 @@ namespace SimpleWeb { session->set_timeout(config.timeout_content); asio::async_read(*session->socket, session->request->streambuf, asio::transfer_exactly(content_length - num_additional_bytes), [this, session](const error_code &ec, size_t /*bytes_transferred*/) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) this->find_resource(session); else if(this->on_error) @@ -482,7 +515,6 @@ namespace SimpleWeb { auto response = std::shared_ptr(new Response(session), [this](Response *response_ptr) { auto response = std::shared_ptr(response_ptr); response->send([this, response](const error_code &ec) { - response->session->cancel_timeout(); if(!ec) { if(response->close_connection_after_response) return; @@ -530,24 +562,25 @@ namespace SimpleWeb { Server &operator=(const Server &) = delete; public: - static std::shared_ptr create() { - return std::shared_ptr(new Server()); - } - - protected: Server() : ServerBase::ServerBase(80) {} + protected: void accept() override { - auto session = std::make_shared(this->shared_from_this(), std::make_shared(*io_service)); + auto session = std::make_shared(this, std::make_shared(*io_service)); acceptor->async_accept(*session->socket, [this, session](const error_code &ec) { + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; + //Immediately start accepting a new connection (unless io_service has been stopped) if(ec != asio::error::operation_aborted) this->accept(); if(!ec) { asio::ip::tcp::no_delay option(true); - session->socket->set_option(option); + error_code ec; + session->socket->set_option(option, ec); this->read_request_and_content(session); } diff --git a/server_https.hpp b/server_https.hpp index 3ab1796..44a209c 100644 --- a/server_https.hpp +++ b/server_https.hpp @@ -24,8 +24,16 @@ namespace SimpleWeb { bool set_session_id_context = false; public: - static std::shared_ptr create(const std::string &cert_file, const std::string &private_key_file, const std::string &verify_file = std::string()) { - return std::shared_ptr(new Server(cert_file, private_key_file, verify_file)); + Server(const std::string &cert_file, const std::string &private_key_file, const std::string &verify_file = std::string()) + : ServerBase::ServerBase(443), context(asio::ssl::context::tlsv12) { + context.use_certificate_chain_file(cert_file); + context.use_private_key_file(private_key_file, asio::ssl::context::pem); + + if(verify_file.size() > 0) { + context.load_verify_file(verify_file); + context.set_verify_mode(asio::ssl::verify_peer | asio::ssl::verify_fail_if_no_peer_cert | asio::ssl::verify_client_once); + set_session_id_context = true; + } } void start() override { @@ -40,34 +48,30 @@ namespace SimpleWeb { } protected: - Server(const std::string &cert_file, const std::string &private_key_file, const std::string &verify_file) - : ServerBase::ServerBase(443), context(asio::ssl::context::tlsv12) { - context.use_certificate_chain_file(cert_file); - context.use_private_key_file(private_key_file, asio::ssl::context::pem); - - if(verify_file.size() > 0) { - context.load_verify_file(verify_file); - context.set_verify_mode(asio::ssl::verify_peer | asio::ssl::verify_fail_if_no_peer_cert | asio::ssl::verify_client_once); - set_session_id_context = true; - } - } - asio::ssl::context context; void accept() override { - auto session = std::make_shared(this->shared_from_this(), std::make_shared(*io_service, context)); + auto session = std::make_shared(this, std::make_shared(*io_service, context)); acceptor->async_accept(session->socket->lowest_layer(), [this, session](const error_code &ec) { + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; + if(ec != asio::error::operation_aborted) this->accept(); if(!ec) { asio::ip::tcp::no_delay option(true); - session->socket->lowest_layer().set_option(option); + error_code ec; + session->socket->lowest_layer().set_option(option, ec); session->set_timeout(config.timeout_request); session->socket->async_handshake(asio::ssl::stream_base::server, [this, session](const error_code &ec) { session->cancel_timeout(); + auto lock = session->cancel_callbacks_mutex->shared_lock(); + if(*session->cancel_callbacks) + return; if(!ec) this->read_request_and_content(session); else if(this->on_error) diff --git a/tests/io_test.cpp b/tests/io_test.cpp index 9d47c5f..73315ab 100644 --- a/tests/io_test.cpp +++ b/tests/io_test.cpp @@ -5,35 +5,69 @@ using namespace std; +#ifndef USE_STANDALONE_ASIO +namespace asio = boost::asio; +#endif + typedef SimpleWeb::Server HttpServer; typedef SimpleWeb::Client HttpClient; int main() { - auto server = HttpServer::create(); - server->config.port = 8080; + { + SimpleWeb::SharedMutex mutex; + int count = 0; + { + thread t([&] { + auto lock = mutex.shared_lock(); + { + auto lock = mutex.shared_lock(); + ++count; + } + }); + this_thread::sleep_for(chrono::milliseconds(100)); + t.detach(); + assert(count == 1); + } + thread t; + { + auto lock = mutex.unique_lock(); + t = thread([&] { + auto lock = mutex.unique_lock(); + ++count; + }); + this_thread::sleep_for(chrono::milliseconds(100)); + assert(count == 1); + } + t.join(); + assert(count == 2); + } - server->resource["^/string$"]["POST"] = [](shared_ptr response, shared_ptr request) { + + HttpServer server; + server.config.port = 8080; + + server.resource["^/string$"]["POST"] = [](shared_ptr response, shared_ptr request) { auto content = request->content.string(); *response << "HTTP/1.1 200 OK\r\nContent-Length: " << content.length() << "\r\n\r\n" << content; }; - server->resource["^/string2$"]["POST"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/string2$"]["POST"] = [](shared_ptr response, shared_ptr request) { response->write(request->content.string()); }; - server->resource["^/string3$"]["POST"] = [](shared_ptr response, shared_ptr request) { - std::stringstream stream; + server.resource["^/string3$"]["POST"] = [](shared_ptr response, shared_ptr request) { + stringstream stream; stream << request->content.rdbuf(); response->write(stream); }; - server->resource["^/string4$"]["POST"] = [](shared_ptr response, shared_ptr /*request*/) { + server.resource["^/string4$"]["POST"] = [](shared_ptr response, shared_ptr /*request*/) { response->write(SimpleWeb::StatusCode::client_error_forbidden, {{"Test1", "test2"}, {"tesT3", "test4"}}); }; - server->resource["^/info$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/info$"]["GET"] = [](shared_ptr response, shared_ptr request) { stringstream content_stream; content_stream << request->method << " " << request->path << " " << request->http_version << " "; content_stream << request->header.find("test parameter")->second; @@ -44,20 +78,28 @@ int main() { << content_stream.rdbuf(); }; - server->resource["^/match/([0-9]+)$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/work$"]["GET"] = [](shared_ptr response, shared_ptr /*request*/) { + thread work_thread([response] { + this_thread::sleep_for(chrono::seconds(5)); + response->write("Work done"); + }); + work_thread.detach(); + }; + + server.resource["^/match/([0-9]+)$"]["GET"] = [](shared_ptr response, shared_ptr request) { string number = request->path_match[1]; *response << "HTTP/1.1 200 OK\r\nContent-Length: " << number.length() << "\r\n\r\n" << number; }; - server->resource["^/header$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/header$"]["GET"] = [](shared_ptr response, shared_ptr request) { auto content = request->header.find("test1")->second + request->header.find("test2")->second; *response << "HTTP/1.1 200 OK\r\nContent-Length: " << content.length() << "\r\n\r\n" << content; }; - server->resource["^/query_string$"]["GET"] = [](shared_ptr response, shared_ptr request) { + server.resource["^/query_string$"]["GET"] = [](shared_ptr response, shared_ptr request) { assert(request->path == "/query_string"); assert(request->query_string == "testing"); auto queries = request->parse_query_string(); @@ -68,16 +110,16 @@ int main() { thread server_thread([&server]() { //Start server - server->start(); + server.start(); }); this_thread::sleep_for(chrono::seconds(1)); { - auto client = HttpClient::create("localhost:8080"); + HttpClient client("localhost:8080"); { stringstream output; - auto r = client->request("POST", "/string", "A string"); + auto r = client.request("POST", "/string", "A string"); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::success_ok); output << r->content.rdbuf(); assert(output.str() == "A string"); @@ -85,14 +127,14 @@ int main() { { stringstream output; - auto r = client->request("POST", "/string", "A string"); + auto r = client.request("POST", "/string", "A string"); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::success_ok); assert(r->content.string() == "A string"); } { stringstream output; - auto r = client->request("POST", "/string2", "A string"); + auto r = client.request("POST", "/string2", "A string"); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::success_ok); output << r->content.rdbuf(); assert(output.str() == "A string"); @@ -100,7 +142,7 @@ int main() { { stringstream output; - auto r = client->request("POST", "/string3", "A string"); + auto r = client.request("POST", "/string3", "A string"); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::success_ok); output << r->content.rdbuf(); assert(output.str() == "A string"); @@ -108,7 +150,7 @@ int main() { { stringstream output; - auto r = client->request("POST", "/string4", "A string"); + auto r = client.request("POST", "/string4", "A string"); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::client_error_forbidden); assert(r->header.size() == 3); assert(r->header.find("test1")->second == "test2"); @@ -121,78 +163,78 @@ int main() { { stringstream output; stringstream content("A string"); - auto r = client->request("POST", "/string", content); + auto r = client.request("POST", "/string", content); output << r->content.rdbuf(); assert(output.str() == "A string"); } { stringstream output; - auto r = client->request("GET", "/info", "", {{"Test Parameter", "test value"}}); + auto r = client.request("GET", "/info", "", {{"Test Parameter", "test value"}}); output << r->content.rdbuf(); assert(output.str() == "GET /info 1.1 test value"); } { stringstream output; - auto r = client->request("GET", "/match/123"); + auto r = client.request("GET", "/match/123"); output << r->content.rdbuf(); assert(output.str() == "123"); } } { - auto client = HttpClient::create("localhost:8080"); + HttpClient client("localhost:8080"); HttpClient::Connection *connection; { // test performing the stream version of the request methods first stringstream output; stringstream content("A string"); - auto r = client->request("POST", "/string", content); + auto r = client.request("POST", "/string", content); output << r->content.rdbuf(); assert(output.str() == "A string"); - assert(client->connections.size() == 1); - connection = client->connections.front().get(); + assert(client.connections.size() == 1); + connection = client.connections.front().get(); } { stringstream output; - auto r = client->request("POST", "/string", "A string"); + auto r = client.request("POST", "/string", "A string"); output << r->content.rdbuf(); assert(output.str() == "A string"); - assert(client->connections.size() == 1); - assert(connection == client->connections.front().get()); + assert(client.connections.size() == 1); + assert(connection == client.connections.front().get()); } { stringstream output; - auto r = client->request("GET", "/header", "", {{"test1", "test"}, {"test2", "ing"}}); + auto r = client.request("GET", "/header", "", {{"test1", "test"}, {"test2", "ing"}}); output << r->content.rdbuf(); assert(output.str() == "testing"); - assert(client->connections.size() == 1); - assert(connection == client->connections.front().get()); + assert(client.connections.size() == 1); + assert(connection == client.connections.front().get()); } { stringstream output; - auto r = client->request("GET", "/query_string?testing"); + auto r = client.request("GET", "/query_string?testing"); assert(r->content.string() == "testing"); - assert(client->connections.size() == 1); - assert(connection == client->connections.front().get()); + assert(client.connections.size() == 1); + assert(connection == client.connections.front().get()); } } { - auto client = HttpClient::create("localhost:8080"); + HttpClient client("localhost:8080"); bool call = false; - client->request("GET", "/match/123", [&call](shared_ptr response, const SimpleWeb::error_code &ec) { + client.request("GET", "/match/123", [&call](shared_ptr response, const SimpleWeb::error_code &ec) { assert(!ec); stringstream output; output << response->content.rdbuf(); assert(output.str() == "123"); call = true; }); - client->io_service->run(); + client.io_service->run(); assert(call); { @@ -201,7 +243,7 @@ int main() { for(size_t c = 0; c < 100; ++c) { calls[c] = 0; threads.emplace_back([c, &client, &calls] { - client->request("GET", "/match/123", [c, &calls](shared_ptr response, const SimpleWeb::error_code &ec) { + client.request("GET", "/match/123", [c, &calls](shared_ptr response, const SimpleWeb::error_code &ec) { assert(!ec); stringstream output; output << response->content.rdbuf(); @@ -212,53 +254,136 @@ int main() { } for(auto &thread : threads) thread.join(); - assert(client->connections.size() == 100); - client->io_service->reset(); - client->io_service->run(); - assert(client->connections.size() == 1); + assert(client.connections.size() == 100); + client.io_service->reset(); + client.io_service->run(); + assert(client.connections.size() == 1); for(auto call : calls) assert(call); } } { - auto client = HttpClient::create("localhost:8080"); - assert(client->connections.size() == 0); + HttpClient client("localhost:8080"); + assert(client.connections.size() == 0); for(size_t c = 0; c < 5000; ++c) { - auto r1 = client->request("POST", "/string", "A string"); + auto r1 = client.request("POST", "/string", "A string"); assert(SimpleWeb::status_code(r1->status_code) == SimpleWeb::StatusCode::success_ok); assert(r1->content.string() == "A string"); - assert(client->connections.size() == 1); + assert(client.connections.size() == 1); stringstream content("A string"); - auto r2 = client->request("POST", "/string", content); + auto r2 = client.request("POST", "/string", content); assert(SimpleWeb::status_code(r2->status_code) == SimpleWeb::StatusCode::success_ok); assert(r2->content.string() == "A string"); - assert(client->connections.size() == 1); + assert(client.connections.size() == 1); } } - for(size_t c = 0; c < 500; ++c) { + for(size_t c = 0; c < 100; ++c) { { - auto client = HttpClient::create("localhost:8080"); - auto r = client->request("POST", "/string", "A string"); + HttpClient client("localhost:8080"); + auto r = client.request("POST", "/string", "A string"); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::success_ok); assert(r->content.string() == "A string"); - assert(client->connections.size() == 1); + assert(client.connections.size() == 1); } { - auto client = HttpClient::create("localhost:8080"); + HttpClient client("localhost:8080"); stringstream content("A string"); - auto r = client->request("POST", "/string", content); + auto r = client.request("POST", "/string", content); assert(SimpleWeb::status_code(r->status_code) == SimpleWeb::StatusCode::success_ok); assert(r->content.string() == "A string"); - assert(client->connections.size() == 1); + assert(client.connections.size() == 1); } } - server->stop(); + // Test Client client's stop() + for(size_t c = 0; c < 40; ++c) { + auto io_service = make_shared(); + bool call = false; + HttpClient client("localhost:8080"); + client.io_service = io_service; + client.request("GET", "/work", [&call](shared_ptr /*response*/, const SimpleWeb::error_code &ec) { + call = true; + assert(ec); + }); + thread thread([io_service] { + io_service->run(); + }); + this_thread::sleep_for(chrono::milliseconds(100)); + client.close(); + this_thread::sleep_for(chrono::milliseconds(100)); + thread.join(); + assert(call); + } + + // Test Client destructor that should cancel the client's request + for(size_t c = 0; c < 40; ++c) { + auto io_service = make_shared(); + { + HttpClient client("localhost:8080"); + client.io_service = io_service; + client.request("GET", "/work", [](shared_ptr /*response*/, const SimpleWeb::error_code & /*ec*/) { + assert(false); + }); + thread thread([io_service] { + io_service->run(); + }); + thread.detach(); + this_thread::sleep_for(chrono::milliseconds(100)); + } + this_thread::sleep_for(chrono::milliseconds(100)); + } + + server.stop(); server_thread.join(); + // Test server destructor + { + auto io_service = make_shared(); + bool call = false; + bool client_catch = false; + { + HttpServer server; + server.config.port = 8081; + server.io_service = io_service; + server.resource["^/test$"]["GET"] = [&call](shared_ptr response, shared_ptr /*request*/) { + call = true; + thread sleep_thread([response] { + this_thread::sleep_for(chrono::seconds(5)); + response->write(SimpleWeb::StatusCode::success_ok, "test"); + response->send([](const SimpleWeb::error_code & /*ec*/) { + assert(false); + }); + }); + sleep_thread.detach(); + }; + server.start(); + thread server_thread([io_service] { + io_service->run(); + }); + server_thread.detach(); + this_thread::sleep_for(chrono::seconds(1)); + thread client_thread([&client_catch] { + HttpClient client("localhost:8081"); + try { + auto r = client.request("GET", "/test"); + assert(false); + } + catch(...) { + client_catch = true; + } + }); + client_thread.detach(); + this_thread::sleep_for(chrono::seconds(1)); + } + this_thread::sleep_for(chrono::seconds(5)); + assert(call); + assert(client_catch); + io_service->stop(); + } + return 0; } diff --git a/tests/parse_test.cpp b/tests/parse_test.cpp index 8bc495f..35cfb57 100644 --- a/tests/parse_test.cpp +++ b/tests/parse_test.cpp @@ -13,7 +13,7 @@ public: void accept() override {} void parse_request_test() { - auto session = std::make_shared(this->shared_from_this(), std::make_shared(*io_service)); + auto session = std::make_shared(this, std::make_shared(*io_service)); std::ostream stream(&session->request->content.streambuf); stream << "GET /test/ HTTP/1.1\r\n"; diff --git a/utility.hpp b/utility.hpp index f63d8e0..7a1531b 100644 --- a/utility.hpp +++ b/utility.hpp @@ -2,7 +2,9 @@ #define SIMPLE_WEB_SERVER_UTILITY_HPP #include "status_code.hpp" +#include #include +#include #include #include @@ -134,6 +136,131 @@ namespace SimpleWeb { return result; } }; +} + +//TODO: see if there is an MSYS2 definition in an MSYS2 environment +#ifdef PTHREAD_RWLOCK_INITIALIZER +namespace SimpleWeb { + /// Read-preferring R/W lock. + /// Uses pthread_rwlock. + class SharedMutex { + pthread_rwlock_t rwlock; + + public: + class SharedLock { + friend class SharedMutex; + pthread_rwlock_t &rwlock; + + SharedLock(pthread_rwlock_t &rwlock) : rwlock(rwlock) { + pthread_rwlock_rdlock(&rwlock); + } + + public: + ~SharedLock() { + pthread_rwlock_unlock(&rwlock); + } + }; + + class UniqueLock { + friend class SharedMutex; + pthread_rwlock_t &rwlock; + + UniqueLock(pthread_rwlock_t &rwlock) : rwlock(rwlock) { + pthread_rwlock_wrlock(&rwlock); + } + + public: + ~UniqueLock() { + pthread_rwlock_unlock(&rwlock); + } + }; + + public: + SharedMutex() { + + pthread_rwlock_init(&rwlock, nullptr); + } + + ~SharedMutex() { + pthread_rwlock_destroy(&rwlock); + } + + std::unique_ptr shared_lock() { + return std::unique_ptr(new SharedLock(rwlock)); + } + + std::unique_ptr unique_lock() { + return std::unique_ptr(new UniqueLock(rwlock)); + } + }; } // namespace SimpleWeb +#else +namespace SimpleWeb { + /// Read-preferring R/W lock. + /// Based on https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_a_condition_variable_and_a_mutex pseudocode. + /// TODO: Someone that uses Windows should implement Windows specific R/W locks here. + class SharedMutex { + std::mutex m; + std::condition_variable c; + int r = 0; + bool w = false; + + public: + class SharedLock { + friend class SharedMutex; + std::condition_variable &c; + int &r; + std::unique_lock lock; + + SharedLock(std::mutex &m, std::condition_variable &c, int &r, bool &w) : c(c), r(r), lock(m) { + while(w) + c.wait(lock); + ++r; + lock.unlock(); + } + + public: + ~SharedLock() { + lock.lock(); + --r; + if(r == 0) + c.notify_all(); + lock.unlock(); + } + }; + + class UniqueLock { + friend class SharedMutex; + std::condition_variable &c; + bool &w; + std::unique_lock lock; + + UniqueLock(std::mutex &m, std::condition_variable &c, int &r, bool &w) : c(c), w(w), lock(m) { + while(w || r > 0) + c.wait(lock); + w = true; + lock.unlock(); + } + + public: + ~UniqueLock() { + lock.lock(); + w = false; + c.notify_all(); + lock.unlock(); + } + }; + + public: + std::unique_ptr shared_lock() { + return std::unique_ptr(new SharedLock(m, c, r, w)); + } + + std::unique_ptr unique_lock() { + return std::unique_ptr(new UniqueLock(m, c, r, w)); + } + }; +} // namespace SimpleWeb +#endif #endif // SIMPLE_WEB_SERVER_UTILITY_HPP