#ifndef CLIENT_HTTP_HPP #define CLIENT_HTTP_HPP #include #include #include #include #include "utility.hpp" #ifdef USE_STANDALONE_ASIO #include #include namespace SimpleWeb { using error_code = std::error_code; using errc = std::errc; using system_error = std::system_error; namespace make_error_code = std; using string_view = const std::string&; // TODO c++17: use std::string_view } #else #include #include namespace SimpleWeb { namespace asio = boost::asio; using error_code = boost::system::error_code; namespace errc = boost::system::errc; using system_error = boost::system::system_error; namespace make_error_code = boost::system::errc; using string_view = boost::string_ref; } #endif namespace SimpleWeb { template class Client; template class ClientBase { public: class Content : public std::istream { friend class ClientBase; public: size_t size() { return streambuf.size(); } /// Convenience function to return std::string. Note that the stream buffer is emptied when this functions is used. std::string string() { std::stringstream ss; ss << rdbuf(); return ss.str(); } private: asio::streambuf &streambuf; Content(asio::streambuf &streambuf): std::istream(&streambuf), streambuf(streambuf) {} }; class Response { friend class ClientBase; friend class Client; public: std::string http_version, status_code; Content content; CaseInsensitiveMultimap header; private: asio::streambuf content_buffer; Response(): content(content_buffer) {} }; class Config { friend class ClientBase; private: Config() {} public: /// Set timeout on requests in seconds. Default value: 0 (no timeout). size_t timeout=0; /// Set connect timeout in seconds. Default value: 0 (Config::timeout is then used instead). size_t timeout_connect=0; /// Set proxy server (server:port) std::string proxy_server; }; protected: class Connection { public: Connection(const std::string &host, unsigned short port, const Config &config, std::unique_ptr &&socket) : host(host), port(port), config(config), socket(std::move(socket)) { if(config.proxy_server.empty()) query=std::unique_ptr(new asio::ip::tcp::resolver::query(host, std::to_string(port))); else { auto proxy_host_port=parse_host_port(config.proxy_server, 8080); query=std::unique_ptr(new asio::ip::tcp::resolver::query(proxy_host_port.first, std::to_string(proxy_host_port.second))); } } std::string host; unsigned short port; Config config; std::unique_ptr socket; bool in_use=false; bool reconnecting=false; std::unique_ptr query; }; class Session { public: Session(const std::shared_ptr &io_service, const std::shared_ptr &connection, std::unique_ptr &&request_buffer) : io_service(io_service), connection(connection), request_buffer(std::move(request_buffer)), response(new Response()) {} std::shared_ptr io_service; std::shared_ptr connection; std::unique_ptr request_buffer; std::shared_ptr response; std::function callback; }; public: /// Set before calling request Config config; /// If you have your own asio::io_service, store its pointer here before calling request(). /// When using asynchronous requests, running the io_service is up to the programmer. std::shared_ptr io_service; virtual ~ClientBase() {} /// Synchronous request. The io_service is run within this function. std::shared_ptr request(const std::string& method, const std::string& path=std::string("/"), string_view content="", const CaseInsensitiveMultimap& header=CaseInsensitiveMultimap()) { auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); std::shared_ptr response; session->callback=[this, &response, session](const error_code &ec) { { std::lock_guard lock(*connections_mutex); session->connection->in_use=false; } response=session->response; if(ec) throw system_error(ec); }; std::ostream write_stream(session->request_buffer.get()); if(content.size()>0) write_stream << "Content-Length: " << content.size() << "\r\n"; write_stream << "\r\n" << content; Client::connect(session); io_service->reset(); io_service->run(); return response; } /// Synchronous request. The io_service is run within this function. std::shared_ptr request(const std::string& method, const std::string& path, std::iostream& content, const CaseInsensitiveMultimap& header=CaseInsensitiveMultimap()) { auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); std::shared_ptr response; session->callback=[this, &response, session](const error_code &ec) { { std::lock_guard lock(*connections_mutex); session->connection->in_use=false; } response=session->response; if(ec) throw system_error(ec); }; content.seekp(0, std::ios::end); auto content_length=content.tellp(); content.seekp(0, std::ios::beg); std::ostream write_stream(session->request_buffer.get()); if(content_length>0) write_stream << "Content-Length: " << content_length << "\r\n"; write_stream << "\r\n"; if(content_length>0) write_stream << content.rdbuf(); Client::connect(session); io_service->reset(); io_service->run(); return response; } /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap& header, std::function, const error_code&)> &&request_callback_) { auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); auto request_callback=std::make_shared, const error_code&)>>(std::move(request_callback_)); auto connections_mutex=this->connections_mutex; session->callback=[session, request_callback, connections_mutex](const error_code &ec) { { std::lock_guard lock(*connections_mutex); session->connection->in_use=false; } if(*request_callback) (*request_callback)(session->response, ec); }; std::ostream write_stream(session->request_buffer.get()); if(content.size()>0) write_stream << "Content-Length: " << content.size() << "\r\n"; write_stream << "\r\n" << content; Client::connect(session); } /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, string_view content, std::function, const error_code&)> &&request_callback) { request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback)); } /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, std::function, const error_code&)> &&request_callback) { request(method, path, std::string(), CaseInsensitiveMultimap(), std::move(request_callback)); } /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, std::function, const error_code&)> &&request_callback) { request(method, std::string("/"), std::string(), CaseInsensitiveMultimap(), std::move(request_callback)); } /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, std::iostream& content, const CaseInsensitiveMultimap& header, std::function, const error_code&)> &&request_callback_) { auto session=std::make_shared(io_service, get_connection(), create_request_header(method, path, header)); auto request_callback=std::make_shared, const error_code&)>>(std::move(request_callback_)); auto connections_mutex=this->connections_mutex; session->callback=[session, request_callback, connections_mutex](const error_code &ec) { { std::lock_guard lock(*connections_mutex); session->connection->in_use=false; } if(*request_callback) (*request_callback)(session->response, ec); }; content.seekp(0, std::ios::end); auto content_length=content.tellp(); content.seekp(0, std::ios::beg); std::ostream write_stream(session->request_buffer.get()); if(content_length>0) write_stream << "Content-Length: " << content_length << "\r\n"; write_stream << "\r\n"; if(content_length>0) write_stream << content.rdbuf(); Client::connect(session); } /// Asynchronous request where setting and/or running Client's io_service is required. void request(const std::string &method, const std::string &path, std::iostream& content, std::function, const error_code&)> &&request_callback) { request(method, path, content, CaseInsensitiveMultimap(), std::move(request_callback)); } protected: std::string host; unsigned short port; std::vector> connections; std::shared_ptr connections_mutex; ClientBase(const std::string& host_port, unsigned short default_port) : io_service(new asio::io_service()), connections_mutex(new std::mutex()) { auto parsed_host_port=parse_host_port(host_port, default_port); host=parsed_host_port.first; port=parsed_host_port.second; } std::shared_ptr get_connection() { std::shared_ptr connection; std::lock_guard lock(*connections_mutex); for(auto it=connections.begin();it!=connections.end();) { if((*it)->in_use) ++it; else if(!connection) { connection=*it; ++it; } else it=connections.erase(it); } if(!connection) { connection=create_connection(); connections.emplace_back(connection); } connection->reconnecting=false; connection->in_use=true; return connection; } virtual std::shared_ptr create_connection()=0; std::unique_ptr create_request_header(const std::string& method, const std::string& path, const CaseInsensitiveMultimap& header) const { auto corrected_path=path; if(corrected_path=="") corrected_path="/"; if(!config.proxy_server.empty() && std::is_same::value) corrected_path="http://"+host+':'+std::to_string(port)+corrected_path; std::unique_ptr request_buffer(new asio::streambuf()); std::ostream write_stream(request_buffer.get()); write_stream << method << " " << corrected_path << " HTTP/1.1\r\n"; write_stream << "Host: " << host << "\r\n"; for(auto& h: header) write_stream << h.first << ": " << h.second << "\r\n"; return request_buffer; } static std::pair parse_host_port(const std::string &host_port, unsigned short default_port) { std::pair parsed_host_port; size_t host_end=host_port.find(':'); if(host_end==std::string::npos) { parsed_host_port.first=host_port; parsed_host_port.second=default_port; } else { parsed_host_port.first=host_port.substr(0, host_end); parsed_host_port.second=static_cast(stoul(host_port.substr(host_end+1))); } return parsed_host_port; } static std::shared_ptr get_timeout_timer(const std::shared_ptr &session, size_t timeout=0) { if(timeout==0) timeout=session->connection->config.timeout; if(timeout==0) return nullptr; auto timer=std::make_shared(*session->io_service); timer->expires_from_now(boost::posix_time::seconds(timeout)); timer->async_wait([session](const error_code& ec) { if(!ec) close(session); }); return timer; } static void parse_response_header(const std::shared_ptr &response) { std::string line; getline(response->content, line); size_t version_end=line.find(' '); if(version_end!=std::string::npos) { if(5http_version=line.substr(5, version_end-5); if((version_end+1)status_code=line.substr(version_end+1, line.size()-(version_end+1)-1); getline(response->content, line); size_t param_end; while((param_end=line.find(':'))!=std::string::npos) { size_t value_start=param_end+1; if((value_start)header.insert(std::make_pair(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1))); } getline(response->content, line); } } } static void write(const std::shared_ptr &session) { auto timer=get_timeout_timer(session); asio::async_write(*session->connection->socket, session->request_buffer->data(), [session, timer](const error_code &ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); if(!ec) read(session); else { close(session); session->callback(ec); } }); } static void read(const std::shared_ptr &session) { auto timer=get_timeout_timer(session); asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n\r\n", [session, timer](const error_code& ec, size_t bytes_transferred) { if(timer) timer->cancel(); if(!ec) { session->connection->reconnecting=false; size_t num_additional_bytes=session->response->content_buffer.size()-bytes_transferred; parse_response_header(session->response); auto header_it=session->response->header.find("Content-Length"); if(header_it!=session->response->header.end()) { auto content_length=stoull(header_it->second); if(content_length>num_additional_bytes) { auto timer=get_timeout_timer(session); asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(content_length-num_additional_bytes), [session, timer](const error_code& ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); if(!ec) session->callback(ec); else { close(session); session->callback(ec); } }); } else session->callback(ec); } else if((header_it=session->response->header.find("Transfer-Encoding"))!=session->response->header.end() && header_it->second=="chunked") { auto tmp_streambuf=std::make_shared(); read_chunked(session, tmp_streambuf); } else if(session->response->http_version<"1.1" || ((header_it=session->response->header.find("Session"))!=session->response->header.end() && header_it->second=="close")) { auto timer=get_timeout_timer(session); asio::async_read(*session->connection->socket, session->response->content_buffer, [session, timer](const error_code& ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); if(!ec) session->callback(ec); else { close(session); if(ec==asio::error::eof) { error_code ec; session->callback(ec); } else session->callback(ec); } }); } else session->callback(ec); } else { if(!session->connection->reconnecting) { session->connection->reconnecting=true; close(session); Client::connect(session); } else { close(session); session->callback(ec); } } }); } static void read_chunked(const std::shared_ptr &session, const std::shared_ptr &tmp_streambuf) { auto timer=get_timeout_timer(session); asio::async_read_until(*session->connection->socket, session->response->content_buffer, "\r\n", [session, tmp_streambuf, timer](const error_code& ec, size_t bytes_transferred) { if(timer) timer->cancel(); if(!ec) { std::string line; getline(session->response->content, line); bytes_transferred-=line.size()+1; line.pop_back(); std::streamsize length=stol(line, 0, 16); auto num_additional_bytes=static_cast(session->response->content_buffer.size()-bytes_transferred); auto post_process=[session, tmp_streambuf, length] { std::ostream tmp_stream(tmp_streambuf.get()); if(length>0) { std::vector buffer(static_cast(length)); session->response->content.read(&buffer[0], length); tmp_stream.write(&buffer[0], length); } //Remove "\r\n" session->response->content.get(); session->response->content.get(); if(length>0) read_chunked(session, tmp_streambuf); else { std::ostream response_stream(&session->response->content_buffer); response_stream << tmp_stream.rdbuf(); error_code ec; session->callback(ec); } }; if((2+length)>num_additional_bytes) { auto timer=get_timeout_timer(session); asio::async_read(*session->connection->socket, session->response->content_buffer, asio::transfer_exactly(2+length-num_additional_bytes), [session, post_process, timer](const error_code& ec, size_t /*bytes_transferred*/) { if(timer) timer->cancel(); if(!ec) post_process(); else { close(session); session->callback(ec); } }); } else post_process(); } else { close(session); session->callback(ec); } }); } static void close(const std::shared_ptr &session) { error_code ec; session->connection->socket->lowest_layer().shutdown(asio::ip::tcp::socket::shutdown_both, ec); session->connection->socket->lowest_layer().close(ec); } }; template class Client : public ClientBase {}; typedef asio::ip::tcp::socket HTTP; template<> class Client : public ClientBase { public: friend ClientBase; Client(const std::string& server_port_path) : ClientBase::ClientBase(server_port_path, 80) {} protected: std::shared_ptr create_connection() override { return std::make_shared(host, port, config, std::unique_ptr(new HTTP(*io_service))); } static void connect(const std::shared_ptr &session) { if(!session->connection->socket->lowest_layer().is_open()) { auto resolver=std::make_shared(*session->io_service); auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); resolver->async_resolve(*session->connection->query, [session, timer, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator it){ if(timer) timer->cancel(); if(!ec) { auto timer=get_timeout_timer(session, session->connection->config.timeout_connect); asio::async_connect(*session->connection->socket, it, [session, timer, resolver](const error_code &ec, asio::ip::tcp::resolver::iterator /*it*/){ if(timer) timer->cancel(); if(!ec) { asio::ip::tcp::no_delay option(true); session->connection->socket->set_option(option); write(session); } else { close(session); session->callback(ec); } }); } else { close(session); session->callback(ec); } }); } else write(session); } }; } #endif /* CLIENT_HTTP_HPP */