Source code for intessa.conneg.streaming
class StreamingBody(object):
"""
A simple class to implement streaming request bodies with lengths.
[docs]
>>> from cStringIO import StringIO
>>> sb = StreamingBody(StringIO("ABCD"), 4)
>>> sb # doctest: +ELLIPSIS
<StreamingBody(<cStringIO.StringI object at 0x...>, 4)>
>>> len(sb)
4
Features automatic length-checking on files:
>>> sb = StreamingBody(open('file.txt')) # doctest: +SKIP
>>> sb # doctest: +SKIP
<StreamingBody(<open file 'file.txt', mode 'r' at 0x...>, 111)>
"""
def __init__(self, body, length=None):
self.body = body
if length is None and hasattr(self.body, 'fileno'):
try:
body_stat = os.fstat(self.body.fileno())
if stat.S_ISREG(body_stat.st_mode):
length = body_stat.st_size
except OSError:
length = None
if length is None:
raise TypeError("Needs a `length` argument (cannot stat: %r)" %
self.body)
self.length = length
def __repr__(self):
return '<StreamingBody(%r, %d)>' % (self.body, self.length)
def __len__(self):
return self.length
def __getattr__(self, attr):
return getattr(self.body, attr)