Source code for ensemble.control.operational.operationalLayerTest

import os
import ctypes
ERROR_CODE = -1
import numpy as np

[docs]class oem(object): def __init__(self, ): self.a = [] self.v = [] self.x = []
[docs]class operationalLayerTest(): def __init__(self,): self.loadingSucceded = False self.loadLib() # self.lead_speed_ts = [] # self.lead_accel_ts = [] # self.lead_position_ts = [] # self.veh_speed_ts = [] # self.veh_accel_ts = [] # self.veh_position_ts = [] self.testDurationInSeconds = 600 if self.loadingSucceded: self.initializePars() self.veh_autonomous_operational_acceleration = ctypes.c_double(0) self.veh_autonomous_operational_mixingmode = ctypes.c_long(1) self.veh_autonomous_operational_warning = ctypes.c_double(1) self.veh_cc_setpoint = ctypes.c_double(1) self.veh_cruisecontrol_acceleration = ctypes.c_double(1) self.success = ctypes.c_int(0) self.initializePars() self.sPars = dict()
[docs] def run(self,): self.call() self.printOutput()
[docs] def loadLib(self,): import platform import sys if platform.system() == 'Windows': print('Running on win') file_dll = '../x64/Debug/OperationalDLL.dll' elif platform.system() == 'Darwin': print('Running on mac') file_dll = '../../libs/darwin/OperationalDLL.dylib' else: print('System not supported') sys.exit() # Load operational DLL self.lib = None try: self.lib = ctypes.cdll.LoadLibrary(file_dll) self.loadingSucceded = True except: print('Error: DLL file could not be found')
[docs] def initializePars(self,): self.varDict = {'curr_lead_veh_acceleration':2.0, 'curr_lead_veh_id':40, 'curr_lead_veh_rel_velocity':-1.0, 'curr_lead_veh_type':10, 'curr_timestep':55.0, 'curr_ts_length':.1, 'curr_veh_id':10, 'curr_veh_setspeed':88/3.6, 'curr_veh_type':10, 'curr_veh_controller_in_use':10, 'curr_veh_ACC_h':1.6, 'curr_veh_CACC_h':.6, 'curr_veh_used_distance_headway':20.0, 'curr_veh_used_rel_vel':-1.0, 'curr_veh_velocity':85./3.6, 'curr_veh_autonomous_operational_warning':10, 'curr_veh_platooning_max_acceleration':2.0, 'prev_veh_cc_setpoint':85./3.6, 'prev_veh_cruisecontrol_acceleration':2.0, 'prev_veh_distance_headway':20.0, 'prev_veh_executed_acceleration':-2.0}
[docs] def setMetaPars(self, ): self.sPars['v0'] = 32 self.sPars['lead_position'] = 100
[docs] def setPars(self, scenario = 'stop-go'): self.varDict['curr_lead_veh_acceleration'] = 0 #SET self.varDict['curr_lead_veh_id'] = 1 self.varDict['curr_lead_veh_rel_velocity'] = 0 #--> observed self.varDict['curr_lead_veh_type'] = 1 # self.varDict['curr_timestep'] = 0 self.varDict['curr_ts_length'] = .1 self.varDict['curr_veh_id'] = 2 self.varDict['curr_veh_setspeed'] = 32 #--> this is important! self.varDict['curr_veh_type'] = 1 self.varDict['curr_veh_controller_in_use'] = 2 # acc 1 cacc 2 self.varDict['curr_veh_ACC_h'] = 1.6 # [s] self.varDict['curr_veh_CACC_h'] = .6 self.varDict['curr_veh_used_rel_vel'] = self.varDict['curr_lead_veh_rel_velocity'] #--> delayed for ACC #this self.varDict['curr_veh_velocity'] = self.sPars['v0'] # m/s! self.varDict['curr_veh_autonomous_operational_warning'] = 0 #0 self.varDict['curr_veh_platooning_max_acceleration'] = 1 #2 self.varDict['prev_veh_cc_setpoint'] = 32 self.varDict['prev_veh_cruisecontrol_acceleration'] = 0 # to prevent jerk self.lead_speed = self.varDict['curr_veh_velocity'] - self.varDict['curr_lead_veh_rel_velocity'] self.varDict['curr_veh_used_distance_headway'] = ( self.varDict['curr_veh_velocity'] * self.varDict['curr_veh_ACC_h'] + 2 ) # self.varDict['prev_veh_distance_headway'] = self.varDict['curr_veh_used_distance_headway'] self.lead_position = self.sPars['lead_position'] self.veh_length = 4 self.veh_position = self.lead_position - self.varDict['curr_veh_used_distance_headway'] - self.veh_length if scenario == 'cut-in': self.veh_position -= self.sPars['extraGap'] self.varDict['prev_veh_executed_acceleration'] = 0 #
[docs] def setAloneScenario(self,): self.varDict['curr_lead_veh_acceleration'] = 999 #SET self.varDict['curr_lead_veh_id'] = 1 self.varDict['curr_lead_veh_rel_velocity'] = 0 #--> observed self.varDict['curr_lead_veh_type'] = 999 # self.varDict['curr_timestep'] = 0 self.varDict['curr_ts_length'] = .1 self.varDict['curr_veh_id'] = 2 self.varDict['curr_veh_setspeed'] = 32 #--> this is important! self.varDict['curr_veh_type'] = 1 self.varDict['curr_veh_controller_in_use'] = 2 # acc 1 cacc 2 self.varDict['curr_veh_ACC_h'] = 1.6 # [s] self.varDict['curr_veh_CACC_h'] = .6 self.varDict['curr_veh_used_rel_vel'] = 0 #--> delayed for ACC #this self.varDict['curr_veh_velocity'] = 32 # m/s! self.varDict['curr_veh_autonomous_operational_warning'] = 0 #0 self.varDict['curr_veh_platooning_max_acceleration'] = 1 #2 self.varDict['prev_veh_cc_setpoint'] = 32 self.varDict['prev_veh_cruisecontrol_acceleration'] = 0 # to prevent jerk self.varDict['prev_veh_executed_acceleration'] = 0 # self.lead_speed = self.varDict['curr_veh_velocity'] - self.varDict['curr_lead_veh_rel_velocity'] self.varDict['curr_veh_used_distance_headway'] = ( self.varDict['curr_veh_velocity'] * self.varDict['curr_veh_ACC_h'] + 2 ) # self.varDict['prev_veh_distance_headway'] = self.varDict['curr_veh_used_distance_headway'] self.lead_position = 100 self.veh_length = 4 self.veh_position = self.lead_position - self.varDict['curr_veh_used_distance_headway'] - self.veh_length deltat = self.varDict['curr_ts_length'] self.time = np.arange(0,self.testDurationInSeconds,deltat) lead_accel = self.varDict['curr_lead_veh_acceleration'] lead_speed = self.lead_speed lead_position = self.lead_position for t in self.time: self.lead_speed_ts.append(lead_speed) self.lead_accel_ts.append(lead_accel) self.lead_position_ts.append(lead_position) self.lead_speed_ts = np.array(self.lead_speed_ts) self.lead_accel_ts = np.array(self.lead_accel_ts) self.lead_position_ts = np.array(self.lead_position_ts)
[docs] def defineLeaderProfile(self, scenario = 'stop-go'): vehicle = oem() self.setPars(scenario=scenario) stopTime = 0 interval = 50 #initial values deltat = self.varDict['curr_ts_length'] lead_accel = self.varDict['curr_lead_veh_acceleration'] lead_speed = self.lead_speed lead_position = self.lead_position self.time = np.arange(0,self.testDurationInSeconds,deltat) speed_ts = [] accel_ts = [] position_ts = [] #evolved values if scenario == 'stop-go': for t in self.time: if t == interval: lead_accel = -9.8/80 if t > interval and lead_accel == -9.8/80 and lead_speed <= 0: lead_accel = 0 stopTime = t if t > interval and lead_accel == 0 and (t-stopTime) > interval: lead_accel = 9.8/80 lead_speed = lead_speed + lead_accel * deltat lead_position = ( + lead_position + lead_speed * deltat + .5 * lead_accel * deltat**2 ) speed_ts.append(lead_speed) accel_ts.append(lead_accel) position_ts.append(lead_position) elif scenario == 'cut-in': for t in self.time: lead_position = ( + lead_position + lead_speed * deltat + .5 * lead_accel * deltat**2 ) speed_ts.append(lead_speed) accel_ts.append(lead_accel) position_ts.append(lead_position) vehicle.v = np.array(speed_ts) vehicle.a = np.array(accel_ts) vehicle.x = np.array(position_ts) return vehicle
[docs] def timeEvolveEgo(self, leader, ): #set initial conditions vehicle = oem() self.setPars() speed_ts = [] accel_ts = [] position_ts = [] speed_ts.append(self.varDict['curr_veh_velocity']) accel_ts.append(0) position_ts.append(self.veh_position) for i in range(self.time.shape[0]): #Set time and leader stuff self.varDict['curr_timestep'] = self.time[i] self.varDict['curr_lead_veh_acceleration'] = leader.a[i] self.lead_speed = leader.v[i] self.lead_position = leader.x[i] #call OL self.call() #Set follower self.veh_position = ( + self.veh_position + self.varDict['curr_veh_velocity'] * self.varDict['curr_ts_length'] + .5 * self.veh_autonomous_operational_acceleration.value * self.varDict['curr_ts_length']**2 ) self.varDict['curr_veh_velocity'] = ( + self.varDict['curr_veh_velocity'] + self.veh_autonomous_operational_acceleration.value * self.varDict['curr_ts_length'] ) self.varDict['curr_lead_veh_rel_velocity'] = self.varDict['curr_veh_velocity'] - self.lead_speed self.varDict['curr_veh_setspeed'] = self.veh_cc_setpoint.value self.varDict['prev_veh_cc_setpoint'] = self.veh_cc_setpoint.value self.varDict['curr_veh_used_distance_headway'] = self.lead_position - self.veh_position - self.veh_length self.varDict['prev_veh_distance_headway'] = self.lead_position - self.veh_position - self.veh_length self.varDict['prev_veh_cruisecontrol_acceleration'] = self.veh_cruisecontrol_acceleration.value self.varDict['prev_veh_executed_acceleration'] = self.veh_autonomous_operational_acceleration.value speed_ts.append(self.varDict['curr_veh_velocity']) accel_ts.append(self.veh_autonomous_operational_acceleration.value) position_ts.append(self.veh_position) vehicle.v = np.array(speed_ts) vehicle.a = np.array(accel_ts) vehicle.x = np.array(position_ts) return vehicle
[docs] def printOutput(self,): # Print the return values if self.success.value > 0: print(self.veh_autonomous_operational_acceleration.value) print(self.veh_autonomous_operational_mixingmode.value) print(self.veh_autonomous_operational_warning.value) print(self.veh_cc_setpoint.value) print(self.veh_cruisecontrol_acceleration.value) else: print('An error occurred while calling DLL')
[docs] def call(self,): curr_lead_veh_acceleration = ctypes.c_double(self.varDict['curr_lead_veh_acceleration']) curr_lead_veh_id = ctypes.c_long(self.varDict['curr_lead_veh_id']) curr_lead_veh_rel_velocity = ctypes.c_double(self.varDict['curr_lead_veh_rel_velocity']) curr_lead_veh_type = ctypes.c_long(self.varDict['curr_lead_veh_type']) curr_timestep = ctypes.c_double(self.varDict['curr_timestep']) curr_ts_length = ctypes.c_double(self.varDict['curr_ts_length']) curr_veh_id = ctypes.c_long(self.varDict['curr_veh_id']) curr_veh_setspeed = ctypes.c_double(self.varDict['curr_veh_setspeed']) curr_veh_type = ctypes.c_long(self.varDict['curr_veh_type']) curr_veh_controller_in_use = ctypes.c_long(self.varDict['curr_veh_controller_in_use']) curr_veh_ACC_h = ctypes.c_double(self.varDict['curr_veh_ACC_h']) curr_veh_CACC_h= ctypes.c_double(self.varDict['curr_veh_CACC_h']) curr_veh_used_distance_headway = ctypes.c_double(self.varDict['curr_veh_used_distance_headway']) curr_veh_used_rel_vel = ctypes.c_double(self.varDict['curr_veh_used_rel_vel']) curr_veh_velocity = ctypes.c_double(self.varDict['curr_veh_velocity']) curr_veh_autonomous_operational_warning = ctypes.c_long(self.varDict['curr_veh_autonomous_operational_warning']) curr_veh_platooning_max_acceleration = ctypes.c_double(self.varDict['curr_veh_platooning_max_acceleration']) prev_veh_cc_setpoint = ctypes.c_double(self.varDict['prev_veh_cc_setpoint']) prev_veh_cruisecontrol_acceleration = ctypes.c_double(self.varDict['prev_veh_cruisecontrol_acceleration']) prev_veh_distance_headway = ctypes.c_double(self.varDict['prev_veh_distance_headway']) prev_veh_executed_acceleration = ctypes.c_double(self.varDict['prev_veh_executed_acceleration']) # Define variables for return values: These are placeholders, no action required self.veh_autonomous_operational_acceleration = ctypes.c_double(1) self.veh_autonomous_operational_mixingmode = ctypes.c_long(1) self.veh_autonomous_operational_warning = ctypes.c_double(1) self.veh_cc_setpoint = ctypes.c_double(1) self.veh_cruisecontrol_acceleration = ctypes.c_double(1) self.success = ctypes.c_int(0) self.lib.operational_controller( curr_lead_veh_acceleration, curr_lead_veh_id, curr_lead_veh_rel_velocity, curr_lead_veh_type, curr_timestep, curr_ts_length, curr_veh_id, curr_veh_setspeed, curr_veh_type, curr_veh_controller_in_use, curr_veh_ACC_h, curr_veh_CACC_h, curr_veh_used_distance_headway, curr_veh_used_rel_vel, curr_veh_velocity, curr_veh_autonomous_operational_warning, curr_veh_platooning_max_acceleration, prev_veh_cc_setpoint, prev_veh_cruisecontrol_acceleration, prev_veh_distance_headway, prev_veh_executed_acceleration, ctypes.byref(self.veh_autonomous_operational_acceleration), ctypes.byref(self.veh_autonomous_operational_mixingmode), ctypes.byref(self.veh_autonomous_operational_warning), ctypes.byref(self.veh_cc_setpoint), ctypes.byref(self.veh_cruisecontrol_acceleration), ctypes.byref(self.success))
[docs] def runFullScenario(self,): self.sPars['extraGap'] = 30 self.sPars['lead_position'] = 800 self.setMetaPars() l = self.defineLeaderProfile(scenario = 'cut-in') self.sPars['lead_position'] = 800 + 25 v4 = self.timeEvolveEgo(leader = l) for i in range(0,self.testDurationInSeconds): if i < 500: v4.x[i] = l.x[i]; self.sPars['extraGap'] = 0 self.sPars['lead_position'] = 800 self.sPars['lead_position'] = v4.x[0] v = self.timeEvolveEgo(v4) self.sPars['lead_position'] = v.x[0] v2 = self.timeEvolveEgo(v) self.sPars['lead_position'] = v2.x[0] v3 = self.timeEvolveEgo(v2) self.sPars['lead_position'] = v3.x[0] fleet = [] fleet.append(l) fleet.append(v) fleet.append(v2) fleet.append(v3) fleet.append(v4) return fleet
[docs]def main(): olt = operationalLayerTest() olt.runFullScenario()
# olt.setPars() #olt.run() # v = olt.defineLeaderProfile() # v1 = olt.timeEvolveEgo(leader = v) if __name__ == '__main__': main()