New asio: removed use of strand, and fixed steady_timer constructor call

This commit is contained in:
eidheim 2019-05-22 11:50:18 +00:00 committed by eidheim
commit 1b5f062678
3 changed files with 32 additions and 42 deletions

View file

@ -45,10 +45,10 @@ namespace SimpleWeb {
std::shared_ptr<Session> session;
long timeout_content;
io_context::strand strand;
std::mutex send_queue_mutex;
std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue;
Response(std::shared_ptr<Session> session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content), strand(get_socket_context(*session->connection->socket)) {
Response(std::shared_ptr<Session> session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content) {
rdbuf(streambuf.get());
}
@ -70,30 +70,30 @@ namespace SimpleWeb {
*this << "\r\n";
}
/// send_queue_mutex must be locked here
void send_from_queue() {
auto self = this->shared_from_this();
SimpleWeb::post(strand, [self]() {
asio::async_write(*self->session->connection->socket, *self->send_queue.begin()->first, SimpleWeb::bind_executor(self->strand, [self](const error_code &ec, std::size_t /*bytes_transferred*/) {
auto lock = self->session->connection->handler_runner->continue_lock();
if(!lock)
return;
if(!ec) {
auto it = self->send_queue.begin();
if(it->second)
it->second(ec);
self->send_queue.erase(it);
if(self->send_queue.size() > 0)
self->send_from_queue();
asio::async_write(*self->session->connection->socket, *send_queue.begin()->first, [self](const error_code &ec, std::size_t /*bytes_transferred*/) {
auto lock = self->session->connection->handler_runner->continue_lock();
if(!lock)
return;
std::lock_guard<std::mutex> send_queue_lock(self->send_queue_mutex);
if(!ec) {
auto it = self->send_queue.begin();
if(it->second)
it->second(ec);
self->send_queue.erase(it);
if(self->send_queue.size() > 0)
self->send_from_queue();
}
else {
// All handlers in the queue is called with ec:
for(auto &pair : self->send_queue) {
if(pair.second)
pair.second(ec);
}
else {
// All handlers in the queue is called with ec:
for(auto &pair : self->send_queue) {
if(pair.second)
pair.second(ec);
}
self->send_queue.clear();
}
}));
self->send_queue.clear();
}
});
}
@ -123,12 +123,10 @@ namespace SimpleWeb {
this->streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf());
rdbuf(this->streambuf.get());
auto self = this->shared_from_this();
SimpleWeb::post(strand, [self, streambuf, callback]() {
self->send_queue.emplace_back(streambuf, callback);
if(self->send_queue.size() == 1)
self->send_from_queue();
});
std::lock_guard<std::mutex> lock(send_queue_mutex);
send_queue.emplace_back(streambuf, callback);
if(send_queue.size() == 1)
send_from_queue();
}
/// Write directly to stream buffer using std::ostream::write
@ -278,7 +276,7 @@ namespace SimpleWeb {
return;
}
timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(get_socket_context(*socket), std::chrono::seconds(seconds)));
timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(get_socket_executor(*socket), std::chrono::seconds(seconds)));
auto self = this->shared_from_this();
timer->async_wait([self](const error_code &ec) {
if(!ec)