From 7e3abefc2cc1347bb4cceb8876c266eed3417aba Mon Sep 17 00:00:00 2001 From: loki Date: Tue, 22 Jun 2021 22:26:11 +0200 Subject: [PATCH] pass session event objects through safe::mail_t --- sunshine/audio.cpp | 13 ++++--- sunshine/audio.h | 6 ++-- sunshine/input.cpp | 14 ++++---- sunshine/input.h | 5 +-- sunshine/main.cpp | 1 + sunshine/main.h | 6 ++++ sunshine/stream.cpp | 77 +++++++++++++++++++++++++----------------- sunshine/thread_safe.h | 20 ++++++----- sunshine/video.cpp | 61 ++++++++++++++++++--------------- sunshine/video.h | 10 ++---- 10 files changed, 122 insertions(+), 91 deletions(-) diff --git a/sunshine/audio.cpp b/sunshine/audio.cpp index 9753f021..437703ba 100644 --- a/sunshine/audio.cpp +++ b/sunshine/audio.cpp @@ -72,7 +72,9 @@ opus_stream_config_t stream_configs[MAX_STREAM_CONFIG] { auto control_shared = safe::make_shared(start_audio_control, stop_audio_control); -void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t config, void *channel_data) { +void encodeThread(sample_queue_t samples, config_t config, void *channel_data) { + auto packets = mail::man->queue(mail::audio_packets); + //FIXME: Pick correct opus_stream_config_t based on config.channels auto stream = &stream_configs[map_stream(config.channels, config.flags[config_t::HIGH_QUALITY])]; @@ -89,7 +91,7 @@ void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t confi auto frame_size = config.packetDuration * stream->sampleRate / 1000; while(auto sample = samples->pop()) { - packet_t packet { 1024 }; // 1KB + buffer_t packet { 1024 }; // 1KB int bytes = opus_multistream_encode(opus.get(), sample->data(), frame_size, std::begin(packet), packet.size()); if(bytes < 0) { @@ -104,7 +106,9 @@ void encodeThread(packet_queue_t packets, sample_queue_t samples, config_t confi } } -void capture(safe::signal_t *shutdown_event, packet_queue_t packets, config_t config, void *channel_data) { +void capture(safe::mail_t mail, config_t config, void *channel_data) { + auto shutdown_event = mail->event(mail::shutdown); + //FIXME: Pick correct opus_stream_config_t based on config.channels auto stream = &stream_configs[map_stream(config.channels, config.flags[config_t::HIGH_QUALITY])]; @@ -148,7 +152,7 @@ void capture(safe::signal_t *shutdown_event, packet_queue_t packets, config_t co } auto samples = std::make_shared(30); - std::thread thread { encodeThread, packets, samples, config, channel_data }; + std::thread thread { encodeThread, samples, config, channel_data }; auto fg = util::fail_guard([&]() { samples->stop(); @@ -225,6 +229,7 @@ int start_audio_control(audio_ctx_t &ctx) { ctx.sink = std::move(*sink); return 0; } + void stop_audio_control(audio_ctx_t &ctx) { // restore audio-sink if applicable if(!ctx.restore_sink) { diff --git a/sunshine/audio.h b/sunshine/audio.h index 27018423..67f3af13 100644 --- a/sunshine/audio.h +++ b/sunshine/audio.h @@ -37,9 +37,9 @@ struct config_t { std::bitset flags; }; -using packet_t = util::buffer_t; -using packet_queue_t = std::shared_ptr>>; -void capture(safe::signal_t *shutdown_event, packet_queue_t packets, config_t config, void *channel_data); +using buffer_t = util::buffer_t; +using packet_t = std::pair; +void capture(safe::mail_t mail, config_t config, void *channel_data); } // namespace audio #endif diff --git a/sunshine/input.cpp b/sunshine/input.cpp index 51b608a2..d3eed222 100644 --- a/sunshine/input.cpp +++ b/sunshine/input.cpp @@ -46,7 +46,6 @@ void free_id(std::bitset &gamepad_mask, int id) { gamepad_mask[id] = false; } -touch_port_event_t touch_port_event; platf::touch_port_t touch_port { 0, 0, 0, 0 }; @@ -89,11 +88,14 @@ struct gamepad_t { }; struct input_t { - input_t() : active_gamepad_state {}, gamepads(MAX_GAMEPADS), mouse_left_button_timeout {} {} + input_t(safe::mail_raw_t::event_t touch_port_event) + : active_gamepad_state {}, gamepads(MAX_GAMEPADS), touch_port_event { std::move(touch_port_event) }, mouse_left_button_timeout {} {} std::uint16_t active_gamepad_state; std::vector gamepads; + safe::mail_raw_t::event_t touch_port_event; + util::ThreadPool::task_id_t mouse_left_button_timeout; }; @@ -201,6 +203,7 @@ void passthrough(std::shared_ptr &input, PNV_ABS_MOUSE_MOVE_PACKET pack input->mouse_left_button_timeout = ENABLE_LEFT_BUTTON_DELAY; } + auto &touch_port_event = input->touch_port_event; if(touch_port_event->peek()) { touch_port = *touch_port_event->pop(); } @@ -552,12 +555,11 @@ void reset(std::shared_ptr &input) { } void init() { - touch_port_event = std::make_unique(); - platf_input = platf::input(); + platf_input = platf::input(); } -std::shared_ptr alloc() { - auto input = std::make_shared(); +std::shared_ptr alloc(safe::mail_t mail) { + auto input = std::make_shared(mail->event(mail::touch_port)); // Workaround to ensure new frames will be captured when a client connects task_pool.pushDelayed([]() { diff --git a/sunshine/input.h b/sunshine/input.h index 4c94bb4a..5c9cfb75 100644 --- a/sunshine/input.h +++ b/sunshine/input.h @@ -18,10 +18,7 @@ void passthrough(std::shared_ptr &input, std::vector &&in void init(); -std::shared_ptr alloc(); - -using touch_port_event_t = std::unique_ptr>; -extern touch_port_event_t touch_port_event; +std::shared_ptr alloc(safe::mail_t mail); } // namespace input #endif //SUNSHINE_INPUT_H diff --git a/sunshine/main.cpp b/sunshine/main.cpp index 308e781e..00638049 100644 --- a/sunshine/main.cpp +++ b/sunshine/main.cpp @@ -200,6 +200,7 @@ int main(int argc, char *argv[]) { } reed_solomon_init(); + input::init(); if(video::init()) { return 2; } diff --git a/sunshine/main.h b/sunshine/main.h index 0c1a2869..7de488ab 100644 --- a/sunshine/main.h +++ b/sunshine/main.h @@ -35,10 +35,16 @@ namespace mail { extern safe::mail_t man; +// Global mail MAIL(shutdown); MAIL(broadcast_shutdown); +MAIL(video_packets); +MAIL(audio_packets); +// Local mail +MAIL(touch_port); +MAIL(idr); #undef MAIL } // namespace mail diff --git a/sunshine/stream.cpp b/sunshine/stream.cpp index cf0e598f..8e124025 100644 --- a/sunshine/stream.cpp +++ b/sunshine/stream.cpp @@ -150,10 +150,6 @@ public: }; struct broadcast_ctx_t { - video::packet_queue_t video_packets; - audio::packet_queue_t audio_packets; - - std::shared_ptr> broadcast_shutdown_event; message_queue_queue_t message_queue_queue; std::thread recv_thread; @@ -170,6 +166,9 @@ struct broadcast_ctx_t { struct session_t { config_t config; + + safe::mail_t mail; + std::shared_ptr input; std::thread audioThread; @@ -182,7 +181,7 @@ struct session_t { struct { int lowseq; udp::endpoint peer; - video::idr_event_t idr_events; + safe::mail_raw_t::event_t idr_events; } video; struct { @@ -197,7 +196,7 @@ struct session_t { crypto::aes_t gcm_key; crypto::aes_t iv; - safe::signal_t shutdown_event; + safe::mail_raw_t::event_t shutdown_event; safe::signal_t controlEnd; std::atomic state; @@ -402,7 +401,7 @@ std::vector replace(const std::string_view &original, const std::string return replaced; } -void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *server) { +void controlBroadcastThread(control_server_t *server) { server->map(packetTypes[IDX_START_A], [&](session_t *session, const std::string_view &payload) { BOOST_LOG(debug) << "type [IDX_START_A]"sv; }); @@ -466,6 +465,7 @@ void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *se input::passthrough(session->input, std::move(plaintext)); }); + auto shutdown_event = mail::man->event(mail::broadcast_shutdown); while(!shutdown_event->peek()) { { auto lg = server->_map_addr_session.lock(); @@ -507,7 +507,7 @@ void controlBroadcastThread(safe::signal_t *shutdown_event, control_server_t *se auto lg = server->_map_addr_session.lock(); for(auto pos = std::begin(*server->_map_addr_session); pos != std::end(*server->_map_addr_session); ++pos) { auto session = pos->second.second; - session->shutdown_event.raise(true); + session->shutdown_event->raise(true); } } @@ -522,8 +522,10 @@ void recvThread(broadcast_ctx_t &ctx) { auto &video_sock = ctx.video_sock; auto &audio_sock = ctx.audio_sock; - auto &message_queue_queue = ctx.message_queue_queue; - auto &io = ctx.io; + auto &message_queue_queue = ctx.message_queue_queue; + auto broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown); + + auto &io = ctx.io; udp::endpoint peer; @@ -594,12 +596,15 @@ void recvThread(broadcast_ctx_t &ctx) { video_sock.async_receive_from(asio::buffer(buf[0]), peer, 0, recv_func[0]); audio_sock.async_receive_from(asio::buffer(buf[1]), peer, 0, recv_func[1]); - while(!ctx.broadcast_shutdown_event->peek()) { + while(!broadcast_shutdown_event->peek()) { io.run(); } } -void videoBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, video::packet_queue_t packets) { +void videoBroadcastThread(udp::socket &sock) { + auto shutdown_event = mail::man->event(mail::broadcast_shutdown); + auto packets = mail::man->queue(mail::video_packets); + while(auto packet = packets->pop()) { if(shutdown_event->peek()) { break; @@ -693,7 +698,10 @@ void videoBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, vid shutdown_event->raise(true); } -void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, audio::packet_queue_t packets) { +void audioBroadcastThread(udp::socket &sock) { + auto shutdown_event = mail::man->event(mail::broadcast_shutdown); + auto packets = mail::man->queue(mail::audio_packets); + while(auto packet = packets->pop()) { if(shutdown_event->peek()) { break; @@ -722,8 +730,6 @@ void audioBroadcastThread(safe::signal_t *shutdown_event, udp::socket &sock, aud } int start_broadcast(broadcast_ctx_t &ctx) { - ctx.broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown); - if(ctx.control_server.bind(CONTROL_PORT)) { BOOST_LOG(error) << "Couldn't bind Control server to port ["sv << CONTROL_PORT << "], likely another process already bound to the port"sv; @@ -759,13 +765,11 @@ int start_broadcast(broadcast_ctx_t &ctx) { return -1; } - ctx.video_packets = std::make_shared(30); - ctx.audio_packets = std::make_shared(30); ctx.message_queue_queue = std::make_shared(30); - ctx.video_thread = std::thread { videoBroadcastThread, ctx.broadcast_shutdown_event.get(), std::ref(ctx.video_sock), ctx.video_packets }; - ctx.audio_thread = std::thread { audioBroadcastThread, ctx.broadcast_shutdown_event.get(), std::ref(ctx.audio_sock), ctx.audio_packets }; - ctx.control_thread = std::thread { controlBroadcastThread, ctx.broadcast_shutdown_event.get(), &ctx.control_server }; + ctx.video_thread = std::thread { videoBroadcastThread, std::ref(ctx.video_sock) }; + ctx.audio_thread = std::thread { audioBroadcastThread, std::ref(ctx.audio_sock) }; + ctx.control_thread = std::thread { controlBroadcastThread, &ctx.control_server }; ctx.recv_thread = std::thread { recvThread, std::ref(ctx) }; @@ -773,11 +777,16 @@ int start_broadcast(broadcast_ctx_t &ctx) { } void end_broadcast(broadcast_ctx_t &ctx) { - ctx.broadcast_shutdown_event->raise(true); + auto broadcast_shutdown_event = mail::man->event(mail::broadcast_shutdown); + + broadcast_shutdown_event->raise(true); + + auto video_packets = mail::man->queue(mail::video_packets); + auto audio_packets = mail::man->queue(mail::audio_packets); // Minimize delay stopping video/audio threads - ctx.video_packets->stop(); - ctx.audio_packets->stop(); + video_packets->stop(); + audio_packets->stop(); ctx.message_queue_queue->stop(); ctx.io.stop(); @@ -785,8 +794,8 @@ void end_broadcast(broadcast_ctx_t &ctx) { ctx.video_sock.close(); ctx.audio_sock.close(); - ctx.video_packets.reset(); - ctx.audio_packets.reset(); + video_packets.reset(); + audio_packets.reset(); BOOST_LOG(debug) << "Waiting for main listening thread to end..."sv; ctx.recv_thread.join(); @@ -798,7 +807,7 @@ void end_broadcast(broadcast_ctx_t &ctx) { ctx.control_thread.join(); BOOST_LOG(debug) << "All broadcasting threads ended"sv; - ctx.broadcast_shutdown_event->reset(); + broadcast_shutdown_event->reset(); } int recv_ping(decltype(broadcast)::ptr_t ref, socket_e type, asio::ip::address &addr, std::chrono::milliseconds timeout) { @@ -850,7 +859,7 @@ void videoThread(session_t *session, std::string addr_str) { session->video.peer.port(port); BOOST_LOG(debug) << "Start capturing Video"sv; - video::capture(&session->shutdown_event, ref->video_packets, session->video.idr_events, session->config.monitor, session); + video::capture(session->mail, session->config.monitor, session); } void audioThread(session_t *session, std::string addr_str) { @@ -872,7 +881,7 @@ void audioThread(session_t *session, std::string addr_str) { session->audio.peer.port(port); BOOST_LOG(debug) << "Start capturing Audio"sv; - audio::capture(&session->shutdown_event, ref->audio_packets, session->config.audio, session); + audio::capture(session->mail, session->config.audio, session); } namespace session { @@ -888,7 +897,7 @@ void stop(session_t &session) { return; } - session.shutdown_event.raise(true); + session.shutdown_event->raise(true); } void join(session_t &session) { @@ -905,7 +914,7 @@ void join(session_t &session) { } int start(session_t &session, const std::string &addr_string) { - session.input = input::alloc(); + session.input = input::alloc(session.mail); session.broadcast_ref = broadcast.ref(); if(!session.broadcast_ref) { @@ -927,11 +936,15 @@ int start(session_t &session, const std::string &addr_string) { std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypto::aes_t &iv) { auto session = std::make_shared(); + auto mail = std::make_shared(); + + session->shutdown_event = mail->event(mail::shutdown); + session->config = config; session->gcm_key = gcm_key; session->iv = iv; - session->video.idr_events = std::make_shared(); + session->video.idr_events = mail->event(mail::idr); session->video.lowseq = 0; session->audio.frame = 1; @@ -939,6 +952,8 @@ std::shared_ptr alloc(config_t &config, crypto::aes_t &gcm_key, crypt session->control.peer = nullptr; session->state.store(state_e::STOPPED, std::memory_order_relaxed); + session->mail = std::move(mail); + return session; } } // namespace session diff --git a/sunshine/thread_safe.h b/sunshine/thread_safe.h index f36d4182..01a122fe 100644 --- a/sunshine/thread_safe.h +++ b/sunshine/thread_safe.h @@ -98,8 +98,6 @@ public: } bool peek() { - std::lock_guard lg { _lock }; - return _continue && (bool)_status; } @@ -243,8 +241,6 @@ public: } bool peek() { - std::lock_guard lg { _lock }; - return _continue && !_queue.empty(); } @@ -447,13 +443,19 @@ public: template inline auto lock(const std::weak_ptr &wp) { - return std::reinterpret_pointer_cast>(wp.lock()); + return std::reinterpret_pointer_cast(wp.lock()); } class mail_raw_t : public std::enable_shared_from_this { public: template - std::shared_ptr>> event(const std::string_view &id) { + using event_t = std::shared_ptr>>; + + template + using queue_t = std::shared_ptr>>; + + template + event_t event(const std::string_view &id) { std::lock_guard lg { mutex }; auto it = id_to_post.find(id); @@ -461,14 +463,14 @@ public: return lock>(it->second); } - auto post = std::make_shared>>(shared_from_this()); + auto post = std::make_shared::element_type>(shared_from_this()); id_to_post.emplace(std::pair> { std::string { id }, post }); return post; } template - std::shared_ptr>> queue(const std::string_view &id) { + queue_t queue(const std::string_view &id) { std::lock_guard lg { mutex }; auto it = id_to_post.find(id); @@ -476,7 +478,7 @@ public: return lock>(it->second); } - auto post = std::make_shared>>(shared_from_this(), 32); + auto post = std::make_shared::element_type>(shared_from_this(), 32); id_to_post.emplace(std::pair> { std::string { id }, post }); return post; diff --git a/sunshine/video.cpp b/sunshine/video.cpp index 45c6537f..6c4b490d 100644 --- a/sunshine/video.cpp +++ b/sunshine/video.cpp @@ -330,7 +330,6 @@ public: replacements = std::move(other.replacements); sps = std::move(other.sps); vps = std::move(other.vps); - pps = std::move(other.pps); inject = other.inject; @@ -344,17 +343,18 @@ public: cbs::nal_t sps; cbs::nal_t vps; - cbs::nal_t pps; // inject sps/vps data into idr pictures int inject; }; struct sync_session_ctx_t { - safe::signal_t *shutdown_event; safe::signal_t *join_event; - packet_queue_t packets; - idr_event_t idr_events; + safe::mail_raw_t::event_t shutdown_event; + safe::mail_raw_t::queue_t packets; + safe::mail_raw_t::event_t idr_events; + safe::mail_raw_t::event_t touch_port_events; + config_t config; int frame_nr; int key_frame_nr; @@ -696,7 +696,7 @@ void captureThread( } } -int encode(int64_t frame_nr, session_t &session, frame_t::pointer frame, packet_queue_t &packets, void *channel_data) { +int encode(int64_t frame_nr, session_t &session, frame_t::pointer frame, safe::mail_raw_t::queue_t &packets, void *channel_data) { frame->pts = frame_nr; auto &ctx = session.ctx; @@ -986,9 +986,7 @@ std::optional make_session(const encoder_t &encoder, const config_t & void encode_run( int &frame_nr, int &key_frame_nr, // Store progress of the frame number - safe::signal_t *shutdown_event, // Signal for shutdown event of the session - packet_queue_t packets, - idr_event_t idr_events, + safe::mail_t mail, img_event_t images, config_t config, int width, int height, @@ -1007,6 +1005,11 @@ void encode_run( auto next_frame = std::chrono::steady_clock::now(); auto frame = session->device->frame; + + auto shutdown_event = mail->event(mail::shutdown); + auto packets = mail::man->queue(mail::video_packets); + auto idr_events = mail->queue(mail::idr); + while(true) { if(shutdown_event->peek() || reinit_event.peek() || !images->running()) { break; @@ -1109,9 +1112,6 @@ encode_e encode_run_sync(std::vector> &synce return encode_e::error; } - // absolute mouse coordinates require that the dimensions of the screen are known - input::touch_port_event->raise(disp->offset_x, disp->offset_y, disp->width, disp->height); - std::vector synced_sessions; for(auto &ctx : synced_session_ctxs) { auto synced_session = make_synced_session(disp.get(), encoder, *img, *ctx); @@ -1262,12 +1262,12 @@ void captureThreadSync() { } void capture_async( - safe::signal_t *shutdown_event, - packet_queue_t &packets, - idr_event_t &idr_events, + safe::mail_t mail, config_t &config, void *channel_data) { + auto shutdown_event = mail->event(mail::shutdown); + auto images = std::make_shared(); auto lg = util::fail_guard([&]() { images->stop(); @@ -1290,6 +1290,8 @@ void capture_async( int frame_nr = 1; int key_frame_nr = 1; + auto touch_port_event = mail->event(mail::touch_port); + while(!shutdown_event->peek() && images->running()) { // Wait for the main capture event when the display is being reinitialized if(ref->reinit_event.peek()) { @@ -1321,12 +1323,11 @@ void capture_async( images->raise(std::move(dummy_img)); // absolute mouse coordinates require that the dimensions of the screen are known - input::touch_port_event->raise(display->offset_x, display->offset_y, display->width, display->height); + touch_port_event->raise(display->offset_x, display->offset_y, display->width, display->height); encode_run( frame_nr, key_frame_nr, - shutdown_event, - packets, idr_events, images, + mail, images, config, display->width, display->height, hwdevice.get(), ref->reinit_event, *ref->encoder_p, @@ -1335,21 +1336,30 @@ void capture_async( } void capture( - safe::signal_t *shutdown_event, - packet_queue_t packets, - idr_event_t idr_events, + safe::mail_t mail, config_t config, void *channel_data) { + auto idr_events = mail->event(mail::idr); + idr_events->raise(std::make_pair(0, 1)); if(encoders.front().flags & SYSTEM_MEMORY) { - capture_async(shutdown_event, packets, idr_events, config, channel_data); + capture_async(std::move(mail), config, channel_data); } else { safe::signal_t join_event; auto ref = capture_thread_sync.ref(); ref->encode_session_ctx_queue.raise(sync_session_ctx_t { - shutdown_event, &join_event, packets, idr_events, config, 1, 1, channel_data }); + &join_event, + mail->event(mail::shutdown), + mail::man->queue(mail::video_packets), + std::move(idr_events), + mail->event(mail::touch_port), + config, + 1, + 1, + channel_data, + }); // Wait for join signal join_event.view(); @@ -1390,7 +1400,7 @@ int validate_config(std::shared_ptr &disp, const encoder_t &en frame->pict_type = AV_PICTURE_TYPE_I; - auto packets = std::make_shared(30); + auto packets = mail::man->queue(mail::video_packets); while(!packets->peek()) { if(encode(1, *session, frame, packets, nullptr)) { return -1; @@ -1520,9 +1530,6 @@ bool validate_encoder(encoder_t &encoder) { } int init() { - // video depends on input for input::touch_port_event - input::init(); - BOOST_LOG(info) << "//////////////////////////////////////////////////////////////////"sv; BOOST_LOG(info) << "// //"sv; BOOST_LOG(info) << "// Testing for available encoders, this may generate errors. //"sv; diff --git a/sunshine/video.h b/sunshine/video.h index e6b7dd21..1c142147 100644 --- a/sunshine/video.h +++ b/sunshine/video.h @@ -56,10 +56,8 @@ struct packet_raw_t : public AVPacket { void *channel_data; }; -using packet_t = std::unique_ptr; -using packet_queue_t = std::shared_ptr>; -using idr_event_t = std::shared_ptr>>; -using img_event_t = std::shared_ptr>>; +using packet_t = std::unique_ptr; +using idr_t = std::pair; struct config_t { int width; @@ -88,9 +86,7 @@ struct __attribute__((__aligned__(16))) color_t { extern color_t colors[4]; void capture( - safe::signal_t *shutdown_event, - packet_queue_t packets, - idr_event_t idr_events, + safe::mail_t mail, config_t config, void *channel_data);