Source code for ginsfsm.gobj

# -*- encoding: utf-8 -*-
"""
Introduction
============

An :term:`GObj` is an object that has an inside :term:`simple-machine`
that defines its behavior.

The communication between :term:`gobj`'s happens via :term:`event`'s:

:class:`GObj` has the next methods:

* :meth:`GObj.start_up`
* :meth:`GObj.create_gobj`
* :meth:`GObj.destroy_gobj`
* :meth:`GObj.send_event`
* :meth:`GObj.post_event`
* :meth:`GObj.broadcast_event`
* :meth:`GObj.subscribe_event`
* :meth:`GObj.delete_subscription`
* :meth:`GObj.set_owned_event_filter`
* :meth:`GObj.overwrite_parameters`
* :meth:`GObj.overwrite_few_parameters`

And this inherited from :class:`ginsfsm.smachine.SMachine`:

* :meth:`ginsfsm.smachine.SMachine.set_new_state`
* :meth:`ginsfsm.smachine.SMachine.get_current_state`
* :meth:`ginsfsm.smachine.SMachine.get_event_list`
* :meth:`ginsfsm.smachine.SMachine.get_output_event_list`


:class:`GObj` has the next public attributes:

* :attr:`GObj.name`
* :attr:`GObj.parent`
* :attr:`GObj.dl_childs`

Events to manage events:

* sending events:

    * sending events by direct delivery: :meth:`GObj.send_event`.
    * sending events by queues: :meth:`GObj.post_event`.
    * sending events to subscribers: :meth:`GObj.broadcast_event`.

* receiving events:

    * directly from another :term:`gobj`'s who knows you.
    * subscribing to events by the :meth:`GObj.subscribe_event` method.
    * you can filtering events being broadcasting with
      :meth:`GObj.set_owned_event_filter` method.


Event
=====

.. autoclass:: Event
    :members:


GObj
====

.. autoclass:: GObj
    :members: start_up, create_gobj, destroy_gobj,
        send_event, post_event, broadcast_event,
        subscribe_event, delete_subscription, set_owned_event_filter,
        overwrite_parameters, overwrite_few_parameters

    .. attribute:: name

        My name.

        Set by :meth:`create_gobj`.

    .. attribute:: parent

        My parent, destination of my events... or not.

        Set by :meth:`create_gobj`.

    .. attribute:: dl_childs

        Set of my gobj childs. Me too like be parent!.


    .. method:: set_new_state

        Set a new state.
        Method to used inside actions, to force the change of state.

        :param new_state: new state to set.

        ``new_state`` must match some of the state names of the
        machine's :term:`state-list` or a :exc:`StateError` exception
        will be raised.

    .. method:: get_current_state

        Return the name of the current state.

        If there is no state it returns ``None``.

    .. method:: get_event_list

       Return the list with the :term:`input-event`'s names.

    .. method:: get_output_event_list

       Return the list with the :term:`output-event`'s names.


Exceptions
==========

.. autoexception:: GObjError

.. autoexception:: EventError

.. autoexception:: StateError

.. autoexception:: MachineError

.. autoexception:: EventNotAcceptedError

"""
import re
import ginsfsm.globals  # made it import available
from ginsfsm.compat import string_types
from ginsfsm.smachine import (
    SMachine,
    EventError,
    EventNotAcceptedError,  # made it import available
    StateError,  # made it import available
    MachineError,  # made it import available
)
from ginsfsm.gconfig import (
    GConfig,
    add_gconfig,
)
from ginsfsm.deferred import Deferred


