Source code for cbmpy.CBGLPK

"""
CBMPy: CBGLPK module
====================
PySCeS Constraint Based Modelling (http://cbmpy.sourceforge.net)
Copyright (C) 2009-2017 Brett G. Olivier, VU University Amsterdam, Amsterdam, The Netherlands

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>

Author: Brett G. Olivier
Contact email: bgoli@users.sourceforge.net
Last edit: $Author: bgoli $ ($Id: CBGLPK.py 575 2017-04-13 12:18:44Z bgoli $)

"""

# preparing for Python 3 port
from __future__ import division, print_function
from __future__ import absolute_import
#from __future__ import unicode_literals

HAVE_SYMPY = False
try:
    import sympy
    if int(sympy.__version__.split('.')[1]) >= 7 and int(sympy.__version__.split('.')[2]) >= 4:
        HAVE_SYMPY = True
    else:
        del sympy
except ImportError:
    HAVE_SYMPY = False

import os, time
import numpy
from . import CBWrite
from .CBConfig import __CBCONFIG__ as __CBCONFIG__
__DEBUG__ = __CBCONFIG__['DEBUG']
__version__ = __CBCONFIG__['VERSION']


HAVE_GLPK = False
GLPK_SOLUTION_STATUS = None

try:
    import glpk
    lp = glpk.LPX()
    HAVE_GLPK = True
except:
    raise ImportError

# configuration options for GLPK
GLPK_CFG = {'simplex' : {'meth' : glpk.LPX.PRIMAL,
                       #'meth' : glpk.LPX.DUAL,
                       #'meth' : glpk.LPX.DUALP,
                       'tol_bnd' : 1.0e-6,
                       'tol_dj'  : 1.0e-6,
                       'tol_piv' : 1.0e-10
                       }
          }

GLPK_STATUS = {
    1 : 'LPS_UNDEF',
    2 : 'LPS_FEAS',
    3 : 'LPS_INFEAS',
    4 : 'LPS_NOFEAS',
    5 : 'LPS_OPT',
    6 : 'LPS_UNBND'}

GLPK_STATUS2 = {
    'opt' : 'LPS_OPT',
    'undef' : 'LPS_UNDEF',
    'feas' : 'LPS_FEAS',
    'infeas' : 'LPS_INFEAS',
    'nofeas' : 'LPS_NOFEAS',
    'unbnd' : 'LPS_UNBND'}


GLPK_SILENT_MODE = True
GLPK_INFINITY = 1.0e9

