Source code for gpype.backend.sinks.base.file_writer

from __future__ import annotations

import os
import queue
import threading
import time
from abc import abstractmethod
from datetime import datetime
from typing import Optional

import ioiocore as ioc
import numpy as np

from ....common.constants import Constants
from ...core.i_node import INode
from ...core.i_port import IPort


class FileWriter(INode):
    """Abstract base class for threaded file writers.

    Implements a file writer that operates in a separate background thread
    to prevent blocking the main signal processing pipeline. Data is queued
    by :meth:`step` and written asynchronously by :meth:`_file_worker` to
    maintain real-time performance. Subclasses must implement the
    format-specific operations :attr:`file_extension`, :meth:`_open_file`,
    :meth:`_write_block` and :meth:`_close_file`.
    """

    class Configuration(ioc.INode.Configuration):
        """Configuration class for FileWriter parameters."""

        class Keys(ioc.INode.Configuration.Keys):
            """Configuration keys for file writer settings."""

            #: File name configuration key
            FILE_NAME = "file_name"

    #: Maximum time in seconds the worker blocks waiting for a new data
    #: block before it re-checks the stop event.
    _POLL_INTERVAL = 0.01

    def __init__(
        self,
        file_name: str,
        **kwargs,
    ):
        """Initialize the file writer with specified filename.

        Args:
            file_name: Base filename for data output. A timestamp will be
                automatically appended (see :meth:`_generate_file_path`).
            **kwargs: Additional arguments passed to parent INode class.
        """
        # Initialize parent INode with configuration
        INode.__init__(self, file_name=file_name, **kwargs)

        # Threading components for background file operations
        self._file_queue: queue.Queue = queue.Queue()  # Thread-safe queue
        self._stop_event = threading.Event()  # Shutdown coordination
        self._worker_thread: Optional[threading.Thread] = None

        # Recording state
        self._sample_counter = 0  # Global sample index counter
        self._sampling_rate: Optional[float] = None  # Set in setup()
        self._file_path: Optional[str] = None  # Set in start()

    def _generate_file_path(self) -> str:
        """Generate the output file path with timestamp.

        Takes the configured file name, validates that its extension matches
        the writer's format (case-insensitively), and inserts a timestamp of
        the form ``YYYYMMDD_HHMMSS`` before the extension.

        Returns:
            Full file path with timestamp inserted.

        Raises:
            ValueError: If file extension doesn't match writer's format.
        """
        file_name = self.config[self.Configuration.Keys.FILE_NAME]
        name, ext = os.path.splitext(file_name)

        # Validate file extension
        expected_ext = self.file_extension
        if ext.lower() != expected_ext.lower():
            raise ValueError(
                f"Invalid file extension '{ext}'. "
                f"Expected '{expected_ext}' for {self.__class__.__name__}."
            )

        # Insert timestamp before extension
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return f"{name}_{timestamp}{ext}"

    def start(self) -> None:
        """Start the file writer and initialize background thread.

        Generates the output file path with timestamp and starts the
        background worker thread for asynchronous writing. The actual file
        opening is deferred to setup() when port context is available.

        Raises:
            ValueError: If the file extension is invalid for this writer.
        """
        # Generate timestamped file path (raises before any thread exists)
        self._file_path = self._generate_file_path()

        # Reset sample counter
        self._sample_counter = 0

        # Initialize and start background worker thread
        self._worker_thread = threading.Thread(
            target=self._file_worker, daemon=True
        )
        self._stop_event.clear()  # Reset stop event
        self._worker_thread.start()

        # Call parent start method
        super().start()

    def stop(self) -> None:
        """Stop the file writer and clean up resources.

        Signals the background thread to stop, waits for it to finish
        processing remaining data, and closes the file. The parent's
        ``stop()`` is always invoked, even if closing the file raises; in
        that case the exception from :meth:`_close_file` still propagates.
        """
        # Signal background thread to stop
        self._stop_event.set()

        # Wait for worker thread to finish processing remaining data
        if self._worker_thread is not None:
            self._worker_thread.join()
            self._worker_thread = None

        try:
            # Close file using format-specific implementation
            self._close_file()
        finally:
            # A failing close must not leave the parent node un-stopped
            super().stop()

    def setup(
        self, data: dict[str, np.ndarray], port_context_in: dict[str, dict]
    ) -> dict[str, dict]:
        """Setup method called before processing begins.

        Extracts sampling rate from port context and opens the output file
        using the format-specific implementation.

        Args:
            data: Dictionary of input data arrays from connected ports.
            port_context_in: Context information from input ports.

        Returns:
            Empty dictionary as this is a sink node with no outputs.

        Raises:
            RuntimeError: If sampling rate is not provided in port context.
        """
        # Extract sampling rate from port context
        port_in = Constants.Defaults.PORT_IN
        sr_key = Constants.Keys.SAMPLING_RATE
        self._sampling_rate = port_context_in[port_in].get(sr_key)
        if self._sampling_rate is None:
            raise RuntimeError("Sampling rate not provided in port context.")

        # Open file using format-specific implementation
        self._open_file(self._file_path, port_context_in)

        # No output context for sink nodes
        return {}

    def step(self, data: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        """Process incoming data by queuing it for background writing.

        Args:
            data: Dictionary containing input data arrays. Uses the default
                input port to retrieve data for writing.

        Returns:
            Empty dictionary as this is a sink node with no outputs.
        """
        # Get data from default input port
        block = data[Constants.Defaults.PORT_IN]

        # Queue a copy so the queued block is independent of the caller's
        # array while it waits to be written
        self._file_queue.put(block.copy())

        # No output data for sink nodes
        return {}

    def _file_worker(self) -> None:
        """Background worker thread for asynchronous file writing.

        Runs in a separate thread to handle file I/O operations without
        blocking the main signal processing pipeline. Processes data blocks
        from the queue until stop is signaled AND the queue is empty, so
        blocks queued before ``stop()`` are still written.

        For each block the sample indices continue from the running sample
        counter and are divided by the sampling rate to obtain timestamps
        (seconds, if the sampling rate is given in Hz).

        Note:
            An exception raised by :meth:`_write_block` is not caught here
            and therefore terminates the worker thread.
        """
        while not self._stop_event.is_set() or not self._file_queue.empty():
            # Block briefly for the next item; the timeout bounds how long
            # a stop request can go unnoticed while the queue is empty.
            # Only the get() is guarded, so a queue.Empty raised by
            # format-specific code is never mistaken for an idle queue.
            try:
                block = self._file_queue.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue

            # Calculate timestamps for this block
            n_samples = block.shape[0]
            start_idx = self._sample_counter
            indices = np.arange(start_idx, start_idx + n_samples)
            timestamps = indices / self._sampling_rate

            # Write block using format-specific implementation
            self._write_block(block, timestamps)

            # Update sample counter only after a successful write
            self._sample_counter += n_samples

    @property
    @abstractmethod
    def file_extension(self) -> str:
        """Return the file extension for this writer type.

        Returns:
            File extension including the dot (e.g., '.csv', '.hdf5').
        """
        pass  # pragma: no cover

    @abstractmethod
    def _open_file(
        self, file_path: str, port_context_in: dict[str, dict]
    ) -> None:
        """Open the output file for writing.

        Format-specific implementation to create and initialize the output
        file. Called during setup() when port context is available.

        Args:
            file_path: Full path to the output file.
            port_context_in: Context information from input ports,
                containing channel count, sampling rate, and other metadata.
        """
        pass  # pragma: no cover

    @abstractmethod
    def _write_block(self, block: np.ndarray, timestamps: np.ndarray) -> None:
        """Write a data block to the file.

        Format-specific implementation to write data to the output file.
        Called from the background worker thread.

        Args:
            block: Data block to write, shape (samples, channels).
            timestamps: Timestamp array for each sample in the block.
        """
        pass  # pragma: no cover

    @abstractmethod
    def _close_file(self) -> None:
        """Close the output file and finalize.

        Format-specific implementation to properly close the file and
        perform any finalization steps. Called during stop(), after the
        worker thread has finished.
        """
        pass  # pragma: no cover