diff --git a/xbanalysis/problems/fba_problem.py b/xbanalysis/problems/fba_problem.py index 6d79492e6a1011f789877eaa00c58e973093d833..2f6d4219f34ba26a0fce0f79fa37ce9d78a4cc3d 100644 --- a/xbanalysis/problems/fba_problem.py +++ b/xbanalysis/problems/fba_problem.py @@ -4,7 +4,6 @@ Peter Schubert, HHU Duesseldorf, June 2022 """ import numpy as np -import pandas as pd from xbanalysis.problems.lp_problem import LpProblem @@ -36,6 +35,7 @@ class FbaProblem: :param xba_model: xba model with all required FBA parameters :type xba_model: XbaModel """ + self.is_fba_model = False self.xba_model = xba_model self.metab_sids = [s.id for s in xba_model.species.values() if s.sboterm == 'SBO:0000247'] @@ -44,12 +44,19 @@ class FbaProblem: self.mrids = [rid for rid, rxn in xba_model.reactions.items() if len((set(rxn.products) | set(rxn.reactants)) - metabs) == 0] self.mrid2idx = {mrid: idx for idx, mrid in enumerate(self.mrids)} - + self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids]) self.s_mat = xba_model.get_stoic_matrix(self.metab_sids, self.mrids) - self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mrids], - [xba_model.reactions[rid].fbc_ub for rid in self.mrids]]) + try: + self.flux_bounds = np.array([[xba_model.reactions[rid].fbc_lb for rid in self.mrids], + [xba_model.reactions[rid].fbc_ub for rid in self.mrids]]) + except AttributeError: + print('Error: Model does not contain FBA flux bounds!') + return + if hasattr(xba_model, 'fbc_objectives') is False: + print('Error: Model does not contain FBA objectives!') + return self.obj_coefs = np.zeros(len(self.mrids)) for oid, fbc_obj in xba_model.fbc_objectives.items(): if fbc_obj.active is True: @@ -58,6 +65,25 @@ class FbaProblem: self.set_obj_coefs(fbc_obj.coefficiants) break + if hasattr(self, 'obj_id') is False: + print('Error: Model does not contain an active FBA objective!') + return + + self.is_fba_model = True + + def combine_fwd_bwd(self, vector): + """combine forward and backward flux information. + + :param vector: contains data on all reactions + backward information for reversible reactions + :type vector: 1D ndarray of shape (len(mrids) + sum(reversible)) + :return: combined flux information + :rtype: 1D ndarray of shape (len(mrids)) + """ + n_cols = len(self.mrids) + combined = vector[:n_cols] + combined[self.reversible] -= vector[n_cols:] + return combined + def create_fba_lp(self): """create Linear Programming Problem for FBA with split reactions @@ -67,15 +93,18 @@ class FbaProblem: :return: LpProblem configured for FBA :rtype: LpProblem """ + if self.is_fba_model is False: + return None + lp = LpProblem() - # split forward and backward reactions - fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat)) - fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs)) - fwd_bwd_bounds = np.hstack((np.fmax(self.flux_bounds, 0), - np.vstack((np.fmax(-self.flux_bounds[1], 0), - np.fmax(-self.flux_bounds[0], 0))))) - lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bounds, - fwd_bwd_s_mat, np.zeros((2, fwd_bwd_s_mat.shape[0])), self.obj_dir) + # self.reversible = np.array([self.xba_model.reactions[rid].reversible for rid in self.mrids]) + fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible])) + fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) + fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0), + np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0), + np.fmax(-self.flux_bounds[0, self.reversible], 0))))) + row_bnds = np.zeros((2, fwd_bwd_s_mat.shape[0])) + lp.configure(fwd_bwd_obj_coefs, fwd_bwd_bnds, fwd_bwd_s_mat, row_bnds, self.obj_dir) return lp def fba_optimize(self): @@ -85,13 +114,12 @@ class FbaProblem: :rtype: dict """ lp = self.create_fba_lp() + if lp is None: + return {'simplex_status': 1, 'simplex_message': 'not an FBA model'} res = lp.solve() del lp - - # combine results for split fluxes - n_cols = len(self.mrids) - res['x'] = res['x'][:n_cols] - res['x'][n_cols:] - res['reduced_costs'] = res['reduced_costs'][:n_cols] - res['reduced_costs'][n_cols:] + res['x'] = self.combine_fwd_bwd(res['x']) + res['reduced_costs'] = self.combine_fwd_bwd(res['reduced_costs']) return res def pfba_optimize(self, fract_of_optimum=1.0): @@ -104,10 +132,12 @@ class FbaProblem: :return: """ lp = self.create_fba_lp() + if lp is None: + return {'simplex_status': 1, 'simplex_message': 'not an FBA model'} res = lp.solve(short_result=True) if res['simplex_status'] == 0: # fix FBA objective as constraint - fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs)) + fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) if self.obj_dir == 'maximize': row_bds = np.array([res['fun'] * fract_of_optimum, np.nan]) else: @@ -115,12 +145,11 @@ class FbaProblem: lp.add_constraint(fwd_bwd_obj_coefs, row_bds) # new LP objective is to minimize sum of fluxes n_cols = len(self.mrids) - lp.set_obj_coefs(np.ones(2 * n_cols)) + lp.set_obj_coefs(np.ones(n_cols + sum(self.reversible))) lp.set_obj_dir('minimize') res = lp.solve() - # combine results for split fluxes - res['x'] = res['x'][:n_cols] - res['x'][n_cols:] - res['reduced_costs'] = res['reduced_costs'][:n_cols] - res['reduced_costs'][n_cols:] + res['x'] = self.combine_fwd_bwd(res['x']) + res['reduced_costs'] = self.combine_fwd_bwd(res['reduced_costs']) del lp return res @@ -135,14 +164,16 @@ class FbaProblem: :rtype: pandas DataFrame """ lp = self.create_fba_lp() + if lp is None: + return {'simplex_status': 1, 'simplex_message': 'not an FBA model'} res = lp.solve(short_result=True) if rids is None: rids = self.mrids - flux_ranges = np.zeros((len(rids), 4)) + flux_ranges = np.zeros((4, len(rids))) if res['simplex_status'] == 0: # fix FBA objective as constraint - fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs)) + fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) if self.obj_dir == 'maximize': row_bds = np.array([res['fun'] * fract_of_optimum, np.nan]) else: @@ -152,30 +183,33 @@ class FbaProblem: n_cols = len(self.mrids) for idx, rid in enumerate(rids): # select reaction maximize/minimize flux - obj_coefs = np.zeros(2 * n_cols) - obj_coefs[self.mrid2idx[rid]] = 1 - obj_coefs[self.mrid2idx[rid] + n_cols] = -1 - lp.set_obj_coefs(obj_coefs) + new_obj_coefs = np.zeros(n_cols) + new_obj_coefs[self.mrid2idx[rid]] = 1 + new_fwd_bwd_obj_coefs = np.hstack((new_obj_coefs, -new_obj_coefs[self.reversible])) + lp.set_obj_coefs(new_fwd_bwd_obj_coefs) lp.set_obj_dir('minimize') res = lp.solve(short_result=True) if res['simplex_status'] == 0: - flux_ranges[idx, 0] = res['fun'] - flux_ranges[idx, 2] = lp.get_row_value(fba_constr_row) + flux_ranges[0, idx] = res['fun'] + flux_ranges[2, idx] = lp.get_row_value(fba_constr_row) else: print(f"FVA min failed for {rid}, {res['simplex_message']}") - flux_ranges[idx, 0] = np.nan + flux_ranges[0, idx] = np.nan lp.set_obj_dir('maximize') res = lp.solve(short_result=True) if res['simplex_status'] == 0: - flux_ranges[idx, 1] = res['fun'] - flux_ranges[idx, 3] = lp.get_row_value(fba_constr_row) + flux_ranges[1, idx] = res['fun'] + flux_ranges[3, idx] = lp.get_row_value(fba_constr_row) else: print(f"FVA max failed for {rid}, {res['simplex_message']}") - flux_ranges[idx, 1] = np.nan + flux_ranges[1, idx] = np.nan del lp - return pd.DataFrame(flux_ranges, index=rids, columns=['min', 'max', 'obj_min', 'obj_max']) + + res = {'rids': rids, 'min': flux_ranges[0], 'max': flux_ranges[1], + 'obj_min': flux_ranges[2], 'obj_max': flux_ranges[3]} + return res @property def obj_dir(self):