Source code for biobss.ppgtools.ppg_timedomain

import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import ArrayLike

from biobss.ppgtools.ppg_peaks import *

# Time domain features
FEATURES_TIME_CYCLE = {
    "a_S": lambda sig, _0, locs_S, _1, _2, _3: np.mean(sig[locs_S]),
    "t_S": lambda _0, sampling_rate, locs_S, locs_O, _1, _2: np.mean((locs_S - locs_O[:-1]) / sampling_rate),
    "t_C": lambda _0, sampling_rate, _1, locs_O, _2, _3: np.mean(np.diff(locs_O) / sampling_rate),
    "DW": lambda _0, sampling_rate, locs_S, locs_O, _1, _2: np.mean((locs_O[1:] - locs_S) / sampling_rate),
    "SW_10": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_SW(sig, locs_S, locs_O, sampling_rate, 0.1)
    ),
    "SW_25": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_SW(sig, locs_S, locs_O, sampling_rate, 0.25)
    ),
    "SW_33": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_SW(sig, locs_S, locs_O, sampling_rate, 0.33)
    ),
    "SW_50": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_SW(sig, locs_S, locs_O, sampling_rate, 0.5)
    ),
    "SW_66": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_SW(sig, locs_S, locs_O, sampling_rate, 0.66)
    ),
    "SW_75": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_SW(sig, locs_S, locs_O, sampling_rate, 0.75)
    ),
    "DW_10": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW(sig, locs_S, locs_O, sampling_rate, 0.1)
    ),
    "DW_25": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW(sig, locs_S, locs_O, sampling_rate, 0.25)
    ),
    "DW_33": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW(sig, locs_S, locs_O, sampling_rate, 0.33)
    ),
    "DW_50": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW(sig, locs_S, locs_O, sampling_rate, 0.5)
    ),
    "DW_66": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW(sig, locs_S, locs_O, sampling_rate, 0.66)
    ),
    "DW_75": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW(sig, locs_S, locs_O, sampling_rate, 0.75)
    ),
    "DW_SW_10": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW_SW(sig, locs_S, locs_O, sampling_rate, 0.1)
    ),
    "DW_SW_25": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW_SW(sig, locs_S, locs_O, sampling_rate, 0.25)
    ),
    "DW_SW_33": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW_SW(sig, locs_S, locs_O, sampling_rate, 0.33)
    ),
    "DW_SW_50": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW_SW(sig, locs_S, locs_O, sampling_rate, 0.50)
    ),
    "DW_SW_66": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW_SW(sig, locs_S, locs_O, sampling_rate, 0.66)
    ),
    "DW_SW_75": lambda sig, sampling_rate, locs_S, locs_O, _0, _1: np.mean(
        _calculate_DW_SW(sig, locs_S, locs_O, sampling_rate, 0.75)
    ),
    "PR_mean": lambda _0, sampling_rate, locs_S, _1, _2, _3: 60 / np.mean(np.diff(locs_S) / sampling_rate),
    "a_D": lambda sig, _0, _1, _2, locs_D, _3: np.mean(sig[locs_D]),
    "t_D": lambda _0, sampling_rate, _1, locs_O, locs_D, _2: np.mean((locs_D - locs_O[:-1]) / sampling_rate),
    "r_D": lambda sig, sampling_rate, _0, locs_O, locs_D, _1: np.mean(
        sig[locs_D] / ((locs_D - locs_O[:-1]) / sampling_rate)
    ),
    "a_N": lambda sig, _0, _1, _2, _3, locs_N: np.mean(sig[locs_N]),
    "t_N": lambda _0, sampling_rate, _1, locs_O, _2, locs_N: np.mean((locs_N - locs_O[:-1]) / sampling_rate),
    "r_N": lambda sig, sampling_rate, _0, locs_O, _1, locs_N: np.mean(
        sig[locs_N] / ((locs_N - locs_O[:-1]) / sampling_rate)
    ),
    "dT": lambda _0, sampling_rate, locs_S, _1, locs_D, _2: np.mean((locs_D - locs_S) / sampling_rate),
    "r_D_NC": lambda sig, sampling_rate, _0, locs_O, locs_D, locs_N: np.mean(
        sig[locs_D] / ((np.diff(locs_O) / sampling_rate) - ((locs_N - locs_O[:-1]) / sampling_rate))
    ),
    "r_N_NC": lambda sig, sampling_rate, _0, locs_O, _1, locs_N: np.mean(
        sig[locs_N] / ((np.diff(locs_O) / sampling_rate) - ((locs_N - locs_O[:-1]) / sampling_rate))
    ),
    "a_N_S": lambda sig, _0, locs_S, _1, _2, locs_N: np.mean(sig[locs_N] / sig[locs_S]),
    "AI": lambda sig, _0, locs_S, _1, locs_D, _2: np.mean(sig[locs_D] / sig[locs_S]),
    "AI_2": lambda sig, _0, locs_S, _1, locs_D, _2: np.mean((sig[locs_S] - sig[locs_D]) / sig[locs_S]),
}

