From ae4ad8213692bab34b7fcf2e2c2ef632ef0b46ce Mon Sep 17 00:00:00 2001 From: Christos Choutouridis Date: Sun, 8 Feb 2026 22:53:52 +0200 Subject: [PATCH] Level 2: Psychoacoustic first version --- source/core/aac_coder.py | 16 +- source/core/aac_configuration.py | 12 +- source/core/aac_decoder.py | 19 +- source/core/aac_filterbank.py | 83 +--- source/core/aac_psycho.py | 441 ++++++++++++++++++ source/core/aac_snr_db.py | 60 --- source/core/aac_ssc.py | 2 +- source/core/aac_tns.py | 40 +- source/core/aac_types.py | 19 + source/core/aac_utils.py | 270 +++++++++++ source/core/tests/test_aac_coder_decoder.py | 4 +- source/core/tests/test_filterbank.py | 2 +- source/core/tests/test_filterbank_internal.py | 117 ----- source/core/tests/test_psycho.py | 253 ++++++++++ source/core/tests/test_snr_db.py | 98 ---- source/core/tests/test_utils.py | 329 +++++++++++++ source/level_1/core/aac_coder.py | 16 +- source/level_1/core/aac_configuration.py | 12 +- source/level_1/core/aac_decoder.py | 19 +- source/level_1/core/aac_filterbank.py | 83 +--- source/level_1/core/aac_snr_db.py | 60 --- source/level_1/core/aac_ssc.py | 2 +- source/level_1/core/aac_types.py | 19 + source/level_1/core/aac_utils.py | 270 +++++++++++ source/level_1/level_1.py | 2 +- source/level_2/core/aac_coder.py | 16 +- source/level_2/core/aac_configuration.py | 12 +- source/level_2/core/aac_decoder.py | 19 +- source/level_2/core/aac_filterbank.py | 83 +--- source/level_2/core/aac_snr_db.py | 60 --- source/level_2/core/aac_ssc.py | 2 +- source/level_2/core/aac_tns.py | 40 +- source/level_2/core/aac_types.py | 19 + source/level_2/core/aac_utils.py | 270 +++++++++++ source/level_2/level_2.py | 2 +- 35 files changed, 2003 insertions(+), 768 deletions(-) create mode 100644 source/core/aac_psycho.py delete mode 100644 source/core/aac_snr_db.py create mode 100644 source/core/aac_utils.py delete mode 100644 source/core/tests/test_filterbank_internal.py create mode 100644 source/core/tests/test_psycho.py delete mode 100644 source/core/tests/test_snr_db.py create mode 100644 source/core/tests/test_utils.py delete mode 100644 source/level_1/core/aac_snr_db.py create mode 100644 source/level_1/core/aac_utils.py delete mode 100644 source/level_2/core/aac_snr_db.py create mode 100644 source/level_2/core/aac_utils.py diff --git a/source/core/aac_coder.py b/source/core/aac_coder.py index 0b6b939..fc3f8ac 100644 --- a/source/core/aac_coder.py +++ b/source/core/aac_coder.py @@ -9,16 +9,8 @@ # cchoutou@ece.auth.gr # # Description: -# Level 1 AAC encoder orchestration. -# Keeps the same functional behavior as the original level_1 implementation: -# - Reads WAV via soundfile -# - Validates stereo and 48 kHz -# - Frames into 2048 samples with hop=1024 and zero padding at both ends -# - SSC decision uses next-frame attack detection -# - Filterbank analysis (MDCT) -# - Stores per-channel spectra in AACSeq1 schema: -# * ESH: (128, 8) -# * else: (1024, 1) +# - Level 1 AAC encoder orchestration. +# - Level 2 AAC encoder orchestration. # ------------------------------------------------------------ from __future__ import annotations @@ -199,6 +191,10 @@ def aac_coder_1(filename_in: Union[str, Path]) -> AACSeq1: return aac_seq +# ----------------------------------------------------------------------------- +# Level 2 encoder +# ----------------------------------------------------------------------------- + def aac_coder_2(filename_in: Union[str, Path]) -> AACSeq2: """ Level-2 AAC encoder (Level 1 + TNS). diff --git a/source/core/aac_configuration.py b/source/core/aac_configuration.py index bb75d2e..63d6297 100644 --- a/source/core/aac_configuration.py +++ b/source/core/aac_configuration.py @@ -15,6 +15,8 @@ from __future__ import annotations # Imports +from typing import Final + from core.aac_types import WinType # Filterbank @@ -28,4 +30,12 @@ WIN_TYPE: WinType = "SIN" # ------------------------------------------------------------ PRED_ORDER = 4 QUANT_STEP = 0.1 -QUANT_MAX = 0.7 # 4-bit symmetric with step 0.1 -> clamp to [-0.7, +0.7] \ No newline at end of file +QUANT_MAX = 0.7 # 4-bit symmetric with step 0.1 -> clamp to [-0.7, +0.7] + + +# ----------------------------------------------------------------------------- +# Psycho +# ----------------------------------------------------------------------------- + +NMT_DB: Final[float] = 6.0 # Noise Masking Tone (dB) +TMN_DB: Final[float] = 18.0 # Tone Masking Noise (dB) \ No newline at end of file diff --git a/source/core/aac_decoder.py b/source/core/aac_decoder.py index 606ce78..5b366b8 100644 --- a/source/core/aac_decoder.py +++ b/source/core/aac_decoder.py @@ -9,16 +9,9 @@ # cchoutou@ece.auth.gr # # Description: -# Level 1 AAC decoder orchestration (inverse of aac_coder_1()). -# Keeps the same functional behavior as the original level_1 implementation: -# - Re-pack per-channel spectra into FrameF expected by aac_i_filter_bank() -# - IMDCT synthesis per frame -# - Overlap-add with hop=1024 -# - Remove encoder boundary padding: hop at start and hop at end +# - Level 1 AAC decoder orchestration (inverse of aac_coder_1()). +# - Level 2 AAC decoder orchestration (inverse of aac_coder_1()). # -# Note: -# This core module returns the reconstructed samples. Writing to disk is kept -# in level_x demos. # ------------------------------------------------------------ from __future__ import annotations @@ -33,7 +26,7 @@ from core.aac_types import * # ----------------------------------------------------------------------------- -# Public helpers (useful for level_x demo wrappers) +# Public helpers # ----------------------------------------------------------------------------- def aac_unpack_seq_channels_to_frame_f(frame_type: FrameType, chl_f: FrameChannelF, chr_f: FrameChannelF) -> FrameF: @@ -109,7 +102,7 @@ def aac_remove_padding(y_pad: StereoSignal, hop: int = 1024) -> StereoSignal: # ----------------------------------------------------------------------------- -# Level 1 decoder (core) +# Level 1 decoder # ----------------------------------------------------------------------------- def aac_decoder_1(aac_seq_1: AACSeq1, filename_out: Union[str, Path]) -> StereoSignal: @@ -167,6 +160,10 @@ def aac_decoder_1(aac_seq_1: AACSeq1, filename_out: Union[str, Path]) -> StereoS return y +# ----------------------------------------------------------------------------- +# Level 2 decoder +# ----------------------------------------------------------------------------- + def aac_decoder_2(aac_seq_2: AACSeq2, filename_out: Union[str, Path]) -> StereoSignal: """ Level-2 AAC decoder (inverse of aac_coder_2). diff --git a/source/core/aac_filterbank.py b/source/core/aac_filterbank.py index 60eb9c2..6bd6e76 100644 --- a/source/core/aac_filterbank.py +++ b/source/core/aac_filterbank.py @@ -14,6 +14,7 @@ # ------------------------------------------------------------ from __future__ import annotations +from core.aac_utils import mdct, imdct from core.aac_types import * from scipy.signal.windows import kaiser @@ -186,74 +187,6 @@ def _window_sequence(frame_type: FrameType, win_type: WinType) -> Window: raise ValueError(f"Invalid frame_type for long window sequence: {frame_type!r}") -def _mdct(s: TimeSignal) -> MdctCoeffs: - """ - MDCT (direct form) as specified in the assignment. - - Parameters - ---------- - s : TimeSignal - Windowed time samples, 1-D array of length N (N = 2048 or 256). - - Returns - ------- - MdctCoeffs - MDCT coefficients, 1-D array of length N/2. - - Definition - ---------- - X[k] = 2 * sum_{n=0..N-1} s[n] * cos((2*pi/N) * (n + n0) * (k + 1/2)), - where n0 = (N/2 + 1)/2. - """ - s = np.asarray(s, dtype=np.float64).reshape(-1) - N = int(s.shape[0]) - if N not in (2048, 256): - raise ValueError("MDCT input length must be 2048 or 256.") - - n0 = (N / 2.0 + 1.0) / 2.0 - n = np.arange(N, dtype=np.float64) + n0 - k = np.arange(N // 2, dtype=np.float64) + 0.5 - - C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, N/2) - X = 2.0 * (s @ C) # (N/2,) - return X - - -def _imdct(X: MdctCoeffs) -> TimeSignal: - """ - IMDCT (direct form) as specified in the assignment. - - Parameters - ---------- - X : MdctCoeffs - MDCT coefficients, 1-D array of length K (K = 1024 or 128). - - Returns - ------- - TimeSignal - Reconstructed time samples, 1-D array of length N = 2K. - - Definition - ---------- - s[n] = (2/N) * sum_{k=0..N/2-1} X[k] * cos((2*pi/N) * (n + n0) * (k + 1/2)), - where n0 = (N/2 + 1)/2. - """ - X = np.asarray(X, dtype=np.float64).reshape(-1) - K = int(X.shape[0]) - if K not in (1024, 128): - raise ValueError("IMDCT input length must be 1024 or 128.") - - N = 2 * K - n0 = (N / 2.0 + 1.0) / 2.0 - - n = np.arange(N, dtype=np.float64) + n0 - k = np.arange(K, dtype=np.float64) + 0.5 - - C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, K) - s = (2.0 / N) * (C @ X) # (N,) - return s - - def _filter_bank_esh_channel(x_ch: FrameChannelT, win_type: WinType) -> FrameChannelF: """ ESH analysis for one channel. @@ -279,7 +212,7 @@ def _filter_bank_esh_channel(x_ch: FrameChannelT, win_type: WinType) -> FrameCha for j in range(8): start = 448 + 128 * j seg = x_ch[start:start + 256] * wS # (256,) - X_esh[:, j] = _mdct(seg) # (128,) + X_esh[:, j] = mdct(seg) # (128,) return X_esh @@ -344,7 +277,7 @@ def _i_filter_bank_esh_channel(X_esh: FrameChannelF, win_type: WinType) -> Frame # Each short IMDCT returns 256 samples. Place them at: # start = 448 + 128*j, j=0..7 (50% overlap) for j in range(8): - seg = _imdct(X_esh[:, j]) * wS # (256,) + seg = imdct(X_esh[:, j]) * wS # (256,) start = 448 + 128 * j out[start:start + 256] += seg @@ -352,7 +285,7 @@ def _i_filter_bank_esh_channel(X_esh: FrameChannelF, win_type: WinType) -> Frame # ----------------------------------------------------------------------------- -# Public Function prototypes (Level 1) +# Public Function prototypes # ----------------------------------------------------------------------------- def aac_filter_bank(frame_T: FrameT, frame_type: FrameType, win_type: WinType) -> FrameF: @@ -385,8 +318,8 @@ def aac_filter_bank(frame_T: FrameT, frame_type: FrameType, win_type: WinType) - if frame_type in ("OLS", "LSS", "LPS"): w = _window_sequence(frame_type, win_type) # length 2048 - XL = _mdct(xL * w) # length 1024 - XR = _mdct(xR * w) # length 1024 + XL = mdct(xL * w) # length 1024 + XR = mdct(xR * w) # length 1024 out = np.empty((1024, 2), dtype=np.float64) out[:, 0] = XL out[:, 1] = XR @@ -430,8 +363,8 @@ def aac_i_filter_bank(frame_F: FrameF, frame_type: FrameType, win_type: WinType) w = _window_sequence(frame_type, win_type) - xL = _imdct(frame_F[:, 0]) * w - xR = _imdct(frame_F[:, 1]) * w + xL = imdct(frame_F[:, 0]) * w + xR = imdct(frame_F[:, 1]) * w out = np.empty((2048, 2), dtype=np.float64) out[:, 0] = xL diff --git a/source/core/aac_psycho.py b/source/core/aac_psycho.py new file mode 100644 index 0000000..a26cab0 --- /dev/null +++ b/source/core/aac_psycho.py @@ -0,0 +1,441 @@ +# ------------------------------------------------------------ +# AAC Coder/Decoder - Psychoacoustic Model +# +# Multimedia course at Aristotle University of +# Thessaloniki (AUTh) +# +# Author: +# Christos Choutouridis (ΑΕΜ 8997) +# cchoutou@ece.auth.gr +# +# Description: +# Psychoacoustic model for ONE channel, based on the assignment notes (Section 2.4). +# +# Public API: +# SMR = aac_psycho(frame_T, frame_type, frame_T_prev_1, frame_T_prev_2) +# +# Output: +# - For long frames ("OLS", "LSS", "LPS"): SMR has shape (69,) +# - For short frames ("ESH"): SMR has shape (42, 8) (one column per subframe) +# +# Notes: +# - Uses Bark band tables from material/TableB219.mat: +# * B219a for long windows (69 bands, N=2048 FFT, N/2=1024 bins) +# * B219b for short windows (42 bands, N=256 FFT, N/2=128 bins) +# - Applies a Hann window in time domain before FFT magnitude/phase extraction. +# - Implements: +# spreading function -> band spreading -> tonality index -> masking thresholds -> SMR. +# ------------------------------------------------------------ +from __future__ import annotations + +import numpy as np + +from core.aac_utils import band_limits, get_table +from core.aac_configuration import NMT_DB, TMN_DB +from core.aac_types import * + +# ----------------------------------------------------------------------------- +# Spreading function +# ----------------------------------------------------------------------------- + +def _spreading_matrix(bval: BandValueArray) -> FloatArray: + """ + Compute the spreading function matrix between psychoacoustic bands. + + The spreading function describes how energy in one critical band masks + nearby bands. The formula follows the assignment pseudo-code. + + Parameters + ---------- + bval : BandValueArray + Bark value per band, shape (B,). + + Returns + ------- + FloatArray + Spreading matrix S of shape (B, B), where: + S[bb, b] quantifies the contribution of band bb masking band b. + """ + bval = np.asarray(bval, dtype=np.float64).reshape(-1) + B = int(bval.shape[0]) + + spread = np.zeros((B, B), dtype=np.float64) + + for b in range(B): + for bb in range(B): + # tmpx depends on direction (asymmetric spreading) + if bb >= b: + tmpx = 3.0 * (bval[bb] - bval[b]) + else: + tmpx = 1.5 * (bval[bb] - bval[b]) + + # tmpz uses the "min(..., 0)" nonlinearity exactly as in the notes + tmpz = 8.0 * min((tmpx - 0.5) ** 2 - 2.0 * (tmpx - 0.5), 0.0) + tmpy = 15.811389 + 7.5 * (tmpx + 0.474) - 17.5 * np.sqrt(1.0 + (tmpx + 0.474) ** 2) + + # Clamp very small values (below -100 dB) to 0 contribution + if tmpy < -100.0: + spread[bb, b] = 0.0 + else: + spread[bb, b] = 10.0 ** ((tmpz + tmpy) / 10.0) + + return spread + + +# ----------------------------------------------------------------------------- +# Windowing + FFT feature extraction +# ----------------------------------------------------------------------------- + +def _hann_window(N: int) -> FloatArray: + """ + Hann window as specified in the notes: + w[n] = 0.5 - 0.5*cos(2*pi*(n + 0.5)/N) + + Parameters + ---------- + N : int + Window length. + + Returns + ------- + FloatArray + 1-D array of shape (N,), dtype float64. + """ + n = np.arange(N, dtype=np.float64) + return 0.5 - 0.5 * np.cos((2.0 * np.pi / N) * (n + 0.5)) + + +def _r_phi_from_time(x: FrameChannelT, N: int) -> tuple[FloatArray, FloatArray]: + """ + Compute FFT magnitude r(w) and phase phi(w) for bins w = 0 .. N/2-1. + + Processing: + 1) Apply Hann window in time domain. + 2) Compute N-point FFT. + 3) Keep only the positive-frequency bins [0 .. N/2-1]. + + Parameters + ---------- + x : FrameChannelT + Time-domain samples, shape (N,). + N : int + FFT size (2048 or 256). + + Returns + ------- + r : FloatArray + Magnitude spectrum for bins 0 .. N/2-1, shape (N/2,). + phi : FloatArray + Phase spectrum for bins 0 .. N/2-1, shape (N/2,). + """ + x = np.asarray(x, dtype=np.float64).reshape(-1) + if x.shape[0] != N: + raise ValueError(f"Expected time vector of length {N}, got {x.shape[0]}.") + + w = _hann_window(N) + X = np.fft.fft(x * w, n=N) + + Xp = X[: N // 2] + r = np.abs(Xp).astype(np.float64, copy=False) + phi = np.angle(Xp).astype(np.float64, copy=False) + return r, phi + + +def _predictability( + r: FloatArray, + phi: FloatArray, + r_m1: FloatArray, + phi_m1: FloatArray, + r_m2: FloatArray, + phi_m2: FloatArray, +) -> FloatArray: + """ + Compute predictability c(w) per spectral bin. + + The notes define: + r_pred(w) = 2*r_{-1}(w) - r_{-2}(w) + phi_pred(w) = 2*phi_{-1}(w) - phi_{-2}(w) + + c(w) = |X(w) - X_pred(w)| / (r(w) + |r_pred(w)|) + + where X(w) is represented in polar form using r(w), phi(w). + + Parameters + ---------- + r, phi : FloatArray + Current magnitude and phase, shape (N/2,). + r_m1, phi_m1 : FloatArray + Previous magnitude and phase, shape (N/2,). + r_m2, phi_m2 : FloatArray + Pre-previous magnitude and phase, shape (N/2,). + + Returns + ------- + FloatArray + Predictability c(w), shape (N/2,). + """ + r_pred = 2.0 * r_m1 - r_m2 + phi_pred = 2.0 * phi_m1 - phi_m2 + + num = np.sqrt( + (r * np.cos(phi) - r_pred * np.cos(phi_pred)) ** 2 + + (r * np.sin(phi) - r_pred * np.sin(phi_pred)) ** 2 + ) + den = r + np.abs(r_pred) + 1e-12 # avoid division-by-zero without altering behavior + return (num / den).astype(np.float64, copy=False) + + +# ----------------------------------------------------------------------------- +# Band-domain aggregation +# ----------------------------------------------------------------------------- + +def _band_energy_and_weighted_predictability( + r: FloatArray, + c: FloatArray, + wlow: BandIndexArray, + whigh: BandIndexArray, +) -> tuple[FloatArray, FloatArray]: + """ + Aggregate spectral bin quantities into psychoacoustic bands. + + Definitions (notes): + e(b) = sum_{w=wlow(b)..whigh(b)} r(w)^2 + c_num(b) = sum_{w=wlow(b)..whigh(b)} c(w) * r(w)^2 + + The band predictability c(b) is later computed after spreading as: + cb(b) = ct(b) / ecb(b) + + Parameters + ---------- + r : FloatArray + Magnitude spectrum, shape (N/2,). + c : FloatArray + Predictability per bin, shape (N/2,). + wlow, whigh : BandIndexArray + Band limits (inclusive indices), shape (B,). + + Returns + ------- + e_b : FloatArray + Band energies e(b), shape (B,). + c_num_b : FloatArray + Weighted predictability numerators c_num(b), shape (B,). + """ + r2 = (r * r).astype(np.float64, copy=False) + + B = int(wlow.shape[0]) + e_b = np.zeros(B, dtype=np.float64) + c_num_b = np.zeros(B, dtype=np.float64) + + for b in range(B): + a = int(wlow[b]) + z = int(whigh[b]) + + seg_r2 = r2[a : z + 1] + e_b[b] = float(np.sum(seg_r2)) + c_num_b[b] = float(np.sum(c[a : z + 1] * seg_r2)) + + return e_b, c_num_b + + +def _psycho_one_window( + time_x: FrameChannelT, + prev1_x: FrameChannelT, + prev2_x: FrameChannelT, + *, + N: int, + table: BarkTable, +) -> FloatArray: + """ + Compute SMR for one FFT analysis window (N=2048 for long, N=256 for short). + + This implements the pipeline described in the notes: + - FFT magnitude/phase + - predictability per bin + - band energies and predictability + - band spreading + - tonality index tb(b) + - masking threshold (noise + threshold in quiet) + - SMR(b) = e(b) / np(b) + + Parameters + ---------- + time_x : FrameChannelT + Current time-domain samples, shape (N,). + prev1_x : FrameChannelT + Previous time-domain samples, shape (N,). + prev2_x : FrameChannelT + Pre-previous time-domain samples, shape (N,). + N : int + FFT size. + table : BarkTable + Psychoacoustic band table (B219a or B219b). + + Returns + ------- + FloatArray + SMR per band, shape (B,). + """ + wlow, whigh, bval, qthr_db = band_limits(table) + spread = _spreading_matrix(bval) + + # FFT features for current and history windows + r, phi = _r_phi_from_time(time_x, N) + r_m1, phi_m1 = _r_phi_from_time(prev1_x, N) + r_m2, phi_m2 = _r_phi_from_time(prev2_x, N) + + # Predictability per bin + c_w = _predictability(r, phi, r_m1, phi_m1, r_m2, phi_m2) + + # Aggregate into psycho bands + e_b, c_num_b = _band_energy_and_weighted_predictability(r, c_w, wlow, whigh) + + # Spread energies and predictability across bands: + # ecb(b) = sum_bb e(bb) * S(bb, b) + # ct(b) = sum_bb c_num(bb) * S(bb, b) + ecb = spread.T @ e_b + ct = spread.T @ c_num_b + + # Band predictability after spreading: cb(b) = ct(b) / ecb(b) + cb = ct / (ecb + 1e-12) + + # Normalized energy term: + # en(b) = ecb(b) / sum_bb S(bb, b) + spread_colsum = np.sum(spread, axis=0) + en = ecb / (spread_colsum + 1e-12) + + # Tonality index (clamped to [0, 1]) + tb = -0.299 - 0.43 * np.log(np.maximum(cb, 1e-12)) + tb = np.clip(tb, 0.0, 1.0) + + # Required SNR per band (dB): interpolate between TMN and NMT + snr_b = tb * TMN_DB + (1.0 - tb) * NMT_DB + bc = 10.0 ** (-snr_b / 10.0) + + # Noise masking threshold estimate (power domain) + nb = en * bc + + # Threshold in quiet (convert from dB to power domain): + # qthr_power = (N/2) * 10^(qthr_db/10) + qthr_power = (N / 2.0) * (10.0 ** (qthr_db / 10.0)) + + # Final masking threshold per band: + # np(b) = max(nb(b), qthr(b)) + npart = np.maximum(nb, qthr_power) + + # Signal-to-mask ratio: + # SMR(b) = e(b) / np(b) + smr = e_b / (npart + 1e-12) + return smr.astype(np.float64, copy=False) + + +# ----------------------------------------------------------------------------- +# ESH window slicing (match filterbank conventions) +# ----------------------------------------------------------------------------- + +def _esh_subframes_256(x_2048: FrameChannelT) -> list[FrameChannelT]: + """ + Extract the 8 overlapping 256-sample short windows used by AAC ESH. + + The project convention (matching the filterbank) is: + start_j = 448 + 128*j, for j = 0..7 + subframe_j = x[start_j : start_j + 256] + + This selects the central 1152-sample region [448, 1600) and produces + 8 windows with 50% overlap. + + Parameters + ---------- + x_2048 : FrameChannelT + Time-domain channel frame, shape (2048,). + + Returns + ------- + list[FrameChannelT] + List of 8 subframes, each of shape (256,). + """ + x_2048 = np.asarray(x_2048, dtype=np.float64).reshape(-1) + if x_2048.shape[0] != 2048: + raise ValueError("ESH requires 2048-sample input frames.") + + subs: list[FrameChannelT] = [] + for j in range(8): + start = 448 + 128 * j + subs.append(x_2048[start : start + 256]) + return subs + + +# ----------------------------------------------------------------------------- +# Public API +# ----------------------------------------------------------------------------- + +def aac_psycho( + frame_T: FrameChannelT, + frame_type: FrameType, + frame_T_prev_1: FrameChannelT, + frame_T_prev_2: FrameChannelT, +) -> FloatArray: + """ + Psychoacoustic model for ONE channel. + + Parameters + ---------- + frame_T : FrameChannelT + Current time-domain channel frame, shape (2048,). + For "ESH", the 8 short windows are derived internally. + frame_type : FrameType + AAC frame type ("OLS", "LSS", "ESH", "LPS"). + frame_T_prev_1 : FrameChannelT + Previous time-domain channel frame, shape (2048,). + frame_T_prev_2 : FrameChannelT + Pre-previous time-domain channel frame, shape (2048,). + + Returns + ------- + FloatArray + Signal-to-Mask Ratio (SMR), per psychoacoustic band. + - If frame_type == "ESH": shape (42, 8) + - Else: shape (69,) + """ + frame_T = np.asarray(frame_T, dtype=np.float64).reshape(-1) + frame_T_prev_1 = np.asarray(frame_T_prev_1, dtype=np.float64).reshape(-1) + frame_T_prev_2 = np.asarray(frame_T_prev_2, dtype=np.float64).reshape(-1) + + if frame_T.shape[0] != 2048 or frame_T_prev_1.shape[0] != 2048 or frame_T_prev_2.shape[0] != 2048: + raise ValueError("aac_psycho expects 2048-sample frames for current/prev1/prev2.") + + table, N = get_table(frame_type) + + # Long frame types: compute one SMR vector (69 bands) + if frame_type != "ESH": + return _psycho_one_window(frame_T, frame_T_prev_1, frame_T_prev_2, N=N, table=table) + + # ESH: compute 8 SMR vectors (42 bands each), one per short subframe. + # + # The notes use short-window history for predictability: + # - For j=0: use previous frame's subframes (7, 6) + # - For j=1: use current subframe 0 and previous frame's subframe 7 + # - For j>=2: use current subframes (j-1, j-2) + # + # This matches the "within-frame history" convention commonly used in + # simplified psycho models for ESH. + cur_subs = _esh_subframes_256(frame_T) + prev1_subs = _esh_subframes_256(frame_T_prev_1) + + B = int(table.shape[0]) # expected 42 + smr_out = np.zeros((B, 8), dtype=np.float64) + + for j in range(8): + if j == 0: + x_m1 = prev1_subs[7] + x_m2 = prev1_subs[6] + elif j == 1: + x_m1 = cur_subs[0] + x_m2 = prev1_subs[7] + else: + x_m1 = cur_subs[j - 1] + x_m2 = cur_subs[j - 2] + + smr_out[:, j] = _psycho_one_window(cur_subs[j], x_m1, x_m2, N=256, table=table) + + return smr_out diff --git a/source/core/aac_snr_db.py b/source/core/aac_snr_db.py deleted file mode 100644 index 21e5184..0000000 --- a/source/core/aac_snr_db.py +++ /dev/null @@ -1,60 +0,0 @@ -# ------------------------------------------------------------ -# AAC Coder/Decoder - SNR dB calculator -# -# Multimedia course at Aristotle University of -# Thessaloniki (AUTh) -# -# Author: -# Christos Choutouridis (ΑΕΜ 8997) -# cchoutou@ece.auth.gr -# -# Description: -# This module implements SNR calculation in dB -# ------------------------------------------------------------ -from __future__ import annotations - -from core.aac_types import StereoSignal -import numpy as np - -def snr_db(x_ref: StereoSignal, x_hat: StereoSignal) -> float: - """ - Compute overall SNR (dB) over all samples and channels after aligning lengths. - - Parameters - ---------- - x_ref : StereoSignal - Reference stereo stream. - x_hat : StereoSignal - Reconstructed stereo stream. - - Returns - ------- - float - SNR in dB. - - Returns +inf if noise power is zero. - - Returns -inf if signal power is zero. - """ - x_ref = np.asarray(x_ref, dtype=np.float64) - x_hat = np.asarray(x_hat, dtype=np.float64) - - if x_ref.ndim == 1: - x_ref = x_ref.reshape(-1, 1) - if x_hat.ndim == 1: - x_hat = x_hat.reshape(-1, 1) - - n = min(x_ref.shape[0], x_hat.shape[0]) - c = min(x_ref.shape[1], x_hat.shape[1]) - - x_ref = x_ref[:n, :c] - x_hat = x_hat[:n, :c] - - err = x_ref - x_hat - ps = float(np.sum(x_ref * x_ref)) - pn = float(np.sum(err * err)) - - if pn <= 0.0: - return float("inf") - if ps <= 0.0: - return float("-inf") - - return float(10.0 * np.log10(ps / pn)) \ No newline at end of file diff --git a/source/core/aac_ssc.py b/source/core/aac_ssc.py index 926c854..8c082b3 100644 --- a/source/core/aac_ssc.py +++ b/source/core/aac_ssc.py @@ -173,7 +173,7 @@ def _stereo_merge(ft_l: FrameType, ft_r: FrameType) -> FrameType: # ----------------------------------------------------------------------------- -# Public Function prototypes (Level 1) +# Public Function prototypes # ----------------------------------------------------------------------------- def aac_SSC(frame_T: FrameT, next_frame_T: FrameT, prev_frame_type: FrameType) -> FrameType: diff --git a/source/core/aac_tns.py b/source/core/aac_tns.py index 29e9921..88d9da8 100644 --- a/source/core/aac_tns.py +++ b/source/core/aac_tns.py @@ -33,6 +33,7 @@ from typing import Tuple import numpy as np from scipy.io import loadmat +from core.aac_utils import load_b219_tables from core.aac_configuration import PRED_ORDER, QUANT_STEP, QUANT_MAX from core.aac_types import * @@ -40,41 +41,6 @@ from core.aac_types import * # Private helpers # ----------------------------------------------------------------------------- -_B219_CACHE: dict[str, FloatArray] | None = None - - -def _load_b219_tables() -> dict[str, FloatArray]: - """ - Load TableB219.mat and cache the contents. - - The project layout guarantees that a 'material' directory is discoverable - from the current working directory (tests and level_123 entrypoints). - - Returns - ------- - dict[str, FloatArray] - Keys: - - "B219a": long bands table (for K=1024 MDCT lines) - - "B219b": short bands table (for K=128 MDCT lines) - """ - global _B219_CACHE - if _B219_CACHE is not None: - return _B219_CACHE - - mat_path = Path("material") / "TableB219.mat" - if not mat_path.exists(): - raise FileNotFoundError("Could not locate material/TableB219.mat in the current working directory.") - - d = loadmat(str(mat_path)) - if "B219a" not in d or "B219b" not in d: - raise ValueError("TableB219.mat missing required variables B219a and/or B219b.") - - _B219_CACHE = { - "B219a": np.asarray(d["B219a"], dtype=np.float64), - "B219b": np.asarray(d["B219b"], dtype=np.float64), - } - return _B219_CACHE - def _band_ranges_for_kcount(k_count: int) -> BandRanges: """ @@ -92,7 +58,7 @@ def _band_ranges_for_kcount(k_count: int) -> BandRanges: BandRanges (list[tuple[int, int]]) Each tuple is (start_k, end_k) inclusive. """ - tables = _load_b219_tables() + tables = load_b219_tables() if k_count == 1024: tbl = tables["B219a"] elif k_count == 128: @@ -425,7 +391,7 @@ def _tns_one_vector(x: MdctCoeffs) -> tuple[MdctCoeffs, MdctCoeffs]: # ----------------------------------------------------------------------------- -# Public Functions (Level 2) +# Public Functions # ----------------------------------------------------------------------------- def aac_tns(frame_F_in: FrameChannelF, frame_type: FrameType) -> Tuple[FrameChannelF, TnsCoeffs]: diff --git a/source/core/aac_types.py b/source/core/aac_types.py index b52891a..6642e18 100644 --- a/source/core/aac_types.py +++ b/source/core/aac_types.py @@ -193,6 +193,25 @@ Bark-band index ranges [start, end] (inclusive) for MDCT lines. Used by TNS to map MDCT indices k to Bark bands. """ +BarkTable: TypeAlias = FloatArray +""" +Psychoacoustic Bark band table loaded from TableB219.mat. + +Typical shapes: +- Long: (69, 6) +- Short: (42, 6) +""" + +BandIndexArray: TypeAlias = NDArray[np.int_] +""" +Array of FFT bin indices per psychoacoustic band. +""" + +BandValueArray: TypeAlias = FloatArray +""" +Per-band psychoacoustic values (e.g. Bark position, thresholds). +""" + # ----------------------------------------------------------------------------- # Level 1 AAC sequence payload types # ----------------------------------------------------------------------------- diff --git a/source/core/aac_utils.py b/source/core/aac_utils.py new file mode 100644 index 0000000..0e18ed1 --- /dev/null +++ b/source/core/aac_utils.py @@ -0,0 +1,270 @@ +# ------------------------------------------------------------ +# AAC Coder/Decoder - AAC Utilities +# +# Multimedia course at Aristotle University of +# Thessaloniki (AUTh) +# +# Author: +# Christos Choutouridis (ΑΕΜ 8997) +# cchoutou@ece.auth.gr +# +# Description: +# Shared utility functions used across AAC encoder/decoder levels. +# +# This module currently provides: +# - MDCT / IMDCT conversions +# - Signal-to-Noise Ratio (SNR) computation in dB +# - Loading and access helpers for psychoacoustic band tables +# (TableB219.mat, Tables B.2.1.9a / B.2.1.9b of the AAC specification) +# ------------------------------------------------------------ +from __future__ import annotations + +import numpy as np +from pathlib import Path + +from scipy.io import loadmat +from core.aac_types import * + + +# ----------------------------------------------------------------------------- +# Global cached data +# ----------------------------------------------------------------------------- +# Cached contents of TableB219.mat to avoid repeated disk I/O. +# Keys: +# - "B219a": long-window psychoacoustic bands (69 bands, FFT size 2048) +# - "B219b": short-window psychoacoustic bands (42 bands, FFT size 256) +B219_CACHE: dict[str, BarkTable] | None = None + + +# ----------------------------------------------------------------------------- +# MDCT / IMDCT +# ----------------------------------------------------------------------------- +def mdct(s: TimeSignal) -> MdctCoeffs: + """ + MDCT (direct form) as specified in the assignment. + + Parameters + ---------- + s : TimeSignal + Windowed time samples, 1-D array of length N (N = 2048 or 256). + + Returns + ------- + MdctCoeffs + MDCT coefficients, 1-D array of length N/2. + + Definition + ---------- + X[k] = 2 * sum_{n=0..N-1} s[n] * cos((2*pi/N) * (n + n0) * (k + 1/2)), + where n0 = (N/2 + 1)/2. + """ + s = np.asarray(s, dtype=np.float64).reshape(-1) + N = int(s.shape[0]) + if N not in (2048, 256): + raise ValueError("MDCT input length must be 2048 or 256.") + + n0 = (N / 2.0 + 1.0) / 2.0 + n = np.arange(N, dtype=np.float64) + n0 + k = np.arange(N // 2, dtype=np.float64) + 0.5 + + C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, N/2) + X = 2.0 * (s @ C) # (N/2,) + return X + + +def imdct(X: MdctCoeffs) -> TimeSignal: + """ + IMDCT (direct form) as specified in the assignment. + + Parameters + ---------- + X : MdctCoeffs + MDCT coefficients, 1-D array of length K (K = 1024 or 128). + + Returns + ------- + TimeSignal + Reconstructed time samples, 1-D array of length N = 2K. + + Definition + ---------- + s[n] = (2/N) * sum_{k=0..N/2-1} X[k] * cos((2*pi/N) * (n + n0) * (k + 1/2)), + where n0 = (N/2 + 1)/2. + """ + X = np.asarray(X, dtype=np.float64).reshape(-1) + K = int(X.shape[0]) + if K not in (1024, 128): + raise ValueError("IMDCT input length must be 1024 or 128.") + + N = 2 * K + n0 = (N / 2.0 + 1.0) / 2.0 + + n = np.arange(N, dtype=np.float64) + n0 + k = np.arange(K, dtype=np.float64) + 0.5 + + C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, K) + s = (2.0 / N) * (C @ X) # (N,) + return s + + +# ----------------------------------------------------------------------------- +# Signal quality metrics +# ----------------------------------------------------------------------------- + +def snr_db(x_ref: StereoSignal, x_hat: StereoSignal) -> float: + """ + Compute the overall Signal-to-Noise Ratio (SNR) in dB. + + The SNR is computed over all available samples and channels, + after conservatively aligning the two signals to their common + length and channel count. + + Parameters + ---------- + x_ref : StereoSignal + Reference (original) signal. + Typical shape: (N, 2) for stereo. + x_hat : StereoSignal + Reconstructed or processed signal. + Typical shape: (M, 2) for stereo. + + Returns + ------- + float + SNR in dB. + - +inf if the noise power is zero (perfect reconstruction). + - -inf if the reference signal power is zero. + """ + x_ref = np.asarray(x_ref, dtype=np.float64) + x_hat = np.asarray(x_hat, dtype=np.float64) + + # Ensure 2-D shape: (samples, channels) + if x_ref.ndim == 1: + x_ref = x_ref.reshape(-1, 1) + if x_hat.ndim == 1: + x_hat = x_hat.reshape(-1, 1) + + # Align lengths and channel count conservatively + n = min(x_ref.shape[0], x_hat.shape[0]) + c = min(x_ref.shape[1], x_hat.shape[1]) + + x_ref = x_ref[:n, :c] + x_hat = x_hat[:n, :c] + + err = x_ref - x_hat + ps = float(np.sum(x_ref * x_ref)) # signal power + pn = float(np.sum(err * err)) # noise power + + if pn <= 0.0: + return float("inf") + if ps <= 0.0: + return float("-inf") + + return float(10.0 * np.log10(ps / pn)) + + +# ----------------------------------------------------------------------------- +# Psychoacoustic band tables (TableB219.mat) +# ----------------------------------------------------------------------------- + +def load_b219_tables() -> dict[str, BarkTable]: + """ + Load and cache psychoacoustic band tables from TableB219.mat. + + The assignment/project layout assumes that a 'material' directory + is available in the current working directory when running: + - tests + - level_1 / level_2 / level_3 entrypoints + + This function loads the tables once and caches them for subsequent calls. + + Returns + ------- + dict[str, BarkTable] + Dictionary with the following entries: + - "B219a": long-window psychoacoustic table + (69 bands, FFT size 2048 / 1024 spectral lines) + - "B219b": short-window psychoacoustic table + (42 bands, FFT size 256 / 128 spectral lines) + """ + global B219_CACHE + if B219_CACHE is not None: + return B219_CACHE + + mat_path = Path("material") / "TableB219.mat" + if not mat_path.exists(): + raise FileNotFoundError( + "Could not locate material/TableB219.mat in the current working directory." + ) + + data = loadmat(str(mat_path)) + if "B219a" not in data or "B219b" not in data: + raise ValueError( + "TableB219.mat missing required variables 'B219a' and/or 'B219b'." + ) + + B219_CACHE = { + "B219a": np.asarray(data["B219a"], dtype=np.float64), + "B219b": np.asarray(data["B219b"], dtype=np.float64), + } + return B219_CACHE + + +def get_table(frame_type: FrameType) -> tuple[BarkTable, int]: + """ + Select the appropriate psychoacoustic band table and FFT size + based on the AAC frame type. + + Parameters + ---------- + frame_type : FrameType + AAC frame type ("OLS", "LSS", "ESH", "LPS"). + + Returns + ------- + table : BarkTable + Psychoacoustic band table: + - B219a for long frames + - B219b for ESH short subframes + N : int + FFT size corresponding to the table: + - 2048 for long frames + - 256 for short frames (ESH) + """ + tables = load_b219_tables() + if frame_type == "ESH": + return tables["B219b"], 256 + return tables["B219a"], 2048 + + +def band_limits( + table: BarkTable, +) -> tuple[BandIndexArray, BandIndexArray, BandValueArray, BandValueArray]: + """ + Extract per-band metadata from a TableB2.1.9 psychoacoustic table. + + The column layout follows the provided TableB219.mat file and the + AAC specification tables B.2.1.9a / B.2.1.9b. + + Parameters + ---------- + table : BarkTable + Psychoacoustic band table (B219a or B219b). + + Returns + ------- + wlow : BandIndexArray + Lower FFT bin index (inclusive) for each band. + whigh : BandIndexArray + Upper FFT bin index (inclusive) for each band. + bval : BandValueArray + Bark-scale (or equivalent) band position values. + Used in the spreading function. + qthr_db : BandValueArray + Threshold in quiet for each band, in dB. + """ + wlow = table[:, 1].astype(int) + whigh = table[:, 2].astype(int) + bval = table[:, 4].astype(np.float64) + qthr_db = table[:, 5].astype(np.float64) + return wlow, whigh, bval, qthr_db diff --git a/source/core/tests/test_aac_coder_decoder.py b/source/core/tests/test_aac_coder_decoder.py index 696c94f..1d83e89 100644 --- a/source/core/tests/test_aac_coder_decoder.py +++ b/source/core/tests/test_aac_coder_decoder.py @@ -22,7 +22,7 @@ import soundfile as sf from core.aac_coder import aac_coder_1, aac_coder_2, aac_read_wav_stereo_48k from core.aac_decoder import aac_decoder_1, aac_decoder_2, aac_remove_padding from core.aac_types import * -from core.aac_snr_db import snr_db +from core.aac_utils import snr_db # Helper "fixtures" for aac_coder_1 / i_aac_coder_1 @@ -222,4 +222,4 @@ def test_end_to_end_level_2_high_snr(tmp_stereo_wav: Path, tmp_path: Path) -> No assert int(fs_hat) == 48000 snr = snr_db(x_ref, x_hat) - assert snr > 75.0 \ No newline at end of file + assert snr > 80 \ No newline at end of file diff --git a/source/core/tests/test_filterbank.py b/source/core/tests/test_filterbank.py index e0309ed..34e69e2 100644 --- a/source/core/tests/test_filterbank.py +++ b/source/core/tests/test_filterbank.py @@ -17,7 +17,7 @@ from typing import Sequence import pytest from core.aac_filterbank import aac_filter_bank, aac_i_filter_bank -from core.aac_snr_db import snr_db +from core.aac_utils import snr_db from core.aac_types import * # Helper fixtures for filterbank diff --git a/source/core/tests/test_filterbank_internal.py b/source/core/tests/test_filterbank_internal.py deleted file mode 100644 index e092ad1..0000000 --- a/source/core/tests/test_filterbank_internal.py +++ /dev/null @@ -1,117 +0,0 @@ -# ------------------------------------------------------------ -# AAC Coder/Decoder - Filterbank internal (mdct) Tests -# -# Multimedia course at Aristotle University of -# Thessaloniki (AUTh) -# -# Author: -# Christos Choutouridis (ΑΕΜ 8997) -# cchoutou@ece.auth.gr -# -# Description: -# Tests for Filterbank internal MDCT/IMDCT functionality. -# ------------------------------------------------------------ -from __future__ import annotations - -import numpy as np -import pytest - -from core.aac_filterbank import _imdct, _mdct -from core.aac_types import FloatArray, TimeSignal, MdctCoeffs - - -def _assert_allclose(a: FloatArray, b: FloatArray, *, rtol: float, atol: float) -> None: - """ - Helper for consistent tolerances across tests. - """ - np.testing.assert_allclose(a, b, rtol=rtol, atol=atol) - - -def _estimate_gain(y: MdctCoeffs, x: MdctCoeffs) -> float: - """ - Estimate scalar gain g such that y ~= g*x in least-squares sense. - """ - denom = float(np.dot(x, x)) - if denom == 0.0: - return 0.0 - return float(np.dot(y, x) / denom) - - -tolerance = 1e-10 - -@pytest.mark.parametrize("N", [256, 2048]) -def test_mdct_imdct_mdct_identity_up_to_gain(N: int) -> None: - """ - Consistency test in coefficient domain: - mdct(imdct(X)) ~= g * X - - For the chosen (non-orthonormal) scaling, g is expected to be close to 2. - """ - rng = np.random.default_rng(0) - K = N // 2 - - X: MdctCoeffs = rng.normal(size=K).astype(np.float64) - x: TimeSignal = _imdct(X) - X_hat: MdctCoeffs = _mdct(x) - - g = _estimate_gain(X_hat, X) - _assert_allclose(X_hat, g * X, rtol=tolerance, atol=tolerance) - _assert_allclose(np.array([g], dtype=np.float64), np.array([2.0], dtype=np.float64), rtol=tolerance, atol=tolerance) - - -@pytest.mark.parametrize("N", [256, 2048]) -def test_mdct_linearity(N: int) -> None: - """ - Linearity test: - mdct(a*x + b*y) == a*mdct(x) + b*mdct(y) - """ - rng = np.random.default_rng(1) - x: TimeSignal = rng.normal(size=N).astype(np.float64) - y: TimeSignal = rng.normal(size=N).astype(np.float64) - - a = 0.37 - b = -1.12 - - left: MdctCoeffs = _mdct(a * x + b * y) - right: MdctCoeffs = a * _mdct(x) + b * _mdct(y) - - _assert_allclose(left, right, rtol=tolerance, atol=tolerance) - - -@pytest.mark.parametrize("N", [256, 2048]) -def test_imdct_linearity(N: int) -> None: - """ - Linearity test for IMDCT: - imdct(a*X + b*Y) == a*imdct(X) + b*imdct(Y) - """ - rng = np.random.default_rng(2) - K = N // 2 - - X: MdctCoeffs = rng.normal(size=K).astype(np.float64) - Y: MdctCoeffs = rng.normal(size=K).astype(np.float64) - - a = -0.5 - b = 2.0 - - left: TimeSignal = _imdct(a * X + b * Y) - right: TimeSignal = a * _imdct(X) + b * _imdct(Y) - - _assert_allclose(left, right, rtol=tolerance, atol=tolerance) - - -@pytest.mark.parametrize("N", [256, 2048]) -def test_mdct_imdct_outputs_are_finite(N: int) -> None: - """ - Sanity test: no NaN/inf on random inputs. - """ - rng = np.random.default_rng(3) - K = N // 2 - - x: TimeSignal = rng.normal(size=N).astype(np.float64) - X: MdctCoeffs = rng.normal(size=K).astype(np.float64) - - X1 = _mdct(x) - x1 = _imdct(X) - - assert np.isfinite(X1).all() - assert np.isfinite(x1).all() diff --git a/source/core/tests/test_psycho.py b/source/core/tests/test_psycho.py new file mode 100644 index 0000000..0238a83 --- /dev/null +++ b/source/core/tests/test_psycho.py @@ -0,0 +1,253 @@ +# ------------------------------------------------------------ +# AAC Coder/Decoder - Psychoacoustic Model Tests +# +# Multimedia course at Aristotle University of +# Thessaloniki (AUTh) +# +# Author: +# Christos Choutouridis (ΑΕΜ 8997) +# +# Description: +# Contract + sanity tests for the psychoacoustic model (core.aac_psycho). +# +# These tests focus on: +# - output shapes per frame_type (long vs ESH), +# - numerical sanity (finite, non-negative), +# - deterministic behavior, +# - ESH central-region dependency (outer regions must not affect result), +# - basic input validation (length checks). +# +# We intentionally avoid asserting exact numeric values, because the model +# includes FFT operations and table-driven psychoacoustic parameters. +# ------------------------------------------------------------ +from __future__ import annotations + +import numpy as np +import pytest + +from core.aac_psycho import aac_psycho +from core.aac_types import FrameChannelT, FrameType + + +# ----------------------------------------------------------------------------- +# Helpers +# ----------------------------------------------------------------------------- + +def _make_frames( + *, + kind: str, + amp: float = 1.0, + seed: int = 0, +) -> tuple[FrameChannelT, FrameChannelT, FrameChannelT]: + """ + Create (current, prev1, prev2) 2048-sample frames for one channel. + + Parameters + ---------- + kind : str + "noise" or "tone". + amp : float + Amplitude scaling (applied to all frames). + seed : int + RNG seed for reproducibility (noise case). + + Returns + ------- + tuple[FrameChannelT, FrameChannelT, FrameChannelT] + Three arrays of shape (2048,), dtype float64. + """ + if kind == "noise": + rng = np.random.default_rng(seed) + x2 = amp * rng.normal(size=2048).astype(np.float64) + x1 = amp * rng.normal(size=2048).astype(np.float64) + x0 = amp * rng.normal(size=2048).astype(np.float64) + return x0, x1, x2 + + if kind == "tone": + # A simple sinusoid which is identical across frames (highly predictable). + n = np.arange(2048, dtype=np.float64) + f0 = 13.0 # arbitrary normalized-bin-ish tone (not critical for these tests) + tone = amp * np.sin(2.0 * np.pi * f0 * n / 2048.0).astype(np.float64) + return tone, tone.copy(), tone.copy() + + raise ValueError(f"Unknown kind: {kind!r}") + + +def _assert_finite_nonnegative(x: np.ndarray) -> None: + """Utility assertions for psycho outputs.""" + assert np.isfinite(x).all() + # SMR is a ratio of energies, it should not be negative. + assert np.min(x) >= 0.0 + + +# ----------------------------------------------------------------------------- +# Shape / contract tests +# ----------------------------------------------------------------------------- + +@pytest.mark.parametrize("frame_type", ["OLS", "LSS", "LPS"]) +def test_psycho_long_shapes(frame_type: FrameType) -> None: + """ + Contract test: + For long frame types, psycho returns SMR shape (69,). + """ + x0, x1, x2 = _make_frames(kind="noise", seed=1, amp=1.0) + + smr = aac_psycho(x0, frame_type, x1, x2) + assert isinstance(smr, np.ndarray) + assert smr.shape == (69,) + _assert_finite_nonnegative(smr) + + +def test_psycho_esh_shape() -> None: + """ + Contract test: + For ESH, psycho returns SMR shape (42, 8). + """ + x0, x1, x2 = _make_frames(kind="noise", seed=2, amp=1.0) + + smr = aac_psycho(x0, "ESH", x1, x2) + assert isinstance(smr, np.ndarray) + assert smr.shape == (42, 8) + _assert_finite_nonnegative(smr) + + +def test_psycho_is_deterministic_for_same_inputs() -> None: + """ + Determinism test: + Psycho must return the same output for identical inputs. + """ + x0, x1, x2 = _make_frames(kind="noise", seed=3, amp=1.0) + + smr1 = aac_psycho(x0, "OLS", x1, x2) + smr2 = aac_psycho(x0, "OLS", x1, x2) + + np.testing.assert_allclose(smr1, smr2, rtol=0.0, atol=0.0) + + +# ----------------------------------------------------------------------------- +# ESH-specific behavior tests +# ----------------------------------------------------------------------------- + +def test_psycho_esh_ignores_outer_regions() -> None: + """ + Spec-driven behavior test: + In this project, ESH uses only the central region of the 2048-sample frame to + derive the 8 overlapping 256-sample subframes: + start = 448 + 128*j, j=0..7 + + Therefore, changing samples outside [448, 1600) must not affect the output. + """ + rng = np.random.default_rng(10) + + # Build base frames (current and prev1) with identical central region. + center_cur = rng.normal(size=1152).astype(np.float64) + center_prev1 = rng.normal(size=1152).astype(np.float64) + + cur_a = np.zeros(2048, dtype=np.float64) + cur_b = np.zeros(2048, dtype=np.float64) + prev1_a = np.zeros(2048, dtype=np.float64) + prev1_b = np.zeros(2048, dtype=np.float64) + + cur_a[448:1600] = center_cur + cur_b[448:1600] = center_cur + prev1_a[448:1600] = center_prev1 + prev1_b[448:1600] = center_prev1 + + # Modify only outer regions in the *_b variants. + cur_b[:448] = rng.normal(size=448) + cur_b[1600:] = rng.normal(size=448) + prev1_b[:448] = rng.normal(size=448) + prev1_b[1600:] = rng.normal(size=448) + + # prev2 is irrelevant for the chosen ESH history convention; keep it fixed. + prev2 = rng.normal(size=2048).astype(np.float64) + + smr_a = aac_psycho(cur_a, "ESH", prev1_a, prev2) + smr_b = aac_psycho(cur_b, "ESH", prev1_b, prev2) + + np.testing.assert_allclose(smr_a, smr_b, rtol=0.0, atol=0.0) + + +def test_psycho_esh_columns_are_not_all_identical_for_random_input() -> None: + """ + Sanity test: + For random input, different ESH subframes should typically produce + different SMR columns (not a strict requirement, but a strong sanity signal). + + We check that at least one column differs from another beyond a tiny tolerance. + """ + x0, x1, x2 = _make_frames(kind="noise", seed=11, amp=1.0) + + smr = aac_psycho(x0, "ESH", x1, x2) + # Compare column 0 vs column 7; for random signals they should differ. + diff = np.max(np.abs(smr[:, 0] - smr[:, 7])) + assert diff > 1e-12 + + +# ----------------------------------------------------------------------------- +# Scaling sanity (avoid fragile numeric targets) +# ----------------------------------------------------------------------------- + +def test_psycho_long_smr_is_mostly_monotone_with_amplitude() -> None: + """ + Sanity test: + Increasing signal amplitude should not reduce the SMR for the vast majority + of Bark bands. + + Due to the use of max(nb, qthr), a small fraction of bands close to the + threshold-in-quiet boundary may violate strict monotonicity. This is expected + behavior, so we test a percentage-based criterion instead of a strict one. + """ + x0, x1, x2 = _make_frames(kind="noise", seed=20, amp=1e3) + y0, y1, y2 = (2.0 * x0, 2.0 * x1, 2.0 * x2) + + smr1 = aac_psycho(x0, "OLS", x1, x2) + smr2 = aac_psycho(y0, "OLS", y1, y2) + + eps = 1e-12 + nondecreasing = np.sum(smr2 + eps >= smr1) + + ratio = nondecreasing / smr1.size + + # Expect monotonic behavior for the overwhelming majority of bands. + assert ratio >= 0.95 + +def test_psycho_long_is_approximately_scale_invariant_at_high_level() -> None: + """ + Sanity test (robust): + At high levels, SMR should be approximately scale-invariant for most bands. + Some bands may deviate due to the max(nb, qthr) branch. + """ + x0, x1, x2 = _make_frames(kind="noise", seed=20, amp=1e3) + y0, y1, y2 = (2.0 * x0, 2.0 * x1, 2.0 * x2) + + smr1 = aac_psycho(x0, "OLS", x1, x2) + smr2 = aac_psycho(y0, "OLS", y1, y2) + + rel = np.abs(smr2 - smr1) / np.maximum(np.abs(smr1), 1e-12) + + # Most bands should be close (<= 5%), but allow a small number of outliers. + close = np.sum(rel <= 5e-2) + assert close >= (smr1.size - 2) # allow up to 2 bands to deviate + + +# ----------------------------------------------------------------------------- +# Input validation tests +# ----------------------------------------------------------------------------- + +def test_psycho_rejects_wrong_lengths() -> None: + """ + Contract test: + aac_psycho requires 2048-sample frames for current/prev1/prev2. + """ + x = np.zeros(2048, dtype=np.float64) + bad = np.zeros(2047, dtype=np.float64) + + with pytest.raises(ValueError): + _ = aac_psycho(bad, "OLS", x, x) + + with pytest.raises(ValueError): + _ = aac_psycho(x, "OLS", bad, x) + + with pytest.raises(ValueError): + _ = aac_psycho(x, "OLS", x, bad) diff --git a/source/core/tests/test_snr_db.py b/source/core/tests/test_snr_db.py deleted file mode 100644 index 639fdd1..0000000 --- a/source/core/tests/test_snr_db.py +++ /dev/null @@ -1,98 +0,0 @@ -# ------------------------------------------------------------ -# AAC Coder/Decoder - SNR dB Tests -# -# Multimedia course at Aristotle University of -# Thessaloniki (AUTh) -# -# Author: -# Christos Choutouridis (ΑΕΜ 8997) -# cchoutou@ece.auth.gr -# -# Description: -# Basic tests for SNR calculation utility. -# ------------------------------------------------------------ -from __future__ import annotations - -import numpy as np -import pytest - -from core.aac_snr_db import snr_db -from core.aac_types import StereoSignal - - -def test_snr_perfect_reconstruction_returns_inf() -> None: - """ - If x_hat == x_ref exactly, noise power is zero and SNR must be +inf. - """ - rng = np.random.default_rng(0) - x: StereoSignal = rng.normal(size=(1024, 2)).astype(np.float64) - - snr = snr_db(x, x) - assert snr == float("inf") - - -def test_snr_zero_reference_returns_minus_inf() -> None: - """ - If reference signal is identically zero, signal power is zero - and SNR must be -inf (unless noise is also zero, which is degenerate). - """ - x_ref: StereoSignal = np.zeros((1024, 2), dtype=np.float64) - x_hat: StereoSignal = np.ones((1024, 2), dtype=np.float64) - - snr = snr_db(x_ref, x_hat) - assert snr == float("-inf") - - -def test_snr_known_noise_level_matches_expected_value() -> None: - """ - Deterministic test with known signal and noise power. - - Let: - x_ref = ones - x_hat = ones + noise - - With noise variance sigma^2, expected SNR: - 10 * log10(Ps / Pn) - """ - n = 1000 - sigma = 0.1 - - x_ref: StereoSignal = np.ones((n, 2), dtype=np.float64) - noise = sigma * np.ones((n, 2), dtype=np.float64) - x_hat: StereoSignal = x_ref + noise - - ps = float(np.sum(x_ref * x_ref)) - pn = float(np.sum(noise * noise)) - expected = 10.0 * np.log10(ps / pn) - - snr = snr_db(x_ref, x_hat) - assert np.isclose(snr, expected, rtol=1e-12, atol=1e-12) - - -def test_snr_aligns_different_lengths_and_channels() -> None: - """ - The function must: - - align to minimum length - - align to minimum channel count - without crashing. - """ - rng = np.random.default_rng(1) - - x_ref: StereoSignal = rng.normal(size=(1000, 2)).astype(np.float64) - x_hat: StereoSignal = rng.normal(size=(800, 1)).astype(np.float64) - - snr = snr_db(x_ref, x_hat) - assert np.isfinite(snr) - - -def test_snr_accepts_1d_inputs() -> None: - """ - 1-D inputs must be accepted and treated as single-channel signals. - """ - rng = np.random.default_rng(2) - - x_ref = rng.normal(size=1024).astype(np.float64) - x_hat = x_ref + 0.01 * rng.normal(size=1024).astype(np.float64) - - snr = snr_db(x_ref, x_hat) - assert np.isfinite(snr) diff --git a/source/core/tests/test_utils.py b/source/core/tests/test_utils.py new file mode 100644 index 0000000..9262259 --- /dev/null +++ b/source/core/tests/test_utils.py @@ -0,0 +1,329 @@ +# ------------------------------------------------------------ +# AAC Coder/Decoder - SNR dB Tests +# +# Multimedia course at Aristotle University of +# Thessaloniki (AUTh) +# +# Author: +# Christos Choutouridis (ΑΕΜ 8997) +# cchoutou@ece.auth.gr +# +# Description: +# - Basic tests for SNR calculation utility. +# - Contract and sanity tests for TableB219-related utilities. +# +# These tests do NOT validate the numerical correctness of the +# Bark tables themselves (they are given by the AAC spec), +# but instead ensure: +# - correct loading from disk, +# - correct table selection per frame type, +# - internal consistency of band limits, +# - correct caching behavior. +# ------------------------------------------------------------ +from __future__ import annotations + +import numpy as np +import pytest + +from core.aac_utils import mdct, imdct, snr_db, load_b219_tables, get_table, band_limits +from core.aac_types import * + +tolerance = 1e-10 + +# mdct / imdct +# ------------------------------------------------------------ + +def _assert_allclose(a: FloatArray, b: FloatArray, *, rtol: float, atol: float) -> None: + """ + Helper for consistent tolerances across tests. + """ + np.testing.assert_allclose(a, b, rtol=rtol, atol=atol) + + +def _estimate_gain(y: MdctCoeffs, x: MdctCoeffs) -> float: + """ + Estimate scalar gain g such that y ~= g*x in least-squares sense. + """ + denom = float(np.dot(x, x)) + if denom == 0.0: + return 0.0 + return float(np.dot(y, x) / denom) + + + +@pytest.mark.parametrize("N", [256, 2048]) +def test_mdct_imdct_mdct_identity_up_to_gain(N: int) -> None: + """ + Consistency test in coefficient domain: + mdct(imdct(X)) ~= g * X + + For the chosen (non-orthonormal) scaling, g is expected to be close to 2. + """ + rng = np.random.default_rng(0) + K = N // 2 + + X: MdctCoeffs = rng.normal(size=K).astype(np.float64) + x: TimeSignal = imdct(X) + X_hat: MdctCoeffs = mdct(x) + + g = _estimate_gain(X_hat, X) + _assert_allclose(X_hat, g * X, rtol=tolerance, atol=tolerance) + _assert_allclose(np.array([g], dtype=np.float64), np.array([2.0], dtype=np.float64), rtol=tolerance, atol=tolerance) + + +@pytest.mark.parametrize("N", [256, 2048]) +def test_mdct_linearity(N: int) -> None: + """ + Linearity test: + mdct(a*x + b*y) == a*mdct(x) + b*mdct(y) + """ + rng = np.random.default_rng(1) + x: TimeSignal = rng.normal(size=N).astype(np.float64) + y: TimeSignal = rng.normal(size=N).astype(np.float64) + + a = 0.37 + b = -1.12 + + left: MdctCoeffs = mdct(a * x + b * y) + right: MdctCoeffs = a * mdct(x) + b * mdct(y) + + _assert_allclose(left, right, rtol=tolerance, atol=tolerance) + + +@pytest.mark.parametrize("N", [256, 2048]) +def test_imdct_linearity(N: int) -> None: + """ + Linearity test for IMDCT: + imdct(a*X + b*Y) == a*imdct(X) + b*imdct(Y) + """ + rng = np.random.default_rng(2) + K = N // 2 + + X: MdctCoeffs = rng.normal(size=K).astype(np.float64) + Y: MdctCoeffs = rng.normal(size=K).astype(np.float64) + + a = -0.5 + b = 2.0 + + left: TimeSignal = imdct(a * X + b * Y) + right: TimeSignal = a * imdct(X) + b * imdct(Y) + + _assert_allclose(left, right, rtol=tolerance, atol=tolerance) + + +@pytest.mark.parametrize("N", [256, 2048]) +def test_mdct_imdct_outputs_are_finite(N: int) -> None: + """ + Sanity test: no NaN/inf on random inputs. + """ + rng = np.random.default_rng(3) + K = N // 2 + + x: TimeSignal = rng.normal(size=N).astype(np.float64) + X: MdctCoeffs = rng.normal(size=K).astype(np.float64) + + X1 = mdct(x) + x1 = imdct(X) + + assert np.isfinite(X1).all() + assert np.isfinite(x1).all() + + + +# SNR +# ------------------------------------------------------------ + +def test_snr_perfect_reconstruction_returns_inf() -> None: + """ + If x_hat == x_ref exactly, noise power is zero and SNR must be +inf. + """ + rng = np.random.default_rng(0) + x: StereoSignal = rng.normal(size=(1024, 2)).astype(np.float64) + + snr = snr_db(x, x) + assert snr == float("inf") + + +def test_snr_zero_reference_returns_minus_inf() -> None: + """ + If reference signal is identically zero, signal power is zero + and SNR must be -inf (unless noise is also zero, which is degenerate). + """ + x_ref: StereoSignal = np.zeros((1024, 2), dtype=np.float64) + x_hat: StereoSignal = np.ones((1024, 2), dtype=np.float64) + + snr = snr_db(x_ref, x_hat) + assert snr == float("-inf") + + +def test_snr_known_noise_level_matches_expected_value() -> None: + """ + Deterministic test with known signal and noise power. + + Let: + x_ref = ones + x_hat = ones + noise + + With noise variance sigma^2, expected SNR: + 10 * log10(Ps / Pn) + """ + n = 1000 + sigma = 0.1 + + x_ref: StereoSignal = np.ones((n, 2), dtype=np.float64) + noise = sigma * np.ones((n, 2), dtype=np.float64) + x_hat: StereoSignal = x_ref + noise + + ps = float(np.sum(x_ref * x_ref)) + pn = float(np.sum(noise * noise)) + expected = 10.0 * np.log10(ps / pn) + + snr = snr_db(x_ref, x_hat) + assert np.isclose(snr, expected, rtol=1e-12, atol=1e-12) + + +def test_snr_aligns_different_lengths_and_channels() -> None: + """ + The function must: + - align to minimum length + - align to minimum channel count + without crashing. + """ + rng = np.random.default_rng(1) + + x_ref: StereoSignal = rng.normal(size=(1000, 2)).astype(np.float64) + x_hat: StereoSignal = rng.normal(size=(800, 1)).astype(np.float64) + + snr = snr_db(x_ref, x_hat) + assert np.isfinite(snr) + + +def test_snr_accepts_1d_inputs() -> None: + """ + 1-D inputs must be accepted and treated as single-channel signals. + """ + rng = np.random.default_rng(2) + + x_ref = rng.normal(size=1024).astype(np.float64) + x_hat = x_ref + 0.01 * rng.normal(size=1024).astype(np.float64) + + snr = snr_db(x_ref, x_hat) + assert np.isfinite(snr) + + + +# Table219b +# ------------------------------------------------------------ + +def test_load_b219_tables_returns_expected_keys() -> None: + """ + Contract test: + TableB219.mat must load successfully and expose both tables + required by the psychoacoustic model. + + The AAC spec defines: + - B219a: long-frame Bark bands + - B219b: short-frame Bark bands + """ + tables = load_b219_tables() + + assert isinstance(tables, dict) + assert "B219a" in tables + assert "B219b" in tables + + +def test_b219_table_shapes_are_correct() -> None: + """ + Sanity test: + Verify that the Bark tables have the expected number of bands + and sufficient columns. + + Expected from AAC spec: + - B219a: 69 bands (long frames) + - B219b: 42 bands (short frames) + - At least 6 columns (as accessed by band_limits()). + """ + tables = load_b219_tables() + + B219a = tables["B219a"] + assert B219a.ndim == 2 + assert B219a.shape[0] == 69 + assert B219a.shape[1] >= 6 + + B219b = tables["B219b"] + assert B219b.ndim == 2 + assert B219b.shape[0] == 42 + assert B219b.shape[1] >= 6 + + +def test_get_table_returns_correct_fft_size() -> None: + """ + Interface test: + get_table(frame_type) must return both: + - the correct Bark table + - the correct FFT size N + + This mapping is fundamental for the psychoacoustic model. + """ + table_long, N_long = get_table("OLS") + assert N_long == 2048 + assert table_long.shape[0] == 69 + + table_short, N_short = get_table("ESH") + assert N_short == 256 + assert table_short.shape[0] == 42 + + +def test_band_limits_are_consistent_for_long_table() -> None: + """ + Sanity test for band limits (long frames): + + For each Bark band: + - wlow <= whigh + - frequency indices stay within [0, N/2) + - all returned arrays have consistent lengths + """ + table, N = get_table("OLS") + wlow, whigh, bval, qthr = band_limits(table) + + B = table.shape[0] + assert len(wlow) == B + assert len(whigh) == B + assert len(bval) == B + assert len(qthr) == B + + for b in range(B): + assert 0 <= wlow[b] <= whigh[b] + assert whigh[b] < N // 2 + + +def test_band_limits_are_consistent_for_short_table() -> None: + """ + Sanity test for band limits (short frames / ESH). + + Same invariants as for long frames, but with FFT size N=256. + """ + table, N = get_table("ESH") + wlow, whigh, bval, qthr = band_limits(table) + + B = table.shape[0] + assert len(wlow) == B + assert len(whigh) == B + + for b in range(B): + assert 0 <= wlow[b] <= whigh[b] + assert whigh[b] < N // 2 + + +def test_b219_tables_are_cached() -> None: + """ + Implementation test: + load_b219_tables() should cache the loaded tables so that + subsequent calls return the same object (identity check). + + This avoids repeated disk I/O during psychoacoustic analysis. + """ + t1 = load_b219_tables() + t2 = load_b219_tables() + + assert t1 is t2 \ No newline at end of file diff --git a/source/level_1/core/aac_coder.py b/source/level_1/core/aac_coder.py index 0b6b939..fc3f8ac 100644 --- a/source/level_1/core/aac_coder.py +++ b/source/level_1/core/aac_coder.py @@ -9,16 +9,8 @@ # cchoutou@ece.auth.gr # # Description: -# Level 1 AAC encoder orchestration. -# Keeps the same functional behavior as the original level_1 implementation: -# - Reads WAV via soundfile -# - Validates stereo and 48 kHz -# - Frames into 2048 samples with hop=1024 and zero padding at both ends -# - SSC decision uses next-frame attack detection -# - Filterbank analysis (MDCT) -# - Stores per-channel spectra in AACSeq1 schema: -# * ESH: (128, 8) -# * else: (1024, 1) +# - Level 1 AAC encoder orchestration. +# - Level 2 AAC encoder orchestration. # ------------------------------------------------------------ from __future__ import annotations @@ -199,6 +191,10 @@ def aac_coder_1(filename_in: Union[str, Path]) -> AACSeq1: return aac_seq +# ----------------------------------------------------------------------------- +# Level 2 encoder +# ----------------------------------------------------------------------------- + def aac_coder_2(filename_in: Union[str, Path]) -> AACSeq2: """ Level-2 AAC encoder (Level 1 + TNS). diff --git a/source/level_1/core/aac_configuration.py b/source/level_1/core/aac_configuration.py index bb75d2e..63d6297 100644 --- a/source/level_1/core/aac_configuration.py +++ b/source/level_1/core/aac_configuration.py @@ -15,6 +15,8 @@ from __future__ import annotations # Imports +from typing import Final + from core.aac_types import WinType # Filterbank @@ -28,4 +30,12 @@ WIN_TYPE: WinType = "SIN" # ------------------------------------------------------------ PRED_ORDER = 4 QUANT_STEP = 0.1 -QUANT_MAX = 0.7 # 4-bit symmetric with step 0.1 -> clamp to [-0.7, +0.7] \ No newline at end of file +QUANT_MAX = 0.7 # 4-bit symmetric with step 0.1 -> clamp to [-0.7, +0.7] + + +# ----------------------------------------------------------------------------- +# Psycho +# ----------------------------------------------------------------------------- + +NMT_DB: Final[float] = 6.0 # Noise Masking Tone (dB) +TMN_DB: Final[float] = 18.0 # Tone Masking Noise (dB) \ No newline at end of file diff --git a/source/level_1/core/aac_decoder.py b/source/level_1/core/aac_decoder.py index 606ce78..5b366b8 100644 --- a/source/level_1/core/aac_decoder.py +++ b/source/level_1/core/aac_decoder.py @@ -9,16 +9,9 @@ # cchoutou@ece.auth.gr # # Description: -# Level 1 AAC decoder orchestration (inverse of aac_coder_1()). -# Keeps the same functional behavior as the original level_1 implementation: -# - Re-pack per-channel spectra into FrameF expected by aac_i_filter_bank() -# - IMDCT synthesis per frame -# - Overlap-add with hop=1024 -# - Remove encoder boundary padding: hop at start and hop at end +# - Level 1 AAC decoder orchestration (inverse of aac_coder_1()). +# - Level 2 AAC decoder orchestration (inverse of aac_coder_1()). # -# Note: -# This core module returns the reconstructed samples. Writing to disk is kept -# in level_x demos. # ------------------------------------------------------------ from __future__ import annotations @@ -33,7 +26,7 @@ from core.aac_types import * # ----------------------------------------------------------------------------- -# Public helpers (useful for level_x demo wrappers) +# Public helpers # ----------------------------------------------------------------------------- def aac_unpack_seq_channels_to_frame_f(frame_type: FrameType, chl_f: FrameChannelF, chr_f: FrameChannelF) -> FrameF: @@ -109,7 +102,7 @@ def aac_remove_padding(y_pad: StereoSignal, hop: int = 1024) -> StereoSignal: # ----------------------------------------------------------------------------- -# Level 1 decoder (core) +# Level 1 decoder # ----------------------------------------------------------------------------- def aac_decoder_1(aac_seq_1: AACSeq1, filename_out: Union[str, Path]) -> StereoSignal: @@ -167,6 +160,10 @@ def aac_decoder_1(aac_seq_1: AACSeq1, filename_out: Union[str, Path]) -> StereoS return y +# ----------------------------------------------------------------------------- +# Level 2 decoder +# ----------------------------------------------------------------------------- + def aac_decoder_2(aac_seq_2: AACSeq2, filename_out: Union[str, Path]) -> StereoSignal: """ Level-2 AAC decoder (inverse of aac_coder_2). diff --git a/source/level_1/core/aac_filterbank.py b/source/level_1/core/aac_filterbank.py index 60eb9c2..6bd6e76 100644 --- a/source/level_1/core/aac_filterbank.py +++ b/source/level_1/core/aac_filterbank.py @@ -14,6 +14,7 @@ # ------------------------------------------------------------ from __future__ import annotations +from core.aac_utils import mdct, imdct from core.aac_types import * from scipy.signal.windows import kaiser @@ -186,74 +187,6 @@ def _window_sequence(frame_type: FrameType, win_type: WinType) -> Window: raise ValueError(f"Invalid frame_type for long window sequence: {frame_type!r}") -def _mdct(s: TimeSignal) -> MdctCoeffs: - """ - MDCT (direct form) as specified in the assignment. - - Parameters - ---------- - s : TimeSignal - Windowed time samples, 1-D array of length N (N = 2048 or 256). - - Returns - ------- - MdctCoeffs - MDCT coefficients, 1-D array of length N/2. - - Definition - ---------- - X[k] = 2 * sum_{n=0..N-1} s[n] * cos((2*pi/N) * (n + n0) * (k + 1/2)), - where n0 = (N/2 + 1)/2. - """ - s = np.asarray(s, dtype=np.float64).reshape(-1) - N = int(s.shape[0]) - if N not in (2048, 256): - raise ValueError("MDCT input length must be 2048 or 256.") - - n0 = (N / 2.0 + 1.0) / 2.0 - n = np.arange(N, dtype=np.float64) + n0 - k = np.arange(N // 2, dtype=np.float64) + 0.5 - - C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, N/2) - X = 2.0 * (s @ C) # (N/2,) - return X - - -def _imdct(X: MdctCoeffs) -> TimeSignal: - """ - IMDCT (direct form) as specified in the assignment. - - Parameters - ---------- - X : MdctCoeffs - MDCT coefficients, 1-D array of length K (K = 1024 or 128). - - Returns - ------- - TimeSignal - Reconstructed time samples, 1-D array of length N = 2K. - - Definition - ---------- - s[n] = (2/N) * sum_{k=0..N/2-1} X[k] * cos((2*pi/N) * (n + n0) * (k + 1/2)), - where n0 = (N/2 + 1)/2. - """ - X = np.asarray(X, dtype=np.float64).reshape(-1) - K = int(X.shape[0]) - if K not in (1024, 128): - raise ValueError("IMDCT input length must be 1024 or 128.") - - N = 2 * K - n0 = (N / 2.0 + 1.0) / 2.0 - - n = np.arange(N, dtype=np.float64) + n0 - k = np.arange(K, dtype=np.float64) + 0.5 - - C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, K) - s = (2.0 / N) * (C @ X) # (N,) - return s - - def _filter_bank_esh_channel(x_ch: FrameChannelT, win_type: WinType) -> FrameChannelF: """ ESH analysis for one channel. @@ -279,7 +212,7 @@ def _filter_bank_esh_channel(x_ch: FrameChannelT, win_type: WinType) -> FrameCha for j in range(8): start = 448 + 128 * j seg = x_ch[start:start + 256] * wS # (256,) - X_esh[:, j] = _mdct(seg) # (128,) + X_esh[:, j] = mdct(seg) # (128,) return X_esh @@ -344,7 +277,7 @@ def _i_filter_bank_esh_channel(X_esh: FrameChannelF, win_type: WinType) -> Frame # Each short IMDCT returns 256 samples. Place them at: # start = 448 + 128*j, j=0..7 (50% overlap) for j in range(8): - seg = _imdct(X_esh[:, j]) * wS # (256,) + seg = imdct(X_esh[:, j]) * wS # (256,) start = 448 + 128 * j out[start:start + 256] += seg @@ -352,7 +285,7 @@ def _i_filter_bank_esh_channel(X_esh: FrameChannelF, win_type: WinType) -> Frame # ----------------------------------------------------------------------------- -# Public Function prototypes (Level 1) +# Public Function prototypes # ----------------------------------------------------------------------------- def aac_filter_bank(frame_T: FrameT, frame_type: FrameType, win_type: WinType) -> FrameF: @@ -385,8 +318,8 @@ def aac_filter_bank(frame_T: FrameT, frame_type: FrameType, win_type: WinType) - if frame_type in ("OLS", "LSS", "LPS"): w = _window_sequence(frame_type, win_type) # length 2048 - XL = _mdct(xL * w) # length 1024 - XR = _mdct(xR * w) # length 1024 + XL = mdct(xL * w) # length 1024 + XR = mdct(xR * w) # length 1024 out = np.empty((1024, 2), dtype=np.float64) out[:, 0] = XL out[:, 1] = XR @@ -430,8 +363,8 @@ def aac_i_filter_bank(frame_F: FrameF, frame_type: FrameType, win_type: WinType) w = _window_sequence(frame_type, win_type) - xL = _imdct(frame_F[:, 0]) * w - xR = _imdct(frame_F[:, 1]) * w + xL = imdct(frame_F[:, 0]) * w + xR = imdct(frame_F[:, 1]) * w out = np.empty((2048, 2), dtype=np.float64) out[:, 0] = xL diff --git a/source/level_1/core/aac_snr_db.py b/source/level_1/core/aac_snr_db.py deleted file mode 100644 index 21e5184..0000000 --- a/source/level_1/core/aac_snr_db.py +++ /dev/null @@ -1,60 +0,0 @@ -# ------------------------------------------------------------ -# AAC Coder/Decoder - SNR dB calculator -# -# Multimedia course at Aristotle University of -# Thessaloniki (AUTh) -# -# Author: -# Christos Choutouridis (ΑΕΜ 8997) -# cchoutou@ece.auth.gr -# -# Description: -# This module implements SNR calculation in dB -# ------------------------------------------------------------ -from __future__ import annotations - -from core.aac_types import StereoSignal -import numpy as np - -def snr_db(x_ref: StereoSignal, x_hat: StereoSignal) -> float: - """ - Compute overall SNR (dB) over all samples and channels after aligning lengths. - - Parameters - ---------- - x_ref : StereoSignal - Reference stereo stream. - x_hat : StereoSignal - Reconstructed stereo stream. - - Returns - ------- - float - SNR in dB. - - Returns +inf if noise power is zero. - - Returns -inf if signal power is zero. - """ - x_ref = np.asarray(x_ref, dtype=np.float64) - x_hat = np.asarray(x_hat, dtype=np.float64) - - if x_ref.ndim == 1: - x_ref = x_ref.reshape(-1, 1) - if x_hat.ndim == 1: - x_hat = x_hat.reshape(-1, 1) - - n = min(x_ref.shape[0], x_hat.shape[0]) - c = min(x_ref.shape[1], x_hat.shape[1]) - - x_ref = x_ref[:n, :c] - x_hat = x_hat[:n, :c] - - err = x_ref - x_hat - ps = float(np.sum(x_ref * x_ref)) - pn = float(np.sum(err * err)) - - if pn <= 0.0: - return float("inf") - if ps <= 0.0: - return float("-inf") - - return float(10.0 * np.log10(ps / pn)) \ No newline at end of file diff --git a/source/level_1/core/aac_ssc.py b/source/level_1/core/aac_ssc.py index 926c854..8c082b3 100644 --- a/source/level_1/core/aac_ssc.py +++ b/source/level_1/core/aac_ssc.py @@ -173,7 +173,7 @@ def _stereo_merge(ft_l: FrameType, ft_r: FrameType) -> FrameType: # ----------------------------------------------------------------------------- -# Public Function prototypes (Level 1) +# Public Function prototypes # ----------------------------------------------------------------------------- def aac_SSC(frame_T: FrameT, next_frame_T: FrameT, prev_frame_type: FrameType) -> FrameType: diff --git a/source/level_1/core/aac_types.py b/source/level_1/core/aac_types.py index b52891a..6642e18 100644 --- a/source/level_1/core/aac_types.py +++ b/source/level_1/core/aac_types.py @@ -193,6 +193,25 @@ Bark-band index ranges [start, end] (inclusive) for MDCT lines. Used by TNS to map MDCT indices k to Bark bands. """ +BarkTable: TypeAlias = FloatArray +""" +Psychoacoustic Bark band table loaded from TableB219.mat. + +Typical shapes: +- Long: (69, 6) +- Short: (42, 6) +""" + +BandIndexArray: TypeAlias = NDArray[np.int_] +""" +Array of FFT bin indices per psychoacoustic band. +""" + +BandValueArray: TypeAlias = FloatArray +""" +Per-band psychoacoustic values (e.g. Bark position, thresholds). +""" + # ----------------------------------------------------------------------------- # Level 1 AAC sequence payload types # ----------------------------------------------------------------------------- diff --git a/source/level_1/core/aac_utils.py b/source/level_1/core/aac_utils.py new file mode 100644 index 0000000..0e18ed1 --- /dev/null +++ b/source/level_1/core/aac_utils.py @@ -0,0 +1,270 @@ +# ------------------------------------------------------------ +# AAC Coder/Decoder - AAC Utilities +# +# Multimedia course at Aristotle University of +# Thessaloniki (AUTh) +# +# Author: +# Christos Choutouridis (ΑΕΜ 8997) +# cchoutou@ece.auth.gr +# +# Description: +# Shared utility functions used across AAC encoder/decoder levels. +# +# This module currently provides: +# - MDCT / IMDCT conversions +# - Signal-to-Noise Ratio (SNR) computation in dB +# - Loading and access helpers for psychoacoustic band tables +# (TableB219.mat, Tables B.2.1.9a / B.2.1.9b of the AAC specification) +# ------------------------------------------------------------ +from __future__ import annotations + +import numpy as np +from pathlib import Path + +from scipy.io import loadmat +from core.aac_types import * + + +# ----------------------------------------------------------------------------- +# Global cached data +# ----------------------------------------------------------------------------- +# Cached contents of TableB219.mat to avoid repeated disk I/O. +# Keys: +# - "B219a": long-window psychoacoustic bands (69 bands, FFT size 2048) +# - "B219b": short-window psychoacoustic bands (42 bands, FFT size 256) +B219_CACHE: dict[str, BarkTable] | None = None + + +# ----------------------------------------------------------------------------- +# MDCT / IMDCT +# ----------------------------------------------------------------------------- +def mdct(s: TimeSignal) -> MdctCoeffs: + """ + MDCT (direct form) as specified in the assignment. + + Parameters + ---------- + s : TimeSignal + Windowed time samples, 1-D array of length N (N = 2048 or 256). + + Returns + ------- + MdctCoeffs + MDCT coefficients, 1-D array of length N/2. + + Definition + ---------- + X[k] = 2 * sum_{n=0..N-1} s[n] * cos((2*pi/N) * (n + n0) * (k + 1/2)), + where n0 = (N/2 + 1)/2. + """ + s = np.asarray(s, dtype=np.float64).reshape(-1) + N = int(s.shape[0]) + if N not in (2048, 256): + raise ValueError("MDCT input length must be 2048 or 256.") + + n0 = (N / 2.0 + 1.0) / 2.0 + n = np.arange(N, dtype=np.float64) + n0 + k = np.arange(N // 2, dtype=np.float64) + 0.5 + + C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, N/2) + X = 2.0 * (s @ C) # (N/2,) + return X + + +def imdct(X: MdctCoeffs) -> TimeSignal: + """ + IMDCT (direct form) as specified in the assignment. + + Parameters + ---------- + X : MdctCoeffs + MDCT coefficients, 1-D array of length K (K = 1024 or 128). + + Returns + ------- + TimeSignal + Reconstructed time samples, 1-D array of length N = 2K. + + Definition + ---------- + s[n] = (2/N) * sum_{k=0..N/2-1} X[k] * cos((2*pi/N) * (n + n0) * (k + 1/2)), + where n0 = (N/2 + 1)/2. + """ + X = np.asarray(X, dtype=np.float64).reshape(-1) + K = int(X.shape[0]) + if K not in (1024, 128): + raise ValueError("IMDCT input length must be 1024 or 128.") + + N = 2 * K + n0 = (N / 2.0 + 1.0) / 2.0 + + n = np.arange(N, dtype=np.float64) + n0 + k = np.arange(K, dtype=np.float64) + 0.5 + + C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, K) + s = (2.0 / N) * (C @ X) # (N,) + return s + + +# ----------------------------------------------------------------------------- +# Signal quality metrics +# ----------------------------------------------------------------------------- + +def snr_db(x_ref: StereoSignal, x_hat: StereoSignal) -> float: + """ + Compute the overall Signal-to-Noise Ratio (SNR) in dB. + + The SNR is computed over all available samples and channels, + after conservatively aligning the two signals to their common + length and channel count. + + Parameters + ---------- + x_ref : StereoSignal + Reference (original) signal. + Typical shape: (N, 2) for stereo. + x_hat : StereoSignal + Reconstructed or processed signal. + Typical shape: (M, 2) for stereo. + + Returns + ------- + float + SNR in dB. + - +inf if the noise power is zero (perfect reconstruction). + - -inf if the reference signal power is zero. + """ + x_ref = np.asarray(x_ref, dtype=np.float64) + x_hat = np.asarray(x_hat, dtype=np.float64) + + # Ensure 2-D shape: (samples, channels) + if x_ref.ndim == 1: + x_ref = x_ref.reshape(-1, 1) + if x_hat.ndim == 1: + x_hat = x_hat.reshape(-1, 1) + + # Align lengths and channel count conservatively + n = min(x_ref.shape[0], x_hat.shape[0]) + c = min(x_ref.shape[1], x_hat.shape[1]) + + x_ref = x_ref[:n, :c] + x_hat = x_hat[:n, :c] + + err = x_ref - x_hat + ps = float(np.sum(x_ref * x_ref)) # signal power + pn = float(np.sum(err * err)) # noise power + + if pn <= 0.0: + return float("inf") + if ps <= 0.0: + return float("-inf") + + return float(10.0 * np.log10(ps / pn)) + + +# ----------------------------------------------------------------------------- +# Psychoacoustic band tables (TableB219.mat) +# ----------------------------------------------------------------------------- + +def load_b219_tables() -> dict[str, BarkTable]: + """ + Load and cache psychoacoustic band tables from TableB219.mat. + + The assignment/project layout assumes that a 'material' directory + is available in the current working directory when running: + - tests + - level_1 / level_2 / level_3 entrypoints + + This function loads the tables once and caches them for subsequent calls. + + Returns + ------- + dict[str, BarkTable] + Dictionary with the following entries: + - "B219a": long-window psychoacoustic table + (69 bands, FFT size 2048 / 1024 spectral lines) + - "B219b": short-window psychoacoustic table + (42 bands, FFT size 256 / 128 spectral lines) + """ + global B219_CACHE + if B219_CACHE is not None: + return B219_CACHE + + mat_path = Path("material") / "TableB219.mat" + if not mat_path.exists(): + raise FileNotFoundError( + "Could not locate material/TableB219.mat in the current working directory." + ) + + data = loadmat(str(mat_path)) + if "B219a" not in data or "B219b" not in data: + raise ValueError( + "TableB219.mat missing required variables 'B219a' and/or 'B219b'." + ) + + B219_CACHE = { + "B219a": np.asarray(data["B219a"], dtype=np.float64), + "B219b": np.asarray(data["B219b"], dtype=np.float64), + } + return B219_CACHE + + +def get_table(frame_type: FrameType) -> tuple[BarkTable, int]: + """ + Select the appropriate psychoacoustic band table and FFT size + based on the AAC frame type. + + Parameters + ---------- + frame_type : FrameType + AAC frame type ("OLS", "LSS", "ESH", "LPS"). + + Returns + ------- + table : BarkTable + Psychoacoustic band table: + - B219a for long frames + - B219b for ESH short subframes + N : int + FFT size corresponding to the table: + - 2048 for long frames + - 256 for short frames (ESH) + """ + tables = load_b219_tables() + if frame_type == "ESH": + return tables["B219b"], 256 + return tables["B219a"], 2048 + + +def band_limits( + table: BarkTable, +) -> tuple[BandIndexArray, BandIndexArray, BandValueArray, BandValueArray]: + """ + Extract per-band metadata from a TableB2.1.9 psychoacoustic table. + + The column layout follows the provided TableB219.mat file and the + AAC specification tables B.2.1.9a / B.2.1.9b. + + Parameters + ---------- + table : BarkTable + Psychoacoustic band table (B219a or B219b). + + Returns + ------- + wlow : BandIndexArray + Lower FFT bin index (inclusive) for each band. + whigh : BandIndexArray + Upper FFT bin index (inclusive) for each band. + bval : BandValueArray + Bark-scale (or equivalent) band position values. + Used in the spreading function. + qthr_db : BandValueArray + Threshold in quiet for each band, in dB. + """ + wlow = table[:, 1].astype(int) + whigh = table[:, 2].astype(int) + bval = table[:, 4].astype(np.float64) + qthr_db = table[:, 5].astype(np.float64) + return wlow, whigh, bval, qthr_db diff --git a/source/level_1/level_1.py b/source/level_1/level_1.py index 84e72c6..15958bc 100644 --- a/source/level_1/level_1.py +++ b/source/level_1/level_1.py @@ -28,7 +28,7 @@ from core.aac_types import AACSeq1, StereoSignal from core.aac_coder import aac_coder_1 as core_aac_coder_1 from core.aac_coder import aac_read_wav_stereo_48k from core.aac_decoder import aac_decoder_1 as core_aac_decoder_1 -from core.aac_snr_db import snr_db +from core.aac_utils import snr_db # ----------------------------------------------------------------------------- diff --git a/source/level_2/core/aac_coder.py b/source/level_2/core/aac_coder.py index 0b6b939..fc3f8ac 100644 --- a/source/level_2/core/aac_coder.py +++ b/source/level_2/core/aac_coder.py @@ -9,16 +9,8 @@ # cchoutou@ece.auth.gr # # Description: -# Level 1 AAC encoder orchestration. -# Keeps the same functional behavior as the original level_1 implementation: -# - Reads WAV via soundfile -# - Validates stereo and 48 kHz -# - Frames into 2048 samples with hop=1024 and zero padding at both ends -# - SSC decision uses next-frame attack detection -# - Filterbank analysis (MDCT) -# - Stores per-channel spectra in AACSeq1 schema: -# * ESH: (128, 8) -# * else: (1024, 1) +# - Level 1 AAC encoder orchestration. +# - Level 2 AAC encoder orchestration. # ------------------------------------------------------------ from __future__ import annotations @@ -199,6 +191,10 @@ def aac_coder_1(filename_in: Union[str, Path]) -> AACSeq1: return aac_seq +# ----------------------------------------------------------------------------- +# Level 2 encoder +# ----------------------------------------------------------------------------- + def aac_coder_2(filename_in: Union[str, Path]) -> AACSeq2: """ Level-2 AAC encoder (Level 1 + TNS). diff --git a/source/level_2/core/aac_configuration.py b/source/level_2/core/aac_configuration.py index bb75d2e..63d6297 100644 --- a/source/level_2/core/aac_configuration.py +++ b/source/level_2/core/aac_configuration.py @@ -15,6 +15,8 @@ from __future__ import annotations # Imports +from typing import Final + from core.aac_types import WinType # Filterbank @@ -28,4 +30,12 @@ WIN_TYPE: WinType = "SIN" # ------------------------------------------------------------ PRED_ORDER = 4 QUANT_STEP = 0.1 -QUANT_MAX = 0.7 # 4-bit symmetric with step 0.1 -> clamp to [-0.7, +0.7] \ No newline at end of file +QUANT_MAX = 0.7 # 4-bit symmetric with step 0.1 -> clamp to [-0.7, +0.7] + + +# ----------------------------------------------------------------------------- +# Psycho +# ----------------------------------------------------------------------------- + +NMT_DB: Final[float] = 6.0 # Noise Masking Tone (dB) +TMN_DB: Final[float] = 18.0 # Tone Masking Noise (dB) \ No newline at end of file diff --git a/source/level_2/core/aac_decoder.py b/source/level_2/core/aac_decoder.py index 606ce78..5b366b8 100644 --- a/source/level_2/core/aac_decoder.py +++ b/source/level_2/core/aac_decoder.py @@ -9,16 +9,9 @@ # cchoutou@ece.auth.gr # # Description: -# Level 1 AAC decoder orchestration (inverse of aac_coder_1()). -# Keeps the same functional behavior as the original level_1 implementation: -# - Re-pack per-channel spectra into FrameF expected by aac_i_filter_bank() -# - IMDCT synthesis per frame -# - Overlap-add with hop=1024 -# - Remove encoder boundary padding: hop at start and hop at end +# - Level 1 AAC decoder orchestration (inverse of aac_coder_1()). +# - Level 2 AAC decoder orchestration (inverse of aac_coder_1()). # -# Note: -# This core module returns the reconstructed samples. Writing to disk is kept -# in level_x demos. # ------------------------------------------------------------ from __future__ import annotations @@ -33,7 +26,7 @@ from core.aac_types import * # ----------------------------------------------------------------------------- -# Public helpers (useful for level_x demo wrappers) +# Public helpers # ----------------------------------------------------------------------------- def aac_unpack_seq_channels_to_frame_f(frame_type: FrameType, chl_f: FrameChannelF, chr_f: FrameChannelF) -> FrameF: @@ -109,7 +102,7 @@ def aac_remove_padding(y_pad: StereoSignal, hop: int = 1024) -> StereoSignal: # ----------------------------------------------------------------------------- -# Level 1 decoder (core) +# Level 1 decoder # ----------------------------------------------------------------------------- def aac_decoder_1(aac_seq_1: AACSeq1, filename_out: Union[str, Path]) -> StereoSignal: @@ -167,6 +160,10 @@ def aac_decoder_1(aac_seq_1: AACSeq1, filename_out: Union[str, Path]) -> StereoS return y +# ----------------------------------------------------------------------------- +# Level 2 decoder +# ----------------------------------------------------------------------------- + def aac_decoder_2(aac_seq_2: AACSeq2, filename_out: Union[str, Path]) -> StereoSignal: """ Level-2 AAC decoder (inverse of aac_coder_2). diff --git a/source/level_2/core/aac_filterbank.py b/source/level_2/core/aac_filterbank.py index 60eb9c2..6bd6e76 100644 --- a/source/level_2/core/aac_filterbank.py +++ b/source/level_2/core/aac_filterbank.py @@ -14,6 +14,7 @@ # ------------------------------------------------------------ from __future__ import annotations +from core.aac_utils import mdct, imdct from core.aac_types import * from scipy.signal.windows import kaiser @@ -186,74 +187,6 @@ def _window_sequence(frame_type: FrameType, win_type: WinType) -> Window: raise ValueError(f"Invalid frame_type for long window sequence: {frame_type!r}") -def _mdct(s: TimeSignal) -> MdctCoeffs: - """ - MDCT (direct form) as specified in the assignment. - - Parameters - ---------- - s : TimeSignal - Windowed time samples, 1-D array of length N (N = 2048 or 256). - - Returns - ------- - MdctCoeffs - MDCT coefficients, 1-D array of length N/2. - - Definition - ---------- - X[k] = 2 * sum_{n=0..N-1} s[n] * cos((2*pi/N) * (n + n0) * (k + 1/2)), - where n0 = (N/2 + 1)/2. - """ - s = np.asarray(s, dtype=np.float64).reshape(-1) - N = int(s.shape[0]) - if N not in (2048, 256): - raise ValueError("MDCT input length must be 2048 or 256.") - - n0 = (N / 2.0 + 1.0) / 2.0 - n = np.arange(N, dtype=np.float64) + n0 - k = np.arange(N // 2, dtype=np.float64) + 0.5 - - C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, N/2) - X = 2.0 * (s @ C) # (N/2,) - return X - - -def _imdct(X: MdctCoeffs) -> TimeSignal: - """ - IMDCT (direct form) as specified in the assignment. - - Parameters - ---------- - X : MdctCoeffs - MDCT coefficients, 1-D array of length K (K = 1024 or 128). - - Returns - ------- - TimeSignal - Reconstructed time samples, 1-D array of length N = 2K. - - Definition - ---------- - s[n] = (2/N) * sum_{k=0..N/2-1} X[k] * cos((2*pi/N) * (n + n0) * (k + 1/2)), - where n0 = (N/2 + 1)/2. - """ - X = np.asarray(X, dtype=np.float64).reshape(-1) - K = int(X.shape[0]) - if K not in (1024, 128): - raise ValueError("IMDCT input length must be 1024 or 128.") - - N = 2 * K - n0 = (N / 2.0 + 1.0) / 2.0 - - n = np.arange(N, dtype=np.float64) + n0 - k = np.arange(K, dtype=np.float64) + 0.5 - - C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, K) - s = (2.0 / N) * (C @ X) # (N,) - return s - - def _filter_bank_esh_channel(x_ch: FrameChannelT, win_type: WinType) -> FrameChannelF: """ ESH analysis for one channel. @@ -279,7 +212,7 @@ def _filter_bank_esh_channel(x_ch: FrameChannelT, win_type: WinType) -> FrameCha for j in range(8): start = 448 + 128 * j seg = x_ch[start:start + 256] * wS # (256,) - X_esh[:, j] = _mdct(seg) # (128,) + X_esh[:, j] = mdct(seg) # (128,) return X_esh @@ -344,7 +277,7 @@ def _i_filter_bank_esh_channel(X_esh: FrameChannelF, win_type: WinType) -> Frame # Each short IMDCT returns 256 samples. Place them at: # start = 448 + 128*j, j=0..7 (50% overlap) for j in range(8): - seg = _imdct(X_esh[:, j]) * wS # (256,) + seg = imdct(X_esh[:, j]) * wS # (256,) start = 448 + 128 * j out[start:start + 256] += seg @@ -352,7 +285,7 @@ def _i_filter_bank_esh_channel(X_esh: FrameChannelF, win_type: WinType) -> Frame # ----------------------------------------------------------------------------- -# Public Function prototypes (Level 1) +# Public Function prototypes # ----------------------------------------------------------------------------- def aac_filter_bank(frame_T: FrameT, frame_type: FrameType, win_type: WinType) -> FrameF: @@ -385,8 +318,8 @@ def aac_filter_bank(frame_T: FrameT, frame_type: FrameType, win_type: WinType) - if frame_type in ("OLS", "LSS", "LPS"): w = _window_sequence(frame_type, win_type) # length 2048 - XL = _mdct(xL * w) # length 1024 - XR = _mdct(xR * w) # length 1024 + XL = mdct(xL * w) # length 1024 + XR = mdct(xR * w) # length 1024 out = np.empty((1024, 2), dtype=np.float64) out[:, 0] = XL out[:, 1] = XR @@ -430,8 +363,8 @@ def aac_i_filter_bank(frame_F: FrameF, frame_type: FrameType, win_type: WinType) w = _window_sequence(frame_type, win_type) - xL = _imdct(frame_F[:, 0]) * w - xR = _imdct(frame_F[:, 1]) * w + xL = imdct(frame_F[:, 0]) * w + xR = imdct(frame_F[:, 1]) * w out = np.empty((2048, 2), dtype=np.float64) out[:, 0] = xL diff --git a/source/level_2/core/aac_snr_db.py b/source/level_2/core/aac_snr_db.py deleted file mode 100644 index 21e5184..0000000 --- a/source/level_2/core/aac_snr_db.py +++ /dev/null @@ -1,60 +0,0 @@ -# ------------------------------------------------------------ -# AAC Coder/Decoder - SNR dB calculator -# -# Multimedia course at Aristotle University of -# Thessaloniki (AUTh) -# -# Author: -# Christos Choutouridis (ΑΕΜ 8997) -# cchoutou@ece.auth.gr -# -# Description: -# This module implements SNR calculation in dB -# ------------------------------------------------------------ -from __future__ import annotations - -from core.aac_types import StereoSignal -import numpy as np - -def snr_db(x_ref: StereoSignal, x_hat: StereoSignal) -> float: - """ - Compute overall SNR (dB) over all samples and channels after aligning lengths. - - Parameters - ---------- - x_ref : StereoSignal - Reference stereo stream. - x_hat : StereoSignal - Reconstructed stereo stream. - - Returns - ------- - float - SNR in dB. - - Returns +inf if noise power is zero. - - Returns -inf if signal power is zero. - """ - x_ref = np.asarray(x_ref, dtype=np.float64) - x_hat = np.asarray(x_hat, dtype=np.float64) - - if x_ref.ndim == 1: - x_ref = x_ref.reshape(-1, 1) - if x_hat.ndim == 1: - x_hat = x_hat.reshape(-1, 1) - - n = min(x_ref.shape[0], x_hat.shape[0]) - c = min(x_ref.shape[1], x_hat.shape[1]) - - x_ref = x_ref[:n, :c] - x_hat = x_hat[:n, :c] - - err = x_ref - x_hat - ps = float(np.sum(x_ref * x_ref)) - pn = float(np.sum(err * err)) - - if pn <= 0.0: - return float("inf") - if ps <= 0.0: - return float("-inf") - - return float(10.0 * np.log10(ps / pn)) \ No newline at end of file diff --git a/source/level_2/core/aac_ssc.py b/source/level_2/core/aac_ssc.py index 926c854..8c082b3 100644 --- a/source/level_2/core/aac_ssc.py +++ b/source/level_2/core/aac_ssc.py @@ -173,7 +173,7 @@ def _stereo_merge(ft_l: FrameType, ft_r: FrameType) -> FrameType: # ----------------------------------------------------------------------------- -# Public Function prototypes (Level 1) +# Public Function prototypes # ----------------------------------------------------------------------------- def aac_SSC(frame_T: FrameT, next_frame_T: FrameT, prev_frame_type: FrameType) -> FrameType: diff --git a/source/level_2/core/aac_tns.py b/source/level_2/core/aac_tns.py index 29e9921..88d9da8 100644 --- a/source/level_2/core/aac_tns.py +++ b/source/level_2/core/aac_tns.py @@ -33,6 +33,7 @@ from typing import Tuple import numpy as np from scipy.io import loadmat +from core.aac_utils import load_b219_tables from core.aac_configuration import PRED_ORDER, QUANT_STEP, QUANT_MAX from core.aac_types import * @@ -40,41 +41,6 @@ from core.aac_types import * # Private helpers # ----------------------------------------------------------------------------- -_B219_CACHE: dict[str, FloatArray] | None = None - - -def _load_b219_tables() -> dict[str, FloatArray]: - """ - Load TableB219.mat and cache the contents. - - The project layout guarantees that a 'material' directory is discoverable - from the current working directory (tests and level_123 entrypoints). - - Returns - ------- - dict[str, FloatArray] - Keys: - - "B219a": long bands table (for K=1024 MDCT lines) - - "B219b": short bands table (for K=128 MDCT lines) - """ - global _B219_CACHE - if _B219_CACHE is not None: - return _B219_CACHE - - mat_path = Path("material") / "TableB219.mat" - if not mat_path.exists(): - raise FileNotFoundError("Could not locate material/TableB219.mat in the current working directory.") - - d = loadmat(str(mat_path)) - if "B219a" not in d or "B219b" not in d: - raise ValueError("TableB219.mat missing required variables B219a and/or B219b.") - - _B219_CACHE = { - "B219a": np.asarray(d["B219a"], dtype=np.float64), - "B219b": np.asarray(d["B219b"], dtype=np.float64), - } - return _B219_CACHE - def _band_ranges_for_kcount(k_count: int) -> BandRanges: """ @@ -92,7 +58,7 @@ def _band_ranges_for_kcount(k_count: int) -> BandRanges: BandRanges (list[tuple[int, int]]) Each tuple is (start_k, end_k) inclusive. """ - tables = _load_b219_tables() + tables = load_b219_tables() if k_count == 1024: tbl = tables["B219a"] elif k_count == 128: @@ -425,7 +391,7 @@ def _tns_one_vector(x: MdctCoeffs) -> tuple[MdctCoeffs, MdctCoeffs]: # ----------------------------------------------------------------------------- -# Public Functions (Level 2) +# Public Functions # ----------------------------------------------------------------------------- def aac_tns(frame_F_in: FrameChannelF, frame_type: FrameType) -> Tuple[FrameChannelF, TnsCoeffs]: diff --git a/source/level_2/core/aac_types.py b/source/level_2/core/aac_types.py index b52891a..6642e18 100644 --- a/source/level_2/core/aac_types.py +++ b/source/level_2/core/aac_types.py @@ -193,6 +193,25 @@ Bark-band index ranges [start, end] (inclusive) for MDCT lines. Used by TNS to map MDCT indices k to Bark bands. """ +BarkTable: TypeAlias = FloatArray +""" +Psychoacoustic Bark band table loaded from TableB219.mat. + +Typical shapes: +- Long: (69, 6) +- Short: (42, 6) +""" + +BandIndexArray: TypeAlias = NDArray[np.int_] +""" +Array of FFT bin indices per psychoacoustic band. +""" + +BandValueArray: TypeAlias = FloatArray +""" +Per-band psychoacoustic values (e.g. Bark position, thresholds). +""" + # ----------------------------------------------------------------------------- # Level 1 AAC sequence payload types # ----------------------------------------------------------------------------- diff --git a/source/level_2/core/aac_utils.py b/source/level_2/core/aac_utils.py new file mode 100644 index 0000000..0e18ed1 --- /dev/null +++ b/source/level_2/core/aac_utils.py @@ -0,0 +1,270 @@ +# ------------------------------------------------------------ +# AAC Coder/Decoder - AAC Utilities +# +# Multimedia course at Aristotle University of +# Thessaloniki (AUTh) +# +# Author: +# Christos Choutouridis (ΑΕΜ 8997) +# cchoutou@ece.auth.gr +# +# Description: +# Shared utility functions used across AAC encoder/decoder levels. +# +# This module currently provides: +# - MDCT / IMDCT conversions +# - Signal-to-Noise Ratio (SNR) computation in dB +# - Loading and access helpers for psychoacoustic band tables +# (TableB219.mat, Tables B.2.1.9a / B.2.1.9b of the AAC specification) +# ------------------------------------------------------------ +from __future__ import annotations + +import numpy as np +from pathlib import Path + +from scipy.io import loadmat +from core.aac_types import * + + +# ----------------------------------------------------------------------------- +# Global cached data +# ----------------------------------------------------------------------------- +# Cached contents of TableB219.mat to avoid repeated disk I/O. +# Keys: +# - "B219a": long-window psychoacoustic bands (69 bands, FFT size 2048) +# - "B219b": short-window psychoacoustic bands (42 bands, FFT size 256) +B219_CACHE: dict[str, BarkTable] | None = None + + +# ----------------------------------------------------------------------------- +# MDCT / IMDCT +# ----------------------------------------------------------------------------- +def mdct(s: TimeSignal) -> MdctCoeffs: + """ + MDCT (direct form) as specified in the assignment. + + Parameters + ---------- + s : TimeSignal + Windowed time samples, 1-D array of length N (N = 2048 or 256). + + Returns + ------- + MdctCoeffs + MDCT coefficients, 1-D array of length N/2. + + Definition + ---------- + X[k] = 2 * sum_{n=0..N-1} s[n] * cos((2*pi/N) * (n + n0) * (k + 1/2)), + where n0 = (N/2 + 1)/2. + """ + s = np.asarray(s, dtype=np.float64).reshape(-1) + N = int(s.shape[0]) + if N not in (2048, 256): + raise ValueError("MDCT input length must be 2048 or 256.") + + n0 = (N / 2.0 + 1.0) / 2.0 + n = np.arange(N, dtype=np.float64) + n0 + k = np.arange(N // 2, dtype=np.float64) + 0.5 + + C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, N/2) + X = 2.0 * (s @ C) # (N/2,) + return X + + +def imdct(X: MdctCoeffs) -> TimeSignal: + """ + IMDCT (direct form) as specified in the assignment. + + Parameters + ---------- + X : MdctCoeffs + MDCT coefficients, 1-D array of length K (K = 1024 or 128). + + Returns + ------- + TimeSignal + Reconstructed time samples, 1-D array of length N = 2K. + + Definition + ---------- + s[n] = (2/N) * sum_{k=0..N/2-1} X[k] * cos((2*pi/N) * (n + n0) * (k + 1/2)), + where n0 = (N/2 + 1)/2. + """ + X = np.asarray(X, dtype=np.float64).reshape(-1) + K = int(X.shape[0]) + if K not in (1024, 128): + raise ValueError("IMDCT input length must be 1024 or 128.") + + N = 2 * K + n0 = (N / 2.0 + 1.0) / 2.0 + + n = np.arange(N, dtype=np.float64) + n0 + k = np.arange(K, dtype=np.float64) + 0.5 + + C = np.cos((2.0 * np.pi / N) * np.outer(n, k)) # (N, K) + s = (2.0 / N) * (C @ X) # (N,) + return s + + +# ----------------------------------------------------------------------------- +# Signal quality metrics +# ----------------------------------------------------------------------------- + +def snr_db(x_ref: StereoSignal, x_hat: StereoSignal) -> float: + """ + Compute the overall Signal-to-Noise Ratio (SNR) in dB. + + The SNR is computed over all available samples and channels, + after conservatively aligning the two signals to their common + length and channel count. + + Parameters + ---------- + x_ref : StereoSignal + Reference (original) signal. + Typical shape: (N, 2) for stereo. + x_hat : StereoSignal + Reconstructed or processed signal. + Typical shape: (M, 2) for stereo. + + Returns + ------- + float + SNR in dB. + - +inf if the noise power is zero (perfect reconstruction). + - -inf if the reference signal power is zero. + """ + x_ref = np.asarray(x_ref, dtype=np.float64) + x_hat = np.asarray(x_hat, dtype=np.float64) + + # Ensure 2-D shape: (samples, channels) + if x_ref.ndim == 1: + x_ref = x_ref.reshape(-1, 1) + if x_hat.ndim == 1: + x_hat = x_hat.reshape(-1, 1) + + # Align lengths and channel count conservatively + n = min(x_ref.shape[0], x_hat.shape[0]) + c = min(x_ref.shape[1], x_hat.shape[1]) + + x_ref = x_ref[:n, :c] + x_hat = x_hat[:n, :c] + + err = x_ref - x_hat + ps = float(np.sum(x_ref * x_ref)) # signal power + pn = float(np.sum(err * err)) # noise power + + if pn <= 0.0: + return float("inf") + if ps <= 0.0: + return float("-inf") + + return float(10.0 * np.log10(ps / pn)) + + +# ----------------------------------------------------------------------------- +# Psychoacoustic band tables (TableB219.mat) +# ----------------------------------------------------------------------------- + +def load_b219_tables() -> dict[str, BarkTable]: + """ + Load and cache psychoacoustic band tables from TableB219.mat. + + The assignment/project layout assumes that a 'material' directory + is available in the current working directory when running: + - tests + - level_1 / level_2 / level_3 entrypoints + + This function loads the tables once and caches them for subsequent calls. + + Returns + ------- + dict[str, BarkTable] + Dictionary with the following entries: + - "B219a": long-window psychoacoustic table + (69 bands, FFT size 2048 / 1024 spectral lines) + - "B219b": short-window psychoacoustic table + (42 bands, FFT size 256 / 128 spectral lines) + """ + global B219_CACHE + if B219_CACHE is not None: + return B219_CACHE + + mat_path = Path("material") / "TableB219.mat" + if not mat_path.exists(): + raise FileNotFoundError( + "Could not locate material/TableB219.mat in the current working directory." + ) + + data = loadmat(str(mat_path)) + if "B219a" not in data or "B219b" not in data: + raise ValueError( + "TableB219.mat missing required variables 'B219a' and/or 'B219b'." + ) + + B219_CACHE = { + "B219a": np.asarray(data["B219a"], dtype=np.float64), + "B219b": np.asarray(data["B219b"], dtype=np.float64), + } + return B219_CACHE + + +def get_table(frame_type: FrameType) -> tuple[BarkTable, int]: + """ + Select the appropriate psychoacoustic band table and FFT size + based on the AAC frame type. + + Parameters + ---------- + frame_type : FrameType + AAC frame type ("OLS", "LSS", "ESH", "LPS"). + + Returns + ------- + table : BarkTable + Psychoacoustic band table: + - B219a for long frames + - B219b for ESH short subframes + N : int + FFT size corresponding to the table: + - 2048 for long frames + - 256 for short frames (ESH) + """ + tables = load_b219_tables() + if frame_type == "ESH": + return tables["B219b"], 256 + return tables["B219a"], 2048 + + +def band_limits( + table: BarkTable, +) -> tuple[BandIndexArray, BandIndexArray, BandValueArray, BandValueArray]: + """ + Extract per-band metadata from a TableB2.1.9 psychoacoustic table. + + The column layout follows the provided TableB219.mat file and the + AAC specification tables B.2.1.9a / B.2.1.9b. + + Parameters + ---------- + table : BarkTable + Psychoacoustic band table (B219a or B219b). + + Returns + ------- + wlow : BandIndexArray + Lower FFT bin index (inclusive) for each band. + whigh : BandIndexArray + Upper FFT bin index (inclusive) for each band. + bval : BandValueArray + Bark-scale (or equivalent) band position values. + Used in the spreading function. + qthr_db : BandValueArray + Threshold in quiet for each band, in dB. + """ + wlow = table[:, 1].astype(int) + whigh = table[:, 2].astype(int) + bval = table[:, 4].astype(np.float64) + qthr_db = table[:, 5].astype(np.float64) + return wlow, whigh, bval, qthr_db diff --git a/source/level_2/level_2.py b/source/level_2/level_2.py index 40adc1c..3ed9029 100644 --- a/source/level_2/level_2.py +++ b/source/level_2/level_2.py @@ -28,7 +28,7 @@ from core.aac_types import AACSeq2, StereoSignal from core.aac_coder import aac_coder_2 as core_aac_coder_2 from core.aac_coder import aac_read_wav_stereo_48k from core.aac_decoder import aac_decoder_2 as core_aac_decoder_2 -from core.aac_snr_db import snr_db +from core.aac_utils import snr_db # ----------------------------------------------------------------------------- # Public Level 2 API (wrappers)