
AM62A7: Error Handling in GStreamer: Releasing Resources After Multiple CSI to MP4 Conversions

Part Number: AM62A7

When saving CSI frames as MP4 files as shown in the attached code, an error occurs once the number of files exceeds 32, because the encoder resources are not properly released between files.

I would appreciate any advice on how to properly release the resources.

  1. CSI Reception Pipeline

GstElement* pipeline = gst_parse_launch(
"v4l2src device=/dev/video3 io-mode=5 ! "
"video/x-raw,format=UYVY,width=1920,height=1080,framerate=30/1 ! tiovxmemalloc pool-size=8 ! "
"appsink name=sink emit-signals=True sync=True max-buffers=30 drop=False",
nullptr);

2. GStreamer Pipeline (MP4 Conversion)

std::string pipeline_str = "appsrc name=mysrc ! videoconvert ! v4l2h264enc ! video/x-h264,profile=baseline ! h264parse ! mp4mux ! filesink location=" + filename;
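
For reference, the attached code configures the appsrc in this pipeline with the following caps (BGR, 960x540 at 7 fps); they are reproduced here from the attached code so the conversion pipeline is self-contained:

g_object_set(appsrc,
             "caps", gst_caps_new_simple("video/x-raw",
                                         "format", G_TYPE_STRING, "BGR",
                                         "width", G_TYPE_INT, 960,
                                         "height", G_TYPE_INT, 540,
                                         "framerate", GST_TYPE_FRACTION, 7, 1,
                                         NULL),
             "is-live", TRUE,
             NULL);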

3. Resource Release

  • Please refer to the attached stop_and_cleanup() function.

# Error Log

[12148.061515] vdec 30210000.video-codec: Too many simultaneous instances: 32 (max: 32)

gstsave_20240819.cpp

Old issue: e2e.ti.com/.../5364680

  • Hi Hyunwoo,

    Can you elaborate on the use case of taking each buffer from CSI, encoding it, and dumping it into a file?

    The maximum number of encode/decode instances with Wave5 (the codec IP) is 32, which is why this error is thrown when you try to instantiate another instance beyond that.

    https://git.ti.com/cgit/ti-linux-kernel/ti-linux-kernel/tree/drivers/media/platform/chips-media/wave5/wave5-vpuconfig.h?h=ti-linux-6.6.y#n31

    At any given point in time, a maximum of 32 encode/decode instances can run simultaneously.
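
    To illustrate the release that has to happen between files, here is a minimal sketch (the function name and structure are made up for illustration, not taken from your attached code): the Wave5 instance behind v4l2h264enc is only freed once the pipeline reaches GST_STATE_NULL and its references are dropped.

    #include <gst/gst.h>
    #include <gst/app/gstappsrc.h>

    // Sketch: finish one MP4 file and release the encoder instance before starting the next.
    static void finish_file(GstElement *pipeline, GstElement *appsrc) {
        // 1. Signal end-of-stream so mp4mux can finalize the file.
        gst_app_src_end_of_stream(GST_APP_SRC(appsrc));

        // 2. Wait (with a bounded timeout) for EOS or an error on the bus.
        GstBus *bus = gst_element_get_bus(pipeline);
        GstMessage *msg = gst_bus_timed_pop_filtered(
            bus, 5 * GST_SECOND,
            (GstMessageType)(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
        if (msg)
            gst_message_unref(msg);
        gst_object_unref(bus);

        // 3. The NULL state is what actually releases the V4L2 encoder (Wave5) instance.
        gst_element_set_state(pipeline, GST_STATE_NULL);

        // 4. Drop the reference returned by gst_bin_get_by_name(), then the pipeline itself.
        gst_object_unref(appsrc);
        gst_object_unref(pipeline);
    }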

    Best Regards,

    Suren

  • I am not writing 32 files simultaneously. I am writing the files sequentially, and once a file is completed, the next one starts. However, when writing the 32nd file, an error occurs. It seems that the previous 31 files are not being properly closed.

    Could you suggest a way to accurately close the instance once the file writing is completed?

  • Hi Hyunwoo,

    I was busy today with a customer escalation. I will try to look into the issue tomorrow and update you.

    Best Regards,

    Suren

  • I will be waiting for your reply.

  • Hi Hyunwoo,

    Can you add the lines below to stop_and_cleanup() and see if it helps:

    if (appsrc) {
        std::cout << "Sending EOS signal to appsrc...\n";
        GstFlowReturn ret = gst_app_src_end_of_stream(GST_APP_SRC(appsrc));  // Send EOS signal
        if (ret != GST_FLOW_OK) {
            std::cerr << "Failed to send EOS signal to appsrc.\n";
        }
        gst_object_unref(appsrc);
    }

    Best Regards,

    Suren

  • I added the code you advised (gst_object_unref(appsrc);) to stop_and_cleanup(), but the result remains the same. I also tried changing the position of the gst_object_unref(appsrc); code.

    Earlier I shared the original code with its Korean comments left unchanged; below I am sharing the final code again.

    The following error is still occurring: "[12148.061515] vdec 30210000.video-codec: Too many simultaneous instances: 32 (max: 32)"

    I would appreciate any advice on resolving this error.

    class ContinuousVideoSaver {
    public:
        ContinuousVideoSaver() : pipeline(nullptr), bus(nullptr), is_saving(false), running(false), timestamp(0) {
            gst_init(nullptr, nullptr);
        }
    
        ~ContinuousVideoSaver() {
            stop_and_cleanup();
        }
    
        void start_new_file() {
            if (is_saving) {
                printf("\n\n start_new_file stop and cleanup\n");
                stop_and_cleanup();
            }
            std::lock_guard<std::mutex> lock(mtx);
            
            // Generate the filename based on the current time
            auto now = std::chrono::system_clock::now();
            auto time_t_now = std::chrono::system_clock::to_time_t(now);
            std::tm* tm_now = std::localtime(&time_t_now);
            std::ostringstream oss;
            oss << std::put_time(tm_now, "%Y%m%d_%H%M%S");
            filename = "db/continuous_" + oss.str() + ".mp4";
    
            // Create the GStreamer pipeline
            std::string pipeline_str = "appsrc name=mysrc ! videoconvert ! v4l2h264enc ! video/x-h264,profile=baseline ! h264parse ! mp4mux ! filesink location=" + filename;
    
            std::cout << "start gst_parse_launch pipeline...\n";
            GError *error = nullptr;
            pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
            if (error) {
                std::cerr << "Error creating GStreamer pipeline: " << error->message << std::endl;
                g_clear_error(&error);
                return;
            }
    
            std::cout << "start gst_bin_get_by_name appsrc...\n";
            // Retrieve the appsrc element
            appsrc = gst_bin_get_by_name(GST_BIN(pipeline), "mysrc");
    
            if (!appsrc) {
                std::cerr << "Could not find appsrc in the pipeline." << std::endl;
                gst_object_unref(pipeline);
                pipeline = nullptr;
                return;
            }
    
            std::cout << "start g_object_set gst_caps_new_simple...\n";
            // Set the appsrc properties
            g_object_set(appsrc,
                         "caps", gst_caps_new_simple("video/x-raw",
                                                     "format", G_TYPE_STRING, "BGR",
                                                     "width", G_TYPE_INT, 960,
                                                     "height", G_TYPE_INT, 540,
                                                     "framerate", GST_TYPE_FRACTION, 7, 1,
                                                     NULL),
                         "is-live", TRUE,
                         NULL);
    
    
            std::cout << "start gst_element_set_state GST_STATE_PLAYING...\n";
            // Set the pipeline to PLAY state
            GstStateChangeReturn ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
            if (ret == GST_STATE_CHANGE_FAILURE) {
                std::cerr << "Failed to start GStreamer pipeline." << std::endl;
                return;
            }
            
            std::cout << "start Done...\n";
            is_saving = true;
            timestamp = 0;  // Initialize timestamp
        }
        
        void save_continuous_frames() {
            running = true;
    
            while (running) {
                std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> local_buffer;
                {
                    std::lock_guard<std::mutex> lock(mtx);
                    std::swap(local_buffer, continuous_buffer);
                }
                            
                for (const auto& frame : local_buffer) {
                    write_frame(std::get<2>(frame));
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(33*4)); // Flush buffered frames roughly every 132 ms (~7.5 fps)
    
                if(g_running == false)
                {
                    std::cout << "Caught signal: Class stopping the program...\n";
                    break;
                }
            }
    
            if (is_saving) {
                printf("Class stop and cleanup\n");
                stop_and_cleanup();
            }
        }
    
        void add_frame(const cv::Mat& frame) {
            std::lock_guard<std::mutex> lock(mtx);
            continuous_buffer.emplace_back(std::chrono::high_resolution_clock::now(), "", frame);
        }
    
        void stop() {
            running = false;
            stop_and_cleanup();
        }
    
    private:
        GstElement *pipeline;
        GstElement* appsrc = nullptr;
        GstBus *bus;
        std::string filename;
        bool is_saving;
        bool running;
        guint64 timestamp;  // Variable for storing timestamp
        std::mutex mtx;
        std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> continuous_buffer;
    
        void stop_and_cleanup() {
            std::lock_guard<std::mutex> lock(mtx);
            std::cout << "Stopping and cleaning up GStreamer pipeline...\n";
    
            if (pipeline) {
                // Send EOS signal
                if (appsrc) {
                    std::cout << "Sending EOS signal to appsrc...\n";
                    GstFlowReturn ret = gst_app_src_end_of_stream(GST_APP_SRC(appsrc));  // Send EOS signal
                    if (ret != GST_FLOW_OK) {
                        std::cerr << "Failed to send EOS signal to appsrc.\n";
                    }
                }
    
                // Wait for EOS or error message from the bus
                GstBus *bus = gst_element_get_bus(pipeline);
                if (bus) {
                    std::cout << "Waiting for EOS or error message from the bus...\n";
                    GstMessage *msg;
                    do {
                        msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, 
                                                        static_cast<GstMessageType>(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
                        if (msg != nullptr) {
                            if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) {
                                GError *err;
                                gchar *debug_info;
                                gst_message_parse_error(msg, &err, &debug_info);
                                std::cerr << "Error received from pipeline: " << err->message << std::endl;
                                g_clear_error(&err);
                                g_free(debug_info);
                            }
                            gst_message_unref(msg);
                        }
                    } while (msg && GST_MESSAGE_TYPE(msg) != GST_MESSAGE_EOS);
    
                    gst_object_unref(bus);  // Unreference the bus
                }
    
                // Set the pipeline to NULL state to release all elements
                std::cout << "Setting pipeline to NULL state...\n";
                GstStateChangeReturn state_ret = gst_element_set_state(pipeline, GST_STATE_NULL);
                if (state_ret == GST_STATE_CHANGE_FAILURE) {
                    std::cerr << "Failed to set pipeline to NULL state.\n";
                } else {
                    std::cout << "Pipeline set to NULL state successfully.\n";
                }
    
                if (appsrc) {
                    gst_object_unref(appsrc);
                    appsrc = nullptr;
    
                }
    
                // Unreference the pipeline (internal elements will also be released)
                std::cout << "Unreferencing pipeline...\n";
                gst_object_unref(pipeline);
                pipeline = nullptr;
    
                // Wait to prevent memory leaks
                std::this_thread::sleep_for(std::chrono::milliseconds(500));  // Allow time for hardware resources to be released
    
                std::cout << "Pipeline and elements cleaned up.\n";
            } else {
                std::cerr << "Warning: pipeline is NULL, skipping cleanup.\n";
            }
        }
    
        void write_frame(const cv::Mat& frame) {
            std::lock_guard<std::mutex> lock(mtx);
    
            if (!is_saving || !pipeline || !appsrc) return;
    
            // Create a buffer from OpenCV
            cv::Mat bgr_frame;
            cv::cvtColor(frame, bgr_frame, cv::COLOR_YUV2BGR_UYVY);
            cv::Mat resized_frame;
            cv::resize(bgr_frame, resized_frame, cv::Size(960, 540));
    
            GstBuffer* buffer = gst_buffer_new_allocate(nullptr, resized_frame.total() * resized_frame.elemSize(), nullptr);
            GstMapInfo map;
            gst_buffer_map(buffer, &map, GST_MAP_WRITE);
            memcpy(map.data, resized_frame.data, resized_frame.total() * resized_frame.elemSize());
            gst_buffer_unmap(buffer, &map);
    
            // Set the timestamp
            GST_BUFFER_PTS(buffer) = timestamp;
            GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale_int(1, GST_SECOND, 7);  // 1/7 s per frame, matching the 7 fps appsrc caps
            timestamp += GST_BUFFER_DURATION(buffer);
    
            // Push frame to GStreamer
            GstFlowReturn ret;
            g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
    
            gst_buffer_unref(buffer);
    
            if (ret != GST_FLOW_OK) {
                std::cerr << "Error pushing buffer to appsrc.\n";
                
                // If the pipeline is no longer functioning, stop and clean up
                stop_and_cleanup();
            }
        }
    };
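
    For completeness, the bus wait in stop_and_cleanup() above uses GST_CLOCK_TIME_NONE, which blocks forever if EOS never reaches the bus (for example if no buffer was pushed after the pipeline was created); a bounded variant would look like this (the 5-second timeout is only an example value):

    GstBus *bus = gst_element_get_bus(pipeline);
    if (bus) {
        // Wait at most 5 seconds for EOS/ERROR instead of blocking indefinitely.
        GstMessage *msg = gst_bus_timed_pop_filtered(
            bus, 5 * GST_SECOND,
            static_cast<GstMessageType>(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
        if (!msg) {
            std::cerr << "Timed out waiting for EOS; forcing pipeline to NULL anyway.\n";
        } else {
            if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) {
                GError *err = nullptr;
                gchar *dbg = nullptr;
                gst_message_parse_error(msg, &err, &dbg);
                std::cerr << "Pipeline error during shutdown: " << err->message << "\n";
                g_clear_error(&err);
                g_free(dbg);
            }
            gst_message_unref(msg);
        }
        gst_object_unref(bus);
    }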
    

  • I will be waiting for your reply.

  • Hi Hyunwoo,

    Can you also share your main code, where you are instantiating the class?

    Also, please share the error logs from when the 32nd file is written.

    Regards
    Rahul T R

    Attached is the test code.
    Please review it and advise.

    #include <gst/gst.h>
    #include <gst/app/gstappsink.h>
    #include <gst/app/gstappsrc.h>  // Added header file for GStreamer AppSrc
    #include <iostream>
    #include <fstream>
    #include <thread>
    #include <sys/stat.h>
    #include <opencv2/opencv.hpp>
    #include <fcntl.h>
    #include <sys/mman.h>
    #include <unistd.h>
    #include <cstring>
    #include <cstdlib> // Required to use the system function
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <mutex>
    #include <chrono>
    #include <vector>
    #include <deque>
    #include <condition_variable>
    #include <numeric>  // Required to use std::accumulate
    
    #include <arpa/inet.h>
    #include <sys/socket.h>          // socket(), bind(), listen(), accept()
    #include <netinet/in.h>          // sockaddr_in, htons(), INADDR_ANY
    #include <csignal>               // signal(), SIGINT
    #include <sstream>               // std::ostringstream
    #include <iomanip>               // std::put_time
    #include <algorithm>             // std::min_element, std::max_element
    #include <tuple>                 // std::tuple used for the frame buffers
    
    /////////////////////////////
    #define PORT 65432
    #define SOCKET_BUFFER_SIZE 128
    
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char socket_buffer[SOCKET_BUFFER_SIZE] = {0};
    ///////////////////////////
    
    #define SHM_FRAME_NAME "/s1cam_frame_shm"
    #define SHM_FRAME_SIZE (960 * 540 * 3)
    #define SHM_FOLDER_NAME "/s1cam_folder_shm"
    #define SHM_FOLDER_SIZE 256
    #define SHM_FILE_NAME "/s1cam_file_shm"
    #define SHM_FILE_SIZE 256
    
    std::mutex mtx;
    std::condition_variable cond_var;
    bool trigger_event = false;
    std::chrono::high_resolution_clock::time_point trigger_time;
    char trigger_time_name[SOCKET_BUFFER_SIZE] ={0,};
    
    int frame_shm_fd = shm_open(SHM_FRAME_NAME, O_CREAT | O_RDWR, 0666);
    int folder_shm_fd = shm_open(SHM_FOLDER_NAME, O_CREAT | O_RDWR, 0666);
    int file_shm_fd = shm_open(SHM_FILE_NAME, O_CREAT | O_RDWR, 0666);
    
    void* frame_shm_ptr = mmap(0, SHM_FRAME_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, frame_shm_fd, 0);
    void* folder_shm_ptr = mmap(0, SHM_FOLDER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, folder_shm_fd, 0);
    void* file_shm_ptr = mmap(0, SHM_FILE_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, file_shm_fd, 0);
    
    std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> frame_buffer;  // 15fps 2MP trigger before and after 5 seconds
    std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> continuous_buffer; // 7.5fps 960x540 continuous buffer
    const int buffer_duration = 10;  // 10 seconds buffer
    const int fps = 15;  // 15 FPS for 2MP
    const double continuous_fps = 7.5; // 7.5 FPS for continuous recording (double so the value is not truncated to 7)
    const int buffer_size = buffer_duration * fps;  // Total frames to store in buffer
    const int continuous_buffer_size = 2 * continuous_fps;  // Frames to store for 2 seconds
    static auto last_file_start_time = std::chrono::steady_clock::now();
    int cap_cnt = 0;
    
    bool g_running = true;  // Thread execution control variable
    // Global flag: Variable for handling program termination signals
    // std::atomic<bool> running(true);
    
    // Signal handler: Handles program termination with ctrl+c (SIGINT)
    void signal_handler(int signal) {
        std::cout << "Caught signal: " << signal << ", stopping the program...\n";
        g_running = false;
    }
    
    // Function to measure and print execution times
    #if 1
    std::vector<double> execution_times;
    std::mutex time_mtx;
    void measure_and_print_execution_times() {
        while (true) {
            std::this_thread::sleep_for(std::chrono::seconds(5));
    
            std::vector<double> local_times;
            {
                std::lock_guard<std::mutex> lock(time_mtx);
                local_times = execution_times;
                execution_times.clear();
            }
    
            if (!local_times.empty()) {
                double min_time = *std::min_element(local_times.begin(), local_times.end());
                double max_time = *std::max_element(local_times.begin(), local_times.end());
                double avg_time = std::accumulate(local_times.begin(), local_times.end(), 0.0) / local_times.size();
    
                std::cout << "Execution times over the last 5 seconds: "
                          << "Min: " << min_time << " ms, "
                          << "Max: " << max_time << " ms, "
                          << "Avg: " << avg_time << " ms" << std::endl;
            }
        }
    }
    #endif
    
    // Server initialization function
    void init_server() {
        // Create socket file descriptor
        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
            perror("socket failed");
            exit(EXIT_FAILURE);
        }
    
        // Set socket options to allow port reuse
        if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
            perror("setsockopt");
            exit(EXIT_FAILURE);
        }
    
        address.sin_family = AF_INET;
        address.sin_addr.s_addr = INADDR_ANY;
        address.sin_port = htons(PORT);
    
        // Bind the socket to the port
        if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
            perror("bind failed");
            exit(EXIT_FAILURE);
        }
    
        // Listen for incoming connections
        if (listen(server_fd, 3) < 0) {
            perror("listen");
            exit(EXIT_FAILURE);
        }
    
        printf("Server initialized. Waiting for connections...\n");
    }
    
    // Client socket management function
    void manage_client_socket() {
        printf("Waiting for a new client connection...\n");
    
        // Accept incoming connection requests
        if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {
            perror("accept");
            exit(EXIT_FAILURE);
        }
    
        printf("Client connected.\n");
    }
    
    // Function to handle triggers
    void handle_trigger(const char *message) {
        printf("Received trigger: %s\n", message);
    
        {
            std::lock_guard<std::mutex> lock(mtx);
            trigger_event = true;
            trigger_time = std::chrono::high_resolution_clock::now();
            char file_name[SHM_FILE_SIZE] = "alarm";
    
            snprintf((char*)trigger_time_name, SOCKET_BUFFER_SIZE, "%s", message );
            printf("trigger_timename handle : %s\n",trigger_time_name);
    
            snprintf((char*)file_shm_ptr, SHM_FILE_SIZE, "%s", file_name);
        }
    
        // Additional processing logic can be written here.
    }
    
    // Message receiving function
    void receive_messages() {
        int valread;
    
        while (1) {
            memset(socket_buffer, 0, SOCKET_BUFFER_SIZE);
            valread = read(new_socket, socket_buffer, SOCKET_BUFFER_SIZE);
    
            // When the client disconnects
            if (valread == 0) {
                printf("Client disconnected.\n");
                close(new_socket);
                manage_client_socket();  // Wait for a new client connection
            } else if (valread > 0) {
                handle_trigger(socket_buffer);
            } else {
                perror("read");
                exit(EXIT_FAILURE);
            }
        }
    }
    
    void initialize_shared_memory() {
        char folder_name[SHM_FOLDER_SIZE] = "";
        char file_name[SHM_FILE_SIZE] = "image";
    
        snprintf((char*)folder_shm_ptr, SHM_FOLDER_SIZE, "%s", folder_name);
        snprintf((char*)file_shm_ptr, SHM_FILE_SIZE, "%s", file_name);
    
        struct stat info;
        if (stat("db", &info) != 0) {
            mkdir("db", 0777);
        }
    }
    
    class ContinuousVideoSaver {
    public:
        ContinuousVideoSaver() : pipeline(nullptr), bus(nullptr), is_saving(false), running(false), timestamp(0) {
            gst_init(nullptr, nullptr);
        }
    
        ~ContinuousVideoSaver() {
            stop_and_cleanup();
        }
    
        void start_new_file() {
            if (is_saving) {
                printf("\n\n start_new_file stop and cleanup\n");
                stop_and_cleanup();
            }
            std::lock_guard<std::mutex> lock(mtx);
            
            // Create a file name based on the current time
            auto now = std::chrono::system_clock::now();
            auto time_t_now = std::chrono::system_clock::to_time_t(now);
            std::tm* tm_now = std::localtime(&time_t_now);
            std::ostringstream oss;
            oss << std::put_time(tm_now, "%Y%m%d_%H%M%S");
            filename = "db/continuous_" + oss.str() + ".mp4";
    
            // Create GStreamer pipeline
            std::string pipeline_str = "appsrc name=mysrc ! videoconvert ! v4l2h264enc ! video/x-h264,profile=baseline ! h264parse ! mp4mux ! filesink location=" + filename;
    
            std::cout << "start gst_parse_launch pipeline...\n";
            GError *error = nullptr;
            pipeline = gst_parse_launch(pipeline_str.c_str(), &error);
            if (error) {
                std::cerr << "Error creating GStreamer pipeline: " << error->message << std::endl;
                g_clear_error(&error);
                return;
            }
    
            std::cout << "start gst_bin_get_by_name appsrc...\n";
            // Get appsrc element
            appsrc = gst_bin_get_by_name(GST_BIN(pipeline), "mysrc");
    
            if (!appsrc) {
                std::cerr << "Could not find appsrc in the pipeline." << std::endl;
                gst_object_unref(pipeline);
                pipeline = nullptr;
                return;
            }
    
            std::cout << "start g_object_set gst_caps_new_simple...\n";
            // Set properties for appsrc
            g_object_set(appsrc,
                         "caps", gst_caps_new_simple("video/x-raw",
                                                     "format", G_TYPE_STRING, "BGR",
                                                     "width", G_TYPE_INT, 960,
                                                     "height", G_TYPE_INT, 540,
                                                     "framerate", GST_TYPE_FRACTION, 7, 1,
                                                     NULL),
                         "is-live", TRUE,
                         NULL);
    
            std::cout << "start gst_element_set_state GST_STATE_PLAYING...\n";
            // Change the pipeline state to PLAY
            GstStateChangeReturn ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
            if (ret == GST_STATE_CHANGE_FAILURE) {
                std::cerr << "Failed to start GStreamer pipeline." << std::endl;
                return;
            }
            
            std::cout << "start Done...\n";
            is_saving = true;
            timestamp = 0;  // Initialize timestamp
        }
        
        void save_continuous_frames() {
            running = true;
    
            while (running) {
                std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> local_buffer;
                {
                    std::lock_guard<std::mutex> lock(mtx);
                    std::swap(local_buffer, continuous_buffer);
                }
                            
                for (const auto& frame : local_buffer) {
                    write_frame(std::get<2>(frame));
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(33*4)); // Flush buffered frames roughly every 132 ms (~7.5 fps)
    
                if(g_running == false)
                {
                    std::cout << "Caught signal: Class stopping the program...\n";
                    break;
                }
            }
    
            if (is_saving) {
                printf("Class stop and cleanup\n");
                stop_and_cleanup();
            }
        }
    
        void add_frame(const cv::Mat& frame) {
            std::lock_guard<std::mutex> lock(mtx);
            continuous_buffer.emplace_back(std::chrono::high_resolution_clock::now(), "", frame);
        }
    
        void stop() {
            running = false;
            stop_and_cleanup();
        }
    
    private:
        GstElement *pipeline;
        GstElement* appsrc = nullptr;
        GstBus *bus;
        std::string filename;
        bool is_saving;
        bool running;
        guint64 timestamp;  // Variable to store timestamp
        std::mutex mtx;
        std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> continuous_buffer;
    
        void stop_and_cleanup() {
            std::lock_guard<std::mutex> lock(mtx);
            std::cout << "Stopping and cleaning up GStreamer pipeline...\n";
    
            if (pipeline) {
                // Send EOS signal
                if (appsrc) {
                    std::cout << "Sending EOS signal to appsrc...\n";
                    GstFlowReturn ret = gst_app_src_end_of_stream(GST_APP_SRC(appsrc));  // Send EOS signal
                    if (ret != GST_FLOW_OK) {
                        std::cerr << "Failed to send EOS signal to appsrc.\n";
                    }
                }
                // Wait to prevent memory leaks
                // std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // Give time for hardware resources to be released.
    
                // Wait for EOS or error message from the bus
                GstBus *bus = gst_element_get_bus(pipeline);
                if (bus) {
                    std::cout << "Waiting for EOS or error message from the bus...\n";
                    GstMessage *msg;
                    do {
                        msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, 
                                                        static_cast<GstMessageType>(GST_MESSAGE_EOS | GST_MESSAGE_ERROR));
                        if (msg != nullptr) {
                            if (GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) {
                                GError *err;
                                gchar *debug_info;
                                gst_message_parse_error(msg, &err, &debug_info);
                                std::cerr << "Error received from pipeline: " << err->message << std::endl;
                                g_clear_error(&err);
                                g_free(debug_info);
                            }
                            gst_message_unref(msg);
                        }
                    } while (msg && GST_MESSAGE_TYPE(msg) != GST_MESSAGE_EOS);
    
                    gst_object_unref(bus);  // Release bus reference
                }
    
                // Set pipeline to NULL state to automatically release all elements
                std::cout << "Setting pipeline to NULL state...\n";
                GstStateChangeReturn state_ret = gst_element_set_state(pipeline, GST_STATE_NULL);
                if (state_ret == GST_STATE_CHANGE_FAILURE) {
                    std::cerr << "Failed to set pipeline to NULL state.\n";
                } else {
                    std::cout << "Pipeline set to NULL state successfully.\n";
                }
    
                if (appsrc) {
                    gst_object_unref(appsrc);
                    appsrc = nullptr;
                }
    
                // Release the pipeline itself (internal elements will be automatically released)
                std::cout << "Unreferencing pipeline...\n";
                gst_object_unref(pipeline);
                pipeline = nullptr;
    
                // Wait to prevent memory leaks
                std::this_thread::sleep_for(std::chrono::milliseconds(500));  // Give time for hardware resources to be released.
    
                std::cout << "Pipeline and elements cleaned up.\n";
            } else {
                std::cerr << "Warning: pipeline is NULL, skipping cleanup.\n";
            }
        }
    
        void write_frame(const cv::Mat& frame) {
            std::lock_guard<std::mutex> lock(mtx);
    
            if (!is_saving || !pipeline || !appsrc) return;
    
            // Create GST_BUFFER from OpenCV
            // cv::Mat resized_frame;
            // cv::resize(frame, resized_frame, cv::Size(960, 540));
            cv::Mat bgr_frame;
            cv::cvtColor(frame, bgr_frame, cv::COLOR_YUV2BGR_UYVY);
            cv::Mat resized_frame;
            cv::resize(bgr_frame, resized_frame, cv::Size(960, 540));
    
            GstBuffer* buffer = gst_buffer_new_allocate(nullptr, resized_frame.total() * resized_frame.elemSize(), nullptr);
            GstMapInfo map;
            gst_buffer_map(buffer, &map, GST_MAP_WRITE);
            memcpy(map.data, resized_frame.data, resized_frame.total() * resized_frame.elemSize());
            gst_buffer_unmap(buffer, &map);
    
            // Set timestamp
            GST_BUFFER_PTS(buffer) = timestamp;
            GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale_int(1, GST_SECOND, 7);  // 1/7 s per frame, matching the 7 fps appsrc caps
            timestamp += GST_BUFFER_DURATION(buffer);
    
            // Push frame to GStreamer
            GstFlowReturn ret;
            
            g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
            if (ret != GST_FLOW_OK) {
                std::cerr << "Error pushing buffer to appsrc.\n";
            }
    
            gst_buffer_unref(buffer);
    
            if (ret != GST_FLOW_OK) {
                std::cerr << "Error pushing buffer to appsrc.\n";
                
                // If the pipeline is no longer functioning properly, stop and clean up the pipeline
                stop_and_cleanup();
            }
        }
    };
    
    ContinuousVideoSaver continuous_saver;
    
    void convert_and_save_frames_to_video(const std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>>& frames, const std::string& file_path, int fps, cv::Size frame_size) {
        std::string pipeline = "appsrc ! videoconvert ! v4l2h264enc ! video/x-h264,profile=baseline ! h264parse ! mp4mux ! filesink location=" + file_path;
    
        cv::VideoWriter writer(pipeline, cv::CAP_GSTREAMER, 0, fps, frame_size, true);
    
        if (!writer.isOpened()) {
            std::cerr << "Could not open the output video for write: " << file_path << std::endl;
            return;
        }
    
        std::string text_file_path = file_path.substr(0, file_path.find_last_of('.')) + ".txt";
        std::ofstream text_file(text_file_path);
        if (!text_file.is_open()) {
            std::cerr << "Could not open the output text file for write: " << text_file_path << std::endl;
            return;
        }
    
        for (const auto& frame : frames) {
            cv::Mat bgr_frame;
            cv::cvtColor(std::get<2>(frame), bgr_frame, cv::COLOR_YUV2BGR_UYVY);
            cv::Mat resized_frame;
            cv::resize(bgr_frame, resized_frame, frame_size);
            writer.write(resized_frame);
            text_file << std::get<1>(frame) << std::endl;
        }
    
        writer.release();
        text_file.close();
    
        std::cout << "Video saved to: " << file_path << std::endl;
        std::cout << "Text data saved to: " << text_file_path << std::endl;
    }
    
    void process_trigger() {
        char local_folder_name[SHM_FOLDER_SIZE];
        char local_file_name[SHM_FILE_SIZE];
        char filename[SOCKET_BUFFER_SIZE] = {0,};
        while(1) {
            std::deque<std::tuple<std::chrono::high_resolution_clock::time_point, std::string, cv::Mat>> frames_to_save;
            printf("process_trigger1\n");
    
            auto start_time_excution = std::chrono::high_resolution_clock::now();
            {
                std::unique_lock<std::mutex> lock(mtx);
                printf("process_trigger1##############\n");
                cond_var.wait(lock, [] { return trigger_event; });
                
                printf("process_trigger2\n");
                {
                    auto time_t_trigger = std::chrono::system_clock::to_time_t(trigger_time);
                    std::tm* tm_trigger = std::localtime(&time_t_trigger);
                    std::ostringstream oss;
                    oss << std::put_time(tm_trigger, "%Y%m%d_%H%M%S");
                    printf("trigger_timename process : %s\n",trigger_time_name);
                    snprintf(filename, sizeof(filename), "db/alarm_%s.mp4", (char*)trigger_time_name);
                    printf("filename : %s\n",filename);
                    snprintf(local_folder_name, sizeof(local_folder_name), "db/%s", (char*)folder_shm_ptr);
                    snprintf(local_file_name, sizeof(local_file_name), "%s", (char*)file_shm_ptr);
                }
    
                auto start_time = trigger_time - std::chrono::seconds(5);
                auto end_time = trigger_time + std::chrono::seconds(5);
    
                start_time_excution = std::chrono::high_resolution_clock::now();
                for (const auto& frame : frame_buffer) {
                    if (std::get<0>(frame) >= start_time && std::get<0>(frame) <= end_time) {
                        frames_to_save.push_back(frame);
                    }
                }
    
                trigger_event = false;
            }
    
            printf("process_trigger3\n");
            convert_and_save_frames_to_video(frames_to_save, filename, 15, cv::Size(1920, 1080));
            printf("process_trigger4\n");
            auto end_time_excution = std::chrono::high_resolution_clock::now();
            std::chrono::duration<double, std::milli> execution_time = end_time_excution - start_time_excution;
            std::cout << "Execution times  " << execution_time.count() << " ms, " << std::endl;
    
        }
    
    }
    int temp_cnt = 0;
    
    static GstFlowReturn on_new_sample(GstElement* sink, gpointer data) {
        GstSample* sample = nullptr;
        g_signal_emit_by_name(sink, "pull-sample", &sample);
        if (sample) {
            GstBuffer* buffer = gst_sample_get_buffer(sample);
            GstMapInfo map;
            gst_buffer_map(buffer, &map, GST_MAP_READ);
            auto start_time = std::chrono::high_resolution_clock::now(); // Measure start time
            cap_cnt++;
            if (cap_cnt % 2 == 0)  // Save at 15fps
            {
                std::lock_guard<std::mutex> lock(mtx);
    
                char frame_name[SHM_FILE_SIZE];
                strncpy(frame_name, (char*)file_shm_ptr, SHM_FILE_SIZE - 1);
                frame_name[SHM_FILE_SIZE - 1] = '\0';
    
                cv::Mat uyvy_frame(cv::Size(1920, 1080), CV_8UC2, (char*)map.data);
                auto frame_capture_time = std::chrono::high_resolution_clock::now();
                frame_buffer.push_back({ frame_capture_time, std::string(frame_name), uyvy_frame.clone() });
                if (frame_buffer.size() > buffer_size) {
                    frame_buffer.pop_front();
                }
    
                if (trigger_event) {
                    printf("trigger_event \n");
                    if(frame_capture_time > (trigger_time + std::chrono::seconds(5)))
                    {
                        printf("#########trigger_event \n");
                        cond_var.notify_one();
                    }
                }
            }
    
            auto now = std::chrono::steady_clock::now();
    
            // Start a new file every 1 minute
            // if (std::chrono::duration_cast<std::chrono::minutes>(now - last_file_start_time).count() >= 1) {
            //     printf("continuous file start!!\n");
            //     continuous_saver.start_new_file();
            //     last_file_start_time = now;
                
            // }
    
            // Start a new file every 7 seconds
            if (std::chrono::duration_cast<std::chrono::seconds>(now - last_file_start_time).count() >= 7) {
                printf("continuous file start %d!!\n",temp_cnt++);
                continuous_saver.start_new_file();
                last_file_start_time = now;
            }
    
            // Continuous saving buffer update
            if (cap_cnt % 4 == 0)  // Save at 7.5fps
            {
                cap_cnt = 0;
                auto now = std::chrono::steady_clock::now();
                auto frame_capture_time = std::chrono::high_resolution_clock::now();
    
                cv::Mat uyvy_frame(cv::Size(1920, 1080), CV_8UC2, (char*)map.data);
                {
                    std::lock_guard<std::mutex> lock(mtx);
                    // continuous_buffer.push_back({frame_capture_time, "continuous", uyvy_frame.clone()});
                    // if (continuous_buffer.size() > continuous_buffer_size) {
                    //     continuous_buffer.pop_front();
                    // }
                    continuous_saver.add_frame(uyvy_frame.clone());
                }
            }
            auto end_time = std::chrono::high_resolution_clock::now(); // Measure end time
            std::chrono::duration<double, std::milli> execution_time = end_time - start_time;
            {
                std::lock_guard<std::mutex> lock(time_mtx);
                execution_times.push_back(execution_time.count());
            }
    
            gst_buffer_unmap(buffer, &map);
            gst_sample_unref(sample);
            return GST_FLOW_OK;
        }
        return GST_FLOW_ERROR;
    }
    
    int main(int argc, char* argv[]) {
        gst_init(&argc, &argv);
        signal(SIGINT, signal_handler); // Set handler for SIGINT (ctrl+c)
    
        if (frame_shm_fd == -1 || folder_shm_fd == -1 || file_shm_fd == -1) {
            perror("shm_open");
            return 1;
        }
    
        ftruncate(frame_shm_fd, SHM_FRAME_SIZE);
        ftruncate(folder_shm_fd, SHM_FOLDER_SIZE);
        ftruncate(file_shm_fd, SHM_FILE_SIZE);
    
        if (frame_shm_ptr == MAP_FAILED || folder_shm_ptr == MAP_FAILED || file_shm_ptr == MAP_FAILED) {
            perror("mmap");
            close(frame_shm_fd);
            close(folder_shm_fd);
            close(file_shm_fd);
            return 1;
        }
    
        initialize_shared_memory();
    
        int ret = system("media-ctl -V '\"emtcam 1-003c\":0 [fmt:UYVY8_1X16/1920x1080 field:none]'");
        if (ret != 0) {
            std::cerr << "Failed to execute media-ctl command" << std::endl;
            return 1;
        }
    
        GstElement* pipeline = gst_parse_launch(
            "v4l2src device=/dev/video3 io-mode=5 ! "
            "video/x-raw,format=UYVY,width=1920,height=1080,framerate=30/1 ! tiovxmemalloc pool-size=8 ! "
            "appsink name=sink emit-signals=True sync=True max-buffers=30 drop=False",
            nullptr);
    
        GstElement* sink = gst_bin_get_by_name(GST_BIN(pipeline), "sink");
        g_signal_connect(sink, "new-sample", G_CALLBACK(on_new_sample), nullptr);
    
        gst_element_set_state(pipeline, GST_STATE_PLAYING);
    
        continuous_saver.start_new_file();
        std::thread process_thread(process_trigger);
        std::thread time_thread(measure_and_print_execution_times); // Execution time measurement thread
        std::thread continuous_save_thread(&ContinuousVideoSaver::save_continuous_frames, &continuous_saver);
    
        init_server();           // Initialize server
        manage_client_socket();  // Manage client connections
        std::thread socketrecv_thread(receive_messages);
    
        GMainLoop* loop = g_main_loop_new(nullptr, FALSE);
        g_main_loop_run(loop);
    
        gst_element_set_state(pipeline, GST_STATE_NULL);
        gst_object_unref(pipeline);
    
        g_running = false;  // Set thread termination flag
        process_thread.join();
        continuous_save_thread.join();
        socketrecv_thread.join();
    
        munmap(frame_shm_ptr, SHM_FRAME_SIZE);
        munmap(folder_shm_ptr, SHM_FOLDER_SIZE);
        munmap(file_shm_ptr, SHM_FILE_SIZE);
    
        close(frame_shm_fd);
        close(folder_shm_fd);
        close(file_shm_fd);
    
        shm_unlink(SHM_FRAME_NAME);
        shm_unlink(SHM_FOLDER_NAME);
        shm_unlink(SHM_FILE_NAME);
    
        return 0;
    }
    

    gstsave_20240828.cpp

  • I will be waiting for your reply.

  • Hi Hyunwoo,

    I suspect the issue is with the max-buffers=30 setting in your capture pipeline.
    Can you change it to 60 and see if you are able to save more frames?
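
    That is, only the appsink's max-buffers value changes in the capture pipeline; everything else stays as in your code:

    GstElement* pipeline = gst_parse_launch(
        "v4l2src device=/dev/video3 io-mode=5 ! "
        "video/x-raw,format=UYVY,width=1920,height=1080,framerate=30/1 ! tiovxmemalloc pool-size=8 ! "
        "appsink name=sink emit-signals=True sync=True max-buffers=60 drop=False",
        nullptr);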

    Regards
    Rahul T R

  • Hi Hyunwoo,

    Please find attached a sample application that I ran on my board to dump the camera captures into multiple files.

    I verified the application below with an OV5640 camera connected to my board.

    gst-example-file.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <gst/gst.h> 
    #include <gst/app/gstappsink.h> 
    
    static guint frame_count = 0;
    static GstElement *pipeline = NULL;
    static GstElement *source = NULL;
    static gboolean eos_sent = FALSE;
    
    static GstFlowReturn on_new_sample(GstAppSink *appsink, gpointer user_data) { 
        GstSample *sample; 
        GstBuffer *buffer; 
        GstMapInfo map; 
        gchar filename[64]; 
        FILE *file; 
        // Pull the sample from the appsink 
        sample = gst_app_sink_pull_sample(appsink); 
        if (!sample) { 
            return GST_FLOW_ERROR; 
        } 
        buffer = gst_sample_get_buffer(sample); 
        if (!buffer) { 
            gst_sample_unref(sample);
            return GST_FLOW_ERROR;
        } 
        // Map the buffer to access the data 
        if (gst_buffer_map(buffer, &map, GST_MAP_READ)) { 
            snprintf(filename, sizeof(filename), "frame_%u.h264", frame_count++); 
            // Write the buffer data to a file 
            file = fopen(filename, "wb");
            if (file) {
                fwrite(map.data, 1, map.size, file); 
                fclose(file);
            } 
            gst_buffer_unmap(buffer, &map); 
        } 
        
        gst_sample_unref(sample); 
        if (!eos_sent) {
            eos_sent = TRUE;
            gst_element_send_event(pipeline, gst_event_new_eos());
        }
        return GST_FLOW_OK; 
    } 
    int main(int argc, char *argv[]) 
    {
        GstBus *bus; GstMessage *msg; 
        GstStateChangeReturn ret; 
        GstCaps *caps; 
        // Initialize GStreamer 
        gst_init(&argc, &argv); 
        // Create GStreamer elements 
        pipeline = gst_parse_launch("v4l2src device=/dev/video2 ! video/x-raw, format=UYVY, width=640, height=480, framerate=30/1 ! videoconvert ! v4l2h264enc ! appsink name=sink", NULL); 
        source = gst_bin_get_by_name(GST_BIN(pipeline), "sink"); 
        g_object_set(source, "emit-signals", TRUE, NULL); 
        g_signal_connect(source, "new-sample", G_CALLBACK(on_new_sample), NULL); 
        // Start playing the pipeline 
        ret = gst_element_set_state(pipeline, GST_STATE_PLAYING); 
        if (ret == GST_STATE_CHANGE_FAILURE) { 
            g_printerr("Unable to set the pipeline to the playing state.\n"); 
            gst_object_unref(pipeline); 
            return -1; 
        } 
        // Wait for EOS or error message 
        bus = gst_element_get_bus(pipeline);
        while (TRUE) {
    
            msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, GST_MESSAGE_ERROR | GST_MESSAGE_EOS); 
            if (msg != NULL) {
                // Remember the message type before unreffing the message below.
                GstMessageType msg_type = GST_MESSAGE_TYPE(msg);
                switch (msg_type) {
                    case GST_MESSAGE_ERROR: {
                        GError *err;
                        gchar *debug_info;
                        gst_message_parse_error(msg, &err, &debug_info);
                        g_printerr("Error received: %s\n", err->message);
                        g_error_free(err);
                        g_free(debug_info);
                        break;
                    }
                    case GST_MESSAGE_EOS:
                        g_print(" EOS reached ! \n");
                        gst_element_set_state(pipeline, GST_STATE_NULL);
                        gst_element_set_state(pipeline, GST_STATE_PLAYING);
                        eos_sent = FALSE;
                        break;
                    default:
                        break;
                }
                gst_message_unref(msg);
                // Use the saved type: msg has already been unreffed at this point.
                if (msg_type == GST_MESSAGE_EOS && eos_sent) {
                    break;
                }
            }
        } 
        gst_object_unref(bus); 
        gst_element_set_state(pipeline, GST_STATE_NULL);
        gst_object_unref(pipeline); 
        return 0; 
    } 
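
    If useful, the sample can typically be built on the target with gcc gst-example-file.c -o gst-example-file $(pkg-config --cflags --libs gstreamer-1.0 gstreamer-app-1.0), assuming the GStreamer development packages are installed.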
    

    Hope this helps you proceed in your development.

    Best Regards,

    Suren

  • I will test it using the method you suggested and let you know the results.

    However, I have one question. Would the CPU load be the same when writing a file using fwrite versus using a GStreamer pipeline to handle the file writing?

  • Hi Hyunwoo,

    Were you able to test with the sample application?

    Best Regards,

    Suren

  • Sorry for the late reply. I couldn’t check earlier due to various reasons.

    It still doesn't work. The code you suggested also hits the error once EOS has been sent 32 times. Does the error not occur on your end at TI after 32 iterations? It works correctly up to the 32nd one.

    Please check this issue, and I would also appreciate any further suggestions you might have.

    if (!eos_sent) {
        eos_sent = TRUE;
        gst_element_send_event(pipeline, gst_event_new_eos());
        frame_count++;
    }
    ------------------------------------------------------------------    
    
    case GST_MESSAGE_EOS:
        g_print(" EOS reached ! (%d)\n", frame_count);
        gst_element_set_state(pipeline, GST_STATE_NULL);
        gst_element_set_state(pipeline, GST_STATE_PLAYING);
        eos_sent = FALSE;
        break;

  • Hi Hyunwoo,

    Can you try using the gst_bin_remove() API to remove the encoder after each sample has been written to a file?

    You can refer to the code below, where I dynamically change the resolution by removing the previous encoder instance and recreating it.

    #include <gst/gst.h>
    #include <gst/pbutils/pbutils.h>

    static gboolean change_encoder_resolution(GstElement *pipeline, int width, int height) {
        GstElement *encoder = gst_bin_get_by_name(GST_BIN(pipeline), "video_encoder");

        if (!encoder) {
            g_printerr("Encoder not found!\n");
            return FALSE;
        }

        // Stop the pipeline
        gst_element_set_state(pipeline, GST_STATE_PAUSED);

        // Remove the encoder from the pipeline
        gst_bin_remove(GST_BIN(pipeline), encoder);

        // Create a new encoder with the new resolution
        GstElement *new_encoder = gst_element_factory_make("x264enc", "video_encoder");
        g_object_set(new_encoder, "width", width, "height", height, NULL);

        // Add the new encoder to the pipeline
        gst_bin_add(GST_BIN(pipeline), new_encoder);

        // Link the elements (assuming a specific structure, adjust as necessary)
        // e.g., source -> encoder -> sink
        GstElement *source = gst_bin_get_by_name(GST_BIN(pipeline), "video_source");
        GstElement *sink = gst_bin_get_by_name(GST_BIN(pipeline), "video_sink");

        gst_element_link(source, new_encoder);
        gst_element_link(new_encoder, sink);

        // Set the pipeline to playing
        gst_element_set_state(pipeline, GST_STATE_PLAYING);

        gst_object_unref(encoder);
        gst_object_unref(new_encoder);
        gst_object_unref(source);
        gst_object_unref(sink);

        return TRUE;
    }

    int main(int argc, char *argv[]) {
        gst_init(&argc, &argv);

        // Create a GStreamer pipeline
        GstElement *pipeline = gst_parse_launch("videotestsrc name=video_source ! video/x-raw,format=I420 ! x264enc name=video_encoder ! mp4mux ! filesink location=output.mp4", NULL);

        // Start the pipeline
        gst_element_set_state(pipeline, GST_STATE_PLAYING);

        // Change the resolution after some time (e.g., after 5 seconds)
        g_usleep(5000000);  // 5 seconds
        change_encoder_resolution(pipeline, 1280, 720);

        // Let the pipeline run for a while
        g_usleep(5000000);  // 5 seconds

        // Clean up
        gst_element_set_state(pipeline, GST_STATE_NULL);
        gst_object_unref(pipeline);

        return 0;
    
    }
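
    One note if you adapt this: once an element has been removed from the bin, the pipeline no longer manages its state, so set it to GST_STATE_NULL yourself before dropping the last reference; otherwise the underlying encoder instance may stay allocated. For example:

    gst_element_set_state(encoder, GST_STATE_NULL);  // release the element's own resources
    gst_bin_remove(GST_BIN(pipeline), encoder);      // the bin drops its reference
    gst_object_unref(encoder);                       // drop the gst_bin_get_by_name() reference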
    
    

    Hope this helps you debug further.

    Apologies for the delay in responding.

    Best Regards,

    Suren