1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 ClusterShell inter-nodes communication module
25
26 This module contains the required material for nodes to communicate with each
27 other within the propagation tree. At the highest level, messages are instances
28 of several classes. They can be converted into XML to be sent over SSH links
29 through a Channel instance.
30
31 On the other side, XML is parsed and new message objects are instantiated.
32
33 Communication channels have been implemented as ClusterShell event handlers.
34 Whenever a message chunk is read, the data is given to a SAX XML parser, which
35 uses it to create the corresponding message instances, acting as a message factory.
36
37 As soon as an instance is ready, it is passed to a recv() method in the
38 channel. The recv() method of the Channel class is a stub that must be
39 implemented in a subclass to process incoming messages. The same applies to
40 the start() method.
41
42 Subclassing the Channel class allows implementing whatever logic you want on the
43 top of a communication channel.
44 """
45
46 import cPickle
47 import base64
48 import logging
49 import os
50 import xml.sax
51
52 from xml.sax.handler import ContentHandler
53 from xml.sax.saxutils import XMLGenerator
54 from xml.sax import SAXParseException
55
56 from collections import deque
57 from cStringIO import StringIO
58
59 from ClusterShell import __version__
60 from ClusterShell.Event import EventHandler
61
62
63
# Character encoding of the XML stream: handed to every XMLGenerator created
# in this module and used to decode character data received by the SAX handler.
ENCODING = 'utf-8'

# Default maximum length of each base64 line in an encoded message payload.
# The CLUSTERSHELL_GW_B64_LINE_LENGTH environment variable overrides it.
DEFAULT_B64_LINE_LENGTH = 65536
68
69
71 """base exception raised when an error occurs while processing incoming or
72 outgoing messages.
73 """
74
75
77 """SAX handler for XML -> Messages instances conversion"""
79 """XMLReader initializer"""
80 ContentHandler.__init__(self)
81 self.msg_queue = deque()
82 self.version = None
83
84 self._draft = None
85 self._sections_map = None
86
88 """read a starting xml tag"""
89 if name == 'channel':
90 self.version = attrs.get('version')
91 self.msg_queue.appendleft(StartMessage())
92 elif name == 'message':
93 self._draft_new(attrs)
94 else:
95 raise MessageProcessingError('Invalid starting tag %s' % name)
96
98 """read an ending xml tag"""
99
100 if name == 'message':
101 self.msg_queue.appendleft(self._draft)
102 self._draft = None
103 elif name == 'channel':
104 self.msg_queue.appendleft(EndMessage())
105
107 """read content characters"""
108 if self._draft is not None:
109 content = content.decode(ENCODING)
110 self._draft.data_update(content)
111
113 """return whether a message is available for delivery or not"""
114 return len(self.msg_queue) > 0
115
117 """pop and return the oldest message queued"""
118 if self.msg_available():
119 return self.msg_queue.pop()
120
144
145
147 """Use this event handler to establish a communication channel between two
148 hosts within the propagation tree.
149
150 The endpoint's logic has to be implemented by subclassing the Channel class
151 and overriding the start() and recv() methods.
152
153 There is no default behavior for these methods apart from raising a
154 NotImplementedError.
155
156 Usage:
157 >> chan = MyChannel() # inherits Channel
158 >> task = task_self()
159 >> task.shell("uname -a", node="host2", handler=chan)
160 >> task.resume()
161 """
162
163
164 SNAME_WRITER = 'ch-writer'
165 SNAME_READER = 'ch-reader'
166 SNAME_ERROR = 'ch-error'
167
def __init__(self, error_response=False):
    """Initialize the channel state and its incremental XML parser.

    Arguments:
        error_response: stored as self.error_response; when true, parse
            and message processing errors met while reading incoming data
            are answered with an ErrorMessage.
    """
    EventHandler.__init__(self)

    # No worker yet: it is assigned once the connection is established.
    self.worker = None

    # Channel state flags; both are reset to False when the channel closes.
    self.opened = False
    self.setup = False

    self.error_response = error_response

    # Incoming data is fed to an incremental SAX parser whose content
    # handler (XMLReader) queues the message objects it builds.
    self._xml_reader = XMLReader()
    self._parser = xml.sax.make_parser(["IncrementalParser"])
    self._parser.setContentHandler(self._xml_reader)

    self.logger = logging.getLogger(__name__)
186
188 """start xml document for communication"""
189 XMLGenerator(self.worker, encoding=ENCODING).startDocument()
190
192 """open a new communication channel from src to dst"""
193 xmlgen = XMLGenerator(self.worker, encoding=ENCODING)
194 xmlgen.startElement('channel', {'version': __version__})
195
197 """close an already opened channel"""
198 send_endtag = self.opened
199
200 self.opened = self.setup = False
201 if send_endtag:
202 XMLGenerator(self.worker, encoding=ENCODING).endElement('channel')
203 self.worker.abort()
204
206 """connection established. Open higher level channel"""
207 self.worker = worker
208 self.start()
209
211 """channel has data to read"""
212 raw = worker.current_msg
213 try:
214 self._parser.feed(raw + '\n')
215 except SAXParseException, ex:
216 self.logger.error("SAXParseException: %s: %s", ex.getMessage(), raw)
217
218 if self.error_response:
219 self.send(ErrorMessage('Parse error: %s' % ex.getMessage()))
220 self._close()
221 return
222 except MessageProcessingError, ex:
223 self.logger.error("MessageProcessingError: %s", ex)
224 if self.error_response:
225 self.send(ErrorMessage(str(ex)))
226 self._close()
227 return
228
229
230 while self._xml_reader.msg_available():
231 msg = self._xml_reader.pop_msg()
232 assert msg is not None
233 self.recv(msg)
234
def send(self, msg):
    """Write an outgoing message to the worker as its XML representation.

    The message is serialized through its xml() method, terminated with a
    newline and written to the worker stream named by SNAME_WRITER.

    Arguments:
        msg: object exposing an xml() method that returns the serialized
            message.
    """
    serialized = msg.xml() + '\n'
    self.worker.write(serialized, sname=self.SNAME_WRITER)
240
242 """initialization logic"""
243 raise NotImplementedError('Abstract method: subclasses must implement')
244
def recv(self, msg):
    """Callback: process an incoming message.

    Abstract method: this base implementation only raises
    NotImplementedError, so subclasses have to override it.

    Arguments:
        msg: the incoming message object.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError('Abstract method: subclasses must implement')
