Package qbuf :: Module socket_support

Source Code for Module qbuf.socket_support

  1  """socket module support for qbuf. 
  2  """ 
  3   
  4  #from __future__ import absolute_import 
  5  from qbuf import BufferQueue, BufferUnderflow 
  6  import socket, errno 
  7   
8 -class SocketClosedError(Exception):
9 pass
10
11 -class _SocketWrapper(object):
12 - def __init__(self, sock, buffer_size=4096):
13 self.buffer = BufferQueue() 14 self.sock = sock 15 self.buffer_size = buffer_size
16
17 - def fileno(self):
18 """Returns the wrapped socket's fileno. 19 """ 20 return self.sock.fileno()
21
22 - def send(self, data):
23 """Calls send on the wrapped socket. 24 """ 25 return self.sock.send(data)
26
27 - def sendall(self, data):
28 """Calls sendall on the wrapped socket. 29 """ 30 self.sock.sendall(data)
31
32 - def close(self):
33 """Closes the wrapped socket. 34 """ 35 self.closed = True 36 self.sock.close()
37
38 - def pump_buffer(self):
39 """Try to read from the wrapped socket into the buffer. 40 41 Returns True if the socket is still open, and False otherwise. 42 """ 43 try: 44 data = self.sock.recv(self.buffer_size) 45 except socket.error, e: 46 if e[0] == errno.EGAIN: 47 return True 48 else: 49 raise 50 if not data: 51 self.closed = True 52 return False 53 self.buffer.push(data) 54 return True
55
56 -class LineReceiver(_SocketWrapper):
57 """A socket wrapper for line buffering. 58 59 Iterating over a LineBuffer will yield each line available in the buffer. 60 If the socket is closed, iterating over a LineBuffer will raise a 61 SocketClosedError. 62 """ 63
64 - def __init__(self, sock, buffer_size=4096, 65 delimiter="\r\n", auto_pump=True):
66 """Wrap a socket for easier line buffering of incoming data. 67 68 The buffer_size parameter indicates how much should be read from the 69 socket at a time. If the auto_pump parameter is True, iterating over 70 the LineReceiver and calling readline will automatically try to read 71 new socket data into the buffer first. 72 """ 73 super(LineReceiver, self).__init__(sock, buffer_size) 74 self.buffer.delimiter = delimiter 75 self.auto_pump = auto_pump
76
77 - def __iter__(self):
78 if self.auto_pump: 79 if not self.pump_buffer(): 80 raise SocketClosedError 81 return self.buffer
82
83 - def readline(self):
84 """Read a line out of the buffer. 85 86 If the socket is closed, this function returns an empty string. If no 87 line is available, socket.error is raised with an errno of EAGAIN. 88 Otherwise, return the next line available in the buffer. 89 """ 90 if self.auto_pump: 91 if not self.pump_buffer(): 92 return '' 93 try: 94 return self.buffer.popline() 95 except ValueError: 96 raise socket.error(errno.EAGAIN, 'no full line available')
97
98 -class StatefulProtocol(_SocketWrapper):
99 """A socket wrapper similar to Twisted's StatefulProtocol. 100 """
101 - def __init__(self, sock, buffer_size=4096):
102 """Wrap a socket for stateful reads. 103 104 The buffer_size parameter indicates how much should be read from the 105 socket at a time. 106 """ 107 super(StatefulProtocol, self).__init__(sock, buffer_size) 108 self.next_func, self.waiting_on = self.get_initial_state()
109
110 - def update(self):
111 """Pump the buffer and try to make as many callbacks as possible. 112 113 If the socket is closed, a SocketClosedError is raised. 114 """ 115 if not self.pump_buffer(): 116 raise SocketClosedError 117 while self.buffer: 118 try: 119 data = self.buffer.pop(self.waiting_on) 120 except BufferUnderflow: 121 break 122 else: 123 result = self.next_func(data) 124 if result: 125 self.next_func, self.waiting_on = result
126
127 - def get_initial_state(self):
128 """Get the initial state for the StatefulProtocol. 129 130 This function must return a tuple of (some_callable, bytes_to_read), 131 where the callable will be called when that number of bytes has been 132 read, with those same bytes as the only argument. That callable can 133 return None to keep the same (callable, n_bytes) state, or return a 134 new (callable, n_bytes) tuple. 135 """ 136 pass
137