from typing import Union
import ioiocore as ioc
from .core.node import Node
class Pipeline(ioc.Pipeline):
    """Brain-Computer Interface pipeline for real-time data processing.

    Extends ioiocore Pipeline for BCI applications with automatic logging
    to platform-specific directories. Manages node lifecycle, data flow
    connections, and real-time execution of interconnected processing nodes.
    """

    def __init__(self):
        """Initialize Pipeline with platform-specific logging directory.

        The directory handed to the parent constructor is:

        * Windows (``win32``): ``<APPDATA>/gtec/gPype``
        * macOS (``darwin``): ``~/Library/Application Support/gtec/gPype``
        * any other platform, or Windows with ``APPDATA`` unset or empty:
          ``None``, which selects the default ioiocore directory.
        """
        # Imported locally so the block does not depend on file-level imports.
        import os
        import sys

        # None selects the default ioiocore directory.
        log_dir = None

        if sys.platform == "win32":
            appdata = os.getenv("APPDATA")
            # Joining onto an empty string would yield the *relative* path
            # "gtec\\gPype", i.e. logs would land in the current working
            # directory. Fall back to the default directory instead.
            if appdata:
                log_dir = os.path.join(appdata, "gtec", "gPype")
        elif sys.platform == "darwin":
            app_support = os.path.expanduser("~/Library/Application Support")
            log_dir = os.path.join(app_support, "gtec", "gPype")

        # Initialize parent pipeline with logging directory
        super().__init__(directory=log_dir)

    def connect(self, source: Union[Node, dict], target: Union[Node, dict]):
        """Connect two nodes to establish data flow in the pipeline.

        Nodes are automatically added to pipeline if not already present.

        Args:
            source (Union[Node, dict]): Source node or port specification.
                Use dict for specific ports (e.g., node["port_name"]).
            target (Union[Node, dict]): Target node or port specification.
                Use dict for specific ports (e.g., node["port_name"]).
        """
        # Pure delegation; kept as an override so the method is documented
        # on this class.
        super().connect(source, target)

    def start(self):
        """Start the pipeline and begin real-time data processing.

        Initiates execution of all nodes according to their configured
        connections and timing. Runs continuously until stop() is called.
        This method is non-blocking.
        """
        super().start()

    def stop(self):
        """Stop the pipeline and terminate all data processing.

        Gracefully shuts down all nodes and cleans up resources including
        threads, file handles, and hardware connections. Always call
        stop() before program termination to ensure proper cleanup,
        especially when using hardware interfaces.
        """
        super().stop()

    def serialize(self) -> dict:
        """Serialize the pipeline configuration to a dictionary.

        Returns:
            dict: Dictionary containing the complete pipeline configuration,
                including nodes, connections, parameters, and metadata.
        """
        return super().serialize()

    @staticmethod
    def deserialize(data: dict) -> "Pipeline":
        """Deserialize a pipeline configuration from a dictionary.

        The parent class performs the actual deserialization; the resulting
        instance attributes are then copied onto a fresh ``Pipeline`` object.
        ``Pipeline.__init__`` is *not* executed, so the platform-specific
        log directory selection does not run for the returned instance.

        Args:
            data (dict): Serialized pipeline configuration dictionary.

        Returns:
            Pipeline: A new Pipeline instance with the specified configuration.
        """
        # Deserialize using parent class
        ioc_pipeline = ioc.Pipeline.deserialize(data)

        # Create Pipeline instance without calling __init__, so no second
        # parent construction happens on top of the deserialized state.
        pipeline = object.__new__(Pipeline)

        # Copy all instance attributes from deserialized pipeline. This is a
        # shallow copy: both objects reference the same attribute values.
        # NOTE(review): assumes ioc.Pipeline keeps its state in __dict__
        # (no __slots__) -- confirm against ioiocore.
        pipeline.__dict__.update(ioc_pipeline.__dict__)
        return pipeline