# Source code for msml.xml

# region gplv3preamble
# The Medical Simulation Markup Language (MSML) - Simplifying the biomechanical modeling workflow
#
# MSML has been developed in the framework of 'SFB TRR 125 Cognition-Guided Surgery'
#
# If you use this software in academic work, please cite the paper:
# S. Suwelack, M. Stoll, S. Schalck, N.Schoch, R. Dillmann, R. Bendl, V. Heuveline and S. Speidel,
# The Medical Simulation Markup Language (MSML) - Simplifying the biomechanical modeling workflow,
# Medicine Meets Virtual Reality (MMVR) 2014
#
# Copyright (C) 2013-2014 see Authors.txt
#
# If you have any questions please feel free to contact us at suwelack@kit.edu
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# endregion


""" msml.xml provides functionality to read MSML XML files and convert them into the msml Python model objects.

"""
from __future__ import print_function

from lxml import etree

from path import path

from msml.model import *
from msml.model.base import SceneObject
import msml.log
from .exceptions import MSMLError


# Module metadata.
__author__ = "Alexander Weigl"
__date__ = "2014-01-25"

# Public API of this module: the names exported by a star-import.
__all__ = ['load_alphabet', 'load_msml_file', 'xmldom']


def load_alphabet(folder=None, file_list=None):
    """Load the alphabet definition files and build an :class:`Alphabet`.

    Every ``*.xml`` file found below *folder* (searched recursively via
    ``walkfiles``) is appended to the explicitly given *file_list*. Each file
    is parsed with :func:`xmldom` and converted by :func:`parse_file`.

    :param folder: a name of a folder to search for ``*.xml`` files
    :type folder: str or path.path
    :param list file_list: list of file names (``str`` or ``path``); the
        passed list is not modified
    :return: an alphabet object, not validated
    :rtype: Alphabet
    """
    # Work on a copy: extending the argument in place would silently grow the
    # caller's list on every call.
    files = list(file_list) if file_list else []

    if folder:
        files.extend(path(folder).walkfiles("*.xml"))

    # Entries are wrapped in path() so that plain strings are accepted too.
    # The schema name "a.xsd" is handed to xmldom() unchanged.
    docs = [xmldom(path(f).abspath(), "a.xsd") for f in files]
    results = [parse_file(doc) for doc in docs]
    return Alphabet(results)
def load_msml_file(fil):
    """Read an MSML XML file and build the corresponding MSMLFile object.

    Args:
      fil (str): filename of an existing XML file

    Returns:
      MSMLFile: the object created by ``msml_file_factory``, with its
      ``filename`` attribute set to ``path(fil)``
    """
    msml_file = msml_file_factory(xmldom(fil))
    msml_file.filename = path(fil)
    return msml_file
def parse_file(R):
    """Dispatch the root element *R* to the parse hook registered for its tag.

    The tag name (as returned by ``_tag_name``) is looked up in
    ``_parse_hooks``. When no hook is registered the problem is reported via
    ``msml.log.fatal`` and no value is returned explicitly.
    """
    tag = _tag_name(R.tag)
    try:
        hook = _parse_hooks[tag]
    except KeyError:
        msml.log.fatal("for element %s is no parse hook registered" % tag)
    else:
        return hook(R)


def get_default_scheme():
    """Return the path of ``msml.xsd`` in the directory of this module."""
    module_dir = path(__file__).dirname()
    return module_dir / "msml.xsd"
def xmldom(files, schema=None):
    """Parse one or several XML inputs into lxml elements.

    :param files: a single file name (``str``, unicode or ``path``), an open
        file-like object, or an iterable of those
    :param schema: file name of an XML schema used for validation; defaults
        to the result of :func:`get_default_scheme`
    :return: the root element for a single input, otherwise a list with one
        root element per input
    :raises MSMLError: if an input cannot be read or parsed
    """
    if not schema:
        schema = get_default_scheme()

    try:
        # Binary mode: lxml rejects decoded text that carries an XML encoding
        # declaration, bytes are accepted on every Python version.
        with open(schema, 'rb') as f:
            schema_root = etree.XML(f.read())
        schema = etree.XMLSchema(schema_root)
    except Exception:
        # Deliberate best effort: if the schema cannot be loaded the
        # documents are parsed without validation. ``Exception`` (instead of
        # ``BaseException``) lets KeyboardInterrupt/SystemExit propagate.
        schema = None

    xmlparser = etree.XMLParser(schema=schema,
                                remove_comments=True,
                                remove_pis=True,
                                remove_blank_text=True)

    def xmlopen(xmlfilename):
        """Parse a single file name or file-like object with ``xmlparser``."""
        try:
            if hasattr(xmlfilename, 'read'):
                # Already an open file object: read it directly instead of
                # passing it to open(), which would always fail.
                return etree.fromstring(xmlfilename.read(), xmlparser)
            with open(xmlfilename, 'rb') as f:
                return etree.fromstring(f.read(), xmlparser)
        except Exception as e:
            raise MSMLError("could not read %s" % xmlfilename, e)

    # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3, so
    # text file names are never mistaken for an iterable of file names.
    if isinstance(files, (str, type(u''), path)) or hasattr(files, 'read'):
        return xmlopen(files)
    return [xmlopen(f) for f in files]
