Source code for gpype.frontend.widgets.time_series_scope

import threading
import time

import ioiocore as ioc
import numpy as np
import pyqtgraph as pg
from PySide6 import QtWidgets
from PySide6.QtCore import QEvent, QObject, Qt
from PySide6.QtGui import QColor, QFont, QPalette

from ...backend.core.i_port import IPort
from ...common.constants import Constants
from .base.scope import Scope

#: Default input port identifier for time series data
PORT_IN = ioc.Constants.Defaults.PORT_IN


[docs] class TimeSeriesScope(Scope): """Real-time oscilloscope widget for multi-channel time series. Displays continuous time-series data from BCI pipelines with configurable time windows, amplitude scaling, channel hiding, and event markers. """ #: Default display window duration in seconds DEFAULT_TIME_WINDOW: int = 10 #: Default amplitude scale limit in microvolts DEFAULT_AMPLITUDE_LIMIT: float = 50
[docs] class Markers(dict): """Container for event marker configuration. Stores marker properties including color, label, channel association, and trigger value for event-based visualization. Args: color: Hex color code or color name for marker visualization. label: Text label for marker identification. channel: Channel index where marker should be displayed. value: Trigger value that activates the marker. """
[docs] def __init__(self, color: str, label: str, channel: int, value: int): """Initialize marker configuration with display properties.""" super().__init__() self["color"] = color self["label"] = label self["channel"] = channel self["value"] = value
[docs] class Configuration(Scope.Configuration): """Configuration keys for TimeSeriesScope widget settings. Extends the base Scope configuration with time series specific parameters for display window, amplitude scaling, event markers, and channel visibility management. """
[docs] class Keys(Scope.Configuration.Keys): """Required configuration parameter keys.""" #: Configuration key for display duration in seconds TIME_WINDOW = "time_window" #: Configuration key for Y-axis scale in microvolts AMPLITUDE_LIMIT = "amplitude_limit"
[docs] class KeysOptional: """Optional configuration parameter keys.""" #: Configuration key for event marker configurations MARKERS = "markers" #: Configuration key for channels to hide from display HIDDEN_CHANNELS = "hidden_channels"
[docs] class KeyPressFilter(QObject): """Event filter for keyboard shortcuts in the time series scope. Captures keyboard events to provide interactive functionality. Currently handles Alt+R for performance monitoring toggle. Args: callback: Function to call when target key combination is pressed. """
[docs] def __init__(self, callback): """Initialize key press filter with callback function.""" super().__init__() self.callback = callback
[docs] def eventFilter(self, obj, event): """Filter keyboard events and trigger callbacks for shortcuts. Args: obj: Qt object that received the event. event: Qt event to process. Returns: bool: False to allow event propagation, True to consume event. """ if event.type() == QEvent.KeyPress: if event.key() == Qt.Key_R: if event.modifiers() & Qt.AltModifier: self.callback() return False
[docs] def __init__( self, time_window: int = None, amplitude_limit: float = None, markers: list = None, hidden_channels: list = None, name: str = None, **kwargs, ): """Initialize the time series oscilloscope widget. Args: time_window: Display window duration in seconds (1-120). Uses DEFAULT_TIME_WINDOW if None. amplitude_limit: Y-axis scale limit in microvolts (1-5000). Uses DEFAULT_AMPLITUDE_LIMIT if None. markers: List of marker configurations for event visualization. Empty list if None. hidden_channels: List of channel indices to hide from display. Empty list if None. **kwargs: Additional arguments passed to parent Scope class. Raises: ValueError: If time_window is outside valid range (1-120). ValueError: If amplitude_limit is outside reasonable range. """ # Set default values if not provided if time_window is None: time_window = self.DEFAULT_TIME_WINDOW if amplitude_limit is None: amplitude_limit = self.DEFAULT_AMPLITUDE_LIMIT # Validate time window parameters if time_window <= 1: raise ValueError("time_window must be longer than 1 second.") if time_window >= 240: raise ValueError("time_window must be shorter than 240 seconds.") time_window = round(time_window) # Validate amplitude limit parameters if amplitude_limit > 5e3 or amplitude_limit < 1: raise ValueError("amplitude_limit without reasonable range.") # Initialize marker and hidden channel lists if markers is None: markers = [] if hidden_channels is None: hidden_channels = [] # Configure input ports for data reception ip_key = self.Configuration.Keys.INPUT_PORTS input_ports: list[IPort.Configuration] = kwargs.pop( ip_key, [IPort.Configuration(name=PORT_IN)] ) # Set name if not provided if name is None: name = "Time Series Scope" # Initialize parent Scope class with configuration Scope.__init__( self, input_ports=input_ports, time_window=time_window, amplitude_limit=amplitude_limit, name=name, markers=markers, hidden_channels=hidden_channels, **kwargs, ) # Data buffer management #: Maximum number of displayable data points self._max_points: int = None #: Raw data storage buffer for all channels self._data_buffer: np.ndarray = None #: Processed display data for visible channels only self._display_buffer: np.ndarray = None #: Current position index in circular buffer self._plot_index: int = 0 #: Flag indicating circular buffer overflow status self._buffer_full: bool = False #: Global sample counter for data tracking self._sample_index: int = 0 # Performance monitoring #: Widget initialization timestamp for rate calculations self._start_time = time.time() #: Counter for display update operations self._update_counts = 0 #: Counter for data processing steps self._step_counts = 0 #: Calculated data processing rate in Hz self._step_rate = 0 # Thread synchronization #: Thread lock for safe data buffer access self._lock = threading.Lock() #: Flag indicating new data is available for display self._new_data = False # UI components #: Label widget for displaying performance statistics self._rate_label = None # Theme and appearance setup p = self.widget.palette() #: Foreground color extracted from system theme self._foreground_color = p.color(QPalette.ColorRole.WindowText) #: Background color extracted from system theme self._background_color = p.color(QPalette.ColorRole.Window) # Interactive features #: Flag controlling performance statistics visibility self._show_rates = False #: Vertical cursor line showing current data position self._cursor = None #: Keyboard event filter for interactive shortcuts self._key_filter = self.KeyPressFilter(self._toggle_show_rates) self.widget.installEventFilter(self._key_filter)
def _toggle_show_rates(self): """Toggle visibility of performance monitoring information. Shows or hides performance statistics including frame rates and processing rates. Triggered by the Alt+R keyboard shortcut. """ self._show_rates = not self._show_rates self._rate_label.setVisible(self._show_rates)
[docs] def setup( self, data: dict[str, np.ndarray], port_context_in: dict[str, dict] ) -> dict[str, dict]: """Initialize the widget with data stream parameters and buffers. Args: data: Input data dictionary (not used in setup phase). port_context_in: Context information for input ports containing sampling rate, channel count, and frame size. Returns: dict: Updated port context for downstream components. Raises: ValueError: If required context parameters are missing. """ c = port_context_in[PORT_IN] # Extract and validate required context parameters sampling_rate = c.get(Constants.Keys.SAMPLING_RATE) if sampling_rate is None: raise ValueError("sampling rate must be provided.") channel_count = c.get(Constants.Keys.CHANNEL_COUNT) if channel_count is None: raise ValueError("channel count must be provided.") frame_size = c.get(Constants.Keys.FRAME_SIZE) if frame_size is None: raise ValueError("frame size must be provided.") # Calculate buffer dimensions based on time window and sampling rate time_window = self.config[self.Configuration.Keys.TIME_WINDOW] self._max_points = int(round(time_window * sampling_rate)) self._t_vec = np.arange(0, self._max_points) / sampling_rate # Determine visible channels (exclude hidden ones) hidden_channels = self.config[ self.Configuration.KeysOptional.HIDDEN_CHANNELS ] self._channel_vec = [ i for i in range(channel_count) if i not in hidden_channels ] self._channel_count = len(self._channel_vec) # Store processing parameters #: Number of samples per data frame from pipeline self._frame_size = frame_size #: Data acquisition sampling rate in Hz self._sampling_rate = sampling_rate #: Last displayed second for time axis tick updates self._last_second = None # Allocate data buffers # Raw data buffer holds all channels self._data_buffer = np.zeros((self._max_points, channel_count)) # Display buffer holds only visible channels self._display_buffer = np.zeros( (self._max_points, self._channel_count) ) # Initialize state variables #: Flag indicating new data availability for display self._new_data = False #: Widget initialization timestamp self._start_time = time.time() #: Dictionary storing active event markers self._markers = {} return super().setup(data, port_context_in)
def _update(self): """Update the visual display with new data from the buffer. Called by the Qt timer to refresh the plot with latest data. Handles curve creation, data plotting, performance monitoring, and marker visualization. Only updates when new data is available. """ if not self._new_data: return # Set up UI elements. Note that this has to be done in the main Qt # thread (like this) ylim = (0, self._channel_count) if self._curves is None: # Create curves for each visible channel [self.add_curve() for _ in range(self._channel_count)] amp_lim = self.config[self.Configuration.Keys.AMPLITUDE_LIMIT] yl = f"EEG Amplitudes (-{amp_lim} ... +{amp_lim} µV)" self.set_labels(x_label="Time (s)", y_label=yl) # Configure channel labels on Y-axis ticks = [ ( self._channel_count - i - 0.5, f"CH{self._channel_vec[i] + 1}", ) for i in range(self._channel_count) ] self._plot_item.getAxis("left").setTicks([ticks]) self._plot_item.setYRange(*ylim) # Create cursor for time tracking col = QColor(self._foreground_color) # makes a copy col.setAlpha(128) pen = pg.mkPen(self._pen) pen.setColor(col) self._cursor = pg.PlotCurveItem(pen=pen) self._plot_item.addItem(self._cursor) # Create performance monitoring labels x = self.config[self.Configuration.Keys.TIME_WINDOW] y = ylim[1] font_size = 7 self._rate_label = pg.TextItem( text="", color=QColor(col), anchor=(1, 1) ) self._plot_item.addItem(self._rate_label) self._rate_label.setPos(x, y) self._rate_label.setFont(QFont("Arial", font_size)) # Update data with decimation for performance optimization # Decimation factor N reduces displayed points based on widget width N = np.maximum( 1, int(np.floor(self._max_points / self.widget.width())) ) with self._lock: # Copy decimated data from buffer for visible channels only self._display_buffer[::N, :] = self._data_buffer[ ::N, self._channel_vec ] self._new_data = False sample_idx = self._sample_index - 1 # Plot time cursor to show current data position t_cursor = (sample_idx % self._max_points) / self._sampling_rate self._cursor.setData([t_cursor] * 2, [*ylim], antialias=False) # Update x-axis ticks dynamically cur_second = int( np.floor((sample_idx % self._max_points) / self._sampling_rate) ) if cur_second != self._last_second: time_window = self.config[self.Configuration.Keys.TIME_WINDOW] if sample_idx > self._max_points: ticks = [] for i in range(int(np.floor(time_window)) + 1): tick_val = ( np.mod(i - (cur_second + 1), time_window) + cur_second + 1 ) offset = ( np.floor(sample_idx / self._max_points - 1) * time_window ) tick_val += offset tick_label = f"{tick_val:.0f}" ticks.append((i, tick_label)) else: ticks = [ (i, f"{i:.0f}" if i <= cur_second else "") for i in range(int(np.floor(time_window)) + 1) ] self._plot_item.getAxis("bottom").setTicks([ticks]) self._last_second = cur_second # Plot channel data with amplitude scaling and vertical offset ch_lim_key = TimeSeriesScope.Configuration.Keys.AMPLITUDE_LIMIT ch_lim = self.config[ch_lim_key] for i in range(len(self._channel_vec)): # Vertical offset: each channel gets its own "lane" d = self._channel_count - i - 0.5 self._curves[i].setData( self._t_vec[::N], self._display_buffer[::N, i] / ch_lim / 2 + d, antialias=False, ) # Update x-axis range with small margin tw = self.config[self.Configuration.Keys.TIME_WINDOW] margin = tw * 0.0125 xlim = (-margin, tw + margin) self._plot_item.setXRange(*xlim) # Update event markers: detect state changes in marker channels mk_key = self.Configuration.KeysOptional.MARKERS markers: dict = {} for m in self.config[mk_key]: ch = m["channel"] val = m["value"] # Find rising edges where marker value appears hit = ( np.where( (self._data_buffer[1:, ch] == val) & (self._data_buffer[:-1, ch] != val) )[0] + 1 ) for h in hit: id = hash(tuple([h, ch, val])) markers[id] = {"index": h, "curve": None, **m} # Add new markers to plot for k in { k: markers[k] for k in markers.keys() if k not in self._markers.keys() }: m = markers[k] idx = m["index"] # Create text label for marker text = pg.TextItem( text=m["label"], anchor=(0.5, 1), color=pg.mkColor(m["color"]) ) self._plot_item.addItem(text) text.setPos(self._t_vec[idx], self._channel_count) # Create vertical line for marker curve = self._plot_item.plot(pen=pg.mkPen(pg.mkColor(m["color"]))) curve.setData(self._t_vec[[idx, idx]], np.array([*ylim])) markers[k]["curve"] = curve markers[k]["text"] = text self._markers[k] = markers[k] # Remove outdated markers from plot for k in { k: self._markers[k] for k in self._markers.keys() if k not in markers.keys() }: m = self._markers[k] self._plot_item.removeItem(m["curve"]) self._plot_item.removeItem(m["text"]) del self._markers[k] # Update performance monitoring display self._update_counts += 1 if self._show_rates: update_rate = ( self._update_counts / sample_idx * self._sampling_rate ) self._rate_label.setText( f"data rate: {self._step_rate:.1f} Hz, " f"refresh rate: {update_rate:.1f} Hz" ) # Mark all layers dirty to force redraw scene = self._plot_item.scene() scene.invalidate(scene.sceneRect(), QtWidgets.QGraphicsScene.AllLayers)
[docs] def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]: """Process incoming data frames and store them in the circular buffer. Called by the pipeline for each new data frame. Handles performance monitoring, circular buffer management, and thread-safe data storage. Args: data: Dictionary containing input data arrays from connected ports. Expected to have PORT_IN key with shape (frame_size, channels). Returns: dict: Empty dictionary (this is a sink node with no outputs). """ self._step_counts += 1 # Calculate data processing rate for performance monitoring t_el = time.time() - self._start_time + 1e-10 self._step_rate = self._step_counts / t_el # Calculate circular buffer write indices write_idx = self._sample_index + np.arange(self._frame_size) write_idx %= self._max_points # Thread-safe data storage in circular buffer with self._lock: self._data_buffer[write_idx, :] = data[PORT_IN] self._sample_index = self._sample_index + self._frame_size # Signal that new data is available for display update self._new_data = True