Source code for nicetoolbox.detectors.data

"""
Data module handling the data loading and processing of the give datasets.
"""

import glob
import logging
import os

import numpy as np

from ..utils import check_and_exception as exc
from ..utils import in_out as ut_in_out
from ..utils import system as oslab_sys
from ..utils import video as vid


[docs]class Data: """ A data class for loading and processing data. Attributes: name (str): The name of the data. data_folder (str): The folder path for the data. tmp_folder (str): The folder path for temporary files. code_folder (str): The folder path for code files. data_input_folder (str): The folder path for input data. session_ID (str): The session ID. sequence_ID (str): The sequence ID. video_length (int): The length of the video. video_start (int): The starting frame of the video. video_skip_frames (int or None): The number of frames to skip in the video. annotation_interval (float): The time interval for segments. subjects_descr (str): The description of the subjects. camera_mapping (dict): The mapping of camera names. data_formats (list): The list of data formats required. all_camera_names (list): The list of all camera names. segments_list (None or list): The list of data segments. frames_list (None or list): The list of frames. frame_indices_list (None or list): The list of frame indices. snippets_list (None or list): The list of snippets. calibration (dict): The camera calibration. fps (int): The frames per second of the input video files. """ name = "data" def __init__( self, config, io, data_formats, all_camera_names, all_dataset_names ) -> None: """ Initialize the Data class. Args: config (dict): The configuration dictionary. io (IO): The IO object for file and folder operations. data_formats (list): The list of data formats required. all_camera_names (list): The list of all camera names. Returns: None """ logging.info("Start DATA LOADING and processing.") # collect all required file/folder paths self.data_folder = io.get_data_folder() self.tmp_folder = io.get_output_folder("tmp") self.code_folder = config["code_folder"] self.data_input_folder = io.get_input_folder() # collect data details from config self.session_ID = config["session_ID"] self.sequence_ID = config["sequence_ID"] self.video_length = config["video_length"] self.video_start = config["video_start"] self.video_skip_frames = None self.annotation_interval = 2.0 self.subjects_descr = config["subjects_descr"] self.start_frame_index = config["start_frame_index"] self.camera_mapping = dict( (key, config[key]) for key in config if "cam_" in key ) # collect which data slices and formats are required self.data_formats = data_formats self.all_camera_names = all_camera_names self.segments_list = None self.frames_list = None self.frame_indices_list = None self.snippets_list = None # DATA INITIALIZATION self.data_initialization() # LOAD CALIBRATION self.calibration = self.load_calibration( io.get_calibration_file(), config["dataset_name"], all_dataset_names ) self.fps = self.get_fps(config["fps"]) logging.info("Data loading and processing finished.\n\n")
[docs] def get_inference_path(self, component_name, detector_name): """ Get the file path for the inference script of a given detector. Args: detector_name (str): The name of the detector. Returns: str: The file path for the inference script. Raises: FileNotFoundError: If the inference script file does not exist. """ filepath = os.path.join( self.code_folder, "nicetoolbox", "detectors", "method_detectors", component_name, f"{detector_name}_inference.py", ) try: exc.file_exists(filepath) except FileNotFoundError: logging.exception(f"Detector inference file {filepath} does not exist!") raise return filepath
[docs] def get_venv_path(self, detector_name, env_name): """ Get the file path of the virtual environment for the given detector and environment name. Args: detector_name (str): The name of the detector. env_name (str): The name of the environment. Returns: str: The file path of the virtual environment. Raises: FileNotFoundError: If the virtual environment does not exist. """ os_type = oslab_sys.detect_os_type() if os_type == "linux": filepath = os.path.join(self.code_folder, "envs", env_name, "bin/activate") elif os_type == "windows": filepath = os.path.join( self.code_folder, "envs", env_name, "Scripts", "activate" ) try: exc.file_exists(filepath) except FileNotFoundError: logging.exception( f"Virtual environment file {filepath} for detector = " f"'{detector_name}' does not exist!" ) raise return filepath
[docs] def get_fps(self, config_fps): """ Get the frames per second (fps) of the input video files. Args: config_fps (int): The desired fps specified in the configuration. Returns: int: The fps of the input video files. If the input formats are not 'mp4' or 'avi', the config_fps value is returned. """ input_formats = self.get_input_format(self.all_camera_names) if input_formats in ["mp4", "avi"]: example_input_folder = self.data_input_folder.replace( "<camera_name>", next(iter(self.all_camera_names)) ) video_files = sorted(glob.glob(os.path.join(example_input_folder, "*"))) fps = vid.get_fps(video_files[0]) if fps != config_fps: logging.warning( f"Detected fps = {fps} does not match fps given in the " f"config = {config_fps}!" ) return fps return config_fps
[docs] def load_calibration(self, calibration_file, dataset_name, all_dataset_names): """ Load camera calibration from a file for a specific dataset. Currently implemented for the datasets 'dyadic_communication' and 'mpi_inf_3dhp'. Args: calibration_file (str): The path to the calibration file. dataset_name (str): The name of the dataset. Returns: dict: A dictionary containing the loaded camera calibration. Raises: NotImplementedError: If loading camera calibration for the specified dataset is not implemented. """ try: exc.check_options(dataset_name, str, all_dataset_names) except (TypeError, ValueError): logging.exception( f"Loading camera calibration for dataset '{dataset_name}' is " f"not implemented." ) raise NotImplementedError from None calib_details = "__".join( [word for word in [self.session_ID, self.sequence_ID] if word] ) loaded_calib = np.load(calibration_file, allow_pickle=True)[ calib_details ].item() calib = dict( (key, value) for key, value in loaded_calib.items() if key in self.all_camera_names ) return calib
[docs] def get_input_format(self, camera_names): """ Get the input format for the given camera names. Args: camera_names (list): A list of camera names. Returns: str: The input format for the given camera names. Raises: ValueError: If multiple or no valid input format is found in the data input folder. """ example_input_folder = self.data_input_folder.replace( "<camera_name>", next(iter(camera_names)) ) input_formats = [ name in "_".join(sorted(os.listdir(example_input_folder))) for name in [".mp4", ".avi", ".png", ".jpg", ".jpeg"] ] if sum(input_formats) != 1: exc.error_log_and_raise( ValueError, "Reading input data", f"Multiple/no valid input format found in '{self.data_input_folder}'. " f"Found '{input_formats}', valid formats are ['mp4', 'avi'].", ) input_format = ["mp4", "avi", "png", "jpg", "jpeg"][input_formats.index(True)] return input_format
[docs] def get_inputs_list(self, input_format, data_format, camera_names): """ Returns a list of input file paths based on the specified input format, data format, and camera names. Args: input_format (str): The format of the input files. data_format (str): The format/type of the video data. One of snippets, segments, or frames. camera_names (list): A list of camera names. Returns: list: A list of input file paths. """ start = self.video_start end = self.video_start + self.video_length inputs_list = [] if data_format == "snippets": for camera_name in camera_names: file_name = f"{camera_name}_s{start}_e{end}.{input_format}" inputs_list.append( os.path.join(self.data_folder, camera_name, "snippets", file_name) ) elif data_format == "segments": video_files = sorted(glob.glob(os.path.join(self.data_input_folder, "*"))) step = int(self.annotation_interval * vid.get_fps(video_files[0])) file_names = [ f"s{s}_e{s + step}.{input_format}" for s in range(start, end, step) ] for camera_name in camera_names: inputs_list += [ os.path.join(self.data_folder, camera_name, "segments", n) for n in file_names ] elif data_format == "frames": skip = 1 if not self.video_skip_frames else self.video_skip_frames file_names = [f"{x:05d}.png" for x in range(start, end, skip)] for camera_name in camera_names: inputs_list += [ os.path.join(self.data_folder, camera_name, "frames", n) for n in file_names ] return inputs_list
[docs] def data_initialization(self): """ Initializes the data required for running NICE toolbox. This method performs the following steps: 1. Determines the input format based on the available camera names. 2. Creates a list of all input files required to run NICE toolbox. 3. Checks whether all required data files exist. 4. Initializes data lists for frames, segments, and snippets. 5. If the data exists, extracts frame indices and organizes frames by camera name. 6. If the data does not exist, creates the required data from video or frames. 7. Logs the completion of data creation. """ # find data input format input_format = self.get_input_format(self.all_camera_names) # create a list of all input files required to run the nice toolbox # given the current run_config.toml data_list = [] for data_format in self.data_formats: data_list += self.get_inputs_list( input_format, data_format, self.all_camera_names ) # check whether all required data exists already data_exists = True for file in data_list: if not os.path.isfile(file): data_exists = False # initialize data lists self.frames_list = [] self.segments_list = [] self.snippets_list = [] if data_exists: logging.info(f"DATA FOUND in '{self.data_folder}'!") frames_list = [] frame_indices_list = set() for filename in data_list: if "frames" in filename: frames_list.append(filename) frame_indices_list.add(int(os.path.basename(filename)[:-4])) elif "segments" in filename: self.segments_list.append(filename) elif "snippets" in filename: self.snippets_list.append(filename) self.frame_indices_list = list(frame_indices_list) for camera_name in sorted(self.all_camera_names): cam_frames = sorted( [file for file in frames_list if camera_name in file] ) self.frames_list.append(cam_frames) self.frames_list = [ frame.tolist() for frame in np.array(self.frames_list).T ] else: logging.info( f"DATA NOT EXISTING OR INCOMPLETE! Creating data in " f"'{self.data_folder}'!" ) if input_format in ["avi", "mp4"]: self.create_inputs_from_video() elif input_format in ["png", "jpg", "jpeg"]: self.create_inputs_from_frames(input_format) logging.info("DATA creation completed.")
[docs] def create_inputs_from_video(self): """ Create inputs from video files. This method detects video input files, splits them into frames, and organizes the frames into different data formats which are frames, segments, and snippets. Raises: AssertionError: If the length of the frame indices list does not match the specified video length. AssertionError: If the frame indices of different cameras do not match. """ # detect all video input files video_files = sorted( glob.glob( os.path.join(self.data_input_folder.replace("<camera_name>", "*"), "*") ) ) for video_file in video_files: camera_name_indices = [ name.lower() in video_file.lower() for name in list(self.all_camera_names) ] if not any(camera_name_indices): continue camera_name = list(self.all_camera_names)[camera_name_indices.index(True)] # split video into frames data_folder = os.path.join(self.data_folder, camera_name) os.makedirs(data_folder, exist_ok=True) if "frames" in self.data_formats: os.makedirs(os.path.join(data_folder, "frames"), exist_ok=True) frames_list, frame_indices_list = vid.split_into_frames( video_file, os.path.join(data_folder, "frames/"), self.video_start, self.video_length, self.video_skip_frames, ) assert len(frame_indices_list) == self.video_length, ( f"ERROR. len(frame_indices_list) = " f"{len(frame_indices_list)} and self.video_length = " f"{self.video_length}" ) if self.frame_indices_list is None: self.frame_indices_list = frame_indices_list self.frames_list = [[f] for f in frames_list] else: for n, (i_old, i_new, f) in enumerate( zip(self.frame_indices_list, frame_indices_list, frames_list) ): assert ( i_old == i_new ), "Frame indices of different cameras do not match!" self.frames_list[n].append(f) if "segments" in self.data_formats: os.makedirs(os.path.join(data_folder, "segments"), exist_ok=True) # calculate frames per annotation interval frames_per_segment = int( self.annotation_interval * vid.get_fps(video_files[0]) ) # split video into segments of length annotation_interval self.segments_list = vid.equal_splits_by_frames( video_file, os.path.join(data_folder, "segments/"), frames_per_segment, keep_last_split=False, start_frame=self.video_start, number_of_frames=self.video_length, ) if "snippets" in self.data_formats: os.makedirs(os.path.join(data_folder, "snippets"), exist_ok=True) out_name = ( f"{camera_name}_s{self.video_start}_e" f"{self.video_start + self.video_length}" ) # cut video to the required number of frames out_video_file = vid.cut_length( video_file, os.path.join(data_folder, f"snippets/{out_name}"), start_frame=self.video_start, number_of_frames=self.video_length, ) # read result list self.snippets_list.append(out_video_file)
[docs] def create_inputs_from_frames(self, input_format): """ Processes frames and organizes them into specified data formats for further processing in the NICE pipeline. This method iterates through all camera names, and for each camera, it performs the following operations based on the given data formats: 1. Frames: For each frame in the specified range, it checks if the frame exists in the input directory. If it does, the method creates a symbolic link in the output directory under a 'frames' subdirectory. 2. Segments: (Not Implemented) This part is intended to split the video into segments of a specified length based on the annotation interval. 3. Snippets: (Not Implemented) This part is intended to cut the video into snippets based on the specified start and length. Args: input_format (str): The file format of the input frames (e.g., 'jpg', 'png'). Raises: NotImplementedError: If the filename convention inferred from the first frame's filename does not apply to any frame or if the 'segments' or 'snippets' data formats are specified, as these are not implemented. """ frames_list = [] frame_indices_list = set() for camera_name in self.all_camera_names: # define frames input folder frames_input_folder = self.data_input_folder.replace( "<camera_name>", camera_name ) input_frame_paths = sorted( glob.glob(os.path.join(frames_input_folder, f"*.{input_format}")) ) # guess for number of characters in filename base base_name = ".".join(os.path.basename(input_frame_paths[0]).split(".")[:-1]) chars = "".join([b for b in base_name if not b.isdigit()]) if base_name.isdigit(): filename_template = f"%0{len(base_name)}d.{input_format}" elif base_name[0].isdigit() and base_name[: -len(chars)].isdigit(): # in case the filename starts with a digit and all letters are in # the end filename_template = ( f"%0{len(base_name[:-len(chars)])}d{chars}.{input_format}" ) elif not base_name[0].isdigit() and base_name[len(chars) :].isdigit(): # in case the filename starts with a letter and all digits are in # the end filename_template = ( f"{chars}%0{len(base_name[len(chars):])}d.{input_format}" ) else: logging.error( f"Can not detect filename pattern from file basename {base_name}." ) # split video into frames data_folder = os.path.join(self.data_folder, camera_name) os.makedirs(data_folder, exist_ok=True) if "frames" in self.data_formats: os.makedirs(os.path.join(data_folder, "frames"), exist_ok=True) camera_frames_list = [] skip = ( self.video_skip_frames if self.video_skip_frames is not None else 1 ) for iteration, frame_idx in enumerate( range(self.video_start, self.video_start + self.video_length, skip) ): dataset_frame_idx = frame_idx + self.start_frame_index input_frame_indices = np.where( [ filename_template % dataset_frame_idx in path for path in input_frame_paths ] )[0] if len(input_frame_indices) != 1: exc.error_log_and_raise( NotImplementedError, "Create input data from frames. Detected dataset filename ", f"convention '{filename_template}', not applicable for " f"camera name '{camera_name}' and frame index " f"'{dataset_frame_idx}'.", ) input_frame_idx = input_frame_indices[0] in_framename = input_frame_paths[input_frame_idx] out_filename = os.path.join( data_folder, "frames", f"{frame_idx:05d}.png" ) if (iteration == 0) and (input_frame_idx != self.video_start): logging.warning( "Create input data from frames.", f"First input frame index '{input_frame_idx}' does not " f"match video start '{self.video_start}'. Dataset " f"filename '{in_framename}' is likely incorrect.", ) if not os.path.exists(out_filename): # create system link os.symlink(in_framename, out_filename) # update class attributes frame_list and frame_indices_list frame_indices_list.add(frame_idx) camera_frames_list.append(out_filename) frames_list.append(camera_frames_list) if "segments" in self.data_formats: raise NotImplementedError if "snippets" in self.data_formats: raise NotImplementedError self.frame_indices_list = list(frame_indices_list) self.frames_list = [list(pair) for pair in zip(*frames_list)]
def create_symlink_input_folder(self, data_format, camera_names): camera_names = [cam for cam in camera_names if cam != ""] # define folder structure and naming folder_name = ( f"{data_format}_{'_'.join(camera_names)}_" f"s{self.video_start}_" f"e{self.video_start + self.video_length}" ) if data_format == "frames": folder_name += f"_s{self.video_skip_frames}" if data_format == "segments": folder_name += f"_s{self.annotation_interval}" data_folder = os.path.join( self.data_folder, "symlink_input_folders", folder_name ) # get the list of needed input files input_format = self.get_input_format(camera_names) source_file_list = self.get_inputs_list(input_format, data_format, camera_names) # Check if the data folder & symlinks inside is already exists. if os.path.isdir(data_folder): logging.info( f"Data folder is found'{data_folder}' - Checking if symlinks are valid" ) existing_symlink_list = ut_in_out.list_files_under_root(data_folder) if len(source_file_list) != len(existing_symlink_list): logging.info( "Checking data folder - Number of files in the existing data " "folder does not match. New symlinks will be created" ) # delete already existing symlinks ut_in_out.delete_files_into_list(existing_symlink_list) # check if the first file into list is a valid file elif not os.path.isfile(existing_symlink_list[0]): logging.info( "Checking data folder - Symlink is not valid New symlinks will " "be created" ) # delete already existing symlinks ut_in_out.delete_files_into_list(existing_symlink_list) else: logging.info(f"Symlinks are found in {data_folder}") return data_folder # create all folders and subfolders os.makedirs(data_folder, exist_ok=True) for camera_name in camera_names: os.makedirs(os.path.join(data_folder, camera_name), exist_ok=True) # create symbolic links logging.info(f"Creating symlinks under {data_folder}") for source_file in source_file_list: if not os.path.exists(source_file): logging.warning(f"WARNING! data file '{source_file}' does not exist!") else: indices = [source_file.find(name) for name in camera_names] cam_name = source_file[max(indices) :] os_type = oslab_sys.detect_os_type() if os_type == "linux": cam_name = cam_name[: cam_name.find("/")] elif os_type == "windows": cam_name = cam_name[: cam_name.find("\\")] else: logging.error("Unknown os type in create_symlink_input_folder") try: source_file_abs_path = os.path.abspath(source_file) data_folder_abs_path = os.path.abspath(data_folder) target_abs_path = os.path.join( data_folder_abs_path, cam_name, os.path.basename(source_file) ) os.symlink(source_file_abs_path, target_abs_path) except OSError as e: logging.error(f"Error creating symlink: {e}") return data_folder