Source code for clabtoolkit.networktools

import numpy as np
from scipy.sparse import csr_matrix, csgraph
from typing import Tuple, Union, Optional, List
import warnings
from collections import deque


####################################################################################################
####################################################################################################
############                                                                            ############
############                                                                            ############
############  Section 1: Methods for creating CSR graphs from different sources         ############
############  CSR (Compressed Sparse Row) format is efficient for graph representation  ############
############  and is widely used in scientific computing and machine learning.          ############
############                                                                            ############
############                                                                            ############
####################################################################################################
####################################################################################################
def adjacency_matrix_to_csr(adj_matrix: np.ndarray) -> csr_matrix:
    """
    Build a CSR (Compressed Sparse Row) graph from a dense adjacency matrix.

    The input is a square matrix whose non-zero entries encode the edges
    (and edge weights) between vertices. The same connectivity is returned
    in sparse form.

    Parameters
    ----------
    adj_matrix : np.ndarray
        Square 2D array of shape (n_vertices, n_vertices). A non-zero value
        at position (i, j) is the weight of the edge from vertex i to
        vertex j.

    Returns
    -------
    csr_matrix
        Sparse CSR matrix holding the same entries as ``adj_matrix``.

    Raises
    ------
    TypeError
        If ``adj_matrix`` is not a numpy array.
    ValueError
        If ``adj_matrix`` is not 2D or is not square.

    Examples
    --------
    >>> import numpy as np
    >>> adj = np.array([[0, 1, 1, 0],
    ...                 [1, 0, 1, 1],
    ...                 [1, 1, 0, 1],
    ...                 [0, 1, 1, 0]])
    >>> csr_graph = adjacency_matrix_to_csr(adj)
    >>> csr_graph.toarray().tolist()
    [[0, 1, 1, 0], [1, 0, 1, 1], [1, 1, 0, 1], [0, 1, 1, 0]]

    >>> # Weighted edges are preserved as-is
    >>> adj_weighted = np.array([[0, 2.5], [2.5, 0]])
    >>> adjacency_matrix_to_csr(adj_weighted).data.tolist()
    [2.5, 2.5]
    """
    # Guard clauses, checked in order: container type, rank, squareness.
    if not isinstance(adj_matrix, np.ndarray):
        raise TypeError("Input must be a numpy array")

    if adj_matrix.ndim != 2:
        raise ValueError("Input must be a 2D array")

    n_rows, n_cols = adj_matrix.shape
    if n_rows != n_cols:
        raise ValueError("Adjacency matrix must be square")

    sparse_graph = csr_matrix(adj_matrix)
    return sparse_graph
