import neurokit2 as nk
import numpy as np
import pandas as pd
import scipy as sp
from numpy.typing import ArrayLike
from biobss.common import signal_hjorth
from biobss.preprocess import signal_detectpeaks
# These constants are defined considering the respiration range (6-60 breaths/min)
LAG_MIN = 1.33 # Period of respiration cycle for upper limit of respiration range (seconds).
LAG_MAX = 10 # Period of respiration cycle for lower limit of respiration range (seconds).
def _interpolate_and_resample(sig: ArrayLike, sig_x: ArrayLike, peaks_locs: ArrayLike, rs_ratio: float, offset: int):
"""Interpolates and resamples respiratory signals."""
sig_interp = sp.interpolate.interp1d(sig_x, sig)
num_x = peaks_locs[offset] - peaks_locs[0] + 1
sig_xnew = np.linspace(peaks_locs[0], peaks_locs[offset], num_x)
sig_new = sig_interp(sig_xnew)
respsig = sig_new[::rs_ratio]
resp_x = sig_xnew[::rs_ratio].astype(int)
return respsig, resp_x
[docs]def filter_resp_sig(resampling_rate: float = 10.0, rsp_clean_method: str = "khodadad2018", **kwargs) -> dict:
"""Filters extracted respiratory signal(s) and returns a dictionary of cleaned signal(s). Uses 'rsp_clean' function from the Neurokit2 library.
Args:
resampling_rate (float, optional): Resampling rate. Defaults to 10.
rsp_clean_method (str, optional): Method to clean the respiratory signal. Defaults to 'khodadad2018'.
kwargs:
am_sig (Array, optional):Respiratory signal calculated from amplitude modulation. Defaults to None.
am_x (Array, optional):x-axis values of amplitude modulation signal. Defaults to None.
fm_sig (Array, optional):Respiratory signal calculated from frequency modulation. Defaults to None.
fm_x (Array, optional):x-axis values of frequency modulation signal. Defaults to None.
bw_sig (Array, optional):Respiratory signal calculated from baseline wander. Defaults to None.
bw_x (Array, optional):x-axis values of baseline wander signal. Defaults to None.
Returns:
dict: Dictionary of filtered respiratory signals and x-axis values for each signal.
"""
if resampling_rate <= 0:
raise ValueError("Resampling rate must be greater than 0.")
rsp_clean_method = rsp_clean_method.lower()
filtered_resp = {}
if "am_sig" in kwargs.keys():
cl_am = nk.rsp_clean(kwargs["am_sig"], sampling_rate=resampling_rate, method=rsp_clean_method)
filtered_resp["am_sig"] = cl_am
filtered_resp["am_x"] = kwargs["am_x"]
if "fm_sig" in kwargs.keys():
cl_fm = nk.rsp_clean(kwargs["fm_sig"], sampling_rate=resampling_rate, method=rsp_clean_method)
filtered_resp["fm_sig"] = cl_fm
filtered_resp["fm_x"] = kwargs["fm_x"]
if "bw_sig" in kwargs.keys():
cl_bw = nk.rsp_clean(kwargs["bw_sig"], sampling_rate=resampling_rate, method=rsp_clean_method)
filtered_resp["bw_sig"] = cl_bw
filtered_resp["bw_x"] = kwargs["bw_x"]
return filtered_resp
[docs]def estimate_rr(
resp_sig: ArrayLike, resampling_rate: float = 10.0, method: str = "peakdet", delta: float = 0.001
) -> float:
"""Estimates respiratory rate from the respiratory signal.
Args:
resp_sig (ArrayLike): Respiratory signal.
sampling_rate (float): Sampling rate of the respiratory signal (resampling rate, Hz). Defaults to 10.
method (str): Method used for respiratory rate estimation. Can be one of 'peakdet' or 'xcorr'. Defaults to 'peakdet'.
Uses 'rsp_rate' function from the Neurokit2 library if the method is 'xcorr'.
delta (float, optional): Parameter of 'peakdet' method. Defaults to 0.001.
Returns:
float: Estimated respiratory rate (breaths/minutes).
"""
if resampling_rate <= 0:
raise ValueError("Sampling rate must be greater than 0.")
method = method.lower()
if method == "peakdet":
mean_rr = _estimate_rr_peakdet(resp_sig, resampling_rate, delta=delta)
elif method == "xcorr":
mean_rr = _estimate_rr_xcorr(resp_sig, resampling_rate=resampling_rate)
else:
raise ValueError("The method should be one of 'peakdet' or 'xcorr'.")
return mean_rr
def _estimate_rr_peakdet(resp_sig: ArrayLike, resampling_rate: float = 10.0, delta: float = 0.001) -> float:
"""Estimates respiratory rate using 'peakdet' method."""
maxtab, mintab = signal_detectpeaks._peakdetection_peakdet(resp_sig, delta)
if (maxtab.size == 0) or (mintab.size == 0):
raise ValueError("No peaks or troughs found.Respiratory rate can not be estimated!")
else:
peaks_locs = maxtab[:, 0].astype(int)
intervals = abs(np.diff(peaks_locs)) / resampling_rate # seconds
mean_int = np.mean(intervals)
mean_rr = 60 / mean_int # br/minutes
return mean_rr
def _estimate_rr_xcorr(resp_sig: ArrayLike, resampling_rate: float = 10.0) -> float:
"""Estimates raspiratory rate using 'xcorr' method."""
rr_inst = nk.rsp_rate(resp_sig, sampling_rate=resampling_rate, method="xcorr")
mean_rr = rr_inst.mean()
return mean_rr
[docs]def calc_rqi(resp_sig: ArrayLike, resampling_rate: float = 10.0, rqi_method: list = ["autocorr", "hjorth"]) -> dict:
"""Calculates respiratory quality index for the respiratory signal.
Args:
resp_sig (ArrayLike): Respiratory signal.
resampling_rate (float, optional): Sampling rate (after resampling, Hz) of the respiratory signal. Defaults to 10.
rqi_method (list, optional): Method for calculating respiratory quality index. Defaults to ['autocorr','hjorth'].
Returns:
dict: Dictionary of calculated RQIs.
"""
if resampling_rate <= 0:
raise ValueError("Resampling rate must be greater than 0.")
rqi_method = [x.lower() for x in rqi_method]
rqindices = {}
if "autocorr" in rqi_method:
corr_coeff = _calc_rqi_autocorr(resp_sig, resampling_rate)
rqindices["autocorr"] = max(corr_coeff)
if "hjorth" in rqi_method:
rqindices["hjorth"] = signal_hjorth.hjorth_complexity_mobility(resp_sig)[0]
return rqindices
def _calc_rqi_autocorr(resp_sig: ArrayLike, resampling_rate: float = 10.0) -> dict:
"""Calculates respiratory quality index using autocorrelation method."""
corr_coeff = []
sig = pd.Series(resp_sig)
lag_min = int(LAG_MIN * resampling_rate) # seconds to samples
lag_max = int(LAG_MAX * resampling_rate) # seconds to samples
for lag in range(lag_min, lag_max + 1):
c = sig.autocorr(lag=lag)
corr_coeff.append(c)
return corr_coeff
[docs]def fuse_rr(rr_est: ArrayLike, rqi: ArrayLike = None, fusion_method: str = "smartfusion") -> float:
"""Fuses respiratory rates calculated from different modulation types.
Args:
rr_est (Array): Respiratory rate estimations.
rqi (Array, optional): Respiratory quality indices. Defaults to None.
fusion_method (str, optional): Fusion method. Can be one of 'SmartFusion' of 'QualityFusion'. Defaults to 'SmartFusion'.
Raises:
ValueError: If lengths of 'rr_est' and 'rqi' arrays do not match.
ValueError: If method is 'QualityFusion' and RQI values are not provided.
ValueError: If method is not one of 'SmartFusion' and 'QualityFusion'.
Returns:
float: Fused respiratory rate
"""
fusion_method = fusion_method.lower()
if fusion_method == "smartfusion":
rr_fused = _fuse_rr_smartfusion(rr_est)
elif fusion_method == "qualityfusion":
rr_fused = _fuse_rr_qualityfusion(rr_est, rqi)
else:
raise ValueError("The method should be one of 'SmartFusion' and 'QualityFusion'.")
return rr_fused
def _fuse_rr_smartfusion(rr_est: ArrayLike) -> float:
"""Fuses respiratory rate estimations using 'SmartFusion' method."""
rr_std = np.std(rr_est)
if rr_std <= 4:
rr_fused = np.mean(rr_est)
else:
rr_fused = ["invalid signal"]
return rr_fused
def _fuse_rr_qualityfusion(rr_est: ArrayLike, rqi: ArrayLike = None) -> float:
"""Fuses respiratory rate estimations using 'QualityFusion' method."""
if rqi is not None:
if len(rr_est) != len(rqi):
raise ValueError("The lengths of 'rr_est' and 'rqi' arrays do not match!")
rr_fused = np.sum(np.asarray(rqi) * np.asarray(rr_est)) / np.sum(rqi)
else:
raise ValueError("RQI values are required for the 'QualityFusion' method.'")
return rr_fused