Source code for lmi.shell.LMIMethod

# Copyright (C) 2012-2014 Peter Hatina <phatina@redhat.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import sys
import logging
import signal
import socket
import urlparse
import threading
import collections

from lmi.shell.compat import *

from lmi.shell.LMIBaseObject import LMIWrapperBaseObject
from lmi.shell.LMIShellConfig import LMIShellConfig
from lmi.shell.LMIObjectFactory import LMIObjectFactory
from lmi.shell.LMIFormatter import LMIMethodFormatter
from lmi.shell.LMIFormatter import LMIMofFormatter
from lmi.shell.LMIReturnValue import LMIReturnValue
from lmi.shell.LMIConstantValues import LMIConstantValuesParamProp
from lmi.shell.LMIConstantValues import LMIConstantValuesMethodReturnType
from lmi.shell.LMIIndicationListener import LMIIndicationListener

from lmi.shell.LMIJob import lmi_is_job_finished
from lmi.shell.LMIJob import lmi_is_job_completed
from lmi.shell.LMIJob import lmi_is_job_terminated
from lmi.shell.LMIJob import lmi_is_job_killed
from lmi.shell.LMIJob import lmi_is_job_exception
from lmi.shell.LMIJob import JOB_NOT_FINISHED
from lmi.shell.LMIJob import JOB_FINISH_DELAYED
from lmi.shell.LMIJob import JOB_FINISH_EARLY

from lmi.shell.LMIUtil import LMIPassByRef
from lmi.shell.LMIUtil import lmi_get_use_exceptions
from lmi.shell.LMIUtil import lmi_raise_or_dump_exception
from lmi.shell.LMIUtil import lmi_transform_to_cim_param
from lmi.shell.LMIUtil import lmi_transform_to_lmi
from lmi.shell.LMIUtil import lmi_wrap_cim_class

from lmi.shell.LMIExceptions import *

logger = logging.getLogger(__name__)


