1 """Twisted support for qbuf.
2
3 MODE_RAW, MODE_DELIMITED, and MODE_STATEFUL are module-level constants for
4 changing the buffering mode of MultiBufferer.
5 """
6
7
8 from qbuf import BufferQueue, BufferUnderflow
9 from twisted.internet import protocol, defer
10 import collections
11 import struct
12
# Buffering modes for MultiBufferer (see the module docstring).  The values
# are the consecutive integers 0, 1 and 2.  range() is used instead of
# xrange() so that the module also imports on Python 3, where xrange no
# longer exists; tuple unpacking works the same with either.
MODE_RAW, MODE_DELIMITED, MODE_STATEFUL = range(3)
14
16 """A replacement for a couple of buffering classes provided by twisted.
17
18 Without subclassing, it can work the same way as LineReceiver and
19 StatefulProtocol.
20
21 When the buffering mode is MODE_RAW, rawDataReceived is called with all of
22 the data that comes in.
23
24 When the buffering mode is MODE_DELIMITED, lineReceived is called with
25 each complete line without the delimiter on the end. The 'delimiter'
26 instance attribute is used for keeping track of the current delimiter.
27
28 When the buffering mode is MODE_STATEFUL, a user-passed function is
29 called for every so many bytes received. If self.current_state is None,
30 getInitialState is called to get the initial state. See the documentation
31 on getInitialState.
32
33 MultiBufferers can also return Deferreds that are fired when a certain
34 amount of data has been sent over the wire. This is intended for use with
35 twisted.internet.defer.inlineCallbacks.
36 """
37 mode = MODE_RAW
38 initial_delimiter = '\r\n'
39 current_state = None
40 _closed = False
41
45
def read(self, size=None):
    """Queue a request for incoming data and return its Deferred.

    When 'size' is None the request is for the next chunk of incoming
    data, whatever its length.  Otherwise the request is for exactly
    'size' bytes.  The returned Deferred is meant to be fired with the
    received data once the request has been satisfied.
    """
    pending = defer.Deferred()
    # Each queued request is a (deferred, mode, argument) triple: raw reads
    # carry no argument, sized reads carry the byte count.
    if size is None:
        request = (pending, MODE_RAW, None)
    else:
        request = (pending, MODE_STATEFUL, size)
    self._callbacks.append(request)
    return pending
59
61 """Wait for some struct to be received.
62
63 This method takes a struct format as understood by the 'struct' module
64 and waits for enough data to be received to unpack it. Returns a
65 Deferred that will be fired with the unpacked struct.
66 """
67 def _transform(data):
68 return struct.unpack(fmt, data)
69 d = self.read(struct.calcsize(fmt))
70 d.addCallback(_transform)
71 return d
72
74 """Wait for a line to be received.
75
76 Wait for a line of data to be received. If 'delimiter' is provided, the
77 underlying BufferQueue's delimiter will be set to whatever is provided
78 when the MultiBufferer is ready to receive that line. Returns a
79 Deferred that will be fired with the received line, without delimiter.
80 """
81 d = defer.Deferred()
82 self._callbacks.append((d, MODE_DELIMITED, delimiter))
83 return d
84
86 """Send some data over the wire.
87
88 This method merely forwards the data to the underlying transport,
89 provided to parallel the read/readline methods.
90 """
91 self.transport.write(data)
92
94 d, _, _ = self._callbacks.popleft()
95 d.callback(data)
96
135
def setMode(self, mode, extra='', flush=False, state=None, delimiter=None):
    """Switch the buffering mode, optionally supplying more input.

    'mode' is stored as the new buffering mode.  A 'state' other than None
    replaces self.current_state, and a 'delimiter' other than None replaces
    the current delimiter; both happen before 'extra' is looked at.

    A non-empty 'extra' is passed to self.dataReceived when 'flush' is
    true, and is otherwise only pushed onto the buffer.  An empty 'extra'
    is ignored regardless of 'flush'.
    """
    self.mode = mode
    if state is not None:
        self.current_state = state
    if delimiter is not None:
        self.delimiter = delimiter

    # Nothing further to do unless the caller handed over extra data.
    if not extra:
        return
    if flush:
        self.dataReceived(extra)
    else:
        self._buffer.push(extra)
154
157
160
161 delimiter = property(_get_delimiter, _set_delimiter)
162
164 if self._callbacks:
165 self._updateCallbacks(data)
166 else:
167 self.rawDataReceived(data)
168
170 """Called when the buffering mode is MODE_RAW and there is new data
171 available.
172 """
173 raise NotImplementedError
174
176 if self._callbacks:
177 self._updateCallbacks(line)
178 else:
179 self.lineReceived(line)
180
182 """Called when the buffering mode is MODE_DELIMITED and there is a new
183 line available.
184 """
185 raise NotImplementedError
186
188 """Called when there is no current state for MODE_STATEFUL.
189
190 This function must return a tuple of (some_callable, bytes_to_read),
191 where the callable will be called when that number of bytes has been
192 read, with those same bytes as the only argument. That callable can
193 return None to keep the same (callable, n_bytes) state, or return a
194 new (callable, n_bytes) tuple.
195 """
196 raise NotImplementedError
197
def close(self, disconnect=True):
    """Discard buffered input and mark this object as closed.

    The input buffer is cleared and the closed flag is set.  Unless
    'disconnect' is false, the transport is then asked to lose its
    connection as well.
    """
    self._buffer.clear()
    self._closed = True
    if not disconnect:
        return
    self.transport.loseConnection()
209
211 self.close(False)
212 for d, _, _ in self._callbacks:
213 d.errback(reason)
214
216 """This class is identical to the IntNStringReceiver provided by Twisted,
217 implemented using MultiBufferer as a demonstration of how MODE_STATEFUL
218 works.
219 """
220 mode = MODE_STATEFUL
221
224
228
232
235
237 raise NotImplementedError
238
240 """This class is an implementation of the abstract IntNStringReceiver
241 class, only provided as a demonstration of MODE_STATEFUL.
242 """
243 structFormat = '!I'
244 prefixLength = struct.calcsize(structFormat)
245