Package ClusterShell :: Module Communication
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Communication

  1  #!/usr/bin/env python 
  2  # 
  3  # Copyright (C) 2010-2016 CEA/DAM 
  4  # Copyright (C) 2010-2011 Henri Doreau <henri.doreau@cea.fr> 
  5  # Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu> 
  6  # 
  7  # This file is part of ClusterShell. 
  8  # 
  9  # ClusterShell is free software; you can redistribute it and/or 
 10  # modify it under the terms of the GNU Lesser General Public 
 11  # License as published by the Free Software Foundation; either 
 12  # version 2.1 of the License, or (at your option) any later version. 
 13  # 
 14  # ClusterShell is distributed in the hope that it will be useful, 
 15  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 16  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 17  # Lesser General Public License for more details. 
 18  # 
 19  # You should have received a copy of the GNU Lesser General Public 
 20  # License along with ClusterShell; if not, write to the Free Software 
 21  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 22   
 23  """ 
 24  ClusterShell inter-nodes communication module 
 25   
 26  This module contains the required material for nodes to communicate with each 
 27  other within the propagation tree. At the highest level, messages are instances 
 28  of several classes. They can be converted into XML to be sent over SSH links 
 29  through a Channel instance. 
 30   
 31  On the other side, XML is parsed and new message objects are instantiated. 
 32   
 33  Communication channels have been implemented as ClusterShell events handlers. 
 34  Whenever a message chunk is read, the data is given to a SAX XML parser, that 
 35  will use it to create corresponding messages instances as a messages factory. 
 36   
 37  As soon as an instance is ready, it is then passed to a recv() method in the 
 38  channel. The recv() method of the Channel class is a stub that must be 
 39  implemented in a subclass to process incoming messages. The same applies to the 
 40  start() method. 
 41   
 42  Subclassing the Channel class allows implementing whatever logic you want on the 
 43  top of a communication channel. 
 44  """ 
 45   
 46  import cPickle 
 47  import base64 
 48  import logging 
 49  import os 
 50  import xml.sax 
 51   
 52  from xml.sax.handler import ContentHandler 
 53  from xml.sax.saxutils import XMLGenerator 
 54  from xml.sax import SAXParseException 
 55   
 56  from collections import deque 
 57  from cStringIO import StringIO 
 58   
 59  from ClusterShell import __version__ 
 60  from ClusterShell.Event import EventHandler 
 61   
 62   
 63  # XML character encoding 
 64  ENCODING = 'utf-8' 
 65   
 66  # See Message.data_encode() 
 67  DEFAULT_B64_LINE_LENGTH = 65536 
 68   
 69   
class MessageProcessingError(Exception):
    """Base exception raised when processing an incoming or outgoing
    message fails.
    """
74 75
class XMLReader(ContentHandler):
    """SAX content handler converting an XML stream into Message instances.

    Completed messages are stored in an internal queue; use msg_available()
    and pop_msg() to retrieve them in arrival order.
    """

    def __init__(self):
        """Create a reader with an empty queue and no message in progress."""
        ContentHandler.__init__(self)
        # new messages are pushed on the left and popped from the right
        self.msg_queue = deque()
        # 'version' attribute of the <channel> tag, once it has been read
        self.version = None
        # message currently under construction (None outside <message>)
        self._draft = None
        self._sections_map = None

    def startElement(self, name, attrs):
        """Handle an opening tag; only <channel> and <message> are valid."""
        if name == 'message':
            self._draft_new(attrs)
        elif name == 'channel':
            self.version = attrs.get('version')
            self.msg_queue.appendleft(StartMessage())
        else:
            raise MessageProcessingError('Invalid starting tag %s' % name)

    def endElement(self, name):
        """Handle a closing tag and queue the corresponding message."""
        if name == 'channel':
            self.msg_queue.appendleft(EndMessage())
        elif name == 'message':
            # the draft is complete: queue it and reset the work area
            self.msg_queue.appendleft(self._draft)
            self._draft = None

    def characters(self, content):
        """Feed element text to the message under construction, if any."""
        if self._draft is None:
            return
        self._draft.data_update(content.decode(ENCODING))

    def msg_available(self):
        """Return True when at least one message is waiting in the queue."""
        return bool(self.msg_queue)

    def pop_msg(self):
        """Remove and return the oldest queued message (None if empty)."""
        if not self.msg_available():
            return None
        return self.msg_queue.pop()

    def _draft_new(self, attributes):
        """Start building a new message from the <message> tag attributes.

        Raises MessageProcessingError when the 'type' attribute is missing
        or does not match any known message class.
        """
        # message classes that may be instantiated from the wire, indexed
        # below by their type identifier
        known_types = (
            ConfigurationMessage,
            ControlMessage,
            ACKMessage,
            ErrorMessage,
            StdOutMessage,
            StdErrMessage,
            RetcodeMessage,
            TimeoutMessage,
        )
        ctors_map = dict((cls.ident, cls) for cls in known_types)
        try:
            ctor = ctors_map[attributes['type']]
        except KeyError:
            raise MessageProcessingError('Unknown message type')
        # instantiate first, then let the message load its own attributes
        self._draft = ctor()
        self._draft.selfbuild(attributes)
