Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • main
1 result

Target

Select target project
No results found
Select Git revision
  • main
1 result
Show changes

Commits on Source 2

15 files
+ 11264
95
Compare changes
  • Side-by-side
  • Inline

Files

+7 −0
Original line number Original line Diff line number Diff line
@@ -2,6 +2,13 @@


XBA problem generation from SBML code metabolic/kinetic models. Support of Growth Balance Analysis (GBA), FBA and RBA
XBA problem generation from SBML code metabolic/kinetic models. Support of Growth Balance Analysis (GBA), FBA and RBA


References to be included !!!
- sbo terms (sbo.owl)
- SBML and extensions for fbc, groups
- pFBA, FBA, RBA, FVA
- sbmlxdf


## Getting started
## Getting started


To make it easy for you to get started with GitLab, here's a list of recommended next steps.
To make it easy for you to get started with GitLab, here's a list of recommended next steps.
Original line number Original line Diff line number Diff line
@@ -6,5 +6,7 @@ from .xba_function import XbaFunction
from .xba_parameter import XbaParameter
from .xba_parameter import XbaParameter
from .xba_reaction import XbaReaction
from .xba_reaction import XbaReaction
from .xba_species import XbaSpecies
from .xba_species import XbaSpecies
from .sbo_term import SboTerm


__all__ = ['XbaModel', 'XbaCompartment', 'XbaFunction', 'XbaParameter', 'XbaReaction', 'XbaSpecies']
__all__ = ['XbaModel', 'XbaCompartment', 'XbaFunction', 'XbaParameter',
           'XbaReaction', 'XbaSpecies', 'SboTerm']
+236 −0
Original line number Original line Diff line number Diff line
"""Implementation of SboTerm class.

Peter Schubert, HHU Duesseldorf, July 2022
"""
import re
import os
from functools import lru_cache
import xml.etree.ElementTree


ns = {'owl': 'http://www.w3.org/2002/07/owl#',
      'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
      'rdfs': 'http://www.w3.org/2000/01/rdf-schema#'}

local_sbo_tree = {
    'SBO_0000000': {'parents': [], 'label': 'systems biology representation'},
    'SBO_0000167': {'parents': ['SBO_0000375'], 'label': 'biochemical or transport reaction'},
    'SBO_0000176': {'parents': ['SBO_0000167'], 'label': 'biochemical reaction'},
    'SBO_0000179': {'parents': ['SBO_0000176'], 'label': 'degradation'},
    'SBO_0000180': {'parents': ['SBO_0000176'], 'label': 'dissociation'},
    'SBO_0000181': {'parents': ['SBO_0000176'], 'label': 'conformational transition'},
    'SBO_0000182': {'parents': ['SBO_0000176'], 'label': 'conversion'},
    'SBO_0000183': {'parents': ['SBO_0000205'], 'label': 'transcription'},
    'SBO_0000184': {'parents': ['SBO_0000205'], 'label': 'translation'},
    'SBO_0000204': {'parents': ['SBO_0000205'], 'label': 'DNA replication'},
    'SBO_0000205': {'parents': ['SBO_0000375'], 'label': 'composite biochemical process'},
    'SBO_0000231': {'parents': ['SBO_0000000'], 'label': 'occurring entity representation'},
    'SBO_0000236': {'parents': ['SBO_0000000'], 'label': 'physical entity representation'},
    'SBO_0000240': {'parents': ['SBO_0000236'], 'label': 'material entity'},
    'SBO_0000245': {'parents': ['SBO_0000240'], 'label': 'macromolecule'},
    'SBO_0000246': {'parents': ['SBO_0000245'], 'label': 'information macromolecule'},
    'SBO_0000247': {'parents': ['SBO_0000240'], 'label': 'simple chemical'},
    'SBO_0000250': {'parents': ['SBO_0000246'], 'label': 'ribonucleic acid'},
    'SBO_0000251': {'parents': ['SBO_0000246'], 'label': 'deoxyribonucleic acid'},
    'SBO_0000252': {'parents': ['SBO_0000246'], 'label': 'polypeptide chain'},
    'SBO_0000327': {'parents': ['SBO_0000247'], 'label': 'non-macromolecular ion'},
    'SBO_0000328': {'parents': ['SBO_0000247'], 'label': 'non-macromolecular radical'},
    'SBO_0000375': {'parents': ['SBO_0000231'], 'label': 'process'},
    'SBO_0000377': {'parents': ['SBO_0000176'], 'label': 'isomerisation'},
    'SBO_0000395': {'parents': ['SBO_0000375'], 'label': 'encapsulating process'},
    'SBO_0000589': {'parents': ['SBO_0000205'], 'label': 'genetic production'},
    'SBO_0000627': {'parents': ['SBO_0000631'], 'label': 'exchange reaction'},
    'SBO_0000628': {'parents': ['SBO_0000631'], 'label': 'demand reaction'},
    'SBO_0000629': {'parents': ['SBO_0000395'], 'label': 'biomass production'},
    'SBO_0000630': {'parents': ['SBO_0000395'], 'label': 'ATP maintenance'},
    'SBO_0000631': {'parents': ['SBO_0000375'], 'label': 'pseudoreaction'},
    'SBO_0000632': {'parents': ['SBO_0000631'], 'label': 'sink reaction'},
    'SBO_0000656': {'parents': ['SBO_0000182'], 'label': 'activation'},
    'SBO_0000672': {'parents': ['SBO_0000176'], 'label': 'spontaneous reaction'},
}
data_dir = 'data'
sbo_owl_fname = 'sbo.owl'


class SboTerm:
    """SboTerm variables

    Implemented as a flyweight pattern and caching for is_in_batch lookups
    Class builds a SBO hierachie tree, loaded from a 'sbo.owl' file,
    if file not found, tree is constructed from a local dict.
    Sbo term tree implemented as dictionary

    Class provides methods to parse the tree
    - retrieving parents
    - retrieving children
    - check if a sbo term belongs is within a specified tree branch
    - retrieving subtrees (could be used to build default tree)
    """
    _instances = {}
    sbo_tree = None

    def __init__(self, sbo_term):
        """Initialize SboTerm with sbo_term id

        :param sbo_term: SBO term id
        :type sbo_term: string in format 'SBO:0000xxx' or int
        """
        self.sbo_term = sbo_term

    def __new__(cls, sbo_term):
        """Create only one SboTerm per sbo_term.

        Flyweight pattern

        Improves performance on caching.
        Also construct initial SBO Tree

        :param sbo_term: SBO term id
        :type sbo_term: string in format 'SBO:0000xxx' or int
        """
        self = cls._instances.get(sbo_term)
        if self is None:
            self = cls._instances[sbo_term] = object.__new__(SboTerm)
            if cls.sbo_tree is None:
                cls.load_sbo_tree()
        return self

    @property
    def sbo_term(self):
        return self.__sbo_term

    @sbo_term.setter
    def sbo_term(self, sbo_term):
        if type(sbo_term) is str:
            self.__sbo_term = re.sub(':', '_', sbo_term)
        else:
            self.__sbo_term = f'SBO_{sbo_term:07d}'

    def __repr__(self):
        return self.sbo_term

    def get_parents(self):
        """Get parent hierarchy for a specific node.

        Supporting multiple parents of a node.

        :return: ordered list of unique sbo_terms of parent hierarchy
        :rtype: list of strings
        """
        return [repr(node) for node in type(self)._get_parents(self, [])]

    def get_children(self):
        """Get children subtree for a specific node.

        :return: ordered list of unique sbo_terms for children subtree
        :rtype: list of strings
        """
        return [repr(node) for node in type(self)._get_children(self, [])]

    @lru_cache
    def is_in_branch(self, upper_sbo_term):
        """Check if sbo term is part subtree starting given value.

        :param upper_sbo_term: SBO term that starts the branch
        :rtype upper_sbo_term: string or int
        :return: True if sbo_term part of the given branch
        :rtype: bool
        """
        return repr(SboTerm(upper_sbo_term)) in self.get_parents() + [self.sbo_term]

    def get_subtree(self):
        """Retrieve a subtry including parents and children of give sbo term.

        :return: all nodes forming the subtree with label and hierarchy information
        :rtype: dict with nodes forming the subtree
        """
        nodes = set(type(self)._get_children(self, [self]))
        nodes = nodes.union(set(type(self)._get_parents(self, [])))
        sub_tree = {}
        for node in nodes:
            sbo_term = node.__sbo_term
            sub_tree[sbo_term] = {'parents': type(self).sbo_tree[sbo_term]['parents'],
                                  'label': type(self).sbo_tree[sbo_term]['label']}
        return sub_tree

    @classmethod
    def _get_parents(cls, node, branch):
        """Class method to recursively identify all parents.

        :param node: sbo term of current node
        :type node: SboTerm
        :param branch: list of parents already identified
        :type branch: list of SboTerm
        :return: list of parents so far identified
        :rtype: list of SboTerm
        """
        if node.__sbo_term in SboTerm.sbo_tree:
            for parent in SboTerm.sbo_tree[node.__sbo_term]['parents']:
                sbo_parent = SboTerm(parent)
                branch.append(sbo_parent)
                cls._get_parents(sbo_parent, branch)
        return branch

    @classmethod
    def _get_children(cls, node, branch):
        """Class method to recursively identify all children.

        :param node: sbo term of current node
        :type node: SboTerm
        :param branch: list of children already identified
        :type branch: list of SboTerm
        :return: list of children so far identified
        :rtype: list of SboTerm
        """
        if node.__sbo_term in SboTerm.sbo_tree:
            for child in SboTerm.sbo_tree[node.__sbo_term]['children']:
                sbo_child = SboTerm(child)
                branch.append(sbo_child)
                cls._get_children(sbo_child, branch)
        return branch

    @classmethod
    def load_sbo_tree(cls):
        """Class method to construct an SBO Tree from file or dict.

        updated sbo file can be downloaded from https://www.ebi.ac.uk/sbo/main/download
        and stored in the package under data/sbo.owl

        """
        here = os.path.dirname(os.path.abspath(__file__))
        sbo_owl_path = os.path.join(here, data_dir, sbo_owl_fname)

        if os.path.exists(sbo_owl_path):
            tree = xml.etree.ElementTree.parse(sbo_owl_path)
            root = tree.getroot()
            sbo_tree = {}
            for sbo_class in root.findall('owl:Class', ns):
                sbo_term_id = sbo_class.attrib['{' + ns['rdf'] + '}' + 'about']
                subclasses_of = []
                label = ''
                comment = ''
                for child in sbo_class:
                    if child.tag == '{' + ns['rdfs'] + '}' + 'subClassOf':
                        if ('{' + ns['rdf'] + '}' + 'resource') in child.attrib:
                            subclasses_of.append(child.attrib['{' + ns['rdf'] + '}' + 'resource'])
                        else:
                            for class_node in child:
                                if class_node.tag == '{' + ns['owl'] + '}' + 'Class':
                                    subclasses_of.append(class_node.attrib['{' + ns['rdf'] + '}' + 'about'])
                    if child.tag == '{' + ns['rdfs'] + '}' + 'label':
                        label = child.text
                    if child.tag == '{' + ns['rdfs'] + '}' + 'comment':
                        comment = child.text
                sbo_tree[sbo_term_id] = {'parents': subclasses_of, 'label': label, 'comment': comment}
        else:
            print('loading local sbo subtree')
            sbo_tree = local_sbo_tree

        # add children information, so we can parse up/downwards through the tree
        for sbo_term in sbo_tree:
            sbo_tree[sbo_term]['children'] = []
        for sbo_term in sbo_tree:
            for parent in sbo_tree[sbo_term]['parents']:
                if sbo_term not in sbo_tree[parent]['children']:
                    sbo_tree[parent]['children'].append(sbo_term)
        SboTerm.sbo_tree = sbo_tree
Original line number Original line Diff line number Diff line
@@ -3,6 +3,11 @@
Peter Schubert, HHU Duesseldorf, May 2022
Peter Schubert, HHU Duesseldorf, May 2022
"""
"""


import sbmlxdf

from xbanalysis.utils.utils import xml_rba_ns
from xbanalysis.model.sbo_term import SboTerm



class XbaCompartment:
class XbaCompartment:


@@ -15,3 +20,9 @@ class XbaCompartment:
            self.units = s_compartment['units']
            self.units = s_compartment['units']
        if 'spatialDimension' in s_compartment:
        if 'spatialDimension' in s_compartment:
            self.dimension = s_compartment['spatialDimension']
            self.dimension = s_compartment['spatialDimension']
        if 'xmlAnnotation' in s_compartment:
            attrs = sbmlxdf.extract_xml_attrs(s_compartment['xmlAnnotation'], ns=xml_rba_ns, token='density')
            if 'frac_gcdw' in attrs:
                self.rba_frac_gcdw = float(attrs['frac_gcdw'])
        if 'sboterm' in s_compartment:
            self.sboterm = SboTerm(s_compartment['sboterm'])
Original line number Original line Diff line number Diff line
@@ -41,7 +41,7 @@ class XbaModel:


        self.species = {sid: XbaSpecies(row)
        self.species = {sid: XbaSpecies(row)
                        for sid, row in model_dict['species'].iterrows()}
                        for sid, row in model_dict['species'].iterrows()}
        self.reactions = {rid: XbaReaction(row, self.functions, self.compartments)
        self.reactions = {rid: XbaReaction(row, self.species, self.functions, self.compartments)
                          for rid, row in model_dict['reactions'].iterrows()}
                          for rid, row in model_dict['reactions'].iterrows()}


        for r in self.reactions.values():
        for r in self.reactions.values():
Original line number Original line Diff line number Diff line
@@ -2,6 +2,7 @@


Peter Schubert, HHU Duesseldorf, May 2022
Peter Schubert, HHU Duesseldorf, May 2022
"""
"""
from xbanalysis.model.sbo_term import SboTerm




