Package qbuf :: Module test

Source Code for Module qbuf.test

  1  """Unit tests for qbuf. 
  2  """ 
  3  import StringIO 
  4  import unittest 
  5  import random 
  6  import struct 
  7  import qbuf 
  8   
9 -class BufferPair(object):
10 - def __init__(self, case, buf=None, data=None, data_length=10240):
11 self.case = case 12 if buf is None: 13 buf = qbuf.BufferQueue() 14 self.test_buf = buf 15 if data is None: 16 data = ''.join([chr(random.randrange(256)) 17 for _ in xrange(data_length)]) 18 self.in_buf = StringIO.StringIO(data) 19 self.out_buf = StringIO.StringIO(data) 20 self.size_delta = 0
21
22 - def push(self, size):
23 data = self.in_buf.read(size) 24 self.case.assertEquals(len(data), size) 25 self.test_buf.push(data) 26 self.size_delta += size
27
28 - def pop(self, size=None):
29 if size is None: 30 size = self.size_delta 31 self.case.assertEquals( 32 self.out_buf.read(size), self.test_buf.pop(size)) 33 self.size_delta -= size
34
35 - def popline(self):
36 line = self.test_buf.popline() 37 size = len(line) + len(self.test_buf.delimiter) 38 self.case.assertEquals( 39 self.out_buf.read(size), line + self.test_buf.delimiter) 40 self.size_delta -= size
41
42 - def popline_with_delim(self, delimiter):
43 line = self.test_buf.popline(delimiter) 44 if delimiter is None: 45 delimiter = self.test_buf.delimiter 46 size = len(line) + len(delimiter) 47 self.case.assertEquals(self.out_buf.read(size), line + delimiter) 48 self.size_delta -= size
49
50 - def clear(self):
51 self.test_buf.clear() 52 self.out_buf.seek(self.size_delta, 1) 53 self.size_delta = 0
54
55 - def __enter__(self):
56 return self
57
58 - def __exit__(self, *exc_info):
59 self.close()
60
61 - def close(self):
62 self.case.assertEquals(len(self.test_buf), 0)
63
64 -class QbufTest(unittest.TestCase):
65 - def test_growth(self):
66 pair = BufferPair(self) 67 for x in xrange(128): 68 pair.push(x + 1) 69 pair.pop() 70 pair.close()
71
72 - def test_circularity(self):
73 pair = BufferPair(self) 74 pair.push(24) 75 for x in xrange(16): 76 pair.push(24) 77 pair.pop(24) 78 pair.pop(24) 79 pair.close()
80
82 pair = BufferPair(self) 83 pair.push(23) 84 for x in xrange(128): 85 pair.push(23) 86 pair.pop(6) 87 pair.pop(23*129 - 6*128) 88 pair.close()
89
90 - def test_delimiter_search(self):
91 data = ( 92 'extra ' # 6 93 'chaff ' # 6 94 'foo ***' # 7 95 ' bar **' # 7 96 '* baz *' # 7 97 '** bat *' # 8 98 '*' # 1 99 '* blat ' # 7 100 '***' # 3 101 '**' # 2 102 ) 103 pair = BufferPair(self, qbuf.BufferQueue('***'), data) 104 for x in [6, 6, 7, 7, 7, 8, 1, 7, 3, 2]: 105 pair.push(x) 106 for x in xrange(5): 107 pair.popline() 108 pair.pop(2) 109 pair.close()
110
111 - def test_passed_delimiter(self):
112 data = ( 113 'foo*bar&f' # 9 114 'oo*bar*foo&bar' # 14 115 ) 116 pair = BufferPair(self, qbuf.BufferQueue('&'), data) 117 for x in [9, 14]: 118 pair.push(x) 119 pair.popline_with_delim('*') 120 pair.popline_with_delim('&') 121 pair.popline_with_delim(None) 122 pair.pop(3) 123 pair.close()
124
125 - def test_repr(self):
126 buf = qbuf.BufferQueue() 127 self.assertEquals( 128 '<BufferQueue of 0 bytes at %#x>' % id(buf), repr(buf)) 129 buf.push('foobar') 130 self.assertEquals( 131 '<BufferQueue of 6 bytes at %#x>' % id(buf), repr(buf))
132
133 - def test_iter(self):
134 buf = qbuf.BufferQueue('\n') 135 buf.push('foo\nbar\nbaz') 136 self.assertEquals(['foo\n', 'bar\n'], list(buf)) 137 self.assertEquals('baz', buf.pop())
138
139 - def test_exceptions(self):
140 buf = qbuf.BufferQueue() 141 self.assertRaises(ValueError, buf.next) 142 self.assertRaises(ValueError, buf.popline) 143 self.assertRaises(ValueError, buf.popline, '') 144 self.assertRaises(ValueError, buf.poplines) 145 self.assertRaises(ValueError, buf.pop, -1) 146 self.assertRaises(ValueError, buf.pop_atmost, -1) 147 self.assertRaises(qbuf.BufferUnderflow, buf.pop, 1) 148 self.assertRaises(TypeError, buf.push, None) 149 self.assertRaises(TypeError, buf.push_many, None) 150 self.assertRaises(ValueError, buf.push_many, [None]) 151 buf.delimiter = '***' 152 self.assertRaises(ValueError, buf.popline)
153
154 - def test_corners(self):
155 buf = qbuf.BufferQueue() 156 buf.push('') 157 self.assertEquals('', buf.pop(0)) 158 self.assertEquals(None, buf.delimiter) 159 buf.delimiter = 'foo' 160 buf.delimiter = '' 161 self.assertEquals(None, buf.delimiter)
162
163 - def test_pushpop(self):
164 buf = qbuf.BufferQueue() 165 buf.push_many(['foo', 'bar', 'baz']) 166 self.assertEquals('foobarbaz', buf.pop_atmost(10)) 167 buf.delimiter = '\n' 168 buf.push('foo\nbar\nbaz') 169 self.assertEquals(['foo', 'bar'], buf.poplines()) 170 self.assertEquals('baz', buf.pop())
171
172 - def test_clear(self):
173 pair = BufferPair(self) 174 for x in xrange(24): 175 pair.push(8) 176 pair.clear() 177 self.assertRaises(qbuf.BufferUnderflow, pair.test_buf.pop, 1) 178 for x in xrange(12): 179 pair.push(8) 180 pair.pop(8*6) 181 pair.clear() 182 self.assertRaises(qbuf.BufferUnderflow, pair.test_buf.pop, 1) 183 pair.close()
184
185 - def test_pop_view(self):
186 buf = qbuf.BufferQueue() 187 buf.push_many(['foo', 'bar']) 188 for part in ['fo', 'ob', 'ar']: 189 b = buf.pop_view(2) 190 self.assert_(isinstance(b, buffer)) 191 self.assertEquals(part, b[:]) 192 self.assertRaises(qbuf.BufferUnderflow, buf.pop_view, 2) 193 b = buf.pop_view(0) 194 self.assert_(isinstance(b, buffer)) 195 self.assertEquals('', b[:]) 196 buf.push_many(['foo', 'bar', 'baz']) 197 b = buf.pop_view() 198 self.assert_(isinstance(b, buffer)) 199 self.assertEquals('foobarbaz', b[:])
200
201 - def test_pop_struct(self):
202 buf = qbuf.BufferQueue() 203 buf.push('\x01\x02\x03\x04\x05\x06') 204 a, b = buf.pop_struct('!BH') 205 self.assertEquals(a, 0x1) 206 self.assertEquals(b, 0x203) 207 a, = buf.pop_struct('!H') 208 self.assertEquals(a, 0x405) 209 self.assertRaises(qbuf.BufferUnderflow, buf.pop_struct, '!H') 210 self.assertRaises(struct.error, buf.pop_struct, '_bad_struct_format')
211