# Source code for nicetoolbox.detectors.data_handlers.audio_handler

"""
Audio data extraction and organization handler.

Extracts audio from video files or locates standalone audio files,
organizes them into the nicetoolbox_input/audio/ folder.

DESIGN DECISIONS:
1. Audio is ALWAYS extracted in FULL (no time-range cutting).
   The time range is passed in the recipe for inference scripts to use
   with librosa's offset/duration parameters.
2. No preprocessing (resampling, normalization) — inference scripts own that.
3. Track configuration from dataset_properties.toml drives extraction.
"""

import glob as glob_module
import json
import logging
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

from nicetoolbox_core.input_recipes import AudioInputRecipe, AudioStreamRecipe

from ...configs.schemas.dataset_properties import AudioTrackConfig
from ...configs.video_runtime_config import SequenceRuntimeConfig
from ...utils.logging_utils import log_with_underscore
from ..in_out import SequenceIO
from .handler import BaseModalityHandler


class AudioDataHandler(BaseModalityHandler):
    """
    Handles audio data extraction and organization.

    Uses the track configuration from dataset_properties to determine
    which audio streams to extract and from where.

    Every configured track is written as ``<track_name>.wav`` into the
    ``audio`` sub-folder of ``nice_input_folder`` and registered as an
    ``AudioStreamRecipe`` in ``self._streams``. Extraction and probing are
    delegated to the external ``ffmpeg`` / ``ffprobe`` executables.
    """

    def __init__(
        self,
        # Shared fields (passed to base)
        io: SequenceIO,
        sequence_context: SequenceRuntimeConfig,
        # Audio-specific fields
        audio_start_ms: float,
        audio_length_ms: float,
        tracks_config: dict[str, AudioTrackConfig],
    ):
        """
        Store the time range and track configuration.

        Args:
            io: Passed to the base handler; also used to look up the data
                source folder of a camera for embedded tracks.
            sequence_context: Passed to the base handler.
            audio_start_ms: Start of the requested time range in milliseconds.
            audio_length_ms: Length of the requested time range in milliseconds.
            tracks_config: Mapping of track name to its track configuration.
        """
        super().__init__(io, sequence_context)
        self.audio_start_ms = audio_start_ms
        self.audio_length_ms = audio_length_ms
        # The range is only forwarded in the recipe; audio is never cut here.
        self.audio_end_ms = audio_start_ms + audio_length_ms
        self.tracks_config = tracks_config
        # NOTE(review): nice_input_folder is presumably set by the base class
        # constructor - confirm.
        self.audio_output_folder = self.nice_input_folder / "audio"
        # Filled by _register_file(), keyed by track name.
        self._streams: Dict[str, AudioStreamRecipe] = {}

    @property
    def modality_name(self) -> str:
        """Name of the modality handled by this class."""
        return "audio"

    # TODO: Do we actually need this property?
    @property
    def streams(self) -> Dict[str, AudioStreamRecipe]:
        """Registered audio streams, keyed by track name (not a copy)."""
        return self._streams

    def prepare(self) -> None:
        """
        Prepare audio data based on track configuration.

        Audio is always extracted in FULL. Already-extracted files are reused.
        Sets self._available = True if at least one track was prepared.

        Any exception raised while preparing a track propagates to the caller,
        so a single failing track aborts the whole preparation.
        """
        log_with_underscore("Preparing Audio Modality...")
        self.audio_output_folder.mkdir(parents=True, exist_ok=True)

        for track_name, track_cfg in self.tracks_config.items():
            self._prepare_track(track_name, track_cfg)

        self._available = bool(self._streams)
        logging.info(
            f"Audio preparation complete. {len(self._streams)} track(s): "
            f"{list(self._streams.keys())}"
        )

    def get_recipe(self) -> AudioInputRecipe:
        """Build audio input recipe with full file paths + time range."""
        return AudioInputRecipe(
            # Shallow copy so the recipe does not share the handler's dict.
            streams=self._streams.copy(),
            start_time_ms=self.audio_start_ms,
            end_time_ms=self.audio_end_ms,
        )

    # -------------------------------------------------------------------------
    # Helper methods for track preparation
    # -------------------------------------------------------------------------

    def _prepare_track(self, track_name: str, track_cfg: AudioTrackConfig) -> None:
        """
        Prepare a single audio track (embedded or standalone).

        Resolves the source file, then either reuses an existing
        ``<track_name>.wav`` in the audio output folder or probes, validates
        and extracts the configured stream/channel with ffmpeg. In both cases
        the resulting WAV file is registered as a stream.

        Args:
            track_name: Name of the track; also the stem of the output file.
            track_cfg: Configuration describing where the track comes from.

        Raises:
            ValueError: If the track is neither embedded nor standalone, or no
                video file is found for the configured camera.
            FileNotFoundError: If the resolved source file does not exist.
            RuntimeError: If probing, validation or extraction fails.
        """
        stream_idx = track_cfg.stream
        channel_idx = track_cfg.channel
        hears = track_cfg.hears_subjects
        logging.info(f"Extracting audio track '{track_name}', stream: {stream_idx}, channel: {channel_idx}")

        camera = None
        # check where to look for a file
        if track_cfg.is_embedded:
            logging.info(f"Audio track is embedded into video, looking for camera {track_cfg.camera}...")
            source_path = self._find_video_for_camera(track_cfg.camera)
            source_type = "embedded"
            camera = track_cfg.camera
        elif track_cfg.is_standalone:
            source_path = Path(track_cfg.path)
            source_type = "standalone"
        else:
            raise ValueError("Audio track should be embedded or standalone")

        if not source_path.exists():
            raise FileNotFoundError(f"Audio track '{track_name}': file not found: '{source_path}'")
        logging.info(f"Audio track file: '{source_path}'")

        # Was this track already extracted before? Reuse is decided by the
        # output file name only; the source/stream/channel are not re-checked.
        output_path = self.audio_output_folder / f"{track_name}.wav"
        if output_path.exists():
            logging.info(f"Audio track '{track_name}' already prepared: '{output_path}'")
            self._register_file(track_name, source_type, hears, output_path, source_path, camera=camera)
            return

        # validate input file with ffprobe before processing
        logging.info("Extracting Audio Specifications...")
        audio_streams = self._probe_file(source_path)
        self._dump_probe_json(output_path, audio_streams)

        logging.info("Validating Audio Specifications...")
        self._validate_stream_and_channel(track_name, source_path, stream_idx, channel_idx, audio_streams)

        # extract with ffmpeg
        logging.info("Extracting Audio Track from file...")
        self._ffmpeg_extract_full(source_path, output_path, stream_idx, channel_idx)
        self._register_file(track_name, source_type, hears, output_path, source_path, camera=camera)
        logging.info(f"Audio Track extracted: {output_path}")

    # -------------------------------------------------------------------------
    # Registration
    # -------------------------------------------------------------------------

    def _register_file(
        self,
        track_name: str,
        source_type: str,
        hears_subjects: List[int],
        output_path: Path,
        source_path: Optional[Path] = None,
        camera: Optional[str] = None,
    ) -> None:
        """
        Probe and register an audio file as a stream.

        Args:
            track_name: Key under which the stream is stored.
            source_type: ``"embedded"`` or ``"standalone"`` as passed by the caller.
            hears_subjects: Forwarded unchanged to the stream recipe.
            output_path: The prepared WAV file to probe and register.
            source_path: File the audio was taken from.
            camera: Camera name for embedded tracks, otherwise ``None``.

        Raises:
            RuntimeError: If ffprobe fails, the file does not contain exactly
                one audio stream, or sample rate / channel count are missing.
        """
        streams = self._probe_file(output_path)
        if len(streams) != 1:
            raise RuntimeError(f"Expected exactly 1 audio stream in '{output_path}', got {len(streams)}")
        stream = streams[0]

        # Write ffprobe json data to file inside nicetoolbox_input folder
        # This is currently used only for logging and debugging
        self._dump_probe_json(output_path, streams, suffix="_wav_meta")

        # Validate that we have relevant fields
        # TODO: pydantic and type validation?
        sample_rate = stream.get("sample_rate")
        channels = stream.get("channels")
        if sample_rate is None or channels is None:
            raise RuntimeError(f"ffprobe missing sample_rate/channels for '{output_path}': {stream}")

        self._streams[track_name] = AudioStreamRecipe(
            path=str(output_path),
            # NOTE(review): a source_path of None is stored as the string
            # "None" here - confirm whether callers ever omit it.
            source_path=str(source_path),
            sample_rate=int(sample_rate),
            channels=int(channels),
            source_type=source_type,
            camera=camera,
            hears_subjects=hears_subjects,
        )

    # -------------------------------------------------------------------------
    # File discovery (video for embedded tracks)
    # -------------------------------------------------------------------------

    def _find_video_for_camera(self, camera_name: str) -> Path:
        """
        Find the video file for a specific camera using IO.

        Extensions are tried in the order of ``VIDEO_EXTS``; within one
        extension the lexicographically first path containing the camera name
        is returned, so the choice does not depend on filesystem order.

        Args:
            camera_name: Camera whose data source folder is searched.

        Returns:
            Path of the matching video file.

        Raises:
            ValueError: If no video file matches.
        """
        VIDEO_EXTS = [".mp4", ".avi"]  # TODO: Add more extensions
        src = self.io.get_data_source_folder(camera_name)

        # TODO: STRONG assumptions here...
        # yeah, definitely asking for troubles
        # The camera name is matched against the whole path string, so a
        # folder named after the camera makes every file in it a match.
        for ext in VIDEO_EXTS:
            video_files = sorted(glob_module.glob(str(src / f"*{ext}")))
            matches = [path for path in video_files if camera_name in path]
            if matches:
                return Path(matches[0])

        raise ValueError(f"No video file found for camera '{camera_name}' in {src}")

    # -------------------------------------------------------------------------
    # FFmpeg / FFprobe helpers
    # -------------------------------------------------------------------------

    def _dump_probe_json(self, path: Path, streams: List[Dict[str, Any]], suffix: str = "_audio_meta") -> None:
        """
        Dump ffprobe stream data to a JSON file in the audio output folder for debugging.

        The file is named ``<path.stem><suffix>.json``; an existing file with
        that name is overwritten.
        """
        dump_path = self.audio_output_folder / f"{path.stem}{suffix}.json"
        with open(dump_path, "w", encoding="utf-8") as f:
            json.dump(streams, f, indent=4)

    def _validate_stream_and_channel(
        self,
        track_name: str,
        path: Path,
        stream_idx: int,
        channel_idx: Optional[int],
        audio_streams: List[Dict[str, Any]],
    ) -> None:
        """
        Validate that stream_idx and channel_idx exist in audio_streams.

        Args:
            track_name: Track name, used in error messages only.
            path: Probed file, used in error messages only.
            stream_idx: Index into ``audio_streams``.
            channel_idx: Channel index inside that stream, or ``None`` to skip
                the channel check.
            audio_streams: Audio streams as returned by ``_probe_file``.

        Raises:
            RuntimeError: If there are no audio streams, an index is out of
                range (including negative), or the channel count is missing.
        """
        if not audio_streams:
            raise RuntimeError(f"Track '{track_name}': no audio streams found in {path}")

        # A negative index would silently wrap around when indexing the list
        # below, so it is rejected together with too-large indices.
        if not 0 <= stream_idx < len(audio_streams):
            raise RuntimeError(
                f"Track '{track_name}': stream {stream_idx} not found in "
                f"{path} ({len(audio_streams)} audio stream(s))"
            )

        if channel_idx is None:
            return

        channel_count = audio_streams[stream_idx].get("channels")
        if channel_count is None:
            raise RuntimeError(
                f"Track '{track_name}': ffprobe reported no channel count for "
                f"stream {stream_idx} of {path}"
            )
        if not 0 <= channel_idx < int(channel_count):
            raise RuntimeError(
                f"Track '{track_name}': channel {channel_idx} not found in "
                f"stream {stream_idx} of {path} ({channel_count} channel(s))"
            )

    def _probe_file(self, path: Path) -> List[Dict[str, Any]]:
        """
        Probe a file and return all audio streams as a list of dicts.

        Returns:
            The ``streams`` entry of ffprobe's JSON output, restricted to audio
            streams; an empty list if that entry is absent.

        Raises:
            RuntimeError: If ffprobe exits with an error or prints invalid JSON.
        """
        # fmt: off
        cmd = [
            "ffprobe",
            "-v", "quiet",
            "-print_format", "json",
            "-show_streams",
            "-select_streams", "a",
            str(path),
        ]
        # fmt: on

        # call ffprobe and try get meta information as a json
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"ffprobe failed for {path}: {e.stderr}") from None

        # try to parse it
        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"ffprobe returned invalid JSON for {path}: {e}") from None

        # A missing "streams" key is treated as "no audio streams"; the
        # individual fields are validated by the callers.
        streams = data.get("streams", [])
        return streams

    def _ffmpeg_extract_full(
        self, input_path: Path, output_path: Path, stream_index: int, channel_index: int | None = None
    ) -> None:
        """
        Extract full-length audio stream to WAV.

        Args:
            input_path: Source media file.
            output_path: WAV file to write; overwritten if present (``-y``).
            stream_index: Index of the audio stream to map (``0:a:<index>``).
            channel_index: If given, only this input channel is kept and the
                output is mono; otherwise the stream is written as it is mapped.

        Raises:
            RuntimeError: If ffmpeg exits with an error.
        """
        # fmt: off
        cmd = [
            "ffmpeg", "-y",
            "-hide_banner",
            "-loglevel", "warning",
            "-i", str(input_path),
            "-map", f"0:a:{stream_index}",  # audio stream specific index
            "-c:a", "pcm_s16le",
            "-f", "wav",
        ]
        # fmt: on

        # check if need to extract a specific audio channel
        if channel_index is not None:
            # 'pan=mono|c0=cX' forces the output to be mono and maps input channel X to output channel 0
            cmd.extend(["-af", f"pan=mono|c0=c{channel_index}"])

        # finally add output path
        cmd.append(str(output_path))

        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"FFmpeg failed for {input_path}: {e.stderr}") from None

        logging.info(f"Extracted audio to '{output_path}'")