# -*- coding: utf-8 -*-
"""Abstract Base Class for Basis Function and some common implementations."""
import abc
import numpy as np
class BasisFunction(abc.ABCMeta('BasisFunctionABC', (object,), {})):
    r"""ABC for basis functions used by LSPI Policies.

    A basis function is a function that takes in a state vector and an action
    index and returns a vector of features. The resulting feature vector is
    referred to as :math:`\phi` in the LSPI paper (pg 9 of the PDF referenced
    in this package's documentation). The :math:`\phi` vector is dotted with
    the weight vector of the Policy to calculate the Q-value.

    The dimensions of the state vector are usually smaller than the dimensions
    of the :math:`\phi` vector. However, the dimensions of the :math:`\phi`
    vector are usually much smaller than the dimensions of an exact
    representation of the state which leads to significant savings when
    computing and storing a policy.

    Notes
    -----
    The class inherits from a throw-away base created by calling
    ``abc.ABCMeta`` directly. A ``__metaclass__`` class attribute is only
    honoured by Python 2, while the ``metaclass=`` keyword is only valid
    syntax on Python 3; building the base through the metaclass applies
    ``ABCMeta`` on both, so the abstract members below are always enforced.

    """

    @abc.abstractmethod
    def size(self):
        r"""Return the vector size of the basis function.

        Returns
        -------
        int
            The size of the :math:`\phi` vector.
            (Referred to as k in the paper).

        """
        pass  # pragma: no cover

    @abc.abstractmethod
    def evaluate(self, state, action):
        r"""Calculate the :math:`\phi` vector for the given state-action pair.

        The way this value is calculated depends entirely on the concrete
        implementation of BasisFunction.

        Parameters
        ----------
        state : numpy.array
            The state to get the features for.
            When calculating Q(s, a) this is the s.
        action : int
            The action index to get the features for.
            When calculating Q(s, a) this is the a.

        Returns
        -------
        numpy.array
            The :math:`\phi` vector. Used by Policy to compute Q-value.

        """
        pass  # pragma: no cover

    @abc.abstractproperty
    def num_actions(self):
        """Return number of possible actions.

        Returns
        -------
        int
            Number of possible actions.

        """
        pass  # pragma: no cover

    @staticmethod
    def _validate_num_actions(num_actions):
        """Return num_actions if valid. Otherwise raise ValueError.

        Parameters
        ----------
        num_actions : int
            Candidate number of possible actions.

        Returns
        -------
        int
            The same ``num_actions`` value that was passed in.

        Raises
        ------
        ValueError
            If num_actions < 1

        """
        if num_actions < 1:
            raise ValueError('num_actions must be >= 1')
        return num_actions
class FakeBasis(BasisFunction):
    r"""Basis that ignores all input. Useful for random sampling.

    When creating a purely random Policy a basis function is still required.
    This basis function just returns a :math:`\phi` equal to [1.] for all
    inputs. It will however, still throw exceptions for impossible values like
    negative action indexes.

    Parameters
    ----------
    num_actions : int
        Number of possible actions. Must be >= 1.

    Raises
    ------
    ValueError
        If num_actions is less than 1.

    """

    def __init__(self, num_actions):
        """Initialize FakeBasis."""
        self.__num_actions = BasisFunction._validate_num_actions(num_actions)

    def size(self):
        r"""Return size of 1.

        Returns
        -------
        int
            Size of :math:`\phi` which is always 1 for FakeBasis

        Example
        -------
        >>> FakeBasis(2).size()
        1

        """
        return 1

    def evaluate(self, state, action):
        r"""Return :math:`\phi` equal to [1.].

        Parameters
        ----------
        state : numpy.array
            The state to get the features for.
            When calculating Q(s, a) this is the s. FakeBasis ignores these
            values.
        action : int
            The action index to get the features for.
            When calculating Q(s, a) this is the a. Only its range is
            checked; it does not influence the returned vector.

        Returns
        -------
        numpy.array
            :math:`\phi` vector equal to [1.].

        Raises
        ------
        IndexError
            If action index is < 0 or >= num_actions

        Example
        -------
        >>> FakeBasis(2).evaluate(np.arange(10), 0)
        array([1.])

        """
        if action < 0:
            raise IndexError('action index must be >= 0')
        if action >= self.num_actions:
            raise IndexError('action must be < num_actions')
        return np.array([1.])

    @property
    def num_actions(self):
        """Return number of possible actions."""
        return self.__num_actions

    @num_actions.setter
    def num_actions(self, value):
        """Set the number of possible actions.

        Parameters
        ----------
        value: int
            Number of possible actions. Must be >= 1.

        Raises
        ------
        ValueError
            If value < 1.

        """
        if value < 1:
            raise ValueError('num_actions must be at least 1.')
        self.__num_actions = value
