diff --git a/setup.py b/setup.py index 8d9f5c6d8c05c317e84af7929638cfca551a2301..2c04d6734c30e6928996e9ff4f15d7dc5485d051 100755 --- a/setup.py +++ b/setup.py @@ -37,6 +37,7 @@ setup( packages=find_packages(exclude='docs'), install_requires=['pandas>=1.3.0', 'numpy >= 1.20.0', + 'sympy >= 1.9.0', 'sbmlxdf>=0.2.5'], python_requires=">=3.7", keywords=['modeling', 'standardization', 'SBML'], diff --git a/xbanalysis/__init__.py b/xbanalysis/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..57b2abe285bb2a77cbf467c0ae421d5d00bed2db --- /dev/null +++ b/xbanalysis/__init__.py @@ -0,0 +1,17 @@ +""" +XBanalysis package +================== + +Package generating XBA problems from SBML. +""" + +from .model import * +from .problems import * +from .utils import * + +from . import model, problems, utils + +__all__ = [] +__all__ += model.__all__ +__all__ += problems.__all__ +__all__ += utils.__all__ diff --git a/xbanalysis/model/__init__.py b/xbanalysis/model/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..b745827946a10a2b2c3b00c31ebbe4dea7ce7d79 --- /dev/null +++ b/xbanalysis/model/__init__.py @@ -0,0 +1,10 @@ +"""Subpackage with XBA model classes """ + +from .xba_model import XbaModel +from .xba_compartment import XbaCompartment +from .xba_function import XbaFunction +from .xba_parameter import XbaParameter +from .xba_reaction import XbaReaction +from .xba_species import XbaSpecies + +__all__ = ['XbaModel', 'XbaCompartment', 'XbaFunction', 'XbaParameter', 'XbaReaction', 'XbaSpecies'] diff --git a/xbanalysis/model/xba_compartment.py b/xbanalysis/model/xba_compartment.py new file mode 100644 index 0000000000000000000000000000000000000000..752443d573993b241e5764775139db2457ff3b00 --- /dev/null +++ b/xbanalysis/model/xba_compartment.py @@ -0,0 +1,13 @@ +"""Implementation of XBAcompartment class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +class XbaCompartment: + + def __init__(self, s_compartment): + self.id = s_compartment.name + self.name = s_compartment.get('name', '') + self.size = s_compartment['size'] + self.units = s_compartment['units'] + self.dimension = s_compartment['spatialDimension'] \ No newline at end of file diff --git a/xbanalysis/model/xba_function.py b/xbanalysis/model/xba_function.py new file mode 100644 index 0000000000000000000000000000000000000000..923b9a06d5b6f9035c68382c28d2a39f207c3507 --- /dev/null +++ b/xbanalysis/model/xba_function.py @@ -0,0 +1,31 @@ +"""Implementation of XBAfunction class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +import re +import sbmlxdf + + +class XbaFunction: + + def __init__(self, s_function): + self.id = s_function.name + self.name = s_function.get('name', '') + self.math = s_function['math'] + + m = re.search(r'\blambda\((.*)\)', sbmlxdf.misc.mathml2numpy(s_function['math'])) + if m and len(m.groups()) > 0: + lambda_def = [kw.strip() for kw in m.group(1).split(',')] + lambda_func = lambda_def[-1] + lambda_params = [] + # parameters getting protected by prefix/postfix of '_' + for param in lambda_def[:-1]: + lambda_params.append('_' + param + '_') + lambda_func = re.sub(r'\b' + param + r'\b', '_' + param + '_', lambda_func) + + self.protect_def = lambda_func + self.protect_params = lambda_params + + def get_function(self): + return self.protect_def, self.protect_params diff --git a/xbanalysis/model/xba_model.py b/xbanalysis/model/xba_model.py new file mode 100644 index 0000000000000000000000000000000000000000..6e127885b53eb4da529d343648d05644401e0b7a --- /dev/null +++ b/xbanalysis/model/xba_model.py @@ -0,0 +1,43 @@ +"""Implementation of XBAmodel class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +import os +import time +import sbmlxdf + +from .xba_compartment import XbaCompartment +from .xba_function import XbaFunction +from .xba_parameter import XbaParameter +from .xba_reaction import XbaReaction +from .xba_species import XbaSpecies + + +class XbaModel: + + def __init__(self, sbml_file): + if os.path.exists(sbml_file) is False: + print(f'{sbml_file} does not exist') + return + print(f'loading: {sbml_file} (last modified: {time.ctime(os.path.getmtime(sbml_file))})') + sbml_model = sbmlxdf.Model(sbml_file) + if sbml_model.isModel is False: + print(f'{sbml_file} seems not to be a valid SBML model') + return + model_dict = sbml_model.to_df() +# self.model_dict = model_dict ######### - can be removed + self.compartments = {cid: XbaCompartment(row) + for cid, row in model_dict['compartments'].iterrows()} + self.parameters = {pid: XbaParameter(row) + for pid, row in model_dict['parameters'].iterrows()} + self.functions = {fid: XbaFunction(row, ) + for fid, row in model_dict['funcDefs'].iterrows()} + self.species = {sid: XbaSpecies(row) + for sid, row in model_dict['species'].iterrows()} + self.reactions = {rid: XbaReaction(row, self.functions) + for rid, row in model_dict['reactions'].iterrows()} + + for r in self.reactions.values(): + r.set_enzyme(self.species) + r.add_species_usage(self.species) diff --git a/xbanalysis/model/xba_parameter.py b/xbanalysis/model/xba_parameter.py new file mode 100644 index 0000000000000000000000000000000000000000..656dd5a75398854fccb677b15b264ce64cdc5c3b --- /dev/null +++ b/xbanalysis/model/xba_parameter.py @@ -0,0 +1,14 @@ +"""Implementation of XBAparameter class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +class XbaParameter: + + def __init__(self, s_parameter): + self.id = s_parameter.name + self.name = s_parameter.get('name', '') + self.sboterm = s_parameter.get('sboterm', '') + self.value = s_parameter['value'] + self.constant = s_parameter['constant'] + self.units = s_parameter['units'] \ No newline at end of file diff --git a/xbanalysis/model/xba_reaction.py b/xbanalysis/model/xba_reaction.py new file mode 100644 index 0000000000000000000000000000000000000000..15edd1eefa250e4603b243a2c9963a6bd10949bb --- /dev/null +++ b/xbanalysis/model/xba_reaction.py @@ -0,0 +1,80 @@ +"""Implementation of XBAreaction class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +import re +import sbmlxdf +from xbanalysis.utils.utils import get_species_stoic, get_local_params, expand_kineticlaw + + +class XbaReaction: + + def __init__(self, s_reaction, functions): + self.id = s_reaction.name + self.name = s_reaction.get('name', '') + self.sboterm = s_reaction['sboterm'] + self.reaction_string = s_reaction['reactionString'] + self.reversible = s_reaction['reversible'] + self.products = get_species_stoic(s_reaction['products']) + self.reactants = get_species_stoic(s_reaction['reactants']) + self.modifiers = get_species_stoic(s_reaction['modifiers']) + self.local_params = get_local_params(s_reaction) + self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else '' + # convert to numpy, replace local parameters with numerical values, inline lambda functions: + expanded_kl = sbmlxdf.misc.mathml2numpy(self.kinetic_law) + for pid, val in self.local_params.items(): + expanded_kl = re.sub(r'\b' + pid + r'\b', str(val), expanded_kl) + self.expanded_kl = expand_kineticlaw(expanded_kl, functions) + self.ps_product = '' + self.enzyme = '' + + def set_enzyme(self, species): + for sid in self.modifiers: + if species[sid].sboterm == 'SBO:0000250' or species[sid].sboterm == 'SBO:0000252': + self.enzyme = sid + break + + def add_species_usage(self, species): + # for metabolic reactions add to the species consumed/produced the metabolic reaction id + # !!! we could also enter the Reaction Object references instead !!!!!!!!!! + if (self.sboterm == 'SBO:0000167') or (self.sboterm == 'SBO:0000179') or (self.sboterm == 'SBO:0000182'): + for sid, val in self.products.items(): + species[sid].add_m_reaction(self.id, val) + for sid, val in self.reactants.items(): + species[sid].add_m_reaction(self.id, -val) + # protein degradation reaction, assume that first protein in reactants is getting degraded + # a protein degradation reaction is still a metabolic reaction + if self.sboterm == 'SBO:0000179': + for sid in self.reactants: + if (species[sid].sboterm == 'SBO:0000252') or (species[sid].sboterm == 'SBO:0000250'): + species[sid].set_p_degrad_rid(self.id) + break + # protein interconversion reactions + if self.sboterm == 'SBO:0000182': + for sid in self.reactants: + if (species[sid].sboterm == 'SBO:0000252') or (species[sid].sboterm == 'SBO:0000250'): + species[sid].set_p_conversion_rid(self.id) + break + for sid in self.products: + if (species[sid].sboterm == 'SBO:0000252') or (species[sid].sboterm == 'SBO:0000250'): + species[sid].set_p_conversion_rid(self.id) + break + + # for protein synthesis reactions identify the enzyme being produced from reaction.products + # to species being consumed add the enzyme product with corresponding stoichiometry + # to species being produced add the enzyme product with corresponding stoichiometry, unless + # the species is itself the enzyme product, in which case we add the ps reaction producing it. + if self.sboterm == 'SBO:0000589': + enz_sid = '' + for sid in self.products: + if (species[sid].sboterm == 'SBO:0000252') or (species[sid].sboterm == 'SBO:0000250'): + enz_sid = sid + self.ps_product = enz_sid + species[enz_sid].set_ps_rid(self.id) + break + for sid, val in self.products.items(): + if sid != enz_sid: + species[sid].add_ps_reaction(enz_sid, val) + for sid, val in self.reactants.items(): + species[sid].add_ps_reaction(enz_sid, -val) diff --git a/xbanalysis/model/xba_species.py b/xbanalysis/model/xba_species.py new file mode 100644 index 0000000000000000000000000000000000000000..e18fdc1e0cf0791cbce63225877c5fd4e878f36d --- /dev/null +++ b/xbanalysis/model/xba_species.py @@ -0,0 +1,40 @@ +"""Implementation of XBAspecies class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +import sbmlxdf + + +class XbaSpecies: + + def __init__(self, s_species): + self.id = s_species.name + self.name = s_species.get('name', '') + self.sboterm = s_species['sboterm'] + self.compartment = s_species['compartment'] + self.initial_conc = s_species['initialConcentration'] + self.constant = s_species['constant'] + self.units = s_species['substanceUnits'] + attrs = sbmlxdf.misc.extract_xml_attrs(s_species['xmlAnnotation'], token='molecule') + self.weight = float(attrs['weight_Da']) + self.m_reactions = {} + self.ps_reactions = {} + self.ps_rid = None + self.p_degrad_rid = None + self.p_convert_rid = None + + def add_m_reaction(self, rid, stoic): + self.m_reactions[rid] = stoic + + def add_ps_reaction(self, rid, stoic): + self.ps_reactions[rid] = stoic + + def set_ps_rid(self, rid): + self.ps_rid = rid + + def set_p_degrad_rid(self, rid): + self.p_degrad_rid = rid + + def set_p_conversion_rid(self, rid): + self.p_convert_rid = rid diff --git a/xbanalysis/problems/__init__.py b/xbanalysis/problems/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..f9936f1c2aeb2e22f55cef9ae6f0136456b6ecdc --- /dev/null +++ b/xbanalysis/problems/__init__.py @@ -0,0 +1,5 @@ +"""Subpackage with XBA model classes """ + +from .gba_problem import GbaProblem + +__all__ = ['GbaProblem'] diff --git a/xbanalysis/problems/gba_problem.py b/xbanalysis/problems/gba_problem.py new file mode 100644 index 0000000000000000000000000000000000000000..0488eae2e111152a011d2ac0acef55eba2183fc2 --- /dev/null +++ b/xbanalysis/problems/gba_problem.py @@ -0,0 +1,328 @@ +"""Implementation of XBAmodel class. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +import re +import numpy as np +import sympy +import types + + +class GbaProblem: + + def __init__(self, xba_model): + self.xba_model = xba_model + + # metabolites and enzymes in the model + metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247'] + enz_sids = [s.id for s in xba_model.species.values() if (s.sboterm == 'SBO:0000250' or + s.sboterm == 'SBO:0000252')] + + # optimization variables + self.var_metab_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is False] + self.var_enz_sids = [sid for sid in enz_sids if xba_model.species[sid].constant is False] + self.var_sids = self.var_metab_sids + self.var_enz_sids + self.var_n_metabs = len(self.var_metab_sids) + self.var_sid2idx = {sid: idx for idx, sid in enumerate(self.var_sids)} + self.var_initial = np.array([xba_model.species[sid].initial_conc for sid in self.var_sids]) + # self.var_lower_bounds = np.zeros(len(self.var_sids)) * 1 + # self.var_upper_bounds = np.ones(len(self.var_sids)) * 0.1 + + # constant concentrations + self.const_metab_sids = [sid for sid in metab_sids if xba_model.species[sid].constant is True] + self.const_sid2idx = {sid: idx for idx, sid in enumerate(self.const_metab_sids)} + self.const_enz_sids = [sid for sid in enz_sids if xba_model.species[sid].constant is True] + + self.all_enz_sids = self.var_enz_sids + self.const_enz_sids + + # reactions - yet to implement, accept child SBO terms, e.g. here of 00167 + self.mr_rids = [r.id for r in xba_model.reactions.values() + if ((r.sboterm == 'SBO:0000167') or + (r.sboterm == 'SBO:0000179') or + (r.sboterm == 'SBO:0000182'))] + self.ps_rids = [r.id for r in xba_model.reactions.values() if r.sboterm == 'SBO:0000589'] + spont_rids = [rid for rid in self.mr_rids if xba_model.reactions[rid].enzyme == ''] + + # calculate stoichiometric for mass balance equality constraints + s_mat = self.get_stoic_matrix(self.var_metab_sids, self.mr_rids) + var_sids_vol = np.array([self.xba_model.compartments[self.xba_model.species[sid].compartment].size + for sid in self.var_metab_sids]) + s_scaled_mat = s_mat/var_sids_vol.reshape(-1, 1) + + all_enz_ps_rids = [xba_model.species[sid].ps_rid for sid in self.all_enz_sids] + p_mat = self.get_stoic_matrix(self.var_metab_sids, all_enz_ps_rids) + all_enz_vol = np.array([self.xba_model.compartments[self.xba_model.species[sid].compartment].size + for sid in self.all_enz_sids]) + vol_scale_p_mat = np.outer(1.0/var_sids_vol, all_enz_vol) + p_scaled_mat = p_mat * vol_scale_p_mat + + # model parameters + self.model_params = {pid: p.value for pid, p in xba_model.parameters.items()} + self.model_params.update({cid: c.size for cid, c in xba_model.compartments.items()}) + self.model_params['macro'] = self.model_params.get('macro', 0.0) > 0.0 + self.model_params['ext_conc'] = np.array([xba_model.species[sid].initial_conc + for sid in self.const_metab_sids]) + self.model_params['const_enz_conc'] = np.array([xba_model.species[sid].initial_conc + for sid in self.const_enz_sids]) + self.model_params['S_scaled_mat'] = s_scaled_mat + self.model_params['P_scaled_mat'] = p_scaled_mat + # self.model_params['P_scaled_csr'] = scipy.sparse.csr_matrix(p_mat * vol_scale_p_mat) + self.model_params['np'] = np + self.model_params['weights'] = np.array([xba_model.species[sid].weight for sid in self.var_sids]) + self.model_params['const_enz_weights'] = np.array([xba_model.species[sid].weight + for sid in self.const_enz_sids]) + self.model_params['weights'] = np.array([xba_model.species[sid].weight for sid in self.var_sids]) + + # create functions + self.mrs, self.mrs_jac = self.get_reactions(self.mr_rids) + self.spontrs, self.spontrs_jac = self.get_reactions(spont_rids) + self.pss, self.pss_jac = self.get_reactions(self.ps_rids) + self.inv_ps_rates, self.inv_ps_rates_jac = self.get_inv_ps_rates() + # self.gammas, self.gammas_jac = self.get_gammas() + self.p_degrads, self.p_degrads_jac = self.get_p_degrad_rates() + self.enz_conv_mb, self.enz_conv_mb_jac = self.get_enz_interconv_mb() + + def get_stoic_matrix(self, sids, rids): + # matrix for free metabolites and list of reactions + stoic_mat = np.zeros((len(sids), len(rids))) + map_metab_row = {sid: idx for idx, sid in enumerate(sids)} + for col, rid in enumerate(rids): + for sid, stoic in self.xba_model.reactions[rid].reactants.items(): + if sid in map_metab_row: + stoic_mat[map_metab_row[sid], col] = -stoic + for sid, stoic in self.xba_model.reactions[rid].products.items(): + if sid in map_metab_row: + stoic_mat[map_metab_row[sid], col] = stoic + return stoic_mat + + def update_model_params(self, key, val): + self.model_params[key] = val + + def _to_vectors(self, rs_str): + # replace free and constant metabolites by vector elements + rs_v_str = [] + for r_v_str in rs_str: + for idx, sid in enumerate(self.var_sids): + r_v_str = re.sub(r'\b' + sid + r'\b', f'x[{idx}]', r_v_str) + for idx, sid in enumerate(self.const_metab_sids): + r_v_str = re.sub(r'\b' + sid + r'\b', f'ext_conc[{idx}]', r_v_str) + for idx, sid in enumerate(self.const_enz_sids): + r_v_str = re.sub(r'\b' + sid + r'\b', f'const_enz_conc[{idx}]', r_v_str) + rs_v_str.append(r_v_str) + return rs_v_str + + def _gradients(self, rs_v_str): + # generate derivatives + x = sympy.IndexedBase('x') + ext_conc = sympy.IndexedBase('ext_conc') + const_enz_conc = sympy.IndexedBase('const_enz_conc') + grads_str = [] + for i, r_v_str in enumerate(rs_v_str): + func_sym = sympy.sympify(r_v_str, locals={'x': x, + 'ext_conc': ext_conc, + 'const_enz_conc': const_enz_conc}) + partial_der = [str(func_sym.diff(x[idx])) for idx in range(len(self.var_sids))] + grads_str.append('[' + ', '.join(partial_der) + ']') + # progress_bar(i, len(rs_v_str)) + jac = compile('def jac(x): return np.array([' + ', '.join(grads_str) + '])', + '<string>', 'exec') + return jac + + def get_reactions(self, rids): + kinetics_str = [self.xba_model.reactions[rid].expanded_kl for rid in rids] + v_str = self._to_vectors(kinetics_str) + func_code = compile('def reactions(x): return np.array([' + ', '.join(v_str) + '])', + '<string>', 'exec') + rs = types.FunctionType(func_code.co_consts[0], self.model_params) + rs_jac = types.FunctionType(self._gradients(v_str).co_consts[0], self.model_params) + return rs, rs_jac + + def get_inv_ps_rates(self): + # inverse of max protein synthesis rates as function of variable enzymes and constant enzymes + ps_inv_rs_str = [] + for sid in self.all_enz_sids: + sp = self.xba_model.species[sid] + ps_rid = sp.ps_rid + ps_inv_rs_str.append(f'{sp.compartment}/({self.xba_model.reactions[ps_rid].expanded_kl})') + v_str = self._to_vectors(ps_inv_rs_str) + func_code = compile('def ps_inv(x): return np.array([' + ', '.join(v_str) + '])', + '<string>', 'exec') + inv_ps_rates = types.FunctionType(func_code.co_consts[0], self.model_params) + inv_ps_rates_jac = types.FunctionType(self._gradients(v_str).co_consts[0], self.model_params) + return inv_ps_rates, inv_ps_rates_jac + + def get_p_degrad_rates(self): + # protein degradation rates + degrads_str = [] + for sid in self.all_enz_sids: + sp = self.xba_model.species[sid] + if sp.p_degrad_rid is not None: + degrads_str.append(f'({self.xba_model.reactions[sp.p_degrad_rid].expanded_kl})/' + + f'{sp.compartment}') + else: + degrads_str.append('0') + v_str = self._to_vectors(degrads_str) + func_code = compile('def p_degrads(x): return np.array([' + ', '.join(v_str) + '])', + '<string>', 'exec') + degrads = types.FunctionType(func_code.co_consts[0], self.model_params) + degrads_jac = types.FunctionType(self._gradients(v_str).co_consts[0], self.model_params) + return degrads, degrads_jac + + def get_enz_interconv_mb(self): + # mass balance for interconvering enzymes + # identify enzymes in ExMr submatrix (enzymes x metabolic reactions) + # identify sets of enzymes that interconvert (consider unique sets) + enz_in_mrs_sids = [] + for enz_sid in self.all_enz_sids: + sp = self.xba_model.species[enz_sid] + partner_enz_sids = {enz_sid} + for mr_rid, enz_stoic in sp.m_reactions.items(): + rid = self.xba_model.reactions[mr_rid] + if enz_stoic < 0: + for other_sid, other_stoic in rid.products.items(): + sp_other = self.xba_model.species[other_sid] + if (sp_other.sboterm == 'SBO:0000250') or (sp_other.sboterm == 'SBO:0000252'): + partner_enz_sids.add(other_sid) + if enz_stoic > 0: + for other_sid, other_stoic in rid.reactants.items(): + sp_other = self.xba_model.species[other_sid] + if (sp_other.sboterm == 'SBO:0000250') or (sp_other.sboterm == 'SBO:0000252'): + partner_enz_sids.add(other_sid) + # look for enzyme interconversion + if len(partner_enz_sids) > 1: + # consider unique interconversion sets + if partner_enz_sids not in enz_in_mrs_sids: + enz_in_mrs_sids.append(partner_enz_sids) + + # for each set interconverting enzymes create a mass balance equation + conversions_str = [] + for partner_enz_sids in enz_in_mrs_sids: + enz_sid = partner_enz_sids.pop() + sp = self.xba_model.species[enz_sid] + parts_str = [] + for mr_rid, enz_stoic in sp.m_reactions.items(): + parts_str.append((f'({enz_stoic} * {self.xba_model.reactions[mr_rid].expanded_kl})/' + + f'{sp.compartment}')) + conversions_str.append(' + '.join(parts_str)) + + v_str = self._to_vectors(conversions_str) + func_code = compile('def enz_conv_mb(x): return np.array([' + ', '.join(v_str) + '])', + '<string>', 'exec') + enz_conv_mb = types.FunctionType(func_code.co_consts[0], self.model_params) + enz_conv_mb_jac = types.FunctionType(self._gradients(v_str).co_consts[0], self.model_params) + return enz_conv_mb, enz_conv_mb_jac + + def get_gammas(self): + # ratios between protein degradation and max protein synthesis rates + gammas_str = [] + for sid in self.all_enz_sids: + sp = self.xba_model.species[sid] + if sp.p_degrad_rid is not None: + degrad_rid = sp.p_degrad_rid + ps_rid = sp.ps_rid + gammas_str.append(f'({self.xba_model.reactions[degrad_rid].expanded_kl})/' + + f'({self.xba_model.reactions[ps_rid].expanded_kl})') + else: + gammas_str.append('0') + v_str = self._to_vectors(gammas_str) + func_code = compile('def gammas(x): return np.array([' + ', '.join(v_str) + '])', + '<string>', 'exec') + gammas = types.FunctionType(func_code.co_consts[0], self.model_params) + gammas_jac = types.FunctionType(self._gradients(v_str).co_consts[0], + self.model_params) + return gammas, gammas_jac + + def get_growth_rate(self, x): + # gr = gr0 * (1 - sum(gammas)) + # gr0 = 1/tau + # tau = sum(ce/psme) = sum(ce * 1/psme) + ce_all = np.append(x[self.var_n_metabs:], self.model_params['const_enz_conc']) + tau = self.inv_ps_rates(x).dot(ce_all) + gr0 = 1.0/tau + # gr = gr0 * (1.0 - sum(gammas(x))) + gr = gr0 * (1.0 - sum(self.p_degrads(x) * self.inv_ps_rates(x))) + return gr + + def get_growth_rate_grad(self, x): + # gr0_grad = d(1/tau)dx = (-dtau/dx)/tau**2 = - dtau/dx * gr0**2 + # gr_grad = gr0_grad * (1 - sum(gammas)) - gr0 * sum(gammas_jac) + ce_all = np.append(x[self.var_n_metabs:], self.model_params['const_enz_conc']) + tau = self.inv_ps_rates(x).dot(ce_all) + gr0 = 1.0/tau + dtau_dx = ce_all.dot(self.inv_ps_rates_jac(x)) + dtau_dx[self.var_n_metabs:] += self.inv_ps_rates(x)[:len(self.var_enz_sids)] + gr0_grad = -dtau_dx * gr0**2 + deg_term = (1.0 - sum(self.p_degrads(x) * self.inv_ps_rates(x))) + deg_term_jac = (self.p_degrads(x).reshape(-1, 1) * self.inv_ps_rates_jac(x) + + self.inv_ps_rates(x).reshape(-1, 1) + * self.p_degrads_jac(x)).sum(axis=0) + gr_grad = gr0_grad * deg_term - gr0 * deg_term_jac + return np.nan_to_num(gr_grad, nan=0.0) + + def get_alphas(self, x): + # alpha_e = gr * c_e/psm_e + gamma_e + ce_all = np.append(x[self.var_n_metabs:], self.model_params['const_enz_conc']) + alpha0 = self.get_growth_rate(x) * self.inv_ps_rates(x)*ce_all + deg_term = self.p_degrads(x) * self.inv_ps_rates(x) + alphas = alpha0 + deg_term + return alphas + + def heq_mass_balance(self, x): + # considering syntesis costs for protein degradation + # mass balance for species involved in metabolic reactions: + # dMb/dt: S.dot(mrs) + deg_term + gr*(P.dot(ce) - cm) = 0 + # deg_term: P.dot(pdegs) + # assuming protein degradation functions (pdegs) have soichiomtry of -1 for enzyme + cm_var = x[:self.var_n_metabs] + ce_all = np.append(x[self.var_n_metabs:], self.model_params['const_enz_conc']) + degrad_costs = self.model_params['P_scaled_mat'].dot(self.p_degrads(x)) + metabolism = self.model_params['S_scaled_mat'].dot(self.mrs(x)) + pem = self.model_params['P_scaled_mat'].dot(ce_all) - cm_var + dilution = self.get_growth_rate(x) * pem + metabs_mb = metabolism + dilution + degrad_costs + return metabs_mb + + def heq_mass_balance_jac(self, x): + cm_var = x[:self.var_n_metabs] + ce_all = np.append(x[self.var_n_metabs:], self.model_params['const_enz_conc']) + + metabolism_jac = self.model_params['S_scaled_mat'].dot(self.mrs_jac(x)) + degrad_costs_jac = self.model_params['P_scaled_mat'].dot(self.p_degrads_jac(x)) + + # jacobian of dilution term: d/dx gr * (P.dot(ce) - cm) + # jac = gr_jac * (P.dot(ce) - cm) + gr * (jac(P.dot(ce)) - jac(cm)) + pem = self.model_params['P_scaled_mat'].dot(ce_all) - cm_var + # construct jacobian of dilution term (P.dot(ce) - cm): + # d/dx (P.dot(ce) - cm) = d(P.dot(ce) - cm)/dx = P.dot(dce/dx) - dcm/dx + cm_jac = -np.eye(self.var_n_metabs) + pe_jac = self.model_params['P_scaled_mat'][:, :len(self.var_enz_sids)] + pem_jac = np.hstack([cm_jac, pe_jac]) + dilution_jac = np.outer(pem, self.get_growth_rate_grad(x)) + self.get_growth_rate(x) * pem_jac + metabs_mb_jac = metabolism_jac + dilution_jac + degrad_costs_jac + return np.nan_to_num(metabs_mb_jac, nan=0.0) + + def heq_enz_conv_mb(self, x): + return self.enz_conv_mb(x) + + def heq_enz_conv_mb_jac(self, x): + return self.enz_conv_mb_jac(x) + + def heq_density(self, x): + ce_var = x[self.var_n_metabs:] + if self.model_params.get('macro', True) is True: + density = self.model_params['weights'][self.var_n_metabs:].dot(ce_var) + else: + density = self.model_params['weights'].dot(x) + density += self.model_params['const_enz_weights'].dot( + self.model_params['const_enz_conc']) + return self.model_params['rho'] - density + + def heq_density_jac(self, x): + if self.model_params.get('macro', True) is True: + density_grad = np.zeros(len(self.model_params['weights'])) + density_grad[self.var_n_metabs:] = \ + -self.model_params['weights'][self.var_n_metabs:] + else: + density_grad = -self.model_params['weights'] + return density_grad diff --git a/xbanalysis/utils/__init__.py b/xbanalysis/utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..d75f80f03a7b3ed85affe5ffa3d77154d63250ee --- /dev/null +++ b/xbanalysis/utils/__init__.py @@ -0,0 +1,6 @@ +"""Subpackage with utility functions """ + +#rom .xba_model import XBAmodel + +__all__ = [] + diff --git a/xbanalysis/utils/utils.py b/xbanalysis/utils/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..55258d545f5f57a735c1d225b3575456ddcbc957 --- /dev/null +++ b/xbanalysis/utils/utils.py @@ -0,0 +1,158 @@ +"""Implementation of utility functions. + +Peter Schubert, HHU Duesseldorf, May 2022 +""" + +import sys +import re + + +def get_sbml_ids(df, sbo_id): + """Get SBML IDs with specified sboterm-id. + + if sbotermid does not exist, return all IDs + + :param df: SBML component from which to extract IDs + :type df: pandas DataFrame (component from sbmlxdf.to_df()) + :param sbo_id: value of sboterm to be queried, e.g. 260 + :type sbo_id: int + :returns: component ids + :rtype: list of str + """ + if 'sboterm' in df: + ids = (df[df['sboterm'] == 'SBO:{:07d}'.format(sbo_id)]).index.to_list() + else: + ids = df.index.to_list() + return ids + + +def extract_params(record): + """Extract parameters from a record. + + A single record consists of comma separated key-value pairs. + Example: 'key1=val1, key2=val2, ...' is converted to + {key1: val1, key2: val2, ...} + + :param record: key '=' value pairs separated by "," + :type record: str + :returns: key-values pairs extracted from record + :rtype: dict + """ + params = {} + for kv_pair in record_generator(record, sep=','): + if '=' in kv_pair: + k, v = kv_pair.split('=') + params[k.strip()] = v.strip() + return params + + +def sympy_protect(name): + """Modify variable names that conflict with sympy. + + see: https://docs.sympy.org/latest/gotchas.html#variables + + :param name: variable name + :type name: str + :returns: variable name not conflicting with sympy + :rtype: str + """ + if name in {'I', 'E', 'S', 'N', 'C', 'O', 'Q'}: + name += '_sympy_protect' + return name + + +def record_generator(records_str, sep=';'): + """Generator to extract individual records from a string of records. + + :param records_str: containing records separated by sep + :type records_str: str + :param sep: seperator used to separate records + :type sep: str (default: ';') + :returns: key-values pairs extracted from record + :rtype: dict + """ + if type(records_str) == str: + for record in records_str.split(sep): + if len(record.strip()) > 0: + yield record.strip() + + +def get_local_params(reaction): + """Retrieve local parameters of a reaction and their values. + + :param reaction: reaction record from SBML reactions component + :type reaction: pandas Series + :returns: parameter id with parameter type, sympy_id and parameter value + :rtype: dict (key: parameter id, val: value, units, sympy-portected id + """ + local_params = {} + for lp in record_generator(reaction['localParams']): + params = extract_params(lp) + local_params[params['id']] = float(params['value']) + return local_params + + +# added +def get_species_stoic(srefs): + """Retrieve species and stoichiometry from reaction srefs. + + :param srefs: species reference records + :type srefs: string + :returns: species id with corresponding stoichiometry + :rtype: dict (key: species id, val: stoichiometry (1.0 by default, e.g. for modifiers) + """ + species = {} + for sref in record_generator(srefs): + params = extract_params(sref) + species[params['species']] = float(params.get('stoic', '1.0')) + return species + + +def get_species_refs(reaction): + """Retrieve species references used in a reaction with their type. + + :param reaction: reaction record from SBML reactions component + :type reaction: pandas Series + :returns: species id with concentration set to 1.0 (used for enzymes) + :rtype: dict (key: species id, val: 1.0, type, sympy-portected id + """ + species_refs = {} + for sref_type in ['reactants', 'products', 'modifiers']: + if sref_type in reaction: + for sref in record_generator(reaction[sref_type]): + params = extract_params(sref) + sid = params['species'] + species_refs[sid] = [1.0, sref_type, sympy_protect(sid)] + return species_refs + + +# updated +def expand_kineticlaw(rate, fdefs): + """Expand a kinetic law by inlining lambda functions + + :param rate: kinetic law + :type rate: str + :param fdefs: function definitions used in the model + :type fdefs: dict of Functions + :returns: updated kinetic law with lambda function inlined + :rtype: str + """ + for fid, fdef in fdefs.items(): + m = re.search(r'.*\b' + fid + r'\((.*)\)', rate) + if m: + calling_params = [kw.strip() for kw in m.group(1).split(',')] + lambda_def, lambda_params = fdef.get_function() + for i in range(len(lambda_params)): + lambda_def = re.sub(r'\b' + lambda_params[i] + r'\b', calling_params[i], lambda_def) + prefix_pos = re.search(fid, rate).start(0) + rate = (rate[:prefix_pos] + ' (' + lambda_def + rate[m.end(1):]) + return rate + + +# display progress bar +def progress_bar(step, steps): + sys.stdout.write('\r') + progress = (step+1)/steps + bar = int(progress*20) + sys.stdout.write('[{}{}] {:3.0f}%'.format('=' * bar, ' ' * (20 - bar), progress*100)) + sys.stdout.flush()