Source code for horetu.config

import os
import re
import csv
import logging
from copy import copy
from io import StringIO
from collections import MutableMapping, OrderedDict, defaultdict

logger = logging.getLogger(__name__)

class Colon(csv.Dialect):
    delimiter = ':'
    doublequote = False
    escapechar = '\\'
    lineterminator = '\n'
    quotechar = None
    quoting = csv.QUOTE_NONE
    skipinitialspace = True
    strict = False

    def reader(Class, fp):
        return csv.reader(fp, dialect=Class)

    def writer(Class, fp):
        return csv.writer(fp, dialect=Class)

class Space(Colon):
    delimiter = ' '

    def read(Class, strrow):
        x = strrow.rstrip(Class.lineterminator) + Class.lineterminator
        with StringIO(x) as fp:
            section = tuple(next(csv.reader(fp, dialect=Class)))
        return section

    def write(Class, row):
        with StringIO() as fp:
            w = csv.writer(fp, dialect=Class)
            strrow = fp.getvalue().rstrip(Class.lineterminator)
        return strrow

[docs]class Configuration(OrderedDict): def __init__(self, *args, **kwargs): if len(args) == 1 and len(kwargs) == 0: first_arg = args[0] else: first_arg = None if isinstance(first_arg, Configuration): self.update(first_arg) elif isinstance(first_arg, str): if os.path.isfile(first_arg): with open(first_arg) as fp: self.update(Configuration.read_file(fp)) elif hasattr(first_arg, 'read'): self.update(Configuration.read_file(first_arg)) else: super(Configuration, self).__init__(*args, **kwargs) @staticmethod def _parse(key): if isinstance(key, str): return (key,) elif key == None or isinstance(key, tuple): return key else: raise KeyError(key) def __getitem__(self, key): try: value = super(Configuration, self).__getitem__(self._parse(key)) except KeyError: value = Section() super(Configuration, self).__setitem__(self._parse(key), value) return value def __setitem__(self, key, value): if isinstance(value, Section): super(Configuration, self).__setitem__(self._parse(key), value) elif isinstance(value, dict): super(Configuration, self).__setitem__(self._parse(key), Section(value)) else: raise ValueError('Not section type: %s' % repr(value)) @classmethod def read_input(Class, section=None, positional=(), keyword1=(), var_positional=(), keyword2=()): ''' :returns: The configuration :type section: Iterable of tuple of str, or None :param section: Subcommand to apply the configuration to, or None (default) to apply it to all subcommands :type positional: Iterable of tuple of str and anything :param positional: Positional arguments :param keyword1: Keyword arguments that came before a variable positional argument :type var_positional: Iterable of tuple of str and anything :param var_positional: Variable positional arguments (``*args``) :param keyword2: Other keyword arguments :rtype: Configuration :returns: The configuration ''' merged_inputs = list(positional + keyword1) if 1 == len(var_positional): name, values = var_positional[0] for value in values: merged_inputs.append((name, value)) merged_inputs.extend(keyword2) c = Class() for key, value in merged_inputs: c[section].append(key, value) return c @classmethod def read_file(Class, source): ''' :type source: str or file-like object with "r" mode :param source: Configuration file :rtype: Configuration :returns: The configuration ''' if isinstance(source, str): fp = StringIO(source) elif hasattr(source, 'read'): fp = source else: raise TypeError('Unrecognized source: %s' % repr(source)) rows = list(Colon.reader(fp)) fp.close() lengths = set(map(len, rows)) c = Class() if lengths == {2}: for argument, value in rows: c[None].append(argument, value) elif lengths == {3}: for section, argument, value in rows: c[].append(argument, value) else: raise ValueError('Inconsistent formatting in DSV file') return c @classmethod def read_filename(Class, source): with open(source) as fp: c = Class.read_file(fp) return c def write(configuration, with_sections=True): ''' :param Configuration configuration: The configuration values to write :param bool with_sections: Save different values for arguments of the same name in different sections :rtype: str :returns: Configuration file ''' with StringIO() as fp: w = Colon.writer(fp) for section, s in configuration.items(): for argument, values in s.items(): if with_sections: left = Space.write(section), else: left = tuple() for value in values: w.writerow(left + (argument, value,)) text = fp.getvalue() return text @classmethod def merge(Class, *configurations): ''' :param Configuration configurations: The configurations to merge ''' c = Class() for configuration in configurations: for section, s in configuration.items(): for argument, value in s.items(): c[section][argument] = value return c
class ListArgument(list): pass class Section(OrderedDict): def __init__(self, *args, **kwargs): if len(args) == 1 and len(kwargs) == 0 and isinstance(args[0], Section): self.update(args[0]) else: super(Section, self).__init__(*args, **kwargs) def flatten(self, defaults={}, list_type=tuple()): out = OrderedDict() # Apply configurations. for key, value in self.items(): if key in list_type: if key not in out: out[key] = ListArgument() out[key].extend(value) else: out[key] = value[-1] # Apply defaults. for key in defaults: if key not in out: out[key] = defaults[key] return out @staticmethod def _check(key): if not isinstance(key, str): raise KeyError('Not str type: %s' % key) def __getitem__(self, key): self._check(key) return super(Section, self).__getitem__(key) def __setitem__(self, key, values): self._check(key) if not isinstance(values, list) and all(isinstance(v, str) for v in values): raise TypeError('Argument values must be a list of str') return super(Section, self).__setitem__(key, values) def append(self, key, value): if isinstance(value, str): if not key in self: self[key] = [] self[key].append(value) else: raise TypeError('Argument value must be str, not %s (key %s)' % \ (type(value), key)) def extend(self, key, values): for value in values: self.append(key, value)