1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 A select() based ClusterShell Engine.
25
26 The select() system call is available on almost every UNIX-like systems.
27 """
28
29 import errno
30 import select
31 import sys
32 import time
33
34 from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE
35 from ClusterShell.Engine.Engine import EngineTimeoutException
36 from ClusterShell.Worker.EngineClient import EngineClientEOF
37
38
40 """
41 Select Engine
42
43 ClusterShell engine using the select.select mechanism
44 """
45
46 identifier = "select"
47
49 """
50 Initialize Engine.
51 """
52 Engine.__init__(self, info)
53 self._fds_r = []
54 self._fds_w = []
55
57 """
58 Engine-specific fd registering. Called by Engine register.
59 """
60 if event & E_READ:
61 self._fds_r.append(fd)
62 else:
63 assert event & E_WRITE
64 self._fds_w.append(fd)
65
67 """
68 Engine-specific fd unregistering. Called by Engine unregister.
69 """
70 if ev_is_set or True:
71 if fd in self._fds_r:
72 self._fds_r.remove(fd)
73 if fd in self._fds_w:
74 self._fds_w.remove(fd)
75
77 """
78 Engine-specific modifications after a interesting event change
79 for a file descriptor. Called automatically by Engine
80 register/unregister and set_events(). For the select() engine,
81 it appends/remove the fd to/from the concerned fd_sets.
82 """
83 self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event,
84 setvalue))
85 if setvalue:
86 self._register_specific(fd, event)
87 else:
88 self._unregister_specific(fd, True)
89
91 """
92 Select engine run(): start clients and properly get replies
93 """
94 if not timeout:
95 timeout = -1
96
97 start_time = time.time()
98
99
100 while self.evlooprefcnt > 0:
101 self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
102 (self.evlooprefcnt, self.reg_clifds.keys(), len(self.timerq)))
103 try:
104 timeo = self.timerq.nextfire_delay()
105 if timeout > 0 and timeo >= timeout:
106
107 self.timerq.clear()
108 timeo = timeout
109 elif timeo == -1:
110 timeo = timeout
111
112 self._current_loopcnt += 1
113 if timeo >= 0:
114 r_ready, w_ready, x_ready = \
115 select.select(self._fds_r, self._fds_w, [], timeo)
116 else:
117
118 r_ready, w_ready, x_ready = \
119 select.select(self._fds_r, self._fds_w, [])
120 except select.error, (ex_errno, ex_strerror):
121
122 if ex_errno == errno.EINTR:
123 continue
124 elif ex_errno in [errno.EINVAL, errno.EBADF, errno.ENOMEM]:
125 msg = "Increase RLIMIT_NOFILE?"
126 logging.getLogger(__name__).error(msg)
127 raise
128
129
130 for fd in set(r_ready) | set(w_ready):
131
132
133 client, stream = self._fd2client(fd)
134 if client is None:
135 continue
136
137 fdev = stream.evmask
138 sname = stream.name
139
140
141 self._current_stream = stream
142
143
144 if fd in r_ready:
145 self._debug("R_READY fd=%d %s (%s)" % (fd,
146 client.__class__.__name__, client.streams))
147 assert fdev & E_READ
148 assert stream.events & fdev
149 self.modify(client, sname, 0, fdev)
150 try:
151 client._handle_read(sname)
152 except EngineClientEOF:
153 self._debug("EngineClientEOF %s" % client)
154 self.remove_stream(client, stream)
155
156
157 if fd in w_ready:
158 self._debug("W_READY fd=%d %s (%s)" % (fd,
159 client.__class__.__name__, client.streams))
160 assert fdev == E_WRITE
161 assert stream.events & fdev
162 self.modify(client, sname, 0, fdev)
163 client._handle_write(sname)
164
165
166 self._current_stream = None
167
168
169 if client.registered:
170 self.set_events(client, stream)
171
172
173 if timeout > 0 and time.time() >= start_time + timeout:
174 raise EngineTimeoutException()
175
176
177 self.fire_timers()
178
179 self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
180 (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
181