1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 Cluster nodes utility module
24
25 The NodeUtils module is a ClusterShell helper module that provides
26 supplementary services to manage nodes in a cluster. It is primarily
27 designed to enhance the NodeSet module providing some binding support
28 to external node groups sources in separate namespaces (example of
29 group sources are: files, jobs scheduler, custom scripts, etc.).
30 """
31
32 import glob
33 import logging
34 import os
35 import shlex
36 import time
37
38 from ConfigParser import ConfigParser, NoOptionError, NoSectionError
39 from string import Template
40 from subprocess import Popen, PIPE
44 """Base GroupSource error exception"""
45 - def __init__(self, message, group_source):
48
50 """Raised when upcall or method is not available"""
51
53 """Raised when a query failed (eg. no group found)"""
54
56 """Base GroupResolver error"""
57
59 """Raised when upcall is not available"""
60
62 """Raised when an illegal group character is encountered"""
63
65 """Raised when a configuration error is encountered"""
66
67
68 _DEFAULT_CACHE_TIME = 3600
72 """ClusterShell Group Source class.
73
74 A Group Source object defines resolv_map, resolv_list, resolv_all and
75 optional resolv_reverse methods for node group resolution. It is
76 constituting a group resolution namespace.
77 """
78
def __init__(self, name, groups=None, allgroups=None):
    """Set up a group source backed by an in-memory mapping.

    :param name: group source name
    :param groups: dict mapping group names to nodes; any false value
        (None, empty dict) is replaced by a new empty dict
    :param allgroups: optional "all groups" result (string), or None
        when that information is not available
    """
    if not groups:
        groups = {}
    self.name = name
    self.groups = groups
    self.allgroups = allgroups
    # a plain mapping source offers no node -> groups lookup
    self.has_reverse = False
90
def resolv_map(self, group):
    """Return the nodes stored for `group`, or '' for an unknown group."""
    known_groups = self.groups
    return known_groups.get(group, '')
94
def resolv_list(self):
    """Return the names of every group defined by this group source."""
    group_names = self.groups.keys()
    return group_names
98
def resolv_all(self):
    """Return the "all groups" content held by this group source.

    :raises GroupSourceNoUpcall: when `allgroups` is None
    """
    everything = self.allgroups
    if everything is not None:
        return everything
    raise GroupSourceNoUpcall("All groups info not available", self)
104
def resolv_reverse(self, node):
    """Return the group name matching `node`.

    This implementation always raises GroupSourceNoUpcall("Not implemented").
    """
    reason = "Not implemented"
    raise GroupSourceNoUpcall(reason, self)
110
113 """File-based Group Source using loader for file format and cache expiry."""
114
def __init__(self, name, loader):
    """Set up a file-backed group source.

    Attributes are assigned directly here; no parent initializer is called.

    :param name: group source name (eg. key name of yaml root dict)
    :param loader: associated content loader (eg. YAMLGroupLoader object)
    """
    self.has_reverse = False
    self.loader = loader
    self.name = name
126
@property
def groups(self):
    """Groups dict for this source, fetched from the loader on each access."""
    loader = self.loader
    return loader.groups(self.name)
131
@property
def allgroups(self):
    """Content of the group named 'all', or None when it is not defined."""
    current = self.groups
    return current.get('all')
137
140 """
141 GroupSource class managing external calls for nodegroup support.
142
143 Upcall results are cached for a customizable amount of time. This is
144 controlled by `cache_time` attribute. Default is 3600 seconds.
145 """
146
def __init__(self, name, map_upcall, all_upcall=None,
             list_upcall=None, reverse_upcall=None, cfgdir=None,
             cache_time=None):
    """Set up a group source resolved through external commands.

    :param name: group source name
    :param map_upcall: command template for the 'map' upcall (always stored)
    :param all_upcall: optional command template for the 'all' upcall
    :param list_upcall: optional command template for the 'list' upcall
    :param reverse_upcall: optional command template for the 'reverse'
        upcall; when set, `has_reverse` becomes True
    :param cfgdir: directory used as working directory for upcalls
    :param cache_time: upcall result lifetime in seconds; None selects
        _DEFAULT_CACHE_TIME
    """
    GroupSource.__init__(self, name)
    self.verbosity = 0
    self.cfgdir = cfgdir
    self.logger = logging.getLogger(__name__)

    # only upcalls that were actually provided get registered
    self.upcalls = {'map': map_upcall}
    optional_upcalls = (('all', all_upcall),
                        ('list', list_upcall),
                        ('reverse', reverse_upcall))
    for kind, command in optional_upcalls:
        if command:
            self.upcalls[kind] = command
    if reverse_upcall:
        self.has_reverse = True

    self.cache_time = cache_time
    if cache_time is None:
        self.cache_time = _DEFAULT_CACHE_TIME
    self._cache = {}
    self.clear_cache()
173
def clear_cache(self):
    """Drop every cached upcall result, expired or not.

    The cache is replaced by a fresh dict holding empty 'map' and
    'reverse' sub-caches.
    """
    self._cache = dict(map={}, reverse={})
182
def _upcall_read(self, cmdtpl, args):
    """Run the upcall command registered under `cmdtpl` and return its output.

    The command template is filled in with `args` (unknown placeholders are
    left untouched) and executed through the shell with `self.cfgdir` as
    working directory.

    :param cmdtpl: key of the upcall in `self.upcalls`
    :param args: mapping of template variables
    :returns: stripped standard output of the command
    :raises GroupSourceQueryFailed: when the command exits with a non-zero
        status
    """
    log = self.logger
    cmdline = Template(self.upcalls[cmdtpl]).safe_substitute(args)
    log.debug("EXEC '%s'", cmdline)
    proc = Popen(cmdline, stdout=PIPE, shell=True, cwd=self.cfgdir)
    stdout_data = proc.communicate()[0]
    output = stdout_data.strip()
    log.debug("READ '%s'", output)
    if proc.returncode == 0:
        return output
    log.debug("ERROR '%s' returned %d", cmdline, proc.returncode)
    raise GroupSourceQueryFailed(cmdline, self)
198
def _upcall_cache(self, upcall, cache, key, **args):
    """Return the cached result for `key`, running `upcall` on a miss.

    An entry is a (result, expiry timestamp) tuple. An entry whose expiry is
    in the past is purged first; a missing entry is filled by running the
    upcall with CFGDIR and SOURCE added to the template variables, and kept
    for `self.cache_time` seconds.

    :param upcall: upcall kind, key of `self.upcalls`
    :param cache: dict used as cache for this kind of lookup
    :param key: cache key
    :param args: extra template variables for the upcall command
    :raises GroupSourceNoUpcall: when no command is registered for `upcall`
    """
    if not self.upcalls.get(upcall):
        raise GroupSourceNoUpcall(upcall, self)

    # drop the entry when its lifetime is over
    if key in cache:
        expiry = cache[key][1]
        if expiry < time.time():
            self.logger.debug("PURGE EXPIRED (%d)'%s'", expiry, key)
            del cache[key]

    if key in cache:
        return cache[key][0]

    # expiry is computed before the upcall runs
    cache_expiry = time.time() + self.cache_time
    args['CFGDIR'] = self.cfgdir
    args['SOURCE'] = self.name
    cache[key] = (self._upcall_read(upcall, args), cache_expiry)
    return cache[key][0]
224
def resolv_map(self, group):
    """Return the nodes of `group`, served from the 'map' cache when possible."""
    map_cache = self._cache['map']
    return self._upcall_cache('map', map_cache, group, GROUP=group)
231
def resolv_list(self):
    """Return the 'list' upcall result (all group names), cached.

    The result is stored under the 'list' key of the top-level cache dict.
    """
    top_cache = self._cache
    return self._upcall_cache('list', top_cache, 'list')
238
def resolv_all(self):
    """Return the 'all' upcall result (special group ALL), cached.

    The result is stored under the 'all' key of the top-level cache dict.
    """
    top_cache = self._cache
    return self._upcall_cache('all', top_cache, 'all')
245
def resolv_reverse(self, node):
    """Return the group name(s) matching `node`, served from the 'reverse'
    cache when possible."""
    reverse_cache = self._cache['reverse']
    return self._upcall_cache('reverse', reverse_cache, node, NODE=node)
253
256 """
257 YAML group file loader/reloader.
258
259 Load or reload a YAML multi group sources file:
260
261 - create GroupSource objects
262 - gather groups dict content on load
263 - reload the file once cache_time has expired
264 """
265
def __init__(self, filename, cache_time=None):
    """Create the loader and immediately load `filename`.

    :param filename: YAML file path
    :param cache_time: cache time (seconds); None selects
        _DEFAULT_CACHE_TIME
    """
    self.filename = filename
    self.cache_time = cache_time
    if cache_time is None:
        self.cache_time = _DEFAULT_CACHE_TIME
    # expiry of 0 marks the content as never loaded
    self.cache_expiry = 0
    self.sources = {}
    self._groups = {}

    self._load()
283
285 """Load or reload YAML group file to create GroupSource objects."""
286 yamlfile = open(self.filename)
287 try:
288 try:
289 import yaml
290 sources = yaml.load(yamlfile)
291 except ImportError, exc:
292 msg = "Disable autodir or install PyYAML!"
293 raise GroupResolverConfigError("%s (%s)" % (str(exc), msg))
294 except yaml.YAMLError, exc:
295 raise GroupResolverConfigError("%s: %s" % (self.filename, exc))
296 finally:
297 yamlfile.close()
298
299
300 if not isinstance(sources, dict):
301 fmt = "%s: invalid content (base is not a dict)"
302 raise GroupResolverConfigError(fmt % self.filename)
303
304 first = not self.sources
305
306 for srcname, groups in sources.items():
307
308 if not isinstance(groups, dict):
309 fmt = "%s: invalid content (group source '%s' is not a dict)"
310 raise GroupResolverConfigError(fmt % (self.filename, srcname))
311
312 if first:
313 self._groups[srcname] = groups
314 self.sources[srcname] = FileGroupSource(srcname, self)
315 elif srcname in self.sources:
316
317 self._groups[srcname] = groups
318
319
320
321 self.cache_expiry = time.time() + self.cache_time
322
def __iter__(self):
    """Return an iterator over the GroupSource objects held in `sources`."""
    known_sources = self.sources
    return known_sources.itervalues()
327
def groups(self, sourcename):
    """Return the groups dict recorded for `sourcename`.

    The file is reloaded first when the cache expiry time has passed.

    :param sourcename: group source name (top-level key of the file)
    :raises KeyError: when no groups are recorded for `sourcename`
    """
    if time.time() > self.cache_expiry:
        self._load()
    return self._groups[sourcename]
340
343 """
344 Base class GroupResolver that aims to provide node/group resolution
345 from multiple GroupSources.
346
347 A GroupResolver object might be initialized with a default
348 GroupSource object, that is later used when group resolution is
349 requested with no source information. As of version 1.7, a set of
350 illegal group characters may also be provided for sanity check
351 (raising GroupResolverIllegalCharError when found).
352 """
353
def __init__(self, default_source=None, illegal_chars=None):
    """Create a resolver, optionally registering a default group source.

    :param default_source: GroupSource used when no namespace is given;
        when true, it is also registered under its own name
    :param illegal_chars: set of characters rejected in group names; any
        false value is replaced by an empty set
    """
    if not illegal_chars:
        illegal_chars = set()
    self._sources = {}
    self._default_source = default_source
    self.illegal_chars = illegal_chars
    if default_source:
        self._sources[default_source.name] = default_source
361
363 """Set debugging verbosity value (DEPRECATED: use logging.DEBUG)."""
364 for source in self._sources.itervalues():
365 source.verbosity = value
366
def add_source(self, group_source):
    """Register `group_source` under its name.

    :raises ValueError: when a source with the same name is already
        registered
    """
    srcname = group_source.name
    if srcname in self._sources:
        raise ValueError("GroupSource '%s': name collision" % srcname)
    self._sources[srcname] = group_source
373
def sources(self):
    """Return the list of all resolver source names, default source first.

    When no default source is set, or the default source is not among the
    registered sources, the names are returned in dict order without any
    reordering.
    """
    names = list(self._sources.keys())
    default = self._default_source
    # Without a default source there is nothing to move to the front
    # (the previous code dereferenced None here).
    if default is None:
        return names
    default_name = default.name
    # Compare names with names: the list holds strings, not source objects.
    if default_name in self._sources and names[0] != default_name:
        names.remove(default_name)
        names.insert(0, default_name)
    return names
381
def _get_default_source_name(self):
    """Return the name of the default source, or None when none is set."""
    default = self._default_source
    if default is not None:
        return default.name
    return None
387
def _set_default_source_name(self, sourcename):
    """Make the registered source named `sourcename` the default one.

    :raises GroupResolverSourceError: when no source has that name
    """
    registered = self._sources
    try:
        self._default_source = registered[sourcename]
    except KeyError:
        raise GroupResolverSourceError(sourcename)
394
395 default_source_name = property(_get_default_source_name,
396 _set_default_source_name)
397
def _list_nodes(self, source, what, *args):
    """Call `source.resolv_<what>(*args)` and return its result as a list.

    The raw string result is split on line breaks, then on whitespace.

    :param source: group source to query (must be true)
    :param what: lookup kind, suffix of the `resolv_` method to call
    :param args: arguments forwarded to the lookup method
    """
    nodes = []
    assert source
    lookup = getattr(source, 'resolv_%s' % what)
    raw = lookup(*args)
    for line in raw.splitlines():
        nodes.extend(line.strip().split())
    return nodes
407
def _list_groups(self, source, what, *args):
    """Call `source.resolv_<what>(*args)` and return group names as a list.

    The result may be a string (split on line breaks) or any iterable of
    strings; each item is further split on whitespace.

    :param source: group source to query (must be true)
    :param what: lookup kind, suffix of the `resolv_` method to call
    :param args: arguments forwarded to the lookup method
    :raises GroupResolverIllegalCharError: when a group name contains a
        character from `self.illegal_chars`
    """
    groups = []
    assert source
    lookup = getattr(source, 'resolv_%s' % what)
    raw = lookup(*args)

    # results without splitlines() are iterated as they are
    try:
        lines = raw.splitlines()
    except AttributeError:
        lines = raw

    for line in lines:
        for grpstr in line.strip().split():
            offending = self.illegal_chars.intersection(grpstr)
            if offending:
                raise GroupResolverIllegalCharError(' '.join(offending))
            groups.append(grpstr)
    return groups
427
def _source(self, namespace):
    """Return the source registered as `namespace`, or the default source
    when `namespace` is false.

    :raises GroupResolverSourceError: when no matching source is found
    """
    if namespace:
        source = self._sources.get(namespace)
    else:
        source = self._default_source
    if source:
        return source
    raise GroupResolverSourceError(namespace or "<default>")
437
439 """
440 Find nodes for specified group name and optional namespace.
441 """
442 source = self._source(namespace)
443 return self._list_nodes(source, 'map', group)
444
446 """
447 Find all nodes. You may specify an optional namespace.
448 """
449 source = self._source(namespace)
450 return self._list_nodes(source, 'all')
451
453 """
454 Get full group list. You may specify an optional
455 namespace.
456 """
457 source = self._source(namespace)
458 return self._list_groups(source, 'list')
459
461 """
462 Return whether finding group list for a specified node is
463 supported by the resolver (in optional namespace).
464 """
465 try:
466 return self._source(namespace).has_reverse
467 except GroupResolverSourceError:
468 return False
469
471 """
472 Find group list for specified node and optional namespace.
473 """
474 source = self._source(namespace)
475 return self._list_groups(source, 'reverse', node)
476
479 """
480 GroupResolver class that is able to automatically setup its
481 GroupSource's from a configuration file. This is the default
482 resolver for NodeSet.
483 """
484 SECTION_MAIN = 'Main'
485
def __init__(self, filenames, illegal_chars=None):
    """Create the resolver and configure it from `filenames`.

    `filenames` is handed to ConfigParser.read(), which parses every file
    it can read. When at least one file was parsed, the directory of the
    last parsed file is passed to the configuration parsing step; otherwise
    the resolver is left without any source.

    :param filenames: configuration file path(s)
    :param illegal_chars: set of characters rejected in group names
    """
    GroupResolver.__init__(self, illegal_chars=illegal_chars)

    self.config = ConfigParser()
    parsed = self.config.read(filenames)
    if not parsed:
        return

    last_parsed_dir = os.path.dirname(parsed[-1])
    self._parse_config(last_parsed_dir)
502
def _parse_config(self, cfg_dirname):
    """Create group sources from the loaded configuration.

    Sources are added in this order:

    1. every ``*.conf`` file found in the directories listed by
       Main.groupsdir (or Main.confdir when groupsdir is not set);
    2. every ``*.yaml`` file found in the directories listed by
       Main.autodir;
    3. the sections of the main configuration itself.

    Main.default then selects the default source; when no default results
    from the configuration and at least one source exists, the first
    registered source is used.

    :param cfg_dirname: directory substituted for $CFGDIR in directory
        lists, and passed along with sources of the main configuration
    :raises GroupResolverConfigError: when a listed path exists but is not
        a directory, or when Main.default is non-empty and names an
        unknown source
    """
    main = self.SECTION_MAIN

    # missing Main section/option simply means "nothing to scan"
    try:
        if self.config.has_option(main, 'groupsdir'):
            opt_confdir = 'groupsdir'
        else:
            opt_confdir = 'confdir'
        for confdir in self._config_dirs(opt_confdir, 'confdir',
                                         cfg_dirname):
            for groupsfn in sorted(glob.glob('%s/*.conf' % confdir)):
                grpcfg = ConfigParser()
                grpcfg.read(groupsfn)  # unreadable files are ignored
                self._sources_from_cfg(grpcfg, confdir)
    except (NoSectionError, NoOptionError):
        pass

    try:
        for autodir in self._config_dirs('autodir', 'autodir', cfg_dirname):
            for autosfn in sorted(glob.glob('%s/*.yaml' % autodir)):
                self._sources_from_yaml(autosfn)
    except (NoSectionError, NoOptionError):
        pass

    # sources declared directly in the main configuration
    self._sources_from_cfg(self.config, cfg_dirname)

    try:
        def_sourcename = self.config.get(main, 'default')
        # default_source_name raises GroupResolverSourceError when the
        # name is not a registered source
        self.default_source_name = def_sourcename
    except (NoSectionError, NoOptionError):
        pass
    except GroupResolverSourceError:
        # an empty Main.default is tolerated
        if def_sourcename:
            fmt = 'Default group source not found: "%s"'
            raise GroupResolverConfigError(fmt % def_sourcename)

    # fall back on the first registered source
    if not self.default_source_name and self._sources:
        self.default_source_name = list(self._sources)[0]

def _config_dirs(self, option, label, cfg_dirname):
    """Yield each existing directory listed in Main.<option>, once.

    The option value is split with shell-like rules; $CFGDIR is replaced
    by `cfg_dirname` and paths are normalized. Paths that do not exist are
    skipped.

    :param option: option name to read in the Main section
    :param label: word used for this option in the error message
    :param cfg_dirname: value substituted for $CFGDIR
    :raises NoSectionError, NoOptionError: when the option is not set
    :raises GroupResolverConfigError: when a path exists but is not a
        directory
    """
    seen = set()
    dirlist = self.config.get(self.SECTION_MAIN, option)
    for entry in shlex.split(dirlist):
        path = Template(entry).safe_substitute(CFGDIR=cfg_dirname)
        path = os.path.normpath(path)
        if path in seen:
            continue  # each directory is loaded only once
        seen.add(path)
        if not os.path.isdir(path):
            if not os.path.exists(path):
                continue
            raise GroupResolverConfigError("Defined %s %s is not"
                                           " a directory" % (label, path))
        yield path
580
582 """
583 Instantiate as many UpcallGroupSources needed from cfg object,
584 cfgdir (CWD for callbacks) and cfg filename.
585 """
586 try:
587 for section in cfg.sections():
588
589 for srcname in section.split(','):
590 if srcname != self.SECTION_MAIN:
591
592 map_upcall = cfg.get(section, 'map', True)
593 all_upcall = list_upcall = reverse_upcall = ctime = None
594 if cfg.has_option(section, 'all'):
595 all_upcall = cfg.get(section, 'all', True)
596 if cfg.has_option(section, 'list'):
597 list_upcall = cfg.get(section, 'list', True)
598 if cfg.has_option(section, 'reverse'):
599 reverse_upcall = cfg.get(section, 'reverse', True)
600 if cfg.has_option(section, 'cache_time'):
601 ctime = float(cfg.get(section, 'cache_time', True))
602
603 self.add_source(UpcallGroupSource(srcname, map_upcall,
604 all_upcall,
605 list_upcall,
606 reverse_upcall,
607 cfgdir, ctime))
608 except (NoSectionError, NoOptionError, ValueError), exc:
609 raise GroupResolverConfigError(str(exc))
610
615