Source code for gpype.backend.sources.hybrid_black

from __future__ import annotations

import os
import sys
import threading
import time
from typing import Optional

import numpy as np

from ...common.constants import Constants
from ..core.o_port import OPort
from .base.amplifier_source import AmplifierSource

#: Default output port identifier, taken from the shared project constants.
#: Used in this module as the key of the dict returned by
#: ``HybridBlack.step()``.
PORT_OUT = Constants.Defaults.PORT_OUT
#: Default input port identifier, taken from the shared project constants.
PORT_IN = Constants.Defaults.PORT_IN


def _get_unicorn_lib_path() -> str:
    """Get the path to the Unicorn Python library.

    The UnicornPy module requires the Unicorn.dll to be accessible.
    This function returns the path to the Lib folder in the user's
    Documents folder where the Unicorn Suite is typically installed.

    The user's home directory is read from the ``USERPROFILE`` environment
    variable. If that variable is unset or empty, the result of
    ``os.path.expanduser("~")`` is used instead, so this helper never
    raises ``KeyError``. The returned path is not checked for existence.

    Returns:
        Path to the Unicorn Python Lib directory.
    """
    # USERPROFILE is the Windows home directory; fall back gracefully when
    # it is missing instead of failing with an opaque KeyError.
    home_dir = os.environ.get("USERPROFILE") or os.path.expanduser("~")
    return os.path.join(
        home_dir,
        "Documents",
        "gtec",
        "Unicorn Suite",
        "Hybrid Black",
        "Unicorn Python",
        "Lib",
    )


def _ensure_unicorn_path() -> None:
    """Ensure the Unicorn Python library path is in sys.path and PATH.

    Adds the Unicorn Lib directory to sys.path for module import and
    to the PATH environment variable for DLL loading. Calling this
    function repeatedly does not add duplicate entries.
    """
    lib_path = _get_unicorn_lib_path()

    # Add to sys.path if not already present
    if lib_path not in sys.path:
        sys.path.insert(0, lib_path)

    # Add to PATH for DLL loading. Compare against the individual PATH
    # entries: a substring test on the whole PATH string would wrongly
    # treat a longer entry that merely contains lib_path as a match.
    current_path = os.environ.get("PATH", "")
    wanted = os.path.normcase(lib_path)
    present = any(
        os.path.normcase(entry) == wanted
        for entry in current_path.split(os.pathsep)
    )
    if not present:
        # Avoid a trailing separator (an empty PATH entry) when PATH
        # was previously empty.
        if current_path:
            os.environ["PATH"] = lib_path + os.pathsep + current_path
        else:
            os.environ["PATH"] = lib_path


