From 75d17a3d598ae31fc6c599ba9f866e81bfa200df Mon Sep 17 00:00:00 2001 From: loki Date: Sun, 8 Dec 2019 23:31:37 +0100 Subject: [PATCH] Fix potential deadlock --- sunshine/audio.h | 1 + sunshine/stream.cpp | 73 +++++++++++++-------------------------------- 2 files changed, 22 insertions(+), 52 deletions(-) diff --git a/sunshine/audio.h b/sunshine/audio.h index a1c348f9..eb64b0ed 100644 --- a/sunshine/audio.h +++ b/sunshine/audio.h @@ -11,6 +11,7 @@ struct config_t { }; using packet_t = util::buffer_t; +using packet_queue_t = std::shared_ptr>; void capture(std::shared_ptr> packets, config_t config); } diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 267c5659..906578e4 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -108,7 +108,9 @@ struct session_t { std::thread controlThread; std::chrono::steady_clock::time_point pingTimeout; - int client_state; + + video::packet_queue_t video_packets; + audio::packet_queue_t audio_packets; crypto::aes_t gcm_key; crypto::aes_t iv; @@ -411,7 +413,7 @@ void print_msg(PRTSP_MESSAGE msg) { } using frame_queue_t = std::vector; -video::packet_t next_packet(uint16_t &frame, std::shared_ptr> &packets, frame_queue_t &packet_queue) { +video::packet_t next_packet(uint16_t &frame, video::packet_queue_t &packets, frame_queue_t &packet_queue) { auto packet = packets->pop(); if(!packet) { @@ -440,27 +442,7 @@ video::packet_t next_packet(uint16_t &frame, std::shared_ptr replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) { - std::vector replaced; - auto search = [&](auto it) { - return std::search(it, std::end(original), std::begin(old), std::end(old)); - }; - - auto begin = std::begin(original); - for(auto next = search(begin); next != std::end(original); next = search(++next)) { - std::copy(begin, next, std::back_inserter(replaced)); - std::copy(std::begin(_new), std::end(_new), std::back_inserter(replaced)); - - next = begin = next + old.size(); - } - - std::copy(begin, std::end(original), std::back_inserter(replaced)); - - return replaced; -} -*/ std::vector replace(const std::string_view &original, const std::string_view &old, const std::string_view &_new) { std::vector replaced; @@ -547,7 +529,8 @@ void controlThread(video::event_queue_t idr_events) { // something went wrong :( std::cout << "failed to verify tag"sv << std::endl; - session.client_state = 0; + session.video_packets->stop(); + session.audio_packets->stop(); } if(tagged_cipher_length >= 16 + session.iv.size()) { @@ -558,9 +541,10 @@ void controlThread(video::event_queue_t idr_events) { input::passthrough(input, plaintext.data()); }); - while(session.client_state > 0) { + while(session.video_packets->running()) { if(std::chrono::steady_clock::now() > session.pingTimeout) { - session.client_state = 0; + session.video_packets->stop(); + session.audio_packets->stop(); } server.iterate(500ms); @@ -575,7 +559,7 @@ std::optional recv_peer(udp::socket &sock) { }; udp::endpoint peer; - while (session.client_state > 0) { + while (session.video_packets->running()) { asio::deadline_timer timer { EXECUTOR((&sock)) }; timer.expires_from_now(boost::posix_time::seconds(2)); timer.async_wait([&](sys::error_code c){ @@ -613,19 +597,12 @@ void audioThread() { return; } - std::shared_ptr> packets{new safe::queue_t}; - + auto &packets = session.audio_packets; std::thread captureThread{audio::capture, packets, config.audio}; uint16_t frame{1}; while (auto packet = packets->pop()) { - if(session.client_state == 0) { - packets->stop(); - - break; - } - audio_packet_t audio_packet { (audio_packet_raw_t*)malloc(sizeof(audio_packet_raw_t) + packet->size()) }; audio_packet->rtp.sequenceNumber = util::endian::big(frame++); @@ -652,20 +629,13 @@ void videoThread(video::event_queue_t idr_events) { return; } - video::packet_queue_t packets{new safe::queue_t}; - + auto &packets = session.video_packets; std::thread captureThread{video::capture_display, packets, idr_events, config.monitor}; frame_queue_t packet_queue; uint16_t frame{1}; while (auto packet = next_packet(frame, packets, packet_queue)) { - if(session.client_state == 0) { - packets->stop(); - - break; - } - std::string_view payload{(char *) packet->data, (size_t) packet->size}; std::vector payload_new; @@ -841,7 +811,7 @@ void cmd_setup(host_t &host, peer_t peer, msg_t &&req) { auto seqn_str = std::to_string(req->sequenceNumber); seqn.content = const_cast(seqn_str.c_str()); - if(session.client_state >= 0) { + if(session.video_packets) { // already streaming respond(host, peer, &seqn, 503, "Service Unavailable", req->sequenceNumber, {}); @@ -877,7 +847,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { auto seqn_str = std::to_string(req->sequenceNumber); option.content = const_cast(seqn_str.c_str()); - if(session.client_state >= 0) { + if(session.video_packets) { // already streaming respond(host, peer, &option, 503, "Service Unavailable", req->sequenceNumber, {}); @@ -942,9 +912,7 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { config.monitor.slicesPerFrame = util::from_view(args.at("x-nv-video[0].videoEncoderSlicesPerFrame"sv)); } catch(std::out_of_range &) { - // This piece of code is reached when for some reason, the payload length received < payload length send - // Not sure if this is an issue with Sunshine or Moonlight or the network - // TODO: find out + respond(host, peer, &option, 400, "BAD REQUEST", req->sequenceNumber, {}); return; } @@ -953,7 +921,9 @@ void cmd_announce(host_t &host, peer_t peer, msg_t &&req) { std::copy(std::begin(iv), std::end(iv), std::begin(session.iv)); session.pingTimeout = std::chrono::steady_clock::now() + config::stream.ping_timeout; - session.client_state = 1; + + session.video_packets = std::make_shared(); + session.audio_packets = std::make_shared(); video::event_queue_t idr_events { new video::event_queue_t::element_type }; session.audioThread = std::thread {audioThread}; @@ -976,8 +946,6 @@ void cmd_play(host_t &host, peer_t peer, msg_t &&req) { } void rtpThread() { - session.client_state = -1; - rtsp_server_t server(RTSP_SETUP_PORT); server.map("OPTIONS"sv, &cmd_option); @@ -990,12 +958,13 @@ void rtpThread() { while(true) { server.iterate(1s); - if(session.client_state == 0) { + if(session.video_packets && !session.video_packets->running()) { session.audioThread.join(); session.videoThread.join(); session.controlThread.join(); - session.client_state = -1; + session.video_packets = video::packet_queue_t(); + session.audio_packets = audio::packet_queue_t(); } } }