From 3cb87afe01cfa5234d8f8cfe64bf4fa125510f87 Mon Sep 17 00:00:00 2001 From: Peter Schubert <Peter.Schubert@hhu.de> Date: Mon, 19 Dec 2022 17:07:29 +0100 Subject: [PATCH] Initial Commit with RBA -> Excel --- rbaxdf/model/rba_density.py | 48 +++++++ rbaxdf/model/rba_enzyme.py | 70 +++++++++ rbaxdf/model/rba_macromolecules.py | 97 +++++++++++++ rbaxdf/model/rba_metabolism.py | 121 ++++++++++++++++ rbaxdf/model/rba_model.py | 106 ++++++++++++++ rbaxdf/model/rba_parameters.py | 132 +++++++++++++++++ rbaxdf/model/rba_process.py | 218 +++++++++++++++++++++++++++++ rbaxdf/model/rba_target_group.py | 61 ++++++++ rbaxdf/model/rba_target_value.py | 25 ++++ rbaxdf/utils/et_utils.py | 33 +++++ 10 files changed, 911 insertions(+) create mode 100644 rbaxdf/model/rba_density.py create mode 100644 rbaxdf/model/rba_enzyme.py create mode 100644 rbaxdf/model/rba_macromolecules.py create mode 100644 rbaxdf/model/rba_metabolism.py create mode 100644 rbaxdf/model/rba_model.py create mode 100644 rbaxdf/model/rba_parameters.py create mode 100644 rbaxdf/model/rba_process.py create mode 100644 rbaxdf/model/rba_target_group.py create mode 100644 rbaxdf/model/rba_target_value.py create mode 100644 rbaxdf/utils/et_utils.py diff --git a/rbaxdf/model/rba_density.py b/rbaxdf/model/rba_density.py new file mode 100644 index 0000000..41bf12f --- /dev/null +++ b/rbaxdf/model/rba_density.py @@ -0,0 +1,48 @@ +"""Implementation of RbaDensity class. 
# --- rbaxdf/model/rba_density.py ---

class RbaDensity:
    """Target density of one compartment, read from ``density.xml``.

    Attributes:
        id: compartment id (``compartment`` attribute of ``targetDensity``).
        target_value: object built by ``RbaTargetValue`` from the XML
            element; ``None`` until assigned.
    """

    # Column layout of the exported table. Passing it to the DataFrame
    # constructor keeps the export working when there are no records.
    _COLUMNS = ['compartment', 'targetValue']

    def __init__(self, cid):
        self.id = cid
        self.target_value = None

    @staticmethod
    def get_xml_items(model_dir):
        """Read all target densities from ``<model_dir>/density.xml``.

        Args:
            model_dir: directory holding the RBA model XML files.

        Returns:
            dict mapping compartment id to ``RbaDensity``. Empty when the
            file does not exist or holds no ``listOfTargetDensities``.

        Raises:
            AssertionError: if the root element is not ``RBADensity``.
        """
        file_name = os.path.join(model_dir, 'density.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return {}

        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag == 'RBADensity', f'unexpected root tag {root.tag}'

        data = {}
        target_densities = root.find('listOfTargetDensities')
        if target_densities is None:
            # find() returns None for a missing child; nothing to read.
            return data
        for target_density in target_densities.findall('targetDensity'):
            cid = target_density.attrib['compartment']
            rba_density = RbaDensity(cid)
            rba_density.target_value = RbaTargetValue(target_density)
            data[cid] = rba_density
        return data

    @staticmethod
    def get_df_items(items):
        """Convert a dict of ``RbaDensity`` into a table indexed by compartment.

        Args:
            items: dict as returned by :meth:`get_xml_items` (may be empty).

        Returns:
            pandas DataFrame with index ``compartment`` and column
            ``targetValue``.
        """
        records = [item.to_dict() for item in items.values()]
        df = pd.DataFrame(records, columns=RbaDensity._COLUMNS)
        df.set_index('compartment', inplace=True)
        return df

    def to_dict(self):
        """Return the record for one table row (compartment and target string)."""
        return {'compartment': self.id,
                'targetValue': self.target_value.get_str()}
# --- rbaxdf/model/rba_enzyme.py ---

class RbaEnzyme:
    """One enzyme entry of ``enzymes.xml``.

    Attributes:
        id: enzyme id.
        reaction: id of the catalysed reaction.
        forward_eff / backward_eff: ids of the efficiency parameters.
        zero_cost: True if the XML attribute ``zeroCost`` equals 'true'.
        mach_reactants / mach_products: dicts species id -> stoichiometry of
            the machinery composition.
    """

    # Column layout of the exported table (also used when there are no rows).
    _COLUMNS = ['enzyme', 'reaction', 'forwardEfficiency', 'backwardEfficiency',
                'zeroCost', 'machineryReactants', 'machineryProducts']

    def __init__(self, eid):
        self.id = eid
        self.reaction = ''
        self.forward_eff = ''
        self.backward_eff = ''
        self.zero_cost = False
        self.mach_reactants = {}
        self.mach_products = {}

    @staticmethod
    def get_xml_items(model_dir):
        """Read all enzymes from ``<model_dir>/enzymes.xml``.

        Returns:
            dict mapping enzyme id to ``RbaEnzyme``; empty when the file does
            not exist or holds no ``listOfEnzymes``.

        Raises:
            AssertionError: if the root element is not ``RBAEnzymes``.
            KeyError: if an enzyme lacks ``id``, ``reaction`` or one of the
                efficiency attributes.
        """
        file_name = os.path.join(model_dir, 'enzymes.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return {}

        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag == 'RBAEnzymes', f'unexpected root tag {root.tag}'

        data = {}
        enzymes = root.find('listOfEnzymes')
        if enzymes is None:
            return data
        for enzyme in enzymes.findall('enzyme'):
            eid = enzyme.attrib['id']
            rba_enzyme = RbaEnzyme(eid)
            rba_enzyme.reaction = enzyme.attrib['reaction']
            rba_enzyme.forward_eff = enzyme.attrib['forward_efficiency']
            rba_enzyme.backward_eff = enzyme.attrib['backward_efficiency']
            rba_enzyme.zero_cost = enzyme.attrib.get('zeroCost', 'false').lower() == 'true'

            machinery_composition = enzyme.find('machineryComposition')
            if machinery_composition is not None:
                rba_enzyme.mach_reactants = get_species_refs(machinery_composition.find('listOfReactants'))
                rba_enzyme.mach_products = get_species_refs(machinery_composition.find('listOfProducts'))
            data[eid] = rba_enzyme
        return data

    @staticmethod
    def get_df_items(items):
        """Convert a dict of ``RbaEnzyme`` (may be empty) into a table indexed by enzyme."""
        records = [item.to_dict() for item in items.values()]
        df = pd.DataFrame(records, columns=RbaEnzyme._COLUMNS)
        df.set_index('enzyme', inplace=True)
        return df

    @staticmethod
    def _srefs_str(srefs):
        """Format a dict species -> stoichiometry as 'species=.., stoic=..; ...'."""
        return '; '.join(f'species={species}, stoic={stoic}' for species, stoic in srefs.items())

    def to_dict(self):
        """Return the record for one table row."""
        return {'enzyme': self.id, 'reaction': self.reaction,
                'forwardEfficiency': self.forward_eff, 'backwardEfficiency': self.backward_eff,
                'zeroCost': self.zero_cost,
                'machineryReactants': self._srefs_str(self.mach_reactants),
                'machineryProducts': self._srefs_str(self.mach_products)}


# --- rbaxdf/model/rba_macromolecules.py ---

class RbaMacromolecules:
    """Components and macromolecules of one type ('dna', 'rnas' or 'proteins').

    The type doubles as the base name of the XML file that is read.
    """

    def __init__(self, mm_type):
        self.type = mm_type
        self.components = {}
        self.macromolecules = {}

    def get_xml_items(self, model_dir):
        """Load components and macromolecules from ``<model_dir>/<type>.xml``.

        Fills ``self.components`` and ``self.macromolecules``. Returns an
        empty dict when the file does not exist, ``None`` otherwise.

        Raises:
            AssertionError: if the root tag is not one of RBADna,
                RBAProteins, RBARnas.
        """
        file_name = os.path.join(model_dir, self.type + '.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return {}

        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag in {'RBADna', 'RBAProteins', 'RBARnas'}, f'unexpected root tag {root.tag}'

        self.components = RbaComponent.get_xml_items(root)
        self.macromolecules = RbaMacromolecule.get_xml_items(root)

    def get_df_items(self):
        """Return one table: component attributes on top, macromolecules below.

        Rows 'name', 'type', 'weight' describe the components (one column per
        component id); every further row is a macromolecule with its
        compartment and component stoichiometries. Works for empty data too.
        """
        comp_records = [item.to_dict() for item in self.components.values()]
        df_comp = pd.DataFrame(comp_records, columns=['component', 'name', 'type', 'weight'])
        df_comp.set_index('component', inplace=True)

        mm_records = [item.to_dict() for item in self.macromolecules.values()]
        # Macromolecule columns depend on the data, so fixed columns are only
        # supplied when there are no records at all.
        df_mm = pd.DataFrame(mm_records) if mm_records else pd.DataFrame(columns=['id', 'compartment'])
        df_mm.set_index('id', inplace=True)

        cols = ['compartment'] + df_comp.index.to_list()
        df = pd.concat((df_comp.T, df_mm)).reindex(columns=cols)
        df.index.name = self.type
        return df


class RbaComponent:
    """Building block of a macromolecule (e.g. one amino acid or nucleotide)."""

    def __init__(self, cid):
        self.id = cid
        self.name = ''
        self.type = ''
        self.weight = np.nan

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfComponents`` below *root*.

        Returns:
            dict component id -> ``RbaComponent``; empty if the list is
            missing. A missing ``weight`` attribute yields NaN, the same
            default ``__init__`` uses.
        """
        data = {}
        components = root.find('listOfComponents')
        if components is None:
            return data
        for component in components.findall('component'):
            cid = component.attrib['id']
            rba_component = RbaComponent(cid)
            rba_component.name = component.get('name')
            rba_component.type = component.get('type')
            rba_component.weight = float(component.get('weight', 'nan'))
            data[cid] = rba_component
        return data

    def to_dict(self):
        """Return the record for one component."""
        return {'component': self.id, 'name': self.name,
                'type': self.type, 'weight': self.weight}