[docs]class LMISignalHelperBase(object): """ Base signal handling class. """ @staticmethod
[docs] def signal(signo, handler): """ Calls signal() for signo, handler and returns the old signal handler. If signo is list of signals, the signal() call is applied for each signo. If handler is also list, each signal from signo will be handled by corresponding handler. In such case, tuple of previous handlers will be returned. """ if isinstance(signo, (list, tuple)): if not isinstance(handler, (list, tuple)): handler = [handler] * len(signo) signo_handler = zip(signo, handler) if not signo_handler: return (None,) * len(signo) old_handlers = [] for signal, handler in signo_handler: old_handlers.append( LMISignalHelperBase.signal_core(signal, handler)) return tuple(old_handlers) else: return LMISignalHelperBase.signal_core(signal, handler)
@staticmethod
[docs] def signal_core(signo, handler): """ Wrapper method for signal.signal(). In case of ValueError, it returns None, old signal handler otherwise. If handler is None, default signal handler is set for such signal. """ try: if handler is None: handler = signal.SIG_DFL return signal.signal(signo, handler) except ValueError: return None
[docs]class LMIMethodSignalHelper(LMISignalHelperBase): """ Helper class which takes care of signal (de)registration and handling. """ INTERRUPT_SIGNALS = ( signal.SIGINT, signal.SIGTERM) def __init__(self): super(LMIMethodSignalHelper, self).__init__() self._signal_handled = False self._signal_prev_handlers = (signal.SIG_DFL,) * 2 self._callbacks = collections.OrderedDict()
[docs] def signal_attach(self): """ Registers *SIGINT* and *SIGTERM* signals to local handler in which, the flags for each signal are modified, if such signal is caught. """ def handler(sig, action): self.signal_handler(sig, action) self._signal_handled = False self._signal_prev_handlers = self.signal( self.INTERRUPT_SIGNALS, handler)
[docs] def signal_detach(self): """ Unregisters *SIGINT* and *SIGTERM* handler and removes all the attached callbacks. """ self.signal(self.INTERRUPT_SIGNALS, self._signal_prev_handlers)
[docs] def signal_handled(self): """ :returns: True, if any of *SIGINT* or *SIGTERM* has been caught; \ False otherwise """ return self._signal_handled
[docs] def callback_attach(self, cb_name, cb): """ Registers a callback, which will be called when a *SIGINT* or *SIGTERM* is caught. :param string cb_name: callback name :param cb: callable object, which takes zero arguments """ self._callbacks[cb_name] = cb
[docs] def callback_detach(self, cb_name): """ Removes a callback from the callback dictionary. :param string cb_name: callback name """ self._callbacks.pop(cb_name)
[docs] def signal_handler(self, signo, frame): """ Signal handler, which is called, when *SIGINT* and *SIGTERM* are sent to the LMIShell. :param int signo: signal number :param frame: -- stack frame """ if signo in self.INTERRUPT_SIGNALS: self._signal_handled = True [cb() for cb in self._callbacks.values()]
[docs]class LMIMethod(LMIWrapperBaseObject): """ LMI wrapper class representing :py:class:`wbem.CIMMethod`. :param LMIConnection conn: connection object :param LMIInstance(Name) lmi_instance: :py:class:`.LMIInstance` or :py:class:`.LMIInstanceName` object, on which the method call will be issued :param string method_name: method name """ # 15 seconds sleep timeout for main waiting thread _COND_WAIT_TIME = 15 # Wake count of main thread, when the GetInstance is performed to check, # if the job object is present. Prevents infinite waiting for indication # delivery. Maximum waiting time, before the GetInstance for job object # will be called is: _COND_WAIT_TIME * _COND_WAIT_WAKE_CNT _COND_WAIT_WAKE_CNT = 4 # Default tcp port, where the indications will be delivered. # TODO: create a configuration option for the port _INDICATION_DESTINATION_PORT = 10240 # Default number retries when bind fails. _INDICATION_BIND_TRIES = 10 # Job classes, which can be used for synchro method calls # TODO: create a configuration option for the static filters' classnames _INDICATION_JOB_CLASSNAMES = ( "LMI_SELinuxJob", "LMI_StorageJob", "LMI_SoftwareInstallationJob", "LMI_SoftwareVerificationJob", "LMI_NetworkJob") # Default namespace where the indication subscriptions, used for # synchronous method calls, will be registered. _INDICATION_NAMESPACE = "root/interop" # When performing a synchronous method call and using the polling method to # get a job object status, the sleep time between 2 polls doubles if it is # less than _POLLING_ADAPT_MAX_WAITING_TIME. _POLLING_ADAPT_MAX_WAITING_TIME = 32 def __init__(self, conn, lmi_instance, method_name): super(LMIMethod, self).__init__(conn) if not isinstance(lmi_instance, ( LMIObjectFactory().LMIInstance, LMIObjectFactory().LMIInstanceName)): raise TypeError("lmi_instance must be LMIInstance(Name) type") self._lmi_instance = lmi_instance self._sync_method = False self._method = None self._method_name = method_name self._valuemap_parameters_list = [] is_sync = method_name.startswith("Sync") if not conn.is_wsman(): if is_sync: # Store the synchronous flag and trim the method name self._sync_method = True self._method_name = method_name[4:] # We need to have CIMClass with qualifiers. Fetch full class. if isinstance(self._lmi_instance, LMIObjectFactory().LMIInstance): # LMIInstance lmi_class = self._lmi_instance._lmi_class else: # LMIInstanceName lmi_class = lmi_wrap_cim_class( self._conn, self._lmi_instance.classname, self._lmi_instance.namespace) lmi_class.fetch(True) self._method = lmi_class._cim_class.methods[self._method_name] # Store the constant values as a list. This can consume some time, if # computed on demand. self._valuemap_parameters_list = [ k for k, v in self._method.parameters.iteritems() if "ValueMap" in v.qualifiers] # For simplicity, we add return value constants to the same list if "ValueMap" in self._method.qualifiers: self._valuemap_parameters_list.append(self._method_name) elif conn.is_wsman() and is_sync: # XXX: We can't perform synchro calls using WSMAN errorstr = "Can't perform synchronous method calls using WSMAN" raise ValueError(errorstr) def __return_synchro_method_call(self, job_inst, job_refresh=True): """ Returns a :py:class:`.LMIReturnValue` object with Job output parameters set. :param LMIInstance job_inst: job returned from a synchronous method call :param bool job_refresh: flag, which indicates, if the ``job_inst`` needs to be refreshed :returns: :py:class:`.LMIReturnValue` object with ``rval`` set to Job output parameters """ def get_rval_errorstr(err_list): for err in err_list: if err.CIMStatusCode is not None: return err.CIMStatusCode, err.Message # There is no CIM_Error instance with CIMStatusCode set. In such # case, we set rval to CIM_ERR_FAILED and for error message we use # the first instance from err_list. message = err_list[0].Message if err_list else "Failed" return wbem.CIM_ERR_FAILED, message # Adjust return value from the job object. if job_refresh: job_inst.refresh() rval = None rparams = wbem.NocaseDict() if job_inst.JobOutParameters is not None: props = job_inst.JobOutParameters.properties rparams = wbem.NocaseDict() for name, prop in props.iteritems(): # Return just those properties of given class that are listed # as output parameters of particular method. if self._method.parameters.has_key(name) \ and (self._method.parameters[name].qualifiers is None \ or self._method.parameters[name].qualifiers.get( 'Out', False)): rparams[name] = prop.value elif name == '__ReturnValue': rval = prop.value errorstr = "" # Is job in exception state? If so, adjust corresponding error string # from job.GetError() instance if lmi_is_job_exception(job_inst): try: refreshed, _, errorstr = job_inst.refresh() except CIMError, e: logger.debug( "Sync%s: Job instance can not be refreshed; %s" % (self._method_name, str(e))) job_exception.value = e if not refreshed: logger.debug( "Sync%s: Job instance can not be refreshed; %s" % (self._method_name, errorstr)) raise LMISynchroMethodCallError(errorstr) exc_rval, exc_rparams, exc_errorstr = job_inst.GetErrors() err_list = exc_rparams.get("errors", None) if not err_list: logger.debug( "Sync%s: Can not retrieve Error instances; %s" % (self._method_name, exc_errorstr)) raise LMISynchroMethodCallError( "Could not get Job error message") rval, errorstr = get_rval_errorstr(err_list) if lmi_get_use_exceptions(): raise CIMError(rval, errorstr) return LMIReturnValue(rval=rval, rparams=rparams, errorstr=errorstr) def __handle_synchro_method_call_indication(self, job_inst): """ Handles a synchronous call for asynchronous methods returning a job object. This method uses static filters installed by each OpenLMI provider, which is capable of using jobs. :param LMIInstance job_inst: job object returned from a synchronous method call :returns: :py:class:`.LMIReturnValue` object with ``rval`` set to 0 and ``rparams`` set to job's output parameters :raises: :py:exc:`.LMIIndicationListenerError`, :py:exc:`.LMISynchroMethodCallError`, :py:exc:`.LMISynchroMethodCallFilterError` **NOTE:** Static filters' names need to be in format "``LMI:<job_class_name>:Changed``" """ def handle_job(ind, cond, job_finished, job_exception): """ :py:class:`.LMIListener` handler for synchronous method call, which uses indication means of waiting for the job. This function is called, when a job changes its state. :param threading.Condition cond: condition object used for thread synchronization :param LMIPassByRef job_finished: used for synchronization, whether the job has finished :param LMIPassByRef job_exception: contains an exception object, if any exception was raised """ cond.acquire() try: src_inst = ind["SourceInstance"] if lmi_is_job_finished(src_inst): # Job has just finished job_finished.value = JOB_FINISH_DELAYED except Exception, e: # Notify main thread, we are not able to work with such # objects. job_finished.value = JOB_FINISH_DELAYED job_exception.value = e finally: # XXX: Let's be defensive, always notify+release main thread. cond.notify() cond.release() cond = threading.Condition() job_finished = LMIPassByRef(JOB_NOT_FINISHED) job_exception = LMIPassByRef(None) # There needs to be a pattern of at least 8 "X" in a row at the end of # the indication_name. indication_name = "synchro-method-call-XXXXXXXX" # Start the indication listener listener = LMIIndicationListener( "0.0.0.0", self._INDICATION_DESTINATION_PORT, LMIShellConfig().cert_file, LMIShellConfig().key_file) indication_name = listener.add_handler( indication_name, handle_job, cond, job_finished, job_exception) listener.start(self._INDICATION_BIND_TRIES) logger.debug( "Choosing port for Sync%s() indications: %d" % (self._method_name, listener.port)) # Search for necessary static filter filter_name = "LMI:%s:Changed" % job_inst.classname cim_filters, _, _ = self._conn.client.get_instances( "CIM_IndicationFilter", LMIMethod._INDICATION_NAMESPACE, {"Name": filter_name}) if not cim_filters: listener.stop() errorstr = "Can not find proper CIM_IndicationFilter" raise LMISynchroMethodCallFilterError(errorstr) cim_filter = cim_filters[0] netloc = urlparse.urlparse(self._conn.uri).netloc if not netloc: listener.stop() errorstr = "Can not determine netloc from client's uri" raise LMISynchroMethodCallError(errorstr) netloc = netloc.split(":")[0] # NOTE: This will work only on a local area network. Complicated # networks may require additional configuration to make this work. See # LMIMethod() and PreferPolling. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect((netloc, listener.port)) except socket.gaierror, e: listener.stop() errorstr = "Can not determine IP address of this machine" raise LMISynchroMethodCallError(errorstr) destination = s.getsockname()[0] s.close() # Create handler object cim_handler_props = { "Name": indication_name, "Destination": "%s://%s:%d/CIMListener/%s" % ( "https" if listener.uses_ssl else "http", destination, listener.port, indication_name), } cim_handler, _, _ = self._conn.client.create_instance( "CIM_IndicationHandlerCIMXML", LMIMethod._INDICATION_NAMESPACE, self._conn.hostname, cim_handler_props) if not cim_handler: listener.stop() errorstr = "Can not create CIM_IndicationHandlerCIMXML object" raise LMISynchroMethodCallError(errorstr) # Create indication subscription object cim_subscription_props = { "Filter": cim_filter.path, "Handler": cim_handler.path } cim_subscription, _, _ = self._conn.client.create_instance( "CIM_IndicationSubscription", LMIMethod._INDICATION_NAMESPACE, self._conn.hostname, cim_subscription_props) if not cim_subscription: self._conn.client.delete_instance(cim_handler) listener.stop() errorstr = "Can not create CIM_IndicationSubscription object" raise LMISynchroMethodCallError(errorstr) # Check, if the job is not already in finished state, # while we were subscribing for the indications job_inst.refresh() if lmi_is_job_finished(job_inst): job_finished.value = JOB_FINISH_EARLY # Register signal callback for SIGINT, SIGTERM with callback, # which awakes waiting thread for immediate return. signal_helper = LMIMethodSignalHelper() signal_helper.callback_attach( "indication", lambda: LMIMethod.__wake(cond)) signal_helper.signal_attach() # Wait for the job to finish wake_cnt = 0 cond.acquire() while not signal_helper.signal_handled() and \ not job_finished.value and \ not lmi_is_job_finished(job_inst): cond.wait(LMIMethod._COND_WAIT_TIME) wake_cnt += 1 # XXX: threading.Condition.wait() does not inform about timeout or # being awaken by notify call. There is a counting to 4 sleep # cycles before we actually check for job status manually. This # number can be increased, so we rely more on indications, rather # then on manual polling. if wake_cnt >= LMIMethod._COND_WAIT_WAKE_CNT and \ not job_finished.value: wake_cnt = 0 try: refreshed, _, errorstr = job_inst.refresh() except CIMError, e: logger.debug( "Sync%s: Job instance can not be refreshed; %s" % (self._method_name, str(e))) job_exception.value = e break if not refreshed: logger.debug( "Sync%s: Job instance can not be refreshed; %s" % (self._method_name, errorstr)) job_exception.value = LMISynchroMethodCallError(errorstr) break # Unregister signal handler signal_helper.signal_detach() signal_helper.callback_detach("indication") cond.release() # Cleanup listener.stop() self._conn.client.delete_instance(cim_subscription.path) self._conn.client.delete_instance(cim_handler.path) if job_exception.value: raise job_exception.value if signal_helper.signal_handled() and not job_finished.value: # We got SIGINT or SIGTERM, when waiting for the job, cancelling # the job logger.warn("Cancelling a job '%s'" % job_inst.Name) rstate = job_inst.RequestStateChange.RequestedStateValues.Terminate job_inst.RequestStateChange(RequestedState=rstate) return LMIReturnValue(rval=None) # Return the job return values, refresh the job_inst object, if we got # notified about the job finish state by indication. return self.__return_synchro_method_call( job_inst, job_finished.value == JOB_FINISH_DELAYED) def __handle_synchro_method_call_polling(self, job_inst): """ Handles a synchronous call for asynchronous methods returning a job object. This call uses polling method to wait for the job to finish. :param LMIInstance job_inst: job object :returns: :py:class:`.LMIReturnValue` object with ``rval`` set to 0 and ``rparams`` set to job's output parameters. :raises: :py:exc:`.LMISynchroMethodCallError` """ # Register signal callback for SIGINT, SIGTERM with callback, # which awakes waiting thread for immediate return. signal_helper = LMIMethodSignalHelper() signal_helper.callback_attach( "polling", lambda: LMIMethod.__wake(cond)) signal_helper.signal_attach() cond = threading.Condition() cond.acquire() job_exception = None try: sleep_time = 1 while not signal_helper.signal_handled() and \ not lmi_is_job_finished(job_inst): # Sleep, a bit longer in every iteration cond.wait(sleep_time) if sleep_time < LMIMethod._POLLING_ADAPT_MAX_WAITING_TIME: sleep_time *= 2 refreshed, _, errorstr = job_inst.refresh() if not refreshed: logger.debug( "Sync%s: Job instance can not be refreshed; %s" % (self._method_name, errorstr)) job_exception = LMISynchroMethodCallError(errorstr) break except CIMError, e: logger.debug( "Sync%s: Job instance can not be refreshed; %s" % (self._method_name, str(e))) job_exception = LMISynchroMethodCallError(e.message) finally: cond.release() # Unregister signal handler and callback signal_helper.signal_detach() signal_helper.callback_detach("polling") if signal_helper.signal_handled() and \ not lmi_is_job_finished(job_inst): # We got SIGINT or SIGTERM, when waiting for the job, cancelling # the job logger.warn("Cancelling a job '%s'" % job_inst.Name) rstate = job_inst.RequestStateChange.RequestedStateValues.Terminate job_inst.RequestStateChange(RequestedState=rstate) return LMIReturnValue(rval=None) if job_exception is not None: raise job_exception # Return the job return values. No need to refresh the job instance, we # already have a "fresh" one. return self.__return_synchro_method_call(job_inst, False) def __call__(self, method_args=None, **kwargs): """ Perform a method call. Method arguments are preferably passed by dictionary (parameter : value). Using former means of passing arguments, by keyword arguments, to a method call works too. If performing a synchronous method call, passing PreferPolling can be used to select which method should be used -- either subscribing to an indication or polling method. This is available only, when talking to CIMOM via CIM-XML. :param dictionary method_args: method arguments :param dictionary kwargs: keyword method arguments (``method_args`` preffered) * **RefreshInstance** (*bool*) flag, which tells the LMIShell, whether the instance should be refreshed after a method call. Default value is False. :returns: :py:class:`.LMIReturnValue` object with ``rval`` set to method's return value; ``rparams`` set to method's return parameters; ``errorstr`` set method's error string in case of failure :raises: :py:exc:`.LMIUnknownParameterError`, :py:exc:`.LMIMethodCallError` :py:exc:`.LMISynchroMethodCallError` **Usage:** :ref:`instances_methods`. """ synchro_method_polling = kwargs.pop("PreferPolling", False) refresh_requested = kwargs.pop("RefreshInstance", False) # Prepare method parameters if method_args is None: method_args = {} method_args.update(kwargs) if not self._conn.is_wsman(): for param, value in method_args.iteritems(): if param in self._method.parameters: # Cast input parameters into acceptable CIM types t = self._method.parameters[param].type method_args[param] = lmi_transform_to_cim_param(t, value) else: # NOTE: maybe we could check for wbem type and not to exit # prematurely errorstr = "Unknown parameter '%s' supplied for method '%s'" % \ (param, self._method.name) lmi_raise_or_dump_exception(LMIUnknownParameterError(errorstr)) return LMIReturnValue(rval=-1, errorstr=errorstr) else: for param, value in method_args.iteritems(): method_args[param] = str(value) # Call CIM method client = self._conn.client rval, call_rparams, call_errorstr = client.call_method( self._lmi_instance, self._method_name, **method_args) rval = lmi_transform_to_lmi(self._conn, rval) # Do we have any return parameters? If so, transform them to LMIShells' # types. If it is possible, perform synchronous method call. if call_rparams: call_rparams = lmi_transform_to_lmi(self._conn, call_rparams) if not call_rparams: # NOTE: this is wrong! What should we do? errorstr = "Could not perform CIM -> LMI object transformation" lmi_raise_or_dump_exception(LMIMethodCallError(errorstr)) return LMIReturnValue(rval=rval, errorstr=errorstr) # Check if we can perform synchronous method call job = call_rparams.get("job", None) can_perform_sync_call = False if not self._conn.is_wsman() and job and \ job.classname in LMIMethod._INDICATION_JOB_CLASSNAMES: # We can perform synchro methods only using CIM-XML can_perform_sync_call = True # Perform synchronous method call if self._sync_method and can_perform_sync_call: # We work with LMIInstance object, which simplifies the code # with instance refreshing and stuff. job_inst = call_rparams["job"].to_instance() # At first, try to wait for the call to finish by subscribing # to an indication handled_by_indication = False if not synchro_method_polling: try: rval, call_rparams, call_errorstr = \ self.__handle_synchro_method_call_indication( job_inst) handled_by_indication = True except CIMError, e: lmi_raise_or_dump_exception(e) return LMIReturnValue(rval=-1, errorstr=e.args[1]) except LMISynchroMethodCallError, e: lmi_raise_or_dump_exception(e) return LMIReturnValue(rval=-1, errorstr=e.message) # Fall through, try to handle the synchro call by polling except LMIHandlerNamePatternError, e: handled_by_indication = False except LMISynchroMethodCallFilterError, e: handled_by_indication = False except LMIIndicationListenerError, e: handled_by_indication = False if not handled_by_indication: # Executed, when LMIListener can not be started try: rval, call_rparams, call_errorstr = \ self.__handle_synchro_method_call_polling(job_inst) except LMISynchroMethodCallError, e: lmi_raise_or_dump_exception(e) return LMIReturnValue(rval=-1, errorstr=e.message) call_rparams = lmi_transform_to_lmi(self._conn, call_rparams) # Refresh is requested if refresh_requested: # Refresh the instance, within which the method was called ref_rval, _, ref_errorstr = self.__refresh_instance() if not ref_rval: errorstr = "Could not update an LMI object after a method call" lmi_raise_or_dump_exception(LMIMethodCallError(ref_errorstr)) return LMIReturnValue(rval=ref_rval, errorstr=ref_errorstr) return LMIReturnValue( rval=rval, rparams=call_rparams, errorstr=call_errorstr) def __getattr__(self, name): """ Returns either a class member, or a constant value. :param string name: class member, or the constant value name """ if not self._conn.is_wsman() and name.endswith("Values"): parameter_name = name[:-6] if parameter_name in self._method.parameters: return LMIConstantValuesParamProp( self._method.parameters[parameter_name]) elif parameter_name == self._method.name: return LMIConstantValuesMethodReturnType(self._method) raise AttributeError(name) @staticmethod def __wake(cond): """ Helper function used for manual :py:attr:`threading.Condition` wakeup. :param threading.Condition cond: condition object """ cond.acquire() cond.notify() cond.release() def __refresh_instance(self): """ Refreshes nested :py:class:`.LMIInstance`. **NOTE:** This method refreshes only nested :py:class:`.LMIInstance`. Objects of this class may also contain :py:class:`.LMIInstanceName`; in this case, no refreshing is performed. :rtype: :py:class:`.LMIReturnValue` """ if not isinstance(self._lmi_instance, LMIObjectFactory().LMIInstance): # There is nothing to refresh here. return LMIReturnValue(rval=True) return self._lmi_instance.refresh()
[docs] def doc(self): """ Prints out pretty verbose message with documentation for the class. If the LMIShell is run in a interactive mode, the output will be redirected to a pager set by environment variable :envvar:`PAGER`. If there is not :envvar:`PAGER` set, less or more will be used as a fall-back. """ if self._conn.is_wsman(): # WSMAN doesn't support reflextion, we can't call GetClass(). lmi_get_logger().info("WSMAN client doesn't support GetClass()") return formatter = LMIMethodFormatter(self._method) formatter.fancy_format(self._conn.client.interactive)
@property
[docs] def return_type(self): """ :returns: string of the method call's return type """ if self._conn.is_wsman(): # WSMAN doesn't support reflextion, we can't call GetClass(). return "unknown" return self._method.return_type
[docs] def tomof(self): """ Prints out a message with MOF representation of :py:class:`wbem.CIMMethod`. If the LMIShell is run in a interactive mode, the output will be redirected to a pager set by environment variable :envvar:`PAGER`. If there is not :envvar:`PAGER` set, less or more will be used as a fall-back. """ if self._conn.is_wsman(): # WSMAN doesn't support reflextion, we can't call GetClass(). lmi_get_logger().info("WSMAN client doesn't support GetClass()") return formatter = LMIMofFormatter(self._method) formatter.fancy_format(self._conn.client.interactive)
[docs] def valuemap_parameters(self): """ :returns: list of strings of the constant names """ return self._valuemap_parameters_list
[docs] def print_valuemap_parameters(self): """ Prints out the list of strings of constant names. """ for i in self._valuemap_parameters_list: sys.stdout.write("%s\n" % i)
[docs] def parameters(self): """ :returns: list of strings of :py:class:`wbem.CIMMethod`'s parameters """ if self._conn.is_wsman(): # WSMAN doesn't support reflextion, we can't call GetClass(). return [] return self._method.parameters
[docs] def print_parameters(self): """ Prints out :py:class:`wbem.CIMMethod`'s parameters. """ if self._conn.is_wsman(): # WSMAN doesn't support reflextion, we can't call GetClass(). return for param, value in self._method.parameters.iteritems(): sys.stdout.write( "%s %s%s\n" % (value.type, param, "[]" if value.is_array else ""))
@property
[docs] def wrapped_object(self): """ :returns: wrapped :py:class:`wbem.CIMmethod` object """ return self._method