144 145
146 -class Channel(EventHandler):
147 """Use this event handler to establish a communication channel between to 148 hosts whithin the propagation tree. 149 150 The endpoint's logic has to be implemented by subclassing the Channel class 151 and overriding the start() and recv() methods. 152 153 There is no default behavior for these methods apart raising a 154 NotImplementedError. 155 156 Usage: 157 >> chan = MyChannel() # inherits Channel 158 >> task = task_self() 159 >> task.shell("uname -a", node="host2", handler=chan) 160 >> task.resume() 161 """ 162 163 # Common channel stream names 164 SNAME_WRITER = 'ch-writer' 165 SNAME_READER = 'ch-reader' 166 SNAME_ERROR = 'ch-error' 167
168 - def __init__(self, error_response=False):
169 """ 170 """ 171 EventHandler.__init__(self) 172 173 self.worker = None 174 175 # channel state flags 176 self.opened = False 177 self.setup = False 178 # will this channel send communication error responses? 179 self.error_response = error_response 180 181 self._xml_reader = XMLReader() 182 self._parser = xml.sax.make_parser(["IncrementalParser"]) 183 self._parser.setContentHandler(self._xml_reader) 184 185 self.logger = logging.getLogger(__name__)
186
187 - def _init(self):
188 """start xml document for communication""" 189 XMLGenerator(self.worker, encoding=ENCODING).startDocument()
190
191 - def _open(self):
192 """open a new communication channel from src to dst""" 193 xmlgen = XMLGenerator(self.worker, encoding=ENCODING) 194 xmlgen.startElement('channel', {'version': __version__})
195
196 - def _close(self):
197 """close an already opened channel""" 198 send_endtag = self.opened 199 # set to False before sending tag for state test purposes 200 self.opened = self.setup = False 201 if send_endtag: 202 XMLGenerator(self.worker, encoding=ENCODING).endElement('channel') 203 self.worker.abort()
204
205 - def ev_start(self, worker):
206 """connection established. Open higher level channel""" 207 self.worker = worker 208 self.start()
209
210 - def ev_read(self, worker):
211 """channel has data to read""" 212 raw = worker.current_msg 213 try: 214 self._parser.feed(raw + '\n') 215 except SAXParseException, ex: 216 self.logger.error("SAXParseException: %s: %s", ex.getMessage(), raw) 217 # Warning: do not send malformed raw message back 218 if self.error_response: 219 self.send(ErrorMessage('Parse error: %s' % ex.getMessage())) 220 self._close() 221 return 222 except MessageProcessingError, ex: 223 self.logger.error("MessageProcessingError: %s", ex) 224 if self.error_response: 225 self.send(ErrorMessage(str(ex))) 226 self._close() 227 return 228 229 # pass messages to the driver if ready 230 while self._xml_reader.msg_available(): 231 msg = self._xml_reader.pop_msg() 232 assert msg is not None 233 self.recv(msg)
234
235 - def send(self, msg):
236 """write an outgoing message as its XML representation""" 237 #self.logger.debug('SENDING to worker %s: "%s"', id(self.worker), 238 # msg.xml()) 239 self.worker.write(msg.xml() + '\n', sname=self.SNAME_WRITER)
240
241 - def start(self):
242 """initialization logic""" 243 raise NotImplementedError('Abstract method: subclasses must implement')
244
245 - def recv(self, msg):
246 """callback: process incoming message""" 247 raise NotImplementedError('Abstract method: subclasses must implement')
248 249
class Message(object):
    """Base class of all messages exchanged over a channel.

    Class attributes:
        ident        type identifier written in the 'type' XML attribute
        has_payload  whether the message may carry character data

    Instance attribute 'attr' maps each serialized attribute name to the
    callable used to convert its string form back to a Python value.
    """
    _inst_counter = 0
    ident = 'GEN'
    has_payload = False

    def __init__(self):
        """Create a message with a new message id and no payload."""
        self.attr = {'type': str, 'msgid': int}
        self.type = self.__class__.ident
        # message ids come from a counter shared by all Message classes
        self.msgid = Message._inst_counter
        self.data = None
        Message._inst_counter += 1

    def data_encode(self, inst):
        """Serialize an instance (pickle + base64) and store the result.

        The encoded text is split into lines of at most
        CLUSTERSHELL_GW_B64_LINE_LENGTH characters (environment variable,
        defaults to DEFAULT_B64_LINE_LENGTH).
        """
        # Base64 transfer encoding for MIME mandates a fixed line length
        # of 76 characters, which is way too small for our per-line ev_read
        # mechanism. So use b64encode() here instead of encodestring().
        encoded = base64.b64encode(cPickle.dumps(inst))

        # We now follow relaxed RFC-4648 for base64, but we still add some
        # newlines to very long lines to avoid memory pressure (eg. --rcopy).
        # In RFC-4648, CRLF characters constitute "non-alphabet characters"
        # and are ignored.
        line_length = int(os.environ.get('CLUSTERSHELL_GW_B64_LINE_LENGTH',
                                         DEFAULT_B64_LINE_LENGTH))
        if line_length <= 0:
            # a zero step is rejected by xrange() and a negative one would
            # silently drop the whole payload: use the default instead
            line_length = DEFAULT_B64_LINE_LENGTH
        self.data = '\n'.join(encoded[pos:pos+line_length]
                              for pos in xrange(0, len(encoded), line_length))

    def data_decode(self):
        """Deserialize the stored payload and return the resulting object.

        Raises MessageProcessingError if the payload is missing or is not
        a valid base64-encoded pickle.
        """
        # NOTE(security): the payload is unpickled, which can execute
        # arbitrary code; only use on channels whose peer is trusted.
        try:
            return cPickle.loads(base64.b64decode(self.data))
        except (EOFError, TypeError, ValueError, cPickle.UnpicklingError):
            # also covers self.data being None or not valid base64
            raise MessageProcessingError('Message %s has an invalid payload'
                                         % self.ident)

    def data_update(self, raw):
        """Append received character data to the payload.

        Raises MessageProcessingError if this message type does not accept
        a payload.
        """
        if self.has_payload:
            if self.data is None:
                self.data = raw # first encoded packet
            else:
                self.data += raw
        else:
            # ensure that incoming messages don't contain unexpected payloads
            raise MessageProcessingError('Got unexpected payload for Message %s'
                                         % self.ident)

    def selfbuild(self, attributes):
        """Set instance attributes from a table of serialized attributes.

        Every key declared in self.attr must be present and convertible
        with its associated callable; otherwise MessageProcessingError is
        raised.
        """
        for k, fmt in self.attr.iteritems():
            try:
                raw = attributes[k]
            except KeyError:
                raise MessageProcessingError(
                    'Invalid "message" attributes: missing key "%s"' % k)
            try:
                value = fmt(raw)
            except (TypeError, ValueError):
                # e.g. a non-numeric msgid: report it as a protocol error
                # rather than letting the conversion error escape
                raise MessageProcessingError(
                    'Invalid "message" attributes: bad value for key "%s"' % k)
            setattr(self, k, value)

    def __str__(self):
        """Return a printable representation listing the attributes."""
        elts = ['%s: %s' % (k, str(self.__dict__[k])) for k in self.attr.keys()]
        attributes = ', '.join(elts)
        return "Message %s (%s)" % (self.type, attributes)

    def xml(self):
        """Return the XML representation of the message as a string."""
        out = StringIO()
        generator = XMLGenerator(out, encoding=ENCODING)

        # "stringify" entries for XML conversion
        state = {}
        for k in self.attr:
            state[k] = str(getattr(self, k))

        generator.startElement('message', state)
        if self.data:
            generator.characters(self.data)
        generator.endElement('message')
        xml_msg = out.getvalue()
        out.close()
        return xml_msg
