diff --git a/.cursorignore b/.cursorignore index 3b72e12..dbd0569 100644 --- a/.cursorignore +++ b/.cursorignore @@ -72,3 +72,6 @@ remote_db_dump.sql # Tailwind / CSS build artifact (if generated locally) tailwind.css + +# Secrets +.env diff --git a/Meshflow/common/feeder_ws.py b/Meshflow/common/feeder_ws.py index a8e5fcb..4c73072 100644 --- a/Meshflow/common/feeder_ws.py +++ b/Meshflow/common/feeder_ws.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time from channels.layers import get_channel_layer @@ -12,16 +13,65 @@ COMMAND_DISPATCH_UNAVAILABLE = "command_dispatch_unavailable" +def _ws_json_safe(value): + """Recursively coerce channel-layer payloads to msgpack-safe plain Python.""" + from django.utils.functional import Promise + + if isinstance(value, Promise): + return str(value) + if isinstance(value, dict): + return {str(k): _ws_json_safe(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [_ws_json_safe(v) for v in value] + if isinstance(value, (str, int, float, bool)) or value is None: + return value + return str(value) + + +async def _redis_group_has_channels(layer, group: str) -> bool: + """Presence check for channels_redis (group is a ZSET at asgi:group:{name}).""" + key = layer._group_key(group) + connection = layer.connection(layer.consistent_hash(group)) + await connection.zremrangebyscore( + key, + min=0, + max=int(time.time()) - layer.group_expiry, + ) + names = await connection.zrange(key, 0, -1) + return bool(names) + + +async def _inmemory_group_has_channels(layer, group: str) -> bool: + """Presence check for InMemoryChannelLayer (tests).""" + members = layer.groups.get(group) or {} + now = time.time() + return any(now - ts < layer.group_expiry for ts in members.values()) + + async def feeder_ws_group_has_subscribers(group: str) -> bool: """Return True if at least one bot WebSocket is subscribed to ``group``.""" channel_layer = get_channel_layer() if channel_layer is None: return False - if not hasattr(channel_layer, "group_channels"): - logger.warning("Channel layer does not support group_channels; assuming feeder offline") - return False - channels = await channel_layer.group_channels(group) - return bool(channels) + + try: + from channels.layers import InMemoryChannelLayer + from channels_redis.core import RedisChannelLayer + except ImportError: + RedisChannelLayer = None # type: ignore[misc, assignment] + InMemoryChannelLayer = None # type: ignore[misc, assignment] + + if RedisChannelLayer is not None and isinstance(channel_layer, RedisChannelLayer): + return await _redis_group_has_channels(channel_layer, group) + + if InMemoryChannelLayer is not None and isinstance(channel_layer, InMemoryChannelLayer): + return await _inmemory_group_has_channels(channel_layer, group) + + logger.warning( + "Channel layer %s has no membership probe; assuming feeder offline", + type(channel_layer).__name__, + ) + return False async def dispatch_node_command(group: str, command: dict) -> None: @@ -31,8 +81,10 @@ async def dispatch_node_command(group: str, command: dict) -> None: raise RuntimeError("Channel layer is not configured") await channel_layer.group_send( group, - { - "type": "node_command", - "command": command, - }, + _ws_json_safe( + { + "type": "node_command", + "command": command, + } + ), ) diff --git a/Meshflow/common/mc_channel_labels.py b/Meshflow/common/mc_channel_labels.py new file mode 100644 index 0000000..ff63b80 --- /dev/null +++ b/Meshflow/common/mc_channel_labels.py @@ -0,0 +1,50 @@ +"""Display labels and apply payloads for MeshCore MessageChannel rows.""" + +from __future__ import annotations + +from constellations.models import MeshCoreChannelType, MessageChannel + + +def mc_channel_admin_label(channel: MessageChannel) -> str: + """Human label for admin lists: #hashtag for HASHTAG, plain name for PUBLIC.""" + if channel.mc_channel_type == MeshCoreChannelType.HASHTAG: + tag = (channel.mc_hashtag or channel.name or "").strip().lstrip("#") + if tag: + return f"#{tag}" + name = (channel.name or "").strip() + if name: + return name + if channel.mc_channel_idx is not None: + return f"slot {channel.mc_channel_idx}" + return str(channel.pk) + + +def mc_channel_type_name(channel: MessageChannel) -> str: + if channel.mc_channel_type is None: + return "—" + return MeshCoreChannelType(channel.mc_channel_type).name + + +def message_channel_to_apply_entry(channel: MessageChannel) -> dict: + """Build one apply_mc_channel_config entry from a MessageChannel row.""" + ch_type = mc_channel_type_name(channel) + if ch_type == "—": + ch_type = "PUBLIC" + entry = { + "mc_channel_idx": channel.mc_channel_idx, + "name": channel.name, + "mc_channel_type": ch_type, + } + if channel.mc_channel_type == MeshCoreChannelType.HASHTAG: + tag = (channel.mc_hashtag or channel.name or "").strip().lstrip("#") + entry["mc_hashtag"] = tag[:64] if tag else None + if tag: + entry["name"] = tag[:100] + return entry + + +def managed_node_mc_channels_queryset(managed_node): + """MC channel rows linked on a MeshCore feeder (device mirror).""" + from common.protocol import Protocol + + return managed_node.mc_channels.filter(protocol=Protocol.MESHCORE).order_by("mc_channel_idx") diff --git a/Meshflow/common/tests/test_feeder_ws.py b/Meshflow/common/tests/test_feeder_ws.py new file mode 100644 index 0000000..3a265a0 --- /dev/null +++ b/Meshflow/common/tests/test_feeder_ws.py @@ -0,0 +1,103 @@ +"""Tests for feeder WebSocket presence helpers.""" + +import time +from unittest.mock import AsyncMock, MagicMock + +from django.utils.translation import gettext_lazy + +import msgpack +import pytest + +from common.feeder_ws import _ws_json_safe, dispatch_node_command, feeder_ws_group_has_subscribers + + +@pytest.mark.asyncio +async def test_redis_group_has_subscribers_when_zset_non_empty(monkeypatch): + from channels_redis.core import RedisChannelLayer + + layer = MagicMock(spec=RedisChannelLayer) + layer._group_key.return_value = "asgi:group:node_mc_test" + layer.consistent_hash.return_value = 0 + layer.group_expiry = 86400 + connection = AsyncMock() + connection.zrange.return_value = [b"specific.channel.name"] + layer.connection.return_value = connection + + monkeypatch.setattr( + "common.feeder_ws.get_channel_layer", + lambda: layer, + ) + + assert await feeder_ws_group_has_subscribers("node_mc_test") is True + connection.zremrangebyscore.assert_awaited_once() + connection.zrange.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_redis_group_has_no_subscribers_when_zset_empty(monkeypatch): + from channels_redis.core import RedisChannelLayer + + layer = MagicMock(spec=RedisChannelLayer) + layer._group_key.return_value = "asgi:group:node_mc_empty" + layer.consistent_hash.return_value = 0 + layer.group_expiry = 86400 + connection = AsyncMock() + connection.zrange.return_value = [] + layer.connection.return_value = connection + + monkeypatch.setattr( + "common.feeder_ws.get_channel_layer", + lambda: layer, + ) + + assert await feeder_ws_group_has_subscribers("node_mc_empty") is False + + +@pytest.mark.asyncio +async def test_inmemory_group_has_subscribers(monkeypatch): + from channels.layers import InMemoryChannelLayer + + layer = InMemoryChannelLayer() + layer.groups["node_mc_mem"] = {"test.channel": time.time()} + + monkeypatch.setattr( + "common.feeder_ws.get_channel_layer", + lambda: layer, + ) + + assert await feeder_ws_group_has_subscribers("node_mc_mem") is True + + +def test_ws_json_safe_coerces_lazy_translation_proxy(): + payload = { + "type": "node_command", + "command": { + "type": "apply_mc_channels", + "channels": [ + { + "mc_channel_type": gettext_lazy("HASHTAG"), + "name": "test", + } + ], + }, + } + safe = _ws_json_safe(payload) + msgpack.packb(safe) + assert safe["command"]["channels"][0]["mc_channel_type"] == "HASHTAG" + + +@pytest.mark.asyncio +async def test_dispatch_node_command_sends_msgpack_safe_payload(monkeypatch): + layer = AsyncMock() + monkeypatch.setattr("common.feeder_ws.get_channel_layer", lambda: layer) + + command = { + "type": "apply_mc_channels", + "channels": [{"mc_channel_type": gettext_lazy("PUBLIC"), "name": "x"}], + } + await dispatch_node_command("node_mc_test", command) + + layer.group_send.assert_awaited_once() + message = layer.group_send.await_args[0][1] + msgpack.packb(message) + assert message["command"]["channels"][0]["mc_channel_type"] == "PUBLIC" diff --git a/Meshflow/common/tests/test_mc_channel_labels.py b/Meshflow/common/tests/test_mc_channel_labels.py new file mode 100644 index 0000000..f3c7cfb --- /dev/null +++ b/Meshflow/common/tests/test_mc_channel_labels.py @@ -0,0 +1,54 @@ +"""Tests for MeshCore channel admin labels and apply payloads.""" + +import pytest + +from common.mc_channel_labels import ( + mc_channel_admin_label, + message_channel_to_apply_entry, +) +from common.protocol import Protocol +from constellations.models import MeshCoreChannelType, MessageChannel + + +@pytest.mark.django_db +def test_mc_channel_admin_label_public(create_constellation): + constellation = create_constellation() + ch = MessageChannel.objects.create( + name="Public", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=0, + mc_channel_type=MeshCoreChannelType.PUBLIC, + ) + assert mc_channel_admin_label(ch) == "Public" + + +@pytest.mark.django_db +def test_mc_channel_admin_label_hashtag_prefix(create_constellation): + constellation = create_constellation() + ch = MessageChannel.objects.create( + name="galloway", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=1, + mc_channel_type=MeshCoreChannelType.HASHTAG, + mc_hashtag="galloway", + ) + assert mc_channel_admin_label(ch) == "#galloway" + + +@pytest.mark.django_db +def test_message_channel_to_apply_entry_hashtag(create_constellation): + constellation = create_constellation() + ch = MessageChannel.objects.create( + name="galloway", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=1, + mc_channel_type=MeshCoreChannelType.HASHTAG, + mc_hashtag="galloway", + ) + entry = message_channel_to_apply_entry(ch) + assert entry["mc_channel_type"] == "HASHTAG" + assert entry["mc_hashtag"] == "galloway" + assert entry["name"] == "galloway" diff --git a/Meshflow/constellations/admin.py b/Meshflow/constellations/admin.py index 646df16..3bf4458 100644 --- a/Meshflow/constellations/admin.py +++ b/Meshflow/constellations/admin.py @@ -3,7 +3,15 @@ from django.utils.html import format_html from django.utils.translation import gettext_lazy as _ -from .models import Constellation, ConstellationUserMembership, MessageChannel +from common.mc_channel_labels import mc_channel_admin_label, mc_channel_type_name +from common.protocol import Protocol + +from .models import ( + Constellation, + ConstellationUserMembership, + MeshCoreMessageChannel, + MessageChannel, +) class ConstellationAdminForm(forms.ModelForm): @@ -126,11 +134,61 @@ def get_constellation_creator(self, obj): @admin.register(MessageChannel) class MessageChannelAdmin(admin.ModelAdmin): - list_display = ("id", "name", "protocol", "mc_channel_idx", "constellation") + """Meshtastic and legacy rows; MeshCore operators should use MeshCore channels.""" + + list_display = ("id", "name", "protocol", "constellation") list_filter = ( ("protocol", admin.ChoicesFieldListFilter), "constellation", - ("mc_channel_idx", admin.EmptyFieldListFilter), ) - search_fields = ("name", "id") - ordering = ("name",) + search_fields = ("name", "id", "constellation__name") + ordering = ("constellation__name", "name") + list_select_related = ("constellation",) + + +@admin.register(MeshCoreMessageChannel) +class MeshCoreMessageChannelAdmin(admin.ModelAdmin): + """Constellation MC channel catalog (device slots). Push to radio from Managed node admin.""" + + list_display = ( + "mc_channel_idx", + "admin_label", + "mc_channel_type_display", + "constellation", + "id", + ) + list_filter = ( + ("mc_channel_type", admin.ChoicesFieldListFilter), + "constellation", + ) + search_fields = ("name", "mc_hashtag", "constellation__name") + ordering = ("constellation__name", "mc_channel_idx") + list_select_related = ("constellation",) + fieldsets = ( + (None, {"fields": ("constellation", "mc_channel_idx")}), + ( + _("Channel"), + { + "fields": ("name", "mc_channel_type", "mc_hashtag"), + "description": _( + "PUBLIC channels use a plain name. HASHTAG channels use mc_hashtag " + "(no leading #); lists show #prefix for hashtags." + ), + }, + ), + ) + + def get_queryset(self, request): + return super().get_queryset(request).filter(protocol=Protocol.MESHCORE) + + def save_model(self, request, obj, form, change): + obj.protocol = Protocol.MESHCORE + super().save_model(request, obj, form, change) + + @admin.display(description=_("Label"), ordering="name") + def admin_label(self, obj): + return mc_channel_admin_label(obj) + + @admin.display(description=_("Type"), ordering="mc_channel_type") + def mc_channel_type_display(self, obj): + return mc_channel_type_name(obj) diff --git a/Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py b/Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py new file mode 100644 index 0000000..e05dc47 --- /dev/null +++ b/Meshflow/constellations/migrations/0010_meshcore_message_channel_proxy.py @@ -0,0 +1,26 @@ +# Generated by Django 5.2.14 on 2026-05-21 13:33 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('constellations', '0009_messagechannel_mc_type_hashtag'), + ] + + operations = [ + migrations.CreateModel( + name='MeshCoreMessageChannel', + fields=[ + ], + options={ + 'verbose_name': 'MeshCore channel', + 'verbose_name_plural': 'MeshCore channels', + 'proxy': True, + 'indexes': [], + 'constraints': [], + }, + bases=('constellations.messagechannel',), + ), + ] diff --git a/Meshflow/constellations/models.py b/Meshflow/constellations/models.py index ace3063..7abb55e 100644 --- a/Meshflow/constellations/models.py +++ b/Meshflow/constellations/models.py @@ -99,3 +99,12 @@ class Meta: def __str__(self): return self.name + + +class MeshCoreMessageChannel(MessageChannel): + """Proxy for Django admin: MeshCore channel rows only.""" + + class Meta: + proxy = True + verbose_name = _("MeshCore channel") + verbose_name_plural = _("MeshCore channels") diff --git a/Meshflow/meshcore_packets/serializers_channel.py b/Meshflow/meshcore_packets/serializers_channel.py index 103bac9..1c59902 100644 --- a/Meshflow/meshcore_packets/serializers_channel.py +++ b/Meshflow/meshcore_packets/serializers_channel.py @@ -4,13 +4,17 @@ from constellations.models import MeshCoreChannelType, MessageChannel +# Wire/API strings (not gettext labels — lazy __proxy__ breaks Channels msgpack). +MC_CHANNEL_TYPE_API_CHOICES = [ + ("PUBLIC", "PUBLIC"), + ("HASHTAG", "HASHTAG"), +] + class McChannelSnapshotEntrySerializer(serializers.Serializer): mc_channel_idx = serializers.IntegerField(min_value=0, max_value=63) name = serializers.CharField(max_length=100) - mc_channel_type = serializers.ChoiceField( - choices=[(c.label, c.label) for c in MeshCoreChannelType], - ) + mc_channel_type = serializers.ChoiceField(choices=MC_CHANNEL_TYPE_API_CHOICES) mc_hashtag = serializers.CharField(max_length=64, required=False, allow_null=True, allow_blank=True) @@ -35,15 +39,13 @@ class Meta: def get_mc_channel_type(self, obj): if obj.mc_channel_type is None: return None - return MeshCoreChannelType(obj.mc_channel_type).label + return MeshCoreChannelType(obj.mc_channel_type).name class McChannelApplyEntrySerializer(serializers.Serializer): mc_channel_idx = serializers.IntegerField(min_value=0, max_value=63, required=False) name = serializers.CharField(max_length=100) - mc_channel_type = serializers.ChoiceField( - choices=[(c.label, c.label) for c in MeshCoreChannelType], - ) + mc_channel_type = serializers.ChoiceField(choices=MC_CHANNEL_TYPE_API_CHOICES) mc_hashtag = serializers.CharField(max_length=64, required=False, allow_null=True, allow_blank=True) @@ -53,7 +55,7 @@ class McChannelApplySerializer(serializers.Serializer): def validate(self, attrs): channels = attrs.get("channels") or [] for entry in channels: - if entry.get("mc_channel_type") == MeshCoreChannelType.HASHTAG.label: + if str(entry.get("mc_channel_type")).upper() == "HASHTAG": tag = (entry.get("mc_hashtag") or entry.get("name") or "").strip().lstrip("#") if not tag: raise serializers.ValidationError({"channels": "Hashtag channels require a non-empty hashtag."}) diff --git a/Meshflow/meshcore_packets/services/channel_apply.py b/Meshflow/meshcore_packets/services/channel_apply.py new file mode 100644 index 0000000..bf08643 --- /dev/null +++ b/Meshflow/meshcore_packets/services/channel_apply.py @@ -0,0 +1,67 @@ +"""Push MeshCore channel config to a connected feeder bot via WebSocket.""" + +from __future__ import annotations + +import logging + +from asgiref.sync import async_to_sync + +from common.feeder_ws import ( + COMMAND_DISPATCH_UNAVAILABLE, + FEEDER_BOT_NOT_CONNECTED, + dispatch_node_command, + feeder_ws_group_has_subscribers, +) +from common.mc_channel_labels import managed_node_mc_channels_queryset, message_channel_to_apply_entry +from common.protocol import Protocol +from common.ws_groups import managed_node_ws_group +from nodes.models import ManagedNode + +logger = logging.getLogger(__name__) + + +def build_apply_channels_for_managed_node(managed_node: ManagedNode) -> list[dict]: + """Snapshot entries for apply_mc_channel_config from the feeder mirror.""" + if managed_node.protocol != Protocol.MESHCORE: + return [] + return [message_channel_to_apply_entry(ch) for ch in managed_node_mc_channels_queryset(managed_node)] + + +def dispatch_mc_channel_apply(managed_node: ManagedNode, channels: list[dict]) -> str: + """Dispatch apply_mc_channel_config. Returns ``sent`` or an error code string.""" + group = managed_node_ws_group(managed_node) + + async def _check_and_send() -> str: + try: + if not await feeder_ws_group_has_subscribers(group): + logger.warning( + "MC channel apply: no WebSocket subscriber on group %s", + group, + ) + return FEEDER_BOT_NOT_CONNECTED + except Exception as exc: + logger.exception("MC channel apply: feeder presence check failed: %s", exc) + return COMMAND_DISPATCH_UNAVAILABLE + + try: + await dispatch_node_command( + group, + { + "type": "apply_mc_channel_config", + "channels": channels, + }, + ) + except Exception as exc: + logger.exception("MC channel apply: group_send failed: %s", exc) + return COMMAND_DISPATCH_UNAVAILABLE + return "sent" + + return async_to_sync(_check_and_send)() + + +def apply_mc_channels_to_feeder(managed_node: ManagedNode, channels: list[dict] | None = None) -> str: + """Push channel config to the feeder bot. Uses mirror when *channels* is omitted.""" + if managed_node.protocol != Protocol.MESHCORE: + raise ValueError("apply_mc_channels_to_feeder requires a MeshCore managed node") + payload = channels if channels is not None else build_apply_channels_for_managed_node(managed_node) + return dispatch_mc_channel_apply(managed_node, payload) diff --git a/Meshflow/meshcore_packets/tests/test_apply_channel.py b/Meshflow/meshcore_packets/tests/test_apply_channel.py index 8ef77ae..af50684 100644 --- a/Meshflow/meshcore_packets/tests/test_apply_channel.py +++ b/Meshflow/meshcore_packets/tests/test_apply_channel.py @@ -24,7 +24,7 @@ def test_apply_returns_503_when_feeder_not_connected(create_user, create_managed url = reverse("meshcore-apply-mc-channel-config", kwargs={"internal_id": node.internal_id}) with patch( - "meshcore_packets.views.feeder_ws_group_has_subscribers", + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", new_callable=AsyncMock, return_value=False, ): @@ -62,12 +62,12 @@ def test_apply_returns_503_when_dispatch_fails(create_user, create_managed_node) with ( patch( - "meshcore_packets.views.feeder_ws_group_has_subscribers", + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", new_callable=AsyncMock, return_value=True, ), patch( - "meshcore_packets.views.dispatch_node_command", + "meshcore_packets.services.channel_apply.dispatch_node_command", new_callable=AsyncMock, side_effect=RuntimeError("TCPTransport closed"), ), @@ -104,13 +104,13 @@ def test_apply_dispatches_when_feeder_connected(create_user, create_managed_node with ( patch( - "meshcore_packets.views.feeder_ws_group_has_subscribers", + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", new_callable=AsyncMock, return_value=True, ), patch( - "meshcore_packets.views.dispatch_node_command", - new_callable=AsyncMock, + "meshcore_packets.views.apply_mc_channels_to_feeder", + return_value="sent", ) as dispatch_mock, ): response = client.post( @@ -129,7 +129,9 @@ def test_apply_dispatches_when_feeder_connected(create_user, create_managed_node ) assert response.status_code == 202 - dispatch_mock.assert_awaited_once() - sent_channels = dispatch_mock.await_args[0][1]["channels"] + dispatch_mock.assert_called_once() + sent_channels = dispatch_mock.call_args[0][1] assert sent_channels[0]["mc_hashtag"] == "galloway" assert sent_channels[0]["name"] == "galloway" + assert sent_channels[0]["mc_channel_type"] == "HASHTAG" + assert type(sent_channels[0]["mc_channel_type"]) is str diff --git a/Meshflow/meshcore_packets/tests/test_channel_apply_service.py b/Meshflow/meshcore_packets/tests/test_channel_apply_service.py new file mode 100644 index 0000000..6065372 --- /dev/null +++ b/Meshflow/meshcore_packets/tests/test_channel_apply_service.py @@ -0,0 +1,52 @@ +"""Tests for channel_apply service.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from common.protocol import Protocol +from constellations.models import MeshCoreChannelType, MessageChannel +from meshcore_packets.services.channel_apply import build_apply_channels_for_managed_node + + +@pytest.mark.django_db +def test_build_apply_channels_for_managed_node(meshcore_feeder): + node = meshcore_feeder["node"] + constellation = node.constellation + ch = MessageChannel.objects.create( + name="tag", + constellation=constellation, + protocol=Protocol.MESHCORE, + mc_channel_idx=2, + mc_channel_type=MeshCoreChannelType.HASHTAG, + mc_hashtag="meshflow", + ) + node.mc_channels.add(ch) + + payload = build_apply_channels_for_managed_node(node) + assert len(payload) == 1 + assert payload[0]["mc_channel_idx"] == 2 + assert payload[0]["mc_channel_type"] == "HASHTAG" + assert payload[0]["mc_hashtag"] == "meshflow" + + +@pytest.mark.django_db +def test_apply_mc_channels_to_feeder_dispatches(meshcore_feeder): + from meshcore_packets.services.channel_apply import apply_mc_channels_to_feeder + + node = meshcore_feeder["node"] + with ( + patch( + "meshcore_packets.services.channel_apply.feeder_ws_group_has_subscribers", + new_callable=AsyncMock, + return_value=True, + ), + patch( + "meshcore_packets.services.channel_apply.dispatch_node_command", + new_callable=AsyncMock, + ) as dispatch_mock, + ): + result = apply_mc_channels_to_feeder(node, [{"mc_channel_idx": 0, "name": "x", "mc_channel_type": "PUBLIC"}]) + + assert result == "sent" + dispatch_mock.assert_awaited_once() diff --git a/Meshflow/meshcore_packets/views.py b/Meshflow/meshcore_packets/views.py index f0757b9..e8a60b6 100644 --- a/Meshflow/meshcore_packets/views.py +++ b/Meshflow/meshcore_packets/views.py @@ -4,18 +4,11 @@ from django.utils import timezone -from asgiref.sync import async_to_sync from rest_framework import generics, permissions, serializers, status from rest_framework.response import Response from rest_framework.views import APIView -from common.feeder_ws import ( - COMMAND_DISPATCH_UNAVAILABLE, - FEEDER_BOT_NOT_CONNECTED, - dispatch_node_command, - feeder_ws_group_has_subscribers, -) -from common.ws_groups import managed_node_ws_group +from common.feeder_ws import COMMAND_DISPATCH_UNAVAILABLE, FEEDER_BOT_NOT_CONNECTED from meshcore_packets.models import MeshCoreRawPacket, MeshCoreTextPacket from meshcore_packets.permissions import MeshCoreFeederPermission from meshcore_packets.serializers import MeshCorePacketIngestSerializer, MeshCorePacketListSerializer @@ -24,6 +17,7 @@ McChannelSyncSerializer, MessageChannelMcSerializer, ) +from meshcore_packets.services.channel_apply import apply_mc_channels_to_feeder from meshcore_packets.services.channel_sync import reconcile_mc_channels from meshcore_packets.signals import meshcore_packet_received, meshcore_text_packet_received from nodes.authentication import NodeAPIKeyAuthentication @@ -177,34 +171,6 @@ def post(self, request, feeder_pubkey_prefix, format=None): ) -def _dispatch_mc_channel_apply(managed_node: ManagedNode, channels: list[dict]) -> str: - """Dispatch apply_mc_channel_config. Returns ``sent`` or an error code string.""" - group = managed_node_ws_group(managed_node) - - async def _check_and_send() -> str: - try: - if not await feeder_ws_group_has_subscribers(group): - return FEEDER_BOT_NOT_CONNECTED - except Exception as exc: - logger.exception("MC channel apply: feeder presence check failed: %s", exc) - return COMMAND_DISPATCH_UNAVAILABLE - - try: - await dispatch_node_command( - group, - { - "type": "apply_mc_channel_config", - "channels": channels, - }, - ) - except Exception as exc: - logger.exception("MC channel apply: group_send failed: %s", exc) - return COMMAND_DISPATCH_UNAVAILABLE - return "sent" - - return async_to_sync(_check_and_send)() - - class ManagedNodeMcChannelApplyView(APIView): """POST desired MC channels; pushes apply_mc_channel_config to connected feeder bot.""" @@ -224,13 +190,15 @@ def post(self, request, internal_id, format=None): return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) channels = serializer.validated_data["channels"] - result = _dispatch_mc_channel_apply(managed_node, channels) + result = apply_mc_channels_to_feeder(managed_node, channels) if result == FEEDER_BOT_NOT_CONNECTED: return Response( { "detail": ( "Feeder bot is not connected via WebSocket. " - "Start the bot with MESHCORE_UPLOAD_ENABLED and MESHFLOW_WS_URL configured." + "Start the bot with MESHCORE_UPLOAD_ENABLED and MESHFLOW_WS_URL configured. " + "For shared API keys, the bot must connect with " + "feeder_pubkey_prefix in the WebSocket URL (same 12-hex prefix as ingest)." ), "code": FEEDER_BOT_NOT_CONNECTED, }, diff --git a/Meshflow/nodes/admin.py b/Meshflow/nodes/admin.py index 45fec26..257b5fa 100644 --- a/Meshflow/nodes/admin.py +++ b/Meshflow/nodes/admin.py @@ -1,17 +1,25 @@ from django import forms -from django.contrib import admin +from django.contrib import admin, messages from django.contrib.admin.widgets import FilteredSelectMultiple from django.core.exceptions import ObjectDoesNotExist from django.db import transaction +from django.utils.html import format_html, format_html_join from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ +from common.feeder_ws import COMMAND_DISPATCH_UNAVAILABLE, FEEDER_BOT_NOT_CONNECTED +from common.mc_channel_labels import ( + managed_node_mc_channels_queryset, + mc_channel_admin_label, + mc_channel_type_name, +) from common.mesh_node_helpers import ( meshtastic_hex_to_int, meshtastic_id_to_hex, observed_node_search_conditions, ) from common.protocol import Protocol +from meshcore_packets.services.channel_apply import apply_mc_channels_to_feeder from .models import ManagedNode, ManagedNodeStatus, NodeAPIKey, NodeAuth, NodeLatestStatus, ObservedNode @@ -443,9 +451,49 @@ def save(self, commit=True): return super().save(commit=commit) +@admin.action(description=_("Push MC channel config to feeder device")) +def push_mc_channels_to_feeder(modeladmin, request, queryset): + for node in queryset: + if node.protocol != Protocol.MESHCORE: + modeladmin.message_user( + request, + _("%(name)s is not a MeshCore feeder.") % {"name": node.name}, + level=messages.WARNING, + ) + continue + channel_count = managed_node_mc_channels_queryset(node).count() + if channel_count == 0: + modeladmin.message_user( + request, + _("%(name)s has no synced MC channels to push.") % {"name": node.name}, + level=messages.WARNING, + ) + continue + result = apply_mc_channels_to_feeder(node) + if result == FEEDER_BOT_NOT_CONNECTED: + modeladmin.message_user( + request, + _("%(name)s: feeder bot not connected via WebSocket.") % {"name": node.name}, + level=messages.ERROR, + ) + elif result == COMMAND_DISPATCH_UNAVAILABLE: + modeladmin.message_user( + request, + _("%(name)s: could not dispatch to channel layer.") % {"name": node.name}, + level=messages.ERROR, + ) + else: + modeladmin.message_user( + request, + _("%(name)s: pushed %(count)s channel(s) to feeder.") % {"name": node.name, "count": channel_count}, + level=messages.SUCCESS, + ) + + @admin.register(ManagedNode) class ManagedNodeAdmin(admin.ModelAdmin): form = ManagedNodeAdminForm + actions = [push_mc_channels_to_feeder] list_display = ( "protocol", "meshtastic_node_id", @@ -453,6 +501,8 @@ class ManagedNodeAdmin(admin.ModelAdmin): "name", "owner", "constellation", + "mc_channel_count", + "mc_channels_synced_at", "allow_auto_traceroute", "status_is_sending_data", "status_last_packet_ingested_at", @@ -468,6 +518,7 @@ class ManagedNodeAdmin(admin.ModelAdmin): search_fields = ( "meshtastic_node_id", "name", + "mc_pubkey", "owner__username", "owner__email", ) @@ -496,10 +547,46 @@ def get_queryset(self, request): def display_id(self, obj): return obj.node_id_str + @admin.display(description=_("MC channels")) + def mc_channel_count(self, obj): + if obj.protocol != Protocol.MESHCORE: + return "—" + return obj.mc_channels.count() + + @admin.display(description=_("Device channels (read-only)")) + def mc_channels_mirror(self, obj): + if obj is None or obj.protocol != Protocol.MESHCORE: + return "—" + rows = list(managed_node_mc_channels_queryset(obj)) + if not rows: + return format_html( + "

