diff --git a/setup.py b/setup.py index fa9a43be13f533c5d79c8105852341927632c2ec..00f0331d86939ae2471503f4fc82d3a59c5d6b69 100755 --- a/setup.py +++ b/setup.py @@ -39,7 +39,8 @@ setup( 'numpy >= 1.20.0', 'sympy >= 1.9.0', 'scipy >= 1.7.0', - 'sbmlxdf>=0.2.5'], + 'swiglpk >= 5.0.3', + 'sbmlxdf >= 0.2.7'], python_requires=">=3.7", keywords=['modeling', 'standardization', 'SBML'], **setup_kwargs diff --git a/xbanalysis/_version.py b/xbanalysis/_version.py index 8c2bc5ba96bd3e6284170b1f64898e2a465c39bb..8003eb4f7157ca8726c8a5872143949575ec4537 100644 --- a/xbanalysis/_version.py +++ b/xbanalysis/_version.py @@ -1,5 +1,5 @@ """ Definition of version string. """ -__version__ = "0.3.2" +__version__ = "0.3.3" program_name = 'xbanalysis' diff --git a/xbanalysis/model/fbc_objective.py b/xbanalysis/model/fbc_objective.py new file mode 100644 index 0000000000000000000000000000000000000000..4db5d45c8f4918c9ce01f69dd7b053ac60c57bc4 --- /dev/null +++ b/xbanalysis/model/fbc_objective.py @@ -0,0 +1,19 @@ +"""Implementation of FbcObjective class. + +Peter Schubert, HHU Duesseldorf, June 2022 +""" + +import sbmlxdf + + +class FbcObjective: + + def __init__(self, s_fbc_objective): + self.id = s_fbc_objective.name + self.name = s_fbc_objective.get('name', '') + self.direction = s_fbc_objective['type'] + self.active = s_fbc_objective['active'] + self.coefficiants = {} + for reac_ref in sbmlxdf.record_generator(s_fbc_objective['fluxObjectives']): + params = sbmlxdf.extract_params(reac_ref) + self.coefficiants[params['reac']] = float(params['coef']) diff --git a/xbanalysis/model/xba_model.py b/xbanalysis/model/xba_model.py index c72928d636f154d2abdd3d34722af143524b9f38..b4f45b221e284d1dfb4faadfc35078e1f8b1e91e 100644 --- a/xbanalysis/model/xba_model.py +++ b/xbanalysis/model/xba_model.py @@ -5,6 +5,7 @@ Peter Schubert, HHU Duesseldorf, May 2022 import os import time +import numpy as np import sbmlxdf from .xba_compartment import XbaCompartment @@ -12,8 +13,10 @@ from .xba_function import XbaFunction from .xba_parameter import XbaParameter from .xba_reaction import XbaReaction from .xba_species import XbaSpecies +from .fbc_objective import FbcObjective +# TODO: implement isGBAmodel(), isFBAmodel(), isRBAmodel() class XbaModel: def __init__(self, sbml_file): @@ -26,13 +29,13 @@ class XbaModel: print(f'{sbml_file} seems not to be a valid SBML model') return model_dict = sbml_model.to_df() -# self.model_dict = model_dict ######### - can be removed self.compartments = {cid: XbaCompartment(row) for cid, row in model_dict['compartments'].iterrows()} self.parameters = {pid: XbaParameter(row) for pid, row in model_dict['parameters'].iterrows()} - self.functions = {fid: XbaFunction(row, ) - for fid, row in model_dict['funcDefs'].iterrows()} + if 'funcDefs' in model_dict: + self.functions = {fid: XbaFunction(row) + for fid, row in model_dict['funcDefs'].iterrows()} self.species = {sid: XbaSpecies(row) for sid, row in model_dict['species'].iterrows()} self.reactions = {rid: XbaReaction(row, self.functions, self.compartments) @@ -41,3 +44,28 @@ class XbaModel: for r in self.reactions.values(): r.set_enzyme(self.species) r.add_species_usage(self.species) + + if 'fbcObjectives' in model_dict: + self.fbc_objectives = {oid: FbcObjective(row) + for oid, row in model_dict['fbcObjectives'].iterrows()} + + def get_stoic_matrix(self, sids, rids): + """retrieve stoichiometric sub-matrix [sids x rids]. + + :param sids: species ids - rows of sub-matrix + :type sids: list of strings + :param rids: reaction ids - columns of sub-matrix + :type rids: list of strings + :returns: stoic_mat, stoichmetric sub-matrix + :rtype: np.ndarray + """ + stoic_mat = np.zeros((len(sids), len(rids))) + map_metab_row = {sid: idx for idx, sid in enumerate(sids)} + for col, rid in enumerate(rids): + for sid, stoic in self.reactions[rid].reactants.items(): + if sid in map_metab_row: + stoic_mat[map_metab_row[sid], col] = -stoic + for sid, stoic in self.reactions[rid].products.items(): + if sid in map_metab_row: + stoic_mat[map_metab_row[sid], col] = stoic + return stoic_mat diff --git a/xbanalysis/model/xba_reaction.py b/xbanalysis/model/xba_reaction.py index 94b3f966bbf8549e9eb88796e34052ac5d880844..517a1bfb2e327052ec0a2fcaa36dd0e998c0e467 100644 --- a/xbanalysis/model/xba_reaction.py +++ b/xbanalysis/model/xba_reaction.py @@ -19,6 +19,8 @@ class XbaReaction: self.products = get_species_stoic(s_reaction['products']) self.reactants = get_species_stoic(s_reaction['reactants']) self.modifiers = get_species_stoic(s_reaction['modifiers']) + self.fbc_lb = s_reaction.get('fbcLb', float('nan')) + self.fbc_ub = s_reaction.get('fbcUb', float('nan')) self.local_params = get_local_params(s_reaction) self.kinetic_law = s_reaction['kineticLaw'] if type(s_reaction['kineticLaw']) == str else '' # convert to numpy, replace local parameters with numerical values, inline lambda functions: diff --git a/xbanalysis/model/xba_species.py b/xbanalysis/model/xba_species.py index 436537012955b0e481588c15d52e226482d3a1ea..806fd081f3d6e9992ab62ad009abaf9983dbadb7 100644 --- a/xbanalysis/model/xba_species.py +++ b/xbanalysis/model/xba_species.py @@ -6,6 +6,10 @@ Peter Schubert, HHU Duesseldorf, May 2022 import sbmlxdf +xml_gba_ns = 'http://www.hhu.de/ccb/gba/ns' +xml_token = 'molecule' + + class XbaSpecies: def __init__(self, s_species): @@ -17,7 +21,7 @@ class XbaSpecies: self.constant = s_species['constant'] self.boundary = s_species['boundaryCondition'] self.units = s_species['substanceUnits'] - attrs = sbmlxdf.misc.extract_xml_attrs(s_species['xmlAnnotation'], token='molecule') + attrs = sbmlxdf.extract_xml_attrs(s_species['xmlAnnotation'], ns=xml_gba_ns, token=xml_token) self.mw = float(attrs['weight_Da']) self.ps_rid = None self.m_reactions = {} diff --git a/xbanalysis/problems/__init__.py b/xbanalysis/problems/__init__.py index f9936f1c2aeb2e22f55cef9ae6f0136456b6ecdc..c08153cfe41de038cf52979345978028a175fab7 100644 --- a/xbanalysis/problems/__init__.py +++ b/xbanalysis/problems/__init__.py @@ -1,5 +1,7 @@ """Subpackage with XBA model classes """ from .gba_problem import GbaProblem +from .fba_problem import FbaProblem +from .lp_problem import LpProblem -__all__ = ['GbaProblem'] +__all__ = ['GbaProblem', 'FbaProblem', 'LpProblem'] diff --git a/xbanalysis/problems/fba_problem.py b/xbanalysis/problems/fba_problem.py new file mode 100644 index 0000000000000000000000000000000000000000..ffa4f60290ff8b8d22e70d519c84061894c54d6f --- /dev/null +++ b/xbanalysis/problems/fba_problem.py @@ -0,0 +1,157 @@ +"""Implementation of FBA Problem class. + +Peter Schubert, HHU Duesseldorf, June 2022 +""" + +import numpy as np + +from xbanalysis.problems.lp_problem import LpProblem + + +class FbaProblem: + + def __init__(self, xba_model): + """Initialize FbaProblem from a xba_model. + + raise an exceptions, if fba parameters are not configured in the xba_model + + FBA mass balanced constraints: S * v = 0 + - rows of stoichiometric matrix S: + - all metabolites (incl. boundary/constant metabolites), + - i.e. excluding enzymes + - columns of stoichiometric matrix S: + - all reactions not containing enzymes as reactants or products + - i.e. exlcluding protein synthesis, protein degradation, enzyme transitions + - reversible reactions are not getting split into forward and backward reaction (for now) + + FBA flux bounds: + - lower and upper bounds for reactions extracted from xba_model + - getter and setter methods + + FBA objective: c.T * v: + - extracted from xba_model (fbcObjective), including direction + - getter and setter methods + + :param xba_model: xba model with all required FBA parameters + :type xba_model: XbaModel + """ + self.xba_model = xba_model + + self.metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247'] + metabs = set(self.metab_sids) + self.mrids = [rid for rid, rxn in xba_model.reactions.items() + if len((set(rxn.products) | set(rxn.reactants)) - metabs) == 0] + self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mrids)} + + self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mrids) + + self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mrids], + [xba_model.reactions[rid].fbc_ub for rid in self.mrids]]) + + self.obj_coefs = np.zeros(len(self.mrids)) + for oid, fbc_obj in xba_model.fbc_objectives.items(): + if fbc_obj.active is True: + self.obj_id = fbc_obj.id + self.obj_dir = fbc_obj.direction + self.set_obj_coefs(fbc_obj.coefficiants) + break + + def fba_optimize(self): + """FBA optimization of current problem + + Instantiate an LpProblem, configure it and delete it again. + + :return: results information from the optimization + :rtype: dict + """ + lp = LpProblem() + lp.configure(self.obj_coefs, self.flux_bounds, + self.s_mat, np.zeros((2, self.s_mat.shape[0])), self.obj_dir) + res = lp.solve() + del lp + return res + + def pfba_optimize(self, fract_of_optimum=1.0): + """ pfba analysis + + As we are interested in minimizing the absolute value of fluxes, + we split fluxes in forward and reverse fluxes to make them all positive + + :param fract_of_optimum: value between 0.0 and 1.0. + :type fract_of_optimum: float (default 1.0) + :return: + """ + # create and solve the FBA problem with the configured objective function + # for pFBA we need to split fluxes in forward and backward fluxes, with bounds set as such + # that fluxes can only be positive, lb>=0, ub>=0 + + n_cols = len(self.mrids) + lp = LpProblem() + + fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat)) + fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs)) + fwd_bwd_bounds = np.hstack((np.fmax(self.flux_bounds, 0), + np.vstack((np.fmax(-self.flux_bounds[1], 0), + np.fmax(-self.flux_bounds[0], 0))))) + lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bounds, + fwd_bwd_s_mat, np.zeros((2, fwd_bwd_s_mat.shape[0])), self.obj_dir) + res = lp.solve(short_result=True) + # fix FBA objective as constraint with lower bound and minimize total fluxes + if res['simplex_status'] == 0: + row_bds = np.array([fract_of_optimum * res['fun'], np.nan]) + lp.add_constraint(fwd_bwd_obj_coefs, row_bds) + lp.set_obj_coefs(np.ones(2 * n_cols)) + lp.set_obj_dir('minimize') + res = lp.solve() + del lp + res['x'] = res['x'][:n_cols] - res['x'][n_cols:] + res['reduced_costs'] = res['reduced_costs'][:n_cols] - res['reduced_costs'][n_cols:] + return res + + @property + def obj_dir(self): + return self.__obj_dir + + @obj_dir.setter + def obj_dir(self, dir_string): + if 'min' in dir_string: + self.__obj_dir = 'minimize' + else: + self.__obj_dir = 'maximize' + + def get_flux_bounds(self): + """retrieve flux bounds for all reactions in a dict. + + :returns: flux bounds configured in the FBA problem + :rtype: dict (key: reaction id, value: list with two float values for lower/upper bound) + """ + return {mrid: [bds[0], bds[1]] for mrid, bds in zip(self.mrids, self.flux_bounds.T)} + + def set_flux_bounds(self, flux_bounds): + """selectivly set lower and upper flux bounds for given reactions. + + :param flux_bounds: new lower and upper flux bounds selected reactions + :type flux_bounds: dict (key: reaction id, value: list with two float values for lower/upper bound) + """ + for rid, (lb, ub) in flux_bounds.items(): + self.flux_bounds[0, self.mrid2idx[rid]] = lb + self.flux_bounds[1, self.mrid2idx[rid]] = ub + + def get_obj_coefs(self): + """retrieve the coefficients of the FBA objective function. + + :returns: objective coefficients + :rtype: dict (key: reaction id, value: float) + """ + return {rid: self.obj_coefs[idx] for rid, idx in self.mrid2idx.items() + if self.obj_coefs[idx] != 0.0} + + def set_obj_coefs(self, coefficiants): + """set the coefficients of the FBA objective function. + + :param coefficiants: objective coefficients of selected reactions + :type coefficiants: dict (key: reaction id, value: float) + """ + self.obj_coefs = np.zeros(len(self.mrids)) + for rid, coef in coefficiants.items(): + self.obj_coefs[self.mrid2idx[rid]] = coef diff --git a/xbanalysis/problems/gba_problem.py b/xbanalysis/problems/gba_problem.py index 966126cd2c6d3699a58a21506d607f3ed6baa6ef..9924747c0542675e60af42d612e73bf3d7deabd0 100644 --- a/xbanalysis/problems/gba_problem.py +++ b/xbanalysis/problems/gba_problem.py @@ -24,11 +24,8 @@ def cache_last(func): wrapper.__name__ = func.__name__ return wrapper -# TODO: cache last results from rate functions, Jacobians and Hessians # TODO: call get_reaction_rates from solver ? # TODO: split get_reaction_rates, get_reaction_jacs, get_reaction_hesses -# TODO: number of mb equations, enzyme transitions density constraints -# number of constraint from len(var_metab_sids), subenz_x_trans.shape[0], 1 class GbaProblem: @@ -78,11 +75,11 @@ class GbaProblem: # calculate stoichiometric sub-matrices and scale them according to volume # TODO: sparce matrix storage and calculations - specis_x_mrs = self.get_stoic_matrix(self.var_sids, self.mr_rids) - self.dof = specis_x_mrs.shape[1] - np.linalg.matrix_rank(specis_x_mrs) - metab_x_mrs = self.get_stoic_matrix(self.var_metab_sids, self.mr_rids) - self.metab_x_psrs = self.get_stoic_matrix(self.var_metab_sids, self.ps_rids) - self.enz_x_mrs = self.get_stoic_matrix(self.var_enz_sids, self.mr_rids) + species_x_mrs = xba_model.get_stoic_matrix(self.var_sids, self.mr_rids) + self.dof = species_x_mrs.shape[1] - np.linalg.matrix_rank(species_x_mrs) + metab_x_mrs = xba_model.get_stoic_matrix(self.var_metab_sids, self.mr_rids) + self.metab_x_psrs = xba_model.get_stoic_matrix(self.var_metab_sids, self.ps_rids) + self.enz_x_mrs = xba_model.get_stoic_matrix(self.var_enz_sids, self.mr_rids) self.enz_vols = self.var_vols[self.var_mask_enz] metab_vols = self.var_vols[self.var_mask_metab] @@ -120,7 +117,6 @@ class GbaProblem: 'heq_enz_trans': self.subenz_x_trans.shape[0], 'heq_density': 1} - # TODO implement a factory method @cache_last def mras(self, x): return self.fmras(x) @@ -169,27 +165,6 @@ class GbaProblem: def enz_tras_hesss(self, x): return self.fenz_tras_hesss(x) - def get_stoic_matrix(self, sids, rids): - """retrieve stoichiometric sub-matrix [sids x rids]. - - :param sids: species ids - rows of sub-matrix - :type sids: list of strings - :param rids: reaction ids - columns of sub-matrix - :type rids: list of strings - :returns: stoic_mat, stoichmetric sub-matrix - :rtype: np.ndarray - """ - stoic_mat = np.zeros((len(sids), len(rids))) - map_metab_row = {sid: idx for idx, sid in enumerate(sids)} - for col, rid in enumerate(rids): - for sid, stoic in self.xba_model.reactions[rid].reactants.items(): - if sid in map_metab_row: - stoic_mat[map_metab_row[sid], col] = -stoic - for sid, stoic in self.xba_model.reactions[rid].products.items(): - if sid in map_metab_row: - stoic_mat[map_metab_row[sid], col] = stoic - return stoic_mat - def update_model_params(self, key, val): self.model_params[key] = val diff --git a/xbanalysis/problems/lp_problem.py b/xbanalysis/problems/lp_problem.py new file mode 100644 index 0000000000000000000000000000000000000000..90b6dea2eac0d550a963b27ab7663933e3676a38 --- /dev/null +++ b/xbanalysis/problems/lp_problem.py @@ -0,0 +1,289 @@ +"""Implementation of LpProblem (linear programming problem) class. + +built upon swiglpk + +Peter Schubert, HHU Duesseldorf, June 2022 +""" + +import numpy as np +import scipy +import scipy.sparse +import swiglpk as glpk + + +# status reported +simplex_status = { + 0: 'LP problem instance successfully solved', + 1: 'Unable to start search, initial basis specified in the ' + 'problem object is invalid. Number basic variables is not same as the number rows.', + 2: 'Unable to start search, basis matrix corresponding to initial basis is singular.', + 3: 'Unable to start search, basis matrix corresponding to initial basis is ill-conditioned.', + 4: 'Unable to start search, some double-bounded variables have incorrect bounds.', + 5: 'Search prematurely terminated: solver failure.', + 6: 'Search prematurely terminated: objective function being ' + 'maximized reached its lower limit and continues decreasing.', + 7: 'Search prematurely terminated: objective function being ' + 'minimized reached its upper limit and continues increasing.', + 8: 'Search prematurely terminated: simplex iteration limit exceeded.', + 9: 'Search prematurely terminated: time limit exceeded.', + 10: 'LP problem instance has no primal feasible solution.', + 11: 'LP problem instance has no dual feasible solution.'} + +generic_status = {1: 'solution is undefined', + 2: 'solution is feasible', + 3: 'solution is infeasible', + 4: 'problem has no feasible solution', + 5: 'solution is optimal', + 6: 'problem has unbounded solution'} + +dual_status = {1: 'dual solution is undefined', + 2: 'dual solution is feasible', + 3: 'dual solution is infeasible', + 4: 'no dual feasible solution exists'} + +prim_status = {1: 'primal solution is undefined', + 2: 'primal solution is feasible', + 3: 'primal solution is infeasible', + 4: 'no primal feasible solution exists'} + +var_status = {1: 'basic variable', + 2: 'non-basic variable on its lower bound', + 3: 'non-basic variable on its upper bound', + 4: 'non-basic free (unbounded) variable', + 5: 'non-basic fixed variable'} + +scaling_options = {'GM': glpk.GLP_SF_GM, + 'EQ': glpk.GLP_SF_EQ, + '2N': glpk.GLP_SF_2N, + 'SKIP': glpk.GLP_SF_SKIP, + 'AUTO': glpk.GLP_SF_AUTO} + + +class LpProblem: + + def __init__(self): + """create a glpk problem. + + Problem needs to be properly deleted usin del(LpProblem) + """ + self.lp = glpk.glp_create_prob() + self.ncols = 0 + self.nrows = 0 + self.simplex_result = None + self.res = {} + + @staticmethod + def _get_variable_type(bounds): + """Determine variable type for upper/lower bounds. + + :param bounds: upper and lower bounds + :type bounds: 1D ndarray of length 2 + :return var_type: glpk variable type to configure + :rtype: integer constant + :return lb: value of lower bound + :rtype float + :return ub: value of upper bound + :rtype float + """ + lb, ub = bounds + if np.isfinite(lb) and np.isfinite(ub): + if lb < ub: + var_type = glpk.GLP_DB + else: + ub = 0.0 + var_type = glpk.GLP_FX + elif not np.isfinite(lb): + lb = 0.0 + var_type = glpk.GLP_UP + elif not np.isfinite(ub): + ub = 0.0 + var_type = glpk.GLP_LO + else: + lb = 0.0 + ub = 0.0 + var_type = glpk.GLP_FR + return var_type, lb, ub + + def configure(self, obj_coefs, col_bnds, constr_coefs, row_bnds, direction='maximize'): + """Configure the LP Problem. + + Note: glpk related indices start at 1 (not zero) + :param obj_coefs: coefficients of objective function + :type obj_coefs: 1D ndarray of shape (1, ncols) + :param col_bnds: lower and upper bounds of structural (col) variables + :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds) + :param constr_coefs: matrix with constraint coefficients + :type constr_coefs: 2D nparray with shape(nrows, ncols) + :param row_bnds: lower and upper bounds of auxhiliary (row) variables + :type row_bnds: 2D ndarray of shape(2, nrows) (1st row: lower bounds, 2nd row: upper bounds) + :param direction: direction for the optimization ('maximize' or 'minimize') + :type direction: string + """ + self.nrows, self.ncols = constr_coefs.shape + + assert len(obj_coefs) == self.ncols + assert col_bnds.shape == (2, self.ncols) + assert row_bnds.shape == (2, self.nrows) + + # configure objective function + glpk.glp_add_cols(self.lp, self.ncols) + self.set_obj_coefs(obj_coefs) + self.set_obj_dir(direction) + + # configure bounds on structural (columns) variables + for i, lb_ub in enumerate(col_bnds.T): + var_type, lb, ub = self._get_variable_type(lb_ub) + glpk.glp_set_col_bnds(self.lp, 1 + i, var_type, lb, ub) + + # configure constraints + glpk.glp_add_rows(self.lp, self.nrows) + for i, lb_ub in enumerate(row_bnds.T): + var_type, lb, ub = self._get_variable_type(lb_ub) + glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub) + constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs) + n_coefs = len(constr_coefs_sparse.row) + ia = glpk.intArray(1 + n_coefs) + ja = glpk.intArray(1 + n_coefs) + ar = glpk.doubleArray(1 + n_coefs) + for i in range(n_coefs): + ia[1 + i] = int(1 + constr_coefs_sparse.row[i]) + ja[1 + i] = int(1 + constr_coefs_sparse.col[i]) + ar[1 + i] = constr_coefs_sparse.data[i] + glpk.glp_load_matrix(self.lp, n_coefs, ia, ja, ar) + + def replace_constraint(self, row, constr_coefs, row_bnds): + # set/replace one bounded constraint + # + # Parameters: + # row: row id to be replaced + # coeffs: 1D np.array of floats of size number design variables + # coefficients != 0.0 will be configured + # bounds: 1D numpy.ndarray with float of size two + # lists and tuples will be converted to numpy.ndarray + # lower_bound, upper_bound, np.nan and np.inf supported + + assert row <= glpk.glp_get_num_rows(self.lp) + glpk.glp_set_row_bnds(self.lp, row, *self._get_variable_type(row_bnds)) + + n_coef = len(np.nonzero(constr_coefs)[0]) + ind = glpk.intArray(1 + n_coef) + val = glpk.doubleArray(1 + n_coef) + idx = 1 + for i in np.nonzero(constr_coefs)[0]: + ind[idx] = int(1 + i) + val[idx] = constr_coefs[i] + idx += 1 + glpk.glp_set_mat_row(self.lp, row, n_coef, ind, val) + + def add_constraint(self, constr_coefs, row_bnds): + """add a single constraint to the lp problem + + :param constr_coefs: constraint coefficients for the new row + :type constr_coefs: 1d ndarray of length ncols + :param row_bnds: upper and lower bounds of the auxiliary (row) variable + :type row_bnds: 1D ndarray of length 2 + :return: + """ + row = glpk.glp_add_rows(self.lp, 1) + self.replace_constraint(row, constr_coefs, row_bnds) + return row + + def del_constraint(self, row): + # delete one constraint + # + # Parameters: + # row: row id to be replaced + assert row <= glpk.glp_get_num_rows(self.lp) + num = glpk.intArray(1 + 1) + num[1] = row + glpk.glp_del_rows(self.lp, 1, num) + + def set_single_bound(self, react_idx, bounds): + assert react_idx <= self.ncols + if type(bounds) is not np.ndarray: + bounds = np.array(bounds) + glpk.glp_set_col_bnds(self.lp, 1 + react_idx, *self._get_variable_type(bounds)) + + def set_obj_dir(self, direction): + """Configure the optimization direction + + :param direction: direction for the optimization ('maximize' or 'minimize') + :type direction: string + """ + obj_dir = glpk.GLP_MAX if 'max' in direction else glpk.GLP_MIN + glpk.glp_set_obj_dir(self.lp, obj_dir) + + def reset_cost(self, cost_coef=None): + # reset coefficients of objective function (cols) to 0.0 + # + # Parameters: + # cost_coef: None or 1d numpy.ndarray with floats + # None: reset all cost coefficiencs + # ndarray: reset only coest coefficiencs != 0 + if cost_coef is None: + for i in range(glpk.glp_get_num_cols(self.lp)): + glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) + else: + for i in np.nonzero(cost_coef)[0]: + glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) + + def set_obj_coefs(self, obj_coefs): + """set coefficients for objective function + + :param obj_coefs: coefficients of objective function + :type obj_coefs: 1D ndarray of shape (1, ncols) + """ + for i in range(self.ncols): + glpk.glp_set_obj_coef(self.lp, int(1 + i), obj_coefs[i]) + + def solve(self, short_result=False, scaling=None): + """Solve the Linear Programming Problem. + + :param short_result: short result (only objective value and status) or long result + :type short_result: bool (default:False) + :param scaling: problem scaling options ('GM', 'EQ', '2N', 'SKIP', 'AUTO' + :type scaling: string (default, None) + :return: result information from the optimization stored in a dictionary + :rtype: dict + """ + if scaling is not None: + opt = scaling_options.get(scaling, glpk.GLP_SF_AUTO) + glpk.glp_scale_prob(self.lp, opt) + # sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))] + # rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))] + self.simplex_result = glpk.glp_simplex(self.lp, None) + return self.results(short_result) + + def get_row_value(self, row): + assert row <= glpk.glp_get_num_rows(self.lp) + return glpk.glp_get_row_prim(self.lp, row) + + def results(self, short_result=False): + """Collect results for the optimization + + Alternatively, reduced results, e.g. when doing pFBA + + :param short_result: short result (only objective value and status) or long result + :type short_result: bool (default:False) + :return: result from the FBA optimization + :rtype: dict + """ + self.res = { + 'fun': glpk.glp_get_obj_val(self.lp), + 'simplex_status': self.simplex_result, + 'simplex_message': simplex_status[self.simplex_result], + } + if short_result is False: + self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)]) + self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)]) + self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)]) + self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)] + self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)] + self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)] + return self.res + + def __del__(self): + """Explicitly delete the LpProblem. (can we do without?) + """ + if getattr(self, 'lp'): + glpk.glp_delete_prob(self.lp) diff --git a/xbanalysis/utils/utils.py b/xbanalysis/utils/utils.py index 55258d545f5f57a735c1d225b3575456ddcbc957..7a0d0b14cdfffd7bf523c2dfa2bc96f237e1cc50 100644 --- a/xbanalysis/utils/utils.py +++ b/xbanalysis/utils/utils.py @@ -86,10 +86,11 @@ def get_local_params(reaction): :rtype: dict (key: parameter id, val: value, units, sympy-portected id """ local_params = {} - for lp in record_generator(reaction['localParams']): - params = extract_params(lp) - local_params[params['id']] = float(params['value']) - return local_params + if 'localParams' in reaction: + for lp in record_generator(reaction['localParams']): + params = extract_params(lp) + local_params[params['id']] = float(params['value']) + return local_params # added