Package ClusterShell :: Package Worker :: Module fastsubprocess
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Worker.fastsubprocess

  1  # fastsubprocess - POSIX relaxed revision of subprocess.py 
  2  # Based on Python 2.6.4 subprocess.py 
  3  # This is a performance oriented version of subprocess module. 
  4  # Modified by Stephane Thiell 
  5  # Changes: 
  6  #   * removed Windows specific code parts 
  7  #   * removed pipe for transferring possible exec failure from child to 
  8  #     parent, to avoid os.read() blocking call after each fork. 
  9  #   * child returns status code 255 on execv failure, which can be 
 10  #     handled with Popen.wait(). 
 11  #   * removed file objects creation using costly fdopen(): this version 
 12  #     returns non-blocking file descriptors bound to child 
 13  #   * added module method set_nonblock_flag() and used it in Popen(). 
 14  ## 
 15  # Original Disclaimer: 
 16  # 
 17  # For more information about this module, see PEP 324. 
 18  # 
 19  # This module should remain compatible with Python 2.2, see PEP 291. 
 20  # 
 21  # Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> 
 22  # 
 23  # Licensed to PSF under a Contributor Agreement. 
 24  # See http://www.python.org/2.4/license for licensing details. 
 25   
 26  """_subprocess - Subprocesses with accessible I/O non-blocking file 
 27  descriptors 
 28   
 29  Faster revision of subprocess-like module. 
 30  """ 
 31   
 32  import sys 
 33  import os 
 34  import types 
 35  import gc 
 36  import signal 
 37   
 38  # Exception classes used by this module. 
class CalledProcessError(Exception):
    """Error raised by check_call() when the command it ran finished with
    a non-zero exit status.

    Attributes:
        returncode -- the exit status reported for the command
        cmd -- the command, as it was handed to check_call()
    """

    def __init__(self, returncode, cmd):
        self.cmd = cmd
        self.returncode = returncode

    def __str__(self):
        template = "Command '%s' returned non-zero exit status %d"
        return template % (self.cmd, self.returncode)
import select
import errno
import fcntl

# Names exported by "from <module> import *".
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call",
           "CalledProcessError"]

# Maximum number of files a process may have open, as reported by the
# system; a fixed fallback is used when the value cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is unavailable, the name is not known to the platform,
    # or the lookup itself failed.  Only these errors are swallowed so
    # that KeyboardInterrupt/SystemExit still propagate.
    MAXFD = 256

# Popen instances that were finalized while their child was still running
# (see Popen.__del__); _cleanup() polls them until the child is reaped.
_active = []
def _cleanup():
    """Drop from _active every Popen instance whose child has finished.

    Each instance is polled without blocking.  An instance is removed as
    soon as its return code is known, whatever its sign: a child killed by
    a signal gets a negative return code and must be forgotten too,
    otherwise it would stay in _active forever.  When the child can no
    longer be waited on, _internal_poll() reports sys.maxint, which also
    leads to removal.
    """
    # Iterate over a copy because entries are removed while looping.
    for inst in _active[:]:
        res = inst._internal_poll(_deadstate=sys.maxint)
        if res is None:
            # Child is still running; look at it again on a later call.
            continue
        try:
            _active.remove(inst)
        except ValueError:
            # This can happen if two threads create a new Popen instance.
            # It's harmless that it was already removed, so ignore.
            pass
# Special value for the stdin, stdout or stderr argument of Popen: a new
# pipe is created between the parent and the child for that stream.
PIPE = -1
# Special value for the stderr argument of Popen: the child's stderr goes
# to the same descriptor as its stdout.
STDOUT = -2
def call(*popenargs, **kwargs):
    """Run a command, wait until it has completed and return its
    returncode attribute.

    Every positional and keyword argument is handed over unchanged to the
    Popen constructor.  Example:

    retcode = call(["ls", "-l"])
    """
    process = Popen(*popenargs, **kwargs)
    return process.wait()
87 88
def check_call(*popenargs, **kwargs):
    """Run a command and wait until it has completed.

    Returns the (zero) exit code on success.  On a non-zero exit code a
    CalledProcessError is raised; its returncode attribute holds the exit
    code and its cmd attribute the command.

    Every positional and keyword argument is handed over unchanged to the
    Popen constructor.  Example:

    check_call(["ls", "-l"])
    """
    retcode = call(*popenargs, **kwargs)
    if not retcode:
        return retcode
    # The command is either the "args" keyword or the first positional
    # argument, mirroring the Popen constructor.
    cmd = kwargs.get("args")
    if cmd is None:
        cmd = popenargs[0]
    raise CalledProcessError(retcode, cmd)
106 107
def set_nonblock_flag(fd):
    """Turn on the non-blocking flag of file descriptor fd, leaving its
    other status flags untouched."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    flags |= os.O_NDELAY
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)
112 113
class Popen(object):
    """A faster Popen

    Child processes are started with fork() and exec().  Unlike the
    standard subprocess module, the stdin, stdout and stderr attributes
    are raw, non-blocking file descriptors (integers) for the streams that
    were requested as PIPE, and None otherwise.  An exec failure in the
    child is reported through exit status 255, visible through wait().
    """

    def __init__(self, args, bufsize=0, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, shell=False,
                 cwd=None, env=None, universal_newlines=False):
        """Create new Popen instance.

        args -- program and arguments: a string or a sequence of strings
        bufsize -- must be an integer; only its type is checked here
        executable -- program to execute; defaults to args[0]
        stdin, stdout, stderr -- None (no redirection), PIPE, an integer
            file descriptor or an object with a fileno() method; stderr
            may also be STDOUT
        preexec_fn -- callable run in the child just before exec
        shell -- when true, run the command through "/bin/sh -c"
        cwd -- directory the child changes to before exec
        env -- environment of the child; None keeps the current one
        universal_newlines -- stored on the instance; communicate() uses
            it to decide whether to translate newlines

        Raises TypeError when bufsize is not an integer.
        """
        _cleanup()

        self._child_created = False
        if not isinstance(bufsize, (int, long)):
            raise TypeError("bufsize must be an integer")

        self.pid = None
        self.returncode = None
        self.universal_newlines = universal_newlines

        # Input and output objects. The general principle is like
        # this:
        #
        # Parent                   Child
        # ------                   -----
        # p2cwrite   ---stdin--->  p2cread
        # c2pread    <--stdout---  c2pwrite
        # errread    <--stderr---  errwrite
        #
        # All of them are file descriptors.  The parent objects are None
        # when not using PIPEs.  The child objects are None when not
        # redirecting.

        (p2cread, p2cwrite,
         c2pread, c2pwrite,
         errread, errwrite) = self._get_handles(stdin, stdout, stderr)

        self._execute_child(args, executable, preexec_fn,
                            cwd, env, universal_newlines, shell,
                            p2cread, p2cwrite,
                            c2pread, c2pwrite,
                            errread, errwrite)

        # Expose the parent's pipe ends as non-blocking descriptors.  The
        # attributes are always defined and stay None for streams that
        # are not pipes, so communicate() can inspect them safely.
        if p2cwrite is not None:
            set_nonblock_flag(p2cwrite)
        self.stdin = p2cwrite
        if c2pread is not None:
            set_nonblock_flag(c2pread)
        self.stdout = c2pread
        if errread is not None:
            set_nonblock_flag(errread)
        self.stderr = errread

    def _translate_newlines(self, data):
        """Return data with "\\r\\n" and lone "\\r" replaced by "\\n"."""
        data = data.replace("\r\n", "\n")
        data = data.replace("\r", "\n")
        return data

    def __del__(self, sys=sys):
        """Keep the instance reachable through _active when its child has
        not finished yet, so that _cleanup() can reap the child later."""
        # getattr: __init__ may have failed before the attribute was set.
        if not getattr(self, '_child_created', False):
            # We didn't get to successfully create a child process.
            return
        # In case the child hasn't been waited on, check if it's done.
        self._internal_poll(_deadstate=sys.maxint)
        if self.returncode is None and _active is not None:
            # Child is still running, keep us alive until we can wait on it.
            _active.append(self)

    def communicate(self, input=None):
        """Interact with process: Send data to stdin.  Read data from
        stdout and stderr, until end-of-file is reached.  Wait for
        process to terminate.  The optional input argument should be a
        string to be sent to the child process, or None, if no data
        should be sent to the child.

        The pipe descriptors held in the stdin, stdout and stderr
        attributes are closed once they have been fully used.

        communicate() returns a tuple (stdout, stderr); an item is None
        when the corresponding stream is not a pipe."""
        # stdin/stdout/stderr are raw non-blocking descriptors, so a plain
        # blocking read()/write() shortcut cannot be used: every case,
        # including "one pipe" and "no pipe at all", goes through the
        # select() loop.
        return self._communicate(input)

    def poll(self):
        """Check if child process has terminated, without blocking.
        Returns the returncode attribute (None while still running)."""
        return self._internal_poll()

    def _get_handles(self, stdin, stdout, stderr):
        """Construct and return tuple with IO objects:
        p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
        """
        p2cread, p2cwrite = None, None
        c2pread, c2pwrite = None, None
        errread, errwrite = None, None

        if stdin is None:
            pass
        elif stdin == PIPE:
            p2cread, p2cwrite = os.pipe()
        elif isinstance(stdin, int):
            p2cread = stdin
        else:
            # Assuming file-like object
            p2cread = stdin.fileno()

        if stdout is None:
            pass
        elif stdout == PIPE:
            c2pread, c2pwrite = os.pipe()
        elif isinstance(stdout, int):
            c2pwrite = stdout
        else:
            # Assuming file-like object
            c2pwrite = stdout.fileno()

        if stderr is None:
            pass
        elif stderr == PIPE:
            errread, errwrite = os.pipe()
        elif stderr == STDOUT:
            # Share whatever descriptor the child uses for stdout.
            errwrite = c2pwrite
        elif isinstance(stderr, int):
            errwrite = stderr
        else:
            # Assuming file-like object
            errwrite = stderr.fileno()

        return (p2cread, p2cwrite,
                c2pread, c2pwrite,
                errread, errwrite)

    def _execute_child(self, args, executable, preexec_fn,
                       cwd, env, universal_newlines, shell,
                       p2cread, p2cwrite,
                       c2pread, c2pwrite,
                       errread, errwrite):
        """Execute program (POSIX version)

        Forks, wires the given descriptors to the child's standard
        streams and execs the program.  In the parent, self.pid is set
        and the child's ends of the pipes are closed.  Any failure in the
        child before or during exec makes it exit with status 255.
        """

        if isinstance(args, types.StringTypes):
            args = [args]
        else:
            args = list(args)

        if shell:
            args = ["/bin/sh", "-c"] + args

        if executable is None:
            executable = args[0]

        gc_was_enabled = gc.isenabled()
        # Disable gc to avoid bug where gc -> file_dealloc ->
        # write to stderr -> hang.  http://bugs.python.org/issue1336
        gc.disable()
        try:
            self.pid = os.fork()
        except:
            # Restore the collector state, then let the error propagate.
            if gc_was_enabled:
                gc.enable()
            raise
        self._child_created = True
        if self.pid == 0:
            # Child
            try:
                # Close parent's pipe ends
                if p2cwrite is not None:
                    os.close(p2cwrite)
                if c2pread is not None:
                    os.close(c2pread)
                if errread is not None:
                    os.close(errread)

                # Dup fds for child
                if p2cread is not None:
                    os.dup2(p2cread, 0)
                if c2pwrite is not None:
                    os.dup2(c2pwrite, 1)
                if errwrite is not None:
                    os.dup2(errwrite, 2)

                # Close pipe fds.  Make sure we don't close the same
                # fd more than once, or standard fds.
                if p2cread is not None and p2cread not in (0,):
                    os.close(p2cread)
                if c2pwrite is not None and c2pwrite not in (p2cread, 1):
                    os.close(c2pwrite)
                if errwrite is not None and errwrite not in \
                        (p2cread, c2pwrite, 2):
                    os.close(errwrite)

                if cwd is not None:
                    os.chdir(cwd)

                if preexec_fn:
                    preexec_fn()

                if env is None:
                    os.execvp(executable, args)
                else:
                    os.execvpe(executable, args, env)
            except:
                # Child execution failure.  Deliberately catch everything:
                # the child must never fall back into the parent's code.
                os._exit(255)

        # Parent
        if gc_was_enabled:
            gc.enable()

        # Close the child's end of each pipe; descriptors supplied by the
        # caller (no matching parent end) are left open.
        if p2cread is not None and p2cwrite is not None:
            os.close(p2cread)
        if c2pwrite is not None and c2pread is not None:
            os.close(c2pwrite)
        if errwrite is not None and errread is not None:
            os.close(errwrite)

    def _handle_exitstatus(self, sts):
        """Set returncode from a waitpid() status: the exit status for a
        normal exit, minus the signal number for a child killed by a
        signal.  Raises RuntimeError for any other status."""
        if os.WIFSIGNALED(sts):
            self.returncode = -os.WTERMSIG(sts)
        elif os.WIFEXITED(sts):
            self.returncode = os.WEXITSTATUS(sts)
        else:
            # Should never happen
            raise RuntimeError("Unknown child exit status!")

    def _internal_poll(self, _deadstate=None):
        """Check if child process has terminated.  Returns returncode
        attribute.

        When waitpid() fails and _deadstate is not None, returncode is
        set to _deadstate."""
        if self.returncode is None:
            try:
                pid, sts = os.waitpid(self.pid, os.WNOHANG)
                if pid == self.pid:
                    self._handle_exitstatus(sts)
            except os.error:
                if _deadstate is not None:
                    self.returncode = _deadstate
        return self.returncode

    def wait(self):
        """Wait for child process to terminate.  Returns returncode
        attribute."""
        if self.returncode is None:
            while True:
                try:
                    pid, sts = os.waitpid(self.pid, 0)
                except OSError:
                    # A signal handler interrupted the call: wait again.
                    if sys.exc_info()[1].errno == errno.EINTR:
                        continue
                    raise
                break
            self._handle_exitstatus(sts)
        return self.returncode

    def _communicate(self, input):
        """Exchange data with the child over the pipe descriptors using
        select(), then wait for it.  Returns the tuple (stdout, stderr)
        described in communicate()."""
        read_set = []
        write_set = []
        stdout = None  # Return
        stderr = None  # Return

        # The stream attributes are integer descriptors: compare with
        # None (descriptor 0 is falsy) and use the os-level calls.
        if self.stdin is not None:
            if input:
                write_set.append(self.stdin)
            else:
                # Nothing to send: let the child see end-of-file.
                os.close(self.stdin)
        if self.stdout is not None:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr is not None:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error:
                if sys.exc_info()[1].args[0] == errno.EINTR:
                    continue
                raise

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.  POSIX defines PIPE_BUF >= 512
                chunk = input[input_offset:input_offset + 512]
                bytes_written = os.write(self.stdin, chunk)
                input_offset += bytes_written
                if input_offset >= len(input):
                    os.close(self.stdin)
                    write_set.remove(self.stdin)

            if self.stdout in rlist:
                data = os.read(self.stdout, 1024)
                if data:
                    stdout.append(data)
                else:
                    # End-of-file
                    os.close(self.stdout)
                    read_set.remove(self.stdout)

            if self.stderr in rlist:
                data = os.read(self.stderr, 1024)
                if data:
                    stderr.append(data)
                else:
                    # End-of-file
                    os.close(self.stderr)
                    read_set.remove(self.stderr)

        # All data exchanged.  Translate lists into strings.
        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        # Translate newlines, if requested.
        if self.universal_newlines and hasattr(file, 'newlines'):
            if stdout:
                stdout = self._translate_newlines(stdout)
            if stderr:
                stderr = self._translate_newlines(stderr)

        self.wait()
        return (stdout, stderr)

    def send_signal(self, sig):
        """Send a signal to the process
        """
        os.kill(self.pid, sig)

    def terminate(self):
        """Terminate the process with SIGTERM
        """
        self.send_signal(signal.SIGTERM)

    def kill(self):
        """Kill the process with SIGKILL
        """
        self.send_signal(signal.SIGKILL)
468