Source code for biobss.pipeline.channel_input

import numpy as np
import pandas as pd

from .bio_channel import Channel
from .bio_data import Bio_Data


[docs]def convert_channel(signal, sampling_rate=None, name=None, n_windows=1, n_signal=None, index=None): if isinstance(signal, Channel): output = Bio_Data() output.add_channel(signal) return output elif isinstance(signal, Bio_Data): return signal elif isinstance(signal, dict): output = _from_dict(signal, sampling_rate, name, n_windows, n_signal, index) elif isinstance(signal, list): output = _from_list(signal, sampling_rate, name, n_windows, n_signal, index) elif isinstance(signal, tuple): output = _from_tuple(signal, sampling_rate, name, n_windows, n_signal, index) elif isinstance(signal, np.ndarray): output = _from_array(signal, sampling_rate, name, n_windows, n_signal, index) elif isinstance(signal, pd.DataFrame): output = _from_dataframe(signal, sampling_rate, name, n_windows, n_signal, index) return output
def _from_dataframe(signal, sampling_rate=None, name=None, n_windows=1, n_signal=None, index=None): if name is not None: if isinstance(name, str): if len(signal.columns) == 1: signal.columns = [name] else: raise ValueError("name must be a list of strings if signal has more than one column") elif isinstance(name, list): if len(signal.columns) == len(name): signal.columns = name else: raise ValueError("name must be a list of strings if signal has more than one column") if index is not None: if isinstance(index, str): signal = signal[index] elif isinstance(index, list): if any([not isinstance(key, str) for key in index]): raise ValueError("index must be a list of strings") else: new_signal = {} for key in index: new_signal[key] = signal[key] signal = new_signal _check_sampling_rate(sampling_rate, n_signal) if len(signal.keys()) == 0: raise ValueError("signal must have at least one key") output = Bio_Data() for key in signal.keys(): data = signal[key] data = np.array(data) data = data.reshape(n_windows, -1) data = data.squeeze() output.add_channel(Channel(data, key, sampling_rate)) return output def _from_dict(signal, sampling_rate=None, name=None, n_windows=1, n_signal=None, index=None): if index is not None: if isinstance(index, str): signal = signal[index] elif isinstance(index, list): if any([not isinstance(key, str) for key in index]): raise ValueError("index must be a list of strings") else: new_signal = {} for key in index: new_signal[key] = signal[key] signal = new_signal _check_sampling_rate(sampling_rate, n_signal) if len(signal.keys()) == 0: raise ValueError("signal must have at least one key") output = Bio_Data() for key in signal.keys(): if not isinstance(signal[key], (list, tuple, np.ndarray)): raise ValueError("signal must be a list, tuple or numpy array") data = signal[key] data = np.array(data) data = data.reshape(n_windows, -1) data = data.squeeze() output.add_channel(Channel(data, key, sampling_rate)) return output def _from_list(signal, sampling_rate=None, name=None, n_windows=1, n_signal=None, index=None): if index is not None: if isinstance(index, int): signal = [signal[index]] elif isinstance(index, list): if any([not isinstance(key, int) for key in index]): raise ValueError("index must be a list of integers") else: new_signal = [] for key in index: new_signal.append(signal[key]) signal = new_signal if all(isinstance(item, (Channel,)) for item in signal): output = Bio_Data() if name is not None: if isinstance(name, str): for i in range(len(signal)): signal[i].signal_name = name + "_" + str(i) elif isinstance(name, list): for i in range(len(name)): signal[i].signal_name = name[i] for s in signal: output.add_channel(s) return output _check_sampling_rate(sampling_rate, n_signal) if any([not isinstance(item, (list, np.ndarray)) for item in signal]): raise ValueError("signal must be a list of lists, tuples or numpy arrays") if name is None: name = "signal" elif isinstance(name, list): if len(name) != len(signal): raise ValueError("name must be a string or a list of length len(signal)") signal = np.array(signal) if n_signal is None: n_signal = 1 output = Bio_Data() signal = signal.reshape(n_signal, n_windows, -1) for i in range(signal.shape[0]): data = signal[i] data = data.reshape(n_windows, -1) data = data.squeeze() if name == "signal": cur_name = "signal_" + str(i) elif isinstance(name, list): cur_name = name[i] elif isinstance(name, str): cur_name = name else: raise ValueError("name must be a string or a list of length len(signal)") if not isinstance(sampling_rate, (int, float)): cur_rate = sampling_rate[i] else: cur_rate = sampling_rate output.add_channel(Channel(data, cur_name, cur_rate)) return output def _from_tuple(signal, sampling_rate=None, name=None, n_windows=1, n_signal=None, index=None): if not isinstance(signal, tuple): raise ValueError("signal must be a tuple") if index is not None: if isinstance(index, int): signal = [signal[index]] elif isinstance(index, list): if any([not isinstance(key, int) for key in index]): raise ValueError("index must be a list of integers") else: new_signal = [] for key in index: new_signal.append(signal[key]) signal = new_signal if all(isinstance(item, (Channel,)) for item in signal): output = Bio_Data() for s in signal: output.add_channel(s) return output if n_signal is not None: if len(signal) != n_signal: raise ValueError("signal must be a tuple of length n_signal") if name is None: name = "signal" output = Bio_Data() _check_sampling_rate(sampling_rate, n_signal) for i, s in enumerate(signal): data = np.array(s) data = data.reshape(n_windows, -1) data = data.squeeze() if name == "signal": cur_name = "signal_" + str(i) elif isinstance(name, list): cur_name = name[i] else: cur_name = name if isinstance(sampling_rate, (int, float)): cur_rate = sampling_rate elif isinstance(sampling_rate, list): cur_rate = sampling_rate[i] output.add_channel(Channel(data, cur_name, cur_rate)) return output def _from_array(signal, sampling_rate=None, name=None, n_windows=1, n_signal=None, index=None): if index is not None: if isinstance(index, int): signal = signal[index] elif isinstance(index, list): if any([not isinstance(key, int) for key in index]): raise ValueError("index must be a list of integers") else: new_signal = [] for key in index: new_signal.append(signal[key]) signal = new_signal if not isinstance(signal, (np.ndarray)): raise ValueError("signal must be a numpy array") if n_signal is not None: if signal.shape[0] != n_signal: raise ValueError("signal must be a numpy array of shape (n_signal,n_windows,n_samples)") else: n_signal = 1 if name is None: name = "signal" _check_sampling_rate(sampling_rate, n_signal) output = Bio_Data() signal = signal.reshape(n_signal, n_windows, -1) for i in range(signal.shape[0]): data = signal[i] data = data.reshape(n_windows, -1) data = data.squeeze() if name == "signal": cur_name = "signal_" + str(i) elif isinstance(name, list): cur_name = name[i] else: cur_name = name if isinstance(sampling_rate, (int, float)): cur_rate = sampling_rate elif isinstance(sampling_rate, list): cur_rate = sampling_rate[i] output.add_channel(Channel(data, cur_name, cur_rate)) return output def _check_sampling_rate(sampling_rate, n_signal): if sampling_rate is None: raise ValueError("sampling_rate must be provided") elif not isinstance(sampling_rate, (int, float, list)): raise ValueError("sampling_rate must be a number or a list of length n_signal") elif isinstance(sampling_rate, list): if any([not isinstance(rate, (int, float)) for rate in sampling_rate]): raise ValueError("sampling_rate must be a number or a list of length n_signal") if n_signal is not None: if isinstance(sampling_rate, (int, float)): sampling_rate = sampling_rate elif isinstance(sampling_rate, list): if len(sampling_rate) != n_signal: raise ValueError("sampling_rate must be a number or a list of length n_signal") for rate in sampling_rate: if not isinstance(rate, (int, float)): raise ValueError("sampling_rate must be a number or a list of length n_signal") elif rate <= 0: raise ValueError("sampling_rate must be positive") return True