from __future__ import annotations
import queue
import threading
import time
from typing import Optional
import numpy as np
from ....common.constants import Constants
from ...core.o_port import OPort
from .source import Source
# Convenience constant for default output port name
PORT_OUT = Constants.Defaults.PORT_OUT
[docs]
class EventSource(Source):
"""Event-driven source for asynchronous data generation.
Generates events in response to external triggers.
"""
[docs]
def __init__(self, **kwargs):
"""Initialize event source with asynchronous output configuration.
Args:
**kwargs: Additional arguments for parent Source class including
channel_count, frame_size, and other configuration parameters.
"""
# Extract output_ports from kwargs with default async configuration
op_key = self.Configuration.Keys.OUTPUT_PORTS
output_ports: list[OPort.Configuration] = kwargs.pop(
op_key, [OPort.Configuration(timing=Constants.Timing.ASYNC)]
)
# Initialize parent Source with configuration
Source.__init__(self, output_ports=output_ports, **kwargs)
# Initialize delay thread components if delay is configured
if self.source_delay > 0:
self._delay_thread_queue: Optional[queue.Queue] = None
self._delay_thread: Optional[threading.Thread] = None
self._delay_thread_running: Optional[bool] = None
[docs]
def start(self):
"""Start event source."""
# Call parent start method first
Source.start(self)
# Initialize delay processing thread if needed
if self.source_delay > 0:
self._delay_thread_queue = queue.Queue()
self._delay_thread = threading.Thread(
target=self._timer_loop, daemon=True
)
self._delay_thread_running = True
self._delay_thread.start()
# Trigger initial cycle with empty data
op_key = self.Configuration.Keys.OUTPUT_PORTS
cc_key = self.Configuration.Keys.CHANNEL_COUNT
name_key = OPort.Configuration.Keys.NAME
data = {}
for op, cc in zip(self.config[op_key], self.config[cc_key]):
# Create zero-filled array with proper data type
data[op[name_key]] = None
self.cycle(data)
[docs]
def stop(self):
"""Stop event source."""
# Signal delay thread to stop
if hasattr(self, "_delay_thread_running"):
self._delay_thread_running = False
# Call parent stop method
Source.stop(self)
[docs]
def trigger(self, value, port_name=PORT_OUT):
"""Trigger an event with the specified value.
Args:
port_name: Name of the output port to trigger.
value: Event value to be transmitted. Converted to appropriate
data type and formatted as single-sample array.
"""
# Create data array with the event value
data = {}
if not isinstance(port_name, list):
port_name = [PORT_OUT]
value = [value]
for i in range(len(port_name)):
pn = port_name[i]
pv = value[i]
data[pn] = np.array([[pv]], dtype=Constants.DATA_TYPE)
if self.source_delay > 0:
# Queue event with timestamp for delayed processing
timestamp = time.monotonic()
self._delay_thread_queue.put((timestamp, data))
else:
# Process event immediately
self.cycle(data) # Trigger node cycle
def _timer_loop(self):
"""Background thread loop for delayed event processing.
Monitors delay queue for events that have reached their delay time
and processes them by triggering pipeline cycles.
"""
while self._delay_thread_running:
try:
# Check the oldest queued event without removing it
timestamp, data = self._delay_thread_queue.queue[0]
now = time.monotonic()
if now - timestamp >= self.source_delay:
# Delay period has elapsed, process the event
_, data = self._delay_thread_queue.get()
self.cycle(data) # Trigger node cycle
else:
# Delay period not yet elapsed, wait briefly
time.sleep(0.001)
except IndexError:
# Queue is empty, wait briefly before checking again
time.sleep(0.001)
[docs]
def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
"""Return current event data if available.
Args:
data: Input data dictionary (unused for source nodes).
Returns:
Dictionary containing event data if event is active, empty dict
otherwise.
"""
return data