Source code for clabtoolkit.bidstools

import os
import shutil
import pandas as pd
import time
import queue
import threading
import numpy as np

from typing import Union, Dict, Set, Tuple, List, Optional
from pathlib import Path

import re
import json
from glob import glob

from rich.progress import (
    Progress,
    BarColumn,
    TimeRemainingColumn,
    TextColumn,
    MofNCompleteColumn,
    SpinnerColumn,
)
from rich.console import Console
from rich.panel import Panel
import concurrent.futures


# Importing the clabtoolkit modules
from . import misctools as cltmisc


####################################################################################################
####################################################################################################
############                                                                            ############
############                                                                            ############
############     Section 1: Methods dedicated to work with BIDs naming conventions      ############
############                                                                            ############
############                                                                            ############
####################################################################################################
####################################################################################################
[docs] def str2entity(string: str) -> dict: """ Converts a formatted string into a dictionary. Parameters ---------- string : str String to convert, with the format `key1-value1_key2-value2...suffix.extension`. Returns ------- dict Dictionary containing the entities extracted from the string. Examples -------- >>> str2entity("sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz") Returns: {'sub': '01', 'ses': 'M00', 'acq': '3T', 'dir': 'AP', 'run': '01', 'suffix': 'T1w', 'extension': 'nii.gz'} """ ent_dict = {} suffix, extension = "", "" # Split the string into entities based on underscores. ent_list = string.split("_") # Detect suffix and extension for ent in ent_list[:]: if "-" not in ent: # If entity does not contain a '-', it's a suffix or extension. if "." in ent: # Split suffix and extension parts suffix, extension = ent.split(".", 1) else: suffix = ent ent_list.remove(ent) # Process the remaining entities for ent in ent_list: key, value = ent.split("-", 1) # Split each entity on the first "-" ent_dict[key] = value # Add suffix and extension to the dictionary if they were found if suffix: ent_dict["suffix"] = suffix if extension: ent_dict["extension"] = extension return ent_dict
####################################################################################################
[docs] def entity2str(entity: dict) -> str: """ Converts an entity dictionary to a string representation. Parameters ---------- entity : dict Dictionary containing the entities. Returns ------- str String containing the entities in the format `key1-value1_key2-value2...suffix.extension`. Examples -------- >>> entity2str({'sub': '01', 'ses': 'M00', 'acq': '3T', 'dir': 'AP', 'run': '01', 'suffix': 'T1w', 'extension': 'nii.gz'}) Returns: "sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz" """ # Make a copy of the entity dictionary to avoid mutating the original. entity = entity.copy() # Extract optional 'suffix' and 'extension' fields if present. suffix = entity.pop("suffix", "") extension = entity.pop("extension", "") # Construct the main part of the string by joining key-value pairs with '_' ent_string = "_".join(f"{key}-{str(value)}" for key, value in entity.items()) # Append suffix if it exists if suffix: ent_string += "_" + suffix else: ent_string = ent_string.rstrip("_") # Remove trailing underscore if no suffix # Append extension if it exists if extension: ent_string += f".{extension}" return ent_string
####################################################################################################
[docs] def delete_entity( entity: Union[dict, str], ent2rem: Union[List[str], str, dict] ) -> Union[dict, str]: """ Removes specified keys from an entity dictionary or string representation. Parameters ---------- entity : dict or str Dictionary or string containing the entities. ent2rem : List[str], str or dict Entities to be removed from the entity dictionary or string. If ent2rem is a dictionary, only the combination key-value will be removed from the filenames. Returns ------- Union[dict, str] The updated entity as a dictionary or string (matching the input type). Examples -------- >>> delete_entity("sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz", "acq") Returns: "sub-01_ses-M00_dir-AP_run-01_T1w.nii.gz" """ # Determine if `entity` is a string and convert if necessary. is_string = isinstance(entity, str) rem_is_dict = False # Boolean variable in case you want to delete an entity with a certain value and the key2remis a dictionary if is_string: entity_out = str2entity(entity) elif isinstance(entity, dict): entity_out = entity.copy() else: raise ValueError("The entity must be a dictionary or a string.") # Ensure `key2rem` is a list for uniform processing. if isinstance(ent2rem, str): key2rem = [ent2rem] elif isinstance(ent2rem, list): key2rem = list(set(ent2rem)) # Convert to a set for unique keys elif isinstance(ent2rem, dict): rem_is_dict = True key2rem = list(ent2rem.keys()) else: raise ValueError( "The ent2rem parameter must be a string, list of strings, or dictionary." ) # Remove specified keys from the entity dictionary. for key in key2rem: if rem_is_dict: # If `key2rem` is a dictionary, check if the key exists and has the specified value. if key in entity_out and entity_out[key] in ent2rem[key]: entity_out.pop(key, None) else: entity_out.pop(key, None) # `pop` with default `None` avoids KeyErrors. # Convert back to string format if original input was a string. if is_string: return entity2str(entity_out) return entity_out
####################################################################################################
[docs] def replace_entity_value( entity: Union[dict, str], ent2replace: Union[dict, str], verbose: bool = False ) -> Union[dict, str]: """ Replaces values in an entity dictionary or string representation. Parameters ---------- entity : dict or str Dictionary or string containing the entities. ent2replace : dict or str Dictionary or string containing entities to replace with new values. verbose : bool, optional If True, prints warnings for non-existent or empty values. Returns ------- Union[dict, str] Updated entity as a dictionary or string (matching the input type). Examples -------- >>> replace_entity_value("sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz", {"acq": "7T"}) Returns: "sub-01_ses-M00_acq-7T_dir-AP_run-01_T1w.nii.gz" """ # Determine if `entity` is a string and convert if necessary. is_string = isinstance(entity, str) if is_string: entity_out = str2entity(entity) elif isinstance(entity, dict): entity_out = entity.copy() else: raise ValueError("The entity must be a dictionary or a string.") # Adding the possibility to enter a string value. It will convert it to a dictionary if isinstance(ent2replace, str): ent2replace = str2entity(ent2replace) # Remove any empty keys or values from `ent2replace`. ent2replace = {k: v for k, v in ent2replace.items() if v} # Replace values in `entity_out` based on `ent2replace`. for key, new_value in ent2replace.items(): if key in entity_out: if new_value: entity_out[key] = new_value elif verbose: print(f"Warning: Replacement value for '{key}' is empty.") elif verbose: print(f"Warning: Entity '{key}' not found in entity dictionary.") # Convert back to string format if original input was a string. if is_string: return entity2str(entity_out) return entity_out
####################################################################################################
[docs] def replace_entity_key( entity: Union[dict, str], keys2replace: Dict[str, str], verbose: bool = False ) -> Union[dict, str]: """ Replaces specified keys in an entity dictionary or string representation. Parameters ---------- entity : dict or str Dictionary containing the entities or a string that follows the BIDS naming specifications. keys2replace : dict Dictionary mapping old keys to new keys. verbose : bool, optional If True, prints warnings for keys in `keys2replace` that are not found in `entity`. Returns ------- Union[dict, str] Updated entity as a dictionary or string (matching the input type). Examples -------- >>> replace_entity_key("sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz", {"acq": "TESTrep1", "dir": "TESTrep2"}) Returns: "sub-01_ses-M00_TESTrep1-3T_TESTrep2-AP_run-01_T1w.nii.gz" """ # Convert `entity` to a dictionary if it's a string is_string = isinstance(entity, str) if is_string: entity = str2entity(entity) elif not isinstance(entity, dict): raise ValueError("The entity must be a dictionary or a string.") # Validate that `keys2replace` is a dictionary if not isinstance(keys2replace, dict): raise ValueError("The keys2replace parameter must be a dictionary.") # Filter out any empty keys or values from `keys2replace` keys2replace = {k: v for k, v in keys2replace.items() if k and v} # Replace key names in the entity entity_out = {} for key, value in entity.items(): # Use the new key if it exists in `keys2replace`, otherwise keep the original key new_key = keys2replace.get(key, key) entity_out[new_key] = value # Verbose output if the key to replace does not exist in the entity if verbose and key in keys2replace and key not in entity: print(f"Warning: Key '{key}' not found in the original dictionary.") # Convert back to string format if the original input was a string if is_string: return entity2str(entity_out) return entity_out
####################################################################################################
[docs] def insert_entity( entity: Union[dict, str], entity2add: Dict[str, str], prev_entity: str = None ) -> Union[dict, str]: """ Adds entities to an existing entity dictionary or string representation. Parameters ---------- entity : dict or str Dictionary containing the entities or a string that follows the BIDS naming specifications. entity2add : dict Dictionary containing the entities to add. IMPORTANT: If the `entity2add` contains keys that already exist in the `entity`, they will not be added. prev_entity : str, optional Key in `entity` after which to insert the new entities. Returns ------- Union[dict, str] Updated entity with the new entities added (matching the input type). Examples -------- >>> insert_entity("sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz", {"task": "rest"}) Returns: "sub-01_ses-M00_acq-3T_dir-AP_run-01_task-rest_T1w.nii.gz" >>> insert_entity("sub-01_ses-M00_acq-3T_dir-AP_run-01_T1w.nii.gz", {"task": "rest"}, prev_entity="ses") Returns: "sub-01_ses-M00_task-rest_acq-3T_dir-AP_run-01_T1w.nii.gz" """ # Determine if `entity` is a string and convert if necessary is_string = isinstance(entity, str) if is_string: entity = str2entity(entity) elif not isinstance(entity, dict): raise ValueError("The entity must be a dictionary or a string.") # Clean `entity2add` by removing any empty keys or values entity2add = {k: v for k, v in entity2add.items() if k and v} # Validate `prev_entity` if provided if prev_entity is not None and prev_entity not in entity: raise ValueError( f"Reference entity '{prev_entity}' is not in the entity dictionary." ) # Temporarily remove `suffix` and `extension` if they exist suffix = entity.pop("suffix", None) extension = entity.pop("extension", None) # Check if there are some entities already on the name and removing those items from # the dictionary if len(cltmisc.list_intercept(list(entity2add.keys()), list(entity.keys()))) > 0: # Removes the items from `entity2add` that already exist in `entity` entity2add = {k: v for k, v in entity2add.items() if k not in entity} # If `entity2add` is empty after filtering, return the original entity if not entity2add: if is_string: # Restore `suffix` and `extension` if they were removed if suffix: entity["suffix"] = suffix if extension: entity["extension"] = extension return entity2str(entity) # Restore `suffix` and `extension` if they were removed if suffix: entity["suffix"] = suffix if extension: entity["extension"] = extension return entity else: # Build `ent_out` by adding items from `entity`, and insert `entity2add` after `prev_entity` if specified ent_out = {} for key, value in entity.items(): ent_out[key] = value if key == prev_entity: ent_out.update( entity2add ) # Insert new entities immediately after `prev_entity` # If no `prev_entity` is specified or if `prev_entity` is "suffix", append `entity2add` at the end if prev_entity is None or prev_entity == "suffix": ent_out.update(entity2add) # Restore `suffix` and `extension` if they were removed if suffix: ent_out["suffix"] = suffix if extension: ent_out["extension"] = extension # Convert back to string format if the original input was a string if is_string: return entity2str(ent_out) return ent_out
####################################################################################################
[docs] def recursively_replace_entity_value( root_dir: str, dict2old: Union[dict, str], dict2new: Union[dict, str] ): """ This method replaces the values of certain entities in all the files and folders of a BIDs dataset. Parameters ---------- root_dir: str Root directory of the BIDs dataset dict2old: dict or str Dictionary containing the entities to replace and their old values dict2new: dict or str Dictionary containing the entities to replace and their new values """ # Detect if the BIDs directory exists if not os.path.isdir(root_dir): raise ValueError("The BIDs directory does not exist.") # Convert the strings to dictionaries if isinstance(dict2old, str): dict2old = str2entity(dict2old) if isinstance(dict2new, str): dict2new = str2entity(dict2new) # Leave in the dictionaries only the keys that are common dict2old = {k: dict2old[k] for k in dict2old if k in dict2new} dict2new = {k: dict2new[k] for k in dict2new if k in dict2old} # Order the dictionaries alphabetically by key dict2old = dict(sorted(dict2old.items())) dict2new = dict(sorted(dict2new.items())) # Creating the list of strings dict2old_list = [f"{key}-{value}" for key, value in dict2old.items()] dict2new_list = [f"{key}-{value}" for key, value in dict2new.items()] replacements = dict(zip(dict2old_list, dict2new_list)) # all_files = cltmisc.get_all_files( root_dir, or_filter=dict2old_list[0], and_filter=dict2old_list ) if not all_files: print( "No files found that match the specified entities. Please check the input parameters." ) return else: all_dirs = [] for file in all_files: file_path = os.path.dirname(file) file_name = os.path.basename(file) all_dirs.append(file_path) for i, subst_x in enumerate(dict2old_list): subst_y = dict2new_list[i] if subst_x in file_name: old_path = os.path.join(file_path, file_name) new_name = file_name.replace(subst_x, subst_y) new_path = os.path.join(file_path, new_name) os.rename(old_path, new_path) file_name = ( new_name # Update old_path to the new path after renaming ) all_dirs = set(all_dirs) # Remove duplicates from the directory list # Renaming the directories cltmisc.rename_folders(all_dirs, replacements)
####################################################################################################
[docs] def recursively_replace_entity_key(root_dir: str, replacements: dict): """ This method replaces the keys of certain entities in all the files and folders of a BIDs dataset. Parameters ---------- root_dir: str Root directory of the BIDs dataset replacements: dict Dictionary containing the entities to replace and their new keys. Example: {'acq': 'desc', 'run': 'runny'} Returns ------- None The method will rename the files and folders in the BIDs dataset. All the files or folders containing the old entities' names on their names will be renamed and the old entities will be replaced with the new entities. """ # Detect if the BIDs directory exists if not os.path.isdir(root_dir): raise ValueError("The BIDs directory does not exist.") old_keys = list(replacements.keys()) new_keys = list(replacements.values()) all_files = cltmisc.get_all_files( root_dir, or_filter=old_keys[0], and_filter=old_keys ) if not all_files: print( "No files found that match the specified entities. Please check the input parameters." ) return else: all_dirs = [] for file in all_files: file_path = os.path.dirname(file) file_name = os.path.basename(file) all_dirs.append(file_path) for i, subst_x in enumerate(old_keys): subst_y = new_keys[i] if subst_x in file_name: old_path = os.path.join(file_path, file_name) new_name = file_name.replace(subst_x, subst_y) new_path = os.path.join(file_path, new_name) os.rename(old_path, new_path) file_name = ( new_name # Update old_path to the new path after renaming ) all_dirs = set(all_dirs) # Remove duplicates from the directory list # Renaming the directories cltmisc.rename_folders(all_dirs, replacements)
####################################################################################################
[docs] def recursively_delete_entity(root_dir: str, key2rem: Union[List[str], str, dict]): """ This method deletes entities in all the files and folders of a BIDs dataset. Parameters ---------- root_dir: str Root directory of the BIDs dataset key2rem: list or str Key(s) of the entities that will be removed from the files and folders. Returns ------- None The method will rename the files and folders in the BIDs dataset, removing from file names and folder names the entities containing the specified keys. """ # Detect if the BIDs directory exists root_dir = os.path.abspath(root_dir) if not os.path.isdir(root_dir): raise ValueError("The BIDs directory does not exist.") # Convert the strings to lists if isinstance(key2rem, str): key2rem = [key2rem] if isinstance(key2rem, dict): tmp_keys = list(key2rem.keys()) all_files = cltmisc.get_all_files( root_dir, or_filter=tmp_keys[0], and_filter=tmp_keys ) else: all_files = cltmisc.get_all_files( root_dir, or_filter=key2rem[0], and_filter=key2rem ) if not all_files: print( "No files found that match the specified entities. Please check the input parameters." ) return else: all_dirs = [] for file in all_files: file_path = os.path.dirname(file) file_name = os.path.basename(file) all_dirs.append(file_path) new_entity = delete_entity(file_name, key2rem) old_path = os.path.join(file_path, file_name) new_path = os.path.join(file_path, new_entity) os.rename(old_path, new_path) all_dirs = set(all_dirs) # Remove duplicates from the directory list # Renaming the directories key2rem_dict = {key: "" for key in key2rem} # Create a dict with empty values cltmisc.rename_folders(all_dirs, key2rem_dict)
####################################################################################################
[docs] def recursively_insert_entity( root_dir: str, entity2add: Dict[str, str], prev_entity: str = None ) -> None: """ This method inserts entities in all the files and folders of a BIDs dataset. Parameters ---------- root_dir: str Root directory of the BIDs dataset entity2add: dict Dictionary containing the entities to add. Example: {'task': 'rest', 'run': '01'} prev_entity: str, optional Key in `entity` after which to insert the new entities. Otherwise it will be added at the end of the file name, just before the suffix. Returns ------- None The method will rename the files and folders in the BIDs dataset. All the files or folders containing the old entities' names on their names will be renamed and the old entities will be replaced with the new entities. """ # Detect if the BIDs directory exists root_dir = os.path.abspath(root_dir) if not os.path.isdir(root_dir): raise ValueError("The BIDs directory does not exist.") # Convert the strings to dictionaries if isinstance(entity2add, str): entity2add = str2entity(entity2add) # Order the dictionaries alphabetically by key entity2add = dict(sorted(entity2add.items())) # Creating the list of strings entity2add_list = [f"{key}-{value}" for key, value in entity2add.items()] if prev_entity is not None: all_files = cltmisc.get_all_files(root_dir, or_filter=prev_entity) else: all_files = cltmisc.get_all_files(root_dir, or_filter="sub-") if not all_files: print( "No files found that match the specified entities. Please check the input parameters." ) return else: all_dirs = [] for file in all_files: file_path = os.path.dirname(file) file_name = os.path.basename(file) all_dirs.append(file_path) new_entity = insert_entity(file_name, entity2add, prev_entity=prev_entity) old_path = os.path.join(file_path, file_name) new_path = os.path.join(file_path, new_entity) os.rename(old_path, new_path)
####################################################################################################
[docs] def get_all_entities(root_dir: str) -> Tuple[Dict[str, Set[str]], List[str]]: """ Returns a set of all unique entities found in the BIDS dataset. Parameters ---------- root_dir : str Root directory of the BIDS dataset. Returns ------- all_entities: Set[str] A set of unique entity names found in the dataset. all_suffixes: List[str] A list of unique suffixes found in the dataset. Raises ------ ValueError If the specified root directory does not exist. FileNotFoundError If the default configuration file is not found. ValueError If the default configuration JSON does not have the expected structure. Examples -------- >>> get_all_entities('/path/to/bids/dataset') {'sub', 'ses', 'task', 'run', ...} >>> get_all_entities('/path/to/bids/dataset') {'sub', 'ses', 'task', 'run', ...}, ['T1w', 'bold', ...] """ # Check if the root directory exists if not os.path.isdir(root_dir): raise ValueError(f"The specified root directory does not exist: {root_dir}") # Load the config bids.json file cwd = os.path.dirname(os.path.abspath(__file__)) try: config_data = load_bids_json() # Merge raw and derivatives entities if ( "bids_entities" in config_data and "raw_entities" in config_data["bids_entities"] and "derivatives_entities" in config_data["bids_entities"] ): entities = { **config_data["bids_entities"]["raw_entities"], **config_data["bids_entities"]["derivatives_entities"], } else: raise ValueError( "Default config JSON does not have the expected structure." ) except FileNotFoundError: raise FileNotFoundError( f"Default configuration file not found at: {os.path.join(cwd, 'config', 'bids.json')}" ) # Get all the leaf directories in the BIDS dataset bids_folders = cltmisc.get_leaf_directories(root_dir) # Get all the files in the folder # Leave only the files that have a correct BIDs suffix suffixes = config_data["bids_entities"]["raw_suffix"] extensions = config_data["bids_entities"]["extensions"] # Multiply the suffixes and extensions to get all the possible combinations suffixes = [f"_{s}{e}" for s in suffixes for e in extensions] bids_files = cltmisc.get_all_files(root_dir, or_filter=suffixes) # Leave only the directories that are BIDs folders bids_folders = cltmisc.filter_by_substring( bids_folders, config_data["bids_entities"]["bids_folders"] ) # Leave only the files that are BIDs files bids_files = cltmisc.filter_by_substring(bids_files, bids_folders) def process_file(file): filename = os.path.basename(file) ent_dict = str2entity(filename) suffix = None if "suffix" in ent_dict: suffix = ent_dict.pop("suffix") ent_dict.pop("extension", None) return list(ent_dict.keys()), suffix all_entities = [] all_suffixes = [] with concurrent.futures.ThreadPoolExecutor() as executor: results = executor.map(process_file, bids_files) for entity_keys, suffix in results: all_entities.extend(entity_keys) if suffix is not None: all_suffixes.append(suffix) # Unique entities all_entities = sorted(set(all_entities)) all_suffixes = sorted(set(all_suffixes)) selected_ent_dict = entities4table(selected_entities=all_entities) return selected_ent_dict, all_suffixes
####################################################################################################
[docs] def entities4table( entities_json: str = None, selected_entities: Union[str, Dict, List] = None ) -> Dict: """ Returns the BIDS entities that will be included in the morphometric table. This function loads BIDS entities from a JSON configuration file and filters them based on optional selected entities. Parameters ---------- entities_json : str, optional Path to the JSON file with entity definitions. If None, the method uses the default config JSON file. selected_entities : Union[str, Dict, List], optional Entities to select from the loaded entities. Can be: - A string with comma-separated entity names - A dictionary with entity names as keys - A list of entity names If None, all entities are included. Returns ------- Dict Dictionary of entity names and their values. Raises ------ ValueError If the provided JSON file path is invalid or the JSON format is incorrect. FileNotFoundError If the specified JSON file does not exist. Examples -------- >>> # Using default config file (returns all entities) >>> entities4table() {'sub': {'...'}, 'ses': {'...'}, ... 'scale': {'...'}} >>> # Using a custom JSON file >>> entities4table('path/to/custom/entities.json') {'sub': {'...'}, 'ses': {'...'}, ... 'scale': {'...'}} >>> # Selecting specific entities >>> entities4table(selected_entities='sub,ses,run') {'sub': {'...'}, 'ses': {'...'}, 'run': {'...'}} >>> # Using a dictionary to select entities >>> entities4table(selected_entities={'sub': None, 'ses': None}) {'sub': {'...'}, 'ses': {'...'}} >>> # Using a list to select entities >>> entities4table(selected_entities=['sub', 'ses']) {'sub': {'...'}, 'ses': {'...'}} """ import os import json from typing import Dict, Union, List # Load entities from JSON if entities_json is None: # Define path to default config JSON default_config_path = os.path.join( os.path.dirname(__file__), "config", "bids.json" ) try: config_data = load_bids_json(default_config_path) # Merge raw and derivatives entities if ( "bids_entities" in config_data and "raw_entities" in config_data["bids_entities"] and "derivatives_entities" in config_data["bids_entities"] ): ent_out_dict = { **config_data["bids_entities"]["raw_entities"], **config_data["bids_entities"]["derivatives_entities"], } else: raise ValueError( "Default config JSON does not have the expected structure." ) except FileNotFoundError: raise FileNotFoundError( f"Default configuration file not found at: {default_config_path}" ) elif isinstance(entities_json, str): # Load from provided JSON file path if not os.path.isfile(entities_json): raise FileNotFoundError(f"JSON file not found: {entities_json}") try: ent_out_dict = cltmisc.extract_string_values(entities_json) except json.JSONDecodeError: raise ValueError(f"Error parsing the JSON file: {entities_json}") else: raise TypeError("entities_json must be None or a string path to a JSON file.") # Filter entities based on selected_entities if selected_entities is not None: selected_entity_keys = [] # Handle string input (convert to list of keys) if isinstance(selected_entities, str): try: # Assume it's a comma-separated string if "," in selected_entities: selected_entity_keys = [ e.strip() for e in selected_entities.split(",") ] elif is_bids_filename(selected_entities): selected_entities = str2entity(selected_entities) selected_entity_keys = list(selected_entities.keys()) except (ImportError, AttributeError): raise ValueError( "Cannot parse selected_entities string. Provide a comma-separated list or a BIDs-like string (e.g. sub-XXX_ses-SSS_run-01 )." ) # Handle dictionary input elif isinstance(selected_entities, dict): selected_entity_keys = list(selected_entities.keys()) # Handle list input elif isinstance(selected_entities, list): selected_entity_keys = selected_entities # Filter the output dictionary to include only selected entities ent_out_dict = { k: v for k, v in ent_out_dict.items() if k in selected_entity_keys } return ent_out_dict
####################################################################################################
[docs] def entities_to_table( filepath: str, entities_to_extract: Optional[Union[str, List[str], Dict[str, str]]] = None, include_suffix: bool = False, ) -> pd.DataFrame: """ Creates a DataFrame with BIDS entities extracted from a filename. ... Parameters ---------- filepath : str Full path to the BIDS file from which to extract entities. entities_to_extract : str, list, dict, or None, default=None Specifies which entities to extract from the filename: - If str: A single entity name to extract - If list: Multiple entity names to extract - If dict: Keys are entity names, values are custom column names - If None: Returns a single column with the full filename include_suffix : bool, default=True If True, adds a 'Type' column containing the BIDS suffix (e.g., 'bold', 'T1w', 'dwi') extracted from the filename. Returns ------- pd.DataFrame DataFrame containing the extracted entities as columns. If include_suffix is True, a 'Type' column is appended at the end. If the file is not BIDS-compliant, returns an empty DataFrame. Examples -------- >>> df = entities_to_table( ... '/data/sub-01/ses-pre/sub-01_ses-pre_task-rest_bold.nii.gz', ... ['sub', 'ses'], ... include_suffix=True ... ) >>> print(df) Participant Session Type 0 01 pre bold """ # Type checking if filepath is None or not isinstance(filepath, str): raise TypeError("filepath must be a string") # Reading the mapping dictionary cwd = os.path.dirname(os.path.abspath(__file__)) default_config_path = os.path.join(cwd, "config", "bids.json") try: with open(default_config_path, "r") as f: config_data = json.load(f) if ( "bids_entities" in config_data and "raw_entities" in config_data["bids_entities"] and "derivatives_entities" in config_data["bids_entities"] ): ent_out_dict = { **config_data["bids_entities"]["raw_entities"], **config_data["bids_entities"]["derivatives_entities"], } else: raise ValueError( "Default config JSON does not have the expected structure." ) except FileNotFoundError: raise FileNotFoundError( f"Default configuration file not found at: {default_config_path}" ) except json.JSONDecodeError: raise ValueError( f"Error parsing the default configuration file: {default_config_path}" ) file_directory = os.path.dirname(filepath) filename = cltmisc.get_real_basename(filepath) filename = filename.split(".")[0] result_df = pd.DataFrame() if is_bids_filename(filename): entities_dict = str2entity(filename) if "extension" in entities_dict: entities_dict.pop("extension") # Extract suffix before removing it if "suffix" in entities_dict: if include_suffix: # Add an entity at the end called "Type" tmp_suffix = entities_dict["suffix"] entities_dict.pop("suffix") if entities_to_extract is not None: if isinstance(entities_to_extract, str): entities_to_extract = [entities_to_extract] if isinstance(entities_to_extract, list): entities_to_extract = {entity: "" for entity in entities_to_extract} else: entities_to_extract = {entity: "" for entity in entities_dict.keys()} if result_df.empty: result_df = pd.DataFrame([{}]) entity_keys = list(entities_to_extract.keys()) for entity in reversed(entity_keys): value = entities_dict.get(entity, "") if entity in ent_out_dict.keys(): var_name = ent_out_dict[entity] if entity == "atlas": if "chimera" in value: result_df.insert(0, "ChimeraCode", value.replace("chimera", "")) result_df.insert(0, "Atlas", "chimera") else: result_df.insert(0, "ChimeraCode", "") result_df.insert(0, "Atlas", value) elif entity == "desc": result_df.insert(0, "Description", value) if "grow" in value: result_df.insert(0, "GrowIntoWM", value.replace("grow", "")) else: result_df.insert(0, var_name, value) else: result_df.insert(0, entity.capitalize(), value) # Append Type column at the end if requested if "tmp_suffix" in locals() and include_suffix: result_df.insert(0, "Type", tmp_suffix) else: if result_df.empty: result_df = pd.DataFrame([{}]) if "Participant" not in result_df.columns: temp = os.path.splitext(filename)[0] result_df.insert(0, "Participant", temp) # No suffix available for non-BIDS files if include_suffix: result_df["Type"] = "" for col in result_df.columns: result_df[col] = result_df[col].astype(str) return result_df
#################################################################################################### #################################################################################################### ############ ############ ############ ############ ############ Section 2: Methods dedicated to work with BIDs file organization ############ ############ ############ ############ ############ #################################################################################################### ####################################################################################################
[docs] def get_subjects(bids_dir: str) -> list: """ Get a list of all subjects in the BIDs directory. Parameters ---------- bids_dir : str Path to the BIDs directory. Returns ------- list List of subject IDs. Usage example: >>> bids_dir = "/path/to/bids" >>> print(get_subjects(bids_dir)) ["sub-01", "sub-02", ...] """ subjects = [] for root, dirs, files in os.walk(bids_dir): for dir_name in dirs: if dir_name.startswith("sub-"): subjects.append(dir_name) return subjects
####################################################################################################
[docs] def copy_bids_folder( bids_dir: str, out_dir: str, subjects_to_copy: Union[list, str] = None, folders_to_copy: Union[list, str] = "all", deriv_dir: str = None, include_derivatives: Union[str, list] = None, ): """ This function copies the BIDs folder and its derivatives for given subjects to a new location. Parameters ---------- bids_dir : str Path to the BIDs directory. out_dir : str Path to the output directory where the copied BIDs folder will be saved. subjects_to_copy : list or str, optional List of subject IDs to copy. If None, all subjects will be copied. folders_to_copy : list or str, optional List of BIDs folders to copy. If "all", all folders will be copied. Default is "all". deriv_dir : str, optional Path to the derivatives directory. If None, it will be set to "derivatives" in the BIDs directory. include_derivatives : str or list, optional List of derivatives to include. If "all", all derivatives will be included. Default is None. If None, no derivatives will be copied. If "chimera", only the chimera derivatives will be copied. If "all", all derivatives will be copied. If a list, only the derivatives in the list will be copied. If a string, only the derivatives with the name in the string will be copied. Returns ------- None Copies the specified folders and subjects to the output directory. Usage example: >>> bids_dir = "/path/to/bids" >>> out_dir = "/path/to/output" >>> copy_bids_folder(bids_dir, out_dir, subjects_to_copy=["sub-01"], folders_to_copy=["anat"]) >>> copy_bids_folder(bids_dir, out_dir, subjects_to_copy=["sub-01"], include_derivatives=["chimera", "freesurfer"]) >>> copy_bids_folder(bids_dir, out_dir, subjects_to_copy=["sub-01"], deriv_dir="/path/to/derivatives") """ bids_dir = cltmisc.remove_trailing_separators(bids_dir) out_dir = cltmisc.remove_trailing_separators(out_dir) if not os.path.isdir(bids_dir): raise FileNotFoundError(f"The BIDs directory {bids_dir} does not exist.") if not os.path.isdir(out_dir): raise FileNotFoundError(f"The output directory {out_dir} does not exist.") if deriv_dir is not None: deriv_dir = cltmisc.remove_trailing_separators(deriv_dir) if not os.path.isdir(deriv_dir): raise FileNotFoundError( f"The derivatives directory {deriv_dir} does not exist." ) # Selecting the subjects that will be copied if isinstance(subjects_to_copy, str): subjects_to_copy = [subjects_to_copy] if subjects_to_copy is None: subjects_to_copy = get_subjects(bids_dir) else: # Check if the cad "sub-" is in the subjects_to_copy of not add it subjects_to_copy = [ sub if sub.startswith("sub-") else f"sub-{sub}" for sub in subjects_to_copy ] # Selecting the BIDs folders that will be copied if isinstance(folders_to_copy, str): folders_to_copy = [folders_to_copy] if "all" in folders_to_copy: folders_to_copy = ["all"] # Number of subjects to copy n_subj = len(subjects_to_copy) # Selecting the derivatives folder if include_derivatives is not None: copy_derivatives = True if deriv_dir is None: deriv_dir = os.path.join(bids_dir, "derivatives") if not os.path.isdir(deriv_dir): # Lunch a warning message if the derivatives folder does not exist print("WARNING: The derivatives folder does not exist.") print("WARNING: The derivatives folder will not be copied.") copy_derivatives = False else: # Check if the derivatives folder is empty if len(os.listdir(deriv_dir)) == 0: print("WARNING: The derivatives folder is empty.") print("WARNING: The derivatives folder will not be copied.") copy_derivatives = False # Selecting all the derivatives folders directories = os.listdir(deriv_dir) der_pipe_folders = [] for directory in directories: pipe_dir = os.path.join(deriv_dir, directory) if not directory.startswith(".") and os.path.isdir(pipe_dir): der_pipe_folders.append(pipe_dir) if isinstance(include_derivatives, str): include_derivatives = [include_derivatives] if "all" not in include_derivatives: include_derivatives = [ os.path.join(deriv_dir, i) for i in include_derivatives ] der_pipe_folders = cltmisc.list_intercept( der_pipe_folders, include_derivatives ) if len(der_pipe_folders) == 0: print( "WARNING: No derivatives folders were found with the specified names." ) copy_derivatives = False else: copy_derivatives = False der_pipe_folders = [] progress = Progress( TextColumn("[bold blue]{task.description}", justify="right"), BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.0f}%", TimeRemainingColumn(), MofNCompleteColumn(), ) with progress: task = progress.add_task( "[cyan]Copying BIDs folder and derivatives...", total=None ) progress.update(task, total=n_subj) progress.start_task(task) progress.update(task, completed=0) # Loop around all the subjects for i, full_id in enumerate(subjects_to_copy): # Loop along the IDs # Extract the subject id subj_entity = str2entity(full_id) # Remove the extension from the subj_entity if "extension" in subj_entity.keys(): del subj_entity["extension"] if "suffix" in subj_entity.keys(): suffix = subj_entity["suffix"] del subj_entity["suffix"] else: suffix = None # Generate a list with the format [key_value] file_filter = [f"{k}-{v}" for k, v in subj_entity.items()] fold_filter = [f"{k}-{v}" for k, v in subj_entity.items()] if suffix is not None: file_filter.append(f"_{suffix}") if subj_entity is None: print(f"WARNING: The subject ID {full_id} is not valid.") continue subj_id = subj_entity["sub"] subj_dir = os.path.join(bids_dir, f"sub-{subj_id}") all_subj_subfold = cltmisc.get_leaf_directories(subj_dir) # Upgrade the progress bar and include the subject id on the text progress.update( task, description=f"[red]Copying subject: {full_id}", completed=i + 1 ) # Detect the session id if it exists ses_id = None if "ses" in subj_entity.keys(): ses_id = subj_entity["ses"] full_ses_id = f"ses-{ses_id}" all_subj_subfold = cltmisc.filter_by_substring( input_list=all_subj_subfold, or_filter=full_ses_id ) ####### Copying the BIDs folder if "all" not in folders_to_copy: tmp_list = [os.path.basename(i) for i in all_subj_subfold] indexes = cltmisc.get_indexes_by_substring(tmp_list, folders_to_copy) all_subj_subfold = [all_subj_subfold[i] for i in indexes] raw_files_to_copy = [] # Loop along all the folders to copy for fc in all_subj_subfold: all_subj_raw_files = cltmisc.get_all_files(fc) raw_files_to_copy = raw_files_to_copy + all_subj_raw_files # Filtering the files to copy according to the full raw_files_to_copy = cltmisc.filter_by_substring( input_list=raw_files_to_copy, or_filter=f"sub-{subj_id}", and_filter=file_filter, ) for file in raw_files_to_copy: if os.path.isfile(file): # Copying the file try: dest_dir = file.replace(bids_dir, out_dir) os.makedirs(os.path.dirname(dest_dir), exist_ok=True) shutil.copy2(file, dest_dir) except: print(f"WARNING: The file {file} could not be copied.") continue # Copying the Derivatives folder if copy_derivatives: # Loop along all the derivatives folders for pipe_dir in der_pipe_folders: # Check if the derivatives folder exists if os.path.isdir(pipe_dir): # Copying the derivatives folder subj_deriv = glob(os.path.join(pipe_dir, f"sub-{subj_id}*")) if len(subj_deriv) > 0: # Detecting the files and folders to copy. # We are doing this separation because there are derivatives that contain # only the folders with the name of the subject and not the files (i.e. freesurfer) # Loop along all the folders to copy deriv_files_to_copy = [] deriv_folds_to_copy = [] for deriv in subj_deriv: # Detect the files and folders to copy tmp_fold_to_copy = cltmisc.filter_by_substring( input_list=deriv, or_filter=f"sub-{subj_id}", and_filter=fold_filter, ) if len(tmp_fold_to_copy) > 0: # Detect the files and folders to copy for tmp in tmp_fold_to_copy: if os.path.isdir(tmp): deriv_folds_to_copy.append(tmp) else: deriv_files_to_copy.append(tmp) else: all_subj_deriv_files = cltmisc.get_all_files(deriv) # Filtering the files to copy according to the full all_subj_deriv_files = cltmisc.filter_by_substring( input_list=all_subj_deriv_files, or_filter=f"sub-{subj_id}", and_filter=fold_filter, ) deriv_files_to_copy = ( deriv_files_to_copy + all_subj_deriv_files ) # Copying the folders if len(deriv_folds_to_copy) > 0: for fold in deriv_folds_to_copy: try: dest_dir = fold.replace( deriv_dir, os.path.join(out_dir, "derivatives"), ) os.makedirs(dest_dir, exist_ok=True) shutil.copytree( fold, dest_dir, dirs_exist_ok=True ) except: print( f"WARNING: The folder or file {fold} could not be copied." ) continue # Copying the files if len(deriv_files_to_copy) > 0: for file in deriv_files_to_copy: try: dest_dir = file.replace( deriv_dir, os.path.join(out_dir, "derivatives"), ) os.makedirs( os.path.dirname(dest_dir), exist_ok=True ) shutil.copy2(file, dest_dir) except: print( f"WARNING: The file {file} could not be copied." ) continue # Update the progress bar to 100% progress.update(task, completed=n_subj)
####################################################################################################
[docs] def get_bids_database_table( root_dir: str, output_table: Optional[str] = None ) -> pd.DataFrame: """ Generate a comprehensive summary table of all neuroimaging files in a BIDS dataset. This function scans a BIDS dataset directory structure and creates a detailed table containing all BIDS entities (subject, session, acquisition, etc.) and file counts. The output table provides an overview of the dataset composition, making it easy to identify data availability, missing files, and dataset structure. Parameters ---------- root_dir : str Path to the BIDS dataset root directory. This should be the top-level directory containing subject folders (sub-*) and optionally a dataset_description.json file. output_table : str, optional Path where the resulting CSV table should be saved. If None, the table is not saved to disk but still returned as a DataFrame. Default is None. Returns ------- pd.DataFrame DataFrame with columns for each detected BIDS entity (Subject, Session, Acquisition, etc.), plus 'suffix' (image type like T1w, FLAIR) and 'N' (number of files for each unique combination). Each row represents a unique combination of BIDS entities and their file count. Raises ------ FileNotFoundError If the specified root_dir does not exist. NotADirectoryError If root_dir exists but is not a directory. ValueError If no subjects are found in the BIDS dataset (no sub-* folders). Examples -------- Basic usage - analyze dataset and return summary table: >>> import pandas as pd >>> bids_table = get_bids_table('/path/to/bids/dataset') >>> print(f"Dataset contains {len(bids_table)} unique file combinations") >>> print(f"Total files: {bids_table['N'].sum()}") Save summary table to CSV file: >>> bids_table = get_bids_table( ... root_dir='/data/my_study', ... output_table='/data/my_study/bids_summary.csv' ... ) Analyze specific aspects of the dataset: >>> # Count files by image type >>> suffix_counts = bids_table.groupby('suffix')['N'].sum() >>> print("Files by image type:") >>> print(suffix_counts) >>> # Check data availability per subject >>> subject_counts = bids_table.groupby('Subject')['N'].sum() >>> print("Files per subject:") >>> print(subject_counts) >>> # Find subjects with specific image types >>> t1w_subjects = bids_table[bids_table['suffix'] == 'T1w']['Subject'].unique() >>> print(f"Subjects with T1w images: {len(t1w_subjects)}") Example output table structure: >>> print(bids_table.head()) Subject Session Acquisition suffix N 0 sub-01 ses-01 acq-mprage T1w 1 1 sub-01 ses-01 acq-space T2w 1 2 sub-01 ses-01 None FLAIR 1 3 sub-02 ses-01 acq-mprage T1w 1 4 sub-02 ses-02 acq-mprage T1w 1 Notes ----- - Only processes .nii.gz files (NIfTI compressed format) - Automatically detects all BIDS entities present in the dataset - Groups identical combinations and sums file counts - Results are sorted by Subject, Session, and suffix for readability - Progress is displayed using Rich progress bar during processing - Column names are converted to human-readable format (e.g., 'sub' -> 'Subject') See Also -------- clabtoolkit.bidstools.get_subjects : Get list of subjects in BIDS dataset clabtoolkit.bidstools.get_all_entities : Extract all BIDS entities from dataset clabtoolkit.bidstools.str2entity : Parse BIDS filename to extract entities """ # Check if the root directory exists if not os.path.exists(root_dir): raise FileNotFoundError(f"BIDS root directory not found: {root_dir}") if not os.path.isdir(root_dir): raise NotADirectoryError(f"Provided path is not a directory: {root_dir}") # Initialize console console = Console() # Get all the subjects in the BIDs dataset subj_ids = get_subjects(root_dir) if not subj_ids: raise ValueError(f"No subjects found in the BIDS dataset at {root_dir}") # Get all entities in the BIDs dataset all_dataset_entities, all_dataset_suffixes = get_all_entities(root_dir) console.print(f"[green]Detected entities: {list(all_dataset_entities.keys())}") n_subj = len(subj_ids) # Create column names column_names = list(all_dataset_entities.keys()) column_names.extend(["suffix", "N"]) # List to store all rows all_rows = [] # Create Progress with explicit configuration progress = Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(bar_width=None), "[progress.percentage]{task.percentage:>3.1f}%", "•", MofNCompleteColumn(), "•", TimeRemainingColumn(), console=console, auto_refresh=True, refresh_per_second=10, ) # Use the progress bar with progress: # Add the task task_id = progress.add_task( description="Starting BIDS processing...", total=n_subj ) # Process each subject for i, subj_id in enumerate(subj_ids): # Update task description at the start of processing progress.update( task_id, description=f"Processing subject {subj_id}...", completed=i ) subj_dir = os.path.join(root_dir, subj_id) all_files = cltmisc.get_all_files( subj_dir, recursive=True, or_filter=[".nii.gz"] ) files_processed = 0 for file_path in all_files: file_name = os.path.basename(file_path) # Parse the BIDS filename to get entities ent_dict = str2entity(file_name) # Create a row dictionary with all possible entity columns initialized to None row_dict = {col: None for col in column_names} # Fill in the entities that are present in this file for entity, value in ent_dict.items(): if entity in row_dict: row_dict[entity] = value # Extract suffix from filename row_dict["suffix"] = ent_dict["suffix"] # Set N to 1 for individual files (we'll aggregate later) row_dict["N"] = 1 all_rows.append(row_dict) files_processed += 1 # Update progress AFTER processing each subject (this is the key fix!) progress.update( task_id, description=f"Completed {subj_id} ({files_processed} files)", completed=i + 1, # i+1 because we just finished this subject ) # Force refresh to ensure display updates progress.refresh() # Small delay to see the progress update time.sleep(0.1) # Final update progress.update(task_id, description="Processing complete!", completed=n_subj) # Brief pause to show completion time.sleep(0.5) # Create DataFrame from all rows df_table = pd.DataFrame(all_rows) if df_table.empty: return df_table # Group by all columns except 'N' and sum the counts groupby_cols = [col for col in df_table.columns if col != "N"] df_grouped = ( df_table.groupby(groupby_cols, dropna=False).agg({"N": "sum"}).reset_index() ) # Sort the DataFrame for better readability if not df_grouped.empty: sort_cols = [] if "sub" in df_grouped.columns: sort_cols.append("sub") if "ses" in df_grouped.columns: sort_cols.append("ses") if "suffix" in df_grouped.columns: sort_cols.append("suffix") if sort_cols: df_grouped = df_grouped.sort_values(sort_cols).reset_index(drop=True) all_entities = entities4table() old_colnames = cltmisc.list_intercept( list(df_grouped.columns), list(all_entities.keys()) ) new_colnames = list(entities4table(selected_entities=old_colnames).values()) # Rename columns to match BIDS entities df_grouped.rename(columns=dict(zip(old_colnames, new_colnames)), inplace=True) # Save the table if output path is provided if output_table: df_grouped.to_csv(output_table, index=False) return df_grouped
####################################################################################################
[docs] def get_derivatives_folders( deriv_dir: str, ) -> list: """ Get a list of all derivatives folders in the specified directory. Parameters ---------- deriv_dir : str Path to the derivatives directory. Returns ------- list List of derivatives folder names. Raises ------ ValueError If the derivatives directory does not exist. TypeError If the derivatives directory is not a string. Usage example: >>> deriv_dir = "/path/to/derivatives" >>> print(get_derivatives_folders(deriv_dir)) """ # Check if the derivatives directory exists if not os.path.isdir(deriv_dir): raise ValueError("The derivatives directory does not exist.") # Get all directories in the derivatives directory directories = os.listdir(deriv_dir) # Filter out hidden directories and keep only valid directories der_pipe_folders = [] for directory in directories: pipe_dir = os.path.join(deriv_dir, directory) if not directory.startswith(".") and os.path.isdir(pipe_dir): der_pipe_folders.append(directory) # Remove the derivatives folders that do not include folders starting with "sub-" der_pipe_folders = [ i for i in der_pipe_folders if any(j.startswith("sub-") for j in os.listdir(os.path.join(deriv_dir, i))) ] return der_pipe_folders
####################################################################################################
[docs] def is_bids_filename(filename: str, extensive: bool = False) -> bool: """ Validates a BIDS filename structure, handling extensions and entity order. Parameters: ----------- filename : str The filename to validate, which may include extensions (e.g., .nii.gz). extensive : bool, default=False If True, performs additional checks against a BIDS configuration file to ensure that all entities in the filename are defined in the config. If False, only checks the basic structure of the filename. Returns: -------- bool True if the filename follows BIDS conventions, False otherwise. Examples: --------- >>> is_bids_filename("sub-01_ses-pre_task-rest_bold.nii.gz") True >>> is_bids_filename("sub-01_ses-pre_task-rest_bold") True >>> is_bids_filename("sub-01_ses-pre_task-rest.nii.gz") True >>> is_bids_filename("sub-01_ses-pre_task-rest") True >>> is_bids_filename("sub-01_ses-pre_task-rest_bold_extra.nii.gz") False >>> is_bids_filename("sub-01_ses-pre_task-rest_bold_extra") """ # Remove extension if present filename = cltmisc.get_real_basename(filename) base_filename = filename.split(".")[0] parts = base_filename.split("_") if len(parts) < 2: # Need at least one entity-label pair + a suffix return False entity_pattern = re.compile(r"^[a-zA-Z0-9]+-[a-zA-Z0-9]+$") suffix_pattern = re.compile(r"^[a-zA-Z0-9]+$") # All parts except the last must be valid entity-label pairs (e.g. sub-01, ses-M00) for part in parts[:-1]: if not entity_pattern.match(part): return False # The last part is the suffix (e.g. T1w, bold) — no dash allowed if not suffix_pattern.match(parts[-1]): return False # Check also that all the entities are on the json config file cwd = os.path.dirname(os.path.abspath(__file__)) default_config_path = os.path.join(cwd, "config", "bids.json") if extensive: try: with open(default_config_path, "r") as f: config_data = json.load(f) if ( "bids_entities" in config_data and "raw_entities" in config_data["bids_entities"] and "derivatives_entities" in config_data["bids_entities"] ): valid_entities = set( list(config_data["bids_entities"]["raw_entities"].keys()) + list(config_data["bids_entities"]["derivatives_entities"].keys()) ) valid_suffixes = set( config_data["bids_entities"].get("raw_suffix", []) + config_data["bids_entities"].get("derivatives_suffix", []) ) else: raise ValueError( "Default config JSON does not have the expected structure." ) except FileNotFoundError: raise FileNotFoundError( f"Default configuration file not found at: {default_config_path}" ) except json.JSONDecodeError: raise ValueError( f"Error parsing the default configuration file: {default_config_path}" ) # Validate each entity key against the known BIDS entities for part in parts[:-1]: entity_key = part.split("-")[0] if entity_key not in valid_entities: return False # Validate the suffix against the known BIDS suffixes if parts[-1] not in valid_suffixes: return False return True
####################################################################################################
[docs] def get_individual_files_and_folders( input_folder: str, cad4query: Union[str, list, dict] ): """ This function detects all the files or folders inside a folder and its subfolders containing the strings supplied by the variable cad4query. Parameters ---------- input_folder : str Path to the input folder. cad4query : str, list, or dict String or list of strings to filter the files and folders. If a dictionary is provided, it should contain key-value pairs where the key is the string before '-' and the value is the string after '-'. Returns ------- list List of files or folders that match the query. Raises ------ ValueError If the input folder does not exist. TypeError If the input folder is not a string. Examples -------- >>> input_folder = "/path/to/input/folder" >>> cad4query = "sub-01" >>> files = get_individual_files_and_folders(input_folder, cad4query) """ # Checking if the folder is not a string or if the folder does not exist if not isinstance(input_folder, str): raise TypeError("The input folder must be a string.") if not os.path.isdir(input_folder): raise ValueError(f"The input folder {input_folder} does not exist.") # Create a dictionary from cad4query where the key is the string before - and the value is the string after - if isinstance(cad4query, str): cad4query = [cad4query] if isinstance(cad4query, list): clean_id_dict = dict([i.split("-") for i in cad4query]) elif isinstance(cad4query, dict): clean_id_dict = cad4query.copy() # Detecting the all the files for the reference subject ind_der_dir = glob(os.path.join(input_folder, "sub-" + clean_id_dict["sub"] + "*")) # Filter if multiple directories found if len(ind_der_dir) > 1: ind_der_dir = cltmisc.filter_by_substring( ind_der_dir, or_filter=[clean_id_dict["sub"]], and_filter=cad4query, ) # Set count to 0 if no directory found if len(ind_der_dir) > 0: # Count files for this subject in this pipeline all_files = cltmisc.get_all_files(ind_der_dir[0]) all_files = cltmisc.filter_by_substring( all_files, or_filter=clean_id_dict["sub"], and_filter=cad4query ) else: all_files = [] return all_files
####################################################################################################
[docs] def generate_bids_tree( bids_root: str, max_depth: Optional[int] = None, show_hidden: bool = False, exclude_patterns: Optional[Set[str]] = None, save_to_file: Optional[str] = None, ) -> str: """ Generate an MS-DOS tree-style visualization of a BIDS folder structure. Parameters ---------- bids_root : str Path to the BIDS root directory. max_depth : int, optional Maximum depth to traverse. If None (default), traverses entire directory structure without depth limitation. show_hidden : bool, optional Whether to show hidden files and folders (starting with '.'). Default is False. exclude_patterns : set of str, optional Set of file/folder name patterns to exclude from the tree. If None, defaults to {'.git', '__pycache__', '.DS_Store', 'Thumbs.db'}. save_to_file : str, optional Path to save the tree output as a text file. If None, only returns the string without saving. Returns ------- str MS-DOS tree representation of the BIDS structure with proper tree symbols (├──, └──, │) and directory indicators (/). Raises ------ FileNotFoundError If the specified bids_root path does not exist. NotADirectoryError If the specified bids_root path is not a directory. PermissionError If there are insufficient permissions to read certain directories. Individual permission errors are handled gracefully and noted in output. OSError If there are file system related errors during tree generation or file saving operations. Notes ----- - Directories are displayed with a trailing '/' to distinguish from files - Items are sorted with directories first, then files, both alphabetically - Hidden files/folders (starting with '.') are excluded by default - Permission errors for individual subdirectories are handled gracefully - The tree uses standard MS-DOS tree symbols for proper visualization - When max_depth is None, the entire directory structure is traversed Examples -------- Basic usage with unlimited depth: >>> tree = generate_bids_tree('/path/to/bids/dataset') >>> print(tree) my-bids-dataset/ ├── dataset_description.json ├── participants.tsv ├── sub-01/ │ ├── anat/ │ │ └── sub-01_T1w.nii.gz │ └── func/ │ ├── sub-01_task-rest_bold.nii.gz │ └── sub-01_task-rest_events.tsv └── derivatives/ └── preprocessing/ └── sub-01/ Limited depth with file saving: >>> tree = generate_bids_tree('/path/to/bids/dataset', ... max_depth=2, ... save_to_file='bids_tree.txt') >>> print("Tree saved to bids_tree.txt") Include hidden files and custom exclusions: >>> tree = generate_bids_tree('/path/to/bids/dataset', ... show_hidden=True, ... exclude_patterns={'temp', 'backup'}) """ if exclude_patterns is None: exclude_patterns = {".git", "__pycache__", ".DS_Store", "Thumbs.db"} bids_path = Path(bids_root) if not bids_path.exists(): raise FileNotFoundError(f"The specified path does not exist: {bids_root}") if not bids_path.is_dir(): raise NotADirectoryError(f"The specified path is not a directory: {bids_root}") tree_lines = [f"{bids_path.name}/"] def _build_tree(current_path: Path, prefix: str = "", depth: int = 0) -> None: """Recursively build the tree structure.""" if max_depth is not None and depth >= max_depth: return try: # Get all items in current directory items = [] for item in current_path.iterdir(): # Skip hidden files if not requested if not show_hidden and item.name.startswith("."): continue # Skip excluded patterns if item.name in exclude_patterns: continue items.append(item) # Sort items: directories first, then files, both alphabetically items.sort(key=lambda x: (x.is_file(), x.name.lower())) for i, item in enumerate(items): is_last = i == len(items) - 1 # Choose the appropriate tree symbols if is_last: current_prefix = "└── " next_prefix = prefix + " " else: current_prefix = "├── " next_prefix = prefix + "│ " # Add item to tree if item.is_dir(): tree_lines.append(f"{prefix}{current_prefix}{item.name}/") # Recursively process subdirectory _build_tree(item, next_prefix, depth + 1) else: tree_lines.append(f"{prefix}{current_prefix}{item.name}") except PermissionError: tree_lines.append(f"{prefix}├── [Permission Denied]") except Exception as e: tree_lines.append(f"{prefix}├── [Error: {str(e)}]") # Build the tree starting from the root _build_tree(bids_path) # Join all tree lines tree_output = "\n".join(tree_lines) # Save to file if requested if save_to_file: try: with open(save_to_file, "w", encoding="utf-8") as f: f.write(tree_output) except OSError as e: raise OSError(f"Failed to save tree to file '{save_to_file}': {str(e)}") return tree_output
####################################################################################################
[docs] def generate_bids_tree_with_stats(bids_root: str, **kwargs) -> str: """ Generate a BIDS tree with additional statistics. Parameters ---------- bids_root : str Path to the BIDS root directory. **kwargs Additional keyword arguments passed to generate_bids_tree(). See generate_bids_tree() documentation for available parameters. Returns ------- str Tree representation with file and folder count statistics appended. Raises ------ FileNotFoundError If the specified bids_root path does not exist. NotADirectoryError If the specified bids_root path is not a directory. PermissionError If there are insufficient permissions to read directories. OSError If there are file system related errors. Notes ----- Statistics are calculated by recursively counting all files and directories in the BIDS structure, regardless of the max_depth parameter used for tree visualization. Examples -------- >>> tree_with_stats = generate_bids_tree_with_stats('/path/to/bids/dataset') >>> print(tree_with_stats) my-bids-dataset/ ├── dataset_description.json └── sub-01/ └── anat/ └── sub-01_T1w.nii.gz Statistics: ├── Directories: 2 └── Files: 2 """ tree = generate_bids_tree(bids_root, **kwargs) # Count files and directories bids_path = Path(bids_root) file_count = 0 dir_count = 0 try: for item in bids_path.rglob("*"): if item.is_file(): file_count += 1 elif item.is_dir(): dir_count += 1 except Exception: pass stats = f"\n\nStatistics:\n├── Directories: {dir_count}\n└── Files: {file_count}" return tree + stats
####################################################################################################
[docs] def validate_bids_structure(bids_root: str) -> List[str]: """ Basic validation of BIDS structure and return warnings. Parameters ---------- bids_root : str Path to the BIDS root directory. Returns ------- list of str List of validation warnings and notes about the BIDS structure. Empty list indicates no issues found. Raises ------ FileNotFoundError If the specified bids_root path does not exist. NotADirectoryError If the specified bids_root path is not a directory. Notes ----- This function performs basic BIDS validation including: - Checking for required files (dataset_description.json) - Verifying presence of subject directories (sub-*) - Noting presence of derivatives directory For comprehensive BIDS validation, consider using the official BIDS validator tool. Examples -------- >>> warnings = validate_bids_structure('/path/to/bids/dataset') >>> if warnings: ... for warning in warnings: ... print(f"⚠️ {warning}") >>> else: ... print("✅ Basic BIDS structure looks good!") """ warnings = [] bids_path = Path(bids_root) if not bids_path.exists(): raise FileNotFoundError(f"The specified path does not exist: {bids_root}") if not bids_path.is_dir(): raise NotADirectoryError(f"The specified path is not a directory: {bids_root}") # Check for required files required_files = ["dataset_description.json"] for req_file in required_files: if not (bids_path / req_file).exists(): warnings.append(f"Missing required file: {req_file}") # Check for common BIDS directories common_dirs = ["sub-*"] # Using glob pattern subject_dirs = list(bids_path.glob("sub-*")) if not subject_dirs: warnings.append("No subject directories found (sub-*)") # Check for derivatives directory derivatives_dir = bids_path / "derivatives" if derivatives_dir.exists(): warnings.append("Derivatives directory found") return warnings
#################################################################################################### #################################################################################################### ############ ############ ############ ############ ############ Section 3: Useful methods for BIDs file organization ############ ############ ############ ############ ############ #################################################################################################### #################################################################################################### # Loading the JSON file containing the BIDs configuration file. It includes the entities and suffixes # of the BIDs dataset. ####################################################################################################
[docs] def load_bids_json(bids_json: str = None): """ Load the JSON file containing the BIDs configuration file. Parameters ---------- bids_json : str JSON file containing the BIDs configuration. Returns -------- config_dict : dict Dictionary containing the default . """ # Get the absolute of this file if bids_json is None: cwd = os.path.dirname(os.path.abspath(__file__)) bids_json = os.path.join(cwd, "config", "bids.json") else: if not os.path.isfile(bids_json): raise ValueError( "Please, provide a valid JSON file containing the BIDs configuration dictionary." ) try: with open(bids_json) as f: config_dict = json.load(f) except FileNotFoundError: raise FileNotFoundError(f"Default configuration file not found at: {bids_json}") except json.JSONDecodeError: raise ValueError(f"Error parsing the default configuration file: {bids_json}") return config_dict