Client now uses asynchronous asio read/write calls

This commit is contained in:
eidheim 2016-11-22 22:28:23 +01:00
commit 2f32a2b52f
3 changed files with 180 additions and 98 deletions

View file

@ -65,18 +65,28 @@ namespace SimpleWeb {
write_stream << "Content-Length: " << content.size() << "\r\n";
write_stream << "\r\n";
try {
connect();
boost::asio::write(*socket, write_buffer);
if(content.size()>0)
boost::asio::write(*socket, boost::asio::buffer(content.data(), content.size()));
}
catch(const std::exception& e) {
socket_error=true;
throw std::invalid_argument(e.what());
}
connect();
boost::asio::async_write(*socket, write_buffer,
[this, &content](const boost::system::error_code &ec, size_t /*bytes_transferred*/) {
if(!ec) {
if(!content.empty()) {
boost::asio::async_write(*socket, boost::asio::buffer(content.data(), content.size()),
[this](const boost::system::error_code &ec, size_t /*bytes_transferred*/) {
if(ec) {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
}
}
else {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
io_service.reset();
io_service.run();
return request_read();
}
@ -104,15 +114,15 @@ namespace SimpleWeb {
if(content_length>0)
write_stream << content.rdbuf();
try {
connect();
boost::asio::write(*socket, write_buffer);
}
catch(const std::exception& e) {
socket_error=true;
throw std::invalid_argument(e.what());
}
boost::asio::async_write(*socket, write_buffer,
[this](const boost::system::error_code &ec, size_t /*bytes_transferred*/) {
if(ec) {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
io_service.reset();
io_service.run();
return request_read();
}
@ -122,14 +132,12 @@ namespace SimpleWeb {
boost::asio::ip::tcp::endpoint endpoint;
boost::asio::ip::tcp::resolver resolver;
std::shared_ptr<socket_type> socket;
bool socket_error;
std::unique_ptr<socket_type> socket;
std::string host;
unsigned short port;
ClientBase(const std::string& host_port, unsigned short default_port) :
resolver(io_service), socket_error(false) {
ClientBase(const std::string& host_port, unsigned short default_port) : resolver(io_service) {
size_t host_end=host_port.find(':');
if(host_end==std::string::npos) {
host=host_port;
@ -145,9 +153,9 @@ namespace SimpleWeb {
virtual void connect()=0;
void parse_response_header(const std::shared_ptr<Response> &response, std::istream& stream) const {
void parse_response_header(const std::shared_ptr<Response> &response) const {
std::string line;
getline(stream, line);
getline(response->content, line);
size_t version_end=line.find(' ');
if(version_end!=std::string::npos) {
if(5<line.size())
@ -155,7 +163,7 @@ namespace SimpleWeb {
if((version_end+1)<line.size())
response->status_code=line.substr(version_end+1, line.size()-(version_end+1)-1);
getline(stream, line);
getline(response->content, line);
size_t param_end;
while((param_end=line.find(':'))!=std::string::npos) {
size_t value_start=param_end+1;
@ -166,7 +174,7 @@ namespace SimpleWeb {
response->header.insert(std::make_pair(line.substr(0, param_end), line.substr(value_start, line.size()-value_start-1)));
}
getline(stream, line);
getline(response->content, line);
}
}
}
@ -174,61 +182,95 @@ namespace SimpleWeb {
std::shared_ptr<Response> request_read() {
std::shared_ptr<Response> response(new Response());
try {
size_t bytes_transferred = boost::asio::read_until(*socket, response->content_buffer, "\r\n\r\n");
size_t num_additional_bytes=response->content_buffer.size()-bytes_transferred;
parse_response_header(response, response->content);
auto header_it=response->header.find("Content-Length");
if(header_it!=response->header.end()) {
auto content_length=stoull(header_it->second);
if(content_length>num_additional_bytes) {
boost::asio::read(*socket, response->content_buffer,
boost::asio::transfer_exactly(content_length-num_additional_bytes));
boost::asio::streambuf chunked_streambuf;
boost::asio::async_read_until(*socket, response->content_buffer, "\r\n\r\n",
[this, &response, &chunked_streambuf](const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
size_t num_additional_bytes=response->content_buffer.size()-bytes_transferred;
parse_response_header(response);
auto header_it=response->header.find("Content-Length");
if(header_it!=response->header.end()) {
auto content_length=stoull(header_it->second);
if(content_length>num_additional_bytes) {
boost::asio::async_read(*socket, response->content_buffer,
boost::asio::transfer_exactly(content_length-num_additional_bytes),
[this](const boost::system::error_code& ec, size_t /*bytes_transferred*/) {
if(ec) {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
}
}
else if((header_it=response->header.find("Transfer-Encoding"))!=response->header.end() && header_it->second=="chunked") {
request_read_chunked(response, chunked_streambuf);
}
}
else if((header_it=response->header.find("Transfer-Encoding"))!=response->header.end() && header_it->second=="chunked") {
boost::asio::streambuf streambuf;
std::ostream content(&streambuf);
std::streamsize length;
std::string buffer;
do {
size_t bytes_transferred = boost::asio::read_until(*socket, response->content_buffer, "\r\n");
std::string line;
getline(response->content, line);
bytes_transferred-=line.size()+1;
line.pop_back();
length=stol(line, 0, 16);
else {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
io_service.reset();
io_service.run();
auto num_additional_bytes=static_cast<std::streamsize>(response->content_buffer.size()-bytes_transferred);
return response;
}
void request_read_chunked(const std::shared_ptr<Response> &response, boost::asio::streambuf &streambuf) {
boost::asio::async_read_until(*socket, response->content_buffer, "\r\n",
[this, &response, &streambuf](const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::string line;
getline(response->content, line);
bytes_transferred-=line.size()+1;
line.pop_back();
std::streamsize length=stol(line, 0, 16);
if((2+length)>num_additional_bytes) {
boost::asio::read(*socket, response->content_buffer,
boost::asio::transfer_exactly(2+length-num_additional_bytes));
}
buffer.resize(static_cast<size_t>(length));
auto num_additional_bytes=static_cast<std::streamsize>(response->content_buffer.size()-bytes_transferred);
auto post_process=[this, &response, &streambuf, length] {
std::ostream stream(&streambuf);
std::vector<char> buffer(static_cast<size_t>(length));
response->content.read(&buffer[0], length);
content.write(&buffer[0], length);
stream.write(&buffer[0], length);
//Remove "\r\n"
response->content.get();
response->content.get();
} while(length>0);
if(length>0)
request_read_chunked(response, streambuf);
else {
std::ostream response_stream(&response->content_buffer);
response_stream << stream.rdbuf();
}
};
std::ostream response_content_output_stream(&response->content_buffer);
response_content_output_stream << content.rdbuf();
if((2+length)>num_additional_bytes) {
boost::asio::async_read(*socket, response->content_buffer,
boost::asio::transfer_exactly(2+length-num_additional_bytes),
[this, post_process](const boost::system::error_code& ec, size_t /*bytes_transferred*/) {
if(!ec) {
post_process();
}
else {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
}
else
post_process();
}
}
catch(const std::exception& e) {
socket_error=true;
throw std::invalid_argument(e.what());
}
return response;
else {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
}
};
@ -240,20 +282,37 @@ namespace SimpleWeb {
template<>
class Client<HTTP> : public ClientBase<HTTP> {
public:
Client(const std::string& server_port_path) : ClientBase<HTTP>::ClientBase(server_port_path, 80) {
socket=std::make_shared<HTTP>(io_service);
}
Client(const std::string& server_port_path) : ClientBase<HTTP>::ClientBase(server_port_path, 80) {}
protected:
void connect() {
if(socket_error || !socket->is_open()) {
if(!socket || !socket->is_open()) {
boost::asio::ip::tcp::resolver::query query(host, std::to_string(port));
boost::asio::connect(*socket, resolver.resolve(query));
boost::asio::ip::tcp::no_delay option(true);
socket->set_option(option);
socket_error=false;
resolver.async_resolve(query, [this](const boost::system::error_code &ec,
boost::asio::ip::tcp::resolver::iterator it){
if(!ec) {
socket=std::unique_ptr<HTTP>(new HTTP(io_service));
boost::asio::async_connect(*socket, it, [this]
(const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator /*it*/){
if(!ec) {
boost::asio::ip::tcp::no_delay option(true);
socket->set_option(option);
}
else {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
}
else {
socket=nullptr;
throw boost::system::system_error(ec);
}
});
io_service.reset();
io_service.run();
}
}
};