diff --git a/CMakeLists.txt b/CMakeLists.txt index e1c9d0db..7ceb9e32 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -134,6 +134,7 @@ set(SOURCE_FILES src/gstrtpreceiver.cpp src/gstrtpreceiver.h) file(GLOB ICONS src/icons/*.png) +file(GLOB SPLASH src/icons/*.mp4) file(GLOB OSD_CONFIGS *_osd.json) include_directories("/usr/include/libdrm" "/usr/include/cairo" "/usr/include/spdlog") @@ -185,7 +186,7 @@ include(GNUInstallDirs) install(TARGETS ${PROJECT_NAME} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) -install(FILES ${ICONS} +install(FILES ${ICONS} ${SPLASH} DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/${PROJECT_NAME} ) diff --git a/pixelpilot.yaml b/pixelpilot.yaml index 22bc868c..601022a4 100644 --- a/pixelpilot.yaml +++ b/pixelpilot.yaml @@ -1,3 +1,7 @@ +splash: + enabled: false + file: /usr/local/share/pixelpilot/splash.mp4 # use ffmpeg -i input.mp4 -c:v libx265 -x265-params keyint=1:scenecut=0 -c:a copy splash.mp4 + # timeout: 3 gsmenu: enabled: true gpio: diff --git a/src/gstrtpreceiver.cpp b/src/gstrtpreceiver.cpp index 654890c1..549242c0 100644 --- a/src/gstrtpreceiver.cpp +++ b/src/gstrtpreceiver.cpp @@ -9,6 +9,7 @@ #include "gst/app/gstappsink.h" #include "gst/app/gstappsrc.h" #include "spdlog/spdlog.h" +#include #include #include #include @@ -21,6 +22,9 @@ #include #include #include +#include +std::mutex m_switch_mutex; +extern YAML::Node config; namespace pipeline { static std::string gst_create_rtp_caps(const VideoCodec& videoCodec){ @@ -78,13 +82,18 @@ GstRtpReceiver::GstRtpReceiver(int udp_port, const VideoCodec& codec) m_port=udp_port; m_video_codec=codec; initGstreamerOrThrow(); - + splash_enabled = config["splash"]["enabled"].as(false); + splash_file = config["splash"]["file"].as(""); + splash_timeout = config["splash"]["timeout"].as(3); } GstRtpReceiver::GstRtpReceiver(const char *s, const VideoCodec& codec) { unix_socket = strdup(s); m_video_codec = codec; initGstreamerOrThrow(); + splash_enabled = config["splash"]["enabled"].as(false); + splash_file = config["splash"]["file"].as(""); + splash_timeout = config["splash"]["timeout"].as(3); spdlog::debug("Creating receiver socket on {}", unix_socket); @@ -153,14 +162,35 @@ static void loop_pull_appsink_samples(bool& keep_looping,GstElement *app_sink_el std::string GstRtpReceiver::construct_gstreamer_pipeline() { std::stringstream ss; - if (! unix_socket) - ss<<"udpsrc port="<join(); + m_drain_watcher = nullptr; + } if (m_pull_samples_thread) { m_pull_samples_thread->join(); @@ -276,13 +314,64 @@ void GstRtpReceiver::stop_receiving() { m_read_socket_thread = nullptr; } + // Remove probe if it exists + if (buffer_probe_id != 0 && m_gst_pipeline) { + GstElement* src; + if (!unix_socket) { + src = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "udpsrc"); + } else { + src = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "appsrc"); + } + if (src) { + GstPad *src_pad = gst_element_get_static_pad(src, "src"); + if (src_pad) { + gst_pad_remove_probe(src_pad, buffer_probe_id); + gst_object_unref(src_pad); + } + gst_object_unref(src); + } + buffer_probe_id = 0; + } + + // Clean up pipeline if (m_gst_pipeline != nullptr) { gst_element_send_event((GstElement*)m_gst_pipeline, gst_event_new_eos()); - gst_element_set_state(m_gst_pipeline, GST_STATE_PAUSED); gst_element_set_state(m_gst_pipeline, GST_STATE_NULL); gst_object_unref(m_gst_pipeline); m_gst_pipeline = nullptr; } + + if (m_eos_probe_id != 0 && m_input_selector) { + GstPad* filesrc_pad = gst_element_get_static_pad(m_input_selector, "sink_1"); + if (filesrc_pad) { + gst_pad_remove_probe(filesrc_pad, m_eos_probe_id); + gst_object_unref(filesrc_pad); + spdlog::debug("Removed EOS probe."); + } + m_eos_probe_id = 0; + } + + if (m_bus) { + gst_bus_remove_watch(m_bus); + gst_object_unref(m_bus); + m_bus = nullptr; + } + + // Remove pad blocks + if (m_fallback_src_pad && m_fallback_block_id != 0) { + gst_pad_remove_probe(m_fallback_src_pad, m_fallback_block_id); + m_fallback_block_id = 0; + } + + // Unref pads + if (m_fallback_src_pad) { + gst_object_unref(m_fallback_src_pad); + m_fallback_src_pad = nullptr; + } + + m_input_selector = nullptr; + m_app_sink_element = nullptr; + spdlog::info("GstRtpReceiver::stop_receiving end"); } @@ -325,9 +414,193 @@ void GstRtpReceiver::switch_to_file_playback(const char * file_path) { m_pull_samples_thread = std::make_unique(&GstRtpReceiver::loop_pull_samples, this); } +static std::atomic last_data_time = {0}; +static std::atomic using_primary = {true}; + +static GstPadProbeReturn +buffer_probe_callback(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) +{ + GstRtpReceiver* receiver = static_cast(user_data); + + if (info->type & GST_PAD_PROBE_TYPE_BUFFER) { + // Update timestamp atomically + last_data_time.store(g_get_monotonic_time()); + + // If we are not currently on primary, switch back + if (!using_primary.load()) { + receiver->switch_to_primary_source(); + } + } + return GST_PAD_PROBE_OK; +} + +static gboolean bus_callback(GstBus *bus, GstMessage *msg, gpointer data) { + GstRtpReceiver* receiver = static_cast(data); + + switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_ERROR: { + GError *err; + gchar *debug_info; + gst_message_parse_error(msg, &err, &debug_info); + spdlog::error("GStreamer Error: {}, {}", err->message, debug_info); + g_clear_error(&err); + g_free(debug_info); + break; + } + case GST_MESSAGE_EOS: + // This will likely never be called in this pipeline, but is good practice + spdlog::info("Main pipeline reached EOS."); + break; + + // Handle our custom message + case GST_MESSAGE_APPLICATION: { + const GstStructure *s = gst_message_get_structure(msg); + if (gst_structure_has_name(s, "loop-file-source")) { + spdlog::info("Bus received 'loop-file-source', seeking..."); + // It is now safe to perform the seek from this thread. + gst_element_seek_simple(receiver->get_pipeline(), GST_FORMAT_TIME, + (GstSeekFlags)(GST_SEEK_FLAG_FLUSH | GST_SEEK_FLAG_KEY_UNIT), 0); + } + break; + } + + default: + break; + } + return TRUE; // Keep the watch active +} + +static GstPadProbeReturn +filesrc_eos_probe_cb(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) +{ + if (!(info->type & GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM)) { + return GST_PAD_PROBE_OK; + } + + if (GST_EVENT_TYPE(GST_PAD_PROBE_INFO_EVENT(info)) == GST_EVENT_EOS) { + GstRtpReceiver* receiver = static_cast(user_data); + spdlog::info("Caught EOS on filesrc branch, posting 'loop-file' message..."); + + // Post a custom application message to the bus instead of seeking directly. + // This is thread-safe. + gst_element_post_message(receiver->get_pipeline(), + gst_message_new_application( + GST_OBJECT(receiver->get_pipeline()), + gst_structure_new_empty("loop-file-source") + ) + ); + + // Drop the EOS event to prevent it from deactivating the selector pad. + return GST_PAD_PROBE_DROP; + } + + return GST_PAD_PROBE_OK; +} + + +void GstRtpReceiver::poll_bus() { + if (!m_bus) return; + + GstMessage *msg = gst_bus_pop(m_bus); + while (msg) { + bus_callback(m_bus, msg, this); + gst_message_unref(msg); + msg = gst_bus_pop(m_bus); + } +} + +void GstRtpReceiver::switch_to_primary_source() { + std::lock_guard lock(m_switch_mutex); + + if (using_primary.load()) { + return; // Already on primary + } + + if (!splash_enabled) // switching is disabled, maybe do other useful stuff above + return; + + spdlog::info("Data Resumed! Switching back to default source."); + + if (m_input_selector) { + + + if (m_fallback_src_pad && m_fallback_block_id == 0) { + m_fallback_block_id = gst_pad_add_probe(m_fallback_src_pad, + GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, + NULL, NULL, NULL); + spdlog::debug("Blocked fallback source"); + } + + GstPad* pad = gst_element_get_static_pad(m_input_selector, "sink_0"); + if (pad) { + g_object_set(m_input_selector, "active-pad", pad, NULL); + gst_object_unref(pad); + + // Reset timestamp to prevent immediate timeout + last_data_time.store(g_get_monotonic_time()); + using_primary.store(true); + } + } +} + +void GstRtpReceiver::switch_to_backup_source() { + std::lock_guard lock(m_switch_mutex); + + if (!using_primary.load()) { + return; // Already on backup + } + + if (! splash_enabled) // switching is disabled, maybe do other useful stuff above + return; + + spdlog::warn("Data Timeout! Switching to backup source."); + + if (m_input_selector) { + GstPad* pad = gst_element_get_static_pad(m_input_selector, "sink_1"); + if (pad) { + g_object_set(m_input_selector, "active-pad", pad, NULL); + gst_object_unref(pad); + using_primary.store(false); + } + + if (m_fallback_src_pad && m_fallback_block_id != 0) { + gst_pad_remove_probe(m_fallback_src_pad, m_fallback_block_id); + m_fallback_block_id = 0; + spdlog::debug("Unblocked fallback source"); + } + + } +} + +void GstRtpReceiver::drain_watcher() +{ + const gint64 TIMEOUT_MICROSECONDS = splash_timeout * G_TIME_SPAN_SECOND; // 3 seconds + const int CHECK_INTERVAL_MS = 500; // Check every 500ms + + while (m_drain_watcher_run) { + // Only check for timeout if we are currently on primary + if (using_primary.load()) { + gint64 current_time = g_get_monotonic_time(); + gint64 last_time = last_data_time.load(); + gint64 time_since_data = current_time - last_time; + + // Only switch if we have a valid timestamp and exceeded timeout + if (last_time > 0 && time_since_data > TIMEOUT_MICROSECONDS) { + switch_to_backup_source(); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(CHECK_INTERVAL_MS)); + } +} + void GstRtpReceiver::switch_to_stream() { stop_receiving(); + // Reset atomic state + using_primary.store(true); + last_data_time.store(0); + const auto pipeline = construct_gstreamer_pipeline(); GError* error = nullptr; m_gst_pipeline = gst_parse_launch(pipeline.c_str(), &error); @@ -345,6 +618,48 @@ void GstRtpReceiver::switch_to_stream() { return; } + m_bus = gst_element_get_bus(m_gst_pipeline); + gst_bus_add_watch(m_bus, bus_callback, this); + + // Get input selector + m_input_selector = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "selector"); + if (!m_input_selector) { + spdlog::error("Could not find element 'selector' in the pipeline."); + gst_object_unref(m_gst_pipeline); + m_gst_pipeline = nullptr; + return; + } + + if (splash_enabled) { + GstPad* filesrc_pad = gst_element_get_static_pad(m_input_selector, "sink_1"); + if (filesrc_pad) { + m_eos_probe_id = gst_pad_add_probe(filesrc_pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, + filesrc_eos_probe_cb, this, NULL); + gst_object_unref(filesrc_pad); + spdlog::info("Added EOS probe to loop filesrc."); + } else { + spdlog::warn("Could not find selector:sink_1 to add EOS probe."); + } + + GstElement* filesrc = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "filesrc"); + m_fallback_src_pad = gst_element_get_static_pad(filesrc, "src"); + if (m_fallback_src_pad) { + m_fallback_block_id = gst_pad_add_probe(m_fallback_src_pad, + GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, + NULL, NULL, NULL); + spdlog::info("Initially blocked fallback source"); + } + gst_object_unref(filesrc); + } + + // Set initial active pad to primary (sink_0) + GstPad* initial_pad = gst_element_get_static_pad(m_input_selector, "sink_0"); + if (initial_pad) { + g_object_set(m_input_selector, "active-pad", initial_pad, NULL); + gst_object_unref(initial_pad); + spdlog::info("Set initial active pad to sink_0 (Primary stream)"); + } + // If using Unix socket, setup appsrc with buffer pool if (unix_socket) { GstElement* appsrc = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "appsrc"); @@ -396,12 +711,45 @@ void GstRtpReceiver::switch_to_stream() { // Setup appsink m_app_sink_element = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "out_appsink"); - assert(m_app_sink_element); - - gst_element_set_state(m_gst_pipeline, GST_STATE_PLAYING); + if (!m_app_sink_element) { + spdlog::error("Failed to get appsink element"); + return; + } + // Add buffer probe to monitor traffic + GstElement* src; + if (!unix_socket) { + src = gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "udpsrc"); + } else { + src =gst_bin_get_by_name(GST_BIN(m_gst_pipeline), "appsrc"); + } + if (src) { + GstPad *src_pad = gst_element_get_static_pad(src, "src"); + if (src_pad) { + // Pass 'this' pointer instead of selector + buffer_probe_id = gst_pad_add_probe(src_pad, GST_PAD_PROBE_TYPE_BUFFER, + buffer_probe_callback, this, NULL); + gst_object_unref(src_pad); + } + gst_object_unref(src); + } + + // Initialize timestamp BEFORE starting pipeline + last_data_time.store(g_get_monotonic_time()); + + // Start pipeline + GstStateChangeReturn ret = gst_element_set_state(m_gst_pipeline, GST_STATE_PLAYING); + if (ret == GST_STATE_CHANGE_FAILURE) { + spdlog::error("Failed to set pipeline to PLAYING state"); + return; + } + + // Start threads m_pull_samples_run = true; m_pull_samples_thread = std::make_unique(&GstRtpReceiver::loop_pull_samples, this); + + m_drain_watcher_run = true; + m_drain_watcher = std::make_unique(&GstRtpReceiver::drain_watcher, this); } void GstRtpReceiver::set_playback_rate(double rate) { diff --git a/src/gstrtpreceiver.h b/src/gstrtpreceiver.h index 17dc9d8a..7bda16a3 100644 --- a/src/gstrtpreceiver.h +++ b/src/gstrtpreceiver.h @@ -12,6 +12,7 @@ #include #include #include +#include #define MAX_PACKET_SIZE 4096 #define RTP_HEADER_LEN 12 @@ -58,6 +59,11 @@ class GstRtpReceiver { void normal_playback(); void pause(); void resume(); + void switch_to_primary_source(); + void switch_to_backup_source(); + GstElement* get_pipeline() { return m_gst_pipeline; } + void poll_bus(); + private: std::string construct_gstreamer_pipeline(); std::string construct_file_playback_pipeline(const char * file_path); @@ -65,6 +71,7 @@ class GstRtpReceiver { void on_new_sample(std::shared_ptr> sample); // The gstreamer pipeline GstElement * m_gst_pipeline=nullptr; + GstBus *m_bus = nullptr; NEW_FRAME_CALLBACK m_cb; VideoCodec m_video_codec; int m_port; @@ -78,6 +85,21 @@ class GstRtpReceiver { bool m_read_socket_run = false; std::unique_ptr m_read_socket_thread; + // drain watcher + bool splash_enabled = false; + std::string splash_file = ""; + int splash_timeout = 3; + void drain_watcher(); + bool m_drain_watcher_run = false; + std::unique_ptr m_drain_watcher; + GstElement *m_input_selector = nullptr; + guint buffer_probe_id = 0; + guint idle_probe_id = 0; + gulong m_eos_probe_id = 0; + GstPad* m_fallback_src_pad = nullptr; + gulong m_fallback_block_id = 0; + + // dvr void set_playback_rate(double rate); double m_playback_rate = 1.0; diff --git a/src/icons/splash.mp4 b/src/icons/splash.mp4 new file mode 100644 index 00000000..132bdad8 Binary files /dev/null and b/src/icons/splash.mp4 differ diff --git a/src/main.cpp b/src/main.cpp index 20707011..860a7284 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -540,6 +540,9 @@ void main_loop() { // TODO: put gsmenu main loop here msg_manager.check_message(); os_sensors.run(); + if (receiver) { + receiver->poll_bus(); + } sleep(1); } return;