1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 ClusterShell base worker for process-based workers.
24
25 This module manages the worker class to spawn local commands, possibly using
26 a nodeset to behave like a distant worker. Like other workers it can run
27 commands or copy files, locally.
28
29 This is the base class for most of other distant workers.
30 """
31
32 import os
33 from string import Template
34
35 from ClusterShell.NodeSet import NodeSet
36 from ClusterShell.Worker.EngineClient import EngineClient
37 from ClusterShell.Worker.Worker import WorkerError, DistantWorker
38
39
41 """
42 Replace keywords in `pattern' with value from `node' and `rank'.
43
44 %h, %host map `node'
45 %n, %rank map `rank'
46 """
47 variables = {
48 'h': node,
49 'host': node,
50 'hosts': node,
51 'n': rank or 0,
52 'rank': rank or 0,
53
54 }
55 class Replacer(Template):
56 delimiter = '%'
57 try:
58 cmd = Replacer(pattern).substitute(variables)
59 except (KeyError, ValueError), error:
60 msg = "%s is not a valid pattern, use '%%%%' to escape '%%'" % error
61 raise WorkerError(msg)
62 return cmd
63
65 """
66 Run a simple local command.
67
68 Useful as a superclass for other more specific workers.
69 """
70
71 - def __init__(self, node, command, worker, stderr, timeout, autoclose=False,
72 rank=None):
73 """
74 Create an EngineClient-type instance to locally run `command'.
75
76 :param node: will be used as key.
77 """
78 EngineClient.__init__(self, worker, node, stderr, timeout, autoclose)
79 self.rank = rank
80 self.command = command
81 self.popen = None
82
83 self.streams.set_writer(worker.SNAME_STDIN, None, retain=True)
84
86 """
87 Build the shell command line to start the commmand.
88
89 Return a tuple containing command and arguments as a string or a list
90 of string, and a dict of additional environment variables. None could
91 be returned if no environment change is required.
92 """
93 return (_replace_cmd(self.command, self.key, self.rank), None)
94
96 """Prepare command and start client."""
97
98
99 cmd, cmd_env = self._build_cmd()
100
101
102 shell = type(cmd) is str
103
104 task = self.worker.task
105 if task.info("debug", False):
106 name = str(self.__class__).upper().split('.')[-1]
107 if shell:
108 task.info("print_debug")(task, "%s: %s" % (name, cmd))
109 else:
110 task.info("print_debug")(task, "%s: %s" % (name, ' '.join(cmd)))
111
112 self.popen = self._exec_nonblock(cmd, env=cmd_env, shell=shell)
113 self._on_nodeset_start(self.key)
114 return self
115
116 - def _close(self, abort, timeout):
117 """Close client. See EngineClient._close()."""
118 if abort:
119
120 prc = self.popen.poll()
121
122 if prc is None:
123 try:
124 self.popen.kill()
125 except OSError:
126 pass
127 prc = self.popen.wait()
128
129 self.streams.clear()
130
131 if prc >= 0:
132 self._on_nodeset_rc(self.key, prc)
133 elif timeout:
134 assert abort, "abort flag not set on timeout"
135 self.worker._on_node_timeout(self.key)
136 elif not abort:
137
138 self._on_nodeset_rc(self.key, 128 + -prc)
139
140 self.worker._check_fini()
141
143 """local wrapper over _on_start that can also handle nodeset"""
144 if isinstance(nodes, NodeSet):
145 for node in nodes:
146 self.worker._on_start(node)
147 else:
148 self.worker._on_start(nodes)
149
151 """local wrapper over _on_node_rc that can also handle nodeset"""
152 if isinstance(nodes, NodeSet):
153 for node in nodes:
154 self.worker._on_node_rc(node, rc)
155 else:
156 self.worker._on_node_rc(nodes, rc)
157
159 """local wrapper over _on_node_msgline that can also handle nodeset"""
160 if isinstance(nodes, NodeSet):
161 for node in nodes:
162 self.worker._on_node_msgline(node, msg, sname)
163 else:
164 self.worker._on_node_msgline(nodes, msg, sname)
165
167 """Called at close time to flush stream read buffer."""
168 stream = self.streams[sname]
169 if stream.readable() and stream.rbuf:
170
171
172 self._on_nodeset_msgline(self.key, stream.rbuf, sname)
173
175 """
176 Handle a read notification. Called by the engine as the result of an
177 event indicating that a read is available.
178 """
179
180 worker = self.worker
181 task = worker.task
182 key = self.key
183 node_msgline = self._on_nodeset_msgline
184 debug = task.info("debug", False)
185 if debug:
186 print_debug = task.info("print_debug")
187 for msg in self._readlines(sname):
188 if debug:
189 print_debug(task, "%s: %s" % (key, msg))
190 node_msgline(key, msg, sname)
191
193 """
194 Run a local `cp' between a source and destination.
195
196 Destination could be a directory.
197 """
198
199 - def __init__(self, node, source, dest, worker, stderr, timeout, autoclose,
200 preserve, reverse, rank=None):
201 """Create an EngineClient-type instance to locally run 'cp'."""
202 ExecClient.__init__(self, node, None, worker, stderr, timeout,
203 autoclose, rank)
204 self.source = source
205 self.dest = dest
206
207
208 self.preserve = preserve
209
210
211 self.reverse = reverse
212
213
214
215
216 if self.reverse:
217 self.isdir = os.path.isdir(self.dest)
218 if not self.isdir:
219 raise ValueError("reverse copy dest must be a directory")
220 else:
221 self.isdir = os.path.isdir(self.source)
222
224 """
225 Build the shell command line to start the rcp commmand.
226 Return an array of command and arguments.
227 """
228 source = _replace_cmd(self.source, self.key, self.rank)
229 dest = _replace_cmd(self.dest, self.key, self.rank)
230
231 cmd_l = [ "cp" ]
232
233 if self.isdir:
234 cmd_l.append("-r")
235
236 if self.preserve:
237 cmd_l.append("-p")
238
239 if self.reverse:
240 cmd_l.append(dest)
241 cmd_l.append(source)
242 else:
243 cmd_l.append(source)
244 cmd_l.append(dest)
245
246 return (cmd_l, None)
247
248
250 """
251 ClusterShell simple execution worker Class.
252
253 It runs commands locally. If a node list is provided, one command will be
254 launched for each node and specific keywords will be replaced based on node
255 name and rank.
256
257 Local shell usage example:
258
259 >>> worker = ExecWorker(nodeset, handler=MyEventHandler(),
260 ... timeout=30, command="/bin/uptime")
261 >>> task.schedule(worker) # schedule worker for execution
262 >>> task.run() # run
263
264 Local copy usage example:
265
266 >>> worker = ExecWorker(nodeset, handler=MyEventHandler(),
267 ... source="/etc/my.cnf",
268 ... dest="/etc/my.cnf.bak")
269 >>> task.schedule(worker) # schedule worker for execution
270 >>> task.run() # run
271
272 connect_timeout option is ignored by this worker.
273 """
274
275 SHELL_CLASS = ExecClient
276 COPY_CLASS = CopyClient
277
278 - def __init__(self, nodes, handler, timeout=None, **kwargs):
279 """Create an ExecWorker and its engine client instances."""
280 DistantWorker.__init__(self, handler)
281 self._close_count = 0
282 self._has_timeout = False
283 self._clients = []
284
285 self.nodes = NodeSet(nodes)
286 self.command = kwargs.get('command')
287 self.source = kwargs.get('source')
288 self.dest = kwargs.get('dest')
289
290 self._create_clients(timeout=timeout, **kwargs)
291
292
293
294
295
297 """
298 Create several shell and copy engine client instances based on worker
299 properties.
300
301 Additional arguments in `kwargs' will be used for client creation.
302 There will be one client per node in self.nodes
303 """
304
305 if self.command and ('%hosts' in self.command or
306 '%{hosts}' in self.command):
307 self._add_client(self.nodes, rank=None, **kwargs)
308 else:
309 for rank, node in enumerate(self.nodes):
310 self._add_client(node, rank=rank, **kwargs)
311
313 """Create one shell or copy client."""
314 autoclose = kwargs.get('autoclose', False)
315 stderr = kwargs.get('stderr', False)
316 rank = kwargs.get('rank')
317 timeout = kwargs.get('timeout')
318
319 if self.command is not None:
320 cls = self.__class__.SHELL_CLASS
321 self._clients.append(cls(nodes, self.command, self, stderr,
322 timeout, autoclose, rank))
323 elif self.source:
324 cls = self.__class__.COPY_CLASS
325 self._clients.append(cls(nodes, self.source, self.dest, self,
326 stderr, timeout, autoclose,
327 kwargs.get('preserve', False),
328 kwargs.get('reverse', False), rank))
329 else:
330 raise ValueError("missing command or source parameter in "
331 "worker constructor")
332
334 """
335 Used by upper layer to get the list of underlying created engine
336 clients.
337 """
338 return self._clients
339
340 - def write(self, buf, sname=None):
341 """Write to worker clients."""
342 sname = sname or self.SNAME_STDIN
343 for client in self._clients:
344 if sname in client.streams:
345 client._write(sname, buf)
346
348 """
349 Tell worker to close its writer file descriptors once flushed. Do not
350 perform writes after this call.
351 """
352 for client in self._clients:
353 client._set_write_eof(sname or self.SNAME_STDIN)
354
356 """Abort processing any action by this worker."""
357 for client in self._clients:
358 client.abort()
359
360
361
362
363
367
369 """
370 Must be called by each client when closing.
371
372 If they are all closed, trigger the required events.
373 """
374 self._close_count += 1
375 assert self._close_count <= len(self._clients)
376 if self._close_count == len(self._clients) and self.eh:
377 if self._has_timeout:
378 self.eh.ev_timeout(self)
379 self.eh.ev_close(self)
380
381 WORKER_CLASS = ExecWorker
382