Package ClusterShell :: Module Gateway
[hide private]
[frames | no frames]

Source Code for Module ClusterShell.Gateway

  1  #!/usr/bin/env python 
  2  # 
  3  # Copyright (C) 2010-2016 CEA/DAM 
  4  # Copyright (C) 2010-2011 Henri Doreau <henri.doreau@cea.fr> 
  5  # Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu> 
  6  # 
  7  # This file is part of ClusterShell. 
  8  # 
  9  # ClusterShell is free software; you can redistribute it and/or 
 10  # modify it under the terms of the GNU Lesser General Public 
 11  # License as published by the Free Software Foundation; either 
 12  # version 2.1 of the License, or (at your option) any later version. 
 13  # 
 14  # ClusterShell is distributed in the hope that it will be useful, 
 15  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 16  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 17  # Lesser General Public License for more details. 
 18  # 
 19  # You should have received a copy of the GNU Lesser General Public 
 20  # License along with ClusterShell; if not, write to the Free Software 
 21  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 22   
 23  """ 
 24  ClusterShell agent launched on remote gateway nodes. This script reads messages 
 25  on stdin via the SSH connection, interprets them, takes decisions, and prints 
 26  out replies on stdout. 
 27  """ 
 28   
 29  import logging 
 30  import os 
 31  import sys 
 32  import traceback 
 33   
 34  from ClusterShell.Event import EventHandler 
 35  from ClusterShell.NodeSet import NodeSet 
 36  from ClusterShell.Task import task_self, _getshorthostname 
 37  from ClusterShell.Engine.Engine import EngineAbortException 
 38  from ClusterShell.Worker.fastsubprocess import set_nonblock_flag 
 39  from ClusterShell.Worker.Worker import StreamWorker, FANOUT_UNLIMITED 
 40  from ClusterShell.Worker.Tree import WorkerTree 
 41  from ClusterShell.Communication import Channel, ConfigurationMessage, \ 
 42      ControlMessage, ACKMessage, ErrorMessage, StartMessage, EndMessage, \ 
 43      StdOutMessage, StdErrMessage, RetcodeMessage, TimeoutMessage, \ 
 44      MessageProcessingError 
 45   
 46   
def _gw_print_debug(task, line):
    """Default debug printing function for the gateway task.

    Forwards ``line`` to this module's logger at DEBUG level. The ``task``
    argument is accepted but not used.
    """
    gw_logger = logging.getLogger(__name__)
    gw_logger.debug(line)
50
def gateway_excepthook(exc_type, exc_value, tb):
    """
    Default excepthook for Gateway: report any unhandled exception through
    this module's logger instead of stderr.

    The full formatted traceback is emitted as one ERROR record.
    """
    formatted = ''.join(traceback.format_exception(exc_type, exc_value, tb))
    gw_logger = logging.getLogger(__name__)
    gw_logger.error(formatted)