class RbaMacromolecule:
    """A macromolecule with its compartment and component composition."""

    def __init__(self, mmid):
        self.id = mmid
        self.compartment = ''
        self.composition = {}

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfMacromolecules`` below *root*.

        Returns:
            dict macromolecule id -> ``RbaMacromolecule``; empty if the list
            is missing. A macromolecule without ``composition`` element gets
            an empty composition.
        """
        data = {}
        macromolecules = root.find('listOfMacromolecules')
        if macromolecules is None:
            return data
        for macromolecule in macromolecules.findall('macromolecule'):
            mmid = macromolecule.attrib['id']
            rba_macromolecule = RbaMacromolecule(mmid)
            rba_macromolecule.compartment = macromolecule.attrib['compartment']
            composition = macromolecule.find('composition')
            if composition is not None:
                for component_ref in composition.findall('componentReference'):
                    component = component_ref.attrib['component']
                    rba_macromolecule.composition[component] = float(component_ref.attrib['stoichiometry'])
            data[mmid] = rba_macromolecule
        return data

    def to_dict(self):
        """Return id, compartment and one key per component with its stoichiometry."""
        mm_dict = {'id': self.id, 'compartment': self.compartment}
        mm_dict |= self.composition
        return mm_dict
# --- rbaxdf/model/rba_metabolism.py ---

class RbaMetabolism:
    """Compartments, species and reactions read from ``metabolism.xml``."""

    # m_type -> (attribute holding the items, table columns, index column).
    # An index column of None keeps the default row index, named 'index'.
    _TABLES = {
        'compartments': ('compartments', ['compartment'], None),
        'species': ('species', ['species', 'boundaryCondition'], 'species'),
        'reactions': ('reactions', ['reaction', 'reversible', 'reactants', 'products'], 'reaction'),
    }

    def __init__(self):
        self.compartments = {}
        self.species = {}
        self.reactions = {}

    def get_xml_items(self, model_dir):
        """Load ``<model_dir>/metabolism.xml`` into the three item dicts.

        Prints a message and leaves the object unchanged when the file does
        not exist.

        Raises:
            AssertionError: if the root element is not ``RBAMetabolism``.
        """
        file_name = os.path.join(model_dir, 'metabolism.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return

        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag == 'RBAMetabolism', f'unexpected root tag {root.tag}'

        self.compartments = RbaCompartment.get_xml_items(root)
        self.species = RbaSpecies.get_xml_items(root)
        self.reactions = RbaReaction.get_xml_items(root)

    def get_df_items(self, m_type):
        """Return the table for 'compartments', 'species' or 'reactions'.

        Returns:
            pandas DataFrame (possibly without rows), or ``None`` after
            printing a message when *m_type* is unknown.
        """
        if m_type not in self._TABLES:
            print(f'wrong metabolism type: {m_type}')
            return None
        attr, columns, index_col = self._TABLES[m_type]
        records = [item.to_dict() for item in getattr(self, attr).values()]
        # Explicit columns keep set_index() working when there are no records.
        df = pd.DataFrame(records, columns=columns)
        if index_col is None:
            df.index.name = 'index'
        else:
            df.set_index(index_col, inplace=True)
        return df


class RbaCompartment:
    """A compartment, identified by its id only."""

    def __init__(self, cid):
        self.id = cid

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfCompartments`` below *root* (empty dict if missing)."""
        compartments = root.find('listOfCompartments')
        if compartments is None:
            return {}
        return {compartment.attrib['id']: RbaCompartment(compartment.attrib['id'])
                for compartment in compartments.findall('compartment')}

    def to_dict(self):
        """Return the record for one compartment."""
        return {'compartment': self.id}


class RbaSpecies:
    """A metabolic species with its boundary-condition flag."""

    def __init__(self, sid):
        self.id = sid
        self.boundary_condition = False

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfSpecies`` below *root* (empty dict if missing).

        A species without ``boundaryCondition`` attribute keeps the
        ``__init__`` default (False).
        """
        data = {}
        species = root.find('listOfSpecies')
        if species is None:
            return data
        for sp in species.findall('species'):
            sid = sp.attrib['id']
            rba_species = RbaSpecies(sid)
            if sp.attrib.get('boundaryCondition', 'false').lower() == 'true':
                rba_species.boundary_condition = True
            data[sid] = rba_species
        return data

    def to_dict(self):
        """Return the record for one species."""
        return {'species': self.id, 'boundaryCondition': self.boundary_condition}


class RbaReaction:
    """A metabolic reaction with reversibility flag, reactants and products."""

    def __init__(self, sid):
        self.id = sid
        self.reversible = True
        self.reactants = {}
        self.products = {}

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfReactions`` below *root* (empty dict if missing).

        A reaction without ``reversible`` attribute keeps the ``__init__``
        default (True).
        """
        data = {}
        reactions = root.find('listOfReactions')
        if reactions is None:
            return data
        for reaction in reactions.findall('reaction'):
            rid = reaction.attrib['id']
            rba_reaction = RbaReaction(rid)
            if reaction.attrib.get('reversible', 'true').lower() == 'false':
                rba_reaction.reversible = False
            rba_reaction.reactants = get_species_refs(reaction.find('listOfReactants'))
            rba_reaction.products = get_species_refs(reaction.find('listOfProducts'))
            data[rid] = rba_reaction
        return data

    @staticmethod
    def _srefs_str(srefs):
        """Format a dict species -> stoichiometry as 'species=.., stoic=..; ...'."""
        return '; '.join(f'species={species}, stoic={stoic}' for species, stoic in srefs.items())

    def to_dict(self):
        """Return the record for one reaction."""
        return {'reaction': self.id, 'reversible': self.reversible,
                'reactants': self._srefs_str(self.reactants),
                'products': self._srefs_str(self.products)}
