Package ClusterShell :: Package Engine :: Module Select
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.Engine.Select

  1  # 
  2  # Copyright (C) 2009-2016 CEA/DAM 
  3  # Copyright (C) 2009-2012 Henri Doreau <henri.doreau@cea.fr> 
  4  # Copyright (C) 2009-2012 Aurelien Degremont <aurelien.degremont@cea.fr> 
  5  # Copyright (C) 2016 Stephane Thiell <sthiell@stanford.edu> 
  6  # 
  7  # This file is part of ClusterShell. 
  8  # 
  9  # ClusterShell is free software; you can redistribute it and/or 
 10  # modify it under the terms of the GNU Lesser General Public 
 11  # License as published by the Free Software Foundation; either 
 12  # version 2.1 of the License, or (at your option) any later version. 
 13  # 
 14  # ClusterShell is distributed in the hope that it will be useful, 
 15  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 16  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 17  # Lesser General Public License for more details. 
 18  # 
 19  # You should have received a copy of the GNU Lesser General Public 
 20  # License along with ClusterShell; if not, write to the Free Software 
 21  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 22   
 23  """ 
 24  A select() based ClusterShell Engine. 
 25   
 26  The select() system call is available on almost every UNIX-like systems. 
 27  """ 
 28   
 29  import errno 
 30  import select 
 31  import sys 
 32  import time 
 33   
 34  from ClusterShell.Engine.Engine import Engine, E_READ, E_WRITE 
 35  from ClusterShell.Engine.Engine import EngineTimeoutException 
 36  from ClusterShell.Worker.EngineClient import EngineClientEOF 
 37   
 38   
39 -class EngineSelect(Engine):
40 """ 41 Select Engine 42 43 ClusterShell engine using the select.select mechanism 44 """ 45 46 identifier = "select" 47
48 - def __init__(self, info):
49 """ 50 Initialize Engine. 51 """ 52 Engine.__init__(self, info) 53 self._fds_r = [] 54 self._fds_w = []
55
56 - def _register_specific(self, fd, event):
57 """ 58 Engine-specific fd registering. Called by Engine register. 59 """ 60 if event & E_READ: 61 self._fds_r.append(fd) 62 else: 63 assert event & E_WRITE 64 self._fds_w.append(fd)
65
66 - def _unregister_specific(self, fd, ev_is_set):
67 """ 68 Engine-specific fd unregistering. Called by Engine unregister. 69 """ 70 if ev_is_set or True: 71 if fd in self._fds_r: 72 self._fds_r.remove(fd) 73 if fd in self._fds_w: 74 self._fds_w.remove(fd)
75
76 - def _modify_specific(self, fd, event, setvalue):
77 """ 78 Engine-specific modifications after a interesting event change 79 for a file descriptor. Called automatically by Engine 80 register/unregister and set_events(). For the select() engine, 81 it appends/remove the fd to/from the concerned fd_sets. 82 """ 83 self._debug("MODSPEC fd=%d event=%x setvalue=%d" % (fd, event, 84 setvalue)) 85 if setvalue: 86 self._register_specific(fd, event) 87 else: 88 self._unregister_specific(fd, True)
89
90 - def runloop(self, timeout):
91 """ 92 Select engine run(): start clients and properly get replies 93 """ 94 if not timeout: 95 timeout = -1 96 97 start_time = time.time() 98 99 # run main event loop... 100 while self.evlooprefcnt > 0: 101 self._debug("LOOP evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % 102 (self.evlooprefcnt, self.reg_clifds.keys(), len(self.timerq))) 103 try: 104 timeo = self.timerq.nextfire_delay() 105 if timeout > 0 and timeo >= timeout: 106 # task timeout may invalidate clients timeout 107 self.timerq.clear() 108 timeo = timeout 109 elif timeo == -1: 110 timeo = timeout 111 112 self._current_loopcnt += 1 113 if timeo >= 0: 114 r_ready, w_ready, x_ready = \ 115 select.select(self._fds_r, self._fds_w, [], timeo) 116 else: 117 # no timeout specified, do not supply the timeout argument 118 r_ready, w_ready, x_ready = \ 119 select.select(self._fds_r, self._fds_w, []) 120 except select.error, (ex_errno, ex_strerror): 121 # might get interrupted by a signal 122 if ex_errno == errno.EINTR: 123 continue 124 elif ex_errno in [errno.EINVAL, errno.EBADF, errno.ENOMEM]: 125 msg = "Increase RLIMIT_NOFILE?" 126 logging.getLogger(__name__).error(msg) 127 raise 128 129 # iterate over fd on which events occured 130 for fd in set(r_ready) | set(w_ready): 131 132 # get client instance 133 client, stream = self._fd2client(fd) 134 if client is None: 135 continue 136 137 fdev = stream.evmask 138 sname = stream.name 139 140 # process this stream 141 self._current_stream = stream 142 143 # check for possible unblocking read on this fd 144 if fd in r_ready: 145 self._debug("R_READY fd=%d %s (%s)" % (fd, 146 client.__class__.__name__, client.streams)) 147 assert fdev & E_READ 148 assert stream.events & fdev 149 self.modify(client, sname, 0, fdev) 150 try: 151 client._handle_read(sname) 152 except EngineClientEOF: 153 self._debug("EngineClientEOF %s" % client) 154 self.remove_stream(client, stream) 155 156 # check for writing 157 if fd in w_ready: 158 self._debug("W_READY fd=%d %s (%s)" % (fd, 159 client.__class__.__name__, client.streams)) 160 assert fdev == E_WRITE 161 assert stream.events & fdev 162 self.modify(client, sname, 0, fdev) 163 client._handle_write(sname) 164 165 # post processing 166 self._current_stream = None 167 168 # apply any changes occured during processing 169 if client.registered: 170 self.set_events(client, stream) 171 172 # check for task runloop timeout 173 if timeout > 0 and time.time() >= start_time + timeout: 174 raise EngineTimeoutException() 175 176 # process clients timeout 177 self.fire_timers() 178 179 self._debug("LOOP EXIT evlooprefcnt=%d (reg_clifds=%s) (timers=%d)" % 180 (self.evlooprefcnt, self.reg_clifds, len(self.timerq)))
181