[docs]def glpk_constructLPfromFBA(fba, fname=None): """ Create a GLPK LP in memory. - *fba* an FBA object - *fname* optional filename if defined writes out the constructed lp """ _Stime = time.time() # define model and add variables lp = glpk.LPX() lp.name = fba.getId() lp.cols.add(len(fba.N.col)) if HAVE_SYMPY and fba.N.__array_type__ == sympy.MutableDenseMatrix: print('INFO: GLPK requires floating point, converting N') Nmat = numpy.array(fba.N.array).astype('float') RHSmat = numpy.array(fba.N.RHS).astype('float') if fba.CM != None: CMmat = numpy.array(fba.CM.array).astype('float') CMrhs = numpy.array(fba.CM.RHS).astype('float') else: Nmat = fba.N.array RHSmat = fba.N.RHS if fba.CM != None: CMmat = fba.CM.array CMrhs = fba.CM.RHS varMap = {} for n_ in range(Nmat.shape[1]): varMap[fba.N.col[n_]] = n_ lp.cols[n_].name = fba.N.col[n_] #print varMap try: # define objective osense = fba.getActiveObjective().operation.lower() if osense in ['minimize', 'minimise', 'min']: lp.obj.maximize = False elif osense in ['maximize', 'maximise', 'max']: lp.obj.maximize = True else: raise RuntimeError('\n%s - is not a valid objective operation' % osense) lp.obj.name = fba.getActiveObjective().getId() for fo_ in fba.getActiveObjective().fluxObjectives: lp.obj[varMap[fo_.reaction]] = fo_.coefficient except AttributeError: print('\nWARNING(GLPK create LP): no objective function defined') # create N constraints lp.rows.add(Nmat.shape[0]) #conMap = {} #for n_ in range(Nmat.shape[0]): ##conMap[fba.N.row[n_]] = n_ #lp.rows[n_].name = fba.N.row[n_] #tnew = time.time() for r_ in range(Nmat.shape[0]): # name and coefficients newCon = [] for c_ in range(Nmat.shape[1]): newCon.append((c_, Nmat[r_,c_])) lp.rows[r_].name = fba.N.row[r_] lp.rows[r_].matrix = newCon # sense and rhs rhs = RHSmat[r_] if fba.N.operators[r_] in ['<=','<','L']: lp.rows[r_].bounds = None, rhs elif fba.N.operators[r_] in ['>=','>','G']: lp.rows[r_].bounds = rhs, None elif fba.N.operators[r_] in ['=','E']: lp.rows[r_].bounds = rhs else: raise RuntimeError('INFO: invalid operator: %s' % fba.N.operators[n]) # add user defined constraints if fba.CM != None: baseRows = len(lp.rows) lp.rows.add(CMmat.shape[0]) for r_ in range(CMmat.shape[0]): # name and coefficients newCon = [] for c_ in range(CMmat.shape[1]): newCon.append((c_, CMmat[r_, c_])) lp.rows[baseRows+r_].name = fba.CM.row[r_] lp.rows[baseRows+r_].matrix = newCon # sense and rhs rhs = CMrhs[r_] if fba.CM.operators[r_] in ['<=','<','L']: lp.rows[baseRows+r_].bounds = None, rhs elif fba.CM.operators[r_] in ['>=','>','G']: lp.rows[baseRows+r_].bounds = rhs, None elif fba.CM.operators[r_] in ['=','E']: lp.rows[baseRows+r_].bounds = rhs else: raise RuntimeError('INFO: invalid operator: %s' % fba.N.operators[n]) # add bounds for r_ in fba.reactions: lb = ub = None lb = fba.getReactionLowerBound(r_.getId()) ub = fba.getReactionUpperBound(r_.getId()) if lb in ['Infinity', 'inf', 'Inf', 'infinity']: lb = GLPK_INFINITY elif lb in ['-Infinity', '-inf', '-Inf', '-infinity', None]: lb = -GLPK_INFINITY elif numpy.isinf(lb): if lb < 0.0: lb = -GLPK_INFINITY else: lb = GLPK_INFINITY if ub in ['Infinity', 'inf', 'Inf', 'infinity', None]: ub = GLPK_INFINITY elif ub in ['-Infinity', '-inf', '-Inf', '-infinity']: ub = -GLPK_INFINITY elif numpy.isinf(ub): if ub < 0.0: ub = -GLPK_INFINITY else: ub = GLPK_INFINITY if ub != GLPK_INFINITY and lb != -GLPK_INFINITY and ub == lb: lp.cols[varMap[r_.getId()]].bounds = lb elif ub != GLPK_INFINITY and lb != -GLPK_INFINITY: lp.cols[varMap[r_.getId()]].bounds = lb, ub elif ub != GLPK_INFINITY: lp.cols[varMap[r_.getId()]].bounds = None, ub elif lb != -GLPK_INFINITY: lp.cols[varMap[r_.getId()]].bounds = lb, None else: lp.cols[varMap[r_.getId()]].bounds = None print('\ngplk_constructLPfromFBA time: {}\n'.format(time.time() - _Stime)) if fname != None: lp.write(cpxlp=fname+'.lp') return lp
[docs]def glpk_Solve(lp, method='s'): """ Solve the LP and create a status attribute with the solution status - *method* [default='s'] 's' = simplex, 'i' = interior, 'e' = exact GLPK solver options can be set in the dictionary GLPK_CFG """ if method == 'i': glpksol = lp.interior() elif method == 'e': glpksol = lp.exact() else: glpksol = lp.simplex(**GLPK_CFG['simplex']) global GLPK_SOLUTION_STATUS GLPK_SOLUTION_STATUS = glpk_getSolutionStatus(lp) #if status == 'LPS_UNDEF': #sd.presolve = False #status = glpk.glp_simplex(lp, sd) #if status == 'LPS_UNDEF': #print('\nINFO: Primal solver failure switching to dual-primal solver\n') #sd.presolve = False #sd.obj_ul = 1.0e10 #sd.obj_ll = 1.0e10 #sd.meth = glpk.GLP_DUALP #glpk.glp_simplex(lp, sd) #status = glpk_getSolutionStatus(lp) if GLPK_SOLUTION_STATUS in ['LPS_UNDEF', 'LPS_FEAS', 'LPS_INFEAS', 'LPS_NOFEAS', 'LPS_OPT', 'LPS_UNBND']: if not GLPK_SILENT_MODE: print('Solution status returned as: {}'.format(GLPK_SOLUTION_STATUS)) print("Objective value = " , lp.obj.value) return GLPK_SOLUTION_STATUS else: print("INFO: No solution available ({})".format(GLPK_SOLUTION_STATUS)) return None
[docs]def glpk_getSolutionStatus(lp): """ Returns one of: - *LPS_OPT*: solution is optimal; - *LPS_FEAS*: solution is feasible; - *LPS_INFEAS*: solution is infeasible; - *LPS_NOFEAS*: problem has no feasible solution; - *LPS_UNBND*: problem has unbounded solution; - *LPS_UNDEF*: solution is undefined. """ return GLPK_STATUS2[lp.status]
[docs]def glpk_analyzeModel(f, lpFname=None, return_lp_obj=False, with_reduced_costs='unscaled', with_sensitivity=False,\ del_intermediate=False, build_n=True, quiet=False, oldlpgen=False, method='s'): """ Optimize a model and add the result of the optimization to the model object (e.g. `reaction.value`, `objectiveFunction.value`). The stoichiometric matrix is automatically generated. This is a common function available in all solver interfaces. By default returns the objective function value - *f* an instantiated PySCeSCBM model object - *lpFname* [default=None] the name of the intermediate LP file saved when this has a string value. - *return_lp_obj* [default=False] off by default when enabled it returns the PyGLPK LP object - *with_reduced_costs* [default='unscaled'] calculate and add reduced cost information to mode this can be: 'unscaled' or 'scaled' or anything else which is interpreted as 'None'. Scaled means s_rcost = (r.reduced_cost*rval)/obj_value - *with_sensitivity* [default=False] add solution sensitivity information (not yet implemented) - *del_intermediate* [default=False] delete the intermediary files after updating model object, useful for server applications - *build_n* [default=True] generate stoichiometry from the reaction network (reactions/reagents/species) - *quiet* [default=False] suppress glpk output - *method* [default='s'] select the GLPK solver method, see the GLPK documentation for details - 's': simplex - 'i': interior - 'e': exact """ if build_n: f.buildStoichMatrix() #CBTools.addStoichToFBAModel(f) fid = f.id if with_reduced_costs == 'scaled': f.SCALED_REDUCED_COSTS = True elif with_reduced_costs == 'unscaled': f.SCALED_REDUCED_COSTS = False else: f.SCALED_REDUCED_COSTS = None if lpFname == None: flp = glpk_constructLPfromFBA(f, fname=None) f.id = '_glpktmp_.tmp' else: flp = glpk_constructLPfromFBA(f, fname=lpFname) fid = lpFname _Stime = time.time() print('\nglpk_analyzeModel FBA --> LP time: {}\n'.format(time.time() - _Stime)) f.id = fid glpk_Solve(flp, method) glpk_setFBAsolutionToModel(f, flp, with_reduced_costs=with_reduced_costs) glpk_setSolutionStatusToModel(f, flp) if oldlpgen and del_intermediate: os.remove(LPF) objv = f.getActiveObjective().getValue() print('\nanalyzeModel objective value: {}\n'.format(objv)) if return_lp_obj: return flp else: del flp return objv
[docs]def glpk_setSolutionStatusToModel(m, lp): """ Sets the lp solutions status to the CBMPy model """ m.SOLUTION_STATUS = glpk_getSolutionStatus(lp) # TODO: need to synchronise all solvers to same integer system if m.SOLUTION_STATUS == 'LPS_OPT': m.SOLUTION_STATUS_INT = 1 else: m.SOLUTION_STATUS_INT = 999
[docs]def glpk_setFBAsolutionToModel(fba, lp, with_reduced_costs='unscaled'): """ Sets the FBA solution from a CPLEX solution to an FBA object - *fba* and fba object - *lp* a CPLEX LP object - *with_reduced_costs* [default='unscaled'] calculate and add reduced cost information to mode this can be: 'unscaled' or 'scaled' or anything else which is interpreted as None. Scaled is: s_rcost = (r.reduced_cost*rval)/obj_value """ sol, objname, objval = glpk_getOptimalSolution(lp) if glpk_getSolutionStatus(lp) == 'LPS_OPT': fba.objectives[fba.activeObjIdx].solution, fba.objectives[fba.activeObjIdx].value = sol, objval else: fba.objectives[fba.activeObjIdx].solution, fba.objectives[fba.activeObjIdx].value = sol, numpy.NaN for r in fba.reactions: rid = r.getId() if rid in sol: r.value = sol[rid] else: r.value = None scaled = False if with_reduced_costs == 'scaled': scaled = True with_reduced_costs = True elif with_reduced_costs == 'unscaled': scaled = False with_reduced_costs = True else: with_reduced_costs = False if objval != None and sol != {} and with_reduced_costs: RC = glpk_getReducedCosts(lp, scaled=scaled) setReducedCosts(fba, RC) else: setReducedCosts(fba, {})
[docs]def glpk_getOptimalSolution(c): """ From a GLPK model extract a tuple of solution, ObjFuncName and ObjFuncVal """ s_val = [] s_name = [] fba_sol = {} objf_name = None objf_val = None try: objf_name = c.obj.name objf_val = c.obj.value for n in c.cols: fba_sol.update({n.name : n.value}) except Exception as ex: print(ex) print('WARNING: No solution to get') s_val = [] s_name = [] fba_sol = {} objf_val = None del s_name,s_val return fba_sol, objf_name, objf_val
[docs]def glpk_getReducedCosts(c, scaled=False): """ Extract ReducedCosts from LP and return as a dictionary 'Rid' : reduced cost - *c* a GLPK LP object - *scaled* scale the reduced cost by the optimal flux value """ s_name = [] r_costs = [] s_val = [] for c_ in c.cols: s_name.append(c_.name) r_costs.append(c_.dual) s_val.append(c_.value) objf_val = c.obj.value output = {} for v in range(len(s_name)): if scaled: try: r_val = r_costs[v]*s_val[v]/objf_val except: r_val = float('nan') else: r_val = r_costs[v] output.update({s_name[v] : r_val}) del s_name, r_costs, s_val return output
[docs]def glpk_FluxVariabilityAnalysis(fba, selected_reactions=None, pre_opt=True, tol=None, objF2constr=True, rhs_sense='lower', optPercentage=100.0, work_dir=None, quiet=True, debug=False, oldlpgen=False, markupmodel=True, default_on_fail=False, roundoff_span=10, method='s'): """ Perform a flux variability analysis on an fba model: - *fba* an FBA model object - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is disabled and *objF2constr* is True then the vid/value of the current active objective is used - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - *work_dir* [default=None] the FVA working directory for temporary files default = cwd+fva - *debug* [default=False] if True write out all the intermediate FVA LP's into work_dir - *quiet* [default=False] if enabled supress CPLEX output - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - *markupmodel* [default=True] add the values returned by the fva to the reaction.fva_min and reaction.fva_max - *default_on_fail* [default=False] if *pre_opt* is enabled replace a failed minimum/maximum with the solution value - *roundoff_span* [default=10] number of digits is round off (not individual min/max values) - *method* [default='s'] select the GLPK solver method, see the GLPK documentation for details - 's': simplex - 'i': interior - 'e': exact Returns an array with columns Reaction, Reduced Costs, Variability Min, Variability Max, abs(Max-Min), MinStatus, MaxStatus and a list containing the row names. """ if work_dir == None: work_dir = os.getcwd() else: assert os.path.exists(work_dir), '\nWhat did you think would happen now!' if debug: debug_dir = os.path.join(work_dir,'DEBUG') if not os.path.exists(debug_dir): os.mkdir(debug_dir) s2time = time.time() # generate a presolution print('GLPK is using solver option: "{}"'.format(method)) cpx = OPTIMAL_PRESOLUTION = None pre_sol = pre_oid = pre_oval = None REDUCED_COSTS = {} cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS = glpk_func_GetCPXandPresolve(fba, pre_opt, objF2constr, quiet=quiet, oldlpgen=oldlpgen, method=method) # if required add the objective function as a constraint if objF2constr: glpk_func_SetObjectiveFunctionAsConstraint(cpx, rhs_sense, pre_oval, tol, optPercentage) if debug: cpx.write(cpxlp=os.path.join(debug_dir, 'FVA_base.lp')) # do the FVA NUM_FLX = len(fba.reactions) print('Total number of reactions: {}'.format(NUM_FLX)) if selected_reactions != None: rids = fba.getReactionIds() for r in selected_reactions: assert r in rids, "\n%s is not a valid reaction name" % r else: selected_reactions = fba.getReactionIds() NUM_FLX = len(selected_reactions) print('Number of user selected variables: {}'.format(NUM_FLX)) try: OUTPUT_ARRAY = numpy.zeros((NUM_FLX, 7), numpy.double) except AttributeError: OUTPUT_ARRAY = numpy.zeros((NUM_FLX, 7)) OUTPUT_NAMES = [] cntr = 0 tcnt = 0 # this is a memory hack --> prevents solver going berserk mcntr = 0 mcntr_cntrl = 3 mps_filename = '_{}_.mps'.format(str(time.time()).replace('.', '_')) cpx.write(mps=mps_filename) for Ridx in range(NUM_FLX): R = selected_reactions[Ridx] OUTPUT_NAMES.append(R) max_error_iter = 1 GOMIN = True gomin_cntr = 0 while GOMIN: MIN_STAT = 0 # MIN # TODO: bgoli: see whether this also works with 'minimize' glpk_setObjective(cpx, 'min%s' % R, [(1, R)], 'min', reset=True) ## cplx_setBounds(c, id, min=None, max=None) # think about this MIN_STAT = glpk_Solve(cpx, method=method) if MIN_STAT == 'LPS_OPT': MIN_STAT = 1 elif MIN_STAT == 'LPS_UNBND': MIN_STAT = 2 elif MIN_STAT == 'LPS_NOFEAS': MIN_STAT = 3 else: MIN_STAT = 4 if MIN_STAT == 1: # solved min_oval = cpx.obj.value elif MIN_STAT == 2: # unbound min_oval = -numpy.Inf elif MIN_STAT == 3: #min_oval = pre_sol[R] min_oval = numpy.NaN else: # other failure min_oval = numpy.NaN if debug: cpx.write(cpxlp=os.path.join(debug_dir, '%smin.lp' % R)) if MIN_STAT >= 3: if gomin_cntr == 0: cpx.erase() del cpx time.sleep(0.1) cpx = glpk.LPX(mps=mps_filename) gomin_cntr += 1 else: GOMIN = False else: GOMIN = False if gomin_cntr >= max_error_iter: GOMIN = False GOMAX = True gomax_cntr = 0 while GOMAX: MAX_STAT = 0 # MAX glpk_setObjective(cpx, 'max%s' % R, expr=None, sense='max', reset=False) ## cplx_setBounds(c, id, min=None, max=None) # think about this MAX_STAT = glpk_Solve(cpx, method=method) if MAX_STAT == 'LPS_OPT': MAX_STAT = 1 elif MAX_STAT == 'LPS_UNBND': MAX_STAT = 2 elif MAX_STAT == 'LPS_NOFEAS': MAX_STAT = 3 else: MAX_STAT = 4 if MAX_STAT == 1: # solved max_oval = cpx.obj.value elif MAX_STAT == 2: # unbound max_oval = numpy.Inf elif MAX_STAT == 3: # infeasable #max_oval = pre_sol[R] max_oval = numpy.NaN else: # other fail max_oval = numpy.NaN if MAX_STAT >= 3: if gomax_cntr == 0: cpx.erase() del cpx time.sleep(0.1) cpx = glpk.LPX(mps=mps_filename) gomax_cntr += 1 else: GOMAX = False else: GOMAX = False if gomax_cntr >= max_error_iter: GOMAX = False # check for solver going berserk if MIN_STAT > 1 and MAX_STAT > 1: print(Ridx) time.sleep(1) # enables using the default value as a solution if the solver fails if pre_opt and default_on_fail: if MAX_STAT > 1 and not MIN_STAT > 1: max_oval = pre_sol[R] if MIN_STAT > 1 and not MAX_STAT > 1: min_oval = pre_sol[R] if debug: cpx.write(cpxlp=os.path.join(debug_dir, '%smax.lp' % R)) OUTPUT_ARRAY[Ridx,0] = pre_sol[R] if R in REDUCED_COSTS: OUTPUT_ARRAY[Ridx,1] = REDUCED_COSTS[R] OUTPUT_ARRAY[Ridx,2] = min_oval OUTPUT_ARRAY[Ridx,3] = max_oval OUTPUT_ARRAY[Ridx,4] = round(abs(max_oval - min_oval), roundoff_span) OUTPUT_ARRAY[Ridx,5] = MIN_STAT OUTPUT_ARRAY[Ridx,6] = MAX_STAT if markupmodel: REAC = fba.getReaction(R) REAC.setValue(pre_sol[R]) REAC.fva_min = min_oval REAC.fva_max = max_oval REAC.fva_status = (MIN_STAT, MAX_STAT) if R in REDUCED_COSTS: REAC.reduced_costs = REDUCED_COSTS[R] if not quiet and MAX_STAT > 1 or MIN_STAT > 1: print('Solver fail for reaction \"{}\" (MIN_STAT: {} MAX_STAT: {})'.format(R, MIN_STAT, MAX_STAT)) cntr += 1 if cntr == 200: tcnt += cntr print('FVA has processed {} of {} reactions'.format(tcnt, NUM_FLX)) cntr = 0 os.remove(mps_filename) #cpx.write(cpxlp='thefinaldebug.lp') del cpx print('\nSinglecore FVA took: {} min (1 process)\n'.format((time.time()-s2time)/60.)) print('Output array has columns:') print('Reaction, Reduced Costs, Variability Min, Variability Max, abs(Max-Min), MinStatus, MaxStatus') return OUTPUT_ARRAY, OUTPUT_NAMES
[docs]def glpk_func_GetCPXandPresolve(fba, pre_opt, objF2constr, quiet=False, oldlpgen=True, with_reduced_costs='unscaled', method='s'): """ This is a utility function that does a presolve for FVA, MSAF etc. Generates properly formatted empty objects if pre_opt == False - *pre_opt* a boolean - *fba* a CBModel object - *objF2constr* add objective function as constraint - *quiet* [default=False] supress cplex output - *with_reduced_costs* [default='unscaled'] can be 'scaled' or 'unscaled' - *method* [default='s'] select the GLPK solver method, see the GLPK documentation for details - 's': simplex - 'i': interior - 'e': exact Returns: pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS """ cpx = glpk_constructLPfromFBA(fba, fname=None) OPTIMAL_PRESOLUTION = None pre_sol = pre_oid = pre_oval = None REDUCED_COSTS = {} if pre_opt: status = glpk_Solve(cpx, method=method) if glpk_getSolutionStatus(cpx) == 'LPS_OPT': print('Valid Presolution') OPTIMAL_PRESOLUTION = True pre_sol, pre_oid, pre_oval = glpk_getOptimalSolution(cpx) fba.objectives[fba.activeObjIdx].solution, fba.objectives[fba.activeObjIdx].value = pre_sol, pre_oval scaled = False if with_reduced_costs == 'scaled': REDUCED_COSTS = glpk_getReducedCosts(cpx, scaled=True) elif with_reduced_costs == 'unscaled': REDUCED_COSTS = glpk_getReducedCosts(cpx, scaled=False) else: print('Invalid Presolution, because {}'.format(glpk_getSolutionStatus(cpx) )) OPTIMAL_PRESOLUTION = False pre_sol = {} for r in fba.reactions: pre_sol.update({r.getId() : 0.0}) r.reduced_cost = 0.0 pre_oval = 0.0 pre_oid = 'None' raise RuntimeError('\nPresolve failed to optimize this model and cannot continue!') else: pre_sol = {} for r in fba.reactions: pre_sol.update({r.getId() : 0.0}) if objF2constr: pre_oval = fba.objectives[fba.activeObjIdx].value pre_oid = fba.objectives[fba.activeObjIdx].getId() else: pre_oval = 0.0 pre_oid = 'None' for r in fba.reactions: r.value = pre_sol[r.id] return cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS
[docs]def glpk_func_SetObjectiveFunctionAsConstraint(cpx, rhs_sense, oval, tol, optPercentage): """ Take the objective function and "optimum" value and add it as a constraint - *cpx* a cplex object - *oval* the objective value - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) """ # generate new constraint from old objective value (use non-zero coefficients) LCS = [] LCN = [] new_constraint = [] for v_ in range(len(cpx.cols)): LCN.append(cpx.cols[v_].name) LCS.append(cpx.obj[v_]) if LCS[-1] != 0.0: new_constraint.append((LCS[-1], LCN[-1])) if rhs_sense == 'equal': pass elif cpx.obj.maximize and (rhs_sense == 'upper'): print('\nWarning: RHS sense error: \"upper\" does not match \"maximize\" changing to \"lower\"') rhs_sense = 'lower' time.sleep(1) elif not cpx.obj.maximize and (rhs_sense == 'lower'): print('\nWarning: RHS sense error: \"lower\" does not match \"minimize\" changing to \"upper\"') rhs_sense = 'upper' time.sleep(1) else: print('\nRHS sense ok.') # set objective constraint if rhs_sense == 'equal': cplx_setSingleConstraint(cpx, 'ObjCstr', expr=new_constraint, sense='E', rhs=oval) ## cplx_setSingleConstraint(cpx, 'ObjCstr', expr=[(1, pre_oid)], sense='E', rhs=oval) elif rhs_sense == 'upper': if tol != None: ub = numpy.ceil(oval/tol)*tol*optPercentage/100.0 else: ub = oval*(optPercentage/100.0) glpk_setSingleConstraint(cpx, 'ObjCstr', expr=new_constraint, sense='L', rhs=ub) ## cplx_setSingleConstraint(cpx, 'ObjCstr', expr=[(1, pre_oid)], sense='L', rhs=ub) elif rhs_sense == 'lower': if tol != None: lb = numpy.floor(oval/tol)*tol*optPercentage/100.0 else: lb = oval*(optPercentage/100.0) glpk_setSingleConstraint(cpx, 'ObjCstr', expr=new_constraint, sense='G', rhs=lb) ## cplx_setSingleConstraint(cpx, 'ObjCstr', expr=[(1, pre_oid)], sense='G', rhs=lb) else: raise RuntimeError("\nInvalid RHS sense: %s" % rhs_sense)
[docs]def glpk_setSingleConstraint(c, cid, expr=[], sense='E', rhs=0.0): """ Sets a new sigle constraint to a GLPK model - *c* a GLPK instance - *cid* the constraint id - *expr* a list of (coefficient, name) pairs - *sense* [default='G'] LGE - *rhs* [default=0.0] the right hand side """ baseRows = len(c.rows) c.rows.add(1) ofnames = [a[1] for a in expr] ofval = [a[0] for a in expr] # name and coefficients newCon = [] for cidx_ in range(len(c.cols)): if c.cols[cidx_].name in ofnames: newCon.append((cidx_, ofval[ofnames.index(c.cols[cidx_].name)])) print(newCon[-1]) else: newCon.append((cidx_, 0.0)) c.rows[baseRows].name = cid c.rows[baseRows].matrix = newCon # sense and rhs if sense in ['<=','<','L']: c.rows[baseRows].bounds = None, rhs elif sense in ['>=','>','G']: c.rows[baseRows].bounds = rhs, None elif sense in ['=','E']: c.rows[baseRows].bounds = rhs else: raise RuntimeError('INFO: invalid operator: %s' % sense)
#c.write(cpxlp='test.setc.lp')
[docs]def glpk_setObjective(c, oid, expr=None, sense='maximize', reset=True): """ Set a new objective function note that there is a major memory leak in `c.variables.get_names()` whch is used when reset=True. If this is a problem use cplx_setObjective2 which takes *names* as an input: - *c* a GLPK LP object - *oid* the r_id of the flux to be optimized - *expr* a list of (coefficient, flux) pairs - *sense* 'maximize'/'minimize' - *reset* [default=True] reset all objective function coefficients to zero """ sense = sense.lower() if sense == 'max': sense = 'maximize' if sense == 'min': sense = 'minimize' if sense in ['maximise', 'minimise']: sense = sense.replace('se','ze') assert sense in ['maximize', 'minimize'], "\nsense must be ['maximize', 'minimize'] not %s" % sense if expr != None: ofnames = [a[1] for a in expr] ofval = [a[0] for a in expr] for cidx_ in range(len(c.cols)): if c.cols[cidx_].name in ofnames: c.obj[cidx_] = ofval[ofnames.index(c.cols[cidx_].name)] elif reset: c.obj[cidx_] = 0.0 else: c.obj[cidx_] = c.obj[cidx_] c.obj.name = oid if sense == 'minimize': c.obj.maximize = False if __DEBUG__: print('Set minimizing') else: c.obj.maximize = True if __DEBUG__: print('Set maximizing')
#c.write(cpxlp='test_obja.lp')
[docs]def getReducedCosts(fba): """ Get a dictionary of reduced costs for each reaction/flux """ output = {} for r in fba.reactions: output.update({r.getId() : r.reduced_cost}) return output
[docs]def setReducedCosts(fba, reduced_costs): """ For each reaction/flux, sets the attribute "reduced_cost" from a dictionary of reduced costs - *fba* an fba object - *reduced_costs* a dictionary of {reaction : value} pairs """ if len(reduced_costs) == 0: pass else: for r in fba.reactions: if r.getId() in reduced_costs: r.reduced_cost = reduced_costs[r.getId()] else: r.reduced_cost = None
#def cplx_getDualValues(c): #""" #Get the get the dual values of the solution #- *c* a CPLEX LP #Output is a dictionary of {name : value} pairs #""" #d_names = c.linear_constraints.get_names() #d_values = c.solution.get_dual_values() #output = {} #for j in range(len(d_names)): #output.update({d_names[j] : d_values[j]}) #return output #def cplx_getSensitivities(c): #""" #Get the sensitivities of each constraint on the objective function with input #- *c* a CPLEX LP #Output is a tuple of bound and objective sensitivities where the objective #sensitivity is described in the CPLEX reference manual as:: #... the objective sensitivity shows each variable, its reduced cost and the range over #which its objective function coefficient can vary without forcing a change #in the optimal basis. The current value of each objective coefficient is #also displayed for reference. #- *objective coefficient sensitivity* {flux : (reduced_cost, lower_obj_sensitivity, coeff_value, upper_obj_sensitivity)} #- *rhs sensitivity* {constraint : (low, value, high)} #- *bound sensitivity ranges* {flux : (lb_low, lb_high, ub_low, ub_high)} #""" #SENSE_RHS = {} #SENSE_BND = {} #SENSE_OBJ = {} #c_names = c.linear_constraints.get_names() #rhs_val = c.linear_constraints.get_rhs() #j_names = c.variables.get_names() #rhs_sense = c.solution.sensitivity.rhs() #bnd_sense = c.solution.sensitivity.bounds() #obj_sense = c.solution.sensitivity.objective() #obj_coeff = c.objective.get_linear() #red_cost = c.solution.get_reduced_costs() #for r in range(c.variables.get_num()): #SENSE_BND.update({j_names[r] : (bnd_sense[r][0], bnd_sense[r][1], bnd_sense[r][2], bnd_sense[r][3])}) #SENSE_OBJ.update({j_names[r] : (red_cost[r], obj_sense[r][0], obj_coeff[r], obj_sense[r][1])}) #for s in range(c.linear_constraints.get_num()): #SENSE_RHS.update({c_names[s] : (rhs_sense[s][0], rhs_val[s], rhs_sense[s][1])}) #return (SENSE_OBJ, SENSE_RHS, SENSE_BND)
[docs]def glpk_MinimizeSumOfAbsFluxes(fba, selected_reactions=None, pre_opt=True, tol=None, objF2constr=True, rhs_sense='lower', optPercentage=100.0, work_dir=None, quiet=False, debug=False, objective_coefficients={}, return_lp_obj=False, oldlpgen=False, with_reduced_costs=None, method='s'): """ Minimize the sum of absolute fluxes sum(abs(J1) + abs(J2) + abs(J3) ... abs(Jn)) by adding two constraints per flux and a variable representing the absolute value: Min: Ci abs_Ji Ji - abs_Ji <= 0 Ji + abs_Ji >= 0 Such that: NJi = 0 Jopt = opt returns the value of the flux minimization objective function (not the model objective function which remains unchanged from) Arguments: - *fba* an FBA model object - *selected reactions* [default=None] means use all reactions otherwise use the reactions listed here - *pre_opt* [default=True] attempt to presolve the FBA and report its results in the ouput, if this is disabled and *objF2constr* is True then the vid/value of the current active objective is used - *tol* [default=None] do not floor/ceiling the objective function constraint, otherwise round of to *tol* - *rhs_sense* [default='lower'] means objC >= objVal the inequality to use for the objective constraint can also be *upper* or *equal* - *optPercentage* [default=100.0] means the percentage optimal value to use for the RHS of the objective constraint: optimal_value*(optPercentage/100.0) - *work_dir* [default=None] the MSAF working directory for temporary files default = cwd+fva - *debug* [default=False] if True write out all the intermediate MSAF LP's into work_dir - *quiet* [default=False] if enabled supress CPLEX output - *objF2constr* [default=True] add the model objective function as a constraint using rhs_sense etc. If this is True with pre_opt=False then the id/value of the active objective is used to form the constraint - *objective_coefficients* [default={}] a dictionary of (reaction_id : float) pairs that provide the are introduced as objective coefficients to the absolute flux value. Note that the default value of the coefficient (non-specified) is +1. - *return_lp_obj* [default=False] off by default when enabled it returns the CPLEX LP object - *with_reduced_costs* [default=None] if not None should be 'scaled' or 'unscaled' - *method* [default='s'] select the GLPK solver method, see the GLPK documentation for details - 's': simplex - 'i': interior - 'e': exact With outputs: - *fba* an update instance of a CBModel. Note that the FBA model objective function value is the original value set as a constraint """ if with_reduced_costs == 'scaled': fba.SCALED_REDUCED_COSTS = True elif with_reduced_costs == 'unscaled': fba.SCALED_REDUCED_COSTS = False elif fba.SCALED_REDUCED_COSTS == True: with_reduced_costs = 'scaled' elif fba.SCALED_REDUCED_COSTS == False: with_reduced_costs = 'unscaled' else: with_reduced_costs = None print('\nINFO: using \"{}\" reduced costs.\n'.format(with_reduced_costs)) if work_dir == None: work_dir = os.getcwd() else: assert os.path.exists(work_dir), '\nWhat did you think would happen now!' if debug: debug_dir = os.path.join(work_dir,'DEBUG') if not os.path.exists(debug_dir): os.mkdir(debug_dir) # generate a presolution print('GLPK is using solver option: "{}"'.format(method)) cpx = OPTIMAL_PRESOLUTION = None pre_sol = pre_oid = pre_oval = None REDUCED_COSTS = {} cpx, pre_sol, pre_oid, pre_oval, OPTIMAL_PRESOLUTION, REDUCED_COSTS = glpk_func_GetCPXandPresolve(fba, pre_opt, objF2constr, quiet=quiet, oldlpgen=oldlpgen, with_reduced_costs=with_reduced_costs, method=method) # if required add the objective function as a constraint if objF2constr: glpk_func_SetObjectiveFunctionAsConstraint(cpx, rhs_sense, pre_oval, tol, optPercentage) STORED_OPT = None if pre_opt: STORED_OPT = pre_oval else: STORED_OPT = fba.getActiveObjective().getValue() # minimize the absolute sum of fluxes if selected_reactions != None: rids = fba.getReactionIds() for r in selected_reactions: assert r in rids, "\n%s is not a valid reaction name" % r else: selected_reactions = fba.getReactionIds() # this removes the objective function ## fba_obj_ids = fba.getActiveObjective().getFluxObjectiveReactions() ## print fba_obj_ids ## for R in fba_obj_ids: ## if R in selected_reactions: ## selected_reactions.pop(selected_reactions.index(R)) ## del R, fba_obj_ids NUM_FLX = len(selected_reactions) print('Total number of reactions: {}'.format(NUM_FLX)) abs_selected_reactions = ['abs_%s' % r for r in selected_reactions] # absJ implicitly assumed to be absJ >= 0 #C#cpx.variables.add(names=abs_selected_reactions) basecols = len(cpx.cols) cpx.cols.add(len(abs_selected_reactions)) for v_ in range(len(abs_selected_reactions)): cpx.cols[basecols+v_].name = abs_selected_reactions[v_] cpx.cols[basecols+v_].bounds = 0, None """ J - abs_J <= 0 J + abs_J >= 0 """ baserows = len(cpx.rows) #tnew = time.time() rowNames = tuple([c.name for c in cpx.rows]) colNames = tuple([c.name for c in cpx.cols]) cRange = range(baserows, baserows+(len(selected_reactions))) rcntr = 0 cpx.rows.add(len(selected_reactions)*2) for r_ in cRange: # name and coefficients cpx.rows[r_].name = 'abs{}a'.format(selected_reactions[rcntr]) cpx.rows[r_].matrix = [(colNames.index(selected_reactions[rcntr]), 1),\ (colNames.index(abs_selected_reactions[rcntr]), -1)] cpx.rows[r_].bounds = None, 0 cpx.rows[r_+len(selected_reactions)].name = 'abs{}b'.format(selected_reactions[rcntr]) cpx.rows[r_+len(selected_reactions)].matrix = [(colNames.index(selected_reactions[rcntr]), 1),\ (colNames.index(abs_selected_reactions[rcntr]), 1)] cpx.rows[r_+len(selected_reactions)].bounds = 0, None rcntr += 1 glpk_setObjective(cpx, 'MAFS', [(1, j) for j in abs_selected_reactions], 'min', reset=True) if debug: cpx.write(cpxlp=os.path.join(debug_dir, 'MSAF_base_(%s).lp' % time.time())) #cplx_writeLPtoLPTfile(cpx, , title=None, Dir=debug_dir) ## cplx_writeLPtoLPTfile(cpx, 'MSAF_base_%s' % time.time() , title=None, Dir=debug_dir) glpk_Solve(cpx, method=method) glpk_setFBAsolutionToModel(fba, cpx, with_reduced_costs=with_reduced_costs) glpk_setSolutionStatusToModel(fba, cpx) #cpx.write(cpxlp='test.lp'); return cpx minSum = cpx.obj.value fba.setAnnotation('min_flux_sum', minSum) fba.getActiveObjective().setValue(STORED_OPT) print('\nMinimizeSumOfAbsFluxes objective value: {}\n'.format(minSum)) if quiet: pass #cplx_setOutputStreams(cpx, mode='default') if not return_lp_obj: return minSum else: return cpx
[docs]def glpk_writeLPtoLPTfile(c, filename, title=None, Dir=None): """ Write out a GLPK model as an LP format file """ if Dir != None: filename = os.path.join(Dir, filename) #if title != None: #c.set_problem_name(title) #c.write(filename+'.lp', filetype='lp') c.write(cpxlp=filename+'.lp') print('LP output as {}'.format(filename+'.lp'))