Pyteomics documentation v2.1.5

pyteomics.auxiliary

Contents

Source code for pyteomics.auxiliary

"""
auxiliary - common functions and objects 
========================================

Math
----

  :py:func:`linear_regression` - a wrapper for numpy linear regression

Project infrastructure
----------------------

  :py:class:`PyteomicsError` - a pyteomics-specific exception

-------------------------------------------------------------------------------

"""

#   Copyright 2012 Anton Goloborodko, Lev Levitsky
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import numpy
from functools import wraps
from lxml import etree
from warnings import warn
from traceback import format_exc
import re
import operator
try: # Python 2.7
    from urllib2 import urlopen, URLError
except ImportError: # Python 3.x
    from urllib.request import urlopen, URLError
import sys

[docs]class PyteomicsError(Exception): """Exception raised for errors in Pyteomics library. Attributes ---------- message : str Error message. """ def __init__(self, msg): self.message = msg def __str__(self): return "Pyteomics error, message: %s" % (repr(self.message),)
[docs]def linear_regression(x, y, a=None, b=None): """Calculate coefficients of a linear regression y = a * x + b. Parameters ---------- x, y : array_like of float a : float, optional If specified then the slope coefficient is fixed and equals a. b : float, optional If specified then the free term is fixed and equals b. Returns ------- out : 4-tuple of float The structure is (a, b, r, stderr), where a -- slope coefficient, b -- free term, r -- Peason correlation coefficient, stderr -- standard deviation. """ if not isinstance(x, numpy.ndarray): x = numpy.array(x) if not isinstance(y, numpy.ndarray): y = numpy.array(y) if (a is not None and b is None): b = (y - a * x).mean() elif (a is not None and b is not None): pass else: a, b = numpy.polyfit(x, y, 1) r = numpy.corrcoef(x, y)[0, 1] stderr = (y - a * x - b).std() return a, b, r, stderr ### Public API ends here ### ### Next section: File-reading helpers
def _keepstate(func): """Decorator to help keep the position in open file passed as first argument to functions""" @wraps(func) def wrapped(source, *args, **kwargs): if hasattr(source, 'seek') and hasattr(source, 'tell'): pos = source.tell() source.seek(0) res = func(source, *args, **kwargs) source.seek(pos) return res else: return func(source, *args, **kwargs) return wrapped class _file_obj(object): """Check if `f` is a file name and open the file in `mode`. A context manager.""" def __init__(self, f, mode): if f is None: self.file = {'r': sys.stdin, 'a': sys.stdout, 'w': sys.stdout }[mode[0]] self.none = True elif isinstance(f, str): self.file = open(f, mode) else: self.file = f self.close_file = (self.file is not f) def __enter__(self): return self def __exit__(self, *args, **kwargs): if (not self.close_file) or hasattr(self, 'none'): return # do nothing # clean up exit = getattr(self.file, '__exit__', None) if exit is not None: return exit(*args, **kwargs) else: exit = getattr(self.file, 'close', None) if exit is not None: exit() def __getattr__(self, attr): return getattr(self.file, attr) def __iter__(self): return iter(self.file) def _file_reader(mode='r'): # a lot of the code below is borrowed from # http://stackoverflow.com/a/14095585/1258041 def decorator(func): """A decorator implementing the context manager protocol for functions that read files. Note: 'close' must be in kwargs! Otherwise it won't be respected.""" class CManager(object): def __init__(self, source, *args, **kwargs): self.file = _file_obj(source, mode) try: self.reader = func(self.file, *args, **kwargs) except: # clean up on any error self.__exit__(*sys.exc_info()) raise # context manager support def __enter__(self): return self def __exit__(self, *args, **kwargs): self.file.__exit__(*args, **kwargs) # iterator support def __iter__(self): return self def __next__(self): try: return next(self.reader) except StopIteration: self.__exit__(None, None, None) raise next = __next__ # Python 2 support # delegate everything else to file object def __getattr__(self, attr): return getattr(self.file, attr) @wraps(func) def helper(*args, **kwargs): return CManager(*args, **kwargs) return helper return decorator ### End of file helpers section ###
[docs]def memoize(maxsize=1000): """Make a memoization decorator. A negative value of `maxsize` means no size limit.""" def deco(f): """Memoization decorator. Items of `kwargs` must be hashable.""" memo = {} @wraps(f) def func(*args, **kwargs): key = (args, frozenset(kwargs.items())) if key not in memo: if len(memo) == maxsize: memo.popitem() memo[key] = f(*args, **kwargs) return memo[key] return func return deco
def _parse_charge(s): try: return int(s) except ValueError: if ',' in s or 'and' in s: return list(map(_parse_charge, re.split(r'(?:,\s*)|(?:\s*and\s*)', s))) num, sign = re.match(r'(\d+)(\+|-)', s).groups() return int(num)*(-1 if sign == '-' else 1) ### XML-related stuff below ### def _local_name(element): """Strip namespace from the XML element's name""" if element.tag.startswith('{'): return element.tag.rsplit('}', 1)[1] else: return element.tag def _make_version_info(env): @_keepstate def version_info(source): with _file_obj(source, 'rb') as s: s.seek(0) for _, elem in etree.iterparse(s, events=('start',), remove_comments=True): if _local_name(elem) == env['element']: vinfo = elem.attrib.get('version'), elem.attrib.get(( '{{{}}}'.format(elem.nsmap['xsi']) if 'xsi' in elem.nsmap else '') + 'schemaLocation') break return vinfo version_info.__doc__ = """ Provide version information about the {0} file. Parameters: ----------- source : str or file {0} file object or path to file Returns: -------- out : tuple A (version, schema URL) tuple, both elements are strings or None. """.format(env['format']) return version_info def _make_schema_info(env): @memoize(100) def _schema_info(source): version, schema = env['version_info'](source) if version == env['default_version']: return env['defaults'] ret = {} try: if not schema: schema_url = '' raise PyteomicsError( 'Schema information not found in {}.'.format(source)) schema_url = schema.split()[-1] if not (schema_url.startswith('http://') or schema_url.startswith('file://')): schema_url = 'file://' + schema_url schema_file = urlopen(schema_url) p = etree.XMLParser(remove_comments=True) schema_tree = etree.parse(schema_file, parser=p) types = {'ints': {'int', 'long', 'nonNegativeInteger', 'positiveInt', 'integer', 'unsignedInt'}, 'floats': {'float', 'double'}, 'bools': {'boolean'}, 'intlists': {'listOfIntegers'}, 'floatlists': {'listOfFloats'}, 'charlists': {'listOfChars', 'listOfCharsOrAny'}} for k, val in types.items(): tuples = set() for elem in schema_tree.iter(): if _local_name(elem) == 'attribute' and elem.attrib.get( 'type', '').split(':')[-1] in val: anc = elem.getparent() while not ( (_local_name(anc) == 'complexType' and 'name' in anc.attrib) or _local_name(anc) == 'element'): anc = anc.getparent() if anc is None: break else: if _local_name(anc) == 'complexType': elnames = [x.attrib['name'] for x in schema_tree.iter() if x.attrib.get( 'type', '').split(':')[-1] == anc.attrib['name']] else: elnames = (anc.attrib['name'],) for elname in elnames: tuples.add( (elname, elem.attrib['name'])) ret[k] = tuples ret['lists'] = set(elem.attrib['name'] for elem in schema_tree.xpath( '//*[local-name()="element"]') if 'name' in elem.attrib and elem.attrib.get('maxOccurs', '1') != '1') except Exception as e: if isinstance(e, URLError): warn("Can't get the {0[format]} schema for version {1}" " from {2} at the moment.\n" "Using defaults for {0[default_version]}".format(env, version, schema_url)) else: warn("Unknown {0[format]} version `{1}`. " "Attempt to use schema\n" "information from {2} failed.\n" "Exception information:\n{3}\n" "Falling back to defaults for {0[default_version]}\n" "NOTE: This is just a warning, probably from a badly-" "generated XML file.\nYou'll still most probably get " "decent results.\nLook here for suppressing warnings:\n" "http://docs.python.org/library/warnings.html#" "temporarily-suppressing-warnings\n" "If you think this shouldn't have happenned, you can " "report this to\n" "http://hg.theorchromo.ru/pyteomics/issues\n" "".format(env, version, schema_url, format_exc())) ret = env['defaults'] return ret _schema_info.__doc__ = """ Stores defaults for version {}, tries to retrieve the schema for other versions. Keys are: 'floats', 'ints', 'bools', 'lists', 'intlists', 'floatlists', 'charlists'.""".format(env['default_version']) return _schema_info def _make_get_info(env): def _get_info(source, element, recursive=False, retrieve_refs=False): """Extract info from element's attributes, possibly recursive. <cvParam> and <userParam> elements are treated in a special way.""" name = _local_name(element) kwargs = dict(recursive=recursive, retrieve_refs=retrieve_refs) if name in {'cvParam', 'userParam'}: if 'value' in element.attrib: try: value = float(element.attrib['value']) except ValueError: value = element.attrib['value'] return {element.attrib['name']: value} else: return {'name': element.attrib['name']} info = dict(element.attrib) # process subelements if recursive: for child in element.iterchildren(): cname = _local_name(child) if cname in {'cvParam', 'userParam'}: newinfo = _get_info(source, child) if not ('name' in info and 'name' in newinfo): info.update(newinfo) else: if not isinstance(info['name'], list): info['name'] = [info['name']] info['name'].append(newinfo.pop('name')) else: if cname not in env['schema_info'](source)['lists']: info[cname] = env['get_info_smart'](source, child, **kwargs) else: if cname not in info: info[cname] = [] info[cname].append(env['get_info_smart'](source, child, **kwargs)) # process element text if element.text and element.text.strip(): stext = element.text.strip() if stext: if info: info[name] = stext else: return stext # convert types def str_to_bool(s): if s.lower() in {'true', '1'}: return True if s.lower() in {'false', '0'}: return False raise PyteomicsError('Cannot convert string to bool: ' + s) converters = {'ints': int, 'floats': float, 'bools': str_to_bool, 'intlists': lambda x: numpy.fromstring(x, dtype=int, sep=' '), 'floatlists': lambda x: numpy.fromstring(x, sep=' '), 'charlists': list} for k, v in info.items(): for t, a in converters.items(): if (_local_name(element), k) in env['schema_info'](source)[t]: info[k] = a(v) # resolve refs # loop is needed to resolve refs pulled from other refs if retrieve_refs: while True: refs = False for k, v in dict(info).items(): if k.endswith('_ref'): refs = True info.update(env['get_by_id'](source, v)) del info[k] del info['id'] if not refs: break # flatten the excessive nesting for k, v in dict(info).items(): if k in env['keys']: info.update(v) del info[k] # another simplification for k, v in dict(info).items(): if isinstance(v, dict) and 'name' in v and len(v) == 1: info[k] = v['name'] if len(info) == 2 and 'name' in info and ( 'value' in info or 'values' in info): name = info.pop('name') info = {name: info.popitem()[1]} return info return _get_info def _make_iterfind(env): pattern_path = re.compile('([\w/*]*)(\[(\w+[<>=]{1,2}\w+)\])?') pattern_cond = re.compile('^\s*(\w+)\s*([<>=]{,2})\s*(\w+)$') def get_rel_path(element, names): if not names: yield element else: for child in element.iterchildren(): if _local_name(child).lower() == names[0].lower( ) or names[0] == '*': if len(names) == 1: yield child else: for gchild in get_rel_path(child, names[1:]): yield gchild def satisfied(d, cond): func = {'<': 'lt', '<=': 'le', '=': 'eq', '==': 'eq', '!=': 'ne', '<>': 'ne', '>': 'gt', '>=': 'ge'} try: lhs, sign, rhs = re.match(pattern_cond, cond).groups() return getattr(operator, func[sign])(d[lhs], type(d[lhs])(rhs)) except (AttributeError, KeyError, ValueError): raise PyteomicsError('Invalid condition: ' + cond) @_keepstate def iterfind(source, path, **kwargs): """Parse ``source`` and yield info on elements with specified local name or by specified "XPath". Only local names separated with slashes are accepted. An asterisk (`*`) means any element. You can specify a single condition in the end, such as: "/path/to/element[some_value>1.5]" Note: you can do much more powerful filtering using plain Python. The path can be absolute or "free". Please don't specify namespaces.""" try: path, _, cond = re.match(pattern_path, path).groups() except AttributeError: raise PyteomicsError('Invalid path: ' + path) if path.startswith('//') or not path.startswith('/'): absolute = False if path.startswith('//'): path = path[2:] if path.startswith('/') or '//' in path: raise ValueError("Too many /'s in a row.") else: absolute = True path = path[1:] nodes = path.rstrip('/').lower().split('/') localname = nodes[0] found = False for ev, elem in etree.iterparse(source, events=('start', 'end'), remove_comments=True): name_lc = _local_name(elem).lower() if ev == 'start': if name_lc == localname or localname == '*': found = True else: if name_lc == localname or localname == '*': if (absolute and elem.getparent() is None ) or not absolute: for child in get_rel_path(elem, nodes[1:]): info = env['get_info_smart'](source, child, **kwargs) if cond is None or satisfied(info, cond): yield info if not localname == '*': found = False if not found: elem.clear() return iterfind def _xpath(tree, path, ns=None): """Return the results of XPath query with added namespaces. Assumes the ns declaration is on the root element or absent.""" if hasattr(tree, 'getroot'): root = tree.getroot() else: root = tree while root.getparent() is not None: root = root.getparent() ns = root.nsmap.get(ns) def repl(m): s = m.group(1) if not ns: return s if not s: return 'd:' return '/d:' new_path = re.sub('(\/|^)(?![\*\/])', repl, path) n_s = ({'d': ns} if ns else None) return tree.xpath(new_path, namespaces=n_s)

Contents