# --- rbaxdf/model/rba_model.py ---

class RbaModel:
    """RBA model read from a directory of XML files, exportable to Excel."""

    def __init__(self, model_dir):
        """Initialize an empty RBA model bound to *model_dir*.

        Raises:
            FileNotFoundError: if *model_dir* does not exist.
        """
        if not os.path.exists(model_dir):
            print(f'{model_dir} not found!')
            raise FileNotFoundError(f'{model_dir} not found!')

        self.model_dir = model_dir
        self.is_model = False
        self.dna = RbaMacromolecules('dna')
        self.rnas = RbaMacromolecules('rnas')
        self.proteins = RbaMacromolecules('proteins')
        self.metabolism = RbaMetabolism()
        self.parameters = RbaParameters()
        self.processes = RbaProcesses()
        self.density = None
        self.targets = None
        self.enzymes = None

    def import_rba(self):
        """Read all model XML files from ``self.model_dir`` and mark the model as loaded."""
        self.parameters.get_xml_items(self.model_dir)
        self.dna.get_xml_items(self.model_dir)
        self.rnas.get_xml_items(self.model_dir)
        self.proteins.get_xml_items(self.model_dir)
        self.metabolism.get_xml_items(self.model_dir)
        self.processes.get_xml_items(self.model_dir)

        self.density = RbaDensity.get_xml_items(self.model_dir)
        self.targets = RbaTargetGroup.get_xml_items(self.model_dir)
        self.enzymes = RbaEnzyme.get_xml_items(self.model_dir)

        self.is_model = True

    def export_rba(self):
        """Not implemented yet."""
        pass

    def to_df(self):
        """Return a dict sheet name -> DataFrame; empty if no model was imported."""
        m_dict = {}
        if self.is_model is True:
            m_dict['rnas'] = self.rnas.get_df_items()
            m_dict['dna'] = self.dna.get_df_items()
            m_dict['proteins'] = self.proteins.get_df_items()

            m_dict['density'] = RbaDensity.get_df_items(self.density)
            m_dict['targets'] = RbaTargetGroup.get_df_items(self.targets)
            m_dict['enzymes'] = RbaEnzyme.get_df_items(self.enzymes)

            m_dict['compartments'] = self.metabolism.get_df_items('compartments')
            m_dict['species'] = self.metabolism.get_df_items('species')
            m_dict['reactions'] = self.metabolism.get_df_items('reactions')
            m_dict['functions'] = self.parameters.get_df_items('functions')
            m_dict['aggregates'] = self.parameters.get_df_items('aggregates')
            m_dict['processes'] = self.processes.get_df_items('processes')
            m_dict['processing_maps'] = self.processes.get_df_items('processingMaps')

        return m_dict

    @staticmethod
    def _first_reference(target_str):
        """Return the first parameter id of a 'key=id, key=id' target string.

        'lowerBound=a, upperBound=b' yields 'a'. Returns ``None`` when the
        value is not a string or holds no 'key=id' item.
        """
        if not isinstance(target_str, str):
            return None
        _key, sep, ref = target_str.split(',')[0].partition('=')
        ref = ref.strip()
        return ref if sep and ref else None

    def _value_infos(self, target_strings):
        """Return one info string (or None) per target string, in order."""
        infos = []
        for target_str in target_strings:
            ref = self._first_reference(target_str)
            infos.append(self.parameters.get_value_info(ref) if ref is not None else None)
        return infos

    def _add_value_infos(self, m_dict):
        """Add human-readable parameter info columns to the exported tables.

        Columns are assigned as whole lists (by position) rather than cell
        by cell with ``.at``: the 'targets' table has a non-unique index, so
        label-based assignment would overwrite every row of a target group.
        """
        enzymes = m_dict['enzymes']
        enzymes['fwd_eff_info'] = [self.parameters.get_value_info(ref)
                                   for ref in enzymes['forwardEfficiency']]
        enzymes['bwd_eff_info'] = [self.parameters.get_value_info(ref)
                                   for ref in enzymes['backwardEfficiency']]

        processes = m_dict['processes']
        processes['capacity_info'] = self._value_infos(processes['machineryCapacity'])
        for name in ('density', 'targets'):
            m_dict[name]['value_info'] = self._value_infos(m_dict[name]['targetValue'])

    def to_excel(self):
        """Write all model tables to ``<model_dir>/model.xlsx``, one sheet each.

        Does nothing but print a message when no model has been imported.
        """
        xlsx_name = os.path.join(self.model_dir, 'model') + '.xlsx'
        m_dict = self.to_df()
        if not m_dict:
            print(f'no model imported, nothing exported to {xlsx_name}')
            return

        # add target value information strings
        self._add_value_infos(m_dict)

        with pd.ExcelWriter(xlsx_name) as writer:
            for name, df in m_dict.items():
                # tables whose index is named 'index' only have a row counter
                keep_index = df.index.name != 'index'
                df.to_excel(writer, sheet_name=name, index=keep_index)
        print(f'model exported to {xlsx_name}')

    def from_df(self):
        """Not implemented yet."""
        pass

    def from_excel(self):
        """Not implemented yet."""
        pass
