# coding=utf-8
# pynput
# Copyright (C) 2015-2016 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import itertools
import Xlib.display
import Xlib.threaded
import Xlib.XK
from . import AbstractListener
from .xorg_keysyms import *
# Open a display and close it again straight away; this happens at import time
# and serves only to verify that we have an X connection
Xlib.display.Display().close()
class X11Error(Exception):
    """The error raised when leaving a code block managed by a
    :func:`display_manager` if an *X11* error occurred inside it.
    """
@contextlib.contextmanager
def display_manager(display):
    """Traps *X* errors and raises an :class:`X11Error` at the end if any
    error occurred.

    While the managed block runs, a temporary error handler that records
    every reported error is installed on the display. When the block
    completes, the display is sync'd and the handler returned by
    ``set_error_handler`` is installed again.

    If the managed block itself raises, the previous handler is still
    restored, but that exception propagates and any trapped errors are not
    reported.

    :param Xlib.display.Display display: The *X* display.

    :return: the display

    :rtype: Xlib.display.Display

    :raises X11Error: if any error was trapped; the list of argument tuples
        passed to the error handler is its single argument
    """
    trapped = []

    # Every invocation of the handler stores its complete argument tuple
    previous = display.set_error_handler(
        lambda *args: trapped.append(args))
    try:
        yield display
        display.sync()
    finally:
        display.set_error_handler(previous)

    if trapped:
        raise X11Error(trapped)
def _find_mask(display, symbol):
    """Returns the mode flags to use for a modifier symbol.

    The symbol is translated to a key code, and the modifier mapping of the
    display is searched for that key code. The mask is ``1 << index``, where
    ``index`` is the position of the first modifier that lists the key code.

    :param Xlib.display.Display display: The *X* display.

    :param str symbol: The name of the symbol.

    :return: the modifier mask, or ``0`` if the symbol is not bound to a key
        or the key is not used as a modifier
    """
    # Get the key code for the symbol
    modifier_keycode = display.keysym_to_keycode(
        Xlib.XK.string_to_keysym(symbol))

    # A key code of 0 means that the symbol is not bound to any key. Unused
    # slots in the modifier mapping are also 0, so without this guard an
    # unbound symbol would be reported as the first modifier with a free slot
    if not modifier_keycode:
        return 0

    for index, keycodes in enumerate(display.get_modifier_mapping()):
        if modifier_keycode in keycodes:
            return 1 << index

    return 0
def alt_mask(display):
    """Returns the *alt* mask flags.

    The mask is looked up for the symbol ``Alt_L``. The result is stored as
    the attribute ``__alt_mask`` on the display the first time this function
    is called for it; subsequent calls return the stored value.

    :param Xlib.display.Display display: The *X* display.

    :return: the modifier mask
    """
    try:
        return display.__alt_mask
    except AttributeError:
        # Not cached yet; fall through and look it up
        pass

    display.__alt_mask = _find_mask(display, 'Alt_L')
    return display.__alt_mask
def alt_gr_mask(display):
    """Returns the *alt gr* mask flags.

    The mask is looked up for the symbol ``Mode_switch``. The result is
    stored as the attribute ``__altgr_mask`` on the display the first time
    this function is called for it; subsequent calls return the stored value.

    :param Xlib.display.Display display: The *X* display.

    :return: the modifier mask
    """
    try:
        return display.__altgr_mask
    except AttributeError:
        # Not cached yet; fall through and look it up
        pass

    display.__altgr_mask = _find_mask(display, 'Mode_switch')
    return display.__altgr_mask
def keysym_is_latin_upper(keysym):
    """Determines whether a *keysym* is an upper case *latin* character.

    This is true only if ``XK_A`` <= ``keysym`` <= ``XK_Z``.

    :param int keysym: The *keysym* to check.

    :return: whether the *keysym* lies in the inclusive range above
    """
    first, last = Xlib.XK.XK_A, Xlib.XK.XK_Z
    return first <= keysym <= last
