Source code for ensemble.control.operational.reference

"""
Reference Control
=================

    This model is here to create the reference for the operational controller based on a specific time window.
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================

from ensemble.component.dynamics import TIME_STEP
import numpy as np
from typing import Iterable, Iterator
from functools import partial
from dataclasses import dataclass
import matplotlib.pyplot as plt

# ============================================================================
# INTERNAL IMPORTS
# ============================================================================

from ensemble.metaclass.state import AbsState
from ensemble.logic.platoon_states import (
    StandAlone,
    Platooning,
    Joining,
    Splitting,
)

from ensemble.tools.constants import DCT_RUNTIME_PARAM, DCT_PLT_CONST, TIME_STEP

TIME_STEP_OP = DCT_RUNTIME_PARAM["sampling_time_operational"]
# Accounts for initial generation of 1h of sequences
TIME_INTERVAL = DCT_RUNTIME_PARAM["horizon_tactical"]
TIME_GAP = DCT_PLT_CONST["time_gap"]
CRUISE_SPEED = DCT_PLT_CONST["cruise_speed"]

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================


[docs]@dataclass class ReferenceHeadway: """This is a class to model the reference for a controller, consume, regenerate trajectory, plot trajectory. The maneuvers are planned on a horizon of 60 seconds and the references are created on streams of 1 second. Each stream once consummed exams the status before generating a new sequence. """ sim_step: float = TIME_STEP time_step: float = TIME_STEP_OP interval: float = TIME_INTERVAL gap0: float = TIME_GAP gapT: float = TIME_GAP v0: float = CRUISE_SPEED VT: float = CRUISE_SPEED current_gap: float = TIME_GAP current_speed: float = CRUISE_SPEED current_time: float = 0 def __iter__(self): self.count = 0 self.chunk_space = iter(next(self.chunks_space)) self.chunk_speed = iter(next(self.chunks_speed)) self.time_running = iter(next(self.time_chunks)) return self def __next__(self): self.count += 1 if self.count <= int(self.sim_step / self.time_step): self.current_gap = next(self.chunk_space) self.current_speed = next(self.chunk_speed) self.current_time = next(self.time_running) return self.current_time, self.current_gap, self.current_speed else: raise StopIteration
[docs] def create_time_gap_hwy(self, state: AbsState): """Creates an array containing the time gap reference for an interval of time based on the state of the vehicle gap. Args: state (AbsState): State of the platoon Returns: Iterable: Iterable array containing time gap values. """ self.horizon = np.arange(0, self.interval, self.time_step) if isinstance(state, Platooning) or isinstance(state, Joining): self.gap0 = self.current_gap self.gapT = TIME_GAP self.v0 = self.current_speed self.VT = CRUISE_SPEED if isinstance(state, Splitting): self.gap0 = self.current_gap self.gapT = 3 * TIME_GAP self.v0 = self.current_speed self.VT = CRUISE_SPEED self.reference_headway = ReferenceHeadway.change_time_gap( self.horizon, self.gap0, self.gapT, ) self.reference_cruise = ReferenceHeadway.change_time_gap( self.horizon, self.v0, self.VT, ) self.chunks_space = iter( np.array_split( self.reference_headway, int(self.interval / self.sim_step), ) ) self.chunks_speed = iter( np.array_split( self.reference_cruise, int(self.interval / self.sim_step), ) ) self.time_chunks = iter( np.array_split( self.horizon + self.current_time, int(self.interval / self.sim_step), ) )
[docs] @staticmethod def sigmoid(x, A: float = 1, a: float = 5, d: int = 30) -> np.ndarray: """Sigmoid function""" return A * 1 / (1 + np.exp(-(x - d) / a))
[docs] @staticmethod def change_time_gap( time_vector: np.ndarray, T0: float = TIME_GAP, TF: float = TIME_GAP, ) -> np.ndarray: """Create a decreasing jump on speed""" delta = TF - T0 vector = ReferenceHeadway.sigmoid(time_vector, A=delta) return T0 + vector
[docs] def plot_case(self, state: AbsState): self.create_time_gap_hwy(state) _, a = plt.subplots(2, 1) a[0].plot(self.horizon, self.reference_headway) a[0].set_ylabel("Time Gap [s]") a[0].grid() a[1].plot(self.horizon, self.reference_cruise) a[1].set_ylabel("Cruise speed [m/s]") a[1].grid() plt.xlabel("Time[s]") plt.show()
if __name__ == "__main__": r = ReferenceHeadway() r.plot_case(Splitting()) c = ["r", "b"] _, a = plt.subplots(2, 1) r.create_time_gap_hwy(Splitting()) for j in range(30): for t, x, v in r: a[0].scatter((t,), (x,), color="red") a[1].scatter((t,), (v,), color="red") r.create_time_gap_hwy(Splitting()) for j in range(60): for t, x, v in r: a[0].scatter((t,), (x,), color="blue") a[1].scatter((t,), (v,), color="blue") r.create_time_gap_hwy(Joining()) for j in range(60): for t, x, v in r: a[0].scatter((t,), (x,), color="orange") a[1].scatter((t,), (v,), color="orange") r.create_time_gap_hwy(Platooning()) for j in range(60): for t, x, v in r: a[0].scatter((t,), (x,), color="green") a[1].scatter((t,), (v,), color="green") plt.show()