Package ClusterShell :: Module NodeUtils
[hide private]
[frames] | no frames]

Source Code for Module ClusterShell.NodeUtils

  1  # 
  2  # Copyright (C) 2010-2016 CEA/DAM 
  3  # Copyright (C) 2010-2016 Aurelien Degremont <aurelien.degremont@cea.fr> 
  4  # Copyright (C) 2015-2016 Stephane Thiell <sthiell@stanford.edu> 
  5  # 
  6  # This file is part of ClusterShell. 
  7  # 
  8  # ClusterShell is free software; you can redistribute it and/or 
  9  # modify it under the terms of the GNU Lesser General Public 
 10  # License as published by the Free Software Foundation; either 
 11  # version 2.1 of the License, or (at your option) any later version. 
 12  # 
 13  # ClusterShell is distributed in the hope that it will be useful, 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 16  # Lesser General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU Lesser General Public 
 19  # License along with ClusterShell; if not, write to the Free Software 
 20  # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA 
 21   
 22  """ 
 23  Cluster nodes utility module 
 24   
 25  The NodeUtils module is a ClusterShell helper module that provides 
 26  supplementary services to manage nodes in a cluster. It is primarily 
 27  designed to enhance the NodeSet module providing some binding support 
 28  to external node groups sources in separate namespaces (example of 
 29  group sources are: files, jobs scheduler, custom scripts, etc.). 
 30  """ 
 31   
 32  import glob 
 33  import logging 
 34  import os 
 35  import shlex 
 36  import time 
 37   
 38  from ConfigParser import ConfigParser, NoOptionError, NoSectionError 
 39  from string import Template 
 40  from subprocess import Popen, PIPE 
class GroupSourceError(Exception):
    """Base GroupSource error exception.

    The group source the error relates to is kept in the ``group_source``
    attribute so that handlers can inspect it.
    """

    def __init__(self, message, group_source):
        """Initialize GroupSourceError.

        :param message: error message, passed on to Exception
        :param group_source: object stored as ``self.group_source``
        """
        Exception.__init__(self, message)
        self.group_source = group_source
48
class GroupSourceNoUpcall(GroupSourceError):
    """Raised when upcall or method is not available."""
51
class GroupSourceQueryFailed(GroupSourceError):
    """Raised when a query failed (e.g. no group found)."""
54
class GroupResolverError(Exception):
    """Base GroupResolver error."""
57
class GroupResolverSourceError(GroupResolverError):
    """Raised when a requested group source is not available.

    The exception argument is the name of the missing source.
    """
60
class GroupResolverIllegalCharError(GroupResolverError):
    """Raised when an illegal group character is encountered."""
63
class GroupResolverConfigError(GroupResolverError):
    """Raised when a configuration error is encountered."""
# Default lifetime, in seconds, of cached group data (used when no explicit
# cache_time is given).
_DEFAULT_CACHE_TIME = 3600
class GroupSource(object):
    """ClusterShell Group Source class.

    A Group Source object defines resolv_map, resolv_list, resolv_all and
    optional resolv_reverse methods for node group resolution. It
    constitutes a group resolution namespace.
    """

    def __init__(self, name, groups=None, allgroups=None):
        """Initialize GroupSource

        :param name: group source name
        :param groups: group to nodes dict
        :param allgroups: optional _all groups_ result (string)
        """
        self.name = name
        # a mutable {} is deliberately not used as default argument
        self.groups = groups or {}
        self.allgroups = allgroups
        self.has_reverse = False

    def resolv_map(self, group):
        """Get nodes from group `group` ('' when the group is unknown)."""
        return self.groups.get(group, '')

    def resolv_list(self):
        """Return a list of all group names for this group source."""
        # list() so that a real list is returned whether or not keys() is
        # a view object
        return list(self.groups.keys())

    def resolv_all(self):
        """Return the content of all groups as defined by this GroupSource.

        :raises GroupSourceNoUpcall: when no `allgroups` value was provided
        """
        if self.allgroups is None:
            raise GroupSourceNoUpcall("All groups info not available", self)
        return self.allgroups

    def resolv_reverse(self, node):
        """
        Return the group name matching the provided node.

        Not supported by the base class: always raises GroupSourceNoUpcall.
        """
        raise GroupSourceNoUpcall("Not implemented", self)
