""" Contain utility functions and decorators useful when defining custom
:class:`.Arbitrary` instances. They usually have little use otherwise.
"""
from itertools import cycle, islice, count, product as product_
import random
from decorator import decorator
from six import PY3
from six.moves import zip_longest
from pyquchk.lazylist import LazyList
__all__ = ['roundrobin', 'interleave', 'product',
'choice_weighted',
'filter_possible', 'until_possible', 'unique']
@decorator
def filter_possible(method, self, *args, **kwargs):
    """ Decorator, when applied to a generator method (e.g. :meth:`.gen_serial`)
    filters generated values to keep only those for which :meth:`.could_generate`
    returns ``True``.

    :raises RuntimeError: after 10000 consecutive rejected values.
    """
    consecutive_failures = 0
    for candidate in method(self, *args, **kwargs):
        if not self.could_generate(candidate):
            # Count only consecutive rejections: any accepted value resets it.
            consecutive_failures += 1
            if consecutive_failures > 10000:
                raise RuntimeError('Unable to satisfy could_generate after 10000 items from gen_serial in a %s instance.' % type(self).__name__)
        else:
            consecutive_failures = 0
            yield candidate
@decorator
def until_possible(method, self, *args, **kwargs):
    """ Decorator, when applied to a method returning different values each time
    (e.g. :meth:`.next_random`) runs this method multiple times until a value
    for which :meth:`.could_generate` returns ``True`` is returned, and finally
    returns this value.

    :raises RuntimeError: after 10000 rejected attempts.
    """
    attempts = 0
    # Keep calling the wrapped method until an acceptable value comes back.
    while True:
        candidate = method(self, *args, **kwargs)
        if self.could_generate(candidate):
            return candidate
        attempts += 1
        if attempts > 10000:
            raise RuntimeError('Unable to satisfy could_generate after 10000 calls of next_random in a %s instance.' % type(self).__name__)
@decorator
def unique(method, self, *args):
    """ Decorator, when applied to a generator method (e.g. :meth:`.gen_serial`,
    :meth:`.shrink`) filters generated values to keep only unique values.
    For the :meth:`.shrink` method the value passed to the method is also
    considered (so shrinking never re-yields the original value).
    """
    def _freeze(value):
        # Lists and dicts are unhashable; convert them to tuples so they can
        # be stored in the ``seen`` set.  Other values are assumed hashable.
        if isinstance(value, list):
            return tuple(value)
        if isinstance(value, dict):
            return tuple(value.items())
        return value

    seen = set()
    if 'shrink' in method.__name__:
        # Bug fix: the original code added args[0] unfrozen, which raised
        # TypeError for list/dict values and never matched the frozen items.
        seen.add(_freeze(args[0]))
    for item in method(self, *args):
        key = _freeze(item)
        if key not in seen:
            yield item
            seen.add(key)
def roundrobin(iterables):
    """ Yield items from the given iterables in a round-robin fashion.

    What it does exactly is better explained with a simple example:

    .. ipython::

        >>> list(roundrobin(['ABC', 'D', 'EF']))
        ['A', 'D', 'E', 'B', 'F', 'C']

    :param iterables: List of finite or infinite iterables.
    """
    # Recipe credited to George Sakkis.  The builtin next() works identically
    # on Python 2.6+ and Python 3, so the six.PY3 branch is unnecessary.
    remaining = len(iterables)
    iterators = [iter(it) for it in iterables]
    # Bind each iterator as a default argument to avoid the late-binding
    # closure pitfall inside the generator expression.
    fetchers = cycle(lambda it=it: next(it) for it in iterators)
    while remaining:
        try:
            for fetch in fetchers:
                yield fetch()
        except StopIteration:
            # One iterator is exhausted: keep only the still-active fetchers
            # (the next `remaining` entries of the cycle) and continue.
            remaining -= 1
            fetchers = cycle(islice(fetchers, remaining))
def interleave(xs, ys):
    """ Yield elements of *xs* and *ys* alternately, starting with *xs*;
    when one iterable is exhausted the rest of the other one is yielded.

    >>> list(interleave([], []))
    []
    >>> list(interleave([1], []))
    [1]
    >>> list(interleave([], ['a']))
    ['a']
    >>> list(interleave([1], ['a']))
    [1, 'a']
    >>> list(interleave([1, 2], ['a']))
    [1, 'a', 2]
    >>> list(interleave([1], ['a', 'b']))
    [1, 'a', 'b']
    >>> list(interleave([1, 2], ['a', 'b']))
    [1, 'a', 2, 'b']
    """
    # Bug fix: use a private sentinel as the fill value instead of relying on
    # the default None -- otherwise legitimate None elements in xs or ys were
    # silently dropped from the output.
    _missing = object()
    for pair in zip_longest(xs, ys, fillvalue=_missing):
        for item in pair:
            if item is not _missing:
                yield item
