"""
Composable sample-stream primitives: signals, filters, and the `|`, `+`
and `*` operators that combine them.
"""
import abc
import copy
import datetime
import inspect
from itertools import islice
import logging
import math
import operator
from collections.abc import Iterable
from functools import reduce
import types
from typing import (
    Any,
    Callable,
    Generator,
    Generic,
    Iterator,
    Optional,
    Type,
    TypeVar,
    Union,
)

logger = logging.getLogger(__name__)

T = TypeVar("T")


class IllegalStateException(ValueError):
    """
    Raised when a required attribute (`srate`, `reader`) is read while it is
    still `None`.
    """


def coerce_channels(x: Any) -> Iterator["ASignal"]:
    """
    Yield one or more `ASignal`s built from an arbitrary value.

    * an `ASignal` instance is yielded unchanged;
    * a class is instantiated with no arguments and the instance is yielded;
    * any other callable is wrapped in a `SignalFunction`;
    * a non-string iterable is flattened recursively, item by item;
    * everything else (including strings) is wrapped in a `Constantly`.
    """
    if isinstance(x, ASignal):
        yield x
    elif callable(x):
        if isinstance(x, type):
            yield x()
        else:
            yield SignalFunction(x)
    elif isinstance(x, Iterable) and not isinstance(x, str):
        # Strings are excluded on purpose: iterating a `str` yields 1-char
        # strings, which are themselves iterable, so flattening a string
        # would recurse until `RecursionError`.
        for item in x:
            yield from coerce_channels(item)
    else:
        yield Constantly(x)


class ASignalMeta(abc.ABCMeta):
    """
    Metaclass that lets an *uninstantiated* signal class take part in
    operator expressions by instantiating it with no arguments first.
    """

    def __or__(self, other: Any) -> "Filter":
        """
        Allows `|` composition starting from an uninitialized class.
        See doc for `__or__` below in `ASignal`.
        """
        return self() | coerce_channels(other)

    def __radd__(self, other):
        """`other + cls` is evaluated as `cls() + other`."""
        return self() + other

    def __add__(self, other):
        """`cls + other` is evaluated as `cls() + other`."""
        return self() + other

    def __rmul__(self, other):
        """`other * cls` is evaluated as `cls() * other`."""
        return self() * other

    def __mul__(self, other):
        """`cls * other` is evaluated as `cls() * other`."""
        return self() * other


