# Copyright (C) 2013-2014 Michal Minar <miminar@redhat.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
"""
Defines base command class for all endpoint commands. Those having no children.
"""
import abc
import inspect
import re
from docopt import docopt
from lmi.scripts.common import errors
from lmi.scripts.common import formatter
from lmi.scripts.common import get_logger
from lmi.scripts.common.formatter import command as fcmd
from lmi.scripts.common.command import base
from lmi.scripts.common.command import meta
from lmi.scripts.common.command import util
LOG = get_logger(__name__)
[docs]def opt_name_sanitize(opt_name):
"""
Make a function parameter name out of option name. This replaces any
character not suitable for python identificator with ``'_'`` and
make the whole string lowercase.
:param string opt_name: Option name.
:returns: Modified option name.
:rtype: string
"""
return re.sub(r'[^a-zA-Z0-9]+', '_', opt_name).lower()
[docs]def options_dict2kwargs(options):
"""
Convert option name from resulting ``docopt`` dictionary to a valid python
identificator token used as function argument name.
:param dictionary options: Dictionary returned by docopt call.
:returns: New dictionary with keys passable to function as argument
names.
:rtype: dictionary
"""
# (new_name, value) for each pair in options dictionary
kwargs = {}
# (new_name, name)
orig_names = {}
for name, value in options.items():
for (reg, func) in (
(util.RE_OPT_BRACKET_ARGUMENT, lambda m: m.group('name')),
(util.RE_OPT_UPPER_ARGUMENT, lambda m: m.group('name')),
(util.RE_OPT_SHORT_OPTION, lambda m: m.group(0)),
(util.RE_OPT_LONG_OPTION, lambda m: m.group(0)),
(util.RE_COMMAND_NAME, lambda m: m.group(0))):
match = reg.match(name)
if match:
new_name = func(match)
break
else:
raise errors.LmiError(
'Failed to convert argument "%s" to function option.' %
name)
if new_name == '--':
continue # ignore double dash
new_name = opt_name_sanitize(new_name)
if new_name in kwargs:
raise errors.LmiError('Option clash for "%s" and "%s", which both'
' translate to "%s".' % (name, orig_names[new_name], new_name))
kwargs[new_name] = value
orig_names[new_name] = name
return kwargs
[docs]class LmiEndPointCommand(base.LmiBaseCommand):
"""
Base class for any leaf command.
List of additional recognized properties:
``CALLABLE`` : ``tuple``
Associated function. Will be wrapped in
:py:meth:`LmiEndPointCommand.execute` method and will be accessible
directly as a ``cmd.execute.dest`` property. It may be specified
either as a string in form ``"<module_name>:<callable>"`` or as a
reference to callable itself.
``ARG_ARRAY_SUFFIX`` : ``str``
String appended to every option parsed by ``docopt`` having list as
an associated value. It defaults to empty string. This modification
is applied before calling
:py:meth:`LmiEndPointCommand.verify_options` and
:py:meth:`LmiEndPointCommand.transform_options`.
``FORMATTER`` : callable
Default formatter factory for instances of given command. This
factory accepts an output stream as the only parameter and returns
an instance of :py:class:`~lmi.scripts.common.formatter.Formatter`.
Using metaclass:
:py:class:`.meta.EndPointCommandMetaClass`.
"""
__metaclass__ = meta.EndPointCommandMetaClass
def __init__(self, *args, **kwargs):
super(LmiEndPointCommand, self).__init__(*args, **kwargs)
self._formatter = None
# saved options dictionary after call to transform_options()
self._options = None
@abc.abstractmethod
[docs] def execute(self, *args, **kwargs):
"""
Subclasses must override this method to pass given arguments to
command library function. This function shall be specified in
``CALLABLE`` property.
"""
raise NotImplementedError("execute method must be overriden"
" in subclass")
@classmethod
[docs] def dest_pos_args_count(cls):
"""
Number of positional arguments the associated function takes from
command. These arguments are created by the command alone -- they do
not belong to options in usage string. Function can take additional
positional arguments that need to be covered by usage string.
:rtype: integer
"""
dest = getattr(cls.execute, "dest", cls.execute)
abstract = dest == cls.execute and util.is_abstract_method(
cls, 'execute', True)
# if the destination function is not yet defined (abstract is True)
# let's assume it's not a method => 0 positional arguments needed
return 1 if not abstract and inspect.ismethod(dest) else 0
[docs] def run_with_args(self, args, kwargs):
"""
Process end-point arguments and exit.
:param list args: Positional arguments to pass to associated
function in command library.
:param dictionary kwargs: Keyword arguments as a dictionary.
:returns: Exit code of application.
:rtype: integer
"""
return self.execute(*args, **kwargs)
@property
def _make_end_point_args(self, options):
"""
Creates a pair of positional and keyword arguments for a call to
associated function from command line options. All keyword
options not expected by target function are removed.
:param dictionary options: Output of ``docopt`` parser.
:returns: Positional and keyword arguments as a pair.
:rtype: tuple
"""
# if execute method does not have a *dest* attribute, then it's
# itself a destination
dest = getattr(self.execute, "dest", self.execute)
argspec = inspect.getargspec(dest)
kwargs = options_dict2kwargs(options)
# number of positional arguments not covered by usage string
pos_args_count = self.dest_pos_args_count()
to_remove = []
# if associated function takes keyword arguments in a single
# dictionary (kwargs), we can pass all options
if argspec.keywords is None:
# otherwise we need to remove any unhandled
for opt_name in kwargs:
if opt_name not in argspec.args[pos_args_count:]:
if opt_name not in self.cmd_name_parts:
LOG().debug('Option "%s" not handled in function "%s",'
' ignoring.', opt_name, self.cmd_name)
to_remove.append(opt_name)
for opt_name in to_remove:
# remove options unhandled by function
del kwargs[opt_name]
args = []
for arg_name in argspec.args[pos_args_count:]:
if arg_name not in kwargs:
raise errors.LmiCommandError(
self.__module__, self.__class__.__name__,
'registered command "%s" expects option "%s", which'
' is not covered in usage string'
% (self.cmd_name, arg_name))
args.append(kwargs.pop(arg_name))
return args, kwargs
def _preprocess_options(self, options):
"""
This method may be overriden by
:py:class:`~.meta.EndPointCommandMetaClass`
as a result of processing ``ARG_ARRAY_SUFFIX`` and other properties
modifying names of parsed options.
This should not be overriden in command class's body.
:param dictionary options: The result of ``docopt`` parser invocation
which can be modified by this method.
"""
pass
def _parse_args(self, args):
"""
Run ``docopt`` command line parser on given list of arguments.
Removes all unrelated commands from created dictionary of options.
:param list args: List of command line arguments just after the
current command.
:returns: Dictionary with parsed options. Please refer to
docopt_ documentation for more informations.
:rtype: dictionary
.. _docopt: http://docopt.org/
"""
full_args = self.get_cmd_name_parts(for_docopt=True) + args
options = docopt(self.get_usage(), full_args, help=False)
self._preprocess_options(options)
# remove all command names from options
cmd = self.parent
while cmd is not None and not cmd.has_own_usage():
cmd = cmd.parent
if cmd is not None:
for scn in cmd.child_commands():
try:
del options[scn]
except KeyError:
LOG().warn('Usage string of "%s.%s" command does not'
' contain registered command "%s" command.',
cmd.__module__, cmd.__class__.__name__, scn)
# remove also the root command name from options
if cmd is not None and cmd.cmd_name in options:
del options[cmd.cmd_name]
return options
[docs] def verify_options(self, options):
"""
This method can be overriden in subclasses to check, whether the
options given on command line are valid. If any flaw is discovered, an
:py:exc:`~lmi.scripts.common.errors.LmiInvalidOptions` exception shall
be raised. Any returned value is ignored.
.. note::
This is run before :py:meth:`transform_options()` method.
:param dictionary options: Dictionary as returned by ``docopt`` parser.
"""
pass
[docs] def produce_output(self, data):
"""
This method can be use to render and print results with default
formatter.
:param data: Is an object expected by the
:py:meth:`~lmi.scripts.common.formatter.Formatter.produce_output`
method of formatter.
"""
self.formatter.produce_output(data)
[docs] def run(self, args):
"""
Create options dictionary from input arguments, verify them,
transform them, make positional and keyword arguments out of them and
pass them to ``process_session()``.
:param list args: List of command arguments.
:returns: Exit code of application.
:rtype: integer
"""
options = self._parse_args(args)
self.verify_options(options)
self.transform_options(options)
self._options = options.copy()
args, kwargs = self._make_end_point_args(options)
return self.run_with_args(args, kwargs)
def _print_errors(self, error_list, new_line=True):
"""
Print list of errors.
:param list errors: Errors to print. Each error is a ``tuple``: ::
(hostname, [error, error])
Where ``error`` may be a test description or an instance of
exception.
:param new_line: Whether to print the new line before new error
table is printed.
"""
fmt = formatter.ErrorFormatter(self.app.stderr)
if new_line:
fmt.out.write('\n')
if error_list:
new_table_cmd = fcmd.NewTableCommand("There %s %d error%s" %
( 'were' if len(error_list) > 1 else 'was'
, len(error_list)
, 's' if len(error_list) > 1 else ''))
fmt.produce_output((new_table_cmd, ))
for hostname, host_errors in error_list:
fmt.produce_output((fcmd.NewHostCommand(hostname), ))
fmt.produce_output(host_errors)