335
class ConfigurationMessage(Message):
    """Message used to propagate configuration; carries a payload."""
    ident = 'CFG'
    has_payload = True

    def __init__(self, gateway=''):
        """Create the message for the given gateway node name."""
        Message.__init__(self)
        # 'gateway' is serialized as a string attribute
        self.attr['gateway'] = str
        self.gateway = gateway
346
class RoutedMessageBase(Message):
    """Abstract base class for routed messages (with worker source id)."""

    def __init__(self, srcid):
        """Create the message with the given source id."""
        Message.__init__(self)
        # 'srcid' is serialized as an integer attribute
        self.attr['srcid'] = int
        self.srcid = srcid
353
class ControlMessage(RoutedMessageBase):
    """Action request message; carries a payload."""
    ident = 'CTL'
    has_payload = True

    def __init__(self, srcid=0):
        """Create a control message with empty action and target."""
        RoutedMessageBase.__init__(self, srcid)
        # both additional attributes are serialized as strings
        self.attr.update(action=str, target=str)
        self.action = ''
        self.target = ''
366
class ACKMessage(Message):
    """Acknowledgement message."""
    ident = 'ACK'

    def __init__(self, ackid=0):
        """Create an acknowledgement whose 'ack' attribute is ackid."""
        Message.__init__(self)
        # 'ack' is serialized as an integer attribute
        self.attr['ack'] = int
        self.ack = ackid
