Source code for gpype.backend.sources.generator

from __future__ import annotations

import ioiocore as ioc
import numpy as np

from ...common.constants import Constants
from ..core.o_port import OPort
from .base.fixed_rate_source import FixedRateSource

#: Port identifier for signal output
OUT_PORT = ioc.Constants.Defaults.PORT_OUT


[docs] class Generator(FixedRateSource): """Signal generator source for creating synthetic test signals. Generates configurable test signals with optional noise for testing pipelines. Supports multiple waveforms (sine, rectangular, pulse) with multi-channel output. """ #: Sinusoidal waveform signal shape SHAPE_SINUSOID = "sine" #: Square wave signal shape SHAPE_RECTANGULAR = "rect" #: Brief pulses signal shape SHAPE_PULSE = "pulse" #: Source code fingerprint for integrity verification FINGERPRINT = "3baf246bbbfcfe671b205dffcf10b52e" #: Default sampling rate in Hz DEFAULT_SAMPLING_RATE = 250.0 #: Default number of channels DEFAULT_CHANNEL_COUNT = 8 #: Default signal frequency in Hz DEFAULT_SIGNAL_FREQUENCY = 10.0 #: Default signal shape DEFAULT_SIGNAL_SHAPE = SHAPE_SINUSOID #: Default signal amplitude DEFAULT_SIGNAL_AMPLITUDE = 0.0 #: Default noise amplitude DEFAULT_NOISE_AMPLITUDE = 0.0
[docs] class Configuration(FixedRateSource.Configuration): """Configuration class for Generator signal parameters."""
[docs] class Keys(FixedRateSource.Configuration.Keys): """Configuration key constants for the Generator.""" #: Signal frequency configuration key SIGNAL_FREQUENCY = "signal_frequency" #: Signal shape configuration key SIGNAL_SHAPE = "signal_shape" #: Signal amplitude configuration key SIGNAL_AMPLITUDE = "signal_amplitude" #: Noise amplitude configuration key NOISE_AMPLITUDE = "noise_amplitude"
[docs] def __init__( self, sampling_rate: float = None, channel_count: int = None, frame_size: int = None, signal_frequency: float = None, signal_shape: str = None, signal_amplitude: float = 0.0, noise_amplitude: float = 0.0, **kwargs, ): """Initialize signal generator. Args: sampling_rate: Sampling frequency in Hz. channel_count: Number of output channels. All get same signals. frame_size: Samples per output frame. signal_frequency: Signal frequency in Hz. Defaults to 10.0. signal_shape: Waveform shape (sine, rect, pulse). Defaults to sine. signal_amplitude: Peak amplitude of signal component. noise_amplitude: Standard deviation of Gaussian noise. **kwargs: Additional parameters for FixedRateSource. Raises: ValueError: If signal_frequency or noise_amplitude is negative, or signal_shape is unsupported. """ # Set default values and validate parameters if sampling_rate is None: sampling_rate = self.DEFAULT_SAMPLING_RATE if channel_count is None: channel_count = self.DEFAULT_CHANNEL_COUNT if signal_frequency is None: signal_frequency = self.DEFAULT_SIGNAL_FREQUENCY if signal_frequency < 0: raise ValueError("signal_frequency must be positive.") if signal_shape is None: signal_shape = self.DEFAULT_SIGNAL_SHAPE if signal_amplitude is None: signal_amplitude = self.DEFAULT_SIGNAL_AMPLITUDE if noise_amplitude is None: noise_amplitude = self.DEFAULT_NOISE_AMPLITUDE if noise_amplitude < 0: raise ValueError("noise_amplitude must be positive.") frame_size = kwargs.pop( Generator.Configuration.Keys.FRAME_SIZE, frame_size ) decimation_factor = frame_size decimation_factor = kwargs.pop( Generator.Configuration.Keys.DECIMATION_FACTOR, decimation_factor ) # Configure output ports output_ports = kwargs.pop( Generator.Configuration.Keys.OUTPUT_PORTS, [OPort.Configuration()] ) # Initialize parent FixedRateSource with all parameters FixedRateSource.__init__( self, sampling_rate=sampling_rate, channel_count=channel_count, frame_size=frame_size, decimation_factor=decimation_factor, signal_frequency=signal_frequency, signal_amplitude=signal_amplitude, signal_shape=signal_shape, noise_amplitude=noise_amplitude, output_ports=output_ports, **kwargs, ) # Initialize time tracking for continuous signal generation self._time = 0.0 # Initialize random number generator for noise generation self._rng = np.random.default_rng()
[docs] def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Generate one frame of synthetic signal data. Creates a frame containing the configured waveform plus optional noise. Maintains phase continuity across frames through time tracking. Args: data: Input data dictionary (unused for signal generation). Returns: Output data dictionary with generated signal frame of shape (frame_size, channel_count), or None if not a decimation step. """ # Check if this is a decimation step (frame generation timing) if not self.is_decimation_step(): return None # Get configuration parameters config = self.config frame_size = config[self.Configuration.Keys.FRAME_SIZE][0] ch_count = config[self.Configuration.Keys.CHANNEL_COUNT][0] # Initialize output frame with zeros output = np.zeros((frame_size, ch_count), dtype=Constants.DATA_TYPE) # Create time vector for this frame dt = 1.0 / config[self.Configuration.Keys.SAMPLING_RATE] t = np.linspace( self._time, self._time + (frame_size - 1) * dt, frame_size ) # Generate signal component if amplitude > 0 freq = config[self.Configuration.Keys.SIGNAL_FREQUENCY] amp = config[self.Configuration.Keys.SIGNAL_AMPLITUDE] shape = config[self.Configuration.Keys.SIGNAL_SHAPE] if freq and amp > 0.0: # Generate waveform based on selected shape if shape == self.SHAPE_SINUSOID: # Smooth sinusoidal waveform wave = amp * np.sin(2 * np.pi * freq * t) elif shape == self.SHAPE_RECTANGULAR: # Square wave (sign of sine function) wave = amp * np.sign(np.sin(2 * np.pi * freq * t)) elif shape == self.SHAPE_PULSE: # Brief pulses at specified frequency period = 1.0 / freq wave = np.zeros_like(t) for i, ti in enumerate(t): # Generate pulse at start of each period if (ti % period) < dt: wave[i] = amp else: raise ValueError(f"Unsupported signal shape: {shape}") # Broadcast signal to all channels output += wave[:, np.newaxis] # Update internal time for next frame (maintains phase continuity) self._time += frame_size * dt # Add noise component if amplitude > 0 noise_amp = config[self.Configuration.Keys.NOISE_AMPLITUDE] if noise_amp > 0.0: # Generate Gaussian noise for all channels noise = self._rng.standard_normal(size=output.shape) * noise_amp output += noise.astype(Constants.DATA_TYPE) return {OUT_PORT: output}