Package qbuf :: Module twisted_support

Source Code for Module qbuf.twisted_support

  1  """Twisted support for qbuf. 
  2   
  3  MODE_RAW, MODE_DELIMITED, and MODE_STATEFUL are module-level constants for 
  4  changing the buffering mode of MultiBufferer. 
  5  """ 
  6   
  7  #from __future__ import absolute_import 
  8  from qbuf import BufferQueue, BufferUnderflow 
  9  from twisted.internet import protocol, defer 
 10  import collections 
 11  import struct 
 12   
 13  MODE_RAW, MODE_DELIMITED, MODE_STATEFUL = xrange(3) 
 14   
class MultiBufferer(protocol.Protocol):
    """A replacement for a couple of buffering classes provided by twisted.

    Without subclassing, it can work the same way as LineReceiver and
    StatefulProtocol.

    When the buffering mode is MODE_RAW, rawDataReceived is called with all of
    the data that comes in.

    When the buffering mode is MODE_DELIMITED, lineReceived is called with
    each complete line without the delimiter on the end. The 'delimiter'
    instance attribute is used for keeping track of the current delimiter.

    When the buffering mode is MODE_STATEFUL, a user-passed function is
    called for every so many bytes received. If self.current_state is None,
    getInitialState is called to get the initial state. See the documentation
    on getInitialState.

    MultiBufferers can also return Deferreds (see read, unpack and readline)
    that are fired when the requested data has been received. This is
    intended for use with twisted.internet.defer.inlineCallbacks. While any
    such Deferred is pending, incoming data is handed to the oldest pending
    Deferred instead of to the *Received hooks.
    """
    # Mode used when no Deferred is pending.
    mode = MODE_RAW
    # Delimiter the underlying BufferQueue is created with.
    initial_delimiter = '\r\n'
    # (callable, n_bytes) pair used in MODE_STATEFUL, or None if not yet set.
    current_state = None
    # Set by close(); once True, incoming data is discarded.
    _closed = False

    def __init__(self):
        self._buffer = BufferQueue(self.initial_delimiter)
        # FIFO of (deferred, mode, extra) read requests awaiting data.
        self._callbacks = collections.deque()

    def read(self, size=None):
        """Wait for some data to be received.

        If 'size' is provided, wait for that many bytes to be received.
        Otherwise, wait for the next chunk of incoming data, regardless of the
        size. Returns a Deferred that will be fired with the received data.
        """
        d = defer.Deferred()
        if size is None:
            self._callbacks.append((d, MODE_RAW, None))
        else:
            self._callbacks.append((d, MODE_STATEFUL, size))
        return d

    def unpack(self, fmt):
        """Wait for some struct to be received.

        This method takes a struct format as understood by the 'struct' module
        and waits for enough data to be received to unpack it. Returns a
        Deferred that will be fired with the unpacked struct (the tuple
        returned by struct.unpack).
        """
        def _transform(data):
            return struct.unpack(fmt, data)

        d = self.read(struct.calcsize(fmt))
        d.addCallback(_transform)
        return d

    def readline(self, delimiter=None):
        """Wait for a line to be received.

        Wait for a line of data to be received. If 'delimiter' is provided, the
        underlying BufferQueue's delimiter will be set to whatever is provided
        when the MultiBufferer is ready to receive that line. Returns a
        Deferred that will be fired with the received line, without delimiter.
        """
        d = defer.Deferred()
        self._callbacks.append((d, MODE_DELIMITED, delimiter))
        return d

    def write(self, data):
        """Send some data over the wire.

        This method merely forwards the data to the underlying transport,
        provided to parallel the read/readline methods.
        """
        self.transport.write(data)

    def _updateCallbacks(self, data):
        """Fire the oldest pending Deferred with 'data' and dequeue it."""
        d, _, _ = self._callbacks.popleft()
        d.callback(data)

    def dataReceived(self, data):
        """Buffer 'data' and dispatch as much of the buffer as possible.

        Each pass through the loop serves the oldest pending Deferred if there
        is one, using the mode that Deferred was queued with; otherwise the
        data is dispatched according to self.mode. The loop stops when the
        buffer is exhausted, when the buffer does not yet hold enough data
        for the current request, or when close() has been called.

        Raises ValueError if the mode in effect is not one of MODE_RAW,
        MODE_DELIMITED or MODE_STATEFUL.
        """
        if self._closed:
            return

        self._buffer.push(data)
        # NOTE(review): the loop only runs while the buffer is truthy, so a
        # request for zero bytes is presumably not served until more data
        # arrives -- confirm against BufferQueue's truthiness semantics.
        while self._buffer and not self._closed:
            if self._callbacks:
                _, mode, extra = self._callbacks[0]
            else:
                mode, extra = self.mode, None
            if mode == MODE_RAW:
                self._rawDataReceived(self._buffer.pop())
            elif mode == MODE_DELIMITED:
                try:
                    line = self._buffer.popline(extra)
                except ValueError:
                    # Treated as "no complete line buffered yet": wait for
                    # more data.
                    break
                else:
                    self._lineReceived(line)
            elif mode == MODE_STATEFUL:
                if self._callbacks:
                    try:
                        chunk = self._buffer.pop(extra)
                    except BufferUnderflow:
                        # Not enough bytes yet for the pending read(size).
                        break
                    else:
                        self._updateCallbacks(chunk)
                else:
                    if self.current_state is None:
                        self.current_state = self.getInitialState()
                    try:
                        chunk = self._buffer.pop(self.current_state[1])
                    except BufferUnderflow:
                        break
                    else:
                        result = self.current_state[0](chunk)
                        # A falsy result (e.g. None) keeps the current state.
                        if result:
                            self.current_state = result
            else:
                # Without this branch nothing would be consumed and the loop
                # would spin forever on a non-empty buffer.
                raise ValueError('unknown buffering mode: %r' % (mode,))

    def setMode(self, mode, extra='', flush=False, state=None, delimiter=None):
        """Change the buffering mode.

        If 'extra' is provided, add that to the buffer. If 'flush' is True and
        'extra' is provided, also flush the buffer as much as possible. If
        'state' is not None, that value will be assigned to self.current_state
        before any data is buffered. If 'delimiter' is not None, the delimiter
        will likewise be set before any data is buffered.
        """
        self.mode = mode
        if state is not None:
            self.current_state = state
        if delimiter is not None:
            self.delimiter = delimiter
        if extra and flush:
            self.dataReceived(extra)
        elif extra:
            self._buffer.push(extra)

    def _get_delimiter(self):
        return self._buffer.delimiter

    def _set_delimiter(self, delimiter):
        self._buffer.delimiter = delimiter

    # The current line delimiter lives on the underlying BufferQueue.
    delimiter = property(_get_delimiter, _set_delimiter)

    def _rawDataReceived(self, data):
        """Route raw data to a pending Deferred, else to rawDataReceived."""
        if self._callbacks:
            self._updateCallbacks(data)
        else:
            self.rawDataReceived(data)

    def rawDataReceived(self, data):
        """Called when the buffering mode is MODE_RAW and there is new data
        available.
        """
        raise NotImplementedError

    def _lineReceived(self, line):
        """Route a line to a pending Deferred, else to lineReceived."""
        if self._callbacks:
            self._updateCallbacks(line)
        else:
            self.lineReceived(line)

    def lineReceived(self, line):
        """Called when the buffering mode is MODE_DELIMITED and there is a new
        line available.
        """
        raise NotImplementedError

    def getInitialState(self):
        """Called when there is no current state for MODE_STATEFUL.

        This function must return a tuple of (some_callable, bytes_to_read),
        where the callable will be called when that number of bytes has been
        read, with those same bytes as the only argument. That callable can
        return None to keep the same (callable, n_bytes) state, or return a
        new (callable, n_bytes) tuple.
        """
        raise NotImplementedError

    def close(self, disconnect=True):
        """Stop buffering incoming data.

        This will clear the input buffer and stop adding new data to the
        buffer. If 'disconnect' is True, this will also lose the connection on
        the transport.
        """
        self._buffer.clear()
        self._closed = True
        if disconnect:
            self.transport.loseConnection()

    def connectionLost(self, reason):
        """Stop buffering and fail every pending Deferred with 'reason'."""
        self.close(False)
        # Take a snapshot and empty the queue before firing anything: an
        # errback may call read()/readline() again, and appending to a deque
        # while iterating over it raises RuntimeError.
        pending = list(self._callbacks)
        self._callbacks.clear()
        for d, _, _ in pending:
            d.errback(reason)
