pipelinetools module

clabtoolkit.pipelinetools.get_ids2process(ids=None, in_dir=None)[source]

Get list of subject IDs to process from various input sources.

Parameters:
  • ids (str, list of str, or None, optional) – Subject IDs specification. Can be:

      - None: discover all subjects in in_dir (default)
      - list: list of subject ID strings
      - str: comma-separated IDs, single ID, or path to text file

  • in_dir (str, optional) – Directory path to scan for subjects. Only used when ids is None.

Returns:

List of subject ID strings, with empty entries filtered out.

Return type:

list of str

Raises:
  • ValueError – If ids is not None/list/str, or if in_dir is invalid when ids is None.

  • FileNotFoundError – If specified file path in ids does not exist.

  • IOError – If file cannot be read due to permissions or other IO issues.

Examples

>>> # Discover subjects from directory
>>> get_ids2process(ids=None, in_dir='/data/subjects')
['sub-001', 'sub-002', 'sub-003']
>>> # From list
>>> get_ids2process(['sub-001', 'sub-002'])
['sub-001', 'sub-002']
>>> # From comma-separated string
>>> get_ids2process('sub-001, sub-002, sub-003')
['sub-001', 'sub-002', 'sub-003']
>>> # Single subject ID
>>> get_ids2process('sub-001')
['sub-001']
>>> # From text file
>>> get_ids2process('/path/to/subjects.txt')
['sub-001', 'sub-002', 'sub-003']

Notes

When scanning directories (ids=None), only directories starting with ‘sub-’ are considered valid subject directories.

Text files should contain one subject ID per line. Empty lines and whitespace are automatically filtered out.
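
The input handling described above can be sketched in plain Python. This is an illustrative re-implementation, not the library's source: the name resolve_subject_ids is hypothetical, and treating a string as a file only when that file exists is an assumption (the documented function raises FileNotFoundError for a missing file, and how it tells a path from an ID is not stated here).

```python
from pathlib import Path


def resolve_subject_ids(ids=None, in_dir=None):
    """Illustrative sketch of the documented input handling (hypothetical helper)."""
    if ids is None:
        # Discover 'sub-*' directories inside in_dir
        if in_dir is None or not Path(in_dir).is_dir():
            raise ValueError("in_dir must be an existing directory when ids is None")
        found = [p.name for p in Path(in_dir).iterdir()
                 if p.is_dir() and p.name.startswith("sub-")]
        return sorted(found)
    if isinstance(ids, list):
        candidates = ids
    elif isinstance(ids, str):
        path = Path(ids)
        if path.is_file():
            # Text file: one subject ID per line
            candidates = path.read_text().splitlines()
        else:
            # Assumption: anything else is a comma-separated string or a single ID
            candidates = ids.split(",")
    else:
        raise ValueError("ids must be None, a list, or a string")
    # Strip whitespace and drop empty entries
    return [c.strip() for c in candidates if c.strip()]
```

Stripping and filtering in one final step means all three input forms share the same cleanup, which is why empty lines in a text file and stray commas in a string behave identically.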

clabtoolkit.pipelinetools.create_processing_status_table(deriv_dir, subj_ids, output_table=None, n_jobs=-1)[source]

Creates a table with the processing status of the subjects in the BIDS derivatives directory. Uses parallel processing for improved performance, with rich progress visualization.

Parameters:
  • deriv_dir (str) – Path to the derivatives directory.

  • subj_ids (list or str) – List of subject IDs or a text file containing the subject IDs.

  • output_table (str, optional) – Path to save the resulting table. If None, the table is not saved.

  • n_jobs (int, optional) – Number of parallel jobs to run. Default is -1 which uses all available cores.

Returns:

  • pd.DataFrame – DataFrame containing the processing status of the subjects.

  • str – Path to the saved table if output_table is provided, otherwise None.

Raises:
  • FileNotFoundError – If the derivatives directory or the subject IDs file does not exist.

  • ValueError – If no derivatives folders are found or if the subject IDs list is empty.

  • TypeError – If subj_ids is not a list or a string path to a file.

Examples

>>> deriv_dir = "/path/to/derivatives"
>>> subj_ids = ["sub-01", "sub-02"]
>>> output_table = "/path/to/output_table.csv"
>>> df, saved_path = create_processing_status_table(deriv_dir, subj_ids, output_table)
>>> print(df)

clabtoolkit.pipelinetools.process_file(filepath)[source]

Parse BIDS entities from a single file path.

Parameters:

filepath (str) – Full path to the file to be processed.

Returns:

DataFrame with extracted entities if parsing is successful, otherwise None.

Return type:

pd.DataFrame or None
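
To make "BIDS entities" concrete, here is a minimal sketch of pulling key-value pairs out of a BIDS-style file name. It is not the library's parser: parse_bids_entities is a hypothetical name, the regular expression is a simplified assumption about entity syntax, and it returns a plain dict rather than the DataFrame documented above.

```python
import re
from pathlib import Path

# Assumed pattern: <key>-<value> pairs joined by underscores
_ENTITY = re.compile(r"^([a-zA-Z]+)-([a-zA-Z0-9]+)$")


def parse_bids_entities(filepath):
    """Return a dict of entities, suffix, and extension, or None if no entity is found."""
    name = Path(filepath).name
    # Split at the first dot so a double extension such as '.nii.gz' stays together
    stem, dot, ext = name.partition(".")
    entities = {}
    suffix = None
    for part in stem.split("_"):
        match = _ENTITY.match(part)
        if match:
            entities[match.group(1)] = match.group(2)
        else:
            suffix = part  # e.g. 'T1w' or 'bold'
    if not entities:
        return None
    entities["suffix"] = suffix
    entities["extension"] = dot + ext
    return entities


print(parse_bids_entities("/data/sub-001/ses-01/anat/sub-001_ses-01_run-1_T1w.nii.gz"))
```

Returning None for names without any entity mirrors the documented "otherwise None" behavior, so callers can filter out unparseable files.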

clabtoolkit.pipelinetools.process_freesurfer_subject(args)[source]

Process a single FreeSurfer subject.

Parameters:

args (tuple) – Tuple containing (fs_id, pipe_dir) where:

  - fs_id: FreeSurfer subject ID (e.g., ‘sub-001’)
  - pipe_dir: Path to the pipeline derivatives directory to scan for this subject (e.g., ‘/path/to/derivatives/fsl-firstparc’)

Returns:

DataFrame with file type counts for the subject if successful, otherwise None.

Return type:

pd.DataFrame or None

clabtoolkit.pipelinetools.scan_derivatives(pipe_dir, subj_ids=None, extensions=['.nii.gz', '.nii', '.mgz', '.stats', '.annot', '.gii', '.gii.gz'])[source]

Recursively collect all matching files under the derivatives folder.

Parameters:
  • pipe_dir (str) – Path to the pipeline derivatives directory to scan.

  • extensions (list, optional) – File extensions to include in the scan. Default is [“.nii.gz”, “.nii”, “.mgz”, “.stats”, “.annot”, “.gii”, “.gii.gz”].

Returns:

Sorted list of file paths that match the specified extensions and start with “sub-”.

Return type:

list

Notes

  • Only files that start with “sub-” and end with one of the specified extensions are included.
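
The filtering rule in the note can be sketched with the standard library. This is an illustration of the rule only, not the function's source; scan_for_subject_files is a hypothetical name and the shortened default extension tuple is an assumption for brevity.

```python
import os


def scan_for_subject_files(root_dir, extensions=(".nii.gz", ".nii", ".mgz")):
    """Return a sorted list of 'sub-*' files under root_dir with a matching extension."""
    suffixes = tuple(extensions)  # str.endswith accepts a tuple of suffixes
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for fname in filenames:
            # Both conditions must hold: subject prefix and an allowed extension
            if fname.startswith("sub-") and fname.endswith(suffixes):
                matches.append(os.path.join(dirpath, fname))
    return sorted(matches)
```

Matching on the file name rather than the full path keeps files such as brain.nii.gz out of the result even when they sit inside a sub-* directory.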

clabtoolkit.pipelinetools.build_inventory(deriv_dir, pipe_id, pipe_index, pipe_total, progress, subj_ids=None, extensions=['.nii.gz', '.nii', '.mgz', '.stats', '.annot', '.gii', '.gii.gz'], output_csv=None, n_workers=8)[source]

Build a file inventory for a single pipeline derivative folder.

Parameters:
  • deriv_dir (str) – Root derivatives directory.

  • pipe_id (str) – Name of the pipeline sub-folder inside deriv_dir.

  • pipe_index (int) – 1-based index of this pipeline (used in the progress bar label).

  • pipe_total (int) – Total number of pipelines being processed (used in the progress bar label).

  • progress (Progress) – Active Rich Progress instance to attach progress bars to.

  • extensions (list, optional) – File extensions to include in the scan. Defaults to [“.nii.gz”, “.nii”, “.mgz”, “.stats”, “.annot”, “.gii”, “.gii.gz”].

  • output_csv (str or Path, optional) – If provided, save the inventory DataFrame to this CSV path.

  • n_workers (int, optional) – Number of parallel worker threads. Defaults to 8.

Returns:

Inventory table for this pipeline.

Return type:

pd.DataFrame

clabtoolkit.pipelinetools.build_derivatives_inventory(deriv_dir, pipe_dirs=None, subj_ids=None, extensions=['.nii.gz', '.nii', '.mgz', '.stats', '.annot', '.gii', '.gii.gz'], output_csv=None, n_workers=8)[source]

Build a combined file inventory across all pipeline derivative folders.

Parameters:
  • deriv_dir (str) – Root derivatives directory containing one sub-folder per pipeline.

  • pipe_dirs (list of str, optional) – Pipeline sub-folder names to process. If None, all sub-folders discovered by cltbids.get_derivatives_folders() are used.

  • extensions (list, optional) – File extensions to include in the scan. Defaults to [“.nii.gz”, “.nii”, “.mgz”, “.stats”, “.annot”, “.gii”, “.gii.gz”].

  • output_csv (str or Path, optional) – If provided, save the combined inventory DataFrame to this CSV path.

  • n_workers (int, optional) – Number of parallel worker threads per pipeline. Defaults to 8.

Returns:

Combined inventory table with an extra leading “Pipeline” column.

Return type:

pd.DataFrame

Raises:

ValueError – If no pipeline folders are found or provided.
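
The combination step (per-pipeline worker threads, then one table with a leading “Pipeline” column) can be sketched as follows. Everything here is hypothetical: describe_file and combine_inventories are illustrative names, rows are plain dicts instead of a DataFrame, and the row contents are an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def describe_file(path):
    """Hypothetical per-file worker: returns one inventory row."""
    fname = path.rsplit("/", 1)[-1]
    return {"Subject": fname.split("_", 1)[0], "File": fname}


def combine_inventories(files_by_pipeline, n_workers=8):
    """Build one row list across pipelines, each row led by a 'Pipeline' key."""
    if not files_by_pipeline:
        raise ValueError("No pipeline folders were found or provided.")
    combined = []
    for pipe_id, paths in files_by_pipeline.items():
        # Worker threads process this pipeline's files; map() keeps input order
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            rows = list(pool.map(describe_file, paths))
        # Put 'Pipeline' first so it becomes the leading column
        combined.extend({"Pipeline": pipe_id, **row} for row in rows)
    return combined
```

Because dicts preserve insertion order, a list of such rows would yield “Pipeline” as the first column if it were later passed to a DataFrame constructor.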

clabtoolkit.pipelinetools.get_processing_status_details_json(proc_status_df, subj_ids, deriv_dir, pipe_dirs=None, out_json=None, only_ids=False)[source]

Creates a dictionary with the details of the processing status of the subjects in the BIDS derivatives directory. It reports the IDs of the subjects with an incomplete or mismatched number of files.

Parameters:
  • proc_status_df (str or pd.DataFrame) – Path to a saved processing status table, or the DataFrame itself. This DataFrame can be obtained with create_processing_status_table().

  • subj_ids (list or str) – List of subject IDs or a text file containing the subject IDs.

  • deriv_dir (str) – Path to the derivatives directory.

  • pipe_dirs (list or str, optional) – List of processing pipelines to check. If None, all pipelines will be checked.

  • out_json (str, optional) – Path to save the output JSON file. If None, the JSON file will not be saved.

  • only_ids (bool, optional) – If True, only the IDs of the subjects with mismatches will be returned, without the file details.

Returns:

  • dict – Dictionary containing the details of the processing status of the subjects.

  • str – Path to the saved JSON file if out_json is provided, otherwise None.
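
One way to picture "mismatched number of files" is to compare each subject's file count with a reference count per pipeline. The sketch below is an assumption-laden illustration, not the library's logic: find_count_mismatches is a hypothetical name, and using the most common count as the expected value is an assumption that the documentation does not confirm.

```python
from collections import Counter


def find_count_mismatches(file_counts):
    """Map each pipeline to the subjects whose file count differs from the reference.

    file_counts: {pipeline: {subject_id: number_of_files}}
    """
    details = {}
    for pipeline, counts in file_counts.items():
        if not counts:
            continue
        # Assumption: the most common count is the expected one
        reference = Counter(counts.values()).most_common(1)[0][0]
        mismatched = {
            subj: {"found": n, "expected": reference}
            for subj, n in counts.items()
            if n != reference
        }
        if mismatched:
            details[pipeline] = mismatched
    return details


counts = {
    "freesurfer": {"sub-01": 10, "sub-02": 10, "sub-03": 7},
    "fsl": {"sub-01": 4, "sub-02": 4},
}
print(find_count_mismatches(counts))
```

A nested dict of this shape serializes directly with json.dump, which matches the idea of optionally saving the details to a JSON file.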

clabtoolkit.pipelinetools.get_processing_status_details_sqlite3(proc_status_df, subj_ids, deriv_dir, pipe_dirs=None, out_json=None, db_path=None, only_ids=False)[source]

Creates a dictionary with the details of the processing status of the subjects in the BIDS derivatives directory, and can additionally store them in a SQLite database (see db_path). It reports the IDs of the subjects with an incomplete or mismatched number of files.

Parameters:
  • proc_status_df (str or pd.DataFrame) – Path to a saved processing status table, or the DataFrame itself. This DataFrame can be obtained with create_processing_status_table().

  • subj_ids (list or str) – List of subject IDs or a text file containing the subject IDs.

  • deriv_dir (str) – Path to the derivatives directory.

  • pipe_dirs (list or str, optional) – List of processing pipelines to check. If None, all pipelines will be checked.

  • out_json (str, optional) – Path to save the output JSON file. If None, the JSON file will not be saved.

  • db_path (str, optional) – Path to save the SQLite database file. If None, the database will not be created.

  • only_ids (bool, optional) – If True, only the IDs of the subjects with mismatches will be returned, without the file details.

Returns:

  • dict – Dictionary containing the details of the processing status of the subjects.

  • str – Path to the saved JSON file if out_json is provided, otherwise None.

clabtoolkit.pipelinetools.query_processing_status_db(db_path, query_type='subjects_with_mismatches', pipeline=None)[source]

Query the processing status database to extract useful information.

Parameters:
  • db_path (str) – Path to the SQLite database file.

  • query_type (str, optional) – Type of query to run. Options:

      - “subjects_with_mismatches”: Get all subjects with mismatches
      - “pipelines_with_mismatches”: Get all pipelines with mismatches and count
      - “missing_files_count”: Get number of missing files per subject
      - “extra_files_count”: Get number of extra files per subject

  • pipeline (str, optional) – Name of the pipeline to filter by. Used only with certain query types.

Returns:

Result of the query as a DataFrame.

Return type:

pd.DataFrame
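
The idea of named query types with an optional pipeline filter can be sketched with the standard sqlite3 module. The table layout below is purely hypothetical (the real database schema is not documented here), run_status_query is an illustrative name, and the sketch returns row tuples rather than a DataFrame.

```python
import sqlite3

# Hypothetical schema, for illustration only
SCHEMA = """
CREATE TABLE mismatches (
    subject TEXT, pipeline TEXT, missing_files INTEGER, extra_files INTEGER
)
"""

# Each named query leaves a slot for an optional WHERE clause
QUERIES = {
    "subjects_with_mismatches":
        "SELECT DISTINCT subject FROM mismatches {where} ORDER BY subject",
    "pipelines_with_mismatches":
        "SELECT pipeline, COUNT(DISTINCT subject) FROM mismatches {where} "
        "GROUP BY pipeline ORDER BY pipeline",
    "missing_files_count":
        "SELECT subject, SUM(missing_files) FROM mismatches {where} "
        "GROUP BY subject ORDER BY subject",
}


def run_status_query(conn, query_type="subjects_with_mismatches", pipeline=None):
    """Run one named query, optionally filtered by pipeline."""
    if query_type not in QUERIES:
        raise ValueError(f"Unknown query_type: {query_type}")
    # The pipeline value is bound as a parameter, never formatted into the SQL
    where, params = ("WHERE pipeline = ?", (pipeline,)) if pipeline else ("", ())
    return conn.execute(QUERIES[query_type].format(where=where), params).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
conn.executemany(
    "INSERT INTO mismatches VALUES (?, ?, ?, ?)",
    [("sub-001", "freesurfer", 2, 0), ("sub-002", "fsl", 0, 1), ("sub-001", "fsl", 1, 0)],
)
print(run_status_query(conn))
print(run_status_query(conn, "missing_files_count", pipeline="fsl"))
```

Restricting callers to a fixed set of query names, instead of accepting raw SQL, keeps the interface small and avoids injection through user-supplied strings.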

clabtoolkit.pipelinetools.export_db_to_json(db_path, out_json)[source]

Export the processing status database to a JSON file in the same format as returned by get_processing_status_details_json().

Parameters:
  • db_path (str) – Path to the SQLite database file.

  • out_json (str) – Path to save the output JSON file.

Returns:

Dictionary containing the details of the processing status of the subjects.

Return type:

dict

The pipelinetools module provides workflow orchestration and batch processing capabilities for large-scale neuroimaging analysis pipelines.

Key Features

  • Subject ID management for batch processing

  • Parallel processing utilities with progress tracking

  • Pipeline workflow orchestration

  • Error handling and recovery mechanisms

  • Integration with BIDS datasets

  • Resource management and optimization

Main Functions

Batch Processing

  • get_ids2process(): Get the subject IDs to process from a directory, list, string, or text file

  • create_processing_status_table(): Create table to track processing status

  • get_processing_status_details_json(): Get processing status details in JSON format

  • get_processing_status_details_sqlite3(): Get processing status details, optionally storing them in a SQLite database

  • query_processing_status_db(): Query processing status database

  • export_db_to_json(): Export database contents to JSON format

Common Usage Examples

Batch subject processing:

from clabtoolkit.pipelinetools import get_ids2process
from concurrent.futures import ThreadPoolExecutor

# Discover all 'sub-*' subject directories in a BIDS dataset
subjects_to_process = get_ids2process(ids=None, in_dir="/path/to/bids/dataset")

print(f"Processing {len(subjects_to_process)} subjects")

Parallel processing pipeline:

def process_single_subject(subject_id):
    """Process a single subject through the analysis pipeline"""
    # Your processing logic here
    return f"Processed {subject_id}"

# Execute parallel processing
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_single_subject, subjects_to_process))
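
With executor.map, the first exception raised by a worker surfaces when results are collected and the remaining results are lost. A minimal sketch of a more forgiving batch runner, using only concurrent.futures, is shown below; run_batch is a hypothetical helper and not part of clabtoolkit.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_batch(worker, subject_ids, max_workers=4):
    """Run worker on every subject and separate successes from failures."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which subject each future belongs to
        futures = {executor.submit(worker, sid): sid for sid in subject_ids}
        for future in as_completed(futures):
            sid = futures[future]
            try:
                results[sid] = future.result()
            except Exception as exc:  # record the failure and keep going
                failures[sid] = repr(exc)
    return results, failures
```

The failures dict can then be used to rerun only the subjects that did not complete.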

Processing status tracking:

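from clabtoolkit.pipelinetools import (
    create_processing_status_table,
    query_processing_status_db,
)

# Create the processing status table; returns the DataFrame and the saved path
status_table, saved_path = create_processing_status_table(
    deriv_dir="/path/to/derivatives",
    subj_ids=subjects_to_process,
    output_table="/path/to/processing_status.csv",
)

# Query the processing status database for subjects with mismatches
status_results = query_processing_status_db(
    db_path="/path/to/processing_status.db",
    query_type="subjects_with_mismatches",
)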

Data export and management:

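from clabtoolkit.pipelinetools import (
    export_db_to_json,
    get_processing_status_details_json,
)

# Export processing status to JSON
export_db_to_json(
    db_path="/path/to/processing_status.db",
    out_json="/path/to/status_export.json",
)

# Get detailed processing status for selected subjects
details, json_path = get_processing_status_details_json(
    proc_status_df="/path/to/processing_status.csv",
    subj_ids=["sub-001", "sub-002"],
    deriv_dir="/path/to/derivatives",
    out_json="/path/to/status.json",
)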