class XbaParameter:
class XbaParameter:
@@ -13,4 +14,4 @@ class XbaParameter:
        self.constant = s_parameter['constant']
        self.constant = s_parameter['constant']
        self.units = s_parameter['units']
        self.units = s_parameter['units']
        if 'sboterm' in s_parameter:
        if 'sboterm' in s_parameter:
            self.sboterm = s_parameter['sboterm']
            self.sboterm = SboTerm(s_parameter['sboterm'])
 No newline at end of file
Original line number Original line Diff line number Diff line
@@ -6,28 +6,31 @@ Peter Schubert, HHU Duesseldorf, May 2022
import re
import re
import sbmlxdf
import sbmlxdf
from xbanalysis.utils.utils import get_species_stoic, get_local_params, expand_kineticlaw
from xbanalysis.utils.utils import get_species_stoic, get_local_params, expand_kineticlaw
from xbanalysis.utils.utils import xml_rba_ns
from xbanalysis.model.sbo_term import SboTerm




class XbaReaction:
class XbaReaction:


    def __init__(self, s_reaction, functions, compartments):
    def __init__(self, s_reaction, species, functions, compartments):
        self.id = s_reaction.name
        self.id = s_reaction.name
        self.name = s_reaction.get('name', self.id)
        self.name = s_reaction.get('name', self.id)
        if 'sboterm' in s_reaction:
        if 'sboterm' in s_reaction:
            self.sboterm = s_reaction['sboterm']
            self.sboterm = SboTerm(s_reaction['sboterm'])
        self.reaction_string = s_reaction['reactionString']
        self.reaction_string = s_reaction['reactionString']
        self.reversible = s_reaction['reversible']
        self.reversible = s_reaction['reversible']
        self.products = get_species_stoic(s_reaction['products'])
        self.reactants = get_species_stoic(s_reaction['reactants'])
        self.reactants = get_species_stoic(s_reaction['reactants'])
        self.products = get_species_stoic(s_reaction['products'])
        self.metabs_only = True
        for sid in list(self.reactants) + list(self.products):
            if species[sid].sboterm.is_in_branch('SBO:0000247') is False:
                self.metabs_only = False
        if 'modifiers' in s_reaction:
        if 'modifiers' in s_reaction:
            self.modifiers = get_species_stoic(s_reaction['modifiers'])
            self.modifiers = get_species_stoic(s_reaction['modifiers'])
        else:
        else:
            self.modifiers = {}
            self.modifiers = {}
        if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction:
            self.fbc_lb = s_reaction['fbcLb']
            self.fbc_ub = s_reaction['fbcUb']
        self.local_params = get_local_params(s_reaction)
        if 'kineticLaw' in s_reaction:
        if 'kineticLaw' in s_reaction:
            self.local_params = get_local_params(s_reaction)
            self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else ''
            self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else ''
            # convert to numpy, replace local parameters with numerical values, inline lambda functions:
            # convert to numpy, replace local parameters with numerical values, inline lambda functions:
            expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law)
            expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law)
@@ -37,41 +40,56 @@ class XbaReaction:
            for cid in compartments.keys():
            for cid in compartments.keys():
                if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None:
                if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None:
                    self.compartment = cid
                    self.compartment = cid
        # retrieve optional flux balance bounds, ensuring consistent bounds
        if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction:
            self.fbc_lb = s_reaction['fbcLb']
            self.fbc_ub = max(self.fbc_lb, s_reaction['fbcUb'])
            if self.reversible is False:
                self.fbc_lb = max(0, self.fbc_lb)
                self.fbc_ub = max(0, self.fbc_ub)
        if 'xmlAnnotation' in s_reaction:
            attrs = sbmlxdf.extract_xml_attrs(s_reaction['xmlAnnotation'], ns=xml_rba_ns, token='kinetics')
            if 'kapp_f_per_s' in attrs:
                self.rba_kapp_f = float(attrs['kapp_f_per_s'])
            if 'kapp_r_per_s' in attrs:
                self.rba_kapp_r = float(attrs['kapp_r_per_s'])
            if 'Km_ext_M' in attrs:
                self.rba_km_ext = float(attrs['Km_ext_M'])
        self.ps_product = ''
        self.ps_product = ''
        self.enzyme = ''
        self.enzyme = None


    def set_enzyme(self, species):
    def set_enzyme(self, species):
        for sid in self.modifiers:
        for sid in self.modifiers:
            if species[sid].sboterm in ('SBO:0000252', 'SBO:0000250'):
            if species[sid].sboterm.is_in_branch('SBO:0000246'):
                self.enzyme = sid
                self.enzyme = sid
                break
                break


    def add_species_usage(self, species):
    def add_species_usage(self, species):
        # for metabolic reactions add to the species consumed/produced the metabolic reaction id
        # for metabolic reactions add to the species consumed/produced the metabolic reaction id
        # !!! we could also enter the Reaction Object references instead
        # !!! we could also enter the Reaction Object references instead
        if self.sboterm in ('SBO:0000167', 'SBO:0000179', 'SBO:0000182'):
        if self.sboterm.is_in_branch('SBO:0000167'):
            for sid, val in self.products.items():
            for sid, val in self.products.items():
                species[sid].add_m_reaction(self.id, val)
                species[sid].add_m_reaction(self.id, val)
            for sid, val in self.reactants.items():
            for sid, val in self.reactants.items():
                species[sid].add_m_reaction(self.id, -val)
                species[sid].add_m_reaction(self.id, -val)
        # protein degradation reactions
        # protein degradation reactions
        if self.sboterm == 'SBO:0000179':
        if self.sboterm.is_in_branch('SBO:0000179'):
            for sid, val in self.reactants.items():
            for sid, val in self.reactants.items():
                if species[sid].sboterm in ('SBO:0000252', 'SBO:0000250'):
                if species[sid].sboterm.is_in_branch('SBO:0000246'):
                    species[sid].add_p_degrad_rid(self.id, -val)
                    species[sid].add_p_degrad_rid(self.id, -val)
        # protein transitions
        # protein transitions
        if self.sboterm == 'SBO:0000182':
        if self.sboterm.is_in_branch('SBO:0000182'):
            for sid, val in self.reactants.items():
            for sid, val in self.reactants.items():
                if species[sid].sboterm in ('SBO:0000252', 'SBO:0000250'):
                if species[sid].sboterm.is_in_branch('SBO:0000246'):
                    species[sid].add_p_transition_rid(self.id, -val)
                    species[sid].add_p_transition_rid(self.id, -val)
            for sid, val in self.products.items():
            for sid, val in self.products.items():
                if species[sid].sboterm in ('SBO:0000252', 'SBO:0000250'):
                if species[sid].sboterm.is_in_branch('SBO:0000246'):
                    species[sid].add_p_transition_rid(self.id, val)
                    species[sid].add_p_transition_rid(self.id, val)


        # protein synthesis reactions
        # protein synthesis reactions
        if self.sboterm in ('SBO:0000589', 'SBO:0000205'):
        if self.sboterm.is_in_branch('SBO:0000205'):
            for sid, val in self.products.items():
            for sid, val in self.products.items():
                if species[sid].sboterm in ('SBO:0000252', 'SBO:0000250'):
                if species[sid].sboterm.is_in_branch('SBO:0000246'):
                    self.ps_product = sid
                    self.ps_product = sid
                    species[sid].set_ps_rid(self.id)
                    species[sid].set_ps_rid(self.id)
                else:
                else:
