def import_xml(self, model_dir):
    """Load target densities from ``density.xml`` located in ``model_dir``.

    When the file does not exist, a message is printed and the instance is
    left unchanged.

    :param model_dir: directory expected to contain ``density.xml``
    :type model_dir: str
    :return: None
    """
    file_name = os.path.join(model_dir, 'density.xml')
    # Test for the file itself: the directory may exist without containing
    # density.xml, in which case parse() would raise instead of reporting.
    if not os.path.exists(file_name):
        print(f'{file_name} not found!')
        return
    root = parse(file_name).getroot()
    assert root.tag == 'RBADensity'
    self.densities = RbaDensity.import_xml(root.find('listOfTargetDensities'))
os.path.exists(file_name) is False: + if os.path.exists(file_name) is True: + root = parse(file_name).getroot() + assert root.tag == 'RBAEnzymes' + self.enzymes = RbaEnzyme.import_xml(root.find('listOfEnzymes')) + else: print(f'{file_name} not found!') - return {} - tree = parse(file_name) - root = tree.getroot() - assert root.tag == 'RBAEnzymes' - self.enzymes = RbaEnzyme.import_xml(root.find('listOfEnzymes')) + def from_df(self, m_dict): + if 'enzymes' in m_dict: + self.enzymes = RbaEnzyme.from_df(m_dict['enzymes']) + else: + print(f'enzymes not imported!') def export_xml(self, model_dir): @@ -44,7 +48,7 @@ class RbaEnzymes: def to_df(self): df = pd.DataFrame([item.to_dict() for item in self.enzymes.values()]) df.set_index('enzyme', inplace=True) - return df + return {'enzymes': df} def validate(self, component_ids): valid = True @@ -107,6 +111,20 @@ class RbaEnzyme: data[eid] = rba_enzyme return data + @staticmethod + def from_df(df): + data = {} + for eid, row in df.iterrows(): + rba_enzyme = RbaEnzyme(eid) + rba_enzyme.reaction = row['reaction'] + rba_enzyme.forward_eff = row['forwardEfficiency'] + rba_enzyme.backward_eff = row['backwardEfficiency'] + rba_enzyme.zero_cost = row['zeroCost'] + rba_enzyme.mach_reactants = get_species_refs_from_str(row['machineryReactants']) + rba_enzyme.mach_products = get_species_refs_from_str(row['machineryProducts']) + data[eid] = rba_enzyme + return data + def export_xml(self): attribs = {'id': self.id, 'reaction': self.reaction, 'forward_efficiency': self.forward_eff, 'backward_efficiency': self.backward_eff, 'zeroCost': str(self.zero_cost).lower()} diff --git a/rbaxdf/model/rba_macromolecules.py b/rbaxdf/model/rba_macromolecules.py index 663b7c53960c4126be74823882caeaf2183f8e15..b5f9f09ace8f1c63d7cb8b7ae513e0fdbfb37893 100644 --- a/rbaxdf/model/rba_macromolecules.py +++ b/rbaxdf/model/rba_macromolecules.py @@ -21,20 +21,20 @@ class RbaMacromolecules: def import_xml(self, model_dir): file_name = os.path.join(model_dir, 
def from_df(self, m_dict):
    """Populate components and macromolecules from a dict of tables.

    The table is looked up under the key ``self.type``; when that key is
    absent a message is printed and nothing is changed.

    :param m_dict: table name mapped to table
    :type m_dict: dict
    :return: None
    """
    if self.type not in m_dict:
        print(f'{self.type} not imported!')
        return
    table = m_dict[self.type]
    # one table carries both the component rows and the macromolecule rows
    self.components = RbaComponent.from_df(table)
    self.macromolecules = RbaMacromolecule.from_df(table)
class RbaMedium:
    """Medium composition: metabolite id mapped to concentration.

    The method names mirror the other model components (``import_xml`` /
    ``export_xml``), although the medium is stored in ``medium.tsv``.
    """

    def __init__(self):
        # metabolite id -> concentration
        self.concentrations = {}

    def import_xml(self, model_dir):
        """Read concentrations from ``medium.tsv`` in ``model_dir``.

        Despite the name this reads a tab-separated file with the columns
        'Metabolite' and 'Concentration'. When the file is missing, a
        message is printed and the current concentrations are kept.

        :param model_dir: directory expected to contain ``medium.tsv``
        :type model_dir: str
        """
        file_name = os.path.join(model_dir, 'medium.tsv')
        # Test for the file, not the directory: an existing directory
        # without medium.tsv must be reported rather than raise.
        if not os.path.exists(file_name):
            print(f'{file_name} not found!')
            return
        df = pd.read_table(file_name, usecols=['Metabolite', 'Concentration'],
                           index_col='Metabolite')
        self.concentrations = df.to_dict()['Concentration']

    def from_df(self, m_dict):
        """Set concentrations from the 'medium' table of ``m_dict``.

        :param m_dict: table name mapped to table; 'medium' must provide a
            'Concentration' column indexed by metabolite id
        :type m_dict: dict
        """
        if 'medium' in m_dict:
            self.concentrations = m_dict['medium'].to_dict()['Concentration']
        else:
            print('medium not imported!')

    def export_xml(self, model_dir):
        """Write concentrations to ``medium.tsv`` in ``model_dir``.

        Despite the name this writes a tab-separated file.

        :param model_dir: target directory
        :type model_dir: str
        """
        file_name = os.path.join(model_dir, 'medium.tsv')
        self.to_df()['medium'].to_csv(file_name, sep='\t')

    def to_df(self):
        """Return the medium as ``{'medium': DataFrame}``.

        :return: single-entry dict; the table is indexed by 'Metabolite'
            and has one column 'Concentration'
        :rtype: dict
        """
        # materialise keys and values as lists so DataFrame gets plain sequences
        df = pd.DataFrame(list(self.concentrations.values()),
                          index=list(self.concentrations),
                          columns=['Concentration'])
        df.index.name = 'Metabolite'
        return {'medium': df}

    def validate(self, component_ids):
        """Validate the medium; currently always returns True.

        :param component_ids: identifiers known to the model (unused so far)
        :return: validity status
        :rtype: bool
        """
        valid = True
        # TODO: check self.concentrations.keys() against variables in parameters.functions

        return valid
def from_df(self, m_dict):
    """Populate compartments, species and reactions from a dict of tables.

    Each of the keys 'compartments', 'species' and 'reactions' is handled
    independently; a missing key prints a message and leaves the
    corresponding attribute untouched.

    :param m_dict: table name mapped to table
    :type m_dict: dict
    :return: None
    """
    # Loaders are wrapped in lambdas so a class is only looked up when its
    # table is actually present.
    loaders = (
        ('compartments', lambda table: RbaCompartment.from_df(table)),
        ('species', lambda table: RbaSpecies.from_df(table)),
        ('reactions', lambda table: RbaReaction.from_df(table)),
    )
    for key, load in loaders:
        if key in m_dict:
            # attribute name equals the table name for all three entries
            setattr(self, key, load(m_dict[key]))
        else:
            print(f'{key} not imported!')
def set_model_dir(self, model_dir):
    """Set the model base directory, creating it when it does not exist.

    :param model_dir: path of the base directory
    :type model_dir: str
    :return: None
    """
    if not os.path.exists(model_dir):
        # exist_ok tolerates the directory appearing between the check
        # and the creation
        os.makedirs(model_dir, exist_ok=True)
        print(f'based directory {model_dir} created')
    self.model_dir = model_dir
def from_excel(self):
    """Import the model from ``model.xlsx`` in the model directory.

    Every sheet is read into a table using its first column as index, and
    the resulting dict (sheet name mapped to table) is passed to
    ``from_df``. A missing workbook only prints a message.

    :return: None
    """
    xlsx_name = os.path.join(self.model_dir, 'model') + '.xlsx'
    if not os.path.exists(xlsx_name):
        print('Excel document not found: ' + xlsx_name)
        return
    with pd.ExcelFile(xlsx_name) as workbook:
        tables = {sheet: pd.read_excel(workbook, sheet_name=sheet, index_col=0)
                  for sheet in workbook.sheet_names}
    self.from_df(tables)
    print(f'RBA model imported from {xlsx_name}')
def import_xml(self, model_dir):
    """Load functions and aggregates from ``parameters.xml`` in ``model_dir``.

    A missing file prints a message and leaves the instance unchanged.

    :param model_dir: directory expected to contain ``parameters.xml``
    :type model_dir: str
    :return: None
    """
    file_name = os.path.join(model_dir, 'parameters.xml')
    if not os.path.exists(file_name):
        print(f'{file_name} not found!')
        return
    document_root = parse(file_name).getroot()
    assert document_root.tag == 'RBAParameters'
    functions_node = document_root.find('listOfFunctions')
    self.functions = RbaFunction.import_xml(functions_node)
    aggregates_node = document_root.find('listOfAggregates')
    self.aggregates = RbaAggregate.import_xml(aggregates_node)
def from_df(self, m_dict):
    """Populate processes and processing maps from a dict of tables.

    The tables are looked up under the keys 'processes' and
    'processingMaps'; each missing key prints a message and leaves the
    corresponding attribute untouched.

    :param m_dict: table name mapped to table
    :type m_dict: dict
    :return: None
    """
    if 'processes' not in m_dict:
        print('processes not imported!')
    else:
        self.processes = RbaProcess.from_df(m_dict['processes'])

    if 'processingMaps' not in m_dict:
        print('processingMaps not imported!')
    else:
        self.processing_maps = RbaProcessingMap.from_df(m_dict['processingMaps'])
get_species_refs_from_str(row['machineryReactants']) - products = get_species_refs_from_str(row['machineryProducts']) - rba_process.machinery = {'capacity': capacity, 'reactants': reactants, - 'products': products} - - processing_map = row['productionProcessingMap'] - set_type = row['productionSet'] - inputs = [item.strip() for item in row['productionInputs'].split(',')] - if type(processing_map) is str and len(processing_map) > 0: + if type(row['machineryCapacity']) is str and len(row['machineryCapacity']) > 0: + capacity_dict = extract_params(row['machineryCapacity']) + capacity = RbaTargetValue.from_dict(capacity_dict) + reactants = get_species_refs_from_str(row['machineryReactants']) + products = get_species_refs_from_str(row['machineryProducts']) + rba_process.machinery = {'capacity': capacity, 'reactants': reactants, + 'products': products} + if type(row['productionProcessingMap']) is str and len(row['productionProcessingMap']) > 0: + processing_map = row['productionProcessingMap'] + set_type = row['productionSet'] + inputs = [item.strip() for item in row['productionInputs'].split(',')] rba_process.productions = {'processingMap': processing_map, 'set': set_type, 'inputs': inputs} - processing_map = row['degradationProcessingMap'] - set_type = row['degradationSet'] - inputs = [item.strip() for item in row['degradationInputs'].split(',')] - if type(processing_map) is str and len(processing_map) > 0: + if type(row['degradationProcessingMap']) is str and len(row['degradationProcessingMap']) > 0: + processing_map = row['degradationProcessingMap'] + set_type = row['degradationSet'] + inputs = [item.strip() for item in row['degradationInputs'].split(',')] rba_process.degradations = {'processingMap': processing_map, 'set': set_type, 'inputs': inputs} data[pid] = rba_process diff --git a/rbaxdf/model/rba_targets.py b/rbaxdf/model/rba_targets.py index 477807689ae88f234c64b7d5e91a554e8261bcf0..ad35e4458a5dcc5eba7eab8eb2929caa831bc130 100644 --- 
def get_species_refs_from_str(srefs_str):
    """Parse a species-reference string into a stoichiometry dict.

    References are separated by ';'. A reference only contributes when its
    parameters contain both 'species' and 'stoic'.

    :param srefs_str: ';'-separated species references; any non-string
        value yields an empty result
    :type srefs_str: str
    :return: species id mapped to stoichiometric coefficient
    :rtype: dict
    """
    srefs = {}
    # isinstance also accepts str subclasses (e.g. numpy.str_), which an
    # exact type comparison would silently discard.
    # NOTE(review): non-string inputs are presumably empty table cells - confirm.
    if not isinstance(srefs_str, str):
        return srefs
    for sref in srefs_str.split(';'):
        params = extract_params(sref)
        if 'species' in params and 'stoic' in params:
            srefs[params['species']] = float(params['stoic'])
    return srefs
def extract_params(record):
    """Extract ``key=value`` parameters from a record string.

    The record is split into items by ``record_generator`` using ',' as
    separator. Items without '=' are ignored; keys and values are stripped
    of surrounding whitespace.

    :param record: record holding ``key=value`` items; any non-string value
        yields an empty result
    :type record: str
    :return: parameter names mapped to their string values
    :rtype: dict
    """
    params = {}
    # isinstance also accepts str subclasses (e.g. numpy.str_)
    if isinstance(record, str):
        for kv_pair in record_generator(record, sep=','):
            if '=' in kv_pair:
                # split on the first '=' only, so a value that itself
                # contains '=' does not break the two-name unpacking
                k, v = kv_pair.split('=', 1)
                params[k.strip()] = v.strip()
    return params