"""Module of all the classes related to sensors.
There are base classes for sensors whose source is a body, joint or simulation.
It also considers those which read information automatically by subscribing
to certain signals.
Some abstract classes are:
* :class:`BaseSourceSensor`
* :class:`BaseSignalSensor`
* :class:`BodySensor`
* :class:`JointSensor`
* :class:`SimulationSensor`
Some practical sensors are:
* :class:`RotaryJointSensor`, :class:`JointTorque`
* :class:`Laser`
* :class:`GPS`, :class:`Velometer`, :class:`Accelerometer`,
:class:`Inclinometer`
* :class:`KineticEnergy`, :class:`PotentialEnergy`, :class:`TotalEnergy`,
:class:`SystemTotalEnergy`
It also contains the auxiliary classes :class:`SensorData` and
:class:`SensorDataQueue`.
"""
from abc import ABCMeta, abstractmethod
import logging
from ...lib.pydispatch import dispatcher
from ...model.collision import ode_adapter as coll
from ...utils import containers
from ...utils.generic import get_current_epoch_time
from ...utils.geometry import calc_inclination
from ...utils.mathematical import calc_acceleration
from . import signals
logger = logging.getLogger(__name__)
#==============================================================================
# Abstract classes
#==============================================================================
[docs]class BaseSourceSensor(object):
"""Abstract base class for all sensors.
Sensor data is stored in a queue (:attr:`data_queue`), and it is
usually retrieved after the simulation ends but can be accessed at any
time::
measurement = sensor.data_queue.pull()
.. warning::
Beware that :meth:`ars.utils.containers.Queue.pull` returns the first
element of the queue and **removes** it.
"""
__metaclass__ = ABCMeta
@abstractmethod
def __init__(self, source):
self._source = source
self.data_queue = SensorDataQueue()
[docs] def on_change(self, time=None):
"""Build a :class:`SensorData` object and stores it in the
:attr:`data_queue`.
:param time: if None, current (computer's) time is used
:type time: number or None
"""
dispatcher.send(signals.SENSOR_PRE_ON_CHANGE, sender=self, time=time)
if time is None:
time = get_current_epoch_time()
data = self._build_data()
data.set_time(time)
self.data_queue.put(data)
dispatcher.send(signals.SENSOR_POST_ON_CHANGE, sender=self, data=data)
@abstractmethod
def _build_data(self):
"""Create and return a :class:`SensorData` object."""
pass
[docs] def get_measurement(self):
"""Return a measurement of the sensor packed in a data structure."""
return self._build_data()
@property
[docs] def source(self):
return self._source
[docs]class BaseSignalSensor(object):
"""Base class for sensors that handle signals with :meth:`on_send`."""
__metaclass__ = ABCMeta
any_sender = dispatcher.Any
@abstractmethod
def __init__(self, sender=any_sender, autotime=False):
"""Constructor.
:param sender: object that will send the signal; if it is
:attr:`any_sender`, subscription will be to any object
:param autotime: if True and :meth:`_get_time` is not overriden, every
measurement's time will set to the computer time in that instant
"""
self._sender = sender
self._autotime = autotime
self.data_queue = SensorDataQueue()
[docs] def on_send(self, sender, *args, **kwargs):
"""Handle signal sent/fired by ``sender`` through the dispatcher.
Takes care of building a data object, set time to it and save it in the
``data_queue``.
:param sender: signal sender
:param args: signal arguments
:param kwargs: signal keyword arguments
"""
data = self._build_data(sender, *args, **kwargs)
time = self._get_time()
if time is not None:
data.set_time(time)
self.data_queue.put(data)
@abstractmethod
def _build_data(self, sender, *args, **kwargs):
"""Build and return a SensorData object with current data.
:return: object containing data based on the state of the sender
:rtype: :class:`SensorData`
"""
pass
def _get_time(self):
"""Return the time to set to the :class`SensorData` instance.
Override this to define it based on some object's state or property.
:return: time value for the data
:rtype: float or None
"""
time = None
if self._autotime:
time = get_current_epoch_time()
return time
@property
[docs] def sender(self):
"""Return the sender of the signal to which the sensor listens."""
return self._sender
[docs]class SingleSignalSensor(BaseSignalSensor):
"""Abstract base class for sensors subscribed to one signal."""
__metaclass__ = ABCMeta
@abstractmethod
def __init__(self, signal, *args, **kwargs):
"""Constructor.
:param signal: signal to subscribe to
"""
super(SingleSignalSensor, self).__init__(*args, **kwargs)
self._signal = signal
# subscribe :meth:`on_send` handler to the signal sent by ``sender``
dispatcher.connect(self.on_send, signal=self._signal,
sender=self._sender)
[docs]class MultipleSignalsSensor(BaseSignalSensor):
"""Abstract base class for sensors subscribed to multiple signals."""
__metaclass__ = ABCMeta
@abstractmethod
def __init__(self, signals, *args, **kwargs):
"""Constructor.
:param signals: signals to subscribe to
:type signals: iterable
"""
super(MultipleSignalsSensor, self).__init__(*args, **kwargs)
self._signals = signals
# subscribe :meth:`on_send` handler to the signals in :attr:`_signals`
# sent by ``sender``
for signal in self._signals:
dispatcher.connect(self.on_send, signal=signal,
sender=self._sender)
[docs]class BodySensor(BaseSourceSensor):
"""Abstract base class for sensors whose source of data is a body."""
__metaclass__ = ABCMeta
def __init__(self, body):
super(BodySensor, self).__init__(body)
@property
[docs] def body(self):
return self.source
[docs]class JointSensor(BaseSourceSensor):
"""Abstract base class for sensors whose source of data is a joint."""
__metaclass__ = ABCMeta
def __init__(self, joint):
super(JointSensor, self).__init__(joint)
@property
[docs] def joint(self):
return self.source
[docs]class ActuatedJointSensor(JointSensor):
"""Sensor whose source of data is an ``ActuatedJoint`` joint."""
__metaclass__ = ABCMeta
[docs]class SimulationSensor(BaseSourceSensor):
"""Abstract base class for sensors whose source of data is a simulation."""
__metaclass__ = ABCMeta
def __init__(self, sim):
"""Constructor.
:param sim: simulation
:type sim: :class:`ars.model.simulator.Simulation`
"""
super(SimulationSensor, self).__init__(sim)
@property
[docs] def sim(self):
"""Return the simulation object.
:return: simulation
:rtype: :class:`ars.model.simulator.Simulation`
"""
return self.source
#==============================================================================
# classes
#==============================================================================
[docs]class RotaryJointSensor(ActuatedJointSensor):
"""Sensor measuring the angle (and its rate) of a rotary joint."""
def _build_data(self):
kwargs = {'angle': self.joint.angle,
'angle_rate': self.joint.angle_rate, }
return SensorData(**kwargs)
[docs]class JointTorque(SingleSignalSensor):
"""Sensor measuring torque added to a joint."""
signal = signals.JOINT_POST_ADD_TORQUE
def __init__(self, sim_joint, sim):
# The sender to connect to is sim_joint.joint because
# :class:`SimulatedJoint` wraps the "real" joint object that sends the
# signal.
super(JointTorque, self).__init__(signal=self.signal,
sender=sim_joint.joint)
self._sim = sim
def _build_data(self, sender, *args, **kwargs):
return SensorData(**{'torque': kwargs.get('torque')})
def _get_time(self):
return self._sim.sim_time
[docs]class JointForce(SingleSignalSensor):
"""Sensor measuring force 'added' to a joint."""
signal = signals.JOINT_POST_ADD_FORCE
def __init__(self, sim_joint, sim):
super(JointForce, self).__init__(signal=self.signal,
sender=sim_joint.joint)
self._sim = sim
def _build_data(self, sender, *args, **kwargs):
return SensorData(**{'force': kwargs.get('force')})
def _get_time(self):
return self._sim.sim_time
[docs]class JointPower(MultipleSignalsSensor):
"""Sensor measuring power applied by a joint (due to force and torque)."""
signals = [JointTorque.signal, JointForce.signal]
def __init__(self, sim_joint, sim):
super(JointPower, self).__init__(signals=self.signals,
sender=sim_joint.joint)
self._sim = sim
def _build_data(self, sender, *args, **kwargs):
power = 0.0
# both are scalars (float)
torque = kwargs.get('torque')
force = kwargs.get('force')
try:
if torque is not None:
power += torque * sender.angle_rate
if force is not None:
power += force * sender.position_rate
except Exception:
logger.exception("Error when calculating power")
return SensorData(**{'power': power})
def _get_time(self):
return self._sim.sim_time
[docs]class Laser(BaseSourceSensor):
"""Laser scanner."""
def __init__(self, space, max_distance=10.0):
self._ray = coll.Ray(space, max_distance)
super(Laser, self).__init__(self._ray)
[docs] def on_change(self, time=get_current_epoch_time()):
super(Laser, self).on_change(time)
self._ray.clear_contacts()
[docs] def set_position(self, pos):
"""Set position of the ray.
Useful mainly when it is not attached to a body.
:param pos: position
:type pos: 3-sequence of floats
"""
# TODO: if mounted, w.r.t what?
self._ray.set_position(pos)
[docs] def set_rotation(self, rot):
"""Set rotation of the ray.
Useful mainly when it is not attached to a body.
:param rot: rotation matrix
:type rot: 9-sequence of floats
"""
# TODO: if mounted, w.r.t what?
self._ray.set_rotation(rot)
[docs] def get_ray(self):
return self._ray
def _build_data(self):
ray_contact = self._ray.get_closer_contact()
if ray_contact is None:
kwargs = {'distance': None}
else:
kwargs = {'distance': ray_contact.depth}
return SensorData(**kwargs)
# class LaserRangeFinder(BodySensor):
#
# """Laser range finder."""
#
# pass
[docs]class GPS(BodySensor):
"""Retrieve a body's XYZ position."""
def _build_data(self):
kwargs = {'position': self.body.get_position()}
return SensorData(**kwargs)
[docs]class Velometer(BodySensor):
"""Calculate and retrieve a body's linear and angular velocity."""
def _build_data(self):
kwargs = {'linear velocity': self.body.get_linear_velocity(),
'angular velocity': self.body.get_angular_velocity(), }
return SensorData(**kwargs)
[docs]class Accelerometer(BodySensor):
"""Calculate and retrieve a body's linear and angular acceleration.
.. warning::
The provided `time_step` is used to calculate the acceleration based on
the velocity measured at two instants in time. If subsequent calls to
`on_change` are separated by a simulation time period different to the
given `time_step`, the results will be invalid.
"""
def __init__(self, body, time_step):
super(Accelerometer, self).__init__(body)
self._time_step = time_step
def _build_data(self):
linear_vel_prev, angular_vel_prev = self.body.get_saved_velocities()
linear_vel = self.body.get_linear_velocity()
angular_vel = self.body.get_angular_velocity()
self.body.save_velocities()
linear_accel = calc_acceleration(
self._time_step, linear_vel_prev, linear_vel)
angular_accel = calc_acceleration(
self._time_step, angular_vel_prev, angular_vel)
kwargs = {'linear acceleration': linear_accel,
'angular acceleration': angular_accel, }
return SensorData(**kwargs)
[docs]class Inclinometer(BodySensor):
"""Retrieve a body's `pitch` and `roll`."""
def _build_data(self):
pitch, roll = calc_inclination(self.body.get_rotation())
kwargs = {'pitch': pitch, 'roll': roll}
return SensorData(**kwargs)
[docs]class KineticEnergy(BodySensor):
r"""Retrieve a body's kinetic energy, both due to translation and rotation.
.. math::
E_t &= \frac{1}{2} m v^2 = \frac{1}{2} m \cdot v^\top v \\
E_r &= \frac{1}{2} I \omega^2 = \frac{1}{2} \omega^\top \mathbf{I} \omega
"""
def _build_data(self):
kwargs = {
'translation energy': self.body.calc_translation_kinetic_energy(),
'rotation energy': self.body.calc_rotation_kinetic_energy(), }
return SensorData(**kwargs)
[docs]class PotentialEnergy(BodySensor):
r"""Retrieve a body's potential energy.
Calculated based on the current position (`x`) and world's gravitational
acceleration (`g`).
.. math::
E_p = m \cdot g \cdot h = - m \cdot g^\top x
"""
def __init__(self, body, gravity):
super(PotentialEnergy, self).__init__(body)
self._gravity = gravity
def _build_data(self):
potential_e = self.body.calc_potential_energy(self._gravity)
kwargs = {'potential energy': potential_e, }
return SensorData(**kwargs)
[docs]class TotalEnergy(BodySensor):
r"""Retrieve a body's potential and kinetic energy.
The kinetic energy accounts for translation and rotation.
.. math::
E_p &= m \cdot g \cdot h = - m \cdot g^\top x \\
E_k &= \frac{1}{2} m \cdot v^\top v
+ \frac{1}{2} \omega^\top \mathbf{I} \omega
"""
def __init__(self, body, gravity, disaggregate=False):
super(TotalEnergy, self).__init__(body)
self._gravity = gravity
self._disaggregate = disaggregate
def _build_data(self):
potential_e = self.body.calc_potential_energy(self._gravity)
linear_ke = self.body.calc_translation_kinetic_energy()
angular_ke = self.body.calc_rotation_kinetic_energy()
if self._disaggregate:
kwargs = {'potential energy': potential_e,
'kinetic energy': linear_ke + angular_ke, }
else:
kwargs = {'total energy': potential_e + linear_ke + angular_ke, }
return SensorData(**kwargs)
[docs]class SystemTotalEnergy(SimulationSensor):
r"""Retrieve a system's total potential and kinetic energy.
It considers all bodies in the simulation. The kinetic energy accounts for
translation and rotation.
.. math::
E_p &= m \cdot g \cdot h = - m \cdot g^\top x \\
E_k &= \frac{1}{2} m \cdot v^\top v
+ \frac{1}{2} \omega^\top \mathbf{I} \omega
"""
def __init__(self, sim, disaggregate=False):
super(SystemTotalEnergy, self).__init__(sim)
self._disaggregate = disaggregate
def _build_data(self):
gravity = self.sim.gravity
bodies = self.sim.get_bodies()
potential_e = 0.0
linear_ke = 0.0
angular_ke = 0.0
for body in bodies:
potential_e += body.calc_potential_energy(gravity)
linear_ke += body.calc_translation_kinetic_energy()
angular_ke += body.calc_rotation_kinetic_energy()
if self._disaggregate:
kwargs = {'potential energy': potential_e,
'kinetic energy': linear_ke + angular_ke, }
else:
kwargs = {'total energy': potential_e + linear_ke + angular_ke, }
return SensorData(**kwargs)
#==============================================================================
# aux classes
#==============================================================================
[docs]class SensorData(object):
"""Data structure to pack a sensor measurement's information."""
def __init__(self, *args, **kwargs):
self._time = None
self._args = args # as a tuple?
self._kwargs = kwargs # as a dictionary?
[docs] def get_time(self):
return self._time
# TODO: does it make sense?
[docs] def set_time(self, time):
self._time = time
[docs] def get_args(self):
return self._args
[docs] def get_kwargs(self):
return self._kwargs
[docs] def get_arg(self, index):
return self._args[index]
[docs] def get_kwarg(self, key):
return self._kwargs[key]
# TODO: examples
def __str__(self):
line = '|'
if self._time is not None:
line = ' %f |' % self._time
for arg in self._args:
line += ' %s |' % str(arg) # TODO
for key in self._kwargs:
line += ' %s: %s |' % (str(key), str(self._kwargs[key])) # TODO
return line
[docs]class SensorDataQueue(containers.Queue):
"""Queue-like container for sensor measurements."""
pass