Source code for ensemble.component.vehiclelist

"""
Vehicle List
=============
This module implements a vehicle list model.

Vehicle list is a collection implementation that acts as an instance to trace individual vehicle data of several vehicles.
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================

from dataclasses import asdict
from typing import Iterable, Union
import pandas as pd
import numpy as np
from itertools import groupby

# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from ensemble.component.vehicle import Vehicle
from ensemble.component.platoon_vehicle import PlatoonVehicle
from ensemble.tools.constants import DCT_PLT_CONST

from ensemble.logic.frozen_set import SortedFrozenSet
from ensemble.logic.publisher import Publisher
from ensemble.tools.geometry import Point

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================

PLT_TYPE = DCT_PLT_CONST["platoon_types"]
EMPTY_MESSAGE = "\tNo vehicles have been registered"
VehType = Union[Vehicle, PlatoonVehicle]


[docs]class VehicleList(SortedFrozenSet, Publisher): """Class defining a set of vehicles. This class is based on a sorted frozen set and supports multiple operations in between sets. You can define a list based on a simluator request and the list will update automatically via a single method. Args: request (Publisher): Publisher of information Example: Define a list of vehicles to trace the requests :: >>> simrequest = SimulatorRequest() >>> simrequest.query = one_vehicle_xml >>> vl = VehicleList(simrequest) >>> simrequest.query = second_vehicle_xml >>> vl.update_list() # This updates manually Example: Define a list of vehicles to be used manually:: >>> simrequest = SimulatorRequest() >>> vl = VehicleList(simrequest) >>> v1 = Vehicle(simrequest, vehid=1) >>> v2 = Vehicle(simrequest, vehid=2) >>> vl.update_list(optional=[v1,v2]) # Adds vehicles to the list The list could be eventually updated as an observer but for simplicity reasons it is kept like this. """ _cumul = set() def __init__(self, request): self._request = request data = ( PlatoonVehicle(request, **v) if v.get("vehtype") in PLT_TYPE else Vehicle(request, **v) for v in request.get_vehicle_data() ) self._free = [] self.__class__._cumul = self.__class__._cumul.union( request.get_vehicles_property("vehid") ) SortedFrozenSet.__init__(self, tuple(data)) Publisher.__init__(self)
[docs] def update_list(self, extra: Iterable[Vehicle] = []): """Update vehicle data according to an update in the request.""" newveh = [] # Create only new vehicles for v in self._request.get_vehicle_data(): if v.get("vehid") not in self.__class__._cumul: if v.get("vehtype") in PLT_TYPE: newveh.append(PlatoonVehicle(self._request, **v)) self.__class__._cumul.add(v.get("vehid")) else: newveh.append(Vehicle(self._request, **v)) self.__class__._cumul.add(v.get("vehid")) lead = self.get_leader(newveh[-1]) newveh[-1].leadid = lead.vehid follow = self.get_follower(newveh[-1]) newveh[-1].followid = follow.vehid data = SortedFrozenSet(self._items).union(newveh) data = data.union(extra) # Put vehicles on list self._items = data._items # Take out exciting vehicles for veh in self._items: if veh.vehid not in self._request.get_vehicles_property( "vehid" ) and not bool( extra ): # extra arguments self.release(veh) # Publish for followers self.dispatch() self.update_leaders() self.update_followers()
[docs] def release(self, veh: VehType): """Moves a vehicle to a free list so that it is not considered in the Args: r (VehType): Vehicle object """ self._items.remove(veh) self.__class__._cumul.remove(veh.vehid) self._free.append(veh)
def _get_vehicles_attribute(self, attribute: str) -> pd.Series: """Retrieve list of parameters Args: attribute (str): One of the vehicles attribute e.g. 'distance' Returns dataframe (series): Returns values for a set of vehicles """ return self.pandas_print()[attribute] @property def acceleration(self) -> pd.Series: """Returns all vehicle's accelerations""" return self._get_vehicles_attribute("acceleration") @property def speed(self) -> pd.Series: """Returns all vehicle's accelerations""" return self._get_vehicles_attribute("speed") @property def distance(self) -> pd.Series: """Returns all vehicle's accelerations""" return self._get_vehicles_attribute("distance") @property def link(self) -> pd.Series: """Returns all vehicle's link""" return self._get_vehicles_attribute("link") @property def lane(self) -> pd.Series: """Returns all vehicle's lane""" return self._get_vehicles_attribute("lane") @property def abscissa(self) -> pd.Series: """Returns all vehicle's abscissa""" return self._get_vehicles_attribute("abscissa") @property def ordinate(self) -> pd.Series: """Returns all vehicle's ordinate""" return self._get_vehicles_attribute("ordinate") @property def vehid(self) -> pd.Series: """Returns all vehicle's vehid""" return self._get_vehicles_attribute("vehid") @property def leadid(self) -> pd.Series: """Returns all leader vehicle's vehid""" return self._get_vehicles_attribute("leadid")
[docs] def distance_filter( self, ego: Vehicle, type: str = "downstream", property="distance", radius: float = 100, ): """ Returns all vehicles' downstream or """ case = { "downstream": { getattr(v, "vehid"): getattr(v, property) for v in self._items if v.distance > ego.distance and v.distance < ego.distance + radius }, "upstream": { getattr(v, "vehid"): getattr(v, property) for v in self._items if v.distance < ego.distance and v.distance > ego.distance - radius }, "all": { getattr(v, "vehid"): getattr(v, property) for v in self._items if np.linalg.norm( [v.abscissa - ego.abscissa, v.ordinate - ego.ordinate] ) < radius }, } return case.get(type)
[docs] def get_leader(self, ego: Vehicle, distance: float = 100) -> Vehicle: """Returns ego vehicle immediate leader""" downstreamdistance = self.distance_filter( ego, "downstream", property="_distance", radius=distance ) # Former case, if we detect vehicles downstream if downstreamdistance: array = np.array(tuple(downstreamdistance.values())) idx = (np.abs(array - ego.distance)).argmin() closest = array[idx] veh = [v for v in self._items if v.distance == closest] ego.leadid = veh[0].vehid return veh[0] radiusids = self.distance_filter( ego, "all", property="lane", radius=distance ) # Get the average points beyond ego_pos = Point(ego.abscissa, ego.ordinate) points_in_radious = { x.vehid: Point(x.abscissa, x.ordinate).isinfrontof(ego_pos) for x in self if x.vehid != ego.vehid and x.vehid in radiusids.keys() } if points_in_radious == {}: # No vehicles in radious or I am traveling alone ego.leadid = ego.vehid return ego candidates = { x.vehid: ego_pos.distanceto(Point(x.abscissa, x.ordinate)) for x in self._items if ego_pos.isbehindof(Point(x.abscissa, x.ordinate)) } if candidates == {}: # No points beyond so I am my leader with followers ego.leadid = ego.vehid return ego distances = np.asarray(list(candidates.values())) idx = distances.argmin() leader = self[list(candidates.keys())[idx]] ego.leadid = leader.vehid return leader
[docs] def update_leaders(self): """Updates all vehicles leaders""" for veh in self: self.get_leader(veh)
[docs] def get_follower(self, ego: Vehicle, distance: float = 100) -> Vehicle: """ Returns ego vehicle immediate follower """ upstreamdistance = self.distance_filter( ego, "upstream", property="distance", radius=distance ) # Former case, if we detect vehicles upstream if upstreamdistance: array = np.array(tuple(upstreamdistance.values())) idx = (np.abs(array - ego.distance)).argmin() closest = array[idx] veh = [v for v in self._items if v.distance == closest] ego.followid = veh[0].vehid return veh[0] radiusids = self.distance_filter( ego, "all", property="lane", radius=distance ) # Get the average points behind ego_pos = Point(ego.abscissa, ego.ordinate) points_in_radious = { x.vehid: Point(x.abscissa, x.ordinate).isbehindof(ego_pos) for x in self if x.vehid != ego.vehid and x.vehid in radiusids.keys() } if points_in_radious == {}: # No vehicles in radious or I am traveling alone ego.followid = ego.vehid return ego candidates = { x.vehid: ego_pos.distanceto(Point(x.abscissa, x.ordinate)) for x in self._items if ego_pos.isinfrontof(Point(x.abscissa, x.ordinate)) } if candidates == {}: # No points beyond so I am my leader with followers ego.followid = ego.vehid return ego distances = np.asarray(list(candidates.values())) idx = distances.argmin() follower = self[list(candidates.keys())[idx]] ego.followid = follower.vehid return follower
[docs] def update_followers(self): """Updates all vehicles followers""" for veh in self: self.get_follower(veh)
[docs] def pandas_print(self, columns: Iterable = []) -> pd.DataFrame: """Transforms vehicle list into a pandas for rendering purposes Returns: df (DataFrame): Returns a table with pandas data. """ df = pd.DataFrame([asdict(v) for v in self._items]) return df[columns] if (columns and not df.empty) else df
[docs] def pretty_print(self, columns: list = []) -> str: """Summary of info""" df = self.pandas_print(["vehid"] + columns) return EMPTY_MESSAGE if df.empty else str(df)
def __str__(self): if not self._items: return EMPTY_MESSAGE df = self.pandas_print() return EMPTY_MESSAGE if df.empty else str(df) def __repr__(self): if not self._items: return EMPTY_MESSAGE df = self.pandas_print() return EMPTY_MESSAGE if df.empty else str(df)
[docs] def __iter__(self): """Protocol sorting data by largest distance on link""" self.__tmpit = iter( sorted(self._items, key=lambda x: x.distance, reverse=True) ) return self.__tmpit
def __next__(self): return next(self.__tmpit)