FEATURES_TIME_SEGMENT = {
    "zcr": lambda sig, _0: _calculate_zcr(sig),
    "snr": lambda sig, _0: _calculate_snr(sig),
}


[docs]def ppg_time_features( sig: ArrayLike, sampling_rate: float, input_types: str, fiducials: dict = None, prefix: str = "ppg", **kwargs ) -> dict: """Calculates time-domain features. Cycle-based features: a_S: Mean amplitude of the systolic peaks t_S: Mean systolic peak duration t_C: Mean cycle duration DW: Mean diastolic peak duration SW_10: The systolic peak duration at 10% of systolic amplitude SW_25: The systolic peak duration at 25% of systolic amplitude SW_33: The systolic peak duration at 33% of systolic amplitude SW_50: The systolic peak duration at 50% of systolic amplitude SW_66: The systolic peak duration at 66% of systolic amplitude SW_75: The systolic peak duration at 75% of systolic amplitude DW_10: The diastolic peak duration at 10% of systolic amplitude DW_25: The diastolic peak duration at 25% of systolic amplitude DW_33: The diastolic peak duration at 33% of systolic amplitude DW_50: The diastolic peak duration at 50% of systolic amplitude DW_66: The diastolic peak duration at 66% of systolic amplitude DW_75: The diastolic peak duration at 75% of systolic amplitude DW_SW_10: The ratio of diastolic peak duration to systolic peak duration at 10% of systolic amplitude DW_SW_25: The ratio of diastolic peak duration to systolic peak duration at 25% of systolic amplitude DW_SW_33: The ratio of diastolic peak duration to systolic peak duration at 33% of systolic amplitude DW_SW_50: The ratio of diastolic peak duration to systolic peak duration at 50% of systolic amplitude DW_SW_66: The ratio of diastolic peak duration to systolic peak duration at 66% of systolic amplitude DW_SW_75: The ratio of diastolic peak duration to systolic peak duration at 75% of systolic amplitude PR_mean: Mean pulse rate a_D: Mean amplitude of the diastolic peaks t_D: Mean difference between diastolic peak and onset r_D: Mean ratio of the diastolic peak amplitude to diastolic peak duration a_N: Mean amplitude of the dicrotic notchs t_N: Mean dicrotic notch duration r_N: Mean ratio of the dicrotic notch amplitude to dicrotic notch duration dT: Mean duration from systolic to diastolic peaks r_D_NC: Mean ratio of diastolic peak amplitudes to difference between ppg wave duration and dictoric notch duration r_N_NC: Mean ratio of dicrotic notch amplitudes to difference between ppg wave duration and dictoric notch duration a_N_S: Mean ratio of dicrotic notch amplitudes to systolic peak amplitudes AI: Mean ratio of diastolic peak amplitudes to systolic peak amplitudes AI_2: Mean ratio of difference between systolic and diastolic peak amplitudes to systolic peak amplitudes Segment-based features: zcr: Zero crossing rate snr: Signal to noise ratio Args: sig (ArrayLike): Signal to be analyzed. sampling_rate (float): Sampling rate of the signal (Hz). input_types (str): Type of feature calculation, should be 'segment' or 'cycle'. fiducials (dict, optional): Dictionary of fiducial point locations. Defaults to None. prefix (str, optional): Prefix for signal type. Defaults to 'ppg'. Kwargs: peaks_locs (ArrayLike): Array of peak locations troughs_locs (ArrayLike): Array of trough locations Raises: ValueError: If sampling rate is not greater than 0. ValueError: If PPG onset locations is not provided. ValueError: If PPG S-peak locations is not provided. ValueError: If Type is not 'cycle' or 'segment'. Returns: dict: Dictionary of calculated features. """ if sampling_rate <= 0: raise ValueError("Sampling rate must be greater than 0.") input_types = [x.lower() for x in input_types] features_time = {} for type in input_types: if type == "cycle": feature_list = FEATURES_TIME_CYCLE.copy() if fiducials is not None: fiducial_names = ["O_waves", "S_waves", "D_waves", "N_waves"] fiducials = {key: fiducials.get(key, []) for key in fiducial_names} locs_O = fiducials["O_waves"] locs_S = fiducials["S_waves"] locs_D = fiducials["D_waves"] locs_N = fiducials["N_waves"] locs_S, _ = correct_missing_duplicate_peaks(locs_valleys=locs_O, locs_peaks=locs_S, peaks=sig[locs_S]) locs_D, _ = correct_missing_duplicate_peaks(locs_valleys=locs_O, locs_peaks=locs_D, peaks=sig[locs_D]) locs_N, _ = correct_missing_duplicate_peaks(locs_valleys=locs_O, locs_peaks=locs_N, peaks=sig[locs_N]) else: locs_O = kwargs["troughs_locs"] locs_S = kwargs["peaks_locs"] locs_D = np.array([]) locs_N = np.array([]) locs_S, _ = correct_missing_duplicate_peaks(locs_valleys=locs_O, locs_peaks=locs_S, peaks=sig[locs_S]) if len(locs_O) == 0: raise ValueError("PPG onset locations must be provided to calculate cycle-based features.") if len(locs_S) == 0: raise ValueError("PPG S peak locations must be provided to calculate cycle-based features.") if len(locs_D) == 0: D_features = ["a_D", "t_D", "r_D", "dT", "r_D_NC", "r_N_NC", "AI", "AI_2"] [feature_list.pop(key, None) for key in D_features] if len(locs_N) == 0: N_features = ["a_N", "t_N", "r_N", "r_D_NC", "r_N_NC", "a_N_S"] [feature_list.pop(key, None) for key in N_features] for key, func in feature_list.items(): try: features_time["_".join([prefix, key])] = func(sig, sampling_rate, locs_S, locs_O, locs_D, locs_N) except: features_time["_".join([prefix, key])] = np.nan elif type == "segment": for key, func in FEATURES_TIME_SEGMENT.items(): try: features_time["_".join([prefix, key])] = func(sig, sampling_rate) except: features_time["_".join([prefix, key])] = np.nan else: raise ValueError("Type should be 'cycle' or 'segment'.") return features_time
def _calculate_SW( sig: ArrayLike, peaks_locs: ArrayLike, troughs_locs: ArrayLike, sampling_rate: float, ratio: float ) -> float: """Calculates systolic phase duration of the PPG waveform.""" peaks_amp = sig[peaks_locs] troughs_amp = sig[troughs_locs] SWs = [] for c in range(len(troughs_locs) - 1): sys_amp = peaks_amp[c] - troughs_amp[c] thresh = (ratio * sys_amp) + troughs_amp[c] ind = np.where(sig[troughs_locs[c] : peaks_locs[c]] >= thresh) SWs.append(len(ind[0]) / sampling_rate) return np.array(SWs) def _calculate_DW( sig: ArrayLike, peaks_locs: ArrayLike, troughs_locs: ArrayLike, sampling_rate: float, ratio: float ) -> float: """Calculates diastolic phase duration of the waveform.""" peaks_amp = sig[peaks_locs] troughs_amp = sig[troughs_locs] DWs = [] for c in range(len(troughs_locs) - 1): sys_amp = peaks_amp[c] - troughs_amp[c] thresh = (ratio * sys_amp) + troughs_amp[c] ind = np.where(sig[peaks_locs[c] : troughs_locs[c + 1]] >= thresh) DWs.append(len(ind[0]) / sampling_rate) return np.array(DWs) def _calculate_DW_SW( sig: ArrayLike, peaks_locs: ArrayLike, troughs_locs: ArrayLike, sampling_rate: float, ratio: float ) -> float: """Calculates the ratio of diastolic phase duration of the waveform to systolic phase duration of the waveform.""" dw = _calculate_DW(sig, peaks_locs=peaks_locs, troughs_locs=troughs_locs, sampling_rate=sampling_rate, ratio=ratio) sw = _calculate_SW(sig, peaks_locs=peaks_locs, troughs_locs=troughs_locs, sampling_rate=sampling_rate, ratio=ratio) return dw / sw def _calculate_zcr(sig: ArrayLike) -> float: """Calculates zero crossing rate, defined as number of zero-crossings to signal length.""" sig_ = sig - np.mean(sig) numZeroCrossing = len(np.where(np.diff(np.sign(sig_)))[0]) return numZeroCrossing / len(sig_) def _calculate_snr(sig: ArrayLike) -> float: """Calculates signal to noise ratio.""" mn_sig = np.mean(sig) std_sig = np.std(sig) snratio = np.where(std_sig == 0, 0, mn_sig / std_sig).item() return snratio