Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions src/meshcore/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,21 @@ def reschedule_flood_advert_from_config(self, storage_api) -> None:
"""Re-fetch bot-config and reschedule periodic flood adverts."""
self.schedule_flood_advert_from_config(storage_api)

def _submit_coro_to_radio_loop(self, coro):
"""Schedule *coro* on the MeshCore asyncio loop (safe from any thread)."""
loop = self._loop
if loop is None or not loop.is_running():
raise RadioError("MeshCoreRadio: event loop not running")
try:
running = asyncio.get_running_loop()
except RuntimeError:
running = None
if running is loop:
return asyncio.create_task(coro)
return asyncio.run_coroutine_threadsafe(coro, loop)

def schedule_channel_sync(self, storage_apis: list) -> None:
"""Schedule channel sync on the radio loop (must run on that loop's thread)."""
"""Schedule channel sync on the radio asyncio loop (any thread)."""
if not storage_apis:
return
loop = self._loop
Expand All @@ -399,7 +412,10 @@ async def _task() -> None:
await sync_channels_to_storage_apis_async(self, storage_apis)
logger.info("MeshCore channel sync finished")

asyncio.create_task(_task())
try:
self._submit_coro_to_radio_loop(_task())
except RadioError as exc:
logger.warning("MeshCore channel sync not scheduled: %s", exc)

def run_coroutine(self, coro, *, timeout: float = 30.0):
"""Run a coroutine on the MeshCore asyncio loop from another thread."""
Expand Down
39 changes: 39 additions & 0 deletions test/meshcore/test_radio_channel_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import asyncio
import threading
from unittest.mock import MagicMock, patch

import pytest
Expand Down Expand Up @@ -49,3 +50,41 @@ async def _fake_sync(_radio, _storages):
await asyncio.wait_for(done.wait(), timeout=1.0)

asyncio.run(_runner())


def test_schedule_channel_sync_from_worker_thread() -> None:
"""WS apply_mc_channel_config thread must schedule sync on the radio loop."""

async def _runner():
radio = MeshCoreRadio.__new__(MeshCoreRadio)
loop = asyncio.get_running_loop()
radio._loop = loop # noqa: SLF001
radio._meshcore = MagicMock()
radio._meshcore.is_connected = True

storage = MagicMock()
storage.base_url = "http://api.test"
done = asyncio.Event()

async def _fake_sync(_radio, _storages):
done.set()

with patch(
"src.meshcore.channel_sync.sync_channels_to_storage_apis_async",
side_effect=_fake_sync,
):
err: list[BaseException] = []

def _worker():
try:
radio.schedule_channel_sync([storage])
except BaseException as exc:
err.append(exc)

t = threading.Thread(target=_worker)
t.start()
t.join(timeout=2.0)
assert not err, err
await asyncio.wait_for(done.wait(), timeout=1.0)

asyncio.run(_runner())