Further fix of #246 : corrected use of response streambuf after reconnect (wrong response object was used in Session::callback). Also closes connection when appropriate, and simplified Session::callback.

This commit is contained in:
eidheim 2019-06-21 19:08:43 +02:00
commit dbd2287418
2 changed files with 76 additions and 72 deletions

View file

@ -130,7 +130,7 @@ namespace SimpleWeb {
std::shared_ptr<Connection> connection; std::shared_ptr<Connection> connection;
std::unique_ptr<asio::streambuf> request_streambuf; std::unique_ptr<asio::streambuf> request_streambuf;
std::shared_ptr<Response> response; std::shared_ptr<Response> response;
std::function<void(const std::shared_ptr<Connection> &, const error_code &)> callback; std::function<void(const error_code &)> callback;
}; };
public: public:
@ -206,32 +206,34 @@ namespace SimpleWeb {
void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header, void request(const std::string &method, const std::string &path, string_view content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header)); auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header));
auto response = session->response; std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_)); auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
session->callback = [this, response, request_callback](const std::shared_ptr<Connection> &connection, const error_code &ec) { session->callback = [this, session_weak, request_callback](const error_code &ec) {
{ if(auto session = session_weak.lock()) {
std::lock_guard<std::mutex> lock(this->connections_mutex); if(session->connection) {
connection->in_use = false; std::lock_guard<std::mutex> lock(this->connections_mutex);
session->connection->in_use = false;
// Remove unused connections, but keep one open for HTTP persistent connection: // Remove unused connections, but keep one open for HTTP persistent connection:
std::size_t unused_connections = 0; std::size_t unused_connections = 0;
for(auto it = this->connections.begin(); it != this->connections.end();) { for(auto it = this->connections.begin(); it != this->connections.end();) {
if(ec && connection == *it) if(ec && session->connection == *it)
it = this->connections.erase(it);
else if((*it)->in_use)
++it;
else {
++unused_connections;
if(unused_connections > 1)
it = this->connections.erase(it); it = this->connections.erase(it);
else else if((*it)->in_use)
++it; ++it;
else {
++unused_connections;
if(unused_connections > 1)
it = this->connections.erase(it);
else
++it;
}
} }
} }
}
if(*request_callback) if(*request_callback)
(*request_callback)(response, ec); (*request_callback)(session->response, ec);
}
}; };
std::ostream write_stream(session->request_streambuf.get()); std::ostream write_stream(session->request_streambuf.get());
@ -271,32 +273,34 @@ namespace SimpleWeb {
void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header, void request(const std::string &method, const std::string &path, std::istream &content, const CaseInsensitiveMultimap &header,
std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) { std::function<void(std::shared_ptr<Response>, const error_code &)> &&request_callback_) {
auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header)); auto session = std::make_shared<Session>(config.max_response_streambuf_size, get_connection(), create_request_header(method, path, header));
auto response = session->response; std::weak_ptr<Session> session_weak(session); // To avoid keeping session alive longer than needed
auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_)); auto request_callback = std::make_shared<std::function<void(std::shared_ptr<Response>, const error_code &)>>(std::move(request_callback_));
session->callback = [this, response, request_callback](const std::shared_ptr<Connection> &connection, const error_code &ec) { session->callback = [this, session_weak, request_callback](const error_code &ec) {
{ if(auto session = session_weak.lock()) {
std::lock_guard<std::mutex> lock(this->connections_mutex); if(session->connection) {
connection->in_use = false; std::lock_guard<std::mutex> lock(this->connections_mutex);
session->connection->in_use = false;
// Remove unused connections, but keep one open for HTTP persistent connection: // Remove unused connections, but keep one open for HTTP persistent connection:
std::size_t unused_connections = 0; std::size_t unused_connections = 0;
for(auto it = this->connections.begin(); it != this->connections.end();) { for(auto it = this->connections.begin(); it != this->connections.end();) {
if(ec && connection == *it) if(ec && session->connection == *it)
it = this->connections.erase(it);
else if((*it)->in_use)
++it;
else {
++unused_connections;
if(unused_connections > 1)
it = this->connections.erase(it); it = this->connections.erase(it);
else else if((*it)->in_use)
++it; ++it;
else {
++unused_connections;
if(unused_connections > 1)
it = this->connections.erase(it);
else
++it;
}
} }
} }
}
if(*request_callback) if(*request_callback)
(*request_callback)(response, ec); (*request_callback)(session->response, ec);
}
}; };
content.seekg(0, std::ios::end); content.seekg(0, std::ios::end);
@ -442,7 +446,7 @@ namespace SimpleWeb {
if(!ec) if(!ec)
this->read(session); this->read(session);
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
@ -454,7 +458,7 @@ namespace SimpleWeb {
if(!lock) if(!lock)
return; return;
if((!ec || ec == asio::error::not_found) && session->response->streambuf.size() == session->response->streambuf.max_size()) { if((!ec || ec == asio::error::not_found) && session->response->streambuf.size() == session->response->streambuf.max_size()) {
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; return;
} }
if(!ec) { if(!ec) {
@ -462,7 +466,7 @@ namespace SimpleWeb {
std::size_t num_additional_bytes = session->response->streambuf.size() - bytes_transferred; std::size_t num_additional_bytes = session->response->streambuf.size() - bytes_transferred;
if(!ResponseMessage::parse(session->response->content, session->response->http_version, session->response->status_code, session->response->header)) { if(!ResponseMessage::parse(session->response->content, session->response->http_version, session->response->status_code, session->response->header)) {
session->callback(session->connection, make_error_code::make_error_code(errc::protocol_error)); session->callback(make_error_code::make_error_code(errc::protocol_error));
return; return;
} }
@ -478,17 +482,17 @@ namespace SimpleWeb {
return; return;
if(!ec) { if(!ec) {
if(session->response->streambuf.size() == session->response->streambuf.max_size()) { if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; return;
} }
session->callback(session->connection, ec); session->callback(ec);
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
session->callback(session->connection, ec); session->callback(ec);
} }
else if((header_it = session->response->header.find("Transfer-Encoding")) != session->response->header.end() && header_it->second == "chunked") { else if((header_it = session->response->header.find("Transfer-Encoding")) != session->response->header.end() && header_it->second == "chunked") {
auto chunks_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size); auto chunks_streambuf = std::make_shared<asio::streambuf>(this->config.max_response_streambuf_size);
@ -501,19 +505,19 @@ namespace SimpleWeb {
auto lock = session->connection->handler_runner->continue_lock(); auto lock = session->connection->handler_runner->continue_lock();
if(!lock) if(!lock)
return; return;
session->connection = nullptr; // Disconnect
if(!ec) { if(!ec) {
if(session->response->streambuf.size() == session->response->streambuf.max_size()) { if(session->response->streambuf.size() == session->response->streambuf.max_size())
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; else
} session->callback(ec);
session->callback(session->connection, ec);
} }
else else
session->callback(session->connection, ec == asio::error::eof ? error_code() : ec); session->callback(ec == asio::error::eof ? error_code() : ec);
}); });
} }
else else
session->callback(session->connection, ec); session->callback(ec);
} }
else { else {
if(session->connection->attempt_reconnect && ec != asio::error::operation_aborted) { if(session->connection->attempt_reconnect && ec != asio::error::operation_aborted) {
@ -531,11 +535,11 @@ namespace SimpleWeb {
} }
else { else {
lock.unlock(); lock.unlock();
session->callback(session->connection, ec); session->callback(ec);
} }
} }
else else
session->callback(session->connection, ec); session->callback(ec);
} }
}); });
} }
@ -548,7 +552,7 @@ namespace SimpleWeb {
if(!lock) if(!lock)
return; return;
if((!ec || ec == asio::error::not_found) && session->response->streambuf.size() == session->response->streambuf.max_size()) { if((!ec || ec == asio::error::not_found) && session->response->streambuf.size() == session->response->streambuf.max_size()) {
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; return;
} }
if(!ec) { if(!ec) {
@ -561,7 +565,7 @@ namespace SimpleWeb {
length = stoul(line, 0, 16); length = stoul(line, 0, 16);
} }
catch(...) { catch(...) {
session->callback(session->connection, make_error_code::make_error_code(errc::protocol_error)); session->callback(make_error_code::make_error_code(errc::protocol_error));
return; return;
} }
@ -576,20 +580,20 @@ namespace SimpleWeb {
return; return;
if(!ec) { if(!ec) {
if(session->response->streambuf.size() == session->response->streambuf.max_size()) { if(session->response->streambuf.size() == session->response->streambuf.max_size()) {
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; return;
} }
this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length); this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length);
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length); this->read_chunked_transfer_encoded_chunk(session, chunks_streambuf, length);
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
@ -600,7 +604,7 @@ namespace SimpleWeb {
session->response->content.read(buffer.get(), static_cast<std::streamsize>(length)); session->response->content.read(buffer.get(), static_cast<std::streamsize>(length));
tmp_stream.write(buffer.get(), static_cast<std::streamsize>(length)); tmp_stream.write(buffer.get(), static_cast<std::streamsize>(length));
if(chunks_streambuf->size() == chunks_streambuf->max_size()) { if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; return;
} }
} }
@ -617,7 +621,7 @@ namespace SimpleWeb {
ostream << chunks_streambuf.get(); ostream << chunks_streambuf.get();
} }
error_code ec; error_code ec;
session->callback(session->connection, ec); session->callback(ec);
} }
} }
}; };
@ -660,11 +664,11 @@ namespace SimpleWeb {
this->write(session); this->write(session);
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else

View file

@ -84,36 +84,36 @@ namespace SimpleWeb {
if(!lock) if(!lock)
return; return;
if((!ec || ec == asio::error::not_found) && response->streambuf.size() == response->streambuf.max_size()) { if((!ec || ec == asio::error::not_found) && response->streambuf.size() == response->streambuf.max_size()) {
session->callback(session->connection, make_error_code::make_error_code(errc::message_size)); session->callback(make_error_code::make_error_code(errc::message_size));
return; return;
} }
if(!ec) { if(!ec) {
if(!ResponseMessage::parse(response->content, response->http_version, response->status_code, response->header)) if(!ResponseMessage::parse(response->content, response->http_version, response->status_code, response->header))
session->callback(session->connection, make_error_code::make_error_code(errc::protocol_error)); session->callback(make_error_code::make_error_code(errc::protocol_error));
else { else {
if(response->status_code.compare(0, 3, "200") != 0) if(response->status_code.compare(0, 3, "200") != 0)
session->callback(session->connection, make_error_code::make_error_code(errc::permission_denied)); session->callback(make_error_code::make_error_code(errc::permission_denied));
else else
this->handshake(session); this->handshake(session);
} }
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
this->handshake(session); this->handshake(session);
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
else else
@ -132,7 +132,7 @@ namespace SimpleWeb {
if(!ec) if(!ec)
this->write(session); this->write(session);
else else
session->callback(session->connection, ec); session->callback(ec);
}); });
} }
}; };