import six
from symengine import Add, sympify
from numpy import abs
import argparse
import time
from dexom_python.model_functions import read_model, check_model_options, load_reaction_weights
from dexom_python.result_functions import write_solution
[docs]def create_full_variables(model, reaction_weights, epsilon, threshold):
# the x_rid variables represent a binary condition of flux activation
for rxn in model.reactions:
if "x_" + rxn.id not in model.solver.variables:
rid = rxn.id
xtot = model.solver.interface.Variable("x_%s" % rid, type="binary")
xf = model.solver.interface.Variable("xf_%s" % rid, type="binary")
xr = model.solver.interface.Variable("xr_%s" % rid, type="binary")
model.solver.add(xtot)
model.solver.add(xf)
model.solver.add(xr)
xtot_def = model.solver.interface.Constraint(xtot - xf - xr, lb=0., ub=0., name="x_%s_def" % rid)
xf_upper = model.solver.interface.Constraint(
rxn.forward_variable - rxn.upper_bound * xf, ub=0., name="xr_%s_upper" % rid)
xr_upper = model.solver.interface.Constraint(
rxn.reverse_variable + rxn.lower_bound * xr, ub=0., name="xf_%s_upper" % rid)
temp = threshold
if rid in reaction_weights:
if reaction_weights[rid] > 0.:
temp = epsilon
xf_lower = model.solver.interface.Constraint(
rxn.forward_variable - temp * xf, lb=0., name="xf_%s_lower" % rid)
xr_lower = model.solver.interface.Constraint(
rxn.reverse_variable - temp * xr, lb=0., name="xr_%s_lower" % rid)
model.solver.add(xtot_def)
model.solver.add(xf_upper)
model.solver.add(xr_upper)
model.solver.add(xf_lower)
model.solver.add(xr_lower)
return model
[docs]def create_partial_variables(model, reaction_weights, epsilon): # for the driven-based imat implementation
for rid, weight in six.iteritems(reaction_weights):
if weight > 0: # the rh_rid variables represent the highly expressed reactions
if "rh_" + rid + "_pos" not in model.solver.variables:
reaction = model.reactions.get_by_id(rid)
y_pos = model.solver.interface.Variable("rh_%s_pos" % rid, type="binary")
y_neg = model.solver.interface.Variable("rh_%s_neg" % rid, type="binary")
pos_constraint = model.solver.interface.Constraint(
reaction.flux_expression + y_pos * (reaction.lower_bound - epsilon),
lb=reaction.lower_bound, name="rh_%s_lower" % rid)
neg_constraint = model.solver.interface.Constraint(
reaction.flux_expression + y_neg * (reaction.upper_bound + epsilon),
ub=reaction.upper_bound, name="rh_%s_upper" % rid)
model.solver.add(y_pos)
model.solver.add(y_neg)
model.solver.add(pos_constraint)
model.solver.add(neg_constraint)
elif weight < 0: # the rl_rid variables represent the lowly expressed reactions
if "rl_" + rid not in model.solver.variables:
reaction = model.reactions.get_by_id(rid)
x = model.solver.interface.Variable("rl_%s" % rid, type="binary")
pos_constraint = model.solver.interface.Constraint(
(1 - x) * reaction.upper_bound - reaction.flux_expression,
lb=0, name="rl_%s_upper" % rid)
neg_constraint = model.solver.interface.Constraint(
(1 - x) * reaction.lower_bound - reaction.flux_expression,
ub=0, name="rl_%s_lower" % rid)
model.solver.add(x)
model.solver.add(pos_constraint)
model.solver.add(neg_constraint)
return model
[docs]def create_new_partial_variables(model, reaction_weights, epsilon, threshold): # these variables allow for better performance in enumeration
for rid, weight in six.iteritems(reaction_weights):
if weight > 0:
if "rh_" + rid + "_pos" not in model.solver.variables:
rxn = model.reactions.get_by_id(rid)
xtot = model.solver.interface.Variable("x_%s" % rid, type="binary")
xf = model.solver.interface.Variable("rh_%s_pos" % rid, type="binary")
xr = model.solver.interface.Variable("rh_%s_neg" % rid, type="binary")
model.solver.add(xtot)
model.solver.add(xf)
model.solver.add(xr)
xtot_def = model.solver.interface.Constraint(xtot - xf - xr, lb=0., ub=0., name="x_%s_def" % rid)
xf_upper = model.solver.interface.Constraint(
rxn.forward_variable - rxn.upper_bound * xf, ub=0., name="xr_%s_upper" % rid)
xr_upper = model.solver.interface.Constraint(
rxn.reverse_variable + rxn.lower_bound * xr, ub=0., name="xf_%s_upper" % rid)
xf_lower = model.solver.interface.Constraint(
rxn.forward_variable - epsilon * xf, lb=0., name="xf_%s_lower" % rid)
xr_lower = model.solver.interface.Constraint(
rxn.reverse_variable - epsilon * xr, lb=0., name="xr_%s_lower" % rid)
model.solver.add(xtot_def)
model.solver.add(xf_upper)
model.solver.add(xr_upper)
model.solver.add(xf_lower)
model.solver.add(xr_lower)
elif weight < 0: # the rl_rid variables represent the lowly expressed reactions
if "rl_" + rid not in model.solver.variables:
rxn = model.reactions.get_by_id(rid)
xtot = model.solver.interface.Variable("rl_%s" % rid, type="binary")
xf = model.solver.interface.Variable("xf_%s" % rid, type="binary")
xr = model.solver.interface.Variable("xr_%s" % rid, type="binary")
model.solver.add(xtot)
model.solver.add(xf)
model.solver.add(xr)
xtot_def = model.solver.interface.Constraint(xtot - xf - xr, lb=0., ub=0., name="x_%s_def" % rid)
xf_upper = model.solver.interface.Constraint(
rxn.forward_variable - rxn.upper_bound * xf, ub=0., name="xr_%s_upper" % rid)
xr_upper = model.solver.interface.Constraint(
rxn.reverse_variable + rxn.lower_bound * xr, ub=0., name="xf_%s_upper" % rid)
xf_lower = model.solver.interface.Constraint(
rxn.forward_variable - threshold * xf, lb=0., name="xf_%s_lower" % rid)
xr_lower = model.solver.interface.Constraint(
rxn.reverse_variable - threshold * xr, lb=0., name="xr_%s_lower" % rid)
model.solver.add(xtot_def)
model.solver.add(xf_upper)
model.solver.add(xr_upper)
model.solver.add(xf_lower)
model.solver.add(xr_lower)
return model
[docs]def imat(model, reaction_weights={}, epsilon=1e-2, threshold=1e-5, full=False):
"""
Integrative Metabolic Analysis Tool
Parameters
----------
model: cobra.Model
A constraint-based model
reaction_weights: dict
keys are reaction ids, values are int weights
epsilon: float
activation threshold for highly expressed reactions
threshold: float
activation threshold for all reactions
timelimit: int
time limit (in seconds) for the model.optimize() call
feasibility: float
feasibility tolerance of the solver
mipgaptol: float
MIP Gap tolerance of the solver
full: bool
if True, apply constraints on all reactions. if False, only on reactions with non-zero weights
"""
y_variables = list()
x_variables = list()
y_weights = list()
x_weights = list()
t0 = time.perf_counter()
try:
if full: # for the full_imat implementation
model = create_full_variables(model, reaction_weights, epsilon, threshold)
for rid, weight in six.iteritems(reaction_weights):
if weight > 0:
y_pos = model.solver.variables["xf_" + rid]
y_neg = model.solver.variables["xr_" + rid]
y_variables.append([y_neg, y_pos])
y_weights.append(weight)
elif weight < 0:
x = sympify("1") - model.solver.variables["x_" + rid]
x_variables.append(x)
x_weights.append(abs(weight))
else:
model = create_new_partial_variables(model, reaction_weights, epsilon, threshold) # uses new variable implementation
for rid, weight in six.iteritems(reaction_weights):
if weight > 0:
y_neg = model.solver.variables["rh_" + rid + "_neg"]
y_pos = model.solver.variables["rh_" + rid + "_pos"]
y_variables.append([y_neg, y_pos])
y_weights.append(weight)
elif weight < 0:
x = sympify("1") - model.solver.variables["rl_" + rid] # uses new variable implementation
x_variables.append(x)
x_weights.append(abs(weight))
rh_objective = [(y[0] + y[1]) * y_weights[idx] for idx, y in enumerate(y_variables)]
rl_objective = [x * x_weights[idx] for idx, x in enumerate(x_variables)]
objective = model.solver.interface.Objective(Add(*rh_objective) + Add(*rl_objective), direction="max")
model.objective = objective
t1 = time.perf_counter()
with model:
solution = model.optimize()
t2 = time.perf_counter()
print("%.2fs before optimize call" % (t1-t0))
print("%.2fs during optimize call" % (t2-t1))
return solution
finally:
pass
if __name__ == "__main__":
description = "Performs the iMAT algorithm"
parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("-m", "--model", help="Metabolic model in sbml, json, or matlab format")
parser.add_argument("-r", "--reaction_weights", default={},
help="Reaction weights in csv format with column names: (reactions, weights)")
parser.add_argument("-e", "--epsilon", type=float, default=1e-4,
help="Activation threshold for highly expressed reactions")
parser.add_argument("--threshold", type=float, default=1e-8, help="Activation threshold for all reactions")
parser.add_argument("-t", "--timelimit", type=int, default=None, help="Solver time limit")
parser.add_argument("--tol", type=float, default=1e-8, help="Solver feasibility tolerance")
parser.add_argument("--mipgap", type=float, default=1e-6, help="Solver MIP gap tolerance")
parser.add_argument("-o", "--output", default="imat_solution", help="Path of the output file, without format")
args = parser.parse_args()
model = read_model(args.model)
check_model_options(model, timelimit=args.timelimit, feasibility=args.tol, mipgaptol=args.mipgap)
reaction_weights = {}
if args.reaction_weights:
reaction_weights = load_reaction_weights(args.reaction_weights)
solution = imat(model, reaction_weights, epsilon=args.epsilon, threshold=args.threshold)
write_solution(model, solution, args.threshold, args.output+".csv")