Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions src/inputs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ def find_module_with_class(class_name: str) -> T.Optional[str]:
"""
Find which module file contains the specified class name.

Scans both direct .py files and subpackage __init__.py files
inside the plugins directory.

Parameters
----------
class_name : str
Expand All @@ -27,25 +30,36 @@ def find_module_with_class(class_name: str) -> T.Optional[str]:
if not os.path.exists(plugins_dir):
return None

plugin_files = [f for f in os.listdir(plugins_dir) if f.endswith(".py")]
pattern = rf"^class\s+{re.escape(class_name)}\s*\([^)]*FuserInput[^)]*\)\s*:"

for plugin_file in plugin_files:
# Scan direct .py files in plugins/
for plugin_file in os.listdir(plugins_dir):
if not plugin_file.endswith(".py"):
continue
file_path = os.path.join(plugins_dir, plugin_file)

try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()

pattern = (
rf"^class\s+{re.escape(class_name)}\s*\([^)]*FuserInput[^)]*\)\s*:"
)

if re.search(pattern, content, re.MULTILINE):
return plugin_file[:-3]

except Exception as e:
logging.warning(f"Could not read {plugin_file}: {e}")

# Scan subpackage __init__.py files in plugins/*/
for entry in os.listdir(plugins_dir):
entry_path = os.path.join(plugins_dir, entry)
if not os.path.isdir(entry_path) or entry.startswith("_"):
continue
init_path = os.path.join(entry_path, "__init__.py")
if not os.path.exists(init_path):
continue
try:
with open(init_path, "r", encoding="utf-8") as f:
content = f.read()
if re.search(pattern, content, re.MULTILINE):
return entry
except Exception as e:
logging.warning(f"Could not read {init_path}: {e}")

return None

Expand Down Expand Up @@ -99,7 +113,7 @@ def load_input(input_config: T.Dict[str, T.Any]) -> Sensor:
**(config_dict if isinstance(config_dict, dict) else {})
)

logging.debug(f"Loaded input {class_name} from {module_name}.py")
logging.debug(f"Loaded input {class_name} from {module_name}")
return input_class(config=config)

except ImportError as e:
Expand Down
224 changes: 224 additions & 0 deletions src/inputs/plugins/air_quality/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
import asyncio
import logging
import time
from typing import Optional

from pydantic import Field

from inputs.base import Message, SensorConfig
from inputs.base.loop import FuserInput
from inputs.plugins.air_quality.connector.aqicn import AqicnConnector
from inputs.plugins.air_quality.connector.base import (
AirQualityConnector,
AirQualityData,
get_aqi_level,
)
from inputs.plugins.air_quality.connector.bme680 import BME680Connector
from inputs.plugins.air_quality.connector.pms5003 import PMS5003Connector
from providers.io_provider import IOProvider

CONNECTORS: dict[str, type[AirQualityConnector]] = {
"aqicn": AqicnConnector,
"pms5003": PMS5003Connector,
"bme680": BME680Connector,
}


class AirQualityConfig(SensorConfig):
"""
Configuration for AirQuality Input.

Parameters
----------
connector : str
Connector type: 'aqicn', 'pms5003', or 'bme680'.
connector_config : dict
Connector-specific configuration passed directly to the connector.
aqicn → api_key, latitude, longitude
pms5003 → port, location
bme680 → i2c_address, location, gas_baseline
poll_interval : float
Seconds between air quality reads (default: 300).
aqi_warning_threshold : int
AQI above this value triggers a WARNING (default: 100).
aqi_danger_threshold : int
AQI above this value triggers a DANGER alert (default: 150).
"""

connector: str = Field(
default="aqicn",
description="Connector type: 'aqicn', 'pms5003', 'bme680'",
)
connector_config: dict = Field(
default_factory=dict,
description="Connector-specific configuration",
)
poll_interval: float = Field(
default=300.0,
description="Seconds between air quality reads",
)
aqi_warning_threshold: int = Field(
default=100,
description="AQI threshold for WARNING alert",
)
aqi_danger_threshold: int = Field(
default=150,
description="AQI threshold for DANGER alert",
)


class AirQualityInput(FuserInput[AirQualityConfig, Optional[AirQualityData]]):
"""
Generic air quality input that works with any sensor or API connector.

Reads standardized AirQualityData from the configured connector and
converts it to human-readable text for the LLM. Supports hot-swapping
connectors via config without changing this class.

Supported connectors:
aqicn — AQICN cloud API (no hardware needed)
pms5003 — PMS5003/PMS7003 particulate sensor via Serial
bme680 — BME680 environmental sensor via I2C
"""

def __init__(self, config: AirQualityConfig):
super().__init__(config)

self.io_provider = IOProvider()
self.messages: list[Message] = []
self.descriptor_for_LLM = "Air Quality"

self.poll_interval = config.poll_interval
self.aqi_warning_threshold = config.aqi_warning_threshold
self.aqi_danger_threshold = config.aqi_danger_threshold
self._last_poll_time: float = 0

connector_class = CONNECTORS.get(config.connector)
if connector_class is None:
raise ValueError(
f"AirQualityInput: unknown connector '{config.connector}'. "
f"Available: {list(CONNECTORS.keys())}"
)
self._connector: AirQualityConnector = connector_class(config.connector_config)
logging.info(f"AirQualityInput: using connector '{config.connector}'")

async def _poll(self) -> Optional[AirQualityData]:
"""
Poll connector based on poll_interval.

Returns
-------
Optional[AirQualityData]
Fresh data when interval elapsed, None otherwise.
"""
current_time = time.time()

if current_time - self._last_poll_time < self.poll_interval:
await asyncio.sleep(1.0)
return None

self._last_poll_time = current_time
await asyncio.sleep(1.0)

connected = await self._connector.connect()
if not connected:
return None

data = await self._connector.read()
await self._connector.disconnect()
return data

async def _raw_to_text(
self, raw_input: Optional[AirQualityData]
) -> Optional[Message]:
"""
Convert AirQualityData to human-readable message for LLM.

Parameters
----------
raw_input : Optional[AirQualityData]
Standardized air quality data.

Returns
-------
Optional[Message]
Formatted message, or None if no data.
"""
if raw_input is None:
return None

try:
aqi = raw_input.aqi
aqi_label: str = ""
aqi_description: str = ""
parts = []

if aqi is not None:
aqi_label, aqi_description = get_aqi_level(aqi)
parts.append(
f"Air Quality in {raw_input.location}: {aqi_label} (AQI: {aqi})"
)
else:
parts.append(f"Air Quality in {raw_input.location}")

pollutants = []
if raw_input.pm25 is not None:
pollutants.append(f"PM2.5: {raw_input.pm25} µg/m³")
if raw_input.pm10 is not None:
pollutants.append(f"PM10: {raw_input.pm10} µg/m³")
if raw_input.co is not None:
pollutants.append(f"CO: {raw_input.co} ppm")
if raw_input.no2 is not None:
pollutants.append(f"NO2: {raw_input.no2} µg/m³")
if raw_input.so2 is not None:
pollutants.append(f"SO2: {raw_input.so2} µg/m³")
if raw_input.o3 is not None:
pollutants.append(f"O3: {raw_input.o3} µg/m³")
if pollutants:
parts.append(", ".join(pollutants))

env_data = []
if raw_input.temperature is not None:
env_data.append(f"Temperature: {raw_input.temperature}°C")
if raw_input.humidity is not None:
env_data.append(f"Humidity: {raw_input.humidity}%")
if env_data:
parts.append(", ".join(env_data))

if aqi is not None:
if aqi >= self.aqi_danger_threshold:
parts.append(
f"DANGER: Air quality is {aqi_label} — {aqi_description}"
)
elif aqi >= self.aqi_warning_threshold:
parts.append(
f"WARNING: Air quality is {aqi_label} — {aqi_description}"
)

return Message(timestamp=time.time(), message=". ".join(parts) + ".")

except Exception as e:
logging.error(f"AirQualityInput: error building message: {e}")
return None

async def raw_to_text(self, raw_input: Optional[AirQualityData]) -> None:
"""Convert raw AirQualityData to a human-readable text message."""
pending = await self._raw_to_text(raw_input)
if pending is not None:
self.messages.append(pending)

def formatted_latest_buffer(self) -> Optional[str]:
"""Return the latest formatted air quality message, or None if empty."""
if not self.messages:
return None

latest = self.messages[-1]
result = (
f"\nINPUT: {self.descriptor_for_LLM}\n// START\n"
f"{latest.message}\n// END\n"
)
self.io_provider.add_input(
self.descriptor_for_LLM, latest.message, latest.timestamp
)
self.messages = []
return result
Empty file.
Loading
Loading