Tool/software:
Hello Guys,
I am doing ADAS video recording on AM62A with two saving modes:
- Event-based (triggered by something like a distance threshold or a detection)
- Loop-based (continuous recording, overwriting the oldest files to stay within limited storage)
Given below is the relevant portion of post_process.py from apps_python. Each saved video file is currently about 18 MB, and I want to reduce that for cloud-based storage. What's the solution?
def save_event_video(self, frames, *, fourcc="MJPG", fps=24, extension=".avi"):
    """Write the given frames to a timestamped event video in ``self.output_dir``.

    Frames that are ``None`` or have ``size == 0`` are skipped. If no usable
    frame remains, nothing is written and no file is created. The frame size
    of the output is taken from the first usable frame.

    Args:
        frames: Sequence of image arrays (objects exposing ``.shape`` and
            ``.size``, as passed to ``cv2.VideoWriter.write``).
        fourcc: Four-character codec code handed to
            ``cv2.VideoWriter_fourcc``. The default ``"MJPG"`` (Motion JPEG)
            encodes every frame independently; an inter-frame codec may give
            noticeably smaller files, but availability depends on the OpenCV
            build on the target -- verify before switching.
        fps: Frame rate recorded in the output file.
        extension: File extension (including the dot) of the output file;
            should match a container that supports ``fourcc``.

    Returns:
        None. Progress and errors are reported via ``print``; exceptions are
        caught and logged rather than propagated.

    Side effects:
        Always resets ``self.recording`` to ``False`` under
        ``self.record_lock`` before returning.
    """
    try:
        if not frames:
            print("[PostProcessDetection] No frames to save.")
            return

        # Filter first so the frame size comes from a frame that really
        # exists, and so an all-empty batch never creates a file on disk.
        usable = []
        for i, f in enumerate(frames):
            if f is None or f.size == 0:
                print(f"[PostProcessDetection] Skipping empty frame {i}")
                continue
            usable.append(f)
        if not usable:
            print("[PostProcessDetection] No frames to save.")
            return

        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        out_path = os.path.join(self.output_dir, f"event_{timestamp}{extension}")
        height, width = usable[0].shape[:2]
        writer = cv2.VideoWriter(
            out_path, cv2.VideoWriter_fourcc(*fourcc), fps, (width, height)
        )
        try:
            if not writer.isOpened():
                print(f"[ERROR] Failed to open VideoWriter for {out_path}")
                return
            for f in usable:
                writer.write(f)
        finally:
            # Release even when a write fails so the file handle/encoder
            # is not leaked and the container gets finalized.
            writer.release()
        print(f"[PostProcessDetection] Event video saved: {out_path}")
    except Exception as e:
        # Best-effort recording: log and carry on instead of crashing the caller.
        print(f"[PostProcessDetection] Error saving video: {e}")
    finally:
        with self.record_lock:
            self.recording = False
def loop_video_worker(self):
loop_index = 0
while self.loop_recording:
time.sleep(5)
if len(self.frame_buffer) < 10:
continue # Wait until buffer has enough frames
frames = self.frame_buffer.copy()
out_path = os.path.join(self.output_dir, f"loop_{loop_index % 10}.avi")
loop_index += 1
try:
height, width = frames[0].shape[:2]
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'MJPG'), 24, (width, height))
if not writer.isOpened():
print(f"[Loop] Failed to open writer for: {out_path}")
continue
written = 0
for i, f in enumerate(frames):
if f is None or f.size == 0:
print(f"[Loop] Skipping empty frame {i}")
continue
writer.write(f)
written += 1
writer.release()
if written > 0:
print(f"[Loop] Saved: {out_path}")
else:
os.remove(out_path)
print(f"[Loop] Removed empty loop file: {out_path}")
except Exception as e:
print(f"[Loop] Exception: {e}"