Fixed-Rate Source

class gpype.backend.sources.base.fixed_rate_source.FixedRateSource[source]

Bases: Source

Fixed-rate source for continuous data generation at sampling rate.

Generates data at fixed sampling rate using background thread with precise timing control and drift compensation.

class Configuration[source]

Bases: Configuration

Configuration class for FixedRateSource parameters.

class Keys[source]

Bases: Keys

Configuration keys for fixed-rate source settings.

SAMPLING_RATE = 'sampling_rate'

Configuration key for sampling rate in Hz

__init__(sampling_rate, **kwargs)[source]

Initialize configuration with sampling rate validation.

Parameters:
  • sampling_rate (float) – Sampling rate in Hz. Must be positive.

  • **kwargs – Additional configuration parameters.

Raises:

ValueError – If sampling_rate is not positive.

__init__(sampling_rate, output_ports=None, **kwargs)[source]

Initialize fixed-rate source with sampling configuration.

Parameters:
  • sampling_rate (float) – Sampling rate in Hz for data generation.

  • output_ports (Optional[list[Configuration]]) – List of output port configurations. Defaults to default port configuration if None.

  • **kwargs – Additional arguments for parent Source class.

start()[source]

Start fixed-rate source and begin data generation.

Initializes parent source and starts background thread for continuous data generation at specified sampling rate.

stop()[source]

Stop fixed-rate source and terminate data generation.

Signals background thread to stop and waits for completion.

setup(data, port_context_in)[source]

Setup output port contexts with sampling rate information.

Parameters:
  • data (dict[str, ndarray]) – Input data arrays (empty for source nodes).

  • port_context_in (dict[str, dict]) – Input port contexts (empty for source nodes).

Return type:

dict[str, dict]

Returns:

Dictionary of output port contexts with sampling_rate information.