diff --git a/modelpruner/core/model_pruner.py b/modelpruner/core/model_pruner.py index 806ac249b937de7ef2666b2153cd6d53efa2fcec..fabf46491f0c88ffda6e3e825b93a9b4ab9bc525 100644 --- a/modelpruner/core/model_pruner.py +++ b/modelpruner/core/model_pruner.py @@ -4,52 +4,110 @@ import sys import os import re import time +import math import copy import numpy as np import multiprocessing as mp -import pickle +import json from modelpruner.models.fba_model import FbaModel from modelpruner.core.protected_parts import ProtectedParts global _fba_model +global _pf + + +def init_worker(fba_model, pf): + global _fba_model + global _pf + + _fba_model = fba_model.copy() + _pf = pf + + +def worker(rid): + global _fba_model + global _pf + + return _fba_model.fva_pf_flux_ranges(_pf, [rid]) class ModelPruner: - def __init__(self, sbml_fname, protected_parts, reduced_fname=None, resume=False): + def __init__(self, sbml_fname, protected_parts, resume=False): - self.tolerance = 1e-8 # later accessible by set_params() ? - self.cpus = int(os.cpu_count()*.8) + self.tolerance = 1e-7 # later accessible by set_params() ? + self.cpus = int(os.cpu_count() * 0.8) - if reduced_fname is None: - self.reduced_fname = re.sub(r'.xml$', '_red.xml', sbml_fname) self._snapshot_sbml = re.sub(r'.xml$', '_snapshot.xml', sbml_fname) - self._snapshot_pkl = re.sub(r'.xml$', '_snapshot.pkl', sbml_fname) + self._snapshot_json = re.sub(r'.xml$', '_snapshot.json', sbml_fname) - if resume is True: + if resume is True and os.path.exists(self._snapshot_sbml): sbml_fname = self._snapshot_sbml self.fba_model = FbaModel(sbml_fname) self.nrp = ProtectedParts(protected_parts) - self.protected_sids = self.nrp.initial_protected_sids - - if resume is True and os.path.exists(self._snapshot_pkl): - with open(self._snapshot_pkl, 'rb') as f: - self.protected_rids = pickle.load(f) - print(f'snapshot restored at {len(self.fba_model.rids):4d} remaining reactions ' - f'from: {self._snapshot_pkl}') + self.protected_sids = self.nrp.protected_sids + + if resume is True and os.path.exists(self._snapshot_json): + with open(self._snapshot_json, 'r') as f: + snapshot_data = json.load(f) + self.protected_rids = set(snapshot_data['protected_rids']) + print(f'snapshot restored at {len(self.fba_model.rids):4d} remaining reactions ' + f'from: {self._snapshot_json}') else: - self.protected_rids = self.get_total_protected_rids(self.nrp.initial_protected_rids) - - self.fba_model.update_flux_bounds(self.nrp.overwrite_bounds) + self.protected_rids = self.nrp.protected_rids + # self.protected_rids = self.get_total_protected_rids() - self.base_obj_dir = self.fba_model.obj_dir - self.base_flux_bounds = self.fba_model.flux_bounds.copy() - self.base_coefs = self.fba_model.obj_coefs.copy() + # overwrite the fba model bounds by general bound of protected parts (priot to LpProblem instantiation) + self.fba_model.update_model_flux_bounds(self.nrp.overwrite_bounds) + self.fba_model.update_model_reversible(self.get_pp_reversible_reactions()) + print('determine optimal values for protected functions') self.set_function_optimum() + print('Check consistency of protected parts') self.check_protected_parts() + def get_total_protected_rids(self): + # identify reactions that need protection due to protected metabolites + # over and above the protected rids provided in protected parts + sid_mask = np.array([True if sid in self.protected_sids else False + for sid in self.fba_model.sids]) + psf_mask = np.any(self.fba_model.s_mat[sid_mask] != 0, axis=0) + protected_sid_rids = set(self.fba_model.rids[psf_mask]) + + protected_rids = self.nrp.protected_rids.union(protected_sid_rids) + return protected_rids + + def get_pp_reversible_reactions(self): + """Identify reversible reactions from protected parts + + :return: reaction ids from protected parts with negative fluxes + :rtype: set of str + """ + rev_pp_rids = set() + for rid, bounds in self.nrp.overwrite_bounds.items(): + for idx in [0, 1]: + if math.isfinite(bounds[idx]): + rev_pp_rids.add(rid) + for pf in self.nrp.protected_functions.values(): + for rid, bounds in pf.overwrite_bounds.items(): + for idx in [0, 1]: + if math.isfinite(bounds[idx]) and bounds[idx] < 0.0: + rev_pp_rids.add(rid) + return rev_pp_rids + + def set_function_optimum(self): + """using FBA to set optimal value for protected functions""" + for pf in self.nrp.protected_functions.values(): + res = self.fba_model.fba_pf_optimize(pf) + pf.optimal = res['fun'] + pf.fba_success = res['success'] + if hasattr(pf, 'target_frac'): + if pf.obj_dir == 'maximize': + pf.set_target_lb(pf.target_frac * pf.optimal) + else: + pf.set_target_ub(pf.target_frac * pf.optimal) + def check_protected_parts(self): extra_p_rids = self.protected_rids.difference(set(self.fba_model.rids)) @@ -74,123 +132,108 @@ class ModelPruner: print('reactions in protected functions that are not protected:', pf_unprotected_rids) blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids'] - blocked_p_rids = self.nrp.initial_protected_sids.intersection(blocked_rids) + blocked_p_rids = self.nrp.protected_sids.intersection(blocked_rids) blocked_pf_rids = pf_rids.intersection(blocked_rids) if len(blocked_p_rids) > 0: print('protected reactions that are blocked (no flux):', blocked_p_rids) if len(blocked_pf_rids) > 0: print('reactions in protected functions that are blocked (no flux):', blocked_pf_rids) - def prune(self, snapshot_freq=None): - print(f'initial S-matrix: {self.fba_model.s_mat.shape}; ' - f' protected reactions: {len(self.protected_rids)}:', self.protected_rids) - self.remove_parallel_reactions() - print('protected functions feasibility:', self.check_pf_feasibility()) - - if type(snapshot_freq) is int: - next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq) - else: - next_snapshot = None - - while len(self.protected_rids) < self.fba_model.s_mat.shape[1]: - free_rids = list(set(self.fba_model.rids).difference(self.protected_rids)) - reaction_types = self.reaction_types_fva(free_rids) - if len(reaction_types['blocked_rids']) > 0: - self.fba_model.remove_reactions(reaction_types['blocked_rids'], self.protected_rids) - print(f"{len(reaction_types['blocked_rids'])} blocked reaction(s) removed:", - reaction_types['blocked_rids']) - if len(reaction_types['essential_rids']) > 0: - self.protected_rids |= reaction_types['essential_rids'] - print(f"{len(reaction_types['essential_rids'])} reaction(s) added to protected reactions:", - reaction_types['essential_rids']) - - print(f"{len(reaction_types['candidate_rids_sorted'])} candidates:", - reaction_types['candidate_rids_sorted'][:4], '...') - # try to remove one reaction while still satisfying all protected functions - feasible = False - essential_rids = set() - for rid in reaction_types['candidate_rids_sorted']: - orig_flux_bounds = self.fba_model.get_flux_bounds(rid) - self.fba_model.update_flux_bounds({rid: [0.0, 0.0]}) - feasible = self.check_pf_feasibility() - self.fba_model.update_flux_bounds({rid: orig_flux_bounds}) - if feasible: - self.fba_model.remove_reactions({rid}, self.protected_rids) - print(f'{rid} removed from the model') - break - else: - essential_rids.add(rid) - if len(essential_rids) > 0: - self.protected_rids |= essential_rids - print(f"{len(essential_rids)} reaction(s) added to protected reactions:", essential_rids) - - if feasible is False: - print('no more reactions to remove') - - # store intermediate results (snapshots) - if (next_snapshot is not None) and (len(self.fba_model.rids) < next_snapshot): - self.export_pruned_model(self._snapshot_sbml) - with open(self._snapshot_pkl, 'wb') as f: - pickle.dump(self.protected_rids, f) - next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq) - print('snapshot data saved.') - - print(f'S-matrix: {self.fba_model.s_mat.shape}; ' - f'protected reactions: {len(self.protected_rids)}') - - # check if blocked reactions exist in the finally pruned model - blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids'] - if len(blocked_rids) > 0: - print(f'{len(blocked_rids)} blocked reaction(s) in pruned model', blocked_rids) - - # remove any snapshot files - for fname in [self._snapshot_sbml, self._snapshot_pkl]: - if os.path.exists(fname): - os.remove(fname) + def identify_parallel_reactions(self): + """Identify parallel reactions having same stoichiometry (considering reverse). - def init_worker(self): - global _fba_model - _fba_model = copy.deepcopy(self.fba_model) + protected reaction are preserved. + reversible reactions have higher priority - @staticmethod - def fva_single_rids(rid): - return _fba_model.fva_single_rid(rid) + :return: reaction ids of parallel reactions + :rtype: set of str + """ + drop_rids = set() + for group_rids in self.fba_model.get_parallel_rids(): + protected = set() + rid_rev = None + for rid in group_rids: + if rid in self.protected_rids: + protected.add(rid) + if rid in self.fba_model.rids_rev: + rid_rev = rid + if len(protected) > 0: + drop_group_rids = set(group_rids) - protected + elif rid_rev is not None: + drop_group_rids = set(group_rids) - {rid_rev} + else: + drop_group_rids = set(group_rids[1:]) + drop_rids |= drop_group_rids + return drop_rids def reaction_types_fva(self, free_rids): + """Idenify reaction types using FVA applied to all protected functions + + first part: run FVA across all conditions + :param free_rids: reaction ids used for FVA + :type free_rids: list of strings + :return: rid, min and max flux values + :rtype: dict + """ print(time.strftime("%H:%M:%S", time.localtime())) + n_pf = len(self.nrp.protected_functions) flux_min_pfs = np.zeros((n_pf, len(free_rids))) flux_max_pfs = np.zeros((n_pf, len(free_rids))) + res = {} processes = min(self.cpus, len(free_rids)) for idx, pf in enumerate(self.nrp.protected_functions.values()): - self.set_temp_func_conditions(pf) - if processes > 20: + if processes > 100: + frid2idx = {rid: idx for idx, rid in enumerate(free_rids)} # place to split fva for free_rids over several - rid2idx = {rid: idx for idx, rid in enumerate(free_rids)} - pool = mp.Pool(processes, initializer=self.init_worker) - for rid, res in pool.imap_unordered(self.fva_single_rids, free_rids): - flux_min_pfs[idx, rid2idx[rid]] = res['min'] - flux_max_pfs[idx, rid2idx[rid]] = res['max'] - pool.close() - pool.join() + with mp.Pool(processes, initializer=init_worker, initargs=(self.fba_model, pf)) as pool: + for res in pool.imap_unordered(worker, free_rids): + rid = res['rids'][0] + flux_min_pfs[idx, frid2idx[rid]] = res['min'][0] + flux_max_pfs[idx, frid2idx[rid]] = res['max'][0] else: - res = self.fba_model.fva_optimize(free_rids, fract_of_optimum=0.0) - flux_min_pfs[idx] = res['min'] - flux_max_pfs[idx] = res['max'] - - self.restore_base_conditions() - - # if res['success'] is False: - # print('FVA unsuccessful') - # return {'blocked_rids': [], 'essential_rids': [], 'candidate_rids_sorted': []} - + # print(f'\nCheck FVA for pf {pf.obj_id} with {len(free_rids)} free variables') + res = self.fba_model.fva_pf_flux_ranges(pf, free_rids) + if res['success'] is True: + flux_min_pfs[idx] = res['min'] + flux_max_pfs[idx] = res['max'] + else: + print(f'FBA for condition {pf.obj_id} is not feasible') + # TODO: can this happen? how best to continue? + flux_min_pfs[idx] = np.nan + flux_max_pfs[idx] = np.nan + break sys.stdout.write('\rFVA[{}{}] {:3.0f}%'.format( '=' * int(idx+1), ' ' * int(n_pf - idx-1), (idx+1)/n_pf*100)) sys.stdout.flush() - print() + if res['success'] is False: + reaction_types = {'blocked_rids': set(), 'essential_rids': set(free_rids), + 'candidate_rids_sorted': []} + else: + reaction_types = self.classify_reactions(free_rids, flux_min_pfs, flux_max_pfs) + return reaction_types + + def classify_reactions(self, rids, flux_min_pfs, flux_max_pfs): + """Classify reaction types based on FVA results. + + determine flux span (min/max flux) for each reaction across all conditions + Identify blocked, essential and deletion candidate reactions based on flux span + blocked reactions: min and max of flux span is zero + essential reactions: min and max either > 0 or < 0 + deletion candidates: flux span support zero (0) flux + + :param rids: reaction ids used in FVA + :type rids: list of strings + :param flux_min_pfs: minimum fluxes per condition + :type flux_min_pfs: 2D ndarray of floats (reactions x conditions) + :param flux_max_pfs: maximum fluxes per condition + :type flux_max_pfs: 2D ndarray of floats (reactions x conditions) + :return: rid, min and max flux values + :rtype: dict + """ # analyze flux ranges per reaction, aggregate fluxes flux_min_pfs[np.abs(flux_min_pfs) < self.tolerance] = 0.0 flux_max_pfs[np.abs(flux_max_pfs) < self.tolerance] = 0.0 @@ -199,116 +242,104 @@ class ModelPruner: # blocked reactions do not carry any flux in all conditions mask_b = np.all(np.vstack((flux_min == 0, flux_max == 0)), axis=0) - blocked_rids = set(np.array(free_rids)[mask_b]) + blocked_rids = set(np.array(rids)[mask_b]) # essential reactions carry either only forward or reverse flux, exluding zero flux mask_e1 = np.all(np.vstack((flux_min < 0, flux_max < 0)), axis=0) mask_e2 = np.all(np.vstack((flux_min > 0, flux_max > 0)), axis=0) mask_e = np.any(np.vstack((mask_e1, mask_e2)), axis=0) - essential_rids = set(np.array(free_rids)[mask_e]) + essential_rids = set(np.array(rids)[mask_e]) # reactions that support zero flux are considered candiates for removal # here such reactions are sorted from smallest to largest flux span mask_c = np.all(np.vstack((np.logical_not(mask_b), np.logical_not(mask_e))), axis=0) flux_span = flux_max - flux_min ind = np.argsort(flux_span) - candidate_rids_sorted = list(np.array(free_rids)[ind][np.array(mask_c)[ind]]) + candidate_rids_sorted = list(np.array(rids)[ind][np.array(mask_c)[ind]]) - return {'blocked_rids': blocked_rids, 'essential_rids': essential_rids, - 'candidate_rids_sorted': candidate_rids_sorted} + reaction_types = {'blocked_rids': blocked_rids, 'essential_rids': essential_rids, + 'candidate_rids_sorted': candidate_rids_sorted} - def remove_parallel_reactions(self): + return reaction_types - parallel_rid_groups = self.fba_model.get_identical_columns() + def init_worker(self): + global _fba_model + _fba_model = copy.deepcopy(self.fba_model) - drop_rids = set() - for group_rids in parallel_rid_groups: - protected = set() - reversible = None - for rid in group_rids: - if rid in self.protected_rids: - protected.add(rid) - if self.fba_model.reversible[self.fba_model.rid2idx[rid]]: - reversible = rid - if len(protected) > 0: - drop_group_rids = set(group_rids) - protected - elif reversible is not None: - drop_group_rids = set(group_rids) - {reversible} - else: - drop_group_rids = set(group_rids[1:]) - drop_rids |= drop_group_rids + @staticmethod + def fva_single_rids(rid): + return _fba_model.fva_single_rid(rid) - self.fba_model.remove_reactions(drop_rids, self.protected_sids) - print(f'{len(drop_rids)} parallel reaction(s) dropped:', drop_rids) + def prune(self, snapshot_freq=None): + print(f'initial S-matrix: {self.fba_model.s_mat.shape}; ' + f' protected reactions: {len(self.protected_rids)}:', self.protected_rids) + drop_rids = self.identify_parallel_reactions().difference(self.protected_rids) + if len(drop_rids) > 0: + self.fba_model.remove_reactions(drop_rids) + print(f'{len(drop_rids)} parallel reaction(s) dropped:', drop_rids) - def set_function_optimum(self): - """using FBA to set optimal value for protected functions""" - for obj_id, pf in self.nrp.protected_functions.items(): - self.set_temp_func_conditions(pf) - res = self.fba_model.fba_optimize(short_result=True) - self.restore_base_conditions() + feasible = np.array([self.fba_model.check_pf_feasibility(pf) for pf in self.nrp.protected_functions.values()]) + print('protected functions feasibility:', np.all(feasible)) - pf.optimal = res['fun'] - pf.fba_success = res['success'] - if hasattr(pf, 'target_frac'): - if pf.obj_dir == 'minimize': - pf.set_target_ub(pf.target_frac * pf.optimal) - else: - pf.set_target_lb(pf.target_frac * pf.optimal) + next_snapshot = 0 + if type(snapshot_freq) is int: + next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq) - def get_total_protected_rids(self, protected_rids): - # identify reactions that need protection due to protected metabolites - # over and above the protected rids provided in protected parts - sid_mask = np.array([True if sid in self.protected_sids else False - for sid in self.fba_model.sids]) + while len(self.protected_rids) < self.fba_model.s_mat.shape[1]: + free_rids = list(set(self.fba_model.rids).difference(self.protected_rids)) + reaction_types = self.reaction_types_fva(free_rids) + print() + blocked_rids = reaction_types['blocked_rids'] + essential_rids = reaction_types['essential_rids'] + candidate_rids = reaction_types['candidate_rids_sorted'] + if len(blocked_rids) > 0: + self.fba_model.remove_reactions(blocked_rids) + print(f"{len(blocked_rids)} blocked reaction(s) removed:", blocked_rids) + print(f"{len(candidate_rids)} candidates:", candidate_rids[:4], '...') - psf_mask = np.any(self.fba_model.s_mat[sid_mask] != 0, axis=0) - protected_sid_rids = set(self.fba_model.rids[psf_mask]) + # try to remove one reaction while still satisfying all protected functions + for rid in candidate_rids: + # print('check feasibility of', rid) + feasible = True + for pf in self.nrp.protected_functions.values(): + feasible = self.fba_model.check_pf_feasibility(pf, zero_flux_rid=rid) + # print(f'feasibility of {rid} is {feasible}') + if feasible is False: + essential_rids.add(rid) + break + if feasible is True: + self.fba_model.remove_reactions({rid}) + print(f'{rid} removed from the model') + break - # identify rids in protected functions and ensure they are protected - pf_rids = set() - for pf in self.nrp.protected_functions.values(): - pf_rids |= {rid for rid in pf.objective} - pf_rids |= {rid for rid in pf.overwrite_bounds} + if len(essential_rids) > 0: + self.protected_rids |= essential_rids + print(f"{len(essential_rids)} reaction(s) added to protected reactions:", essential_rids) - protected_rids |= protected_sid_rids - protected_rids |= pf_rids - return protected_rids + if feasible is False: + print('no more reactions to remove') - def check_pf_feasibility(self): - """check that current model still satisfies the protected functions - """ - feasible = True - for pf in self.nrp.protected_functions.values(): - self.set_temp_func_conditions(pf) - res = self.fba_model.fba_optimize(short_result=True) - self.restore_base_conditions() - if res['success'] is False: - feasible = False - break - if pf.obj_dir == 'maximize': - if res['fun'] < pf.target_lb: - feasible = False - break - else: - if res['fun'] > pf.target_ub: - feasible = False - break - return feasible + # store intermediate results (snapshots) + if len(self.fba_model.rids) < next_snapshot: + self.export_pruned_model(self._snapshot_sbml) + snapshot_data = {'protected_rids': list(self.protected_rids)} + with open(self._snapshot_json, 'w') as f: + json.dump(snapshot_data, f) + next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq) + print('snapshot data saved.') - def set_temp_func_conditions(self, pf): - self.base_obj_dir = self.fba_model.obj_dir - self.base_flux_bounds = self.fba_model.flux_bounds.copy() - self.base_coefs = self.fba_model.obj_coefs.copy() + print(f'S-matrix: {self.fba_model.s_mat.shape}; ' + f'protected reactions: {len(self.protected_rids)}') - self.fba_model.obj_dir = pf.obj_dir - self.fba_model.update_flux_bounds(pf.overwrite_bounds) - self.fba_model.obj_coefs = self.fba_model.set_objective_coefficients(pf.objective) + # check if blocked reactions exist in the finally pruned model + # blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids'] + # if len(blocked_rids) > 0: + # print(f'{len(blocked_rids)} blocked reaction(s) in pruned model', blocked_rids) - def restore_base_conditions(self): - self.fba_model.obj_dir = self.base_obj_dir - self.fba_model.flux_bounds = self.base_flux_bounds.copy() - self.fba_model.obj_coefs = self.base_coefs.copy() + # remove any snapshot files + # for fname in [self._snapshot_sbml, self._snapshot_json]: + # if os.path.exists(fname): + # os.remove(fname) def get_reduction_status(self): """ Return status in model reduction process @@ -327,7 +358,7 @@ class ModelPruner: status = self.get_reduction_status() print("reactions removed/remaining/protected: " - f"{status['n_full_rids' ] - status['n_rids']:4d}/{status['n_rids']:4d}/" + f"{status['n_full_rids' ] - status['n_rids']:4d}/{status['n_rids']:4d}" f"/{len(self.protected_rids):4d}; dof: {status['dof']:3d}") def export_pruned_model(self, pruned_sbml): @@ -336,3 +367,6 @@ class ModelPruner: print('pruned model exported to', pruned_sbml) else: print('pruned model could not be exported') + + def __del__(self): + del self.fba_model diff --git a/modelpruner/core/protected_parts.py b/modelpruner/core/protected_parts.py index b35343a52aac00ab0ba6a884bc3d32a90ae7b415..4f2180d733bc3fc555e347e2f15c587af65a909d 100644 --- a/modelpruner/core/protected_parts.py +++ b/modelpruner/core/protected_parts.py @@ -23,8 +23,8 @@ class ProtectedParts: assert type(protected_parts) == dict, 'expected filename or a dict' nrp = protected_parts - self.initial_protected_rids = set(nrp.get('reactions', [])) - self.initial_protected_sids = set(nrp.get('metabolites', [])) + self.protected_rids = set(nrp.get('reactions', [])) + self.protected_sids = set(nrp.get('metabolites', [])) if 'bounds' in nrp: num_cols = ['fbcLb', 'fbcUb'] nrp['bounds'][num_cols] = nrp['bounds'][num_cols].applymap( @@ -82,7 +82,7 @@ class ProtectedFunction: self.obj_dir = row['type'] for record in sbmlxdf.misc.record_generator(row['fluxObjectives']): params = sbmlxdf.misc.extract_params(record) - self.objective[params['reac']] = params['coef'] + self.objective[params['reac']] = float(params['coef']) if math.isfinite(row['fraction']): self.target_frac = row['fraction'] if math.isfinite(row['fbcLb']): diff --git a/modelpruner/models/fba_model.py b/modelpruner/models/fba_model.py index 1613be9ee1d4611b857a9e716e276987d0abd044..90d43b038870496b4d5184335a90bd56eb1d8d8f 100644 --- a/modelpruner/models/fba_model.py +++ b/modelpruner/models/fba_model.py @@ -26,268 +26,72 @@ class FbaModel: model_dict = sbml_model.to_df() df_reactions = model_dict['reactions'] df_species = model_dict['species'] - df_fbc_objectives = model_dict['fbcObjectives'] - self.full_model_shape = {'n_full_rids': len(df_reactions), - 'n_full_sids': len(df_species)} self.rids = df_reactions.index.to_numpy() self.rid2idx = {rid: idx for idx, rid in enumerate(self.rids)} self.sids = df_species.index.to_numpy() self.sid2idx = {sid: idx for idx, sid in enumerate(self.sids)} - self.reversible = df_reactions['reversible'].to_numpy().copy() + self.full_model_shape = {'n_full_rids': len(self.rids), 'n_full_sids': len(self.sids)} self.flux_bounds = np.vstack((df_reactions['fbcLb'].to_numpy(), df_reactions['fbcUb'].to_numpy())) + self.reversible = df_reactions['reversible'].to_numpy().copy() + for idx in range(len(self.reversible)): + if self.flux_bounds[0, idx] < 0 and np.logical_not(self.reversible[idx]): + self.reversible[idx] = True + print(f'reaction {idx} set to reversible') + self.rids_rev = self.rids[self.reversible] + self.rid2idx_rev = {rid: idx + len(self.rids) for idx, rid in enumerate(self.rids_rev)} self.s_mat = sbml_model.get_s_matrix().to_numpy() # TODO: create LP Problem only once (but what is impact on deepcopy() # function to assign new LP problem - # TODO: delete rows / cols in LP problem on removal of rids, sids - # TODO: prior to solve, add objective, thereafter immediatly remove objective - - self.objective = {} - for oid, row in df_fbc_objectives.iterrows(): - if row['active'] is True: - self.obj_id = row.name - self.obj_dir = row['type'] - for record in sbmlxdf.misc.record_generator(row['fluxObjectives']): - params = sbmlxdf.misc.extract_params(record) - self.objective[params['reac']] = params['coef'] - - self.check_consistency() - self.obj_coefs = self.set_objective_coefficients(self.objective) - - def set_objective_coefficients(self, objective): - obj_coefs = np.zeros(len(self.rids)) - for rid, coef in objective.items(): - obj_coefs[self.rid2idx[rid]] = coef - return obj_coefs - def check_consistency(self): + # lp problem will only be created once required. This gives chances to update e.g. flux bounds + self.lp = None - # check that irreversible reaction have a flux lower bound >= 0 + def update_reversible_rids(self): + self.rids_rev = self.rids[self.reversible] + self.rid2idx_rev = {rid: idx + len(self.rids) for idx, rid in enumerate(self.rids_rev)} - for idx in range(len(self.rids)): - if np.logical_not(self.reversible[idx]): - if self.flux_bounds[0, idx] < 0: - self.reversible[idx] = True - print(f'reaction {self.rids[idx]} set to reversible') + def update_model_reversible(self, reactions): + """Update reversible flag of reactions - if len(self.objective) == 0: - print('Active FBA objective function missing in the model!') - - for rid in self.objective: - if rid not in self.rids: - print('objective reaction not part of reactions') - raise AttributeError - - def combine_fwd_bwd(self, vector): - """combine forward and backward flux information. - - :param vector: contains data on all reactions + backward information for reversible reactions - :type vector: 1D ndarray of shape (len(mr_rids) + sum(reversible)) - :return: combined flux information - :rtype: 1D ndarray of shape (len(mr_rids)) + :param reactions: reactions ids that have negative flux bounds + :type reactions: set of str """ - n_cols = len(self.rids) - combined = vector[:n_cols] - combined[self.reversible] -= vector[n_cols:] - return combined + upd_rids = set(reactions).difference(set(self.rids_rev)) + for rid in upd_rids: + self.reversible[self.rid2idx[rid]] = True + if len(upd_rids) > 0: + print('reactions in protected functions made reversible:', upd_rids) + self.update_reversible_rids() - def create_fba_lp(self): - """create Linear Programming Problem for FBA with split reactions + def update_model_flux_bounds(self, flux_bounds): + """Overwrite model flux bounds with protected parts flux bounds. - Splitting reactions in forward/backward directions reduces high values during FBA - Splitting required for pFBA (minimize absolute flux values) + lp problem might need to be instantiated - :return: LpProblem configured for FBA - :rtype: LpProblem + :param flux_bounds: lower and upper flux bounds for selected reactions + :type flux_bounds: dict: key: reaction id, value: list-like of two float values """ - lp = LpProblem() - fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible])) - fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) - fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0), - np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0), - np.fmax(-self.flux_bounds[0, self.reversible], 0))))) - row_bnds = np.zeros((fwd_bwd_s_mat.shape[0], 2)) - lp.configure(fwd_bwd_obj_coefs, fwd_bwd_s_mat, row_bnds, fwd_bwd_bnds, self.obj_dir) - return lp - - def fba_optimize(self, short_result=False): - """FBA optimization of current problem + for rid, bound_upd in flux_bounds.items(): + for i in [0, 1]: + if math.isfinite(bound_upd[i]): + self.flux_bounds[i, self.rid2idx[rid]] = bound_upd[i] - :return: results information from the optimization - :rtype: dict - """ - lp = self.create_fba_lp() - if lp is None: - return {'success': False, 'message': 'not an FBA model'} - res = lp.solve(short_result) - del lp - if short_result is False: - res['x'] = self.combine_fwd_bwd(res['x']) - res['reduced_costs'] = self.combine_fwd_bwd(res['reduced_costs']) - return res + def get_parallel_rids(self): + """Identify identical reactions (forward/reverse). - def fva_optimize(self, rids=None, fract_of_optimum=1.0): - """FVA - flux variability analysis for all or selected reactions. + Linerar scaling is not considered. - :param rids: selected reaction ids to run FVA (alternatively, all) - :type rids: list of strings (default: None - all reactions) - :param fract_of_optimum: value between 0.0 and 1.0. - :type fract_of_optimum: float (default 1.0) - :return: rid, min and max flux values - :rtype: dict + :return: lists of reaction ids that are parallel + :rtype: list with lists of str """ - lp = self.create_fba_lp() - if lp is None: - return {'success': False, 'message': 'not an FBA model'} - res = lp.solve(short_result=True) - - if res['success'] is False: - del lp - return {'success': False, 'message': res['generic_status']} - - if rids is None: - rids = self.rids - flux_min = np.zeros(len(rids)) - flux_max = np.zeros(len(rids)) - - # fix FBA objective as constraint - fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) - if self.obj_dir == 'maximize': - row_bds = np.array([res['fun'] * fract_of_optimum, np.nan]) - else: - if fract_of_optimum > 0.0: - row_bds = np.array([np.nan, res['fun'] / fract_of_optimum]) - else: - row_bds = np.array([np.nan, 1000.0]) - lp.add_constraint(fwd_bwd_obj_coefs, row_bds) - - n_cols = len(self.rids) - for idx, rid in enumerate(rids): - # select reaction maximize/minimize flux - new_obj_coefs = np.zeros(n_cols) - new_obj_coefs[self.rid2idx[rid]] = 1 - new_fwd_bwd_obj_coefs = np.hstack((new_obj_coefs, -new_obj_coefs[self.reversible])) - lp.set_obj_coefs(new_fwd_bwd_obj_coefs) - - lp.set_obj_dir('minimize') - res = lp.solve(short_result=True) - flux_min[idx] = res['fun'] if res['success'] else np.nan - - lp.set_obj_dir('maximize') - res = lp.solve(short_result=True) - flux_max[idx] = res['fun'] if res['success'] else np.nan - del lp - return {'success': True, 'rids': np.array(rids), 'min': flux_min, 'max': flux_max} - - def fva_single_rid(self, rids): - """FVA - flux variability analysis for all or selected reactions. - - :param rids: selected reaction ids to run FVA (alternatively, all) - :type rids: list of strings (default: None - all reactions) - :return: rid, min and max flux values - :rtype: dict - """ - lp = self.create_fba_lp() - if lp is None: - return {'success': False, 'message': 'not an FBA model'} - res = lp.solve(short_result=True) - - if res['success'] is False: - del lp - return {'success': False, 'message': res['generic_status']} - - if rids is None: - rids = self.rids - flux_min = np.zeros(len(rids)) - flux_max = np.zeros(len(rids)) - # fix FBA objective as constraint - fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible])) - if self.obj_dir == 'maximize': - row_bds = np.array([0.0, np.nan]) - else: - row_bds = np.array([np.nan, 1000.0]) - lp.add_constraint(fwd_bwd_obj_coefs, row_bds) - - n_cols = len(self.rids) - for idx, rid in enumerate(rids): - # select reaction maximize/minimize flux - new_obj_coefs = np.zeros(n_cols) - new_obj_coefs[self.rid2idx[rid]] = 1 - new_fwd_bwd_obj_coefs = np.hstack((new_obj_coefs, -new_obj_coefs[self.reversible])) - lp.set_obj_coefs(new_fwd_bwd_obj_coefs) - - lp.set_obj_dir('minimize') - res = lp.solve(short_result=True) - flux_min[idx] = res['fun'] if res['success'] else np.nan - - lp.set_obj_dir('maximize') - res = lp.solve(short_result=True) - flux_max[idx] = res['fun'] if res['success'] else np.nan - del lp - return {'success': True, 'rids': np.array(rids), 'min': flux_min, 'max': flux_max} - - def update_flux_bounds(self, flux_bounds): - """selectivly set lower/upper flux bounds for given reactions. - - Flux bounds with value of 'np.nan' will not be set - - :param flux_bounds: new lower and upper flux bounds selected reactions - :type flux_bounds: dict (key: reaction id, value: list with two float values for lower/upper bound) - """ - for rid, (lb, ub) in flux_bounds.items(): - assert rid in self.rids - if math.isfinite(lb): - self.flux_bounds[0, self.rid2idx[rid]] = lb - if lb < 0: - self.reversible[self.rid2idx[rid]] = True - if math.isfinite(ub): - self.flux_bounds[1, self.rid2idx[rid]] = ub - - def get_flux_bounds(self, rid): - assert rid in self.rids - return [self.flux_bounds[0, self.rid2idx[rid]], self.flux_bounds[1, self.rid2idx[rid]]] - - def remove_reactions(self, drop_rids, protected_sids): - # remove_reactions - # also remove corresponding metabolite columns - # update, rids, rid2idx, sids, sid2idx, reversible, flux_bounds, s_mat - - keep_rids_mask = np.array([False if rid in drop_rids else True for rid in self.rids]) - - self.rids = self.rids[keep_rids_mask] - self.rid2idx = {rid: idx for idx, rid in enumerate(self.rids)} - self.reversible = self.reversible[keep_rids_mask] - self.obj_coefs = self.obj_coefs[keep_rids_mask] - self.flux_bounds = self.flux_bounds[:, keep_rids_mask] - self.s_mat = self.s_mat[:, keep_rids_mask] - - keep_sids_mask = np.any(self.s_mat != 0, axis=1) - drop_sids = set(self.sids[np.logical_not(keep_sids_mask)]) - - if len(drop_sids) > 0: - drop_protected_sids = drop_sids.intersection(protected_sids) - if len(drop_protected_sids) > 0: - print('Something went worong, protected metabolites dropped due to reaction removal.', - drop_protected_sids) - self.sids = self.sids[keep_sids_mask] - self.sid2idx = {sid: idx for idx, sid in enumerate(self.sids)} - self.s_mat = self.s_mat[keep_sids_mask, :] - - def get_identical_columns(self): - # Identify identical columns: - # sort the columns - # check parallel columns, if they are identical - # report on identical columns - - # we also consider reversibility - # we do not consider linearly scaled reactions - fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible])) - fwd_bwd_rids = np.hstack((self.rids, self.rids[self.reversible])) + fwd_rev_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible])) + fwd_rev_rids = np.hstack((self.rids, self.rids_rev)) # identify groups of parallel reactions - sorted_idx = np.lexsort(fwd_bwd_s_mat) + sorted_idx = np.lexsort(fwd_rev_s_mat) groups = [] consumed_idxs = set() for pos in range(len(sorted_idx)): @@ -296,17 +100,30 @@ class FbaModel: group = [idx] for adjacent_pos in range(pos + 1, len(sorted_idx)): candiate_idx = sorted_idx[adjacent_pos] - if np.all(fwd_bwd_s_mat[:, idx] == fwd_bwd_s_mat[:, candiate_idx]): - group.append(candiate_idx) + if np.all(fwd_rev_s_mat[:, idx] == fwd_rev_s_mat[:, candiate_idx]): consumed_idxs.add(candiate_idx) + group.append(candiate_idx) else: break if len(group) > 1: groups.append(group) - parallel_rid_groups = [list(fwd_bwd_rids[group]) for group in groups] + parallel_rid_groups = [list(fwd_rev_rids[group]) for group in groups] return parallel_rid_groups + def combine_fwd_rev(self, vector): + """combine forward and backward flux information. + + :param vector: contains data on all reactions + backward information for reversible reactions + :type vector: 1D ndarray of shape (len(mr_rids) + sum(reversible)) + :return: combined flux information + :rtype: 1D ndarray of shape (len(mr_rids)) + """ + n_cols = len(self.rids) + combined = vector[:n_cols] + combined[self.reversible] -= vector[n_cols:] + return combined + def get_model_status(self): """ Return status on model @@ -320,17 +137,6 @@ class FbaModel: status.update(self.full_model_shape) return status - @property - def obj_dir(self): - return self.__obj_dir - - @obj_dir.setter - def obj_dir(self, dir_string): - if 'min' in dir_string: - self.__obj_dir = 'minimize' - else: - self.__obj_dir = 'maximize' - def export_pruned_model(self, pruned_sbml): sbml_model = sbmlxdf.Model(self.sbml_fname) @@ -383,3 +189,250 @@ class FbaModel: if len(err) > 0: print('SBML validation result: %s', ', '.join([k + ': ' + str(v) for k, v in err.items()])) return pruned_model.export_sbml(pruned_sbml) + + def create_core_fba_lp(self): + """create a core Linear Programming Problem for FBA with split reactions + + Core Problem contains coefficient matrix, variable and row bounds + Objective function yet needs to be added prior to any optimization + + Splitting reactions in forward/backward directions reduces high values during FBA + configured bounds are >= 0 + Splitting required for pFBA (minimize absolute flux values) + + :return: LpProblem configured for FBA + :rtype: LpProblem + """ + fwd_rev_s_mat = np.hstack((self.s_mat[:], -self.s_mat[:, self.reversible])) + fwd_rev_col_bnds = np.hstack((np.fmax(self.flux_bounds[:], 0), + np.fmax(np.flip(-self.flux_bounds[:, self.reversible], axis=0), 0))) + row_bnds = np.zeros((fwd_rev_s_mat.shape[0], 2)) + lp = LpProblem(fwd_rev_s_mat, row_bnds, fwd_rev_col_bnds) + return lp + + def set_lp_objective(self, objective, direction='maximize'): + """Set objective coefficients and direction in linear problem + + :param objective: objective coefficients + :type objective: dict with key: reaction id, value: stoichiometric coefficient + :param direction: direction of optimization + :type direction: str (optional, default: 'maximize') + """ + assert self.lp is not None + self.lp.set_obj_coefs(self.get_coefficients(objective)) + self.lp.set_obj_dir(direction) + + def remove_lp_objective(self, objective): + """remove objective coefficients + + :param objective: objective coefficients + :type objective: dict with key: reaction id, value: stoichiometric coefficient + """ + self.lp.remove_obj_coefs(self.get_coefficients(objective)) + + def update_lp_flux_bounds(self, flux_bounds): + """update flux bounds for selected reactions in the linear problem. + + lp problem might need to be instantiated + overwrite bounds of protected functions will be cleaned from already removed reactions + + :param flux_bounds: lower and upper flux bounds for selected reactions + :type flux_bounds: dict: key: reaction id, value: list-like of two float values + """ + if self.lp is None: + self.lp = self.create_core_fba_lp() + print('Core LP Problem created') + for rid, bound_upd in flux_bounds.items(): + if rid in self.rids: + temp_bounds = self.flux_bounds[:, self.rid2idx[rid]].copy() + for i in [0, 1]: + if math.isfinite(bound_upd[i]): + temp_bounds[i] = bound_upd[i] + self.lp.set_col_bnd(self.rid2idx[rid], np.fmax(temp_bounds, 0)) + if rid in self.rids_rev: + self.lp.set_col_bnd(self.rid2idx_rev[rid], np.fmax(np.flip(-temp_bounds), 0)) + + def reset_lp_flux_bounds(self, rids): + """restore original flux bounds for selected reactions in linear problem. + + overwrite bounds of protected functions will be cleaned from already removed reactions + + :param rids: reaction ids for which to restore the model orignal flux bounds + :type rids: set of str + """ + assert self.lp is not None + + for rid in rids: + if rid in self.rids: + assert rid in self.rids + bounds = self.flux_bounds[:, self.rid2idx[rid]] + self.lp.set_col_bnd(self.rid2idx[rid], np.fmax(bounds, 0)) + if rid in self.rids_rev: + self.lp.set_col_bnd(self.rid2idx_rev[rid], np.fmax(np.flip(-bounds), 0)) + + def get_coefficients(self, objective): + """get coefficients for objective or constraint function + + :param objective: objective/constraint coefficients + :type objective: dict with key: reaction id, value: stoichiometric coefficient + :return: coefficients aligned with the linear problem + :rtype: 1D ndarray with float + """ + coefs = np.zeros(len(self.rids) + len(self.rids_rev)) + for rid, coef in objective.items(): + coefs[self.rid2idx[rid]] = coef + if rid in self.rids_rev: + coefs[self.rid2idx_rev[rid]] = -coef + return coefs + + def remove_reactions(self, drop_rids): + """remove reactions and blocked metabolites from model and lp problem + + :param drop_rids: reaction ids to be removed from the model + :type drop_rids: set of str + """ + assert self.lp is not None + + # remove variables (columns) from linear problem + col_idxs0_fwd = [self.rid2idx[rid] for rid in drop_rids] + col_idxs0_rev = [self.rid2idx_rev[rid] for rid in drop_rids if rid in self.rids_rev] + self.lp.remove_cols(col_idxs0_fwd + col_idxs0_rev) + + keep_rids_mask = np.array([False if rid in drop_rids else True for rid in self.rids]) + self.rids = self.rids[keep_rids_mask] + self.rid2idx = {rid: idx for idx, rid in enumerate(self.rids)} + self.reversible = self.reversible[keep_rids_mask] + self.update_reversible_rids() + self.flux_bounds = self.flux_bounds[:, keep_rids_mask] + self.s_mat = self.s_mat[:, keep_rids_mask] + + # drop blocked metabolites due to deleted reactions + keep_sids_mask = np.any(self.s_mat != 0, axis=1) + row_idxs0 = [idx for idx, val in enumerate(keep_sids_mask) if val == np.bool_(False)] + if len(row_idxs0) > 0: + self.lp.remove_rows(row_idxs0) + self.sids = self.sids[keep_sids_mask] + self.sid2idx = {sid: idx for idx, sid in enumerate(self.sids)} + self.s_mat = self.s_mat[keep_sids_mask, :] + + def fba_pf_optimize(self, pf, zero_flux_rid=None): + """FBA optimization of current problem + + zero_flux_rids overwrites settings in protected functions + + :param pf: protected function (objective) + :type pf: class ProtectedFunction + :param zero_flux_rid: reaction id of candiated checked for removal + :type zero_flux_rid: str, optional (default: None) + :return: results information from the optimization + :rtype: dict + """ + if self.lp is None: + self.lp = self.create_core_fba_lp() + print('Core LP Problem created') + assert self.lp is not None + + if zero_flux_rid is not None: + overwrite_bounds = pf.overwrite_bounds.copy() + overwrite_bounds[zero_flux_rid] = np.array([0.0, 0.0]) + else: + overwrite_bounds = pf.overwrite_bounds + + self.set_lp_objective(pf.objective, pf.obj_dir) + self.update_lp_flux_bounds(overwrite_bounds) + res = self.lp.solve() + # print(res['success'], res['fun']) + self.reset_lp_flux_bounds(overwrite_bounds.keys()) + self.remove_lp_objective(pf.objective) + return res + + def check_pf_feasibility(self, pf, zero_flux_rid=None): + """check that current model satisfies the protected functions + + :param pf: protected function (objective) + :type pf: class ProtectedFunction + :param zero_flux_rid: reaction id of candiated checked for removal + :type zero_flux_rid: str, optional (default: None) + :return: status if protected function satisfied + :rtype: bool + """ + feasible = True + res = self.fba_pf_optimize(pf, zero_flux_rid) + if res['success'] is False: + feasible = False + elif pf.obj_dir == 'maximize': + if res['fun'] < pf.target_lb: + feasible = False + else: + if res['fun'] > pf.target_ub: + feasible = False + return feasible + + def fva_pf_flux_ranges(self, pf, rids): + """FVA for a specific protected function for all or selected reactions + + :param pf: protected function (objective) + :type pf: class ProtectedFunction + :param rids: list of reaction ids to get flux range + :type rids: list of str + :return: rid, min and max flux values + :rtype: dict + """ + # check that problem is feasible + if self.check_pf_feasibility(pf) is False: + return {'success': False} + + # implement objective as a constraint and overwrite bounds + row_bnds = (np.array([pf.target_lb, np.nan]) if pf.obj_dir == 'maximize' + else np.array([np.nan, pf.target_ub])) + row_idx0 = self.lp.add_constraint(self.get_coefficients(pf.objective), row_bnds) + self.update_lp_flux_bounds(pf.overwrite_bounds) + + flux_min = np.zeros(len(rids)) + flux_max = np.zeros(len(rids)) + for idx, rid in enumerate(rids): + self.set_lp_objective({rid: 1}, 'minimize') + res = self.lp.solve() + flux_min[idx] = res['fun'] if res['success'] else np.nan + self.lp.set_obj_dir('maximize') + res = self.lp.solve() + flux_max[idx] = res['fun'] if res['success'] else np.nan + self.remove_lp_objective({rid: 1}) + + self.reset_lp_flux_bounds(pf.overwrite_bounds.keys()) + self.lp.del_constraint(row_idx0) + + res = {'success': True, 'rids': np.array(rids), 'min': flux_min, 'max': flux_max} + # print(res) + return res + + def fva_single_rid(self, rid): + """FVA - flux variability analysis for all or selected reactions. + + :param rid: selected reaction ids to run FVA (alternatively, all) + :type rid: list of strings (default: None - all reactions) + :return: rid, min and max flux values + :rtype: dict + """ + len(self.rids) + print('fva_single_rid not implemented', rid) + return {'success': False} + + def copy(self): + """Copy the FbaModel instance (while removing linear problem). + + A shallow copy, to support multiprocessing of FVA, + which do not impact FBA model attributes, except for linear problem + """ + cls = self.__class__ + result = cls.__new__(cls) + result.__dict__.update(self.__dict__) + result.__dict__['lp'] = None + return result + + def __reduce__(self): + return self.__class__, (self.sbml_fname, ) + + def __del__(self): + print('deletion of fba_model') + del self.lp diff --git a/modelpruner/problems/lp_problem.py b/modelpruner/problems/lp_problem.py index e346a33011f0aaf3a223a29d2efed89d1b214495..0d094a52a0d1a5cc132b6f3a6ace6387b869f530 100644 --- a/modelpruner/problems/lp_problem.py +++ b/modelpruner/problems/lp_problem.py @@ -61,18 +61,58 @@ scaling_options = {'GM': glpk.GLP_SF_GM, class LpProblem: - def __init__(self): - """create a glpk problem. + def __init__(self, constr_coefs, row_bnds, col_bnds): + """create core linear problem (constraints, variable and row bounds) Problem needs to be properly deleted usin del(LpProblem) + Note: glpk related indices start at 1 (not zero) + :param constr_coefs: matrix with constraint coefficients + :type constr_coefs: 2D nparray with shape(nrows, ncols) + :param row_bnds: lower and upper bounds of auxhiliary (row) variables + :type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds) + :param col_bnds: lower and upper bounds of structural (col) variables + :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds) """ - self.lp = glpk.glp_create_prob() - self.ncols = 0 - self.nrows = 0 - self.msg_lev = glpk.GLP_MSG_ERR - self.tm_lim = 10 * 1000 + glp_msg_lev = glpk.GLP_MSG_ERR + glp_tm_lim = 10 * 1000 + glp_presolve = glpk.GLP_OFF # or glpk.GLP_OFF (default) or GLP_ON self.res = {} + self.nrows, self.ncols = constr_coefs.shape + assert col_bnds.shape == (2, self.ncols) + assert row_bnds.shape == (self.nrows, 2) + + self.lp = glpk.glp_create_prob() + + # configure variables/columns and related bounds + glpk.glp_add_cols(self.lp, self.ncols) + for i, lb_ub in enumerate(col_bnds.T): + var_type, lb, ub = self._get_variable_type(lb_ub) + glpk.glp_set_col_bnds(self.lp, 1 + i, var_type, lb, ub) + + # configure rows/constraints + glpk.glp_add_rows(self.lp, self.nrows) + for i, lb_ub in enumerate(row_bnds): + var_type, lb, ub = self._get_variable_type(lb_ub) + glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub) + constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs) + n_coefs = len(constr_coefs_sparse.row) + ia = glpk.intArray(1 + n_coefs) + ja = glpk.intArray(1 + n_coefs) + ar = glpk.doubleArray(1 + n_coefs) + for i in range(n_coefs): + ia[1 + i] = int(1 + constr_coefs_sparse.row[i]) + ja[1 + i] = int(1 + constr_coefs_sparse.col[i]) + ar[1 + i] = constr_coefs_sparse.data[i] + glpk.glp_load_matrix(self.lp, n_coefs, ia, ja, ar) + + # configure options + self._smcp = glpk.glp_smcp() + glpk.glp_init_smcp(self._smcp) + self._smcp.msg_lev = glp_msg_lev + self._smcp.tm_lim = glp_tm_lim + self._smcp.presolve = glp_presolve + @staticmethod def _get_variable_type(bounds): """Determine variable type for upper/lower bounds. @@ -105,76 +145,42 @@ class LpProblem: var_type = glpk.GLP_LO return var_type, lb, ub - def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'): - """Configure the LP Problem. + def set_obj_coefs(self, obj_coefs): + """set coefficients for objective function + + configure coefficients that are not zero Note: glpk related indices start at 1 (not zero) :param obj_coefs: coefficients of objective function :type obj_coefs: 1D ndarray of shape (1, ncols) - :param constr_coefs: matrix with constraint coefficients - :type constr_coefs: 2D nparray with shape(nrows, ncols) - :param row_bnds: lower and upper bounds of auxhiliary (row) variables - :type row_bnds: 2D ndarray of shape(nrows, 2) (1st col: lower bounds, 2nd col: upper bounds) - :param col_bnds: lower and upper bounds of structural (col) variables - :type col_bnds: 2D ndarray of shape(2, ncols) (1st row: lower bounds, 2nd row: upper bounds) - :param direction: direction for the optimization ('maximize' or 'minimize') - :type direction: string """ - self.nrows, self.ncols = constr_coefs.shape - assert len(obj_coefs) == self.ncols - assert col_bnds.shape == (2, self.ncols) - assert row_bnds.shape == (self.nrows, 2) + for i in range(self.ncols): + if obj_coefs[i] != 0.0: + glpk.glp_set_obj_coef(self.lp, int(1 + i), obj_coefs[i]) - # configure objective function - glpk.glp_add_cols(self.lp, self.ncols) - self.set_obj_coefs(obj_coefs) - self.set_obj_dir(direction) + def remove_obj_coefs(self, obj_coefs): + """remove coefficients from objective function - # configure bounds on structural (columns) variables - for i, lb_ub in enumerate(col_bnds.T): - var_type, lb, ub = self._get_variable_type(lb_ub) - glpk.glp_set_col_bnds(self.lp, 1 + i, var_type, lb, ub) + set coefficients that are not zero to zero in the liner problem - # configure constraints - glpk.glp_add_rows(self.lp, self.nrows) - for i, lb_ub in enumerate(row_bnds): - var_type, lb, ub = self._get_variable_type(lb_ub) - glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub) - constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs) - n_coefs = len(constr_coefs_sparse.row) - ia = glpk.intArray(1 + n_coefs) - ja = glpk.intArray(1 + n_coefs) - ar = glpk.doubleArray(1 + n_coefs) - for i in range(n_coefs): - ia[1 + i] = int(1 + constr_coefs_sparse.row[i]) - ja[1 + i] = int(1 + constr_coefs_sparse.col[i]) - ar[1 + i] = constr_coefs_sparse.data[i] - glpk.glp_load_matrix(self.lp, n_coefs, ia, ja, ar) - - def replace_constraint(self, row, constr_coefs, row_bnds): - # set/replace one bounded constraint - # - # Parameters: - # row: row id to be replaced - # coeffs: 1D np.array of floats of size number design variables - # coefficients != 0.0 will be configured - # bounds: 1D numpy.ndarray with float of size two - # lists and tuples will be converted to numpy.ndarray - # lower_bound, upper_bound, np.nan and np.inf supported + Note: glpk related indices start at 1 (not zero) + :param obj_coefs: coefficients of objective function + :type obj_coefs: list-like of floats + """ + assert len(obj_coefs) == self.ncols + for i in range(self.ncols): + if obj_coefs[i] != 0.0: + glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) - assert row <= glpk.glp_get_num_rows(self.lp) - glpk.glp_set_row_bnds(self.lp, row, *self._get_variable_type(row_bnds)) + def set_obj_dir(self, direction): + """Configure the optimization direction - n_coef = len(np.nonzero(constr_coefs)[0]) - ind = glpk.intArray(1 + n_coef) - val = glpk.doubleArray(1 + n_coef) - idx = 1 - for i in np.nonzero(constr_coefs)[0]: - ind[idx] = int(1 + i) - val[idx] = constr_coefs[i] - idx += 1 - glpk.glp_set_mat_row(self.lp, row, n_coef, ind, val) + :param direction: direction for the optimization ('maximize' or 'minimize') + :type direction: string + """ + obj_dir = glpk.GLP_MAX if 'max' in direction else glpk.GLP_MIN + glpk.glp_set_obj_dir(self.lp, obj_dir) def add_constraint(self, constr_coefs, row_bnds): """add a single constraint to the lp problem @@ -183,108 +189,93 @@ class LpProblem: :type constr_coefs: 1d ndarray of length ncols :param row_bnds: upper and lower bounds of the auxiliary (row) variable :type row_bnds: 1D ndarray of length 2 - :return: + :return: row index (start from 0) + :rtype: int """ - row = glpk.glp_add_rows(self.lp, 1) - self.replace_constraint(row, constr_coefs, row_bnds) - return row - - def del_constraint(self, row): - # delete one constraint - # - # Parameters: - # row: row id to be replaced - assert row <= glpk.glp_get_num_rows(self.lp) + row_idx1 = glpk.glp_add_rows(self.lp, 1) + glpk.glp_set_row_bnds(self.lp, row_idx1, *self._get_variable_type(row_bnds)) + + n_coef = len(np.nonzero(constr_coefs)[0]) + ind = glpk.intArray(n_coef + 1) + val = glpk.doubleArray(n_coef + 1) + for num_idx0, col_idx0 in enumerate(np.nonzero(constr_coefs)[0]): + ind[num_idx0 + 1] = int(col_idx0 + 1) + val[num_idx0 + 1] = constr_coefs[col_idx0] + glpk.glp_set_mat_row(self.lp, row_idx1, n_coef, ind, val) + self.nrows += 1 + row_idx0 = row_idx1 - 1 + return row_idx0 + + def del_constraint(self, row_idx0): + """remove single constraint from the lp problem + + :param row_idx0: row index (start from 0) to be removed + :type row_idx0: int + """ + assert row_idx0 < self.nrows num = glpk.intArray(1 + 1) - num[1] = row + num[1] = row_idx0 + 1 glpk.glp_del_rows(self.lp, 1, num) - - def set_single_bound(self, react_idx, bounds): - assert react_idx <= self.ncols - if type(bounds) is not np.ndarray: - bounds = np.array(bounds) - glpk.glp_set_col_bnds(self.lp, 1 + react_idx, *self._get_variable_type(bounds)) - - def set_obj_dir(self, direction): - """Configure the optimization direction - - :param direction: direction for the optimization ('maximize' or 'minimize') - :type direction: string + self.nrows -= 1 + + def set_col_bnd(self, col_idx0, bounds): + """Set a single column (variable) bound + :param col_idx0: column index (start at 0) of variable + :type col_idx0: int + :param bounds: lower and upper bound to set + :type bounds: list with two float values """ - obj_dir = glpk.GLP_MAX if 'max' in direction else glpk.GLP_MIN - glpk.glp_set_obj_dir(self.lp, obj_dir) - - def reset_cost(self, cost_coef=None): - # reset coefficients of objective function (cols) to 0.0 - # - # Parameters: - # cost_coef: None or 1d numpy.ndarray with floats - # None: reset all cost coefficiencs - # ndarray: reset only coest coefficiencs != 0 - if cost_coef is None: - for i in range(glpk.glp_get_num_cols(self.lp)): - glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) - else: - for i in np.nonzero(cost_coef)[0]: - glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0) + assert col_idx0 < self.ncols + glpk.glp_set_col_bnds(self.lp, col_idx0 + 1, *self._get_variable_type(bounds)) - def set_obj_coefs(self, obj_coefs): - """set coefficients for objective function + def remove_cols(self, col_idxs0): + """ remove variables (columns) from the linear problem - :param obj_coefs: coefficients of objective function - :type obj_coefs: 1D ndarray of shape (1, ncols) + :param col_idxs0: col indices (start at 0) to be removed + :type col_idxs0: list or 1D ndarray of col indices """ - for i in range(self.ncols): - glpk.glp_set_obj_coef(self.lp, int(1 + i), obj_coefs[i]) - - def solve(self, short_result=False, scaling=None): + num = glpk.intArray(len(col_idxs0) + 1) + for num_idx0, col_idx0 in enumerate(col_idxs0): + assert col_idx0 < self.ncols + num[num_idx0 + 1] = col_idx0 + 1 + glpk.glp_del_cols(self.lp, len(col_idxs0), num) + self.ncols -= len(col_idxs0) + + def remove_rows(self, row_idxs0): + """ remove constraints (rows) from the linear problem. + + :param row_idxs0: row indices (start at 0) to be removed + :type row_idxs0: list or 1D ndarray of row indices + """ + num = glpk.intArray(len(row_idxs0) + 1) + for num_idx0, row_idx0 in enumerate(row_idxs0): + assert row_idx0 < self.nrows + num[num_idx0 + 1] = row_idx0 + 1 + glpk.glp_del_rows(self.lp, len(row_idxs0), num) + self.nrows -= len(row_idxs0) + + def solve(self, short_result=True, scaling=None): """Solve the Linear Programming Problem. :param short_result: short result (only objective value and status) or long result - :type short_result: bool (default:False) + :type short_result: bool (optional, default: True) :param scaling: problem scaling options ('GM', 'EQ', '2N', 'SKIP', 'AUTO' - :type scaling: string (default, None) - :return: result information from the optimization stored in a dictionary + :type scaling: string (optional, default: None) + :return: result from linear optimization :rtype: dict """ - # configure options - smcp = glpk.glp_smcp() - glpk.glp_init_smcp(smcp) - smcp.msg_lev = self.msg_lev - smcp.tm_lim = self.tm_lim - if scaling is not None: - opt = scaling_options.get(scaling, glpk.GLP_SF_AUTO) - glpk.glp_scale_prob(self.lp, opt) - # sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))] - # rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))] - # TODO: glpk.glp_adv_basis(self.lp, 0) - simplex_result = glpk.glp_simplex(self.lp, smcp) - return self.results(simplex_result, short_result) - - def get_row_value(self, row): - """Retrieve right-hand side of a constraint (auxiliary variable) - - :param row: index into constraints - :type row: int - :return: value of auxiliary varibale (value of rhs) - :rtype: float - """ - assert row <= glpk.glp_get_num_rows(self.lp) - return glpk.glp_get_row_prim(self.lp, row) + glpk.glp_scale_prob(self.lp, scaling_options.get(scaling, glpk.GLP_SF_AUTO)) - def results(self, simplex_result, short_result=False): - """Collect results for the optimization + simplex_result = glpk.glp_simplex(self.lp, self._smcp) - Alternatively, reduced results, e.g. when doing pFBA + # in case of unsuccessful result, set basis and redo optimization + if glpk.glp_get_status(self.lp) != glpk.GLP_OPT: + glpk.glp_adv_basis(self.lp, 0) + # print('GLP ADV BASIS') + # glpk.glp_cpx_basis(self.lp) + simplex_result = glpk.glp_simplex(self.lp, self._smcp) - :param simplex_result: result from the LP solver - :type simplex_result: int - :param short_result: short result (only objective value and status) or long result - :type short_result: bool (default:False) - :return: result from the FBA optimization - :rtype: dict - """ self.res = { 'fun': glpk.glp_get_obj_val(self.lp), 'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT, @@ -292,16 +283,31 @@ class LpProblem: } if short_result is False: self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)]) - self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)]) - self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)]) + self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) + for i in range(self.ncols)]) + self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) + for i in range(self.nrows)]) self.res['simplex_result'] = simplex_result self.res['simplex_message'] = simplex_status[simplex_result], self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)] self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)] return self.res + def get_row_value(self, row): + """Retrieve right-hand side of a constraint (auxiliary variable) + + :param row: index into constraints + :type row: int + :return: value of auxiliary varibale (value of rhs) + :rtype: float + """ + assert row <= glpk.glp_get_num_rows(self.lp) + return glpk.glp_get_row_prim(self.lp, row) + def __del__(self): """Explicitly delete the LpProblem. (can we do without?) """ + print('deletion of LpProblem') + assert self.lp is not None if getattr(self, 'lp'): glpk.glp_delete_prob(self.lp)