####################################################################################################
def triangulated_mesh_to_csr(
    faces: np.ndarray, n_vertices: Optional[int] = None
) -> csr_matrix:
    """
    Convert triangulated mesh faces to a CSR graph representation.

    Two vertices are connected when they share an edge of at least one
    triangle. Every edge has weight 1 regardless of how many faces share it,
    so the result describes the 1-ring neighborhood connectivity of the mesh.

    Parameters
    ----------
    faces : np.ndarray
        Integer array of shape (n_faces, 3). Each row holds the indices of
        the three vertices of one triangle. Indices must be non-negative.
    n_vertices : int, optional
        Total number of vertices in the mesh. If None, it is inferred as the
        maximum vertex index + 1 (or 0 when ``faces`` is empty). Provide it
        for meshes that contain isolated vertices.

    Returns
    -------
    csr_matrix
        Symmetric sparse matrix of shape (n_vertices, n_vertices) with
        integer entries: 1 where vertices i and j share a mesh edge,
        0 elsewhere.

    Raises
    ------
    TypeError
        If ``faces`` is not a numpy array or does not have an integer dtype.
    ValueError
        If ``faces`` is not 2D, does not have exactly 3 columns, contains
        negative indices, or if ``n_vertices`` is not greater than the
        maximum vertex index.

    Warns
    -----
    UserWarning
        If ``faces`` contains no triangles; an edgeless graph is returned.

    Examples
    --------
    >>> import numpy as np
    >>> # Tetrahedron: 4 faces, 4 vertices, every vertex pair is connected
    >>> faces = np.array([[0, 1, 2], [0, 1, 3], [0, 2, 3], [1, 2, 3]])
    >>> triangulated_mesh_to_csr(faces).toarray().tolist()
    [[0, 1, 1, 1], [1, 0, 1, 1], [1, 1, 0, 1], [1, 1, 1, 0]]

    >>> # Single triangle inside a 5-vertex mesh (vertices 3 and 4 isolated)
    >>> csr_triangle = triangulated_mesh_to_csr(np.array([[0, 1, 2]]), n_vertices=5)
    >>> csr_triangle.shape
    (5, 5)
    >>> csr_triangle.nnz
    6
    """
    if not isinstance(faces, np.ndarray):
        raise TypeError("Faces must be a numpy array")

    if faces.ndim != 2:
        raise ValueError("Faces array must be 2D")

    if faces.shape[1] != 3:
        raise ValueError(
            "Faces array must have exactly 3 columns for triangulated mesh"
        )

    if not np.issubdtype(faces.dtype, np.integer):
        raise TypeError("Faces array must contain integer vertex indices")

    # A mesh without faces has no edges. Handle it up front because np.max
    # cannot reduce an empty array.
    if len(faces) == 0:
        warnings.warn("Empty face list provided", UserWarning)
        if n_vertices is None:
            n_vertices = 0
        return csr_matrix((n_vertices, n_vertices), dtype=int)

    if np.any(faces < 0):
        raise ValueError("Vertex indices must be non-negative")

    max_vertex_idx = int(faces.max())
    if n_vertices is None:
        n_vertices = max_vertex_idx + 1
    elif n_vertices <= max_vertex_idx:
        raise ValueError(
            f"n_vertices ({n_vertices}) must be greater than maximum vertex index ({max_vertex_idx})"
        )

    # Each triangle (v0, v1, v2) contributes the edges (v0, v1), (v1, v2)
    # and (v0, v2). Column slicing extracts them for all faces at once.
    edges = np.vstack([faces[:, [0, 1]], faces[:, [1, 2]], faces[:, [0, 2]]])

    # Add every edge in both directions so the graph is undirected.
    row_indices = np.concatenate([edges[:, 0], edges[:, 1]])
    col_indices = np.concatenate([edges[:, 1], edges[:, 0]])
    data = np.ones(row_indices.shape[0], dtype=int)

    # The constructor sums duplicate (row, col) pairs, so an edge shared by
    # several faces ends up with a count greater than 1.
    csr_graph = csr_matrix(
        (data, (row_indices, col_indices)), shape=(n_vertices, n_vertices)
    )

    # Collapse the summed counts back to a binary adjacency.
    csr_graph.data = (csr_graph.data > 0).astype(int)

    return csr_graph
