Package ClusterShell :: Package Engine :: Module Poll
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Poll

  1  # 
  2  # Copyright (C) 2007-2016 CEA/DAM 
  3  # Copyright (C) 2016 Stephane Thiell <sthiell@stanford.edu> 
  4  # 
  5  # This file is part of ClusterShell. 
  6  # 
  7  # ClusterShell is free software; you can redistribute it and/or 
  8  # modify it under the terms of the GNU Lesser General Public 
  9  # License as published by the Free Software Foundation; either 
 10  # version 2.1 of the License, or (at your option) any later version. 
 11  # 
 12  # ClusterShell is distributed in the hope that it will be useful, 
 13  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 14  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 15  # Lesser General Public License for more details. 
 16  # 
 17  # You should have received a copy of the GNU Lesser General Public 
 18  # License along with ClusterShell; if not, write to the Free Software 
 19  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 20   
 21  """ 
 22  A poll() based ClusterShell Engine. 
 23   
 24  The poll() system call is available on Linux and BSD. 
 25  """ 
 26   
 27  import errno 
 28  import logging 
 29  import select 
 30  import time 
 31   
 32  from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE 
 33  from ClusterShell.Engine.Engine import EngineException 
 34  from ClusterShell.Engine.Engine import EngineNotSupportedError 
 35  from ClusterShell.Engine.Engine import EngineTimeoutException 
 36  from ClusterShell.Worker.EngineClient import EngineClientEOF 
 37   
 38   
39 -class EnginePoll(Engine):
40 """ 41 Poll Engine 42 43 ClusterShell engine using the select.poll mechanism (Linux poll() 44 syscall). 45 """ 46 47 identifier = "poll" 48
49 - def __init__(self, info):
50 """ 51 Initialize Engine. 52 """ 53 Engine.__init__(self, info) 54 try: 55 # get a polling object 56 self.polling = select.poll() 57 except AttributeError: 58 raise EngineNotSupportedError(EnginePoll.identifier)
59
60 - def _register_specific(self, fd, event):
61 """Engine-specific fd registering. Called by Engine register.""" 62 if event & E_READ: 63 eventmask = select.POLLIN 64 else: 65 assert event & E_WRITE 66 eventmask = select.POLLOUT 67 68 self.polling.register(fd, eventmask)
69
70 - def _unregister_specific(self, fd, ev_is_set):
71 if ev_is_set: 72 self.polling.unregister(fd)
73
74 - def _modify_specific(self, fd, event, setvalue):
75 """ 76 Engine-specific modifications after a interesting event change for 77 a file descriptor. Called automatically by Engine register/unregister 78 and set_events(). For the poll() engine, it reg/unreg or modifies the 79 event mask associated to a file descriptor. 80 """ 81 self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event, 82 setvalue)) 83 if setvalue: 84 self._register_specific(fd, event) 85 else: 86 self.polling.unregister(fd)
87
88 - def runloop(self, timeout):
89 """ 90 Poll engine run(): start clients and properly get replies 91 """ 92 if not timeout: 93 timeout = -1 94 95 start_time = time.time() 96 97 # run main event loop... 98 while self.evlooprefcnt > 0: 99 self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" \ 100 % (self.evlooprefcnt, self.reg_clifds.keys(), \ 101 len(self.timerq))) 102 try: 103 timeo = self.timerq.nextfire_delay() 104 if timeout > 0 and timeo >= timeout: 105 # task timeout may invalidate clients timeout 106 self.timerq.clear() 107 timeo = timeout 108 elif timeo == -1: 109 timeo = timeout 110 111 self._current_loopcnt += 1 112 evlist = self.polling.poll(timeo * 1000.0 + 1.0) 113 114 except select.error, (ex_errno, ex_strerror): 115 # might get interrupted by a signal 116 if ex_errno == errno.EINTR: 117 continue 118 elif ex_errno == errno.EINVAL: 119 msg = "Increase RLIMIT_NOFILE?" 120 logging.getLogger(__name__).error(msg) 121 raise 122 123 for fd, event in evlist: 124 125 if event & select.POLLNVAL: 126 raise EngineException("Caught POLLNVAL on fd %d" % fd) 127 128 # get client instance 129 client, stream = self._fd2client(fd) 130 if client is None: 131 continue 132 133 fdev = stream.evmask 134 sname = stream.name 135 136 # process this stream 137 self._current_stream = stream 138 139 # check for poll error condition of some sort 140 if event & select.POLLERR: 141 self._debug("POLLERR %s" % client) 142 assert fdev & E_WRITE 143 self._debug("POLLERR: remove_stream sname %s fdev 0x%x" 144 % (sname, fdev)) 145 self.remove_stream(client, stream) 146 self._current_stream = None 147 continue 148 149 # check for data to read 150 if event & select.POLLIN: 151 assert fdev & E_READ 152 assert stream.events & fdev, (stream.events, fdev) 153 self.modify(client, sname, 0, fdev) 154 try: 155 client._handle_read(sname) 156 except EngineClientEOF: 157 self._debug("EngineClientEOF %s %s" % (client, sname)) 158 self.remove_stream(client, stream) 159 self._current_stream = None 160 continue 161 162 # or check for end of stream (do not handle both at the same 163 # time because handle_read() may perform a partial read) 164 elif event & select.POLLHUP: 165 self._debug("POLLHUP fd=%d %s (%s)" % 166 (fd, client.__class__.__name__, client.streams)) 167 self.remove_stream(client, stream) 168 self._current_stream = None 169 continue 170 171 # check for writing 172 if event & select.POLLOUT: 173 self._debug("POLLOUT fd=%d %s (%s)" % 174 (fd, client.__class__.__name__, client.streams)) 175 assert fdev == E_WRITE 176 assert stream.events & fdev 177 self.modify(client, sname, 0, fdev) 178 client._handle_write(sname) 179 180 self._current_stream = None 181 182 # apply any changes occured during processing 183 if client.registered: 184 self.set_events(client, stream) 185 186 # check for task runloop timeout 187 if timeout > 0 and time.time() >= start_time + timeout: 188 raise EngineTimeoutException() 189 190 # process clients timeout 191 self.fire_timers() 192 193 self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % \ 194 (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
195