377
class ErrorMessage(Message):
    """Error notification message."""
    ident = 'ERR'

    def __init__(self, err=''):
        """Create an error message whose 'reason' attribute is err."""
        Message.__init__(self)
        # 'reason' is serialized as a string attribute
        self.attr['reason'] = str
        self.reason = err
388
class StdOutMessage(RoutedMessageBase):
    """Container message for standard output; carries a payload."""
    ident = 'OUT'
    has_payload = True

    def __init__(self, nodes='', output=None, srcid=0):
        """Create the message for the given nodes.

        When output is None the payload is left empty, to be loaded later
        in already encoded form; otherwise output is encoded and stored as
        the payload.
        """
        RoutedMessageBase.__init__(self, srcid)
        self.attr['nodes'] = str
        self.nodes = nodes
        # encoded payload, or None while nothing has been stored
        self.data = None
        if output is None:
            return
        self.data_encode(output)
405
class StdErrMessage(StdOutMessage):
    """Container message for standard error output.

    Same as StdOutMessage, with a distinct type identifier.
    """
    ident = 'SER'
409
class RetcodeMessage(RoutedMessageBase):
    """Container message for a return code."""
    ident = 'RET'

    def __init__(self, nodes='', retcode=0, srcid=0):
        """Create the message with the given nodes and return code."""
        RoutedMessageBase.__init__(self, srcid)
        # retcode is serialized as an integer, nodes as a string
        self.attr.update(retcode=int, nodes=str)
        self.retcode = retcode
        self.nodes = nodes
421
class TimeoutMessage(RoutedMessageBase):
    """Container message for a timeout notification."""
    ident = 'TIM'

    def __init__(self, nodes='', srcid=0):
        """Create the message for the given nodes."""
        RoutedMessageBase.__init__(self, srcid)
        # 'nodes' is serialized as a string attribute
        self.attr['nodes'] = str
        self.nodes = nodes
432
class StartMessage(Message):
    """Message marking the start of a channel communication."""
    ident = 'CHA'
436
class EndMessage(Message):
    """Message marking the end of a channel communication."""
    ident = 'END'
440