import select
import socket
import threading
import time
from ...common.constants import Constants
from .base.event_source import EventSource
#: Default output port identifier
PORT_OUT = Constants.Defaults.PORT_OUT
[docs]
class UDPReceiver(EventSource):
"""UDP network receiver for capturing remote trigger events.
Provides network-based event capture. Listens on specified IP/port
for UDP packets containing numeric trigger values. Each trigger
outputs the received value, followed immediately by a zero.
"""
#: Source code fingerprint for licensing verification
FINGERPRINT = "3d171a479e053fe6b394b4cdc6b84e90"
#: Default IP address for localhost binding
DEFAULT_IP: str = "127.0.0.1"
#: Default UDP port number for listening
DEFAULT_PORT: int = 1000
#: Hold time in milliseconds before sending reset trigger
HOLD_TIME_MS: int = 10
[docs]
class Configuration(EventSource.Configuration):
"""Configuration class for UDP receiver network parameters."""
[docs]
class Keys(EventSource.Configuration.Keys):
"""Configuration key constants for the UDP receiver."""
#: Configuration key for IP address binding
IP: str = "ip"
#: Configuration key for UDP port number
PORT: str = "port"
[docs]
def __init__(
self, ip: str = DEFAULT_IP, port: int = DEFAULT_PORT, **kwargs
):
"""Initialize UDP receiver.
Args:
ip: IP address to bind socket to. Use "0.0.0.0" for all interfaces
or "127.0.0.1" for localhost. Defaults to localhost.
port: UDP port number to listen on. Defaults to 1000.
**kwargs: Additional parameters for EventSource base class.
"""
# Initialize parent EventSource with network configuration
super().__init__(ip=ip, port=port, **kwargs)
#: Flag indicating if UDP listener thread is running
self._udp_thread_running = False
#: UDP socket instance for message reception
self._socket = None
#: Background thread for UDP message listening
self._udp_thread = None
#: Start time for timing analysis
self._t_start = None
def _udp_listener(self):
"""Background thread function for UDP message reception.
Creates UDP socket and continuously listens for incoming messages.
Parses numeric string data and triggers events. Uses select() for
non-blocking operation to allow clean shutdown.
"""
# Create UDP socket for receiving messages
self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._socket.setblocking(False) # Non-blocking for select()
# Get network configuration
ip_key = self.Configuration.Keys.IP
port_key = self.Configuration.Keys.PORT
ip = self.config[ip_key]
port = self.config[port_key]
# Bind socket to specified address and port
self._socket.bind((ip, port))
# Main reception loop
while self._udp_thread_running:
# Use select with timeout for non-blocking check
ready, _, _ = select.select([self._socket], [], [], 0.01)
if not ready:
continue
if not self._udp_thread_running:
break
try:
# Receive UDP packet (max 1024 bytes)
data, _ = self._socket.recvfrom(1024)
message = data.decode().strip()
# Parse message - only numeric strings are supported
if message.isdigit():
# Valid integer format: "123"
value = int(message)
else:
# Non-numeric message, skip silently
raise ValueError("Unsupported message format")
# Trigger events: first the value, then 0 for reset
self.trigger(value)
self.trigger(0)
except Exception:
# Handle any decoding or parsing errors silently
continue
[docs]
def start(self):
"""Start UDP receiver and begin listening for messages.
Initializes background UDP listener thread and starts monitoring
for incoming trigger messages.
"""
# Start parent EventSource
super().start()
# Start UDP listener thread if not already running
if not self._udp_thread_running:
self._udp_thread_running = True
self._udp_thread = threading.Thread(
target=self._udp_listener, daemon=True
)
self._udp_thread.start()
# Record start time for potential timing analysis
self._t_start = time.perf_counter()
[docs]
def stop(self):
"""Stop UDP receiver and cleanup network resources.
Stops background listener thread, closes UDP socket, and waits for
clean thread termination.
"""
# Stop parent EventSource first
super().stop()
# Stop UDP listener thread and cleanup resources
if self._udp_thread_running:
self._udp_thread_running = False
# Close socket to release network resources
if self._socket:
self._socket.close()
self._socket = None
# Wait for thread to complete
if self._udp_thread:
self._udp_thread.join()
self._udp_thread = None