def etree_to_dict(t):
    """Convert the element *t* and its subtree into nested dictionaries.

    The result maps the element's tag to the list of converted children,
    every attribute ``name`` to its value under the key ``'@name'`` and the
    key ``'text'`` to the element's text.
    """
    d = {t.tag: [etree_to_dict(child) for child in t.iterchildren()]}
    d.update(('@' + k, v) for k, v in t.attrib.items())
    d['text'] = t.text
    return d


def keyval_factory(meta_node):
    """Return the key/value entries below *meta_node* ({} if it is None)."""
    if meta_node is not None:
        return _parse_entry_list(meta_node.iterchildren())
    return {}


def _except_none(dic, key):
    """Return ``dic[key]``, or ``None`` if the lookup fails."""
    try:
        return dic[key]
    except (LookupError, TypeError):
        return None


def _tag_name(tag):
    """Strip a leading ``{namespace}`` part from *tag*.

    ``'{ns}local'`` becomes ``'local'``; a tag without ``'}'`` is returned
    unchanged.
    """
    # rfind() yields -1 when there is no '}', so the slice then starts at 0
    # and returns the whole tag; no truthiness test on the index is needed.
    return tag[tag.rfind('}') + 1:]


def _attributes(node, attribs, **defaults):
    """Read several attributes of *node* at once.

    :param node: element providing an ``attrib`` mapping
    :param attribs: attribute names, either as a list or as one
        comma-separated string (e.g. ``'id, indices'``)
    :param defaults: fallback values for attributes missing on the node
    :return: the single value if exactly one name was given, otherwise a
        tuple of values in the order of *attribs*; attributes that are
        neither on the node nor in *defaults* yield ``None``
    """

    def _get(key):
        try:
            return node.attrib[key]
        except KeyError:
            return defaults.get(key)

    if isinstance(attribs, str):
        # Tolerate any whitespace around the commas, not only ', '.
        attribs = [name.strip() for name in attribs.split(',')]

    if len(attribs) == 1:
        return _get(attribs[0])
    return tuple(_get(name) for name in attribs)


import itertools


def _parse_task(task_node):
    """Create a Task for *task_node* and, recursively, for its descendants.

    :return: a flat list starting with the task of *task_node* followed by
        the tasks of all descendants; ``[]`` if *task_node* is None
    """
    if task_node is None:
        return []

    task = Task(name=_tag_name(task_node.tag), attrib=dict(task_node.attrib))
    sub = list(itertools.chain.from_iterable(
        _parse_task(child) for child in task_node.iterchildren()))
    task.sub_tasks = sub
    return [task] + sub


