Source code for gpype.backend.sinks.udp_sender

from __future__ import annotations

import socket
from typing import Optional

import numpy as np

from ...common.constants import Constants
from ..core.i_node import INode


[docs] class UDPSender(INode): """UDP sink node for real-time data transmission. Transmits data as float64 numpy arrays via UDP packets to a configurable target address. Each step() sends one packet with direct transmission. """ #: Default target IP address (localhost) DEFAULT_IP = "127.0.0.1" #: Default target UDP port number DEFAULT_PORT = 56000
[docs] class Configuration(INode.Configuration): """Configuration class for UDPSender parameters."""
[docs] class Keys(INode.Configuration.Keys): """Configuration keys for UDP sender settings.""" #: IP address configuration key IP = "ip" #: Port number configuration key PORT = "port"
[docs] def __init__( self, ip: Optional[str] = None, port: Optional[int] = None, **kwargs ): """Initialize UDP sender with target address and port. Args: ip: Target IP address. Defaults to localhost if None. port: Target port number. Defaults to DEFAULT_PORT if None. **kwargs: Additional arguments for parent INode. """ # Use default values if not specified if ip is None: ip = UDPSender.DEFAULT_IP if port is None: port = UDPSender.DEFAULT_PORT # Initialize parent INode with configuration INode.__init__(self, ip=ip, port=port, **kwargs) # Initialize networking components self._socket = None # UDP socket (created on start) self._target = (ip, port) # Target address tuple
[docs] def start(self): """Start UDP sender and initialize socket connection. Creates UDP socket and configures target address from configuration. Raises: OSError: If socket creation fails. """ # Create UDP socket for data transmission self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Update target address from current configuration self._target = ( self.config[self.Configuration.Keys.IP], self.config[self.Configuration.Keys.PORT], ) # Call parent start method super().start()
[docs] def stop(self): """Stop UDP sender and clean up socket resources.""" # Close socket and clean up resources if self._socket: self._socket.close() self._socket = None # Call parent stop method super().stop()
[docs] def setup( self, data: dict[str, np.ndarray], port_context_in: dict[str, dict] ) -> dict[str, dict]: """Setup method for pipeline initialization. Args: data: Input data arrays from connected ports. port_context_in: Context information from input ports. Returns: Empty dictionary (sink node has no output context). """ # No setup required for UDP transmission return {}
[docs] def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Process and transmit data via UDP. Converts data to float64, serializes to bytes, and sends via UDP. Args: data: Input data arrays. Uses default input port. Returns: Empty dictionary (sink node has no output). """ # Get data from default input port d = data[Constants.Defaults.PORT_IN] # Transmit data if socket is available if self._socket: # Convert to float64 and serialize to bytes for transmission # Using copy=False to avoid unnecessary allocation if already # float64 payload = d.astype(np.float64, copy=False).tobytes() # Send UDP packet to target address self._socket.sendto(payload, self._target) # No output data for sink nodes return {}