From c93680c6a1b2aaeb958312e1c0baacdc00a4106b Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 12:37:40 +0100 Subject: [PATCH 1/8] fix(ws): join MC feeder group by pubkey prefix on WebSocket Match apply-mc-channel-config dispatch to the same ManagedNode as feeder-scoped HTTP ingest when API keys are shared (#295). --- Meshflow/meshcore_packets/views.py | 8 +- Meshflow/ws/consumers.py | 64 +++++++++++++-- Meshflow/ws/tests/test_mc_node_consumer.py | 95 ++++++++++++++++++++++ 3 files changed, 157 insertions(+), 10 deletions(-) create mode 100644 Meshflow/ws/tests/test_mc_node_consumer.py diff --git a/Meshflow/meshcore_packets/views.py b/Meshflow/meshcore_packets/views.py index f0757b9..2449dd0 100644 --- a/Meshflow/meshcore_packets/views.py +++ b/Meshflow/meshcore_packets/views.py @@ -184,6 +184,10 @@ def _dispatch_mc_channel_apply(managed_node: ManagedNode, channels: list[dict]) async def _check_and_send() -> str: try: if not await feeder_ws_group_has_subscribers(group): + logger.warning( + "MC channel apply: no WebSocket subscriber on group %s", + group, + ) return FEEDER_BOT_NOT_CONNECTED except Exception as exc: logger.exception("MC channel apply: feeder presence check failed: %s", exc) @@ -230,7 +234,9 @@ def post(self, request, internal_id, format=None): { "detail": ( "Feeder bot is not connected via WebSocket. " - "Start the bot with MESHCORE_UPLOAD_ENABLED and MESHFLOW_WS_URL configured." + "Start the bot with MESHCORE_UPLOAD_ENABLED and MESHFLOW_WS_URL configured. " + "For shared API keys, the bot must connect with " + "feeder_pubkey_prefix in the WebSocket URL (same 12-hex prefix as ingest)." ), "code": FEEDER_BOT_NOT_CONNECTED, }, diff --git a/Meshflow/ws/consumers.py b/Meshflow/ws/consumers.py index e6db148..312fdf1 100644 --- a/Meshflow/ws/consumers.py +++ b/Meshflow/ws/consumers.py @@ -31,9 +31,16 @@ async def connect(self): await self.close() return - managed_node = await self._validate_api_key(api_key) + feeder_pubkey_prefix = query_params.get("feeder_pubkey_prefix") + managed_node = await self._validate_api_key( + api_key, + feeder_pubkey_prefix=feeder_pubkey_prefix, + ) if not managed_node: - logger.warning("NodeConsumer: invalid or inactive api_key") + logger.warning( + "NodeConsumer: invalid api_key or feeder_pubkey_prefix=%s", + feeder_pubkey_prefix, + ) await self.close() return @@ -45,7 +52,9 @@ async def connect(self): await self.channel_layer.group_add(self.node_group, self.channel_name) await self.accept() logger.info( - f"NodeConsumer: bot connected for node {managed_node.meshtastic_node_id} ({managed_node.node_id_str})" + "NodeConsumer: bot connected for %s group=%s", + managed_node.node_id_str, + self.node_group, ) async def disconnect(self, close_code): @@ -65,8 +74,13 @@ async def node_command(self, event): await self.send(text_data=json.dumps(command)) @database_sync_to_async - def _validate_api_key(self, api_key): - """Validate NodeAPIKey and return the first linked ManagedNode, or None.""" + def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): + """Validate NodeAPIKey and return the linked ManagedNode, or None.""" + from common.meshcore_feeder_auth import ( + MeshCoreFeederResolutionError, + resolve_meshcore_feeder, + ) + from common.protocol import Protocol from nodes.models import NodeAPIKey, NodeAuth try: @@ -74,13 +88,45 @@ def _validate_api_key(self, api_key): key_obj.last_used = timezone.now() key_obj.save(update_fields=["last_used"]) - node_auth = NodeAuth.objects.filter(api_key=key_obj).select_related("node").first() - if node_auth: - return node_auth.node + if feeder_pubkey_prefix: + try: + return resolve_meshcore_feeder( + api_key=key_obj, + feeder_pubkey_prefix=feeder_pubkey_prefix, + ) + except MeshCoreFeederResolutionError: + return None + + auths = list( + NodeAuth.objects.filter( + api_key=key_obj, + node__deleted_at__isnull=True, + ).select_related("node") + ) + if not auths: + return None + + mc_auths = [a for a in auths if a.node.protocol == Protocol.MESHCORE] + if len(mc_auths) > 1: + logger.warning( + "NodeConsumer: API key linked to %s MC feeders; " + "pass feeder_pubkey_prefix on ws/nodes/", + len(mc_auths), + ) + return None + if len(mc_auths) == 1: + return mc_auths[0].node + + if len(auths) > 1: + logger.warning( + "NodeConsumer: API key linked to %s nodes; use feeder_pubkey_prefix for MC", + len(auths), + ) + return auths[0].node except NodeAPIKey.DoesNotExist: pass except Exception as e: - logger.exception(f"NodeConsumer: error validating api_key: {e}") + logger.exception("NodeConsumer: error validating api_key: %s", e) return None diff --git a/Meshflow/ws/tests/test_mc_node_consumer.py b/Meshflow/ws/tests/test_mc_node_consumer.py new file mode 100644 index 0000000..8c7234d --- /dev/null +++ b/Meshflow/ws/tests/test_mc_node_consumer.py @@ -0,0 +1,95 @@ +"""MeshCore NodeConsumer WebSocket tests.""" + +import pytest +from channels.db import database_sync_to_async +from channels.routing import URLRouter +from channels.testing import WebsocketCommunicator + +import Meshflow.routing # noqa: F401 + +from common.protocol import Protocol +from common.ws_groups import managed_node_ws_group + +application = URLRouter(Meshflow.routing.websocket_urlpatterns) + +FEEDER_PUBKEY = "1a37f5aea4a1" + ("b" * 52) +FEEDER_PREFIX = "1a37f5aea4a1" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mc_consumer_requires_prefix_when_multiple_feeders( + create_managed_node, create_node_api_key +): + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + constellation = create_managed_node(protocol=Protocol.MESHCORE).constellation + node_a = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + constellation=constellation, + mc_pubkey=FEEDER_PUBKEY, + ) + node_b = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + constellation=constellation, + mc_pubkey="c" * 64, + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=node_a) + NodeAuth.objects.create(api_key=api_key, node=node_b) + return api_key.key + + api_key = await setup() + communicator = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={api_key}", + ) + connected, _ = await communicator.connect() + assert connected is False + await communicator.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mc_consumer_accepts_feeder_pubkey_prefix( + create_managed_node, create_node_api_key +): + from channels.layers import get_channel_layer + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + node = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + mc_pubkey=FEEDER_PUBKEY, + ) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + return { + "api_key": api_key.key, + "group": managed_node_ws_group(node), + } + + data = await setup() + url = f"/ws/nodes/?api_key={data['api_key']}&feeder_pubkey_prefix={FEEDER_PREFIX}" + communicator = WebsocketCommunicator(application, url) + connected, _ = await communicator.connect() + assert connected is True + + channel_layer = get_channel_layer() + await channel_layer.group_send( + data["group"], + { + "type": "node_command", + "command": {"type": "apply_mc_channel_config", "channels": []}, + }, + ) + response = await communicator.receive_json_from() + assert response["type"] == "apply_mc_channel_config" + + await communicator.disconnect() From 8249f0186a40d38db7861567b1f83d97c03c82d3 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 13:54:13 +0100 Subject: [PATCH 2/8] docs: update meshcore docs --- docs/REDIS.md | 2 +- .../meshcore/text-message-channels.md | 30 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/docs/REDIS.md b/docs/REDIS.md index 7979de2..b3f7477 100644 --- a/docs/REDIS.md +++ b/docs/REDIS.md @@ -12,7 +12,7 @@ Configuration lives in [`Meshflow/Meshflow/settings/base.py`](../Meshflow/Meshfl - **Purpose:** Pub/sub and group membership for WebSocket delivery (`channel_layer.group_send` / `group_add`). - **Code:** [`Meshflow/ws/consumers.py`](../Meshflow/ws/consumers.py). - **Consumers / groups:** - - **`NodeConsumer`** (`ws/nodes/?api_key=…`) — bots join group `node_{node_id}`; receives **`node_command`** events (e.g. traceroute commands). Emitters include traceroute views/tasks and management commands (see grep for `group_send` under `Meshflow/`). + - **`NodeConsumer`** (`ws/nodes/?api_key=…`) — feeder bots join group `node_{meshtastic_node_id}` or **`node_mc_{managed_node.internal_id}`** (MeshCore); receives **`node_command`** events (e.g. traceroute, `apply_mc_channel_config`). Any API/ASGI worker can `group_send`; membership is cluster-wide in Redis DB 0. Emitters include traceroute views/tasks, `meshcore_packets` apply-channel, and management commands. - **`TextMessageConsumer`** — authenticated UI clients join group **`text_messages`**; receives **`text_message`** events for mesh text broadcasts ([`Meshflow/ws/services/text_message.py`](../Meshflow/ws/services/text_message.py)). - **`TracerouteConsumer`** (`ws/traceroutes/?token=…`) — clients join group **`traceroutes`**; receives **`traceroute_update`** events ([`Meshflow/traceroute/ws_notify.py`](../Meshflow/traceroute/ws_notify.py)). - **TTL:** Connection/session lifetime; Channels manages internal keys. diff --git a/docs/features/meshcore/text-message-channels.md b/docs/features/meshcore/text-message-channels.md index d3116dd..6711cd2 100644 --- a/docs/features/meshcore/text-message-channels.md +++ b/docs/features/meshcore/text-message-channels.md @@ -78,7 +78,35 @@ sequenceDiagram **Drift:** if the API mirror and device disagree (e.g. failed push), the **next connect sync overwrites the API** from the device. No three-way merge in v1. -Today the bot **does not** start the WebSocket client for `RADIO_PROTOCOL=meshcore` (Meshtastic-only). Enabling WS for MC feeders is part of [#297](https://github.com/pskillen/meshflow-api/issues/297). +The bot starts **`MeshflowWSClient`** for MeshCore when `STORAGE_API_ROOT` + token are set (WebSocket URL derived from the API base URL unless `MESHFLOW_WS_URL` is set). The feeder’s 12-hex pubkey prefix is appended automatically on connect (not an operator env var). + +### Horizontal scaling (API web tier) + +Apply and traceroute commands use **Django Channels + Redis DB 0** ([`docs/REDIS.md`](../../REDIS.md)), not in-process Django signals. Signals only run inside one Python process and do not reach other workers or hosts. + +| Piece | Scales horizontally? | +|-------|----------------------| +| `POST apply-mc-channel-config` on any Gunicorn/Uvicorn worker | Yes — handler calls `channel_layer.group_send` via Redis | +| `feeder_ws_group_has_subscribers` on any worker | Yes — `channels_redis` stores group membership in Redis DB 0 | +| Bot `NodeConsumer` WebSocket | One connection per bot; must land on **some** ASGI instance in the deployment that shares that Redis | +| Browser UI WebSocket (`text_messages`, JWT) | Separate consumer/groups; unrelated to feeder commands | + +You do **not** need a separate “signal” bus for multi-worker API: Redis channel layer already is that bus. + +### Local API + pre-prod bot (common 503 cause) + +Sharing **Postgres** and **Redis** is not enough if the **HTTP hosts differ**: + +- UI / `apply-mc-channel-config` → `http://localhost:8000` (local Django) +- Bot → `STORAGE_API_ROOT=https://pre-prod…/api` and WebSocket to **pre-prod** + +The bot registers in Redis group `node_mc_{uuid}` through **pre-prod’s** `NodeConsumer`. Local Django also reads Redis DB 0, so presence *can* work if both use the same `REDIS_HOST` / password and the managed node `internal_id` + `mc_pubkey` match the device. If local settings use **`InMemoryChannelLayer`** (tests only) or a different Redis DB/host, local apply always sees **503 feeder not connected** even though pre-prod logs show `MeshflowWSClient: connected`. + +**Practical dev setups (pick one):** + +1. Point **meshflow-ui** at pre-prod API (browser and bot share one deployment), or +2. Point **bot** `STORAGE_API_ROOT` at local API and run bot + API locally, or +3. Keep split hosts but verify local `CHANNEL_LAYERS` is `channels_redis` to the **same** Redis DB 0 as pre-prod. **Implementation plan:** Cursor workspace plan `mc_text_textmessage_pipeline_2c3e9fb8.plan.md` (kept in sync with this file; **this doc is canonical** in git for operators and PRs). From f805f644ffa0b60e3f0c7d62f1fbce6db53116e6 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 13:54:28 +0100 Subject: [PATCH 3/8] chore: add .env to .cursorignore --- .cursorignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.cursorignore b/.cursorignore index 3b72e12..dbd0569 100644 --- a/.cursorignore +++ b/.cursorignore @@ -72,3 +72,6 @@ remote_db_dump.sql # Tailwind / CSS build artifact (if generated locally) tailwind.css + +# Secrets +.env From d1028e3b8b6bfcb78a60f483cca238068408ce9e Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 14:11:18 +0100 Subject: [PATCH 4/8] fix(common): detect feeder WS presence via channels_redis ZSET group_channels does not exist on channels-redis 4.x; probe asgi:group:{name} like group_send does. Fixes false 503 when Redis shows the bot subscribed. --- Meshflow/common/feeder_ws.py | 45 +++++++++++++++-- Meshflow/common/tests/test_feeder_ws.py | 65 +++++++++++++++++++++++++ 2 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 Meshflow/common/tests/test_feeder_ws.py diff --git a/Meshflow/common/feeder_ws.py b/Meshflow/common/feeder_ws.py index a8e5fcb..957526e 100644 --- a/Meshflow/common/feeder_ws.py +++ b/Meshflow/common/feeder_ws.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time from channels.layers import get_channel_layer @@ -12,16 +13,50 @@ COMMAND_DISPATCH_UNAVAILABLE = "command_dispatch_unavailable" +async def _redis_group_has_channels(layer, group: str) -> bool: + """Presence check for channels_redis (group is a ZSET at asgi:group:{name}).""" + key = layer._group_key(group) + connection = layer.connection(layer.consistent_hash(group)) + await connection.zremrangebyscore( + key, + min=0, + max=int(time.time()) - layer.group_expiry, + ) + names = await connection.zrange(key, 0, -1) + return bool(names) + + +async def _inmemory_group_has_channels(layer, group: str) -> bool: + """Presence check for InMemoryChannelLayer (tests).""" + members = layer.groups.get(group) or {} + now = time.time() + return any(now - ts < layer.group_expiry for ts in members.values()) + + async def feeder_ws_group_has_subscribers(group: str) -> bool: """Return True if at least one bot WebSocket is subscribed to ``group``.""" channel_layer = get_channel_layer() if channel_layer is None: return False - if not hasattr(channel_layer, "group_channels"): - logger.warning("Channel layer does not support group_channels; assuming feeder offline") - return False - channels = await channel_layer.group_channels(group) - return bool(channels) + + try: + from channels.layers import InMemoryChannelLayer + from channels_redis.core import RedisChannelLayer + except ImportError: + RedisChannelLayer = None # type: ignore[misc, assignment] + InMemoryChannelLayer = None # type: ignore[misc, assignment] + + if RedisChannelLayer is not None and isinstance(channel_layer, RedisChannelLayer): + return await _redis_group_has_channels(channel_layer, group) + + if InMemoryChannelLayer is not None and isinstance(channel_layer, InMemoryChannelLayer): + return await _inmemory_group_has_channels(channel_layer, group) + + logger.warning( + "Channel layer %s has no membership probe; assuming feeder offline", + type(channel_layer).__name__, + ) + return False async def dispatch_node_command(group: str, command: dict) -> None: diff --git a/Meshflow/common/tests/test_feeder_ws.py b/Meshflow/common/tests/test_feeder_ws.py new file mode 100644 index 0000000..22fcac6 --- /dev/null +++ b/Meshflow/common/tests/test_feeder_ws.py @@ -0,0 +1,65 @@ +"""Tests for feeder WebSocket presence helpers.""" + +import time +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from common.feeder_ws import feeder_ws_group_has_subscribers + + +@pytest.mark.asyncio +async def test_redis_group_has_subscribers_when_zset_non_empty(monkeypatch): + from channels_redis.core import RedisChannelLayer + + layer = MagicMock(spec=RedisChannelLayer) + layer._group_key.return_value = "asgi:group:node_mc_test" + layer.consistent_hash.return_value = 0 + layer.group_expiry = 86400 + connection = AsyncMock() + connection.zrange.return_value = [b"specific.channel.name"] + layer.connection.return_value = connection + + monkeypatch.setattr( + "common.feeder_ws.get_channel_layer", + lambda: layer, + ) + + assert await feeder_ws_group_has_subscribers("node_mc_test") is True + connection.zremrangebyscore.assert_awaited_once() + connection.zrange.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_redis_group_has_no_subscribers_when_zset_empty(monkeypatch): + from channels_redis.core import RedisChannelLayer + + layer = MagicMock(spec=RedisChannelLayer) + layer._group_key.return_value = "asgi:group:node_mc_empty" + layer.consistent_hash.return_value = 0 + layer.group_expiry = 86400 + connection = AsyncMock() + connection.zrange.return_value = [] + layer.connection.return_value = connection + + monkeypatch.setattr( + "common.feeder_ws.get_channel_layer", + lambda: layer, + ) + + assert await feeder_ws_group_has_subscribers("node_mc_empty") is False + + +@pytest.mark.asyncio +async def test_inmemory_group_has_subscribers(monkeypatch): + from channels.layers import InMemoryChannelLayer + + layer = InMemoryChannelLayer() + layer.groups["node_mc_mem"] = {"test.channel": time.time()} + + monkeypatch.setattr( + "common.feeder_ws.get_channel_layer", + lambda: layer, + ) + + assert await feeder_ws_group_has_subscribers("node_mc_mem") is True From d0760f2c41950bd87c19851c92bf26a475cf1687 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 14:25:16 +0100 Subject: [PATCH 5/8] fix(meshcore): msgpack-safe apply_mc_channels WebSocket dispatch Use plain PUBLIC/HASHTAG choice values instead of gettext labels and sanitize group_send payloads so channels_redis can serialize commands. --- Meshflow/common/feeder_ws.py | 25 ++++++++++-- Meshflow/common/tests/test_feeder_ws.py | 39 ++++++++++++++++++- .../meshcore_packets/serializers_channel.py | 18 +++++---- .../tests/test_apply_channel.py | 2 + 4 files changed, 71 insertions(+), 13 deletions(-) diff --git a/Meshflow/common/feeder_ws.py b/Meshflow/common/feeder_ws.py index 957526e..4c73072 100644 --- a/Meshflow/common/feeder_ws.py +++ b/Meshflow/common/feeder_ws.py @@ -13,6 +13,21 @@ COMMAND_DISPATCH_UNAVAILABLE = "command_dispatch_unavailable" +def _ws_json_safe(value): + """Recursively coerce channel-layer payloads to msgpack-safe plain Python.""" + from django.utils.functional import Promise + + if isinstance(value, Promise): + return str(value) + if isinstance(value, dict): + return {str(k): _ws_json_safe(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [_ws_json_safe(v) for v in value] + if isinstance(value, (str, int, float, bool)) or value is None: + return value + return str(value) + + async def _redis_group_has_channels(layer, group: str) -> bool: """Presence check for channels_redis (group is a ZSET at asgi:group:{name}).""" key = layer._group_key(group) @@ -66,8 +81,10 @@ async def dispatch_node_command(group: str, command: dict) -> None: raise RuntimeError("Channel layer is not configured") await channel_layer.group_send( group, - { - "type": "node_command", - "command": command, - }, + _ws_json_safe( + { + "type": "node_command", + "command": command, + } + ), ) diff --git a/Meshflow/common/tests/test_feeder_ws.py b/Meshflow/common/tests/test_feeder_ws.py index 22fcac6..4122a78 100644 --- a/Meshflow/common/tests/test_feeder_ws.py +++ b/Meshflow/common/tests/test_feeder_ws.py @@ -3,9 +3,11 @@ import time from unittest.mock import AsyncMock, MagicMock +import msgpack import pytest +from django.utils.translation import gettext_lazy -from common.feeder_ws import feeder_ws_group_has_subscribers +from common.feeder_ws import _ws_json_safe, dispatch_node_command, feeder_ws_group_has_subscribers @pytest.mark.asyncio @@ -63,3 +65,38 @@ async def test_inmemory_group_has_subscribers(monkeypatch): ) assert await feeder_ws_group_has_subscribers("node_mc_mem") is True + + +def test_ws_json_safe_coerces_lazy_translation_proxy(): + payload = { + "type": "node_command", + "command": { + "type": "apply_mc_channels", + "channels": [ + { + "mc_channel_type": gettext_lazy("HASHTAG"), + "name": "test", + } + ], + }, + } + safe = _ws_json_safe(payload) + msgpack.packb(safe) + assert safe["command"]["channels"][0]["mc_channel_type"] == "HASHTAG" + + +@pytest.mark.asyncio +async def test_dispatch_node_command_sends_msgpack_safe_payload(monkeypatch): + layer = AsyncMock() + monkeypatch.setattr("common.feeder_ws.get_channel_layer", lambda: layer) + + command = { + "type": "apply_mc_channels", + "channels": [{"mc_channel_type": gettext_lazy("PUBLIC"), "name": "x"}], + } + await dispatch_node_command("node_mc_test", command) + + layer.group_send.assert_awaited_once() + message = layer.group_send.await_args[0][1] + msgpack.packb(message) + assert message["command"]["channels"][0]["mc_channel_type"] == "PUBLIC" diff --git a/Meshflow/meshcore_packets/serializers_channel.py b/Meshflow/meshcore_packets/serializers_channel.py index 103bac9..1c59902 100644 --- a/Meshflow/meshcore_packets/serializers_channel.py +++ b/Meshflow/meshcore_packets/serializers_channel.py @@ -4,13 +4,17 @@ from constellations.models import MeshCoreChannelType, MessageChannel +# Wire/API strings (not gettext labels — lazy __proxy__ breaks Channels msgpack). +MC_CHANNEL_TYPE_API_CHOICES = [ + ("PUBLIC", "PUBLIC"), + ("HASHTAG", "HASHTAG"), +] + class McChannelSnapshotEntrySerializer(serializers.Serializer): mc_channel_idx = serializers.IntegerField(min_value=0, max_value=63) name = serializers.CharField(max_length=100) - mc_channel_type = serializers.ChoiceField( - choices=[(c.label, c.label) for c in MeshCoreChannelType], - ) + mc_channel_type = serializers.ChoiceField(choices=MC_CHANNEL_TYPE_API_CHOICES) mc_hashtag = serializers.CharField(max_length=64, required=False, allow_null=True, allow_blank=True) @@ -35,15 +39,13 @@ class Meta: def get_mc_channel_type(self, obj): if obj.mc_channel_type is None: return None - return MeshCoreChannelType(obj.mc_channel_type).label + return MeshCoreChannelType(obj.mc_channel_type).name class McChannelApplyEntrySerializer(serializers.Serializer): mc_channel_idx = serializers.IntegerField(min_value=0, max_value=63, required=False) name = serializers.CharField(max_length=100) - mc_channel_type = serializers.ChoiceField( - choices=[(c.label, c.label) for c in MeshCoreChannelType], - ) + mc_channel_type = serializers.ChoiceField(choices=MC_CHANNEL_TYPE_API_CHOICES) mc_hashtag = serializers.CharField(max_length=64, required=False, allow_null=True, allow_blank=True) @@ -53,7 +55,7 @@ class McChannelApplySerializer(serializers.Serializer): def validate(self, attrs): channels = attrs.get("channels") or [] for entry in channels: - if entry.get("mc_channel_type") == MeshCoreChannelType.HASHTAG.label: + if str(entry.get("mc_channel_type")).upper() == "HASHTAG": tag = (entry.get("mc_hashtag") or entry.get("name") or "").strip().lstrip("#") if not tag: raise serializers.ValidationError({"channels": "Hashtag channels require a non-empty hashtag."}) diff --git a/Meshflow/meshcore_packets/tests/test_apply_channel.py b/Meshflow/meshcore_packets/tests/test_apply_channel.py index 8ef77ae..ca0f475 100644 --- a/Meshflow/meshcore_packets/tests/test_apply_channel.py +++ b/Meshflow/meshcore_packets/tests/test_apply_channel.py @@ -133,3 +133,5 @@ def test_apply_dispatches_when_feeder_connected(create_user, create_managed_node sent_channels = dispatch_mock.await_args[0][1]["channels"] assert sent_channels[0]["mc_hashtag"] == "galloway" assert sent_channels[0]["name"] == "galloway" + assert sent_channels[0]["mc_channel_type"] == "HASHTAG" + assert type(sent_channels[0]["mc_channel_type"]) is str From 854223fbe74a81b783eda347c238267814637256 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 14:25:22 +0100 Subject: [PATCH 6/8] feat(admin): expose MeshCore channels on MessageChannel and ManagedNode List MC type, hashtag, and feeder channel mirror with sync timestamp for operators debugging device channel sync and apply flows. --- Meshflow/constellations/admin.py | 58 +++++++++++++++++++++++++++++--- Meshflow/nodes/admin.py | 30 +++++++++++++++-- 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/Meshflow/constellations/admin.py b/Meshflow/constellations/admin.py index 646df16..669e75e 100644 --- a/Meshflow/constellations/admin.py +++ b/Meshflow/constellations/admin.py @@ -3,7 +3,9 @@ from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ -from .models import Constellation, ConstellationUserMembership, MessageChannel +from common.protocol import Protocol + +from .models import Constellation, ConstellationUserMembership, MeshCoreChannelType, MessageChannel class ConstellationAdminForm(forms.ModelForm): @@ -124,13 +126,61 @@ def get_constellation_creator(self, obj): get_constellation_creator.admin_order_field = "constellation__created_by__username" +class MeshCoreMessageChannelFilter(admin.SimpleListFilter): + title = _("MeshCore channels") + parameter_name = "meshcore_channels" + + def lookups(self, request, model_admin): + return ( + ("yes", _("MeshCore only")), + ("no", _("Non-MeshCore")), + ) + + def queryset(self, request, queryset): + if self.value() == "yes": + return queryset.filter(protocol=Protocol.MESHCORE) + if self.value() == "no": + return queryset.exclude(protocol=Protocol.MESHCORE) + return queryset + + @admin.register(MessageChannel) class MessageChannelAdmin(admin.ModelAdmin): - list_display = ("id", "name", "protocol", "mc_channel_idx", "constellation") + list_display = ( + "id", + "name", + "protocol", + "mc_channel_idx", + "mc_channel_type_display", + "mc_hashtag", + "constellation", + ) list_filter = ( ("protocol", admin.ChoicesFieldListFilter), + MeshCoreMessageChannelFilter, + ("mc_channel_type", admin.ChoicesFieldListFilter), "constellation", ("mc_channel_idx", admin.EmptyFieldListFilter), ) - search_fields = ("name", "id") - ordering = ("name",) + search_fields = ("name", "id", "mc_hashtag", "constellation__name") + ordering = ("constellation__name", "protocol", "mc_channel_idx", "name") + list_select_related = ("constellation",) + fieldsets = ( + (None, {"fields": ("name", "constellation", "protocol")}), + ( + _("MeshCore channel"), + { + "fields": ("mc_channel_idx", "mc_channel_type", "mc_hashtag"), + "description": _( + "Device channel index and type when protocol is MeshCore. " + "Hashtag is required for HASHTAG channels (no leading #)." + ), + }, + ), + ) + + @admin.display(description=_("MC type"), ordering="mc_channel_type") + def mc_channel_type_display(self, obj): + if obj.mc_channel_type is None: + return "—" + return MeshCoreChannelType(obj.mc_channel_type).name diff --git a/Meshflow/nodes/admin.py b/Meshflow/nodes/admin.py index 45fec26..2809ec2 100644 --- a/Meshflow/nodes/admin.py +++ b/Meshflow/nodes/admin.py @@ -453,6 +453,8 @@ class ManagedNodeAdmin(admin.ModelAdmin): "name", "owner", "constellation", + "mc_channel_count", + "mc_channels_synced_at", "allow_auto_traceroute", "status_is_sending_data", "status_last_packet_ingested_at", @@ -468,6 +470,7 @@ class ManagedNodeAdmin(admin.ModelAdmin): search_fields = ( "meshtastic_node_id", "name", + "mc_pubkey", "owner__username", "owner__email", ) @@ -496,9 +499,20 @@ def get_queryset(self, request): def display_id(self, obj): return obj.node_id_str + @admin.display(description=_("MC channels")) + def mc_channel_count(self, obj): + if obj.protocol != Protocol.MESHCORE: + return "—" + return obj.mc_channels.count() + def get_readonly_fields(self, request, obj=None): if obj and obj.protocol == Protocol.MESHCORE: - return ("display_id",) + return ("display_id", "mc_channels_synced_at") + return ("mc_channels_synced_at",) + + def get_filter_horizontal(self, request, obj=None): + if obj and obj.protocol == Protocol.MESHCORE: + return ("mc_channels",) return () def get_fieldsets(self, request, obj=None): @@ -512,8 +526,18 @@ def get_fieldsets(self, request, obj=None): }, ) if obj and obj.protocol == Protocol.MESHCORE: - mc_fields = (_("MeshCore identity"), {"fields": ("mc_pubkey", "display_id")}) - return (common, mc_fields) + mc_identity = (_("MeshCore identity"), {"fields": ("mc_pubkey", "display_id")}) + mc_channels = ( + _("MeshCore channels"), + { + "fields": ("mc_channels", "mc_channels_synced_at"), + "description": _( + "Mirror of the feeder device channel table (updated by bot sync). " + "Edit links only; channel rows live under Message channels." + ), + }, + ) + return (common, mc_identity, mc_channels) return (common, channels) From 4758ed7093b95b6bb8455dc7129568b9f2b9c499 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 14:42:41 +0100 Subject: [PATCH 7/8] feat(admin): MC-only channel UI, read-only mirror, push to feeder Add MeshCoreMessageChannel proxy admin with #hashtag labels, show device mirror read-only on ManagedNode, and admin action to push mirror via WS. Extract channel_apply service shared with the apply API endpoint. --- Meshflow/common/mc_channel_labels.py | 50 ++++++++++ .../common/tests/test_mc_channel_labels.py | 54 +++++++++++ Meshflow/constellations/admin.py | 78 ++++++++------- .../0010_meshcore_message_channel_proxy.py | 26 +++++ Meshflow/constellations/models.py | 9 ++ .../services/channel_apply.py | 67 +++++++++++++ .../tests/test_apply_channel.py | 16 +-- .../tests/test_channel_apply_service.py | 52 ++++++++++ Meshflow/meshcore_packets/views.py | 44 +-------- Meshflow/nodes/admin.py | 97 ++++++++++++++++--- 10 files changed, 398 insertions(+), 95 deletions(-) create mode 100644 Meshflow/common/mc_channel_labels.py create mode 100644 Meshflow/common/tests/test_mc_channel_labels.py create mode 100644 Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py create mode 100644 Meshflow/meshcore_packets/services/channel_apply.py create mode 100644 Meshflow/meshcore_packets/tests/test_channel_apply_service.py diff --git a/Meshflow/common/mc_channel_labels.py b/Meshflow/common/mc_channel_labels.py new file mode 100644 index 0000000..ff63b80 --- /dev/null +++ b/Meshflow/common/mc_channel_labels.py @@ -0,0 +1,50 @@ +"""Display labels and apply payloads for MeshCore MessageChannel rows.""" + +from __future__ import annotations + +from constellations.models import MeshCoreChannelType, MessageChannel + + +def mc_channel_admin_label(channel: MessageChannel) -> str: + """Human label for admin lists: #hashtag for HASHTAG, plain name for PUBLIC.""" + if channel.mc_channel_type == MeshCoreChannelType.HASHTAG: + tag = (channel.mc_hashtag or channel.name or "").strip().lstrip("#") + if tag: + return f"#{tag}" + name = (channel.name or "").strip() + if name: + return name + if channel.mc_channel_idx is not None: + return f"slot {channel.mc_channel_idx}" + return str(channel.pk) + + +def mc_channel_type_name(channel: MessageChannel) -> str: + if channel.mc_channel_type is None: + return "—" + return MeshCoreChannelType(channel.mc_channel_type).name + + +def message_channel_to_apply_entry(channel: MessageChannel) -> dict: + """Build one apply_mc_channel_config entry from a MessageChannel row.""" + ch_type = mc_channel_type_name(channel) + if ch_type == "—": + ch_type = "PUBLIC" + entry = { + "mc_channel_idx": channel.mc_channel_idx, + "name": channel.name, + "mc_channel_type": ch_type, + } + if channel.mc_channel_type == MeshCoreChannelType.HASHTAG: + tag = (channel.mc_hashtag or channel.name or "").strip().lstrip("#") + entry["mc_hashtag"] = tag[:64] if tag else None + if tag: + entry["name"] = tag[:100] + return entry + + +def managed_node_mc_channels_queryset(managed_node): + """MC channel rows linked on a MeshCore feeder (device mirror).""" + from common.protocol import Protocol + + return managed_node.mc_channels.filter(protocol=Protocol.MESHCORE).order_by("mc_channel_idx") diff --git a/Meshflow/common/tests/test_mc_channel_labels.py b/Meshflow/common/tests/test_mc_channel_labels.py new file mode 100644 index 0000000..f3c7cfb --- /dev/null +++ b/Meshflow/common/tests/test_mc_channel_labels.py @@ -0,0 +1,54 @@ +"""Tests for MeshCore channel admin labels and apply payloads.""" + +import pytest + +from common.mc_channel_labels import ( + mc_channel_admin_label, + message_channel_to_apply_entry, +) +from common.protocol import Protocol +from constellations.models import MeshCoreChannelType, MessageChannel + + +@pytest.mark.django_db +def test_mc_channel_admin_label_public(create_constellation): + constellation = create_constellation() + ch = MessageChannel.objects.create( + name="Public", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=0, + mc_channel_type=MeshCoreChannelType.PUBLIC, + ) + assert mc_channel_admin_label(ch) == "Public" + + +@pytest.mark.django_db +def test_mc_channel_admin_label_hashtag_prefix(create_constellation): + constellation = create_constellation() + ch = MessageChannel.objects.create( + name="galloway", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=1, + mc_channel_type=MeshCoreChannelType.HASHTAG, + mc_hashtag="galloway", + ) + assert mc_channel_admin_label(ch) == "#galloway" + + +@pytest.mark.django_db +def test_message_channel_to_apply_entry_hashtag(create_constellation): + constellation = create_constellation() + ch = MessageChannel.objects.create( + name="galloway", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=1, + mc_channel_type=MeshCoreChannelType.HASHTAG, + mc_hashtag="galloway", + ) + entry = message_channel_to_apply_entry(ch) + assert entry["mc_channel_type"] == "HASHTAG" + assert entry["mc_hashtag"] == "galloway" + assert entry["name"] == "galloway" diff --git a/Meshflow/constellations/admin.py b/Meshflow/constellations/admin.py index 669e75e..3bf4458 100644 --- a/Meshflow/constellations/admin.py +++ b/Meshflow/constellations/admin.py @@ -3,9 +3,15 @@ from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ +from common.mc_channel_labels import mc_channel_admin_label, mc_channel_type_name from common.protocol import Protocol -from .models import Constellation, ConstellationUserMembership, MeshCoreChannelType, MessageChannel +from .models import ( + Constellation, + ConstellationUserMembership, + MeshCoreMessageChannel, + MessageChannel, +) class ConstellationAdminForm(forms.ModelForm): @@ -126,61 +132,63 @@ def get_constellation_creator(self, obj): get_constellation_creator.admin_order_field = "constellation__created_by__username" -class MeshCoreMessageChannelFilter(admin.SimpleListFilter): - title = _("MeshCore channels") - parameter_name = "meshcore_channels" +@admin.register(MessageChannel) +class MessageChannelAdmin(admin.ModelAdmin): + """Meshtastic and legacy rows; MeshCore operators should use MeshCore channels.""" - def lookups(self, request, model_admin): - return ( - ("yes", _("MeshCore only")), - ("no", _("Non-MeshCore")), - ) + list_display = ("id", "name", "protocol", "constellation") + list_filter = ( + ("protocol", admin.ChoicesFieldListFilter), + "constellation", + ) + search_fields = ("name", "id", "constellation__name") + ordering = ("constellation__name", "name") + list_select_related = ("constellation",) - def queryset(self, request, queryset): - if self.value() == "yes": - return queryset.filter(protocol=Protocol.MESHCORE) - if self.value() == "no": - return queryset.exclude(protocol=Protocol.MESHCORE) - return queryset +@admin.register(MeshCoreMessageChannel) +class MeshCoreMessageChannelAdmin(admin.ModelAdmin): + """Constellation MC channel catalog (device slots). Push to radio from Managed node admin.""" -@admin.register(MessageChannel) -class MessageChannelAdmin(admin.ModelAdmin): list_display = ( - "id", - "name", - "protocol", "mc_channel_idx", + "admin_label", "mc_channel_type_display", - "mc_hashtag", "constellation", + "id", ) list_filter = ( - ("protocol", admin.ChoicesFieldListFilter), - MeshCoreMessageChannelFilter, ("mc_channel_type", admin.ChoicesFieldListFilter), "constellation", - ("mc_channel_idx", admin.EmptyFieldListFilter), ) - search_fields = ("name", "id", "mc_hashtag", "constellation__name") - ordering = ("constellation__name", "protocol", "mc_channel_idx", "name") + search_fields = ("name", "mc_hashtag", "constellation__name") + ordering = ("constellation__name", "mc_channel_idx") list_select_related = ("constellation",) fieldsets = ( - (None, {"fields": ("name", "constellation", "protocol")}), + (None, {"fields": ("constellation", "mc_channel_idx")}), ( - _("MeshCore channel"), + _("Channel"), { - "fields": ("mc_channel_idx", "mc_channel_type", "mc_hashtag"), + "fields": ("name", "mc_channel_type", "mc_hashtag"), "description": _( - "Device channel index and type when protocol is MeshCore. " - "Hashtag is required for HASHTAG channels (no leading #)." + "PUBLIC channels use a plain name. HASHTAG channels use mc_hashtag " + "(no leading #); lists show #prefix for hashtags." ), }, ), ) - @admin.display(description=_("MC type"), ordering="mc_channel_type") + def get_queryset(self, request): + return super().get_queryset(request).filter(protocol=Protocol.MESHCORE) + + def save_model(self, request, obj, form, change): + obj.protocol = Protocol.MESHCORE + super().save_model(request, obj, form, change) + + @admin.display(description=_("Label"), ordering="name") + def admin_label(self, obj): + return mc_channel_admin_label(obj) + + @admin.display(description=_("Type"), ordering="mc_channel_type") def mc_channel_type_display(self, obj): - if obj.mc_channel_type is None: - return "—" - return MeshCoreChannelType(obj.mc_channel_type).name + return mc_channel_type_name(obj) diff --git a/Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py b/Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py new file mode 100644 index 0000000..e05dc47 --- /dev/null +++ b/Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py @@ -0,0 +1,26 @@ +# Generated by Django 5.2.14 on 2026-05-21 13:33 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('constellations', '0009_messagechannel_mc_type_hashtag'), + ] + + operations = [ + migrations.CreateModel( + name='MeshCoreMessageChannel', + fields=[ + ], + options={ + 'verbose_name': 'MeshCore channel', + 'verbose_name_plural': 'MeshCore channels', + 'proxy': True, + 'indexes': [], + 'constraints': [], + }, + bases=('constellations.messagechannel',), + ), + ] diff --git a/Meshflow/constellations/models.py b/Meshflow/constellations/models.py index ace3063..7abb55e 100644 --- a/Meshflow/constellations/models.py +++ b/Meshflow/constellations/models.py @@ -99,3 +99,12 @@ class Meta: def __str__(self): return self.name + + +class MeshCoreMessageChannel(MessageChannel): + """Proxy for Django admin: MeshCore channel rows only.""" + + class Meta: + proxy = True + verbose_name = _("MeshCore channel") + verbose_name_plural = _("MeshCore channels") diff --git a/Meshflow/meshcore_packets/services/channel_apply.py b/Meshflow/meshcore_packets/services/channel_apply.py new file mode 100644 index 0000000..bf08643 --- /dev/null +++ b/Meshflow/meshcore_packets/services/channel_apply.py @@ -0,0 +1,67 @@ +"""Push MeshCore channel config to a connected feeder bot via WebSocket.""" + +from __future__ import annotations + +import logging + +from asgiref.sync import async_to_sync + +from common.feeder_ws import ( + COMMAND_DISPATCH_UNAVAILABLE, + FEEDER_BOT_NOT_CONNECTED, + dispatch_node_command, + feeder_ws_group_has_subscribers, +) +from common.mc_channel_labels import managed_node_mc_channels_queryset, message_channel_to_apply_entry +from common.protocol import Protocol +from common.ws_groups import managed_node_ws_group +from nodes.models import ManagedNode + +logger = logging.getLogger(__name__) + + +def build_apply_channels_for_managed_node(managed_node: ManagedNode) -> list[dict]: + """Snapshot entries for apply_mc_channel_config from the feeder mirror.""" + if managed_node.protocol != Protocol.MESHCORE: + return [] + return [message_channel_to_apply_entry(ch) for ch in managed_node_mc_channels_queryset(managed_node)] + + +def dispatch_mc_channel_apply(managed_node: ManagedNode, channels: list[dict]) -> str: + """Dispatch apply_mc_channel_config. Returns ``sent`` or an error code string.""" + group = managed_node_ws_group(managed_node) + + async def _check_and_send() -> str: + try: + if not await feeder_ws_group_has_subscribers(group): + logger.warning( + "MC channel apply: no WebSocket subscriber on group %s", + group, + ) + return FEEDER_BOT_NOT_CONNECTED + except Exception as exc: + logger.exception("MC channel apply: feeder presence check failed: %s", exc) + return COMMAND_DISPATCH_UNAVAILABLE + + try: + await dispatch_node_command( + group, + { + "type": "apply_mc_channel_config", + "channels": channels, + }, + ) + except Exception as exc: + logger.exception("MC channel apply: group_send failed: %s", exc) + return COMMAND_DISPATCH_UNAVAILABLE + return "sent" + + return async_to_sync(_check_and_send)() + + +def apply_mc_channels_to_feeder(managed_node: ManagedNode, channels: list[dict] | None = None) -> str: + """Push channel config to the feeder bot. Uses mirror when *channels* is omitted.""" + if managed_node.protocol != Protocol.MESHCORE: + raise ValueError("apply_mc_channels_to_feeder requires a MeshCore managed node") + payload = channels if channels is not None else build_apply_channels_for_managed_node(managed_node) + return dispatch_mc_channel_apply(managed_node, payload) diff --git a/Meshflow/meshcore_packets/tests/test_apply_channel.py b/Meshflow/meshcore_packets/tests/test_apply_channel.py index ca0f475..af50684 100644 --- a/Meshflow/meshcore_packets/tests/test_apply_channel.py +++ b/Meshflow/meshcore_packets/tests/test_apply_channel.py @@ -24,7 +24,7 @@ def test_apply_returns_503_when_feeder_not_connected(create_user, create_managed url = reverse("meshcore-apply-mc-channel-config", kwargs={"internal_id": node.internal_id}) with patch( - "meshcore_packets.views.feeder_ws_group_has_subscribers", + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", new_callable=AsyncMock, return_value=False, ): @@ -62,12 +62,12 @@ def test_apply_returns_503_when_dispatch_fails(create_user, create_managed_node) with ( patch( - "meshcore_packets.views.feeder_ws_group_has_subscribers", + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", new_callable=AsyncMock, return_value=True, ), patch( - "meshcore_packets.views.dispatch_node_command", + "meshcore_packets.services.channel_apply.dispatch_node_command", new_callable=AsyncMock, side_effect=RuntimeError("TCPTransport closed"), ), @@ -104,13 +104,13 @@ def test_apply_dispatches_when_feeder_connected(create_user, create_managed_node with ( patch( - "meshcore_packets.views.feeder_ws_group_has_subscribers", + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", new_callable=AsyncMock, return_value=True, ), patch( - "meshcore_packets.views.dispatch_node_command", - new_callable=AsyncMock, + "meshcore_packets.views.apply_mc_channels_to_feeder", + return_value="sent", ) as dispatch_mock, ): response = client.post( @@ -129,8 +129,8 @@ def test_apply_dispatches_when_feeder_connected(create_user, create_managed_node ) assert response.status_code == 202 - dispatch_mock.assert_awaited_once() - sent_channels = dispatch_mock.await_args[0][1]["channels"] + dispatch_mock.assert_called_once() + sent_channels = dispatch_mock.call_args[0][1] assert sent_channels[0]["mc_hashtag"] == "galloway" assert sent_channels[0]["name"] == "galloway" assert sent_channels[0]["mc_channel_type"] == "HASHTAG" diff --git a/Meshflow/meshcore_packets/tests/test_channel_apply_service.py b/Meshflow/meshcore_packets/tests/test_channel_apply_service.py new file mode 100644 index 0000000..6065372 --- /dev/null +++ b/Meshflow/meshcore_packets/tests/test_channel_apply_service.py @@ -0,0 +1,52 @@ +"""Tests for channel_apply service.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from common.protocol import Protocol +from constellations.models import MeshCoreChannelType, MessageChannel +from meshcore_packets.services.channel_apply import build_apply_channels_for_managed_node + + +@pytest.mark.django_db +def test_build_apply_channels_for_managed_node(meshcore_feeder): + node = meshcore_feeder["node"] + constellation = node.constellation + ch = MessageChannel.objects.create( + name="tag", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=2, + mc_channel_type=MeshCoreChannelType.HASHTAG, + mc_hashtag="meshflow", + ) + node.mc_channels.add(ch) + + payload = build_apply_channels_for_managed_node(node) + assert len(payload) == 1 + assert payload[0]["mc_channel_idx"] == 2 + assert payload[0]["mc_channel_type"] == "HASHTAG" + assert payload[0]["mc_hashtag"] == "meshflow" + + +@pytest.mark.django_db +def test_apply_mc_channels_to_feeder_dispatches(meshcore_feeder): + from meshcore_packets.services.channel_apply import apply_mc_channels_to_feeder + + node = meshcore_feeder["node"] + with ( + patch( + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", + new_callable=AsyncMock, + return_value=True, + ), + patch( + "meshcore_packets.services.channel_apply.dispatch_node_command", + new_callable=AsyncMock, + ) as dispatch_mock, + ): + result = apply_mc_channels_to_feeder(node, [{"mc_channel_idx": 0, "name": "x", "mc_channel_type": "PUBLIC"}]) + + assert result == "sent" + dispatch_mock.assert_awaited_once() diff --git a/Meshflow/meshcore_packets/views.py b/Meshflow/meshcore_packets/views.py index 2449dd0..e8a60b6 100644 --- a/Meshflow/meshcore_packets/views.py +++ b/Meshflow/meshcore_packets/views.py @@ -4,18 +4,11 @@ from django.utils import timezone -from asgiref.sync import async_to_sync from rest_framework import generics, permissions, serializers, status from rest_framework.response import Response from rest_framework.views import APIView -from common.feeder_ws import ( - COMMAND_DISPATCH_UNAVAILABLE, - FEEDER_BOT_NOT_CONNECTED, - dispatch_node_command, - feeder_ws_group_has_subscribers, -) -from common.ws_groups import managed_node_ws_group +from common.feeder_ws import COMMAND_DISPATCH_UNAVAILABLE, FEEDER_BOT_NOT_CONNECTED from meshcore_packets.models import MeshCoreRawPacket, MeshCoreTextPacket from meshcore_packets.permissions import MeshCoreFeederPermission from meshcore_packets.serializers import MeshCorePacketIngestSerializer, MeshCorePacketListSerializer @@ -24,6 +17,7 @@ McChannelSyncSerializer, MessageChannelMcSerializer, ) +from meshcore_packets.services.channel_apply import apply_mc_channels_to_feeder from meshcore_packets.services.channel_sync import reconcile_mc_channels from meshcore_packets.signals import meshcore_packet_received, meshcore_text_packet_received from nodes.authentication import NodeAPIKeyAuthentication @@ -177,38 +171,6 @@ def post(self, request, feeder_pubkey_prefix, format=None): ) -def _dispatch_mc_channel_apply(managed_node: ManagedNode, channels: list[dict]) -> str: - """Dispatch apply_mc_channel_config. Returns ``sent`` or an error code string.""" - group = managed_node_ws_group(managed_node) - - async def _check_and_send() -> str: - try: - if not await feeder_ws_group_has_subscribers(group): - logger.warning( - "MC channel apply: no WebSocket subscriber on group %s", - group, - ) - return FEEDER_BOT_NOT_CONNECTED - except Exception as exc: - logger.exception("MC channel apply: feeder presence check failed: %s", exc) - return COMMAND_DISPATCH_UNAVAILABLE - - try: - await dispatch_node_command( - group, - { - "type": "apply_mc_channel_config", - "channels": channels, - }, - ) - except Exception as exc: - logger.exception("MC channel apply: group_send failed: %s", exc) - return COMMAND_DISPATCH_UNAVAILABLE - return "sent" - - return async_to_sync(_check_and_send)() - - class ManagedNodeMcChannelApplyView(APIView): """POST desired MC channels; pushes apply_mc_channel_config to connected feeder bot.""" @@ -228,7 +190,7 @@ def post(self, request, internal_id, format=None): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) channels = serializer.validated_data["channels"] - result = _dispatch_mc_channel_apply(managed_node, channels) + result = apply_mc_channels_to_feeder(managed_node, channels) if result == FEEDER_BOT_NOT_CONNECTED: return Response( { diff --git a/Meshflow/nodes/admin.py b/Meshflow/nodes/admin.py index 2809ec2..257b5fa 100644 --- a/Meshflow/nodes/admin.py +++ b/Meshflow/nodes/admin.py @@ -1,17 +1,25 @@ from django import forms -from django.contrib import admin +from django.contrib import admin, messages from django.contrib.admin.widgets import FilteredSelectMultiple from django.core.exceptions import ObjectDoesNotExist from django.db import transaction +from django.utils.html import format_html, format_html_join from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ +from common.feeder_ws import COMMAND_DISPATCH_UNAVAILABLE, FEEDER_BOT_NOT_CONNECTED +from common.mc_channel_labels import ( + managed_node_mc_channels_queryset, + mc_channel_admin_label, + mc_channel_type_name, +) from common.mesh_node_helpers import ( meshtastic_hex_to_int, meshtastic_id_to_hex, observed_node_search_conditions, ) from common.protocol import Protocol +from meshcore_packets.services.channel_apply import apply_mc_channels_to_feeder from .models import ManagedNode, ManagedNodeStatus, NodeAPIKey, NodeAuth, NodeLatestStatus, ObservedNode @@ -443,9 +451,49 @@ def save(self, commit=True): return super().save(commit=commit) +@admin.action(description=_("Push MC channel config to feeder device")) +def push_mc_channels_to_feeder(modeladmin, request, queryset): + for node in queryset: + if node.protocol != Protocol.MESHCORE: + modeladmin.message_user( + request, + _("%(name)s is not a MeshCore feeder.") % {"name": node.name}, + level=messages.WARNING, + ) + continue + channel_count = managed_node_mc_channels_queryset(node).count() + if channel_count == 0: + modeladmin.message_user( + request, + _("%(name)s has no synced MC channels to push.") % {"name": node.name}, + level=messages.WARNING, + ) + continue + result = apply_mc_channels_to_feeder(node) + if result == FEEDER_BOT_NOT_CONNECTED: + modeladmin.message_user( + request, + _("%(name)s: feeder bot not connected via WebSocket.") % {"name": node.name}, + level=messages.ERROR, + ) + elif result == COMMAND_DISPATCH_UNAVAILABLE: + modeladmin.message_user( + request, + _("%(name)s: could not dispatch to channel layer.") % {"name": node.name}, + level=messages.ERROR, + ) + else: + modeladmin.message_user( + request, + _("%(name)s: pushed %(count)s channel(s) to feeder.") % {"name": node.name, "count": channel_count}, + level=messages.SUCCESS, + ) + + @admin.register(ManagedNode) class ManagedNodeAdmin(admin.ModelAdmin): form = ManagedNodeAdminForm + actions = [push_mc_channels_to_feeder] list_display = ( "protocol", "meshtastic_node_id", @@ -505,16 +553,41 @@ def mc_channel_count(self, obj): return "—" return obj.mc_channels.count() + @admin.display(description=_("Device channels (read-only)")) + def mc_channels_mirror(self, obj): + if obj is None or obj.protocol != Protocol.MESHCORE: + return "—" + rows = list(managed_node_mc_channels_queryset(obj)) + if not rows: + return format_html( + "

{}

", + _("No channels synced from device yet. Connect the bot to populate this mirror."), + ) + row_html = format_html_join( + "", + "{}{}{}", + ( + ( + ch.mc_channel_idx if ch.mc_channel_idx is not None else "—", + mc_channel_type_name(ch), + mc_channel_admin_label(ch), + ) + for ch in rows + ), + ) + return format_html( + "{}
{}{}{}
", + _("Slot"), + _("Type"), + _("Label"), + row_html, + ) + def get_readonly_fields(self, request, obj=None): if obj and obj.protocol == Protocol.MESHCORE: - return ("display_id", "mc_channels_synced_at") + return ("display_id", "mc_channels_mirror", "mc_channels_synced_at") return ("mc_channels_synced_at",) - def get_filter_horizontal(self, request, obj=None): - if obj and obj.protocol == Protocol.MESHCORE: - return ("mc_channels",) - return () - def get_fieldsets(self, request, obj=None): common = (_("Feeder"), {"fields": MANAGED_NODE_COMMON_FIELDS}) channels = ( @@ -528,12 +601,14 @@ def get_fieldsets(self, request, obj=None): if obj and obj.protocol == Protocol.MESHCORE: mc_identity = (_("MeshCore identity"), {"fields": ("mc_pubkey", "display_id")}) mc_channels = ( - _("MeshCore channels"), + _("MeshCore channels (device mirror)"), { - "fields": ("mc_channels", "mc_channels_synced_at"), + "fields": ("mc_channels_mirror", "mc_channels_synced_at"), "description": _( - "Mirror of the feeder device channel table (updated by bot sync). " - "Edit links only; channel rows live under Message channels." + "Read-only snapshot from the feeder device (bot channel sync). " + "Edit constellation channel definitions under MeshCore channels, " + "then use the admin action “Push MC channel config to feeder device” " + "to apply this mirror to the radio." ), }, ) From 053df0c94264e87308685fb989429ea2f71f0467 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Thu, 21 May 2026 14:44:20 +0100 Subject: [PATCH 8/8] chore: lint --- Meshflow/common/tests/test_feeder_ws.py | 3 ++- Meshflow/ws/consumers.py | 3 +-- Meshflow/ws/tests/test_mc_node_consumer.py | 10 +++------- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/Meshflow/common/tests/test_feeder_ws.py b/Meshflow/common/tests/test_feeder_ws.py index 4122a78..3a265a0 100644 --- a/Meshflow/common/tests/test_feeder_ws.py +++ b/Meshflow/common/tests/test_feeder_ws.py @@ -3,9 +3,10 @@ import time from unittest.mock import AsyncMock, MagicMock +from django.utils.translation import gettext_lazy + import msgpack import pytest -from django.utils.translation import gettext_lazy from common.feeder_ws import _ws_json_safe, dispatch_node_command, feeder_ws_group_has_subscribers diff --git a/Meshflow/ws/consumers.py b/Meshflow/ws/consumers.py index 312fdf1..7338429 100644 --- a/Meshflow/ws/consumers.py +++ b/Meshflow/ws/consumers.py @@ -109,8 +109,7 @@ def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): mc_auths = [a for a in auths if a.node.protocol == Protocol.MESHCORE] if len(mc_auths) > 1: logger.warning( - "NodeConsumer: API key linked to %s MC feeders; " - "pass feeder_pubkey_prefix on ws/nodes/", + "NodeConsumer: API key linked to %s MC feeders; " "pass feeder_pubkey_prefix on ws/nodes/", len(mc_auths), ) return None diff --git a/Meshflow/ws/tests/test_mc_node_consumer.py b/Meshflow/ws/tests/test_mc_node_consumer.py index 8c7234d..f210d48 100644 --- a/Meshflow/ws/tests/test_mc_node_consumer.py +++ b/Meshflow/ws/tests/test_mc_node_consumer.py @@ -6,7 +6,6 @@ from channels.testing import WebsocketCommunicator import Meshflow.routing # noqa: F401 - from common.protocol import Protocol from common.ws_groups import managed_node_ws_group @@ -18,9 +17,7 @@ @pytest.mark.django_db @pytest.mark.asyncio -async def test_mc_consumer_requires_prefix_when_multiple_feeders( - create_managed_node, create_node_api_key -): +async def test_mc_consumer_requires_prefix_when_multiple_feeders(create_managed_node, create_node_api_key): from nodes.models import NodeAuth @database_sync_to_async @@ -55,10 +52,9 @@ def setup(): @pytest.mark.django_db @pytest.mark.asyncio -async def test_mc_consumer_accepts_feeder_pubkey_prefix( - create_managed_node, create_node_api_key -): +async def test_mc_consumer_accepts_feeder_pubkey_prefix(create_managed_node, create_node_api_key): from channels.layers import get_channel_layer + from nodes.models import NodeAuth @database_sync_to_async