def msml_file_factory(msml_node):
    """Build an MSMLFile from the root element of an MSML document.

    The children ``variables``, ``workflow``, ``scene`` and ``environment``
    of *msml_node* are parsed by the nested helpers below. A missing
    ``scene`` or ``environment`` is passed on as ``None``.
    """

    def _parse_variables(var_node):
        """Return the list of variables declared below *var_node*."""
        variables = []
        if var_node is None:
            return variables

        for n_var in var_node.iterchildren():
            attrib = n_var.attrib
            name = _except_none(attrib, 'name')
            physical = _except_none(attrib, 'physical')
            logical = _except_none(attrib, 'logical')
            role = _except_none(attrib, 'role')

            tag = _tag_name(n_var.tag)
            if tag == 'var':
                v = MSMLVariable(name, physical, logical,
                                 _except_none(attrib, 'value'), role)
            elif tag == 'file':
                v = MSMLFileVariable(name, physical, logical,
                                     _except_none(attrib, 'location'), role)
            else:
                # Without this branch an unknown element either crashed on an
                # unbound name or re-appended the previous variable.
                msml.log.warn("unknown element in variables ignored: %s" % n_var.tag)
                continue
            variables.append(v)
        return variables

    def _parse_workflow(wf_node):
        """Return a Workflow holding every task found below *wf_node*."""
        wf = Workflow()
        if wf_node is None:
            # A document without <workflow> yields an empty workflow, in
            # line with the optional scene/environment/variables sections.
            return wf
        for child in wf_node.iterchildren():
            for task in _parse_task(child):
                wf.add_task(task)
        return wf

    def _parse_elements(parent_node):
        """Wrap each child of *parent_node* into an ObjectElement."""

        def _parse_element(element_node):
            params = dict(element_node.attrib)
            # The tag name travels with the attributes under a reserved key.
            params['__tag__'] = _tag_name(element_node.tag)
            return ObjectElement(params)

        if parent_node is None:
            return []
        return [_parse_element(child) for child in parent_node.iterchildren()]

    def _parse_object(object_node):
        '''Build a SceneObject from an ``<object>`` element.

        example::

            <object id="bunny">
                <mesh>
                    <linearTet id="bunnyMesh" mesh="${bunnyVolumeMesher}" />
                </mesh>

                <sets>
                    <nodes>
                        <indexgroup id="constraintRegion" indices="@{bottomToIndexGroup}" />
                    </nodes>
                    <elements>
                        <indexgroup id="bodyRegion" indices="@{bodyToIndexGroup}" />
                    </elements>
                    <surfaces>
                        <indexgroup id="constraintRegion" indices="@{bottomToIndexGroup}" />
                    </surfaces>
                </sets>

                <material>
                    <region id="bunnyMaterial">
                        <!--<materialRegion name=""> -->
                        <indexGroup indices="@bodyRegion" />
                        <linearElastic youngModulus="80000" poissonRatio="0.49" />
                        <mass density="1000" />
                    </region>
                </material>

                <constraints>
                    <constraint name="bodyConstraint" forStep="${initial}">
                    </constraint>
                </constraints>

                <output>
                    <displacement name="Liver" timestep="1" />
                </output>
            </object>

        :raises MSMLError: if the object has no ``<mesh>`` child element
        '''
        if _tag_name(object_node.tag) != 'object':
            # Only a warning: the element is still parsed like an object.
            msml.log.warn("only 'object' is supported, group or "
                          "other elements are not allowed, found: %s'" % object_node.tag)

        def _parse_mesh(mesh_node):
            'example: <linearTet id="bunnyMesh" mesh="${bunnyVolumeMesher}" />'
            t = _tag_name(mesh_node.tag)
            m = mesh_node.attrib['mesh']
            i = mesh_node.attrib['id']
            return Mesh(t, i, m)

        def _parse_contactGeometry(contactGeometry_node):
            'example: <contactsurface id="contactSurface" surface="${MovingSurface}" exportFile="moving_contact_surface.vtu"/>'
            t = _tag_name(contactGeometry_node.tag)
            m = contactGeometry_node.attrib['surface']
            i = contactGeometry_node.attrib['id']
            # 'exportFile' is optional and defaults to the empty string.
            exportFile = contactGeometry_node.attrib.get('exportFile', '')
            return ContactGeometry(t, i, m, exportFile)

        def _parse_constraints(constraints_node):
            '''example:
            <constraints>
                <constraint name="bodyConstraint" forStep="${initial}">
                </constraint>
            </constraints>'''

            def _parse_constraint(constraint_node):
                name, fs = _attributes(constraint_node, 'name, forStep')
                oc = ObjectConstraints(name, fs)
                oc.constraints = _parse_elements(constraint_node)
                return oc

            return [_parse_constraint(child)
                    for child in constraints_node.iterchildren()]

        def _parse_material(mat_node):
            '''example:
            <material>
                <region id="bunnyMaterial">
                    <indexGroup indices="@bodyRegion" />
                    <linearElastic youngModulus="80000" poissonRatio="0.49" />
                    <mass density="1000" />
                </region>
            </material>
            '''
            if mat_node is None:
                return []

            def _parse_region(reg_node):
                ident, ind = _attributes(reg_node, ['id', 'indices'])
                elements = list(_parse_elements(reg_node))
                return MaterialRegion(ident, ind, elements)

            return [_parse_region(child) for child in mat_node.iterchildren()]

        oid = object_node.attrib['id']
        scene_object = SceneObject(oid)

        # The first child of <mesh> describes the mesh; fail with a clear
        # message instead of a TypeError/IndexError when it is absent.
        mesh_parent = object_node.find('mesh')
        if mesh_parent is None or len(mesh_parent) == 0:
            raise MSMLError("object '%s' does not define a mesh" % oid)
        mesh = _parse_mesh(mesh_parent[0])

        sets_node = object_node.find('sets')
        sets = SceneObjectSets()
        if sets_node is not None:
            sets.nodes = _parse_indexgroup_list(sets_node.find('nodes'))
            sets.surfaces = _parse_indexgroup_list(sets_node.find('surfaces'))
            sets.elements = _parse_indexgroup_list(sets_node.find('elements'))

        # NOTE(review): the fallback is a single ObjectConstraints instance
        # whereas parsed constraints are a list -- kept as in the original,
        # confirm which of the two consumers expect.
        constraints = ObjectConstraints(name="empty")
        constraints_node = object_node.find('constraints')
        if constraints_node is not None:
            constraints = _parse_constraints(constraints_node)

        material = _parse_material(object_node.find('material'))
        output = _parse_elements(object_node.find('output'))

        scene_object.mesh = mesh
        scene_object.output = output
        scene_object.sets = sets
        scene_object.material = material
        scene_object.constraints = constraints

        # Collision detection: the contact surface is optional.
        contactGeometryNode = object_node.find("contactsurface")
        if contactGeometryNode is not None:
            scene_object.contactGeometry = _parse_contactGeometry(contactGeometryNode)

        return scene_object

    def _parse_scene(scene_node):
        """Return one SceneObject per child of *scene_node*."""
        return [_parse_object(child) for child in scene_node.iterchildren()]

    def _parse_indexgroup_list(node):
        """Return an IndexGroup (id, indices) per child of *node*, or []."""
        if node is None:
            return []
        return [IndexGroup(*_attributes(child, 'id, indices'))
                for child in node.iterchildren()]

    def _parse_environment(env_node):
        """Fill an MSMLEnvironment from the ``<environment>`` element.

        Solver settings are copied verbatim (as strings, ``None`` when the
        attribute is absent) from ``<solver>``; every child of
        ``<simulation>`` adds one simulation step. Either element may be
        missing, in which case that part of the environment is left as
        created by ``MSMLEnvironment()``.
        """
        env = MSMLEnvironment()

        solver_attributes = (
            'processingUnit',
            'numParallelProcessesOnCPU',
            'hf3_chanceOfContactBoolean',
            'linearSolver',
            'preconditioner',
            'timeIntegration',
            'dampingRayleighRatioMass',
            'dampingRayleighRatioStiffness',
            'mass',
        )
        solver_node = env_node.find('solver')
        if solver_node is not None:
            for attribute in solver_attributes:
                setattr(env.solver, attribute, solver_node.get(attribute))

        simulation_node = env_node.find('simulation')
        if simulation_node is not None:
            for s in simulation_node.iterchildren():
                # Optional gravity attribute: whitespace separated floats.
                gravityAttrib = s.get('gravity')
                gravity = None
                if gravityAttrib is not None:
                    gravity = [float(g) for g in gravityAttrib.split()]
                env.simulation.add_step(name=s.get('name'), dt=s.get('dt'),
                                        iterations=s.get('iterations'),
                                        gravity=gravity)
        return env

    n_variables = msml_node.find('variables')
    n_workflow = msml_node.find('workflow')
    n_scene = msml_node.find('scene')
    n_environment = msml_node.find('environment')

    scene = _parse_scene(n_scene) if n_scene is not None else None
    env = _parse_environment(n_environment) if n_environment is not None else None

    return MSMLFile(variables=_parse_variables(n_variables),
                    env=env, scene=scene,
                    workflow=_parse_workflow(n_workflow))