110
class FileGroupSource(GroupSource):
    """File-based Group Source using loader for file format and cache expiry."""

    def __init__(self, name, loader):
        """
        Initialize FileGroupSource object.

        :param name: group source name (eg. key name of yaml root dict)
        :param loader: associated content loader (eg. YAMLGroupLoader object)
        """
        # do not call super.__init__ to allow the use of r/o properties
        self.name = name
        self.loader = loader
        self.has_reverse = False

    @property
    def groups(self):
        """groups property (dict), fetched from the loader on each access"""
        return self.loader.groups(self.name)

    @property
    def allgroups(self):
        """allgroups property (string, or None if there is no 'all' group)"""
        # FileGroupSource uses the 'all' group to implement resolv_all
        return self.groups.get('all')
137
class UpcallGroupSource(GroupSource):
    """
    GroupSource class managing external calls for nodegroup support.

    Upcall results are cached for a customizable amount of time. This is
    controlled by `cache_time` attribute. Default is 3600 seconds.
    """

    def __init__(self, name, map_upcall, all_upcall=None,
                 list_upcall=None, reverse_upcall=None, cfgdir=None,
                 cache_time=None):
        """Initialize UpcallGroupSource.

        :param name: group source name
        :param map_upcall: command template of the 'map' upcall
        :param all_upcall: optional command template of the 'all' upcall
        :param list_upcall: optional command template of the 'list' upcall
        :param reverse_upcall: optional command template of the 'reverse'
            upcall; when set, `has_reverse` becomes True
        :param cfgdir: working directory of upcall commands, also
            substituted for $CFGDIR in command templates
        :param cache_time: cache lifetime in seconds
            (None means _DEFAULT_CACHE_TIME)
        """
        GroupSource.__init__(self, name)
        self.verbosity = 0  # deprecated
        self.cfgdir = cfgdir
        self.logger = logging.getLogger(__name__)

        # Supported external upcalls
        self.upcalls = {}
        self.upcalls['map'] = map_upcall
        if all_upcall:
            self.upcalls['all'] = all_upcall
        if list_upcall:
            self.upcalls['list'] = list_upcall
        if reverse_upcall:
            self.upcalls['reverse'] = reverse_upcall
            self.has_reverse = True

        # Cache upcall data
        if cache_time is None:
            self.cache_time = _DEFAULT_CACHE_TIME
        else:
            self.cache_time = cache_time
        self._cache = {}
        self.clear_cache()

    def clear_cache(self):
        """
        Remove all previously cached upcall results whatever their lifetime is.
        """
        # 'map' and 'reverse' results are cached per group/node in nested
        # dicts; 'list' and 'all' results are stored directly in self._cache
        self._cache = {
            'map': {},
            'reverse': {}
        }

    def _upcall_read(self, cmdtpl, args=None):
        """
        Invoke the specified upcall command, raise an Exception if
        something goes wrong and return the command output otherwise.

        :param cmdtpl: upcall name (key of self.upcalls)
        :param args: optional mapping of template variables to substitute
        :raises GroupSourceQueryFailed: when the command exit code is not 0
        """
        # args defaults to None rather than a shared mutable dict
        cmdline = Template(self.upcalls[cmdtpl]).safe_substitute(args or {})
        self.logger.debug("EXEC '%s'", cmdline)
        proc = Popen(cmdline, stdout=PIPE, shell=True, cwd=self.cfgdir)
        output = proc.communicate()[0].strip()
        self.logger.debug("READ '%s'", output)
        if proc.returncode != 0:
            self.logger.debug("ERROR '%s' returned %d", cmdline,
                              proc.returncode)
            raise GroupSourceQueryFailed(cmdline, self)
        return output

    def _upcall_cache(self, upcall, cache, key, **args):
        """
        Look for `key' in provided `cache'. If not found, call the
        corresponding `upcall'.

        If `key' is missing, it is added to provided `cache'. Each entry in a
        cache is kept only for a limited time equal to self.cache_time .

        :raises GroupSourceNoUpcall: when `upcall' is not configured
        """
        if not self.upcalls.get(upcall):
            raise GroupSourceNoUpcall(upcall, self)

        # Purge expired data from cache
        if key in cache and cache[key][1] < time.time():
            self.logger.debug("PURGE EXPIRED (%d)'%s'", cache[key][1], key)
            del cache[key]

        # Fetch the data if unknown or just purged
        if key not in cache:
            cache_expiry = time.time() + self.cache_time
            # $CFGDIR and $SOURCE always replaced
            args['CFGDIR'] = self.cfgdir
            args['SOURCE'] = self.name
            # cache entries are (output, expiry timestamp) tuples
            cache[key] = (self._upcall_read(upcall, args), cache_expiry)

        return cache[key][0]

    def resolv_map(self, group):
        """
        Get nodes from group 'group', using the cached value if
        available.
        """
        return self._upcall_cache('map', self._cache['map'], group, GROUP=group)

    def resolv_list(self):
        """
        Return a list of all group names for this group source, using
        the cached value if available.
        """
        return self._upcall_cache('list', self._cache, 'list')

    def resolv_all(self):
        """
        Return the content of special group ALL, using the cached value
        if available.
        """
        return self._upcall_cache('all', self._cache, 'all')

    def resolv_reverse(self, node):
        """
        Return the group name matching the provided node, using the
        cached value if available.
        """
        return self._upcall_cache('reverse', self._cache['reverse'], node,
                                  NODE=node)
253
class YAMLGroupLoader(object):
    """
    YAML group file loader/reloader.

    Load or reload a YAML multi group sources file:

    - create GroupSource objects
    - gather groups dict content on load
    - reload the file once cache_time has expired
    """

    def __init__(self, filename, cache_time=None):
        """
        Initialize YAMLGroupLoader and load file.

        :param filename: YAML file path
        :param cache_time: cache time (seconds)
        """
        if cache_time is None:
            self.cache_time = _DEFAULT_CACHE_TIME
        else:
            self.cache_time = cache_time
        self.cache_expiry = 0
        self.filename = filename
        self.sources = {}
        self._groups = {}
        # must be loaded after initialization so self.sources is set
        self._load()

    def _load(self):
        """Load or reload YAML group file to create GroupSource objects.

        :raises GroupResolverConfigError: when PyYAML is missing, the file
            is not valid YAML, or its content is not a dict of dicts
        """
        with open(self.filename) as yamlfile:
            try:
                import yaml
                # NOTE: safe_load() is used instead of yaml.load(): group
                # files only need plain YAML types, yaml.load() can build
                # arbitrary Python objects from file content, and recent
                # PyYAML requires an explicit Loader for yaml.load().
                sources = yaml.safe_load(yamlfile)
            except ImportError as exc:
                msg = "Disable autodir or install PyYAML!"
                raise GroupResolverConfigError("%s (%s)" % (str(exc), msg))
            except yaml.YAMLError as exc:
                raise GroupResolverConfigError("%s: %s" % (self.filename, exc))

        if not isinstance(sources, dict):
            fmt = "%s: invalid content (base is not a dict)"
            raise GroupResolverConfigError(fmt % self.filename)

        # sources are only created while self.sources is still empty
        first = not self.sources

        for srcname, groups in sources.items():

            if not isinstance(groups, dict):
                fmt = "%s: invalid content (group source '%s' is not a dict)"
                raise GroupResolverConfigError(fmt % (self.filename, srcname))

            if first:
                self._groups[srcname] = groups
                self.sources[srcname] = FileGroupSource(srcname, self)
            elif srcname in self.sources:
                # update groups of existing source
                self._groups[srcname] = groups
            # else: cannot add new source on reload - just ignore it

        # groups are loaded, set cache expiry
        self.cache_expiry = time.time() + self.cache_time

    def __iter__(self):
        """Iterate over GroupSource objects."""
        # safe as long as self.sources is set at init (once)
        return iter(self.sources.values())

    def groups(self, sourcename):
        """
        Groups dict accessor for sourcename.

        This method is called by associated FileGroupSource objects and simply
        returns dict content, after reloading file if cache_time has expired.
        """
        if self.cache_expiry < time.time():
            # reload whole file if cache time expired
            self._load()

        return self._groups[sourcename]