[docs]class GObjError(Exception): """ General GObj exception"""
[docs]class Event(object): """ Collect event properties. This is the argument received by actions. :param destination: destination gobj whom to send the event. :param event_name: event name. :param source: list with path of gobj sources. Firt item ``source[0]`` is the original sender gobj. Last item ``source[-1]`` is the nearest sender gobj. :param kw: keyword arguments with associated data to event. """ # For now, event_factory is private. Automatically using by send_event... #Use the :meth:`GObj.event_factory` factory function to create Event #instances. def __init__(self, destination, event_name, source, **kw): self.destination = destination self.event_name = event_name if not isinstance(source, (list, tuple)): source = [source] self.source = source self.kw = kw.copy() self.__dict__.update(**kw) def copy(self): pass def __repr__(self): return 'Event object: name %r, destination: %r, kw: %r' % ( self.event_name, self.destination, self.kw, ) def __str__(self): return 'Event object: name %r, destination: %r, kw: %r' % ( self.event_name, self.destination, self.kw, )
class _Subscription(object): """ Collect subscriber properties `event_name`: event name. `subscriber_gobj`: subcriber gobj to sending event. `kw`: event parameters """ def __init__(self, me, event_name, subscriber_gobj, **kw): self.me = me self.event_name = event_name self.subscriber_gobj = subscriber_gobj self.kw = kw self.__dict__.update(**kw) def __repr__(self): resp = "Subscription:\n me : %r\n ev : %r\n sub: %r\n" % ( self.me, self.event_name, self.subscriber_gobj) return resp GOBJ_GCONFIG = { 'trace_traverse': [bool, False, 0, None, 'Trace traverse search'], 'trace_creation': [ bool, False, GConfig.FLAG_DIRECT_ATTR, None, 'Trace creation/destroy of gobj'], 'trace_subscription': [bool, False, 0, None, 'Trace event subscribing'], 're_name': [ str, None, 0, None, 'Regular expression name to search the gobj in the resource tree.' 'Used in Pyramid traversal.'], # trace_mach is inherited from SMachine. 'trace_mach': [ bool, False, GConfig.FLAG_DIRECT_ATTR, None, 'Trace machine activity' ], # logger is inherited from SMachine. 'logger': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], 'gaplic': [None, None, GConfig.FLAG_DIRECT_ATTR, None, 'My grand-father'], 'create_gobj': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], 'destroy_gobj': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '_increase_inside': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '_decrease_inside': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '_tab': [None, None, GConfig.FLAG_DIRECT_ATTR, None, ''], '__unique_name__': [ bool, False, GConfig.FLAG_DIRECT_ATTR, None, 'If using gaplic, unique name' ], } _urandom_name = 0
[docs]class GObj(SMachine, GConfig): """ Well, yes, I'm a very simple brain. Only a machine. But write a good FSM, and I never fail you. Derive me, and write my FSM. Sample GObj:: class MyGObj(GObj): def __init__(self): GObj.__init__(self, FSM, GCONFIG) def start_up(self): ''' Initialization zone.''' :param fsm: FSM :term:`simple-machine`. :param gconfig: GCONFIG :term:`gconfig-template`. """ # name and parent attributes must be Location-Aware Resources # for Pyramid framework compatibility: # changed to point to __name__ and __parent__. @property def name(self): return self.__name__ @name.setter
[docs] def name(self, name): self.__name__ = name
@property def parent(self): return self.__parent__ @parent.setter
[docs] def parent(self, parent): self.__parent__ = parent
def __init__(self, fsm, gconfig=None): SMachine.__init__(self, fsm) self.name = '' """ My name. Set by :meth:`create_gobj` """ self.parent = None """My parent, destination of my events... or not. Set by :meth:`create_gobj` """ self.dl_childs = set() # my childs... me too like be parent. """List of gobj childs. """ self.owned_event_filter = None # TODO debe ser una lista de filtros """Filter to broadcast_event function to check the owner of events. """ self._dl_subscriptions = set() # uauuu, how many fans!! self._some_subscriptions = False self._destroyed = False # mark as destroyed when destroy_gobj() self._re_compiled_name = '' # re compiled name when using re_name self.re_matched_name = '' # matched name when using re_name self.trace_creation = False gconfig = add_gconfig(gconfig, GOBJ_GCONFIG) GConfig.__init__(self, gconfig, self.logger) def __del__(self): if self.trace_creation: self.logger and self.logger.info("Destroyed! <-- %r" % (self)) def __str__(self): name = "%s" % (self.name) parent = self.parent while(parent): parent_name = "%s" % (parent.name) name = parent_name + '.' + name parent = parent.parent return "%s" % (name) def __repr__(self): name = "%s:%s" % (self.__class__.__name__, self.name) parent = self.parent while(parent): parent_name = "%s:%s" % (parent.__class__.__name__, parent.name) name = parent_name + '.' + name parent = parent.parent return "'%s: %s'" % (name, 'Destroyed' if self._destroyed else "Lived")
[docs] def create_gobj(self, name, gclass, parent, **kw): """ Factory function to create gobj's instances. :param name: Name of the gobj. :param gclass: `gclass` is the :class:`GObj` type used to create the new gobj. It's must be a derived class of :class:`GObj` otherwise a :exc:`GObjError` exception will be raised. :param parent: parent of the new :term:`gobj`. ``None`` if it has no parent. If it's not ``None``, then must be a derived class of :class:`GObj` otherwise a :exc:`GObjError` exception will be raised. :param kw: Attributes that are added to the new :term:`gobj`. All the keyword arguments used in the creation function **are added as attributes** to ``config`` object. You must consult the attributes supported by each `gclass` type. The attributes must be defined in the gclass GCONFIG, otherwise they are ignored. Special kw: ``'__random_name__'``: Generate a random gobj name TODO: doc re_name :rtype: new gobj instance. The factory funcion does: * Add the :term:`gobj` to their parent child list :attr:`GObj.dl_childs`, * If it's a :term:`named-gobj` call the :meth:`GObj.register_unique_gobj`. * Call :meth:`GObj.start_up`. * Add to the :term:`gobj` several attributes: * **name**: name of the created :term:`gobj`. * **parent**: the parent :term:`gobj` of the created :term:`gobj`. """ if gclass is None: raise GObjError( '''ERROR create_gobj(): No GObj class supplied.''') if not issubclass(gclass, GObj): raise GObjError( '''ERROR create_gobj(): class '%r' is NOT a GObj subclass''' % (gclass)) if parent is not None: if not isinstance(parent, GObj): raise GObjError( '''ERROR create_gobj(): parent '%r' ''' '''is NOT a GObj subclass''' % (gclass)) self.__random_name__ = kw.pop('__random_name__', False) if self.__random_name__: name = self.get_random_name(name) gobj = gclass() if name: gobj.name = name # Who wins? arguments or file ini_settings? # ini global (in gaplic) wins. gobj.write_parameters(**kw) if self.gaplic and self.gaplic.ini_settings is not None: # ini global win. gobj.overwrite_parameters(0, **self.gaplic.ini_settings) if parent is not None: parent._add_child(gobj) if gobj.config.re_name: gobj._re_compiled_name = re.compile(gobj.config.re_name) if gobj.trace_creation: self.logger and self.logger.info("Creating --> '%s:%s'" % ( gclass.__name__, name)) if gobj.__unique_name__: registered = self.gaplic._register_unique_gobj(gobj) if not registered: raise GObjError( "ERROR create_gobj():" " cannot _register_unique_gobj '%s' " % (gobj.name)) gobj.start_up() if gobj.trace_creation and self.logger: if gobj.__unique_name__: unique = '!' else: unique = ' ' self.logger.info("Created <--%s '%r'" % (unique, gobj)) return gobj
def get_random_name(self, prefix): global _urandom_name _urandom_name += 1 if prefix: return '%s_%d' % (prefix, _urandom_name) else: return '%d' % (_urandom_name) @staticmethod
[docs] def destroy_gobj(gobj): """ Destroy a gobj """ if gobj.trace_creation and gobj.logger: gobj.logger.info("Destroying --> %r" % (gobj)) if gobj._destroyed: if gobj.logger: gobj.logger.error( "ERROR reentering in destroy_gobj: %r" % gobj) return gobj._destroyed = True if gobj.parent is not None: gobj.parent._remove_child(gobj) while len(gobj.dl_childs): try: for child in gobj.dl_childs: if not child._destroyed: GObj.destroy_gobj(child) except RuntimeError: pass # "Set changed size during iteration" is OK gobj.delete_all_subscriptions(force=True) gobj.go_out() del gobj
[docs] def start_up(self): """ Initialization zone. Well, the __init__ method is used to build the FSM so I need another function to initialize the new gobj. Please, **override me**, and write here all the code you need to start up the machine: create your owns childs, etc. This function is called by :meth:`create_gobj` after creating the gobj instance. """
def go_out(self): """ Finish zone. Please, **override me** to do extra work when the gobj is being destroyed. In this point, all childs and subscriptions are already deleted. """ def _resolv_destination(self, destination): """ Resolve the destination gobj. If destination it's a string: * check is gaplic exists. * try to resolv the destination by gaplic. If the string destination cannot be resolved then return the string. It must be resolved by gaplic router. """ if not (isinstance(destination, (string_types, GObj))): raise GObjError( 'Destination gobj ("%s") must be a string or a GObj instance' % (destination) ) if isinstance(destination, string_types): if not self.gaplic: raise GObjError( 'When destination gobj ("%s") is a string, ' 'you need a GAplic!' % (destination) ) named_gobj = self.gaplic.find_unique_gobj(destination) if not named_gobj: raise GObjError('GObj %r UNKNOWN in %r' % ( destination, self.gaplic)) destination = named_gobj return destination def _event_factory(self, destination, event, **kw): """ Factory to create Event instances. :param destination: destination gobj whom send the event. :param event: an :term:`event`. :param kw: keyword arguments with associated data to event. :rtype: Return Event instance. ``event`` must be a `string` or :class:`Event` types, otherwise a :exc:`EventError` will be raised. If ``event`` is an :class:`Event` instance, a new :class:`Event` duplicated instance is returned, but it will be updated with the new ``destination`` and ``kw`` keyword arguments. .. note:: All the keyword arguments used in the factory function **are added as attributes** to the created :term:`event` instance. You must consult the attributes supported by each machine's event. """ if not (isinstance(event, string_types) or isinstance(event, Event)): raise EventError( 'Event "%r" must be a string or Event instance' % (event,)) # if destination is not None: if not (isinstance(destination, string_types) or isinstance(destination, GObj) or isinstance(destination, Deferred) ): raise GObjError( 'Destination "%r" must be a string or GObj/Deferred instance' % (destination,)) if isinstance(event, Event): # duplicate the event if event.source[-1] != self: event.source.append(self) if len(kw): event.kw.update(**kw) event = Event( event.destination, event.event_name, event.source, **event.kw ) if destination is not None: event.destination = destination else: event = Event(destination, event, self, **kw) return event
[docs] def send_event(self, destination, event, **kw): """ Send ``event`` to ``destination``, with associated event data ``kw``. :param destination: Must be a string gobj name or a :term:`gobj` instance, otherwise a :exc:`GObjError` will be raised. If it's a string and the gobj name cannot be resolved inside the current :term:`gaplic`, then the event is :func:`post_event` in order to be resolved by the gaplic router. :param event: Must be a string event name or a :class:`Event` instance, otherwise an :exc:`EventError` will be raised. :param kw: All the keyword arguments **are added as attributes** to the sent :class:`Event` instance. You must consult the attributes supported by each machine's event. :rtype: return the returned value from the executed action. Return ``None`` if the event has been :func:`post_event`. If the :term:`event-name` exists in the machine, but it's not accepted by the current state, then no exception is raised but the function **returns** :exc:`EventNotAcceptedError`. .. note:: The :meth:`inject_event` method doesn't **raise** :exc:`EventNotAcceptedError` because a :term:`machine` should run under any circumstances. In any way an action can raise exceptions. """ destination = self._resolv_destination(destination) event = self._event_factory(destination, event, **kw) if isinstance(destination, string_types): # string gobjs must be resolved by gaplic return self.post_event(destination, event) if destination._destroyed: self.logger and self.logger.error( "ERROR internal: sending event %r to a destroyed gobj %r" % ( event.event_name, destination) ) return -1 ret = destination.inject_event(event) return ret
[docs] def post_event(self, destination, event, **kw): """ Same funcionality as :func:`send_event`, but the event is processed by gaplic. If the destination is inside of the current gaplic the event will be sent in the next loop cycle. If the destination is in another gaplic, the event will be processed by the gaplic's router. """ event = self._event_factory(destination, event, **kw) if not self.gaplic: raise GObjError( 'To use post_event, you need a GAplic!' ) self.gaplic.enqueue_event(event)
[docs] def broadcast_event(self, event, **kw): """ Broadcast the ``event`` to all subscribers. :param event: :term:`event` to send. :param kw: keyword argument with data associated to event. .. note:: All the keyword arguments **are added as attributes** to the sent :term:`event`. Use this function when you don't know who are your event's clients, when you don't know the :term:`gobj` destination of your :term:`output-event`'s If there is no subscriptors, the event is not sent. When an event has several subscriptors, there is a mechanism called :term:`event-filter` that allows to a subcriptor to own the event and no further spread by more subscribers. The filter function set by :meth:`set_owned_event_filter` method, is call with the returned value of an :term:`action` as argument: If the filter function return ``True``, the event is owned, and the :func:`ginsfsm.gobj.GObj.broadcast_event` function doesn't continue sending the event to other subscribers. .. note:: If :func:`ginsfsm.gobj.GObj.broadcast_event` function uses :func:`ginsfsm.gobj.GObj.post_event`, the :term:`event-filter` cannot be applied. """ if not self._some_subscriptions: return subscriptions = self._dl_subscriptions.copy() sended_gobj = set() # don't repeat events for sub in subscriptions: if sub.subscriber_gobj in sended_gobj: continue oevent = self._event_factory(sub.subscriber_gobj, event, **kw) if None in sub.event_name or \ oevent.event_name in sub.event_name: if hasattr(sub, '__rename_event_name__'): oevent.kw.update( { 'original_event_name': oevent.event_name, } ) oevent.event_name = sub.__rename_event_name__ if hasattr(sub, '__subscription_reference__'): oevent.kw.update( { '__subscription_reference__': sub.__subscription_reference__, } ) oevent.event_name = sub.__rename_event_name__ ret = False if isinstance(sub.subscriber_gobj, Deferred): # outside world sub.subscriber_gobj(event=oevent) else: # gobj-ecosistema if hasattr(sub, '__use_post_event__'): ret = self.post_event(sub.subscriber_gobj, oevent) else: ret = self.send_event(sub.subscriber_gobj, oevent) if self.owned_event_filter: ret = self.owned_event_filter(ret) if ret is True: return # propietary event sended_gobj.add(sub.subscriber_gobj)
[docs] def subscribe_event(self, event_name, subscriber_gobj, **kw): """ Subscribe to an event. :param event_name: string event name or tuple/list of string event names. If ``event_name`` is ``None`` then it subscribes to all events. If it's not ``None`` then it must be a valid event name from the :term:`output-event` list, otherwise a :exc:`EventError` will be raised. :param subscriber_gobj: subscriber obj that wants receive the event. ``subscriber_gobj`` must be: * `None`: the subscriber is the parent. * `string`: the subscriber is a :term:`unique-named-gobj`. * type :class:`GObj` instance. * Deferred callback. otherwise otherwise a :exc:`GObjError` will be raised. :param kw: keyword arguments. :return: the subscription object. Possible values for **kw** arguments: * `__use_post_event__`: ``bool`` You must set it to `True` in order to broadcast the events using `post-event` instead of `send-event`. * `__rename_event_name__`: `'new event name'` You can rename the output original event name. The :attr:`original_event_name` attribute is added to the sent event with the value of the original event name. * `__hard_subscription__`: ``bool`` True for permanent subscription. This subscription cannot be remove, neither with delete_all_subscriptions(). (Well, with force you can) * `__subscription_reference__`: ``str`` If exists, it will be added as kw in the event broadcast. Can be used by the subscriptor for general purposes. """ #if subscriber_gobj is not None: if not (isinstance(subscriber_gobj, string_types) or isinstance(subscriber_gobj, Deferred) or isinstance(subscriber_gobj, GObj) ): raise GObjError('Bad type of subscriber_gobj %r' % ( subscriber_gobj,)) output_events = self.get_output_event_list() if not isinstance(event_name, (list, tuple)): event_name = (event_name,) for name in event_name: if name is None: continue if not isinstance(name, string_types): raise EventError( 'subscribe_event(): event %r is not string in %r' % (name, self)) if name not in output_events: raise EventError( 'subscribe_event(): output-event %r not defined in' ' %r' % (event_name, self)) existing_subs = self._find_subscription(event_name, subscriber_gobj) if existing_subs: # avoid duplication subscriptions if self.logger: self.logger.warning( "WARNING duplicate subscription:\n" " me : %r\n" " ev : %r\n" " sub: %r\n" % (self, event_name, subscriber_gobj)) self.delete_subscription(event_name, subscriber_gobj) subscription = _Subscription(self, event_name, subscriber_gobj, **kw) if self.config.trace_subscription and self.logger: self.logger.info('NEW %r' % subscription) self._dl_subscriptions.add(subscription) self._some_subscriptions = True return subscription
def _find_subscription(self, event_name, subscriber_gobj): """ Find a subscription by event_name and subscriber gobj. Internal use to avoid duplicates subscriptions. """ if not isinstance(event_name, (list, tuple)): event_name = (event_name,) for sub in self._dl_subscriptions: if sorted(list(sub.event_name)) == sorted(list(event_name)): if sub.subscriber_gobj == subscriber_gobj: return sub return None def delete_subscription_by_object(self, subscription): if subscription in self._dl_subscriptions: if self.config.trace_subscription and self.logger: self.logger.info('DEL %r' % subscription) self._dl_subscriptions.remove(subscription) if len(self._dl_subscriptions) == 0: self._some_subscriptions = False return True return False
[docs] def delete_subscription(self, event_name, subscriber_gobj): """ Remove `subscription`. :param event_name: string event name or tuple/list of string event names. :param subscriber_gobj: subscriber gobj. """ sub = self._find_subscription(event_name, subscriber_gobj) if self.config.trace_subscription and self.logger: self.logger.info('DEL %r' % sub) if sub: if sub.kw.get('__hard_subscription__', None): if self.logger: self.logger.error( "WARNING cannot delete a hard subscription(): '%s'" % ( event_name)) return False else: self._dl_subscriptions.remove(sub) if len(self._dl_subscriptions) == 0: self._some_subscriptions = False return True if self.logger: self.logger.error( "ERROR delete_subscription(): '%s' NOT FOUND " % (event_name)) return False
def delete_all_subscriptions(self, force=False): """ Remove all subscriptions. """ subscriptions = self._dl_subscriptions.copy() for sub in subscriptions: if not force: if sub.kw.get('__hard_subscription__'): continue if self.config.trace_subscription and self.logger: self.logger.info('DEL %r' % sub) self._dl_subscriptions.remove(sub) if len(self._dl_subscriptions) == 0: self._some_subscriptions = False
[docs] def set_owned_event_filter(self, filter): """ Set a filter function to be used by :meth:`broadcast_event` function to check the owner of events. """ self.owned_event_filter = filter
def _add_child(self, gobj): """ Add a child ``gobj``. :param gobj: :term:`gobj` child to add. Raise :exc:`GObjError` is ``gobj`` already has a parent. This function is called by :meth:`create_gobj` after creating the gobj instance. """ if gobj.parent: raise GObjError('GObj "%r" already has parent' % (gobj)) self.dl_childs.add(gobj) gobj.parent = self def _remove_child(self, gobj): """ Remove the child ``gobj``. :param gobj: :term:`gobj` child to remove. This function is called by :meth:`destroy_gobj`. """ if gobj in self.dl_childs: self.dl_childs.remove(gobj) gobj.parent = None def __getitem__(self, name): """ Enable gobjs tree to work with Pyramid traversal URL dispatch. """ trace_traverse = self.config.trace_traverse if trace_traverse and self.logger: self.logger.debug("==> TRAVERSING('%s') -> %s" % ( name, self.resource_path())) if name is None: raise KeyError('Parameter name cannot be None') # Firstly search no re_name for gobj in self.dl_childs: if gobj.config.re_name: continue else: if gobj.name == name: if trace_traverse and self.logger: self.logger.debug(" TRAVERSING FOUND!") return gobj # Secondly search re_name for gobj in self.dl_childs: if gobj.config.re_name: if gobj._re_compiled_name.match(name) is not None: # this gobj has been got as re_matched_name. gobj.re_matched_name = name if trace_traverse and self.logger: self.logger.debug(" TRAVERSING %s FOUND RE!" % name) return gobj else: continue if trace_traverse and self.logger: self.logger.debug("<== TRAVERSING('%s') -> %s END!" % ( name, self.resource_path())) raise KeyError('No such child named %s' % name) def __iter__(self): """ Iterates over child elements. """ return self.dl_childs.__iter__() # helper for pyramid def resource_path(self, separator='/'): """ Same as Pyramid traversal.resource_path, but without escaping. """ path = [loc.__name__ or '' for loc in lineage(self)] path.reverse() return path and separator.join([x for x in path]) or separator
[docs] def overwrite_parameters(self, level, **settings): """ The parameters in settings must be defined in the gobj. :param level: level trace of childs. indicates the depth of the childs as far to change. * `0` only this gobj. * `-1` all tree of childs. :param settings: parameters and their values. The settings are filtered by the named-gobj or gclass name of this gobj. The parameter name in settings, must be a dot-named, with the first item being the named-gobj o gclass name. """ parameters = self.filter_parameters(**settings) self.write_parameters(**parameters) if level > 0: for child in self.dl_childs: child.overwrite_parameters(level - 1, **settings)
[docs] def overwrite_few_parameters(self, parameter_list, **settings): """ The parameters in settings must be defined in the gobj. :param parameter_list: write only the parameters in ``parameter_list``. :param settings: parameters and their values. The settings are filtered by the named-gobj or gclass name of this gobj. The parameter name in settings, must be a dot-named, with the first item being the named-gobj o gclass name. """ parameters = self.filter_parameters(**settings) self.write_few_parameters(parameter_list, **parameters)
def inside(resource1, resource2): """Is ``resource1`` 'inside' ``resource2``? Return ``True`` if so, else ``False``. ``resource1`` is 'inside' ``resource2`` if ``resource2`` is a :term:`lineage` ancestor of ``resource1``. It is a lineage ancestor if its parent (or one of its parent's parents, etc.) is an ancestor. """ while resource1 is not None: if resource1 is resource2: return True resource1 = resource1.__parent__ return False def lineage(resource): """ Return a generator representing the :term:`lineage` of the :term:`resource` object implied by the ``resource`` argument. The generator first returns ``resource`` unconditionally. Then, if ``resource`` supplies a ``__parent__`` attribute, return the resource represented by ``resource.__parent__``. If *that* resource has a ``__parent__`` attribute, return that resource's parent, and so on, until the resource being inspected either has no ``__parent__`` attribute or which has a ``__parent__`` attribute of ``None``. For example, if the resource tree is:: thing1 = Thing() thing2 = Thing() thing2.__parent__ = thing1 Calling ``lineage(thing2)`` will return a generator. When we turn it into a list, we will get:: list(lineage(thing2)) [ <Thing object at thing2>, <Thing object at thing1> ] """ while resource is not None: yield resource # The common case is that the AttributeError exception below # is exceptional as long as the developer is a "good citizen" # who has a root object with a __parent__ of None. Using an # exception here instead of a getattr with a default is an # important micro-optimization, because this function is # called in any non-trivial application over and over again to # generate URLs and paths. try: resource = resource.__parent__ except AttributeError: resource = None