class OneDimensionalPolynomialBasis(BasisFunction):
    """Polynomial features for a state with one dimension.

    Takes the value of the state and constructs a vector proportional
    to the specified degree and number of actions. The polynomial is first
    constructed as [..., 1, value, value^2, ..., value^k, ...]
    where k is the degree. The rest of the vector is 0.

    Parameters
    ----------
    degree : int
        The polynomial degree.
    num_actions: int
        The total number of possible actions

    Raises
    ------
    ValueError
        If degree is less than 0
    ValueError
        If num_actions is less than 1

    """

    def __init__(self, degree, num_actions):
        """Initialize polynomial basis function."""
        self.__num_actions = BasisFunction._validate_num_actions(num_actions)

        if degree < 0:
            raise ValueError('Degree must be >= 0')
        self.degree = degree

    def size(self):
        """Calculate the size of the basis function.

        The base size will be degree + 1. This basic vector is then
        duplicated once for every action. Therefore the size is equal to
        (degree + 1) * number of actions

        Returns
        -------
        int
            The size of the phi vector that will be returned from evaluate.

        Example
        -------
        >>> basis = OneDimensionalPolynomialBasis(2, 2)
        >>> basis.size()
        6

        """
        return (self.degree + 1) * self.num_actions

    def evaluate(self, state, action):
        r"""Calculate :math:`\phi` vector for given state action pair.

        The :math:`\phi` vector is used to calculate the Q function for the
        given policy.

        Parameters
        ----------
        state : numpy.array
            The state to get the features for. Must have shape ``(1,)``.
            When calculating Q(s, a) this is the s.
        action : int
            The action index to get the features for.
            When calculating Q(s, a) this is the a.

        Returns
        -------
        numpy.array
            The :math:`\phi` vector. Used by Policy to compute Q-value.

        Raises
        ------
        IndexError
            If action is outside the range
            :math:`0 \le action < num\_actions`.
        ValueError
            If the shape of the state vector is anything other than
            ``(1,)``.

        Example
        -------
        >>> basis = OneDimensionalPolynomialBasis(2, 2)
        >>> basis.evaluate(np.array([2]), 0)
        array([1., 2., 4., 0., 0., 0.])

        """
        if action < 0 or action >= self.num_actions:
            raise IndexError('Action index out of bounds')

        if state.shape != (1, ):
            raise ValueError('This class only supports one dimensional states')

        # Each action owns a contiguous run of (degree + 1) features. Using
        # the integer block length directly (instead of size() / num_actions)
        # keeps the slice bounds integral under true division.
        block_length = self.degree + 1
        offset = block_length * action

        value = state[0]
        phi = np.zeros((self.size(), ))
        phi[offset:offset + block_length] = \
            np.array([pow(value, i) for i in range(block_length)])

        return phi

    @property
    def num_actions(self):
        """Return number of possible actions."""
        return self.__num_actions

    @num_actions.setter
    def num_actions(self, value):
        """Set the number of possible actions.

        Parameters
        ----------
        value: int
            Number of possible actions. Must be >= 1.

        Raises
        ------
        ValueError
            If value < 1.

        """
        if value < 1:
            raise ValueError('num_actions must be at least 1.')
        self.__num_actions = value
