446 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ------------------------------------------------------------
# AAC Coder/Decoder - Inverse AAC Coder (Core)
#
# Multimedia course at Aristotle University of
# Thessaloniki (AUTh)
#
# Author:
# Christos Choutouridis (ΑΕΜ 8997)
# cchoutou@ece.auth.gr
#
# Description:
# - Level 1 AAC decoder orchestration (inverse of aac_coder_1()).
# - Level 2 AAC decoder orchestration (inverse of aac_coder_2()).
# - Level 3 AAC decoder orchestration (inverse of aac_coder_3()).
#
# ------------------------------------------------------------
from __future__ import annotations
from pathlib import Path
from typing import Union
import soundfile as sf
from core.aac_filterbank import aac_i_filter_bank
from core.aac_tns import aac_i_tns
from core.aac_quantizer import aac_i_quantizer
from core.aac_huffman import aac_decode_huff
from core.aac_utils import get_table, band_limits
from material.huff_utils import load_LUT
from core.aac_types import *
# -----------------------------------------------------------------------------
# Helper for NB
# -----------------------------------------------------------------------------
def _nbands(frame_type: FrameType) -> int:
    """
    Return the number of bands (NB) associated with a frame type.

    The count is the length of the first array returned by band_limits()
    for the table that get_table() selects for ``frame_type``.

    Parameters
    ----------
    frame_type : FrameType
        Frame type forwarded unchanged to get_table().

    Returns
    -------
    int
        Number of entries in the lower band-limit array.
    """
    band_table, _unused = get_table(frame_type)
    lower_edges, _upper_edges, _bval, _qthr_db = band_limits(band_table)
    band_count = len(lower_edges)
    return int(band_count)
# -----------------------------------------------------------------------------
# Public helpers
# -----------------------------------------------------------------------------
def aac_unpack_seq_channels(frame_type: FrameType, chl_f: FrameChannelF, chr_f: FrameChannelF) -> FrameF:
    """
    Combine the left/right per-channel spectra of one frame into the stereo
    FrameF layout consumed by aac_i_filter_bank().

    Parameters
    ----------
    frame_type : FrameType
        "OLS" | "LSS" | "ESH" | "LPS". Only "ESH" selects the short-window
        layout; every other value is treated as a long frame.
    chl_f : FrameChannelF
        Left channel coefficients:
        - ESH: (128, 8)
        - else: (1024, 1)
    chr_f : FrameChannelF
        Right channel coefficients, same shape rules as ``chl_f``.

    Returns
    -------
    FrameF
        Newly allocated float64 array:
        - ESH: (128, 16), columns interleaved as [L0 R0 L1 R1 ... L7 R7]
        - else: (1024, 2), column 0 = left, column 1 = right

    Raises
    ------
    ValueError
        If either channel does not have the exact shape required by
        ``frame_type``.
    """
    if frame_type != "ESH":
        # Long frames: one column of 1024 coefficients per channel.
        expected = (1024, 1)
        if not (chl_f.shape == expected and chr_f.shape == expected):
            raise ValueError("Non-ESH channel frame_F must have shape (1024, 1).")
        stereo = np.empty((1024, 2), dtype=np.float64)
        stereo[:, :1] = chl_f
        stereo[:, 1:] = chr_f
        return stereo

    # Short frames: eight sub-frames of 128 coefficients per channel.
    expected = (128, 8)
    if not (chl_f.shape == expected and chr_f.shape == expected):
        raise ValueError("ESH channel frame_F must have shape (128, 8).")
    stereo = np.empty((128, 16), dtype=np.float64)
    stereo[:, 0::2] = chl_f  # even columns carry the left sub-frames
    stereo[:, 1::2] = chr_f  # odd columns carry the right sub-frames
    return stereo
def aac_remove_padding(y_pad: StereoSignal, hop: int = 1024) -> StereoSignal:
    """
    Remove the boundary padding that the Level-1 encoder adds:
    hop samples at start and hop samples at end.

    Parameters
    ----------
    y_pad : StereoSignal (np.ndarray)
        Reconstructed padded stream, shape (N_pad, 2).
    hop : int
        Hop size in samples (default 1024). Must be non-negative; ``hop=0``
        returns the stream unchanged.

    Returns
    -------
    StereoSignal (np.ndarray)
        Unpadded reconstructed stream, shape (N_pad - 2*hop, 2).
        This is a slice (view) of ``y_pad``, not a copy.

    Raises
    ------
    ValueError
        If hop is negative, or if y_pad is too short to unpad.
    """
    if hop < 0:
        raise ValueError("hop must be non-negative.")

    n_total = y_pad.shape[0]
    if n_total < 2 * hop:
        raise ValueError("Decoded stream too short to unpad.")

    # Use an explicit end index: y_pad[hop:-hop] would be empty for hop == 0
    # because -0 == 0 turns the slice into [0:0].
    return y_pad[hop:n_total - hop, :]
