Source code for Scheduler

# Scheduler
# Support for coroutines using Python generator functions.
#
# Copyright (c) 2014 Charles Weir.  Shared under the MIT Licence.

import datetime
import logging
import sys, traceback

[docs]class StopCoroutineException( Exception ): '''Exception used to stop a coroutine''' pass
ProgramStartTime = datetime.datetime.now()
[docs]class Scheduler(): ''' This manages an arbitrary number of coroutines (implemented as generator functions), supporting invoking each every *timeMillisBetweenWorkCalls*, and detecting when each has completed. It supports one special coroutine - the updatorCoroutine, which is invoked before and after all the other ones. ''' timeMillisBetweenWorkCalls = 50 @staticmethod
[docs] def currentTimeMillis(): 'Answers the time in floating point milliseconds since program start.' global ProgramStartTime c = datetime.datetime.now() - ProgramStartTime return c.days * (3600.0 * 1000 * 24) + c.seconds * 1000.0 + c.microseconds / 1000.0
def __init__(self, timeMillisBetweenWorkCalls = 50): Scheduler.timeMillisBetweenWorkCalls = timeMillisBetweenWorkCalls self.coroutines = [] self.timeOfLastCall = Scheduler.currentTimeMillis() self.updateCoroutine = self.nullCoroutine() # for testing - usually replaced. #: The most recent exception raised by a coroutine: self.lastExceptionCaught = Exception("None")
[docs] def doWork(self): 'Executes all the coroutines, handling exceptions' timeNow = Scheduler.currentTimeMillis() if timeNow == self.timeOfLastCall: # Ensure each call gets a different timer value. return self.timeOfLastCall = timeNow self.updateCoroutine.next() for coroutine in self.coroutines[:]: # Copy of coroutines, so it doesn't matter removing one try: coroutine.next() except (StopIteration): self.coroutines.remove( coroutine ) except Exception as e: self.lastExceptionCaught = e logging.info( "Scheduler - caught: %r" % (e) ) exc_type, exc_value, exc_traceback = sys.exc_info() trace = "".join(traceback.format_tb(exc_traceback)) logging.debug( "Traceback (latest call first):\n %s" % trace ) self.coroutines.remove( coroutine ) self.updateCoroutine.next()
[docs] def timeMillisToNextCall(self): 'Wait time before the next doWork call should be called.' timeRequired = self.timeMillisBetweenWorkCalls + self.timeOfLastCall - Scheduler.currentTimeMillis() return max( timeRequired, 0 )
[docs] def addSensorCoroutine(self, *coroutineList): '''Adds one or more new sensor/program coroutines to be scheduled, answering the last one to be added. Sensor coroutines are scheduled *before* Action coroutines''' self.coroutines[0:0] = coroutineList return coroutineList[-1]
[docs] def addActionCoroutine(self, *coroutineList): '''Adds one or more new motor control coroutines to be scheduled, answering the last coroutine to be added. Action coroutines are scheduled *after* Sensor coroutines''' self.coroutines.extend( coroutineList ) return coroutineList[-1]
def setUpdateCoroutine(self, coroutine): # Private - set the coroutine that manages the interaction with the BrickPi. # The coroutine will be invoked once at the start and once at the end of each doWork call. self.updateCoroutine = coroutine
[docs] def stopCoroutine( self, *coroutineList ): 'Terminates the given one or more coroutines' for coroutine in coroutineList: try: coroutine.throw(StopCoroutineException) except (StopCoroutineException,StopIteration): # If the coroutine doesn't catch the exception to tidy up, it comes back here. self.coroutines.remove( coroutine )
[docs] def stopAllCoroutines(self): 'Terminates all coroutines (except the updater one) - rather drastic!' self.stopCoroutine(*self.coroutines[:]) # Makes a copy of the list - don't want to be changing it.
[docs] def numCoroutines( self ): 'Answers the number of active coroutines' return len(self.coroutines)
[docs] def stillRunning( self, *coroutineList ): 'Answers whether any of the given coroutines are still executing' return any( c in self.coroutines for c in coroutineList ) ############################################################################################# # Coroutines #############################################################################################
@staticmethod
[docs] def nullCoroutine(): 'Null coroutine - runs forever and does nothing' while True: yield
@staticmethod
[docs] def runTillFirstCompletes( *coroutineList ): 'Coroutine that executes the given coroutines until the first completes, then stops the others and finishes.' while True: for coroutine in coroutineList: try: coroutine.next() except (StopIteration, StopCoroutineException): return # CW - I don't understand it, but we don't seem to need to terminate the others explicitly. yield
@staticmethod
[docs] def runTillAllComplete(*coroutineList ): 'Coroutine that executes the given coroutines until all have completed or one throws an exception.' coroutines = list( coroutineList ) while coroutines != []: for coroutine in coroutines: try: coroutine.next() except (StopIteration, StopCoroutineException): coroutines.remove( coroutine ) yield
@staticmethod
[docs] def waitMilliseconds( timeMillis ): 'Coroutine that waits for timeMillis, then finishes.' t = Scheduler.currentTimeMillis() while Scheduler.currentTimeMillis() - t < timeMillis: yield
@staticmethod
[docs] def withTimeout( timeoutMillis, *coroutineList ): 'Coroutine that wraps the given coroutine(s) with a timeout' return Scheduler.runTillFirstCompletes( Scheduler.waitMilliseconds( timeoutMillis ), *coroutineList )
@staticmethod
[docs] def waitFor(function, *args ): 'Coroutine that waits until the given function (with optional parameters) returns True.' while not function(*args): yield