def product(*iterables, **kwargs):
    """ product(*iterables, repeat=1)

    Cartesian product of all the iterables.

    Difference compared to the :func:`itertools.product` is that tuples are yield
    in the order suitable for infinite iterables (see examples below).

    Corner cases:

    .. ipython::

        >>> list(product([], []))
        []
        >>> list(product([1, 2, 3], []))
        []
        >>> list(product([], [1, 2, 3]))
        []

    2 iterables:

    .. ipython::

        >>> list(product([1], [1]))
        [(1, 1)]
        >>> list(product([1, 2], [1]))
        [(1, 1), (2, 1)]
        >>> list(product([1, 2], [1, 2]))
        [(1, 1), (1, 2), (2, 1), (2, 2)]
        >>> list(product([1, 2, 3], [1, 2, 3]))
        [(1, 1), (1, 2), (2, 1), (2, 2), (1, 3), (2, 3), (3, 1), (3, 2), (3, 3)]
        >>> list(product([1, 2, 3], [1, 2, 3])) == list(product([1, 2, 3], repeat=2))
        True
        >>> list(islice(product(count(1), count(1)), 1000)) == list(islice(product(count(1), repeat=2), 1000))
        True
        >>> infNN = product(count(1), count(1))
        >>> list(islice(infNN, 20))
        [(1, 1), (1, 2), (2, 1), (2, 2), (1, 3), (2, 3), (3, 1), (3, 2), (3, 3), (1, 4), (2, 4), (3, 4), (4, 1), (4, 2), (4, 3), (4, 4), (1, 5), (2, 5), (3, 5), (4, 5)]
        >>> lstNN = list(islice(infNN, 10000))
        >>> all(max(x) <= max(y) for x, y in zip(lstNN, lstNN[1:]))
        True

    3 iterables:

    .. ipython::

        >>> list(product([1, 2], [1, 2], [1, 2]))
        [(1, 1, 1), (1, 1, 2), (1, 2, 1), (1, 2, 2), (2, 1, 1), (2, 1, 2), (2, 2, 1), (2, 2, 2)]
        >>> len(list(product([1, 2, 3], [1, 2, 3], [1, 2, 3])))
        27
        >>> infNNN = product(count(1), count(1), count(1))
        >>> list(islice(infNNN, 20))
        [(1, 1, 1), (1, 1, 2), (1, 2, 1), (1, 2, 2), (2, 1, 1), (2, 1, 2), (2, 2, 1), (2, 2, 2), (1, 1, 3), (1, 2, 3), (2, 1, 3), (2, 2, 3), (1, 3, 1), (1, 3, 2), (1, 3, 3), (2, 3, 1), (2, 3, 2), (2, 3, 3), (3, 1, 1), (3, 1, 2)]
        >>> lstNNN = list(islice(infNNN, 10000))
        >>> all(max(x) <= max(y) for x, y in zip(lstNNN, lstNNN[1:]))
        True
    """
    # Wrap every iterable in a LazyList (repeating the whole sequence when
    # repeat=N is given): this allows indexing and re-slicing each input
    # multiple times, which a one-shot iterator would not support.
    iterables = list(map(LazyList, iterables)) * kwargs.get('repeat', 1)
    for i in count():
        # Pass i yields the tuples whose maximum component index is exactly i
        # (the "shell" at index i), so output is ordered by growing max index
        # -- see the ``max(x) <= max(y)`` doctests above.
        yielded_any = False
        # Positions are visited from last to first; this matches the
        # documented output order within each shell.
        for main_j, main_it in reversed(list(enumerate(iterables))):
            try:
                # Fix component main_j at index i for this sub-pass.
                x = main_it[i]
            except IndexError:
                # This iterable has fewer than i+1 items: skip this position.
                continue
            # Positions after main_j range over indices 0..i, positions before
            # it over 0..i-1, so every shell tuple is produced exactly once.
            prod = product_(*[it[:i + 1 if j > main_j else i] for j, it in
                              enumerate(iterables) if j != main_j])
            for tup in prod:
                yield tup[:main_j] + (x,) + tup[main_j:]
                yielded_any = True
        if not yielded_any:
            # No position could supply index i: all iterables are exhausted
            # below i, so the whole product has been emitted.
            break
def choice_weighted(elements):
    """ Like :func:`random.choice`, but the probability an item is chosen is
    proportional to its weight.

    For convenience, this function is available as ``random.choice_weighted``
    if you imported :mod:`.utils`.

    :param elements: list of tuples ``(item, weight)``, weight is any numeric value
    :raises IndexError: if *elements* is empty (mirrors :func:`random.choice`)
    """
    if not elements:
        # Previously an empty list silently returned None; raise like
        # random.choice does instead of propagating a None downstream.
        raise IndexError('Cannot choose from an empty sequence')
    rnd = random.random() * sum(w for _, w in elements)
    for el, w in elements:
        rnd -= w
        if rnd < 0:
            return el
    # Floating-point rounding can leave rnd at exactly 0.0 after subtracting
    # every weight; previously this fell through and returned None.  The last
    # element is the mathematically correct choice in that case.
    return elements[-1][0]


random.choice_weighted = choice_weighted