Source code for dexom_python.model_functions

import optlang
import pandas as pd
import numpy as np
import cobra
from pathlib import Path
from cobra.io import load_json_model, read_sbml_model, load_matlab_model
from cobra.flux_analysis import find_blocked_reactions
from warnings import warn
from cobra.exceptions import SolverNotFound, OptimizationError
from dexom_python.default_parameter_values import DEFAULT_VALUES


[docs]def read_model(modelfile, solver='cplex'): """ Read a model from a file. Supported formats are SBML (.xml, .sbml), JSON (.json), and Matlab (.mat). Must use a cobrapy compatible solver. Parameters ---------- modelfile: str solver: str Returns ------- model: cobra.Model """ config = cobra.Configuration() try: config.solver=solver except SolverNotFound: warn(f'The solver: {solver} is not available or not properly installed, the model will be read with the default solver {config.solver.__name__}') fileformat = Path(modelfile).suffix model = None if fileformat == '.sbml' or fileformat == '.xml': model = read_sbml_model(modelfile) elif fileformat == '.json': model = load_json_model(modelfile) elif fileformat == '.mat': model = load_matlab_model(modelfile) elif fileformat == '': warn('Wrong model path') else: raise TypeError('Only SBML, JSON, and Matlab formats are supported for the models') return model
[docs]def check_model_options(model, timelimit=DEFAULT_VALUES['timelimit'], tolerance=DEFAULT_VALUES['tolerance'], mipgaptol=DEFAULT_VALUES['mipgap'], verbosity=DEFAULT_VALUES['verbosity']): """ Sets model options such as timelimit, tolerance, mipgapt tolerance, and verbosity Parameters ---------- model: cobra.Model timelimit: int tolerance: float mipgaptol: float verbosity: int Returns ------- model: cobra.Model """ model.solver.configuration.timeout = timelimit model.tolerance = tolerance model.solver.configuration.verbosity = verbosity model.solver.configuration.presolve = True if hasattr(optlang, 'cplex_interface'): if isinstance(model.solver, optlang.cplex_interface.Model): model.solver.problem.parameters.mip.tolerances.mipgap.set(mipgaptol) else: warn('setting the MIP gap tolerance is only available with the cplex solver') return model
[docs]def check_threshold_tolerance(model, epsilon, threshold): """ Checks if model.tolerance, epsilon and threshold parameters are compatible. If not, new epsilon and threshold values are returned. Parameters ---------- model: cobra.Model epsilon: float threshold: float Returns ------- epsilon: float threshold: float """ cobra_config = cobra.Configuration() # limit = model.tolerance * max(abs(cobra_config.upper_bound), abs(cobra_config.lower_bound)) limit = model.solver.configuration.tolerances.integrality * np.max([r.bounds for r in model.reactions]) + model.solver.configuration.tolerances.feasibility if threshold < limit: UserWarning(f'The threshold parameter is below the detection limit for RL reactions. RL reactions can only have guaranteed flux < {limit}') threshold = limit elif threshold > limit: print(f'The threshold parameter is above the detection limit for RL reactions. It can be set up to {limit}') # threshold = limit limit = (limit + 1e-6) / (1 - limit - model.solver.configuration.tolerances.feasibility) if epsilon < limit: UserWarning(f'The epsilon parameter value is too low compared the detection limit for RH reactions. RH reactions can only have guaranteed flux > {limit}') epsilon = limit # if threshold < 2*limit: # raise ValueError('The threshold parameter value is too low compared to the current model tolerance. ' # 'Current threshold value: %s. Current tolerance value:%s. Minimum threshold value: %s' # % (str(threshold), str(model.tolerance), str(2*limit))) # if epsilon < np.around(threshold + limit, 10): # raise ValueError('The epsilon parameter value is too low compared to the current threshold and model tolerance.' # ' Current epsilon value: %s. Current threshold value: %s. Current tolerance value:%s. ' # 'Minimum epsilon value: %s' # % (str(epsilon), str(threshold), str(model.tolerance), str(threshold + limit))) return epsilon, threshold
[docs]def check_constraint_values(model): for c in model.constraints: error = False if c.ub is None: if c.primal < c.lb - model.tolerance: error = True elif c.lb is None: if c.primal > c.ub + model.tolerance: error = True elif c.primal > c.ub + model.tolerance or c.primal < c.lb - model.tolerance: error = True if error: print('Invalid constraint value for %s: (lb, primal, ub) = (%s, %s, %s)' % (c.name, str(c.lb), str(c.primal), str(c.ub))) raise OptimizationError('Invalid constraint value for %s ' % c.name) return True
[docs]def get_all_reactions_from_model(model, save=True, shuffle=True, out_path=''): """ Outputs a list of all reactions in the model. If possible, all blocked reactions are removed. Optionally, the reaction-list can be shuffled. Parameters ---------- model: cobra.Model save: bool by default, exports the reactions in a csv format shuffle: bool set to True to shuffle the order of the reactions out_path: str output path Returns ------- rxn_list: A list of all reactions in the model """ rxn_list = [r.id for r in model.reactions] try: if hasattr(model, "_sbml"): model._sbml['created'] = None # In level 3 SMBL models, the model._sbml['created'] attribute is a SwigPyObject # which causes an exception in cobra.flux_analysis.find_blocked_reactions blocked = find_blocked_reactions(model) rxn_list = list(set(rxn_list) - set(blocked)) except: warn("Could not find blocked reactions. Output list contains all reactions of the model.") if save: pd.Series(rxn_list).to_csv(out_path + model.id + '_reactions.csv', header=False, index=False) if shuffle: np.random.shuffle(rxn_list) if save: pd.Series(rxn_list).to_csv(out_path + model.id + '_reactions_shuffled.csv', header=False, index=False) return rxn_list
[docs]def get_subsystems_from_model(model, save=True, out_path=''): """ Creates a list of all subsystems of a model and their associated reactions Parameters ---------- model: cobra.Model save: bool out_path: str Returns ------- rxn_sub: pandas.DataFrame a DataFrame with reaction names as index and subsystem name as column sub_list: list a list of subsystems """ rxn_sub = {} sub_list = [] i = 0 for rxn in model.reactions: rxn_sub[i] = (rxn.id, rxn.subsystem) i += 1 if rxn.subsystem not in sub_list: sub_list.append(rxn.subsystem) if sub_list[-1] == '': sub_list.pop() sub_list.sort() rxn_sub = pd.DataFrame.from_dict(rxn_sub, orient='index', columns=['ID', 'subsystem']) if save: rxn_sub.to_csv(out_path+model.id+'_reactions_subsystems.csv') with open(out_path+model.id+'_subsystems_list.txt', 'w+') as file: file.write(';'.join(sub_list)) return rxn_sub, sub_list
[docs]def save_reaction_weights(reaction_weights, filename='reaction_weights.csv'): """ Parameters ---------- reaction_weights: dict a dictionary where keys = reaction IDs and values = weights filename: str Returns ------- reaction_weights: pandas.DataFrame """ df = pd.DataFrame(reaction_weights.items(), columns=['reactions', 'weights']) df.to_csv(filename) df.index = df['reactions'] return df['weights']
[docs]def load_reaction_weights(filename, rxn_names='reactions', weight_names='weights'): """ loads reaction weights from a .csv file Parameters ---------- filename: str the path + name of a .csv file containing reaction weights rxn_names: str the name of the column containing the reaction names weight_names: str the name of the column containing the weights Returns ------- reaction_weights: dict """ df = pd.read_csv(filename, sep=';|,|\t', engine='python') df.index = df[rxn_names] reaction_weights = df[weight_names].to_dict() return {str(k): float(v) for k, v in reaction_weights.items() if float(v) == float(v)}
[docs]def check_model_primals(model, rw, eps=DEFAULT_VALUES['epsilon'], thr=DEFAULT_VALUES['threshold'], savename = 'primal_check.csv'): integ = model.solver.configuration.tolerances.integrality feasib = model.solver.configuration.tolerances.feasibility rllimit = integ * np.max([r.bounds for r in model.reactions]) + feasib *2 rhlimit = (rllimit + 1e-6) / (1 - rllimit - feasib) if eps<rhlimit: print('epsilon is below rhlimit, there will probably be errors') var_primals = model.solver.primal_values # forward = np.empty(len(model.reactions)) # reverse = np.empty(len(model.reactions)) flux = np.empty(len(model.reactions)) x = np.empty(len(model.reactions)) xfor = np.empty(len(model.reactions)) xrev = np.empty(len(model.reactions)) status = {} for i, r in enumerate(model.reactions): rid = r.id flux[i] = var_primals[r.id] - var_primals[r.reverse_id] if rid not in rw: continue # forward[i] = var_primals[r.id] # reverse[i] = var_primals[r.reverse_id] if rw[r.id] > 0: try: x[i] = var_primals['x_' + r.id] xfor[i] = var_primals['xf_' + r.id] xrev[i] = var_primals['xr_' + r.id] # print(r.id + ' ok') if flux[i] >= eps - rllimit: # if reverse[i] > feasib: # status[rid] = 'wrong flux: both directions' if xfor[i] > integ > xrev[i]: status[rid] = 'correct flux forward' elif xfor[i] > integ and xrev[i] > 1: status[rid] = 'incorrect flux forward: both binaries' elif xrev[i] > integ > xfor[i]: status[rid] = 'incorrect flux forward: binary reversed' else: status[rid] = 'incorrect flux forward: no binary' elif flux[i] <= -eps + rllimit: # if forward[i] > feasib: # status[rid] = 'wrong flux: both directions' if xfor[i] > integ > xrev[i]: status[rid] = 'incorrect flux reverse: binary reversed' elif xfor[i] > integ and xrev[i] > 1: status[rid] = 'incorrect flux reverse: both binaries' elif xrev[i] > integ > xfor[i]: status[rid] = 'correct flux reverse' else: status[rid] = 'incorrect flux reverse: no binary' else: if xfor[i] > integ > xrev[i]: status[rid] = 'incorrect flux below epsilon: binary forward' elif xfor[i] > integ and xrev[i] > 1: status[rid] = 'incorrect flux below epsilon: both binaries' elif xrev[i] > integ > xfor[i]: status[rid] = 'incorrect flux below epsilon: binary reverse' else: status[rid] = 'correct flux below epsilon' except: x[i] = None xfor[i] = None xrev[i] = None print(r.id + ' RH binary not present?') status[rid] = 'binary not present' elif rw[r.id] < 0: try: x[i] = var_primals['x_' + r.id] # xfor[i] = None # xrev[i] = None xfor[i] = var_primals['xf_' + r.id] xrev[i] = var_primals['xr_' + r.id] # print(r.id + ' ok') # if forward[i] > 0 and reverse[i] > 0: # status[rid] = 'wrong noflux: both directions' if np.abs(flux[i]) <= thr+feasib: if x[i] < integ: status[rid] = 'correct noflux' else: status[rid] = 'incorrect noflux: binary active' else: if x[i] < integ: status[rid] = 'incorrect noflux above tolerance' else: status[rid] = 'correct noflux above tolerance' except: x[i] = None xfor[i] = None xrev[i] = None print(r.id + ' RL binary not present?') status[rid] = 'binary not present' else: status[rid] = '' x[i] = None xfor[i] = None xrev[i] = None if 'incorrect' in status[rid] or 'wrong' in status[rid]: print(rid, status[rid]) df = pd.DataFrame([flux, x, xfor, xrev], columns=[r.id for r in model.reactions], index=['flux', 'x', 'x_forward', 'x_reverse']).T rw = pd.Series(rw) df['rw'] = rw status = pd.Series(status) df['status'] = status if savename is not None: df.to_csv(savename, sep=';') return df