"""
SPIGA method detector class.
"""
import logging
import os
import cv2
import numpy as np
from nicetoolbox_core.video_loaders import ImagePathsByFrameIndexLoader
from ....configs.schemas.detectors_instances_configs import SpigaConfig
from ....utils import video as vd
from ... import config_handler as confh
from ..base_method import BaseMethod
# Axis tip offsets in the head's local frame, keyed by axis name.
# The values (including their int/float literals) mirror the original code.
_AXIS_OFFSETS = {
    "x": (100.0, 0, 0),
    "y": (0, -50, 0),
    "z": (0, 0, -50.0),
}


def return_direction_vector(rotation_matrix, axis):
    """
    Rotate a fixed-length axis vector and return its first two components.

    Args:
        rotation_matrix: 3x3 rotation matrix applied to the axis vector.
        axis (str): One of "x", "y" or "z". Any other value falls back to the
            "x" offset, as in the original implementation.

    Returns:
        np.ndarray: Flat array with the first two components (x, y) of the
        rotated axis vector.
    """
    offset = np.array(_AXIS_OFFSETS.get(axis, _AXIS_OFFSETS["x"]))
    rotated = np.matmul(rotation_matrix, offset.reshape(3, 1))
    # Drop the third component: only the in-image (x, y) part is used.
    return rotated[:2].flatten()
