210 lines
5.6 KiB
Python
210 lines
5.6 KiB
Python
#
|
|
# Normalized Cuts
|
|
#
|
|
# author: Christos Choutouridis <cchoutou@ece.auth.gr>
|
|
# date: 05/07/2025
|
|
#
|
|
|
|
try:
|
|
import numpy as np
|
|
from numpy.typing import NDArray
|
|
from sklearn.cluster import KMeans
|
|
from scipy.sparse.linalg import eigs
|
|
|
|
# Testing requirements
|
|
from scipy.io import loadmat
|
|
from image_to_graph import image_to_graph
|
|
import matplotlib.pyplot
|
|
except ImportError as e:
|
|
print("Missing package:", e)
|
|
exit(1)
|
|
|
|
|
|
def n_cuts(
    affinity_mat: NDArray[np.floating],
    k: int
) -> NDArray[np.int32]:
    """
    Cluster the nodes of a graph with (non-recursive) normalized cuts.

    The nodes are embedded with k eigenvectors of the generalized
    eigenproblem L x = lambda D x (L = D - W, D the degree matrix) and the
    embedded points are then grouped with k-means.

    Parameters:
    -----------
    affinity_mat : np.ndarray of shape (n, n), dtype=float
        Symmetric affinity matrix W representing the graph.
    k : int
        Number of clusters. The same value is used as the number of
        eigenpairs requested from the eigen-solver.

    Returns:
    --------
    cluster_idx : np.ndarray of shape (n,), dtype=np.int32
        Cluster label for each node.
    """
    # Degree matrix: row sums of the affinities placed on the diagonal
    node_degrees = affinity_mat.sum(axis=1)
    degree_mat = np.diag(node_degrees)

    # Unnormalized graph Laplacian
    laplacian = degree_mat - affinity_mat

    # Generalized eigenproblem Lx = λDx; 'SR' asks for the eigenvalues with
    # the smallest real part. The eigenvalues themselves are not needed.
    _, eigvecs = eigs(A=laplacian, M=degree_mat, k=k, which='SR')

    # eigs returns complex arrays; only the real part is kept (the imaginary
    # parts are expected to be negligible). Row i is the embedding of node i.
    embedding = np.real(eigvecs)

    # Fixed random_state so that the k-means step is reproducible.
    model = KMeans(n_clusters=k, random_state=1).fit(embedding)

    return model.labels_.astype(np.int32)
|
|
|
|
|
|
def calculate_n_cut_value(
    affinity_mat: NDArray[np.floating],
    cluster_idx: NDArray[np.int32]
) -> float:
    """
    Calculates the Ncut(A, B) metric for a binary clustering.

    The value is computed through the normalized association:

        Ncut(A, B) = 2 - (assoc(A, A) / assoc(A, V) + assoc(B, B) / assoc(B, V))

    where assoc(X, Y) is the sum of the affinities between the nodes of X
    and the nodes of Y, and V is the set of all nodes.

    Parameters:
    -----------
    affinity_mat : np.ndarray of shape (n, n)
        Symmetric affinity matrix.
    cluster_idx : np.ndarray of shape (n,)
        Cluster labels with values in {0, 1}. Nodes carrying any other
        label belong to neither A nor B.

    Returns:
    --------
    n_cut_value : float
        The value of the Ncut metric. A cluster that is empty, or whose
        nodes have no affinity to any node, contributes a normalized
        association of 0 (instead of the undefined 0/0), so a one-sided
        split evaluates to a finite value and never to NaN.
    """
    labels = np.asarray(cluster_idx)
    A = np.where(labels == 0)[0]
    B = np.where(labels == 1)[0]

    def normalized_assoc(members):
        # assoc(X, V): total affinity from the cluster to every node
        assoc_to_all = np.sum(affinity_mat[members, :])
        if assoc_to_all == 0:
            # Empty or fully isolated cluster: the ratio would be 0/0.
            # Count it as "no association" rather than producing NaN.
            return 0.0
        # assoc(X, X): affinity kept inside the cluster
        assoc_within = np.sum(affinity_mat[np.ix_(members, members)])
        return assoc_within / assoc_to_all

    nassoc_AB = normalized_assoc(A) + normalized_assoc(B)

    return float(2 - nassoc_AB)