class RadialBasisFunction(BasisFunction):
    r"""Gaussian Multidimensional Radial Basis Function (RBF).

    Given a set of k means :math:`(\mu_1 , \ldots, \mu_k)` produce a feature
    vector :math:`(1, e^{-\gamma || s - \mu_1 ||^2}, \cdots,
    e^{-\gamma || s - \mu_k ||^2})` where `s` is the state vector and
    :math:`\gamma` is a free parameter. This vector will be padded with
    0's on both sides proportional to the number of possible actions
    specified.

    Parameters
    ----------
    means: list(numpy.array)
        List of numpy arrays representing :math:`(\mu_1, \ldots, \mu_k)`.
        Each :math:`\mu` is a numpy array with dimensions matching the state
        vector this basis function will be used with. If the dimensions of each
        vector are not equal then an exception will be raised. If no means are
        specified then a ValueError will be raised
    gamma: float
        Free parameter which controls the size/spread of the Gaussian "bumps".
        This parameter is best selected via tuning through cross validation.
        gamma must be > 0.
    num_actions: int
        Number of actions. Must be in range [1, :math:`\infty`] otherwise
        an exception will be raised.

    Raises
    ------
    ValueError
        If means list is empty
    ValueError
        If dimensions of each mean vector do not match.
    ValueError
        If gamma is <= 0.
    ValueError
        If num_actions is less than 1.

    Note
    ----
    The numpy arrays specifying the means are not copied.

    """

    def __init__(self, means, gamma, num_actions):
        """Initialize RBF instance."""
        self.__num_actions = BasisFunction._validate_num_actions(num_actions)

        if len(means) == 0:
            raise ValueError('You must specify at least one mean')

        if not RadialBasisFunction.__means_share_shape(means):
            raise ValueError('All mean vectors must have the same dimensions')

        self.means = means

        if gamma <= 0:
            raise ValueError('gamma must be > 0')

        self.gamma = gamma

    @staticmethod
    def __means_share_shape(means):
        """Return True when every mean is an array of one common shape.

        A ``None`` entry anywhere in ``means`` counts as a mismatch. The
        check is written as a plain comparison against the first mean so it
        does not rely on the ``reduce`` builtin, which only exists on
        Python 2.

        Parameters
        ----------
        means : list(numpy.array)
            Non-empty list of mean vectors.

        Returns
        -------
        bool
            True if all entries are non-None and share the first entry's
            shape, False otherwise.

        """
        first = means[0]
        if first is None:
            return False
        return all(mean is not None and mean.shape == first.shape
                   for mean in means[1:])

    def size(self):
        r"""Calculate size of the :math:`\phi` vector.

        The size is equal to the number of means + 1 times the number of
        actions.

        Returns
        -------
        int
            The size of the phi vector that will be returned from evaluate.

        """
        return (len(self.means) + 1) * self.num_actions

    def evaluate(self, state, action):
        r"""Calculate the :math:`\phi` vector.

        The vector will have the following form:

        :math:`[\cdots, 1, e^{-\gamma || s - \mu_1 ||^2}, \cdots,
        e^{-\gamma || s - \mu_k ||^2}, \cdots]`

        where the vector will be padded with 0's on either side depending
        on the specified action index and the number of possible actions.

        Parameters
        ----------
        state : numpy.array
            The state to get the features for. Its shape must equal the
            shape of the mean vectors.
        action : int
            The action index to get the features for.

        Returns
        -------
        numpy.array
            The :math:`\phi` vector. Used by Policy to compute Q-value.

        Raises
        ------
        IndexError
            If action is outside the range
            :math:`0 \le action < num\_actions`.
        ValueError
            If the shape of the state vector does not match the shape of
            the mean vectors.

        """
        if action < 0 or action >= self.num_actions:
            raise IndexError('Action index out of bounds')

        if state.shape != self.means[0].shape:
            raise ValueError('Dimensions of state must match '
                             'dimensions of means')

        phi = np.zeros((self.size(), ))

        # Every action owns (number of means + 1) consecutive entries, which
        # is exactly how size() lays the vector out. The block length must be
        # derived from the number of means, not from the dimensionality of a
        # single mean, otherwise blocks overlap or run past the end of phi.
        block_length = len(self.means) + 1
        offset = block_length * action

        rbf = [RadialBasisFunction.__calc_basis_component(state,
                                                          mean,
                                                          self.gamma)
               for mean in self.means]
        phi[offset] = 1.
        phi[offset + 1:offset + 1 + len(rbf)] = rbf

        return phi

    @staticmethod
    def __calc_basis_component(state, mean, gamma):
        """Return exp(-gamma * ||state - mean||^2) for a single mean."""
        mean_diff = state - mean
        return np.exp(-gamma * np.sum(mean_diff * mean_diff))

    @property
    def num_actions(self):
        """Return number of possible actions."""
        return self.__num_actions

    @num_actions.setter
    def num_actions(self, value):
        """Set the number of possible actions.

        Parameters
        ----------
        value: int
            Number of possible actions. Must be >= 1.

        Raises
        ------
        ValueError
            If value < 1.

        """
        if value < 1:
            raise ValueError('num_actions must be at least 1.')
        self.__num_actions = value
