Source code for habitat.parser

# Copyright 2011 (C) Adam Greig, Daniel Richman
#
# This file is part of habitat.
#
# habitat is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# habitat is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with habitat.  If not, see <http://www.gnu.org/licenses/>.

"""
The parser interprets incoming telemetry strings into useful telemetry data.
"""

import time
import base64
import logging
import hashlib
import M2Crypto
import os
import os.path
from copy import deepcopy

from habitat.message_server import SimpleSink, Message
from habitat.utils import dynamicloader

# Names exported by ``from habitat.parser import *``.
__all__ = ["ParserSink", "ParserModule"]

# Module-level logger; the parser sink reports all its diagnostics here.
logger = logging.getLogger("habitat.parser")

class ParserSink(SimpleSink):
    """
    The Parser Sink

    The parser sink is the interface between the message server and the
    parser modules. It is responsible for receiving raw telemetry from the
    message server, giving it to modules which turn it into beautiful
    telemetry data, and then sending that back to the message server.
    """

    def setup(self):
        """
        Initialises the sink, adding the types of telemetry we care about
        to our types list and setting up lists of modules.

        Every entry of the ``parser_config`` document's ``modules`` list is
        loaded, checked for ``pre_parse``/``parse`` methods taking the
        expected number of arguments, instantiated, and stored (in
        configuration order) in ``self.modules``.

        Scans the certs/ca directory to find CA certificates and loads them.
        """
        self.add_type(Message.RECEIVED_TELEM)

        self.modules = []
        for module in self.server.db["parser_config"]["modules"]:
            m = dynamicloader.load(module["class"])
            dynamicloader.expecthasmethod(m, "pre_parse")
            dynamicloader.expecthasmethod(m, "parse")
            dynamicloader.expecthasnumargs(m.pre_parse, 1)
            dynamicloader.expecthasnumargs(m.parse, 2)
            # The config dict is extended in place with the live instance.
            module["module"] = m(self)
            self.modules.append(module)

        self.cert_path = self.server.program.options["certs_dir"]
        self.certificate_authorities = []
        ca_path = os.path.join(self.cert_path, 'ca')
        for f in os.listdir(ca_path):
            ca = M2Crypto.X509.load_cert(os.path.join(ca_path, f))
            self.certificate_authorities.append(ca)

        # Cache of hotfix certificates, filled by _get_certificate.
        self.loaded_certs = {}

    def message(self, message):
        """
        Handles a new message from the server, hopefully turning it into
        parsed telemetry data.

        This function attempts to determine which of the loaded parser
        modules should be used to parse the message, and which config
        file it should be given to do so.

        For the priority ordered list self.modules, resolution proceeds as::

            for module in modules:
                module.pre_parse to find a callsign
                if a callsign is found:
                    look up the configuration document for that callsign
                    if a configuration document is found:
                        check it specifies that this module should be used
                        if it does:
                            module.parse to get the data
                            return

            if all modules were attempted but no config docs were found:
                for module in modules:
                    if this module has a default configuration:
                        module.pre_parse to find a callsign
                        if a callsign is found:
                            use this module's default configuration
                            module.parse to get the data
                            return

            if we still can't get any data:
                error

        Note that in the loops below, the pre_parse, _find_config_doc and
        parse methods will all raise a ValueError if failure occurs,
        continuing the loop. A module only counts as successful once every
        step, including the post filters, has completed.

        The output is a new message of type Message.TELEM, with message.data
        being the parsed data as well as any special fields, identified by
        a leading underscore in the key.

        These fields may include:

        * _protocol which gives the parser module name that was used to
          decode this message
        * _flight which gives the ``_id`` of the configuration document
          that was used (not set when a default configuration is used)
        * _used_default_config is a boolean value set to True if a
          default configuration was used for the module as no specific
          configuration could be found
        * _raw gives the original submitted data
        * _listener_metadata is a copy of every key of the received
          message's data apart from ``string``
        * _sentence gives the ASCII sentence from the UKHAS parser
        * _extra_data from the UKHAS parser, where the sentence contained
          more data than the UKHAS parser was configured for

        Parser modules should be wary when outputting field names with
        leading underscores.
        """
        original_data = message.data["string"]
        raw_data = base64.b64decode(original_data)

        # Only a fully successful attempt is ever stored in ``parsed``;
        # intermediate values live in locals so that a failure part-way
        # through (e.g. a post filter raising ValueError after parse
        # succeeded) cannot leak half-processed data into the output.
        parsed = None
        used_module = None
        callsign = None

        # Try using real configs
        for module in self.modules:
            try:
                filtered = self._pre_filter(raw_data, module)
                callsign = module["module"].pre_parse(filtered)
                config_doc = self._find_config_doc(callsign,
                                                   message.time_created)
                config = config_doc["payloads"][callsign]
                if config["sentence"]["protocol"] != module["name"]:
                    # The payload is configured for a different module.
                    continue
                filtered = self._intermediate_filter(filtered, config)
                result = module["module"].parse(filtered,
                                                config["sentence"])
                result = self._post_filter(result, config)
                result["_protocol"] = module["name"]
                result["_flight"] = config_doc["_id"]
            except ValueError as e:
                err = "ValueError from {0}: '{1}'"
                logger.debug(err.format(module["name"], e))
                continue
            parsed = result
            used_module = module
            break

        # If that didn't work, try using default configurations
        if not isinstance(parsed, dict):
            for module in self.modules:
                try:
                    config = module["default_config"]
                except KeyError:
                    continue

                try:
                    filtered = self._pre_filter(raw_data, module)
                    callsign = module["module"].pre_parse(filtered)
                    filtered = self._intermediate_filter(filtered, config)
                    result = module["module"].parse(filtered,
                                                    config["sentence"])
                    result = self._post_filter(result, config)
                    result["_protocol"] = module["name"]
                    result["_used_default_config"] = True
                except ValueError as e:
                    errstr = "Error from {0} with default config: '{1}'"
                    logger.debug(errstr.format(module["name"], e))
                    continue
                logger.info("Using a default configuration document")
                parsed = result
                used_module = module
                break

        if isinstance(parsed, dict):
            parsed["_raw"] = original_data

            # Every key apart from string contains RECEIVED_TELEM metadata
            parsed["_listener_metadata"] = deepcopy(message.data)
            del parsed["_listener_metadata"]["string"]

            new_message = Message(message.source, Message.TELEM,
                                  message.time_created,
                                  message.time_uploaded, parsed)
            self.server.push_message(new_message)

            logger.info("{module} parsed data from {callsign} succesfully"
                        .format(module=used_module["name"],
                                callsign=callsign))
        else:
            logger.info("Unable to parse any data from '{d}'"
                        .format(d=original_data))

    def _find_config_doc(self, callsign, time_created):
        """
        Check Couch for a configuration document we can use for this
        payload.

        Queries the ``habitat/payload_config`` view with a start key of
        ``[callsign, time_created]`` and takes the first row returned.

        The Couch view first tries to find any Flight documents with this
        callsign in their payloads dictionary, but will also return any
        Sandbox documents with this payload if no valid Flight documents
        could be found. Flight documents only count if their end time is
        in the future.

        If no config can be found, raises
        :py:exc:`ValueError <exceptions.ValueError>`, otherwise returns the
        whole configuration document; the caller extracts
        ``doc["payloads"][callsign]`` itself.
        """
        startkey = [callsign, time_created]
        result = self.server.db.view("habitat/payload_config", limit=1,
                                     include_docs=True,
                                     startkey=startkey).first()
        if not result or callsign not in result["doc"]["payloads"]:
            err = "No configuration document for callsign '{0}' found."
            err = err.format(callsign)
            logger.warning(err)
            raise ValueError(err)
        return result["doc"]

    def _pre_filter(self, data, module):
        """
        Apply all the module's pre filters, in order, to the data and
        return the resulting filtered data.
        """
        for f in module.get("pre-filters", ()):
            data = self._filter(data, f)
        return data

    def _intermediate_filter(self, data, config):
        """
        Apply all the intermediate (between getting the callsign and
        parsing) filters specified in the payload's configuration document
        and return the resulting filtered data.
        """
        if "filters" in config:
            if "intermediate" in config["filters"]:
                for f in config["filters"]["intermediate"]:
                    data = self._filter(data, f)
        return data

    def _post_filter(self, data, config):
        """
        Apply all the post (after parsing) filters specified in the
        payload's configuration document and return the resulting filtered
        data.
        """
        if "filters" in config:
            if "post" in config["filters"]:
                for f in config["filters"]["post"]:
                    data = self._filter(data, f)
        return data

    def _filter(self, data, f):
        """
        Load and run a filter from a dictionary specifying type, the
        relevant callable/code and maybe a config.

        Returns the filtered data, or leaves the data untouched if the
        filter could not be run (missing or unrecognised ``type``).
        """
        if "type" not in f:
            logger.warning("A filter didn't have a type: " + repr(f))
            return data

        if f["type"] == "normal":
            return self._normal_filter(data, f)
        elif f["type"] == "hotfix":
            return self._hotfix_filter(data, f)
        else:
            # Still a no-op, but say so like every other bad-filter path.
            logger.warning("A filter had an unknown type: " + repr(f))
            return data

    def _normal_filter(self, data, f):
        """
        Load and run a filter specified by a callable.

        The callable named by ``f["callable"]`` is invoked as ``fil(data)``
        or ``fil(data, config)`` depending on how many arguments it takes;
        ``config`` is ``f["config"]`` or ``None`` when absent. The data is
        returned unchanged if the loaded object is not callable or takes
        any other number of arguments.
        """
        config = None
        if "config" in f:
            config = f["config"]

        fil = dynamicloader.load(f["callable"])
        if not dynamicloader.iscallable(fil):
            logger.warning("A loaded filter wasn't callable: " + repr(f))
            return data

        if dynamicloader.hasnumargs(fil, 1):
            return fil(data)
        elif dynamicloader.hasnumargs(fil, 2):
            return fil(data, config)
        else:
            logger.warning("A loaded filter had wrong number of args: " +
                           repr(f))
            return data

    def _hotfix_filter(self, data, f):
        """
        Load a filter specified by some code in the database. Check its
        authenticity by verifying its certificate, then run if OK.

        ``f`` must provide ``code``, ``signature`` (base64) and
        ``certificate`` (a bare file name, no directory components). The
        certificate has to verify against one of the loaded CA
        certificates, and the signature has to verify against the SHA-256
        hex digest of the code. On any failure the data is returned
        unchanged and the problem is logged.
        """
        if "code" not in f:
            logger.warning("A hotfix didn't have any code: " + repr(f))
            return data
        if "signature" not in f:
            logger.warning("A hotfix didn't have a signature: " + repr(f))
            return data
        if "certificate" not in f:
            logger.warning("A hotfix didn't specify a certificate: " +
                           repr(f))
            return data
        # Reject anything with a path component so a filter cannot point
        # outside the certificates directory.
        if os.path.basename(f["certificate"]) != f["certificate"]:
            logger.warning("A hotfix's specified certificate was invalid: " +
                           repr(f))
            return data

        # Load requested certificate
        try:
            cert = self._get_certificate(f["certificate"])
        except RuntimeError:
            logger.error("Could not load certificate '" +
                         f["certificate"] + "'.")
            return data

        # Check the certificate is valid
        valid = False
        for ca_cert in self.certificate_authorities:
            if cert.verify(ca_cert.get_pubkey()):
                valid = True
                break
        if not valid:
            logger.error("Certificate is not signed by a recognised CA.")
            return data

        # Check the signature is valid. Hash bytes, not text: hashing a
        # unicode string fails on non-ASCII input (and on any str under
        # Python 3). UTF-8 is byte-identical to the old behaviour for ASCII.
        code_bytes = f["code"]
        if not isinstance(code_bytes, bytes):
            code_bytes = code_bytes.encode("utf-8")
        digest = hashlib.sha256(code_bytes).hexdigest()
        sig = base64.b64decode(f["signature"])
        try:
            ok = cert.get_pubkey().get_rsa().verify(digest, sig, 'sha256')
        except M2Crypto.RSA.RSAError:
            logger.error("Signature is invalid.")
            return data
        if not ok:
            logger.error("Hotfix signature is not valid")
            return data

        logger.debug("Compiling a hotfix")
        # Wrap the supplied statements in a function body so that they can
        # ``return`` the filtered data.
        body = "def f(data):\n" + "".join(
            " " + line + "\n" for line in f["code"].split("\n"))
        env = {}
        try:
            compiled = compile(body, "<filter>", "exec")
            # SECURITY: this runs code stored in the database. It is only
            # reached after the certificate and signature checks above.
            # The call form of exec works on both Python 2 and Python 3.
            exec(compiled, env)
        except (SyntaxError, TypeError):
            logger.warning("Hotfix code didn't compile: " + repr(f))
            return data

        logger.debug("Hotfix compiled, executing")
        try:
            return env["f"](data)
        except Exception:
            # Deliberately broad, as who knows what this crazy code might
            # do -- but KeyboardInterrupt and SystemExit are allowed to
            # propagate so the process can still be stopped.
            logger.warning("An exception occured when trying to run a " +
                           "hotfix: " + repr(f), exc_info=True)
            return data

    def _get_certificate(self, certname):
        """
        Fetch the specified certificate, returning the X509 object.
        Uses an instance cache to prevent too much filesystem I/O.

        Certificates are read from the ``certs`` subdirectory of the
        configured certificate directory. Raises ``RuntimeError`` if the
        file does not exist or cannot be loaded.
        """
        if certname in self.loaded_certs:
            return self.loaded_certs[certname]

        cert_path = os.path.join(self.cert_path, "certs", certname)
        if os.path.exists(cert_path):
            try:
                cert = M2Crypto.X509.load_cert(cert_path)
            except (IOError, M2Crypto.X509.X509Error):
                raise RuntimeError("Certificate could not be loaded.")
            self.loaded_certs[certname] = cert
            return cert
        else:
            raise RuntimeError("Certificate could not be loaded.")
