Tool/software:
Hi Team,
I'm trying to do a simple program that is able to take a picture using an imx219 sensor and placing it into a buffer.
I don't have much experience with gstreamer but based on the example python app I was able to create this code:
import gi import numpy as np import cv2 from gi.repository import Gst, GLib, GObject # Initialize GStreamer gi.require_version("Gst", "1.0") gi.require_version("GstApp", "1.0") from gi.repository import Gst Gst.init(None) class GstPipe: def __init__(self, src_pipe): """ Initialize the GStreamer pipeline. Args: src_pipe: GStreamer pipeline source (e.g., camera or file). """ self.src_pipe = src_pipe self.frame_buffer = None self.frame_count = 0 # Track the number of frames captured def start(self): """ Start the GStreamer source pipeline. """ ret = self.src_pipe.set_state(Gst.State.PLAYING) if ret == Gst.StateChangeReturn.FAILURE: bus = self.src_pipe.get_bus() msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ERROR) err, _ = msg.parse_error() print("[ERROR] Failed to start pipeline:", err.message) exit(1) print("[INFO] GStreamer pipeline started.") def on_new_sample(self, appsink, data): # print("[INFO] New sample received.") sample = appsink.emit("pull-sample") if not isinstance(sample, Gst.Sample): print("[ERROR] No valid sample received.") return Gst.FlowReturn.ERROR buffer = sample.get_buffer() success, map_info = buffer.map(Gst.MapFlags.READ) if not success: print("[ERROR] Could not map buffer") return Gst.FlowReturn.ERROR # Get frame dimensions from the caps caps = sample.get_caps() width = caps.get_structure(0).get_value("width") height = caps.get_structure(0).get_value("height") # Log the buffer size for debugging buffer_size = buffer.get_size() # print(f"[DEBUG] Buffer size: {buffer_size} bytes") # print(f"[DEBUG] Frame size: {width}x{height}") # Create a NumPy array with the Bayer frame data self.frame_buffer = np.ndarray((height, width), dtype=np.uint8, buffer=map_info.data) buffer.unmap(map_info) # Only convert if frame_buffer is valid if self.frame_buffer is not None and self.frame_buffer.size > 0: rgb_frame = cv2.cvtColor(self.frame_buffer, cv2.COLOR_BAYER_RG2BGR) # Use correct conversion for your Bayer pattern self.frame_buffer = rgb_frame # Update frame buffer with RGB frame else: print("[ERROR] Frame buffer is empty after conversion.") self.frame_count += 1 # Increment frame count # print(f"[INFO] Captured frame {self.frame_count} (Width: {width}, Height: {height})") return Gst.FlowReturn.OK def stop(self): """ Stop the GStreamer pipeline. """ print("[INFO] Stopping GStreamer pipeline.") self.src_pipe.set_state(Gst.State.NULL) def build_gst_pipeline(input_config): """ Create a GStreamer pipeline based on the input configuration. Args: input_config: Dictionary of input parameters from the configuration. Returns: pipeline: The constructed GStreamer pipeline. appsink: The appsink element to pull frames from. """ source = input_config["source"] width = input_config["width"] height = input_config["height"] framerate = input_config["framerate"] format = input_config["format"] # Construct the GStreamer pipeline based on input type (camera or file) pipeline_description = ( "v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=1920, height=1080, format=rggb ! " "tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 " "sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12 ! " "appsink name=appsink" ) print(f"[INFO] GStreamer pipeline: {pipeline_description}") pipeline = Gst.parse_launch(pipeline_description) appsink = pipeline.get_by_name("appsink") appsink.set_property("emit-signals", True) appsink.set_property("max-buffers", 1) appsink.set_property("drop", True) return pipeline, appsink def save_frame_as_image(frame, filename): """ Save a single frame as an image file. Args: frame: The frame to save (NumPy array). filename: The output filename (e.g., 'frame.jpg'). """ if frame is not None and frame.size > 0: cv2.imwrite(filename, frame) print(f"[INFO] Frame saved as {filename}") else: print("[ERROR] No valid frame to save.") import time import time def capture_frames(gst_pipe, num_frames, delay=1): """ Capture a specific number of frames with a delay between captures. Args: gst_pipe: The GstPipe object managing the GStreamer pipeline. num_frames: The number of frames to capture. delay: Time in seconds to wait between frame captures. """ saved_frames = 0 # Track the number of successfully saved frames # Run until we've saved the desired number of frames while saved_frames < num_frames: # Process pending events in the main context GLib.MainContext.default().iteration(False) # Only process if frame_buffer has valid data if gst_pipe.frame_buffer is not None: filename = f"frame_{saved_frames}.jpg" save_frame_as_image(gst_pipe.frame_buffer, filename) gst_pipe.frame_buffer = None # Reset the buffer after saving saved_frames += 1 # Increment the saved frame count print(f"[INFO] Successfully saved frame {saved_frames} as {filename}") # Wait for the specified delay after saving the frame time.sleep(delay) # If frame_buffer is None, you can add a small sleep to avoid busy-waiting else: print(f"[WARNING] Frame buffer is None; skipping save.") time.sleep(0.1) # Add a small delay to avoid tight loop print("[INFO] Finished capturing frames.") def main(): # Hardcoded configuration (similar to the YAML file) config = { "inputs": { "input0": { "source": "/dev/video-imx219-cam0", "subdev-id": "/dev/v4l-imx219-subdev0", "width": 1920, "height": 1080, "format": "rggb", "framerate": 30, }, }, } # Select the input0 configuration from the hardcoded config input_config = config["inputs"]["input0"] # Build the GStreamer pipeline based on the input configuration pipeline, appsink = build_gst_pipeline(input_config) # Initialize the GstPipe wrapper gst_pipe = GstPipe(pipeline) # Connect the new-sample signal to the callback appsink.connect("new-sample", gst_pipe.on_new_sample, appsink) # Start the GStreamer pipeline gst_pipe.start() # Start frame capture process try: capture_frames(gst_pipe, 10, delay=2) # Capture 20 frames with a 2-second delay between each frame except KeyboardInterrupt: print("Exiting...") finally: # Stop the pipeline after capture gst_pipe.stop() if __name__ == "__main__": main()
This code has the problem that the images taken are in black and white: This is probably due to the pipeline:
pipeline_description = ( "v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=1920, height=1080, format=rggb ! " "tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 " "sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12 ! " "appsink name=appsink" )
In another test I was able to save the images in color using this code:
from gi.repository import Gst, GLib, GObject
def stop_pipeline(pipeline, loop):
print("[INFO] Stopping pipeline...")
pipeline.set_state(Gst.State.NULL)
loop.quit()
def main():
Gst.init(None)
# Define the GStreamer pipeline description
pipeline_description = (
"v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=3280, height=2464, format=rggb, framerate=30/1 ! "
"tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 "
"sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12, framerate=30/1 ! "
"videoconvert ! jpegenc ! filesink location=/home/casemiro_v2/tests/test_camera/captured_image.jpg"
)
# Create the GStreamer pipeline
pipeline = Gst.parse_launch(pipeline_description)
# Create a GLib Main Loop to run the pipeline
loop = GLib.MainLoop()
# Set the pipeline to the PLAYING state
pipeline.set_state(Gst.State.PLAYING)
# Set a timer to stop the pipeline after a short delay (e.g., 1 second)
GLib.timeout_add_seconds(1, stop_pipeline, pipeline, loop)
try:
print("[INFO] Pipeline started, capturing image...")
loop.run()
print("[INFO] Image captured successfully.")
except KeyboardInterrupt:
print("[INFO] Interrupted by user, stopping...")
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
main()
Which has the pipeline:
pipeline_description = (
"v4l2src device=/dev/video-imx219-cam0 io-mode=5 ! queue leaky=2 ! video/x-bayer, width=3280, height=2464, format=rggb, framerate=30/1 ! "
"tiovxisp sensor-name=SENSOR_SONY_IMX219_RPI dcc-isp-file=/opt/imaging/imx219/linear/dcc_viss.bin format-msb=7 "
"sink_0::dcc-2a-file=/opt/imaging/imx219/linear/dcc_2a.bin sink_0::device=/dev/v4l-imx219-subdev0 ! video/x-raw, format=NV12, framerate=30/1 ! "
"videoconvert ! jpegenc ! filesink location=/home/casemiro_v2/tests/test_camera/captured_image.jpg"
)
One of the problems I am encountering is that I am not being able to use the structure of the second pipeline to save images in color onto a buffer.
The other issue is that despite the sensor being capable of having a better resolution (3280 x 2464), when changing these values in the pipeline, the images are not being taken properly.
I would be glad of any help that could be offered.
Thanks,
Joaquin Perez