|
|
|
|
|
|
def n_cuts_recursive(
    affinity_mat: NDArray[np.floating],
    T1: int,
    T2: float
) -> NDArray[np.int32]:
    """
    Recursive normalized cuts clustering.

    Starting from the whole graph, every group is bisected with
    ``n_cuts(..., k=2)``. A bisection is accepted only if both halves have
    more than T1 nodes and its Ncut value does not exceed T2; otherwise the
    group becomes a final cluster.

    Parameters:
    -----------
    affinity_mat : np.ndarray of shape (n, n)
        Symmetric affinity matrix.
    T1 : int
        Minimum size for splitting a group. Groups with at most T1 nodes
        are not split, and a split that produces a half with at most T1
        nodes is rejected.
    T2 : float
        Maximum acceptable Ncut value to allow further splitting.

    Returns:
    --------
    cluster_idx : np.ndarray of shape (n,), dtype=np.int32
        Final cluster labels after recursive partitioning. Every final
        cluster has its own label; label 0 is the group that keeps the
        root label, new labels are handed out in increasing order.
    """
    n = affinity_mat.shape[0]
    cluster_idx = np.zeros(n, dtype=np.int32)
    next_label = 1  # Next unused label (0 is taken by the root group)

    # n_cuts(k=2) relies on scipy's eigs, which requires k < n - 1. Groups
    # with fewer than 4 nodes therefore cannot be bisected and are leaves.
    min_solvable_size = 4

    def recursive_partition(indices: NDArray[np.int_], current_label: int) -> None:
        nonlocal next_label

        # Label the whole group first; an accepted split overwrites the
        # part that moves to a new cluster.
        cluster_idx[indices] = current_label

        if len(indices) <= T1 or len(indices) < min_solvable_size:
            return

        sub_affinity = affinity_mat[np.ix_(indices, indices)]
        labels = n_cuts(sub_affinity, k=2)

        A = indices[labels == 0]
        B = indices[labels == 1]

        # Size test first: a one-sided split is rejected here, before the
        # Ncut metric (undefined for an empty half) is evaluated.
        if len(A) <= T1 or len(B) <= T1:
            return

        if calculate_n_cut_value(sub_affinity, labels) > T2:
            return

        # A keeps the current label and may hand out new labels while it is
        # being refined.
        recursive_partition(A, current_label)

        # Reserve B's label BEFORE recursing into B. If the counter were
        # advanced only after the call returned, splits made inside B would
        # read the same counter value and reuse B's own label.
        label_for_B = next_label
        next_label += 1
        recursive_partition(B, label_for_B)

    # Start with all indices
    recursive_partition(np.arange(n), 0)
    return cluster_idx
|
|
|
|
|
|
|
|
def _test_n_cuts(k: int, plot: bool = False):
    """
    Demo: run the non-recursive n_cuts on image "d2a" of dip_hw_3.mat.

    Loads the image, converts it to an affinity matrix with
    image_to_graph, clusters it into k groups and prints the distinct
    labels that were produced.

    Parameters:
    -----------
    k : int
        Number of clusters requested from n_cuts.
    plot : bool
        When True, the labels are reshaped to the image grid and shown in a
        matplotlib window (TkAgg backend).
    """
    data = loadmat("dip_hw_3.mat")
    img = data["d2a"]
    M, N, _ = img.shape  # the image is expected to be 3-D (rows, cols, channels)

    print("Running non-recursive n_cuts on d2a...")

    # NOTE(review): assumes image_to_graph returns an (M*N, M*N) affinity
    # matrix with one node per pixel - the reshape below relies on it.
    A = image_to_graph(img)
    labels = n_cuts(A, k=k)

    print("Unique cluster labels:", np.unique(labels))

    # Visualize
    if plot:
        clustered = labels.reshape(M, N)
        matplotlib.use("TkAgg")
        # Labels lie in 0..k-1, so the colour range follows k; a fixed
        # upper bound would merge the colours of the higher labels.
        matplotlib.pyplot.imshow(clustered, cmap='tab10', vmin=0, vmax=max(k - 1, 1))
        # f-string, so that the actual k appears in the title
        matplotlib.pyplot.title(f"n_cuts clustering (k={k}) on d2a")
        matplotlib.pyplot.axis('off')
        matplotlib.pyplot.tight_layout()
        matplotlib.pyplot.show()
        #matplotlib.pyplot.savefig("ncuts_d2a_k3.png")
        #print("Saved result to: ncuts_d2a_k3.png")
|
|
|
|
|
|
def _test_n_cuts_req(plot: bool = False):
    """
    Demo: run the recursive n_cuts on image "d2b" of dip_hw_3.mat.

    Loads the image, builds its affinity matrix with image_to_graph, runs
    n_cuts_recursive with the demo thresholds (T1=5, T2=0.95) and prints
    how many distinct labels were produced and which ones.

    Parameters:
    -----------
    plot : bool
        When True, the labels are reshaped to the image grid and shown in a
        matplotlib window.
    """
    mat_contents = loadmat("dip_hw_3.mat")
    image = mat_contents["d2b"]
    rows, cols, _ = image.shape  # the image is expected to be 3-D

    print("Running recursive n_cuts on d2b...")

    # Thresholds from the demo instructions
    T1, T2 = 5, 0.95

    labels = n_cuts_recursive(image_to_graph(image), T1=T1, T2=T2)

    distinct_labels = np.unique(labels)
    print("Number of unique clusters:", len(distinct_labels))
    print("Labels:", distinct_labels)

    if not plot:
        return

    plt = matplotlib.pyplot
    plt.imshow(labels.reshape(rows, cols), cmap='tab20')
    plt.title(f"Recursive n_cuts on d2b (T1={T1}, T2={T2})")
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    #matplotlib.pyplot.savefig("ncuts_recursive_d2b.png")
    #print("Saved result to: ncuts_recursive_d2b.png")
|
|
|
|
if __name__ == '__main__':
    # Non-recursive demo for each requested number of clusters (no plots)
    for k in (2, 3, 4):
        _test_n_cuts(k, plot=False)

    # Recursive demo (no plot)
    _test_n_cuts_req(plot=False)
|
|
|
|
|