248
249
251 """base message class"""
252 _inst_counter = 0
253 ident = 'GEN'
254 has_payload = False
255
264
266 """serialize an instance and store the result"""
267
268
269
270 encoded = base64.b64encode(cPickle.dumps(inst))
271
272
273
274
275
276 line_length = int(os.environ.get('CLUSTERSHELL_GW_B64_LINE_LENGTH',
277 DEFAULT_B64_LINE_LENGTH))
278 self.data = '\n'.join(encoded[pos:pos+line_length]
279 for pos in xrange(0, len(encoded), line_length))
280
282 """deserialize a previously encoded instance and return it"""
283
284 try:
285 return cPickle.loads(base64.b64decode(self.data))
286 except (EOFError, TypeError):
287
288 raise MessageProcessingError('Message %s has an invalid payload'
289 % self.ident)
290
292 """append data to the instance (used for deserialization)"""
293 if self.has_payload:
294 if self.data is None:
295 self.data = raw
296 else:
297 self.data += raw
298 else:
299
300 raise MessageProcessingError('Got unexpected payload for Message %s'
301 % self.ident)
302
304 """self construction from a table of attributes"""
305 for k, fmt in self.attr.iteritems():
306 try:
307 setattr(self, k, fmt(attributes[k]))
308 except KeyError:
309 raise MessageProcessingError(
310 'Invalid "message" attributes: missing key "%s"' % k)
311
313 """printable representation"""
314 elts = ['%s: %s' % (k, str(self.__dict__[k])) for k in self.attr.keys()]
315 attributes = ', '.join(elts)
316 return "Message %s (%s)" % (self.type, attributes)
317
319 """generate XML version of a configuration message"""
320 out = StringIO()
321 generator = XMLGenerator(out, encoding=ENCODING)
322
323
324 state = {}
325 for k in self.attr:
326 state[k] = str(getattr(self, k))
327
328 generator.startElement('message', state)
329 if self.data:
330 generator.characters(self.data)
331 generator.endElement('message')
332 xml_msg = out.getvalue()
333 out.close()
334 return xml_msg
335
337 """configuration propagation container"""
338 ident = 'CFG'
339 has_payload = True
340
342 """initialize with gateway node name"""
343 Message.__init__(self)
344 self.attr.update({'gateway': str})
345 self.gateway = gateway
346
348 """abstract class for routed message (with worker source id)"""
353
366
368 """acknowledgement message"""
369 ident = 'ACK'
370
377
379 """error message"""
380 ident = 'ERR'
381
388
390 """container message for standard output"""
391 ident = 'OUT'
392 has_payload = True
393
def __init__(self, nodes='', output=None, srcid=0):
    """Create a standard output container message.

    The payload is either left empty (to be loaded later, already
    encoded) or built here by encoding the provided output.

    Arguments:
        nodes: value stored as the 'nodes' attribute, registered as a str
            in the attribute table.
        output: optional object to encode as the payload; when None, the
            payload stays unset (self.data is None).
        srcid: source id handed to RoutedMessageBase.__init__.
    """
    RoutedMessageBase.__init__(self, srcid)
    self.attr.update({'nodes': str})
    self.nodes = nodes
    # Start without payload; only encode when output was actually given.
    self.data = None
    if output is not None:
        self.data_encode(output)
405
407 """container message for stderr output"""
408 ident = 'SER'
409
411 """container message for return code"""
412 ident = 'RET'
413
414 - def __init__(self, nodes='', retcode=0, srcid=0):
421
423 """container message for timeout notification"""
424 ident = 'TIM'
425
432
434 """message indicating the start of a channel communication"""
435 ident = 'CHA'
436
438 """end of channel message"""
439 ident = 'END'
440