diff --git a/xbanalysis/model/xba_compartment.py b/xbanalysis/model/xba_compartment.py index c7841ceefbafcb670bca58f8282209b5fb78a0a7..984e762a93301efde1dfcb4108b3e54b2b81129a 100644 --- a/xbanalysis/model/xba_compartment.py +++ b/xbanalysis/model/xba_compartment.py @@ -3,6 +3,10 @@ Peter Schubert, HHU Duesseldorf, May 2022 """ +import sbmlxdf + +from xbanalysis.utils.utils import xml_rba_ns + class XbaCompartment: @@ -14,4 +18,8 @@ class XbaCompartment: if 'units' in s_compartment: self.units = s_compartment['units'] if 'spatialDimension' in s_compartment: - self.dimension = s_compartment['spatialDimension'] \ No newline at end of file + self.dimension = s_compartment['spatialDimension'] + if 'xmlAnnotation' in s_compartment: + attrs = sbmlxdf.extract_xml_attrs(s_compartment['xmlAnnotation'], ns=xml_rba_ns, token='density') + if 'frac_gcdw' in attrs: + self.rba_frac_gcdw = float(attrs['frac_gcdw']) diff --git a/xbanalysis/model/xba_model.py b/xbanalysis/model/xba_model.py index 87d0201d8dca2ad93c4084cfcaa8705a7832fe34..3f643b91d4d78fd51cb7b052768412c9161cb04f 100644 --- a/xbanalysis/model/xba_model.py +++ b/xbanalysis/model/xba_model.py @@ -41,7 +41,7 @@ class XbaModel: self.species = {sid: XbaSpecies(row) for sid, row in model_dict['species'].iterrows()} - self.reactions = {rid: XbaReaction(row, self.functions, self.compartments) + self.reactions = {rid: XbaReaction(row, self.species, self.functions, self.compartments) for rid, row in model_dict['reactions'].iterrows()} for r in self.reactions.values(): diff --git a/xbanalysis/model/xba_reaction.py b/xbanalysis/model/xba_reaction.py index 0627e8187115746de3f1bb5dfe6813fa24845e09..ba5d6c8e23d92d5c98562992eb14d6ad68613841 100644 --- a/xbanalysis/model/xba_reaction.py +++ b/xbanalysis/model/xba_reaction.py @@ -6,28 +6,30 @@ Peter Schubert, HHU Duesseldorf, May 2022 import re import sbmlxdf from xbanalysis.utils.utils import get_species_stoic, get_local_params, expand_kineticlaw +from xbanalysis.utils.utils import xml_rba_ns class XbaReaction: - def __init__(self, s_reaction, functions, compartments): + def __init__(self, s_reaction, species, functions, compartments): self.id = s_reaction.name self.name = s_reaction.get('name', self.id) if 'sboterm' in s_reaction: self.sboterm = s_reaction['sboterm'] self.reaction_string = s_reaction['reactionString'] self.reversible = s_reaction['reversible'] - self.products = get_species_stoic(s_reaction['products']) self.reactants = get_species_stoic(s_reaction['reactants']) + self.products = get_species_stoic(s_reaction['products']) + self.metabs_only = True + for sid in list(self.reactants) + list(self.products): + if species[sid].sboterm != 'SBO:0000247': + self.metabs_only = False if 'modifiers' in s_reaction: self.modifiers = get_species_stoic(s_reaction['modifiers']) else: self.modifiers = {} - if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction: - self.fbc_lb = s_reaction['fbcLb'] - self.fbc_ub = s_reaction['fbcUb'] - self.local_params = get_local_params(s_reaction) if 'kineticLaw' in s_reaction: + self.local_params = get_local_params(s_reaction) self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else '' # convert to numpy, replace local parameters with numerical values, inline lambda functions: expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law) @@ -37,8 +39,23 @@ class XbaReaction: for cid in compartments.keys(): if re.search(r'\b'+cid+r'\b', self.expanded_kl) is not None: self.compartment = cid + # retrieve optional flux balance bounds, ensuring consistent bounds + if 'fbcLb' in s_reaction and 'fbcUb' in s_reaction: + self.fbc_lb = s_reaction['fbcLb'] + self.fbc_ub = max(self.fbc_lb, s_reaction['fbcUb']) + if self.reversible is False: + self.fbc_lb = max(0, self.fbc_lb) + self.fbc_ub = max(0, self.fbc_ub) + if 'xmlAnnotation' in s_reaction: + attrs = sbmlxdf.extract_xml_attrs(s_reaction['xmlAnnotation'], ns=xml_rba_ns, token='kinetics') + if 'kapp_f_per_s' in attrs: + self.rba_kapp_f = float(attrs['kapp_f_per_s']) + if 'kapp_r_per_s' in attrs: + self.rba_kapp_r = float(attrs['kapp_r_per_s']) + if 'Km_ext_M' in attrs: + self.rba_km_ext = float(attrs['Km_ext_M']) self.ps_product = '' - self.enzyme = '' + self.enzyme = None def set_enzyme(self, species): for sid in self.modifiers: diff --git a/xbanalysis/model/xba_species.py b/xbanalysis/model/xba_species.py index 6d0bdbcb02d358cc203ec7ff6db50bb2442c0910..bbfe20df5b087adcae437ffe01d8fed8cef9bd87 100644 --- a/xbanalysis/model/xba_species.py +++ b/xbanalysis/model/xba_species.py @@ -4,10 +4,7 @@ Peter Schubert, HHU Duesseldorf, May 2022 """ import sbmlxdf - - -xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns' -xml_token = 'molecule' +from xbanalysis.utils.utils import xml_gba_ns, xml_rba_ns class XbaSpecies: @@ -25,8 +22,16 @@ class XbaSpecies: if 'units' in s_species: self.units = s_species['substanceUnits'] if 'xmlAnnotation' in s_species: - attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token=xml_token) - self.mw = float(attrs['weight_Da']) + attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token='molecule') + if 'weight_Da' in attrs: + self.mw = float(attrs['weight_Da']) + attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_rba_ns, token='costs') + if len(attrs) > 0: + if ('process_machine' in attrs) and ('process_capacity_per_s' in attrs): + self.rba_process_machine = attrs.pop('process_machine') + self.rba_process_capacity = float(attrs.pop('process_capacity_per_s')) + self.rba_process_costs = attrs + self.ps_rid = None self.m_reactions = {} self.ps_reactions = {} diff --git a/xbanalysis/problems/__init__.py b/xbanalysis/problems/__init__.py index c08153cfe41de038cf52979345978028a175fab7..3adc55401bc3b973d4b49b99d8f153dcef000be2 100644 --- a/xbanalysis/problems/__init__.py +++ b/xbanalysis/problems/__init__.py @@ -1,7 +1,8 @@ """Subpackage with XBA model classes """ from .gba_problem import GbaProblem +from .rba_problem import RbaProblem from .fba_problem import FbaProblem from .lp_problem import LpProblem -__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem'] +__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem', 'RbaProblem'] diff --git a/xbanalysis/problems/fba_problem.py b/xbanalysis/problems/fba_problem.py index 2f6d4219f34ba26a0fce0f79fa37ce9d78a4cc3d..aa7815c11908e0c9747c1187daa1a9e8eaa9398e 100644 --- a/xbanalysis/problems/fba_problem.py +++ b/xbanalysis/problems/fba_problem.py @@ -35,29 +35,23 @@ class FbaProblem: :param xba_model: xba model with all required FBA parameters :type xba_model: XbaModel """ - self.is_fba_model = False self.xba_model = xba_model self.metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247'] - metabs = set(self.metab_sids) # identify reactions that do not use enzymes in reactants and products - self.mrids = [rid for rid, rxn in xba_model.reactions.items() - if len((set(rxn.products) | set(rxn.reactants)) - metabs) == 0] - self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mrids)} - self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids]) - self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mrids) - - try: - self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mrids], - [xba_model.reactions[rid].fbc_ub for rid in self.mrids]]) - except AttributeError: - print('Error: Model does not contain FBA flux bounds!') - return + self.mr_rids = [r.id for r in xba_model.reactions.values() if r.metabs_only] + self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mr_rids)} + self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids]) + self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mr_rids) - if hasattr(xba_model, 'fbc_objectives') is False: - print('Error: Model does not contain FBA objectives!') + self.is_fba_model = self.check_fba_model(xba_model) + if self.is_fba_model is False: + print('Error: not a FBA model - missing parameters') return - self.obj_coefs = np.zeros(len(self.mrids)) + + self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mr_rids], + [xba_model.reactions[rid].fbc_ub for rid in self.mr_rids]]) + self.obj_coefs = None for oid, fbc_obj in xba_model.fbc_objectives.items(): if fbc_obj.active is True: self.obj_id = fbc_obj.id @@ -65,21 +59,43 @@ class FbaProblem: self.set_obj_coefs(fbc_obj.coefficiants) break - if hasattr(self, 'obj_id') is False: - print('Error: Model does not contain an active FBA objective!') - return + def check_fba_model(self, xba_model): + """Check if fba related data available in the XBA model - self.is_fba_model = True + :param xba_model: Xba model with data extracted from SBML file + :type xba_model: XbaModel + :return: indicator if model contains data required for FBA problem formulation + :rtype: bool + """ + is_fba_model = True + + r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub'} + for check, param in r_check.items(): + missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False] + if len(missing) > 0: + print(f'missing {check} in:', missing) + is_fba_model = False + + if hasattr(xba_model, 'fbc_objectives') is False: + print('missing flux objectives') + is_fba_model = False + else: + active_objs = [oid for oid in xba_model.fbc_objectives if xba_model.fbc_objectives[oid].active] + if len(active_objs) == 0: + print('missing an active flux objective') + is_fba_model = False + + return is_fba_model def combine_fwd_bwd(self, vector): """combine forward and backward flux information. :param vector: contains data on all reactions + backward information for reversible reactions - :type vector: 1D ndarray of shape (len(mrids) + sum(reversible)) + :type vector: 1D ndarray of shape (len(mr_rids) + sum(reversible)) :return: combined flux information - :rtype: 1D ndarray of shape (len(mrids)) + :rtype: 1D ndarray of shape (len(mr_rids)) """ - n_cols = len(self.mrids) + n_cols = len(self.mr_rids) combined = vector[:n_cols] combined[self.reversible] -= vector[n_cols:] return combined @@ -97,14 +113,14 @@ class FbaProblem: return None lp = LpProblem() - # self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids]) + # self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mr_rids]) fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible])) fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0), np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0), np.fmax(-self.flux_bounds[0, self.reversible], 0))))) - row_bnds = np.zeros((2, fwd_bwd_s_mat.shape[0])) - lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bnds, fwd_bwd_s_mat, row_bnds, self.obj_dir) + row_bnds = np.zeros((fwd_bwd_s_mat.shape[0], 2)) + lp.configure(fwd_bwd_obj_coefs, fwd_bwd_s_mat, row_bnds, fwd_bwd_bnds, self.obj_dir) return lp def fba_optimize(self): @@ -115,7 +131,7 @@ class FbaProblem: """ lp = self.create_fba_lp() if lp is None: - return {'simplex_status': 1, 'simplex_message': 'not an FBA model'} + return {'success': False, 'message': 'not an FBA model'} res = lp.solve() del lp res['x'] = self.combine_fwd_bwd(res['x']) @@ -133,9 +149,9 @@ class FbaProblem: """ lp = self.create_fba_lp() if lp is None: - return {'simplex_status': 1, 'simplex_message': 'not an FBA model'} - res = lp.solve(short_result=True) - if res['simplex_status'] == 0: + return {'success': False, 'message': 'not an FBA model'} + res = lp.solve(short_result=False) + if res['success']: # fix FBA objective as constraint fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) if self.obj_dir == 'maximize': @@ -144,7 +160,7 @@ class FbaProblem: row_bds = np.array([np.nan, res['fun'] / fract_of_optimum]) lp.add_constraint(fwd_bwd_obj_coefs, row_bds) # new LP objective is to minimize sum of fluxes - n_cols = len(self.mrids) + n_cols = len(self.mr_rids) lp.set_obj_coefs(np.ones(n_cols + sum(self.reversible))) lp.set_obj_dir('minimize') res = lp.solve() @@ -165,13 +181,13 @@ class FbaProblem: """ lp = self.create_fba_lp() if lp is None: - return {'simplex_status': 1, 'simplex_message': 'not an FBA model'} + return {'success': False, 'message': 'not an FBA model'} res = lp.solve(short_result=True) if rids is None: - rids = self.mrids + rids = self.mr_rids flux_ranges = np.zeros((4, len(rids))) - if res['simplex_status'] == 0: + if res['success']: # fix FBA objective as constraint fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) if self.obj_dir == 'maximize': @@ -180,7 +196,7 @@ class FbaProblem: row_bds = np.array([np.nan, res['fun'] / fract_of_optimum]) fba_constr_row = lp.add_constraint(fwd_bwd_obj_coefs, row_bds) - n_cols = len(self.mrids) + n_cols = len(self.mr_rids) for idx, rid in enumerate(rids): # select reaction maximize/minimize flux new_obj_coefs = np.zeros(n_cols) @@ -190,20 +206,20 @@ class FbaProblem: lp.set_obj_dir('minimize') res = lp.solve(short_result=True) - if res['simplex_status'] == 0: + if res['success']: flux_ranges[0, idx] = res['fun'] flux_ranges[2, idx] = lp.get_row_value(fba_constr_row) else: - print(f"FVA min failed for {rid}, {res['simplex_message']}") + print(f"FVA min failed for {rid}") flux_ranges[0, idx] = np.nan lp.set_obj_dir('maximize') res = lp.solve(short_result=True) - if res['simplex_status'] == 0: + if res['success']: flux_ranges[1, idx] = res['fun'] flux_ranges[3, idx] = lp.get_row_value(fba_constr_row) else: - print(f"FVA max failed for {rid}, {res['simplex_message']}") + print(f"FVA max failed for {rid}") flux_ranges[1, idx] = np.nan del lp @@ -228,7 +244,7 @@ class FbaProblem: :returns: flux bounds configured in the FBA problem :rtype: dict (key: reaction id, value: list with two float values for lower/upper bound) """ - return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mrids, self.flux_bounds.T)} + return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mr_rids, self.flux_bounds.T)} def set_flux_bounds(self, flux_bounds): """selectivly set lower and upper flux bounds for given reactions. @@ -255,6 +271,6 @@ class FbaProblem: :param coefficiants: objective coefficients of selected reactions :type coefficiants: dict (key: reaction id, value: float) """ - self.obj_coefs = np.zeros(len(self.mrids)) + self.obj_coefs = np.zeros(len(self.mr_rids)) for rid, coef in coefficiants.items(): self.obj_coefs[self.mrid2idx[rid]] = coef diff --git a/xbanalysis/problems/lp_problem.py b/xbanalysis/problems/lp_problem.py index fcfad111e7abaeb812aa2380f898b879ebc839f0..72870dfed9d1c3d09818bad79baf2919f7ced32c 100644 --- a/xbanalysis/problems/lp_problem.py +++ b/xbanalysis/problems/lp_problem.py @@ -69,7 +69,6 @@ class LpProblem: self.lp = glpk.glp_create_prob() self.ncols = 0 self.nrows = 0 - self.simplex_result = None self.res = {} @staticmethod @@ -104,18 +103,18 @@ class LpProblem: var_type = glpk.GLP_FR return var_type, lb, ub - def configure(self, obj_coefs, col_bnds, constr_coefs, row_bnds, direction='maximize'): + def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'): """Configure the LP Problem. Note: glpk related indices start at 1 (not zero) :param obj_coefs: coefficients of objective function :type obj_coefs: 1D ndarray of shape (1, ncols) - :param col_bnds: lower and upper bounds of structural (col) variables - :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds) :param constr_coefs: matrix with constraint coefficients :type constr_coefs: 2D nparray with shape(nrows, ncols) :param row_bnds: lower and upper bounds of auxhiliary (row) variables - :type row_bnds: 2D ndarray of shape(2, nrows) (1st row: lower bounds, 2nd row: upper bounds) + :type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds) + :param col_bnds: lower and upper bounds of structural (col) variables + :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds) :param direction: direction for the optimization ('maximize' or 'minimize') :type direction: string """ @@ -123,7 +122,7 @@ class LpProblem: assert len(obj_coefs) == self.ncols assert col_bnds.shape == (2, self.ncols) - assert row_bnds.shape == (2, self.nrows) + assert row_bnds.shape == (self.nrows, 2) # configure objective function glpk.glp_add_cols(self.lp, self.ncols) @@ -137,7 +136,7 @@ class LpProblem: # configure constraints glpk.glp_add_rows(self.lp, self.nrows) - for i, lb_ub in enumerate(row_bnds.T): + for i, lb_ub in enumerate(row_bnds): var_type, lb, ub = self._get_variable_type(lb_ub) glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub) constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs) @@ -251,11 +250,11 @@ class LpProblem: glpk.glp_scale_prob(self.lp, opt) # sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))] # rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))] - self.simplex_result = glpk.glp_simplex(self.lp, None) - return self.results(short_result) + simplex_result = glpk.glp_simplex(self.lp, None) + return self.results(simplex_result, short_result) def get_row_value(self, row): - """Retrieve right hand side of a constraint (auxiliary variable) + """Retrieve right-hand side of a constraint (auxiliary variable) :param row: index into constraints :type row: int @@ -265,11 +264,13 @@ class LpProblem: assert row <= glpk.glp_get_num_rows(self.lp) return glpk.glp_get_row_prim(self.lp, row) - def results(self, short_result=False): + def results(self, simplex_result, short_result=False): """Collect results for the optimization Alternatively, reduced results, e.g. when doing pFBA + :param simplex_result: result from the LP solver + :type simplex_result: int :param short_result: short result (only objective value and status) or long result :type short_result: bool (default:False) :return: result from the FBA optimization @@ -277,13 +278,14 @@ class LpProblem: """ self.res = { 'fun': glpk.glp_get_obj_val(self.lp), - 'simplex_status': self.simplex_result, - 'simplex_message': simplex_status[self.simplex_result], + 'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT, } if short_result is False: self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)]) self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)]) self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)]) + self.res['simplex_result'] = simplex_result + self.res['simplex_message'] = simplex_status[simplex_result], self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)] self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)] self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)] diff --git a/xbanalysis/problems/rba_problem.py b/xbanalysis/problems/rba_problem.py new file mode 100644 index 0000000000000000000000000000000000000000..062aeea792375c5d568252617c478dce66d50f53 --- /dev/null +++ b/xbanalysis/problems/rba_problem.py @@ -0,0 +1,264 @@ +"""Implementation of GBA Problem class. + + - several process machines supported (yet to be validated) + - non-enzymatic metabolic reactions supported (yet to be validated) + + Yet to implement: + - enzyme degradation + - enzyme transitions + - several compartments constraints + - target concentrations and target fluxes + +Peter Schubert, HHU Duesseldorf, June 2022 +""" + +import numpy as np + +from xbanalysis.problems.lp_problem import LpProblem + + +class RbaProblem: + + def __init__(self, xba_model): + """ + + :param xba_model: + """ + # metabolites and enzymes in the model + metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247'] + self.m_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is False] + self.ext_conc = {sid: xba_model.species[sid].initial_conc + for sid in metab_sids if xba_model.species[sid].constant is True} + self.mr_rids = [r.id for r in xba_model.reactions.values() + if r.sboterm in ('SBO:0000167', 'SBO:0000179', 'SBO:0000182') and r.metabs_only] + self.mrid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.mr_rids} + self.enz_sids = [enz_sid for enz_sid in self.mrid2enz.values() if enz_sid is not None] + self.rev_enz_sids = [enz_sid for mr_rid, enz_sid in self.mrid2enz.items() + if enz_sid is not None and xba_model.reactions[mr_rid].reversible] + self.processes = {s.rba_process_machine: sid for sid, s in xba_model.species.items() + if hasattr(s, 'rba_process_machine')} + self.psenz_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids] + self.pspm_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()] + self.densities = {cid: c.rba_frac_gcdw for cid, c in xba_model.compartments.items() + if hasattr(c, 'rba_frac_gcdw')} + + self.rba_cols = self.mr_rids + self.enz_sids + list(self.processes) + self.rba_rows = (self.m_sids + list(self.processes) + + [sid + '_feff' for sid in self.enz_sids] + + [sid + '_reff' for sid in self.rev_enz_sids] + + list(self.densities)) + + self.is_rba_model = self.check_rba_model(xba_model) + if self.is_rba_model is False: + print('Error: not a GBA model - missing parameters') + return + + fix_m, var_m, row_bnds_m = self.construct_mass_balance(xba_model) + fix_pc, var_pc, row_bnds_pc = self.construct_process_capacities(xba_model) + fix_eff, var_eff, row_bnds_eff = self.construct_efficencies(xba_model) + fix_cd, var_cd, row_bnds_cd = self.construct_densities(xba_model) + self.fix_cd = fix_cd + self.fix_mat = np.vstack((fix_m, fix_pc, fix_eff, fix_cd)) + self.var_mat = np.vstack((var_m, var_pc, var_eff, var_cd)) + self.row_bnds = np.vstack((row_bnds_m, row_bnds_pc, row_bnds_eff, row_bnds_cd)) + + # in RBA we only check feasibility. + # Here we maximize enzyme amount and check that objective value e.g. > 1e-10 mmol/gcdw + self.obj_dir = 'maximize' + self.obj_coefs = np.hstack((np.zeros(len(self.mr_rids)), + np.ones(len(self.enz_sids) + len(self.processes)))) + col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.mr_rids], + np.zeros(len(self.enz_sids) + len(self.processes)))) + col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.mr_rids], + np.nan * np.ones(len(self.enz_sids) + len(self.processes)))) + self.col_bnds = np.vstack((col_lbs, col_ubs)) + self.scaling = '2N' + self.max_iter = 20 + + def check_rba_model(self, xba_model): + """Check if rba related data available in the XBA model + + :param xba_model: Xba model with data extracted from SBML file + :type xba_model: XbaModel + :return: indicator if model contains data required for RBA problem formulation + :rtype: bool + """ + is_rba_model = True + + r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub', 'kapp forward': 'rba_kapp_f'} + for check, param in r_check.items(): + missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False] + if len(missing) > 0: + print(f'missing {check} in:', missing) + is_rba_model = False + + missing = [rid for rid in self.mr_rids if xba_model.reactions[rid].reversible + and hasattr(xba_model.reactions[rid], 'rba_kapp_r') is False] + if len(missing) > 0: + print('missing kapp reverse in:', missing) + is_rba_model = False + + missing = [sid for sid in self.m_sids + self.enz_sids + list(self.processes.values()) + if hasattr(xba_model.species[sid], 'mw') is False] + if len(missing) > 0: + print('missing molecular weight in:', missing) + is_rba_model = False + + missing = [sid for sid in list(self.processes.values()) + if hasattr(xba_model.species[sid], 'rba_process_capacity') is False] + if len(missing) > 0: + print('missing process capacity in:', missing) + is_rba_model = False + + if len(self.densities) == 0: + print('missing a density constraint') + is_rba_model = False + return is_rba_model + + def rba_optimize(self): + """ + + :return: + """ + if self.is_rba_model is False: + print('Error: not a RBA model - missing parameters') + res = {'success': False, 'fun': 0.0, 'message': 'not an RBA model'} + return res + + res = {} + # Preanalysis: find range of growth rates + gr = {'lb': 0.0, 'ub': np.nan, 'try': 1.0} + for i in range(self.max_iter): + constr_coefs = self.fix_mat + gr['try'] * self.var_mat + lp = LpProblem() + lp.configure(self.obj_coefs, constr_coefs, self.row_bnds, self.col_bnds, self.obj_dir) + res = lp.solve(scaling=self.scaling) + del lp + if res['success'] and res['fun'] > 1e-10: + gr['lb'] = gr['try'] + gr['try'] *= 10 + else: + if gr['try'] <= 1e-4: + break + gr['ub'] = gr['try'] + gr['try'] /= 10 + if gr['lb'] > 0.0 and np.isfinite(gr['ub']): + break + + if gr['lb'] == 0.0 or np.isfinite(gr['ub']) is False: + res['success'] = False + return res + + # find eaxt growth rate + for i in range(self.max_iter): + gr['try'] = 0.5 * (gr['lb'] + gr['ub']) + constr_coefs = self.fix_mat + gr['try'] * self.var_mat + lp = LpProblem() + lp.configure(self.obj_coefs, constr_coefs, self.row_bnds, self.col_bnds, self.obj_dir) + res = lp.solve(scaling=self.scaling) + del lp + if res['success'] and res['fun'] > 1e-10: + gr['lb'] = gr['try'] + else: + gr['ub'] = gr['try'] + if (gr['ub'] - gr['lb']) < gr['try'] * 1e-5: + break + res['fun'] = gr['try'] + return res + + def construct_mass_balance(self, xba_model): + + fix_m_x_mr = xba_model.get_stoic_matrix(self.m_sids, self.mr_rids) + var_m_x_enz = xba_model.get_stoic_matrix(self.m_sids, self.psenz_rids) + var_m_x_pm = xba_model.get_stoic_matrix(self.m_sids, self.pspm_rids) + fix_m = np.hstack((fix_m_x_mr, + np.zeros((len(self.m_sids), len(self.enz_sids)+len(self.processes))))) + var_m = np.hstack((np.zeros((len(self.m_sids), len(self.mr_rids))), + var_m_x_enz, + var_m_x_pm)) + row_bnds_m = np.zeros((len(self.m_sids), 2)) + return fix_m, var_m, row_bnds_m + + def construct_process_capacities(self, xba_model): + + var_pc_x_enz = np.zeros((len(self.processes), len(self.psenz_rids))) + for idx, pm in enumerate(self.processes): + var_pc_x_enz[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) + for enz_sid in self.enz_sids] + var_pc_x_pm = np.zeros((len(self.processes), len(self.processes))) + for idx, pm in enumerate(self.processes): + var_pc_x_pm[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) + for enz_sid in self.processes.values()] + fix_pc_x_pm = -np.diag([xba_model.species[enz_sid].rba_process_capacity + for enz_sid in self.processes.values()]) * 3600 + fix_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids) + len(self.enz_sids))), + fix_pc_x_pm)) + var_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids))), + var_pc_x_enz, var_pc_x_pm)) + row_bnds_pc = np.zeros((len(self.processes), 2)) + return fix_pc, var_pc, row_bnds_pc + + def get_efficiencies(self, xba_model): + + efficiencies = np.zeros((2, len(self.enz_sids))) + idx = 0 + for mr_rid, enz_sid in self.mrid2enz.items(): + if enz_sid is not None: + r = xba_model.reactions[mr_rid] + saturation = 1.0 + if hasattr(r, 'rba_km_ext'): + for sid in r.reactants: + if sid in self.ext_conc: + saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext) + efficiencies[0, idx] = r.rba_kapp_f * 3600 * saturation + saturation = 1.0 + if r.reversible: + if hasattr(r, 'rba_km_ext'): + for sid in r.products: + if sid in self.ext_conc: + saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext) + efficiencies[1, idx] = r.rba_kapp_r * 3600 * saturation + idx += 1 + return efficiencies + + def construct_efficencies(self, xba_model): + + efficiencies = self.get_efficiencies(xba_model) + mask = np.array([enz_sid is not None for enz_sid in self.mrid2enz.values()]) + fix_feff_x_mr = np.diag(np.ones(len(self.mrid2enz)))[mask] + fix_feff_x_enz = np.diag(-efficiencies[0]) + fix_feff_x_pm = np.zeros((len(self.enz_sids), len(self.processes))) + fix_feff = np.hstack((fix_feff_x_mr, fix_feff_x_enz, fix_feff_x_pm)) + row_bnds_feff = np.vstack((np.ones(len(self.enz_sids)) * np.nan, np.zeros(len(self.enz_sids)))).T + + mask = [enz_sid is not None and xba_model.reactions[mr_rid].reversible + for mr_rid, enz_sid in self.mrid2enz.items()] + fix_reff_x_mr = np.diag(-np.ones(len(self.mrid2enz)))[mask] + mask = [enz_sid in self.rev_enz_sids for enz_sid in self.enz_sids] + fix_reff_x_enz = np.diag(-efficiencies[1])[mask] + fix_reff_x_pm = np.zeros((len(self.rev_enz_sids), len(self.processes))) + fix_reff = np.hstack((fix_reff_x_mr, fix_reff_x_enz, fix_reff_x_pm)) + row_bnds_reff = np.vstack((np.ones(len(self.rev_enz_sids)) * np.nan, np.zeros(len(self.rev_enz_sids)))).T + fix_eff = np.vstack((fix_feff, fix_reff)) + var_eff = np.zeros_like(fix_eff) + row_bnds_eff = np.vstack((row_bnds_feff, row_bnds_reff)) + return fix_eff, var_eff, row_bnds_eff + + def construct_densities(self, xba_model): + fix_cd_x_mr = np.zeros((len(self.densities), len(self.mr_rids))) + fix_cd_x_enz = np.zeros((len(self.densities), len(self.enz_sids))) + for row_idx, cid in enumerate(self.densities): + for col_idx, sid in enumerate(self.enz_sids): + if xba_model.species[sid].compartment == cid: + fix_cd_x_enz[row_idx, col_idx] = xba_model.species[sid].mw / 1000 + fix_cd_x_pm = np.zeros((len(self.densities), len(self.processes))) + for row_idx, cid in enumerate(self.densities): + for col_idx, sid in enumerate(self.processes.values()): + if xba_model.species[sid].compartment == cid: + fix_cd_x_pm[row_idx, col_idx] = xba_model.species[sid].mw / 1000 + fix_cd = np.hstack((fix_cd_x_mr, fix_cd_x_enz, fix_cd_x_pm)) + + var_cd = np.zeros_like(fix_cd) + row_bnds_cd = np.vstack((np.zeros(len(self.densities)), + [val for val in self.densities.values()])).T + return fix_cd, var_cd, row_bnds_cd diff --git a/xbanalysis/utils/utils.py b/xbanalysis/utils/utils.py index 2e3f5b836fa5b4141e350a6118a728910bd95d11..348bf00fd6356654b45b3ffcfdbf2d7fbc12b67b 100644 --- a/xbanalysis/utils/utils.py +++ b/xbanalysis/utils/utils.py @@ -6,6 +6,9 @@ Peter Schubert, HHU Duesseldorf, May 2022 import sys import re +xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns' +xml_rba_ns = 'http://www.hhu.de/ccb/rba/ns' + def get_sbml_ids(df, sbo_id): """Get SBML IDs with specified sboterm-id.