class ExactBasis(BasisFunction):
    """Basis function with no functional approximation.

    This can only be used in domains with finite, discrete state-spaces. For
    example the Chain domain from the LSPI paper would work with this basis,
    but the inverted pendulum domain would not.

    Parameters
    ----------
    num_states: list
        A list containing integers representing the number of possible values
        for each state variable.
    num_actions: int
        Number of possible actions.

    Raises
    ------
    ValueError
        If any entry of num_states is <= 0.
    ValueError
        If num_actions is less than 1.

    """

    def __init__(self, num_states, num_actions):
        """Initialize ExactBasis."""
        # Convert before comparing: num_states is documented as a plain list,
        # and comparing a list with an int does not compare element-wise.
        if np.any(np.asarray(num_states) <= 0):
            raise ValueError('num_states value\'s must be > 0')

        self.__num_actions = BasisFunction._validate_num_actions(num_actions)
        self._num_states = num_states

        # _offsets[i] is the stride of state variable i in the flattened
        # index: the product of the value counts of all earlier variables.
        self._offsets = [1]
        for i in range(1, len(num_states)):
            self._offsets.append(self._offsets[-1] * num_states[i - 1])

    def __states_per_action(self):
        """Return the product of all per-variable state counts."""
        total = 1
        for count in self._num_states:
            total *= count
        return total

    def size(self):
        r"""Return the vector size of the basis function.

        Returns
        -------
        int
            The size of the :math:`\phi` vector.
            (Referred to as k in the paper).

        """
        return self.__states_per_action() * self.__num_actions

    def get_state_action_index(self, state, action):
        """Return the non-zero index of the basis.

        Parameters
        ----------
        state: numpy.array
            The state to get the index for.
        action: int
            The action to get the index for.

        Returns
        -------
        int
            The non-zero index of the basis

        Raises
        ------
        IndexError
            If action index < 0 or action index >= num_actions

        """
        if action < 0:
            raise IndexError('action index must be >= 0')
        if action >= self.num_actions:
            raise IndexError('action must be < num_actions')

        # Each action owns one full block of state indices. The block length
        # is computed with integer arithmetic only, avoiding a round trip
        # through float division.
        base = action * self.__states_per_action()

        offset = 0
        for i, value in enumerate(state):
            offset += self._offsets[i] * value

        return base + offset

    def evaluate(self, state, action):
        r"""Return a :math:`\phi` vector that has a single non-zero value.

        Parameters
        ----------
        state: numpy.array
            The state to get the features for. When calculating Q(s, a) this is
            the s.
        action: int
            The action index to get the features for.
            When calculating Q(s, a) this is the a.

        Returns
        -------
        numpy.array
            :math:`\phi` vector

        Raises
        ------
        IndexError
            If action index < 0 or action index >= num_actions
        ValueError
            If the size of the state does not match the size of the
            num_states list used during construction.
        ValueError
            If any of the state variables are < 0 or >= the corresponding
            value in the num_states list used during construction.

        """
        if len(state) != len(self._num_states):
            raise ValueError('Number of state variables must match '
                             + 'size of num_states.')

        # asarray makes the element-wise comparison work for list input too.
        if np.any(np.asarray(state) < 0):
            raise ValueError('state cannot contain negative values.')

        for state_var, num_state_values in zip(state, self._num_states):
            if state_var >= num_state_values:
                raise ValueError('state values must be < corresponding '
                                 + 'num_states value.')

        phi = np.zeros(self.size())
        phi[self.get_state_action_index(state, action)] = 1

        return phi

    @property
    def num_actions(self):
        """Return number of possible actions."""
        return self.__num_actions

    @num_actions.setter
    def num_actions(self, value):
        """Set the number of possible actions.

        Parameters
        ----------
        value: int
            Number of possible actions. Must be >= 1.

        Raises
        ------
        ValueError
            if value < 1.

        """
        if value < 1:
            raise ValueError('num_actions must be at least 1.')
        self.__num_actions = value