1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
"""
A ClusterShell Engine using epoll, an I/O event notification facility.

The epoll event distribution interface is available on Linux 2.6 and later,
and is exposed by Python's select module since Python 2.6.
"""
26
27 import errno
28 import select
29 import time
30
31 from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE
32 from ClusterShell.Engine.Engine import EngineNotSupportedError
33 from ClusterShell.Engine.Engine import EngineTimeoutException
34 from ClusterShell.Worker.EngineClient import EngineClientEOF
35
36
38 """
39 EPoll Engine
40
41 ClusterShell Engine class using the select.epoll mechanism.
42 """
43
# Identifier string of this engine.
# NOTE(review): presumably the name under which this engine is selected;
# the consumer of this attribute is not visible here -- confirm.
identifier = "epoll"
45
56
def release(self):
    """
    Release engine-specific resources by closing the epoll object.

    NOTE(review): the ``def`` line and the method indentation are missing
    from the extract under review; the header is reconstructed -- confirm
    the name and re-indent under the engine class.
    """
    poller = self.epolling
    poller.close()
60
def _register_specific(self, fd, event):
    """
    Engine-specific fd registering. Called by Engine register.

    Registers `fd` on the epoll object for input readiness when `event`
    has E_READ set, otherwise for output readiness (E_WRITE is then
    asserted to be set).

    NOTE(review): the ``def`` line and the method indentation are missing
    from the extract under review; the header is reconstructed from the
    ``self._register_specific(fd, event)`` call made elsewhere in this
    file -- confirm and re-indent under the engine class.
    """
    wants_read = event & E_READ
    if not wants_read:
        # Only read or write interest is expected here.
        assert event & E_WRITE
    mask = select.EPOLLIN if wants_read else select.EPOLLOUT
    self.epolling.register(fd, mask)
70
def _unregister_specific(self, fd, ev_is_set):
    """
    Engine-specific fd unregistering. Called by Engine unregister.

    The fd is removed from the epoll object only when `ev_is_set` is true.

    NOTE(review): the ``def`` line and the method indentation are missing
    from the extract under review; name and parameter order are
    reconstructed from the body -- confirm against the base Engine class
    and re-indent under the engine class.
    """
    trace = "UNREGSPEC fd=%d ev_is_set=%x" % (fd, ev_is_set)
    self._debug(trace)
    if not ev_is_set:
        return
    self.epolling.unregister(fd)
78
def _modify_specific(self, fd, event, setvalue):
    """
    Engine-specific modifications after an interesting event change for
    a file descriptor. Called automatically by Engine set_events().

    For the epoll engine: a true `setvalue` registers `fd` for `event`,
    a false one removes `fd` from the epoll object.

    NOTE(review): the ``def`` line and the method indentation are missing
    from the extract under review; name and parameter order are
    reconstructed from the body -- confirm against the base Engine class
    and re-indent under the engine class.
    """
    trace = "MODSPEC fd=%d event=%x setvalue=%d" % (fd, event, setvalue)
    self._debug(trace)
    if not setvalue:
        self.epolling.unregister(fd)
        return
    self._register_specific(fd, event)
92
def runloop(self, timeout):
    """
    Run epoll main loop.

    Loops while ``self.evlooprefcnt`` is positive. Each iteration waits on
    the epoll object, handles every returned (fd, event) pair through the
    client owning that fd, then fires the timers.

    NOTE(review): the ``def`` line and the method indentation are missing
    from the extract under review; the header is reconstructed from the
    names used in the body -- confirm against the base Engine class and
    re-indent under the engine class.

    :param timeout: overall run limit in seconds; a false value (None or
        0) disables the limit.
    :raises EngineTimeoutException: if `timeout` is positive and at least
        that much time has elapsed since the loop was entered.
    :raises IOError: if waiting for events fails with anything other than
        EINTR.
    """
    if not timeout:
        # -1 is used below as the "no overall limit" marker.
        timeout = -1

    start_time = time.time()

    while self.evlooprefcnt > 0:
        self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
                    (self.evlooprefcnt, self.reg_clifds.keys(),
                     len(self.timerq)))
        try:
            timeo = self.timerq.nextfire_delay()
            if timeout > 0 and timeo >= timeout:
                # The overall limit is reached no later than the next
                # timer: drop the queued timers and wait for the limit.
                self.timerq.clear()
                timeo = timeout
            elif timeo == -1:
                # presumably -1 means "no timer pending" -- confirm
                timeo = timeout

            self._current_loopcnt += 1
            # NOTE(review): the extra 0.001s looks like slack so a timer
            # is due once poll() returns -- confirm.
            evlist = self.epolling.poll(timeo + 0.001)

        except IOError as ex:
            # An interrupted wait (EINTR) is simply retried.
            if ex.errno == errno.EINTR:
                continue
            # Any other failure leaves no valid event list for this
            # iteration: propagate it instead of processing an unbound or
            # stale `evlist`.
            raise

        for fd, event in evlist:

            # Look up the client and stream owning this fd; skip fds that
            # no longer map to a client.
            client, stream = self._fd2client(fd)
            if client is None:
                continue

            fdev = stream.evmask
            sname = stream.name

            # Mark the stream as the one currently being processed.
            self._current_stream = stream

            # Error condition reported on the fd: drop the stream.
            if event & select.EPOLLERR:
                self._debug("EPOLLERR fd=%d sname=%s fdev=0x%x (%s)" %
                            (fd, sname, fdev, client))
                assert fdev & E_WRITE
                self.remove_stream(client, stream)
                self._current_stream = None
                continue

            # Data available for reading.
            if event & select.EPOLLIN:
                assert fdev & E_READ
                assert stream.events & fdev, (stream.events, fdev)
                self.modify(client, sname, 0, fdev)
                try:
                    client._handle_read(sname)
                except EngineClientEOF:
                    self._debug("EngineClientEOF %s %s" % (client, sname))
                    self.remove_stream(client, stream)
                    self._current_stream = None
                    continue

            # Hang-up is only handled when no input was reported in the
            # same event (mutually exclusive with the branch above).
            elif event & select.EPOLLHUP:
                assert fdev & E_READ, "fdev 0x%x & E_READ" % fdev
                self._debug("EPOLLHUP fd=%d sname=%s %s (%s)" %
                            (fd, sname, client, client.streams))
                self.remove_stream(client, stream)
                self._current_stream = None
                continue

            # Ready for writing.
            if event & select.EPOLLOUT:
                self._debug("EPOLLOUT fd=%d sname=%s %s (%s)" %
                            (fd, sname, client, client.streams))
                assert fdev & E_WRITE
                assert stream.events & fdev, (stream.events, fdev)
                self.modify(client, sname, 0, fdev)
                client._handle_write(sname)

            self._current_stream = None

            # Re-apply the stream's event settings if the client is still
            # registered after the handlers ran.
            if client.registered:
                self.set_events(client, stream)

        # Enforce the overall run limit.
        if timeout > 0 and time.time() >= start_time + timeout:
            raise EngineTimeoutException()

        self.fire_timers()

    self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" %
                (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
193