from __future__ import annotations
import queue
import threading
import time
from typing import Optional
import gtec_ble as ble
import numpy as np
from ...common.constants import Constants
from ..core.o_port import OPort
from .base.amplifier_source import AmplifierSource
#: Default output port identifier
PORT_OUT = Constants.Defaults.PORT_OUT
#: Default input port identifier
PORT_IN = Constants.Defaults.PORT_IN
[docs]
class BCICore8(AmplifierSource):
"""g.tec BCI Core-8 amplifier source for wireless EEG acquisition.
Interface to g.tec BCI Core-8 wireless EEG amplifier using BLE. Supports
8-channel acquisition at 250 Hz.
"""
#: Source code fingerprint for licensing verification
FINGERPRINT = "a08e74f489d777c589e562d2ddff0d42"
#: Optional buffer level monitoring output port name
PORT_BUF_LEVEL = "buffer_level"
#: Bluetooth scanning timeout in seconds
SCANNING_TIMEOUT_S = 10
#: Fixed sampling rate for BCI Core-8 amplifier in Hz
SAMPLING_RATE = 250
#: Maximum number of supported EEG channels
MAX_NUM_CHANNELS = 8
#: Default internal buffer delay in milliseconds
DEFAULT_BUFFER_DELAY_MS = 40
#: Target buffer fill ratio for stable operation
TARGET_FILL_RATIO = 0.5
#: Smoothing factor for buffer fill ratio calculation
FILL_RATIO_ALPHA = 0.9995
#: Correction interval for buffer timing in seconds
FILL_RATIO_CORRECTION_INTERVAL_S = 1.0
#: Maximum allowed consecutive buffer underruns
NUM_UNDERRUNS_ALLOWED = 10
#: Hardware delay compensation in milliseconds
DEVICE_DELAY_MS = 18
[docs]
class Configuration(AmplifierSource.Configuration):
"""Configuration class for BCI Core-8 specific parameters."""
[docs]
class Keys(AmplifierSource.Configuration.Keys):
"""Configuration keys for BCI Core-8 settings."""
OUTPUT_BUFFER_LEVEL = "output_buffer_level"
BUFFER_DELAY_MS = "buffer_delay_ms"
#: BLE amplifier device connection instance
_device: Optional[ble.Amplifier]
#: Target device serial number for connection
_target_sn: Optional[str]
[docs]
def __init__(
self,
serial: Optional[str] = None,
channel_count: Optional[int] = None,
frame_size: Optional[int] = None,
buffer_delay_ms: Optional[int] = None,
output_buffer_level: Optional[bool] = None,
**kwargs,
):
"""Initialize BCI Core-8 amplifier source.
Args:
serial: Serial number of target device. Uses first discovered
if None.
channel_count: Number of EEG channels (1-8). Defaults to 8.
frame_size: Samples per processing frame.
buffer_delay_ms: Internal buffer delay in milliseconds.
output_buffer_level: Enable buffer level monitoring output.
**kwargs: Additional arguments for parent AmplifierSource.
"""
# Validate and set channel count (1-8 channels supported)
if channel_count is None:
channel_count = self.MAX_NUM_CHANNELS
channel_count = max(1, min(channel_count, self.MAX_NUM_CHANNELS))
# Set default buffer level monitoring
if output_buffer_level is None:
output_buffer_level = False
# Configure output ports based on buffer level monitoring
output_ports = [OPort.Configuration()]
if output_buffer_level:
output_ports.append(OPort.Configuration(name=self.PORT_BUF_LEVEL))
channel_count = [channel_count, 1] # Main data + buffer level
if buffer_delay_ms is None:
buffer_delay_ms = self.DEFAULT_BUFFER_DELAY_MS
# Initialize parent amplifier source with BCI Core-8 specifications
super().__init__(
channel_count=channel_count,
sampling_rate=self.SAMPLING_RATE,
frame_size=frame_size,
decimation_factor=frame_size,
output_buffer_level=output_buffer_level,
buffer_delay_ms=buffer_delay_ms,
output_ports=output_ports,
**kwargs,
)
self._frame_size = self.config[self.Configuration.Keys.FRAME_SIZE][0]
# Calculate buffer configuration
self._target_fill_samples = int(
buffer_delay_ms / 1000 * self.SAMPLING_RATE
)
buf_size_samples = int(
self._target_fill_samples / self.TARGET_FILL_RATIO
)
buf_size_frames = int(np.ceil(buf_size_samples / self._frame_size))
# Store device configuration
self._target_sn = serial
# Initialize device connection (will be established in start())
self._device = None
# Initialize threading components for real-time processing
self._running: bool = False
self._thread: Optional[threading.Thread] = None
self._time_start: Optional[float] = None
# Initialize data management components
self._in_sample_counter: int = 0
self._out_sample_counter: int = 0
self._frame_buffer: Optional[queue.Queue] = None
self._sample_buffer: Optional[np.ndarray] = None
self._buffer_size_frames: int = buf_size_frames
self._underrun_counter: int = None
self._fill_ratio: float = None
# Calculate and set source delay for timing synchronization
self.source_delay = (buffer_delay_ms + self.DEVICE_DELAY_MS) / 1000
print(
f"BCI Core-8 buffer delay is set to " f"{buffer_delay_ms:.2f} ms."
)
# Initialize buffer level monitoring if enabled
if self.config[self.Configuration.Keys.OUTPUT_BUFFER_LEVEL]:
self._buffer_level_buffer: Optional[queue.Queue] = None
[docs]
def start(self) -> None:
"""Start BCI Core-8 amplifier and begin data acquisition.
Initializes buffers, starts background thread, establishes BLE
connection, and begins real-time data streaming.
Raises:
ConnectionError: If amplifier connection fails.
RuntimeError: If background thread creation fails.
"""
# Get configuration parameters
frame_size = self.config[self.Configuration.Keys.FRAME_SIZE]
channel_count = self.config[self.Configuration.Keys.CHANNEL_COUNT]
# Initialize data buffers for frame-based processing
self._frame_buffer = queue.Queue(maxsize=self._buffer_size_frames)
self._sample_buffer = np.zeros((frame_size[0], channel_count[0]))
self._underrun_counter = 0
self._fill_ratio = self.TARGET_FILL_RATIO
# Start background thread for timing control
if not self._running:
self._running = True
self._thread = threading.Thread(
target=self._thread_function, daemon=True
)
self._thread.start()
# Call parent start method
super().start()
# Initialize and connect to BCI Core-8 amplifier
if self._device is None:
self._device = ble.Amplifier(serial=self._target_sn)
self._device.set_data_callback(self._data_callback)
# Begin data acquisition from amplifier
self._device.start()
[docs]
def setup(
self, data: dict[str, np.ndarray], port_context_in: dict[str, dict]
) -> dict[str, dict]:
"""Setup output port contexts for BCI Core-8 data streams.
Args:
data: Input data arrays (empty for source nodes).
port_context_in: Input port contexts (empty for source nodes).
Returns:
Dictionary of output port contexts with 250 Hz sampling rate.
"""
return super().setup(data, port_context_in)
[docs]
def stop(self):
"""Stop BCI Core-8 amplifier and clean up resources.
Stops data acquisition, terminates background thread, and disconnects
from amplifier hardware.
"""
# Stop amplifier data acquisition
if self._device is not None:
self._device.stop()
# Call parent stop method
super().stop()
# Stop background thread and wait for completion
if self._running:
self._running = False
if self._thread and self._thread.is_alive():
self._thread.join(timeout=10) # Wait up to 10 seconds
# Clean up device connection
self._device = None
[docs]
def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
"""Retrieve processed data frames from the amplifier.
Returns data frames when decimation step is active. Handles buffer
underruns by providing zero-filled frames to maintain continuity.
Args:
data: Input data dictionary (unused for source nodes).
Returns:
Dictionary containing EEG data and optionally buffer level data,
or None if not a decimation step.
"""
if self.is_decimation_step():
out_data = {}
if self._frame_buffer.qsize() == 0:
self._underrun_counter += 1
else:
self._underrun_counter = 0
if self._underrun_counter > self.NUM_UNDERRUNS_ALLOWED:
self.log(
"Buffer underrun - performance may lag. Consider "
"increasing buffer size.",
type=Constants.LogTypes.WARNING,
)
try:
out_data = {PORT_OUT: self._frame_buffer.get(timeout=1)}
except queue.Empty:
# Provide zero-filled frame to maintain pipeline continuity
frame_size = self.config[self.Configuration.Keys.FRAME_SIZE][0]
cc_key = self.Configuration.Keys.CHANNEL_COUNT
channel_count = self.config[cc_key][0]
zero_frame = np.zeros((frame_size, channel_count))
out_data = {PORT_OUT: zero_frame}
# Buffer level monitoring
delta = self._in_sample_counter - self._out_sample_counter
cur_fill_ratio = delta / self._frame_buffer.maxsize
alpha = self.FILL_RATIO_ALPHA
self._fill_ratio = (
1 - alpha
) * cur_fill_ratio + alpha * self._fill_ratio
if self.config[self.Configuration.Keys.OUTPUT_BUFFER_LEVEL]:
buf_lvl = (cur_fill_ratio - 0.5) * 2 * 100
level_data = buf_lvl * np.ones((self._frame_size, 1))
out_data[self.PORT_BUF_LEVEL] = level_data
return out_data
else:
# Not a decimation step, return None
return None
def _data_callback(self, data: np.ndarray):
"""Callback function for incoming amplifier data.
Processes individual samples, assembles them into frames, and manages
buffer queues for real-time processing.
Args:
data: Single sample data array from amplifier with shape
(n_channels,). Only configured channels are used.
"""
# Safety check for buffer initialization
if self._sample_buffer is None:
return
# Determine position within current frame
idx_in_frame = self._in_sample_counter % self._frame_size
# Store sample data (only configured number of channels)
cc_key = AmplifierSource.Configuration.Keys.CHANNEL_COUNT
num_channels = self.config[cc_key][0]
self._sample_buffer[idx_in_frame, :] = data[:num_channels]
self._in_sample_counter += 1
# Check if frame is complete
if self._in_sample_counter % self._frame_size == 0:
try:
# Queue completed frame for processing
self._frame_buffer.put_nowait(self._sample_buffer.copy())
except queue.Full:
self._in_sample_counter -= self._frame_size
# self.log(
# "Buffer overflow - data lost. Consider "
# "increasing buffer size.",
# type=Constants.LogTypes.WARNING,
# )
pass
def _thread_function(self):
"""Background thread function for timing control and node cycles.
Implements timing control for data processing pipeline. Waits for
buffer to partially fill, then maintains precise timing for cycles.
"""
# Get sampling rate for timing calculations
sampling_rate = self.config[self.Configuration.Keys.SAMPLING_RATE]
# Startup phase: wait for buffer to partially fill
# This ensures stable data flow before real-time processing begins
limit = self._target_fill_samples
while self._running and self._in_sample_counter < limit:
time.sleep(1 / sampling_rate)
# Initialize timing for precise pipeline control
if self._time_start is None:
self._time_start = time.monotonic()
# Initialize variables
self._out_sample_counter = 0
next_wakeup_time = self._time_start
time_correction = 0
correction_interval_samples = int(
self.FILL_RATIO_CORRECTION_INTERVAL_S * sampling_rate
)
buf_size = self._frame_buffer.maxsize
fill_samples_target = self.TARGET_FILL_RATIO * buf_size
# Main timing loop
while self._running:
# Calculate expected time for next pipeline cycle
self._out_sample_counter += 1
next_sample_time = self._out_sample_counter / sampling_rate
next_wakeup_time = self._time_start + next_sample_time
next_wakeup_time += time_correction
# Determine sleep time to maintain precise timing
current_time = time.monotonic()
sleep_time = next_wakeup_time - current_time
# Sleep only if duration is significant (avoid timing jitter)
if sleep_time > 0.001:
time.sleep(sleep_time)
# Trigger pipeline processing cycle
self.cycle()
# Correction mechanism to adjust timing based on buffer fill ratio.
# This is important to maintain the target buffer delay and
# prevent drift.
if self._out_sample_counter % correction_interval_samples == 0:
fill_samples = self._fill_ratio * buf_size
delta_samples = fill_samples - fill_samples_target
delta_s = delta_samples / sampling_rate
if abs(delta_s) > 1e-3:
# reset fill ratio to target to avoid overcorrection
self._fill_ratio = self.TARGET_FILL_RATIO
# Adjust time correction based on fill ratio deviation
time_correction -= delta_s