Merge branch 'sse' of https://gitlab.com/eidheim/Simple-Web-Server
This commit is contained in:
commit
69b34bc7c2
2 changed files with 74 additions and 14 deletions
|
|
@ -5,6 +5,7 @@
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include <list>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
|
@ -57,12 +58,17 @@ namespace SimpleWeb {
|
||||||
friend class ServerBase<socket_type>;
|
friend class ServerBase<socket_type>;
|
||||||
friend class Server<socket_type>;
|
friend class Server<socket_type>;
|
||||||
|
|
||||||
asio::streambuf streambuf;
|
std::unique_ptr<asio::streambuf> streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf());
|
||||||
|
|
||||||
std::shared_ptr<Session> session;
|
std::shared_ptr<Session> session;
|
||||||
long timeout_content;
|
long timeout_content;
|
||||||
|
|
||||||
Response(std::shared_ptr<Session> session, long timeout_content) noexcept : std::ostream(&streambuf), session(std::move(session)), timeout_content(timeout_content) {}
|
asio::io_service::strand strand;
|
||||||
|
std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue;
|
||||||
|
|
||||||
|
Response(std::shared_ptr<Session> session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content), strand(session->connection->socket->get_io_service()) {
|
||||||
|
rdbuf(streambuf.get());
|
||||||
|
}
|
||||||
|
|
||||||
template <typename size_type>
|
template <typename size_type>
|
||||||
void write_header(const CaseInsensitiveMultimap &header, size_type size) {
|
void write_header(const CaseInsensitiveMultimap &header, size_type size) {
|
||||||
|
|
@ -82,16 +88,31 @@ namespace SimpleWeb {
|
||||||
*this << "\r\n";
|
*this << "\r\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
void send_from_queue() {
|
||||||
std::size_t size() noexcept {
|
auto self = this->shared_from_this();
|
||||||
return streambuf.size();
|
strand.post([self]() {
|
||||||
|
asio::async_write(*self->session->connection->socket, *self->send_queue.begin()->first, self->strand.wrap([self](const error_code &ec, std::size_t /*bytes_transferred*/) {
|
||||||
|
auto lock = self->session->connection->handler_runner->continue_lock();
|
||||||
|
if(!lock)
|
||||||
|
return;
|
||||||
|
auto it = self->send_queue.begin();
|
||||||
|
if(it->second)
|
||||||
|
it->second(ec);
|
||||||
|
if(!ec) {
|
||||||
|
self->send_queue.erase(it);
|
||||||
|
if(self->send_queue.size() > 0)
|
||||||
|
self->send_from_queue();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
self->send_queue.clear();
|
||||||
|
}));
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Use this function if you need to recursively send parts of a longer message
|
void send_on_delete(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
|
||||||
void send(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
|
|
||||||
session->connection->set_timeout(timeout_content);
|
session->connection->set_timeout(timeout_content);
|
||||||
auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
|
auto self = this->shared_from_this(); // Keep Response instance alive through the following async_write
|
||||||
asio::async_write(*session->connection->socket, streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) {
|
asio::async_write(*session->connection->socket, *streambuf, [self, callback](const error_code &ec, std::size_t /*bytes_transferred*/) {
|
||||||
self->session->connection->cancel_timeout();
|
self->session->connection->cancel_timeout();
|
||||||
auto lock = self->session->connection->handler_runner->continue_lock();
|
auto lock = self->session->connection->handler_runner->continue_lock();
|
||||||
if(!lock)
|
if(!lock)
|
||||||
|
|
@ -101,6 +122,27 @@ namespace SimpleWeb {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
std::size_t size() noexcept {
|
||||||
|
return streambuf->size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Use this function if you need to recursively send parts of a longer message, or when using server-sent events (SSE).
|
||||||
|
void send(const std::function<void(const error_code &)> &callback = nullptr) noexcept {
|
||||||
|
session->connection->set_timeout(timeout_content);
|
||||||
|
|
||||||
|
std::shared_ptr<asio::streambuf> streambuf = std::move(this->streambuf);
|
||||||
|
this->streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf());
|
||||||
|
rdbuf(this->streambuf.get());
|
||||||
|
|
||||||
|
auto self = this->shared_from_this();
|
||||||
|
strand.post([self, streambuf, callback]() {
|
||||||
|
self->send_queue.emplace_back(streambuf, callback);
|
||||||
|
if(self->send_queue.size() == 1)
|
||||||
|
self->send_from_queue();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/// Write directly to stream buffer using std::ostream::write
|
/// Write directly to stream buffer using std::ostream::write
|
||||||
void write(const char_type *ptr, std::streamsize n) {
|
void write(const char_type *ptr, std::streamsize n) {
|
||||||
std::ostream::write(ptr, n);
|
std::ostream::write(ptr, n);
|
||||||
|
|
@ -465,7 +507,6 @@ namespace SimpleWeb {
|
||||||
if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
||||||
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
||||||
response->write(StatusCode::client_error_payload_too_large);
|
response->write(StatusCode::client_error_payload_too_large);
|
||||||
response->send();
|
|
||||||
if(this->on_error)
|
if(this->on_error)
|
||||||
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
||||||
return;
|
return;
|
||||||
|
|
@ -507,7 +548,6 @@ namespace SimpleWeb {
|
||||||
if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
||||||
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
||||||
response->write(StatusCode::client_error_payload_too_large);
|
response->write(StatusCode::client_error_payload_too_large);
|
||||||
response->send();
|
|
||||||
if(this->on_error)
|
if(this->on_error)
|
||||||
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
||||||
return;
|
return;
|
||||||
|
|
@ -543,7 +583,6 @@ namespace SimpleWeb {
|
||||||
if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
if((!ec || ec == asio::error::not_found) && session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
||||||
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
||||||
response->write(StatusCode::client_error_payload_too_large);
|
response->write(StatusCode::client_error_payload_too_large);
|
||||||
response->send();
|
|
||||||
if(this->on_error)
|
if(this->on_error)
|
||||||
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
||||||
return;
|
return;
|
||||||
|
|
@ -576,7 +615,6 @@ namespace SimpleWeb {
|
||||||
if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
if(session->request->streambuf.size() == session->request->streambuf.max_size()) {
|
||||||
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
||||||
response->write(StatusCode::client_error_payload_too_large);
|
response->write(StatusCode::client_error_payload_too_large);
|
||||||
response->send();
|
|
||||||
if(this->on_error)
|
if(this->on_error)
|
||||||
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
||||||
return;
|
return;
|
||||||
|
|
@ -604,7 +642,6 @@ namespace SimpleWeb {
|
||||||
if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
|
if(chunks_streambuf->size() == chunks_streambuf->max_size()) {
|
||||||
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
auto response = std::shared_ptr<Response>(new Response(session, this->config.timeout_content));
|
||||||
response->write(StatusCode::client_error_payload_too_large);
|
response->write(StatusCode::client_error_payload_too_large);
|
||||||
response->send();
|
|
||||||
if(this->on_error)
|
if(this->on_error)
|
||||||
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
this->on_error(session->request, make_error_code::make_error_code(errc::message_size));
|
||||||
return;
|
return;
|
||||||
|
|
@ -665,7 +702,7 @@ namespace SimpleWeb {
|
||||||
session->connection->set_timeout(config.timeout_content);
|
session->connection->set_timeout(config.timeout_content);
|
||||||
auto response = std::shared_ptr<Response>(new Response(session, config.timeout_content), [this](Response *response_ptr) {
|
auto response = std::shared_ptr<Response>(new Response(session, config.timeout_content), [this](Response *response_ptr) {
|
||||||
auto response = std::shared_ptr<Response>(response_ptr);
|
auto response = std::shared_ptr<Response>(response_ptr);
|
||||||
response->send([this, response](const error_code &ec) {
|
response->send_on_delete([this, response](const error_code &ec) {
|
||||||
if(!ec) {
|
if(!ec) {
|
||||||
if(response->close_connection_after_response)
|
if(response->close_connection_after_response)
|
||||||
return;
|
return;
|
||||||
|
|
|
||||||
|
|
@ -69,6 +69,20 @@ int main() {
|
||||||
assert(request->remote_endpoint_port() != 0);
|
assert(request->remote_endpoint_port() != 0);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
server.resource["^/string/dup$"]["POST"] = [](shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request) {
|
||||||
|
auto content = request->content.string();
|
||||||
|
|
||||||
|
// Send content twice, before it has a chance to be written to the socket.
|
||||||
|
*response << "HTTP/1.1 200 OK\r\nContent-Length: " << (content.length() * 2) << "\r\n\r\n"
|
||||||
|
<< content;
|
||||||
|
response->send();
|
||||||
|
*response << content;
|
||||||
|
response->send();
|
||||||
|
|
||||||
|
assert(!request->remote_endpoint_address().empty());
|
||||||
|
assert(request->remote_endpoint_port() != 0);
|
||||||
|
};
|
||||||
|
|
||||||
server.resource["^/string2$"]["POST"] = [](shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request) {
|
server.resource["^/string2$"]["POST"] = [](shared_ptr<HttpServer::Response> response, shared_ptr<HttpServer::Request> request) {
|
||||||
response->write(request->content.string());
|
response->write(request->content.string());
|
||||||
};
|
};
|
||||||
|
|
@ -202,6 +216,15 @@ int main() {
|
||||||
assert(output.str() == "A string");
|
assert(output.str() == "A string");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Test rapid calls to Response::send
|
||||||
|
stringstream output;
|
||||||
|
stringstream content("A string\n");
|
||||||
|
auto r = client.request("POST", "/string/dup", content);
|
||||||
|
output << r->content.rdbuf();
|
||||||
|
assert(output.str() == "A string\nA string\n");
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
stringstream output;
|
stringstream output;
|
||||||
auto r = client.request("GET", "/info", "", {{"Test Parameter", "test value"}});
|
auto r = client.request("GET", "/info", "", {{"Test Parameter", "test value"}});
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue