516 lines
14 KiB
Python
516 lines
14 KiB
Python
# ------------------------------------------------------------
|
||
# AAC Coder/Decoder - Temporal Noise Shaping (TNS)
|
||
#
|
||
# Multimedia course at Aristotle University of
|
||
# Thessaloniki (AUTh)
|
||
#
|
||
# Author:
|
||
# Christos Choutouridis (ΑΕΜ 8997)
|
||
# cchoutou@ece.auth.gr
|
||
#
|
||
# Description:
|
||
# Temporal Noise Shaping (TNS) module (Level 2).
|
||
#
|
||
# Public API:
|
||
# frame_F_out, tns_coeffs = aac_tns(frame_F_in, frame_type)
|
||
# frame_F_out = aac_i_tns(frame_F_in, frame_type, tns_coeffs)
|
||
#
|
||
# Notes (per assignment):
|
||
# - TNS is applied per channel (not stereo).
|
||
# - For ESH, TNS is applied independently to each of the 8 short subframes.
|
||
# - Bark band tables are taken from TableB.2.1.9a (long) and TableB.2.1.9b (short)
|
||
# provided in TableB219.mat.
|
||
# - Predictor order is fixed to p = 4.
|
||
# - Coefficients are quantized with a 4-bit uniform symmetric quantizer, step = 0.1.
|
||
# - Forward TNS applies FIR: H_TNS(z) = 1 - a1 z^-1 - ... - ap z^-p
|
||
# - Inverse TNS applies the inverse IIR filter using the same quantized coefficients.
|
||
# ------------------------------------------------------------
|
||
from __future__ import annotations
|
||
|
||
from pathlib import Path
|
||
from typing import Tuple
|
||
|
||
import numpy as np
|
||
from scipy.io import loadmat
|
||
|
||
from core.aac_utils import load_b219_tables
|
||
from core.aac_configuration import PRED_ORDER, QUANT_STEP, QUANT_MAX
|
||
from core.aac_types import *
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Private helpers
|
||
# -----------------------------------------------------------------------------
|
||
|
||
|
||
def _band_ranges_for_kcount(k_count: int) -> BandRanges:
    """
    Look up the Bark band boundaries that match a given MDCT line count.

    Parameters
    ----------
    k_count : int
        Number of MDCT lines. Only two values are accepted:
        - 1024 selects table "B219a" (long frames)
        - 128 selects table "B219b" (short subframes)

    Returns
    -------
    BandRanges (list[tuple[int, int]])
        One (start_k, end_k) pair per band, both ends inclusive.

    Raises
    ------
    ValueError
        If k_count is neither 1024 nor 128, or if any band taken from the
        table falls outside [0, k_count - 1] or has end < start.
    """
    # The tables are loaded before k_count is validated (same order as before).
    all_tables = load_b219_tables()

    if k_count not in (1024, 128):
        raise ValueError("TNS supports only k_count=1024 (long) or k_count=128 (short).")
    band_table = all_tables["B219a"] if k_count == 1024 else all_tables["B219b"]

    # Column 1 holds the first line of each band, column 2 the last line.
    first_lines = band_table[:, 1].astype(int)
    last_lines = band_table[:, 2].astype(int)
    band_ranges: list[tuple[int, int]] = [
        (int(lo), int(hi)) for lo, hi in zip(first_lines, last_lines)
    ]

    # Reject tables whose bands do not fit inside the spectrum.
    if any(lo < 0 or hi < lo or hi >= k_count for lo, hi in band_ranges):
        raise ValueError("Invalid band table ranges for given k_count.")
    return band_ranges
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Core DSP helpers
|
||
# -----------------------------------------------------------------------------
|
||
|
||
def _smooth_sw_inplace(sw: MdctCoeffs) -> None:
    """
    Smooth the normalization envelope Sw(k) in place with two recursive passes.

    Pass 1 walks from the last line towards the first:
        Sw(k) = (Sw(k) + Sw(k+1)) / 2
    Pass 2 walks from the first line towards the last:
        Sw(k) = (Sw(k) + Sw(k-1)) / 2

    Each pass reads the neighbour value that was already updated in the same
    pass, so the order of the updates matters.

    Parameters
    ----------
    sw : MdctCoeffs
        1-D array of length K. Overwritten with the smoothed values.

    Returns
    -------
    None
    """
    n_lines = int(sw.shape[0])

    # Pass 1: high index -> low index, averaging with the upper neighbour.
    for upper in range(n_lines - 1, 0, -1):
        lower = upper - 1
        sw[lower] = 0.5 * (sw[lower] + sw[upper])

    # Pass 2: low index -> high index, averaging with the lower neighbour.
    for lower, upper in zip(range(n_lines - 1), range(1, n_lines)):
        sw[upper] = 0.5 * (sw[upper] + sw[lower])
|
||
|
||
|
||
def _compute_sw(x: MdctCoeffs) -> MdctCoeffs:
    """
    Build the smoothed normalization envelope Sw(k) for one MDCT vector.

    For every band the energy P(j) is the sum of squared MDCT lines in the
    band; all lines of that band receive sqrt(P(j)). The resulting piecewise
    constant envelope is then smoothed by _smooth_sw_inplace.

    Parameters
    ----------
    x : MdctCoeffs
        MDCT lines; flattened to 1-D of length K before processing.

    Returns
    -------
    MdctCoeffs
        Sw(k), 1-D float64 array of length K.
    """
    lines = np.asarray(x, dtype=np.float64).reshape(-1)
    n_lines = int(lines.shape[0])

    band_ranges = _band_ranges_for_kcount(n_lines)
    envelope = np.zeros(n_lines, dtype=np.float64)

    for first, last in band_ranges:
        band = slice(first, last + 1)  # table ends are inclusive
        energy = float(np.sum(np.square(lines[band])))
        envelope[band] = float(np.sqrt(energy))

    _smooth_sw_inplace(envelope)
    return envelope
|
||
|
||
|
||
def _autocorr(x: MdctCoeffs, p: int) -> MdctCoeffs:
    """
    Autocorrelation r(m) for m = 0..p.

    r(m) = sum_k x[k + m] * x[k], summed over all k for which both samples
    exist. Lags that are not shorter than the signal have no overlapping
    samples, so r(m) = 0 for every m >= len(x).

    Parameters
    ----------
    x : MdctCoeffs
        Input signal; flattened to 1-D before processing.
    p : int
        Maximum lag.

    Returns
    -------
    MdctCoeffs
        r, shape (p+1,), float64.
    """
    x = np.asarray(x, dtype=np.float64).reshape(-1)
    n = int(x.shape[0])

    r = np.zeros(p + 1, dtype=np.float64)
    # Only lags < n overlap the signal. For m > n the slice x[: n - m] has a
    # negative stop and is NOT empty, which would make np.dot fail on a shape
    # mismatch, so those lags are skipped and keep their zero value.
    for m in range(min(p, n - 1) + 1):
        r[m] = float(np.dot(x[m:], x[: n - m]))
    return r
|
||
|
||
|
||
def _lpc_coeffs(xw: MdctCoeffs, p: int) -> MdctCoeffs:
    """
    Compute order-p linear prediction coefficients from the normal equations.

    The system solved is (R + 1e-12 * I) a = r[1..p], where R is the p x p
    symmetric Toeplitz matrix with R[i, j] = r(|i - j|) and r is the
    autocorrelation of xw. The small diagonal term keeps the system solvable
    when R is singular (for example when xw is all zeros).

    Parameters
    ----------
    xw : MdctCoeffs
        1-D normalized sequence Xw(k).
    p : int
        Predictor order.

    Returns
    -------
    MdctCoeffs
        LPC coefficients a[0..p-1], shape (p,), float64.
    """
    acf = _autocorr(xw, p)

    # Toeplitz matrix built by indexing the autocorrelation with |i - j|.
    idx = np.arange(p)
    lag_of = np.abs(idx[:, None] - idx[None, :])
    normal_matrix = acf[lag_of]

    target = acf[1 : p + 1].reshape(p)

    ridge = 1e-12
    regularized = normal_matrix + ridge * np.eye(p, dtype=np.float64)

    return np.linalg.solve(regularized, target)
|
||
|
||
|
||
def _quantize_coeffs(a: MdctCoeffs) -> MdctCoeffs:
    """
    Snap LPC coefficients to the uniform quantizer grid and limit their range.

    Each coefficient is rounded to the nearest multiple of QUANT_STEP and the
    result is then clamped to [-QUANT_MAX, QUANT_MAX].

    Parameters
    ----------
    a : MdctCoeffs
        LPC coefficients; flattened to 1-D, shape (p,).

    Returns
    -------
    MdctCoeffs
        Quantized coefficients, shape (p,), float64.
    """
    coeffs = np.asarray(a, dtype=np.float64).reshape(-1)

    levels = np.round(coeffs / QUANT_STEP)  # integer-valued grid indices
    on_grid = levels * QUANT_STEP
    bounded = np.clip(on_grid, -QUANT_MAX, QUANT_MAX)

    return bounded.astype(np.float64, copy=False)
|
||
|
||
|
||
def _is_inverse_stable(a_q: MdctCoeffs) -> bool:
    """
    Tell whether the inverse TNS filter 1 / H_TNS(z) is stable.

    With the forward filter
        H_TNS(z) = 1 - a1 z^-1 - ... - ap z^-p
    the poles of the inverse filter are the roots of
        z^p - a1 z^{p-1} - ... - ap = 0
    (H_TNS multiplied by z^p). The filter is reported as stable only when
    every root lies strictly inside the unit circle, with a 1e-12 safety
    margin for numerical error.

    Parameters
    ----------
    a_q : MdctCoeffs
        Quantized predictor coefficients; flattened to 1-D, shape (p,).

    Returns
    -------
    bool
        True if stable, else False.
    """
    coeffs = np.asarray(a_q, dtype=np.float64).reshape(-1)

    # Coefficients of z^p - a1 z^{p-1} - ... - ap, highest power first.
    characteristic = np.concatenate(([1.0], -coeffs))
    pole_magnitudes = np.abs(np.roots(characteristic))

    # Largest magnitude still accepted as "inside" the unit circle.
    radius_limit = 1.0 - 1e-12
    return bool(np.all(pole_magnitudes < radius_limit))
|
||
|
||
|
||
def _stabilize_quantized_coeffs(a_q: MdctCoeffs) -> MdctCoeffs:
    """
    Return quantized predictor coefficients whose inverse filter is stable.

    Policy:
    - Coefficients that are already stable are returned unchanged.
    - Otherwise the input is scaled by 0.9, 0.8, ..., 0.1 (each factor applied
      to the original input, not cumulatively), re-quantized so it stays on
      the quantizer grid, and the first stable candidate is returned.
    - If no candidate is stable, all-zero coefficients are returned, which
      disables TNS for this vector.

    Parameters
    ----------
    a_q : MdctCoeffs
        Quantized predictor coefficients; flattened to 1-D, shape (p,).

    Returns
    -------
    MdctCoeffs
        Stable quantized coefficients, shape (p,).
    """
    coeffs = np.asarray(a_q, dtype=np.float64).reshape(-1)

    if _is_inverse_stable(coeffs):
        return coeffs

    shrink_factors = (0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1)

    # Lazy generator: candidates are produced and tested one at a time, in
    # order of decreasing shrink factor, stopping at the first stable one.
    candidates = (_quantize_coeffs(factor * coeffs) for factor in shrink_factors)
    tns_disabled = np.zeros_like(coeffs, dtype=np.float64)

    return next(
        (cand for cand in candidates if _is_inverse_stable(cand)),
        tns_disabled,
    )
|
||
|
||
|
||
def _apply_tns_fir(x: MdctCoeffs, a_q: MdctCoeffs) -> MdctCoeffs:
    """
    Apply the forward TNS FIR filter:
        y[k] = x[k] - sum_{l=1..p} a_l * x[k-l]

    Samples before the start of the vector (k - l < 0) are treated as zero.
    The filter is evaluated with one vectorized update per lag instead of a
    per-sample Python loop; the terms are still subtracted in the order
    l = 1, 2, ..., p for every sample.

    Parameters
    ----------
    x : MdctCoeffs
        MDCT lines; flattened to 1-D of length K. Not modified.
    a_q : MdctCoeffs
        Quantized LPC coefficients; flattened to 1-D, shape (p,).

    Returns
    -------
    MdctCoeffs
        Filtered MDCT lines y, length K, float64.
    """
    x = np.asarray(x, dtype=np.float64).reshape(-1)
    a_q = np.asarray(a_q, dtype=np.float64).reshape(-1)

    # Work on a copy: x may be a view of the caller's array.
    y = x.copy()
    for lag, coeff in enumerate(a_q, start=1):
        # x[:-lag] is x delayed by `lag` samples and lines up with y[lag:].
        # Both slices are empty when lag >= K, so short inputs need no guard.
        y[lag:] -= coeff * x[:-lag]
    return y
|
||
|
||
|
||
def _apply_itns_iir(y: MdctCoeffs, a_q: MdctCoeffs) -> MdctCoeffs:
    """
    Undo TNS with the recursive (IIR) inverse filter:
        x_hat[k] = y[k] + sum_{l=1..p} a_l * x_hat[k-l]

    Output samples before the start of the vector (k - l < 0) are treated as
    zero. Every output depends on previously reconstructed outputs, so the
    samples are computed one after another.

    Parameters
    ----------
    y : MdctCoeffs
        MDCT lines after TNS; flattened to 1-D of length K.
    a_q : MdctCoeffs
        Quantized LPC coefficients; flattened to 1-D, shape (p,).

    Returns
    -------
    MdctCoeffs
        Reconstructed MDCT lines x_hat, length K, float64.
    """
    residual = np.asarray(y, dtype=np.float64).reshape(-1)
    coeffs = np.asarray(a_q, dtype=np.float64).reshape(-1)
    order = int(coeffs.shape[0])

    restored = np.zeros(int(residual.shape[0]), dtype=np.float64)
    for k, sample in enumerate(residual):
        total = sample
        # Only lags that reach back to an existing output sample contribute.
        for lag in range(1, min(order, k) + 1):
            total += coeffs[lag - 1] * restored[k - lag]
        restored[k] = total
    return restored
|
||
|
||
|
||
def _tns_one_vector(x: MdctCoeffs) -> tuple[MdctCoeffs, MdctCoeffs]:
    """
    TNS for a single MDCT vector (one long frame or one short subframe).

    Steps:
      1) Compute Sw(k) from Bark band energies and smooth it.
      2) Normalize: Xw(k) = X(k) / Sw(k), with Xw(k) = 0 wherever Sw(k) is
         not larger than 1e-12.
      3) Compute LPC coefficients (order p=PRED_ORDER) on Xw.
      4) Quantize coefficients (4-bit symmetric, step QUANT_STEP).
      5) Replace the quantized coefficients by a stable set if the inverse
         filter would be unstable.
      6) Apply FIR filter on original X(k) using the final coefficients.

    Parameters
    ----------
    x : MdctCoeffs
        MDCT vector; flattened to 1-D before processing.

    Returns
    -------
    y : MdctCoeffs
        TNS-processed MDCT vector (same length).
    a_q : MdctCoeffs
        Quantized LPC coefficients, shape (PRED_ORDER,).
    """
    x = np.asarray(x, dtype=np.float64).reshape(-1)
    sw = _compute_sw(x)

    eps = 1e-12
    # Divide only where the envelope is usable. A plain `x / sw` inside
    # np.where would still evaluate 0/0 for silent frames and emit a
    # RuntimeWarning, even though those results are thrown away.
    xw = np.divide(x, sw, out=np.zeros_like(x), where=sw > eps)

    a = _lpc_coeffs(xw, PRED_ORDER)
    a_q = _quantize_coeffs(a)

    # Ensure inverse stability (assignment requirement)
    a_q = _stabilize_quantized_coeffs(a_q)

    y = _apply_tns_fir(x, a_q)
    return y, a_q
