diff --git a/functional_tests/sine.py b/functional_tests/sine.py new file mode 100644 index 0000000..363c653 --- /dev/null +++ b/functional_tests/sine.py @@ -0,0 +1,94 @@ +import subprocess + +import numpy as np +from pojagi_dsp.channel.generator.sine import SineWave +import sys +import datetime +from itertools import islice + +from matplotlib import pyplot as plt +from scipy.io import wavfile + +from pojagi_dsp.channel import Constantly +from pojagi_dsp.channel.filter.envelope import Envelope + + +SRATE = 44100 + + +def test_sawtooth( + fundamental: float, + npartials: int = 10, + seconds: float = 1.0, +): + sine = Constantly(0, srate=SRATE) + + for idx in range(1, npartials): + freq = fundamental * idx + amp = 1 / idx + + partial = SineWave(freq, synchronize=True) + partial *= amp + sine += partial + + sine |= lambda g: (x / 3 for x in g) + sine |= Envelope( + [ + (0, 0.0), + (int(SRATE * seconds / 2), 1.0), + (SRATE * seconds, 0.0), + ] + ) + + values = [] + for y in sine.of_duration(datetime.timedelta(seconds=seconds)): + values.append(y) + + return values + + +def test_pitchbend( + from_pitch: float, + to_pitch: float, + seconds: float = 1.0, +): + sine = SineWave(hz=from_pitch, srate=SRATE) + + sig = sine | Envelope( + [ + (0, 0.0), + (int(SRATE * seconds / 2), 1.0), + (SRATE * seconds, 0.0), + ] + ) + + lfo = (SineWave(1/seconds, srate=SRATE) * 30) + lfo += (SineWave(seconds) * 20) + lfo = lfo.stream() + + values = [] + for idx in range(int(seconds * SRATE)): + values.append(next(sig)) + # if idx % 5000 == 0: + sine.hz = from_pitch + next(lfo) + # print(from_pitch + next(lfo)) + # import time + # time.sleep(0.001) + + return values + + +def do_test(values: list[float]): + plt.plot(range(len(values)), values) + plt.show() + + audio = np.array(values, dtype=np.float32) + wavfile.write("/tmp/output.wav", SRATE, audio) + + with subprocess.Popen(["mplayer", "/tmp/output.wav"]) as p: + p.wait() + + +if __name__ == "__main__": + # do_test(test_sawtooth(55.0)) + do_test(test_pitchbend(110.0, 110.0, seconds=10)) diff --git a/pyproject.toml b/pyproject.toml index f9a2b35..1e62eb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,23 +6,15 @@ build-backend = "setuptools.build_meta" name = "pojagi-dsp" description = "DSP tools for load testing." urls = { "gitlab" = "https://gitlab.pojagi.org/tjb1982/dsp" } -authors = [ - { name = "Tom Brennan", email = "tjb1982@gmail.com" }, -] +authors = [{ name = "Tom Brennan", email = "tjb1982@gmail.com" }] readme = "README.md" requires-python = ">=3.8" classifiers = [] dependencies = [ - "pydantic==1.10.2", - "scipy==1.8.1", - - # These should be included in the above requirement - # "marshmallow>=3.3.0", - # "marshmallow_dataclass>=7.2.1", - # "marshmallow_oneofschema>=2.0.1", - # "PyYAML>=5.3.1", + "pydantic~=1.10.2", + "scipy~=1.16.2", ] -optional-dependencies = { test = ["pytest"] } +optional-dependencies = { test = ["pytest"], dev = ["matplotlib", "scipy"] } version = "0.0.0.dev0" # dynamic = ["version"] diff --git a/src/pojagi_dsp/channel/__init__.py b/src/pojagi_dsp/channel/__init__.py index 722128d..11ec81b 100644 --- a/src/pojagi_dsp/channel/__init__.py +++ b/src/pojagi_dsp/channel/__init__.py @@ -9,7 +9,7 @@ import operator from collections.abc import Iterable from functools import reduce import types -from typing import (Any, Callable, Generic, Iterator, Optional, Type, TypeVar, +from typing import (Any, Callable, Generator, Generic, Iterator, Optional, Type, TypeVar, Union) logger = logging.getLogger(__name__) @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) T = TypeVar("T") -class IllegalStateError(ValueError): +class IllegalStateException(ValueError): ... @@ -60,7 +60,7 @@ class ASignal(Generic[T], metaclass=ASignalMeta): @property def srate(self): if self._srate is None: - raise IllegalStateError( + raise IllegalStateException( f"{self.__class__}: `srate` is None." ) return self._srate @@ -185,7 +185,7 @@ class ASignal(Generic[T], metaclass=ASignalMeta): v = getattr(self, k) if not inspect.isroutine(v): members[k] = v - except IllegalStateError as e: + except IllegalStateException as e: members[k] = None return ( @@ -260,7 +260,7 @@ class Filter(ASignal, Generic[S]): The input stream this filter reads. """ if not self._reader: - raise IllegalStateError( + raise IllegalStateException( f"{self.__class__}: `reader` is None." ) return self._reader diff --git a/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py b/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py index d0b6b75..2a9b56d 100644 --- a/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py +++ b/src/pojagi_dsp/channel/ecg/generator/wavetable/sinus.py @@ -102,3 +102,10 @@ def TachycardiaWaveTable(): return ECGWaveTable( top=2000, bottom=0, ) + + +if __name__ == "__main__": + from matplotlib import pyplot as plt + + plt.plot(range(len(tachycardia.tolist() * 3)), tachycardia.tolist() * 3) + plt.show() \ No newline at end of file diff --git a/src/pojagi_dsp/channel/filter/envelope.py b/src/pojagi_dsp/channel/filter/envelope.py new file mode 100644 index 0000000..79341a3 --- /dev/null +++ b/src/pojagi_dsp/channel/filter/envelope.py @@ -0,0 +1,37 @@ +from typing import Optional +from pojagi_dsp.channel import Filter + + +class Envelope(Filter[float]): + def __init__( + self, + checkpoints: list[tuple[int, float]], + *, + srate=None, + reader=None, + ): + super().__init__(reader, srate) + self.checkpoints = sorted(checkpoints) + + def samples(self): + checkpoints = self.checkpoints + idx = 0 + try: + n = 0 + while True: + # Find current segment + while idx + 1 < len(checkpoints) and n >= checkpoints[idx + 1][0]: + idx += 1 + + if idx + 1 < len(checkpoints): + start, dest = checkpoints[idx] + end, end_dest = checkpoints[idx + 1] + t = (n - start) / (end - start) if end > start else 0.0 + value = dest + (end_dest - dest) * t + else: + value = checkpoints[-1][1] + yield next(self.reader.stream()) * value + n += 1 + except StopIteration: + ... + \ No newline at end of file diff --git a/src/pojagi_dsp/series_frame/__init__.py b/src/pojagi_dsp/series_frame/__init__.py deleted file mode 100644 index 8a58d96..0000000 --- a/src/pojagi_dsp/series_frame/__init__.py +++ /dev/null @@ -1,295 +0,0 @@ -from abc import ABC, abstractmethod -from dataclasses import dataclass -import logging -from typing import Iterator, List, Literal, Optional, Type, Union - -from physiq_cloud.series_frame import fb_builder -from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper -from physiq_cloud.time import Instant, TimeUnit -from physiqfb.SeriesFrame import SeriesFrame -from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList -from pydantic import BaseModel - -logger = logging.getLogger(__name__) - - -class SamplingSetInfo(BaseModel): - id: int - alias: str - timestamped: bool - # FIXME: not in the INFO.yaml, but isn't this just the `frame_size_micros` - # divided by the number of values in the channel data? We could either get - # it from the first frame, or look it up in the API/repo. - # frequency: int - - -class SftInfo(BaseModel): - alias: Optional[str] - frame_size_micros: int - max_frame_size_bytes: int - sampling_sets: List[SamplingSetInfo] - - -@dataclass(frozen=True) -class SeriesFramePackage: - frame: FlatBufferWrapper[SeriesFrame] - sft_info: SftInfo - - def relative_instant(self, offset=0): - return Instant.from_unix( - TimeUnit.MICROSECONDS, - (self.frame.fb.FrameId() + offset) - * self.sft_info.frame_size_micros, - ) - - @property - def start(self): - return self.relative_instant(0) - - @property - def end(self): - return self.relative_instant(1) - - -class ASeriesFrameEmitter(ABC): - """ - Abstract class defining the characteristics of a series frame emitter, - which is designed to be subclassed by all generators and filters. - - A "generator" is a type of emitter that produces frames from "nothing"; - i.e., by some external process, such as reading a file, an API, etc.; or - by generating frames algorithmically. - - A "filter" is any emitter that takes another emitter and affects its - :meth:`frames` in some arbitrary way. This terminology is borrowed from - the digital signal processing domain, and doesn't imply that the signal - will be reduced or shortened, etc. In fact the signal coming from the - injected emitter might be lengthened or amplified by a filter to any - extent, including the maximum or infinity, etc. - - This class defines an abstract property called `frames` that should - trigger the pipeline to begin and return an `Iterator` of - :class:`SeriesFramePackage`. - - These emitters are designed to be chained together with filters. E.g., a - typical scenario would be to start with some generator, and chain it - together with one or more filters: - - ``` - my_generator = SomeGenerator() - my_first_filter = SomeFilter(reader=my_generator) - my_second_filter = SomeOtherFilter(reader=my_first_filter) - for frames in my_second_filter.frames: ... - ``` - - Alternatively, the `__or__` method has been overloaded so that you can - also do something equivalent to the above like this: - - ``` - for frame in ( - SomeGenerator() - | SomeFilter - | SomeOtherFilter - ): ... - ``` - - NOTE: Accessing `frames` may trigger side effects; it starts a chain of - calls to the reader accessors in the pipeline, which restarts the pipeline - anew each time. Use :meth:`cursor` to access the existing `Iterator` if - you need multiple accesses. - - i.e., calling - ``` - next(reader.cursor) - ``` - is different than - ``` - next(reader.frames()) - ``` - or - ``` - next(reader.stream()) - ``` - in that calling `stream` is like making a database query (that returns and - caches a cursor), and `cursor` is just providing access to the cached - cursor, while calling `frames` directly provides an Iterator without - caching a cursor. - - That said, :meth:`cursor` is empty until :meth:`stream` is called for the - first time. (It requires the initial "query" that :meth:`frames` provides.) - """ - - def __init__(self, sft_info: Optional[SftInfo] = None) -> None: - """ - :param:`sft_info` - an :class:`SftInfo` describing the series frame - type this emitter emits. - """ - super().__init__() - self._sft_info = sft_info - - @property - def sft_info(self): - if not self._sft_info: - raise RuntimeError( - "Illegal state: `_sft_info` is None." - ) - return self._sft_info - - @sft_info.setter - def sft_info(self, val): - self._sft_info = val - - def __iter__(self): return self.stream() - - @abstractmethod - def frames(self) -> Iterator[SeriesFramePackage]: ... - - @property - def cursor(self): - """ - An `Iterator` representing the current pipeline in progress. - """ - return self._cursor - - def stream(self): - """ - Start the pipeline and return an :class:`Iterator` of - :class:`SeriesFramePackage`. - - Each time :meth:`frames` is accessed, a new `Iterator` is instantiated by - accessing the `reader`'s `frames`, which may trigger side effects. It - will also start the process pipeline over, so if you need to iterate - over the frames with multiple calls within a subclass of this, you should - use :meth:`cursor` instead, which returns the same object returned from this - method, without restarting the pipeline. - """ - self._cursor = self.frames() - return self.cursor - - def __or__( - self, - right: Union["SeriesFrameFilter", Type["SeriesFrameFilter"]], - ) -> "SeriesFrameFilter": - if callable(right): - right = right() - - right.reader = self - right.sft_info = self.sft_info - return right - - def sfhls( - self, - max_bytelen: int, - ) -> Iterator[FlatBufferWrapper[SeriesFrameHolderList]]: - """ - Repackage input frames as a series frame holder list (sfhl) iterator. - :param:`max_bytelen` - the max byte length allowed per sfhl. - """ - chunk = list() - chunk_bytelen = 0 - - for pkg in self.frames(): - if chunk_bytelen + len(pkg.frame.bytes) > max_bytelen: - if not chunk: - break - yield sfhl_from_frames(chunk) - chunk_bytelen = chruncate_frames(chunk) - - chunk_bytelen += append_frame(chunk, pkg.frame) - - if chunk: - yield sfhl_from_frames(chunk) - chunk_bytelen = chruncate_frames(chunk) - - -class SeriesFrameFilter(ASeriesFrameEmitter): - """ - Class defining the characteristics of a filter, which is a kind of emitter. - - The difference between a filter and an emitter is: - - 1. an emitter doesn't know how to provide `frames` (no concrete - implementation) - 2. a filter assumes it will read `frames` from another injected emitter - (called a `reader`), and by default simply returns those `frames` - unadulterated. - """ - - def __init__( - self, - reader: Optional[ASeriesFrameEmitter] = None, - **kwargs - ) -> None: - """ - :param:`reader` - input stream this filter reads. - """ - super().__init__(**kwargs) - self._reader = reader - self._cursor: Iterator[SeriesFramePackage] = iter(()) - - @property - def reader(self) -> ASeriesFrameEmitter: - """ - The input stream this filter reads. - """ - if self._reader is None: - raise RuntimeError("Illegal state: `reader` is None.") - return self._reader - - @reader.setter - def reader(self, val): - self._reader = val - - def frames(self): - """ - This has to exist because it's subclassing an abstract class that - declares it, but a concrete implementation of this would be confusing. - Generally speaking, implementations should read the `self.reader` and - do something to modify the frames. - """ - raise NotImplementedError() - - -def sfhl_from_frames( - frames: Iterator[FlatBufferWrapper[SeriesFrame]] -) -> FlatBufferWrapper[SeriesFrameHolderList]: - builder = fb_builder.Builder(0) - builder.Finish( - fb_builder.CreateSeriesFrameHolderList( - builder=builder, - frames=[ - fb_builder.CreateSeriesFrameHolder( - builder=builder, - data=frame.bytes, - ) - for frame in frames - ] - ) - ) - - return FlatBufferWrapper( - _bytes=builder.Output(), - schema=SeriesFrameHolderList, - ) - - -def append_frame( - chunk: List[FlatBufferWrapper[SeriesFrame]], - frame: FlatBufferWrapper[SeriesFrame], -) -> int: - """ - Appends a frame to the given chunk and returns the bytelen it appended. - """ - chunk.append(frame) - return len(frame.bytes) - - -def chruncate_frames( - chunk: List[FlatBufferWrapper[SeriesFrame]], -) -> Literal[0]: - """ - "Truncates" a "chunk" of frames to zero. Returns bytelen (constantly `0`) - to be consistent with :func:`append_frame`. - """ - chunk.clear() - return 0 diff --git a/src/pojagi_dsp/series_frame/ext.py b/src/pojagi_dsp/series_frame/ext.py deleted file mode 100644 index e883ebb..0000000 --- a/src/pojagi_dsp/series_frame/ext.py +++ /dev/null @@ -1,106 +0,0 @@ -from dataclasses import dataclass -import logging -from pathlib import Path -from typing import Dict, List, Optional - -import yaml -from physiq_cloud.time import Instant, Interval, TimeUnit -from pydantic import BaseModel, validator - -from pojagi_dsp.series_frame import SftInfo -from pojagi_dsp.series_frame.generator.sampler import SftInterval - -logger = logging.getLogger(__name__) - - -class ExcerptType(BaseModel): - series_frame_type: str # vci-vitalpatch-telemetry - sft_interval: Optional[SftInterval] - - -class Excerpt(BaseModel): - account: str # uuid4 - id: str # patient1 - interval: Interval # "iso/iso" - organization: str # "maps" - source_id: str # "maps.physiq.io" - types: List[ExcerptType] - - @validator("interval", pre=True) - def timespan_to_interval(cls, timespan: str): return Interval( - **{ - ["start", "end_exclusive"][idx]: Instant.parse_iso8601(stamp) - for idx, stamp in enumerate(timespan.split("/")) - } - ) - - -class ExtBundleConfig(BaseModel): - excerpts: List[Excerpt] - id: str # clinical-event-trigger - type: str = "PhysIQCloud" - - -@dataclass(frozen=True) -class BundleConfig: - excerpts: Dict[str, Excerpt] - id: str - - @staticmethod - def create(bundle_dir: Path) -> "BundleConfig": - yaml_files = (x.resolve() for x in bundle_dir.glob("**/*.yaml")) - - try: - # the first yaml you encounter is BUNDLE_CONFIG.yaml - ext_bundle_config = ExtBundleConfig.parse_obj( - yaml.safe_load(next(yaml_files).read_text()) - ) - except StopIteration as si: - raise RuntimeError( - f"No yaml files found under \"{bundle_dir}\"." - ) from si - - # the rest are INFO.yaml files - info_files = list(yaml_files) - - for excerpt in ext_bundle_config.excerpts: - logger.debug(f"Loading excerpt \"{excerpt.id}\".") - - for type_ in excerpt.types: - logger.debug(f"Loading SFT \"{type_.series_frame_type}\".") - # Make sure the info file belongs to this excerpt - INFO = next( - x for x in info_files - if x.as_posix() - .split("/")[-3] - .startswith(excerpt.id) - if x.as_posix() - .split("/")[-2] - .startswith(type_.series_frame_type) - ) - - sft_info = SftInfo.parse_obj( - yaml.safe_load(INFO.read_text()) - ) - - sft_info.alias = type_.series_frame_type - - type_.sft_interval = SftInterval( - info=sft_info, - interval=excerpt.interval, - frame_count=( - excerpt.interval - .to_duration() - .to_unix(TimeUnit.MICROSECONDS) - // sft_info.frame_size_micros - ), - files=sorted(INFO.parent.glob("**/*.sfhl")), - ) - - return BundleConfig( - id=ext_bundle_config.id, - excerpts={ - x.id: x - for x in ext_bundle_config.excerpts - }, - ) diff --git a/src/pojagi_dsp/series_frame/filter/__init__.py b/src/pojagi_dsp/series_frame/filter/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/pojagi_dsp/series_frame/filter/looping.py b/src/pojagi_dsp/series_frame/filter/looping.py deleted file mode 100644 index cc7c0d7..0000000 --- a/src/pojagi_dsp/series_frame/filter/looping.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -import math -from typing import Iterator, Union - -from pojagi_dsp.series_frame import (SeriesFrameFilter, - SeriesFramePackage) - -logger = logging.getLogger(__name__) - - -class Looping(SeriesFrameFilter): - """ - Filter that loops through the input reader's frames until it reaches - the given :param:`iterations` - """ - - def __init__( - self, - iterations: Union[int, float] = math.inf, - **kwargs, - ) -> None: - """ - :param:`iterations` - number of times to loop through the `frames`. - """ - super().__init__(**kwargs) - self.iterations = iterations - self.iteration = 0 - - def increment(self) -> None: - """Override to react to `iteration` increment.""" - self.iteration += 1 - logger.debug( - f"Loop incremented ({self.sft_info.alias}): {self.iteration}" - ) - - def frames(self) -> Iterator[SeriesFramePackage]: - while self.iteration < self.iterations: - for pkg in self.reader.frames(): - yield pkg - self.increment() diff --git a/src/pojagi_dsp/series_frame/filter/replaying.py b/src/pojagi_dsp/series_frame/filter/replaying.py deleted file mode 100644 index 5787747..0000000 --- a/src/pojagi_dsp/series_frame/filter/replaying.py +++ /dev/null @@ -1,277 +0,0 @@ -import functools -import logging -from typing import Any, Iterator, List, Optional - -import flatbuffers -import numpy as np -from physiq_cloud.series_frame import fb_builder -from physiq_cloud.series_frame.fb.math import ceildiv -from physiq_cloud.series_frame.fb_to_numpy import from_channel_data -from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper -from physiq_cloud.time import Instant, TimeUnit -from physiqfb.ChannelData import ChannelData -from physiqfb.ReadingType import ReadingType -from physiqfb.SamplingSetData import SamplingSetData -from physiqfb.SeriesFrame import SeriesFrame -from pydantic import BaseModel - -from pojagi_dsp.series_frame import ( - ASeriesFrameEmitter, - SeriesFrameFilter, - SeriesFramePackage, - SftInfo, -) - -logger = logging.getLogger(__name__) - - -class Synchronize(BaseModel): - hour: bool = False - minute: bool = False - second: bool = False - microsecond: bool = False - - -class Replaying(SeriesFrameFilter): - """ - Filter that replays input frames starting at a given :class:`Instant` - in time. - """ - - def __init__( - self, - emit_start: Instant = Instant.now(), - synchronize: Optional[Synchronize] = None, - **kwargs, - ) -> None: - """ - :param:`emit_start` - instant in time to replay the frames. - :param:`synchronize` - Whether and at what granularity to synchronize the Instant from the source - data with the configured :param:`emit_start`. This will alter the precision of the - emit start seconds depending on the level of granularity you choose, - possibly drastically. - """ - super().__init__(**kwargs) - - self.frame_idx = 0 - - self.emit_start = emit_start - self.synchronize = synchronize - self._first_frame_id: int | None = None - self._start_frame_id_inst: int | None = None - - @ASeriesFrameEmitter.sft_info.setter - def sft_info(self, val: SftInfo): - """ - This override's :class:`ASeriesFrameFilter`'s :meth:`sft_info` - setter, so that we can initialize the `timestamped_map` property. - """ - self._sft_info = val - self.timestamped_map = { - s.id: s.timestamped for s in self.sft_info.sampling_sets - } - - @property - def start_frame_id(self): - """ - :meth:`frames` must be called before accessing this for the first time, - because the calculation depends on the first frame. - """ - if self._start_frame_id_inst is not None: - return self._start_frame_id_inst - - if not self.synchronize or not any(v for _, v in self.synchronize): - self.synchronized_emit_start = self.emit_start - else: - emit_start_dt = self.emit_start.to_datetime() - source_start_dt = Instant.from_unix( - TimeUnit.MICROSECONDS, - self._first_frame_id * self.sft_info.frame_size_micros, - ).to_datetime() - - self.synchronized_emit_start = Instant.from_datetime( - emit_start_dt.replace( - **{ - k: getattr(source_start_dt, k) - if getattr(self.synchronize, k) - else getattr(emit_start_dt, k) - for k in ["hour", "minute", "second", "microsecond"] - }, - ), - ) - - # We use ceildiv to calculate the first frame ID, because normal - # division would likely yield a frame BEFORE the emit_start. We want to - # start emitting frames at or after emit_start, and ceildiv does that - # for us. - self._start_frame_id_inst = ceildiv( - self.synchronized_emit_start.to_unix( - TimeUnit.MICROSECONDS, - ), - self.sft_info.frame_size_micros, - ) - - return self._start_frame_id_inst - - def replayed_frame( - self, - source: SeriesFrame, - target_frame_id: int, - ) -> FlatBufferWrapper[SeriesFrame]: - builder = flatbuffers.Builder(0) - target = fb_builder.CreateSeriesFrame( - builder=builder, - frame_id=target_frame_id, - ingested_at_micros=-1, # source.IngestedAtMicros(), - sampling_sets_offsets=[ - replayed_sampling_set( - builder=builder, - sampling_set_data=sampling_set_data, - source_frame_id=source.FrameId(), - target_frame_id=target_frame_id, - frame_size_micros=self.sft_info.frame_size_micros, - is_timestamped=self.timestamped_map[sampling_set_data.Id()], - ) - for sampling_set_data in [ - source.SamplingSets(idx) - for idx in range(source.SamplingSetsLength()) - ] - ], - submitter_id=-1, # source.SubmitterId(), - sensor_id=-1, # source.SensorId(), - ) - builder.Finish(target) - target_frame_bytes = builder.Output() - - return FlatBufferWrapper( - _bytes=target_frame_bytes, - schema=SeriesFrame, - ) - - def frames(self) -> Iterator[SeriesFramePackage]: - """ - Each time this property is accessed, it will access the injected - reader anew, but continue to increment the `frame_idx` so that - the next frame will replay where the last one left off. - - To reset the `frame_idx`, client code can just set the `frame_idx` - manually (or create a new instance). - """ - - for pkg in self.reader.frames(): - if self._first_frame_id is None: - self._first_frame_id = pkg.frame.fb.FrameId() - - yield SeriesFramePackage( - frame=self.replayed_frame( - source=pkg.frame.fb, - target_frame_id=self.start_frame_id + self.frame_idx, - ), - sft_info=pkg.sft_info, - ) - # `int` in python 3 is unbounded, can continue indefinitely. Also, - # the `FrameId` is effectively a timestamp, so continuously - # `inc`ing this value is by design. - self.frame_idx += 1 - - -def replayed_sampling_set( - builder: flatbuffers.Builder, - sampling_set_data: SamplingSetData, - source_frame_id: int, - target_frame_id: int, - frame_size_micros: int, - is_timestamped: bool, -): - source_channels = [ - sampling_set_data.Channels(idx) - for idx in range(sampling_set_data.ChannelsLength()) - ] - - target_channels: List[int] = list() - for source_channel_data in source_channels: - is_timestamp_channel = is_timestamped and source_channel_data.Id() == 0 - - if not is_timestamp_channel or (source_frame_id == target_frame_id): - # Just copy verbatim - target_channel_data = replayed_channel_data( - builder, - source_channel_data, - ) - else: - # Update the timestamps relative to the target frameId - source_frame_start_micros = source_frame_id * frame_size_micros - target_frame_start_micros = target_frame_id * frame_size_micros - - # np.copy is used because the returned array is read-only - timestamps = np.copy(from_channel_data(data=source_channel_data)) - timestamps -= source_frame_start_micros # relativize - timestamps += target_frame_start_micros # frame shift - - target_channel_data = fb_builder.CreateChannelData( - builder=builder, - id=0, # `source_channel_data.Id()` is constantly 0 here - readings_offset=fb_builder.CreateInt64Channel( - builder=builder, - data=timestamps.tolist(), # List[int] - ), - readings_type=ReadingType.INT64, - ) - - target_channels.append(target_channel_data) - - np_relative_timestamps = sampling_set_data.RelativeTimestampsAsNumpy() - relative_timestamps = ( - np_relative_timestamps.tobytes() - if np_relative_timestamps not in {0, -1} # error conditions/no data - else None - ) - - return fb_builder.CreateSamplingSetData( - builder=builder, - channel_offsets=target_channels, - id=sampling_set_data.Id(), - start_offset_micros=sampling_set_data.StartOffsetMicros(), - relative_timestamps_type=sampling_set_data.RelativeTimestampsType(), - relative_timestamps_unit_in_micros=sampling_set_data.RelativeTimestampsUnitInMicros(), - relative_timestamps=relative_timestamps, - relative_timestamps_sample_count=sampling_set_data.RelativeTimestampsSampleCount(), - ) - - -def replayed_channel_data( - builder: flatbuffers.Builder, - channel_data: ChannelData, -) -> int: - readings_type = channel_data.ReadingsType() - - return fb_builder.CreateChannelData( - builder=builder, - id=channel_data.Id(), - readings_offset=channel( - builder=builder, - readings_type=readings_type, - readings=from_channel_data(data=channel_data).tolist(), - ), - readings_type=readings_type, - ) - - -reading_type_map = { - ReadingType.BINARY32: fb_builder.CreateBinary32Channel, - ReadingType.BINARY64: fb_builder.CreateBinary64Channel, - ReadingType.INT8: fb_builder.CreateInt8Channel, - ReadingType.INT16: fb_builder.CreateInt16Channel, - ReadingType.INT32: fb_builder.CreateInt32Channel, - ReadingType.INT64: fb_builder.CreateInt64Channel, - ReadingType.NAMED_READING: fb_builder.CreateNamedReadingChannel, - ReadingType.STRING: fb_builder.CreateStringChannel, -} - - -def channel( - builder: flatbuffers.Builder, - readings_type: int, - readings: List[Any], -) -> int: - return reading_type_map[readings_type](builder=builder, data=readings) diff --git a/src/pojagi_dsp/series_frame/filter/updating.py b/src/pojagi_dsp/series_frame/filter/updating.py deleted file mode 100644 index 331b4b3..0000000 --- a/src/pojagi_dsp/series_frame/filter/updating.py +++ /dev/null @@ -1,76 +0,0 @@ -import logging -from typing import Callable, Iterator - -from physiq_cloud.time import Duration, Instant, TimeUnit - -from pojagi_dsp.series_frame import SeriesFrameFilter, SeriesFramePackage - -logger = logging.getLogger(__name__) - - -class Updating(SeriesFrameFilter): - """ - Filter that outputs frames until it runs out or the next frame would - surpass a given :class:`Instant` in time, whichever comes first. - """ - - def __init__( - self, - until: Callable[[], Instant] = Instant.now, - **kwargs, - ): - """ - :param:`until` - closure returning an `Instant` at which to stop - iterating when the next frame would surpass it. The cursor will - otherwise persist until it has been exhausted. - """ - super().__init__(**kwargs) - self.until = until - self.last_update_count = 0 - self._next_pkg = None - - def log_status(filter, end: Instant, until: Instant): - between = Duration.between(end, until).to_unix(TimeUnit.MILLISECONDS) / float( - 1000 - ) - - logger.debug( - f"{filter.sft_info.alias}: Update scope : {between} seconds" - if end < until - else ( - f"{filter.sft_info.alias}: " - f"Next update available in {-between} seconds" - ) - ) - - def frames(self) -> Iterator[SeriesFramePackage]: - self.last_update_count = 0 - - if self._next_pkg is None: - cursor = self.reader.stream() - self._next_pkg = next(cursor) - else: - cursor = self.reader.cursor - - logger.debug(f"{self.sft_info.alias}: {cursor}") - - try: - until = self.until() - end = self._next_pkg.end - - self.log_status(end, until) - - while end < until: - yield self._next_pkg - self.last_update_count += 1 - self._next_pkg = next(cursor) - end = self._next_pkg.end - except StopIteration: - logger.warning("StopIteration caught") - logger.warning(f"{self.sft_info.alias}: {cursor}") - ... - - if self.last_update_count: - logger.debug( - f"{self.sft_info.alias}: " f"Update frames : {self.last_update_count}" - ) diff --git a/src/pojagi_dsp/series_frame/generator/sampler/__init__.py b/src/pojagi_dsp/series_frame/generator/sampler/__init__.py deleted file mode 100644 index f87cc0e..0000000 --- a/src/pojagi_dsp/series_frame/generator/sampler/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -from dataclasses import dataclass -from pathlib import Path -from typing import List - -from physiq_cloud.time import Interval - -from pojagi_dsp.series_frame import SftInfo - - -@dataclass(frozen=True) -class SftInterval: - info: SftInfo - interval: Interval - frame_count: int - files: List[Path] diff --git a/src/pojagi_dsp/series_frame/generator/sampler/file.py b/src/pojagi_dsp/series_frame/generator/sampler/file.py deleted file mode 100644 index 4fd2e95..0000000 --- a/src/pojagi_dsp/series_frame/generator/sampler/file.py +++ /dev/null @@ -1,95 +0,0 @@ -import logging -import math -from pathlib import Path -from typing import Iterator - -from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper -from physiqfb.SeriesFrame import SeriesFrame -from physiqfb.SeriesFrameHolderList import SeriesFrameHolderList - -from pojagi_dsp.series_frame import (ASeriesFrameEmitter, - SeriesFramePackage) - - -logger = logging.getLogger(__name__) - - -class FileSampler(ASeriesFrameEmitter): - def __init__( - self, - *sfhl_files: Path, - frame_count: int = math.inf, - frame_offset: int = 0, - **kwargs, - ) -> None: - """ - :param:`sfhl_files` - One or more files containing sfhl data. It's - assumed that the files are sorted and contain contiguous (sorted) - data, and no effort is made to validate that or handle any error - condition related to correctness. The files are treated as a single - logical file, and an `EOFError` is raised if the `frame_count` wants - more than the available frames. - - :param:`frame_count` - The number of frames this generator will - attempt to read from the given files. - - :param:`frame_offset` - The number of frames to skip over before - yielding frames from the given files. - """ - super().__init__(**kwargs) - self.files = sfhl_files - self.frame_offset = frame_offset - self.frame_count = frame_count - self.frame_idx = 0 - - def frames_from_sfhl_bytes( - self, data: bytes, - ) -> Iterator[FlatBufferWrapper[SeriesFrame]]: - """ - Emit frames from series frame holder list data. - - :param:`data` - sfhl bytes. - """ - sfhl = SeriesFrameHolderList.GetRootAsSeriesFrameHolderList(data, 0) - for index in range(sfhl.FramesLength()): - yield FlatBufferWrapper( - _bytes=sfhl.Frames(index).DataAsNumpy().tobytes(), - schema=SeriesFrame, - ) - - def frames(self) -> Iterator[SeriesFramePackage]: - self.frame_idx = 0 - end_frame_idx = self.frame_offset + self.frame_count - - for file in self.files: - """ - Finite set of files passed in. - """ - for frame in self.frames_from_sfhl_bytes(file.read_bytes()): - """ - Finite set of frames from `read_bytes`. - """ - if self.frame_idx < self.frame_offset: - """ - Fast-forward to the declared offset, for each file. - """ - self.frame_idx += 1 - continue - elif self.frame_idx > end_frame_idx: - """ - `end_frame_idx` is finite. - """ - break - - yield SeriesFramePackage(frame=frame, sft_info=self.sft_info) - self.frame_idx += 1 - - if end_frame_idx != math.inf: - if self.frame_idx < end_frame_idx: - frames_read = self.frame_idx - self.frame_offset - raise EOFError( - f"Failed to read {self.frame_count - frames_read} frames. " - "Not enough data.\n" - f"\toffset : {self.frame_offset}\n" - f"\tframes read : {frames_read}" - ) diff --git a/src/pojagi_dsp/series_frame/generator/synthesizer/frame/vci_vitalpatch_telemetry.py b/src/pojagi_dsp/series_frame/generator/synthesizer/frame/vci_vitalpatch_telemetry.py deleted file mode 100644 index d3fdb91..0000000 --- a/src/pojagi_dsp/series_frame/generator/synthesizer/frame/vci_vitalpatch_telemetry.py +++ /dev/null @@ -1,48 +0,0 @@ -from typing import Iterator - -import flatbuffers -from physiq_cloud.series_frame import fb_builder -from physiq_cloud.series_frame.fb_wrapper import FlatBufferWrapper -from physiq_cloud.time import Instant, TimeUnit -from physiqfb import SeriesFrame - -from pojagi_dsp.series_frame import (ASeriesFrameEmitter, - SeriesFramePackage) - - -class VCIVitalPatchTelemetrySynthesizer(ASeriesFrameEmitter): - - def frames(self) -> Iterator[SeriesFramePackage]: - builder = flatbuffers.Builder(0) - target = fb_builder.CreateSeriesFrame( - builder=builder, - frame_id=Instant.now().to_unix( - TimeUnit.MICROSECONDS, - ) // self.sft_info.frame_size_micros, - ingested_at_micros=-1, - sampling_sets_offsets=[ - # replayed_sampling_set( - # builder=builder, - # sampling_set_data=sampling_set_data, - # source_frame_id=source.FrameId(), - # target_frame_id=target_frame_id, - # frame_size_micros=self.sft_info.frame_size_micros, - # is_timestamped=self.timestamped_map[ - # sampling_set_data.Id() - # ], - # ) - # for sampling_set_data in [ - # source.SamplingSets(idx) - # for idx in range(source.SamplingSetsLength()) - # ] - ], - submitter_id=-1, - sensor_id=-1, - ) - builder.Finish(target) - target_frame_bytes = builder.Output() - - return FlatBufferWrapper( - _bytes=target_frame_bytes, - schema=SeriesFrame, - ) diff --git a/tests/channel/pipeline_test.py b/tests/channel/pipeline_test.py index 3118765..ffb5b88 100644 --- a/tests/channel/pipeline_test.py +++ b/tests/channel/pipeline_test.py @@ -1,35 +1,46 @@ -from copy import deepcopy import math -from typing import Iterator import pytest -from pojagi_dsp.channel import ASignal, Constantly, Filter, FilterFunction, SignalFunction, IllegalStateError, Map +from pojagi_dsp.channel import ( + ASignal, + Constantly, + Filter, + FilterFunction, + SignalFunction, + IllegalStateException, + Reduce, +) from pojagi_dsp.channel.generator.sine import SineWave @pytest.fixture -def srate(): return 44100 +def srate(): + return 44100 @pytest.fixture -def freq(): return 440 +def freq(): + return 440 @pytest.fixture -def const(srate): return Constantly(42, srate=srate) +def const(srate): + return Constantly(42, srate=srate) @pytest.fixture -def sine(srate, freq): return SineWave(freq, srate=srate) +def sine(srate, freq): + return SineWave(freq, srate=srate) @pytest.fixture def sine_generator_factory(srate, freq): def sine(): phase = 0.0 - inc = (2 * math.pi * freq)/srate + inc = (2 * math.pi * freq) / srate while True: yield math.sin(phase) phase += inc + return sine @@ -73,14 +84,14 @@ def test_filter_nested_expression(const: Constantly): def test_reader(const: Constantly): filter = Filter() - with pytest.raises(IllegalStateError, match=".reader. is None"): + with pytest.raises(IllegalStateException, match=".reader. is None"): filter.reader assert (const | filter).reader == const def test_pipeline_missing_reader(const: ASignal): pipeline = Filter | Filter | Filter - with pytest.raises(IllegalStateError, match=".reader. is None"): + with pytest.raises(IllegalStateException, match=".reader. is None"): next(pipeline) assert next(const | pipeline) @@ -94,13 +105,13 @@ def test_filter_can_only_be_assigned_one_generator(const: Constantly): def test_add_tuple(const: Constantly): pipeline = const + (100, 200, 300) - assert isinstance(pipeline, Map) + assert isinstance(pipeline, Reduce) assert next(pipeline) == const.constant + (100 + 200 + 300) def test_mul_tuple(const: Constantly): pipeline = const * (100, 200, 300) - assert isinstance(pipeline, Map) + assert isinstance(pipeline, Reduce) assert next(pipeline) == const.constant * (100 * 200 * 300)