From 1dc4ede45a0876479e223883e99089dab08d2e1e Mon Sep 17 00:00:00 2001
From: Peter Schubert <Peter.Schubert@hhu.de>
Date: Fri, 1 Jul 2022 15:10:12 +0200
Subject: [PATCH] implemented pFBA and FVA

---
 xbanalysis/model/xba_compartment.py |  12 ++-
 xbanalysis/model/xba_function.py    |   2 +-
 xbanalysis/model/xba_model.py       |   3 +
 xbanalysis/model/xba_parameter.py   |   5 +-
 xbanalysis/model/xba_reaction.py    |  34 ++++----
 xbanalysis/model/xba_species.py     |  16 ++--
 xbanalysis/problems/fba_problem.py  | 119 ++++++++++++++++++++++------
 xbanalysis/problems/lp_problem.py   |   7 ++
 xbanalysis/utils/utils.py           |   3 +-
 9 files changed, 147 insertions(+), 54 deletions(-)

diff --git a/xbanalysis/model/xba_compartment.py b/xbanalysis/model/xba_compartment.py
index 752443d..c7841ce 100644
--- a/xbanalysis/model/xba_compartment.py
+++ b/xbanalysis/model/xba_compartment.py
@@ -3,11 +3,15 @@
 Peter Schubert, HHU Duesseldorf, May 2022
 """
 
+
 class XbaCompartment:
 
     def __init__(self, s_compartment):
         self.id = s_compartment.name
-        self.name = s_compartment.get('name', '')
-        self.size = s_compartment['size']
-        self.units = s_compartment['units']
-        self.dimension = s_compartment['spatialDimension']
\ No newline at end of file
+        self.name = s_compartment.get('name', self.id)
+        if 'size' in s_compartment:
+            self.size = s_compartment['size']
+        if 'units' in s_compartment:
+            self.units = s_compartment['units']
+        if 'spatialDimension' in s_compartment:
+            self.dimension = s_compartment['spatialDimension']
\ No newline at end of file
diff --git a/xbanalysis/model/xba_function.py b/xbanalysis/model/xba_function.py
index 923b9a0..950bea1 100644
--- a/xbanalysis/model/xba_function.py
+++ b/xbanalysis/model/xba_function.py
@@ -11,7 +11,7 @@ class XbaFunction:
 
     def __init__(self, s_function):
         self.id = s_function.name
-        self.name = s_function.get('name', '')
+        self.name = s_function.get('name', self.id)
         self.math = s_function['math']
 
         m = re.search(r'\blambda\((.*)\)', sbmlxdf.misc.mathml2numpy(s_function['math']))
diff --git a/xbanalysis/model/xba_model.py b/xbanalysis/model/xba_model.py
index b4f45b2..87d0201 100644
--- a/xbanalysis/model/xba_model.py
+++ b/xbanalysis/model/xba_model.py
@@ -36,6 +36,9 @@ class XbaModel:
         if 'funcDefs' in model_dict:
             self.functions = {fid: XbaFunction(row)
                               for fid, row in model_dict['funcDefs'].iterrows()}
+        else:
+            self.functions = {}
+
         self.species = {sid: XbaSpecies(row)
                         for sid, row in model_dict['species'].iterrows()}
         self.reactions = {rid: XbaReaction(row, self.functions, self.compartments)
diff --git a/xbanalysis/model/xba_parameter.py b/xbanalysis/model/xba_parameter.py
index 812301c..4b4ac06 100644
--- a/xbanalysis/model/xba_parameter.py
+++ b/xbanalysis/model/xba_parameter.py
@@ -8,8 +8,9 @@ class XbaParameter:
 
     def __init__(self, s_parameter):
         self.id = s_parameter.name
-        self.name = s_parameter.get('name', '')
-        self.sboterm = s_parameter.get('sboterm', '')
+        self.name = s_parameter.get('name', self.id)
         self.value = s_parameter['value']
         self.constant = s_parameter['constant']
         self.units = s_parameter['units']
+        if 'sboterm' in s_parameter:
+            self.sboterm = s_parameter['sboterm']
\ No newline at end of file
diff --git a/xbanalysis/model/xba_reaction.py b/xbanalysis/model/xba_reaction.py
index 517a1bf..0627e81 100644
--- a/xbanalysis/model/xba_reaction.py
+++ b/xbanalysis/model/xba_reaction.py
@@ -12,25 +12,31 @@ class XbaReaction:
 
     def __init__(self, s_reaction, functions, compartments):
         self.id = s_reaction.name
-        self.name = s_reaction.get('name', '')
-        self.sboterm = s_reaction['sboterm']
+        self.name = s_reaction.get('name', self.id)
+        if 'sboterm' in s_reaction:
+            self.sboterm = s_reaction['sboterm']
         self.reaction_string = s_reaction['reactionString']
         self.reversible = s_reaction['reversible']
         self.products = get_species_stoic(s_reaction['products'])
         self.reactants = get_species_stoic(s_reaction['reactants'])
-        self.modifiers = get_species_stoic(s_reaction['modifiers'])
-        self.fbc_lb = s_reaction.get('fbcLb', float('nan'))
-        self.fbc_ub = s_reaction.get('fbcUb', float('nan'))
+        if 'modifiers' in s_reaction:
+            self.modifiers = get_species_stoic(s_reaction['modifiers'])
+        else:
+            self.modifiers = {}
+        if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction:
+            self.fbc_lb = s_reaction['fbcLb']
+            self.fbc_ub = s_reaction['fbcUb']
         self.local_params = get_local_params(s_reaction)
-        self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else ''
-        # convert to numpy, replace local parameters with numerical values, inline lambda functions:
-        expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law)
-        for pid, val in self.local_params.items():
-            expanded_kl = re.sub(r'\b' + pid + r'\b', str(val), expanded_kl)
-        self.expanded_kl = expand_kineticlaw(expanded_kl, functions)
-        for cid in compartments.keys():
-            if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None:
-                self.compartment = cid
+        if 'kineticLaw' in s_reaction:
+            self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else ''
+            # convert to numpy, replace local parameters with numerical values, inline lambda functions:
+            expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law)
+            for pid, val in self.local_params.items():
+                expanded_kl = re.sub(r'\b' + pid + r'\b', str(val), expanded_kl)
+            self.expanded_kl = expand_kineticlaw(expanded_kl, functions)
+            for cid in compartments.keys():
+                if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None:
+                    self.compartment = cid
         self.ps_product = ''
         self.enzyme = ''
 
diff --git a/xbanalysis/model/xba_species.py b/xbanalysis/model/xba_species.py
index 806fd08..6d0bdbc 100644
--- a/xbanalysis/model/xba_species.py
+++ b/xbanalysis/model/xba_species.py
@@ -14,15 +14,19 @@ class XbaSpecies:
 
     def __init__(self, s_species):
         self.id = s_species.name
-        self.name = s_species.get('name', '')
-        self.sboterm = s_species['sboterm']
+        self.name = s_species.get('name', self.id)
+        # set sboterm to 000247 to support Flux Balance Analysis
+        self.sboterm = s_species.get('sboterm', 'SBO:0000247')
         self.compartment = s_species['compartment']
-        self.initial_conc = s_species['initialConcentration']
         self.constant = s_species['constant']
         self.boundary = s_species['boundaryCondition']
-        self.units = s_species['substanceUnits']
-        attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token=xml_token)
-        self.mw = float(attrs['weight_Da'])
+        if 'initialConcentration' in s_species:
+            self.initial_conc = s_species['initialConcentration']
+        if 'units' in s_species:
+            self.units = s_species['substanceUnits']
+        if 'xmlAnnotation' in s_species:
+            attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token=xml_token)
+            self.mw = float(attrs['weight_Da'])
         self.ps_rid = None
         self.m_reactions = {}
         self.ps_reactions = {}
diff --git a/xbanalysis/problems/fba_problem.py b/xbanalysis/problems/fba_problem.py
index ffa4f60..6d79492 100644
--- a/xbanalysis/problems/fba_problem.py
+++ b/xbanalysis/problems/fba_problem.py
@@ -4,6 +4,7 @@ Peter Schubert, HHU Duesseldorf, June 2022
 """
 
 import numpy as np
