diff --git a/.env.example b/.env.example index df8e71c..7b97864 100644 --- a/.env.example +++ b/.env.example @@ -23,7 +23,7 @@ STORAGE_API_VERSION=2 # Use these if you want to upload to a second API (usually used during testing) # STORAGE_API_2_ROOT=... # STORAGE_API_2_VERSION=2 -# STORAGE_API_2_TOKEN=... +# STORAGE_API_2_TOKEN=... # optional; falls back to STORAGE_API_TOKEN # Use this if you want to receive commands from the Meshflow server (e.g. traceroute) MESHFLOW_WS_URL=ws://localhost:8000 diff --git a/docs/MESHCORE.md b/docs/MESHCORE.md index 2d701e9..b832aaa 100644 --- a/docs/MESHCORE.md +++ b/docs/MESHCORE.md @@ -36,7 +36,7 @@ Without `MESHCORE_UPLOAD_ENABLED`, `STORAGE_API_*` is ignored. When `MESHCORE_UPLOAD_ENABLED=true` and `STORAGE_API_*` are set, the bot also: -1. **On connect** — reads the device channel table (`meshcore.commands.get_channel`) and `POST`s `/api/meshcore/feeders/{prefix}/mc-channel-sync/` (device is source of truth for names/types). +1. **On connect** — reads the device channel table once (`meshcore.commands.get_channel`) and `POST`s the same snapshot to `/api/meshcore/feeders/{prefix}/mc-channel-sync/` on **each** configured API (`STORAGE_API_ROOT` and optional `STORAGE_API_2_ROOT` when `MESHCORE_UPLOAD_ENABLED=true`). Uses `STORAGE_API_2_TOKEN` if set, otherwise the primary `STORAGE_API_TOKEN`. 2. **WebSocket** — connects to `ws/nodes/?api_key=…` (URL derived from `STORAGE_API_ROOT` when `MESHFLOW_WS_URL` is unset). MeshCore feeders automatically append `feeder_pubkey_prefix` from the device pubkey after connect (no env var). Used for UI **apply to radio** (`apply_mc_channel_config`): the REST endpoint only dispatches to the bot over WS; the bot writes the device via `set_channel`, then re-syncs to the API. Traceroute commands remain Meshtastic-only; MC feeders ignore `traceroute` WS messages. diff --git a/src/api/StorageAPI.py b/src/api/StorageAPI.py index 09297dd..330d38f 100644 --- a/src/api/StorageAPI.py +++ b/src/api/StorageAPI.py @@ -199,7 +199,11 @@ def post_mc_channel_sync(self, body: dict) -> bool: return response.status_code in (200, 201) except HTTPError as exc: self._error_counter.increment("storage.post_mc_channel_sync.http") - logger.error("HTTP error posting MC channel sync: %s", exc.response.text) + logger.error( + "HTTP error posting MC channel sync to %s: %s", + self.base_url, + exc.response.text, + ) except RequestException as exc: self._error_counter.increment("storage.post_mc_channel_sync.network") logger.error("Network error posting MC channel sync: %s", exc) diff --git a/src/bot.py b/src/bot.py index 095d80d..f9679e8 100644 --- a/src/bot.py +++ b/src/bot.py @@ -109,8 +109,11 @@ def on_apply_mc_channel_config(self, channels: list) -> None: return if not apply_channels_on_device(self.radio, channels): return - for storage_api in self.storage_apis: - sync_channels_to_api(self.radio, storage_api) + if hasattr(self.radio, "schedule_channel_sync"): + self.radio.schedule_channel_sync(self.storage_apis) + else: + for storage_api in self.storage_apis: + sync_channels_to_api(self.radio, storage_api) def on_traceroute_command(self, target_node_id: int) -> None: """Handle a traceroute command (e.g. delivered via WebSocket).""" diff --git a/src/main.py b/src/main.py index 0aaf0d8..ebd2f1d 100644 --- a/src/main.py +++ b/src/main.py @@ -61,7 +61,7 @@ STORAGE_API_TOKEN = os.getenv("STORAGE_API_TOKEN", None) STORAGE_API_VERSION = int(os.getenv("STORAGE_API_VERSION", 1)) STORAGE_API_2_ROOT = os.getenv("STORAGE_API_2_ROOT") -STORAGE_API_2_TOKEN = os.getenv("STORAGE_API_2_TOKEN", None) +STORAGE_API_2_TOKEN = os.getenv("STORAGE_API_2_TOKEN") or STORAGE_API_TOKEN STORAGE_API_2_VERSION = int(os.getenv("STORAGE_API_2_VERSION", 1)) MESHFLOW_WS_URL = os.getenv("MESHFLOW_WS_URL") @@ -171,8 +171,8 @@ def main() -> None: StorageAPIWrapper( STORAGE_API_2_ROOT, STORAGE_API_2_TOKEN, - STORAGE_API_2_VERSION, - failed_packets_dir, + api_version=2, + failed_packets_dir=failed_packets_dir, serializer=serializer, local_meshtastic_nodenum_provider=lambda: bot.my_nodenum, meshcore_feeder_prefix_provider=lambda: getattr( @@ -188,6 +188,14 @@ def main() -> None: "RADIO_PROTOCOL=meshcore: ignoring STORAGE_API_2_ROOT (upload disabled)" ) + if bot.storage_apis: + destinations = [api.base_url for api in bot.storage_apis] + logging.info( + "API upload destinations (%s): %s", + len(destinations), + ", ".join(destinations), + ) + # WebSocket client (e.g. for remote traceroute commands) ws_url = MESHFLOW_WS_URL ws_token = None diff --git a/src/meshcore/channel_sync.py b/src/meshcore/channel_sync.py index 18fc835..1ce4f02 100644 --- a/src/meshcore/channel_sync.py +++ b/src/meshcore/channel_sync.py @@ -21,13 +21,11 @@ logger = logging.getLogger(__name__) -async def sync_channels_to_api_async( - radio: "MeshCoreRadio", storage: "StorageAPIWrapper" -) -> bool: - """Read channels on the MeshCore asyncio loop and POST mc-channel-sync.""" +async def read_channel_snapshot_async(radio: "MeshCoreRadio") -> Optional[dict]: + """Read the device channel table once; return mc-channel-sync body or None.""" if not radio.is_connected: - logger.warning("MeshCore channel sync skipped: radio not connected") - return False + logger.warning("MeshCore channel read skipped: radio not connected") + return None mc = radio._meshcore # noqa: SLF001 — intentional coupling for sync if mc is None: @@ -38,24 +36,54 @@ async def sync_channels_to_api_async( channels = await read_device_channels(mc) except Exception as exc: logger.exception("MeshCore read_device_channels failed: %s", exc) - return False + return None log_device_channels(channels) - body = snapshot_sync_body(channels) + return snapshot_sync_body(channels) + + +def post_channel_snapshot(storage: "StorageAPIWrapper", body: dict) -> bool: + """POST a pre-built snapshot to one API destination.""" ok = storage.post_mc_channel_sync(body) + label = getattr(storage, "base_url", "storage") if ok: logger.info( - "MeshCore channel sync posted to API (%s channel(s))", - len(channels), + "MeshCore channel sync posted to %s (%s channel(s))", + label, + len(body.get("channels") or []), ) else: logger.warning( - "MeshCore channel sync to API failed (%s channel(s) read from device)", - len(channels), + "MeshCore channel sync to %s failed (%s channel(s) in snapshot)", + label, + len(body.get("channels") or []), ) return ok +async def sync_channels_to_storage_apis_async( + radio: "MeshCoreRadio", storage_apis: list["StorageAPIWrapper"] +) -> None: + """Read device channels once and POST the same snapshot to every configured API.""" + if not storage_apis: + return + body = await read_channel_snapshot_async(radio) + if body is None: + return + for storage in storage_apis: + post_channel_snapshot(storage, body) + + +async def sync_channels_to_api_async( + radio: "MeshCoreRadio", storage: "StorageAPIWrapper" +) -> bool: + """Read channels on the MeshCore asyncio loop and POST mc-channel-sync to one API.""" + body = await read_channel_snapshot_async(radio) + if body is None: + return False + return post_channel_snapshot(storage, body) + + def sync_channels_to_api(radio: "MeshCoreRadio", storage: "StorageAPIWrapper") -> bool: """Sync from a non-radio thread (e.g. WebSocket worker). Do not call from the radio loop.""" if not hasattr(radio, "run_coroutine"): diff --git a/src/meshcore/radio.py b/src/meshcore/radio.py index 22c0736..285aef6 100644 --- a/src/meshcore/radio.py +++ b/src/meshcore/radio.py @@ -288,11 +288,15 @@ def schedule_channel_sync(self, storage_apis: list) -> None: return async def _task() -> None: - from src.meshcore.channel_sync import sync_channels_to_api_async + from src.meshcore.channel_sync import sync_channels_to_storage_apis_async - logger.info("MeshCore channel sync starting") - for storage in storage_apis: - await sync_channels_to_api_async(self, storage) + labels = [str(getattr(s, "base_url", "?")) for s in storage_apis] + logger.info( + "MeshCore channel sync starting (%s API destination(s): %s)", + len(storage_apis), + ", ".join(labels), + ) + await sync_channels_to_storage_apis_async(self, storage_apis) logger.info("MeshCore channel sync finished") asyncio.create_task(_task()) diff --git a/test/meshcore/test_channel_sync.py b/test/meshcore/test_channel_sync.py index d50501a..0192170 100644 --- a/test/meshcore/test_channel_sync.py +++ b/test/meshcore/test_channel_sync.py @@ -9,6 +9,7 @@ apply_channels_on_device, sync_channels_to_api, sync_channels_to_api_async, + sync_channels_to_storage_apis_async, ) @@ -50,6 +51,31 @@ async def _run(): storage.post_mc_channel_sync.assert_called_once() +def test_sync_channels_to_storage_apis_posts_once_reads_once() -> None: + radio = _MeshCoreRadioStub(connected=True, meshcore=MagicMock()) + storage_a = MagicMock() + storage_a.base_url = "http://api-one" + storage_a.post_mc_channel_sync.return_value = True + storage_b = MagicMock() + storage_b.base_url = "http://api-two" + storage_b.post_mc_channel_sync.return_value = True + channels = [{"mc_channel_idx": 0, "name": "Public", "mc_channel_type": "PUBLIC"}] + body = {"channels": channels, "synced_at": "2026-01-01T00:00:00Z"} + + async def _run(): + with patch( + "src.meshcore.channel_sync.read_channel_snapshot_async", + new_callable=AsyncMock, + return_value=body, + ) as read_mock: + await sync_channels_to_storage_apis_async(radio, [storage_a, storage_b]) + read_mock.assert_awaited_once() + + asyncio.run(_run()) + storage_a.post_mc_channel_sync.assert_called_once_with(body) + storage_b.post_mc_channel_sync.assert_called_once_with(body) + + def test_sync_channels_logs_and_reports_api_failure(caplog) -> None: import logging @@ -61,15 +87,15 @@ def test_sync_channels_logs_and_reports_api_failure(caplog) -> None: async def _run(): with patch( - "src.meshcore.channel_sync.read_device_channels", + "src.meshcore.channel_sync.read_channel_snapshot_async", new_callable=AsyncMock, - return_value=channels, + return_value={"channels": channels, "synced_at": "2026-01-01T00:00:00Z"}, ): + storage.base_url = "http://api.test" return await sync_channels_to_api_async(radio, storage) assert asyncio.run(_run()) is False - assert "MeshCore device channels (1):" in caplog.text - assert "channel sync to API failed" in caplog.text + assert "channel sync to http://api.test failed" in caplog.text def test_sync_channels_skipped_when_disconnected() -> None: diff --git a/test/meshcore/test_radio_channel_sync.py b/test/meshcore/test_radio_channel_sync.py index 8b60a0e..adf1aa7 100644 --- a/test/meshcore/test_radio_channel_sync.py +++ b/test/meshcore/test_radio_channel_sync.py @@ -35,14 +35,14 @@ async def _runner(): radio._meshcore.is_connected = True storage = MagicMock() + storage.base_url = "http://api.test" done = asyncio.Event() - async def _fake_sync(_radio, _storage): + async def _fake_sync(_radio, _storages): done.set() - return True with patch( - "src.meshcore.channel_sync.sync_channels_to_api_async", + "src.meshcore.channel_sync.sync_channels_to_storage_apis_async", side_effect=_fake_sync, ): radio.schedule_channel_sync([storage]) diff --git a/test/test_bot.py b/test/test_bot.py index 6d0528d..894d9be 100644 --- a/test/test_bot.py +++ b/test/test_bot.py @@ -41,14 +41,13 @@ def test_apply_mc_channel_config_syncs_after_apply(self): bot = MeshflowBot(radio=mc_radio) bot.storage_apis = [MagicMock(), MagicMock()] channels = [{"mc_channel_idx": 0, "name": "Public"}] + mc_radio.schedule_channel_sync = MagicMock() with patch( "src.meshcore.channel_sync.apply_channels_on_device", return_value=True - ) as apply_mock, patch( - "src.meshcore.channel_sync.sync_channels_to_api", return_value=True - ) as sync_mock: + ) as apply_mock: bot.on_apply_mc_channel_config(channels) apply_mock.assert_called_once_with(mc_radio, channels) - self.assertEqual(sync_mock.call_count, 2) + mc_radio.schedule_channel_sync.assert_called_once_with(bot.storage_apis) def test_meshcore_connection_schedules_channel_sync(self): mc_radio = MagicMock()