# Copyright 2013 Novo Nordisk Foundation Center for Biosustainability, DTU.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, print_function
import colorsys
import inspect
import itertools
import logging
import math
import platform
import re
from collections import OrderedDict
from datetime import datetime
from functools import partial
from itertools import islice
from time import time
from uuid import uuid1
import numpy
import pandas
import pip
import six
from numpy.random import RandomState
from six.moves import range
logger = logging.getLogger(__name__)
_BIOMASS_RE_ = re.compile("biomass", re.IGNORECASE)
[docs]class frozendict(dict):
def __init__(self, iterable, **kwargs):
super(frozendict, self).__init__(iterable, **kwargs)
[docs] def popitem(self):
raise AttributeError("'frozendict' object has no attribute 'popitem")
[docs] def pop(self, k, d=None):
raise AttributeError("'frozendict' object has no attribute 'pop")
def __setitem__(self, key, value):
raise AttributeError("'frozendict' object has no attribute '__setitem__")
[docs] def setdefault(self, k, d=None):
raise AttributeError("'frozendict' object has no attribute 'setdefault")
def __delitem__(self, key):
raise AttributeError("'frozendict' object has no attribute '__delitem__")
def __hash__(self):
return hash(tuple(sorted(self.items())))
[docs] def update(self, E=None, **F):
raise AttributeError("'frozendict' object has no attribute 'update")
[docs]def float_ceil(val, decimals=0):
"""
Like ceil but the number of decimals can be set.
Equivalent of $$round(val + 1e^{-decimals}/2, decimals)$$
val: float, numpy.array
The initial value.
decimals: int
The number of decimal places.
Returns
-------
float, numpy.array
"""
aux = math.pow(10, -decimals) / 2
return numpy.round(val + aux, decimals)
[docs]def float_floor(val, decimals=0):
"""
Like floor but the number of decimals can be set.
Equivalent of $$round(val - 1e^{-decimals}/2, decimals)$$
val: float, numpy.array
The initial value.
decimals: int
The number of decimal places.
Returns
-------
float, numpy.array
"""
aux = math.pow(10, -decimals) / 2
return numpy.round(val - aux, decimals)
[docs]class ProblemCache(object):
"""
Variable and constraint cache for models.
To be used in complex methods that require many extra variables and constraints when one must run
simulations with the same method many times.
It allows rollback to the previous state in case one iteration fails to build the problem or
generates an invalid state.
"""
def __init__(self, model):
self.time_machine = None
self._model = model
self.variables = {}
self.constraints = {}
self.objective = None
self.original_objective = model.objective
self.time_machine = TimeMachine()
self.transaction_id = None
[docs] def begin_transaction(self):
"""
Creates a time point. If rollback is called, the variables and constrains will be reverted to this point.
"""
self.transaction_id = uuid1()
self.time_machine(do=int, undo=int, bookmark=self.transaction_id)
@property
def model(self):
return self._model
def _append_constraint(self, constraint_id, create, *args, **kwargs):
self.constraints[constraint_id] = create(self.model, constraint_id, *args, **kwargs)
self._model.solver.add(self.constraints[constraint_id])
def _remove_constraint(self, constraint_id):
constraint = self.constraints.pop(constraint_id)
self.model.solver.remove(constraint)
def _append_variable(self, variable_id, create, *args, **kwargs):
self.variables[variable_id] = create(self.model, variable_id, *args, **kwargs)
self.model.solver.add(self.variables[variable_id])
def _remove_variable(self, variable_id):
variable = self.variables.pop(variable_id)
self.model.solver.remove(variable)
def _rebuild_variable(self, variable):
(type, lb, ub, name) = variable.type, variable.lb, variable.ub, variable.name
def rebuild():
self.model.solver.remove(variable)
new_variable = self.model.solver.interface.Variable(name, lb=lb, ub=ub, type=type)
self.variables[name] = variable
self.model.solver.add(new_variable, sloppy=True)
return rebuild
[docs] def add_constraint(self, constraint_id, create, update, *args, **kwargs):
"""
Adds a new cached constraint.
The create and update functions must have the following signatures:
>>> create(model, constraint_id, *args)
>>> update(model, constraint, *args)
"args" in the first example must match args on the second example.
Arguments
---------
constraint_id: str
The identifier of the constraint
create: function
A function that creates an optlang.interface.Constraint
update: function
a function that updates an optlang.interface.Constraint
"""
if constraint_id in self.constraints:
if update is not None:
update(self.model, self.constraints[constraint_id], *args, **kwargs)
else:
self.time_machine(
do=partial(self._append_constraint, constraint_id, create, *args, **kwargs),
undo=partial(self._remove_constraint, constraint_id)
)
[docs] def add_variable(self, variable_id, create, update, *args, **kwargs):
"""
Adds a new cached variable.
The create and update functions must have the following signatures:
>>> create(model, variable_id, *args)
>>> update(model, variable, *args)
"args" in the first example must match args on the second example.
Arguments
---------
constraint_id: str
The identifier of the constraint
create: function
A function that creates an optlang.interface.Variable
update: function
a function that updates an optlang.interface.Variable
"""
if variable_id in self.variables:
if update is not None:
self.time_machine(
do=partial(update, self.model, self.variables[variable_id], *args, **kwargs),
undo=self._rebuild_variable(self.variables[variable_id]))
else:
self.time_machine(
do=partial(self._append_variable, variable_id, create, *args, **kwargs),
undo=partial(self._remove_variable, variable_id)
)
[docs] def add_objective(self, create, update, *args):
if self.objective is None:
self.objective = create(self.model, *args)
self.time_machine(
do=partial(setattr, self.model, 'objective', self.objective),
undo=partial(setattr, self.model, 'objective', self.model.objective)
)
else:
if update:
self.objective = update(self.model, *args)
self.time_machine(
do=partial(setattr, self.model, 'objective', self.objective),
undo=partial(setattr, self.model, 'objective', self.model.objective)
)
[docs] def reset(self):
"""
Removes all constraints and variables from the cache.
"""
self.model.solver.remove(self.constraints.values())
self.model.solver.remove(self.variables.values())
self.model.objective = self.original_objective
self.variables = {}
self.objective = None
self.constraints = {}
self.transaction_id = None
self.time_machine.history.clear()
[docs] def rollback(self):
"""
Returns to the previous transaction start point.
"""
if self.transaction_id is None:
raise RuntimeError("Start transaction must be called before rollback")
self.time_machine.undo(self.transaction_id)
self.transaction_id = None
def __enter__(self):
"""
Allows problem cache to be used with a _with_ statement.
Examples
--------
You want to run room/lmoma for every single knockout.
>>> with ProblemCache(model) as cache:
>>> for reaction in reactions:
>>> result = lmoma(model, reference=reference, cache=cache)
Returns
-------
ProblemCache
returns itself
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.reset()
[docs]class RandomGenerator(object):
def __init__(self, seed=None):
self._random = RandomState(seed=seed)
[docs] def seed(self, seed):
self._random = RandomState(seed=seed)
[docs] def random(self):
return self._random.rand()
[docs] def randint(self, a, b=None):
if b is None:
b = a
a = 0
r = self._random.randint(a, high=b, size=1)
return r[0]
[docs] def sample(self, population, k):
if k == 0:
return []
return list(self._random.choice(population, size=k, replace=False))
def __getattr__(self, attr):
return getattr(self._random, attr)
def __getstate__(self):
return {'_random': self._random}
def __setstate__(self, d):
self._random = d['_random']
[docs]class Singleton(object):
"""
Singleton class to be extended
"""
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(Singleton, cls).__new__(cls)
return cls._instance
[docs]class AutoVivification(dict):
"""Implementation of perl's autovivification feature. Checkout http://stackoverflow.com/a/652284/280182"""
def __getitem__(self, item):
try:
return dict.__getitem__(self, item)
except KeyError:
value = self[item] = type(self)()
return value
[docs]class TimeMachine(object):
"""Travel back and forth in time."""
def __init__(self):
super(TimeMachine, self).__init__()
self.history = OrderedDict()
def __call__(self, do=None, undo=None, bookmark=None):
output = do()
current_time = time()
if bookmark is None:
entry_id = uuid1()
else:
entry_id = bookmark
# make sure that entry is added to the end of history
self.history.pop(entry_id, None)
self.history[entry_id] = {'unix_epoch': current_time, 'undo': undo, 'redo': do}
return entry_id, output
def __str__(self):
info = '\n'
for item in six.iteritems(self.history):
info += self._history_item_to_str(item)
return info
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.reset()
@staticmethod
def _history_item_to_str(item):
info = ''
uuid, entry = item
info += datetime.fromtimestamp(entry['unix_epoch']).strftime('%Y-%m-%d %H:%M:%S') + '\n'
undo_entry = entry['undo']
try:
# partial (if .keywords is None print {} instead)
elements = undo_entry.func, undo_entry.args, undo_entry.keywords or {}
info += 'undo: ' + ' '.join([str(elem) for elem in elements]) + '\n'
except AttributeError: # normal python function
info += 'undo: ' + undo_entry.__name__ + '\n'
redo_entry = entry['redo']
try:
elements = redo_entry.func, redo_entry.args, redo_entry.keywords or {} # partial
info += 'redo: ' + ' '.join([str(elem) for elem in elements]) + '\n'
except AttributeError:
info += 'redo: ' + redo_entry.__name__ + '\n'
return info
[docs] def undo(self, bookmark=None):
if bookmark is None:
try:
(uuid, entry) = self.history.popitem()
entry['undo']()
except KeyError: # history is empty
pass
elif bookmark in list(self.history.keys()):
uuid = False
while uuid is not bookmark:
(uuid, entry) = self.history.popitem()
entry['undo']()
else:
raise Exception(
'Provided bookmark %s cannot be found in the time machine.')
[docs] def redo(self):
raise NotImplementedError
[docs] def reset(self):
if self.history: # history is not empty
self.undo(bookmark=list(self.history.keys())[0])
[docs]class Timer(object):
"""Taken from http://stackoverflow.com/a/5849861/280182"""
def __init__(self, name=None):
self.name = name
def __enter__(self):
self.tstart = time()
def __exit__(self, type, value, traceback):
if self.name:
print('[%s]' % self.name, end=' ')
print('Elapsed: %s' % (time() - self.tstart))
[docs]class IntelliContainer(object):
def __init__(self, **kwargs):
self._dict = dict(**kwargs)
def __getattr__(self, value):
return self._dict.get(value)
def __setitem__(self, key, value):
self._dict[key] = value
def __iter__(self):
return six.itervalues(self._dict)
def __dir__(self):
return list(self._dict.keys())
[docs]def inheritdocstring(name, bases, attrs):
"""Use as metaclass to inherit class and method docstrings from parent.
Adapted from http://stackoverflow.com/questions/13937500/inherit-a-parent-class-docstring-as-doc-attribute"""
temp = type('temporaryclass', bases, {})
if '__doc__' not in attrs or not attrs["__doc__"]:
# create a temporary 'parent' to (greatly) simplify the MRO search
for cls in inspect.getmro(temp):
if cls.__doc__ is not None:
attrs['__doc__'] = cls.__doc__
break
for attr_name, attr in attrs.items():
if not attr.__doc__:
for cls in inspect.getmro(temp):
try:
if getattr(cls, attr_name).__doc__ is not None:
attr.__doc__ = getattr(cls, attr_name).__doc__
break
except (AttributeError, TypeError):
continue
return type(name, bases, attrs)
[docs]def partition_(lst, n):
"""Partition a list into n bite size chunks."""
division = len(lst) / float(n)
return [lst[int(round(division * i)): int(round(division * (i + 1)))] for i in range(n)]
[docs]def partition(ite, n):
"""Partition an iterable into n bite size chunks."""
try:
length = len(ite)
except TypeError:
ite = list(ite)
length = len(ite)
division = length / float(n)
iterator = iter(ite)
return [list(islice(iterator, 0, round(division * (i + 1)) - round(division * i))) for i in range(n)]
[docs]def flatten(l):
return [item for sublist in l for item in sublist]
[docs]def generate_colors(n):
hsv_tuples = [(v * 1.0 / n, 0.5, 0.5) for v in range(n)]
color_map = {}
for i in range(n):
rgb = colorsys.hsv_to_rgb(*hsv_tuples[i])
color = tuple(int(channel * 256) for channel in rgb)
color_map[i] = '#%02x%02x%02x' % color
return color_map
[docs]def memoize(function, memo={}):
def wrapper(*args):
if args in memo:
return memo[args]
else:
rv = function(*args)
memo[args] = rv
return rv
return wrapper
[docs]def get_system_info():
# pip freeze (adapted from http://stackoverflow.com/a/24322465/280182)
package_info = list()
for dist in pip.get_installed_distributions():
req = str(dist.as_requirement())
package_info.append(req)
return dict(package_info=package_info,
platform=platform.platform(),
machine=platform.machine(),
system=platform.system())
[docs]def in_ipnb():
"""
Check if it is running inside an IPython Notebook (updated for new notebooks)
"""
return pandas.core.common.in_ipython_frontend()
[docs]def str_to_valid_variable_name(s):
"""Adapted from http://stackoverflow.com/a/3303361/280182"""
# Remove invalid characters
s = re.sub('[^0-9a-zA-Z_]', '_', s)
# Remove leading characters until we find a letter or underscore
s = re.sub('^[^a-zA-Z_]+', '', s)
return s
[docs]def zip_repeat(long_iter, short_iter):
"""
Zips two iterable objects but repeats the second one if it is shorter than the first one.
Parameters
----------
long_iter: iterable
short_iter: iterable
Returns
-------
generator
"""
for i, j in zip(long_iter, itertools.cycle(short_iter)):
yield i, j
[docs]def pick_one(iterable):
"""
Helper function that returns an element of an iterable (it the iterable is ordered this will be the first
element).
"""
it = iter(iterable)
return next(it)
[docs]def reduce_reaction_set(reaction_set, groups):
"""
Reduces a set of reactions according to a number of groups of reactions.
The reduction will be performed so that the resulting set will contain no more than 1 reaction from each group.
Reactions that are not in any of the groups will remain in the set.
Parameters
----------
reaction_set: Set
groups: Iterable of sets
Returns
-------
Set
"""
reaction_set = set(reaction_set) # Make a shallow copy
result = []
for group in groups:
intersection = group & reaction_set
if intersection: # If any elements of group are in reaction_set, add one of these to result
result.append(pick_one(intersection))
reaction_set = reaction_set - intersection
result = set(result) | reaction_set # Add the remaining reactions to result
return result
[docs]def decompose_reaction_groups(reaction_groups, reactions):
"""
reaction_groups : list
A list with dictionaries (element: relative_coefficient)
reactions : list, set, tuple
A collection of reactions.
Returns
-------
list
A list of all possible group substitutions.
"""
to_keep = []
to_replace = {}
for element in reactions:
for g in reaction_groups:
if element in g:
to_replace[element] = g.keys()
break
if element not in to_replace:
to_keep.append(element)
return (list(combo) + to_keep for combo in itertools.product(*to_replace.values()))
[docs]def current_solver_name(model):
"""Give a string representation for an optlang interface.
Parameters
----------
model : cameo.core.SolverBasedModel
A model
Returns
-------
string
The name of the interface as a string
"""
interface = model.solver.interface.__name__
return re.sub(r"optlang.|.interface", "", interface)