Source code for tellurium.sedml.data

"""
Reading NUML, CSV and TSV data from DataDescriptions
"""
from __future__ import print_function, absolute_import
import os
import logging
import warnings
import tempfile
import numpy as np
import pandas as pd

import urllib.request

try:
    import libnuml
except ImportError:
    import tenuml as libnuml

log = logging.getLogger('sedml-data')


[docs] class DataDescriptionParser(object): """ Class for parsing DataDescriptions. """ FORMAT_URN = "urn:sedml:format:" FORMAT_NUML = "urn:sedml:format:numl" FORMAT_CSV = "urn:sedml:format:csv" FORMAT_TSV = "urn:sedml:format:tsv" # supported formats SUPPORTED_FORMATS = [FORMAT_NUML, FORMAT_CSV, FORMAT_TSV]
[docs] @classmethod def parse(cls, dd, workingDir=None): """ Parses single DataDescription. Returns dictionary of data sources {DataSource.id, slice_data} :param dd: SED-ML DataDescription :param workingDir: workingDir relative to which the sources are resolved :return: """ did = dd.getId() name = dd.getName() source = dd.getSource() # ------------------------------- # Resolve source # ------------------------------- # FIXME: this must work for absolute paths and URL paths if workingDir is None: workingDir = '.' # TODO: refactor in general resource module (for resolving anyURI and resource) tmp_file = None if source.startswith('http') or source.startswith('HTTP'): webURL = urllib.request.urlopen(source) data = webURL.read() try: file_str = str(data.decode("utf-8")) except: file_str = str(data) tmp_file = tempfile.NamedTemporaryFile("w", delete=False) tmp_file.write(file_str) tmp_file.close() source_path = tmp_file.name else: source_path = os.path.join(workingDir, source) # ------------------------------- # Find the format # ------------------------------- format = None if hasattr(dd, "getFormat"): format = dd.getFormat() format = cls._determine_format(source_path=source_path, format=format) # log data description log.info('-' * 80) log.info('DataDescription: :', dd) log.info('\tid:', did) log.info('\tname:', name) log.info('\tsource', source) log.info('\tformat', format) # ------------------------------- # Parse DimensionDescription # ------------------------------- # FIXME: uses the data_types to check the actual data type dim_description = dd.getDimensionDescription() if dim_description is not None: data_types = cls.parse_dimension_description(dim_description) else: data_types = None # ------------------------------- # Load complete data # ------------------------------- data = None if format == cls.FORMAT_CSV: data = cls._load_csv(path=source_path) elif format == cls.FORMAT_TSV: data = cls._load_tsv(path=source_path) elif format == cls.FORMAT_NUML: data = cls._load_numl(path=source_path) # log data log.info("-" * 80) log.info("Data") log.info("-" * 80) if format in [cls.FORMAT_CSV, cls.FORMAT_TSV]: log.info(data.head(10)) elif format == cls.FORMAT_NUML: # multiple result components via id for result in data: log.info(result[0]) # rc id log.info(result[1].head(10)) # DataFrame log.info("-" * 80) # ------------------------------- # Process DataSources # ------------------------------- data_sources = {} for k, ds in enumerate(dd.getListOfDataSources()): dsid = ds.getId() # log DataSource log.info('\n\t*** DataSource:', ds) log.info('\t\tid:', ds.getId()) log.info('\t\tname:', ds.getName()) log.info('\t\tindexSet:', ds.getIndexSet()) log.info('\t\tslices') # CSV/TSV if format in [cls.FORMAT_CSV, cls.FORMAT_TSV]: if len(ds.getIndexSet()) > 0: # if index set we return the index data_sources[dsid] = pd.Series(data.index.tolist()) else: sids = [] for slice in ds.getListOfSlices(): # FIXME: this does not handle multiple slices for rows # print('\t\t\treference={}; value={}'.format(slice.getReference(), slice.getValue())) sids.append(slice.getValue()) # slice values are columns from data frame try: data_sources[dsid] = data[sids].values except KeyError as e: # something does not fit between data and data sources print("-" * 80) print("Format:", format) print("Source:", source_path) print("-" * 80) print(data) print("-" * 80) raise # NUML elif format == cls.FORMAT_NUML: # Using the first results component only in SED-ML L1V3 rc_id, rc, data_types = data[0] index_set = ds.getIndexSet() if ds.getIndexSet() and len(ds.getIndexSet()) != 0: # data via indexSet data_source = rc[index_set].drop_duplicates() data_sources[dsid] = data_source else: # data via slices for slice in ds.getListOfSlices(): reference = slice.getReference() value = slice.getValue() df = rc.loc[rc[reference] == value] # select last column with values data_sources[dsid] = df.iloc[:, -1] # log data sources log.info("-" * 80) log.info("DataSources") log.info("-" * 80) for key, value in data_sources.items(): log.info('{} : {}; shape={}'.format(key, type(value), value.shape)) log.info("-" * 80) # cleanup # FIXME: handle in finally if tmp_file is not None: os.remove(tmp_file.name) return data_sources
@classmethod def _determine_format(cls, source_path, format=None): """ :param source_path: path of file :param format: format given in the DataDescription :return: """ if format is None or format == "": is_xml = False with open(source_path) as unknown_file: start_str = unknown_file.read(1024) start_str = start_str.strip() if start_str.startswith('<'): is_xml = True if is_xml: # xml format is numl format = cls.FORMAT_NUML # defaults to numl else: # format is either csv or tsv df_csv = cls._load_csv(source_path) df_tsv = cls._load_tsv(source_path) if df_csv.shape[1] >= df_tsv.shape[1]: format = cls.FORMAT_CSV else: format = cls.FORMAT_TSV # base format if format.startswith(cls.FORMAT_NUML): format = cls.FORMAT_NUML # check supported formats if format not in cls.SUPPORTED_FORMATS: raise NotImplementedError("Format '{}' not supported for DataDescription. Format must be in: {}".format(format, cls.SUPPORTED_FORMATS)) return format @classmethod def _load_csv(cls, path): """ Read CSV data from file. :param path: path of file :return: returns pandas DataFrame with data """ return cls._load_sv(path, separator=",") @classmethod def _load_tsv(cls, path): """ Read TSV data from file. :param path: path of file :return: returns pandas DataFrame with data """ return cls._load_sv(path, separator="\t") @classmethod def _load_sv(cls, path, separator): """ Helper function for loading data file from given source. CSV files must have a header. Handles file and online resources. :param path: path of file. :return: pandas data frame """ df = pd.read_csv(path, sep=separator, index_col=False, skip_blank_lines=True, quotechar='"', comment="#", skipinitialspace=True, na_values="nan") return df
[docs] @classmethod def read_numl_document(cls, path): """ Helper to read numl document and checking errors :param path: path of file :return: """ doc_numl = libnuml.readNUMLFromFile(path) # type: libnuml.NUMLDocument # check for errors errorlog = doc_numl.getErrorLog() msg = "NUML ERROR in '{}': {}".format(path, errorlog.toString()) if errorlog.getNumFailsWithSeverity(libnuml.LIBNUML_SEV_ERROR) > 0: raise IOError(msg) if errorlog.getNumFailsWithSeverity(libnuml.LIBNUML_SEV_FATAL) > 0: raise IOError(msg) if errorlog.getNumFailsWithSeverity(libnuml.LIBNUML_SEV_WARNING) > 0: warnings.warn(msg) if errorlog.getNumFailsWithSeverity(libnuml.LIBNUML_SEV_SCHEMA_ERROR) > 0: warnings.warn(msg) if errorlog.getNumFailsWithSeverity(libnuml.LIBNUML_SEV_GENERAL_WARNING) > 0: warnings.warn(msg) return doc_numl
@classmethod def _load_numl(cls, path): """ Reading NuML data from file. This loads the complete numl data. For more information see: https://github.com/numl/numl :param path: NuML path :return: data """ doc_numl = DataDescriptionParser.read_numl_document(path) # reads all the resultComponents from the numl file results = [] Nrc = doc_numl.getNumResultComponents() rcs = doc_numl.getResultComponents() log.info('\nNumResultComponents:', Nrc) for k in range(Nrc): rc = rcs.get(k) # parse ResultComponent rc_id = rc.getId() # dimension info description = rc.getDimensionDescription() data_types = cls.parse_dimension_description(description) # data dimension = rc.getDimension() assert (isinstance(dimension, libnuml.Dimension)) data = [cls._parse_dimension(dimension.get(k)) for k in range(dimension.size())] # create data frame flat_data = [] for entry in data: for part in entry: flat_data.append(part) # column ids from DimensionDescription column_ids = [] for entry in data_types: for cid, dtype in entry.items(): column_ids.append(cid) df = pd.DataFrame(flat_data, columns=column_ids) # convert data types to actual data types for entry in data_types: for cid, dtype in entry.items(): if dtype == 'double': df[cid] = df[cid].astype(np.float64) elif dtype == 'string': df[cid] = df[cid].astype(str) # convert all the individual columns to the corresponding data types # df = df.apply(pd.to_numeric, errors="ignore") results.append([rc_id, df, data_types]) return results
[docs] @classmethod def parse_dimension_description(cls, description): """ Parses the given dimension description. Returns dictionary of { key: dtype } :param description: :return: """ assert (isinstance(description, libnuml.DimensionDescription)) info = [cls._parse_description(description.get(k)) for k in range(description.size())] flat_info = [] for entry in info: for part in entry: flat_info.append(part) return flat_info
@classmethod def _parse_description(cls, d, info=None, entry=None): """ Parses the recursive DimensionDescription, TupleDescription, AtomicDescription. This gets the dimension information from NuML. <dimensionDescription> <compositeDescription indexType="double" id="time" name="time"> <compositeDescription indexType="string" id="SpeciesIds" name="SpeciesIds"> <atomicDescription valueType="double" id="Concentrations" name="Concentrations" /> </compositeDescription> </compositeDescription> </dimensionDescription> :param d: :param info: :return: """ if info is None: info = [] if entry is None: entry = [] type_code = d.getTypeCode() # print('typecode:', type_code) if type_code == libnuml.NUML_COMPOSITEDESCRIPTION: content = {d.getId(): d.getIndexType()} info.append(content) # print('\t* CompositeDescription:', content) if d.isContentCompositeDescription(): for k in range(d.size()): info = cls._parse_description(d.getCompositeDescription(k), info, list(entry)) elif d.isContentAtomicDescription(): info = cls._parse_description(d.getAtomicDescription(), info, entry) elif type_code == libnuml.NUML_ATOMICDESCRIPTION: content = {d.getId(): d.getValueType()} info.append(content) # print('\t* AtomicDescription:', content) elif type_code == libnuml.NUML_TUPLEDESCRIPTION: tuple_des = d.getTupleDescription() Natomic = d.size() valueTypes = [] for k in range(Natomic): atomic = tuple_des.getAtomicDescription(k) valueTypes.append(atomic.getValueType()) info.append(valueTypes) # print('\t* TupleDescription:', valueTypes) else: raise NotImplementedError("Type code: {}".format(type_code)) return info @classmethod def _parse_dimension(cls, d, data=None, entry=None): """ Parses the recursive CompositeValue, Tuple, AtomicValue. This gets the actual data from NuML. :param d: :param data: :return: """ if data is None: data = [] if entry is None: entry = [] type_code = d.getTypeCode() # print('typecode:', type_code) if type_code == libnuml.NUML_COMPOSITEVALUE: indexValue = d.getIndexValue() entry.append(indexValue) # print('\t* CompositeValue:', indexValue) if d.isContentCompositeValue(): for k in range(d.size()): # make copy, so every entry is own entry data = cls._parse_dimension(d.getCompositeValue(k), data, list(entry)) elif d.isContentAtomicValue(): data = cls._parse_dimension(d.getAtomicValue(), data, entry) elif type_code == libnuml.NUML_ATOMICVALUE: # Data is converted to correct # value = d.getDoubleValue() value = d.getValue() entry.append(value) # entry finished, we are appending data.append(entry) # print('\t* AtomicValue:', value) elif type_code == libnuml.NUML_TUPLE: tuple = d.getTuple() Natomic = d.size() values = [] for k in range(Natomic): atomic = tuple.getAtomicValue(k) values.append(atomic.getDoubleValue()) data.append(values) # print('\t* TupleDescription:', values) else: raise NotImplementedError return data