class ASignal(Generic[T], metaclass=ASignalMeta):
    """
    Abstract base for sample streams.

    Subclasses implement `samples()`. Instances are their own iterators:
    `iter(signal)` starts a fresh `samples()` run and `next(signal)` pulls
    from the run in progress.
    """

    def __init__(self, srate: Optional[float] = None):
        """
        :param srate: samples per second (see `of_duration`); may be left
            unset and filled in later, e.g. by pipeline composition.
        """
        self._srate = srate
        self._cursor: Optional[Iterator[T]] = None

    @property
    def srate(self):
        """
        The sample rate.

        :raises IllegalStateException: if no rate has been set yet.
        """
        if self._srate is None:
            raise IllegalStateException(
                f"{self.__class__}: `srate` is None."
            )
        return self._srate

    @srate.setter
    def srate(self, val: float):
        self._srate = val

    def __iter__(self):
        # Every `iter()` call restarts the stream from `samples()`.
        self._cursor = self.samples()
        return self

    def __next__(self):
        return next(self.cursor)

    @abc.abstractmethod
    def samples(self) -> Iterator[T]:
        """Return a fresh iterator over this signal's samples."""
        ...

    @property
    def cursor(self):
        """
        An `Iterator` representing the current pipeline in progress.
        """
        if self._cursor is None:
            # Lazily start the first run; `__iter__` replaces it afterwards.
            self._cursor = self.samples()
        return self._cursor

    def __getstate__(self):
        """
        Return the instance state for pickling / `copy.deepcopy`.

        `_cursor` is a generator, and generators aren't picklable, so the
        copy gets `_cursor = None` and lazily starts its own run. The key is
        reset rather than deleted: a restored instance without `_cursor`
        would raise `AttributeError` on its first `cursor` access.
        """
        state = self.__dict__.copy()
        state["_cursor"] = None
        return state

    def stream(self):
        """
        Yield samples endlessly, restarting `samples()` whenever the current
        run is exhausted.

        The generator ends only if a freshly restarted run produces no
        samples at all; otherwise an empty signal would spin forever without
        ever yielding.
        """
        just_restarted = False
        while True:
            try:
                sample = next(self.cursor)
            except StopIteration:
                if just_restarted:
                    # A brand-new cursor was already empty: nothing to loop.
                    return
                iter(self)  # resets `_cursor` to a fresh `samples()` run
                just_restarted = True
            else:
                just_restarted = False
                yield sample

    def of_duration(self, duration: datetime.timedelta):
        """
        Returns an `Iterator` of samples for a particular duration expressed
        as a `datetime.timedelta`.

        The number of samples is `floor(srate * duration.total_seconds())`,
        taken from `stream()`.

        :param duration: `datetime.timedelta` representing the duration
        :raises IllegalStateException: if `srate` is unset.
        """
        return islice(
            self.stream(),
            0,
            math.floor(self.srate * duration.total_seconds()),
        )

    def __or__(
        left,
        right: Union["Filter", Callable, Iterable],
    ) -> "Filter":
        """
        Allows composition of filter pipelines with `|` operator.

        e.g.,
        ```
        myFooGenerator | BarFilter | baz_filter_func
            | (lambda reader: (x for x in reader))
        ```

        `left` becomes the reader of the innermost filter of `right`, and
        `right` is returned. Non-signal right-hand values are coerced with
        `coerce_channels` and applied left to right.

        :raises ValueError: if `right` is a signal that is not a `Filter`,
            or if its pipeline already ends in a non-filter.
        """
        if isinstance(right, SignalFunction):
            # A bare callable was coerced to a `SignalFunction`; on the right
            # of `|` it is meant to be a function of the reader instead.
            return left | FilterFunction(fn=right._fn, name=right.Function)
        if not isinstance(right, ASignal):
            return reduce(operator.or_, (left, *coerce_channels(right)))
        if not isinstance(right, Filter):
            raise ValueError(
                f"Right side must be a `{Filter.__name__}`; "
                f"received: {type(right)}",
            )
        node: Filter = right
        while getattr(node, "_reader", None) is not None:
            # Assuming this is a filter pipeline, we want the last node's
            # reader to be whatever's on the left side of this operation.
            node = node.reader
        if hasattr(node, "_reader"):
            # We hit the "bottom" and found a filter.
            node.reader = left
        else:
            # We hit the "bottom" and found a non-filter/generator.
            raise ValueError(
                f"{right.__class__.__name__}: filter pipeline already has a "
                "generator."
            )
        # Prefer the rate of `left`. When `left` has none, keep the rate the
        # filter already carries (instead of wiping it with `None`);
        # re-assigning it pushes it down to the newly attached reader.
        right.srate = (
            left._srate if left._srate is not None else right._srate
        )
        return right

    def __radd__(right, left):
        return right.__add__(left)

    def __add__(left, right):
        """Sample-wise sum of this signal and the coerced `right` operand."""
        return left._operator_impl(operator.add, right)

    def __rmul__(right, left):
        return right.__mul__(left)

    def __mul__(left, right):
        """Sample-wise product of this signal and the coerced `right`."""
        return left._operator_impl(operator.mul, right)

    # FIXME: other operators? Also, shouldn't `*` mean convolve instead?
    def _operator_impl(left, operator: Callable[..., T], right: Any):
        """
        Build a `Reduce` applying `operator` across `left` and every channel
        coerced from `right`. Channels without a rate inherit `left`'s.
        """
        channels = list(coerce_channels(right))
        for channel in channels:
            if channel._srate is None:
                channel.srate = left._srate
        return Reduce(operator, left, *channels, srate=left._srate)

    def __repr__(self):
        """
        `ClassName(attr=value, ...)` over the public, non-routine attributes.

        `stream`, `reader`, `cursor` and `wave` are skipped, and attributes
        whose getter raises `IllegalStateException` are shown as `None`.
        """
        hidden = {"stream", "reader", "cursor", "wave"}
        members = {}
        for key in dir(self):
            if key.startswith("_") or key in hidden:
                continue
            try:
                value = getattr(self, key)
            except IllegalStateException:
                members[key] = None
                continue
            if not inspect.isroutine(value):
                members[key] = value
        body = ", ".join(f"{k}={v}" for k, v in members.items())
        return f"{self.__class__.__name__}({body})"


S = TypeVar("S", bound=ASignal)


