# ------------------------------------------------------------
# AAC Coder/Decoder - AAC Coder (Core)
#
# Multimedia course at Aristotle University of
# Thessaloniki (AUTh)
#
# Author:
# Christos Choutouridis (ΑΕΜ 8997)
# cchoutou@ece.auth.gr
#
# Description:
# Level 1 and Level 2 AAC encoder orchestration.
# Keeps the same functional behavior as the original level_1 implementation:
#   - Reads WAV via soundfile
#   - Validates stereo and 48 kHz
#   - Frames into 2048 samples with hop=1024 and zero padding at both ends
#   - SSC decision uses next-frame attack detection
#   - Filterbank analysis (MDCT)
#   - Stores per-channel spectra in AACSeq1 schema:
#       * ESH:  (128, 8)
#       * else: (1024, 1)
# Level 2 additionally applies TNS to each channel and stores the
# TNS coefficients next to the spectra (AACSeq2 schema).
# ------------------------------------------------------------
from __future__ import annotations

from collections.abc import Iterator
from pathlib import Path
from typing import Union

import numpy as np
import soundfile as sf

from core.aac_configuration import WIN_TYPE
from core.aac_filterbank import aac_filter_bank
from core.aac_ssc import aac_SSC
from core.aac_tns import aac_tns
from core.aac_types import *

# Framing parameters shared by every encoder level in this module.
_HOP = 1024  # Frame advance in samples.
_WIN = 2048  # Frame length in samples (two hops).


# -----------------------------------------------------------------------------
# Public helpers (useful for level_x demo wrappers)
# -----------------------------------------------------------------------------

def aac_read_wav_stereo_48k(filename_in: Union[str, Path]) -> tuple[StereoSignal, int]:
    """
    Read a WAV file using soundfile and validate the Level-1 assumptions.

    Parameters
    ----------
    filename_in : Union[str, Path]
        Input WAV filename.

    Returns
    -------
    x : StereoSignal (np.ndarray)
        Stereo samples as float64, shape (N, 2).
    fs : int
        Sampling rate (Hz). Must be 48000.

    Raises
    ------
    ValueError
        If the input is not stereo or the sampling rate is not 48 kHz.
    """
    filename_in = Path(filename_in)

    # always_2d=True guarantees a (N, channels) array even for mono input,
    # so the channel-count check below can index shape[1] safely.
    x, fs = sf.read(str(filename_in), always_2d=True)
    x = np.asarray(x, dtype=np.float64)

    if x.shape[1] != 2:
        raise ValueError("Input must be stereo (2 channels).")
    if int(fs) != 48000:
        raise ValueError("Input sampling rate must be 48 kHz.")

    return x, int(fs)


def aac_pack_frame_f_to_seq_channels(frame_type: FrameType, frame_f: FrameF) -> tuple[FrameChannelF, FrameChannelF]:
    """
    Convert the stereo FrameF returned by aac_filter_bank() into per-channel arrays
    as required by the Level-1 AACSeq1 schema.

    Parameters
    ----------
    frame_type : FrameType
        "OLS" | "LSS" | "ESH" | "LPS".
    frame_f : FrameF
        Output of aac_filter_bank():
        - If frame_type != "ESH": shape (1024, 2)
        - If frame_type == "ESH": shape (128, 16) packed as [L0 R0 L1 R1 ... L7 R7]

    Returns
    -------
    chl_f : FrameChannelF
        Left channel coefficients:
        - ESH: shape (128, 8)
        - else: shape (1024, 1)
    chr_f : FrameChannelF
        Right channel coefficients:
        - ESH: shape (128, 8)
        - else: shape (1024, 1)

    Raises
    ------
    ValueError
        If frame_f does not have the shape expected for frame_type.
    """
    if frame_type == "ESH":
        if frame_f.shape != (128, 16):
            raise ValueError("For ESH, frame_f must have shape (128, 16).")

        # Columns are interleaved [L0 R0 L1 R1 ... L7 R7]: even columns are
        # the left channel, odd columns the right one. astype() with its
        # default copy=True returns fresh C-ordered float64 arrays.
        chl_f = frame_f[:, 0::2].astype(np.float64, order="C")
        chr_f = frame_f[:, 1::2].astype(np.float64, order="C")
        return chl_f, chr_f

    # Non-ESH: store as (1024, 1) as required by the original Level-1 schema.
    if frame_f.shape != (1024, 2):
        raise ValueError("For OLS/LSS/LPS, frame_f must have shape (1024, 2).")

    # Slicing with 0:1 / 1:2 (instead of an integer index) keeps the column axis.
    chl_f = frame_f[:, 0:1].astype(np.float64, copy=False)
    chr_f = frame_f[:, 1:2].astype(np.float64, copy=False)
    return chl_f, chr_f


# -----------------------------------------------------------------------------
# Shared framing (private)
# -----------------------------------------------------------------------------