# -----------------------------------------------------------------------------
# Level 1 decoder
# -----------------------------------------------------------------------------
def aac_decoder_1(
    aac_seq_1: AACSeq1,
    filename_out: Union[str, Path],
    verbose: bool = False
) -> StereoSignal:
    """
    Level-1 AAC decoder (inverse of aac_coder_1()).

    Processing steps:
    - Reconstruct the full padded stream by overlap-adding K synthesized frames
    - Remove hop padding at the beginning and hop padding at the end
    - Write the reconstructed stereo WAV file (48 kHz)
    - Return reconstructed stereo samples as float64

    Parameters
    ----------
    aac_seq_1 : AACSeq1
        Encoded sequence as produced by aac_coder_1(). Each frame is read via
        the keys "frame_type", "win_type", and "chl"/"chr" -> "frame_F".
    filename_out : Union[str, Path]
        Output WAV filename. Assumption: 48 kHz, stereo.
    verbose : bool
        If True, print decoding progress (roughly 20 dots) to stdout.

    Returns
    -------
    StereoSignal
        Decoded audio samples (time-domain), stereo, shape (N, 2), dtype float64.

    Raises
    ------
    ValueError
        If aac_seq_1 is empty, or if a frame's channel spectra have an
        unexpected shape (raised by aac_unpack_seq_channels()).
    """
    filename_out = Path(filename_out)

    hop = 1024
    win = 2048
    K = len(aac_seq_1)
    if K <= 0:
        raise ValueError("aac_seq_1 must contain at least one frame.")

    # Output includes the encoder padding region, so we reconstruct the full padded stream.
    # For K frames: last frame starts at (K-1)*hop and spans win,
    # so total length = (K-1)*hop + win.
    n_pad = (K - 1) * hop + win
    y_pad: StereoSignal = np.zeros((n_pad, 2), dtype=np.float64)

    # Print a dot every ~5% of the frames. Clamp to 1 so that sequences with
    # fewer than 20 frames do not trigger a modulo-by-zero.
    dot_every = max(1, K // 20)

    if verbose:
        print("Decoding ", end="", flush=True)

    for i, fr in enumerate(aac_seq_1):
        frame_type: FrameType = fr["frame_type"]
        win_type: WinType = fr["win_type"]

        chl_f = np.asarray(fr["chl"]["frame_F"], dtype=np.float64)
        chr_f = np.asarray(fr["chr"]["frame_F"], dtype=np.float64)

        frame_f: FrameF = aac_unpack_seq_channels(frame_type, chl_f, chr_f)
        frame_t_hat: FrameT = aac_i_filter_bank(frame_f, frame_type, win_type)  # (2048, 2)

        # Overlap-add: consecutive frames overlap by win - hop samples.
        start = i * hop
        y_pad[start:start + win, :] += frame_t_hat

        if verbose and i % dot_every == 0:
            print(".", end="", flush=True)

    y: StereoSignal = aac_remove_padding(y_pad, hop=hop)

    if verbose:
        print(" done")

    # Level 1 assumption: 48 kHz output.
    sf.write(str(filename_out), y, 48000)
    return y
# -----------------------------------------------------------------------------
# Level 2 decoder
# -----------------------------------------------------------------------------
def aac_decoder_2(
    aac_seq_2: AACSeq2,
    filename_out: Union[str, Path],
    verbose: bool = False
) -> StereoSignal:
    """
    Level-2 AAC decoder (inverse of aac_coder_2).

    Behavior matches Level 1 decoder pipeline, with additional iTNS stage:
    - Per frame/channel: inverse TNS using stored coefficients
    - Re-pack to stereo frame_F
    - IMDCT + windowing
    - Overlap-add over frames
    - Remove Level-1 padding (hop samples start/end)
    - Write output WAV (48 kHz)

    Parameters
    ----------
    aac_seq_2 : AACSeq2
        Encoded sequence as produced by aac_coder_2(). Each frame is read via
        the keys "frame_type", "win_type", and "chl"/"chr" -> "frame_F",
        "tns_coeffs".
    filename_out : Union[str, Path]
        Output WAV filename.
    verbose : bool
        If True, print decoding progress (roughly 20 dots) to stdout.

    Returns
    -------
    StereoSignal
        Decoded audio samples (time-domain), stereo, shape (N, 2), dtype float64.

    Raises
    ------
    ValueError
        If aac_seq_2 is empty, or if a channel spectrum returned by
        aac_i_tns() has an unexpected shape.
    """
    filename_out = Path(filename_out)

    hop = 1024
    win = 2048
    K = len(aac_seq_2)
    if K <= 0:
        raise ValueError("aac_seq_2 must contain at least one frame.")

    # For K frames the last one starts at (K-1)*hop and spans win samples.
    n_pad = (K - 1) * hop + win
    y_pad = np.zeros((n_pad, 2), dtype=np.float64)

    # Print a dot every ~5% of the frames. Clamp to 1 so that sequences with
    # fewer than 20 frames do not trigger a modulo-by-zero.
    dot_every = max(1, K // 20)

    # Shapes accepted for a non-ESH channel spectrum.
    long_shapes = ((1024,), (1024, 1))

    if verbose:
        print("Decoding ", end="", flush=True)

    for i, fr in enumerate(aac_seq_2):
        frame_type: FrameType = fr["frame_type"]
        win_type: WinType = fr["win_type"]

        chl_f_tns = np.asarray(fr["chl"]["frame_F"], dtype=np.float64)
        chr_f_tns = np.asarray(fr["chr"]["frame_F"], dtype=np.float64)
        chl_coeffs = np.asarray(fr["chl"]["tns_coeffs"], dtype=np.float64)
        chr_coeffs = np.asarray(fr["chr"]["tns_coeffs"], dtype=np.float64)

        # Inverse TNS per channel
        chl_f = aac_i_tns(chl_f_tns, frame_type, chl_coeffs)
        chr_f = aac_i_tns(chr_f_tns, frame_type, chr_coeffs)

        # Re-pack to the stereo container expected by aac_i_filter_bank
        if frame_type == "ESH":
            if chl_f.shape != (128, 8) or chr_f.shape != (128, 8):
                raise ValueError("ESH channel frame_F must have shape (128, 8).")
            frame_f: FrameF = np.empty((128, 16), dtype=np.float64)
            # Interleave sub-frames as [L0 R0 L1 R1 ... L7 R7].
            frame_f[:, 0::2] = chl_f
            frame_f[:, 1::2] = chr_f
        else:
            # Accept either (1024, 1) or (1024,) from the internal convention.
            if chl_f.shape not in long_shapes:
                raise ValueError("Non-ESH left channel frame_F must be shape (1024,) or (1024, 1).")
            if chr_f.shape not in long_shapes:
                raise ValueError("Non-ESH right channel frame_F must be shape (1024,) or (1024, 1).")
            frame_f = np.empty((1024, 2), dtype=np.float64)
            frame_f[:, 0] = chl_f.reshape(1024)
            frame_f[:, 1] = chr_f.reshape(1024)

        frame_t_hat: FrameT = aac_i_filter_bank(frame_f, frame_type, win_type)

        # Overlap-add: consecutive frames overlap by win - hop samples.
        start = i * hop
        y_pad[start:start + win, :] += frame_t_hat

        if verbose and i % dot_every == 0:
            print(".", end="", flush=True)

    y = aac_remove_padding(y_pad, hop=hop)

    if verbose:
        print(" done")

    sf.write(str(filename_out), y, 48000)
    return y
def _decode_channel_3(ch, frame_type: FrameType, nb: int, sfc_len: int, huff_lut_list):
    """
    Decode one channel of a Level-3 frame back to MDCT coefficients.

    Pipeline: Huffman-decode scalefactors and MDCT symbols, rebuild the
    scalefactor matrix, then run the inverse quantizer and inverse TNS.

    Parameters
    ----------
    ch : mapping
        Per-channel record of a frame; the keys "tns_coeffs", "G", "sfc",
        "stream" and "codebook" are read.
    frame_type : FrameType
        Frame type; "ESH" selects the 8-column scalefactor layout.
    nb : int
        Number of bands for this frame type.
    sfc_len : int
        Number of Huffman-coded scalefactor symbols to keep.
    huff_lut_list
        Huffman look-up tables as returned by load_LUT().

    Returns
    -------
    The output of aac_i_tns() for this channel. The caller passes it to
    aac_unpack_seq_channels(), which requires (128, 8) for ESH and
    (1024, 1) otherwise.
    """
    tns_coeffs = np.asarray(ch["tns_coeffs"], dtype=np.float64)
    G = ch["G"]
    sfc_bits = ch["sfc"]
    mdct_bits = ch["stream"]
    codebook = int(ch["codebook"])

    # Scalefactors are always coded with codebook 11; drop any trailing symbols.
    sfc_dec = aac_decode_huff(sfc_bits, 11, huff_lut_list)[:sfc_len].astype(np.int64, copy=False)

    # Row 0 holds G (stored separately); rows 1.. hold the decoded DPCM values.
    if frame_type == "ESH":
        sfc_dpcm = sfc_dec.reshape(nb - 1, 8, order="F")
        sfc = np.zeros((nb, 8), dtype=np.int64)
        g_row = np.asarray(G, dtype=np.float64).reshape(1, 8)
        sfc[0, :] = g_row[0, :].astype(np.int64)
        sfc[1:, :] = sfc_dpcm
    else:
        sfc_dpcm = sfc_dec.reshape(nb - 1, 1, order="F")
        sfc = np.zeros((nb, 1), dtype=np.int64)
        sfc[0, 0] = int(float(G))
        sfc[1:, :] = sfc_dpcm

    # MDCT symbols: codebook 0 means "all-zero section"
    if codebook == 0:
        symbols = np.zeros((1024,), dtype=np.int64)
    else:
        decoded = aac_decode_huff(mdct_bits, codebook, huff_lut_list).astype(np.int64, copy=False)
        # Tuple coding may produce extra trailing symbols; caller knows the true length (1024).
        # Also guard against short outputs by zero-padding.
        if decoded.size < 1024:
            symbols = np.zeros((1024,), dtype=np.int64)
            symbols[: decoded.size] = decoded
        else:
            symbols = decoded[:1024]

    S = symbols.reshape(1024, 1)
    Xq = aac_i_quantizer(S, sfc, G, frame_type)
    return aac_i_tns(Xq, frame_type, tns_coeffs)


def aac_decoder_3(
    aac_seq_3: AACSeq3,
    filename_out: Union[str, Path],
    verbose: bool = False,
) -> StereoSignal:
    """
    Level-3 AAC decoder (inverse of aac_coder_3).

    Steps per frame:
    - Huffman decode scalefactors (sfc) using codebook 11
    - Huffman decode MDCT symbols (stream) using stored codebook
    - iQuantizer -> MDCT coefficients after TNS
    - iTNS using stored predictor coefficients
    - IMDCT filterbank -> time domain
    - Overlap-add, remove padding, write WAV

    Parameters
    ----------
    aac_seq_3 : AACSeq3
        Encoded sequence as produced by aac_coder_3.
    filename_out : Union[str, Path]
        Output WAV filename.
    verbose : bool
        If True, print decoding progress (roughly 20 dots) to stdout.

    Returns
    -------
    StereoSignal
        Decoded audio samples (time-domain), stereo, shape (N, 2), dtype float64.

    Raises
    ------
    ValueError
        If aac_seq_3 is empty, or if a decoded channel spectrum has an
        unexpected shape (raised by aac_unpack_seq_channels()).
    """
    filename_out = Path(filename_out)

    hop = 1024
    win = 2048
    K = len(aac_seq_3)
    if K <= 0:
        raise ValueError("aac_seq_3 must contain at least one frame.")

    # Load Huffman LUTs once.
    huff_LUT_list = load_LUT()

    # For K frames the last one starts at (K-1)*hop and spans win samples.
    n_pad = (K - 1) * hop + win
    y_pad = np.zeros((n_pad, 2), dtype=np.float64)

    # Print a dot every ~5% of the frames. Clamp to 1 so that sequences with
    # fewer than 20 frames do not trigger a modulo-by-zero.
    dot_every = max(1, K // 20)

    if verbose:
        print("Decoding ", end="", flush=True)

    for i, fr in enumerate(aac_seq_3):
        frame_type: FrameType = fr["frame_type"]
        win_type: WinType = fr["win_type"]

        NB = _nbands(frame_type)
        # We store G separately, so Huffman stream contains only (NB-1) DPCM differences.
        sfc_len = (NB - 1) * (8 if frame_type == "ESH" else 1)

        # Left channel first, then right, each through the same pipeline.
        X_L = _decode_channel_3(fr["chl"], frame_type, NB, sfc_len, huff_LUT_list)
        X_R = _decode_channel_3(fr["chr"], frame_type, NB, sfc_len, huff_LUT_list)

        # Re-pack to stereo container and inverse filterbank
        frame_f = aac_unpack_seq_channels(frame_type, np.asarray(X_L), np.asarray(X_R))
        frame_t_hat: FrameT = aac_i_filter_bank(frame_f, frame_type, win_type)

        # Overlap-add: consecutive frames overlap by win - hop samples.
        start = i * hop
        y_pad[start:start + win, :] += frame_t_hat

        if verbose and i % dot_every == 0:
            print(".", end="", flush=True)

    y = aac_remove_padding(y_pad, hop=hop)

    if verbose:
        print(" done")

    sf.write(str(filename_out), y, 48000)
    return y