+import pandas as pd
 
 from xbanalysis.problems.lp_problem import LpProblem
 
@@ -39,6 +40,7 @@ class FbaProblem:
 
         self.metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247']
         metabs = set(self.metab_sids)
+        # identify reactions that do not use enzymes in reactants and products
         self.mrids = [rid for rid, rxn in xba_model.reactions.items()
                       if len((set(rxn.products) | set(rxn.reactants)) - metabs) == 0]
         self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mrids)}
@@ -56,58 +58,125 @@ class FbaProblem:
                 self.set_obj_coefs(fbc_obj.coefficiants)
                 break
 
+    def create_fba_lp(self):
+        """create Linear Programming Problem for FBA with split reactions
+
+        Splitting reactions in forward/backward directions reduces high values during FBA
+        Splitting required for pFBA (minimize absolute flux values)
+
+        :return: LpProblem configured for FBA
+        :rtype: LpProblem
+        """
+        lp = LpProblem()
+        # split forward and backward reactions
+        fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat))
+        fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs))
+        fwd_bwd_bounds = np.hstack((np.fmax(self.flux_bounds, 0),
+                                    np.vstack((np.fmax(-self.flux_bounds[1], 0),
+                                               np.fmax(-self.flux_bounds[0], 0)))))
+        lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bounds,
+                     fwd_bwd_s_mat, np.zeros((2, fwd_bwd_s_mat.shape[0])), self.obj_dir)
+        return lp
+
     def fba_optimize(self):
         """FBA optimization of current problem
 
-        Instantiate an LpProblem, configure it and delete it again.
-
         :return: results information from the optimization
         :rtype: dict
         """
-        lp = LpProblem()
-        lp.configure(self.obj_coefs, self.flux_bounds,
-                     self.s_mat, np.zeros((2, self.s_mat.shape[0])), self.obj_dir)
+        lp = self.create_fba_lp()
         res = lp.solve()
         del lp
+
+        # combine results for split fluxes
+        n_cols = len(self.mrids)
+        res['x'] = res['x'][:n_cols] - res['x'][n_cols:]
+        res['reduced_costs'] = res['reduced_costs'][:n_cols] - res['reduced_costs'][n_cols:]
         return res
 
     def pfba_optimize(self, fract_of_optimum=1.0):
         """ pfba analysis
 
-        As we are interested in minimizing the absolute value of fluxes,
-        we split fluxes in forward and reverse fluxes to make them all positive
+        Reactions need to be split in forward/backard (to minimize absolute flux value)
 
         :param fract_of_optimum: value between 0.0 and 1.0.
         :type fract_of_optimum: float (default 1.0)
         :return:
         """
-        # create and solve the FBA problem with the configured objective function
-        # for pFBA we need to split fluxes in forward and backward fluxes, with bounds set as such
-        #  that fluxes can only be positive, lb>=0, ub>=0
-
-        n_cols = len(self.mrids)
-        lp = LpProblem()
-
-        fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat))
-        fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs))
-        fwd_bwd_bounds = np.hstack((np.fmax(self.flux_bounds, 0),
-                                    np.vstack((np.fmax(-self.flux_bounds[1], 0),
-                                               np.fmax(-self.flux_bounds[0], 0)))))
-        lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bounds,
-                     fwd_bwd_s_mat, np.zeros((2, fwd_bwd_s_mat.shape[0])), self.obj_dir)
+        lp = self.create_fba_lp()
         res = lp.solve(short_result=True)
-        # fix FBA objective as constraint with lower bound and minimize total fluxes
         if res['simplex_status'] == 0:
-            row_bds = np.array([fract_of_optimum * res['fun'], np.nan])
+            # fix FBA objective as constraint
+            fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs))
+            if self.obj_dir == 'maximize':
+                row_bds = np.array([res['fun'] * fract_of_optimum, np.nan])
+            else:
+                row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
             lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
+            # new LP objective is to minimize sum of fluxes
+            n_cols = len(self.mrids)
             lp.set_obj_coefs(np.ones(2 * n_cols))
             lp.set_obj_dir('minimize')
             res = lp.solve()
+            # combine results for split fluxes
+            res['x'] = res['x'][:n_cols] - res['x'][n_cols:]
+            res['reduced_costs'] = res['reduced_costs'][:n_cols] - res['reduced_costs'][n_cols:]
         del lp
-        res['x'] = res['x'][:n_cols] - res['x'][n_cols:]
-        res['reduced_costs'] = res['reduced_costs'][:n_cols] - res['reduced_costs'][n_cols:]
         return res
 
+    def fva_optimize(self, rids=None, fract_of_optimum=1.0):
+        """FVA - flux variability analysis for all or selected reactions.
+
+        :param rids: selected reaction ids to run FVA (alternatively, all)
+        :type rids: list of strings (default: None - all reactions)
+        :param fract_of_optimum: value between 0.0 and 1.0.
+        :type fract_of_optimum: float (default 1.0)
+        :return: Flux ranges and objective values
+        :rtype: pandas DataFrame
+        """
+        lp = self.create_fba_lp()
+        res = lp.solve(short_result=True)
+
+        if rids is None:
+            rids = self.mrids
+        flux_ranges = np.zeros((len(rids), 4))
+        if res['simplex_status'] == 0:
+            # fix FBA objective as constraint
+            fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs))
+            if self.obj_dir == 'maximize':
+                row_bds = np.array([res['fun'] * fract_of_optimum, np.nan])
+            else:
+                row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
+            fba_constr_row = lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
+
+            n_cols = len(self.mrids)
+            for idx, rid in enumerate(rids):
+                # select reaction maximize/minimize flux
+                obj_coefs = np.zeros(2 * n_cols)
+                obj_coefs[self.mrid2idx[rid]] = 1
+                obj_coefs[self.mrid2idx[rid] + n_cols] = -1
+                lp.set_obj_coefs(obj_coefs)
+
+                lp.set_obj_dir('minimize')
+                res = lp.solve(short_result=True)
+                if res['simplex_status'] == 0:
+                    flux_ranges[idx, 0] = res['fun']
+                    flux_ranges[idx, 2] = lp.get_row_value(fba_constr_row)
+                else:
+                    print(f"FVA min failed for {rid}, {res['simplex_message']}")
+                    flux_ranges[idx, 0] = np.nan
+
+                lp.set_obj_dir('maximize')
+                res = lp.solve(short_result=True)
+                if res['simplex_status'] == 0:
+                    flux_ranges[idx, 1] = res['fun']
+                    flux_ranges[idx, 3] = lp.get_row_value(fba_constr_row)
+                else:
+                    print(f"FVA max failed for {rid}, {res['simplex_message']}")
+                    flux_ranges[idx, 1] = np.nan
+        del lp
+        return pd.DataFrame(flux_ranges, index=rids, columns=['min', 'max', 'obj_min', 'obj_max'])
+
     @property
     def obj_dir(self):
         return self.__obj_dir
diff --git a/xbanalysis/problems/lp_problem.py b/xbanalysis/problems/lp_problem.py
index 90b6dea..fcfad11 100644
--- a/xbanalysis/problems/lp_problem.py
+++ b/xbanalysis/problems/lp_problem.py
@@ -255,6 +255,13 @@ class LpProblem:
         return self.results(short_result)
 
     def get_row_value(self, row):
+        """Retrieve right hand side of a constraint (auxiliary variable)
+
+        :param row: index into constraints
+        :type row: int
+        :return: value of auxiliary varibale (value of rhs)
+        :rtype: float
+        """
         assert row <= glpk.glp_get_num_rows(self.lp)
         return glpk.glp_get_row_prim(self.lp, row)
 
diff --git a/xbanalysis/utils/utils.py b/xbanalysis/utils/utils.py
index 7a0d0b1..2e3f5b8 100644
--- a/xbanalysis/utils/utils.py
+++ b/xbanalysis/utils/utils.py
@@ -90,10 +90,9 @@ def get_local_params(reaction):
         for lp in record_generator(reaction['localParams']):
             params = extract_params(lp)
             local_params[params['id']] = float(params['value'])
-        return local_params
+    return local_params
 
 
-# added
 def get_species_stoic(srefs):
     """Retrieve species and stoichiometry from reaction srefs.
 
-- 
GitLab