{}

", + _("No channels synced from device yet. Connect the bot to populate this mirror."), + ) + row_html = format_html_join( + "", + "{}{}{}", + ( + ( + ch.mc_channel_idx if ch.mc_channel_idx is not None else "—", + mc_channel_type_name(ch), + mc_channel_admin_label(ch), + ) + for ch in rows + ), + ) + return format_html( + "{}
{}{}{}
", + _("Slot"), + _("Type"), + _("Label"), + row_html, + ) + def get_readonly_fields(self, request, obj=None): if obj and obj.protocol == Protocol.MESHCORE: - return ("display_id",) - return () + return ("display_id", "mc_channels_mirror", "mc_channels_synced_at") + return ("mc_channels_synced_at",) def get_fieldsets(self, request, obj=None): common = (_("Feeder"), {"fields": MANAGED_NODE_COMMON_FIELDS}) @@ -512,8 +599,20 @@ def get_fieldsets(self, request, obj=None): }, ) if obj and obj.protocol == Protocol.MESHCORE: - mc_fields = (_("MeshCore identity"), {"fields": ("mc_pubkey", "display_id")}) - return (common, mc_fields) + mc_identity = (_("MeshCore identity"), {"fields": ("mc_pubkey", "display_id")}) + mc_channels = ( + _("MeshCore channels (device mirror)"), + { + "fields": ("mc_channels_mirror", "mc_channels_synced_at"), + "description": _( + "Read-only snapshot from the feeder device (bot channel sync). " + "Edit constellation channel definitions under MeshCore channels, " + "then use the admin action “Push MC channel config to feeder device” " + "to apply this mirror to the radio." + ), + }, + ) + return (common, mc_identity, mc_channels) return (common, channels) diff --git a/Meshflow/ws/consumers.py b/Meshflow/ws/consumers.py index e6db148..7338429 100644 --- a/Meshflow/ws/consumers.py +++ b/Meshflow/ws/consumers.py @@ -31,9 +31,16 @@ async def connect(self): await self.close() return - managed_node = await self._validate_api_key(api_key) + feeder_pubkey_prefix = query_params.get("feeder_pubkey_prefix") + managed_node = await self._validate_api_key( + api_key, + feeder_pubkey_prefix=feeder_pubkey_prefix, + ) if not managed_node: - logger.warning("NodeConsumer: invalid or inactive api_key") + logger.warning( + "NodeConsumer: invalid api_key or feeder_pubkey_prefix=%s", + feeder_pubkey_prefix, + ) await self.close() return @@ -45,7 +52,9 @@ async def connect(self): await self.channel_layer.group_add(self.node_group, self.channel_name) await self.accept() logger.info( - f"NodeConsumer: bot connected for node {managed_node.meshtastic_node_id} ({managed_node.node_id_str})" + "NodeConsumer: bot connected for %s group=%s", + managed_node.node_id_str, + self.node_group, ) async def disconnect(self, close_code): @@ -65,8 +74,13 @@ async def node_command(self, event): await self.send(text_data=json.dumps(command)) @database_sync_to_async - def _validate_api_key(self, api_key): - """Validate NodeAPIKey and return the first linked ManagedNode, or None.""" + def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): + """Validate NodeAPIKey and return the linked ManagedNode, or None.""" + from common.meshcore_feeder_auth import ( + MeshCoreFeederResolutionError, + resolve_meshcore_feeder, + ) + from common.protocol import Protocol from nodes.models import NodeAPIKey, NodeAuth try: @@ -74,13 +88,44 @@ def _validate_api_key(self, api_key): key_obj.last_used = timezone.now() key_obj.save(update_fields=["last_used"]) - node_auth = NodeAuth.objects.filter(api_key=key_obj).select_related("node").first() - if node_auth: - return node_auth.node + if feeder_pubkey_prefix: + try: + return resolve_meshcore_feeder( + api_key=key_obj, + feeder_pubkey_prefix=feeder_pubkey_prefix, + ) + except MeshCoreFeederResolutionError: + return None + + auths = list( + NodeAuth.objects.filter( + api_key=key_obj, + node__deleted_at__isnull=True, + ).select_related("node") + ) + if not auths: + return None + + mc_auths = [a for a in auths if a.node.protocol == Protocol.MESHCORE] + if len(mc_auths) > 1: + logger.warning( + "NodeConsumer: API key linked to %s MC feeders; " "pass feeder_pubkey_prefix on ws/nodes/", + len(mc_auths), + ) + return None + if len(mc_auths) == 1: + return mc_auths[0].node + + if len(auths) > 1: + logger.warning( + "NodeConsumer: API key linked to %s nodes; use feeder_pubkey_prefix for MC", + len(auths), + ) + return auths[0].node except NodeAPIKey.DoesNotExist: pass except Exception as e: - logger.exception(f"NodeConsumer: error validating api_key: {e}") + logger.exception("NodeConsumer: error validating api_key: %s", e) return None diff --git a/Meshflow/ws/tests/test_mc_node_consumer.py b/Meshflow/ws/tests/test_mc_node_consumer.py new file mode 100644 index 0000000..f210d48 --- /dev/null +++ b/Meshflow/ws/tests/test_mc_node_consumer.py @@ -0,0 +1,91 @@ +"""MeshCore NodeConsumer WebSocket tests.""" + +import pytest +from channels.db import database_sync_to_async +from channels.routing import URLRouter +from channels.testing import WebsocketCommunicator + +import Meshflow.routing # noqa: F401 +from common.protocol import Protocol +from common.ws_groups import managed_node_ws_group + +application = URLRouter(Meshflow.routing.websocket_urlpatterns) + +FEEDER_PUBKEY = "1a37f5aea4a1" + ("b" * 52) +FEEDER_PREFIX = "1a37f5aea4a1" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mc_consumer_requires_prefix_when_multiple_feeders(create_managed_node, create_node_api_key): + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + constellation = create_managed_node(protocol=Protocol.MESHCORE).constellation + node_a = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + constellation=constellation, + mc_pubkey=FEEDER_PUBKEY, + ) + node_b = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + constellation=constellation, + mc_pubkey="c" * 64, + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=node_a) + NodeAuth.objects.create(api_key=api_key, node=node_b) + return api_key.key + + api_key = await setup() + communicator = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={api_key}", + ) + connected, _ = await communicator.connect() + assert connected is False + await communicator.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mc_consumer_accepts_feeder_pubkey_prefix(create_managed_node, create_node_api_key): + from channels.layers import get_channel_layer + + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + node = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + mc_pubkey=FEEDER_PUBKEY, + ) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + return { + "api_key": api_key.key, + "group": managed_node_ws_group(node), + } + + data = await setup() + url = f"/ws/nodes/?api_key={data['api_key']}&feeder_pubkey_prefix={FEEDER_PREFIX}" + communicator = WebsocketCommunicator(application, url) + connected, _ = await communicator.connect() + assert connected is True + + channel_layer = get_channel_layer() + await channel_layer.group_send( + data["group"], + { + "type": "node_command", + "command": {"type": "apply_mc_channel_config", "channels": []}, + }, + ) + response = await communicator.receive_json_from() + assert response["type"] == "apply_mc_channel_config" + + await communicator.disconnect() diff --git a/docs/REDIS.md b/docs/REDIS.md index 7979de2..b3f7477 100644 --- a/docs/REDIS.md +++ b/docs/REDIS.md @@ -12,7 +12,7 @@ Configuration lives in [`Meshflow/Meshflow/settings/base.py`](../Meshflow/Meshfl - **Purpose:** Pub/sub and group membership for WebSocket delivery (`channel_layer.group_send` / `group_add`). - **Code:** [`Meshflow/ws/consumers.py`](../Meshflow/ws/consumers.py). - **Consumers / groups:** - - **`NodeConsumer`** (`ws/nodes/?api_key=…`) — bots join group `node_{node_id}`; receives **`node_command`** events (e.g. traceroute commands). Emitters include traceroute views/tasks and management commands (see grep for `group_send` under `Meshflow/`). + - **`NodeConsumer`** (`ws/nodes/?api_key=…`) — feeder bots join group `node_{meshtastic_node_id}` or **`node_mc_{managed_node.internal_id}`** (MeshCore); receives **`node_command`** events (e.g. traceroute, `apply_mc_channel_config`). Any API/ASGI worker can `group_send`; membership is cluster-wide in Redis DB 0. Emitters include traceroute views/tasks, `meshcore_packets` apply-channel, and management commands. - **`TextMessageConsumer`** — authenticated UI clients join group **`text_messages`**; receives **`text_message`** events for mesh text broadcasts ([`Meshflow/ws/services/text_message.py`](../Meshflow/ws/services/text_message.py)). - **`TracerouteConsumer`** (`ws/traceroutes/?token=…`) — clients join group **`traceroutes`**; receives **`traceroute_update`** events ([`Meshflow/traceroute/ws_notify.py`](../Meshflow/traceroute/ws_notify.py)). - **TTL:** Connection/session lifetime; Channels manages internal keys. diff --git a/docs/features/meshcore/text-message-channels.md b/docs/features/meshcore/text-message-channels.md index d3116dd..6711cd2 100644 --- a/docs/features/meshcore/text-message-channels.md +++ b/docs/features/meshcore/text-message-channels.md @@ -78,7 +78,35 @@ sequenceDiagram **Drift:** if the API mirror and device disagree (e.g. failed push), the **next connect sync overwrites the API** from the device. No three-way merge in v1. -Today the bot **does not** start the WebSocket client for `RADIO_PROTOCOL=meshcore` (Meshtastic-only). Enabling WS for MC feeders is part of [#297](https://github.com/pskillen/meshflow-api/issues/297). +The bot starts **`MeshflowWSClient`** for MeshCore when `STORAGE_API_ROOT` + token are set (WebSocket URL derived from the API base URL unless `MESHFLOW_WS_URL` is set). The feeder’s 12-hex pubkey prefix is appended automatically on connect (not an operator env var). + +### Horizontal scaling (API web tier) + +Apply and traceroute commands use **Django Channels + Redis DB 0** ([`docs/REDIS.md`](../../REDIS.md)), not in-process Django signals. Signals only run inside one Python process and do not reach other workers or hosts. + +| Piece | Scales horizontally? | +|-------|----------------------| +| `POST apply-mc-channel-config` on any Gunicorn/Uvicorn worker | Yes — handler calls `channel_layer.group_send` via Redis | +| `feeder_ws_group_has_subscribers` on any worker | Yes — `channels_redis` stores group membership in Redis DB 0 | +| Bot `NodeConsumer` WebSocket | One connection per bot; must land on **some** ASGI instance in the deployment that shares that Redis | +| Browser UI WebSocket (`text_messages`, JWT) | Separate consumer/groups; unrelated to feeder commands | + +You do **not** need a separate “signal” bus for multi-worker API: Redis channel layer already is that bus. + +### Local API + pre-prod bot (common 503 cause) + +Sharing **Postgres** and **Redis** is not enough if the **HTTP hosts differ**: + +- UI / `apply-mc-channel-config` → `http://localhost:8000` (local Django) +- Bot → `STORAGE_API_ROOT=https://pre-prod…/api` and WebSocket to **pre-prod** + +The bot registers in Redis group `node_mc_{uuid}` through **pre-prod’s** `NodeConsumer`. Local Django also reads Redis DB 0, so presence *can* work if both use the same `REDIS_HOST` / password and the managed node `internal_id` + `mc_pubkey` match the device. If local settings use **`InMemoryChannelLayer`** (tests only) or a different Redis DB/host, local apply always sees **503 feeder not connected** even though pre-prod logs show `MeshflowWSClient: connected`. + +**Practical dev setups (pick one):** + +1. Point **meshflow-ui** at pre-prod API (browser and bot share one deployment), or +2. Point **bot** `STORAGE_API_ROOT` at local API and run bot + API locally, or +3. Keep split hosts but verify local `CHANNEL_LAYERS` is `channels_redis` to the **same** Redis DB 0 as pre-prod. **Implementation plan:** Cursor workspace plan `mc_text_textmessage_pipeline_2c3e9fb8.plan.md` (kept in sync with this file; **this doc is canonical** in git for operators and PRs).