Because of the Thanksgiving holiday in the U.S., TI E2E™ design support forum responses may be delayed from November 25 through December 2. Thank you for your patience.

SK-AM62A-LP: Problem taking images properly with the imx219 using gstreamer

Part Number: SK-AM62A-LP

Tool/software:

Hi Team,

I'm trying to do a simple program that is able to take a picture using an imx219 sensor and placing it into a buffer. 

I don't have much experience with gstreamer but based on the example python app I was able to create this code:

import gi
import numpy as np
import cv2
from gi.repository import Gst, GLib, GObject

# Initialize GStreamer
gi.require_version("Gst", "1.0")
gi.require_version("GstApp", "1.0")
from gi.repository import Gst

Gst.init(None)

class GstPipe:
    def __init__(self, src_pipe):
        """
        Initialize the GStreamer pipeline.
        Args:
            src_pipe: GStreamer pipeline source (e.g., camera or file).
        """
        self.src_pipe = src_pipe
        self.frame_buffer = None
        self.frame_count = 0  # Track the number of frames captured

    def start(self):
        """
        Start the GStreamer source pipeline.
        """
        ret = self.src_pipe.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            bus = self.src_pipe.get_bus()
            msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR)
            err, _ = msg.parse_error()
            print("[ERROR] Failed to start pipeline:", err.message)
            exit(1)
        print("[INFO] GStreamer pipeline started.")

    def on_new_sample(self, appsink, data):
        # print("[INFO] New sample received.")
        sample = appsink.emit("pull-sample")
        if not isinstance(sample, Gst.Sample):
            print("[ERROR] No valid sample received.")
            return Gst.FlowReturn.ERROR

        buffer = sample.get_buffer()
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            print("[ERROR] Could not map buffer")
            return Gst.FlowReturn.ERROR

        # Get frame dimensions from the caps
        caps = sample.get_caps()
        width = caps.get_structure(0).get_value("width")
        height = caps.get_structure(0).get_value("height")

        # Log the buffer size for debugging
        buffer_size = buffer.get_size()
        # print(f"[DEBUG] Buffer size: {buffer_size} bytes")
        # print(f"[DEBUG] Frame size: {width}x{height}")

        # Create a NumPy array with the Bayer frame data
        self.frame_buffer = np.ndarray((height, width), dtype=np.uint8, buffer=map_info.data)
        buffer.unmap(map_info)

        # Only convert if frame_buffer is valid
        if self.frame_buffer is not None and self.frame_buffer.size > 0:
            rgb_frame = cv2.cvtColor(self.frame_buffer, cv2.COLOR_BAYER_RG2BGR)  # Use correct conversion for your Bayer pattern
            self.frame_buffer = rgb_frame  # Update frame buffer with RGB frame
        else:
            print("[ERROR] Frame buffer is empty after conversion.")

        self.frame_count += 1  # Increment frame count
        # print(f"[INFO] Captured frame {self.frame_count} (Width: {width}, Height: {height})")

        return Gst.FlowReturn.OK
    


    def stop(self):
        """
        Stop the GStreamer pipeline.
        """
        print("[INFO] Stopping GStreamer pipeline.")
        self.src_pipe.set_state(Gst.State.NULL)


def build_gst_pipeline(input_config):
    """
    Create a GStreamer pipeline based on the input configuration.
    Args:
        input_config: Dictionary of input parameters from the configuration.
    Returns:
        pipeline: The constructed GStreamer pipeline.
        appsink: The appsink element to pull frames from.
    """
    source = input_config["source"]
    width = input_config["width"]
    height = input_config["height"]
    framerate = input_config["framerate"]
    format = input_config["format"]

    # Construct the GStreamer pipeline based on input type (camera or file)

    pipeline_description = (
        "v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=1920, height=1080, format=rggb ! "
        "tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 "
        "sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12 ! "
        "appsink name=appsink"
    )

    print(f"[INFO] GStreamer pipeline: {pipeline_description}")
    pipeline = Gst.parse_launch(pipeline_description)
    appsink = pipeline.get_by_name("appsink")
    appsink.set_property("emit-signals", True)
    appsink.set_property("max-buffers", 1)
    appsink.set_property("drop", True)

    return pipeline, appsink


def save_frame_as_image(frame, filename):
    """
    Save a single frame as an image file.
    Args:
        frame: The frame to save (NumPy array).
        filename: The output filename (e.g., 'frame.jpg').
    """
    if frame is not None and frame.size > 0:
        cv2.imwrite(filename, frame)
        print(f"[INFO] Frame saved as {filename}")
    else:
        print("[ERROR] No valid frame to save.")


import time

import time

def capture_frames(gst_pipe, num_frames, delay=1):
    """
    Capture a specific number of frames with a delay between captures.
    Args:
        gst_pipe: The GstPipe object managing the GStreamer pipeline.
        num_frames: The number of frames to capture.
        delay: Time in seconds to wait between frame captures.
    """
    saved_frames = 0  # Track the number of successfully saved frames

    # Run until we've saved the desired number of frames
    while saved_frames < num_frames:
        # Process pending events in the main context
        GLib.MainContext.default().iteration(False)

        # Only process if frame_buffer has valid data
        if gst_pipe.frame_buffer is not None:
            filename = f"frame_{saved_frames}.jpg"
            save_frame_as_image(gst_pipe.frame_buffer, filename)
            gst_pipe.frame_buffer = None  # Reset the buffer after saving
            saved_frames += 1  # Increment the saved frame count
            print(f"[INFO] Successfully saved frame {saved_frames} as {filename}")

            # Wait for the specified delay after saving the frame
            time.sleep(delay)

        # If frame_buffer is None, you can add a small sleep to avoid busy-waiting
        else:
            print(f"[WARNING] Frame buffer is None; skipping save.")
            time.sleep(0.1)  # Add a small delay to avoid tight loop

    print("[INFO] Finished capturing frames.")



