Source code for nicetoolbox.detectors.data_handlers.video_handler

"""
Video/frame data handler for the NICE Toolbox.

Handles frame extraction from video files and preparation of image sequences.
Also owns camera calibration loading since calibration is video-specific.
"""

import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np

from nicetoolbox_core.input_recipes import VideoInputRecipe

from ...configs.models.video_timestamp import timestamp_to_frame_index
from ...configs.video_runtime_config import SequenceRuntimeConfig
from ...utils import video as vid
from ...utils.logging_utils import log_with_underscore
from ..in_out import SequenceIO
from .handler import BaseModalityHandler

FILENAME_TEMPLATE = "{idx:09d}.png"


class VideoDataHandler(BaseModalityHandler):
    """
    Handles video/frame data preparation.

    Responsibilities:
    - Validate camera names and locate one video file per camera
    - Validate that all cameras share the same FPS and frame count
    - Extract frames from video files (mp4, mov)
    - Validate existing frame sequences
    - Generate input recipes for frame loaders
    - Load camera calibration data
    """

    def __init__(self, io: SequenceIO, sequence_context: SequenceRuntimeConfig):
        """
        Initialize the handler; video paths and calibration are resolved later in prepare().

        Args:
            io: Sequence I/O helper, used to locate source folders and the calibration file.
            sequence_context: Runtime configuration of the sequence (video_start,
                video_length and fps are read from it by this class).
        """
        # Shared fields
        super().__init__(io, sequence_context)

        # Video-specific state
        # NOTE(review): `dataset_properties` is presumably set by the base class - confirm.
        self.start_frame_index = self.dataset_properties.start_frame_index

        # Resolved during prepare()
        self.camera_video_paths: Optional[Dict[str, Path]] = None
        self.calibration: Optional[Dict[str, Any]] = None

    @property
    def modality_name(self) -> str:
        """Name of the modality served by this handler."""
        return "video"

    def prepare(self) -> None:
        """
        Prepare the video modality for processing.

        Validates the camera names, resolves one video per camera, determines FPS,
        length and start frame, makes sure the frames exist in the input folder
        (extracting them if needed) and loads the calibration when available.

        Raises:
            ValueError: If no camera names are provided, a camera name is empty or
                blank, or one of the helper steps fails its validation.
        """
        log_with_underscore("Preparing Video Modality...")

        # Validate camera names not empty
        if not self.all_camera_names:
            raise ValueError("No camera names provided.")
        for name in self.all_camera_names:
            if not name or not name.strip():
                raise ValueError(f"Invalid camera name {name!r}")

        # Find exactly one video per camera (recursive, exact name match)
        self.camera_video_paths = self._find_video_paths()

        # Probe all videos, check cross-camera consistency, then validate against config
        self.fps, self.length_frames = self._resolve_fps_and_length()
        self.start_frame = timestamp_to_frame_index(self.sequence_context.video_start, self.fps)

        # Check and create input data if necessary
        self._input_data_creation()

        # Load camera calibration if available
        self.calibration = self._load_calibration()

        self._available = True
        logging.info("Video DATA CREATION completed.")

    def get_recipe(self) -> VideoInputRecipe:
        """
        Generates the Recipe config to be injected into the subprocess TOML.

        Relies on `start_frame` and `length_frames`, which are set by prepare().

        Returns:
            VideoInputRecipe: Recipe pointing at the per-camera frame folders, with
                range_end being one past the last frame of the sequence.
        """
        return VideoInputRecipe(
            root_path=str(self.nice_input_folder),
            camera_names=sorted(self.all_camera_names),
            filename_template="{camera}/frames/" + FILENAME_TEMPLATE,
            range_start=self.start_frame,
            range_end=self.start_frame + self.length_frames,
            step=1,
        )

    # -------------------------------------------------------------------------
    # Helper methods
    # -------------------------------------------------------------------------

    def _find_video_paths(self) -> Dict[str, Path]:
        """
        For each camera, recursively search its source folder for exactly one
        video file whose stem or any ancestor directory name equals the camera
        name exactly.

        Returns:
            dict mapping camera name -> Path of the matched video file.

        Raises:
            ValueError: If a camera matches zero or more than one file.
        """
        result: Dict[str, Path] = {}
        for cam in self.all_camera_names:
            source_folder = self.io.get_data_source_folder(cam)

            candidates = []
            for p in source_folder.rglob("*"):
                # is this a video extension?
                if p.suffix.lower() not in vid.VIDEO_EXTENSIONS:
                    continue
                # a directory named like a video (e.g. "clip.mp4/") is not a video file
                if not p.is_file():
                    continue
                # does it contain an exact match of the camera name in its path?
                if not _contains_camera_name(p, cam):
                    continue
                candidates.append(p)

            if not candidates:
                raise ValueError(
                    f"No video file found for camera '{cam}' in '{source_folder}'. "
                    f"Expected a file or directory named exactly '{cam}'."
                )
            if len(candidates) > 1:
                raise ValueError(
                    f"Ambiguous: found {len(candidates)} video files for camera '{cam}' "
                    f"in '{source_folder}': {[str(p) for p in candidates]}"
                )
            result[cam] = candidates[0]

        return result

    def _resolve_fps_and_length(self) -> tuple[int, int]:
        """
        Probe all camera videos, assert cross-camera consistency of FPS and
        frame count, then validate FPS against config.

        A mismatch between detected and configured FPS only logs a warning.
        When the configured video_length resolves to a positive frame count it
        is returned as-is; otherwise the length is derived from the detected
        frame count minus the start frame.

        Returns:
            (fps, length_frames) - fps detected from videos, length in frames
            from config or auto-detected.

        Raises:
            ValueError: If cameras disagree on FPS or frame count, if FPS or
                frame count cannot be determined, or if video_start is beyond
                the end of the video (auto-detected length only).
        """
        infos = {}
        for cam, path in self.camera_video_paths.items():
            raw = vid.probe_video(str(path))
            infos[cam] = vid.json_to_video_info(raw)

        # Cross-camera consistency (cameras without a detected value are ignored)
        fps_values = {cam: int(info.fps) for cam, info in infos.items() if info.fps is not None}
        frame_values = {cam: info.frames for cam, info in infos.items() if info.frames is not None}

        if len(set(fps_values.values())) > 1:
            raise ValueError(f"Cameras have inconsistent FPS: {fps_values}")
        if len(set(frame_values.values())) > 1:
            raise ValueError(f"Cameras have inconsistent frame counts: {frame_values}")

        # Resolve FPS
        fps = next(iter(fps_values.values())) if fps_values else None
        if fps is None:
            raise ValueError("Could not determine FPS from any camera video.")
        if fps != self.sequence_context.fps:
            logging.warning(f"Detected fps={fps} does not match config fps={self.sequence_context.fps}!")

        # Resolve length
        video_length_frame = timestamp_to_frame_index(self.sequence_context.video_length, fps)
        if video_length_frame > 0:
            return fps, video_length_frame

        # Auto-detect from frame count
        total_frames = next(iter(frame_values.values())) if frame_values else None
        if total_frames is None:
            raise ValueError("Could not determine frame count from any camera video.")

        start_frame = timestamp_to_frame_index(self.sequence_context.video_start, fps)
        available = total_frames - start_frame
        if available <= 0:
            raise ValueError(
                f"video_start ({start_frame}) is beyond the end of the video "
                f"({total_frames} frames)."
            )

        logging.info(
            f"Auto-detected length: {available} frames "
            f"(Total: {total_frames}, Start: {start_frame})"
        )
        return fps, available

    def _input_data_creation(self) -> None:
        """
        Initializes the data required for running NICE toolbox.

        Frames are extracted from the videos only when they are not already
        present in the nicetoolbox input folder.
        """
        if self._check_frames_exist():
            logging.info("Frames FOUND in nicetoolbox input folder")
        else:
            logging.info("EXTRACTING frames from video...")
            self._extract_frames_from_video()

    def _check_frames_exist(self) -> bool:
        """
        Check if frames exist in the nicetoolbox input folder ("Source of truth").

        Only the first and the last frame file of the requested range are
        checked for each camera.

        Returns:
            bool: True if frames exist for all cameras, False otherwise.
        """
        start_idx = self.start_frame
        end_idx = self.start_frame + self.length_frames - 1
        start_name = FILENAME_TEMPLATE.format(idx=start_idx)
        end_name = FILENAME_TEMPLATE.format(idx=end_idx)

        for cam in self.all_camera_names:
            cam_folder = self.nice_input_folder / cam / "frames"
            if not ((cam_folder / start_name).exists() and (cam_folder / end_name).exists()):
                logging.info(
                    f"No input frames found for camera '{cam}': "
                    f"Files will be created in '{cam_folder}'."
                )
                return False
        return True

    def _extract_frames_from_video(self) -> None:
        """
        Extract frames from each camera's resolved video file into the
        nicetoolbox_input folder.

        For every camera the raw probe result is also stored as
        '<camera>_meta.json' in the input folder.
        """
        for cam, video_path in self.camera_video_paths.items():
            logging.info(f"Extracting frames for camera '{cam}' from '{video_path}'...")

            # Create the target folders first: this also guarantees that the input
            # folder exists before the metadata file is written into it.
            frames_folder = self.nice_input_folder / cam / "frames"
            frames_folder.mkdir(parents=True, exist_ok=True)

            raw_video_info = vid.probe_video(str(video_path))
            video_info_path = self.nice_input_folder / f"{cam}_meta.json"
            with open(video_info_path, "w", encoding="utf-8") as f:
                json.dump(raw_video_info, f, indent=4)

            video_info = vid.json_to_video_info(raw_video_info)
            vid.split_into_frames(
                str(video_path),
                str(frames_folder) + "/",
                video_info.frames,
                start_frame=self.start_frame_index,
                keep_indices=True,
            )

    def _load_calibration(self) -> dict | None:
        """
        Load the camera calibration of the current session/sequence from file.

        The entry is looked up under the key '<session_id>__<sequence_id>'
        (empty parts are skipped) and reduced to the cameras in use.

        Returns:
            dict | None: Calibration per camera name, or None if no calibration
                file is available.

        Raises:
            KeyError: If the calibration file has no entry for the current
                session/sequence.
        """
        calib_path = self.io.get_calibration_file()
        if not calib_path or not os.path.isfile(calib_path):
            logging.warning("Calibration file not found, skipping calibration.")
            return None

        calib_details = "__".join([word for word in [self.session_id, self.sequence_id] if word])

        # NOTE(security): allow_pickle=True unpickles arbitrary objects and must only
        # be used on trusted calibration files.
        archive = np.load(calib_path, allow_pickle=True)
        try:
            loaded_calib = archive[calib_details].item()
        except KeyError:
            logging.exception(
                f"Calibration for session '{self.session_id}' and sequence "
                f"'{self.sequence_id}' not found for calibration file at "
                f"'{calib_path}'."
            )
            raise
        finally:
            # For .npz archives np.load keeps the file open until it is closed
            # explicitly; other return types have no close() and are left alone.
            close = getattr(archive, "close", None)
            if callable(close):
                close()

        try:
            calib = {key: value for key, value in loaded_calib.items() if key in self.all_camera_names}
        except Exception as err:
            logging.exception(f"An error occurred while creating calibration dictionary: {err}")
            raise
        return calib
# -------------------------------------------------------------------------
# Module-level helpers
# -------------------------------------------------------------------------


def _contains_camera_name(video_path: Path, camera_name: str) -> bool:
    """
    Tell whether a video path belongs to a camera by exact name match.

    The camera name is compared case-insensitively against the file stem and
    against every directory component of the path; partial matches do not count.

    'cam_front' matches:
        - cam_front.mp4             (stem == camera_name)
        - cam_front/video.mp4       (parent dir == camera_name)
        - root/cam_front/sub/v.mp4

    'cam_front' does NOT match:
        - cam_front_test.mp4        (stem != camera_name)
        - cam_front_test/v.mp4      (dir != camera_name)

    Args:
        video_path: Path of the candidate video file.
        camera_name: Camera name to look for.

    Returns:
        bool: True if the stem or any directory component equals the camera name.
    """
    wanted = camera_name.lower()

    # The file stem is checked first, followed by all directory components
    # (everything except the final path element).
    components = [video_path.stem, *video_path.parts[:-1]]
    for component in components:
        if component.lower() == wanted:
            return True
    return False