# Copyright 2021 Ginkgo Bioworks
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model class that extends `cobra.Model` to account for enzyme constraints."""
import logging
from collections import defaultdict
from copy import copy, deepcopy
from functools import partial
from math import isnan
from typing import Dict, Iterator, List, Optional, Tuple, Union
import cobra
import pandas as pd
from cobra import Metabolite
from cobra.core.dictlist import DictList
from cobra.util.context import get_context
from optlang.symbolics import Zero
from .protein import Protein
from .reaction import Reaction
[docs]LOGGER = logging.getLogger(__name__)
[docs]class Model(cobra.Model):
"""Extension of cobra.Model providing an API for proteins in EC models.
Attributes
----------
reactions : DictList
A DictList where the key is the reaction identifier and the value a Reaction.
metabolites : DictList
A DictList where the key is the metabolite identifier and the value a
Metabolite.
proteins : DictList
A DictList where the key is the metabolite identifier and the value a Protein.
genes : DictList
A DictList where the key is the gene identifier and the value a Gene.
groups : DictList
A DictList where the key is the group identifier and the value a Group.
solution : Solution
# TODO: separate proteins from cobra.Reactions
The last obtained solution from optimizing the model.
"""
[docs] def __setstate__(self, state: Dict):
"""Make sure all cobra.Objects in the model point to the model."""
self.__dict__.update(state)
for y in ["reactions", "genes", "metabolites", "proteins"]:
for x in getattr(self, y):
x._model = self
if not hasattr(self, "name"):
self.name = None
def __init__(
self,
id_or_model: Union[str, cobra.Model] = None,
name: str = None,
hardcoded_rev_reactions: bool = True,
):
"""Initialize model."""
# TODO: refactor from cobra.Model so that internal matrix does not get
# repopulated 100 times
self.proteins = DictList()
self.hardcoded_rev_reactions = hardcoded_rev_reactions
if isinstance(id_or_model, cobra.Model) and not hasattr(
id_or_model, "proteins"
):
self.from_cobra(id_or_model, name)
else:
super().__init__(id_or_model, name)
[docs] def from_cobra(self, model: cobra.Model, name: str):
"""Initialize from cobra model."""
# gather prots from by naming convention
super().__init__(model.id, name)
g_proteins = [met for met in model.metabolites if met.id.startswith("prot_")]
# gather prots from by SBML group
if model.groups.query("Protein"):
g_proteins += model.groups.Protein.members
g_proteins = set(g_proteins)
model.remove_metabolites(g_proteins)
self.add_metabolites(
[met for met in model.metabolites if met not in g_proteins]
)
self.add_reactions(
[reac for reac in model.reactions if not reac.id.startswith("prot_")]
)
self.add_proteins([Protein(prot) for prot in g_proteins])
# point every previous protein Metabolite to an actual Protein object
for reac in self.reactions:
reac._metabolites = {
met if met not in g_proteins else self.proteins.get_by_id(met.id): val
for met, val in reac.metabolites.items()
}
self.__setstate__(self.__dict__)
self._populate_solver(self.reactions, self.metabolites, self.proteins)
[docs] def get_total_measured_proteins(self) -> float:
"""Sum of all `Proteins` in the model that has a concentration."""
measured = sum(
prot.concentration
for prot in self.proteins
# nan + number = nan!
if not isnan(prot.concentration) and prot.concentration
)
return 0 if isnan(measured) or not measured else measured
[docs] def constrain_pool(
self,
p_total: float,
sigma_saturation_factor: float,
fn_mass_fraction_unmeasured_matched: float,
protein_list: List[str] = None,
):
"""Constrain the draw reactions for the unmeasured (common protein pool) proteins.
Adapted from [geckopy]
(https://github.com/SysBioChalmers/GECKO/blob/master/geckopy/geckopy/gecko.py#L184)
Proteins without their own protein pool are collectively constrained by the
common protein pool. Remove protein pools for all proteins that don't have
measurements, along with corresponding draw reactions, and add these to
the common protein pool and reaction.
Parameters
----------
p_total: float
measured total protein fraction in cell in g protein / gDW
sigma_saturation_factor: float
part of proteome that can be used by metabolism
fn_mass_fraction_unmeasured_matched: float
TODO: add convenience function to handle this
sum of the product of average abundances of unmesured proteins
(from, e.g., paxDB) times their molecular weight.
protein_list: List[str]
list of ids of proteins to constrain. This is useful for inspecting
the protein utilization of, e.g., a heterologous pathway.
"""
proteins = (
[self.proteins.get_by_id(prot_id) for prot_id in protein_list]
if protein_list
else self.proteins
)
unmeasured_prots = [
prot
for prot in proteins
if prot.concentration and ~isnan(prot.concentration)
]
if not unmeasured_prots:
return
self.add_pool()
# if there are no proteins measured, the calculations are simplified
fs_matched_adjusted = p_total * sigma_saturation_factor
if len(unmeasured_prots) != len(proteins):
p_measured = self.get_total_measured_proteins()
# * section 2.5.1
# 1. and 2. introduce `prot_pool` and exchange reaction done in __init__
# 3. limiting total usage with the unmeasured amount of protein
f_mass_fraction_measured_matched_to_total = (
fn_mass_fraction_unmeasured_matched / (1 - p_measured / p_total)
)
fs_matched_adjusted = (
(p_total - p_measured)
* f_mass_fraction_measured_matched_to_total
* sigma_saturation_factor
)
self.protein_pool_exchange.bounds = 0, fs_matched_adjusted
m_weigths = [prot.mw for prot in proteins if prot.mw]
average_mmw = sum(m_weigths) / len(m_weigths) / 1000.0
for protein in unmeasured_prots:
mmw = protein.mw / 1000 if protein.mw else average_mmw
protein.suscribe_to_pool(mmw)
[docs] def add_pool(self, reac_id: str = "prot_pool_exchange", met_id: str = "prot_pool"):
"""Add the reaction and metabolite to constraint the common pool of proteins.
Parameters
----------
read_id: str
id of the common protein pool pseudorreaction to add.
met_id: str
id of the common protein pool metabolite to add.
"""
try:
self.common_protein_pool = self.metabolites.get_by_id(met_id)
except KeyError:
self.common_protein_pool = Metabolite(met_id)
try:
self.protein_pool_exchange = self.reactions.get_by_id(reac_id)
except KeyError:
self.protein_pool_exchange = Reaction(reac_id)
self.protein_pool_exchange.add_metabolites({self.common_protein_pool: 1.0})
self.add_reactions([self.protein_pool_exchange])
[docs] def copy(self):
"""Provide a partial 'deepcopy' of the Model.
All of the Metabolite, Gene, and Reaction objects are created anew but
in a faster fashion than deepcopy.
Enzyme constrained changes: also deepcopy proteins.
"""
new = self.__class__()
do_not_copy_by_ref = {
"metabolites",
"reactions",
"proteins",
"genes",
"notes",
"annotation",
"groups",
}
for attr in self.__dict__:
if attr not in do_not_copy_by_ref:
new.__dict__[attr] = self.__dict__[attr]
new.notes = deepcopy(self.notes)
new.annotation = deepcopy(self.annotation)
new.metabolites = DictList()
do_not_copy_by_ref = {"_reaction", "_model"}
for metabolite in self.metabolites:
new_met = metabolite.__class__()
for attr, value in metabolite.__dict__.items():
if attr not in do_not_copy_by_ref:
new_met.__dict__[attr] = copy(value) if attr == "formula" else value
new_met._model = new
new.metabolites.append(new_met)
new.proteins = DictList()
for protein in self.proteins:
new_protein = protein.__class__()
for attr, value in protein.__dict__.items():
if attr not in do_not_copy_by_ref:
new_protein.__dict__[attr] = (
copy(value) if attr == "formula" else value
)
new_protein._model = new
for attr, value in protein.__dict__.items():
if attr not in do_not_copy_by_ref:
new_protein.__dict__[attr] = copy(value)
new_protein._model = new
new.proteins.append(new_protein)
# update awareness
for metabolite, stoic in protein._metabolites.items():
if metabolite not in new.proteins:
new_met = new.metabolites.get_by_id(metabolite.id)
new_protein._metabolites[new_met] = stoic
new_met._protein.add(new_protein)
new.genes = DictList()
for gene in self.genes:
new_gene = gene.__class__(None)
for attr, value in gene.__dict__.items():
if attr not in do_not_copy_by_ref:
new_gene.__dict__[attr] = (
copy(value) if attr == "formula" else value
)
new_gene._model = new
new.genes.append(new_gene)
new.reactions = DictList()
do_not_copy_by_ref = {"_model", "_metabolites", "_genes"}
for reaction in self.reactions:
new_reaction = reaction.__class__()
for attr, value in reaction.__dict__.items():
if attr not in do_not_copy_by_ref:
new_reaction.__dict__[attr] = copy(value)
new_reaction._model = new
new.reactions.append(new_reaction)
# update awareness
for metabolite, stoic in reaction._metabolites.items():
if metabolite in new.proteins:
new_met = new.proteins.get_by_id(metabolite.id)
new_reaction._metabolites[new_met] = stoic
new_met._reaction.add(new_reaction)
else:
# regular met
new_met = new.metabolites.get_by_id(metabolite.id)
new_reaction._metabolites[new_met] = stoic
new_met._reaction.add(new_reaction)
for gene in reaction._genes:
new_gene = new.genes.get_by_id(gene.id)
new_reaction._genes.add(new_gene)
new_gene._reaction.add(new_reaction)
new.groups = DictList()
do_not_copy_by_ref = {"_model", "_members"}
# Groups can be members of other groups. We initialize them first and
# then update their members.
for group in self.groups:
new_group = group.__class__(group.id)
for attr, value in group.__dict__.items():
if attr not in do_not_copy_by_ref:
new_group.__dict__[attr] = copy(value)
new_group._model = new
new.groups.append(new_group)
for group in self.groups:
new_group = new.groups.get_by_id(group.id)
# update awareness, as in the reaction copies
new_objects = []
for member in group.members:
if isinstance(member, Metabolite):
new_object = new.metabolites.get_by_id(member.id)
elif isinstance(member, Reaction):
new_object = new.reactions.get_by_id(member.id)
elif isinstance(member, cobra.Gene):
new_object = new.genes.get_by_id(member.id)
elif isinstance(member, Protein):
new_object = new.proteins.get_by_id(member.id)
elif isinstance(member, cobra.core.Group):
new_object = new.genes.get_by_id(member.id)
else:
raise TypeError(
"The group member {!r} is unexpectedly not a "
"metabolite, reaction, gene, nor another "
"group.".format(member)
)
new_objects.append(new_object)
new_group.add_members(new_objects)
try:
new._solver = deepcopy(self.solver)
# Cplex has an issue with deep copies
except Exception: # pragma: no cover
new._solver = copy(self.solver) # pragma: no cover
# it doesn't make sense to retain the context of a copied model so
# assign a new empty context
new._contexts = list()
return new
[docs] def _populate_solver(
self,
reaction_list: Iterator[Reaction],
metabolite_list: Iterator[Metabolite] = None,
protein_list: Iterator[Protein] = None,
):
"""Populate attached solver with LP problem given reactions + proteins.
Note that proteins are added both as Constraints and Variables.
"""
constraint_terms = defaultdict(lambda: defaultdict(float))
to_add = []
metabolite_list = metabolite_list if metabolite_list is not None else []
protein_list = protein_list if protein_list is not None else []
if metabolite_list or protein_list:
for met in metabolite_list + protein_list:
if met.id not in self.constraints:
# this condition only applies when passing `protein_list`
to_add += [self.problem.Constraint(Zero, name=met.id, lb=0, ub=0)]
self.add_cons_vars(to_add)
for reaction in reaction_list + protein_list:
reaction_id = reaction.id
if reaction_id not in self.variables:
forward_variable = self.problem.Variable(reaction_id)
reverse_variable = self.problem.Variable(reaction.reverse_id)
self.add_cons_vars([forward_variable, reverse_variable])
else:
try:
reaction = self.reactions.get_by_id(reaction_id)
except KeyError:
reaction = self.proteins.get_by_id(reaction_id)
forward_variable = reaction.forward_variable
reverse_variable = reaction.reverse_variable
for metabolite, coeff in reaction.metabolites.items():
if metabolite.id in self.constraints:
constraint = self.constraints[metabolite.id]
else:
constraint = self.problem.Constraint(
Zero, name=metabolite.id, lb=0, ub=0
)
self.add_cons_vars(constraint, sloppy=True)
constraint_terms[constraint][forward_variable] = coeff
# the protein is used on both directions (if no legacy model)
constraint_terms[constraint][reverse_variable] = (
coeff
if not self.hardcoded_rev_reactions
and isinstance(metabolite, Protein)
# proteins' pseudoexchanges should have normal coefficients
and not isinstance(reaction, Protein)
else -coeff
)
self.solver.update()
for reaction in reaction_list:
self.reactions.get_by_id(reaction.id).update_variable_bounds()
for protein in protein_list:
self.proteins.get_by_id(protein.id).update_variable_bounds()
for constraint, terms in constraint_terms.items():
constraint.set_linear_coefficients(terms)
[docs] def add_proteins(self, protein_list: Iterator[Protein]):
"""Add proteins to the model, in the same fashion as `.add_metabollites`."""
if not hasattr(protein_list, "__iter__"):
protein_list = [protein_list]
if len(protein_list) == 0:
return None
# Take left difference
pruned = [x for x in protein_list if x.id not in self.proteins]
for prot in pruned:
prot._model = self
to_add = []
for prot in pruned:
if prot.id not in self.constraints:
constraint = self.problem.Constraint(Zero, name=prot.id, lb=0, ub=0)
to_add += [constraint]
self.add_cons_vars(to_add)
context = get_context(self)
if context:
context(partial(self.proteins.__isub__, pruned))
for x in pruned:
context(partial(setattr, x, "_model", None))
self.proteins += pruned
self._populate_solver([], None, pruned)
[docs] def add_reactions(self, reaction_list: Iterator[Reaction]):
"""Add reactions to the model.
Reactions with identifiers identical to a reaction already in the
model are ignored.
The change is reverted upon exit when using the model as a context.
Enzyme Constrained changes: avoid adding proteins as metabolites.
Parameters
----------
reaction_list : list
A list of `cobra.Reaction` objects
"""
def existing_filter(rxn):
if rxn.id in self.reactions:
LOGGER.warning(f"Ignoring reaction '{rxn.id}' since it already exists.")
return False
return True
# First check whether the reactions exist in the model.
pruned = DictList(filter(existing_filter, reaction_list))
context = get_context(self)
# Add reactions. Also take care of genes and metabolites in the loop.
for reaction in pruned:
reaction._model = self
# Build a `list()` because the dict will be modified in the loop.
for metabolite in list(reaction.metabolites):
is_prot = isinstance(metabolite, Protein)
target = self.proteins if is_prot else self.metabolites
if metabolite not in target:
if is_prot:
self.add_proteins([metabolite])
else:
self.add_metabolites(metabolite)
# A copy of the metabolite exists in the model, the reaction
# needs to point to the metabolite in the model.
else:
stoichiometry = reaction._metabolites.pop(metabolite)
model_metabolite = target.get_by_id(metabolite.id)
reaction._metabolites[model_metabolite] = stoichiometry
model_metabolite._reaction.add(reaction)
if context:
context(partial(model_metabolite._reaction.remove, reaction))
for gene in list(reaction._genes):
# If the gene is not in the model, add it
if not self.genes.has_id(gene.id):
self.genes += [gene]
gene._model = self
if context:
# Remove the gene later
context(partial(self.genes.__isub__, [gene]))
context(partial(setattr, gene, "_model", None))
# Otherwise, make the gene point to the one in the model
else:
model_gene = self.genes.get_by_id(gene.id)
if model_gene is not gene:
reaction._dissociate_gene(gene)
reaction._associate_gene(model_gene)
self.reactions += pruned
if context:
context(partial(self.reactions.__isub__, pruned))
# from cameo ...
self._populate_solver(pruned)
[docs] def optimize(
self, objective_sense: Optional[str] = None, raise_error: bool = False
) -> Tuple[cobra.Solution, cobra.Solution]:
"""Optimize the model using flux balance analysis.
Parameters
----------
objective_sense : {None, 'maximize' 'minimize'}, optional
Whether fluxes should be maximized or minimized. In case of None,
the previous direction is used.
raise_error : bool
If true, raise an OptimizationError if solver status is not
optimal.
Notes
-----
Only the most commonly used parameters are presented here. Additional
parameters for cobra.solvers may be available and specified with the
appropriate keyword argument.
"""
solution = super().optimize(objective_sense, raise_error)
solution_prot = cobra.core.get_solution(
self, reactions=self.proteins, metabolites=self.proteins
)
solution_prot.contributions = solution_prot.fluxes
if all(solution_prot.shadow_prices == 0):
# workaround for wrong shadow prices of proteins
try:
prot_index = [prot.id for prot in self.proteins]
solution_prot.shadow_prices = pd.Series(
index=prot_index,
data=[
-self.variables[prot.id].dual
+ self.variables[prot.reverse_id].dual
for prot in prot_index
],
name="shadow_prices",
)
except Exception:
pass
return solution, solution_prot
[docs] def add_boundary(
self,
metabolite: Union[cobra.Metabolite, Protein],
type: str = "exchange",
reaction_id: Optional[str] = None,
lb: Optional[float] = None,
ub: Optional[float] = None,
sbo_term: Optional[str] = None,
) -> Reaction:
"""Add a boundary reaction for a given metabolite.
Enzyme constraint changes: return an geckopy.Reaction.
"""
rxn = super().add_boundary(metabolite, type, reaction_id, lb, ub, sbo_term)
return Reaction(rxn)