Skip to content
Snippets Groups Projects
Commit 73bac7db authored by Peter Schubert's avatar Peter Schubert
Browse files

implementation of RBA (resource balance analysis)

parent 45ab9851
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,10 @@
Peter Schubert, HHU Duesseldorf, May 2022
"""
import sbmlxdf
from xbanalysis.utils.utils import xml_rba_ns
class XbaCompartment:
......@@ -15,3 +19,7 @@ class XbaCompartment:
self.units = s_compartment['units']
if 'spatialDimension' in s_compartment:
self.dimension = s_compartment['spatialDimension']
if 'xmlAnnotation' in s_compartment:
attrs = sbmlxdf.extract_xml_attrs(s_compartment['xmlAnnotation'], ns=xml_rba_ns, token='density')
if 'frac_gcdw' in attrs:
self.rba_frac_gcdw = float(attrs['frac_gcdw'])
......@@ -41,7 +41,7 @@ class XbaModel:
self.species = {sid: XbaSpecies(row)
for sid, row in model_dict['species'].iterrows()}
self.reactions = {rid: XbaReaction(row, self.functions, self.compartments)
self.reactions = {rid: XbaReaction(row, self.species, self.functions, self.compartments)
for rid, row in model_dict['reactions'].iterrows()}
for r in self.reactions.values():
......
......@@ -6,28 +6,30 @@ Peter Schubert, HHU Duesseldorf, May 2022
import re
import sbmlxdf
from xbanalysis.utils.utils import get_species_stoic, get_local_params, expand_kineticlaw
from xbanalysis.utils.utils import xml_rba_ns
class XbaReaction:
def __init__(self, s_reaction, functions, compartments):
def __init__(self, s_reaction, species, functions, compartments):
self.id = s_reaction.name
self.name = s_reaction.get('name', self.id)
if 'sboterm' in s_reaction:
self.sboterm = s_reaction['sboterm']
self.reaction_string = s_reaction['reactionString']
self.reversible = s_reaction['reversible']
self.products = get_species_stoic(s_reaction['products'])
self.reactants = get_species_stoic(s_reaction['reactants'])
self.products = get_species_stoic(s_reaction['products'])
self.metabs_only = True
for sid in list(self.reactants) + list(self.products):
if species[sid].sboterm != 'SBO:0000247':
self.metabs_only = False
if 'modifiers' in s_reaction:
self.modifiers = get_species_stoic(s_reaction['modifiers'])
else:
self.modifiers = {}
if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction:
self.fbc_lb = s_reaction['fbcLb']
self.fbc_ub = s_reaction['fbcUb']
self.local_params = get_local_params(s_reaction)
if 'kineticLaw' in s_reaction:
self.local_params = get_local_params(s_reaction)
self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else ''
# convert to numpy, replace local parameters with numerical values, inline lambda functions:
expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law)
......@@ -37,8 +39,23 @@ class XbaReaction:
for cid in compartments.keys():
if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None:
self.compartment = cid
# retrieve optional flux balance bounds, ensuring consistent bounds
if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction:
self.fbc_lb = s_reaction['fbcLb']
self.fbc_ub = max(self.fbc_lb, s_reaction['fbcUb'])
if self.reversible is False:
self.fbc_lb = max(0, self.fbc_lb)
self.fbc_ub = max(0, self.fbc_ub)
if 'xmlAnnotation' in s_reaction:
attrs = sbmlxdf.extract_xml_attrs(s_reaction['xmlAnnotation'], ns=xml_rba_ns, token='kinetics')
if 'kapp_f_per_s' in attrs:
self.rba_kapp_f = float(attrs['kapp_f_per_s'])
if 'kapp_r_per_s' in attrs:
self.rba_kapp_r = float(attrs['kapp_r_per_s'])
if 'Km_ext_M' in attrs:
self.rba_km_ext = float(attrs['Km_ext_M'])
self.ps_product = ''
self.enzyme = ''
self.enzyme = None
def set_enzyme(self, species):
for sid in self.modifiers:
......
......@@ -4,10 +4,7 @@ Peter Schubert, HHU Duesseldorf, May 2022
"""
import sbmlxdf
xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns'
xml_token = 'molecule'
from xbanalysis.utils.utils import xml_gba_ns, xml_rba_ns
class XbaSpecies:
......@@ -25,8 +22,16 @@ class XbaSpecies:
if 'units' in s_species:
self.units = s_species['substanceUnits']
if 'xmlAnnotation' in s_species:
attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token=xml_token)
attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token='molecule')
if 'weight_Da' in attrs:
self.mw = float(attrs['weight_Da'])
attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_rba_ns, token='costs')
if len(attrs) > 0:
if ('process_machine' in attrs) and ('process_capacity_per_s' in attrs):
self.rba_process_machine = attrs.pop('process_machine')
self.rba_process_capacity = float(attrs.pop('process_capacity_per_s'))
self.rba_process_costs = attrs
self.ps_rid = None
self.m_reactions = {}
self.ps_reactions = {}
......
"""Subpackage with XBA model classes """
from .gba_problem import GbaProblem
from .rba_problem import RbaProblem
from .fba_problem import FbaProblem
from .lp_problem import LpProblem
__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem']
__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem', 'RbaProblem']
......@@ -35,29 +35,23 @@ class FbaProblem:
:param xba_model: xba model with all required FBA parameters
:type xba_model: XbaModel
"""
self.is_fba_model = False
self.xba_model = xba_model
self.metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247']
metabs = set(self.metab_sids)
# identify reactions that do not use enzymes in reactants and products
self.mrids = [rid for rid, rxn in xba_model.reactions.items()
if len((set(rxn.products) | set(rxn.reactants)) - metabs) == 0]
self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mrids)}
self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids])
self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mrids)
try:
self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mrids],
[xba_model.reactions[rid].fbc_ub for rid in self.mrids]])
except AttributeError:
print('Error: Model does not contain FBA flux bounds!')
return
self.mr_rids = [r.id for r in xba_model.reactions.values() if r.metabs_only]
self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mr_rids)}
self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids])
self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mr_rids)
if hasattr(xba_model, 'fbc_objectives') is False:
print('Error: Model does not contain FBA objectives!')
self.is_fba_model = self.check_fba_model(xba_model)
if self.is_fba_model is False:
print('Error: not a FBA model - missing parameters')
return
self.obj_coefs = np.zeros(len(self.mrids))
self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mr_rids],
[xba_model.reactions[rid].fbc_ub for rid in self.mr_rids]])
self.obj_coefs = None
for oid, fbc_obj in xba_model.fbc_objectives.items():
if fbc_obj.active is True:
self.obj_id = fbc_obj.id
......@@ -65,21 +59,43 @@ class FbaProblem:
self.set_obj_coefs(fbc_obj.coefficiants)
break
if hasattr(self, 'obj_id') is False:
print('Error: Model does not contain an active FBA objective!')
return
def check_fba_model(self, xba_model):
"""Check if fba related data available in the XBA model
self.is_fba_model = True
:param xba_model: Xba model with data extracted from SBML file
:type xba_model: XbaModel
:return: indicator if model contains data required for FBA problem formulation
:rtype: bool
"""
is_fba_model = True
r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub'}
for check, param in r_check.items():
missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False]
if len(missing) > 0:
print(f'missing {check} in:', missing)
is_fba_model = False
if hasattr(xba_model, 'fbc_objectives') is False:
print('missing flux objectives')
is_fba_model = False
else:
active_objs = [oid for oid in xba_model.fbc_objectives if xba_model.fbc_objectives[oid].active]
if len(active_objs) == 0:
print('missing an active flux objective')
is_fba_model = False
return is_fba_model
def combine_fwd_bwd(self, vector):
"""combine forward and backward flux information.
:param vector: contains data on all reactions + backward information for reversible reactions
:type vector: 1D ndarray of shape (len(mrids) + sum(reversible))
:type vector: 1D ndarray of shape (len(mr_rids) + sum(reversible))
:return: combined flux information
:rtype: 1D ndarray of shape (len(mrids))
:rtype: 1D ndarray of shape (len(mr_rids))
"""
n_cols = len(self.mrids)
n_cols = len(self.mr_rids)
combined = vector[:n_cols]
combined[self.reversible] -= vector[n_cols:]
return combined
......@@ -97,14 +113,14 @@ class FbaProblem:
return None
lp = LpProblem()
# self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids])
# self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids])
fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible]))
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0),
np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0),
np.fmax(-self.flux_bounds[0, self.reversible], 0)))))
row_bnds = np.zeros((2, fwd_bwd_s_mat.shape[0]))
lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bnds, fwd_bwd_s_mat, row_bnds, self.obj_dir)
row_bnds = np.zeros((fwd_bwd_s_mat.shape[0], 2))
lp.configure(fwd_bwd_obj_coefs, fwd_bwd_s_mat, row_bnds, fwd_bwd_bnds, self.obj_dir)
return lp
def fba_optimize(self):
......@@ -115,7 +131,7 @@ class FbaProblem:
"""
lp = self.create_fba_lp()
if lp is None:
return {'simplex_status': 1, 'simplex_message': 'not an FBA model'}
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve()
del lp
res['x'] = self.combine_fwd_bwd(res['x'])
......@@ -133,9 +149,9 @@ class FbaProblem:
"""
lp = self.create_fba_lp()
if lp is None:
return {'simplex_status': 1, 'simplex_message': 'not an FBA model'}
res = lp.solve(short_result=True)
if res['simplex_status'] == 0:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result=False)
if res['success']:
# fix FBA objective as constraint
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
if self.obj_dir == 'maximize':
......@@ -144,7 +160,7 @@ class FbaProblem:
row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
# new LP objective is to minimize sum of fluxes
n_cols = len(self.mrids)
n_cols = len(self.mr_rids)
lp.set_obj_coefs(np.ones(n_cols + sum(self.reversible)))
lp.set_obj_dir('minimize')
res = lp.solve()
......@@ -165,13 +181,13 @@ class FbaProblem:
"""
lp = self.create_fba_lp()
if lp is None:
return {'simplex_status': 1, 'simplex_message': 'not an FBA model'}
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result=True)
if rids is None:
rids = self.mrids
rids = self.mr_rids
flux_ranges = np.zeros((4, len(rids)))
if res['simplex_status'] == 0:
if res['success']:
# fix FBA objective as constraint
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
if self.obj_dir == 'maximize':
......@@ -180,7 +196,7 @@ class FbaProblem:
row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
fba_constr_row = lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
n_cols = len(self.mrids)
n_cols = len(self.mr_rids)
for idx, rid in enumerate(rids):
# select reaction maximize/minimize flux
new_obj_coefs = np.zeros(n_cols)
......@@ -190,20 +206,20 @@ class FbaProblem:
lp.set_obj_dir('minimize')
res = lp.solve(short_result=True)
if res['simplex_status'] == 0:
if res['success']:
flux_ranges[0, idx] = res['fun']
flux_ranges[2, idx] = lp.get_row_value(fba_constr_row)
else:
print(f"FVA min failed for {rid}, {res['simplex_message']}")
print(f"FVA min failed for {rid}")
flux_ranges[0, idx] = np.nan
lp.set_obj_dir('maximize')
res = lp.solve(short_result=True)
if res['simplex_status'] == 0:
if res['success']:
flux_ranges[1, idx] = res['fun']
flux_ranges[3, idx] = lp.get_row_value(fba_constr_row)
else:
print(f"FVA max failed for {rid}, {res['simplex_message']}")
print(f"FVA max failed for {rid}")
flux_ranges[1, idx] = np.nan
del lp
......@@ -228,7 +244,7 @@ class FbaProblem:
:returns: flux bounds configured in the FBA problem
:rtype: dict (key: reaction id, value: list with two float values for lower/upper bound)
"""
return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mrids, self.flux_bounds.T)}
return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mr_rids, self.flux_bounds.T)}
def set_flux_bounds(self, flux_bounds):
"""selectivly set lower and upper flux bounds for given reactions.
......@@ -255,6 +271,6 @@ class FbaProblem:
:param coefficiants: objective coefficients of selected reactions
:type coefficiants: dict (key: reaction id, value: float)
"""
self.obj_coefs = np.zeros(len(self.mrids))
self.obj_coefs = np.zeros(len(self.mr_rids))
for rid, coef in coefficiants.items():
self.obj_coefs[self.mrid2idx[rid]] = coef
......@@ -69,7 +69,6 @@ class LpProblem:
self.lp = glpk.glp_create_prob()
self.ncols = 0
self.nrows = 0
self.simplex_result = None
self.res = {}
@staticmethod
......@@ -104,18 +103,18 @@ class LpProblem:
var_type = glpk.GLP_FR
return var_type, lb, ub
def configure(self, obj_coefs, col_bnds, constr_coefs, row_bnds, direction='maximize'):
def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'):
"""Configure the LP Problem.
Note: glpk related indices start at 1 (not zero)
:param obj_coefs: coefficients of objective function
:type obj_coefs: 1D ndarray of shape (1, ncols)
:param col_bnds: lower and upper bounds of structural (col) variables
:type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds)
:param constr_coefs: matrix with constraint coefficients
:type constr_coefs: 2D nparray with shape(nrows, ncols)
:param row_bnds: lower and upper bounds of auxhiliary (row) variables
:type row_bnds: 2D ndarray of shape(2, nrows) (1st row: lower bounds, 2nd row: upper bounds)
:type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds)
:param col_bnds: lower and upper bounds of structural (col) variables
:type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds)
:param direction: direction for the optimization ('maximize' or 'minimize')
:type direction: string
"""
......@@ -123,7 +122,7 @@ class LpProblem:
assert len(obj_coefs) == self.ncols
assert col_bnds.shape == (2, self.ncols)
assert row_bnds.shape == (2, self.nrows)
assert row_bnds.shape == (self.nrows, 2)
# configure objective function
glpk.glp_add_cols(self.lp, self.ncols)
......@@ -137,7 +136,7 @@ class LpProblem:
# configure constraints
glpk.glp_add_rows(self.lp, self.nrows)
for i, lb_ub in enumerate(row_bnds.T):
for i, lb_ub in enumerate(row_bnds):
var_type, lb, ub = self._get_variable_type(lb_ub)
glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub)
constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs)
......@@ -251,11 +250,11 @@ class LpProblem:
glpk.glp_scale_prob(self.lp, opt)
# sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))]
# rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))]
self.simplex_result = glpk.glp_simplex(self.lp, None)
return self.results(short_result)
simplex_result = glpk.glp_simplex(self.lp, None)
return self.results(simplex_result, short_result)
def get_row_value(self, row):
"""Retrieve right hand side of a constraint (auxiliary variable)
"""Retrieve right-hand side of a constraint (auxiliary variable)
:param row: index into constraints
:type row: int
......@@ -265,11 +264,13 @@ class LpProblem:
assert row <= glpk.glp_get_num_rows(self.lp)
return glpk.glp_get_row_prim(self.lp, row)
def results(self, short_result=False):
def results(self, simplex_result, short_result=False):
"""Collect results for the optimization
Alternatively, reduced results, e.g. when doing pFBA
:param simplex_result: result from the LP solver
:type simplex_result: int
:param short_result: short result (only objective value and status) or long result
:type short_result: bool (default:False)
:return: result from the FBA optimization
......@@ -277,13 +278,14 @@ class LpProblem:
"""
self.res = {
'fun': glpk.glp_get_obj_val(self.lp),
'simplex_status': self.simplex_result,
'simplex_message': simplex_status[self.simplex_result],
'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT,
}
if short_result is False:
self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)])
self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)])
self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)])
self.res['simplex_result'] = simplex_result
self.res['simplex_message'] = simplex_status[simplex_result],
self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)]
self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)]
self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)]
......
"""Implementation of GBA Problem class.
- several process machines supported (yet to be validated)
- non-enzymatic metabolic reactions supported (yet to be validated)
Yet to implement:
- enzyme degradation
- enzyme transitions
- several compartments constraints
- target concentrations and target fluxes
Peter Schubert, HHU Duesseldorf, June 2022
"""
import numpy as np
from xbanalysis.problems.lp_problem import LpProblem
class RbaProblem:
def __init__(self, xba_model):
"""
:param xba_model:
"""
# metabolites and enzymes in the model
metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247']
self.m_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is False]
self.ext_conc = {sid: xba_model.species[sid].initial_conc
for sid in metab_sids if xba_model.species[sid].constant is True}
self.mr_rids = [r.id for r in xba_model.reactions.values()
if r.sboterm in ('SBO:0000167', 'SBO:0000179', 'SBO:0000182') and r.metabs_only]
self.mrid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.mr_rids}
self.enz_sids = [enz_sid for enz_sid in self.mrid2enz.values() if enz_sid is not None]
self.rev_enz_sids = [enz_sid for mr_rid, enz_sid in self.mrid2enz.items()
if enz_sid is not None and xba_model.reactions[mr_rid].reversible]
self.processes = {s.rba_process_machine: sid for sid, s in xba_model.species.items()
if hasattr(s, 'rba_process_machine')}
self.psenz_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids]
self.pspm_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()]
self.densities = {cid: c.rba_frac_gcdw for cid, c in xba_model.compartments.items()
if hasattr(c, 'rba_frac_gcdw')}
self.rba_cols = self.mr_rids + self.enz_sids + list(self.processes)
self.rba_rows = (self.m_sids + list(self.processes)
+ [sid + '_feff' for sid in self.enz_sids]
+ [sid + '_reff' for sid in self.rev_enz_sids]
+ list(self.densities))
self.is_rba_model = self.check_rba_model(xba_model)
if self.is_rba_model is False:
print('Error: not a GBA model - missing parameters')
return
fix_m, var_m, row_bnds_m = self.construct_mass_balance(xba_model)
fix_pc, var_pc, row_bnds_pc = self.construct_process_capacities(xba_model)
fix_eff, var_eff, row_bnds_eff = self.construct_efficencies(xba_model)
fix_cd, var_cd, row_bnds_cd = self.construct_densities(xba_model)
self.fix_cd = fix_cd
self.fix_mat = np.vstack((fix_m, fix_pc, fix_eff, fix_cd))
self.var_mat = np.vstack((var_m, var_pc, var_eff, var_cd))
self.row_bnds = np.vstack((row_bnds_m, row_bnds_pc, row_bnds_eff, row_bnds_cd))
# in RBA we only check feasibility.
# Here we maximize enzyme amount and check that objective value e.g. > 1e-10 mmol/gcdw
self.obj_dir = 'maximize'
self.obj_coefs = np.hstack((np.zeros(len(self.mr_rids)),
np.ones(len(self.enz_sids) + len(self.processes))))
col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.mr_rids],
np.zeros(len(self.enz_sids) + len(self.processes))))
col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.mr_rids],
np.nan * np.ones(len(self.enz_sids) + len(self.processes))))
self.col_bnds = np.vstack((col_lbs, col_ubs))
self.scaling = '2N'
self.max_iter = 20
def check_rba_model(self, xba_model):
"""Check if rba related data available in the XBA model
:param xba_model: Xba model with data extracted from SBML file
:type xba_model: XbaModel
:return: indicator if model contains data required for RBA problem formulation
:rtype: bool
"""
is_rba_model = True
r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub', 'kapp forward': 'rba_kapp_f'}
for check, param in r_check.items():
missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False]
if len(missing) > 0:
print(f'missing {check} in:', missing)
is_rba_model = False
missing = [rid for rid in self.mr_rids if xba_model.reactions[rid].reversible
and hasattr(xba_model.reactions[rid], 'rba_kapp_r') is False]
if len(missing) > 0:
print('missing kapp reverse in:', missing)
is_rba_model = False
missing = [sid for sid in self.m_sids + self.enz_sids + list(self.processes.values())
if hasattr(xba_model.species[sid], 'mw') is False]
if len(missing) > 0:
print('missing molecular weight in:', missing)
is_rba_model = False
missing = [sid for sid in list(self.processes.values())
if hasattr(xba_model.species[sid], 'rba_process_capacity') is False]
if len(missing) > 0:
print('missing process capacity in:', missing)
is_rba_model = False
if len(self.densities) == 0:
print('missing a density constraint')
is_rba_model = False
return is_rba_model
def rba_optimize(self):
"""
:return:
"""
if self.is_rba_model is False:
print('Error: not a RBA model - missing parameters')
res = {'success': False, 'fun': 0.0, 'message': 'not an RBA model'}
return res
res = {}
# Preanalysis: find range of growth rates
gr = {'lb': 0.0, 'ub': np.nan, 'try': 1.0}
for i in range(self.max_iter):
constr_coefs = self.fix_mat + gr['try'] * self.var_mat
lp = LpProblem()
lp.configure(self.obj_coefs, constr_coefs, self.row_bnds, self.col_bnds, self.obj_dir)
res = lp.solve(scaling=self.scaling)
del lp
if res['success'] and res['fun'] > 1e-10:
gr['lb'] = gr['try']
gr['try'] *= 10
else:
if gr['try'] <= 1e-4:
break
gr['ub'] = gr['try']
gr['try'] /= 10
if gr['lb'] > 0.0 and np.isfinite(gr['ub']):
break
if gr['lb'] == 0.0 or np.isfinite(gr['ub']) is False:
res['success'] = False
return res
# find eaxt growth rate
for i in range(self.max_iter):
gr['try'] = 0.5 * (gr['lb'] + gr['ub'])
constr_coefs = self.fix_mat + gr['try'] * self.var_mat
lp = LpProblem()
lp.configure(self.obj_coefs, constr_coefs, self.row_bnds, self.col_bnds, self.obj_dir)
res = lp.solve(scaling=self.scaling)
del lp
if res['success'] and res['fun'] > 1e-10:
gr['lb'] = gr['try']
else:
gr['ub'] = gr['try']
if (gr['ub'] - gr['lb']) < gr['try'] * 1e-5:
break
res['fun'] = gr['try']
return res
def construct_mass_balance(self, xba_model):
fix_m_x_mr = xba_model.get_stoic_matrix(self.m_sids, self.mr_rids)
var_m_x_enz = xba_model.get_stoic_matrix(self.m_sids, self.psenz_rids)
var_m_x_pm = xba_model.get_stoic_matrix(self.m_sids, self.pspm_rids)
fix_m = np.hstack((fix_m_x_mr,
np.zeros((len(self.m_sids), len(self.enz_sids)+len(self.processes)))))
var_m = np.hstack((np.zeros((len(self.m_sids), len(self.mr_rids))),
var_m_x_enz,
var_m_x_pm))
row_bnds_m = np.zeros((len(self.m_sids), 2))
return fix_m, var_m, row_bnds_m
def construct_process_capacities(self, xba_model):
var_pc_x_enz = np.zeros((len(self.processes), len(self.psenz_rids)))
for idx, pm in enumerate(self.processes):
var_pc_x_enz[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0)
for enz_sid in self.enz_sids]
var_pc_x_pm = np.zeros((len(self.processes), len(self.processes)))
for idx, pm in enumerate(self.processes):
var_pc_x_pm[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0)
for enz_sid in self.processes.values()]
fix_pc_x_pm = -np.diag([xba_model.species[enz_sid].rba_process_capacity
for enz_sid in self.processes.values()]) * 3600
fix_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids) + len(self.enz_sids))),
fix_pc_x_pm))
var_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids))),
var_pc_x_enz, var_pc_x_pm))
row_bnds_pc = np.zeros((len(self.processes), 2))
return fix_pc, var_pc, row_bnds_pc
def get_efficiencies(self, xba_model):
efficiencies = np.zeros((2, len(self.enz_sids)))
idx = 0
for mr_rid, enz_sid in self.mrid2enz.items():
if enz_sid is not None:
r = xba_model.reactions[mr_rid]
saturation = 1.0
if hasattr(r, 'rba_km_ext'):
for sid in r.reactants:
if sid in self.ext_conc:
saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext)
efficiencies[0, idx] = r.rba_kapp_f * 3600 * saturation
saturation = 1.0
if r.reversible:
if hasattr(r, 'rba_km_ext'):
for sid in r.products:
if sid in self.ext_conc:
saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext)
efficiencies[1, idx] = r.rba_kapp_r * 3600 * saturation
idx += 1
return efficiencies
def construct_efficencies(self, xba_model):
efficiencies = self.get_efficiencies(xba_model)
mask = np.array([enz_sid is not None for enz_sid in self.mrid2enz.values()])
fix_feff_x_mr = np.diag(np.ones(len(self.mrid2enz)))[mask]
fix_feff_x_enz = np.diag(-efficiencies[0])
fix_feff_x_pm = np.zeros((len(self.enz_sids), len(self.processes)))
fix_feff = np.hstack((fix_feff_x_mr, fix_feff_x_enz, fix_feff_x_pm))
row_bnds_feff = np.vstack((np.ones(len(self.enz_sids)) * np.nan, np.zeros(len(self.enz_sids)))).T
mask = [enz_sid is not None and xba_model.reactions[mr_rid].reversible
for mr_rid, enz_sid in self.mrid2enz.items()]
fix_reff_x_mr = np.diag(-np.ones(len(self.mrid2enz)))[mask]
mask = [enz_sid in self.rev_enz_sids for enz_sid in self.enz_sids]
fix_reff_x_enz = np.diag(-efficiencies[1])[mask]
fix_reff_x_pm = np.zeros((len(self.rev_enz_sids), len(self.processes)))
fix_reff = np.hstack((fix_reff_x_mr, fix_reff_x_enz, fix_reff_x_pm))
row_bnds_reff = np.vstack((np.ones(len(self.rev_enz_sids)) * np.nan, np.zeros(len(self.rev_enz_sids)))).T
fix_eff = np.vstack((fix_feff, fix_reff))
var_eff = np.zeros_like(fix_eff)
row_bnds_eff = np.vstack((row_bnds_feff, row_bnds_reff))
return fix_eff, var_eff, row_bnds_eff
def construct_densities(self, xba_model):
fix_cd_x_mr = np.zeros((len(self.densities), len(self.mr_rids)))
fix_cd_x_enz = np.zeros((len(self.densities), len(self.enz_sids)))
for row_idx, cid in enumerate(self.densities):
for col_idx, sid in enumerate(self.enz_sids):
if xba_model.species[sid].compartment == cid:
fix_cd_x_enz[row_idx, col_idx] = xba_model.species[sid].mw / 1000
fix_cd_x_pm = np.zeros((len(self.densities), len(self.processes)))
for row_idx, cid in enumerate(self.densities):
for col_idx, sid in enumerate(self.processes.values()):
if xba_model.species[sid].compartment == cid:
fix_cd_x_pm[row_idx, col_idx] = xba_model.species[sid].mw / 1000
fix_cd = np.hstack((fix_cd_x_mr, fix_cd_x_enz, fix_cd_x_pm))
var_cd = np.zeros_like(fix_cd)
row_bnds_cd = np.vstack((np.zeros(len(self.densities)),
[val for val in self.densities.values()])).T
return fix_cd, var_cd, row_bnds_cd
......@@ -6,6 +6,9 @@ Peter Schubert, HHU Duesseldorf, May 2022
import sys
import re
xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns'
xml_rba_ns = 'http://www.hhu.de/ccb/rba/ns'
def get_sbml_ids(df, sbo_id):
"""Get SBML IDs with specified sboterm-id.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment