Source code for nicetoolbox.connectors.napari.parser

"""Parse napari body-joint annotations into Toolbox body_joints NPZ.

The napari (DeepLabCut-style) annotation file uses a 4-level column index
(scorer, individuals, bodyparts, coords) and a 3-level row index
(root, camera, image_filename).

Output shape: 2d (subjects, cameras, frames, joints, 3) — x/y/confidence.
Missing frames within the kept window are written as NaN.
"""

import logging
from pathlib import Path

import numpy as np
import pandas as pd

from ...configs.models.video_timestamp import timestamp_to_frame_index
from ...utils.video import get_fps
from .napari_configs import NapariImportBodyJointsConfig, NapariSequenceConfig

REQUIRED_COORDS = ("x", "y", "likelihood")


[docs]def load_napari_dataframe(input_path: Path) -> pd.DataFrame: """Load a napari CSV or HDF5 annotation file as a MultiIndex DataFrame.""" suffix = input_path.suffix.lower() if suffix == ".csv": df = pd.read_csv(input_path, header=[0, 1, 2, 3], index_col=[0, 1, 2]) elif suffix in {".h5", ".hdf", ".hdf5"}: df = pd.read_hdf(input_path) else: raise ValueError(f"Unsupported napari input extension '{suffix}'. Use .csv or .h5") df.columns = pd.MultiIndex.from_tuples( [tuple(str(lvl) for lvl in col) for col in df.columns], names=["scorer", "individuals", "bodyparts", "coords"], ) return df
def _frame_number(image_name: str) -> int: """Extract integer frame number from a filename like '000003600.png'.""" return int(Path(image_name).stem)
[docs]def napari_to_body_joints_npz( sequence: NapariSequenceConfig, cfg: NapariImportBodyJointsConfig, ) -> None: """Convert one napari annotation file to a Toolbox body_joints NPZ.""" df = load_napari_dataframe(sequence.input) logging.info(f"Loaded napari data from: {sequence.input} (rows={len(df)})") individuals = list(df.columns.get_level_values("individuals").unique()) bodyparts = list(df.columns.get_level_values("bodyparts").unique()) coords = list(df.columns.get_level_values("coords").unique()) logging.info(f"Napari columns: individuals={individuals}, bodyparts={len(bodyparts)}, coords={coords}") for required in REQUIRED_COORDS: if required not in coords: raise ValueError(f"Expected coord '{required}' in napari columns, got: {coords}") if cfg.subjects: unknown = [v for v in individuals if v not in cfg.subjects] if unknown: raise ValueError(f"Napari individuals {unknown} not in [subjects] mapping {sorted(cfg.subjects)}") subjects = [cfg.subjects[v] for v in individuals] else: subjects = list(individuals) logging.info(f"Subject order: {individuals} -> {subjects}") napari_cameras = list(dict.fromkeys(df.index.get_level_values(1))) if cfg.cameras: unknown = [v for v in napari_cameras if v not in cfg.cameras] if unknown: raise ValueError(f"Napari cameras {unknown} not in [cameras] mapping {sorted(cfg.cameras)}") cameras = [cfg.cameras[v] for v in napari_cameras] else: cameras = list(napari_cameras) logging.info(f"Camera order: {napari_cameras} -> {cameras}") all_frame_numbers = sorted({_frame_number(name) for _, _, name in df.index}) first_frame, last_frame = all_frame_numbers[0], all_frame_numbers[-1] fps = get_fps(str(sequence.video)) logging.info(f"Video fps={fps}") start_frame = max(first_frame, timestamp_to_frame_index(sequence.start, fps)) if sequence.end != -1: end_frame = timestamp_to_frame_index(sequence.end, fps) else: end_frame = last_frame n_frames = end_frame - start_frame if sequence.reset_frames: frame_labels = [f"{i:09d}" for i in range(n_frames)] else: frame_labels = [f"{start_frame + i:09d}" for i in range(n_frames)] logging.info(f"Frame range: {start_frame}..{end_frame} ({n_frames} frames, reset_frames={sequence.reset_frames})") arr = np.full( (len(subjects), len(cameras), n_frames, len(bodyparts), 3), fill_value=np.nan, dtype=np.float64, ) for cam_idx, napari_cam in enumerate(napari_cameras): cam_df = df.xs(napari_cam, level=1) raw_frames = np.array([_frame_number(name) for _, name in cam_df.index]) keep = (raw_frames >= start_frame) & (raw_frames < end_frame) cam_df: pd.DataFrame = cam_df.iloc[keep] frame_indices = raw_frames[keep] - start_frame for ind_idx, individual in enumerate(individuals): for joint_idx, joint in enumerate(bodyparts): for coord_idx, coord in enumerate(REQUIRED_COORDS): col_values = cam_df.xs((individual, joint, coord), level=(1, 2, 3), axis=1).to_numpy().ravel() arr[ind_idx, cam_idx, frame_indices, joint_idx, coord_idx] = col_values logging.info(f"Filled array of shape {arr.shape}") data_description = { "2d": { "axis0": subjects, "axis1": cameras, "axis2": frame_labels, "axis3": list(bodyparts), "axis4": ["coordinate_x", "coordinate_y", "confidence_score"], }, } npz_dict = { "2d": arr, "data_description": np.array(data_description, dtype=object), } sequence.output.parent.mkdir(parents=True, exist_ok=True) np.savez(sequence.output, **npz_dict) logging.info(f"Saved body_joints NPZ to: {sequence.output}")