From e1aebba3445bc42fe6b552b825ba9f4bda16d343 Mon Sep 17 00:00:00 2001 From: eidheim Date: Wed, 14 Jun 2017 15:16:24 +0200 Subject: [PATCH] Fixes #114: added support for async client requests. Also includes some cleanup, support for parallel requests, and client reconnect --- client_http.hpp | 488 ++++++++++++++++++++++++++++++------------- client_https.hpp | 142 ++++++------- tests/io_test.cpp | 50 +++++ tests/parse_test.cpp | 4 +- 4 files changed, 471 insertions(+), 213 deletions(-) diff --git a/client_http.hpp b/client_http.hpp index 0f285d3..fc2adf8 100644 --- a/client_http.hpp +++ b/client_http.hpp @@ -2,13 +2,16 @@ #define CLIENT_HTTP_HPP #include -#include +#include #include #include #include -#ifdef USE_STANDALONE_ASIO #include +#include +#include + +#ifdef USE_STANDALONE_ASIO #include #include #include @@ -20,7 +23,6 @@ namespace SimpleWeb { } #else #include -#include #include #include namespace SimpleWeb { @@ -62,14 +64,14 @@ namespace SimpleWeb { # endif namespace SimpleWeb { + using Header = std::unordered_multimap; + template class Client; template class ClientBase { public: - virtual ~ClientBase() {} - class Response { friend class ClientBase; friend class Client; @@ -78,7 +80,7 @@ namespace SimpleWeb { std::istream content; - std::unordered_multimap header; + Header header; private: asio::streambuf content_buffer; @@ -99,69 +101,268 @@ namespace SimpleWeb { std::string proxy_server; }; + protected: + class RequestCallback { + public: + bool stop=false; + std::mutex stop_mutex; + }; + + class Connection { + public: + Connection(const std::string &host, unsigned short port, const Config &config, std::unique_ptr &&socket) : + host(host), port(port), config(config), socket(std::move(socket)) { + if(config.proxy_server.empty()) + query=std::unique_ptr(new asio::ip::tcp::resolver::query(host, std::to_string(port))); + else { + auto proxy_host_port=parse_host_port(config.proxy_server, 8080); + query=std::unique_ptr(new asio::ip::tcp::resolver::query(proxy_host_port.first, std::to_string(proxy_host_port.second))); + } + } + std::string host; + unsigned short port; + Config config; + + std::unique_ptr socket; + bool in_use=false; + bool reconnecting=false; + + std::unique_ptr query; + }; + + class Session { + public: + Session(const std::shared_ptr &io_service, const std::shared_ptr &connection, std::unique_ptr &&request_buffer) : + io_service(io_service), connection(connection), request_buffer(std::move(request_buffer)), response(new Response()) {} + std::shared_ptr io_service; + std::shared_ptr connection; + std::unique_ptr request_buffer; + std::shared_ptr response; + std::function callback; + }; + + public: /// Set before calling request Config config; - std::shared_ptr request(const std::string& request_type, const std::string& path="/", boost::string_ref content="", - const std::map& header=std::map()) { - auto write_buffer=create_request_header(request_type, path, header); + /// If you have your own asio::io_service, store its pointer here before calling request(). + /// When using asynchronous requests, running the io_service is up to the programmer. + std::shared_ptr io_service; + + virtual ~ClientBase() { + boost::unique_lock lock(*stop_request_callback_mutex); + *stop_request_callback=true; + } + + /// Synchronous request. The io_service is run within this function. + std::shared_ptr request(const std::string& method, const std::string& path=std::string("/"), boost::string_ref content="", const Header& header=Header()) { + auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); + std::shared_ptr response; + session->callback=[this, &response, session](const error_code &ec) { + { + std::lock_guard lock(*connections_mutex); + session->connection->in_use=false; + } + response=session->response; + if(ec) + throw system_error(ec); + }; - std::ostream write_stream(write_buffer.get()); + std::ostream write_stream(session->request_buffer.get()); if(content.size()>0) write_stream << "Content-Length: " << content.size() << "\r\n"; write_stream << "\r\n" << content; - request_write(write_buffer); + Client::connect(session); io_service->reset(); io_service->run(); - return request_read(); + return response; } - std::shared_ptr request(const std::string& request_type, const std::string& path, std::iostream& content, - const std::map& header=std::map()) { - auto write_buffer=create_request_header(request_type, path, header); + /// Synchronous request. The io_service is run within this function. + std::shared_ptr request(const std::string& method, const std::string& path, std::iostream& content, const Header& header=Header()) { + auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); + std::shared_ptr response; + session->callback=[this, &response, session](const error_code &ec) { + { + std::lock_guard lock(*connections_mutex); + session->connection->in_use=false; + } + response=session->response; + if(ec) + throw system_error(ec); + }; content.seekp(0, std::ios::end); auto content_length=content.tellp(); content.seekp(0, std::ios::beg); - std::ostream write_stream(write_buffer.get()); + std::ostream write_stream(session->request_buffer.get()); if(content_length>0) write_stream << "Content-Length: " << content_length << "\r\n"; write_stream << "\r\n"; if(content_length>0) write_stream << content.rdbuf(); - request_write(write_buffer); + Client::connect(session); io_service->reset(); io_service->run(); - return request_read(); + return response; } - void close() { - error_code ec; - socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); - socket->lowest_layer().close(ec); + /// Asynchronous request where setting and/or running Client's io_service is required. + /// The request callback is not run if the calling Client object has been destroyed. + void request(const std::string &method, const std::string &path, boost::string_ref content, const Header& header, + std::function, const error_code&)> &&request_callback_) { + auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); + auto request_callback=std::make_shared, const error_code&)>>(std::move(request_callback_)); + auto connections_mutex=this->connections_mutex; + auto stop_request_callback=this->stop_request_callback; + auto stop_request_callback_mutex=this->stop_request_callback_mutex; + session->callback=[session, request_callback, connections_mutex, stop_request_callback, stop_request_callback_mutex](const error_code &ec) { + { + std::lock_guard lock(*connections_mutex); + session->connection->in_use=false; + } + + boost::shared_lock lock(*stop_request_callback_mutex); + if(*stop_request_callback) + return; + + if(*request_callback) + (*request_callback)(session->response, ec); + }; + + std::ostream write_stream(session->request_buffer.get()); + if(content.size()>0) + write_stream << "Content-Length: " << content.size() << "\r\n"; + write_stream << "\r\n" << content; + + Client::connect(session); + } + + /// Asynchronous request where setting and/or running Client's io_service is required. + /// The request callback is not run if the calling Client object has been destroyed. + void request(const std::string &method, const std::string &path, boost::string_ref content, + std::function, const error_code&)> &&request_callback) { + request(method, path, content, Header(), std::move(request_callback)); + } + + /// Asynchronous request where setting and/or running Client's io_service is required. + /// The request callback is not run if the calling Client object has been destroyed. + void request(const std::string &method, const std::string &path, + std::function, const error_code&)> &&request_callback) { + request(method, path, std::string(), Header(), std::move(request_callback)); + } + + /// Asynchronous request where setting and/or running Client's io_service is required. + /// The request callback is not run if the calling Client object has been destroyed. + void request(const std::string &method, std::function, const error_code&)> &&request_callback) { + request(method, std::string("/"), std::string(), Header(), std::move(request_callback)); + } + + /// Asynchronous request where setting and/or running Client's io_service is required. + /// The request callback is not run if the calling Client object has been destroyed. + void request(const std::string &method, const std::string &path, std::iostream& content, const Header& header, + std::function, const error_code&)> &&request_callback_) { + auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); + auto request_callback=std::make_shared, const error_code&)>>(std::move(request_callback_)); + auto connections_mutex=this->connections_mutex; + auto stop_request_callback=this->stop_request_callback; + auto stop_request_callback_mutex=this->stop_request_callback_mutex; + session->callback=[session, request_callback, connections_mutex, stop_request_callback, stop_request_callback_mutex](const error_code &ec) { + { + std::lock_guard lock(*connections_mutex); + session->connection->in_use=false; + } + + boost::shared_lock lock(*stop_request_callback_mutex); + if(*stop_request_callback) + return; + + if(*request_callback) + (*request_callback)(session->response, ec); + }; + + content.seekp(0, std::ios::end); + auto content_length=content.tellp(); + content.seekp(0, std::ios::beg); + std::ostream write_stream(session->request_buffer.get()); + if(content_length>0) + write_stream << "Content-Length: " << content_length << "\r\n"; + write_stream << "\r\n"; + if(content_length>0) + write_stream << content.rdbuf(); + + Client::connect(session); + } + + void request(const std::string &method, const std::string &path, std::iostream& content, + std::function, const error_code&)> &&request_callback) { + request(method, path, content, Header(), std::move(request_callback)); } protected: - std::shared_ptr io_service; - - std::unique_ptr socket; - std::string host; unsigned short port; - - ClientBase(const std::string& host_port, unsigned short default_port) : io_service(new asio::io_service()) { + + std::vector> connections; + std::shared_ptr connections_mutex; + + std::shared_ptr stop_request_callback; + std::shared_ptr stop_request_callback_mutex; + + ClientBase(const std::string& host_port, unsigned short default_port) : io_service(new asio::io_service()), + connections_mutex(new std::mutex()), stop_request_callback(new bool(false)), stop_request_callback_mutex(new boost::shared_mutex()) { auto parsed_host_port=parse_host_port(host_port, default_port); host=parsed_host_port.first; port=parsed_host_port.second; } - std::pair parse_host_port(const std::string &host_port, unsigned short default_port) { + std::shared_ptr get_connection() { + std::shared_ptr connection; + std::lock_guard lock(*connections_mutex); + for(auto it=connections.begin();it!=connections.end();) { + if((*it)->in_use) + ++it; + else if(!connection) { + connection=*it; + ++it; + } + else + it=connections.erase(it); + } + if(!connection) { + connection=create_connection(); + connections.emplace_back(connection); + } + connection->reconnecting=false; + connection->in_use=true; + return connection; + } + + virtual std::shared_ptr create_connection()=0; + + std::unique_ptr create_request_header(const std::string& method, const std::string& path, const Header& header) const { + auto corrected_path=path; + if(corrected_path=="") + corrected_path="/"; + if(!config.proxy_server.empty() && std::is_same::value) + corrected_path="http://"+host+':'+std::to_string(port)+corrected_path; + + std::unique_ptr request_buffer(new asio::streambuf()); + std::ostream write_stream(request_buffer.get()); + write_stream << method << " " << corrected_path << " HTTP/1.1\r\n"; + write_stream << "Host: " << host << "\r\n"; + for(auto& h: header) + write_stream << h.first << ": " << h.second << "\r\n"; + return request_buffer; + } + + static std::pair parse_host_port(const std::string &host_port, unsigned short default_port) { std::pair parsed_host_port; size_t host_end=host_port.find(':'); if(host_end==std::string::npos) { @@ -175,25 +376,22 @@ namespace SimpleWeb { return parsed_host_port; } - virtual void connect()=0; - - std::shared_ptr get_timeout_timer(size_t timeout=0) { + static std::shared_ptr get_timeout_timer(const std::shared_ptr &session, size_t timeout=0) { if(timeout==0) - timeout=config.timeout; + timeout=session->connection->config.timeout; if(timeout==0) return nullptr; - auto timer=std::make_shared(*io_service); + auto timer=std::make_shared(*session->io_service); timer->expires_from_now(boost::posix_time::seconds(timeout)); - timer->async_wait([this](const error_code& ec) { - if(!ec) { - close(); - } + timer->async_wait([session](const error_code& ec) { + if(!ec) + close(session); }); return timer; } - void parse_response_header(const std::shared_ptr &response) const { + static void parse_response_header(const std::shared_ptr &response) { std::string line; getline(response->content, line); size_t version_end=line.find(' '); @@ -202,7 +400,7 @@ namespace SimpleWeb { response->http_version=line.substr(5, version_end-5); if((version_end+1)status_code=line.substr(version_end+1, line.size()-(version_end+1)-1); - + getline(response->content, line); size_t param_end; while((param_end=line.find(':'))!=std::string::npos) { @@ -213,145 +411,145 @@ namespace SimpleWeb { if(value_startheader.insert(std::make_pair(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1))); } - + getline(response->content, line); } } } - std::shared_ptr create_request_header(const std::string& request_type, const std::string& path, const std::map& header) const { - auto corrected_path=path; - if(corrected_path=="") - corrected_path="/"; - if(!config.proxy_server.empty() && std::is_same::value) - corrected_path="http://"+host+':'+std::to_string(port)+corrected_path; - - auto write_buffer=std::make_shared(); - std::ostream write_stream(write_buffer.get()); - write_stream << request_type << " " << corrected_path << " HTTP/1.1\r\n"; - write_stream << "Host: " << host << "\r\n"; - for(auto& h: header) - write_stream << h.first << ": " << h.second << "\r\n"; - return write_buffer; - } - - void request_write(const std::shared_ptr &write_buffer) { - connect(); - - auto timer=get_timeout_timer(); - asio::async_write(*socket, *write_buffer, [this, write_buffer, timer](const error_code &ec, size_t /*bytes_transferred*/) { + static void write(const std::shared_ptr &session) { + auto timer=get_timeout_timer(session); + asio::async_write(*session->connection->socket, session->request_buffer->data(), [session, timer](const error_code &ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); - if(ec) { - close(); - throw system_error(ec); + if(!ec) + read(session); + else { + close(session); + session->callback(ec); } }); } - std::shared_ptr request_read() { - std::shared_ptr response(new Response()); - - auto timer=get_timeout_timer(); - asio::async_read_until(*socket, response->content_buffer, "\r\n\r\n", - [this, response, timer](const error_code& ec, size_t bytes_transferred) { + static void read(const std::shared_ptr &session) { + auto timer=get_timeout_timer(session); + asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", + [session, timer](const error_code& ec, size_t bytes_transferred) { if(timer) timer->cancel(); if(!ec) { - size_t num_additional_bytes=response->content_buffer.size()-bytes_transferred; + session->connection->reconnecting=false; - parse_response_header(response); - - auto header_it=response->header.find("Content-Length"); - if(header_it!=response->header.end()) { + size_t num_additional_bytes=session->response->content_buffer.size()-bytes_transferred; + + parse_response_header(session->response); + + auto header_it=session->response->header.find("Content-Length"); + if(header_it!=session->response->header.end()) { auto content_length=stoull(header_it->second); if(content_length>num_additional_bytes) { - auto timer=get_timeout_timer(); - asio::async_read(*socket, response->content_buffer, asio::transfer_exactly(content_length-num_additional_bytes), - [this, response, timer](const error_code& ec, size_t /*bytes_transferred*/) { + auto timer=get_timeout_timer(session); + asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length-num_additional_bytes), + [session, timer](const error_code& ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); - if(ec) { - close(); - throw system_error(ec); + if(!ec) + session->callback(ec); + else { + close(session); + session->callback(ec); } }); } + else + session->callback(ec); } - else if((header_it=response->header.find("Transfer-Encoding"))!=response->header.end() && header_it->second=="chunked") { + else if((header_it=session->response->header.find("Transfer-Encoding"))!=session->response->header.end() && header_it->second=="chunked") { auto tmp_streambuf=std::make_shared(); - request_read_chunked(response, tmp_streambuf); + read_chunked(session, tmp_streambuf); } - else if(response->http_version<"1.1" || ((header_it=response->header.find("Connection"))!=response->header.end() && header_it->second=="close")) { - auto timer=get_timeout_timer(); - asio::async_read(*socket, response->content_buffer, [this, response, timer](const error_code& ec, size_t /*bytes_transferred*/) { + else if(session->response->http_version<"1.1" || ((header_it=session->response->header.find("Session"))!=session->response->header.end() && header_it->second=="close")) { + auto timer=get_timeout_timer(session); + asio::async_read(*session->connection->socket, session->response->content_buffer, [session, timer](const error_code& ec, size_t /*bytes_transferred*/) { if(timer) - timer->cancel(); - if(ec) { - close(); - if(ec!=asio::error::eof) - throw system_error(ec); + timer->cancel(); + if(!ec) + session->callback(ec); + else { + close(session); + if(ec==asio::error::eof) { + error_code ec; + session->callback(ec); + } + else + session->callback(ec); } }); } + else + session->callback(ec); } else { - close(); - throw system_error(ec); + if(!session->connection->reconnecting) { + session->connection->reconnecting=true; + close(session); + Client::connect(session); + } + else { + close(session); + session->callback(ec); + } } }); - io_service->reset(); - io_service->run(); - - return response; } - void request_read_chunked(const std::shared_ptr &response, const std::shared_ptr &tmp_streambuf) { - auto timer=get_timeout_timer(); - asio::async_read_until(*socket, response->content_buffer, "\r\n", [this, response, tmp_streambuf, timer](const error_code& ec, size_t bytes_transferred) { + static void read_chunked(const std::shared_ptr &session, const std::shared_ptr &tmp_streambuf) { + auto timer=get_timeout_timer(session); + asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [session, tmp_streambuf, timer](const error_code& ec, size_t bytes_transferred) { if(timer) timer->cancel(); if(!ec) { std::string line; - getline(response->content, line); + getline(session->response->content, line); bytes_transferred-=line.size()+1; line.pop_back(); std::streamsize length=stol(line, 0, 16); - auto num_additional_bytes=static_cast(response->content_buffer.size()-bytes_transferred); + auto num_additional_bytes=static_cast(session->response->content_buffer.size()-bytes_transferred); - auto post_process=[this, response, tmp_streambuf, length] { + auto post_process=[session, tmp_streambuf, length] { std::ostream tmp_stream(tmp_streambuf.get()); if(length>0) { std::vector buffer(static_cast(length)); - response->content.read(&buffer[0], length); + session->response->content.read(&buffer[0], length); tmp_stream.write(&buffer[0], length); } //Remove "\r\n" - response->content.get(); - response->content.get(); + session->response->content.get(); + session->response->content.get(); if(length>0) - request_read_chunked(response, tmp_streambuf); + read_chunked(session, tmp_streambuf); else { - std::ostream response_stream(&response->content_buffer); + std::ostream response_stream(&session->response->content_buffer); response_stream << tmp_stream.rdbuf(); + error_code ec; + session->callback(ec); } }; if((2+length)>num_additional_bytes) { - auto timer=get_timeout_timer(); - asio::async_read(*socket, response->content_buffer, asio::transfer_exactly(2+length-num_additional_bytes), - [this, response, post_process, timer](const error_code& ec, size_t /*bytes_transferred*/) { + auto timer=get_timeout_timer(session); + asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2+length-num_additional_bytes), + [session, post_process, timer](const error_code& ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); - if(!ec) { + if(!ec) post_process(); - } else { - close(); - throw system_error(ec); + close(session); + session->callback(ec); } }); } @@ -359,11 +557,17 @@ namespace SimpleWeb { post_process(); } else { - close(); - throw system_error(ec); + close(session); + session->callback(ec); } }); } + + static void close(const std::shared_ptr &session) { + error_code ec; + session->connection->socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); + session->connection->socket->lowest_layer().close(ec); + } }; template @@ -374,46 +578,46 @@ namespace SimpleWeb { template<> class Client : public ClientBase { public: - Client(const std::string& server_port_path) : ClientBase::ClientBase(server_port_path, 80) { - socket=std::unique_ptr(new HTTP(*io_service)); - } + friend ClientBase; + + Client(const std::string& server_port_path) : ClientBase::ClientBase(server_port_path, 80) {} protected: - void connect() { - if(!socket->lowest_layer().is_open()) { - std::unique_ptr query; - if(config.proxy_server.empty()) - query=std::unique_ptr(new asio::ip::tcp::resolver::query(host, std::to_string(port))); - else { - auto proxy_host_port=parse_host_port(config.proxy_server, 8080); - query=std::unique_ptr(new asio::ip::tcp::resolver::query(proxy_host_port.first, std::to_string(proxy_host_port.second))); - } - - auto resolver=std::make_shared(*io_service); - resolver->async_resolve(*query, [this, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it){ + std::shared_ptr create_connection() override { + return std::make_shared(host, port, config, std::unique_ptr(new HTTP(*io_service))); + } + + static void connect(const std::shared_ptr &session) { + if(!session->connection->socket->lowest_layer().is_open()) { + auto resolver=std::make_shared(*session->io_service); + auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); + resolver->async_resolve(*session->connection->query, [session, timer, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it){ + if(timer) + timer->cancel(); if(!ec) { - auto timer=get_timeout_timer(config.timeout_connect); - asio::async_connect(*socket, it, [this, resolver, timer](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/){ + auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); + asio::async_connect(*session->connection->socket, it, [session, timer, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/){ if(timer) timer->cancel(); if(!ec) { asio::ip::tcp::no_delay option(true); - this->socket->set_option(option); + session->connection->socket->set_option(option); + write(session); } else { - close(); - throw system_error(ec); + close(session); + session->callback(ec); } }); } else { - close(); - throw system_error(ec); + close(session); + session->callback(ec); } }); - io_service->reset(); - io_service->run(); } + else + write(session); } }; } diff --git a/client_https.hpp b/client_https.hpp index 8a6fff2..36ec316 100644 --- a/client_https.hpp +++ b/client_https.hpp @@ -15,6 +15,8 @@ namespace SimpleWeb { template<> class Client : public ClientBase { public: + friend ClientBase; + Client(const std::string& server_port_path, bool verify_certificate=true, const std::string& cert_file=std::string(), const std::string& private_key_file=std::string(), const std::string& verify_file=std::string()) : @@ -36,95 +38,95 @@ namespace SimpleWeb { context.set_verify_mode(asio::ssl::verify_peer); else context.set_verify_mode(asio::ssl::verify_none); - - socket=std::unique_ptr(new HTTPS(*io_service, context)); } protected: asio::ssl::context context; - void connect() { - if(!socket->lowest_layer().is_open()) { - std::unique_ptr query; - if(config.proxy_server.empty()) - query=std::unique_ptr(new asio::ip::tcp::resolver::query(host, std::to_string(port))); - else { - auto proxy_host_port=parse_host_port(config.proxy_server, 8080); - query=std::unique_ptr(new asio::ip::tcp::resolver::query(proxy_host_port.first, std::to_string(proxy_host_port.second))); - } - auto resolver=std::make_shared(*io_service); - resolver->async_resolve(*query, [this, resolver] (const error_code &ec, asio::ip::tcp::resolver::iterator it){ + std::shared_ptr create_connection() override { + return std::make_shared(host, port, config, std::unique_ptr(new HTTPS(*io_service, context))); + } + + static void connect(const std::shared_ptr &session) { + if(!session->connection->socket->lowest_layer().is_open()) { + auto resolver=std::make_shared(*session->io_service); + resolver->async_resolve(*session->connection->query, [session, resolver] (const error_code &ec, asio::ip::tcp::resolver::iterator it){ if(!ec) { - auto timer=get_timeout_timer(config.timeout_connect); - asio::async_connect(socket->lowest_layer(), it, [this, resolver, timer] (const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/){ + auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); + asio::async_connect(session->connection->socket->lowest_layer(), it, [session, resolver, timer] (const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/){ if(timer) timer->cancel(); if(!ec) { asio::ip::tcp::no_delay option(true); - this->socket->lowest_layer().set_option(option); + session->connection->socket->lowest_layer().set_option(option); + + if(!session->connection->config.proxy_server.empty()) { + auto write_buffer=std::make_shared(); + std::ostream write_stream(write_buffer.get()); + auto host_port=session->connection->host+':'+std::to_string(session->connection->port); + write_stream << "CONNECT "+host_port+" HTTP/1.1\r\n" << "Host: " << host_port << "\r\n\r\n"; + auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); + asio::async_write(session->connection->socket->next_layer(), *write_buffer, [session, write_buffer, timer](const error_code &ec, size_t /*bytes_transferred*/) { + if(timer) + timer->cancel(); + if(!ec) { + std::shared_ptr response(new Response()); + auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); + asio::async_read_until(session->connection->socket->next_layer(), response->content_buffer, "\r\n\r\n", [session, response, timer](const error_code& ec, size_t /*bytes_transferred*/) { + if(timer) + timer->cancel(); + if(!ec) { + parse_response_header(response); + if (response->status_code.empty() || response->status_code.compare(0, 3, "200") != 0) { + close(session); + session->callback(make_error_code::make_error_code(errc::permission_denied)); + } + else + handshake(session); + } + else { + close(session); + session->callback(ec); + } + }); + } + else { + close(session); + session->callback(ec); + } + }); + } + else + handshake(session); } else { - close(); - throw system_error(ec); + close(session); + session->callback(ec); } }); } else { - close(); - throw system_error(ec); + close(session); + session->callback(ec); } }); - io_service->reset(); - io_service->run(); - - if(!config.proxy_server.empty()) { - auto write_buffer=std::make_shared(); - std::ostream write_stream(write_buffer.get()); - auto host_port=host+':'+std::to_string(port); - write_stream << "CONNECT "+host_port+" HTTP/1.1\r\n" << "Host: " << host_port << "\r\n\r\n"; - auto timer=get_timeout_timer(); - asio::async_write(socket->next_layer(), *write_buffer, [this, write_buffer, timer](const error_code &ec, size_t /*bytes_transferred*/) { - if(timer) - timer->cancel(); - if(ec) { - close(); - throw system_error(ec); - } - }); - io_service->reset(); - io_service->run(); - - std::shared_ptr response(new Response()); - timer=get_timeout_timer(); - asio::async_read_until(socket->next_layer(), response->content_buffer, "\r\n\r\n", [this, response, timer](const error_code& ec, size_t /*bytes_transferred*/) { - if(timer) - timer->cancel(); - if(ec) { - close(); - throw system_error(ec); - } - }); - io_service->reset(); - io_service->run(); - parse_response_header(response); - if (response->status_code.empty() || response->status_code.compare(0, 3, "200") != 0) { - close(); - throw make_error_code::make_error_code(errc::permission_denied); - } - } - - auto timer=get_timeout_timer(); - this->socket->async_handshake(asio::ssl::stream_base::client, [this, timer](const error_code& ec) { - if(timer) - timer->cancel(); - if(ec) { - close(); - throw system_error(ec); - } - }); - io_service->reset(); - io_service->run(); } + else + write(session); + } + + static void handshake(const std::shared_ptr &session) { + auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); + session->connection->socket->async_handshake(asio::ssl::stream_base::client, [session, timer](const error_code& ec) { + if(timer) + timer->cancel(); + if(!ec) + write(session); + else { + close(session); + session->callback(ec); + } + }); } }; } diff --git a/tests/io_test.cpp b/tests/io_test.cpp index db999cc..9686fe4 100644 --- a/tests/io_test.cpp +++ b/tests/io_test.cpp @@ -33,6 +33,12 @@ int main() { *response << "HTTP/1.1 200 OK\r\nContent-Length: " << number.length() << "\r\n\r\n" << number; }; + server.resource["^/header$"]["GET"]=[](shared_ptr response, shared_ptr request) { + auto content=request->header.find("test1")->second+request->header.find("test2")->second; + + *response << "HTTP/1.1 200 OK\r\nContent-Length: " << content.length() << "\r\n\r\n" << content; + }; + thread server_thread([&server](){ //Start server server.start(); @@ -89,6 +95,50 @@ int main() { output << r->content.rdbuf(); assert(output.str()=="A string"); } + + { + stringstream output; + auto r=client.request("GET", "/header", "", {{"test1", "test"}, {"test2", "ing"}}); + output << r->content.rdbuf(); + assert(output.str()=="testing"); + } + } + + { + HttpClient client("localhost:8080"); + bool call=false; + client.request("GET", "/match/123", [&call](shared_ptr response, const SimpleWeb::error_code &ec) { + assert(!ec); + stringstream output; + output << response->content.rdbuf(); + assert(output.str()=="123"); + call=true; + }); + client.io_service->run(); + assert(call); + + { + vector calls(100); + vector threads; + for(size_t c=0;c<100;++c) { + calls[c]=false; + threads.emplace_back([c, &client, &calls] { + client.request("GET", "/match/123", [c, &calls](shared_ptr response, const SimpleWeb::error_code &ec) { + assert(!ec); + stringstream output; + output << response->content.rdbuf(); + assert(output.str()=="123"); + calls[c]=true; + }); + }); + } + for(auto &thread: threads) + thread.join(); + client.io_service->reset(); + client.io_service->run(); + for(auto call: calls) + assert(call); + } } server.stop(); diff --git a/tests/parse_test.cpp b/tests/parse_test.cpp index 692f0d3..948730e 100644 --- a/tests/parse_test.cpp +++ b/tests/parse_test.cpp @@ -55,7 +55,9 @@ class ClientTest : public ClientBase { public: ClientTest(const std::string& server_port_path) : ClientBase::ClientBase(server_port_path, 80) {} - void connect() {} + std::shared_ptr create_connection() override { + return nullptr; + } void constructor_parse_test1() { assert(host=="test.org");