Source code for aslack.slack_bot.bot

"""A Slack bot using the real-time messaging API."""

from itertools import count
import json
import logging
from random import randint
from textwrap import dedent

from aiohttp import MsgType, ws_connect

from aslack import __name__ as mod_name, __version__
from aslack.slack_api import SlackApiError, SlackBotApi
from aslack.utils import truncate

logger = logging.getLogger(__name__)


[docs]class SlackBot: """Base class Slack bot. Arguments: id_ (:py:class:`str`): The bot's Slack ID. user (:py:class:`str`): The bot's friendly name. api (:py:class:`SlackApi`): The Slack API wrapper. Attributes: address_as (:py:class:`str`): The text that appears at the start of messages addressed to this bot (e.g. ``'<@user>: '``). full_name (:py:class:`str`): The name of the bot as it appears in messages about the bot (e.g. ``'<@user>'``). socket (:py:class:`aiohttp.web.WebSocketResponse`): The web socket to respond on. API_AUTH_ENDPOINT (:py:class:`str`): Test endpoint for API authorisation. INSTRUCTIONS (:py:class:`str`): Message to give the user when they request instructions. MESSAGE_FILTERS (:py:class:`list`): Default filters for incoming messages RTM_HANDSHAKE (:py:class:`dict`): Expected handshake message from RTM API. RTM_START_ENDPOINT (:py:class:`str`): Start endpoint for real-time messaging. VERSION (:py:class:`str`): Version string to show to the user (if not overridden, will show the aSlack version). """ API_AUTH_ENDPOINT = 'auth.test' INSTRUCTIONS = dedent(""" These are the default instructions for an aSlack bot. Override these as appropriate for your specific needs. """) MESSAGE_FILTERS = [] RTM_HANDSHAKE = {'type': 'hello'} RTM_START_ENDPOINT = 'rtm.start' VERSION = ' '.join((mod_name, __version__)) def __init__(self, id_, user, api): self.id_ = id_ self.user = user self.api = api self.full_name = '<@{}>'.format(id_) self.address_as = '{}: '.format(self.full_name) self._msg_ids = count(randint(1, 1000)) self.socket = None
[docs] async def join_rtm(self, filters=None): """Join the real-time messaging service. Arguments: filters (:py:class:`dict`, optional): Dictionary mapping message filters to the functions they should dispatch to. Use a :py:class:`collections.OrderedDict` if precedence is important; only one filter, the first match, will be applied to each message. """ if filters is None: filters = [cls(self) for cls in self.MESSAGE_FILTERS] url = await self._get_socket_url() logger.debug('Connecting to %r', url) async with ws_connect(url) as socket: first_msg = await socket.receive() self._validate_first_message(first_msg) self.socket = socket async for message in socket: if message.tp == MsgType.text: await self.handle_message(message, filters) elif message.tp in (MsgType.closed, MsgType.error): if not socket.closed: await socket.close() self.socket = None break logger.info('Left real-time messaging.')
[docs] async def handle_message(self, message, filters): """Handle an incoming message appropriately. Arguments: message (:py:class:`aiohttp.websocket.Message`): The incoming message to handle. filters (:py:class:`list`): The filters to apply to incoming messages. """ data = self._unpack_message(message) logger.debug(data) if data.get('type') == 'error': raise SlackApiError( data.get('error', {}).get('msg', str(data)) ) elif self.message_is_to_me(data): text = data['text'][len(self.address_as):].strip() if text == 'help': return self._respond( channel=data['channel'], text=self._instruction_list(filters), ) elif text == 'version': return self._respond( channel=data['channel'], text=self.VERSION, ) for _filter in filters: if _filter.matches(data): logger.debug('Response triggered') async for response in _filter: self._respond(channel=data['channel'], text=response)
[docs] def message_mentions_me(self, data): """If you send a message that mentions me""" return (data.get('type') == 'message' and self.full_name in data.get('text', ''))
[docs] def message_is_to_me(self, data): """If you send a message directly to me""" return (data.get('type') == 'message' and data.get('text', '').startswith(self.address_as))
@classmethod
[docs] async def from_api_token(cls, token=None, api_cls=SlackBotApi): """Create a new instance from the API token. Arguments: token (:py:class:`str`, optional): The bot's API token (defaults to ``None``, which means looking in the environment). api_cls (:py:class:`type`, optional): The class to create as the ``api`` argument for API access (defaults to :py:class:`aslack.slack_api.SlackBotApi`). Returns: :py:class:`SlackBot`: The new instance. """ api = api_cls.from_env() if token is None else api_cls(api_token=token) data = await api.execute_method(cls.API_AUTH_ENDPOINT) return cls(data['user_id'], data['user'], api)
def _format_message(self, channel, text): """Format an outgoing message for transmission. Note: Adds the message type (``'message'``) and incremental ID. Arguments: channel (:py:class:`str`): The channel to send to. text (:py:class:`str`): The message text to send. Returns: :py:class:`str`: The JSON string of the message. """ payload = {'type': 'message', 'id': next(self._msg_ids)} payload.update(channel=channel, text=text) return json.dumps(payload) async def _get_socket_url(self): """Get the WebSocket URL for the RTM session. Warning: The URL expires if the session is not joined within 30 seconds of the API call to the start endpoint. Returns: :py:class:`str`: The socket URL. """ data = await self.api.execute_method( self.RTM_START_ENDPOINT, simple_latest=True, no_unreads=True, ) return data['url'] def _instruction_list(self, filters): """Generates the instructions for a bot and its filters. Note: The guidance for each filter is generated by combining the docstrings of the predicate filter and resulting dispatch function with a single space between. The class's :py:attr:`INSTRUCTIONS` and the default help command are added. Arguments: filters (:py:class:`list`): The filters to apply to incoming messages. Returns: :py:class:`str`: The bot's instructions. """ return '\n\n'.join([ self.INSTRUCTIONS.strip(), '*Supported methods:*', 'If you send "@{}: help" to me I reply with these ' 'instructions.'.format(self.user), 'If you send "@{}: version" to me I reply with my current ' 'version.'.format(self.user), ] + [filter.description() for filter in filters]) def _respond(self, channel, text): """Respond to a message on the current socket. Args: channel (:py:class:`str`): The channel to send to. text (:py:class:`str`): The message text to send. """ result = self._format_message(channel, text) if result is not None: logger.info( 'Sending message: %r', truncate(result, max_len=50), ) self.socket.send_str(result) @staticmethod def _unpack_message(msg): """Unpack the data from the message. Arguments: msg (:py:class:`aiohttp.websocket.Message`): The message to unpack. Returns: :py:class:`dict`: The loaded data. Raises: :py:class:`AttributeError`: If there is no data attribute. :py:class:`json.JSONDecodeError`: If the data isn't valid JSON. """ return json.loads(msg.data) @classmethod def _validate_first_message(cls, msg): """Check the first message matches the expected handshake. Note: The handshake is provided as :py:attr:`RTM_HANDSHAKE`. Arguments: msg (:py:class:`aiohttp.Message`): The message to validate. Raises: :py:class:`SlackApiError`: If the data doesn't match the expected handshake. """ data = cls._unpack_message(msg) logger.debug(data) if data != cls.RTM_HANDSHAKE: raise SlackApiError('Unexpected response: {!r}'.format(data)) logger.info('Joined real-time messaging.')