Original line number Original line Diff line number Diff line
@@ -4,10 +4,8 @@ Peter Schubert, HHU Duesseldorf, May 2022
"""
"""


import sbmlxdf
import sbmlxdf

from xbanalysis.utils.utils import xml_gba_ns, xml_rba_ns

from xbanalysis.model.sbo_term import SboTerm
xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns'
xml_token = 'molecule'




class XbaSpecies:
class XbaSpecies:
@@ -16,7 +14,7 @@ class XbaSpecies:
        self.id = s_species.name
        self.id = s_species.name
        self.name = s_species.get('name', self.id)
        self.name = s_species.get('name', self.id)
        # set sboterm to 000247 to support Flux Balance Analysis
        # set sboterm to 000247 to support Flux Balance Analysis
        self.sboterm = s_species.get('sboterm', 'SBO:0000247')
        self.sboterm = SboTerm(s_species.get('sboterm', 'SBO:0000247'))
        self.compartment = s_species['compartment']
        self.compartment = s_species['compartment']
        self.constant = s_species['constant']
        self.constant = s_species['constant']
        self.boundary = s_species['boundaryCondition']
        self.boundary = s_species['boundaryCondition']
@@ -25,8 +23,19 @@ class XbaSpecies:
        if 'units' in s_species:
        if 'units' in s_species:
            self.units = s_species['substanceUnits']
            self.units = s_species['substanceUnits']
        if 'xmlAnnotation' in s_species:
        if 'xmlAnnotation' in s_species:
            attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token=xml_token)
            attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token='molecule')
            if 'weight_Da' in attrs:
                self.mw = float(attrs['weight_Da'])
                self.mw = float(attrs['weight_Da'])
            attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_rba_ns, token='costs')
            if 'target_mmol_per_gcdw' in attrs:
                self.rba_target = float(attrs.pop('target_mmol_per_gcdw'))
            if 'process_machine' in attrs:
                self.rba_process_machine = attrs.pop('process_machine')
            if 'process_capacity_per_s' in attrs:
                self.rba_process_capacity = float(attrs.pop('process_capacity_per_s'))
            if len(attrs) > 0:
                self.rba_process_costs = attrs

        self.ps_rid = None
        self.ps_rid = None
        self.m_reactions = {}
        self.m_reactions = {}
        self.ps_reactions = {}
        self.ps_reactions = {}
Original line number Original line Diff line number Diff line
"""Subpackage with XBA model classes """
"""Subpackage with XBA model classes """


from .gba_problem import GbaProblem
from .gba_problem import GbaProblem
from .rba_problem import RbaProblem
from .fba_problem import FbaProblem
from .fba_problem import FbaProblem
from .lp_problem import LpProblem
from .lp_problem import LpProblem


__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem']
__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem', 'RbaProblem']
Original line number Original line Diff line number Diff line
@@ -35,29 +35,33 @@ class FbaProblem:
        :param xba_model: xba model with all required FBA parameters
        :param xba_model: xba model with all required FBA parameters
        :type xba_model: XbaModel
        :type xba_model: XbaModel
        """
        """
        self.is_fba_model = False
        self.xba_model = xba_model
        self.xba_model = xba_model


        self.metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247']
        # identify reaction excluding protein synthesis and protein degradation
        metabs = set(self.metab_sids)
        self.mr_rids = [r.id for r in xba_model.reactions.values()
        # identify reactions that do not use enzymes in reactants and products
                        if r.sboterm.is_in_branch('SBO:0000205') is False and
        self.mrids = [rid for rid, rxn in xba_model.reactions.items()
                        r.sboterm.is_in_branch('SBO:0000179') is False]
                      if len((set(rxn.products) | set(rxn.reactants)) - metabs) == 0]
        self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mr_rids)}
        self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mrids)}
        self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids])
        self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids])
        self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mrids)

        try:
            self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mrids],
                                         [xba_model.reactions[rid].fbc_ub for rid in self.mrids]])
        except AttributeError:
            print('Error: Model does not contain FBA flux bounds!')
            return


        if hasattr(xba_model, 'fbc_objectives') is False:
        sids = set()
            print('Error: Model does not contain FBA objectives!')
        for rid in self.mr_rids:
            for sid in xba_model.reactions[rid].reactants:
                sids.add(sid)
            for sid in xba_model.reactions[rid].products:
                sids.add(sid)
        self.sids = list(sids)

        self.s_mat = xba_model.get_stoic_matrix(self.sids, self.mr_rids)

        self.is_fba_model = self.check_fba_model(xba_model)
        if self.is_fba_model is False:
            print('Error: not a FBA model - missing parameters')
            return
            return
        self.obj_coefs = np.zeros(len(self.mrids))

        self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mr_rids],
                                     [xba_model.reactions[rid].fbc_ub for rid in self.mr_rids]])
        self.obj_coefs = None
        for oid, fbc_obj in xba_model.fbc_objectives.items():
        for oid, fbc_obj in xba_model.fbc_objectives.items():
            if fbc_obj.active is True:
            if fbc_obj.active is True:
                self.obj_id = fbc_obj.id
                self.obj_id = fbc_obj.id
@@ -65,21 +69,43 @@ class FbaProblem:
                self.set_obj_coefs(fbc_obj.coefficiants)
                self.set_obj_coefs(fbc_obj.coefficiants)
                break
                break


        if hasattr(self, 'obj_id') is False:
    def check_fba_model(self, xba_model):
            print('Error: Model does not contain an active FBA objective!')
        """Check if fba related data available in the XBA model
            return


        self.is_fba_model = True
        :param xba_model: Xba model with data extracted from SBML file
        :type xba_model: XbaModel
        :return: indicator if model contains data required for FBA problem formulation
        :rtype: bool
        """
        is_fba_model = True

        r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub'}
        for check, param in r_check.items():
            missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False]
            if len(missing) > 0:
                print(f'missing {check} in:', missing)
                is_fba_model = False

        if hasattr(xba_model, 'fbc_objectives') is False:
            print('missing flux objectives')
            is_fba_model = False
        else:
            active_objs = [oid for oid in xba_model.fbc_objectives if xba_model.fbc_objectives[oid].active]
            if len(active_objs) == 0:
                print('missing an active flux objective')
                is_fba_model = False

        return is_fba_model


    def combine_fwd_bwd(self, vector):
    def combine_fwd_bwd(self, vector):
        """combine forward and backward flux information.
        """combine forward and backward flux information.


        :param vector: contains data on all reactions + backward information for reversible reactions
        :param vector: contains data on all reactions + backward information for reversible reactions
        :type vector: 1D ndarray of shape (len(mrids) + sum(reversible))
        :type vector: 1D ndarray of shape (len(mr_rids) + sum(reversible))
        :return: combined flux information
        :return: combined flux information
        :rtype: 1D ndarray of shape (len(mrids))
        :rtype: 1D ndarray of shape (len(mr_rids))
        """
        """
        n_cols = len(self.mrids)
        n_cols = len(self.mr_rids)
        combined = vector[:n_cols]
        combined = vector[:n_cols]
        combined[self.reversible] -= vector[n_cols:]
        combined[self.reversible] -= vector[n_cols:]
        return combined
        return combined
