Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • schubert/xbanalysis
1 result
Select Git revision
Loading items
Show changes
Commits on Source (2)
Showing with 11264 additions and 95 deletions
...@@ -2,6 +2,13 @@
XBA problem generation from SBML coded metabolic/kinetic models. Support of Growth Balance Analysis (GBA), FBA and RBA
References to be included !!!
- sbo terms (sbo.owl)
- SBML and extensions for fbc, groups
- pFBA, FBA, RBA, FVA
- sbmlxdf
## Getting started
To make it easy for you to get started with GitLab, here's a list of recommended next steps.
......
...@@ -6,5 +6,7 @@ from .xba_function import XbaFunction
from .xba_parameter import XbaParameter
from .xba_reaction import XbaReaction
from .xba_species import XbaSpecies
from .sbo_term import SboTerm
__all__ = ['XbaModel', 'XbaCompartment', 'XbaFunction', 'XbaParameter',
'XbaReaction', 'XbaSpecies', 'SboTerm']
New file: xbanalysis/model/sbo_term.py
"""Implementation of SboTerm class.
Peter Schubert, HHU Duesseldorf, July 2022
"""
import re
import os
from functools import lru_cache
import xml.etree.ElementTree
ns = {'owl': 'http://www.w3.org/2002/07/owl#',
'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
'rdfs': 'http://www.w3.org/2000/01/rdf-schema#'}
local_sbo_tree = {
'SBO_0000000': {'parents': [], 'label': 'systems biology representation'},
'SBO_0000167': {'parents': ['SBO_0000375'], 'label': 'biochemical or transport reaction'},
'SBO_0000176': {'parents': ['SBO_0000167'], 'label': 'biochemical reaction'},
'SBO_0000179': {'parents': ['SBO_0000176'], 'label': 'degradation'},
'SBO_0000180': {'parents': ['SBO_0000176'], 'label': 'dissociation'},
'SBO_0000181': {'parents': ['SBO_0000176'], 'label': 'conformational transition'},
'SBO_0000182': {'parents': ['SBO_0000176'], 'label': 'conversion'},
'SBO_0000183': {'parents': ['SBO_0000205'], 'label': 'transcription'},
'SBO_0000184': {'parents': ['SBO_0000205'], 'label': 'translation'},
'SBO_0000204': {'parents': ['SBO_0000205'], 'label': 'DNA replication'},
'SBO_0000205': {'parents': ['SBO_0000375'], 'label': 'composite biochemical process'},
'SBO_0000231': {'parents': ['SBO_0000000'], 'label': 'occurring entity representation'},
'SBO_0000236': {'parents': ['SBO_0000000'], 'label': 'physical entity representation'},
'SBO_0000240': {'parents': ['SBO_0000236'], 'label': 'material entity'},
'SBO_0000245': {'parents': ['SBO_0000240'], 'label': 'macromolecule'},
'SBO_0000246': {'parents': ['SBO_0000245'], 'label': 'information macromolecule'},
'SBO_0000247': {'parents': ['SBO_0000240'], 'label': 'simple chemical'},
'SBO_0000250': {'parents': ['SBO_0000246'], 'label': 'ribonucleic acid'},
'SBO_0000251': {'parents': ['SBO_0000246'], 'label': 'deoxyribonucleic acid'},
'SBO_0000252': {'parents': ['SBO_0000246'], 'label': 'polypeptide chain'},
'SBO_0000327': {'parents': ['SBO_0000247'], 'label': 'non-macromolecular ion'},
'SBO_0000328': {'parents': ['SBO_0000247'], 'label': 'non-macromolecular radical'},
'SBO_0000375': {'parents': ['SBO_0000231'], 'label': 'process'},
'SBO_0000377': {'parents': ['SBO_0000176'], 'label': 'isomerisation'},
'SBO_0000395': {'parents': ['SBO_0000375'], 'label': 'encapsulating process'},
'SBO_0000589': {'parents': ['SBO_0000205'], 'label': 'genetic production'},
'SBO_0000627': {'parents': ['SBO_0000631'], 'label': 'exchange reaction'},
'SBO_0000628': {'parents': ['SBO_0000631'], 'label': 'demand reaction'},
'SBO_0000629': {'parents': ['SBO_0000395'], 'label': 'biomass production'},
'SBO_0000630': {'parents': ['SBO_0000395'], 'label': 'ATP maintenance'},
'SBO_0000631': {'parents': ['SBO_0000375'], 'label': 'pseudoreaction'},
'SBO_0000632': {'parents': ['SBO_0000631'], 'label': 'sink reaction'},
'SBO_0000656': {'parents': ['SBO_0000182'], 'label': 'activation'},
'SBO_0000672': {'parents': ['SBO_0000176'], 'label': 'spontaneous reaction'},
}
data_dir = 'data'
sbo_owl_fname = 'sbo.owl'
class SboTerm:
"""SboTerm variables
Implemented as a flyweight pattern with caching for is_in_branch lookups.
The class builds an SBO hierarchy tree, loaded from an 'sbo.owl' file;
if the file is not found, the tree is constructed from a local dict.
The SBO term tree is implemented as a dictionary.
The class provides methods to parse the tree:
- retrieving parents
- retrieving children
- checking whether an SBO term is within a specified tree branch
- retrieving subtrees (could be used to build a default tree)
"""
_instances = {}
sbo_tree = None
def __init__(self, sbo_term):
"""Initialize SboTerm with sbo_term id
:param sbo_term: SBO term id
:type sbo_term: string in format 'SBO:0000xxx' or int
"""
self.sbo_term = sbo_term
def __new__(cls, sbo_term):
"""Create only one SboTerm per sbo_term.
Flyweight pattern: improves performance of caching.
Also constructs the initial SBO tree on first use.
:param sbo_term: SBO term id
:type sbo_term: string in format 'SBO:0000xxx' or int
"""
self = cls._instances.get(sbo_term)
if self is None:
self = cls._instances[sbo_term] = object.__new__(SboTerm)
if cls.sbo_tree is None:
cls.load_sbo_tree()
return self
@property
def sbo_term(self):
return self.__sbo_term
@sbo_term.setter
def sbo_term(self, sbo_term):
if type(sbo_term) is str:
self.__sbo_term = re.sub(':', '_', sbo_term)
else:
self.__sbo_term = f'SBO_{sbo_term:07d}'
def __repr__(self):
return self.sbo_term
def get_parents(self):
"""Get parent hierarchy for a specific node.
Supports multiple parents per node.
:return: ordered list of unique sbo_terms of parent hierarchy
:rtype: list of strings
"""
return [repr(node) for node in type(self)._get_parents(self, [])]
def get_children(self):
"""Get children subtree for a specific node.
:return: ordered list of unique sbo_terms for children subtree
:rtype: list of strings
"""
return [repr(node) for node in type(self)._get_children(self, [])]
@lru_cache
def is_in_branch(self, upper_sbo_term):
"""Check whether the SBO term is within the subtree starting at the given term.
:param upper_sbo_term: SBO term that starts the branch
:type upper_sbo_term: string or int
:return: True if sbo_term is part of the given branch
:rtype: bool
"""
return repr(SboTerm(upper_sbo_term)) in self.get_parents() + [self.sbo_term]
def get_subtree(self):
"""Retrieve a subtree including parents and children of the given SBO term.
:return: all nodes forming the subtree with label and hierarchy information
:rtype: dict with nodes forming the subtree
"""
nodes = set(type(self)._get_children(self, [self]))
nodes = nodes.union(set(type(self)._get_parents(self, [])))
sub_tree = {}
for node in nodes:
sbo_term = node.__sbo_term
sub_tree[sbo_term] = {'parents': type(self).sbo_tree[sbo_term]['parents'],
'label': type(self).sbo_tree[sbo_term]['label']}
return sub_tree
@classmethod
def _get_parents(cls, node, branch):
"""Class method to recursively identify all parents.
:param node: sbo term of current node
:type node: SboTerm
:param branch: list of parents already identified
:type branch: list of SboTerm
:return: list of parents so far identified
:rtype: list of SboTerm
"""
if node.__sbo_term in SboTerm.sbo_tree:
for parent in SboTerm.sbo_tree[node.__sbo_term]['parents']:
sbo_parent = SboTerm(parent)
branch.append(sbo_parent)
cls._get_parents(sbo_parent, branch)
return branch
@classmethod
def _get_children(cls, node, branch):
"""Class method to recursively identify all children.
:param node: sbo term of current node
:type node: SboTerm
:param branch: list of children already identified
:type branch: list of SboTerm
:return: list of children so far identified
:rtype: list of SboTerm
"""
if node.__sbo_term in SboTerm.sbo_tree:
for child in SboTerm.sbo_tree[node.__sbo_term]['children']:
sbo_child = SboTerm(child)
branch.append(sbo_child)
cls._get_children(sbo_child, branch)
return branch
@classmethod
def load_sbo_tree(cls):
"""Class method to construct an SBO Tree from file or dict.
An updated SBO file can be downloaded from https://www.ebi.ac.uk/sbo/main/download
and stored in the package under data/sbo.owl.
"""
here = os.path.dirname(os.path.abspath(__file__))
sbo_owl_path = os.path.join(here, data_dir, sbo_owl_fname)
if os.path.exists(sbo_owl_path):
tree = xml.etree.ElementTree.parse(sbo_owl_path)
root = tree.getroot()
sbo_tree = {}
for sbo_class in root.findall('owl:Class', ns):
sbo_term_id = sbo_class.attrib['{' + ns['rdf'] + '}' + 'about']
subclasses_of = []
label = ''
comment = ''
for child in sbo_class:
if child.tag == '{' + ns['rdfs'] + '}' + 'subClassOf':
if ('{' + ns['rdf'] + '}' + 'resource') in child.attrib:
subclasses_of.append(child.attrib['{' + ns['rdf'] + '}' + 'resource'])
else:
for class_node in child:
if class_node.tag == '{' + ns['owl'] + '}' + 'Class':
subclasses_of.append(class_node.attrib['{' + ns['rdf'] + '}' + 'about'])
if child.tag == '{' + ns['rdfs'] + '}' + 'label':
label = child.text
if child.tag == '{' + ns['rdfs'] + '}' + 'comment':
comment = child.text
sbo_tree[sbo_term_id] = {'parents': subclasses_of, 'label': label, 'comment': comment}
else:
print('loading local sbo subtree')
sbo_tree = local_sbo_tree
# add children information, so we can parse up/downwards through the tree
for sbo_term in sbo_tree:
sbo_tree[sbo_term]['children'] = []
for sbo_term in sbo_tree:
for parent in sbo_tree[sbo_term]['parents']:
if sbo_term not in sbo_tree[parent]['children']:
sbo_tree[parent]['children'].append(sbo_term)
SboTerm.sbo_tree = sbo_tree
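A minimal usage sketch (illustration only, not part of the diff), assuming no data/sbo.owl file is installed so the local fallback tree above is used:

from xbanalysis.model.sbo_term import SboTerm

term = SboTerm('SBO:0000252')            # 'polypeptide chain'
print(term.get_parents())
# ['SBO_0000246', 'SBO_0000245', 'SBO_0000240', 'SBO_0000236', 'SBO_0000000']
print(term.is_in_branch('SBO:0000246'))  # True: polypeptides are information macromolecules
assert SboTerm('SBO:0000252') is term    # flyweight: one instance per term id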
...@@ -3,6 +3,11 @@
Peter Schubert, HHU Duesseldorf, May 2022
"""
import sbmlxdf
from xbanalysis.utils.utils import xml_rba_ns
from xbanalysis.model.sbo_term import SboTerm
class XbaCompartment:
...@@ -15,3 +20,9 @@ class XbaCompartment:
self.units = s_compartment['units']
if 'spatialDimension' in s_compartment:
self.dimension = s_compartment['spatialDimension']
if 'xmlAnnotation' in s_compartment:
attrs = sbmlxdf.extract_xml_attrs(s_compartment['xmlAnnotation'], ns=xml_rba_ns, token='density')
if 'frac_gcdw' in attrs:
self.rba_frac_gcdw = float(attrs['frac_gcdw'])
if 'sboterm' in s_compartment:
self.sboterm = SboTerm(s_compartment['sboterm'])
...@@ -41,7 +41,7 @@ class XbaModel:
self.species = {sid: XbaSpecies(row)
for sid, row in model_dict['species'].iterrows()}
self.reactions = {rid: XbaReaction(row, self.species, self.functions, self.compartments)
for rid, row in model_dict['reactions'].iterrows()}
for r in self.reactions.values():
......
...@@ -2,6 +2,7 @@
Peter Schubert, HHU Duesseldorf, May 2022
"""
from xbanalysis.model.sbo_term import SboTerm
class XbaParameter:
...@@ -13,4 +14,4 @@ class XbaParameter:
self.constant = s_parameter['constant']
self.units = s_parameter['units']
if 'sboterm' in s_parameter:
self.sboterm = SboTerm(s_parameter['sboterm'])
\ No newline at end of file
...@@ -6,28 +6,31 @@ Peter Schubert, HHU Duesseldorf, May 2022
import re
import sbmlxdf
from xbanalysis.utils.utils import get_species_stoic, get_local_params, expand_kineticlaw
from xbanalysis.utils.utils import xml_rba_ns
from xbanalysis.model.sbo_term import SboTerm
class XbaReaction:
def __init__(self, s_reaction, species, functions, compartments):
self.id = s_reaction.name
self.name = s_reaction.get('name', self.id)
if 'sboterm' in s_reaction:
self.sboterm = SboTerm(s_reaction['sboterm'])
self.reaction_string = s_reaction['reactionString']
self.reversible = s_reaction['reversible']
self.reactants = get_species_stoic(s_reaction['reactants'])
self.products = get_species_stoic(s_reaction['products'])
self.metabs_only = True
for sid in list(self.reactants) + list(self.products):
if species[sid].sboterm.is_in_branch('SBO:0000247') is False:
self.metabs_only = False
if 'modifiers' in s_reaction:
self.modifiers = get_species_stoic(s_reaction['modifiers'])
else:
self.modifiers = {}
if 'kineticLaw' in s_reaction:
self.local_params = get_local_params(s_reaction)
self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else ''
# convert to numpy, replace local parameters with numerical values, inline lambda functions:
expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law)
...@@ -37,41 +40,56 @@ class XbaReaction:
for cid in compartments.keys():
if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None:
self.compartment = cid
# retrieve optional flux balance bounds, ensuring consistent bounds
if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction:
self.fbc_lb = s_reaction['fbcLb']
self.fbc_ub = max(self.fbc_lb, s_reaction['fbcUb'])
if self.reversible is False:
self.fbc_lb = max(0, self.fbc_lb)
self.fbc_ub = max(0, self.fbc_ub)
if 'xmlAnnotation' in s_reaction:
attrs = sbmlxdf.extract_xml_attrs(s_reaction['xmlAnnotation'], ns=xml_rba_ns, token='kinetics')
if 'kapp_f_per_s' in attrs:
self.rba_kapp_f = float(attrs['kapp_f_per_s'])
if 'kapp_r_per_s' in attrs:
self.rba_kapp_r = float(attrs['kapp_r_per_s'])
if 'Km_ext_M' in attrs:
self.rba_km_ext = float(attrs['Km_ext_M'])
self.ps_product = ''
self.enzyme = None
def set_enzyme(self, species):
for sid in self.modifiers:
if species[sid].sboterm.is_in_branch('SBO:0000246'):
self.enzyme = sid
break
def add_species_usage(self, species):
# for metabolic reactions add to the species consumed/produced the metabolic reaction id
# !!! we could also enter the Reaction Object references instead
if self.sboterm.is_in_branch('SBO:0000167'):
for sid, val in self.products.items():
species[sid].add_m_reaction(self.id, val)
for sid, val in self.reactants.items():
species[sid].add_m_reaction(self.id, -val)
# protein degradation reactions
if self.sboterm.is_in_branch('SBO:0000179'):
for sid, val in self.reactants.items():
if species[sid].sboterm.is_in_branch('SBO:0000246'):
species[sid].add_p_degrad_rid(self.id, -val)
# protein transitions
if self.sboterm.is_in_branch('SBO:0000182'):
for sid, val in self.reactants.items():
if species[sid].sboterm.is_in_branch('SBO:0000246'):
species[sid].add_p_transition_rid(self.id, -val)
for sid, val in self.products.items():
if species[sid].sboterm.is_in_branch('SBO:0000246'):
species[sid].add_p_transition_rid(self.id, val)
# protein synthesis reactions
if self.sboterm.is_in_branch('SBO:0000205'):
for sid, val in self.products.items():
if species[sid].sboterm.is_in_branch('SBO:0000246'):
self.ps_product = sid
species[sid].set_ps_rid(self.id)
else:
......
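The switch from hard-coded SBO id tuples to is_in_branch checks means that subclasses of a term are matched automatically. A small sketch (illustration only, not part of the diff), using the local fallback tree defined in sbo_term.py:

from xbanalysis.model.sbo_term import SboTerm

# 'biochemical reaction' (SBO:0000176) is a direct child of 'biochemical or transport reaction' (SBO:0000167)
print(SboTerm('SBO:0000176').is_in_branch('SBO:0000167'))   # True
# 'degradation' (SBO:0000179) sits below SBO:0000176 and is still matched
print(SboTerm('SBO:0000179').is_in_branch('SBO:0000167'))   # True
# 'transcription' (SBO:0000183) belongs to the 'composite biochemical process' branch instead
print(SboTerm('SBO:0000183').is_in_branch('SBO:0000167'))   # False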
...@@ -4,10 +4,8 @@ Peter Schubert, HHU Duesseldorf, May 2022
"""
import sbmlxdf
from xbanalysis.utils.utils import xml_gba_ns, xml_rba_ns
from xbanalysis.model.sbo_term import SboTerm
class XbaSpecies:
...@@ -16,7 +14,7 @@ class XbaSpecies:
self.id = s_species.name
self.name = s_species.get('name', self.id)
# set sboterm to 000247 to support Flux Balance Analysis
self.sboterm = SboTerm(s_species.get('sboterm', 'SBO:0000247'))
self.compartment = s_species['compartment']
self.constant = s_species['constant']
self.boundary = s_species['boundaryCondition']
...@@ -25,8 +23,19 @@ class XbaSpecies:
if 'units' in s_species:
self.units = s_species['substanceUnits']
if 'xmlAnnotation' in s_species:
attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token='molecule')
if 'weight_Da' in attrs:
self.mw = float(attrs['weight_Da'])
attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_rba_ns, token='costs')
if 'target_mmol_per_gcdw' in attrs:
self.rba_target = float(attrs.pop('target_mmol_per_gcdw'))
if 'process_machine' in attrs:
self.rba_process_machine = attrs.pop('process_machine')
if 'process_capacity_per_s' in attrs:
self.rba_process_capacity = float(attrs.pop('process_capacity_per_s'))
if len(attrs) > 0:
self.rba_process_costs = attrs
self.ps_rid = None
self.m_reactions = {}
self.ps_reactions = {}
......
"""Subpackage with XBA model classes """
from .gba_problem import GbaProblem
from .rba_problem import RbaProblem
from .fba_problem import FbaProblem
from .lp_problem import LpProblem
__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem', 'RbaProblem']
...@@ -35,29 +35,33 @@ class FbaProblem:
:param xba_model: xba model with all required FBA parameters
:type xba_model: XbaModel
"""
self.xba_model = xba_model
# identify reactions excluding protein synthesis and protein degradation
self.mr_rids = [r.id for r in xba_model.reactions.values()
if r.sboterm.is_in_branch('SBO:0000205') is False and
r.sboterm.is_in_branch('SBO:0000179') is False]
self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mr_rids)}
self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids])
sids = set()
for rid in self.mr_rids:
for sid in xba_model.reactions[rid].reactants:
sids.add(sid)
for sid in xba_model.reactions[rid].products:
sids.add(sid)
self.sids = list(sids)
self.s_mat = xba_model.get_stoic_matrix(self.sids, self.mr_rids)
self.is_fba_model = self.check_fba_model(xba_model)
if self.is_fba_model is False:
print('Error: not a FBA model - missing parameters')
return
self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mr_rids],
[xba_model.reactions[rid].fbc_ub for rid in self.mr_rids]])
self.obj_coefs = None
for oid, fbc_obj in xba_model.fbc_objectives.items():
if fbc_obj.active is True:
self.obj_id = fbc_obj.id
...@@ -65,21 +69,43 @@ class FbaProblem:
self.set_obj_coefs(fbc_obj.coefficiants)
break
def check_fba_model(self, xba_model):
"""Check if fba related data is available in the XBA model
:param xba_model: Xba model with data extracted from SBML file
:type xba_model: XbaModel
:return: indicator if model contains data required for FBA problem formulation
:rtype: bool
"""
is_fba_model = True
r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub'}
for check, param in r_check.items():
missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False]
if len(missing) > 0:
print(f'missing {check} in:', missing)
is_fba_model = False
if hasattr(xba_model, 'fbc_objectives') is False:
print('missing flux objectives')
is_fba_model = False
else:
active_objs = [oid for oid in xba_model.fbc_objectives if xba_model.fbc_objectives[oid].active]
if len(active_objs) == 0:
print('missing an active flux objective')
is_fba_model = False
return is_fba_model
def combine_fwd_bwd(self, vector):
"""combine forward and backward flux information.
:param vector: contains data on all reactions + backward information for reversible reactions
:type vector: 1D ndarray of shape (len(mr_rids) + sum(reversible))
:return: combined flux information
:rtype: 1D ndarray of shape (len(mr_rids))
"""
n_cols = len(self.mr_rids)
combined = vector[:n_cols]
combined[self.reversible] -= vector[n_cols:]
return combined
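A small numeric sketch (illustration only, not part of the diff) of the forward/backward split used here: each reversible reaction gets an extra non-negative backward column, and combine_fwd_bwd folds the two back into a signed net flux. The numbers are invented:

import numpy as np

reversible = np.array([True, False, True])     # 3 reactions, 2 of them reversible
fwd_bwd = np.array([1.0, 2.0, 0.0, 0.4, 3.0])  # 3 forward fluxes + 2 backward fluxes

net = fwd_bwd[:3].copy()
net[reversible] -= fwd_bwd[3:]                 # net flux = forward - backward
print(net)                                     # [ 0.6  2.  -3. ]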
...@@ -97,14 +123,14 @@ class FbaProblem:
return None
lp = LpProblem()
# self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids])
fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible]))
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0),
np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0),
np.fmax(-self.flux_bounds[0, self.reversible], 0)))))
row_bnds = np.zeros((fwd_bwd_s_mat.shape[0], 2))
lp.configure(fwd_bwd_obj_coefs, fwd_bwd_s_mat, row_bnds, fwd_bwd_bnds, self.obj_dir)
return lp
def fba_optimize(self):
...@@ -115,7 +141,7 @@ class FbaProblem:
"""
lp = self.create_fba_lp()
if lp is None:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve()
del lp
res['x'] = self.combine_fwd_bwd(res['x'])
...@@ -133,9 +159,9 @@ class FbaProblem:
"""
lp = self.create_fba_lp()
if lp is None:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result=False)
if res['success']:
# fix FBA objective as constraint
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
if self.obj_dir == 'maximize':
...@@ -144,7 +170,7 @@ class FbaProblem:
row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
# new LP objective is to minimize sum of fluxes
n_cols = len(self.mr_rids)
lp.set_obj_coefs(np.ones(n_cols + sum(self.reversible)))
lp.set_obj_dir('minimize')
res = lp.solve()
...@@ -165,13 +191,13 @@ class FbaProblem:
"""
lp = self.create_fba_lp()
if lp is None:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result=True)
if rids is None:
rids = self.mr_rids
flux_ranges = np.zeros((4, len(rids)))
if res['success']:
# fix FBA objective as constraint
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
if self.obj_dir == 'maximize':
...@@ -180,7 +206,7 @@ class FbaProblem:
row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
fba_constr_row = lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
n_cols = len(self.mr_rids)
for idx, rid in enumerate(rids):
# select reaction maximize/minimize flux
new_obj_coefs = np.zeros(n_cols)
...@@ -190,20 +216,20 @@ class FbaProblem:
lp.set_obj_dir('minimize')
res = lp.solve(short_result=True)
if res['success']:
flux_ranges[0, idx] = res['fun']
flux_ranges[2, idx] = lp.get_row_value(fba_constr_row)
else:
print(f"FVA min failed for {rid}")
flux_ranges[0, idx] = np.nan
lp.set_obj_dir('maximize')
res = lp.solve(short_result=True)
if res['success']:
flux_ranges[1, idx] = res['fun']
flux_ranges[3, idx] = lp.get_row_value(fba_constr_row)
else:
print(f"FVA max failed for {rid}")
flux_ranges[1, idx] = np.nan
del lp
...@@ -228,7 +254,7 @@ class FbaProblem:
:returns: flux bounds configured in the FBA problem
:rtype: dict (key: reaction id, value: list with two float values for lower/upper bound)
"""
return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mr_rids, self.flux_bounds.T)}
def set_flux_bounds(self, flux_bounds):
"""selectively set lower and upper flux bounds for given reactions.
...@@ -255,6 +281,6 @@ class FbaProblem:
:param coefficiants: objective coefficients of selected reactions
:type coefficiants: dict (key: reaction id, value: float)
"""
self.obj_coefs = np.zeros(len(self.mr_rids))
for rid, coef in coefficiants.items():
self.obj_coefs[self.mrid2idx[rid]] = coef
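A hypothetical usage sketch of the reworked FbaProblem interface (the XbaModel instance xba_model is assumed to be constructed elsewhere; only attributes, methods and result keys shown in this diff are used):

from xbanalysis.problems import FbaProblem

fba = FbaProblem(xba_model)            # xba_model: an existing XbaModel instance
if fba.is_fba_model:
    res = fba.fba_optimize()
    if res['success']:
        print('objective value:', res['fun'])
        print('net fluxes per reaction:', dict(zip(fba.mr_rids, res['x'])))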
...@@ -43,8 +43,8 @@ class GbaProblem:
# self.cache = True
# metabolites and enzymes in the model
metab_sids = [s.id for s in xba_model.species.values() if s.sboterm.is_in_branch('SBO:0000247')]
enz_sids = [s.id for s in xba_model.species.values() if s.sboterm.is_in_branch('SBO:0000246')]
# optimization variables
self.var_metab_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is False]
...@@ -66,7 +66,7 @@ class GbaProblem:
# to facilitate matrix multiplication without further remapping
# - here we expect a one-to-one relationship between enzyme and corresponding protein synthesis reaction
self.mr_rids = [r.id for r in xba_model.reactions.values()
if r.sboterm.is_in_branch('SBO:0000167')]
self.ps_rids = [xba_model.species[sid].ps_rid for sid in self.var_enz_sids]
self.mr_vols = np.array([xba_model.compartments[xba_model.reactions[rid].compartment].size
for rid in self.mr_rids])
......
...@@ -69,7 +69,6 @@ class LpProblem:
self.lp = glpk.glp_create_prob()
self.ncols = 0
self.nrows = 0
self.res = {}
@staticmethod
...@@ -92,30 +91,30 @@ class LpProblem:
else:
ub = 0.0
var_type = glpk.GLP_FX
elif not np.isfinite(lb) and not np.isfinite(ub):
lb = 0.0
ub = 0.0
var_type = glpk.GLP_FR
elif np.isfinite(ub):
lb = 0.0
var_type = glpk.GLP_UP
else:
ub = 0.0
var_type = glpk.GLP_LO
return var_type, lb, ub
def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'):
"""Configure the LP Problem.
Note: glpk related indices start at 1 (not zero)
:param obj_coefs: coefficients of objective function
:type obj_coefs: 1D ndarray of shape (1, ncols)
:param constr_coefs: matrix with constraint coefficients
:type constr_coefs: 2D ndarray with shape(nrows, ncols)
:param row_bnds: lower and upper bounds of auxiliary (row) variables
:type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds)
:param col_bnds: lower and upper bounds of structural (col) variables
:type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds)
:param direction: direction for the optimization ('maximize' or 'minimize')
:type direction: string
"""
...@@ -123,7 +122,7 @@ class LpProblem:
assert len(obj_coefs) == self.ncols
assert col_bnds.shape == (2, self.ncols)
assert row_bnds.shape == (self.nrows, 2)
# configure objective function
glpk.glp_add_cols(self.lp, self.ncols)
...@@ -137,7 +136,7 @@ class LpProblem:
# configure constraints
glpk.glp_add_rows(self.lp, self.nrows)
for i, lb_ub in enumerate(row_bnds):
var_type, lb, ub = self._get_variable_type(lb_ub)
glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub)
constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs)
...@@ -251,11 +250,11 @@ class LpProblem:
glpk.glp_scale_prob(self.lp, opt)
# sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))]
# rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))]
simplex_result = glpk.glp_simplex(self.lp, None)
return self.results(simplex_result, short_result)
def get_row_value(self, row):
"""Retrieve right-hand side of a constraint (auxiliary variable)
:param row: index into constraints
:type row: int
...@@ -265,11 +264,13 @@ class LpProblem:
assert row <= glpk.glp_get_num_rows(self.lp)
return glpk.glp_get_row_prim(self.lp, row)
def results(self, simplex_result, short_result=False):
"""Collect results for the optimization
Alternatively, reduced results, e.g. when doing pFBA
:param simplex_result: result from the LP solver
:type simplex_result: int
:param short_result: short result (only objective value and status) or long result
:type short_result: bool (default:False)
:return: result from the FBA optimization
...@@ -277,13 +278,14 @@ class LpProblem:
"""
self.res = {
'fun': glpk.glp_get_obj_val(self.lp),
'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT,
}
if short_result is False:
self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)])
self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)])
self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)])
self.res['simplex_result'] = simplex_result
self.res['simplex_message'] = simplex_status[simplex_result]
self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)]
self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)]
self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)]
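A hypothetical sketch (not part of the diff) of the reworked LpProblem.configure call, showing the new argument order and the row_bnds shape of (nrows, 2); the toy problem data are invented:

import numpy as np
from xbanalysis.problems import LpProblem

# maximize x1 + 2*x2  subject to  x1 + x2 <= 10,  0 <= x1, x2 <= 8
obj_coefs = np.array([1.0, 2.0])
constr_coefs = np.array([[1.0, 1.0]])            # shape (nrows, ncols)
row_bnds = np.array([[np.nan, 10.0]])            # shape (nrows, 2): one (lb, ub) pair per constraint
col_bnds = np.array([[0.0, 0.0], [8.0, 8.0]])    # shape (2, ncols): first row lbs, second row ubs

lp = LpProblem()
lp.configure(obj_coefs, constr_coefs, row_bnds, col_bnds, 'maximize')
res = lp.solve()
print(res['success'], res['fun'], res['x'])      # expected optimum: x = [2, 8], fun = 18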
......
"""Implementation of RbaProblem class.
- several process machines supported (yet to be validated)
- non-enzymatic metabolic reactions supported (yet to be validated)
Yet to implement:
- enzyme degradation
- enzyme transitions
- several compartments constraints
- target concentrations and target fluxes
Peter Schubert, HHU Duesseldorf, June 2022
"""
import numpy as np
from xbanalysis.problems.lp_problem import LpProblem
class RbaProblem:
def __init__(self, xba_model):
"""
:param xba_model: Xba model with data extracted from SBML file
:type xba_model: XbaModel
"""
# metabolic reactions and enzyme transitions (no FBA reactions, no protein synthesis, no degradation)
self.mr_rids = [r.id for r in xba_model.reactions.values() if r.sboterm.is_in_branch('SBO:0000167')
and r.sboterm.is_in_branch('SBO:0000179') is False]
self.mrid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.mr_rids}
self.enz_sids = [enz_sid for enz_sid in self.mrid2enz.values() if enz_sid is not None]
self.rev_enz_sids = [enz_sid for mr_rid, enz_sid in self.mrid2enz.items()
if enz_sid is not None and xba_model.reactions[mr_rid].reversible]
self.processes = {s.rba_process_machine: sid for sid, s in xba_model.species.items()
if hasattr(s, 'rba_process_machine')}
self.psenz_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids]
self.pspm_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()]
self.densities = {cid: c.rba_frac_gcdw for cid, c in xba_model.compartments.items()
if hasattr(c, 'rba_frac_gcdw')}
# metabolites and enzymes used in metabolic reactions and enzyme transitions
sids = set()
for rid in self.mr_rids:
for sid in xba_model.reactions[rid].reactants:
sids.add(sid)
for sid in xba_model.reactions[rid].products:
sids.add(sid)
used_sids = list(sids)
self.sids = [sid for sid in used_sids if xba_model.species[sid].constant is False]
self.ext_conc = {sid: xba_model.species[sid].initial_conc
for sid in used_sids if xba_model.species[sid].constant is True}
self.rba_cols = self.mr_rids + self.enz_sids + list(self.processes)
self.rba_rows = (self.sids + list(self.processes)
+ [sid + '_feff' for sid in self.enz_sids]
+ [sid + '_reff' for sid in self.rev_enz_sids]
+ list(self.densities))
self.is_rba_model = self.check_rba_model(xba_model)
if self.is_rba_model is False:
print('Error: not an RBA model - missing parameters')
return
fix_m, var_m, row_bnds_m = self.construct_mass_balance(xba_model)
fix_pc, var_pc, row_bnds_pc = self.construct_process_capacities(xba_model)
fix_eff, var_eff, row_bnds_eff = self.construct_efficencies(xba_model)
fix_cd, var_cd, row_bnds_cd = self.construct_densities(xba_model)
self.fix_cd = fix_cd
self.fix_mat = np.vstack((fix_m, fix_pc, fix_eff, fix_cd))
self.var_mat = np.vstack((var_m, var_pc, var_eff, var_cd))
self.fix_row_bnds = np.vstack((np.zeros_like(row_bnds_m), np.zeros_like(row_bnds_pc),
row_bnds_eff, row_bnds_cd))
self.var_row_bnds = np.vstack((row_bnds_m, row_bnds_pc,
np.zeros_like(row_bnds_eff), np.zeros_like(row_bnds_cd)))
# in RBA we only check feasibility.
# Here we maximize enzyme amount and check that objective value e.g. > 1e-10 mmol/gcdw
self.obj_dir = 'maximize'
self.obj_coefs = np.hstack((np.zeros(len(self.mr_rids)),
np.ones(len(self.enz_sids) + len(self.processes))))
col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.mr_rids],
np.zeros(len(self.enz_sids) + len(self.processes))))
col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.mr_rids],
np.nan * np.ones(len(self.enz_sids) + len(self.processes))))
self.col_bnds = np.vstack((col_lbs, col_ubs))
self.scaling = '2N'
self.max_iter = 20
def check_rba_model(self, xba_model):
"""Check if rba related data available in the XBA model
:param xba_model: Xba model with data extracted from SBML file
:type xba_model: XbaModel
:return: indicator if model contains data required for RBA problem formulation
:rtype: bool
"""
is_rba_model = True
r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub', 'kapp forward': 'rba_kapp_f'}
for check, param in r_check.items():
missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False]
if len(missing) > 0:
print(f'missing {check} in:', missing)
is_rba_model = False
missing = [rid for rid in self.mr_rids if xba_model.reactions[rid].reversible
and hasattr(xba_model.reactions[rid], 'rba_kapp_r') is False]
if len(missing) > 0:
print('missing kapp reverse in:', missing)
is_rba_model = False
missing = [sid for sid in self.sids + self.enz_sids + list(self.processes.values())
if hasattr(xba_model.species[sid], 'mw') is False]
if len(missing) > 0:
print('missing molecular weight in:', missing)
is_rba_model = False
missing = [sid for sid in list(self.processes.values())
if hasattr(xba_model.species[sid], 'rba_process_capacity') is False]
if len(missing) > 0:
print('missing process capacity in:', missing)
is_rba_model = False
if len(self.densities) == 0:
print('missing a density constraint')
is_rba_model = False
return is_rba_model
def rba_optimize(self):
"""Solve the RBA problem via bisection over the growth rate.
:return: optimization result; res['fun'] holds the maximal feasible growth rate
:rtype: dict
"""
if self.is_rba_model is False:
print('Error: not a RBA model - missing parameters')
res = {'success': False, 'fun': 0.0, 'message': 'not an RBA model'}
return res
res = {}
# Preanalysis: find range of growth rates
gr = {'lb': 0.0, 'ub': np.nan, 'try': 1.0}
for i in range(self.max_iter):
constr_coefs = self.fix_mat + gr['try'] * self.var_mat
# row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds
lp = LpProblem()
lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir)
res = lp.solve(scaling=self.scaling)
del lp
if res['success'] and res['fun'] > 1e-10:
gr['lb'] = gr['try']
gr['try'] *= 10
else:
if gr['try'] <= 1e-4:
break
gr['ub'] = gr['try']
gr['try'] /= 10
if gr['lb'] > 0.0 and np.isfinite(gr['ub']):
break
if gr['lb'] == 0.0 or np.isnan(gr['ub']):
res = {'success': False, 'fun': 0.0, 'message': 'no solution found'}
return res
# find exact growth rate
for i in range(self.max_iter):
gr['try'] = 0.5 * (gr['lb'] + gr['ub'])
constr_coefs = self.fix_mat + gr['try'] * self.var_mat
# row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds
lp = LpProblem()
lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir)
res = lp.solve(scaling=self.scaling)
del lp
if res['success'] and res['fun'] > 1e-10:
gr['lb'] = gr['try']
else:
gr['ub'] = gr['try']
if (gr['ub'] - gr['lb']) < gr['try'] * 1e-5:
break
res['fun'] = gr['try']
return res
def construct_mass_balance(self, xba_model):
fix_m_x_mr = xba_model.get_stoic_matrix(self.sids, self.mr_rids)
var_m_x_enz = xba_model.get_stoic_matrix(self.sids, self.psenz_rids)
var_m_x_pm = xba_model.get_stoic_matrix(self.sids, self.pspm_rids)
fix_m = np.hstack((fix_m_x_mr,
np.zeros((len(self.sids), len(self.enz_sids)+len(self.processes)))))
var_m = np.hstack((np.zeros((len(self.sids), len(self.mr_rids))),
var_m_x_enz,
var_m_x_pm))
targets = np.array([getattr(xba_model.species[sid], 'rba_target', 0.0) for sid in self.sids])
row_bnds_m = np.vstack((targets, targets)).T
return fix_m, var_m, row_bnds_m
def construct_process_capacities(self, xba_model):
var_pc_x_enz = np.zeros((len(self.processes), len(self.psenz_rids)))
for idx, pm in enumerate(self.processes):
var_pc_x_enz[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0)
for enz_sid in self.enz_sids]
var_pc_x_pm = np.zeros((len(self.processes), len(self.processes)))
for idx, pm in enumerate(self.processes):
var_pc_x_pm[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0)
for enz_sid in self.processes.values()]
fix_pc_x_pm = -np.diag([xba_model.species[enz_sid].rba_process_capacity
for enz_sid in self.processes.values()]) * 3600
fix_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids) + len(self.enz_sids))),
fix_pc_x_pm))
var_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids))),
var_pc_x_enz, var_pc_x_pm))
row_bnds_pc = np.zeros((len(self.processes), 2))
return fix_pc, var_pc, row_bnds_pc
def get_efficiencies(self, xba_model):
efficiencies = np.zeros((2, len(self.enz_sids)))
idx = 0
for mr_rid, enz_sid in self.mrid2enz.items():
if enz_sid is not None:
r = xba_model.reactions[mr_rid]
saturation = 1.0
if hasattr(r, 'rba_km_ext'):
for sid in r.reactants:
if sid in self.ext_conc:
saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext)
efficiencies[0, idx] = r.rba_kapp_f * 3600 * saturation
saturation = 1.0
if r.reversible:
if hasattr(r, 'rba_km_ext'):
for sid in r.products:
if sid in self.ext_conc:
saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext)
efficiencies[1, idx] = r.rba_kapp_r * 3600 * saturation
idx += 1
return efficiencies
def construct_efficencies(self, xba_model):
efficiencies = self.get_efficiencies(xba_model)
mask = np.array([enz_sid is not None for enz_sid in self.mrid2enz.values()])
fix_feff_x_mr = np.diag(np.ones(len(self.mrid2enz)))[mask]
fix_feff_x_enz = np.diag(-efficiencies[0])
fix_feff_x_pm = np.zeros((len(self.enz_sids), len(self.processes)))
fix_feff = np.hstack((fix_feff_x_mr, fix_feff_x_enz, fix_feff_x_pm))
row_bnds_feff = np.vstack((np.ones(len(self.enz_sids)) * np.nan, np.zeros(len(self.enz_sids)))).T
mask = [enz_sid is not None and xba_model.reactions[mr_rid].reversible
for mr_rid, enz_sid in self.mrid2enz.items()]
fix_reff_x_mr = np.diag(-np.ones(len(self.mrid2enz)))[mask]
mask = [enz_sid in self.rev_enz_sids for enz_sid in self.enz_sids]
fix_reff_x_enz = np.diag(-efficiencies[1])[mask]
fix_reff_x_pm = np.zeros((len(self.rev_enz_sids), len(self.processes)))
fix_reff = np.hstack((fix_reff_x_mr, fix_reff_x_enz, fix_reff_x_pm))
row_bnds_reff = np.vstack((np.ones(len(self.rev_enz_sids)) * np.nan, np.zeros(len(self.rev_enz_sids)))).T
fix_eff = np.vstack((fix_feff, fix_reff))
var_eff = np.zeros_like(fix_eff)
row_bnds_eff = np.vstack((row_bnds_feff, row_bnds_reff))
return fix_eff, var_eff, row_bnds_eff
def construct_densities(self, xba_model):
fix_cd_x_mr = np.zeros((len(self.densities), len(self.mr_rids)))
fix_cd_x_enz = np.zeros((len(self.densities), len(self.enz_sids)))
for row_idx, cid in enumerate(self.densities):
for col_idx, sid in enumerate(self.enz_sids):
if xba_model.species[sid].compartment == cid:
fix_cd_x_enz[row_idx, col_idx] = xba_model.species[sid].mw / 1000
fix_cd_x_pm = np.zeros((len(self.densities), len(self.processes)))
for row_idx, cid in enumerate(self.densities):
for col_idx, sid in enumerate(self.processes.values()):
if xba_model.species[sid].compartment == cid:
fix_cd_x_pm[row_idx, col_idx] = xba_model.species[sid].mw / 1000
fix_cd = np.hstack((fix_cd_x_mr, fix_cd_x_enz, fix_cd_x_pm))
var_cd = np.zeros_like(fix_cd)
row_bnds_cd = np.vstack((np.zeros(len(self.densities)),
[val for val in self.densities.values()])).T
return fix_cd, var_cd, row_bnds_cd
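The growth-rate search in rba_optimize above can be summarized by this standalone sketch (illustration only, not part of the diff); feasible() is a placeholder for building the LP at a trial growth rate and checking res['success'] and res['fun'] > 1e-10:

import numpy as np

def max_feasible_growth_rate(feasible, gr_try=1.0, max_iter=20, rel_tol=1e-5):
    """Bracket the maximal feasible growth rate by powers of ten, then bisect."""
    lb, ub = 0.0, np.nan
    for _ in range(max_iter):                 # bracketing phase
        if feasible(gr_try):
            lb, gr_try = gr_try, gr_try * 10
        else:
            if gr_try <= 1e-4:
                break
            ub, gr_try = gr_try, gr_try / 10
        if lb > 0.0 and not np.isnan(ub):
            break
    if lb == 0.0 or np.isnan(ub):
        return None                           # no feasible growth rate bracketed
    for _ in range(max_iter):                 # bisection phase
        gr_try = 0.5 * (lb + ub)
        if feasible(gr_try):
            lb = gr_try
        else:
            ub = gr_try
        if (ub - lb) < gr_try * rel_tol:
            break
    return gr_try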
...@@ -6,6 +6,9 @@ Peter Schubert, HHU Duesseldorf, May 2022
import sys
import re
xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns'
xml_rba_ns = 'http://www.hhu.de/ccb/rba/ns'
def get_sbml_ids(df, sbo_id):
"""Get SBML IDs with specified sboterm-id.
......