Compare commits

...

4 Commits

SHA1 Message Date
3b85d58a10 remove debug print 2025-10-03 09:01:41 -04:00
d78b82c9e3 bugfixes, etc 2025-10-03 08:58:45 -04:00
cf033c4b4f q_lock experiment 2025-09-30 09:21:30 -04:00
32c8436713 cleaning up a little 2025-09-28 09:26:07 -04:00
21 changed files with 1133 additions and 1675 deletions

functional_tests/sine.py (new file)

@@ -0,0 +1,94 @@
import subprocess
import numpy as np
from pojagi_dsp.channel.generator.sine import SineWave
import sys
import datetime
from itertools import islice
from matplotlib import pyplot as plt
from scipy.io import wavfile
from pojagi_dsp.channel import Constantly
from pojagi_dsp.channel.filter.envelope import Envelope
SRATE = 44100
def test_sawtooth(
fundamental: float,
npartials: int = 10,
seconds: float = 1.0,
):
sine = Constantly(0, srate=SRATE)
for idx in range(1, npartials):
freq = fundamental * idx
amp = 1 / idx
        partial = SineWave(freq)  # SineWave takes (hz, phase, srate); no synchronize kwarg
partial *= amp
sine += partial
sine |= lambda g: (x / 3 for x in g)
sine |= Envelope(
[
(0, 0.0),
(int(SRATE * seconds / 2), 1.0),
(SRATE * seconds, 0.0),
]
)
values = []
for y in sine.of_duration(datetime.timedelta(seconds=seconds)):
values.append(y)
return values
def test_pitchbend(
from_pitch: float,
to_pitch: float,
seconds: float = 1.0,
):
sine = SineWave(hz=from_pitch, srate=SRATE)
sig = sine | Envelope(
[
(0, 0.0),
(int(SRATE * seconds / 2), 1.0),
(SRATE * seconds, 0.0),
]
)
lfo = (SineWave(1/seconds, srate=SRATE) * 30)
lfo += (SineWave(seconds) * 20)
lfo = lfo.stream()
values = []
for idx in range(int(seconds * SRATE)):
values.append(next(sig))
# if idx % 5000 == 0:
sine.hz = from_pitch + next(lfo)
# print(from_pitch + next(lfo))
# import time
# time.sleep(0.001)
return values
def do_test(values: list[float]):
plt.plot(range(len(values)), values)
plt.show()
audio = np.array(values, dtype=np.float32)
wavfile.write("/tmp/output.wav", SRATE, audio)
with subprocess.Popen(["mplayer", "/tmp/output.wav"]) as p:
p.wait()
if __name__ == "__main__":
# do_test(test_sawtooth(55.0))
do_test(test_pitchbend(110.0, 110.0, seconds=10))
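
A minimal standalone sketch of the additive synthesis used by test_sawtooth above: summing harmonics n = 1..npartials-1 at amplitude 1/n approximates a sawtooth (hypothetical snippet, not part of this diff):

import numpy as np

SRATE, F0, N = 44100, 55.0, 10
t = np.arange(SRATE) / SRATE  # one second of sample times
# sum of sin(2*pi*n*F0*t)/n, mirroring the partial loop in test_sawtooth
saw = sum(np.sin(2 * np.pi * n * F0 * t) / n for n in range(1, N))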


@@ -6,23 +6,15 @@ build-backend = "setuptools.build_meta"
name = "pojagi-dsp"
description = "DSP tools for load testing."
urls = { "gitlab" = "https://gitlab.pojagi.org/tjb1982/dsp" }
authors = [
{ name = "Tom Brennan", email = "tjb1982@gmail.com" },
]
authors = [{ name = "Tom Brennan", email = "tjb1982@gmail.com" }]
readme = "README.md"
requires-python = ">=3.8"
classifiers = []
dependencies = [
"pydantic==1.10.2",
"scipy==1.8.1",
# These should be included in the above requirement
# "marshmallow>=3.3.0",
# "marshmallow_dataclass>=7.2.1",
# "marshmallow_oneofschema>=2.0.1",
# "PyYAML>=5.3.1",
"pydantic~=1.10.2",
"scipy~=1.16.2",
]
optional-dependencies = { test = ["pytest"] }
optional-dependencies = { test = ["pytest"], dev = ["matplotlib", "scipy"] }
version = "0.0.0.dev0"
# dynamic = ["version"]
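
For reference, the dependency change above swaps exact pins (==) for compatible-release pins (~=): pydantic~=1.10.2 allows any 1.10.x at or above 1.10.2 but excludes 1.11. A quick check with the packaging library (illustrative, not part of this diff):

from packaging.specifiers import SpecifierSet

compatible = SpecifierSet("~=1.10.2")
assert "1.10.9" in compatible      # patch upgrades allowed
assert "1.11.0" not in compatible  # minor bump excluded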


@@ -1,337 +1 @@
import abc
import copy
import datetime
import inspect
from itertools import islice
import logging
import math
import operator
from collections.abc import Iterable
from functools import reduce
import types
from typing import (Any, Callable, Generic, Iterator, Optional, Type, TypeVar,
Union)
logger = logging.getLogger(__name__)
T = TypeVar("T")
class IllegalStateError(ValueError):
...
def coerce_channels(x: Any) -> Iterator["ASignal"]:
if isinstance(x, ASignal):
yield x
else:
if callable(x):
if isinstance(x, Type):
yield x()
else:
yield SignalFunction(x)
elif isinstance(x, Iterable): # and not isinstance(x, str):
for it in (coerce_channels(y) for y in x):
for channel in it:
yield channel
else:
yield Constantly(x)
class ASignalMeta(abc.ABCMeta):
def __or__(self, other: Any) -> "Filter":
"""
Allows `|` composition starting from an uninitialized class.
See doc for `__or__` below in `ASignal`.
"""
return self() | coerce_channels(other)
def __radd__(self, other): return self() + other
def __add__(self, other): return self() + other
def __rmul__(self, other): return self() * other
def __mul__(self, other): return self() * other
class ASignal(Generic[T], metaclass=ASignalMeta):
def __init__(self, srate: Optional[float] = None):
self._srate = srate
self._cursor: Optional[Iterator[T]] = None
@property
def srate(self):
if self._srate is None:
raise IllegalStateError(
f"{self.__class__}: `srate` is None."
)
return self._srate
@srate.setter
def srate(self, val: float): self._srate = val
def __iter__(self):
self._cursor = self.samples()
return self
def __next__(self): return next(self.cursor)
@abc.abstractmethod
def samples(self) -> Iterator[T]: ...
@property
def cursor(self):
"""
An `Iterator` representing the current pipeline in progress.
"""
if self._cursor is None:
# this can only happen once
self._cursor = self.samples()
return self._cursor
def __getstate__(self):
"""
`_cursor` is a generator, and generators aren't picklable.
"""
state = self.__dict__.copy()
if state.get("_cursor"):
del state["_cursor"]
return state
def stream(self):
while True:
try:
yield next(self.cursor)
except StopIteration:
self = iter(self)
def of_duration(self, duration: datetime.timedelta):
"""
Returns an `Iterator` of samples for a particular duration expressed
as a `datetime.timedelta`
:param:`duration` - `datetime.timedelta` representing the duration
"""
return islice(
self.stream(),
0,
math.floor(self.srate * duration.total_seconds()),
)
def __or__(
left,
right: Union["Filter", Callable, Iterable],
) -> "Filter":
"""
Allows composition of filter pipelines with `|` operator.
e.g.,
```
myFooGenerator
| BarFilter
| baz_filter_func
| (lambda reader: (x for x in reader))
```
"""
if isinstance(right, SignalFunction):
return left | FilterFunction(fn=right._fn, name=right.Function)
if not isinstance(right, ASignal):
return reduce(operator.or_, (left, *coerce_channels(right)))
if not isinstance(right, Filter):
raise ValueError(
f"Right side must be a `{Filter.__name__}`; "
f"received: {type(right)}",
)
filter: Filter = right
while getattr(filter, "_reader", None) is not None:
# Assuming this is a filter pipeline, we want the last node's
# reader to be whatever's on the left side of this operation.
filter = filter.reader
if hasattr(filter, "_reader"):
# We hit the "bottom" and found a filter.
filter.reader = left
else:
# We hit the "bottom" and found a non-filter/generator.
raise ValueError(
f"{right.__class__.__name__}: filter pipeline already has a "
"generator."
)
# Will often be `None` unless `left` is a generator.
right.srate = left._srate
return right
def __radd__(right, left): return right.__add__(left)
def __add__(left, right): return left._operator_impl(operator.add, right)
def __rmul__(right, left): return right.__mul__(left)
def __mul__(left, right): return left._operator_impl(operator.mul, right)
# FIXME: other operators? Also, shouldn't `*` mean convolve instead?
def _operator_impl(left, operator: Callable[..., T], right: Any):
channels = list(coerce_channels(right))
for channel in channels:
if channel._srate is None:
channel.srate = left._srate
return Reduce(operator, left, *channels, srate=left._srate)
def __repr__(self):
members = {}
for k in [k for k in dir(self)
if not k.startswith("_")
and not k in {"stream", "reader", "cursor", "wave", }]:
try:
v = getattr(self, k)
if not inspect.isroutine(v):
members[k] = v
except IllegalStateError as e:
members[k] = None
return (
f"{self.__class__.__name__}"
f"""({
f", ".join(
f"{k}={v}"
for k, v in members.items()
)
})"""
)
S = TypeVar("S", bound=ASignal)
class Reduce(ASignal, Generic[S, T]):
def __init__(
self,
# FIXME: typing https://stackoverflow.com/a/67814270
fn: Callable[..., T],
*streams: S,
srate: Optional[float] = None,
stateful=False,
):
super().__init__(srate)
self._fn = fn
self.fn = fn.__name__
self.streams = []
for stream in streams:
if stateful:
self.streams.append(stream)
continue
stream_ = (
copy.deepcopy(stream)
if not isinstance(stream, types.GeneratorType)
else stream
)
stream_.srate = srate
self.streams.append(stream_)
@property
def srate(self): return ASignal.srate.fget(self)
@srate.setter
def srate(self, val: float):
ASignal.srate.fset(self, val)
for stream in self.streams:
if isinstance(stream, ASignal):
stream.srate = val
def samples(self): return (
reduce(self._fn, args)
for args in zip(*self.streams)
)
class Filter(ASignal, Generic[S]):
def __init__(
self,
reader: Optional[S] = None,
srate: Optional[float] = None,
):
super().__init__(srate)
self.reader: Optional[S] = reader
@property
def reader(self) -> S:
"""
The input stream this filter reads.
"""
if not self._reader:
raise IllegalStateError(
f"{self.__class__}: `reader` is None."
)
return self._reader
@reader.setter
def reader(self, val: S):
self._reader = val
if val is not None and self._srate is None:
self.srate = val._srate
@property
def srate(self): return ASignal.srate.fget(self)
@srate.setter
def srate(self, val: float):
ASignal.srate.fset(self, val)
child = getattr(self, "_reader", None)
previous_srate = val
while child is not None:
# Since `srate` is optional at initialization, but required in
# general, we make our best attempt to normalize it for the
# filter pipeline, which should be consistent for most
# applications, by applying it to all children.
if child._srate is None:
child.srate = previous_srate
child: Optional[ASignal] = getattr(child, "_reader", None)
if isinstance(child, ASignal) and child._srate is not None:
previous_srate = child._srate
def samples(self) -> Iterator[T]: return self.reader.samples()
def __repr__(self):
return (
f"{self._reader} | {super().__repr__()}"
)
class FilterFunction(Filter, Generic[T, S]):
def __init__(
self,
fn: Callable[[S], Iterator[T]],
name: Optional[str] = None,
reader: Optional[S] = None,
srate: Optional[float] = None,
):
super().__init__(reader, srate)
self._fn = fn
self.Function = name if name else fn.__name__
def samples(self): return self._fn(self.reader)
class SignalFunction(ASignal, Generic[T]):
def __init__(
self,
fn: Callable[[int], Iterator[T]],
name: Optional[str] = None,
srate: Optional[float] = None,
):
super().__init__(srate)
self._fn = fn
self.Function = name if name else fn.__name__
def samples(self) -> Iterator[T]: return self._fn(self.srate)
class Constantly(ASignal, Generic[T]):
def __init__(self, constant: T, srate: float = 0.0):
super().__init__(srate)
self.constant = constant
def samples(self) -> Iterator[T]:
while True:
yield self.constant
from .signal import *


@@ -68,13 +68,13 @@ class Segments:
class AECGChannel(ASignal[Number]):
@property
@abc.abstractproperty
@abc.abstractmethod
def heart_rate(self) -> float:
"""Frequency of impulses/waves in bpm."""
...
@property
@abc.abstractproperty
@abc.abstractmethod
def wavelength(self) -> int:
"""
The number of samples in a complete impulse/wave cycle.
@@ -83,16 +83,3 @@ class AECGChannel(ASignal[Number]):
heart rate converted from bpm to Hz.
"""
...
@property
@abc.abstractproperty
def segments(self) -> Segments:
"""The analytical segments of the impulse/wave."""
...
@property
def wave(self) -> Iterator[Number]:
"""
Returns an iterator over a single ECG impulse/wave.
"""
return itertools.islice(self, 0, self.wavelength)
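
A worked instance of the wavelength relationship documented above (assuming wavelength == srate / (heart_rate / 60), per the docstring; the concrete implementation is not shown in this diff):

srate = 250.0                           # samples per second
heart_rate = 75.0                       # bpm, i.e. 75 / 60 == 1.25 Hz
wavelength = srate / (heart_rate / 60)  # 200 samples per impulse/wave cycle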


@@ -1,266 +1,4 @@
import dataclasses
import logging
import math
from numbers import Number
from typing import Dict, List, Optional, Tuple
import numpy as np
from scipy.interpolate import CubicSpline
from pojagi_dsp.channel.ecg import Segments
from pojagi_dsp.channel.ecg.generator import AECGSynthesizer
logger = logging.getLogger(__name__)
class ECGWaveTable:
"""
This type of wavetable is designed around the P and R. By
convention, R will always be equal to 1, and the baseline (P) will always
be 0. (That doesn't mean, however, that the other values can't cross these
boundaries. E.g., Q and S are often negative.)
"""
def __init__(
self,
data: List[Number],
segments: Segments,
bottom: Optional[Number] = None,
top: Optional[Number] = None,
table_length: int = 1 << 11, # 2048
):
"""
The table size is increased to `table_length` upon initialization
using linear interpolation (via `numpy.interp`).
"""
if len(data) == table_length:
self.data = np.array(data)
else:
# We generate a larger table for use with linear interpolation,
# trading time (CPU) for memory (table size).
#
# Here we use cubic spline interpolation instead of linear for
# wavetable construction, since it usually only happens once at
# startup, and should provide a much better quality table from
# limited data, making it possible to work with small, manually
# composed tables that we JIT convert to the larger table.
### FIXME: use the sinc function instead:
### f(x) = sin(x)/x where x =/= 0 and f(x) = 1 if x = 0
### you have to apply this scaled to each sample in the table and
### then add all of the resulting signals together.
### I think this is the same as summing the dft of each impulse
### as if the impulse is a member of a larger table.
cs = CubicSpline(
range(len(data)),
data,
bc_type="natural",
)
self.data = np.array([
cs(x) for x in np.linspace(0, len(data), table_length)
])
self.segments = Segments(
**{
k: int(v / (len(data) / table_length)) if v else v
for k, v in (
(f.name, getattr(segments, f.name))
for f in dataclasses.fields(segments)
)
},
)
# NOTE: these are not the data min/max, but the normal min/max, either
# provided as kwargs, or derived from P and R segment starts, by
# convention.
bottom = bottom if bottom is not None else data[segments.P]
top = top if top is not None else data[segments.R]
if not (0 == bottom and 1 == top):
# Normalize between 0 and 1:
self.data = (self.data - bottom) / (top - bottom)
def __getitem__(self, k): return self.data[k]
def __len__(self): return len(self.data) # O(1)
def linear_interpolation(
self,
index: float,
floor: Optional[int] = None,
ceiling: Optional[int] = None,
) -> float:
"""
Handles the situation where the floor would produce duplicate values,
which makes the waveform chunky with aliasing; instead, we obtain a
value weighted between the floor/ceiling, trading time (CPU) for
memory (table size).
"""
dl = len(self.data)
floor = floor if floor is not None else math.floor(index) % dl
ceiling = ceiling if ceiling is not None else (floor + 1) % dl
# e.g., a. 124.75 - 124 == 0.75
# b. 123 - 123 == 0 (no weight goes to ceiling)
ceiling_weight = index - floor
# e.g., a. 1 - 0.75 == 0.25
# b. 1 - 0 == 1 (all weight goes to floor)
floor_weight = 1 - ceiling_weight
return (
self[floor] * floor_weight + self[ceiling] * ceiling_weight
)
def merge(
self,
other: "ECGWaveTable",
weight: float,
):
self_weight = 1 - weight
return ECGWaveTable(
data=(self.data * self_weight + other.data * weight),
segments=self.segments.merge(other.segments, weight),
top=1,
bottom=0,
)
class ECGWaveTableSynthesizer(AECGSynthesizer):
def __init__(
self, /,
tables: Dict[Tuple[float, float], ECGWaveTable],
heart_rate: int,
srate: Optional[float] = None,
):
self.inc: float = 0.0
self.tables = tables
super().__init__(heart_rate, srate)
self._segments: Segments = None
def samples(self):
index: float = 0.0
inc: float = None
n: int = 0
while True:
if n > 0:
floor = math.floor(index)
else:
# `n` cannot be negative
floor = 0
index = 0.0
# NOTE: it's important that this `n` calculation occur before
# `yield` because the `heart_rate` (and, hence, the `wavelength`)
# may change in the meantime, which affects resetting to zero with
# the modulo operation below.
n = (n + 1) % self.wavelength
yield self.table.linear_interpolation(index, floor=floor)
if (self.heart_rate < 60
and
self.table.segments.T_P <= floor < self.table.segments.P):
inc = self.brady_inc
elif (self.heart_rate > 60
and
self.table.segments.T <= floor < self.table.segments.Q):
# FIXME: this is probably only good below a certain
# `heart_rate` threshold.
inc = self.tachy_inc
else:
inc = None
index += inc if inc is not None else self.inc
index %= len(self.table)
def _recalibrate(self, heart_rate: float) -> None:
table_matches = {
k: v
for k, v in self.tables.items()
if k[0] <= heart_rate < k[1]
}
if not table_matches:
raise ValueError(
f"No table found corresponding to heart rate: {heart_rate}."
)
# Since we may have more than two tables that match, we loop
# through all the matches, applying them in key order.
keys = iter(sorted(table_matches))
key = next(keys)
table = table_matches[key]
for next_key in keys:
next_table = table_matches[next_key]
if next_key[1] < key[1]:
# `next_key` is fully contained within `key`
floor, ceiling = next_key
next_weight = (heart_rate - floor) / (ceiling - floor)
weight = 1 - next_weight
if (heart_rate - floor) > ((ceiling - floor) / 2):
# Weights form an "X" shape; i.e., crossfade to 50%
# and back.
weight, next_weight = next_weight, weight
else:
floor = next_key[0] # i.e., the bottom of the top
ceiling = key[1] # i.e., the top of the bottom
next_weight = (heart_rate - floor) / (ceiling - floor)
table = table.merge(next_table, next_weight)
key = next_key
self.table = table
# ECG Tables are designed for 1Hz, and as a default, we don't want to
# stretch anything; hence, no reference to `self.heart_rate` here,
# instead constant 60:
self.inc = len(self.table) / (self.srate * (60 / 60))
# Stretch only the T_P segment to compensate, rather than
# stretching the whole wave.
table_segment_length = \
self.table.segments.P - self.table.segments.T_P
self.brady_inc = self.stretch_inc(table_segment_length)
# Preserve QRS-J-point; compress Jp-Q to compensate.
table_segment_length = \
self.table.segments.Q - self.table.segments.S_T
self.tachy_inc = self.stretch_inc(table_segment_length)
@AECGSynthesizer.heart_rate.setter
def heart_rate(self, val):
AECGSynthesizer.heart_rate.fset(self, val)
self._recalibrate(heart_rate=val)
def stretch_inc(self, table_segment_length):
# Get the missing samples by subtracting the number of samples
# contributed by the 1Hz table default, minus the segment we
# want to stretch.
tmp_wavelength = self.wavelength - \
(len(self.table) - table_segment_length) / self.inc
return table_segment_length / tmp_wavelength
@property
def segments(self):
if self._segments:
return self._segments
table_length = len(self.table)
table_segments = self.table.segments
# FIXME: this is a lie since we stretch T_P, etc.
self._segments = Segments(
**{
k: math.floor(v * self.srate / table_length) if v else v
for k, v in [
(f.name, getattr(table_segments, f.name))
for f in dataclasses.fields(table_segments)
]
}
)
return self._segments
from pojagi_dsp.channel.ecg.generator.wavetable.synthesizer import (
ECGWaveTableSynthesizer,
)
from pojagi_dsp.channel.ecg.generator.wavetable.wavetable import ECGWaveTable


@@ -1,83 +1,204 @@
import math
import numpy as np
from numbers import Number
import random
from typing import List
from pojagi_dsp.channel.ecg import Segments
from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTable
# NOTE: a larger table is required to avoid aliasing at srates other than 125 Hz
sinus_data = [
# R-S: 0
2000, 1822, 374,
# S-Jp: 3
-474, -271, -28, 18, 66,
# Jp-T: 9
63, 73, 91, 101, 101, 101, 116, 124,
124,
# T: 17
141, 171, 186, 196, 229, 265, 297, 327,
363, 406, 446, 475, 493, 508, 526, 533,
518, 475, 403, 327, 272, 222, 174, 138,
109, 88, 73, 66, 69, 69, 66, 73,
81, 76, 73, 76, 76, 66, 58, 58,
63, 63, 41, 26, 26, 18, 8, 8,
8,
# U: 66 -- not found
# T-P: 66
2, 3, 2, 2, 2, -1, 2, 2,
2, -1, 0, -1, -1, 3, 2, 1,
3, 2, 1, 0, 1,
# P: 87
0, 3, 11, 11, 0, 8, 18, 18,
18, 15, 8, 18, 26, 26, 26, 8,
32, 61, 116, 164, 182, 159, 131, 116,
116, 109, 91, 73, 58, 55, 58, 63,
69,
# P-R: 120
48, -14,
# Q-R: 122
-40, 131, 931,
] # len == 125
sinus_data = np.array(
[
# R-S: 0
2000,
1822,
374,
# S-Jp: 3
-474,
-271,
-28,
18,
66,
# Jp-T: 9
63,
73,
91,
101,
101,
101,
116,
124,
124,
# T: 17
141,
171,
186,
196,
229,
265,
297,
327,
363,
406,
446,
475,
493,
508,
526,
533,
518,
475,
403,
327,
272,
222,
174,
138,
109,
88,
73,
66,
69,
69,
66,
73,
81,
76,
73,
76,
76,
66,
58,
58,
63,
63,
41,
26,
26,
18,
8,
8,
8,
# U: 66 -- not found
# T-P: 66
2,
3,
2,
2,
2,
-1,
2,
2,
2,
-1,
0,
-1,
-1,
3,
2,
1,
3,
2,
1,
0,
1,
# P: 87
0,
3,
11,
11,
0,
8,
18,
18,
18,
15,
8,
18,
26,
26,
26,
8,
32,
61,
116,
164,
182,
159,
131,
116,
116,
109,
91,
73,
58,
55,
58,
63,
69,
# P-R: 120
48,
-14,
# Q-R: 122
-40,
131,
931,
]
) # len == 125
tpr = np.arange(-math.floor(120-18)/2, math.ceil(120-18)/2)
tpr_curve: np.ndarray = (tpr ** 2 * -0.1)
tpr_curve = (tpr_curve - tpr_curve[0]) + 141
tachycardia = np.array([  # 58-107 flat
    # R-S: 0
    2000, 1822, 374,
    # S-Jp: 3
    -474, -271, -28, 18, 66,
    # Jp-T: 9
    63, 73, 91, 101, 101, 101, 116, 124,
    124,
    *tpr_curve,
    # P-R: 119
    124, 48, -14,
    # Q-R: 122
    *(np.array([-40, 131, 931]) * 0.25),
]) * (2/3)  # len == 125
def parabolic_curve(t_idx, pr_idx):
    """Calculates a smooth, physiologically mimetic curve for the
    T-P-R segment of the ECG waveform.
    """
    # Compute the difference in indices to determine the segment length
    t_pr_idx_diff = pr_idx - t_idx
    # Generate a symmetric range of values centered around zero for the segment
    t_pr = np.arange(-math.floor(t_pr_idx_diff / 2), math.ceil(t_pr_idx_diff / 2))
    # Apply a parabolic transformation to create a smooth transition
    t_pr_curve: np.ndarray = t_pr**2 * -0.25
    # Normalize the curve so it starts at the T wave amplitude (141)
    return t_pr_curve - t_pr_curve[0]
tachycardia = np.array(
    [  # 58-107 flat
        # R-S: 0
        2000,
        1822,
        374,
        # S-Jp: 3
        -474,
        -271,
        -28,
        18,
        66,
        # Jp-T: 8
        63,
        73,
        91,
        101,
        101,
        101,
        116,
        124,
        124,
        # T: 17
        *parabolic_curve(17, 119) + 141,
        # P-R: 120
        124,
        48,
        -14,
        # Q-R: 122
        -40,
        731,
        1231,
    ]
)  # len == 125
def SinusWaveTable(): return ECGWaveTable(
    data=sinus_data,
    segments=Segments(
def SinusWaveTable():
    segments = Segments(
S=3,
S_T=9,
T=17,
@@ -85,20 +206,77 @@ def SinusWaveTable(): return ECGWaveTable(
P=87,
P_R=120,
Q=122,
),
)
)
def TachycardiaWaveTable(): return ECGWaveTable(
data=tachycardia,
segments=Segments(
return ECGWaveTable(
data=sinus_data,
segments=segments,
)
def TachycardiaWaveTable():
segments = Segments(
S=3,
S_T=9,
S_T=8,
T=17,
T_P=66,
P=87,
P_R=119,
Q=122,
),
top=2000,
bottom=0,
)
)
return ECGWaveTable(
data=tachycardia,
segments=segments,
# Tachy is weaker than sinus, so we inflate the range here by 3/2,
# which effectively attenuates the signal by 1/3 (i.e., it is 2/3 of
# the amplitude of the data definition).
top=2000 * (3 / 2),
bottom=0,
)
def FastTachycardiaWaveTable():
segments = Segments(
S=3,
S_T=8,
T=17,
T_P=66,
P=87,
P_R=119,
Q=122,
)
return ECGWaveTable(
data=np.arange(-50, 51) ** 11 / 1e19,
segments=segments,
tachy_compress=("R", "R"),
top=2,
bottom=0,
)
def ImpulseWaveTable():
return ECGWaveTable(
data=np.array([1, *([0] * 124)]),
segments=Segments(
S=3,
S_T=9,
T=17,
T_P=66,
P=87,
P_R=119,
Q=122,
),
top=1,
bottom=0,
)
if __name__ == "__main__":
from matplotlib import pyplot as plt
samples = np.tile(FastTachycardiaWaveTable().data, 3)
plt.plot(range(len(samples)), samples)
plt.show()


@@ -0,0 +1,163 @@
import numpy as np
from pojagi_dsp.channel.ecg.generator import AECGSynthesizer
from pojagi_dsp.channel.ecg.generator.wavetable.wavetable import (
ECGWaveTable,
)
class ECGWaveTableSynthesizer(AECGSynthesizer):
def __init__(
self,
/,
tables: dict[tuple[float, float], ECGWaveTable],
heart_rate: int,
srate: float | None = None,
):
super().__init__(heart_rate, srate)
self.inc: float = 0.0
self.tables = tables
self.table: ECGWaveTable | None = None
self.phase: float = 0.0
self.brady_start: int = 0
self.brady_end: int = 0
self.brady_inc: float = 0.0
self.tachy_start: int = 0
self.tachy_end: int = 0
self.tachy_inc: float = 0.0
def samples(self):
        inc: float | None = None
idx: int = 0
heart_rate = self.heart_rate
self._calibrate()
while idx < self.wavelength:
phase = self.phase
floor = np.floor(phase)
yield self.table.linear_interpolation(
phase, floor=floor
)
if heart_rate < 60 and (
self.brady_start <= phase < self.brady_end
):
inc = self.brady_inc
if phase + inc > self.brady_end:
inc = self.brady_end - phase
elif heart_rate > 60 and (
self.tachy_start <= phase < self.tachy_end
):
                # FIXME: this might only be good below a certain `heart_rate`
# threshold, because at some high frequency, even the QRS
# complex will not have enough room to complete.
inc = self.tachy_inc
if phase + inc > self.tachy_end:
inc = self.tachy_end - phase
else:
inc = None
phase += inc if inc is not None else self.inc
if phase > len(self.table):
phase = 0.0
self.phase = phase
idx += 1
@AECGSynthesizer.heart_rate.setter
def heart_rate(self, val):
AECGSynthesizer.heart_rate.fset(self, val)
print("setter", hex(id(self)))
def _calibrate(self):
heart_rate = self.heart_rate
table_matches = {
k: v
for k, v in self.tables.items()
if k[0] <= heart_rate < k[1]
}
if not table_matches:
raise ValueError(
f"No table found corresponding to heart rate: {heart_rate}."
)
# Since we may have more than two tables that match, we loop
# through all the matches, applying them in key order.
keys = iter(sorted(table_matches))
key = next(keys)
table = table_matches[key]
for next_key in keys:
next_table = table_matches[next_key]
if next_key[1] < key[1]:
# `next_key` is fully contained within `key`
floor, ceiling = next_key
next_weight = (heart_rate - floor) / (
ceiling - floor
)
weight = 1 - next_weight
if (heart_rate - floor) > (
(ceiling - floor) / 2
):
# Weights form an "X" shape; i.e., crossfade to 50%
# and back.
weight, next_weight = next_weight, weight
else:
floor = next_key[
0
] # i.e., the bottom of the top
ceiling = key[1] # i.e., the top of the bottom
next_weight = (heart_rate - floor) / (
ceiling - floor
)
table = table.merge(next_table, next_weight)
key = next_key
self.table = table
# ECG Tables are designed for 1Hz, and as a default, we don't want to
# stretch anything; hence, no reference to `self.heart_rate` here,
# instead constant 60:
self.inc = len(self.table) / (self.srate * (60 / 60))
self.brady_start, self.brady_end = (
getattr(self.table.segments, x)
for x in self.table.brady_stretch
)
        if self.table.brady_stretch[1] == "R":
            self.brady_end = len(self.table) - 1
# Stretch only the T_P segment to compensate, rather than
# stretching the whole wave.
table_segment_length = self.brady_end - self.brady_start
self.brady_inc = self.stretch_inc(table_segment_length)
self.tachy_start, self.tachy_end = (
getattr(self.table.segments, x)
for x in self.table.tachy_compress
)
if self.table.tachy_compress[1] == "R":
self.tachy_end = len(self.table) - 1
# Preserve QRS-J-point; compress Jp-Q to compensate.
table_segment_length = self.tachy_end - self.tachy_start
self.tachy_inc = self.stretch_inc(table_segment_length)
def stretch_inc(self, table_segment_length: int) -> float:
# Get the missing samples by subtracting the number of samples
# contributed by the 1Hz table default, minus the segment we
# want to stretch.
tmp_wavelength = (
self.wavelength
- (len(self.table) - table_segment_length) / self.inc
)
return table_segment_length / tmp_wavelength
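
A hedged usage sketch of the synthesizer above; the heart-rate range keys are illustrative, and SinusWaveTable/TachycardiaWaveTable are the factories from the wavetable tables module elsewhere in this diff:

import datetime

synth = ECGWaveTableSynthesizer(
    tables={
        (0.0, 120.0): SinusWaveTable(),          # covers resting rates
        (100.0, 300.0): TachycardiaWaveTable(),  # crossfaded in above 100 bpm
    },
    heart_rate=72,
    srate=250.0,
)
two_seconds = list(synth.of_duration(datetime.timedelta(seconds=2)))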


@@ -0,0 +1,182 @@
import dataclasses
from numbers import Number
import numpy as np
from scipy.interpolate import CubicSpline
from pojagi_dsp.channel.ecg import Segments
class ECGWaveTable:
"""
This type of wavetable is designed around the P and R. By
convention, R will always be equal to 1, and the baseline (P) will always
be 0. (That doesn't mean, however, that the other values can't cross these
boundaries. E.g., Q and S are often negative.)
"""
def __init__(
self,
/,
data: list[Number],
segments: Segments,
brady_stretch: tuple[str, str] | None = None,
tachy_compress: tuple[str, str] | None = None,
bottom: Number | None = None,
top: Number | None = None,
table_length: int = (1 << 10) * 2,
):
"""
Initialize an ECGWaveTable.
Args:
data (List[Number]): The raw waveform data points for one cardiac cycle.
segments (Segments): Segment indices (e.g., P, Q, R, S, T) marking key features in the waveform.
brady_stretch (tuple[str, str] | None): Segment interval to stretch for bradycardia (slow heart rate).
tachy_compress (tuple[str, str] | None): Segment interval to compress for tachycardia (fast heart rate).
bottom (Optional[Number]): Value to use as the baseline (P segment) for normalization. If None, uses data[segments.P].
top (Optional[Number]): Value to use as the peak (R segment) for normalization. If None, uses data[segments.R].
table_length (int): Number of samples in the expanded wavetable (default: 2048).
"""
if len(data) == table_length:
self.data = np.array(data)
else:
# We generate a larger table for use with linear interpolation,
# trading time (CPU) for memory (table size).
#
# Here we use cubic spline interpolation instead of linear for
# wavetable construction, since it usually only happens once at
# startup, and should provide a much better quality table from
# limited data, making it possible to work with small, manually
# composed tables that we JIT convert to the larger table.
### FIXME: use the sinc function instead:
### f(x) = sin(x)/x where x =/= 0 and f(x) = 1 if x = 0
### you have to apply this scaled to each sample in the table and
### then add all of the resulting signals together.
### I think this is the same as summing the dft of each impulse
### as if the impulse is a member of a larger table.
cs = CubicSpline(
range(len(data)),
data,
bc_type="natural",
)
self.data = np.array(
[
cs(x)
for x in np.linspace(
0, len(data), table_length
)
]
)
# Scale the declared segments to the table_length
self.segments = Segments(
**{
k: (
int(v / (len(data) / table_length))
if v
else v
)
for k, v in (
(f.name, getattr(segments, f.name))
for f in dataclasses.fields(segments)
)
},
)
# NOTE: these are not the data min/max, but the normal min/max, either
# provided as kwargs, or derived from P and R segment starts, by
# convention.
bottom = (
bottom if bottom is not None else data[segments.P]
)
top = top if top is not None else data[segments.R]
if not (0 == bottom and 1 == top):
# Normalize between 0 and 1:
self.data = (self.data - bottom) / (top - bottom)
self.brady_stretch = (
brady_stretch
if brady_stretch is not None
else ("S_T", "P")
)
self.tachy_compress = (
tachy_compress
if tachy_compress is not None
else ("S_T", "Q")
)
def __getitem__(self, k):
return self.data[k]
def __len__(self):
return len(self.data) # O(1)
def linear_interpolation(
self,
index: float,
floor: Number | None = None,
ceiling: Number | None = None,
) -> float:
"""
Returns a smoothly interpolated value from the wavetable at a fractional index.
Instead of returning discrete values (which can cause aliasing and a "chunky" waveform),
this method linearly interpolates between the nearest lower (floor) and upper (ceiling)
indices, weighted by their distance from the requested index. This improves waveform
smoothness and reduces artifacts when sampling at arbitrary positions.
Args:
index (float): The fractional index to sample.
floor (Optional[Number]): Override for the lower index (default: floor of index).
ceiling (Optional[Number]): Override for the upper index (default: floor + 1).
Returns:
float: The interpolated value at the given index.
"""
dl = len(self.data)
floor = (
floor if floor is not None else np.floor(index) % dl
)
ceiling = (
ceiling if ceiling is not None else (floor + 1) % dl
)
# e.g., a. 124.75 - 124 == 0.75
# b. 123 - 123 == 0 (no weight goes to ceiling)
ceiling_weight = index - floor
# e.g., a. 1 - 0.75 == 0.25
# b. 1 - 0 == 1 (all weight goes to floor)
floor_weight = 1 - ceiling_weight
return (
self[int(floor)] * floor_weight
+ self[int(ceiling)] * ceiling_weight
)
def merge(
self,
other: "ECGWaveTable",
weight: float,
):
self_weight = 1 - weight
return ECGWaveTable(
data=(self.data * self_weight + other.data * weight),
segments=self.segments.merge(other.segments, weight),
brady_stretch=(
other.brady_stretch
if self_weight < 0.5
else self.brady_stretch
),
tachy_compress=(
other.tachy_compress
if self_weight < 0.5
else self.tachy_compress
),
top=1,
bottom=0,
)
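
A standalone check of the floor/ceiling weighting performed by linear_interpolation above (hypothetical numbers, not part of the diff):

import numpy as np

data = np.arange(125, dtype=float)
index = 124.75
floor = int(np.floor(index) % len(data))  # 124
ceiling = (floor + 1) % len(data)         # wraps to 0
ceiling_weight = index - floor            # 0.75
floor_weight = 1 - ceiling_weight         # 0.25
value = data[floor] * floor_weight + data[ceiling] * ceiling_weight
# value == 124 * 0.25 + 0 * 0.75 == 31.0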


@@ -0,0 +1,37 @@
from typing import Optional
from pojagi_dsp.channel import Filter
class Envelope(Filter[float]):
def __init__(
self,
checkpoints: list[tuple[int, float]],
*,
srate=None,
reader=None,
):
super().__init__(reader, srate)
self.checkpoints = sorted(checkpoints)
def samples(self):
checkpoints = self.checkpoints
idx = 0
try:
n = 0
while True:
# Find current segment
while idx + 1 < len(checkpoints) and n >= checkpoints[idx + 1][0]:
idx += 1
if idx + 1 < len(checkpoints):
start, dest = checkpoints[idx]
end, end_dest = checkpoints[idx + 1]
t = (n - start) / (end - start) if end > start else 0.0
value = dest + (end_dest - dest) * t
else:
value = checkpoints[-1][1]
yield next(self.reader.stream()) * value
n += 1
except StopIteration:
...
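
A short usage sketch of Envelope: checkpoints are (sample_index, gain) breakpoints, linearly interpolated between neighbors (illustrative; mirrors the call in functional_tests/sine.py above):

from pojagi_dsp.channel.filter.envelope import Envelope
from pojagi_dsp.channel.generator.sine import SineWave

SRATE = 44100
sig = SineWave(220.0, srate=SRATE) | Envelope(
    [
        (0, 0.0),           # start silent
        (SRATE // 2, 1.0),  # fade in over half a second
        (SRATE, 0.0),       # fade back out by one second
    ]
)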


@@ -12,14 +12,15 @@ class SineWave(ASignal[float]):
self,
hz: float,
phase: float = 0.0, # radians
srate: Optional[float] = None
srate: Optional[float] = None,
):
super().__init__(srate)
self.hz = hz
self.phase = phase
@property
def wavelength(self): return self.srate/self.hz
def wavelength(self):
return self.srate / self.hz
def samples(self):
"""An iterator over one period."""
@@ -27,7 +28,7 @@ class SineWave(ASignal[float]):
self.phase %= _2_PI
while self.phase < _2_PI:
inc = (_2_PI * self.hz)/self.srate
inc = (_2_PI * self.hz) / self.srate
yield math.sin(self.phase)
self.phase += inc
@@ -44,7 +45,9 @@ if __name__ == "__main__":
# for _ in range(10):
# values += list(sine)
for y in sine.of_duration(datetime.timedelta(milliseconds=10)):
for y in sine.of_duration(
datetime.timedelta(milliseconds=10)
):
values.append(y)
plt.plot(range(len(values)), values)


@@ -0,0 +1,361 @@
import abc
import copy
import datetime
import inspect
import logging
import math
import operator
import types
from collections.abc import Iterable
from functools import reduce
from itertools import islice
from typing import Any, Callable, Generic, Iterator, Optional, Type, TypeVar, Union
logger = logging.getLogger(__name__)
T = TypeVar("T")
class IllegalStateException(ValueError): ...
def coerce_channels(x: Any) -> Iterator["ASignal"]:
if isinstance(x, ASignal):
yield x
else:
if callable(x):
if isinstance(x, Type):
yield x()
else:
yield SignalFunction(x)
elif isinstance(x, Iterable): # and not isinstance(x, str):
for it in (coerce_channels(y) for y in x):
for channel in it:
yield channel
else:
yield Constantly(x)
class ASignalMeta(abc.ABCMeta):
def __or__(self, other: Any) -> "Filter":
"""
Allows `|` composition starting from an uninitialized class.
See doc for `__or__` below in `ASignal`.
"""
return self() | coerce_channels(other)
def __radd__(self, other):
return self() + other
def __add__(self, other):
return self() + other
def __rmul__(self, other):
return self() * other
def __mul__(self, other):
return self() * other
class ASignal(Generic[T], metaclass=ASignalMeta):
def __init__(self, srate: Optional[float] = None):
self._srate = srate
self._cursor: Optional[Iterator[T]] = None
@property
def srate(self):
if self._srate is None:
raise IllegalStateException(f"{self.__class__}: `srate` is None.")
return self._srate
@srate.setter
def srate(self, val: float):
self._srate = val
def __iter__(self):
self._cursor = self.samples()
return self
def __next__(self):
return next(self.cursor)
@abc.abstractmethod
def samples(self) -> Iterator[T]: ...
@property
def cursor(self):
"""
An `Iterator` representing the current pipeline in progress.
"""
if self._cursor is None:
# this can only happen once
self._cursor = self.samples()
return self._cursor
def __getstate__(self):
"""
`_cursor` is a generator, and generators aren't picklable.
"""
state = self.__dict__.copy()
if state.get("_cursor"):
del state["_cursor"]
return state
def stream(self):
while True:
try:
yield next(self.cursor)
except StopIteration:
self = iter(self)
def of_duration(self, duration: datetime.timedelta):
"""
Returns an `Iterator` of samples for a particular duration expressed
as a `datetime.timedelta`
:param:`duration` - `datetime.timedelta` representing the duration
"""
return islice(
self.stream(),
0,
math.floor(self.srate * duration.total_seconds()),
)
def __or__(
left,
right: Union["Filter", Callable, Iterable],
) -> "Filter":
"""
Allows composition of filter pipelines with `|` operator.
e.g.,
```
myFooGenerator
| BarFilter
| baz_filter_func
| (lambda reader: (x for x in reader))
```
"""
if isinstance(right, SignalFunction):
return left | FilterFunction(fn=right._fn, name=right.Function)
if not isinstance(right, ASignal):
return reduce(operator.or_, (left, *coerce_channels(right)))
if not isinstance(right, Filter):
raise ValueError(
f"Right side must be a `{Filter.__name__}`; "
f"received: {type(right)}",
)
filter: Filter = right
while getattr(filter, "_reader", None) is not None:
# Assuming this is a filter pipeline, we want the last node's
# reader to be whatever's on the left side of this operation.
filter = filter.reader
if hasattr(filter, "_reader"):
# We hit the "bottom" and found a filter.
filter.reader = left
else:
# We hit the "bottom" and found a non-filter/generator.
raise ValueError(
f"{right.__class__.__name__}: filter pipeline already has a "
"generator."
)
# Will often be `None` unless `left` is a generator.
right.srate = left._srate
return right
def __radd__(right, left):
return right.__add__(left)
def __add__(left, right):
return left._operator_impl(operator.add, right)
def __rmul__(right, left):
return right.__mul__(left)
def __mul__(left, right):
return left._operator_impl(operator.mul, right)
# FIXME: other operators? Also, shouldn't `*` mean convolve instead?
def _operator_impl(left, operator: Callable[..., T], right: Any):
channels = list(coerce_channels(right))
for channel in channels:
if channel._srate is None:
channel.srate = left._srate
return Reduce(operator, left, *channels, srate=left._srate)
def __repr__(self):
members = {}
for k in [
k
for k in dir(self)
if not k.startswith("_")
and not k
in {
"stream",
"reader",
"cursor",
"wave",
}
]:
try:
v = getattr(self, k)
if not inspect.isroutine(v):
members[k] = v
except IllegalStateException as e:
members[k] = None
return (
f"{self.__class__.__name__}"
f"""({
f", ".join(
f"{k}={v}"
for k, v in members.items()
)
})"""
)
S = TypeVar("S", bound=ASignal)
class Reduce(ASignal, Generic[S, T]):
def __init__(
self,
# FIXME: typing https://stackoverflow.com/a/67814270
fn: Callable[..., T],
*streams: S,
srate: Optional[float] = None,
stateful=True,
):
super().__init__(srate)
self._fn = fn
self.fn = fn.__name__
self.streams = []
for stream in streams:
if stateful:
self.streams.append(stream)
continue
stream_ = (
copy.deepcopy(stream)
if not isinstance(stream, types.GeneratorType)
else stream
)
stream_.srate = srate
self.streams.append(stream_)
@property
def srate(self):
return ASignal.srate.fget(self)
@srate.setter
def srate(self, val: float):
ASignal.srate.fset(self, val)
for stream in self.streams:
if isinstance(stream, ASignal):
stream.srate = val
def samples(self):
return (reduce(self._fn, args) for args in zip(*self.streams))
class Filter(ASignal, Generic[S]):
def __init__(
self,
reader: Optional[S] = None,
srate: Optional[float] = None,
):
super().__init__(srate)
self.reader: Optional[S] = reader
@property
def reader(self) -> S:
"""
The input stream this filter reads.
"""
if not self._reader:
raise IllegalStateException(f"{self.__class__}: `reader` is None.")
return self._reader
@reader.setter
def reader(self, val: S):
self._reader = val
if val is not None and self._srate is None:
self.srate = val._srate
@property
def srate(self):
return ASignal.srate.fget(self)
@srate.setter
def srate(self, val: float):
ASignal.srate.fset(self, val)
child = getattr(self, "_reader", None)
previous_srate = val
while child is not None:
# Since `srate` is optional at initialization, but required in
# general, we make our best attempt to normalize it for the
# filter pipeline, which should be consistent for most
# applications, by applying it to all children.
if child._srate is None:
child.srate = previous_srate
child: Optional[ASignal] = getattr(child, "_reader", None)
if isinstance(child, ASignal) and child._srate is not None:
previous_srate = child._srate
def samples(self) -> Iterator[T]:
"""The below is a default implementation, but this is meant to be
        overridden.
"""
return self.reader.samples()
def __repr__(self):
return f"{self._reader} | {super().__repr__()}"
class FilterFunction(Filter, Generic[T, S]):
def __init__(
self,
fn: Callable[[S], Iterator[T]],
name: Optional[str] = None,
reader: Optional[S] = None,
srate: Optional[float] = None,
):
super().__init__(reader, srate)
self._fn = fn
self.Function = name if name else fn.__name__
def samples(self):
return self._fn(self.reader)
class SignalFunction(ASignal, Generic[T]):
def __init__(
self,
fn: Callable[[int], Iterator[T]],
name: Optional[str] = None,
srate: Optional[float] = None,
):
super().__init__(srate)
self._fn = fn
self.Function = name if name else fn.__name__
def samples(self) -> Iterator[T]:
return self._fn(self.srate)
class Constantly(ASignal, Generic[T]):
def __init__(self, constant: T, srate: float = 0.0):
super().__init__(srate)
self.constant = constant
def samples(self) -> Iterator[T]:
while True:
yield self.constant
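
A hedged sketch of the composition operators defined above: | chains filters, while + and * mix channels through Reduce; a plain callable on the right of | is coerced into a FilterFunction (module paths as used elsewhere in this diff):

import datetime
from pojagi_dsp.channel import Constantly
from pojagi_dsp.channel.generator.sine import SineWave

sig = SineWave(440.0, srate=44100) * 0.5 + Constantly(0.25)
sig |= lambda reader: (x * 2 for x in reader)  # callable -> FilterFunction
ms = list(sig.of_duration(datetime.timedelta(milliseconds=1)))  # 44 samples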


@@ -1,295 +0,0 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
from typing import Iterator, List, Literal, Optional, Type, Union
from physiq_cloud.series_frame import fb_builder
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiq_cloud.time import Instant, TimeUnit
from physiqfb.SeriesFrame import SeriesFrame
from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class SamplingSetInfo(BaseModel):
id: int
alias: str
timestamped: bool
# FIXME: not in the INFO.yaml, but isn't this just the `frame_size_micros`
# divided by the number of values in the channel data? We could either get
# it from the first frame, or look it up in the API/repo.
# frequency: int
class SftInfo(BaseModel):
alias: Optional[str]
frame_size_micros: int
max_frame_size_bytes: int
sampling_sets: List[SamplingSetInfo]
@dataclass(frozen=True)
class SeriesFramePackage:
frame: FlatBufferWrapper[SeriesFrame]
sft_info: SftInfo
def relative_instant(self, offset=0):
return Instant.from_unix(
TimeUnit.MICROSECONDS,
(self.frame.fb.FrameId() + offset)
* self.sft_info.frame_size_micros,
)
@property
def start(self):
return self.relative_instant(0)
@property
def end(self):
return self.relative_instant(1)
class ASeriesFrameEmitter(ABC):
"""
Abstract class defining the characteristics of a series frame emitter,
which is designed to be subclassed by all generators and filters.
A "generator" is a type of emitter that produces frames from "nothing";
i.e., by some external process, such as reading a file, an API, etc.; or
by generating frames algorithmically.
A "filter" is any emitter that takes another emitter and affects its
:meth:`frames` in some arbitrary way. This terminology is borrowed from
the digital signal processing domain, and doesn't imply that the signal
will be reduced or shortened, etc. In fact the signal coming from the
injected emitter might be lengthened or amplified by a filter to any
extent, including the maximum or infinity, etc.
This class defines an abstract property called `frames` that should
trigger the pipeline to begin and return an `Iterator` of
:class:`SeriesFramePackage`.
These emitters are designed to be chained together with filters. E.g., a
typical scenario would be to start with some generator, and chain it
together with one or more filters:
```
my_generator = SomeGenerator()
my_first_filter = SomeFilter(reader=my_generator)
my_second_filter = SomeOtherFilter(reader=my_first_filter)
for frames in my_second_filter.frames: ...
```
Alternatively, the `__or__` method has been overloaded so that you can
also do something equivalent to the above like this:
```
for frame in (
SomeGenerator()
| SomeFilter
| SomeOtherFilter
): ...
```
NOTE: Accessing `frames` may trigger side effects; it starts a chain of
calls to the reader accessors in the pipeline, which restarts the pipeline
anew each time. Use :meth:`cursor` to access the existing `Iterator` if
you need multiple accesses.
i.e., calling
```
next(reader.cursor)
```
is different than
```
next(reader.frames())
```
or
```
next(reader.stream())
```
in that calling `stream` is like making a database query (that returns and
caches a cursor), and `cursor` is just providing access to the cached
cursor, while calling `frames` directly provides an Iterator without
caching a cursor.
That said, :meth:`cursor` is empty until :meth:`stream` is called for the
first time. (It requires the initial "query" that :meth:`frames` provides.)
"""
def __init__(self, sft_info: Optional[SftInfo] = None) -> None:
"""
:param:`sft_info` - an :class:`SftInfo` describing the series frame
type this emitter emits.
"""
super().__init__()
self._sft_info = sft_info
@property
def sft_info(self):
if not self._sft_info:
raise RuntimeError(
"Illegal state: `_sft_info` is None."
)
return self._sft_info
@sft_info.setter
def sft_info(self, val):
self._sft_info = val
def __iter__(self): return self.stream()
@abstractmethod
def frames(self) -> Iterator[SeriesFramePackage]: ...
@property
def cursor(self):
"""
An `Iterator` representing the current pipeline in progress.
"""
return self._cursor
def stream(self):
"""
Start the pipeline and return an :class:`Iterator` of
:class:`SeriesFramePackage`.
Each time :meth:`frames` is accessed, a new `Iterator` is instantiated by
accessing the `reader`'s `frames`, which may trigger side effects. It
will also start the process pipeline over, so if you need to iterate
over the frames with multiple calls within a subclass of this, you should
use :meth:`cursor` instead, which returns the same object returned from this
method, without restarting the pipeline.
"""
self._cursor = self.frames()
return self.cursor
def __or__(
self,
right: Union["SeriesFrameFilter", Type["SeriesFrameFilter"]],
) -> "SeriesFrameFilter":
if callable(right):
right = right()
right.reader = self
right.sft_info = self.sft_info
return right
def sfhls(
self,
max_bytelen: int,
) -> Iterator[FlatBufferWrapper[SeriesFrameHolderList]]:
"""
Repackage input frames as a series frame holder list (sfhl) iterator.
:param:`max_bytelen` - the max byte length allowed per sfhl.
"""
chunk = list()
chunk_bytelen = 0
for pkg in self.frames():
if chunk_bytelen + len(pkg.frame.bytes) > max_bytelen:
if not chunk:
break
yield sfhl_from_frames(chunk)
chunk_bytelen = chruncate_frames(chunk)
chunk_bytelen += append_frame(chunk, pkg.frame)
if chunk:
yield sfhl_from_frames(chunk)
chunk_bytelen = chruncate_frames(chunk)
class SeriesFrameFilter(ASeriesFrameEmitter):
"""
Class defining the characteristics of a filter, which is a kind of emitter.
The difference between a filter and an emitter is:
1. an emitter doesn't know how to provide `frames` (no concrete
implementation)
2. a filter assumes it will read `frames` from another injected emitter
(called a `reader`), and by default simply returns those `frames`
unadulterated.
"""
def __init__(
self,
reader: Optional[ASeriesFrameEmitter] = None,
**kwargs
) -> None:
"""
:param:`reader` - input stream this filter reads.
"""
super().__init__(**kwargs)
self._reader = reader
self._cursor: Iterator[SeriesFramePackage] = iter(())
@property
def reader(self) -> ASeriesFrameEmitter:
"""
The input stream this filter reads.
"""
if self._reader is None:
raise RuntimeError("Illegal state: `reader` is None.")
return self._reader
@reader.setter
def reader(self, val):
self._reader = val
def frames(self):
"""
This has to exist because it's subclassing an abstract class that
declares it, but a concrete implementation of this would be confusing.
Generally speaking, implementations should read the `self.reader` and
do something to modify the frames.
"""
raise NotImplementedError()
def sfhl_from_frames(
frames: Iterator[FlatBufferWrapper[SeriesFrame]]
) -> FlatBufferWrapper[SeriesFrameHolderList]:
builder = fb_builder.Builder(0)
builder.Finish(
fb_builder.CreateSeriesFrameHolderList(
builder=builder,
frames=[
fb_builder.CreateSeriesFrameHolder(
builder=builder,
data=frame.bytes,
)
for frame in frames
]
)
)
return FlatBufferWrapper(
_bytes=builder.Output(),
schema=SeriesFrameHolderList,
)
def append_frame(
chunk: List[FlatBufferWrapper[SeriesFrame]],
frame: FlatBufferWrapper[SeriesFrame],
) -> int:
"""
Appends a frame to the given chunk and returns the bytelen it appended.
"""
chunk.append(frame)
return len(frame.bytes)
def chruncate_frames(
chunk: List[FlatBufferWrapper[SeriesFrame]],
) -> Literal[0]:
"""
"Truncates" a "chunk" of frames to zero. Returns bytelen (constantly `0`)
to be consistent with :func:`append_frame`.
"""
chunk.clear()
return 0


@@ -1,106 +0,0 @@
from dataclasses import dataclass
import logging
from pathlib import Path
from typing import Dict, List, Optional
import yaml
from physiq_cloud.time import Instant, Interval, TimeUnit
from pydantic import BaseModel, validator
from pojagi_dsp.series_frame import SftInfo
from pojagi_dsp.series_frame.generator.sampler import SftInterval
logger = logging.getLogger(__name__)
class ExcerptType(BaseModel):
series_frame_type: str # vci-vitalpatch-telemetry
sft_interval: Optional[SftInterval]
class Excerpt(BaseModel):
account: str # uuid4
id: str # patient1
interval: Interval # "iso/iso"
organization: str # "maps"
source_id: str # "maps.physiq.io"
types: List[ExcerptType]
@validator("interval", pre=True)
def timespan_to_interval(cls, timespan: str): return Interval(
**{
["start", "end_exclusive"][idx]: Instant.parse_iso8601(stamp)
for idx, stamp in enumerate(timespan.split("/"))
}
)
class ExtBundleConfig(BaseModel):
excerpts: List[Excerpt]
id: str # clinical-event-trigger
type: str = "PhysIQCloud"
@dataclass(frozen=True)
class BundleConfig:
excerpts: Dict[str, Excerpt]
id: str
@staticmethod
def create(bundle_dir: Path) -> "BundleConfig":
yaml_files = (x.resolve() for x in bundle_dir.glob("**/*.yaml"))
try:
# the first yaml you encounter is BUNDLE_CONFIG.yaml
ext_bundle_config = ExtBundleConfig.parse_obj(
yaml.safe_load(next(yaml_files).read_text())
)
except StopIteration as si:
raise RuntimeError(
f"No yaml files found under \"{bundle_dir}\"."
) from si
# the rest are INFO.yaml files
info_files = list(yaml_files)
for excerpt in ext_bundle_config.excerpts:
logger.debug(f"Loading excerpt \"{excerpt.id}\".")
for type_ in excerpt.types:
logger.debug(f"Loading SFT \"{type_.series_frame_type}\".")
# Make sure the info file belongs to this excerpt
INFO = next(
x for x in info_files
if x.as_posix()
.split("/")[-3]
.startswith(excerpt.id)
if x.as_posix()
.split("/")[-2]
.startswith(type_.series_frame_type)
)
sft_info = SftInfo.parse_obj(
yaml.safe_load(INFO.read_text())
)
sft_info.alias = type_.series_frame_type
type_.sft_interval = SftInterval(
info=sft_info,
interval=excerpt.interval,
frame_count=(
excerpt.interval
.to_duration()
.to_unix(TimeUnit.MICROSECONDS)
// sft_info.frame_size_micros
),
files=sorted(INFO.parent.glob("**/*.sfhl")),
)
return BundleConfig(
id=ext_bundle_config.id,
excerpts={
x.id: x
for x in ext_bundle_config.excerpts
},
)


@@ -1,40 +0,0 @@
import logging
import math
from typing import Iterator, Union
from pojagi_dsp.series_frame import (SeriesFrameFilter,
SeriesFramePackage)
logger = logging.getLogger(__name__)
class Looping(SeriesFrameFilter):
"""
Filter that loops through the input reader's frames until it reaches
the given :param:`iterations`
"""
def __init__(
self,
iterations: Union[int, float] = math.inf,
**kwargs,
) -> None:
"""
:param:`iterations` - number of times to loop through the `frames`.
"""
super().__init__(**kwargs)
self.iterations = iterations
self.iteration = 0
def increment(self) -> None:
"""Override to react to `iteration` increment."""
self.iteration += 1
logger.debug(
f"Loop incremented ({self.sft_info.alias}): {self.iteration}"
)
def frames(self) -> Iterator[SeriesFramePackage]:
while self.iteration < self.iterations:
for pkg in self.reader.frames():
yield pkg
self.increment()


@@ -1,277 +0,0 @@
import functools
import logging
from typing import Any, Iterator, List, Optional
import flatbuffers
import numpy as np
from physiq_cloud.series_frame import fb_builder
from physiq_cloud.series_frame.fb.math import ceildiv
from physiq_cloud.series_frame.fb_to_numpy import from_channel_data
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiq_cloud.time import Instant, TimeUnit
from physiqfb.ChannelData import ChannelData
from physiqfb.ReadingType import ReadingType
from physiqfb.SamplingSetData import SamplingSetData
from physiqfb.SeriesFrame import SeriesFrame
from pydantic import BaseModel
from pojagi_dsp.series_frame import (
ASeriesFrameEmitter,
SeriesFrameFilter,
SeriesFramePackage,
SftInfo,
)
logger = logging.getLogger(__name__)
class Synchronize(BaseModel):
hour: bool = False
minute: bool = False
second: bool = False
microsecond: bool = False
class Replaying(SeriesFrameFilter):
"""
Filter that replays input frames starting at a given :class:`Instant`
in time.
"""
def __init__(
self,
emit_start: Instant = Instant.now(),
synchronize: Optional[Synchronize] = None,
**kwargs,
) -> None:
"""
:param:`emit_start` - instant in time to replay the frames.
:param:`synchronize` - Whether and at what granularity to synchronize the Instant from the source
data with the configured :param:`emit_start`. This will alter the precision of the
emit start seconds depending on the level of granularity you choose,
possibly drastically.
"""
super().__init__(**kwargs)
self.frame_idx = 0
self.emit_start = emit_start
self.synchronize = synchronize
self._first_frame_id: int | None = None
self._start_frame_id_inst: int | None = None
@ASeriesFrameEmitter.sft_info.setter
def sft_info(self, val: SftInfo):
"""
        This overrides :class:`ASeriesFrameEmitter`'s :meth:`sft_info`
setter, so that we can initialize the `timestamped_map` property.
"""
self._sft_info = val
self.timestamped_map = {
s.id: s.timestamped for s in self.sft_info.sampling_sets
}
@property
def start_frame_id(self):
"""
:meth:`frames` must be called before accessing this for the first time,
because the calculation depends on the first frame.
"""
if self._start_frame_id_inst is not None:
return self._start_frame_id_inst
if not self.synchronize or not any(v for _, v in self.synchronize):
self.synchronized_emit_start = self.emit_start
else:
emit_start_dt = self.emit_start.to_datetime()
source_start_dt = Instant.from_unix(
TimeUnit.MICROSECONDS,
self._first_frame_id * self.sft_info.frame_size_micros,
).to_datetime()
self.synchronized_emit_start = Instant.from_datetime(
emit_start_dt.replace(
**{
k: getattr(source_start_dt, k)
if getattr(self.synchronize, k)
else getattr(emit_start_dt, k)
for k in ["hour", "minute", "second", "microsecond"]
},
),
)
# We use ceildiv to calculate the first frame ID, because normal
# division would likely yield a frame BEFORE the emit_start. We want to
# start emitting frames at or after emit_start, and ceildiv does that
# for us.
self._start_frame_id_inst = ceildiv(
self.synchronized_emit_start.to_unix(
TimeUnit.MICROSECONDS,
),
self.sft_info.frame_size_micros,
)
return self._start_frame_id_inst
def replayed_frame(
self,
source: SeriesFrame,
target_frame_id: int,
) -> FlatBufferWrapper[SeriesFrame]:
builder = flatbuffers.Builder(0)
target = fb_builder.CreateSeriesFrame(
builder=builder,
frame_id=target_frame_id,
ingested_at_micros=-1, # source.IngestedAtMicros(),
sampling_sets_offsets=[
replayed_sampling_set(
builder=builder,
sampling_set_data=sampling_set_data,
source_frame_id=source.FrameId(),
target_frame_id=target_frame_id,
frame_size_micros=self.sft_info.frame_size_micros,
is_timestamped=self.timestamped_map[sampling_set_data.Id()],
)
for sampling_set_data in [
source.SamplingSets(idx)
for idx in range(source.SamplingSetsLength())
]
],
submitter_id=-1, # source.SubmitterId(),
sensor_id=-1, # source.SensorId(),
)
builder.Finish(target)
target_frame_bytes = builder.Output()
return FlatBufferWrapper(
_bytes=target_frame_bytes,
schema=SeriesFrame,
)
def frames(self) -> Iterator[SeriesFramePackage]:
"""
Each time this property is accessed, it will access the injected
reader anew, but continue to increment the `frame_idx` so that
the next frame will replay where the last one left off.
To reset the `frame_idx`, client code can just set the `frame_idx`
manually (or create a new instance).
"""
for pkg in self.reader.frames():
if self._first_frame_id is None:
self._first_frame_id = pkg.frame.fb.FrameId()
yield SeriesFramePackage(
frame=self.replayed_frame(
source=pkg.frame.fb,
target_frame_id=self.start_frame_id + self.frame_idx,
),
sft_info=pkg.sft_info,
)
# `int` in python 3 is unbounded, can continue indefinitely. Also,
# the `FrameId` is effectively a timestamp, so continuously
# `inc`ing this value is by design.
self.frame_idx += 1
def replayed_sampling_set(
builder: flatbuffers.Builder,
sampling_set_data: SamplingSetData,
source_frame_id: int,
target_frame_id: int,
frame_size_micros: int,
is_timestamped: bool,
):
source_channels = [
sampling_set_data.Channels(idx)
for idx in range(sampling_set_data.ChannelsLength())
]
target_channels: List[int] = list()
for source_channel_data in source_channels:
is_timestamp_channel = is_timestamped and source_channel_data.Id() == 0
if not is_timestamp_channel or (source_frame_id == target_frame_id):
# Just copy verbatim
target_channel_data = replayed_channel_data(
builder,
source_channel_data,
)
else:
# Update the timestamps relative to the target frameId
source_frame_start_micros = source_frame_id * frame_size_micros
target_frame_start_micros = target_frame_id * frame_size_micros
# np.copy is used because the returned array is read-only
timestamps = np.copy(from_channel_data(data=source_channel_data))
timestamps -= source_frame_start_micros # relativize
timestamps += target_frame_start_micros # frame shift
target_channel_data = fb_builder.CreateChannelData(
builder=builder,
id=0, # `source_channel_data.Id()` is constantly 0 here
readings_offset=fb_builder.CreateInt64Channel(
builder=builder,
data=timestamps.tolist(), # List[int]
),
readings_type=ReadingType.INT64,
)
target_channels.append(target_channel_data)
np_relative_timestamps = sampling_set_data.RelativeTimestampsAsNumpy()
# The generated accessor returns a sentinel int (0 or -1) rather than an
# array when the vector is absent; `in {0, -1}` would try to hash an
# ndarray and raise TypeError, so test the type instead.
relative_timestamps = (
    np_relative_timestamps.tobytes()
    if isinstance(np_relative_timestamps, np.ndarray)
    else None  # 0 / -1: error conditions / no data
)
return fb_builder.CreateSamplingSetData(
builder=builder,
channel_offsets=target_channels,
id=sampling_set_data.Id(),
start_offset_micros=sampling_set_data.StartOffsetMicros(),
relative_timestamps_type=sampling_set_data.RelativeTimestampsType(),
relative_timestamps_unit_in_micros=sampling_set_data.RelativeTimestampsUnitInMicros(),
relative_timestamps=relative_timestamps,
relative_timestamps_sample_count=sampling_set_data.RelativeTimestampsSampleCount(),
)
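# Worked example of the timestamp shift above (illustrative numbers): with
# frame_size_micros = 10_000_000, source_frame_id = 5, and
# target_frame_id = 8, a reading stamped 52_000_000 us becomes
# 52_000_000 - 50_000_000 + 80_000_000 = 82_000_000 us: the same 2-second
# offset into the frame, re-anchored to the target frame's start.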
def replayed_channel_data(
builder: flatbuffers.Builder,
channel_data: ChannelData,
) -> int:
readings_type = channel_data.ReadingsType()
return fb_builder.CreateChannelData(
builder=builder,
id=channel_data.Id(),
readings_offset=channel(
builder=builder,
readings_type=readings_type,
readings=from_channel_data(data=channel_data).tolist(),
),
readings_type=readings_type,
)
reading_type_map = {
ReadingType.BINARY32: fb_builder.CreateBinary32Channel,
ReadingType.BINARY64: fb_builder.CreateBinary64Channel,
ReadingType.INT8: fb_builder.CreateInt8Channel,
ReadingType.INT16: fb_builder.CreateInt16Channel,
ReadingType.INT32: fb_builder.CreateInt32Channel,
ReadingType.INT64: fb_builder.CreateInt64Channel,
ReadingType.NAMED_READING: fb_builder.CreateNamedReadingChannel,
ReadingType.STRING: fb_builder.CreateStringChannel,
}
def channel(
builder: flatbuffers.Builder,
readings_type: int,
readings: List[Any],
) -> int:
return reading_type_map[readings_type](builder=builder, data=readings)
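# Example dispatch through `reading_type_map` (a sketch; builder offsets
# are ints that the caller then nests into a ChannelData):
#
#     builder = flatbuffers.Builder(0)
#     offset = channel(builder, ReadingType.INT16, [1, 2, 3])
#     # equivalent to fb_builder.CreateInt16Channel(builder=builder, data=[1, 2, 3])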

View File

@@ -1,76 +0,0 @@
import logging
from typing import Callable, Iterator
from physiq_cloud.time import Duration, Instant, TimeUnit
from pojagi_dsp.series_frame import SeriesFrameFilter, SeriesFramePackage
logger = logging.getLogger(__name__)
class Updating(SeriesFrameFilter):
"""
Filter that outputs frames until it runs out or the next frame would
surpass a given :class:`Instant` in time, whichever comes first.
"""
def __init__(
self,
until: Callable[[], Instant] = Instant.now,
**kwargs,
):
"""
:param:`until` - closure returning an `Instant` at which to stop
iterating when the next frame would surpass it. The cursor will
otherwise persist until it has been exhausted.
"""
super().__init__(**kwargs)
self.until = until
self.last_update_count = 0
self._next_pkg = None
def log_status(self, end: Instant, until: Instant):
    between = Duration.between(end, until).to_unix(TimeUnit.MILLISECONDS) / 1000.0
    logger.debug(
        f"{self.sft_info.alias}: Update scope : {between} seconds"
        if end < until
        else (
            f"{self.sft_info.alias}: "
            f"Next update available in {-between} seconds"
        )
    )
def frames(self) -> Iterator[SeriesFramePackage]:
self.last_update_count = 0
if self._next_pkg is None:
cursor = self.reader.stream()
self._next_pkg = next(cursor)
else:
cursor = self.reader.cursor
logger.debug(f"{self.sft_info.alias}: {cursor}")
try:
until = self.until()
end = self._next_pkg.end
self.log_status(end, until)
while end < until:
yield self._next_pkg
self.last_update_count += 1
self._next_pkg = next(cursor)
end = self._next_pkg.end
except StopIteration:
logger.warning("StopIteration caught")
logger.warning(f"{self.sft_info.alias}: {cursor}")
...
if self.last_update_count:
logger.debug(
f"{self.sft_info.alias}: " f"Update frames : {self.last_update_count}"
)
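# Illustrative usage (a sketch; `reader` and the wiring of the injected
# reader are hypothetical here):
#
#     updating = Updating(until=Instant.now)
#     updating.reader = reader
#     for pkg in updating.frames():
#         ...  # only frames ending before `until()` are emitted
#     # a later call to frames() resumes from the persisted cursor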

View File

@@ -1,15 +0,0 @@
from dataclasses import dataclass
from pathlib import Path
from typing import List
from physiq_cloud.time import Interval
from pojagi_dsp.series_frame import SftInfo
@dataclass(frozen=True)
class SftInterval:
info: SftInfo
interval: Interval
frame_count: int
files: List[Path]

View File

@@ -1,95 +0,0 @@
import logging
import math
from pathlib import Path
from typing import Iterator
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiqfb.SeriesFrame import SeriesFrame
from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList
from pojagi_dsp.series_frame import (ASeriesFrameEmitter,
SeriesFramePackage)
logger = logging.getLogger(__name__)
class FileSampler(ASeriesFrameEmitter):
def __init__(
self,
*sfhl_files: Path,
frame_count: int = math.inf,
frame_offset: int = 0,
**kwargs,
) -> None:
"""
:param:`sfhl_files` - One or more files containing sfhl data. It's
assumed that the files are sorted and contain contiguous (sorted)
data, and no effort is made to validate that or handle any error
condition related to correctness. The files are treated as a single
logical file, and an `EOFError` is raised if `frame_count` requests
more frames than are available.
:param:`frame_count` - The number of frames this generator will
attempt to read from the given files.
:param:`frame_offset` - The number of frames to skip over before
yielding frames from the given files.
"""
super().__init__(**kwargs)
self.files = sfhl_files
self.frame_offset = frame_offset
self.frame_count = frame_count
self.frame_idx = 0
def frames_from_sfhl_bytes(
self, data: bytes,
) -> Iterator[FlatBufferWrapper[SeriesFrame]]:
"""
Emit frames from series frame holder list data.
:param:`data` - sfhl bytes.
"""
sfhl = SeriesFrameHolderList.GetRootAsSeriesFrameHolderList(data, 0)
for index in range(sfhl.FramesLength()):
yield FlatBufferWrapper(
_bytes=sfhl.Frames(index).DataAsNumpy().tobytes(),
schema=SeriesFrame,
)
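# Illustrative (a sketch; `sampler` and the file path are hypothetical):
#
#     raw = Path("data.sfhl").read_bytes()
#     wrappers = list(sampler.frames_from_sfhl_bytes(raw))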
def frames(self) -> Iterator[SeriesFramePackage]:
self.frame_idx = 0
end_frame_idx = self.frame_offset + self.frame_count
for file in self.files:
    # Finite set of files passed in.
    for frame in self.frames_from_sfhl_bytes(file.read_bytes()):
        # Finite set of frames from `read_bytes`.
        if self.frame_idx < self.frame_offset:
            # Fast-forward to the declared offset.
            self.frame_idx += 1
            continue
        elif self.frame_idx >= end_frame_idx:
            # `end_frame_idx` is finite; `>=` so that exactly
            # `frame_count` frames are yielded.
            break
        yield SeriesFramePackage(frame=frame, sft_info=self.sft_info)
        self.frame_idx += 1
if end_frame_idx != math.inf:
if self.frame_idx < end_frame_idx:
frames_read = self.frame_idx - self.frame_offset
raise EOFError(
f"Failed to read {self.frame_count - frames_read} frames. "
"Not enough data.\n"
f"\toffset : {self.frame_offset}\n"
f"\tframes read : {frames_read}"
)
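# Worked example (illustrative): with frame_offset=100 and frame_count=50,
# the sampler skips frames 0..99 and yields frames 100..149; if the files
# hold only 120 frames in total, an EOFError reports 30 frames short.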

View File

@@ -1,48 +0,0 @@
from typing import Iterator
import flatbuffers
from physiq_cloud.series_frame import fb_builder
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiq_cloud.time import Instant, TimeUnit
from physiqfb import SeriesFrame
from pojagi_dsp.series_frame import (ASeriesFrameEmitter,
SeriesFramePackage)
class VCIVitalPatchTelemetrySynthesizer(ASeriesFrameEmitter):
def frames(self) -> Iterator[SeriesFramePackage]:
builder = flatbuffers.Builder(0)
target = fb_builder.CreateSeriesFrame(
builder=builder,
frame_id=Instant.now().to_unix(
TimeUnit.MICROSECONDS,
) // self.sft_info.frame_size_micros,
ingested_at_micros=-1,
sampling_sets_offsets=[
# replayed_sampling_set(
# builder=builder,
# sampling_set_data=sampling_set_data,
# source_frame_id=source.FrameId(),
# target_frame_id=target_frame_id,
# frame_size_micros=self.sft_info.frame_size_micros,
# is_timestamped=self.timestamped_map[
# sampling_set_data.Id()
# ],
# )
# for sampling_set_data in [
# source.SamplingSets(idx)
# for idx in range(source.SamplingSetsLength())
# ]
],
submitter_id=-1,
sensor_id=-1,
)
builder.Finish(target)
target_frame_bytes = builder.Output()
return FlatBufferWrapper(
_bytes=target_frame_bytes,
schema=SeriesFrame,
)

View File

@@ -1,35 +1,46 @@
from copy import deepcopy
import math
from typing import Iterator
import pytest
from pojagi_dsp.channel import ASignal, Constantly, Filter, FilterFunction, SignalFunction, IllegalStateError, Map
from pojagi_dsp.channel import (
ASignal,
Constantly,
Filter,
FilterFunction,
SignalFunction,
IllegalStateException,
Reduce,
)
from pojagi_dsp.channel.generator.sine import SineWave
@pytest.fixture
def srate(): return 44100
def srate():
return 44100
@pytest.fixture
def freq(): return 440
def freq():
return 440
@pytest.fixture
def const(srate): return Constantly(42, srate=srate)
def const(srate):
return Constantly(42, srate=srate)
@pytest.fixture
def sine(srate, freq): return SineWave(freq, srate=srate)
def sine(srate, freq):
return SineWave(freq, srate=srate)
@pytest.fixture
def sine_generator_factory(srate, freq):
def sine():
phase = 0.0
inc = (2 * math.pi * freq)/srate
inc = (2 * math.pi * freq) / srate
while True:
yield math.sin(phase)
phase += inc
return sine
@@ -73,14 +84,14 @@ def test_filter_nested_expression(const: Constantly):
def test_reader(const: Constantly):
filter = Filter()
with pytest.raises(IllegalStateError, match=".reader. is None"):
with pytest.raises(IllegalStateException, match=".reader. is None"):
filter.reader
assert (const | filter).reader == const
def test_pipeline_missing_reader(const: ASignal):
pipeline = Filter | Filter | Filter
with pytest.raises(IllegalStateError, match=".reader. is None"):
with pytest.raises(IllegalStateException, match=".reader. is None"):
next(pipeline)
assert next(const | pipeline)
@@ -94,13 +105,13 @@ def test_filter_can_only_be_assigned_one_generator(const: Constantly):
def test_add_tuple(const: Constantly):
pipeline = const + (100, 200, 300)
assert isinstance(pipeline, Map)
assert isinstance(pipeline, Reduce)
assert next(pipeline) == const.constant + (100 + 200 + 300)
def test_mul_tuple(const: Constantly):
pipeline = const * (100, 200, 300)
assert isinstance(pipeline, Map)
assert isinstance(pipeline, Reduce)
assert next(pipeline) == const.constant * (100 * 200 * 300)
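# The Reduce semantics exercised above, spelled out (illustrative): a tuple
# operand is folded with the pipeline's operator before being applied to
# each sample, so Constantly(42) + (100, 200, 300) emits 42 + 600 == 642,
# and Constantly(42) * (100, 200, 300) emits 42 * 6_000_000 == 252_000_000.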