Source code for botlib.rest
# services/rest.py
#
#
""" rest interface. """
from .object import Object
import http.server
import logging
import botlib
import http
import time
import os
[docs]class REST(http.server.HTTPServer, Object):
allow_reuse_address = True
daemon_thread = True
path = os.path.join("runtime", "rest")
def __init__(self, *args, **kwargs):
http.server.HTTPServer.__init__(self, *args, **kwargs)
Object.__init__(self)
self.host = args[0]
self._last = time.time()
self._starttime = time.time()
self._status = "start"
[docs] def exit(self):
self._status = ""
[docs] def start(self):
from .space import launcher, runtime
logging.warn("# rest http://%s:%s" % self.host)
self._status = "ok"
runtime.register("REST", self)
self.ready()
launcher.launch(self.serve_forever, name="REST.server")
[docs] def request(self):
self._last = time.time()
[docs] def error(self, request, addr):
logging.warn('# error rest %s %s' % (request, addr))
[docs]class RESTHandler(http.server.BaseHTTPRequestHandler):
[docs] def setup(self):
http.server.BaseHTTPRequestHandler.setup(self)
self._ip = self.client_address[0]
self._size = 0
[docs] def write_header(self, type='text/plain'):
self.send_response(200)
self.send_header('Content-type', '%s; charset=%s ' % (type, "utf-8"))
self.send_header('Server', botlib.__version__)
self.end_headers()
[docs] def do_GET(self):
from .space import cfg
fn = cfg.workdir + os.sep + self.path
try:
f = open(fn, "r")
txt = f.read()
f.close()
except (TypeError, FileNotFoundError, IsADirectoryError):
self.send_response(404)
self.end_headers()
return
txt = txt.replace("\\n", "\n")
txt = txt.replace("\\t", "\t")
self.write_header()
self.wfile.write(bytes(txt, "utf-8"))
self.wfile.flush()
[docs] def log(self, code):
logging.warn('# log rest %s code %s path %s' % (self.address_string(), code, self.path))
[docs]def init(event):
from .space import cfg, kernel
global server
try:
server = REST(("localhost", int(cfg.port) or 10102), RESTHandler)
except OSError as ex:
logging.error("rest error: %s" % str(ex))
return
kernel.launch(server.start)
return server
[docs]def shutdown(event):
from .space import runtime
rest = runtime.get("REST", [])
for object in rest:
object.exit()