# --- rbaxdf/model/rba_parameters.py ---

class RbaParameters:
    """Functions and aggregates read from ``parameters.xml``."""

    # p_type -> (attribute holding the items, table columns, index column)
    _TABLES = {
        'functions': ('functions', ['function', 'type', 'variable', 'parameters'], 'function'),
        'aggregates': ('aggregates', ['aggregate', 'type', 'functions'], 'aggregate'),
    }

    def __init__(self):
        self.functions = {}
        self.aggregates = {}

    def get_xml_items(self, model_dir):
        """Load ``<model_dir>/parameters.xml`` into ``functions`` and ``aggregates``.

        Prints a message and leaves the object unchanged when the file does
        not exist.

        Raises:
            AssertionError: if the root element is not ``RBAParameters``.
        """
        file_name = os.path.join(model_dir, 'parameters.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return
        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag == 'RBAParameters', f'unexpected root tag {root.tag}'

        self.functions = RbaFunction.get_xml_items(root)
        self.aggregates = RbaAggregate.get_xml_items(root)

    def get_value_info(self, value):
        """Return the info string of the function or aggregate with id *value*.

        Functions take precedence over aggregates.

        Raises:
            KeyError: if *value* is neither a function nor an aggregate id.
        """
        for items in (self.functions, self.aggregates):
            if value in items:
                return items[value].value_info
        raise KeyError(f'{value} is neither a function nor an aggregate')

    def get_df_items(self, p_type):
        """Return the table for 'functions' or 'aggregates'.

        Returns:
            pandas DataFrame (possibly without rows), or ``None`` after
            printing a message when *p_type* is unknown.
        """
        if p_type not in self._TABLES:
            print(f'wrong parameter type: {p_type}')
            return None
        attr, columns, index_col = self._TABLES[p_type]
        records = [item.to_dict() for item in getattr(self, attr).values()]
        # Explicit columns keep set_index() working when there are no records.
        df = pd.DataFrame(records, columns=columns)
        df.set_index(index_col, inplace=True)
        return df


class RbaFunction:
    """A parameter function: type, variable and numeric parameters."""

    def __init__(self, fid):
        self.id = fid
        self.type = ''
        self.variable = ''
        self.parameters = {}
        self.value_info = ''

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfFunctions`` below *root* (empty dict if missing).

        Raises:
            KeyError: if a function lacks ``id``, ``type`` or ``variable``,
                or a parameter its type needs for the info string.
        """
        data = {}
        functions = root.find('listOfFunctions')
        if functions is None:
            return data
        for function in functions.findall('function'):
            fid = function.attrib['id']
            rba_function = RbaFunction(fid)
            rba_function.type = function.attrib['type']
            rba_function.variable = function.attrib['variable']
            parameters = function.find('listOfParameters')
            if parameters is not None:
                for parameter in parameters.findall('parameter'):
                    rba_function.parameters[parameter.attrib['id']] = float(parameter.attrib['value'])
            rba_function.set_value_info()
            data[fid] = rba_function
        return data

    def set_value_info(self):
        """Build ``self.value_info``, a readable formula of the function.

        Types other than the five handled below yield just '<type>: '.
        """
        params = self.parameters
        value = f'{self.type}: '
        if self.type == 'constant':
            value += f'{params["CONSTANT"]}'
        elif self.type == 'linear':
            value += (f'{params["LINEAR_CONSTANT"]} + '
                      f'({params["LINEAR_COEF"]} * {self.variable})'
                      f' [{params["X_MIN"]}, {params["X_MAX"]}]'
                      f' -> [{params["Y_MIN"]}, {params["Y_MAX"]}]')
        elif self.type == 'michaelisMenten':
            value += (f'{params["kmax"]} * {self.variable}/'
                      f'({params["Km"]} + {self.variable})'
                      f' -> [{params.get("Y_MIN", "0")}, inf]')
        elif self.type == 'indicator':
            value += f'{self.variable} in [{params["X_MIN"]}, {params["X_MAX"]}]'
        elif self.type == 'exponential':
            value += f'exp({params["RATE"]} * {self.variable})'
        self.value_info = value

    def to_dict(self):
        """Return the record for one function; parameters as 'id=value, ...'."""
        parameters = ', '.join(f'{pid}={value}' for pid, value in self.parameters.items())
        return {'function': self.id, 'type': self.type, 'variable': self.variable, 'parameters': parameters}


class RbaAggregate:
    """An aggregate: a typed combination of referenced functions."""

    def __init__(self, fid):
        self.id = fid
        self.type = ''
        self.functions = []
        self.value_info = ''

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfAggregates`` below *root* (empty dict if missing)."""
        data = {}
        aggregates = root.find('listOfAggregates')
        if aggregates is None:
            return data
        for aggregate in aggregates.findall('aggregate'):
            aid = aggregate.attrib['id']
            rba_aggregate = RbaAggregate(aid)
            rba_aggregate.type = aggregate.attrib['type']
            function_refs = aggregate.find('listOfFunctionReferences')
            if function_refs is not None:
                rba_aggregate.functions = [function_ref.attrib['function']
                                           for function_ref in function_refs.findall('functionReference')]
            rba_aggregate.set_value_info()
            data[aid] = rba_aggregate
        return data

    def set_value_info(self):
        """Build ``self.value_info`` from the aggregate type."""
        self.value_info = f'aggregate: {self.type}'

    def to_dict(self):
        """Return the record for one aggregate; functions comma separated."""
        functions = ', '.join(self.functions)
        return {'aggregate': self.id, 'type': self.type, 'functions': functions}
# --- rbaxdf/model/rba_process.py ---

def _format_species_refs(srefs):
    """Format a dict species -> stoichiometry as 'species=.., stoic=..; ...'."""
    return '; '.join(f'species={species}, stoic={stoic}' for species, stoic in srefs.items())


class RbaProcesses:
    """Processes and processing maps read from ``processes.xml``."""

    # Column layout of the processes table (also used when there are no rows).
    _PROCESS_COLUMNS = ['process', 'name', 'machineryCapacity', 'machineryReactants',
                        'machineryProducts', 'productionsProcessingMap', 'productionsSet',
                        'productionsInputs', 'degradationProcessingMap', 'degradationSet',
                        'degradationInputs']

    def __init__(self):
        self.processes = {}
        self.processing_maps = {}

    def get_xml_items(self, model_dir):
        """Load ``<model_dir>/processes.xml`` into ``processes`` and ``processing_maps``.

        Prints a message and leaves the object unchanged when the file does
        not exist.

        Raises:
            AssertionError: if the root element is not ``RBAProcesses``.
        """
        file_name = os.path.join(model_dir, 'processes.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return
        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag == 'RBAProcesses', f'unexpected root tag {root.tag}'

        self.processes = RbaProcess.get_xml_items(root)
        self.processing_maps = RbaProcessingMap.get_xml_items(root)

    def get_df_items(self, p_type):
        """Return the table for 'processes' or 'processingMaps'.

        The processing-map table has one row per constant processing (if it
        has reactants or products) and one per component processing.

        Returns:
            pandas DataFrame (possibly without rows), or ``None`` after
            printing a message when *p_type* is unknown.
        """
        if p_type == 'processes':
            records = [item.to_dict() for item in self.processes.values()]
            df = pd.DataFrame(records, columns=self._PROCESS_COLUMNS)
            df.set_index('process', inplace=True)
            return df
        if p_type == 'processingMaps':
            data = []
            for pmid, pmap in self.processing_maps.items():
                data.extend([pmid] + row for row in pmap._get_rows())
            df = pd.DataFrame(data, columns=['processingMap', 'component', 'machineryCost',
                                             'reactants', 'products'])
            df.set_index('processingMap', inplace=True)
            return df
        print(f'wrong parameter type: {p_type}')
        return None


class RbaProcess:
    """A cellular process with optional machinery, productions and degradations."""

    def __init__(self, pid):
        self.id = pid
        self.name = None
        self.machinery = {}
        self.productions = {}
        self.degradations = {}

    @staticmethod
    def _get_processing(processings, list_tag):
        """Read the first ``processing`` element of the list named *list_tag*.

        Returns:
            dict with keys 'processingMap', 'set' and 'inputs' (list of
            species ids), or an empty dict when the list or its
            ``processing`` element is missing.
        """
        processing_list = processings.find(list_tag)
        processing = processing_list.find('processing') if processing_list is not None else None
        if processing is None:
            return {}
        inputs = processing.find('listOfInputs')
        p_inputs = [] if inputs is None else [sref.attrib['species']
                                              for sref in inputs.findall('speciesReference')]
        return {'processingMap': processing.attrib['processingMap'],
                'set': processing.attrib['set'], 'inputs': p_inputs}

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfProcesses`` below *root* (empty dict if missing).

        Only the first ``processing`` element of ``listOfProductions`` and
        of ``listOfDegradations`` is read.
        """
        data = {}
        processes = root.find('listOfProcesses')
        if processes is None:
            return data
        for process in processes.findall('process'):
            pid = process.attrib['id']
            rba_process = RbaProcess(pid)
            rba_process.name = process.get('name')

            machinery = process.find('machinery')
            if machinery is not None:
                composition = machinery.find('machineryComposition')
                reactants = {}
                products = {}
                if composition is not None:
                    reactants = get_species_refs(composition.find('listOfReactants'))
                    products = get_species_refs(composition.find('listOfProducts'))
                capacity = RbaTargetValue(machinery.find('capacity'))
                rba_process.machinery = {'capacity': capacity, 'reactants': reactants,
                                         'products': products}

            processings = process.find('processings')
            if processings is not None:
                rba_process.productions = RbaProcess._get_processing(processings, 'listOfProductions')
                rba_process.degradations = RbaProcess._get_processing(processings, 'listOfDegradations')

            data[pid] = rba_process
        return data

    def to_dict(self):
        """Return the record for one process; absent parts are empty strings."""
        mach_capacity = mach_reactants = mach_products = ''
        if self.machinery:
            mach_capacity = self.machinery['capacity'].get_str()
            mach_reactants = _format_species_refs(self.machinery['reactants'])
            mach_products = _format_species_refs(self.machinery['products'])

        prod_pmap = prod_set = prod_inputs = ''
        if self.productions:
            prod_pmap = self.productions['processingMap']
            prod_set = self.productions['set']
            prod_inputs = ', '.join(self.productions['inputs'])

        degr_pmap = degr_set = degr_inputs = ''
        if self.degradations:
            degr_pmap = self.degradations['processingMap']
            degr_set = self.degradations['set']
            degr_inputs = ', '.join(self.degradations['inputs'])

        return {'process': self.id, 'name': self.name,
                'machineryCapacity': mach_capacity, 'machineryReactants': mach_reactants,
                'machineryProducts': mach_products,
                'productionsProcessingMap': prod_pmap, 'productionsSet': prod_set,
                'productionsInputs': prod_inputs,
                'degradationProcessingMap': degr_pmap, 'degradationSet': degr_set,
                'degradationInputs': degr_inputs}


class RbaProcessingMap:
    """Processing map: constant processing plus per-component processings."""

    def __init__(self, pmid):
        self.id = pmid
        self.constant_processing = {}
        self.component_processings = {}

    @staticmethod
    def get_xml_items(root):
        """Read ``listOfProcessingMaps`` below *root* (empty dict if missing).

        A component processing without ``machineryCost`` gets cost 0.
        """
        data = {}
        processing_maps = root.find('listOfProcessingMaps')
        if processing_maps is None:
            return data
        for processing_map in processing_maps.findall('processingMap'):
            pmid = processing_map.attrib['id']
            rba_pmap = RbaProcessingMap(pmid)

            const_proc = processing_map.find('constantProcessing')
            if const_proc is not None:
                rba_pmap.constant_processing['reactants'] = get_species_refs(const_proc.find('listOfReactants'))
                rba_pmap.constant_processing['products'] = get_species_refs(const_proc.find('listOfProducts'))

            comp_procs = processing_map.find('listOfComponentProcessings')
            if comp_procs is not None:
                for comp_proc in comp_procs.findall('componentProcessing'):
                    component = comp_proc.attrib['component']
                    cost = float(comp_proc.get('machineryCost', '0'))
                    reactants = get_species_refs(comp_proc.find('listOfReactants'))
                    products = get_species_refs(comp_proc.find('listOfProducts'))
                    rba_pmap.component_processings[component] = {'cost': cost, 'reactants': reactants,
                                                                 'products': products}
            data[pmid] = rba_pmap
        return data

    def _get_rows(self):
        """Return table rows ``[component, cost, reactants, products]``.

        The constant processing (component 'constantProcessing', cost NaN)
        comes first and is listed when it has reactants OR products.
        """
        rows = []
        const_proc = self.constant_processing
        if const_proc.get('reactants') or const_proc.get('products'):
            rows.append(['constantProcessing', np.nan,
                         _format_species_refs(const_proc.get('reactants', {})),
                         _format_species_refs(const_proc.get('products', {}))])
        for component, comp_proc in self.component_processings.items():
            rows.append([component, comp_proc['cost'],
                         _format_species_refs(comp_proc['reactants']),
                         _format_species_refs(comp_proc['products'])])
        return rows

    def to_df(self):
        """Return this map as DataFrame with columns processingMap, component, cost, reactants, products."""
        data = [[self.id] + row for row in self._get_rows()]
        return pd.DataFrame(data, columns=['processingMap', 'component', 'cost', 'reactants', 'products'])

    def to_dict(self):
        """Return a nested dict with formatted constant and component processings."""
        const_proc = {}
        if 'reactants' in self.constant_processing:
            const_proc['reactants'] = _format_species_refs(self.constant_processing['reactants'])
            const_proc['products'] = _format_species_refs(self.constant_processing['products'])

        comp_procs = {
            component: {'machineryCost': comp_proc['cost'],
                        'reactants': _format_species_refs(comp_proc['reactants']),
                        'products': _format_species_refs(comp_proc['products'])}
            for component, comp_proc in self.component_processings.items()
        }
        return {'processing_map': self.id, 'constantProcessing': const_proc,
                'componentProcessings': comp_procs}


# --- rbaxdf/model/rba_target_group.py ---

class RbaTargetGroup:
    """A group of targets: concentrations and production/degradation/reaction fluxes."""

    def __init__(self, tgid):
        self.id = tgid
        self.concentrations = {}
        self.production_fluxes = {}
        self.degradation_fluxes = {}
        self.reaction_fluxes = {}

    @staticmethod
    def get_xml_items(model_dir):
        """Read all target groups from ``<model_dir>/targets.xml``.

        Returns:
            dict target group id -> ``RbaTargetGroup``; empty when the file
            does not exist or holds no ``listOfTargetGroups``.

        Raises:
            AssertionError: if the root element is not ``RBATargets``.
        """
        file_name = os.path.join(model_dir, 'targets.xml')
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return {}

        root = xml.etree.ElementTree.parse(file_name).getroot()
        assert root.tag == 'RBATargets', f'unexpected root tag {root.tag}'

        data = {}
        target_groups = root.find('listOfTargetGroups')
        if target_groups is None:
            return data
        for target_group in target_groups.findall('targetGroup'):
            # NOTE(review): groups without an id all map to '' and so
            # overwrite each other - confirm ids are always present.
            tgid = target_group.attrib.get('id', '')
            rba_target = RbaTargetGroup(tgid)
            rba_target.concentrations = get_target_species(target_group.find('listOfConcentrations'))
            rba_target.production_fluxes = get_target_species(target_group.find('listOfProductionFluxes'))
            rba_target.degradation_fluxes = get_target_species(target_group.find('listOfDegradationFluxes'))
            rba_target.reaction_fluxes = get_target_reactions(target_group.find('listOfReactionFluxes'))
            data[tgid] = rba_target
        return data

    @staticmethod
    def get_df_items(items):
        """Return one row per target, indexed by (non-unique) target group id."""
        data = []
        for tgid, tg in items.items():
            typed_targets = (('concentrations', tg.concentrations),
                             ('productionFluxes', tg.production_fluxes),
                             ('degradationFluxes', tg.degradation_fluxes),
                             ('reactionFluxes', tg.reaction_fluxes))
            for target_type, targets in typed_targets:
                for target, target_value in targets.items():
                    data.append([tgid, target_type, target, target_value.get_str()])

        df = pd.DataFrame(data, columns=['targetGroup', 'targetType', 'target', 'targetValue'])
        df.set_index('targetGroup', inplace=True)
        return df


# --- rbaxdf/model/rba_target_value.py ---

class RbaTargetValue:
    """Target given as fixed ``value`` or as ``lowerBound`` / ``upperBound``.

    A ``value`` attribute takes precedence: the bounds are then discarded.
    """

    def __init__(self, target):
        """Read the target attributes of XML element *target*.

        Args:
            target: XML element; ``None`` (element missing) yields a target
                value without any attribute.
        """
        attrib = target.attrib if target is not None else {}
        self.lower_bound = attrib.get('lowerBound')
        self.upper_bound = attrib.get('upperBound')
        self.value = attrib.get('value')
        if self.value is not None:
            self.lower_bound = None
            self.upper_bound = None

    def get_str(self):
        """Return the set attributes as 'key=value' items joined by ', '."""
        named_values = (('lowerBound', self.lower_bound),
                        ('upperBound', self.upper_bound),
                        ('value', self.value))
        return ', '.join(f'{name}={val}' for name, val in named_values if val is not None)


# --- rbaxdf/utils/et_utils.py ---

def get_target_species(ts_parent):
    """Return dict species id -> ``RbaTargetValue`` for the ``targetSpecies`` children.

    An absent parent element (``None``) yields an empty dict.
    """
    if ts_parent is None:
        return {}
    return {target.attrib['species']: RbaTargetValue(target)
            for target in ts_parent.findall('targetSpecies')}


def get_target_reactions(tr_parent):
    """Return dict reaction id -> ``RbaTargetValue`` for the ``targetReaction`` children.

    An absent parent element (``None``) yields an empty dict.
    """
    if tr_parent is None:
        return {}
    return {target.attrib['reaction']: RbaTargetValue(target)
            for target in tr_parent.findall('targetReaction')}


def get_species_refs(srefs_parent):
    """Return dict species id -> stoichiometry (float) for the ``speciesReference`` children.

    An absent parent element (``None``) yields an empty dict.
    """
    if srefs_parent is None:
        return {}
    return {sref.attrib['species']: float(sref.attrib['stoichiometry'])
            for sref in srefs_parent.findall('speciesReference')}