|
||
|
||
|
||
|
||
# -----------------------------------------------------------------------------
|
||
# Public Functions
|
||
# -----------------------------------------------------------------------------
|
||
|
||
def aac_tns(frame_F_in: FrameChannelF, frame_type: FrameType) -> Tuple[FrameChannelF, TnsCoeffs]:
    """
    Apply Temporal Noise Shaping (TNS) to the MDCT coefficients of ONE channel.

    Parameters
    ----------
    frame_F_in : FrameChannelF
        Per-channel MDCT coefficients. Accepted shapes:
        - frame_type == "ESH": (128, 8), one column per short subframe
        - any other frame_type: (1024,) or (1024, 1)

    frame_type : FrameType
        Frame type code ("OLS", "LSS", "ESH", "LPS"). Only "ESH" is treated
        specially; every other value takes the long-frame path.

    Returns
    -------
    frame_F_out : FrameChannelF
        MDCT coefficients after TNS, same shape as the input.

    tns_coeffs : TnsCoeffs
        Quantized TNS predictor coefficients:
        - frame_type == "ESH": shape (PRED_ORDER, 8)
        - otherwise: shape (PRED_ORDER, 1)

    Raises
    ------
    ValueError
        If frame_F_in does not have one of the accepted shapes.
    """
    spectrum = np.asarray(frame_F_in, dtype=np.float64)

    if frame_type == "ESH":
        if spectrum.shape != (128, 8):
            raise ValueError("For ESH, frame_F_in must have shape (128, 8).")

        shaped = np.empty_like(spectrum, dtype=np.float64)
        coeff_table = np.empty((PRED_ORDER, 8), dtype=np.float64)

        # Every short subframe (column) is processed independently.
        for col in range(8):
            sub_out, sub_coeffs = _tns_one_vector(spectrum[:, col])
            shaped[:, col] = sub_out
            coeff_table[:, col] = sub_coeffs

        return shaped, coeff_table

    # Long frame: accept either a flat vector or a single-column matrix.
    is_flat = spectrum.shape == (1024,)
    if not is_flat and spectrum.shape != (1024, 1):
        raise ValueError('For non-ESH, frame_F_in must have shape (1024,) or (1024, 1).')

    long_vec = spectrum if is_flat else spectrum[:, 0]
    shaped_vec, quantized = _tns_one_vector(long_vec)

    # Hand the result back in the same layout the caller supplied.
    frame_out = shaped_vec if is_flat else shaped_vec.reshape(1024, 1)
    return frame_out, quantized.reshape(PRED_ORDER, 1)
|
||
|
||
|
||
def aac_i_tns(frame_F_in: FrameChannelF, frame_type: FrameType, tns_coeffs: TnsCoeffs) -> FrameChannelF:
    """
    Undo Temporal Noise Shaping (inverse TNS) for ONE channel.

    Parameters
    ----------
    frame_F_in : FrameChannelF
        Per-channel MDCT coefficients after TNS. Accepted shapes:
        - frame_type == "ESH": (128, 8), one column per short subframe
        - any other frame_type: (1024,) or (1024, 1)

    frame_type : FrameType
        Frame type code ("OLS", "LSS", "ESH", "LPS"). Only "ESH" is treated
        specially; every other value takes the long-frame path.

    tns_coeffs : TnsCoeffs
        Quantized TNS predictor coefficients. Required shapes:
        - frame_type == "ESH": (PRED_ORDER, 8)
        - otherwise: (PRED_ORDER, 1)

    Returns
    -------
    FrameChannelF
        MDCT coefficients after inverse TNS, same shape as frame_F_in.

    Raises
    ------
    ValueError
        If frame_F_in or tns_coeffs does not have the required shape. For
        long frames the coefficient shape is checked before the frame shape.
    """
    shaped = np.asarray(frame_F_in, dtype=np.float64)
    coeffs = np.asarray(tns_coeffs, dtype=np.float64)

    if frame_type == "ESH":
        if shaped.shape != (128, 8):
            raise ValueError("For ESH, frame_F_in must have shape (128, 8).")
        if coeffs.shape != (PRED_ORDER, 8):
            raise ValueError("For ESH, tns_coeffs must have shape (PRED_ORDER, 8).")

        restored = np.empty_like(shaped, dtype=np.float64)
        # Each short subframe is inverted with its own coefficient column.
        for col in range(8):
            restored[:, col] = _apply_itns_iir(shaped[:, col], coeffs[:, col])
        return restored

    if coeffs.shape != (PRED_ORDER, 1):
        raise ValueError("For non-ESH, tns_coeffs must have shape (PRED_ORDER, 1).")

    # Long frame: accept either a flat vector or a single-column matrix.
    is_flat = shaped.shape == (1024,)
    if not is_flat and shaped.shape != (1024, 1):
        raise ValueError('For non-ESH, frame_F_in must have shape (1024,) or (1024, 1).')

    long_vec = shaped if is_flat else shaped[:, 0]
    restored_vec = _apply_itns_iir(long_vec, coeffs[:, 0])

    # Hand the result back in the same layout the caller supplied.
    return restored_vec if is_flat else restored_vec.reshape(1024, 1)
|