diff --git a/src/meshcore/radio.py b/src/meshcore/radio.py index 0e2bfec..246b1c9 100644 --- a/src/meshcore/radio.py +++ b/src/meshcore/radio.py @@ -376,8 +376,21 @@ def reschedule_flood_advert_from_config(self, storage_api) -> None: """Re-fetch bot-config and reschedule periodic flood adverts.""" self.schedule_flood_advert_from_config(storage_api) + def _submit_coro_to_radio_loop(self, coro): + """Schedule *coro* on the MeshCore asyncio loop (safe from any thread).""" + loop = self._loop + if loop is None or not loop.is_running(): + raise RadioError("MeshCoreRadio: event loop not running") + try: + running = asyncio.get_running_loop() + except RuntimeError: + running = None + if running is loop: + return asyncio.create_task(coro) + return asyncio.run_coroutine_threadsafe(coro, loop) + def schedule_channel_sync(self, storage_apis: list) -> None: - """Schedule channel sync on the radio loop (must run on that loop's thread).""" + """Schedule channel sync on the radio asyncio loop (any thread).""" if not storage_apis: return loop = self._loop @@ -399,7 +412,10 @@ async def _task() -> None: await sync_channels_to_storage_apis_async(self, storage_apis) logger.info("MeshCore channel sync finished") - asyncio.create_task(_task()) + try: + self._submit_coro_to_radio_loop(_task()) + except RadioError as exc: + logger.warning("MeshCore channel sync not scheduled: %s", exc) def run_coroutine(self, coro, *, timeout: float = 30.0): """Run a coroutine on the MeshCore asyncio loop from another thread.""" diff --git a/test/meshcore/test_radio_channel_sync.py b/test/meshcore/test_radio_channel_sync.py index adf1aa7..ac86d02 100644 --- a/test/meshcore/test_radio_channel_sync.py +++ b/test/meshcore/test_radio_channel_sync.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +import threading from unittest.mock import MagicMock, patch import pytest @@ -49,3 +50,41 @@ async def _fake_sync(_radio, _storages): await asyncio.wait_for(done.wait(), timeout=1.0) asyncio.run(_runner()) + + +def test_schedule_channel_sync_from_worker_thread() -> None: + """WS apply_mc_channel_config thread must schedule sync on the radio loop.""" + + async def _runner(): + radio = MeshCoreRadio.__new__(MeshCoreRadio) + loop = asyncio.get_running_loop() + radio._loop = loop # noqa: SLF001 + radio._meshcore = MagicMock() + radio._meshcore.is_connected = True + + storage = MagicMock() + storage.base_url = "http://api.test" + done = asyncio.Event() + + async def _fake_sync(_radio, _storages): + done.set() + + with patch( + "src.meshcore.channel_sync.sync_channels_to_storage_apis_async", + side_effect=_fake_sync, + ): + err: list[BaseException] = [] + + def _worker(): + try: + radio.schedule_channel_sync([storage]) + except BaseException as exc: + err.append(exc) + + t = threading.Thread(target=_worker) + t.start() + t.join(timeout=2.0) + assert not err, err + await asyncio.wait_for(done.wait(), timeout=1.0) + + asyncio.run(_runner())