def keysym_is_latin_lower(keysym):
    """Determines whether a *keysym* is a lower case *latin* character.

    This is true only if ``XK_a`` <= ``keysym`` <= ``XK_z``.

    :param int keysym: The *keysym* to check.

    :return: whether the *keysym* lies in the inclusive range above
    """
    first, last = Xlib.XK.XK_a, Xlib.XK.XK_z
    return first <= keysym <= last
def keysym_group(a, b):
    """Generates a group from two *keysyms*.

    The implementation of this function follows this rule:

        Within each group, if the second element of the group is
        ``NoSymbol``, then the group should be treated as if the second
        element were the same as the first element, except when the first
        element is an alphabetic *KeySym* ``K`` for which both lowercase and
        uppercase forms are defined.

        In that case, the group should be treated as if the first element
        were the lowercase form of ``K`` and the second element were the
        uppercase form of ``K``.

    NOTE(review): the source of the quotation is missing from the original
    text; it reads like the keyboard section of the *X* protocol
    specification -- confirm.

    This function assumes that *alphabetic* means *latin*; this assumption
    appears to be consistent with observations of the return values from
    ``XGetKeyboardMapping``.

    :param a: The first *keysym*.

    :param b: The second *keysym*.

    :return: a tuple conforming to the description above
    """
    # An explicit second element is always used as is
    if b != Xlib.XK.NoSymbol:
        return (a, b)

    # Pair a latin letter with its counterpart of the other case, with the
    # lower case form first
    if keysym_is_latin_upper(a):
        return (Xlib.XK.XK_a + a - Xlib.XK.XK_A, a)
    if keysym_is_latin_lower(a):
        return (a, Xlib.XK.XK_A + a - Xlib.XK.XK_a)

    return (a, a)
def keysym_normalize(keysym):
    """Normalises a list of *keysyms*.

    The implementation of this function follows these rules:

        If the list (ignoring trailing ``NoSymbol`` entries) is a single
        *KeySym* ``K``, then the list is treated as if it were the list
        ``K NoSymbol K NoSymbol``.

        If the list (ignoring trailing ``NoSymbol`` entries) is a pair of
        *KeySyms* ``K1 K2``, then the list is treated as if it were the list
        ``K1 K2 K1 K2``.

        If the list (ignoring trailing ``NoSymbol`` entries) is a triple of
        *KeySyms* ``K1 K2 K3``, then the list is treated as if it were the
        list ``K1 K2 K3 NoSymbol``.

    NOTE(review): the source of the quotation is missing from the original
    text; it reads like the keyboard section of the *X* protocol
    specification -- confirm.

    Lists of six or more entries use entries 0, 1, 4 and 5, and lists of
    four or five entries use entries 0 to 3.

    This function will also group the *keysyms* using :func:`keysym_group`.

    :param keysym: A list of keysyms. Despite the singular name, this is the
        complete list for one key.

    :return: the tuple ``(group_1, group_2)`` or ``None``
    """
    no_symbol = Xlib.XK.NoSymbol

    # Remove trailing NoSymbol
    stripped = list(keysym)
    while stripped and stripped[-1] == no_symbol:
        stripped.pop()

    count = len(stripped)
    if count == 0:
        return None

    if count == 1:
        first = second = (stripped[0], no_symbol)
    elif count == 2:
        first = second = (stripped[0], stripped[1])
    elif count == 3:
        first = (stripped[0], stripped[1])
        second = (stripped[2], no_symbol)
    elif count >= 6:
        # TODO: Find out why this is necessary; using only the documented
        # behaviour may lead to only a US layout being used?
        first = (stripped[0], stripped[1])
        second = (stripped[4], stripped[5])
    else:
        first = (stripped[0], stripped[1])
        second = (stripped[2], stripped[3])

    return (keysym_group(*first), keysym_group(*second))
