diff --git a/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py b/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py index 18ec385..7f79d87 100644 --- a/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py +++ b/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py @@ -56,10 +56,11 @@ class ECGWaveTable: data, bc_type="natural", ) - self.data = np.array([ - cs(x) for x in np.linspace(0, len(data), table_length) - ]) + self.data = np.array( + [cs(x) for x in np.linspace(0, len(data), table_length)] + ) + # Scale the declared segments to the table_length self.segments = Segments( **{ k: int(v / (len(data) / table_length)) if v else v @@ -80,9 +81,11 @@ class ECGWaveTable: # Normalize between 0 and 1: self.data = (self.data - bottom) / (top - bottom) - def __getitem__(self, k): return self.data[k] + def __getitem__(self, k): + return self.data[k] - def __len__(self): return len(self.data) # O(1) + def __len__(self): + return len(self.data) # O(1) def linear_interpolation( self, @@ -107,9 +110,7 @@ class ECGWaveTable: # b. 1 - 0 == 1 (all weight goes to floor) floor_weight = 1 - ceiling_weight - return ( - self[floor] * floor_weight + self[ceiling] * ceiling_weight - ) + return self[floor] * floor_weight + self[ceiling] * ceiling_weight def merge( self, @@ -127,59 +128,77 @@ class ECGWaveTable: class ECGWaveTableSynthesizer(AECGSynthesizer): def __init__( - self, /, + self, + /, tables: Dict[Tuple[float, float], ECGWaveTable], heart_rate: int, srate: Optional[float] = None, ): + super().__init__(heart_rate, srate) self.inc: float = 0.0 self.tables = tables - super().__init__(heart_rate, srate) - self._segments: Segments = None + self.phase = 0.0 + self.q_lock = False def samples(self): - index: float = 0.0 + # phase: float = 0.0 inc: float = None - n: int = 0 + idx: int = 0 + heart_rate = self.heart_rate - while True: - if n > 0: - floor = math.floor(index) - else: - # `n` cannot be negative - floor = 0 - index = 0.0 + self._calibrate() - # NOTE: it's important that this `n` calculation occur before - # `yield` because the `heart_rate` (and, hence, the `wavelength`) - # may change in the meantime, which affects resetting to zero with - # the modulo operation below. - n = (n + 1) % self.wavelength + while idx < self.wavelength: + phase = self.phase + floor = math.floor(phase) - yield self.table.linear_interpolation(index, floor=floor) + yield self.table.linear_interpolation(phase, floor=floor) - if (self.heart_rate < 60 - and - self.table.segments.T_P <= floor < self.table.segments.P): + if ( + heart_rate < 60 + and self.table.segments.T_P <= floor < self.table.segments.P + ): inc = self.brady_inc - elif (self.heart_rate > 60 - and - self.table.segments.T <= floor < self.table.segments.Q): + logger.info(["brady", inc, self.inc]) + self.q_lock = True + elif ( + heart_rate > 60 + and self.table.segments.S_T <= floor < self.table.segments.Q + ): # FIXME: this is probably only good below a certain - # `heart_rate` threshold. + # `heart_rate` threshold, because at some high frequency, even + # the QRS complex will not have enough room to complete. inc = self.tachy_inc + self.q_lock = True else: inc = None + if self.q_lock: + phase = self.table.segments.Q + self.q_lock = False + logger.info(f"\n{[ + self.table.linear_interpolation(phase, floor=floor), + inc, + self.inc, + self.table.segments.Q, + phase, + self.table.segments.S_T + ]}") - index += inc if inc is not None else self.inc - index %= len(self.table) + phase += inc if inc is not None else self.inc + phase %= len(self.table) + self.phase = phase + idx += 1 + + @AECGSynthesizer.heart_rate.setter + def heart_rate(self, val): + AECGSynthesizer.heart_rate.fset(self, val) + + def _calibrate(self): + heart_rate = self.heart_rate - def _recalibrate(self, heart_rate: float) -> None: table_matches = { - k: v - for k, v in self.tables.items() - if k[0] <= heart_rate < k[1] + k: v for k, v in self.tables.items() if k[0] <= heart_rate < k[1] } if not table_matches: @@ -223,26 +242,20 @@ class ECGWaveTableSynthesizer(AECGSynthesizer): # Stretch only the T_P segment to compensate, rather than # stretching the whole wave. - table_segment_length = \ - self.table.segments.P - self.table.segments.T_P + table_segment_length = self.table.segments.P - self.table.segments.T_P self.brady_inc = self.stretch_inc(table_segment_length) # Preserve QRS-J-point; compress Jp-Q to compensate. - table_segment_length = \ - self.table.segments.Q - self.table.segments.S_T + table_segment_length = self.table.segments.Q - self.table.segments.S_T self.tachy_inc = self.stretch_inc(table_segment_length) - @AECGSynthesizer.heart_rate.setter - def heart_rate(self, val): - AECGSynthesizer.heart_rate.fset(self, val) - self._recalibrate(heart_rate=val) - def stretch_inc(self, table_segment_length): # Get the missing samples by subtracting the number of samples # contributed by the 1Hz table default, minus the segment we # want to stretch. - tmp_wavelength = self.wavelength - \ - (len(self.table) - table_segment_length) / self.inc + tmp_wavelength = ( + self.wavelength - (len(self.table) - table_segment_length) / self.inc + ) return table_segment_length / tmp_wavelength diff --git a/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py b/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py index 2a9b56d..2d4582c 100644 --- a/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py +++ b/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py @@ -7,105 +7,242 @@ from pojagi_dsp.channel.ecg import Segments from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTable # NOTE: larger table required for avoiding aliasing at different srates than 125Hz -sinus_data = [ +sinus_data = np.array([ # R-S: 0 - 2000, 1822, 374, - + 2000, + 1822, + 374, # S-Jp: 3 - -474, -271, -28, 18, 66, - + -474, + -271, + -28, + 18, + 66, # Jp-T: 9 - 63, 73, 91, 101, 101, 101, 116, 124, + 63, + 73, + 91, + 101, + 101, + 101, + 116, + 124, 124, - # T: 17 - 141, 171, 186, 196, 229, 265, 297, 327, - 363, 406, 446, 475, 493, 508, 526, 533, - 518, 475, 403, 327, 272, 222, 174, 138, - 109, 88, 73, 66, 69, 69, 66, 73, - 81, 76, 73, 76, 76, 66, 58, 58, - 63, 63, 41, 26, 26, 18, 8, 8, - 8, - - # U: 66 -- not found - - # T-P: 66 - 2, 3, 2, 2, 2, -1, 2, 2, - 2, -1, 0, -1, -1, 3, 2, 1, - 3, 2, 1, 0, 1, - - # P: 87 - 0, 3, 11, 11, 0, 8, 18, 18, - 18, 15, 8, 18, 26, 26, 26, 8, - 32, 61, 116, 164, 182, 159, 131, 116, - 116, 109, 91, 73, 58, 55, 58, 63, + 141, + 171, + 186, + 196, + 229, + 265, + 297, + 327, + 363, + 406, + 446, + 475, + 493, + 508, + 526, + 533, + 518, + 475, + 403, + 327, + 272, + 222, + 174, + 138, + 109, + 88, + 73, + 66, + 69, + 69, + 66, + 73, + 81, + 76, + 73, + 76, + 76, + 66, + 58, + 58, + 63, + 63, + 41, + 26, + 26, + 18, + 8, + 8, + 8, + # U: 66 -- not found + # T-P: 66 + 2, + 3, + 2, + 2, + 2, + -1, + 2, + 2, + 2, + -1, + 0, + -1, + -1, + 3, + 2, + 1, + 3, + 2, + 1, + 0, + 1, + # P: 87 + 0, + 3, + 11, + 11, + 0, + 8, + 18, + 18, + 18, + 15, + 8, + 18, + 26, + 26, + 26, + 8, + 32, + 61, + 116, + 164, + 182, + 159, + 131, + 116, + 116, + 109, + 91, + 73, + 58, + 55, + 58, + 63, 69, - # P-R: 120 - 48, -14, - + 48, + -14, # Q-R: 122 - -40, 131, 931, -] # len == 125 + -40, + 131, + 931, +]) # len == 125 -tpr = np.arange(-math.floor(120-18)/2, math.ceil(120-18)/2) -tpr_curve: np.ndarray = (tpr ** 2 * -0.1) -tpr_curve = (tpr_curve - tpr_curve[0]) + 141 +pr_idx = 120 +t_idx = 18 +t_pr_idx_diff = pr_idx - t_idx +t_pr = np.arange(-math.floor(t_pr_idx_diff / 2), math.ceil(t_pr_idx_diff / 2)) +t_pr_curve: np.ndarray = t_pr**2 * -0.2 +t_pr_curve = (t_pr_curve - t_pr_curve[0]) + 141 -tachycardia = np.array([ # 58-107 flat - # R-S: 0 - 2000, 1822, 374, - - # S-Jp: 3 - -474, -271, -28, 18, 66, - - # Jp-T: 9 - 63, 73, 91, 101, 101, 101, 116, 124, - 124, - - *tpr_curve, - - # P-R: 119 - 124, 48, -14, - - # Q-R: 122 - *(np.array([-40, 131, 931]) * 0.25), - -]) * (2/3) # len == 125 +tachycardia = np.array( + [ # 58-107 flat + # R-S: 0 + 2000, + 1822, + 374, + # S-Jp: 3 + -474, + -271, + -28, + 18, + 66, + # Jp-T: 8 + 63, + 73, + 91, + 101, + 101, + 101, + 116, + 124, + 124, + *t_pr_curve, + # P-R: 119 + 124, + 48, + -14, + # Q-R: 122 + -40, + 731, + 1231, + ] +) # len == 125 -def SinusWaveTable(): return ECGWaveTable( - data=sinus_data, - segments=Segments( - S=3, - S_T=9, - T=17, - T_P=66, - P=87, - P_R=120, - Q=122, - ), -) +def SinusWaveTable(): + return ECGWaveTable( + data=sinus_data, + segments=Segments( + S=3, + S_T=9, + T=17, + T_P=66, + P=87, + P_R=120, + Q=122, + ), + ) -def TachycardiaWaveTable(): return ECGWaveTable( - data=tachycardia, - segments=Segments( - S=3, - S_T=9, - T=17, - T_P=66, - P=87, - P_R=119, - Q=122, - ), - top=2000, - bottom=0, -) + +def TachycardiaWaveTable(): + return ECGWaveTable( + data=tachycardia, + segments=Segments( + S=3, + S_T=8, + T=17, + T_P=66, + P=87, + P_R=119, + Q=122, + ), + # Tachy is weaker than sinus, so we inflate the range here, which + # effectively attenuates the signal by 1/3 (i.e., it is 2/3 the + # original). + top=2000 * (3 / 2), + bottom=0, + ) + +impulse_data = np.array([1, *([0] * 124)]) +def ImpulseWaveTable(): + return ECGWaveTable( + data=impulse_data, + segments=Segments( + S=3, + S_T=9, + T=17, + T_P=66, + P=87, + P_R=119, + Q=122, + ), + top=1, + bottom=0, + ) if __name__ == "__main__": from matplotlib import pyplot as plt - plt.plot(range(len(tachycardia.tolist() * 3)), tachycardia.tolist() * 3) - plt.show() \ No newline at end of file + samples = np.tile(SinusWaveTable().data, 3) + + plt.plot(range(len(samples)), samples) + plt.show()