165 lines
5.2 KiB
Python
165 lines
5.2 KiB
Python
#
|
|
# Spectral clustering routine
|
|
#
|
|
# For the given data we have:
|
|
# dip_hw_3.mat["d1a"] # [MN x MN] affinity matrix
|
|
# dip_hw_3.mat["d2a"] # [M x N x 3] RGB image
|
|
# dip_hw_3.mat["d2b"] # [M x N x 3] RGB image
|
|
#
|
|
#
|
|
# author: Christos Choutouridis <cchoutou@ece.auth.gr>
|
|
# date: 05/07/2025
|
|
#
|
|
|
|
try:
|
|
import numpy as np
|
|
from numpy.typing import NDArray
|
|
from scipy.sparse.linalg import eigs
|
|
from sklearn.cluster import KMeans
|
|
|
|
# Testing requirements
|
|
from scipy.io import loadmat
|
|
import matplotlib.pyplot
|
|
from sklearn.cluster import spectral_clustering as sk_spectral
|
|
from sklearn.metrics import adjusted_rand_score
|
|
except ImportError as e:
|
|
print("Missing package:", e)
|
|
print("Run: pip install -r requirements.txt")
|
|
exit(1)
|
|
|
|
|
|
def spectral_clustering(
|
|
affinity_mat: NDArray[np.floating],
|
|
k: int,
|
|
normalized: bool = False
|
|
) -> NDArray[np.int32]:
|
|
"""
|
|
Performs spectral clustering on a given affinity matrix.
|
|
|
|
Parameters:
|
|
----------
|
|
affinity_mat : np.ndarray of shape (n, n), dtype=float
|
|
The symmetric affinity matrix representing a graph.
|
|
k : int
|
|
The number of clusters.
|
|
normalized : bool, optional (default=False)
|
|
Whether to use the normalized Laplacian (L_sym = I - D^(-1/2) A D^(-1/2))
|
|
!note: Don't miss-interpret this with normalized-cuts implementation!!
|
|
|
|
Returns:
|
|
-------
|
|
cluster_idx : np.ndarray of shape (n,), dtype=int
|
|
An array of cluster labels for each node.
|
|
"""
|
|
# Degree matrix
|
|
D = np.diag(affinity_mat.sum(axis=1))
|
|
|
|
if normalized:
|
|
with np.errstate(divide='ignore'):
|
|
D_inv_sqrt = np.diag(1.0 / np.sqrt(np.diag(D)))
|
|
D_inv_sqrt[np.isinf(D_inv_sqrt)] = 0.0
|
|
# L = I - D^(-1/2) A D^(-1/2)
|
|
L = np.eye(affinity_mat.shape[0]) - D_inv_sqrt @ affinity_mat @ D_inv_sqrt
|
|
else:
|
|
L = D - affinity_mat
|
|
|
|
# Compute k smallest eigenvectors (use eigsh if L is symmetric positive-definite)
|
|
eigvals, eigvecs = eigs(L, k=k, which='SR') # 'SR' = Smallest Real part
|
|
|
|
# Form matrix U with eigenvectors as columns
|
|
# Convert complex -> real (imaginary parts should be negligible)
|
|
U = np.real(eigvecs)
|
|
|
|
# Each row is a vector to be clustered
|
|
# random_state parameter to 1, to ensure reproducibility across experiments.
|
|
kmeans = KMeans(n_clusters=k, random_state=1)
|
|
kmeans.fit(U)
|
|
|
|
# obtain the cluster labels for each input data point
|
|
return kmeans.labels_.astype(np.int32)
|
|
|
|
|
|
def _test_d1a(k: int, plot :bool = False):
|
|
"""
|
|
Runs spectral clustering on d1a from dip_hw_3.mat for a given k.
|
|
"""
|
|
print(f"=== Spectral clustering test on d1a (k={k}) ===")
|
|
print(f"")
|
|
|
|
data = loadmat("dip_hw_3.mat")
|
|
A = data["d1a"]
|
|
print("Loaded d1a affinity matrix with shape:", A.shape)
|
|
|
|
labels = spectral_clustering(A, k)
|
|
|
|
print("Cluster labels:")
|
|
print(labels)
|
|
print("Unique clusters:", np.unique(labels))
|
|
|
|
if plot:
|
|
D = np.diag(A.sum(axis=1))
|
|
L = D - A
|
|
eigvals, eigvecs = eigs(L, k=k, which='SR')
|
|
eigvecs = np.real(eigvecs)
|
|
|
|
# Plot first 2 dimensions
|
|
matplotlib.use("TkAgg")
|
|
matplotlib.pyplot.figure()
|
|
matplotlib.pyplot.scatter(eigvecs[:, 0], eigvecs[:, 1], c=labels, cmap='tab10', s=100, edgecolors='k')
|
|
matplotlib.pyplot.title(f"Spectral Clustering (k={k}) on d1a")
|
|
matplotlib.pyplot.xlabel("Eigenvector 1")
|
|
matplotlib.pyplot.ylabel("Eigenvector 2")
|
|
matplotlib.pyplot.grid(True)
|
|
matplotlib.pyplot.tight_layout()
|
|
matplotlib.pyplot.show()
|
|
#matplotlib.pyplot.savefig(f"spectral_d1a_k{k}.png")
|
|
|
|
|
|
def compare_with_sklearn(k: int, plot: bool = False):
|
|
print(f"=== Comparing with sklearn spectral_clustering (k={k}) ===")
|
|
print(f"")
|
|
|
|
data = loadmat("dip_hw_3.mat")
|
|
A = data["d1a"]
|
|
|
|
# Your implementation
|
|
labels_own = spectral_clustering(A, k, True)
|
|
|
|
# sklearn implementation (uses normalized Laplacian)
|
|
labels_sklearn = sk_spectral(
|
|
affinity=A,
|
|
n_clusters=k,
|
|
assign_labels="kmeans",
|
|
random_state=1
|
|
)
|
|
|
|
# Compare clustering assignments using ARI
|
|
ari = adjusted_rand_score(labels_own, labels_sklearn)
|
|
|
|
print(f"Labels (own): {labels_own}")
|
|
print(f"Labels (sklearn): {labels_sklearn}")
|
|
print(f"Adjusted Rand Index (ARI): {ari:.4f}")
|
|
|
|
# Optional: bar plot to compare visually
|
|
if plot:
|
|
matplotlib.use("TkAgg")
|
|
matplotlib.pyplot.figure(figsize=(6, 1.5))
|
|
matplotlib.pyplot.title(f"Cluster comparison (k={k})")
|
|
matplotlib.pyplot.plot(labels_own, 'o-', label='own', markersize=8)
|
|
matplotlib.pyplot.plot(labels_sklearn, 'x--', label='sklearn', markersize=6)
|
|
matplotlib.pyplot.yticks(range(k))
|
|
matplotlib.pyplot.xlabel("Node index")
|
|
matplotlib.pyplot.legend()
|
|
matplotlib.pyplot.tight_layout()
|
|
matplotlib.pyplot.show()
|
|
# matplotlib.pyplot.savefig(f"compare_k{k}.png")
|
|
# print(f"Saved plot: compare_k{k}.png")
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
for k in [2, 3, 4]:
|
|
# If you have TkAgg you can pass plot=True, otherwise pass False
|
|
_test_d1a(k, plot=False)
|
|
compare_with_sklearn(k, plot=False)
|