Source code for biobss.pipeline.bio_data

import copy as copy
import warnings
from typing import Union

import numpy as np
import pandas as pd
from numpy.typing import ArrayLike

from .bio_channel import Channel
from .event_channel import Event_Channel


[docs]class Bio_Data: """Signal object with add and iterate process objects""" def __init__(self): # Docstring """Signal object with add and iterate process objects Attributes ----------- channels: dict Dictionary of channels """ # self.channels = {} # Data_Channel objects
[docs] def add_channel( self, signal: Union[ArrayLike, Channel, Event_Channel], channel_name: str = None, sampling_rate: Union[int, float] = None, modify_existed: bool = False, is_event=False, ): # Docstrin """Add a channel to the signal Parameters ---------- signal: Union[ArrayLike, Channel ,Event_Channel] Signal to add channel_name: str Name of the channel sampling_rate: Union[int, float] Sampling rate of the signal modify_existed: bool If True, if a channel with the same name already exists, it will be overwritten If False, if a channel with the same name already exists, a new name will be generated is_event: bool If True, the signal will be converted to an Event_Channel object If False, the signal will be converted to a Channel object """ # if isinstance(signal, pd.DataFrame): if channel_name is None: channel_name = "channel" if channel_name in self.channels.keys(): if modify_existed: warnings.warn("Overwriting channel " + channel_name) else: i = 1 while channel_name in self.channels.keys(): channel_name = channel_name + "_" + str(i) i = i + 1 self.channels[channel_name] = Event_Channel(signal, channel_name, sampling_rate) return if isinstance(signal, (Channel, Event_Channel)): if channel_name is None: channel_name = signal.signal_name if channel_name in self.channels.keys(): if modify_existed: warnings.warn("Overwriting channel " + channel_name) else: i = 1 while channel_name in self.channels.keys(): channel_name = channel_name + "_" + str(i) i = i + 1 self.channels[channel_name] = signal.copy() else: if isinstance(signal, (np.ndarray, list)): signal = np.array(signal) if sampling_rate is None: raise ValueError("sampling_rate must be provided if signal is not a Channel or Event_Channel object") if not isinstance(sampling_rate, (int, float)): raise ValueError("sampling_rate must be a float or integer") if channel_name is None: channel_name = "channel" if channel_name in self.channels.keys(): if modify_existed: warnings.warn("Overwriting channel " + channel_name) else: i = 1 while channel_name in self.channels.keys(): channel_name = channel_name + "_" + str(i) i = i + 1 if is_event: self.channels[channel_name] = Event_Channel(signal, channel_name, sampling_rate) else: self.channels[channel_name] = Channel(signal, channel_name, sampling_rate)
[docs] def remove_channel(self, channel_name): # Docstring """Remove a channel from the signal Parameters ----------- channel_name: str Name of the channel to remove """ # if channel_name not in self.channels.keys(): raise ValueError("Channel does not exist") self.channels.pop(channel_name)
[docs] def rename_channel(self, old_name, new_name): # Docstring """Rename a channel Parameters ----------- old_name: str Old name of the channel new_name: str New name of the channel """ # if old_name not in self.channels.keys(): raise ValueError("Channel does not exist") if new_name in self.channels.keys(): raise ValueError("Channel name already exists") self.channels[new_name] = self.channels.pop(old_name)
[docs] def get_channel_names(self): return list(self.channels.keys())
[docs] def copy(self): return copy.deepcopy(self)
[docs] def join(self, other: "Bio_Data", overwrite: bool = False): # Docstring """Join two Bio_Data objects Parameters ----------- other: Bio_Data Bio_Data object to join overwrite: bool If True, if a channel with the same name already exists, it will be overwritten If False, if a channel with the same name already exists, a new name will be generated """ # if not isinstance(other, Bio_Data): raise ValueError("Can only join Bio_Data objects") other = other.copy() for k, v in other.channels.items(): if k in self.channels.keys(): if overwrite: warnings.warn("Overwriting channel " + k) else: k = k + "_1" self.channels[k] = v.copy() return self
def __getitem__(self, key: Union[str, int]) -> Union[Channel, Event_Channel]: if isinstance(key, str): return self.channels[key] elif isinstance(key, int): return self.channels[list(self.channels.keys())[key]] def __setitem__(self, key, value): if isinstance(value, (Channel, Event_Channel)): self.channels[key] = value else: raise ValueError("Value must be a Channel or Event_Channel object") return None def __repr__(self) -> str: representation = "Signal object with " + str(self.channel_count) + " channel(s)\n" for k, v in self.channels.items(): representation += v.signal_name representation += " (" + str(v.sampling_rate) + "Hz)" # representation += " (" + str(v.signal_duration) + "s)" representation += " (" + str(v.n_windows) + " windows)" if isinstance(v, Channel): representation += " (" + str(v.channel.shape) + ")" representation += "\n" return representation def __eq__(self, other: object) -> bool: if isinstance(other, Bio_Data): if self.get_channel_names() != other.get_channel_names(): return False else: for k in self.get_channel_names(): if self[k] != other[k]: return False else: return True else: return False @property def channel_count(self): return len(self.channels) @property def multichannel(self): return self.channel_count > 1