commit fc9f76a01bb01b01edd8cea1e23ec103126263f0 Author: Tom Brennan Date: Mon Apr 22 11:38:19 2024 -0400 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..94fb88c --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +notes.* + +.venv +.vscode + +linen* + +build/ + +loadtest.coverage.timespan + +node_modules/ +yarn-error.log + +__pycache__/ + +*.log + +*.env +my-* + +pojagi_dsp.egg-info/ + +.pytest_cache/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..069c486 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# pojagi-dsp + +DSP tools for load testing. diff --git a/documentation/QRS-complex.png b/documentation/QRS-complex.png new file mode 100644 index 0000000..3e88413 Binary files /dev/null and b/documentation/QRS-complex.png differ diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..f9a2b35 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,28 @@ +[build-system] +requires = ["setuptools", "setuptools-scm"] +build-backend = "setuptools.build_meta" + +[project] +name = "pojagi-dsp" +description = "DSP tools for load testing." +urls = { "gitlab" = "https://gitlab.pojagi.org/tjb1982/dsp" } +authors = [ + { name = "Tom Brennan", email = "tjb1982@gmail.com" }, +] +readme = "README.md" +requires-python = ">=3.8" +classifiers = [] +dependencies = [ + "pydantic==1.10.2", + "scipy==1.8.1", + + # These should be included in the above requirement + # "marshmallow>=3.3.0", + # "marshmallow_dataclass>=7.2.1", + # "marshmallow_oneofschema>=2.0.1", + # "PyYAML>=5.3.1", +] + +optional-dependencies = { test = ["pytest"] } +version = "0.0.0.dev0" +# dynamic = ["version"] diff --git a/src/pojagi_dsp/channel/__init__.py b/src/pojagi_dsp/channel/__init__.py new file mode 100644 index 0000000..722128d --- /dev/null +++ b/src/pojagi_dsp/channel/__init__.py @@ -0,0 +1,337 @@ +import abc +import copy +import datetime +import inspect +from itertools import islice +import logging +import math +import operator +from collections.abc import Iterable +from functools import reduce +import types +from typing import (Any, Callable, Generic, Iterator, Optional, Type, TypeVar, + Union) + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class IllegalStateError(ValueError): + ... + + +def coerce_channels(x: Any) -> Iterator["ASignal"]: + if isinstance(x, ASignal): + yield x + else: + if callable(x): + if isinstance(x, Type): + yield x() + else: + yield SignalFunction(x) + elif isinstance(x, Iterable): # and not isinstance(x, str): + for it in (coerce_channels(y) for y in x): + for channel in it: + yield channel + else: + yield Constantly(x) + + +class ASignalMeta(abc.ABCMeta): + def __or__(self, other: Any) -> "Filter": + """ + Allows `|` composition starting from an uninitialized class. + See doc for `__or__` below in `ASignal`. + """ + return self() | coerce_channels(other) + + def __radd__(self, other): return self() + other + def __add__(self, other): return self() + other + def __rmul__(self, other): return self() * other + def __mul__(self, other): return self() * other + + +class ASignal(Generic[T], metaclass=ASignalMeta): + def __init__(self, srate: Optional[float] = None): + self._srate = srate + self._cursor: Optional[Iterator[T]] = None + + @property + def srate(self): + if self._srate is None: + raise IllegalStateError( + f"{self.__class__}: `srate` is None." 
+ ) + return self._srate + + @srate.setter + def srate(self, val: float): self._srate = val + + def __iter__(self): + self._cursor = self.samples() + return self + + def __next__(self): return next(self.cursor) + + @abc.abstractmethod + def samples(self) -> Iterator[T]: ... + + @property + def cursor(self): + """ + An `Iterator` representing the current pipeline in progress. + """ + if self._cursor is None: + # this can only happen once + self._cursor = self.samples() + return self._cursor + + def __getstate__(self): + """ + `_cursor` is a generator, and generators aren't picklable. + """ + state = self.__dict__.copy() + if state.get("_cursor"): + del state["_cursor"] + return state + + def stream(self): + while True: + try: + yield next(self.cursor) + except StopIteration: + self = iter(self) + + def of_duration(self, duration: datetime.timedelta): + """ + Returns an `Iterator` of samples for a particular duration expressed + as a `datetime.timedelta` + :param:`duration` - `datetime.timedelta` representing the duration + """ + return islice( + self.stream(), + 0, + math.floor(self.srate * duration.total_seconds()), + ) + + def __or__( + left, + right: Union["Filter", Callable, Iterable], + ) -> "Filter": + """ + Allows composition of filter pipelines with `|` operator. + + e.g., + ``` + myFooGenerator + | BarFilter + | baz_filter_func + | (lambda reader: (x for x in reader)) + ``` + """ + if isinstance(right, SignalFunction): + return left | FilterFunction(fn=right._fn, name=right.Function) + + if not isinstance(right, ASignal): + return reduce(operator.or_, (left, *coerce_channels(right))) + + if not isinstance(right, Filter): + raise ValueError( + f"Right side must be a `{Filter.__name__}`; " + f"received: {type(right)}", + ) + + filter: Filter = right + while getattr(filter, "_reader", None) is not None: + # Assuming this is a filter pipeline, we want the last node's + # reader to be whatever's on the left side of this operation. + filter = filter.reader + + if hasattr(filter, "_reader"): + # We hit the "bottom" and found a filter. + filter.reader = left + else: + # We hit the "bottom" and found a non-filter/generator. + raise ValueError( + f"{right.__class__.__name__}: filter pipeline already has a " + "generator." + ) + + # Will often be `None` unless `left` is a generator. + right.srate = left._srate + + return right + + def __radd__(right, left): return right.__add__(left) + def __add__(left, right): return left._operator_impl(operator.add, right) + def __rmul__(right, left): return right.__mul__(left) + def __mul__(left, right): return left._operator_impl(operator.mul, right) + # FIXME: other operators? Also, shouldn't `*` mean convolve instead? 
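+    # For illustration (hypothetical values; `SineWave` is defined in
+    # `pojagi_dsp.channel.generator.sine` later in this commit):
+    #
+    #   mix = SineWave(440) + SineWave(880) * 0.5
+    #   mix.srate = 44100
+    #   next(iter(mix))  # one mixed sample
+    #
+    # Both operators fan the right-hand side out through `coerce_channels`
+    # and combine sample-wise via `Reduce` below.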
+ + def _operator_impl(left, operator: Callable[..., T], right: Any): + channels = list(coerce_channels(right)) + for channel in channels: + if channel._srate is None: + channel.srate = left._srate + return Reduce(operator, left, *channels, srate=left._srate) + + def __repr__(self): + members = {} + for k in [k for k in dir(self) + if not k.startswith("_") + and not k in {"stream", "reader", "cursor", "wave", }]: + try: + v = getattr(self, k) + if not inspect.isroutine(v): + members[k] = v + except IllegalStateError as e: + members[k] = None + + return ( + f"{self.__class__.__name__}" + f"""({ + f", ".join( + f"{k}={v}" + for k, v in members.items() + ) + })""" + ) + + +S = TypeVar("S", bound=ASignal) + + +class Reduce(ASignal, Generic[S, T]): + def __init__( + self, + # FIXME: typing https://stackoverflow.com/a/67814270 + fn: Callable[..., T], + *streams: S, + srate: Optional[float] = None, + stateful=False, + ): + super().__init__(srate) + self._fn = fn + self.fn = fn.__name__ + self.streams = [] + for stream in streams: + if stateful: + self.streams.append(stream) + continue + + stream_ = ( + copy.deepcopy(stream) + if not isinstance(stream, types.GeneratorType) + else stream + ) + stream_.srate = srate + self.streams.append(stream_) + + @property + def srate(self): return ASignal.srate.fget(self) + + @srate.setter + def srate(self, val: float): + ASignal.srate.fset(self, val) + for stream in self.streams: + if isinstance(stream, ASignal): + stream.srate = val + + def samples(self): return ( + reduce(self._fn, args) + for args in zip(*self.streams) + ) + + +class Filter(ASignal, Generic[S]): + + def __init__( + self, + reader: Optional[S] = None, + srate: Optional[float] = None, + ): + super().__init__(srate) + self.reader: Optional[S] = reader + + @property + def reader(self) -> S: + """ + The input stream this filter reads. + """ + if not self._reader: + raise IllegalStateError( + f"{self.__class__}: `reader` is None." + ) + return self._reader + + @reader.setter + def reader(self, val: S): + self._reader = val + if val is not None and self._srate is None: + self.srate = val._srate + + @property + def srate(self): return ASignal.srate.fget(self) + + @srate.setter + def srate(self, val: float): + ASignal.srate.fset(self, val) + child = getattr(self, "_reader", None) + previous_srate = val + while child is not None: + # Since `srate` is optional at initialization, but required in + # general, we make our best attempt to normalize it for the + # filter pipeline, which should be consistent for most + # applications, by applying it to all children. 
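+            # e.g. (illustrative): after `pipeline.srate = 500` on the last
+            # filter of `gen | FilterA() | FilterB()`, this walk assigns 500
+            # to `FilterA` and `gen` as well, wherever `_srate` is still
+            # `None`.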
+ if child._srate is None: + child.srate = previous_srate + child: Optional[ASignal] = getattr(child, "_reader", None) + if isinstance(child, ASignal) and child._srate is not None: + previous_srate = child._srate + + def samples(self) -> Iterator[T]: return self.reader.samples() + + def __repr__(self): + return ( + f"{self._reader} | {super().__repr__()}" + ) + + +class FilterFunction(Filter, Generic[T, S]): + def __init__( + self, + fn: Callable[[S], Iterator[T]], + name: Optional[str] = None, + reader: Optional[S] = None, + srate: Optional[float] = None, + ): + super().__init__(reader, srate) + self._fn = fn + self.Function = name if name else fn.__name__ + + def samples(self): return self._fn(self.reader) + + +class SignalFunction(ASignal, Generic[T]): + def __init__( + self, + fn: Callable[[int], Iterator[T]], + name: Optional[str] = None, + srate: Optional[float] = None, + ): + super().__init__(srate) + self._fn = fn + self.Function = name if name else fn.__name__ + + def samples(self) -> Iterator[T]: return self._fn(self.srate) + + +class Constantly(ASignal, Generic[T]): + def __init__(self, constant: T, srate: float = 0.0): + super().__init__(srate) + self.constant = constant + + def samples(self) -> Iterator[T]: + while True: + yield self.constant diff --git a/src/pojagi_dsp/channel/accelerometer/generator.py b/src/pojagi_dsp/channel/accelerometer/generator.py new file mode 100644 index 0000000..6c797b4 --- /dev/null +++ b/src/pojagi_dsp/channel/accelerometer/generator.py @@ -0,0 +1,36 @@ +from typing import Optional +from pojagi_dsp.channel import ASignal + + +class AccelerometerSynthesizer(ASignal): + """ + Features/dimensions: + - respiration + - awake vs. asleep + - activity level (while awake or asleep?) + + I think what's needed here is to identify activities in the raw data and + then use those as wavetables for various activities. Walking, running, sitting, + laying down, sleeping in various positions. Then, theoretically, these + tables could be mixed together. But the problem with that is that we really + need to extract the components from the signal so that we can recombine them. + Perhaps do Fourier analysis on various segments you suspect to be wakefulness + and sleep, and see if you can find any commonality between them in the frequency + domain, that could be filtered out and reapplied synthetically. + + https://www.ncbi.nlm.nih.gov/pmc/articles/PMC3203837/ + + The above link gives some insight into respiration divisions based on activity, + and how to derive that data using bandpass filtering. + """ + def __init__(self, srate: Optional[float] = None): + super().__init__(srate) + self.activity_level = 0.0 + # Normal range at rest: 12 - 20 (up to 25) + # https://my.clevelandclinic.org/health/articles/10881-vital-signs + # Normal range during exercise 40-60 + # https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4818249/ + self.respiration_rate = 12 # bpm + + def samples(self): + ... 
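For orientation, the channel pieces above already compose end to end. A minimal sketch (the 8 kHz rate and the halving filter are illustrative choices, assuming only the modules in this commit):

```
from datetime import timedelta

from pojagi_dsp.channel import FilterFunction
from pojagi_dsp.channel.generator.sine import SineWave

# A 440 Hz sine at 8 kHz, attenuated by a filter function; `of_duration`
# slices exactly one second's worth of samples off the infinite stream.
pipeline = SineWave(440, srate=8000) | FilterFunction(
    lambda reader: (x * 0.5 for x in reader),
)
samples = list(pipeline.of_duration(timedelta(seconds=1)))
assert len(samples) == 8000
```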
diff --git a/src/pojagi_dsp/channel/ecg/__init__.py b/src/pojagi_dsp/channel/ecg/__init__.py new file mode 100644 index 0000000..034c1f6 --- /dev/null +++ b/src/pojagi_dsp/channel/ecg/__init__.py @@ -0,0 +1,98 @@ +import abc +import dataclasses +import itertools +import math +from enum import Enum +from numbers import Number +from typing import ClassVar, Iterator, Optional + +from pojagi_dsp.channel import ASignal + + +Segment = Enum("Segment", [ + "R", + "S", + "S_T", + "T", + "U", + "T_P", + "P", + "P_R", + "Q", +]) + + +@dataclasses.dataclass(frozen=True) +class Segments: + """ + The ECG wave is a kind of impulse with 8 to 9 segments. + + Segment pointers based on https://geekymedics.com/how-to-read-an-ecg/ + """ + # ClassVar means it's not part of the constructor (i.e., `__init__` args). + R: ClassVar[int] = 0 + """R is constant 0""" + S: int + S_T: int # i.e., J point + T: int + T_P: int + P: int + P_R: int # actually, only goes from P-Q, confusingly + Q: int + + U: Optional[int] = None + """Rare, but if found, comes before T_P.""" + + def merge(self, other: "Segments", weight: float): + """ + Merge two `Segments`, with a given weight. + :param:`other` - the other `Segments` to merge + :param:`weight` - a value between 0.0 and 1.0 to indicate how much + weight to give to `other`. + """ + self_weight = 1 - weight + return Segments( + **{ + k: math.floor(lv * self_weight + getattr(other, k) * weight) + if lv is not None + else getattr(other, k) + + for k, lv in ( + (f.name, getattr(self, f.name)) + for f in dataclasses.fields(self) + ) + }, + ) + + +class AECGChannel(ASignal[Number]): + + @property + @abc.abstractproperty + def heart_rate(self) -> float: + """Frequency of impulses/waves in bpm.""" + ... + + @property + @abc.abstractproperty + def wavelength(self) -> int: + """ + The number of samples in a complete impulse/wave cycle. + + Formula: `srate / (heart_rate / 60)` where `heart_rate / 60` is the + heart rate converted from bpm to Hz. + """ + ... + + @property + @abc.abstractproperty + def segments(self) -> Segments: + """The analytical segments of the impulse/wave.""" + ... + + @property + def wave(self) -> Iterator[Number]: + """ + Returns an iterator over a single ECG impulse/wave. + """ + return itertools.islice(self, 0, self.wavelength) diff --git a/src/pojagi_dsp/channel/ecg/filter/__init__.py b/src/pojagi_dsp/channel/ecg/filter/__init__.py new file mode 100644 index 0000000..8a799bc --- /dev/null +++ b/src/pojagi_dsp/channel/ecg/filter/__init__.py @@ -0,0 +1,18 @@ +from numbers import Number +from pojagi_dsp.channel.ecg import AECGChannel, Segments + +from pojagi_dsp.channel import Filter + + +class ECGFilter(Filter[AECGChannel], AECGChannel): + + @property + def heart_rate(self): return self.reader.heart_rate + + @property + def wavelength(self): return self.reader.wavelength + + @property + def segments(self) -> Segments: return self.reader.segments + + def samples(self): return self.reader.samples() diff --git a/src/pojagi_dsp/channel/ecg/filter/segment.py b/src/pojagi_dsp/channel/ecg/filter/segment.py new file mode 100644 index 0000000..24d096f --- /dev/null +++ b/src/pojagi_dsp/channel/ecg/filter/segment.py @@ -0,0 +1,11 @@ +from pojagi_dsp.channel.ecg import Segment +from pojagi_dsp.channel.ecg.filter import ECGFilter + + +class WithSegment(ECGFilter): + def __init__(self, segment: Segment, **kwargs): + super().__init__(**kwargs) + self.segment = segment + + # def stream(self): + # ... 
\ No newline at end of file diff --git a/src/pojagi_dsp/channel/ecg/generator/__init__.py b/src/pojagi_dsp/channel/ecg/generator/__init__.py new file mode 100644 index 0000000..33808f5 --- /dev/null +++ b/src/pojagi_dsp/channel/ecg/generator/__init__.py @@ -0,0 +1,24 @@ +import math +from typing import Optional + +from pojagi_dsp.channel.ecg import AECGChannel + + +class AECGSynthesizer(AECGChannel): + def __init__(self, /, heart_rate: int, srate: Optional[float] = None): + """ + :param:`heart_rate` - The heart rate in bpm. + """ + super().__init__(srate) + self.heart_rate = heart_rate + + @property + def heart_rate(self): return self._heart_rate + + @heart_rate.setter + def heart_rate(self, val: int): + self._heart_rate = val + self._wavelength = math.floor(self.srate * (60 / self.heart_rate)) + + @property + def wavelength(self): return self._wavelength diff --git a/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py b/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py new file mode 100644 index 0000000..18ec385 --- /dev/null +++ b/src/pojagi_dsp/channel/ecg/generator/wavetable/__init__.py @@ -0,0 +1,266 @@ +import dataclasses +import logging +import math +from numbers import Number +from typing import Dict, List, Optional, Tuple + +import numpy as np +from scipy.interpolate import CubicSpline + +from pojagi_dsp.channel.ecg import Segments +from pojagi_dsp.channel.ecg.generator import AECGSynthesizer + +logger = logging.getLogger(__name__) + + +class ECGWaveTable: + """ + This type of wavetable is designed around the P and R. By + convention, R will always be equal to 1, and the baseline (P) will always + be 0. (That doesn't mean, however, that the other values can't cross these + boundaries. E.g., Q and S are often negative.) + """ + + def __init__( + self, + data: List[Number], + segments: Segments, + bottom: Optional[Number] = None, + top: Optional[Number] = None, + table_length: int = 1 << 11, # 2048 + ): + """ + The table size is increased to `table_length` upon initialization + using linear interpolation (via `numpy.interp`). + """ + + if len(data) == table_length: + self.data = np.array(data) + else: + # We generate a larger table for use with linear interpolation, + # trading time (CPU) for memory (table size). + # + # Here we use cubic spline interpolation instead of linear for + # wavetable construction, since it usually only happens once at + # startup, and should provide a much better quality table from + # limited data, making it possible to work with small, manually + # composed tables that we JIT convert to the larger table. + ### FIXME: use the sinc function instead: + ### f(x) = sin(x)/x where x =/= 0 and f(x) = 1 if x = 0 + ### you have to apply this scaled to each sample in the table and + ### then add all of the resulting signals together. + ### I think this is the same as summing the dft of each impulse + ### as if the impulse is a member of a larger table. + cs = CubicSpline( + range(len(data)), + data, + bc_type="natural", + ) + self.data = np.array([ + cs(x) for x in np.linspace(0, len(data), table_length) + ]) + + self.segments = Segments( + **{ + k: int(v / (len(data) / table_length)) if v else v + for k, v in ( + (f.name, getattr(segments, f.name)) + for f in dataclasses.fields(segments) + ) + }, + ) + + # NOTE: these are not the data min/max, but the normal min/max, either + # provided as kwargs, or derived from P and R segment starts, by + # convention. 
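+        # e.g. (illustrative): with `data[segments.P] == 0` and
+        # `data[segments.R] == 2000`, a raw sample of 500 normalizes to
+        # (500 - 0) / (2000 - 0) == 0.25.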
+ bottom = bottom if bottom is not None else data[segments.P] + top = top if top is not None else data[segments.R] + + if not (0 == bottom and 1 == top): + # Normalize between 0 and 1: + self.data = (self.data - bottom) / (top - bottom) + + def __getitem__(self, k): return self.data[k] + + def __len__(self): return len(self.data) # O(1) + + def linear_interpolation( + self, + index: float, + floor: Optional[int] = None, + ceiling: Optional[int] = None, + ) -> float: + """ + Handles the situation where the floor would produce duplicate values, + which makes the waveform chunky with aliasing; instead, we obtain a + value weighted between the floor/ceiling, trading time (CPU) for + memory (table size). + """ + dl = len(self.data) + floor = floor if floor is not None else math.floor(index) % dl + ceiling = ceiling if ceiling is not None else (floor + 1) % dl + + # e.g., a. 124.75 - 124 == 0.75 + # b. 123 - 123 == 0 (no weight goes to ceiling) + ceiling_weight = index - floor + # e.g., a. 1 - 0.75 == 0.25 + # b. 1 - 0 == 1 (all weight goes to floor) + floor_weight = 1 - ceiling_weight + + return ( + self[floor] * floor_weight + self[ceiling] * ceiling_weight + ) + + def merge( + self, + other: "ECGWaveTable", + weight: float, + ): + self_weight = 1 - weight + return ECGWaveTable( + data=(self.data * self_weight + other.data * weight), + segments=self.segments.merge(other.segments, weight), + top=1, + bottom=0, + ) + + +class ECGWaveTableSynthesizer(AECGSynthesizer): + def __init__( + self, /, + tables: Dict[Tuple[float, float], ECGWaveTable], + heart_rate: int, + srate: Optional[float] = None, + ): + self.inc: float = 0.0 + self.tables = tables + super().__init__(heart_rate, srate) + + self._segments: Segments = None + + def samples(self): + index: float = 0.0 + inc: float = None + n: int = 0 + + while True: + if n > 0: + floor = math.floor(index) + else: + # `n` cannot be negative + floor = 0 + index = 0.0 + + # NOTE: it's important that this `n` calculation occur before + # `yield` because the `heart_rate` (and, hence, the `wavelength`) + # may change in the meantime, which affects resetting to zero with + # the modulo operation below. + n = (n + 1) % self.wavelength + + yield self.table.linear_interpolation(index, floor=floor) + + if (self.heart_rate < 60 + and + self.table.segments.T_P <= floor < self.table.segments.P): + inc = self.brady_inc + elif (self.heart_rate > 60 + and + self.table.segments.T <= floor < self.table.segments.Q): + # FIXME: this is probably only good below a certain + # `heart_rate` threshold. + inc = self.tachy_inc + else: + inc = None + + index += inc if inc is not None else self.inc + index %= len(self.table) + + def _recalibrate(self, heart_rate: float) -> None: + table_matches = { + k: v + for k, v in self.tables.items() + if k[0] <= heart_rate < k[1] + } + + if not table_matches: + raise ValueError( + f"No table found corresponding to heart rate: {heart_rate}." + ) + + # Since we may have more than two tables that match, we loop + # through all the matches, applying them in key order. + keys = iter(sorted(table_matches)) + key = next(keys) + table = table_matches[key] + + for next_key in keys: + next_table = table_matches[next_key] + + if next_key[1] < key[1]: + # `next_key` is fully contained within `key` + floor, ceiling = next_key + next_weight = (heart_rate - floor) / (ceiling - floor) + weight = 1 - next_weight + + if (heart_rate - floor) > ((ceiling - floor) / 2): + # Weights form an "X" shape; i.e., crossfade to 50% + # and back. 
+ weight, next_weight = next_weight, weight + else: + floor = next_key[0] # i.e., the bottom of the top + ceiling = key[1] # i.e., the top of the bottom + next_weight = (heart_rate - floor) / (ceiling - floor) + + table = table.merge(next_table, next_weight) + key = next_key + + self.table = table + + # ECG Tables are designed for 1Hz, and as a default, we don't want to + # stretch anything; hence, no reference to `self.heart_rate` here, + # instead constant 60: + self.inc = len(self.table) / (self.srate * (60 / 60)) + + # Stretch only the T_P segment to compensate, rather than + # stretching the whole wave. + table_segment_length = \ + self.table.segments.P - self.table.segments.T_P + self.brady_inc = self.stretch_inc(table_segment_length) + + # Preserve QRS-J-point; compress Jp-Q to compensate. + table_segment_length = \ + self.table.segments.Q - self.table.segments.S_T + self.tachy_inc = self.stretch_inc(table_segment_length) + + @AECGSynthesizer.heart_rate.setter + def heart_rate(self, val): + AECGSynthesizer.heart_rate.fset(self, val) + self._recalibrate(heart_rate=val) + + def stretch_inc(self, table_segment_length): + # Get the missing samples by subtracting the number of samples + # contributed by the 1Hz table default, minus the segment we + # want to stretch. + tmp_wavelength = self.wavelength - \ + (len(self.table) - table_segment_length) / self.inc + + return table_segment_length / tmp_wavelength + + @property + def segments(self): + if self._segments: + return self._segments + + table_length = len(self.table) + table_segments = self.table.segments + # FIXME: this is a lie since we stretch T_P, etc. + self._segments = Segments( + **{ + k: math.floor(v * self.srate / table_length) if v else v + for k, v in [ + (f.name, getattr(table_segments, f.name)) + for f in dataclasses.fields(table_segments) + ] + } + ) + return self._segments diff --git a/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py b/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py new file mode 100644 index 0000000..d0b6b75 --- /dev/null +++ b/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py @@ -0,0 +1,104 @@ +import math +import numpy as np +from numbers import Number +import random +from typing import List +from pojagi_dsp.channel.ecg import Segments +from pojagi_dsp.channel.ecg.generator.wavetable import ECGWaveTable + +# NOTE: larger table required for avoiding aliasing at different srates than 125Hz +sinus_data = [ + # R-S: 0 + 2000, 1822, 374, + + # S-Jp: 3 + -474, -271, -28, 18, 66, + + # Jp-T: 9 + 63, 73, 91, 101, 101, 101, 116, 124, + 124, + + # T: 17 + 141, 171, 186, 196, 229, 265, 297, 327, + 363, 406, 446, 475, 493, 508, 526, 533, + 518, 475, 403, 327, 272, 222, 174, 138, + 109, 88, 73, 66, 69, 69, 66, 73, + 81, 76, 73, 76, 76, 66, 58, 58, + 63, 63, 41, 26, 26, 18, 8, 8, + 8, + + # U: 66 -- not found + + # T-P: 66 + 2, 3, 2, 2, 2, -1, 2, 2, + 2, -1, 0, -1, -1, 3, 2, 1, + 3, 2, 1, 0, 1, + + # P: 87 + 0, 3, 11, 11, 0, 8, 18, 18, + 18, 15, 8, 18, 26, 26, 26, 8, + 32, 61, 116, 164, 182, 159, 131, 116, + 116, 109, 91, 73, 58, 55, 58, 63, + 69, + + # P-R: 120 + 48, -14, + + # Q-R: 122 + -40, 131, 931, +] # len == 125 + + +tpr = np.arange(-math.floor(120-18)/2, math.ceil(120-18)/2) +tpr_curve: np.ndarray = (tpr ** 2 * -0.1) +tpr_curve = (tpr_curve - tpr_curve[0]) + 141 + +tachycardia = np.array([ # 58-107 flat + # R-S: 0 + 2000, 1822, 374, + + # S-Jp: 3 + -474, -271, -28, 18, 66, + + # Jp-T: 9 + 63, 73, 91, 101, 101, 101, 116, 124, + 124, + + *tpr_curve, + + # P-R: 119 + 124, 48, -14, + + # 
Q-R: 122 + *(np.array([-40, 131, 931]) * 0.25), + +]) * (2/3) # len == 125 + + +def SinusWaveTable(): return ECGWaveTable( + data=sinus_data, + segments=Segments( + S=3, + S_T=9, + T=17, + T_P=66, + P=87, + P_R=120, + Q=122, + ), +) + +def TachycardiaWaveTable(): return ECGWaveTable( + data=tachycardia, + segments=Segments( + S=3, + S_T=9, + T=17, + T_P=66, + P=87, + P_R=119, + Q=122, + ), + top=2000, + bottom=0, +) diff --git a/src/pojagi_dsp/channel/generator/sine.py b/src/pojagi_dsp/channel/generator/sine.py new file mode 100644 index 0000000..6705fda --- /dev/null +++ b/src/pojagi_dsp/channel/generator/sine.py @@ -0,0 +1,51 @@ +import datetime +import math +from typing import Optional + +from pojagi_dsp.channel import ASignal + +_2_PI = 2 * math.pi + + +class SineWave(ASignal[float]): + def __init__( + self, + hz: float, + phase: float = 0.0, # radians + srate: Optional[float] = None + ): + super().__init__(srate) + self.hz = hz + self.phase = phase + + @property + def wavelength(self): return self.srate/self.hz + + def samples(self): + """An iterator over one period.""" + # zero-ish, but not always zero (to keep things smooth) + self.phase %= _2_PI + + while self.phase < _2_PI: + inc = (_2_PI * self.hz)/self.srate + yield math.sin(self.phase) + self.phase += inc + + +if __name__ == "__main__": + from matplotlib import pyplot as plt + + concert_a = 440 + cd_quality_srate = 44100 + sine = SineWave(concert_a, srate=cd_quality_srate) + + values = [] + hz = sine.hz + # for _ in range(10): + # values += list(sine) + + for y in sine.of_duration(datetime.timedelta(milliseconds=10)): + values.append(y) + + plt.plot(range(len(values)), values) + plt.show() diff --git a/src/pojagi_dsp/series_frame/__init__.py b/src/pojagi_dsp/series_frame/__init__.py new file mode 100644 index 0000000..8a58d96 --- /dev/null +++ b/src/pojagi_dsp/series_frame/__init__.py @@ -0,0 +1,295 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +import logging +from typing import Iterator, List, Literal, Optional, Type, Union + +from physiq_cloud.series_frame import fb_builder +from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper +from physiq_cloud.time import Instant, TimeUnit +from physiqfb.SeriesFrame import SeriesFrame +from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + + +class SamplingSetInfo(BaseModel): + id: int + alias: str + timestamped: bool + # FIXME: not in the INFO.yaml, but isn't this just the `frame_size_micros` + # divided by the number of values in the channel data? We could either get + # it from the first frame, or look it up in the API/repo. + # frequency: int + + +class SftInfo(BaseModel): + alias: Optional[str] + frame_size_micros: int + max_frame_size_bytes: int + sampling_sets: List[SamplingSetInfo] + + +@dataclass(frozen=True) +class SeriesFramePackage: + frame: FlatBufferWrapper[SeriesFrame] + sft_info: SftInfo + + def relative_instant(self, offset=0): + return Instant.from_unix( + TimeUnit.MICROSECONDS, + (self.frame.fb.FrameId() + offset) + * self.sft_info.frame_size_micros, + ) + + @property + def start(self): + return self.relative_instant(0) + + @property + def end(self): + return self.relative_instant(1) + + +class ASeriesFrameEmitter(ABC): + """ + Abstract class defining the characteristics of a series frame emitter, + which is designed to be subclassed by all generators and filters. 
+ + A "generator" is a type of emitter that produces frames from "nothing"; + i.e., by some external process, such as reading a file, an API, etc.; or + by generating frames algorithmically. + + A "filter" is any emitter that takes another emitter and affects its + :meth:`frames` in some arbitrary way. This terminology is borrowed from + the digital signal processing domain, and doesn't imply that the signal + will be reduced or shortened, etc. In fact the signal coming from the + injected emitter might be lengthened or amplified by a filter to any + extent, including the maximum or infinity, etc. + + This class defines an abstract property called `frames` that should + trigger the pipeline to begin and return an `Iterator` of + :class:`SeriesFramePackage`. + + These emitters are designed to be chained together with filters. E.g., a + typical scenario would be to start with some generator, and chain it + together with one or more filters: + + ``` + my_generator = SomeGenerator() + my_first_filter = SomeFilter(reader=my_generator) + my_second_filter = SomeOtherFilter(reader=my_first_filter) + for frames in my_second_filter.frames: ... + ``` + + Alternatively, the `__or__` method has been overloaded so that you can + also do something equivalent to the above like this: + + ``` + for frame in ( + SomeGenerator() + | SomeFilter + | SomeOtherFilter + ): ... + ``` + + NOTE: Accessing `frames` may trigger side effects; it starts a chain of + calls to the reader accessors in the pipeline, which restarts the pipeline + anew each time. Use :meth:`cursor` to access the existing `Iterator` if + you need multiple accesses. + + i.e., calling + ``` + next(reader.cursor) + ``` + is different than + ``` + next(reader.frames()) + ``` + or + ``` + next(reader.stream()) + ``` + in that calling `stream` is like making a database query (that returns and + caches a cursor), and `cursor` is just providing access to the cached + cursor, while calling `frames` directly provides an Iterator without + caching a cursor. + + That said, :meth:`cursor` is empty until :meth:`stream` is called for the + first time. (It requires the initial "query" that :meth:`frames` provides.) + """ + + def __init__(self, sft_info: Optional[SftInfo] = None) -> None: + """ + :param:`sft_info` - an :class:`SftInfo` describing the series frame + type this emitter emits. + """ + super().__init__() + self._sft_info = sft_info + + @property + def sft_info(self): + if not self._sft_info: + raise RuntimeError( + "Illegal state: `_sft_info` is None." + ) + return self._sft_info + + @sft_info.setter + def sft_info(self, val): + self._sft_info = val + + def __iter__(self): return self.stream() + + @abstractmethod + def frames(self) -> Iterator[SeriesFramePackage]: ... + + @property + def cursor(self): + """ + An `Iterator` representing the current pipeline in progress. + """ + return self._cursor + + def stream(self): + """ + Start the pipeline and return an :class:`Iterator` of + :class:`SeriesFramePackage`. + + Each time :meth:`frames` is accessed, a new `Iterator` is instantiated by + accessing the `reader`'s `frames`, which may trigger side effects. It + will also start the process pipeline over, so if you need to iterate + over the frames with multiple calls within a subclass of this, you should + use :meth:`cursor` instead, which returns the same object returned from this + method, without restarting the pipeline. 
+ """ + self._cursor = self.frames() + return self.cursor + + def __or__( + self, + right: Union["SeriesFrameFilter", Type["SeriesFrameFilter"]], + ) -> "SeriesFrameFilter": + if callable(right): + right = right() + + right.reader = self + right.sft_info = self.sft_info + return right + + def sfhls( + self, + max_bytelen: int, + ) -> Iterator[FlatBufferWrapper[SeriesFrameHolderList]]: + """ + Repackage input frames as a series frame holder list (sfhl) iterator. + :param:`max_bytelen` - the max byte length allowed per sfhl. + """ + chunk = list() + chunk_bytelen = 0 + + for pkg in self.frames(): + if chunk_bytelen + len(pkg.frame.bytes) > max_bytelen: + if not chunk: + break + yield sfhl_from_frames(chunk) + chunk_bytelen = chruncate_frames(chunk) + + chunk_bytelen += append_frame(chunk, pkg.frame) + + if chunk: + yield sfhl_from_frames(chunk) + chunk_bytelen = chruncate_frames(chunk) + + +class SeriesFrameFilter(ASeriesFrameEmitter): + """ + Class defining the characteristics of a filter, which is a kind of emitter. + + The difference between a filter and an emitter is: + + 1. an emitter doesn't know how to provide `frames` (no concrete + implementation) + 2. a filter assumes it will read `frames` from another injected emitter + (called a `reader`), and by default simply returns those `frames` + unadulterated. + """ + + def __init__( + self, + reader: Optional[ASeriesFrameEmitter] = None, + **kwargs + ) -> None: + """ + :param:`reader` - input stream this filter reads. + """ + super().__init__(**kwargs) + self._reader = reader + self._cursor: Iterator[SeriesFramePackage] = iter(()) + + @property + def reader(self) -> ASeriesFrameEmitter: + """ + The input stream this filter reads. + """ + if self._reader is None: + raise RuntimeError("Illegal state: `reader` is None.") + return self._reader + + @reader.setter + def reader(self, val): + self._reader = val + + def frames(self): + """ + This has to exist because it's subclassing an abstract class that + declares it, but a concrete implementation of this would be confusing. + Generally speaking, implementations should read the `self.reader` and + do something to modify the frames. + """ + raise NotImplementedError() + + +def sfhl_from_frames( + frames: Iterator[FlatBufferWrapper[SeriesFrame]] +) -> FlatBufferWrapper[SeriesFrameHolderList]: + builder = fb_builder.Builder(0) + builder.Finish( + fb_builder.CreateSeriesFrameHolderList( + builder=builder, + frames=[ + fb_builder.CreateSeriesFrameHolder( + builder=builder, + data=frame.bytes, + ) + for frame in frames + ] + ) + ) + + return FlatBufferWrapper( + _bytes=builder.Output(), + schema=SeriesFrameHolderList, + ) + + +def append_frame( + chunk: List[FlatBufferWrapper[SeriesFrame]], + frame: FlatBufferWrapper[SeriesFrame], +) -> int: + """ + Appends a frame to the given chunk and returns the bytelen it appended. + """ + chunk.append(frame) + return len(frame.bytes) + + +def chruncate_frames( + chunk: List[FlatBufferWrapper[SeriesFrame]], +) -> Literal[0]: + """ + "Truncates" a "chunk" of frames to zero. Returns bytelen (constantly `0`) + to be consistent with :func:`append_frame`. 
+ """ + chunk.clear() + return 0 diff --git a/src/pojagi_dsp/series_frame/ext.py b/src/pojagi_dsp/series_frame/ext.py new file mode 100644 index 0000000..e883ebb --- /dev/null +++ b/src/pojagi_dsp/series_frame/ext.py @@ -0,0 +1,106 @@ +from dataclasses import dataclass +import logging +from pathlib import Path +from typing import Dict, List, Optional + +import yaml +from physiq_cloud.time import Instant, Interval, TimeUnit +from pydantic import BaseModel, validator + +from pojagi_dsp.series_frame import SftInfo +from pojagi_dsp.series_frame.generator.sampler import SftInterval + +logger = logging.getLogger(__name__) + + +class ExcerptType(BaseModel): + series_frame_type: str # vci-vitalpatch-telemetry + sft_interval: Optional[SftInterval] + + +class Excerpt(BaseModel): + account: str # uuid4 + id: str # patient1 + interval: Interval # "iso/iso" + organization: str # "maps" + source_id: str # "maps.physiq.io" + types: List[ExcerptType] + + @validator("interval", pre=True) + def timespan_to_interval(cls, timespan: str): return Interval( + **{ + ["start", "end_exclusive"][idx]: Instant.parse_iso8601(stamp) + for idx, stamp in enumerate(timespan.split("/")) + } + ) + + +class ExtBundleConfig(BaseModel): + excerpts: List[Excerpt] + id: str # clinical-event-trigger + type: str = "PhysIQCloud" + + +@dataclass(frozen=True) +class BundleConfig: + excerpts: Dict[str, Excerpt] + id: str + + @staticmethod + def create(bundle_dir: Path) -> "BundleConfig": + yaml_files = (x.resolve() for x in bundle_dir.glob("**/*.yaml")) + + try: + # the first yaml you encounter is BUNDLE_CONFIG.yaml + ext_bundle_config = ExtBundleConfig.parse_obj( + yaml.safe_load(next(yaml_files).read_text()) + ) + except StopIteration as si: + raise RuntimeError( + f"No yaml files found under \"{bundle_dir}\"." 
+ ) from si + + # the rest are INFO.yaml files + info_files = list(yaml_files) + + for excerpt in ext_bundle_config.excerpts: + logger.debug(f"Loading excerpt \"{excerpt.id}\".") + + for type_ in excerpt.types: + logger.debug(f"Loading SFT \"{type_.series_frame_type}\".") + # Make sure the info file belongs to this excerpt + INFO = next( + x for x in info_files + if x.as_posix() + .split("/")[-3] + .startswith(excerpt.id) + if x.as_posix() + .split("/")[-2] + .startswith(type_.series_frame_type) + ) + + sft_info = SftInfo.parse_obj( + yaml.safe_load(INFO.read_text()) + ) + + sft_info.alias = type_.series_frame_type + + type_.sft_interval = SftInterval( + info=sft_info, + interval=excerpt.interval, + frame_count=( + excerpt.interval + .to_duration() + .to_unix(TimeUnit.MICROSECONDS) + // sft_info.frame_size_micros + ), + files=sorted(INFO.parent.glob("**/*.sfhl")), + ) + + return BundleConfig( + id=ext_bundle_config.id, + excerpts={ + x.id: x + for x in ext_bundle_config.excerpts + }, + ) diff --git a/src/pojagi_dsp/series_frame/filter/__init__.py b/src/pojagi_dsp/series_frame/filter/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pojagi_dsp/series_frame/filter/looping.py b/src/pojagi_dsp/series_frame/filter/looping.py new file mode 100644 index 0000000..cc7c0d7 --- /dev/null +++ b/src/pojagi_dsp/series_frame/filter/looping.py @@ -0,0 +1,40 @@ +import logging +import math +from typing import Iterator, Union + +from pojagi_dsp.series_frame import (SeriesFrameFilter, + SeriesFramePackage) + +logger = logging.getLogger(__name__) + + +class Looping(SeriesFrameFilter): + """ + Filter that loops through the input reader's frames until it reaches + the given :param:`iterations` + """ + + def __init__( + self, + iterations: Union[int, float] = math.inf, + **kwargs, + ) -> None: + """ + :param:`iterations` - number of times to loop through the `frames`. 
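+
+        e.g. (illustrative, for any emitter whose `sft_info` is set):
+        ```
+        looped = my_generator | Looping(iterations=2)
+        for pkg in looped.frames(): ...
+        ```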
+ """ + super().__init__(**kwargs) + self.iterations = iterations + self.iteration = 0 + + def increment(self) -> None: + """Override to react to `iteration` increment.""" + self.iteration += 1 + logger.debug( + f"Loop incremented ({self.sft_info.alias}): {self.iteration}" + ) + + def frames(self) -> Iterator[SeriesFramePackage]: + while self.iteration < self.iterations: + for pkg in self.reader.frames(): + yield pkg + self.increment() diff --git a/src/pojagi_dsp/series_frame/filter/replaying.py b/src/pojagi_dsp/series_frame/filter/replaying.py new file mode 100644 index 0000000..5787747 --- /dev/null +++ b/src/pojagi_dsp/series_frame/filter/replaying.py @@ -0,0 +1,277 @@ +import functools +import logging +from typing import Any, Iterator, List, Optional + +import flatbuffers +import numpy as np +from physiq_cloud.series_frame import fb_builder +from physiq_cloud.series_frame.fb.math import ceildiv +from physiq_cloud.series_frame.fb_to_numpy import from_channel_data +from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper +from physiq_cloud.time import Instant, TimeUnit +from physiqfb.ChannelData import ChannelData +from physiqfb.ReadingType import ReadingType +from physiqfb.SamplingSetData import SamplingSetData +from physiqfb.SeriesFrame import SeriesFrame +from pydantic import BaseModel + +from pojagi_dsp.series_frame import ( + ASeriesFrameEmitter, + SeriesFrameFilter, + SeriesFramePackage, + SftInfo, +) + +logger = logging.getLogger(__name__) + + +class Synchronize(BaseModel): + hour: bool = False + minute: bool = False + second: bool = False + microsecond: bool = False + + +class Replaying(SeriesFrameFilter): + """ + Filter that replays input frames starting at a given :class:`Instant` + in time. + """ + + def __init__( + self, + emit_start: Instant = Instant.now(), + synchronize: Optional[Synchronize] = None, + **kwargs, + ) -> None: + """ + :param:`emit_start` - instant in time to replay the frames. + :param:`synchronize` - Whether and at what granularity to synchronize the Instant from the source + data with the configured :param:`emit_start`. This will alter the precision of the + emit start seconds depending on the level of granularity you choose, + possibly drastically. + """ + super().__init__(**kwargs) + + self.frame_idx = 0 + + self.emit_start = emit_start + self.synchronize = synchronize + self._first_frame_id: int | None = None + self._start_frame_id_inst: int | None = None + + @ASeriesFrameEmitter.sft_info.setter + def sft_info(self, val: SftInfo): + """ + This override's :class:`ASeriesFrameFilter`'s :meth:`sft_info` + setter, so that we can initialize the `timestamped_map` property. + """ + self._sft_info = val + self.timestamped_map = { + s.id: s.timestamped for s in self.sft_info.sampling_sets + } + + @property + def start_frame_id(self): + """ + :meth:`frames` must be called before accessing this for the first time, + because the calculation depends on the first frame. 
+ """ + if self._start_frame_id_inst is not None: + return self._start_frame_id_inst + + if not self.synchronize or not any(v for _, v in self.synchronize): + self.synchronized_emit_start = self.emit_start + else: + emit_start_dt = self.emit_start.to_datetime() + source_start_dt = Instant.from_unix( + TimeUnit.MICROSECONDS, + self._first_frame_id * self.sft_info.frame_size_micros, + ).to_datetime() + + self.synchronized_emit_start = Instant.from_datetime( + emit_start_dt.replace( + **{ + k: getattr(source_start_dt, k) + if getattr(self.synchronize, k) + else getattr(emit_start_dt, k) + for k in ["hour", "minute", "second", "microsecond"] + }, + ), + ) + + # We use ceildiv to calculate the first frame ID, because normal + # division would likely yield a frame BEFORE the emit_start. We want to + # start emitting frames at or after emit_start, and ceildiv does that + # for us. + self._start_frame_id_inst = ceildiv( + self.synchronized_emit_start.to_unix( + TimeUnit.MICROSECONDS, + ), + self.sft_info.frame_size_micros, + ) + + return self._start_frame_id_inst + + def replayed_frame( + self, + source: SeriesFrame, + target_frame_id: int, + ) -> FlatBufferWrapper[SeriesFrame]: + builder = flatbuffers.Builder(0) + target = fb_builder.CreateSeriesFrame( + builder=builder, + frame_id=target_frame_id, + ingested_at_micros=-1, # source.IngestedAtMicros(), + sampling_sets_offsets=[ + replayed_sampling_set( + builder=builder, + sampling_set_data=sampling_set_data, + source_frame_id=source.FrameId(), + target_frame_id=target_frame_id, + frame_size_micros=self.sft_info.frame_size_micros, + is_timestamped=self.timestamped_map[sampling_set_data.Id()], + ) + for sampling_set_data in [ + source.SamplingSets(idx) + for idx in range(source.SamplingSetsLength()) + ] + ], + submitter_id=-1, # source.SubmitterId(), + sensor_id=-1, # source.SensorId(), + ) + builder.Finish(target) + target_frame_bytes = builder.Output() + + return FlatBufferWrapper( + _bytes=target_frame_bytes, + schema=SeriesFrame, + ) + + def frames(self) -> Iterator[SeriesFramePackage]: + """ + Each time this property is accessed, it will access the injected + reader anew, but continue to increment the `frame_idx` so that + the next frame will replay where the last one left off. + + To reset the `frame_idx`, client code can just set the `frame_idx` + manually (or create a new instance). + """ + + for pkg in self.reader.frames(): + if self._first_frame_id is None: + self._first_frame_id = pkg.frame.fb.FrameId() + + yield SeriesFramePackage( + frame=self.replayed_frame( + source=pkg.frame.fb, + target_frame_id=self.start_frame_id + self.frame_idx, + ), + sft_info=pkg.sft_info, + ) + # `int` in python 3 is unbounded, can continue indefinitely. Also, + # the `FrameId` is effectively a timestamp, so continuously + # `inc`ing this value is by design. 
+ self.frame_idx += 1 + + +def replayed_sampling_set( + builder: flatbuffers.Builder, + sampling_set_data: SamplingSetData, + source_frame_id: int, + target_frame_id: int, + frame_size_micros: int, + is_timestamped: bool, +): + source_channels = [ + sampling_set_data.Channels(idx) + for idx in range(sampling_set_data.ChannelsLength()) + ] + + target_channels: List[int] = list() + for source_channel_data in source_channels: + is_timestamp_channel = is_timestamped and source_channel_data.Id() == 0 + + if not is_timestamp_channel or (source_frame_id == target_frame_id): + # Just copy verbatim + target_channel_data = replayed_channel_data( + builder, + source_channel_data, + ) + else: + # Update the timestamps relative to the target frameId + source_frame_start_micros = source_frame_id * frame_size_micros + target_frame_start_micros = target_frame_id * frame_size_micros + + # np.copy is used because the returned array is read-only + timestamps = np.copy(from_channel_data(data=source_channel_data)) + timestamps -= source_frame_start_micros # relativize + timestamps += target_frame_start_micros # frame shift + + target_channel_data = fb_builder.CreateChannelData( + builder=builder, + id=0, # `source_channel_data.Id()` is constantly 0 here + readings_offset=fb_builder.CreateInt64Channel( + builder=builder, + data=timestamps.tolist(), # List[int] + ), + readings_type=ReadingType.INT64, + ) + + target_channels.append(target_channel_data) + + np_relative_timestamps = sampling_set_data.RelativeTimestampsAsNumpy() + relative_timestamps = ( + np_relative_timestamps.tobytes() + if np_relative_timestamps not in {0, -1} # error conditions/no data + else None + ) + + return fb_builder.CreateSamplingSetData( + builder=builder, + channel_offsets=target_channels, + id=sampling_set_data.Id(), + start_offset_micros=sampling_set_data.StartOffsetMicros(), + relative_timestamps_type=sampling_set_data.RelativeTimestampsType(), + relative_timestamps_unit_in_micros=sampling_set_data.RelativeTimestampsUnitInMicros(), + relative_timestamps=relative_timestamps, + relative_timestamps_sample_count=sampling_set_data.RelativeTimestampsSampleCount(), + ) + + +def replayed_channel_data( + builder: flatbuffers.Builder, + channel_data: ChannelData, +) -> int: + readings_type = channel_data.ReadingsType() + + return fb_builder.CreateChannelData( + builder=builder, + id=channel_data.Id(), + readings_offset=channel( + builder=builder, + readings_type=readings_type, + readings=from_channel_data(data=channel_data).tolist(), + ), + readings_type=readings_type, + ) + + +reading_type_map = { + ReadingType.BINARY32: fb_builder.CreateBinary32Channel, + ReadingType.BINARY64: fb_builder.CreateBinary64Channel, + ReadingType.INT8: fb_builder.CreateInt8Channel, + ReadingType.INT16: fb_builder.CreateInt16Channel, + ReadingType.INT32: fb_builder.CreateInt32Channel, + ReadingType.INT64: fb_builder.CreateInt64Channel, + ReadingType.NAMED_READING: fb_builder.CreateNamedReadingChannel, + ReadingType.STRING: fb_builder.CreateStringChannel, +} + + +def channel( + builder: flatbuffers.Builder, + readings_type: int, + readings: List[Any], +) -> int: + return reading_type_map[readings_type](builder=builder, data=readings) diff --git a/src/pojagi_dsp/series_frame/filter/updating.py b/src/pojagi_dsp/series_frame/filter/updating.py new file mode 100644 index 0000000..331b4b3 --- /dev/null +++ b/src/pojagi_dsp/series_frame/filter/updating.py @@ -0,0 +1,76 @@ +import logging +from typing import Callable, Iterator + +from physiq_cloud.time import Duration, 
Instant, TimeUnit + +from pojagi_dsp.series_frame import SeriesFrameFilter, SeriesFramePackage + +logger = logging.getLogger(__name__) + + +class Updating(SeriesFrameFilter): + """ + Filter that outputs frames until it runs out or the next frame would + surpass a given :class:`Instant` in time, whichever comes first. + """ + + def __init__( + self, + until: Callable[[], Instant] = Instant.now, + **kwargs, + ): + """ + :param:`until` - closure returning an `Instant` at which to stop + iterating when the next frame would surpass it. The cursor will + otherwise persist until it has been exhausted. + """ + super().__init__(**kwargs) + self.until = until + self.last_update_count = 0 + self._next_pkg = None + + def log_status(filter, end: Instant, until: Instant): + between = Duration.between(end, until).to_unix(TimeUnit.MILLISECONDS) / float( + 1000 + ) + + logger.debug( + f"{filter.sft_info.alias}: Update scope : {between} seconds" + if end < until + else ( + f"{filter.sft_info.alias}: " + f"Next update available in {-between} seconds" + ) + ) + + def frames(self) -> Iterator[SeriesFramePackage]: + self.last_update_count = 0 + + if self._next_pkg is None: + cursor = self.reader.stream() + self._next_pkg = next(cursor) + else: + cursor = self.reader.cursor + + logger.debug(f"{self.sft_info.alias}: {cursor}") + + try: + until = self.until() + end = self._next_pkg.end + + self.log_status(end, until) + + while end < until: + yield self._next_pkg + self.last_update_count += 1 + self._next_pkg = next(cursor) + end = self._next_pkg.end + except StopIteration: + logger.warning("StopIteration caught") + logger.warning(f"{self.sft_info.alias}: {cursor}") + ... + + if self.last_update_count: + logger.debug( + f"{self.sft_info.alias}: " f"Update frames : {self.last_update_count}" + ) diff --git a/src/pojagi_dsp/series_frame/generator/sampler/__init__.py b/src/pojagi_dsp/series_frame/generator/sampler/__init__.py new file mode 100644 index 0000000..f87cc0e --- /dev/null +++ b/src/pojagi_dsp/series_frame/generator/sampler/__init__.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass +from pathlib import Path +from typing import List + +from physiq_cloud.time import Interval + +from pojagi_dsp.series_frame import SftInfo + + +@dataclass(frozen=True) +class SftInterval: + info: SftInfo + interval: Interval + frame_count: int + files: List[Path] diff --git a/src/pojagi_dsp/series_frame/generator/sampler/file.py b/src/pojagi_dsp/series_frame/generator/sampler/file.py new file mode 100644 index 0000000..4fd2e95 --- /dev/null +++ b/src/pojagi_dsp/series_frame/generator/sampler/file.py @@ -0,0 +1,95 @@ +import logging +import math +from pathlib import Path +from typing import Iterator + +from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper +from physiqfb.SeriesFrame import SeriesFrame +from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList + +from pojagi_dsp.series_frame import (ASeriesFrameEmitter, + SeriesFramePackage) + + +logger = logging.getLogger(__name__) + + +class FileSampler(ASeriesFrameEmitter): + def __init__( + self, + *sfhl_files: Path, + frame_count: int = math.inf, + frame_offset: int = 0, + **kwargs, + ) -> None: + """ + :param:`sfhl_files` - One or more files containing sfhl data. It's + assumed that the files are sorted and contain contiguous (sorted) + data, and no effort is made to validate that or handle any error + condition related to correctness. 
The files are treated as a single + logical file, and an `EOFError` is raised if the `frame_count` wants + more than the available frames. + + :param:`frame_count` - The number of frames this generator will + attempt to read from the given files. + + :param:`frame_offset` - The number of frames to skip over before + yielding frames from the given files. + """ + super().__init__(**kwargs) + self.files = sfhl_files + self.frame_offset = frame_offset + self.frame_count = frame_count + self.frame_idx = 0 + + def frames_from_sfhl_bytes( + self, data: bytes, + ) -> Iterator[FlatBufferWrapper[SeriesFrame]]: + """ + Emit frames from series frame holder list data. + + :param:`data` - sfhl bytes. + """ + sfhl = SeriesFrameHolderList.GetRootAsSeriesFrameHolderList(data, 0) + for index in range(sfhl.FramesLength()): + yield FlatBufferWrapper( + _bytes=sfhl.Frames(index).DataAsNumpy().tobytes(), + schema=SeriesFrame, + ) + + def frames(self) -> Iterator[SeriesFramePackage]: + self.frame_idx = 0 + end_frame_idx = self.frame_offset + self.frame_count + + for file in self.files: + """ + Finite set of files passed in. + """ + for frame in self.frames_from_sfhl_bytes(file.read_bytes()): + """ + Finite set of frames from `read_bytes`. + """ + if self.frame_idx < self.frame_offset: + """ + Fast-forward to the declared offset, for each file. + """ + self.frame_idx += 1 + continue + elif self.frame_idx > end_frame_idx: + """ + `end_frame_idx` is finite. + """ + break + + yield SeriesFramePackage(frame=frame, sft_info=self.sft_info) + self.frame_idx += 1 + + if end_frame_idx != math.inf: + if self.frame_idx < end_frame_idx: + frames_read = self.frame_idx - self.frame_offset + raise EOFError( + f"Failed to read {self.frame_count - frames_read} frames. " + "Not enough data.\n" + f"\toffset : {self.frame_offset}\n" + f"\tframes read : {frames_read}" + ) diff --git a/src/pojagi_dsp/series_frame/generator/synthesizer/frame/vci_vitalpatch_telemetry.py b/src/pojagi_dsp/series_frame/generator/synthesizer/frame/vci_vitalpatch_telemetry.py new file mode 100644 index 0000000..d3fdb91 --- /dev/null +++ b/src/pojagi_dsp/series_frame/generator/synthesizer/frame/vci_vitalpatch_telemetry.py @@ -0,0 +1,48 @@ +from typing import Iterator + +import flatbuffers +from physiq_cloud.series_frame import fb_builder +from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper +from physiq_cloud.time import Instant, TimeUnit +from physiqfb import SeriesFrame + +from pojagi_dsp.series_frame import (ASeriesFrameEmitter, + SeriesFramePackage) + + +class VCIVitalPatchTelemetrySynthesizer(ASeriesFrameEmitter): + + def frames(self) -> Iterator[SeriesFramePackage]: + builder = flatbuffers.Builder(0) + target = fb_builder.CreateSeriesFrame( + builder=builder, + frame_id=Instant.now().to_unix( + TimeUnit.MICROSECONDS, + ) // self.sft_info.frame_size_micros, + ingested_at_micros=-1, + sampling_sets_offsets=[ + # replayed_sampling_set( + # builder=builder, + # sampling_set_data=sampling_set_data, + # source_frame_id=source.FrameId(), + # target_frame_id=target_frame_id, + # frame_size_micros=self.sft_info.frame_size_micros, + # is_timestamped=self.timestamped_map[ + # sampling_set_data.Id() + # ], + # ) + # for sampling_set_data in [ + # source.SamplingSets(idx) + # for idx in range(source.SamplingSetsLength()) + # ] + ], + submitter_id=-1, + sensor_id=-1, + ) + builder.Finish(target) + target_frame_bytes = builder.Output() + + return FlatBufferWrapper( + _bytes=target_frame_bytes, + schema=SeriesFrame, + ) diff --git 
a/tests/channel/pipeline_test.py b/tests/channel/pipeline_test.py
new file mode 100644
index 0000000..3118765
--- /dev/null
+++ b/tests/channel/pipeline_test.py
@@ -0,0 +1,199 @@
+from copy import deepcopy
+import math
+from typing import Iterator
+import pytest
+from pojagi_dsp.channel import ASignal, Constantly, Filter, FilterFunction, SignalFunction, IllegalStateError, Reduce
+from pojagi_dsp.channel.generator.sine import SineWave
+
+
+@pytest.fixture
+def srate(): return 44100
+
+
+@pytest.fixture
+def freq(): return 440
+
+
+@pytest.fixture
+def const(srate): return Constantly(42, srate=srate)
+
+
+@pytest.fixture
+def sine(srate, freq): return SineWave(freq, srate=srate)
+
+
+@pytest.fixture
+def sine_generator_factory(srate, freq):
+    def sine():
+        phase = 0.0
+        inc = (2 * math.pi * freq)/srate
+        while True:
+            yield math.sin(phase)
+            phase += inc
+    return sine
+
+
+def test_missing_srate_assigned_from_kwarg_reader(const: ASignal):
+    pipeline = Filter(reader=const)
+    assert pipeline.srate == const.srate
+
+
+def test_missing_srate_assigned_from_ored_reader(const: ASignal):
+    pipeline = const | Filter
+    assert pipeline.srate == const.srate
+
+
+def test_add_operator(const: Constantly):
+    cursor = (const + 1).stream()
+    assert next(cursor) == const.constant + 1
+
+
+def test_mul_operator(const: Constantly):
+    cursor = (const * 100).stream()
+    assert next(cursor) == const.constant * 100
+
+
+def test_filter_iterable(const: Constantly):
+    pipeline = const | (Filter, Filter, Filter)
+    assert pipeline.reader.reader.reader == const
+    assert next(pipeline) == const.constant == next(const)
+
+
+def test_filter_expression(const: Constantly):
+    pipeline = const | (Filter | Filter | Filter)
+    assert pipeline.reader.reader.reader == const
+    assert next(pipeline) == const.constant == next(const)
+
+
+def test_filter_nested_expression(const: Constantly):
+    pipeline = const | (Filter | (Filter, Filter) | (Filter | Filter) | Filter)
+    assert pipeline.reader.reader.reader.reader.reader.reader == const
+    assert next(pipeline) == const.constant == next(const)
+
+
+def test_reader(const: Constantly):
+    filter = Filter()
+    with pytest.raises(IllegalStateError, match=".reader. is None"):
+        filter.reader
+    assert (const | filter).reader == const
+
+
+def test_pipeline_missing_reader(const: ASignal):
+    pipeline = Filter | Filter | Filter
+    with pytest.raises(IllegalStateError, match=".reader. is None"):
+        next(pipeline)
+    assert next(const | pipeline)
+
+
+def test_filter_can_only_be_assigned_one_generator(const: Constantly):
+    pipeline = const | Filter | Filter
+
+    with pytest.raises(ValueError, match="already has a generator"):
+        Constantly(0) | pipeline
+
+
+def test_add_tuple(const: Constantly):
+    pipeline = const + (100, 200, 300)
+    assert isinstance(pipeline, Reduce)
+    assert next(pipeline) == const.constant + (100 + 200 + 300)
+
+
+def test_mul_tuple(const: Constantly):
+    pipeline = const * (100, 200, 300)
+    assert isinstance(pipeline, Reduce)
+    assert next(pipeline) == const.constant * (100 * 200 * 300)
+
+
+def test_radd(const: Constantly):
+    pipeline = 1 + const
+    assert next(pipeline) == const.constant + 1
+
+
+def test_radd_tuple(const: Constantly):
+    pipeline = (1, 2, 3) + const
+    assert next(pipeline) == const.constant + (1 + 2 + 3)
+
+
+def test_rmul(const: Constantly):
+    pipeline = 2 * const
+    assert next(pipeline) == const.constant * 2
+
+
+def test_rmul_tuple(const: Constantly):
+    pipeline = (2, 3, 4) * const
+    assert next(pipeline) == const.constant * (2 * 3 * 4)
+
+
+def test_samples(sine: SineWave, sine_generator_factory):
+    pipeline = sine | Filter
+    cursor = pipeline.samples()
+    cursor2 = sine_generator_factory()
+    for val in cursor:
+        assert val == next(cursor2)
+
+
+def test_stream(sine: SineWave, sine_generator_factory):
+    pipeline = sine | Filter
+    cursor = pipeline.stream()
+    cursor2 = sine_generator_factory()
+    for _ in range(5):
+        assert next(cursor) == next(cursor2)
+
+
+def test_cursor(sine: SineWave, sine_generator_factory):
+    pipeline = sine | Filter
+    cursor1 = pipeline.cursor
+    cursor2 = pipeline.cursor
+    cursor3 = sine_generator_factory()
+    assert cursor1 == cursor2
+    for v1 in cursor1:
+        assert v1 == next(cursor3)
+
+    with pytest.raises(StopIteration):
+        next(cursor2)
+
+
+def test_iter(sine: SineWave):
+    count = 0
+    for _ in sine:
+        count += 1
+    assert count == math.ceil(sine.wavelength)
+    for _ in sine:
+        count += 1
+    assert count == math.ceil(sine.wavelength * 2)
+
+
+def test_next(sine: SineWave):
+    pipeline = sine | Filter
+    for _ in range(5):
+        val = next(pipeline)
+        assert type(val) == float
+        assert val != next(pipeline)
+
+
+def test_function_filter(const: Constantly):
+    pipeline = const | FilterFunction(lambda r: (x + 1 for x in r))
+    assert next(pipeline) == const.constant + 1
+
+
+def test_filter_function_literal(const: Constantly):
+    pipeline = const | (lambda r: (x + 1 for x in r))
+    assert next(pipeline) == const.constant + 1
+
+
+def test_signal_function(srate, sine: SineWave, sine_generator_factory):
+    pipeline = SignalFunction(lambda _: sine.samples(), srate=srate) | Filter
+    sine_generator = sine_generator_factory()
+    for _ in range(5):
+        assert next(sine_generator) == next(pipeline)
+
+
+def test_sine_mod(sine: SineWave):
+    seconds = 10
+    stream = sine.stream()
+    for _ in range(sine.srate * seconds):
+        next(stream)
+    assert sine.phase <= (2 * math.pi)
+
+    for _ in sine:
+        assert sine.phase <= (2 * math.pi)