1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 """
22 A poll() based ClusterShell Engine.
23
24 The poll() system call is available on Linux and BSD.
25 """
26
27 import errno
28 import logging
29 import select
30 import time
31
32 from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE
33 from ClusterShell.Engine.Engine import EngineException
34 from ClusterShell.Engine.Engine import EngineNotSupportedError
35 from ClusterShell.Engine.Engine import EngineTimeoutException
36 from ClusterShell.Worker.EngineClient import EngineClientEOF
37
38
40 """
41 Poll Engine
42
43 ClusterShell engine using the select.poll mechanism (Linux poll()
44 syscall).
45 """
46
47 identifier = "poll"
48
59
61 """Engine-specific fd registering. Called by Engine register."""
62 if event & E_READ:
63 eventmask = select.POLLIN
64 else:
65 assert event & E_WRITE
66 eventmask = select.POLLOUT
67
68 self.polling.register(fd, eventmask)
69
73
75 """
76 Engine-specific modifications after a interesting event change for
77 a file descriptor. Called automatically by Engine register/unregister
78 and set_events(). For the poll() engine, it reg/unreg or modifies the
79 event mask associated to a file descriptor.
80 """
81 self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event,
82 setvalue))
83 if setvalue:
84 self._register_specific(fd, event)
85 else:
86 self.polling.unregister(fd)
87
89 """
90 Poll engine run(): start clients and properly get replies
91 """
92 if not timeout:
93 timeout = -1
94
95 start_time = time.time()
96
97
98 while self.evlooprefcnt > 0:
99 self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" \
100 % (self.evlooprefcnt, self.reg_clifds.keys(), \
101 len(self.timerq)))
102 try:
103 timeo = self.timerq.nextfire_delay()
104 if timeout > 0 and timeo >= timeout:
105
106 self.timerq.clear()
107 timeo = timeout
108 elif timeo == -1:
109 timeo = timeout
110
111 self._current_loopcnt += 1
112 evlist = self.polling.poll(timeo * 1000.0 + 1.0)
113
114 except select.error, (ex_errno, ex_strerror):
115
116 if ex_errno == errno.EINTR:
117 continue
118 elif ex_errno == errno.EINVAL:
119 msg = "Increase RLIMIT_NOFILE?"
120 logging.getLogger(__name__).error(msg)
121 raise
122
123 for fd, event in evlist:
124
125 if event & select.POLLNVAL:
126 raise EngineException("Caught POLLNVAL on fd %d" % fd)
127
128
129 client, stream = self._fd2client(fd)
130 if client is None:
131 continue
132
133 fdev = stream.evmask
134 sname = stream.name
135
136
137 self._current_stream = stream
138
139
140 if event & select.POLLERR:
141 self._debug("POLLERR %s" % client)
142 assert fdev & E_WRITE
143 self._debug("POLLERR: remove_stream sname %s fdev 0x%x"
144 % (sname, fdev))
145 self.remove_stream(client, stream)
146 self._current_stream = None
147 continue
148
149
150 if event & select.POLLIN:
151 assert fdev & E_READ
152 assert stream.events & fdev, (stream.events, fdev)
153 self.modify(client, sname, 0, fdev)
154 try:
155 client._handle_read(sname)
156 except EngineClientEOF:
157 self._debug("EngineClientEOF %s %s" % (client, sname))
158 self.remove_stream(client, stream)
159 self._current_stream = None
160 continue
161
162
163
164 elif event & select.POLLHUP:
165 self._debug("POLLHUP fd=%d %s (%s)" %
166 (fd, client.__class__.__name__, client.streams))
167 self.remove_stream(client, stream)
168 self._current_stream = None
169 continue
170
171
172 if event & select.POLLOUT:
173 self._debug("POLLOUT fd=%d %s (%s)" %
174 (fd, client.__class__.__name__, client.streams))
175 assert fdev == E_WRITE
176 assert stream.events & fdev
177 self.modify(client, sname, 0, fdev)
178 client._handle_write(sname)
179
180 self._current_stream = None
181
182
183 if client.registered:
184 self.set_events(client, stream)
185
186
187 if timeout > 0 and time.time() >= start_time + timeout:
188 raise EngineTimeoutException()
189
190
191 self.fire_timers()
192
193 self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
194 (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
195