####################################################################################################
def edges_to_csr(
    edges: np.ndarray,
    edge_values: Optional[np.ndarray] = None,
    n_vertices: Optional[int] = None,
    symmetric: bool = True,
) -> csr_matrix:
    """
    Convert an edge list with optional values to a CSR graph representation.

    Builds a sparse adjacency matrix from vertex-index pairs and their
    weights. Useful for creating graphs from pre-computed edge lists.

    Parameters
    ----------
    edges : np.ndarray
        Integer array of shape (n_edges, 2). Each row holds the indices of
        two connected vertices. Indices must be non-negative.
    edge_values : np.ndarray, optional
        1D numeric array of length n_edges with the weight of each edge.
        If None, every edge gets weight 1.0.
    n_vertices : int, optional
        Total number of vertices in the graph. If None, it is inferred as
        the maximum vertex index + 1 (or 0 for an empty edge list). Provide
        it for graphs that contain isolated vertices.
    symmetric : bool, default=True
        If True, each edge is also added in the reverse direction with the
        same value, giving an undirected graph. If False, only the provided
        edges are used (directed graph).

    Returns
    -------
    csr_matrix
        Sparse matrix of shape (n_vertices, n_vertices) where entry (i, j)
        is the weight of the edge from vertex i to vertex j.

    Raises
    ------
    TypeError
        If ``edges`` or ``edge_values`` is not a numpy array, ``edges`` does
        not have an integer dtype, or ``edge_values`` is not numeric.
    ValueError
        If ``edges`` is not 2D or does not have 2 columns, ``edge_values`` is
        not 1D or its length differs from the number of edges, any index is
        negative, or ``n_vertices`` is not greater than the maximum vertex
        index.

    Warns
    -----
    UserWarning
        If the edge list is empty; an edgeless graph is returned.

    Notes
    -----
    Values of duplicate (i, j) entries are summed. With ``symmetric=True``
    the mirrored copies take part in that sum: listing both (0, 1) and
    (1, 0) adds their values together in both directions, and a self-loop
    (i, i) is counted twice.

    Examples
    --------
    >>> import numpy as np
    >>> edges = np.array([[0, 1], [1, 2], [0, 2]])
    >>> values = np.array([2.5, 1.0, 3.2])
    >>> edges_to_csr(edges, values).toarray().tolist()
    [[0.0, 2.5, 3.2], [2.5, 0.0, 1.0], [3.2, 1.0, 0.0]]

    >>> # Directed graph
    >>> directed = edges_to_csr(
    ...     np.array([[0, 1], [1, 2]]), np.array([0.8, 1.5]), symmetric=False
    ... )
    >>> directed.toarray().tolist()
    [[0.0, 0.8, 0.0], [0.0, 0.0, 1.5], [0.0, 0.0, 0.0]]

    >>> # Duplicate edges are summed
    >>> dup = edges_to_csr(
    ...     np.array([[0, 1], [0, 1], [1, 0]]), np.array([1.0, 2.0, 0.5])
    ... )
    >>> dup.toarray().tolist()
    [[0.0, 3.5], [3.5, 0.0]]
    """
    if not isinstance(edges, np.ndarray):
        raise TypeError("Edges must be a numpy array")

    if edge_values is not None and not isinstance(edge_values, np.ndarray):
        raise TypeError("Edge values must be a numpy array")

    if edges.ndim != 2:
        raise ValueError("Edges array must be 2D")

    if edges.shape[1] != 2:
        raise ValueError("Edges array must have exactly 2 columns")

    # Build the default weights only after the shape of `edges` is known to
    # be valid; len() of a 0-d array would raise an unrelated TypeError.
    if edge_values is None:
        edge_values = np.ones(len(edges))

    if edge_values.ndim != 1:
        raise ValueError("Edge values must be a 1D array")

    if len(edges) != len(edge_values):
        raise ValueError(
            f"Number of edges ({len(edges)}) must match number of edge values ({len(edge_values)})"
        )

    if not np.issubdtype(edges.dtype, np.integer):
        raise TypeError("Edges array must contain integer vertex indices")

    if not np.issubdtype(edge_values.dtype, np.number):
        raise TypeError("Edge values must be numeric")

    if np.any(edges < 0):
        raise ValueError("Vertex indices must be non-negative")

    # An empty edge list yields an edgeless graph (np.max cannot reduce an
    # empty array, so this case has to be handled before the size check).
    if len(edges) == 0:
        warnings.warn("Empty edge list provided", UserWarning)
        if n_vertices is None:
            n_vertices = 0
        return csr_matrix((n_vertices, n_vertices))

    max_vertex_idx = int(edges.max())
    if n_vertices is None:
        n_vertices = max_vertex_idx + 1
    elif n_vertices <= max_vertex_idx:
        raise ValueError(
            f"n_vertices ({n_vertices}) must be greater than maximum vertex index ({max_vertex_idx})"
        )

    if symmetric:
        # Mirror every edge so the graph is undirected.
        all_edges = np.vstack([edges, edges[:, [1, 0]]])
        all_values = np.concatenate([edge_values, edge_values])
    else:
        all_edges = edges
        all_values = edge_values

    row_indices = all_edges[:, 0]
    col_indices = all_edges[:, 1]

    # Duplicate (row, col) entries are summed by the constructor.
    csr_graph = csr_matrix(
        (all_values, (row_indices, col_indices)), shape=(n_vertices, n_vertices)
    )

    return csr_graph
#####################################################################################################
def edges_to_components(
    edges: np.ndarray, verbose: bool = True
) -> Tuple[int, np.ndarray, dict]:
    """
    Compute connected components from an edge array of arbitrary vertex indices.

    The vertex indices found in ``edges`` are compacted to the contiguous
    range 0..n_vert-1, the components are computed on that compact graph,
    and the result is reported against the original indices. Components are
    labelled in decreasing order of size (0 = largest).

    Parameters
    ----------
    edges : np.ndarray, shape (n_edges, 2)
        Integer array of vertex index pairs. Indices can be
        global/non-contiguous.
    verbose : bool
        Forwarded to ``connected_components``: if True, component sizes are
        printed to the console; if False, output is suppressed.

    Returns
    -------
    n_components : int
        Number of connected components.
    labels : np.ndarray, shape (n_vert, 2)
        Column 0: original vertex index (sorted ascending).
        Column 1: component label (0 = largest).
    sizes : dict
        {component_label: size} sorted by decreasing size.

    Raises
    ------
    ValueError
        If ``edges`` is empty. Shape errors detected by ``edges_to_csr``
        (not 2D, not 2 columns) also surface as ValueError.
    TypeError
        If ``edges`` does not have an integer dtype.
    """
    edges = np.asarray(edges)

    if edges.size == 0:
        raise ValueError("Edge array cannot be empty")

    if not np.issubdtype(edges.dtype, np.integer):
        raise TypeError("Edges array must contain integer vertex indices")

    # return_inverse gives, for each entry of `edges`, its position in the
    # sorted unique vertex list, i.e. the compact local index. This avoids
    # allocating a lookup table as large as the biggest vertex index.
    unique_verts, inverse = np.unique(edges, return_inverse=True)

    # The shape of `inverse` differs between NumPy versions; normalise it.
    local_edges = inverse.reshape(edges.shape)

    conn_matrix = edges_to_csr(local_edges)

    n_components, labels, sizes = connected_components(conn_matrix, verbose=verbose)

    # Column 0 holds local indices; map them back to the original indices.
    labels[:, 0] = unique_verts[labels[:, 0]]

    return n_components, labels, sizes
