Module curvepy.extremas

Expand source code
import bisect
from .scan import Scan
from .curve import Curve, MIN_STEP
from intervalpy import Interval

class Extremas(Scan):

    # TODO: Extremas doesn't need to be a subclass of `Scan` as it only needs to find the local extrema about a point.

    def __init__(self, func, ref_func, min_deviation=0, min_step=MIN_STEP):
        self.ref_func = Curve.parse(ref_func)
        self.min_deviation = abs(min_deviation)
        self.extremas = []
        self.extrema_xs = []
        self.extrema_interval = Interval.empty()
        self.possible_extrema = None
        self.possible_extrema_phase = None
        self._did_update_extremas()
        super().__init__(func, self._extrema_scan, min_step=min_step)
        self._ref_observer_token = self.ref_func.add_observer(begin=self.begin_update, end=self.end_update)

    def __del__(self):
        self.ref_func.remove_observer(self._ref_observer_token)

    def scanned_y(self, x):
        if not self.extrema_interval.contains(x):
            return None
        if x == self.extrema_interval.start:
            return self.extremas[0][1]
        elif x == self.extrema_interval.end:
            return self.extremas[-1][1]
        i = bisect.bisect(self.extrema_xs, x)
        p1 = self.extremas[i - 1]
        p2 = self.extremas[i]
        u = (x - p1[0]) / (p2[0] - p1[0])
        return (1 - u) * p1[1] + u * p2[1]

    def continue_scan(self, x):
        if super().continue_scan(x):
            return True
        if not self.domain.contains(self.current):
            return False
        return not self.extrema_interval.contains(x, enforce_start=False)

    def x_previous(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if self.extrema_interval.contains(x - min_step):
            i = bisect.bisect_left(self.extrema_xs, x - min_step)
            x1 = self.extremas[i][0]
            if x1 > x - min_step:
                x1 = self.extremas[i - 1][0]
            return x1
        return self.curve.x_previous(x, min_step=min_step, limit=limit)

    def x_next(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if self.extrema_interval.contains(x + min_step):
            i = bisect.bisect_left(self.extrema_xs, x + min_step)
            x1 = self.extremas[i][0]
            if x1 < x + min_step:
                x1 = self.extremas[i + 1][0]
            return x1
        return self.curve.x_next(x, min_step=min_step, limit=limit)

    def begin_update(self, domain):
        super().begin_update(domain)
        # remove stale points
        for i in reversed(range(len(self.extremas))):
            x = self.extrema_xs[i]
            if domain.contains(x):
                self._remove_extrema_index(i)
            elif domain.start > x:
                break

        # scan from last extrema
        extrema_count = len(self.extremas)
        if extrema_count == 0:
            self.current = None
        else:
            last_extrema = self.extremas[-1]
            x = last_extrema[0]
            self.current = x
            self.possible_extrema = last_extrema
            self.possible_extrema_phase = last_extrema[1] - self.ref_func(x)

    def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
        points = super().sample_points(domain=domain, min_step=min_step, step=step)
        return list(filter(lambda p: p[1] is not None, points))

    def _extrema_scan(self, x, y):
        if y is None:
            return
        y0 = self.ref_func(x)
        if y0 is None:
            return
        div = y / y0 - 1
        if abs(div) <= self.min_deviation:
            # function is too close to reference function
            return
        phase = y - y0
        if self.possible_extrema is None:
            self.possible_extrema = (x, y)
            self.possible_extrema_phase = phase
            return
        is_possible_extrema = False
        if self.possible_extrema_phase * phase < 0:
            # phase inflection
            self._confirm_extrema()
            is_possible_extrema = True
        elif (phase > 0 and y > self.possible_extrema[1]) or (phase < 0 and y < self.possible_extrema[1]):
            is_possible_extrema = True
        if is_possible_extrema:
            self.possible_extrema = (x, y)
            self.possible_extrema_phase = phase
    
    def _remove_extrema_index(self, i):
        del self.extremas[i]
        del self.extrema_xs[i]
        self._did_update_extremas()

    def _confirm_extrema(self):
        extrema = self.possible_extrema

        self.extremas.append(extrema)
        self.extrema_xs.append(extrema[0])
        self._did_update_extremas()

    def _did_update_extremas(self):
        self.possible_extrema = None
        self.possible_extrema_phase = None

        if len(self.extremas) == 0:
            self.extrema_interval = Interval.empty()
        else:
            self.extrema_interval = Interval(self.extrema_xs[0], self.extrema_xs[-1], start_open=False, end_open=False)

    def _i(self, x):
        if not self.extrema_interval.contains(x):
            return None
        if x == self.extrema_interval.start:
            return 0
        elif x == self.extrema_interval.end:
            return len(self.extremas) - 1
        i = bisect.bisect(self.extrema_xs, x)
        p1 = self.extremas[i - 1]
        p2 = self.extremas[i]
        u = (x - p1[0]) / (p2[0] - p1[0])
        return i + u

Classes

class Extremas (func, ref_func, min_deviation=0, min_step=1e-05)
Expand source code
class Extremas(Scan):

    # TODO: Extremas doesn't need to be a subclass of `Scan` as it only needs to find the local extrema about a point.

    def __init__(self, func, ref_func, min_deviation=0, min_step=MIN_STEP):
        self.ref_func = Curve.parse(ref_func)
        self.min_deviation = abs(min_deviation)
        self.extremas = []
        self.extrema_xs = []
        self.extrema_interval = Interval.empty()
        self.possible_extrema = None
        self.possible_extrema_phase = None
        self._did_update_extremas()
        super().__init__(func, self._extrema_scan, min_step=min_step)
        self._ref_observer_token = self.ref_func.add_observer(begin=self.begin_update, end=self.end_update)

    def __del__(self):
        self.ref_func.remove_observer(self._ref_observer_token)

    def scanned_y(self, x):
        if not self.extrema_interval.contains(x):
            return None
        if x == self.extrema_interval.start:
            return self.extremas[0][1]
        elif x == self.extrema_interval.end:
            return self.extremas[-1][1]
        i = bisect.bisect(self.extrema_xs, x)
        p1 = self.extremas[i - 1]
        p2 = self.extremas[i]
        u = (x - p1[0]) / (p2[0] - p1[0])
        return (1 - u) * p1[1] + u * p2[1]

    def continue_scan(self, x):
        if super().continue_scan(x):
            return True
        if not self.domain.contains(self.current):
            return False
        return not self.extrema_interval.contains(x, enforce_start=False)

    def x_previous(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if self.extrema_interval.contains(x - min_step):
            i = bisect.bisect_left(self.extrema_xs, x - min_step)
            x1 = self.extremas[i][0]
            if x1 > x - min_step:
                x1 = self.extremas[i - 1][0]
            return x1
        return self.curve.x_previous(x, min_step=min_step, limit=limit)

    def x_next(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        if self.extrema_interval.contains(x + min_step):
            i = bisect.bisect_left(self.extrema_xs, x + min_step)
            x1 = self.extremas[i][0]
            if x1 < x + min_step:
                x1 = self.extremas[i + 1][0]
            return x1
        return self.curve.x_next(x, min_step=min_step, limit=limit)

    def begin_update(self, domain):
        super().begin_update(domain)
        # remove stale points
        for i in reversed(range(len(self.extremas))):
            x = self.extrema_xs[i]
            if domain.contains(x):
                self._remove_extrema_index(i)
            elif domain.start > x:
                break

        # scan from last extrema
        extrema_count = len(self.extremas)
        if extrema_count == 0:
            self.current = None
        else:
            last_extrema = self.extremas[-1]
            x = last_extrema[0]
            self.current = x
            self.possible_extrema = last_extrema
            self.possible_extrema_phase = last_extrema[1] - self.ref_func(x)

    def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
        points = super().sample_points(domain=domain, min_step=min_step, step=step)
        return list(filter(lambda p: p[1] is not None, points))

    def _extrema_scan(self, x, y):
        if y is None:
            return
        y0 = self.ref_func(x)
        if y0 is None:
            return
        div = y / y0 - 1
        if abs(div) <= self.min_deviation:
            # function is too close to reference function
            return
        phase = y - y0
        if self.possible_extrema is None:
            self.possible_extrema = (x, y)
            self.possible_extrema_phase = phase
            return
        is_possible_extrema = False
        if self.possible_extrema_phase * phase < 0:
            # phase inflection
            self._confirm_extrema()
            is_possible_extrema = True
        elif (phase > 0 and y > self.possible_extrema[1]) or (phase < 0 and y < self.possible_extrema[1]):
            is_possible_extrema = True
        if is_possible_extrema:
            self.possible_extrema = (x, y)
            self.possible_extrema_phase = phase
    
    def _remove_extrema_index(self, i):
        del self.extremas[i]
        del self.extrema_xs[i]
        self._did_update_extremas()

    def _confirm_extrema(self):
        extrema = self.possible_extrema

        self.extremas.append(extrema)
        self.extrema_xs.append(extrema[0])
        self._did_update_extremas()

    def _did_update_extremas(self):
        self.possible_extrema = None
        self.possible_extrema_phase = None

        if len(self.extremas) == 0:
            self.extrema_interval = Interval.empty()
        else:
            self.extrema_interval = Interval(self.extrema_xs[0], self.extrema_xs[-1], start_open=False, end_open=False)

    def _i(self, x):
        if not self.extrema_interval.contains(x):
            return None
        if x == self.extrema_interval.start:
            return 0
        elif x == self.extrema_interval.end:
            return len(self.extremas) - 1
        i = bisect.bisect(self.extrema_xs, x)
        p1 = self.extremas[i - 1]
        p2 = self.extremas[i]
        u = (x - p1[0]) / (p2[0] - p1[0])
        return i + u

Ancestors

Methods

def begin_update(self, domain)
Expand source code
def begin_update(self, domain):
    super().begin_update(domain)
    # remove stale points
    for i in reversed(range(len(self.extremas))):
        x = self.extrema_xs[i]
        if domain.contains(x):
            self._remove_extrema_index(i)
        elif domain.start > x:
            break

    # scan from last extrema
    extrema_count = len(self.extremas)
    if extrema_count == 0:
        self.current = None
    else:
        last_extrema = self.extremas[-1]
        x = last_extrema[0]
        self.current = x
        self.possible_extrema = last_extrema
        self.possible_extrema_phase = last_extrema[1] - self.ref_func(x)
def continue_scan(self, x)
Expand source code
def continue_scan(self, x):
    if super().continue_scan(x):
        return True
    if not self.domain.contains(self.current):
        return False
    return not self.extrema_interval.contains(x, enforce_start=False)
def sample_points(self, domain=None, min_step=1e-05, step=None)
Expand source code
def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
    points = super().sample_points(domain=domain, min_step=min_step, step=step)
    return list(filter(lambda p: p[1] is not None, points))
def scanned_y(self, x)
Expand source code
def scanned_y(self, x):
    if not self.extrema_interval.contains(x):
        return None
    if x == self.extrema_interval.start:
        return self.extremas[0][1]
    elif x == self.extrema_interval.end:
        return self.extremas[-1][1]
    i = bisect.bisect(self.extrema_xs, x)
    p1 = self.extremas[i - 1]
    p2 = self.extremas[i]
    u = (x - p1[0]) / (p2[0] - p1[0])
    return (1 - u) * p1[1] + u * p2[1]
def x_next(self, x, min_step=1e-05, limit=None)
Expand source code
def x_next(self, x, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    if self.extrema_interval.contains(x + min_step):
        i = bisect.bisect_left(self.extrema_xs, x + min_step)
        x1 = self.extremas[i][0]
        if x1 < x + min_step:
            x1 = self.extremas[i + 1][0]
        return x1
    return self.curve.x_next(x, min_step=min_step, limit=limit)
def x_previous(self, x, min_step=1e-05, limit=None)
Expand source code
def x_previous(self, x, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    if self.extrema_interval.contains(x - min_step):
        i = bisect.bisect_left(self.extrema_xs, x - min_step)
        x1 = self.extremas[i][0]
        if x1 > x - min_step:
            x1 = self.extremas[i - 1][0]
        return x1
    return self.curve.x_previous(x, min_step=min_step, limit=limit)

Inherited members