def index_to_shift(display, index):
    """Converts an index in a *key code* list to the corresponding shift state.

    Bit 0 of the index selects the shift mask, ``1 << 0``, and bit 1 selects
    the *alt gr* mask of the display. The *alt gr* mask is only looked up
    when bit 1 is set.

    :param Xlib.display.Display display: The display for which to retrieve the
        shift mask.

    :param int index: The keyboard mapping *key code* index.

    :return: a shift mask
    """
    shift_state = 0
    if index & 1:
        shift_state |= 1 << 0
    if index & 2:
        shift_state |= alt_gr_mask(display)
    return shift_state
def shift_to_index(display, shift):
    """Converts a shift state to the corresponding index in a *key code*
    list.

    The index is ``1`` if bit 0 of the shift state is set, plus ``2`` if the
    shift state has any bit in common with the *alt gr* mask of the display.

    :param Xlib.display.Display display: The display for which to retrieve the
        *alt gr* mask.

    :param int shift: The shift state.

    :return: the keyboard mapping *key code* index
    """
    index = 1 if shift & 1 else 0
    if shift & alt_gr_mask(display):
        index += 2
    return index
def keyboard_mapping(display):
    """Generates a mapping from *keysyms* to *key codes* and required
    modifier shift states.

    Every value is the tuple ``(key_code, shift_state)``, where
    ``shift_state`` combines the shift mask, ``1 << 0``, and the *alt gr*
    mask of the display.

    :param Xlib.display.Display display: The display for which to retrieve the
        keyboard mapping.

    :return: the keyboard mapping
    """
    shift_mask = 1 << 0
    group_mask = alt_gr_mask(display)

    # Retrieve the keysym lists for the full key code range of the display
    info = display.display.info
    first_keycode = info.min_keycode
    keycode_count = info.max_keycode - first_keycode + 1
    keysym_lists = display.get_keyboard_mapping(first_keycode, keycode_count)

    mapping = {}
    for key_code, keysyms in enumerate(keysym_lists, first_keycode):
        # Normalise the keysym list to yield a tuple containing the two groups
        normalized = keysym_normalize(keysyms)
        if not normalized:
            continue

        # The second group requires the group mask, and the second keysym of
        # a group requires the shift mask
        for group, group_bits in zip(normalized, (0, group_mask)):
            for keysym, shift_bits in zip(group, (0, shift_mask)):
                if not keysym:
                    continue
                shift_state = shift_bits | group_bits

                # Prefer already known lesser shift states
                known = mapping.get(keysym)
                if known is not None and known[1] < shift_state:
                    continue

                mapping[keysym] = (key_code, shift_state)

    return mapping
def symbol_to_keysym(symbol):
    """Converts a symbol name to a *keysym*.

    Three sources are consulted in order, and the first non-zero result is
    returned: ``Xlib.XK.string_to_keysym``, the attribute ``XK_<symbol>`` of
    ``Xlib.keysymdef.xkb``, and the ``SYMBOLS`` table.

    :param str symbol: The name of the symbol.

    :return: the corresponding *keysym*, or ``0`` if it cannot be found
    """
    # First try simple translation
    keysym = Xlib.XK.string_to_keysym(symbol)
    if keysym:
        return keysym

    # If that fails, try checking a module attribute of Xlib.keysymdef.xkb;
    # only the lookup of the module itself can raise here, since a missing
    # symbol yields the default value
    try:
        keysym = getattr(Xlib.keysymdef.xkb, 'XK_' + symbol, 0)
    except AttributeError:
        keysym = 0
    if keysym:
        return keysym

    # Finally fall back on the symbol table; this must also happen when the
    # module is available but does not define the symbol
    return SYMBOLS.get(symbol, (0,))[0]