340
class GroupResolver(object):
    """
    Base class GroupResolver that aims to provide node/group resolution
    from multiple GroupSources.

    A GroupResolver object might be initialized with a default
    GroupSource object, that is later used when group resolution is
    requested with no source information. As of version 1.7, a set of
    illegal group characters may also be provided for sanity check
    (raising GroupResolverIllegalCharError when found).
    """

    def __init__(self, default_source=None, illegal_chars=None):
        """Initialize GroupResolver object.

        :param default_source: optional default GroupSource object
        :param illegal_chars: optional set of characters not allowed in
            group names
        """
        self._sources = {}
        self._default_source = default_source
        self.illegal_chars = illegal_chars or set()
        if default_source:
            self._sources[default_source.name] = default_source

    def set_verbosity(self, value):
        """Set debugging verbosity value (DEPRECATED: use logging.DEBUG)."""
        for source in self._sources.values():
            source.verbosity = value

    def add_source(self, group_source):
        """Add a GroupSource to this resolver.

        :raises ValueError: when a source of the same name already exists
        """
        if group_source.name in self._sources:
            raise ValueError("GroupSource '%s': name collision" %
                             group_source.name)
        self._sources[group_source.name] = group_source

    def sources(self):
        """Get the list of all resolver source names.

        The default source name, if any, is always listed first.
        """
        srcs = list(self._sources.keys())
        # Without a default source there is nothing to reorder; compare
        # names (not a name against the source object) otherwise.
        if self._default_source is not None:
            default_name = self._default_source.name
            if default_name in srcs and srcs[0] != default_name:
                srcs.remove(default_name)
                srcs.insert(0, default_name)
        return srcs

    def _get_default_source_name(self):
        """Get default source name of resolver."""
        if self._default_source is None:
            return None
        return self._default_source.name

    def _set_default_source_name(self, sourcename):
        """Set default source of resolver (by name).

        :raises GroupResolverSourceError: when `sourcename` is unknown
        """
        try:
            self._default_source = self._sources[sourcename]
        except KeyError:
            raise GroupResolverSourceError(sourcename)

    default_source_name = property(_get_default_source_name,
                                   _set_default_source_name)

    def _list_nodes(self, source, what, *args):
        """Helper method that returns a list of results (nodes) when
        the source is defined."""
        result = []
        assert source
        raw = getattr(source, 'resolv_%s' % what)(*args)
        # results are whitespace-separated and may span several lines
        for line in raw.splitlines():
            result.extend(line.strip().split())
        return result

    def _list_groups(self, source, what, *args):
        """Helper method that returns a list of results (groups) when
        the source is defined.

        :raises GroupResolverIllegalCharError: when a group name contains
            one of self.illegal_chars
        """
        result = []
        assert source
        raw = getattr(source, 'resolv_%s' % what)(*args)

        # raw is either a string (split into lines) or already an iterable
        # of strings
        try:
            grpiter = raw.splitlines()
        except AttributeError:
            grpiter = raw

        for line in grpiter:
            for grpstr in line.strip().split():
                illegal = self.illegal_chars.intersection(grpstr)
                if illegal:
                    raise GroupResolverIllegalCharError(' '.join(illegal))
                result.append(grpstr)
        return result

    def _source(self, namespace):
        """Helper method that returns the source by namespace name.

        :raises GroupResolverSourceError: when no matching source is found
        """
        if not namespace:
            source = self._default_source
        else:
            source = self._sources.get(namespace)
        if not source:
            raise GroupResolverSourceError(namespace or "<default>")
        return source

    def group_nodes(self, group, namespace=None):
        """
        Find nodes for specified group name and optional namespace.
        """
        source = self._source(namespace)
        return self._list_nodes(source, 'map', group)

    def all_nodes(self, namespace=None):
        """
        Find all nodes. You may specify an optional namespace.
        """
        source = self._source(namespace)
        return self._list_nodes(source, 'all')

    def grouplist(self, namespace=None):
        """
        Get full group list. You may specify an optional
        namespace.
        """
        source = self._source(namespace)
        return self._list_groups(source, 'list')

    def has_node_groups(self, namespace=None):
        """
        Return whether finding group list for a specified node is
        supported by the resolver (in optional namespace).
        """
        try:
            return self._source(namespace).has_reverse
        except GroupResolverSourceError:
            return False

    def node_groups(self, node, namespace=None):
        """
        Find group list for specified node and optional namespace.
        """
        source = self._source(namespace)
        return self._list_groups(source, 'reverse', node)
