Source code for gpype.backend.timing.decimator

from __future__ import annotations

import numpy as np

from ...common.constants import Constants
from ..core.io_node import IONode

#: Default input port identifier
PORT_IN = Constants.Defaults.PORT_IN
#: Default output port identifier
PORT_OUT = Constants.Defaults.PORT_OUT


[docs] class Decimator(IONode): """Decimator node for downsampling data streams. Reduces data rate by outputting only every Nth sample based on decimation factor. Adjusts sampling rate and frame size in output context accordingly. """
[docs] class Configuration(IONode.Configuration):
[docs] class Keys(IONode.Configuration.Keys): pass
[docs] def __init__(self, decimation_factor: int = 1, **kwargs): """Initialize decimator with decimation factor. Args: decimation_factor: Factor by which to reduce data rate. Must be positive integer. Value of 1 means no decimation. **kwargs: Additional arguments for parent IONode. """ super().__init__(decimation_factor=decimation_factor, **kwargs)
[docs] def setup( self, data: dict[str, np.ndarray], port_context_in: dict[str, dict] ) -> dict[str, dict]: """Setup output context with adjusted sampling rate and frame size. Args: data: Input data arrays. port_context_in: Input port contexts containing frame size and sampling rate information. Returns: Output port contexts with decimated sampling rate. Raises: ValueError: If frame_size is not provided or doesn't match decimation_factor. """ port_context_out = super().setup(data, port_context_in) frame_size = port_context_out[PORT_OUT][Constants.Keys.FRAME_SIZE] if frame_size is None: raise ValueError("frame_size must be provided in context.") M = self.config[self.Configuration.Keys.DECIMATION_FACTOR] if frame_size != 1 and frame_size != M: raise ValueError( f"frame_size {frame_size} must match " f"decimation_factor {M}" ) port_context_out[PORT_OUT][Constants.Keys.FRAME_SIZE] = 1 sr_key = Constants.Keys.SAMPLING_RATE sampling_rate_out = port_context_in[PORT_IN][sr_key] / M port_context_out[PORT_OUT][sr_key] = sampling_rate_out return port_context_out
[docs] def step(self, data: dict): """Process one step of decimation. Args: data: Input data dictionary containing data to be decimated. Returns: Dictionary with last sample of input data if decimation step, None otherwise. """ if self.is_decimation_step(): return {PORT_OUT: data[PORT_IN][-1:, :]} else: return None