diff --git a/docs/MESHCORE.md b/docs/MESHCORE.md index bb6ca00..721a6a3 100644 --- a/docs/MESHCORE.md +++ b/docs/MESHCORE.md @@ -6,7 +6,7 @@ **Phase 1 upload:** when `MESHCORE_UPLOAD_ENABLED=true` and `STORAGE_API_*` are set, selected events upload to `POST /api/meshcore/feeders/{prefix}/packets/ingest/` (12-hex pubkey prefix from `SELF_INFO`). -On first connect to the device, the bot sends one **flood-routed advert** (`send_advert(flood=True)`) so the feeder appears on the mesh without waiting for firmware timing. Periodic API-driven intervals are planned ([#116](https://github.com/pskillen/meshflow-bot/issues/116)). +On first connect the bot sends one **flood-routed advert** (`send_advert(flood=True)`), then repeats on a schedule from the API: `ManagedNode.mc_flood_advert_interval_hours` (default **6h**, range **2–24h**), fetched via `GET /api/meshcore/feeders/{prefix}/bot-config/` when `MESHCORE_UPLOAD_ENABLED` is set. Operators change the interval in the API; the bot picks it up on reconnect or immediately via WebSocket `refresh_feeder_config` ([#116](https://github.com/pskillen/meshflow-bot/issues/116)). ## Transports diff --git a/src/api/StorageAPI.py b/src/api/StorageAPI.py index 7991cce..cca9c07 100644 --- a/src/api/StorageAPI.py +++ b/src/api/StorageAPI.py @@ -142,6 +142,27 @@ def report_bot_version(self) -> bool: logger.exception("Unexpected error reporting bot version: %s", exc) return False + def fetch_bot_config(self) -> Optional[dict]: + """Fetch operator config for this MeshCore feeder (interval hours, etc.).""" + prefix = self._meshcore_feeder_prefix() + if not prefix: + logger.debug("Skipping bot config fetch: MeshCore prefix unavailable") + return None + try: + url = self._meshcore_feeder_url("bot-config/") + response = self._get(url) + return response.json() + except HTTPError as exc: + self._error_counter.increment("storage.fetch_bot_config.http") + logger.error("HTTP error fetching bot config: %s", exc.response.text) + except RequestException as exc: + self._error_counter.increment("storage.fetch_bot_config.network") + logger.error("Network error fetching bot config: %s", exc) + except Exception as exc: + self._error_counter.increment("storage.fetch_bot_config.unexpected") + logger.exception("Unexpected error fetching bot config: %s", exc) + return None + # --- raw packets ------------------------------------------------------ def store_raw_meshcore_packet(self, packet: Any) -> Optional[dict]: diff --git a/src/bot.py b/src/bot.py index f9679e8..4e47619 100644 --- a/src/bot.py +++ b/src/bot.py @@ -122,6 +122,18 @@ def on_traceroute_command(self, target_node_id: int) -> None: return self.radio.send_traceroute(target_node_id) + def on_refresh_feeder_config(self) -> None: + """Re-fetch feeder bot-config from API and reschedule periodic flood adverts.""" + if not hasattr(self.radio, "reschedule_flood_advert_from_config"): + logger.warning( + "refresh_feeder_config ignored: radio does not support MeshCore config" + ) + return + if not self.storage_apis: + logger.warning("refresh_feeder_config ignored: no storage API configured") + return + self.radio.reschedule_flood_advert_from_config(self.storage_apis[0]) + # --- radio event handlers -------------------------------------------- def _on_connection_established(self, event: ConnectionEstablished) -> None: @@ -135,6 +147,11 @@ def _on_connection_established(self, event: ConnectionEstablished) -> None: self.radio, "schedule_channel_sync" ): self.radio.schedule_channel_sync(self.storage_apis) + if event.extras.get("meshcore") and hasattr( + self.radio, "schedule_flood_advert_from_config" + ): + if self.storage_apis: + self.radio.schedule_flood_advert_from_config(self.storage_apis[0]) self.print_nodes() if self.ws_client: self.ws_client.start() diff --git a/src/main.py b/src/main.py index ebd2f1d..e52743b 100644 --- a/src/main.py +++ b/src/main.py @@ -213,6 +213,9 @@ def main() -> None: on_apply_mc_channel_config=( bot.on_apply_mc_channel_config if RADIO_PROTOCOL == "meshcore" else None ), + on_refresh_feeder_config=( + bot.on_refresh_feeder_config if RADIO_PROTOCOL == "meshcore" else None + ), feeder_pubkey_prefix_provider=( (lambda: getattr(radio, "feeder_mc_pubkey_prefix", None)) if RADIO_PROTOCOL == "meshcore" diff --git a/src/meshcore/radio.py b/src/meshcore/radio.py index 30b7579..0e2bfec 100644 --- a/src/meshcore/radio.py +++ b/src/meshcore/radio.py @@ -23,6 +23,10 @@ logger = logging.getLogger(__name__) +DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS = 6.0 +MIN_MC_FLOOD_ADVERT_INTERVAL_HOURS = 2.0 +MAX_MC_FLOOD_ADVERT_INTERVAL_HOURS = 24.0 + # Do not JSON-dump high-frequency command plumbing (still forwarded to handlers). _SKIP_MESHCORE_DUMP_TYPES = frozenset( { @@ -64,6 +68,7 @@ def __init__( self._connected_once = False self._local_node_id: Optional[str] = None self._feeder_mc_pubkey: Optional[str] = None + self._flood_advert_task: Optional[asyncio.Task] = None self._error_counter = get_global_error_counter() self._dump_enabled = os.getenv("MESHCORE_DUMP_ENABLED", "true").lower() in ( @@ -257,6 +262,7 @@ async def _wait_until_shutdown(self) -> None: await asyncio.sleep(0.25) async def _async_shutdown(self) -> None: + self.cancel_flood_advert_periodic() mc = self._meshcore if mc: try: @@ -277,8 +283,40 @@ def dispatch_meshcore_event_for_tests(self, event: Event) -> None: """Synchronous hook for unit tests (same path as async subscriber).""" self._dispatch_meshcore_event(event) + @staticmethod + def parse_flood_advert_interval_hours(config: Optional[dict]) -> float: + """Clamp API config interval to 2–24 hours; default 6.""" + if not config: + return DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + try: + hours = float(config.get("mc_flood_advert_interval_hours", DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS)) + except (TypeError, ValueError): + return DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + return max( + MIN_MC_FLOOD_ADVERT_INTERVAL_HOURS, + min(MAX_MC_FLOOD_ADVERT_INTERVAL_HOURS, hours), + ) + + async def _send_flood_advert_once(self, *, log_label: str = "flood advert") -> None: + mc = self._meshcore + if mc is None or not mc.is_connected: + return + try: + result = await mc.commands.send_advert(flood=True) + if result.type == EventType.ERROR: + logger.warning( + "MeshCore send_advert(flood=True) failed (%s): %s", + log_label, + result.payload, + ) + else: + logger.info("MeshCore sent %s", log_label) + except Exception: + self._error_counter.increment("meshcore.send_flood_advert") + logger.exception("MeshCore %s failed", log_label) + def schedule_initial_flood_advert(self) -> None: - """Send one flood-routed advert after first connect (stop-gap until API-driven interval).""" + """Send one flood-routed advert after first connect.""" loop = self._loop if loop is None or not loop.is_running(): logger.warning( @@ -287,24 +325,57 @@ def schedule_initial_flood_advert(self) -> None: return async def _task() -> None: - mc = self._meshcore - if mc is None or not mc.is_connected: - return - try: - result = await mc.commands.send_advert(flood=True) - if result.type == EventType.ERROR: - logger.warning( - "MeshCore send_advert(flood=True) failed: %s", - result.payload, - ) - else: - logger.info("MeshCore sent initial flood advert") - except Exception: - self._error_counter.increment("meshcore.send_initial_flood_advert") - logger.exception("MeshCore initial flood advert failed") + await self._send_flood_advert_once(log_label="initial flood advert") asyncio.create_task(_task()) + def cancel_flood_advert_periodic(self) -> None: + task = getattr(self, "_flood_advert_task", None) + self._flood_advert_task = None + if task is not None and not task.done(): + task.cancel() + + def schedule_flood_advert_periodic(self, interval_hours: float) -> None: + """Schedule recurring flood adverts on the radio asyncio loop.""" + self.cancel_flood_advert_periodic() + loop = self._loop + if loop is None or not loop.is_running(): + logger.warning( + "MeshCore periodic flood advert not scheduled: event loop not running" + ) + return + + hours = max( + MIN_MC_FLOOD_ADVERT_INTERVAL_HOURS, + min(MAX_MC_FLOOD_ADVERT_INTERVAL_HOURS, float(interval_hours)), + ) + logger.info( + "MeshCore scheduling periodic flood advert every %s hour(s)", + hours, + ) + + async def _periodic() -> None: + try: + while not self._shutdown.is_set(): + await asyncio.sleep(hours * 3600.0) + if self._shutdown.is_set(): + break + await self._send_flood_advert_once(log_label="periodic flood advert") + except asyncio.CancelledError: + pass + + self._flood_advert_task = asyncio.create_task(_periodic()) + + def schedule_flood_advert_from_config(self, storage_api) -> None: + """Fetch feeder bot-config from API and start periodic flood adverts.""" + config = storage_api.fetch_bot_config() + hours = self.parse_flood_advert_interval_hours(config) + self.schedule_flood_advert_periodic(hours) + + def reschedule_flood_advert_from_config(self, storage_api) -> None: + """Re-fetch bot-config and reschedule periodic flood adverts.""" + self.schedule_flood_advert_from_config(storage_api) + def schedule_channel_sync(self, storage_apis: list) -> None: """Schedule channel sync on the radio loop (must run on that loop's thread).""" if not storage_apis: diff --git a/src/ws_client.py b/src/ws_client.py index 3034570..cefea35 100644 --- a/src/ws_client.py +++ b/src/ws_client.py @@ -28,6 +28,7 @@ def __init__( api_key: str, on_traceroute: Callable[[int], None], on_apply_mc_channel_config: Optional[Callable[[list], None]] = None, + on_refresh_feeder_config: Optional[Callable[[], None]] = None, on_connect: Optional[Callable[[], None]] = None, on_disconnect: Optional[Callable[[], None]] = None, feeder_pubkey_prefix_provider: Optional[Callable[[], Optional[str]]] = None, @@ -44,6 +45,7 @@ def __init__( self.api_key = api_key self.on_traceroute = on_traceroute self.on_apply_mc_channel_config = on_apply_mc_channel_config + self.on_refresh_feeder_config = on_refresh_feeder_config self.on_connect = on_connect self.on_disconnect = on_disconnect self._feeder_pubkey_prefix_provider = feeder_pubkey_prefix_provider @@ -233,5 +235,29 @@ def _apply_done(t): logger.warning( "MeshflowWSClient: apply_mc_channel_config missing channels or handler" ) + elif cmd_type == "refresh_feeder_config": + if self.on_refresh_feeder_config: + logger.info( + "MeshflowWSClient: received refresh_feeder_config" + ) + task = asyncio.create_task( + asyncio.to_thread(self.on_refresh_feeder_config) + ) + + def _refresh_done(t): + if t.cancelled(): + return + exc = t.exception() + if exc: + logger.warning( + "MeshflowWSClient: refresh_feeder_config failed: %s", + exc, + ) + + task.add_done_callback(_refresh_done) + else: + logger.warning( + "MeshflowWSClient: refresh_feeder_config missing handler" + ) else: logger.debug(f"MeshflowWSClient: ignored command type: {cmd_type}") diff --git a/test/meshcore/test_flood_advert_scheduler.py b/test/meshcore/test_flood_advert_scheduler.py new file mode 100644 index 0000000..5e06adc --- /dev/null +++ b/test/meshcore/test_flood_advert_scheduler.py @@ -0,0 +1,65 @@ +"""Tests for API-driven periodic MeshCore flood adverts.""" + +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +from src.meshcore.radio import ( + DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS, + MeshCoreRadio, +) + + +def test_parse_flood_advert_interval_hours_defaults() -> None: + assert MeshCoreRadio.parse_flood_advert_interval_hours(None) == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + assert MeshCoreRadio.parse_flood_advert_interval_hours({}) == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS + + +def test_parse_flood_advert_interval_hours_clamps() -> None: + assert MeshCoreRadio.parse_flood_advert_interval_hours( + {"mc_flood_advert_interval_hours": 1} + ) == 2.0 + assert MeshCoreRadio.parse_flood_advert_interval_hours( + {"mc_flood_advert_interval_hours": 48} + ) == 24.0 + assert MeshCoreRadio.parse_flood_advert_interval_hours( + {"mc_flood_advert_interval_hours": 12} + ) == 12.0 + + +def test_schedule_flood_advert_from_config() -> None: + storage = MagicMock() + storage.fetch_bot_config.return_value = {"mc_flood_advert_interval_hours": 8} + + radio = MeshCoreRadio.__new__(MeshCoreRadio) + radio._flood_advert_task = None # noqa: SLF001 + radio.schedule_flood_advert_periodic = MagicMock() + radio.schedule_flood_advert_from_config(storage) + + storage.fetch_bot_config.assert_called_once() + radio.schedule_flood_advert_periodic.assert_called_once_with(8.0) + + +def test_cancel_flood_advert_periodic() -> None: + async def _runner() -> None: + radio = MeshCoreRadio.__new__(MeshCoreRadio) + loop = asyncio.get_running_loop() + radio._loop = loop # noqa: SLF001 + radio._flood_advert_task = None # noqa: SLF001 + import threading + + radio._shutdown = threading.Event() + radio._meshcore = MagicMock() + radio._meshcore.is_connected = True + radio._error_counter = MagicMock() + + radio.schedule_flood_advert_periodic(2.0) + task = radio._flood_advert_task + assert task is not None + radio.cancel_flood_advert_periodic() + assert radio._flood_advert_task is None + task.cancel() + await asyncio.sleep(0) + + asyncio.run(_runner())