Source code for gpype.backend.timing.delay

from __future__ import annotations

from collections import deque

import numpy as np

from ...common.constants import Constants
from ..core.io_node import IONode

#: Default input port identifier
PORT_IN = Constants.Defaults.PORT_IN
#: Default output port identifier
PORT_OUT = Constants.Defaults.PORT_OUT


[docs] class Delay(IONode): """Introduces configurable N-sample delay to input signal. Uses efficient deque-based buffer. Output is zero-initialized until buffer is filled with sufficient samples. """
[docs] class Configuration(IONode.Configuration):
[docs] class Keys(IONode.Configuration.Keys): #: Configuration key for number of delay samples NUM_SAMPLES = "num_samples"
[docs] def __init__(self, num_samples: int, **kwargs): """Initialize delay with specified number of samples. Args: num_samples: Number of samples to delay signal. Must be non-negative. **kwargs: Additional arguments for parent IONode. Raises: ValueError: If num_samples is negative. """ if num_samples < 0: raise ValueError("Number of taps must be non-negative.") super().__init__(num_samples=num_samples, **kwargs) #: Circular buffer storing delayed samples self._buffer: deque[np.ndarray] = None #: Number of samples to delay (same as num_samples) self._taps: int = num_samples #: Zero-filled frame for initialization period self._zero_frame: np.ndarray = None
[docs] def setup( self, data: dict[str, np.ndarray], port_context_in: dict[str, dict] ) -> dict[str, dict]: """Setup delay buffer and zero frame. Args: data: Input data arrays. port_context_in: Input port contexts containing channel count. Returns: Output port contexts from parent setup. Raises: ValueError: If channel count is not provided in context. """ md = port_context_in[PORT_IN] channel_count = md.get(Constants.Keys.CHANNEL_COUNT) if channel_count is None: raise ValueError("Channel count must be provided in context.") self._buffer = deque(maxlen=self._taps) self._zero_frame = np.zeros((1, channel_count), dtype=np.float32) return super().setup(data, port_context_in)
[docs] def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Process one step of delay. Args: data: Input data dictionary containing signal to delay. Returns: Dictionary with delayed signal or zero frame if buffer not full. """ data_in = data[PORT_IN] if self._taps == 0: return {PORT_OUT: data_in} # Append current sample self._buffer.append(data_in) # If not enough data collected yet, return zero if len(self._buffer) < self._taps: return {PORT_OUT: self._zero_frame} # Return delayed output return {PORT_OUT: self._buffer[0]}