Source code for biobss.sqatools.signal_quality

import math
from typing import Tuple

import numpy as np
from numpy.typing import ArrayLike

# Constants to check for physiological viability and morphological features.
HR_MIN = 40
HR_MAX = 180
PP_MAX = 3
MAX_PP_RATIO = 2.2
MIN_SPD = 0.08
MAX_SPD = 0.49
SP_DP_RATIO = 1.1
MIN_PWD = 0.27
MAX_PWD = 2.4
MAX_VAR_DUR = 300
MAX_VAR_AMP = 400
CORR_TH = 0.9


[docs]def detect_clipped_segments(sig: ArrayLike, threshold_pos: float, threshold_neg: float = None) -> list: """Detects clipped segments in a signal. Args: sig (ArrayLike): Signal to be analyzed (ECG or PPG). threshold_pos (float): Threshold for positive clipping threshold_neg (float, optional): Threshold for negative clipping. Defaults to None. Returns: list: Dictionary of boundaries of clipped segments. """ if threshold_neg is None: threshold_neg = -threshold_pos start_indices = [] end_indices = [] in_clipped_segment = False for i, value in enumerate(sig): if value >= threshold_pos or value <= threshold_neg: if not in_clipped_segment: # Start of a new clipped segment start_indices.append(i) in_clipped_segment = True else: if in_clipped_segment: # End of a clipped segment end_indices.append(i - 1) in_clipped_segment = False if in_clipped_segment: # The last segment extends until the end of the signal end_indices.append(len(sig) - 1) return list(zip(start_indices, end_indices))
[docs]def detect_flatline_segments(sig: ArrayLike, min_duration: float, change_threshold: float) -> list: """Detects flatline segments in a signal. Args: sig (ArrayLike): Signal to be analyzed (ECG or PPG). min_duration (float): Mimimum duration of flat segments for flatline detection. change_threshold (float): Threshold for change in signal amplitude. Returns: list: List of boundaries of flatline segments. """ start_indices = [] end_indices = [] in_flatline_segment = False for i in range(1, len(sig)): change = abs(sig[i] - sig[i - 1]) if change <= change_threshold and sig[i] != max(sig) and sig[i] != min(sig): if not in_flatline_segment: # Start of a new flatline segment start_indices.append(i - 1) in_flatline_segment = True else: if in_flatline_segment: # End of a flatline segment end_indices.append(i - 1) in_flatline_segment = False if in_flatline_segment: # The last segment extends until the end of the signal end_indices.append(len(sig) - 1) # Filter segments by duration durations = [end - start + 1 for start, end in zip(start_indices, end_indices)] start_indices = [start for start, duration in zip(start_indices, durations) if duration >= min_duration] end_indices = [end for end, duration in zip(end_indices, durations) if duration >= min_duration] return list(zip(start_indices, end_indices))
[docs]def check_phys(peaks_locs: ArrayLike, sampling_rate: float) -> dict: """Checks for physiological viability. Rule 1: Average HR should be between 40-180 bpm (up to 300 bpm in the case of exercise) Rule 2: Maximum P-P interval: 1.5 seconds. Allowing for a single missing beat, it is 3 seconds Rule 3: Maximum P-P interval / minimum P-P interval ratio: 10 of the signal length for a short signal. For 10 seconds signal, it is 1.1; allowing for a single missing beat, it is 2.2 Args: peaks_locs (ArrayLike): Array of peak locations. sampling_rate (float): Sampling rate of the input signal. Returns: dict: Dictionary of decisions. """ if sampling_rate <= 0: raise ValueError("Sampling rate must be greater than 0.") info = {} # Rule 1: Average HR should be between 40-180 bpm (up to 300 bpm in the case of exercise) intervals = np.diff(peaks_locs) / sampling_rate HR_mean = 60 / np.mean(intervals) if HR_mean < HR_MIN or HR_mean > HR_MAX: info["Rule 1"] = False else: info["Rule 1"] = True # Rule 2: Maximum P-P interval: 1.5 seconds. Allowing for a single missing beat, it is 3 seconds if np.size(np.where(intervals > PP_MAX)) > 0: info["Rule 2"] = False else: info["Rule 2"] = True # Rule 3: Maximum P-P interval / minimum P-P interval ratio: 10 of the signal length for a short signal. # For 10 seconds signal, it is 1.1; allowing for a single missing beat, it is 2.2 if (intervals.max() / intervals.min()) > MAX_PP_RATIO: info["Rule 3"] = False else: info["Rule 3"] = True return info
[docs]def check_morph(sig: ArrayLike, peaks_locs: ArrayLike, troughs_locs: ArrayLike, sampling_rate: float) -> dict: """Checks for ranges of morphological features. Rule 1: Systolic phase duration(rise time): 0.08 to 0.49 s Rule 2: Ratio of systolic phase duration to diastolic phase duration: max 1.1 Rule 3: Pulse wave duration: 0.27 to 2.4 s Rule 4: Variation in PWD and SP: 33-300% Rule 5: Variation in PWA: 25-400% (Pulse wave amplitude: a threshold which was set heuristically) Args: peaks_locs (Array): Array of peak locations. peaks_amps (Array): Array of peak amplitudes. troughs_locs (Array): Array of trough locations. troughs_amps (Array): Array of trough amplitudes. sampling_rate (float): Sampling rate of the input signal. Returns: dict: Dictionary of decisions. """ if sampling_rate <= 0: raise ValueError("Sampling rate must be greater than 0.") if len(peaks_locs) != len(troughs_locs) - 1: raise ValueError("Number of peaks and troughs are not compatible!") peaks_amps = sig[peaks_locs] troughs_amps = sig[troughs_locs] info = {} # Rule 1 SP = (peaks_locs - troughs_locs[:-1]) / sampling_rate if np.size(np.where(SP < MIN_SPD)) > 0 or np.size(np.where(SP > MAX_SPD)) > 0: info["Rule 1"] = False else: info["Rule 1"] = True # Rule 2: DP = (troughs_locs[1:] - peaks_locs) / sampling_rate # Diastolic phase duration SP_DP = SP / DP if np.size(np.where(SP_DP > SP_DP_RATIO)) > 0: info["Rule 2"] = False else: info["Rule 2"] = True # Rule 3: PWD = np.diff(troughs_locs) / sampling_rate if np.size(np.where(PWD < MIN_PWD)) > 0 or np.size(np.where(PWD > MAX_PWD)) > 0: info["Rule 3"] = False else: info["Rule 3"] = True # Rule 4: var_SP = (np.max(peaks_amps) - np.min(peaks_amps)) / np.min(peaks_amps) * 100 var_PWD = (np.max(PWD) - np.min(PWD)) / np.min(PWD) * 100 if (var_SP > MAX_VAR_DUR) or (var_PWD > MAX_VAR_DUR): info["Rule 4"] = False else: info["Rule 4"] = True # Rule 5: PWA = peaks_amps - troughs_amps[:-1] var_PWA = (np.max(PWA) - np.min(PWA)) / np.min(PWA) * 100 if var_PWA > MAX_VAR_AMP: info["Rule 5"] = False else: info["Rule 5"] = True return info
[docs]def template_matching(sig: ArrayLike, peaks_locs: ArrayLike, corr_th: float = CORR_TH) -> Tuple[float, bool]: """Applies template matching method for signal quality assessment. Args: sig (ArrayLike): Signal to be analyzed. peaks_locs (ArrayLike): Peak locations (Systolic peaks for PPG signal, R peaks for ECG signal). corr_th (float, optional): Threshold for the correlation coefficient above which the signal is considered to be valid. Defaults to CORR_TH. Returns: Tuple[float,bool]: Correlation coefficient and the decision """ if corr_th <= 0: raise ValueError("Threshold for the correlation coefficient must be greater than 0.") wl = np.median(np.diff(peaks_locs)) waves = np.empty((0, 2 * math.floor(wl / 2) + 1)) nofwaves = np.size(peaks_locs) for i in range((nofwaves)): wave_st = peaks_locs[i] - math.floor(wl / 2) wave_end = peaks_locs[i] + math.floor(wl / 2) wave = [] if wave_st < 0: wave = sig[:wave_end] for _ in range(-wave_st + 1): wave = np.insert(wave, 0, wave[0]) elif wave_end > len(sig) - 1: wave = sig[wave_st - 1 :] for _ in range(wave_end - len(sig)): wave = np.append(wave, wave[-1]) else: wave = sig[wave_st : wave_end + 1] waves = np.vstack([waves, wave]) sig_temp = np.mean(waves, axis=0) ps = np.array([]) for j in range(np.size(peaks_locs)): p = np.corrcoef(waves[j], sig_temp, rowvar=True) ps = np.append(ps, p[0][1]) if np.size(np.where(ps < corr_th)) > 0: result = False else: result = True return ps, result