214
class IntNStringReceiver(MultiBufferer):
    """Length-prefixed string receiver built on MultiBufferer.

    Intended as a counterpart of the IntNStringReceiver provided by Twisted,
    and as a demonstration of how MODE_STATEFUL works: the protocol
    alternates between reading a fixed-size length prefix and reading a
    string of that length.

    The methods here read two attributes that this class does not define:
    'structFormat' (a struct format for the length prefix) and
    'prefixLength' (the number of bytes to read for that prefix).
    """
    mode = MODE_STATEFUL

    def getInitialState(self):
        """Start out waiting for a length prefix."""
        initial = (self.receiveLength, self.prefixLength)
        return initial

    def receiveLength(self, data):
        """Decode the length prefix, then wait for that many bytes."""
        (payload_size,) = struct.unpack(self.structFormat, data)
        following = (self.receiveString, payload_size)
        return following

    def receiveString(self, string):
        """Deliver a complete string, then wait for the next prefix."""
        self.stringReceived(string)
        return (self.receiveLength, self.prefixLength)

    def sendString(self, data):
        """Write 'data' to the transport, preceded by its packed length."""
        send = self.transport.write
        frame = struct.pack(self.structFormat, len(data)) + data
        send(frame)

    def stringReceived(self, string):
        """Called with each complete string received; must be overridden."""
        raise NotImplementedError
238
class Int32StringReceiver(IntNStringReceiver):
    """IntNStringReceiver with a 32-bit length prefix.

    A concrete implementation of the abstract IntNStringReceiver class,
    only provided as a demonstration of MODE_STATEFUL.
    """
    # '!I': one unsigned int in network (big-endian) byte order.
    structFormat = '!I'
    # Number of bytes occupied by one packed length prefix.
    prefixLength = struct.calcsize(structFormat)
245