""" The ``collections`` module contains types and functions that define
various collections and algorithms.
"""
import struct
import zlib
from operator import itemgetter
from wheezy.core.comp import defaultdict
from wheezy.core.comp import ntob
GZIP_HEADER = ntob('\x1F\x8B\x08\x00\x00\x00\x00\x00\x02\xFF', 'latin1')
MAX_INT = int('FFFFFFFF', 16)
def first_item_adapter(adaptee):
[docs] """ Adapts ``defaultdict(list)``.__getitem__ accessor to
return the first item from the list.
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = first_item_adapter(d)
Return a first item from the list.
>>> a['a']
1
"""
return ItemAdapter(adaptee, 0)
def last_item_adapter(adaptee):
[docs] """ Adapts ``defaultdict(list)``.__getitem__ accessor to
return the last item from the list.
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = last_item_adapter(d)
Return a last item from the list.
>>> a['a']
3
"""
return ItemAdapter(adaptee, -1)
class ItemAdapter(object):
[docs] """ Adapts ``defaultdict(list)``.__getitem__ accessor to
return item at ``index`` from the list. If ``key`` is not
found return None.
"""
__slots__ = ('adaptee', 'index')
def __init__(self, adaptee, index):
""" ``adaptee`` must be defaultdict(list).
>>> d = defaultdict(list)
>>> a = ItemAdapter(d, 0)
Otherwise raise ``TypeError``.
>>> ItemAdapter(None, 0) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
TypeError: ...
"""
if adaptee is None or not isinstance(adaptee, dict):
raise TypeError('first argument must be defaultdict(list)')
self.adaptee = adaptee
self.index = index
def __getitem__(self, key):
"""
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = ItemAdapter(d, 0)
Return a first item from the list.
>>> a['a']
1
>>> a = ItemAdapter(d, -1)
Return a last item from the list.
>>> a['a']
3
If ``key`` not found return ``None``.
>>> a['x']
"""
l = self.adaptee[key]
if l:
return l[self.index]
return None
def get(self, key, default=None):
[docs] """ Return the value for key if key is in the adaptee,
else default. If default is not given, it defaults
to None, so that this method never raises a KeyError.
>>> d = defaultdict(list)
>>> d['a'].extend([1, 2, 3])
>>> a = ItemAdapter(d, 0)
>>> a.get('a')
1
>>> a.get('b', 100)
100
"""
if key in self.adaptee:
l = self.adaptee[key]
if l:
return l[self.index]
return default
class attrdict(dict):
[docs] """ A dictionary with attribute-style access. Maps attribute
access to dictionary.
>>> d = attrdict(a=1, b=2)
>>> sorted(d.items())
[('a', 1), ('b', 2)]
>>> d.a
1
>>> d.c = 3
>>> d.c
3
>>> d.d # doctest: +ELLIPSIS
Traceback (most recent call last):
...
AttributeError: ...
"""
__slots__ = ()
def __setattr__(self, key, value):
return super(attrdict, self).__setitem__(key, value)
def __getattr__(self, name):
try:
return super(attrdict, self).__getitem__(name)
except KeyError:
raise AttributeError(name)
class defaultattrdict(defaultdict):
[docs] """ A dictionary with attribute-style access. Maps attribute
access to dictionary.
>>> d = defaultattrdict(str, a=1, b=2)
>>> d.a
1
>>> d.c = 3
>>> d.c
3
>>> d.d
''
"""
__slots__ = ()
def __setattr__(self, key, value):
return super(defaultattrdict, self).__setitem__(key, value)
def __getattr__(self, name):
return super(defaultattrdict, self).__getitem__(name)
def distinct(seq):
[docs] """ Returns generator for unique items in ``seq`` with preserved
order.
>>> list(distinct('1234512345'))
['1', '2', '3', '4', '5']
If the order is not important consider using ``set`` which is
approximately eight times faster on large sequences.
>>> sorted(list(set('1234512345')))
['1', '2', '3', '4', '5']
"""
unique = {}
for item in seq:
if item not in unique:
unique[item] = None
yield item
def gzip_iterator(items, compress_level=6):
[docs] """ Iterates over ``items`` and returns generator of gzipped items.
``items`` - a list of bytes
>>> items = [ntob('Hello World', 'latin1')]
>>> result = list(gzip_iterator(items))
>>> assert 3 == len(result)
>>> assert GZIP_HEADER == result[0]
"""
size = 0
crc = 0
gzip = zlib.compressobj(
compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
zlib.DEF_MEM_LEVEL, 0)
yield GZIP_HEADER
for item in items:
size += len(item)
crc = zlib.crc32(item, crc) & MAX_INT
chunk = gzip.compress(item)
if chunk: # pragma: nocover
yield chunk
yield gzip.flush()
yield struct.pack('<2L', crc, size & MAX_INT)
def map_keys(function, dictionary):
[docs] """ Apply `function` to every key of `dictionary` and return
a dictionary of the results.
>>> d = {'1': 1, '2': 2}
>>> sorted_items(map_keys(lambda key: 'k' + key, d))
[('k1', 1), ('k2', 2)]
"""
return dict([(function(key), value) for key, value in dictionary.items()])
def map_values(function, dictionary):
[docs] """ Apply `function` to every value of `dictionary` and return
a dictionary of the results.
>>> d = {'1': 1, '2': 2}
>>> sorted_items(map_values(lambda value: 2 * value, d))
[('1', 2), ('2', 4)]
"""
return dict([(key, function(value)) for key, value in dictionary.items()])
def list_values(keys, dictionary):
[docs] """ Returns `dictionary` values orderd by `keys`.
>>> d = {'1': 1, '2': 2}
>>> list_values(['1', '2', '3'], d)
[1, 2, None]
"""
return [key in dictionary and dictionary[key] or None for key in keys]
def sorted_items(dictionary):
[docs] """ Returns `dictionary` items sorted by key.
>>> d = {'1': 1, '2': 2}
>>> sorted_items(d)
[('1', 1), ('2', 2)]
"""
return sorted(dictionary.items(), key=itemgetter(1))