From 07fb93ff4905537234e29593f48652605c4f1dff Mon Sep 17 00:00:00 2001 From: Wanbogang Date: Tue, 3 Mar 2026 20:24:44 +0700 Subject: [PATCH 1/2] feat: add generic AirQualityInput plugin with connector pattern - Add AirQualityInput plugin supporting multiple sensors/APIs - Add connector/base.py: AirQualityData dataclass, AirQualityConnector ABC, get_aqi_level() - Add connector/aqicn.py: AQICN cloud API connector (no hardware needed) - Add connector/pms5003.py: PMS5003/PMS7003 particulate sensor via Serial - Add connector/bme680.py: BME680 environmental sensor via I2C - Add 88 tests covering all connectors and plugin behavior (100% coverage) - Patch inputs/__init__.py auto-discovery to support subpackage plugins - Fix aqi_label/aqi_description unbound variable in _raw_to_text --- src/inputs/__init__.py | 34 +- src/inputs/plugins/air_quality/__init__.py | 224 +++++++++++ .../plugins/air_quality/connector/__init__.py | 0 .../plugins/air_quality/connector/aqicn.py | 128 ++++++ .../plugins/air_quality/connector/base.py | 132 +++++++ .../plugins/air_quality/connector/bme680.py | 133 +++++++ .../plugins/air_quality/connector/pms5003.py | 190 +++++++++ tests/inputs/plugins/air_quality/__init__.py | 0 .../plugins/air_quality/connector/__init__.py | 0 .../air_quality/connector/test_aqicn.py | 250 ++++++++++++ .../air_quality/connector/test_base.py | 124 ++++++ .../air_quality/connector/test_bme680.py | 187 +++++++++ .../air_quality/connector/test_pms5003.py | 217 ++++++++++ tests/inputs/plugins/test_air_quality.py | 369 ++++++++++++++++++ 14 files changed, 1978 insertions(+), 10 deletions(-) create mode 100644 src/inputs/plugins/air_quality/__init__.py create mode 100644 src/inputs/plugins/air_quality/connector/__init__.py create mode 100644 src/inputs/plugins/air_quality/connector/aqicn.py create mode 100644 src/inputs/plugins/air_quality/connector/base.py create mode 100644 src/inputs/plugins/air_quality/connector/bme680.py create mode 100644 src/inputs/plugins/air_quality/connector/pms5003.py create mode 100644 tests/inputs/plugins/air_quality/__init__.py create mode 100644 tests/inputs/plugins/air_quality/connector/__init__.py create mode 100644 tests/inputs/plugins/air_quality/connector/test_aqicn.py create mode 100644 tests/inputs/plugins/air_quality/connector/test_base.py create mode 100644 tests/inputs/plugins/air_quality/connector/test_bme680.py create mode 100644 tests/inputs/plugins/air_quality/connector/test_pms5003.py create mode 100644 tests/inputs/plugins/test_air_quality.py diff --git a/src/inputs/__init__.py b/src/inputs/__init__.py index 2460cc765..2ac3338a1 100644 --- a/src/inputs/__init__.py +++ b/src/inputs/__init__.py @@ -12,6 +12,9 @@ def find_module_with_class(class_name: str) -> T.Optional[str]: """ Find which module file contains the specified class name. + Scans both direct .py files and subpackage __init__.py files + inside the plugins directory. + Parameters ---------- class_name : str @@ -27,25 +30,36 @@ def find_module_with_class(class_name: str) -> T.Optional[str]: if not os.path.exists(plugins_dir): return None - plugin_files = [f for f in os.listdir(plugins_dir) if f.endswith(".py")] + pattern = rf"^class\s+{re.escape(class_name)}\s*\([^)]*FuserInput[^)]*\)\s*:" - for plugin_file in plugin_files: + # Scan direct .py files in plugins/ + for plugin_file in os.listdir(plugins_dir): + if not plugin_file.endswith(".py"): + continue file_path = os.path.join(plugins_dir, plugin_file) - try: with open(file_path, "r", encoding="utf-8") as f: content = f.read() - - pattern = ( - rf"^class\s+{re.escape(class_name)}\s*\([^)]*FuserInput[^)]*\)\s*:" - ) - if re.search(pattern, content, re.MULTILINE): return plugin_file[:-3] - except Exception as e: logging.warning(f"Could not read {plugin_file}: {e}") + + # Scan subpackage __init__.py files in plugins/*/ + for entry in os.listdir(plugins_dir): + entry_path = os.path.join(plugins_dir, entry) + if not os.path.isdir(entry_path) or entry.startswith("_"): + continue + init_path = os.path.join(entry_path, "__init__.py") + if not os.path.exists(init_path): continue + try: + with open(init_path, "r", encoding="utf-8") as f: + content = f.read() + if re.search(pattern, content, re.MULTILINE): + return entry + except Exception as e: + logging.warning(f"Could not read {init_path}: {e}") return None @@ -99,7 +113,7 @@ def load_input(input_config: T.Dict[str, T.Any]) -> Sensor: **(config_dict if isinstance(config_dict, dict) else {}) ) - logging.debug(f"Loaded input {class_name} from {module_name}.py") + logging.debug(f"Loaded input {class_name} from {module_name}") return input_class(config=config) except ImportError as e: diff --git a/src/inputs/plugins/air_quality/__init__.py b/src/inputs/plugins/air_quality/__init__.py new file mode 100644 index 000000000..6c8a5ee93 --- /dev/null +++ b/src/inputs/plugins/air_quality/__init__.py @@ -0,0 +1,224 @@ +import asyncio +import logging +import time +from typing import Optional + +from pydantic import Field + +from inputs.base import Message, SensorConfig +from inputs.base.loop import FuserInput +from inputs.plugins.air_quality.connector.aqicn import AqicnConnector +from inputs.plugins.air_quality.connector.base import ( + AirQualityConnector, + AirQualityData, + get_aqi_level, +) +from inputs.plugins.air_quality.connector.bme680 import BME680Connector +from inputs.plugins.air_quality.connector.pms5003 import PMS5003Connector +from providers.io_provider import IOProvider + +CONNECTORS: dict[str, type[AirQualityConnector]] = { + "aqicn": AqicnConnector, + "pms5003": PMS5003Connector, + "bme680": BME680Connector, +} + + +class AirQualityConfig(SensorConfig): + """ + Configuration for AirQuality Input. + + Parameters + ---------- + connector : str + Connector type: 'aqicn', 'pms5003', or 'bme680'. + connector_config : dict + Connector-specific configuration passed directly to the connector. + aqicn → api_key, latitude, longitude + pms5003 → port, location + bme680 → i2c_address, location, gas_baseline + poll_interval : float + Seconds between air quality reads (default: 300). + aqi_warning_threshold : int + AQI above this value triggers a WARNING (default: 100). + aqi_danger_threshold : int + AQI above this value triggers a DANGER alert (default: 150). + """ + + connector: str = Field( + default="aqicn", + description="Connector type: 'aqicn', 'pms5003', 'bme680'", + ) + connector_config: dict = Field( + default_factory=dict, + description="Connector-specific configuration", + ) + poll_interval: float = Field( + default=300.0, + description="Seconds between air quality reads", + ) + aqi_warning_threshold: int = Field( + default=100, + description="AQI threshold for WARNING alert", + ) + aqi_danger_threshold: int = Field( + default=150, + description="AQI threshold for DANGER alert", + ) + + +class AirQualityInput(FuserInput[AirQualityConfig, Optional[AirQualityData]]): + """ + Generic air quality input that works with any sensor or API connector. + + Reads standardized AirQualityData from the configured connector and + converts it to human-readable text for the LLM. Supports hot-swapping + connectors via config without changing this class. + + Supported connectors: + aqicn — AQICN cloud API (no hardware needed) + pms5003 — PMS5003/PMS7003 particulate sensor via Serial + bme680 — BME680 environmental sensor via I2C + """ + + def __init__(self, config: AirQualityConfig): + super().__init__(config) + + self.io_provider = IOProvider() + self.messages: list[Message] = [] + self.descriptor_for_LLM = "Air Quality" + + self.poll_interval = config.poll_interval + self.aqi_warning_threshold = config.aqi_warning_threshold + self.aqi_danger_threshold = config.aqi_danger_threshold + self._last_poll_time: float = 0 + + connector_class = CONNECTORS.get(config.connector) + if connector_class is None: + raise ValueError( + f"AirQualityInput: unknown connector '{config.connector}'. " + f"Available: {list(CONNECTORS.keys())}" + ) + self._connector: AirQualityConnector = connector_class(config.connector_config) + logging.info(f"AirQualityInput: using connector '{config.connector}'") + + async def _poll(self) -> Optional[AirQualityData]: + """ + Poll connector based on poll_interval. + + Returns + ------- + Optional[AirQualityData] + Fresh data when interval elapsed, None otherwise. + """ + current_time = time.time() + + if current_time - self._last_poll_time < self.poll_interval: + await asyncio.sleep(1.0) + return None + + self._last_poll_time = current_time + await asyncio.sleep(1.0) + + connected = await self._connector.connect() + if not connected: + return None + + data = await self._connector.read() + await self._connector.disconnect() + return data + + async def _raw_to_text( + self, raw_input: Optional[AirQualityData] + ) -> Optional[Message]: + """ + Convert AirQualityData to human-readable message for LLM. + + Parameters + ---------- + raw_input : Optional[AirQualityData] + Standardized air quality data. + + Returns + ------- + Optional[Message] + Formatted message, or None if no data. + """ + if raw_input is None: + return None + + try: + aqi = raw_input.aqi + aqi_label: str = "" + aqi_description: str = "" + parts = [] + + if aqi is not None: + aqi_label, aqi_description = get_aqi_level(aqi) + parts.append( + f"Air Quality in {raw_input.location}: {aqi_label} (AQI: {aqi})" + ) + else: + parts.append(f"Air Quality in {raw_input.location}") + + pollutants = [] + if raw_input.pm25 is not None: + pollutants.append(f"PM2.5: {raw_input.pm25} µg/m³") + if raw_input.pm10 is not None: + pollutants.append(f"PM10: {raw_input.pm10} µg/m³") + if raw_input.co is not None: + pollutants.append(f"CO: {raw_input.co} ppm") + if raw_input.no2 is not None: + pollutants.append(f"NO2: {raw_input.no2} µg/m³") + if raw_input.so2 is not None: + pollutants.append(f"SO2: {raw_input.so2} µg/m³") + if raw_input.o3 is not None: + pollutants.append(f"O3: {raw_input.o3} µg/m³") + if pollutants: + parts.append(", ".join(pollutants)) + + env_data = [] + if raw_input.temperature is not None: + env_data.append(f"Temperature: {raw_input.temperature}°C") + if raw_input.humidity is not None: + env_data.append(f"Humidity: {raw_input.humidity}%") + if env_data: + parts.append(", ".join(env_data)) + + if aqi is not None: + if aqi >= self.aqi_danger_threshold: + parts.append( + f"DANGER: Air quality is {aqi_label} — {aqi_description}" + ) + elif aqi >= self.aqi_warning_threshold: + parts.append( + f"WARNING: Air quality is {aqi_label} — {aqi_description}" + ) + + return Message(timestamp=time.time(), message=". ".join(parts) + ".") + + except Exception as e: + logging.error(f"AirQualityInput: error building message: {e}") + return None + + async def raw_to_text(self, raw_input: Optional[AirQualityData]) -> None: + """Convert raw AirQualityData to a human-readable text message.""" + pending = await self._raw_to_text(raw_input) + if pending is not None: + self.messages.append(pending) + + def formatted_latest_buffer(self) -> Optional[str]: + """Return the latest formatted air quality message, or None if empty.""" + if not self.messages: + return None + + latest = self.messages[-1] + result = ( + f"\nINPUT: {self.descriptor_for_LLM}\n// START\n" + f"{latest.message}\n// END\n" + ) + self.io_provider.add_input( + self.descriptor_for_LLM, latest.message, latest.timestamp + ) + self.messages = [] + return result diff --git a/src/inputs/plugins/air_quality/connector/__init__.py b/src/inputs/plugins/air_quality/connector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/inputs/plugins/air_quality/connector/aqicn.py b/src/inputs/plugins/air_quality/connector/aqicn.py new file mode 100644 index 000000000..3a1992449 --- /dev/null +++ b/src/inputs/plugins/air_quality/connector/aqicn.py @@ -0,0 +1,128 @@ +import asyncio + +import aiohttp + +from inputs.plugins.air_quality.connector.base import ( + AirQualityConnector, + AirQualityData, +) + + +class AqicnConnector(AirQualityConnector): + """ + Air quality connector using AQICN (World Air Quality Index) API. + + Fetches real-time AQI and pollutant data from waqi.info. + Use this connector when no physical sensor is available. + + API docs: https://aqicn.org/json-api/doc/ + Free token: https://aqicn.org/data-platform/token/ + """ + + def __init__(self, config: dict): + """ + Parameters + ---------- + config : dict + Must contain: + - api_key (str): AQICN token + - latitude (float): location latitude + - longitude (float): location longitude + """ + super().__init__(config) + self.api_key: str = config.get("api_key", "") + self.latitude: float = config.get("latitude", -6.2088) + self.longitude: float = config.get("longitude", 106.8456) + + async def connect(self) -> bool: + """Validate API key and confirm connector is ready.""" + if not self.api_key: + self.logger.warning("AqicnConnector: no API key provided") + return False + return True + + async def disconnect(self) -> None: + """No-op: stateless HTTP connector requires no teardown.""" + pass # Stateless HTTP — nothing to close + + async def read(self) -> AirQualityData | None: + """ + Fetch air quality data from AQICN API. + + Returns + ------- + AirQualityData or None + Parsed data, or None if request failed. + """ + if not self.api_key: + return None + + url = f"https://api.waqi.info/feed/geo:{self.latitude};{self.longitude}/" + params = {"token": self.api_key} + + try: + timeout = aiohttp.ClientTimeout(total=10) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url, params=params) as response: + if response.status != 200: + self.logger.error( + f"AqicnConnector: HTTP {response.status}: {await response.text()}" + ) + return None + + payload = await response.json() + + if payload.get("status") != "ok": + self.logger.error(f"AqicnConnector: API error: {payload.get('data')}") + return None + + return self._parse(payload) + + except asyncio.TimeoutError: + self.logger.error("AqicnConnector: request timed out") + return None + except aiohttp.ClientError as e: + self.logger.error(f"AqicnConnector: network error: {e}") + return None + except Exception as e: + self.logger.error(f"AqicnConnector: unexpected error: {e}") + return None + + def _parse(self, payload: dict) -> AirQualityData: + """ + Parse AQICN API response into AirQualityData. + + Parameters + ---------- + payload : dict + Raw API response with status == 'ok'. + + Returns + ------- + AirQualityData + """ + data = payload.get("data", {}) + iaqi = data.get("iaqi", {}) + + def get_val(key: str) -> float | None: + entry = iaqi.get(key, {}) + return entry.get("v") if entry else None + + aqi_raw = data.get("aqi", "-") + aqi = int(aqi_raw) if aqi_raw not in ("-", None) else None + + location = data.get("city", {}).get("name", "Unknown") + + return AirQualityData( + aqi=aqi, + pm25=get_val("pm25"), + pm10=get_val("pm10"), + co=get_val("co"), + no2=get_val("no2"), + so2=get_val("so2"), + o3=get_val("o3"), + temperature=get_val("t"), + humidity=get_val("h"), + location=location, + source="aqicn", + ) diff --git a/src/inputs/plugins/air_quality/connector/base.py b/src/inputs/plugins/air_quality/connector/base.py new file mode 100644 index 000000000..d54993ef9 --- /dev/null +++ b/src/inputs/plugins/air_quality/connector/base.py @@ -0,0 +1,132 @@ +import logging +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class AirQualityData: + """ + Standardized air quality data structure. + All connectors must return data in this format. + + Parameters + ---------- + aqi : Optional[int] + Air Quality Index (0-500+). None if not available. + pm25 : Optional[float] + PM2.5 concentration in µg/m³. + pm10 : Optional[float] + PM10 concentration in µg/m³. + co : Optional[float] + Carbon monoxide in ppm. + no2 : Optional[float] + Nitrogen dioxide in µg/m³. + so2 : Optional[float] + Sulfur dioxide in µg/m³. + o3 : Optional[float] + Ozone in µg/m³. + temperature : Optional[float] + Temperature in Celsius. + humidity : Optional[float] + Relative humidity in %. + location : str + Human-readable location name. + source : str + Data source identifier (e.g. 'pms5003', 'bme680', 'aqicn'). + """ + + aqi: Optional[int] = None + pm25: Optional[float] = None + pm10: Optional[float] = None + co: Optional[float] = None + no2: Optional[float] = None + so2: Optional[float] = None + o3: Optional[float] = None + temperature: Optional[float] = None + humidity: Optional[float] = None + location: str = "Unknown" + source: str = "Unknown" + + +# AQI scale berdasarkan standard US EPA +AQI_LEVELS = [ + (50, "GOOD", "Air quality is satisfactory."), + (100, "MODERATE", "Acceptable; some pollutants may concern sensitive groups."), + ( + 150, + "UNHEALTHY FOR SENSITIVE GROUPS", + "Sensitive groups may experience health effects.", + ), + (200, "UNHEALTHY", "Everyone may begin to experience health effects."), + (300, "VERY UNHEALTHY", "Health alert: everyone may experience serious effects."), + ( + float("inf"), + "HAZARDOUS", + "Health warning: emergency conditions for entire population.", + ), +] + + +def get_aqi_level(aqi: int) -> tuple[str, str]: + """ + Get AQI level label and description based on AQI value. + + Parameters + ---------- + aqi : int + AQI value. + + Returns + ------- + tuple[str, str] + (label, description) + """ + for threshold, label, description in AQI_LEVELS: + if aqi <= threshold: + return label, description + return "HAZARDOUS", AQI_LEVELS[-1][2] # pragma: no cover + + +class AirQualityConnector(ABC): + """ + Abstract base class for all air quality connectors. + + Every connector (hardware or API) must implement this interface + so that AirQualityInput stays sensor-agnostic. + """ + + def __init__(self, config: dict): + self.config = config + self.logger = logging.getLogger(self.__class__.__name__) + + @abstractmethod + async def connect(self) -> bool: + """ + Initialize connection to sensor or API. + + Returns + ------- + bool + True if connection successful, False otherwise. + """ + pass + + @abstractmethod + async def read(self) -> Optional[AirQualityData]: + """ + Read air quality data. + + Returns + ------- + Optional[AirQualityData] + Standardized data, or None if read failed. + """ + pass + + @abstractmethod + async def disconnect(self) -> None: + """ + Clean up and close connection. + """ + pass diff --git a/src/inputs/plugins/air_quality/connector/bme680.py b/src/inputs/plugins/air_quality/connector/bme680.py new file mode 100644 index 000000000..877a1446f --- /dev/null +++ b/src/inputs/plugins/air_quality/connector/bme680.py @@ -0,0 +1,133 @@ +import asyncio + +from inputs.plugins.air_quality.connector.base import ( + AirQualityConnector, + AirQualityData, +) + + +class BME680Connector(AirQualityConnector): + """ + Air quality connector for BME680 environmental sensor via I2C. + + Reads temperature, humidity, pressure, and VOC gas resistance. + Uses the bme680 Python library (pip install bme680). + + Wiring (I2C): + VCC → 3.3V + GND → GND + SDA → SDA (e.g. GPIO2 on Raspberry Pi) + SCL → SCL (e.g. GPIO3 on Raspberry Pi) + + Datasheet: https://www.bosch-sensortec.com/products/environmental-sensors/gas-sensors/bme680/ + Library: https://github.com/pimoroni/bme680-python + """ + + def __init__(self, config: dict): + """ + Parameters + ---------- + config : dict + Must contain: + - i2c_address (int, optional): I2C address, default 0x76 + - location (str, optional): location label, default 'Robot' + - gas_baseline (float, optional): baseline gas resistance in Ohms + for IAQ calculation. Calibrate by running sensor in clean air. + """ + super().__init__(config) + self.i2c_address: int = config.get("i2c_address", 0x76) + self.location: str = config.get("location", "Robot") + self.gas_baseline: float = config.get("gas_baseline", 50000.0) + self._sensor = None + + async def connect(self) -> bool: + """Initialize the BME680 sensor over I2C and configure oversampling.""" + try: + import bme680 + + loop = asyncio.get_event_loop() + self._sensor = await loop.run_in_executor( + None, lambda: bme680.BME680(self.i2c_address) # type: ignore[attr-defined] + ) + # Recommended oversampling settings from Bosch + self._sensor.set_humidity_oversample(bme680.OS_2X) # type: ignore[attr-defined] + self._sensor.set_pressure_oversample(bme680.OS_4X) # type: ignore[attr-defined] + self._sensor.set_temperature_oversample(bme680.OS_8X) # type: ignore[attr-defined] + self._sensor.set_filter(bme680.FILTER_SIZE_3) # type: ignore[attr-defined] + self._sensor.set_gas_status(bme680.ENABLE_GAS_MEAS) # type: ignore[attr-defined] + self._sensor.set_gas_heater_temperature(320) + self._sensor.set_gas_heater_duration(150) + self._sensor.select_gas_heater_profile(0) + + self.logger.info( + f"BME680Connector: connected at I2C address {hex(self.i2c_address)}" + ) + return True + + except ImportError: + self.logger.error( + "BME680Connector: bme680 library not installed. Run: pip install bme680" + ) + return False + except Exception as e: + self.logger.error(f"BME680Connector: failed to connect: {e}") + return False + + async def disconnect(self) -> None: + """Release the BME680 sensor reference.""" + self._sensor = None + self.logger.info("BME680Connector: disconnected") + + async def read(self) -> AirQualityData | None: + """ + Read one sample from BME680 sensor. + + Returns + ------- + AirQualityData or None + Environmental data, or None if read failed. + """ + if self._sensor is None: + self.logger.error("BME680Connector: not connected") + return None + + try: + loop = asyncio.get_event_loop() + data = await loop.run_in_executor(None, self._read_sensor) + return data + + except Exception as e: + self.logger.error(f"BME680Connector: read error: {e}") + return None + + def _read_sensor(self) -> AirQualityData | None: + """ + Blocking sensor read — runs in executor. + + Returns + ------- + AirQualityData or None + """ + if self._sensor is None or not self._sensor.get_sensor_data(): + self.logger.warning("BME680Connector: sensor data not ready") + return None + + temperature = round(self._sensor.data.temperature, 1) + humidity = round(self._sensor.data.humidity, 1) + + # IAQ (Indoor Air Quality) score 0-500 from gas resistance + # Higher gas resistance = cleaner air + aqi = None + if self._sensor.data.heat_stable: + gas_resistance = self._sensor.data.gas_resistance + # Normalize: baseline is clean air (AQI ~25), degraded is AQI ~200+ + ratio = gas_resistance / self.gas_baseline + aqi = max(0, min(500, round(25 / max(ratio, 0.01)))) + + return AirQualityData( + aqi=aqi, + temperature=temperature, + humidity=humidity, + location=self.location, + source="bme680", + ) diff --git a/src/inputs/plugins/air_quality/connector/pms5003.py b/src/inputs/plugins/air_quality/connector/pms5003.py new file mode 100644 index 000000000..78921f565 --- /dev/null +++ b/src/inputs/plugins/air_quality/connector/pms5003.py @@ -0,0 +1,190 @@ +import asyncio + +import serial + +from inputs.plugins.air_quality.connector.base import ( + AirQualityConnector, + AirQualityData, +) + + +class PMS5003Connector(AirQualityConnector): + """ + Air quality connector for PMS5003/PMS7003 particulate matter sensor. + + Reads PM1.0, PM2.5, PM10 via UART/Serial. + Sensor sends 32-byte frames at 9600 baud. + + Wiring: + VCC → 5V + GND → GND + TX → RX (e.g. /dev/ttyUSB0 or /dev/ttyAMA0) + + Datasheet: https://www.aqmd.gov/docs/default-source/aq-spec/resources-page/plantower-pms5003-manual_v2-3.pdf + """ + + BAUD_RATE = 9600 + FRAME_LENGTH = 32 + FRAME_START = (0x42, 0x4D) + + def __init__(self, config: dict): + """ + Parameters + ---------- + config : dict + Must contain: + - port (str): serial port, e.g. '/dev/ttyUSB0' + - location (str, optional): location label, default 'Robot' + """ + super().__init__(config) + self.port: str = config.get("port", "/dev/ttyUSB0") + self.location: str = config.get("location", "Robot") + self._serial: serial.Serial | None = None + + async def connect(self) -> bool: + """Open the serial port and connect to the PMS5003/7003 sensor.""" + try: + self._serial = serial.Serial( + self.port, baudrate=self.BAUD_RATE, timeout=2.0 + ) + self.logger.info(f"PMS5003Connector: connected on {self.port}") + return True + except serial.SerialException as e: + self.logger.error(f"PMS5003Connector: failed to connect: {e}") + return False + + async def disconnect(self) -> None: + """Close the serial port if open.""" + if self._serial and self._serial.is_open: + self._serial.close() + self.logger.info("PMS5003Connector: disconnected") + + async def read(self) -> AirQualityData | None: + """ + Read one frame from PMS5003 sensor. + + Returns + ------- + AirQualityData or None + Parsed particulate matter data, or None if read failed. + """ + if self._serial is None or not self._serial.is_open: + self.logger.error("PMS5003Connector: not connected") + return None + + try: + # Run blocking serial read in executor to avoid blocking event loop + loop = asyncio.get_event_loop() + frame = await loop.run_in_executor(None, self._read_frame) + if frame is None: + return None + return self._parse(frame) + + except Exception as e: + self.logger.error(f"PMS5003Connector: read error: {e}") + return None + + def _read_frame(self) -> bytes | None: + """ + Read and validate one 32-byte PMS5003 frame (blocking). + + Returns + ------- + bytes or None + Raw 32-byte frame, or None if invalid/timeout. + """ + # Sync to frame start bytes 0x42 0x4D + if self._serial is None: + return None + while True: + byte = self._serial.read(1) + if not byte: + return None + if byte[0] == self.FRAME_START[0]: + next_byte = self._serial.read(1) + if next_byte and next_byte[0] == self.FRAME_START[1]: + break + + rest = self._serial.read(self.FRAME_LENGTH - 2) + if len(rest) != self.FRAME_LENGTH - 2: + return None + + frame = bytes(self.FRAME_START) + rest + + # Validate checksum — sum of bytes 0..29 == bytes 30..31 (big-endian) + checksum = sum(frame[:30]) & 0xFFFF + expected = (frame[30] << 8) | frame[31] + if checksum != expected: + self.logger.warning("PMS5003Connector: checksum mismatch") + return None + + return frame + + def _parse(self, frame: bytes) -> AirQualityData: + """ + Parse validated 32-byte PMS5003 frame. + + Frame layout (big-endian, each field 2 bytes): + [4:6] PM1.0 standard + [6:8] PM2.5 standard + [8:10] PM10 standard + + Parameters + ---------- + frame : bytes + Validated 32-byte frame. + + Returns + ------- + AirQualityData + """ + + def word(offset: int) -> int: + return (frame[offset] << 8) | frame[offset + 1] + + pm25 = float(word(6)) + pm10 = float(word(8)) + + # Estimate AQI from PM2.5 using US EPA breakpoints + aqi = self._pm25_to_aqi(pm25) + + return AirQualityData( + aqi=aqi, + pm25=pm25, + pm10=pm10, + location=self.location, + source="pms5003", + ) + + @staticmethod + def _pm25_to_aqi(pm25: float) -> int: + """ + Convert PM2.5 concentration to AQI using US EPA formula. + + Parameters + ---------- + pm25 : float + PM2.5 in µg/m³. + + Returns + ------- + int + Estimated AQI value. + """ + # (C_low, C_high, AQI_low, AQI_high) + breakpoints = [ + (0.0, 12.0, 0, 50), + (12.1, 35.4, 51, 100), + (35.5, 55.4, 101, 150), + (55.5, 150.4, 151, 200), + (150.5, 250.4, 201, 300), + (250.5, 350.4, 301, 400), + (350.5, 500.4, 401, 500), + ] + for c_low, c_high, aqi_low, aqi_high in breakpoints: + if c_low <= pm25 <= c_high: + aqi = ((aqi_high - aqi_low) / (c_high - c_low)) * ( + pm25 - c_low + ) + aqi_low + return round(aqi) + return 500 diff --git a/tests/inputs/plugins/air_quality/__init__.py b/tests/inputs/plugins/air_quality/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/inputs/plugins/air_quality/connector/__init__.py b/tests/inputs/plugins/air_quality/connector/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/inputs/plugins/air_quality/connector/test_aqicn.py b/tests/inputs/plugins/air_quality/connector/test_aqicn.py new file mode 100644 index 000000000..737799eae --- /dev/null +++ b/tests/inputs/plugins/air_quality/connector/test_aqicn.py @@ -0,0 +1,250 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from inputs.plugins.air_quality.connector.aqicn import AqicnConnector +from inputs.plugins.air_quality.connector.base import AirQualityData + + +class TestAqicnConnectorInit: + """Tests for AqicnConnector initialization.""" + + def test_default_values(self): + connector = AqicnConnector({}) + assert connector.api_key == "" + assert connector.latitude == -6.2088 + assert connector.longitude == 106.8456 + + def test_custom_values(self): + config = { + "api_key": "test_token", + "latitude": -6.9667, + "longitude": 110.4167, + } + connector = AqicnConnector(config) + assert connector.api_key == "test_token" + assert connector.latitude == -6.9667 + assert connector.longitude == 110.4167 + + +class TestAqicnConnectorConnect: + """Tests for connect/disconnect.""" + + @pytest.mark.asyncio + async def test_connect_with_api_key(self): + connector = AqicnConnector({"api_key": "valid_token"}) + result = await connector.connect() + assert result is True + + @pytest.mark.asyncio + async def test_connect_without_api_key(self): + connector = AqicnConnector({}) + result = await connector.connect() + assert result is False + + @pytest.mark.asyncio + async def test_disconnect_is_safe(self): + connector = AqicnConnector({"api_key": "token"}) + await connector.disconnect() # should not raise + + +class TestAqicnConnectorRead: + """Tests for read() and _parse().""" + + @pytest.fixture + def connector(self): + return AqicnConnector({"api_key": "test_token"}) + + @pytest.mark.asyncio + async def test_read_returns_none_without_api_key(self): + connector = AqicnConnector({}) + result = await connector.read() + assert result is None + + @pytest.mark.asyncio + async def test_read_success(self, connector): + mock_payload = { + "status": "ok", + "data": { + "aqi": 78, + "city": {"name": "Semarang"}, + "iaqi": { + "pm25": {"v": 22.5}, + "pm10": {"v": 45.0}, + "co": {"v": 0.5}, + "no2": {"v": 12.0}, + "t": {"v": 31.0}, + "h": {"v": 80.0}, + }, + }, + } + + with patch( + "inputs.plugins.air_quality.connector.aqicn.aiohttp.ClientSession" + ) as mock_session: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value=mock_payload) + + mock_get = MagicMock() + mock_get.__aenter__ = AsyncMock(return_value=mock_response) + mock_get.__aexit__ = AsyncMock(return_value=None) + + mock_session_instance = MagicMock() + mock_session_instance.get = MagicMock(return_value=mock_get) + mock_session_instance.__aenter__ = AsyncMock( + return_value=mock_session_instance + ) + mock_session_instance.__aexit__ = AsyncMock(return_value=None) + + mock_session.return_value = mock_session_instance + + result = await connector.read() + + assert isinstance(result, AirQualityData) + assert result.aqi == 78 + assert result.pm25 == 22.5 + assert result.pm10 == 45.0 + assert result.co == 0.5 + assert result.temperature == 31.0 + assert result.humidity == 80.0 + assert result.location == "Semarang" + assert result.source == "aqicn" + + @pytest.mark.asyncio + async def test_read_http_error(self, connector): + with patch( + "inputs.plugins.air_quality.connector.aqicn.aiohttp.ClientSession" + ) as mock_session: + mock_response = AsyncMock() + mock_response.status = 401 + mock_response.text = AsyncMock(return_value="Unauthorized") + + mock_get = MagicMock() + mock_get.__aenter__ = AsyncMock(return_value=mock_response) + mock_get.__aexit__ = AsyncMock(return_value=None) + + mock_session_instance = MagicMock() + mock_session_instance.get = MagicMock(return_value=mock_get) + mock_session_instance.__aenter__ = AsyncMock( + return_value=mock_session_instance + ) + mock_session_instance.__aexit__ = AsyncMock(return_value=None) + + mock_session.return_value = mock_session_instance + + result = await connector.read() + + assert result is None + + @pytest.mark.asyncio + async def test_read_api_status_error(self, connector): + mock_payload = {"status": "error", "data": "Invalid token"} + + with patch( + "inputs.plugins.air_quality.connector.aqicn.aiohttp.ClientSession" + ) as mock_session: + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json = AsyncMock(return_value=mock_payload) + + mock_get = MagicMock() + mock_get.__aenter__ = AsyncMock(return_value=mock_response) + mock_get.__aexit__ = AsyncMock(return_value=None) + + mock_session_instance = MagicMock() + mock_session_instance.get = MagicMock(return_value=mock_get) + mock_session_instance.__aenter__ = AsyncMock( + return_value=mock_session_instance + ) + mock_session_instance.__aexit__ = AsyncMock(return_value=None) + + mock_session.return_value = mock_session_instance + + result = await connector.read() + + assert result is None + + @pytest.mark.asyncio + async def test_read_timeout(self, connector): + import asyncio + + with patch( + "inputs.plugins.air_quality.connector.aqicn.aiohttp.ClientSession" + ) as mock_session: + mock_session_instance = MagicMock() + mock_session_instance.__aenter__ = AsyncMock( + side_effect=asyncio.TimeoutError + ) + mock_session_instance.__aexit__ = AsyncMock(return_value=None) + mock_session.return_value = mock_session_instance + + result = await connector.read() + + assert result is None + + def test_parse_missing_iaqi_fields(self, connector): + """Test _parse handles missing iaqi fields gracefully.""" + payload = { + "data": { + "aqi": 50, + "city": {"name": "Test City"}, + "iaqi": {}, + } + } + result = connector._parse(payload) + assert result.aqi == 50 + assert result.pm25 is None + assert result.pm10 is None + assert result.location == "Test City" + assert result.source == "aqicn" + + def test_parse_aqi_dash(self, connector): + """Test _parse handles AQI '-' (unavailable) gracefully.""" + payload = { + "data": { + "aqi": "-", + "city": {"name": "Unknown"}, + "iaqi": {}, + } + } + result = connector._parse(payload) + assert result.aqi is None + + +class TestAqicnConnectorExceptions: + """Cover aiohttp.ClientError and generic Exception handlers.""" + + @pytest.fixture + def connector(self): + return AqicnConnector({"api_key": "test_token"}) + + @pytest.mark.asyncio + async def test_read_client_error(self, connector): + import aiohttp + + with patch( + "inputs.plugins.air_quality.connector.aqicn.aiohttp.ClientSession" + ) as mock_session: + mock_session_instance = MagicMock() + mock_session_instance.__aenter__ = AsyncMock( + side_effect=aiohttp.ClientError("network fail") + ) + mock_session_instance.__aexit__ = AsyncMock(return_value=None) + mock_session.return_value = mock_session_instance + result = await connector.read() + assert result is None + + @pytest.mark.asyncio + async def test_read_unexpected_exception(self, connector): + with patch( + "inputs.plugins.air_quality.connector.aqicn.aiohttp.ClientSession" + ) as mock_session: + mock_session_instance = MagicMock() + mock_session_instance.__aenter__ = AsyncMock( + side_effect=RuntimeError("unexpected") + ) + mock_session_instance.__aexit__ = AsyncMock(return_value=None) + mock_session.return_value = mock_session_instance + result = await connector.read() + assert result is None diff --git a/tests/inputs/plugins/air_quality/connector/test_base.py b/tests/inputs/plugins/air_quality/connector/test_base.py new file mode 100644 index 000000000..f2fac69e6 --- /dev/null +++ b/tests/inputs/plugins/air_quality/connector/test_base.py @@ -0,0 +1,124 @@ +import pytest + +from inputs.plugins.air_quality.connector.base import ( + AirQualityConnector, + AirQualityData, + get_aqi_level, +) + + +class TestAirQualityData: + """Tests for AirQualityData dataclass.""" + + def test_default_values(self): + data = AirQualityData() + assert data.aqi is None + assert data.pm25 is None + assert data.pm10 is None + assert data.co is None + assert data.no2 is None + assert data.so2 is None + assert data.o3 is None + assert data.temperature is None + assert data.humidity is None + assert data.location == "Unknown" + assert data.source == "Unknown" + + def test_custom_values(self): + data = AirQualityData( + aqi=75, + pm25=15.2, + pm10=30.5, + temperature=28.0, + humidity=65.0, + location="Semarang", + source="pms5003", + ) + assert data.aqi == 75 + assert data.pm25 == 15.2 + assert data.pm10 == 30.5 + assert data.temperature == 28.0 + assert data.humidity == 65.0 + assert data.location == "Semarang" + assert data.source == "pms5003" + + +class TestGetAqiLevel: + """Tests for get_aqi_level function.""" + + def test_good(self): + label, description = get_aqi_level(25) + assert label == "GOOD" + + def test_moderate(self): + label, _ = get_aqi_level(75) + assert label == "MODERATE" + + def test_unhealthy_sensitive(self): + label, _ = get_aqi_level(125) + assert label == "UNHEALTHY FOR SENSITIVE GROUPS" + + def test_unhealthy(self): + label, _ = get_aqi_level(175) + assert label == "UNHEALTHY" + + def test_very_unhealthy(self): + label, _ = get_aqi_level(250) + assert label == "VERY UNHEALTHY" + + def test_hazardous(self): + label, _ = get_aqi_level(350) + assert label == "HAZARDOUS" + + def test_boundary_good_moderate(self): + label, _ = get_aqi_level(50) + assert label == "GOOD" + label, _ = get_aqi_level(51) + assert label == "MODERATE" + + def test_description_not_empty(self): + for aqi in [25, 75, 125, 175, 250, 350]: + _, description = get_aqi_level(aqi) + assert len(description) > 0 + + +class TestAirQualityConnectorAbstract: + """Tests that AirQualityConnector enforces abstract methods.""" + + def test_cannot_instantiate_directly(self): + with pytest.raises(TypeError): + AirQualityConnector({}) + + def test_concrete_must_implement_all_methods(self): + class IncompleteConnector(AirQualityConnector): + async def connect(self): + return True + + # missing read() and disconnect() + + with pytest.raises(TypeError): + IncompleteConnector({}) + + def test_concrete_full_implementation(self): + class ConcreteConnector(AirQualityConnector): + async def connect(self): + return True + + async def read(self): + return None + + async def disconnect(self): + pass + + connector = ConcreteConnector({}) + assert connector is not None + assert connector.config == {} + + +class TestGetAqiLevelFallback: + """Cover fallback return when AQI > 300.""" + + def test_above_300_returns_hazardous(self): + label, description = get_aqi_level(999) + assert label == "HAZARDOUS" + assert len(description) > 0 diff --git a/tests/inputs/plugins/air_quality/connector/test_bme680.py b/tests/inputs/plugins/air_quality/connector/test_bme680.py new file mode 100644 index 000000000..cd0c833c6 --- /dev/null +++ b/tests/inputs/plugins/air_quality/connector/test_bme680.py @@ -0,0 +1,187 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from inputs.plugins.air_quality.connector.base import AirQualityData +from inputs.plugins.air_quality.connector.bme680 import BME680Connector + + +class TestBME680ConnectorInit: + """Tests for BME680Connector initialization.""" + + def test_default_values(self): + connector = BME680Connector({}) + assert connector.i2c_address == 0x76 + assert connector.location == "Robot" + assert connector.gas_baseline == 50000.0 + assert connector._sensor is None + + def test_custom_values(self): + config = { + "i2c_address": 0x77, + "location": "Indoor", + "gas_baseline": 80000.0, + } + connector = BME680Connector(config) + assert connector.i2c_address == 0x77 + assert connector.location == "Indoor" + assert connector.gas_baseline == 80000.0 + + +class TestBME680ConnectorConnect: + """Tests for connect/disconnect.""" + + @pytest.mark.asyncio + async def test_connect_success(self): + connector = BME680Connector({}) + mock_bme680 = MagicMock() + mock_sensor = MagicMock() + mock_bme680.BME680.return_value = mock_sensor + mock_bme680.OS_2X = 1 + mock_bme680.OS_4X = 2 + mock_bme680.OS_8X = 3 + mock_bme680.FILTER_SIZE_3 = 3 + mock_bme680.ENABLE_GAS_MEAS = 1 + + with patch.dict("sys.modules", {"bme680": mock_bme680}): + result = await connector.connect() + + assert result is True + assert connector._sensor is not None + + @pytest.mark.asyncio + async def test_connect_import_error(self): + connector = BME680Connector({}) + with patch.dict("sys.modules", {"bme680": None}): + result = await connector.connect() + assert result is False + assert connector._sensor is None + + @pytest.mark.asyncio + async def test_connect_hardware_error(self): + connector = BME680Connector({}) + mock_bme680 = MagicMock() + mock_bme680.BME680.side_effect = Exception("I2C error") + + with patch.dict("sys.modules", {"bme680": mock_bme680}): + result = await connector.connect() + + assert result is False + + @pytest.mark.asyncio + async def test_disconnect_clears_sensor(self): + connector = BME680Connector({}) + connector._sensor = MagicMock() + await connector.disconnect() + assert connector._sensor is None + + +class TestBME680ConnectorRead: + """Tests for read() and _read_sensor().""" + + @pytest.mark.asyncio + async def test_read_returns_none_when_not_connected(self): + connector = BME680Connector({}) + result = await connector.read() + assert result is None + + @pytest.mark.asyncio + async def test_read_success(self): + connector = BME680Connector({"location": "Indoor", "gas_baseline": 50000.0}) + connector._sensor = MagicMock() + + mock_data = MagicMock() + mock_data.temperature = 28.5 + mock_data.humidity = 65.0 + mock_data.heat_stable = True + mock_data.gas_resistance = 50000.0 + + connector._sensor.get_sensor_data.return_value = True + connector._sensor.data = mock_data + + result = await connector.read() + + assert isinstance(result, AirQualityData) + assert result.temperature == 28.5 + assert result.humidity == 65.0 + assert result.source == "bme680" + assert result.location == "Indoor" + assert result.aqi is not None + + @pytest.mark.asyncio + async def test_read_returns_none_when_data_not_ready(self): + connector = BME680Connector({}) + connector._sensor = MagicMock() + connector._sensor.get_sensor_data.return_value = False + + result = await connector.read() + assert result is None + + @pytest.mark.asyncio + async def test_read_aqi_none_when_heat_not_stable(self): + connector = BME680Connector({"gas_baseline": 50000.0}) + connector._sensor = MagicMock() + + mock_data = MagicMock() + mock_data.temperature = 25.0 + mock_data.humidity = 60.0 + mock_data.heat_stable = False + + connector._sensor.get_sensor_data.return_value = True + connector._sensor.data = mock_data + + result = await connector.read() + + assert result is not None + assert result.aqi is None + assert result.temperature == 25.0 + + def test_read_sensor_aqi_capped_at_500(self): + """Test AQI does not exceed 500 even with very low gas resistance.""" + connector = BME680Connector({"gas_baseline": 50000.0}) + connector._sensor = MagicMock() + + mock_data = MagicMock() + mock_data.temperature = 30.0 + mock_data.humidity = 70.0 + mock_data.heat_stable = True + mock_data.gas_resistance = 1.0 # Extremely low — very polluted + + connector._sensor.get_sensor_data.return_value = True + connector._sensor.data = mock_data + + result = connector._read_sensor() + assert result is not None + assert result.aqi <= 500 + + def test_read_sensor_aqi_zero_floor(self): + """Test AQI does not go below 0 with very high gas resistance.""" + connector = BME680Connector({"gas_baseline": 50000.0}) + connector._sensor = MagicMock() + + mock_data = MagicMock() + mock_data.temperature = 22.0 + mock_data.humidity = 50.0 + mock_data.heat_stable = True + mock_data.gas_resistance = 999999.0 # Extremely clean air + + connector._sensor.get_sensor_data.return_value = True + connector._sensor.data = mock_data + + result = connector._read_sensor() + assert result is not None + assert result.aqi >= 0 + + +class TestBME680ConnectorReadException: + """Cover except Exception in read().""" + + @pytest.mark.asyncio + async def test_read_executor_raises_exception(self): + connector = BME680Connector({}) + connector._sensor = MagicMock() + with patch.object( + connector, "_read_sensor", side_effect=Exception("executor error") + ): + result = await connector.read() + assert result is None diff --git a/tests/inputs/plugins/air_quality/connector/test_pms5003.py b/tests/inputs/plugins/air_quality/connector/test_pms5003.py new file mode 100644 index 000000000..9143c8dcf --- /dev/null +++ b/tests/inputs/plugins/air_quality/connector/test_pms5003.py @@ -0,0 +1,217 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from inputs.plugins.air_quality.connector.base import AirQualityData +from inputs.plugins.air_quality.connector.pms5003 import PMS5003Connector + + +class TestPMS5003ConnectorInit: + """Tests for PMS5003Connector initialization.""" + + def test_default_values(self): + connector = PMS5003Connector({}) + assert connector.port == "/dev/ttyUSB0" + assert connector.location == "Robot" + assert connector._serial is None + + def test_custom_values(self): + config = {"port": "/dev/ttyAMA0", "location": "Outdoor"} + connector = PMS5003Connector(config) + assert connector.port == "/dev/ttyAMA0" + assert connector.location == "Outdoor" + + +class TestPMS5003ConnectorConnect: + """Tests for connect/disconnect.""" + + @pytest.mark.asyncio + async def test_connect_success(self): + connector = PMS5003Connector({"port": "/dev/ttyUSB0"}) + with patch( + "inputs.plugins.air_quality.connector.pms5003.serial.Serial" + ) as mock_serial: + mock_serial.return_value = MagicMock() + result = await connector.connect() + assert result is True + assert connector._serial is not None + + @pytest.mark.asyncio + async def test_connect_failure(self): + import serial + + connector = PMS5003Connector({"port": "/dev/ttyUSB0"}) + with patch( + "inputs.plugins.air_quality.connector.pms5003.serial.Serial", + side_effect=serial.SerialException("Port not found"), + ): + result = await connector.connect() + assert result is False + assert connector._serial is None + + @pytest.mark.asyncio + async def test_disconnect_closes_serial(self): + connector = PMS5003Connector({}) + mock_serial = MagicMock() + mock_serial.is_open = True + connector._serial = mock_serial + + await connector.disconnect() + mock_serial.close.assert_called_once() + + @pytest.mark.asyncio + async def test_disconnect_when_not_connected(self): + connector = PMS5003Connector({}) + await connector.disconnect() # should not raise + + +class TestPMS5003ConnectorRead: + """Tests for read().""" + + @pytest.fixture + def connected_connector(self): + connector = PMS5003Connector({"port": "/dev/ttyUSB0", "location": "Robot"}) + connector._serial = MagicMock() + connector._serial.is_open = True + return connector + + @pytest.mark.asyncio + async def test_read_returns_none_when_not_connected(self): + connector = PMS5003Connector({}) + result = await connector.read() + assert result is None + + @pytest.mark.asyncio + async def test_read_success(self, connected_connector): + # Build valid 32-byte PMS5003 frame + # PM2.5 = 35 µg/m³ at bytes [6:8], PM10 = 60 µg/m³ at bytes [8:10] + frame = bytearray(32) + frame[0] = 0x42 + frame[1] = 0x4D + frame[6] = 0x00 + frame[7] = 35 # PM2.5 + frame[8] = 0x00 + frame[9] = 60 # PM10 + checksum = sum(frame[:30]) & 0xFFFF + frame[30] = (checksum >> 8) & 0xFF + frame[31] = checksum & 0xFF + + with patch.object( + connected_connector, "_read_frame", return_value=bytes(frame) + ): + result = await connected_connector.read() + + assert isinstance(result, AirQualityData) + assert result.pm25 == 35.0 + assert result.pm10 == 60.0 + assert result.source == "pms5003" + assert result.location == "Robot" + assert result.aqi is not None + + @pytest.mark.asyncio + async def test_read_returns_none_on_bad_frame(self, connected_connector): + with patch.object(connected_connector, "_read_frame", return_value=None): + result = await connected_connector.read() + assert result is None + + +class TestPMS5003PM25ToAQI: + """Tests for _pm25_to_aqi static method.""" + + def test_good_range(self): + aqi = PMS5003Connector._pm25_to_aqi(5.0) + assert 0 <= aqi <= 50 + + def test_moderate_range(self): + aqi = PMS5003Connector._pm25_to_aqi(20.0) + assert 51 <= aqi <= 100 + + def test_unhealthy_sensitive_range(self): + aqi = PMS5003Connector._pm25_to_aqi(40.0) + assert 101 <= aqi <= 150 + + def test_unhealthy_range(self): + aqi = PMS5003Connector._pm25_to_aqi(100.0) + assert 151 <= aqi <= 200 + + def test_very_unhealthy_range(self): + aqi = PMS5003Connector._pm25_to_aqi(200.0) + assert 201 <= aqi <= 300 + + def test_hazardous_range(self): + aqi = PMS5003Connector._pm25_to_aqi(400.0) + assert 301 <= aqi <= 500 + + def test_above_max_returns_500(self): + aqi = PMS5003Connector._pm25_to_aqi(600.0) + assert aqi == 500 + + +class TestPMS5003ReadFrame: + """Cover _read_frame internals: sync loop, checksum, short frame.""" + + @pytest.fixture + def connector(self): + c = PMS5003Connector({"port": "/dev/ttyUSB0", "location": "Robot"}) + c._serial = MagicMock() + return c + + def test_read_frame_returns_none_when_serial_none(self): + connector = PMS5003Connector({}) + result = connector._read_frame() + assert result is None + + def test_read_frame_empty_byte_returns_none(self, connector): + connector._serial.read.return_value = b"" + result = connector._read_frame() + assert result is None + + def test_read_frame_checksum_mismatch_returns_none(self, connector): + frame = bytearray(32) + frame[0] = 0x42 + frame[1] = 0x4D + # Wrong checksum + frame[30] = 0xFF + frame[31] = 0xFF + + connector._serial.read.side_effect = [ + bytes([0x42]), # sync byte 1 + bytes([0x4D]), # sync byte 2 + bytes(frame[2:]), # rest + ] + result = connector._read_frame() + assert result is None + + def test_read_frame_short_rest_returns_none(self, connector): + connector._serial.read.side_effect = [ + bytes([0x42]), + bytes([0x4D]), + bytes(5), # too short + ] + result = connector._read_frame() + assert result is None + + def test_read_frame_valid(self, connector): + frame = bytearray(32) + frame[0] = 0x42 + frame[1] = 0x4D + checksum = sum(frame[:30]) & 0xFFFF + frame[30] = (checksum >> 8) & 0xFF + frame[31] = checksum & 0xFF + + connector._serial.read.side_effect = [ + bytes([0x42]), + bytes([0x4D]), + bytes(frame[2:]), + ] + result = connector._read_frame() + assert result is not None + assert len(result) == 32 + + @pytest.mark.asyncio + async def test_read_exception_returns_none(self, connector): + with patch.object( + connector, "_read_frame", side_effect=Exception("read error") + ): + result = await connector.read() + assert result is None diff --git a/tests/inputs/plugins/test_air_quality.py b/tests/inputs/plugins/test_air_quality.py new file mode 100644 index 000000000..5538c1b1c --- /dev/null +++ b/tests/inputs/plugins/test_air_quality.py @@ -0,0 +1,369 @@ +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from inputs.plugins.air_quality import AirQualityConfig, AirQualityInput +from inputs.plugins.air_quality.connector.base import AirQualityData + + +class TestAirQualityConfig: + """Tests for AirQualityConfig.""" + + def test_default_values(self): + config = AirQualityConfig() + assert config.connector == "aqicn" + assert config.connector_config == {} + assert config.poll_interval == 300.0 + assert config.aqi_warning_threshold == 100 + assert config.aqi_danger_threshold == 150 + + def test_custom_values(self): + config = AirQualityConfig( + connector="pms5003", + connector_config={"port": "/dev/ttyUSB0"}, + poll_interval=60.0, + aqi_warning_threshold=75, + aqi_danger_threshold=125, + ) + assert config.connector == "pms5003" + assert config.connector_config == {"port": "/dev/ttyUSB0"} + assert config.poll_interval == 60.0 + assert config.aqi_warning_threshold == 75 + assert config.aqi_danger_threshold == 125 + + +class TestAirQualityInputInit: + """Tests for AirQualityInput initialization.""" + + @pytest.fixture + def mock_io_provider(self): + with patch("inputs.plugins.air_quality.IOProvider") as mock: + yield mock + + def test_init_default_connector(self, mock_io_provider): + config = AirQualityConfig() + plugin = AirQualityInput(config) + assert plugin.descriptor_for_LLM == "Air Quality" + assert plugin.poll_interval == 300.0 + assert plugin.aqi_warning_threshold == 100 + assert plugin.aqi_danger_threshold == 150 + assert plugin.messages == [] + + def test_init_pms5003_connector(self, mock_io_provider): + config = AirQualityConfig( + connector="pms5003", + connector_config={"port": "/dev/ttyUSB0"}, + ) + plugin = AirQualityInput(config) + assert plugin._connector is not None + + def test_init_bme680_connector(self, mock_io_provider): + config = AirQualityConfig(connector="bme680") + plugin = AirQualityInput(config) + assert plugin._connector is not None + + def test_init_unknown_connector_raises(self, mock_io_provider): + config = AirQualityConfig(connector="unknown_sensor") + with pytest.raises(ValueError, match="unknown connector"): + AirQualityInput(config) + + +class TestAirQualityInputPoll: + """Tests for _poll() behavior.""" + + @pytest.fixture + def mock_io_provider(self): + with patch("inputs.plugins.air_quality.IOProvider") as mock: + yield mock + + @pytest.fixture + def plugin(self, mock_io_provider): + config = AirQualityConfig(poll_interval=60.0) + return AirQualityInput(config) + + @pytest.mark.asyncio + async def test_poll_returns_data_on_first_call(self, plugin): + mock_data = AirQualityData(aqi=75, pm25=18.0, location="Test", source="aqicn") + + with ( + patch.object( + plugin._connector, "connect", new_callable=AsyncMock, return_value=True + ), + patch.object( + plugin._connector, + "read", + new_callable=AsyncMock, + return_value=mock_data, + ), + patch.object(plugin._connector, "disconnect", new_callable=AsyncMock), + ): + result = await plugin._poll() + + assert result == mock_data + + @pytest.mark.asyncio + async def test_poll_returns_none_before_interval(self, plugin): + mock_data = AirQualityData(aqi=75, location="Test", source="aqicn") + + with ( + patch.object( + plugin._connector, "connect", new_callable=AsyncMock, return_value=True + ), + patch.object( + plugin._connector, + "read", + new_callable=AsyncMock, + return_value=mock_data, + ), + patch.object(plugin._connector, "disconnect", new_callable=AsyncMock), + ): + await plugin._poll() + result = await plugin._poll() + + assert result is None + + @pytest.mark.asyncio + async def test_poll_fetches_again_after_interval(self, plugin): + mock_data = AirQualityData(aqi=75, location="Test", source="aqicn") + + with ( + patch.object( + plugin._connector, "connect", new_callable=AsyncMock, return_value=True + ), + patch.object( + plugin._connector, + "read", + new_callable=AsyncMock, + return_value=mock_data, + ), + patch.object(plugin._connector, "disconnect", new_callable=AsyncMock), + ): + await plugin._poll() + plugin._last_poll_time = time.time() - 120.0 + result = await plugin._poll() + + assert result == mock_data + + @pytest.mark.asyncio + async def test_poll_returns_none_when_connect_fails(self, plugin): + with patch.object( + plugin._connector, "connect", new_callable=AsyncMock, return_value=False + ): + result = await plugin._poll() + assert result is None + + @pytest.mark.asyncio + async def test_poll_disconnects_after_read(self, plugin): + mock_data = AirQualityData(aqi=50, location="Test", source="aqicn") + + with ( + patch.object( + plugin._connector, "connect", new_callable=AsyncMock, return_value=True + ), + patch.object( + plugin._connector, + "read", + new_callable=AsyncMock, + return_value=mock_data, + ) as mock_read, + patch.object( + plugin._connector, "disconnect", new_callable=AsyncMock + ) as mock_disconnect, + ): + await plugin._poll() + + mock_read.assert_called_once() + mock_disconnect.assert_called_once() + + +class TestAirQualityInputRawToText: + """Tests for _raw_to_text conversion.""" + + @pytest.fixture + def mock_io_provider(self): + with patch("inputs.plugins.air_quality.IOProvider") as mock: + yield mock + + @pytest.fixture + def plugin(self, mock_io_provider): + config = AirQualityConfig() + return AirQualityInput(config) + + @pytest.mark.asyncio + async def test_raw_to_text_none_returns_none(self, plugin): + result = await plugin._raw_to_text(None) + assert result is None + + @pytest.mark.asyncio + async def test_raw_to_text_full_data(self, plugin): + data = AirQualityData( + aqi=78, + pm25=22.5, + pm10=45.0, + temperature=31.0, + humidity=80.0, + location="Semarang", + source="aqicn", + ) + result = await plugin._raw_to_text(data) + assert result is not None + assert "Semarang" in result.message + assert "78" in result.message + assert "PM2.5" in result.message + assert "PM10" in result.message + assert "31.0" in result.message + assert "80.0" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_warning_threshold(self, plugin): + data = AirQualityData(aqi=110, location="Test", source="aqicn") + result = await plugin._raw_to_text(data) + assert result is not None + assert "WARNING" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_danger_threshold(self, plugin): + data = AirQualityData(aqi=175, location="Test", source="aqicn") + result = await plugin._raw_to_text(data) + assert result is not None + assert "DANGER" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_good_aqi_no_alert(self, plugin): + data = AirQualityData(aqi=40, location="Test", source="aqicn") + result = await plugin._raw_to_text(data) + assert result is not None + assert "WARNING" not in result.message + assert "DANGER" not in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_no_aqi(self, plugin): + """Test that missing AQI is handled gracefully.""" + data = AirQualityData( + temperature=28.0, + humidity=65.0, + location="Indoor", + source="bme680", + ) + result = await plugin._raw_to_text(data) + assert result is not None + assert "Indoor" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_partial_data(self, plugin): + """Test that missing optional fields are skipped.""" + data = AirQualityData(aqi=55, location="Test", source="pms5003") + result = await plugin._raw_to_text(data) + assert result is not None + assert "µg/m³" not in result.message # no pollutant data + + +class TestAirQualityInputFormatted: + """Tests for formatted_latest_buffer.""" + + @pytest.fixture + def mock_io_provider(self): + with patch("inputs.plugins.air_quality.IOProvider") as mock: + mock_instance = MagicMock() + mock.return_value = mock_instance + yield mock_instance + + @pytest.fixture + def plugin(self, mock_io_provider): + config = AirQualityConfig() + plugin = AirQualityInput(config) + plugin.io_provider = mock_io_provider + return plugin + + def test_formatted_empty_buffer_returns_none(self, plugin): + result = plugin.formatted_latest_buffer() + assert result is None + + @pytest.mark.asyncio + async def test_formatted_with_message(self, plugin): + data = AirQualityData(aqi=78, location="Semarang", source="aqicn") + await plugin.raw_to_text(data) + + result = plugin.formatted_latest_buffer() + assert result is not None + assert "Air Quality" in result + assert "// START" in result + assert "// END" in result + assert "Semarang" in result + + @pytest.mark.asyncio + async def test_formatted_clears_buffer(self, plugin): + data = AirQualityData(aqi=78, location="Test", source="aqicn") + await plugin.raw_to_text(data) + assert len(plugin.messages) == 1 + + plugin.formatted_latest_buffer() + assert len(plugin.messages) == 0 + + @pytest.mark.asyncio + async def test_formatted_calls_io_provider(self, plugin): + data = AirQualityData(aqi=78, location="Test", source="aqicn") + await plugin.raw_to_text(data) + plugin.formatted_latest_buffer() + plugin.io_provider.add_input.assert_called_once() + + +class TestAirQualityInputMissingCoverage: + """Additional tests to reach 100% coverage.""" + + @pytest.fixture + def mock_io_provider(self): + with patch("inputs.plugins.air_quality.IOProvider") as mock: + yield mock + + @pytest.fixture + def plugin(self, mock_io_provider): + config = AirQualityConfig() + return AirQualityInput(config) + + @pytest.mark.asyncio + async def test_raw_to_text_with_so2_no2_co_o3(self, plugin): + """Cover so2, no2, co, o3 pollutant branches.""" + data = AirQualityData( + aqi=55, + co=0.8, + no2=15.0, + so2=5.0, + o3=60.0, + location="Test", + source="aqicn", + ) + result = await plugin._raw_to_text(data) + assert result is not None + assert "CO" in result.message + assert "NO2" in result.message + assert "SO2" in result.message + assert "O3" in result.message + + @pytest.mark.asyncio + async def test_raw_to_text_none_does_not_append(self, plugin): + """Cover raw_to_text when _raw_to_text returns None (pending is None).""" + with patch.object( + plugin, "_raw_to_text", new_callable=AsyncMock, return_value=None + ): + await plugin.raw_to_text(AirQualityData(location="Test", source="x")) + assert len(plugin.messages) == 0 + + @pytest.mark.asyncio + async def test_raw_to_text_exception_returns_none(self, plugin): + """Cover except Exception in _raw_to_text.""" + data = MagicMock() + data.aqi = "not_a_number" # will cause comparison error + data.location = "Test" + data.source = "x" + data.pm25 = None + data.pm10 = None + data.co = None + data.no2 = None + data.so2 = None + data.o3 = None + data.temperature = None + data.humidity = None + result = await plugin._raw_to_text(data) + assert result is None From 6318121b2de68088670c0f5c14908aea62de0dd6 Mon Sep 17 00:00:00 2001 From: Wanbogang Date: Tue, 3 Mar 2026 21:06:53 +0700 Subject: [PATCH 2/2] fix: resolve CI/CD pyright errors and coverage gaps - Fix test_base.py: add type: ignore[abstract] for abstract instantiation - Fix test_bme680.py: assert aqi is not None before comparison - Add tests for subpackage scanning in inputs/__init__.py (100% coverage) - Add tests for load_input error paths (ImportError, AttributeError) - Move inline 'import types' to top-level in test_init.py - Use setattr() for ModuleType attribute assignment (pyright) --- .../air_quality/connector/test_base.py | 4 +- .../air_quality/connector/test_bme680.py | 2 + tests/inputs/test_init.py | 153 +++++++++++++++++- 3 files changed, 153 insertions(+), 6 deletions(-) diff --git a/tests/inputs/plugins/air_quality/connector/test_base.py b/tests/inputs/plugins/air_quality/connector/test_base.py index f2fac69e6..7d5a47c69 100644 --- a/tests/inputs/plugins/air_quality/connector/test_base.py +++ b/tests/inputs/plugins/air_quality/connector/test_base.py @@ -87,7 +87,7 @@ class TestAirQualityConnectorAbstract: def test_cannot_instantiate_directly(self): with pytest.raises(TypeError): - AirQualityConnector({}) + AirQualityConnector({}) # type: ignore[abstract] def test_concrete_must_implement_all_methods(self): class IncompleteConnector(AirQualityConnector): @@ -97,7 +97,7 @@ async def connect(self): # missing read() and disconnect() with pytest.raises(TypeError): - IncompleteConnector({}) + IncompleteConnector({}) # type: ignore[abstract] def test_concrete_full_implementation(self): class ConcreteConnector(AirQualityConnector): diff --git a/tests/inputs/plugins/air_quality/connector/test_bme680.py b/tests/inputs/plugins/air_quality/connector/test_bme680.py index cd0c833c6..9e4f300b9 100644 --- a/tests/inputs/plugins/air_quality/connector/test_bme680.py +++ b/tests/inputs/plugins/air_quality/connector/test_bme680.py @@ -152,6 +152,7 @@ def test_read_sensor_aqi_capped_at_500(self): result = connector._read_sensor() assert result is not None + assert result.aqi is not None assert result.aqi <= 500 def test_read_sensor_aqi_zero_floor(self): @@ -170,6 +171,7 @@ def test_read_sensor_aqi_zero_floor(self): result = connector._read_sensor() assert result is not None + assert result.aqi is not None assert result.aqi >= 0 diff --git a/tests/inputs/test_init.py b/tests/inputs/test_init.py index d36be6044..de67db0b7 100644 --- a/tests/inputs/test_init.py +++ b/tests/inputs/test_init.py @@ -1,12 +1,16 @@ +import types from unittest.mock import Mock, mock_open, patch import pytest from inputs import find_module_with_class, load_input -from inputs.base import Sensor +from inputs.base import Sensor, SensorConfig class MockInput(Sensor): + def __init__(self, config=None): + pass + async def raw_to_text(self, raw_input): pass @@ -14,9 +18,8 @@ def formatted_latest_buffer(self): return None -class MockConfig: - def __init__(self, **kwargs): - pass +class MockConfig(SensorConfig): + pass def test_load_input_success(): @@ -135,3 +138,145 @@ def test_find_module_with_class_no_plugins_dir(): result = find_module_with_class("TestInput") assert result is None + + +def test_find_module_with_class_in_subpackage(): + """Cover subpackage __init__.py scanning — found.""" + with ( + patch("os.path.join", side_effect=lambda *args: "/".join(args)), + patch("os.path.exists") as mock_exists, + patch("os.listdir") as mock_listdir, + patch("os.path.isdir") as mock_isdir, + patch( + "builtins.open", + mock_open(read_data="class AirQualityInput(FuserInput):\n pass\n"), + ), + ): + mock_exists.return_value = True + mock_listdir.return_value = ["air_quality"] + mock_isdir.return_value = True + + result = find_module_with_class("AirQualityInput") + assert result == "air_quality" + + +def test_find_module_with_class_subpackage_no_init(): + """Cover subpackage without __init__.py — skipped.""" + with ( + patch("os.path.join", side_effect=lambda *args: "/".join(args)), + patch("os.path.exists") as mock_exists, + patch("os.listdir") as mock_listdir, + patch("os.path.isdir") as mock_isdir, + ): + + def exists_side_effect(path): + if path.endswith("__init__.py"): + return False + return True + + mock_exists.side_effect = exists_side_effect + mock_listdir.return_value = ["air_quality"] + mock_isdir.return_value = True + + result = find_module_with_class("AirQualityInput") + assert result is None + + +def test_find_module_with_class_subpackage_read_error(): + """Cover subpackage __init__.py read exception.""" + with ( + patch("os.path.join", side_effect=lambda *args: "/".join(args)), + patch("os.path.exists", return_value=True), + patch("os.listdir") as mock_listdir, + patch("os.path.isdir") as mock_isdir, + patch("builtins.open", side_effect=OSError("permission denied")), + ): + mock_listdir.return_value = ["air_quality"] + mock_isdir.return_value = True + + result = find_module_with_class("AirQualityInput") + assert result is None + + +def test_find_module_with_class_skips_underscore_dirs(): + """Cover that dirs starting with _ are skipped.""" + with ( + patch("os.path.join", side_effect=lambda *args: "/".join(args)), + patch("os.path.exists", return_value=True), + patch("os.listdir") as mock_listdir, + patch("os.path.isdir", return_value=True), + ): + mock_listdir.return_value = ["__pycache__", "_internal"] + + result = find_module_with_class("AirQualityInput") + assert result is None + + +def test_find_module_with_class_direct_file_read_error(): + """Cover except Exception when reading direct .py plugin file.""" + with ( + patch("os.path.join", side_effect=lambda *args: "/".join(args)), + patch("os.path.exists", return_value=True), + patch("os.listdir", return_value=["broken_input.py"]), + patch("os.path.isdir", return_value=False), + patch("builtins.open", side_effect=OSError("permission denied")), + ): + result = find_module_with_class("BrokenInput") + assert result is None + + +def test_load_input_with_config_class(): + """Cover config_class is not None branch (line 104).""" + with ( + patch("inputs.find_module_with_class") as mock_find, + patch("importlib.import_module") as mock_import, + ): + mock_find.return_value = "mock_input" + mock_module = Mock() + mock_module.MockInput = MockInput + mock_module.MockConfig = MockConfig + mock_import.return_value = mock_module + + result = load_input({"type": "MockInput", "config": {"key": "value"}}) + assert isinstance(result, Sensor) + + +def test_load_input_without_config_class(): + """Cover config_class is None branch (line 108) — no SensorConfig in module.""" + with ( + patch("inputs.find_module_with_class") as mock_find, + patch("importlib.import_module") as mock_import, + ): + mock_find.return_value = "mock_input" + + mock_module = types.ModuleType("mock_input") + setattr(mock_module, "MockInput", MockInput) + mock_import.return_value = mock_module + + result = load_input({"type": "MockInput"}) + assert isinstance(result, Sensor) + + +def test_load_input_import_error(): + """Cover ImportError → ValueError (line 120).""" + with ( + patch("inputs.find_module_with_class") as mock_find, + patch("importlib.import_module", side_effect=ImportError("no module")), + ): + mock_find.return_value = "missing_input" + with pytest.raises(ValueError, match="Could not import input module"): + load_input({"type": "MissingInput"}) + + +def test_load_input_attribute_error(): + """Cover AttributeError → ValueError (line 122).""" + with ( + patch("inputs.find_module_with_class") as mock_find, + patch("importlib.import_module") as mock_import, + ): + mock_find.return_value = "mock_input" + mock_module = Mock(spec=[]) # no attributes + mock_import.return_value = mock_module + + with pytest.raises(ValueError, match="not found in input module"): + load_input({"type": "MockInput"})