pipelinetools module
- clabtoolkit.pipelinetools.get_ids2process(ids=None, in_dir=None)[source]
Get list of subject IDs to process from various input sources.
- Parameters:
ids (str, list of str, or None, optional) – Subject IDs specification. Can be:
- None: discover all subjects in in_dir (default)
- list: list of subject ID strings
- str: comma-separated IDs, a single ID, or the path to a text file
in_dir (str, optional) – Directory to scan for subjects. Only used when ids is None.
- Returns:
List of subject ID strings, with empty entries filtered out.
- Return type:
list of str
- Raises:
ValueError – If ids is not None/list/str, or if in_dir is invalid when ids is None.
FileNotFoundError – If specified file path in ids does not exist.
IOError – If file cannot be read due to permissions or other IO issues.
Examples
>>> # Discover subjects from directory
>>> get_ids2process(ids=None, in_dir='/data/subjects')
['sub-001', 'sub-002', 'sub-003']

>>> # From list
>>> get_ids2process(['sub-001', 'sub-002'])
['sub-001', 'sub-002']

>>> # From comma-separated string
>>> get_ids2process('sub-001, sub-002, sub-003')
['sub-001', 'sub-002', 'sub-003']

>>> # Single subject ID
>>> get_ids2process('sub-001')
['sub-001']

>>> # From text file
>>> get_ids2process('/path/to/subjects.txt')
['sub-001', 'sub-002', 'sub-003']
Notes
When scanning directories (ids=None), only directories starting with ‘sub-’ are considered valid subject directories.
Text files should contain one subject ID per line. Empty lines and whitespace are automatically filtered out.
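The input handling described above can be illustrated with a minimal, self-contained sketch. It is not the library's implementation: resolve_subject_ids is a hypothetical name, and the sketch assumes that a string is treated as a text file only when it points to an existing file, which may differ from how get_ids2process decides when to raise FileNotFoundError.

```python
import os
import tempfile


def resolve_subject_ids(ids=None, in_dir=None):
    """Hypothetical stand-in that mirrors the documented input handling."""
    if ids is None:
        if in_dir is None or not os.path.isdir(in_dir):
            raise ValueError("in_dir must be an existing directory when ids is None")
        # Only directories whose names start with "sub-" count as subjects
        return [
            name
            for name in sorted(os.listdir(in_dir))
            if name.startswith("sub-") and os.path.isdir(os.path.join(in_dir, name))
        ]
    if isinstance(ids, list):
        return [s.strip() for s in ids if s and s.strip()]
    if isinstance(ids, str):
        if os.path.isfile(ids):  # assumption: existing file means "text file of IDs"
            with open(ids) as fh:
                return [line.strip() for line in fh if line.strip()]
        # Otherwise treat the string as comma-separated IDs or a single ID
        return [part.strip() for part in ids.split(",") if part.strip()]
    raise ValueError("ids must be None, a list, or a str")


with tempfile.TemporaryDirectory() as root:
    for name in ("sub-002", "sub-001", "logs"):
        os.mkdir(os.path.join(root, name))
    id_file = os.path.join(root, "subjects.txt")
    with open(id_file, "w") as fh:
        fh.write("sub-001\n\n  sub-002  \n")

    from_dir = resolve_subject_ids(in_dir=root)
    from_file = resolve_subject_ids(id_file)

from_string = resolve_subject_ids("sub-001, sub-002, sub-003")
print(from_dir, from_file, from_string)
```

Checking the type of ids first keeps the four input forms separate, so the empty-entry filtering can be applied in each branch.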
- clabtoolkit.pipelinetools.create_processing_status_table(deriv_dir, subj_ids, output_table=None, n_jobs=-1)[source]
Create a table with the processing status of the subjects in the BIDS derivatives directory. Uses parallel processing for improved performance, with a Rich progress display.
- Parameters:
deriv_dir (str) – Path to the derivatives directory.
subj_ids (list or str) – List of subject IDs or a text file containing the subject IDs.
output_table (str, optional) – Path to save the resulting table. If None, the table is not saved.
n_jobs (int, optional) – Number of parallel jobs to run. Default is -1 which uses all available cores.
- Returns:
pd.DataFrame – DataFrame containing the processing status of the subjects.
str – Path to the saved table if output_table is provided, otherwise None.
- Raises:
FileNotFoundError – If the derivatives directory or the subject IDs file does not exist.
ValueError – If no derivatives folders are found or if the subject IDs list is empty.
TypeError – If subj_ids is not a list or a string path to a file.
Examples
>>> deriv_dir = "/path/to/derivatives"
>>> subj_ids = ["sub-01", "sub-02"]
>>> output_table = "/path/to/output_table.csv"
>>> df, saved_path = create_processing_status_table(deriv_dir, subj_ids, output_table)
>>> print(df)
- clabtoolkit.pipelinetools.process_file(filepath)[source]
Parse BIDS entities from a single file path.
- Parameters:
filepath (str) – Full path to the file to be processed.
- Returns:
DataFrame with extracted entities if parsing is successful, otherwise None.
- Return type:
pd.DataFrame or None
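As an illustration of what parsing BIDS entities from a file path involves, here is a minimal sketch based only on the BIDS key-value naming convention. The name parse_bids_entities is hypothetical, and the sketch returns a plain dict rather than the DataFrame that process_file is documented to return; the library's actual parsing rules may differ.

```python
import os
import re


def parse_bids_entities(filepath):
    """Return a dict of entities found in a BIDS-style file name, or None."""
    name = os.path.basename(filepath)
    # Split stem and extension, treating double extensions such as .nii.gz as one
    match = re.match(r"^(?P<stem>[^.]+)(?P<ext>(\.[^.]+)*)$", name)
    if match is None or not match.group("stem").startswith("sub-"):
        return None
    entities = {}
    for part in match.group("stem").split("_"):
        if "-" in part:
            key, value = part.split("-", 1)
            entities[key] = value
        else:
            # A part without a dash is taken to be the suffix (e.g. "T1w")
            entities["suffix"] = part
    entities["extension"] = match.group("ext")
    return entities


example = parse_bids_entities("/data/sub-001/ses-01/anat/sub-001_ses-01_T1w.nii.gz")
print(example)
```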
- clabtoolkit.pipelinetools.process_freesurfer_subject(args)[source]
Process a single FreeSurfer subject.
- Parameters:
args (tuple) – Tuple containing (fs_id, pipe_dir) where:
- fs_id: FreeSurfer subject ID (e.g., ‘sub-001’)
- pipe_dir: Path to the pipeline derivatives directory to scan for this subject (e.g., ‘/path/to/derivatives/fsl-firstparc’)
- Returns:
DataFrame with file type counts for the subject if successful, otherwise None.
- Return type:
pd.DataFrame or None
- clabtoolkit.pipelinetools.scan_derivatives(pipe_dir, subj_ids=None, extensions=['.nii.gz', '.nii', '.mgz', '.stats', '.annot', '.gii', '.gii.gz'])[source]
Recursively collect all matching files under the derivatives folder.
- Parameters:
pipe_dir (str) – Path to the pipeline derivatives directory to scan.
extensions (list, optional) – List of file extensions to include in the scan. Default is [".nii.gz", ".nii", ".mgz", ".stats", ".annot", ".gii", ".gii.gz"].
- Returns:
list – Sorted list of file paths that match the specified extensions and start with "sub-".
Notes
Only files that start with “sub-” and end with one of the specified extensions are included.
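The filtering rule in the note above can be sketched with the standard library alone. This is an illustrative stand-in, not the code behind scan_derivatives: scan_for_subject_files is a hypothetical name, and the sketch ignores the subj_ids argument.

```python
import os
import tempfile

DEFAULT_EXTENSIONS = (".nii.gz", ".nii", ".mgz", ".stats", ".annot", ".gii", ".gii.gz")


def scan_for_subject_files(root_dir, extensions=DEFAULT_EXTENSIONS):
    """Recursively collect files named sub-* that end with a listed extension."""
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root_dir):
        for fname in filenames:
            # str.endswith accepts a tuple of candidate suffixes
            if fname.startswith("sub-") and fname.endswith(tuple(extensions)):
                matches.append(os.path.join(dirpath, fname))
    return sorted(matches)


with tempfile.TemporaryDirectory() as root:
    anat = os.path.join(root, "sub-001", "anat")
    os.makedirs(anat)
    for fname in ("sub-001_T1w.nii.gz", "sub-001_T1w.json", "notes.nii"):
        open(os.path.join(anat, fname), "w").close()
    found = [os.path.relpath(p, root) for p in scan_for_subject_files(root)]

print(found)
```

In this toy tree only sub-001_T1w.nii.gz passes both tests: the JSON sidecar has an unlisted extension and notes.nii lacks the "sub-" prefix.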
- clabtoolkit.pipelinetools.build_inventory(deriv_dir, pipe_id, pipe_index, pipe_total, progress, subj_ids=None, extensions=['.nii.gz', '.nii', '.mgz', '.stats', '.annot', '.gii', '.gii.gz'], output_csv=None, n_workers=8)[source]
Build a file inventory for a single pipeline derivative folder.
- Parameters:
deriv_dir (str) – Root derivatives directory.
pipe_id (str) – Name of the pipeline sub-folder inside deriv_dir.
pipe_index (int) – 1-based index of this pipeline (used in the progress bar label).
pipe_total (int) – Total number of pipelines being processed (used in the progress bar label).
progress (Progress) – Active Rich Progress instance to attach progress bars to.
extensions (list, optional) – File extensions to include in the scan. Defaults to [“.nii.gz”, “.nii”, “.mgz”, “.stats”, “.annot”, “.gii”, “.gii.gz”].
output_csv (str or Path, optional) – If provided, save the inventory DataFrame to this CSV path.
n_workers (int, optional) – Number of parallel worker threads.
- Returns:
Inventory table for this pipeline.
- Return type:
pd.DataFrame
- clabtoolkit.pipelinetools.build_derivatives_inventory(deriv_dir, pipe_dirs=None, subj_ids=None, extensions=['.nii.gz', '.nii', '.mgz', '.stats', '.annot', '.gii', '.gii.gz'], output_csv=None, n_workers=8)[source]
Build a combined file inventory across all pipeline derivative folders.
- Parameters:
deriv_dir (str) – Root derivatives directory containing one sub-folder per pipeline.
pipe_dirs (list of str, optional) – Pipeline sub-folder names to process. If None, all sub-folders discovered by cltbids.get_derivatives_folders() are used.
extensions (list, optional) – File extensions to include in the scan. Defaults to [".nii.gz", ".nii", ".mgz", ".stats", ".annot", ".gii", ".gii.gz"].
output_csv (str or Path, optional) – If provided, save the combined inventory DataFrame to this CSV path.
n_workers (int, optional) – Number of parallel worker threads per pipeline. Defaults to 8.
- Returns:
Combined inventory table with an extra leading “Pipeline” column.
- Return type:
pd.DataFrame
- Raises:
ValueError – If no pipeline folders are found or provided.
- clabtoolkit.pipelinetools.get_processing_status_details_json(proc_status_df, subj_ids, deriv_dir, pipe_dirs=None, out_json=None, only_ids=False)[source]
Create a dictionary with the details of the processing status of the subjects in the BIDS derivatives directory. It reports the IDs of the subjects with an incomplete or mismatched number of files.
- Parameters:
proc_status_df (str or pd.DataFrame) – Path to the processing status table, or the DataFrame itself. This DataFrame can be obtained with the function “create_processing_status_table”.
subj_ids (list or str) – List of subject IDs or a text file containing the subject IDs.
deriv_dir (str) – Path to the derivatives directory.
pipe_dirs (list or str, optional) – List of processing pipelines to check. If None, all pipelines will be checked.
out_json (str, optional) – Path to save the output JSON file. If None, the JSON file will not be saved.
only_ids (bool, optional) – If True, only the IDs of the subjects with mismatches will be returned, without the file details.
- Returns:
dict – Dictionary containing the details of the processing status of the subjects.
str – Path to the saved JSON file if out_json is provided, otherwise None.
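One way to picture the "mismatched number of files" check is the sketch below. It assumes, purely for illustration, that the expected file count for a pipeline is the most common count across subjects; the documentation does not state how the library derives its reference, and find_mismatched_subjects is a hypothetical name.

```python
from collections import Counter


def find_mismatched_subjects(file_counts):
    """file_counts maps subject ID -> number of files found for one pipeline."""
    if not file_counts:
        return {}
    # Assumed reference: the most common file count across subjects
    reference, _ = Counter(file_counts.values()).most_common(1)[0]
    return {
        sid: {"found": n, "expected": reference}
        for sid, n in file_counts.items()
        if n != reference
    }


counts = {"sub-001": 12, "sub-002": 12, "sub-003": 9, "sub-004": 12}
mismatches = find_mismatched_subjects(counts)
print(mismatches)
```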
- clabtoolkit.pipelinetools.get_processing_status_details_sqlite3(proc_status_df, subj_ids, deriv_dir, pipe_dirs=None, out_json=None, db_path=None, only_ids=False)[source]
Create a dictionary with the details of the processing status of the subjects in the BIDS derivatives directory. It reports the IDs of the subjects with an incomplete or mismatched number of files.
- Parameters:
proc_status_df (str or pd.DataFrame) – Path to the processing status table, or the DataFrame itself. This DataFrame can be obtained with the function “create_processing_status_table”.
subj_ids (list or str) – List of subject IDs or a text file containing the subject IDs.
deriv_dir (str) – Path to the derivatives directory.
pipe_dirs (list or str, optional) – List of processing pipelines to check. If None, all pipelines will be checked.
out_json (str, optional) – Path to save the output JSON file. If None, the JSON file will not be saved.
db_path (str, optional) – Path to save the SQLite database file. If None, the database will not be created.
only_ids (bool, optional) – If True, only the IDs of the subjects with mismatches will be returned, without the file details.
- Returns:
dict – Dictionary containing the details of the processing status of the subjects.
str – Path to the saved JSON file if out_json is provided, otherwise None.
- clabtoolkit.pipelinetools.query_processing_status_db(db_path, query_type='subjects_with_mismatches', pipeline=None)[source]
Query the processing status database to extract useful information.
- Parameters:
db_path (str) – Path to the SQLite database file.
query_type (str, optional) – Type of query to run. Options:
- “subjects_with_mismatches”: Get all subjects with mismatches
- “pipelines_with_mismatches”: Get all pipelines with mismatches and count
- “missing_files_count”: Get number of missing files per subject
- “extra_files_count”: Get number of extra files per subject
pipeline (str, optional) – Name of the pipeline to filter by. Used only with certain query types.
- Returns:
Result of the query as a DataFrame.
- Return type:
pd.DataFrame
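The idea of mapping a query_type name to a fixed SQL statement can be sketched with the standard sqlite3 module. The table layout below (status with subject, pipeline, n_missing, n_extra columns) is a hypothetical schema invented for the example, since the real database layout is not documented here, and run_query returns plain rows rather than a DataFrame.

```python
import sqlite3

# Hypothetical schema: one row per (subject, pipeline) with file-count deltas
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE status (subject TEXT, pipeline TEXT, n_missing INTEGER, n_extra INTEGER)"
)
conn.executemany(
    "INSERT INTO status VALUES (?, ?, ?, ?)",
    [
        ("sub-001", "freesurfer", 0, 0),
        ("sub-002", "freesurfer", 3, 0),
        ("sub-002", "fmriprep", 0, 1),
        ("sub-003", "fmriprep", 0, 0),
    ],
)

# Each supported query type maps to one parameterized statement
QUERIES = {
    "subjects_with_mismatches": (
        "SELECT DISTINCT subject FROM status "
        "WHERE (n_missing > 0 OR n_extra > 0) "
        "AND (:pipeline IS NULL OR pipeline = :pipeline) "
        "ORDER BY subject"
    ),
    "missing_files_count": (
        "SELECT subject, SUM(n_missing) FROM status "
        "WHERE (:pipeline IS NULL OR pipeline = :pipeline) "
        "GROUP BY subject ORDER BY subject"
    ),
}


def run_query(connection, query_type, pipeline=None):
    """Run one of the predefined queries, optionally filtered by pipeline."""
    if query_type not in QUERIES:
        raise ValueError(f"Unknown query_type: {query_type}")
    return connection.execute(QUERIES[query_type], {"pipeline": pipeline}).fetchall()


mismatched = run_query(conn, "subjects_with_mismatches")
missing = run_query(conn, "missing_files_count", pipeline="freesurfer")
print(mismatched, missing)
```

Restricting callers to named query types, with the pipeline passed as a bound parameter, avoids assembling SQL from user-supplied strings.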
- clabtoolkit.pipelinetools.export_db_to_json(db_path, out_json)[source]
Export the processing status database to a JSON file in the same format as returned by get_processing_status_details.
The pipelinetools module provides workflow orchestration and batch processing capabilities for large-scale neuroimaging analysis pipelines.
Key Features
Subject ID management for batch processing
Parallel processing utilities with progress tracking
Pipeline workflow orchestration
Error handling and recovery mechanisms
Integration with BIDS datasets
Resource management and optimization
Main Functions
Batch Processing
get_ids2process(): Generate subject IDs for batch processing workflows
create_processing_status_table(): Create table to track processing status
get_processing_status_details_json(): Get processing status details in JSON format
get_processing_status_details_sqlite3(): Get processing status details from SQLite database
query_processing_status_db(): Query processing status database
export_db_to_json(): Export database contents to JSON format
Common Usage Examples
Batch subject processing:
from clabtoolkit.pipelinetools import get_ids2process
from concurrent.futures import ThreadPoolExecutor

# Discover all subject directories ("sub-*") in the BIDS dataset
subjects_to_process = get_ids2process(ids=None, in_dir="/path/to/bids/dataset")
print(f"Processing {len(subjects_to_process)} subjects")
Parallel processing pipeline:
def process_single_subject(subject_id):
    """Process a single subject through the analysis pipeline"""
    # Your processing logic here
    return f"Processed {subject_id}"

# Execute parallel processing
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(process_single_subject, subjects_to_process))
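With executor.map, an exception raised for one subject surfaces while iterating the results and hides the outcome of the remaining subjects. A possible variant that records failures per subject is sketched below; run_batch and the simulated failure are illustrative assumptions, not part of clabtoolkit.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_single_subject(subject_id):
    """Placeholder step that fails for one subject to exercise the error path."""
    if subject_id == "sub-002":
        raise RuntimeError("simulated failure")
    return f"Processed {subject_id}"


def run_batch(subject_ids, max_workers=4):
    """Process subjects in parallel, separating successes from failures."""
    completed, failed = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_single_subject, sid): sid for sid in subject_ids
        }
        for future in as_completed(futures):
            sid = futures[future]
            try:
                completed[sid] = future.result()
            except Exception as exc:
                # Keep going: one failed subject should not abort the batch
                failed[sid] = str(exc)
    return completed, failed


completed, failed = run_batch(["sub-001", "sub-002", "sub-003"])
print(sorted(completed), failed)
```

The failed dictionary can then be used to build the list of IDs for a rerun.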
Processing status tracking:
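from clabtoolkit.pipelinetools import (
    create_processing_status_table,
    query_processing_status_db,
)

# Create the processing status table; returns the DataFrame and the saved path
status_table, saved_path = create_processing_status_table(
    deriv_dir="/path/to/derivatives",
    subj_ids=subjects_to_process,
    output_table="/path/to/processing_status.csv",
)

# Query the processing status database for subjects with mismatches
status_results = query_processing_status_db(
    db_path="/path/to/processing_status.db",
    query_type="subjects_with_mismatches",
)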
Data export and management:
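from clabtoolkit.pipelinetools import (
    export_db_to_json,
    get_processing_status_details_json,
)

# Export processing status to JSON
export_db_to_json(
    db_path="/path/to/processing_status.db",
    out_json="/path/to/status_export.json",
)

# Get detailed processing status for selected subjects
details, json_path = get_processing_status_details_json(
    proc_status_df="/path/to/processing_status.csv",
    subj_ids=["sub-001", "sub-002"],
    deriv_dir="/path/to/derivatives",
    out_json="/path/to/status.json",
)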