Source code for newportESP

"""Driver for Newport's ESP (and compatible) motion controllers over a serial port (RS232). 

Refer to the controller's manual for detailed operational instructions: http://assets.newport.com/webDocuments-EN/images/ESP300_User_Manual.pdf

:Usage:

>>> from mewportESP import ESP
>>> esp = ESP('/dev/ttyUSB0')  # open communication with controller
>>> stage = esp.axis(1)        # open axis no 1
>>> stage.id                   # print stage ID
>>> stage.move_to(1.2)         # Move to position 1.2 mm

**Revision history:**

  - v. 0.3 (27 August 2015): update all methods/properties names to lowercase.
    Capitalised versions are still available for backward compatibility.

  - v. 0.4 (28 September 2015): added a threading.Lock() to the query() method.
    This should make the module thread-safe.

  - v. 1.0 (16 February 2016) clean up Sphinx documentation.
    Remove all capitalised names.
    Upload to PyPi.
    
----------------------------------------   
"""

import serial
from time import sleep
import signal
import threading
#import logging
#from  matplotlib.cbook import Stack


[docs]class NewportError(Exception): """Represents errors raised by the ESP controller.""" # Error are in the form of 'code, timestamp, MESSAGE', eg: '0, 451322, NO ERROR DETECTED' def __init__(self, string): self._string = string split_string = string.split(',') code = split_string[0] if len(code) == 3: self.axis = code[0] self.code = code[1:] else: self.axis = None self.code = code self.code = split_string[0] self.timestamp = split_string[1][1:] self.message = split_string[2][1:] def __str__(self): if self.axis is not None: if_axis_specific = ' on axis ' + self.axis else: if_axis_specific = '' return self.message + if_axis_specific
[docs]def catch_error(func): """A decorator to read error messages after calling ESP functions.""" def inner(*args, **kwargs): self = args[0] func(*args, **kwargs) self.write('TB?', axis="") error_string = self.read_error() if error_string[0] is not '0': self.abort() raise NewportError(error_string) return inner
def check_previous_motion_is_done(func): def checked_previous_motion_is_done(*args, **kwargs): self = args[0] if self.moving: raise RuntimeError('Previous motion is not done! Aborting.') else: func(*args, **kwargs) return checked_previous_motion_is_done
[docs]class ESP(object): """ Driver for Newport's ESP (100/300) motion controller. :Usage: >>> esp = NewportESP.ESP('/dev/ttyUSB0') # open communication with controller >>> stage = esp.axis(1) # open axis no 1 """ def __init__(self, port): """:param port: Serial port connected to the controller.""" self.lock = threading.Lock() self.ser = serial.Serial(port=port, baudrate=19200, bytesize=8, timeout=1, parity='N', rtscts=1) #print("Found controller: " + self.version) self.Abort = self.abort def __del__(self): self.ser.close()
[docs] def read(self): """ Serial read with EOL character removed.""" with DelayedKeyboardInterrupt(): # stuff here will not be interrupted by SIGINT str = self.ser.readline() return str[0:-2]
[docs] def write(self, string, axis=None): """ Serial write. The EOL character is automatically appended :param string: the string to write to the port :param axis: index of the destination axis. If unspecified, the destination is the controller.""" self.ser.write((str(axis) if axis is not None else "") + string + "\r")
[docs] def query(self, string, axis=None, check_error=False): """write a command and read the reply. :param string: string to write to the port :param axis: index of the destination axis. If unspecified, the destination is the controller. :param check_error: if True, query the controller for any error, both before writing and before reading. :type check_error: bool :return: the reply string :rtype: string """ with self.lock: if check_error: self.raise_error() self.write(string+'?', axis=axis) if check_error: self.raise_error() return self.read()
@property def version(self): """The controller firmware version number.""" return self.query('VE')
[docs] def abort(self): """Send an 'Abort motion' command to the controller.""" self.write('AB')
[docs] def read_error(self): """Return the last error as a string.""" return self.query('TB')
[docs] def raise_error(self): """Check the last error message and raise a NewportError.""" err = self.read_error() if err[0] != "0": raise NewportError(err)
[docs] def axis(self, axis_index=1): """create an Axis object. :param axis_index: the axis index :type axis_index: int :rtype: :class:`Axis` """ return Axis(self, axis=axis_index)
[docs]class Axis(object): """ Represents a Newport actuator or motorised stage attached to the ESP controller. :Usage: >>> esp = NewportESP.ESP('/dev/ttyUSB0') # open communication with controller >>> stage = NewportESP.Axis(esp, axis = 1) # open axis no 1 """ def __init__(self, controller, axis=1): """ :param controller: The axis' Newport ESP controller :type controller: :class:`ESP` :param axis: the axis index :type axis: int """ self.axis = axis self.esp = controller self.read = self.esp.read self.step_size_list = (1, 0.5, 0.1, 0.05, 0.01, 0.005, 0.001, 0.0005, 0.0001) self.step_size = 6 # default increment for gotoRel (index in step_size_list) self.polling_time = 0.02 self.abort = self.esp.abort self.read_error = self.esp.read_error def __del__(self): self.off()
[docs] def write(self, string, axis=None): """ Send a command string to the axis. Can be used to send commands that are not covered by class methods. :param string: the command string. The axis index and EOL characters are automatically appended. :param axis: if unspecified, the command is directed to the current axis. However, by setting ``axis`` to an empty string, the command will be directed to the controller instead. """ if axis is None: axis = self.axis # optional argument allows none-axis-specific commands # (most of which should be in the controller class, but a few are useful here.) self.esp.write(string, axis=axis)
[docs] def query(self, string, check_error=False): """write a command and read the reply. :param string: string to write to the port :param check_error: if True, query the controller for any error, both before writing and before reading. :type check_error: bool :return: the reply string :rtype: string """ return self.esp.query(string, self.axis, check_error)
@property def id(self): """The axis model and serial number.""" return self.query('ID')
[docs] def on(self): """Power on the axis.""" self.write("MO")
[docs] def off(self): """Power on the axis.""" self.write("MF")
@property def home(self): """Define home. This command is used to define current position, HOME position. This means that the current position will be preset to the value defined by parameter 'nn'. """ return self.query('DH') @home.setter def home(self, nn): self.write('DH'+str(nn)) @property def moving(self): """Return True is motion is finished. :rtype: bool """ return False if self.query('MD') == "1" else True
[docs] def wait(self): """This method will block until current motion is finished.""" while self.moving: sleep(self.polling_time)
#catch_error
[docs] def move_to(self, pos, wait=False): """Go to absolute position. :param pos: the final position (in mm) :type pos: float :param wait: whether to block until the motion is finished :type wait: bool """ self.write("PA" + str(pos)) if wait: self.WaitForMotionDone()
#catch_error
[docs] def move_by(self, pos, wait=False): """Go to relative position. :param pos: the requested step (in mm) :type pos: float :param wait: whether to block until the motion is finished :type wait: bool """ self.write("PR" + str(pos)) if wait: self.WaitForMotionDone()
@property def position(self): """The current position, in mm.""" return float(self.query('TP'))
[docs] def move_to_hardware_limit(self, direction): """Move to hardware limit. :param direction: negative or position, indicate which direction to move towards. :type direction: float """ self.write('MT'+str(direction))
@check_previous_motion_is_done def move_up(self): """Move continuously upwards (negative direction). Call :func:`stop` to stop the motion. """ if not self.moving: self.write('MV-') else: print("Previous motion is not done!") @check_previous_motion_is_done def move_down(self): """Move continuously downwards (positive direction). Call :func:`stop` to stop the motion. """ if not self.moving: self.write('MV+') else: print("Previous motion is not done!") @check_previous_motion_is_done def move_(self, direction): if not self.moving: self.write('MV+') else: print("Previous motion is not done!")
[docs] def stop(self): """Stop the current motion.""" self.write('ST')
@property def backlash(self): """set backlash compensation (in mm). This command initiates a backlash compensation algorithm when motion direction is reversed. The controller keeps track of the motion sequence and for each direction change it adds the specified nn correction. Setting nn to zero disables the backlash compensation. NOTE: The command is affective only after a home search (OR) or define home (DH) is performed on the specified axis. """ return float(self.query('BA')) @backlash.setter def backlash(self, value): self.write('BA'+str(value)) @property def resolution(self): """Return the encoder resolution, in mm.""" return float(self.query('SU')) @property def unit(self): """Return the encoder unit.""" return UNIT[int(self.query('SN'))]
[docs] def travel_limits(self, left=None, right=None): """Set or query the axis travel limits. :param float left: left (negative) travel limit :param float right: right (positive) travel limit If both limits are unspecified, returns the current settings as a dictionary. """ if left is None and right is None: left_lim = float(self.query('SL')) right_lim = float(self.query('SR')) return {'left': left_lim, 'right': right_lim} if left is not None: self.write('SL'+str(left)) if right is not None: self.write('SR'+str(right))
#def setVelocity(self, vel): UNIT = {0: 'encoder count', 1: 'motor step', 2: 'millimiter', 3: 'micrometer', 4: 'inches', 5: 'milli-inches', 6: 'micro-inches', 7: 'degree', 8: 'gradient', 9: 'radian', 10: 'milliradian', 11: 'microradian'}
[docs]class DelayedKeyboardInterrupt(object): """Context manager that capture a KeyboardInterrupt Event and ignores it for the duration of the context, triggering the interrupt when exiting. If called in a thread, does nothing. """ def __enter__(self): try: self.signal_received = False self.old_handler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, self.handler) self.threaded = False except ValueError: self.threaded = True def handler(self, signal, frame): self.signal_received = (signal, frame) #logging.debug('SIGINT received. Delaying KeyboardInterrupt.') def __exit__(self, type, value, traceback): if not self.threaded: signal.signal(signal.SIGINT, self.old_handler) if self.signal_received: self.old_handler(*self.signal_received) else: pass