Source code for ensemble.handler.vissim.connector

"""
    This module contains objects for modeling a simplified connector to handle vissim
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================


# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from ensemble.handler.vissim.stream import SimulatorRequest
from ensemble.handler.vissim.configurator import VissimConfigurator
from ensemble.handler.vissim.scenario import VissimScenario

from ensemble.metaclass.connector import AbsConnector


from ensemble.tools.exceptions import (
    EnsembleAPIWarning,
    EnsembleAPILoadFileError,
    EnsembleAPILoadLibraryError,
)

from ensemble.tools.screen import (
    log_verify,
    log_success,
    log_error,
    log_warning,
)

import ensemble.tools.constants as CT

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================

try:
    import win32com.client as com
    from pywintypes import com_error
except ModuleNotFoundError:
    log_warning("\t Platform non compatible with Windows")


[docs]class VissimConnector(AbsConnector, VissimConfigurator): """ This models a connector and interactions from the API with the Vissim library. :raises EnsembleAPILoadLibraryError: Raises error when library cannot be loaded """ def __init__(self, **kwargs) -> None: AbsConnector.__init__(self) VissimConfigurator.__init__(self, **kwargs) self.load_simulator()
[docs] def load_simulator(self) -> None: """load Vissim COM interface""" try: lib_vissim = com.gencache.EnsureDispatch(self.library_path) log_success("\t Library successfully loaded!") except OSError: log_error("\t Vissim is currently unavailable!") raise EnsembleAPILoadLibraryError( "Library not found", self.library_path ) except com_error: log_error("\t Vissim is currently unavailable!") lib_vissim = None raise EnsembleAPILoadLibraryError( "Library not found", self.library_path ) self.__library = lib_vissim
[docs] def load_scenario(self, scenario): """checks existance and load scenario . Also get simulation parameters""" if isinstance(scenario, VissimScenario): try: self.__library.LoadNet( scenario.filename, scenario.bread_additional ) self.sim_period = self.__library.Simulation.AttValue( "SimPeriod" ) self.sim_sec = self.__library.Simulation.AttValue("SimSec") self.sim_res = self.__library.Simulation.AttValue("SimRes") self.rand_seed = self.__library.Simulation.AttValue("RandSeed") self.performInitialize(scenario) self.simulation = scenario log_verify("\t Scenario successfully loaded!") return except: raise EnsembleAPILoadFileError( f"\t Simulation network could not be loaded" ) try: self.__library.LoadLayout(scenario.filename_layx) return except: raise EnsembleAPILoadFileError( f"\t Simulation layout could not be loaded" ) EnsembleAPIWarning(f"\tSimulation could not be loaded.")
[docs] def register_simulation(self, scenarioPath: str) -> None: """ Register simulation file within the simulator :param scenarioPath: Path to scenario :type scenarioPath: str """ self.simulation = VissimScenario(scenarioPath)
[docs] def request_answer(self): """ Request simulator answer and maps the data locally """ vehsAttributesNamesVissim = ( "CoordFrontX", "Acceleration", "Pos", "No", "CoordFrontY", "Lane\\Link\\No", "VehType", "Speed", "Lane\\Index", ) vehsAttributes = self.__library.Net.Vehicles.GetMultipleAttributes( vehsAttributesNamesVissim ) vehData = [ dict(zip(vehsAttributesNamesVissim, item)) for item in vehsAttributes ] self.request.query = vehData # List[Dicts] self.request.sim_sec = self.__library.Simulation.AttValue( "SimSec" ) # self.sim_sec
[docs] def run_single_step(self): """Run simulation next step :return: None :rtype: None """ self.__library.Simulation.RunSingleStep()
[docs] def query_data(self) -> int: """Run simulation step by step :return: iteration step :rtype: int To test query, you can use click.echo(self.get_vehicle_data()) after request_answer() """ try: self.request_answer() self.run_single_step() self._c_iter = next(self._n_iter) return self._c_iter except StopIteration: self._bContinue = False return -1
[docs] def push_data(self): """Pushes data back to the simulator""" pass
# ========================================================================= # PROTOCOLS # =========================================================================
[docs] def performConnect(self) -> None: """ Perform simulation connection """ self.load_simulator()
[docs] def performInitialize(self, scenario: VissimScenario) -> None: """ Perform simulation initialization """ self.request = SimulatorRequest() self._n_iter = iter(self.get_simulation_steps()) self._c_iter = next(self._n_iter) self._bContinue = True
[docs] def performPreRoutine(self) -> None: """ Perform simulator preroutine """ raise NotImplementedError
[docs] def performQuery(self) -> None: """ Perform simulator Query """ raise NotImplementedError
# ========================================================================= # ATTRIBUTES # =========================================================================
[docs] def get_simulation_steps(self) -> range: """Gets the list of simulation steps starting from 0 to end of simulation in step of simulation resolution""" return range(0, self.sim_period, self.sim_res)
@property def scenariofilename(self): """Scenario filenamme Returns: filname (str): Absolute path towards the XML input for SymuVia """ return self.simulation.filename @property def get_vehicle_data(self): """Returns the query received from the simulator :return: Request from the simulator :rtype: dict """ return self.request.get_vehicle_data() @property def simulation_step(self): """Current simulation iteration""" return self._c_iter @property def do_next(self) -> bool: """Returns true if the simulation shold continue""" return self._bContinue @property def simulationstep(self) -> float: """Current simulation step""" return self._c_iter