Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/MESHCORE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

**Phase 1 upload:** when `MESHCORE_UPLOAD_ENABLED=true` and `STORAGE_API_*` are set, selected events upload to `POST /api/meshcore/feeders/{prefix}/packets/ingest/` (12-hex pubkey prefix from `SELF_INFO`).

On first connect to the device, the bot sends one **flood-routed advert** (`send_advert(flood=True)`) so the feeder appears on the mesh without waiting for firmware timing. Periodic API-driven intervals are planned ([#116](https://github.com/pskillen/meshflow-bot/issues/116)).
On first connect the bot sends one **flood-routed advert** (`send_advert(flood=True)`), then repeats on a schedule from the API: `ManagedNode.mc_flood_advert_interval_hours` (default **6h**, range **2–24h**), fetched via `GET /api/meshcore/feeders/{prefix}/bot-config/` when `MESHCORE_UPLOAD_ENABLED` is set. Operators change the interval in the API; the bot picks it up on reconnect or immediately via WebSocket `refresh_feeder_config` ([#116](https://github.com/pskillen/meshflow-bot/issues/116)).

## Transports

Expand Down
21 changes: 21 additions & 0 deletions src/api/StorageAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ def report_bot_version(self) -> bool:
logger.exception("Unexpected error reporting bot version: %s", exc)
return False

def fetch_bot_config(self) -> Optional[dict]:
"""Fetch operator config for this MeshCore feeder (interval hours, etc.)."""
prefix = self._meshcore_feeder_prefix()
if not prefix:
logger.debug("Skipping bot config fetch: MeshCore prefix unavailable")
return None
try:
url = self._meshcore_feeder_url("bot-config/")
response = self._get(url)
return response.json()
except HTTPError as exc:
self._error_counter.increment("storage.fetch_bot_config.http")
logger.error("HTTP error fetching bot config: %s", exc.response.text)
except RequestException as exc:
self._error_counter.increment("storage.fetch_bot_config.network")
logger.error("Network error fetching bot config: %s", exc)
except Exception as exc:
self._error_counter.increment("storage.fetch_bot_config.unexpected")
logger.exception("Unexpected error fetching bot config: %s", exc)
return None

# --- raw packets ------------------------------------------------------

def store_raw_meshcore_packet(self, packet: Any) -> Optional[dict]:
Expand Down
17 changes: 17 additions & 0 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ def on_traceroute_command(self, target_node_id: int) -> None:
return
self.radio.send_traceroute(target_node_id)

def on_refresh_feeder_config(self) -> None:
"""Re-fetch feeder bot-config from API and reschedule periodic flood adverts."""
if not hasattr(self.radio, "reschedule_flood_advert_from_config"):
logger.warning(
"refresh_feeder_config ignored: radio does not support MeshCore config"
)
return
if not self.storage_apis:
logger.warning("refresh_feeder_config ignored: no storage API configured")
return
self.radio.reschedule_flood_advert_from_config(self.storage_apis[0])

# --- radio event handlers --------------------------------------------

def _on_connection_established(self, event: ConnectionEstablished) -> None:
Expand All @@ -135,6 +147,11 @@ def _on_connection_established(self, event: ConnectionEstablished) -> None:
self.radio, "schedule_channel_sync"
):
self.radio.schedule_channel_sync(self.storage_apis)
if event.extras.get("meshcore") and hasattr(
self.radio, "schedule_flood_advert_from_config"
):
if self.storage_apis:
self.radio.schedule_flood_advert_from_config(self.storage_apis[0])
self.print_nodes()
if self.ws_client:
self.ws_client.start()
Expand Down
3 changes: 3 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ def main() -> None:
on_apply_mc_channel_config=(
bot.on_apply_mc_channel_config if RADIO_PROTOCOL == "meshcore" else None
),
on_refresh_feeder_config=(
bot.on_refresh_feeder_config if RADIO_PROTOCOL == "meshcore" else None
),
feeder_pubkey_prefix_provider=(
(lambda: getattr(radio, "feeder_mc_pubkey_prefix", None))
if RADIO_PROTOCOL == "meshcore"
Expand Down
103 changes: 87 additions & 16 deletions src/meshcore/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@

logger = logging.getLogger(__name__)

DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS = 6.0
MIN_MC_FLOOD_ADVERT_INTERVAL_HOURS = 2.0
MAX_MC_FLOOD_ADVERT_INTERVAL_HOURS = 24.0

# Do not JSON-dump high-frequency command plumbing (still forwarded to handlers).
_SKIP_MESHCORE_DUMP_TYPES = frozenset(
{
Expand Down Expand Up @@ -64,6 +68,7 @@ def __init__(
self._connected_once = False
self._local_node_id: Optional[str] = None
self._feeder_mc_pubkey: Optional[str] = None
self._flood_advert_task: Optional[asyncio.Task] = None
self._error_counter = get_global_error_counter()

self._dump_enabled = os.getenv("MESHCORE_DUMP_ENABLED", "true").lower() in (
Expand Down Expand Up @@ -257,6 +262,7 @@ async def _wait_until_shutdown(self) -> None:
await asyncio.sleep(0.25)

async def _async_shutdown(self) -> None:
self.cancel_flood_advert_periodic()
mc = self._meshcore
if mc:
try:
Expand All @@ -277,8 +283,40 @@ def dispatch_meshcore_event_for_tests(self, event: Event) -> None:
"""Synchronous hook for unit tests (same path as async subscriber)."""
self._dispatch_meshcore_event(event)

@staticmethod
def parse_flood_advert_interval_hours(config: Optional[dict]) -> float:
"""Clamp API config interval to 2–24 hours; default 6."""
if not config:
return DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS
try:
hours = float(config.get("mc_flood_advert_interval_hours", DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS))
except (TypeError, ValueError):
return DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS
return max(
MIN_MC_FLOOD_ADVERT_INTERVAL_HOURS,
min(MAX_MC_FLOOD_ADVERT_INTERVAL_HOURS, hours),
)

async def _send_flood_advert_once(self, *, log_label: str = "flood advert") -> None:
mc = self._meshcore
if mc is None or not mc.is_connected:
return
try:
result = await mc.commands.send_advert(flood=True)
if result.type == EventType.ERROR:
logger.warning(
"MeshCore send_advert(flood=True) failed (%s): %s",
log_label,
result.payload,
)
else:
logger.info("MeshCore sent %s", log_label)
except Exception:
self._error_counter.increment("meshcore.send_flood_advert")
logger.exception("MeshCore %s failed", log_label)

def schedule_initial_flood_advert(self) -> None:
"""Send one flood-routed advert after first connect (stop-gap until API-driven interval)."""
"""Send one flood-routed advert after first connect."""
loop = self._loop
if loop is None or not loop.is_running():
logger.warning(
Expand All @@ -287,24 +325,57 @@ def schedule_initial_flood_advert(self) -> None:
return

async def _task() -> None:
mc = self._meshcore
if mc is None or not mc.is_connected:
return
try:
result = await mc.commands.send_advert(flood=True)
if result.type == EventType.ERROR:
logger.warning(
"MeshCore send_advert(flood=True) failed: %s",
result.payload,
)
else:
logger.info("MeshCore sent initial flood advert")
except Exception:
self._error_counter.increment("meshcore.send_initial_flood_advert")
logger.exception("MeshCore initial flood advert failed")
await self._send_flood_advert_once(log_label="initial flood advert")

asyncio.create_task(_task())

def cancel_flood_advert_periodic(self) -> None:
task = getattr(self, "_flood_advert_task", None)
self._flood_advert_task = None
if task is not None and not task.done():
task.cancel()

def schedule_flood_advert_periodic(self, interval_hours: float) -> None:
"""Schedule recurring flood adverts on the radio asyncio loop."""
self.cancel_flood_advert_periodic()
loop = self._loop
if loop is None or not loop.is_running():
logger.warning(
"MeshCore periodic flood advert not scheduled: event loop not running"
)
return

hours = max(
MIN_MC_FLOOD_ADVERT_INTERVAL_HOURS,
min(MAX_MC_FLOOD_ADVERT_INTERVAL_HOURS, float(interval_hours)),
)
logger.info(
"MeshCore scheduling periodic flood advert every %s hour(s)",
hours,
)

async def _periodic() -> None:
try:
while not self._shutdown.is_set():
await asyncio.sleep(hours * 3600.0)
if self._shutdown.is_set():
break
await self._send_flood_advert_once(log_label="periodic flood advert")
except asyncio.CancelledError:
pass

self._flood_advert_task = asyncio.create_task(_periodic())

def schedule_flood_advert_from_config(self, storage_api) -> None:
"""Fetch feeder bot-config from API and start periodic flood adverts."""
config = storage_api.fetch_bot_config()
hours = self.parse_flood_advert_interval_hours(config)
self.schedule_flood_advert_periodic(hours)

def reschedule_flood_advert_from_config(self, storage_api) -> None:
"""Re-fetch bot-config and reschedule periodic flood adverts."""
self.schedule_flood_advert_from_config(storage_api)

def schedule_channel_sync(self, storage_apis: list) -> None:
"""Schedule channel sync on the radio loop (must run on that loop's thread)."""
if not storage_apis:
Expand Down
26 changes: 26 additions & 0 deletions src/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def __init__(
api_key: str,
on_traceroute: Callable[[int], None],
on_apply_mc_channel_config: Optional[Callable[[list], None]] = None,
on_refresh_feeder_config: Optional[Callable[[], None]] = None,
on_connect: Optional[Callable[[], None]] = None,
on_disconnect: Optional[Callable[[], None]] = None,
feeder_pubkey_prefix_provider: Optional[Callable[[], Optional[str]]] = None,
Expand All @@ -44,6 +45,7 @@ def __init__(
self.api_key = api_key
self.on_traceroute = on_traceroute
self.on_apply_mc_channel_config = on_apply_mc_channel_config
self.on_refresh_feeder_config = on_refresh_feeder_config
self.on_connect = on_connect
self.on_disconnect = on_disconnect
self._feeder_pubkey_prefix_provider = feeder_pubkey_prefix_provider
Expand Down Expand Up @@ -233,5 +235,29 @@ def _apply_done(t):
logger.warning(
"MeshflowWSClient: apply_mc_channel_config missing channels or handler"
)
elif cmd_type == "refresh_feeder_config":
if self.on_refresh_feeder_config:
logger.info(
"MeshflowWSClient: received refresh_feeder_config"
)
task = asyncio.create_task(
asyncio.to_thread(self.on_refresh_feeder_config)
)

def _refresh_done(t):
if t.cancelled():
return
exc = t.exception()
if exc:
logger.warning(
"MeshflowWSClient: refresh_feeder_config failed: %s",
exc,
)

task.add_done_callback(_refresh_done)
else:
logger.warning(
"MeshflowWSClient: refresh_feeder_config missing handler"
)
else:
logger.debug(f"MeshflowWSClient: ignored command type: {cmd_type}")
65 changes: 65 additions & 0 deletions test/meshcore/test_flood_advert_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Tests for API-driven periodic MeshCore flood adverts."""

from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

from src.meshcore.radio import (
DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS,
MeshCoreRadio,
)


def test_parse_flood_advert_interval_hours_defaults() -> None:
assert MeshCoreRadio.parse_flood_advert_interval_hours(None) == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS
assert MeshCoreRadio.parse_flood_advert_interval_hours({}) == DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS


def test_parse_flood_advert_interval_hours_clamps() -> None:
assert MeshCoreRadio.parse_flood_advert_interval_hours(
{"mc_flood_advert_interval_hours": 1}
) == 2.0
assert MeshCoreRadio.parse_flood_advert_interval_hours(
{"mc_flood_advert_interval_hours": 48}
) == 24.0
assert MeshCoreRadio.parse_flood_advert_interval_hours(
{"mc_flood_advert_interval_hours": 12}
) == 12.0


def test_schedule_flood_advert_from_config() -> None:
storage = MagicMock()
storage.fetch_bot_config.return_value = {"mc_flood_advert_interval_hours": 8}

radio = MeshCoreRadio.__new__(MeshCoreRadio)
radio._flood_advert_task = None # noqa: SLF001
radio.schedule_flood_advert_periodic = MagicMock()
radio.schedule_flood_advert_from_config(storage)

storage.fetch_bot_config.assert_called_once()
radio.schedule_flood_advert_periodic.assert_called_once_with(8.0)


def test_cancel_flood_advert_periodic() -> None:
async def _runner() -> None:
radio = MeshCoreRadio.__new__(MeshCoreRadio)
loop = asyncio.get_running_loop()
radio._loop = loop # noqa: SLF001
radio._flood_advert_task = None # noqa: SLF001
import threading

radio._shutdown = threading.Event()
radio._meshcore = MagicMock()
radio._meshcore.is_connected = True
radio._error_counter = MagicMock()

radio.schedule_flood_advert_periodic(2.0)
task = radio._flood_advert_task
assert task is not None
radio.cancel_flood_advert_periodic()
assert radio._flood_advert_task is None
task.cancel()
await asyncio.sleep(0)

asyncio.run(_runner())