From aae79a910466cb7066c846b8a6282697a5d68379 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 09:40:03 +0100 Subject: [PATCH 1/6] feat(common): add resolve_meshtastic_feeder for shared API keys Mirror MeshCore feeder resolution so WebSocket and other callers can pick the correct Meshtastic ManagedNode by nodenum when one key links multiple feeders. --- Meshflow/common/meshtastic_feeder_auth.py | 97 ++++++++++++++ .../tests/test_meshtastic_feeder_auth.py | 123 ++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 Meshflow/common/meshtastic_feeder_auth.py create mode 100644 Meshflow/common/tests/test_meshtastic_feeder_auth.py diff --git a/Meshflow/common/meshtastic_feeder_auth.py b/Meshflow/common/meshtastic_feeder_auth.py new file mode 100644 index 0000000..58b2c22 --- /dev/null +++ b/Meshflow/common/meshtastic_feeder_auth.py @@ -0,0 +1,97 @@ +"""Resolve Meshtastic feeder ManagedNode from API key + node id.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Optional + +from common.mesh_node_helpers import ( + meshtastic_hex_to_int, + normalize_meshtastic_lookup_hex, +) +from common.protocol import Protocol +from nodes.models import NodeAuth + +if TYPE_CHECKING: + from nodes.models import ManagedNode, NodeAPIKey + + +@dataclass(frozen=True) +class MeshtasticFeederResolutionError(Exception): + """Feeder could not be resolved for this request.""" + + code: str + detail: str + + def __str__(self) -> str: + return self.detail + + +def resolve_meshtastic_feeder( + *, + api_key: NodeAPIKey, + feeder_node_id: Optional[int] = None, + feeder_node_id_str: Optional[str] = None, +) -> ManagedNode: + """ + Pick the Meshtastic ManagedNode for a feeder-scoped request. + + Raises MeshtasticFeederResolutionError with a stable ``code`` when resolution fails. + """ + node_id = _resolve_meshtastic_node_id( + feeder_node_id=feeder_node_id, + feeder_node_id_str=feeder_node_id_str, + ) + + try: + node_auth = NodeAuth.objects.select_related("node", "node__constellation").get( + api_key=api_key, + node__protocol=Protocol.MESHTASTIC, + node__meshtastic_node_id=node_id, + node__deleted_at__isnull=True, + ) + except NodeAuth.DoesNotExist: + raise MeshtasticFeederResolutionError( + code="feeder_not_linked", + detail="API key is not linked to a Meshtastic feeder with this node id.", + ) from None + + return node_auth.node + + +def _resolve_meshtastic_node_id( + *, + feeder_node_id: Optional[int] = None, + feeder_node_id_str: Optional[str] = None, +) -> int: + if feeder_node_id is not None and feeder_node_id_str: + raise MeshtasticFeederResolutionError( + code="invalid_feeder_node_id", + detail="Pass feeder_node_id or feeder_node_id_str, not both.", + ) + + if feeder_node_id is not None: + return feeder_node_id + + if feeder_node_id_str: + token = feeder_node_id_str.strip() + hex_part = normalize_meshtastic_lookup_hex(token) + if hex_part: + return int(hex_part, 16) + if token.startswith("!") or token == "^all": + try: + return meshtastic_hex_to_int(token) + except ValueError as exc: + raise MeshtasticFeederResolutionError( + code="invalid_feeder_node_id_str", + detail=str(exc), + ) from exc + raise MeshtasticFeederResolutionError( + code="invalid_feeder_node_id_str", + detail="feeder_node_id_str must be !xxxxxxxx or 8 hex digits.", + ) + + raise MeshtasticFeederResolutionError( + code="missing_feeder_node_id", + detail="feeder_node_id or feeder_node_id_str is required.", + ) diff --git a/Meshflow/common/tests/test_meshtastic_feeder_auth.py b/Meshflow/common/tests/test_meshtastic_feeder_auth.py new file mode 100644 index 0000000..dc18986 --- /dev/null +++ b/Meshflow/common/tests/test_meshtastic_feeder_auth.py @@ -0,0 +1,123 @@ +"""Unit tests for Meshtastic feeder resolution.""" + +import pytest + +from common.meshtastic_feeder_auth import ( + MeshtasticFeederResolutionError, + resolve_meshtastic_feeder, +) +from common.protocol import Protocol +from nodes.models import NodeAuth + +MT_NODE_A = 0x433B82F0 +MT_NODE_B = 0x12345678 + + +@pytest.mark.django_db +def test_resolve_single_feeder(create_managed_node, create_node_api_key): + node = create_managed_node(meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + + assert ( + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id=MT_NODE_A, + ) + == node + ) + + +@pytest.mark.django_db +def test_resolve_by_node_id_str(create_managed_node, create_node_api_key): + node = create_managed_node(meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + + assert ( + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id_str="!433b82f0", + ) + == node + ) + + +@pytest.mark.django_db +def test_resolve_wrong_node_id(create_managed_node, create_node_api_key): + node = create_managed_node(meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + + with pytest.raises(MeshtasticFeederResolutionError) as exc: + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id=MT_NODE_B, + ) + assert exc.value.code == "feeder_not_linked" + + +@pytest.mark.django_db +def test_shared_key_two_feeders(create_managed_node, create_constellation, create_node_api_key): + constellation = create_constellation(protocol=Protocol.MESHTASTIC) + node_a = create_managed_node( + meshtastic_node_id=MT_NODE_A, + protocol=Protocol.MESHTASTIC, + constellation=constellation, + name="Feeder A", + ) + node_b = create_managed_node( + meshtastic_node_id=MT_NODE_B, + protocol=Protocol.MESHTASTIC, + constellation=constellation, + name="Feeder B", + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=node_a) + NodeAuth.objects.create(api_key=api_key, node=node_b) + + assert ( + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id=MT_NODE_A, + ) + == node_a + ) + assert ( + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id=MT_NODE_B, + ) + == node_b + ) + + +@pytest.mark.django_db +def test_missing_node_id(create_node_api_key, create_constellation): + api_key = create_node_api_key(constellation=create_constellation(protocol=Protocol.MESHTASTIC)) + with pytest.raises(MeshtasticFeederResolutionError) as exc: + resolve_meshtastic_feeder(api_key=api_key) + assert exc.value.code == "missing_feeder_node_id" + + +@pytest.mark.django_db +def test_both_node_id_params_rejected(create_node_api_key, create_constellation): + api_key = create_node_api_key(constellation=create_constellation(protocol=Protocol.MESHTASTIC)) + with pytest.raises(MeshtasticFeederResolutionError) as exc: + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id=MT_NODE_A, + feeder_node_id_str="!433b82f0", + ) + assert exc.value.code == "invalid_feeder_node_id" + + +@pytest.mark.django_db +def test_invalid_node_id_str(create_node_api_key, create_constellation): + api_key = create_node_api_key(constellation=create_constellation(protocol=Protocol.MESHTASTIC)) + with pytest.raises(MeshtasticFeederResolutionError) as exc: + resolve_meshtastic_feeder( + api_key=api_key, + feeder_node_id_str="not-a-node-id", + ) + assert exc.value.code == "invalid_feeder_node_id_str" From 87e8f32f314d986920f3cfee3ec5a9baabc478be Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 09:40:10 +0100 Subject: [PATCH 2/6] fix(ws): disambiguate Meshtastic NodeConsumer for shared keys Accept feeder_node_id and feeder_node_id_str query params, reject ambiguous shared-key connects, and stop assigning the MC feeder when one MC and one MT share a key without disambiguators. --- Meshflow/ws/consumers.py | 67 +++++- Meshflow/ws/tests/test_mt_node_consumer.py | 264 +++++++++++++++++++++ 2 files changed, 326 insertions(+), 5 deletions(-) create mode 100644 Meshflow/ws/tests/test_mt_node_consumer.py diff --git a/Meshflow/ws/consumers.py b/Meshflow/ws/consumers.py index ab9e7b0..41e78bc 100644 --- a/Meshflow/ws/consumers.py +++ b/Meshflow/ws/consumers.py @@ -32,14 +32,32 @@ async def connect(self): return feeder_pubkey_prefix = query_params.get("feeder_pubkey_prefix") + feeder_node_id_raw = query_params.get("feeder_node_id") + feeder_node_id_str = query_params.get("feeder_node_id_str") + feeder_node_id = None + if feeder_node_id_raw is not None: + try: + feeder_node_id = int(feeder_node_id_raw) + except ValueError: + logger.warning( + "NodeConsumer: invalid feeder_node_id=%s", + feeder_node_id_raw, + ) + await self.close() + return + managed_node = await self._validate_api_key( api_key, feeder_pubkey_prefix=feeder_pubkey_prefix, + feeder_node_id=feeder_node_id, + feeder_node_id_str=feeder_node_id_str, ) if not managed_node: logger.warning( - "NodeConsumer: invalid api_key or feeder_pubkey_prefix=%s", + "NodeConsumer: invalid api_key feeder_pubkey_prefix=%s " "feeder_node_id=%s feeder_node_id_str=%s", feeder_pubkey_prefix, + feeder_node_id, + feeder_node_id_str, ) await self.close() return @@ -74,12 +92,22 @@ async def node_command(self, event): await self.send(text_data=json.dumps(command)) @database_sync_to_async - def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): + def _validate_api_key( + self, + api_key, + feeder_pubkey_prefix=None, + feeder_node_id=None, + feeder_node_id_str=None, + ): """Validate NodeAPIKey and return the linked ManagedNode, or None.""" from common.meshcore_feeder_auth import ( MeshCoreFeederResolutionError, resolve_meshcore_feeder, ) + from common.meshtastic_feeder_auth import ( + MeshtasticFeederResolutionError, + resolve_meshtastic_feeder, + ) from common.protocol import Protocol from nodes.models import NodeAPIKey, NodeAuth @@ -88,6 +116,14 @@ def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): key_obj.last_used = timezone.now() key_obj.save(update_fields=["last_used"]) + has_mc_disambiguator = bool(feeder_pubkey_prefix) + has_mt_disambiguator = feeder_node_id is not None or bool(feeder_node_id_str) + if has_mc_disambiguator and has_mt_disambiguator: + logger.warning( + "NodeConsumer: pass feeder_pubkey_prefix or feeder_node_id, not both", + ) + return None + if feeder_pubkey_prefix: try: return resolve_meshcore_feeder( @@ -97,6 +133,16 @@ def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): except MeshCoreFeederResolutionError: return None + if has_mt_disambiguator: + try: + return resolve_meshtastic_feeder( + api_key=key_obj, + feeder_node_id=feeder_node_id, + feeder_node_id_str=feeder_node_id_str, + ) + except MeshtasticFeederResolutionError: + return None + auths = list( NodeAuth.objects.filter( api_key=key_obj, @@ -107,20 +153,31 @@ def _validate_api_key(self, api_key, feeder_pubkey_prefix=None): return None mc_auths = [a for a in auths if a.node.protocol == Protocol.MESHCORE] + mt_auths = [a for a in auths if a.node.protocol == Protocol.MESHTASTIC] + if len(mc_auths) > 1: logger.warning( "NodeConsumer: API key linked to %s MC feeders; " "pass feeder_pubkey_prefix on ws/nodes/", len(mc_auths), ) return None - if len(mc_auths) == 1: - return mc_auths[0].node + + if len(mt_auths) > 1: + logger.warning( + "NodeConsumer: API key linked to %s MT feeders; " + "pass feeder_node_id or feeder_node_id_str on ws/nodes/", + len(mt_auths), + ) + return None if len(auths) > 1: logger.warning( - "NodeConsumer: API key linked to %s nodes; use feeder_pubkey_prefix for MC", + "NodeConsumer: API key linked to %s feeders; " + "pass feeder_pubkey_prefix (MC) or feeder_node_id (MT)", len(auths), ) + return None + return auths[0].node except NodeAPIKey.DoesNotExist: pass diff --git a/Meshflow/ws/tests/test_mt_node_consumer.py b/Meshflow/ws/tests/test_mt_node_consumer.py new file mode 100644 index 0000000..357634a --- /dev/null +++ b/Meshflow/ws/tests/test_mt_node_consumer.py @@ -0,0 +1,264 @@ +"""Meshtastic NodeConsumer WebSocket tests.""" + +import asyncio + +import pytest +from channels.db import database_sync_to_async +from channels.routing import URLRouter +from channels.testing import WebsocketCommunicator + +import Meshflow.routing # noqa: F401 +from common.protocol import Protocol +from common.ws_groups import managed_node_ws_group + +application = URLRouter(Meshflow.routing.websocket_urlpatterns) + +MT_NODE_A = 0x433B82F0 +MT_NODE_B = 0x12345678 + +FEEDER_PUBKEY = "1a37f5aea4a1" + ("b" * 52) +FEEDER_PREFIX = "1a37f5aea4a1" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mt_consumer_requires_node_id_when_multiple_feeders( + create_managed_node, create_constellation, create_node_api_key +): + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + constellation = create_constellation(protocol=Protocol.MESHTASTIC) + node_a = create_managed_node( + meshtastic_node_id=MT_NODE_A, + protocol=Protocol.MESHTASTIC, + constellation=constellation, + name="Feeder A", + ) + node_b = create_managed_node( + meshtastic_node_id=MT_NODE_B, + protocol=Protocol.MESHTASTIC, + constellation=constellation, + name="Feeder B", + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=node_a) + NodeAuth.objects.create(api_key=api_key, node=node_b) + return api_key.key + + api_key = await setup() + communicator = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={api_key}", + ) + connected, _ = await communicator.connect() + assert connected is False + await communicator.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mt_consumer_accepts_feeder_node_id(create_managed_node, create_node_api_key): + from channels.layers import get_channel_layer + + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + node = create_managed_node(protocol=Protocol.MESHTASTIC) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + return { + "api_key": api_key.key, + "node_id": node.meshtastic_node_id, + "group": managed_node_ws_group(node), + } + + data = await setup() + url = f"/ws/nodes/?api_key={data['api_key']}&feeder_node_id={data['node_id']}" + communicator = WebsocketCommunicator(application, url) + connected, _ = await communicator.connect() + assert connected is True + + channel_layer = get_channel_layer() + await channel_layer.group_send( + data["group"], + { + "type": "node_command", + "command": {"type": "traceroute", "target": 1623194643}, + }, + ) + response = await communicator.receive_json_from() + assert response == {"type": "traceroute", "target": 1623194643} + + await communicator.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mt_consumer_accepts_feeder_node_id_str(create_managed_node, create_node_api_key): + from common.mesh_node_helpers import meshtastic_id_to_hex + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + node = create_managed_node(protocol=Protocol.MESHTASTIC) + api_key = create_node_api_key(constellation=node.constellation) + NodeAuth.objects.create(api_key=api_key, node=node) + return api_key.key, meshtastic_id_to_hex(node.meshtastic_node_id) + + api_key, node_id_str = await setup() + url = f"/ws/nodes/?api_key={api_key}&feeder_node_id_str={node_id_str}" + communicator = WebsocketCommunicator(application, url) + connected, _ = await communicator.connect() + assert connected is True + await communicator.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_two_mt_feeders_distinct_groups(create_managed_node, create_constellation, create_node_api_key): + from channels.layers import get_channel_layer + + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + constellation = create_constellation(protocol=Protocol.MESHTASTIC) + node_a = create_managed_node( + protocol=Protocol.MESHTASTIC, + constellation=constellation, + name="Feeder A", + ) + node_b = create_managed_node( + protocol=Protocol.MESHTASTIC, + constellation=constellation, + name="Feeder B", + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=node_a) + NodeAuth.objects.create(api_key=api_key, node=node_b) + return { + "api_key": api_key.key, + "node_a_id": node_a.meshtastic_node_id, + "node_b_id": node_b.meshtastic_node_id, + "group_a": managed_node_ws_group(node_a), + "group_b": managed_node_ws_group(node_b), + } + + data = await setup() + comm_a = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={data['api_key']}&feeder_node_id={data['node_a_id']}", + ) + comm_b = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={data['api_key']}&feeder_node_id={data['node_b_id']}", + ) + assert (await comm_a.connect())[0] is True + assert (await comm_b.connect())[0] is True + + channel_layer = get_channel_layer() + await channel_layer.group_send( + data["group_a"], + {"type": "node_command", "command": {"type": "traceroute", "target": 1}}, + ) + assert await comm_a.receive_json_from() == {"type": "traceroute", "target": 1} + + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(comm_b.receive_json_from(), timeout=0.5) + + await comm_a.disconnect() + await comm_b.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mixed_mc_mt_rejected_without_disambiguator( + create_managed_node, create_constellation, create_node_api_key +): + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + constellation = create_constellation(protocol=Protocol.MESHTASTIC) + mt_node = create_managed_node( + protocol=Protocol.MESHTASTIC, + constellation=constellation, + ) + mc_node = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + constellation=constellation, + mc_pubkey=FEEDER_PUBKEY, + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=mt_node) + NodeAuth.objects.create(api_key=api_key, node=mc_node) + return api_key.key + + api_key = await setup() + communicator = WebsocketCommunicator(application, f"/ws/nodes/?api_key={api_key}") + connected, _ = await communicator.connect() + assert connected is False + await communicator.disconnect() + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_mixed_mc_mt_with_disambiguators(create_managed_node, create_constellation, create_node_api_key): + from channels.layers import get_channel_layer + + from nodes.models import NodeAuth + + @database_sync_to_async + def setup(): + constellation = create_constellation(protocol=Protocol.MESHTASTIC) + mt_node = create_managed_node( + protocol=Protocol.MESHTASTIC, + constellation=constellation, + ) + mc_node = create_managed_node( + meshtastic_node_id=0, + protocol=Protocol.MESHCORE, + constellation=constellation, + mc_pubkey=FEEDER_PUBKEY, + ) + api_key = create_node_api_key(constellation=constellation) + NodeAuth.objects.create(api_key=api_key, node=mt_node) + NodeAuth.objects.create(api_key=api_key, node=mc_node) + return { + "api_key": api_key.key, + "mt_node_id": mt_node.meshtastic_node_id, + "mt_group": managed_node_ws_group(mt_node), + "mc_group": managed_node_ws_group(mc_node), + } + + data = await setup() + mt_comm = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={data['api_key']}&feeder_node_id={data['mt_node_id']}", + ) + mc_comm = WebsocketCommunicator( + application, + f"/ws/nodes/?api_key={data['api_key']}&feeder_pubkey_prefix={FEEDER_PREFIX}", + ) + assert (await mt_comm.connect())[0] is True + assert (await mc_comm.connect())[0] is True + + channel_layer = get_channel_layer() + await channel_layer.group_send( + data["mt_group"], + {"type": "node_command", "command": {"type": "traceroute", "target": 99}}, + ) + assert await mt_comm.receive_json_from() == {"type": "traceroute", "target": 99} + + await channel_layer.group_send( + data["mc_group"], + {"type": "node_command", "command": {"type": "refresh_feeder_config"}}, + ) + assert await mc_comm.receive_json_from() == {"type": "refresh_feeder_config"} + + await mt_comm.disconnect() + await mc_comm.disconnect() From 442286f817ed7495aaeb60fa3428205aa5eae02d Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 09:40:10 +0100 Subject: [PATCH 3/6] docs(api-keys): document MT WebSocket feeder query params Describe feeder_node_id and feeder_node_id_str for shared Meshtastic keys; update OpenAPI and Redis docs for ws/nodes disambiguation. --- .../tests/test_meshtastic_feeder_auth.py | 32 +++++++++---------- Meshflow/ws/tests/test_mt_node_consumer.py | 5 --- docs/API_KEYS.md | 28 ++++++++++------ docs/REDIS.md | 2 +- openapi.yaml | 24 ++++++++++++++ 5 files changed, 59 insertions(+), 32 deletions(-) diff --git a/Meshflow/common/tests/test_meshtastic_feeder_auth.py b/Meshflow/common/tests/test_meshtastic_feeder_auth.py index dc18986..540422a 100644 --- a/Meshflow/common/tests/test_meshtastic_feeder_auth.py +++ b/Meshflow/common/tests/test_meshtastic_feeder_auth.py @@ -7,22 +7,20 @@ resolve_meshtastic_feeder, ) from common.protocol import Protocol +from common.mesh_node_helpers import meshtastic_id_to_hex from nodes.models import NodeAuth -MT_NODE_A = 0x433B82F0 -MT_NODE_B = 0x12345678 - @pytest.mark.django_db def test_resolve_single_feeder(create_managed_node, create_node_api_key): - node = create_managed_node(meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC) + node = create_managed_node(protocol=Protocol.MESHTASTIC) api_key = create_node_api_key(constellation=node.constellation) NodeAuth.objects.create(api_key=api_key, node=node) assert ( resolve_meshtastic_feeder( api_key=api_key, - feeder_node_id=MT_NODE_A, + feeder_node_id=node.meshtastic_node_id, ) == node ) @@ -30,14 +28,14 @@ def test_resolve_single_feeder(create_managed_node, create_node_api_key): @pytest.mark.django_db def test_resolve_by_node_id_str(create_managed_node, create_node_api_key): - node = create_managed_node(meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC) + node = create_managed_node(protocol=Protocol.MESHTASTIC) api_key = create_node_api_key(constellation=node.constellation) NodeAuth.objects.create(api_key=api_key, node=node) assert ( resolve_meshtastic_feeder( api_key=api_key, - feeder_node_id_str="!433b82f0", + feeder_node_id_str=meshtastic_id_to_hex(node.meshtastic_node_id), ) == node ) @@ -45,14 +43,15 @@ def test_resolve_by_node_id_str(create_managed_node, create_node_api_key): @pytest.mark.django_db def test_resolve_wrong_node_id(create_managed_node, create_node_api_key): - node = create_managed_node(meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC) + node = create_managed_node(protocol=Protocol.MESHTASTIC) + other = create_managed_node(protocol=Protocol.MESHTASTIC) api_key = create_node_api_key(constellation=node.constellation) NodeAuth.objects.create(api_key=api_key, node=node) with pytest.raises(MeshtasticFeederResolutionError) as exc: resolve_meshtastic_feeder( api_key=api_key, - feeder_node_id=MT_NODE_B, + feeder_node_id=other.meshtastic_node_id, ) assert exc.value.code == "feeder_not_linked" @@ -61,13 +60,11 @@ def test_resolve_wrong_node_id(create_managed_node, create_node_api_key): def test_shared_key_two_feeders(create_managed_node, create_constellation, create_node_api_key): constellation = create_constellation(protocol=Protocol.MESHTASTIC) node_a = create_managed_node( - meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC, constellation=constellation, name="Feeder A", ) node_b = create_managed_node( - meshtastic_node_id=MT_NODE_B, protocol=Protocol.MESHTASTIC, constellation=constellation, name="Feeder B", @@ -79,14 +76,14 @@ def test_shared_key_two_feeders(create_managed_node, create_constellation, creat assert ( resolve_meshtastic_feeder( api_key=api_key, - feeder_node_id=MT_NODE_A, + feeder_node_id=node_a.meshtastic_node_id, ) == node_a ) assert ( resolve_meshtastic_feeder( api_key=api_key, - feeder_node_id=MT_NODE_B, + feeder_node_id=node_b.meshtastic_node_id, ) == node_b ) @@ -101,13 +98,14 @@ def test_missing_node_id(create_node_api_key, create_constellation): @pytest.mark.django_db -def test_both_node_id_params_rejected(create_node_api_key, create_constellation): - api_key = create_node_api_key(constellation=create_constellation(protocol=Protocol.MESHTASTIC)) +def test_both_node_id_params_rejected(create_managed_node, create_node_api_key): + node = create_managed_node(protocol=Protocol.MESHTASTIC) + api_key = create_node_api_key(constellation=node.constellation) with pytest.raises(MeshtasticFeederResolutionError) as exc: resolve_meshtastic_feeder( api_key=api_key, - feeder_node_id=MT_NODE_A, - feeder_node_id_str="!433b82f0", + feeder_node_id=node.meshtastic_node_id, + feeder_node_id_str=meshtastic_id_to_hex(node.meshtastic_node_id), ) assert exc.value.code == "invalid_feeder_node_id" diff --git a/Meshflow/ws/tests/test_mt_node_consumer.py b/Meshflow/ws/tests/test_mt_node_consumer.py index 357634a..c8a47b3 100644 --- a/Meshflow/ws/tests/test_mt_node_consumer.py +++ b/Meshflow/ws/tests/test_mt_node_consumer.py @@ -13,9 +13,6 @@ application = URLRouter(Meshflow.routing.websocket_urlpatterns) -MT_NODE_A = 0x433B82F0 -MT_NODE_B = 0x12345678 - FEEDER_PUBKEY = "1a37f5aea4a1" + ("b" * 52) FEEDER_PREFIX = "1a37f5aea4a1" @@ -31,13 +28,11 @@ async def test_mt_consumer_requires_node_id_when_multiple_feeders( def setup(): constellation = create_constellation(protocol=Protocol.MESHTASTIC) node_a = create_managed_node( - meshtastic_node_id=MT_NODE_A, protocol=Protocol.MESHTASTIC, constellation=constellation, name="Feeder A", ) node_b = create_managed_node( - meshtastic_node_id=MT_NODE_B, protocol=Protocol.MESHTASTIC, constellation=constellation, name="Feeder B", diff --git a/docs/API_KEYS.md b/docs/API_KEYS.md index 48cee8f..cc4cdd6 100644 --- a/docs/API_KEYS.md +++ b/docs/API_KEYS.md @@ -52,24 +52,34 @@ Packets are correctly attributed per bot because the **observer** comes from the Each bot must use its own `node_id` in the URL. As long as the key is linked to both nodes via NodeAuth, packets are correctly associated per observer. -## WebSocket Limitation with Shared Keys +## WebSocket and Shared Keys -The WebSocket endpoint (`ws/nodes/?api_key=`) does **not** take a node ID. The consumer validates the API key and returns the **first** linked ManagedNode. The bot joins the channel group for that node. +The WebSocket endpoint (`ws/nodes/?api_key=`) must identify which feeder is connecting when multiple ManagedNodes share one key. Otherwise traceroute and other commands may be delivered to the wrong bot. -If one key is linked to nodes A and B: +| Protocol | Query parameter | Example | +|----------|-----------------|---------| +| Meshtastic | `feeder_node_id` (decimal nodenum) | `feeder_node_id=1127973616` | +| Meshtastic | `feeder_node_id_str` (`!` + 8 hex) | `feeder_node_id_str=!433b82f0` | +| MeshCore | `feeder_pubkey_prefix` (12-hex pubkey prefix) | `feeder_pubkey_prefix=1a37f5aea4a1` | -- Both bots connect with the same key -- Both receive the first linked node (e.g. A) -- Both join `node_{A}` -- Traceroute commands for node A go to both bots; only one is correct +When only one feeder is linked to the key, no extra query parameter is required. -**Recommendation:** Use one API key per ManagedNode when the bot needs WebSocket commands (e.g. traceroute). Shared keys work correctly for packet ingest only. +When multiple feeders share a key and the bot omits the correct disambiguator, the connection is **rejected** (same as MeshCore multi-feeder behaviour). + +**Deploy note:** If you run multiple Meshtastic bots on one key, deploy an updated meshtastic-bot that sends `feeder_node_id` on the WebSocket URL before or together with the API change. + +Example URLs: + +``` +ws://{host}/ws/nodes/?api_key={key}&feeder_node_id=1127973616 +ws://{host}/ws/nodes/?api_key={key}&feeder_pubkey_prefix=1a37f5aea4a1 +``` ## Authentication - **Header**: `X-API-KEY: ` or `Authorization: Token ` - **Packet ingest**: Requires `NodeAPIKeyAuthentication` and `NodeAuthorizationPermission`; the key must be linked to the node in the URL path -- **WebSocket**: Validates the key and uses the first linked ManagedNode +- **WebSocket**: Validates the key; uses the sole linked feeder, or resolves via `feeder_node_id` / `feeder_node_id_str` (Meshtastic) or `feeder_pubkey_prefix` (MeshCore) ## API Endpoints diff --git a/docs/REDIS.md b/docs/REDIS.md index 568f532..45d12d7 100644 --- a/docs/REDIS.md +++ b/docs/REDIS.md @@ -12,7 +12,7 @@ Configuration lives in [`Meshflow/Meshflow/settings/base.py`](../Meshflow/Meshfl - **Purpose:** Pub/sub and group membership for WebSocket delivery (`channel_layer.group_send` / `group_add`). - **Code:** [`Meshflow/ws/consumers.py`](../Meshflow/ws/consumers.py). - **Consumers / groups:** - - **`NodeConsumer`** (`ws/nodes/?api_key=…`) — feeder bots join group `node_{meshtastic_node_id}` or **`node_mc_{managed_node.internal_id}`** (MeshCore); receives **`node_command`** events (e.g. traceroute, `apply_mc_channel_config`). Any API/ASGI worker can `group_send`; membership is cluster-wide in Redis DB 0. Emitters include traceroute views/tasks, `meshcore_packets` apply-channel, and management commands. + - **`NodeConsumer`** (`ws/nodes/?api_key=…`; optional `feeder_node_id` / `feeder_node_id_str` for Meshtastic, `feeder_pubkey_prefix` for MeshCore when multiple feeders share a key) — feeder bots join group `node_{meshtastic_node_id}` or **`node_mc_{managed_node.internal_id}`** (MeshCore); receives **`node_command`** events (e.g. traceroute, `apply_mc_channel_config`). Any API/ASGI worker can `group_send`; membership is cluster-wide in Redis DB 0. Emitters include traceroute views/tasks, `meshcore_packets` apply-channel, and management commands. - **`TextMessageConsumer`** — authenticated UI clients join group **`text_messages`**; receives **`text_message`** events for mesh text broadcasts ([`Meshflow/ws/services/text_message.py`](../Meshflow/ws/services/text_message.py)). - **`TracerouteConsumer`** (`ws/traceroutes/?token=…`) — clients join group **`traceroutes`**; receives **`traceroute_update`** events ([`Meshflow/traceroute/ws_notify.py`](../Meshflow/traceroute/ws_notify.py)). - **`NodeClaimConsumer`** (`ws/claims/?token=…`) — clients join group **`user_claims_{user_id}`**; receives **`node_claim_update`** / `node_claim_accepted` when ownership proof succeeds ([`Meshflow/nodes/ws_notify.py`](../Meshflow/nodes/ws_notify.py), signal `node_claim_authorized`). diff --git a/openapi.yaml b/openapi.yaml index 65e199c..c1e7395 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -7820,6 +7820,12 @@ paths: ws://{host}/ws/nodes/?api_key={node_api_key} ``` + When multiple ManagedNodes share one API key, pass a feeder identity query parameter + (same disambiguation as packet ingest): + + - Meshtastic: `feeder_node_id` (decimal nodenum) and/or `feeder_node_id_str` (`!` + 8 hex) + - MeshCore: `feeder_pubkey_prefix` (12-hex device pubkey prefix) + Authentication uses the same NodeAPIKey as packet ingest (X-API-Key header). For WebSocket, the key must be passed as the `api_key` query parameter. @@ -7848,6 +7854,24 @@ paths: schema: type: string description: NodeAPIKey for authentication (same key used for packet ingest) + - name: feeder_node_id + in: query + required: false + schema: + type: integer + description: Meshtastic feeder nodenum (decimal). Required when multiple MT feeders share the key. + - name: feeder_node_id_str + in: query + required: false + schema: + type: string + description: Meshtastic feeder id as !xxxxxxxx hex. Alternative to feeder_node_id. + - name: feeder_pubkey_prefix + in: query + required: false + schema: + type: string + description: MeshCore feeder 12-hex pubkey prefix. Required when multiple MC feeders share the key. responses: '101': description: WebSocket connection established From 18be5a8d97acd91b99b3927a074fbd4f8071e760 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 11:09:33 +0100 Subject: [PATCH 4/6] fix(settings): tune Channels Redis client timeouts for WebSocket stability Add _channel_redis_host() with configurable socket timeouts (default 30s) so channels_redis BZPOPMIN can outlive busy Redis on DB 1. Set CELERY_RESULT_EXPIRES default to 3600s to limit result-key churn. --- Meshflow/Meshflow/settings/base.py | 23 ++++++++++++- .../Meshflow/tests/test_redis_settings.py | 34 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 Meshflow/Meshflow/tests/test_redis_settings.py diff --git a/Meshflow/Meshflow/settings/base.py b/Meshflow/Meshflow/settings/base.py index 7957774..41d2fed 100644 --- a/Meshflow/Meshflow/settings/base.py +++ b/Meshflow/Meshflow/settings/base.py @@ -127,12 +127,31 @@ _redis_port = os.environ.get("REDIS_PORT", "6379") _redis_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/0" + +def _channel_redis_host() -> dict: + """ + Connection kwargs for channels_redis (Redis DB 0). + + channels_redis blocks on BZPOPMIN with a 5s server-side timeout; client socket_timeout + must exceed that plus headroom when the shared Redis process is busy (e.g. Celery + result keys on DB 1), or WebSocket consumers raise TimeoutError and reconnect-loop. + """ + host: dict = {"address": _redis_url} + raw_timeout = os.environ.get("CHANNEL_REDIS_SOCKET_TIMEOUT", "30").strip() + if raw_timeout.lower() not in ("", "none", "null"): + host["socket_timeout"] = float(raw_timeout) + connect_raw = os.environ.get("CHANNEL_REDIS_SOCKET_CONNECT_TIMEOUT", "5").strip() + if connect_raw: + host["socket_connect_timeout"] = float(connect_raw) + return host + + # Channel layers for Django Channels CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { - "hosts": [_redis_url], + "hosts": [_channel_redis_host()], }, }, } @@ -146,6 +165,8 @@ _celery_broker_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/1" CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", _celery_broker_url) CELERY_RESULT_BACKEND = CELERY_BROKER_URL +# Shorter than Celery default (~24h) to limit celery-task-meta-* growth on Redis DB 1. +CELERY_RESULT_EXPIRES = int(os.environ.get("CELERY_RESULT_EXPIRES", "3600")) CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler" # Route heavy RF propagation renders to a dedicated queue consumed by celery-rf-worker; # keep the default "celery" queue so the existing worker picks up everything else. diff --git a/Meshflow/Meshflow/tests/test_redis_settings.py b/Meshflow/Meshflow/tests/test_redis_settings.py new file mode 100644 index 0000000..0fa8855 --- /dev/null +++ b/Meshflow/Meshflow/tests/test_redis_settings.py @@ -0,0 +1,34 @@ +"""Redis-related Django settings (Channels + Celery).""" + +import os + +from django.test import SimpleTestCase + + +class ChannelRedisHostTest(SimpleTestCase): + def test_default_socket_timeout_for_channels(self): + from Meshflow.settings import base as settings_base + + host = settings_base._channel_redis_host() + self.assertIn("address", host) + self.assertEqual(host["socket_timeout"], 30.0) + self.assertEqual(host["socket_connect_timeout"], 5.0) + + def test_socket_timeout_none_omits_key(self): + from Meshflow.settings import base as settings_base + + prior = os.environ.get("CHANNEL_REDIS_SOCKET_TIMEOUT") + try: + os.environ["CHANNEL_REDIS_SOCKET_TIMEOUT"] = "none" + host = settings_base._channel_redis_host() + self.assertNotIn("socket_timeout", host) + finally: + if prior is None: + os.environ.pop("CHANNEL_REDIS_SOCKET_TIMEOUT", None) + else: + os.environ["CHANNEL_REDIS_SOCKET_TIMEOUT"] = prior + + def test_celery_result_expires_default(self): + from django.conf import settings + + self.assertEqual(settings.CELERY_RESULT_EXPIRES, 3600) From ab14599ef82e103c7e5e9411cb1110f132788a91 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 11:09:33 +0100 Subject: [PATCH 5/6] chore(celery): ignore results on periodic beat tasks Stop writing celery-task-meta-* keys for high-frequency scheduled tasks that do not use return values, reducing load on Redis DB 1. --- Meshflow/dx_monitoring/tasks.py | 2 +- Meshflow/mesh_monitoring/tasks.py | 4 ++-- Meshflow/meshcore_packet_path/tasks.py | 4 ++-- Meshflow/nodes/tasks.py | 2 +- Meshflow/traceroute/tasks.py | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Meshflow/dx_monitoring/tasks.py b/Meshflow/dx_monitoring/tasks.py index ad9214f..6deecfb 100644 --- a/Meshflow/dx_monitoring/tasks.py +++ b/Meshflow/dx_monitoring/tasks.py @@ -6,7 +6,7 @@ from dx_monitoring.notification_service import run_notify_dx_event -@shared_task +@shared_task(ignore_result=True) def explore_active_dx_events(batch_size: int = 50): """Periodically queue or link traceroute exploration for active DX events.""" return scan_active_dx_events_for_traceroutes(batch_size=batch_size) diff --git a/Meshflow/mesh_monitoring/tasks.py b/Meshflow/mesh_monitoring/tasks.py index 7d29c2f..785d750 100644 --- a/Meshflow/mesh_monitoring/tasks.py +++ b/Meshflow/mesh_monitoring/tasks.py @@ -36,7 +36,7 @@ def _effective_last_heard_offline_after_seconds(obs: ObservedNode) -> int: return int(DEFAULT_OFFLINE_AFTER_SECONDS) -@shared_task +@shared_task(ignore_result=True) def send_monitoring_traceroute_command(_auto_traceroute_id: int) -> None: """ Legacy hook: monitoring TR delivery is handled by :func:`traceroute.tasks.dispatch_pending_traceroutes`. @@ -75,7 +75,7 @@ def _dispatch_monitoring_round(observed: ObservedNode) -> None: ) -@shared_task +@shared_task(ignore_result=True) def process_node_watch_presence() -> dict: """ Periodic task (~1 min). For each observed node with enabled watches, update presence: diff --git a/Meshflow/meshcore_packet_path/tasks.py b/Meshflow/meshcore_packet_path/tasks.py index 069dc90..a463c4a 100644 --- a/Meshflow/meshcore_packet_path/tasks.py +++ b/Meshflow/meshcore_packet_path/tasks.py @@ -14,7 +14,7 @@ ) -@shared_task +@shared_task(ignore_result=True) def collect_path_edge_buckets(): """Hourly: rollup hash-chain edges for the completed previous hour.""" current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) @@ -35,7 +35,7 @@ def backfill_path_edge_buckets_task(hours: int | None = None, days: int | None = return collect_path_edge_buckets_for_range(start_hour, current_hour, skip_existing=True) -@shared_task +@shared_task(ignore_result=True) def evict_old_path_data(): """Delete path edge buckets (and stale unknown segments) older than retention.""" from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution, SegmentStatus diff --git a/Meshflow/nodes/tasks.py b/Meshflow/nodes/tasks.py index 0dd30bc..cc280b3 100644 --- a/Meshflow/nodes/tasks.py +++ b/Meshflow/nodes/tasks.py @@ -14,7 +14,7 @@ from packets.models import PacketObservation -@shared_task +@shared_task(ignore_result=True) def update_managed_node_statuses(): """ Bulk refresh ManagedNodeStatus from PacketObservation and MeshCorePacketObservation upload_time. diff --git a/Meshflow/traceroute/tasks.py b/Meshflow/traceroute/tasks.py index 4af61d1..7063c28 100644 --- a/Meshflow/traceroute/tasks.py +++ b/Meshflow/traceroute/tasks.py @@ -110,7 +110,7 @@ def schedule_traceroutes(): return {"created": 0} -@shared_task +@shared_task(ignore_result=True) def dispatch_pending_traceroutes(): """ Run frequently (e.g. every 15s). Send due ``AutoTraceRoute`` rows to managed nodes over Channels, @@ -142,7 +142,7 @@ def export_traceroutes_to_neo4j(): return export_traceroutes_to_neo4j_impl() -@shared_task +@shared_task(ignore_result=True) def push_traceroute_to_neo4j(auto_traceroute_id: int): """ Push a single completed AutoTraceRoute to Neo4j. @@ -155,7 +155,7 @@ def push_traceroute_to_neo4j(auto_traceroute_id: int): return push_traceroute_to_neo4j_impl(auto_traceroute_id) -@shared_task +@shared_task(ignore_result=True) def mark_stale_traceroutes_failed(): """ Run every 60 seconds. Mark pending/sent auto-traceroutes as failed after From 675b5342d454c6f1529434a6fa9613fc4a64752b Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Tue, 2 Jun 2026 11:09:38 +0100 Subject: [PATCH 6/6] style(test): isort imports in meshtastic feeder auth tests --- Meshflow/common/tests/test_meshtastic_feeder_auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Meshflow/common/tests/test_meshtastic_feeder_auth.py b/Meshflow/common/tests/test_meshtastic_feeder_auth.py index 540422a..4e8f2fe 100644 --- a/Meshflow/common/tests/test_meshtastic_feeder_auth.py +++ b/Meshflow/common/tests/test_meshtastic_feeder_auth.py @@ -2,12 +2,12 @@ import pytest +from common.mesh_node_helpers import meshtastic_id_to_hex from common.meshtastic_feeder_auth import ( MeshtasticFeederResolutionError, resolve_meshtastic_feeder, ) from common.protocol import Protocol -from common.mesh_node_helpers import meshtastic_id_to_hex from nodes.models import NodeAuth