"""
Interface between the models and the fitters.
:class:`Fitness` defines the interface that model evaluators can follow.
These models can be bundled together into a :func:`FitProblem` and sent
to :class:`bumps.fitters.FitDriver` for optimization and uncertainty
analysis.
Summary of problem attributes::
# Used by fitters
nllf(p: Optional[Vector]) -> float # main calculation
bounds() -> Tuple(Vector, Vector) # or equivalent sequence
setp(p: Vector) -> None
getp() -> Vector
residuals() -> Vector # for LM, MPFit
parameter_residuals() -> Vector # for LM, MPFit
constraints_nllf() -> float # for LM, MPFit; constraint cost is spread across the individual residuals
randomize() -> None # for multistart
resynth_data() -> None # for Monte Carlo resampling of maximum likelihood
restore_data() -> None # for Monte Carlo resampling of maximum likelihood
name: str # DREAM uses this
chisq() -> float
chisq_str() -> str
labels() -> List[str]
summarize() -> str
show() -> None
load(input_path: str) -> None
save(output_path: str) -> None
plot(figfile: str, view: str) -> None
# Set/used by bumps.cli
model_reset() -> None # called by load_model
path: str # set by load_model
name: str # set by load_model
title: str = filename # set by load_moel
options: List[str] # from sys.argv[1:]
undefined:List[int] # when loading a save .par file, these parameters weren't defined
store: str # set by make_store
output_path: str # set by make_store
simulate_data(noise: float) -> None # for --simulate in opts
cov() -> Matrix # for --cov in opts
"""
__all__ = ["Fitness", "FitProblem", "CovarianceMixin", "load_problem"]
from contextlib import contextmanager
from dataclasses import dataclass
import logging
import os
import sys
import traceback
from typing import Generic, TypeVar, Union, Optional
import warnings
from pathlib import Path
import numpy as np
from numpy import inf, isnan, nan
from scipy.stats import chi2
from . import parameter, util
from .parameter import to_dict, Parameter, Variable, tag_all, priors
from .util import format_uncertainty
# Abstract base class:
# can use "isinstance" to check if a class implements the protocol
[docs]
@util.runtime_checkable
@dataclass(init=False)
class Fitness(util.Protocol):
"""
Manage parameters, data, and theory function evaluation.
See :ref:`fitness` for a detailed explanation.
"""
[docs]
def parameters(self) -> util.List[Parameter]:
"""
return the parameters in the model.
model parameters are a hierarchical structure of lists and
dictionaries.
"""
raise NotImplementedError()
[docs]
def update(self):
"""
Called when parameters have been updated. Any cached values will need
to be cleared and the model reevaluated.
"""
pass
[docs]
def numpoints(self):
"""
Return the number of data points.
"""
raise NotImplementedError()
[docs]
def nllf(self):
"""
Return the negative log likelihood value of the current parameter set.
"""
raise NotImplementedError()
[docs]
def resynth_data(self):
"""
Generate fake data based on uncertainties in the real data. For
Monte Carlo resynth-refit uncertainty analysis. Bootstrapping?
"""
raise NotImplementedError()
[docs]
def restore_data(self):
"""
Restore the original data in the model (after resynth).
"""
raise NotImplementedError()
[docs]
def residuals(self):
"""
Return residuals for current theory minus data.
Used for Levenburg-Marquardt, and for plotting.
"""
raise NotImplementedError()
[docs]
def save(self, basename):
"""
Save the model to a file based on basename+extension. This will point
to a path to a directory on a remote machine; don't make any
assumptions about information stored on the server. Return the set of
files saved so that the monitor software can make a pretty web page.
"""
pass
[docs]
def plot(self, view="linear"):
"""
Plot the model to the current figure. You only get one figure, but you
can make it as complex as you want. This will be saved as a png on
the server, and composed onto a results web page.
"""
pass
def no_constraints() -> float:
"""default constraints function for FitProblem"""
return 0
# The functions below would be methods of Fitness if it were a
# base class rather than a protocol.
def fitness_parameters(fitness: Fitness) -> util.List[Parameter]:
"""
Return a list of fittable (non-fixed) parameters in the model
"""
parameters = parameter.unique(fitness.parameters())
return [p for p in parameters if isinstance(getattr(p, "slot", None), parameter.Variable) and not p.fixed]
def fitness_chisq(fitness: Fitness) -> str:
"""
Return a string representing the chisq equivalent of the nllf for
a single dataset. Unlike FitProblem.chisq_str, this does not
include parameter uncertainty or constraint penalty.
"""
npars = len(fitness_parameters(fitness))
dof = fitness.numpoints() - npars
chisq_norm, _ = nllf_scale(dof=dof, npars=npars, norm=True)
# TODO: Check if parameters to fitness are in feasible region before computing nllf?
chisq = fitness.nllf() * chisq_norm
return chisq
def fitness_chisq_str(fitness: Fitness) -> str:
"""
Return a string representing the chisq equivalent of the nllf for
a single dataset. Unlike FitProblem.chisq_str, this does not
include parameter uncertainty or constraint penalty.
"""
npars = len(fitness_parameters(fitness))
dof = fitness.numpoints() - npars
chisq_norm, chisq_err = nllf_scale(dof=dof, npars=npars, norm=True)
# TODO: Check if parameters are in feasible region before computing nllf?
chisq = fitness.nllf() * chisq_norm
text = format_uncertainty(chisq, chisq_err)
# return f"{text} {fitness.nllf()=:.15e} {chisq_norm=:.15e} {chisq_err=:.15e} {dof=}"
return text
def fitness_show_parameters(fitness: Fitness, subs: util.Optional[util.Dict[util.Any, Parameter]] = None):
"""Print the available parameters to the console as a tree."""
print(parameter.format(fitness.parameters(), freevars=subs))
print("[chisq=%s, nllf=%g]" % (fitness_chisq_str(fitness), fitness.nllf()))
FitnessType = TypeVar("FitnessType", bound=Fitness)
[docs]
class CovarianceMixin:
"""
Add methods for *cov*, *show_cov* and *show_err* to a bumps problem definition.
This is done as a mixin because not all problems are FitProblem. See
for example :class:`bumps.pdfwrapper.PDF`.
"""
# Note: if caching is implemented for covariance, make sure it is cleared on setp
# This will be difficult to do as a mixin.
[docs]
def cov(self, x):
r"""
Return an estimate of the covariance of the fit.
Depending on the fitter and the problem, this may be computed from
existing evaluations within the fitter, or from numerical
differentiation around the minimum.
If the problem has residuals available, then the covariance
is derived from the Jacobian::
x = fit.problem.getp()
J = bumps.lsqerror.jacobian(fit.problem, x)
cov = bumps.lsqerror.jacobian_cov(J)
Otherwise, the numerical differentiation will use the Hessian
estimated from nllf::
x = fit.problem.getp()
H = bumps.lsqerror.hessian(fit.problem, x)
cov = bumps.lsqerror.hessian_cov(H)
"""
# Use Jacobian if residuals are available because it is faster
# to compute. Otherwise punt and use Hessian. The has_residuals
# attribute should be True if present. It may be false if
# the problem defines a residuals method but doesn't really
# have residuals (e.g. to allow levenberg-marquardt to run even
# though it is not fitting a sum-square problem).
from bumps import lsqerror
if hasattr(self, "has_residuals"):
has_residuals = self.has_residuals
else:
has_residuals = hasattr(self, "residuals")
if has_residuals:
J = lsqerror.jacobian(self, x)
# print("Jacobian", J)
return lsqerror.jacobian_cov(J)
else:
H = lsqerror.hessian(self, x)
# print("Hessian", H)
return lsqerror.hessian_cov(H)
[docs]
def show_cov(self, x, cov):
maxn = 1000 # max array dims to print
cov_str = np.array2string(
cov,
max_line_width=20 * maxn,
threshold=maxn * maxn,
precision=6, # suppress_small=True,
separator=", ",
)
print("=== Covariance matrix ===")
print(cov_str)
print("=========================")
[docs]
def show_err(self, x, dx):
"""
Display the error approximation from the covariance matrix.
*err* is the standard deviation computed from the covariance matrix. It
is available as *result.dx* from the simple fitter, or using::
from bumps import lsqerror
dx = lsqerror.stderr(problem.cov(x))
Warning: cost to compute cov grows as the cube of the number of parameters.
"""
# TODO: need cheaper uncertainty estimate
# Note: error estimated from hessian diagonal is insufficient.
print("=== Uncertainty from curvature: name value(unc.) ===")
for k, v, dv in zip(self.labels(), x, dx):
print(f"{k:>40s} {format_uncertainty(v, dv):<15s}")
print("=" * 58)
# The default penalty nllf has to be big enough that it won't be swamped by
# the nllf of the residuals even for a 100 Mpixel image, but not so
# big that the distance to the boundary is not lost in the floating point precision.
PENALTY_NLLF = 1e12
# TODO: add filename to fitproblem so we don't have to coordinate it elsewhere?
[docs]
@dataclass(init=False, eq=False)
class FitProblem(Generic[FitnessType], CovarianceMixin):
r"""
*models* is a sequence of :class:`Fitness` instances. Note that they
do not need to all be of the same class.
*weights* is an optional scale factor for each model. A weighted fit
returns nllf $L = \sum w_k^2 L_k$. If an individual nllf is the sum
squared residuals then this is equivalent to scaling the measurement
uncertainty by $1/w$. Unless the measurement uncertainty is unknown,
weights should be in [0, 1], representing an unknown systematic
uncertainty spread across the individual measurements.
*freevars* is :class:`.parameter.FreeVariables` instance defining the
per-model parameter assignments. See :ref:`freevariables` for details.
Additional parameters:
*name* name of the problem
*constraints* is a list of Constraint objects, which have a method to
calculate the nllf for that constraint.
Also supports an alternate form which cannot be serialized:
A function which returns the negative log likelihood
of seeing the parameters independent from the fitness function. Use
this for example to check for feasible regions of the search space, or
to add constraints that cannot be easily calculated per parameter.
Ideally, the constraints nllf will increase as you go farther from
the feasible region so that the fit will be directed toward feasible
values.
*penalty_nllf* is the nllf to use for *fitness* when *constraints*
or model parameter bounds are not satisfied. The total nllf is the
squared distance from the boundary plus the penalty so that the derivative
points the search back to the feasible region. The penalty should be larger
than any nllf you might see near the boundary so that the fit doesn't get
stuck outside, but small enough that penalty plus distance is different from
penalty. The default is 1e12.
Total nllf is the sum of the parameter nllf, the constraints nllf and the
depending on whether constraints is greater than soft_limit, either the
fitness nllf or the penalty nllf.
New in 0.9.0: weights are now squared when computing the sum rather than
linear.
"""
# TODO: problem.path is set by cli.load_model(); should we add it as standard?
name: util.Optional[str]
models: util.List[FitnessType]
freevars: util.Optional[parameter.FreeVariables]
weights: util.Union[util.List[float], util.Literal[None]]
constraints: util.Optional[util.Sequence[parameter.Constraint]]
penalty_nllf: util.Union[float, util.Literal["inf"]]
# The type is not quite correct. The models do not need to be of the same class
_models: util.List[FitnessType]
_priors: util.List[Parameter]
_parameters: util.List[Parameter]
_parameters_by_id: util.Dict[str, Parameter]
_dof: float = np.nan # not a schema field, and is not used in __init__
_active_model_index: int = 0
# _all_constraints: util.List[util.Union[Parameter, Expression]]
def __init__(
self,
models: util.Union[FitnessType, util.List[FitnessType]],
weights=None,
name=None,
constraints=None,
penalty_nllf=None,
freevars=None,
auto_tag=False,
):
if not isinstance(models, (list, tuple)):
models = [models]
if callable(constraints):
raise TypeError("Callable constraints function is no longer supported. Instead use a list of comparisons.")
self.constraints = constraints if constraints is not None else []
if freevars is None:
names = ["M%d" % i for i, _ in enumerate(models)]
freevars = parameter.FreeVariables(names=names)
self.freevars = freevars
if auto_tag:
for index, model in enumerate(models):
model_name = model.name if model.name is not None else f"Model{index}"
tag_all(model.parameters(), model_name)
self._models = models
if weights is None:
weights = [1.0 for _ in models]
self.weights = weights
self.penalty_nllf = float(penalty_nllf) if penalty_nllf is not None else PENALTY_NLLF
self.name = name
# Do these steps last so that it has all of the attributes initialized.
self.model_reset() # sets self._all_constraints
self.set_active_model(0) # Set the active model to model 0
@property
def fitness(self):
warnings.warn("Deprecated: use of problem.fitness will be removed at some point")
if len(self._models) == 1:
return self._models[0]
raise ValueError("problem.fitness is not defined")
@property
def dof(self):
return self._dof
@property
def num_models(self):
return len(self._models)
@property
def models(self):
"""Iterate over models, with free parameters set from model values"""
try:
for index in range(len(self._models)):
index, model = self._switch_model(index)
yield model
finally:
# Restore the active model after cycling, even if interrupted
self._switch_model(self._active_model_index)
# TODO: deprecate set_active_model and push_model
# set_active_model is only used once in the wx gui. push_model is only needed
# if we have a notion of active model.
# noinspection PyAttributeOutsideInit
[docs]
def set_active_model(self, index):
"""
Fetch model *index* with the appropriate free variables substituted.
This will remain the active model until a new active model is selected.
Operations like chisq_str() or plot() which cycle through the models will
restore the parameters upon completion.
"""
index, model = self._switch_model(index)
self._active_model_index, self.active_model = index, model
return model
[docs]
@contextmanager
def push_model(self, index):
"""
Fetch model *index* with the appropriate free variables substituted.
On completion of the context, restore the parameters for the active model.
"""
try:
index, model = self._switch_model(index)
yield model
finally:
# Restore the active model after cycling, even if interrupted
self._switch_model(self._active_model_index)
def _switch_model(self, index):
# print(f"switching to {index} with freevars={bool(self.freevars)}")
if not (-len(self._models) <= index < len(self._models)):
raise IndexError(f"Index {index} invalid when only {len(self._models)} models")
if index < 0:
index = len(self._models) + index
# TODO: FreeVariables destroys caching within the model. Replace it.
# TODO: Only update parameters if index has changed. We can track this in freevars.
self.freevars.set_model(index)
if self.freevars:
# Clear model cache when updating the parameters
getattr(self._models[index], "update", lambda: None)()
return index, self._models[index]
[docs]
def model_parameters(self):
"""Return parameters from all models"""
pars = {}
# Note: the self.models iterator plugs the free variables into
# the model in turn, so no need to walk self.freevars directly.
# The model.update() function is called each time, so whatever
# caching is happening in the model is cleared, and it knows that
# new parameter values have been inserted.
pars["models"] = [f.parameters() for f in self.models]
free = self.freevars.parameters()
if free:
pars["freevars"] = free
return pars
# TODO: no longer used?
[docs]
def to_dict(self):
return {
"type": type(self).__name__,
"name": self.name,
"models": to_dict(self._models),
"weights": self.weights,
"penalty_nllf": self.penalty_nllf,
# TODO: constraints may be a function.
"constraints": to_dict(self.constraints),
"freevars": to_dict(self.freevars),
}
def __repr__(self):
return "FitProblem(name=%s)" % self.name
[docs]
def valid(self, pvec):
"""Return true if the point is in the feasible region"""
return all(v in p.prior for p, v in zip(self._parameters, pvec))
[docs]
def setp(self, pvec):
"""
Set a new value for the parameters into the model. If the model
is valid, calls model_update to signal that the model should be
recalculated.
Returns True if the value is valid and the parameters were set,
otherwise returns False.
"""
# print("Calling setp with", pvec, self._parameters)
for v, p in zip(pvec, self._parameters):
p.value = v
# TODO: setp_hook is a hack to support parameter expressions in sasview
# Don't depend on this existing long term.
setp_hook = getattr(self, "setp_hook", no_constraints)
setp_hook()
self.model_update()
[docs]
def getp(self):
"""
Returns the current value of the parameter vector.
"""
return np.array([p.value for p in self._parameters], "d")
[docs]
def bounds(self):
"""Return the bounds for each parameter as a 2 x N array"""
limits = [p.prior.limits for p in self._parameters]
return np.array(limits, "d").T if limits else np.empty((2, 0))
[docs]
def randomize(self, n=None):
"""
Generates a random model.
*randomize()* sets the model to a random value.
*randomize(n)* returns a population of *n* random models.
For indefinite bounds, the random population distribution is centered
on initial value of the parameter, or 1. if the initial parameter is
not finite.
"""
# TODO: split into two: randomize and random_pop
if n is None:
self.setp(self.randomize(n=1)[0])
return # Not returning anything since no n is requested
# TODO: apply hard limits on parameters
target = self.getp()
target[~np.isfinite(target)] = 1.0
pop = [p.prior.random(n, target=v) for p, v in zip(self._parameters, target)]
return np.array(pop).T
[docs]
def nllf(self, pvec=None) -> float:
"""
Compute the cost function for a new parameter set p.
This is not simply the sum-squared residuals, but instead is the
negative log likelihood of seeing the data given the model parameters
plus the negative log likelihood of seeing the model parameters. The
value is used for a likelihood ratio test so normalization constants
can be ignored. There is an additional penalty value provided by
the model which can be used to implement inequality constraints. Any
penalty should be large enough that it is effectively excluded from
the parameter space returned from uncertainty analysis.
The model is not actually calculated if any of the parameters are
out of bounds, or any of the constraints are not satisfied, but
instead are assigned a value of *penalty_nllf*. This will prevent
expensive models from spending time computing values in the unfeasible
region.
"""
if pvec is not None:
# Note that valid() only checks that the fit parameters are in the bounding box.
# It doesn't check that all the constraints are satisfied. To do that we would
# have to use setp on the vector then loop over p._priors, resetting to the
# original pvec if the new pvec is invalid.
if self.valid(pvec):
self.setp(pvec)
else:
return inf
pparameter, pconstraints, pmodel, failing = self._nllf_components()
# Note: pmodel is zero if any constraints are failing. In that case the cost
# will be the squared distance from the boundary of the feasible region for
# the breaking parameters so that gradient descent can guide us back to the
# feasible region. The penalty would ideally be greater than any value of
# pmodel inside the boundary (otherwise the fitter may prefer the point in the
# infeasible region) but it has to be small enough that adding a small distance
# changes the penalty value (otherwise the slope outside the boundary is zero).
# Currently using 1e12 as the default penalty. This will be too small for many problems
# but any larger and the derivatives will break.
# TODO: Drop the penalty term, either by rewriting the fitters so they know that
# the constraints are broken, or rewriting the constraints to simple box constraints.
cost = pparameter + pconstraints + (self.penalty_nllf if failing else pmodel)
# print(f"prior:{float(pparameter)} + constraint:{float(pconstraints)} + nllf:{float(pmodel)} => {float(cost)} at [{pvec=}]")
if isnan(cost):
# TODO: make sure errors get back to the user
# print "point evaluates to nan"
# print parameter.summarize(self._parameters)
return inf
return cost
[docs]
def parameter_nllf(self) -> util.Tuple[float, util.List[str]]:
"""
Returns negative log likelihood of seeing parameters p.
"""
failing = []
nllf = 0.0
for p in self._priors:
p_nllf = p.nllf()
nllf += p_nllf
if p_nllf == np.inf:
failing.append(str(p))
# print("; ".join(f"{p}({p.value})={p.nllf()}" for p in self._priors))
return nllf, failing
[docs]
def parameter_residuals(self):
"""
Returns negative log likelihood of seeing parameters p.
"""
return [p.residual() for p in self._priors]
[docs]
def chisq(self, nllf: Union[float, util.NDArray, None] = None, norm: bool = True, compact: bool = True):
"""
Returns chisq as a floating point value.
See documentation for :meth:`chisq_str`.
"""
chisq_norm, chisq_err = nllf_scale(dof=self.dof, npars=len(self._parameters), norm=norm)
if nllf is None:
pparameter, pconstraints, pmodel, failing = self._nllf_components()
nllf = pmodel + pparameter + pconstraints + (self.penalty_nllf if failing else 0)
return nllf * chisq_norm
# TODO: Too many versions of chisq about.
# Note: norm and compact are no longer used in bumps, so they are not documented
[docs]
def chisq_str(self, nllf: Optional[float] = None, norm: bool = True, compact: bool = True):
"""
Return a string representing the chisq equivalent of the nllf.
If *nllf* is provided then use that instead of calling the model
evaluator. Fail if *compact* is False.
If the model has strictly gaussian independent uncertainties then the
negative log likelihood function will return 0.5*sum(residuals**2),
which is 1/2*chisq. Since we are printing normalized chisq, we
multiply the model nllf by 2/DOF before displaying the value. This
is different from the problem nllf function, which includes the
cost of the cost of the penalty constraints in the total nllf.
Parameter priors, if any, are treated as independent models
in the total nllf. The constraint value is displayed separately.
**Deprecated**: *norm:bool* and *compact:bool* are ignored.
"""
chisq_norm, chisq_err = nllf_scale(dof=self.dof, npars=len(self._parameters), norm=norm)
failing = []
if nllf is None:
pparameter, pconstraints, pmodel, failing = self._nllf_components()
nllf = pmodel + pparameter + pconstraints + (self.penalty_nllf if failing else 0)
# print(f"{pmodel=} {pparameter=} {pconstraints=} {nllf=} {chisq_norm=} {chisq_err=}")
if failing:
# Text is used in a context like f"χ² = {text}". We are not printing the list
# of failing constraints since parameter names are arbitrarily long.
text = "NaN [out of bounds]"
else:
text = format_uncertainty(nllf * chisq_norm, chisq_err)
# return f"{text} p={self.getp()} => {pmodel:.15e}"
return text
def _nllf_components(self) -> util.Tuple[float, float, float, util.List[str]]:
try:
pparameter, bad_priors = self.parameter_nllf()
pconstraints, bad_constraints = self.constraints_nllf()
failing = bad_priors + bad_constraints
# If constraints are failing don't bother to compute nllf.
# Using pvalue = zero rather than NaN so that handling of penalties is easier.
pmodel = self.model_nllf() if len(failing) == 0 else 0.0
if isnan(pparameter):
# TODO: make sure errors get back to the user
info = ["Parameter nllf is wrong"]
info += ["%s %g" % (p, p.nllf()) for p in self._priors]
logging.error("\n ".join(info))
return pparameter, pconstraints, pmodel, failing
except Exception:
# TODO: make sure errors get back to the user
info = (traceback.format_exc(), parameter.summarize(self._parameters))
logging.error("\n".join(info))
return nan, nan, nan, []
def __call__(self, pvec=None):
"""
Problem cost function.
Returns the negative log likelihood scaled by DOF so that
the result looks like the familiar normalized chi-squared. These
scale factors will not affect the value of the minimum, though some
care will be required when interpreting the uncertainty.
"""
return 2 * self.nllf(pvec) / self.dof
[docs]
def summarize(self):
"""Return a table of current parameter values with range bars."""
return parameter.summarize(self._parameters)
@property
def parameters(self):
"""Return the list of fitted parameters."""
return self._parameters
[docs]
def labels(self) -> util.List[str]:
"""Return the list of labels, one per fitted parameter."""
return [p.name for p in self._parameters]
# noinspection PyAttributeOutsideInit
[docs]
def model_reset(self):
"""
Prepare for the fit.
This sets the parameters and the bounds properties that the
solver is expecting from the fittable object. We also compute
the degrees of freedom so that we can return a normalized fit
likelihood.
If the set of fit parameters changes, then model_reset must
be called.
"""
# print("In model reset with", self.model_parameters())
all_parameters = parameter.unique(self.model_parameters())
# Check priors
broken = []
for p in all_parameters:
if not hasattr(p, "reset_prior"):
continue
p.reset_prior()
limits = p.prior.limits
value = p.value
if (limits[0] > value) or (value > limits[1]):
broken.append(f"{p}={value} is outside {limits}")
elif not np.isfinite(p.prior.nllf(value)):
broken.append(f"{p}={value} is outside {p.prior}")
# Check other constraints.
broken.extend([f"{c} fails" for c in self.constraints if float(c) > 0])
# Show broken constraints. Note that warnings.warn() will only show up once,
# so this acts as a pre-check of the model when running in batch mode on the
# command line.
if len(broken) > 0:
warnings.warn("Unsatisfied constraints: [%s]" % (",\n".join(broken)))
# TODO: return broken_constraints from reset() rather than setting state
self.broken_constraints = broken
# Collect all fitting parameters
pars = []
pars_by_id = {}
for p in all_parameters:
if hasattr(p, "id"):
pars_by_id[p.id] = p
slot = getattr(p, "slot", None)
if isinstance(slot, Variable) and not p.fixed:
pars.append(p)
self._parameters = pars
self._parameters_by_id = pars_by_id
# Collect all priors that need to be evaluated
self._priors = priors(all_parameters)
# The degrees of freedom is the number of data points minus the number of fit
# parameters. Gaussian priors on the parameters are treated as a simultaneous
# fit to the data plus a fit of the parameter to its previously measured value.
# That is, each prior adds a single data point to the fit without changing the
# number of free parameters, thus increasing DOF by 1. Again, the target value
# for a fit with no systematic error should be the number of degrees of freedom.
# Uniform priors do not modify the degrees of freedom, not even soft-bounded uniform.
self._dof = self.model_points() + sum(p.prior.dof for p in self._priors) - len(self._parameters)
if self.dof <= 0:
warnings.warn(
f"Need more data points (currently: {self.model_points()}) than fitting parameters ({len(self._parameters)})"
)
[docs]
def model_points(self):
"""Return number of points in all models"""
return sum(f.numpoints() for f in self.models)
[docs]
def model_update(self):
"""Let all models know they need to be recalculated"""
# The self.models iterator calls update for each model if there are
# free variable substitutions, so no need to update here.
if self.freevars:
return
for f in self.models:
getattr(f, "update", lambda: None)()
[docs]
def model_nllf(self):
"""Return cost function for all data sets"""
# print("In model nllf with", self.getp())
return sum(w**2 * f.nllf() for w, f in zip(self.weights, self.models))
[docs]
def constraints_nllf(self) -> util.Tuple[float, util.List[str]]:
"""Return the cost function for all constraints"""
failing = []
nllf = 0.0
# Process the list of inequality constraints
for c in self.constraints:
# TODO: convert to list of residuals for Levenberg-Marquardt
c_nllf = float(c) ** 2
nllf += c_nllf
if c_nllf > 0:
failing.append(str(c))
return nllf, failing
[docs]
def simulate_data(self, noise=None):
"""Simulate data with added noise"""
for f in self.models:
f.simulate_data(noise=noise)
[docs]
def resynth_data(self):
"""Resynthesize data with noise from the uncertainty estimates."""
for f in self.models:
f.resynth_data()
[docs]
def restore_data(self):
"""Restore original data after resynthesis."""
for f in self.models:
f.restore_data()
@property
def has_residuals(self):
"""
True if all underlying fitness functions define residuals.
"""
return all(hasattr(f, "residuals") for f in self.models)
[docs]
def residuals(self):
resid = np.hstack([w * f.residuals() for w, f in zip(self.weights, self.models)])
return resid
[docs]
def save(self, basename):
for i, f in enumerate(self.models):
if hasattr(f, "save"):
f.save(basename + "-%d" % (i + 1))
[docs]
def show(self):
for i, f in enumerate(self.models):
print("-- Model %d %s" % (i, getattr(f, "name", "")))
fitness_show_parameters(f, subs=self.freevars.get_model(i))
print("[overall chisq=%s, nllf=%g]" % (self.chisq_str(), self.nllf()))
[docs]
def plot(self, p=None, fignum=1, figfile=None, view=None, model_indices=None):
# TODO: remove duplicate logic from FitProblem.plot() and api.get_data_plot
import matplotlib.pyplot as plt
if p is not None:
self.setp(p)
# Don't show the figure calling problem.plot(figfile=base_file), as is
# done during --export=path. If called as problem.plot(), as from a jupyter
# notebook, then swe should show the plot.
show_fig = not figfile
# Overall chisq
overall_chisq_str = self.chisq_str()
for i, f in enumerate(self.models):
outfile = f"{figfile}-model{i}.png" if figfile else None
if model_indices is not None and i not in model_indices:
continue
backend = "matplotlib" if hasattr(f, "plot") else "plotly" if hasattr(f, "plotly") else None
# backend = "plotly" if hasattr(f, "plotly") else "matplotlib" if hasattr(f, "plot") else None
if backend is None:
continue
# TODO: duplicated in bumps.webserver.server.api._get_data_plot_mpl()
if self.num_models > 1:
chisq_str = fitness_chisq_str(f)
chisq = f"χ² = {chisq_str}; overall {overall_chisq_str}"
title = f"Model {i+1}: {f.name}"
else:
chisq = f"χ² = {overall_chisq_str}"
title = f"{f.name}"
fontsize = 16
if backend == "plotly":
fig = f.plotly()
# TODO: text offset of (x=0.5em, y=0.5ex)
text_offset = 0.01 # portion of graph axis length
font = dict(size=16)
fig.add_annotation(
x=text_offset,
y=1 + text_offset,
xanchor="left",
yanchor="bottom",
xref="paper",
yref="paper",
text=title,
showarrow=False,
font=font,
)
fig.add_annotation(
x=1 - text_offset,
y=1 + text_offset,
xanchor="right",
yanchor="bottom",
xref="paper",
yref="paper",
text=chisq,
showarrow=False,
font=font,
)
if outfile:
# Note: requires "pip install kaleido"
# Note: much slower than matplotlib
fig.write_image(outfile)
if show_fig:
# Try to guess whether we are in a jupyter notebook before deciding how
# to render the plot.
# TODO: gather all figures into one tab when rendering to the browser
import sys
jupyter = "ipykernel" in sys.modules
renderer = None if jupyter else "browser"
fig.show(renderer)
continue
# If not plotly then we must be using matplotlib.
# Note that during api.export our matplotlib backend is 'agg' so no plot will show.
fig = plt.figure(i + fignum)
f.plot(view=view)
# Make room for model name and chisq on the top of the plot
# TODO: attach margins to canvas resize_event so that margins are fixed
h, w = fig.get_size_inches()
h_ex = h * 72 / fontsize # (h in * 72 pt/in) / (fontsize pt/ex) = height in ex
text_offset = 0.5 / h_ex # 1/2 ex above and below the text
top = 1 - 2 / h_ex # leave 2 ex at the top of the figure
plt.subplots_adjust(top=top)
# Add model name and chisq
transform = fig.transFigure
x, y = text_offset, 1 - text_offset
ha, va = "left", "top"
fig.text(x, y, title, transform=transform, va=va, ha=ha, fontsize=fontsize)
x, y = 1 - text_offset, 1 - text_offset
ha, va = "right", "top"
fig.text(x, y, chisq, transform=transform, va=va, ha=ha, fontsize=fontsize)
# plt.suptitle("Model %d - %s" % (i, f.name))
# plt.text(0.01, 0.01, "chisq=%s" % fitness_chisq_str(f), transform=plt.gca().transAxes)
if outfile:
plt.savefig(outfile, format="png")
# Note: restore default behaviour of getstate/setstate rather than
# inheriting from BaseFitProblem
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.__dict__ = state
# TODO: consider adding nllf_scale to FitProblem.
ONE_SIGMA = 0.68268949213708585
def nllf_scale(dof: int, npars: int, norm: bool = True):
r"""
Return the scale factor for reporting the problem nllf as an approximate
normalized chisq, along with an associated "uncertainty". The uncertainty
is the amount that chisq must change in order for the fit to be
significantly better.
Parameters:
-----------
dof : int
Degrees of freedom (typically n - p where n is data points, p is parameters)
npars : int
Number of fitting parameters
norm : bool, optional
If True (default), normalize chisq by degrees of freedom
From Numerical Recipes 15.6: *Confidence Limits on Estimated Model
Parameters*, the $1-\sigma$ contour in parameter space corresponds
to $\Delta\chi^2 = \text{invCDF}(1-\sigma,k)$ where
$1-\sigma \approx 0.6827$ and $k$ is the number of fitting parameters.
If *norm* is True (default), then we need to normalize chisq by
the degrees of freedom. This allows us to assess fit quality as the
average squared error in each data point, which should be around 1.0
if the model and measurement uncertainties are correct.
"""
scale = dof if norm else 1
if scale <= 0 or np.isnan(scale) or np.isinf(scale):
return 1.0, 0.0
else:
npars = max(npars, 1)
return 2.0 / scale, chi2.ppf(ONE_SIGMA, npars) / scale
[docs]
def load_problem(path: Path | str, args: list[str] | None = None):
"""
Load a model file.
*path* contains the path to the model file. This could be a python script
or a previously saved problem, serialized as .json, .cloudpickle, .pickle or .dill
*args* are any additional arguments to the model. The sys.argv
variable will be set such that *sys.argv[1:] == model_options*.
"""
from .webview.server.state_hdf5_backed import SERIALIZER_EXTENSIONS, deserialize_problem_bytes
path = Path(path)
table = {f".{ext}": method for method, ext in SERIALIZER_EXTENSIONS.items()}
method = table.get(path.suffix, "script")
if method == "script":
problem = _load_script_from_path(path, args)
else:
# export saved data as binary with encoding utf-8
data = path.read_bytes()
problem = deserialize_problem_bytes(data, method)
# TODO: what is problem.path when we are deserializing from a session file?
problem.path = str(path.resolve())
if not getattr(problem, "name", None):
problem.name = path.stem
if not getattr(problem, "title", None):
problem.title = path.name
# Guard against the user changing parameters after defining the problem.
problem.model_reset()
return problem
def _load_script_from_path(path: Path | str, args: list[str] | None = None):
from .util import pushdir
from . import plugin
# Change to the target path before loading model so that data files
# can be given as relative paths in the model file. Add the directory
# to the python path (at the end) so that imports work as expected.
path = Path(path)
directory, filename = path.parent, path.name
with pushdir(directory):
# Try a specialized model loader
problem = plugin.load_model(filename)
if problem is None:
problem = _load_script(filename, options=args)
# Note: keeping problem.script_path separate from problem.path because
# the problem.path may be the result of deserializing the model.
problem.script_path = str(path.resolve())
problem.script_args = args
return problem
def _load_script(filename, options=None) -> FitProblem:
"""
Load a problem definition from a python script file.
sys.argv is set to ``[file] + options`` within the context of the script.
The user must define ``problem=FitProblem(...)`` within the script.
Raises ValueError if the script does not define problem.
Namespace for imports is `bumps.user`
"""
import re
from pathlib import Path
from hashlib import md5
from importlib.machinery import SourceFileLoader
from importlib.util import module_from_spec, spec_from_loader
script_path = Path(filename).resolve()
if options is None:
options = ()
# Turn filename into a python identifier
name = script_path.stem.split(".", 1)[0]
name = "_".join(name.split()) # convert whitespace
name = re.sub(r"[^a-zA-Z0-9_]", "_", name) # handle invalid characters
if name[0].isdigit(): # idnetifiers can't start with a digit
name = "_" + name
# Put all user scripts in the bumps.user namespace.
# They shouldn't conflict because they don't show up
# in sys.modules.
package = "bumps.user"
# Create the module for the script
fullname = f"{package}.{name}"
loader = SourceFileLoader(fullname, str(script_path))
spec = spec_from_loader(fullname, loader)
module = module_from_spec(spec)
# Execute the script
# TODO: Enable relative imports with ScriptFinder
old_argv = sys.argv
old_bytecode = sys.dont_write_bytecode
# meta_path_finder = ScriptFinder(script_path.parent, package)
try:
# sys.meta_path.insert(0, meta_path_finder)
sys.argv = [filename, *options]
sys.dont_write_bytecode = True # Suppress .pyc creation
loader.exec_module(module)
finally:
# sys.meta_path.pop(0)
sys.argv = old_argv
sys.dont_write_bytecode = old_bytecode
problem = getattr(module, "problem", None)
if problem is None:
raise ValueError(filename + " requires 'problem = FitProblem(...)'")
# # Capture the source code and any dependent libraries. On deserialize we
# # will need to preload the libs and stuff them in sys.modules before
# # calling dill. Be sure to remove them immediately so that the next load
# # from script will get the latest version (the user may have changed the
# # support libraries without having changed the script).
# # Note that we won't be able to rerun the script because we aren't capturing
# # the original datafiles, but we may want to show it to the user.
# script = script_path.read_text()
# context = dict(script=script, libs=meta_path_finder.sources, options=options)
return problem
# Note: Not currently used
class ScriptFinder:
"""
sys.meta_path finder allowing relative imports in scripts.
*script_dir* is the parent directory for the script file.
*package_name* is the module namespace for the script.
*sources* contains {fullname: source_code} for all the supporting
modules in the script directory. Use these to deserialize the
saved problem.
"""
def __init__(self, script_dir, package_name):
from importlib.machinery import ModuleSpec
self._path = script_dir
self._package = package_name + "."
self._parent_spec = ModuleSpec(package_name, None, is_package=True)
self.sources = {}
def find_spec(self, fullname, path, target=None):
from importlib.machinery import SourceFileLoader
from importlib.util import spec_from_loader
if fullname == self._package[:-1]:
# print(f'import looking for {fullname}')
return self._parent_spec
if fullname.startswith(self._package):
# print(f'import looking for {fullname}')
module_name = fullname.split(".")[-1]
module_path = self._path / f"{module_name}.py"
if module_path.exists():
loader = SourceFileLoader(fullname, str(module_path))
spec = spec_from_loader(fullname, loader)
# It is inefficient to load this twice, but the import
# hook machinery is too complicated to capture the source
# when it is loaded. Similarly for suppressing the .pyc
# file creation.
self.sources[fullname] = module_path.read_text()
return spec
return None
def MultiFitProblem(*args, **kwargs) -> FitProblem:
warnings.warn(DeprecationWarning("use FitProblem directly instead of MultiFitProblem"))
return FitProblem(*args, **kwargs)
def test_weighting_and_priors():
class SimpleFitness(Fitness):
def __init__(self, a=0.0, name="fit"):
self.a = parameter.Parameter.default(a, name=name + " a")
def parameters(self):
return {"a": self.a}
def numpoints(self):
return 1
def residuals(self):
y, dy = 0, 1 # fit 0 +/- 1 to a constant
return np.array([(self.a.value - y) / dy])
def nllf(self):
return sum(r**2 for r in self.residuals()) / 2
weights = 2, 3
M0, M1 = SimpleFitness(4.0, name="M0"), SimpleFitness(5.0, name="M1")
problem = FitProblem((M0, M1), weights=weights)
# Need to use problem.models to cycle through models in case FreeVariables is used in problem
assert (problem.residuals() == np.hstack([w * M.residuals() for w, M in zip(weights, problem.models)])).all()
assert problem.nllf() == sum(w**2 * M.nllf() for w, M in zip(weights, problem.models))
assert problem.nllf() == sum(problem.residuals() ** 2) / 2
# Test priors: constraint on expression
M0.a.range(-10, 10) # Set M0.a = 4 to be in bounds
# TODO: should setting equals() clear the constraints?
# If we set a constraint then assign an expression the constraint stays on
# the expression. The other order fails because the expression is considered
# "fixed" and can't accept constraints.
M1.a.range(0, 1) # Set M1.a to be out of bounds
M1.a.equals(M0.a * 2) # Set M1.a = 2*M0.a = 4*2 = 8
problem.model_reset()
# print(f"{problem._parameters=}, {problem._priors=} {problem._dof=}")
assert problem._parameters == [M0.a] # only M0.a is fitted
assert problem._priors == [M0.a, M1.a] # both M0.a and M1.a are bounded
assert problem._dof == 1
nllf, failing = problem.parameter_nllf()
assert np.isinf(nllf)
assert failing == [str(M1.a)]
M1.a.unlink()
M1.a.dev(mean=0, std=1) # Set M1.a to be in bounds but with a cost
M1.a.equals(M0.a * 2) # Set M1.a = 2*M0.a = 4*2 = 8
problem.model_reset()
# print(f"{problem._parameters=}, {problem._priors=} {problem._dof=}")
assert problem._parameters == [M0.a] # only M0.a is fitted
assert problem._priors == [M0.a, M1.a] # both M0.a and M1.a are bounded
# DOF is 2 because we have two data points plus one prior minus one fit parameter
assert problem._dof == 2
nllf, failing = problem.parameter_nllf()
assert nllf == 32 # (8 - 0)**2/(2 * 1)
assert failing == []
if __name__ == "__main__":
test_weighting_and_priors()