Module curvepy.scan

Expand source code
import math
from .curve import Curve, MIN_STEP
from .constant import Constant
from intervalpy import Interval

class Scan(Curve):

    def get_domain(self):
        return self.curve.domain

    def __init__(self, func, tfm, min_step=MIN_STEP):
        super().__init__(min_step=min_step)
        self.curve = Curve.parse(func)
        self.tfm = tfm
        self.scan_start = None
        self.current = None
        self._observer_token = self.curve.add_observer(begin=self.begin_scan_update, end=self.end_scan_update, prioritize=True)

    def __del__(self):
        try:
            self.curve.remove_observer(self._observer_token)
        except Exception:
            pass

    def y(self, x):
        if not self.curve.domain.contains(x) and not self.domain.contains(x):
            return None
        self.scan(x)
        return self.scanned_y(x)

    def scanned_y(self, x):
        raise Exception("Not implemented")

    def continue_scan(self, x):
        return self.current < x

    def init_scan(self, x):
        if self.curve.domain.is_empty or self.curve.domain.is_negative_infinite:
            return x
        x0 = self.curve.domain.start
        if not self.domain.contains(x0):
            x0 = self.curve.x_next(x0, min_step=self.min_step)
        return x0

    def offset_scan(self, x):
        return x

    def reset_scan(self):
        self.scan_start = None
        self.current = None

    def scan(self, x0):
        x = self.offset_scan(x0)
        if self.scan_start is not None and x < self.scan_start:
            self.reset_scan()
        while self.current is None or self.continue_scan(x0):
            if self.current is None:
                self.scan_start = self.init_scan(x)
                current = self.scan_start
            else:
                current = self.x_next(self.current, min_step=self.min_step, limit=x0)
            if current is None or not self.curve.domain.contains(current):
                break
            if self.current is not None and current <= self.current:
                raise Exception('Next scan x value ({}) is smaller than or equal to the current scan x value ({}).'.format(current, self.current))
            self.tfm(current, self.curve.y(current))
            self.current = current

    def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
        min_step = self.resolve_min_step(min_step)
        if domain is None:
            domain = self.domain
        else:
            domain = Interval.intersection([self.domain, domain])
        if domain.is_empty:
            return []
        elif domain.is_infinite:
            raise Exception("Cannot sample points on an infinite domain. Specify a finite domain.")
        self.scan(domain.start)
        self.scan(domain.end)
        return super().sample_points(domain=domain, min_step=min_step, step=step)

    def x_previous(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        return self.curve.x_previous(x, min_step=min_step, limit=limit)

    def x_next(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        return self.curve.x_next(x, min_step=min_step, limit=limit)

    def begin_scan_reset(self):
        self.reset_scan()
        self.begin_update(Interval.infinite())

    def end_scan_reset(self):
        self.end_update(Interval.infinite())

    def begin_scan_update(self, domain):
        domain = self._update_interval(domain)
        if self.current is not None:
            if domain.start < self.current or (domain.start == self.current and not domain.start_open):
                self.reset_scan()

        if self.current is not None and self.scan_start is not None and self.current < self.scan_start:
            self.reset_scan()

        self.begin_update(domain)

    def end_scan_update(self, domain):
        domain = self._update_interval(domain)
        self.end_update(domain)

    def _update_interval(self, domain):
        return Interval(domain.start, math.inf, start_open=domain.start_open, end_open=True)

Classes

class Scan (func, tfm, min_step=1e-05)
Expand source code
class Scan(Curve):

    def get_domain(self):
        return self.curve.domain

    def __init__(self, func, tfm, min_step=MIN_STEP):
        super().__init__(min_step=min_step)
        self.curve = Curve.parse(func)
        self.tfm = tfm
        self.scan_start = None
        self.current = None
        self._observer_token = self.curve.add_observer(begin=self.begin_scan_update, end=self.end_scan_update, prioritize=True)

    def __del__(self):
        try:
            self.curve.remove_observer(self._observer_token)
        except Exception:
            pass

    def y(self, x):
        if not self.curve.domain.contains(x) and not self.domain.contains(x):
            return None
        self.scan(x)
        return self.scanned_y(x)

    def scanned_y(self, x):
        raise Exception("Not implemented")

    def continue_scan(self, x):
        return self.current < x

    def init_scan(self, x):
        if self.curve.domain.is_empty or self.curve.domain.is_negative_infinite:
            return x
        x0 = self.curve.domain.start
        if not self.domain.contains(x0):
            x0 = self.curve.x_next(x0, min_step=self.min_step)
        return x0

    def offset_scan(self, x):
        return x

    def reset_scan(self):
        self.scan_start = None
        self.current = None

    def scan(self, x0):
        x = self.offset_scan(x0)
        if self.scan_start is not None and x < self.scan_start:
            self.reset_scan()
        while self.current is None or self.continue_scan(x0):
            if self.current is None:
                self.scan_start = self.init_scan(x)
                current = self.scan_start
            else:
                current = self.x_next(self.current, min_step=self.min_step, limit=x0)
            if current is None or not self.curve.domain.contains(current):
                break
            if self.current is not None and current <= self.current:
                raise Exception('Next scan x value ({}) is smaller than or equal to the current scan x value ({}).'.format(current, self.current))
            self.tfm(current, self.curve.y(current))
            self.current = current

    def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
        min_step = self.resolve_min_step(min_step)
        if domain is None:
            domain = self.domain
        else:
            domain = Interval.intersection([self.domain, domain])
        if domain.is_empty:
            return []
        elif domain.is_infinite:
            raise Exception("Cannot sample points on an infinite domain. Specify a finite domain.")
        self.scan(domain.start)
        self.scan(domain.end)
        return super().sample_points(domain=domain, min_step=min_step, step=step)

    def x_previous(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        return self.curve.x_previous(x, min_step=min_step, limit=limit)

    def x_next(self, x, min_step=MIN_STEP, limit=None):
        min_step = self.resolve_min_step(min_step)
        return self.curve.x_next(x, min_step=min_step, limit=limit)

    def begin_scan_reset(self):
        self.reset_scan()
        self.begin_update(Interval.infinite())

    def end_scan_reset(self):
        self.end_update(Interval.infinite())

    def begin_scan_update(self, domain):
        domain = self._update_interval(domain)
        if self.current is not None:
            if domain.start < self.current or (domain.start == self.current and not domain.start_open):
                self.reset_scan()

        if self.current is not None and self.scan_start is not None and self.current < self.scan_start:
            self.reset_scan()

        self.begin_update(domain)

    def end_scan_update(self, domain):
        domain = self._update_interval(domain)
        self.end_update(domain)

    def _update_interval(self, domain):
        return Interval(domain.start, math.inf, start_open=domain.start_open, end_open=True)

Ancestors

Subclasses

Methods

def begin_scan_reset(self)
Expand source code
def begin_scan_reset(self):
    self.reset_scan()
    self.begin_update(Interval.infinite())
def begin_scan_update(self, domain)
Expand source code
def begin_scan_update(self, domain):
    domain = self._update_interval(domain)
    if self.current is not None:
        if domain.start < self.current or (domain.start == self.current and not domain.start_open):
            self.reset_scan()

    if self.current is not None and self.scan_start is not None and self.current < self.scan_start:
        self.reset_scan()

    self.begin_update(domain)
def continue_scan(self, x)
Expand source code
def continue_scan(self, x):
    return self.current < x
def end_scan_reset(self)
Expand source code
def end_scan_reset(self):
    self.end_update(Interval.infinite())
def end_scan_update(self, domain)
Expand source code
def end_scan_update(self, domain):
    domain = self._update_interval(domain)
    self.end_update(domain)
def get_domain(self)
Expand source code
def get_domain(self):
    return self.curve.domain
def init_scan(self, x)
Expand source code
def init_scan(self, x):
    if self.curve.domain.is_empty or self.curve.domain.is_negative_infinite:
        return x
    x0 = self.curve.domain.start
    if not self.domain.contains(x0):
        x0 = self.curve.x_next(x0, min_step=self.min_step)
    return x0
def offset_scan(self, x)
Expand source code
def offset_scan(self, x):
    return x
def reset_scan(self)
Expand source code
def reset_scan(self):
    self.scan_start = None
    self.current = None
def sample_points(self, domain=None, min_step=1e-05, step=None)
Expand source code
def sample_points(self, domain=None, min_step=MIN_STEP, step=None):
    min_step = self.resolve_min_step(min_step)
    if domain is None:
        domain = self.domain
    else:
        domain = Interval.intersection([self.domain, domain])
    if domain.is_empty:
        return []
    elif domain.is_infinite:
        raise Exception("Cannot sample points on an infinite domain. Specify a finite domain.")
    self.scan(domain.start)
    self.scan(domain.end)
    return super().sample_points(domain=domain, min_step=min_step, step=step)
def scan(self, x0)
Expand source code
def scan(self, x0):
    x = self.offset_scan(x0)
    if self.scan_start is not None and x < self.scan_start:
        self.reset_scan()
    while self.current is None or self.continue_scan(x0):
        if self.current is None:
            self.scan_start = self.init_scan(x)
            current = self.scan_start
        else:
            current = self.x_next(self.current, min_step=self.min_step, limit=x0)
        if current is None or not self.curve.domain.contains(current):
            break
        if self.current is not None and current <= self.current:
            raise Exception('Next scan x value ({}) is smaller than or equal to the current scan x value ({}).'.format(current, self.current))
        self.tfm(current, self.curve.y(current))
        self.current = current
def scanned_y(self, x)
Expand source code
def scanned_y(self, x):
    raise Exception("Not implemented")
def x_next(self, x, min_step=1e-05, limit=None)
Expand source code
def x_next(self, x, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    return self.curve.x_next(x, min_step=min_step, limit=limit)
def x_previous(self, x, min_step=1e-05, limit=None)
Expand source code
def x_previous(self, x, min_step=MIN_STEP, limit=None):
    min_step = self.resolve_min_step(min_step)
    return self.curve.x_previous(x, min_step=min_step, limit=limit)
def y(self, x)
Expand source code
def y(self, x):
    if not self.curve.domain.contains(x) and not self.domain.contains(x):
        return None
    self.scan(x)
    return self.scanned_y(x)

Inherited members