Module curvepy.curve

Expand source code
import math
import weakref
import inspect
import arrow
import numpy as np
from numbers import Number
from collections.abc import Sequence, Mapping
from intervalpy import Interval
from .const import GOLD
from . import util

MIN_STEP = 1e-5

# TODO: Implement Duration and use its next ad previous methods
# Or make a super class which is not tied to a time interval.

_func_obj = None

class Curve:

    _token_counter = 0

    @classmethod
    def empty(cls):
        from .empty import Empty
        return Empty()

    @property
    def min_step(self):
        return self._min_step

    @min_step.setter
    def min_step(self, value):
        self._min_step = value

    @property
    def domain(self):
        if self.needs_domain_update:
            self._domain = self.get_domain()
        return self._domain

    @property
    def update_interval(self):
        return self._begin_update_interval

    @property
    def is_updating(self):
        return not self.update_interval.is_empty

    def __init__(self, min_step=None):
        self.name = None
        self._domain = None
        self._observer_data = {}
        self._ordered_observer_tokens = []
        self._begin_update_interval = Interval.empty()
        self._end_update_interval = Interval.empty()
        self.min_step = min_step

    def __call__(self, *args):
        return self.y(args[0])

    def __repr__(self):
        try:
            if bool(self.name):
                return self.name
            return f'{type(self).__name__}("{self.domain}")'
        except Exception as e:
            return super().__repr__() + f'({e})'

    def y(self, x):
        raise Exception("Not implemented")

    def y_start(self):
        return self.y(self.domain.start)

    def y_end(self):
        return self.y(self.domain.end)

    def first_point(self):
        d = self.domain
        if d.is_empty:
            return None
        return (d.start, self.y(d.start))

    def last_point(self):
        d = self.domain
        if d.is_empty:
            return None
        return (d.end, self.y(d.end))

    def d_y(self, x, forward=False, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if forward:
            x1 = self.x_next(x, min_step=min_step, limit=limit)
        else:
            x1 = self.x_previous(x, min_step=min_step, limit=limit)

        if x1 is None:
            return None
        y1 = self.y(x1)
        if y1 is None:
            return None
        y = self.y(x)
        if y is None:
            return None
        if x1 == x:
            dy = math.inf if y1 >= y else -math.inf
            if not forward:
                dy = -dy
        else:
            dy = (y1 - y) / (x1 - x)
        return dy

    def x(self, y):
        raise Exception("Not implemented")

    def x_next(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if math.isinf(min_step):
            x1 = self.domain.end
        else:
            x1 = x + min_step
        if limit is not None and x1 > limit:
            x1 = limit
        if not self.domain.contains(x1, enforce_start=False):
            return None
        return x1

    def x_previous(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if math.isinf(min_step):
            x1 = self.domain.start
        else:
            x1 = x - min_step
        if limit is not None and x1 < limit:
            x1 = limit
        if not self.domain.contains(x1, enforce_end=False):
            return None
        return x1

    def previous_point(self, x, min_step=MIN_STEP):
        x1 = self.x_previous(x, min_step=min_step)
        if x1 is None:
            return None
        y1 = self.y(x1)
        return (x1, y1)

    def next_point(self, x, min_step=MIN_STEP):
        x1 = self.x_next(x, min_step=min_step)
        if x1 is None:
            return None
        y1 = self.y(x1)
        return (x1, y1)

    def get_domain(self):
        return Interval.empty()

    def resolve_min_step(self, min_step):
        if min_step is None and self.min_step is None:
            return None
        elif min_step is None:
            return self.min_step
        elif self.min_step is None:
            return min_step
        else:
            return max(min_step, self.min_step)

    def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
        min_step = self.resolve_min_step(min_step)
        if domain is None:
            domain = self.domain
        else:
            domain = Interval.intersection([self.domain, domain])
        if domain.is_empty:
            return []
        elif not domain.is_finite:
            raise Exception("Cannot sample points on an infinite domain {}. Specify a finite domain.".format(domain))
        x_start, x_end = domain
        x_end_bin = round(x_end / min_step) if min_step is not None else x_end
        if domain.start_open:
            points = []
        else:
            points = [(x_start, self.y(x_start))]
        if step is not None:
            x = x_start + step
            while x <= x_end:
                y = self.y(x)
                points.append((x, y))
                x += step
        elif min_step is not None and min_step > 0:
            x = self.x_next(x_start, min_step=min_step, limit=x_end)
            while x is not None and x <= x_end:
                y = self.y(x)
                points.append((x, y))
                x_bin = round(x / min_step) if min_step is not None else x
                if x_bin == x_end_bin:
                    break
                x1 = self.x_next(x, min_step=min_step, limit=x_end)
                if x1 is not None:
                    x1_bin = round(x1 / min_step) if min_step is not None else x1
                    if x1_bin <= x_bin:
                        raise Exception('Next x value {} should be greater than the previous x value {} by at least the minimum step of {}'.format(x1, x, min_step))
                x = x1

            if not domain.end_open and points[-1][0] != x_end:
                points.append((x_end, self.y(x_end)))
        else:
            raise Exception("Bad functions sample parameters.")

        return points

    def sample_points_from_x(self, x, limit, backward=False, open=False, min_step=None):
        assert limit is not None
        if limit < 0:
            limit = -limit
            backward = not backward
        min_step = self.resolve_min_step(min_step)
        points = []
        x1 = x
        i = 0

        if not open:
            if x is None:
                return points
            y = self.y(x)
            if y is None:
                return points
            i += 1

        while limit is None or i < limit:
            if not backward:
                x1 = self.x_next(x1, min_step=min_step)
            else:
                x1 = self.x_previous(x1, min_step=min_step)
            if x1 is None:
                break
            y1 = self.y(x1)
            if y1 is None:
                break
            points.append((x1, y1))
            i += 1

        return points

    def get_range(self, domain=None, **kwargs):
        points = self.sample_points(domain=domain, **kwargs)
        low = None
        high = None
        for p in points:
            if low is None or p[1] < low:
                low = p[1]
            if high is None or p[1] > high:
                high = p[1]
        if low is None or high is None:
            return Interval.empty()
        return Interval(low, high)

    def minimise(self, x, min_step=MIN_STEP, step=None, max_iterations=1000):
        x_min = x
        x_min_previous = None
        iterations = 0
        while iterations < max_iterations:
            iterations += 1
            y = self.y(x_min)
            if y is None:
                return x_min_previous
            dy0 = self.d_y(x_min, forward=False)
            dy1 = self.d_y(x_min, forward=True)
            forward = True
            if dy0 is None and dy1 is None:
                return x_min
            elif dy0 is None:
                if dy1 <= 0:
                    forward = True
                else:
                    # Sloping into null value
                    return None
            elif dy1 is None:
                if dy0 >= 0:
                    forward = False
                else:
                    # Sloping into null value
                    return None
            else:
                if dy0 * dy1 < 0 and dy0 <= 0 and dy1 >= 0:
                    # Found minimum
                    return x_min

                if dy0 * dy1 < 0:
                    # Found maximum
                    forward = abs(dy0) < abs(dy1)
                else:
                    # On slope
                    forward = dy1 < 0

            x_min_previous = x_min
            if forward:
                if step is not None:
                    x_min += step
                else:
                    x_min = self.x_next(x_min, min_step=min_step)
            else:
                if step is not None:
                    x_min -= step
                else:
                    x_min = self.x_previous(x_min, min_step=min_step)
        return x_min

    def maximise(self, x, min_step=MIN_STEP, step=None, max_iterations=1000):
        x_max = x
        x_max_previous = None
        iterations = 0
        while iterations < max_iterations:
            iterations += 1
            y = self.y(x_max)
            if y is None:
                return x_max_previous
            dy0 = self.d_y(x_max, forward=False)
            dy1 = self.d_y(x_max, forward=True)
            forward = True
            if dy0 is None and dy1 is None:
                return x_max
            elif dy0 is None:
                if dy1 >= 0:
                    forward = True
                else:
                    # Sloping into null value
                    return None
            elif dy1 is None:
                if dy0 <= 0:
                    forward = False
                else:
                    # Sloping into null value
                    return None
            else:
                if dy0 * dy1 < 0 and dy0 >= 0 and dy1 <= 0:
                    # Found maximum
                    return x_max

                if dy0 * dy1 < 0:
                    # Found minimum
                    forward = abs(dy0) < abs(dy1)
                else:
                    # On slope
                    forward = dy1 > 0

            x_max_previous = x_max
            if forward:
                if step is not None:
                    x_max += step
                else:
                    x_max = self.x_next(x_max, min_step=min_step)
            else:
                if step is not None:
                    x_max -= step
                else:
                    x_max = self.x_previous(x_max, min_step=min_step)
        return x_max

    def regression(self, domain=None, min_step=MIN_STEP, step=None):
        points = self.sample_points(domain=domain, min_step=min_step, step=step)
        for p in points:
            if p[1] is None:
                return None
        count = len(points)
        if count < 2:
            return None
            
        from .line import Line
        if count == 2:
            return Line(p1=points[0], p2=points[1])
        xy = np.vstack(points)
        x = xy[:,0]
        y = xy[:,1]
        A = np.array([x, np.ones(count)])

        # Regression
        w = np.linalg.lstsq(A.T, y, rcond=None)[0]
        m = w[0]
        c = w[1]

        return Line(const=c, slope=m)

    def add_observer(self, *obj, domain=None, begin=None, end=None, autoremove=False, prioritize=False):
        if begin is None and end is None:
            return 0
        
        Curve._token_counter += 1
        token = Curve._token_counter
        domain = Interval.parse(domain, default_inf=True)
        obj_ref = None

        if len(obj) != 0:
            if autoremove:
                # Remove observer automatically
                obj_ref = weakref.ref(obj[0], lambda _: self.remove_observer(token))
            else:
                # Calling remove_observer() is required
                obj_ref = weakref.ref(obj[0])
        elif autoremove:
            raise Exception('Autoremoving an observer requires an object')

        # Do the callback functions require the domain?
        begin_with_interval = False
        end_with_interval = False
        if begin:
            begin_with_interval = util.count_positional_args(begin) == 1
        if end:
            end_with_interval = util.count_positional_args(end) == 1

        # TODO: does saving strong references to callbacks create a retain cycle?
        self._observer_data[token] = (obj_ref, domain, begin, end, begin_with_interval, end_with_interval)
        if prioritize:
            self._ordered_observer_tokens.insert(0, token)
        else:
            self._ordered_observer_tokens.append(token)
        return token

    def remove_observer(self, token_or_obj):
        if isinstance(token_or_obj, Number):
            if token_or_obj in self._observer_data:
                del self._observer_data[token_or_obj]
                self._ordered_observer_tokens.remove(token_or_obj)
        else:
            for token in list(self._ordered_observer_tokens):
                obj_ref = self._observer_data[token][0]
                if obj_ref is not None:
                    obj = obj_ref()
                    if obj is None or obj == token_or_obj:
                        del self._observer_data[token]
                        self._ordered_observer_tokens.remove(token)

    def begin_update(self, domain):
        if domain.is_empty or self._begin_update_interval.is_superset_of(domain):
            return
        self._begin_update_interval = Interval.union([self._begin_update_interval, domain])
        for token in self._ordered_observer_tokens:
            _, callback_interval, callback, _, callback_with_interval, _ = self._observer_data[token]
            if callback_interval is None or domain.intersects(callback_interval):
                if callback is not None:
                    if callback_with_interval:
                        callback(domain)
                    else:
                        callback()

    def end_update(self, domain):
        if domain.is_empty or self._end_update_interval.is_superset_of(domain):
            return
        self._end_update_interval = Interval.union([self._end_update_interval, domain])
        if not self._end_update_interval.is_superset_of(self._begin_update_interval):
            # Keep collecting updates
            return
        
        # Updates complete
        update_interval = self._end_update_interval
        self._begin_update_interval = Interval.empty()
        self._end_update_interval = Interval.empty()
        self.set_needs_interval_update()
        for token in list(self._ordered_observer_tokens):
            _, callback_interval, _, callback, _, callback_with_interval = self._observer_data[token]
            if callback_interval is None or update_interval.intersects(callback_interval):
                if callback is not None:
                    if callback_with_interval:
                        callback(update_interval)
                    else:
                        callback()

    @property
    def needs_domain_update(self):
        return self._domain is None

    def set_needs_interval_update(self):
        self._domain = None

    def map(self, tfm, skip_none=False, name=None, **kwargs):
        from .map import Map
        return Map(self, tfm, skip_none=skip_none, name=name, **kwargs)

    def accumulator_map(self, tfm, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
        from .accumulator_map import AccumulatorMap
        return AccumulatorMap(
            self,
            tfm,
            degree,
            is_period=is_period,
            interpolation=interpolation,
            min_step=min_step,
            uniform=uniform
        )

    def offset(self, x, duration=None):
        from .offset import Offset
        return Offset(self, x, duration=duration)

    def add(self, func):
        return Curve.add_many([self, func])

    def subtract(self, func):
        return Curve.subtract_many([self, func])

    def multiply(self, func):
        return Curve.multiply_many([self, func])

    def divide(self, func):
        return Curve.divide_many([self, func])

    def pow(self, power):
        return type(self).pow_many([self, power])

    def raised(self, base):
        return type(self).pow_many([base, self])

    def log(self, base=math.e):
        return type(self).log_many([self, base])

    def integral(self, const=0, interpolation=None, uniform=True):
        from .integral import Integral

        return Integral(self, const=const, interpolation=interpolation, uniform=uniform)

    def additive_inverse(self):
        return self.map(_additive_inverse)

    def multiplicative_inverse(self):
        return self.map(_multiplicative_inverse)

    def abs(self):
        return self.map(_abs)

    def blend(self, func, x_blend_start, x_blend_stop):
        from .aggregate import Aggregate
        from .piecewise import Piecewise
        
        x_blend_period = x_blend_stop - x_blend_start
        def blend_f(x, ys):
            u = (x - x_blend_start) / x_blend_period
            return (1.0 - u) * ys[0] + u * ys[1]
        c = Aggregate([self, func], tfm=blend_f, name='blend')

        funcs = [self, c, func]
        domains = self.domain.partition([x_blend_start, x_blend_stop])
        return Piecewise(funcs, domains)

    def extension(self, name, start=False, end=True, raise_on_empty=False, **kwds):
        from .extension import ConstantExtension
        from .extension import TangentExtension
        from .extension import SinExtension

        classes = [            
            ConstantExtension,
            TangentExtension,
            SinExtension,
        ]

        for c in classes:
            if c.name == name:
                return c(self, start=start, end=end, raise_on_empty=raise_on_empty, **kwds)
        
        raise Exception('Unknown extension type')

    # def wave_extended(self, ref_func, min_deviation=0, start=None, step=None, min_step=MIN_STEP):
    #     if self.domain.is_positive_infinite:
    #         return self
    #     ref_func = Curve.parse(ref_func)
    #     extremas = Extremas(self, ref_func, min_deviation=min_deviation, start=start, step=step, min_step=min_step)

    # def mom(self, degree, duration, **kwargs):
    #     """
    #     Returns the momentum of the reciever.

    #     The degree corresponds to the number of steps to take.
    #     """
    #     degree = int(degree)
    #     if degree < 1:
    #         raise ValueError(f'Momentum requires a positive degree, got: {degree}')
    #     from pyduration import Duration
    #     duration = Duration.parse(duration)
    #     def _mom(x, y):
    #         if y is None:
    #             return None
    #         # step back
    #         x0 = duration.step(x, -degree)
    #         y0 = self.y(x0)
    #         if y0 is None:
    #             return None
    #         return y - y0
    #     return self.map(_mom, name=f'mom({degree})', **kwargs)

    def sma(self, degree, is_period=False, **kwargs):
        from .sma import SMA
        return SMA(self, degree, is_period=is_period, **kwargs)

    def ema(self, degree, is_period=False, init=None, **kwargs):
        from .ema import EMA
        return EMA(self, degree, is_period=is_period, init=init, **kwargs)

    def smma(self, degree, **kwargs):
        from .sma import SMA
        from .ema import EMA
        sma = SMA(self, degree, is_period=False, **kwargs)
        ema = EMA(self, 1 / degree, is_period=False, init=sma, **kwargs)
        return ema

    def harmonic_smas(self, base_degree, count, stride=1, is_period=False, **kwargs):
        """
        Returns `count` SMAs from small to large. Their degrees
        are proportional to the golden ratio.
        """
        periods = []
        smas = []
        step = stride + 1

        for i in range(count):
            period = base_degree * GOLD ** float(i * step)
            period = round(period / base_degree) * base_degree
            periods.append(period)

        for i in range(count):
            period = periods[i]
            sma = self.sma(period, is_period=is_period, **kwargs)
            smas.append(sma)

        return smas

    def centered_macs(self, base_degree, count, stride=1, is_period=False, **kwargs):
        periods = []
        smas = []
        step = stride + 1

        for i in range(count):
            period = base_degree * GOLD ** float(i * step)
            period = round(period / base_degree) * base_degree
            periods.insert(0, period)

        for i in range(count):
            period = periods[i]
            sma = self.sma(period, is_period=is_period, **kwargs)
            smas.append(sma)

        return smas

    def rsi(self, degree, **kwargs):
        d = self.differential()
        du = Curve.max([d, 0], ignore_empty=False)
        dd = Curve.max([-d, 0], ignore_empty=False)
        rs = du.ema(1 / degree, **kwargs) / dd.ema(1 / degree, **kwargs)
        rsi = 100 - 100 / (1 + rs)
        rsi.name = f'rsi({degree})'
        return rsi

    def trailing_min(self, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
        return self.accumulator_map(
            min,
            degree,
            is_period=is_period,
            interpolation=interpolation,
            min_step=min_step,
            uniform=uniform
        )

    def trailing_max(self, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
        return self.accumulator_map(
            max,
            degree,
            is_period=is_period,
            interpolation=interpolation,
            min_step=min_step,
            uniform=uniform
        )

    def differential(self, forward=False):
        from .map import Map
        d = Map(self, lambda x, y: self.d_y(x, forward=forward))
        d.name = 'diff'
        return d

    def subset(self, domain):
        from .generic import Generic
        return Generic(self, domain=domain, min_step=self.min_step)

    @classmethod
    def first(cls, funcs, *args):
        """
        Return a func which returns the first value which is not `None`.
        """
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def first_val(x, vals):
            for v in vals:
                if v is not None:
                    return v
            return None

        funcs = Curve.parse_many(funcs)
        return Aggregate(funcs, tfm=first_val, union=True, name='first')        

    @classmethod
    def min(cls, funcs, *args, ignore_empty=False):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def min_vals(x, vals):
            best = None
            for val in vals:
                if best is None or (val is not None and val < best):
                    best = val
            return best

        def min_vals_with_empty(x, vals):
            return min(filter(lambda y: y is not None, vals), default=None)

        funcs = Curve.parse_many(funcs)
        t = min_vals_with_empty if ignore_empty else min_vals
        return Aggregate(funcs, tfm=t, union=ignore_empty, name='min')

    @classmethod
    def max(cls, funcs, *args, ignore_empty=False):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def max_vals(x, vals):
            best = None
            for val in vals:
                if best is None or (val is not None and val > best):
                    best = val
            return best

        def max_vals_with_empty(x, vals):
            return max(filter(lambda y: y is not None, vals), default=None)

        funcs = Curve.parse_many(funcs)
        t = max_vals_with_empty if ignore_empty else max_vals
        return Aggregate(funcs, tfm=t, union=ignore_empty, name='max')

    @classmethod
    def add_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def add_f(x, ys):
            for y in ys:
                if y is None:
                    return None
            return sum(ys)
        return Aggregate(funcs, tfm=add_f, name='add', operator='+')

    @classmethod
    def subtract_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def sub_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                else:
                    result -= y
            return result
        return Aggregate(funcs, tfm=sub_f, name='sub', operator='-')

    @classmethod
    def multiply_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def mult_f(x, ys):
            geo_sum = 1.0
            for y in ys:
                if y is None:
                    return None
                geo_sum *= y
            return geo_sum
        return Aggregate(funcs, tfm=mult_f, name='mult', operator='*')

    @classmethod
    def divide_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def div_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                elif y == 0:
                    result = math.inf if result >= 0 else -math.inf
                else:
                    result /= y
            return result
        return Aggregate(funcs, tfm=div_f, name='div', operator='/')

    @classmethod
    def pow_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def log_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                else:
                    result = result ** y
            return result
        return Aggregate(funcs, tfm=log_f, name='pow', operator='^')

    @classmethod
    def log_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def log_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                else:
                    result = math.log(result, y)
            return result
        return Aggregate(funcs, tfm=log_f, name='log')

    @classmethod
    def zero(cls, value):
        from .constant import Constant
        return Constant.zero()

    @classmethod
    def const(cls, value):
        from .constant import Constant
        return Constant(value)

    @classmethod
    def parse(cls, func):
        from .generic import Generic
        from .constant import Constant
        from .points import Points

        if func is None:
            return None
        elif isinstance(func, Curve):
            return func
        elif callable(func):
            return Generic(func)
        elif isinstance(func, Number):
            return Constant(func)
        elif isinstance(func, Sequence):
            # Parse points
            if len(func) == 0:
                return Points(func)
            else:
                if isinstance(func[0], Sequence):
                    if len(func[0]) == 2:
                        return Points(func)
        elif isinstance(func, Mapping):
            return cls.parse_descriptor(func)
        raise Exception('Unable to parse function')

    @classmethod
    def parse_descriptor(cls, d, fragment=False, current_func=None, decorators=None):
        # Example:
        # {
        #     "$line": {
        #         "points": [
        #             ["2020-02-12 01:23+1200", 8765.56],
        #             ["2020-02-30 04:50+1200", 6765.56]
        #         ]
        #     }
        # }
        if decorators is None:
            decorators = []

        def next_func_constructor(fname):
            f = current_func or _func_obj
            assert isinstance(f, Curve)
            ftype = type(f)
            fconstructor = None
            fconstructor_from_instance = False
            type_method_names = list(map(lambda x: x[0], inspect.getmembers(ftype, predicate=inspect.ismethod)))
            f_method_names = list(map(lambda x: x[0], inspect.getmembers(f, predicate=inspect.ismethod)))
            if f'{fname}_many' in type_method_names:
                fname = f'{fname}_many'
            if fname in type_method_names:
                def _create_class_fconstructor(fname):
                    def _class_fconstructor(*args, **kwargs):
                        f = current_func or _func_obj
                        fmethod = getattr(type(f), fname)
                        return fmethod(*args, **kwargs)
                    return _class_fconstructor
                fconstructor = _create_class_fconstructor(fname)
                fconstructor_from_instance = False
            elif fname in f_method_names:
                def _create_fconstructor(fname):
                    def _fconstructor(*args, **kwargs):
                        f = current_func or _func_obj
                        fmethod = getattr(f, fname)
                        return fmethod(*args, **kwargs)
                    return _fconstructor
                fconstructor = _create_fconstructor(fname)
                fconstructor_from_instance = True
            else:
                raise ValueError(f'Bad function name: {fname}')
            return fconstructor, fconstructor_from_instance
        
        if isinstance(d, Mapping):
            fragment_vals = {}

            for k, v in d.items():

                if k.startswith('@'):
                    # This is an decorator descriptor
                    oname = k[1:]
                    decorator_i = len(decorators)
                    decorators.insert(decorator_i, oname)
                    v = cls.parse_descriptor(v,
                        fragment=fragment,
                        current_func=current_func,
                        decorators=decorators
                    )
                    del decorators[decorator_i]

                    if oname.startswith('log'):
                        # Log space has ended, exit log space
                        # by raising to power
                        base_str = oname[3:]
                        base = int(base_str) if bool(base_str) else math.e
                        v = base ** v
                    
                    if isinstance(v, Curve):
                        # Allow chaining
                        current_func = v
                        continue
                    if oname != 'args':
                        # Only let @args pass through to parent
                        if len(d) != 1:
                            raise ValueError(f'A decorator (@...) can only have siblings in a fragment')
                        return v

                elif k.startswith('$'):
                    # This is function descriptor
                    fname = k[1:]

                    fconstructor = None
                    fconstructor_from_instance = False

                    if fname == 'const' or fname == 'constant':
                        from .constant import Constant
                        fconstructor = Constant
                    elif fname == 'line':
                        from .line import Line
                        fconstructor = Line
                    elif fname.startswith('log'):
                        base_str = fname[3:]
                        base = int(base_str) if bool(base_str) else math.e
                        def _dot_log(*args, **kwargs):
                            return current_func.log(**util.extend({ "base": base }, kwargs))
                        fconstructor = _dot_log
                        fconstructor_from_instance = True
                    else:
                        fconstructor, fconstructor_from_instance = next_func_constructor(fname)

                    func_args = cls.parse_descriptor(v,
                        fragment=True,
                        decorators=decorators
                    )
                    args = []
                    kwargs = {}
                    if isinstance(func_args, dict):
                        kwargs = func_args
                    elif isinstance(func_args, list):
                        args = func_args
                    else:
                        args = [func_args]
                    
                    # Check for nested args
                    if '@args' in kwargs:
                        args = kwargs['@args']
                        del kwargs['@args']

                    if fconstructor_from_instance and current_func is None:
                        current_func = Curve.parse(args[0])
                        del args[0]
                    elif not fconstructor_from_instance and current_func is not None:
                        # Add current function as first argument or to
                        # list at first argument
                        if bool(args) and isinstance(args[0], list):
                            args[0][0:0] = [current_func]
                        else:
                            args[0:0] = [current_func]
                    current_func = fconstructor(*args, **kwargs)
                    continue

                if current_func is not None:
                    raise Exception(f'Unexpected key after a function: {k}')

                if isinstance(v, Mapping):
                    fragment_vals[k] = cls.parse_descriptor(v,
                        fragment=True,
                        decorators=decorators
                    )
                elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
                    fragment_vals[k] = cls.parse_descriptor(v,
                        fragment=True,
                        decorators=decorators
                    )
                else:
                    fragment_vals[k] = v
            
            return current_func or fragment_vals
        elif fragment:
            if isinstance(d, Mapping):
                return {k: cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                ) for k, v in d.items()}
            elif isinstance(d, Sequence) and not isinstance(d, (str, bytes)):
                return [cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                ) for v in d]
            elif 'date' in decorators:
                return arrow.get(d).timestamp
            elif isinstance(d, Number):
                if 'log' in decorators:
                    return math.log(d)
                elif 'log2' in decorators:
                    return math.log(d, 2)
                elif 'log10' in decorators:
                    return math.log(d, 10)
                else:
                    return d
            else:
                return d
        else:
            raise TypeError('Unexpected type found while parsing a function')

    @classmethod
    def parse_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        return list(map(cls.parse, funcs))

    @classmethod
    def count_positional_args(cls, f, default=1):
        if not callable(f):
            raise Exception('Expected callable function')
        if inspect.isbuiltin(f):
            return default
        sig = inspect.signature(f)
        count = 0
        for param in sig.parameters.values():
            if param.kind == inspect.Parameter.POSITIONAL_ONLY or \
                param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
                count += 1
        return count

    def __add__(self, other):
        return Curve.add_many([self, other])

    def __sub__(self, other):
        return Curve.subtract_many([self, other])

    def __mul__(self, other):
        return Curve.multiply_many([self, other])

    def __truediv__(self, other):
        return Curve.divide_many([self, other])

    def __pow__(self, other):
        return Curve.pow_many([self, other])

    def __radd__(self, other):
        return Curve.add_many([other, self])

    def __rsub__(self, other):
        return Curve.subtract_many([other, self])

    def __rmul__(self, other):
        return Curve.multiply_many([other, self])

    def __rtruediv__(self, other):
        return Curve.divide_many([other, self])

    def __rpow__(self, other):
        return Curve.pow_many([other, self])

    def __neg__(self):
        return self.additive_inverse()

    def __pos__(self):
        return self

    def __abs__(self):
        return self.abs()

def _additive_inverse(x, y):
    if y is None:
        return None
    return -y

def _multiplicative_inverse(x, y):
    if y is None:
        return None
    return 1 / y

def _abs(x, y):
    if y is None:
        return None
    return abs(y)

def _callable_arg_len(f, vararg_ret_val):
    args, varargs, _, _ = inspect.getargspec(f)
    if varargs is not None:
        return vararg_ret_val
    arg_len = len(args)
    if arg_len == 0:
        return 0
    if args[0] == 'self':
        arg_len -= 1
    return arg_len

_func_obj = Curve()

Classes

class Curve (min_step=None)
Expand source code
class Curve:

    _token_counter = 0

    @classmethod
    def empty(cls):
        from .empty import Empty
        return Empty()

    @property
    def min_step(self):
        return self._min_step

    @min_step.setter
    def min_step(self, value):
        self._min_step = value

    @property
    def domain(self):
        if self.needs_domain_update:
            self._domain = self.get_domain()
        return self._domain

    @property
    def update_interval(self):
        return self._begin_update_interval

    @property
    def is_updating(self):
        return not self.update_interval.is_empty

    def __init__(self, min_step=None):
        self.name = None
        self._domain = None
        self._observer_data = {}
        self._ordered_observer_tokens = []
        self._begin_update_interval = Interval.empty()
        self._end_update_interval = Interval.empty()
        self.min_step = min_step

    def __call__(self, *args):
        return self.y(args[0])

    def __repr__(self):
        try:
            if bool(self.name):
                return self.name
            return f'{type(self).__name__}("{self.domain}")'
        except Exception as e:
            return super().__repr__() + f'({e})'

    def y(self, x):
        raise Exception("Not implemented")

    def y_start(self):
        return self.y(self.domain.start)

    def y_end(self):
        return self.y(self.domain.end)

    def first_point(self):
        d = self.domain
        if d.is_empty:
            return None
        return (d.start, self.y(d.start))

    def last_point(self):
        d = self.domain
        if d.is_empty:
            return None
        return (d.end, self.y(d.end))

    def d_y(self, x, forward=False, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if forward:
            x1 = self.x_next(x, min_step=min_step, limit=limit)
        else:
            x1 = self.x_previous(x, min_step=min_step, limit=limit)

        if x1 is None:
            return None
        y1 = self.y(x1)
        if y1 is None:
            return None
        y = self.y(x)
        if y is None:
            return None
        if x1 == x:
            dy = math.inf if y1 >= y else -math.inf
            if not forward:
                dy = -dy
        else:
            dy = (y1 - y) / (x1 - x)
        return dy

    def x(self, y):
        raise Exception("Not implemented")

    def x_next(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if math.isinf(min_step):
            x1 = self.domain.end
        else:
            x1 = x + min_step
        if limit is not None and x1 > limit:
            x1 = limit
        if not self.domain.contains(x1, enforce_start=False):
            return None
        return x1

    def x_previous(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if math.isinf(min_step):
            x1 = self.domain.start
        else:
            x1 = x - min_step
        if limit is not None and x1 < limit:
            x1 = limit
        if not self.domain.contains(x1, enforce_end=False):
            return None
        return x1

    def previous_point(self, x, min_step=MIN_STEP):
        x1 = self.x_previous(x, min_step=min_step)
        if x1 is None:
            return None
        y1 = self.y(x1)
        return (x1, y1)

    def next_point(self, x, min_step=MIN_STEP):
        x1 = self.x_next(x, min_step=min_step)
        if x1 is None:
            return None
        y1 = self.y(x1)
        return (x1, y1)

    def get_domain(self):
        return Interval.empty()

    def resolve_min_step(self, min_step):
        if min_step is None and self.min_step is None:
            return None
        elif min_step is None:
            return self.min_step
        elif self.min_step is None:
            return min_step
        else:
            return max(min_step, self.min_step)

    def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
        min_step = self.resolve_min_step(min_step)
        if domain is None:
            domain = self.domain
        else:
            domain = Interval.intersection([self.domain, domain])
        if domain.is_empty:
            return []
        elif not domain.is_finite:
            raise Exception("Cannot sample points on an infinite domain {}. Specify a finite domain.".format(domain))
        x_start, x_end = domain
        x_end_bin = round(x_end / min_step) if min_step is not None else x_end
        if domain.start_open:
            points = []
        else:
            points = [(x_start, self.y(x_start))]
        if step is not None:
            x = x_start + step
            while x <= x_end:
                y = self.y(x)
                points.append((x, y))
                x += step
        elif min_step is not None and min_step > 0:
            x = self.x_next(x_start, min_step=min_step, limit=x_end)
            while x is not None and x <= x_end:
                y = self.y(x)
                points.append((x, y))
                x_bin = round(x / min_step) if min_step is not None else x
                if x_bin == x_end_bin:
                    break
                x1 = self.x_next(x, min_step=min_step, limit=x_end)
                if x1 is not None:
                    x1_bin = round(x1 / min_step) if min_step is not None else x1
                    if x1_bin <= x_bin:
                        raise Exception('Next x value {} should be greater than the previous x value {} by at least the minimum step of {}'.format(x1, x, min_step))
                x = x1

            if not domain.end_open and points[-1][0] != x_end:
                points.append((x_end, self.y(x_end)))
        else:
            raise Exception("Bad functions sample parameters.")

        return points

    def sample_points_from_x(self, x, limit, backward=False, open=False, min_step=None):
        assert limit is not None
        if limit < 0:
            limit = -limit
            backward = not backward
        min_step = self.resolve_min_step(min_step)
        points = []
        x1 = x
        i = 0

        if not open:
            if x is None:
                return points
            y = self.y(x)
            if y is None:
                return points
            i += 1

        while limit is None or i < limit:
            if not backward:
                x1 = self.x_next(x1, min_step=min_step)
            else:
                x1 = self.x_previous(x1, min_step=min_step)
            if x1 is None:
                break
            y1 = self.y(x1)
            if y1 is None:
                break
            points.append((x1, y1))
            i += 1

        return points

    def get_range(self, domain=None, **kwargs):
        points = self.sample_points(domain=domain, **kwargs)
        low = None
        high = None
        for p in points:
            if low is None or p[1] < low:
                low = p[1]
            if high is None or p[1] > high:
                high = p[1]
        if low is None or high is None:
            return Interval.empty()
        return Interval(low, high)

    def minimise(self, x, min_step=MIN_STEP, step=None, max_iterations=1000):
        x_min = x
        x_min_previous = None
        iterations = 0
        while iterations < max_iterations:
            iterations += 1
            y = self.y(x_min)
            if y is None:
                return x_min_previous
            dy0 = self.d_y(x_min, forward=False)
            dy1 = self.d_y(x_min, forward=True)
            forward = True
            if dy0 is None and dy1 is None:
                return x_min
            elif dy0 is None:
                if dy1 <= 0:
                    forward = True
                else:
                    # Sloping into null value
                    return None
            elif dy1 is None:
                if dy0 >= 0:
                    forward = False
                else:
                    # Sloping into null value
                    return None
            else:
                if dy0 * dy1 < 0 and dy0 <= 0 and dy1 >= 0:
                    # Found minimum
                    return x_min

                if dy0 * dy1 < 0:
                    # Found maximum
                    forward = abs(dy0) < abs(dy1)
                else:
                    # On slope
                    forward = dy1 < 0

            x_min_previous = x_min
            if forward:
                if step is not None:
                    x_min += step
                else:
                    x_min = self.x_next(x_min, min_step=min_step)
            else:
                if step is not None:
                    x_min -= step
                else:
                    x_min = self.x_previous(x_min, min_step=min_step)
        return x_min

    def maximise(self, x, min_step=MIN_STEP, step=None, max_iterations=1000):
        x_max = x
        x_max_previous = None
        iterations = 0
        while iterations < max_iterations:
            iterations += 1
            y = self.y(x_max)
            if y is None:
                return x_max_previous
            dy0 = self.d_y(x_max, forward=False)
            dy1 = self.d_y(x_max, forward=True)
            forward = True
            if dy0 is None and dy1 is None:
                return x_max
            elif dy0 is None:
                if dy1 >= 0:
                    forward = True
                else:
                    # Sloping into null value
                    return None
            elif dy1 is None:
                if dy0 <= 0:
                    forward = False
                else:
                    # Sloping into null value
                    return None
            else:
                if dy0 * dy1 < 0 and dy0 >= 0 and dy1 <= 0:
                    # Found maximum
                    return x_max

                if dy0 * dy1 < 0:
                    # Found minimum
                    forward = abs(dy0) < abs(dy1)
                else:
                    # On slope
                    forward = dy1 > 0

            x_max_previous = x_max
            if forward:
                if step is not None:
                    x_max += step
                else:
                    x_max = self.x_next(x_max, min_step=min_step)
            else:
                if step is not None:
                    x_max -= step
                else:
                    x_max = self.x_previous(x_max, min_step=min_step)
        return x_max

    def regression(self, domain=None, min_step=MIN_STEP, step=None):
        points = self.sample_points(domain=domain, min_step=min_step, step=step)
        for p in points:
            if p[1] is None:
                return None
        count = len(points)
        if count < 2:
            return None
            
        from .line import Line
        if count == 2:
            return Line(p1=points[0], p2=points[1])
        xy = np.vstack(points)
        x = xy[:,0]
        y = xy[:,1]
        A = np.array([x, np.ones(count)])

        # Regression
        w = np.linalg.lstsq(A.T, y, rcond=None)[0]
        m = w[0]
        c = w[1]

        return Line(const=c, slope=m)

    def add_observer(self, *obj, domain=None, begin=None, end=None, autoremove=False, prioritize=False):
        if begin is None and end is None:
            return 0
        
        Curve._token_counter += 1
        token = Curve._token_counter
        domain = Interval.parse(domain, default_inf=True)
        obj_ref = None

        if len(obj) != 0:
            if autoremove:
                # Remove observer automatically
                obj_ref = weakref.ref(obj[0], lambda _: self.remove_observer(token))
            else:
                # Calling remove_observer() is required
                obj_ref = weakref.ref(obj[0])
        elif autoremove:
            raise Exception('Autoremoving an observer requires an object')

        # Do the callback functions require the domain?
        begin_with_interval = False
        end_with_interval = False
        if begin:
            begin_with_interval = util.count_positional_args(begin) == 1
        if end:
            end_with_interval = util.count_positional_args(end) == 1

        # TODO: does saving strong references to callbacks create a retain cycle?
        self._observer_data[token] = (obj_ref, domain, begin, end, begin_with_interval, end_with_interval)
        if prioritize:
            self._ordered_observer_tokens.insert(0, token)
        else:
            self._ordered_observer_tokens.append(token)
        return token

    def remove_observer(self, token_or_obj):
        if isinstance(token_or_obj, Number):
            if token_or_obj in self._observer_data:
                del self._observer_data[token_or_obj]
                self._ordered_observer_tokens.remove(token_or_obj)
        else:
            for token in list(self._ordered_observer_tokens):
                obj_ref = self._observer_data[token][0]
                if obj_ref is not None:
                    obj = obj_ref()
                    if obj is None or obj == token_or_obj:
                        del self._observer_data[token]
                        self._ordered_observer_tokens.remove(token)

    def begin_update(self, domain):
        if domain.is_empty or self._begin_update_interval.is_superset_of(domain):
            return
        self._begin_update_interval = Interval.union([self._begin_update_interval, domain])
        for token in self._ordered_observer_tokens:
            _, callback_interval, callback, _, callback_with_interval, _ = self._observer_data[token]
            if callback_interval is None or domain.intersects(callback_interval):
                if callback is not None:
                    if callback_with_interval:
                        callback(domain)
                    else:
                        callback()

    def end_update(self, domain):
        if domain.is_empty or self._end_update_interval.is_superset_of(domain):
            return
        self._end_update_interval = Interval.union([self._end_update_interval, domain])
        if not self._end_update_interval.is_superset_of(self._begin_update_interval):
            # Keep collecting updates
            return
        
        # Updates complete
        update_interval = self._end_update_interval
        self._begin_update_interval = Interval.empty()
        self._end_update_interval = Interval.empty()
        self.set_needs_interval_update()
        for token in list(self._ordered_observer_tokens):
            _, callback_interval, _, callback, _, callback_with_interval = self._observer_data[token]
            if callback_interval is None or update_interval.intersects(callback_interval):
                if callback is not None:
                    if callback_with_interval:
                        callback(update_interval)
                    else:
                        callback()

    @property
    def needs_domain_update(self):
        return self._domain is None

    def set_needs_interval_update(self):
        self._domain = None

    def map(self, tfm, skip_none=False, name=None, **kwargs):
        from .map import Map
        return Map(self, tfm, skip_none=skip_none, name=name, **kwargs)

    def accumulator_map(self, tfm, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
        from .accumulator_map import AccumulatorMap
        return AccumulatorMap(
            self,
            tfm,
            degree,
            is_period=is_period,
            interpolation=interpolation,
            min_step=min_step,
            uniform=uniform
        )

    def offset(self, x, duration=None):
        from .offset import Offset
        return Offset(self, x, duration=duration)

    def add(self, func):
        return Curve.add_many([self, func])

    def subtract(self, func):
        return Curve.subtract_many([self, func])

    def multiply(self, func):
        return Curve.multiply_many([self, func])

    def divide(self, func):
        return Curve.divide_many([self, func])

    def pow(self, power):
        return type(self).pow_many([self, power])

    def raised(self, base):
        return type(self).pow_many([base, self])

    def log(self, base=math.e):
        return type(self).log_many([self, base])

    def integral(self, const=0, interpolation=None, uniform=True):
        from .integral import Integral

        return Integral(self, const=const, interpolation=interpolation, uniform=uniform)

    def additive_inverse(self):
        return self.map(_additive_inverse)

    def multiplicative_inverse(self):
        return self.map(_multiplicative_inverse)

    def abs(self):
        return self.map(_abs)

    def blend(self, func, x_blend_start, x_blend_stop):
        from .aggregate import Aggregate
        from .piecewise import Piecewise
        
        x_blend_period = x_blend_stop - x_blend_start
        def blend_f(x, ys):
            u = (x - x_blend_start) / x_blend_period
            return (1.0 - u) * ys[0] + u * ys[1]
        c = Aggregate([self, func], tfm=blend_f, name='blend')

        funcs = [self, c, func]
        domains = self.domain.partition([x_blend_start, x_blend_stop])
        return Piecewise(funcs, domains)

    def extension(self, name, start=False, end=True, raise_on_empty=False, **kwds):
        from .extension import ConstantExtension
        from .extension import TangentExtension
        from .extension import SinExtension

        classes = [            
            ConstantExtension,
            TangentExtension,
            SinExtension,
        ]

        for c in classes:
            if c.name == name:
                return c(self, start=start, end=end, raise_on_empty=raise_on_empty, **kwds)
        
        raise Exception('Unknown extension type')

    # def wave_extended(self, ref_func, min_deviation=0, start=None, step=None, min_step=MIN_STEP):
    #     if self.domain.is_positive_infinite:
    #         return self
    #     ref_func = Curve.parse(ref_func)
    #     extremas = Extremas(self, ref_func, min_deviation=min_deviation, start=start, step=step, min_step=min_step)

    # def mom(self, degree, duration, **kwargs):
    #     """
    #     Returns the momentum of the reciever.

    #     The degree corresponds to the number of steps to take.
    #     """
    #     degree = int(degree)
    #     if degree < 1:
    #         raise ValueError(f'Momentum requires a positive degree, got: {degree}')
    #     from pyduration import Duration
    #     duration = Duration.parse(duration)
    #     def _mom(x, y):
    #         if y is None:
    #             return None
    #         # step back
    #         x0 = duration.step(x, -degree)
    #         y0 = self.y(x0)
    #         if y0 is None:
    #             return None
    #         return y - y0
    #     return self.map(_mom, name=f'mom({degree})', **kwargs)

    def sma(self, degree, is_period=False, **kwargs):
        from .sma import SMA
        return SMA(self, degree, is_period=is_period, **kwargs)

    def ema(self, degree, is_period=False, init=None, **kwargs):
        from .ema import EMA
        return EMA(self, degree, is_period=is_period, init=init, **kwargs)

    def smma(self, degree, **kwargs):
        from .sma import SMA
        from .ema import EMA
        sma = SMA(self, degree, is_period=False, **kwargs)
        ema = EMA(self, 1 / degree, is_period=False, init=sma, **kwargs)
        return ema

    def harmonic_smas(self, base_degree, count, stride=1, is_period=False, **kwargs):
        """
        Returns `count` SMAs from small to large. Their degrees
        are proportional to the golden ratio.
        """
        periods = []
        smas = []
        step = stride + 1

        for i in range(count):
            period = base_degree * GOLD ** float(i * step)
            period = round(period / base_degree) * base_degree
            periods.append(period)

        for i in range(count):
            period = periods[i]
            sma = self.sma(period, is_period=is_period, **kwargs)
            smas.append(sma)

        return smas

    def centered_macs(self, base_degree, count, stride=1, is_period=False, **kwargs):
        periods = []
        smas = []
        step = stride + 1

        for i in range(count):
            period = base_degree * GOLD ** float(i * step)
            period = round(period / base_degree) * base_degree
            periods.insert(0, period)

        for i in range(count):
            period = periods[i]
            sma = self.sma(period, is_period=is_period, **kwargs)
            smas.append(sma)

        return smas

    def rsi(self, degree, **kwargs):
        d = self.differential()
        du = Curve.max([d, 0], ignore_empty=False)
        dd = Curve.max([-d, 0], ignore_empty=False)
        rs = du.ema(1 / degree, **kwargs) / dd.ema(1 / degree, **kwargs)
        rsi = 100 - 100 / (1 + rs)
        rsi.name = f'rsi({degree})'
        return rsi

    def trailing_min(self, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
        return self.accumulator_map(
            min,
            degree,
            is_period=is_period,
            interpolation=interpolation,
            min_step=min_step,
            uniform=uniform
        )

    def trailing_max(self, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
        return self.accumulator_map(
            max,
            degree,
            is_period=is_period,
            interpolation=interpolation,
            min_step=min_step,
            uniform=uniform
        )

    def differential(self, forward=False):
        from .map import Map
        d = Map(self, lambda x, y: self.d_y(x, forward=forward))
        d.name = 'diff'
        return d

    def subset(self, domain):
        from .generic import Generic
        return Generic(self, domain=domain, min_step=self.min_step)

    @classmethod
    def first(cls, funcs, *args):
        """
        Return a func which returns the first value which is not `None`.
        """
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def first_val(x, vals):
            for v in vals:
                if v is not None:
                    return v
            return None

        funcs = Curve.parse_many(funcs)
        return Aggregate(funcs, tfm=first_val, union=True, name='first')        

    @classmethod
    def min(cls, funcs, *args, ignore_empty=False):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def min_vals(x, vals):
            best = None
            for val in vals:
                if best is None or (val is not None and val < best):
                    best = val
            return best

        def min_vals_with_empty(x, vals):
            return min(filter(lambda y: y is not None, vals), default=None)

        funcs = Curve.parse_many(funcs)
        t = min_vals_with_empty if ignore_empty else min_vals
        return Aggregate(funcs, tfm=t, union=ignore_empty, name='min')

    @classmethod
    def max(cls, funcs, *args, ignore_empty=False):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def max_vals(x, vals):
            best = None
            for val in vals:
                if best is None or (val is not None and val > best):
                    best = val
            return best

        def max_vals_with_empty(x, vals):
            return max(filter(lambda y: y is not None, vals), default=None)

        funcs = Curve.parse_many(funcs)
        t = max_vals_with_empty if ignore_empty else max_vals
        return Aggregate(funcs, tfm=t, union=ignore_empty, name='max')

    @classmethod
    def add_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def add_f(x, ys):
            for y in ys:
                if y is None:
                    return None
            return sum(ys)
        return Aggregate(funcs, tfm=add_f, name='add', operator='+')

    @classmethod
    def subtract_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def sub_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                else:
                    result -= y
            return result
        return Aggregate(funcs, tfm=sub_f, name='sub', operator='-')

    @classmethod
    def multiply_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def mult_f(x, ys):
            geo_sum = 1.0
            for y in ys:
                if y is None:
                    return None
                geo_sum *= y
            return geo_sum
        return Aggregate(funcs, tfm=mult_f, name='mult', operator='*')

    @classmethod
    def divide_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def div_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                elif y == 0:
                    result = math.inf if result >= 0 else -math.inf
                else:
                    result /= y
            return result
        return Aggregate(funcs, tfm=div_f, name='div', operator='/')

    @classmethod
    def pow_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def log_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                else:
                    result = result ** y
            return result
        return Aggregate(funcs, tfm=log_f, name='pow', operator='^')

    @classmethod
    def log_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        from .aggregate import Aggregate

        def log_f(x, ys):
            result = 0
            for i, y in enumerate(ys):
                if y is None:
                    return None
                if i == 0:
                    result = y
                else:
                    result = math.log(result, y)
            return result
        return Aggregate(funcs, tfm=log_f, name='log')

    @classmethod
    def zero(cls, value):
        from .constant import Constant
        return Constant.zero()

    @classmethod
    def const(cls, value):
        from .constant import Constant
        return Constant(value)

    @classmethod
    def parse(cls, func):
        from .generic import Generic
        from .constant import Constant
        from .points import Points

        if func is None:
            return None
        elif isinstance(func, Curve):
            return func
        elif callable(func):
            return Generic(func)
        elif isinstance(func, Number):
            return Constant(func)
        elif isinstance(func, Sequence):
            # Parse points
            if len(func) == 0:
                return Points(func)
            else:
                if isinstance(func[0], Sequence):
                    if len(func[0]) == 2:
                        return Points(func)
        elif isinstance(func, Mapping):
            return cls.parse_descriptor(func)
        raise Exception('Unable to parse function')

    @classmethod
    def parse_descriptor(cls, d, fragment=False, current_func=None, decorators=None):
        # Example:
        # {
        #     "$line": {
        #         "points": [
        #             ["2020-02-12 01:23+1200", 8765.56],
        #             ["2020-02-30 04:50+1200", 6765.56]
        #         ]
        #     }
        # }
        if decorators is None:
            decorators = []

        def next_func_constructor(fname):
            f = current_func or _func_obj
            assert isinstance(f, Curve)
            ftype = type(f)
            fconstructor = None
            fconstructor_from_instance = False
            type_method_names = list(map(lambda x: x[0], inspect.getmembers(ftype, predicate=inspect.ismethod)))
            f_method_names = list(map(lambda x: x[0], inspect.getmembers(f, predicate=inspect.ismethod)))
            if f'{fname}_many' in type_method_names:
                fname = f'{fname}_many'
            if fname in type_method_names:
                def _create_class_fconstructor(fname):
                    def _class_fconstructor(*args, **kwargs):
                        f = current_func or _func_obj
                        fmethod = getattr(type(f), fname)
                        return fmethod(*args, **kwargs)
                    return _class_fconstructor
                fconstructor = _create_class_fconstructor(fname)
                fconstructor_from_instance = False
            elif fname in f_method_names:
                def _create_fconstructor(fname):
                    def _fconstructor(*args, **kwargs):
                        f = current_func or _func_obj
                        fmethod = getattr(f, fname)
                        return fmethod(*args, **kwargs)
                    return _fconstructor
                fconstructor = _create_fconstructor(fname)
                fconstructor_from_instance = True
            else:
                raise ValueError(f'Bad function name: {fname}')
            return fconstructor, fconstructor_from_instance
        
        if isinstance(d, Mapping):
            fragment_vals = {}

            for k, v in d.items():

                if k.startswith('@'):
                    # This is an decorator descriptor
                    oname = k[1:]
                    decorator_i = len(decorators)
                    decorators.insert(decorator_i, oname)
                    v = cls.parse_descriptor(v,
                        fragment=fragment,
                        current_func=current_func,
                        decorators=decorators
                    )
                    del decorators[decorator_i]

                    if oname.startswith('log'):
                        # Log space has ended, exit log space
                        # by raising to power
                        base_str = oname[3:]
                        base = int(base_str) if bool(base_str) else math.e
                        v = base ** v
                    
                    if isinstance(v, Curve):
                        # Allow chaining
                        current_func = v
                        continue
                    if oname != 'args':
                        # Only let @args pass through to parent
                        if len(d) != 1:
                            raise ValueError(f'A decorator (@...) can only have siblings in a fragment')
                        return v

                elif k.startswith('$'):
                    # This is function descriptor
                    fname = k[1:]

                    fconstructor = None
                    fconstructor_from_instance = False

                    if fname == 'const' or fname == 'constant':
                        from .constant import Constant
                        fconstructor = Constant
                    elif fname == 'line':
                        from .line import Line
                        fconstructor = Line
                    elif fname.startswith('log'):
                        base_str = fname[3:]
                        base = int(base_str) if bool(base_str) else math.e
                        def _dot_log(*args, **kwargs):
                            return current_func.log(**util.extend({ "base": base }, kwargs))
                        fconstructor = _dot_log
                        fconstructor_from_instance = True
                    else:
                        fconstructor, fconstructor_from_instance = next_func_constructor(fname)

                    func_args = cls.parse_descriptor(v,
                        fragment=True,
                        decorators=decorators
                    )
                    args = []
                    kwargs = {}
                    if isinstance(func_args, dict):
                        kwargs = func_args
                    elif isinstance(func_args, list):
                        args = func_args
                    else:
                        args = [func_args]
                    
                    # Check for nested args
                    if '@args' in kwargs:
                        args = kwargs['@args']
                        del kwargs['@args']

                    if fconstructor_from_instance and current_func is None:
                        current_func = Curve.parse(args[0])
                        del args[0]
                    elif not fconstructor_from_instance and current_func is not None:
                        # Add current function as first argument or to
                        # list at first argument
                        if bool(args) and isinstance(args[0], list):
                            args[0][0:0] = [current_func]
                        else:
                            args[0:0] = [current_func]
                    current_func = fconstructor(*args, **kwargs)
                    continue

                if current_func is not None:
                    raise Exception(f'Unexpected key after a function: {k}')

                if isinstance(v, Mapping):
                    fragment_vals[k] = cls.parse_descriptor(v,
                        fragment=True,
                        decorators=decorators
                    )
                elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
                    fragment_vals[k] = cls.parse_descriptor(v,
                        fragment=True,
                        decorators=decorators
                    )
                else:
                    fragment_vals[k] = v
            
            return current_func or fragment_vals
        elif fragment:
            if isinstance(d, Mapping):
                return {k: cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                ) for k, v in d.items()}
            elif isinstance(d, Sequence) and not isinstance(d, (str, bytes)):
                return [cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                ) for v in d]
            elif 'date' in decorators:
                return arrow.get(d).timestamp
            elif isinstance(d, Number):
                if 'log' in decorators:
                    return math.log(d)
                elif 'log2' in decorators:
                    return math.log(d, 2)
                elif 'log10' in decorators:
                    return math.log(d, 10)
                else:
                    return d
            else:
                return d
        else:
            raise TypeError('Unexpected type found while parsing a function')

    @classmethod
    def parse_many(cls, funcs, *args):
        if not isinstance(funcs, Sequence):
            funcs = [funcs] + list(args)
        return list(map(cls.parse, funcs))

    @classmethod
    def count_positional_args(cls, f, default=1):
        if not callable(f):
            raise Exception('Expected callable function')
        if inspect.isbuiltin(f):
            return default
        sig = inspect.signature(f)
        count = 0
        for param in sig.parameters.values():
            if param.kind == inspect.Parameter.POSITIONAL_ONLY or \
                param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
                count += 1
        return count

    def __add__(self, other):
        return Curve.add_many([self, other])

    def __sub__(self, other):
        return Curve.subtract_many([self, other])

    def __mul__(self, other):
        return Curve.multiply_many([self, other])

    def __truediv__(self, other):
        return Curve.divide_many([self, other])

    def __pow__(self, other):
        return Curve.pow_many([self, other])

    def __radd__(self, other):
        return Curve.add_many([other, self])

    def __rsub__(self, other):
        return Curve.subtract_many([other, self])

    def __rmul__(self, other):
        return Curve.multiply_many([other, self])

    def __rtruediv__(self, other):
        return Curve.divide_many([other, self])

    def __rpow__(self, other):
        return Curve.pow_many([other, self])

    def __neg__(self):
        return self.additive_inverse()

    def __pos__(self):
        return self

    def __abs__(self):
        return self.abs()

Subclasses

Static methods

def add_many(funcs, *args)
Expand source code
@classmethod
def add_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def add_f(x, ys):
        for y in ys:
            if y is None:
                return None
        return sum(ys)
    return Aggregate(funcs, tfm=add_f, name='add', operator='+')
def const(value)
Expand source code
@classmethod
def const(cls, value):
    from .constant import Constant
    return Constant(value)
def count_positional_args(f, default=1)
Expand source code
@classmethod
def count_positional_args(cls, f, default=1):
    if not callable(f):
        raise Exception('Expected callable function')
    if inspect.isbuiltin(f):
        return default
    sig = inspect.signature(f)
    count = 0
    for param in sig.parameters.values():
        if param.kind == inspect.Parameter.POSITIONAL_ONLY or \
            param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD:
            count += 1
    return count
def divide_many(funcs, *args)
Expand source code
@classmethod
def divide_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def div_f(x, ys):
        result = 0
        for i, y in enumerate(ys):
            if y is None:
                return None
            if i == 0:
                result = y
            elif y == 0:
                result = math.inf if result >= 0 else -math.inf
            else:
                result /= y
        return result
    return Aggregate(funcs, tfm=div_f, name='div', operator='/')
def empty()
Expand source code
@classmethod
def empty(cls):
    from .empty import Empty
    return Empty()
def first(funcs, *args)

Return a func which returns the first value which is not None.

Expand source code
@classmethod
def first(cls, funcs, *args):
    """
    Return a func which returns the first value which is not `None`.
    """
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def first_val(x, vals):
        for v in vals:
            if v is not None:
                return v
        return None

    funcs = Curve.parse_many(funcs)
    return Aggregate(funcs, tfm=first_val, union=True, name='first')        
def log_many(funcs, *args)
Expand source code
@classmethod
def log_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def log_f(x, ys):
        result = 0
        for i, y in enumerate(ys):
            if y is None:
                return None
            if i == 0:
                result = y
            else:
                result = math.log(result, y)
        return result
    return Aggregate(funcs, tfm=log_f, name='log')
def max(funcs, *args, ignore_empty=False)
Expand source code
@classmethod
def max(cls, funcs, *args, ignore_empty=False):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def max_vals(x, vals):
        best = None
        for val in vals:
            if best is None or (val is not None and val > best):
                best = val
        return best

    def max_vals_with_empty(x, vals):
        return max(filter(lambda y: y is not None, vals), default=None)

    funcs = Curve.parse_many(funcs)
    t = max_vals_with_empty if ignore_empty else max_vals
    return Aggregate(funcs, tfm=t, union=ignore_empty, name='max')
def min(funcs, *args, ignore_empty=False)
Expand source code
@classmethod
def min(cls, funcs, *args, ignore_empty=False):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def min_vals(x, vals):
        best = None
        for val in vals:
            if best is None or (val is not None and val < best):
                best = val
        return best

    def min_vals_with_empty(x, vals):
        return min(filter(lambda y: y is not None, vals), default=None)

    funcs = Curve.parse_many(funcs)
    t = min_vals_with_empty if ignore_empty else min_vals
    return Aggregate(funcs, tfm=t, union=ignore_empty, name='min')
def multiply_many(funcs, *args)
Expand source code
@classmethod
def multiply_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def mult_f(x, ys):
        geo_sum = 1.0
        for y in ys:
            if y is None:
                return None
            geo_sum *= y
        return geo_sum
    return Aggregate(funcs, tfm=mult_f, name='mult', operator='*')
def parse(func)
Expand source code
@classmethod
def parse(cls, func):
    from .generic import Generic
    from .constant import Constant
    from .points import Points

    if func is None:
        return None
    elif isinstance(func, Curve):
        return func
    elif callable(func):
        return Generic(func)
    elif isinstance(func, Number):
        return Constant(func)
    elif isinstance(func, Sequence):
        # Parse points
        if len(func) == 0:
            return Points(func)
        else:
            if isinstance(func[0], Sequence):
                if len(func[0]) == 2:
                    return Points(func)
    elif isinstance(func, Mapping):
        return cls.parse_descriptor(func)
    raise Exception('Unable to parse function')
def parse_descriptor(d, fragment=False, current_func=None, decorators=None)
Expand source code
@classmethod
def parse_descriptor(cls, d, fragment=False, current_func=None, decorators=None):
    # Example:
    # {
    #     "$line": {
    #         "points": [
    #             ["2020-02-12 01:23+1200", 8765.56],
    #             ["2020-02-30 04:50+1200", 6765.56]
    #         ]
    #     }
    # }
    if decorators is None:
        decorators = []

    def next_func_constructor(fname):
        f = current_func or _func_obj
        assert isinstance(f, Curve)
        ftype = type(f)
        fconstructor = None
        fconstructor_from_instance = False
        type_method_names = list(map(lambda x: x[0], inspect.getmembers(ftype, predicate=inspect.ismethod)))
        f_method_names = list(map(lambda x: x[0], inspect.getmembers(f, predicate=inspect.ismethod)))
        if f'{fname}_many' in type_method_names:
            fname = f'{fname}_many'
        if fname in type_method_names:
            def _create_class_fconstructor(fname):
                def _class_fconstructor(*args, **kwargs):
                    f = current_func or _func_obj
                    fmethod = getattr(type(f), fname)
                    return fmethod(*args, **kwargs)
                return _class_fconstructor
            fconstructor = _create_class_fconstructor(fname)
            fconstructor_from_instance = False
        elif fname in f_method_names:
            def _create_fconstructor(fname):
                def _fconstructor(*args, **kwargs):
                    f = current_func or _func_obj
                    fmethod = getattr(f, fname)
                    return fmethod(*args, **kwargs)
                return _fconstructor
            fconstructor = _create_fconstructor(fname)
            fconstructor_from_instance = True
        else:
            raise ValueError(f'Bad function name: {fname}')
        return fconstructor, fconstructor_from_instance
    
    if isinstance(d, Mapping):
        fragment_vals = {}

        for k, v in d.items():

            if k.startswith('@'):
                # This is an decorator descriptor
                oname = k[1:]
                decorator_i = len(decorators)
                decorators.insert(decorator_i, oname)
                v = cls.parse_descriptor(v,
                    fragment=fragment,
                    current_func=current_func,
                    decorators=decorators
                )
                del decorators[decorator_i]

                if oname.startswith('log'):
                    # Log space has ended, exit log space
                    # by raising to power
                    base_str = oname[3:]
                    base = int(base_str) if bool(base_str) else math.e
                    v = base ** v
                
                if isinstance(v, Curve):
                    # Allow chaining
                    current_func = v
                    continue
                if oname != 'args':
                    # Only let @args pass through to parent
                    if len(d) != 1:
                        raise ValueError(f'A decorator (@...) can only have siblings in a fragment')
                    return v

            elif k.startswith('$'):
                # This is function descriptor
                fname = k[1:]

                fconstructor = None
                fconstructor_from_instance = False

                if fname == 'const' or fname == 'constant':
                    from .constant import Constant
                    fconstructor = Constant
                elif fname == 'line':
                    from .line import Line
                    fconstructor = Line
                elif fname.startswith('log'):
                    base_str = fname[3:]
                    base = int(base_str) if bool(base_str) else math.e
                    def _dot_log(*args, **kwargs):
                        return current_func.log(**util.extend({ "base": base }, kwargs))
                    fconstructor = _dot_log
                    fconstructor_from_instance = True
                else:
                    fconstructor, fconstructor_from_instance = next_func_constructor(fname)

                func_args = cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                )
                args = []
                kwargs = {}
                if isinstance(func_args, dict):
                    kwargs = func_args
                elif isinstance(func_args, list):
                    args = func_args
                else:
                    args = [func_args]
                
                # Check for nested args
                if '@args' in kwargs:
                    args = kwargs['@args']
                    del kwargs['@args']

                if fconstructor_from_instance and current_func is None:
                    current_func = Curve.parse(args[0])
                    del args[0]
                elif not fconstructor_from_instance and current_func is not None:
                    # Add current function as first argument or to
                    # list at first argument
                    if bool(args) and isinstance(args[0], list):
                        args[0][0:0] = [current_func]
                    else:
                        args[0:0] = [current_func]
                current_func = fconstructor(*args, **kwargs)
                continue

            if current_func is not None:
                raise Exception(f'Unexpected key after a function: {k}')

            if isinstance(v, Mapping):
                fragment_vals[k] = cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                )
            elif isinstance(v, Sequence) and not isinstance(v, (str, bytes)):
                fragment_vals[k] = cls.parse_descriptor(v,
                    fragment=True,
                    decorators=decorators
                )
            else:
                fragment_vals[k] = v
        
        return current_func or fragment_vals
    elif fragment:
        if isinstance(d, Mapping):
            return {k: cls.parse_descriptor(v,
                fragment=True,
                decorators=decorators
            ) for k, v in d.items()}
        elif isinstance(d, Sequence) and not isinstance(d, (str, bytes)):
            return [cls.parse_descriptor(v,
                fragment=True,
                decorators=decorators
            ) for v in d]
        elif 'date' in decorators:
            return arrow.get(d).timestamp
        elif isinstance(d, Number):
            if 'log' in decorators:
                return math.log(d)
            elif 'log2' in decorators:
                return math.log(d, 2)
            elif 'log10' in decorators:
                return math.log(d, 10)
            else:
                return d
        else:
            return d
    else:
        raise TypeError('Unexpected type found while parsing a function')
def parse_many(funcs, *args)
Expand source code
@classmethod
def parse_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    return list(map(cls.parse, funcs))
def pow_many(funcs, *args)
Expand source code
@classmethod
def pow_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def log_f(x, ys):
        result = 0
        for i, y in enumerate(ys):
            if y is None:
                return None
            if i == 0:
                result = y
            else:
                result = result ** y
        return result
    return Aggregate(funcs, tfm=log_f, name='pow', operator='^')
def subtract_many(funcs, *args)
Expand source code
@classmethod
def subtract_many(cls, funcs, *args):
    if not isinstance(funcs, Sequence):
        funcs = [funcs] + list(args)
    from .aggregate import Aggregate

    def sub_f(x, ys):
        result = 0
        for i, y in enumerate(ys):
            if y is None:
                return None
            if i == 0:
                result = y
            else:
                result -= y
        return result
    return Aggregate(funcs, tfm=sub_f, name='sub', operator='-')
def zero(value)
Expand source code
@classmethod
def zero(cls, value):
    from .constant import Constant
    return Constant.zero()

Instance variables

var domain
Expand source code
@property
def domain(self):
    if self.needs_domain_update:
        self._domain = self.get_domain()
    return self._domain
var is_updating
Expand source code
@property
def is_updating(self):
    return not self.update_interval.is_empty
var min_step
Expand source code
@property
def min_step(self):
    return self._min_step
var needs_domain_update
Expand source code
@property
def needs_domain_update(self):
    return self._domain is None
var update_interval
Expand source code
@property
def update_interval(self):
    return self._begin_update_interval

Methods

def abs(self)
Expand source code
def abs(self):
    return self.map(_abs)
def accumulator_map(self, tfm, degree, is_period=False, interpolation=None, min_step=1e-05, uniform=True)
Expand source code
def accumulator_map(self, tfm, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
    from .accumulator_map import AccumulatorMap
    return AccumulatorMap(
        self,
        tfm,
        degree,
        is_period=is_period,
        interpolation=interpolation,
        min_step=min_step,
        uniform=uniform
    )
def add(self, func)
Expand source code
def add(self, func):
    return Curve.add_many([self, func])
def add_observer(self, *obj, domain=None, begin=None, end=None, autoremove=False, prioritize=False)
Expand source code
def add_observer(self, *obj, domain=None, begin=None, end=None, autoremove=False, prioritize=False):
    if begin is None and end is None:
        return 0
    
    Curve._token_counter += 1
    token = Curve._token_counter
    domain = Interval.parse(domain, default_inf=True)
    obj_ref = None

    if len(obj) != 0:
        if autoremove:
            # Remove observer automatically
            obj_ref = weakref.ref(obj[0], lambda _: self.remove_observer(token))
        else:
            # Calling remove_observer() is required
            obj_ref = weakref.ref(obj[0])
    elif autoremove:
        raise Exception('Autoremoving an observer requires an object')

    # Do the callback functions require the domain?
    begin_with_interval = False
    end_with_interval = False
    if begin:
        begin_with_interval = util.count_positional_args(begin) == 1
    if end:
        end_with_interval = util.count_positional_args(end) == 1

    # TODO: does saving strong references to callbacks create a retain cycle?
    self._observer_data[token] = (obj_ref, domain, begin, end, begin_with_interval, end_with_interval)
    if prioritize:
        self._ordered_observer_tokens.insert(0, token)
    else:
        self._ordered_observer_tokens.append(token)
    return token
def additive_inverse(self)
Expand source code
def additive_inverse(self):
    return self.map(_additive_inverse)
def begin_update(self, domain)
Expand source code
def begin_update(self, domain):
    if domain.is_empty or self._begin_update_interval.is_superset_of(domain):
        return
    self._begin_update_interval = Interval.union([self._begin_update_interval, domain])
    for token in self._ordered_observer_tokens:
        _, callback_interval, callback, _, callback_with_interval, _ = self._observer_data[token]
        if callback_interval is None or domain.intersects(callback_interval):
            if callback is not None:
                if callback_with_interval:
                    callback(domain)
                else:
                    callback()
def blend(self, func, x_blend_start, x_blend_stop)
Expand source code
def blend(self, func, x_blend_start, x_blend_stop):
    from .aggregate import Aggregate
    from .piecewise import Piecewise
    
    x_blend_period = x_blend_stop - x_blend_start
    def blend_f(x, ys):
        u = (x - x_blend_start) / x_blend_period
        return (1.0 - u) * ys[0] + u * ys[1]
    c = Aggregate([self, func], tfm=blend_f, name='blend')

    funcs = [self, c, func]
    domains = self.domain.partition([x_blend_start, x_blend_stop])
    return Piecewise(funcs, domains)
def centered_macs(self, base_degree, count, stride=1, is_period=False, **kwargs)
Expand source code
def centered_macs(self, base_degree, count, stride=1, is_period=False, **kwargs):
    periods = []
    smas = []
    step = stride + 1

    for i in range(count):
        period = base_degree * GOLD ** float(i * step)
        period = round(period / base_degree) * base_degree
        periods.insert(0, period)

    for i in range(count):
        period = periods[i]
        sma = self.sma(period, is_period=is_period, **kwargs)
        smas.append(sma)

    return smas
def d_y(self, x, forward=False, min_step=1e-05, limit=None)
Expand source code
def d_y(self, x, forward=False, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    if forward:
        x1 = self.x_next(x, min_step=min_step, limit=limit)
    else:
        x1 = self.x_previous(x, min_step=min_step, limit=limit)

    if x1 is None:
        return None
    y1 = self.y(x1)
    if y1 is None:
        return None
    y = self.y(x)
    if y is None:
        return None
    if x1 == x:
        dy = math.inf if y1 >= y else -math.inf
        if not forward:
            dy = -dy
    else:
        dy = (y1 - y) / (x1 - x)
    return dy
def differential(self, forward=False)
Expand source code
def differential(self, forward=False):
    from .map import Map
    d = Map(self, lambda x, y: self.d_y(x, forward=forward))
    d.name = 'diff'
    return d
def divide(self, func)
Expand source code
def divide(self, func):
    return Curve.divide_many([self, func])
def ema(self, degree, is_period=False, init=None, **kwargs)
Expand source code
def ema(self, degree, is_period=False, init=None, **kwargs):
    from .ema import EMA
    return EMA(self, degree, is_period=is_period, init=init, **kwargs)
def end_update(self, domain)
Expand source code
def end_update(self, domain):
    if domain.is_empty or self._end_update_interval.is_superset_of(domain):
        return
    self._end_update_interval = Interval.union([self._end_update_interval, domain])
    if not self._end_update_interval.is_superset_of(self._begin_update_interval):
        # Keep collecting updates
        return
    
    # Updates complete
    update_interval = self._end_update_interval
    self._begin_update_interval = Interval.empty()
    self._end_update_interval = Interval.empty()
    self.set_needs_interval_update()
    for token in list(self._ordered_observer_tokens):
        _, callback_interval, _, callback, _, callback_with_interval = self._observer_data[token]
        if callback_interval is None or update_interval.intersects(callback_interval):
            if callback is not None:
                if callback_with_interval:
                    callback(update_interval)
                else:
                    callback()
def extension(self, name, start=False, end=True, raise_on_empty=False, **kwds)
Expand source code
def extension(self, name, start=False, end=True, raise_on_empty=False, **kwds):
    from .extension import ConstantExtension
    from .extension import TangentExtension
    from .extension import SinExtension

    classes = [            
        ConstantExtension,
        TangentExtension,
        SinExtension,
    ]

    for c in classes:
        if c.name == name:
            return c(self, start=start, end=end, raise_on_empty=raise_on_empty, **kwds)
    
    raise Exception('Unknown extension type')
def first_point(self)
Expand source code
def first_point(self):
    d = self.domain
    if d.is_empty:
        return None
    return (d.start, self.y(d.start))
def get_domain(self)
Expand source code
def get_domain(self):
    return Interval.empty()
def get_range(self, domain=None, **kwargs)
Expand source code
def get_range(self, domain=None, **kwargs):
    points = self.sample_points(domain=domain, **kwargs)
    low = None
    high = None
    for p in points:
        if low is None or p[1] < low:
            low = p[1]
        if high is None or p[1] > high:
            high = p[1]
    if low is None or high is None:
        return Interval.empty()
    return Interval(low, high)
def harmonic_smas(self, base_degree, count, stride=1, is_period=False, **kwargs)

Returns count SMAs from small to large. Their degrees are proportional to the golden ratio.

Expand source code
def harmonic_smas(self, base_degree, count, stride=1, is_period=False, **kwargs):
    """
    Returns `count` SMAs from small to large. Their degrees
    are proportional to the golden ratio.
    """
    periods = []
    smas = []
    step = stride + 1

    for i in range(count):
        period = base_degree * GOLD ** float(i * step)
        period = round(period / base_degree) * base_degree
        periods.append(period)

    for i in range(count):
        period = periods[i]
        sma = self.sma(period, is_period=is_period, **kwargs)
        smas.append(sma)

    return smas
def integral(self, const=0, interpolation=None, uniform=True)
Expand source code
def integral(self, const=0, interpolation=None, uniform=True):
    from .integral import Integral

    return Integral(self, const=const, interpolation=interpolation, uniform=uniform)
def last_point(self)
Expand source code
def last_point(self):
    d = self.domain
    if d.is_empty:
        return None
    return (d.end, self.y(d.end))
def log(self, base=2.718281828459045)
Expand source code
def log(self, base=math.e):
    return type(self).log_many([self, base])
def map(self, tfm, skip_none=False, name=None, **kwargs)
Expand source code
def map(self, tfm, skip_none=False, name=None, **kwargs):
    from .map import Map
    return Map(self, tfm, skip_none=skip_none, name=name, **kwargs)
def maximise(self, x, min_step=1e-05, step=None, max_iterations=1000)
Expand source code
def maximise(self, x, min_step=MIN_STEP, step=None, max_iterations=1000):
    x_max = x
    x_max_previous = None
    iterations = 0
    while iterations < max_iterations:
        iterations += 1
        y = self.y(x_max)
        if y is None:
            return x_max_previous
        dy0 = self.d_y(x_max, forward=False)
        dy1 = self.d_y(x_max, forward=True)
        forward = True
        if dy0 is None and dy1 is None:
            return x_max
        elif dy0 is None:
            if dy1 >= 0:
                forward = True
            else:
                # Sloping into null value
                return None
        elif dy1 is None:
            if dy0 <= 0:
                forward = False
            else:
                # Sloping into null value
                return None
        else:
            if dy0 * dy1 < 0 and dy0 >= 0 and dy1 <= 0:
                # Found maximum
                return x_max

            if dy0 * dy1 < 0:
                # Found minimum
                forward = abs(dy0) < abs(dy1)
            else:
                # On slope
                forward = dy1 > 0

        x_max_previous = x_max
        if forward:
            if step is not None:
                x_max += step
            else:
                x_max = self.x_next(x_max, min_step=min_step)
        else:
            if step is not None:
                x_max -= step
            else:
                x_max = self.x_previous(x_max, min_step=min_step)
    return x_max
def minimise(self, x, min_step=1e-05, step=None, max_iterations=1000)
Expand source code
def minimise(self, x, min_step=MIN_STEP, step=None, max_iterations=1000):
    x_min = x
    x_min_previous = None
    iterations = 0
    while iterations < max_iterations:
        iterations += 1
        y = self.y(x_min)
        if y is None:
            return x_min_previous
        dy0 = self.d_y(x_min, forward=False)
        dy1 = self.d_y(x_min, forward=True)
        forward = True
        if dy0 is None and dy1 is None:
            return x_min
        elif dy0 is None:
            if dy1 <= 0:
                forward = True
            else:
                # Sloping into null value
                return None
        elif dy1 is None:
            if dy0 >= 0:
                forward = False
            else:
                # Sloping into null value
                return None
        else:
            if dy0 * dy1 < 0 and dy0 <= 0 and dy1 >= 0:
                # Found minimum
                return x_min

            if dy0 * dy1 < 0:
                # Found maximum
                forward = abs(dy0) < abs(dy1)
            else:
                # On slope
                forward = dy1 < 0

        x_min_previous = x_min
        if forward:
            if step is not None:
                x_min += step
            else:
                x_min = self.x_next(x_min, min_step=min_step)
        else:
            if step is not None:
                x_min -= step
            else:
                x_min = self.x_previous(x_min, min_step=min_step)
    return x_min
def multiplicative_inverse(self)
Expand source code
def multiplicative_inverse(self):
    return self.map(_multiplicative_inverse)
def multiply(self, func)
Expand source code
def multiply(self, func):
    return Curve.multiply_many([self, func])
def next_point(self, x, min_step=1e-05)
Expand source code
def next_point(self, x, min_step=MIN_STEP):
    x1 = self.x_next(x, min_step=min_step)
    if x1 is None:
        return None
    y1 = self.y(x1)
    return (x1, y1)
def offset(self, x, duration=None)
Expand source code
def offset(self, x, duration=None):
    from .offset import Offset
    return Offset(self, x, duration=duration)
def pow(self, power)
Expand source code
def pow(self, power):
    return type(self).pow_many([self, power])
def previous_point(self, x, min_step=1e-05)
Expand source code
def previous_point(self, x, min_step=MIN_STEP):
    x1 = self.x_previous(x, min_step=min_step)
    if x1 is None:
        return None
    y1 = self.y(x1)
    return (x1, y1)
def raised(self, base)
Expand source code
def raised(self, base):
    return type(self).pow_many([base, self])
def regression(self, domain=None, min_step=1e-05, step=None)
Expand source code
def regression(self, domain=None, min_step=MIN_STEP, step=None):
    points = self.sample_points(domain=domain, min_step=min_step, step=step)
    for p in points:
        if p[1] is None:
            return None
    count = len(points)
    if count < 2:
        return None
        
    from .line import Line
    if count == 2:
        return Line(p1=points[0], p2=points[1])
    xy = np.vstack(points)
    x = xy[:,0]
    y = xy[:,1]
    A = np.array([x, np.ones(count)])

    # Regression
    w = np.linalg.lstsq(A.T, y, rcond=None)[0]
    m = w[0]
    c = w[1]

    return Line(const=c, slope=m)
def remove_observer(self, token_or_obj)
Expand source code
def remove_observer(self, token_or_obj):
    if isinstance(token_or_obj, Number):
        if token_or_obj in self._observer_data:
            del self._observer_data[token_or_obj]
            self._ordered_observer_tokens.remove(token_or_obj)
    else:
        for token in list(self._ordered_observer_tokens):
            obj_ref = self._observer_data[token][0]
            if obj_ref is not None:
                obj = obj_ref()
                if obj is None or obj == token_or_obj:
                    del self._observer_data[token]
                    self._ordered_observer_tokens.remove(token)
def resolve_min_step(self, min_step)
Expand source code
def resolve_min_step(self, min_step):
    if min_step is None and self.min_step is None:
        return None
    elif min_step is None:
        return self.min_step
    elif self.min_step is None:
        return min_step
    else:
        return max(min_step, self.min_step)
def rsi(self, degree, **kwargs)
Expand source code
def rsi(self, degree, **kwargs):
    d = self.differential()
    du = Curve.max([d, 0], ignore_empty=False)
    dd = Curve.max([-d, 0], ignore_empty=False)
    rs = du.ema(1 / degree, **kwargs) / dd.ema(1 / degree, **kwargs)
    rsi = 100 - 100 / (1 + rs)
    rsi.name = f'rsi({degree})'
    return rsi
def sample_points(self, domain=None, min_step=1e-05, step=None)
Expand source code
def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
    min_step = self.resolve_min_step(min_step)
    if domain is None:
        domain = self.domain
    else:
        domain = Interval.intersection([self.domain, domain])
    if domain.is_empty:
        return []
    elif not domain.is_finite:
        raise Exception("Cannot sample points on an infinite domain {}. Specify a finite domain.".format(domain))
    x_start, x_end = domain
    x_end_bin = round(x_end / min_step) if min_step is not None else x_end
    if domain.start_open:
        points = []
    else:
        points = [(x_start, self.y(x_start))]
    if step is not None:
        x = x_start + step
        while x <= x_end:
            y = self.y(x)
            points.append((x, y))
            x += step
    elif min_step is not None and min_step > 0:
        x = self.x_next(x_start, min_step=min_step, limit=x_end)
        while x is not None and x <= x_end:
            y = self.y(x)
            points.append((x, y))
            x_bin = round(x / min_step) if min_step is not None else x
            if x_bin == x_end_bin:
                break
            x1 = self.x_next(x, min_step=min_step, limit=x_end)
            if x1 is not None:
                x1_bin = round(x1 / min_step) if min_step is not None else x1
                if x1_bin <= x_bin:
                    raise Exception('Next x value {} should be greater than the previous x value {} by at least the minimum step of {}'.format(x1, x, min_step))
            x = x1

        if not domain.end_open and points[-1][0] != x_end:
            points.append((x_end, self.y(x_end)))
    else:
        raise Exception("Bad functions sample parameters.")

    return points
def sample_points_from_x(self, x, limit, backward=False, open=False, min_step=None)
Expand source code
def sample_points_from_x(self, x, limit, backward=False, open=False, min_step=None):
    assert limit is not None
    if limit < 0:
        limit = -limit
        backward = not backward
    min_step = self.resolve_min_step(min_step)
    points = []
    x1 = x
    i = 0

    if not open:
        if x is None:
            return points
        y = self.y(x)
        if y is None:
            return points
        i += 1

    while limit is None or i < limit:
        if not backward:
            x1 = self.x_next(x1, min_step=min_step)
        else:
            x1 = self.x_previous(x1, min_step=min_step)
        if x1 is None:
            break
        y1 = self.y(x1)
        if y1 is None:
            break
        points.append((x1, y1))
        i += 1

    return points
def set_needs_interval_update(self)
Expand source code
def set_needs_interval_update(self):
    self._domain = None
def sma(self, degree, is_period=False, **kwargs)
Expand source code
def sma(self, degree, is_period=False, **kwargs):
    from .sma import SMA
    return SMA(self, degree, is_period=is_period, **kwargs)
def smma(self, degree, **kwargs)
Expand source code
def smma(self, degree, **kwargs):
    from .sma import SMA
    from .ema import EMA
    sma = SMA(self, degree, is_period=False, **kwargs)
    ema = EMA(self, 1 / degree, is_period=False, init=sma, **kwargs)
    return ema
def subset(self, domain)
Expand source code
def subset(self, domain):
    from .generic import Generic
    return Generic(self, domain=domain, min_step=self.min_step)
def subtract(self, func)
Expand source code
def subtract(self, func):
    return Curve.subtract_many([self, func])
def trailing_max(self, degree, is_period=False, interpolation=None, min_step=1e-05, uniform=True)
Expand source code
def trailing_max(self, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
    return self.accumulator_map(
        max,
        degree,
        is_period=is_period,
        interpolation=interpolation,
        min_step=min_step,
        uniform=uniform
    )
def trailing_min(self, degree, is_period=False, interpolation=None, min_step=1e-05, uniform=True)
Expand source code
def trailing_min(self, degree, is_period=False, interpolation=None, min_step=MIN_STEP, uniform=True):
    return self.accumulator_map(
        min,
        degree,
        is_period=is_period,
        interpolation=interpolation,
        min_step=min_step,
        uniform=uniform
    )
def x(self, y)
Expand source code
def x(self, y):
    raise Exception("Not implemented")
def x_next(self, x, min_step=1e-05, limit=None)
Expand source code
def x_next(self, x, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    if math.isinf(min_step):
        x1 = self.domain.end
    else:
        x1 = x + min_step
    if limit is not None and x1 > limit:
        x1 = limit
    if not self.domain.contains(x1, enforce_start=False):
        return None
    return x1
def x_previous(self, x, min_step=1e-05, limit=None)
Expand source code
def x_previous(self, x, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    if math.isinf(min_step):
        x1 = self.domain.start
    else:
        x1 = x - min_step
    if limit is not None and x1 < limit:
        x1 = limit
    if not self.domain.contains(x1, enforce_end=False):
        return None
    return x1
def y(self, x)
Expand source code
def y(self, x):
    raise Exception("Not implemented")
def y_end(self)
Expand source code
def y_end(self):
    return self.y(self.domain.end)
def y_start(self)
Expand source code
def y_start(self):
    return self.y(self.domain.start)