# Standard library imports
import json
import os
import queue
import re
import shutil
import threading
import time
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
from glob import glob
from pathlib import Path
from typing import Dict, List, Optional, Union
# Third-party imports
import numpy as np
import pandas as pd
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeRemainingColumn,
)
# Local imports
# - Importing the submodules of clabtoolkit that are used in this file. This is done to avoid circular imports and to keep the code organized.
from . import bidstools as cltbids
from . import freesurfertools as cltfree
from . import misctools as cltmisc
# Suppress specific deprecation warnings from ipykernel and ipywidgets that can occur in certain environments
warnings.filterwarnings(
"ignore",
message="Kernel._parent_header is deprecated",
category=DeprecationWarning,
)
warnings.filterwarnings("ignore", category=DeprecationWarning, module="ipywidgets")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="ipykernel")
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 1: Utility methods ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
def get_ids2process(
ids: Union[str, List[str], None] = None, in_dir: str = None
) -> List[str]:
"""
Get list of subject IDs to process from various input sources.
Parameters
----------
ids : str, list of str, or None, optional
Subject IDs specification. Can be:
- None: discover all subjects in `in_dir` (default)
- list: list of subject ID strings
- str: comma-separated IDs, single ID, or path to text file
in_dir : str, optional
Directory path to scan for subjects when `ids` is None.
Only used when `ids` is None.
Returns
-------
list of str
List of subject ID strings, with empty entries filtered out.
Raises
------
ValueError
If `ids` is not None/list/str, or if `in_dir` is invalid when `ids` is None.
FileNotFoundError
If specified file path in `ids` does not exist.
IOError
If file cannot be read due to permissions or other IO issues.
Examples
--------
>>> # Discover subjects from directory
>>> get_ids2process(ids=None, in_dir='/data/subjects')
['sub-001', 'sub-002', 'sub-003']
>>> # From list
>>> get_ids2process(['sub-001', 'sub-002'])
['sub-001', 'sub-002']
>>> # From comma-separated string
>>> get_ids2process('sub-001, sub-002, sub-003')
['sub-001', 'sub-002', 'sub-003']
>>> # Single subject ID
>>> get_ids2process('sub-001')
['sub-001']
>>> # From text file
>>> get_ids2process('/path/to/subjects.txt')
['sub-001', 'sub-002', 'sub-003']
Notes
-----
When scanning directories (ids=None), only directories starting with 'sub-'
are considered valid subject directories.
Text files should contain one subject ID per line. Empty lines and
whitespace are automatically filtered out.
"""
# Handle None case - discover from directory
if ids is None:
if not in_dir or not os.path.isdir(in_dir):
raise ValueError(f"Valid in_dir required when ids is None. Got: {in_dir}")
return [d for d in os.listdir(in_dir) if d.startswith("sub-")]
# Handle list case
if isinstance(ids, list):
return [str(id_).strip() for id_ in ids if str(id_).strip()]
# Handle string case
if isinstance(ids, str):
ids = ids.strip()
if not ids:
return []
# File path
if os.path.isfile(ids):
with open(ids, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip()]
# Comma-separated or single ID
return [id_.strip() for id_ in ids.split(",") if id_.strip()]
raise ValueError(f"ids must be None, list, or string, got {type(ids)}")
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 2: Methods for assessing the processing status of the pipelines ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
def create_processing_status_table(
deriv_dir: str,
subj_ids: Union[list, str],
output_table: str = None,
n_jobs: int = -1,
):
"""
This method creates a table with the processing status of the subjects in the BIDs derivatives directory.
Uses parallel processing for improved performance with rich progress visualization.
Parameters
----------
deriv_dir : str
Path to the derivatives directory.
subj_ids : list or str
List of subject IDs or a text file containing the subject IDs.
output_table : str, optional
Path to save the resulting table. If None, the table is not saved.
n_jobs : int, optional
Number of parallel jobs to run. Default is -1 which uses all available cores.
Returns
-------
pd.DataFrame
DataFrame containing the processing status of the subjects.
str
Path to the saved table if output_table is provided, otherwise None.
Raises
------
FileNotFoundError
If the derivatives directory or the subject IDs file does not exist.
ValueError
If no derivatives folders are found or if the subject IDs list is empty.
TypeError
If subj_ids is not a list or a string path to a file.
Examples
--------
>>> deriv_dir = "/path/to/derivatives"
>>> subj_ids = ["sub-01", "sub-02"]
>>> output_table = "/path/to/output_table.csv"
>>> df, saved_path = create_processing_status_table(deriv_dir, subj_ids, output_table)
>>> print(df)
"""
from joblib import Parallel, delayed
from . import morphometrytools as cltmorpho
# Initialize rich console
console = Console()
# Check if the derivatives directory exists
deriv_dir = cltmisc.remove_trailing_separators(deriv_dir)
if not os.path.isdir(deriv_dir):
raise FileNotFoundError(
f"The derivatives directory {deriv_dir} does not exist."
)
# Process subject IDs
if isinstance(subj_ids, str):
if not os.path.isfile(subj_ids):
raise FileNotFoundError(f"The file {subj_ids} does not exist.")
else:
with open(subj_ids, "r") as f:
subj_list = f.read().splitlines()
elif isinstance(subj_ids, list):
if len(subj_ids) == 0:
raise ValueError("The list of subject IDs is empty.")
else:
subj_list = subj_ids
else:
raise TypeError("subj_ids must be a list or a string path to a file")
# Number of Ids
n_subj = len(subj_list)
# Find all the derivatives folders
pipe_dirs = cltbids.get_derivatives_folders(deriv_dir)
if len(pipe_dirs) == 0:
raise ValueError(
"No derivatives folders were found in the specified directory."
)
# Create a message queue to communicate across threads
progress_queue = queue.Queue()
# Function to process a single subject
def process_subject(full_id):
try:
# Parse the subject ID
id_dict = cltbids.str2entity(full_id)
subject = id_dict["sub"]
# Get entity information for this subject
ent_list = cltbids.entities4table(selected_entities=full_id)
df_add = cltbids.entities_to_table(
filepath=full_id, entities_to_extract=ent_list
)
# Create a new DataFrame for this subject's processing status
proc_table = pd.DataFrame(
columns=pipe_dirs, index=[0]
) # Single row for this subject
# Remove suffix and extension from entities
clean_id_dict = id_dict.copy()
if "suffix" in clean_id_dict:
del clean_id_dict["suffix"]
if "extension" in clean_id_dict:
del clean_id_dict["extension"]
# Create list of entity key-value pairs
subj_ent = [f"{k}-{v}" for k, v in clean_id_dict.items()]
# Process each derivatives directory
for tmp_pipe_deriv in pipe_dirs:
# Find subject's directory in this pipeline
ind_der_dir = glob(
os.path.join(
deriv_dir, tmp_pipe_deriv, "sub-" + clean_id_dict["sub"] + "*"
)
)
# Filter if multiple directories found
if len(ind_der_dir) > 1:
ind_der_dir = cltmisc.filter_by_substring(
ind_der_dir,
or_filter=[clean_id_dict["sub"]],
and_filter=subj_ent,
)
# Set count to 0 if no directory found
if len(ind_der_dir) == 0:
proc_table.at[0, tmp_pipe_deriv] = 0
continue
# Count files for this subject in this pipeline
all_pip_files = cltmisc.get_all_files(ind_der_dir[0])
subj_pipe_files = cltmisc.filter_by_substring(
all_pip_files, or_filter=clean_id_dict["sub"], and_filter=subj_ent
)
n_files = len(subj_pipe_files)
# Store the count
proc_table.at[0, tmp_pipe_deriv] = n_files
# Combine the entity info with processing counts
subj_proc_table = cltmisc.expand_and_concatenate(df_add, proc_table)
# Signal completion through the queue
progress_queue.put((True, full_id))
return subj_proc_table
except Exception as e:
# Signal error through the queue
progress_queue.put((False, f"{full_id}: {str(e)}"))
raise e
# Use Rich for progress tracking
all_results = []
stop_monitor = threading.Event()
# Start a separate thread for the progress bar
def progress_monitor(progress_queue, total_subjects, progress_task, stop_event):
completed = 0
errors = 0
while (completed + errors < total_subjects) and not stop_event.is_set():
try:
success, message = progress_queue.get(timeout=0.1)
if success:
completed += 1
else:
errors += 1
console.print(f"[bold red]Error: {message}[/bold red]")
# Update progress bar
progress.update(
progress_task,
completed=completed,
description=f"[yellow]Processing subjects - {completed}/{total_subjects} completed",
)
# Important: mark the task as done in the queue
progress_queue.task_done()
except queue.Empty:
# No updates, just continue waiting
pass
# Ensure the progress bar shows 100% completion
if not stop_event.is_set():
progress.update(progress_task, completed=total_subjects)
with Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(bar_width=50),
TextColumn("[progress.percentage]{task.percentage:>3.1f}%"),
TimeRemainingColumn(),
MofNCompleteColumn(),
console=console,
refresh_per_second=10, # Increase refresh rate
) as progress:
# Add main task to track progress
main_task = progress.add_task("[yellow]Processing subjects", total=n_subj)
# Start progress monitor thread
monitor_thread = threading.Thread(
target=progress_monitor,
args=(progress_queue, n_subj, main_task, stop_monitor),
daemon=True,
)
monitor_thread.start()
try:
# Process subjects in parallel with joblib
results = Parallel(n_jobs=n_jobs, backend="threading", verbose=0)(
delayed(process_subject)(subject) for subject in subj_list
)
# Allow some time for the queue to process final updates
time.sleep(0.5)
# Directly set progress to 100% after all processing is done
progress.update(main_task, completed=n_subj, refresh=True)
# Wait for the progress queue to be empty
progress_queue.join()
finally:
# Signal the monitor thread to stop
stop_monitor.set()
# Make absolutely sure progress shows completion
progress.update(main_task, completed=n_subj, refresh=True)
# Combine all results
proc_status_df = pd.concat(results, ignore_index=True)
# Save table if requested
if output_table is not None:
output_dir = os.path.dirname(output_table)
if output_dir and not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
proc_status_df.to_csv(output_table, sep="\t", index=False)
return proc_status_df, output_table
######################################################################################################
[docs]
def process_file(filepath: str):
"""
Parse BIDS entities from a single file path.
Parameters
----------
filepath : str
Full path to the file to be processed.
Returns
-------
pd.DataFrame or None
DataFrame with extracted entities if parsing is successful, otherwise None.
"""
try:
table = cltbids.entities_to_table(filepath, include_suffix=True)
if table is None or table.empty:
return None, filepath
return table, None
except Exception:
return None, filepath
######################################################################################################
[docs]
def process_freesurfer_subject(args):
"""
Process a single FreeSurfer subject.
Parameters
----------
args : tuple
Tuple containing (fs_id, pipe_dir) where:
- fs_id: FreeSurfer subject ID (e.g., 'sub-001')
- pipe_dir: Path to the pipeline derivatives directory to scan for this subject (e.g., '/path/to/derivatives/fsl-firstparc')
Returns
-------
pd.DataFrame or None
DataFrame with file type counts for the subject if successful, otherwise None.
"""
fs_id, pipe_dir = args
try:
fs_subj = cltfree.FreeSurferSubject(fs_id, pipe_dir)
f_types = fs_subj.fs_files_count.keys()
f_counts = [fs_subj.fs_files_count[ft] for ft in f_types]
table = cltbids.entities_to_table(fs_id)
tmp_df = pd.DataFrame({"Type": f_types, "Count": f_counts})
tmp_df = cltmisc.expand_and_concatenate(table, tmp_df)
return tmp_df, None
except Exception:
return None, fs_id
########################################################################################################
[docs]
def scan_derivatives(
pipe_dir: str,
subj_ids: Union[str, list] = None,
extensions: list = [
".nii.gz",
".nii",
".mgz",
".stats",
".annot",
".gii",
".gii.gz",
],
) -> list:
"""
Recursively collect all matching files under the derivatives folder.
Parameters
----------
deriv_dir : str
Path to the derivatives directory to scan.
extensions : list, optional
Tuple of file extensions to include in the scan. Default is [".nii.gz", ".nii", ".mgz", ".stats", ".annot", ".gii", ".gii.gz"].
Returns
list
Sorted list of file paths that match the specified extensions and start with "sub-".
Notes
-----
- Only files that start with "sub-" and end with one of the specified extensions are included.
"""
# Detect all the directories in case no subject IDs are provided, just to check if there are any subject folders in the derivatives
if subj_ids is None:
subj_ids = get_ids2process(None, in_dir=pipe_dir)
# Clean list with the the subject IDs to look for in the file paths
if isinstance(subj_ids, str):
subj_ids = [subj_ids]
files = []
for subj_id in subj_ids:
ent_dict = cltbids.str2entity(
subj_id
) # Just to check if the IDs are valid BIDS entities
# Check if there is any folder in the derivatives that starts with sub-ent_dict["sub"]
matching_folders = glob(os.path.join(pipe_dir, f"sub-{ent_dict['sub']}*"))
if matching_folders:
for folder in matching_folders:
sub_files = cltmisc.get_all_files(
folder, or_filter=[subj_id], recursive=True
)
files.extend(sub_files)
files = [f for f in files if any(f.endswith(ext) for ext in extensions)]
return sorted(files)
###############################################################################################
def _run_parallel(
items: list,
submit_fn,
progress: Progress,
task_id,
n_workers: int,
) -> tuple[list, list]:
"""
Submit items to a ThreadPoolExecutor and advance the progress bar in the
main thread via as_completed, which is guaranteed to yield every future
exactly once — eliminating the callback timing issues that caused the bar
to stall before 100%.
Parameters
----------
items : list Items to process.
submit_fn : callable Worker function; must return (result, fail_info).
progress : Progress Active Rich Progress instance.
task_id : Task ID returned by progress.add_task().
n_workers : int Thread-pool size.
Returns
-------
results : list Non-None return values from submit_fn.
failed : list Items whose submit_fn returned None or raised.
"""
results, failed = [], []
with ThreadPoolExecutor(max_workers=n_workers) as executor:
futures = {executor.submit(submit_fn, item): item for item in items}
for f in as_completed(futures):
progress.advance(
task_id
) # main thread; always fires exactly once per future
try:
result, fail_info = f.result()
if result is not None:
results.append(result)
else:
failed.append(fail_info)
except Exception as e:
print(f"[ERROR] {futures[f]}: {e}")
failed.append(futures[f])
return results, failed
###################################################################################################
[docs]
def build_inventory(
deriv_dir: str,
pipe_id: str,
pipe_index: int,
pipe_total: int,
progress: Progress,
subj_ids: Union[str, list] = None,
extensions: list = [
".nii.gz",
".nii",
".mgz",
".stats",
".annot",
".gii",
".gii.gz",
],
output_csv: Union[str, Path] = None,
n_workers: int = 8,
) -> pd.DataFrame:
"""
Build a file inventory for a single pipeline derivative folder.
Parameters
----------
deriv_dir : str
Root derivatives directory.
pipe_id : str
Name of the pipeline sub-folder inside deriv_dir.
pipe_index : int
1-based index of this pipeline (used in the progress bar label).
pipe_total : int
Total number of pipelines being processed (used in the progress bar label).
progress : Progress
Active Rich Progress instance to attach progress bars to.
extensions : list, optional
File extensions to include in the scan. Defaults to [".nii.gz", ".nii", ".mgz", ".stats", ".annot", ".gii", ".gii.gz"].
output_csv : str or Path, optional
If provided, save the inventory DataFrame to this CSV path.
n_workers : int, optional
Number of parallel worker threads.
Returns
-------
pd.DataFrame
Inventory table for this pipeline.
"""
pipe_dir = os.path.join(deriv_dir, pipe_id)
bar_label = f"Pipeline: {pipe_id} [{pipe_index}/{pipe_total}]"
if not os.path.isdir(pipe_dir):
progress.print(f"[bold red]Error:[/bold red] '{pipe_dir}' does not exist.")
return pd.DataFrame()
# ── Decide upfront which processing path to take ──────────────────────
# Ensures exactly one progress task is created per pipeline.
if subj_ids is None:
subj_ids = get_ids2process(
None, in_dir=pipe_dir
) # Just to check if subject folders exist
files = scan_derivatives(pipe_dir, subj_ids, extensions)
if files:
# ── BIDS path ─────────────────────────────────────────────────────
task_id = progress.add_task(bar_label, total=len(files))
results, _ = _run_parallel(
items=files,
submit_fn=process_file,
progress=progress,
task_id=task_id,
n_workers=n_workers,
)
if results:
df = pd.concat(results, ignore_index=True)
df.drop_duplicates(inplace=True)
df["Count"] = 1
priority_cols = [
"count",
"sub",
"ses",
"run",
"acq",
"space",
"model",
"desc",
"res",
"suffix",
"extension",
"full_path",
]
ordered = [c for c in priority_cols if c in df.columns]
remaining = [c for c in df.columns if c not in ordered]
df = df[ordered + remaining]
df.sort_values(
by=[c for c in ["sub", "ses", "run"] if c in df.columns], inplace=True
)
else:
progress.print(
f" [yellow]No parseable BIDS files found in '{pipe_id}'.[/yellow]"
)
df = pd.DataFrame()
else:
# ── FreeSurfer fallback ───────────────────────────────────────────
progress.print(
f" [yellow]No matching files found — falling back to FreeSurfer subject scan for '{pipe_id}'…[/yellow]"
)
fs_ids = get_ids2process(None, in_dir=pipe_dir)
task_id = progress.add_task(bar_label, total=len(fs_ids))
fs_results, _ = _run_parallel(
items=[(fs_id, pipe_dir) for fs_id in fs_ids],
submit_fn=process_freesurfer_subject,
progress=progress,
task_id=task_id,
n_workers=n_workers,
)
df = pd.concat(fs_results, ignore_index=True)
df.drop_duplicates(inplace=True)
df.sort_values(
by=[c for c in ["sub", "ses", "Type"] if c in df.columns], inplace=True
)
# Remove the columns that are completely empty except the column name
df = df.apply(
lambda col: col.map(
lambda x: pd.NA if isinstance(x, str) and x.strip() == "" else x
)
)
df = df.dropna(axis=1, how="all")
# ── Save ──────────────────────────────────────────────────────────────
if output_csv is not None:
output_dir = os.path.dirname(output_csv)
if output_dir and not os.path.exists(output_dir):
progress.print(
f" [yellow]Warning: output directory '{output_dir}' does not exist — skipping save.[/yellow]"
)
else:
save_path = (
str(output_csv)
if str(output_csv).endswith(".csv")
else f"{output_csv}.csv"
)
df.to_csv(save_path, index=False)
progress.print(
f" Saved [bold]{len(df)}[/bold] rows → [green]{save_path}[/green]"
)
else:
progress.print(f" Returning [bold]{len(df)}[/bold] rows (no save).")
return df
####################################################################################################
[docs]
def build_derivatives_inventory(
deriv_dir: str,
pipe_dirs: Union[list, None] = None,
subj_ids: Union[str, list] = None,
extensions: list = [
".nii.gz",
".nii",
".mgz",
".stats",
".annot",
".gii",
".gii.gz",
],
output_csv: Union[str, Path] = None,
n_workers: int = 8,
) -> pd.DataFrame:
"""
Build a combined file inventory across all pipeline derivative folders.
Parameters
----------
deriv_dir : str
Root derivatives directory containing one sub-folder per pipeline.
pipe_dirs : list of str, optional
Pipeline sub-folder names to process. If None, all sub-folders
discovered by cltbids.get_derivatives_folders() are used.
extensions : list, optional
File extensions to include in the scan.
Defaults to (".nii.gz", ".nii").
output_csv : str or Path, optional
If provided, save the combined inventory DataFrame to this CSV path.
n_workers : int, optional
Number of parallel worker threads per pipeline. Defaults to 8.
Returns
-------
pd.DataFrame
Combined inventory table with an extra leading "Pipeline" column.
Raises
------
ValueError
If no pipeline folders are found or provided.
"""
if pipe_dirs is None:
pipe_dirs = cltbids.get_derivatives_folders(deriv_dir)
if not pipe_dirs:
raise ValueError(
"No derivatives folders were found in the specified directory."
)
n_pipes = len(pipe_dirs)
combined_summary = pd.DataFrame()
with Progress() as progress:
for idx, tmp_pipe_deriv in enumerate(pipe_dirs, start=1):
tmp_summary = build_inventory(
deriv_dir=deriv_dir,
pipe_id=tmp_pipe_deriv,
subj_ids=subj_ids,
pipe_index=idx,
pipe_total=n_pipes,
progress=progress,
extensions=extensions,
n_workers=n_workers,
)
tmp_summary.insert(0, "Pipeline", tmp_pipe_deriv)
combined_summary = (
tmp_summary
if idx == 1
else pd.concat([combined_summary, tmp_summary], ignore_index=True)
)
# ── Save combined result ──────────────────────────────────────────────
if output_csv is not None:
save_path = (
str(output_csv) if str(output_csv).endswith(".csv") else f"{output_csv}.csv"
)
combined_summary.to_csv(save_path, index=False)
print(f"\n✓ Combined inventory ({len(combined_summary)} rows) → {save_path}")
else:
print(
f"\n✓ Combined inventory ({len(combined_summary)} rows) returned (no save)."
)
return combined_summary
####################################################################################################
[docs]
def get_processing_status_details_json(
proc_status_df: Union[str, dict],
subj_ids: Union[List[str], str],
deriv_dir: str,
pipe_dirs: Union[List[str], str] = None,
out_json: str = None,
only_ids: bool = False,
):
"""
This function creates a dictionary with the details of the processing status of the subjects in the BIDs derivatives directory.
It provides the IDs of the subjects with incomplete or mismatched number of files.
Parameters
----------
proc_status_df : str or dict
Path to the processing status DataFrame or a DataFrame itself. This DataFrame can be
obtained with the function "create_processing_status_table".
subj_ids : list or str
List of subject IDs or a text file containing the subject IDs.
deriv_dir : str
Path to the derivatives directory.
pipe_dirs : list or str, optional
List of processing pipelines to check. If None, all pipelines will be checked.
out_json : str, optional
Path to save the output JSON file. If None, the JSON file will not be saved.
only_ids : bool, optional
If True, only the IDs of the subjects with mismatches will be returned, without the file details.
Returns
-------
dict
Dictionary containing the details of the processing status of the subjects.
str
Path to the saved JSON file if out_json is provided, otherwise None.
"""
from . import morphometrytools as cltmorpho
import os
import numpy as np
if isinstance(proc_status_df, str):
if not os.path.isfile(proc_status_df):
raise FileNotFoundError(f"The file {proc_status_df} does not exist.")
else:
proc_status_df = cltmisc.smart_read_table(proc_status_df)
elif not isinstance(proc_status_df, pd.DataFrame):
raise TypeError("proc_status_df must be a DataFrame or a string path to a file")
# Process subject IDs
if isinstance(subj_ids, str):
if not os.path.isfile(subj_ids):
raise FileNotFoundError(f"The file {subj_ids} does not exist.")
else:
with open(subj_ids, "r") as f:
subj_list = f.read().splitlines()
elif isinstance(subj_ids, list):
if len(subj_ids) == 0:
raise ValueError("The list of subject IDs is empty.")
else:
subj_list = subj_ids
else:
raise TypeError("subj_ids must be a list or a string path to a file")
# Check if the derivatives directory exists
deriv_dir = cltmisc.remove_trailing_separators(deriv_dir)
if not os.path.isdir(deriv_dir):
raise FileNotFoundError(
f"The derivatives directory {deriv_dir} does not exist."
)
# Find all the derivatives folders
all_pipe_dirs = cltbids.get_derivatives_folders(deriv_dir)
if len(all_pipe_dirs) == 0:
raise ValueError(
"No derivatives folders were found in the specified directory."
)
if pipe_dirs is not None:
if isinstance(pipe_dirs, str):
pipe_dirs = [pipe_dirs]
pipe_dirs = cltmisc.filter_by_substring(all_pipe_dirs, or_filter=pipe_dirs)
else:
pipe_dirs = all_pipe_dirs
# All entities
ent_list = cltbids.entities4table()
# Get all the columns names
col_names = proc_status_df.columns.tolist()
# Get all the columns that are not in the pipe_dirs
subj_columns = list(set(col_names) - set(pipe_dirs))
subj_ids_df = proc_status_df[subj_columns]
# Create a consistent structure for the output dictionary
missmatch_summary = {}
# Process each pipeline
for i in pipe_dirs:
proc_status_df[i] = proc_status_df[i].astype(int)
pipe_dir_fold = os.path.join(deriv_dir, i)
# Initialize consistent structure
missmatch_pipe = {"ref_fullid": "", "missmatch_fullid": {}}
# Get the mode for the column to determine the reference value
mode_value = proc_status_df[i].mode()[0]
# Find rows that match the mode (will be used as reference)
agreement_rows = proc_status_df[proc_status_df[i] == mode_value].index
# Get reference subject details (using the first row that matches the mode)
ref_ids = subj_ids_df.loc[agreement_rows].iloc[0, :]
# Create identifiers for the reference subject
cad2look_ref = [
f"{key}-{ref_ids[value]}"
for key, value in ent_list.items()
if value in subj_columns
]
# Get files for the reference subject
ref_files = cltbids.get_individual_files_and_folders(
pipe_dir_fold,
cad2look_ref,
)
# Find the full ID of the reference subject
try:
ref_full_id = cltmisc.filter_by_substring(
subj_list, or_filter=cad2look_ref[0], and_filter=cad2look_ref
)[0]
except IndexError:
# Handle case where reference ID is not found
ref_full_id = "unknown_reference"
missmatch_pipe["ref_fullid"] = ref_full_id
# Find rows that don't match the mode (disagreement rows)
disagreement_rows = proc_status_df[proc_status_df[i] != mode_value].index
# Only process mismatches if reference files exist and there are disagreements
if ref_files and len(disagreement_rows) > 0:
# Process reference files to remove path prefixes for comparison
cad2look_ref.append(pipe_dir_fold)
tmp_ref_files = cltmisc.remove_substrings(ref_files, cad2look_ref)
# Get the ids of the subjects with disagreement
subtable_ids = subj_ids_df.loc[disagreement_rows]
# Loop through all subjects with disagreement
for j in range(len(disagreement_rows)):
# Get the subject ID
sub_row = subtable_ids.iloc[j, :]
# Create identifiers for this subject
cad2look_ind = [
f"{key}-{sub_row[value]}"
for key, value in ent_list.items()
if value in subj_columns
]
# Get files for this subject
indiv_files = cltbids.get_individual_files_and_folders(
pipe_dir_fold,
cad2look_ind,
)
try:
# Find the full ID of this subject
indiv_full_id = cltmisc.filter_by_substring(
subj_list,
or_filter=cad2look_ind[0],
and_filter=cad2look_ind,
)[0]
except IndexError:
# Handle case where subject ID is not found
indiv_full_id = f"unknown_subject_{j}"
# Initialize results for this subject
missmatch_subject = {"missing_files": [], "extra_files": []}
if indiv_files:
# Process individual files to remove path prefixes for comparison
cad2look_ind.append(pipe_dir_fold)
tmp_indiv_files = cltmisc.remove_substrings(
indiv_files, cad2look_ind
)
# Find missing files (in reference but not in this subject)
tmp_miss = list(set(tmp_ref_files) - set(tmp_indiv_files))
if tmp_miss:
miss_indices = cltmisc.get_indexes_by_substring(
tmp_ref_files, tmp_miss
)
selected_files_ref = [ref_files[i] for i in miss_indices]
missmatch_subject["missing_files"] = cltmisc.replace_substrings(
selected_files_ref, cad2look_ref, cad2look_ind
)
# Find extra files (in this subject but not in reference)
tmp_extra = list(set(tmp_indiv_files) - set(tmp_ref_files))
if tmp_extra:
extra_indices = cltmisc.get_indexes_by_substring(
tmp_indiv_files, tmp_extra
)
selected_files_indiv = [indiv_files[i] for i in extra_indices]
missmatch_subject["extra_files"] = cltmisc.replace_substrings(
selected_files_indiv, cad2look_ind, cad2look_ref
)
else:
# If no files found for this subject, all reference files are missing
missmatch_subject["missing_files"] = cltmisc.replace_substrings(
ref_files, cad2look_ref, cad2look_ind
)
# Add this subject's details to the results
missmatch_pipe["missmatch_fullid"][indiv_full_id] = missmatch_subject
# Add this pipeline's results to the summary
missmatch_summary[i] = missmatch_pipe
# If only_ids is True, simplify the output to just include IDs
if only_ids:
for i in missmatch_summary.keys():
missmatch_summary[i]["missmatch_fullid"] = list(
missmatch_summary[i]["missmatch_fullid"].keys()
)
# Save results to JSON if requested
if out_json is not None:
json_path = os.path.dirname(out_json)
if not os.path.isdir(json_path):
# Raise an error if the directory does not exist
raise FileNotFoundError(f"The directory {json_path} does not exist.")
cltmisc.save_dictionary_to_json(missmatch_summary, out_json)
return missmatch_summary, out_json
####################################################################################################
[docs]
def get_processing_status_details_sqlite3(
proc_status_df: Union[str, dict],
subj_ids: Union[List[str], str],
deriv_dir: str,
pipe_dirs: Union[List[str], str] = None,
out_json: str = None,
db_path: str = None,
only_ids: bool = False,
):
"""
This function creates a dictionary with the details of the processing status of the subjects in the BIDs derivatives directory.
It provides the IDs of the subjects with incomplete or mismatched number of files.
Parameters
----------
proc_status_df : str or dict
Path to the processing status DataFrame or a DataFrame itself. This DataFrame can be
obtained with the function "create_processing_status_table".
subj_ids : list or str
List of subject IDs or a text file containing the subject IDs.
deriv_dir : str
Path to the derivatives directory.
pipe_dirs : list or str, optional
List of processing pipelines to check. If None, all pipelines will be checked.
out_json : str, optional
Path to save the output JSON file. If None, the JSON file will not be saved.
db_path : str, optional
Path to save the SQLite database file. If None, the database will not be created.
only_ids : bool, optional
If True, only the IDs of the subjects with mismatches will be returned, without the file details.
Returns
-------
dict
Dictionary containing the details of the processing status of the subjects.
str
Path to the saved JSON file if out_json is provided, otherwise None.
"""
from . import morphometrytools as cltmorpho
import sqlite3
if isinstance(proc_status_df, str):
if not os.path.isfile(proc_status_df):
raise FileNotFoundError(f"The file {proc_status_df} does not exist.")
else:
proc_status_df = cltmisc.smart_read_table(proc_status_df)
elif not isinstance(proc_status_df, pd.DataFrame):
raise TypeError("proc_status_df must be a DataFrame or a string path to a file")
# Process subject IDs
if isinstance(subj_ids, str):
if not os.path.isfile(subj_ids):
raise FileNotFoundError(f"The file {subj_ids} does not exist.")
else:
with open(subj_ids, "r") as f:
subj_list = f.read().splitlines()
elif isinstance(subj_ids, list):
if len(subj_ids) == 0:
raise ValueError("The list of subject IDs is empty.")
else:
subj_list = subj_ids
else:
raise TypeError("subj_ids must be a list or a string path to a file")
# Check if the derivatives directory exists
deriv_dir = cltmisc.remove_trailing_separators(deriv_dir)
if not os.path.isdir(deriv_dir):
raise FileNotFoundError(
f"The derivatives directory {deriv_dir} does not exist."
)
# Find all the derivatives folders
all_pipe_dirs = cltbids.get_derivatives_folders(deriv_dir)
if len(all_pipe_dirs) == 0:
raise ValueError(
"No derivatives folders were found in the specified directory."
)
if pipe_dirs is not None:
if isinstance(pipe_dirs, str):
pipe_dirs = [pipe_dirs]
pipe_dirs = cltmisc.filter_by_substring(all_pipe_dirs, or_filter=pipe_dirs)
else:
pipe_dirs = all_pipe_dirs
# All entities
ent_list = cltbids.entities4table()
# Get all the columns names
col_names = proc_status_df.columns.tolist()
# Get all the columns that are not in the pipe_dirs
subj_columns = list(set(col_names) - set(pipe_dirs))
subj_ids_df = proc_status_df[subj_columns]
# Create a consistent structure for the output dictionary
missmatch_summary = {}
# Initialize SQLite database if db_path is provided
if db_path:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create tables
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS pipelines (
pipeline_id TEXT PRIMARY KEY,
ref_fullid TEXT
)"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS mismatches (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pipeline_id TEXT,
subject_id TEXT,
FOREIGN KEY (pipeline_id) REFERENCES pipelines(pipeline_id),
UNIQUE (pipeline_id, subject_id)
)"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS file_details (
id INTEGER PRIMARY KEY AUTOINCREMENT,
mismatch_id INTEGER,
file_path TEXT,
status TEXT,
FOREIGN KEY (mismatch_id) REFERENCES mismatches(id)
)"""
)
# Clear existing data if needed
cursor.execute("DELETE FROM file_details")
cursor.execute("DELETE FROM mismatches")
cursor.execute("DELETE FROM pipelines")
# Process each pipeline
for i in pipe_dirs:
proc_status_df[i] = proc_status_df[i].astype(int)
pipe_dir_fold = os.path.join(deriv_dir, i)
# Initialize consistent structure
missmatch_pipe = {"ref_fullid": "", "missmatch_fullid": {}}
# Get the mode for the column to determine the reference value
mode_value = proc_status_df[i].mode()[0]
# Find rows that match the mode (will be used as reference)
agreement_rows = proc_status_df[proc_status_df[i] == mode_value].index
# Get reference subject details (using the first row that matches the mode)
ref_ids = subj_ids_df.loc[agreement_rows].iloc[0, :]
# Create identifiers for the reference subject
cad2look_ref = [
f"{key}-{ref_ids[value]}"
for key, value in ent_list.items()
if value in subj_columns
]
# Get files for the reference subject
ref_files = cltbids.get_individual_files_and_folders(
pipe_dir_fold,
cad2look_ref,
)
# Find the full ID of the reference subject
try:
ref_full_id = cltmisc.filter_by_substring(
subj_list, or_filter=cad2look_ref[0], and_filter=cad2look_ref
)[0]
except IndexError:
# Handle case where reference ID is not found
ref_full_id = "unknown_reference"
missmatch_pipe["ref_fullid"] = ref_full_id
# If using SQLite, insert pipeline info
if db_path:
cursor.execute(
"INSERT OR REPLACE INTO pipelines VALUES (?, ?)", (i, ref_full_id)
)
# Find rows that don't match the mode (disagreement rows)
disagreement_rows = proc_status_df[proc_status_df[i] != mode_value].index
# Only process mismatches if reference files exist and there are disagreements
if ref_files and len(disagreement_rows) > 0:
# Process reference files to remove path prefixes for comparison
cad2look_ref.append(pipe_dir_fold)
tmp_ref_files = cltmisc.remove_substrings(ref_files, cad2look_ref)
# Get the ids of the subjects with disagreement
subtable_ids = subj_ids_df.loc[disagreement_rows]
# Loop through all subjects with disagreement
for j in range(len(disagreement_rows)):
# Get the subject ID
sub_row = subtable_ids.iloc[j, :]
# Create identifiers for this subject
cad2look_ind = [
f"{key}-{sub_row[value]}"
for key, value in ent_list.items()
if value in subj_columns
]
# Get files for this subject
indiv_files = cltbids.get_individual_files_and_folders(
pipe_dir_fold,
cad2look_ind,
)
try:
# Find the full ID of this subject
indiv_full_id = cltmisc.filter_by_substring(
subj_list,
or_filter=cad2look_ind[0],
and_filter=cad2look_ind,
)[0]
except IndexError:
# Handle case where subject ID is not found
indiv_full_id = f"unknown_subject_{j}"
# Initialize results for this subject
missmatch_subject = {"missing_files": [], "extra_files": []}
# Insert subject into mismatches table if using SQLite
if db_path:
cursor.execute(
"INSERT OR REPLACE INTO mismatches (pipeline_id, subject_id) VALUES (?, ?)",
(i, indiv_full_id),
)
mismatch_id = cursor.lastrowid
if indiv_files:
# Process individual files to remove path prefixes for comparison
cad2look_ind.append(pipe_dir_fold)
tmp_indiv_files = cltmisc.remove_substrings(
indiv_files, cad2look_ind
)
# Find missing files (in reference but not in this subject)
tmp_miss = list(set(tmp_ref_files) - set(tmp_indiv_files))
if tmp_miss:
miss_indices = cltmisc.get_indexes_by_substring(
tmp_ref_files, tmp_miss
)
selected_files_ref = [ref_files[i] for i in miss_indices]
missing_files = cltmisc.replace_substrings(
selected_files_ref, cad2look_ref, cad2look_ind
)
missmatch_subject["missing_files"] = missing_files
# Insert missing files into database if using SQLite
if db_path:
for file_path in missing_files:
cursor.execute(
"INSERT INTO file_details (mismatch_id, file_path, status) VALUES (?, ?, ?)",
(mismatch_id, file_path, "missing"),
)
# Find extra files (in this subject but not in reference)
tmp_extra = list(set(tmp_indiv_files) - set(tmp_ref_files))
if tmp_extra:
extra_indices = cltmisc.get_indexes_by_substring(
tmp_indiv_files, tmp_extra
)
selected_files_indiv = [indiv_files[i] for i in extra_indices]
extra_files = cltmisc.replace_substrings(
selected_files_indiv, cad2look_ind, cad2look_ref
)
missmatch_subject["extra_files"] = extra_files
# Insert extra files into database if using SQLite
if db_path:
for file_path in extra_files:
cursor.execute(
"INSERT INTO file_details (mismatch_id, file_path, status) VALUES (?, ?, ?)",
(mismatch_id, file_path, "extra"),
)
else:
# If no files found for this subject, all reference files are missing
missing_files = cltmisc.replace_substrings(
ref_files, cad2look_ref, cad2look_ind
)
missmatch_subject["missing_files"] = missing_files
# Insert missing files into database if using SQLite
if db_path:
for file_path in missing_files:
cursor.execute(
"INSERT INTO file_details (mismatch_id, file_path, status) VALUES (?, ?, ?)",
(mismatch_id, file_path, "missing"),
)
# Add this subject's details to the results
missmatch_pipe["missmatch_fullid"][indiv_full_id] = missmatch_subject
# Add this pipeline's results to the summary
missmatch_summary[i] = missmatch_pipe
# If only_ids is True, simplify the output to just include IDs
if only_ids:
for i in missmatch_summary.keys():
missmatch_summary[i]["missmatch_fullid"] = list(
missmatch_summary[i]["missmatch_fullid"].keys()
)
# Commit changes and close database connection if using SQLite
if db_path:
conn.commit()
conn.close()
# Save results to JSON if requested
if out_json is not None:
json_path = os.path.dirname(out_json)
if not os.path.isdir(json_path):
# Raise an error if the directory does not exist
raise FileNotFoundError(f"The directory {json_path} does not exist.")
cltmisc.save_dictionary_to_json(missmatch_summary, out_json)
return missmatch_summary, out_json
####################################################################################################
[docs]
def query_processing_status_db(
db_path, query_type="subjects_with_mismatches", pipeline=None
):
"""
Query the processing status database to extract useful information.
Parameters
----------
db_path : str
Path to the SQLite database file.
query_type : str, optional
Type of query to run. Options:
- "subjects_with_mismatches": Get all subjects with mismatches
- "pipelines_with_mismatches": Get all pipelines with mismatches and count
- "missing_files_count": Get number of missing files per subject
- "extra_files_count": Get number of extra files per subject
pipeline : str, optional
Name of the pipeline to filter by. Used only with certain query types.
Returns
-------
pd.DataFrame
Result of the query as a DataFrame.
"""
import sqlite3
import pandas as pd
conn = sqlite3.connect(db_path)
if query_type == "subjects_with_mismatches":
if pipeline:
query = """
SELECT subject_id, pipeline_id
FROM mismatches
WHERE pipeline_id = ?
ORDER BY subject_id
"""
df = pd.read_sql_query(query, conn, params=(pipeline,))
else:
query = """
SELECT subject_id, GROUP_CONCAT(pipeline_id) as pipelines
FROM mismatches
GROUP BY subject_id
ORDER BY subject_id
"""
df = pd.read_sql_query(query, conn)
elif query_type == "pipelines_with_mismatches":
query = """
SELECT pipeline_id, COUNT(DISTINCT subject_id) as subject_count
FROM mismatches
GROUP BY pipeline_id
ORDER BY subject_count DESC
"""
df = pd.read_sql_query(query, conn)
elif query_type == "missing_files_count":
if pipeline:
query = """
SELECT m.subject_id, COUNT(*) as missing_count
FROM mismatches m
JOIN file_details f ON m.id = f.mismatch_id
WHERE f.status = 'missing' AND m.pipeline_id = ?
GROUP BY m.subject_id
ORDER BY missing_count DESC
"""
df = pd.read_sql_query(query, conn, params=(pipeline,))
else:
query = """
SELECT m.subject_id, m.pipeline_id, COUNT(*) as missing_count
FROM mismatches m
JOIN file_details f ON m.id = f.mismatch_id
WHERE f.status = 'missing'
GROUP BY m.subject_id, m.pipeline_id
ORDER BY missing_count DESC
"""
df = pd.read_sql_query(query, conn)
elif query_type == "extra_files_count":
if pipeline:
query = """
SELECT m.subject_id, COUNT(*) as extra_count
FROM mismatches m
JOIN file_details f ON m.id = f.mismatch_id
WHERE f.status = 'extra' AND m.pipeline_id = ?
GROUP BY m.subject_id
ORDER BY extra_count DESC
"""
df = pd.read_sql_query(query, conn, params=(pipeline,))
else:
query = """
SELECT m.subject_id, m.pipeline_id, COUNT(*) as extra_count
FROM mismatches m
JOIN file_details f ON m.id = f.mismatch_id
WHERE f.status = 'extra'
GROUP BY m.subject_id, m.pipeline_id
ORDER BY extra_count DESC
"""
df = pd.read_sql_query(query, conn)
else:
raise ValueError(f"Unknown query type: {query_type}")
conn.close()
return df
####################################################################################################
[docs]
def export_db_to_json(db_path, out_json):
"""
Export the processing status database to a JSON file in the same format
as returned by get_processing_status_details.
Parameters
----------
db_path : str
Path to the SQLite database file.
out_json : str
Path to save the output JSON file.
Returns
-------
dict
Dictionary containing the details of the processing status of the subjects.
"""
import sqlite3
import json
import os
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Get all pipelines
cursor.execute("SELECT pipeline_id, ref_fullid FROM pipelines")
pipelines = cursor.fetchall()
# Create the output dictionary
output_dict = {}
for pipe_id, ref_fullid in pipelines:
# Initialize pipeline entry
pipe_entry = {"ref_fullid": ref_fullid, "missmatch_fullid": {}}
# Get all mismatches for this pipeline
cursor.execute(
"""
SELECT id, subject_id
FROM mismatches
WHERE pipeline_id = ?
""",
(pipe_id,),
)
mismatches = cursor.fetchall()
for mismatch_id, subject_id in mismatches:
# Get missing files
cursor.execute(
"""
SELECT file_path
FROM file_details
WHERE mismatch_id = ? AND status = 'missing'
""",
(mismatch_id,),
)
missing_files = [row[0] for row in cursor.fetchall()]
# Get extra files
cursor.execute(
"""
SELECT file_path
FROM file_details
WHERE mismatch_id = ? AND status = 'extra'
""",
(mismatch_id,),
)
extra_files = [row[0] for row in cursor.fetchall()]
# Add to dictionary
pipe_entry["missmatch_fullid"][subject_id] = {
"missing_files": missing_files,
"extra_files": extra_files,
}
# Add pipeline to output
output_dict[pipe_id] = pipe_entry
conn.close()
# Save to JSON
with open(out_json, "w") as f:
json.dump(output_dict, f, indent=2)
return output_dict