import time
import traceback
import triangula.chassis
import triangula.imu
from abc import ABCMeta, abstractmethod
from triangula.input import SixAxis
[docs]class TaskManager:
"""
Manages the task loop
"""
[docs] def __init__(self, arduino, lcd, chassis, joystick):
self.arduino = arduino
self.lcd = lcd
self.chassis = chassis
self.joystick = joystick
self.home_task = None
def _build_context(self, include_bearing):
bearing = None
imu_data = None
if include_bearing:
imu_data = triangula.imu.read()
bearing = imu_data[2]
return TaskContext(arduino=self.arduino,
lcd=self.lcd,
bearing=bearing,
imu_data=imu_data,
chassis=self.chassis,
joystick=self.joystick,
buttons_pressed=self.joystick.get_and_clear_button_press_history())
[docs] def run(self, initial_task):
"""
Start the task loop. Handles task switching and initialisation as well as any exceptions thrown within tasks.
:param initial_task:
An instance of :class:`triangula.task.Task` to use as the first task. Typically this is a menu or startup
task of some kind.
"""
active_task = initial_task
task_initialised = False
tick = 0
if self.home_task is None:
self.home_task = initial_task
while 1:
try:
context = self._build_context(active_task.requires_compass)
if context.button_pressed(SixAxis.BUTTON_SELECT):
active_task = ClearStateTask(self.home_task)
task_initialised = False
tick = 0
if task_initialised:
new_task = active_task.poll_task(context=context, tick=tick)
if new_task is None:
tick += 1
else:
active_task = new_task
if isinstance(active_task, ExitTask):
active_task = ClearStateTask(self.home_task)
task_initialised = False
tick = 0
else:
active_task.init_task(context=context)
task_initialised = True
except Exception as e:
active_task = ClearStateTask(ErrorTask(e))
task_initialised = False
[docs]class TaskContext:
"""
Contains the resources a task might need to perform its function
:ivar timestamp:
The time, in seconds since the epoch, that this context was created. In effect this is also the task creation
time as they're created at the same time.
"""
[docs] def __init__(self, arduino, lcd, bearing, imu_data, chassis, joystick, buttons_pressed):
"""
Create a new task context
:param arduino:
Instance of :class:`triangula.arduino.Arduino` that can be used to manipulate the motors and lights, and
which can poll the encoders attached to the motors.
:param lcd:
Instance of :class:`triangula.lcd.LCD` that can be used to display messages.
:param bearing:
If the task has indicated that a bearing is required, this is float value from the compass on the IMU.
:param imu_data:
If the task has indicated that a bearing is required, contains the entire IMU data block.
:param chassis:
An instance of :class:`triangula.chassis.HoloChassis` defining the motion dynamics for the robot.
:param joystick:
An instance of :class:`triangula.input.SixAxis` which can be used to get the joystick axes.
:param buttons_pressed:
A bitfield where bits are set to 1 if the corresponding SixAxis button was pressed since the start of the
previous task poll. Use this in preference to handler registration as it simplifies threading and cleanup
"""
self.arduino = arduino
self.lcd = lcd
self.bearing = bearing
self.chassis = chassis
self.joystick = joystick
self.buttons_pressed = buttons_pressed
self.timestamp = time.time()
self.imu_data = imu_data
[docs] def button_pressed(self, button_code):
"""
Helper method, equivalent to 'self.buttons_pressed & 1 << button_code
:param button_code:
A button index from :class:`triangula.input.SixAxis` i.e. SixAxis.BUTTON_SQUARE
:return:
0 if the button wasn't pressed at the time the context was created, non-zero otherwise
"""
return self.buttons_pressed & 1 << button_code
[docs]class Task:
"""
Base class for tasks. Tasks are single-minded activities which are run, one at a time, on Triangula's
processor. The service script is responsible for polling the active task, providing it with an appropriate
set of objects and properties such that it can interact with its environment.
"""
__metaclass__ = ABCMeta
[docs] def __init__(self, task_name='New Task', requires_compass=False):
"""
Create a new task with the specified name
:param task_name:
Name for this task, used in debug mostly. Defaults to 'New Task'
:param requires_compass:
Set to True to require that the task is provided with the current compass bearing when polled. This defaults
to False because I2C transactions are expensive and we don't want to make more of them than we have to.
"""
self.task_name = task_name
self.requires_compass = requires_compass
def __str__(self):
return 'Task[ task_name={} ]'.format(self.task_name)
@abstractmethod
[docs] def init_task(self, context):
"""
Called exactly once, the first time a new task is activated. Use this to set up any properties which weren't
available during construction.
:param context:
An instance of :class:`triangula.task.TaskContext` containing objects and properties which allow the task
to comprehend and act on its environment.
"""
return None
@abstractmethod
[docs] def poll_task(self, context, tick):
"""
Polled to perform the task's action, you shouldn't hang around too long in this method but there's no explicit
requirement for timely processing.
:param context:
An instance of :class:`triangula.task.TaskContext` containing objects and properties which allow the task
to comprehend and act on its environment.
:param int tick:
A counter, incremented each time poll is called.
:return:
Either None, to continue this task, or a subclass of :class:`triangula.task.Task` to switch to that task.
"""
return None
[docs]class ClearStateTask(Task):
"""
Task which clears the state, turns the lights off and stops the motors, then immediately passes control to another
task.
"""
[docs] def __init__(self, following_task):
"""
Create a new clear state task, this will effectively reset the robot's peripherals and pass control to the
next task. Use this when switching to ensure we're not leaving the wheels running etc.
:param following_task:
Another :class:`triangula.task.Task` which is immediately returned from the first poll operation.
:return:
"""
super(ClearStateTask, self).__init__(task_name='Clear state task', requires_compass=False)
self.following_task = following_task
def init_task(self, context):
context.arduino.set_motor_power(0, 0, 0)
context.lcd.set_text(row1='', row2='')
context.lcd.set_backlight(0, 0, 0)
context.arduino.set_lights(0, 0, 0)
def poll_task(self, context, tick):
return self.following_task
[docs]class ErrorTask(Task):
"""
Task used to display an error message
"""
[docs] def __init__(self, exception):
"""
Create a new error display task
:param exception:
An exception which caused this display to be shown
"""
super(ErrorTask, self).__init__(task_name='Error', requires_compass=False)
self.exception = exception
print exception
traceback.print_exc()
def init_task(self, context):
context.lcd.set_backlight(red=10, green=0, blue=0)
def poll_task(self, context, tick):
context.lcd.set_text(row1='ERROR!', row2=((' ' * 16) + str(self.exception) + (' ' * 16))[
tick % (len(str(self.exception)) + 16):])
time.sleep(0.2)
[docs]class ExitTask(Task):
"""
Special case Task, used to indicate that the current level of the task manager has completed and should take no
further actions.
"""
[docs] def __init__(self):
"""
No argument constructor
"""
super(ExitTask, self).__init__(task_name='Exit', requires_compass=False)
def init_task(self, context):
pass
def poll_task(self, context, tick):
pass
[docs]class PauseTask(Task):
"""
Task which will pause for at least the specified number of seconds, then yield to the specified task. If no task
is specified an ExitTask is used.
"""
[docs] def __init__(self, pause_time, following_task=None, led_hue=None):
"""
Constructor
:param pause_time:
This task should wait for at least this number of seconds before yielding.
:param following_task:
A task to which this will yield, if this is None an instance of ExitTask will be used.
:param led_hue:
Optional, if specified must be an integer between 0 and 255 and requests that all LEDs on the robot
are set to the specified hue value.
"""
super(PauseTask, self).__init__(task_name='Pause', requires_compass=False)
self.start_time = None
self.task = following_task
if self.task is None:
self.task = ExitTask()
self.pause_time = pause_time
self.led_hue = led_hue
def init_task(self, context):
self.start_time = time.time()
if self.led_hue is not None:
context.arduino.set_lights(self.led_hue, 255, 60)
def poll_task(self, context, tick):
now = time.time()
if now - self.start_time >= self.pause_time:
return self.task
else:
return None