class Reduce(ASignal, Generic[S, T]):
    """
    Signal whose samples are `functools.reduce(fn, ...)` over the lock-step
    samples of several streams; it ends with the shortest stream.
    """

    def __init__(
        self,
        # FIXME: typing https://stackoverflow.com/a/67814270
        fn: Callable[..., T],
        *streams: S,
        srate: Optional[float] = None,
        stateful=False,
    ):
        """
        :param fn: binary function folded over each tuple of samples.
        :param streams: the input streams.
        :param srate: rate for this signal; when given it is also applied to
            the (copied) `ASignal` inputs.
        :param stateful: when true the streams are used as passed; otherwise
            each one is deep-copied (generators cannot be copied and are
            used as-is).
        """
        super().__init__(srate)
        self._fn = fn
        # Callables such as `functools.partial` have no `__name__`.
        self.fn = getattr(fn, "__name__", type(fn).__name__)
        self.streams = []
        for stream in streams:
            if stateful:
                self.streams.append(stream)
                continue
            stream_ = (
                stream
                if isinstance(stream, types.GeneratorType)
                else copy.deepcopy(stream)
            )
            # Raw generators have no `srate`, and an unset rate must not
            # erase the rate a stream already carries.
            if srate is not None and isinstance(stream_, ASignal):
                stream_.srate = srate
            self.streams.append(stream_)

    @property
    def srate(self):
        """The sample rate; setting it also sets it on `ASignal` inputs."""
        return ASignal.srate.fget(self)

    @srate.setter
    def srate(self, val: float):
        ASignal.srate.fset(self, val)
        for stream in self.streams:
            if isinstance(stream, ASignal):
                stream.srate = val

    def samples(self):
        return (
            reduce(self._fn, args)
            for args in zip(*self.streams)
        )


class Filter(ASignal, Generic[S]):
    """
    A signal that reads from another signal (its `reader`). The base
    implementation passes the reader's samples through.
    """

    def __init__(
        self,
        reader: Optional[S] = None,
        srate: Optional[float] = None,
    ):
        super().__init__(srate)
        self.reader: Optional[S] = reader

    @property
    def reader(self) -> S:
        """
        The input stream this filter reads.

        :raises IllegalStateException: if no reader has been attached.
        """
        if self._reader is None:
            raise IllegalStateException(
                f"{self.__class__}: `reader` is None."
            )
        return self._reader

    @reader.setter
    def reader(self, val: S):
        self._reader = val
        # A filter without its own rate adopts the rate of its new reader.
        if val is not None and self._srate is None:
            self.srate = val._srate

    @property
    def srate(self):
        """
        The sample rate; setting it also fills in the rate of any reader
        further down the pipeline that has none.
        """
        return ASignal.srate.fget(self)

    @srate.setter
    def srate(self, val: float):
        ASignal.srate.fset(self, val)
        child: Optional[ASignal] = getattr(self, "_reader", None)
        previous_srate = val
        while child is not None:
            # Since `srate` is optional at initialization, but required in
            # general, we make our best attempt to normalize it for the
            # filter pipeline, which should be consistent for most
            # applications, by applying it to all children.
            if child._srate is None:
                child.srate = previous_srate
            child = getattr(child, "_reader", None)
            if isinstance(child, ASignal) and child._srate is not None:
                previous_srate = child._srate

    def samples(self) -> Iterator[T]:
        return self.reader.samples()

    def __repr__(self):
        return (
            f"{self._reader} | {super().__repr__()}"
        )


class FilterFunction(Filter, Generic[T, S]):
    """A `Filter` whose samples are produced by calling `fn(reader)`."""

    def __init__(
        self,
        fn: Callable[[S], Iterator[T]],
        name: Optional[str] = None,
        reader: Optional[S] = None,
        srate: Optional[float] = None,
    ):
        """
        :param fn: called with the reader; returns the sample iterator.
        :param name: display name; defaults to the callable's `__name__`
            (or its type name when it has none).
        """
        super().__init__(reader, srate)
        self._fn = fn
        self.Function = (
            name if name else getattr(fn, "__name__", type(fn).__name__)
        )

    def samples(self):
        return self._fn(self.reader)


class SignalFunction(ASignal, Generic[T]):
    """A signal whose samples are produced by calling `fn(srate)`."""

    def __init__(
        self,
        fn: Callable[[int], Iterator[T]],
        name: Optional[str] = None,
        srate: Optional[float] = None,
    ):
        """
        :param fn: called with the sample rate; returns the sample iterator.
        :param name: display name; defaults to the callable's `__name__`
            (or its type name when it has none).
        """
        super().__init__(srate)
        self._fn = fn
        self.Function = (
            name if name else getattr(fn, "__name__", type(fn).__name__)
        )

    def samples(self) -> Iterator[T]:
        return self._fn(self.srate)


class Constantly(ASignal, Generic[T]):
    """An endless signal that yields the same value for every sample."""

    def __init__(self, constant: T, srate: float = 0.0):
        super().__init__(srate)
        self.constant = constant

    def samples(self) -> Iterator[T]:
        while True:
            yield self.constant