@@ -97,14 +123,14 @@ class FbaProblem:
            return None
            return None


        lp = LpProblem()
        lp = LpProblem()
        # self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids])
        # self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids])
        fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible]))
        fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible]))
        fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
        fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
        fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0),
        fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0),
                                  np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0),
                                  np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0),
                                             np.fmax(-self.flux_bounds[0, self.reversible], 0)))))
                                             np.fmax(-self.flux_bounds[0, self.reversible], 0)))))
        row_bnds = np.zeros((2, fwd_bwd_s_mat.shape[0]))
        row_bnds = np.zeros((fwd_bwd_s_mat.shape[0], 2))
        lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bnds, fwd_bwd_s_mat, row_bnds, self.obj_dir)
        lp.configure(fwd_bwd_obj_coefs, fwd_bwd_s_mat, row_bnds, fwd_bwd_bnds, self.obj_dir)
        return lp
        return lp


    def fba_optimize(self):
    def fba_optimize(self):
@@ -115,7 +141,7 @@ class FbaProblem:
        """
        """
        lp = self.create_fba_lp()
        lp = self.create_fba_lp()
        if lp is None:
        if lp is None:
            return {'simplex_status': 1, 'simplex_message': 'not an FBA model'}
            return {'success': False, 'message': 'not an FBA model'}
        res = lp.solve()
        res = lp.solve()
        del lp
        del lp
        res['x'] = self.combine_fwd_bwd(res['x'])
        res['x'] = self.combine_fwd_bwd(res['x'])
@@ -133,9 +159,9 @@ class FbaProblem:
        """
        """
        lp = self.create_fba_lp()
        lp = self.create_fba_lp()
        if lp is None:
        if lp is None:
            return {'simplex_status': 1, 'simplex_message': 'not an FBA model'}
            return {'success': False, 'message': 'not an FBA model'}
        res = lp.solve(short_result=True)
        res = lp.solve(short_result=False)
        if res['simplex_status'] == 0:
        if res['success']:
            # fix FBA objective as constraint
            # fix FBA objective as constraint
            fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
            fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
            if self.obj_dir == 'maximize':
            if self.obj_dir == 'maximize':
@@ -144,7 +170,7 @@ class FbaProblem:
                row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
                row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
            lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
            lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
            # new LP objective is to minimize sum of fluxes
            # new LP objective is to minimize sum of fluxes
            n_cols = len(self.mrids)
            n_cols = len(self.mr_rids)
            lp.set_obj_coefs(np.ones(n_cols + sum(self.reversible)))
            lp.set_obj_coefs(np.ones(n_cols + sum(self.reversible)))
            lp.set_obj_dir('minimize')
            lp.set_obj_dir('minimize')
            res = lp.solve()
            res = lp.solve()
@@ -165,13 +191,13 @@ class FbaProblem:
        """
        """
        lp = self.create_fba_lp()
        lp = self.create_fba_lp()
        if lp is None:
        if lp is None:
            return {'simplex_status': 1, 'simplex_message': 'not an FBA model'}
            return {'success': False, 'message': 'not an FBA model'}
        res = lp.solve(short_result=True)
        res = lp.solve(short_result=True)


        if rids is None:
        if rids is None:
            rids = self.mrids
            rids = self.mr_rids
        flux_ranges = np.zeros((4, len(rids)))
        flux_ranges = np.zeros((4, len(rids)))
        if res['simplex_status'] == 0:
        if res['success']:
            # fix FBA objective as constraint
            # fix FBA objective as constraint
            fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
            fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
            if self.obj_dir == 'maximize':
            if self.obj_dir == 'maximize':
@@ -180,7 +206,7 @@ class FbaProblem:
                row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
                row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
            fba_constr_row = lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
            fba_constr_row = lp.add_constraint(fwd_bwd_obj_coefs, row_bds)


            n_cols = len(self.mrids)
            n_cols = len(self.mr_rids)
            for idx, rid in enumerate(rids):
            for idx, rid in enumerate(rids):
                # select reaction maximize/minimize flux
                # select reaction maximize/minimize flux
                new_obj_coefs = np.zeros(n_cols)
                new_obj_coefs = np.zeros(n_cols)
@@ -190,20 +216,20 @@ class FbaProblem:


                lp.set_obj_dir('minimize')
                lp.set_obj_dir('minimize')
                res = lp.solve(short_result=True)
                res = lp.solve(short_result=True)
                if res['simplex_status'] == 0:
                if res['success']:
                    flux_ranges[0, idx] = res['fun']
                    flux_ranges[0, idx] = res['fun']
                    flux_ranges[2, idx] = lp.get_row_value(fba_constr_row)
                    flux_ranges[2, idx] = lp.get_row_value(fba_constr_row)
                else:
                else:
                    print(f"FVA min failed for {rid}, {res['simplex_message']}")
                    print(f"FVA min failed for {rid}")
                    flux_ranges[0, idx] = np.nan
                    flux_ranges[0, idx] = np.nan


                lp.set_obj_dir('maximize')
                lp.set_obj_dir('maximize')
                res = lp.solve(short_result=True)
                res = lp.solve(short_result=True)
                if res['simplex_status'] == 0:
                if res['success']:
                    flux_ranges[1, idx] = res['fun']
                    flux_ranges[1, idx] = res['fun']
                    flux_ranges[3, idx] = lp.get_row_value(fba_constr_row)
                    flux_ranges[3, idx] = lp.get_row_value(fba_constr_row)
                else:
                else:
                    print(f"FVA max failed for {rid}, {res['simplex_message']}")
                    print(f"FVA max failed for {rid}")
                    flux_ranges[1, idx] = np.nan
                    flux_ranges[1, idx] = np.nan
        del lp
        del lp


@@ -228,7 +254,7 @@ class FbaProblem:
        :returns: flux bounds configured in the FBA problem
        :returns: flux bounds configured in the FBA problem
        :rtype: dict (key: reaction id, value: list with two float values for lower/upper bound)
        :rtype: dict (key: reaction id, value: list with two float values for lower/upper bound)
        """
        """
        return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mrids, self.flux_bounds.T)}
        return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mr_rids, self.flux_bounds.T)}


    def set_flux_bounds(self, flux_bounds):
    def set_flux_bounds(self, flux_bounds):
        """selectivly set lower and upper flux bounds for given reactions.
        """selectivly set lower and upper flux bounds for given reactions.
@@ -255,6 +281,6 @@ class FbaProblem:
        :param coefficiants: objective coefficients of selected reactions
        :param coefficiants: objective coefficients of selected reactions
        :type coefficiants: dict (key: reaction id, value: float)
        :type coefficiants: dict (key: reaction id, value: float)
        """
        """
        self.obj_coefs = np.zeros(len(self.mrids))
        self.obj_coefs = np.zeros(len(self.mr_rids))
        for rid, coef in coefficiants.items():
        for rid, coef in coefficiants.items():
            self.obj_coefs[self.mrid2idx[rid]] = coef
            self.obj_coefs[self.mrid2idx[rid]] = coef
Original line number Original line Diff line number Diff line
@@ -43,8 +43,8 @@ class GbaProblem:
        # self.cache = True
        # self.cache = True


        # metabolites and enzymes in the model
        # metabolites and enzymes in the model
        metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247']
        metab_sids = [s.id for s in xba_model.species.values() if s.sboterm.is_in_branch('SBO:0000247')]
        enz_sids = [s.id for s in xba_model.species.values() if s.sboterm in ('SBO:0000250', 'SBO:0000252')]
        enz_sids = [s.id for s in xba_model.species.values() if s.sboterm.is_in_branch('SBO:0000246')]


        # optimization variables
        # optimization variables
        self.var_metab_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is False]
        self.var_metab_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is False]
@@ -66,7 +66,7 @@ class GbaProblem:
        #  to facilitate matrix multiplication without further remapping
        #  to facilitate matrix multiplication without further remapping
        # - here we expect a one-to-one relationship between enzyme and corresponding protein synthesis reaction
        # - here we expect a one-to-one relationship between enzyme and corresponding protein synthesis reaction
        self.mr_rids = [r.id for r in xba_model.reactions.values()
        self.mr_rids = [r.id for r in xba_model.reactions.values()
                        if r.sboterm in ('SBO:0000167', 'SBO:0000179', 'SBO:0000182')]
                        if r.sboterm.is_in_branch('SBO:0000167')]
        self.ps_rids = [xba_model.species[sid].ps_rid for sid in self.var_enz_sids]
        self.ps_rids = [xba_model.species[sid].ps_rid for sid in self.var_enz_sids]
        self.mr_vols = np.array([xba_model.compartments[xba_model.reactions[rid].compartment].size
        self.mr_vols = np.array([xba_model.compartments[xba_model.reactions[rid].compartment].size
                                 for rid in self.mr_rids])
                                 for rid in self.mr_rids])
Original line number Original line Diff line number Diff line
@@ -69,7 +69,6 @@ class LpProblem:
        self.lp = glpk.glp_create_prob()
        self.lp = glpk.glp_create_prob()
        self.ncols = 0
        self.ncols = 0
        self.nrows = 0
        self.nrows = 0
        self.simplex_result = None
        self.res = {}
        self.res = {}


    @staticmethod
    @staticmethod
@@ -92,30 +91,30 @@ class LpProblem:
            else:
            else:
                ub = 0.0
                ub = 0.0
                var_type = glpk.GLP_FX
                var_type = glpk.GLP_FX
        elif not np.isfinite(lb):
        elif not np.isfinite(lb) and not np.isfinite(ub):
            lb = 0.0
            lb = 0.0
            var_type = glpk.GLP_UP
        elif not np.isfinite(ub):
            ub = 0.0
            ub = 0.0
            var_type = glpk.GLP_LO
            var_type = glpk.GLP_FR
        else:
        elif np.isfinite(ub):
            lb = 0.0
            lb = 0.0
            var_type = glpk.GLP_UP
        else:
            ub = 0.0
            ub = 0.0
            var_type = glpk.GLP_FR
            var_type = glpk.GLP_LO
        return var_type, lb, ub
        return var_type, lb, ub


    def configure(self, obj_coefs, col_bnds, constr_coefs, row_bnds, direction='maximize'):
    def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'):
        """Configure the LP Problem.
        """Configure the LP Problem.


        Note: glpk related indices start at 1 (not zero)
        Note: glpk related indices start at 1 (not zero)
        :param obj_coefs: coefficients of objective function
        :param obj_coefs: coefficients of objective function
        :type obj_coefs: 1D ndarray of shape (1, ncols)
        :type obj_coefs: 1D ndarray of shape (1, ncols)
        :param col_bnds: lower and upper bounds of structural (col) variables
        :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds)
        :param constr_coefs: matrix with constraint coefficients
        :param constr_coefs: matrix with constraint coefficients
        :type constr_coefs: 2D nparray with shape(nrows, ncols)
        :type constr_coefs: 2D nparray with shape(nrows, ncols)
        :param row_bnds: lower and upper bounds of auxhiliary (row) variables
        :param row_bnds: lower and upper bounds of auxhiliary (row) variables
        :type row_bnds: 2D ndarray of shape(2, nrows) (1st row: lower bounds, 2nd row: upper bounds)
        :type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds)
        :param col_bnds: lower and upper bounds of structural (col) variables
        :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds)
        :param direction: direction for the optimization ('maximize' or 'minimize')
        :param direction: direction for the optimization ('maximize' or 'minimize')
        :type direction: string
        :type direction: string
        """
        """
@@ -123,7 +122,7 @@ class LpProblem:


        assert len(obj_coefs) == self.ncols
        assert len(obj_coefs) == self.ncols
        assert col_bnds.shape == (2, self.ncols)
        assert col_bnds.shape == (2, self.ncols)
        assert row_bnds.shape == (2, self.nrows)
        assert row_bnds.shape == (self.nrows, 2)


        # configure objective function
        # configure objective function
        glpk.glp_add_cols(self.lp, self.ncols)
        glpk.glp_add_cols(self.lp, self.ncols)
@@ -137,7 +136,7 @@ class LpProblem:


        # configure constraints
        # configure constraints
        glpk.glp_add_rows(self.lp, self.nrows)
        glpk.glp_add_rows(self.lp, self.nrows)
        for i, lb_ub in enumerate(row_bnds.T):
        for i, lb_ub in enumerate(row_bnds):
            var_type, lb, ub = self._get_variable_type(lb_ub)
            var_type, lb, ub = self._get_variable_type(lb_ub)
            glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub)
            glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub)
        constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs)
        constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs)
@@ -251,11 +250,11 @@ class LpProblem:
            glpk.glp_scale_prob(self.lp, opt)
            glpk.glp_scale_prob(self.lp, opt)
        # sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))]
        # sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))]
        # rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))]
        # rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))]
        self.simplex_result = glpk.glp_simplex(self.lp, None)
        simplex_result = glpk.glp_simplex(self.lp, None)
        return self.results(short_result)
        return self.results(simplex_result, short_result)


    def get_row_value(self, row):
    def get_row_value(self, row):
        """Retrieve right hand side of a constraint (auxiliary variable)
        """Retrieve right-hand side of a constraint (auxiliary variable)


        :param row: index into constraints
        :param row: index into constraints
        :type row: int
        :type row: int
@@ -265,11 +264,13 @@ class LpProblem:
        assert row <= glpk.glp_get_num_rows(self.lp)
        assert row <= glpk.glp_get_num_rows(self.lp)
        return glpk.glp_get_row_prim(self.lp, row)
        return glpk.glp_get_row_prim(self.lp, row)


    def results(self, short_result=False):
    def results(self, simplex_result, short_result=False):
        """Collect results for the optimization
        """Collect results for the optimization


        Alternatively, reduced results, e.g. when doing pFBA
        Alternatively, reduced results, e.g. when doing pFBA


        :param simplex_result: result from the LP solver
        :type simplex_result: int
        :param short_result: short result (only objective value and status) or long result
        :param short_result: short result (only objective value and status) or long result
        :type short_result: bool (default:False)
        :type short_result: bool (default:False)
        :return: result from the FBA optimization
        :return: result from the FBA optimization
