Completed support for multiple sequential Server::Response::send operations

This commit is contained in:
eidheim 2018-06-03 12:07:02 +02:00
commit cc234506b1

View file

@ -5,6 +5,7 @@
#include <functional>
#include <iostream>
#include <limits>
#include <list>
#include <map>
#include <mutex>
#include <sstream>
@ -57,12 +58,17 @@ namespace SimpleWeb {
friend class ServerBase<socket_type>;
friend class Server<socket_type>;
asio::streambuf streambuf;
std::unique_ptr<asio::streambuf> streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf());
std::shared_ptr<Session> session;
long timeout_content;
Response(std::shared_ptr<Session> session, long timeout_content) noexcept : std::ostream(&streambuf), session(std::move(session)), timeout_content(timeout_content) {}
asio::io_service::strand strand;
std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue;
Response(std::shared_ptr<Session> session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content), strand(session->connection->socket->get_io_service()) {
rdbuf(streambuf.get());
}
template <typename size_type>
void write_header(const CaseInsensitiveMultimap &header, size_type size) {
@ -82,24 +88,31 @@ namespace SimpleWeb {
*this << "\r\n";
}
public:
std::size_t size() noexcept {
return streambuf.size();
void send_from_queue() {
auto self = this->shared_from_this();
strand.post([self]() {
asio::async_write(*self->session->connection->socket, *self->send_queue.begin()->first, self->strand.wrap([self](const error_code &ec, std::size_t /*bytes_transferred*/) {
auto lock = self->session->connection->handler_runner->continue_lock();
if(!lock)
return;
auto it = self->send_queue.begin();
if(it->second)
it->second(ec);
if(!ec) {
self->send_queue.erase(it);
if(self->send_queue.size() > 0)
self->send_from_queue();
}
else
self->send_queue.clear();
}));
});
}
/// Use this function if you need to recursively send parts of a longer message
void send(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
// Take a snapshot of the stream buffer. Otherwise async_write may not
// have consumed the buffer before the next call, in which case the
// data from the first call may be sent twice. Use a captured shared
// pointer to keep the snapshot alive during the async call.
auto stream_snapshot = std::make_shared<asio::streambuf>();
std::ostream snapshot_writer(stream_snapshot.get());
snapshot_writer << &streambuf;
void send_on_delete(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
session->connection->set_timeout(timeout_content);
auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
asio::async_write(*session->connection->socket, *stream_snapshot, [self, callback, stream_snapshot](const error_code &ec, std::size_t /*bytes_transferred*/) {
asio::async_write(*session->connection->socket, *streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) {
self->session->connection->cancel_timeout();
auto lock = self->session->connection->handler_runner->continue_lock();
if(!lock)
@ -109,6 +122,27 @@ namespace SimpleWeb {
});
}
public:
std::size_t size() noexcept {
return streambuf->size();
}
/// Use this function if you need to recursively send parts of a longer message, or when using server-sent events (SSE).
void send(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
session->connection->set_timeout(timeout_content);
std::shared_ptr<asio::streambuf> streambuf = std::move(this->streambuf);
this->streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf());
rdbuf(this->streambuf.get());
auto self = this->shared_from_this();
strand.post([self, streambuf, callback]() {
self->send_queue.emplace_back(streambuf, callback);
if(self->send_queue.size() == 1)
self->send_from_queue();
});
}
/// Write directly to stream buffer using std::ostream::write
void write(const char_type *ptr, std::streamsize n) {
std::ostream::write(ptr, n);
@ -473,7 +507,6 @@ namespace SimpleWeb {
if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
response->write(StatusCode::client_error_payload_too_large);
response->send();
if(this->on_error)
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
return;
@ -515,7 +548,6 @@ namespace SimpleWeb {
if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
response->write(StatusCode::client_error_payload_too_large);
response->send();
if(this->on_error)
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
return;
@ -551,7 +583,6 @@ namespace SimpleWeb {
if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
response->write(StatusCode::client_error_payload_too_large);
response->send();
if(this->on_error)
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
return;
@ -584,7 +615,6 @@ namespace SimpleWeb {
if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
response->write(StatusCode::client_error_payload_too_large);
response->send();
if(this->on_error)
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
return;
@ -612,7 +642,6 @@ namespace SimpleWeb {
if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
response->write(StatusCode::client_error_payload_too_large);
response->send();
if(this->on_error)
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
return;
@ -673,7 +702,7 @@ namespace SimpleWeb {
session->connection->set_timeout(config.timeout_content);
auto response = std::shared_ptr<Response>(new Response(session, config.timeout_content), [this](Response *response_ptr) {
auto response = std::shared_ptr<Response>(response_ptr);
response->send([this, response](const error_code &ec) {
response->send_on_delete([this, response](const error_code &ec) {
if(!ec) {
if(response->close_connection_after_response)
return;