class Spiga(BaseMethod):
    """
    SPIGA is a method detector that computes the head_orientation component.

    Component: head_orientation

    Attributes:
        components (list): A list containing the name of the component: head_orientation
        algorithm_type (str): Algorithm name used to compute the head_orientation component.
        camera_names (list): List of camera names used to capture original input data.
    """

    components = ["head_orientation"]
    algorithm_type = "spiga"

    def _initialize_detector(self) -> SpigaConfig.RuntimeConfig:
        """
        Initializes the Spiga class with extra configuration settings.

        Returns:
            SpigaConfig.RuntimeConfig: The runtime config built by the base
            class, extended with ``face_landmarks_description``.
        """
        # === (1) Store convenience references for this class ===
        self.video_start = self.data.video_start_frame_index
        self.subjects_descr = self.data.subjects_descr
        self.cam_sees_subjects = self.data.cam_sees_subjects
        self.results_folder = self.result_folders[self.components[0]]
        self.camera_names = self.detector_config.camera_names
        self.keypoints_indices = self.predictions_mapping.head_orientation.spiga.keypoints_index

        # Initialise data loader
        self.dataloader = ImagePathsByFrameIndexLoader(
            config=self.data.get_input_recipes(), expected_cameras=self.camera_names
        )

        # === (2) EXTRA FIELDS for Spiga ===
        # NOTE(review): `extract_key_per_value` is not among the imports visible
        # at the top of this file - confirm where it is expected to come from.
        self._face_landmarks_description = confh.flatten_list(extract_key_per_value(self.keypoints_indices.face))

        # Call BaseMethod _initialize_detector() to build runtime + add extra fields
        base_runtime = super()._initialize_detector()

        # Return extended runtime with Spiga-specific fields
        return SpigaConfig.RuntimeConfig(
            **base_runtime.model_dump(),
            face_landmarks_description=self._face_landmarks_description,
        )

    def post_inference(self):
        """
        Calculate head orientation in 2D image after SPIGA inference.

        Loads the saved predictions of this algorithm instance, derives for every
        (subject, camera, frame) a nose anchor point plus the tips of the rotated
        forward/x, y and z axes, and writes the result back into the same ``.npz``
        file under the key ``head_orientation_2d`` (with a matching entry in
        ``data_description``).
        """
        n_subjects = len(self.subjects_descr)
        n_cams = len(self.camera_names)
        n_frames = len(self.dataloader)
        # Last axis: start_x, start_y, then (x, y) of forward, y-axis and z-axis tips.
        spiga_vectors = np.zeros((n_subjects, n_cams, n_frames, 8))

        prediction_file = os.path.join(self.results_folder, f"{self.algorithm_instance}.npz")
        # Read everything into memory and close the archive before it is
        # overwritten below. allow_pickle is needed for the object-typed
        # "data_description" entry; only load files produced by this pipeline.
        with np.load(prediction_file, allow_pickle=True) as prediction:
            predictions_dict = {key: prediction[key] for key in prediction.files}

        # .item() returns the dict stored in the array, so updating it below
        # also updates what gets saved.
        data_description = predictions_dict["data_description"].item()
        headposes = predictions_dict["headpose"]
        face_landmarks = predictions_dict["face_landmark_2d"]

        # The anchor is the middle entry of the "nose_down" landmark indices;
        # it does not depend on subject, camera or frame.
        nose_down = self.keypoints_indices.face["nose_down"]
        nose_down_index = nose_down[len(nose_down) // 2]

        # Todo: vectorize
        for subj_idx in range(n_subjects):
            for cam_idx in range(n_cams):
                for frame_idx in range(n_frames):
                    headpose = headposes[subj_idx][cam_idx][frame_idx]  # shape (6,)
                    landmarks = face_landmarks[subj_idx][cam_idx][frame_idx]

                    # first three values are the euler angles
                    euler_yzx = np.array(headpose[:3])
                    rotation_matrix = self._euler_to_rotation_matrix(euler_yzx)

                    # 2D nose anchor point
                    nose_org = np.array(
                        [landmarks[nose_down_index][0], landmarks[nose_down_index][1]],
                        dtype=np.float32,
                    )

                    # Rotation order Y-Z-X, body's forward axis is +X
                    forward_tip = nose_org + return_direction_vector(rotation_matrix, "x")
                    axisy_tip = nose_org + return_direction_vector(rotation_matrix, "y")
                    axisz_tip = nose_org + return_direction_vector(rotation_matrix, "z")

                    spiga_vectors[subj_idx, cam_idx, frame_idx, :] = [
                        nose_org[0],
                        nose_org[1],
                        forward_tip[0],
                        forward_tip[1],
                        axisy_tip[0],
                        axisy_tip[1],
                        axisz_tip[0],
                        axisz_tip[1],
                    ]

        predictions_dict["head_orientation_2d"] = spiga_vectors
        data_description.update(
            {
                "head_orientation_2d": {
                    "axis0": self.subjects_descr,
                    "axis1": self.camera_names,
                    "axis2": data_description["headpose"]["axis2"],
                    "axis3": [
                        "start_x",
                        "start_y",
                        "end_forward_x",
                        "end_forward_y",
                        "end_yaxis_x",
                        "end_yaxis_y",
                        "end_zaxis_x",
                        "end_zaxis_y",
                    ],
                }
            }
        )
        np.savez_compressed(prediction_file, **predictions_dict)
        logging.info("SPIGA post-processing result saved successfully.")

    def visualization(self, data):
        """
        Draw the 2D head orientation axes on every frame and build one video per camera.

        For each camera, every frame image is loaded, the z and y axes are drawn
        as lines and the forward axis as an arrow for each subject the camera
        sees, and the image is written to ``<viz_folder>/<camera_name>/`` named
        by its real frame index. The frames are then combined into
        ``<viz_folder>/<camera_name>.mp4``.

        Args:
            data: Unused. TODO Remove argument!
        """
        n_subj = len(self.subjects_descr)
        prediction_file = os.path.join(self.results_folder, f"{self.algorithm_instance}.npz")
        with np.load(prediction_file, allow_pickle=True) as predictions:
            head_data = predictions["head_orientation_2d"]

        # Colours in drawing order: z-axis, y-axis, forward axis.
        colors = [(255, 0, 0), (0, 255, 0), (0, 0, 255)]

        # per camera and frame, visualize each subject's head orientation
        success = True
        for cam_idx, camera_name in enumerate(self.camera_names):
            camera_folder = os.path.join(self.viz_folder, camera_name)
            os.makedirs(camera_folder, exist_ok=True)

            for frame_idx, (real_frame_idx, frame_paths_per_camera) in enumerate(self.dataloader):
                image_file = frame_paths_per_camera[camera_name]
                image = cv2.imread(image_file)

                for subject_idx in range(n_subj):
                    if subject_idx not in self.cam_sees_subjects[camera_name]:
                        continue

                    head_orientation = head_data[subject_idx, cam_idx, frame_idx]
                    start = (int(head_orientation[0]), int(head_orientation[1]))
                    forward_tip = (int(head_orientation[2]), int(head_orientation[3]))
                    axisy_tip = (int(head_orientation[4]), int(head_orientation[5]))
                    axisz_tip = (int(head_orientation[6]), int(head_orientation[7]))

                    # Only the forward axis gets an arrow head. This is decided by
                    # role, not by comparing coordinates, so an axis tip that lands
                    # on the same pixel as the forward tip is still a plain line.
                    cv2.line(image, start, axisz_tip, colors[0], thickness=3)
                    cv2.line(image, start, axisy_tip, colors[1], thickness=3)
                    cv2.arrowedLine(image, start, forward_tip, colors[2], thickness=3, tipLength=0.1)

                cv2.imwrite(
                    os.path.join(camera_folder, f"{(real_frame_idx):09d}.jpg"),
                    image,
                )

            # create and save video
            success *= vd.frames_to_video(
                camera_folder,
                os.path.join(self.viz_folder, f"{camera_name}.mp4"),
                fps=self.data.fps,
                start_frame=int(self.video_start),
            )
        logging.info(f"Detector {self.components}: visualization finished with code {success}.")

    # Note taken from spiga.demo.visualize.layouts.plot_headpose
    def _euler_to_rotation_matrix(self, headpose):
        """
        Build a 3x3 rotation matrix from three head pose angles given in degrees.

        Args:
            headpose: Sequence whose first three entries are the angles in degrees.

        Returns:
            np.ndarray: The product ``Ry @ Rp @ Rr`` of the three elementary rotations.
        """
        # Change coordinates system
        euler = np.array([-(headpose[0] - 90), -headpose[1], -(headpose[2] + 90)])
        # Convert to radians
        rad = euler * (np.pi / 180.0)
        cy = np.cos(rad[0])
        sy = np.sin(rad[0])
        cp = np.cos(rad[1])
        sp = np.sin(rad[1])
        cr = np.cos(rad[2])
        sr = np.sin(rad[2])
        # labels in original Spiga function corrected,
        # the rotation in y-axis would named pitch, and z-axis yaw.
        Ry = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]])  # yaw
        Rp = np.array([[cp, -sp, 0.0], [sp, cp, 0.0], [0.0, 0.0, 1.0]])  # pitch
        Rr = np.array([[1.0, 0.0, 0.0], [0.0, cr, -sr], [0.0, sr, cr]])  # roll
        return np.matmul(np.matmul(Ry, Rp), Rr)