Source code for nicetoolbox.visual.media.components

"""
Components module for defining various visual components.

Classes:
    GazeIndividualComponent: Class for visualizing individual gaze data.
    BodyJointsComponent: Class for visualizing body joints data.
    HandJointsComponent: Class for visualizing hand joints data.
    FaceLandmarksComponent: Class for visualizing face landmarks data.
    GazeInteractionComponent: Class for visualizing gaze interaction data.
    ProximityComponent: Class for visualizing proximity data.
    KinematicsComponent: Class for visualizing kinematics data.
    Component: Abstract base class shared by all visual components.
    EmotionIndividualComponent: Class for visualizing emotion individual data.
    HeadOrientationComponent: Class for visualizing head orientation data.
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

import numpy as np
import rerun as rr


class Component(ABC):
    """
    Abstract class for defining visual components.

    On construction the component resolves its prediction folder, loads one
    result file per configured algorithm and groups the loaded arrays by data
    name for every data name that is mapped to at least one canvas.

    Attributes:
        visualizer_config (dict): Configuration settings for the visualizer.
        component_name (str): The name of the component.
        logger (viewer.Viewer): The viewer object for logging the visualizations.
        algorithm_list (list): The list of algorithms used for the component.
        component_prediction_folder (str): The path to the component prediction
            folder.
        canvas_list (list): The unique canvases of the component, in the order
            they first appear in the configuration.
        algorithms_results (list): The loaded result archive of each algorithm,
            in the same order as ``algorithm_list``.
        canvas_data (dict): Maps a data name to the list of per-algorithm
            arrays for that data name.
    """

    def __init__(self, visualizer_config, io, logger, component_name):
        """
        Load the algorithm results and canvas mapping of the component.

        Args:
            visualizer_config (dict): The visualizer configuration settings.
            io: The input/output object; it must provide
                ``get_component_results_folder`` and ``get_algorithm_result``.
            logger (viewer.Viewer): The viewer object for logging.
            component_name (str): The name of the component.

        Raises:
            FileNotFoundError: If the result file of a configured algorithm
                does not exist.
        """
        self.visualizer_config = visualizer_config
        self.component_name = component_name
        self.logger = logger
        self.component_prediction_folder = io.get_component_results_folder(
            visualizer_config["io"]["video_name"], component_name=component_name
        )
        component_config = self.visualizer_config["media"][self.component_name]
        self.algorithm_list = component_config["algorithms"]

        # Gather the canvases of every data name. dict.fromkeys drops the
        # duplicates while keeping first-seen order, so the canvas order is the
        # same on every run (a set would not guarantee that).
        canvas_names = []
        for canvases in component_config["canvas"].values():
            canvas_names.extend(canvases)
        self.canvas_list = list(dict.fromkeys(canvas_names))

        # load algorithm results
        self.algorithms_results = []
        for alg in self.algorithm_list:
            alg_path = io.get_algorithm_result(self.component_prediction_folder, alg)
            try:
                # NOTE(review): allow_pickle=True unpickles objects stored in
                # the archive; only load result files from a trusted source.
                self.algorithms_results.append(np.load(alg_path, allow_pickle=True))
            except FileNotFoundError:
                print(
                    f"ERROR: {alg}.npz file is not found in {self.component_name} folder. "
                    f"It will not be visualized\n "
                    f"Remove {alg} or {self.component_name} in the visualizer_config.toml file"
                )
                raise

        # create canvas data dictionary - key is data name, and value is
        # algorithms data (list with one array per algorithm)
        self.canvas_data = {}
        for data_name, canvas in component_config["canvas"].items():
            if not canvas:
                # data names without a canvas are not visualized
                continue
            # Kept as an instance attribute (holding the last processed data
            # name) because KinematicsComponent reads self.algorithms_data.
            self.algorithms_data = [result[data_name] for result in self.algorithms_results]
            self.canvas_data[data_name] = self.algorithms_data

    def _parse_alg_color(self, alg_idx: int) -> List[int]:
        """
        Parse the color for the algorithm index.

        Args:
            alg_idx (int): The index of the algorithm.

        Returns:
            list: The configured color entry for the algorithm index.
        """
        return self.visualizer_config["media"][self.component_name]["appearance"]["colors"][alg_idx]

    def _parse_radii(self, type: str) -> float:
        """
        Parse the radii for the type. Type is one of '3d' or 'camera_view'.

        Args:
            type (str): The type of radii.

        Returns:
            float: The configured radii for the type.

        Raises:
            ValueError: If ``type`` is neither '3d' nor 'camera_view'.
        """
        if type not in ("3d", "camera_view"):
            raise ValueError("Invalid type. Use either '3d' or 'camera_view'")
        return self.visualizer_config["media"][self.component_name]["appearance"]["radii"][type]

    @abstractmethod
    def _get_algorithms_labels(self):
        """
        Abstract method to get the labels for the algorithms.
        """
        pass

    @abstractmethod
    def _log_data(self):
        """
        Abstract method to log the data.
        """
        pass

    @abstractmethod
    def visualize(self):
        """
        Abstract method to visualize the component.
        """
        pass
class BodyJointsComponent(Component):
    """
    Class for visualizing body joint data.

    Keypoints are logged as points and the configured keypoint connections as
    line strips, either on camera-view (2D) canvases or on the 3D canvas.
    """

    def __init__(self, visualizer_config: Dict, io, logger, component_name: str):
        """
        Initialize the BodyJointsComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger (viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
        """
        super().__init__(visualizer_config, io, logger, component_name)

        # note: All these numpy arrays share a common structure in their first 3
        # dimension : [number_of_subjects, number_of_cameras, number_of_frames]
        # by design all algorithms in same component shares the same cameras and
        # subjects -- therefore the camera_names and subject_names results will be
        # read from first algorithm
        description = self.algorithms_results[0]["data_description"].item()
        # data description axis0 gives subject information
        self.subject_names_2d = description["2d"]["axis0"]
        if "3D_Canvas" in self.canvas_list:
            self.subject_names_3d = description["3d"]["axis0"]
        # data description axis1 gives camera information
        self.camera_names = description["2d"]["axis1"]

    def calculate_middle_eyes(self, dimension: int) -> Tuple[np.ndarray, List[str]]:
        """
        Calculate the middle of the eyes for the given dimension.

        The result of the first algorithm is used. The midpoint is the mean of
        the "right_eye" and "left_eye" keypoints over the first ``dimension``
        values of each keypoint.

        Args:
            dimension (int): The dimension for the middle eyes, 2 or 3.

        Returns:
            Tuple[np.ndarray, List[str]]: The middle eyes data and the camera
            names, or ``(None, None)`` when 3D is requested but no "3d" data is
            among the selected canvas data.

        Raises:
            ValueError: If ``dimension`` is neither 2 nor 3.
        """
        if dimension not in (2, 3):
            # The former `assert "<message>"` asserted a non-empty string and
            # therefore never fired.
            raise ValueError("supported dimensions are: 2 or 3")

        # we will use first algorithm results
        labels = self._get_algorithms_labels()[0]
        right_eye_idx = labels.index("right_eye")
        left_eye_idx = labels.index("left_eye")

        dim = f"{dimension}d"
        if dimension == 3 and dim not in self.canvas_data:
            print(
                f"{dim} results could not found in selected canvas data.\n"
                f"If you don't have 3d results, set multi-view false in visualizer_config.toml.\n"
                f"If you have 3d results, add '3D_Canvas' into body_joint.canvas in visualizer_config.toml"
            )
            return (None, None)

        data = self.algorithms_results[0][dim]
        # axis 3 is the keypoint axis; average the two eye keypoints
        mean_value = np.mean(data[:, :, :, [right_eye_idx, left_eye_idx], :dimension], axis=3)
        return (mean_value, self.camera_names)

    def _get_algorithms_labels(self) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Returns:
            List[List[str]]: The keypoint labels of each algorithm.
        """
        # axis 3 gives labels information, this might be different for each algorithm
        return [
            self.algorithms_results[i]["data_description"].item()["2d"]["axis3"]
            for i, _alg in enumerate(self.algorithm_list)
        ]

    def _get_skeleton_connections(self, alg_idx: int, predictions_mapping: Dict) -> List[List[str]]:
        """
        Get the skeleton connections for the algorithm index from the predictions mapping.

        Args:
            alg_idx (int): The index of the algorithm.
            predictions_mapping (Dict): The predictions mapping.

        Returns:
            List[List[str]]: The skeleton connections for the algorithm index.
        """
        alg_name = self.algorithm_list[alg_idx]
        # get algorithm keypoint type
        alg_type = self.visualizer_config["algorithms_properties"][alg_name]["keypoint_mapping"]
        return predictions_mapping["human_pose"][alg_type]["connections"][self.component_name]

    def _log_skeleton(self, entity_path: str, data_points: np.ndarray, dimension: int, alg_idx: int) -> None:
        """
        Log the skeleton data points in rerun.

        Every pair of consecutive keypoint names in a connection becomes one
        line segment; pairs naming a keypoint the algorithm does not provide
        are skipped.

        Args:
            entity_path (str): The entity path.
            data_points (np.ndarray): The keypoint coordinates of one subject
                in one frame, one row per keypoint.
            dimension (int): The dimension; 2 logs 2D line strips, any other
                value logs 3D line strips.
            alg_idx (int): The algorithm index.
        """
        keypoints_dict = {label: i for i, label in enumerate(self._get_algorithms_labels()[alg_idx])}
        connections = self._get_skeleton_connections(alg_idx, self.visualizer_config["predictions_mapping"])

        segments = []
        for connect in connections:
            for start_name, end_name in zip(connect, connect[1:]):
                if start_name in keypoints_dict and end_name in keypoints_dict:
                    segments.append(
                        (data_points[keypoints_dict[start_name]], data_points[keypoints_dict[end_name]])
                    )
        # shape (number_of_segments, 2, dimension); also valid with no segments
        strips = np.array(segments).reshape(-1, 2, dimension)

        color = self._parse_alg_color(alg_idx)
        if dimension == 2:
            rr.log(
                entity_path,
                rr.LineStrips2D(strips, colors=color, radii=self._parse_radii("camera_view")),
            )
        else:
            rr.log(
                entity_path,
                rr.LineStrips3D(strips, colors=color, radii=self._parse_radii("3d")),
            )

    def _log_data(self, entity_path: str, data_points: np.ndarray, dimension: str, alg_idx: int) -> None:
        """
        Log the data points in rerun.

        Args:
            entity_path (str): The entity path.
            data_points (np.ndarray): The data points, one row per keypoint.
            dimension (str): Either "2d" or "3d"; any other value logs nothing.
            alg_idx (int): The algorithm index.
        """
        color = self._parse_alg_color(alg_idx)
        if dimension == "2d":
            radii = self._parse_radii("camera_view")
            rr.log(
                entity_path,
                rr.Points2D(
                    data_points,
                    keypoint_ids=list(range(data_points.shape[0])),
                    colors=color,
                    radii=radii,
                ),
            )
        elif dimension == "3d":
            radii = self._parse_radii("3d")
            rr.log(
                entity_path,
                rr.Points3D(
                    data_points,
                    keypoint_ids=list(range(data_points.shape[0])),
                    colors=color,
                    radii=radii,
                ),
            )

    def visualize(self, frame_idx: int) -> None:
        """
        Visualize the body joints component.

        Combines the _log_data and _log_skeleton methods to visualize the body
        joints component in either 2D or 3D. Algorithms whose data has fewer
        frames than ``frame_idx`` requires are skipped.

        Args:
            frame_idx (int): The frame index.
        """
        for canvas in self.canvas_list:
            if canvas == "3D_Canvas":
                for alg_idx, alg_data in enumerate(self.canvas_data["3d"]):
                    if frame_idx >= alg_data.shape[2]:  # number of frames
                        continue
                    alg_name = self.algorithm_list[alg_idx]
                    for subject_idx, subject in enumerate(self.subject_names_3d):
                        # select first 3 values, 4th is confidence score
                        subject_3d_points = alg_data[subject_idx, 0, frame_idx][:, :3]
                        entity_path = self.logger.generate_component_entity_path(
                            self.component_name,
                            is_3d=True,
                            alg_name=alg_name,
                            subject_name=subject,
                        )
                        self._log_data(entity_path, subject_3d_points, "3d", alg_idx)
                        self._log_skeleton(
                            f"{entity_path}/skeleton",
                            subject_3d_points,
                            dimension=3,
                            alg_idx=alg_idx,
                        )
            else:
                cam_name = canvas
                # every data name except "3d" is drawn on the camera views
                data_key = [c for c in self.canvas_data if c != "3d"]
                camera_index = self.camera_names.index(cam_name)
                for k in data_key:
                    for alg_idx, alg_data in enumerate(self.canvas_data[k]):
                        if frame_idx >= alg_data.shape[2]:  # number of frames
                            continue
                        alg_name = self.algorithm_list[alg_idx]
                        for subject_idx, subject in enumerate(self.subject_names_2d):
                            # select first 2 values, 3rd is confidence score
                            subject_2d_points = alg_data[subject_idx, camera_index, frame_idx][:, :2]
                            entity_path = self.logger.generate_component_entity_path(
                                self.component_name,
                                is_3d=False,
                                alg_name=alg_name,
                                subject_name=subject,
                                cam_name=cam_name,
                            )
                            self._log_data(entity_path, subject_2d_points, "2d", alg_idx)
                            self._log_skeleton(
                                f"{entity_path}/skeleton",
                                subject_2d_points,
                                dimension=2,
                                alg_idx=alg_idx,
                            )
class HandJointsComponent(BodyJointsComponent):
    """
    Class for visualizing hand joints data.

    All behavior is inherited from BodyJointsComponent.
    """

    def __init__(self, visualizer_config, io, logger, component_name):
        """
        Set up the component by delegating to the BodyJointsComponent constructor.

        Args:
            visualizer_config (dict): The visualizer configuration settings.
            io: The input/output object.
            logger: The logger object.
            component_name (str): The name of the component.
        """
        super().__init__(
            visualizer_config=visualizer_config,
            io=io,
            logger=logger,
            component_name=component_name,
        )
class FaceLandmarksComponent(BodyJointsComponent):
    """
    Class for visualizing face landmarks data.

    All behavior is inherited from BodyJointsComponent.
    """

    def __init__(self, visualizer_config, io, logger, component_name):
        """
        Set up the component by delegating to the BodyJointsComponent constructor.

        Args:
            visualizer_config (dict): The visualizer configuration settings.
            io: The input/output object.
            logger: The logger object.
            component_name (str): The name of the component.
        """
        super().__init__(
            visualizer_config=visualizer_config,
            io=io,
            logger=logger,
            component_name=component_name,
        )
class GazeIndividualComponent(Component):
    """
    Class for visualizing individual gaze data.

    Gaze is drawn as an arrow per subject, starting at the subject's eye/face
    midpoint. When look-at data is supplied, the arrow color reflects whether
    the subject looks at a neighbouring subject.

    Attributes:
        calib (dict): The calibration parameters.
        camera_names (List[str]): The camera names.
        subject_names (List[str]): The subject names.
        landmarks_2d (np.ndarray): The 2D landmarks data.
        eyes_middle_3d_data (np.ndarray): The 3D eyes middle data, or None when
            it was not provided.
        camera_view_subjects_middle_point_dict (Dict): The camera view subjects
            middle point dictionary.
        look_at_data (np.ndarray): The look at data.
        look_at_labels (List[str]): The look at labels.
        projected_gaze_data_algs (List[Dict]): The projected gaze data for the
            algorithms.
    """

    def __init__(
        self,
        visualizer_config: Dict,
        io,
        logger,
        component_name: str,
        calib: Dict,
        eyes_middle_3d_data: Tuple[np.ndarray, List[str]] = None,
        look_at_data_tuple: Tuple[np.ndarray, List[str]] = None,
    ):
        """
        Initialize the GazeIndividualComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger (viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
            calib (Dict): The calibration parameters.
            eyes_middle_3d_data (Tuple[np.ndarray, List[str]], optional): The
                3D eyes middle data and its camera names; only the data is
                kept. Defaults to None.
            look_at_data_tuple (Tuple[np.ndarray, List[str]], optional): The
                look at data and the look at labels. Defaults to None.
        """
        super().__init__(visualizer_config, io, logger, component_name)
        self.calib = calib

        # the camera_names and subject_names results will be read from first
        # algorithm. we are getting camera names from landmarks_2d because 3d
        # doesn't have any camera info
        description = self.algorithms_results[0]["data_description"].item()
        self.camera_names = description["landmarks_2d"]["axis1"]  # axis1 gives camera info
        self.subject_names = description["3d"]["axis0"]  # axis0 gives subject info
        self.landmarks_2d = self.algorithms_results[0]["landmarks_2d"]

        # create subjects middle of face
        # 3d -- the argument defaults to None, so it must not be unpacked blindly
        self.eyes_middle_3d_data = None if eyes_middle_3d_data is None else eyes_middle_3d_data[0]

        # camera view
        # create the camera view -- middle of subjects' face point dictionary,
        # using the mean of the first 4 landmarks (NaN entries are ignored)
        mean_face = np.nanmean(self.landmarks_2d.astype(float)[:, :, :, :4, :], axis=3)
        self.camera_view_subjects_middle_point_dict = {
            cam_name: [mean_face[subject_idx, cam_idx, :] for subject_idx, _subject in enumerate(self.subject_names)]
            for cam_idx, cam_name in enumerate(self.camera_names)
        }

        self.look_at_data = None
        self.look_at_labels = None
        if look_at_data_tuple:
            self.look_at_data = look_at_data_tuple[0]
            self.look_at_labels = look_at_data_tuple[1]

        # retrieve 3d data projected to 2d camera views
        key_2d = "2d_projected_from_3d_filtered" if "3d_filtered" in self.canvas_data else "2d_projected_from_3d"
        camera_canvases = [c for c in self.canvas_list if "3d" not in c.lower()]
        self.projected_gaze_data_algs = []
        for alg_idx, _alg in enumerate(self.algorithm_list):
            proj = {}
            for cam_name in camera_canvases:
                cam_idx = self.camera_names.index(cam_name)
                proj[cam_name] = self.algorithms_results[alg_idx][key_2d][:, cam_idx]
            self.projected_gaze_data_algs.append(proj)

    def _get_algorithms_labels(self) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Returns:
            List[List[str]]: The labels for the algorithms.
        """
        # axis 3 gives labels information, this might be different for each algorithm
        return [
            self.algorithms_results[i]["data_description"].item()["3d"]["axis3"]
            for i, _alg in enumerate(self.algorithm_list)
        ]

    def _get_look_at_color(self, sub_idx: int, alg_idx: int, look_to_subject: str, frame_idx: int) -> List[int]:
        """
        Get the look at color for the subject index, algorithm index, look to
        subject, and frame index.

        Args:
            sub_idx (int): The subject index.
            alg_idx (int): The algorithm index.
            look_to_subject (str): The look to subject.
            frame_idx (int): The frame index.

        Returns:
            List[int]: The first configured gaze_interaction color of the
            algorithm when the look-at value is truthy, otherwise the second.
        """
        look_to_ind = self.look_at_labels.index(look_to_subject)
        is_look_at = self.look_at_data[sub_idx, 0, frame_idx, look_to_ind]
        color_index = 0 if is_look_at else 1
        return self.visualizer_config["media"]["gaze_interaction"]["appearance"]["colors"][alg_idx][color_index]

    def _select_color(self, subject_idx: int, alg_idx: int, frame_idx: int, config_key: str) -> List[int]:
        """
        Pick the arrow color of a subject.

        Args:
            subject_idx (int): The subject index.
            alg_idx (int): The algorithm index.
            frame_idx (int): The frame index.
            config_key (str): The "media" section whose configured colors are
                used when no look-at data is available.

        Returns:
            List[int]: The look-at color when look-at data exists, otherwise
            the configured color of the algorithm.
        """
        if self.look_at_data is None:
            return self.visualizer_config["media"][config_key]["appearance"]["colors"][alg_idx]

        # gaze interaction defines color: look at subject either one forward or
        # one backward in index
        # NOTE(review): with more than two subjects the `- 1` makes the
        # second-to-last subject look backward as well -- confirm this is
        # intended.
        if subject_idx + 1 < len(self.subject_names) - 1:
            look_to_subject = self.subject_names[subject_idx + 1]
        else:
            look_to_subject = self.subject_names[subject_idx - 1]
        return self._get_look_at_color(subject_idx, alg_idx, look_to_subject, frame_idx)

    def _log_data(
        self,
        entity_path: str,
        head_points: np.ndarray,
        data_points: np.ndarray,
        color: List[int],
        dimension: str,
    ) -> None:
        """
        Log the gaze points and head points in rerun.

        Args:
            entity_path (str): The entity path.
            head_points (np.ndarray): The head points, used as arrow origins.
            data_points (np.ndarray): The gaze points, used as arrow vectors.
            color (List[int]): The color.
            dimension (str): Either "2d" or "3d"; any other value logs nothing.
        """
        if dimension == "2d":
            radii = self._parse_radii("camera_view")
            rr.log(
                entity_path,
                rr.Arrows2D(
                    origins=np.array(head_points).reshape(-1, 2),
                    vectors=np.array(data_points).reshape(-1, 2),
                    colors=np.array(color),
                    radii=radii,
                ),
            )
            # NOTE(review): this only constructs a DrawOrder value and discards
            # it, so it has no effect on the logged arrows -- confirm whether a
            # draw order was meant to be logged.
            rr.components.DrawOrder(1)
        elif dimension == "3d":
            radii = self._parse_radii("3d")
            rr.log(
                entity_path,
                rr.Arrows3D(
                    origins=np.array(head_points).reshape(-1, 3),
                    # divided by two to make it shorter in visualization
                    vectors=np.array(data_points).reshape(-1, 3) / 2,
                    colors=np.array(color).reshape(-1, 3),
                    radii=radii,
                ),
            )

    def visualize(self, frame_idx: int) -> None:
        """
        Visualize the gaze individual component.

        Combines the _log_data method to visualize the gaze individual
        component in either 2D or 3D. The 3D canvas is skipped when no 3D eyes
        middle data was provided; algorithms without data for ``frame_idx`` are
        skipped.

        Args:
            frame_idx (int): The frame index.
        """
        dataname = "3d_filtered" if "3d_filtered" in self.canvas_data else "3d"
        for canvas in self.canvas_list:
            if canvas == "3D_Canvas":
                if self.eyes_middle_3d_data is None:
                    # without eye midpoints there is no origin for the arrows
                    continue
                for alg_idx, alg_data in enumerate(self.canvas_data[dataname]):
                    if frame_idx >= alg_data.shape[2]:  # number of frames
                        continue
                    alg_name = self.algorithm_list[alg_idx]
                    for subject_idx, subject in enumerate(self.subject_names):
                        # the stored gaze vector is negated before logging
                        subject_gaze_individual = -alg_data[subject_idx, 0, frame_idx]
                        subject_eyes_middle_3d_data = self.eyes_middle_3d_data[subject_idx, 0, frame_idx]
                        entity_path = self.logger.generate_component_entity_path(
                            self.component_name,
                            is_3d=True,
                            alg_name=alg_name,
                            subject_name=subject,
                        )
                        color = self._select_color(subject_idx, alg_idx, frame_idx, "gaze_individual")
                        self._log_data(
                            entity_path,
                            subject_eyes_middle_3d_data,
                            subject_gaze_individual,
                            color,
                            "3d",
                        )
            else:
                cam_name = canvas
                for alg_idx, alg_data in enumerate(self.canvas_data[dataname]):
                    if frame_idx >= alg_data.shape[2]:  # number of frames
                        continue
                    alg_name = self.algorithm_list[alg_idx]
                    for subject_idx, subject in enumerate(self.subject_names):
                        if subject_idx not in self.visualizer_config["dataset_properties"]["cam_sees_subjects"][cam_name]:
                            continue
                        camera_data = self.projected_gaze_data_algs[alg_idx][cam_name]
                        if frame_idx >= camera_data.shape[1]:  # number of frames
                            continue
                        frame_data = camera_data[subject_idx, frame_idx]
                        subject_eyes_mid = self.camera_view_subjects_middle_point_dict[cam_name][subject_idx][
                            frame_idx
                        ][:2]
                        entity_path = self.logger.generate_component_entity_path(
                            self.component_name,
                            is_3d=False,
                            alg_name=alg_name,
                            subject_name=subject,
                            cam_name=cam_name,
                        )
                        color = self._select_color(subject_idx, alg_idx, frame_idx, self.component_name)
                        self._log_data(entity_path, subject_eyes_mid, frame_data, color, "2d")
class GazeInteractionComponent(Component):
    """
    Class for visualizing gaze interaction data.

    The component draws nothing itself (``visualize`` and ``_log_data`` are
    empty); it exposes the look-at data through ``get_lookat_data``.
    """

    def __init__(self, visualizer_config, io, logger, component_name):
        """
        Initialize the GazeInteractionComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger(viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
        """
        super().__init__(visualizer_config, io, logger, component_name)

        description = self.algorithms_results[0]["data_description"].item()
        # the first key is used - it might be distance_gaze_2d or distance_gaze_3d
        first_key = list(description)[0]
        axes = description[first_key]
        self.camera_names = axes["axis1"]
        self.subject_names = axes["axis0"]

    def get_lookat_data(self) -> Tuple[np.ndarray, List[str]]:
        """
        Get the look at data.

        The first algorithm is used. The 3D look-at data is preferred when the
        data description contains it; otherwise the 2D look-at data is read.

        Returns:
            Tuple[np.ndarray, List[str]]: The look at data and the look at labels.
        """
        first_description = self.algorithms_results[0]["data_description"].item()
        data_name = "gaze_look_at_3d" if "gaze_look_at_3d" in first_description else "gaze_look_at_2d"

        labels_of_first_alg = self._get_algorithms_labels(data_name)[0]
        data_of_first_alg = self.canvas_data[data_name][0]
        return (data_of_first_alg, labels_of_first_alg)

    def _get_algorithms_labels(self, data_name: str) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Args:
            data_name (str): The data name whose axis 3 labels are read.

        Returns:
            List[List[str]]: The labels for the algorithms.
        """
        # axis 3 holds the labels; they may differ between algorithms
        return [
            self.algorithms_results[position]["data_description"].item()[data_name]["axis3"]
            for position in range(len(self.algorithm_list))
        ]

    def _log_data(self):
        pass

    def visualize(self):
        pass
class EmotionIndividualComponent(Component):
    """
    Class for visualizing emotion individual data.

    For every visible subject the face box is drawn on the camera view,
    labelled with the emotion that has the highest value.
    """

    def __init__(self, visualizer_config: Dict, io, logger, component_name: str):
        """
        Initialize the EmotionIndividualComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger (viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
        """
        super().__init__(visualizer_config, io, logger, component_name)

        # camera and subject names are taken from the "emotions" description
        # of the first algorithm
        emotions_description = self.algorithms_results[0]["data_description"].item()["emotions"]
        self.camera_names = emotions_description["axis1"]  # axis1 gives camera info
        self.subject_names = emotions_description["axis0"]  # axis0 gives subject info
        self.algorithm_labels = self._get_algorithms_labels()

    def _get_algorithms_labels(self) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Returns:
            List[List[str]]: The emotion labels of each algorithm.
        """
        # axis 3 holds the labels; they may differ between algorithms
        return [
            self.algorithms_results[position]["data_description"].item()["emotions"]["axis3"]
            for position in range(len(self.algorithm_list))
        ]

    def _log_data(self, entity_path: str, head_bbox: np.ndarray, colors: str, labels: str) -> None:
        """
        Log the face bounding box and emotion.

        Args:
            entity_path (str): The entity path.
            head_bbox (np.ndarray): The face box, passed to rerun in XYWH format.
            colors: The color of the box.
            labels: The label shown with the box.
        """
        box = rr.Boxes2D(
            array=head_bbox,
            array_format=rr.Box2DFormat.XYWH,
            labels=labels,
            colors=colors,
        )
        rr.log(entity_path, box)

    def visualize(self, frame_idx: int) -> None:
        """
        Visualize the emotion individual component.

        Uses the _log_data method to draw, in each camera view, the face box of
        every subject the camera sees, labelled and colored by the emotion with
        the highest value. Algorithms without data for ``frame_idx`` are skipped.

        Args:
            frame_idx (int): The frame index.
        """
        dataname = "emotions"
        head_bbox = "faceboxes"
        for cam_name in self.canvas_list:
            camera_index = self.camera_names.index(cam_name)
            for alg_idx, alg_data in enumerate(self.canvas_data[dataname]):
                alg_colors = self._parse_alg_color(alg_idx)
                if frame_idx >= alg_data.shape[2]:  # number of frames
                    continue
                alg_name = self.algorithm_list[alg_idx]
                for subject_idx, subject in enumerate(self.subject_names):
                    if subject_idx not in self.visualizer_config["dataset_properties"]["cam_sees_subjects"][cam_name]:
                        continue
                    location = (subject_idx, camera_index, frame_idx)
                    subject_head_bbox = self.algorithms_results[alg_idx][head_bbox][location]
                    strongest_emotion = np.argmax(alg_data[location])
                    entity_path = self.logger.generate_component_entity_path(
                        self.component_name,
                        is_3d=False,
                        alg_name=alg_name,
                        subject_name=subject,
                        cam_name=cam_name,
                    )
                    self._log_data(
                        entity_path,
                        subject_head_bbox,
                        labels=self.algorithm_labels[alg_idx][strongest_emotion],
                        colors=alg_colors[strongest_emotion],
                    )
class HeadOrientationComponent(Component):
    """
    Class for visualizing head orientation data.

    The orientation is drawn on camera views as an arrow per visible subject.
    """

    def __init__(self, visualizer_config: Dict, io, logger, component_name: str):
        """
        Initialize the HeadOrientationComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger (viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
        """
        super().__init__(visualizer_config, io, logger, component_name)

        # camera and subject names are taken from the "headpose" description
        # of the first algorithm
        headpose_description = self.algorithms_results[0]["data_description"].item()["headpose"]
        self.camera_names = headpose_description["axis1"]  # axis1 gives camera info
        self.subject_names = headpose_description["axis0"]  # axis0 gives subject info
        self.algorithm_labels = self._get_algorithms_labels()

    def _get_algorithms_labels(self) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Returns:
            List[List[str]]: The "head_orientation_2d" labels of each algorithm.
        """
        # axis 3 holds the labels; they may differ between algorithms
        return [
            self.algorithms_results[position]["data_description"].item()["head_orientation_2d"]["axis3"]
            for position in range(len(self.algorithm_list))
        ]

    def _log_data(
        self,
        entity_path: str,
        head_points: np.ndarray,
        data_points: np.ndarray,
        color: List[int],
        dimension: str,
    ) -> None:
        """
        Log the head orientation as an arrow in rerun.

        The arrow starts at ``head_points`` and its vector is the first two
        values of ``data_points`` minus ``head_points``.

        Args:
            entity_path (str): The entity path.
            head_points (np.ndarray): The arrow origin.
            data_points (np.ndarray): The point the arrow is directed to.
            color (List[int]): The color.
            dimension (str): Only "2d" is logged; any other value logs nothing.
        """
        vectors_forward = data_points[0:2] - head_points
        if dimension != "2d":
            return

        arrow = rr.Arrows2D(
            origins=np.array(head_points).reshape(-1, 2),
            vectors=np.array(vectors_forward).reshape(-1, 2),
            colors=np.array(color),
            radii=self._parse_radii("camera_view"),
        )
        rr.log(entity_path, arrow)
        # NOTE(review): this only constructs a DrawOrder value and discards it,
        # so it has no effect on the logged arrow -- confirm the intent.
        rr.components.DrawOrder(1)

    def visualize(self, frame_idx: int) -> None:
        """
        Visualize the head orientation component.

        Uses the _log_data method to draw the head orientation of every subject
        a camera sees on that camera's 2D view. Algorithms without data for
        ``frame_idx`` are skipped.

        Args:
            frame_idx (int): The frame index.
        """
        for cam_name in self.canvas_list:
            camera_index = self.camera_names.index(cam_name)
            for alg_idx, alg_data in enumerate(self.canvas_data["head_orientation_2d"]):
                num_frames = alg_data.shape[2]
                if frame_idx >= num_frames:
                    continue
                alg_name = self.algorithm_list[alg_idx]
                for subject_idx, subject in enumerate(self.subject_names):
                    if subject_idx not in self.visualizer_config["dataset_properties"]["cam_sees_subjects"][cam_name]:
                        continue
                    frame_data = alg_data[subject_idx, camera_index, frame_idx]
                    entity_path = self.logger.generate_component_entity_path(
                        self.component_name,
                        is_3d=False,
                        alg_name=alg_name,
                        subject_name=subject,
                        cam_name=cam_name,
                    )
                    color = self._parse_alg_color(alg_idx)
                    # first two values are the origin, the rest the target
                    origin, target = frame_data[:2], frame_data[2:]
                    self._log_data(entity_path, origin, target, color, "2d")
class ProximityComponent(Component):
    """
    Class for visualizing proximity data.

    The proximity value is drawn as a horizontal line whose length equals the
    value, centred on the point midway between the first two subjects'
    time-averaged eye midpoints and shifted along the second coordinate.
    """

    def __init__(
        self,
        visualizer_config: Dict,
        io,
        logger,
        component_name: str,
        eyes_middle_3d_data: Tuple[np.ndarray, List[str]] = None,
        eyes_middle_2d_data: Tuple[np.ndarray, List[str]] = None,
    ):
        """
        Initialize the ProximityComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger (viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
            eyes_middle_3d_data (Tuple[np.ndarray, List[str]], optional): The 3D
                eyes middle data and its camera names; only the data is kept.
                Defaults to None.
            eyes_middle_2d_data (Tuple[np.ndarray, List[str]], optional): The 2D
                eyes middle data and its camera names; only the data is kept.
                Defaults to None.
        """
        super().__init__(visualizer_config, io, logger, component_name)
        description = self.algorithms_results[0]["data_description"].item()["body_distance_2d"]
        self.camera_names = description["axis1"]
        self.subject_names = description["axis0"]

        # create subjects middle data
        # 3d -- both the argument and the array inside it may be None, so the
        # middle point stays None in that case instead of being left undefined
        self.eyes_middle_3d_data = None if eyes_middle_3d_data is None else eyes_middle_3d_data[0]
        self.middle_point_3d = None
        if self.eyes_middle_3d_data is not None:
            # mean over frames of subject 0 and subject 1 (camera index 0)
            first_subject_eyes_middle_data = self.eyes_middle_3d_data[0, 0, :].mean(axis=0)
            second_subject_eyes_middle_data = self.eyes_middle_3d_data[1, 0, :].mean(axis=0)
            self.middle_point_3d = (first_subject_eyes_middle_data + second_subject_eyes_middle_data) / 2

        # 2d - camera view
        self.eyes_middle_2d_data = None if eyes_middle_2d_data is None else eyes_middle_2d_data[0]
        # create camera view - middle point dictionary
        self.camera_view_middle_point_dict = {}
        if self.eyes_middle_2d_data is not None:
            for camera_idx, cam in enumerate(self.camera_names):
                first_subject_eyes_middle_data = self.eyes_middle_2d_data[0, camera_idx, :].mean(axis=0)
                second_subject_eyes_middle_data = self.eyes_middle_2d_data[1, camera_idx, :].mean(axis=0)
                middle_point = (first_subject_eyes_middle_data + second_subject_eyes_middle_data) / 2
                # setdefault keeps the first entry if a camera name repeats
                self.camera_view_middle_point_dict.setdefault(cam, middle_point)

    def _get_algorithms_labels(self) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Returns:
            List[List[str]]: The labels for the algorithms.
        """
        # axis 3 gives labels information, this might be different for each algorithm
        return [
            self.algorithms_results[i]["data_description"].item()["body_distance_2d"]["axis3"]
            for i, _alg in enumerate(self.algorithm_list)
        ]

    def _log_data(
        self,
        entity_path: str,
        data_points: np.ndarray,
        alg_idx: int,
        mid_point: np.ndarray,
        dimension: str,
    ) -> None:
        """
        Logs the proximity score in rerun.

        Args:
            entity_path (str): The entity path.
            data_points (np.ndarray): The proximity value; it is used as the
                length of the drawn line.
            alg_idx (int): The algorithm index.
            mid_point (np.ndarray): The middle point the line is centred on.
            dimension (str): Either "2d" or "3d"; any other value logs nothing.
        """
        color = self._parse_alg_color(alg_idx)
        half_length = data_points / 2
        if dimension == "2d":
            radii = self._parse_radii("camera_view")
            # the line is offset by 100 along the second coordinate
            proximity_start = np.array([mid_point[0] - half_length, mid_point[1] - 100])
            proximity_end = np.array([mid_point[0] + half_length, mid_point[1] - 100])
            rr.log(
                entity_path,
                rr.LineStrips2D(
                    np.vstack((proximity_start, proximity_end)),
                    colors=color,
                    radii=radii,
                    labels="Proximity",
                ),
            )
        elif dimension == "3d":
            radii = self._parse_radii("3d")
            # the line is offset by 0.5 along the second coordinate
            proximity_start = np.array([mid_point[0] - half_length, mid_point[1] - 0.5, mid_point[2]])
            proximity_end = np.array([mid_point[0] + half_length, mid_point[1] - 0.5, mid_point[2]])
            rr.log(
                entity_path,
                rr.LineStrips3D(
                    np.vstack((proximity_start, proximity_end)),
                    colors=color,
                    radii=radii,
                    labels="Proximity",
                ),
            )

    def visualize(self, frame_idx: int) -> None:
        """
        Visualize the proximity component.

        Uses the _log_data method to visualize the proximity component in
        either 2D or 3D. A canvas is skipped when no middle point is available
        for it; algorithms without data for ``frame_idx`` are skipped.

        Args:
            frame_idx (int): The frame index.
        """
        for canvas in self.canvas_list:
            if canvas == "3D_Canvas":
                if self.middle_point_3d is None:
                    # no 3D eye midpoints were provided
                    continue
                for alg_idx, alg_data in enumerate(self.canvas_data["body_distance_3d"]):
                    alg_name = self.algorithm_list[alg_idx]
                    if frame_idx >= alg_data.shape[2]:  # number of frames
                        continue
                    frame_proximity = alg_data[0, 0, frame_idx, 0]
                    entity_path = self.logger.generate_component_entity_path(
                        self.component_name, is_3d=True, alg_name=alg_name
                    )
                    self._log_data(
                        entity_path,
                        frame_proximity,
                        alg_idx,
                        self.middle_point_3d,
                        "3d",
                    )
            else:
                cam_name = canvas
                mid_point = self.camera_view_middle_point_dict.get(cam_name)
                if mid_point is None:
                    # no 2D eye midpoints are available for this camera
                    continue
                for alg_idx, alg_data in enumerate(self.canvas_data["body_distance_2d"]):
                    alg_name = self.algorithm_list[alg_idx]
                    camera_idx = self.camera_names.index(cam_name)
                    if frame_idx >= alg_data.shape[2]:  # number of frames
                        continue
                    frame_proximity = alg_data[0, camera_idx, frame_idx, 0]
                    entity_path = self.logger.generate_component_entity_path(
                        self.component_name,
                        is_3d=False,
                        alg_name=alg_name,
                        cam_name=cam_name,
                    )
                    self._log_data(entity_path, frame_proximity, alg_idx, mid_point, "2d")
class KinematicsComponent(Component):
    """
    Class for visualizing kinematics data.

    For every frame, the per-joint values of each algorithm are averaged per
    configured body part and logged to rerun as scalars, one entity per
    algorithm / subject / body part (and camera, for the 2D data).

    Attributes:
        camera_names: ``axis1`` of the ``velocity_body_2d`` description.
        subject_names: ``axis0`` of the ``velocity_body_2d`` description.
        subject_names_2d: Same content as ``subject_names``.
        subject_names_3d: ``axis0`` of the ``velocity_body_3d`` description;
            only set when ``velocity_body_3d`` is present in ``canvas_data``.
    """

    def __init__(self, visualizer_config: Dict, io, logger, component_name: str):
        """
        Initialize the KinematicsComponent.

        Args:
            visualizer_config (Dict): The visualizer configuration settings.
            io: The input/output object.
            logger (viewer.Viewer): The viewer rerun object.
            component_name (str): The name of the component.
        """
        super().__init__(visualizer_config, io, logger, component_name)

        # The results are loaded with np.load; for an .npz archive each key
        # lookup reads (and unpickles) the entry again, so fetch it only once.
        data_description = self.algorithms_results[0]["data_description"].item()
        velocity_2d_description = data_description["velocity_body_2d"]

        self.camera_names = velocity_2d_description["axis1"]
        self.subject_names = velocity_2d_description["axis0"]
        self.subject_names_2d = velocity_2d_description["axis0"]
        if "velocity_body_3d" in self.canvas_data:
            self.subject_names_3d = data_description["velocity_body_3d"]["axis0"]

    def _labels_for_algorithm(self, alg_idx: int) -> List[str]:
        """Return the ``velocity_body_2d`` axis-3 labels of one algorithm."""
        description = self.algorithms_results[alg_idx]["data_description"].item()
        return description["velocity_body_2d"]["axis3"]

    def _get_algorithms_labels(self) -> List[List[str]]:
        """
        Get the labels for the algorithms.

        Returns:
            List[List[str]]: The labels for the algorithms, in the order of
            ``algorithm_list``.
        """
        # axis 3 gives labels information, this might be different for each algorithm
        return [self._labels_for_algorithm(i) for i, _alg in enumerate(self.algorithm_list)]

    def _get_joints_movement_by_bodypart(self, alg_idx: int) -> np.ndarray:
        """
        Get the joints movement by body part for the algorithm index.

        For each body part in the ``joints`` section of the component's media
        configuration, the first channel of the listed joints is averaged
        (ignoring NaNs) over the joint axis.

        Args:
            alg_idx (int): The algorithm index.

        Returns:
            np.ndarray: The per-body-part means concatenated along the last
            axis, in configuration order.

        Raises:
            ValueError: If a configured joint is not among the algorithm's
                labels, or if no body part is configured.
        """
        # Only the requested algorithm's labels are needed; avoid loading all of them.
        labels = self._labels_for_algorithm(alg_idx)
        data = self.algorithms_data[alg_idx]
        joints_per_bodypart = self.visualizer_config["media"][self.component_name]["joints"]

        bodypart_motion = []
        for joints in joints_per_bodypart.values():
            joint_indices = [labels.index(joint) for joint in joints]
            # Slice with 0:1 (not 0) so the last dimension is kept for the concatenation below.
            selected_data = data[:, :, :, joint_indices, 0:1]
            bodypart_motion.append(np.nanmean(selected_data, axis=-2))
        return np.concatenate(bodypart_motion, axis=-1)

    def _log_data(self, entity_path: str, data_points: np.ndarray) -> None:
        """
        Log the data points in rerun.

        Args:
            entity_path (str): The entity path.
            data_points (np.ndarray): The data points; rounded to two
                decimals before being logged as a scalar.
        """
        rr.log(entity_path, rr.Scalar(np.round(data_points, decimals=2)))

    def visualize(self, frame_idx: int) -> None:
        """
        Visualize the kinematics component.

        Uses the _log_data method to log one scalar per algorithm, subject and
        body part for ``velocity_body_3d``, and additionally per camera for
        ``velocity_body_2d``. Other entries of ``canvas_data`` are ignored.
        Algorithms with no data for ``frame_idx`` are skipped.

        Args:
            frame_idx (int): The frame index.
        """
        bodyparts = list(self.visualizer_config["media"][self.component_name]["joints"])

        for data_name, data in self.canvas_data.items():
            is_3d = data_name == "velocity_body_3d"
            if not is_3d and data_name != "velocity_body_2d":
                continue  # nothing is logged for other data names
            subject_names = self.subject_names_3d if is_3d else self.subject_names_2d

            # NOTE(review): the values below come from self.algorithms_data via
            # _get_joints_movement_by_bodypart and not from `data`, so both
            # the 2D and 3D canvases read the same array - confirm this is intended.
            for alg_idx, _alg_data in enumerate(data):
                alg_name = self.algorithm_list[alg_idx]
                # Independent of subject, body part and camera: compute once per algorithm.
                joints_bodypart_motion = self._get_joints_movement_by_bodypart(alg_idx)
                if frame_idx >= joints_bodypart_motion.shape[2]:  # number of frames
                    continue

                for subject_idx, subject in enumerate(subject_names):
                    for idx, bodypart in enumerate(bodyparts):
                        if is_3d:
                            frame_bodypart_data = joints_bodypart_motion[subject_idx, 0, frame_idx][idx]
                            entity_path = self.logger.generate_component_entity_path(
                                self.component_name,
                                is_3d=True,
                                alg_name=alg_name,
                                subject_name=subject,
                                bodypart=bodypart,
                            )
                            self._log_data(entity_path, frame_bodypart_data)
                        else:
                            for camera_idx, camera in enumerate(self.camera_names):
                                frame_kinematic_2d = joints_bodypart_motion[subject_idx, camera_idx, frame_idx][idx]
                                entity_path = self.logger.generate_component_entity_path(
                                    self.component_name,
                                    is_3d=False,
                                    alg_name=alg_name,
                                    subject_name=subject,
                                    cam_name=camera,
                                    bodypart=bodypart,
                                )
                                self._log_data(entity_path, frame_kinematic_2d)