Source code for dexom_python.enum_functions.rxn_enum


import argparse
import pandas as pd
import numpy as np
from dexom_python.imat import imat, create_new_partial_variables, create_full_variables
from dexom_python.model_functions import load_reaction_weights, read_model, check_model_options
from dexom_python.result_functions import read_solution, write_solution


[docs]class RxnEnumSolution(object): def __init__(self, all_solutions, unique_solutions, all_binary, unique_binary, all_reactions=None, unique_reactions=None): self.all_solutions = all_solutions self.unique_solutions = unique_solutions self.all_binary = all_binary self.unique_binary = unique_binary self.all_reactions = all_reactions self.unique_reactions = unique_reactions
[docs]def rxn_enum(model, reaction_weights, rxn_list, prev_sol, eps=1., thr=1e-1, obj_tol=1e-2): """ Reaction enumeration method Parameters ---------- model: cobrapy Model reaction_weights: dict keys = reactions and values = weights eps: float activation threshold in imat thr: float detection threshold of activated reactions tlim: int time limit for imat tol: float tolerance for imat obj_tol: float variance allowed in the objective_values of the solutions out_name: str name of output files without format Returns ------- solution: RxnEnumSolution object """ tol = model.solver.configuration.tolerances.feasibility prev_sol_bin = (np.abs(prev_sol.fluxes) >= thr-tol).values.astype(int) optimal_objective_value = prev_sol.objective_value - prev_sol.objective_value * obj_tol all_solutions = [prev_sol] all_solutions_binary = [prev_sol_bin] unique_solutions = [prev_sol] unique_solutions_binary = [prev_sol_bin] all_reactions = [] # for each solution, save which reaction was activated/inactived by the algorithm unique_reactions = [] if not rxn_list: rxns = list(model.reactions) rxn_list = [r.id for r in rxns] for idx, rid in enumerate(rxn_list): with model as model_temp: if rid in model.reactions: rxn = model_temp.reactions.get_by_id(rid) # for active fluxes, check inactivation if prev_sol_bin[idx] == 1: rxn.bounds = (0., 0.) # for inactive fluxes, check activation else: upper_bound_temp = rxn.upper_bound # for inactive reversible fluxes, check activation in backwards direction if rxn.lower_bound < 0.: try: rxn.upper_bound = -thr temp_sol = imat(model_temp, reaction_weights, epsilon=eps, threshold=thr) temp_sol_bin = (np.abs(temp_sol.fluxes) >= thr-tol).values.astype(int) if temp_sol.objective_value >= optimal_objective_value: all_solutions.append(temp_sol) all_solutions_binary.append(temp_sol_bin) if temp_sol_bin not in unique_solutions_binary: unique_solutions.append(temp_sol) unique_solutions_binary.append(temp_sol_bin) unique_reactions.append(rid+"_backwards") except: print("An error occurred with reaction %s_reverse. " "Check feasibility of the model when this reaction is irreversible." % rid) finally: rxn.upper_bound = upper_bound_temp # for all inactive fluxes, check activation in forwards direction if rxn.upper_bound >= thr: rxn.lower_bound = thr else: rxn.lower_bound = rxn.upper_bound # for all fluxes: compute solution with new bounds try: temp_sol = imat(model_temp, reaction_weights, epsilon=eps, threshold=thr) temp_sol_bin = (np.abs(temp_sol.fluxes) >= thr-tol).values.astype(int) if temp_sol.objective_value >= optimal_objective_value: all_solutions.append(temp_sol) all_solutions_binary.append(temp_sol_bin) all_reactions.append(rid) if temp_sol_bin not in unique_solutions_binary: unique_solutions.append(temp_sol) unique_solutions_binary.append(temp_sol_bin) unique_reactions.append(rid) except: if prev_sol_bin[idx] == 1: print("An error occurred with reaction %s. " "Check feasibility of the model when this reaction is blocked" % rid) else: print("An error occurred with reaction %s. " "Check feasibility of the model when this reaction is irreversible" % rid) solution = RxnEnumSolution(all_solutions, unique_solutions, all_solutions_binary, unique_solutions_binary, all_reactions, unique_reactions) return solution
[docs]def rxn_enum_single_loop(model, reaction_weights, rec_id, new_rec_state, out_name, eps=1e-2, thr=1e-5): with model as model_temp: if rec_id not in model.reactions: print("reaction not found in model") return 0 rxn = model_temp.reactions.get_by_id(rec_id) if int(new_rec_state) == 0: rxn.bounds = (0., 0.) elif int(new_rec_state) == 1: rxn.lower_bound = thr elif int(new_rec_state) == 2: rxn.upper_bound = -thr else: print("new_rec_state has an incorrect value: %s" % str(new_rec_state)) return 0 try: sol = imat(model_temp, reaction_weights, epsilon=eps, threshold=thr) except: print("This constraint renders the problem unfeasible") return 0 write_solution(model, sol, thr, out_name) return 1
if __name__ == "__main__": description = "Performs the reaction enumeration algorithm on a specified list of reactions" parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter) parser.add_argument("-m", "--model", help="Metabolic model in sbml, matlab, or json format") parser.add_argument("-l", "--reaction_list", default=None, help="csv list of reactions to enumerate - if empty, " "will use all reactions in the model") parser.add_argument("--range", default="_", help="range of reactions to use from the list, in the format 'int_int'") parser.add_argument("-r", "--reaction_weights", default=None, help="Reaction weights in csv format (first row: reaction names, second row: weights)") parser.add_argument("-p", "--prev_sol", default=None, help="initial imat solution in .txt format") parser.add_argument("-e", "--epsilon", type=float, default=1e-2, help="Activation threshold for highly expressed reactions") parser.add_argument("--threshold", type=float, default=1e-5, help="Activation threshold for all reactions") parser.add_argument("-t", "--timelimit", type=int, default=None, help="Solver time limit") parser.add_argument("--tol", type=float, default=1e-6, help="Solver feasibility tolerance") parser.add_argument("--mipgap", type=float, default=1e-3, help="Solver MIP gap tolerance") parser.add_argument("--obj_tol", type=float, default=1e-3, help="objective value tolerance, as a fraction of the original value") parser.add_argument("-o", "--output", default="rxn_enum", help="Path of output files, without format") parser.add_argument("--save", action="store_true", help="Use this flag to save each solution individually") args = parser.parse_args() model = read_model(args.model) check_model_options(model, timelimit=args.timelimit, feasibility=args.tol, mipgaptol=args.mipgap) reaction_weights = {} if args.reaction_weights: reaction_weights = load_reaction_weights(args.reaction_weights) rxn_list = [] if args.reaction_list: df = pd.read_csv(args.reaction_list, header=None) reactions = [x for x in df.unstack().values] rxn_range = args.range.split("_") if rxn_range[0] == '': start = 0 else: start = int(rxn_range[0]) if rxn_range[1] == '': rxn_list = reactions[start:] elif int(rxn_range[1]) > len(reactions): rxn_list = reactions[start:] else: rxn_list = reactions[start:int(rxn_range[1])] if args.prev_sol: initial_solution, initial_binary = read_solution(args.prev_sol, model, reaction_weights) model = create_new_partial_variables(model, reaction_weights, args.epsilon, args.threshold) else: initial_solution = imat(model, reaction_weights, epsilon=args.epsilon, threshold=args.threshold) solution = rxn_enum(model=model, rxn_list=rxn_list, prev_sol=initial_solution, reaction_weights=reaction_weights, eps=args.epsilon, thr=args.threshold, obj_tol=args.obj_tol) uniques = pd.DataFrame(solution.unique_binary) uniques.to_csv(args.output+"_solutions.csv") if args.save: for i in range(len(solution.unique_solutions)): write_solution(model, solution.unique_solutions[i], args.threshold, args.output+"_solution_"+str(i)+".csv")