"""
Vissim Stream
================
This module receives the stream of data coming from the Vissim platform and defines a parser for the specific vehicle data needed to perform platooning activities.
"""
# ============================================================================
# STANDARD IMPORTS
# ============================================================================
from typing import Union, Dict, List, Tuple
from collections import defaultdict
# ============================================================================
# INTERNAL IMPORTS
# ============================================================================
from ensemble.metaclass.stream import DataQuery
import ensemble.tools.constants as ct
from ensemble.component.vehiclelist import VehicleList
# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================
# Type aliases used by the annotations in this module.
# A single value: float, int or str.
vtypes = Union[float, int, str]
# NOTE(review): ``Tuple[vtypes]`` types a tuple of exactly ONE element; if a
# variable-length tuple is intended this needs ``Tuple[vtypes, ...]`` - confirm.
vdata = Tuple[vtypes]
# Mapping from string keys to single values.
vmaps = Dict[str, vtypes]
# List of such mappings.
vlists = List[vmaps]
# List of lists, as named for a simulator response.
simresponse = List[List]
# Module-level mapping that answers ``False`` for any key not yet stored.
response = defaultdict(bool)
class SimulatorRequest(DataQuery):
    """Store the latest simulator response and expose vehicle data from it.

    Assigning to :attr:`query` stores the raw response, refreshes (or
    creates) the vehicle registry and then calls ``dispatch()``.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the parent initializer."""
        super().__init__(**kwargs)
        # initialize extra things here!

    # =========================================================================
    # MEMORY HANDLING
    # =========================================================================

    @property
    def query(self):
        """Response from the simulator, or ``None`` if none is stored yet."""
        # ``get_vehicle_data`` treats ``None`` as "no response", so reading
        # the property before the first assignment must not raise
        # AttributeError.
        return getattr(self, "_str_response", None)

    @query.setter
    def query(self, response):
        """Store ``response``, refresh the vehicle registry and dispatch."""
        self._str_response = response
        self.update_vehicle_registry()
        # NOTE(review): ``dispatch`` is not defined in this class; presumably
        # inherited from ``DataQuery`` - confirm.
        self.dispatch()

    @property
    def current_time(self) -> float:
        """Current simulation second."""
        # TODO: How is this updated?
        # NOTE(review): ``sim_sec`` is never assigned in this class; it has to
        # be provided by the parent class or by the caller - confirm.
        return self.sim_sec

    @property
    def current_nbveh(self) -> int:
        """Number of vehicles in the network.

        Computed as the length of the stored response; ``0`` when no
        response has been stored.
        """
        # TODO: What is self._str_response?
        stored = self.query
        return 0 if stored is None else len(stored)

    # =========================================================================
    # METHODS
    # =========================================================================

    def create_vehicle_registry(self):
        """Creates a vehicle registry for all vehicles in simulation"""
        self.vehicle_registry = VehicleList(self)

    def update_vehicle_registry(self):
        """Updates the vehicle registry if it exists, otherwise creates it"""
        if not hasattr(self, "vehicle_registry"):
            self.create_vehicle_registry()
            return
        self.vehicle_registry.update_list()

    def get_vehicle_data(self) -> list:
        """Extracts vehicles information from the stored simulator response

        Returns:
            list: One ``SimulatorRequest.transform`` result per record. A
            list response is transformed element by element, any other
            non-``None`` response is treated as a single record, and a
            missing response yields an empty list.
        """
        raw = self.query
        if raw is None:
            return []
        records = raw if isinstance(raw, list) else [raw]
        # NOTE(review): ``transform`` is not defined in the visible part of
        # this class; presumably inherited from ``DataQuery`` - confirm.
        return [SimulatorRequest.transform(record) for record in records]

    def is_vehicle_driven(self, vehid: int) -> bool:
        """Returns true if the vehicle state is exposed to a driven state

        Args:
            vehid (int):
                vehicle id

        Returns:
            driven (bool): True if veh is driven. This implementation does
            not inspect ``vehid`` and always returns ``False``.
        """
        return False