diff --git a/docs/CARS.md b/docs/CARS.md index 00b2060507f..6f355cadbdd 100644 --- a/docs/CARS.md +++ b/docs/CARS.md @@ -384,6 +384,7 @@ |Volkswagen|Golf R 2015-19|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)| |Volkswagen|Golf SportsVan 2015-20|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)| |Volkswagen|Grand California 2019-24|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)| +|Volkswagen|ID.4 2021-23|Adaptive Cruise Control (ACC) & Lane Assist|[Dashcam mode](#dashcam)| |Volkswagen|Jetta 2015-18|Adaptive Cruise Control (ACC) & Lane Assist|[Dashcam mode](#dashcam)| |Volkswagen|Jetta 2019-23|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)| |Volkswagen|Jetta GLI 2021-23|Adaptive Cruise Control (ACC) & Lane Assist|[Upstream](#upstream)| diff --git a/opendbc/car/lateral.py b/opendbc/car/lateral.py index 5cc5f86c33e..8f9de22713f 100644 --- a/opendbc/car/lateral.py +++ b/opendbc/car/lateral.py @@ -1,14 +1,23 @@ import math import numpy as np from dataclasses import dataclass -from opendbc.car import structs, rate_limit, DT_CTRL +from opendbc.car import structs, rate_limit, DT_CTRL, ACCELERATION_DUE_TO_GRAVITY from opendbc.car.vehicle_model import VehicleModel +from typing import Tuple FRICTION_THRESHOLD = 0.2 # ISO 11270 ISO_LATERAL_ACCEL = 3.0 # m/s^2 ISO_LATERAL_JERK = 5.0 # m/s^3 +# Add extra tolerance for average banked road since safety doesn't have the roll +AVERAGE_ROAD_ROLL = 0.06 # ~3.4 degrees, 6% superelevation. higher actual roll lowers lateral acceleration +MAX_LATERAL_ACCEL = ISO_LATERAL_ACCEL + (ACCELERATION_DUE_TO_GRAVITY * AVERAGE_ROAD_ROLL) # ~3.6 m/s^2 + + +@dataclass +class CurvatureSteeringLimits: + CURVATURE_MAX: float @dataclass @@ -128,6 +137,43 @@ def apply_steer_angle_limits_vm(apply_angle: float, apply_angle_last: float, v_e return float(np.clip(new_apply_angle, -limits.ANGLE_LIMITS.STEER_ANGLE_MAX, limits.ANGLE_LIMITS.STEER_ANGLE_MAX)) +def get_max_curvature_jerk(v_ego: float, steer_step: int) -> float: + # ISO 11270 + # Lateral jerk + ts_elapsed = steer_step * DT_CTRL + curvature_rate_limit = ISO_LATERAL_JERK / (max(v_ego, 1.0) ** 2) + max_jerk = curvature_rate_limit * ts_elapsed + return max_jerk + + +def get_max_curvature_average(v_ego: float) -> Tuple[float, float]: + max_curvature = MAX_LATERAL_ACCEL / (max(v_ego, 1.0) ** 2) + return -max_curvature, max_curvature + + +def apply_std_curvature_limits(apply_curvature: float, apply_curvature_last: float, v_ego: float, curvature: float, override: bool, + steer_step: int, lat_active: bool, limits: CurvatureSteeringLimits) -> float: + + new_apply_curvature = apply_curvature + + # Lateral jerk + max_jerk = get_max_curvature_jerk(v_ego, steer_step) + curvature_up = apply_curvature_last + max_jerk + curvature_down = apply_curvature_last - max_jerk + + new_apply_curvature = float(np.clip(new_apply_curvature, curvature_down, curvature_up)) + + # Lateral acceleration (average road roll tolerance for safety) + min_curvature, max_curvature = get_max_curvature_average(v_ego) + new_apply_curvature = float(np.clip(new_apply_curvature, min_curvature, max_curvature)) + + # set output curvature as current curvature (if otherwise set to 0 in car controller) + if not lat_active: + new_apply_curvature = curvature + + return float(np.clip(new_apply_curvature, -limits.CURVATURE_MAX, limits.CURVATURE_MAX)) + + def common_fault_avoidance(fault_condition: bool, request: bool, above_limit_frames: int, max_above_limit_frames: int, max_mismatching_frames: int = 1): """ diff --git a/opendbc/car/tests/routes.py b/opendbc/car/tests/routes.py index 2fe80c0f781..8a93b419183 100644 --- a/opendbc/car/tests/routes.py +++ b/opendbc/car/tests/routes.py @@ -277,6 +277,7 @@ class CarTestRoute(NamedTuple): CarTestRoute("6719965b0e1d1737/2023-08-29--06-40-05", TOYOTA.TOYOTA_CHR_TSS2), # hybrid, openpilot longitudinal, radar disabled CarTestRoute("14623aae37e549f3/2021-10-24--01-20-49", TOYOTA.TOYOTA_PRIUS_V), + CarTestRoute("aebd8f1d4ea16066|00000009--b31e222338", VOLKSWAGEN.VOLKSWAGEN_ID4_MK1), CarTestRoute("202c40641158a6e5/2021-09-21--09-43-24", VOLKSWAGEN.VOLKSWAGEN_ARTEON_MK1), CarTestRoute("2c68dda277d887ac/2021-05-11--15-22-20", VOLKSWAGEN.VOLKSWAGEN_ATLAS_MK1), CarTestRoute("ffcd23abbbd02219/2024-02-28--14-59-38", VOLKSWAGEN.VOLKSWAGEN_CADDY_MK3), diff --git a/opendbc/car/torque_data/override.toml b/opendbc/car/torque_data/override.toml index f6bab7603f9..dd7ce754b09 100644 --- a/opendbc/car/torque_data/override.toml +++ b/opendbc/car/torque_data/override.toml @@ -24,6 +24,7 @@ legend = ["LAT_ACCEL_FACTOR", "MAX_LAT_ACCEL_MEASURED", "FRICTION"] "TESLA_MODEL_X" = [nan, 2.5, nan] # Guess +"VOLKSWAGEN_ID4_MK1" = [nan, 2.5, nan] "FORD_BRONCO_SPORT_MK1" = [nan, 1.5, nan] "FORD_ESCAPE_MK4" = [nan, 1.5, nan] "FORD_ESCAPE_MK4_5" = [nan, 1.5, nan] diff --git a/opendbc/car/volkswagen/carcontroller.py b/opendbc/car/volkswagen/carcontroller.py index 1c58faee790..14be034add5 100644 --- a/opendbc/car/volkswagen/carcontroller.py +++ b/opendbc/car/volkswagen/carcontroller.py @@ -1,10 +1,10 @@ import numpy as np from opendbc.can import CANPacker from opendbc.car import Bus, DT_CTRL, structs -from opendbc.car.lateral import apply_driver_steer_torque_limits +from opendbc.car.lateral import apply_driver_steer_torque_limits, apply_std_curvature_limits from opendbc.car.common.conversions import Conversions as CV from opendbc.car.interfaces import CarControllerBase -from opendbc.car.volkswagen import mlbcan, mqbcan, pqcan +from opendbc.car.volkswagen import mebcan, mlbcan, mqbcan, pqcan from opendbc.car.volkswagen.values import CanBus, CarControllerParams, VolkswagenFlags VisualAlert = structs.CarControl.HUDControl.VisualAlert @@ -45,11 +45,16 @@ def __init__(self, dbc_names, CP): self.CCS = pqcan elif CP.flags & VolkswagenFlags.MLB: self.CCS = mlbcan + elif CP.flags & VolkswagenFlags.MEB: + self.CCS = mebcan else: self.CCS = mqbcan self.apply_torque_last = 0 + self.apply_curvature_last = 0. + self.steering_power_last = 0 self.gra_acc_counter_last = None + self.klr_counter_last = None self.hca_mitigation = HCAMitigation(self.CCP) def update(self, CC, CS, now_nanos): @@ -61,14 +66,55 @@ def update(self, CC, CS, now_nanos): if self.frame % self.CCP.STEER_STEP == 0: apply_torque = 0 - if CC.latActive: - new_torque = int(round(actuators.torque * self.CCP.STEER_MAX)) - apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.CCP) - - apply_torque = self.hca_mitigation.update(apply_torque, self.apply_torque_last) - hca_enabled = apply_torque != 0 - self.apply_torque_last = apply_torque - can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_torque, hca_enabled)) + if self.CP.flags & VolkswagenFlags.MEB: + if CC.latActive: + hca_enabled = True + apply_curvature = actuators.curvature + (CS.curvature_meas - CC.currentCurvature) + apply_curvature = apply_std_curvature_limits(apply_curvature, self.apply_curvature_last, CS.out.vEgoRaw, + CS.curvature_meas, False, self.CCP.STEER_STEP, CC.latActive, + self.CCP.CURVATURE_LIMITS) + + min_power = max(self.steering_power_last - self.CCP.STEERING_POWER_STEP, self.CCP.STEERING_POWER_MIN) + max_power = min(self.steering_power_last + self.CCP.STEERING_POWER_STEP, self.CCP.STEERING_POWER_MAX) + target_power_driver = int(np.interp(CS.out.steeringTorque, [self.CCP.STEER_DRIVER_ALLOWANCE, self.CCP.STEER_DRIVER_MAX], + [self.CCP.STEERING_POWER_MAX, self.CCP.STEERING_POWER_MIN])) + target_power = int(np.interp(CS.out.vEgo, [0., 0.5], [self.CCP.STEERING_POWER_MIN, target_power_driver])) + steering_power = min(max(target_power, min_power), max_power) + + else: + if self.steering_power_last > 0: + hca_enabled = True + apply_curvature = float(np.clip(CS.curvature_meas, + -self.CCP.CURVATURE_LIMITS.CURVATURE_MAX, + self.CCP.CURVATURE_LIMITS.CURVATURE_MAX)) + steering_power = max(self.steering_power_last - self.CCP.STEERING_POWER_STEP, 0) + else: + hca_enabled = False + apply_curvature = 0. + steering_power = 0 + + can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_curvature, hca_enabled, steering_power)) + self.apply_curvature_last = apply_curvature + self.steering_power_last = steering_power + + else: + # Logic to avoid HCA state 4 "refused": + # * Don't steer unless HCA is in state 3 "ready" or 5 "active" + # * Don't steer at standstill + # * Don't send > 3.00 Newton-meters torque + # * Don't send the same torque for > 6 seconds + # * Don't send uninterrupted steering for > 360 seconds + # MQB racks reset the uninterrupted steering timer after a single frame + # of HCA disabled; this is done whenever output happens to be zero. + apply_torque = 0 + if CC.latActive: + new_torque = int(round(actuators.torque * self.CCP.STEER_MAX)) + apply_torque = apply_driver_steer_torque_limits(new_torque, self.apply_torque_last, CS.out.steeringTorque, self.CCP) + + apply_torque = self.hca_mitigation.update(apply_torque, self.apply_torque_last) + hca_enabled = apply_torque != 0 + self.apply_torque_last = apply_torque + can_sends.append(self.CCS.create_steering_control(self.packer_pt, self.CAN.pt, apply_torque, hca_enabled)) if self.CP.flags & VolkswagenFlags.STOCK_HCA_PRESENT: # Pacify VW Emergency Assist driver inactivity detection by changing its view of driver steering input torque @@ -79,6 +125,17 @@ def update(self, CC, CS, now_nanos): ea_simulated_torque = CS.out.steeringTorque can_sends.append(self.CCS.create_eps_update(self.packer_pt, self.CAN.cam, CS.eps_stock_values, ea_simulated_torque)) + # Emergency Assist intervention + if self.CP.flags & VolkswagenFlags.MEB and self.CP.flags & VolkswagenFlags.STOCK_KLR_PRESENT: + # send capacitive steering wheel touched + # propably EA is stock activated only for cars equipped with capacitive steering wheel + # (also stock long does resume from stop as long as hands on is detected additionally to OP resume spam) + klr_send_ready = CS.klr_stock_values["COUNTER"] != self.klr_counter_last + if klr_send_ready: + can_sends.append(mebcan.create_capacitive_wheel_touch(self.packer_pt, self.CAN.cam, CC.latActive, CS.klr_stock_values)) + can_sends.append(mebcan.create_capacitive_wheel_touch(self.packer_pt, self.CAN.pt, CC.latActive, CS.klr_stock_values)) + self.klr_counter_last = CS.klr_stock_values["COUNTER"] + # **** Acceleration Controls ******************************************** # if self.CP.openpilotLongitudinalControl: @@ -126,6 +183,7 @@ def update(self, CC, CS, now_nanos): new_actuators = actuators.as_builder() new_actuators.torque = self.apply_torque_last / self.CCP.STEER_MAX new_actuators.torqueOutputCan = self.apply_torque_last + new_actuators.curvature = float(self.apply_curvature_last) self.gra_acc_counter_last = CS.gra_stock_values["COUNTER"] self.frame += 1 diff --git a/opendbc/car/volkswagen/carstate.py b/opendbc/car/volkswagen/carstate.py index 0eed80fc0ca..c71390fe78d 100644 --- a/opendbc/car/volkswagen/carstate.py +++ b/opendbc/car/volkswagen/carstate.py @@ -13,12 +13,14 @@ def __init__(self, CP): super().__init__(CP) self.frame = 0 self.eps_init_complete = False + self.cruise_recovery_timer = 0 self.CCP = CarControllerParams(CP) self.button_states = {button.event_type: False for button in self.CCP.BUTTONS} self.esp_hold_confirmation = False self.upscale_lead_car_signal = False self.eps_stock_values = False self.acc_type = 0 + self.curvature_meas = 0. def update_button_enable(self, buttonEvents: list[structs.CarState.ButtonEvent]): if not self.CP.pcmCruise: @@ -52,6 +54,8 @@ def update(self, can_parsers) -> structs.CarState: return self.update_pq(pt_cp, cam_cp, ext_cp) elif self.CP.flags & VolkswagenFlags.MLB: return self.update_mlb(pt_cp, cam_cp, ext_cp, alt_cp) + elif self.CP.flags & VolkswagenFlags.MEB: + return self.update_meb(pt_cp, cam_cp, ext_cp) ret = structs.CarState() @@ -126,7 +130,8 @@ def update(self, can_parsers) -> structs.CarState: ret.standstill = ret.vEgoRaw == 0 ret.cruiseState.standstill = self.CP.pcmCruise and self.esp_hold_confirmation ret.cruiseState.nonAdaptive = acc_limiter_mode or speed_limiter_mode - if ret.cruiseState.speed > 90: + ret.cruiseState.speed = ext_cp.vl["MEB_ACC_01"]["ACC_Wunschgeschw_02"] * CV.KPH_TO_MS + if ret.cruiseState.speed > 90: # 255 kph in m/s == no current setpoint ret.cruiseState.speed = 0 self.eps_stock_values = pt_cp.vl["LH_EPS_03"] @@ -152,7 +157,7 @@ def update_pq(self, pt_cp, cam_cp, ext_cp) -> structs.CarState: ret.steeringAngleDeg = pt_cp.vl["Lenkhilfe_3"]["LH3_BLW"] * (1, -1)[int(pt_cp.vl["Lenkhilfe_3"]["LH3_BLWSign"])] ret.steeringRateDeg = pt_cp.vl["Lenkwinkel_1"]["LW1_Lenk_Gesch"] * (1, -1)[int(pt_cp.vl["Lenkwinkel_1"]["LW1_Gesch_Sign"])] ret.steeringTorque = pt_cp.vl["Lenkhilfe_3"]["LH3_LM"] * (1, -1)[int(pt_cp.vl["Lenkhilfe_3"]["LH3_LMSign"])] - ret.steeringPressed = abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE + ret.steeringPressed = self.update_steering_pressed(abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE, 5) hca_status = self.CCP.hca_status_values.get(pt_cp.vl["Lenkhilfe_2"]["LH2_Sta_HCA"]) ret.steerFaultTemporary, ret.steerFaultPermanent = self.update_hca_state(hca_status) @@ -231,6 +236,69 @@ def update_pq(self, pt_cp, cam_cp, ext_cp) -> structs.CarState: self.frame += 1 return ret + def update_meb(self, pt_cp, cam_cp, ext_cp) -> structs.CarState: + ret = structs.CarState() + + self.parse_wheel_speeds(ret, + pt_cp.vl["ESC_51"]["VL_Radgeschw"], + pt_cp.vl["ESC_51"]["VR_Radgeschw"], + pt_cp.vl["ESC_51"]["HL_Radgeschw"], + pt_cp.vl["ESC_51"]["HR_Radgeschw"], + ) + ret.standstill = ret.vEgoRaw == 0 + + # Update EPS position and state info. For signed values, VW sends the sign in a separate signal. + ret.steeringAngleDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradwinkel"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradwinkel"])] + ret.steeringRateDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradw_Geschw"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradw_Geschw"])] + ret.steeringTorque = pt_cp.vl["LH_EPS_03"]["EPS_Lenkmoment"] * (1, -1)[int(pt_cp.vl["LH_EPS_03"]["EPS_VZ_Lenkmoment"])] + ret.steeringPressed = self.update_steering_pressed(abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE, 5) + self.curvature_meas = -pt_cp.vl["QFK_01"]["Curvature"] * (1, -1)[int(pt_cp.vl["QFK_01"]["Curvature_VZ"])] + + if self.CP.flags & VolkswagenFlags.ALT_GEAR: + ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Gateway_73"]["GE_Fahrstufe"], None)) + else: + ret.gearShifter = self.parse_gear_shifter(self.CCP.shifter_values.get(pt_cp.vl["Getriebe_11"]["GE_Fahrstufe"], None)) + drive_mode = ret.gearShifter == GearShifter.drive + + hca_status = self.CCP.hca_status_values.get(pt_cp.vl["QFK_01"]["LatCon_HCA_Status"]) + ret.steerFaultTemporary, ret.steerFaultPermanent = self.update_hca_state(hca_status, drive_mode) + + if self.CP.flags & VolkswagenFlags.STOCK_HCA_PRESENT: + ret.carFaultedNonCritical = bool(cam_cp.vl["HCA_01"]["EA_Ruckfreigabe"]) or cam_cp.vl["HCA_01"]["EA_ACC_Sollstatus"] > 0 # EA + + ret.gasPressed = pt_cp.vl["Motor_51"]["Accel_Pedal_Pressure"] > 0 + ret.brakePressed = bool(pt_cp.vl["Motor_14"]["MO_Fahrer_bremst"]) + ret.parkingBrake = pt_cp.vl["Gateway_73"]["EPB_Status"] in (1, 4) + ret.seatbeltUnlatched = pt_cp.vl["Airbag_02"]["AB_Gurtschloss_FA"] != 3 + ret.espDisabled = bool(pt_cp.vl["ESP_21"]["ESP_Tastung_passiv"]) + + ret.cruiseState.available = pt_cp.vl["Motor_51"]["TSK_Status"] in (2, 3, 4, 5) + ret.cruiseState.enabled = pt_cp.vl["Motor_51"]["TSK_Status"] in (3, 4, 5) + ret.cruiseState.standstill = self.CP.pcmCruise and self.esp_hold_confirmation + accFaulted = pt_cp.vl["Motor_51"]["TSK_Status"] in (6, 7) + ret.accFaulted = self.update_acc_fault(accFaulted, parking_brake=ret.parkingBrake, drive_mode=drive_mode) + + ret.leftBlinker = bool(pt_cp.vl["Blinkmodi_02"]["BM_links"]) + ret.rightBlinker = bool(pt_cp.vl["Blinkmodi_02"]["BM_rechts"]) + + if self.CP.enableBsm: + ret.leftBlindspot = (bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Info_Left"]) or + bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Warn_Left"])) + ret.rightBlindspot = (bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Info_Right"]) or + bool(ext_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Warn_Right"])) + + + self.eps_stock_values = pt_cp.vl["LH_EPS_03"] + self.ldw_stock_values = cam_cp.vl["LDW_02"] if self.CP.networkLocation == NetworkLocation.fwdCamera else {} + self.gra_stock_values = pt_cp.vl["GRA_ACC_01"] + self.klr_stock_values = pt_cp.vl["KLR_01"] if self.CP.flags & VolkswagenFlags.STOCK_KLR_PRESENT else {} + + ret.buttonEvents = self.create_button_events(pt_cp, self.CCP.BUTTONS) + ret.lowSpeedAlert = self.update_low_speed_alert(ret.vEgo) + + self.frame += 1 + return ret + def update_mlb(self, pt_cp, cam_cp, ext_cp, alt_cp) -> structs.CarState: ret = structs.CarState() @@ -299,6 +367,7 @@ def parse_mlb_mqb_steering_state(self, ret, pt_cp, drive_mode=True): ret.steeringRateDeg = pt_cp.vl["LWI_01"]["LWI_Lenkradw_Geschw"] * (1, -1)[int(pt_cp.vl["LWI_01"]["LWI_VZ_Lenkradw_Geschw"])] ret.steeringTorque = pt_cp.vl["LH_EPS_03"]["EPS_Lenkmoment"] * (1, -1)[int(pt_cp.vl["LH_EPS_03"]["EPS_VZ_Lenkmoment"])] ret.steeringPressed = abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE + ret.steeringPressed = self.update_steering_pressed(abs(ret.steeringTorque) > self.CCP.STEER_DRIVER_ALLOWANCE, 5) hca_status = self.CCP.hca_status_values.get(pt_cp.vl["LH_EPS_03"]["EPS_HCA_Status"]) ret.steerFaultTemporary, ret.steerFaultPermanent = self.update_hca_state(hca_status, drive_mode) @@ -312,10 +381,24 @@ def update_hca_state(self, hca_status, drive_mode=True): temp_fault = drive_mode and hca_status in ("REJECTED", "PREEMPTED") or not self.eps_init_complete return temp_fault, perm_fault + def update_acc_fault(self, acc_fault, parking_brake=False, drive_mode=True, recovery_frames_max=100): + # Ignore FAULT when not in drive mode and parked + # do not show misleading error during ignition in parked state + # grant a short time to recover a normal cruise state + fault = acc_fault + if parking_brake and not drive_mode: + fault = False + self.cruise_recovery_timer = self.frame + elif self.frame - self.cruise_recovery_timer < recovery_frames_max: + fault = False + return fault + @staticmethod def get_can_parsers(CP): if CP.flags & VolkswagenFlags.PQ: return CarState.get_can_parsers_pq(CP) + elif CP.flags & VolkswagenFlags.MEB: + return CarState.get_can_parsers_meb(CP) # manually configure some optional and variable-rate/edge-triggered messages pt_messages, cam_messages, alt_messages = [], [], [] @@ -342,3 +425,19 @@ def get_can_parsers_pq(CP): Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).cam), Bus.alt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).alt), } + + @staticmethod + def get_can_parsers_meb(CP): + pt_messages = [ + ("Blinkmodi_02", 1), # From J519 BCM (sent at 1Hz when no lights active, 50Hz when active) + ("Gateway_73", 10), # Gear position (ALT_GEAR) and EPB status + ] + cam_messages = [] + if CP.enableBsm: + bsm_target = cam_messages if CP.networkLocation == NetworkLocation.gateway else pt_messages + bsm_target.append(("MEB_Side_Assist_01", float('nan'))) + return { + Bus.pt: CANParser(DBC[CP.carFingerprint][Bus.pt], pt_messages, CanBus(CP).pt), + Bus.cam: CANParser(DBC[CP.carFingerprint][Bus.pt], cam_messages, CanBus(CP).cam), + Bus.alt: CANParser(DBC[CP.carFingerprint][Bus.pt], [], CanBus(CP).alt), + } diff --git a/opendbc/car/volkswagen/fingerprints.py b/opendbc/car/volkswagen/fingerprints.py index 1c35edf5387..8954d24bf92 100644 --- a/opendbc/car/volkswagen/fingerprints.py +++ b/opendbc/car/volkswagen/fingerprints.py @@ -366,6 +366,18 @@ b'\xf1\x875Q0907572S \xf1\x890780', ], }, + CAR.VOLKSWAGEN_ID4_MK1: { + (Ecu.srs, 0x715, None): [ + b'\xf1\x871EA959655EA\xf1\x890376', + b'\xf1\x875WA959655R \xf1\x890717', + ], + (Ecu.eps, 0x712, None): [ + b'\xf1\x871EA907144AQ\xf1\x895033\xf1\x82\x000_BH0A0_ON', + ], + (Ecu.fwdRadar, 0x757, None): [ + b'\xf1\x871EA907572H \xf1\x890234', + ], + }, CAR.VOLKSWAGEN_JETTA_MK6: { (Ecu.srs, 0x715, None): [ b'\xf1\x875C0959655M \xf1\x890726\xf1\x82\t00NB1108--------24', diff --git a/opendbc/car/volkswagen/interface.py b/opendbc/car/volkswagen/interface.py index 89a8e70c67e..31043c1000a 100644 --- a/opendbc/car/volkswagen/interface.py +++ b/opendbc/car/volkswagen/interface.py @@ -40,6 +40,25 @@ def _get_params(ret: structs.CarParams, candidate: CAR, fingerprint, car_fw, alp ret.networkLocation = NetworkLocation.gateway ret.dashcamOnly = is_release # Release support needs HCA timeout fix, safety validation, revised J533 harness + elif ret.flags & VolkswagenFlags.MEB: + # Set global MEB parameters + safety_configs = [get_safety_config(structs.CarParams.SafetyModel.volkswagenMeb)] + ret.transmissionType = TransmissionType.direct + ret.steerControlType = structs.CarParams.SteerControlType.angle + ret.steerAtStandstill = True + + if any(msg in fingerprint[1] for msg in (0x520, 0x86, 0xFD, 0x13D)): # Airbag_02, LWI_01, ESP_21, QFK_01 + ret.networkLocation = NetworkLocation.gateway + else: + ret.networkLocation = NetworkLocation.fwdCamera + + ret.enableBsm = 0x24C in fingerprint[0] # MEB_Side_Assist_01 + + if 0x25D in fingerprint[0]: # KLR_01 + ret.flags |= VolkswagenFlags.STOCK_KLR_PRESENT.value + + ret.dashcamOnly = is_release + else: # Set global MQB parameters safety_configs = [get_safety_config(structs.CarParams.SafetyModel.volkswagen)] @@ -61,6 +80,8 @@ def _get_params(ret: structs.CarParams, candidate: CAR, fingerprint, car_fw, alp ret.flags |= VolkswagenFlags.STOCK_HCA_PRESENT.value if 0x6B8 in fingerprint[0]: # Kombi_03 ret.flags |= VolkswagenFlags.KOMBI_PRESENT.value + if 0x3DC in fingerprint[0]: # Gateway_73 + ret.flags |= VolkswagenFlags.ALT_GEAR.value # Global lateral tuning defaults, can be overridden per-vehicle @@ -68,6 +89,8 @@ def _get_params(ret: structs.CarParams, candidate: CAR, fingerprint, car_fw, alp if ret.flags & VolkswagenFlags.PQ or ret.flags & VolkswagenFlags.MLB: ret.steerActuatorDelay = 0.2 CarInterfaceBase.configure_torque_tune(candidate, ret.lateralTuning) + elif ret.flags & VolkswagenFlags.MEB: + ret.steerActuatorDelay = 0.3 else: ret.steerActuatorDelay = 0.1 ret.lateralTuning.pid.kpBP = [0.] @@ -78,8 +101,8 @@ def _get_params(ret: structs.CarParams, candidate: CAR, fingerprint, car_fw, alp # Global longitudinal tuning defaults, can be overridden per-vehicle - ret.alphaLongitudinalAvailable = ret.networkLocation == NetworkLocation.gateway or docs - if alpha_long: + ret.alphaLongitudinalAvailable = (ret.networkLocation == NetworkLocation.gateway or docs) and not (ret.flags & VolkswagenFlags.MEB) + if alpha_long and not (ret.flags & VolkswagenFlags.MEB): # Proof-of-concept, prep for E2E only. No radar points available. Panda ALLOW_DEBUG firmware required. ret.openpilotLongitudinalControl = True safety_configs[0].safetyParam |= VolkswagenSafetyFlags.LONG_CONTROL.value diff --git a/opendbc/car/volkswagen/mebcan.py b/opendbc/car/volkswagen/mebcan.py new file mode 100644 index 00000000000..ac3d2b1a80b --- /dev/null +++ b/opendbc/car/volkswagen/mebcan.py @@ -0,0 +1,89 @@ +def create_steering_control(packer, bus, apply_curvature, lkas_enabled, power): + values = { + "Curvature": abs(apply_curvature), # in rad/m + "Curvature_VZ": 1 if apply_curvature > 0 and lkas_enabled else 0, + "Power": power if lkas_enabled else 0, + "RequestStatus": 4 if lkas_enabled else 2, + "HighSendRate": lkas_enabled, + } + return packer.make_can_msg("HCA_03", bus, values) + + +def create_eps_update(packer, bus, eps_stock_values, ea_simulated_torque): + values = {s: eps_stock_values[s] for s in [ + "COUNTER", # Sync counter value to EPS output + "EPS_Lenkungstyp", # EPS rack type + "EPS_Berechneter_LW", # Absolute raw steering angle + "EPS_VZ_BLW", # Raw steering angle sign + "EPS_HCA_Status", # EPS HCA control status + ]} + + values.update({ + # Absolute driver torque input and sign, with EA inactivity mitigation + "EPS_Lenkmoment": abs(ea_simulated_torque), + "EPS_VZ_Lenkmoment": 1 if ea_simulated_torque < 0 else 0, + }) + + return packer.make_can_msg("LH_EPS_03", bus, values) + + +def create_lka_hud_control(packer, bus, ldw_stock_values, lat_active, steering_pressed, hud_alert, hud_control, sound_alert=False): + display_mode = 1 if lat_active else 0 # travel assist style showing yellow lanes when op is active + + values = {} + if len(ldw_stock_values): + values = {s: ldw_stock_values[s] for s in [ + "LDW_SW_Warnung_links", # Blind spot in warning mode on left side due to lane departure + "LDW_SW_Warnung_rechts", # Blind spot in warning mode on right side due to lane departure + "LDW_Seite_DLCTLC", # Direction of most likely lane departure (left or right) + "LDW_DLC", # Lane departure, distance to line crossing + "LDW_TLC", # Lane departure, time to line crossing + ]} + + values.update({ + "LDW_Gong": sound_alert, + "LDW_Status_LED_gelb": 1 if lat_active and steering_pressed else 0, + "LDW_Status_LED_gruen": 1 if lat_active and not steering_pressed else 0, + "LDW_Lernmodus_links": 3 + display_mode if hud_control.leftLaneDepart else 1 + hud_control.leftLaneVisible + display_mode, + "LDW_Lernmodus_rechts": 3 + display_mode if hud_control.rightLaneDepart else 1 + hud_control.rightLaneVisible + display_mode, + "LDW_Texte": hud_alert, + }) + return packer.make_can_msg("LDW_02", bus, values) + + +def create_acc_buttons_control(packer, bus, gra_stock_values, cancel=False, resume=False, up=False, down=False): + values = {s: gra_stock_values[s] for s in [ + "GRA_Hauptschalter", # ACC button, on/off + "GRA_Typ_Hauptschalter", # ACC main button type + "GRA_Codierung", # ACC button configuration/coding + "GRA_Tip_Stufe_2", # unknown related to stalk type + "GRA_ButtonTypeInfo", # unknown related to stalk type + ]} + + values.update({ + "COUNTER": (gra_stock_values["COUNTER"] + 1) % 16, + "GRA_Abbrechen": cancel, + "GRA_Tip_Wiederaufnahme": resume or up, + "GRA_Tip_Setzen": down, + }) + return packer.make_can_msg("GRA_ACC_01", bus, values) + + +def create_capacitive_wheel_touch(packer, bus, lat_active, klr_stock_values): + values = {s: klr_stock_values[s] for s in [ + "COUNTER", + "KLR_Touchintensitaet_1", + "KLR_Touchintensitaet_2", + "KLR_Touchintensitaet_3", + "KLR_Touchauswertung", + ]} + + if lat_active: + values.update({ + "COUNTER": (klr_stock_values["COUNTER"] + 1) % 16, + "KLR_Touchintensitaet_1": 80, + "KLR_Touchintensitaet_2": 200, + "KLR_Touchintensitaet_3": 10, + "KLR_Touchauswertung": 10, + }) + return packer.make_can_msg("KLR_01", bus, values) diff --git a/opendbc/car/volkswagen/mqbcan.py b/opendbc/car/volkswagen/mqbcan.py index 78a2eeff62f..fb370b3e2d0 100644 --- a/opendbc/car/volkswagen/mqbcan.py +++ b/opendbc/car/volkswagen/mqbcan.py @@ -243,6 +243,8 @@ def xor_checksum(address: int, sig, d: bytearray, initial_value: int = 0) -> int 0xC5, 0x91, 0x0F, 0x27, 0x34, 0x04, 0x7F, 0x02], # EA_02 0x20A: [0x9D, 0xE8, 0x36, 0xA1, 0xCA, 0x3B, 0x1D, 0x33, 0xE0, 0xD5, 0xBB, 0x5F, 0xAE, 0x3C, 0x31, 0x9F], # EML_06 + 0x25D: [0xDA, 0x6B, 0x0E, 0xB2, 0x78, 0xBD, 0x5A, 0x81, + 0x7B, 0xD6, 0x41, 0x39, 0x76, 0xB6, 0xD7, 0x35], # KLR_01 0x26B: [0xCE, 0xCC, 0xBD, 0x69, 0xA1, 0x3C, 0x18, 0x76, 0x0F, 0x04, 0xF2, 0x3A, 0x93, 0x24, 0x19, 0x51], # TA_01 0x30C: [0x0F] * 16, # ACC_02 diff --git a/opendbc/car/volkswagen/tests/test_meb_lateral.py b/opendbc/car/volkswagen/tests/test_meb_lateral.py new file mode 100644 index 00000000000..d32d7b96acc --- /dev/null +++ b/opendbc/car/volkswagen/tests/test_meb_lateral.py @@ -0,0 +1,405 @@ +import unittest + +import numpy as np + +from opendbc.can import CANParser +from opendbc.car import Bus, structs +from opendbc.car.car_helpers import interfaces +from opendbc.car.volkswagen.values import CAR, CarControllerParams, DBC, VolkswagenFlags + +VisualAlert = structs.CarControl.HUDControl.VisualAlert + +HCA_03_ADDR = 771 +LDW_02_ADDR = 919 + + +def _make_carstate(CC_inst, vEgo=10.0, steeringTorque=0.0, curvature_meas=0.0, steeringAngleDeg=0.0): + CS = CC_inst.CS + CS.out = structs.CarState() + CS.out.vEgo = vEgo + CS.out.vEgoRaw = vEgo + CS.out.steeringAngleDeg = float(steeringAngleDeg) + CS.out.steeringTorque = float(steeringTorque) + CS.out.steeringPressed = abs(steeringTorque) > CC_inst.CC.CCP.STEER_DRIVER_ALLOWANCE + CS.curvature_meas = float(curvature_meas) + CS.gra_stock_values = {"COUNTER": 0} + CS.ldw_stock_values = {} + CS.eps_stock_values = {} + return CS + + +def _build_cc(latActive=True, curvature=0.0, visualAlert=VisualAlert.none): + CC = structs.CarControl() + CC.enabled = latActive + CC.latActive = latActive + CC.actuators.curvature = float(curvature) + CC.currentCurvature = 0.0 + CC.hudControl.visualAlert = visualAlert + return CC.as_reader() + + +def _decode(addr, dat, signals): + """Decode a single CAN frame using a fresh CANParser tied to vw_meb.""" + parser = CANParser(DBC[CAR.VOLKSWAGEN_ID4_MK1.value][Bus.pt], + [(addr, 0)], 0) + parser.update([(0, [(addr, dat, 0)])]) + return {s: parser.vl[addr][s] for s in signals} + + +class TestMEBLateral(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.CI = interfaces[CAR.VOLKSWAGEN_ID4_MK1.value] + cp = cls.CI.get_params(CAR.VOLKSWAGEN_ID4_MK1.value, {i: {} for i in range(8)}, + [], alpha_long=False, is_release=False, docs=False) + cls.cp = cp + + def setUp(self): + self.inst = self.CI(self.cp) + self.CCP = self.inst.CC.CCP + + # (a) Torque-bar / steeringPressed test + def test_steering_pressed_threshold(self): + """steeringPressed flips True iff |torque| > STEER_DRIVER_ALLOWANCE (raw cNm units).""" + th = self.CCP.STEER_DRIVER_ALLOWANCE + cases = [ + (0, False), + (th - 1, False), + (th, False), # strictly greater than + (th + 1, True), + (-(th + 1),True), + (-(th - 1),False), + (self.CCP.STEER_DRIVER_MAX, True), + ] + for torque, expected in cases: + with self.subTest(torque=torque): + # Replicate the exact CarState predicate used by update_meb() + pressed = abs(torque) > th + self.assertEqual(pressed, expected) + + # (b) Curvature clip / saturation test + def test_curvature_clip_and_encoding(self): + """Out-of-range commanded curvature ramps via the angle framework and saturates at ±STEER_ANGLE_MAX.""" + cmax = CarControllerParams.ANGLE_LIMITS.STEER_ANGLE_MAX / CarControllerParams.MEB_RAD_TO_DEG + # Use low vEgo so the lateral-accel envelope (~3.6/v^2) does not clip below CURVATURE_MAX. + for cmd in (+0.5, -0.5): + with self.subTest(cmd=cmd): + inst = self.CI(self.cp) + CS = _make_carstate(inst, vEgo=3.0) + CC = _build_cc(latActive=True, curvature=cmd) + new_act = None + last_hca = None + # Run long enough for the rate-limited curvature to saturate at the max + for _ in range(2000): + new_act, sends = inst.CC.update(CC, CS, 0) + for m in sends: + if m[0] == HCA_03_ADDR: + last_hca = m + + self.assertAlmostEqual(abs(new_act.curvature), cmax, places=4) + self.assertEqual(new_act.curvature > 0, cmd > 0) + + hca = last_hca + self.assertIsNotNone(hca) + decoded = _decode(HCA_03_ADDR, hca[1], ["Curvature", "Curvature_VZ", "RequestStatus"]) + self.assertAlmostEqual(decoded["Curvature"], cmax, places=3) + self.assertEqual(int(decoded["Curvature_VZ"]), 1 if cmd > 0 else 0) + self.assertEqual(int(decoded["RequestStatus"]), 4) # HCA enabled + + # (b2) Rate-limit test: per-frame |delta| matches the BP/V interpolation at multiple speeds. + def test_curvature_rate_limit(self): + """A large step in commanded curvature is limited by the VM jerk envelope per send cycle.""" + from opendbc.car import DT_CTRL + for v in (5.0, 10.0, 25.0): + with self.subTest(v=v): + inst = self.CI(self.cp) + CS = _make_carstate(inst, vEgo=v) + CC = _build_cc(latActive=True, curvature=0.1) + new_act, _ = inst.CC.update(CC, CS, 0) + # Stub VM (slip=0, sR=1, wb=1) → identity curvature factor: max delta per send + # equals MAX_LATERAL_JERK / v^2 * DT_CTRL * STEER_STEP (in curvature units). + ccp = CarControllerParams + expected_step = ccp.ANGLE_LIMITS.MAX_LATERAL_JERK / (max(v, 1.0) ** 2) * DT_CTRL * ccp.STEER_STEP + self.assertLessEqual(abs(new_act.curvature), expected_step * 1.05) + self.assertGreater(abs(new_act.curvature), expected_step * 0.5) + + def test_wind_down_syncs_to_measured(self): + """latActive=False with steering_power_last>0 ramps curvature toward CS.curvature_meas, not 0.""" + inst = self.CI(self.cp) + # Prime: ramp steering_power up while active + CS_on = _make_carstate(inst, vEgo=10.0) + CC_on = _build_cc(latActive=True, curvature=0.0) + for _ in range(50): + inst.CC.update(CC_on, CS_on, 0) + self.assertGreater(inst.CC.steering_power_last, 0) + + # Switch to inactive; measured curvature is +0.05 — output should move toward it. + CS_off = _make_carstate(inst, vEgo=10.0, curvature_meas=0.05) + CC_off = _build_cc(latActive=False, curvature=0.0) + prior = inst.CC.apply_curvature_last + new_act, sends = inst.CC.update(CC_off, CS_off, 0) + # Moved toward +0.05, not toward 0, and not held at prior + self.assertGreater(new_act.curvature, prior) + self.assertGreater(new_act.curvature, 0.0) + # Wind-down jumps directly to measured curvature (clipped to CURVATURE_MAX) per sunnypilot's verbatim block + # HCA must still be enabled during wind-down (RequestStatus=4) + hca = next((m for m in sends if m[0] == HCA_03_ADDR), None) + self.assertIsNotNone(hca) + decoded = _decode(HCA_03_ADDR, hca[1], ["RequestStatus"]) + self.assertEqual(int(decoded["RequestStatus"]), 4) + + # (c) Steering power ramp test + def test_steering_power_ramp_up_and_down(self): + inst = self.CI(self.cp) + CS = _make_carstate(inst, vEgo=10.0, steeringTorque=0.0) + CC_on = _build_cc(latActive=True, curvature=0.0) + + # Ramp up: increases by STEERING_POWER_STEP per STEER_STEP-aligned cycle until MAX + last_power = 0 + saw_max = False + for _ in range(int(self.CCP.STEERING_POWER_MAX // self.CCP.STEERING_POWER_STEP) * self.CCP.STEER_STEP + 10): + inst.CC.update(CC_on, CS, 0) + cur = inst.CC.steering_power_last + self.assertGreaterEqual(cur, last_power) + self.assertLessEqual(cur, self.CCP.STEERING_POWER_MAX) + if cur == self.CCP.STEERING_POWER_MAX: + saw_max = True + last_power = cur + self.assertTrue(saw_max, "steering_power never reached STEERING_POWER_MAX") + + # Ramp down: latActive=False -> reduces by STEERING_POWER_STEP per cycle to zero + CC_off = _build_cc(latActive=False, curvature=0.0) + last_power = inst.CC.steering_power_last + for _ in range(int(self.CCP.STEERING_POWER_MAX // self.CCP.STEERING_POWER_STEP) * self.CCP.STEER_STEP + 10): + inst.CC.update(CC_off, CS, 0) + cur = inst.CC.steering_power_last + self.assertLessEqual(cur, last_power) + last_power = cur + self.assertEqual(inst.CC.steering_power_last, 0) + + # (d) LDW HUD test + def test_ldw_hud_take_over(self): + inst = self.CI(self.cp) + CS = _make_carstate(inst) + CC = _build_cc(latActive=True, visualAlert=VisualAlert.steerRequired) + # Run for one full LDW cycle so the LDW frame is emitted + sends = [] + for _ in range(self.CCP.LDW_STEP): + _, s = inst.CC.update(CC, CS, 0) + sends.extend(s) + ldw = next((m for m in sends if m[0] == LDW_02_ADDR), None) + self.assertIsNotNone(ldw, "LDW_02 frame not emitted") + decoded = _decode(LDW_02_ADDR, ldw[1], ["LDW_Texte", "LDW_Status_LED_gruen"]) + self.assertEqual(int(decoded["LDW_Texte"]), self.CCP.LDW_MESSAGES["laneAssistTakeOver"]) + # latActive + not pressed => green LED on (UI showing OP active) + self.assertEqual(int(decoded["LDW_Status_LED_gruen"]), 1) + + # (e) HCA disabled gating test + def test_hca_disabled_after_power_ramp_down(self): + inst = self.CI(self.cp) + CS = _make_carstate(inst) + CC_on = _build_cc(latActive=True) + CC_off = _build_cc(latActive=False) + + # Ramp up to MAX, then off, then drain + for _ in range(200): + inst.CC.update(CC_on, CS, 0) + for _ in range(200): + inst.CC.update(CC_off, CS, 0) + + self.assertEqual(inst.CC.steering_power_last, 0) + _, sends = inst.CC.update(CC_off, CS, 0) + hca = next(s for s in sends if s[0] == HCA_03_ADDR) + decoded = _decode(HCA_03_ADDR, hca[1], ["RequestStatus", "Power", "Curvature"]) + self.assertNotEqual(int(decoded["RequestStatus"]), 4) + self.assertEqual(int(decoded["RequestStatus"]), 2) + self.assertEqual(int(decoded["Power"]), 0) + self.assertAlmostEqual(decoded["Curvature"], 0.0, places=4) + + +class TestMEBBlindspot(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.CI = interfaces[CAR.VOLKSWAGEN_ID4_MK1.value] + fingerprint = {i: {} for i in range(8)} + fingerprint[0][0x24C] = 16 # MEB_Side_Assist_01 present on PT bus + cp = cls.CI.get_params(CAR.VOLKSWAGEN_ID4_MK1.value, fingerprint, + [], alpha_long=False, is_release=False, docs=False) + cls.cp = cp + + def test_enable_bsm_set(self): + self.assertTrue(self.cp.enableBsm) + + def test_left_and_right_blindspot(self): + from opendbc.can import CANPacker + inst = self.CI(self.cp) + parsers = inst.CS.get_can_parsers(self.cp) + pt_cp = parsers[Bus.pt] + packer = CANPacker(DBC[CAR.VOLKSWAGEN_ID4_MK1.value][Bus.pt]) + + cases = [ + ({"Blind_Spot_Info_Left": 1, "Blind_Spot_Warn_Left": 0, + "Blind_Spot_Info_Right": 0, "Blind_Spot_Warn_Right": 0}, True, False), + ({"Blind_Spot_Info_Left": 0, "Blind_Spot_Warn_Left": 1, + "Blind_Spot_Info_Right": 0, "Blind_Spot_Warn_Right": 0}, True, False), + ({"Blind_Spot_Info_Left": 0, "Blind_Spot_Warn_Left": 0, + "Blind_Spot_Info_Right": 1, "Blind_Spot_Warn_Right": 0}, False, True), + ({"Blind_Spot_Info_Left": 0, "Blind_Spot_Warn_Left": 0, + "Blind_Spot_Info_Right": 0, "Blind_Spot_Warn_Right": 1}, False, True), + ({"Blind_Spot_Info_Left": 1, "Blind_Spot_Warn_Left": 0, + "Blind_Spot_Info_Right": 1, "Blind_Spot_Warn_Right": 0}, True, True), + ({"Blind_Spot_Info_Left": 0, "Blind_Spot_Warn_Left": 0, + "Blind_Spot_Info_Right": 0, "Blind_Spot_Warn_Right": 0}, False, False), + ] + for vals, exp_left, exp_right in cases: + with self.subTest(**vals): + addr, dat, _ = packer.make_can_msg("MEB_Side_Assist_01", 0, vals) + pt_cp.update([(0, [(addr, dat, 0)])]) + self.assertEqual(bool(pt_cp.vl["MEB_Side_Assist_01"]["Blind_Spot_Info_Left"]), + bool(vals["Blind_Spot_Info_Left"])) + cam_cp = parsers[Bus.cam] + # update() picks ext_cp = pt_cp when networkLocation is fwdCamera (ID.4 MK1 test fingerprint) + ext_cp = pt_cp + ret = inst.CS.update_meb(pt_cp, cam_cp, ext_cp) + self.assertEqual(ret.leftBlindspot, exp_left) + self.assertEqual(ret.rightBlindspot, exp_right) + + +class TestMEBSafetyOracle(unittest.TestCase): + """Drive carcontroller through randomized inputs and confirm panda safety + (volkswagenMeb) accepts every HCA_03 frame it produces. Conversely confirm + that bypassing the rate limit causes safety to reject.""" + + @classmethod + def setUpClass(cls): + cls.CI = interfaces[CAR.VOLKSWAGEN_ID4_MK1.value] + cp = cls.CI.get_params(CAR.VOLKSWAGEN_ID4_MK1.value, {i: {} for i in range(8)}, + [], alpha_long=False, is_release=False, docs=False) + cls.cp = cp + + def setUp(self): + # Lazy import: keeps the rest of this module importable without libsafety + from opendbc.car.structs import CarParams + from opendbc.safety.tests.libsafety import libsafety_py + self.libsafety_py = libsafety_py + self.safety = libsafety_py.libsafety + self.safety.set_safety_hooks(CarParams.SafetyModel.volkswagenMeb, 0) + self.safety.init_tests() + self.safety.set_controls_allowed(1) + + self.inst = self.CI(self.cp) + self.CCP = self.inst.CC.CCP + + def _raw_hca_to_safety_packet(self, dat, addr=771): + return self.libsafety_py.make_CANPacket(addr, 0, dat) + + def test_oracle_random_sequence(self): + rng = np.random.default_rng(0) + CS = _make_carstate(self.inst, vEgo=15.0) + + # Walk through randomized requested curvatures + measured curvatures + sends_us = 0 + for step in range(500): + v = float(rng.uniform(2.0, 30.0)) + cmd = float(rng.uniform(-0.25, 0.25)) + meas = float(rng.uniform(-0.2, 0.2)) + lat_active = bool(rng.integers(0, 2)) or step < 200 # mostly active + + CS.out.vEgo = v + CS.out.vEgoRaw = v + CS.out.steeringAngleDeg = float(rng.uniform(-90, 90)) + CS.curvature_meas = meas + CC = _build_cc(latActive=lat_active, curvature=cmd) + _, sends = self.inst.CC.update(CC, CS, 0) + + # Feed safety the wheel speed so steer_angle_cmd_checks_vm sees the same v_ego + val = int(v * 3.6 / 0.0075) + esc = self.libsafety_py.make_CANPacket(0xFC, 0, bytes(8) + val.to_bytes(2, 'little') * 4) + self.safety.safety_rx_hook(esc) + + for addr, dat, _bus in sends: + if addr != HCA_03_ADDR: + continue + # advance timer 20ms per HCA frame so the rt_angle limit window resets correctly + sends_us += 20000 + self.safety.set_timer(sends_us) + pkt = self._raw_hca_to_safety_packet(dat, addr) + accepted = self.safety.safety_tx_hook(pkt) + self.assertTrue(accepted, f"safety rejected HCA_03 at step {step}: cmd={cmd:.4f} meas={meas:.4f} lat={lat_active}") + + def test_oracle_inactive_nonzero_rejected(self): + """When steer_req=False (RequestStatus!=4), curvature must be zero per safety.""" + from opendbc.can import CANPacker + packer = CANPacker(DBC[CAR.VOLKSWAGEN_ID4_MK1.value][Bus.pt]) + addr, dat, _ = packer.make_can_msg("HCA_03", 0, { + "Curvature": 0.05, "Curvature_VZ": 1, "Power": 0, "RequestStatus": 2, "HighSendRate": 0, + }) + self.assertFalse(self.safety.safety_tx_hook(self._raw_hca_to_safety_packet(dat, addr))) + + +KLR_01_ADDR = 0x25D + + +class TestMEBEmergencyAssist(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.CI = interfaces[CAR.VOLKSWAGEN_ID4_MK1.value] + + def _make_cp(self, klr_present): + fingerprint = {i: {} for i in range(8)} + if klr_present: + fingerprint[0][0x25D] = 8 # KLR_01 + return self.CI.get_params(CAR.VOLKSWAGEN_ID4_MK1.value, fingerprint, + [], alpha_long=False, is_release=False, docs=False) + + def _make_cs(self, inst, counter): + CS = _make_carstate(inst, vEgo=10.0) + CS.klr_stock_values = { + "COUNTER": counter, + "KLR_Touchintensitaet_1": 0, + "KLR_Touchintensitaet_2": 0, + "KLR_Touchintensitaet_3": 0, + "KLR_Touchauswertung": 0, + } + return CS + + def test_klr_flag_set_from_fingerprint(self): + cp = self._make_cp(klr_present=True) + self.assertTrue(cp.flags & VolkswagenFlags.STOCK_KLR_PRESENT) + + def test_klr_flag_not_set_without_fingerprint(self): + cp = self._make_cp(klr_present=False) + self.assertFalse(cp.flags & VolkswagenFlags.STOCK_KLR_PRESENT) + + def test_klr_tx_with_flag(self): + cp = self._make_cp(klr_present=True) + inst = self.CI(cp) + CC = _build_cc(latActive=True) + CS = self._make_cs(inst, counter=3) + _, sends = inst.CC.update(CC, CS, 0) + # two frames (cam + pt) on first send + klr_frames = [m for m in sends if m[0] == KLR_01_ADDR] + self.assertEqual(len(klr_frames), 2) + # second update with same counter -> no new send + _, sends = inst.CC.update(CC, CS, 0) + klr_frames = [m for m in sends if m[0] == KLR_01_ADDR] + self.assertEqual(len(klr_frames), 0) + # counter advances -> send again + CS.klr_stock_values["COUNTER"] = 4 + _, sends = inst.CC.update(CC, CS, 0) + klr_frames = [m for m in sends if m[0] == KLR_01_ADDR] + self.assertEqual(len(klr_frames), 2) + + def test_klr_no_tx_without_flag(self): + cp = self._make_cp(klr_present=False) + inst = self.CI(cp) + CC = _build_cc(latActive=True) + CS = _make_carstate(inst, vEgo=10.0) + for _ in range(10): + _, sends = inst.CC.update(CC, CS, 0) + klr_frames = [m for m in sends if m[0] == KLR_01_ADDR] + self.assertEqual(len(klr_frames), 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/opendbc/car/volkswagen/values.py b/opendbc/car/volkswagen/values.py index 50214ac69b5..4a47352f792 100644 --- a/opendbc/car/volkswagen/values.py +++ b/opendbc/car/volkswagen/values.py @@ -2,10 +2,11 @@ from dataclasses import dataclass, field from enum import Enum, IntFlag, StrEnum -from opendbc.car import Bus, CanBusBase, CarSpecs, DbcDict, PlatformConfig, Platforms, structs, uds +from opendbc.car import ACCELERATION_DUE_TO_GRAVITY, Bus, CanBusBase, CarSpecs, DbcDict, PlatformConfig, Platforms, structs, uds from opendbc.can import CANDefine from opendbc.car.common.conversions import Conversions as CV from opendbc.car.docs_definitions import CarFootnote, CarHarness, CarDocs, CarParts, Column +from opendbc.car.lateral import AngleSteeringLimits, CurvatureSteeringLimits, ISO_LATERAL_ACCEL from opendbc.car.fw_query_definitions import EcuAddrSubAddr, FwQueryConfig, Request, p16 from opendbc.car.vin import Vin @@ -67,6 +68,23 @@ class CarControllerParams: ACCEL_MAX = 2.0 # 2.0 m/s max acceleration ACCEL_MIN = -3.5 # 3.5 m/s max deceleration + # MEB curvature command is rate-limited through apply_steer_angle_limits_vm. + # The helper expects a steering-angle-shaped input in degrees, so the carcontroller + # scales curvature by RAD_TO_DEG before/after the call and pairs it with a stub + # VehicleModel (slip=0, steerRatio=1, wheelbase=1) so the helper math reduces to + # an identity on curvature. The matching safety params use the same stub so the + # Python envelope is always within the safety envelope. + MEB_RAD_TO_DEG = 180.0 / 3.141592653589793 + # Add extra tolerance for average banked road since safety doesn't have the roll + MEB_AVERAGE_ROAD_ROLL = 0.06 + ANGLE_LIMITS: AngleSteeringLimits = AngleSteeringLimits( + 0.195 * MEB_RAD_TO_DEG, # STEER_ANGLE_MAX in curvature*RAD_TO_DEG units + ([], []), + ([], []), + MAX_LATERAL_ACCEL=ISO_LATERAL_ACCEL + (ACCELERATION_DUE_TO_GRAVITY * MEB_AVERAGE_ROAD_ROLL), # ~3.6 m/s^2 + MAX_LATERAL_JERK=3.0 + (ACCELERATION_DUE_TO_GRAVITY * MEB_AVERAGE_ROAD_ROLL), # ~3.6 m/s^3 + ) + def __init__(self, CP): can_define = CANDefine(DBC[CP.carFingerprint][Bus.pt]) @@ -99,6 +117,35 @@ def __init__(self, CP): "laneAssistDeactivTrailer": 5, # "Lane Assist: no function with trailer" } + elif CP.flags & VolkswagenFlags.MEB: + self.LDW_STEP = 10 # LDW_02 message frequency 10Hz + self.ACC_HUD_STEP = 6 + self.STEER_DRIVER_ALLOWANCE = 100 # Driver torque 1.0 Nm, begin steering reduction from MAX + self.STEER_DRIVER_MAX = 300 # Driver torque 3.0 Nm, stop steering reduction at MIN + self.STEERING_POWER_MAX = 50 # HCA_03 maximum steering power, percentage + self.STEERING_POWER_MIN = 4 # HCA_03 minimum steering power, percentage + self.STEERING_POWER_STEP = 2 # HCA_03 steering power counter steps + + self.CURVATURE_LIMITS = CurvatureSteeringLimits(0.195) # Max curvature for steering command, m^-1 + + self.shifter_values = can_define.dv["Getriebe_11"]["GE_Fahrstufe"] + self.hca_status_values = can_define.dv["QFK_01"]["LatCon_HCA_Status"] + + self.BUTTONS = [ + Button(structs.CarState.ButtonEvent.Type.setCruise, "GRA_ACC_01", "GRA_Tip_Setzen", [1]), + Button(structs.CarState.ButtonEvent.Type.resumeCruise, "GRA_ACC_01", "GRA_Tip_Wiederaufnahme", [1]), + Button(structs.CarState.ButtonEvent.Type.accelCruise, "GRA_ACC_01", "GRA_Tip_Hoch", [1]), + Button(structs.CarState.ButtonEvent.Type.decelCruise, "GRA_ACC_01", "GRA_Tip_Runter", [1]), + Button(structs.CarState.ButtonEvent.Type.cancel, "GRA_ACC_01", "GRA_Abbrechen", [1]), + Button(structs.CarState.ButtonEvent.Type.gapAdjustCruise, "GRA_ACC_01", "GRA_Verstellung_Zeitluecke", [3]), + ] + + self.LDW_MESSAGES = { + "none": 0, # Nothing to display + "laneAssistTakeOverUrgent": 4, # "Lane Assist: Please Take Over Steering" (red) + "laneAssistTakeOver": 8, # "Lane Assist: Please Take Over Steering" (white) + } + else: self.LDW_STEP = 10 # LDW_02 message frequency 10Hz self.ACC_HUD_STEP = 6 # ACC_02 message frequency 16Hz @@ -184,10 +231,13 @@ class VolkswagenFlags(IntFlag): # Detected flags STOCK_HCA_PRESENT = 1 KOMBI_PRESENT = 4 + ALT_GEAR = 32 + STOCK_KLR_PRESENT = 64 # Static flags PQ = 2 MLB = 8 + MEB = 16 @dataclass @@ -209,6 +259,16 @@ class VolkswagenMQBPlatformConfig(PlatformConfig): wmis: set[WMI] = field(default_factory=set) +@dataclass +class VolkswagenMEBPlatformConfig(PlatformConfig): + dbc_dict: DbcDict = field(default_factory=lambda: {Bus.pt: 'vw_meb'}) + chassis_codes: set[str] = field(default_factory=set) + wmis: set[WMI] = field(default_factory=set) + + def init(self): + self.flags |= VolkswagenFlags.MEB + + @dataclass class VolkswagenPQPlatformConfig(VolkswagenMQBPlatformConfig): dbc_dict: DbcDict = field(default_factory=lambda: {Bus.pt: 'vw_pq'}) @@ -269,7 +329,7 @@ def init_make(self, CP: structs.CarParams): # FW_VERSIONS for that existing CAR. class CAR(Platforms): - config: VolkswagenMQBPlatformConfig | VolkswagenPQPlatformConfig + config: VolkswagenMQBPlatformConfig | VolkswagenPQPlatformConfig | VolkswagenMEBPlatformConfig VOLKSWAGEN_ARTEON_MK1 = VolkswagenMQBPlatformConfig( [ @@ -331,6 +391,14 @@ class CAR(Platforms): chassis_codes={"5G", "AU", "BA", "BE"}, wmis={WMI.VOLKSWAGEN_MEXICO_CAR, WMI.VOLKSWAGEN_EUROPE_CAR}, ) + VOLKSWAGEN_ID4_MK1 = VolkswagenMEBPlatformConfig( + [ + VWCarDocs("Volkswagen ID.4 2021-23"), + ], + VolkswagenCarSpecs(mass=2224, wheelbase=2.77), + chassis_codes={"E2"}, + wmis={WMI.VOLKSWAGEN_USA_SUV, WMI.VOLKSWAGEN_EUROPE_CAR, WMI.VOLKSWAGEN_EUROPE_SUV}, + ) VOLKSWAGEN_JETTA_MK6 = VolkswagenPQPlatformConfig( [VWCarDocs("Volkswagen Jetta 2015-18")], VolkswagenCarSpecs(mass=1518, wheelbase=2.65, minSteerSpeed=50 * CV.KPH_TO_MS, minEnableSpeed=20 * CV.KPH_TO_MS), diff --git a/opendbc/safety/ignition.h b/opendbc/safety/ignition.h new file mode 100644 index 00000000000..ae00c72965b --- /dev/null +++ b/opendbc/safety/ignition.h @@ -0,0 +1,77 @@ +#pragma once + +#include +#include + +#include "opendbc/safety/can.h" + +bool ignition_can = false; +uint32_t ignition_can_cnt = 0U; + +void ignition_can_hook(const CANPacket_t *msg) { + if (msg->bus == 0U) { + int len = GET_LEN(msg); + + // GM exception + if ((msg->addr == 0x1F1U) && (len == 8)) { + // SystemPowerMode (2=Run, 3=Crank Request) + ignition_can = (msg->data[0] & 0x2U) != 0U; + ignition_can_cnt = 0U; + } + + // Rivian R1S/T GEN1 exception + if ((msg->addr == 0x152U) && (len == 8)) { + // 0x152 overlaps with Subaru pre-global which has this bit as the high beam + int counter = msg->data[1] & 0xFU; // max is only 14 + + static int prev_counter_rivian = -1; + if ((counter == ((prev_counter_rivian + 1) % 15)) && (prev_counter_rivian != -1)) { + // VDM_OutputSignals->VDM_EpasPowerMode + ignition_can = ((msg->data[7] >> 4U) & 0x3U) == 1U; // VDM_EpasPowerMode_Drive_On=1 + ignition_can_cnt = 0U; + } + prev_counter_rivian = counter; + } + + // Tesla Model 3/Y exception + if ((msg->addr == 0x221U) && (len == 8)) { + // 0x221 overlaps with Rivian which has random data on byte 0 + int counter = msg->data[6] >> 4; + + static int prev_counter_tesla = -1; + if ((counter == ((prev_counter_tesla + 1) % 16)) && (prev_counter_tesla != -1)) { + // VCFRONT_LVPowerState->VCFRONT_vehiclePowerState + int power_state = (msg->data[0] >> 5U) & 0x3U; + ignition_can = power_state == 0x3; // VEHICLE_POWER_STATE_DRIVE=3 + ignition_can_cnt = 0U; + } + prev_counter_tesla = counter; + } + + // Mazda exception + if ((msg->addr == 0x9EU) && (len == 8)) { + ignition_can = (msg->data[0] >> 5) == 0x6U; + ignition_can_cnt = 0U; + } + + // Volkswagen MEB exception + if ((msg->addr == 0x3C0U) && (len == 4)) { + int counter = msg->data[1] & 0xFU; + + static int prev_counter_vw_meb = -1; + if ((counter == ((prev_counter_vw_meb + 1) % 16)) && (prev_counter_vw_meb != -1)) { + // Klemmen_Status_01->ZAS_Kl_15 + ignition_can = ((msg->data[2] >> 1) & 1U) != 0U; + ignition_can_cnt = 0U; + } + prev_counter_vw_meb = counter; + } + } + + // TODO: this is too loose, Teslas have 0x222 + // body v2 exception + // if (((msg->bus == 0U) || (msg->bus == 2U)) && (msg->addr == 0x222U)) { + // ignition_can = true; + // ignition_can_cnt = 0U; + // } +} diff --git a/opendbc/safety/modes/volkswagen_meb.h b/opendbc/safety/modes/volkswagen_meb.h new file mode 100644 index 00000000000..7961fb16834 --- /dev/null +++ b/opendbc/safety/modes/volkswagen_meb.h @@ -0,0 +1,186 @@ +#pragma once + +#include "opendbc/safety/declarations.h" +#include "opendbc/safety/modes/volkswagen_common.h" + +#define MSG_ESC_51 0xFCU // RX, for wheel speeds +#define MSG_HCA_03 0x303U +#define MSG_QFK_01 0x13DU +#define MSG_Motor_51 0x10BU // RX for TSK state and accel pedal +#define MSG_KLR_01 0x25DU // TX, for capacitive steering wheel + +#define VOLKSWAGEN_MEB_CURVATURE_TO_CAN 149253.7313f // 1 / 6.7e-6 +#define VOLKSWAGEN_MEB_MAX_CURVATURE_CAN 29105 + +static uint32_t volkswagen_meb_compute_crc(const CANPacket_t *msg) { + int len = GET_LEN(msg); + + uint8_t crc = 0xFFU; + for (int i = 1; i < len; i++) { + crc ^= (uint8_t)msg->data[i]; + crc = volkswagen_crc8_lut_8h2f[crc]; + } + + uint8_t counter = volkswagen_mqb_meb_get_counter(msg); + if (msg->addr == MSG_LH_EPS_03) { + crc ^= (uint8_t[]){0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5,0xF5}[counter]; + } else if (msg->addr == MSG_GRA_ACC_01) { + crc ^= (uint8_t[]){0x6A,0x38,0xB4,0x27,0x22,0xEF,0xE1,0xBB,0xF8,0x80,0x84,0x49,0xC7,0x9E,0x1E,0x2B}[counter]; + } else if (msg->addr == MSG_QFK_01) { + crc ^= (uint8_t[]){0x20,0xCA,0x68,0xD5,0x1B,0x31,0xE2,0xDA,0x08,0x0A,0xD4,0xDE,0x9C,0xE4,0x35,0x5B}[counter]; + } else if (msg->addr == MSG_ESC_51) { + crc ^= (uint8_t[]){0x77,0x5C,0xA0,0x89,0x4B,0x7C,0xBB,0xD6,0x1F,0x6C,0x4F,0xF6,0x20,0x2B,0x43,0xDD}[counter]; + } else if (msg->addr == MSG_Motor_51) { + crc ^= (uint8_t[]){0x77,0x5C,0xA0,0x89,0x4B,0x7C,0xBB,0xD6,0x1F,0x6C,0x4F,0xF6,0x20,0x2B,0x43,0xDD}[counter]; + } else if (msg->addr == MSG_MOTOR_14) { + crc ^= (uint8_t[]){0x1F,0x28,0xC6,0x85,0xE6,0xF8,0xB0,0x19,0x5B,0x64,0x35,0x21,0xE4,0xF7,0x9C,0x24}[counter]; + } else if (msg->addr == MSG_KLR_01) { + crc ^= (uint8_t[]){0xDA,0x6B,0x0E,0xB2,0x78,0xBD,0x5A,0x81,0x7B,0xD6,0x41,0x39,0x76,0xB6,0xD7,0x35}[counter]; + } else { + // Undefined CAN message, CRC check expected to fail + } + crc = volkswagen_crc8_lut_8h2f[crc]; + + return (uint8_t)(crc ^ 0xFFU); +} + +static const AngleSteeringLimits VOLKSWAGEN_MEB_STEERING_LIMITS = { + .max_angle = VOLKSWAGEN_MEB_MAX_CURVATURE_CAN, + .angle_deg_to_can = VOLKSWAGEN_MEB_CURVATURE_TO_CAN, + .angle_rate_up_lookup = { + {1., 5., 25.}, + {0.4, 0.008, 0.00032}, + }, + .angle_rate_down_lookup = { + {1., 5., 25.}, + {0.4, 0.008, 0.00032}, + }, + .angle_is_curvature = true, + .inactive_angle_is_zero = true, +}; + +static safety_config volkswagen_meb_init(uint16_t param) { + // Transmit of GRA_ACC_01 is allowed on bus 0 and 2 to keep compatibility with gateway and camera integration + static const CanMsg VOLKSWAGEN_MEB_STOCK_TX_MSGS[] = { + {MSG_HCA_03, 0, 24, .check_relay = true}, + {MSG_GRA_ACC_01, 0, 8, .check_relay = false}, + {MSG_GRA_ACC_01, 2, 8, .check_relay = false}, + {MSG_LDW_02, 0, 8, .check_relay = true}, + {MSG_KLR_01, 0, 8, .check_relay = false}, {MSG_KLR_01, 2, 8, .check_relay = true}, + }; + + static RxCheck volkswagen_meb_rx_checks[] = { + {.msg = {{MSG_LH_EPS_03, 0, 8, 100U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, + {.msg = {{MSG_MOTOR_14, 0, 8, 10U, .ignore_checksum = true, .ignore_counter = true, .ignore_quality_flag = true}, { 0 }, { 0 }}}, + {.msg = {{MSG_GRA_ACC_01, 0, 8, 33U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, + {.msg = {{MSG_QFK_01, 0, 32, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, + {.msg = {{MSG_Motor_51, 0, 32, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, + {.msg = {{MSG_ESC_51, 0, 48, 50U, .max_counter = 15U, .ignore_quality_flag = true}, { 0 }, { 0 }}}, + }; + + volkswagen_common_init(); + SAFETY_UNUSED(param); + + return BUILD_SAFETY_CFG(volkswagen_meb_rx_checks, VOLKSWAGEN_MEB_STOCK_TX_MSGS); +} + +static void volkswagen_meb_rx_hook(const CANPacket_t *msg) { + if (msg->bus == 0U) { + // Update in-motion state by sampling wheel speeds + if (msg->addr == MSG_ESC_51) { + uint32_t fl = msg->data[8] | (msg->data[9] << 8); + uint32_t fr = msg->data[10] | (msg->data[11] << 8); + uint32_t rl = msg->data[12] | (msg->data[13] << 8); + uint32_t rr = msg->data[14] | (msg->data[15] << 8); + vehicle_moving = (fr > 0U) || (rr > 0U) || (rl > 0U) || (fl > 0U); + } + + if (msg->addr == MSG_QFK_01) { + int current_curvature = ((msg->data[6] & 0x7FU) << 8) | msg->data[5]; + bool current_curvature_sign = GET_BIT(msg, 55U); + if (!current_curvature_sign) { + current_curvature *= -1; + } + update_sample(&angle_meas, current_curvature); + } + + if (msg->addr == MSG_LH_EPS_03) { + update_sample(&torque_driver, volkswagen_mlb_mqb_driver_input_torque(msg)); + } + + if (msg->addr == MSG_Motor_51) { + int acc_status = (msg->data[11] & 0x07U); + bool cruise_engaged = (acc_status == 3) || (acc_status == 4) || (acc_status == 5); + acc_main_on = cruise_engaged || (acc_status == 2); + pcm_cruise_check(cruise_engaged); + if (!acc_main_on) { + controls_allowed = false; + } + + int accel_pedal_value = ((msg->data[1] >> 4) & 0x0FU) | ((msg->data[2] & 0x1FU) << 4); + gas_pressed = accel_pedal_value > 0; + } + + if (msg->addr == MSG_GRA_ACC_01) { + // Always exit controls on rising edge of Cancel + if (GET_BIT(msg, 13U)) { + controls_allowed = false; + } + } + + if (msg->addr == MSG_MOTOR_14) { + brake_pressed = GET_BIT(msg, 28U); + } + } +} + +static bool volkswagen_meb_tx_hook(const CANPacket_t *msg) { + bool tx = true; + + // Safety check for HCA_03 Heading Control Assist curvature + if (msg->addr == MSG_HCA_03) { + int desired_curvature_raw = GET_BYTES(msg, 3, 2) & 0x7FFFU; + bool desired_curvature_sign = GET_BIT(msg, 39U); + if (!desired_curvature_sign) { + desired_curvature_raw *= -1; + } + bool steer_req = (((msg->data[1] >> 4) & 0x0FU) == 4U); + + // Absolute curvature cap + if ((desired_curvature_raw > VOLKSWAGEN_MEB_MAX_CURVATURE_CAN) || + (desired_curvature_raw < -VOLKSWAGEN_MEB_MAX_CURVATURE_CAN)) { + tx = false; + } + + // ISO lateral acceleration envelope (only while actively steering) + if (controls_allowed && steer_req) { + const float fudged_speed = (vehicle_speed.min / VEHICLE_SPEED_FACTOR) - 1.0f; + const float v_clamped = (fudged_speed < 1.0f) ? 1.0f : fudged_speed; + const float max_lat_accel = ISO_LATERAL_ACCEL + (EARTH_G * AVERAGE_ROAD_ROLL); + const float max_curvature = max_lat_accel / (v_clamped * v_clamped); + const int max_curvature_can = (int)((max_curvature * VOLKSWAGEN_MEB_CURVATURE_TO_CAN) + 1.0f); + if ((desired_curvature_raw > max_curvature_can) || + (desired_curvature_raw < -max_curvature_can)) { + tx = false; + } + } + } + + if ((msg->addr == MSG_GRA_ACC_01) && !controls_allowed) { + // disallow resume and set: bits 16 and 19 + if ((msg->data[2] & 0x9U) != 0U) { + tx = false; + } + } + + return tx; +} + +const safety_hooks volkswagen_meb_hooks = { + .init = volkswagen_meb_init, + .rx = volkswagen_meb_rx_hook, + .tx = volkswagen_meb_tx_hook, + .get_counter = volkswagen_mqb_meb_get_counter, + .get_checksum = volkswagen_mqb_meb_get_checksum, + .compute_checksum = volkswagen_meb_compute_crc, +}; diff --git a/opendbc/safety/safety.h b/opendbc/safety/safety.h index a7c13354aa2..48f389aaa1e 100644 --- a/opendbc/safety/safety.h +++ b/opendbc/safety/safety.h @@ -23,6 +23,7 @@ #include "opendbc/safety/modes/nissan.h" #include "opendbc/safety/modes/volkswagen_mlb.h" #include "opendbc/safety/modes/volkswagen_mqb.h" +#include "opendbc/safety/modes/volkswagen_meb.h" #include "opendbc/safety/modes/volkswagen_pq.h" #include "opendbc/safety/modes/elm327.h" #include "opendbc/safety/modes/body.h" @@ -399,6 +400,7 @@ int set_safety_hooks(uint16_t mode, uint16_t param) { {SAFETY_CHRYSLER, &chrysler_hooks}, {SAFETY_SUBARU, &subaru_hooks}, {SAFETY_VOLKSWAGEN_MQB, &volkswagen_mqb_hooks}, + {SAFETY_VOLKSWAGEN_MEB, &volkswagen_meb_hooks}, {SAFETY_NISSAN, &nissan_hooks}, {SAFETY_NOOUTPUT, &nooutput_hooks}, {SAFETY_HYUNDAI_LEGACY, &hyundai_legacy_hooks}, diff --git a/opendbc/safety/tests/common.py b/opendbc/safety/tests/common.py index 267736cfac6..0e40bffe6c2 100644 --- a/opendbc/safety/tests/common.py +++ b/opendbc/safety/tests/common.py @@ -909,7 +909,7 @@ def test_tx_hook_on_wrong_safety_mode(self): continue if {attr, current_test}.issubset({'TestHyundaiLongitudinalSafety', 'TestHyundaiLongitudinalSafetyCameraSCC', 'TestHyundaiSafetyFCEVLong'}): continue - volkswagen_shared = ('TestVolkswagenMqb', 'TestVolkswagenMlb') + volkswagen_shared = ('TestVolkswagenMqb', 'TestVolkswagenMlb', 'TestVolkswagenMeb') if attr.startswith(volkswagen_shared) and current_test.startswith(volkswagen_shared): continue diff --git a/opendbc/safety/tests/libsafety/libsafety_py.py b/opendbc/safety/tests/libsafety/libsafety_py.py index 7d88bdf301e..bf70602a219 100644 --- a/opendbc/safety/tests/libsafety/libsafety_py.py +++ b/opendbc/safety/tests/libsafety/libsafety_py.py @@ -110,6 +110,10 @@ class CANPacket: void mutation_set_active_mutant(int id); int mutation_get_active_mutant(void); + +void ignition_can_hook(const CANPacket_t *msg); +bool get_ignition_can(void); +void set_ignition_can(bool c); """) class LibSafety: diff --git a/opendbc/safety/tests/libsafety/safety.c b/opendbc/safety/tests/libsafety/safety.c index afb7054448d..4869eb241d8 100644 --- a/opendbc/safety/tests/libsafety/safety.c +++ b/opendbc/safety/tests/libsafety/safety.c @@ -11,6 +11,7 @@ uint32_t microsecond_timer_get(void) { #include "opendbc/safety/can.h" #include "opendbc/safety/safety.h" +#include "opendbc/safety/ignition.h" void safety_tick_current_safety_config() { safety_tick(¤t_safety_config); @@ -45,10 +46,18 @@ void set_relay_malfunction(bool c){ relay_malfunction = c; } +void set_ignition_can(bool c){ + ignition_can = c; +} + bool get_controls_allowed(void){ return controls_allowed; } +bool get_ignition_can(void){ + return ignition_can; +} + int get_alternative_experience(void){ return alternative_experience; } @@ -201,4 +210,7 @@ void init_tests(void){ // assumes autopark on safety mode init to avoid a fault. get rid of that for testing tesla_autopark = false; + + ignition_can = false; + ignition_can_cnt = 0U; } diff --git a/opendbc/safety/tests/test_gm.py b/opendbc/safety/tests/test_gm.py index aae308f7e3e..ad9b9789209 100755 --- a/opendbc/safety/tests/test_gm.py +++ b/opendbc/safety/tests/test_gm.py @@ -228,5 +228,28 @@ class TestGmCameraLongitudinalEVSafety(TestGmCameraLongitudinalSafety, TestGmEVS pass +class TestGmIgnition(unittest.TestCase): + TX_MSGS: list = [] + + def setUp(self): + self.safety = libsafety_py.libsafety + self.safety.init_tests() + self.packer = CANPackerSafety("gm_global_a_powertrain_generated") + + def _msg(self, mode): + return self.packer.make_can_msg_safety("BCMGeneralPlatformStatus", 0, {"SystemPowerMode": mode}) + + # SystemPowerMode 2=Run, 3=Crank Request + def test_ignition_on(self): + self.safety.ignition_can_hook(self._msg(2)) + self.assertTrue(self.safety.get_ignition_can()) + + def test_ignition_off(self): + self.safety.ignition_can_hook(self._msg(2)) + self.assertTrue(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg(0)) + self.assertFalse(self.safety.get_ignition_can()) + + if __name__ == "__main__": unittest.main() diff --git a/opendbc/safety/tests/test_mazda.py b/opendbc/safety/tests/test_mazda.py index 607edcbfdac..8fb77444a7c 100755 --- a/opendbc/safety/tests/test_mazda.py +++ b/opendbc/safety/tests/test_mazda.py @@ -4,7 +4,7 @@ from opendbc.car.structs import CarParams from opendbc.safety.tests.libsafety import libsafety_py import opendbc.safety.tests.common as common -from opendbc.safety.tests.common import CANPackerSafety +from opendbc.safety.tests.common import CANPackerSafety, make_msg class TestMazdaSafety(common.CarSafetyTest, common.DriverTorqueSteeringSafetyTest): @@ -81,5 +81,27 @@ def test_buttons(self): self.assertTrue(self._tx(self._button_msg(resume=True))) +class TestMazdaIgnition(unittest.TestCase): + TX_MSGS: list = [] + + def setUp(self): + self.safety = libsafety_py.libsafety + self.safety.init_tests() + + def _msg(self, byte0): + return make_msg(0, 0x9E, dat=bytes([byte0]) + b"\x00" * 7) + + # 0x9E byte 0 high 3 bits == 6 (0xC0) + def test_ignition_on(self): + self.safety.ignition_can_hook(self._msg(0xC0)) + self.assertTrue(self.safety.get_ignition_can()) + + def test_ignition_off(self): + self.safety.ignition_can_hook(self._msg(0xC0)) + self.assertTrue(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg(0x20)) + self.assertFalse(self.safety.get_ignition_can()) + + if __name__ == "__main__": unittest.main() diff --git a/opendbc/safety/tests/test_rivian.py b/opendbc/safety/tests/test_rivian.py index f1393e605c1..dfe2c359ec7 100755 --- a/opendbc/safety/tests/test_rivian.py +++ b/opendbc/safety/tests/test_rivian.py @@ -142,5 +142,36 @@ def setUp(self): self.safety.init_tests() +class TestRivianIgnition(unittest.TestCase): + TX_MSGS: list = [] + + def setUp(self): + self.safety = libsafety_py.libsafety + self.safety.init_tests() + self.packer = CANPackerSafety("rivian_primary_actuator") + + def _msg(self, counter, mode): + return self.packer.make_can_msg_safety("VDM_OutputSignals", 0, + {"VDM_OutputSigs_Counter": counter, + "VDM_EpasPowerMode": mode}) + + # VDM_EpasPowerMode_Drive_On=1 + def test_ignition_on(self): + for i in range(15): + self.safety.init_tests() + self.safety.ignition_can_hook(self._msg(i, 1)) + self.assertFalse(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg((i + 1) % 15, 1)) + self.assertTrue(self.safety.get_ignition_can()) + + def test_ignition_off(self): + self.safety.ignition_can_hook(self._msg(0, 1)) + self.safety.ignition_can_hook(self._msg(1, 1)) + self.assertTrue(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg(2, 0)) + self.safety.ignition_can_hook(self._msg(3, 0)) + self.assertFalse(self.safety.get_ignition_can()) + + if __name__ == "__main__": unittest.main() diff --git a/opendbc/safety/tests/test_tesla.py b/opendbc/safety/tests/test_tesla.py index 8c05a7dd2e3..c4e42884f34 100755 --- a/opendbc/safety/tests/test_tesla.py +++ b/opendbc/safety/tests/test_tesla.py @@ -456,5 +456,36 @@ class TestTeslaFSD14LongitudinalSafety(TestTeslaLongitudinalSafety): SAFETY_PARAM = TeslaSafetyFlags.LONG_CONTROL | TeslaSafetyFlags.FSD_14 +class TestTeslaIgnition(unittest.TestCase): + TX_MSGS: list = [] + + def setUp(self): + self.safety = libsafety_py.libsafety + self.safety.init_tests() + self.packer = CANPackerSafety("tesla_model3_party") + + def _msg(self, counter, state): + return self.packer.make_can_msg_safety("VCFRONT_LVPowerState", 0, + {"VCFRONT_LVPowerStateCounter": counter, + "VCFRONT_vehiclePowerState": state}) + + # VEHICLE_POWER_STATE_DRIVE=3 (counter-gated) + def test_ignition_on(self): + for i in range(16): + self.safety.init_tests() + self.safety.ignition_can_hook(self._msg(i, 3)) + self.assertFalse(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg((i + 1) % 16, 3)) + self.assertTrue(self.safety.get_ignition_can()) + + def test_ignition_off(self): + self.safety.ignition_can_hook(self._msg(0, 3)) + self.safety.ignition_can_hook(self._msg(1, 3)) + self.assertTrue(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg(2, 2)) + self.safety.ignition_can_hook(self._msg(3, 2)) + self.assertFalse(self.safety.get_ignition_can()) + + if __name__ == "__main__": unittest.main() diff --git a/opendbc/safety/tests/test_volkswagen_meb.py b/opendbc/safety/tests/test_volkswagen_meb.py new file mode 100644 index 00000000000..9b8eab87d83 --- /dev/null +++ b/opendbc/safety/tests/test_volkswagen_meb.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +import unittest + +from opendbc.car.structs import CarParams +from opendbc.safety.tests.libsafety import libsafety_py +import opendbc.safety.tests.common as common +from opendbc.safety.tests.common import CANPackerSafety + + +MSG_LH_EPS_03 = 0x9F # RX from EPS, for driver steering torque +MSG_ESC_51 = 0xFC # RX from ABS, for wheel speeds +MSG_Motor_51 = 0x10B # RX from ECU, for ACC status / accel pedal +MSG_GRA_ACC_01 = 0x12B # TX by OP, ACC control buttons for cancel/resume +MSG_KLR_01 = 0x25D # TX by OP, capacitive steering wheel touch +MSG_HCA_03 = 0x303 +MSG_LDW_02 = 0x397 # TX by OP, Lane line recognition and text alerts + + +class TestVolkswagenMebStockSafety(common.CarSafetyTest): + STANDSTILL_THRESHOLD = 0 + RELAY_MALFUNCTION_ADDRS = {0: (MSG_HCA_03, MSG_LDW_02), 2: (MSG_KLR_01,)} + FWD_BLACKLISTED_ADDRS = {0: [MSG_KLR_01], 2: [MSG_HCA_03, MSG_LDW_02]} + TX_MSGS = [[MSG_HCA_03, 0], [MSG_LDW_02, 0], [MSG_GRA_ACC_01, 0], [MSG_GRA_ACC_01, 2], + [MSG_KLR_01, 0], [MSG_KLR_01, 2]] + + def setUp(self): + self.packer = CANPackerSafety("vw_meb") + self.safety = libsafety_py.libsafety + self.safety.set_safety_hooks(CarParams.SafetyModel.volkswagenMeb, 0) + self.safety.init_tests() + + # Wheel speeds + def _speed_msg(self, speed): + val = int(speed * 3.6 / 0.0075) + values = {f"{s}_Radgeschw": val for s in ["VL", "VR", "HL", "HR"]} + return self.packer.make_can_msg_safety("ESC_51", 0, values) + + def _user_brake_msg(self, brake): + values = {"MO_Fahrer_bremst": brake} + return self.packer.make_can_msg_safety("Motor_14", 0, values) + + def _user_gas_msg(self, gas): + values = {"Accel_Pedal_Pressure": 1 if gas else 0, "TSK_Status": 3} + return self.packer.make_can_msg_safety("Motor_51", 0, values) + + def _pcm_status_msg(self, enable): + values = {"TSK_Status": 3 if enable else 2} + return self.packer.make_can_msg_safety("Motor_51", 0, values) + + def _gra_acc_01_msg(self, cancel=0, resume=0, _set=0, bus=2): + values = {"GRA_Abbrechen": cancel, "GRA_Tip_Setzen": _set, "GRA_Tip_Wiederaufnahme": resume} + return self.packer.make_can_msg_safety("GRA_ACC_01", bus, values) + + def _hca_03_msg(self, curvature, steer_req=True): + values = { + "Curvature": abs(curvature), + "Curvature_VZ": 1 if curvature > 0 else 0, + "Power": 50 if steer_req else 0, + "RequestStatus": 4 if steer_req else 2, + } + return self.packer.make_can_msg_safety("HCA_03", 0, values) + + def test_spam_cancel_safety_check(self): + self.safety.set_controls_allowed(0) + self.assertTrue(self._tx(self._gra_acc_01_msg(cancel=1))) + self.assertFalse(self._tx(self._gra_acc_01_msg(resume=1))) + self.assertFalse(self._tx(self._gra_acc_01_msg(_set=1))) + # do not block resume if we are engaged already + self.safety.set_controls_allowed(1) + self.assertTrue(self._tx(self._gra_acc_01_msg(resume=1))) + + +class TestVolkswagenMebIgnition(unittest.TestCase): + TX_MSGS: list = [] + + def setUp(self): + self.safety = libsafety_py.libsafety + self.safety.init_tests() + self.packer = CANPackerSafety("vw_meb") + + def _msg(self, counter, ign): + return self.packer.make_can_msg_safety("Klemmen_Status_01", 0, + {"Klemmen_Status_01_BZ": counter, + "ZAS_Kl_15": ign}) + + # ZAS_Kl_15=1 + def test_ignition_on(self): + for i in range(16): + self.safety.init_tests() + self.safety.ignition_can_hook(self._msg(i, 1)) + self.assertFalse(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg((i + 1) % 16, 1)) + self.assertTrue(self.safety.get_ignition_can()) + + def test_ignition_off(self): + self.safety.ignition_can_hook(self._msg(0, 1)) + self.safety.ignition_can_hook(self._msg(1, 1)) + self.assertTrue(self.safety.get_ignition_can()) + self.safety.ignition_can_hook(self._msg(2, 0)) + self.safety.ignition_can_hook(self._msg(3, 0)) + self.assertFalse(self.safety.get_ignition_can()) + + +if __name__ == "__main__": + unittest.main()