@@ -277,13 +278,14 @@ class LpProblem:
        """
        """
        self.res = {
        self.res = {
            'fun': glpk.glp_get_obj_val(self.lp),
            'fun': glpk.glp_get_obj_val(self.lp),
            'simplex_status': self.simplex_result,
            'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT,
            'simplex_message': simplex_status[self.simplex_result],
        }
        }
        if short_result is False:
        if short_result is False:
            self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)])
            self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)])
            self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)])
            self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)])
            self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)])
            self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)])
            self.res['simplex_result'] = simplex_result
            self.res['simplex_message'] = simplex_status[simplex_result],
            self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)]
            self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)]
            self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)]
            self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)]
            self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)]
            self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)]
+278 −0
Original line number Original line Diff line number Diff line
"""Implementation of GBA Problem class.

   - several process machines supported (yet to be validated)
   - non-enzymatic metabolic reactions supported (yet to be validated)

   Yet to implement:
   - enzyme degradation
   - enzyme transitions
   - several compartments constraints
   - target concentrations and target fluxes

Peter Schubert, HHU Duesseldorf, June 2022
"""

import numpy as np

from xbanalysis.problems.lp_problem import LpProblem


class RbaProblem:

    def __init__(self, xba_model):
        """

        :param xba_model:
        """
        # metabolic reactions and enzyme transitions (no FBA reactions, no protein synthesis, no degradation)
        self.mr_rids = [r.id for r in xba_model.reactions.values() if r.sboterm.is_in_branch('SBO:0000167')
                        and r.sboterm.is_in_branch('SBO:0000179') is False]
        self.mrid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.mr_rids}
        self.enz_sids = [enz_sid for enz_sid in self.mrid2enz.values() if enz_sid is not None]
        self.rev_enz_sids = [enz_sid for mr_rid, enz_sid in self.mrid2enz.items()
                             if enz_sid is not None and xba_model.reactions[mr_rid].reversible]
        self.processes = {s.rba_process_machine: sid for sid, s in xba_model.species.items()
                          if hasattr(s, 'rba_process_machine')}
        self.psenz_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids]
        self.pspm_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()]
        self.densities = {cid: c.rba_frac_gcdw for cid, c in xba_model.compartments.items()
                          if hasattr(c, 'rba_frac_gcdw')}

        # metabolites and enzymes used in metabolic reactions and enzyme transitions
        sids = set()
        for rid in self.mr_rids:
            for sid in xba_model.reactions[rid].reactants:
                sids.add(sid)
            for sid in xba_model.reactions[rid].products:
                sids.add(sid)
        used_sids = list(sids)
        self.sids = [sid for sid in used_sids if xba_model.species[sid].constant is False]
        self.ext_conc = {sid: xba_model.species[sid].initial_conc
                         for sid in used_sids if xba_model.species[sid].constant is True}

        self.rba_cols = self.mr_rids + self.enz_sids + list(self.processes)
        self.rba_rows = (self.sids + list(self.processes)
                         + [sid + '_feff' for sid in self.enz_sids]
                         + [sid + '_reff' for sid in self.rev_enz_sids]
                         + list(self.densities))

        self.is_rba_model = self.check_rba_model(xba_model)
        if self.is_rba_model is False:
            print('Error: not a GBA model - missing parameters')
            return

        fix_m, var_m, row_bnds_m = self.construct_mass_balance(xba_model)
        fix_pc, var_pc, row_bnds_pc = self.construct_process_capacities(xba_model)
        fix_eff, var_eff, row_bnds_eff = self.construct_efficencies(xba_model)
        fix_cd, var_cd, row_bnds_cd = self.construct_densities(xba_model)
        self.fix_cd = fix_cd
        self.fix_mat = np.vstack((fix_m, fix_pc, fix_eff, fix_cd))
        self.var_mat = np.vstack((var_m, var_pc, var_eff, var_cd))
        self.fix_row_bnds = np.vstack((np.zeros_like(row_bnds_m), np.zeros_like(row_bnds_pc),
                                       row_bnds_eff, row_bnds_cd))
        self.var_row_bnds = np.vstack((row_bnds_m, row_bnds_pc,
                                       np.zeros_like(row_bnds_eff), np.zeros_like(row_bnds_cd)))

        # in RBA we only check feasibility.
        # Here we maximize enzyme amount and check that objective value e.g. > 1e-10 mmol/gcdw
        self.obj_dir = 'maximize'
        self.obj_coefs = np.hstack((np.zeros(len(self.mr_rids)),
                                    np.ones(len(self.enz_sids) + len(self.processes))))
        col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.mr_rids],
                             np.zeros(len(self.enz_sids) + len(self.processes))))
        col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.mr_rids],
                             np.nan * np.ones(len(self.enz_sids) + len(self.processes))))
        self.col_bnds = np.vstack((col_lbs, col_ubs))
        self.scaling = '2N'
        self.max_iter = 20

    def check_rba_model(self, xba_model):
        """Check if rba related data available in the XBA model

        :param xba_model: Xba model with data extracted from SBML file
        :type xba_model: XbaModel
        :return: indicator if model contains data required for RBA problem formulation
        :rtype: bool
        """
        is_rba_model = True

        r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub', 'kapp forward': 'rba_kapp_f'}
        for check, param in r_check.items():
            missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False]
            if len(missing) > 0:
                print(f'missing {check} in:', missing)
                is_rba_model = False

        missing = [rid for rid in self.mr_rids if xba_model.reactions[rid].reversible
                   and hasattr(xba_model.reactions[rid], 'rba_kapp_r') is False]
        if len(missing) > 0:
            print('missing kapp reverse in:', missing)
            is_rba_model = False

        missing = [sid for sid in self.sids + self.enz_sids + list(self.processes.values())
                   if hasattr(xba_model.species[sid], 'mw') is False]
        if len(missing) > 0:
            print('missing molecular weight in:', missing)
            is_rba_model = False

        missing = [sid for sid in list(self.processes.values())
                   if hasattr(xba_model.species[sid], 'rba_process_capacity') is False]
        if len(missing) > 0:
            print('missing process capacity in:', missing)
            is_rba_model = False

        if len(self.densities) == 0:
            print('missing a density constraint')
            is_rba_model = False
        return is_rba_model

    def rba_optimize(self):
        """

        :return:
        """
        if self.is_rba_model is False:
            print('Error: not a RBA model - missing parameters')
            res = {'success': False, 'fun': 0.0, 'message': 'not an RBA model'}
            return res

        res = {}
        # Preanalysis: find range of growth rates
        gr = {'lb': 0.0, 'ub': np.nan, 'try': 1.0}
        for i in range(self.max_iter):
            constr_coefs = self.fix_mat + gr['try'] * self.var_mat
            # row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds
            lp = LpProblem()
            lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir)
            res = lp.solve(scaling=self.scaling)
            del lp
            if res['success'] and res['fun'] > 1e-10:
                gr['lb'] = gr['try']
                gr['try'] *= 10
            else:
                if gr['try'] <= 1e-4:
                    break
                gr['ub'] = gr['try']
                gr['try'] /= 10
            if gr['lb'] > 0.0 and np.isfinite(gr['ub']):
                break

        if gr['lb'] == 0.0 or np.isnan(gr['ub']):
            res = {'success': False, 'fun': 0.0, 'message': 'no solution found'}
            return res

        # find exact growth rate
        for i in range(self.max_iter):
            gr['try'] = 0.5 * (gr['lb'] + gr['ub'])
            constr_coefs = self.fix_mat + gr['try'] * self.var_mat
            # row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds
            lp = LpProblem()
            lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir)
            res = lp.solve(scaling=self.scaling)
            del lp
            if res['success'] and res['fun'] > 1e-10:
                gr['lb'] = gr['try']
            else:
                gr['ub'] = gr['try']
            if (gr['ub'] - gr['lb']) < gr['try'] * 1e-5:
                break
        res['fun'] = gr['try']
        return res

    def construct_mass_balance(self, xba_model):

        fix_m_x_mr = xba_model.get_stoic_matrix(self.sids, self.mr_rids)
        var_m_x_enz = xba_model.get_stoic_matrix(self.sids, self.psenz_rids)
        var_m_x_pm = xba_model.get_stoic_matrix(self.sids, self.pspm_rids)
        fix_m = np.hstack((fix_m_x_mr,
                           np.zeros((len(self.sids), len(self.enz_sids)+len(self.processes)))))
        var_m = np.hstack((np.zeros((len(self.sids), len(self.mr_rids))),
                           var_m_x_enz,
                           var_m_x_pm))
        targets = np.array([getattr(xba_model.species[sid], 'rba_target', 0.0) for sid in self.sids])
        row_bnds_m = np.vstack((targets, targets)).T
        return fix_m, var_m, row_bnds_m

    def construct_process_capacities(self, xba_model):

        var_pc_x_enz = np.zeros((len(self.processes), len(self.psenz_rids)))
        for idx, pm in enumerate(self.processes):
            var_pc_x_enz[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0)
                                 for enz_sid in self.enz_sids]
        var_pc_x_pm = np.zeros((len(self.processes), len(self.processes)))
        for idx, pm in enumerate(self.processes):
            var_pc_x_pm[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0)
                                for enz_sid in self.processes.values()]
        fix_pc_x_pm = -np.diag([xba_model.species[enz_sid].rba_process_capacity
                                for enz_sid in self.processes.values()]) * 3600
        fix_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids) + len(self.enz_sids))),
                            fix_pc_x_pm))
        var_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids))),
                            var_pc_x_enz, var_pc_x_pm))
        row_bnds_pc = np.zeros((len(self.processes), 2))
        return fix_pc, var_pc, row_bnds_pc

    def get_efficiencies(self, xba_model):

        efficiencies = np.zeros((2, len(self.enz_sids)))
        idx = 0
        for mr_rid, enz_sid in self.mrid2enz.items():
            if enz_sid is not None:
                r = xba_model.reactions[mr_rid]
                saturation = 1.0
                if hasattr(r, 'rba_km_ext'):
                    for sid in r.reactants:
                        if sid in self.ext_conc:
                            saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext)
                efficiencies[0, idx] = r.rba_kapp_f * 3600 * saturation
                saturation = 1.0
                if r.reversible:
                    if hasattr(r, 'rba_km_ext'):
                        for sid in r.products:
                            if sid in self.ext_conc:
                                saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext)
                    efficiencies[1, idx] = r.rba_kapp_r * 3600 * saturation
                idx += 1
        return efficiencies

    def construct_efficencies(self, xba_model):

        efficiencies = self.get_efficiencies(xba_model)
        mask = np.array([enz_sid is not None for enz_sid in self.mrid2enz.values()])
        fix_feff_x_mr = np.diag(np.ones(len(self.mrid2enz)))[mask]
        fix_feff_x_enz = np.diag(-efficiencies[0])
        fix_feff_x_pm = np.zeros((len(self.enz_sids), len(self.processes)))
        fix_feff = np.hstack((fix_feff_x_mr, fix_feff_x_enz, fix_feff_x_pm))
        row_bnds_feff = np.vstack((np.ones(len(self.enz_sids)) * np.nan, np.zeros(len(self.enz_sids)))).T

        mask = [enz_sid is not None and xba_model.reactions[mr_rid].reversible
                for mr_rid, enz_sid in self.mrid2enz.items()]
        fix_reff_x_mr = np.diag(-np.ones(len(self.mrid2enz)))[mask]
        mask = [enz_sid in self.rev_enz_sids for enz_sid in self.enz_sids]
        fix_reff_x_enz = np.diag(-efficiencies[1])[mask]
        fix_reff_x_pm = np.zeros((len(self.rev_enz_sids), len(self.processes)))
        fix_reff = np.hstack((fix_reff_x_mr, fix_reff_x_enz, fix_reff_x_pm))
        row_bnds_reff = np.vstack((np.ones(len(self.rev_enz_sids)) * np.nan, np.zeros(len(self.rev_enz_sids)))).T
        fix_eff = np.vstack((fix_feff, fix_reff))
        var_eff = np.zeros_like(fix_eff)
        row_bnds_eff = np.vstack((row_bnds_feff, row_bnds_reff))
        return fix_eff, var_eff, row_bnds_eff

    def construct_densities(self, xba_model):
        fix_cd_x_mr = np.zeros((len(self.densities), len(self.mr_rids)))
        fix_cd_x_enz = np.zeros((len(self.densities), len(self.enz_sids)))
        for row_idx, cid in enumerate(self.densities):
            for col_idx, sid in enumerate(self.enz_sids):
                if xba_model.species[sid].compartment == cid:
                    fix_cd_x_enz[row_idx, col_idx] = xba_model.species[sid].mw / 1000
        fix_cd_x_pm = np.zeros((len(self.densities), len(self.processes)))
        for row_idx, cid in enumerate(self.densities):
            for col_idx, sid in enumerate(self.processes.values()):
                if xba_model.species[sid].compartment == cid:
                    fix_cd_x_pm[row_idx, col_idx] = xba_model.species[sid].mw / 1000
        fix_cd = np.hstack((fix_cd_x_mr, fix_cd_x_enz, fix_cd_x_pm))

        var_cd = np.zeros_like(fix_cd)
        row_bnds_cd = np.vstack((np.zeros(len(self.densities)),
                                 [val for val in self.densities.values()])).T
        return fix_cd, var_cd, row_bnds_cd
Original line number Original line Diff line number Diff line
@@ -6,6 +6,9 @@ Peter Schubert, HHU Duesseldorf, May 2022
import sys
import sys
import re
import re


xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns'
xml_rba_ns = 'http://www.hhu.de/ccb/rba/ns'



def get_sbml_ids(df, sbo_id):
def get_sbml_ids(df, sbo_id):
    """Get SBML IDs with specified sboterm-id.
    """Get SBML IDs with specified sboterm-id.