class ParserModule(object):
    """
    **ParserModules** are classes which turn radio strings into useful data.

    ParserModules

    * can be given various configuration parameters.
    * should probably inherit from **ParserModule**.

    The two methods here are placeholders that always fail with
    :py:exc:`ValueError <exceptions.ValueError>`; subclasses override them.
    """

    def __init__(self, parser):
        """
        Store the parser reference for later use.

        Also keeps a shortcut to the sensor manager, reached through
        ``parser.server.program.sensor_manager``.
        """
        self.parser = parser
        self.sensors = parser.server.program.sensor_manager

    def pre_parse(self, string):
        """
        Go though a string and attempt to extract a callsign, returning
        it as a string. If no callsign could be extracted, a
        :py:exc:`ValueError <exceptions.ValueError>` is raised.

        This base implementation never finds a callsign and always raises.
        """
        # Carry a message so the caller's log line is not just "''".
        raise ValueError("{0} does not implement pre_parse"
                         .format(type(self).__name__))

    def parse(self, string, config):
        """
        Go through a string which has been identified as the format this
        parser module should be able to parse, extracting the data as per
        the information in the config parameter, which is the ``sentence``
        dictionary extracted from the payload's configuration document.

        This base implementation cannot parse anything and always raises
        :py:exc:`ValueError <exceptions.ValueError>`.
        """
        raise ValueError("{0} does not implement parse"
                         .format(type(self).__name__))