From 18879fe55753ce3e80ec35695175a9d3348a077c Mon Sep 17 00:00:00 2001 From: Peter Schubert <Peter.Schubert@hhu.de> Date: Mon, 25 Mar 2024 10:07:04 +0100 Subject: [PATCH] Initial commit - test --- cobra_deletion_ps.py | 461 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 461 insertions(+) create mode 100644 cobra_deletion_ps.py diff --git a/cobra_deletion_ps.py b/cobra_deletion_ps.py new file mode 100644 index 0000000..5281d53 --- /dev/null +++ b/cobra_deletion_ps.py @@ -0,0 +1,461 @@ +# code copied from +# cobra.flux_analysis.deletion +# cobra.flux_analysis.moma +# cobra.flux_analysis.room + +# and modified, so we can implement sqMOMA and ROOMw +# as required for Deya Alzoubi Manuscript + +# Peter Schubert, 26.05.2023 + +"""Provide functions for reaction and gene deletions.""" + +import math +from functools import partial +from itertools import product +from typing import TYPE_CHECKING, List, Optional, Tuple, Union + +import pandas as pd +from optlang.exceptions import SolverError +from optlang.symbolics import Zero, add + +from cobra.core import Configuration, Gene, Model, Reaction +from cobra.util import ProcessPool +from cobra.util import solver as sutil +from cobra.flux_analysis.parsimonious import pfba + +if TYPE_CHECKING: + from cobra import Solution + + +configuration = Configuration() + + +def _get_growth(model: Model) -> Tuple[float, str]: + """Return the growth from the `model`. + + Parameters + ---------- + model : cobra.Model + The model to obtain growth for. + + Returns + ------- + float + The obtained growth value. Returns nan if there is some error while + optimizing. + + """ + try: + if "old_objective" in model.solver.variables: + if math.isfinite(model.slim_optimize()): + growth = model.solver.variables.old_objective.primal + else: + growth = float("nan") + else: + growth = model.slim_optimize() + except SolverError: + growth = float("nan") + return growth, model.solver.status + + +def _gene_deletion_ps(model, method, gene_ids): + + with model: + + # determine wild type growth for for ROOMw + if method == 'ROOMw': + model.slim_optimize() + wt_growth = model.solver.variables['old_objective'].primal + + for gene_id in gene_ids: + model.genes.get_by_id(gene_id).knock_out() + + # for ROOMw, determine number of flux increases to achieve 5% of wt growth + if method == 'ROOMw': + model.solver.variables["old_objective"].lb = 0.05 * wt_growth + n_increases = model.slim_optimize() + model.solver.variables["old_objective"].lb = 0.0 + status = model.solver.status + if status == 'optimal': + model.solver.constraints["roomw_constraint_increases"].lb = n_increases + growth, status = _get_growth(model) + n = model.slim_optimize() + model.solver.constraints["roomw_constraint_increases"].lb = 0 + else: + growth = float("nan") + n = float("nan") + else: + growth, status = _get_growth(model) + n = model.slim_optimize() + + if 'ROOM' in method: + return gene_ids, growth, status, n + else: + return gene_ids, growth, status + + +def _gene_deletion_worker(ids: List[str]) -> Tuple[List[str], float, str]: + """Perform gene deletions on worker process. + + Parameters + ---------- + ids : list of str + The gene IDs to knock-out. + + Returns + ------- + tuple of (list of str, float, str) + A tuple containing gene IDs knocked out, growth of the model and + the solver status. + + """ + global _model + global _method + + return _gene_deletion_ps(_model, _method, ids) + + +def _init_worker(model: Model, method: str) -> None: + """Initialize worker process.""" + global _model + global _method + + _model = model + _method = method + + +def _multi_deletion_ps(model: Model, element_lists: List[str], + method: str, solution: Optional["Solution"] = None, + processes: Optional[int] = None, **kwargs) -> pd.DataFrame: + """Provide a common interface for single or multiple knockouts. + + Parameters + ---------- + model : cobra.Model + The metabolic model to perform deletions in. + element_lists : list of cobra.Gene or cobra.Reaction + List of cobra.Gene or cobra.Reaction to be deleted. + method : {"sqMOMA", "ROOM", "lROOM", "ROOMw"} + Method used to predict the growth rate (default "fba"). + solution : cobra.Solution, optional + A previous solution to use as a reference for (linear) MOMA or ROOM + (default None). + processes : int, optional + The number of parallel processes to run. Can speed up the computations + if the number of knockouts to perform is large. If not passed, + will be set to `configuration.processes` (default None). + **kwargs : + Passed on to underlying simulation functions. + + Returns + ------- + pandas.DataFrame + A representation of all combinations of entity deletions. The + columns are 'growth' and 'status', where + + index : tuple(str) + The gene or reaction identifiers that were knocked out. + growth : float + The growth rate of the adjusted model. + status : str + The solution's status. + """ + + methods = ['MOMA', 'lMOMA', 'sqMOMA', 'ROOM', 'lROOM', 'ROOMw', 'ROOMup'] + if method not in methods: + raise RuntimeError(f'Wrong method {method}. Use any of [{methods}]') + + solver = sutil.interface_to_str(model.problem.__name__) + if 'MOMA' in method and solver not in sutil.qp_solvers: + raise RuntimeError(f"Cannot use MOMA since '{solver}' is not QP-capable. " + "Please choose a different solver or use FBA only.") + + if processes is None: + processes = configuration.processes + + with model: + if 'MOMA' in method: + add_moma_ps(model, method=method, solution=solution) + elif 'ROOM' in method: + add_room_ps(model, method=method, solution=solution, **kwargs) + args = {frozenset(comb) for comb in product(*element_lists)} + processes = min(processes, len(args)) + + print(f'{method}: constraints: {len(model.constraints)}, variables: {len(model.variables)}') ### + + + def extract_knockout_results_moma(result_iter): + result = pd.DataFrame([(set(ids), growth, status) for (ids, growth, status) in result_iter], + columns=["ids", "growth", "status"]) + return result + + def extract_knockout_results_room(result_iter): + result = pd.DataFrame([(set(ids), growth, status, n) for (ids, growth, status, n) in result_iter], + columns=["ids", "growth", "status", "n"]) + return result + + if processes > 1: + worker = _gene_deletion_worker + chunk_size = len(args) // processes + + with ProcessPool(processes, initializer=_init_worker, initargs=(model, method,)) as pool: + if 'MOMA' in method: + results = extract_knockout_results_moma( + pool.imap_unordered(worker, args, chunksize=chunk_size)) + if 'ROOM' in method: + results = extract_knockout_results_room( + pool.imap_unordered(worker, args, chunksize=chunk_size)) + else: + worker = _gene_deletion_ps + if 'MOMA' in method: + results = extract_knockout_results_moma(map(partial(worker, model, method), args)) + if 'ROOM' in method: + results = extract_knockout_results_room(map(partial(worker, model, method), args)) + return results + + +def _entities_ids(entities: List[Union[str, Gene, Reaction]]) -> List[str]: + """Return the IDs of the `entities`. + + Parameters + ---------- + entities : list of str or cobra.Gene or cobra.Reaction + The list of entities whose IDs need to be returned. + + Returns + ------- + list of str + The IDs of the `entities`. + + """ + try: + return [e.id for e in entities] + except AttributeError: + return list(entities) + + +def _element_lists(entities: List[Union[str, Gene, Reaction]], *ids: List[str]) -> List[str]: + """Return the elements. + + Parameters + ---------- + entities : list of str or cobra.Gene or cobra.Reaction + The list of entities. + *ids : list of str + The list of IDs. + + Returns + ------- + list of str + The list of IDs. + + """ + lists = list(ids) + if lists[0] is None: + lists[0] = entities + result = [_entities_ids(lists[0])] + for _list in lists[1:]: + if _list is None: + result.append(result[-1]) + else: + result.append(_entities_ids(_list)) + return result + + +def single_gene_deletion(model: Model, gene_list: Optional[List[Union[Gene, str]]] = None, + method: str = "sqMOMA", solution: Optional["Solution"] = None, + processes: Optional[int] = None, **kwargs,) -> pd.DataFrame: + """Knock out each gene from `gene_list`. + + Parameters + ---------- + model : cobra.Model + The metabolic model to perform deletions in. + gene_list : list of cobra.Gene or str, optional + The gene objects to be deleted. If not passed, all the genes from the + model are used (default None). + method : {"sqMOMA", "ROOMw"} + solution : cobra.Solution, optional + A previous solution to use as a reference for (linear) MOMA or ROOM + (default None). + processes : int, optional + The number of parallel processes to run. Can speed up the computations + if the number of knockouts to perform is large. If not passed, + will be set to `configuration.processes` (default None). + **kwargs : + Keyword arguments are passed on to underlying simulation functions + such as `add_room`. + + Returns + ------- + pandas.DataFrame + A representation of all single gene deletions. The columns are + 'growth' and 'status', where + + index : tuple(str) + The gene identifier that was knocked out. + growth : float + The growth rate of the adjusted model. + status : str + The solution's status. + + """ + return _multi_deletion_ps(model, element_lists=_element_lists(model.genes, gene_list), + method=method, solution=solution, processes=processes, **kwargs,) + + +def add_moma_ps(model: "Model", method: str, solution: Optional["Solution"] = None) -> None: + """ + Add sqMOMA constraints and objective representing to the `model`. + + This adds variables and constraints for the minimization of metabolic + adjustment (MOMA) to the model. + + Parameters + ---------- + model : cobra.Model + The model to add MOMA constraints and objective to. + method : {"sqMOMA"} + Method used to predict the growth rate. + solution : cobra.Solution, optional + A previous solution to use as a reference. If no solution is given, + one will be computed using pFBA (default None). + + Notes + ----- + In the original MOMA [1]_ specification, one looks for the flux + distribution of the deletion (v^d) closest to the fluxes without the + deletion (v). + In math this means: + + minimize: \sum_i (v^d_i - v_i)^2 + s.t. : Sv^d = 0 + lb_i \le v^d_i \le ub_i + + Here, we use a variable transformation v^t := v^d_i - v_i. Substituting + and using the fact that Sv = 0 gives: + + minimize: \sum_i (v^t_i)^2 + s.t. : Sv^d = 0 + v^t = v^d_i - v_i + lb_i \le v^d_i \le ub_i + + So, basically we just re-center the flux space at the old solution and + then find the flux distribution closest to the new zero (center). This + is the same strategy as used in cameo. + + In the case of linear MOMA [2]_, we instead minimize \sum_i abs(v^t_i). + The linear MOMA is typically significantly faster. Also, quadratic MOMA + tends to give flux distributions in which all fluxes deviate from the + reference fluxes a little bit whereas linear MOMA tends to give flux + distributions where the majority of fluxes are the same reference with + few fluxes deviating a lot (typical effect of L2 norm vs L1 norm). + + The former objective function is saved in the optlang solver interface as + ``"moma_old_objective"`` and this can be used to immediately extract the + value of the former objective after MOMA optimization. + + See Also + -------- + pfba : parsimonious FBA + + References + ---------- + .. [1] Segrè, Daniel, Dennis Vitkup, and George M. Church. “Analysis of + Optimality in Natural and Perturbed Metabolic Networks.” + Proceedings of the National Academy of Sciences 99, no. 23 + (November 12, 2002): 15112. https://doi.org/10.1073/pnas.232349399. + .. [2] Becker, Scott A, Adam M Feist, Monica L Mo, Gregory Hannum, + Bernhard Ø Palsson, and Markus J Herrgard. “Quantitative + Prediction of Cellular Metabolism with Constraint-Based Models: + The COBRA Toolbox.” Nature Protocols 2 (March 29, 2007): 727. + """ + if "moma_old_objective" in model.solver.variables: + raise ValueError("The model is already adjusted for MOMA.") + + # Fall back to default QP solver if current one has no QP capability + if sutil.interface_to_str(model.problem) not in sutil.qp_solvers: + model.solver = sutil.choose_solver(model, qp=True) + + if solution is None: + solution = pfba(model) + + prob = model.problem + v = prob.Variable("old_objective") + c = prob.Constraint(model.solver.objective.expression - v, + lb=0.0, ub=0.0, name="old_objective_constraint") + to_add = [v, c] + model.objective = prob.Objective(Zero, direction="min", sloppy=True) + obj_vars = [] + for r in model.reactions: + flux = solution.fluxes[r.id] + if method == 'lMOMA': + components = sutil.add_absolute_expression(model, r.flux_expression, name="moma_dist_" + r.id, + difference=flux, add=False) + to_add.extend(components) + obj_vars.append(components.variable) + else: + dist = prob.Variable("moma_dist_" + r.id) + const = prob.Constraint(r.flux_expression - dist, lb=flux, ub=flux, name="moma_constraint_" + r.id) + to_add.extend([dist, const]) + if method == 'sqMOMO': + flux_scale = getattr(r, 'flux_scale', 1.0) + obj_vars.append(1.0/flux_scale * dist**2) + else: + obj_vars.append(dist**2) + + model.add_cons_vars(to_add) + if method == 'lMOMA': + model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars}) + else: + model.objective = prob.Objective(add(obj_vars), direction="min", sloppy=True) + + +def add_room_ps(model, method, solution=None, delta=0.03, epsilon=1e-03): + + if "old_objective" in model.solver.variables: + raise ValueError("Model is already adjusted for ROOM.") + + prob = model.problem + + v = prob.Variable("old_objective", ub=solution.objective_value) + c = prob.Constraint(model.solver.objective.expression - v, + ub=0.0, lb=0.0, name="old_objective_constraint") + to_add = [v, c] + + model.objective = prob.Objective(Zero, direction="min", sloppy=True) + obj_vars = [] + + y_variables = [] + for rxn in model.reactions: + flux = solution.fluxes[rxn.id] + + if method == 'lROOM': + y = prob.Variable("y_" + rxn.id, lb=0, ub=1) + delta = epsilon = 0.0 + else: + y = prob.Variable("y_" + rxn.id, type="binary") + + # upper constraint + w_u = flux + (delta * abs(flux)) + epsilon + upper_const = prob.Constraint(rxn.flux_expression - y * (rxn.upper_bound - w_u), + ub=w_u, name="room_constraint_upper_" + rxn.id) + to_add.extend([y, upper_const]) + + # lower constraint + if method != 'ROOMw' and method != 'ROOMup': + w_l = flux - (delta * abs(flux)) - epsilon + lower_const = prob.Constraint(rxn.flux_expression - y * (rxn.lower_bound - w_l), + lb=w_l, name="room_constraint_lower_" + rxn.id) + to_add.extend([lower_const]) + obj_vars.append(y) + + if method == 'ROOMw': + increases_const = prob.Constraint(add(obj_vars), lb=0, name="roomw_constraint_increases") + to_add.extend([increases_const]) + + model.add_cons_vars(to_add) + model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars}) + + -- GitLab