Source code for nicetoolbox.detectors.method_detectors.spiga.spiga_detector

"""
SPIGA method detector class.
"""

import logging
import os

import cv2
import numpy as np

from nicetoolbox_core.video_loaders import ImagePathsByFrameIndexLoader

from ....configs.schemas.detectors_instances_configs import SpigaConfig
from ....utils import video as vd
from ... import config_handler as confh
from ..base_method import BaseMethod


def extract_key_per_value(input_dict):
    """
    Build a flat list of key names from a dictionary of indices.

    A key whose value is an integer is emitted unchanged. A key whose value
    is a list is expanded into one entry per list element, named
    ``"<key>_<position>"``, so that every index gets its own unique name.
    The order of the input dictionary is preserved.

    Args:
        input_dict (dict): The input dictionary to extract keys from.

    Returns:
        return_keys (list): A list of keys extracted from the input dictionary.

    Raises:
        NotImplementedError: If a value in the dictionary is neither an
            integer nor a list.
    """
    flattened = []
    for name, entry in input_dict.items():
        if isinstance(entry, int):
            # Scalar index: the key itself is already unique.
            flattened.append(name)
            continue
        if not isinstance(entry, list):
            raise NotImplementedError
        # List of indices: suffix the key with the position of each element.
        flattened.extend(f"{name}_{position}" for position in range(len(entry)))
    return flattened
def return_direction_vector(rotation_matrix, axis):
    """
    Rotate a fixed-length axis vector and return its first two components.

    The axis vector is ``[100, 0, 0]`` for ``"x"``, ``[0, -50, 0]`` for
    ``"y"`` and ``[0, 0, -50]`` for ``"z"``. Any other value of ``axis``
    falls back to the ``"x"`` vector.

    Args:
        rotation_matrix: A 3x3 matrix applied to the axis vector.
        axis (str): Name of the axis to rotate ("x", "y" or "z").

    Returns:
        np.ndarray: The first two components of the rotated vector,
            flattened to shape (2,).
    """
    if axis == "y":
        components = [0, -50, 0]
    elif axis == "z":
        components = [0, 0, -50.0]
    else:
        # "x" and every unrecognised axis name share the same vector.
        components = [100.0, 0, 0]

    column = np.array(components).reshape(3, 1)
    rotated = rotation_matrix @ column
    # Only the first two rows are kept; the third is discarded.
    return rotated[:2].flatten()
class Spiga(BaseMethod):
    """
    SPIGA is a method detector that computes the head_orientation component.

    Component: head_orientation

    Attributes:
        components (list): A list containing the name of the component:
            head_orientation
        algorithm_type (str): Algorithm name used to compute the
            head_orientation component.
        camera_names (list): List of camera names used to capture original
            input data.
    """

    components = ["head_orientation"]
    algorithm_type = "spiga"

    def _initialize_detector(self) -> SpigaConfig.RuntimeConfig:
        """
        Initializes the Spiga class with extra configuration settings.

        Returns:
            SpigaConfig.RuntimeConfig: The runtime config built by
                ``BaseMethod._initialize_detector`` extended with the
                ``face_landmarks_description`` field.
        """
        # === (1) Store convenience references for this class ===
        self.video_start = self.data.video_start_frame_index
        self.subjects_descr = self.data.subjects_descr
        self.cam_sees_subjects = self.data.cam_sees_subjects
        self.results_folder = self.result_folders[self.components[0]]
        self.camera_names = self.detector_config.camera_names
        self.keypoints_indices = self.predictions_mapping.head_orientation.spiga.keypoints_index

        # Initialise data loader
        self.dataloader = ImagePathsByFrameIndexLoader(
            config=self.data.get_input_recipes(), expected_cameras=self.camera_names
        )

        # === (2) EXTRA FIELDS for Spiga ===
        self._face_landmarks_description = confh.flatten_list(
            extract_key_per_value(self.keypoints_indices.face)
        )

        # Call BaseMethod _initialize_detector() to build runtime + add extra fields
        base_runtime = super()._initialize_detector()

        # Return extended runtime with Spiga-specific fields
        return SpigaConfig.RuntimeConfig(
            **base_runtime.model_dump(),
            face_landmarks_description=self._face_landmarks_description,
        )

    def post_inference(self):
        """
        Calculate head orientation in 2D image after SPIGA inference.

        Reads the prediction archive written by the inference step, derives
        for every subject, camera and frame the nose origin and the tips of
        the three rotated head axes, and writes the archive back with an
        additional ``head_orientation_2d`` array and its description.
        """
        n_subjects = len(self.subjects_descr)
        n_cams = len(self.camera_names)
        n_frames = len(self.dataloader)
        spiga_vectors = np.zeros((n_subjects, n_cams, n_frames, 8))

        prediction_file = os.path.join(self.results_folder, f"{self.algorithm_instance}.npz")
        # Materialise every array and close the archive before the same path
        # is overwritten below; an open handle on the file being rewritten is
        # a leak and can make the write fail on some platforms.
        # NOTE(review): allow_pickle=True executes pickled payloads, so this
        # must only be pointed at prediction files produced by the pipeline.
        with np.load(prediction_file, allow_pickle=True) as prediction:
            predictions_dict = {key: prediction[key] for key in prediction.files}
        data_description = predictions_dict["data_description"].item()
        headposes = predictions_dict["headpose"]
        face_landmarks = predictions_dict["face_landmark_2d"]

        # The nose origin is the middle entry of the nose_down landmark
        # indices; it does not change between iterations.
        nose_down = self.keypoints_indices.face["nose_down"]
        nose_down_index = nose_down[len(nose_down) // 2]

        # Todo: vectorize
        for subj_idx in range(n_subjects):
            for cam_idx in range(n_cams):
                for frame_idx in range(n_frames):
                    headpose = headposes[subj_idx][cam_idx][frame_idx]
                    landmarks = face_landmarks[subj_idx][cam_idx][frame_idx]

                    # The first three values of the headpose are the Euler angles.
                    euler_yzx = np.array(headpose[:3])
                    rotation_matrix = self._euler_to_rotation_matrix(euler_yzx)

                    # 2D nose projection
                    nose_org = np.array(
                        [
                            landmarks[nose_down_index][0],
                            landmarks[nose_down_index][1],
                        ],
                        dtype=np.float32,
                    )

                    # Rotation order Y-Z-X, body's forward axis is +X
                    forward_tip = nose_org + return_direction_vector(rotation_matrix, "x")
                    axisy_tip = nose_org + return_direction_vector(rotation_matrix, "y")
                    axisz_tip = nose_org + return_direction_vector(rotation_matrix, "z")

                    spiga_vectors[subj_idx, cam_idx, frame_idx, :] = [
                        nose_org[0],
                        nose_org[1],
                        forward_tip[0],
                        forward_tip[1],
                        axisy_tip[0],
                        axisy_tip[1],
                        axisz_tip[0],
                        axisz_tip[1],
                    ]

        predictions_dict["head_orientation_2d"] = spiga_vectors
        data_description.update(
            {
                "head_orientation_2d": {
                    "axis0": self.subjects_descr,
                    "axis1": self.camera_names,
                    "axis2": data_description["headpose"]["axis2"],
                    "axis3": [
                        "start_x",
                        "start_y",
                        "end_forward_x",
                        "end_forward_y",
                        "end_yaxis_x",
                        "end_yaxis_y",
                        "end_zaxis_x",
                        "end_zaxis_y",
                    ],
                }
            }
        )
        np.savez_compressed(prediction_file, **predictions_dict)
        logging.info("SPIGA post-processing result saved successfully.")

    def visualization(self, data):
        """
        Draw the 2D head orientation axes on every frame and build one video
        per camera.

        For each camera, every frame image is loaded, the three head axes of
        each subject seen by that camera are drawn from the nose origin, and
        the annotated frame is written to ``<viz_folder>/<camera_name>/``.
        The frames of each camera are then assembled into
        ``<viz_folder>/<camera_name>.mp4``.

        Args:
            data: Unused; kept only so existing callers keep working.
        """
        n_subj = len(self.subjects_descr)

        prediction_file = os.path.join(self.results_folder, f"{self.algorithm_instance}.npz")
        # Read the array while the archive is open, then release the handle.
        with np.load(prediction_file, allow_pickle=True) as predictions:
            head_data = predictions["head_orientation_2d"]

        # One colour per axis, in the order z axis, y axis, forward axis.
        zaxis_color, yaxis_color, forward_color = (255, 0, 0), (0, 255, 0), (0, 0, 255)

        # per camera and frame, visualize each subject's head orientation
        success = True
        for cam_idx, camera_name in enumerate(self.camera_names):
            camera_folder = os.path.join(self.viz_folder, camera_name)
            os.makedirs(camera_folder, exist_ok=True)

            for frame_idx, (real_frame_idx, frame_paths_per_camera) in enumerate(self.dataloader):
                image = cv2.imread(frame_paths_per_camera[camera_name])

                for subject_idx in range(n_subj):
                    if subject_idx not in self.cam_sees_subjects[camera_name]:
                        continue

                    head_orientation = head_data[subject_idx, cam_idx, frame_idx]
                    start = (int(head_orientation[0]), int(head_orientation[1]))
                    forward_tip = (int(head_orientation[2]), int(head_orientation[3]))
                    axisy_tip = (int(head_orientation[4]), int(head_orientation[5]))
                    axisz_tip = (int(head_orientation[6]), int(head_orientation[7]))

                    # Only the forward axis gets an arrowhead. It is selected
                    # explicitly rather than by comparing coordinates, because
                    # another axis can land on the same pixel as the forward
                    # tip after integer truncation.
                    cv2.line(image, start, axisz_tip, zaxis_color, thickness=3)
                    cv2.line(image, start, axisy_tip, yaxis_color, thickness=3)
                    cv2.arrowedLine(
                        image, start, forward_tip, forward_color, thickness=3, tipLength=0.1
                    )

                cv2.imwrite(
                    os.path.join(camera_folder, f"{(real_frame_idx):09d}.jpg"),
                    image,
                )

            # create and save video
            success *= vd.frames_to_video(
                camera_folder,
                os.path.join(self.viz_folder, f"{camera_name}.mp4"),
                fps=self.data.fps,
                start_frame=int(self.video_start),
            )

        logging.info(f"Detector {self.components}: visualization finished with code " f"{success}.")

    # Note taken from spiga.demo.visualize.layouts.plot_headpose
    def _euler_to_rotation_matrix(self, headpose):
        """
        Convert three Euler angles given in degrees into a 3x3 rotation matrix.

        The angles are first moved into another coordinate system
        (``-(a0 - 90)``, ``-a1``, ``-(a2 + 90)``), converted to radians and
        combined as ``Ry @ Rp @ Rr``.

        Args:
            headpose: Sequence whose first three entries are the Euler
                angles in degrees.

        Returns:
            np.ndarray: The 3x3 rotation matrix.
        """
        # Change coordinates system
        euler = np.array([-(headpose[0] - 90), -headpose[1], -(headpose[2] + 90)])
        # Convert to radians
        rad = euler * (np.pi / 180.0)
        cy = np.cos(rad[0])
        sy = np.sin(rad[0])
        cp = np.cos(rad[1])
        sp = np.sin(rad[1])
        cr = np.cos(rad[2])
        sr = np.sin(rad[2])
        # labels in original Spiga function corrected,
        # the rotation in y-axis would named pitch, and z-axis yaw.
        Ry = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]])  # yaw
        Rp = np.array([[cp, -sp, 0.0], [sp, cp, 0.0], [0.0, 0.0, 1.0]])  # pitch
        Rr = np.array([[1.0, 0.0, 0.0], [0.0, cr, -sr], [0.0, sr, cr]])  # roll
        return np.matmul(np.matmul(Ry, Rp), Rr)