1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 ClusterShell agent launched on remote gateway nodes. This script reads messages
25 on stdin via the SSH connection, interprets them, takes decisions, and prints
26 out replies on stdout.
27 """
28
29 import logging
30 import os
31 import sys
32 import traceback
33
34 from ClusterShell.Event import EventHandler
35 from ClusterShell.NodeSet import NodeSet
36 from ClusterShell.Task import task_self, _getshorthostname
37 from ClusterShell.Engine.Engine import EngineAbortException
38 from ClusterShell.Worker.fastsubprocess import set_nonblock_flag
39 from ClusterShell.Worker.Worker import StreamWorker, FANOUT_UNLIMITED
40 from ClusterShell.Worker.Tree import WorkerTree
41 from ClusterShell.Communication import Channel, ConfigurationMessage, \
42 ControlMessage, ACKMessage, ErrorMessage, StartMessage, EndMessage, \
43 StdOutMessage, StdErrMessage, RetcodeMessage, TimeoutMessage, \
44 MessageProcessingError
45
46
48 """Default gateway task debug printing function"""
49 logging.getLogger(__name__).debug(line)
50
52 """
53 Default excepthook for Gateway to redirect any unhandled exception
54 to logger instead of stderr.
55 """
56 tbexc = traceback.format_exception(exc_type, exc_value, tb)
57 logging.getLogger(__name__).error(''.join(tbexc))
58
59
61 """Gateway WorkerTree handler"""
62
def __init__(self, task, gwchan, srcwkr):
    """Prepare the handler for a WorkerTree run on this gateway.

    :param task: gateway task; its "grooming_delay" info decides whether
        replies are forwarded per event or batched and flushed by a timer
    :param gwchan: gateway channel through which replies are sent upstream
    :param srcwkr: source worker identifier attached to every reply message
    """
    EventHandler.__init__(self)
    self.gwchan = gwchan
    self.srcwkr = srcwkr
    # local WorkerTree; assigned later (ev_start, or directly by the channel)
    self.worker = None
    # return code -> NodeSet of nodes, buffered while grooming is enabled
    self.retcodes = {}
    self.logger = logging.getLogger(__name__)

    # A None timer means "no grooming": ev_read/ev_error/ev_hup then send
    # each message immediately.
    self.timer = None
    delay = task.info("grooming_delay")
    if delay > 1.0e-3:
        # Grooming: let the worker accumulate stdout/stderr in message trees
        # so that ev_timer() can send them aggregated by node set.
        for msgtree_key in ("stdout_msgtree", "stderr_msgtree"):
            task.set_default(msgtree_key, True)
        # Timer using this responder as handler (presumably dispatching to
        # ev_timer() every `delay` seconds -- see ClusterShell Task.timer).
        self.timer = task.timer(delay, self, delay, autoclose=True)

    self.logger.debug("WorkerTreeResponder initialized grooming=%f", delay)
82
84 self.logger.debug("WorkerTreeResponder: ev_start")
85 self.worker = worker
86
88 """perform gateway traffic grooming"""
89 if not self.worker:
90 return
91 logger = self.logger
92
93
94 for msg_elem, nodes in self.worker.iter_errors():
95 logger.debug("iter(stderr): %s: %d bytes", nodes,
96 len(msg_elem.message()))
97 self.gwchan.send(StdErrMessage(nodes, msg_elem.message(),
98 self.srcwkr))
99 for msg_elem, nodes in self.worker.iter_buffers():
100 logger.debug("iter(stdout): %s: %d bytes", nodes,
101 len(msg_elem.message()))
102 self.gwchan.send(StdOutMessage(nodes, msg_elem.message(),
103 self.srcwkr))
104
105 self.worker.flush_buffers()
106 self.worker.flush_errors()
107
108
109
110
111 for rc, nodes in self.retcodes.iteritems():
112 self.logger.debug("iter(rc): %s: rc=%d", nodes, rc)
113 self.gwchan.send(RetcodeMessage(nodes, rc, self.srcwkr))
114 self.retcodes.clear()
115
117 """message received on stdout"""
118 if self.timer is None:
119 self.gwchan.send(StdOutMessage(worker.current_node,
120 worker.current_msg,
121 self.srcwkr))
122
124 """message received on stderr"""
125 self.logger.debug("WorkerTreeResponder: ev_error %s %s",
126 worker.current_node,
127 worker.current_errmsg)
128 if self.timer is None:
129 self.gwchan.send(StdErrMessage(worker.current_node,
130 worker.current_errmsg,
131 self.srcwkr))
132
138
140 """Received end of command from one node"""
141 if self.timer is None:
142 self.gwchan.send(RetcodeMessage(worker.current_node,
143 worker.current_rc,
144 self.srcwkr))
145 else:
146
147 if worker.current_rc in self.retcodes:
148 self.retcodes[worker.current_rc].add(worker.current_node)
149 else:
150 self.retcodes[worker.current_rc] = NodeSet(worker.current_node)
151
153 """End of CTL responder"""
154 self.logger.debug("WorkerTreeResponder: ev_close")
155 if self.timer is not None:
156
157 self.ev_timer(None)
158 self.timer.invalidate()
159
160
162 """high level logic for gateways"""
164 Channel.__init__(self, error_response=True)
165 self.task = task
166 self.nodename = None
167 self.topology = None
168 self.propagation = None
169 self.logger = logging.getLogger(__name__)
170
172 """initialization"""
173
174 self._init()
175 self.logger.debug('ready to accept channel communication')
176
178 """close gw channel"""
179 self.logger.debug('closing gateway channel')
180 self._close()
181
182 - def recv(self, msg):
215
217 """receive cfg/topology configuration"""
218 if msg.type != ConfigurationMessage.ident:
219 raise MessageProcessingError('unexpected message: %s' % msg)
220
221 self.logger.debug('got channel configuration')
222
223
224 hostname = _getshorthostname()
225 if not msg.gateway:
226 self.nodename = hostname
227 self.logger.warn('gw name not provided, using system hostname %s',
228 self.nodename)
229 else:
230 self.nodename = msg.gateway
231
232 self.logger.debug('using gateway node name %s', self.nodename)
233 if self.nodename.lower() != hostname.lower():
234 self.logger.debug('gw name %s does not match system hostname %s',
235 self.nodename, hostname)
236
237
238 task_self().topology = self.topology = msg.data_decode()
239 self.logger.debug('decoded propagation tree')
240 self.logger.debug('\n%s', self.topology)
241 self.setup = True
242 self._ack(msg)
243
245 """receive control message with actions to perform"""
246 if msg.type == ControlMessage.ident:
247 self.logger.debug('GatewayChannel._state_ctl')
248 if msg.action == 'shell':
249 data = msg.data_decode()
250 cmd = data['cmd']
251
252 stderr = data['stderr']
253 timeout = data['timeout']
254 remote = data['remote']
255
256
257 self.logger.debug('decoded gw invoke (%s)',
258 data['invoke_gateway'])
259
260 taskinfo = data['taskinfo']
261 self.logger.debug('assigning task infos (%s)', data['taskinfo'])
262
263 task = task_self()
264 task._info.update(taskinfo)
265 task.set_info('print_debug', _gw_print_debug)
266
267 if task.info('debug'):
268 self.logger.setLevel(logging.DEBUG)
269
270 self.logger.debug('inherited fanout value=%d',
271 task.info("fanout"))
272
273 self.logger.debug('launching execution/enter gathering state')
274
275 responder = WorkerTreeResponder(task, self, msg.srcid)
276
277 self.propagation = WorkerTree(msg.target, responder, timeout,
278 command=cmd,
279 topology=self.topology,
280 newroot=self.nodename,
281 stderr=stderr,
282 remote=remote)
283
284 responder.worker = self.propagation
285 self.propagation.upchannel = self
286 task.schedule(self.propagation)
287 self.logger.debug("WorkerTree scheduled")
288 self._ack(msg)
289 elif msg.action == 'write':
290 data = msg.data_decode()
291 self.logger.debug('GatewayChannel write: %d bytes',
292 len(data['buf']))
293 self.propagation.write(data['buf'])
294 self._ack(msg)
295 elif msg.action == 'eof':
296 self.logger.debug('GatewayChannel eof')
297 self.propagation.set_write_eof()
298 self._ack(msg)
299 else:
300 self.logger.error('unexpected CTL action: %s', msg.action)
301 else:
302 self.logger.error('unexpected message: %s', str(msg))
303
304 - def _ack(self, msg):
307
309 """Gateway (parent) channel is closing.
310
311 We abort the whole gateway task to stop other running workers.
312 This avoids any unwanted remaining processes on gateways.
313 """
314 self.logger.debug('GatewayChannel: ev_close')
315 self.worker.task.abort()
316
317
319 """ClusterShell gateway entry point"""
320 host = _getshorthostname()
321
322 logdir = os.path.expanduser(os.environ.get('CLUSTERSHELL_GW_LOG_DIR', \
323 '/tmp'))
324 loglevel = os.environ.get('CLUSTERSHELL_GW_LOG_LEVEL', 'INFO')
325 logging.basicConfig(level=getattr(logging, loglevel.upper(), logging.INFO),
326 format='%(asctime)s %(name)s %(levelname)s %(message)s',
327 filename=os.path.join(logdir, "%s.gw.log" % host))
328 logger = logging.getLogger(__name__)
329 sys.excepthook = gateway_excepthook
330
331 logger.debug('Starting gateway on %s', host)
332 logger.debug("environ=%s", os.environ)
333
334
335 set_nonblock_flag(sys.stdin.fileno())
336 set_nonblock_flag(sys.stdout.fileno())
337 set_nonblock_flag(sys.stderr.fileno())
338
339 task = task_self()
340
341
342 task.set_default("stdout_msgtree", False)
343 task.set_default("stderr_msgtree", False)
344
345 if sys.stdin.isatty():
346 logger.critical('Gateway failure: sys.stdin.isatty() is True')
347 sys.exit(1)
348
349 gateway = GatewayChannel(task)
350 worker = StreamWorker(handler=gateway)
351
352
353 worker._fanout = FANOUT_UNLIMITED
354 worker.set_reader(gateway.SNAME_READER, sys.stdin)
355 worker.set_writer(gateway.SNAME_WRITER, sys.stdout, retain=False)
356
357
358 task.schedule(worker)
359 logger.debug('Starting task')
360 try:
361 task.resume()
362 logger.debug('Task performed')
363 except EngineAbortException, exc:
364 logger.debug('EngineAbortException')
365 except IOError, exc:
366 logger.debug('Broken pipe (%s)', exc)
367 raise
368 except Exception, exc:
369 logger.exception('Gateway failure: %s', exc)
370 logger.debug('-------- The End --------')
371
if __name__ == "__main__":
    # The functions of this module call logging.getLogger(__name__) at run
    # time, so rebinding the module-level __name__ here makes the gateway log
    # under "ClusterShell.Gateway" instead of "__main__" when run as a script.
    __name__ = "ClusterShell.Gateway"
    gateway_main()
378