diff --git a/setup.py b/setup.py index 4837556d61afef394108028c8f7e3ffde32869b3..7bf9b347f6ea02f9c79d20c0e0393ea196bcd0fd 100755 --- a/setup.py +++ b/setup.py @@ -38,7 +38,7 @@ setup( install_requires=['pandas>=1.3.0', 'numpy >= 1.20.0', 'sympy >= 1.9.0', - 'scipy >= 1.7.0', + 'scipy >= 1.8.1', 'swiglpk >= 5.0.5', 'sbmlxdf >= 0.2.7'], python_requires=">=3.7", diff --git a/xbanalysis/_version.py b/xbanalysis/_version.py index 8003eb4f7157ca8726c8a5872143949575ec4537..0c50296258ff58e250c31da927baa43621e1bfb1 100644 --- a/xbanalysis/_version.py +++ b/xbanalysis/_version.py @@ -1,5 +1,5 @@ """ Definition of version string. """ -__version__ = "0.3.3" +__version__ = "0.3.4" program_name = 'xbanalysis' diff --git a/xbanalysis/model/xba_model.py b/xbanalysis/model/xba_model.py index 056a5c828e081ff12f1950a91b2135d93d3c4939..495dbcbbfd63735400a6f6be97a2847853094ae4 100644 --- a/xbanalysis/model/xba_model.py +++ b/xbanalysis/model/xba_model.py @@ -7,7 +7,7 @@ import os import time import numpy as np import sbmlxdf -from scipy.sparse import coo_matrix +from scipy.sparse import coo_array from .xba_compartment import XbaCompartment from .xba_function import XbaFunction @@ -87,9 +87,8 @@ class XbaModel: :param rids: reaction ids - columns of sub-matrix :type rids: list of strings :returns: stoichmetric sub-matrix in sparse format - :rtype: scipy.sparse.coo_matrix + :rtype: scipy.sparse.coo_array """ - # TODO: sparce matrix manipulation sid2idx = {sid: idx for idx, sid in enumerate(sids)} stoic_data = [] for col_idx, rid in enumerate(rids): @@ -99,7 +98,6 @@ class XbaModel: for sid, stoic in self.reactions[rid].products.items(): if sid in sids: stoic_data.append([sid2idx[sid], col_idx, stoic]) - coo_data = np.array(stoic_data).copy() # ??? - stoic_mat_coo = coo_matrix((coo_data[:, 2], (coo_data[:, 0], coo_data[:, 1])), - shape=(len(sids), len(rids))) + coo_data = np.array(stoic_data) + stoic_mat_coo = coo_array((coo_data[:, 2], coo_data[:, :2].T), shape=(len(sids), len(rids))) return stoic_mat_coo diff --git a/xbanalysis/problems/fba_problem.py b/xbanalysis/problems/fba_problem.py index 7cd2a55034302bad5d15f7d8839feddadb2a45e0..ad2d65995398ed7110fc20d33e45ccea68049809 100644 --- a/xbanalysis/problems/fba_problem.py +++ b/xbanalysis/problems/fba_problem.py @@ -316,5 +316,5 @@ class FbaProblem: return result def __del__(self): - print('deletion of fba_model') + # print('deletion of fba_problem') del self.lp diff --git a/xbanalysis/problems/rba_problem.py b/xbanalysis/problems/rba_problem.py index 071369ae4f8ac2f90f4127901068ef9eab486bcf..dc0fafe6d1bb45bfd278f184b92c3b21922917a2 100644 --- a/xbanalysis/problems/rba_problem.py +++ b/xbanalysis/problems/rba_problem.py @@ -13,8 +13,10 @@ Peter Schubert, HHU Duesseldorf, June 2022 """ import numpy as np +import pandas as pd +from scipy.sparse import coo_array, hstack, vstack -from xbanalysis.solvers.lp_problem import LpProblem +from xbanalysis.solvers.glpk_linear_problem import GlpkLinearProblem class RbaProblem: @@ -24,38 +26,41 @@ class RbaProblem: :param xba_model: """ + self.xba_model = xba_model # metabolic reactions and enzyme transitions (no FBA reactions, no protein synthesis, no degradation) - self.mr_rids = [r.id for r in xba_model.reactions.values() if r.sboterm.is_in_branch('SBO:0000167') - and r.sboterm.is_in_branch('SBO:0000179') is False] - self.mrid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.mr_rids} - self.enz_sids = [enz_sid for enz_sid in self.mrid2enz.values() if enz_sid is not None] - self.rev_enz_sids = [enz_sid for mr_rid, enz_sid in self.mrid2enz.items() - if enz_sid is not None and xba_model.reactions[mr_rid].reversible] + self.rids = np.array([r.id for r in xba_model.reactions.values() + if r.sboterm.is_in_branch('SBO:0000167') is True and + r.sboterm.is_in_branch('SBO:0000179') is False]) + self.rid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.rids} + self.enz_sids = np.array([enz_sid for enz_sid in self.rid2enz.values() if enz_sid is not None]) + self.enz_sids_rev = np.array([enz_sid for rid, enz_sid in self.rid2enz.items() + if enz_sid is not None and xba_model.reactions[rid].reversible]) + self.enz2idx = {enz: idx for idx, enz in enumerate(self.enz_sids)} self.processes = {s.rba_process_machine: sid for sid, s in xba_model.species.items() if hasattr(s, 'rba_process_machine')} - self.psenz_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids] - self.pspm_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()] self.densities = {cid: c.rba_frac_gcdw for cid, c in xba_model.compartments.items() if hasattr(c, 'rba_frac_gcdw')} # metabolites and enzymes used in metabolic reactions and enzyme transitions - sids = set() - for rid in self.mr_rids: - for sid in xba_model.reactions[rid].reactants: - sids.add(sid) - for sid in xba_model.reactions[rid].products: - sids.add(sid) - used_sids = list(sids) - self.sids = [sid for sid in used_sids if xba_model.species[sid].constant is False] - self.ext_conc = {sid: xba_model.species[sid].initial_conc - for sid in used_sids if xba_model.species[sid].constant is True} - - self.rba_cols = self.mr_rids + self.enz_sids + list(self.processes) - self.rba_rows = (self.sids + list(self.processes) + used_sids = set() + for rid in self.rids: + used_sids |= set(xba_model.reactions[rid].reactants) + used_sids |= set(xba_model.reactions[rid].products) + self.sids = np.array([sid for sid in used_sids if xba_model.species[sid].constant is False]) + self.sid2idx = {sid: idx for idx, sid in enumerate(self.sids)} + self.ext_conc = {sid: xba_model.species[sid].initial_conc for sid in used_sids + if xba_model.species[sid].constant is True} + + self.n_rid = len(self.rids) + self.n_enz = len(self.enz_sids) + self.n_pm = len(self.processes) + + self.rba_cols = list(self.rids) + list(self.enz_sids) + list(self.processes) + self.rba_rows = (list(self.sids) + list(self.processes) + [sid + '_feff' for sid in self.enz_sids] - + [sid + '_reff' for sid in self.rev_enz_sids] + + [sid + '_reff' for sid in self.enz_sids_rev] + list(self.densities)) self.is_rba_model = self.check_rba_model(xba_model) @@ -63,27 +68,26 @@ class RbaProblem: print('Error: not a GBA model - missing parameters') return - fix_m, var_m, row_bnds_m = self.construct_mass_balance(xba_model) - fix_pc, var_pc, row_bnds_pc = self.construct_process_capacities(xba_model) - fix_eff, var_eff, row_bnds_eff = self.construct_efficencies(xba_model) - fix_cd, var_cd, row_bnds_cd = self.construct_densities(xba_model) - self.fix_cd = fix_cd - self.fix_mat = np.vstack((fix_m, fix_pc, fix_eff, fix_cd)) - self.var_mat = np.vstack((var_m, var_pc, var_eff, var_cd)) + fix_m_coo, var_m_coo, row_bnds_m = self._construct_mass_balance() + fix_pc_coo, var_pc_coo, row_bnds_pc = self._construct_process_capacities() + fix_eff_coo, var_eff_coo, row_bnds_eff = self._construct_efficencies() + fix_cd_coo, var_cd_coo, row_bnds_cd = self._construct_densities() + self.fix_cd_coo = fix_cd_coo + self.fix_mat_coo = vstack((fix_m_coo, fix_pc_coo, fix_eff_coo, fix_cd_coo)) + self.var_mat_coo = vstack((var_m_coo, var_pc_coo, var_eff_coo, var_cd_coo)) self.fix_row_bnds = np.vstack((np.zeros_like(row_bnds_m), np.zeros_like(row_bnds_pc), row_bnds_eff, row_bnds_cd)) - self.var_row_bnds = np.vstack((row_bnds_m, row_bnds_pc, - np.zeros_like(row_bnds_eff), np.zeros_like(row_bnds_cd))) + self.var_row_bnds = np.vstack((row_bnds_m, row_bnds_pc, np.zeros_like(row_bnds_eff), + np.zeros_like(row_bnds_cd))) # in RBA we only check feasibility. # Here we maximize enzyme amount and check that objective value e.g. > 1e-10 mmol/gcdw self.obj_dir = 'maximize' - self.obj_coefs = np.hstack((np.zeros(len(self.mr_rids)), - np.ones(len(self.enz_sids) + len(self.processes)))) - col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.mr_rids], - np.zeros(len(self.enz_sids) + len(self.processes)))) - col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.mr_rids], - np.nan * np.ones(len(self.enz_sids) + len(self.processes)))) + self.obj_coefs = np.hstack((np.zeros(self.n_rid), np.ones(self.n_enz + self.n_pm))) + col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.rids], + np.zeros(self.n_enz + self.n_pm))) + col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.rids], + np.nan * np.ones(self.n_enz + self.n_pm))) self.col_bnds = np.vstack((col_lbs, col_ubs)) self.scaling = '2N' self.max_iter = 20 @@ -100,18 +104,18 @@ class RbaProblem: r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub', 'kapp forward': 'rba_kapp_f'} for check, param in r_check.items(): - missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False] + missing = [rid for rid in self.rids if hasattr(xba_model.reactions[rid], param) is False] if len(missing) > 0: print(f'missing {check} in:', missing) is_rba_model = False - missing = [rid for rid in self.mr_rids if xba_model.reactions[rid].reversible + missing = [rid for rid in self.rids if xba_model.reactions[rid].reversible and hasattr(xba_model.reactions[rid], 'rba_kapp_r') is False] if len(missing) > 0: print('missing kapp reverse in:', missing) is_rba_model = False - missing = [sid for sid in self.sids + self.enz_sids + list(self.processes.values()) + missing = [sid for sid in list(self.sids) + list(self.enz_sids) + list(self.processes.values()) if hasattr(xba_model.species[sid], 'mw') is False] if len(missing) > 0: print('missing molecular weight in:', missing) @@ -128,10 +132,39 @@ class RbaProblem: is_rba_model = False return is_rba_model - def rba_optimize(self): + def check_growth_rate_feasibility(self, gr, short_result=True): + """Check, if a selected growth rate is supported by the model + + create a Linear Programming problem and check its feasibility + + :param gr: current growth rate + :type gr: float + :param short_result: short result (only objective value and status) or long result + :type short_result: bool (optional, default: False) + :return: result of optimization + :rtype: dict """ + constr_coefs_coo = coo_array(self.fix_mat_coo + gr * self.var_mat_coo) + + lp = GlpkLinearProblem(constr_coefs_coo, self.fix_row_bnds, self.col_bnds) + lp.set_obj_coefs(self.obj_coefs) + lp.set_obj_dir(self.obj_dir) + res = lp.solve(short_result, scaling=self.scaling) + del lp + return res + + def rba_optimize(self): + """Resource Balance Analysis - :return: + According to Goelzer, Fromion: + Goelzer et al., 2015. Quantitative prediction of genome-wide resource allocation + in bacteria. Metab Eng, 32, 232-243 + + Stage 1: course range finding of supported growth rate + Stage 2: zooming in on supported growth rate + + :return: result including maximal growth rate + :rtype: dict """ if self.is_rba_model is False: print('Error: not a RBA model - missing parameters') @@ -141,12 +174,7 @@ class RbaProblem: # Preanalysis: find range of growth rates gr = {'lb': 0.0, 'ub': np.nan, 'try': 1.0} for i in range(self.max_iter): - constr_coefs = self.fix_mat + gr['try'] * self.var_mat - # row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds - lp = LpProblem() - lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir) - res = lp.solve(scaling=self.scaling) - del lp + res = self.check_growth_rate_feasibility(gr['try']) if res['success'] and res['fun'] > 1e-10: gr['lb'] = gr['try'] gr['try'] *= 10 @@ -164,12 +192,7 @@ class RbaProblem: # find exact growth rate for i in range(self.max_iter): gr['try'] = 0.5 * (gr['lb'] + gr['ub']) - constr_coefs = self.fix_mat + gr['try'] * self.var_mat - # row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds - lp = LpProblem() - lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir) - res = lp.solve(scaling=self.scaling) - del lp + res = self.check_growth_rate_feasibility(gr['try']) if res['success'] and res['fun'] > 1e-10: gr['lb'] = gr['try'] else: @@ -179,108 +202,166 @@ class RbaProblem: # get optimum values for the final solution gr['try'] = gr['lb'] - constr_coefs = self.fix_mat + gr['try'] * self.var_mat - lp = LpProblem() - lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir) - res = lp.solve(scaling=self.scaling) - del lp + res = self.check_growth_rate_feasibility(gr['try'], short_result=False) res['fun'] = gr['try'] return res - def construct_mass_balance(self, xba_model): - - fix_m_x_mr = xba_model.get_stoic_matrix(self.sids, self.mr_rids) - var_m_x_enz = xba_model.get_stoic_matrix(self.sids, self.psenz_rids) - var_m_x_pm = xba_model.get_stoic_matrix(self.sids, self.pspm_rids) - fix_m = np.hstack((fix_m_x_mr, - np.zeros((len(self.sids), len(self.enz_sids)+len(self.processes))))) - var_m = np.hstack((np.zeros((len(self.sids), len(self.mr_rids))), - var_m_x_enz, - var_m_x_pm)) - targets = np.array([getattr(xba_model.species[sid], 'rba_target', 0.0) for sid in self.sids]) - row_bnds_m = np.vstack((targets, targets)).T - return fix_m, var_m, row_bnds_m - - def construct_process_capacities(self, xba_model): - - var_pc_x_enz = np.zeros((len(self.processes), len(self.psenz_rids))) - for idx, pm in enumerate(self.processes): - var_pc_x_enz[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) - for enz_sid in self.enz_sids] - var_pc_x_pm = np.zeros((len(self.processes), len(self.processes))) - for idx, pm in enumerate(self.processes): - var_pc_x_pm[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) - for enz_sid in self.processes.values()] - fix_pc_x_pm = -np.diag([xba_model.species[enz_sid].rba_process_capacity - for enz_sid in self.processes.values()]) * 3600 - fix_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids) + len(self.enz_sids))), - fix_pc_x_pm)) - var_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids))), - var_pc_x_enz, var_pc_x_pm)) - row_bnds_pc = np.zeros((len(self.processes), 2)) - return fix_pc, var_pc, row_bnds_pc - - def get_efficiencies(self, xba_model): - - efficiencies = np.zeros((2, len(self.enz_sids))) - idx = 0 - for mr_rid, enz_sid in self.mrid2enz.items(): + def _get_efficiencies(self): + """Determine efficiencies for all enzymes """ + + efficiencies = {} + for mr_rid, enz_sid in self.rid2enz.items(): + # check for enzyme catalyzed reactions if enz_sid is not None: - r = xba_model.reactions[mr_rid] + r = self.xba_model.reactions[mr_rid] saturation = 1.0 if hasattr(r, 'rba_km_ext'): for sid in r.reactants: if sid in self.ext_conc: saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext) - efficiencies[0, idx] = r.rba_kapp_f * 3600 * saturation + eff_fwd = r.rba_kapp_f * 3600.0 * saturation saturation = 1.0 if r.reversible: if hasattr(r, 'rba_km_ext'): for sid in r.products: if sid in self.ext_conc: saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext) - efficiencies[1, idx] = r.rba_kapp_r * 3600 * saturation - idx += 1 + eff_rev = r.rba_kapp_r * 3600.0 * saturation + else: + eff_rev = 0.0 + efficiencies[enz_sid] = [eff_fwd, eff_rev] return efficiencies - def construct_efficencies(self, xba_model): - - efficiencies = self.get_efficiencies(xba_model) - mask = np.array([enz_sid is not None for enz_sid in self.mrid2enz.values()]) - fix_feff_x_mr = np.diag(np.ones(len(self.mrid2enz)))[mask] - fix_feff_x_enz = np.diag(-efficiencies[0]) - fix_feff_x_pm = np.zeros((len(self.enz_sids), len(self.processes))) - fix_feff = np.hstack((fix_feff_x_mr, fix_feff_x_enz, fix_feff_x_pm)) - row_bnds_feff = np.vstack((np.ones(len(self.enz_sids)) * np.nan, np.zeros(len(self.enz_sids)))).T - - mask = [enz_sid is not None and xba_model.reactions[mr_rid].reversible - for mr_rid, enz_sid in self.mrid2enz.items()] - fix_reff_x_mr = np.diag(-np.ones(len(self.mrid2enz)))[mask] - mask = [enz_sid in self.rev_enz_sids for enz_sid in self.enz_sids] - fix_reff_x_enz = np.diag(-efficiencies[1])[mask] - fix_reff_x_pm = np.zeros((len(self.rev_enz_sids), len(self.processes))) - fix_reff = np.hstack((fix_reff_x_mr, fix_reff_x_enz, fix_reff_x_pm)) - row_bnds_reff = np.vstack((np.ones(len(self.rev_enz_sids)) * np.nan, np.zeros(len(self.rev_enz_sids)))).T - fix_eff = np.vstack((fix_feff, fix_reff)) - var_eff = np.zeros_like(fix_eff) - row_bnds_eff = np.vstack((row_bnds_feff, row_bnds_reff)) - return fix_eff, var_eff, row_bnds_eff - - def construct_densities(self, xba_model): - fix_cd_x_mr = np.zeros((len(self.densities), len(self.mr_rids))) - fix_cd_x_enz = np.zeros((len(self.densities), len(self.enz_sids))) - for row_idx, cid in enumerate(self.densities): - for col_idx, sid in enumerate(self.enz_sids): - if xba_model.species[sid].compartment == cid: - fix_cd_x_enz[row_idx, col_idx] = xba_model.species[sid].mw / 1000 - fix_cd_x_pm = np.zeros((len(self.densities), len(self.processes))) - for row_idx, cid in enumerate(self.densities): - for col_idx, sid in enumerate(self.processes.values()): - if xba_model.species[sid].compartment == cid: - fix_cd_x_pm[row_idx, col_idx] = xba_model.species[sid].mw / 1000 - fix_cd = np.hstack((fix_cd_x_mr, fix_cd_x_enz, fix_cd_x_pm)) - - var_cd = np.zeros_like(fix_cd) - row_bnds_cd = np.vstack((np.zeros(len(self.densities)), - [val for val in self.densities.values()])).T - return fix_cd, var_cd, row_bnds_cd + def _construct_mass_balance(self): + + n_sid = len(self.sids) + psenz_rids = [self.xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids] + pspm_rids = [self.xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()] + + fix_m_x_r_coo = self.xba_model.get_stoic_matrix_coo(self.sids, self.rids) + var_m_x_enz_pm_coo = self.xba_model.get_stoic_matrix_coo(self.sids, psenz_rids + pspm_rids) + + fix_m_coo = hstack((fix_m_x_r_coo, coo_array((n_sid, self.n_enz + self.n_pm)))) + var_m_coo = hstack((coo_array((n_sid, self.n_rid)), var_m_x_enz_pm_coo)) + + targets = np.array([getattr(self.xba_model.species[sid], 'rba_target', 0.0) for sid in self.sids]) + row_bnds_m = np.vstack((targets, targets)).T + return fix_m_coo, var_m_coo, row_bnds_m + + def _construct_process_capacities(self): + + pm2idx = {pm: idx for idx, pm in enumerate(self.processes)} + + mat_data = [] + for pm in self.processes: + for col_idx, enz_sid in enumerate(list(self.enz_sids) + list(self.processes.values())): + p_cost = self.xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) + if p_cost > 0.0: + mat_data.append([pm2idx[pm], self.n_rid + col_idx, p_cost]) + + coo_data = np.array(mat_data) + var_pc_coo = coo_array((coo_data[:, 2], coo_data[:, :2].T), + shape=(self.n_pm, self.n_rid + self.n_enz + self.n_pm)) + mat_data = [] + for pm, enz_sid in self.processes.items(): + capacity = self.xba_model.species[enz_sid].rba_process_capacity * 3600.0 + mat_data.append([pm2idx[pm], self.n_rid + self.n_enz + pm2idx[pm], -capacity]) + coo_data = np.array(mat_data) + fix_pc_coo = coo_array((coo_data[:, 2], coo_data[:, :2].T), + shape=(self.n_pm, self.n_rid + self.n_enz + self.n_pm)) + + row_bnds_pc = np.zeros((self.n_pm, 2)) + return fix_pc_coo, var_pc_coo, row_bnds_pc + + def _construct_efficencies(self): + """Construct enzyme efficiency constraints + + :return: fixed and variable coefficient matrices, row bounds + """ + efficiencies = self._get_efficiencies() + rid2idx = {rid: idx for idx, rid in enumerate(self.rids)} + + mat_data = [] + enz2rid = {enz: rid for rid, enz in self.rid2enz.items()} + n_rows = self.n_enz + len(self.enz_sids_rev) + # forward efficiencies (for all enzyme catalyzed reactions) + for row_idx, enz_sid in enumerate(self.enz_sids): + feff = efficiencies[enz_sid][0] + mat_data.append([row_idx, rid2idx[enz2rid[enz_sid]], 1.0]) + mat_data.append([row_idx, self.n_rid + self.enz2idx[enz_sid], -feff]) + # backward efficiencies (only for reversible catalyzed reactions) + for row_idx, enz_sid in enumerate(self.enz_sids_rev): + reff = efficiencies[enz_sid][1] + mat_data.append([self.n_enz + row_idx, rid2idx[enz2rid[enz_sid]], -1.0]) + mat_data.append([self.n_enz + row_idx, self.n_rid + self.enz2idx[enz_sid], -reff]) + coo_data = np.array(mat_data) + fix_eff_coo = coo_array((coo_data[:, 2], coo_data[:, :2].T), + shape=(n_rows, self.n_rid + self.n_enz + self.n_pm)) + var_eff_coo = coo_array(fix_eff_coo.shape) + # enzyme efficiency inequalities ≤ 0 + row_bnds_eff = np.vstack((np.ones(n_rows) * np.nan, np.zeros(n_rows))).T + return fix_eff_coo, var_eff_coo, row_bnds_eff + + def _construct_densities(self): + cid2idx = {cid: idx for idx, cid in enumerate(self.densities)} + pm2idx = {sid: idx for idx, sid in enumerate(self.processes.values())} + + mat_data = [] + for sid in self.enz_sids: + cid = self.xba_model.species[sid].compartment + if cid in self.densities: + mmol_per_l = self.xba_model.species[sid].mw / 1000 + mat_data.append([cid2idx[cid], self.n_rid + self.enz2idx[sid], mmol_per_l]) + for sid in self.processes.values(): + cid = self.xba_model.species[sid].compartment + if cid in self.densities: + mmol_per_l = self.xba_model.species[sid].mw / 1000 + mat_data.append([cid2idx[cid], self.n_rid + self.n_enz + pm2idx[sid], mmol_per_l]) + coo_data = np.array(mat_data) + fix_cd_coo = coo_array((coo_data[:, 2], coo_data[:, :2].T), + shape=(len(cid2idx), self.n_rid + self.n_enz + self.n_pm)) + var_cd_coo = coo_array(fix_cd_coo.shape) + # density inequality constraints: 0 ≤ constraint ≤ max_density + row_bnds_cd = np.vstack((np.zeros(len(cid2idx)), [val for val in self.densities.values()])).T + return fix_cd_coo, var_cd_coo, row_bnds_cd + + def process_result(self, res): + """Helper, to convert RBA result into some DataFrames + + :param res: result from a previous RBA optimization + :type res: dict + :return: fluxes and enzyme information + :rtype: two pandas DataFrames + """ + + rho = 300 + if 'rho' in self.xba_model.parameters: + rho = self.xba_model.parameters['rho'].value + + # identify the biomass reaction name for Escher Map integration + fba_rid = 'biomass' + for rid in self.xba_model.reactions: + if repr(self.xba_model.reactions[rid].sboterm) == 'SBO:0000629': + fba_rid = rid + # retrieve translational capacity of the ribosome + tl_flux = 0.0 + for pm_idx, enz_sid in enumerate(self.processes.values()): + if repr(self.xba_model.species[enz_sid].sboterm) == 'SBO:0000250': + tl_capacity = self.xba_model.species[enz_sid].rba_process_capacity * 3600.0 + ribo_conc = res['x'][self.n_rid + self.n_enz + pm_idx] + tl_flux = ribo_conc * tl_capacity + break + + df_fluxes = pd.DataFrame(np.hstack((res['x'][:len(self.rids)], tl_flux)), + index=list(self.rids) + [fba_rid], columns=['mmol gDW-1 h-1']) + df_fluxes.index.name = 'reaction' + + enz_sids = list(self.enz_sids) + list(self.processes.values()) + enz_mmol_per_l = res['x'][len(self.rids):] * rho + enz_g_per_mmol = np.sum(self.fix_cd_coo.toarray()[:, len(self.rids):], axis=0) + enz_g_per_l = enz_mmol_per_l * enz_g_per_mmol + df_enz = pd.DataFrame(np.vstack((enz_mmol_per_l, enz_g_per_l)).T, + index=enz_sids, columns=['mmol/l', 'g/l']) + df_enz.index.name = 'enzyme' + + return df_fluxes, df_enz diff --git a/xbanalysis/problems/rba_problem_coo.py b/xbanalysis/problems/rba_problem_coo.py deleted file mode 100644 index 071369ae4f8ac2f90f4127901068ef9eab486bcf..0000000000000000000000000000000000000000 --- a/xbanalysis/problems/rba_problem_coo.py +++ /dev/null @@ -1,286 +0,0 @@ -"""Implementation of GBA Problem class. - - - several process machines supported (yet to be validated) - - non-enzymatic metabolic reactions supported (yet to be validated) - - Yet to implement: - - enzyme degradation - - enzyme transitions - - several compartments constraints - - target concentrations and target fluxes - -Peter Schubert, HHU Duesseldorf, June 2022 -""" - -import numpy as np - -from xbanalysis.solvers.lp_problem import LpProblem - - -class RbaProblem: - - def __init__(self, xba_model): - """ - - :param xba_model: - """ - self.xba_model = xba_model - - # metabolic reactions and enzyme transitions (no FBA reactions, no protein synthesis, no degradation) - self.mr_rids = [r.id for r in xba_model.reactions.values() if r.sboterm.is_in_branch('SBO:0000167') - and r.sboterm.is_in_branch('SBO:0000179') is False] - self.mrid2enz = {rid: xba_model.reactions[rid].enzyme for rid in self.mr_rids} - self.enz_sids = [enz_sid for enz_sid in self.mrid2enz.values() if enz_sid is not None] - self.rev_enz_sids = [enz_sid for mr_rid, enz_sid in self.mrid2enz.items() - if enz_sid is not None and xba_model.reactions[mr_rid].reversible] - self.processes = {s.rba_process_machine: sid for sid, s in xba_model.species.items() - if hasattr(s, 'rba_process_machine')} - self.psenz_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.enz_sids] - self.pspm_rids = [xba_model.species[enz_sid].ps_rid for enz_sid in self.processes.values()] - self.densities = {cid: c.rba_frac_gcdw for cid, c in xba_model.compartments.items() - if hasattr(c, 'rba_frac_gcdw')} - - # metabolites and enzymes used in metabolic reactions and enzyme transitions - sids = set() - for rid in self.mr_rids: - for sid in xba_model.reactions[rid].reactants: - sids.add(sid) - for sid in xba_model.reactions[rid].products: - sids.add(sid) - used_sids = list(sids) - self.sids = [sid for sid in used_sids if xba_model.species[sid].constant is False] - self.ext_conc = {sid: xba_model.species[sid].initial_conc - for sid in used_sids if xba_model.species[sid].constant is True} - - self.rba_cols = self.mr_rids + self.enz_sids + list(self.processes) - self.rba_rows = (self.sids + list(self.processes) - + [sid + '_feff' for sid in self.enz_sids] - + [sid + '_reff' for sid in self.rev_enz_sids] - + list(self.densities)) - - self.is_rba_model = self.check_rba_model(xba_model) - if self.is_rba_model is False: - print('Error: not a GBA model - missing parameters') - return - - fix_m, var_m, row_bnds_m = self.construct_mass_balance(xba_model) - fix_pc, var_pc, row_bnds_pc = self.construct_process_capacities(xba_model) - fix_eff, var_eff, row_bnds_eff = self.construct_efficencies(xba_model) - fix_cd, var_cd, row_bnds_cd = self.construct_densities(xba_model) - self.fix_cd = fix_cd - self.fix_mat = np.vstack((fix_m, fix_pc, fix_eff, fix_cd)) - self.var_mat = np.vstack((var_m, var_pc, var_eff, var_cd)) - self.fix_row_bnds = np.vstack((np.zeros_like(row_bnds_m), np.zeros_like(row_bnds_pc), - row_bnds_eff, row_bnds_cd)) - self.var_row_bnds = np.vstack((row_bnds_m, row_bnds_pc, - np.zeros_like(row_bnds_eff), np.zeros_like(row_bnds_cd))) - - # in RBA we only check feasibility. - # Here we maximize enzyme amount and check that objective value e.g. > 1e-10 mmol/gcdw - self.obj_dir = 'maximize' - self.obj_coefs = np.hstack((np.zeros(len(self.mr_rids)), - np.ones(len(self.enz_sids) + len(self.processes)))) - col_lbs = np.hstack(([xba_model.reactions[rid].fbc_lb for rid in self.mr_rids], - np.zeros(len(self.enz_sids) + len(self.processes)))) - col_ubs = np.hstack(([xba_model.reactions[rid].fbc_ub for rid in self.mr_rids], - np.nan * np.ones(len(self.enz_sids) + len(self.processes)))) - self.col_bnds = np.vstack((col_lbs, col_ubs)) - self.scaling = '2N' - self.max_iter = 20 - - def check_rba_model(self, xba_model): - """Check if rba related data available in the XBA model - - :param xba_model: Xba model with data extracted from SBML file - :type xba_model: XbaModel - :return: indicator if model contains data required for RBA problem formulation - :rtype: bool - """ - is_rba_model = True - - r_check = {'flux lower bound': 'fbc_lb', 'flux upper bound': 'fbc_ub', 'kapp forward': 'rba_kapp_f'} - for check, param in r_check.items(): - missing = [rid for rid in self.mr_rids if hasattr(xba_model.reactions[rid], param) is False] - if len(missing) > 0: - print(f'missing {check} in:', missing) - is_rba_model = False - - missing = [rid for rid in self.mr_rids if xba_model.reactions[rid].reversible - and hasattr(xba_model.reactions[rid], 'rba_kapp_r') is False] - if len(missing) > 0: - print('missing kapp reverse in:', missing) - is_rba_model = False - - missing = [sid for sid in self.sids + self.enz_sids + list(self.processes.values()) - if hasattr(xba_model.species[sid], 'mw') is False] - if len(missing) > 0: - print('missing molecular weight in:', missing) - is_rba_model = False - - missing = [sid for sid in list(self.processes.values()) - if hasattr(xba_model.species[sid], 'rba_process_capacity') is False] - if len(missing) > 0: - print('missing process capacity in:', missing) - is_rba_model = False - - if len(self.densities) == 0: - print('missing a density constraint') - is_rba_model = False - return is_rba_model - - def rba_optimize(self): - """ - - :return: - """ - if self.is_rba_model is False: - print('Error: not a RBA model - missing parameters') - res = {'success': False, 'fun': 0.0, 'message': 'not an RBA model'} - return res - - # Preanalysis: find range of growth rates - gr = {'lb': 0.0, 'ub': np.nan, 'try': 1.0} - for i in range(self.max_iter): - constr_coefs = self.fix_mat + gr['try'] * self.var_mat - # row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds - lp = LpProblem() - lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir) - res = lp.solve(scaling=self.scaling) - del lp - if res['success'] and res['fun'] > 1e-10: - gr['lb'] = gr['try'] - gr['try'] *= 10 - else: - if gr['try'] <= 1e-4: - break - gr['ub'] = gr['try'] - gr['try'] /= 10 - if gr['lb'] > 0.0 and np.isfinite(gr['ub']): - break - if gr['lb'] == 0.0 or np.isnan(gr['ub']): - res = {'success': False, 'fun': 0.0, 'message': 'no solution found'} - return res - - # find exact growth rate - for i in range(self.max_iter): - gr['try'] = 0.5 * (gr['lb'] + gr['ub']) - constr_coefs = self.fix_mat + gr['try'] * self.var_mat - # row_bnds = self.fix_row_bnds + gr['try'] * self.var_row_bnds - lp = LpProblem() - lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir) - res = lp.solve(scaling=self.scaling) - del lp - if res['success'] and res['fun'] > 1e-10: - gr['lb'] = gr['try'] - else: - gr['ub'] = gr['try'] - if (gr['ub'] - gr['lb']) < gr['try'] * 1e-5: - break - - # get optimum values for the final solution - gr['try'] = gr['lb'] - constr_coefs = self.fix_mat + gr['try'] * self.var_mat - lp = LpProblem() - lp.configure(self.obj_coefs, constr_coefs, self.fix_row_bnds, self.col_bnds, self.obj_dir) - res = lp.solve(scaling=self.scaling) - del lp - res['fun'] = gr['try'] - return res - - def construct_mass_balance(self, xba_model): - - fix_m_x_mr = xba_model.get_stoic_matrix(self.sids, self.mr_rids) - var_m_x_enz = xba_model.get_stoic_matrix(self.sids, self.psenz_rids) - var_m_x_pm = xba_model.get_stoic_matrix(self.sids, self.pspm_rids) - fix_m = np.hstack((fix_m_x_mr, - np.zeros((len(self.sids), len(self.enz_sids)+len(self.processes))))) - var_m = np.hstack((np.zeros((len(self.sids), len(self.mr_rids))), - var_m_x_enz, - var_m_x_pm)) - targets = np.array([getattr(xba_model.species[sid], 'rba_target', 0.0) for sid in self.sids]) - row_bnds_m = np.vstack((targets, targets)).T - return fix_m, var_m, row_bnds_m - - def construct_process_capacities(self, xba_model): - - var_pc_x_enz = np.zeros((len(self.processes), len(self.psenz_rids))) - for idx, pm in enumerate(self.processes): - var_pc_x_enz[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) - for enz_sid in self.enz_sids] - var_pc_x_pm = np.zeros((len(self.processes), len(self.processes))) - for idx, pm in enumerate(self.processes): - var_pc_x_pm[idx] = [xba_model.species[enz_sid].rba_process_costs.get(pm, 0.0) - for enz_sid in self.processes.values()] - fix_pc_x_pm = -np.diag([xba_model.species[enz_sid].rba_process_capacity - for enz_sid in self.processes.values()]) * 3600 - fix_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids) + len(self.enz_sids))), - fix_pc_x_pm)) - var_pc = np.hstack((np.zeros((len(self.processes), len(self.mr_rids))), - var_pc_x_enz, var_pc_x_pm)) - row_bnds_pc = np.zeros((len(self.processes), 2)) - return fix_pc, var_pc, row_bnds_pc - - def get_efficiencies(self, xba_model): - - efficiencies = np.zeros((2, len(self.enz_sids))) - idx = 0 - for mr_rid, enz_sid in self.mrid2enz.items(): - if enz_sid is not None: - r = xba_model.reactions[mr_rid] - saturation = 1.0 - if hasattr(r, 'rba_km_ext'): - for sid in r.reactants: - if sid in self.ext_conc: - saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext) - efficiencies[0, idx] = r.rba_kapp_f * 3600 * saturation - saturation = 1.0 - if r.reversible: - if hasattr(r, 'rba_km_ext'): - for sid in r.products: - if sid in self.ext_conc: - saturation *= self.ext_conc[sid] / (self.ext_conc[sid] + r.rba_km_ext) - efficiencies[1, idx] = r.rba_kapp_r * 3600 * saturation - idx += 1 - return efficiencies - - def construct_efficencies(self, xba_model): - - efficiencies = self.get_efficiencies(xba_model) - mask = np.array([enz_sid is not None for enz_sid in self.mrid2enz.values()]) - fix_feff_x_mr = np.diag(np.ones(len(self.mrid2enz)))[mask] - fix_feff_x_enz = np.diag(-efficiencies[0]) - fix_feff_x_pm = np.zeros((len(self.enz_sids), len(self.processes))) - fix_feff = np.hstack((fix_feff_x_mr, fix_feff_x_enz, fix_feff_x_pm)) - row_bnds_feff = np.vstack((np.ones(len(self.enz_sids)) * np.nan, np.zeros(len(self.enz_sids)))).T - - mask = [enz_sid is not None and xba_model.reactions[mr_rid].reversible - for mr_rid, enz_sid in self.mrid2enz.items()] - fix_reff_x_mr = np.diag(-np.ones(len(self.mrid2enz)))[mask] - mask = [enz_sid in self.rev_enz_sids for enz_sid in self.enz_sids] - fix_reff_x_enz = np.diag(-efficiencies[1])[mask] - fix_reff_x_pm = np.zeros((len(self.rev_enz_sids), len(self.processes))) - fix_reff = np.hstack((fix_reff_x_mr, fix_reff_x_enz, fix_reff_x_pm)) - row_bnds_reff = np.vstack((np.ones(len(self.rev_enz_sids)) * np.nan, np.zeros(len(self.rev_enz_sids)))).T - fix_eff = np.vstack((fix_feff, fix_reff)) - var_eff = np.zeros_like(fix_eff) - row_bnds_eff = np.vstack((row_bnds_feff, row_bnds_reff)) - return fix_eff, var_eff, row_bnds_eff - - def construct_densities(self, xba_model): - fix_cd_x_mr = np.zeros((len(self.densities), len(self.mr_rids))) - fix_cd_x_enz = np.zeros((len(self.densities), len(self.enz_sids))) - for row_idx, cid in enumerate(self.densities): - for col_idx, sid in enumerate(self.enz_sids): - if xba_model.species[sid].compartment == cid: - fix_cd_x_enz[row_idx, col_idx] = xba_model.species[sid].mw / 1000 - fix_cd_x_pm = np.zeros((len(self.densities), len(self.processes))) - for row_idx, cid in enumerate(self.densities): - for col_idx, sid in enumerate(self.processes.values()): - if xba_model.species[sid].compartment == cid: - fix_cd_x_pm[row_idx, col_idx] = xba_model.species[sid].mw / 1000 - fix_cd = np.hstack((fix_cd_x_mr, fix_cd_x_enz, fix_cd_x_pm)) - - var_cd = np.zeros_like(fix_cd) - row_bnds_cd = np.vstack((np.zeros(len(self.densities)), - [val for val in self.densities.values()])).T - return fix_cd, var_cd, row_bnds_cd diff --git a/xbanalysis/solvers/__init__.py b/xbanalysis/solvers/__init__.py index bee7a56080a80855f0258e4ef44072aa3f7a4f14..5907eaa15781be96589add06e40e15e15185ef89 100644 --- a/xbanalysis/solvers/__init__.py +++ b/xbanalysis/solvers/__init__.py @@ -1,7 +1,6 @@ """Subpackage with XBA model classes """ from .glpk_linear_problem import GlpkLinearProblem -from .lp_problem import LpProblem from .gba_ipopt_problem import GbaIpoptProblem -__all__ = ['GlpkLinearProblem', 'GbaIpoptProblem', 'LpProblem'] +__all__ = ['GlpkLinearProblem', 'GbaIpoptProblem'] diff --git a/xbanalysis/solvers/lp_problem.py b/xbanalysis/solvers/lp_problem.py deleted file mode 100644 index 2e1479c67d33e82d09cf79858f26984e193243d5..0000000000000000000000000000000000000000 --- a/xbanalysis/solvers/lp_problem.py +++ /dev/null @@ -1,303 +0,0 @@ -"""Implementation of LpProblem (linear programming problem) class. - -built upon swiglpk - -Peter Schubert, HHU Duesseldorf, June 2022 -""" - -import numpy as np -import scipy -import scipy.sparse -import swiglpk as glpk - - -# status reported -simplex_status = { - 0: 'LP problem instance successfully solved', - glpk.GLP_EBADB: 'Unable to start search, initial basis specified in the ' - 'problem object is invalid. Number basic variables is not same as the number rows.', - glpk.GLP_ESING: 'Unable to start search, basis matrix corresponding to initial basis is singular.', - glpk.GLP_ECOND: 'Unable to start search, basis matrix corresponding to initial basis is ill-conditioned.', - glpk.GLP_EBOUND: 'Unable to start search, some double-bounded variables have incorrect bounds.', - glpk.GLP_EFAIL: 'Search prematurely terminated: solver failure.', - glpk.GLP_EOBJLL: 'Search prematurely terminated: objective function being ' - 'maximized reached its lower limit and continues decreasing.', - glpk.GLP_EOBJUL: 'Search prematurely terminated: objective function being ' - 'minimized reached its upper limit and continues increasing.', - glpk.GLP_EITLIM: 'Search prematurely terminated: simplex iteration limit exceeded.', - glpk.GLP_ETMLIM: 'Search prematurely terminated: time limit exceeded.', - glpk.GLP_ENOPFS: 'LP problem instance has no primal feasible solution.', - glpk.GLP_ENODFS: 'LP problem instance has no dual feasible solution.'} - -generic_status = {glpk.GLP_UNDEF: 'solution is undefined', - glpk.GLP_FEAS: 'solution is feasible', - glpk.GLP_INFEAS: 'solution is infeasible', - glpk.GLP_NOFEAS: 'problem has no feasible solution', - glpk.GLP_OPT: 'solution is optimal', - glpk.GLP_UNBND: 'problem has unbounded solution'} - -dual_status = {glpk.GLP_UNDEF: 'dual solution is undefined', - glpk.GLP_FEAS: 'dual solution is feasible', - glpk.GLP_INFEAS: 'dual solution is infeasible', - glpk.GLP_NOFEAS: 'no dual feasible solution exists'} - -prim_status = {glpk.GLP_UNDEF: 'primal solution is undefined', - glpk.GLP_FEAS: 'primal solution is feasible', - glpk.GLP_INFEAS: 'primal solution is infeasible', - glpk.GLP_NOFEAS: 'no primal feasible solution exists'} - -var_status = {glpk.GLP_BS: 'basic variable', - glpk.GLP_NL: 'non-basic variable on its lower bound', - glpk.GLP_NU: 'non-basic variable on its upper bound', - glpk.GLP_NF: 'non-basic free (unbounded) variable', - glpk.GLP_NS: 'non-basic fixed variable'} - -scaling_options = {'GM': glpk.GLP_SF_GM, - 'EQ': glpk.GLP_SF_EQ, - '2N': glpk.GLP_SF_2N, - 'SKIP': glpk.GLP_SF_SKIP, - 'AUTO': glpk.GLP_SF_AUTO} - - -class LpProblem: - - def __init__(self): - """create a glpk problem. - - Problem needs to be properly deleted usin del(LpProblem) - """ - self.lp = glpk.glp_create_prob() - self.ncols = 0 - self.nrows = 0 - self.msg_lev = glpk.GLP_MSG_ERR - self.tm_lim = 10 * 1000 - self.res = {} - - @staticmethod - def _get_variable_type(bounds): - """Determine variable type for upper/lower bounds. - - :param bounds: upper and lower bounds - :type bounds: 1D ndarray of length 2 - :return var_type: glpk variable type to configure - :rtype: integer constant - :return lb: value of lower bound - :rtype float - :return ub: value of upper bound - :rtype float - """ - lb, ub = bounds - if np.isfinite(lb) and np.isfinite(ub): - if lb < ub: - var_type = glpk.GLP_DB - else: - ub = 0.0 - var_type = glpk.GLP_FX - elif not np.isfinite(lb) and not np.isfinite(ub): - lb = 0.0 - ub = 0.0 - var_type = glpk.GLP_FR - elif np.isfinite(ub): - lb = 0.0 - var_type = glpk.GLP_UP - else: - ub = 0.0 - var_type = glpk.GLP_LO - return var_type, lb, ub - - def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'): - """Configure the LP Problem. - - Note: glpk related indices start at 1 (not zero) - :param obj_coefs: coefficients of objective function - :type obj_coefs: 1D ndarray of shape (1, ncols) - :param constr_coefs: matrix with constraint coefficients - :type constr_coefs: 2D nparray with shape(nrows, ncols) - :param row_bnds: lower and upper bounds of auxhiliary (row) variables - :type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds) - :param col_bnds: lower and upper bounds of structural (col) variables - :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds) - :param direction: direction for the optimization ('maximize' or 'minimize') - :type direction: string - """ - self.nrows, self.ncols = constr_coefs.shape - - assert len(obj_coefs) == self.ncols - assert col_bnds.shape == (2, self.ncols) - assert row_bnds.shape == (self.nrows, 2) - - # configure objective function - glpk.glp_add_cols(self.lp, self.ncols) - self.set_obj_coefs(obj_coefs) - self.set_obj_dir(direction) - - # configure bounds on structural (columns) variables - for i, lb_ub in enumerate(col_bnds.T): - var_type, lb, ub = self._get_variable_type(lb_ub) - glpk.glp_set_col_bnds(self.lp, 1 + i, var_type, lb, ub) - - # configure constraints - glpk.glp_add_rows(self.lp, self.nrows) - for i, lb_ub in enumerate(row_bnds): - var_type, lb, ub = self._get_variable_type(lb_ub) - glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub) - constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs) - n_coefs = len(constr_coefs_sparse.row) - ia = glpk.intArray(1 + n_coefs) - ja = glpk.intArray(1 + n_coefs) - ar = glpk.doubleArray(1 + n_coefs) - for i in range(n_coefs): - ia[1 + i] = int(1 + constr_coefs_sparse.row[i]) - ja[1 + i] = int(1 + constr_coefs_sparse.col[i]) - ar[1 + i] = constr_coefs_sparse.data[i] - glpk.glp_load_matrix(self.lp, n_coefs, ia, ja, ar) - - def replace_constraint(self, row, constr_coefs, row_bnds): - # set/replace one bounded constraint - # - # Parameters: - # row: row id to be replaced - # coeffs: 1D np.array of floats of size number design variables - # coefficients != 0.0 will be configured - # bounds: 1D numpy.ndarray with float of size two - # lists and tuples will be converted to numpy.ndarray - # lower_bound, upper_bound, np.nan and np.inf supported - - assert row <= glpk.glp_get_num_rows(self.lp) - glpk.glp_set_row_bnds(self.lp, row, *self._get_variable_type(row_bnds)) - - n_coef = len(np.nonzero(constr_coefs)[0]) - ind = glpk.intArray(1 + n_coef) - val = glpk.doubleArray(1 + n_coef) - idx = 1 - for i in np.nonzero(constr_coefs)[0]: - ind[idx] = int(1 + i) - val[idx] = constr_coefs[i] - idx += 1 - glpk.glp_set_mat_row(self.lp, row, n_coef, ind, val) - - def add_constraint(self, constr_coefs, row_bnds): - """add a single constraint to the lp problem - - :param constr_coefs: constraint coefficients for the new row - :type constr_coefs: 1d ndarray of length ncols - :param row_bnds: upper and lower bounds of the auxiliary (row) variable - :type row_bnds: 1D ndarray of length 2 - :return: - """ - row = glpk.glp_add_rows(self.lp, 1) - self.replace_constraint(row, constr_coefs, row_bnds) - return row - - def del_constraint(self, row): - # delete one constraint - # - # Parameters: - # row: row id to be replaced - assert row <= glpk.glp_get_num_rows(self.lp) - num = glpk.intArray(1 + 1) - num[1] = row - glpk.glp_del_rows(self.lp, 1, num) - - def set_single_bound(self, react_idx, bounds): - assert react_idx <= self.ncols - if type(bounds) is not np.ndarray: - bounds = np.array(bounds) - glpk.glp_set_col_bnds(self.lp, 1 + react_idx, *self._get_variable_type(bounds)) - - def set_obj_dir(self, direction): - """Configure the optimization direction - - :param direction: direction for the optimization ('maximize' or 'minimize') - :type direction: string - """ - obj_dir = glpk.GLP_MAX if 'max' in direction else glpk.GLP_MIN - glpk.glp_set_obj_dir(self.lp, obj_dir) - - def reset_cost(self, cost_coef=None): - # reset coefficients of objective function (cols) to 0.0 - # - # Parameters: - # cost_coef: None or 1d numpy.ndarray with floats - # None: reset all cost coefficiencs - # ndarray: reset only coest coefficiencs != 0 - if cost_coef is None: - for i in range(glpk.glp_get_num_cols(self.lp)): - glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) - else: - for i in np.nonzero(cost_coef)[0]: - glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) - - def set_obj_coefs(self, obj_coefs): - """set coefficients for objective function - - :param obj_coefs: coefficients of objective function - :type obj_coefs: 1D ndarray of shape (1, ncols) - """ - for i in range(self.ncols): - glpk.glp_set_obj_coef(self.lp, int(1 + i), obj_coefs[i]) - - def solve(self, short_result=False, scaling=None): - """Solve the Linear Programming Problem. - - :param short_result: short result (only objective value and status) or long result - :type short_result: bool (default:False) - :param scaling: problem scaling options ('GM', 'EQ', '2N', 'SKIP', 'AUTO' - :type scaling: string (default, None) - :return: result information from the optimization stored in a dictionary - :rtype: dict - """ - smcp = glpk.glp_smcp() - glpk.glp_init_smcp(smcp) - smcp.msg_lev = self.msg_lev - smcp.tm_lim = self.tm_lim - - if scaling is not None: - opt = scaling_options.get(scaling, glpk.GLP_SF_AUTO) - glpk.glp_scale_prob(self.lp, opt) - simplex_result = glpk.glp_simplex(self.lp, smcp) - return self.results(simplex_result, short_result) - - def get_row_value(self, row): - """Retrieve right-hand side of a constraint (auxiliary variable) - - :param row: index into constraints - :type row: int - :return: value of auxiliary varibale (value of rhs) - :rtype: float - """ - assert row <= glpk.glp_get_num_rows(self.lp) - return glpk.glp_get_row_prim(self.lp, row) - - def results(self, simplex_result, short_result=False): - """Collect results for the optimization - - Alternatively, reduced results, e.g. when doing pFBA - - :param simplex_result: result from the LP solver - :type simplex_result: int - :param short_result: short result (only objective value and status) or long result - :type short_result: bool (default:False) - :return: result from the FBA optimization - :rtype: dict - """ - self.res = { - 'fun': glpk.glp_get_obj_val(self.lp), - 'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT, - } - if short_result is False: - self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)]) - self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)]) - self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)]) - self.res['simplex_result'] = simplex_result - self.res['simplex_message'] = simplex_status[simplex_result], - self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)] - self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)] - self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)] - return self.res - - def __del__(self): - """Explicitly delete the LpProblem. (can we do without?) - """ - if getattr(self, 'lp'): - glpk.glp_delete_prob(self.lp)