Source code for biobss.ecgtools.ecg_filter

from numpy.typing import ArrayLike
from scipy import signal


[docs]def filter_ecg(sig: ArrayLike, sampling_rate: float, method: str, **kwargs) -> ArrayLike: """Filters ECG signal using predefined filter parameters. Args: sig (ArrayLike): ECG signal. sampling_rate (float): Sampling rate of the ECG signal (Hz). method (str): Filtering method. Should be one of ['notch', 'bandpass', 'pantompkins', 'hamilton', 'elgendi]. Kwargs: f_notch (float) : Center frequency of the notch filter (w0). quality_factor (float): Quality factor (Q). It is calculated as Q = w0/bw where bw is the -3dB bandwidth. Raises: ValueError: If sampling rate is less than or equal to 0. ValueError: If cut-off frequency is less than 0. ValueError: If required parameters are not provided for the selected method. ValueError: If filtering method is not one of ['notch', 'pantompkins', 'hamilton', 'elgendi]. Returns: ArrayLike: Filtered ECG signal. """ if sampling_rate <= 0: raise ValueError("Sampling rate must be greater than 0.") method = method.lower() if method == "notch": filtered_sig = _filter_ecg_notch(sig=sig, sampling_rate=sampling_rate, **kwargs) elif method == "pantompkins": filtered_sig = _filter_ecg_pantompkins(sig=sig, sampling_rate=sampling_rate) elif method == "hamilton": filtered_sig = _filter_ecg_hamilton(sig=sig, sampling_rate=sampling_rate) elif method == "elgendi": filtered_sig = _filter_ecg_elgendi(sig=sig, sampling_rate=sampling_rate) else: raise ValueError(f"Undefined method: {method}.") return filtered_sig
def _filter_ecg_notch(sig: ArrayLike, sampling_rate: float, **kwargs) -> ArrayLike: """Filters ECG signal using a Notch filter.""" if all(k in kwargs.keys() for k in ("f_notch", "quality_factor")): if kwargs["f_notch"] <= 0: raise ValueError("Cut-off frequencies must be greater than 0.") b, a = signal.iirnotch(kwargs["f_notch"], kwargs["quality_factor"], sampling_rate) filtered_sig = signal.filtfilt(b, a, sig) else: raise ValueError(f'Missing keyword arguments for the selected method: "notch".') return filtered_sig def _filter_ecg_pantompkins(sig: ArrayLike, sampling_rate: float) -> ArrayLike: """Filters ECG signal using the filter parameters defined in: Pan, J. & Tompkins, W. J.,(1985). 'A real-time QRS detection algorithm'.""" W1 = 5 / (sampling_rate / 2) # normalized frequency W2 = 15 / (sampling_rate / 2) # normalized frequency N = 1 btype = "bandpass" sos = signal.butter(N, [W1, W2], btype, output="sos") filtered_sig = signal.sosfiltfilt(sos, sig) return filtered_sig def _filter_ecg_hamilton(sig: ArrayLike, sampling_rate: float) -> ArrayLike: """Filters ECG signal using the filter parameters defined in: Hamilton, P.S. (2002), 'Open Source ECG Analysis Software Documentation'.""" W1 = 8 / (sampling_rate / 2) # normalized frequency W2 = 16 / (sampling_rate / 2) # normalized frequency N = 1 btype = "bandpass" sos = signal.butter(N, [W1, W2], btype, output="sos") filtered_sig = signal.sosfiltfilt(sos, sig) return filtered_sig def _filter_ecg_elgendi(sig: ArrayLike, sampling_rate: float) -> ArrayLike: """Filters ECG signal using the filter parameters defined in: Elgendi, M. & Jonkman, M. & De Boer, F. (2010). 'Frequency Bands Effects on QRS Detection'.""" W1 = 8 / (sampling_rate / 2) # normalized frequency W2 = 20 / (sampling_rate / 2) # normalized frequency N = 2 btype = "bandpass" sos = signal.butter(N, [W1, W2], btype, output="sos") filtered_sig = signal.sosfiltfilt(sos, sig) return filtered_sig