class ListenerMixin(object):
    """A mixin for *X* event listeners.

    Subclasses should set a value for :attr:`_EVENTS` and implement
    :meth:`_handle`.

    NOTE(review): this class uses ``running``, ``wait``, ``_mark_ready`` and
    ``StopException`` without defining them; presumably they are provided by
    ``AbstractListener``, which is expected to be mixed in as well -- confirm.
    """
    #: The events for which to listen
    _EVENTS = tuple()

    #: We use this instance for parsing the binary data
    _EVENT_PARSER = Xlib.protocol.rq.EventField(None)

    class _WrappedException(Exception):
        """Raised by the handler wrapper when an exception is raised in the
        handler, or when the listener is stopped to escape the recording.

        In the former case, the root exception is passed as the first argument
        to the constructor, and in the latter case no arguments are passed.
        """
        pass

    def _run(self):
        """Runs the listener.

        Two display connections are opened: one on which the events are
        recorded, and one which is passed to :meth:`_initialize` and
        :meth:`_handle` and is used by :meth:`_stop` to disable the recording.

        Both connections are closed when this method returns, regardless of
        whether setting up or running the recording failed.
        """
        self._display_stop = Xlib.display.Display()
        self._display_record = None
        try:
            self._display_record = Xlib.display.Display()
            with display_manager(self._display_record) as d:
                self._context = d.record_create_context(
                    0,
                    [Xlib.ext.record.AllClients],
                    [{
                        'core_requests': (0, 0),
                        'core_replies': (0, 0),
                        'ext_requests': (0, 0, 0, 0),
                        'ext_replies': (0, 0, 0, 0),
                        'delivered_events': (0, 0),
                        'device_events': self._EVENTS,
                        'errors': (0, 0),
                        'client_started': False,
                        'client_died': False}])

            try:
                self._initialize(self._display_stop)
                self._mark_ready()
                self._display_record.record_enable_context(
                    self._context, self._handler)
            except self._WrappedException as e:
                # No arguments means that the listener was stopped; otherwise
                # the first argument is the exception raised in the handler
                if e.args:
                    # TODO: Handle
                    pass
            finally:
                self._display_record.record_free_context(self._context)
        finally:
            # Close both connections even if creating the second display or
            # the context, or freeing the context, failed
            try:
                self._display_stop.close()
            finally:
                if self._display_record is not None:
                    self._display_record.close()

    def _stop(self):
        """Stops the listener by disabling the record context.

        If no context has been created yet, ``self.wait()`` is called first.
        Errors raised when disabling the context are ignored.
        """
        if not hasattr(self, '_context'):
            self.wait()
        try:
            self._display_stop.record_disable_context(self._context)
        except Exception:
            # Ignore errors at this point; interpreter exit requests such as
            # KeyboardInterrupt are deliberately not swallowed
            pass

    @AbstractListener._emitter
    def _handler(self, events):
        """The callback registered with *X* for the recorded events.

        This method will parse the response and pass every event to
        :meth:`_handle`.

        :param events: The events passed by *X*. This is a binary block
            parsable by :attr:`_EVENT_PARSER`.

        :raises _WrappedException: if the listener is no longer running, in
            which case no arguments are passed, or if handling an event
            raised anything other than ``StopException``, in which case the
            root exception is passed as the first argument
        """
        # Escape the recording if the listener has been stopped
        if not self.running:
            raise self._WrappedException()

        try:
            data = events.data

            # Every parsed event consumes a part of the data
            while len(data):
                event, data = self._EVENT_PARSER.parse_binary_value(
                    data, self._display_record.display, None, None)
                self._handle(self._display_stop, event)

        except self.StopException:
            raise

        except BaseException as e:
            raise self._WrappedException(e)

    def _initialize(self, display):
        """Initialises this listener.

        This method is called immediately before the event loop, from the
        handler thread.

        :param display: The display being used.
        """
        pass

    def _handle(self, display, event):
        """The device specific callback handler.

        This method calls the appropriate callback registered when this
        listener was created based on the event.

        :param display: The display being used.

        :param event: The event.
        """
        pass