from collections import OrderedDict


def _parse_entry_list(nodelist):
    """Collect ``key`` -> value pairs from the given entry elements.

    The value is the ``value`` attribute if present, otherwise the element
    text; it is stripped of surrounding whitespace. Entries with an empty or
    missing value are skipped. Insertion order is preserved.
    """
    d = OrderedDict()
    for node in nodelist:
        key = node.attrib['key']
        if 'value' in node.attrib:
            value = node.attrib['value']
        else:
            value = node.text
        if value:
            d[key] = value.strip()
    return d


def _argument_sets(node, parent=None, as_ordered_dict=False):
    """Parse the ``<arg>`` children of *node* into Slot objects.

    :param node: element whose children are ``<arg>`` elements, or None
    :param parent: object handed to every Slot as its last constructor
        argument
    :param as_ordered_dict: if true, return an OrderedDict keyed by slot
        name instead of a list
    :raises MSMLError: if a child element is not an ``<arg>``
    """

    def _parse_arg(arg_node):
        name, physical, logical, required, default, target = _attributes(
            arg_node, 'name, physical, logical, required, default, target',
            required=True)
        meta = _parse_entry_list(arg_node.iterchildren())
        # 'required' is expected as an integer literal ("0"/"1"); the
        # default True converts to 1.
        arg = Slot(name, physical, logical, bool(int(required)), default, meta, parent)
        # NOTE(review): msml.sorts is not imported explicitly in the visible
        # part of this file -- presumably loaded via msml.model; confirm.
        arg.target = msml.sorts._bool(target)
        return arg

    def _parse_subelements(sub_node):
        if sub_node.tag == 'arg':
            return _parse_arg(sub_node)
        # MSMLError instead of BaseException so that ordinary
        # ``except Exception`` handlers see the failure.
        raise MSMLError("unexpecting element")

    if node is not None and len(node) > 0:
        result = [_parse_subelements(child) for child in node.iterchildren()]
    else:
        result = []

    if as_ordered_dict:
        ordered = OrderedDict()
        for slot in result:
            ordered[slot.name] = slot
        return ordered
    return result


