1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """
21 WorkerPdsh
22
23 ClusterShell worker for executing commands with LLNL pdsh.
24 """
25
26 import errno
27 import os
28 import shlex
29
30 from ClusterShell.NodeSet import NodeSet
31 from ClusterShell.Worker.EngineClient import EngineClientError
32 from ClusterShell.Worker.EngineClient import EngineClientNotSupportedError
33 from ClusterShell.Worker.Worker import WorkerError
34 from ClusterShell.Worker.Exec import ExecWorker, ExecClient, CopyClient
35
36
38 """EngineClient which run 'pdsh'"""
39
40 MODE = 'pdsh'
41
42 - def __init__(self, node, command, worker, stderr, timeout, autoclose=False,
43 rank=None):
47
49 """
50 Build the shell command line to start the commmand.
51 Return an array of command and arguments.
52 """
53 task = self.worker.task
54 pdsh_env = {}
55
56
57 path = task.info("pdsh_path") or "pdsh"
58 cmd_l = [os.path.expanduser(pathc) for pathc in shlex.split(path)]
59 cmd_l.append("-b")
60
61 fanout = task.info("fanout", 0)
62 if fanout > 0:
63 cmd_l.append("-f %d" % fanout)
64
65
66
67
68 connect_timeout = task.info("connect_timeout", 0)
69 if connect_timeout > 0:
70 pdsh_env['PDSH_SSH_ARGS_APPEND'] = "-o ConnectTimeout=%d" % \
71 connect_timeout
72
73 command_timeout = task.info("command_timeout", 0)
74 if command_timeout > 0:
75 cmd_l.append("-u %d" % command_timeout)
76
77 cmd_l.append("-w %s" % self.key)
78 cmd_l.append("%s" % self.command)
79
80 return (cmd_l, pdsh_env)
81
82 - def _close(self, abort, timeout):
83 """Close client. See EngineClient._close()."""
84 if abort:
85
86 prc = self.popen.poll()
87
88 if prc is None:
89 try:
90 self.popen.kill()
91 except OSError:
92 pass
93 prc = self.popen.wait()
94
95 if prc > 0:
96 raise WorkerError("Cannot run pdsh (error %d)" % prc)
97
98 self.streams.clear()
99
100 if timeout:
101 assert abort, "abort flag not set on timeout"
102 for node in (self.key - self._closed_nodes):
103 self.worker._on_node_timeout(node)
104 else:
105 for node in (self.key - self._closed_nodes):
106 self.worker._on_node_rc(node, 0)
107
108 self.worker._check_fini()
109
111 """
112 Parse Pdsh line syntax.
113 """
114 if line.startswith("pdsh@") or \
115 line.startswith("pdcp@") or \
116 line.startswith("sending "):
117 try:
118
119
120
121
122
123
124
125
126
127
128
129
130 words = line.split()
131
132 if self.MODE == 'pdsh':
133 if len(words) == 4 and words[2] == "command" and \
134 words[3] == "timeout":
135 pass
136 elif len(words) == 8 and words[3] == "exited" and \
137 words[7].isdigit():
138 self._closed_nodes.add(words[1][:-1])
139 self.worker._on_node_rc(words[1][:-1], int(words[7]))
140 elif self.MODE == 'pdcp':
141 self._closed_nodes.add(words[1][:-1])
142 self.worker._on_node_rc(words[1][:-1], errno.ENOENT)
143
144 except Exception, exc:
145 raise EngineClientError("Pdsh parser error: %s" % exc)
146 else:
147
148 nodename, msg = line.split(': ', 1)
149 self.worker._on_node_msgline(nodename, msg, sname)
150
152 """Called at close time to flush stream read buffer."""
153 pass
154
156 """Engine is telling us a read is available."""
157 debug = self.worker.task.info("debug", False)
158 if debug:
159 print_debug = self.worker.task.info("print_debug")
160
161 suffix = ""
162 if sname == 'stderr':
163 suffix = "@STDERR"
164
165 for msg in self._readlines(sname):
166 if debug:
167 print_debug(self.worker.task, "PDSH%s: %s" % (suffix, msg))
168 self._parse_line(msg, sname)
169
170
class PdcpClient(CopyClient, PdshClient):
    """EngineClient when pdsh is run to copy file, using pdcp."""

    # NOTE(review): _close()/_parse_line() inherited from PdshClient use
    # self._closed_nodes, but no initialization for copy clients is visible
    # here (CopyClient comes first in the MRO) -- confirm it gets set.

    MODE = 'pdcp'

    def _build_cmd(self):
        """
        Build the pdcp command line (rpdcp when self.reverse is set).

        Return a tuple (cmd_l, None): the list of command and arguments, and
        no extra environment.
        """
        task = self.worker.task

        # *_path may contain arguments: split it and ~-expand each word
        if self.reverse:
            path = task.info("rpdcp_path") or "rpdcp"
        else:
            path = task.info("pdcp_path") or "pdcp"
        cmd_l = [os.path.expanduser(pathc) for pathc in shlex.split(path)]
        cmd_l.append("-b")

        fanout = task.info("fanout", 0)
        if fanout > 0:
            cmd_l.append("-f %d" % fanout)

        # unlike PdshClient, the connect timeout is passed as the -t flag
        connect_timeout = task.info("connect_timeout", 0)
        if connect_timeout > 0:
            cmd_l.append("-t %d" % connect_timeout)

        cmd_l.append("-w %s" % self.key)

        if self.isdir:
            cmd_l.append("-r")  # directory source: recursive copy

        if self.preserve:
            cmd_l.append("-p")

        cmd_l.append(self.source)
        cmd_l.append(self.dest)

        return (cmd_l, None)
208
209
class WorkerPdsh(ExecWorker):
    """
    ClusterShell pdsh-based worker Class.

    Remote Shell (pdsh) usage example:
       >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
       ...                     timeout=30, command="/bin/hostname")
       >>> task.schedule(worker)      # schedule worker for execution
       >>> task.resume()              # run

    Remote Copy (pdcp) usage example:
       >>> worker = WorkerPdsh(nodeset, handler=MyEventHandler(),
       ...                     timeout=30, source="/etc/my.conf",
       ...                     dest="/etc/my.conf")
       >>> task.schedule(worker)      # schedule worker for execution
       >>> task.resume()              # run

    Known limitations:
       - write() is not supported by WorkerPdsh
       - return codes == 0 are not guaranteed when a timeout is used (rc > 0
         are fine)
    """

    # EngineClient classes used for command execution and for file copy
    SHELL_CLASS = PdshClient
    COPY_CLASS = PdcpClient

    def write(self, buf):
        """
        Write data to process. Not supported with Pdsh worker.

        Always raise EngineClientNotSupportedError.
        """
        raise EngineClientNotSupportedError("writing is not supported by pdsh "
                                            "worker")

    def set_write_eof(self):
        """
        Tell worker to close its writer file descriptor once flushed. Do not
        perform writes after this call.

        Not supported by PDSH Worker: always raise
        EngineClientNotSupportedError.
        """
        raise EngineClientNotSupportedError("writing is not supported by pdsh "
                                            "worker")


WORKER_CLASS = WorkerPdsh
261