Module curvepy.aggregate
Expand source code
from .curve import Curve, MIN_STEP
from intervalpy import Interval
from collections.abc import Sequence
class Aggregate(Curve):
def get_domain(self):
domains = list(map(lambda f: f.domain, self.funcs))
if self.is_union:
return Interval.union(domains)
else:
return Interval.intersection(domains)
def __init__(self, funcs, *args, tfm=None, default=None, union=False, operator=None, name=None):
if not isinstance(funcs, Sequence):
funcs = [funcs] + list(args)
super().__init__()
self.funcs = Curve.parse_many(funcs)
self.tfm = tfm
self.is_union = union
self.operator = operator
self.name = name
self.default = default
unique_funcs = list(set(list(self.funcs)))
self._observer_tokens = list(map(lambda f: f.add_observer(
begin=self.begin_update, end=self.end_update, prioritize=True), unique_funcs))
def __del__(self):
for func, token in zip(self.funcs, self._observer_tokens):
func.remove_observer(token)
def __repr__(self):
try:
params = ", ".join(list(map(str, self.funcs)))
return f'{self.name or self.operator or self.util.__name__}({params})'
# if self.operator is not None:
# def child_str(f):
# if isinstance(f, Aggregate):
# return f'({f})'
# else:
# return str(f)
# return f" {self.operator} ".join(list(map(child_str, self.funcs)))
# else:
# params = ", ".join(list(map(str, self.funcs)))
# return f'{self.name or self.util.__name__}({params})'
except Exception as e:
return super().__repr__() + f'({e})'
def y(self, x):
if not self.domain.contains(x):
return self.default
func_vals = list(map(lambda f: f.y(x), self.funcs))
if self.tfm is None:
return func_vals
return self.tfm(x, func_vals)
def x_previous(self, x, min_step=MIN_STEP, limit=None):
return max_or_none(map(lambda f: f.x_previous(x, min_step=min_step, limit=limit), self.funcs), x)
def x_next(self, x, min_step=MIN_STEP, limit=None):
return min_or_none(map(lambda f: f.x_next(x, min_step=min_step, limit=limit), self.funcs), x)
def min_or_none(iterable, gt):
min = None
for item in iterable:
if item is None:
continue
if min is None or item < min:
if item > gt:
min = item
return min
def max_or_none(iterable, lt):
max = None
for item in iterable:
if item is None:
continue
if max is None or item > max:
if item < lt:
max = item
return max
Functions
def max_or_none(iterable, lt)
-
Expand source code
def max_or_none(iterable, lt): max = None for item in iterable: if item is None: continue if max is None or item > max: if item < lt: max = item return max
def min_or_none(iterable, gt)
-
Expand source code
def min_or_none(iterable, gt): min = None for item in iterable: if item is None: continue if min is None or item < min: if item > gt: min = item return min
Classes
class Aggregate (funcs, *args, tfm=None, default=None, union=False, operator=None, name=None)
-
Expand source code
class Aggregate(Curve): def get_domain(self): domains = list(map(lambda f: f.domain, self.funcs)) if self.is_union: return Interval.union(domains) else: return Interval.intersection(domains) def __init__(self, funcs, *args, tfm=None, default=None, union=False, operator=None, name=None): if not isinstance(funcs, Sequence): funcs = [funcs] + list(args) super().__init__() self.funcs = Curve.parse_many(funcs) self.tfm = tfm self.is_union = union self.operator = operator self.name = name self.default = default unique_funcs = list(set(list(self.funcs))) self._observer_tokens = list(map(lambda f: f.add_observer( begin=self.begin_update, end=self.end_update, prioritize=True), unique_funcs)) def __del__(self): for func, token in zip(self.funcs, self._observer_tokens): func.remove_observer(token) def __repr__(self): try: params = ", ".join(list(map(str, self.funcs))) return f'{self.name or self.operator or self.util.__name__}({params})' # if self.operator is not None: # def child_str(f): # if isinstance(f, Aggregate): # return f'({f})' # else: # return str(f) # return f" {self.operator} ".join(list(map(child_str, self.funcs))) # else: # params = ", ".join(list(map(str, self.funcs))) # return f'{self.name or self.util.__name__}({params})' except Exception as e: return super().__repr__() + f'({e})' def y(self, x): if not self.domain.contains(x): return self.default func_vals = list(map(lambda f: f.y(x), self.funcs)) if self.tfm is None: return func_vals return self.tfm(x, func_vals) def x_previous(self, x, min_step=MIN_STEP, limit=None): return max_or_none(map(lambda f: f.x_previous(x, min_step=min_step, limit=limit), self.funcs), x) def x_next(self, x, min_step=MIN_STEP, limit=None): return min_or_none(map(lambda f: f.x_next(x, min_step=min_step, limit=limit), self.funcs), x)
Ancestors
Methods
def get_domain(self)
-
Expand source code
def get_domain(self): domains = list(map(lambda f: f.domain, self.funcs)) if self.is_union: return Interval.union(domains) else: return Interval.intersection(domains)
def x_next(self, x, min_step=1e-05, limit=None)
-
Expand source code
def x_next(self, x, min_step=MIN_STEP, limit=None): return min_or_none(map(lambda f: f.x_next(x, min_step=min_step, limit=limit), self.funcs), x)
def x_previous(self, x, min_step=1e-05, limit=None)
-
Expand source code
def x_previous(self, x, min_step=MIN_STEP, limit=None): return max_or_none(map(lambda f: f.x_previous(x, min_step=min_step, limit=limit), self.funcs), x)
def y(self, x)
-
Expand source code
def y(self, x): if not self.domain.contains(x): return self.default func_vals = list(map(lambda f: f.y(x), self.funcs)) if self.tfm is None: return func_vals return self.tfm(x, func_vals)
Inherited members