476
class GroupResolverConfig(GroupResolver):
    """
    GroupResolver class that is able to automatically setup its
    GroupSource's from a configuration file. This is the default
    resolver for NodeSet.
    """
    SECTION_MAIN = 'Main'

    def __init__(self, filenames, illegal_chars=None):
        """
        Initialize GroupResolverConfig from filenames.

        Every readable file of `filenames` is parsed into a single
        configuration; the directory of the last parsed file is used as
        $CFGDIR.

        :param filenames: single or multiple config filenames
        :param illegal_chars: optional set of illegal group characters
        """
        GroupResolver.__init__(self, illegal_chars=illegal_chars)

        # support single or multiple config filenames
        self.config = ConfigParser()
        parsed = self.config.read(filenames)

        # check if at least one parsable config file has been found, otherwise
        # continue with an empty self._sources
        if parsed:
            # for proper $CFGDIR selection, take last parsed configfile only
            self._parse_config(os.path.dirname(parsed[-1]))

    def _iter_config_dirs(self, option, label, cfg_dirname):
        """Yield each existing directory listed in Main.<option>, once.

        :param option: option name of section Main holding the dir list
        :param label: name used for this option in error messages
        :param cfg_dirname: value substituted for $CFGDIR
        :raises NoSectionError, NoOptionError: when the option is not set
        :raises GroupResolverConfigError: when a listed path exists but is
            not a directory
        """
        seen = set()
        dirstr = self.config.get(self.SECTION_MAIN, option)
        for path in shlex.split(dirstr):
            # substitute $CFGDIR, set to the highest priority clustershell
            # configuration directory that has been found
            path = Template(path).safe_substitute(CFGDIR=cfg_dirname)
            path = os.path.normpath(path)
            if path in seen:
                continue  # load each directory only once
            seen.add(path)
            if not os.path.isdir(path):
                if not os.path.exists(path):
                    continue  # missing directories are silently skipped
                raise GroupResolverConfigError("Defined %s %s is not"
                                               " a directory" % (label, path))
            yield path

    def _parse_config(self, cfg_dirname):
        """parse config using relative dir cfg_dirname"""
        # parse Main.confdir (Main.groupsdir is used instead when defined)
        try:
            if self.config.has_option(self.SECTION_MAIN, 'groupsdir'):
                opt_confdir = 'groupsdir'
            else:
                opt_confdir = 'confdir'

            for confdir in self._iter_config_dirs(opt_confdir, 'confdir',
                                                  cfg_dirname):
                # add sources declared in groups.conf.d file parts
                for groupsfn in sorted(glob.glob('%s/*.conf' % confdir)):
                    grpcfg = ConfigParser()
                    grpcfg.read(groupsfn)  # ignore files that cannot be read
                    self._sources_from_cfg(grpcfg, confdir)
        except (NoSectionError, NoOptionError):
            pass

        # parse Main.autodir
        try:
            for autodir in self._iter_config_dirs('autodir', 'autodir',
                                                  cfg_dirname):
                # add auto sources declared in groups.d YAML files
                for autosfn in sorted(glob.glob('%s/*.yaml' % autodir)):
                    self._sources_from_yaml(autosfn)
        except (NoSectionError, NoOptionError):
            pass

        # add sources declared directly in groups.conf
        self._sources_from_cfg(self.config, cfg_dirname)

        # parse Main.default
        try:
            def_sourcename = self.config.get(self.SECTION_MAIN, 'default')
            # warning: default_source_name is a property
            self.default_source_name = def_sourcename
        except (NoSectionError, NoOptionError):
            pass
        except GroupResolverSourceError:
            if def_sourcename:  # allow empty Main.default
                fmt = 'Default group source not found: "%s"'
                raise GroupResolverConfigError(fmt % def_sourcename)
        # pick random default source if not provided by config
        if not self.default_source_name and self._sources:
            self.default_source_name = list(self._sources.keys())[0]

    def _sources_from_cfg(self, cfg, cfgdir):
        """
        Instantiate as many UpcallGroupSources needed from cfg object,
        cfgdir (CWD for callbacks) and cfg filename.

        :raises GroupResolverConfigError: on missing 'map' option, invalid
            cache_time value or group source name collision
        """
        try:
            for section in cfg.sections():
                # Support grouped sections: section1,section2,section3
                for srcname in section.split(','):
                    if srcname == self.SECTION_MAIN:
                        continue
                    # only map is a mandatory upcall; raw=True keeps
                    # option values free of ConfigParser interpolation
                    map_upcall = cfg.get(section, 'map', raw=True)
                    all_upcall = list_upcall = reverse_upcall = ctime = None
                    if cfg.has_option(section, 'all'):
                        all_upcall = cfg.get(section, 'all', raw=True)
                    if cfg.has_option(section, 'list'):
                        list_upcall = cfg.get(section, 'list', raw=True)
                    if cfg.has_option(section, 'reverse'):
                        reverse_upcall = cfg.get(section, 'reverse', raw=True)
                    if cfg.has_option(section, 'cache_time'):
                        ctime = float(cfg.get(section, 'cache_time', raw=True))
                    # add new group source
                    self.add_source(UpcallGroupSource(srcname, map_upcall,
                                                      all_upcall,
                                                      list_upcall,
                                                      reverse_upcall,
                                                      cfgdir, ctime))
        except (NoSectionError, NoOptionError, ValueError) as exc:
            raise GroupResolverConfigError(str(exc))

    def _sources_from_yaml(self, filepath):
        """Load source(s) from YAML file."""
        for source in YAMLGroupLoader(filepath):
            self.add_source(source)
615