# Source code for traffic.libloop

"""
Tools for working with :py:class:`traffic.abstract.Traffic` cycles.
"""
import operator
import collections
from . import protocol

#: Used to hold a snapshot of events and data that were produced by a Transit and their
#: associated information or callback. This is useful for extracting transfer
#: data from a Transit that can be used outside of the Traffic Cycle.
#:
#: Attributes:
#:
#:  **transit**
#:   The :py:class:`traffic.abstract.Transit` instance.
#:
#:  **termination**
#:   Whether or not a terminate event occurred: `None` if not terminated.
#:   Otherwise, the result of the :py:meth:`traffic.abstract.Transit.failure` method.
#:
#:  **transferred**
#:   An integer designating the amount transferred iff the transit sends.
#:   The transfer slice of the resource iff the transit receives.
#:
#:  **demand**
#:   The :py:meth:`traffic.abstract.Transit.acquire` method of the Transit if an exhaust event
#:   occurred, otherwise, `None`.
#:
Activity = collections.namedtuple(
	"Activity",
	[
		"transit",      # the Transit the snapshot was taken from
		"termination",  # None unless the Transit terminated
		"transferred",  # None unless a transfer occurred
		"demand",       # None unless the Transit was exhausted
	],
)

def activity(transit, Activity = Activity, len = len):
	"""
	activity(transit)

	Take an :py:class:`traffic.libloop.Activity` snapshot of the given
	:py:class:`traffic.abstract.Transit`.

	:param transit: The Transit to inspect.
	:returns: ``Activity(transit, termination, transferred, demand)``.

	Every field other than `transit` is `None` unless its condition holds:

	 - `transferred` is ``transit.resource[s]``, where ``s`` is the result of
	   ``transit.transfer()``, whenever that result is not `None`.
	 - `demand` is the ``transit.acquire`` attribute (not called) when
	   ``transit.exhausted`` is true.
	 - `termination` is the result of ``transit.status()`` when
	   ``transit.terminated`` is true.

	The `Activity` and `len` parameters are names bound as defaults; `len` is
	not used by the body and is retained only to keep the signature intact.
	"""
	# The transit is queried in a fixed order: transfer, exhausted, terminated.
	window = transit.transfer()
	transferred = None if window is None else transit.resource[window]
	demand = transit.acquire if transit.exhausted else None
	termination = transit.status() if transit.terminated else None
	return Activity(transit, termination, transferred, demand)
def snapshot(traffic, _list = list, _map = map):
	"""
	:param traffic: The Traffic instance to extract Activity from.
	:type traffic: :py:class:`traffic.abstract.Traffic`
	:returns: List of Activity instances.
	:rtype: [:py:class:`Activity`]

	Apply :py:func:`traffic.libloop.activity` to everything yielded by
	``traffic.transfer()`` and collect the results.
	"""
	lazy = _map(activity, traffic.transfer())
	# The results are materialized immediately: callers such as
	# libloop.cycle leave the cycle right after this call, so the
	# snapshot cannot remain a lazy iterator.
	return _list(lazy)
def cycle(traffic):
	"""
	:param traffic: The Traffic to cycle.
	:type traffic: :py:class:`traffic.abstract.Traffic`
	:returns: List of Activity instances.
	:rtype: [:py:class:`Activity`]

	Enter the traffic instance as a context manager and, while inside it,
	build the :py:func:`traffic.libloop.snapshot` list of activity that is
	returned.

	.. warning::
		If insufficient memory is available for building the snapshot,
		transfer-loss *will* occur.
	"""
	with traffic:
		# The return stays inside the managed block, so the snapshot is
		# built before the context is exited.
		activities = snapshot(traffic)
		return activities
def loop(deliver, traffic, _cycle = cycle):
	"""
	loop(deliver, traffic)

	:param deliver: The callable that is given the sequence of :py:class:`Activity` instances.
	:type deliver: callable
	:param traffic: The Traffic instance that is being cycled until termination.
	:type traffic: :py:class:`traffic.abstract.Traffic`
	:returns: None

	Repeatedly cycle the Traffic instance, handing the result of each cycle
	to the given :py:obj:`deliver` callable, until ``traffic.terminated``
	is true::

		Q = queue.Queue(8)
		T = libtraffic.Traffic()
		libloop.loop(Q.put, T)

	Running the call to loop in a thread is an appropriate way to collect events.
	By doing this, events and wire transfers can be processed in parallel.
	"""
	while True:
		# Termination is checked before every cycle, including the first.
		if traffic.terminated:
			break
		deliver(_cycle(traffic))