#####################################################################################################
def connected_components(
    csr_graph: csr_matrix, verbose: bool = True
) -> Tuple[int, np.ndarray, dict]:
    """
    Find connected components in a CSR graph representation.

    The graph is treated as undirected: two vertices belong to the same
    component when a path connects them, following edges in either
    direction. Components are relabelled by decreasing size, so label 0 is
    always the largest component.

    Parameters
    ----------
    csr_graph : csr_matrix
        Square sparse adjacency matrix of shape (n_vertices, n_vertices).
        Non-zero entries represent connections.
    verbose : bool
        If True, print the number of components and their sizes to the
        console. If False, suppress output.

    Returns
    -------
    n_components : int
        Number of connected components found in the graph.
    labels : np.ndarray, shape (n_vertices, 2)
        Column 0: vertex index (0 to n_vertices-1).
        Column 1: component label of that vertex (0 = largest component).
    sizes : dict
        Maps each component label to its number of vertices, in order of
        decreasing size.

    Raises
    ------
    TypeError
        If ``csr_graph`` is not a scipy csr_matrix.
    ValueError
        If ``csr_graph`` is not square or has zero vertices.

    Warns
    -----
    UserWarning
        If the matrix differs from its transpose (directed graph). Weakly
        connected components are returned in that case.

    Examples
    --------
    >>> import numpy as np
    >>> from scipy.sparse import csr_matrix
    >>> # Graph with 3 components: [0, 1], [2, 3, 4], [5]
    >>> row = np.array([0, 1, 2, 2, 3, 3, 4, 4])
    >>> col = np.array([1, 0, 3, 4, 2, 4, 2, 3])
    >>> graph = csr_matrix((np.ones(len(row)), (row, col)), shape=(6, 6))
    >>> n_components, labels, sizes = connected_components(graph, verbose=False)
    >>> n_components
    3
    >>> labels[:, 1].tolist()
    [1, 1, 0, 0, 0, 2]
    >>> sizes
    {0: 3, 1: 2, 2: 1}

    Notes
    -----
    - Edge weights are ignored; only the presence of a non-zero entry matters.
    - Self-loops (diagonal entries) do not affect the result.
    - Isolated vertices form single-vertex components.
    - Components of equal size keep a deterministic relative order.
    """
    if not isinstance(csr_graph, csr_matrix):
        raise TypeError("Input must be a scipy csr_matrix")

    if csr_graph.shape[0] != csr_graph.shape[1]:
        raise ValueError("CSR graph must be square")

    n_nodes = csr_graph.shape[0]

    if n_nodes == 0:
        raise ValueError("Graph cannot be empty")

    # Symmetry check. The transpose of a CSR matrix is a CSC matrix that
    # reuses the same data/indices arrays, so comparing those arrays cannot
    # detect asymmetry; compare the matrices element-wise instead.
    if (csr_graph != csr_graph.T).nnz != 0:
        warnings.warn(
            "Graph appears to be directed (non-symmetric). "
            "Finding weakly connected components.",
            UserWarning,
        )

    # Build an undirected, unweighted connectivity pattern. Magnitudes are
    # used so that negative or mutually cancelling weights (e.g. +w and -w
    # on opposite directions) still count as a connection.
    magnitude = abs(csr_graph)
    symmetric_graph = magnitude + magnitude.T
    symmetric_graph.eliminate_zeros()
    symmetric_graph.data = np.ones_like(symmetric_graph.data, dtype=int)

    n_components, local_labels = csgraph.connected_components(
        symmetric_graph, directed=False
    )

    raw_sizes = np.bincount(local_labels, minlength=n_components)

    # rank_map[new_label] = old_label, ordered by decreasing size. A stable
    # sort keeps the ordering of equal-sized components reproducible.
    rank_map = np.argsort(raw_sizes, kind="stable")[::-1]

    # inv_map[old_label] = new_label
    inv_map = np.empty_like(rank_map)
    inv_map[rank_map] = np.arange(n_components)

    sorted_labels = inv_map[local_labels]
    labels = np.column_stack([np.arange(n_nodes), sorted_labels])

    sizes = {
        new_label: int(raw_sizes[old_label])
        for new_label, old_label in enumerate(rank_map)
    }

    if verbose:
        print(f"Components : {n_components}")
        for label, size in sizes.items():
            print(f" └─ Component {label:>3d} : {size} vertices")

    return n_components, labels, sizes