"""**Vehicle Gap Coordinator**
This module details the implementation of the ``Front Gap`` and ``Rear Gap`` Coordinators existing in each one of the vehicles created when running a platoon. The coordinators have access to a centralized information center called ``Data Query`` to retrieve information in the vecinity of the vehicle.
"""
# ============================================================================
# STANDARD IMPORTS
# ============================================================================
from typing import Union
import numpy as np
from dataclasses import dataclass
from functools import cached_property
# ============================================================================
# INTERNAL IMPORTS
# ============================================================================
from ensemble.component.vehicle import Vehicle
from ensemble.component.platoon_vehicle import PlatoonVehicle
from ensemble.logic.platoon_states import (
StandAlone,
Platooning,
Joining,
Splitting,
)
from ensemble.tools.constants import DCT_PLT_CONST, DCT_RUNTIME_PARAM
from ensemble.metaclass.coordinator import AbsSingleGapCoord
from ensemble.metaclass.controller import AbsController
from ensemble.control.operational.reference import ReferenceHeadway
# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================
PLState = Union[StandAlone, Platooning, Joining, Splitting]
MAXTRKS = DCT_PLT_CONST["max_platoon_length"]
MAXNDST = DCT_PLT_CONST["max_connection_distance"]
PLT_TYP = DCT_PLT_CONST["platoon_types"]
MAXDSTR = DCT_PLT_CONST["max_gap_error"]
TIME_STEP_OP = DCT_RUNTIME_PARAM["sampling_time_operational"]
KEYS = ("a", "x", "v", "s", "u", "Dv", "Pu", "Ps")
[docs]@dataclass
class VehGapCoordinator(AbsSingleGapCoord):
status: PLState = StandAlone()
platoon: bool = False
comv2x: bool = True
dx_ref: float = 30
platoonid: int = 0
positionid: int = 0
# def __new__(cls, **kwargs):
# if kwargs.get("create"):
# return super(VehGapCoordinator, cls).__new__(cls)
# return None
def __init__(self, vehicle: PlatoonVehicle):
self.ego = vehicle
self._fgc = None
self._rgc = None
self._posid = 0
create_dct = lambda: {key: 0.0 for key in KEYS}
self._ctr_lead_data = create_dct()
self._ctr_ego_data = create_dct()
self._ctr_lead_data["id"] = max(self.ego.vehid - 1, 0)
self._ctr_ego_data["id"] = self.ego.vehid
# Historical data
self._history_state = np.array([(vehicle.x, vehicle.v, vehicle.a)])
self._history_control = np.array([(0,)])
self._history_reference = np.array([0, 0, 0])
[docs] def init_reference(self):
"""Initializes the reference class for the gap coordinator. In particular the initial conditions should be already computed."""
if hasattr(self, "_reference"):
return
if self.leader is self:
self._reference = ReferenceHeadway()
else:
self._reference = ReferenceHeadway(gap0=self.dx)
@property
def reference(self):
return self._reference
def __hash__(self):
return hash((type(self), self.ego.vehid))
def __eq__(self, rhs):
if not isinstance(rhs, type(self)):
return NotImplemented
return self.ego == rhs.ego
def __lt__(self, rhs):
if not isinstance(rhs, type(self)):
return NotImplemented
return self.ego.vehid < rhs.ego.vehid
def __gt__(self, rhs):
if not isinstance(rhs, type(self)):
return NotImplemented
return self.ego.vehid > rhs.ego.vehid
def __leq__(self, rhs):
if not isinstance(rhs, type(self)):
return NotImplemented
return self.ego.vehid <= rhs.ego.vehid
def __geq__(self, rhs):
if not isinstance(rhs, type(self)):
return NotImplemented
return self.ego.vehid >= rhs.ego.vehid
[docs] def solve_state(self) -> PLState:
"""Logic solver for the platoon state machine."""
new_state = self.status.next_state(self)
self.ego.state = new_state
self.reference.create_time_gap_hwy(new_state)
return new_state
@property
def x(self):
"""Ego current position in link"""
return self.history_state[-1][0] # self.ego.ttd
@property
def leader(self) -> AbsSingleGapCoord:
"""Returns the leader vehicle in the platoon"""
return self._fgc if self._fgc is not None else self
@leader.setter
def leader(self, value: AbsSingleGapCoord):
"""Set the leader vehicle gap coordinator"""
self._fgc = value
@property
def follower(self):
"""Returns the follower vehicle in the platoon"""
return self._rgc if self._rgc is not None else self.ego
@property
def is_head(self):
"""Determines if the vehicle is head of the platoon"""
return self.leader is self.ego
@property
def is_tail(self):
"""Determines if the vehicle is tail of the platoon"""
return self.follower is self.ego
@property
def acceleration(self):
"""Acceleration"""
return self.history_state[-1][2] # self.ego.acceleration
@property
def speed(self):
"""Speed"""
return self.history_state[-1][1] # self.ego.speed
@property
def ttd(self):
"""Total travel time"""
return self.ego.ttd
@property
def vehid(self):
"""Vehicle positions"""
return self.ego.vehid
@property
def dx(self):
"""Ego current headway space"""
return (
self.leader.x - self.ego.x
if self.leader.vehid != self.ego.vehid
else MAXNDST
)
@property
def dv(self):
"""Ego current differential speed"""
return self.leader.speed - self.ego.speed
@property
def control(self):
"""Ego current control"""
return self._ctr_ego_data.get("u")
@property
def positionid(self):
"""Platoon id 0-index notation to denote position on the platoon"""
return self._posid
@positionid.setter
def positionid(self, value):
"""Platoon id 0-index notation to denote position on the platoon"""
self._posid = np.clip(value, 0, MAXTRKS)
@property
def history_control(self):
return self._history_control
@property
def last_control(self):
if len(self._history_control.shape) == 1:
return self.history_control
return self._history_control[-1]
@history_control.setter
def history_control(self, value: np.ndarray):
self._history_control = np.vstack((self._history_control, value))
@property
def history_state(self):
return self._history_state
@property
def last_state(self):
if len(self._history_state.shape) == 1:
return self.history_state
return self._history_state[-1]
@history_state.setter
def history_state(self, value: np.ndarray):
self._history_state = np.vstack((self._history_state, value))
@property
def history_reference(self):
return self._history_reference
@history_reference.setter
def history_reference(self, value: np.ndarray):
self._history_reference = np.vstack((self._history_reference, value))
@property
def last_reference(self):
if len(self._history_state.shape) == 1:
return self.history_state
return self._history_state[-1]
@property
def joinable(self):
"""Checks if a vehicle is joinable"""
return (
(self.leader.positionid < MAXTRKS - 1)
and (self.dx < MAXNDST)
and self.leader.comv2x
) and (self.leader is not self)
# self.intruder
@property
def intruder(self):
"""Returns true when the vehtype of my immediate leader is not platoon"""
return NotImplementedError
[docs] def cancel_join_request(self, value: bool = False):
"""Forces ego to abandon platoon mode"""
return not self.joinable or value
[docs] def confirm_platoon(self):
"""Confirms ego platoon mode"""
return abs(self.dx - self.dx_ref) < MAXDSTR
@property
def leader_data(self):
"""Operational leader control data"""
# Lead updates
data = {
"a": self.leader.acceleration,
"x": self.leader.x,
"v": self.leader.speed,
"s": self.leader.dx,
"u": self.leader.control,
"Dv": self.leader.dv,
"Pu": self._ctr_lead_data.get("u"),
"Ps": self._ctr_lead_data.get("s"),
}
self._ctr_lead_data.update(data)
return self._ctr_lead_data
@leader_data.setter
def leader_data(self, value: dict):
"""Operational leader control data (setter) - For manual update. It updates the ego data dictionary to specific values given within a dictionary structure.
* a: real acceleration
* x: postition
* v: speed
* s: spacing
* u: control
* D: delta
* P: past
Args:
value (dict): Dictionary with keys a,x,v,Dv,Pu,Ps
"""
self._ctr_lead_data.update(value)
@property
def ego_data(self):
"""Operational leader control data"""
# Ego updates
data = {
"a": self.acceleration,
"x": self.x,
"v": self.speed,
"s": self.dx,
"u": self.control,
"Dv": self.dv,
"Pu": self._ctr_ego_data.get("u"),
"Ps": self._ctr_ego_data.get("s"),
}
self._ctr_ego_data.update(data)
return self._ctr_ego_data
@ego_data.setter
def ego_data(self, value: dict):
"""Operational ego control data (setter) - For manual update. It updates the ego data dictionary to specific values given within a dictionary structure.
* a: real acceleration
* x: postition
* v: speed
* s: spacing
* u: control
* D: delta
* P: past
Args:
value (dict): Dictionary with keys a,x,v,Dv,Pu,Ps
"""
self._ctr_ego_data.update(value)
[docs] def get_step_data(self):
"""Retrieves current time data for the operational layer"""
return self.leader_data, self.ego_data
[docs] def evolve_control(
self,
control: AbsController,
time: float,
time_step: float = TIME_STEP_OP,
):
"""Considers a one chunk of step evolution of the operational control layer
Args:
control (AbsController): Operational controller
"""
control(self, self.reference, time, time_step)