class HybridBlack(AmplifierSource):
    """g.tec Unicorn Hybrid Black amplifier source for EEG acquisition.

    Interface to g.tec Unicorn Hybrid Black wireless EEG amplifier using
    Bluetooth. Supports 8-channel EEG acquisition at 250 Hz, plus optional
    accelerometer, gyroscope, battery, counter, and validation channels.
    """

    #: Source code fingerprint for licensing verification
    # NOTE(review): if this value is derived from the file contents it may
    # need to be regenerated after editing this module - confirm.
    FINGERPRINT = "f83f85021ea989e78bf8dd50fbbdb796"

    #: Fixed sampling rate for Unicorn Hybrid Black amplifier in Hz
    SAMPLING_RATE = 250

    #: Number of EEG channels
    NUM_EEG_CHANNELS = 8

    #: Number of accelerometer channels (X, Y, Z)
    NUM_ACCEL_CHANNELS = 3

    #: Number of gyroscope channels (X, Y, Z)
    NUM_GYRO_CHANNELS = 3

    #: Total number of acquired channels (EEG + Accel + Gyro + Battery +
    #: Counter + Validation)
    TOTAL_ACQUIRED_CHANNELS = 17

    #: Hardware delay compensation in milliseconds (Bluetooth latency)
    DEVICE_DELAY_MS = 40

    #: Maximum allowed consecutive buffer underruns before warning
    NUM_UNDERRUNS_ALLOWED = 5

    #: Wait interval when behind (GetData < 1ms), in seconds
    WAIT_BEHIND_S = 0.003

    #: Wait interval when on time (GetData >= 1ms), in seconds
    WAIT_ON_TIME_S = 0.0039

    #: Threshold for determining if we're behind (GetData blocking time)
    BEHIND_THRESHOLD_S = 0.001

    class Configuration(AmplifierSource.Configuration):
        """Configuration class for Unicorn Hybrid Black specific parameters."""

        class Keys(AmplifierSource.Configuration.Keys):
            """Configuration keys for Unicorn Hybrid Black settings."""

            INCLUDE_ACCEL = "include_accel"
            INCLUDE_GYRO = "include_gyro"
            INCLUDE_AUX = "include_aux"
            TEST_SIGNAL = "test_signal"

    def __init__(
        self,
        serial: Optional[str] = None,
        channel_count: Optional[int] = None,
        frame_size: Optional[int] = None,
        include_accel: Optional[bool] = None,
        include_gyro: Optional[bool] = None,
        include_aux: Optional[bool] = None,
        test_signal: Optional[bool] = None,
        **kwargs,
    ):
        """Initialize Unicorn Hybrid Black amplifier source.

        No device connection is opened here; that happens in ``start()``.

        Args:
            serial: Serial number of target device. Uses first discovered
                if None.
            channel_count: Number of EEG channels (1-8). Defaults to 8.
                Values outside that range are clamped to it.
            frame_size: Samples per processing frame. Also forwarded to the
                parent as the decimation factor.
            include_accel: Include accelerometer channels (3 channels).
                None is treated as False.
            include_gyro: Include gyroscope channels (3 channels).
                None is treated as False.
            include_aux: Include auxiliary channels (battery, counter,
                validation). None is treated as False.
            test_signal: Enable test signal mode instead of live data.
                None is treated as False.
            **kwargs: Additional arguments for parent AmplifierSource.

        Raises:
            NotImplementedError: If not running on Windows.
        """
        # Platform check - Hybrid Black is only supported on Windows
        if sys.platform != "win32":
            raise NotImplementedError(
                "HybridBlack amplifier is only supported on Windows."
            )

        # Ensure UnicornPy is importable
        _ensure_unicorn_path()

        # Validate and set channel count (1-8 EEG channels supported)
        if channel_count is None:
            channel_count = self.NUM_EEG_CHANNELS
        channel_count = max(1, min(channel_count, self.NUM_EEG_CHANNELS))

        # Set default values for optional parameters
        if include_accel is None:
            include_accel = False
        if include_gyro is None:
            include_gyro = False
        if include_aux is None:
            include_aux = False
        if test_signal is None:
            test_signal = False

        # Calculate total output channels based on configuration
        total_channels = channel_count
        if include_accel:
            total_channels += self.NUM_ACCEL_CHANNELS
        if include_gyro:
            total_channels += self.NUM_GYRO_CHANNELS
        if include_aux:
            total_channels += 3  # Battery, Counter, Validation

        # Configure output ports
        output_ports = [OPort.Configuration()]

        # Initialize parent amplifier source with Hybrid Black specifications
        super().__init__(
            channel_count=total_channels,
            sampling_rate=self.SAMPLING_RATE,
            frame_size=frame_size,
            decimation_factor=frame_size,
            include_accel=include_accel,
            include_gyro=include_gyro,
            include_aux=include_aux,
            test_signal=test_signal,
            output_ports=output_ports,
            **kwargs,
        )

        # Read the frame size back from the parent's configuration rather
        # than trusting the (possibly None) constructor argument.
        self._frame_size = self.config[self.Configuration.Keys.FRAME_SIZE][0]

        # Store device configuration
        self._target_sn = serial
        self._eeg_channel_count = channel_count
        self._total_channels = total_channels
        self._include_accel = include_accel
        self._include_gyro = include_gyro
        self._include_aux = include_aux
        self._test_signal = test_signal

        # Initialize device connection (will be established in start())
        self._device = None

        # Initialize threading components
        self._running: bool = False
        self._acquisition_thread: Optional[threading.Thread] = None

        # Current frame for passing data from acquisition to step()
        self._current_frame: Optional[np.ndarray] = None

        # Underrun tracking
        self._underrun_counter: int = 0

        # Set source delay for timing synchronization (hardware delay only);
        # converted from milliseconds to seconds.
        self.source_delay = self.DEVICE_DELAY_MS / 1000

    def start(self) -> None:
        """Start Unicorn Hybrid Black amplifier and begin data acquisition.

        Connects to the device (unless already connected) and starts a
        daemon thread running ``_acquisition_function``, which acquires
        data and drives the pipeline via ``cycle()``. The parent's
        ``start()`` is called last.

        Raises:
            RuntimeError: If the UnicornPy library cannot be imported.
            ConnectionError: If no serial number was configured and no
                available device is reported by UnicornPy.
        """
        # Import UnicornPy when actually needed (lazy import)
        try:
            import UnicornPy
        except ImportError as e:
            raise RuntimeError(
                f"UnicornPy library not available: {e}. "
                "Please ensure the Unicorn Suite is installed and the "
                "library path is correct."
            ) from e

        # Initialize current frame holder
        self._current_frame = None
        self._underrun_counter = 0

        # Initialize and connect to Unicorn Hybrid Black amplifier
        if self._device is None:
            # Get available devices if no serial specified
            if self._target_sn is None:
                device_list = UnicornPy.GetAvailableDevices(True)
                # Truthiness covers both None and an empty list; calling
                # len() first would raise TypeError on None.
                if not device_list:
                    raise ConnectionError(
                        "No Unicorn device available. Please pair with a "
                        "Unicorn first."
                    )
                self._target_sn = device_list[0]
                print(f"Using first available device: {self._target_sn}")

            # Connect to the device
            self._device = UnicornPy.Unicorn(self._target_sn)
            print(f"Connected to Unicorn Hybrid Black: {self._target_sn}")

        # Start single acquisition thread (handles both data and timing)
        if not self._running:
            self._running = True
            self._acquisition_thread = threading.Thread(
                target=self._acquisition_function, daemon=True
            )
            self._acquisition_thread.start()

        # Call parent start method
        super().start()

    def setup(
        self, data: dict[str, np.ndarray], port_context_in: dict[str, dict]
    ) -> dict[str, dict]:
        """Setup output port contexts for Unicorn Hybrid Black data streams.

        Delegates entirely to the parent implementation.

        Args:
            data: Input data arrays (empty for source nodes).
            port_context_in: Input port contexts (empty for source nodes).

        Returns:
            Dictionary of output port contexts as produced by the parent.
        """
        return super().setup(data, port_context_in)

    def stop(self):
        """Stop Unicorn Hybrid Black amplifier and clean up resources.

        Signals the acquisition thread to exit and waits up to 10 seconds
        for it, asks the device to stop acquisition (errors are ignored),
        drops the device reference, and finally calls the parent's
        ``stop()``.
        """
        # Stop background thread and wait for completion
        if self._running:
            self._running = False
            acq_thread = self._acquisition_thread
            if acq_thread and acq_thread.is_alive():
                acq_thread.join(timeout=10)

        # Stop amplifier data acquisition
        if self._device is not None:
            try:
                self._device.StopAcquisition()
            except Exception:
                pass  # Device may already be stopped

            # Clean up device connection by dropping our reference so
            # that start() reconnects on the next run.
            self._device = None

        # Call parent stop method
        super().stop()

    def step(
        self, data: dict[str, np.ndarray]
    ) -> Optional[dict[str, np.ndarray]]:
        """Retrieve processed data frames from the amplifier.

        Returns data frames when decimation step is active.

        Args:
            data: Input data dictionary (unused for source nodes).

        Returns:
            Dictionary mapping ``PORT_OUT`` to the current frame (or to a
            zero-filled frame when none is pending), or None if not a
            decimation step.
        """
        if not self.is_decimation_step():
            # Not a decimation step, return None
            return None

        # Return current frame (set by acquisition thread before cycle())
        if self._current_frame is not None:
            out_data = {PORT_OUT: self._current_frame}
            # Consume the frame so it is not emitted twice.
            self._current_frame = None
            return out_data

        # Provide zero-filled frame if no data available
        frame_size = self.config[self.Configuration.Keys.FRAME_SIZE][0]
        cc_key = self.Configuration.Keys.CHANNEL_COUNT
        channel_count = self.config[cc_key][0]
        zero_frame = np.zeros((frame_size, channel_count))
        return {PORT_OUT: zero_frame}

    def _acquisition_function(self):
        """Background thread function for data acquisition and pipeline timing.

        Uses adaptive timing: if GetData is fast (<1ms), we're behind and
        use shorter wait. If GetData blocks (>=1ms), we're on time and use
        normal wait interval.

        Each loop iteration reads one frame from the device, selects the
        configured columns, stores the result in ``_current_frame`` and
        calls ``cycle()``. Exceptions inside the loop are logged as
        warnings (while still running) and the loop continues.
        """
        try:
            import UnicornPy  # noqa: F401
        except ImportError:
            return

        # Get number of acquired channels from device
        num_acquired_channels = self._device.GetNumberOfAcquiredChannels()

        # Acquisition frame size matches output frame size
        acq_frame_length = self._frame_size
        # 4 bytes per sample: the buffer is decoded as float32 below.
        receive_buffer_length = acq_frame_length * num_acquired_channels * 4
        receive_buffer = bytearray(receive_buffer_length)

        # Build column indices for channel selection
        col_indices = list(range(self._eeg_channel_count))
        if self._include_accel:
            start = self.NUM_EEG_CHANNELS
            col_indices.extend(range(start, start + 3))
        if self._include_gyro:
            start = self.NUM_EEG_CHANNELS + self.NUM_ACCEL_CHANNELS
            col_indices.extend(range(start, start + 3))
        if self._include_aux:
            start = self.NUM_EEG_CHANNELS + self.NUM_ACCEL_CHANNELS + 3
            col_indices.extend(range(start, num_acquired_channels))

        # Check if we can use fast contiguous slice (EEG only, all 8 channels)
        use_slice = (
            not self._include_accel
            and not self._include_gyro
            and not self._include_aux
            and self._eeg_channel_count == self.NUM_EEG_CHANNELS
        )

        # Start data acquisition from amplifier
        self._device.StartAcquisition(self._test_signal)

        # Initialize adaptive timing
        wait_interval_s = self.WAIT_ON_TIME_S
        t_last_call = time.perf_counter()

        while self._running:
            try:
                # Calculate expected next call time and sleep if needed
                t_expected_next = t_last_call + wait_interval_s
                t_now = time.perf_counter()
                t_remaining = t_expected_next - t_now
                if t_remaining > 0:
                    time.sleep(t_remaining)

                # GetData call with timing
                t_pre_get = time.perf_counter()
                self._device.GetData(
                    acq_frame_length, receive_buffer, receive_buffer_length
                )
                t_post_get = time.perf_counter()
                t_last_call = t_post_get

                # Adaptive timing: adjust wait based on GetData blocking time
                # NOTE(review): the counter is reset on fast reads and
                # incremented on blocking reads; nesting reconstructed
                # from flattened source - confirm against the original.
                blocking_time_s = t_post_get - t_pre_get
                if blocking_time_s < self.BEHIND_THRESHOLD_S:
                    # Fast return = buffer had data = we're behind
                    wait_interval_s = self.WAIT_BEHIND_S
                    self._underrun_counter = 0
                else:
                    # Slow return = had to wait for data = we're on time
                    wait_interval_s = self.WAIT_ON_TIME_S
                    self._underrun_counter += 1
                    if self._underrun_counter > self.NUM_UNDERRUNS_ALLOWED:
                        self.log(
                            "Buffer underrun - performance may lag.",
                            type=Constants.LogTypes.WARNING,
                        )
                        self._underrun_counter = 0

                # Convert to numpy array (a view onto receive_buffer)
                raw_data = np.frombuffer(
                    receive_buffer,
                    dtype=np.float32,
                    count=num_acquired_channels * acq_frame_length,
                ).reshape(acq_frame_length, num_acquired_channels)

                # Extract selected channels. The copy detaches the frame
                # from receive_buffer, which is overwritten next iteration.
                if use_slice:
                    frame_data = raw_data[:, :self.NUM_EEG_CHANNELS].copy()
                else:
                    frame_data = raw_data[:, col_indices].copy()

                # Set current frame and trigger pipeline cycle immediately
                self._current_frame = frame_data
                self.cycle()

            except Exception as e:
                # Errors raised while shutting down are expected; only
                # report those that happen during normal operation.
                if self._running:
                    self.log(
                        f"Data acquisition error: {e}",
                        type=Constants.LogTypes.WARNING,
                    )

        # Clean up receive buffer
        del receive_buffer

    @staticmethod
    def get_available_devices() -> list[str]:
        """Get list of available Unicorn Hybrid Black devices.

        This is a best-effort query: any failure (UnicornPy missing,
        library path cannot be resolved, discovery error) yields an empty
        list instead of an exception.

        Returns:
            List of device serial numbers that are available for
            connection. Returns empty list on non-Windows platforms.
        """
        if sys.platform != "win32":
            return []

        try:
            _ensure_unicorn_path()
            import UnicornPy

            device_list = UnicornPy.GetAvailableDevices(True)
        except Exception:
            # Covers ImportError as well as any discovery failure.
            return []
        return device_list if device_list else []