from __future__ import annotations
from typing import Optional
import numpy as np
from scipy.signal import get_window
from .base.generic_filter import GenericFilter
[docs]
class MovingAverage(GenericFilter):
"""Moving average filter for signal smoothing and noise reduction.
Implements a moving average filter using various window functions. Moving
average filters are linear phase FIR filters that smooth signals by
averaging values within a sliding window. Supports window functions
including rectangular, Hamming, Hanning, and others.
"""
[docs]
class Configuration(GenericFilter.Configuration):
"""Configuration class for MovingAverage filter parameters."""
[docs]
class Keys(GenericFilter.Configuration.Keys):
"""Configuration keys for moving average parameters."""
#: Window size configuration key
WINDOW_SIZE = "window_size"
#: Window function configuration key
WINDOW_FUNCTION = "window_function"
[docs]
def __init__(
self,
window_size: Optional[int] = None,
window_function: Optional[str] = None,
**kwargs,
):
"""Initialize the moving average filter.
Args:
window_size: Number of samples in the averaging window.
Must be greater than 1.
window_function: Type of window function to apply. Supported
functions include 'boxcar', 'hamming', 'hanning', etc.
Defaults to 'boxcar'.
**kwargs: Additional arguments passed to parent GenericFilter.
Raises:
ValueError: If window_size is None, <= 1, or if window_function
is not supported by scipy.signal.get_window.
"""
# Validate window size parameter
if window_size is None:
raise ValueError("window_size must not be None.")
if window_size <= 1:
raise ValueError("window_size must be greater than 1.")
# Set default window function
if window_function is None:
window_function = "boxcar" # Rectangular window
# Generate window coefficients using scipy
try:
b = get_window(window_function, window_size, fftbins=False)
except ValueError as e:
msg = f"Invalid window function '{window_function}': {e}"
raise ValueError(msg)
# Normalize coefficients to maintain unity gain
b = b / np.sum(b)
# FIR filter (no feedback, denominator = 1)
a = np.array([1.0])
# Allow overriding of coefficients via kwargs
b = kwargs.pop(MovingAverage.Configuration.Keys.B, b)
a = kwargs.pop(MovingAverage.Configuration.Keys.A, a)
# Initialize parent generic filter with computed coefficients
super().__init__(
b=b,
a=a,
window_size=window_size,
window_function=window_function,
**kwargs,
)