from abc import ABCMeta, abstractmethod
import math
import sys
import types
from sympy.core.cache import clear_cache as sympy_clear_cache
import numpy as np
from parampy import Parameters
from .operator import Operator
from .stateoperator import StateOperator
[docs]class Integrator(object):
'''
`Integrator` instances perform a numerical integration on arbitrary initial states
using `StateOperator` objects to describe the instantaneous derivative. It is itself
an abstract class, which must be subclassed. This allows the separation of logic from
actual integration machinery.
:param identifier: An object to identify this integrator from others. Can be left unspecified.
:type identifier: object
:param initial: A sequence of states/ensembles to use as initial states in the integration.
:type initial: list/tuple of numpy.arrays
:param t_offset: Normally integration starts from t=0. Use this to specify a time offset. Can
be any value understood by a `Parameters` instance.
:type t_offset: object
:param parameters: A `Parameters` instance for `Integrator` to use.
:type parameters: Parameters or None
:param params: Parameter overrides to use during when evaluating `StateOperator` objects.
:type params: dict
:param error_rel: The maximum relative error allowable.
:type error_rel: float
:param error_abs: The maximum absolute error allowable.
:type error_abs: float
:param time_ops: A dictionary of `StateOperator` objects to be applied at the time indicated
by their index (which can be any object understood by `Parameters`).
:type time_ops: dict
:param progress: `True` if progress should be shown using the fallback callback, `False` if not, or
and `IntegratorCallback` instance. This is used to report integrator progress.
:type progress: bool or IntegratorCallback
:param kwargs: Additional keyword arguments to pass to the `_integrator` method.
Subclassing Integrator:
Subclasses of `Integrator` must implement the following methods:
- _integrator(self, f, **kwargs)
- _integrate(self, integrator, initial, times=None, **kwargs)
- _derivative(self, t, y, dim)
- _state_internal2ode(self, state)
- _state_ode2internal(self, state, dimensions)
The documentation for these methods is available using:
>>> help(Integrator.<name of function>)
Their documentation will not appear in a complete API listing because
they are private methods.
'''
__metaclass__ = ABCMeta
def __init__(self,
identifier=None,
initial=None,
t_offset=0,
operators=None,
parameters=None,
params={},
error_rel=1e-8,
error_abs=1e-8,
time_ops={},
progress=False,
**kwargs):
self.identifier = identifier
# Set up initial conditions
self.t_offset = t_offset
self.initial = initial
# Parameter Object
self.p = parameters
# Set solver and tolerances
self.error_abs = error_abs
self.error_rel = error_rel
# Set up integration operator
self.operators = operators
self.params = params
# Set up intermediate pulse operators
self.time_ops = time_ops
# Set up results cache
self.results = None
self.progress_callback = progress
self.int_kwargs = kwargs
@property
def p(self):
'''
A reference to the `Parameters` instance used by this object. This reference can be updated
using:
>>> integrator.p = <Parameters instance>
'''
if self.__p is None:
raise ValueError("Parameters instance required by Integrator object, but a Parameters object has not been configured.")
return self.__p
@p.setter
def p(self, parameters):
if parameters is not None and not isinstance(parameters, Parameters):
raise ValueError("Parameters reference must be an instance of Parameters or None.")
self.__p = parameters
########### CONFIGURATION ##############################################
@property
def initial(self):
'''
The initial state. Ignored in `Integrator.extend`. Can be set using:
>>> integrator.initial = <list of states>
Each state will be converted to a complex numpy array if it is not already.
'''
return self.__initial
@initial.setter
def initial(self, initial):
self.__initial = map(lambda state: np.array(state if not isinstance(state,Operator) else state(t=self.t_offset), dtype='complex'),initial)
if len(initial) > 0:
shapes = map(lambda x: x.shape, self.__initial)
if not all([shape == shapes[0] for shape in shapes]):
raise ValueError("All y_0s must have the same dimension.")
@property
def t_offset(self):
'''
The initial time to use in the integration. Ignored in `Integrator.extend`.
Can be set using:
>>> integrator.t_offset = <time>
'''
return self.__t_offset
@t_offset.setter
def t_offset(self, t_offset):
self.__t_offset = t_offset
@property
def error_rel(self):
'''
The maximum relative error permitted in the integrator. This can be set using:
>>> integrator.error_rel = <float>
'''
return self.__error_rel
@error_rel.setter
def error_rel(self, error_rel):
self.__error_rel = error_rel
@property
def error_abs(self):
'''
The maximum absolute error permitted in the integrator. This can be set using:
>>> integrator.error_abs = <float>
'''
return self.__error_abs
@error_abs.setter
def error_abs(self, error_abs):
self.__error_abs = error_abs
@property
def results(self):
'''
The currently stored results. Used to continue integration in `Integrator.extend`. While
it is possible to overwrite the results, the new value is not checked, and care should be taken.
>>> integrator.results = <valid results object>
'''
return self.__results
@results.setter
def results(self, results):
self.__results = results
[docs] def reset(self):
'''
This method resets the internal stored `Integrator.results` to `None`, effectively resetting
the `Integrator` object to its pre-integration status.
>>> integrator.reset()
'''
self.results = None
@property
def time_ops(self):
'''
A reference to the dictionary of time operators. Can be updated directly by adding to the dictionary, or
(much more safely) using:
>>> self.time_ops = {'T': <StateOperator>, ...}
The above is shorthand for:
>>> for time, time_op in {'T': <StateOperator>, ...}:
self.add_time_op(time, time_op)
'''
return self.__time_ops
@time_ops.setter
def time_ops(self, time_ops):
self.__time_ops = {}
for time, time_op in time_ops.items():
self.add_time_op(time, time_op)
[docs] def add_time_op(self, time, time_op):
'''
This method adds a time operator `time_op` at time `time`. Note that there can only be
one time operator for any given time. A "time operator" is just a `StateOperator` that
will be applied at a particular time. Useful in constructing ideal pulse sequences.
:param time: Time can be either a float or object interpretable by a `Parameters` instance.
:type time: object
:param time_op: The `StateOperator` instance to be applied at time `time`.
:type time_op: StateOperator
'''
if not isinstance(time_op, StateOperator):
raise ValueError("Time operator must be an instance of State Operator.")
self.__time_ops[time] = time_op
[docs] def get_time_ops(self, indices=None):
'''
This method returns the "time operators" of `Integrator.time_ops` restricted to
the indicies specified (using `StateOperator.restrict`).
This is used internally by `Integrator` to optimise the integration
process (by restricting integration to the indices which could possibly
affect the state).
:param indicies: A sequence of basis indices.
:type indicies: interable of int
'''
time_ops = {}
for time, op in self.time_ops.items():
time = float(self.p('t', t=time)) # ,**self.get_op_params()
if time in time_ops:
raise ValueError("Timed operators clash. Consider merging them.")
if indices is None:
time_ops[time] = op.collapse('t') # ,**self.get_op_params()
else:
time_ops[time] = op.restrict(*indices).collapse('t') # ,**self.get_op_params()
return time_ops
@property
def operators(self):
'''
A reference to the list of operators (each of which is a `StateOperator`) used internally.
To add operators you can directly add to this list, or use (much safer):
>>> integrator.operators = [<StateOperator>, <StateOperator>, ...]
Or, alternatively:
>>> integrator.add_operator( <StateOperator> )
'''
return self.__operators
@operators.setter
def operators(self, operators):
self.__operators = []
for operator in operators:
self.add_operator(operator)
[docs] def add_operator(self, operator):
'''
This method appends the provided `StateOperator` to the list of operators
to be used during integrations.
:param operator: The operator to add to the list of operators contributing
to the instantaneous derivative.
:type operator: StateOperator
'''
if not isinstance(operator, StateOperator):
raise ValueError("Operator must be an instance of State Operator.")
try:
del self.__operators_linear
except:
pass
self.__operators.append(operator)
[docs] def get_operators(self, indices=None):
'''
This method returns the operators of `Integrator.operators` restricted to
the indicies specified (using `StateOperator.restrict`).
This is used internally by `Integrator` to optimise the integration
process (by restricting integration to the indices which could possibly
affect the state).
:param indicies: A sequence of basis indices.
:type indicies: interable of int
'''
if indices is None:
return self.operators
operators = []
for operator in self.operators:
operators.append(operator.restrict(*indices).collapse('t')) # ,**self.get_op_params()
return operators
@property
def operators_linear(self):
'''
`True` if all provided operators are linear, and `False` otherwise.
Mainly used by `Integrator` subclasses to optimise their algorithms.
'''
try:
return self.__operators_linear
except:
self.__operators_linear = all(map(lambda x: x.is_linear, self.operators))
return self.__operators_linear
@property
def params(self):
'''
A reference to the parameter overrides to be used by the `StateOperator` objects
used by `Integrator`. The parameter overrides can be set using:
>>> integrator.params = { .... }
See `parampy.Parameters` for more information about parameter overrides.
'''
return self.__operator_params
@params.setter
def params(self, params):
self.__operator_params = params
@property
def progress_callback(self):
'''
The currently set progress callback. This can be `True`, in which case the default
fallback callback is used; `False`, in which case the callback is disabled; or a
manually created instance of `IntegratorCallback`. To retrieve the `IntegratorCallback`
that will be used (including the fallback), use `Integrator.get_progress_callback`.
The progress callback instance can be set using:
>>> integrator.progress_callback = <True, False, or IntegratorCallback instance>
'''
return self.__progress_callback
@progress_callback.setter
def progress_callback(self, progress):
if not isinstance(progress, IntegratorCallback) and not type(progress) == bool:
raise ValueError("Invalid type '%s' for progress_callback." % type(progress))
self.__progress_callback = progress
[docs] def get_progress_callback(self):
'''
This method returns the `IntegratorCallback` object that will be used by `Integrator`. Note that
if a callback has not been specified, and `Integrator.progress_callback` is `False`,
then an impotent `IntegratorCallback` object is returned, which has methods that do
nothing when called.
'''
if isinstance(self.progress_callback, IntegratorCallback):
return self.progress_callback
return ProgressBarCallback() if self.progress_callback else IntegratorCallback()
@property
def int_kwargs(self):
'''
A reference to the dictionary of extra keyword arguments to pass to the
`_integrator` initialisation method; which in turn can use these keyword
arguments to initialise the integration.
'''
return self.__int_kwargs
@int_kwargs.setter
def int_kwargs(self, kwargs):
self.__int_kwargs = kwargs
########## USER METHODS ################################################
#
# Start integration and cache results
[docs] def start(self, times=None, **kwargs):
'''
This method starts an integration with returned states for the times
of interest specified. Any additional keyword arguments are passed to the
`_integrate` method. See the documentation for your `Integrator` instance
for more information.
:param times: A sequence of times, which can be any objected understood by `Parameters`.
:type times: iterable
:param kwargs: Additional keyword arguments to send to the integrator.
:type kwargs: dict
'''
self.results, return_results = self.__integrate(self.initial, times=times, t_offset=self.t_offset, kwargs=kwargs)
return return_results
#
# Continue last integration
[docs] def extend(self, times=None, **kwargs):
'''
This method extends an integration with returned states for the times
of interest specified. This method requires that `Integrator.start` has
already been called at least once, and that at least some of the times in `times`
are after the latest times already integrated. Any previous times are ignored.
Any additional keyword arguments are passed to the
`_integrate` method. See the documentation for your `Integrator` instance
for more information.
:param times: A sequence of times, which can be any objected understood by `Parameters`.
Should all be larger than the last time of the previous results.
:type times: iterable
:param kwargs: Additional keyword arguments to send to the integrator.
:type kwargs: dict
'''
if self.results is None:
raise RuntimeError("No previous results to extend. Aborting!")
t_offset = self.results[0][-1][0]
current_states = []
for result in self.results:
current_states.append(result[-1][1])
results, return_results = self.__integrate(current_states, times=times, t_offset=t_offset, kwargs=kwargs)
for i, result in enumerate(results):
for j, tuple in enumerate(result):
if j > 0:
self.results[i].append((tuple[0] + t_offset, tuple[1]))
return return_results
########## INTEGRATION #################################################
@abstractmethod
def _integrator(self, f, **kwargs):
'''
This method should return the object(s) necessary to perform
the integration step. `f` is the the function which will return
the derivative at each step.
:param f: A function with signature f(t,y) which returns the derivative
at time `t` for the state `y`. Note that the derivative that is returned
is that of `_derivative`, but `f` also handles progress reporting.
:type f: function
:param kwargs: Any additional keyword arguments passed to the `Integrator`
constructor.
:type kwargs: dict
'''
pass
@abstractmethod
def _integrate(self, integrator, initial, times=None, **kwargs):
'''
This method should perform the integration using `integrator`, and
return a list of two-tuples, each containing
a time and a corresponding state. The times should be those listed in times,
which will have been processed into floats.
:param integrator: Whichever value was returned from `_integrator`.
:type integrator: object
:param initial: The state at which to start integrating. Will be the type
returned by `_state_internal2ode`.
:type initial: object
:param times: A sequence of times for which to return the state.
:type times: list of float
:param kwargs: Additional keyword arguments passed to `Integrator.start`
and/or `Integrator.extend`.
:type kwargs: dict
'''
pass
@abstractmethod
def _derivative(self, t, y, dim):
'''
This method should return the instantaneous derivative at time `t`
with current state `y` with dimensions `dim` (as returned by
`_state_internal2ode`. The derivative should be expressed
in a form understood by the integrator returned by `_integrator`
as used in `_integrate`.
:param t: The current time.
:type t: float
:param y: The current state (in whatever form is returned by the integrator).
:type y: object
:param dim: The original dimensions of the state (as returned by `_state_internal2ode`).
:type dim: object
'''
pass
@abstractmethod
def _state_internal2ode(self, state):
'''
This method should return a tuple of a state and its original dimensions in some form.
The state should be in a form understandable by the integrator returned by `_integrator`,
and the derivative returned by `_derivative`.
:param state: The state represented as a numpy array. Maybe 1D or 2D.
:type state: numpy.ndarray
'''
pass
@abstractmethod
def _state_ode2internal(self, state, dimensions):
'''
This method should restore and return the state (currently represented in the form used by the integrator
returned by `_integrator`) to its representation as a numpy array using the
`dimensions` returned by `_state_internal2ode`.
:param state: The state to re-represented as a numpy array.
:type state: object
:param dimensions: The dimensions returned by `_state_internal2ode`.
:type dimensions: object
'''
pass
#
# Process solution results to convert solution back to complex form
def __results_ode2internal(self, results, dimensions):
presults = []
for cut in results:
presults.append((cut[0], self._state_ode2internal(cut[1], dimensions)))
return presults
def __state_prepare(self, y_0):
nz = np.nonzero(y_0)
indices = set()
for n in nz:
indices.update(list(n))
indices = self.__get_connected(*indices)
return self.__get_restricted_state(y_0, indices), indices
def __get_restricted_state(self, y_0, indices):
if len(y_0.shape) == 2:
y_0 = y_0[indices, :]
return y_0[:, indices]
elif len(y_0.shape) == 1:
return y_0[indices]
raise ValueError("Cannot restrict y_0. Too many dimensions.")
def __get_connected(self, *indices):
new = set(indices)
operators = self.get_operators()# + self.get_time_ops().values()
for operator in operators:
new.update(operator.connected(*indices)) # ,**self.get_op_params()
if len(new.difference(indices)) != 0:
new.update(self.__get_connected(*new))
return sorted(new)
def __state_restore(self, y, indices, shape):
if len(shape) not in (1, 2):
raise ValueError("Integrator only knows how to handle 1 and 2 dimensional states.")
new_y = np.zeros(shape, dtype=np.complex128)
if len(shape) == 1:
new_y[indices] = y
else:
for i, index in enumerate(indices):
new_y[index, indices] = np.array(y)[i, :]
return new_y
def __results_restore(self, ys, indices, shape):
new_ys = []
for y in ys:
new_ys.append((y[0], self.__state_restore(y[1], indices, shape)))
return new_ys
#
# Integration routine. Should not ordinarily be called directly
def __integrate(self, y_0s, times=None, t_offset=0, kwargs={}):
with self.p:
self.p(**self.params)
# Process times
if times is None:
raise ValueError("Times must be a sequence of interesting times, which will be pased to `Parameters.range` as the parameter `t`.")
times_p = self.p.range('t', t=times)
if y_0s is None:
y_0s, t_offset = self.initial, self.t_offset
# Setup Callback
callback = self.get_progress_callback()
callback.onStart()
# Initialise Progress
state = Progress()
state.run = 0
state.runs = len(y_0s)
state.callback = callback
state.max_time = max(times_p)
state.operators = None
state.indices = set()
# Specify the derivative function
def f(t, y):
# Register progress
try:
current_progress = (t / state.max_time + float(state.run)) / float(state.runs)
state.callback.onProgress(current_progress, identifier=self.identifier)
except Exception, e:
print "Error updating state.", e
# Do integrations
return self._derivative(t, y, dim, state.operators)
results = []
# Initialise ODE Solver
integrator = self._integrator(f, **self.int_kwargs)
sequence = self._sequence(t_offset, times_p, self.time_ops)
for y_0 in y_0s:
solution = []
required = []
for i, segment in enumerate(sequence):
# If sequence element is an integration stretch
if not isinstance(segment[0], StateOperator):
y_0_dim = y_0.shape
y_0, indices = self.__state_prepare(y_0)
if set(indices) != state.indices:
state.indices = set(indices)
state.operators = self.get_operators(indices=indices)
y_0_ode, dim = self._state_internal2ode(y_0)
sol = self._integrate(integrator, y_0_ode, times=segment[0], **kwargs)
sol = self.__results_restore(self.__results_ode2internal(sol, dim), indices, y_0_dim)
if len(solution) == 0:
required.extend(segment[1])
solution.extend(sol)
else:
required.extend(segment[1][1:])
solution.extend(sol[1:])
y_0 = sol[-1][1]
# If sequence element is a time_op
else:
y_0 = segment[0](
state=y_0,
t=0 if i == 0 else sequence[i - 1][0][-1] # time of application
)
solution.append((segment[1], y_0))
required.append(segment[2])
results.append(solution)
state.run += 1
# since we use sympy objects, potentially with cache enabled, we should clear it lest it build up
sympy_clear_cache()
callback.onComplete(identifier=self.identifier)
# Having completed the integration, generate results to output to user
result_type = [('time', float), ('label', object), ('state', complex, self.initial[0].shape)]
return_results = np.empty(dtype=result_type, shape=(len(results), len(times)))
# Create a map of times to original labels
time_map = {}
for i, time in enumerate(list(times) + list(self.time_ops.keys())):
if not isinstance(time, (int, long, float, complex)):
time_map[float(self.p(time))] = time
# Populate return results
for i, result_set in enumerate(results):
k = 0
for j, (time, y) in enumerate(result_set):
if required[j]:
return_results[i, k] = (time, time_map.get(time, None), y)
k += 1
return results, return_results
#
# Return a sequence of events for easy integration
def _sequence(self, t_offset, times, time_ops):
'''
This method processes the incoming times and generates a list of
either tuple elements (indicating integration periods, of the form
`( (<start_time>,....,<end_time>), (bool required for each time))` ) or a tuple of a StateOperator object
with a time and a boolean indicating whether the value should be stored (indicating some operation on the current state).
'''
t_offset = float(self.p(t_offset))
sequence = []
# Value checking (assuming iterable list)
if (t_offset > np.max(times)):
raise ValueError("All interesting times before t_offset. Aborting.")
times = np.sort(np.unique(np.array(times)))
if times.shape == ():
times.shape = (1,)
times = times[np.where(times >= t_offset)]
if len(time_ops) > 0:
optimes_raw = np.array(time_ops.keys(),dtype=object)
optimes = np.array(self.p.range('t',t=optimes_raw))
optimes_map = dict(zip(optimes,optimes_raw))
optimes = optimes[np.where((optimes >= t_offset) & (optimes <= np.max(times)))]
else:
optimes = []
subtimes = []
required = []
for time in np.sort(np.unique(np.concatenate((times, optimes,[t_offset])))):
subtimes.append(time)
required.append(time in times)
if time in optimes:
if len(subtimes) > 1:
required[-1] = False
sequence.append((tuple(subtimes), tuple(required)))
sequence.append((time_ops[optimes_map[time]], time, time in times))
subtimes = [time]
required = [False]
if len(subtimes) > 1:
sequence.append((subtimes, required))
return sequence
############# CALLBACKS ##############################################################
[docs]class IntegratorCallback(object):
#
# Trigger start
[docs] def onStart(self):
pass
#
# Receives a float value in [0,1].
[docs] def onProgress(self, progress, identifier=None):
pass
#
# Called when function is complete.
# Level = 0 -> OKAY
# Level = 1 -> WARN
# Level = 2 -> ERROR
# Status = string message
[docs] def onComplete(self, identifier=None, message=None, status=0):
pass
# Try to use the full implementation of progress bar
try:
import progressbar as pbar
class ProgressBarCallback(IntegratorCallback):
def onStart(self):
self.pb = pbar.ProgressBar(widgets=['\033[0;34m', pbar.Percentage(), ' ', pbar.Bar(), '\033[0m'])
self.pb.start()
self._progress = 0
def onProgress(self, progress, identifier=None):
progress *= 100
if (progress > self._progress):
self._progress = math.ceil(progress)
self.pb.update(min(100, progress))
def onComplete(self, identifier=None, message=None, status=0):
self.pb.finish()
except:
[docs] class ProgressBarCallback(IntegratorCallback):
[docs] def onStart(self):
self._progress = 0
[docs] def onProgress(self, progress, identifier=None):
progress *= 100
# if (progress > self._progress):
self._progress = math.ceil(progress)
sys.stderr.write("\r%3d%%" % min(100, progress))
[docs] def onComplete(self, identifier=None, message=None, status=0):
print "\n"
if message:
print "%s:" % identifier, message
[docs]class Progress(object):
def __init__(self):
self.run = 0
self.runs = 1
self.callback = None