Source code for ensemble.handler.symuvia.stream

"""
SymuVia Stream
================
This module is able to receive the stream of data comming from the SymuVia platform and define a parser for a specific vehicle data suitable to perform platooning activities.
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================

from xml.parsers.expat import ExpatError
from ctypes import create_string_buffer
from typing import Union, Dict, List, Tuple
from collections import defaultdict

# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from ensemble.metaclass.stream import DataQuery
import ensemble.tools.constants as ct
from ensemble.handler.symuvia.xmlparser import XMLTrajectory
from ensemble.component.vehiclelist import VehicleList

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================

vtypes = Union[float, int, str]
vdata = Tuple[vtypes]
vmaps = Dict[str, vtypes]
vlists = List[vmaps]
response = defaultdict(lambda: False)


[docs]class SimulatorRequest(DataQuery): def __init__(self, **kwargs): super().__init__(**kwargs) self.datatraj = XMLTrajectory(b"") # ========================================================================= # MEMORY HANDLING # ========================================================================= @property def query(self): """String response from the simulator""" return self.datatraj._xml @query.setter def query(self, response: bytes): self.datatraj = XMLTrajectory(response) self.dispatch() self.update_vehicle_registry() @property def current_time(self) -> float: return self.datatraj.inst @property def current_nbveh(self) -> int: return self.datatraj.nbveh @property def data_query(self): """Direct parsing from the string buffer Returns: simdata (OrderedDict): Simulator data parsed from XML """ return self.datatraj.todict # ========================================================================= # METHODS # =========================================================================
[docs] def create_vehicle_registry(self): """Creates a vehicle registry for all vehicles in simulation""" self.vehicle_registry = VehicleList(self)
[docs] def update_vehicle_registry(self): """Updates vehicle registry in case it exists""" if hasattr(self, "vehicle_registry"): self.vehicle_registry.update_list() return self.create_vehicle_registry()
[docs] def get_vehicle_data(self) -> tuple: """Extracts vehicles information from simulators response Returns: t_veh_data (list): list of dictionaries containing vehicle data with correct formatting """ return self.data_query
[docs] def is_vehicle_driven(self, vehid: int) -> bool: """Returns true if the vehicle state is exposed to a driven state Args: vehid (str): vehicle id Returns: driven (bool): True if veh is driven """ if self.is_vehicle_in_network(vehid): forced = tuple( veh.get("driven") == True for veh in self.get_vehicle_data() if veh.get("vehid") == vehid ) return any(forced) return False