# Source code for botlib.udp

# udp.py
#
#

""" relay txt through a udp port listener. """

from .object import Object

import logging
import socket
import time

class UDP(Object):

    """ UDP class to echo txt through the bot, use the mad-udp program to send. """

    def __init__(self):
        """ create the datagram socket and mark the server as not yet running. """
        # NOTE(review): self is also passed as a positional argument to
        # Object.__init__ -- kept as-is, confirm this is what Object expects.
        super().__init__(self)
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT is not defined on every platform; only set it where
        # the socket module exposes the constant.
        if hasattr(socket, "SO_REUSEPORT"):
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        self._sock.setblocking(1)
        self._status = "start"
        self._starttime = time.time()

    def start(self, *args, **kwargs):
        """ start the UDP server. """
        from .space import kernel
        kernel.launch(self.server)

    def server(self, host="", port="", *args, **kwargs):
        """
            receive datagrams and relay them until the server is stopped.

            host and port override self.cfg.host and self.cfg.port when given.
            the loop ends when exit() has cleared the status or when an empty
            datagram arrives; datagrams that are not valid utf-8 are logged
            and skipped.

        """
        from .space import runtime
        host = host or self.cfg.host
        port = port or self.cfg.port
        logging.warning("# start udp %s:%s", host, port)
        runtime.register("UDP", self)
        self._sock.bind((host, port))
        self._status = "running"
        while self._status:
            (txt, addr) = self._sock.recvfrom(64000)
            # exit() clears the status and then sends a wake-up datagram, so
            # re-check the status before looking at the payload.
            if not self._status:
                break
            try:
                data = str(txt.rstrip(), "utf-8")
            except UnicodeDecodeError:
                # one malformed packet must not take the listener down.
                logging.warning("# udp dropped undecodable datagram from %s", addr)
                continue
            if not data:
                break
            self.output(data, addr)
        # NOTE(review): ready() is assumed to run once, after the receive
        # loop has ended -- confirm against the original layout.
        self.ready()
        logging.warning("# stop udp %s:%s", host, port)

    def exit(self):
        """ shutdown the UDP server. """
        # clear the status first, then send a dummy datagram so the blocking
        # recvfrom() in server() returns and the loop sees the cleared status.
        self._status = ""
        self._sock.settimeout(0.01)
        self._sock.sendto(bytes("bla", "utf-8"), (self.cfg.host, self.cfg.port))

    def output(self, txt, addr=None):
        """
            write txt to every socket on the partyline.

            txt has the form "<password> <text>"; input without a separating
            space, or with a password different from self.cfg.password, is
            dropped silently. addr is accepted but not used.

        """
        from .space import partyline
        try:
            (passwd, text) = txt.split(" ", 1)
        except (AttributeError, ValueError):
            # not a string, or no space between password and text.
            return
        text = text.replace("\00", "")
        if passwd != self.cfg.password:
            return
        for _orig, sockets in partyline.items():
            for sock in sockets:
                sock.write(text)
                sock.write("\n")
                sock.flush()
def init(event):
    """ create a UDP relay, start it and hand it back to the caller. """
    relay = UDP()
    relay.start()
    return relay
def shutdown(event):
    """ call exit() on every object registered as "UDP" in the runtime. """
    from .space import runtime
    for server in runtime.get("UDP", []):
        server.exit()