Source code for pyhocon

import re
import os
import socket
import urllib2
from pyhocon.config_tree import ConfigTree, ConfigSubstitution, ConfigList, ConfigValues, ConfigUnquotedString
from pyhocon.exceptions import ConfigSubstitutionException
from pyparsing import *


[docs]class ConfigFactory(object): @staticmethod
[docs] def parse_file(filename): """Parse file :param filename: filename :type filename: basestring :return: Config object :type return: Config """ with open(filename, 'r') as fd: content = fd.read() return ConfigFactory.parse_string(content, os.path.dirname(filename))
@staticmethod
[docs] def parse_URL(url, timeout=None): """Parse URL :param url: url to parse :type url: basestring :return: Config object :type return: Config """ socket_timeout = socket._GLOBAL_DEFAULT_TIMEOUT if timeout is None else timeout fd = urllib2.urlopen(url, timeout=socket_timeout) try: content = fd.read() return ConfigFactory.parse_string(content, os.path.dirname(url)) finally: fd.close()
@staticmethod
[docs] def parse_string(content, basedir=None): """Parse URL :param url: url to parse :type url: basestring :return: Config object :type return: Config """ return ConfigParser().parse(content, basedir)
[docs]class ConfigParser(object): """ Parse HOCON files: https://github.com/typesafehub/config/blob/master/HOCON.md """ REPLACEMENTS = { '\\\n': '\n', '\\n': '\n', '\\r': '\r', '\\t': '\t', '\\=': '=', '\\#': '#', '\\!': '!', '\\"': '"' } @staticmethod
[docs] def parse(content, basedir=None): """parse a HOCON content :param content: HOCON content to parse :type content: basestring :return: a ConfigTree or a list """ def norm_string(value): for k, v in ConfigParser.REPLACEMENTS.items(): value = value.replace(k, v) return value def unescape_string(tokens): return ConfigUnquotedString(norm_string(tokens[0])) def unescape_multi_string(tokens): # remove the first and last 3 " return norm_string(tokens[0][3: -3]) def convert_number(tokens): n = tokens[0] try: return int(n) except ValueError: return float(n) substitutions = [] SUBSTITUTION = "\$\{(?P<variable>[^}]+)\}(?P<ws>\s*)" def create_substitution(token): # remove the ${ and } match = re.match(SUBSTITUTION, token[0]) variable = match.group('variable') ws = match.group('ws') substitution = ConfigSubstitution(variable, ws) substitutions.append(substitution) return substitution def include_config(token): url = None file = None if len(token) == 1: # include "test" if token[0].startswith("http://") or token[0].startswith("https://") or token[0].startswith("file://"): url = token[0] else: file = token[0] elif len(token) == 2: # include url("test") or file("test") if token[0] == 'url': url = token[1] else: file = token[1] if url is not None: obj = ConfigFactory.parse_URL(url) if file is not None: path = file if basedir is None else os.path.join(basedir, file) obj = ConfigFactory.parse_file(path) return [obj] ParserElement.setDefaultWhitespaceChars(' \t') dict_expr = Forward() list_expr = Forward() assign_expr = Forward() true_expr = Keyword("true", caseless=True).setParseAction(replaceWith(True)) false_expr = Keyword("false", caseless=True).setParseAction(replaceWith(False)) null_expr = Keyword("null", caseless=True).setParseAction(replaceWith(None)) key = QuotedString('"', escChar='\\') | Word(alphanums + '._') eol = Word('\n\r').suppress() eol_comma = Word('\n\r,').suppress() comment = (Optional(eol_comma) + (Literal('#') | Literal('//')) - SkipTo(eol)).suppress() number_expr = Regex('[+-]?(\d*\.\d+|\d+(\.\d+)?)([eE]\d+)?(?=[ \t]*([\$\}\],#\n\r]|//))', re.DOTALL).setParseAction(convert_number) # multi line string using """ # Using fix described in http://pyparsing.wikispaces.com/share/view/3778969 multiline_string = Regex('""".*?"""', re.DOTALL | re.UNICODE).setParseAction(unescape_multi_string) # single quoted line string quoted_string = QuotedString(quoteChar='"', escChar='\\', multiline=True) # unquoted string that takes the rest of the line until an optional comment # we support .properties multiline support which is like this: # line1 \ # line2 \ # so a backslash precedes the \n unquoted_string = Regex(r'(\\[ \t]*[\r\n]|[^\[\{\n\]\}#,=\$])+?(?=(\$|[ \t]*(//|[\}\],#\n\r])))', re.DOTALL).setParseAction(unescape_string) substitution_expr = Regex('\$\{[^\}]+\}\s*').setParseAction(create_substitution) string_expr = multiline_string | quoted_string | unquoted_string value_expr = number_expr | true_expr | false_expr | null_expr | string_expr | substitution_expr values_expr = ConcatenatedValueParser(value_expr - ZeroOrMore(comment | (value_expr - Optional(Literal('\\') - eol).suppress()) | value_expr)) # multiline if \ at the end of the line list_expr << ListParser(Suppress('[') - ZeroOrMore(comment | values_expr | eol_comma) - Suppress(']')) - ZeroOrMore(list_expr) include_expr = (Keyword("include", caseless=True).suppress() - ( quoted_string | ((Keyword('url') | Keyword('file')) - Literal('(').suppress() - quoted_string - Literal(')').suppress())))\ .setParseAction(include_config) # for a dictionary : or = is optional # last zeroOrMore is because we can have t = {a:4} {b: 6} {c: 7} which is dictionary concatenation inside_dict_expr = ConfigTreeParser(ZeroOrMore(comment | include_expr | assign_expr | eol_comma)) dict_expr << Suppress('{') - inside_dict_expr + Suppress('}') - ZeroOrMore(dict_expr) assign_dict_expr = Suppress(Optional(oneOf(['=', ':']))) + dict_expr # special case when we have a value assignment where the string can potentially be the remainder of the line assign_value_or_list_expr = Suppress(oneOf(['=', ':'])) + (comment | list_expr | values_expr | eol_comma) assign_expr << Group(key + (assign_dict_expr | assign_value_or_list_expr)) # the file can be { ... } where {} can be omitted or [] config_expr = ZeroOrMore(comment | eol) \ + (list_expr | dict_expr | inside_dict_expr) \ + ZeroOrMore(comment | eol_comma) config = config_expr.parseString(content, parseAll=True)[0] ConfigParser._resolve_substitutions(config, substitutions) return config
@staticmethod def _resolve_variable(config, substitution): variable = substitution.variable try: return config.get(variable) except: # default to environment variable value = os.environ.get(variable) if value is None: raise ConfigSubstitutionException("Cannot resolve variable ${{{variable}}}".format(variable=variable)) elif isinstance(value, ConfigList) or isinstance(value, ConfigTree): raise ConfigSubstitutionException( "Cannot substitute variable ${{{variable}}} because it does not point to a string, int, float, boolean or null (type)".format( variable=variable, type=value.__class__.__name__) ) return value @staticmethod def _resolve_substitutions(config, substitutions): if len(substitutions) > 0: _substitutions = set(substitutions) for i in range(len(substitutions)): unresolved = False for substitution in list(_substitutions): resolved_value = ConfigParser._resolve_variable(config, substitution) if isinstance(resolved_value, ConfigValues): unresolved = True else: # replace token by substitution config_values = substitution.parent # if there is more than one element in the config values then it's a string tokens = substitution.parent.tokens config_values.put(substitution.index, resolved_value) config_values.parent[config_values.key] = config_values.transform() _substitutions.remove(substitution) if not unresolved: break else: raise ConfigSubstitutionException("Cannot resolve {variables}. Check for cycles.".format( variables=', '.join('${' + substitution.variable + '}' for substitution in _substitutions) )) return config
[docs]class ListParser(TokenConverter): """Parse a list [elt1, etl2, ...] """ def __init__(self, expr=None): super(ListParser, self).__init__(expr) self.saveAsList = True
[docs] def postParse(self, instring, loc, token_list): """Create a list from the tokens :param instring: :param loc: :param token_list: :return: """ config_list = ConfigList(token_list) return [config_list]
[docs]class ConcatenatedValueParser(TokenConverter): def __init__(self, expr=None): super(ConcatenatedValueParser, self).__init__(expr) self.parent = None self.key = None
[docs] def postParse(self, instring, loc, token_list): config_values = ConfigValues(token_list) return config_values.transform()
[docs]class ConfigTreeParser(TokenConverter): """ Parse a config tree from tokens """ def __init__(self, expr=None): super(ConfigTreeParser, self).__init__(expr) self.saveAsList = True
[docs] def postParse(self, instring, loc, token_list): """Create ConfigTree from tokens :param instring: :param loc: :param token_list: :return: """ config_tree = ConfigTree() for element in token_list: # from include then merge items expanded_tokens = element._dictionary.iteritems() if isinstance(element, ConfigTree) else [element] for tokens in expanded_tokens: # key, value1, value2, ... key = tokens[0] values = tokens[1:] # empty string if len(values) == 0: config_tree.put(key, '') else: if isinstance(values[0], list): # Merge arrays config_tree.put(key, values[0], False) for value in values[1:]: config_tree.put(key, value, True) else: # Merge dict for value in values: if isinstance(value, ConfigList): conf_value = list(value) elif isinstance(value, ConfigValues): conf_value = value value.parent = config_tree value.key = key else: conf_value = value config_tree.put(key, conf_value) return config_tree