Source code for ensemble.handler.symuvia.xmlparser

"""
Symuvia XML Parser
==================
A parser for trajectories from symuvia. 
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================
import re
from functools import cached_property

# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from ensemble.tools.constants import FIELD_FORMAT, FIELD_DATA

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================


PATTERN = {
    "abs": re.compile(r'abs="(.*?)"'),
    "acc": re.compile(r'acc="(.*?)"'),
    "dst": re.compile(r'dst="(.*?)"'),
    "etat_pilotage": re.compile(
        r'dst="([\d\.]*?)"( etat_pilotage=".*?")? id="(.*?)"'
    ),
    "id": re.compile(
        r'dst="([\d\.]*?)"( etat_pilotage=".*?")? id="(.*?)" ord="(.*?)"'
    ),
    "ord": re.compile(r'ord="(.*?)"'),
    "tron": re.compile(r'tron="(.*?)"'),
    "type": re.compile(r'tron="(.*?)" type="(.*?)" vit="(.*?)"'),
    "vit": re.compile(r'vit="(.*?)"'),
    "voie": re.compile(r'voie="(.*?)"'),
    "z": re.compile(r'z="(.*?)"'),
    "traj": re.compile(
        r'abs="(.*?)" acc="(.*?)" dst="([\d\.]*?)"( etat_pilotage=".*?")? id="(.*?)" ord="(.*?)" tron="(.*?)" type="(.*?)" vit="(.*?)" voie="(.*?)" z="(.*?)"'
    ),
    "inst": re.compile(r'val="(.*?)"'),
    "nbveh": re.compile(r'nbVeh="(.*?)"'),
}

CAV_TYPE = tuple(value for key, value in FIELD_FORMAT.items())


[docs]class XMLTrajectory: """Model object for a trajectory, it can be created from a xml and contains trajectories for a set of vehicles.""" aliases = { "abscissa": "abs", "acceleration": "acc", "distance": "dst", "elevation": "z", "lane": "voie", "link": "tron", "ordinate": "ord", "speed": "vit", "vehid": "id", "vehtype": "type", } def __init__(self, xml: bytes): self._xml = xml.decode("UTF8") def __getattr__(self, name): if name == "aliases": raise AttributeError # http://nedbatchelder.com/blog/201010/surprising_getattr_recursion.html name = self.aliases.get(name, name) return object.__getattribute__(self, name) @cached_property def abs(self) -> tuple: """`abs` cached values for all vehicles in network Returns: tuple: cached `abs` values """ return tuple( map( FIELD_FORMAT["abs"], PATTERN.get("abs").findall(self._xml), ) ) @cached_property def acc(self): """`acceleration` cached values for all vehicles in network Returns: tuple: cached `acc` values """ return tuple( map(FIELD_FORMAT["acc"], PATTERN.get("acc").findall(self._xml)) ) @cached_property def dst(self): """`distance` cached values for all vehicles in network Returns: tuple: cached `dst` values """ return tuple( map(FIELD_FORMAT["dst"], PATTERN.get("dst").findall(self._xml)) ) @cached_property def driven(self): """alias for `etat_pilotage`""" return self.etat_pilotage @cached_property def etat_pilotage(self): """`etat_pilotage` cached values for all vehicles in network Returns: tuple: cached `etat_pilotage` values """ drv = PATTERN.get("etat_pilotage").findall(self._xml) if drv: return tuple( map(FIELD_FORMAT["etat_pilotage"], [x[1] for x in drv]) ) @cached_property def id(self): """Vehicle `id` cached values for all vehicles in network Returns: tuple: cached `id` values """ return tuple( map( FIELD_FORMAT["id"], [x[-2] for x in PATTERN.get("id").findall(self._xml)], ) ) @cached_property def ord(self): """`ordinate` cached values for all vehicles in network Returns: tuple: cached `ord` values """ return tuple( map(FIELD_FORMAT["ord"], PATTERN.get("ord").findall(self._xml)) ) @cached_property def tron(self): """`link` cached values for all vehicles in network Returns: tuple: cached `tron` values """ return tuple(PATTERN.get("tron").findall(self._xml)) @cached_property def type(self): """Vehicle `type` cached values for all vehicles in network Returns: tuple: cached `type` values """ return tuple(x[1] for x in PATTERN.get("type").findall(self._xml)) @cached_property def vit(self): """`speed` cached values for all vehicles in network Returns: tuple: cached `vit` values """ return tuple( map(FIELD_FORMAT["vit"], PATTERN.get("vit").findall(self._xml)) ) @cached_property def voie(self): """`lane` cached values for all vehicles in network Returns: tuple: cached `voie` values """ return tuple( map(FIELD_FORMAT["voie"], PATTERN.get("voie").findall(self._xml)) ) @cached_property def z(self): """`elevation` cached values for all vehicles in network Returns: tuple: cached `z` values """ return tuple(map(float, PATTERN.get("z").findall(self._xml))) @cached_property def traj(self): """Trajectory cached values for all vehicles in network Returns: tuple: cached `traj` values """ return tuple( XMLTrajectory._typeconvert(x) for x in PATTERN.get("traj").findall(self._xml) ) @cached_property def inst(self): """`val` simulation time instant for current trajectory Returns: float: simulation time """ return float(PATTERN.get("inst").findall(self._xml)[0]) @cached_property def nbveh(self): """`nbveh` simulation time instant for current trajectory Returns: int: number of vehicles """ return int(PATTERN.get("nbveh").findall(self._xml)[0]) @cached_property def todict(self): """Converts to dictionary any of the data in the""" # Relies on order return tuple(dict(zip(FIELD_DATA.values(), x)) for x in self.traj) @classmethod def _typeconvert(cls, data: tuple): return tuple(a(b) for a, b in zip(CAV_TYPE, data))