import os
import sys
import copy
import warnings
import nibabel as nib
import numpy as np
import subprocess
from pathlib import Path
from typing import Union, Optional, List, Tuple, Dict
import pyvista as pv
import json
from scipy.ndimage import binary_erosion, binary_dilation, binary_opening, convolve
from scipy.ndimage import binary_fill_holes, label, binary_closing, gaussian_filter
from scipy.spatial import distance
from scipy.interpolate import RegularGridInterpolator
from skimage import measure
# Importing local modules
from . import misctools as cltmisc
from . import bidstools as cltbids
from . import parcellationtools as cltparc
from . import colorstools as cltcol
from . import dwitools_utils as dwiutils
from . import imagetools_utils as imgutils
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 1: Class and methods to perform morphological operations on images ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
class MorphologicalOperations:
"""
A class to perform morphological operations on binary arrays.
Provides methods for common morphological operations including erosion,
dilation, opening, closing, and hole filling on 2D and 3D binary images.
"""
[docs]
def __init__(self):
"""Initialize the morphological operations class."""
pass
########################################################################################################
[docs]
def create_structuring_element(self, shape="cube", size=3, dimensions=None):
"""
Create a structuring element for morphological operations.
Parameters
----------
shape : str, optional
Element shape: 'cube'/'square', 'ball'/'disk', or 'cross'. Default is 'cube'.
size : int, optional
Size of the structuring element. Default is 3.
dimensions : int, optional
Number of dimensions (2 or 3). If None, defaults to 3. Default is None.
Returns
-------
np.ndarray
Boolean array representing the structuring element.
Raises
------
ValueError
If shape is not supported or dimensions are invalid.
Examples
--------
>>> morph = MorphologicalOperations()
>>> cube_elem = morph.create_structuring_element('cube', size=5)
>>> ball_elem = morph.create_structuring_element('ball', size=7, dimensions=3)
"""
if dimensions is None:
dimensions = 3 # default to 3D
if shape in ["cube", "square"]:
# Create cubic/square structuring element
return np.ones((size,) * dimensions, dtype=bool)
elif shape in ["ball", "disk"]:
# Create spherical/circular structuring element
radius = size // 2
if dimensions == 2:
y, x = np.ogrid[-radius : radius + 1, -radius : radius + 1]
return x**2 + y**2 <= radius**2
elif dimensions == 3:
z, y, x = np.ogrid[
-radius : radius + 1, -radius : radius + 1, -radius : radius + 1
]
return x**2 + y**2 + z**2 <= radius**2
else:
raise ValueError("Ball/disk only supported for 2D and 3D")
elif shape == "cross":
# Create cross-shaped structuring element
if dimensions == 2:
cross = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], dtype=bool)
return cross
elif dimensions == 3:
cross = np.zeros((3, 3, 3), dtype=bool)
cross[1, 1, :] = True # x-axis
cross[1, :, 1] = True # y-axis
cross[:, 1, 1] = True # z-axis
return cross
else:
raise ValueError("Cross only supported for 2D and 3D")
else:
raise ValueError(
"Shape must be 'cube', 'ball', 'cross', 'square', or 'disk'"
)
########################################################################################################
[docs]
def erode(self, binary_array, structure=None, iterations=1):
"""
Perform binary erosion to shrink objects and remove small noise.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. If None, uses default 3x3 or 3x3x3 cube. Default is None.
iterations : int, optional
Number of erosion iterations. Default is 1.
Returns
-------
np.ndarray
Eroded binary array.
Examples
--------
>>> eroded = morph.erode(binary_image, iterations=2)
"""
binary_array = self._ensure_binary(binary_array)
if structure is None:
structure = self.create_structuring_element("cube", 3, binary_array.ndim)
return binary_erosion(binary_array, structure=structure, iterations=iterations)
########################################################################################################
[docs]
def dilate(self, binary_array, structure=None, iterations=1):
"""
Perform binary dilation to expand objects and fill small gaps.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. If None, uses default 3x3 or 3x3x3 cube. Default is None.
iterations : int, optional
Number of dilation iterations. Default is 1.
Returns
-------
np.ndarray
Dilated binary array.
Examples
--------
>>> dilated = morph.dilate(binary_image, iterations=3)
"""
binary_array = self._ensure_binary(binary_array)
if structure is None:
structure = self.create_structuring_element("cube", 3, binary_array.ndim)
return binary_dilation(binary_array, structure=structure, iterations=iterations)
############################################################################################
[docs]
def dilate_mm(
self,
array: np.ndarray,
affine: np.ndarray,
shape: str = "cube",
dilation_mm: float = 1,
) -> np.ndarray:
"""
Binarize a 3D array and dilate by a given distance in millimeters.
Parameters
----------
array : 3D numpy array with values and zeros
affine : 4x4 affine matrix (voxel → mm)
shape : 'cube' for a box or 'ball' for an ellipsoid structuring element
dilation_mm : dilation radius in millimeters
Returns
-------
dilated : binary 3D numpy array (bool)
"""
binary_array = self._ensure_binary(array)
voxel_sizes = get_voxel_size(affine)
# Per-axis radii in voxels
radii_vox = np.array([dilation_mm / vs for vs in voxel_sizes])
r = np.ceil(radii_vox).astype(int)
if shape is None:
shape = "cube"
# If voxels are isotropic, reuse create_structuring_element directly
if np.allclose(r, r[0]):
size = 2 * r[0] + 1
structure = self.create_structuring_element(
shape=shape, size=size, dimensions=binary_array.ndim
)
# Anisotropic voxels: build the structure manually per axis
else:
if shape in ["cube", "square"]:
# Non-square box sized independently per axis
structure = np.ones([2 * ri + 1 for ri in r], dtype=bool)
elif shape in ["ball", "disk"]:
# True ellipsoid in mm space
xi = np.arange(-r[0], r[0] + 1)
yi = np.arange(-r[1], r[1] + 1)
zi = np.arange(-r[2], r[2] + 1)
xx, yy, zz = np.meshgrid(xi, yi, zi, indexing="ij")
structure = (
(xx / radii_vox[0]) ** 2
+ (yy / radii_vox[1]) ** 2
+ (zz / radii_vox[2]) ** 2
) <= 1.0
else:
raise ValueError("Shape must be 'cube', 'ball', 'square', or 'disk'")
return binary_dilation(binary_array, structure=structure)
########################################################################################################
[docs]
def opening(self, binary_array, structure=None, iterations=1):
"""
Perform morphological opening (erosion followed by dilation).
Removes small objects and noise while preserving larger structures.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. Default is None.
iterations : int, optional
Number of iterations. Default is 1.
Returns
-------
np.ndarray
Opened binary array.
Examples
--------
>>> cleaned = morph.opening(noisy_image, iterations=2)
"""
binary_array = self._ensure_binary(binary_array)
if structure is None:
structure = self.create_structuring_element("cube", 3, binary_array.ndim)
return binary_opening(binary_array, structure=structure, iterations=iterations)
########################################################################################################
[docs]
def closing(self, binary_array, structure=None, iterations=1):
"""
Perform morphological closing (dilation followed by erosion).
Fills small holes and gaps while preserving object size.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. Default is None.
iterations : int, optional
Number of iterations. Default is 1.
Returns
-------
np.ndarray
Closed binary array.
Examples
--------
>>> filled = morph.closing(image_with_holes, iterations=1)
"""
binary_array = self._ensure_binary(binary_array)
if structure is None:
structure = self.create_structuring_element("cube", 3, binary_array.ndim)
return binary_closing(binary_array, structure=structure, iterations=iterations)
########################################################################################################
[docs]
def fill_holes(self, binary_array, structure=None):
"""
Fill holes in binary objects.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element for connectivity. Default is None.
Returns
-------
np.ndarray
Binary array with filled holes.
Examples
--------
>>> filled = morph.fill_holes(binary_mask)
"""
binary_array = self._ensure_binary(binary_array)
return binary_fill_holes(binary_array, structure=structure)
########################################################################################################
[docs]
def remove_small_objects(self, binary_array, min_size=50):
"""
Remove connected components smaller than specified size.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
min_size : int, optional
Minimum size of objects to keep in voxels/pixels. Default is 50.
Returns
-------
np.ndarray
Binary array with small objects removed.
Examples
--------
>>> cleaned = morph.remove_small_objects(binary_image, min_size=100)
"""
binary_array = self._ensure_binary(binary_array)
labeled_array, num_labels = label(binary_array)
# Count voxels/pixels in each connected component
label_sizes = np.bincount(labeled_array.ravel())
# Create mask for objects to keep (size >= min_size)
keep_labels = label_sizes >= min_size
keep_labels[0] = False # Always remove background (label 0)
# Create final result
result = np.zeros_like(binary_array, dtype=bool)
for label_idx in np.where(keep_labels)[0]:
result[labeled_array == label_idx] = True
return result
########################################################################################################
[docs]
def gradient(self, binary_array, structure=None):
"""
Morphological gradient (dilation - erosion) to highlight object boundaries.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. Default is None.
Returns
-------
np.ndarray
Binary array containing object boundaries.
Examples
--------
>>> edges = morph.gradient(binary_object)
"""
binary_array = self._ensure_binary(binary_array)
if structure is None:
structure = self.create_structuring_element("cube", 3, binary_array.ndim)
dilated = self.dilate(binary_array, structure)
eroded = self.erode(binary_array, structure)
return dilated & ~eroded # Return as boolean
########################################################################################################
[docs]
def tophat(self, binary_array, structure=None):
"""
White top-hat transform (original - opening) to extract small bright structures.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. Default is None.
Returns
-------
np.ndarray
Binary array containing small bright structures.
Examples
--------
>>> small_objects = morph.tophat(binary_image)
"""
binary_array = self._ensure_binary(binary_array)
opened = self.opening(binary_array, structure)
return binary_array & ~opened # Return as boolean
########################################################################################################
[docs]
def blackhat(self, binary_array, structure=None):
"""
Black top-hat transform (closing - original) to extract small dark structures.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
structure : np.ndarray, optional
Structuring element. Default is None.
Returns
-------
np.ndarray
Binary array containing small dark structures (holes).
Examples
--------
>>> small_holes = morph.blackhat(binary_image)
"""
binary_array = self._ensure_binary(binary_array)
closed = self.closing(binary_array, structure)
return closed & ~binary_array # Return as boolean
def _ensure_binary(self, array):
"""Ensure the array is binary (boolean type)."""
if array.dtype != bool:
return array != 0
return array
#####################################################################################################
# Convenience function for quick operations
[docs]
def quick_morphology(binary_array, operation, **kwargs):
"""
Quick access to morphological operations without creating class instance.
Parameters
----------
binary_array : np.ndarray
Binary numpy array (2D or 3D).
operation : str
Operation name: 'erode', 'dilate', 'opening', 'closing', 'fill_holes',
'remove_small', 'gradient', 'tophat', 'blackhat'.
**kwargs
Additional arguments for the specific operation.
Returns
-------
np.ndarray
Result of the morphological operation.
Raises
------
ValueError
If operation is not supported.
Examples
--------
>>> # Quick erosion
>>> eroded = quick_morphology(binary_image, 'erode', iterations=2)
>>>
>>> # Quick hole filling
>>> filled = quick_morphology(binary_mask, 'fill_holes')
"""
morph = MorphologicalOperations()
operation_map = {
"erode": morph.erode,
"dilate": morph.dilate,
"opening": morph.opening,
"closing": morph.closing,
"fill_holes": morph.fill_holes,
"remove_small": morph.remove_small_objects,
"gradient": morph.gradient,
"tophat": morph.tophat,
"blackhat": morph.blackhat,
}
if operation not in operation_map:
raise ValueError(f"Operation must be one of: {list(operation_map.keys())}")
return operation_map[operation](binary_array, **kwargs)
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 2: Methods to get attributes from the images ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
def get_voxel_size(affine: np.ndarray):
"""
Compute voxel dimensions from NIfTI affine matrix.
Parameters
----------
affine : np.ndarray
4x4 affine transformation matrix from NIfTI header.
Returns
-------
tuple
Voxel sizes (voxel_x, voxel_y, voxel_z) in mm.
Examples
--------
>>> img = nib.load('image.nii.gz')
>>> vox_x, vox_y, vox_z = get_voxel_size(img.affine)
>>> print(f"Voxel size: {vox_x:.2f} x {vox_y:.2f} x {vox_z:.2f} mm")
"""
# Extract voxel sizes as the magnitude of each column vector
voxel_x = np.linalg.norm(affine[:3, 0])
voxel_y = np.linalg.norm(affine[:3, 1])
voxel_z = np.linalg.norm(affine[:3, 2])
return (voxel_x, voxel_y, voxel_z)
####################################################################################################
[docs]
def get_voxel_volume(affine: np.ndarray) -> float:
"""
Compute voxel dimensions from an affine matrix.
Parameters
----------
affine : np.ndarray
4x4 affine transformation matrix from NIfTI header.
Returns
-------
tuple
Voxel sizes (voxel_x, voxel_y, voxel_z) in mm.
Examples
--------
>>> img = nib.load('image.nii.gz')
>>> vox_x, vox_y, vox_z = get_voxel_size(img.affine)
>>> print(f"Voxel size: {vox_x:.2f} x {vox_y:.2f} x {vox_z:.2f} mm")
"""
voxel_x, voxel_y, voxel_z = get_voxel_size(affine)
return voxel_x * voxel_y * voxel_z
####################################################################################################
[docs]
def get_center(affine: np.ndarray) -> tuple:
"""
Compute voxel volume from NIfTI affine matrix.
Parameters
----------
affine : np.ndarray
4x4 affine transformation matrix from NIfTI header.
Returns
-------
float
Voxel volume in mm³.
Examples
--------
>>> img = nib.load('image.nii.gz')
>>> volume = get_voxel_volume(img.affine)
>>> print(f"Voxel volume: {volume:.3f} mm³")
"""
return (affine[0, 3], affine[1, 3], affine[2, 3])
####################################################################################################
[docs]
def get_rotation_matrix(affine: np.ndarray) -> np.ndarray:
"""
Extract normalized rotation matrix from affine matrix.
Parameters
----------
affine : np.ndarray
4x4 affine transformation matrix from NIfTI header.
Returns
-------
np.ndarray
3x3 normalized rotation matrix (without scaling).
Examples
--------
>>> rotation = get_rotation_matrix(img.affine)
>>> print(f"Rotation matrix shape: {rotation.shape}")
"""
# Extract 3x3 rotation/scaling matrix
rot_scale = affine[:3, :3]
# Normalize each column to remove scaling and keep only rotation
rotation = np.zeros_like(rot_scale)
for i in range(3):
rotation[:, i] = rot_scale[:, i] / np.linalg.norm(rot_scale[:, i])
return rotation
####################################################################################################
[docs]
def get_vox_neighbors(
coord: np.ndarray, neighborhood: str = "26", dims: str = "3", order: int = 1
) -> np.ndarray:
"""
Get neighborhood coordinates for a voxel.
Parameters
----------
coord : np.ndarray
Coordinates of the center voxel.
neighborhood : str, optional
Neighborhood type: '6', '18', '26' for 3D or '4', '8' for 2D. Default is '26'.
dims : str, optional
Number of dimensions: '2' or '3'. Default is '3'.
order : int, optional
Order parameter (currently unused). Default is 1.
Returns
-------
np.ndarray
Array of neighbor coordinates.
Raises
------
ValueError
If dimensions don't match coordinates or neighborhood type is invalid.
Examples
--------
>>> # Get 26-connected neighbors in 3D
>>> center = np.array([10, 15, 20])
>>> neighbors = get_vox_neighbors(center, neighborhood='26', dims='3')
>>> print(f"Found {len(neighbors)} neighbors")
"""
# Check if the number of dimensions in coord supported by the supplied coordinates
if len(coord) != int(dims):
raise ValueError(
"The number of dimensions in the coordinates is not supported."
)
# Check if the number of dimensions is supported
if dims == "3":
# Check if it is a valid neighborhood
if neighborhood not in ["6", "18", "26"]:
raise ValueError("The neighborhood type is not supported.")
# Constructing the neighborhood
if neighborhood == "6":
neighbors = np.array(
[[1, 0, 0], [-1, 0, 0], [0, 1, 0], [0, -1, 0], [0, 0, 1], [0, 0, -1]]
)
elif neighborhood == "12":
neighbors = np.array(
[
[1, 0, 0],
[-1, 0, 0],
[0, 1, 0],
[0, -1, 0],
[0, 0, 1],
[0, 0, -1],
[1, 1, 0],
[-1, -1, 0],
[1, -1, 0],
[-1, 1, 0],
[1, 0, 1],
[-1, 0, -1],
]
)
elif neighborhood == "18":
neighbors = np.array(
[
[1, 0, 0],
[-1, 0, 0],
[0, 1, 0],
[0, -1, 0],
[0, 0, 1],
[0, 0, -1],
[1, 1, 0],
[-1, -1, 0],
[1, -1, 0],
[-1, 1, 0],
[1, 0, 1],
[-1, 0, -1],
[1, 0, -1],
[-1, 0, 1],
[0, 1, 1],
[0, -1, -1],
[0, 1, -1],
[0, -1, 1],
]
)
elif neighborhood == "26":
neighbors = np.array(
[
[1, 0, 0],
[-1, 0, 0],
[0, 1, 0],
[0, -1, 0],
[0, 0, 1],
[0, 0, -1],
[1, 1, 0],
[-1, -1, 0],
[1, -1, 0],
[-1, 1, 0],
[1, 0, 1],
[-1, 0, -1],
[1, 0, -1],
[-1, 0, 1],
[0, 1, 1],
[0, -1, -1],
[0, 1, -1],
[0, -1, 1],
[1, 1, 1],
[-1, -1, -1],
[1, -1, -1],
[-1, 1, -1],
[1, 1, -1],
[-1, -1, 1],
[1, -1, 1],
[-1, 1, 1],
]
)
elif dims == "2":
if neighborhood not in ["4", "8"]:
raise ValueError("The neighborhood type is not supported.")
if neighborhood == "4":
neighbors = np.array([[1, 0], [-1, 0], [0, 1], [0, -1]])
elif neighborhood == "8":
neighbors = np.array(
[[1, 0], [-1, 0], [0, 1], [0, -1], [1, 1], [-1, -1], [1, -1], [-1, 1]]
)
else:
raise ValueError("The number of dimensions is not supported.")
neighbors = np.array([coord + n for n in neighbors])
return neighbors
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 3: Methods to operate over images (e.g. crop) ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
def crop_image_from_mask(
in_image: str,
mask: Union[str, np.ndarray],
out_image: str,
st_codes: Union[list, np.ndarray] = None,
) -> str:
"""
Crop image using a mask to minimum bounding box containing specified structures.
Parameters
----------
in_image : str
Path to input image file.
mask : str or np.ndarray
Path to mask file or mask array. Can be binary or multi-label.
out_image : str
Path for output cropped image.
st_codes : list or np.ndarray, optional
Structure codes to include in cropping. If None, uses all non-zero values.
Default is None.
Returns
-------
str
Path to the created output image.
Raises
------
ValueError
If input parameters are invalid or files don't exist.
Examples
--------
>>> # Crop using binary mask
>>> output = crop_image_from_mask(
... 'brain.nii.gz', 'mask.nii.gz', 'cropped_brain.nii.gz'
... )
>>>
>>> # Crop specific structures
>>> output = crop_image_from_mask(
... 'image.nii.gz', 'segmentation.nii.gz', 'cropped.nii.gz',
... st_codes=[1, 2, 3]
... )
"""
if isinstance(in_image, str) == False:
raise ValueError("The 'image' parameter must be a string.")
if isinstance(mask, str):
if not os.path.exists(mask):
raise ValueError("The 'mask' parameter must be a string.")
else:
mask = nib.load(mask)
mask_data = mask.get_fdata()
elif isinstance(mask, np.ndarray):
mask_data = mask
else:
raise ValueError("The 'mask' parameter must be a string or a numpy array.")
if st_codes is None:
st_codes = np.unique(mask_data)
st_codes = st_codes[st_codes != 0]
st_codes = cltmisc.build_indices(st_codes)
st_codes = np.array(st_codes)
# Create the output directory if it does not exist
out_pth = os.path.dirname(out_image)
if os.path.exists(out_pth) == False:
Path(out_pth).mkdir(parents=True, exist_ok=True)
# Loading both images
img1 = nib.load(in_image) # Original MRI image
# Get data and affine matrices
img1_affine = img1.affine
# Get the destination shape
img1_data = img1.get_fdata()
img1_shape = img1_data.shape
# Finding the minimum and maximum indexes for the mask
tmask = np.isin(mask_data, st_codes)
tmp_var = np.argwhere(tmask)
# Minimum and maximum indexes for X axis
i_start = np.min(tmp_var[:, 0])
i_end = np.max(tmp_var[:, 0])
# Minimum and maximum indexes for Y axis
j_start = np.min(tmp_var[:, 1])
j_end = np.max(tmp_var[:, 1])
# Minimum and maximum indexes for Z axis
k_start = np.min(tmp_var[:, 2])
k_end = np.max(tmp_var[:, 2])
# If img1_data is a 4D array we need to multiply it by the mask in the last dimension only. If not, we multiply it by the mask
# Applying the mask
if len(img1_data.shape) == 4:
masked_data = img1_data * tmask[..., np.newaxis]
else:
masked_data = img1_data * tmask
# Creating a new Nifti image with the same affine and header as img1
array_img = nib.Nifti1Image(masked_data, img1_affine)
# Cropping the masked data
if len(img1_data.shape) == 4:
cropped_img = array_img.slicer[i_start:i_end, j_start:j_end, k_start:k_end, :]
else:
cropped_img = array_img.slicer[i_start:i_end, j_start:j_end, k_start:k_end]
# Saving the cropped image
nib.save(cropped_img, out_image)
return out_image
####################################################################################################
[docs]
def cropped_to_native(in_image: str, native_image: str, out_image: str) -> str:
"""
Restore cropped image to dimensions of reference native image.
Parameters
----------
in_image : str
Path to cropped image file.
native_image : str
Path to reference image defining target dimensions.
out_image : str
Path for output restored image.
Returns
-------
str
Path to the created output image.
Raises
------
ValueError
If input parameters are not strings.
Examples
--------
>>> # Restore cropped image to original dimensions
>>> restored = cropped_to_native(
... 'cropped_result.nii.gz',
... 'original.nii.gz',
... 'restored_result.nii.gz'
... )
"""
if isinstance(in_image, str) == False:
raise ValueError("The 'in_image' parameter must be a string.")
if isinstance(native_image, str) == False:
raise ValueError("The 'native_image' parameter must be a string.")
# Create the output directory if it does not exist
out_pth = os.path.dirname(out_image)
if os.path.exists(out_pth) == False:
Path(out_pth).mkdir(parents=True, exist_ok=True)
# Loading both images
img1 = nib.load(native_image) # Original MRI image
img2 = nib.load(in_image) # Cropped image
# Get data and affine matrices
img1_affine = img1.affine
img2_affine = img2.affine
# Get the destination shape
img1_data = img1.get_fdata()
img1_shape = img1_data.shape
# Get data from IM2
img2_data = img2.get_fdata()
img2_shape = img2_data.shape
# Multiply the inverse of the affine matrix of img1 by the affine matrix of img2
affine_mult = np.linalg.inv(img1_affine) @ img2_affine
# If the img2 is a 4D add the forth dimension to the shape of the img1
if len(img2_shape) == 4:
img1_shape = (img1_shape[0], img1_shape[1], img1_shape[2], img2_shape[3])
# Create an empty array with the same dimensions as IM1
new_data = np.zeros(img1_shape, dtype=img2_data.dtype)
for vol in range(img2_data.shape[-1]):
# Find the coordinates in voxels of the voxels different from 0 on the img2
indices = np.argwhere(img2_data[..., vol] != 0)
# Apply the affine transformation to the coordinates of the voxels different from 0 on img2
new_coords = np.round(
affine_mult
[docs]
@ np.concatenate((indices.T, np.ones((1, indices.shape[0]))), axis=0)
).astype(int)
# Fill the new image with the values of the voxels different from 0 on img2
new_data[new_coords[0], new_coords[1], new_coords[2], vol] = img2_data[
indices[:, 0], indices[:, 1], indices[:, 2], vol
]
elif len(img2_shape) == 3:
# Create an empty array with the same dimensions as IM1
new_data = np.zeros(img1_shape, dtype=img2_data.dtype)
# Find the coordinates in voxels of the voxels different from 0 on the img2
indices = np.argwhere(img2_data != 0)
# Apply the affine transformation to the coordinates of the voxels different from 0 on img2
new_coords = np.round(
affine_mult
@ np.concatenate((indices.T, np.ones((1, indices.shape[0]))), axis=0)
).astype(int)
# Fill the new image with the values of the voxels different from 0 on img2
new_data[new_coords[0], new_coords[1], new_coords[2]] = img2_data[
indices[:, 0], indices[:, 1], indices[:, 2]
]
# Create a new Nifti image with the same affine and header as IM1
new_img2 = nib.Nifti1Image(new_data, affine=img1_affine, header=img1.header)
# Save the new image
nib.save(new_img2, out_image)
return out_image
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 4: Methods to apply transformations or changes of space ############
############ ############
############ ############
####################################################################################################
####################################################################################################
def apply_multi_transf(
in_image: str,
out_image: str,
ref_image: str,
xfm_output,
interp_order: int = 0,
invert: bool = False,
cont_tech: str = "local",
cont_image: str = None,
force: bool = False,
) -> None:
"""
Apply ANTs transformation to image with support for multiple transform types.
Parameters
----------
in_image : str
Path to input image.
out_image : str
Path for transformed output image.
ref_image : str
Path to reference image defining target space.
xfm_output : str
Path to transformation files (supports affine and nonlinear).
interp_order : int, optional
Interpolation method: 0=NearestNeighbor, 1=Linear, 2=BSpline, etc.
Default is 0.
invert : bool, optional
Whether to invert the transformation. Default is False.
cont_tech : str, optional
Container technology: 'local', 'singularity', 'docker'. Default is 'local'.
cont_image : str, optional
Container image specification. Default is None.
force : bool, optional
Force recomputation if output exists. Default is False.
Examples
--------
>>> # Apply transformation with nearest neighbor interpolation
>>> apply_multi_transf(
... 'input.nii.gz', 'output.nii.gz', 'template.nii.gz',
... 'transform_prefix', interp_order=0
... )
"""
# Check if the path of out_basename exists
out_path = os.path.dirname(out_image)
if not os.path.isdir(out_path):
os.makedirs(out_path)
if interp_order == 0:
interp_cad = "NearestNeighbor"
elif interp_order == 1:
interp_cad = "Linear"
elif interp_order == 2:
interp_cad = "BSpline[3]"
elif interp_order == 3:
interp_cad = "CosineWindowedSinc"
elif interp_order == 4:
interp_cad = "WelchWindowedSinc"
elif interp_order == 5:
interp_cad = "HammingWindowedSinc"
elif interp_order == 6:
interp_cad = "LanczosWindowedSinc"
elif interp_order == 7:
interp_cad = "Welch"
######## -- Registration to the template space ------------ #
# Creating spatial transformation folder
stransf_dir = Path(os.path.dirname(xfm_output))
stransf_name = os.path.basename(xfm_output)
if stransf_name.endswith(".nii.gz"):
stransf_name = stransf_name[:-7]
elif stransf_name.endswith(".nii") or stransf_name.endswith(".mat"):
stransf_name = stransf_name[:-4]
if stransf_name.endswith("_xfm"):
stransf_name = stransf_name[:-4]
if "_desc-" in stransf_name:
affine_name = cltbids.replace_entity_value(stransf_name, {"desc": "affine"})
nl_name = cltbids.replace_entity_value(stransf_name, {"desc": "warp"})
invnl_name = cltbids.replace_entity_value(stransf_name, {"desc": "iwarp"})
else:
affine_name = stransf_name + "_desc-affine"
nl_name = stransf_name + "_desc-warp"
invnl_name = stransf_name + "_desc-iwarp"
affine_transf = os.path.join(stransf_dir, affine_name + "_xfm.mat")
nl_transf = os.path.join(stransf_dir, nl_name + "_xfm.nii.gz")
invnl_transf = os.path.join(stransf_dir, invnl_name + "_xfm.nii.gz")
# Check if out_image is not computed and force is True
if not os.path.isfile(out_image) or force:
if not os.path.isfile(affine_transf):
print("The spatial transformation file does not exist.")
sys.exit()
if os.path.isfile(invnl_transf) and os.path.isfile(nl_transf):
if invert:
bashargs_transforms = [
"-t",
invnl_transf,
"-t",
"[" + affine_transf + ",1]",
]
else:
bashargs_transforms = ["-t", nl_transf, "-t", affine_transf]
else:
if invert:
bashargs_transforms = ["-t", "[" + affine_transf + ",1]"]
else:
bashargs_transforms = ["-t", affine_transf]
# Creating the command
cmd_bashargs = [
"antsApplyTransforms",
"-e",
"3",
"-i",
in_image,
"-r",
ref_image,
"-o",
out_image,
"-n",
interp_cad,
]
cmd_bashargs.extend(bashargs_transforms)
# Running containerization
cmd_cont = cltmisc.generate_container_command(
cmd_bashargs, cont_tech, cont_image
) # Generating container command
out_cmd = subprocess.run(
cmd_cont, stdout=subprocess.PIPE, universal_newlines=True
)
####################################################################################################
[docs]
def vox2mm(vox_coords, affine) -> np.ndarray:
"""
Convert voxel coordinates to millimeter coordinates using affine matrix.
Parameters
----------
vox_coords : np.ndarray
Matrix with voxel coordinates (N x 3).
affine : np.ndarray
4x4 affine transformation matrix.
Returns
-------
np.ndarray
Matrix with millimeter coordinates (N x 3).
Raises
------
ValueError
If input matrix doesn't have 3 columns.
Examples
--------
>>> # Convert voxel coordinates to mm
>>> vox_coords = np.array([[10, 20, 30], [15, 25, 35]])
>>> mm_coords = vox2mm(vox_coords, img.affine)
>>> print(f"MM coordinates: {mm_coords}")
"""
# Detect if the number of rows is bigger than the number of columns. If not, transpose the matrix
nrows = np.shape(vox_coords)[0]
ncols = np.shape(vox_coords)[1]
if (nrows < ncols) and (ncols > 3):
vox_coords = np.transpose(vox_coords)
if np.shape(vox_coords)[1] == 3:
npoints = np.shape(vox_coords)
vox_coords = np.c_[vox_coords, np.full(npoints[0], 1)]
mm_coords = np.matmul(affine, vox_coords.T)
mm_coords = np.transpose(mm_coords)
mm_coords = mm_coords[:, :3]
else:
# Launch an error if the number of columns is different from 3
raise ValueError("The number of columns of the input matrix must be 3")
return mm_coords
####################################################################################################
[docs]
def mm2vox(mm_coords, affine) -> np.ndarray:
"""
Convert millimeter coordinates to voxel coordinates using affine matrix.
Parameters
----------
mm_coords : np.ndarray
Matrix with millimeter coordinates (N x 3).
affine : np.ndarray
4x4 affine transformation matrix.
Returns
-------
np.ndarray
Matrix with voxel coordinates (N x 3).
Raises
------
ValueError
If input matrix doesn't have 3 columns.
Examples
--------
>>> # Convert mm coordinates to voxels
>>> mm_coords = np.array([[45.5, -12.3, 78.9]])
>>> vox_coords = mm2vox(mm_coords, img.affine)
>>> print(f"Voxel coordinates: {vox_coords}")
"""
# Convert to homogeneous coordinates (add column of ones)
n_points = mm_coords.shape[0]
coords_homogeneous = np.column_stack([mm_coords, np.ones(n_points)])
# Apply inverse affine transformation
# (to go from world coordinates to voxel coordinates)
affine_inv = np.linalg.inv(affine)
coords_vox_homogeneous = coords_homogeneous @ affine_inv.T
# Remove the homogeneous coordinate (last column)
coords_vox = coords_vox_homogeneous[:, :3]
return coords_vox
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 5: Methods to work with 4D Images ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
def merge_to_4d(
file_paths: Union[str, Path, List[Union[str, Path]]],
output_path: Optional[Union[str, Path]] = None,
check_affine: bool = True,
) -> Tuple[nib.Nifti1Image, Optional[Dict]]:
"""
Merge multiple 3D or 4D NIfTI files into a single 4D NIfTI file.
Accepts any mix of 3D (x, y, z) and 4D (x, y, z, n) volumes and
concatenates them along the 4th dimension. Sidecar files are handled
as follows:
- **.json**: merged whenever *all* inputs have an associated JSON file,
regardless of modality. Fields identical across files are kept as
scalars; differing fields use the value from the first file.
- **.bvec / .bval**: concatenated when *all* inputs have both files
(DWI detection). The number of directions is cross-checked against
the volume count of each NIfTI.
Parameters
----------
file_paths : str, Path, or list of str/Path
Path(s) to 3D or 4D NIfTI files to be merged. All files must share
the same spatial dimensions (x, y, z).
output_path : str or Path, optional
Path where the merged 4D NIfTI file should be saved. Sidecar files
(.bvec, .bval, .json) are written to the same directory using the
same stem when applicable. If None, nothing is written to disk.
check_affine : bool, default True
Whether to verify that all inputs share the same affine matrix.
Raises ValueError on mismatch when True.
Returns
-------
merged_img : nibabel.Nifti1Image
A 4D NIfTI image where the 4th dimension is the concatenation of all
input volumes in the order provided.
metadata : dict or None
Populated whenever any sidecar is detected; None otherwise. May
contain:
- ``"bvecs"`` : np.ndarray, shape (3, N) — concatenated gradient
directions (DWI only).
- ``"bvals"`` : np.ndarray, shape (N,) — concatenated b-values
(DWI only).
- ``"json"`` : dict — merged JSON sidecar (all modalities with
JSON sidecars).
Raises
------
ValueError
If spatial dimensions or affines are inconsistent, if only some inputs
have sidecar files, or if bvec/bval direction counts do not match the
volume count of the corresponding NIfTI.
FileNotFoundError
If any input NIfTI file cannot be found.
IOError
If any input file cannot be read.
Examples
--------
Mix of 3D and 4D volumes (no sidecars):
>>> merged_img, meta = merge_to_4d(['run1.nii.gz', 'run2.nii.gz'])
>>> meta is None
True
fMRI runs — JSON sidecars merged, no bvec/bval:
>>> merged_img, meta = merge_to_4d(
... ['sub-01_run-1_bold.nii.gz', 'sub-01_run-2_bold.nii.gz'],
... output_path='sub-01_bold.nii.gz',
... )
>>> 'bvecs' in meta
False
>>> 'json' in meta
True
DWI runs — bvec, bval, and JSON all concatenated:
>>> merged_img, meta = merge_to_4d(
... ['sub-01_run-1_dwi.nii.gz', 'sub-01_run-2_dwi.nii.gz'],
... output_path='sub-01_dwi.nii.gz',
... )
>>> meta['bvecs'].shape # (3, total_directions)
>>> meta['bvals'].shape # (total_directions,)
"""
# ------------------------------------------------------------------ #
# Input normalisation #
# ------------------------------------------------------------------ #
if not file_paths:
raise ValueError("file_paths cannot be empty")
if isinstance(file_paths, (str, Path)):
file_paths = [file_paths]
file_paths = [Path(fp) for fp in file_paths]
if output_path is not None:
output_path = Path(output_path)
if not output_path.parent.exists():
raise ValueError(f"Output directory does not exist: {output_path.parent}")
# ------------------------------------------------------------------ #
# Existence checks #
# ------------------------------------------------------------------ #
for fp in file_paths:
if not fp.exists():
raise FileNotFoundError(f"File not found: {fp}")
# ------------------------------------------------------------------ #
# Sidecar detection #
# ------------------------------------------------------------------ #
sidecars = [imgutils.get_sidecars_files(fp) for fp in file_paths]
has_bvec = [s["bvec"] is not None for s in sidecars]
has_bval = [s["bval"] is not None for s in sidecars]
has_json = [s["json"] is not None for s in sidecars]
is_dwi = all(has_bvec) and all(has_bval)
has_all_json = all(has_json)
partial = (
(any(has_bvec) and not all(has_bvec))
or (any(has_bval) and not all(has_bval))
or (any(has_json) and not all(has_json))
)
if partial:
missing = [
fp
for fp, bvec, bval, js in zip(file_paths, has_bvec, has_bval, has_json)
if not (bvec and bval and js)
]
raise ValueError(
"Sidecar files (.bvec/.bval/.json) were found for some—but not "
"all—input volumes. Either all inputs must have sidecars or none "
"should. Volumes missing complete sidecars:\n"
+ "\n".join(f" {fp}" for fp in missing)
)
if is_dwi:
print(
f"DWI sidecars detected for all {len(file_paths)} volumes. "
"Will concatenate .bvec and .bval files."
)
if has_all_json:
print(f"JSON sidecars detected for all {len(file_paths)} volumes. Will merge.")
# ------------------------------------------------------------------ #
# Load NIfTI volumes — accept 3D or 4D #
# ------------------------------------------------------------------ #
print(f"Loading {len(file_paths)} NIfTI files...")
try:
first_img = nib.load(file_paths[0])
first_data = first_img.get_fdata()
if first_data.ndim not in (3, 4):
raise ValueError(
f"Expected 3D or 4D data, got {first_data.ndim}D in {file_paths[0]}"
)
reference_spatial = first_data.shape[:3]
reference_affine = first_img.affine
reference_header = first_img.header.copy()
if first_data.ndim == 3:
first_data = first_data[..., np.newaxis]
data_arrays = [first_data]
except Exception as e:
raise IOError(f"Error loading first file {file_paths[0]}: {e}")
for fp in file_paths[1:]:
try:
img = nib.load(fp)
data = img.get_fdata()
if data.ndim not in (3, 4):
raise ValueError(f"Expected 3D or 4D data, got {data.ndim}D in {fp}")
if data.shape[:3] != reference_spatial:
raise ValueError(
f"Spatial shape mismatch: {fp} has shape {data.shape[:3]}, "
f"expected {reference_spatial}"
)
if check_affine and not np.allclose(
img.affine, reference_affine, atol=1e-6
):
raise ValueError(
f"Affine matrix mismatch in {fp}. "
"Set check_affine=False to ignore this check."
)
if data.ndim == 3:
data = data[..., np.newaxis]
data_arrays.append(data)
except Exception as e:
raise IOError(f"Error loading file {fp}: {e}")
# ------------------------------------------------------------------ #
# Concatenate along 4th dimension #
# ------------------------------------------------------------------ #
print("Merging volumes...")
merged_data = np.concatenate(data_arrays, axis=3)
new_header = reference_header.copy()
new_header.set_data_shape(merged_data.shape)
merged_img = nib.Nifti1Image(merged_data, reference_affine, new_header)
print(
f"Successfully merged {len(file_paths)} files into 4D image "
f"with shape {merged_data.shape}"
)
# ------------------------------------------------------------------ #
# Load and concatenate sidecars #
# ------------------------------------------------------------------ #
metadata = None
if is_dwi or has_all_json:
metadata = {}
if is_dwi:
print("Loading and concatenating DWI sidecars...")
all_bvecs = [dwiutils.load_bvecs(s["bvec"]) for s in sidecars]
all_bvals = [dwiutils.load_bvals(s["bval"]) for s in sidecars]
for fp, arr, bvecs, bvals in zip(file_paths, data_arrays, all_bvecs, all_bvals):
n_vols = arr.shape[3]
if bvecs.shape[1] != n_vols:
raise ValueError(
f"bvec columns ({bvecs.shape[1]}) do not match volume "
f"count ({n_vols}) in {fp}"
)
if bvals.shape[0] != n_vols:
raise ValueError(
f"bval entries ({bvals.shape[0]}) do not match volume "
f"count ({n_vols}) in {fp}"
)
metadata["bvecs"] = np.concatenate(all_bvecs, axis=1) # (3, total_N)
metadata["bvals"] = np.concatenate(all_bvals, axis=0) # (total_N,)
print(
f"DWI concatenation complete — "
f"{metadata['bvecs'].shape[1]} total directions, "
f"b-value range: [{metadata['bvals'].min():.0f}, {metadata['bvals'].max():.0f}]"
)
if has_all_json:
print("Merging JSON sidecars...")
metadata["json"] = imgutils.merge_json_files([s["json"] for s in sidecars])
# ------------------------------------------------------------------ #
# Save outputs #
# ------------------------------------------------------------------ #
if output_path is not None:
print(f"Saving merged 4D volume to: {output_path}")
nib.save(merged_img, output_path)
if metadata:
stem = output_path.name
for suffix in (".nii.gz", ".nii"):
if stem.endswith(suffix):
stem = stem[: -len(suffix)]
break
out_dir = output_path.parent
if is_dwi:
bvec_out = out_dir / f"{stem}.bvec"
bval_out = out_dir / f"{stem}.bval"
dwiutils.save_bvecs(metadata["bvecs"], bvec_out)
dwiutils.save_bvals(metadata["bvals"], bval_out)
print(f"Saved DWI sidecars:\n {bvec_out}\n {bval_out}")
if has_all_json:
json_out = out_dir / f"{stem}.json"
with open(json_out, "w") as f:
json.dump(metadata["json"], f, indent=2)
print(f"Saved merged JSON sidecar:\n {json_out}")
return output_path, metadata
else:
return merged_img.data, metadata
#####################################################################################################
[docs]
def create_spams(
in_parcs: Union[List[Path], List[str]],
out_spams: Union[str, Path],
lut_table: Union[str, Path, dict],
save_color_spams: bool = False,
rgb255: bool = True,
):
"""
Create SPAM probability maps from multiple parcellation images.
Generates spatial probability maps (SPAMs) from a list of parcellation
images, using a lookup table (LUT) for region definitions and colors.
Parameters
----------
in_parcs : list of str or Path
List of file paths to input parcellation images.
out_spams : str or Path
File path for the output SPAM image.
lut_table : str, Path, or dict
Path to LUT file (TSV or LUT format) or a dictionary with 'index',
'name', and 'color' keys defining regions.
save_color_spams : bool, optional
Whether to save a color-coded SPAM image. Default is False.
rgb255 : bool, optional
Whether to scale RGB colors to 0-255 range. Default is True.
Raises
------
ValueError
If input parameters are invalid.
Examples
--------
>>> # Create SPAMs from parcellations with LUT file
>>> create_spams(
... in_parcs=['subj1_parc.nii.gz', 'subj2_parc.nii.gz'],
... out_spams='group_spams.nii.gz
... lut_table='parc_lut.tsv',
... save_color_spams=True,
... rgb255=True
... )
"""
# Check if the in_parcs is a list
if not isinstance(in_parcs, list):
raise ValueError("The 'in_parcs' parameter must be a list of file paths.")
# Unify and check if all the file exist
in_parcs_checked = []
for parc in in_parcs:
if isinstance(parc, str):
parc_path = Path(parc)
elif isinstance(parc, Path):
parc_path = parc
else:
raise ValueError("Each item in 'in_parcs' must be a string or Path object.")
if not parc_path.exists():
raise FileNotFoundError(f"The file {parc_path} does not exist.")
in_parcs_checked.append(parc_path)
# Load the image with nibabel to check
for i in range(len(in_parcs_checked)):
# Load the image with nibabel to check
img = nib.load(str(in_parcs[i]))
data = img.get_fdata()
# Create a 4D volume with 0s with the same shape as the original and number of subjects as 4th dimension
if i == 0:
all_subj = np.zeros(data.shape + (len(in_parcs_checked),))
all_subj[..., i] = data
# Read the LUT file
# Check if str or Path and if it exists
if lut_table is None:
sts_ids = np.unique(all_subj)
lut_dict = cltcol.create_lut_dictionary(sts_ids)
else:
if isinstance(lut_table, dict):
# Check if the keys are correct
required_keys = {"index", "name", "color"}
if not required_keys.issubset(lut_table.keys()):
raise ValueError(
f"The LUT dictionary must contain the keys: {required_keys}"
)
else:
if isinstance(lut_table, (str, Path)):
lut_table = Path(lut_table)
if not lut_table.exists():
lut_dict = cltcol.create_lut_dictionary(np.unique(all_subj))
else:
with open(lut_table, "r") as f:
first_line = f.readline().strip()
# TSV format has tab-separated header: index, name, color
if "\t" in first_line and "index" in first_line.lower():
lut_dict = cltparc.Parcellation.read_tsvtable(
in_file=str(lut_table)
)
else:
lut_dict = cltcol.ColorTableLoader.read_luttable(
str(lut_table)
)
sts_ids = lut_dict["index"]
sts_names = lut_dict["name"]
sts_colors = lut_dict["color"]
sts_colors = cltcol.multi_hex2rgb(sts_colors)
if rgb255:
sts_colors = cltcol.harmonize_colors(sts_colors, output_format="rgb")
else:
sts_colors = cltcol.harmonize_colors(sts_colors, output_format="rgbnorm")
# Create the SPAM image
spam_image = create_spams_from_volume(all_subj, sts_ids)
# Save the SPAM image
# Check if the output directory exists
if isinstance(out_spams, str):
out_spams = Path(out_spams)
# If the directory does not exist, lunch an error
out_dir = out_spams.parent
if not out_dir.exists():
raise ValueError(f"The output directory {out_dir} does not exist.")
spam_nifti = nib.Nifti1Image(spam_image, img.affine)
nib.save(spam_nifti, str(out_spams))
# Save the color SPAM image if required
if save_color_spams:
spams_dim = spam_image.shape
color_spam_image = np.zeros((spams_dim[0], spams_dim[1], spams_dim[2], 3))
# Take the same name and and colored after the original basename
colored_spam_name = os.path.splitext(out_spams.name)[0] + "_colored.nii.gz"
# Full path
color_spam_path = out_spams.with_name(colored_spam_name)
for i, vol_index in enumerate(sts_ids):
color = sts_colors[i]
color_spam_image[..., 0] = (
color_spam_image[..., 0] + spam_image[..., i] * color[0]
)
color_spam_image[..., 1] = (
color_spam_image[..., 1] + spam_image[..., i] * color[1]
)
color_spam_image[..., 2] = (
color_spam_image[..., 2] + spam_image[..., i] * color[2]
)
color_spam_nifti = nib.Nifti1Image(color_spam_image, img.affine)
color_spam_path = os.path.join(out_dir, colored_spam_name)
nib.save(color_spam_nifti, str(color_spam_path))
####################################################################################################
[docs]
def spams2maxprob(
spam_image: str,
prob_thresh: float = 0.05,
vol_indexes: np.array = None,
maxp_name: str = None,
):
"""
Convert SPAM probability maps to maximum probability parcellation.
Transforms 4D spatial probability maps into discrete 3D parcellation by
selecting the most probable label at each voxel, with optional thresholding
and volume selection.
Parameters
----------
spam_image : str
Path to 4D SPAM image file with probability maps for each region.
prob_thresh : float, optional
Minimum probability threshold. Values below this are set to zero.
Default is 0.05.
vol_indexes : np.ndarray, optional
Indices of volumes (regions) to include. If None, uses all volumes.
Default is None.
maxp_name : str, optional
Output file path for maximum probability image. If None, returns
array without saving. Default is None.
Returns
-------
str or np.ndarray
Output file path if maxp_name provided, otherwise numpy array
with maximum probability labels.
Notes
-----
The conversion process:
1. Applies probability threshold to remove low-confidence voxels
2. Optionally filters to specific volume indices
3. Finds maximum probability across volumes for each voxel
4. Assigns winner-take-all labels (1-indexed)
5. Sets background where no probabilities exceed threshold
Useful for converting probabilistic atlases to discrete parcellations
for analysis requiring discrete labels.
Examples
--------
>>> # Convert SPAM to discrete parcellation
>>> maxprob_file = spams2maxprob(
... 'AAL_SPAM.nii.gz',
... prob_thresh=0.1,
... maxp_name='AAL_discrete.nii.gz'
... )
>>>
>>> # Get array without saving, specific regions only
>>> selected_regions = np.array([0, 1, 2, 5, 6]) # Volume indices
>>> maxprob_array = spams2maxprob(
... 'probabilistic_atlas.nii.gz',
... vol_indexes=selected_regions,
... prob_thresh=0.2
... )
>>> print(f"Max label: {maxprob_array.max()}")
"""
spam_img = nib.load(spam_image)
affine = spam_img.affine
spam_vol = spam_img.get_fdata()
spam_vol[spam_vol < prob_thresh] = 0
spam_vol[spam_vol > 1] = 1
if vol_indexes is not None:
# Creating the maxprob
# I want to find the complementary indexes to vol_indexes
all_indexes = np.arange(0, spam_vol.shape[3])
set1 = set(all_indexes)
set2 = set(vol_indexes)
# Find the symmetric difference
diff_elements = set1.symmetric_difference(set2)
# Convert the result back to a NumPy array if needed
diff_array = np.array(list(diff_elements))
spam_vol[:, :, :, diff_array] = 0
# array_data = np.delete(spam_vol, diff_array, 3)
ind = np.where(np.sum(spam_vol, axis=3) == 0)
maxprob_thl = spam_vol.argmax(axis=3) + 1
maxprob_thl[ind] = 0
if maxp_name is not None:
# Save the image
imgcoll = nib.Nifti1Image(maxprob_thl.astype("int16"), affine)
nib.save(imgcoll, maxp_name)
else:
maxp_name = maxprob_thl
return maxp_name
#####################################################################################################
[docs]
def simulate_image(
input_image: Union[str, nib.Nifti1Image],
simulated_image: str = None,
n_volumes: int = 3,
distribution: str = "normal",
random_seed: Optional[int] = None,
**dist_params,
) -> nib.Nifti1Image:
"""
Generate a simulated image with random values at non-zero voxel positions.
This function creates a new NIfTI image where non-zero voxels from the input
image are filled with random values following a specified statistical distribution.
The output preserves the spatial dimensions, affine transformation, and header
information from the input image.
This is useful for simulating functional or structural images based on a mask
or anatomical image, allowing for controlled random value generation in specific
regions of interest.
Parameters
----------
input_image : str or nibabel.Nifti1Image
Input image used as a mask. Can be either a file path to a NIfTI image
or a nibabel Nifti1Image object.
simulated_image : str
Output file path where the simulated image will be saved. Must include
the .nii or .nii.gz extension. If None, the function will generate a
default name based on the input image.
n_volumes : int, default=3
Number of volumes in the output image:
- If n_volumes == 1: creates a 3D image
- If n_volumes > 1: creates a 4D image with n_volumes timepoints
distribution : str, default='normal'
Statistical distribution for random value generation. Supported options:
- 'normal': Normal (Gaussian) distribution
- 'uniform': Uniform distribution
- 'exponential': Exponential distribution
random_seed : int, optional
Random seed for reproducible results. If None, uses system time.
**dist_params : dict
Distribution-specific parameters:
- For 'normal': loc (mean, default=0), scale (std, default=1)
- For 'uniform': low (default=0), high (default=1)
- For 'exponential': scale (default=1)
Returns
-------
nibabel.Nifti1Image
The simulated image object with the same spatial properties as input.
Raises
------
FileNotFoundError
If input_image path does not exist.
ValueError
If input_image is not a valid type, distribution is unsupported,
n_volumes is invalid, or output directory doesn't exist.
RuntimeError
If no non-zero voxels are found in the input image.
Examples
--------
>>> # Create 3D simulation with normal distribution
>>> sim_img = simulate_image(
... 'brain_mask.nii.gz',
... 'output_3d.nii.gz',
... n_volumes=1,
... distribution='normal',
... loc=0,
... scale=1,
... random_seed=42
... )
>>> # Create 4D simulation with uniform distribution
>>> sim_img = simulate_image(
... 'brain_mask.nii.gz',
... 'output_4d.nii.gz',
... n_volumes=10,
... distribution='uniform',
... low=0,
... high=100
... )
Notes
-----
- Only voxels with non-zero values in the input image will contain random values
- All other voxels remain zero in the output
- The function preserves the input image's affine transformation and header
- Output file extension should be .nii or .nii.gz
"""
# Input validation
if not isinstance(n_volumes, int) or n_volumes < 1:
raise ValueError("n_volumes must be a positive integer")
if distribution not in ["normal", "uniform", "exponential"]:
raise ValueError(
f"Unsupported distribution '{distribution}'. "
"Supported: 'normal', 'uniform', 'exponential'"
)
# Set random seed if provided
if random_seed is not None:
np.random.seed(random_seed)
# Load and validate input image
if isinstance(input_image, str):
if not os.path.exists(input_image):
raise FileNotFoundError(f"Input image file not found: {input_image}")
try:
input_img = nib.load(input_image)
except Exception as e:
raise ValueError(f"Failed to load input image: {e}")
elif isinstance(input_image, nib.Nifti1Image):
input_img = input_image
else:
raise ValueError(
"input_image must be a file path (str) or nibabel.Nifti1Image object"
)
# Validate simulated_image path. If None, create a temporary filename
if simulated_image is None:
simulated_image = cltmisc.create_temporary_filename()
# Validate output path
output_dir = os.path.dirname(os.path.abspath(simulated_image))
if output_dir and not os.path.exists(output_dir):
raise ValueError(f"Output directory does not exist: {output_dir}")
if not simulated_image.endswith((".nii", ".nii.gz")):
warnings.warn(
"Output filename should have .nii or .nii.gz extension", UserWarning
)
# Extract image properties
input_data = input_img.get_fdata()
affine = input_img.affine.copy()
header = input_img.header.copy()
original_shape = input_data.shape
# Create mask for non-zero voxels
mask = input_data != 0
n_nonzero_voxels = np.sum(mask)
if n_nonzero_voxels == 0:
raise RuntimeError("No non-zero voxels found in input image")
# Distribution parameter validation and random value generation
def _generate_random_values(size: int) -> np.ndarray:
"""Generate random values based on specified distribution."""
try:
if distribution == "normal":
loc = dist_params.get("loc", 0.0)
scale = dist_params.get("scale", 1.0)
if scale <= 0:
raise ValueError(
"Scale parameter for normal distribution must be positive"
)
return np.random.normal(loc, scale, size)
elif distribution == "uniform":
low = dist_params.get("low", 0.0)
high = dist_params.get("high", 1.0)
if low >= high:
raise ValueError(
"Low parameter must be less than high parameter for uniform distribution"
)
return np.random.uniform(low, high, size)
elif distribution == "exponential":
scale = dist_params.get("scale", 1.0)
if scale <= 0:
raise ValueError(
"Scale parameter for exponential distribution must be positive"
)
return np.random.exponential(scale, size)
except Exception as e:
raise ValueError(f"Error generating random values: {e}")
# Create simulated data array
if n_volumes == 1:
# 3D output
simulated_data = np.zeros(original_shape, dtype=np.float32)
simulated_data[mask] = _generate_random_values(n_nonzero_voxels)
output_shape = original_shape
else:
# 4D output
spatial_shape = (
original_shape[:3] if len(original_shape) >= 3 else original_shape
)
output_shape = spatial_shape + (n_volumes,)
simulated_data = np.zeros(output_shape, dtype=np.float32)
# Fill each volume with independent random values
for volume_idx in range(n_volumes):
simulated_data[..., volume_idx][mask] = _generate_random_values(
n_nonzero_voxels
)
# Update header for correct dimensionality
header_copy = header.copy()
if n_volumes == 1 and len(original_shape) > 3:
# Convert from 4D+ to 3D
header_copy.structarr["dims"][4] = 1
elif n_volumes > 1:
# Ensure 4D header
header_copy.structarr["dims"][3] = n_volumes
# Create output image
try:
simulated_img = nib.Nifti1Image(simulated_data, affine, header_copy)
nib.save(simulated_img, simulated_image)
except Exception as e:
raise RuntimeError(f"Failed to create or save simulated image: {e}")
# Log summary information
print(f"Successfully created simulated image:")
print(f" Input shape: {original_shape}")
print(f" Output shape: {output_shape}")
print(f" Non-zero voxels: {n_nonzero_voxels:,}")
print(f" Distribution: {distribution}")
print(f" Saved to: {simulated_image}")
return simulated_img
#####################################################################################################
[docs]
def delete_volumes_from_4D_images(
in_image: str,
out_image: str,
vols_to_delete: List[Union[int, tuple, list, str, np.ndarray]] = None,
overwrite: bool = False,
) -> Tuple[str, List[int]]:
"""
Remove specific volumes from a 4D neuroimaging file.
This function removes specified volumes from 4D NIfTI images (such as fMRI time series,
DTI volumes, or other 4D datasets) and saves the result to a new file. It supports
flexible volume specification including individual indices, ranges, and complex
string-based patterns.
Parameters
----------
in_image : str
Path to the input 4D NIfTI image file (.nii or .nii.gz).
The file must exist and be a valid 4D image.
out_image : str
Path where the output 4D image will be saved. The directory must exist.
If the file already exists, use `overwrite=True` to replace it.
vols_to_delete : list of int, tuple, list, np.ndarray, or str
Specification of volumes to remove from the 4D image. Supports multiple formats:
**Individual integers:**
Single volume indices to remove (0-based indexing).
**Tuples of 2 integers:**
Ranges specified as (start, end) - both endpoints are included.
Example: (5, 8) removes volumes [5, 6, 7, 8]
**Lists or numpy arrays:**
Collections of volume indices, automatically flattened.
**Strings (flexible syntax):**
Powerful string-based specification supporting:
- Single numbers: "5" → [5]
- Hyphen ranges: "8-10" → [8, 9, 10]
- Colon ranges: "11:13" → [11, 12, 13]
- Step ranges: "14:2:22" → [14, 16, 18, 20, 22]
- Comma-separated: "1, 2, 3" → [1, 2, 3]
- Mixed combinations: "0-2, 5, 10:2:14, 20" → [0, 1, 2, 5, 10, 12, 14, 20]
**Note:** All formats can be mixed in a single list.
overwrite : bool, default=False
Whether to overwrite the output file if it already exists.
If False and the output file exists, raises FileExistsError.
Returns
-------
out_image : str
Path to the successfully created output image file.
vols_removed : list of int
Sorted list of all volume indices that were removed from the original image.
Useful for verification and logging purposes.
Raises
------
FileNotFoundError
- If the input image file does not exist
- If the output directory does not exist
ValueError
- If vols_to_delete is None or empty after parsing
- If the input image is not 4D (wrong number of dimensions)
- If any volume indices are out of range (negative or >= number of volumes)
FileExistsError
If the output file already exists and overwrite=False.
RuntimeError
If attempting to delete all volumes (would result in empty image).
Notes
-----
- Volume indexing is 0-based (first volume is index 0)
- Duplicate volume indices are automatically removed
- The function preserves the original image's affine transformation and header
- Memory usage scales with image size; very large 4D images may require substantial RAM
- The function validates all volume indices before processing to prevent partial failures
**Performance considerations:**
- Loading and processing large 4D images may take significant time and memory
- Consider processing in chunks for very large datasets
**File format support:**
- Input: .nii and .nii.gz files
- Output: Format determined by output filename extension
Examples
--------
**Basic usage - Remove specific volumes:**
>>> delete_volumes_from_4D_images(
... 'fmri_data.nii.gz',
... 'fmri_cleaned.nii.gz',
... vols_to_delete=[0, 1, 2, 99],
... overwrite=True
... )
**Remove ranges using tuples:**
>>> delete_volumes_from_4D_images(
... 'dti_data.nii.gz',
... 'dti_subset.nii.gz',
... vols_to_delete=[(0, 4), (95, 99)], # Remove first 5 and last 5 volumes
... overwrite=True
... )
**String-based range specification:**
>>> delete_volumes_from_4D_images(
... 'timeseries.nii.gz',
... 'timeseries_clean.nii.gz',
... vols_to_delete=["0-4", "95:99"], # Same as above using strings
... overwrite=True
... )
**Complex mixed specification:**
>>> delete_volumes_from_4D_images(
... 'bold_data.nii.gz',
... 'bold_processed.nii.gz',
... vols_to_delete=[
... "0-2", # Remove first 3 volumes (motion artifacts)
... (147, 149), # Remove volumes 147-149 (spike artifacts)
... "200:5:220", # Remove every 5th volume from 200-220
... [300, 301, 302], # Remove specific outlier volumes
... "450" # Remove final volume
... ],
... overwrite=True
... )
**Advanced string patterns:**
>>> # Remove multiple ranges and individual volumes in one string
>>> delete_volumes_from_4D_images(
... 'input.nii.gz',
... 'output.nii.gz',
... vols_to_delete=["0-2, 5, 10:2:14, 20-22, 50"],
... overwrite=True
... )
>>> # This removes: [0,1,2,5,10,12,14,20,21,22,50]
"""
# Check if input file exists
if not os.path.isfile(in_image):
raise FileNotFoundError(f"File {in_image} not found.")
# Check if volumes to delete is specified
if vols_to_delete is None:
raise ValueError(
"vols_to_delete parameter is required. Please specify which volumes to remove."
)
# Ensure vols_to_delete is a list
if not isinstance(vols_to_delete, list):
vols_to_delete = [vols_to_delete]
# Convert vols_to_delete to a flat list of integers
vols_to_delete = cltmisc.build_indices(vols_to_delete, nonzeros=False)
# Check if vols_to_delete is not empty
if len(vols_to_delete) == 0:
print("No volumes to delete. The volumes to delete list is empty.")
return in_image, []
# Create output filename if not specified
out_path = os.path.dirname(out_image)
# Check if the output path exists otherwise give an error
if out_path and not os.path.exists(out_path):
raise FileNotFoundError(f"Output directory {out_path} does not exist.")
if os.path.exists(out_image) and not overwrite:
raise FileExistsError(
f"Output file {out_image} already exists. Use 'overwrite=True' to replace it."
)
# Load the 4D image
img = nib.load(in_image)
# Get the dimensions of the image
dim = img.shape
# Check if the image is 4D
if len(dim) != 4:
raise ValueError(
f"Image {in_image} is not a 4D image. It has {len(dim)} dimensions."
)
# Get the number of volumes
nvols = dim[3]
# Check if trying to delete all volumes
if len(vols_to_delete) == nvols:
print(
"Number of volumes to delete is equal to the total number of volumes. No volumes will be deleted."
)
return in_image, []
# Check if volumes to delete are in valid range
if np.max(vols_to_delete) >= nvols:
vols_to_delete_array = np.array(vols_to_delete)
out_of_range = np.where(vols_to_delete_array >= nvols)[0]
raise ValueError(
f"Volumes out of range: {vols_to_delete_array[out_of_range]}. "
f"Values should be between 0 and {nvols-1}."
)
if np.min(vols_to_delete) < 0:
raise ValueError(
f"Volumes to delete {vols_to_delete} contain negative indices. "
f"Values should be between 0 and {nvols-1}."
)
# Get volumes to keep and remove
vols_to_remove = np.array(vols_to_delete)
vols_to_keep = np.where(np.isin(np.arange(nvols), vols_to_remove, invert=True))[0]
# Load image data
img_data = img.get_fdata()
affine = img.affine
header = img.header
# Remove the specified volumes
filtered_data = img_data[:, :, :, vols_to_keep]
# Create new image with filtered data
new_img = nib.Nifti1Image(filtered_data, affine, header)
# Save the new image
nib.save(new_img, out_image)
print(f"Successfully removed {len(vols_to_remove)} volumes from {in_image}")
print(f"Output saved to: {out_image}")
print(f"Volumes removed: {vols_to_remove.tolist()}")
return out_image, vols_to_remove.tolist()
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 6: Methods to perform operations from 3D or 4D arrays ############
############ ############
############ ############
####################################################################################################
####################################################################################################
#####################################################################################################
####################################################################################################
[docs]
def create_spams_from_volume(
indiv_parc: np.ndarray, sts_ids: Union[List[int], np.ndarray]
) -> np.ndarray:
"""
Create SPAMs (Spatial Probability Maps) from individual parcellation volumes.
Parameters
----------
indiv_parc : numpy.ndarray
4D array with dimensions (X, Y, Z, N) where N is the number of subjects.
Each voxel contains integer labels representing different structures.
sts_ids : list or numpy.ndarray
List or array of integer structure IDs for which to create SPAMs.
Returns
-------
numpy.ndarray
4D array with dimensions (X, Y, Z, M) where M is the number of structure IDs.
Each voxel contains the proportion of subjects that have the corresponding
structure ID at that voxel.
Raises
------
ValueError
If indiv_parc is not a 4D numpy array.
ValueError
If sts_ids is not a list or numpy array of integers.
Notes
-----
- The function computes the proportion of subjects that have each specified
structure ID at each voxel.
- The output SPAMs can be used for group-level analyses in neuroimaging studies.
Examples
--------
>>> # Example usage
>>> indiv_parc = np.random.randint(0, 5, size=(64, 64, 64, 10)) # 10 subjects with labels 0-4
>>> sts_ids = [1, 2, 3]
>>> spams = create_spams_from_volume(indiv_parc, sts_ids)
>>> print(spams.shape) # Output shape will be (64, 64, 64, 3)
"""
# Validate inputs
if not isinstance(indiv_parc, np.ndarray) or indiv_parc.ndim != 4:
raise ValueError("indiv_parc must be a 4D numpy array (X, Y, Z, N).")
if not isinstance(sts_ids, (list, np.ndarray)) or not all(
isinstance(id, int) for id in sts_ids
):
raise ValueError("sts_ids must be a list or numpy array of integers.")
# Creating the SPAMs
# Get the dimensions of the data
data_shape = indiv_parc.shape
spam_image = np.zeros(data_shape[0:3] + (len(sts_ids),), dtype=np.float32)
for cont, sts_id in enumerate(sts_ids):
tmp_sts_img = indiv_parc == sts_id
# Convert to logical and sum across subjects
tmp_sts_img = tmp_sts_img.astype(np.int16)
sum_img = np.sum(tmp_sts_img, axis=3)
# Divide by number of subjects to get proportion
spam_image[:, :, :, cont] = sum_img / data_shape[3]
return spam_image
#####################################################################################################
[docs]
def spams2maxprob_from_volume(
spam_vol: np.ndarray,
prob_thresh: float = 0.0,
vol_indexes: Union[List[int], np.ndarray] = None,
) -> np.ndarray:
"""
Convert SPAMs (Spatial Probability Maps) to a maximum probability parcellation volume.
Parameters
----------
spam_vol : numpy.ndarray
4D array with dimensions (X, Y, Z, M) where M is the number of structures.
Each voxel contains the probability of belonging to each structure.
prob_thresh : float, optional
Probability threshold to apply before determining maximum probability.
Voxels with probabilities below this threshold will be set to zero. Default is 0.0.
vol_indexes : list or numpy.ndarray, optional
List or array of integer structure indexes to consider for maximum probability.
If provided, only these structures will be considered. Default is None.
Returns
-------
numpy.ndarray
3D array with dimensions (X, Y, Z) where each voxel contains the index of the structure
with the maximum probability, or 0 if below the threshold.
Raises
------
ValueError
If spam_vol is not a 4D numpy array.
Notes
-----
- The function applies a probability threshold to the SPAMs and then determines
the structure with the maximum probability at each voxel.
- Voxels with probabilities below the threshold are assigned a value of 0 in the output
- If vol_indexes is provided, only those structures are considered for maximum probability.
Examples
--------
>>> # Example usage
>>> spam_vol = np.random.rand(64, 64, 64, 5) # SPAMs for 5 structures
>>> maxprob_vol = spams2maxprob_from_volume(spam_vol, prob_thresh=0.2)
>>> print(maxprob_vol.shape) # Output shape will be (64, 64, 64)
"""
# Validate inputs
if not isinstance(spam_vol, np.ndarray) or spam_vol.ndim != 4:
raise ValueError("spam_vol must be a 4D numpy array (X, Y, Z, M).")
spam_vol[spam_vol < prob_thresh] = 0
spam_vol[spam_vol > 1] = 1
if vol_indexes is not None:
# Creating the maxprob
# I want to find the complementary indexes to vol_indexes
all_indexes = np.arange(0, spam_vol.shape[3])
set1 = set(all_indexes)
set2 = set(vol_indexes)
# Find the symmetric difference
diff_elements = set1.symmetric_difference(set2)
# Convert the result back to a NumPy array if needed
diff_array = np.array(list(diff_elements))
spam_vol[:, :, :, diff_array] = 0
# array_data = np.delete(spam_vol, diff_array, 3)
ind = np.where(np.sum(spam_vol, axis=3) == 0)
maxprob_thl = spam_vol.argmax(axis=3) + 1
maxprob_thl[ind] = 0
return maxprob_thl
####################################################################################################
[docs]
def compute_statistics_at_nonzero_voxels(
mask_array: np.ndarray, data_array: np.ndarray, metric: str = "mean"
) -> np.ndarray:
"""
Compute the value of certain statistic from data_array at positions where mask_array is non-zero.
Parameters
-----------
mask_array : numpy.ndarray
3D array with dimensions (N, M, P) - used to find non-zero positions
Non-zero positions in this array indicate where to compute the statistic in data_array.
data_array : numpy.ndarray
3D array (N, M, P) or 4D array (N, M, P, T) - data to compute mean from
If 3D, computes a single mean value.
If 4D, computes mean across non-zero positions for each time point T.
metric : str, optional
Metric to compute at non-zero positions. Default is "mean".
Supported metrics: "mean", "std", "var", "median", "sum", "max", "min".
Returns
--------
numpy.ndarray or float
- If data_array is 3D: returns a single float value
- If data_array is 4D: returns a 1D array of length T
Raises
-------
ValueError
If data_array is not 3D or 4D.
ValueError
If metric is not one of the supported metrics.
Notes
-----
- The function extracts values from data_array at positions where mask_array is non-zero.
- For 3D data_array, it computes the mean of all selected values.
- For 4D data_array, it computes the mean across non-zero positions for each time point,
returning a 1D array with the mean for each time point.
- If mask_array has no non-zero positions, the function will return NaN for 3D data_array
or an array of NaNs for 4D data_array.
Examples
---------
>>> # Example with 3D data_array
>>> mask = np.array([[[0, 1], [0, 0]],
... [[1, 0], [0, 1]]])
>>> data = np.array([[[1, 2], [3, 4]],
... [[5, 6], [7, 8]]])
>>> mean_value = compute_statistics_at_nonzero_voxels(mask, data)
>>> print(mean_value) # Output: 4.5 (mean of 2, 5, 6, 8)
>>>
>>> # Example with 4D data_array
>>> mask_4d = np.array([[[0, 1], [0, 0]],
... [[1, 0], [0, 1]]])
>>> data_4d = np.array([[[[1, 2], [3, 4]],
... [[5, 6], [7, 8]]],
... [[[9, 10], [11, 12]],
... [[13, 14], [15, 16]]]])
>>> mean_values_4d = compute_statistics_at_nonzero_voxels(mask_4d, data_4d)
>>> print(mean_values_4d) # Output: [ 6. 8. 10. 12.] (mean for each time point
1, 2, 3, 4)
"""
# Validate metric
metric = metric.lower()
valid_metrics = ["mean", "std", "var", "median", "sum", "max", "min"]
if metric not in valid_metrics:
raise ValueError(
f"Invalid metric '{metric}'. Supported metrics: {', '.join(valid_metrics)}"
)
# Find indices where mask_array is non-zero
nonzero_mask = mask_array != 0
# Check if data_array is 3D or 4D
if data_array.ndim == 3:
# 3D case: extract values at non-zero positions and compute mean
selected_values = data_array[nonzero_mask]
if len(selected_values) == 0:
return np.nan
if metric == "mean":
return np.mean(selected_values)
elif metric == "std":
return np.std(selected_values)
elif metric == "var":
return np.var(selected_values)
elif metric == "median":
return np.median(selected_values)
elif metric == "sum":
return np.sum(selected_values)
elif metric == "max":
return np.max(selected_values)
elif metric == "min":
return np.min(selected_values)
elif data_array.ndim == 4:
# 4D case: extract values at non-zero positions for each time point
selected_values = data_array[nonzero_mask, :] # Shape: (num_nonzero_voxels, T)
if metric == "mean":
return np.mean(selected_values, axis=0) # Mean across voxels, returns (T,)
elif metric == "std":
return np.std(selected_values, axis=0) # Std across voxels, returns (T,)
elif metric == "var":
return np.var(
selected_values, axis=0
) # Variance across voxels, returns (T,)
elif metric == "median":
return np.median(
selected_values, axis=0
) # Median across voxels, returns (T,)
elif metric == "sum":
return np.sum(selected_values, axis=0) # Sum across voxels, returns (T,)
elif metric == "max":
return np.max(selected_values, axis=0) # Max across voxels, returns (T,)
elif metric == "min":
return np.min(selected_values, axis=0) # Min across voxels, returns (T,)
else:
raise ValueError("data_array must be 3D or 4D")
#####################################################################################################
[docs]
def interpolate(
scalar_data: np.ndarray, vertices_vox: np.ndarray, interp_method: str = "linear"
) -> np.ndarray:
"""
Interpolate 3D or 4D scalar data at specified voxel coordinates using regular grid interpolation.
Parameters
----------
scalar_data : np.ndarray
3D or 4D array of scalar values to interpolate from.
vertices_vox : np.ndarray
Nx3 array of voxel coordinates where interpolation is desired.
interp_method : str, optional
Interpolation method: 'linear', 'nearest', or 'slinear'. Default is 'linear'.
Returns
-------
np.ndarray
Interpolated scalar values at the specified voxel coordinates.
Raises
------
ValueError
If scalar_data is not 3D or 4D, or if vertices_vox is not Nx3.
If interp_method is not one of the supported methods.
Notes
-----
- Uses scipy's RegularGridInterpolator for interpolation.
- Supports both 3D and 4D scalar data.
- Interpolates each volume separately if scalar_data is 4D.
- Out-of-bounds coordinates are filled with 0.
"""
# Validate inputs
if scalar_data.ndim not in [3, 4]:
raise ValueError("scalar_data must be a 3D or 4D numpy array.")
if vertices_vox.ndim != 2 or vertices_vox.shape[1] != 3:
raise ValueError("vertices_vox must be a Nx3 numpy array.")
if interp_method not in ["linear", "nearest", "slinear"]:
raise ValueError("interp_method must be 'linear', 'nearest', or 'slinear'.")
# Define grid points based on scalar_data shape # Creating interpolation function
x = np.arange(scalar_data.shape[0])
y = np.arange(scalar_data.shape[1])
z = np.arange(scalar_data.shape[2])
# Interpolate scalar data at specified voxel coordinates
if scalar_data.ndim == 3:
my_interpolating_scalmap = RegularGridInterpolator(
(x, y, z),
scalar_data, # Removed .data
method=interp_method,
bounds_error=False,
fill_value=0,
)
interpolated_data = my_interpolating_scalmap(vertices_vox)
elif scalar_data.ndim == 4:
# If 4D, interpolate each volume separately and stack results
interpolated_data = np.zeros((vertices_vox.shape[0], scalar_data.shape[3]))
for t in range(scalar_data.shape[3]):
my_interpolating_scalmap = RegularGridInterpolator(
(x, y, z),
scalar_data[:, :, :, t], # Removed .data
method=interp_method,
bounds_error=False,
fill_value=0,
)
interpolated_data[:, t] = my_interpolating_scalmap(vertices_vox)
return interpolated_data
#####################################################################################################
[docs]
def region_growing(
iparc: np.ndarray, mask: Union[np.ndarray, np.bool_], neighborhood="26"
) -> np.ndarray:
"""
Fill gaps in parcellation using region growing algorithm.
Labels unlabeled voxels within the mask by assigning the most frequent
label among their labeled neighbors, iteratively until convergence or
no more voxels can be labeled.
Parameters
----------
iparc : np.ndarray
3D parcellation array with labeled (>0) and unlabeled (0) voxels.
mask : np.ndarray or np.bool_
3D binary mask defining the region where growing should occur.
neighborhood : str, optional
Neighborhood connectivity: '6', '18', or '26' for 3D. Default is '26'.
Returns
-------
np.ndarray
Updated parcellation array with gaps filled, masked to input mask.
Notes
-----
The algorithm works iteratively:
1. Identifies unlabeled voxels with at least one labeled neighbor
2. For each candidate voxel, finds most frequent label among neighbors
3. In case of ties, selects label from spatially closest neighbor
4. Repeats until no more voxels can be labeled or convergence
Particularly useful for:
- Filling gaps in atlas-based parcellations
- Completing partial segmentations
- Correcting registration artifacts
Examples
--------
>>> # Fill gaps in parcellation
>>> filled_parc = region_growing(parcellation_array, brain_mask)
>>> print(f"Filled {np.sum(filled_parc > 0) - np.sum(parcellation_array > 0)} voxels")
>>>
>>> # Use 6-connectivity for more conservative growing
>>> conservative_fill = region_growing(
... incomplete_labels,
... region_mask,
... neighborhood='6'
... )
"""
# Create a binary array where labeled voxels are marked as 1
binary_labels = (iparc > 0).astype(int)
# Convolve with the kernel to count labeled neighbors for each voxel
kernel = np.array(
[
[[1, 1, 1], [1, 1, 1], [1, 1, 1]],
[[1, 1, 1], [1, 0, 1], [1, 1, 1]],
[[1, 1, 1], [1, 1, 1], [1, 1, 1]],
]
)
labeled_neighbor_count = convolve(binary_labels, kernel, mode="constant", cval=0)
# Mask for voxels that have at least one labeled neighbor
mask_with_labeled_neighbors = (labeled_neighbor_count > 0) & (iparc == 0)
ind = np.argwhere(
(mask_with_labeled_neighbors != 0) & (binary_labels == 0) & (mask)
)
ind_orig = ind.copy() * 0
# Loop until no more voxels could be labeled or all the voxels are labeled
while (len(ind) > 0) & (np.array_equal(ind, ind_orig) == False):
ind_orig = ind.copy()
# Process each unlabeled voxel
for coord in ind:
x, y, z = coord
# Detecting the neighbors
neighbors = get_vox_neighbors(coord=coord, neighborhood="26", dims="3")
# Remove from motion the coordinates out of the bounding box
neighbors = neighbors[
(neighbors[:, 0] >= 0)
& (neighbors[:, 0] < iparc.shape[0])
& (neighbors[:, 1] >= 0)
& (neighbors[:, 1] < iparc.shape[1])
& (neighbors[:, 2] >= 0)
& (neighbors[:, 2] < iparc.shape[2])
]
# Labels of the neighbors
neigh_lab = iparc[neighbors[:, 0], neighbors[:, 1], neighbors[:, 2]]
if len(np.argwhere(neigh_lab > 0)) > 2:
# Remove the neighbors that are not labeled
neighbors = neighbors[neigh_lab > 0]
neigh_lab = neigh_lab[neigh_lab > 0]
unique_labels, counts = np.unique(neigh_lab, return_counts=True)
max_count = counts.max()
max_labels = unique_labels[counts == max_count]
if len(max_labels) == 1:
iparc[x, y, z] = max_labels[0]
else:
# In case of tie, choose the label of the closest neighbor
distances = [
distance.euclidean(coord, (dx, dy, dz))
for (dx, dy, dz), lbl in zip(neighbors, neigh_lab)
if lbl in max_labels
]
closest_label = max_labels[np.argmin(distances)]
iparc[x, y, z] = closest_label
# most_frequent_label = np.bincount(neigh_lab[neigh_lab != 0]).argmax()
# Create a binary array where labeled voxels are marked as 1
binary_labels = (iparc > 0).astype(int)
# Convolve with the kernel to count labeled neighbors for each voxel
labeled_neighbor_count = convolve(
binary_labels, kernel, mode="constant", cval=0
)
# Mask for voxels that have at least one labeled neighbor
mask_with_labeled_neighbors = (labeled_neighbor_count > 0) & (iparc == 0)
ind = np.argwhere(
(mask_with_labeled_neighbors != 0) & (binary_labels == 0) & (mask)
)
return iparc * mask
#####################################################################################################
[docs]
def simulate_array(
mask_array: np.ndarray,
n_volumes: int = 1,
distribution: str = "normal",
random_seed: Optional[int] = None,
**dist_params,
) -> np.ndarray:
"""
Generate a simulated array with random values at non-zero voxel positions.
This function creates a new array where non-zero voxels from the mask array
are filled with random values following a specified statistical distribution.
Parameters
----------
mask_array : numpy.ndarray
3D array used as a mask to determine where to place random values.
Shape: (N, M, P)
n_volumes : int, default=1
Number of volumes in the output array:
- If n_volumes == 1: creates a 3D array (N, M, P)
- If n_volumes > 1: creates a 4D array (N, M, P, n_volumes)
distribution : str, default='normal'
Statistical distribution for random value generation. Supported options:
- 'normal': Normal (Gaussian) distribution
- 'uniform': Uniform distribution
- 'exponential': Exponential distribution
random_seed : int, optional
Random seed for reproducible results. If None, uses system time.
**dist_params : dict
Distribution-specific parameters:
- For 'normal': loc (mean, default=0), scale (std, default=1)
- For 'uniform': low (default=0), high (default=1)
- For 'exponential': scale (default=1)
Returns
-------
numpy.ndarray
Simulated array with random values at non-zero mask positions:
- 3D array (N, M, P) if n_volumes == 1
- 4D array (N, M, P, n_volumes) if n_volumes > 1
Raises
------
ValueError
If mask_array is not 3D, distribution is unsupported,
n_volumes is invalid, or distribution parameters are invalid.
RuntimeError
If no non-zero voxels are found in the mask array.
Examples
--------
>>> # Create 3D simulation with normal distribution
>>> mask = np.random.randint(0, 2, (64, 64, 30))
>>> sim_3d = simulate_array(
... mask,
... n_volumes=1,
... distribution='normal',
... loc=0,
... scale=1,
... random_seed=42
... )
>>> # Create 4D simulation with uniform distribution
>>> sim_4d = simulate_array(
... mask,
... n_volumes=10,
... distribution='uniform',
... low=0,
... high=100,
... random_seed=42
... )
Notes
-----
- Only voxels with non-zero values in the mask array will contain random values
- All other voxels remain zero in the output
- For 4D outputs, each volume gets independent random values
"""
# Input validation
if not isinstance(mask_array, np.ndarray):
raise ValueError("mask_array must be a numpy array")
if mask_array.ndim != 3:
raise ValueError(f"mask_array must be 3D, got {mask_array.ndim}D")
if not isinstance(n_volumes, int) or n_volumes < 1:
raise ValueError("n_volumes must be a positive integer")
if distribution not in ["normal", "uniform", "exponential"]:
raise ValueError(
f"Unsupported distribution '{distribution}'. "
"Supported: 'normal', 'uniform', 'exponential'"
)
# Set random seed if provided
if random_seed is not None:
np.random.seed(random_seed)
# Create mask for non-zero voxels
mask = mask_array != 0
n_nonzero_voxels = np.sum(mask)
if n_nonzero_voxels == 0:
raise RuntimeError("No non-zero voxels found in mask array")
# Distribution parameter validation and random value generation
def _generate_random_values(size: int) -> np.ndarray:
"""Generate random values based on specified distribution."""
try:
if distribution == "normal":
loc = dist_params.get("loc", 0.0)
scale = dist_params.get("scale", 1.0)
if scale <= 0:
raise ValueError(
"Scale parameter for normal distribution must be positive"
)
return np.random.normal(loc, scale, size).astype(np.float32)
elif distribution == "uniform":
low = dist_params.get("low", 0.0)
high = dist_params.get("high", 1.0)
if low >= high:
raise ValueError(
"Low parameter must be less than high parameter for uniform distribution"
)
return np.random.uniform(low, high, size).astype(np.float32)
elif distribution == "exponential":
scale = dist_params.get("scale", 1.0)
if scale <= 0:
raise ValueError(
"Scale parameter for exponential distribution must be positive"
)
return np.random.exponential(scale, size).astype(np.float32)
except Exception as e:
raise ValueError(f"Error generating random values: {e}")
# Get mask shape
mask_shape = mask_array.shape
# Create simulated data array
if n_volumes == 1:
# 3D output
simulated_data = np.zeros(mask_shape, dtype=np.float32)
simulated_data[mask] = _generate_random_values(n_nonzero_voxels)
output_shape = mask_shape
else:
# 4D output
output_shape = mask_shape + (n_volumes,)
simulated_data = np.zeros(output_shape, dtype=np.float32)
# Fill each volume with independent random values
for volume_idx in range(n_volumes):
simulated_data[..., volume_idx][mask] = _generate_random_values(
n_nonzero_voxels
)
return simulated_data
#####################################################################################################
[docs]
def delete_volumes_from_4D_array(
in_array: np.ndarray,
vols_to_delete: List[Union[int, tuple, list, str, np.ndarray]] = None,
) -> Tuple[str, List[int]]:
"""
Remove specific volumes from a 4D array.
This function allows you to delete specified volumes from a 4D numpy array,
which is commonly used in neuroimaging data (e.g., fMRI, DTI). It supports
various formats for specifying which volumes to remove, including individual indices,
ranges, lists, and flexible string-based specifications.
The function returns the modified array and a list of removed volume indices.
It is designed to handle large 4D arrays efficiently and ensures that the
integrity of the remaining data is preserved.
4D arrays are expected to have the shape (X, Y, Z, T), where T is the number of volumes.
Parameters
----------
in_array : np.ndarray
Input 4D numpy array from which volumes will be removed.
vols_to_delete : list of int, tuple, list, np.ndarray, or str
Specification of volumes to remove from the 4D image. Supports multiple formats:
**Individual integers:**
Single volume indices to remove (0-based indexing).
**Tuples of 2 integers:**
Ranges specified as (start, end) - both endpoints are included.
Example: (5, 8) removes volumes [5, 6, 7, 8]
**Lists or numpy arrays:**
Collections of volume indices, automatically flattened.
**Strings (flexible syntax):**
Powerful string-based specification supporting:
- Single numbers: "5" → [5]
- Hyphen ranges: "8-10" → [8, 9, 10]
- Colon ranges: "11:13" → [11, 12, 13]
- Step ranges: "14:2:22" → [14, 16, 18, 20, 22]
- Comma-separated: "1, 2, 3" → [1, 2, 3]
- Mixed combinations: "0-2, 5, 10:2:14, 20" → [0, 1, 2, 5, 10, 12, 14, 20]
**Note:** All formats can be mixed in a single list.
Returns
-------
out_array : np.ndarray
The modified 4D numpy array with specified volumes removed.
vols_removed : list of int
Sorted list of all volume indices that were removed from the original image.
Useful for verification and logging purposes.
Raises
------
TypeError
If `in_array` is not a numpy ndarray or if `vols_to_delete` is not a list or convertible to a list.
ValueError
If `in_array` is not a 4D array, if `vols_to_delete` is empty,
or if any specified volume indices are out of range.
Notes
- The function checks for valid input types and dimensions.
- It handles both individual volume indices and ranges specified as tuples or strings.
- The output array retains the original shape minus the removed volumes.
- If all volumes are specified for deletion, it returns the original array unchanged.
Examples
--------
**Basic usage - Remove specific volumes:**
>>> delete_volumes_from_4D_array(
... in_array=np.random.rand(64, 64, 30, 100), # Example 4D array
... vols_to_delete=[0, 1, 2], # Remove first 3 volumes
... )
(array with shape (64, 64, 30, 97), [0, 1, 2])
**Remove a range of volumes:**
>>> delete_volumes_from_4D_array(
... in_array=np.random.rand(64, 64, 30, 100), # Example 4D array
... vols_to_delete=[(5, 8)], # Remove volumes 5 to 8 (inclusive)
... )
(array with shape (64, 64, 30, 96), [5, 6, 7, 8])
**Remove volumes using a string specification:**
>>> delete_volumes_from_4D_array(
... in_array=np.random.rand(64, 64, 30, 100), # Example 4D array
... vols_to_delete=["0-2", "5", "10:2:14", "20"] # Mixed specification
... )
(array with shape (64, 64, 30, 92), [0, 1, 2, 5, 10, 12, 14, 20])
"""
# Ensure input is a numpy array
if not isinstance(in_array, np.ndarray):
raise TypeError("Input in_array must be a numpy ndarray.")
# Check if it is a 4D array
if in_array.ndim != 4:
raise ValueError(
f"Input in_array must be a 4D array. It has {in_array.ndim} dimensions."
)
# Ensure vols_to_delete is a list
if not isinstance(vols_to_delete, list):
vols_to_delete = [vols_to_delete]
# Convert vols_to_delete to a flat list of integers
vols_to_delete = cltmisc.build_indices(vols_to_delete, nonzeros=False)
# Check if vols_to_delete is not empty
if len(vols_to_delete) == 0:
print("No volumes to delete. The volumes to delete list is empty.")
return in_array, []
# Get the dimensions of the array
dim = in_array.shape
# Get the number of volumes
nvols = dim[3]
# Check if trying to delete all volumes
if len(vols_to_delete) == nvols:
print(
"Number of volumes to delete is equal to the total number of volumes. No volumes will be deleted."
)
return in_array, []
# Check if volumes to delete are in valid range
if np.max(vols_to_delete) >= nvols:
vols_to_delete_array = np.array(vols_to_delete)
out_of_range = np.where(vols_to_delete_array >= nvols)[0]
raise ValueError(
f"Volumes out of range: {vols_to_delete_array[out_of_range]}. "
f"Values should be between 0 and {nvols-1}."
)
if np.min(vols_to_delete) < 0:
raise ValueError(
f"Volumes to delete {vols_to_delete} contain negative indices. "
f"Values should be between 0 and {nvols-1}."
)
# Get volumes to keep and remove
vols_to_remove = np.array(vols_to_delete)
vols_to_keep = np.where(np.isin(np.arange(nvols), vols_to_remove, invert=True))[0]
# Remove the specified volumes from the input array
out_array = in_array[:, :, :, vols_to_keep]
return out_array, list(vols_to_remove.tolist())
######################################################################################################
[docs]
def dilate_mm(
array: np.ndarray, affine: np.ndarray, shape: str = "cube", dilation_mm: float = 1
) -> np.ndarray:
"""
Binarize a 3D array and dilate by a given distance in millimeters.
Parameters
----------
array : 3D numpy array with values and zeros
affine : 4x4 affine matrix (voxel → mm)
shape : 'cube' for a box or 'sphere' for an ellipsoid structuring element
dilation_mm : dilation radius in millimeters
Returns
-------
dilated : binary 3D numpy array (bool)
"""
binary_array = array != 0
voxel_sizes = get_voxel_size(affine)
# Per-axis radii in voxels
radii_vox = np.array([dilation_mm / vs for vs in voxel_sizes])
r = np.ceil(radii_vox).astype(int)
if shape is None or shape == "cube":
# Box: include every voxel within ±r[i] along each axis
structure = np.ones([2 * ri + 1 for ri in r], dtype=bool)
else:
# Ellipsoid: anisotropic sphere in mm space
xi = np.arange(-r[0], r[0] + 1)
yi = np.arange(-r[1], r[1] + 1)
zi = np.arange(-r[2], r[2] + 1)
xx, yy, zz = np.meshgrid(xi, yi, zi, indexing="ij")
structure = (
(xx / radii_vox[0]) ** 2
+ (yy / radii_vox[1]) ** 2
+ (zz / radii_vox[2]) ** 2
) <= 1.0
dilated = binary_dilation(binary_array, structure=structure)
return dilated