Made use of clang's Thread Safety Analysis
This commit is contained in:
parent
39fb720aa9
commit
a743915d72
5 changed files with 153 additions and 40 deletions
|
|
@ -2,13 +2,13 @@
|
|||
#define SERVER_HTTP_HPP
|
||||
|
||||
#include "asio_compatibility.hpp"
|
||||
#include "mutex.hpp"
|
||||
#include "utility.hpp"
|
||||
#include <functional>
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <list>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <unordered_set>
|
||||
|
|
@ -45,8 +45,8 @@ namespace SimpleWeb {
|
|||
std::shared_ptr<Session> session;
|
||||
long timeout_content;
|
||||
|
||||
std::mutex send_queue_mutex;
|
||||
std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue;
|
||||
Mutex send_queue_mutex;
|
||||
std::list<std::pair<std::shared_ptr<asio::streambuf>, std::function<void(const error_code &)>>> send_queue GUARDED_BY(send_queue_mutex);
|
||||
|
||||
Response(std::shared_ptr<Session> session_, long timeout_content) noexcept : std::ostream(nullptr), session(std::move(session_)), timeout_content(timeout_content) {
|
||||
rdbuf(streambuf.get());
|
||||
|
|
@ -70,15 +70,14 @@ namespace SimpleWeb {
|
|||
*this << "\r\n";
|
||||
}
|
||||
|
||||
/// send_queue_mutex must be locked here
|
||||
void send_from_queue() {
|
||||
void send_from_queue() REQUIRES(send_queue_mutex) {
|
||||
auto self = this->shared_from_this();
|
||||
asio::async_write(*self->session->connection->socket, *send_queue.begin()->first, [self](const error_code &ec, std::size_t /*bytes_transferred*/) {
|
||||
auto lock = self->session->connection->handler_runner->continue_lock();
|
||||
if(!lock)
|
||||
return;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(self->send_queue_mutex);
|
||||
LockGuard lock(self->send_queue_mutex);
|
||||
if(!ec) {
|
||||
auto it = self->send_queue.begin();
|
||||
auto callback = std::move(it->second);
|
||||
|
|
@ -133,7 +132,7 @@ namespace SimpleWeb {
|
|||
this->streambuf = std::unique_ptr<asio::streambuf>(new asio::streambuf());
|
||||
rdbuf(this->streambuf.get());
|
||||
|
||||
std::lock_guard<std::mutex> lock(send_queue_mutex);
|
||||
LockGuard lock(send_queue_mutex);
|
||||
send_queue.emplace_back(streambuf, callback);
|
||||
if(send_queue.size() == 1)
|
||||
send_from_queue();
|
||||
|
|
@ -451,10 +450,10 @@ namespace SimpleWeb {
|
|||
acceptor->close(ec);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(*connections_mutex);
|
||||
for(auto &connection : *connections)
|
||||
LockGuard lock(connections->mutex);
|
||||
for(auto &connection : connections->set)
|
||||
connection->close();
|
||||
connections->clear();
|
||||
connections->set.clear();
|
||||
}
|
||||
|
||||
if(internal_io_service)
|
||||
|
|
@ -473,12 +472,15 @@ namespace SimpleWeb {
|
|||
std::unique_ptr<asio::ip::tcp::acceptor> acceptor;
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
std::shared_ptr<std::unordered_set<Connection *>> connections;
|
||||
std::shared_ptr<std::mutex> connections_mutex;
|
||||
struct Connections {
|
||||
Mutex mutex;
|
||||
std::unordered_set<Connection *> set GUARDED_BY(mutex);
|
||||
};
|
||||
std::shared_ptr<Connections> connections;
|
||||
|
||||
std::shared_ptr<ScopeRunner> handler_runner;
|
||||
|
||||
ServerBase(unsigned short port) noexcept : config(port), connections(new std::unordered_set<Connection *>()), connections_mutex(new std::mutex()), handler_runner(new ScopeRunner()) {}
|
||||
ServerBase(unsigned short port) noexcept : config(port), connections(new Connections()), handler_runner(new ScopeRunner()) {}
|
||||
|
||||
virtual void after_bind() {}
|
||||
virtual void accept() = 0;
|
||||
|
|
@ -486,19 +488,18 @@ namespace SimpleWeb {
|
|||
template <typename... Args>
|
||||
std::shared_ptr<Connection> create_connection(Args &&... args) noexcept {
|
||||
auto connections = this->connections;
|
||||
auto connections_mutex = this->connections_mutex;
|
||||
auto connection = std::shared_ptr<Connection>(new Connection(handler_runner, std::forward<Args>(args)...), [connections, connections_mutex](Connection *connection) {
|
||||
auto connection = std::shared_ptr<Connection>(new Connection(handler_runner, std::forward<Args>(args)...), [connections](Connection *connection) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(*connections_mutex);
|
||||
auto it = connections->find(connection);
|
||||
if(it != connections->end())
|
||||
connections->erase(it);
|
||||
LockGuard lock(connections->mutex);
|
||||
auto it = connections->set.find(connection);
|
||||
if(it != connections->set.end())
|
||||
connections->set.erase(it);
|
||||
}
|
||||
delete connection;
|
||||
});
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(*connections_mutex);
|
||||
connections->emplace(connection.get());
|
||||
LockGuard lock(connections->mutex);
|
||||
connections->set.emplace(connection.get());
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
|
@ -684,10 +685,10 @@ namespace SimpleWeb {
|
|||
if(it != session->request->header.end()) {
|
||||
// remove connection from connections
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(*connections_mutex);
|
||||
auto it = connections->find(session->connection.get());
|
||||
if(it != connections->end())
|
||||
connections->erase(it);
|
||||
LockGuard lock(connections->mutex);
|
||||
auto it = connections->set.find(session->connection.get());
|
||||
if(it != connections->set.end())
|
||||
connections->set.erase(it);
|
||||
}
|
||||
|
||||
on_upgrade(session->connection->socket, session->request);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue