Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

# Copyright (c) 2014, Facebook, Inc.  All rights reserved. 

# 

# This source code is licensed under the BSD-style license found in the 

# LICENSE file in the root directory of this source tree. An additional grant 

# of patent rights can be found in the PATENTS file in the same directory. 

# 

"""Module that provides an API for executing and managing child processes.""" 

from __future__ import absolute_import 

 

from sparts.counters import counter 

from sparts.sparts import option 

 

from twisted.internet.protocol import ProcessProtocol 

from twisted.protocols.basic import LineReceiver 

from sparts.tasks.twisted import TwistedTask 

 

import functools 

import signal 

import six 

import time 

import twisted.python.threadable 

import twisted.internet.threads 

 

 

class CommandTask(TwistedTask):
    """A task that provides a useful API for executing other commands.

    Python's Popen() can be hard to use, especially if you are executing long
    running child processes, and need to handle various stdout, stderr, and
    process exit events asynchronously.

    This particular implementation relies on Twisted's ProcessProtocol, but it
    wraps it in a way that makes it mostly opaque.

    Every process started through `run()` is tracked in `self.outstanding`,
    a dict mapping the process transport returned by the reactor to that
    command's kill timeout (or None, meaning "use the task default").  The
    entry is removed once the process exit has been handled.
    """
    LOOPLESS = True
    OPT_PREFIX = 'cmd'

    kill_timeout = option(type=float, default=10.0,
                          help="Default shutdown kill timeout for outstanding "
                               "commands [%(default)s]")
    started = counter()
    finished = counter()

    def run(self, command, on_stdout=None, on_stderr=None, on_exit=None,
            line_buffered=True, kill_timeout=None, env=None):
        """Call this function to start a new child process running `command`.

        Additional callbacks, such as `on_stdout`, `on_stderr`, and `on_exit`,
        can be provided, that will receive a variety of parameters on the
        appropriate events.

        Line buffering can be disabled by passing `line_buffered`=False.

        Also, a custom `kill_timeout` (seconds) may be set that overrides the
        task default, in the event that a shutdown is received and you want
        to allow more time for the command to shut down.

        Arguments:
            command: Either an argument list, or a string.  A string is split
                on single spaces only; no shell-style quoting is understood,
                so pass a list when an argument contains whitespace.
            on_stdout: Optional callable invoked as
                `on_stdout(transport, data)` for child stdout.
            on_stderr: Optional callable invoked as
                `on_stderr(transport, data)` for child stderr.
            on_exit: Optional callable invoked as `on_exit(reason)` when the
                process has ended.
            line_buffered: When true, `data` is delivered one line at a time;
                otherwise it is delivered as received.
            kill_timeout: Seconds to wait after `stop()` before sending
                SIGKILL, or None to use the task-level option.
            env: Environment passed through to `reactor.spawnProcess()`;
                see Twisted's documentation for how None is interpreted.

        Returns:
            The process transport returned by `reactor.spawnProcess()`.
        """
        self.logger.debug("task starting %s...", command)
        if isinstance(command, six.string_types):
            command = command.split(" ")

        # wrap on_exit with helper to remove registered commands
        on_exit = functools.partial(self._procExited, on_exit)

        proto = _ProcessProtocolAdapter(on_stdout, on_stderr, on_exit,
                                        line_buffered=line_buffered)

        # Spawning and registering happen together, on the reactor thread,
        # via `_spawnTracked`.  This keeps `env` handling identical for both
        # call paths and guarantees the transport is registered before the
        # reactor can deliver the process exit for it.
        if twisted.python.threadable.isInIOThread():
            result = self._spawnTracked(proto, command, env, kill_timeout)
        else:
            result = twisted.internet.threads.blockingCallFromThread(
                self.reactor, self._spawnTracked, proto, command, env,
                kill_timeout)

        self.started.increment()
        return result

    def _spawnTracked(self, proto, command, env, kill_timeout):
        """Spawn `command` and record its transport in `self.outstanding`.

        Must be called on the reactor thread.  Returns the process transport.
        """
        trans = self.reactor.spawnProcess(proto, executable=command[0],
                                          args=command, env=env)
        self.outstanding[trans] = kill_timeout
        return trans

    def initTask(self):
        """Initialize the table of outstanding (still running) commands."""
        super(CommandTask, self).initTask()
        self.outstanding = {}

    def _procExited(self, on_exit, proto, trans, reason):
        """Handle a process exit: notify the caller, then stop tracking it.

        `on_exit` is the user supplied callback (or None); `proto`, `trans`
        and `reason` are supplied by `_ProcessProtocolAdapter.processEnded`.
        """
        self.logger.debug("%s closed for %s", trans, reason)
        try:
            if on_exit is not None:
                on_exit(reason)
        finally:
            # Always unregister, even if the user callback raised; otherwise
            # join() and isDoneWithReactor() would wait on this entry forever.
            self.outstanding.pop(trans, None)
            self.finished.increment()

        return None

    def join(self):
        """Overridden to block for process workers to shutdown / be killed."""
        # TODO: Conditions instead of sleep polling?
        while self.outstanding:
            time.sleep(0.250)

    def _killOutstanding(self, trans):
        """Send SIGKILL to `trans` if its process is still being tracked."""
        if trans in self.outstanding:
            self.logger.info("Sending SIGKILL to %s", trans)
            trans.signalProcess(signal.SIGKILL)

    def stop(self):
        """Schedule a SIGKILL for every command that is still outstanding."""
        # twisted is pretty smart; the default signal handlers it installs
        # propagate SIGTERM to its children, so while we don't need to manually
        # TERM, we might still need to set some kill timeouts
        super(CommandTask, self).stop()

        in_io_thread = twisted.python.threadable.isInIOThread()

        # Iterate over a snapshot: the reactor thread may remove entries from
        # `self.outstanding` (see `_procExited`) while this loop is running.
        for trans, kill_timeout in list(self.outstanding.items()):
            if kill_timeout is None:
                kill_timeout = self.kill_timeout

            self.logger.info("Enqueuing kill for %s in %.1fs",
                             trans, kill_timeout)
            args = (kill_timeout, self._killOutstanding, trans)
            if in_io_thread:
                self.reactor.callLater(*args)
            else:
                self.reactor.callFromThread(self.reactor.callLater, *args)

    def isDoneWithReactor(self):
        """Overridden to keep reactor running until all commands finish."""
        return not self.outstanding

 

class _ProcessProtocolAdapter(ProcessProtocol):
    """ProcessProtocol that allows custom callbacks, buffering.

    `on_stdout` and `on_stderr` are invoked as `callback(transport, data)`.
    With `line_buffered` set, `data` is a single line with the trailing
    newline removed; otherwise it is whatever chunk the transport delivered.
    `on_exit` is invoked as `on_exit(protocol, transport, reason)` once the
    process has ended.  Any callback left as None is replaced by a no-op.

    NOTE(review): in line buffered mode nothing flushes the line buffers when
    the process ends, so trailing output that is not newline terminated
    appears never to be delivered -- confirm whether callers rely on this.
    """
    def __init__(self, on_stdout=None, on_stderr=None, on_exit=None,
                 line_buffered=True):

        self.line_buffered = line_buffered
        self.on_stdout = on_stdout or (lambda *args: None)
        self.on_stderr = on_stderr or (lambda *args: None)
        self.on_exit = on_exit or (lambda *args: None)

        # One independent line splitter per stream, so partial lines on
        # stdout and stderr are never mixed together.
        if self.line_buffered:
            self.stdout_buffer = self._makeBuffer(self.on_stdout)
            self.stderr_buffer = self._makeBuffer(self.on_stderr)

    def _makeBuffer(self, callback):
        """Return a LineReceiver that feeds complete lines to `callback`."""
        buf = LineReceiver()
        # Process output arrives as bytes, so the delimiter must be bytes as
        # well.  On Python 2 this is the same object type as "\n"; on
        # Python 3 a text delimiter cannot be used to split a bytes buffer.
        buf.delimiter = b"\n"
        # `self.transport` is looked up at call time, because it is only
        # attached to this protocol once the process has been spawned.
        buf.lineReceived = lambda line: callback(self.transport, line)
        return buf

    def connectionMade(self):
        """Remember the child's pid as reported when the process starts."""
        ProcessProtocol.connectionMade(self)
        self.original_pid = self.transport.pid

    def outReceived(self, data):
        """Route child stdout to `on_stdout`, via the line buffer if enabled."""
        if self.line_buffered:
            self.stdout_buffer.dataReceived(data)
        else:
            self.on_stdout(self.transport, data)

    def errReceived(self, data):
        """Route child stderr to `on_stderr`, via the line buffer if enabled."""
        if self.line_buffered:
            self.stderr_buffer.dataReceived(data)
        else:
            self.on_stderr(self.transport, data)

    def processEnded(self, reason):
        """Report the process exit to `on_exit`, then drop the transport."""
        # Ugh, this is a hack... I need to pass self because Twisted unsets pid
        # after the process has exited, but I need the *old* pid to do proper
        # accounting.  *YES* I get why this is fundamentally broken
        try:
            self.on_exit(self, self.transport, reason)
        finally:
            # Release the transport even when the exit callback raises.
            self.transport.loseConnection()