1 """Unit tests for qbuf.
2 """
3 import StringIO
4 import unittest
5 import random
6 import struct
7 import qbuf
8
def __init__(self, case, buf=None, data=None, data_length=10240):
    """Pair a buffer under test with two reference streams over one payload.

    case -- object whose assert methods are used to report mismatches.
    buf -- buffer under test; a new qbuf.BufferQueue() when omitted.
    data -- payload string; when omitted, data_length random characters
        (each chr() of a value in 0..255) are generated.
    data_length -- size of the generated payload; ignored if data is given.
    """
    self.case = case
    self.test_buf = qbuf.BufferQueue() if buf is None else buf
    if data is None:
        random_chars = []
        for _ in xrange(data_length):
            random_chars.append(chr(random.randrange(256)))
        data = ''.join(random_chars)
    # Two independent cursors over the same payload: in_buf supplies what
    # gets pushed, out_buf supplies what is expected to come back out.
    self.in_buf = StringIO.StringIO(data)
    self.out_buf = StringIO.StringIO(data)
    # Bytes pushed but not yet consumed.
    self.size_delta = 0
21
def push(self, size):
    """Feed the next `size` characters of the input stream to the buffer.

    Reads `size` characters from self.in_buf, asserts through self.case
    that the read was not short, pushes the chunk onto self.test_buf and
    adds `size` to self.size_delta.
    """
    data = self.in_buf.read(size)
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and no longer exists on recent unittest.TestCase versions.
    self.case.assertEqual(len(data), size)
    self.test_buf.push(data)
    self.size_delta += size
27
def pop(self, size=None):
    """Pop `size` characters from the buffer and check them.

    When `size` is None, self.size_delta (everything pushed but not yet
    consumed) is used instead. The popped data is compared, through
    self.case, with the same number of characters read from self.out_buf;
    self.size_delta is then reduced by `size`.
    """
    if size is None:
        size = self.size_delta
    # The reference stream is read first, then the buffer is popped.
    expected = self.out_buf.read(size)
    actual = self.test_buf.pop(size)
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and no longer exists on recent unittest.TestCase versions.
    self.case.assertEqual(expected, actual)
    self.size_delta -= size
34
36 line = self.test_buf.popline()
37 size = len(line) + len(self.test_buf.delimiter)
38 self.case.assertEquals(
39 self.out_buf.read(size), line + self.test_buf.delimiter)
40 self.size_delta -= size
41
49
51 self.test_buf.clear()
52 self.out_buf.seek(self.size_delta, 1)
53 self.size_delta = 0
54
57
60
62 self.case.assertEquals(len(self.test_buf), 0)
63
71
80
89
91 data = (
92 'extra '
93 'chaff '
94 'foo ***'
95 ' bar **'
96 '* baz *'
97 '** bat *'
98 '*'
99 '* blat '
100 '***'
101 '**'
102 )
103 pair = BufferPair(self, qbuf.BufferQueue('***'), data)
104 for x in [6, 6, 7, 7, 7, 8, 1, 7, 3, 2]:
105 pair.push(x)
106 for x in xrange(5):
107 pair.popline()
108 pair.pop(2)
109 pair.close()
110
124
126 buf = qbuf.BufferQueue()
127 self.assertEquals(
128 '<BufferQueue of 0 bytes at %#x>' % id(buf), repr(buf))
129 buf.push('foobar')
130 self.assertEquals(
131 '<BufferQueue of 6 bytes at %#x>' % id(buf), repr(buf))
132
134 buf = qbuf.BufferQueue('\n')
135 buf.push('foo\nbar\nbaz')
136 self.assertEquals(['foo\n', 'bar\n'], list(buf))
137 self.assertEquals('baz', buf.pop())
138
140 buf = qbuf.BufferQueue()
141 self.assertRaises(ValueError, buf.next)
142 self.assertRaises(ValueError, buf.popline)
143 self.assertRaises(ValueError, buf.popline, '')
144 self.assertRaises(ValueError, buf.poplines)
145 self.assertRaises(ValueError, buf.pop, -1)
146 self.assertRaises(ValueError, buf.pop_atmost, -1)
147 self.assertRaises(qbuf.BufferUnderflow, buf.pop, 1)
148 self.assertRaises(TypeError, buf.push, None)
149 self.assertRaises(TypeError, buf.push_many, None)
150 self.assertRaises(ValueError, buf.push_many, [None])
151 buf.delimiter = '***'
152 self.assertRaises(ValueError, buf.popline)
153
162
171
184
186 buf = qbuf.BufferQueue()
187 buf.push_many(['foo', 'bar'])
188 for part in ['fo', 'ob', 'ar']:
189 b = buf.pop_view(2)
190 self.assert_(isinstance(b, buffer))
191 self.assertEquals(part, b[:])
192 self.assertRaises(qbuf.BufferUnderflow, buf.pop_view, 2)
193 b = buf.pop_view(0)
194 self.assert_(isinstance(b, buffer))
195 self.assertEquals('', b[:])
196 buf.push_many(['foo', 'bar', 'baz'])
197 b = buf.pop_view()
198 self.assert_(isinstance(b, buffer))
199 self.assertEquals('foobarbaz', b[:])
200
211