# Source code for ensemble.component.vehicle

""" 
Vehicle Model
=============
This module implements a vehicle model.

The vehicle model acts as an instance that traces the data of an individual vehicle and modifies the vehicle's behavior according to the given dynamics.
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================

from typing import Dict, List
import itertools
import numpy as np
from dataclasses import dataclass, field

# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from ensemble.logic.subscriber import Subscriber
from ensemble.tools import constants as ct
from ensemble.metaclass.stream import DataQuery
from ensemble.metaclass.dynamics import AbsDynamics


from ensemble.component.dynamics import SampleDynamics

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================

# Default dynamics model, instantiated once at import time. It is the default
# value of the ``dynamics`` argument of ``Vehicle.__init__``, so every vehicle
# created without an explicit ``dynamics`` shares this same instance.
sample_dynamics = SampleDynamics()


@dataclass
class Vehicle(Subscriber):
    """Vehicle class defined for storing data on a single vehicle.

    You need a Publisher from where the vehicle is going to take data.

    Args:
        request (Publisher): Parser or object publishing data
        dynamics (AbsDynamics): Dynamics model stored on the vehicle.
            Defaults to the module-level ``sample_dynamics`` instance.
        **kwargs: Optional attribute values (e.g. ``vehid``) set on the
            vehicle before it is registered with the publisher.

    Returns:
        vehicle (Vehicle): A Dataclass with vehicle parameters

    ============================  =================================
    **Variable**                  **Description**
    ============================  =================================
    ``abscissa``                  Current coordinate on x axis
    ``acceleration``              Current acceleration
    ``distance``                  Current distance traveled on link
    ``elevation``                 Current elevation
    ``lane``                      Current lane
    ``link``                      Current road vehicle is traveling
    ``ordinate``                  Current coordinate on y axis
    ``speed``                     Current speed
    ``ttd``                       Total travel distance (read-only)
    ``vehid``                     Vehicle id
    ``vehtype``                   Vehicle class
    ============================  =================================

    Example:
        This is one example on how to register a new vehicle ::

            >>> req = SimulatorRequest()
            >>> veh = Vehicle(req)
            >>> req.dispatch() # This will update vehicle data

    When having multiple vehicles please indicate the `vehid` before
    launching the dispatch method. This is because the vehicle object
    looks for a vehicle id within the data.

    Example:
        This is one example on how to register two vehicles ::

            >>> req = SimulatorRequest()
            >>> veh1 = Vehicle(req, vehid=0)
            >>> veh2 = Vehicle(req, vehid=1)
            >>> req.dispatch() # This will update vehicle data on both vehicles
    """

    # Class-wide counter (not a dataclass field: it has no annotation). Each
    # new vehicle takes the next value as its ``count`` attribute.
    counter = itertools.count()

    # NOTE: ``distance`` and ``ttd`` are re-bound to properties further down
    # in the class body. The annotations below still register them as
    # dataclass fields, so they show up in the generated ``__repr__``.
    abscissa: float = 0.0
    acceleration: float = 0.0
    distance: float = 0.0
    _distance: float = field(init=False, repr=False, default=0.0)
    driven: bool = False
    elevation: float = 0.0
    lane: int = 1
    link: str = "Zone_001"
    ordinate: float = 0.0
    speed: float = 25.0
    vehid: int = 0
    vehtype: str = ""
    # NOTE(review): presumably ids of the leading / following vehicle --
    # not described in the original documentation, confirm with the publisher.
    leadid: int = 0
    followid: int = 0
    ttd: float = 0

    def __init__(
        self,
        request: DataQuery,
        dynamics: AbsDynamics = sample_dynamics,
        **kwargs
    ):
        """This initializer creates a Vehicle.

        The explicit ``__init__`` replaces the one ``@dataclass`` would
        generate, so field values can only be passed through ``**kwargs``.

        Args:
            request (DataQuery): Publisher handed to ``Subscriber.__init__``.
            dynamics (AbsDynamics): Dynamics model kept in ``self.dynamics``.
            **kwargs: Attribute values forwarded to ``update_no_request``.
        """
        # Undefined properties
        self.count = next(self.__class__.counter)
        self.dynamics = dynamics
        self.itinerary = []

        # Internal: the distance setter reads these two accumulators, so they
        # must exist before ``distance`` is assigned for the first time.
        self._ttdprev = 0
        self._ttdpivot = 0
        self.distance = 0.0

        # Optional properties
        self.update_no_request(**kwargs)

        super().__init__(request)

    def __hash__(self):
        """Hash on the class and the vehicle id, consistent with ``__eq__``."""
        return hash((type(self), self.vehid))

    def __eq__(self, veh):
        """Two vehicles are equal when they share the same ``vehid``.

        Returns ``NotImplemented`` when ``veh`` is not an instance of this
        vehicle's class, letting Python try the reflected comparison.
        """
        if not isinstance(veh, type(self)):
            return NotImplemented
        return self.vehid == veh.vehid

    def update(self):
        """Updates data from publisher.

        Pulls this vehicle's properties from the publisher, copies them onto
        the instance and appends the current link to ``itinerary`` the first
        time it is seen.
        """
        # presumably ``_publisher`` is set by ``Subscriber.__init__`` -- confirm
        dataveh = self._publisher.get_vehicle_properties(self.vehid)
        self.__dict__.update(**dataveh)

        # ``distance`` is a property, so the value written into ``__dict__``
        # above is shadowed; assign it explicitly to run the setter and keep
        # the total-travel-distance bookkeeping up to date.
        if "distance" in dataveh:
            self.distance = dataveh["distance"]

        if self.link not in self.itinerary:
            self.itinerary.append(self.link)

    def update_no_request(self, **kwargs):
        """Update vehicle data from specific keyword arguments.

        Each keyword is assigned with ``setattr``, so ``distance`` goes
        through its property setter.
        """
        for key, value in kwargs.items():
            setattr(self, key, value)

    @property
    def state(self) -> np.ndarray:
        """Returns current state of the vehicle

        Returns:
            np.ndarray: 1-D array of three elements
                ``[distance, speed, acceleration]``
        """
        return np.array((self.distance, self.speed, self.acceleration))

    @property
    def distance(self):
        """Current distance traveled on the link."""
        return self._distance

    @distance.setter
    def distance(self, value):
        """Store the distance and refresh the total travel distance.

        A new value smaller than the previous one is treated as a reset of
        the link distance: the previous value is added to the pivot, and the
        total becomes ``pivot + value``.
        """
        self._distance = value
        if self._distance < self._ttdprev:
            self._ttdpivot += self._ttdprev
        self._ttddist = self._ttdpivot + self._distance
        self._ttdprev = self._distance

    @property
    def x(self):
        """Return vehicle travelled distance"""
        return self.ttd

    @property
    def v(self):
        """Return vehicle speed"""
        return self.speed

    @property
    def a(self):
        """Return vehicle acceleration"""
        return self.acceleration

    @property
    def ttd(self):
        """Total travel distance by a single vehicle.

        The value is maintained step by step by the ``distance`` setter,
        which keeps three internals: ``_ttdprev`` (last distance seen),
        ``_ttdpivot`` (sum of distances banked at each reset) and
        ``_ttddist`` (pivot plus current distance, returned here).
        """
        return self._ttddist