Source code for clabtoolkit.dicomtools

import os
from glob import glob
import tarfile
import shutil
import pandas as pd
import sys
import pydicom
from datetime import datetime
from pathlib import Path
from shutil import copyfile
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from rich.progress import Progress
from threading import Lock
import time

from typing import List, Optional, Union

# Importing local modules
from . import misctools as cltmisc


# simple progress indicator callback function
[docs] def progress_indicator(future): """ A simple progress indicator for the concurrent futures :param future: future object """ global lock, n_dics, n_comp, pb, pb1, subj_id, dicom_files # obtain the lock with lock: # update the counter n_comp += 1 # report progress # print(f'{tasks_completed}/{n_subj} completed, {n_subj-tasks_completed} remain.') # pb.update(task_id=pb1, description= f'[red]Completed {n_comp}/{n_subj}', completed=n_subj) pb.update( task_id=pb1, description=f"[red]{subj_id}: Finished ({n_comp}/{n_dics})", completed=n_comp, )
#################################################################################################### #################################################################################################### ############ ############ ############ ############ ############ Section 1: Methods dedicated to organice and copy DICOM files ############ ############ ############ ############ ############ #################################################################################################### ####################################################################################################
[docs] def org_conv_dicoms( in_dic_dir: str, out_dic_dir: str, demog_file: str = None, ids_file: str = None, ses_id: str = None, nosub: bool = False, booldic: bool = True, boolcomp: bool = False, force: bool = False, nthreads: int = 0, ): """ This method organizes the DICOM files in sessions and series. It could also use the demographics file to define the session ID. Parameters ---------- in_dic_dir : str Directory containing the subjects. It assumes all individual folders inside the directory as individual subjects. The subjects directory should start with 'sub-' otherwise the subjects will not be considered unless the "nosub" variable is set to True. out_dic_dir : str Output directory where the organized DICOM files will be saved. A new folder called 'Dicom' will be created inside this directory. demog_file : str, optional Demographics file containing the information about the subjects. The file should contain the following mandatory columns: 'participant_id', 'session_id', 'acq_date'. Other columns such as 'birth_date', 'sex', 'group_id' or 'scanner_id' could be added. ids_file : str, optional Text file containing the list of subject IDs to be considered. The file should contain the subject IDs in a single column. ses_id : str, optional Session ID to be added to the session name. If not provided, the session ID will be the date of the study or the session ID extracted from the demographics table. nosub : bool, optional, default=False Boolean variable to consider the subjects that do not start with 'sub-'. booldic : bool, optional, default=True Boolean variable to organize the DICOM files. If False it will leave the folders as they are. boolcomp : bool, optional, default=False Boolean variable to compress the sessions containing the organized DICOM files. If True it will compress the sessions. force : bool, optional, default=False Boolean variable to force the copy of the DICOM file if the file already exists. nthreds : int, optional, default=0 Number of threads to be used in the process. Default is 0 that means automatic selection of the number of cores. Returns ------- None This method performs file organization operations and does not return a value. Raises ------ FileNotFoundError If the input directory does not exist. ValueError If the demographics file is provided but does not contain the mandatory columns. PermissionError If there are insufficient permissions to write to the output directory. Examples -------- >>> # Basic usage with input and output directories >>> organize_dicom_files('/path/to/input/dicoms', '/path/to/output') >>> # Using demographics file and custom session ID >>> organize_dicom_files( ... in_dic_dir='/path/to/input/dicoms', ... out_dic_dir='/path/to/output', ... demog_file='/path/to/demographics.csv', ... ses_id='session01' ... ) >>> # Process only specific subjects with compression >>> organize_dicom_files( ... in_dic_dir='/path/to/input/dicoms', ... out_dic_dir='/path/to/output', ... ids_file='/path/to/subject_ids.txt', ... boolcomp=True, ... nthreds=4 ... ) """ # Declaring global variables global pb, pb1, n_dics, n_comp, lock, subj_id, dicom_files # Detecting the number of cores to be used ncores = os.cpu_count() if nthreads == 0: nthreads = ncores if nthreads > 4: nthreads = nthreads - 4 else: nthreads = 1 # Listing the subject ids inside the dicom folder my_list = os.listdir(in_dic_dir) subj_ids = [] for it in my_list: if nosub == False: if "sub-" in it: subj_ids.append(it) else: subj_ids.append(it) subj_ids.sort() # If subj_ids is empty do not continue if not subj_ids: print("No subjects found in the input directory") sys.exit() if ids_file != None: if os.path.isfile(ids_file): subj_ids = cltmisc.select_ids_from_file(subj_ids, ids_file) else: s_ids = ids_file.split(",") if nosub == False: temp_ids = [s.strip("sub-") for s in subj_ids] s_ids = cltmisc.list_intercept(s_ids, temp_ids) if not s_ids: s_ids = subj_ids else: s_ids = ["sub-" + s for s in s_ids] subj_ids = s_ids # Reading demographics demobool = False # Boolean variable to use the demographics table for the session id definition if demog_file != None: if os.path.isfile(demog_file): demobool = True # Boolean variable to use the demographics table for the session id definition demoDB = pd.read_csv(demog_file) all_ser_dirs = [] cont_subj = 0 n_subj = len(subj_ids) failed_ids = [] # Creating the progress bars with Progress() as pb: pb2 = pb.add_task("[green]Subjects...", total=n_subj) for cont_subj, subj_id in enumerate(subj_ids): # Loop along the IDs # create a lock for the counter lock = Lock() n_comp = 0 failed = [] pb.update( task_id=pb2, description=f"[green]Subject: {subj_id} ({cont_subj+1}/{n_subj})", completed=cont_subj + 1, ) subj_dir = os.path.join(in_dic_dir, subj_id) if os.path.isdir(subj_dir): # Default value for these variables for each subject gendVar = "Unknown" groupVar = "Unknown" AgeatScan = "Unknown" subTB = None date_times = [] if demobool: # Sub-table containing only the selected ID subTB = demoDB[ demoDB["participant_id"].str.contains(subj_id.split("-")[-1]) ] # Date times of all the series acquired for the current subject nrows = np.shape(subTB)[0] for nr in np.arange(0, nrows): temp = subTB.iloc[nr]["acq_date"] tempVar = temp.split("/") date_time = datetime( day=int(tempVar[1]), month=int(tempVar[0]), year=int(tempVar[2]), ) date_times.append(date_time) try: if booldic: dicom_files = cltmisc.get_all_files(subj_dir) ses_idprev = [] ser_idprev = [] n_dics = len(dicom_files) if nthreads == 1: pb1 = pb.add_task( f"[red]Copying DICOMs: Subject {subj_id} ({cont_subj + 1}/{n_subj}) ", total=n_dics, ) for cont_dic, dfiles in enumerate(dicom_files): ser_dir = copy_dicom_file( dfiles, subj_id, out_dic_dir, ses_id, date_times, demobool, subTB, force, ) all_ser_dirs.append(ser_dir) pb.update( task_id=pb1, description=f"[red]Copying DICOMs: Subject {subj_id} ({cont_dic+1}/{n_dics})", completed=cont_dic + 1, ) else: # create a progress bar for the subjects pb1 = pb.add_task( f"[red]Copying DICOMs: Subject {subj_id} ({cont_subj + 1}/{n_subj}) ", total=n_dics, ) # Adjusting the number of threads to the number of subjects if n_dics < nthreads: nthreads = n_dics # start the thread pool with ThreadPoolExecutor(nthreads) as executor: # send in the tasks # futures = [executor.submit(build_parcellation, t1s[i], # bids_dir, deriv_dir, parccode, growwm) for i in range(n_subj)] futures = [ executor.submit( copy_dicom_file, dicom_files[i], subj_id, out_dic_dir, ses_id, date_times, demobool, subTB, force, ) for i in range(n_dics) ] # futures = [executor.submit(test, i) for i in range(n_dics)] # register the progress indicator callback for future in futures: future.add_done_callback(progress_indicator) # wait for all tasks to complete else: for ses_id in os.listdir(subj_dir): # Loop along the session ses_dir = os.path.join(subj_dir, ses_id) if not ses_id[-2].isalpha(): if ( demobool ): # Adding the Visit ID to the last part o the session ID only in the DICOM Folder tempVar = ses_id.split("-")[-1] sdate_time = datetime.strptime( tempVar, "%Y%m%d%H%M%S" ) timediff = np.array(date_times) - np.array( sdate_time ) clostd = np.argmin(abs(timediff)) visitVar = subTB.iloc[clostd]["session_id"] newses_id = ses_id + visitVar newses_dir = os.path.join(subj_dir, newses_id) os.rename(ses_dir, newses_dir) ses_dir = newses_dir if os.path.isdir(ses_dir): for ser_id in os.listdir( ses_dir ): # Loop along the series serDir = os.path.join(ses_dir, ser_id) if os.path.isdir(serDir): all_ser_dirs.append(serDir) except: failed_ids.append(subj_id) print("Error at subject: " + subj_id) else: print("Subject: " + subj_id + " does not exist.") # pb.update(task_id=t2, completed=cont_subj+1) # pb.update(task_id=t2, completed=n_subj) all_ser_dirs = list(set(all_ser_dirs)) all_ser_dirs.sort() if boolcomp: compress_dicom_session(out_dic_dir)
####################################################################################################
[docs] def copy_dicom_file( dic_file: str, subj_id: str, out_dic_dir: str, ses_id: str = None, date_times: list = None, demogbool: bool = False, demog_tab: pd.DataFrame = None, force: bool = False, ): """ Function to copy the DICOM files to the output directory. Parameters ----------- dic_file: str Path to the DICOM file. subj_id: str Subject ID. out_dic_dir: str Output directory where the DICOM files will be saved. ses_id: str Session ID to be added to the session name. If not provided, the session ID will be the date of the study or the session ID extracted from the demographics table. date_times: list List containing the date and time of all the studies for that subject ID. demogbool: bool Boolean variable to use the demographics table for the session id definition. demog_tab: pd.DataFrame Demographics table containing the information about the subjects. force: bool Boolean variable to force the copy of the DICOM file. Returns -------- dest_dic_dir: str Destination directory where the DICOM file was copied. """ try: dataset = pydicom.dcmread(dic_file) dic_path = os.path.dirname(dic_file) dic_name = os.path.basename(dic_file) # Extracting the study date from DICOM file attributes = dataset.dir("") if attributes: sdate = dataset.data_element("StudyDate").value stime = dataset.data_element("StudyTime").value year = int(sdate[:4]) month = int(sdate[4:6]) day = int(sdate[6:8]) # Date format sdate_time = datetime(day=day, month=month, year=year) # Creating default current Session ID ses_id, ser_id = create_session_series_names(dataset) if not ses_id == None: ses_id = "ses-" + ses_id if "000000" in ses_id and ser_id in ser_idprev: ses_id = ses_idprev # visitId = dfiles.split('/')[8].split('-')[1] # ses_id = 'ses-'+ visitId ses_idprev = ses_id ser_idprev = ser_id # Changing the session Id in case we have access to the demographics file if demogbool: timediff = np.array(date_times) - np.array(sdate_time) clostd = np.argmin(abs(timediff)) visitVar = demog_tab.iloc[clostd]["session_id"] ses_id = ses_id + visitVar dest_dic_dir = os.path.join(out_dic_dir, subj_id, ses_id, ser_id) # Create the destination path if not os.path.isdir(dest_dic_dir): path = Path(dest_dic_dir) path.mkdir(parents=True, exist_ok=True) # print(newPath) dest_dic = os.path.join(dest_dic_dir, dic_name) if force: if os.path.isfile(dest_dic): os.remove(dest_dic) else: copyfile(dic_file, dest_dic) else: if not os.path.isfile(dest_dic): copyfile(dic_file, dest_dic) except pydicom.errors.InvalidDicomError: print("Error at file at path : " + dic_file) pass return dest_dic_dir
####################################################################################################
[docs] def create_session_series_names(dataset): """ Function to create names from a DICOM object. Parameters ---------- dataset: pydicom.dataset.FileDataset DICOM dataset object. Returns ------- ses_id: str Session ID. ser_id: str Series ID. """ # % This function creates the session and the series name for a dicom object # Extracting the study date from DICOM file attributes = dataset.dir("") sdate = dataset.data_element("StudyDate").value stime = dataset.data_element("StudyTime").value ########### ========== Creating current Session ID if sdate and stime: ses_id = str(sdate) + str(int(np.floor(float(stime)))) elif sdate and not stime: ses_id = str(sdate) + "000000" elif stime and not sdate: ses_id = "00000000" + str(stime) ########### ========== Creating current Series ID if any("SeriesDescription" in s for s in attributes): ser_id = dataset.data_element("SeriesDescription").value elif any("SeriesDescription" in s for s in attributes) == False and any( "SequenceName" in s for s in attributes ): ser_id = dataset.data_element("SequenceName").value elif ( any("SeriesDescription" in s for s in attributes) == False and any("SequenceName" in s for s in attributes) == False and any("ProtocolName" in s for s in attributes) ): ser_id = dataset.data_element("ProtocolName").value elif ( any("SeriesDescription" in s for s in attributes) == False and any("SequenceName" in s for s in attributes) == False and any("ProtocolName" in s for s in attributes) == False and any("ScanningSequence" in s for s in attributes) and any("SequenceVariant" in s for s in attributes) ): ser_id = ( dataset.data_element("ScanningSequence").value + "_" + dataset.data_element("SequenceVariant").value ) else: ser_id = "NoSerName" # Removing and substituting unwanted characters ser_id = ser_id.replace(" ", "_") ser_id = ser_id.replace("/", "_") # This function removes some characters from a string ser2rem = [ "*", "+", "(", ")", "=", ",", ">", "<", ";", ":", '"', "'", "?", "!", "@", "#", "$", "%", "^", "&", "*", ] for cad in ser2rem: ser_id = ser_id.replace(cad, "") # Removing the dupplicated _ characters and replacing the remaining by - ser_id = cltmisc.remove_consecutive_duplicates(ser_id, "_") ser_id = ser_id.replace("_", "-") if any("SeriesNumber" in s for s in attributes): serNumb = dataset.data_element("SeriesNumber").value # Adding the series number sNumb = f"{int(serNumb):04d}" ser_id = sNumb + "-" + ser_id return ses_id, ser_id
####################################################################################################
[docs] def uncompress_dicom_session( dic_dir: str, boolrmtar: bool = False, subj_ids: Optional[Union[str, List[str]]] = None, ) -> List[str]: """ Uncompress session folders containing the DICOM files for all the series. Parameters ---------- dic_dir : str Directory containing the subjects. It assumes an organization in: <subj_id>/<session_id>/<series_id> boolrmtar : bool, optional, default=False Boolean variable to remove the tar files after uncompressing the session. subj_ids : str, list of str, or None, optional Subject IDs to be considered. Can be: - None: consider all subjects in the directory (default) - str: path to text file containing subject IDs (one per line) - list of str: explicit list of subject IDs Returns ------- list of str List of tar files that failed to be uncompressed. Empty list if all successful. Raises ------ FileNotFoundError If the specified directory does not exist. ValueError If subj_ids is not None, str, or list, or if subject IDs file cannot be read. tarfile.TarError If there are issues with reading or extracting tar files. PermissionError If there are insufficient permissions to extract files or remove tar archives. OSError If there are filesystem-related errors during extraction. Examples -------- >>> # Basic usage - uncompress all sessions in directory >>> failed = uncompress_dicom_session('/path/to/dicom/directory') >>> if not failed: ... print("All sessions uncompressed successfully") >>> # Uncompress sessions and remove tar files after extraction >>> failed = uncompress_dicom_session('/path/to/dicom/directory', boolrmtar=True) >>> # Uncompress sessions for specific subjects only >>> failed = uncompress_dicom_session( ... dic_dir='/path/to/dicom/directory', ... subj_ids=['sub-001', 'sub-002', 'sub-003'] ... ) >>> # Use subject IDs from file >>> failed = uncompress_dicom_session( ... dic_dir='/path/to/dicom/directory', ... subj_ids='/path/to/subject_ids.txt', ... boolrmtar=True ... ) """ # Validate input directory dic_path = Path(dic_dir) if not dic_path.exists(): raise FileNotFoundError(f"Directory {dic_dir} does not exist") if not dic_path.is_dir(): raise ValueError(f"{dic_dir} is not a directory") # Process subject IDs if subj_ids is None: # Get all subjects with 'sub-' prefix subj_ids = [ item.name for item in dic_path.iterdir() if item.is_dir() and item.name.startswith("sub-") ] subj_ids.sort() elif isinstance(subj_ids, str): # Read subject IDs from file try: with open(subj_ids, "r", encoding="utf-8") as file: subj_ids = [line.strip() for line in file if line.strip()] except FileNotFoundError: raise FileNotFoundError(f"Subject IDs file {subj_ids} not found") except Exception as e: raise ValueError(f"Error reading subject IDs file: {e}") elif isinstance(subj_ids, list): # Validate list elements if not all(isinstance(subj_id, str) for subj_id in subj_ids): raise ValueError("All subject IDs must be strings") else: raise ValueError("subj_ids must be None, str (file path), or list of str") if not subj_ids: print("No subjects found to process") return [] n_subj = len(subj_ids) failed_sessions = [] with Progress() as pb: task = pb.add_task("[green]Uncompressing sessions...", total=n_subj) for i, subj_id in enumerate(subj_ids): subj_dir = dic_path / subj_id pb.update( task_id=task, description=f"[green]Processing {subj_id} ({i+1}/{n_subj})", completed=i, ) # Skip if subject directory doesn't exist if not subj_dir.exists(): print(f"Warning: Subject directory {subj_dir} not found, skipping...") continue # Find all tar.gz files in subject directory tar_files = list(subj_dir.glob("*.tar.gz")) for tar_file in tar_files: try: # Use Python's tarfile module for better error handling with tarfile.open(tar_file, "r:gz") as tar: # Extract to subject directory tar.extractall(path=subj_dir) # Remove tar file if requested if boolrmtar: tar_file.unlink() except tarfile.TarError as e: print(f"Error extracting {tar_file}: {e}") failed_sessions.append(str(tar_file)) except PermissionError as e: print(f"Permission error with {tar_file}: {e}") failed_sessions.append(str(tar_file)) except Exception as e: print(f"Unexpected error with {tar_file}: {e}") failed_sessions.append(str(tar_file)) pb.update( task_id=task, description=f"[green]Completed uncompression", completed=n_subj, ) # Report results if failed_sessions: print("\nTHE PROCESS FAILED TO UNCOMPRESS THE FOLLOWING TAR FILES:") for failed_file in failed_sessions: print(f" - {failed_file}") else: print("\nAll sessions uncompressed successfully!") print(f"\nProcessed {n_subj} subjects with {len(failed_sessions)} failures.") return failed_sessions
####################################################################################################
[docs] def compress_dicom_session( dic_dir: str, subj_ids: Optional[Union[str, List[str]]] = None, remove_original: bool = True, ) -> List[str]: """ Compress session folders containing DICOM files into tar.gz archives. Parameters ---------- dic_dir : str Directory containing the subjects. It assumes an organization in: <subj_id>/<session_id>/<series_id> subj_ids : str, list of str, or None, optional Subject IDs to be considered. Can be: - None: consider all subjects in the directory (default) - str: path to text file containing subject IDs (one per line) - list of str: explicit list of subject IDs remove_original : bool, optional, default=True Whether to remove the original session directories after successful compression. Returns ------- list of str List of session directories that failed to be compressed. Empty list if all successful. Raises ------ FileNotFoundError If the specified directory does not exist. ValueError If subj_ids is not None, str, or list, or if subject IDs file cannot be read. tarfile.TarError If there are issues with creating or writing tar files. PermissionError If there are insufficient permissions to compress files or remove directories. OSError If there are filesystem-related errors during compression. Examples -------- >>> # Basic usage - compress all sessions in directory >>> failed = compress_dicom_session('/path/to/dicom/directory') >>> if not failed: ... print("All sessions compressed successfully") >>> # Compress sessions but keep original directories >>> failed = compress_dicom_session( ... dic_dir='/path/to/dicom/directory', ... remove_original=False ... ) >>> # Compress sessions for specific subjects only >>> failed = compress_dicom_session( ... dic_dir='/path/to/dicom/directory', ... subj_ids=['sub-001', 'sub-002', 'sub-003'] ... ) >>> # Use subject IDs from file >>> failed = compress_dicom_session( ... dic_dir='/path/to/dicom/directory', ... subj_ids='/path/to/subject_ids.txt' ... ) """ # Validate input directory dic_path = Path(dic_dir) if not dic_path.exists(): raise FileNotFoundError(f"Directory {dic_dir} does not exist") if not dic_path.is_dir(): raise ValueError(f"{dic_dir} is not a directory") # Process subject IDs if subj_ids is None: # Get all subjects with 'sub-' prefix subj_ids = [ item.name for item in dic_path.iterdir() if item.is_dir() and item.name.startswith("sub-") ] subj_ids.sort() elif isinstance(subj_ids, str): # Read subject IDs from file try: with open(subj_ids, "r", encoding="utf-8") as file: subj_ids = [line.strip() for line in file if line.strip()] except FileNotFoundError: raise FileNotFoundError(f"Subject IDs file {subj_ids} not found") except Exception as e: raise ValueError(f"Error reading subject IDs file: {e}") elif isinstance(subj_ids, list): # Validate list elements if not all(isinstance(subj_id, str) for subj_id in subj_ids): raise ValueError("All subject IDs must be strings") else: raise ValueError("subj_ids must be None, str (file path), or list of str") if not subj_ids: print("No subjects found to process") return [] n_subj = len(subj_ids) failed_sessions = [] total_sessions = 0 compressed_sessions = 0 with Progress() as pb: task = pb.add_task("[green]Compressing sessions...", total=n_subj) for i, subj_id in enumerate(subj_ids): subj_dir = dic_path / subj_id pb.update( task_id=task, description=f"[green]Processing {subj_id} ({i+1}/{n_subj})", completed=i, ) # Skip if subject directory doesn't exist if not subj_dir.exists(): print(f"Warning: Subject directory {subj_dir} not found, skipping...") continue # Find all session directories (starting with 'ses-') session_dirs = [ item for item in subj_dir.iterdir() if item.is_dir() and item.name.startswith("ses-") ] total_sessions += len(session_dirs) for ses_dir in session_dirs: tar_file_path = ses_dir.with_suffix(".tar.gz") # Skip if tar file already exists if tar_file_path.exists(): print(f"Warning: {tar_file_path} already exists, skipping...") continue try: # Create tar.gz archive using Python's tarfile module with tarfile.open(tar_file_path, "w:gz") as tar: # Add the session directory to the archive # Use arcname to preserve the directory structure tar.add(ses_dir, arcname=ses_dir.name) # Remove original directory if requested and compression succeeded if remove_original: shutil.rmtree(ses_dir) compressed_sessions += 1 except tarfile.TarError as e: print(f"Error compressing {ses_dir}: {e}") failed_sessions.append(str(ses_dir)) # Clean up partially created tar file if tar_file_path.exists(): try: tar_file_path.unlink() except Exception: pass except PermissionError as e: print(f"Permission error with {ses_dir}: {e}") failed_sessions.append(str(ses_dir)) except Exception as e: print(f"Unexpected error with {ses_dir}: {e}") failed_sessions.append(str(ses_dir)) pb.update( task_id=task, description=f"[green]Completed compression", completed=n_subj ) # Report results if failed_sessions: print("\nTHE PROCESS FAILED TO COMPRESS THE FOLLOWING SESSIONS:") for failed_session in failed_sessions: print(f" - {failed_session}") else: print("\nAll sessions compressed successfully!") print( f"\nProcessed {n_subj} subjects, {compressed_sessions}/{total_sessions} sessions compressed successfully." ) return failed_sessions
#####################################################################################################
[docs] def get_dicom_info( dicom_file: str, tags: Optional[Union[str, List[str]]] = None, missing_tag_behavior: str = "warn", ) -> dict: """ Extracts metadata from a DICOM file and returns it as a dictionary. Can extract all metadata or only specific tags based on the parameters provided. Parameters ---------- dicom_file : str Path to the DICOM file. tags : str, list of str, or None, optional DICOM tags (keywords) to extract. If None, all tags are extracted. Can be a single tag as a string or multiple tags as a list. Default is None (extract all tags). missing_tag_behavior : {'warn', 'ignore', 'raise'}, optional How to handle missing tags: - 'warn': Print a warning and skip the tag (default) - 'ignore': Silently skip the tag - 'raise': Raise an AttributeError Default is 'warn'. Returns ------- dict A dictionary containing the extracted metadata from the DICOM file. Keys are tag keywords, values are the corresponding tag values. Raises ------ FileNotFoundError If the specified DICOM file does not exist. pydicom.errors.InvalidDicomError If the specified file is not a valid DICOM file. AttributeError If a requested tag does not exist and missing_tag_behavior='raise'. ValueError If missing_tag_behavior is not one of {'warn', 'ignore', 'raise'}. Examples -------- Extract all metadata: >>> metadata = get_dicom_info('/path/to/dicom/file.dcm') >>> print(metadata.keys()) dict_keys(['PatientID', 'StudyDate', 'Modality', ...]) Extract specific tags (as list): >>> tags_to_extract = ['PatientID', 'StudyDate', 'Modality'] >>> metadata = get_dicom_info('/path/to/dicom/file.dcm', tags=tags_to_extract) >>> print(metadata) {'PatientID': '12345', 'StudyDate': '20210101', 'Modality': 'MR'} Extract a single tag (as string): >>> metadata = get_dicom_info('/path/to/dicom/file.dcm', tags='PatientID') >>> print(metadata) {'PatientID': '12345'} Handle missing tags: >>> metadata = get_dicom_info( ... '/path/to/dicom/file.dcm', ... tags=['PatientID', 'NonExistentTag'], ... missing_tag_behavior='ignore' ... ) >>> print(metadata) {'PatientID': '12345'} """ # Validate missing_tag_behavior parameter valid_behaviors = {"warn", "ignore", "raise"} if missing_tag_behavior not in valid_behaviors: raise ValueError( f"missing_tag_behavior must be one of {valid_behaviors}, " f"got '{missing_tag_behavior}'" ) # Validate file existence dicom_path = Path(dicom_file) if not dicom_path.exists(): raise FileNotFoundError(f"DICOM file not found: {dicom_file}") # Read DICOM file try: dataset = pydicom.dcmread(dicom_file) except pydicom.errors.InvalidDicomError: raise pydicom.errors.InvalidDicomError( f"File is not a valid DICOM file: {dicom_file}" ) except Exception as e: raise Exception(f"Error reading DICOM file {dicom_file}: {e}") # If no tags specified, return all metadata if tags is None: metadata = { elem.keyword: elem.value for elem in dataset.iterall() if elem.keyword } return metadata # Convert single tag string to list for uniform processing if isinstance(tags, str): tags = [tags] # Validate that tags is now a list if not isinstance(tags, list): raise TypeError( f"tags must be a string, list of strings, or None. Got {type(tags)}" ) # Extract specified tags metadata = {} for tag in tags: if not isinstance(tag, str): raise TypeError( f"Each tag must be a string. Got {type(tag)} for tag: {tag}" ) try: # Check if tag exists in dataset if hasattr(dataset, tag): metadata[tag] = getattr(dataset, tag) else: # Tag doesn't exist - handle based on behavior parameter if missing_tag_behavior == "raise": raise AttributeError( f"Tag '{tag}' not found in DICOM file: {dicom_file}" ) elif missing_tag_behavior == "warn": print(f"Warning: Tag '{tag}' not found in {dicom_file}, skipping.") # 'ignore' behavior: do nothing, just skip the tag except Exception as e: if missing_tag_behavior == "raise": raise elif missing_tag_behavior == "warn": print(f"Warning: Error extracting tag '{tag}': {e}") # 'ignore' behavior: continue silently return metadata