q_lock experiment

This commit is contained in:
2025-09-30 09:21:30 -04:00
parent 32c8436713
commit cf033c4b4f
2 changed files with 283 additions and 133 deletions

View File

@@ -56,10 +56,11 @@ class ECGWaveTable:
data,
bc_type="natural",
)
self.data = np.array([
cs(x) for x in np.linspace(0, len(data), table_length)
])
self.data = np.array(
[cs(x) for x in np.linspace(0, len(data), table_length)]
)
# Scale the declared segments to the table_length
self.segments = Segments(
**{
k: int(v / (len(data) / table_length)) if v else v
@@ -80,9 +81,11 @@ class ECGWaveTable:
# Normalize between 0 and 1:
self.data = (self.data - bottom) / (top - bottom)
def __getitem__(self, k): return self.data[k]
def __getitem__(self, k):
return self.data[k]
def __len__(self): return len(self.data) # O(1)
def __len__(self):
return len(self.data) # O(1)
def linear_interpolation(
self,
@@ -107,9 +110,7 @@ class ECGWaveTable:
# b. 1 - 0 == 1 (all weight goes to floor)
floor_weight = 1 - ceiling_weight
return (
self[floor] * floor_weight + self[ceiling] * ceiling_weight
)
return self[floor] * floor_weight + self[ceiling] * ceiling_weight
def merge(
self,
@@ -127,59 +128,77 @@ class ECGWaveTable:
class ECGWaveTableSynthesizer(AECGSynthesizer):
def __init__(
self, /,
self,
/,
tables: Dict[Tuple[float, float], ECGWaveTable],
heart_rate: int,
srate: Optional[float] = None,
):
super().__init__(heart_rate, srate)
self.inc: float = 0.0
self.tables = tables
super().__init__(heart_rate, srate)
self._segments: Segments = None
self.phase = 0.0
self.q_lock = False
def samples(self):
index: float = 0.0
# phase: float = 0.0
inc: float = None
n: int = 0
idx: int = 0
heart_rate = self.heart_rate
while True:
if n > 0:
floor = math.floor(index)
else:
# `n` cannot be negative
floor = 0
index = 0.0
self._calibrate()
# NOTE: it's important that this `n` calculation occur before
# `yield` because the `heart_rate` (and, hence, the `wavelength`)
# may change in the meantime, which affects resetting to zero with
# the modulo operation below.
n = (n + 1) % self.wavelength
while idx < self.wavelength:
phase = self.phase
floor = math.floor(phase)
yield self.table.linear_interpolation(index, floor=floor)
yield self.table.linear_interpolation(phase, floor=floor)
if (self.heart_rate < 60
and
self.table.segments.T_P <= floor < self.table.segments.P):
if (
heart_rate < 60
and self.table.segments.T_P <= floor < self.table.segments.P
):
inc = self.brady_inc
elif (self.heart_rate > 60
and
self.table.segments.T <= floor < self.table.segments.Q):
logger.info(["brady", inc, self.inc])
self.q_lock = True
elif (
heart_rate > 60
and self.table.segments.S_T <= floor < self.table.segments.Q
):
# FIXME: this is probably only good below a certain
# `heart_rate` threshold.
# `heart_rate` threshold, because at some high frequency, even
# the QRS complex will not have enough room to complete.
inc = self.tachy_inc
self.q_lock = True
else:
inc = None
if self.q_lock:
phase = self.table.segments.Q
self.q_lock = False
logger.info(f"\n{[
self.table.linear_interpolation(phase, floor=floor),
inc,
self.inc,
self.table.segments.Q,
phase,
self.table.segments.S_T
]}")
index += inc if inc is not None else self.inc
index %= len(self.table)
phase += inc if inc is not None else self.inc
phase %= len(self.table)
self.phase = phase
idx += 1
@AECGSynthesizer.heart_rate.setter
def heart_rate(self, val):
AECGSynthesizer.heart_rate.fset(self, val)
def _calibrate(self):
heart_rate = self.heart_rate
def _recalibrate(self, heart_rate: float) -> None:
table_matches = {
k: v
for k, v in self.tables.items()
if k[0] <= heart_rate < k[1]
k: v for k, v in self.tables.items() if k[0] <= heart_rate < k[1]
}
if not table_matches:
@@ -223,26 +242,20 @@ class ECGWaveTableSynthesizer(AECGSynthesizer):
# Stretch only the T_P segment to compensate, rather than
# stretching the whole wave.
table_segment_length = \
self.table.segments.P - self.table.segments.T_P
table_segment_length = self.table.segments.P - self.table.segments.T_P
self.brady_inc = self.stretch_inc(table_segment_length)
# Preserve QRS-J-point; compress Jp-Q to compensate.
table_segment_length = \
self.table.segments.Q - self.table.segments.S_T
table_segment_length = self.table.segments.Q - self.table.segments.S_T
self.tachy_inc = self.stretch_inc(table_segment_length)
@AECGSynthesizer.heart_rate.setter
def heart_rate(self, val):
AECGSynthesizer.heart_rate.fset(self, val)
self._recalibrate(heart_rate=val)
def stretch_inc(self, table_segment_length):
# Get the missing samples by subtracting the number of samples
# contributed by the 1Hz table default, minus the segment we
# want to stretch.
tmp_wavelength = self.wavelength - \
(len(self.table) - table_segment_length) / self.inc
tmp_wavelength = (
self.wavelength - (len(self.table) - table_segment_length) / self.inc
)
return table_segment_length / tmp_wavelength

View File

@@ -7,105 +7,242 @@ from pojagi_dsp.channel.ecg import Segments
from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTable
# NOTE: larger table required for avoiding aliasing at different srates than 125Hz
sinus_data = [
sinus_data = np.array([
# R-S: 0
2000, 1822, 374,
2000,
1822,
374,
# S-Jp: 3
-474, -271, -28, 18, 66,
-474,
-271,
-28,
18,
66,
# Jp-T: 9
63, 73, 91, 101, 101, 101, 116, 124,
63,
73,
91,
101,
101,
101,
116,
124,
124,
# T: 17
141, 171, 186, 196, 229, 265, 297, 327,
363, 406, 446, 475, 493, 508, 526, 533,
518, 475, 403, 327, 272, 222, 174, 138,
109, 88, 73, 66, 69, 69, 66, 73,
81, 76, 73, 76, 76, 66, 58, 58,
63, 63, 41, 26, 26, 18, 8, 8,
8,
# U: 66 -- not found
# T-P: 66
2, 3, 2, 2, 2, -1, 2, 2,
2, -1, 0, -1, -1, 3, 2, 1,
3, 2, 1, 0, 1,
# P: 87
0, 3, 11, 11, 0, 8, 18, 18,
18, 15, 8, 18, 26, 26, 26, 8,
32, 61, 116, 164, 182, 159, 131, 116,
116, 109, 91, 73, 58, 55, 58, 63,
141,
171,
186,
196,
229,
265,
297,
327,
363,
406,
446,
475,
493,
508,
526,
533,
518,
475,
403,
327,
272,
222,
174,
138,
109,
88,
73,
66,
69,
69,
66,
73,
81,
76,
73,
76,
76,
66,
58,
58,
63,
63,
41,
26,
26,
18,
8,
8,
8,
# U: 66 -- not found
# T-P: 66
2,
3,
2,
2,
2,
-1,
2,
2,
2,
-1,
0,
-1,
-1,
3,
2,
1,
3,
2,
1,
0,
1,
# P: 87
0,
3,
11,
11,
0,
8,
18,
18,
18,
15,
8,
18,
26,
26,
26,
8,
32,
61,
116,
164,
182,
159,
131,
116,
116,
109,
91,
73,
58,
55,
58,
63,
69,
# P-R: 120
48, -14,
48,
-14,
# Q-R: 122
-40, 131, 931,
] # len == 125
-40,
131,
931,
]) # len == 125
tpr = np.arange(-math.floor(120-18)/2, math.ceil(120-18)/2)
tpr_curve: np.ndarray = (tpr ** 2 * -0.1)
tpr_curve = (tpr_curve - tpr_curve[0]) + 141
pr_idx = 120
t_idx = 18
t_pr_idx_diff = pr_idx - t_idx
t_pr = np.arange(-math.floor(t_pr_idx_diff / 2), math.ceil(t_pr_idx_diff / 2))
t_pr_curve: np.ndarray = t_pr**2 * -0.2
t_pr_curve = (t_pr_curve - t_pr_curve[0]) + 141
tachycardia = np.array([ # 58-107 flat
# R-S: 0
2000, 1822, 374,
# S-Jp: 3
-474, -271, -28, 18, 66,
# Jp-T: 9
63, 73, 91, 101, 101, 101, 116, 124,
124,
*tpr_curve,
# P-R: 119
124, 48, -14,
# Q-R: 122
*(np.array([-40, 131, 931]) * 0.25),
]) * (2/3) # len == 125
tachycardia = np.array(
[ # 58-107 flat
# R-S: 0
2000,
1822,
374,
# S-Jp: 3
-474,
-271,
-28,
18,
66,
# Jp-T: 8
63,
73,
91,
101,
101,
101,
116,
124,
124,
*t_pr_curve,
# P-R: 119
124,
48,
-14,
# Q-R: 122
-40,
731,
1231,
]
) # len == 125
def SinusWaveTable(): return ECGWaveTable(
data=sinus_data,
segments=Segments(
S=3,
S_T=9,
T=17,
T_P=66,
P=87,
P_R=120,
Q=122,
),
)
def SinusWaveTable():
return ECGWaveTable(
data=sinus_data,
segments=Segments(
S=3,
S_T=9,
T=17,
T_P=66,
P=87,
P_R=120,
Q=122,
),
)
def TachycardiaWaveTable(): return ECGWaveTable(
data=tachycardia,
segments=Segments(
S=3,
S_T=9,
T=17,
T_P=66,
P=87,
P_R=119,
Q=122,
),
top=2000,
bottom=0,
)
def TachycardiaWaveTable():
return ECGWaveTable(
data=tachycardia,
segments=Segments(
S=3,
S_T=8,
T=17,
T_P=66,
P=87,
P_R=119,
Q=122,
),
# Tachy is weaker than sinus, so we inflate the range here, which
# effectively attenuates the signal by 1/3 (i.e., it is 2/3 the
# original).
top=2000 * (3 / 2),
bottom=0,
)
impulse_data = np.array([1, *([0] * 124)])
def ImpulseWaveTable():
return ECGWaveTable(
data=impulse_data,
segments=Segments(
S=3,
S_T=9,
T=17,
T_P=66,
P=87,
P_R=119,
Q=122,
),
top=1,
bottom=0,
)
if __name__ == "__main__":
from matplotlib import pyplot as plt
plt.plot(range(len(tachycardia.tolist() * 3)), tachycardia.tolist() * 3)
samples = np.tile(SinusWaveTable().data, 3)
plt.plot(range(len(samples)), samples)
plt.show()