def _iter_classified_frames(x: StereoSignal) -> Iterator[tuple[FrameType, FrameT]]:
    """
    Frame a stereo signal and classify each frame with aac_SSC().

    The signal is padded with one hop of zeros at both ends and cut into
    frames of 2048 samples with a hop of 1024. Every frame is classified
    using the following frame as lookahead and the previous frame type as
    state (the first frame assumes a previous type of "OLS").

    Parameters
    ----------
    x : StereoSignal
        Stereo samples, shape (N, 2).

    Yields
    ------
    frame_type : FrameType
        Type returned by aac_SSC() for the current frame.
    frame_t : FrameT
        Time-domain samples of the current frame, shape (2048, 2).

    Raises
    ------
    ValueError
        If no frame fits in the padded signal, or a frame has a wrong shape.
    """
    # Pad at the beginning to support the first overlap region, and by the
    # same amount at the end. The lookahead frame of the last frame still
    # runs past the padded signal and is zero-padded on the fly below.
    pad = np.zeros((_HOP, 2), dtype=np.float64)
    x_pad = np.vstack([pad, x, pad])

    # Number of frames such that the current frame always fits entirely.
    n_frames = int((x_pad.shape[0] - _WIN) // _HOP + 1)
    if n_frames <= 0:
        raise ValueError("Input too short for framing.")

    prev_frame_type: FrameType = "OLS"

    for i in range(n_frames):
        start = i * _HOP

        frame_t: FrameT = x_pad[start:start + _WIN, :]
        if frame_t.shape != (_WIN, 2):
            # This should not happen due to the n_frames definition, but keep it explicit.
            raise ValueError("Internal framing error: frame_t has wrong shape.")

        # Ensure next_t is always (2048, 2) by zero-padding at the tail.
        next_t = x_pad[start + _HOP:start + _HOP + _WIN, :]
        missing = _WIN - next_t.shape[0]
        if missing > 0:
            tail = np.zeros((missing, 2), dtype=np.float64)
            next_t = np.vstack([next_t, tail])

        frame_type = aac_SSC(frame_t, next_t, prev_frame_type)
        yield frame_type, frame_t

        # The decision for the next frame depends on the type chosen here.
        prev_frame_type = frame_type


# -----------------------------------------------------------------------------
# Level 1 encoder
# -----------------------------------------------------------------------------

def aac_coder_1(filename_in: Union[str, Path]) -> AACSeq1:
    """
    Level-1 AAC encoder.

    This function preserves the behavior of the original level_1 implementation:
    - Read stereo 48 kHz WAV
    - Pad hop samples at start and hop samples at end
    - Frame with win=2048, hop=1024
    - Use SSC with next-frame lookahead
    - Apply filterbank analysis
    - Store per-channel coefficients using AACSeq1 schema

    Parameters
    ----------
    filename_in : Union[str, Path]
        Input WAV filename.
        Assumption: stereo audio, sampling rate 48 kHz.

    Returns
    -------
    AACSeq1
        List of encoded frames (Level 1 schema). For each frame:
        - "frame_type": FrameType
        - "win_type": WinType
        - "chl"/"chr":
            - "frame_F": FrameChannelF

    Raises
    ------
    ValueError
        If the input is not stereo / 48 kHz, or framing fails.
    """
    x, _ = aac_read_wav_stereo_48k(filename_in)  # The assignment assumes 48 kHz

    win_type: WinType = WIN_TYPE
    aac_seq: AACSeq1 = []

    for frame_type, frame_t in _iter_classified_frames(x):
        frame_f = aac_filter_bank(frame_t, frame_type, win_type)
        chl_f, chr_f = aac_pack_frame_f_to_seq_channels(frame_type, frame_f)

        aac_seq.append({
            "frame_type": frame_type,
            "win_type": win_type,
            "chl": {"frame_F": chl_f},
            "chr": {"frame_F": chr_f},
        })

    return aac_seq


# -----------------------------------------------------------------------------
# Level 2 encoder
# -----------------------------------------------------------------------------

def aac_coder_2(filename_in: Union[str, Path]) -> AACSeq2:
    """
    Level-2 AAC encoder (Level 1 + TNS).

    Parameters
    ----------
    filename_in : Union[str, Path]
        Input WAV filename (stereo, 48 kHz).

    Returns
    -------
    AACSeq2
        Encoded AAC sequence (Level 2 payload schema).
        For each frame i:
        - "frame_type": FrameType
        - "win_type": WinType
        - "chl"/"chr":
            - "frame_F": FrameChannelF (after TNS)
            - "tns_coeffs": TnsCoeffs

    Raises
    ------
    ValueError
        If the input is not stereo / 48 kHz, framing fails, or the
        filterbank output has an unexpected shape for its frame type.
    """
    x, _ = aac_read_wav_stereo_48k(filename_in)  # The assignment assumes 48 kHz

    aac_seq: AACSeq2 = []

    for frame_type, frame_t in _iter_classified_frames(x):
        # Level 1 analysis (packed stereo container), then per-channel
        # unpacking through the same helper used by aac_coder_1().
        frame_f_stereo = aac_filter_bank(frame_t, frame_type, WIN_TYPE)
        chl_f, chr_f = aac_pack_frame_f_to_seq_channels(frame_type, frame_f_stereo)

        # Level 2: apply TNS per channel.
        chl_f_tns, chl_tns_coeffs = aac_tns(chl_f, frame_type)
        chr_f_tns, chr_tns_coeffs = aac_tns(chr_f, frame_type)

        aac_seq.append({
            "frame_type": frame_type,
            "win_type": WIN_TYPE,
            "chl": {"frame_F": chl_f_tns, "tns_coeffs": chl_tns_coeffs},
            "chr": {"frame_F": chr_f_tns, "tns_coeffs": chr_tns_coeffs},
        })

    return aac_seq