Skip to content
Snippets Groups Projects
Commit 50ebc192 authored by Peter Schubert's avatar Peter Schubert
Browse files

Initial base functionality

parent 4d53d81f
Branches
No related tags found
No related merge requests found
import sys
import os
import re
import time
import numpy as np
import pickle
import sbmlxdf
from modelpruner.models.fba_model import FbaModel
......@@ -12,14 +14,25 @@ from modelpruner.core.protected_parts import ProtectedParts
class ModelPruner:
def __init__(self, sbml_fname, protected_parts, reduced_fname=None):
def __init__(self, sbml_fname, protected_parts, reduced_fname=None, resume=False):
if reduced_fname is None:
self.reduced_fname = re.sub('.xml', '_red.xml', sbml_fname)
self.reduced_fname = re.sub(r'.xml$', '_red.xml', sbml_fname)
self._snapshot_sbml = re.sub(r'.xml$', '_snapshot.xml', sbml_fname)
self._snapshot_pkl = re.sub(r'.xml$', '_snapshot.pkl', sbml_fname)
if resume is True:
if os.path.exists(self._snapshot_sbml):
self.sbml_model = sbmlxdf.Model(self._snapshot_sbml)
print(f'Resume from snapshot from {self._snapshot_sbml} '
f'(last modified: {time.ctime(os.path.getmtime(self._snapshot_sbml))})')
else:
print(f'{self._snapshot_sbml} not found!')
raise FileNotFoundError
else:
if os.path.exists(sbml_fname):
self.sbml_model = sbmlxdf.Model(sbml_fname)
print(f'Protected data loaded: {sbml_fname} '
print(f'Full SBML model loaded from {sbml_fname} '
f'(last modified: {time.ctime(os.path.getmtime(sbml_fname))})')
else:
print(f'{sbml_fname} not found!')
......@@ -29,80 +42,144 @@ class ModelPruner:
self.fba_model = FbaModel(self.sbml_model)
self.nrp = ProtectedParts(protected_parts)
self.protected_sids = self.nrp.initial_protected_sids
if resume is True and os.path.exists(self._snapshot_pkl):
with open(self._snapshot_pkl, 'rb') as f:
self.protected_rids = pickle.load(f)
print(f'snapshot restored at {len(self.fba_model.rids):4d} remaining reactions '
f'from: {self._snapshot_pkl}')
else:
self.protected_rids = self.get_total_protected_rids(self.nrp.initial_protected_rids)
self.fba_model.update_flux_bounds(self.nrp.overwrite_bounds)
self.base_obj_dir = self.fba_model.obj_dir
self.base_flux_bounds = self.fba_model.flux_bounds.copy()
self.base_coefs = self.fba_model.obj_coefs.copy()
self.protected_sids = set(self.nrp.initial_protected_sids)
self.protected_rids = self.get_total_protected_rids(set(self.nrp.initial_protected_rids))
self.essential_rids = self.protected_rids
self.free_rids = set(self.fba_model.rids).difference(self.essential_rids)
self.set_function_optimum()
self.check_protected_parts()
# overwrite bounds on fba
self.fba_model.update_flux_bounds(self.nrp.overwrite_bounds)
def check_protected_parts(self):
extra_p_rids = self.protected_rids.difference(set(self.fba_model.rids))
extra_p_sids = self.protected_sids.difference(set(self.fba_model.sids))
pf_rids = set()
for pf in self.nrp.protected_functions.values():
pf_rids = pf_rids.union({rid for rid in pf.objective})
pf_rids = pf_rids.union({rid for rid in pf.overwrite_bounds})
pf_rids |= {rid for rid in pf.objective}
pf_rids |= {rid for rid in pf.overwrite_bounds}
if pf.fba_success is False:
print(f'protected function {pf.obj_id} has unsuccessful FBA '
f' with optimal value {pf.optimal}')
extra_pf_rids = pf_rids.difference(set(self.fba_model.rids))
pf_unprotected_rids = pf_rids.difference(set(self.protected_rids))
if len(extra_p_rids) > 0:
print('protected reactions that do not exist in model:', extra_p_rids)
if len(extra_p_sids) > 0:
print('protected metabolites that do not exist in model:', extra_p_sids)
if len(extra_pf_rids) > 0:
print('reactions in protected function that do not exist in model:', extra_pf_rids)
print('reactions in protected functions that do not exist in model:', extra_pf_rids)
if len(pf_unprotected_rids) > 0:
print('reactions in protected functions that are not protected:', pf_unprotected_rids)
zero_flux_rids = self.get_blocked_reactions()
blocked_p_rids = self.protected_rids.intersection(zero_flux_rids)
blocked_pf_rids = pf_rids.intersection(zero_flux_rids)
blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids']
blocked_p_rids = self.nrp.initial_protected_sids.intersection(blocked_rids)
blocked_pf_rids = pf_rids.intersection(blocked_rids)
if len(blocked_p_rids) > 0:
print('protected reactions that are blocked (no flux):', blocked_p_rids)
if len(blocked_pf_rids) > 0:
print('reactions in protected function that are blocked (no flux):', blocked_pf_rids)
def get_blocked_reactions(self):
# identify blocked reactions
# check that none of the blocked reactions are protected
# actually remove blocked reactions, even they are included in
fba_solution = self.fba_model.fba_optimize()
zero_flux_mask = np.abs(fba_solution['x']) < self.tolerance
zero_flux_rids = self.fba_model.rids[zero_flux_mask]
fva_solution = self.fba_model.fva_optimize(zero_flux_rids, fract_of_optimum=0.0)
flux_ranges = np.vstack((fva_solution['min'], fva_solution['max'])).T
zero_flux_mask = np.all(abs(flux_ranges) < self.tolerance, axis=1)
zero_flux_rids = set(fva_solution['rids'][zero_flux_mask])
return zero_flux_rids
def prune(self):
print('reactions in protected functions that are blocked (no flux):', blocked_pf_rids)
def prune(self, snapshot_freq=None):
print(f'initial S-matrix: {self.fba_model.s_mat.shape}; '
f' protected reactions: {len(self.protected_rids)}:', self.protected_rids)
self.remove_parallel_reactions()
print('protected functions feasibility:', self.check_pf_feasibility())
def reaction_types_fva(self):
if type(snapshot_freq) is int:
next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq)
else:
next_snapshot = None
while len(self.protected_rids) < self.fba_model.s_mat.shape[1]:
free_rids = list(set(self.fba_model.rids).difference(self.protected_rids))
reaction_types = self.reaction_types_fva(free_rids)
if len(reaction_types['blocked_rids']) > 0:
self.fba_model.remove_reactions(reaction_types['blocked_rids'], self.protected_rids)
print(f"{len(reaction_types['blocked_rids'])} blocked reaction(s) removed:",
reaction_types['blocked_rids'])
if len(reaction_types['essential_rids']) > 0:
self.protected_rids |= reaction_types['essential_rids']
print(f"{len(reaction_types['essential_rids'])} reaction(s) added to protected reactions:",
reaction_types['essential_rids'])
print(f"{len(reaction_types['candidate_rids_sorted'])} candidates:",
reaction_types['candidate_rids_sorted'][:4], '...')
# try to remove one reaction while still satisfying all protected functions
feasible = False
essential_rids = set()
for rid in reaction_types['candidate_rids_sorted']:
orig_flux_bounds = self.fba_model.get_flux_bounds(rid)
self.fba_model.update_flux_bounds({rid: [0.0, 0.0]})
feasible = self.check_pf_feasibility()
self.fba_model.update_flux_bounds({rid: orig_flux_bounds})
if feasible:
self.fba_model.remove_reactions({rid}, self.protected_rids)
print(f'{rid} removed from the model')
break
else:
essential_rids.add(rid)
if len(essential_rids) > 0:
self.protected_rids |= essential_rids
print(f"{len(essential_rids)} reaction(s) added to protected reactions:", essential_rids)
if feasible is False:
print('no more reactions to remove')
# store intermediate results (snapshots)
print(f'snapshot check next: {next_snapshot}, rids: {len(self.fba_model.rids)}')
if (next_snapshot is not None) and (len(self.fba_model.rids) < next_snapshot):
self.export_pruned_model(self._snapshot_sbml)
with open(self._snapshot_pkl, 'wb') as f:
pickle.dump(self.protected_rids, f)
next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq)
print('snapshot data saved.')
print(f'S-matrix: {self.fba_model.s_mat.shape}; '
f'protected reactions: {len(self.protected_rids)}')
# check if blocked reactions exist in the finally pruned model
blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids']
if len(blocked_rids) > 0:
print(f'{len(blocked_rids)} blocked reaction(s) in pruned model', blocked_rids)
# remove any snapshot files
for fname in [self._snapshot_sbml, self._snapshot_pkl]:
if os.path.exists(fname):
os.remove(fname)
def reaction_types_fva(self, free_rids):
flux_min_pfs = np.zeros((len(self.nrp.protected_functions), len(free_rids)))
flux_max_pfs = np.zeros((len(self.nrp.protected_functions), len(free_rids)))
n_pf = len(self.nrp.protected_functions)
for idx, pf in enumerate(self.nrp.protected_functions.values()):
self.set_temp_func_conditions(pf)
res = self.fba_model.fva_optimize(free_rids, fract_of_optimum=0.0)
self.restore_base_conditions()
if res['success'] is False:
print('FVA unsuccessful')
return {'blocked_rids': [], 'essential_rids': [], 'candidate_rids_sorted': []}
flux_min_pfs[idx] = res['min']
flux_max_pfs[idx] = res['max']
sys.stdout.write('\rFVA[{}{}] {:3.0f}%'.format(
'=' * int(idx+1), ' ' * int(n_pf - idx-1), (idx+1)/n_pf*100))
sys.stdout.flush()
print()
flux_min_pfs[np.abs(flux_min_pfs) < self.tolerance] = 0.0
flux_max_pfs[np.abs(flux_max_pfs) < self.tolerance] = 0.0
......@@ -144,18 +221,17 @@ class ModelPruner:
drop_group_rids = set(group_rids) - {reversible}
else:
drop_group_rids = set(group_rids[1:])
drop_rids = drop_rids.union(drop_group_rids)
drop_rids |= drop_group_rids
self.fba_model.remove_reactions(drop_rids, self.protected_sids)
print(f'{len(drop_rids)} parallel reactions dropped:', drop_rids)
print(f'{len(drop_rids)} parallel reaction(s) dropped:', drop_rids)
def set_function_optimum(self):
"""using FBA to set optimal value for protected functions"""
for obj_id in self.nrp.protected_functions:
pf = self.nrp.protected_functions[obj_id]
for obj_id, pf in self.nrp.protected_functions.items():
self.set_temp_func_conditions(pf)
res = self.fba_model.fba_optimize(short_result=True)
self.restore_base_conditions()
pf.optimal = res['fun']
pf.fba_success = res['success']
......@@ -165,8 +241,6 @@ class ModelPruner:
else:
pf.set_target_lb(pf.target_frac * pf.optimal)
self.restore_base_conditions()
def get_total_protected_rids(self, protected_rids):
# identify reactions that need protection due to protected metabolites
# over and above the protected rids provided in protected parts
......@@ -179,11 +253,12 @@ class ModelPruner:
# identify rids in protected functions and ensure they are protected
pf_rids = set()
for pf in self.nrp.protected_functions.values():
pf_rids = pf_rids.union({rid for rid in pf.objective})
pf_rids = pf_rids.union({rid for rid in pf.overwrite_bounds})
pf_rids |= {rid for rid in pf.objective}
pf_rids |= {rid for rid in pf.overwrite_bounds}
total_protect_rids = protected_rids.union(protected_sid_rids).union(pf_rids)
return total_protect_rids
protected_rids |= protected_sid_rids
protected_rids |= pf_rids
return protected_rids
def check_pf_feasibility(self):
"""check that current model still satisfies the protected functions
......@@ -217,8 +292,8 @@ class ModelPruner:
def restore_base_conditions(self):
self.fba_model.obj_dir = self.base_obj_dir
self.fba_model.flux_bounds = self.base_flux_bounds
self.fba_model.obj_coefs = self.base_coefs
self.fba_model.flux_bounds = self.base_flux_bounds.copy()
self.fba_model.obj_coefs = self.base_coefs.copy()
def get_reduction_status(self):
""" Return status in model reduction process
......@@ -231,12 +306,18 @@ class ModelPruner:
status = self.fba_model.get_model_status()
status['n_protected_rids'] = len(self.protected_rids)
status['n_protected_sids'] = len(self.protected_sids)
status['essential_rids'] = len(self.essential_rids)
return status
def print_status(self):
status = self.get_reduction_status()
print("reactions removed/remaining/protected: "
f"{status['n_full_rids' ] -status['n_red_rids']:4d}/{status['n_red_rids']:4d}/"
f"{status['essential_rids']:4d}; dof: {status['dof']:3d}")
f"{status['n_full_rids' ] - status['n_rids']:4d}/{status['n_rids']:4d}/"
f"/{len(self.protected_rids):4d}; dof: {status['dof']:3d}")
def export_pruned_model(self, pruned_sbml):
success = self.fba_model.export_pruned_model(pruned_sbml)
if success is True:
print('pruned model exported to', pruned_sbml)
else:
print('pruned model could not be exported')
......@@ -14,7 +14,7 @@ class ProtectedParts:
if type(protected_parts) == str:
if os.path.exists(protected_parts):
nrp = self.load_raw_protected_data(protected_parts)
print(f'Protected data loaded: {protected_parts} '
print(f'Protected data loaded from {protected_parts} '
f'(last modified: {time.ctime(os.path.getmtime(protected_parts))})')
else:
print(f'{protected_parts} not found!')
......@@ -23,8 +23,8 @@ class ProtectedParts:
assert type(protected_parts) == dict, 'expected filename or a dict'
nrp = protected_parts
self.initial_protected_rids = nrp.get('reactions', [])
self.initial_protected_sids = nrp.get('metabolites', [])
self.initial_protected_rids = set(nrp.get('reactions', []))
self.initial_protected_sids = set(nrp.get('metabolites', []))
if 'bounds' in nrp:
num_cols = ['fbcLb', 'fbcUb']
nrp['bounds'][num_cols] = nrp['bounds'][num_cols].applymap(
......
import re
import math
import numpy as np
import sbmlxdf
......@@ -11,10 +12,10 @@ class FbaModel:
def __init__(self, sbml_model):
model_dict = sbml_model.to_df()
df_reactions = model_dict['reactions']
df_species = model_dict['species']
df_fbc_objectives = model_dict['fbcObjectives']
self.model_dict = sbml_model.to_df()
df_reactions = self.model_dict['reactions']
df_species = self.model_dict['species']
df_fbc_objectives = self.model_dict['fbcObjectives']
self.full_model_shape = {'n_full_rids': len(df_reactions),
'n_full_sids': len(df_species)}
......@@ -105,6 +106,7 @@ class FbaModel:
if lp is None:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result)
del lp
if short_result is False:
res['x'] = self.combine_fwd_bwd(res['x'])
res['reduced_costs'] = self.combine_fwd_bwd(res['reduced_costs'])
......@@ -125,12 +127,14 @@ class FbaModel:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result=True)
if res['success'] is False:
del lp
return {'success': False, 'message': res['generic_status']}
if rids is None:
rids = self.rids
flux_min = np.zeros(len(rids))
flux_max = np.zeros(len(rids))
if res['success']:
# fix FBA objective as constraint
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
if self.obj_dir == 'maximize':
......@@ -157,8 +161,8 @@ class FbaModel:
lp.set_obj_dir('maximize')
res = lp.solve(short_result=True)
flux_max[idx] = res['fun'] if res['success'] else np.nan
return {'rids': np.array(rids), 'min': flux_min, 'max': flux_max}
del lp
return {'success': True, 'rids': np.array(rids), 'min': flux_min, 'max': flux_max}
def update_flux_bounds(self, flux_bounds):
"""selectivly set lower/upper flux bounds for given reactions.
......@@ -169,6 +173,7 @@ class FbaModel:
:type flux_bounds: dict (key: reaction id, value: list with two float values for lower/upper bound)
"""
for rid, (lb, ub) in flux_bounds.items():
assert rid in self.rids
if math.isfinite(lb):
self.flux_bounds[0, self.rid2idx[rid]] = lb
if lb < 0:
......@@ -176,6 +181,10 @@ class FbaModel:
if math.isfinite(ub):
self.flux_bounds[1, self.rid2idx[rid]] = ub
def get_flux_bounds(self, rid):
assert rid in self.rids
return [self.flux_bounds[0, self.rid2idx[rid]], self.flux_bounds[1, self.rid2idx[rid]]]
def remove_reactions(self, drop_rids, protected_sids):
# remove_reactions
# also remove corresponding metabolite columns
......@@ -242,8 +251,8 @@ class FbaModel:
:rtype: dict
"""
status = {'dof': self.s_mat.shape[1] - np.linalg.matrix_rank(self.s_mat),
'n_red_rids': len(self.rids),
'n_red_sids': len(self.sids)}
'n_rids': len(self.rids),
'n_sids': len(self.sids)}
status.update(self.full_model_shape)
return status
......@@ -257,3 +266,54 @@ class FbaModel:
self.__obj_dir = 'minimize'
else:
self.__obj_dir = 'maximize'
def export_pruned_model(self, pruned_sbml):
pruned_mdict = self.model_dict.copy()
pruned_mdict['reactions'] = self.model_dict['reactions'].loc[self.rids]
pruned_mdict['species'] = self.model_dict['species'].loc[self.sids]
# update model attributes (ids, names and modification date)
pruned_mdict['modelAttrs']['id'] += '_pruned'
pruned_mdict['modelAttrs']['name'] += '_pruned'
pruned_mdict['modelAttrs']['metaid'] += '_pruned'
pruned_mdict['modelAttrs']['modifiedHistory'] += '; localtime'
# clean groups table
keep_idxs = []
if 'groups' in pruned_mdict:
df_groups_pruned = pruned_mdict['groups'].copy()
for idx, row in df_groups_pruned.iterrows():
members_pruned = []
for member in sbmlxdf.misc.record_generator(row['members']):
params = sbmlxdf.misc.extract_params(member)
if 'idRef' in params:
if params['idRef'] in self.rids:
members_pruned.append(member)
else:
members_pruned.append(member)
df_groups_pruned.at[idx, 'members'] = '; '.join(members_pruned)
if len(members_pruned) > 0:
keep_idxs.append(idx)
pruned_mdict['groups'] = df_groups_pruned.loc[keep_idxs]
# clean gene products
gp_ids = set()
if 'fbcGeneProdAssoc' in pruned_mdict['reactions']:
for gpr in pruned_mdict['reactions']['fbcGeneProdAssoc']:
if type(gpr) is str:
if 'assoc=' in gpr:
gpr = re.sub('assoc=', '', gpr)
gpr = re.sub(r'[()]', ' ', gpr)
gpr = re.sub(r'and', ' ', gpr)
gpr = re.sub(r'or', ' ', gpr)
gp_ids = gp_ids.union(set(re.findall(r'\w+', gpr)))
if 'fbcGeneProducts' in pruned_mdict:
pruned_mdict['fbcGeneProducts'] = pruned_mdict['fbcGeneProducts'].loc[list(gp_ids)]
pruned_model = sbmlxdf.Model()
pruned_model.from_df(pruned_mdict)
err = pruned_model.validate_sbml()
if len(err) > 0:
print('SBML validation result: %s', ', '.join([k + ': ' + str(v) for k, v in err.items()]))
return pruned_model.export_sbml(pruned_sbml)
......@@ -14,43 +14,43 @@ import swiglpk as glpk
# status reported
simplex_status = {
0: 'LP problem instance successfully solved',
1: 'Unable to start search, initial basis specified in the '
glpk.GLP_EBADB: 'Unable to start search, initial basis specified in the '
'problem object is invalid. Number basic variables is not same as the number rows.',
2: 'Unable to start search, basis matrix corresponding to initial basis is singular.',
3: 'Unable to start search, basis matrix corresponding to initial basis is ill-conditioned.',
4: 'Unable to start search, some double-bounded variables have incorrect bounds.',
5: 'Search prematurely terminated: solver failure.',
6: 'Search prematurely terminated: objective function being '
glpk.GLP_ESING: 'Unable to start search, basis matrix corresponding to initial basis is singular.',
glpk.GLP_ECOND: 'Unable to start search, basis matrix corresponding to initial basis is ill-conditioned.',
glpk.GLP_EBOUND: 'Unable to start search, some double-bounded variables have incorrect bounds.',
glpk.GLP_EFAIL: 'Search prematurely terminated: solver failure.',
glpk.GLP_EOBJLL: 'Search prematurely terminated: objective function being '
'maximized reached its lower limit and continues decreasing.',
7: 'Search prematurely terminated: objective function being '
glpk.GLP_EOBJUL: 'Search prematurely terminated: objective function being '
'minimized reached its upper limit and continues increasing.',
8: 'Search prematurely terminated: simplex iteration limit exceeded.',
9: 'Search prematurely terminated: time limit exceeded.',
10: 'LP problem instance has no primal feasible solution.',
11: 'LP problem instance has no dual feasible solution.'}
generic_status = {1: 'solution is undefined',
2: 'solution is feasible',
3: 'solution is infeasible',
4: 'problem has no feasible solution',
5: 'solution is optimal',
6: 'problem has unbounded solution'}
dual_status = {1: 'dual solution is undefined',
2: 'dual solution is feasible',
3: 'dual solution is infeasible',
4: 'no dual feasible solution exists'}
prim_status = {1: 'primal solution is undefined',
2: 'primal solution is feasible',
3: 'primal solution is infeasible',
4: 'no primal feasible solution exists'}
var_status = {1: 'basic variable',
2: 'non-basic variable on its lower bound',
3: 'non-basic variable on its upper bound',
4: 'non-basic free (unbounded) variable',
5: 'non-basic fixed variable'}
glpk.GLP_EITLIM: 'Search prematurely terminated: simplex iteration limit exceeded.',
glpk.GLP_ETMLIM: 'Search prematurely terminated: time limit exceeded.',
glpk.GLP_ENOPFS: 'LP problem instance has no primal feasible solution.',
glpk.GLP_ENODFS: 'LP problem instance has no dual feasible solution.'}
generic_status = {glpk.GLP_UNDEF: 'solution is undefined',
glpk.GLP_FEAS: 'solution is feasible',
glpk.GLP_INFEAS: 'solution is infeasible',
glpk.GLP_NOFEAS: 'problem has no feasible solution',
glpk.GLP_OPT: 'solution is optimal',
glpk.GLP_UNBND: 'problem has unbounded solution'}
dual_status = {glpk.GLP_UNDEF: 'dual solution is undefined',
glpk.GLP_FEAS: 'dual solution is feasible',
glpk.GLP_INFEAS: 'dual solution is infeasible',
glpk.GLP_NOFEAS: 'no dual feasible solution exists'}
prim_status = {glpk.GLP_UNDEF: 'primal solution is undefined',
glpk.GLP_FEAS: 'primal solution is feasible',
glpk.GLP_INFEAS: 'primal solution is infeasible',
glpk.GLP_NOFEAS: 'no primal feasible solution exists'}
var_status = {glpk.GLP_BS: 'basic variable',
glpk.GLP_NL: 'non-basic variable on its lower bound',
glpk.GLP_NU: 'non-basic variable on its upper bound',
glpk.GLP_NF: 'non-basic free (unbounded) variable',
glpk.GLP_NS: 'non-basic fixed variable'}
scaling_options = {'GM': glpk.GLP_SF_GM,
'EQ': glpk.GLP_SF_EQ,
......@@ -69,6 +69,8 @@ class LpProblem:
self.lp = glpk.glp_create_prob()
self.ncols = 0
self.nrows = 0
self.msg_lev = glpk.GLP_MSG_ERR
self.tm_lim = 10 * 1000
self.res = {}
@staticmethod
......@@ -245,12 +247,18 @@ class LpProblem:
:return: result information from the optimization stored in a dictionary
:rtype: dict
"""
# configure options
smcp = glpk.glp_smcp()
glpk.glp_init_smcp(smcp)
smcp.msg_lev = self.msg_lev
smcp.tm_lim = self.tm_lim
if scaling is not None:
opt = scaling_options.get(scaling, glpk.GLP_SF_AUTO)
glpk.glp_scale_prob(self.lp, opt)
# sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))]
# rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))]
simplex_result = glpk.glp_simplex(self.lp, None)
simplex_result = glpk.glp_simplex(self.lp, smcp)
return self.results(simplex_result, short_result)
def get_row_value(self, row):
......@@ -279,6 +287,7 @@ class LpProblem:
self.res = {
'fun': glpk.glp_get_obj_val(self.lp),
'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT,
'generic_status': generic_status[glpk.glp_get_status(self.lp)],
}
if short_result is False:
self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)])
......@@ -286,7 +295,6 @@ class LpProblem:
self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)])
self.res['simplex_result'] = simplex_result
self.res['simplex_message'] = simplex_status[simplex_result],
self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)]
self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)]
self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)]
return self.res
......
This diff is collapsed.
File added
# sample_reduce_model.py
# sample script for network reduction of a SBML fbc model
import sys
import os
# import logging
import modelpruner
def prune_model():
# activate logging
# logfile = './log/nr.log'
# logformat = '%(asctime)s %(levelname)s:%(name)s %(message)s'
# logdir = os.path.dirname(logfile)
# if not os.path.exists(logdir):
# os.mkdir(logdir)
# logging.basicConfig(filename=logfile, filemode='w',
# level=logging.INFO, format=logformat)
# logging.info('Started')
# print some environment information
print('Python version: {:d}.{:d}.{:d}'.format(sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro))
print('modelpruner version: {:s}'.format(modelpruner.__version__))
print('working directory :', os.getcwd())
# file names
full_sbml = 'sample_data/SBML_models/Deniz_model_fba.xml'
pruned_sbml = 'sample_data/SBML_models/Deniz_model_fba_reduced.xml'
protected_fname = 'sample_data/data/Deniz_model_fba_nrp2.xlsx'
# load the original model
mp = modelpruner.ModelPruner(full_sbml, protected_fname)
print('---- network reduction -----')
mp.prune()
# export pruned model in sbml format
mp.export_pruned_model(pruned_sbml)
print('pruned model converted to SBML:', pruned_sbml)
print('finally protected reactions:', mp.protected_rids)
print(mp.print_status())
# logging.info('Finished')
if __name__ == '__main__':
prune_model()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment