diff --git a/configuration.py b/configuration.py index d173d425..2d4a893c 100644 --- a/configuration.py +++ b/configuration.py @@ -1,7 +1,5 @@ from enum import Enum -from integrations.openwb.charging_station import ChargingStation - class TransportProtocol(Enum): def __init__(self, transport_mechanism: str, with_tls: bool): @@ -33,7 +31,7 @@ def __init__(self): self.mqtt_password: str | None = None self.mqtt_client_id: str = 'saic-python-mqtt-gateway' self.mqtt_topic: str | None = None - self.charging_stations_by_vin: dict[str, ChargingStation] = {} + self.charging_stations_file: str | None = None self.anonymized_publishing: bool = False self.messages_request_interval: int = 60 # in seconds self.ha_discovery_enabled: bool = True diff --git a/integrations/__init__.py b/integrations/__init__.py index e69de29b..4297f6b9 100644 --- a/integrations/__init__.py +++ b/integrations/__init__.py @@ -0,0 +1,73 @@ +from abc import ABC +from typing import Tuple, Any, List + +from saic_ismart_client_ng.api.vehicle import VehicleStatusResp +from saic_ismart_client_ng.api.vehicle_charging import ChrgMgmtDataResp + +from configuration import Configuration +from publisher.core import Publisher +from publisher.core import MqttCommandListener + + +class SaicMqttGatewayIntegrationException(Exception): + pass + + +class SaicMqttGatewayIntegration(ABC): + + def __init__( + self, *, + name: str, + configuration: Configuration, + publisher: Publisher, + listener: MqttCommandListener, + ): + self.__name = name + self.__configuration = configuration + self.__publisher = publisher + self.__listener = listener + + async def on_full_refresh_done( + self, + *, + vin: str, + vehicle_status: VehicleStatusResp, + charge_info: ChrgMgmtDataResp + ) -> Tuple[bool, Any | None]: + return False, 'Not supported' + + async def on_raw_mqtt_message( + self, + topic: str, + payload: str + ) -> Tuple[bool, Any | None]: + return False, 'Not supported' + + async def on_mqtt_command( + self, + *, + vin: str, + topic: str, + payload: str + ) -> Tuple[bool, Any | None]: + return False, 'Not supported' + + @property + def additional_mqtt_topics(self) -> List[str]: + return [] + + @property + def name(self): + return str(self.__name).lower() + + @property + def configuration(self) -> Configuration: + return self.__configuration + + @property + def publisher(self): + return self.__publisher + + @property + def listener(self): + return self.__listener diff --git a/integrations/abrp/api.py b/integrations/abrp/api.py index c9c0777f..dd3c19b2 100644 --- a/integrations/abrp/api.py +++ b/integrations/abrp/api.py @@ -11,12 +11,16 @@ from saic_ismart_client_ng.api.vehicle_charging import ChrgMgmtDataResp from saic_ismart_client_ng.api.vehicle_charging.schema import RvsChargeStatus +from configuration import Configuration +from integrations import SaicMqttGatewayIntegration, SaicMqttGatewayIntegrationException +from integrations.abrp.api_listener import MqttGatewayAbrpListener +from publisher.core import MqttCommandListener, Publisher from utils import value_in_range LOG = logging.getLogger(__name__) -class AbrpApiException(Exception): +class AbrpApiException(SaicMqttGatewayIntegrationException): def __init__(self, msg: str): self.message = msg @@ -24,35 +28,34 @@ def __str__(self): return self.message -class AbrpApiListener(ABC): - async def on_request(self, path: str, body: Optional[str] = None, headers: Optional[dict] = None): - pass - - async def on_response(self, path: str, body: Optional[str] = None, headers: Optional[dict] = None): - pass - - -class AbrpApi: - def __init__(self, abrp_api_key: str, abrp_user_token: str, listener: Optional[AbrpApiListener] = None) -> None: - self.abrp_api_key = abrp_api_key - self.abrp_user_token = abrp_user_token - self.__listener = listener +class AbrpApi(SaicMqttGatewayIntegration): + def __init__(self, *, configuration: Configuration, publisher: Publisher, listener: MqttCommandListener): + super().__init__(name='ABRP', configuration=configuration, publisher=publisher, listener=listener) + self.__abrp_token_map = self.configuration.abrp_token_map + self.__abrp_api_key = self.configuration.abrp_api_key self.__base_uri = 'https://api.iternio.com/1/' - self.client = httpx.AsyncClient( + self.__listener = MqttGatewayAbrpListener(self.publisher) + self.__client = httpx.AsyncClient( event_hooks={ "request": [self.invoke_request_listener], "response": [self.invoke_response_listener] } ) - async def update_abrp(self, vehicle_status: VehicleStatusResp, charge_info: ChrgMgmtDataResp) \ - -> Tuple[bool, Any | None]: + async def on_full_refresh_done( + self, + *, + vin: str, + vehicle_status: VehicleStatusResp, + charge_info: ChrgMgmtDataResp + ) -> Tuple[bool, Any | None]: + abrp_user_token = self.__get_user_token(vin) charge_status = None if charge_info is None else charge_info.chrgMgmtData if ( - self.abrp_api_key is not None - and self.abrp_user_token is not None + self.__abrp_api_key is not None + and abrp_user_token is not None and vehicle_status is not None and charge_status is not None ): @@ -95,12 +98,12 @@ async def update_abrp(self, vehicle_status: VehicleStatusResp, charge_info: Chrg data.update(self.__extract_gps_position(gps_position)) headers = { - 'Authorization': f'APIKEY {self.abrp_api_key}' + 'Authorization': f'APIKEY {self.__abrp_api_key}' } try: - response = await self.client.post(url=tlm_send_url, headers=headers, params={ - 'token': self.abrp_user_token, + response = await self.__client.post(url=tlm_send_url, headers=headers, params={ + 'token': abrp_user_token, 'tlm': json.dumps(data) }) await response.aread() @@ -242,3 +245,9 @@ async def invoke_response_listener(self, response: httpx.Response): ) except Exception as e: LOG.warning(f"Error invoking request listener: {e}", exc_info=e) + + def __get_user_token(self, vin: str): + if vin in self.__abrp_token_map: + return self.__abrp_token_map[vin] + else: + return None diff --git a/integrations/abrp/api_listener.py b/integrations/abrp/api_listener.py new file mode 100644 index 00000000..becb5e94 --- /dev/null +++ b/integrations/abrp/api_listener.py @@ -0,0 +1,18 @@ +from typing import Optional + +from mqtt_topics import INTERNAL +from publisher.core import Publisher +from saic_api_listener import MqttGatewayListenerApiListener + +INTERNAL_ABRP_TOPIC = INTERNAL + '/abrp' + + +class MqttGatewayAbrpListener(MqttGatewayListenerApiListener): + def __init__(self, publisher: Publisher): + super().__init__(publisher, INTERNAL_ABRP_TOPIC) + + async def on_request(self, path: str, body: Optional[str] = None, headers: Optional[dict] = None): + await self.publish_request(path, body, headers) + + async def on_response(self, path: str, body: Optional[str] = None, headers: Optional[dict] = None): + await self.publish_response(path, body, headers) diff --git a/integrations/openwb/__init__.py b/integrations/openwb/__init__.py index e69de29b..fa548963 100644 --- a/integrations/openwb/__init__.py +++ b/integrations/openwb/__init__.py @@ -0,0 +1,172 @@ +import json +import logging +from typing import List, override, Tuple, Any + +from saic_ismart_client_ng.api.vehicle import VehicleStatusResp +from saic_ismart_client_ng.api.vehicle.schema import BasicVehicleStatus +from saic_ismart_client_ng.api.vehicle_charging import ChrgMgmtDataResp +from saic_ismart_client_ng.api.vehicle_charging.schema import RvsChargeStatus + +from configuration import Configuration +from integrations import SaicMqttGatewayIntegration +from integrations.openwb.model import ChargingStation +from publisher.core import MqttCommandListener, Publisher +from utils import value_in_range + +LOG = logging.getLogger(__name__) + +CHARGING_STATIONS_FILE = 'charging-stations.json' + + +def process_charging_stations_file(json_file: str) -> dict[str, ChargingStation]: + result = dict() + try: + with open(json_file, 'r') as f: + data = json.load(f) + for item in data: + charge_state_topic = item['chargeStateTopic'] + charging_value = item['chargingValue'] + vin = item['vin'] + if 'socTopic' in item: + charging_station = ChargingStation(vin, charge_state_topic, charging_value, item['socTopic']) + else: + charging_station = ChargingStation(vin, charge_state_topic, charging_value) + if 'rangeTopic' in item: + charging_station.range_topic = item['rangeTopic'] + if 'chargerConnectedTopic' in item: + charging_station.connected_topic = item['chargerConnectedTopic'] + if 'chargerConnectedValue' in item: + charging_station.connected_value = item['chargerConnectedValue'] + result[vin] = charging_station + except FileNotFoundError: + LOG.warning(f'File {json_file} does not exist') + except json.JSONDecodeError as e: + LOG.exception(f'Reading {json_file} failed', exc_info=e) + return result + + +class OpenWBIntegration(SaicMqttGatewayIntegration): + def __init__(self, *, configuration: Configuration, publisher: Publisher, listener: MqttCommandListener): + super().__init__(name='OpenWB', configuration=configuration, publisher=publisher, listener=listener) + charging_stations_file = self.configuration.charging_stations_file or f'./{CHARGING_STATIONS_FILE}' + self.__charging_stations_by_vin = process_charging_stations_file(charging_stations_file) + self.__vin_by_charge_state_topic: dict[str, str] = {} + self.__last_charge_state_by_vin: [str, str] = {} + self.__vin_by_charger_connected_topic: dict[str, str] = {} + for charging_station in self.__charging_stations_by_vin.values(): + LOG.debug(f'Subscribing to MQTT topic {charging_station.charge_state_topic}') + self.__vin_by_charge_state_topic[charging_station.charge_state_topic] = charging_station.vin + if charging_station.connected_topic: + LOG.debug(f'Subscribing to MQTT topic {charging_station.connected_topic}') + self.__vin_by_charger_connected_topic[charging_station.connected_topic] = charging_station.vin + + @override + async def on_raw_mqtt_message( + self, *, + topic: str, + payload: str + ): + handled = False + if topic in self.__vin_by_charge_state_topic: + LOG.debug(f'Received message over topic {topic} with payload {payload}') + handled = True + vin = self.__vin_by_charge_state_topic[topic] + charging_station = self.__charging_stations_by_vin[vin] + if self.__should_force_refresh(payload, charging_station): + LOG.info(f'Vehicle with vin {vin} is charging. Setting refresh mode to force') + if self.listener is not None: + await self.listener.on_charging_detected(vin) + elif topic in self.__vin_by_charger_connected_topic: + LOG.debug(f'Received message over topic {topic} with payload {payload}') + handled = True + vin = self.__vin_by_charger_connected_topic[topic] + charging_station = self.__charging_stations_by_vin[vin] + if payload == charging_station.connected_value: + LOG.debug(f'Vehicle with vin {vin} is connected to its charging station') + else: + LOG.debug(f'Vehicle with vin {vin} is disconnected from its charging station') + + return handled, '' if handled else f'Topic {topic} not supported by OpenWB integration' + + async def on_full_refresh_done( + self, + *, + vin: str, + vehicle_status: VehicleStatusResp, + charge_info: ChrgMgmtDataResp + ) -> Tuple[bool, Any | None]: + handled = False + charging_station = self.__get_charging_station(vin) + if charging_station: + if ( + charging_station.soc_topic + and charge_info + and charge_info.chrgMgmtData + and charge_info.chrgMgmtData.bmsPackSOCDsp is not None + ): + soc = charge_info.chrgMgmtData.bmsPackSOCDsp / 10.0 + if soc <= 100.0: + self.publisher.publish_int(charging_station.soc_topic, int(soc), True) + handled = True + if charging_station.range_topic: + electric_range = self.__extract_electric_range( + basic_vehicle_status=vehicle_status.basicVehicleStatus if vehicle_status is not None else None, + charge_status=charge_info.rvsChargeStatus if charge_info is not None else None + ) + if electric_range is not None: + self.publisher.publish_float(charging_station.range_topic, electric_range, True) + handled = True + return handled, '' if handled else f'Car with vin {vin} does not have an OpenWB configuration' + + @override + def additional_mqtt_topics(self) -> List[str]: + return list(self.__vin_by_charge_state_topic.keys()) + list(self.__vin_by_charge_state_topic.keys()) + + def __should_force_refresh(self, current_charging_value: str, charging_station: ChargingStation): + vin = charging_station.vin + last_charging_value: str | None = None + if vin in self.__last_charge_state_by_vin: + last_charging_value = self.__last_charge_state_by_vin[vin] + self.__last_charge_state_by_vin[vin] = current_charging_value + + if last_charging_value: + if last_charging_value == current_charging_value: + LOG.debug('Last charging value equals current charging value. No refresh needed.') + return False + else: + LOG.info(f'Charging value has changed from {last_charging_value} to {current_charging_value}.') + return True + else: + return True + + def __get_charging_station(self, vin) -> ChargingStation | None: + if vin in self.__charging_stations_by_vin: + return self.__charging_stations_by_vin[vin] + else: + return None + + def __extract_electric_range( + self, + basic_vehicle_status: BasicVehicleStatus | None, + charge_status: RvsChargeStatus | None + ) -> float | None: + + range_elec_vehicle = 0.0 + if basic_vehicle_status is not None: + range_elec_vehicle = self.__parse_electric_range(raw_value=basic_vehicle_status.fuelRangeElec) + + range_elec_bms = 0.0 + if charge_status is not None: + range_elec_bms = self.__parse_electric_range(raw_value=charge_status.fuelRangeElec) + + range_elec = max(range_elec_vehicle, range_elec_bms) + if range_elec > 0: + return range_elec + + return None + + @staticmethod + def __parse_electric_range(raw_value) -> float: + if value_in_range(raw_value, 1, 65535): + return float(raw_value) / 10.0 + return 0.0 diff --git a/integrations/openwb/charging_station.py b/integrations/openwb/model.py similarity index 100% rename from integrations/openwb/charging_station.py rename to integrations/openwb/model.py diff --git a/mqtt_gateway.py b/mqtt_gateway.py index 729055f0..0af8c5b4 100644 --- a/mqtt_gateway.py +++ b/mqtt_gateway.py @@ -9,7 +9,7 @@ import sys import time import urllib.parse -from typing import Callable, override +from typing import Callable, List, override import apscheduler.schedulers.asyncio from saic_ismart_client_ng import SaicApi @@ -25,16 +25,16 @@ import mqtt_topics from configuration import Configuration, TransportProtocol from exceptions import MqttGatewayException -from integrations.abrp.api import AbrpApi, AbrpApiException +from integrations import SaicMqttGatewayIntegration, SaicMqttGatewayIntegrationException +from integrations.abrp.api import AbrpApi from integrations.home_assistant.discovery import HomeAssistantDiscovery -from integrations.openwb.charging_station import ChargingStation +from integrations.openwb import OpenWBIntegration from publisher.core import Publisher from publisher.mqtt_publisher import MqttClient, MqttCommandListener -from saic_api_listener import MqttGatewaySaicApiListener, MqttGatewayAbrpListener +from saic_api_listener import MqttGatewaySaicApiListener from vehicle import RefreshMode, VehicleState MSG_CMD_SUCCESSFUL = 'Success' -CHARGING_STATIONS_FILE = 'charging-stations.json' def epoch_value_to_str(time_value: int) -> str: @@ -58,8 +58,15 @@ def debug_log_enabled(): class VehicleHandler: - def __init__(self, config: Configuration, saicapi: SaicApi, publisher: Publisher, vin_info: VinInfo, - vehicle_state: VehicleState): + def __init__( + self, + config: Configuration, + integrations: List[SaicMqttGatewayIntegration], + saicapi: SaicApi, + publisher: Publisher, + vin_info: VinInfo, + vehicle_state: VehicleState + ): self.configuration = config self.saic_api = saicapi self.publisher = publisher @@ -67,15 +74,7 @@ def __init__(self, config: Configuration, saicapi: SaicApi, publisher: Publisher self.vehicle_prefix = f'{self.configuration.saic_user}/vehicles/{self.vin_info.vin}' self.vehicle_state = vehicle_state self.ha_discovery = HomeAssistantDiscovery(vehicle_state, vin_info, config) - if vin_info.vin in self.configuration.abrp_token_map: - abrp_user_token = self.configuration.abrp_token_map[vin_info.vin] - else: - abrp_user_token = None - self.abrp_api = AbrpApi( - self.configuration.abrp_api_key, - abrp_user_token, - listener=MqttGatewayAbrpListener(self.publisher) - ) + self.__integrations = integrations async def handle_vehicle(self) -> None: start_time = datetime.datetime.now() @@ -109,7 +108,7 @@ async def handle_vehicle(self) -> None: self.vehicle_state.mark_successful_refresh() LOG.info('Refreshing vehicle status succeeded...') - await self.__refresh_abrp(charge_status, vehicle_status) + await self.__refresh_integrations(charge_status, vehicle_status) except SaicApiException as e: self.vehicle_state.mark_failed_refresh() @@ -117,8 +116,7 @@ async def handle_vehicle(self) -> None: 'handle_vehicle loop failed during SAIC API call', exc_info=e ) - except AbrpApiException as ae: - LOG.exception('handle_vehicle loop failed during ABRP API call', exc_info=ae) + except Exception as e: self.vehicle_state.mark_failed_refresh() LOG.exception( @@ -132,13 +130,52 @@ async def handle_vehicle(self) -> None: # car not active, wait a second await asyncio.sleep(1.0) - async def __refresh_abrp(self, charge_status, vehicle_status): - abrp_refreshed, abrp_response = await self.abrp_api.update_abrp(vehicle_status, charge_status) - self.publisher.publish_str(f'{self.vehicle_prefix}/{mqtt_topics.INTERNAL_ABRP}', abrp_response) - if abrp_refreshed: - LOG.info('Refreshing ABRP status succeeded...') - else: - LOG.info(f'ABRP not refreshed, reason {abrp_response}') + async def __refresh_integrations(self, charge_status, vehicle_status): + for integration in self.__integrations: + try: + integration_refreshed, integration_response = await integration.on_full_refresh_done( + vin=self.vin_info.vin, + vehicle_status=vehicle_status, + charge_info=charge_status + ) + self.publisher.publish_str( + f'{self.vehicle_prefix}/{mqtt_topics.INTERNAL}/{integration.name}/full_refresh', + integration_response + ) + if integration_refreshed: + LOG.info(f'Refreshing integration {integration.name} status succeeded...') + else: + LOG.info(f'Integration {integration.name} not refreshed, reason {integration_response}') + except SaicMqttGatewayIntegrationException as ae: + LOG.exception( + f'handle_vehicle loop failed during integration {integration.name} processing call', + exc_info=ae + ) + + async def __handle_mqtt_command_in_integrations(self, topic, payload) -> bool: + topic = str(topic).lower() + payload = str(payload).lower() + success = False + for integration in self.__integrations: + try: + integration_refreshed, integration_response = await integration.on_mqtt_command( + vin=self.vin_info.vin, + topic=topic, + payload=payload + ) + self.publisher.publish_str( + f'{self.vehicle_prefix}/{mqtt_topics.INTERNAL}/{integration.name}/mqtt', + integration_response + ) + if integration_refreshed: + LOG.info(f'Handling MQTT command {topic} in integration {integration.name} succeeded...') + success = True + except SaicMqttGatewayIntegrationException as ae: + LOG.exception( + f'handle_vehicle loop failed during Handling MQTT command {topic} integration {integration.name} processing call', + exc_info=ae + ) + return success async def update_vehicle_status(self) -> VehicleStatusResp: LOG.info('Updating vehicle status') @@ -387,8 +424,12 @@ async def handle_mqtt_command(self, *, topic: str, payload: str): case _: # set mode, period (in)-active,... + handled = await self.vehicle_state.handle_mqtt_command(topic=topic, payload=payload) + if not handled: + handled = self.__handle_mqtt_command_in_integrations(topic, payload) + if not handled: + raise MqttGatewayException(f'Unsupported topic {topic}') should_force_refresh = False - await self.vehicle_state.configure_by_message(topic=topic, payload=payload) self.publisher.publish_str(f'{self.vehicle_prefix}/{topic}/result', 'Success') if should_force_refresh: self.vehicle_state.set_refresh_mode(RefreshMode.FORCE, f'after command execution on topic {topic}') @@ -433,6 +474,11 @@ def __init__(self, config: Configuration): ) self.saic_api.on_publish_json_value = self.__on_publish_json_value self.saic_api.on_publish_raw_value = self.__on_publish_raw_value + self.integrations = [ + OpenWBIntegration(configuration=configuration, publisher=self.publisher, listener=self), + AbrpApi(configuration=configuration, publisher=self.publisher, listener=self) + ] + self.publisher.integrations = self.integrations async def run(self): scheduler = apscheduler.schedulers.asyncio.AsyncIOScheduler() @@ -462,32 +508,19 @@ async def run(self): raise SystemExit(e) account_prefix = f'{self.configuration.saic_user}/{mqtt_topics.VEHICLES}/{vin_info.vin}' - charging_station = self.get_charging_station(vin_info.vin) - if ( - charging_station - and charging_station.soc_topic - ): - LOG.debug('SoC of %s for charging station will be published over MQTT topic: %s', vin_info.vin, - charging_station.soc_topic) - if ( - charging_station - and charging_station.range_topic - ): - LOG.debug('Range of %s for charging station will be published over MQTT topic: %s', vin_info.vin, - charging_station.range_topic) total_battery_capacity = configuration.battery_capacity_map.get(vin_info.vin, None) vehicle_state = VehicleState( self.publisher, scheduler, account_prefix, vin_info, - charging_station, charge_polling_min_percent=self.configuration.charge_dynamic_polling_min_percentage, total_battery_capacity=total_battery_capacity, ) vehicle_handler = VehicleHandler( self.configuration, + self.integrations, self.saic_api, self.publisher, # Gateway pointer vin_info, @@ -539,12 +572,6 @@ def __on_publish_raw_value(self, key: str, raw: str): def __on_publish_json_value(self, key: str, json_data: dict): self.publisher.publish_json(key, json_data) - def get_charging_station(self, vin) -> ChargingStation | None: - if vin in self.configuration.charging_stations_by_vin: - return self.configuration.charging_stations_by_vin[vin] - else: - return None - async def __main_loop(self): tasks = [] for (key, vh) in self.vehicle_handler.items(): @@ -821,11 +848,7 @@ def process_arguments() -> Configuration: config.battery_capacity_map, value_type=check_positive_float ) - if args.charging_stations_file: - process_charging_stations_file(config, args.charging_stations_file) - else: - process_charging_stations_file(config, f'./{CHARGING_STATIONS_FILE}') - + config.charging_stations_file = args.charging_stations_file config.saic_password = args.saic_password if args.ha_discovery_enabled is not None: @@ -872,32 +895,6 @@ def process_arguments() -> Configuration: SystemExit(err) -def process_charging_stations_file(config: Configuration, json_file: str): - try: - with open(json_file, 'r') as f: - data = json.load(f) - - for item in data: - charge_state_topic = item['chargeStateTopic'] - charging_value = item['chargingValue'] - vin = item['vin'] - if 'socTopic' in item: - charging_station = ChargingStation(vin, charge_state_topic, charging_value, item['socTopic']) - else: - charging_station = ChargingStation(vin, charge_state_topic, charging_value) - if 'rangeTopic' in item: - charging_station.range_topic = item['rangeTopic'] - if 'chargerConnectedTopic' in item: - charging_station.connected_topic = item['chargerConnectedTopic'] - if 'chargerConnectedValue' in item: - charging_station.connected_value = item['chargerConnectedValue'] - config.charging_stations_by_vin[vin] = charging_station - except FileNotFoundError: - LOG.warning(f'File {json_file} does not exist') - except json.JSONDecodeError as e: - LOG.exception(f'Reading {json_file} failed', exc_info=e) - - def cfg_value_to_dict(cfg_value: str, result_map: dict, value_type: Callable[[str], any] = str): if ',' in cfg_value: map_entries = cfg_value.split(',') diff --git a/mqtt_topics.py b/mqtt_topics.py index 4808fba2..acf652df 100644 --- a/mqtt_topics.py +++ b/mqtt_topics.py @@ -98,7 +98,7 @@ INTERNAL = '_internal' INTERNAL_API = INTERNAL + '/api' INTERNAL_LWT = INTERNAL + '/lwt' -INTERNAL_ABRP = INTERNAL + '/abrp' + INTERNAL_CONFIGURATION_RAW = INTERNAL + '/configuration/raw' LOCATION = 'location' diff --git a/publisher/core.py b/publisher/core.py index 743bcc08..b6aac8fa 100644 --- a/publisher/core.py +++ b/publisher/core.py @@ -6,6 +6,14 @@ from configuration import Configuration +class MqttCommandListener(ABC): + async def on_mqtt_command_received(self, *, vin: str, topic: str, payload: str) -> None: + raise NotImplementedError("Should have implemented this") + + async def on_charging_detected(self, vin: str) -> None: + raise NotImplementedError("Should have implemented this") + + class Publisher(ABC): def __init__(self, config: Configuration): self.configuration = config diff --git a/publisher/mqtt_publisher.py b/publisher/mqtt_publisher.py index c0d73192..481b575a 100644 --- a/publisher/mqtt_publisher.py +++ b/publisher/mqtt_publisher.py @@ -1,14 +1,14 @@ import logging import os import ssl -from abc import ABC -from typing import Optional, override +from typing import Optional, List, override import gmqtt import mqtt_topics from configuration import Configuration -from integrations.openwb.charging_station import ChargingStation +from integrations import SaicMqttGatewayIntegration, SaicMqttGatewayIntegrationException +from publisher.core import MqttCommandListener from publisher.core import Publisher LOG = logging.getLogger(__name__) @@ -18,14 +18,6 @@ MQTT_LOG.setLevel(level=os.getenv('MQTT_LOG_LEVEL', 'INFO').upper()) -class MqttCommandListener(ABC): - async def on_mqtt_command_received(self, *, vin: str, topic: str, payload: str) -> None: - raise NotImplementedError("Should have implemented this") - - async def on_charging_detected(self, vin: str) -> None: - raise NotImplementedError("Should have implemented this") - - class MqttClient(Publisher): def __init__(self, configuration: Configuration): super().__init__(configuration) @@ -37,9 +29,7 @@ def __init__(self, configuration: Configuration): self.port = self.configuration.mqtt_port self.transport_protocol = self.configuration.mqtt_transport_protocol self.command_listener: Optional[MqttCommandListener] = None - self.vin_by_charge_state_topic: dict[str, str] = {} - self.last_charge_state_by_vin: [str, str] = {} - self.vin_by_charger_connected_topic: dict[str, str] = {} + self.integrations: List[SaicMqttGatewayIntegration] = [] mqtt_client = gmqtt.Client( client_id=str(self.publisher_id), @@ -122,28 +112,34 @@ async def __on_message(self, _client, topic, payload, _qos, _properties) -> None LOG.exception(f'Error while processing MQTT message: {e}') async def __on_message_real(self, *, topic: str, payload: str) -> None: - if topic in self.vin_by_charge_state_topic: + if await self.__handle_raw_mqtt_message_integrations(topic, payload): LOG.debug(f'Received message over topic {topic} with payload {payload}') - vin = self.vin_by_charge_state_topic[topic] - charging_station = self.configuration.charging_stations_by_vin[vin] - if self.should_force_refresh(payload, charging_station): - LOG.info(f'Vehicle with vin {vin} is charging. Setting refresh mode to force') - if self.command_listener is not None: - await self.command_listener.on_charging_detected(vin) - elif topic in self.vin_by_charger_connected_topic: - LOG.debug(f'Received message over topic {topic} with payload {payload}') - vin = self.vin_by_charger_connected_topic[topic] - charging_station = self.configuration.charging_stations_by_vin[vin] - if payload == charging_station.connected_value: - LOG.debug(f'Vehicle with vin {vin} is connected to its charging station') - else: - LOG.debug(f'Vehicle with vin {vin} is disconnected from its charging station') else: vin = self.get_vin_from_topic(topic) if self.command_listener is not None: await self.command_listener.on_mqtt_command_received(vin=vin, topic=topic, payload=payload) return + async def __handle_raw_mqtt_message_integrations(self, topic, payload) -> bool: + topic = str(topic) + payload = str(payload) + success = False + for integration in self.integrations: + try: + integration_refreshed, integration_response = await integration.on_raw_mqtt_message( + topic=topic, + payload=payload + ) + if integration_refreshed: + LOG.info(f'Handling RAW MQTT message on {topic} in integration {integration.name} succeeded...') + success = True + except SaicMqttGatewayIntegrationException as ae: + LOG.exception( + f'handle_vehicle loop failed during Handling MQTT command {topic} integration {integration.name} processing call', + exc_info=ae + ) + return success + def publish(self, topic: str, payload) -> None: self.client.publish(self.remove_special_mqtt_characters(topic), payload, retain=True) @@ -187,20 +183,3 @@ def get_vin_from_topic(self, topic: str) -> str: global_topic_removed = topic[len(self.configuration.mqtt_topic) + 1:] elements = global_topic_removed.split('/') return elements[2] - - def should_force_refresh(self, current_charging_value: str, charging_station: ChargingStation): - vin = charging_station.vin - last_charging_value: str | None = None - if vin in self.last_charge_state_by_vin: - last_charging_value = self.last_charge_state_by_vin[vin] - self.last_charge_state_by_vin[vin] = current_charging_value - - if last_charging_value: - if last_charging_value == current_charging_value: - LOG.debug('Last charging value equals current charging value. No refresh needed.') - return False - else: - LOG.info(f'Charging value has changed from {last_charging_value} to {current_charging_value}.') - return True - else: - return True diff --git a/saic_api_listener.py b/saic_api_listener.py index cdacead3..de3ed47a 100644 --- a/saic_api_listener.py +++ b/saic_api_listener.py @@ -7,8 +7,7 @@ from saic_ismart_client_ng.listener import SaicApiListener -from integrations.abrp.api import AbrpApiListener -from mqtt_topics import INTERNAL_API, INTERNAL_ABRP +from mqtt_topics import INTERNAL_API from publisher.core import Publisher LOG = logging.getLogger(__name__) @@ -69,19 +68,6 @@ def __internal_publish(self, *, key: str, data: dict): LOG.info(f"Not publishing API response to MQTT since publisher is not connected. {data}") -class MqttGatewayAbrpListener(AbrpApiListener, MqttGatewayListenerApiListener): - def __init__(self, publisher: Publisher): - super().__init__(publisher, INTERNAL_ABRP) - - @override - async def on_request(self, path: str, body: Optional[str] = None, headers: Optional[dict] = None): - await self.publish_request(path, body, headers) - - @override - async def on_response(self, path: str, body: Optional[str] = None, headers: Optional[dict] = None): - await self.publish_response(path, body, headers) - - class MqttGatewaySaicApiListener(SaicApiListener, MqttGatewayListenerApiListener): def __init__(self, publisher: Publisher): super().__init__(publisher, INTERNAL_API) diff --git a/tests/test_mqtt_publisher.py b/tests/test_mqtt_publisher.py index 174cd546..f67f46d4 100644 --- a/tests/test_mqtt_publisher.py +++ b/tests/test_mqtt_publisher.py @@ -2,7 +2,8 @@ from typing import override from configuration import Configuration, TransportProtocol -from publisher.mqtt_publisher import MqttClient, MqttCommandListener +from publisher.core import MqttCommandListener +from publisher.mqtt_publisher import MqttClient USER = 'me@home.da' VIN = 'vin10000000000000' diff --git a/tests/test_vehicle_handler.py b/tests/test_vehicle_handler.py index 05a93a50..a008159b 100644 --- a/tests/test_vehicle_handler.py +++ b/tests/test_vehicle_handler.py @@ -171,7 +171,7 @@ def setUp(self) -> None: account_prefix = f'/vehicles/{VIN}' scheduler = BlockingScheduler() vehicle_state = VehicleState(publisher, scheduler, account_prefix, vin_info) - self.vehicle_handler = VehicleHandler(config, saicapi, publisher, vin_info, vehicle_state) + self.vehicle_handler = VehicleHandler(config, [], saicapi, publisher, vin_info, vehicle_state) @patch.object(SaicApi, 'get_vehicle_status') async def test_update_vehicle_status(self, mocked_vehicle_status): diff --git a/vehicle.py b/vehicle.py index b1f6182b..5f254dec 100644 --- a/vehicle.py +++ b/vehicle.py @@ -19,7 +19,6 @@ from saic_ismart_client_ng.api.vehicle_charging.schema import ChrgMgmtData import mqtt_topics -from integrations.openwb.charging_station import ChargingStation from exceptions import MqttGatewayException from publisher.core import Publisher from utils import value_in_range, is_valid_temperature @@ -48,14 +47,12 @@ def __init__( scheduler: BaseScheduler, account_prefix: str, vin_info: VinInfo, - charging_station: Optional[ChargingStation] = None, charge_polling_min_percent: float = 1.0, total_battery_capacity: Optional[float] = None, ): self.publisher = publisher self.__vin_info = vin_info self.mqtt_vin_prefix = f'{account_prefix}' - self.charging_station = charging_station self.last_car_activity = datetime.datetime.min self.last_successful_refresh = datetime.datetime.min self.__last_failed_refresh: datetime.datetime | None = None @@ -351,11 +348,6 @@ def __publish_electric_range(self, raw_value): if value_in_range(raw_value, 1, 65535): electric_range = raw_value / 10.0 self.publisher.publish_float(self.get_topic(mqtt_topics.DRIVETRAIN_RANGE), electric_range) - if ( - self.charging_station - and self.charging_station.range_topic - ): - self.publisher.publish_float(self.charging_station.range_topic, electric_range, True) def set_hv_battery_active(self, hv_battery_active: bool): if ( @@ -532,7 +524,7 @@ def configure_missing(self): f"initial gateway startup from an invalid state {self.refresh_mode}" ) - async def configure_by_message(self, *, topic: str, payload: str): + async def handle_mqtt_command(self, *, topic: str, payload: str) -> bool: payload = payload.lower() match topic: case mqtt_topics.REFRESH_MODE: @@ -566,7 +558,8 @@ async def configure_by_message(self, *, topic: str, payload: str): except ValueError: raise MqttGatewayException(f'Error setting value for payload {payload}') case _: - raise MqttGatewayException(f'Unsupported topic {topic}') + return False + return True def handle_charge_status(self, charge_info_resp: ChrgMgmtDataResp) -> None: charge_mgmt_data = charge_info_resp.chrgMgmtData @@ -636,11 +629,6 @@ def handle_charge_status(self, charge_info_resp: ChrgMgmtDataResp) -> None: soc = charge_mgmt_data.bmsPackSOCDsp / 10.0 if soc <= 100.0: self.publisher.publish_float(self.get_topic(mqtt_topics.DRIVETRAIN_SOC), soc) - if ( - self.charging_station - and self.charging_station.soc_topic - ): - self.publisher.publish_int(self.charging_station.soc_topic, int(soc), True) estd_elec_rng = charge_mgmt_data.bmsEstdElecRng if value_in_range(estd_elec_rng, 0, 65535) and estd_elec_rng != 2047: