#!/usr/bin/python
# -*- coding: utf-8 -*-
##
# abstract_model.py: Abstract interfaces for models with different levels of
# functionality.
##
# © 2012 Chris Ferrie (csferrie@gmail.com) and
# Christopher E. Granade (cgranade@gmail.com)
#
# This file is a part of the Qinfer project.
# Licensed under the AGPL version 3.
##
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##
## FEATURES ##################################################################
from __future__ import absolute_import
from __future__ import division, unicode_literals
## EXPORTS ###################################################################
__all__ = [
'Simulatable',
'Model',
'FiniteOutcomeModel',
'DifferentiableModel'
]
## IMPORTS ###################################################################
from builtins import range, map
from future.utils import with_metaclass
import abc
# Python standard library package for specifying abstract classes.
import numpy as np
import warnings
from qinfer.utils import safe_shape
from qinfer.domains import IntegerDomain
## CLASSES ###################################################################
[docs]class Simulatable(with_metaclass(abc.ABCMeta, object)):
r"""
Represents a system which can be simulated according to
various model parameters and experimental control parameters
in order to produce representative data.
See :ref:`models_guide` for more details.
:param bool allow_identical_outcomes: Whether the method ``outcomes`` should
be allowed to return multiple identical outcomes for a given ``expparam``.
It will be more efficient to set this to ``True`` whenever it is likely
that multiple identical outcomes will occur.
"""
def __init__(self):
"""
:param bool always_resample_outcomes: Resample outcomes stochastically with
each outcome call.
:param :class:`~numpy.ndarray` initial_outcomes: Initial set of outcomes
that may be supplied. Otherwise initial outcomes default to
zeros.
"""
self._sim_count = 0
# Initialize a default scale matrix.
self._Q = np.ones((self.n_modelparams,))
## ABSTRACT PROPERTIES ##
@abc.abstractproperty
def n_modelparams(self):
"""
Returns the number of real model parameters admitted by this model.
This property is assumed by inference engines to be constant for
the lifetime of a :class:`Model` instance.
"""
pass
@abc.abstractproperty
def expparams_dtype(self):
"""
Returns the dtype of an experiment parameter array. For a
model with single-parameter control, this will likely be a scalar dtype,
such as ``"float64"``. More generally, this can be an example of a
record type, such as ``[('time', py.'float64'), ('axis', 'uint8')]``.
This property is assumed by inference engines to be constant for
the lifetime of a Model instance.
"""
pass
## CONCRETE PROPERTIES ##
@property
def is_n_outcomes_constant(self):
"""
Returns ``True`` if and only if both the domain and ``n_outcomes``
are independent of the expparam.
This property is assumed by inference engines to be constant for
the lifetime of a Model instance.
"""
return True
@property
def model_chain(self):
"""
Returns a tuple of models upon which this model is based,
such that properties and methods of underlying models for
models that decorate other models can be accessed. For a
standalone model, this is always the empty tuple.
"""
return ()
@property
def base_model(self):
"""
Returns the most basic model that this model depends on.
For standalone models, this property satisfies ``model.base_model is model``.
"""
return self
@property
def underlying_model(self):
"""
Returns the model that this model is based on (decorates)
if such a model exists, or ``None`` if this model is
independent.
"""
return self.model_chain[-1] if self.model_chain else None
@property
def sim_count(self):
"""
Returns the number of data samples that have been produced by
this simulator.
:rtype: int
"""
return self._sim_count
@property
def Q(self):
r"""
Returns the diagonal of the scale matrix :math:`\matr{Q}` that
relates the scales of each of the model parameters. In particular,
the quadratic loss for this Model is defined as:
.. math::
L_{\matr{Q}}(\vec{x}, \hat{\vec{x}}) =
(\vec{x} - \hat{\vec{x}})^\T \matr{Q} (\vec{x} - \hat{\vec{x}})
If a subclass does not explicitly define the scale matrix, it is taken
to be the identity matrix of appropriate dimension.
:return: The diagonal elements of :math:`\matr{Q}`.
:rtype: :class:`~numpy.ndarray` of shape ``(n_modelparams, )``.
"""
return self._Q
@property
def modelparam_names(self):
"""
Returns the names of the various model parameters admitted by this
model, formatted as LaTeX strings.
"""
return list(map("x_{{{}}}".format, range(self.n_modelparams)))
## CONCRETE METHODS ##
def _repr_html_(self, suppress_base=False):
s = r"""
<strong>{type.__name__}</strong> at 0x{id:0x}: {n_mp} model parameter{plural}
""".format(
id=id(self), type=type(self),
n_mp=self.n_modelparams,
plural="" if self.n_modelparams == 1 else "s"
)
if not suppress_base and self.model_chain:
s += r"""<br>
<p>Model chain:</p>
<ul>{}
</ul>
""".format(r"\n".join(
u"<li>{}</li>".format(model._repr_html_(suppress_base=True))
for model in reversed(self.model_chain)
))
return s
[docs] def are_expparam_dtypes_consistent(self, expparams):
"""
Returns ``True`` iff all of the given expparams
correspond to outcome domains with the same dtype.
For efficiency, concrete subclasses should override this method
if the result is always ``True``.
:param np.ndarray expparams: Array of expparamms
of type ``expparams_dtype``
:rtype: ``bool``
"""
if self.is_n_outcomes_constant:
# This implies that all domains are equal, so this must be true
return True
# otherwise we have to actually check all the dtypes
if expparams.size > 0:
domains = self.domain(expparams)
first_dtype = domains[0].dtype
return all(domain.dtype == first_dtype for domain in domains[1:])
else:
return True
## ABSTRACT METHODS ##
@abc.abstractmethod
[docs] def n_outcomes(self, expparams):
"""
Returns an array of dtype ``uint`` describing the number of outcomes
for each experiment specified by ``expparams``.
If the number of outcomes does not depend on expparams (i.e.
``is_n_outcomes_constant`` is ``True``), this method
should return a single number.
If there are an infinite (or intractibly large) number of outcomes,
this value specifies the number of outcomes to randomly sample.
:param numpy.ndarray expparams: Array of experimental parameters. This
array must be of dtype agreeing with the ``expparams_dtype``
property.
"""
pass
@abc.abstractmethod
[docs] def domain(self, exparams):
"""
Returns a list of :class:`Domain` objects, one for each input expparam.
:param numpy.ndarray expparams: Array of experimental parameters. This
array must be of dtype agreeing with the ``expparams_dtype``
property, or, in the case where ``n_outcomes_constant`` is ``True``,
``None`` should be a valid input.
:rtype: list of :class:`Domain`
"""
pass
@abc.abstractmethod
[docs] def are_models_valid(self, modelparams):
"""
Given a shape ``(n_models, n_modelparams)`` array of model parameters,
returns a boolean array of shape ``(n_models)`` specifying whether
each set of model parameters represents is valid under this model.
"""
pass
@abc.abstractmethod
[docs] def simulate_experiment(self, modelparams, expparams, repeat=1):
"""
Produces data according to the given model parameters and experimental
parameters, structured as a NumPy array.
:param np.ndarray modelparams: A shape ``(n_models, n_modelparams)``
array of model parameter vectors describing the hypotheses under
which data should be simulated.
:param np.ndarray expparams: A shape ``(n_experiments, )`` array of
experimental control settings, with ``dtype`` given by
:attr:`~qinfer.Model.expparams_dtype`, describing the
experiments whose outcomes should be simulated.
:param int repeat: How many times the specified experiment should
be repeated.
:rtype: np.ndarray
:return: A three-index tensor ``data[i, j, k]``, where ``i`` is the repetition,
``j`` indexes which vector of model parameters was used, and where
``k`` indexes which experimental parameters where used. If ``repeat == 1``,
``len(modelparams) == 1`` and ``len(expparams) == 1``, then a scalar
datum is returned instead.
"""
self._sim_count += modelparams.shape[0] * expparams.shape[0] * repeat
assert(self.are_expparam_dtypes_consistent(expparams))
## CONCRETE METHODS ##
[docs] def clear_cache(self):
"""
Tells the model to clear any internal caches used in computing
likelihoods and drawing samples. Calling this method should not cause
any different results, but should only affect performance.
"""
# By default, no cache to clear.
pass
[docs] def experiment_cost(self, expparams):
"""
Given an array of experimental parameters, returns the cost associated
with performing each experiment. By default, this cost is constant
(one) for every experiment.
:param expparams: An array of experimental parameters for which the cost
is to be evaluated.
:type expparams: :class:`~numpy.ndarray` of ``dtype`` given by
:attr:`~Model.expparams_dtype`
:return: An array of costs corresponding to the specified experiments.
:rtype: :class:`~numpy.ndarray` of ``dtype`` ``float`` and of the
same shape as ``expparams``.
"""
return np.ones(expparams.shape)
[docs] def distance(self, a, b):
r"""
Gives the distance between two model parameter vectors :math:`\vec{a}` and
:math:`\vec{b}`. By default, this is the vector 1-norm of the difference
:math:`\mathbf{Q} (\vec{a} - \vec{b})` rescaled by
:attr:`~Model.Q`.
:param np.ndarray a: Array of model parameter vectors having shape
``(n_models, n_modelparams)``.
:param np.ndarray b: Array of model parameters to compare to, having
the same shape as ``a``.
:return: An array ``d`` of distances ``d[i]`` between ``a[i, :]`` and
``b[i, :]``.
"""
return np.apply_along_axis(
lambda vec: np.linalg.norm(vec, 1),
1,
self.Q * (a - b)
)
[docs] def update_timestep(self, modelparams, expparams):
r"""
Returns a set of model parameter vectors that is the update of an
input set of model parameter vectors, such that the new models are
conditioned on a particular experiment having been performed.
By default, this is the trivial function
:math:`\vec{x}(t_{k+1}) = \vec{x}(t_k)`.
:param np.ndarray modelparams: Set of model parameter vectors to be
updated.
:param np.ndarray expparams: An experiment parameter array describing
the experiment that was just performed.
:return np.ndarray: Array of shape
``(n_models, n_modelparams, n_experiments)`` describing the update
of each model according to each experiment.
"""
return np.tile(modelparams, (expparams.shape[0],1,1)).transpose((1,2,0))
[docs] def canonicalize(self, modelparams):
r"""
Returns a canonical set of model parameters corresponding to a given
possibly non-canonical set. This is used for models in which there
exist model parameters :math:`\vec{x}_i` and :\math:`\vec{x}_j` such
that
.. math::
\Pr(d | \vec{x}_i; \vec{e}) = \Pr(d | \vec{x}_j; \vec{e})
for all outcomes :math:`d` and experiments :math:`\vec{e}`. For
models admitting such an ambiguity, this
method should then be overridden to return a consistent choice
out of such vectors, hence avoiding supurious model degeneracies.
Note that, by default, :class:`~qinfer.smc.SMCUpdater` will *not*
call this method.
"""
return modelparams
[docs]class Model(Simulatable):
"""
Represents a system which can be simulated according to
various model parameters and experimental control parameters
in order to produce the probability of a hypothetical data
record. As opposed to :class:`~qinfer.Simulatable`, instances
of :class:`~qinfer.Model` not only produce data consistent
with the description of a system, but also evaluate the probability
of that data arising from the system.
:param bool allow_identical_outcomes: Whether the method
``representative_outcomes`` should be allowed to return multiple
identical outcomes for a given ``expparam``.
:param float outcome_warning_threshold: Threshold value below which
``representative_outcomes``
will issue a warning about the representative outcomes
not adequately covering the domain with respect to
the relevant distribution.
See :ref:`models_guide` for more details.
"""
## INITIALIZERS ##
def __init__(self, allow_identical_outcomes=False,
outcome_warning_threshold=0.99):
super(Model, self).__init__()
self._call_count = 0
self._allow_identical_outcomes = allow_identical_outcomes
self._outcome_warning_threshold = outcome_warning_threshold
## CONCRETE PROPERTIES ##
@property
def call_count(self):
"""
Returns the number of points at which
the probability of this model has been evaluated,
where a point consists of a hypothesis about the model (a vector of
model parameters), an experimental control setting (expparams) and
a hypothetical or actual datum.
:rtype: int
"""
return self._call_count
## ABSTRACT METHODS ##
@abc.abstractmethod
[docs] def likelihood(self, outcomes, modelparams, expparams):
r"""
Calculates the probability of each given outcome, conditioned on each
given model parameter vector and each given experimental control setting.
:param np.ndarray modelparams: A shape ``(n_models, n_modelparams)``
array of model parameter vectors describing the hypotheses for
which the likelihood function is to be calculated.
:param np.ndarray expparams: A shape ``(n_experiments, )`` array of
experimental control settings, with ``dtype`` given by
:attr:`~qinfer.Simulatable.expparams_dtype`, describing the
experiments from which the given outcomes were drawn.
:rtype: np.ndarray
:return: A three-index tensor ``L[i, j, k]``, where ``i`` is the outcome
being considered, ``j`` indexes which vector of model parameters was used,
and where ``k`` indexes which experimental parameters where used.
Each element ``L[i, j, k]`` then corresponds to the likelihood
:math:`\Pr(d_i | \vec{x}_j; e_k)`.
"""
# Count the number of times the inner-most loop is called.
self._call_count += (
safe_shape(outcomes) * safe_shape(modelparams) * safe_shape(expparams)
)
@property
def allow_identical_outcomes(self):
"""
Whether the method ``representative_outcomes`` should be allowed to return multiple
identical outcomes for a given ``expparam``.
It will be more efficient to set this to ``True`` whenever it is likely
that multiple identical outcomes will occur.
:return: Flag state.
:rtype: ``bool``
"""
return self._allow_identical_outcomes
@allow_identical_outcomes.setter
def allow_identical_outcomes(self, value):
"""
Whether the method ``representative_outcomes`` should be allowed to return multiple
identical outcomes for a given ``expparam``.
It will be more efficient to set this to ``True`` whenever it is likely
that multiple identical outcomes will occur.
:param bool allow_identical_outcomes: Value of flag.
"""
self._allow_identical_outcomes = value
@property
def outcome_warning_threshold(self):
"""
Threshold value below which ``representative_outcomes``
will issue a warning about the representative outcomes
not adequately covering the domain with respect to
the relevant distribution.
:return: Threshold value.
:rtype: ``float``
"""
return self._outcome_warning_threshold
@outcome_warning_threshold.setter
def outcome_warning_threshold(self, value):
"""
Threshold value below which ``representative_outcomes``
will issue a warning about the representative outcomes
not adequately covering the domain with respect to
the relevant distribution.
:param float value: Threshold value.
"""
self._outcome_warning_threshold = value
## CONCRETE METHODS ##
# These methods depend on the abstract methods, and thus their behaviors
# change in each inheriting class.
[docs] def is_model_valid(self, modelparams):
"""
Returns True if and only if the model parameters given are valid for
this model.
"""
return self.are_models_valid(modelparams[np.newaxis, :])[0]
class LinearCostModelMixin(Model):
# FIXME: move this mixin to a new module.
# TODO: test this mixin.
"""
This mixin implements :meth:`Model.experiment_cost` by setting the
cost of an experiment equal to the value of a given field of each
``expparams`` element (by default, ``t``).
"""
_field = "t"
def experiment_cost(self, expparams):
return expparams[self._field]
[docs]class FiniteOutcomeModel(Model):
"""
Represents a system in the same way that :class:`~qinfer.Model`,
except that it is demanded that the number of outcomes for any
experiment be known and finite.
:param bool allow_identical_outcomes: Whether the method
``representative_outcomes`` should be allowed to return multiple
identical outcomes for a given ``expparam``.
:param float outcome_warning_threshold: Threshold value below which
``representative_outcomes``
will issue a warning about the representative outcomes
not adequately covering the domain with respect to
the relevant distribution.
:param int n_outcomes_cutoff: If ``n_outcomes`` exceeds this value,
``representative_outcomes`` will use this
value in its place. This is useful in the case
of a finite yet untractible number of outcomes. Use ``None``
for no cutoff.
See :class:`~qinfer.Model` and :ref:`models_guide` for more details.
"""
## INITIALIZERS ##
def __init__(self,
allow_identical_outcomes=False,
outcome_warning_threshold=0.99,
n_outcomes_cutoff=None
):
super(FiniteOutcomeModel, self).__init__(
outcome_warning_threshold=outcome_warning_threshold,
allow_identical_outcomes=allow_identical_outcomes)
self._n_outcomes_cutoff = n_outcomes_cutoff
if self.is_n_outcomes_constant:
# predefine if we can
self._domain = IntegerDomain(min=0,max=self.n_outcomes(None)-1)
## CONCRETE PROPERTIES ##
@property
def n_outcomes_cutoff(self):
"""
If ``n_outcomes`` exceeds this value for
some expparm, ``representative_outcomes`` will use this
value in its place. This is useful in the case
of a finite yet untractible number of outcomes.
:return: Cutoff value.
:rtype: ``int``
"""
return self._n_outcomes_cutoff
@n_outcomes_cutoff.setter
def n_outcomes_cutoff(self, value):
"""
If ``n_outcomes`` exceeds this value,
``representative_outcomes`` will use this
value in its place. This is useful in the case
of a finite yet untractible number of outcomes.
:param int value: Cutoff value.
"""
self.n_outcomes_cutoff = value
## ABSTRACT METHODS ##
## CONCRETE METHODS ##
# These methods depend on the abstract methods, and thus their behaviors
# change in each inheriting class.
[docs] def domain(self, expparams):
"""
Returns a list of :class:`Domain` objects, one for each input expparam.
:param numpy.ndarray expparams: Array of experimental parameters. This
array must be of dtype agreeing with the ``expparams_dtype``
property, or, in the case where ``n_outcomes_constant`` is ``True``,
``None`` should be a valid input.
:rtype: list of ``Domain``
"""
# As a convenience to most users, we define domain for them. If a
# fancier domain is desired, this method can easily be overridden.
if self.is_n_outcomes_constant:
return self._domain if expparams is None else [self._domain for ep in expparams]
else:
return [IntegerDomain(min=0,max=n_o-1) for n_o in self.n_outcomes(expparams)]
[docs] def simulate_experiment(self, modelparams, expparams, repeat=1):
# Call the superclass simulate_experiment, not recording the result.
# This is used to count simulation calls.
super(FiniteOutcomeModel, self).simulate_experiment(modelparams, expparams, repeat)
if self.is_n_outcomes_constant:
# In this case, all expparams have the same domain
all_outcomes = self.domain(None).values
probabilities = self.likelihood(all_outcomes, modelparams, expparams)
cdf = np.cumsum(probabilities, axis=0)
randnum = np.random.random((repeat, 1, modelparams.shape[0], expparams.shape[0]))
outcome_idxs = all_outcomes[np.argmax(cdf > randnum, axis=1)]
outcomes = all_outcomes[outcome_idxs]
else:
# Loop over each experiment, sadly.
# Assume all domains have the same dtype
assert(self.are_expparam_dtypes_consistent(expparams))
dtype = self.domain(expparams[0, np.newaxis])[0].dtype
outcomes = np.empty((repeat, modelparams.shape[0], expparams.shape[0]), dtype=dtype)
for idx_experiment, single_expparams in enumerate(expparams[:, np.newaxis]):
all_outcomes = self.domain(single_expparams).values
probabilities = self.likelihood(all_outcomes, modelparams, single_expparams)
cdf = np.cumsum(probabilities, axis=0)[..., 0]
randnum = np.random.random((repeat, 1, modelparams.shape[0]))
outcomes[:, :, idx_experiment] = all_outcomes[np.argmax(cdf > randnum, axis=1)]
return outcomes[0, 0, 0] if repeat == 1 and expparams.shape[0] == 1 and modelparams.shape[0] == 1 else outcomes
## STATIC METHODS ##
# These methods are provided as a convienence to make it easier to write
# simple models.
@staticmethod
[docs] def pr0_to_likelihood_array(outcomes, pr0):
"""
Assuming a two-outcome measurement with probabilities given by the
array ``pr0``, returns an array of the form expected to be returned by
``likelihood`` method.
:param numpy.ndarray outcomes: Array of integers indexing outcomes.
:param numpy.ndarray pr0: Array of shape ``(n_models, n_experiments)``
describing the probability of obtaining outcome ``0`` from each
set of model parameters and experiment parameters.
"""
pr0 = pr0[np.newaxis, ...]
pr1 = 1 - pr0
if len(np.shape(outcomes)) == 0:
outcomes = np.array(outcomes)[None]
return np.concatenate([
pr0 if outcomes[idx] == 0 else pr1
for idx in range(safe_shape(outcomes))
])
[docs]class DifferentiableModel(with_metaclass(abc.ABCMeta, Model)):
@abc.abstractmethod
[docs] def score(self, outcomes, modelparams, expparams, return_L=False):
r"""
Returns the score of this likelihood function, defined as:
.. math::
q(d, \vec{x}; \vec{e}) = \vec{\nabla}_{\vec{x}} \log \Pr(d | \vec{x}; \vec{e}).
Calls are represented as a four-index tensor
``score[idx_modelparam, idx_outcome, idx_model, idx_experiment]``.
The left-most index may be suppressed for single-parameter models.
If return_L is True, both `q` and the likelihood `L` are returned as `q, L`.
"""
pass