Skip to content
Snippets Groups Projects
Commit 5cef5ab0 authored by Peter Schubert's avatar Peter Schubert
Browse files

Start constructing linear pathways

parent 1e44506e
No related branches found
No related tags found
No related merge requests found
from modelpruner._version import __version__
from modelpruner.core.model_pruner import ModelPruner
from modelpruner.core.protected_parts import ProtectedParts
from modelpruner.graph.network_graph import NetworkGraph
from modelpruner.graph.linear_pathways import LinearPathways
from . import core, models, problems
from . import core, models, problems, graph
__all__ = []
__all__ += core.__all__
__all__ += models.__all__
__all__ += problems.__all__
__all__ += graph.__all__
__author__ = 'Peter Schubert'
......@@ -25,7 +25,7 @@ Peter Schubert, HHU Duesseldorf, September 2022
import sys
import os
import re
import time
# import time
import math
import numpy as np
import pandas as pd
......@@ -113,42 +113,47 @@ class ModelPruner:
if resume is True and os.path.exists(self._snapshot_sbml):
sbml_fname = self._snapshot_sbml
self.fba_model = FbaModel(sbml_fname)
self.pps = ProtectedParts(protected_parts)
self.protected_sids = self.pps.protected_sids
# if resume option selected and snapshot protected rids exists, resume from there
# note: during pruning, essential rids are added to protected rids
self.protected_rids = self.get_total_protected_rids()
self.essential_rids = set()
# if resume option selected and snapshot of essential rids exists, use them
if resume is True and os.path.exists(self._snapshot_json):
with open(self._snapshot_json, 'r') as f:
snapshot_data = json.load(f)
self.protected_rids = set(snapshot_data['protected_rids'])
self.essential_rids = set(snapshot_data['essential_rids'])
print(f'Snapshot restored at {len(self.fba_model.rids):4d} remaining reactions '
f'from: {self._snapshot_json}')
else:
self.protected_rids = self.pps.protected_rids
# self.protected_rids = self.get_total_protected_rids()
# overwrite fba model bounds by general bounds of protected parts (prior to LpProblem instantiation)
self.fba_model.update_model_flux_bounds(self.pps.overwrite_bounds)
self.fba_model.update_model_reversible(self.get_pp_reversible_reactions())
print('Determine optimal values for protected functions')
self.set_function_optimum()
print('Determine optimum values for protected functions:')
optimum_values = self.set_function_optimum()
for idx, val in optimum_values.items():
notes = self.pps.protected_functions[idx].notes
print(f' protected function {idx}, optimum: {val:.04f} ({notes})')
print('Check consistency of protected parts')
self.check_protected_parts()
blocked_p_rids = self.check_protected_parts()
self.active_protected_rids = self.protected_rids.difference(blocked_p_rids)
def get_total_protected_rids(self):
"""Determine protected reactions, incl. those leading to protected metabolites.
"""Determine protected reactions, incl. reactions connecting protected metabolites.
:return: total protected reaction ids
:rtype: set of str
"""
# identify reactions that need protection due to protected metabolites
# over and above the protected rids provided in protected parts
sid_mask = [self.fba_model.sid2idx[sid] for sid in self.protected_sids]
psf_mask = self.fba_model.s_mat_coo.tocsr()[sid_mask].getnnz(axis=0)
sid_mask = [self.fba_model.sid2idx[sid] for sid in self.protected_sids
if sid in self.fba_model.sid2idx]
psf_mask = self.fba_model.s_mat_coo.tocsr()[sid_mask].getnnz(axis=0) > 0
protected_sid_rids = set(self.fba_model.rids[psf_mask])
protected_rids = self.pps.protected_rids.union(protected_sid_rids)
return protected_rids
......@@ -160,32 +165,44 @@ class ModelPruner:
"""
rev_pp_rids = set()
for rid, bounds in self.pps.overwrite_bounds.items():
if rid in self.fba_model.rids:
for idx in [0, 1]:
if math.isfinite(bounds[idx]) and bounds[idx] < 0.0:
rev_pp_rids.add(rid)
for pf in self.pps.protected_functions.values():
for rid, bounds in pf.overwrite_bounds.items():
if rid in self.fba_model.rids:
for idx in [0, 1]:
if math.isfinite(bounds[idx]) and bounds[idx] < 0.0:
rev_pp_rids.add(rid)
return rev_pp_rids
def set_function_optimum(self):
"""using FBA to set optimal value for protected functions"""
for pf in self.pps.protected_functions.values():
"""using FBA to set optimal value for protected functions
:return: optimal values for protected functions
:rtype: dict
"""
optimum_values = {}
for key, pf in self.pps.protected_functions.items():
res = self.fba_model.fba_pf_optimize(pf)
pf.optimal = res['fun']
optimum_values[key] = res['fun']
pf.fba_success = res['success']
if hasattr(pf, 'target_frac'):
if pf.obj_dir == 'maximize':
pf.set_target_lb(pf.target_frac * pf.optimal)
else:
pf.set_target_ub(pf.target_frac * pf.optimal)
return optimum_values
def check_protected_parts(self):
"""Report on consistency issues wrt to protected parts.
"""Check consistency wrt to protected parts. Return blocked protected rids
E.g. report on reactions and metabolites that do not exist in the model
E.g. report on reactions and metabolties that do not exist in the model
:return: protected reaction ids that are blocked (zero flux across conditions)
:rtype: set of str
"""
extra_p_rids = self.protected_rids.difference(set(self.fba_model.rids))
extra_p_sids = self.protected_sids.difference(set(self.fba_model.sids))
......@@ -209,12 +226,13 @@ class ModelPruner:
print(' Reactions in protected functions that are not protected:', pf_unprotected_rids)
blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids']
blocked_p_rids = self.pps.protected_sids.intersection(blocked_rids)
blocked_p_rids = self.protected_rids.intersection(blocked_rids)
blocked_pf_rids = pf_rids.intersection(blocked_rids)
if len(blocked_p_rids) > 0:
print(' Protected reactions that are blocked (no flux):', blocked_p_rids)
if len(blocked_pf_rids) > 0:
print(' Reactions in protected functions that are blocked (no flux):', blocked_pf_rids)
return blocked_p_rids
def identify_parallel_reactions(self):
"""Identify parallel reactions having same stoichiometry (considering reverse).
......@@ -243,6 +261,30 @@ class ModelPruner:
drop_rids |= drop_group_rids
return drop_rids
def is_protected_rids_blocked(self, candidate_rid):
"""check if active protected reactions get blocked by removal of specified reaction.
while removing a candidate reactions we want to ensure that active protected
reactions (active reactions in initial model) stay active in pruned model
:param candidate_rid: reaction candidate to remove
:type candidate_rid: str
:return: is any of the protected reactions blocked
:rtype: bool
"""
# set flux bounds of candidate reaction to zero
self.fba_model.update_lp_flux_bounds({candidate_rid: np.array([0.0, 0.0])})
# execute FVA across active protected rids
reaction_types = self.reaction_types_fva(list(self.active_protected_rids))
blocked_rids = reaction_types['blocked_rids']
# rest flux bounds for candidate reaction
self.fba_model.reset_lp_flux_bounds({candidate_rid})
is_blocked = True if len(blocked_rids) > 0 else False
return is_blocked
def reaction_types_fva(self, free_rids):
"""Idenify reaction types using FVA applied to all protected functions
......@@ -253,7 +295,7 @@ class ModelPruner:
:return: rid, min and max flux values
:rtype: dict
"""
print(time.strftime("%H:%M:%S", time.localtime()))
# print(time.strftime("%H:%M:%S", time.localtime()))
n_pf = len(self.pps.protected_functions)
flux_min_pfs = np.zeros((n_pf, len(free_rids)))
......@@ -343,7 +385,6 @@ class ModelPruner:
reaction_types = {'blocked_rids': blocked_rids, 'essential_rids': essential_rids,
'candidate_rids_sorted': candidate_rids_sorted}
return reaction_types
def prune(self, snapshot_freq=0):
......@@ -368,49 +409,60 @@ class ModelPruner:
print(f'Initial S-matrix: {self.fba_model.s_mat_coo.shape}; '
f' protected reactions: {len(self.protected_rids)}:', self.protected_rids)
# remove any parallel reactions from the model
drop_rids = self.identify_parallel_reactions().difference(self.protected_rids)
if len(drop_rids) > 0:
self.fba_model.remove_reactions(drop_rids)
print(f'{len(drop_rids)} parallel reaction(s) dropped:', drop_rids)
feasible = np.array([self.fba_model.check_pf_feasibility(pf)
for pf in self.pps.protected_functions.values()])
print('Protected functions feasibility:', np.all(feasible))
feasible = self.fba_model.check_pfs_feasibility(self.pps.protected_functions)
print('Protected functions feasibility:', feasible)
next_snapshot = 0
if snapshot_freq > 0:
next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq)
while len(self.protected_rids) < self.fba_model.s_mat_coo.shape[1]:
free_rids = list(set(self.fba_model.rids).difference(self.protected_rids))
# main iterration loop to retain and remove reactions from the fba model
retained_rids = self.protected_rids | self.essential_rids
while len(retained_rids) < self.fba_model.s_mat_coo.shape[1]:
free_rids = list(set(self.fba_model.rids).difference(retained_rids))
reaction_types = self.reaction_types_fva(free_rids)
print()
new_essential_rids = reaction_types['essential_rids']
blocked_rids = reaction_types['blocked_rids']
essential_rids = reaction_types['essential_rids']
candidate_rids = reaction_types['candidate_rids_sorted']
if len(blocked_rids) > 0:
self.fba_model.remove_reactions(blocked_rids)
print(f"{len(blocked_rids)} blocked reaction(s) removed:", blocked_rids)
print(f"{len(candidate_rids)} candidates:", candidate_rids[:4], '...')
# try to remove one reaction while still satisfying all protected functions
for rid in candidate_rids:
# print('Check feasibility of', rid)
feasible = True
for pf in self.pps.protected_functions.values():
feasible = self.fba_model.check_pf_feasibility(pf, zero_flux_rid=rid)
# print(f'Feasibility of {rid} is {feasible}')
for candidate_rid in candidate_rids:
# print('Check feasibility of', candidate_rid)
feasible = self.fba_model.check_pfs_feasibility(self.pps.protected_functions,
candidate_rid=candidate_rid)
if feasible is False:
essential_rids.add(rid)
break
if feasible is True:
self.fba_model.remove_reactions({rid})
print(f'{rid} removed from the model')
new_essential_rids.add(candidate_rid)
continue
else:
# check that active_protected_rids are not getting blocked
if len(self.active_protected_rids) > 0:
is_blocked = self.is_protected_rids_blocked(candidate_rid)
if is_blocked is True:
new_essential_rids.add(candidate_rid)
continue
self.fba_model.remove_reactions({candidate_rid})
print(f' {candidate_rid} removed from the model')
break
if len(essential_rids) > 0:
self.protected_rids |= essential_rids
print(f"{len(essential_rids)} reaction(s) added to protected reactions:", essential_rids)
if len(new_essential_rids) > 0:
self.essential_rids |= new_essential_rids
retained_rids |= new_essential_rids
print(f"{len(new_essential_rids)} reaction(s) added to essential reactions:",
new_essential_rids)
if feasible is False:
print('No more reactions to remove')
......@@ -418,14 +470,14 @@ class ModelPruner:
# store intermediate results (snapshots)
if len(self.fba_model.rids) < next_snapshot:
self.export_pruned_model(self._snapshot_sbml)
snapshot_data = {'protected_rids': list(self.protected_rids)}
snapshot_data = {'essential_rids': list(self.essential_rids)}
with open(self._snapshot_json, 'w') as f:
json.dump(snapshot_data, f)
next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq)
print('snapshot data saved.')
print(f'S-matrix: {self.fba_model.s_mat_coo.shape}; '
f'protected reactions: {len(self.protected_rids)}')
f'retained reactions: {len(retained_rids)}')
# check if blocked reactions exist in the finally pruned model
# blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids']
......@@ -449,6 +501,7 @@ class ModelPruner:
"""
status = self.fba_model.get_model_status()
status['n_protected_rids'] = len(self.protected_rids)
status['n_essential_rids'] = len(self.essential_rids)
status['n_protected_sids'] = len(self.protected_sids)
return status
......@@ -469,9 +522,9 @@ class ModelPruner:
:param pruned_sbml: pathname of pruned network, ending with '.xml'
:type pruned_sbml: str, optional (default: None, construct a name)
"""
n_s, n_r = self.fba_model.s_mat_coo.shape
n_sids, n_rids = self.fba_model.s_mat_coo.shape
if pruned_sbml is None:
pruned_sbml = re.sub(r'.xml$', f'_pruned_{n_s}x{n_r}.xml', self.full_sbml_fname)
pruned_sbml = re.sub(r'.xml$', f'_pruned_{n_sids}x{n_rids}.xml', self.full_sbml_fname)
# overwrite model objective with objective of first protected function
first_pf = list(self.pps.protected_functions.values())[0]
......@@ -483,7 +536,8 @@ class ModelPruner:
success = self.fba_model.export_pruned_model(pruned_sbml, df_fbc_objectives)
if success is True:
print('Pruned model exported to', pruned_sbml)
if len(self.protected_rids) == n_r:
retained_rids = self.protected_rids | self.essential_rids
if len(retained_rids) == n_rids:
self.remove_snapshot_files()
else:
print('Pruned model could not be exported')
......
......@@ -77,6 +77,7 @@ class ProtectedFunction:
self.obj_id = df_pf.index[0]
self.objective = {}
self.overwrite_bounds = {}
self.notes = ''
for _, row in df_pf.iterrows():
if len(row['fluxObjectives']) > 0:
self.obj_dir = row['type']
......@@ -91,9 +92,9 @@ class ProtectedFunction:
self.target_ub = row['fbcUb']
if len(row.reaction) > 0:
self.overwrite_bounds[row['reaction']] = [row['fbcLb'], row['fbcUb']]
if hasattr(row, 'notes'):
if type(row['notes']) == str and len(row['notes']) > 0:
self.notes = row['notes']
if hasattr(df_pf.iloc[0], 'notes'):
if type(df_pf.iloc[0]['notes']) == str and len(df_pf.iloc[0]['notes']) > 0:
self.notes = df_pf.iloc[0]['notes']
def set_target_lb(self, lb):
self.target_lb = lb
......
from .reaction_edge import ReactionEdge
from .metabolite_node import MetaboliteNode
from .network_graph import NetworkGraph
from .pathway_segment import PathwaySegment
from .linear_pathway import LinearPathway
# TODO: check, which classes to expose in __all__
__all__ = ['ReactionEdge', 'MetaboliteNode', 'PathwaySegment', 'LinearPathway']
"""
Peter Schubert, October 2022
"""
import numpy as np
from modelpruner.graph.pathway_segment import PathwaySegment
class LinearPathway:
def __init__(self, init_sid, balance_c2_sids, network_graph):
"""Instantiate and create a Linear pathway from an initial pathway node.
init_sid: initial pathway node
ng: Class Network Graph to work on
"""
self.init_sid = init_sid
self.balance_c2_sids = set(balance_c2_sids)
self.ng = network_graph
self.segments = [self.get_initial_segment()]
self.add_downstream_segments()
self.add_upstream_segments()
def get_initial_segment(self):
"""get initial pathway segment for inital node.
Node with upstream and downstream connectivity,
from where connect subsequently other downstream
and upstream pathway segments
:return: inital pathway segment
:rtype: PathwaySegment
"""
self.balance_c2_sids.remove(self.init_sid)
connectivity = self.get_us_ds_neighbors(self.init_sid)
pwy_seg = PathwaySegment(connectivity['sid'])
pwy_seg.set_upstream(connectivity['p_rid'], connectivity['nb_us'])
pwy_seg.set_downstream(connectivity['c_rid'], connectivity['nb_ds'])
pwy_seg.set_reversible(connectivity['reversible'])
return pwy_seg
def add_downstream_segments(self):
"""add downstream segments """
lcount = 1
upstream_seg = self.segments[-1]
while upstream_seg.nb_ds is not None and lcount < 50:
lcount += 1
pwy_rev = upstream_seg.reversible
self.balance_c2_sids.remove(upstream_seg.nb_ds)
connectivity = self.get_ds_neighbor(upstream_seg)
if 'c_rid' in connectivity:
pwy_seg = PathwaySegment(connectivity['sid'])
pwy_seg.set_upstream(upstream_seg.c_rid, upstream_seg.sid)
pwy_seg.set_downstream(connectivity['c_rid'], connectivity['nb_ds'])
pwy_seg.set_reversible(pwy_rev and connectivity['reversible'])
self.segments.append(pwy_seg)
if connectivity['nb_ds'] is not None:
# treat a reversible pathway that becomes directional (i.e. continue ds)
if (pwy_rev is True) and (connectivity['reversible'] is False):
for pwy_seg in self.segments:
pwy_seg.set_reversible(False)
# in other cases, just continue ds
# cases of path termination
else:
# pathway termination of a reversible pathway (reverse pwy_segs and continue ds)
if (pwy_rev is True) and (connectivity['reversible'] is True):
for pwy_seg in self.segments:
pwy_seg.set_reversible(False)
pwy_seg.reverse_direction()
self.segments.reverse()
# in other cases, the path termination terminates the while loop
# i.e. no consuming node, but only one producing node
# we reverse tha pwy_segments and their direction and continue ds
elif pwy_rev is True:
assert 'p_rid' in connectivity
assert connectivity['reversible'] is False
pwy_seg = PathwaySegment(connectivity['sid'])
pwy_seg.set_upstream(upstream_seg.c_rid, upstream_seg.sid)
pwy_seg.set_downstream(connectivity['p_rid'], connectivity['nb_us'])
self.segments.append(pwy_seg)
for pwy_seg in self.segments:
pwy_seg.set_reversible(False)
pwy_seg.reverse_direction()
self.segments.reverse()
else:
# conditions when directional pathway has producing reaction
print(f'inconsistent configuration, starting at {self.init_sid}')
break
upstream_seg = self.segments[-1]
def add_upstream_segments(self):
lcount = 1
downstream_seg = self.segments[0]
while downstream_seg.nb_us in self.balance_c2_sids and lcount < 50:
lcount += 1
pwy_rev = downstream_seg.reversible
self.balance_c2_sids.remove(downstream_seg.nb_us)
connectivity = self.get_us_neighbor(downstream_seg)
if 'p_rid' in connectivity:
pwy_seg = PathwaySegment(connectivity['sid'])
pwy_seg.set_downstream(downstream_seg.p_rid, downstream_seg.sid)
pwy_seg.set_upstream(connectivity['p_rid'], connectivity['nb_us'])
pwy_seg.set_reversible(pwy_rev and connectivity['reversible'])
self.segments.insert(0, pwy_seg)
if (connectivity['reversible'] is False) and (pwy_rev is True):
for pwy_seg in self.segments:
pwy_seg.set_reversible(False)
elif pwy_rev is True:
assert 'c_rid' in connectivity
assert connectivity['reversible'] is False
pwy_seg = PathwaySegment(connectivity['sid'])
pwy_seg.set_downstream(downstream_seg.c_rid, downstream_seg.sid)
pwy_seg.set_upstream(connectivity['c_rid'], connectivity['nb_ds'])
self.segments.insert(0, pwy_seg)
for pwy_seg in self.segments:
pwy_seg.reverse_direction()
self.segments.reverse()
else:
print(f'inconsistent configuration, starting at {self.init_sid}')
break
downstream_seg = self.segments[0]
def get_neighbor_node(self, substrate_sids):
"""Get a c2 node from reactants/product sids
If none of the reactant/products is a c2 node,
return None (which means, connection to network).
In case of several c2 nodes, return the first node
after sorting.
:param substrate_sids: metabolite ids (reactants or products)
:type substrate_sids: list-like of str
:return: metabolite id (sid) for neighbor node
:rtype: str or None
"""
nbs = substrate_sids.intersection(self.balance_c2_sids)
if len(nbs) == 0:
nb = None
else:
nb = sorted(nbs)[0]
return nb
def get_us_ds_neighbors(self, init_sid):
"""Get upstream and downstream for an initial c2 node.
:param init_sid: initial c2 node id
:type init_sid: str
:return: node id, upstream/downstream c2 nodes, connecting reactions
:rtype: dict
"""
cur_node = self.ng.nodes[init_sid]
rids = [rid for rid in (cur_node.consuming_rids | cur_node.producing_rids)]
revs = [rev for rev in (cur_node.consuming_rids | cur_node.producing_rids).values()]
rev = bool(np.all(np.array(revs)))
p_rid = None
c_rid = None
nb_us = None
nb_ds = None
if cur_node.connectivity == 1:
# c1 nodes are actually blocked nodes where a pathway ends
rid = rids[0]
if rid in cur_node.consuming_rids:
c_rid = rid
nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products)
else:
p_rid = rid
nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants)
# in case of a reversible reaction, configure consuming path
if (rev is True) and (c_rid is None):
c_rid = p_rid
nb_ds = nb_us
p_rid = None
nb_us = None
if cur_node.connectivity == 2:
if rev is True:
# process a node with reversible fluxes on both sides
# one being considered as producing, one as consuming reaction
nb_sids = [None, None]
for idx, rid in enumerate(rids):
if rid in cur_node.consuming_rids:
nb_sids[idx] = self.get_neighbor_node(self.ng.edges[rid].products)
else:
nb_sids[idx] = self.get_neighbor_node(self.ng.edges[rid].reactants)
# if one of the reversible reactions connects to network, let it be the producing side
if nb_sids[0] is None:
p_rid, c_rid = rids
nb_us, nb_ds = nb_sids
else:
c_rid, p_rid = rids
nb_ds, nb_us = nb_sids
# get neighbors of a node in a directional pathway segment
else:
# identify one directional reaction connecting the node
if revs[0] is False:
drid, orid = rids
else:
orid, drid = rids
# check if the selected directional reaction is a consuming reaction
# the other reaction (directional or reversible) must then be the producing reaction
if drid in cur_node.consuming_rids:
c_rid = drid
p_rid = orid
nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products)
if p_rid in cur_node.producing_rids:
nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants)
# in case the 2nd reaction is also a consuming reaction, it must be reversible
else:
assert cur_node.consuming_rids[p_rid] is True
nb_us = self.get_neighbor_node(self.ng.edges[p_rid].products)
# case, where the selected directional reaction is a producing reaction
else:
p_rid = drid
c_rid = orid
nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants)
if c_rid in cur_node.consuming_rids:
nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products)
else:
assert cur_node.producing_rids[c_rid] is True
nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].reactants)
return {'sid': init_sid, 'p_rid': p_rid, 'nb_us': nb_us,
'c_rid': c_rid, 'nb_ds': nb_ds, 'reversible': rev}
def get_ds_neighbor(self, us_pwy_seg):
"""get downstream neighbor at us_sid coming from p_rid.
Only for c1 and c2 nodes
c1 node: this is a terminal node (blocked node),
as only exising reaction is consumed by p_rid
c2 node: downstream is either a consuming reaction or
a reversible producing reaction
downstream node can be another c1/c2 node, or the network connection
reversible flag is set according to flow through this node
special case: remaining reaction is a directional producing reaction,
(while an actual consuming reaction is required). There is a problem, unless
the upstream pathway is reversible (leading to a directional pathway
in opposite direction)
"""
sid = us_pwy_seg.nb_ds
cur_node = self.ng.nodes[sid]
revs = [rev for rev in (cur_node.consuming_rids | cur_node.producing_rids).values()]
rev = bool(np.all(np.array(revs)))
c_rid = None
nb_ds = None
if cur_node.connectivity == 2:
p_rid = us_pwy_seg.c_rid
c_rid = [rid for rid in (cur_node.consuming_rids | cur_node.producing_rids) if rid != p_rid][0]
if c_rid in cur_node.consuming_rids:
nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products)
else:
nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].reactants)
if cur_node.producing_rids[c_rid] is False:
# flow reversal in case the producing reaction is directional
return {'sid': sid, 'p_rid': c_rid, 'nb_us': nb_ds, 'reversible': rev}
return {'sid': sid, 'c_rid': c_rid, 'nb_ds': nb_ds, 'reversible': rev}
def get_us_neighbor(self, ds_pwy_seg):
sid = ds_pwy_seg.nb_us
cur_node = self.ng.nodes[sid]
revs = [rev for rev in (cur_node.consuming_rids | cur_node.producing_rids).values()]
rev = bool(np.all(np.array(revs)))
p_rid = None
nb_us = None
if cur_node.connectivity == 2:
c_rid = ds_pwy_seg.p_rid
p_rid = [rid for rid in (cur_node.consuming_rids | cur_node.producing_rids) if rid != c_rid][0]
if p_rid in cur_node.producing_rids:
nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants)
else:
nb_us = self.get_neighbor_node(self.ng.edges[p_rid].products)
if cur_node.consuming_rids[p_rid] is False:
# flow reversal in case the consuming reaction is directional
return {'sid': sid, 'c_rid': p_rid, 'nb_ds': nb_us, 'reversible': rev}
return {'sid': sid, 'p_rid': p_rid, 'nb_us': nb_us, 'reversible': rev}
def get_reactions(self):
rids = set()
for pwy_seg in self.segments:
rids |= pwy_seg.get_reactions()
return rids
"""
Peter Schubert, October 2022
"""
from modelpruner.graph.linear_pathway import LinearPathway
class LinearPathways:
def __init__(self, network_graph, biomass_rid=None):
"""
Construct linear pathways from metabolites with a connectivity of 2
C1 metabolites (with connectivity of 1) are also included
Each such C1/C2 metabolite will only be consumed in once pathway
"""
self.ng = network_graph
if biomass_rid in self.ng.edges:
biomass_sids = self.ng.edges[biomass_rid].reactants | self.ng.edges[biomass_rid].products
else:
biomass_sids = set()
self.c2_sids = {node.id: None for node in self.ng.nodes.values()
if node.id not in biomass_sids and node.is_c2_node()}
self.pwy_rids = {}
self.pathways = []
pwy_idx = 0
for sid in sorted(self.c2_sids):
if self.c2_sids[sid] is None:
balance_c2_sids = {sid for sid, idx in self.c2_sids.items() if idx is None}
lin_pwy = LinearPathway(sid, balance_c2_sids, network_graph)
self.pathways.append(lin_pwy)
for subs_sid in self.pwy_substrates(lin_pwy):
if (subs_sid in self.c2_sids) and (self.c2_sids[subs_sid] is None):
self.c2_sids[subs_sid] = pwy_idx
for rid in lin_pwy.get_reactions():
self.pwy_rids[rid] = pwy_idx
pwy_idx += 1
def pwy_substrates(self, lin_pwy):
sids = set()
for pwy_seg in lin_pwy.segments:
sids |= set(self.ng.edges[pwy_seg.p_rid].reactants)
sids |= set(self.ng.edges[pwy_seg.p_rid].products)
sids |= set(self.ng.edges[pwy_seg.c_rid].reactants)
sids |= set(self.ng.edges[pwy_seg.c_rid].products)
return sids
import numpy as np
class MetaboliteNode:
def __init__(self, sid):
self.id = sid
self.producing_rids = {}
self.consuming_rids = {}
self.connectivity = 0
def add_producing_rid(self, rid, reversible):
self.producing_rids[rid] = reversible
self.connectivity = len(self.producing_rids) + len(self.consuming_rids)
def add_consuming_rid(self, rid, reversible):
self.consuming_rids[rid] = reversible
self.connectivity = len(self.producing_rids) + len(self.consuming_rids)
def is_c2_node(self):
if self.connectivity == 2:
if len(self.producing_rids) == 1 and len(self.consuming_rids) == 1:
return True
if len(self.producing_rids) == 2:
if bool(np.any(np.array([val for val in self.producing_rids.values()]))) is True:
return True
if len(self.consuming_rids) == 2:
if bool(np.any(np.array([val for val in self.consuming_rids.values()]))) is True:
return True
elif self.connectivity == 1: # blocked metabolite
return True
else:
return False
"""
Peter Schubert, October 2022
"""
from modelpruner.graph.metabolite_node import MetaboliteNode
from modelpruner.graph.reaction_edge import ReactionEdge
class NetworkGraph:
def __init__(self, fba_model):
# !! fba_model.reversible contains numpy bools, here we stick to Python bools
self.nodes = {sid: MetaboliteNode(sid) for sid in fba_model.sids}
self.edges = {rid: ReactionEdge(rid, bool(rev))
for (rid, rev) in zip(fba_model.rids, fba_model.reversible)}
self.connect_nodes(fba_model)
def connect_nodes(self, fba_model):
for (row, col, stoic) in zip(fba_model.s_mat_coo.row,
fba_model.s_mat_coo.col,
fba_model.s_mat_coo.data):
sid = fba_model.sids[row]
rid = fba_model.rids[col]
reversible = bool(fba_model.reversible[col])
if stoic > 0:
self.edges[rid].add_product(sid)
self.nodes[sid].add_producing_rid(rid, reversible)
else:
self.edges[rid].add_reactant(sid)
self.nodes[sid].add_consuming_rid(rid, reversible)
"""
Peter Schubert, October 2022
"""
class PathwaySegment:
def __init__(self, sid):
self.sid = sid
self.reversible = False
self.p_rid = None
self.nb_us = None
self.c_rid = None
self.nb_ds = None
def set_upstream(self, rid, sid):
self.p_rid = rid
self.nb_us = sid
def set_downstream(self, rid, sid):
self.c_rid = rid
self.nb_ds = sid
def set_reversible(self, reversible):
"""Set reversible flag of pathway segment.
Pathway segment can be reversible if both connecting reactions are reversible
if at least on reaction is directional the segment is also directional.
Pathway segment also made directional, if any other segment in a linear
pathway is directional
:param reversible: flag (True/False)
:type reversible: bool
"""
self.reversible = reversible
def reverse_direction(self):
temp_rid = self.p_rid
temp_sid = self.nb_us
self.p_rid = self.c_rid
self.nb_us = self.nb_ds
self.c_rid = temp_rid
self.nb_ds = temp_sid
def get_reactions(self):
"""Get the reactions connecting the pathway segments
:return: reaction ids connected to segment
:rtype: set of str
"""
rids = set()
if self.p_rid is not None:
rids.add(self.p_rid)
if self.c_rid is not None:
rids.add(self.c_rid)
return rids
class ReactionEdge:
def __init__(self, rid, reversible):
self.id = rid
self.reversible = reversible
self.reactants = set()
self.products = set()
def add_product(self, sid):
self.products.add(sid)
def add_reactant(self, sid):
self.reactants.add(sid)
......@@ -425,6 +425,29 @@ class FbaModel:
feasible = False
return feasible
def check_pfs_feasibility(self, pfs, candidate_rid=None):
"""check that current model satisfies all protected functions
:param pfs: all protected functions (objective)
:type pfs: dict of class ProtectedFunction
:param candidate_rid: reaction id of candiated checked for removal
:type candidate_rid: str, optional (default: None)
:return: status if protected function satisfied
:rtype: bool
"""
for pf in pfs.values():
res = self.fba_pf_optimize(pf, candidate_rid)
if res['success'] is False:
return False
# for successful optimization, check that optimum is acceptable
elif pf.obj_dir == 'maximize':
if res['fun'] < pf.target_lb:
return False
else:
if res['fun'] > pf.target_ub:
return False
return True
def fva_pf_flux_ranges(self, pf, rids):
"""FVA for a specific protected function for all or selected reactions
......@@ -439,8 +462,10 @@ class FbaModel:
:return: rid, min and max flux values
:rtype: dict
"""
# TODO: can we remove 'check that problem is feasible'
# check that problem is feasible
if self.check_pf_feasibility(pf) is False:
if self.check_pfs_feasibility({1: pf}) is False:
return {'success': False}
# implement objective as a constraint and overwrite bounds
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment