first commit
24 .gitignore vendored Normal file
@@ -0,0 +1,24 @@
notes.*

.venv
.vscode

linen*

build/

loadtest.coverage.timespan

node_modules/
yarn-error.log

__pycache__/

*.log

*.env
my-*

pojagi_dsp.egg-info/

.pytest_cache/
BIN documentation/QRS-complex.png Normal file
Binary file not shown. (Size: 1.8 MiB)
28 pyproject.toml Normal file
@@ -0,0 +1,28 @@
[build-system]
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"

[project]
name = "pojagi-dsp"
description = "DSP tools for load testing."
urls = { "gitlab" = "https://gitlab.pojagi.org/tjb1982/dsp" }
authors = [
    { name = "Tom Brennan", email = "tjb1982@gmail.com" },
]
readme = "README.md"
requires-python = ">=3.8"
classifiers = []
dependencies = [
    "pydantic==1.10.2",
    "scipy==1.8.1",

    # These should be included in the above requirement
    # "marshmallow>=3.3.0",
    # "marshmallow_dataclass>=7.2.1",
    # "marshmallow_oneofschema>=2.0.1",
    # "PyYAML>=5.3.1",
]

optional-dependencies = { test = ["pytest"] }
version = "0.0.0.dev0"
# dynamic = ["version"]
337 src/pojagi_dsp/channel/__init__.py Normal file
@@ -0,0 +1,337 @@
import abc
import copy
import datetime
import inspect
from itertools import islice
import logging
import math
import operator
from collections.abc import Iterable
from functools import reduce
import types
from typing import (Any, Callable, Generic, Iterator, Optional, Type, TypeVar,
                    Union)

logger = logging.getLogger(__name__)

T = TypeVar("T")


class IllegalStateError(ValueError):
    ...


def coerce_channels(x: Any) -> Iterator["ASignal"]:
    if isinstance(x, ASignal):
        yield x
    else:
        if callable(x):
            if isinstance(x, Type):
                yield x()
            else:
                yield SignalFunction(x)
        elif isinstance(x, Iterable):  # and not isinstance(x, str):
            for it in (coerce_channels(y) for y in x):
                for channel in it:
                    yield channel
        else:
            yield Constantly(x)


class ASignalMeta(abc.ABCMeta):
    def __or__(self, other: Any) -> "Filter":
        """
        Allows `|` composition starting from an uninitialized class.
        See doc for `__or__` below in `ASignal`.
        """
        return self() | coerce_channels(other)

    def __radd__(self, other): return self() + other
    def __add__(self, other): return self() + other
    def __rmul__(self, other): return self() * other
    def __mul__(self, other): return self() * other


class ASignal(Generic[T], metaclass=ASignalMeta):
    def __init__(self, srate: Optional[float] = None):
        self._srate = srate
        self._cursor: Optional[Iterator[T]] = None

    @property
    def srate(self):
        if self._srate is None:
            raise IllegalStateError(
                f"{self.__class__}: `srate` is None."
            )
        return self._srate

    @srate.setter
    def srate(self, val: float): self._srate = val

    def __iter__(self):
        self._cursor = self.samples()
        return self

    def __next__(self): return next(self.cursor)

    @abc.abstractmethod
    def samples(self) -> Iterator[T]: ...

    @property
    def cursor(self):
        """
        An `Iterator` representing the current pipeline in progress.
        """
        if self._cursor is None:
            # this can only happen once
            self._cursor = self.samples()
        return self._cursor

    def __getstate__(self):
        """
        `_cursor` is a generator, and generators aren't picklable.
        """
        state = self.__dict__.copy()
        if state.get("_cursor"):
            del state["_cursor"]
        return state

    def stream(self):
        while True:
            try:
                yield next(self.cursor)
            except StopIteration:
                self = iter(self)

    def of_duration(self, duration: datetime.timedelta):
        """
        Returns an `Iterator` of samples for a particular duration expressed
        as a `datetime.timedelta`
        :param:`duration` - `datetime.timedelta` representing the duration
        """
        return islice(
            self.stream(),
            0,
            math.floor(self.srate * duration.total_seconds()),
        )

    def __or__(
        left,
        right: Union["Filter", Callable, Iterable],
    ) -> "Filter":
        """
        Allows composition of filter pipelines with `|` operator.

        e.g.,
        ```
        myFooGenerator
        | BarFilter
        | baz_filter_func
        | (lambda reader: (x for x in reader))
        ```
        """
        if isinstance(right, SignalFunction):
            return left | FilterFunction(fn=right._fn, name=right.Function)

        if not isinstance(right, ASignal):
            return reduce(operator.or_, (left, *coerce_channels(right)))

        if not isinstance(right, Filter):
            raise ValueError(
                f"Right side must be a `{Filter.__name__}`; "
                f"received: {type(right)}",
            )

        filter: Filter = right
        while getattr(filter, "_reader", None) is not None:
            # Assuming this is a filter pipeline, we want the last node's
            # reader to be whatever's on the left side of this operation.
            filter = filter.reader

        if hasattr(filter, "_reader"):
            # We hit the "bottom" and found a filter.
            filter.reader = left
        else:
            # We hit the "bottom" and found a non-filter/generator.
            raise ValueError(
                f"{right.__class__.__name__}: filter pipeline already has a "
                "generator."
            )

        # Will often be `None` unless `left` is a generator.
        right.srate = left._srate

        return right

    def __radd__(right, left): return right.__add__(left)
    def __add__(left, right): return left._operator_impl(operator.add, right)
    def __rmul__(right, left): return right.__mul__(left)
    def __mul__(left, right): return left._operator_impl(operator.mul, right)
    # FIXME: other operators? Also, shouldn't `*` mean convolve instead?

    def _operator_impl(left, operator: Callable[..., T], right: Any):
        channels = list(coerce_channels(right))
        for channel in channels:
            if channel._srate is None:
                channel.srate = left._srate
        return Reduce(operator, left, *channels, srate=left._srate)

    def __repr__(self):
        members = {}
        for k in [k for k in dir(self)
                  if not k.startswith("_")
                  and k not in {"stream", "reader", "cursor", "wave"}]:
            try:
                v = getattr(self, k)
                if not inspect.isroutine(v):
                    members[k] = v
            except IllegalStateError:
                members[k] = None

        return (
            f"{self.__class__.__name__}"
            f"""({
                f", ".join(
                    f"{k}={v}"
                    for k, v in members.items()
                )
            })"""
        )


S = TypeVar("S", bound=ASignal)


class Reduce(ASignal, Generic[S, T]):
    def __init__(
        self,
        # FIXME: typing https://stackoverflow.com/a/67814270
        fn: Callable[..., T],
        *streams: S,
        srate: Optional[float] = None,
        stateful=False,
    ):
        super().__init__(srate)
        self._fn = fn
        self.fn = fn.__name__
        self.streams = []
        for stream in streams:
            if stateful:
                self.streams.append(stream)
                continue

            stream_ = (
                copy.deepcopy(stream)
                if not isinstance(stream, types.GeneratorType)
                else stream
            )
            stream_.srate = srate
            self.streams.append(stream_)

    @property
    def srate(self): return ASignal.srate.fget(self)

    @srate.setter
    def srate(self, val: float):
        ASignal.srate.fset(self, val)
        for stream in self.streams:
            if isinstance(stream, ASignal):
                stream.srate = val

    def samples(self): return (
        reduce(self._fn, args)
        for args in zip(*self.streams)
    )


class Filter(ASignal, Generic[S]):

    def __init__(
        self,
        reader: Optional[S] = None,
        srate: Optional[float] = None,
    ):
        super().__init__(srate)
        self.reader: Optional[S] = reader

    @property
    def reader(self) -> S:
        """
        The input stream this filter reads.
        """
        if not self._reader:
            raise IllegalStateError(
                f"{self.__class__}: `reader` is None."
            )
        return self._reader

    @reader.setter
    def reader(self, val: S):
        self._reader = val
        if val is not None and self._srate is None:
            self.srate = val._srate

    @property
    def srate(self): return ASignal.srate.fget(self)

    @srate.setter
    def srate(self, val: float):
        ASignal.srate.fset(self, val)
        child = getattr(self, "_reader", None)
        previous_srate = val
        while child is not None:
            # Since `srate` is optional at initialization, but required in
            # general, we make our best attempt to normalize it for the
            # filter pipeline, which should be consistent for most
            # applications, by applying it to all children.
            if child._srate is None:
                child.srate = previous_srate
            child: Optional[ASignal] = getattr(child, "_reader", None)
            if isinstance(child, ASignal) and child._srate is not None:
                previous_srate = child._srate

    def samples(self) -> Iterator[T]: return self.reader.samples()

    def __repr__(self):
        return (
            f"{self._reader} | {super().__repr__()}"
        )


class FilterFunction(Filter, Generic[T, S]):
    def __init__(
        self,
        fn: Callable[[S], Iterator[T]],
        name: Optional[str] = None,
        reader: Optional[S] = None,
        srate: Optional[float] = None,
    ):
        super().__init__(reader, srate)
        self._fn = fn
        self.Function = name if name else fn.__name__

    def samples(self): return self._fn(self.reader)


class SignalFunction(ASignal, Generic[T]):
    def __init__(
        self,
        fn: Callable[[int], Iterator[T]],
        name: Optional[str] = None,
        srate: Optional[float] = None,
    ):
        super().__init__(srate)
        self._fn = fn
        self.Function = name if name else fn.__name__

    def samples(self) -> Iterator[T]: return self._fn(self.srate)


class Constantly(ASignal, Generic[T]):
    def __init__(self, constant: T, srate: float = 0.0):
        super().__init__(srate)
        self.constant = constant

    def samples(self) -> Iterator[T]:
        while True:
            yield self.constant
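A minimal usage sketch of the `|` composition and operator overloading defined above (not part of the commit; it assumes the package is installed so that `pojagi_dsp.channel` is importable):

```python
import itertools

from pojagi_dsp.channel import Constantly, FilterFunction

# Pipe a constant source through a doubling filter; `__or__` wires the
# filter's `reader` to the left-hand side and propagates `srate`.
doubled = Constantly(1.0, srate=10.0) | FilterFunction(
    fn=lambda reader: (x * 2 for x in reader),
    name="double",
)
print(list(itertools.islice(iter(doubled), 4)))  # [2.0, 2.0, 2.0, 2.0]

# `+` coerces the plain number to a `Constantly` and builds a `Reduce`:
summed = Constantly(1.0, srate=10.0) + 0.5
print(list(itertools.islice(iter(summed), 2)))  # [1.5, 1.5]
```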
36 src/pojagi_dsp/channel/accelerometer/generator.py Normal file
@@ -0,0 +1,36 @@
from typing import Optional
from pojagi_dsp.channel import ASignal


class AccelerometerSynthesizer(ASignal):
    """
    Features/dimensions:
    - respiration
    - awake vs. asleep
    - activity level (while awake or asleep?)

    I think what's needed here is to identify activities in the raw data and
    then use those as wavetables for various activities: walking, running,
    sitting, lying down, sleeping in various positions. Then, theoretically,
    these tables could be mixed together. The problem is that we really need
    to extract the components from the signal so that we can recombine them.
    Perhaps do Fourier analysis on various segments you suspect to be
    wakefulness and sleep, and see if you can find any commonality between
    them in the frequency domain, which could be filtered out and reapplied
    synthetically.

    https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3203837/

    The above link gives some insight into respiration divisions based on
    activity, and how to derive that data using bandpass filtering.
    """
    def __init__(self, srate: Optional[float] = None):
        super().__init__(srate)
        self.activity_level = 0.0
        # Normal range at rest: 12 - 20 (up to 25)
        # https://my.clevelandclinic.org/health/articles/10881-vital-signs
        # Normal range during exercise 40-60
        # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4818249/
        self.respiration_rate = 12  # bpm

    def samples(self):
        ...
98 src/pojagi_dsp/channel/ecg/__init__.py Normal file
@@ -0,0 +1,98 @@
import abc
import dataclasses
import itertools
import math
from enum import Enum
from numbers import Number
from typing import ClassVar, Iterator, Optional

from pojagi_dsp.channel import ASignal


Segment = Enum("Segment", [
    "R",
    "S",
    "S_T",
    "T",
    "U",
    "T_P",
    "P",
    "P_R",
    "Q",
])


@dataclasses.dataclass(frozen=True)
class Segments:
    """
    The ECG wave is a kind of impulse with 8 to 9 segments.

    Segment pointers based on https://geekymedics.com/how-to-read-an-ecg/
    """
    # ClassVar means it's not part of the constructor (i.e., `__init__` args).
    R: ClassVar[int] = 0
    """R is constant 0"""
    S: int
    S_T: int  # i.e., J point
    T: int
    T_P: int
    P: int
    P_R: int  # actually, only goes from P-Q, confusingly
    Q: int

    U: Optional[int] = None
    """Rare, but if found, comes before T_P."""

    def merge(self, other: "Segments", weight: float):
        """
        Merge two `Segments`, with a given weight.
        :param:`other` - the other `Segments` to merge
        :param:`weight` - a value between 0.0 and 1.0 to indicate how much
        weight to give to `other`.
        """
        self_weight = 1 - weight
        return Segments(
            **{
                k: math.floor(lv * self_weight + getattr(other, k) * weight)
                if lv is not None
                else getattr(other, k)

                for k, lv in (
                    (f.name, getattr(self, f.name))
                    for f in dataclasses.fields(self)
                )
            },
        )


class AECGChannel(ASignal[Number]):

    @property
    @abc.abstractmethod
    def heart_rate(self) -> float:
        """Frequency of impulses/waves in bpm."""
        ...

    @property
    @abc.abstractmethod
    def wavelength(self) -> int:
        """
        The number of samples in a complete impulse/wave cycle.

        Formula: `srate / (heart_rate / 60)` where `heart_rate / 60` is the
        heart rate converted from bpm to Hz.
        """
        ...

    @property
    @abc.abstractmethod
    def segments(self) -> Segments:
        """The analytical segments of the impulse/wave."""
        ...

    @property
    def wave(self) -> Iterator[Number]:
        """
        Returns an iterator over a single ECG impulse/wave.
        """
        return itertools.islice(self, 0, self.wavelength)
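A small worked sketch of `Segments.merge` (illustrative values only; the pointers mirror the sinus/tachycardia tables defined later in this commit):

```python
from pojagi_dsp.channel.ecg import Segments

a = Segments(S=3, S_T=9, T=17, T_P=66, P=87, P_R=120, Q=122)
b = Segments(S=3, S_T=9, T=17, T_P=66, P=87, P_R=119, Q=122)

# With weight=0.5, each pointer lands halfway between the two, floored:
m = a.merge(b, weight=0.5)
assert m.P_R == 119  # math.floor(120 * 0.5 + 119 * 0.5) == 119
assert m.U is None   # `U` is None in both, so it stays None
```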
18 src/pojagi_dsp/channel/ecg/filter/__init__.py Normal file
@@ -0,0 +1,18 @@
from numbers import Number
from pojagi_dsp.channel.ecg import AECGChannel, Segments

from pojagi_dsp.channel import Filter


class ECGFilter(Filter[AECGChannel], AECGChannel):

    @property
    def heart_rate(self): return self.reader.heart_rate

    @property
    def wavelength(self): return self.reader.wavelength

    @property
    def segments(self) -> Segments: return self.reader.segments

    def samples(self): return self.reader.samples()
11 src/pojagi_dsp/channel/ecg/filter/segment.py Normal file
@@ -0,0 +1,11 @@
from pojagi_dsp.channel.ecg import Segment
from pojagi_dsp.channel.ecg.filter import ECGFilter


class WithSegment(ECGFilter):
    def __init__(self, segment: Segment, **kwargs):
        super().__init__(**kwargs)
        self.segment = segment

    # def stream(self):
    #     ...
24 src/pojagi_dsp/channel/ecg/generator/__init__.py Normal file
@@ -0,0 +1,24 @@
import math
from typing import Optional

from pojagi_dsp.channel.ecg import AECGChannel


class AECGSynthesizer(AECGChannel):
    def __init__(self, /, heart_rate: int, srate: Optional[float] = None):
        """
        :param:`heart_rate` - The heart rate in bpm.
        """
        super().__init__(srate)
        self.heart_rate = heart_rate

    @property
    def heart_rate(self): return self._heart_rate

    @heart_rate.setter
    def heart_rate(self, val: int):
        self._heart_rate = val
        self._wavelength = math.floor(self.srate * (60 / self.heart_rate))

    @property
    def wavelength(self): return self._wavelength
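A sketch exercising the `wavelength` calculation above; `_Stub` is a hypothetical concrete subclass for illustration only (the abstract `samples`/`segments` members must be filled in before the class can be instantiated):

```python
from pojagi_dsp.channel.ecg.generator import AECGSynthesizer

class _Stub(AECGSynthesizer):
    def samples(self):
        yield from ()

    @property
    def segments(self):
        raise NotImplementedError

stub = _Stub(heart_rate=75, srate=125.0)
# wavelength = floor(srate * (60 / heart_rate)) = floor(125 * 0.8) = 100
assert stub.wavelength == 100
```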
266 src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py Normal file
@@ -0,0 +1,266 @@
import dataclasses
import logging
import math
from numbers import Number
from typing import Dict, List, Optional, Tuple

import numpy as np
from scipy.interpolate import CubicSpline

from pojagi_dsp.channel.ecg import Segments
from pojagi_dsp.channel.ecg.generator import AECGSynthesizer

logger = logging.getLogger(__name__)


class ECGWaveTable:
    """
    This type of wavetable is designed around the P and R. By
    convention, R will always be equal to 1, and the baseline (P) will always
    be 0. (That doesn't mean, however, that the other values can't cross these
    boundaries. E.g., Q and S are often negative.)
    """

    def __init__(
        self,
        data: List[Number],
        segments: Segments,
        bottom: Optional[Number] = None,
        top: Optional[Number] = None,
        table_length: int = 1 << 11,  # 2048
    ):
        """
        The table size is increased to `table_length` upon initialization
        using cubic spline interpolation (via `scipy.interpolate.CubicSpline`).
        """

        if len(data) == table_length:
            self.data = np.array(data)
        else:
            # We generate a larger table for use with linear interpolation,
            # trading time (CPU) for memory (table size).
            #
            # Here we use cubic spline interpolation instead of linear for
            # wavetable construction, since it usually only happens once at
            # startup, and should provide a much better quality table from
            # limited data, making it possible to work with small, manually
            # composed tables that we JIT convert to the larger table.
            ### FIXME: use the sinc function instead:
            ### f(x) = sin(x)/x where x =/= 0 and f(x) = 1 if x = 0
            ### you have to apply this scaled to each sample in the table and
            ### then add all of the resulting signals together.
            ### I think this is the same as summing the dft of each impulse
            ### as if the impulse is a member of a larger table.
            cs = CubicSpline(
                range(len(data)),
                data,
                bc_type="natural",
            )
            self.data = np.array([
                cs(x) for x in np.linspace(0, len(data), table_length)
            ])

        self.segments = Segments(
            **{
                k: int(v / (len(data) / table_length)) if v else v
                for k, v in (
                    (f.name, getattr(segments, f.name))
                    for f in dataclasses.fields(segments)
                )
            },
        )

        # NOTE: these are not the data min/max, but the normal min/max, either
        # provided as kwargs, or derived from P and R segment starts, by
        # convention.
        bottom = bottom if bottom is not None else data[segments.P]
        top = top if top is not None else data[segments.R]

        if not (0 == bottom and 1 == top):
            # Normalize between 0 and 1:
            self.data = (self.data - bottom) / (top - bottom)

    def __getitem__(self, k): return self.data[k]

    def __len__(self): return len(self.data)  # O(1)

    def linear_interpolation(
        self,
        index: float,
        floor: Optional[int] = None,
        ceiling: Optional[int] = None,
    ) -> float:
        """
        Handles the situation where the floor would produce duplicate values,
        which makes the waveform chunky with aliasing; instead, we obtain a
        value weighted between the floor/ceiling, trading time (CPU) for
        memory (table size).
        """
        dl = len(self.data)
        floor = floor if floor is not None else math.floor(index) % dl
        ceiling = ceiling if ceiling is not None else (floor + 1) % dl

        # e.g., a. 124.75 - 124 == 0.75
        #       b. 123 - 123 == 0 (no weight goes to ceiling)
        ceiling_weight = index - floor
        # e.g., a. 1 - 0.75 == 0.25
        #       b. 1 - 0 == 1 (all weight goes to floor)
        floor_weight = 1 - ceiling_weight

        return (
            self[floor] * floor_weight + self[ceiling] * ceiling_weight
        )

    def merge(
        self,
        other: "ECGWaveTable",
        weight: float,
    ):
        self_weight = 1 - weight
        return ECGWaveTable(
            data=(self.data * self_weight + other.data * weight),
            segments=self.segments.merge(other.segments, weight),
            top=1,
            bottom=0,
        )


class ECGWaveTableSynthesizer(AECGSynthesizer):
    def __init__(
        self, /,
        tables: Dict[Tuple[float, float], ECGWaveTable],
        heart_rate: int,
        srate: Optional[float] = None,
    ):
        self.inc: float = 0.0
        self.tables = tables
        super().__init__(heart_rate, srate)

        self._segments: Optional[Segments] = None

    def samples(self):
        index: float = 0.0
        inc: Optional[float] = None
        n: int = 0

        while True:
            if n > 0:
                floor = math.floor(index)
            else:
                # `n` cannot be negative
                floor = 0
                index = 0.0

            # NOTE: it's important that this `n` calculation occur before
            # `yield` because the `heart_rate` (and, hence, the `wavelength`)
            # may change in the meantime, which affects resetting to zero with
            # the modulo operation below.
            n = (n + 1) % self.wavelength

            yield self.table.linear_interpolation(index, floor=floor)

            if (self.heart_rate < 60
                    and
                    self.table.segments.T_P <= floor < self.table.segments.P):
                inc = self.brady_inc
            elif (self.heart_rate > 60
                    and
                    self.table.segments.T <= floor < self.table.segments.Q):
                # FIXME: this is probably only good below a certain
                # `heart_rate` threshold.
                inc = self.tachy_inc
            else:
                inc = None

            index += inc if inc is not None else self.inc
            index %= len(self.table)

    def _recalibrate(self, heart_rate: float) -> None:
        table_matches = {
            k: v
            for k, v in self.tables.items()
            if k[0] <= heart_rate < k[1]
        }

        if not table_matches:
            raise ValueError(
                f"No table found corresponding to heart rate: {heart_rate}."
            )

        # Since we may have more than two tables that match, we loop
        # through all the matches, applying them in key order.
        keys = iter(sorted(table_matches))
        key = next(keys)
        table = table_matches[key]

        for next_key in keys:
            next_table = table_matches[next_key]

            if next_key[1] < key[1]:
                # `next_key` is fully contained within `key`
                floor, ceiling = next_key
                next_weight = (heart_rate - floor) / (ceiling - floor)
                weight = 1 - next_weight

                if (heart_rate - floor) > ((ceiling - floor) / 2):
                    # Weights form an "X" shape; i.e., crossfade to 50%
                    # and back.
                    weight, next_weight = next_weight, weight
            else:
                floor = next_key[0]  # i.e., the bottom of the top
                ceiling = key[1]  # i.e., the top of the bottom
                next_weight = (heart_rate - floor) / (ceiling - floor)

            table = table.merge(next_table, next_weight)
            key = next_key

        self.table = table

        # ECG Tables are designed for 1Hz, and as a default, we don't want to
        # stretch anything; hence, no reference to `self.heart_rate` here,
        # instead constant 60:
        self.inc = len(self.table) / (self.srate * (60 / 60))

        # Stretch only the T_P segment to compensate, rather than
        # stretching the whole wave.
        table_segment_length = \
            self.table.segments.P - self.table.segments.T_P
        self.brady_inc = self.stretch_inc(table_segment_length)

        # Preserve QRS-J-point; compress Jp-Q to compensate.
        table_segment_length = \
            self.table.segments.Q - self.table.segments.S_T
        self.tachy_inc = self.stretch_inc(table_segment_length)

    @AECGSynthesizer.heart_rate.setter
    def heart_rate(self, val):
        AECGSynthesizer.heart_rate.fset(self, val)
        self._recalibrate(heart_rate=val)

    def stretch_inc(self, table_segment_length):
        # Get the missing samples by subtracting the number of samples
        # contributed by the 1Hz table default, minus the segment we
        # want to stretch.
        tmp_wavelength = self.wavelength - \
            (len(self.table) - table_segment_length) / self.inc

        return table_segment_length / tmp_wavelength

    @property
    def segments(self):
        if self._segments:
            return self._segments

        table_length = len(self.table)
        table_segments = self.table.segments
        # FIXME: this is a lie since we stretch T_P, etc.
        self._segments = Segments(
            **{
                k: math.floor(v * self.srate / table_length) if v else v
                for k, v in [
                    (f.name, getattr(table_segments, f.name))
                    for f in dataclasses.fields(table_segments)
                ]
            }
        )
        return self._segments
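A toy sketch of `linear_interpolation` (illustrative values; passing `table_length=len(data)` skips the spline resampling so the arithmetic is easy to follow):

```python
from pojagi_dsp.channel.ecg import Segments
from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTable

table = ECGWaveTable(
    data=[0.0, 1.0, 0.5, 0.25],
    segments=Segments(S=1, S_T=1, T=2, T_P=2, P=0, P_R=3, Q=3),
    bottom=0.0,
    top=1.0,
    table_length=4,  # matches len(data): no resampling, no renormalizing
)
assert table.linear_interpolation(0.5) == 0.5     # 0.5*0.0 + 0.5*1.0
assert table.linear_interpolation(1.25) == 0.875  # 0.75*1.0 + 0.25*0.5
```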
104 src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py Normal file
@@ -0,0 +1,104 @@
import math
import numpy as np
from numbers import Number
import random
from typing import List
from pojagi_dsp.channel.ecg import Segments
from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTable

# NOTE: a larger table is required to avoid aliasing at sample rates other
# than 125Hz
sinus_data = [
    # R-S: 0
    2000, 1822, 374,

    # S-Jp: 3
    -474, -271, -28, 18, 66,

    # Jp-T: 9
    63, 73, 91, 101, 101, 101, 116, 124,
    124,

    # T: 17
    141, 171, 186, 196, 229, 265, 297, 327,
    363, 406, 446, 475, 493, 508, 526, 533,
    518, 475, 403, 327, 272, 222, 174, 138,
    109, 88, 73, 66, 69, 69, 66, 73,
    81, 76, 73, 76, 76, 66, 58, 58,
    63, 63, 41, 26, 26, 18, 8, 8,
    8,

    # U: 66 -- not found

    # T-P: 66
    2, 3, 2, 2, 2, -1, 2, 2,
    2, -1, 0, -1, -1, 3, 2, 1,
    3, 2, 1, 0, 1,

    # P: 87
    0, 3, 11, 11, 0, 8, 18, 18,
    18, 15, 8, 18, 26, 26, 26, 8,
    32, 61, 116, 164, 182, 159, 131, 116,
    116, 109, 91, 73, 58, 55, 58, 63,
    69,

    # P-R: 120
    48, -14,

    # Q-R: 122
    -40, 131, 931,
]  # len == 125


tpr = np.arange(-math.floor(120-18)/2, math.ceil(120-18)/2)
tpr_curve: np.ndarray = (tpr ** 2 * -0.1)
tpr_curve = (tpr_curve - tpr_curve[0]) + 141

tachycardia = np.array([  # 58-107 flat
    # R-S: 0
    2000, 1822, 374,

    # S-Jp: 3
    -474, -271, -28, 18, 66,

    # Jp-T: 9
    63, 73, 91, 101, 101, 101, 116, 124,
    124,

    *tpr_curve,

    # P-R: 119
    124, 48, -14,

    # Q-R: 122
    *(np.array([-40, 131, 931]) * 0.25),

]) * (2/3)  # len == 125


def SinusWaveTable(): return ECGWaveTable(
    data=sinus_data,
    segments=Segments(
        S=3,
        S_T=9,
        T=17,
        T_P=66,
        P=87,
        P_R=120,
        Q=122,
    ),
)


def TachycardiaWaveTable(): return ECGWaveTable(
    data=tachycardia,
    segments=Segments(
        S=3,
        S_T=9,
        T=17,
        T_P=66,
        P=87,
        P_R=119,
        Q=122,
    ),
    top=2000,
    bottom=0,
)
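A sketch wiring the sinus table into `ECGWaveTableSynthesizer` (the `(0.0, 300.0)` key range is an assumption meant to cover any plausible heart rate):

```python
from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTableSynthesizer
from pojagi_dsp.channel.ecg.generator.wavetable.sinus import SinusWaveTable

synth = ECGWaveTableSynthesizer(
    tables={(0.0, 300.0): SinusWaveTable()},
    heart_rate=60,
    srate=125.0,
)
one_beat = list(synth.wave)
assert len(one_beat) == 125  # srate / (heart_rate / 60) samples per beat
```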
51 src/pojagi_dsp/channel/generator/sine.py Normal file
@@ -0,0 +1,51 @@
import datetime
import math
from typing import Optional

from pojagi_dsp.channel import ASignal

_2_PI = 2 * math.pi


class SineWave(ASignal[float]):
    def __init__(
        self,
        hz: float,
        phase: float = 0.0,  # radians
        srate: Optional[float] = None
    ):
        super().__init__(srate)
        self.hz = hz
        self.phase = phase

    @property
    def wavelength(self): return self.srate/self.hz

    def samples(self):
        """An iterator over one period."""
        # zero-ish, but not always zero (to keep things smooth)
        self.phase %= _2_PI

        while self.phase < _2_PI:
            inc = (_2_PI * self.hz)/self.srate
            yield math.sin(self.phase)
            self.phase += inc


if __name__ == "__main__":
    from matplotlib import pyplot as plt

    concert_a = 440
    cd_quality_srate = 44100
    sine = SineWave(concert_a, srate=cd_quality_srate)

    values = []
    hz = sine.hz
    # for _ in range(10):
    #     values += list(sine)

    for y in sine.of_duration(datetime.timedelta(milliseconds=10)):
        values.append(y)

    plt.plot(range(len(values)), values)
    plt.show()
295 src/pojagi_dsp/series_frame/__init__.py Normal file
@@ -0,0 +1,295 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
from typing import Iterator, List, Literal, Optional, Type, Union

from physiq_cloud.series_frame import fb_builder
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiq_cloud.time import Instant, TimeUnit
from physiqfb.SeriesFrame import SeriesFrame
from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList
from pydantic import BaseModel

logger = logging.getLogger(__name__)


class SamplingSetInfo(BaseModel):
    id: int
    alias: str
    timestamped: bool
    # FIXME: not in the INFO.yaml, but isn't this just the `frame_size_micros`
    # divided by the number of values in the channel data? We could either get
    # it from the first frame, or look it up in the API/repo.
    # frequency: int


class SftInfo(BaseModel):
    alias: Optional[str]
    frame_size_micros: int
    max_frame_size_bytes: int
    sampling_sets: List[SamplingSetInfo]


@dataclass(frozen=True)
class SeriesFramePackage:
    frame: FlatBufferWrapper[SeriesFrame]
    sft_info: SftInfo

    def relative_instant(self, offset=0):
        return Instant.from_unix(
            TimeUnit.MICROSECONDS,
            (self.frame.fb.FrameId() + offset)
            * self.sft_info.frame_size_micros,
        )

    @property
    def start(self):
        return self.relative_instant(0)

    @property
    def end(self):
        return self.relative_instant(1)


class ASeriesFrameEmitter(ABC):
    """
    Abstract class defining the characteristics of a series frame emitter,
    which is designed to be subclassed by all generators and filters.

    A "generator" is a type of emitter that produces frames from "nothing";
    i.e., by some external process, such as reading a file, an API, etc.; or
    by generating frames algorithmically.

    A "filter" is any emitter that takes another emitter and affects its
    :meth:`frames` in some arbitrary way. This terminology is borrowed from
    the digital signal processing domain, and doesn't imply that the signal
    will be reduced or shortened, etc. In fact the signal coming from the
    injected emitter might be lengthened or amplified by a filter to any
    extent, including the maximum or infinity, etc.

    This class defines an abstract method called `frames` that should
    trigger the pipeline to begin and return an `Iterator` of
    :class:`SeriesFramePackage`.

    These emitters are designed to be chained together with filters. E.g., a
    typical scenario would be to start with some generator, and chain it
    together with one or more filters:

    ```
    my_generator = SomeGenerator()
    my_first_filter = SomeFilter(reader=my_generator)
    my_second_filter = SomeOtherFilter(reader=my_first_filter)
    for frames in my_second_filter.frames(): ...
    ```

    Alternatively, the `__or__` method has been overloaded so that you can
    also do something equivalent to the above like this:

    ```
    for frame in (
        SomeGenerator()
        | SomeFilter
        | SomeOtherFilter
    ): ...
    ```

    NOTE: Calling `frames` may trigger side effects; it starts a chain of
    calls to the reader accessors in the pipeline, which restarts the pipeline
    anew each time. Use :meth:`cursor` to access the existing `Iterator` if
    you need multiple accesses.

    i.e., calling
    ```
    next(reader.cursor)
    ```
    is different from
    ```
    next(reader.frames())
    ```
    or
    ```
    next(reader.stream())
    ```
    in that calling `stream` is like making a database query (that returns and
    caches a cursor), and `cursor` is just providing access to the cached
    cursor, while calling `frames` directly provides an Iterator without
    caching a cursor.

    That said, :meth:`cursor` is empty until :meth:`stream` is called for the
    first time. (It requires the initial "query" that :meth:`frames` provides.)
    """

    def __init__(self, sft_info: Optional[SftInfo] = None) -> None:
        """
        :param:`sft_info` - an :class:`SftInfo` describing the series frame
        type this emitter emits.
        """
        super().__init__()
        self._sft_info = sft_info
        # Empty until `stream` runs the initial "query" (see class docstring).
        self._cursor: Iterator[SeriesFramePackage] = iter(())

    @property
    def sft_info(self):
        if not self._sft_info:
            raise RuntimeError(
                "Illegal state: `_sft_info` is None."
            )
        return self._sft_info

    @sft_info.setter
    def sft_info(self, val):
        self._sft_info = val

    def __iter__(self): return self.stream()

    @abstractmethod
    def frames(self) -> Iterator[SeriesFramePackage]: ...

    @property
    def cursor(self):
        """
        An `Iterator` representing the current pipeline in progress.
        """
        return self._cursor

    def stream(self):
        """
        Start the pipeline and return an :class:`Iterator` of
        :class:`SeriesFramePackage`.

        Each time :meth:`frames` is called, a new `Iterator` is instantiated by
        accessing the `reader`'s `frames`, which may trigger side effects. It
        will also start the process pipeline over, so if you need to iterate
        over the frames with multiple calls within a subclass of this, you should
        use :meth:`cursor` instead, which returns the same object returned from this
        method, without restarting the pipeline.
        """
        self._cursor = self.frames()
        return self.cursor

    def __or__(
        self,
        right: Union["SeriesFrameFilter", Type["SeriesFrameFilter"]],
    ) -> "SeriesFrameFilter":
        if callable(right):
            right = right()

        right.reader = self
        right.sft_info = self.sft_info
        return right

    def sfhls(
        self,
        max_bytelen: int,
    ) -> Iterator[FlatBufferWrapper[SeriesFrameHolderList]]:
        """
        Repackage input frames as a series frame holder list (sfhl) iterator.
        :param:`max_bytelen` - the max byte length allowed per sfhl.
        """
        chunk = list()
        chunk_bytelen = 0

        for pkg in self.frames():
            if chunk_bytelen + len(pkg.frame.bytes) > max_bytelen:
                if not chunk:
                    break
                yield sfhl_from_frames(chunk)
                chunk_bytelen = chruncate_frames(chunk)

            chunk_bytelen += append_frame(chunk, pkg.frame)

        if chunk:
            yield sfhl_from_frames(chunk)
            chunk_bytelen = chruncate_frames(chunk)


class SeriesFrameFilter(ASeriesFrameEmitter):
    """
    Class defining the characteristics of a filter, which is a kind of emitter.

    The difference between a filter and an emitter is:

    1. an emitter doesn't know how to provide `frames` (no concrete
       implementation)
    2. a filter assumes it will read `frames` from another injected emitter
       (called a `reader`), and by default simply returns those `frames`
       unadulterated.
    """

    def __init__(
        self,
        reader: Optional[ASeriesFrameEmitter] = None,
        **kwargs
    ) -> None:
        """
        :param:`reader` - input stream this filter reads.
        """
        super().__init__(**kwargs)
        self._reader = reader
        self._cursor: Iterator[SeriesFramePackage] = iter(())

    @property
    def reader(self) -> ASeriesFrameEmitter:
        """
        The input stream this filter reads.
        """
        if self._reader is None:
            raise RuntimeError("Illegal state: `reader` is None.")
        return self._reader

    @reader.setter
    def reader(self, val):
        self._reader = val

    def frames(self):
        """
        This has to exist because it's subclassing an abstract class that
        declares it, but a concrete implementation of this would be confusing.
        Generally speaking, implementations should read the `self.reader` and
        do something to modify the frames.
        """
        raise NotImplementedError()


def sfhl_from_frames(
    frames: Iterator[FlatBufferWrapper[SeriesFrame]]
) -> FlatBufferWrapper[SeriesFrameHolderList]:
    builder = fb_builder.Builder(0)
    builder.Finish(
        fb_builder.CreateSeriesFrameHolderList(
            builder=builder,
            frames=[
                fb_builder.CreateSeriesFrameHolder(
                    builder=builder,
                    data=frame.bytes,
                )
                for frame in frames
            ]
        )
    )

    return FlatBufferWrapper(
        _bytes=builder.Output(),
        schema=SeriesFrameHolderList,
    )


def append_frame(
    chunk: List[FlatBufferWrapper[SeriesFrame]],
    frame: FlatBufferWrapper[SeriesFrame],
) -> int:
    """
    Appends a frame to the given chunk and returns the bytelen it appended.
    """
    chunk.append(frame)
    return len(frame.bytes)


def chruncate_frames(
    chunk: List[FlatBufferWrapper[SeriesFrame]],
) -> Literal[0]:
    """
    "Truncates" a "chunk" of frames to zero. Returns bytelen (constantly `0`)
    to be consistent with :func:`append_frame`.
    """
    chunk.clear()
    return 0
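A minimal filter subclass sketch following the pattern the docstrings above describe (`Identity` is a hypothetical name, not part of the commit):

```python
from typing import Iterator

from pojagi_dsp.series_frame import (SeriesFrameFilter,
                                     SeriesFramePackage)

class Identity(SeriesFrameFilter):
    """Pass the injected reader's frames through unchanged."""

    def frames(self) -> Iterator[SeriesFramePackage]:
        yield from self.reader.frames()
```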
106 src/pojagi_dsp/series_frame/ext.py Normal file
@@ -0,0 +1,106 @@
from dataclasses import dataclass
import logging
from pathlib import Path
from typing import Dict, List, Optional

import yaml
from physiq_cloud.time import Instant, Interval, TimeUnit
from pydantic import BaseModel, validator

from pojagi_dsp.series_frame import SftInfo
from pojagi_dsp.series_frame.generator.sampler import SftInterval

logger = logging.getLogger(__name__)


class ExcerptType(BaseModel):
    series_frame_type: str  # vci-vitalpatch-telemetry
    sft_interval: Optional[SftInterval]


class Excerpt(BaseModel):
    account: str  # uuid4
    id: str  # patient1
    interval: Interval  # "iso/iso"
    organization: str  # "maps"
    source_id: str  # "maps.physiq.io"
    types: List[ExcerptType]

    @validator("interval", pre=True)
    def timespan_to_interval(cls, timespan: str): return Interval(
        **{
            ["start", "end_exclusive"][idx]: Instant.parse_iso8601(stamp)
            for idx, stamp in enumerate(timespan.split("/"))
        }
    )


class ExtBundleConfig(BaseModel):
    excerpts: List[Excerpt]
    id: str  # clinical-event-trigger
    type: str = "PhysIQCloud"


@dataclass(frozen=True)
class BundleConfig:
    excerpts: Dict[str, Excerpt]
    id: str

    @staticmethod
    def create(bundle_dir: Path) -> "BundleConfig":
        yaml_files = (x.resolve() for x in bundle_dir.glob("**/*.yaml"))

        try:
            # the first yaml you encounter is BUNDLE_CONFIG.yaml
            ext_bundle_config = ExtBundleConfig.parse_obj(
                yaml.safe_load(next(yaml_files).read_text())
            )
        except StopIteration as si:
            raise RuntimeError(
                f"No yaml files found under \"{bundle_dir}\"."
            ) from si

        # the rest are INFO.yaml files
        info_files = list(yaml_files)

        for excerpt in ext_bundle_config.excerpts:
            logger.debug(f"Loading excerpt \"{excerpt.id}\".")

            for type_ in excerpt.types:
                logger.debug(f"Loading SFT \"{type_.series_frame_type}\".")
                # Make sure the info file belongs to this excerpt
                INFO = next(
                    x for x in info_files
                    if x.as_posix()
                    .split("/")[-3]
                    .startswith(excerpt.id)
                    if x.as_posix()
                    .split("/")[-2]
                    .startswith(type_.series_frame_type)
                )

                sft_info = SftInfo.parse_obj(
                    yaml.safe_load(INFO.read_text())
                )

                sft_info.alias = type_.series_frame_type

                type_.sft_interval = SftInterval(
                    info=sft_info,
                    interval=excerpt.interval,
                    frame_count=(
                        excerpt.interval
                        .to_duration()
                        .to_unix(TimeUnit.MICROSECONDS)
                        // sft_info.frame_size_micros
                    ),
                    files=sorted(INFO.parent.glob("**/*.sfhl")),
                )

        return BundleConfig(
            id=ext_bundle_config.id,
            excerpts={
                x.id: x
                for x in ext_bundle_config.excerpts
            },
        )
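A usage sketch for `BundleConfig.create` (the bundle path is illustrative; it must contain a BUNDLE_CONFIG.yaml plus per-SFT INFO.yaml files as described above):

```python
from pathlib import Path

from pojagi_dsp.series_frame.ext import BundleConfig

config = BundleConfig.create(Path("bundles/clinical-event-trigger"))
for excerpt_id, excerpt in config.excerpts.items():
    print(excerpt_id, excerpt.interval)
```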
0 src/pojagi_dsp/series_frame/filter/__init__.py Normal file
40 src/pojagi_dsp/series_frame/filter/looping.py Normal file
@@ -0,0 +1,40 @@
import logging
import math
from typing import Iterator, Union

from pojagi_dsp.series_frame import (SeriesFrameFilter,
                                     SeriesFramePackage)

logger = logging.getLogger(__name__)


class Looping(SeriesFrameFilter):
    """
    Filter that loops through the input reader's frames until it reaches
    the given :param:`iterations`.
    """

    def __init__(
        self,
        iterations: Union[int, float] = math.inf,
        **kwargs,
    ) -> None:
        """
        :param:`iterations` - number of times to loop through the `frames`.
        """
        super().__init__(**kwargs)
        self.iterations = iterations
        self.iteration = 0

    def increment(self) -> None:
        """Override to react to `iteration` increment."""
        self.iteration += 1
        logger.debug(
            f"Loop incremented ({self.sft_info.alias}): {self.iteration}"
        )

    def frames(self) -> Iterator[SeriesFramePackage]:
        while self.iteration < self.iterations:
            for pkg in self.reader.frames():
                yield pkg
            self.increment()
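A sketch composing `Looping` via the `|` operator from `ASeriesFrameEmitter` (`my_generator` stands in for any concrete emitter and is hypothetical):

```python
from pojagi_dsp.series_frame.filter.looping import Looping

# Replay the generator's frames twice, then stop.
pipeline = my_generator | Looping(iterations=2)
for pkg in pipeline.frames():
    ...
```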
277 src/pojagi_dsp/series_frame/filter/replaying.py Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
import functools
|
||||||
|
import logging
|
||||||
|
from typing import Any, Iterator, List, Optional
|
||||||
|
|
||||||
|
import flatbuffers
|
||||||
|
import numpy as np
|
||||||
|
from physiq_cloud.series_frame import fb_builder
|
||||||
|
from physiq_cloud.series_frame.fb.math import ceildiv
|
||||||
|
from physiq_cloud.series_frame.fb_to_numpy import from_channel_data
|
||||||
|
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
|
||||||
|
from physiq_cloud.time import Instant, TimeUnit
|
||||||
|
from physiqfb.ChannelData import ChannelData
|
||||||
|
from physiqfb.ReadingType import ReadingType
|
||||||
|
from physiqfb.SamplingSetData import SamplingSetData
|
||||||
|
from physiqfb.SeriesFrame import SeriesFrame
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from pojagi_dsp.series_frame import (
|
||||||
|
ASeriesFrameEmitter,
|
||||||
|
SeriesFrameFilter,
|
||||||
|
SeriesFramePackage,
|
||||||
|
SftInfo,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Synchronize(BaseModel):
|
||||||
|
hour: bool = False
|
||||||
|
minute: bool = False
|
||||||
|
second: bool = False
|
||||||
|
microsecond: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
class Replaying(SeriesFrameFilter):
|
||||||
|
"""
|
||||||
|
Filter that replays input frames starting at a given :class:`Instant`
|
||||||
|
in time.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
emit_start: Instant = Instant.now(),
|
||||||
|
synchronize: Optional[Synchronize] = None,
|
||||||
|
**kwargs,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
:param:`emit_start` - instant in time to replay the frames.
|
||||||
|
:param:`synchronize` - Whether and at what granularity to synchronize the Instant from the source
|
||||||
|
data with the configured :param:`emit_start`. This will alter the precision of the
|
||||||
|
emit start seconds depending on the level of granularity you choose,
|
||||||
|
possibly drastically.
|
||||||
|
"""
|
||||||
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
|
self.frame_idx = 0
|
||||||
|
|
||||||
|
self.emit_start = emit_start
|
||||||
|
self.synchronize = synchronize
|
||||||
|
self._first_frame_id: int | None = None
|
||||||
|
self._start_frame_id_inst: int | None = None
|
||||||
|
|
||||||
|
@ASeriesFrameEmitter.sft_info.setter
|
||||||
|
def sft_info(self, val: SftInfo):
|
||||||
|
"""
|
||||||
|
This override's :class:`ASeriesFrameFilter`'s :meth:`sft_info`
|
||||||
|
setter, so that we can initialize the `timestamped_map` property.
|
||||||
|
"""
|
||||||
|
self._sft_info = val
|
||||||
|
self.timestamped_map = {
|
||||||
|
s.id: s.timestamped for s in self.sft_info.sampling_sets
|
||||||
|
}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def start_frame_id(self):
|
||||||
|
"""
|
||||||
|
:meth:`frames` must be called before accessing this for the first time,
|
||||||
|
because the calculation depends on the first frame.
|
||||||
|
"""
|
||||||
|
if self._start_frame_id_inst is not None:
|
||||||
|
return self._start_frame_id_inst
|
||||||
|
|
||||||
|
if not self.synchronize or not any(v for _, v in self.synchronize):
|
||||||
|
self.synchronized_emit_start = self.emit_start
|
||||||
|
else:
|
||||||
|
emit_start_dt = self.emit_start.to_datetime()
|
||||||
|
source_start_dt = Instant.from_unix(
|
||||||
|
TimeUnit.MICROSECONDS,
|
||||||
|
self._first_frame_id * self.sft_info.frame_size_micros,
|
||||||
|
).to_datetime()
|
||||||
|
|
||||||
|
self.synchronized_emit_start = Instant.from_datetime(
|
||||||
|
emit_start_dt.replace(
|
||||||
|
**{
|
||||||
|
k: getattr(source_start_dt, k)
|
||||||
|
if getattr(self.synchronize, k)
|
||||||
|
else getattr(emit_start_dt, k)
|
||||||
|
for k in ["hour", "minute", "second", "microsecond"]
|
||||||
|
},
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# We use ceildiv to calculate the first frame ID, because normal
|
||||||
|
# division would likely yield a frame BEFORE the emit_start. We want to
|
||||||
|
# start emitting frames at or after emit_start, and ceildiv does that
|
||||||
|
# for us.
|
||||||
|
self._start_frame_id_inst = ceildiv(
|
||||||
|
self.synchronized_emit_start.to_unix(
|
||||||
|
TimeUnit.MICROSECONDS,
|
||||||
|
),
|
||||||
|
self.sft_info.frame_size_micros,
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._start_frame_id_inst
|
||||||
|
|
||||||
|
    def replayed_frame(
        self,
        source: SeriesFrame,
        target_frame_id: int,
    ) -> FlatBufferWrapper[SeriesFrame]:
        builder = flatbuffers.Builder(0)
        target = fb_builder.CreateSeriesFrame(
            builder=builder,
            frame_id=target_frame_id,
            ingested_at_micros=-1,  # source.IngestedAtMicros(),
            sampling_sets_offsets=[
                replayed_sampling_set(
                    builder=builder,
                    sampling_set_data=sampling_set_data,
                    source_frame_id=source.FrameId(),
                    target_frame_id=target_frame_id,
                    frame_size_micros=self.sft_info.frame_size_micros,
                    is_timestamped=self.timestamped_map[sampling_set_data.Id()],
                )
                for sampling_set_data in [
                    source.SamplingSets(idx)
                    for idx in range(source.SamplingSetsLength())
                ]
            ],
            submitter_id=-1,  # source.SubmitterId(),
            sensor_id=-1,  # source.SensorId(),
        )
        builder.Finish(target)
        target_frame_bytes = builder.Output()

        return FlatBufferWrapper(
            _bytes=target_frame_bytes,
            schema=SeriesFrame,
        )

    def frames(self) -> Iterator[SeriesFramePackage]:
        """
        Each time this method is called, it accesses the injected
        reader anew, but continues to increment the `frame_idx` so that
        the next frame will replay where the last one left off.

        To reset the `frame_idx`, client code can just set the `frame_idx`
        manually (or create a new instance).
        """
        for pkg in self.reader.frames():
            if self._first_frame_id is None:
                self._first_frame_id = pkg.frame.fb.FrameId()

            yield SeriesFramePackage(
                frame=self.replayed_frame(
                    source=pkg.frame.fb,
                    target_frame_id=self.start_frame_id + self.frame_idx,
                ),
                sft_info=pkg.sft_info,
            )
            # `int` in python 3 is unbounded, so this can continue
            # indefinitely. Also, the `FrameId` is effectively a timestamp,
            # so continuously incrementing this value is by design.
            self.frame_idx += 1

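# Usage sketch (hedged): `replayer` stands for an instance of this emitter,
# however it is constructed upstream; `islice` is itertools.islice.
#   first_ten = list(islice(replayer.frames(), 10))   # frames 0..9
#   next_five = list(islice(replayer.frames(), 5))    # frames 10..14
#   replayer.frame_idx = 0                            # rewind to the start
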
def replayed_sampling_set(
    builder: flatbuffers.Builder,
    sampling_set_data: SamplingSetData,
    source_frame_id: int,
    target_frame_id: int,
    frame_size_micros: int,
    is_timestamped: bool,
):
    source_channels = [
        sampling_set_data.Channels(idx)
        for idx in range(sampling_set_data.ChannelsLength())
    ]

    target_channels: List[int] = list()
    for source_channel_data in source_channels:
        is_timestamp_channel = is_timestamped and source_channel_data.Id() == 0

        if not is_timestamp_channel or (source_frame_id == target_frame_id):
            # Just copy verbatim
            target_channel_data = replayed_channel_data(
                builder,
                source_channel_data,
            )
        else:
            # Update the timestamps relative to the target frameId
            source_frame_start_micros = source_frame_id * frame_size_micros
            target_frame_start_micros = target_frame_id * frame_size_micros

            # np.copy is used because the returned array is read-only
            timestamps = np.copy(from_channel_data(data=source_channel_data))
            timestamps -= source_frame_start_micros  # relativize
            timestamps += target_frame_start_micros  # frame shift

            target_channel_data = fb_builder.CreateChannelData(
                builder=builder,
                id=0,  # `source_channel_data.Id()` is constantly 0 here
                readings_offset=fb_builder.CreateInt64Channel(
                    builder=builder,
                    data=timestamps.tolist(),  # List[int]
                ),
                readings_type=ReadingType.INT64,
            )

        target_channels.append(target_channel_data)

    np_relative_timestamps = sampling_set_data.RelativeTimestampsAsNumpy()
    relative_timestamps = (
        np_relative_timestamps.tobytes()
        # The accessor returns an int sentinel (0 or -1: error conditions/no
        # data) instead of an ndarray; `ndarray in {0, -1}` would raise a
        # TypeError, so test the type rather than membership.
        if not isinstance(np_relative_timestamps, int)
        else None
    )

    return fb_builder.CreateSamplingSetData(
        builder=builder,
        channel_offsets=target_channels,
        id=sampling_set_data.Id(),
        start_offset_micros=sampling_set_data.StartOffsetMicros(),
        relative_timestamps_type=sampling_set_data.RelativeTimestampsType(),
        relative_timestamps_unit_in_micros=sampling_set_data.RelativeTimestampsUnitInMicros(),
        relative_timestamps=relative_timestamps,
        relative_timestamps_sample_count=sampling_set_data.RelativeTimestampsSampleCount(),
    )

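# Worked example of the timestamp rebasing above (numbers are illustrative):
# with frame_size_micros = 1_000_000, source_frame_id = 100 and
# target_frame_id = 2_000, a source timestamp of 100_000_250 micros becomes
#   100_000_250 - 100 * 1_000_000 = 250           (relativize)
#   250 + 2_000 * 1_000_000 = 2_000_000_250       (frame shift)
# i.e. the sample keeps its offset within the frame, but the frame itself
# moves onto the replay timeline.
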
def replayed_channel_data(
    builder: flatbuffers.Builder,
    channel_data: ChannelData,
) -> int:
    readings_type = channel_data.ReadingsType()

    return fb_builder.CreateChannelData(
        builder=builder,
        id=channel_data.Id(),
        readings_offset=channel(
            builder=builder,
            readings_type=readings_type,
            readings=from_channel_data(data=channel_data).tolist(),
        ),
        readings_type=readings_type,
    )

reading_type_map = {
    ReadingType.BINARY32: fb_builder.CreateBinary32Channel,
    ReadingType.BINARY64: fb_builder.CreateBinary64Channel,
    ReadingType.INT8: fb_builder.CreateInt8Channel,
    ReadingType.INT16: fb_builder.CreateInt16Channel,
    ReadingType.INT32: fb_builder.CreateInt32Channel,
    ReadingType.INT64: fb_builder.CreateInt64Channel,
    ReadingType.NAMED_READING: fb_builder.CreateNamedReadingChannel,
    ReadingType.STRING: fb_builder.CreateStringChannel,
}


def channel(
    builder: flatbuffers.Builder,
    readings_type: int,
    readings: List[Any],
) -> int:
    return reading_type_map[readings_type](builder=builder, data=readings)
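# Dispatch sketch: `channel` is a table-driven factory, so e.g.
#   channel(builder=b, readings_type=ReadingType.INT16, readings=[1, 2, 3])
# forwards to fb_builder.CreateInt16Channel(builder=b, data=[1, 2, 3]) and
# returns the resulting offset (assumed from the `-> int` annotation).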
76
src/pojagi_dsp/series_frame/filter/updating.py
Normal file
@@ -0,0 +1,76 @@
import logging
from typing import Callable, Iterator

from physiq_cloud.time import Duration, Instant, TimeUnit

from pojagi_dsp.series_frame import SeriesFrameFilter, SeriesFramePackage

logger = logging.getLogger(__name__)


class Updating(SeriesFrameFilter):
    """
    Filter that outputs frames until its reader runs out or the next frame
    would surpass a given :class:`Instant` in time, whichever comes first.
    """

    def __init__(
        self,
        until: Callable[[], Instant] = Instant.now,
        **kwargs,
    ):
        """
        :param:`until` - closure returning an `Instant` at which to stop
        iterating when the next frame would surpass it. The cursor will
        otherwise persist until it has been exhausted.
        """
        super().__init__(**kwargs)
        self.until = until
        self.last_update_count = 0
        self._next_pkg = None

    def log_status(self, end: Instant, until: Instant):
        between = Duration.between(end, until).to_unix(
            TimeUnit.MILLISECONDS
        ) / 1000.0

        logger.debug(
            f"{self.sft_info.alias}: Update scope : {between} seconds"
            if end < until
            else (
                f"{self.sft_info.alias}: "
                f"Next update available in {-between} seconds"
            )
        )

    def frames(self) -> Iterator[SeriesFramePackage]:
        self.last_update_count = 0

        if self._next_pkg is None:
            cursor = self.reader.stream()
            self._next_pkg = next(cursor)
        else:
            cursor = self.reader.cursor

        logger.debug(f"{self.sft_info.alias}: {cursor}")

        try:
            until = self.until()
            end = self._next_pkg.end

            self.log_status(end, until)

            while end < until:
                yield self._next_pkg
                self.last_update_count += 1
                self._next_pkg = next(cursor)
                end = self._next_pkg.end
        except StopIteration:
            logger.warning("StopIteration caught")
            logger.warning(f"{self.sft_info.alias}: {cursor}")

        if self.last_update_count:
            logger.debug(
                f"{self.sft_info.alias}: Update frames : {self.last_update_count}"
            )
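# Usage sketch (hedged; the `reader=` wiring is assumed from the pipeline
# conventions used elsewhere in this package): repeatedly drain an `Updating`
# filter to poll for frames that have become available since the last call.
#   updating = Updating(reader=some_frame_emitter)
#   while polling:
#       for pkg in updating.frames():
#           handle(pkg)   # frames whose `end` precedes Instant.now()
#       time.sleep(poll_interval)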
15
src/pojagi_dsp/series_frame/generator/sampler/__init__.py
Normal file
@@ -0,0 +1,15 @@
from dataclasses import dataclass
from pathlib import Path
from typing import List

from physiq_cloud.time import Interval

from pojagi_dsp.series_frame import SftInfo


@dataclass(frozen=True)
class SftInterval:
    info: SftInfo
    interval: Interval
    frame_count: int
    files: List[Path]
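# Construction sketch (field values are hypothetical): an immutable record
# tying an SftInfo to the time interval its files cover.
#   SftInterval(
#       info=sft_info,
#       interval=some_interval,
#       frame_count=120,
#       files=[Path("frames-000.sfhl"), Path("frames-001.sfhl")],
#   )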
95
src/pojagi_dsp/series_frame/generator/sampler/file.py
Normal file
@@ -0,0 +1,95 @@
import logging
import math
from pathlib import Path
from typing import Iterator

from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiqfb.SeriesFrame import SeriesFrame
from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList

from pojagi_dsp.series_frame import (ASeriesFrameEmitter,
                                     SeriesFramePackage)


logger = logging.getLogger(__name__)


class FileSampler(ASeriesFrameEmitter):
    def __init__(
        self,
        *sfhl_files: Path,
        frame_count: float = math.inf,
        frame_offset: int = 0,
        **kwargs,
    ) -> None:
        """
        :param:`sfhl_files` - One or more files containing sfhl data. It's
        assumed that the files are sorted and contain contiguous (sorted)
        data; no effort is made to validate that or to handle any error
        condition related to correctness. The files are treated as a single
        logical file, and an `EOFError` is raised if `frame_count` requests
        more frames than are available.

        :param:`frame_count` - The number of frames this generator will
        attempt to read from the given files. Annotated as `float` so the
        default of `math.inf` (read everything) type-checks; integer counts
        work as usual.

        :param:`frame_offset` - The number of frames to skip over before
        yielding frames from the given files.
        """
        super().__init__(**kwargs)
        self.files = sfhl_files
        self.frame_offset = frame_offset
        self.frame_count = frame_count
        self.frame_idx = 0

    def frames_from_sfhl_bytes(
        self, data: bytes,
    ) -> Iterator[FlatBufferWrapper[SeriesFrame]]:
        """
        Emit frames from series frame holder list data.

        :param:`data` - sfhl bytes.
        """
        sfhl = SeriesFrameHolderList.GetRootAsSeriesFrameHolderList(data, 0)
        for index in range(sfhl.FramesLength()):
            yield FlatBufferWrapper(
                _bytes=sfhl.Frames(index).DataAsNumpy().tobytes(),
                schema=SeriesFrame,
            )

    def frames(self) -> Iterator[SeriesFramePackage]:
        self.frame_idx = 0
        end_frame_idx = self.frame_offset + self.frame_count

        # Finite set of files passed in.
        for file in self.files:
            # Finite set of frames from `read_bytes`.
            for frame in self.frames_from_sfhl_bytes(file.read_bytes()):
                if self.frame_idx < self.frame_offset:
                    # Fast-forward to the declared offset, for each file.
                    self.frame_idx += 1
                    continue
                elif self.frame_idx >= end_frame_idx:
                    # `end_frame_idx` is finite; `>=` so that exactly
                    # `frame_count` frames are yielded.
                    break

                yield SeriesFramePackage(frame=frame, sft_info=self.sft_info)
                self.frame_idx += 1

        if end_frame_idx != math.inf:
            if self.frame_idx < end_frame_idx:
                frames_read = self.frame_idx - self.frame_offset
                raise EOFError(
                    f"Failed to read {self.frame_count - frames_read} frames. "
                    "Not enough data.\n"
                    f"\toffset     : {self.frame_offset}\n"
                    f"\tframes read: {frames_read}"
                )
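# Usage sketch (paths and counts are hypothetical): read 100 frames starting
# at frame 50 across two contiguous capture files.
#   sampler = FileSampler(
#       Path("capture-000.sfhl"),
#       Path("capture-001.sfhl"),
#       frame_count=100,
#       frame_offset=50,
#   )
#   packages = list(sampler.frames())   # raises EOFError if data runs short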
@@ -0,0 +1,48 @@
from typing import Iterator

import flatbuffers
from physiq_cloud.series_frame import fb_builder
from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper
from physiq_cloud.time import Instant, TimeUnit
from physiqfb import SeriesFrame

from pojagi_dsp.series_frame import (ASeriesFrameEmitter,
                                     SeriesFramePackage)


class VCIVitalPatchTelemetrySynthesizer(ASeriesFrameEmitter):

    def frames(self) -> Iterator[SeriesFramePackage]:
        builder = flatbuffers.Builder(0)
        target = fb_builder.CreateSeriesFrame(
            builder=builder,
            frame_id=Instant.now().to_unix(
                TimeUnit.MICROSECONDS,
            ) // self.sft_info.frame_size_micros,
            ingested_at_micros=-1,
            sampling_sets_offsets=[
                # TODO: synthesize sampling sets; cf. replayed_sampling_set:
                # replayed_sampling_set(
                #     builder=builder,
                #     sampling_set_data=sampling_set_data,
                #     source_frame_id=source.FrameId(),
                #     target_frame_id=target_frame_id,
                #     frame_size_micros=self.sft_info.frame_size_micros,
                #     is_timestamped=self.timestamped_map[
                #         sampling_set_data.Id()
                #     ],
                # )
                # for sampling_set_data in [
                #     source.SamplingSets(idx)
                #     for idx in range(source.SamplingSetsLength())
                # ]
            ],
            submitter_id=-1,
            sensor_id=-1,
        )
        builder.Finish(target)
        target_frame_bytes = builder.Output()

        # Yield a single (currently empty) frame package so this matches the
        # declared Iterator[SeriesFramePackage] and callers can iterate it
        # like any other emitter.
        yield SeriesFramePackage(
            frame=FlatBufferWrapper(
                _bytes=target_frame_bytes,
                schema=SeriesFrame,
            ),
            sft_info=self.sft_info,
        )
199
tests/channel/pipeline_test.py
Normal file
@@ -0,0 +1,199 @@
import math

import pytest

from pojagi_dsp.channel import (ASignal, Constantly, Filter, FilterFunction,
                                IllegalStateError, Map, SignalFunction)
from pojagi_dsp.channel.generator.sine import SineWave


@pytest.fixture
def srate(): return 44100


@pytest.fixture
def freq(): return 440


@pytest.fixture
def const(srate): return Constantly(42, srate=srate)


@pytest.fixture
def sine(srate, freq): return SineWave(freq, srate=srate)


@pytest.fixture
def sine_generator_factory(srate, freq):
    def sine():
        phase = 0.0
        inc = (2 * math.pi * freq) / srate
        while True:
            yield math.sin(phase)
            phase += inc
    return sine


def test_missing_srate_assigned_from_kwarg_reader(const: ASignal):
    pipeline = Filter(reader=const)
    assert pipeline.srate == const.srate


def test_missing_srate_assigned_from_ored_reader(const: ASignal):
    pipeline = const | Filter
    assert pipeline.srate == const.srate


def test_add_operator(const: Constantly):
    cursor = (const + 1).stream()
    assert next(cursor) == const.constant + 1


def test_mul_operator(const: Constantly):
    cursor = (const * 100).stream()
    assert next(cursor) == const.constant * 100


def test_filter_iterable(const: Constantly):
    pipeline = const | (Filter, Filter, Filter)
    assert pipeline.reader.reader.reader == const
    assert next(pipeline) == const.constant == next(const)


def test_filter_expression(const: Constantly):
    pipeline = const | (Filter | Filter | Filter)
    assert pipeline.reader.reader.reader == const
    assert next(pipeline) == const.constant == next(const)


def test_filter_nested_expression(const: Constantly):
    pipeline = const | (Filter | (Filter, Filter) | (Filter | Filter) | Filter)
    assert pipeline.reader.reader.reader.reader.reader.reader == const
    assert next(pipeline) == const.constant == next(const)


def test_reader(const: Constantly):
    filter = Filter()
    with pytest.raises(IllegalStateError, match=".reader. is None"):
        filter.reader
    assert (const | filter).reader == const


def test_pipeline_missing_reader(const: ASignal):
    pipeline = Filter | Filter | Filter
    with pytest.raises(IllegalStateError, match=".reader. is None"):
        next(pipeline)
    assert next(const | pipeline)


def test_filter_can_only_be_assigned_one_generator(const: Constantly):
    pipeline = const | Filter | Filter

    with pytest.raises(ValueError, match="already has a generator"):
        Constantly(0) | pipeline


def test_add_tuple(const: Constantly):
    pipeline = const + (100, 200, 300)
    assert isinstance(pipeline, Map)
    assert next(pipeline) == const.constant + (100 + 200 + 300)


def test_mul_tuple(const: Constantly):
    pipeline = const * (100, 200, 300)
    assert isinstance(pipeline, Map)
    assert next(pipeline) == const.constant * (100 * 200 * 300)


def test_radd(const: Constantly):
    pipeline = 1 + const
    assert next(pipeline) == const.constant + 1


def test_radd_tuple(const: Constantly):
    pipeline = (1, 2, 3) + const
    assert next(pipeline) == const.constant + (1 + 2 + 3)


def test_rmul(const: Constantly):
    pipeline = 2 * const
    assert next(pipeline) == const.constant * 2


def test_rmul_tuple(const: Constantly):
    pipeline = (2, 3, 4) * const
    assert next(pipeline) == const.constant * (2 * 3 * 4)


def test_samples(sine: SineWave, sine_generator_factory):
    pipeline = sine | Filter
    cursor = pipeline.samples()
    cursor2 = sine_generator_factory()
    for val in cursor:
        assert val == next(cursor2)


def test_stream(sine: SineWave, sine_generator_factory):
    pipeline = sine | Filter
    cursor = pipeline.stream()
    cursor2 = sine_generator_factory()
    for _ in range(5):
        assert next(cursor) == next(cursor2)


def test_cursor(sine: SineWave, sine_generator_factory):
    pipeline = sine | Filter
    cursor1 = pipeline.cursor
    cursor2 = pipeline.cursor
    cursor3 = sine_generator_factory()
    assert cursor1 == cursor2
    for v1 in cursor1:
        assert v1 == next(cursor3)

    with pytest.raises(StopIteration):
        next(cursor2)


def test_iter(sine: SineWave):
    count = 0
    for _ in sine:
        count += 1
    assert count == math.ceil(sine.wavelength)
    for _ in sine:
        count += 1
    assert count == math.ceil(sine.wavelength * 2)


def test_next(sine: SineWave):
    pipeline = sine | Filter
    for _ in range(5):
        val = next(pipeline)
        assert type(val) == float
        assert val != next(pipeline)


def test_function_filter(const: Constantly):
    pipeline = const | FilterFunction(lambda r: (x + 1 for x in r))
    assert next(pipeline) == const.constant + 1


def test_filter_function_literal(const: Constantly):
    pipeline = const | (lambda r: (x + 1 for x in r))
    assert next(pipeline) == const.constant + 1


def test_signal_function(sine: SineWave, sine_generator_factory, srate):
    pipeline = SignalFunction(lambda _: sine.samples(), srate=srate) | Filter
    sine_generator = sine_generator_factory()
    for _ in range(5):
        assert next(sine_generator) == next(pipeline)


def test_sine_mod(sine: SineWave):
    seconds = 10
    stream = sine.stream()
    for _ in range(sine.srate * seconds):
        next(stream)
        assert sine.phase <= (2 * math.pi)

    for _ in sine:
        assert sine.phase <= (2 * math.pi)