def operator_factory(operator_node):
    """Build an operator object from an ``<operator>`` element.

    The child of ``<runtime>`` (``python``, ``sh`` or ``so``) selects the
    operator class; ``input``, ``output`` and ``parameters`` are parsed into
    ordered dictionaries of slots and ``annotation`` into the ``meta``
    mapping.

    :raises MSMLError: if no supported runtime element is found
    """

    def runtime_factory(node):
        """Return the runtime attributes plus ``'exec'`` = runtime kind."""
        TAGS = {'python': ('function', 'method'),
                'sh': ('file', 'wd'),
                'so': ('file', 'symbol')}

        if node is not None:
            # Sorted for a deterministic choice should several runtime
            # elements be present.
            for kind in sorted(TAGS):
                n = node.find(kind)
                if n is not None:
                    d = dict(n.attrib)
                    d['exec'] = kind
                    return d

        raise MSMLError('unexpected runtime type!')

    name = operator_node.attrib['name']
    n_runtime = operator_node.find('runtime')
    n_input = operator_node.find('input')
    n_output = operator_node.find('output')
    n_parameters = operator_node.find('parameters')
    n_annotation = operator_node.find('annotation')

    runtime = runtime_factory(n_runtime)

    op_classes = {'python': PythonOperator,
                  'sh': ShellOperator,
                  'so': SharedObjectOperator, }

    factory = op_classes[runtime['exec']]
    op = factory(name=name, runtime=runtime)

    op.input = _argument_sets(n_input, op, True)
    op.output = _argument_sets(n_output, op, True)
    op.parameters = _argument_sets(n_parameters, op, True)
    op.meta = keyval_factory(n_annotation)

    return op


def element_factory(element_node):
    """Build an object attribute description from an ``<element>`` node.

    The class is chosen by ``ObjectAttribute.find_class`` from the
    ``category`` attribute. ``description`` is the stripped text of the
    ``<description>`` child, or ``None`` when that child or its text is
    missing.
    """
    name = element_node.attrib['name']
    quantity = element_node.attrib['quantity']
    category = element_node.attrib['category']

    clazz = ObjectAttribute.find_class(category)
    obj = clazz(name=name, quantity=quantity)

    # <description> is treated as optional rather than crashing on None.
    description_node = element_node.find('description')
    description = description_node.text if description_node is not None else None
    if description is not None:
        description = description.strip()

    obj.input = _argument_sets(element_node.find('input'), obj, True)
    obj.parameters = _argument_sets(element_node.find('parameters'), obj, True)
    obj.description = description

    return obj


# Root tag name -> factory used by parse_file() for alphabet documents.
_parse_hooks = {'operator': operator_factory,
                'element': element_factory}