Source code for horetu.interfaces.cli

import sys
import itertools
import textwrap
import shutil
import subprocess
from collections import OrderedDict
from triedict import triedict
from . import _util
from . import _bind
from .. import exceptions
from ..program import Program
from ..annotations import Flag

import os
XDG_OPEN = ('xdg-open', '/dev/stdin')

[docs]def cli(program, argv=sys.argv, return_=False, handlers=None, freedesktop=False, exit_=sys.exit, stdout=sys.stdout.buffer, stderr=sys.stderr.buffer): ''' :type program: Program, callable, list, or dict :param program: The program for which to produce the interface :param list argv: :py:obj:`sys.argv` by default :param bool return_: If this is ``True``, simply return the result of the function; do not write stuff to ``exit_``, ``stdout``, or ``stderr``. :param function exit_: :py:func:`sys.exit` by default :param dict handlers: Mapping from content type to handler program :param bool freedesktop: Fall back to xdg-open if a handler is not available :type stdout: Binary file :param stdout: :py:func:`sys.stdout.buffer` by default :type stderr: Binary file :param stderr: :py:func:`sys.stderr.buffer` by default ''' if not handlers: handlers = {} argv = list(argv) # Prevent myself from modifying the input. if not isinstance(program, Program): program = Program(program) if not = argv[0] def write(stream, fp): for line in stream: fp.write(line.encode('utf-8') + b'\n') fp.flush() if return_: section, raw_arguments = _bind.web_section(program, argv[1:]) function = program[section] function.require_help() arguments = from_argv(function, raw_arguments) return'raw', arguments) else: try: try: section, raw_arguments = _bind.cli_section(program, argv[1:]) try: function = program[section] function.require_help() arguments = from_argv(function, raw_arguments) raw_output ='raw', arguments) rt = function.return_type if rt and (freedesktop or (rt.parsed in handlers)): handler = handlers.get(rt.parsed, XDG_OPEN) if isinstance(raw_output, bytes): stream = [raw_output] else: stream = raw_output p = subprocess.Popen(handler, stdin=subprocess.PIPE, stderr=stderr) for element in stream: if isinstance(element, bytes): p.stdin.write(element) else: raise ValueError('When you set a content type, the function must return either bytes or an iterable of bytes.') p.stdin.close() p.wait() else: for line in stdout.write(line + b'\n') stdout.flush() except exceptions.BaseHoretuException as e: e.section = section raise e except exceptions.ShowHelp as e: write(man(program, e), stdout) exit_(0) except exceptions.CouldNotParse as e: stdout.flush() write(usage(program, e), stderr) exit_(2) except exceptions.Error as e: stdout.flush() write(usage(program, e), stderr) exit_(3) else: exit_(0) except BrokenPipeError: stderr.close()
def from_argv(function, raw_arguments): mixed = iter(raw_arguments) keyword2_params = triedict({ param for param in function.keyword2}) positional = [] keyword2 = [] before_double_hyphen = True while True: try: raw_arg = next(mixed) except StopIteration: break arg = raw_arg[1:] if before_double_hyphen and raw_arg.startswith('-'): if raw_arg == '--': before_double_hyphen = False elif arg in keyword2_params: p = keyword2_params[arg] if isinstance(p, Flag): value = Flag.YES else: try: value = next(mixed) except StopIteration: raise exceptions.CouldNotParse('Needs parameter -- %s' % arg) keyword2.append((, value)) else: raise exceptions.CouldNotParse('Unknown option -- %s' % arg) else: positional.append(raw_arg) arguments = _bind.positionals(function, positional) for name, value in keyword2: if not name in arguments: arguments[name] = [] arguments[name].append(value) return arguments def _section_prefix(prog, x): for section in sorted(prog, key=len, reverse=True): if tuple(section[:len(x)]) == x: return x return tuple() def _endpoints(program, section): for subsection in program.subset(section): s = program[subsection] signature = '' for x in s.positional: signature += ' %s' % for x in s.keyword1: signature += ' [%s]' % if s.var_positional: signature += ' [%s ...]' % s.var_positional[0].name yield subsection, signature[1:] def _join(f): def decorator(*args, **kwargs): return '\n'.join(f(*args, **kwargs)) return decorator @_join def _format_arg(prefix, indent, param): columns, _ = shutil.get_terminal_size((80, 20)) whitespace = ' ' * len( n = columns - len( - indent - len(prefix) - 2 first = True for right in textwrap.wrap(param.description, n): if first: left = + ': ' first = False else: left = whitespace + ' ' yield (' ' * indent) + left + right def usage(prog, h): p = { 'name':, 'message': h.message, 'endpoints': _endpoints(prog, h.section if h.section in prog else tuple()), } if p['message']: yield 'error: %(message)s' % p for i, (section, signature) in enumerate(p['endpoints']): yield _usage_line(prog, section, signature, 'usage: ' if i == 0 else ' ') def _usage_line(program, section, signature, prefix): function = program[section] function.require_help() help_flag = function.help_flag() q = { 'prefix': prefix, 'name':, 'sub': (' ' + ' '.join(section)).rstrip() + ' ', 'signature': signature, 'sep': ' [--] ' if any(signature) else ' ', 'help': ('[-%s] ' % help_flag) if help_flag else '', } return '%(prefix)s%(name)s%(sub)s%(help)s[options]%(sep)s%(signature)s' % q def man(prog, h): columns, _ = shutil.get_terminal_size((80, 20)) f = prog[h.section] p = { 'name':, 'endpoints': _endpoints(prog, h.section), 'description': f.description, 'args': (_format_arg(' ', 2, a) for a in f.all_positionals()), 'kwargs': (_format_arg('-', 2, a) for a in f.keyword2), } yield 'SYNOPSIS' for section, signature in p['endpoints']: yield _usage_line(prog, section, signature, ' ') if p['description']: yield 'DESCRIPTION' for line in textwrap.wrap(p['description'], columns-2): yield ' ' + line if p['args']: yield 'INPUTS' for arg in p['args']: yield arg try: arg except NameError: yield ' (None)' yield 'OPTIONS' for kwarg in p['kwargs']: yield kwarg