diff --git a/modelpruner/__init__.py b/modelpruner/__init__.py index 4a33acb1b45dbc9933af7811639d21649593edb1..c944f1ec33a04482f1c8448e0f120d21d05afcfb 100644 --- a/modelpruner/__init__.py +++ b/modelpruner/__init__.py @@ -1,12 +1,15 @@ from modelpruner._version import __version__ from modelpruner.core.model_pruner import ModelPruner from modelpruner.core.protected_parts import ProtectedParts +from modelpruner.graph.network_graph import NetworkGraph +from modelpruner.graph.linear_pathways import LinearPathways -from . import core, models, problems +from . import core, models, problems, graph __all__ = [] __all__ += core.__all__ __all__ += models.__all__ __all__ += problems.__all__ +__all__ += graph.__all__ __author__ = 'Peter Schubert' diff --git a/modelpruner/core/model_pruner.py b/modelpruner/core/model_pruner.py index f2905faf88e898f66860d5d6c2fbf05594359601..63c89d0855d0eb81ce74b2b35400b9762017519e 100644 --- a/modelpruner/core/model_pruner.py +++ b/modelpruner/core/model_pruner.py @@ -25,7 +25,7 @@ Peter Schubert, HHU Duesseldorf, September 2022 import sys import os import re -import time +# import time import math import numpy as np import pandas as pd @@ -113,42 +113,47 @@ class ModelPruner: if resume is True and os.path.exists(self._snapshot_sbml): sbml_fname = self._snapshot_sbml self.fba_model = FbaModel(sbml_fname) + self.pps = ProtectedParts(protected_parts) self.protected_sids = self.pps.protected_sids - # if resume option selected and snapshot protected rids exists, resume from there - # note: during pruning, essential rids are added to protected rids + self.protected_rids = self.get_total_protected_rids() + self.essential_rids = set() + + # if resume option selected and snapshot of essential rids exists, use them if resume is True and os.path.exists(self._snapshot_json): with open(self._snapshot_json, 'r') as f: snapshot_data = json.load(f) - self.protected_rids = set(snapshot_data['protected_rids']) + self.essential_rids = set(snapshot_data['essential_rids']) print(f'Snapshot restored at {len(self.fba_model.rids):4d} remaining reactions ' f'from: {self._snapshot_json}') - else: - self.protected_rids = self.pps.protected_rids - # self.protected_rids = self.get_total_protected_rids() # overwrite fba model bounds by general bounds of protected parts (prior to LpProblem instantiation) self.fba_model.update_model_flux_bounds(self.pps.overwrite_bounds) self.fba_model.update_model_reversible(self.get_pp_reversible_reactions()) - print('Determine optimal values for protected functions') - self.set_function_optimum() + print('Determine optimum values for protected functions:') + optimum_values = self.set_function_optimum() + for idx, val in optimum_values.items(): + notes = self.pps.protected_functions[idx].notes + print(f' protected function {idx}, optimum: {val:.04f} ({notes})') + print('Check consistency of protected parts') - self.check_protected_parts() + blocked_p_rids = self.check_protected_parts() + self.active_protected_rids = self.protected_rids.difference(blocked_p_rids) def get_total_protected_rids(self): - """Determine protected reactions, incl. those leading to protected metabolites. + """Determine protected reactions, incl. reactions connecting protected metabolites. :return: total protected reaction ids :rtype: set of str - """ # identify reactions that need protection due to protected metabolites - # over and above the protected rids provided in protected parts - sid_mask = [self.fba_model.sid2idx[sid] for sid in self.protected_sids] - psf_mask = self.fba_model.s_mat_coo.tocsr()[sid_mask].getnnz(axis=0) + sid_mask = [self.fba_model.sid2idx[sid] for sid in self.protected_sids + if sid in self.fba_model.sid2idx] + psf_mask = self.fba_model.s_mat_coo.tocsr()[sid_mask].getnnz(axis=0) > 0 protected_sid_rids = set(self.fba_model.rids[psf_mask]) + protected_rids = self.pps.protected_rids.union(protected_sid_rids) return protected_rids @@ -160,32 +165,44 @@ class ModelPruner: """ rev_pp_rids = set() for rid, bounds in self.pps.overwrite_bounds.items(): - for idx in [0, 1]: - if math.isfinite(bounds[idx]) and bounds[idx] < 0.0: - rev_pp_rids.add(rid) - for pf in self.pps.protected_functions.values(): - for rid, bounds in pf.overwrite_bounds.items(): + if rid in self.fba_model.rids: for idx in [0, 1]: if math.isfinite(bounds[idx]) and bounds[idx] < 0.0: rev_pp_rids.add(rid) + for pf in self.pps.protected_functions.values(): + for rid, bounds in pf.overwrite_bounds.items(): + if rid in self.fba_model.rids: + for idx in [0, 1]: + if math.isfinite(bounds[idx]) and bounds[idx] < 0.0: + rev_pp_rids.add(rid) return rev_pp_rids def set_function_optimum(self): - """using FBA to set optimal value for protected functions""" - for pf in self.pps.protected_functions.values(): + """using FBA to set optimal value for protected functions + + :return: optimal values for protected functions + :rtype: dict + """ + optimum_values = {} + for key, pf in self.pps.protected_functions.items(): res = self.fba_model.fba_pf_optimize(pf) pf.optimal = res['fun'] + optimum_values[key] = res['fun'] pf.fba_success = res['success'] if hasattr(pf, 'target_frac'): if pf.obj_dir == 'maximize': pf.set_target_lb(pf.target_frac * pf.optimal) else: pf.set_target_ub(pf.target_frac * pf.optimal) + return optimum_values def check_protected_parts(self): - """Report on consistency issues wrt to protected parts. + """Check consistency wrt to protected parts. Return blocked protected rids - E.g. report on reactions and metabolties that do not exist in the model + E.g. report on reactions and metabolites that do not exist in the model + + :return: protected reaction ids that are blocked (zero flux across conditions) + :rtype: set of str """ extra_p_rids = self.protected_rids.difference(set(self.fba_model.rids)) extra_p_sids = self.protected_sids.difference(set(self.fba_model.sids)) @@ -194,27 +211,28 @@ class ModelPruner: pf_rids |= {rid for rid in pf.objective} pf_rids |= {rid for rid in pf.overwrite_bounds} if pf.fba_success is False: - print(f'Protected function {pf.obj_id} has unsuccessful FBA ' + print(f' Protected function {pf.obj_id} has unsuccessful FBA ' f' with optimal value {pf.optimal}') extra_pf_rids = pf_rids.difference(set(self.fba_model.rids)) pf_unprotected_rids = pf_rids.difference(set(self.protected_rids)) if len(extra_p_rids) > 0: - print('Protected reactions that do not exist in model:', extra_p_rids) + print(' Protected reactions that do not exist in model:', extra_p_rids) if len(extra_p_sids) > 0: - print('Protected metabolites that do not exist in model:', extra_p_sids) + print(' Protected metabolites that do not exist in model:', extra_p_sids) if len(extra_pf_rids) > 0: - print('Reactions in protected functions that do not exist in model:', extra_pf_rids) + print(' Reactions in protected functions that do not exist in model:', extra_pf_rids) if len(pf_unprotected_rids) > 0: - print('Reactions in protected functions that are not protected:', pf_unprotected_rids) + print(' Reactions in protected functions that are not protected:', pf_unprotected_rids) blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids'] - blocked_p_rids = self.pps.protected_sids.intersection(blocked_rids) + blocked_p_rids = self.protected_rids.intersection(blocked_rids) blocked_pf_rids = pf_rids.intersection(blocked_rids) if len(blocked_p_rids) > 0: - print('Protected reactions that are blocked (no flux):', blocked_p_rids) + print(' Protected reactions that are blocked (no flux):', blocked_p_rids) if len(blocked_pf_rids) > 0: - print('Reactions in protected functions that are blocked (no flux):', blocked_pf_rids) + print(' Reactions in protected functions that are blocked (no flux):', blocked_pf_rids) + return blocked_p_rids def identify_parallel_reactions(self): """Identify parallel reactions having same stoichiometry (considering reverse). @@ -243,6 +261,30 @@ class ModelPruner: drop_rids |= drop_group_rids return drop_rids + def is_protected_rids_blocked(self, candidate_rid): + """check if active protected reactions get blocked by removal of specified reaction. + + while removing a candidate reactions we want to ensure that active protected + reactions (active reactions in initial model) stay active in pruned model + + :param candidate_rid: reaction candidate to remove + :type candidate_rid: str + :return: is any of the protected reactions blocked + :rtype: bool + """ + # set flux bounds of candidate reaction to zero + self.fba_model.update_lp_flux_bounds({candidate_rid: np.array([0.0, 0.0])}) + + # execute FVA across active protected rids + reaction_types = self.reaction_types_fva(list(self.active_protected_rids)) + blocked_rids = reaction_types['blocked_rids'] + + # rest flux bounds for candidate reaction + self.fba_model.reset_lp_flux_bounds({candidate_rid}) + + is_blocked = True if len(blocked_rids) > 0 else False + return is_blocked + def reaction_types_fva(self, free_rids): """Idenify reaction types using FVA applied to all protected functions @@ -253,7 +295,7 @@ class ModelPruner: :return: rid, min and max flux values :rtype: dict """ - print(time.strftime("%H:%M:%S", time.localtime())) + # print(time.strftime("%H:%M:%S", time.localtime())) n_pf = len(self.pps.protected_functions) flux_min_pfs = np.zeros((n_pf, len(free_rids))) @@ -343,7 +385,6 @@ class ModelPruner: reaction_types = {'blocked_rids': blocked_rids, 'essential_rids': essential_rids, 'candidate_rids_sorted': candidate_rids_sorted} - return reaction_types def prune(self, snapshot_freq=0): @@ -368,49 +409,60 @@ class ModelPruner: print(f'Initial S-matrix: {self.fba_model.s_mat_coo.shape}; ' f' protected reactions: {len(self.protected_rids)}:', self.protected_rids) + + # remove any parallel reactions from the model drop_rids = self.identify_parallel_reactions().difference(self.protected_rids) if len(drop_rids) > 0: self.fba_model.remove_reactions(drop_rids) print(f'{len(drop_rids)} parallel reaction(s) dropped:', drop_rids) - feasible = np.array([self.fba_model.check_pf_feasibility(pf) - for pf in self.pps.protected_functions.values()]) - print('Protected functions feasibility:', np.all(feasible)) + feasible = self.fba_model.check_pfs_feasibility(self.pps.protected_functions) + print('Protected functions feasibility:', feasible) next_snapshot = 0 if snapshot_freq > 0: next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq) - while len(self.protected_rids) < self.fba_model.s_mat_coo.shape[1]: - free_rids = list(set(self.fba_model.rids).difference(self.protected_rids)) + # main iterration loop to retain and remove reactions from the fba model + retained_rids = self.protected_rids | self.essential_rids + while len(retained_rids) < self.fba_model.s_mat_coo.shape[1]: + free_rids = list(set(self.fba_model.rids).difference(retained_rids)) reaction_types = self.reaction_types_fva(free_rids) print() + new_essential_rids = reaction_types['essential_rids'] blocked_rids = reaction_types['blocked_rids'] - essential_rids = reaction_types['essential_rids'] candidate_rids = reaction_types['candidate_rids_sorted'] + if len(blocked_rids) > 0: self.fba_model.remove_reactions(blocked_rids) print(f"{len(blocked_rids)} blocked reaction(s) removed:", blocked_rids) + print(f"{len(candidate_rids)} candidates:", candidate_rids[:4], '...') # try to remove one reaction while still satisfying all protected functions - for rid in candidate_rids: - # print('Check feasibility of', rid) - feasible = True - for pf in self.pps.protected_functions.values(): - feasible = self.fba_model.check_pf_feasibility(pf, zero_flux_rid=rid) - # print(f'Feasibility of {rid} is {feasible}') - if feasible is False: - essential_rids.add(rid) - break - if feasible is True: - self.fba_model.remove_reactions({rid}) - print(f'{rid} removed from the model') + for candidate_rid in candidate_rids: + # print('Check feasibility of', candidate_rid) + feasible = self.fba_model.check_pfs_feasibility(self.pps.protected_functions, + candidate_rid=candidate_rid) + if feasible is False: + new_essential_rids.add(candidate_rid) + continue + else: + # check that active_protected_rids are not getting blocked + if len(self.active_protected_rids) > 0: + is_blocked = self.is_protected_rids_blocked(candidate_rid) + if is_blocked is True: + new_essential_rids.add(candidate_rid) + continue + self.fba_model.remove_reactions({candidate_rid}) + print(f' {candidate_rid} removed from the model') break - if len(essential_rids) > 0: - self.protected_rids |= essential_rids - print(f"{len(essential_rids)} reaction(s) added to protected reactions:", essential_rids) + if len(new_essential_rids) > 0: + self.essential_rids |= new_essential_rids + retained_rids |= new_essential_rids + print(f"{len(new_essential_rids)} reaction(s) added to essential reactions:", + new_essential_rids) if feasible is False: print('No more reactions to remove') @@ -418,14 +470,14 @@ class ModelPruner: # store intermediate results (snapshots) if len(self.fba_model.rids) < next_snapshot: self.export_pruned_model(self._snapshot_sbml) - snapshot_data = {'protected_rids': list(self.protected_rids)} + snapshot_data = {'essential_rids': list(self.essential_rids)} with open(self._snapshot_json, 'w') as f: json.dump(snapshot_data, f) next_snapshot = max(0, len(self.fba_model.rids) - snapshot_freq) print('snapshot data saved.') print(f'S-matrix: {self.fba_model.s_mat_coo.shape}; ' - f'protected reactions: {len(self.protected_rids)}') + f'retained reactions: {len(retained_rids)}') # check if blocked reactions exist in the finally pruned model # blocked_rids = self.reaction_types_fva(self.fba_model.rids)['blocked_rids'] @@ -449,6 +501,7 @@ class ModelPruner: """ status = self.fba_model.get_model_status() status['n_protected_rids'] = len(self.protected_rids) + status['n_essential_rids'] = len(self.essential_rids) status['n_protected_sids'] = len(self.protected_sids) return status @@ -469,9 +522,9 @@ class ModelPruner: :param pruned_sbml: pathname of pruned network, ending with '.xml' :type pruned_sbml: str, optional (default: None, construct a name) """ - n_s, n_r = self.fba_model.s_mat_coo.shape + n_sids, n_rids = self.fba_model.s_mat_coo.shape if pruned_sbml is None: - pruned_sbml = re.sub(r'.xml$', f'_pruned_{n_s}x{n_r}.xml', self.full_sbml_fname) + pruned_sbml = re.sub(r'.xml$', f'_pruned_{n_sids}x{n_rids}.xml', self.full_sbml_fname) # overwrite model objective with objective of first protected function first_pf = list(self.pps.protected_functions.values())[0] @@ -483,7 +536,8 @@ class ModelPruner: success = self.fba_model.export_pruned_model(pruned_sbml, df_fbc_objectives) if success is True: print('Pruned model exported to', pruned_sbml) - if len(self.protected_rids) == n_r: + retained_rids = self.protected_rids | self.essential_rids + if len(retained_rids) == n_rids: self.remove_snapshot_files() else: print('Pruned model could not be exported') diff --git a/modelpruner/core/protected_parts.py b/modelpruner/core/protected_parts.py index d77aeae8eafedebf1a7f4e83857eb826be320730..08faa45e058c2047bcddc38f279eb4178da9df09 100644 --- a/modelpruner/core/protected_parts.py +++ b/modelpruner/core/protected_parts.py @@ -77,6 +77,7 @@ class ProtectedFunction: self.obj_id = df_pf.index[0] self.objective = {} self.overwrite_bounds = {} + self.notes = '' for _, row in df_pf.iterrows(): if len(row['fluxObjectives']) > 0: self.obj_dir = row['type'] @@ -91,9 +92,9 @@ class ProtectedFunction: self.target_ub = row['fbcUb'] if len(row.reaction) > 0: self.overwrite_bounds[row['reaction']] = [row['fbcLb'], row['fbcUb']] - if hasattr(row, 'notes'): - if type(row['notes']) == str and len(row['notes']) > 0: - self.notes = row['notes'] + if hasattr(df_pf.iloc[0], 'notes'): + if type(df_pf.iloc[0]['notes']) == str and len(df_pf.iloc[0]['notes']) > 0: + self.notes = df_pf.iloc[0]['notes'] def set_target_lb(self, lb): self.target_lb = lb diff --git a/modelpruner/graph/__init__.py b/modelpruner/graph/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..edad2849a8602a2f5a39a744c2c7c9b22e4a8d71 --- /dev/null +++ b/modelpruner/graph/__init__.py @@ -0,0 +1,9 @@ +from .reaction_edge import ReactionEdge +from .metabolite_node import MetaboliteNode +from .network_graph import NetworkGraph +from .pathway_segment import PathwaySegment +from .linear_pathway import LinearPathway + +# TODO: check, which classes to expose in __all__ +__all__ = ['ReactionEdge', 'MetaboliteNode', 'PathwaySegment', 'LinearPathway'] + diff --git a/modelpruner/graph/linear_pathway.py b/modelpruner/graph/linear_pathway.py new file mode 100644 index 0000000000000000000000000000000000000000..5bba14acc3455194a2971d8c0298c44e2b78661e --- /dev/null +++ b/modelpruner/graph/linear_pathway.py @@ -0,0 +1,317 @@ +""" + +Peter Schubert, October 2022 +""" + + +import numpy as np + +from modelpruner.graph.pathway_segment import PathwaySegment + + +class LinearPathway: + + def __init__(self, init_sid, balance_c2_sids, network_graph): + """Instantiate and create a Linear pathway from an initial pathway node. + + init_sid: initial pathway node + ng: Class Network Graph to work on + """ + + self.init_sid = init_sid + self.balance_c2_sids = set(balance_c2_sids) + self.ng = network_graph + + self.segments = [self.get_initial_segment()] + self.add_downstream_segments() + self.add_upstream_segments() + + def get_initial_segment(self): + """get initial pathway segment for inital node. + + Node with upstream and downstream connectivity, + from where connect subsequently other downstream + and upstream pathway segments + + :return: inital pathway segment + :rtype: PathwaySegment + """ + self.balance_c2_sids.remove(self.init_sid) + + connectivity = self.get_us_ds_neighbors(self.init_sid) + pwy_seg = PathwaySegment(connectivity['sid']) + pwy_seg.set_upstream(connectivity['p_rid'], connectivity['nb_us']) + pwy_seg.set_downstream(connectivity['c_rid'], connectivity['nb_ds']) + pwy_seg.set_reversible(connectivity['reversible']) + return pwy_seg + + def add_downstream_segments(self): + """add downstream segments """ + + lcount = 1 + upstream_seg = self.segments[-1] + while upstream_seg.nb_ds is not None and lcount < 50: + lcount += 1 + pwy_rev = upstream_seg.reversible + self.balance_c2_sids.remove(upstream_seg.nb_ds) + connectivity = self.get_ds_neighbor(upstream_seg) + + if 'c_rid' in connectivity: + pwy_seg = PathwaySegment(connectivity['sid']) + pwy_seg.set_upstream(upstream_seg.c_rid, upstream_seg.sid) + pwy_seg.set_downstream(connectivity['c_rid'], connectivity['nb_ds']) + pwy_seg.set_reversible(pwy_rev and connectivity['reversible']) + self.segments.append(pwy_seg) + + if connectivity['nb_ds'] is not None: + # treat a reversible pathway that becomes directional (i.e. continue ds) + if (pwy_rev is True) and (connectivity['reversible'] is False): + for pwy_seg in self.segments: + pwy_seg.set_reversible(False) + # in other cases, just continue ds + # cases of path termination + else: + # pathway termination of a reversible pathway (reverse pwy_segs and continue ds) + if (pwy_rev is True) and (connectivity['reversible'] is True): + for pwy_seg in self.segments: + pwy_seg.set_reversible(False) + pwy_seg.reverse_direction() + self.segments.reverse() + # in other cases, the path termination terminates the while loop + + # i.e. no consuming node, but only one producing node + # we reverse tha pwy_segments and their direction and continue ds + elif pwy_rev is True: + assert 'p_rid' in connectivity + assert connectivity['reversible'] is False + pwy_seg = PathwaySegment(connectivity['sid']) + pwy_seg.set_upstream(upstream_seg.c_rid, upstream_seg.sid) + pwy_seg.set_downstream(connectivity['p_rid'], connectivity['nb_us']) + self.segments.append(pwy_seg) + for pwy_seg in self.segments: + pwy_seg.set_reversible(False) + pwy_seg.reverse_direction() + self.segments.reverse() + else: + # conditions when directional pathway has producing reaction + print(f'inconsistent configuration, starting at {self.init_sid}') + break + upstream_seg = self.segments[-1] + + def add_upstream_segments(self): + + lcount = 1 + downstream_seg = self.segments[0] + while downstream_seg.nb_us in self.balance_c2_sids and lcount < 50: + lcount += 1 + pwy_rev = downstream_seg.reversible + self.balance_c2_sids.remove(downstream_seg.nb_us) + connectivity = self.get_us_neighbor(downstream_seg) + + if 'p_rid' in connectivity: + pwy_seg = PathwaySegment(connectivity['sid']) + pwy_seg.set_downstream(downstream_seg.p_rid, downstream_seg.sid) + pwy_seg.set_upstream(connectivity['p_rid'], connectivity['nb_us']) + pwy_seg.set_reversible(pwy_rev and connectivity['reversible']) + self.segments.insert(0, pwy_seg) + if (connectivity['reversible'] is False) and (pwy_rev is True): + for pwy_seg in self.segments: + pwy_seg.set_reversible(False) + elif pwy_rev is True: + assert 'c_rid' in connectivity + assert connectivity['reversible'] is False + pwy_seg = PathwaySegment(connectivity['sid']) + pwy_seg.set_downstream(downstream_seg.c_rid, downstream_seg.sid) + pwy_seg.set_upstream(connectivity['c_rid'], connectivity['nb_ds']) + self.segments.insert(0, pwy_seg) + for pwy_seg in self.segments: + pwy_seg.reverse_direction() + self.segments.reverse() + else: + print(f'inconsistent configuration, starting at {self.init_sid}') + break + downstream_seg = self.segments[0] + + def get_neighbor_node(self, substrate_sids): + """Get a c2 node from reactants/product sids + + If none of the reactant/products is a c2 node, + return None (which means, connection to network). + + In case of several c2 nodes, return the first node + after sorting. + + :param substrate_sids: metabolite ids (reactants or products) + :type substrate_sids: list-like of str + :return: metabolite id (sid) for neighbor node + :rtype: str or None + """ + nbs = substrate_sids.intersection(self.balance_c2_sids) + if len(nbs) == 0: + nb = None + else: + nb = sorted(nbs)[0] + return nb + + def get_us_ds_neighbors(self, init_sid): + """Get upstream and downstream for an initial c2 node. + + :param init_sid: initial c2 node id + :type init_sid: str + :return: node id, upstream/downstream c2 nodes, connecting reactions + :rtype: dict + """ + cur_node = self.ng.nodes[init_sid] + + rids = [rid for rid in (cur_node.consuming_rids | cur_node.producing_rids)] + revs = [rev for rev in (cur_node.consuming_rids | cur_node.producing_rids).values()] + rev = bool(np.all(np.array(revs))) + p_rid = None + c_rid = None + nb_us = None + nb_ds = None + + if cur_node.connectivity == 1: + # c1 nodes are actually blocked nodes where a pathway ends + rid = rids[0] + + if rid in cur_node.consuming_rids: + c_rid = rid + nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products) + else: + p_rid = rid + nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants) + + # in case of a reversible reaction, configure consuming path + if (rev is True) and (c_rid is None): + c_rid = p_rid + nb_ds = nb_us + p_rid = None + nb_us = None + + if cur_node.connectivity == 2: + if rev is True: + # process a node with reversible fluxes on both sides + # one being considered as producing, one as consuming reaction + nb_sids = [None, None] + for idx, rid in enumerate(rids): + if rid in cur_node.consuming_rids: + nb_sids[idx] = self.get_neighbor_node(self.ng.edges[rid].products) + else: + nb_sids[idx] = self.get_neighbor_node(self.ng.edges[rid].reactants) + + # if one of the reversible reactions connects to network, let it be the producing side + if nb_sids[0] is None: + p_rid, c_rid = rids + nb_us, nb_ds = nb_sids + else: + c_rid, p_rid = rids + nb_ds, nb_us = nb_sids + + # get neighbors of a node in a directional pathway segment + else: + # identify one directional reaction connecting the node + if revs[0] is False: + drid, orid = rids + else: + orid, drid = rids + + # check if the selected directional reaction is a consuming reaction + # the other reaction (directional or reversible) must then be the producing reaction + if drid in cur_node.consuming_rids: + c_rid = drid + p_rid = orid + nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products) + + if p_rid in cur_node.producing_rids: + nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants) + # in case the 2nd reaction is also a consuming reaction, it must be reversible + else: + assert cur_node.consuming_rids[p_rid] is True + nb_us = self.get_neighbor_node(self.ng.edges[p_rid].products) + + # case, where the selected directional reaction is a producing reaction + else: + p_rid = drid + c_rid = orid + nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants) + + if c_rid in cur_node.consuming_rids: + nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products) + else: + assert cur_node.producing_rids[c_rid] is True + nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].reactants) + + return {'sid': init_sid, 'p_rid': p_rid, 'nb_us': nb_us, + 'c_rid': c_rid, 'nb_ds': nb_ds, 'reversible': rev} + + def get_ds_neighbor(self, us_pwy_seg): + """get downstream neighbor at us_sid coming from p_rid. + + Only for c1 and c2 nodes + + c1 node: this is a terminal node (blocked node), + as only exising reaction is consumed by p_rid + + c2 node: downstream is either a consuming reaction or + a reversible producing reaction + downstream node can be another c1/c2 node, or the network connection + reversible flag is set according to flow through this node + + special case: remaining reaction is a directional producing reaction, + (while an actual consuming reaction is required). There is a problem, unless + the upstream pathway is reversible (leading to a directional pathway + in opposite direction) + """ + sid = us_pwy_seg.nb_ds + cur_node = self.ng.nodes[sid] + revs = [rev for rev in (cur_node.consuming_rids | cur_node.producing_rids).values()] + rev = bool(np.all(np.array(revs))) + c_rid = None + nb_ds = None + + if cur_node.connectivity == 2: + + p_rid = us_pwy_seg.c_rid + c_rid = [rid for rid in (cur_node.consuming_rids | cur_node.producing_rids) if rid != p_rid][0] + + if c_rid in cur_node.consuming_rids: + nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].products) + else: + nb_ds = self.get_neighbor_node(self.ng.edges[c_rid].reactants) + if cur_node.producing_rids[c_rid] is False: + # flow reversal in case the producing reaction is directional + return {'sid': sid, 'p_rid': c_rid, 'nb_us': nb_ds, 'reversible': rev} + + return {'sid': sid, 'c_rid': c_rid, 'nb_ds': nb_ds, 'reversible': rev} + + def get_us_neighbor(self, ds_pwy_seg): + + sid = ds_pwy_seg.nb_us + cur_node = self.ng.nodes[sid] + revs = [rev for rev in (cur_node.consuming_rids | cur_node.producing_rids).values()] + rev = bool(np.all(np.array(revs))) + p_rid = None + nb_us = None + + if cur_node.connectivity == 2: + + c_rid = ds_pwy_seg.p_rid + p_rid = [rid for rid in (cur_node.consuming_rids | cur_node.producing_rids) if rid != c_rid][0] + + if p_rid in cur_node.producing_rids: + nb_us = self.get_neighbor_node(self.ng.edges[p_rid].reactants) + else: + nb_us = self.get_neighbor_node(self.ng.edges[p_rid].products) + if cur_node.consuming_rids[p_rid] is False: + # flow reversal in case the consuming reaction is directional + return {'sid': sid, 'c_rid': p_rid, 'nb_ds': nb_us, 'reversible': rev} + + return {'sid': sid, 'p_rid': p_rid, 'nb_us': nb_us, 'reversible': rev} + + def get_reactions(self): + + rids = set() + for pwy_seg in self.segments: + rids |= pwy_seg.get_reactions() + return rids diff --git a/modelpruner/graph/linear_pathways.py b/modelpruner/graph/linear_pathways.py new file mode 100644 index 0000000000000000000000000000000000000000..66eb7b84c65b4e22e952bd0f8198dcec7ca673ec --- /dev/null +++ b/modelpruner/graph/linear_pathways.py @@ -0,0 +1,52 @@ +""" + +Peter Schubert, October 2022 +""" + +from modelpruner.graph.linear_pathway import LinearPathway + + +class LinearPathways: + + def __init__(self, network_graph, biomass_rid=None): + """ + + Construct linear pathways from metabolites with a connectivity of 2 + + C1 metabolites (with connectivity of 1) are also included + Each such C1/C2 metabolite will only be consumed in once pathway + """ + + self.ng = network_graph + + if biomass_rid in self.ng.edges: + biomass_sids = self.ng.edges[biomass_rid].reactants | self.ng.edges[biomass_rid].products + else: + biomass_sids = set() + self.c2_sids = {node.id: None for node in self.ng.nodes.values() + if node.id not in biomass_sids and node.is_c2_node()} + self.pwy_rids = {} + self.pathways = [] + + pwy_idx = 0 + for sid in sorted(self.c2_sids): + if self.c2_sids[sid] is None: + balance_c2_sids = {sid for sid, idx in self.c2_sids.items() if idx is None} + lin_pwy = LinearPathway(sid, balance_c2_sids, network_graph) + self.pathways.append(lin_pwy) + + for subs_sid in self.pwy_substrates(lin_pwy): + if (subs_sid in self.c2_sids) and (self.c2_sids[subs_sid] is None): + self.c2_sids[subs_sid] = pwy_idx + for rid in lin_pwy.get_reactions(): + self.pwy_rids[rid] = pwy_idx + pwy_idx += 1 + + def pwy_substrates(self, lin_pwy): + sids = set() + for pwy_seg in lin_pwy.segments: + sids |= set(self.ng.edges[pwy_seg.p_rid].reactants) + sids |= set(self.ng.edges[pwy_seg.p_rid].products) + sids |= set(self.ng.edges[pwy_seg.c_rid].reactants) + sids |= set(self.ng.edges[pwy_seg.c_rid].products) + return sids diff --git a/modelpruner/graph/metabolite_node.py b/modelpruner/graph/metabolite_node.py new file mode 100644 index 0000000000000000000000000000000000000000..31db57394c1b6dfe09f133c5bec81e81a72fcb0d --- /dev/null +++ b/modelpruner/graph/metabolite_node.py @@ -0,0 +1,34 @@ + +import numpy as np + + +class MetaboliteNode: + + def __init__(self, sid): + self.id = sid + self.producing_rids = {} + self.consuming_rids = {} + self.connectivity = 0 + + def add_producing_rid(self, rid, reversible): + self.producing_rids[rid] = reversible + self.connectivity = len(self.producing_rids) + len(self.consuming_rids) + + def add_consuming_rid(self, rid, reversible): + self.consuming_rids[rid] = reversible + self.connectivity = len(self.producing_rids) + len(self.consuming_rids) + + def is_c2_node(self): + if self.connectivity == 2: + if len(self.producing_rids) == 1 and len(self.consuming_rids) == 1: + return True + if len(self.producing_rids) == 2: + if bool(np.any(np.array([val for val in self.producing_rids.values()]))) is True: + return True + if len(self.consuming_rids) == 2: + if bool(np.any(np.array([val for val in self.consuming_rids.values()]))) is True: + return True + elif self.connectivity == 1: # blocked metabolite + return True + else: + return False diff --git a/modelpruner/graph/network_graph.py b/modelpruner/graph/network_graph.py new file mode 100644 index 0000000000000000000000000000000000000000..5e2e0ba10491179f5ce5b577abd565080c95e9a7 --- /dev/null +++ b/modelpruner/graph/network_graph.py @@ -0,0 +1,32 @@ +""" + +Peter Schubert, October 2022 +""" + +from modelpruner.graph.metabolite_node import MetaboliteNode +from modelpruner.graph.reaction_edge import ReactionEdge + + +class NetworkGraph: + + def __init__(self, fba_model): + + # !! fba_model.reversible contains numpy bools, here we stick to Python bools + self.nodes = {sid: MetaboliteNode(sid) for sid in fba_model.sids} + self.edges = {rid: ReactionEdge(rid, bool(rev)) + for (rid, rev) in zip(fba_model.rids, fba_model.reversible)} + self.connect_nodes(fba_model) + + def connect_nodes(self, fba_model): + for (row, col, stoic) in zip(fba_model.s_mat_coo.row, + fba_model.s_mat_coo.col, + fba_model.s_mat_coo.data): + sid = fba_model.sids[row] + rid = fba_model.rids[col] + reversible = bool(fba_model.reversible[col]) + if stoic > 0: + self.edges[rid].add_product(sid) + self.nodes[sid].add_producing_rid(rid, reversible) + else: + self.edges[rid].add_reactant(sid) + self.nodes[sid].add_consuming_rid(rid, reversible) diff --git a/modelpruner/graph/pathway_segment.py b/modelpruner/graph/pathway_segment.py new file mode 100644 index 0000000000000000000000000000000000000000..34217f1ec17c003ec7eccb1b11f517ee89a47cf4 --- /dev/null +++ b/modelpruner/graph/pathway_segment.py @@ -0,0 +1,58 @@ +""" + + +Peter Schubert, October 2022 +""" + + +class PathwaySegment: + + def __init__(self, sid): + self.sid = sid + self.reversible = False + self.p_rid = None + self.nb_us = None + self.c_rid = None + self.nb_ds = None + + def set_upstream(self, rid, sid): + self.p_rid = rid + self.nb_us = sid + + def set_downstream(self, rid, sid): + self.c_rid = rid + self.nb_ds = sid + + def set_reversible(self, reversible): + """Set reversible flag of pathway segment. + + Pathway segment can be reversible if both connecting reactions are reversible + if at least on reaction is directional the segment is also directional. + Pathway segment also made directional, if any other segment in a linear + pathway is directional + + :param reversible: flag (True/False) + :type reversible: bool + """ + self.reversible = reversible + + def reverse_direction(self): + temp_rid = self.p_rid + temp_sid = self.nb_us + self.p_rid = self.c_rid + self.nb_us = self.nb_ds + self.c_rid = temp_rid + self.nb_ds = temp_sid + + def get_reactions(self): + """Get the reactions connecting the pathway segments + + :return: reaction ids connected to segment + :rtype: set of str + """ + rids = set() + if self.p_rid is not None: + rids.add(self.p_rid) + if self.c_rid is not None: + rids.add(self.c_rid) + return rids diff --git a/modelpruner/graph/reaction_edge.py b/modelpruner/graph/reaction_edge.py new file mode 100644 index 0000000000000000000000000000000000000000..eb235725a46c8fc3b4139c1ebeacdb5f9ce50de8 --- /dev/null +++ b/modelpruner/graph/reaction_edge.py @@ -0,0 +1,15 @@ + +class ReactionEdge: + + def __init__(self, rid, reversible): + + self.id = rid + self.reversible = reversible + self.reactants = set() + self.products = set() + + def add_product(self, sid): + self.products.add(sid) + + def add_reactant(self, sid): + self.reactants.add(sid) diff --git a/modelpruner/models/fba_model.py b/modelpruner/models/fba_model.py index 5dfbbf3e03aa2d0cd2c781d36b6e08fd5ffa0d88..be86128ee70e62e7fc760f1ea22d9d50a1c54e1e 100644 --- a/modelpruner/models/fba_model.py +++ b/modelpruner/models/fba_model.py @@ -425,6 +425,29 @@ class FbaModel: feasible = False return feasible + def check_pfs_feasibility(self, pfs, candidate_rid=None): + """check that current model satisfies all protected functions + + :param pfs: all protected functions (objective) + :type pfs: dict of class ProtectedFunction + :param candidate_rid: reaction id of candiated checked for removal + :type candidate_rid: str, optional (default: None) + :return: status if protected function satisfied + :rtype: bool + """ + for pf in pfs.values(): + res = self.fba_pf_optimize(pf, candidate_rid) + if res['success'] is False: + return False + # for successful optimization, check that optimum is acceptable + elif pf.obj_dir == 'maximize': + if res['fun'] < pf.target_lb: + return False + else: + if res['fun'] > pf.target_ub: + return False + return True + def fva_pf_flux_ranges(self, pf, rids): """FVA for a specific protected function for all or selected reactions @@ -439,8 +462,10 @@ class FbaModel: :return: rid, min and max flux values :rtype: dict """ + + # TODO: can we remove 'check that problem is feasible' # check that problem is feasible - if self.check_pf_feasibility(pf) is False: + if self.check_pfs_feasibility({1: pf}) is False: return {'success': False} # implement objective as a constraint and overwrite bounds