Commit 4d53d81f authored by Peter Schubert

Initial Commit

parent 8bcf973d
.gitignore

*~
build/
dist/
__pycache__/
results/
*.so
*.lo
*.o
*.pyc
.idea/
.DS_Store
*.egg-info
docs/_build/
docs/_static/
docs/_templates/
log/
MANIFEST.in

include LICENSE
README.md

# modelpruner
Prune a metabolic model (remove redundant reactions). Based on the Network Reducer method (Erdrich et al. 2015).
## modelpruner - prune metabolic models coded in SBML
This is a port to Python, and an improvement, of the network reduction
MATLAB code (CNAreduceMFNetwork.m) implemented in **CellNetAnalyzer** [1].
It makes use of functions provided by **glpk** [2] and **sbmlxdf**.
The network reduction method is based on **An algorithm for the reduction
of genome-scale metabolic network models to meaningful core models** [3].
This port does not implement the additional (optional) step of
network compression, which converts linear pathways to single reactions
and suppresses some biomass metabolites and transport reactions.
## Benefits of this port
This is a port of the respective MATLAB code, mainly the functions
CNAreduceMFNetwork and CNAcompressMFNetwork, with the following
enhancements:
- significant speedup (e.g. network reduction of a genome-scale E. coli network
  takes 1 h instead of 30 h on a laptop)
- direct interface to SBML coded models (import from SBML / export to SBML)
- use of open-source software
- modifications are easier to implement in Python than in
  MATLAB (fewer people program in MATLAB)
- SBML identifiers, annotations, gene-product associations,
  genes and groups are carried over transparently
  from the full network to the pruned network, making integration
  with external databases easier
- protected reactions, metabolites, functions and flux bound overwrites can be
  provided in an Excel spreadsheet or, alternatively, as a Python dict
  (see the sketch below)
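A minimal sketch of the dict form (the identifiers are hypothetical, and the
'bounds' entry is assumed to be a pandas DataFrame indexed by reaction id,
mirroring the columns of the Excel 'bounds' sheet):

```python
import pandas as pd

protected_parts = {
    'reactions': ['R_BIOMASS', 'R_ATPM'],     # hypothetical reaction ids to protect
    'metabolites': ['M_atp_c', 'M_adp_c'],    # hypothetical metabolite ids to protect
    # flux bound overwrites: one row per reaction, columns 'fbcLb' and 'fbcUb'
    'bounds': pd.DataFrame({'fbcLb': [0.0], 'fbcUb': [1000.0]},
                           index=['R_EX_glc__D_e']),
}
```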
The speed-up is mainly due to a faster implementation of the flux variability
analysis calls on glpk. Additional speed improvements are achieved by
continually removing reactions and blocked metabolites during the network
reduction process, and by identifying essential reactions earlier.
## Install package
```shell
$ pip3 install modelpruner@git+https://gitlab.cs.uni-duesseldorf.de/schubert/modelpruner.git
```
## Integrate with your tools
- [ ] [Set up project integrations](https://gitlab.cs.uni-duesseldorf.de/schubert/modelpruner/-/settings/integrations)
## Collaborate with your team
- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/)
- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html)
- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically)
- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/)
- [ ] [Automatically merge when pipeline succeeds](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html)
## Small Python example
```python
import modelpruner
# file names
full_sbml = 'sample_data/SBML_models/Deniz_model_fba.xml'
pruned_sbml = 'sample_data/SBML_models/Deniz_model_fba_pruned.xml'
protected_parts = 'sample_data/data/Deniz_model_fba_nrp.xlsx'
# load the original model
mp = modelpruner.ModelPruner(full_sbml, protected_parts)
# prune the network
mp.prune()
# export reduced model to sbml
mp.write_sbml(pruned_sbml)
```
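The reduction progress can be monitored via the status reporting methods of
`ModelPruner` (a short sketch; both methods are defined in the source below):

```python
# report removed/remaining/protected reactions and degrees of freedom
mp.print_status()
status = mp.get_reduction_status()   # the same information as a dict
```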
## License
[GPLv3](LICENSE.txt)
Peter Schubert, Computational Cell Biology, HHU Duesseldorf, July 2022
### References
[1] von Kamp A, Thiele S, Haedicke O, Klamt S. (2017) Use of CellNetAnalyzer
in biotechnology and metabolic engineering. Journal of Biotechnology 261:
221-228. https://doi.org/10.1016/j.jbiotec.2017.05.001
[2] GNU Linear Programming Kit (GLPK). https://www.gnu.org/software/glpk/
[3] Erdrich P, Steuer R, Klamt S. (2015) An algorithm for the reduction
of genome-scale metabolic network models to meaningful core models.
BMC Syst Biol 9, 48. https://doi.org/10.1186/s12918-015-0191-x
modelpruner/__init__.py

from modelpruner._version import __version__
from modelpruner.core.model_pruner import ModelPruner
from . import core, models, problems
__all__ = []
__all__ += core.__all__
__all__ += models.__all__
__all__ += problems.__all__
__author__ = 'Peter Schubert'
"""
Definition of version string.
"""
__version__ = "0.2.0"
program_name = 'modelpruner'
modelpruner/core/__init__.py

from .model_pruner import ModelPruner
__all__ = ['ModelPruner']
modelpruner/core/model_pruner.py

import os
import re
import time
import numpy as np
import sbmlxdf
from modelpruner.models.fba_model import FbaModel
from modelpruner.core.protected_parts import ProtectedParts
class ModelPruner:
def __init__(self, sbml_fname, protected_parts, reduced_fname=None):
if reduced_fname is None:
reduced_fname = re.sub(r'\.xml$', '_red.xml', sbml_fname)
self.reduced_fname = reduced_fname
if os.path.exists(sbml_fname):
self.sbml_model = sbmlxdf.Model(sbml_fname)
print(f'SBML model loaded: {sbml_fname} '
f'(last modified: {time.ctime(os.path.getmtime(sbml_fname))})')
else:
raise FileNotFoundError(f'{sbml_fname} not found!')
self.tolerance = 1e-8 # later accessible by set_params() ?
self.fba_model = FbaModel(self.sbml_model)
self.nrp = ProtectedParts(protected_parts)
self.base_obj_dir = self.fba_model.obj_dir
self.base_flux_bounds = self.fba_model.flux_bounds.copy()
self.base_coefs = self.fba_model.obj_coefs.copy()
self.protected_sids = set(self.nrp.initial_protected_sids)
self.protected_rids = self.get_total_protected_rids(set(self.nrp.initial_protected_rids))
self.essential_rids = self.protected_rids
self.free_rids = set(self.fba_model.rids).difference(self.essential_rids)
self.set_function_optimum()
self.check_protected_parts()
# overwrite bounds on fba
self.fba_model.update_flux_bounds(self.nrp.overwrite_bounds)
def check_protected_parts(self):
extra_p_rids = self.protected_rids.difference(set(self.fba_model.rids))
extra_p_sids = self.protected_sids.difference(set(self.fba_model.sids))
pf_rids = set()
for pf in self.nrp.protected_functions.values():
pf_rids = pf_rids.union({rid for rid in pf.objective})
pf_rids = pf_rids.union({rid for rid in pf.overwrite_bounds})
if pf.fba_success is False:
print(f'protected function {pf.obj_id} has unsuccessful FBA '
f'with optimal value {pf.optimal}')
extra_pf_rids = pf_rids.difference(set(self.fba_model.rids))
if len(extra_p_rids) > 0:
print('protected reactions that do not exist in model:', extra_p_rids)
if len(extra_p_sids) > 0:
print('protected metabolites that do not exist in model:', extra_p_sids)
if len(extra_pf_rids) > 0:
print('reactions in protected function that do not exist in model:', extra_pf_rids)
zero_flux_rids = self.get_blocked_reactions()
blocked_p_rids = self.protected_rids.intersection(zero_flux_rids)
blocked_pf_rids = pf_rids.intersection(zero_flux_rids)
if len(blocked_p_rids) > 0:
print('protected reactions that are blocked (no flux):', blocked_p_rids)
if len(blocked_pf_rids) > 0:
print('reactions in protected function that are blocked (no flux):', blocked_pf_rids)
def get_blocked_reactions(self):
# identify blocked reactions (zero flux over the whole feasible space):
# an FBA first collects reactions with (near) zero flux; an FVA on those
# candidates then confirms that their flux range is actually zero
fba_solution = self.fba_model.fba_optimize()
zero_flux_mask = np.abs(fba_solution['x']) < self.tolerance
zero_flux_rids = self.fba_model.rids[zero_flux_mask]
fva_solution = self.fba_model.fva_optimize(zero_flux_rids, fract_of_optimum=0.0)
flux_ranges = np.vstack((fva_solution['min'], fva_solution['max'])).T
zero_flux_mask = np.all(abs(flux_ranges) < self.tolerance, axis=1)
zero_flux_rids = set(fva_solution['rids'][zero_flux_mask])
return zero_flux_rids
def prune(self):
self.remove_parallel_reactions()
def reaction_types_fva(self):
free_rids = list(set(self.fba_model.rids).difference(self.protected_rids))
flux_min_pfs = np.zeros((len(self.nrp.protected_functions), len(free_rids)))
flux_max_pfs = np.zeros((len(self.nrp.protected_functions), len(free_rids)))
for idx, pf in enumerate(self.nrp.protected_functions.values()):
self.set_temp_func_conditions(pf)
res = self.fba_model.fva_optimize(free_rids, fract_of_optimum=0.0)
self.restore_base_conditions()
flux_min_pfs[idx] = res['min']
flux_max_pfs[idx] = res['max']
flux_min_pfs[np.abs(flux_min_pfs) < self.tolerance] = 0.0
flux_max_pfs[np.abs(flux_max_pfs) < self.tolerance] = 0.0
flux_min = np.min(flux_min_pfs, axis=0)
flux_max = np.max(flux_max_pfs, axis=0)
mask_b = np.all(np.vstack((flux_min == 0, flux_max == 0)), axis=0)
blocked_rids = set(np.array(free_rids)[mask_b])
mask_e1 = np.all(np.vstack((flux_min < 0, flux_max < 0)), axis=0)
mask_e2 = np.all(np.vstack((flux_min > 0, flux_max > 0)), axis=0)
mask_e = np.any(np.vstack((mask_e1, mask_e2)), axis=0)
essential_rids = set(np.array(free_rids)[mask_e])
mask_c = np.all(np.vstack((np.logical_not(mask_b), np.logical_not(mask_e))), axis=0)
flux_span = flux_max - flux_min
ind = np.argsort(flux_span)
candidate_rids_sorted = list(np.array(free_rids)[ind][np.array(mask_c)[ind]])
return {'blocked_rids': blocked_rids, 'essential_rids': essential_rids,
'candidate_rids_sorted': candidate_rids_sorted}
def remove_parallel_reactions(self):
parallel_rid_groups = self.fba_model.get_identical_columns()
drop_rids = set()
for group_rids in parallel_rid_groups:
protected = set()
reversible = None
for rid in group_rids:
if rid in self.protected_rids:
protected.add(rid)
if self.fba_model.reversible[self.fba_model.rid2idx[rid]]:
reversible = rid
if len(protected) > 0:
drop_group_rids = set(group_rids) - protected
elif reversible is not None:
drop_group_rids = set(group_rids) - {reversible}
else:
drop_group_rids = set(group_rids[1:])
drop_rids = drop_rids.union(drop_group_rids)
self.fba_model.remove_reactions(drop_rids, self.protected_sids)
print(f'{len(drop_rids)} parallel reactions dropped:', drop_rids)
def set_function_optimum(self):
"""using FBA to set optimal value for protected functions"""
for obj_id in self.nrp.protected_functions:
pf = self.nrp.protected_functions[obj_id]
self.set_temp_func_conditions(pf)
res = self.fba_model.fba_optimize(short_result=True)
pf.optimal = res['fun']
pf.fba_success = res['success']
if hasattr(pf, 'target_frac'):
if pf.obj_dir == 'minimize':
pf.set_target_ub(pf.target_frac * pf.optimal)
else:
pf.set_target_lb(pf.target_frac * pf.optimal)
self.restore_base_conditions()
def get_total_protected_rids(self, protected_rids):
# identify reactions that need protection due to protected metabolites
# over and above the protected rids provided in protected parts
sid_mask = np.array([True if sid in self.protected_sids else False
for sid in self.fba_model.sids])
psf_mask = np.any(self.fba_model.s_mat[sid_mask] != 0, axis=0)
protected_sid_rids = set(self.fba_model.rids[psf_mask])
# identify rids in protected functions and ensure they are protected
pf_rids = set()
for pf in self.nrp.protected_functions.values():
pf_rids = pf_rids.union({rid for rid in pf.objective})
pf_rids = pf_rids.union({rid for rid in pf.overwrite_bounds})
total_protect_rids = protected_rids.union(protected_sid_rids).union(pf_rids)
return total_protect_rids
def check_pf_feasibility(self):
"""check that current model still satisfies the protected functions
"""
feasible = True
for pf in self.nrp.protected_functions.values():
self.set_temp_func_conditions(pf)
res = self.fba_model.fba_optimize(short_result=True)
self.restore_base_conditions()
if res['success'] is False:
feasible = False
break
if pf.obj_dir == 'maximize':
if res['fun'] < pf.target_lb:
feasible = False
break
else:
if res['fun'] > pf.target_ub:
feasible = False
break
return feasible
def set_temp_func_conditions(self, pf):
self.base_obj_dir = self.fba_model.obj_dir
self.base_flux_bounds = self.fba_model.flux_bounds.copy()
self.base_coefs = self.fba_model.obj_coefs.copy()
self.fba_model.obj_dir = pf.obj_dir
self.fba_model.update_flux_bounds(pf.overwrite_bounds)
self.fba_model.obj_coefs = self.fba_model.set_objective_coefficients(pf.objective)
def restore_base_conditions(self):
self.fba_model.obj_dir = self.base_obj_dir
self.fba_model.flux_bounds = self.base_flux_bounds
self.fba_model.obj_coefs = self.base_coefs
def get_reduction_status(self):
""" Return status in model reduction process
degrees of freedom, number removed reactions, number essential reactions
number remaining reactions
:returns: status information in a dictionary
:rtype: dict
"""
status = self.fba_model.get_model_status()
status['n_protected_rids'] = len(self.protected_rids)
status['n_protected_sids'] = len(self.protected_sids)
status['essential_rids'] = len(self.essential_rids)
return status
def print_status(self):
status = self.get_reduction_status()
print("reactions removed/remaining/protected: "
f"{status['n_full_rids' ] -status['n_red_rids']:4d}/{status['n_red_rids']:4d}/"
f"{status['essential_rids']:4d}; dof: {status['dof']:3d}")
modelpruner/core/protected_parts.py

import os
import time
import math
import numpy as np
import pandas as pd
import sbmlxdf
class ProtectedParts:
def __init__(self, protected_parts):
if type(protected_parts) == str:
if os.path.exists(protected_parts):
nrp = self.load_raw_protected_data(protected_parts)
print(f'Protected data loaded: {protected_parts} '
f'(last modified: {time.ctime(os.path.getmtime(protected_parts))})')
else:
print(f'{protected_parts} not found!')
raise FileNotFoundError
else:
assert type(protected_parts) == dict, 'expected filename or a dict'
nrp = protected_parts
self.initial_protected_rids = nrp.get('reactions', [])
self.initial_protected_sids = nrp.get('metabolites', [])
if 'bounds' in nrp:
num_cols = ['fbcLb', 'fbcUb']
nrp['bounds'][num_cols] = nrp['bounds'][num_cols].applymap(
lambda x: np.nan if type(x) == str else x)
self.overwrite_bounds = {rid: [row['fbcLb'], row['fbcUb']]
for rid, row in nrp['bounds'].iterrows()}
else:
self.overwrite_bounds = {}
self.protected_functions = {}
if 'functions' in nrp:
num_cols = ['fbcLb', 'fbcUb', 'fraction']
nrp['functions'][num_cols] = nrp['functions'][num_cols].applymap(
lambda x: np.nan if type(x) == str else x)
str_cols = ['fluxObjectives', 'reaction']
nrp['functions'][str_cols] = nrp['functions'][str_cols].applymap(
lambda x: '' if type(x) != str else x)
for func_id in list(nrp['functions'].index.unique()):
df_pf = nrp['functions'].loc[func_id]
self.protected_functions[func_id] = ProtectedFunction(df_pf)
@staticmethod
def load_raw_protected_data(fname):
"""Load raw protected parts from Excel file.
Reaction/metabolite identifiers are not modified.
:param fname: file name with protected parts
:type fname: str
:returns: protected parts, with optional keys 'reactions' and 'metabolites'
(lists of ids) and 'functions' and 'bounds' (pandas DataFrames indexed by id)
:rtype: dict
"""
nrp = {}
with pd.ExcelFile(fname) as xlsx:
for key in ['reactions', 'metabolites']:
if key in xlsx.sheet_names:
nrp[key] = pd.read_excel(xlsx, sheet_name=key).iloc[:, 0].to_list()
for key in ['functions', 'bounds']:
if key in xlsx.sheet_names:
nrp[key] = pd.read_excel(xlsx, sheet_name=key, index_col=0)
return nrp
class ProtectedFunction:
def __init__(self, df_pf):
if type(df_pf) == pd.Series:
df_pf = pd.DataFrame([df_pf.values], columns=df_pf.index, index=[df_pf.name])
self.obj_id = df_pf.index[0]
self.objective = {}
self.overwrite_bounds = {}
for _, row in df_pf.iterrows():
if len(row['fluxObjectives']) > 0:
self.obj_dir = row['type']
for record in sbmlxdf.misc.record_generator(row['fluxObjectives']):
params = sbmlxdf.misc.extract_params(record)
self.objective[params['reac']] = params['coef']
if math.isfinite(row['fraction']):
self.target_frac = row['fraction']
if math.isfinite(row['fbcLb']):
self.target_lb = row['fbcLb']
if math.isfinite(row['fbcUb']):
self.target_ub = row['fbcUb']
if len(row.reaction) > 0:
self.overwrite_bounds[row['reaction']] = [row['fbcLb'], row['fbcUb']]
if hasattr(row, 'notes'):
if type(row['notes']) == str and len(row['notes']) > 0:
self.notes = row['notes']
def set_target_lb(self, lb):
self.target_lb = lb
def set_target_ub(self, ub):
self.target_ub = ub
"""Subpackage """
from .fba_model import FbaModel
__all__ = ['FbaModel']
modelpruner/models/fba_model.py

import math
import numpy as np
import sbmlxdf
from modelpruner.problems.lp_problem import LpProblem
class FbaModel:
def __init__(self, sbml_model):
model_dict = sbml_model.to_df()
df_reactions = model_dict['reactions']
df_species = model_dict['species']
df_fbc_objectives = model_dict['fbcObjectives']
self.full_model_shape = {'n_full_rids': len(df_reactions),
'n_full_sids': len(df_species)}
self.rids = df_reactions.index.to_numpy()
self.rid2idx = {rid: idx for idx, rid in enumerate(self.rids)}
self.sids = df_species.index.to_numpy()
self.sid2idx = {sid: idx for idx, sid in enumerate(self.sids)}
self.reversible = df_reactions['reversible'].to_numpy().copy()
self.flux_bounds = np.vstack((df_reactions['fbcLb'].to_numpy(),
df_reactions['fbcUb'].to_numpy()))
self.s_mat = sbml_model.get_s_matrix().to_numpy()
self.objective = {}
for oid, row in df_fbc_objectives.iterrows():
if row['active'] is True:
self.obj_id = row.name
self.obj_dir = row['type']
for record in sbmlxdf.misc.record_generator(row['fluxObjectives']):
params = sbmlxdf.misc.extract_params(record)
self.objective[params['reac']] = params['coef']
self.check_consistency()
self.obj_coefs = self.set_objective_coefficients(self.objective)
def set_objective_coefficients(self, objective):
obj_coefs = np.zeros(len(self.rids))
for rid, coef in objective.items():
obj_coefs[self.rid2idx[rid]] = coef
return obj_coefs
def check_consistency(self):
# check that irreversible reactions have a flux lower bound >= 0
for idx in range(len(self.rids)):
if np.logical_not(self.reversible[idx]):
if self.flux_bounds[0, idx] < 0:
self.reversible[idx] = True
print(f'reaction {self.rids[idx]} set to reversible')
if len(self.objective) == 0:
print('Active FBA objective function missing in the model!')
for rid in self.objective:
if rid not in self.rids:
raise AttributeError(f'objective reaction {rid} is not part of the model reactions')
def combine_fwd_bwd(self, vector):
"""combine forward and backward flux information.
:param vector: contains data on all reactions + backward information for reversible reactions
:type vector: 1D ndarray of shape (len(rids) + sum(reversible))
:return: combined flux information
:rtype: 1D ndarray of shape (len(rids))
"""
n_cols = len(self.rids)
combined = vector[:n_cols]
combined[self.reversible] -= vector[n_cols:]
return combined
def create_fba_lp(self):
"""create Linear Programming Problem for FBA with split reactions
Splitting reactions in forward/backward directions reduces high values during FBA
Splitting required for pFBA (minimize absolute flux values)
:return: LpProblem configured for FBA
:rtype: LpProblem
"""
lp = LpProblem()
fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible]))
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
fwd_bwd_bnds = np.hstack((np.fmax(self.flux_bounds, 0),
np.vstack((np.fmax(-self.flux_bounds[1, self.reversible], 0),
np.fmax(-self.flux_bounds[0, self.reversible], 0)))))
row_bnds = np.zeros((fwd_bwd_s_mat.shape[0], 2))
lp.configure(fwd_bwd_obj_coefs, fwd_bwd_s_mat, row_bnds, fwd_bwd_bnds, self.obj_dir)
return lp
def fba_optimize(self, short_result=False):
"""FBA optimization of current problem
:return: results information from the optimization
:rtype: dict
"""
lp = self.create_fba_lp()
if lp is None:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result)
if short_result is False:
res['x'] = self.combine_fwd_bwd(res['x'])
res['reduced_costs'] = self.combine_fwd_bwd(res['reduced_costs'])
return res
def fva_optimize(self, rids=None, fract_of_optimum=1.0):
"""FVA - flux variability analysis for all or selected reactions.
:param rids: selected reaction ids on which to run FVA (default: None, i.e. all reactions)
:type rids: list of strings, optional
:param fract_of_optimum: fraction of the FBA optimum retained as a constraint, between 0.0 and 1.0
:type fract_of_optimum: float (default: 1.0)
:return: rid, min and max flux values
:rtype: dict
"""
lp = self.create_fba_lp()
if lp is None:
return {'success': False, 'message': 'not an FBA model'}
res = lp.solve(short_result=True)
if rids is None:
rids = self.rids
flux_min = np.zeros(len(rids))
flux_max = np.zeros(len(rids))
if res['success']:
# fix FBA objective as constraint
fwd_bwd_obj_coefs = np.hstack((self.obj_coefs, -self.obj_coefs[self.reversible]))
if self.obj_dir == 'maximize':
row_bds = np.array([res['fun'] * fract_of_optimum, np.nan])
else:
if fract_of_optimum > 0.0:
row_bds = np.array([np.nan, res['fun'] / fract_of_optimum])
else:
row_bds = np.array([np.nan, 1000.0])
lp.add_constraint(fwd_bwd_obj_coefs, row_bds)
n_cols = len(self.rids)
for idx, rid in enumerate(rids):
# select reaction maximize/minimize flux
new_obj_coefs = np.zeros(n_cols)
new_obj_coefs[self.rid2idx[rid]] = 1
new_fwd_bwd_obj_coefs = np.hstack((new_obj_coefs, -new_obj_coefs[self.reversible]))
lp.set_obj_coefs(new_fwd_bwd_obj_coefs)
lp.set_obj_dir('minimize')
res = lp.solve(short_result=True)
flux_min[idx] = res['fun'] if res['success'] else np.nan
lp.set_obj_dir('maximize')
res = lp.solve(short_result=True)
flux_max[idx] = res['fun'] if res['success'] else np.nan
return {'rids': np.array(rids), 'min': flux_min, 'max': flux_max}
def update_flux_bounds(self, flux_bounds):
"""selectivly set lower/upper flux bounds for given reactions.
Flux bounds with value of 'np.nan' will not be set
:param flux_bounds: new lower and upper flux bounds selected reactions
:type flux_bounds: dict (key: reaction id, value: list with two float values for lower/upper bound)
"""
for rid, (lb, ub) in flux_bounds.items():
if math.isfinite(lb):
self.flux_bounds[0, self.rid2idx[rid]] = lb
if lb < 0:
self.reversible[self.rid2idx[rid]] = True
if math.isfinite(ub):
self.flux_bounds[1, self.rid2idx[rid]] = ub
def remove_reactions(self, drop_rids, protected_sids):
# remove reaction columns from the stoichiometric matrix and drop
# metabolite rows that no longer participate in any reaction;
# update rids, rid2idx, sids, sid2idx, reversible, flux_bounds, s_mat
keep_rids_mask = np.array([rid not in drop_rids for rid in self.rids])
self.rids = self.rids[keep_rids_mask]
self.rid2idx = {rid: idx for idx, rid in enumerate(self.rids)}
self.reversible = self.reversible[keep_rids_mask]
self.obj_coefs = self.obj_coefs[keep_rids_mask]
self.flux_bounds = self.flux_bounds[:, keep_rids_mask]
self.s_mat = self.s_mat[:, keep_rids_mask]
keep_sids_mask = np.any(self.s_mat != 0, axis=1)
drop_sids = set(self.sids[np.logical_not(keep_sids_mask)])
if len(drop_sids) > 0:
drop_protected_sids = drop_sids.intersection(protected_sids)
if len(drop_protected_sids) > 0:
print('Something went wrong, protected metabolites dropped due to reaction removal:',
drop_protected_sids)
self.sids = self.sids[keep_sids_mask]
self.sid2idx = {sid: idx for idx, sid in enumerate(self.sids)}
self.s_mat = self.s_mat[keep_sids_mask, :]
def get_identical_columns(self):
# Identify identical columns of the stoichiometric matrix:
# lexically sort the columns, then compare adjacent columns for identity
# and report groups of identical columns.
# Reversibility is considered (backward directions are included);
# linearly scaled reactions are not considered.
fwd_bwd_s_mat = np.hstack((self.s_mat, -self.s_mat[:, self.reversible]))
fwd_bwd_rids = np.hstack((self.rids, self.rids[self.reversible]))
# identify groups of parallel reactions
sorted_idx = np.lexsort(fwd_bwd_s_mat)
groups = []
consumed_idxs = set()
for pos in range(len(sorted_idx)):
idx = sorted_idx[pos]
if idx not in consumed_idxs:
group = [idx]
for adjacent_pos in range(pos + 1, len(sorted_idx)):
candidate_idx = sorted_idx[adjacent_pos]
if np.all(fwd_bwd_s_mat[:, idx] == fwd_bwd_s_mat[:, candidate_idx]):
group.append(candidate_idx)
consumed_idxs.add(candidate_idx)
else:
break
if len(group) > 1:
groups.append(group)
parallel_rid_groups = [list(fwd_bwd_rids[group]) for group in groups]
return parallel_rid_groups
def get_model_status(self):
""" Return status on model
:returns: status information in a dictionary
:rtype: dict
"""
status = {'dof': self.s_mat.shape[1] - np.linalg.matrix_rank(self.s_mat),
'n_red_rids': len(self.rids),
'n_red_sids': len(self.sids)}
status.update(self.full_model_shape)
return status
@property
def obj_dir(self):
return self.__obj_dir
@obj_dir.setter
def obj_dir(self, dir_string):
if 'min' in dir_string:
self.__obj_dir = 'minimize'
else:
self.__obj_dir = 'maximize'
"""Subpackage with LpProblem Class """
from .lp_problem import LpProblem
__all__ = ['LpProblem']
"""Implementation of LpProblem (linear programming problem) class.
built upon swiglpk
Peter Schubert, HHU Duesseldorf, June 2022
"""
import numpy as np
import scipy
import scipy.sparse
import swiglpk as glpk
# status reported
simplex_status = {
0: 'LP problem instance successfully solved',
1: 'Unable to start search, initial basis specified in the '
'problem object is invalid. Number of basic variables is not the same as the number of rows.',
2: 'Unable to start search, basis matrix corresponding to initial basis is singular.',
3: 'Unable to start search, basis matrix corresponding to initial basis is ill-conditioned.',
4: 'Unable to start search, some double-bounded variables have incorrect bounds.',
5: 'Search prematurely terminated: solver failure.',
6: 'Search prematurely terminated: objective function being '
'maximized reached its lower limit and continues decreasing.',
7: 'Search prematurely terminated: objective function being '
'minimized reached its upper limit and continues increasing.',
8: 'Search prematurely terminated: simplex iteration limit exceeded.',
9: 'Search prematurely terminated: time limit exceeded.',
10: 'LP problem instance has no primal feasible solution.',
11: 'LP problem instance has no dual feasible solution.'}
generic_status = {1: 'solution is undefined',
2: 'solution is feasible',
3: 'solution is infeasible',
4: 'problem has no feasible solution',
5: 'solution is optimal',
6: 'problem has unbounded solution'}
dual_status = {1: 'dual solution is undefined',
2: 'dual solution is feasible',
3: 'dual solution is infeasible',
4: 'no dual feasible solution exists'}
prim_status = {1: 'primal solution is undefined',
2: 'primal solution is feasible',
3: 'primal solution is infeasible',
4: 'no primal feasible solution exists'}
var_status = {1: 'basic variable',
2: 'non-basic variable on its lower bound',
3: 'non-basic variable on its upper bound',
4: 'non-basic free (unbounded) variable',
5: 'non-basic fixed variable'}
scaling_options = {'GM': glpk.GLP_SF_GM,
'EQ': glpk.GLP_SF_EQ,
'2N': glpk.GLP_SF_2N,
'SKIP': glpk.GLP_SF_SKIP,
'AUTO': glpk.GLP_SF_AUTO}
class LpProblem:
def __init__(self):
"""create a glpk problem.
Problem needs to be properly deleted using del(LpProblem).
"""
self.lp = glpk.glp_create_prob()
self.ncols = 0
self.nrows = 0
self.res = {}
@staticmethod
def _get_variable_type(bounds):
"""Determine variable type for upper/lower bounds.
:param bounds: upper and lower bounds
:type bounds: 1D ndarray of length 2
:return var_type: glpk variable type to configure
:rtype: integer constant
:return lb: value of lower bound
:rtype: float
:return ub: value of upper bound
:rtype: float
"""
lb, ub = bounds
if np.isfinite(lb) and np.isfinite(ub):
if lb < ub:
var_type = glpk.GLP_DB
else:
ub = 0.0
var_type = glpk.GLP_FX
elif not np.isfinite(lb) and not np.isfinite(ub):
lb = 0.0
ub = 0.0
var_type = glpk.GLP_FR
elif np.isfinite(ub):
lb = 0.0
var_type = glpk.GLP_UP
else:
ub = 0.0
var_type = glpk.GLP_LO
return var_type, lb, ub
def configure(self, obj_coefs, constr_coefs, row_bnds, col_bnds, direction='maximize'):
"""Configure the LP Problem.
Note: glpk related indices start at 1 (not zero)
:param obj_coefs: coefficients of objective function
:type obj_coefs: 1D ndarray of length ncols
:param constr_coefs: matrix with constraint coefficients
:type constr_coefs: 2D ndarray of shape (nrows, ncols)
:param row_bnds: lower and upper bounds of auxiliary (row) variables
:type row_bnds: 2D ndarray of shape (nrows, 2) (1st col: lower bounds, 2nd col: upper bounds)
:param col_bnds: lower and upper bounds of structural (col) variables
:type col_bnds: 2D ndarray of shape (2, ncols) (1st row: lower bounds, 2nd row: upper bounds)
:param direction: direction for the optimization ('maximize' or 'minimize')
:type direction: string
"""
self.nrows, self.ncols = constr_coefs.shape
assert len(obj_coefs) == self.ncols
assert col_bnds.shape == (2, self.ncols)
assert row_bnds.shape == (self.nrows, 2)
# configure objective function
glpk.glp_add_cols(self.lp, self.ncols)
self.set_obj_coefs(obj_coefs)
self.set_obj_dir(direction)
# configure bounds on structural (columns) variables
for i, lb_ub in enumerate(col_bnds.T):
var_type, lb, ub = self._get_variable_type(lb_ub)
glpk.glp_set_col_bnds(self.lp, 1 + i, var_type, lb, ub)
# configure constraints
glpk.glp_add_rows(self.lp, self.nrows)
for i, lb_ub in enumerate(row_bnds):
var_type, lb, ub = self._get_variable_type(lb_ub)
glpk.glp_set_row_bnds(self.lp, 1 + i, var_type, lb, ub)
constr_coefs_sparse = scipy.sparse.coo_matrix(constr_coefs)
n_coefs = len(constr_coefs_sparse.row)
ia = glpk.intArray(1 + n_coefs)
ja = glpk.intArray(1 + n_coefs)
ar = glpk.doubleArray(1 + n_coefs)
for i in range(n_coefs):
ia[1 + i] = int(1 + constr_coefs_sparse.row[i])
ja[1 + i] = int(1 + constr_coefs_sparse.col[i])
ar[1 + i] = constr_coefs_sparse.data[i]
glpk.glp_load_matrix(self.lp, n_coefs, ia, ja, ar)
def replace_constraint(self, row, constr_coefs, row_bnds):
# set/replace one bounded constraint
#
# Parameters:
# row: row id of the constraint to be replaced
# constr_coefs: 1D np.array of floats, sized to the number of structural variables;
# only coefficients != 0.0 will be configured
# row_bnds: 1D numpy.ndarray of size two (lower_bound, upper_bound);
# np.nan and np.inf are supported
assert row <= glpk.glp_get_num_rows(self.lp)
glpk.glp_set_row_bnds(self.lp, row, *self._get_variable_type(row_bnds))
n_coef = len(np.nonzero(constr_coefs)[0])
ind = glpk.intArray(1 + n_coef)
val = glpk.doubleArray(1 + n_coef)
idx = 1
for i in np.nonzero(constr_coefs)[0]:
ind[idx] = int(1 + i)
val[idx] = constr_coefs[i]
idx += 1
glpk.glp_set_mat_row(self.lp, row, n_coef, ind, val)
def add_constraint(self, constr_coefs, row_bnds):
"""add a single constraint to the lp problem
:param constr_coefs: constraint coefficients for the new row
:type constr_coefs: 1d ndarray of length ncols
:param row_bnds: upper and lower bounds of the auxiliary (row) variable
:type row_bnds: 1D ndarray of length 2
:return: row index of the added constraint
:rtype: int
"""
row = glpk.glp_add_rows(self.lp, 1)
self.replace_constraint(row, constr_coefs, row_bnds)
return row
def del_constraint(self, row):
# delete one constraint
#
# Parameters:
# row: row id of the constraint to be deleted
assert row <= glpk.glp_get_num_rows(self.lp)
num = glpk.intArray(1 + 1)
num[1] = row
glpk.glp_del_rows(self.lp, 1, num)
def set_single_bound(self, react_idx, bounds):
assert react_idx <= self.ncols
if type(bounds) is not np.ndarray:
bounds = np.array(bounds)
glpk.glp_set_col_bnds(self.lp, 1 + react_idx, *self._get_variable_type(bounds))
def set_obj_dir(self, direction):
"""Configure the optimization direction
:param direction: direction for the optimization ('maximize' or 'minimize')
:type direction: string
"""
obj_dir = glpk.GLP_MAX if 'max' in direction else glpk.GLP_MIN
glpk.glp_set_obj_dir(self.lp, obj_dir)
def reset_cost(self, cost_coef=None):
# reset coefficients of objective function (cols) to 0.0
#
# Parameters:
# cost_coef: None or 1D numpy.ndarray with floats
# None: reset all cost coefficients
# ndarray: reset only cost coefficients != 0
if cost_coef is None:
for i in range(glpk.glp_get_num_cols(self.lp)):
glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0)
else:
for i in np.nonzero(cost_coef)[0]:
glpk.glp_set_obj_coef(self.lp, int(1 + i), 0.0)
def set_obj_coefs(self, obj_coefs):
"""set coefficients for objective function
:param obj_coefs: coefficients of objective function
:type obj_coefs: 1D ndarray of length ncols
"""
for i in range(self.ncols):
glpk.glp_set_obj_coef(self.lp, int(1 + i), obj_coefs[i])
def solve(self, short_result=False, scaling=None):
"""Solve the Linear Programming Problem.
:param short_result: short result (only objective value and status) or long result
:type short_result: bool (default:False)
:param scaling: problem scaling option ('GM', 'EQ', '2N', 'SKIP', 'AUTO')
:type scaling: string (default: None)
:return: result information from the optimization stored in a dictionary
:rtype: dict
"""
if scaling is not None:
opt = scaling_options.get(scaling, glpk.GLP_SF_AUTO)
glpk.glp_scale_prob(self.lp, opt)
# sjj = [glpk.glp_get_sjj(self.lp, 1+i) for i in range(glpk.glp_get_num_cols(self.lp))]
# rii = [glpk.glp_get_rii(self.lp, 1+i) for i in range(glpk.glp_get_num_rows(self.lp))]
simplex_result = glpk.glp_simplex(self.lp, None)
return self.results(simplex_result, short_result)
def get_row_value(self, row):
"""Retrieve right-hand side of a constraint (auxiliary variable)
:param row: index into constraints
:type row: int
:return: value of auxiliary varibale (value of rhs)
:rtype: float
"""
assert row <= glpk.glp_get_num_rows(self.lp)
return glpk.glp_get_row_prim(self.lp, row)
def results(self, simplex_result, short_result=False):
"""Collect results for the optimization
Alternatively, reduced results, e.g. when doing pFBA
:param simplex_result: result from the LP solver
:type simplex_result: int
:param short_result: short result (only objective value and status) or long result
:type short_result: bool (default:False)
:return: result from the FBA optimization
:rtype: dict
"""
self.res = {
'fun': glpk.glp_get_obj_val(self.lp),
'success': glpk.glp_get_status(self.lp) == glpk.GLP_OPT,
}
if short_result is False:
self.res['x'] = np.array([glpk.glp_get_col_prim(self.lp, 1 + i) for i in range(self.ncols)])
self.res['reduced_costs'] = np.array([glpk.glp_get_col_dual(self.lp, 1 + i) for i in range(self.ncols)])
self.res['shadow_prices'] = np.array([glpk.glp_get_row_dual(self.lp, 1 + i) for i in range(self.nrows)])
self.res['simplex_result'] = simplex_result
self.res['simplex_message'] = simplex_status[simplex_result]
self.res['generic_status'] = generic_status[glpk.glp_get_status(self.lp)]
self.res['primal_status'] = prim_status[glpk.glp_get_prim_stat(self.lp)]
self.res['dual_status'] = dual_status[glpk.glp_get_dual_stat(self.lp)]
return self.res
def __del__(self):
"""Explicitly delete the LpProblem. (can we do without?)
"""
if getattr(self, 'lp'):
glpk.glp_delete_prob(self.lp)
pyproject.toml

[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"
setup.py 0 → 100755
# -*- coding: utf-8 -*-
import os
import re
from setuptools import setup, find_packages
setup_kwargs = {}
with open('README.md') as f:
setup_kwargs['long_description'] = f.read()
# version from file
with open(os.path.join('modelpruner', '_version.py')) as f:
mo = re.search(r"^__version__ = ['\"]([^'\"]*)['\"]",
f.read(), re.MULTILINE)
if mo:
setup_kwargs['version'] = mo.group(1)
setup(
name='modelpruner',
description='prune reactions in a metabolic SBML coded model',
author='Peter Schubert',
author_email='peter.schubert@hhu.de',
url='https://gitlab.cs.uni-duesseldorf.de/schubert/modelpruner',
project_urls={
"Source Code": 'https://gitlab.cs.uni-duesseldorf.de/schubert/modelpruner',
#"Documentation": 'https://sbmlxdf.readthedocs.io'
},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU Lesser General Public License v3 (LGPLv3)",
"Operating System :: OS Independent",
"Topic :: Scientific/Engineering :: Bio-Informatics",
],
license='GPLv3',
long_description_content_type='text/markdown',
packages=find_packages(exclude='docs'),
install_requires=['pandas>=1.2.0',
'numpy>=1.20.0',
'sbmlxdf>=0.2.7',
'swiglpk>=5.0.3',
'scipy>=1.7.0',
],
python_requires=">=3.7",
keywords=['modeling', 'standardization', 'network reduction', 'SBML'],
**setup_kwargs
)