diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index 52bb6087..2fdfb1af 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -559,7 +560,44 @@ void controlThread(video::idr_event_t idr_events) { } } -std::optional recv_peer(udp::socket &sock) { +template +util::Either asio_read(Stream &s, asio::io_service &io, const BufferSequence &bufs, Peer &peer, const asio::deadline_timer::duration_type& expire_time) { + std::optional timer_result, read_result; + + asio::deadline_timer timer { io }; + + timer.expires_from_now(boost::posix_time::milliseconds(config::stream.ping_timeout.count())); + timer.async_wait([&](sys::error_code c){ + timer_result = c; + }); + + std::size_t len = 0; + s.async_receive_from(bufs, peer, 0, [&](const boost::system::error_code &ec, size_t bytes) { + len = bytes; + + read_result = ec; + }); + + io.reset(); + + while(io.run_one()) { + if(read_result) { + timer.cancel(); + } + else if(timer_result) { + s.cancel(); + } + } + + if(*read_result) { + return *read_result; + } + + return len; +} + +template +std::optional recv_peer(std::shared_ptr> &queue, udp::socket &sock, asio::io_service &io) { std::array buf; char ping[] = { @@ -567,31 +605,25 @@ std::optional recv_peer(udp::socket &sock) { }; udp::endpoint peer; - while (session.video_packets->running()) { - asio::deadline_timer timer { EXECUTOR((&sock)) }; - timer.expires_from_now(boost::posix_time::seconds(2)); - timer.async_wait([&](sys::error_code c){ - sock.cancel(); - }); - sys::error_code ping_error; - auto len = sock.receive_from(asio::buffer(buf), peer, 0, ping_error); - if(ping_error == sys::errc::make_error_code(sys::errc::operation_canceled)) { - return {}; + while (queue->running()) { + auto len_or_err = asio_read(sock, io, asio::buffer(buf), peer, boost::posix_time::milliseconds(config::stream.ping_timeout.count())); + + if(len_or_err.has_right() || len_or_err.left() == 0) { + return std::nullopt; } - timer.cancel(); - + auto len = len_or_err.left(); if (len == 4 && !std::memcmp(ping, buf.data(), sizeof(ping))) { std::cout << "PING from ["sv << peer.address().to_string() << ':' << peer.port() << ']' << std::endl; - return std::make_optional(std::move(peer));; + return std::make_optional(std::move(peer)); } std::cout << "Unknown transmission: "sv << util::hex_vec(std::string_view{buf.data(), len}) << std::endl; } - return {}; + return std::nullopt; } void audioThread() { @@ -600,7 +632,7 @@ void audioThread() { asio::io_service io; udp::socket sock{io, udp::endpoint(udp::v6(), AUDIO_STREAM_PORT)}; - auto peer = recv_peer(sock); + auto peer = recv_peer(session.audio_packets, sock, io); if(!peer) { return; } @@ -621,7 +653,9 @@ void audioThread() { // std::cout << "Audio ["sv << frame << "] :: send..."sv << std::endl; } + std::cout << "Audio: Joining()" << std::endl; captureThread.join(); + std::cout << "Audio: Joining()" << std::endl; } void videoThread(video::idr_event_t idr_events) { @@ -632,7 +666,7 @@ void videoThread(video::idr_event_t idr_events) { asio::io_service io; udp::socket sock{io, udp::endpoint(udp::v6(), VIDEO_STREAM_PORT)}; - auto peer = recv_peer(sock); + auto peer = recv_peer(session.video_packets, sock, io); if(!peer) { return; } @@ -726,8 +760,9 @@ void videoThread(video::idr_event_t idr_events) { lowseq += shards.size(); } - + std::cout << "Video: Joining()" << std::endl; captureThread.join(); + std::cout << "Video: Joined()" << std::endl; } void respond(host_t &host, peer_t peer, msg_t &resp) { @@ -985,13 +1020,17 @@ void rtpThread() { server.map("PLAY"sv, &cmd_play); while(true) { - server.iterate(1s); + server.iterate(config::stream.ping_timeout); if(session.video_packets && !session.video_packets->running()) { + std::cout << "Waiting for Audio to end..."sv << std::endl; session.audioThread.join(); + std::cout << "Waiting for Video to end..."sv << std::endl; session.videoThread.join(); + std::cout << "Waiting for Control to end..."sv << std::endl; session.controlThread.join(); + std::cout << "Resetting Session..."sv << std::endl; session.video_packets = video::packet_queue_t(); session.audio_packets = audio::packet_queue_t(); }