diff --git a/src/bot.py b/src/bot.py index 4e47619..23cf53e 100644 --- a/src/bot.py +++ b/src/bot.py @@ -17,19 +17,15 @@ from src.commands.factory import CommandFactory from src.data_classes import MeshNode from src.helpers import pretty_print_last_heard, safe_encode_node_name +from src.packet_log import log_incoming_packet from src.persistence.commands_logger import AbstractCommandLogger from src.persistence.node_db import AbstractNodeDB from src.persistence.node_info import AbstractNodeInfoStore -from src.packet_log import log_incoming_packet from src.persistence.packet_dump import dump_packet from src.persistence.user_prefs import AbstractUserPrefsPersistence from src.radio.errors import call_safely, get_global_error_counter -from src.radio.events import ( - ConnectionEstablished, - IncomingPacket, - IncomingTextMessage, - NodeUpdate, -) +from src.radio.events import (ConnectionEstablished, IncomingPacket, + IncomingTextMessage, NodeUpdate) from src.radio.interface import RadioHandlers, RadioInterface from src.responders.responder_factory import ResponderFactory @@ -97,10 +93,9 @@ def disconnect(self) -> None: def on_apply_mc_channel_config(self, channels: list) -> None: """Handle apply_mc_channel_config from WebSocket (MeshCore feeders).""" - from src.meshcore.channel_sync import ( - apply_channels_on_device, - sync_channels_to_api, - ) + from src.meshcore.channel_sync import (apply_channels_on_device, + sync_channels_after_apply, + sync_channels_to_api) if not hasattr(self.radio, "run_coroutine"): logger.warning( @@ -110,10 +105,12 @@ def on_apply_mc_channel_config(self, channels: list) -> None: if not apply_channels_on_device(self.radio, channels): return if hasattr(self.radio, "schedule_channel_sync"): - self.radio.schedule_channel_sync(self.storage_apis) + self.radio.schedule_channel_sync(self.storage_apis, scope_hints=channels) + elif self.storage_apis: + sync_channels_after_apply(self.radio, self.storage_apis, channels) else: for storage_api in self.storage_apis: - sync_channels_to_api(self.radio, storage_api) + sync_channels_to_api(self.radio, storage_api, scope_hints=channels) def on_traceroute_command(self, target_node_id: int) -> None: """Handle a traceroute command (e.g. delivered via WebSocket).""" diff --git a/src/meshcore/channel_sync.py b/src/meshcore/channel_sync.py index 1ce4f02..d9777eb 100644 --- a/src/meshcore/channel_sync.py +++ b/src/meshcore/channel_sync.py @@ -8,11 +8,8 @@ CHANNEL_READ_DELAY_S = 2.0 -from src.meshcore.channels import ( - log_device_channels, - read_device_channels, - snapshot_sync_body, -) +from src.meshcore.channels import (log_device_channels, read_device_channels, + snapshot_sync_body) if TYPE_CHECKING: from src.api.StorageAPI import StorageAPIWrapper @@ -21,7 +18,11 @@ logger = logging.getLogger(__name__) -async def read_channel_snapshot_async(radio: "MeshCoreRadio") -> Optional[dict]: +async def read_channel_snapshot_async( + radio: "MeshCoreRadio", + *, + scope_hints: list[dict] | None = None, +) -> Optional[dict]: """Read the device channel table once; return mc-channel-sync body or None.""" if not radio.is_connected: logger.warning("MeshCore channel read skipped: radio not connected") @@ -33,7 +34,7 @@ async def read_channel_snapshot_async(radio: "MeshCoreRadio") -> Optional[dict]: else: try: await asyncio.sleep(CHANNEL_READ_DELAY_S) - channels = await read_device_channels(mc) + channels = await read_device_channels(mc, scope_hints=scope_hints) except Exception as exc: logger.exception("MeshCore read_device_channels failed: %s", exc) return None @@ -62,12 +63,15 @@ def post_channel_snapshot(storage: "StorageAPIWrapper", body: dict) -> bool: async def sync_channels_to_storage_apis_async( - radio: "MeshCoreRadio", storage_apis: list["StorageAPIWrapper"] + radio: "MeshCoreRadio", + storage_apis: list["StorageAPIWrapper"], + *, + scope_hints: list[dict] | None = None, ) -> None: """Read device channels once and POST the same snapshot to every configured API.""" if not storage_apis: return - body = await read_channel_snapshot_async(radio) + body = await read_channel_snapshot_async(radio, scope_hints=scope_hints) if body is None: return for storage in storage_apis: @@ -75,23 +79,31 @@ async def sync_channels_to_storage_apis_async( async def sync_channels_to_api_async( - radio: "MeshCoreRadio", storage: "StorageAPIWrapper" + radio: "MeshCoreRadio", + storage: "StorageAPIWrapper", + *, + scope_hints: list[dict] | None = None, ) -> bool: """Read channels on the MeshCore asyncio loop and POST mc-channel-sync to one API.""" - body = await read_channel_snapshot_async(radio) + body = await read_channel_snapshot_async(radio, scope_hints=scope_hints) if body is None: return False return post_channel_snapshot(storage, body) -def sync_channels_to_api(radio: "MeshCoreRadio", storage: "StorageAPIWrapper") -> bool: +def sync_channels_to_api( + radio: "MeshCoreRadio", + storage: "StorageAPIWrapper", + *, + scope_hints: list[dict] | None = None, +) -> bool: """Sync from a non-radio thread (e.g. WebSocket worker). Do not call from the radio loop.""" if not hasattr(radio, "run_coroutine"): logger.warning("MeshCore channel sync skipped: radio has no run_coroutine") return False try: return radio.run_coroutine( - sync_channels_to_api_async(radio, storage), + sync_channels_to_api_async(radio, storage, scope_hints=scope_hints), timeout=120.0, ) except Exception as exc: @@ -115,3 +127,25 @@ async def _apply(): except Exception as exc: logger.exception("MeshCore apply_device_channels failed: %s", exc) return False + + +def sync_channels_after_apply( + radio: "MeshCoreRadio", + storage_apis: list["StorageAPIWrapper"], + applied_channels: list[dict], +) -> None: + """Re-read device channels and merge applied region_scope before posting to APIs.""" + if not hasattr(radio, "run_coroutine"): + logger.warning("MeshCore post-apply sync skipped: radio has no run_coroutine") + return + try: + radio.run_coroutine( + sync_channels_to_storage_apis_async( + radio, + storage_apis, + scope_hints=applied_channels, + ), + timeout=120.0, + ) + except Exception as exc: + logger.exception("MeshCore post-apply channel sync failed: %s", exc) diff --git a/src/meshcore/channels.py b/src/meshcore/channels.py index 7595bde..058cccf 100644 --- a/src/meshcore/channels.py +++ b/src/meshcore/channels.py @@ -14,22 +14,69 @@ DEFAULT_MAX_CHANNEL_SCAN = 16 +def _normalize_region_scope(value: Any) -> str | None: + if value is None: + return None + raw = str(value).strip().lower().lstrip("#") + if not raw or raw in ("*", "none", "null"): + return None + return raw[:29] + + +def merge_channel_region_scopes( + device_channels: list[dict], + intent_channels: list[dict] | None, +) -> list[dict]: + """ + Overlay region_scope from an apply/sync intent onto device snapshot rows. + + Companion CHANNEL_INFO does not return per-channel scope yet; after apply we + must carry scope from the operator payload so the API does not create duplicate + unscoped canonical rows. + """ + if not intent_channels: + return device_channels + + intent_by_idx: dict[int, dict] = {} + for row in intent_channels: + if row.get("mc_channel_idx") is None: + continue + intent_by_idx[int(row["mc_channel_idx"])] = row + + merged: list[dict] = [] + for entry in device_channels: + row = dict(entry) + idx = int(row["mc_channel_idx"]) + intent = intent_by_idx.get(idx) + if intent is not None and "region_scope" in intent: + row["region_scope"] = _normalize_region_scope(intent.get("region_scope")) + merged.append(row) + return merged + + def _channel_entry_from_info(idx: int, payload: dict) -> Optional[dict]: name = str(payload.get("channel_name", "") or "").strip() if not name: return None + scope = ( + payload.get("region_scope") + or payload.get("flood_scope") + or payload.get("scope_name") + ) + region_scope = _normalize_region_scope(scope) if name.startswith("#"): + tag = name.lstrip("#")[:100] or f"channel {idx}" return { "mc_channel_idx": idx, - "name": name.lstrip("#")[:100] or f"channel {idx}", + "name": tag, "mc_channel_type": "HASHTAG", - "mc_hashtag": name.lstrip("#")[:64], + "region_scope": region_scope, } return { "mc_channel_idx": idx, "name": name[:100], "mc_channel_type": "PUBLIC", - "mc_hashtag": None, + "region_scope": region_scope, } @@ -40,14 +87,40 @@ def _describe_channel_event(idx: int, evt: Any) -> str: if evt.type == EventType.CHANNEL_INFO: payload = evt.payload if isinstance(evt.payload, dict) else {} name = payload.get("channel_name", "") + scope = payload.get("region_scope") or payload.get("scope_name") + if scope: + return f"[{idx}] CHANNEL_INFO name={name!r} region_scope={scope!r}" return f"[{idx}] CHANNEL_INFO name={name!r}" return f"[{idx}] {evt.type}" +async def _apply_active_flood_scope( + meshcore: MeshCore, region_scope: str | None +) -> None: + """ + Set the companion active flood scope (CMD_SET_FLOOD_SCOPE / set_flood_scope). + + This is the operator-facing scope used when sending on the active channel slot; + firmware does not yet return per-channel scope in CHANNEL_INFO. + """ + set_scope = getattr(getattr(meshcore, "commands", None), "set_flood_scope", None) + if set_scope is None: + logger.warning( + "meshcore.commands.set_flood_scope unavailable; region_scope=%r not applied to radio", + region_scope, + ) + return + scope_arg = region_scope if region_scope else "*" + evt = await set_scope(scope_arg) + if evt.type == EventType.ERROR: + logger.warning("set_flood_scope(%r) failed: %s", scope_arg, evt.payload) + + async def read_device_channels( meshcore: MeshCore, *, max_channels: int = DEFAULT_MAX_CHANNEL_SCAN, + scope_hints: list[dict] | None = None, ) -> list[dict]: """Return channel snapshot rows for API mc-channel-sync.""" channels: list[dict] = [] @@ -74,7 +147,7 @@ async def read_device_channels( "MeshCore get_channel scan found 0 named channels: %s", "; ".join(scan_lines) or "(no responses)", ) - return channels + return merge_channel_region_scopes(channels, scope_hints) async def apply_device_channels(meshcore: MeshCore, channels: list[dict]) -> None: @@ -83,12 +156,22 @@ async def apply_device_channels(meshcore: MeshCore, channels: list[dict]) -> Non idx = int(ch["mc_channel_idx"]) name = str(ch.get("name") or f"channel {idx}") ch_type = str(ch.get("mc_channel_type", "PUBLIC")).upper() + region_scope = _normalize_region_scope(ch.get("region_scope")) if ch_type == "HASHTAG": - tag = str(ch.get("mc_hashtag") or name).lstrip("#") + tag = str(ch.get("name") or name).lstrip("#") name = f"#{tag}" evt = await meshcore.commands.set_channel(idx, name) if evt.type == EventType.ERROR: logger.warning("set_channel(%s) failed: %s", idx, evt.payload) + continue + await _apply_active_flood_scope(meshcore, region_scope) + if region_scope: + logger.info( + "MeshCore channel [%s] set name=%r active flood scope=%r", + idx, + ch.get("name"), + region_scope, + ) def log_device_channels(channels: list[dict]) -> None: @@ -101,17 +184,15 @@ def log_device_channels(channels: list[dict]) -> None: idx = ch["mc_channel_idx"] typ = ch.get("mc_channel_type", "?") name = ch.get("name", "") - tag = ch.get("mc_hashtag") - if tag: - logger.info( - " [%s] %s name=%r hashtag=%r", - idx, - typ, - name, - tag, - ) - else: - logger.info(" [%s] %s name=%r", idx, typ, name) + scope = ch.get("region_scope") + scope_label = scope if scope else "(none)" + logger.info( + " [%s] %s name=%r region_scope=%s", + idx, + typ, + name, + scope_label, + ) def snapshot_sync_body(channels: list[dict]) -> dict: diff --git a/src/meshcore/radio.py b/src/meshcore/radio.py index 246b1c9..241e1fe 100644 --- a/src/meshcore/radio.py +++ b/src/meshcore/radio.py @@ -12,11 +12,9 @@ from meshcore import MeshCore from meshcore.events import Event, EventType from src.meshcore.dump import dump_meshcore_event -from src.meshcore.translation import ( - event_to_incoming_packet, - event_to_node_update, - event_to_text_message, -) +from src.meshcore.translation import (event_to_incoming_packet, + event_to_node_update, + event_to_text_message) from src.radio.errors import RadioError, call_safely, get_global_error_counter from src.radio.events import ConnectionEstablished from src.radio.interface import RadioHandlers, RadioInterface @@ -289,7 +287,12 @@ def parse_flood_advert_interval_hours(config: Optional[dict]) -> float: if not config: return DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS try: - hours = float(config.get("mc_flood_advert_interval_hours", DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS)) + hours = float( + config.get( + "mc_flood_advert_interval_hours", + DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS, + ) + ) except (TypeError, ValueError): return DEFAULT_MC_FLOOD_ADVERT_INTERVAL_HOURS return max( @@ -360,7 +363,9 @@ async def _periodic() -> None: await asyncio.sleep(hours * 3600.0) if self._shutdown.is_set(): break - await self._send_flood_advert_once(log_label="periodic flood advert") + await self._send_flood_advert_once( + log_label="periodic flood advert" + ) except asyncio.CancelledError: pass @@ -389,7 +394,12 @@ def _submit_coro_to_radio_loop(self, coro): return asyncio.create_task(coro) return asyncio.run_coroutine_threadsafe(coro, loop) - def schedule_channel_sync(self, storage_apis: list) -> None: + def schedule_channel_sync( + self, + storage_apis: list, + *, + scope_hints: list[dict] | None = None, + ) -> None: """Schedule channel sync on the radio asyncio loop (any thread).""" if not storage_apis: return @@ -401,7 +411,8 @@ def schedule_channel_sync(self, storage_apis: list) -> None: return async def _task() -> None: - from src.meshcore.channel_sync import sync_channels_to_storage_apis_async + from src.meshcore.channel_sync import \ + sync_channels_to_storage_apis_async labels = [str(getattr(s, "base_url", "?")) for s in storage_apis] logger.info( @@ -409,7 +420,11 @@ async def _task() -> None: len(storage_apis), ", ".join(labels), ) - await sync_channels_to_storage_apis_async(self, storage_apis) + await sync_channels_to_storage_apis_async( + self, + storage_apis, + scope_hints=scope_hints, + ) logger.info("MeshCore channel sync finished") try: diff --git a/test/meshcore/test_channels.py b/test/meshcore/test_channels.py index 10388b7..7b117cd 100644 --- a/test/meshcore/test_channels.py +++ b/test/meshcore/test_channels.py @@ -6,29 +6,76 @@ from unittest.mock import AsyncMock, MagicMock from meshcore.events import Event, EventType - -from src.meshcore.channels import ( - _channel_entry_from_info, - apply_device_channels, - log_device_channels, - read_device_channels, - snapshot_sync_body, -) +from src.meshcore.channels import (_channel_entry_from_info, + apply_device_channels, log_device_channels, + merge_channel_region_scopes, + read_device_channels, snapshot_sync_body) def test_channel_entry_public(): entry = _channel_entry_from_info(0, {"channel_name": "Public"}) assert entry["mc_channel_type"] == "PUBLIC" - assert entry["mc_hashtag"] is None + assert entry["region_scope"] is None def test_channel_entry_hashtag(): entry = _channel_entry_from_info(1, {"channel_name": "#galloway"}) assert entry["mc_channel_type"] == "HASHTAG" - assert entry["mc_hashtag"] == "galloway" + assert entry["name"] == "galloway" + assert entry["region_scope"] is None -def test_log_device_channels(caplog) -> None: +def test_channel_entry_with_region_scope(): + entry = _channel_entry_from_info( + 1, + {"channel_name": "#galloway", "scope_name": "Sample-West"}, + ) + assert entry["region_scope"] == "sample-west" + + +def test_merge_channel_region_scopes_from_apply_intent(): + device = [ + { + "mc_channel_idx": 0, + "name": "galloway", + "mc_channel_type": "HASHTAG", + "region_scope": None, + }, + ] + intent = [ + { + "mc_channel_idx": 0, + "name": "galloway", + "mc_channel_type": "HASHTAG", + "region_scope": "uk-wide", + }, + ] + merged = merge_channel_region_scopes(device, intent) + assert merged[0]["region_scope"] == "uk-wide" + + +def test_merge_clears_scope_when_intent_null(): + device = [ + { + "mc_channel_idx": 0, + "name": "galloway", + "mc_channel_type": "HASHTAG", + "region_scope": None, + }, + ] + intent = [ + { + "mc_channel_idx": 0, + "name": "galloway", + "mc_channel_type": "HASHTAG", + "region_scope": None, + }, + ] + merged = merge_channel_region_scopes(device, intent) + assert merged[0]["region_scope"] is None + + +def test_log_device_channels_always_logs_scope(caplog) -> None: import logging caplog.set_level(logging.INFO) @@ -39,14 +86,13 @@ def test_log_device_channels(caplog) -> None: "mc_channel_idx": 1, "name": "galloway", "mc_channel_type": "HASHTAG", - "mc_hashtag": "galloway", + "region_scope": "uk-wide", }, ] ) text = caplog.text - assert "MeshCore device channels (2):" in text - assert "Public" in text - assert "galloway" in text + assert "region_scope=(none)" in text + assert "region_scope=uk-wide" in text def test_log_device_channels_empty(caplog) -> None: @@ -69,6 +115,36 @@ def test_channel_entry_empty_name_returns_none() -> None: assert _channel_entry_from_info(0, {"channel_name": " "}) is None +def test_read_device_channels_merges_scope_hints() -> None: + mc = MagicMock() + mc.commands.get_channel = AsyncMock( + side_effect=[ + Event( + EventType.CHANNEL_INFO, + {"channel_name": "#galloway", "channel_idx": 0}, + {}, + ), + Event(EventType.ERROR, {}, {}), + ] + ) + channels = asyncio.run( + read_device_channels( + mc, + max_channels=2, + scope_hints=[ + { + "mc_channel_idx": 0, + "name": "galloway", + "mc_channel_type": "HASHTAG", + "region_scope": "sample-west", + }, + ], + ) + ) + assert len(channels) == 1 + assert channels[0]["region_scope"] == "sample-west" + + def test_read_device_channels_logs_scan_when_empty(caplog) -> None: import logging @@ -80,32 +156,14 @@ def test_read_device_channels_logs_scan_when_empty(caplog) -> None: channels = asyncio.run(read_device_channels(mc, max_channels=2)) assert channels == [] assert "get_channel scan found 0 named channels" in caplog.text - assert "[0] ERROR" in caplog.text -def test_read_device_channels_collects_public_and_skips_errors() -> None: - mc = MagicMock() - mc.commands.get_channel = AsyncMock( - side_effect=[ - Event(EventType.ERROR, {}, {}), - Event(EventType.CHANNEL_INFO, {"channel_name": "Public"}, {}), - Event(EventType.CHANNEL_INFO, {"channel_name": ""}, {}), - Event(EventType.ERROR, {}, {}), - ] - ) - channels = asyncio.run(read_device_channels(mc, max_channels=4)) - assert len(channels) == 1 - assert channels[0]["mc_channel_type"] == "PUBLIC" - - -def test_apply_device_channels_hashtag_and_error() -> None: +def test_apply_device_channels_sets_flood_scope() -> None: mc = MagicMock() mc.commands.set_channel = AsyncMock( - side_effect=[ - Event(EventType.CHANNEL_INFO, {}, {}), - Event(EventType.ERROR, {"msg": "fail"}, {}), - ] + return_value=Event(EventType.CHANNEL_INFO, {}, {}) ) + mc.commands.set_flood_scope = AsyncMock(return_value=Event(EventType.OK, {}, {})) asyncio.run( apply_device_channels( mc, @@ -114,12 +172,9 @@ def test_apply_device_channels_hashtag_and_error() -> None: "mc_channel_idx": 1, "name": "galloway", "mc_channel_type": "HASHTAG", - "mc_hashtag": "galloway", + "region_scope": "sample-west", }, - {"mc_channel_idx": 2, "name": "two", "mc_channel_type": "PUBLIC"}, ], ) ) - assert mc.commands.set_channel.await_count == 2 - first_call = mc.commands.set_channel.await_args_list[0] - assert first_call[0] == (1, "#galloway") + mc.commands.set_flood_scope.assert_awaited_once_with("sample-west") diff --git a/test/meshcore/test_radio_channel_sync.py b/test/meshcore/test_radio_channel_sync.py index ac86d02..abaa7ff 100644 --- a/test/meshcore/test_radio_channel_sync.py +++ b/test/meshcore/test_radio_channel_sync.py @@ -39,7 +39,7 @@ async def _runner(): storage.base_url = "http://api.test" done = asyncio.Event() - async def _fake_sync(_radio, _storages): + async def _fake_sync(_radio, _storages, **kwargs): done.set() with patch( @@ -66,7 +66,7 @@ async def _runner(): storage.base_url = "http://api.test" done = asyncio.Event() - async def _fake_sync(_radio, _storages): + async def _fake_sync(_radio, _storages, **kwargs): done.set() with patch( diff --git a/test/test_bot.py b/test/test_bot.py index 894d9be..485f69b 100644 --- a/test/test_bot.py +++ b/test/test_bot.py @@ -1,9 +1,9 @@ import unittest +from test.fake_radio import FakeRadio from unittest.mock import MagicMock, patch from src.bot import MeshflowBot from src.radio.events import ConnectionEstablished -from test.fake_radio import FakeRadio class TestMeshflowBot(unittest.TestCase): @@ -47,7 +47,10 @@ def test_apply_mc_channel_config_syncs_after_apply(self): ) as apply_mock: bot.on_apply_mc_channel_config(channels) apply_mock.assert_called_once_with(mc_radio, channels) - mc_radio.schedule_channel_sync.assert_called_once_with(bot.storage_apis) + mc_radio.schedule_channel_sync.assert_called_once_with( + bot.storage_apis, + scope_hints=channels, + ) def test_meshcore_connection_schedules_channel_sync(self): mc_radio = MagicMock()