Skip to content
Snippets Groups Projects
Commit 18879fe5 authored by Peter Schubert's avatar Peter Schubert
Browse files

Initial commit - test

parent eca9bc52
Branches
No related tags found
No related merge requests found
# code copied from
# cobra.flux_analysis.deletion
# cobra.flux_analysis.moma
# cobra.flux_analysis.room
# and modified, so we can implement sqMOMA and ROOMw
# as required for Deya Alzoubi Manuscript
# Peter Schubert, 26.05.2023
"""Provide functions for reaction and gene deletions."""
import math
from functools import partial
from itertools import product
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
import pandas as pd
from optlang.exceptions import SolverError
from optlang.symbolics import Zero, add
from cobra.core import Configuration, Gene, Model, Reaction
from cobra.util import ProcessPool
from cobra.util import solver as sutil
from cobra.flux_analysis.parsimonious import pfba
if TYPE_CHECKING:
from cobra import Solution
configuration = Configuration()
def _get_growth(model: Model) -> Tuple[float, str]:
"""Return the growth from the `model`.
Parameters
----------
model : cobra.Model
The model to obtain growth for.
Returns
-------
float
The obtained growth value. Returns nan if there is some error while
optimizing.
"""
try:
if "old_objective" in model.solver.variables:
if math.isfinite(model.slim_optimize()):
growth = model.solver.variables.old_objective.primal
else:
growth = float("nan")
else:
growth = model.slim_optimize()
except SolverError:
growth = float("nan")
return growth, model.solver.status
def _gene_deletion_ps(model, method, gene_ids):
with model:
# determine wild type growth for for ROOMw
if method == 'ROOMw':
model.slim_optimize()
wt_growth = model.solver.variables['old_objective'].primal
for gene_id in gene_ids:
model.genes.get_by_id(gene_id).knock_out()
# for ROOMw, determine number of flux increases to achieve 5% of wt growth
if method == 'ROOMw':
model.solver.variables["old_objective"].lb = 0.05 * wt_growth
n_increases = model.slim_optimize()
model.solver.variables["old_objective"].lb = 0.0
status = model.solver.status
if status == 'optimal':
model.solver.constraints["roomw_constraint_increases"].lb = n_increases
growth, status = _get_growth(model)
n = model.slim_optimize()
model.solver.constraints["roomw_constraint_increases"].lb = 0
else:
growth = float("nan")
n = float("nan")
else:
growth, status = _get_growth(model)
n = model.slim_optimize()
if 'ROOM' in method:
return gene_ids, growth, status, n
else:
return gene_ids, growth, status
def _gene_deletion_worker(ids: List[str]) -> Tuple[List[str], float, str]:
"""Perform gene deletions on worker process.
Parameters
----------
ids : list of str
The gene IDs to knock-out.
Returns
-------
tuple of (list of str, float, str)
A tuple containing gene IDs knocked out, growth of the model and
the solver status.
"""
global _model
global _method
return _gene_deletion_ps(_model, _method, ids)
def _init_worker(model: Model, method: str) -> None:
"""Initialize worker process."""
global _model
global _method
_model = model
_method = method
def _multi_deletion_ps(model: Model, element_lists: List[str],
method: str, solution: Optional["Solution"] = None,
processes: Optional[int] = None, **kwargs) -> pd.DataFrame:
"""Provide a common interface for single or multiple knockouts.
Parameters
----------
model : cobra.Model
The metabolic model to perform deletions in.
element_lists : list of cobra.Gene or cobra.Reaction
List of cobra.Gene or cobra.Reaction to be deleted.
method : {"sqMOMA", "ROOM", "lROOM", "ROOMw"}
Method used to predict the growth rate (default "fba").
solution : cobra.Solution, optional
A previous solution to use as a reference for (linear) MOMA or ROOM
(default None).
processes : int, optional
The number of parallel processes to run. Can speed up the computations
if the number of knockouts to perform is large. If not passed,
will be set to `configuration.processes` (default None).
**kwargs :
Passed on to underlying simulation functions.
Returns
-------
pandas.DataFrame
A representation of all combinations of entity deletions. The
columns are 'growth' and 'status', where
index : tuple(str)
The gene or reaction identifiers that were knocked out.
growth : float
The growth rate of the adjusted model.
status : str
The solution's status.
"""
methods = ['MOMA', 'lMOMA', 'sqMOMA', 'ROOM', 'lROOM', 'ROOMw', 'ROOMup']
if method not in methods:
raise RuntimeError(f'Wrong method {method}. Use any of [{methods}]')
solver = sutil.interface_to_str(model.problem.__name__)
if 'MOMA' in method and solver not in sutil.qp_solvers:
raise RuntimeError(f"Cannot use MOMA since '{solver}' is not QP-capable. "
"Please choose a different solver or use FBA only.")
if processes is None:
processes = configuration.processes
with model:
if 'MOMA' in method:
add_moma_ps(model, method=method, solution=solution)
elif 'ROOM' in method:
add_room_ps(model, method=method, solution=solution, **kwargs)
args = {frozenset(comb) for comb in product(*element_lists)}
processes = min(processes, len(args))
print(f'{method}: constraints: {len(model.constraints)}, variables: {len(model.variables)}') ###
def extract_knockout_results_moma(result_iter):
result = pd.DataFrame([(set(ids), growth, status) for (ids, growth, status) in result_iter],
columns=["ids", "growth", "status"])
return result
def extract_knockout_results_room(result_iter):
result = pd.DataFrame([(set(ids), growth, status, n) for (ids, growth, status, n) in result_iter],
columns=["ids", "growth", "status", "n"])
return result
if processes > 1:
worker = _gene_deletion_worker
chunk_size = len(args) // processes
with ProcessPool(processes, initializer=_init_worker, initargs=(model, method,)) as pool:
if 'MOMA' in method:
results = extract_knockout_results_moma(
pool.imap_unordered(worker, args, chunksize=chunk_size))
if 'ROOM' in method:
results = extract_knockout_results_room(
pool.imap_unordered(worker, args, chunksize=chunk_size))
else:
worker = _gene_deletion_ps
if 'MOMA' in method:
results = extract_knockout_results_moma(map(partial(worker, model, method), args))
if 'ROOM' in method:
results = extract_knockout_results_room(map(partial(worker, model, method), args))
return results
def _entities_ids(entities: List[Union[str, Gene, Reaction]]) -> List[str]:
"""Return the IDs of the `entities`.
Parameters
----------
entities : list of str or cobra.Gene or cobra.Reaction
The list of entities whose IDs need to be returned.
Returns
-------
list of str
The IDs of the `entities`.
"""
try:
return [e.id for e in entities]
except AttributeError:
return list(entities)
def _element_lists(entities: List[Union[str, Gene, Reaction]], *ids: List[str]) -> List[str]:
"""Return the elements.
Parameters
----------
entities : list of str or cobra.Gene or cobra.Reaction
The list of entities.
*ids : list of str
The list of IDs.
Returns
-------
list of str
The list of IDs.
"""
lists = list(ids)
if lists[0] is None:
lists[0] = entities
result = [_entities_ids(lists[0])]
for _list in lists[1:]:
if _list is None:
result.append(result[-1])
else:
result.append(_entities_ids(_list))
return result
def single_gene_deletion(model: Model, gene_list: Optional[List[Union[Gene, str]]] = None,
method: str = "sqMOMA", solution: Optional["Solution"] = None,
processes: Optional[int] = None, **kwargs,) -> pd.DataFrame:
"""Knock out each gene from `gene_list`.
Parameters
----------
model : cobra.Model
The metabolic model to perform deletions in.
gene_list : list of cobra.Gene or str, optional
The gene objects to be deleted. If not passed, all the genes from the
model are used (default None).
method : {"sqMOMA", "ROOMw"}
solution : cobra.Solution, optional
A previous solution to use as a reference for (linear) MOMA or ROOM
(default None).
processes : int, optional
The number of parallel processes to run. Can speed up the computations
if the number of knockouts to perform is large. If not passed,
will be set to `configuration.processes` (default None).
**kwargs :
Keyword arguments are passed on to underlying simulation functions
such as `add_room`.
Returns
-------
pandas.DataFrame
A representation of all single gene deletions. The columns are
'growth' and 'status', where
index : tuple(str)
The gene identifier that was knocked out.
growth : float
The growth rate of the adjusted model.
status : str
The solution's status.
"""
return _multi_deletion_ps(model, element_lists=_element_lists(model.genes, gene_list),
method=method, solution=solution, processes=processes, **kwargs,)
def add_moma_ps(model: "Model", method: str, solution: Optional["Solution"] = None) -> None:
"""
Add sqMOMA constraints and objective representing to the `model`.
This adds variables and constraints for the minimization of metabolic
adjustment (MOMA) to the model.
Parameters
----------
model : cobra.Model
The model to add MOMA constraints and objective to.
method : {"sqMOMA"}
Method used to predict the growth rate.
solution : cobra.Solution, optional
A previous solution to use as a reference. If no solution is given,
one will be computed using pFBA (default None).
Notes
-----
In the original MOMA [1]_ specification, one looks for the flux
distribution of the deletion (v^d) closest to the fluxes without the
deletion (v).
In math this means:
minimize: \sum_i (v^d_i - v_i)^2
s.t. : Sv^d = 0
lb_i \le v^d_i \le ub_i
Here, we use a variable transformation v^t := v^d_i - v_i. Substituting
and using the fact that Sv = 0 gives:
minimize: \sum_i (v^t_i)^2
s.t. : Sv^d = 0
v^t = v^d_i - v_i
lb_i \le v^d_i \le ub_i
So, basically we just re-center the flux space at the old solution and
then find the flux distribution closest to the new zero (center). This
is the same strategy as used in cameo.
In the case of linear MOMA [2]_, we instead minimize \sum_i abs(v^t_i).
The linear MOMA is typically significantly faster. Also, quadratic MOMA
tends to give flux distributions in which all fluxes deviate from the
reference fluxes a little bit whereas linear MOMA tends to give flux
distributions where the majority of fluxes are the same reference with
few fluxes deviating a lot (typical effect of L2 norm vs L1 norm).
The former objective function is saved in the optlang solver interface as
``"moma_old_objective"`` and this can be used to immediately extract the
value of the former objective after MOMA optimization.
See Also
--------
pfba : parsimonious FBA
References
----------
.. [1] Segrè, Daniel, Dennis Vitkup, and George M. Church. “Analysis of
Optimality in Natural and Perturbed Metabolic Networks.”
Proceedings of the National Academy of Sciences 99, no. 23
(November 12, 2002): 15112. https://doi.org/10.1073/pnas.232349399.
.. [2] Becker, Scott A, Adam M Feist, Monica L Mo, Gregory Hannum,
Bernhard Ø Palsson, and Markus J Herrgard. “Quantitative
Prediction of Cellular Metabolism with Constraint-Based Models:
The COBRA Toolbox.” Nature Protocols 2 (March 29, 2007): 727.
"""
if "moma_old_objective" in model.solver.variables:
raise ValueError("The model is already adjusted for MOMA.")
# Fall back to default QP solver if current one has no QP capability
if sutil.interface_to_str(model.problem) not in sutil.qp_solvers:
model.solver = sutil.choose_solver(model, qp=True)
if solution is None:
solution = pfba(model)
prob = model.problem
v = prob.Variable("old_objective")
c = prob.Constraint(model.solver.objective.expression - v,
lb=0.0, ub=0.0, name="old_objective_constraint")
to_add = [v, c]
model.objective = prob.Objective(Zero, direction="min", sloppy=True)
obj_vars = []
for r in model.reactions:
flux = solution.fluxes[r.id]
if method == 'lMOMA':
components = sutil.add_absolute_expression(model, r.flux_expression, name="moma_dist_" + r.id,
difference=flux, add=False)
to_add.extend(components)
obj_vars.append(components.variable)
else:
dist = prob.Variable("moma_dist_" + r.id)
const = prob.Constraint(r.flux_expression - dist, lb=flux, ub=flux, name="moma_constraint_" + r.id)
to_add.extend([dist, const])
if method == 'sqMOMO':
flux_scale = getattr(r, 'flux_scale', 1.0)
obj_vars.append(1.0/flux_scale * dist**2)
else:
obj_vars.append(dist**2)
model.add_cons_vars(to_add)
if method == 'lMOMA':
model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})
else:
model.objective = prob.Objective(add(obj_vars), direction="min", sloppy=True)
def add_room_ps(model, method, solution=None, delta=0.03, epsilon=1e-03):
if "old_objective" in model.solver.variables:
raise ValueError("Model is already adjusted for ROOM.")
prob = model.problem
v = prob.Variable("old_objective", ub=solution.objective_value)
c = prob.Constraint(model.solver.objective.expression - v,
ub=0.0, lb=0.0, name="old_objective_constraint")
to_add = [v, c]
model.objective = prob.Objective(Zero, direction="min", sloppy=True)
obj_vars = []
y_variables = []
for rxn in model.reactions:
flux = solution.fluxes[rxn.id]
if method == 'lROOM':
y = prob.Variable("y_" + rxn.id, lb=0, ub=1)
delta = epsilon = 0.0
else:
y = prob.Variable("y_" + rxn.id, type="binary")
# upper constraint
w_u = flux + (delta * abs(flux)) + epsilon
upper_const = prob.Constraint(rxn.flux_expression - y * (rxn.upper_bound - w_u),
ub=w_u, name="room_constraint_upper_" + rxn.id)
to_add.extend([y, upper_const])
# lower constraint
if method != 'ROOMw' and method != 'ROOMup':
w_l = flux - (delta * abs(flux)) - epsilon
lower_const = prob.Constraint(rxn.flux_expression - y * (rxn.lower_bound - w_l),
lb=w_l, name="room_constraint_lower_" + rxn.id)
to_add.extend([lower_const])
obj_vars.append(y)
if method == 'ROOMw':
increases_const = prob.Constraint(add(obj_vars), lb=0, name="roomw_constraint_increases")
to_add.extend([increases_const])
model.add_cons_vars(to_add)
model.objective.set_linear_coefficients({v: 1.0 for v in obj_vars})
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment