From 2f32a2b52ff5c07393a21362900652c5f9f4dc88 Mon Sep 17 00:00:00 2001 From: eidheim Date: Tue, 22 Nov 2016 22:28:23 +0100 Subject: [PATCH] Client now uses asynchronous asio read/write calls --- client_http.hpp | 225 +++++++++++++++++++++++++++---------------- client_https.hpp | 41 ++++++-- tests/parse_test.cpp | 12 +-- 3 files changed, 180 insertions(+), 98 deletions(-) diff --git a/client_http.hpp b/client_http.hpp index 05ac640..795dcd3 100644 --- a/client_http.hpp +++ b/client_http.hpp @@ -65,18 +65,28 @@ namespace SimpleWeb { write_stream << "Content-Length: " << content.size() << "\r\n"; write_stream << "\r\n"; - try { - connect(); - - boost::asio::write(*socket, write_buffer); - if(content.size()>0) - boost::asio::write(*socket, boost::asio::buffer(content.data(), content.size())); - - } - catch(const std::exception& e) { - socket_error=true; - throw std::invalid_argument(e.what()); - } + connect(); + + boost::asio::async_write(*socket, write_buffer, + [this, &content](const boost::system::error_code &ec, size_t /*bytes_transferred*/) { + if(!ec) { + if(!content.empty()) { + boost::asio::async_write(*socket, boost::asio::buffer(content.data(), content.size()), + [this](const boost::system::error_code &ec, size_t /*bytes_transferred*/) { + if(ec) { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + } + } + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + io_service.reset(); + io_service.run(); return request_read(); } @@ -104,15 +114,15 @@ namespace SimpleWeb { if(content_length>0) write_stream << content.rdbuf(); - try { - connect(); - - boost::asio::write(*socket, write_buffer); - } - catch(const std::exception& e) { - socket_error=true; - throw std::invalid_argument(e.what()); - } + boost::asio::async_write(*socket, write_buffer, + [this](const boost::system::error_code &ec, size_t /*bytes_transferred*/) { + if(ec) { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + io_service.reset(); + io_service.run(); return request_read(); } @@ -122,14 +132,12 @@ namespace SimpleWeb { boost::asio::ip::tcp::endpoint endpoint; boost::asio::ip::tcp::resolver resolver; - std::shared_ptr socket; - bool socket_error; + std::unique_ptr socket; std::string host; unsigned short port; - ClientBase(const std::string& host_port, unsigned short default_port) : - resolver(io_service), socket_error(false) { + ClientBase(const std::string& host_port, unsigned short default_port) : resolver(io_service) { size_t host_end=host_port.find(':'); if(host_end==std::string::npos) { host=host_port; @@ -145,9 +153,9 @@ namespace SimpleWeb { virtual void connect()=0; - void parse_response_header(const std::shared_ptr &response, std::istream& stream) const { + void parse_response_header(const std::shared_ptr &response) const { std::string line; - getline(stream, line); + getline(response->content, line); size_t version_end=line.find(' '); if(version_end!=std::string::npos) { if(5status_code=line.substr(version_end+1, line.size()-(version_end+1)-1); - getline(stream, line); + getline(response->content, line); size_t param_end; while((param_end=line.find(':'))!=std::string::npos) { size_t value_start=param_end+1; @@ -166,7 +174,7 @@ namespace SimpleWeb { response->header.insert(std::make_pair(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1))); } - getline(stream, line); + getline(response->content, line); } } } @@ -174,61 +182,95 @@ namespace SimpleWeb { std::shared_ptr request_read() { std::shared_ptr response(new Response()); - try { - size_t bytes_transferred = boost::asio::read_until(*socket, response->content_buffer, "\r\n\r\n"); - - size_t num_additional_bytes=response->content_buffer.size()-bytes_transferred; - - parse_response_header(response, response->content); - - auto header_it=response->header.find("Content-Length"); - if(header_it!=response->header.end()) { - auto content_length=stoull(header_it->second); - if(content_length>num_additional_bytes) { - boost::asio::read(*socket, response->content_buffer, - boost::asio::transfer_exactly(content_length-num_additional_bytes)); + boost::asio::streambuf chunked_streambuf; + + boost::asio::async_read_until(*socket, response->content_buffer, "\r\n\r\n", + [this, &response, &chunked_streambuf](const boost::system::error_code& ec, size_t bytes_transferred) { + if(!ec) { + size_t num_additional_bytes=response->content_buffer.size()-bytes_transferred; + + parse_response_header(response); + + auto header_it=response->header.find("Content-Length"); + if(header_it!=response->header.end()) { + auto content_length=stoull(header_it->second); + if(content_length>num_additional_bytes) { + boost::asio::async_read(*socket, response->content_buffer, + boost::asio::transfer_exactly(content_length-num_additional_bytes), + [this](const boost::system::error_code& ec, size_t /*bytes_transferred*/) { + if(ec) { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + } + } + else if((header_it=response->header.find("Transfer-Encoding"))!=response->header.end() && header_it->second=="chunked") { + request_read_chunked(response, chunked_streambuf); } } - else if((header_it=response->header.find("Transfer-Encoding"))!=response->header.end() && header_it->second=="chunked") { - boost::asio::streambuf streambuf; - std::ostream content(&streambuf); - - std::streamsize length; - std::string buffer; - do { - size_t bytes_transferred = boost::asio::read_until(*socket, response->content_buffer, "\r\n"); - std::string line; - getline(response->content, line); - bytes_transferred-=line.size()+1; - line.pop_back(); - length=stol(line, 0, 16); + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + io_service.reset(); + io_service.run(); - auto num_additional_bytes=static_cast(response->content_buffer.size()-bytes_transferred); + return response; + } + + void request_read_chunked(const std::shared_ptr &response, boost::asio::streambuf &streambuf) { + boost::asio::async_read_until(*socket, response->content_buffer, "\r\n", + [this, &response, &streambuf](const boost::system::error_code& ec, size_t bytes_transferred) { + if(!ec) { + std::string line; + getline(response->content, line); + bytes_transferred-=line.size()+1; + line.pop_back(); + std::streamsize length=stol(line, 0, 16); - if((2+length)>num_additional_bytes) { - boost::asio::read(*socket, response->content_buffer, - boost::asio::transfer_exactly(2+length-num_additional_bytes)); - } - - buffer.resize(static_cast(length)); + auto num_additional_bytes=static_cast(response->content_buffer.size()-bytes_transferred); + + auto post_process=[this, &response, &streambuf, length] { + std::ostream stream(&streambuf); + std::vector buffer(static_cast(length)); response->content.read(&buffer[0], length); - content.write(&buffer[0], length); - + stream.write(&buffer[0], length); + //Remove "\r\n" response->content.get(); response->content.get(); - } while(length>0); + + if(length>0) + request_read_chunked(response, streambuf); + else { + std::ostream response_stream(&response->content_buffer); + response_stream << stream.rdbuf(); + } + }; - std::ostream response_content_output_stream(&response->content_buffer); - response_content_output_stream << content.rdbuf(); + if((2+length)>num_additional_bytes) { + boost::asio::async_read(*socket, response->content_buffer, + boost::asio::transfer_exactly(2+length-num_additional_bytes), + [this, post_process](const boost::system::error_code& ec, size_t /*bytes_transferred*/) { + if(!ec) { + post_process(); + } + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + } + else + post_process(); } - } - catch(const std::exception& e) { - socket_error=true; - throw std::invalid_argument(e.what()); - } - - return response; + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); } }; @@ -240,20 +282,37 @@ namespace SimpleWeb { template<> class Client : public ClientBase { public: - Client(const std::string& server_port_path) : ClientBase::ClientBase(server_port_path, 80) { - socket=std::make_shared(io_service); - } + Client(const std::string& server_port_path) : ClientBase::ClientBase(server_port_path, 80) {} protected: void connect() { - if(socket_error || !socket->is_open()) { + if(!socket || !socket->is_open()) { boost::asio::ip::tcp::resolver::query query(host, std::to_string(port)); - boost::asio::connect(*socket, resolver.resolve(query)); - boost::asio::ip::tcp::no_delay option(true); - socket->set_option(option); - - socket_error=false; + resolver.async_resolve(query, [this](const boost::system::error_code &ec, + boost::asio::ip::tcp::resolver::iterator it){ + if(!ec) { + socket=std::unique_ptr(new HTTP(io_service)); + + boost::asio::async_connect(*socket, it, [this] + (const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator /*it*/){ + if(!ec) { + boost::asio::ip::tcp::no_delay option(true); + socket->set_option(option); + } + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + } + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + io_service.reset(); + io_service.run(); } } }; diff --git a/client_https.hpp b/client_https.hpp index 06cff14..c6cd89b 100644 --- a/client_https.hpp +++ b/client_https.hpp @@ -29,23 +29,46 @@ namespace SimpleWeb { if(verify_file.size()>0) context.load_verify_file(verify_file); - socket=std::make_shared(io_service, context); + socket=std::unique_ptr(new HTTPS(io_service, context)); } protected: boost::asio::ssl::context context; void connect() { - if(socket_error || !socket->lowest_layer().is_open()) { + if(!socket || !socket->lowest_layer().is_open()) { boost::asio::ip::tcp::resolver::query query(host, std::to_string(port)); - boost::asio::connect(socket->lowest_layer(), resolver.resolve(query)); - boost::asio::ip::tcp::no_delay option(true); - socket->lowest_layer().set_option(option); - - socket->handshake(boost::asio::ssl::stream_base::client); - - socket_error=false; + resolver.async_resolve(query, [this] + (const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator it){ + if(!ec) { + boost::asio::async_connect(socket->lowest_layer(), it, [this] + (const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator /*it*/){ + if(!ec) { + boost::asio::ip::tcp::no_delay option(true); + socket->lowest_layer().set_option(option); + + socket->async_handshake(boost::asio::ssl::stream_base::client, + [this](const boost::system::error_code& ec) { + if(ec) { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + } + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + } + else { + socket=nullptr; + throw boost::system::system_error(ec); + } + }); + io_service.reset(); + io_service.run(); } } }; diff --git a/tests/parse_test.cpp b/tests/parse_test.cpp index eba533d..6774a88 100644 --- a/tests/parse_test.cpp +++ b/tests/parse_test.cpp @@ -71,13 +71,13 @@ public: bool parse_response_header_test() { std::shared_ptr response(new Response()); - stringstream ss; - ss << "HTTP/1.1 200 OK\r\n"; - ss << "TestHeader: test\r\n"; - ss << "TestHeader2:test2\r\n"; - ss << "\r\n"; + ostream stream(&response->content_buffer); + stream << "HTTP/1.1 200 OK\r\n"; + stream << "TestHeader: test\r\n"; + stream << "TestHeader2:test2\r\n"; + stream << "\r\n"; - parse_response_header(response, ss); + parse_response_header(response); if(response->http_version!="1.1") return 0;