Source code for periphery.gpio

import os
import select

class GPIOError(IOError):
    """Base class for GPIO errors.

    Raised by the GPIO class with ``(errno, message)`` arguments, mirroring
    the underlying IOError/OSError that caused the failure.
    """
class GPIO(object):
    """Wrapper around a Linux sysfs GPIO (``/sys/class/gpio/gpioN``).

    The object keeps an open file descriptor on the pin's ``value`` file and
    exposes reading, writing, edge polling, and the ``direction`` / ``edge``
    sysfs attributes. It can be used as a context manager, in which case the
    GPIO is closed on exit.
    """

    def __init__(self, pin, direction="preserve"):
        """Instantiate a GPIO object and open the sysfs GPIO corresponding to
        the specified pin, with the specified direction.

        `direction` can be "in" for input; "out" for output, initialized to
        low; "high" for output, initialized to high; "low" for output,
        initialized to low, or "preserve" for preserving existing direction.
        Default is "preserve".

        Args:
            pin (int): Linux pin number.
            direction (str): pin direction, can be "in", "out", "high", "low",
                             or "preserve".

        Returns:
            GPIO: GPIO object.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `pin` or `direction` types are invalid.
            ValueError: if `direction` value is invalid.

        """
        # Initialize state before opening so close() / __del__ are safe even
        # if _open() raises part-way through.
        self._fd = None
        self._pin = None
        self._open(pin, direction)

    def __del__(self):
        self.close()

    def __enter__(self):
        # Return the object itself so "with GPIO(...) as gpio:" binds the
        # opened GPIO rather than None.
        return self

    def __exit__(self, t, value, traceback):
        self.close()

    def _open(self, pin, direction):
        """Validate arguments, export the pin if needed, optionally set its
        direction, and open its sysfs "value" file."""
        if not isinstance(pin, int):
            raise TypeError("Invalid pin type, should be integer.")
        if not isinstance(direction, str):
            raise TypeError("Invalid direction type, should be string.")
        if direction.lower() not in ["in", "out", "high", "low", "preserve"]:
            raise ValueError("Invalid direction, can be: \"in\", \"out\", \"high\", \"low\", \"preserve\".")

        gpio_path = "/sys/class/gpio/gpio%d" % pin

        if not os.path.isdir(gpio_path):
            # Export the pin
            try:
                with open("/sys/class/gpio/export", "w") as f_export:
                    f_export.write("%d\n" % pin)
            except IOError as e:
                raise GPIOError(e.errno, "Exporting GPIO: " + e.strerror)

        # Write direction, if it's not to be preserved
        direction = direction.lower()
        if direction != "preserve":
            try:
                with open("/sys/class/gpio/gpio%d/direction" % pin, "w") as f_direction:
                    f_direction.write(direction + "\n")
            except IOError as e:
                raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror)

        # Open value
        try:
            self._fd = os.open("/sys/class/gpio/gpio%d/value" % pin, os.O_RDWR)
        except OSError as e:
            raise GPIOError(e.errno, "Opening GPIO: " + e.strerror)

        self._pin = pin

    # Methods

    def read(self):
        """Read the state of the GPIO.

        Returns:
            bool: ``True`` for high state, ``False`` for low state.

        Raises:
            GPIOError: if an I/O or OS error occurs, or if the value read is
                       not "0" or "1".

        """
        # Read value
        try:
            buf = os.read(self._fd, 2)
        except OSError as e:
            raise GPIOError(e.errno, "Reading GPIO: " + e.strerror)

        # Rewind
        try:
            os.lseek(self._fd, 0, os.SEEK_SET)
        except OSError as e:
            raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

        # An empty read has no first byte to inspect; report it as the
        # documented GPIOError instead of letting buf[0] raise IndexError.
        if not buf:
            raise GPIOError(None, "Unknown GPIO value: \"\"")

        # Compare first elements so the check works whether indexing bytes
        # yields an int or a one-character string.
        if buf[0] == b"0"[0]:
            return False
        elif buf[0] == b"1"[0]:
            return True

        raise GPIOError(None, "Unknown GPIO value: \"%s\"" % buf[0])

    def write(self, value):
        """Set the state of the GPIO to `value`.

        Args:
            value (bool): ``True`` for high state, ``False`` for low state.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `value` type is not bool.

        """
        if not isinstance(value, bool):
            raise TypeError("Invalid value type, should be bool.")

        # Write value
        try:
            if value:
                os.write(self._fd, b"1\n")
            else:
                os.write(self._fd, b"0\n")
        except OSError as e:
            raise GPIOError(e.errno, "Writing GPIO: " + e.strerror)

        # Rewind
        try:
            os.lseek(self._fd, 0, os.SEEK_SET)
        except OSError as e:
            raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

    def poll(self, timeout=None):
        """Poll a GPIO for the edge event configured with the .edge property.

        `timeout` can be a positive number for a timeout in seconds, 0 for a
        non-blocking poll, or negative or None for a blocking poll. Defaults to
        blocking poll.

        Args:
            timeout (int, float, None): timeout duration in seconds.

        Returns:
            bool: ``True`` if an edge event occurred, ``False`` on timeout.

        Raises:
            GPIOError: if an I/O or OS error occurs.
            TypeError: if `timeout` type is not int, float, or None.

        """
        if not isinstance(timeout, (int, float, type(None))):
            raise TypeError("Invalid timeout type, should be integer, float, or None.")

        # Setup epoll
        p = select.epoll()
        try:
            p.register(self._fd, select.EPOLLIN | select.EPOLLET | select.EPOLLPRI)

            # Poll twice, as first call returns with current state
            for _ in range(2):
                events = p.poll(timeout)
        finally:
            # Each call creates its own epoll object; close it explicitly so
            # repeated polling does not accumulate open descriptors.
            p.close()

        # If GPIO edge interrupt occurred
        if events:
            # Rewind
            try:
                os.lseek(self._fd, 0, os.SEEK_SET)
            except OSError as e:
                raise GPIOError(e.errno, "Rewinding GPIO: " + e.strerror)

            return True

        return False

    def close(self):
        """Close the sysfs GPIO.

        Calling close() on an already closed GPIO is a no-op.

        Raises:
            GPIOError: if an I/O or OS error occurs.

        """
        if self._fd is None:
            return

        # Forget the descriptor before closing it, so that a failed close is
        # not retried later by __exit__ / __del__ on a stale descriptor number.
        fd, self._fd = self._fd, None

        try:
            os.close(fd)
        except OSError as e:
            raise GPIOError(e.errno, "Closing GPIO: " + e.strerror)

    # Immutable properties

    @property
    def fd(self):
        """Get the file descriptor for the underlying sysfs GPIO "value" file
        of the GPIO object. ``None`` once the GPIO has been closed.

        :type: int
        """
        return self._fd

    @property
    def pin(self):
        """Get the sysfs GPIO pin number.

        :type: int
        """
        return self._pin

    @property
    def supports_interrupts(self):
        """Get whether or not this GPIO supports edge interrupts, configurable
        with the .edge property.

        :type: bool
        """
        return os.path.isfile("/sys/class/gpio/gpio%d/edge" % self._pin)

    # Mutable properties

    def _get_direction(self):
        # Read direction
        try:
            with open("/sys/class/gpio/gpio%d/direction" % self._pin, "r") as f_direction:
                direction = f_direction.read()
        except IOError as e:
            raise GPIOError(e.errno, "Getting GPIO direction: " + e.strerror)

        return direction.strip()

    def _set_direction(self, direction):
        if not isinstance(direction, str):
            raise TypeError("Invalid direction type, should be string.")
        if direction.lower() not in ["in", "out", "high", "low"]:
            raise ValueError("Invalid direction, can be: \"in\", \"out\", \"high\", \"low\".")

        # Write direction
        try:
            direction = direction.lower()
            with open("/sys/class/gpio/gpio%d/direction" % self._pin, "w") as f_direction:
                f_direction.write(direction + "\n")
        except IOError as e:
            raise GPIOError(e.errno, "Setting GPIO direction: " + e.strerror)

    direction = property(_get_direction, _set_direction)
    """Get or set the GPIO's direction. Can be "in", "out", "high", "low".

    Direction "in" is input; "out" is output, initialized to low; "high" is
    output, initialized to high; and "low" is output, initialized to low.

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `direction` type is not str.
        ValueError: if `direction` value is invalid.

    :type: str
    """

    def _get_edge(self):
        # Read edge
        try:
            with open("/sys/class/gpio/gpio%d/edge" % self._pin, "r") as f_edge:
                edge = f_edge.read()
        except IOError as e:
            raise GPIOError(e.errno, "Getting GPIO edge: " + e.strerror)

        return edge.strip()

    def _set_edge(self, edge):
        if not isinstance(edge, str):
            raise TypeError("Invalid edge type, should be string.")
        if edge.lower() not in ["none", "rising", "falling", "both"]:
            raise ValueError("Invalid edge, can be: \"none\", \"rising\", \"falling\", \"both\".")

        # Write edge
        try:
            edge = edge.lower()
            with open("/sys/class/gpio/gpio%d/edge" % self._pin, "w") as f_edge:
                f_edge.write(edge + "\n")
        except IOError as e:
            raise GPIOError(e.errno, "Setting GPIO edge: " + e.strerror)

    edge = property(_get_edge, _set_edge)
    """Get or set the GPIO's interrupt edge. Can be "none", "rising",
    "falling", "both".

    Raises:
        GPIOError: if an I/O or OS error occurs.
        TypeError: if `edge` type is not str.
        ValueError: if `edge` value is invalid.

    :type: str
    """

    # String representation

    def __str__(self):
        if self.supports_interrupts:
            return "GPIO %d (fd=%d, direction=%s, supports interrupts, edge=%s)" % (self._pin, self._fd, self.direction, self.edge)

        return "GPIO %d (fd=%d, direction=%s, no interrupts)" % (self._pin, self._fd, self.direction)