import numpy as np
from scipy.sparse import csr_matrix
from typing import Tuple, Union, Optional, List
import warnings
from collections import deque
####################################################################################################
####################################################################################################
############ ############
############ ############
############ Section 1: Methods dedicated create CSR graphs from different sources ############
############ CSR (Compressed Sparse Row) format is efficient for graph representation ############
############ and is widely used in scientific computing and machine learning. ############
############ ############
############ ############
####################################################################################################
####################################################################################################
[docs]
def adjacency_matrix_to_csr(adj_matrix: np.ndarray) -> csr_matrix:
"""
Convert a dense adjacency matrix to CSR (Compressed Sparse Row) format.
This method takes a square adjacency matrix where non-zero entries represent
connections between vertices and converts it to an efficient sparse representation.
Parameters
----------
adj_matrix : np.ndarray
A square 2D numpy array representing the adjacency matrix.
Shape should be (n_vertices, n_vertices) where n_vertices is the number
of vertices in the graph. Non-zero values represent edge weights.
Returns
-------
csr_matrix
A scipy sparse CSR matrix representing the same graph connectivity.
Raises
------
ValueError
If the input matrix is not 2D or not square.
TypeError
If the input is not a numpy array.
Examples
--------
>>> import numpy as np
>>> # Create a simple 4-vertex graph
>>> adj = np.array([[0, 1, 1, 0],
... [1, 0, 1, 1],
... [1, 1, 0, 1],
... [0, 1, 1, 0]])
>>> csr_graph = adjacency_matrix_to_csr(adj)
>>> print(csr_graph.toarray())
[[0 1 1 0]
[1 0 1 1]
[1 1 0 1]
[0 1 1 0]]
>>> # With weighted edges
>>> adj_weighted = np.array([[0, 2.5, 1.0, 0],
... [2.5, 0, 0, 3.2],
... [1.0, 0, 0, 1.8],
... [0, 3.2, 1.8, 0]])
>>> csr_weighted = adjacency_matrix_to_csr(adj_weighted)
>>> print(f"Non-zero values: {csr_weighted.data}")
Non-zero values: [2.5 1. 2.5 3.2 1. 1.8 3.2 1.8]
"""
if not isinstance(adj_matrix, np.ndarray):
raise TypeError("Input must be a numpy array")
if adj_matrix.ndim != 2:
raise ValueError("Input must be a 2D array")
if adj_matrix.shape[0] != adj_matrix.shape[1]:
raise ValueError("Adjacency matrix must be square")
return csr_matrix(adj_matrix)
####################################################################################################
[docs]
def triangulated_mesh_to_csr(
faces: np.ndarray, n_vertices: Optional[int] = None
) -> csr_matrix:
"""
Convert triangulated mesh faces to a CSR graph representation.
This method constructs a graph where vertices are connected if they share
an edge in any triangle face. All edge weights are set to 1. The resulting
graph represents the 1-ring neighborhood connectivity of the mesh.
Parameters
----------
faces : np.ndarray
A 2D numpy array of shape (n_faces, 3) where each row contains the
indices of three vertices forming a triangle. Vertex indices should
be non-negative integers.
n_vertices : int, optional
Total number of vertices in the mesh. If None, it will be inferred
as the maximum vertex index + 1. Providing this parameter is recommended
for meshes with isolated vertices.
Returns
-------
csr_matrix
A scipy sparse CSR matrix of shape (n_vertices, n_vertices) where
entry (i,j) is 1 if vertices i and j are connected by an edge in the mesh,
and 0 otherwise. The matrix is symmetric for undirected graphs.
Raises
------
ValueError
If faces array is not 2D, doesn't have 3 columns, contains negative
indices, or if n_vertices is less than the maximum vertex index.
TypeError
If faces is not a numpy array or contains non-integer values.
Examples
--------
>>> import numpy as np
>>> # Define a simple tetrahedron (4 faces, 4 vertices)
>>> faces = np.array([[0, 1, 2],
... [0, 1, 3],
... [0, 2, 3],
... [1, 2, 3]])
>>> csr_graph = triangulated_mesh_to_csr(faces)
>>> print("Adjacency matrix:")
>>> print(csr_graph.toarray())
Adjacency matrix:
[[0 1 1 1]
[1 0 1 1]
[1 1 0 1]
[1 1 1 0]]
>>> # Triangle mesh with explicit vertex count
>>> faces_triangle = np.array([[0, 1, 2]])
>>> csr_triangle = triangulated_mesh_to_csr(faces_triangle, n_vertices=5)
>>> print(f"Shape: {csr_triangle.shape}")
>>> print("Connections for triangle [0,1,2]:")
>>> print(csr_triangle.toarray())
Shape: (5, 5)
Connections for triangle [0,1,2]:
[[0 1 1 0 0]
[1 0 1 0 0]
[1 1 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
"""
if not isinstance(faces, np.ndarray):
raise TypeError("Faces must be a numpy array")
if faces.ndim != 2:
raise ValueError("Faces array must be 2D")
if faces.shape[1] != 3:
raise ValueError(
"Faces array must have exactly 3 columns for triangulated mesh"
)
if not np.issubdtype(faces.dtype, np.integer):
raise TypeError("Faces array must contain integer vertex indices")
if np.any(faces < 0):
raise ValueError("Vertex indices must be non-negative")
max_vertex_idx = np.max(faces)
if n_vertices is None:
n_vertices = max_vertex_idx + 1
elif n_vertices <= max_vertex_idx:
raise ValueError(
f"n_vertices ({n_vertices}) must be greater than maximum vertex index ({max_vertex_idx})"
)
# Extract all edges from triangular faces
# Each triangle (v0, v1, v2) generates edges: (v0,v1), (v1,v2), (v0,v2)
edges = []
for face in faces:
v0, v1, v2 = face
edges.extend([(v0, v1), (v1, v2), (v0, v2)])
edges = np.array(edges)
# Create symmetric edges (undirected graph)
edges_symmetric = np.vstack([edges, edges[:, [1, 0]]])
# Remove duplicate edges and create CSR matrix
row_indices = edges_symmetric[:, 0]
col_indices = edges_symmetric[:, 1]
data = np.ones(len(edges_symmetric), dtype=int)
# Create sparse matrix and eliminate duplicates by summing
csr_graph = csr_matrix(
(data, (row_indices, col_indices)), shape=(n_vertices, n_vertices)
)
# Convert to binary (in case of duplicate edges)
csr_graph.data = (csr_graph.data > 0).astype(int)
return csr_graph
####################################################################################################
[docs]
def edges_to_csr(
edges: np.ndarray,
edge_values: np.ndarray = None,
n_vertices: Optional[int] = None,
symmetric: bool = True,
) -> csr_matrix:
"""
Convert an edge list with values to CSR graph representation.
This method constructs a graph from a list of edges and their corresponding
weights/values. Useful for creating graphs from pre-computed edge lists.
Parameters
----------
edges : np.ndarray
A 2D numpy array of shape (n_edges, 2) where each row contains the
indices of two connected vertices. Vertex indices should be non-negative integers.
edge_values : np.ndarray
A 1D numpy array of length n_edges containing the weight/value for each edge.
Values can be any numeric type (int, float, etc.).
n_vertices : int, optional
Total number of vertices in the graph. If None, it will be inferred
as the maximum vertex index + 1. Providing this parameter is recommended
for graphs with isolated vertices.
symmetric : bool, default=True
If True, creates an undirected graph by adding reverse edges with the same values.
If False, creates a directed graph using only the provided edges.
Returns
-------
csr_matrix
A scipy sparse CSR matrix of shape (n_vertices, n_vertices) where
entry (i,j) contains the weight of the edge from vertex i to vertex j.
For undirected graphs (symmetric=True), the matrix is symmetric.
Raises
------
ValueError
If edges array is not 2D, doesn't have 2 columns, contains negative indices,
edge_values length doesn't match number of edges, or if n_vertices is less
than the maximum vertex index.
TypeError
If edges contains non-integer values or edge_values is not numeric.
Examples
--------
>>> import numpy as np
>>> # Create a simple weighted graph
>>> edges = np.array([[0, 1],
... [1, 2],
... [0, 2]])
>>> values = np.array([2.5, 1.0, 3.2])
>>> csr_graph = edges_to_csr(edges, values)
>>> print("Symmetric weighted graph:")
>>> print(csr_graph.toarray())
Symmetric weighted graph:
[[0. 2.5 3.2]
[2.5 0. 1. ]
[3.2 1. 0. ]]
>>> # Directed graph example
>>> edges_directed = np.array([[0, 1], [1, 2]])
>>> values_directed = np.array([0.8, 1.5])
>>> csr_directed = edges_to_csr(edges_directed, values_directed, symmetric=False)
>>> print("Directed graph:")
>>> print(csr_directed.toarray())
Directed graph:
[[0. 0.8 0. ]
[0. 0. 1.5]
[0. 0. 0. ]]
>>> # Handle duplicate edges (values are summed)
>>> edges_dup = np.array([[0, 1], [0, 1], [1, 0]])
>>> values_dup = np.array([1.0, 2.0, 0.5])
>>> csr_dup = edges_to_csr(edges_dup, values_dup)
>>> print("Duplicate edges (summed):")
>>> print(csr_dup.toarray())
Duplicate edges (summed):
[[0. 3.5]
[3.5 0. ]]
"""
if not isinstance(edges, np.ndarray):
raise TypeError("Edges must be a numpy array")
if edge_values is None:
edge_values = np.ones(len(edges))
if not isinstance(edge_values, np.ndarray):
raise TypeError("Edge values must be a numpy array")
if edges.ndim != 2:
raise ValueError("Edges array must be 2D")
if edges.shape[1] != 2:
raise ValueError("Edges array must have exactly 2 columns")
if edge_values.ndim != 1:
raise ValueError("Edge values must be a 1D array")
if len(edges) != len(edge_values):
raise ValueError(
f"Number of edges ({len(edges)}) must match number of edge values ({len(edge_values)})"
)
if not np.issubdtype(edges.dtype, np.integer):
raise TypeError("Edges array must contain integer vertex indices")
if not np.issubdtype(edge_values.dtype, np.number):
raise TypeError("Edge values must be numeric")
if np.any(edges < 0):
raise ValueError("Vertex indices must be non-negative")
if len(edges) == 0:
warnings.warn("Empty edge list provided", UserWarning)
if n_vertices is None:
n_vertices = 0
return csr_matrix((n_vertices, n_vertices))
max_vertex_idx = np.max(edges)
if n_vertices is None:
n_vertices = max_vertex_idx + 1
elif n_vertices <= max_vertex_idx:
raise ValueError(
f"n_vertices ({n_vertices}) must be greater than maximum vertex index ({max_vertex_idx})"
)
# Prepare edge data
if symmetric:
# Add reverse edges for undirected graph
all_edges = np.vstack([edges, edges[:, [1, 0]]])
all_values = np.concatenate([edge_values, edge_values])
else:
all_edges = edges
all_values = edge_values
row_indices = all_edges[:, 0]
col_indices = all_edges[:, 1]
# Create CSR matrix (duplicate edges will be summed automatically)
csr_graph = csr_matrix(
(all_values, (row_indices, col_indices)), shape=(n_vertices, n_vertices)
)
return csr_graph
#####################################################################################################
[docs]
def connected_components(
csr_graph: csr_matrix, method: str = "union_find", return_labels: bool = False
) -> Union[List[List[int]], Tuple[List[List[int]], np.ndarray]]:
"""
Find connected components in a CSR graph representation.
This method identifies all connected components in an undirected graph represented
as a CSR matrix. A connected component is a maximal set of vertices such that
there is a path between every pair of vertices in the set.
Parameters
----------
csr_graph : csr_matrix
A scipy sparse CSR matrix representing the graph adjacency matrix.
Should be square with shape (n_vertices, n_vertices). For undirected graphs,
the matrix should be symmetric. Non-zero entries represent connections.
method : str, default="union_find"
Algorithm to use for finding connected components:
- "union_find": Disjoint Set Union with path compression and union by rank.
Most efficient for sparse graphs. Time: O(E * α(V)), Space: O(V).
- "bfs": Breadth-First Search traversal.
Good memory characteristics. Time: O(V + E), Space: O(V).
- "dfs": Depth-First Search traversal.
Simple and intuitive. Time: O(V + E), Space: O(V).
return_labels : bool, default=False
If True, also return a label array where labels[i] indicates which
component vertex i belongs to. If False, only return the component lists.
Returns
-------
components : List[List[int]]
A list of connected components, where each component is a list of vertex
indices belonging to that component. Components are sorted by the
smallest vertex index in each component.
labels : np.ndarray, optional
Only returned if return_labels=True. A 1D array of length n_vertices
where labels[i] is the component ID (0-indexed) that vertex i belongs to.
Raises
------
TypeError
If csr_graph is not a scipy csr_matrix.
ValueError
If csr_graph is not square, method is not recognized, or graph is empty.
UserWarning
If the graph appears to be directed (non-symmetric) when undirected
behavior is expected.
Examples
--------
>>> import numpy as np
>>> from scipy.sparse import csr_matrix
>>>
>>> # Create a graph with 3 components: [0,1], [2,3,4], [5]
>>> row = np.array([0, 1, 2, 2, 3, 3, 4, 4])
>>> col = np.array([1, 0, 3, 4, 2, 4, 2, 3])
>>> data = np.ones(len(row))
>>> graph = csr_matrix((data, (row, col)), shape=(6, 6))
>>>
>>> components = connected_components(graph)
>>> print("Connected components:")
>>> for i, comp in enumerate(components):
... print(f" Component {i}: {comp}")
Connected components:
Component 0: [0, 1]
Component 1: [2, 3, 4]
Component 2: [5]
>>> # Get component labels as well
>>> components, labels = connected_components(graph, return_labels=True)
>>> print(f"Component labels: {labels}")
>>> print(f"Vertex 3 belongs to component: {labels[3]}")
Component labels: [0 0 1 1 1 2]
Vertex 3 belongs to component: 1
>>> # Using different algorithms
>>> comp_bfs = connected_components(graph, method="bfs")
>>> comp_dfs = connected_components(graph, method="dfs")
>>> # All methods should give the same result (possibly in different order)
>>> # Example with weighted edges (weights are ignored for connectivity)
>>> weighted_graph = edges_to_csr(
... np.array([[0, 1], [1, 2]]),
... np.array([2.5, 3.0])
... )
>>> components = connected_components(weighted_graph)
>>> print(f"Weighted graph components: {components}")
Weighted graph components: [[0, 1, 2]]
Notes
-----
- Edge weights are ignored; only connectivity matters.
- Self-loops (diagonal entries) are ignored for component detection.
- For directed graphs, this finds weakly connected components (treating
edges as undirected).
- Empty components (isolated vertices) are included as single-vertex components.
"""
if not isinstance(csr_graph, csr_matrix):
raise TypeError("Input must be a scipy csr_matrix")
if csr_graph.shape[0] != csr_graph.shape[1]:
raise ValueError("CSR graph must be square")
n_vertices = csr_graph.shape[0]
if n_vertices == 0:
raise ValueError("Graph cannot be empty")
# Check if graph is symmetric (undirected)
if not np.allclose(csr_graph.data, csr_graph.T.data) or not np.array_equal(
csr_graph.indices, csr_graph.T.indices
):
warnings.warn(
"Graph appears to be directed (non-symmetric). "
"Finding weakly connected components.",
UserWarning,
)
# Ensure we work with the full connectivity (treat as undirected)
symmetric_graph = csr_graph + csr_graph.T
symmetric_graph.data = (symmetric_graph.data > 0).astype(int)
# Choose algorithm
if method == "union_find":
components = _connected_components_union_find(symmetric_graph)
elif method == "bfs":
components = _connected_components_bfs(symmetric_graph)
elif method == "dfs":
components = _connected_components_dfs(symmetric_graph)
else:
raise ValueError(
f"Unknown method '{method}'. Choose from: 'union_find', 'bfs', 'dfs'"
)
# Sort components by their smallest vertex index
components.sort(key=lambda comp: min(comp))
if return_labels:
labels = np.empty(n_vertices, dtype=int)
for comp_id, component in enumerate(components):
for vertex in component:
labels[vertex] = comp_id
return components, labels
else:
return components
######################################################################################################
def _connected_components_union_find(csr_graph: csr_matrix) -> List[List[int]]:
"""Union-Find implementation for connected components."""
n_vertices = csr_graph.shape[0]
parent = np.arange(n_vertices)
rank = np.zeros(n_vertices, dtype=int)
def find(x: int) -> int:
if parent[x] != x:
parent[x] = find(parent[x]) # Path compression
return parent[x]
def union(x: int, y: int):
px, py = find(x), find(y)
if px == py:
return
# Union by rank
if rank[px] < rank[py]:
parent[px] = py
elif rank[px] > rank[py]:
parent[py] = px
else:
parent[py] = px
rank[px] += 1
# Process all edges
csr_graph.eliminate_zeros() # Remove explicit zeros
for i in range(n_vertices):
start_idx = csr_graph.indptr[i]
end_idx = csr_graph.indptr[i + 1]
for j_idx in range(start_idx, end_idx):
j = csr_graph.indices[j_idx]
if i != j: # Ignore self-loops
union(i, j)
# Group vertices by root
components_dict = {}
for vertex in range(n_vertices):
root = find(vertex)
if root not in components_dict:
components_dict[root] = []
components_dict[root].append(vertex)
return list(components_dict.values())
#####################################################################################################
def _connected_components_bfs(csr_graph: csr_matrix) -> List[List[int]]:
"""BFS implementation for connected components."""
n_vertices = csr_graph.shape[0]
visited = np.zeros(n_vertices, dtype=bool)
components = []
def get_neighbors(vertex: int) -> List[int]:
start_idx = csr_graph.indptr[vertex]
end_idx = csr_graph.indptr[vertex + 1]
neighbors = []
for j_idx in range(start_idx, end_idx):
neighbor = csr_graph.indices[j_idx]
if vertex != neighbor: # Ignore self-loops
neighbors.append(neighbor)
return neighbors
for start_vertex in range(n_vertices):
if not visited[start_vertex]:
component = []
queue = deque([start_vertex])
visited[start_vertex] = True
while queue:
vertex = queue.popleft()
component.append(vertex)
for neighbor in get_neighbors(vertex):
if not visited[neighbor]:
visited[neighbor] = True
queue.append(neighbor)
components.append(component)
return components
#####################################################################################################
def _connected_components_dfs(csr_graph: csr_matrix) -> List[List[int]]:
"""DFS implementation for connected components."""
n_vertices = csr_graph.shape[0]
visited = np.zeros(n_vertices, dtype=bool)
components = []
def get_neighbors(vertex: int) -> List[int]:
start_idx = csr_graph.indptr[vertex]
end_idx = csr_graph.indptr[vertex + 1]
neighbors = []
for j_idx in range(start_idx, end_idx):
neighbor = csr_graph.indices[j_idx]
if vertex != neighbor: # Ignore self-loops
neighbors.append(neighbor)
return neighbors
def dfs(vertex: int, component: List[int]):
visited[vertex] = True
component.append(vertex)
for neighbor in get_neighbors(vertex):
if not visited[neighbor]:
dfs(neighbor, component)
for vertex in range(n_vertices):
if not visited[vertex]:
component = []
dfs(vertex, component)
components.append(component)
return components
#####################################################################################################
[docs]
def component_statistics(components: List[List[int]]) -> dict:
"""
Compute statistics about connected components.
Parameters
----------
components : List[List[int]]
List of connected components from connected_components().
Returns
-------
dict
Dictionary containing component statistics:
- 'num_components': Number of connected components
- 'largest_component_size': Size of the largest component
- 'smallest_component_size': Size of the smallest component
- 'average_component_size': Average component size
- 'component_sizes': List of all component sizes
- 'giant_component_ratio': Ratio of largest to total vertices
Examples
--------
>>> components = [[0, 1, 2, 3], [4], [5, 6]]
>>> stats = component_statistics(components)
>>> print(f"Number of components: {stats['num_components']}")
>>> print(f"Giant component ratio: {stats['giant_component_ratio']:.2f}")
Number of components: 3
Giant component ratio: 0.57
"""
if not components:
return {
"num_components": 0,
"largest_component_size": 0,
"smallest_component_size": 0,
"average_component_size": 0,
"component_sizes": [],
"giant_component_ratio": 0.0,
}
sizes = [len(comp) for comp in components]
total_vertices = sum(sizes)
return {
"num_components": len(components),
"largest_component_size": max(sizes),
"smallest_component_size": min(sizes),
"average_component_size": total_vertices / len(components),
"component_sizes": sizes,
"giant_component_ratio": (
max(sizes) / total_vertices if total_vertices > 0 else 0.0
),
}