# ------------------------------------------------------------ # AAC Coder/Decoder - AAC Coder (Core) # # Multimedia course at Aristotle University of # Thessaloniki (AUTh) # # Author: # Christos Choutouridis (ΑΕΜ 8997) # cchoutou@ece.auth.gr # # Description: # Level 1 AAC encoder orchestration. # Keeps the same functional behavior as the original level_1 implementation: # - Reads WAV via soundfile # - Validates stereo and 48 kHz # - Frames into 2048 samples with hop=1024 and zero padding at both ends # - SSC decision uses next-frame attack detection # - Filterbank analysis (MDCT) # - Stores per-channel spectra in AACSeq1 schema: # * ESH: (128, 8) # * else: (1024, 1) # ------------------------------------------------------------ from __future__ import annotations from pathlib import Path from typing import Union import soundfile as sf from scipy.io import savemat from core.aac_configuration import WIN_TYPE from core.aac_filterbank import aac_filter_bank from core.aac_ssc import aac_ssc from core.aac_tns import aac_tns from core.aac_psycho import aac_psycho from core.aac_quantizer import aac_quantizer # assumes your quantizer file is core/aac_quantizer.py from core.aac_huffman import aac_encode_huff from core.aac_utils import get_table, band_limits from material.huff_utils import load_LUT from core.aac_types import * # ----------------------------------------------------------------------------- # Helpers for thresholds (T(b)) # ----------------------------------------------------------------------------- def _band_slices_from_table(frame_type: FrameType) -> list[tuple[int, int]]: """ Return inclusive (lo, hi) band slices derived from TableB219. """ table, _ = get_table(frame_type) wlow, whigh, _bval, _qthr_db = band_limits(table) return [(int(lo), int(hi)) for lo, hi in zip(wlow, whigh)] def _thresholds_from_smr( frame_F_ch: FrameChannelF, frame_type: FrameType, SMR: FloatArray, ) -> FloatArray: """ Compute thresholds T(b) = P(b) / SMR(b), where P(b) is band energy. Shapes: - Long: returns (NB, 1) - ESH: returns (NB, 8) """ bands = _band_slices_from_table(frame_type) NB = len(bands) X = np.asarray(frame_F_ch, dtype=np.float64) SMR = np.asarray(SMR, dtype=np.float64) if frame_type == "ESH": if X.shape != (128, 8): raise ValueError("For ESH, frame_F_ch must have shape (128, 8).") if SMR.shape != (NB, 8): raise ValueError(f"For ESH, SMR must have shape ({NB}, 8).") T = np.zeros((NB, 8), dtype=np.float64) for j in range(8): Xj = X[:, j] for b, (lo, hi) in enumerate(bands): P = float(np.sum(Xj[lo : hi + 1] ** 2)) smr = float(SMR[b, j]) T[b, j] = 0.0 if smr <= 1e-12 else (P / smr) return T # Long if X.shape == (1024,): Xv = X elif X.shape == (1024, 1): Xv = X[:, 0] else: raise ValueError("For non-ESH, frame_F_ch must be shape (1024,) or (1024, 1).") if SMR.shape == (NB,): SMRv = SMR elif SMR.shape == (NB, 1): SMRv = SMR[:, 0] else: raise ValueError(f"For non-ESH, SMR must be shape ({NB},) or ({NB}, 1).") T = np.zeros((NB, 1), dtype=np.float64) for b, (lo, hi) in enumerate(bands): P = float(np.sum(Xv[lo : hi + 1] ** 2)) smr = float(SMRv[b]) T[b, 0] = 0.0 if smr <= 1e-12 else (P / smr) return T # ----------------------------------------------------------------------------- # Public helpers (useful for level_x demo wrappers) # ----------------------------------------------------------------------------- def aac_read_wav_stereo_48k(filename_in: Union[str, Path]) -> tuple[StereoSignal, int]: """ Read a WAV file using soundfile and validate the Level-1 assumptions. Parameters ---------- filename_in : Union[str, Path] Input WAV filename. Returns ------- x : StereoSignal (np.ndarray) Stereo samples as float64, shape (N, 2). fs : int Sampling rate (Hz). Must be 48000. Raises ------ ValueError If the input is not stereo or the sampling rate is not 48 kHz. """ filename_in = Path(filename_in) x, fs = sf.read(str(filename_in), always_2d=True) x = np.asarray(x, dtype=np.float64) if x.shape[1] != 2: raise ValueError("Input must be stereo (2 channels).") if int(fs) != 48000: raise ValueError("Input sampling rate must be 48 kHz.") return x, int(fs) def aac_pack_frame_f_to_seq_channels(frame_type: FrameType, frame_f: FrameF) -> tuple[FrameChannelF, FrameChannelF]: """ Convert the stereo FrameF returned by aac_filter_bank() into per-channel arrays as required by the Level-1 AACSeq1 schema. Parameters ---------- frame_type : FrameType "OLS" | "LSS" | "ESH" | "LPS". frame_f : FrameF Output of aac_filter_bank(): - If frame_type != "ESH": shape (1024, 2) - If frame_type == "ESH": shape (128, 16) packed as [L0 R0 L1 R1 ... L7 R7] Returns ------- chl_f : FrameChannelF Left channel coefficients: - ESH: shape (128, 8) - else: shape (1024, 1) chr_f : FrameChannelF Right channel coefficients: - ESH: shape (128, 8) - else: shape (1024, 1) """ if frame_type == "ESH": if frame_f.shape != (128, 16): raise ValueError("For ESH, frame_f must have shape (128, 16).") chl_f = np.empty((128, 8), dtype=np.float64) chr_f = np.empty((128, 8), dtype=np.float64) for j in range(8): chl_f[:, j] = frame_f[:, 2 * j + 0] chr_f[:, j] = frame_f[:, 2 * j + 1] return chl_f, chr_f # Non-ESH: store as (1024, 1) as required by the original Level-1 schema. if frame_f.shape != (1024, 2): raise ValueError("For OLS/LSS/LPS, frame_f must have shape (1024, 2).") chl_f = frame_f[:, 0:1].astype(np.float64, copy=False) chr_f = frame_f[:, 1:2].astype(np.float64, copy=False) return chl_f, chr_f # ----------------------------------------------------------------------------- # Level 1 encoder # ----------------------------------------------------------------------------- def aac_coder_1( filename_in: Union[str, Path], verbose: bool = False ) -> AACSeq1: """ Level-1 AAC encoder. This function preserves the behavior of the original level_1 implementation: - Read stereo 48 kHz WAV - Pad hop samples at start and hop samples at end - Frame with win=2048, hop=1024 - Use SSC with next-frame lookahead - Apply filterbank analysis - Store per-channel coefficients using AACSeq1 schema Parameters ---------- filename_in : Union[str, Path] Input WAV filename. Assumption: stereo audio, sampling rate 48 kHz. verbose : bool Optional argument to print encoding status Returns ------- AACSeq1 List of encoded frames (Level 1 schema). """ x, _ = aac_read_wav_stereo_48k(filename_in) # The assignment assumes 48 kHz hop = 1024 win = 2048 # Pad at the beginning to support the first overlap region. # Tail padding is kept minimal; next-frame is padded on-the-fly when needed. pad_pre = np.zeros((hop, 2), dtype=np.float64) pad_post = np.zeros((hop, 2), dtype=np.float64) x_pad = np.vstack([pad_pre, x, pad_post]) # Number of frames such that current frame fits; next frame will be padded if needed. K = int((x_pad.shape[0] - win) // hop + 1) if K <= 0: raise ValueError("Input too short for framing.") aac_seq: AACSeq1 = [] prev_frame_type: FrameType = "OLS" if verbose: print("Encoding ", end="", flush=True) for i in range(K): start = i * hop frame_t: FrameT = x_pad[start:start + win, :] if frame_t.shape != (win, 2): # This should not happen due to K definition, but keep it explicit. raise ValueError("Internal framing error: frame_t has wrong shape.") next_t = x_pad[start + hop:start + hop + win, :] # Ensure next_t is always (2048, 2) by zero-padding at the tail. if next_t.shape[0] < win: tail = np.zeros((win - next_t.shape[0], 2), dtype=np.float64) next_t = np.vstack([next_t, tail]) frame_type = aac_ssc(frame_t, next_t, prev_frame_type) frame_f = aac_filter_bank(frame_t, frame_type, WIN_TYPE) chl_f, chr_f = aac_pack_frame_f_to_seq_channels(frame_type, frame_f) aac_seq.append({ "frame_type": frame_type, "win_type": WIN_TYPE, "chl": {"frame_F": chl_f}, "chr": {"frame_F": chr_f}, }) prev_frame_type = frame_type if verbose and (i % (K//20)) == 0: print(".", end="", flush=True) if verbose: print(" done") return aac_seq def aac_coder_2( filename_in: Union[str, Path], verbose: bool = False ) -> AACSeq2: """ Level-2 AAC encoder (Level 1 + TNS). Parameters ---------- filename_in : Union[str, Path] Input WAV filename (stereo, 48 kHz). verbose : bool Optional argument to print encoding status Returns ------- AACSeq2 Encoded AAC sequence (Level 2 payload schema). For each frame i: - "frame_type": FrameType - "win_type": WinType - "chl"/"chr": - "frame_F": FrameChannelF (after TNS) - "tns_coeffs": TnsCoeffs """ filename_in = Path(filename_in) x, _ = aac_read_wav_stereo_48k(filename_in) # The assignment assumes 48 kHz hop = 1024 win = 2048 pad_pre = np.zeros((hop, 2), dtype=np.float64) pad_post = np.zeros((hop, 2), dtype=np.float64) x_pad = np.vstack([pad_pre, x, pad_post]) K = int((x_pad.shape[0] - win) // hop + 1) if K <= 0: raise ValueError("Input too short for framing.") aac_seq: AACSeq2 = [] prev_frame_type: FrameType = "OLS" if verbose: print("Encoding ", end="", flush=True) for i in range(K): start = i * hop frame_t: FrameT = x_pad[start : start + win, :] if frame_t.shape != (win, 2): raise ValueError("Internal framing error: frame_t has wrong shape.") next_t = x_pad[start + hop : start + hop + win, :] if next_t.shape[0] < win: tail = np.zeros((win - next_t.shape[0], 2), dtype=np.float64) next_t = np.vstack([next_t, tail]) frame_type = aac_ssc(frame_t, next_t, prev_frame_type) # Level 1 analysis (packed stereo container) frame_f_stereo = aac_filter_bank(frame_t, frame_type, WIN_TYPE) chl_f, chr_f = aac_pack_frame_f_to_seq_channels(frame_type, frame_f_stereo) # Level 2: apply TNS per channel chl_f_tns, chl_tns_coeffs = aac_tns(chl_f, frame_type) chr_f_tns, chr_tns_coeffs = aac_tns(chr_f, frame_type) aac_seq.append( { "frame_type": frame_type, "win_type": WIN_TYPE, "chl": {"frame_F": chl_f_tns, "tns_coeffs": chl_tns_coeffs}, "chr": {"frame_F": chr_f_tns, "tns_coeffs": chr_tns_coeffs}, } ) prev_frame_type = frame_type if verbose and (i % (K//20)) == 0: print(".", end="", flush=True) if verbose: print(" done") return aac_seq def aac_coder_3( filename_in: Union[str, Path], filename_aac_coded: Union[str, Path] | None = None, verbose: bool = False, ) -> AACSeq3: """ Level-3 AAC encoder (Level 2 + Psycho + Quantizer + Huffman). Parameters ---------- filename_in : Union[str, Path] Input WAV filename (stereo, 48 kHz). filename_aac_coded : Union[str, Path] | None Optional .mat filename to store aac_seq_3 (assignment convenience). verbose : bool Optional argument to print encoding status Returns ------- AACSeq3 Encoded AAC sequence (Level 3 payload schema). """ filename_in = Path(filename_in) x, _ = aac_read_wav_stereo_48k(filename_in) hop = 1024 win = 2048 pad_pre = np.zeros((hop, 2), dtype=np.float64) pad_post = np.zeros((hop, 2), dtype=np.float64) x_pad = np.vstack([pad_pre, x, pad_post]) K = int((x_pad.shape[0] - win) // hop + 1) if K <= 0: raise ValueError("Input too short for framing.") # Load Huffman LUTs once. huff_LUT_list = load_LUT() aac_seq: AACSeq3 = [] prev_frame_type: FrameType = "OLS" # Psycho model needs per-channel history (prev1, prev2) of 2048-sample frames. prev1_L = np.zeros((2048,), dtype=np.float64) prev2_L = np.zeros((2048,), dtype=np.float64) prev1_R = np.zeros((2048,), dtype=np.float64) prev2_R = np.zeros((2048,), dtype=np.float64) if verbose: print("Encoding ", end="", flush=True) for i in range(K): start = i * hop frame_t: FrameT = x_pad[start : start + win, :] if frame_t.shape != (win, 2): raise ValueError("Internal framing error: frame_t has wrong shape.") next_t = x_pad[start + hop : start + hop + win, :] if next_t.shape[0] < win: tail = np.zeros((win - next_t.shape[0], 2), dtype=np.float64) next_t = np.vstack([next_t, tail]) frame_type = aac_ssc(frame_t, next_t, prev_frame_type) # Analysis filterbank (stereo packed) frame_f_stereo = aac_filter_bank(frame_t, frame_type, WIN_TYPE) chl_f, chr_f = aac_pack_frame_f_to_seq_channels(frame_type, frame_f_stereo) # TNS per channel chl_f_tns, chl_tns_coeffs = aac_tns(chl_f, frame_type) chr_f_tns, chr_tns_coeffs = aac_tns(chr_f, frame_type) # Psychoacoustic model per channel (time-domain) frame_L = np.asarray(frame_t[:, 0], dtype=np.float64) frame_R = np.asarray(frame_t[:, 1], dtype=np.float64) SMR_L = aac_psycho(frame_L, frame_type, prev1_L, prev2_L) SMR_R = aac_psycho(frame_R, frame_type, prev1_R, prev2_R) # Thresholds T(b) (stored, not entropy-coded) T_L = _thresholds_from_smr(chl_f_tns, frame_type, SMR_L) T_R = _thresholds_from_smr(chr_f_tns, frame_type, SMR_R) # Quantizer per channel S_L, sfc_L, G_L = aac_quantizer(chl_f_tns, frame_type, SMR_L) S_R, sfc_R, G_R = aac_quantizer(chr_f_tns, frame_type, SMR_R) # Huffman-code ONLY the DPCM differences for b>0. # sfc[0] corresponds to alpha(0)=G and is stored separately in the frame. sfc_L_dpcm = np.asarray(sfc_L, dtype=np.int64)[1:, ...] sfc_R_dpcm = np.asarray(sfc_R, dtype=np.int64)[1:, ...] # sfc_L_stream, cb_sfc_L = aac_encode_huff(sfc_L_dpcm.reshape(-1, order="F"), huff_LUT_list, force_codebook=11) # sfc_R_stream, cb_sfc_R = aac_encode_huff(sfc_R_dpcm.reshape(-1, order="F"), huff_LUT_list, force_codebook=11) sfc_L_stream, cb_sfc_L = aac_encode_huff(sfc_L_dpcm.reshape(-1, order="F"), huff_LUT_list) sfc_R_stream, cb_sfc_R = aac_encode_huff(sfc_R_dpcm.reshape(-1, order="F"), huff_LUT_list) if cb_sfc_L != 11 or cb_sfc_R != 11: raise ValueError(f"Illegal codebook value for frame: {i}: cb_sfc_l={cb_sfc_L}, cb_sfc_r={cb_sfc_R}.") mdct_L_stream, cb_L = aac_encode_huff(np.asarray(S_L, dtype=np.int64).reshape(-1), huff_LUT_list) mdct_R_stream, cb_R = aac_encode_huff(np.asarray(S_R, dtype=np.int64).reshape(-1), huff_LUT_list) # Typed dict construction helps static analyzers validate the schema. frame_out: AACSeq3Frame = { "frame_type": frame_type, "win_type": WIN_TYPE, "chl": { "tns_coeffs": np.asarray(chl_tns_coeffs, dtype=np.float64), "T": np.asarray(T_L, dtype=np.float64), "G": G_L, "sfc": sfc_L_stream, "stream": mdct_L_stream, "codebook": int(cb_L), }, "chr": { "tns_coeffs": np.asarray(chr_tns_coeffs, dtype=np.float64), "T": np.asarray(T_R, dtype=np.float64), "G": G_R, "sfc": sfc_R_stream, "stream": mdct_R_stream, "codebook": int(cb_R), }, } aac_seq.append(frame_out) # Update psycho history (shift register) prev2_L = prev1_L prev1_L = frame_L prev2_R = prev1_R prev1_R = frame_R prev_frame_type = frame_type if verbose and (i % (K//20)) == 0: print(".", end="", flush=True) if verbose: print(" done") # Optional: store to .mat for the assignment wrapper if filename_aac_coded is not None: filename_aac_coded = Path(filename_aac_coded) savemat( str(filename_aac_coded), {"aac_seq_3": np.array(aac_seq, dtype=object)}, do_compression=True, ) return aac_seq