Source code for traffic.libloop
"""
Tools for working with :py:class:`traffic.abstract.Traffic` cycles.
"""
import operator
import collections
from . import protocol
#: Used to hold a snapshot of events and data that were produced by a Transit and their
#: associated information or callback. This is useful for extracting transfer
#: data from a Transit that can be used outside of the Traffic Cycle.
#:
#: Attributes:
#:
#: **transit**
#: The :py:class:`traffic.abstract.Transit` instance.
##:
#: **terminated**
#: Whether or not a terminate event occurred: `None` if not terminated.
#: Otherwise, the result of the :py:meth:`traffic.abstract.Transit.failure` method.
#:
#: **transferred**
#: An integer designating the amount transferred iff the transit sends.
#: The transfer slice of the resource iff the transit receives.
#:
#: **demand**
#: The :py:meth:`traffic.abstract.Transit.acquire` method of the Transit if an exhaust event
#: occurred, otherwise, `None`.
#:
Activity = collections.namedtuple("Activity",
("transit", "termination", "transferred", "demand",)
)
[docs]def activity(transit, Activity = Activity, len = len):
"""
activity(transit)
Given a :py:class:`traffic.abstract.Transit`, get an :py:class:`traffic.libloop.Activity`
snapshot of the Transit.
"""
termination = None # not terminated
demand = None
xfer = None
s = transit.transfer()
if s is not None:
xfer = transit.resource[s]
if transit.exhausted:
demand = transit.acquire
if transit.terminated:
termination = transit.status()
return Activity(transit, termination, xfer, demand)
[docs]def snapshot(traffic, _list = list, _map = map):
"""
:param traffic: The Traffic instance to extract Activity from.
:type traffic: :py:class:`traffic.abstract.Traffic`
:returns: List of Activity instances.
:rtype: [:py:class:`Activity`]
Given a :py:class:`traffic.abstract.Traffic` instance, get a snapshot of all
:py:func:`traffic.libloop.activity`.
"""
# list() is invoked here as materialization of the
# snapshot is needed in cases where the cycle exits. (like libloop.cycle)
return _list(_map(activity, traffic.transfer()))
[docs]def cycle(traffic):
"""
:param traffic: The Traffic to cycle.
:type traffic: :py:class:`traffic.abstract.Traffic`
:returns: List of Activity instances.
:rtype: [:py:class:`Activity`]
Cycle the traffic instance and return a :py:func:`traffic.libloop.snapshot`
list of activity.
.. warning:: If insufficient memory is available for building the snapshot,
transfer-loss *will* occur.
"""
with traffic:
return snapshot(traffic)
[docs]def loop(deliver, traffic, _cycle = cycle):
"""
loop(deliver, traffic)
:param deliver: The callable that is given the sequence of :py:class:`Activity` instances.
:type deliver: callable
:param traffic: The Traffic instance that is being cycled until termination.
:type traffic: :py:class:`traffic.abstract.Traffic`
:returns: None
Function that runs a Traffic loop and delivers sequences of
:py:class:`Activity` instances into the given :py:obj:`deliver` callable
so long as the Traffic instance is not terminated::
Q = queue.Queue(8)
T = libtraffic.Traffic()
libloop.loop(Q.put, T)
Running the call to loop in a thread is an appropriate way to collect events.
By doing this, events and wire transfers can be processed in parallel.
"""
while not traffic.terminated:
deliver(_cycle(traffic))