Source code for GeminiMotorDrive

# Copyright 2014-2016 Freja Nordsiek
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
This is the GeminiMotorDrive package, a Python package for controlling a
Parker Hannifin Gemini GV-6 and GT-6 servo and stepper motor drives.

Version 0.2

"""

__version__ = "0.2"

import math
import copy
import re

from . import drivers, utilities


[docs]class GeminiError(IOError): """Base exception class for this module.""" pass
[docs]class CommandError(GeminiError): """Exception executing a command.""" pass
[docs]def get_driver(driver='ASCII_RS232', *args, **keywords): """ Gets a driver for a Parker Motion Gemini drive. Gets and connects a particular driver in ``drivers`` to a Parker Motion Gemini GV-6 or GT-6 servo/stepper motor drive. The only driver currently supported is the ``'ASCII_RS232'`` driver which corresponds to ``drivers.ASCII_RS232``. Parameters ---------- driver : str, optional The driver to communicate to the particular driver with, which includes the hardware connection and possibly the communications protocol. The only driver currently supported is the ``'ASCII_RS232'`` driver which corresponds to ``drivers.ASCII_RS232``. *args : additional positional arguments Additional positional arguments to pass onto the constructor for the driver. **keywords : additional keyword arguments Additional keyword arguments to pass onto the constructor for the driver. Returns ------- drivers : drivers The connected drivers class that is connected to the drive. Raises ------ NotImplementedError If the `driver` is not supported. See Also -------- drivers drivers.ASCII_RS232 """ if driver.upper() == 'ASCII_RS232': return drivers.ASCII_RS232(*args, **keywords) else: raise NotImplementedError('Driver not supported: ' + str(driver))
[docs]class GeminiG6(object): """ Controller for a Parker Motion Gemini GV-6 or GT-6. An object to connect to and control a Parker Motion Gemini GV-6 or GT-6 servo/stepper motor drive already connected to with a particular `driver`. Parameters ---------- driver : driver Connected instance of a class in ``drivers``. Use ``get_driver`` to load one. Is stored in the attribute ``driver``. Raises ------ GeminiError If the attached device is not a Gemini GV-6 or GT-6. Attributes ---------- driver : driver Driver for communicating to the drive. energized : bool denergize_on_kill : bool encoder_resolution : int electrical_pitch : float max_velocity : float motion_commanded : bool See Also -------- get_driver """ def __init__(self, driver): #: Driver for communicating to the drive. #: #: driver #: #: A class from ``GeminiMotorDriver.drivers``. Can be loaded #: using ``get_driver``. #: #: See Also #: -------- #: get_driver self.driver = driver # Make sure that it is indeed a GV/T6, and throw an exception # otherwise. It should respond to the 'TREV' command with 'TREV' # echoed and '*TREV-GV6-L3E_D1.50_F1.00' where everything after # the 'GV6' (possibly replaced with a 'GT6') part is model # dependent. response = self.driver.send_command('TREV', timeout=1.0, immediate=True) if re.search('^!TREV\r\\*TREV-G[VT]{1}6', response[1]) is None: raise GeminiError('Not a valid Gemini GV-6 or GT-6 device.') def _get_parameter(self, name, tp, timeout=1.0, max_retries=2): """ Gets the specified drive parameter. Gets a parameter from the drive. Only supports ``bool``, ``int``, and ``float`` parameters. Parameters ---------- name : str Name of the parameter to check. It is always the command to set it but without the value. tp : type {bool, int, float} The type of the parameter. timeout : number, optional Optional timeout in seconds to use when reading the response. A negative value or ``None`` indicates that the an infinite timeout should be used. max_retries : int, optional Maximum number of retries to do per command in the case of errors. Returns ------- value : bool, int, or float The value of the specified parameter. Raises ------ TypeError If 'tp' is not an allowed type (``bool``, ``int``, ``float``). CommandError If the command to retrieve the parameter returned an error. ValueError If the value returned to the drive cannot be converted to the proper type. See Also -------- _set_parameter : Set a parameter. """ # Raise a TypeError if tp isn't one of the valid types. if tp not in (bool, int, float): raise TypeError('Only supports bool, int, and float; not ' + str(tp)) # Sending a command of name queries the state for that # parameter. The response will have name preceeded by an '*' and # then followed by a number which will have to be converted. response = self.driver.send_command(name, timeout=timeout, immediate=True, max_retries=max_retries) # If the response has an error, there are no response lines, or # the first response line isn't '*'+name; then there was an # error and an exception needs to be thrown. if self.driver.command_error(response) \ or len(response[4]) == 0 \ or not response[4][0].startswith('*' + name): raise CommandError('Couldn''t retrieve parameter ' + name) # Extract the string representation of the value, which is after # the '*'+name. value_str = response[4][0][(len(name)+1):] # Convert the value string to the appropriate type and return # it. Throw an error if it is not supported. if tp == bool: return (value_str == '1') elif tp == int: return int(value_str) elif tp == float: return float(value_str) def _set_parameter(self, name, value, tp, timeout=1.0, max_retries=2): """ Sets the specified drive parameter. Sets a parameter on the drive. Only supports ``bool``, ``int``, and ``float`` parameters. Parameters ---------- name : str Name of the parameter to set. It is always the command to set it when followed by the value. value : bool, int, or float Value to set the parameter to. tp : type {bool, int, float} The type of the parameter. timeout : number, optional Optional timeout in seconds to use when reading the response. A negative value or ``None`` indicates that the an infinite timeout should be used. max_retries : int, optional Maximum number of retries to do per command in the case of errors. Returns ------- success : bool Whether the last attempt to set the parameter was successful (``True``) or not (``False`` meaning it had an error). See Also -------- _get_parameter : Get a parameter. """ # Return False if tp isn't one of the valid types. if tp not in (bool, int, float): return False # Convert value to the string that the drive will expect. value # must first be converted to the proper type before getting # converted to str in the usual fasion. As bools need to be a # '1' or a '0', it must be converted to int before going through # str. if tp == bool: value_str = str(int(bool(value))) elif tp == int: value_str = str(int(value)) elif tp == float: value_str = str(float(value)) # Immediately set the named parameter of the drive. The command # is just the parameter name followed by the value string. response = self.driver.send_command(name+value_str, \ timeout=timeout, immediate=True, max_retries=max_retries) # Return whether the setting was successful or not. return not self.driver.command_error(response)
[docs] def pause(self, max_retries=0): """ Pauses the drive (execution of commands). Causes the drive to pause execution of commands till it is unpaused. Commands will be queued until it is unpaused. Motion is not stopped. Parameters ---------- max_retries : int, optional Maximum number of retries to do to pause the drive in the case of errors. Returns ------- success : bool Whether the last pause command (last try or retry) was successful (``True``) or not (``False`` meaning it had an error). Notes ----- The command sent to the drive is '!C'. See Also -------- unpause : Unpause the drive. """ return (not self.driver.command_error( self.driver.send_command('PS', timeout=1.0, immediate=True, max_retries=max_retries)))
[docs] def unpause(self, max_retries=0): """ Unpauses the drive. Unpauses the drive. Commands queued while it is paused will then be executed. Parameters ---------- max_retries : int, optional Maximum number of retries to do to unpause the drive in the case of errors. Returns ------- success : bool Whether the last unpause command (last try or retry) was successful (``True``) or not (``False`` meaning it had an error). Notes ----- The command sent to the drive is '!C'. See Also -------- pause : Pause the drive. """ return (not self.driver.command_error( self.driver.send_command('C', timeout=1.0, immediate=True, max_retries=max_retries)))
[docs] def stop(self, max_retries=0): """ Stops motion. The drive stops the motor. Parameters ---------- max_retries : int, optional Maximum number of retries to do to kill the drive in the case of errors. Returns ------- success : bool Whether the last stop command (last try or retry) was successful (``True``) or not (``False`` meaning it had an error). Notes ----- The command sent to the drive is '!S1'. """ return (not self.driver.command_error( self.driver.send_command('S1', timeout=1.0, immediate=True, max_retries=max_retries)))
[docs] def kill(self, max_retries=0): """ Kills the drive. The drive stops the motor and any running program. The motor will de-energize depending on the state of ``denergize_on_kill``. Parameters ---------- max_retries : int, optional Maximum number of retries to do to kill the drive in the case of errors. Returns ------- success : bool Whether the last kill command (last try or retry) was successful (``True``) or not (``False`` meaning it had an error). Notes ----- The command sent to the drive is '!K'. See Also -------- denergize_on_kill : Controls whether the motor de-energizes after the drive is killed or not. """ return (not self.driver.command_error( self.driver.send_command('K', timeout=1.0, immediate=True, max_retries=max_retries)))
[docs] def reset(self, max_retries=0): """ Resets the drive. Resets the drive, which is equivalent to a power cycling. Parameters ---------- max_retries : int, optional Maximum number of retries to do to reset the drive in the case of errors. Returns ------- success : bool Whether the last reset command (last try or retry) was successful (``True``) or not (``False`` meaning it had an error). Notes ----- The command sent to the drive is '!RESET'. """ return (not self.driver.command_error( self.driver.send_command('RESET', timeout=10.0, immediate=True, max_retries=max_retries)))
[docs] def get_program(self, n, timeout=2.0, max_retries=2): """ Get a program from the drive. Gets program 'n' from the drive and returns its commands. Parameters ---------- n : int Which program to get. timeout : number, optional Optional timeout in seconds to use when reading the response. A negative value or ``None`` indicates that the an infinite timeout should be used. max_retries : int, optional Maximum number of retries to do per command in the case of errors. Returns ------- commands : list of str ``list`` of ``str`` commands making up the program. The trailing 'END' is removed. Empty if there was an error. Notes ----- The command sent to the drive is '!TPROG PROGn'. See Also -------- set_program_profile : Sets a program or profile. run_program_profile : Runs a program or profile. """ # Send the 'TPROG PROGn' command to read the program. response = self.driver.send_command( \ 'TPROG PROG' + str(int(n)), timeout=timeout, \ immediate=True, max_retries=max_retries) # If there was an error, then return empty. Otherwise, return # the response lines but strip the leading '*' first and the # 'END' at the end of the list. if self.driver.command_error(response) \ or len(response[4]) == 0: return [] else: if '*END' in response[4]: response[4].remove('*END') return [line[1:] for line in response[4]]
[docs] def set_program_profile(self, n, commands, program_or_profile='program', timeout=1.0, max_retries=0): """ Sets a program/profile on the drive. Sets program or profile 'n' on the drive to the sequence of commands in 'commands'. If the existing program is identical, it is not overwritten (can't check this for a profile). Returns whether the program or profile was successfully set or not (if the existing one is identical, it is considered a success). Parameters ---------- n : int Which program to set. commands : list or tuple of strings ``list`` or ``tuple`` of commands to send to the drive. Each command must be a string. program_or_profile : {'program', 'profile'}, optional Whether to read a program or a profile. Anything other than these two values implies the default. timeout : number, optional Optional timeout in seconds to use when reading the response. A negative value or ``None`` indicates that the an infinite timeout should be used. max_retries : int, optional Maximum number of retries to do per command in the case of errors. Returns ------- success : bool Whether the program or profile was successfully set or not (an identical program already existing on the drive is considered a success). Notes ----- 'commands' gets wrapped between ['DEL PROGn', 'DEF PROGn'] and 'END' or the equivalent profile ones. See Also -------- get_program : Gets a program. run_program_profile : Runs a program or profile. """ # Grab the n'th program on the drive and strip commands. If we # are doing a profile, None will be used as a placeholder. if program_or_profile != 'profile': current_program = self.get_program(n, timeout=timeout, \ max_retries=max_retries+2) else: current_program = None stripped_commands = utilities.strip_commands(commands) # If the two are identical and we are doing a program, then # nothing needs to be done and the program is already set # (return True). Otherwise, it needs to be overwritten. If there # were no errors on the last command, then it was written # successfully. Otherwise, the program or profile needs to be # terminated and then deleted. if current_program is not None \ and current_program == stripped_commands: return True else: # Construct the End Of Responses for each command that will # be sent. They are '\n' for deletion and ending, but are # '\n- ' for the rest. eor = ['\n'] + (['\n- '] * (1 + len(stripped_commands))) \ + ['\n'] # The commands consist of a header that tells which program # or profile to set, the stripped commands, followed by an # 'END'. if program_or_profile != 'profile': header = ['DEL PROG'+str(int(n)), 'DEF PROG'+str(int(n))] else: header = ['DEL PROF'+str(int(n)), 'DEF PROF'+str(int(n))] responses = self.driver.send_commands(\ header + stripped_commands + ['END'], \ timeout=timeout, max_retries=max_retries, eor=eor) # Check to see if it was set successfully. If it was (the # last command had no errors), return True. Otherwise, the # program or profile needs to be ended and deleted before # returning False. if not self.driver.command_error(responses[-1]): return True else: if program_or_profile != 'profile': cmds = ['END', 'DEL PROG'+str(int(n))] else: cmds = ['END', 'DEL PROF'+str(int(n))] self.driver.send_commands(cmds, timeout=timeout, max_retries=max_retries+2) return False
[docs] def run_program_profile(self, n, program_or_profile='program', timeout=10.0): """ Runs a program/profile on the drive. Runs program or profile 'n' on the drive, grabs its output, and processes the output. The response from the drive is broken down into the echoed command (drive echoes it back), any error returned by the drive (leading '*' is stripped), and the different lines of the response; which are all returned. It is **VERY IMPORTANT** that 'timeout' is long enough for the program to run if all the output from the drive is to be collected. Parameters ---------- n : int Which program to get. program_or_profile : {'program', 'profile'}, optional Whether to read a program or a profile. Anything other than these two values implies the default. timeout : number, optional Optional timeout in seconds to use when reading the response for running a program (set to 1.0 for a profile regardless of what is given). A negative value or ``None`` indicates that the an infinite timeout should be used. Returns ------- output : list A 5-element ``list``. The elements, in order, are the sanitized command (``str``), the full response (``str``), the echoed command (``str``), any error response (``None`` if none, or the ``str`` of the error), and the lines of the response that are not the echo or error line (``list`` of ``str`` with newlines stripped). Notes ----- Sends 'RUN PROGn' (program) or 'PRUN PROFn' (profile) as the command to the drive. For a profile, the only output is that command echoed back. For a program, it will echo back each command in the program (preceeded by an '*' and followed by a line feed as opposed to a carriage return). See Also -------- get_program : Gets a program. set_program_profile : Sets a program or profile. """ if program_or_profile != 'profile': return self.driver.send_command('RUN PROG' + str(int(n)), \ timeout=timeout, immediate=True, eor='*END\n') else: return self.driver.send_command( \ 'PRUN PROF' + str(int(n)), timeout=1.0, immediate=True)
@property def energized(self): """ Energized state of the motor. ``bool`` with energized being ``True``. Setting it sends an immediate command to the drive to energize the motor. Notes ----- This uses the 'DRIVE' command. """ return self._get_parameter('DRIVE', bool) @energized.setter def energized(self, value): self._set_parameter('DRIVE', value, bool) @property def denergize_on_kill(self): """ De-energize motor when the drive is killed. ``bool`` with ``True`` meaning that whenever the drive is given the kill signal, the motor will de-energize. Setting it sends an immediate command to the drive to set it. Notes ----- This uses the 'KDRIVE' command. See Also -------- energized : Get/set the motor energized state. kill : Kill the drive. """ return self._get_parameter('KDRIVE', bool) @denergize_on_kill.setter def denergize_on_kill(self, value): self._set_parameter('KDRIVE', value, bool) @property def encoder_resolution(self): """ Encoder/Resolver resolution. ``int`` with units counts/rev (servo) or counts/pitch (linear) Setting it sends an immediate command to the drive to change the encoder/resolver resolution. Notes ----- This uses the 'ERES' command. """ return self._get_parameter('ERES', int) @encoder_resolution.setter def encoder_resolution(self, value): self._set_parameter('ERES', value, int) self.reset() @property def electrical_pitch(self): """ The motor's electrical pitch. float with units of mm It gives the spacing between two magnets (full magnetic cycle) on a linear motor. Velocities and accelerations are in units of pitches/s and pitches/s^2, so it is important. Setting it sends an immediate command to the drive to change the electrical pitch. Notes ----- This uses the 'DMEPIT' command. """ return self._get_parameter('DMEPIT', float) @electrical_pitch.setter def electrical_pitch(self, value): self._set_parameter('DMEPIT', value, float) self.reset() @property def max_velocity(self): """ The motor's velocity limit. ``float`` in motor units Notes ----- This uses the 'DMVLIM' command. """ return self._get_parameter('DMVLIM', float) @max_velocity.setter def max_velocity(self, value): self._set_parameter('DMVLIM', value, float) @property def motion_commanded(self): """ Whether motion is commanded or not. ``bool`` Can't be set. Notes ----- It is the value of the first bit of the 'TAS' command. """ rsp = self.driver.send_command('TAS', immediate=True) if self.driver.command_error(rsp) or len(rsp[4]) != 1 \ or rsp[4][0][0:4] != '*TAS': return False else: return (rsp[4][0][4] == '1')