def main():
    # Hardcoded configuration (similar to the YAML file)
    config = {
        "inputs": {
            "input0": {
                "source": "/dev/video-imx219-cam0",
                "subdev-id": "/dev/v4l-imx219-subdev0",
                "width": 1920,
                "height": 1080,
                "format": "rggb",
                "framerate": 30,
            },
        },
    }

    # Select the input0 configuration from the hardcoded config
    input_config = config["inputs"]["input0"]

    # Build the GStreamer pipeline based on the input configuration
    pipeline, appsink = build_gst_pipeline(input_config)

    # Initialize the GstPipe wrapper
    gst_pipe = GstPipe(pipeline)

    # Connect the new-sample signal to the callback
    appsink.connect("new-sample", gst_pipe.on_new_sample, appsink)

    # Start the GStreamer pipeline
    gst_pipe.start()

    # Start frame capture process
    try:
        capture_frames(gst_pipe, 10, delay=2)  # Capture 20 frames with a 2-second delay between each frame
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        # Stop the pipeline after capture
        gst_pipe.stop()


if __name__ == "__main__":
    main()

This code has the problem that the images taken are in black and white: This is probably due to the pipeline:

pipeline_description = (
    "v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=1920, height=1080, format=rggb ! "
    "tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 "
    "sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12 ! "
    "appsink name=appsink"
)

In another test I was able to save the images in color using this code:

from gi.repository import Gst, GLib, GObject

def stop_pipeline(pipeline, loop):
    print("[INFO] Stopping pipeline...")
    pipeline.set_state(Gst.State.NULL)
    loop.quit()

def main():
    Gst.init(None)

    # Define the GStreamer pipeline description
    pipeline_description = (
        "v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=3280, height=2464, format=rggb, framerate=30/1 ! "
        "tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 "
        "sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12, framerate=30/1 ! "
        "videoconvert ! jpegenc ! filesink location=/home/casemiro_v2/tests/test_camera/captured_image.jpg"
    )

    # Create the GStreamer pipeline
    pipeline = Gst.parse_launch(pipeline_description)
    
    # Create a GLib Main Loop to run the pipeline
    loop = GLib.MainLoop()

    # Set the pipeline to the PLAYING state
    pipeline.set_state(Gst.State.PLAYING)

    # Set a timer to stop the pipeline after a short delay (e.g., 1 second)
    GLib.timeout_add_seconds(1, stop_pipeline, pipeline, loop)
    
    try:
        print("[INFO] Pipeline started, capturing image...")
        loop.run()
        print("[INFO] Image captured successfully.")
    except KeyboardInterrupt:
        print("[INFO] Interrupted by user, stopping...")
        pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    main()

Which has the pipeline:

pipeline_description = (
    "v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=3280, height=2464, format=rggb, framerate=30/1 ! "
    "tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 "
    "sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12, framerate=30/1 ! "
    "videoconvert ! jpegenc ! filesink location=/home/casemiro_v2/tests/test_camera/captured_image.jpg"
)

 
One of the problems I am encountering is that I am not being able to use the structure of the second pipeline to save images in color onto a buffer.
The other issue is that despite the sensor being capable of having a better resolution (3280 x 2464), when changing these values in the pipeline, the images are not being taken properly.

I would be glad of any help that could be offered.

Thanks,
Joaquin Perez

  • Hi Joaquin,

    The other issue is that despite the sensor being capable of having a better resolution (3280 x 2464), when changing these values in the pipeline, the images are not being taken properly.

    You'll need to configure the sensor format as well as other entities in the media pipe. You can edit file /opt/edgeai-gst-apps/scripts/setup_cameras.sh and change the IMX219 format as below:

    IMX219_CAM_FMT="${IMX219_CAM_FMT:-[fmt:SRGGB8_1X8/3280x2464]}".

    Then run this script and you should be able to capture images at resolution 3280 x 2464.

    Regards,

    Jianzhong

  • Thanks a lot! That solved the issue of the resolution.

    Now, about my other question, is there a way to modify the first pipeline so that it takes color images?

  • Please try the following pipeline to save color images:

    gst-launch-1.0 -v v4l2src num-buffers=5 device=/dev/video-imx219-cam0 io-mode=dmabuf-import ! \
    video/x-bayer, width=1920, height=1080, framerate=30/1, format=rggb10 ! \
    tiovxisp sink_0::device=/dev/v4l-imx219-subdev0 \
    sensor-name="SENSOR_SONY_IMX219_RPI" \
    dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss_10b.bin \
    sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a_10b.bin format-msb=9 ! \
    video/x-raw, format=NV12, width=1920, height=1080, framerate=30/1 ! \
    jpegenc ! multifilesink location="imx219-image-%d.jpg"