Push live session activity over WebSocket with polling fallback
Some checks failed
ci-bundle.yml / Push live session activity over WebSocket with polling fallback (push) Failing after 0s
ci-copr.yml / Push live session activity over WebSocket with polling fallback (push) Failing after 0s
ci-homebrew.yml / Push live session activity over WebSocket with polling fallback (push) Failing after 0s

This commit is contained in:
Joey Yakimowich-Payne 2026-02-11 16:02:49 -07:00
commit 54afa9bb67
2 changed files with 320 additions and 13 deletions

View file

@ -7,10 +7,18 @@
#define BOOST_BIND_GLOBAL_PLACEHOLDERS #define BOOST_BIND_GLOBAL_PLACEHOLDERS
// standard includes // standard includes
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <filesystem> #include <filesystem>
#include <format> #include <format>
#include <fstream> #include <fstream>
#include <mutex>
#include <set> #include <set>
#include <thread>
#include <unordered_map>
#include <vector>
// lib includes // lib includes
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
@ -19,6 +27,8 @@
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
#include <Simple-Web-Server/crypto.hpp> #include <Simple-Web-Server/crypto.hpp>
#include <Simple-Web-Server/server_https.hpp> #include <Simple-Web-Server/server_https.hpp>
#include <openssl/evp.h>
#include <openssl/sha.h>
#ifdef _WIN32 #ifdef _WIN32
#include "platform/windows/misc.h" #include "platform/windows/misc.h"
@ -55,6 +65,119 @@ namespace confighttp {
using resp_https_t = std::shared_ptr<typename SimpleWeb::ServerBase<SimpleWeb::HTTPS>::Response>; using resp_https_t = std::shared_ptr<typename SimpleWeb::ServerBase<SimpleWeb::HTTPS>::Response>;
using req_https_t = std::shared_ptr<typename SimpleWeb::ServerBase<SimpleWeb::HTTPS>::Request>; using req_https_t = std::shared_ptr<typename SimpleWeb::ServerBase<SimpleWeb::HTTPS>::Request>;
namespace {
struct ws_client_t {
std::unique_ptr<SimpleWeb::HTTPS> socket;
std::mutex write_mutex;
std::atomic_bool alive {true};
};
constexpr std::chrono::seconds WS_TOKEN_TTL {30};
constexpr std::chrono::milliseconds WS_PUSH_INTERVAL {250};
constexpr std::string_view WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
std::mutex ws_clients_mutex;
std::vector<std::shared_ptr<ws_client_t>> ws_clients;
std::mutex ws_tokens_mutex;
std::unordered_map<std::string, std::chrono::steady_clock::time_point> ws_tokens;
void cleanup_ws_tokens(const std::chrono::steady_clock::time_point &now) {
for (auto it = ws_tokens.begin(); it != ws_tokens.end();) {
if (it->second <= now) {
it = ws_tokens.erase(it);
} else {
++it;
}
}
}
std::string create_ws_token() {
auto now = std::chrono::steady_clock::now();
auto token = uuid_util::uuid_t::generate().string();
std::lock_guard lg {ws_tokens_mutex};
cleanup_ws_tokens(now);
ws_tokens[token] = now + WS_TOKEN_TTL;
return token;
}
bool consume_ws_token(std::string_view token) {
auto now = std::chrono::steady_clock::now();
std::lock_guard lg {ws_tokens_mutex};
cleanup_ws_tokens(now);
auto it = ws_tokens.find(std::string(token));
if (it == ws_tokens.end()) {
return false;
}
ws_tokens.erase(it);
return true;
}
std::string websocket_accept_key(std::string_view key) {
std::string input;
input.reserve(key.size() + WS_GUID.size());
input.append(key);
input.append(WS_GUID);
std::array<unsigned char, SHA_DIGEST_LENGTH> digest {};
SHA1(reinterpret_cast<const unsigned char *>(input.data()), input.size(), digest.data());
std::array<unsigned char, 4 * ((SHA_DIGEST_LENGTH + 2) / 3) + 1> encoded {};
auto len = EVP_EncodeBlock(encoded.data(), digest.data(), digest.size());
return std::string(reinterpret_cast<const char *>(encoded.data()), len);
}
std::string websocket_text_frame(const std::string &payload) {
std::string frame;
frame.reserve(payload.size() + 10);
frame.push_back(static_cast<char>(0x81));
auto len = payload.size();
if (len <= 125) {
frame.push_back(static_cast<char>(len));
} else if (len <= 0xFFFF) {
frame.push_back(static_cast<char>(126));
frame.push_back(static_cast<char>((len >> 8) & 0xFF));
frame.push_back(static_cast<char>(len & 0xFF));
} else {
frame.push_back(static_cast<char>(127));
for (int i = 7; i >= 0; --i) {
frame.push_back(static_cast<char>((len >> (i * 8)) & 0xFF));
}
}
frame.append(payload);
return frame;
}
void ws_write_http_response(std::unique_ptr<SimpleWeb::HTTPS> &socket, SimpleWeb::StatusCode code, std::string_view body) {
if (!socket) {
return;
}
std::string response;
response.reserve(256 + body.size());
response.append("HTTP/1.1 ");
response.append(SimpleWeb::status_code(code));
response.append("\r\nContent-Type: application/json\r\n");
response.append("Connection: close\r\n");
response.append("Content-Length: ");
response.append(std::to_string(body.size()));
response.append("\r\n\r\n");
response.append(body);
boost::system::error_code ec;
boost::asio::write(*socket, boost::asio::buffer(response), ec);
socket->lowest_layer().close(ec);
}
}
enum class op_e { enum class op_e {
ADD, ///< Add client ADD, ///< Add client
REMOVE ///< Remove client REMOVE ///< Remove client
@ -137,6 +260,24 @@ namespace confighttp {
response->write(SimpleWeb::StatusCode::redirection_temporary_redirect, headers); response->write(SimpleWeb::StatusCode::redirection_temporary_redirect, headers);
} }
bool authenticate_header(std::string_view raw_auth) {
if (raw_auth.size() < "Basic "sv.size() || raw_auth.substr(0, "Basic "sv.size()) != "Basic "sv) {
return false;
}
auto auth_data = SimpleWeb::Crypto::Base64::decode(std::string(raw_auth.substr("Basic "sv.length())));
auto index = (int) auth_data.find(':');
if (index >= auth_data.size() - 1) {
return false;
}
auto username = auth_data.substr(0, index);
auto password = auth_data.substr(index + 1);
auto hash = util::hex(crypto::hash(password + config::sunshine.salt)).to_string();
return boost::iequals(username, config::sunshine.username) && hash == config::sunshine.password;
}
/** /**
* @brief Authenticate the user. * @brief Authenticate the user.
* @param response The HTTP response object. * @param response The HTTP response object.
@ -168,19 +309,7 @@ namespace confighttp {
return false; return false;
} }
auto &rawAuth = auth->second; if (!authenticate_header(auth->second)) {
auto authData = SimpleWeb::Crypto::Base64::decode(rawAuth.substr("Basic "sv.length()));
auto index = (int) authData.find(':');
if (index >= authData.size() - 1) {
return false;
}
auto username = authData.substr(0, index);
auto password = authData.substr(index + 1);
auto hash = util::hex(crypto::hash(password + config::sunshine.salt)).to_string();
if (!boost::iequals(username, config::sunshine.username) || hash != config::sunshine.password) {
return false; return false;
} }
@ -819,6 +948,19 @@ namespace confighttp {
send_response(response, output_tree); send_response(response, output_tree);
} }
void getActiveSessionsWsToken(resp_https_t response, req_https_t request) {
if (!authenticate(response, request)) {
return;
}
print_req(request);
nlohmann::json output_tree;
output_tree["token"] = create_ws_token();
output_tree["status"] = true;
send_response(response, output_tree);
}
/** /**
* @brief Unpair a client. * @brief Unpair a client.
* @param response The HTTP response object. * @param response The HTTP response object.
@ -1469,12 +1611,72 @@ namespace confighttp {
server.resource["^/api/clients/list$"]["GET"] = getClients; server.resource["^/api/clients/list$"]["GET"] = getClients;
server.resource["^/api/clients/unpair$"]["POST"] = unpair; server.resource["^/api/clients/unpair$"]["POST"] = unpair;
server.resource["^/api/sessions/active$"]["GET"] = getActiveSessions; server.resource["^/api/sessions/active$"]["GET"] = getActiveSessions;
server.resource["^/api/sessions/ws-token$"]["GET"] = getActiveSessionsWsToken;
server.resource["^/api/apps/close$"]["POST"] = closeApp; server.resource["^/api/apps/close$"]["POST"] = closeApp;
server.resource["^/api/covers/upload$"]["POST"] = uploadCover; server.resource["^/api/covers/upload$"]["POST"] = uploadCover;
server.resource["^/api/covers/([0-9]+)$"]["GET"] = getCover; server.resource["^/api/covers/([0-9]+)$"]["GET"] = getCover;
server.resource["^/images/sunshine.ico$"]["GET"] = getFaviconImage; server.resource["^/images/sunshine.ico$"]["GET"] = getFaviconImage;
server.resource["^/images/logo-sunshine-45.png$"]["GET"] = getSunshineLogoImage; server.resource["^/images/logo-sunshine-45.png$"]["GET"] = getSunshineLogoImage;
server.resource["^/assets\\/.+$"]["GET"] = getNodeModules; server.resource["^/assets\\/.+$"]["GET"] = getNodeModules;
server.on_upgrade = [](std::unique_ptr<SimpleWeb::HTTPS> &socket, req_https_t request) {
if (!socket || request->path != "/api/sessions/active/ws") {
ws_write_http_response(socket, SimpleWeb::StatusCode::client_error_not_found, R"({"status":false})");
return;
}
auto address = net::addr_to_normalized_string(request->remote_endpoint().address());
if (net::from_address(address) > http::origin_web_ui_allowed) {
ws_write_http_response(socket, SimpleWeb::StatusCode::client_error_forbidden, R"({"status":false})");
return;
}
if (config::sunshine.username.empty()) {
ws_write_http_response(socket, SimpleWeb::StatusCode::client_error_unauthorized, R"({"status":false})");
return;
}
auto auth = request->header.find("Authorization");
if (auth != request->header.end() && !authenticate_header(auth->second)) {
ws_write_http_response(socket, SimpleWeb::StatusCode::client_error_unauthorized, R"({"status":false})");
return;
}
auto query = request->parse_query_string();
auto token_it = query.find("token");
if (token_it == query.end() || !consume_ws_token(token_it->second)) {
ws_write_http_response(socket, SimpleWeb::StatusCode::client_error_forbidden, R"({"status":false})");
return;
}
auto key_it = request->header.find("Sec-WebSocket-Key");
if (key_it == request->header.end()) {
ws_write_http_response(socket, SimpleWeb::StatusCode::client_error_bad_request, R"({"status":false})");
return;
}
std::string response;
response.reserve(256);
response.append("HTTP/1.1 101 Switching Protocols\r\n");
response.append("Upgrade: websocket\r\n");
response.append("Connection: Upgrade\r\n");
response.append("Sec-WebSocket-Accept: ");
response.append(websocket_accept_key(key_it->second));
response.append("\r\n\r\n");
boost::system::error_code ec;
boost::asio::write(*socket, boost::asio::buffer(response), ec);
if (ec) {
socket->lowest_layer().close(ec);
return;
}
auto client = std::make_shared<ws_client_t>();
client->socket = std::move(socket);
std::lock_guard lg {ws_clients_mutex};
ws_clients.emplace_back(std::move(client));
};
server.config.reuse_address = true; server.config.reuse_address = true;
server.config.address = net::get_bind_address(address_family); server.config.address = net::get_bind_address(address_family);
server.config.port = port_https; server.config.port = port_https;
@ -1498,11 +1700,58 @@ namespace confighttp {
}; };
std::thread tcp {accept_and_run, &server}; std::thread tcp {accept_and_run, &server};
std::thread ws_broadcast {[&] {
platf::set_thread_name("confighttp::ws");
while (!shutdown_event->peek()) {
std::vector<std::shared_ptr<ws_client_t>> clients;
{
std::lock_guard lg {ws_clients_mutex};
clients = ws_clients;
}
if (!clients.empty()) {
nlohmann::json output_tree;
output_tree["sessions"] = stream::get_active_sessions_info();
output_tree["status"] = true;
auto frame = websocket_text_frame(output_tree.dump());
for (auto &client : clients) {
if (!client->alive.load(std::memory_order_relaxed)) {
continue;
}
std::lock_guard write_lg {client->write_mutex};
if (!client->socket) {
client->alive.store(false, std::memory_order_relaxed);
continue;
}
boost::system::error_code ec;
boost::asio::write(*client->socket, boost::asio::buffer(frame), ec);
if (ec) {
client->alive.store(false, std::memory_order_relaxed);
client->socket->lowest_layer().close(ec);
}
}
std::lock_guard lg {ws_clients_mutex};
ws_clients.erase(std::remove_if(std::begin(ws_clients), std::end(ws_clients), [](const auto &client) {
return !client->alive.load(std::memory_order_relaxed);
}), std::end(ws_clients));
}
std::this_thread::sleep_for(WS_PUSH_INTERVAL);
}
}};
// Wait for any event // Wait for any event
shutdown_event->view(); shutdown_event->view();
server.stop(); server.stop();
ws_broadcast.join();
tcp.join(); tcp.join();
} }
} // namespace confighttp } // namespace confighttp

View file

@ -282,6 +282,8 @@
logFilter: null, logFilter: null,
logInterval: null, logInterval: null,
sessionInterval: null, sessionInterval: null,
sessionSocket: null,
sessionReconnectTimer: null,
restartPressed: false, restartPressed: false,
showApplyMessage: false, showApplyMessage: false,
platform: "", platform: "",
@ -440,13 +442,69 @@
this.refreshLogs(); this.refreshLogs();
this.refreshClients(); this.refreshClients();
this.refreshActiveSessions(); this.refreshActiveSessions();
this.connectActiveSessionsSocket();
}, },
beforeDestroy() { beforeDestroy() {
clearInterval(this.logInterval); clearInterval(this.logInterval);
clearInterval(this.sessionInterval); clearInterval(this.sessionInterval);
clearTimeout(this.sessionReconnectTimer);
if (this.sessionSocket) {
this.sessionSocket.onclose = null;
this.sessionSocket.close();
this.sessionSocket = null;
}
}, },
methods: { methods: {
connectActiveSessionsSocket() {
fetch("./api/sessions/ws-token")
.then((r) => r.json())
.then((r) => {
if (!r || r.status !== true || !r.token) {
throw new Error("No websocket token");
}
const protocol = window.location.protocol === "https:" ? "wss" : "ws";
const wsUrl = `${protocol}://${window.location.host}/api/sessions/active/ws?token=${encodeURIComponent(r.token)}`;
this.sessionSocket = new WebSocket(wsUrl);
this.sessionSocket.onmessage = (event) => {
try {
const payload = JSON.parse(event.data);
if (payload && payload.status === true && payload.sessions) {
this.activeSessions = payload.sessions;
}
} catch (_e) {
return;
}
};
this.sessionSocket.onclose = () => {
this.sessionSocket = null;
clearTimeout(this.sessionReconnectTimer);
this.sessionReconnectTimer = setTimeout(() => {
this.connectActiveSessionsSocket();
}, 1000);
};
this.sessionSocket.onerror = () => {
if (this.sessionSocket) {
this.sessionSocket.close();
}
};
})
.catch(() => {
clearTimeout(this.sessionReconnectTimer);
this.sessionReconnectTimer = setTimeout(() => {
this.connectActiveSessionsSocket();
}, 1000);
});
},
refreshActiveSessions() { refreshActiveSessions() {
if (this.sessionSocket && this.sessionSocket.readyState === WebSocket.OPEN) {
return;
}
fetch("./api/sessions/active") fetch("./api/sessions/active")
.then((r) => r.json()) .then((r) => r.json())
.then((r) => { .then((r) => {