Package ClusterShell :: Package Engine :: Module EPoll
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.EPoll

  1  # 
  2  # Copyright (C) 2009-2015 CEA/DAM 
  3  # 
  4  # This file is part of ClusterShell. 
  5  # 
  6  # ClusterShell is free software; you can redistribute it and/or 
  7  # modify it under the terms of the GNU Lesser General Public 
  8  # License as published by the Free Software Foundation; either 
  9  # version 2.1 of the License, or (at your option) any later version. 
 10  # 
 11  # ClusterShell is distributed in the hope that it will be useful, 
 12  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 13  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 14  # Lesser General Public License for more details. 
 15  # 
 16  # You should have received a copy of the GNU Lesser General Public 
 17  # License along with ClusterShell; if not, write to the Free Software 
 18  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 19   
 20  """ 
 21  A ClusterShell Engine using epoll, an I/O event notification facility. 
 22   
 23  The epoll event distribution interface is available on Linux 2.6, and 
 24  has been included in Python 2.6. 
 25  """ 
 26   
 27  import errno 
 28  import select 
 29  import time 
 30   
 31  from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE 
 32  from ClusterShell.Engine.Engine import EngineNotSupportedError 
 33  from ClusterShell.Engine.Engine import EngineTimeoutException 
 34  from ClusterShell.Worker.EngineClient import EngineClientEOF 
 35   
 36   
class EngineEPoll(Engine):
    """
    EPoll Engine

    ClusterShell Engine class using the select.epoll mechanism.
    """

    identifier = "epoll"

    def __init__(self, info):
        """
        Initialize Engine.

        Raise EngineNotSupportedError if select.epoll is not available
        (i.e. creating the epoll object raises AttributeError).
        """
        Engine.__init__(self, info)
        try:
            # get an epoll object
            self.epolling = select.epoll()
        except AttributeError:
            raise EngineNotSupportedError(EngineEPoll.identifier)

    def release(self):
        """Release engine-specific resources (close the epoll object)."""
        self.epolling.close()

    def _register_specific(self, fd, event):
        """
        Engine-specific fd registering. Called by Engine register.

        E_READ is mapped to EPOLLIN; otherwise the event must be E_WRITE
        and is mapped to EPOLLOUT.
        """
        if event & E_READ:
            eventmask = select.EPOLLIN
        else:
            assert event & E_WRITE
            eventmask = select.EPOLLOUT

        self.epolling.register(fd, eventmask)

    def _unregister_specific(self, fd, ev_is_set):
        """
        Engine-specific fd unregistering. Called by Engine unregister.

        The fd is only removed from the epoll object when ev_is_set is
        true.
        """
        self._debug("UNREGSPEC fd=%d ev_is_set=%x"% (fd, ev_is_set))
        if ev_is_set:
            self.epolling.unregister(fd)

    def _modify_specific(self, fd, event, setvalue):
        """
        Engine-specific modifications after an interesting event change for
        a file descriptor. Called automatically by Engine set_events().
        For the epoll engine, it modifies the event mask associated to a file
        descriptor: the fd is (re-)registered when setvalue is true and
        unregistered otherwise.
        """
        self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event,
                                                            setvalue))
        if setvalue:
            self._register_specific(fd, event)
        else:
            self.epolling.unregister(fd)

    def runloop(self, timeout):
        """
        Run epoll main loop.

        Loop while self.evlooprefcnt is positive, polling registered file
        descriptors and dispatching read/write/hangup/error events to the
        matching clients, then firing expired timers.

        timeout is the task timeout in seconds; a false value (0 or None)
        disables it.

        Raise EngineTimeoutException when the task timeout has elapsed.
        Any polling error other than EINTR is propagated to the caller.
        """
        if not timeout:
            timeout = -1

        start_time = time.time()

        # run main event loop...
        while self.evlooprefcnt > 0:
            self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
                (self.evlooprefcnt, self.reg_clifds.keys(),
                 len(self.timerq)))
            try:
                timeo = self.timerq.nextfire_delay()
                if timeout > 0 and timeo >= timeout:
                    # task timeout may invalidate clients timeout
                    self.timerq.clear()
                    timeo = timeout
                elif timeo == -1:
                    timeo = timeout

                self._current_loopcnt += 1
                # NOTE(review): the extra millisecond presumably avoids
                # waking up just before the next timer is due -- confirm.
                evlist = self.epolling.poll(timeo + 0.001)

            except IOError as ex:
                # might get interrupted by a signal
                if ex.errno == errno.EINTR:
                    continue
                # Any other error is a real failure: do not fall through
                # with an undefined or stale event list.
                raise

            for fd, event in evlist:

                # get client instance
                client, stream = self._fd2client(fd)
                if client is None:
                    continue

                fdev = stream.evmask
                sname = stream.name

                # set as current processed stream
                self._current_stream = stream

                # check for poll error condition of some sort
                if event & select.EPOLLERR:
                    self._debug("EPOLLERR fd=%d sname=%s fdev=0x%x (%s)" % \
                                (fd, sname, fdev, client))
                    assert fdev & E_WRITE
                    self.remove_stream(client, stream)
                    self._current_stream = None
                    continue

                # check for data to read
                if event & select.EPOLLIN:
                    assert fdev & E_READ
                    assert stream.events & fdev, (stream.events, fdev)
                    self.modify(client, sname, 0, fdev)
                    try:
                        client._handle_read(sname)
                    except EngineClientEOF:
                        self._debug("EngineClientEOF %s %s" % (client, sname))
                        self.remove_stream(client, stream)
                        self._current_stream = None
                        continue

                # or check for end of stream (do not handle both at the same
                # time because handle_read() may perform a partial read)
                elif event & select.EPOLLHUP:
                    assert fdev & E_READ, "fdev 0x%x & E_READ" % fdev
                    self._debug("EPOLLHUP fd=%d sname=%s %s (%s)" % \
                                (fd, sname, client, client.streams))
                    self.remove_stream(client, stream)
                    self._current_stream = None
                    continue

                # check for writing
                if event & select.EPOLLOUT:
                    self._debug("EPOLLOUT fd=%d sname=%s %s (%s)" % \
                                (fd, sname, client, client.streams))
                    assert fdev & E_WRITE
                    assert stream.events & fdev, (stream.events, fdev)
                    self.modify(client, sname, 0, fdev)
                    client._handle_write(sname)

                self._current_stream = None

                # apply any changes occurred during processing
                if client.registered:
                    self.set_events(client, stream)

            # check for task runloop timeout
            if timeout > 0 and time.time() >= start_time + timeout:
                raise EngineTimeoutException()

            # process clients timeout
            self.fire_timers()

        self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \
            (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
193