
"""
CBMPy: CBRead module
====================
PySCeS Constraint Based Modelling (http://cbmpy.sourceforge.net)
Copyright (C) 2009-2017 Brett G. Olivier, VU University Amsterdam, Amsterdam, The Netherlands

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>

Author: Brett G. Olivier
Contact email: bgoli@users.sourceforge.net
Last edit: $Author: bgoli $ ($Id: CBRead.py 575 2017-04-13 12:18:44Z bgoli $)

"""

# preparing for Python 3 port
from __future__ import division, print_function
from __future__ import absolute_import
#from __future__ import unicode_literals

#__doc__ = """
#CBMPy: CBRead module
#"""


import os, time, numpy
# this is a hack that needs to be streamlined a bit
try:
    import cStringIO as csio
except ImportError:
    import io as csio
from . import CBXML, CBModel

from .CBConfig import __CBCONFIG__ as __CBCONFIG__
__DEBUG__ = __CBCONFIG__['DEBUG']
__version__ = __CBCONFIG__['VERSION']

##  try:
    ##  import psyco
    ##  psyco.full()
##  except:
    ##  pass

_HAVE_SYMPY_ = False
try:
    import sympy
    # Rational IO needs SymPy >= 0.7.4; pad the version tuple so that
    # two-component releases (e.g. '1.0') compare correctly instead of
    # raising an IndexError, and set the guard flag actually used below.
    _sympy_ver = tuple((list(map(int, sympy.__version__.split('.')[:3])) + [0, 0])[:3])
    if _sympy_ver >= (0, 7, 4):
        _HAVE_SYMPY_ = True
    else:
        del sympy
    del _sympy_ver
except (ImportError, ValueError):
    print('Rational IO not available')
    _HAVE_SYMPY_ = False
try:
    import h5py
    _HAVE_HD5_ = True
except ImportError:
    _HAVE_HD5_ = False

_HAVE_XLRD_ = False
try:
    import xlrd
    _HAVE_XLRD_ = True
except ImportError:
    print('\nINFO: No xlrd module available, Excel spreadsheet reading disabled')



__example_models__ = {'cbmpy_test_core' : 'core_memesa_model.l3.xml',
                   'cbmpy_test_ecoli' : 'Ecoli_iJR904.glc.l3.xml',
                   }
__example_model_path__ = os.path.join(__CBCONFIG__['CBMPY_DIR'], 'models')

if not os.path.exists(os.path.join(__example_model_path__, 'core_memesa_model.l3.xml')) or\
   not os.path.exists(os.path.join(__example_model_path__, 'Ecoli_iJR904.glc.l3.xml')):
    import zipfile
    print('Installing default models ...')
    zfile = zipfile.ZipFile(os.path.join(__example_model_path__, 'default_models.zip.py'), allowZip64=True)
    zfile.extractall(path=__example_model_path__)
    zfile.close()
    del zipfile, zfile


def readSBML3FBC(fname, work_dir=None, return_sbml_model=False, xoptions={'validate': False}, scan_notes_gpr=True):
    """
    Read in an SBML Level 3 file with FBC annotation and return a CBM model object

     - *fname* is the filename
     - *work_dir* is the working directory
     - *return_sbml_model* deprecated and ignored, please update code
     - *xoptions* special load options, enable with option = True

       - *nogenes* do not load/process genes
       - *noannot* do not load/process any annotations
       - *validate* validate model and display errors and warnings before loading
       - *readcobra* read the cobra annotation
       - *read_model_string* [default=False] read the model from a string (instead of a filename) containing an SBML document

     - *scan_notes_gpr* [default=True] if the model is loaded and no genes are detected then scan the <notes> field for GPR associations

    """
    if fname in __example_models__:
        fname = __example_models__[fname]
        print(fname)
        fname = os.path.join(__example_model_path__, fname)
    xmod = CBXML.sbml_readSBML3FBC(fname, work_dir, return_sbml_model, xoptions)
    if scan_notes_gpr and len(xmod.getGeneIds()) == 0:
        print('INFO: no standard gene encoding detected, attempting to load from annotations.')
        xmod.createGeneAssociationsFromAnnotations()
    return xmod

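# Illustrative usage sketch (not part of the original module): load one of the
# bundled example models by its shorthand key from __example_models__ above and
# skip gene processing via xoptions. Attribute access (mod.reactions,
# mod.species) follows the CBModel usage elsewhere in this module.
def _example_readSBML3FBC():
    mod = readSBML3FBC('cbmpy_test_core', xoptions={'validate': True, 'nogenes': True})
    print('Loaded {} reactions and {} species'.format(len(mod.reactions), len(mod.species)))
    return mod
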
def readCOBRASBML(fname, work_dir=None, return_sbml_model=False, delete_intermediate=False, fake_boundary_species_search=False,
                  output_dir=None, skip_genes=False, scan_notes_gpr=True):
    """
    Read in a COBRA format SBML Level 2 file with FBA annotation and return a CBM model object

     - *fname* is the filename
     - *work_dir* is the working directory
     - *return_sbml_model* deprecated and ignored, please update code
     - *delete_intermediate* [default=False] delete the intermediate SBML Level 3 FBC file
     - *fake_boundary_species_search* [default=False] after looking for the boundary_condition of a species, search for overloaded ids <id>_b
     - *output_dir* [default=None] the directory to output the intermediate SBML L3 files (if generated), defaults to the input directory
     - *skip_genes* [default=False] do not load GPR data
     - *scan_notes_gpr* [default=True] if the model is loaded and no genes are detected then scan the <notes> field for GPR associations

    """
    xmod = CBXML.sbml_readCOBRASBML(fname, work_dir=work_dir, return_sbml_model=False, delete_intermediate=delete_intermediate,
                                    fake_boundary_species_search=fake_boundary_species_search, output_dir=output_dir, skip_genes=skip_genes)
    if scan_notes_gpr and len(xmod.getGeneIds()) == 0:
        xmod.createGeneAssociationsFromAnnotations()
    return xmod

def readSBML2FBA(fname, work_dir=None, return_sbml_model=False, fake_boundary_species_search=False, scan_notes_gpr=True):
    """
    Read in an SBML Level 2 file with FBA annotation where:

     - *fname* is the filename
     - *work_dir* is the working directory, if None then only fname is used
     - *return_sbml_model* deprecated and ignored, please update code
     - *fake_boundary_species_search* [default=False] after looking for the boundary_condition of a species, search for overloaded ids <id>_b
     - *scan_notes_gpr* [default=True] if the model is loaded and no genes are detected then scan the <notes> field for GPR associations

    """
    xmod = CBXML.sbml_readSBML2FBA(fname, work_dir, return_sbml_model=False, fake_boundary_species_search=fake_boundary_species_search)
    if scan_notes_gpr and len(xmod.getGeneIds()) == 0:
        xmod.createGeneAssociationsFromAnnotations()
    return xmod

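# Illustrative usage sketch (hypothetical filename): convert a COBRA-dialect
# SBML Level 2 file, removing the intermediate SBML Level 3 FBC file that the
# converter generates along the way.
def _example_readCOBRASBML():
    mod = readCOBRASBML('ecoli_cobra.xml', delete_intermediate=True)
    return mod
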
def readLPtoList(fname, work_dir):
    """
    Read a CPLEX format LP file and return lists of the objective, constraint and bounds lines
    """
    NEW = False
    TYPE = None
    Object = []
    Constr = []
    Bounds = []
    F = open(os.path.join(work_dir, fname), 'r')
    for l in F:
        if l == '' or l[:2] == '\\\\' or l == '\n' or l.strip() == 'END':
            print('skipping')
            if __DEBUG__:
                print(l)
        else:
            L = l.strip()
            if L == 'Maximize':
                TYPE = 'ObjFunc'
                NEW = True
            elif L == 'Subject To':
                TYPE = 'Constr'
                NEW = True
            elif L == 'Bounds':
                TYPE = 'Bounds'
                NEW = True
            else:
                NEW = False
            if TYPE == 'ObjFunc' and not NEW:
                Object.append(L)
            elif TYPE == 'Constr' and not NEW:
                Constr.append(L)
            elif TYPE == 'Bounds' and not NEW:
                Bounds.append(L)
    F.close()
    if __DEBUG__:
        print('ObjectiveLines')
        print(Object)
        print('ConstraintLines')
        print(Constr)
        print('BoundsLines')
        print(Bounds)
    return Object, Constr, Bounds

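# For reference, readLPtoList() scans for a CPLEX-style LP file laid out in the
# three sections matched above (a minimal, hypothetical example):
#
#   Maximize
#    obj: R1 + 2 R2
#   Subject To
#    c1: R1 - R2 <= 10
#   Bounds
#    0 <= R1 <= 100
#   END
#
# Lines inside each section are collected verbatim into the Object, Constr and
# Bounds lists; blank lines, '\\' comment lines and the 'END' marker are skipped.
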
def readSK_FVA(filename):
    """
    Read Stevens FVA results (opt.fva) file and return a list of dictionaries
    """
    assert os.path.exists(filename), '\nGive me a break!\n'
    vari = []
    F = open(filename, 'r')
    for l in F:
        L = l.split(':')
        Jn = L[0].strip()
        Vmin = L[1].strip()
        V = L[2].strip()
        V = V.split('--')
        Vmax = V[0].strip()
        Vstat = V[1].strip()
        vari.append({'name': Jn, 'min': Vmin, 'max': Vmax, 'status': Vstat})
        if __DEBUG__:
            print(Jn, Vmin, Vmax, Vstat)
    F.close()
    return vari

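# The parsing above implies each opt.fva line has the layout (values hypothetical):
#
#   R_PGI : -2.5 : 10.0 -- Stable
#
# i.e. 'name : min : max -- status', which readSK_FVA() returns as
# {'name': 'R_PGI', 'min': '-2.5', 'max': '10.0', 'status': 'Stable'}.
# Note that the min/max values are returned as strings, not floats.
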
def readSK_vertexOld(fname, bigfile=False, fast_rational=False, nformat='%.14f', compresslevel=3):
    """
    Reads in Stevens vertex analysis file (more optimized for large datasets than the original) and returns:

     - a list of vertex vectors
     - a list of ray vectors
     - the basis of the lineality space as a list of vectors

    all vectors in terms of the column space of N
    """
    import gzip
    if not fast_rational:
        assert _HAVE_SYMPY_, 'Install Sympy for rational IO support'
    assert os.path.exists(fname), 'Uhm exqueeze me ...'
    print('\n**********\nreadSK_vertex options are:\n')
    print('bigfile: {}'.format(bigfile))
    print('fast_rational: {}'.format(fast_rational))
    print('nformat: {}'.format(nformat % 0.12345678901234567890))
    print('**********\n')
    SK_vert_file = open(fname, 'r')
    VertOut = []
    LinOut = []
    RayOut = []
    if bigfile:
        VertTmp = gzip.open('_vtx_.tmp.gz', 'wb', compresslevel=compresslevel)
        LinTmp = gzip.open('_lin_.tmp.gz', 'wb', compresslevel=compresslevel)
        RayTmp = gzip.open('_ray_.tmp.gz', 'wb', compresslevel=compresslevel)
    GOvert = False
    GOray = False
    GOlin = False
    lcntr = 0
    lcntrtmp = 0
    TZero = time.time()
    for l in SK_vert_file:
        lcntr += 1
        if lcntr == 1000:
            print('Processing vertex: {} ({} min)'.format(lcntr + lcntrtmp, round((time.time() - TZero) / 60.0, 1)))
            lcntrtmp += lcntr
            lcntr = 0
        if '* Lineality basis ' in l:
            GOvert = False
            GOray = False
            GOlin = True
        if '* Rays ' in l:
            GOvert = False
            GOray = True
            GOlin = False
        if '* Vertices ' in l:
            GOvert = True
            GOray = False
            GOlin = False
        if l[:2] != '* ':
            L = l.split()
            rowL = []
            for c in L:
                c = c.strip()
                if c == '0':
                    rowL.append(0.0)
                elif not fast_rational:
                    rowL.append(sympy.Rational('%s' % c).evalf())
                else:
                    rnum = c.split('/')
                    if len(rnum) == 1:
                        rowL.append(float(rnum[0]))
                    else:
                        rowL.append(float(rnum[0]) / float(rnum[1]))
            del L
            rowL = tuple(rowL)
            if GOlin:
                if bigfile:
                    rowEnd = len(rowL)
                    cntr = 0
                    for e in rowL:
                        cntr += 1
                        if e == 0.0:
                            LinTmp.write('0.0')
                        else:
                            LinTmp.write(nformat % e)
                        if cntr == rowEnd:
                            LinTmp.write('\n')
                        else:
                            LinTmp.write(',')
                else:
                    LinOut.append(rowL)
            elif GOray:
                if bigfile:
                    rowEnd = len(rowL)
                    cntr = 0
                    for e in rowL:
                        cntr += 1
                        if e == 0.0:
                            RayTmp.write('0.0')
                        else:
                            RayTmp.write(nformat % e)
                        if cntr == rowEnd:
                            RayTmp.write('\n')
                        else:
                            RayTmp.write(',')
                else:
                    RayOut.append(rowL)
            elif GOvert:
                if bigfile:
                    rowEnd = len(rowL)
                    cntr = 0
                    for e in rowL:
                        cntr += 1
                        if e == 0.0:
                            VertTmp.write('0.0')
                        else:
                            VertTmp.write(nformat % e)
                        if cntr == rowEnd:
                            VertTmp.write('\n')
                        else:
                            VertTmp.write(',')
                else:
                    VertOut.append(rowL)
            del rowL
    print('\nProcessed {} vertices.\n'.format(lcntr + lcntrtmp))
    SK_vert_file.close()
    if bigfile:
        VertTmp.close()
        RayTmp.close()
        LinTmp.close()
        try:
            VertTmp = gzip.open('_vtx_.tmp.gz', 'rb')
            LinTmp = gzip.open('_lin_.tmp.gz', 'rb')
            RayTmp = gzip.open('_ray_.tmp.gz', 'rb')
            return VertTmp, RayTmp, LinTmp
        except Exception as ex:
            print(ex)
            print('\nReturning file names:')
            return '_vtx_.tmp.gz', '_lin_.tmp.gz', '_ray_.tmp.gz'
    else:
        print('Lineality basis: {}'.format(len(LinOut)))
        print('Number of rays: {}'.format(len(RayOut)))
        print('Number of vertices: {}'.format(len(VertOut)))
        return VertOut, RayOut, LinOut

def readSK_vertex(fname, bigfile=True, fast_rational=False, nformat='%.14f', compression=None, hdf5file=None):
    """
    Reads in Stevens vertex analysis file:

     - *fname* the input filename (.all file that results from Stevens pipeline)
     - *bigfile* [default=True] this option is now always true and is left in for backwards compatibility
     - *fast_rational* [default=False] off by default, which uses SymPy for rational-->float conversion; when on, uses float decomposition with a slight (2nd decimal) decrease in accuracy
     - *nformat* [default='%.14f'] the number format used in output files
     - *compression* [default=None] compression to be used in hdf5 files, can be one of [None, 'lzf', 'gz?', 'szip']
     - *hdf5file* [default=None] if None then the generic filename '_vtx_.tmp.hdf5' is used, otherwise <hdf5file>.hdf5

    and returns an hdf5 *filename* of the results with a single group named **data** which contains the datasets:

     - vertices
     - rays
     - lin

    where all vectors are in terms of the column space of N.
    """
    bigfile = True
    if not fast_rational:
        assert _HAVE_SYMPY_, 'Install Sympy for rational IO support'
    if bigfile:
        assert _HAVE_HD5_, 'Install h5py for large dataset support'
    assert os.path.exists(fname), 'Uhm exqueeze me ...'
    print('\n**********\nreadSK_vertex options are:\n')
    print('bigfile: {}'.format(bigfile))
    print('fast_rational: {}'.format(fast_rational))
    print('nformat: {}'.format(nformat % 0.12345678901234567890))
    print('**********\n')
    SK_vert_file = open(fname, 'r')
    VCNTR = 0
    LCNTR = 0
    RCNTR = 0
    CCNTR = 0
    for l in SK_vert_file:
        if '* Vertices (' in l:
            VCNTR = int(l.replace('* Vertices (', '').replace('vectors):', '').strip())
            break
    SK_vert_file.seek(0)
    for l in SK_vert_file:
        if '* Lineality basis (' in l:
            LCNTR = int(l.replace('* Lineality basis (', '').replace('vectors):', '').strip())
            break
    SK_vert_file.seek(0)
    for l in SK_vert_file:
        if '* Rays (' in l:
            RCNTR = int(l.replace('* Rays (', '').replace('vectors):', '').strip())
            break
    SK_vert_file.seek(0)
    for l in SK_vert_file:
        if l[:2] != '* ':
            L = l.split()
            CCNTR = len(L)
            del L
            break
    SK_vert_file.seek(0)
    print(RCNTR, LCNTR, VCNTR, CCNTR)
    VertOut = []
    LinOut = []
    RayOut = []
    outFileName = '_vtx_.tmp.hdf5'
    if bigfile:
        if hdf5file != None:
            outFileName = hdf5file + '.' + str(compression) + '.hdf5'
        HD5out = h5py.File(outFileName, 'w')
        if 'data' in HD5out:
            del HD5out['data']
        Dgrp = HD5out.create_group('data')
        if VCNTR > 0:
            VertTmp = HD5out['data'].create_dataset('vertices', (VCNTR, CCNTR), dtype=numpy.double, compression=compression)
        if LCNTR > 0:
            LinTmp = HD5out['data'].create_dataset('lin', (LCNTR, CCNTR), dtype=numpy.double, compression=compression)
        if RCNTR > 0:
            RayTmp = HD5out['data'].create_dataset('rays', (RCNTR, CCNTR), dtype=numpy.double, compression=compression)
    GOvert = False
    GOray = False
    GOlin = False
    lcntr = 0
    lcntrtmp = 0
    TZero = time.time()
    vert_count = 0
    lin_count = 0
    ray_count = 0
    print('\nStarting vertex mapping at {}\n'.format(time.strftime('%H:%M:%S')))
    for l in SK_vert_file:
        lcntr += 1
        if lcntr == 1000:
            print('Processing vertex: {}: {} percent @ {} minutes ({} estimated)'.format(
                lcntr + lcntrtmp, (float(lcntr + lcntrtmp) / float(VCNTR) * 100.0),
                round((time.time() - TZero) / 60.0, 1),
                float(VCNTR) / float(lcntr + lcntrtmp) * round((time.time() - TZero) / 60.0, 1)))
            lcntrtmp += lcntr
            lcntr = 0
        if '* Lineality basis ' in l and LCNTR > 0:
            GOvert = False
            GOray = False
            GOlin = True
        if '* Rays ' in l and RCNTR > 0:
            GOvert = False
            GOray = True
            GOlin = False
        if '* Vertices ' in l and VCNTR > 0:
            GOvert = True
            GOray = False
            GOlin = False
        if l[:2] != '* ':
            L = l.split()
            if GOlin:
                LinTmp[lin_count] = 0.0
                for c in range(CCNTR):
                    val = L[c].strip()
                    if val == '0':
                        pass
                    elif not fast_rational:
                        LinTmp[lin_count, c] = sympy.Rational('%s' % val).evalf()
                    else:
                        rnum = val.split('/')
                        if len(rnum) == 1:
                            LinTmp[lin_count, c] = float(rnum[0])
                        else:
                            LinTmp[lin_count, c] = float(rnum[0]) / float(rnum[1])
                lin_count += 1
            elif GOray:
                RayTmp[ray_count] = 0.0
                for c in range(CCNTR):
                    val = L[c].strip()
                    if val == '0':
                        pass
                    elif not fast_rational:
                        RayTmp[ray_count, c] = sympy.Rational('%s' % val).evalf()
                    else:
                        rnum = val.split('/')
                        if len(rnum) == 1:
                            RayTmp[ray_count, c] = float(rnum[0])
                        else:
                            RayTmp[ray_count, c] = float(rnum[0]) / float(rnum[1])
                ray_count += 1
            elif GOvert:
                VertTmp[vert_count] = 0.0
                for c in range(CCNTR):
                    try:
                        val = L[c].strip()
                    except Exception as ex:
                        print(ex)
                        print('\nError reading data: your file may be corrupt\n')
                    if val == '0':
                        pass
                    elif fast_rational:
                        rnum = val.split('/')
                        if len(rnum) == 1:
                            VertTmp[vert_count, c] = float(rnum[0])
                        else:
                            VertTmp[vert_count, c] = float(rnum[0]) / float(rnum[1])
                    else:
                        VertTmp[vert_count, c] = sympy.Rational('%s' % val).evalf()
                vert_count += 1
    print('\nProcessed {} vectors.\n'.format(lcntr + lcntrtmp))
    SK_vert_file.close()
    if bigfile:
        HD5out.close()
        return outFileName
    else:
        print('Lineality basis: {}'.format(len(LinOut)))
        print('Number of rays: {}'.format(len(RayOut)))
        print('Number of vertices: {}'.format(len(VertOut)))
        return VertOut, RayOut, LinOut

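# Illustrative sketch (not part of the original module): reading back the HDF5
# file produced by readSK_vertex(). The group/dataset names ('data', 'vertices',
# 'rays', 'lin') follow the layout created above; a dataset is absent if its
# vector count was zero.
def _example_readSK_vertex_results(hdf5_filename):
    import h5py
    with h5py.File(hdf5_filename, 'r') as F:
        for dset in ('vertices', 'rays', 'lin'):
            if dset in F['data']:
                print(dset, F['data'][dset].shape)
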
def readExcel97Model(xlname, write_sbml=True, sbml_level=3, return_dictionaries=False):
    """
    Reads a model encoded as an Excel97 workbook and returns it as a CBMPy model object and SBML file.
    Note the workbook must be formatted exactly like those produced by cbm.writeModelToExcel97(). Note
    that reactions have to be defined in **both** the *reactions* and *network_react* sheets to be
    included in the model.

     - *xlname* the filename of the Excel workbook
     - *write_sbml* [default=True] write the SBML file to fname
     - *return_dictionaries* [default=False] return the dictionaries constructed when reading the Excel file (in place of the model)
     - *sbml_level* [default=3] write the SBML file as either SBML L2 FBA or SBML L3 FBC file.

    """
    if not _HAVE_XLRD_:
        print('\nERROR: Cannot read Excel file, XLRD package not available (http://pypi.python.org/pypi/xlrd)')
        return
    assert os.path.exists(xlname), '\nERROR: File "{}" does not exist'.format(xlname)

    def c2s(c):
        """
        Utility function converting an XLRD cell to a string
        """
        return str(c.value)

    MSGLog = csio.StringIO()

    def logMsg(msg):
        """
        Message logging utility
        """
        print(msg)
        MSGLog.write('{}\n'.format(msg))

    wb = xlrd.open_workbook(xlname)
    for s in wb.sheets():
        logMsg('Sheet: {}'.format(s.name))
    sI = wb.sheet_by_name('info')
    sM = wb.sheet_by_name('metabolites')
    sR = wb.sheet_by_name('reactions')
    sNR = wb.sheet_by_name('network_react')
    sNM = wb.sheet_by_name('network_metab')
    sS = wb.sheet_by_name('solution')
    sMI = wb.sheet_by_name('miriam')
    try:
        sComp = wb.sheet_by_name('compartments')
    except xlrd.XLRDError:
        sComp = None

    # INFO
    dInfo = {}
    for r_ in range(3):
        r = sI.row(r_)
        if r_ == 2:
            obj = {}
            obj['osense'] = c2s(r.pop(1))
            coeff = True
            cout = []
            for c in range(1, len(r)):
                if r[c].value != '' and c2s(r[c]) != '':
                    if coeff:
                        cout.append([r[c].value])
                        coeff = False
                    else:
                        cout[-1].append(c2s(r[c]))
                        coeff = True
            obj['objflux'] = cout
            dInfo['objective'] = obj
            del cout, obj
        else:
            dInfo[c2s(r[0])] = c2s(r[1])

    # metabolites
    mcolNames = [c2s(c) for c in sM.row(0)]
    dCompartments = {}
    dMet = {}
    for r_ in range(1, sM.nrows):
        r = sM.row(r_)
        met = {}
        annot = {}
        for c_ in range(1, len(mcolNames)):
            if c_ < 6:
                if mcolNames[c_] == 'charge':
                    met[mcolNames[c_]] = r[c_].value
                elif mcolNames[c_] == 'fixed':
                    met[mcolNames[c_]] = bool(r[c_].value)
                else:
                    met[mcolNames[c_]] = c2s(r[c_])
                if mcolNames[c_] == 'compartment':
                    dCompartments[c2s(r[c_])] = {'id': c2s(r[c_])}
            else:
                annot[mcolNames[c_]] = c2s(r[c_])
        met['annot'] = annot
        met['id'] = c2s(r[0])
        dMet[c2s(r[0])] = met

    # reactions
    rcolNames = [c2s(c) for c in sR.row(0)]
    dReact = {}
    for r_ in range(1, sR.nrows):
        r = sR.row(r_)
        reac = {}
        annot = {}
        for c_ in range(1, len(rcolNames)):
            if c_ < 6:
                if rcolNames[c_] == 'lowerbound' or rcolNames[c_] == 'upperbound':
                    reac[rcolNames[c_]] = r[c_].value
                elif rcolNames[c_] == 'reversible':
                    rev = None
                    if r[c_].ctype == 1:
                        rev = c2s(r[c_])
                        if rev in ['True', 'Yes', True, 'yes', 'true']:
                            rev = True
                        elif rev in ['False', 'No', False, 'no', 'false']:
                            rev = False
                    else:
                        rev = bool(r[c_].value)
                    reac[rcolNames[c_]] = rev
                else:
                    reac[rcolNames[c_]] = c2s(r[c_])
                if rcolNames[c_] == 'compartment':
                    dCompartments[c2s(r[c_])] = {'id': c2s(r[c_])}
            else:
                annot[rcolNames[c_]] = c2s(r[c_])
        reac['annot'] = annot
        reac['id'] = c2s(r[0])
        dReact[c2s(r[0])] = reac

    # reaction network
    nDict = {}
    reagentList = []
    for n_ in range(0, sNR.nrows, 3):
        coeff = True
        sub = []
        s = sNR.row(n_ + 1)
        for c in range(1, len(s)):
            if s[c].value != '' and c2s(s[c]) != '':
                if coeff:
                    sub.append([s[c].value])
                    coeff = False
                else:
                    sub[-1].append(c2s(s[c]))
                    coeff = True
                    if c2s(s[c]) not in reagentList:
                        reagentList.append(c2s(s[c]))
        coeff = True
        prod = []
        p = sNR.row(n_ + 2)
        for c in range(1, len(p)):
            if p[c].value != '' and c2s(p[c]) != '':
                if coeff:
                    prod.append([p[c].value])
                    coeff = False
                else:
                    prod[-1].append(c2s(p[c]))
                    coeff = True
                    if c2s(p[c]) not in reagentList:
                        reagentList.append(c2s(p[c]))
        nDict[c2s(sNR.row(n_)[0])] = {'subs': sub, 'prod': prod}

    # compartments
    if sComp != None:
        ccolNames = [c2s(c) for c in sComp.row(0)]
        for m_ in range(1, sComp.nrows):
            r = sComp.row(m_)
            comp = {}
            annot = {}
            for c_ in range(1, len(ccolNames)):
                if c_ < 6:
                    if ccolNames[c_] == 'name':
                        comp[ccolNames[c_]] = c2s(r[c_])
                    elif ccolNames[c_] == 'size':
                        comp[ccolNames[c_]] = r[c_].value
                    elif ccolNames[c_] == 'dimensions':
                        comp[ccolNames[c_]] = r[c_].value
                    elif ccolNames[c_] == '# species' or ccolNames[c_] == '# reactions':
                        pass
                else:
                    annot[ccolNames[c_]] = c2s(r[c_])
            comp['annot'] = annot
            comp['id'] = c2s(r[0])
            dCompartments[c2s(r[0])] = comp

    # MIRIAM
    mirNames = [c2s(c) for c in sMI.row(0)]
    dMir = {}
    for m_ in range(1, sMI.nrows):
        r = sMI.row(m_)
        qualD = {}
        for c_ in range(1, len(mirNames)):
            if c2s(r[c_]) != '':
                if mirNames[c_] in qualD:
                    qualD[mirNames[c_]].append(c2s(r[c_]))
                else:
                    qualD[mirNames[c_]] = [c2s(r[c_])]
        dMir[c2s(r[0])] = qualD

    # clean up
    if '' in dCompartments:
        dCompartments.pop('')
    if '' in nDict:
        nDict.pop('')
    if '' in dReact:
        dReact.pop('')
    if '' in dMet:
        dMet.pop('')

    # cross check reactions
    dReactError = {}
    nDictError = {}
    for r_ in tuple(dReact):
        if r_ not in nDict:
            logMsg('ERROR: Reaction "{}" listed but not defined ... removing.'.format(r_))
            dReactError[r_] = dReact.pop(r_)
    for r_ in tuple(nDict):
        if r_ not in dReact:
            logMsg('ERROR: Reaction "{}" defined but not listed ... removing.'.format(r_))
            nDictError[r_] = nDict.pop(r_)

    # cross check reagents versus species
    nDictReagentError = {}
    reagentError = []
    for s_ in dMet:
        if s_ not in reagentList:
            logMsg('WARNING: Metabolite "{}" listed but not used.'.format(s_))
            #dMetError[s_] = dMet.pop(s_)
    # iterate over a copy so popping from reagentList does not skip entries
    for s_ in tuple(reagentList):
        if s_ not in dMet:
            logMsg('ERROR: Reagent "{}" used but not defined.'.format(s_))
            reagentError.append(reagentList.pop(reagentList.index(s_)))

    # remove reactions that contain unknown reagents (undefined species)
    if len(reagentError) > 0:
        for e_ in tuple(nDict):
            reag = [a[1] for a in nDict[e_]['subs']] + [a[1] for a in nDict[e_]['prod']]
            for r_ in reag:
                if r_ in reagentError:
                    logMsg('ERROR: Reaction "{}" contains an unknown reagent "{}" ... removing'.format(e_, r_))
                    nDictReagentError[e_] = {'reaction': dReact.pop(e_), 'stoich': nDict.pop(e_)}
                    # reaction already removed, do not try to pop it again
                    break

    if return_dictionaries:
        return (dInfo, dReact, dMet, nDict, dCompartments, dMir)

    # assemble CBMPy model
    # construct model object
    cmod = CBModel.Model(dInfo['id'])
    cmod.name = dInfo['name']
    cmod.description = 'Model created from Excel spreadsheet: {}'.format(xlname)
    cmod.sourcefile = xlname
    cmod.setCreatedDate()

    # add compartments
    for c_ in dCompartments:
        CC = dCompartments[c_]
        if 'name' in CC:
            name = CC['name']
        else:
            name = CC['id']
        if 'size' in CC:
            size = CC['size']
        else:
            size = 1.0
        if 'dimensions' in CC:
            dimensions = CC['dimensions']
        else:
            dimensions = 3
        C = CBModel.Compartment(CC['id'], name, size, dimensions)
        if 'annot' in CC:
            for a_ in CC['annot']:
                C.setAnnotation(a_, CC['annot'][a_])
        cmod.addCompartment(C)
    del c_, CC, name, size, dimensions

    # add metabolites
    for m_ in dMet:
        M = dMet[m_]
        S = CBModel.Species(M['id'], boundary=M['fixed'], name=M['name'], value=0.0,
                            compartment=M['compartment'], charge=M['charge'], chemFormula=M['chemformula'])
        for a_ in M['annot']:
            S.setAnnotation(a_, M['annot'][a_])
        cmod.addSpecies(S)
    del m_, M, S

    # add reactions
    for r_ in nDict:
        Rn = nDict[r_]
        Ri = dReact[r_]
        R = CBModel.Reaction(Ri['id'], Ri['name'], reversible=Ri['reversible'])
        for s_ in Rn['subs']:
            R.createReagent(s_[1], -s_[0])
        for p_ in Rn['prod']:
            R.createReagent(p_[1], p_[0])
        for a_ in Ri['annot']:
            R.setAnnotation(a_, Ri['annot'][a_])
        cmod.addReaction(R, create_default_bounds=False)
    # cross reference reagents (shortcut)
    for s_ in cmod.species:
        tmp = s_.isReagentOf()
    del r_, s_, p_, R, Rn, Ri, tmp

    # add flux bounds
    for r_ in dReact:
        lb = dReact[r_]['lowerbound']
        ub = dReact[r_]['upperbound']
        try:
            lb = float(lb)
        except (ValueError, TypeError):
            if dReact[r_]['reversible']:
                lb = -numpy.inf
            else:
                lb = 0.0
            logMsg('Undefined lower bound for reaction "{}" setting to {}'.format(r_, lb))
        try:
            ub = float(ub)
        except (ValueError, TypeError):
            ub = numpy.inf
            logMsg('Undefined upper bound for reaction "{}" setting to {}'.format(r_, ub))
        cmod.createReactionLowerBound(r_, lb)
        cmod.createReactionUpperBound(r_, ub)
    del r_, lb, ub

    # add objective function
    O = CBModel.Objective('obj1', dInfo['objective']['osense'])
    O.createFluxObjectives(dInfo['objective']['objflux'])
    cmod.addObjective(O, True)
    del O

    # markup with MIRIAM
    for x_ in cmod.species + cmod.reactions + cmod.compartments:
        xid = x_.getId()
        if xid in dMir:
            for q_ in dMir[xid]:
                for i_ in dMir[xid][q_]:
                    if x_.miriam == None:
                        x_.miriam = CBModel.MIRIAMannotation()
                    x_.miriam.addIDorgURI(q_, i_)

    if write_sbml:
        try:
            if sbml_level == 3:
                CBXML.sbml_writeSBML3FBC(cmod, xlname + '.l3.xml', gpr_from_annot=False)
                logMsg('Successfully wrote model SBML3FBC file: "{}"'.format(xlname + '.l3.xml'))
            else:
                CBXML.sbml_writeSBML2FBA(cmod, xlname + '.l3.xml')
                logMsg('Successfully wrote model SBML2FBA file: "{}"'.format(xlname + '.l3.xml'))
        except Exception as ex:
            logMsg(ex)

    # write log to file (disabled: cDir is undefined here)
    #F = open(os.path.join(cDir, '{}.log'.format(xlname)), 'w')
    #MSGLog.seek(0)
    #F.write(MSGLog.read())
    #F.close()
    print('\n*****\nExcel97 Read Log\n*****\n')
    MSGLog.seek(0)
    print(MSGLog.read())
    print('')
    MSGLog.close()
    return cmod

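# Illustrative usage sketch (hypothetical filename): read a workbook written by
# cbm.writeModelToExcel97() and write the reconstructed model out as SBML L3 FBC
# ('model.xls.l3.xml'), as described in the docstring above.
def _example_readExcel97Model():
    cmod = readExcel97Model('model.xls', write_sbml=True, sbml_level=3)
    return cmod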