Source code for nicetoolbox.detectors.feature_detectors.kinematics.velocity_body

"""
Velocity Body feature detector class for kinematics of the body.
"""

import logging
import os

import numpy as np

from ....configs.config_handler import load_config, load_validated_config_raw
from ....configs.schemas.predictions_mapping import PredictionsMappingConfig
from ....utils import check_and_exception as check
from ..base_feature import BaseFeature
from . import utils as kinematics_utils


[docs]class VelocityBody(BaseFeature): """ The VelocityBody class is a feature detector that computes the kinematics component. The VelocityBody feature detector accepts the human_pose component (body_joints) as itsprimary input, which is computed using the human_pose method detector. The kinematics component of this feature detector calculates the Euclidean distance between adjacent frames, essentially determining the velocity of body movement from one frame to the next. Component: kinematics Attributes: components (list): A list containing the name of the component this class is responsible for: kinematics: algorithm (str): The name of the algorithm used to compute the kinematics component. velocity_body: the velocity of body movement from one frame to the next predictions_mapping (dict): A dictionary containing the mapping of body parts to their respective indices. camera_names (list): A list containing the names of the cameras used to capture the original input data. bodyparts_list (list): A list containing the names of the body parts. fps (int): The frames per second of the input data. """ components = ["kinematics"] algorithm = "velocity_body" def __init__(self, config, io, data): """ Setup input/output folders and data for the Kinematics feature detector. This method initializes the Kinematics class by setting up the necessary configurations, input/output handler, and data. It also extracts the body_joints component and algorithm from the configuration and prepares the list of body parts. It supports handling of multiple cameras by loading the camera names from the pose configuration. Args: config (dict): The configuration settings for the feature detector. It should include 'input_detector_names' key which contains joints component and algorithm. io (object): The input/output handler object. It should have a method 'get_detector_output_folder' which returns the output folder for the joints detector. data (object): The data object containing the input data. It should have an attribute 'fps' which represents the frames per second of the input data. """ super().__init__(config, io, data, requires_out_folder=False) joints_component, joints_algorithm = [ name for name in config["input_detector_names"] if any(["joints" in s for s in name]) ][0] pose_config_folder = io.get_detector_output_folder(joints_component, joints_algorithm, "run_config") pose_config = load_config(os.path.join(pose_config_folder, "run_config.toml")) predictions_mapping_config = load_validated_config_raw( "./configs/predictions_mapping.toml", PredictionsMappingConfig ) self.predictions_mapping = predictions_mapping_config["human_pose"][pose_config["keypoint_mapping"]] self.camera_names = pose_config["camera_names"] self.bodyparts_list = list(self.predictions_mapping["bodypart_index"].keys()) self.fps = data.fps # # will be used during visualizations # viz_camera_name = config['viz_camera_name'].strip('<').strip('>') # self.frames_data = os.path.join(pose_config['input_data_folder'], # data.camera_mapping[viz_camera_name]) # self.frames_data_list = [os.path.join(self.frames_data, f) # for f in sorted(os.listdir(self.frames_data))] logging.info(f"Feature detector {self.components} {self.algorithm} initialized.")
[docs] def compute(self): """ Computes the kinematics component. This method calculates the Euclidean distance between adjacent frames for each keypoint. It handles both 2D and 3D data. The method starts by loading the joint data from the input files. It then computes the differences between adjacent frames for each keypoint. A zero-filled frame is added at the beginning to match the original number of frames. Finally, the Euclidean distance for each keypoint between adjacent frames is computed. The computed differences are stored in a dictionary and saved to a compressed .npz file with the following structure: - displacement_vector_body_2d: A numpy array containing the computed differences for 2D data. - velocity_body_2d: A numpy array containing the computed velocity for 2D data. - displacement_vector_body_3d: A numpy array containing the computed differences for 3D data. - velocity_body_3d: A numpy array containing the computed velocity for 3D data. - data_description: A dictionary containing the data description for all of the above output numpy arrays. See the documentation of the output for more details. Returns: out_dict (dict): A dictionary containing the above mentioned parts of the kinematics component. """ joint_data = np.load(self.input_files[0], allow_pickle=True) dimensions = ["2d"] if "3d" in joint_data["data_description"].item(): dimensions.append("3d") out_dict = {"data_description": {}} for dim in dimensions: dim_data = "2d_filtered" if dim == "2d" else dim data = joint_data[dim_data] data_description = joint_data["data_description"].item()[dim] # data.shape = (#persons, #cameras, #frames, #joints/keypoints, 3) if dim == "2d": # check if it is got from single camera) # if data is 2d get only 2 values from last cell keypoints = data[..., :2] conf_score = data[..., 2, np.newaxis] elif dim == "3d": keypoints = data[..., :3] conf_score = data[..., 3, np.newaxis] # Compute the differences for each keypoint between adjacent frames # We first allocate tensor with difference for each frame differences = np.full_like(keypoints, np.nan) # differences[t] gives the diff btw [t] and [t-1] # first frame keeps nan (can't calculate velocity if no frame existed before) differences[:, :, 1:] = keypoints[:, :, 1:] - keypoints[:, :, :-1] # calculate confidence score, saves the minimum confidence between consecutive frames # again, first frame keeps nan min_confidence = np.full_like(conf_score, np.nan) min_confidence[:, :, 1:] = np.minimum(conf_score[:, :, 1:], conf_score[:, :, :-1]) # Compute the Euclidean distance for each keypoint between adjacent frames motion_magnitude = np.linalg.norm(differences, axis=-1, keepdims=True) motion_velocity = motion_magnitude * self.fps # Add confidence score to results differences = np.concatenate([differences, min_confidence], axis=-1) motion_velocity = np.concatenate([motion_velocity, min_confidence], axis=-1) # Standardized # motion_magnitude_mean = np.nanmean(motion_magnitude, axis=0) # motion_magnitude_std = np.nanstd(motion_magnitude, axis=0) # standardized_magnitudes = (motion_magnitude - motion_magnitude_mean) / # motion_magnitude_std # save results del data_description["axis4"] out_dict.update( { f"displacement_vector_body_{dim}": differences, f"velocity_body_{dim}": motion_velocity, } ) out_dict["data_description"].update( { f"displacement_vector_body_{dim}": dict( **data_description, axis4=[ "coordinate_x", "coordinate_y", "coordinate_z", "confidence_score", ] if dim == "3d" else ["coordinate_x", "coordinate_y", "confidence_score"], ), f"velocity_body_{dim}": dict(**data_description, axis4=["velocity", "confidence_score"]), } ) save_file_path = os.path.join(self.result_folders["kinematics"], f"{self.algorithm}.npz") np.savez_compressed(save_file_path, **out_dict) logging.info(f"Computation of feature detector for {self.components} completed.") return out_dict
[docs] def visualization(self, out_dict): """ Creates visualizations for the computed kinematics component. This method generates visualizations for the computed kinematics component. It checks if the output dictionary contains the keys 'velocity_body_2d' and 'velocity_body_3d'. If these keys are present, their corresponding values are used to create the visualizations. The method calculates the sum of movement per body part using the post_compute method. It then determines the global minimum and maximum values to define the y-limits of the graphs. Finally, it calls the visualize_mean_of_motion_magnitude_by_bodypart function from the kinematics_utils module to generate the visualizations. Parameters: out_dict (dict): The output dictionary containing the computed kinematics component. It should contain the keys 'velocity_body_2d' and/or 'velocity_body_3d'. """ logging.info(f"Visualizing the feature detector output {self.components}.") data = {} if "velocity_body_2d" in out_dict: data["2d"] = out_dict["velocity_body_2d"] if "velocity_body_3d" in out_dict: data["3d"] = out_dict["velocity_body_3d"] for dim, velocity_body in data.items(): # Calculate sum of movement per bodypart motion_per_bodypart = self.post_compute(velocity_body) # Determine global_min and global_max - define y-lims of graphs # global_min = np.nanmin(motion_per_bodypart) - 0.05 # global_max = np.nanmax(motion_per_bodypart) + 0.05 camera_names = self.camera_names if dim == "2d" else ["3d"] kinematics_utils.visualize_mean_of_motion_magnitude_by_bodypart( motion_per_bodypart, self.bodyparts_list, self.viz_folder, self.subjects_descr, camera_names, ) # kinematics_utils.create_video_evolving_linegraphs( # self.frames_data_list, motion_per_bodypart, self.bodyparts_list, # global_min, global_max, self.viz_folder) logging.info(f"Visualization of feature detector {self.components} completed.")
[docs] def post_compute(self, motion_velocity): """ Calculates the sum of movement per body part. This method calculates the sum of movement per body part by averaging the motion velocity over the joint indices corresponding to each body part. It then concatenates the results for all body parts. The method also checks for any [0,0,0] prediction using the check_zeros function from the check module. Parameters: motion_velocity (numpy.ndarray): A 5D numpy array with shape (#persons, #cameras, #frames, #joints/keypoints, 1/velocity) representing the motion velocity. Returns: bodypart_motion (numpy.ndarray): A 4D numpy array with shape (#persons, #cameras, #frames, #bodyparts) representing the sum of movement per body part. """ bodypart_motion = [] for joint_indices in self.predictions_mapping["bodypart_index"].values(): bodypart_motion.append(np.nanmean(motion_velocity[:, :, :, joint_indices, :], axis=-2)) bodypart_motion = np.concatenate(bodypart_motion, axis=-1) # check for any [0,0,0] prediction check.check_zeros(bodypart_motion[:, :, 1:]) return bodypart_motion