58 59
class WorkerTreeResponder(EventHandler):
    """Gateway WorkerTree handler.

    Relays events of the local WorkerTree to the parent node through the
    gateway channel. When a grooming delay is configured on the task,
    output and return codes are not forwarded as they arrive; they are
    accumulated and sent in batches each time the grooming timer fires.
    """

    def __init__(self, task, gwchan, srcwkr):
        """Set up the responder and, if requested, traffic grooming.

        task   -- task providing the "grooming_delay" info and the timer
        gwchan -- gateway channel used to send messages upstream
        srcwkr -- id of the distant parent WorkerTree, attached to every
                  message sent
        """
        EventHandler.__init__(self)
        self.gwchan = gwchan    # gateway channel
        self.srcwkr = srcwkr    # id of distant parent WorkerTree
        self.worker = None      # local WorkerTree instance
        self.retcodes = {}      # self-managed retcodes: rc -> NodeSet
        self.logger = logging.getLogger(__name__)

        # Grooming initialization
        self.timer = None
        qdelay = task.info("grooming_delay")
        if qdelay > 1.0e-3:
            # Enable messages and rc grooming - enable msgtree (#181)
            task.set_default("stdout_msgtree", True)
            task.set_default("stderr_msgtree", True)
            # create auto-closing timer object for grooming
            self.timer = task.timer(qdelay, self, qdelay, autoclose=True)

        self.logger.debug("WorkerTreeResponder initialized grooming=%f",
                          qdelay)

    def ev_start(self, worker):
        """Remember the local worker once it has started."""
        self.logger.debug("WorkerTreeResponder: ev_start")
        self.worker = worker

    def ev_timer(self, timer):
        """perform gateway traffic grooming

        Sends the buffered stderr and stdout messages and the pending
        return codes to the parent node, then empties the buffers. The
        timer argument is not used; ev_close() calls this method with
        None to perform a final flush.
        """
        if not self.worker:
            return
        logger = self.logger

        # check for grooming opportunities for stdout/stderr
        for msg_elem, nodes in self.worker.iter_errors():
            logger.debug("iter(stderr): %s: %d bytes", nodes,
                         len(msg_elem.message()))
            self.gwchan.send(StdErrMessage(nodes, msg_elem.message(),
                                           self.srcwkr))
        for msg_elem, nodes in self.worker.iter_buffers():
            logger.debug("iter(stdout): %s: %d bytes", nodes,
                         len(msg_elem.message()))
            self.gwchan.send(StdOutMessage(nodes, msg_elem.message(),
                                           self.srcwkr))
        # empty internal MsgTree buffers
        self.worker.flush_buffers()
        self.worker.flush_errors()

        # specifically manage retcodes to periodically return latest
        # retcodes to parent node, instead of doing it at ev_hup (no msg
        # aggregation) or at ev_close (no parent node live updates)
        #
        # items() rather than iteritems(): the latter only exists on
        # Python 2 dictionaries.
        for rc, nodes in self.retcodes.items():
            logger.debug("iter(rc): %s: rc=%d", nodes, rc)
            self.gwchan.send(RetcodeMessage(nodes, rc, self.srcwkr))
        self.retcodes.clear()

    def ev_read(self, worker):
        """message received on stdout"""
        # with grooming enabled, stdout is forwarded by ev_timer() instead
        if self.timer is None:
            self.gwchan.send(StdOutMessage(worker.current_node,
                                           worker.current_msg,
                                           self.srcwkr))

    def ev_error(self, worker):
        """message received on stderr"""
        self.logger.debug("WorkerTreeResponder: ev_error %s %s",
                          worker.current_node,
                          worker.current_errmsg)
        # with grooming enabled, stderr is forwarded by ev_timer() instead
        if self.timer is None:
            self.gwchan.send(StdErrMessage(worker.current_node,
                                           worker.current_errmsg,
                                           self.srcwkr))

    def ev_timeout(self, worker):
        """Received timeout event: some nodes did timeout"""
        msg = TimeoutMessage(NodeSet._fromlist1(worker.iter_keys_timeout()),
                             self.srcwkr)
        self.gwchan.send(msg)

    def ev_hup(self, worker):
        """Received end of command from one node"""
        if self.timer is None:
            self.gwchan.send(RetcodeMessage(worker.current_node,
                                            worker.current_rc,
                                            self.srcwkr))
        else:
            # retcode grooming: group nodes by return code until the next
            # ev_timer() run
            if worker.current_rc in self.retcodes:
                self.retcodes[worker.current_rc].add(worker.current_node)
            else:
                self.retcodes[worker.current_rc] = NodeSet(worker.current_node)

    def ev_close(self, worker):
        """End of CTL responder"""
        self.logger.debug("WorkerTreeResponder: ev_close")
        if self.timer is not None:
            # finalize grooming: flush what is still pending, then stop
            # the timer
            self.ev_timer(None)
            self.timer.invalidate()
159 160
class GatewayChannel(Channel):
    """high level logic for gateways

    Handles the messages received from the parent node: channel start,
    then configuration (gateway name and topology), then control
    messages ('shell', 'write' and 'eof' actions) driving a local
    WorkerTree.
    """

    def __init__(self, task):
        """Create the gateway channel bound to ``task``."""
        Channel.__init__(self, error_response=True)
        self.task = task
        self.nodename = None        # gateway node name, set by recv_cfg()
        self.topology = None        # decoded topology, set by recv_cfg()
        self.propagation = None     # WorkerTree, set by a 'shell' action
        self.logger = logging.getLogger(__name__)

    def start(self):
        """initialization"""
        # prepare communication
        self._init()
        self.logger.debug('ready to accept channel communication')

    def close(self):
        """close gw channel"""
        self.logger.debug('closing gateway channel')
        self._close()

    def recv(self, msg):
        """handle incoming message

        Dispatch order: an EndMessage closes the channel; once set up,
        messages go to recv_ctl(); once opened, to recv_cfg(); otherwise
        only a StartMessage is accepted. Any processing error is logged,
        reported upstream as an ErrorMessage and closes the channel,
        except EngineAbortException which is re-raised.
        """
        try:
            self.logger.debug('handling incoming message: %s', str(msg))
            if msg.type == EndMessage.ident:
                self.logger.debug('recv: got EndMessage')
                self._close()
            elif self.setup:
                self.recv_ctl(msg)
            elif self.opened:
                self.recv_cfg(msg)
            elif msg.type == StartMessage.ident:
                self.logger.debug('got start message %s', msg)
                self.opened = True
                self._open()
                self.logger.debug('channel started (version %s on remote end)',
                                  self._xml_reader.version)
            else:
                self.logger.error('unexpected message: %s', str(msg))
                raise MessageProcessingError('unexpected message: %s' % msg)
        except MessageProcessingError as ex:
            self.logger.error('on recv(): %s', str(ex))
            self.send(ErrorMessage(str(ex)))
            self._close()

        except EngineAbortException:
            # gateway task abort: don't handle like other exceptions
            raise

        except Exception as ex:
            self.logger.exception('on recv(): %s', str(ex))
            self.send(ErrorMessage(str(ex)))
            self._close()

    def recv_cfg(self, msg):
        """receive cfg/topology configuration

        Raises MessageProcessingError if ``msg`` is not a
        ConfigurationMessage. Otherwise records the gateway node name and
        the decoded topology, marks the channel as set up and acknowledges
        the message.
        """
        if msg.type != ConfigurationMessage.ident:
            raise MessageProcessingError('unexpected message: %s' % msg)

        self.logger.debug('got channel configuration')

        # gw node name
        hostname = _getshorthostname()
        if not msg.gateway:
            self.nodename = hostname
            # Logger.warning(): Logger.warn() is a deprecated alias
            self.logger.warning('gw name not provided, using system hostname %s',
                                self.nodename)
        else:
            self.nodename = msg.gateway

        self.logger.debug('using gateway node name %s', self.nodename)
        if self.nodename.lower() != hostname.lower():
            self.logger.debug('gw name %s does not match system hostname %s',
                              self.nodename, hostname)

        # topology
        task_self().topology = self.topology = msg.data_decode()
        self.logger.debug('decoded propagation tree')
        self.logger.debug('\n%s', self.topology)
        self.setup = True
        self._ack(msg)

    def recv_ctl(self, msg):
        """receive control message with actions to perform

        Supported actions: 'shell' creates and schedules a WorkerTree for
        the requested command, 'write' forwards a buffer to it and 'eof'
        signals end of input to it. Each of them is acknowledged. Other
        messages or actions are only logged as errors.
        """
        if msg.type != ControlMessage.ident:
            self.logger.error('unexpected message: %s', str(msg))
            return

        self.logger.debug('GatewayChannel._state_ctl')
        if msg.action == 'shell':
            data = msg.data_decode()
            cmd = data['cmd']

            stderr = data['stderr']
            timeout = data['timeout']
            remote = data['remote']

            #self.propagation.invoke_gateway = data['invoke_gateway']
            self.logger.debug('decoded gw invoke (%s)',
                              data['invoke_gateway'])

            taskinfo = data['taskinfo']
            self.logger.debug('assigning task infos (%s)', data['taskinfo'])

            task = task_self()
            task._info.update(taskinfo)
            task.set_info('print_debug', _gw_print_debug)

            if task.info('debug'):
                self.logger.setLevel(logging.DEBUG)

            self.logger.debug('inherited fanout value=%d',
                              task.info("fanout"))

            self.logger.debug('launching execution/enter gathering state')

            responder = WorkerTreeResponder(task, self, msg.srcid)

            self.propagation = WorkerTree(msg.target, responder, timeout,
                                          command=cmd,
                                          topology=self.topology,
                                          newroot=self.nodename,
                                          stderr=stderr,
                                          remote=remote)
            # FIXME ev_start-not-called workaround
            responder.worker = self.propagation
            self.propagation.upchannel = self
            task.schedule(self.propagation)
            self.logger.debug("WorkerTree scheduled")
            self._ack(msg)
        elif msg.action == 'write':
            data = msg.data_decode()
            self.logger.debug('GatewayChannel write: %d bytes',
                              len(data['buf']))
            self.propagation.write(data['buf'])
            self._ack(msg)
        elif msg.action == 'eof':
            self.logger.debug('GatewayChannel eof')
            self.propagation.set_write_eof()
            self._ack(msg)
        else:
            self.logger.error('unexpected CTL action: %s', msg.action)

    def _ack(self, msg):
        """acknowledge a received message"""
        self.send(ACKMessage(msg.msgid))

    def ev_close(self, worker):
        """Gateway (parent) channel is closing.

        We abort the whole gateway task to stop other running workers.
        This avoids any unwanted remaining processes on gateways.
        """
        self.logger.debug('GatewayChannel: ev_close')
        self.worker.task.abort()
316 317
def gateway_main():
    """ClusterShell gateway entry point

    Configures file logging from the CLUSTERSHELL_GW_LOG_DIR (default
    '/tmp') and CLUSTERSHELL_GW_LOG_LEVEL (default 'INFO') environment
    variables, installs gateway_excepthook, then runs a task whose
    StreamWorker reads channel messages on stdin and writes replies on
    stdout. Exits with status 1 if stdin is a terminal.
    """
    host = _getshorthostname()
    # configure root logger
    logdir = os.path.expanduser(os.environ.get('CLUSTERSHELL_GW_LOG_DIR',
                                               '/tmp'))
    loglevel = os.environ.get('CLUSTERSHELL_GW_LOG_LEVEL', 'INFO')
    level = getattr(logging, loglevel.upper(), logging.INFO)
    if not isinstance(level, int):
        # the name matched a logging attribute that is not a level
        # constant (e.g. a function): fall back to the default
        level = logging.INFO
    logging.basicConfig(level=level,
                        format='%(asctime)s %(name)s %(levelname)s %(message)s',
                        filename=os.path.join(logdir, "%s.gw.log" % host))
    logger = logging.getLogger(__name__)
    sys.excepthook = gateway_excepthook

    logger.debug('Starting gateway on %s', host)
    logger.debug("environ=%s", os.environ)

    set_nonblock_flag(sys.stdin.fileno())
    set_nonblock_flag(sys.stdout.fileno())
    set_nonblock_flag(sys.stderr.fileno())

    task = task_self()

    # Disable MsgTree buffering, it is enabled later when needed
    task.set_default("stdout_msgtree", False)
    task.set_default("stderr_msgtree", False)

    if sys.stdin.isatty():
        logger.critical('Gateway failure: sys.stdin.isatty() is True')
        sys.exit(1)

    gateway = GatewayChannel(task)
    worker = StreamWorker(handler=gateway)
    # Define worker._fanout to not rely on the engine's fanout, and use
    # the special value FANOUT_UNLIMITED to always allow registration
    worker._fanout = FANOUT_UNLIMITED
    worker.set_reader(gateway.SNAME_READER, sys.stdin)
    worker.set_writer(gateway.SNAME_WRITER, sys.stdout, retain=False)
    # must stay disabled for now (see #274)
    #worker.set_writer(gateway.SNAME_ERROR, sys.stderr, retain=False)
    task.schedule(worker)
    logger.debug('Starting task')
    try:
        task.resume()
        logger.debug('Task performed')
    except EngineAbortException:
        logger.debug('EngineAbortException')
    except IOError as exc:
        logger.debug('Broken pipe (%s)', exc)
        raise
    except Exception as exc:
        logger.exception('Gateway failure: %s', exc)
    logger.debug('-------- The End --------')
if __name__ == '__main__':
    # Rebind the module name before starting: the functions of this module
    # look up their logger with logging.getLogger(__name__) at call time,
    # so records are emitted under 'ClusterShell.Gateway' rather than
    # '__main__'.
    __name__ = 'ClusterShell.Gateway'
    # To enable gateway profiling:
    #import cProfile
    #cProfile.run('gateway_main()', '/tmp/gwprof')
    gateway_main()