Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion Meshflow/Meshflow/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,31 @@
_redis_port = os.environ.get("REDIS_PORT", "6379")
_redis_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/0"


def _channel_redis_host() -> dict:
"""
Connection kwargs for channels_redis (Redis DB 0).

channels_redis blocks on BZPOPMIN with a 5s server-side timeout; client socket_timeout
must exceed that plus headroom when the shared Redis process is busy (e.g. Celery
result keys on DB 1), or WebSocket consumers raise TimeoutError and reconnect-loop.
"""
host: dict = {"address": _redis_url}
raw_timeout = os.environ.get("CHANNEL_REDIS_SOCKET_TIMEOUT", "30").strip()
if raw_timeout.lower() not in ("", "none", "null"):
host["socket_timeout"] = float(raw_timeout)
connect_raw = os.environ.get("CHANNEL_REDIS_SOCKET_CONNECT_TIMEOUT", "5").strip()
if connect_raw:
host["socket_connect_timeout"] = float(connect_raw)
return host


# Channel layers for Django Channels
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [_redis_url],
"hosts": [_channel_redis_host()],
},
},
}
Expand All @@ -146,6 +165,8 @@
_celery_broker_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/1"
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL", _celery_broker_url)
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
# Shorter than Celery default (~24h) to limit celery-task-meta-* growth on Redis DB 1.
CELERY_RESULT_EXPIRES = int(os.environ.get("CELERY_RESULT_EXPIRES", "3600"))
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
# Route heavy RF propagation renders to a dedicated queue consumed by celery-rf-worker;
# keep the default "celery" queue so the existing worker picks up everything else.
Expand Down
34 changes: 34 additions & 0 deletions Meshflow/Meshflow/tests/test_redis_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Redis-related Django settings (Channels + Celery)."""

import os

from django.test import SimpleTestCase


class ChannelRedisHostTest(SimpleTestCase):
def test_default_socket_timeout_for_channels(self):
from Meshflow.settings import base as settings_base

host = settings_base._channel_redis_host()
self.assertIn("address", host)
self.assertEqual(host["socket_timeout"], 30.0)
self.assertEqual(host["socket_connect_timeout"], 5.0)

def test_socket_timeout_none_omits_key(self):
from Meshflow.settings import base as settings_base

prior = os.environ.get("CHANNEL_REDIS_SOCKET_TIMEOUT")
try:
os.environ["CHANNEL_REDIS_SOCKET_TIMEOUT"] = "none"
host = settings_base._channel_redis_host()
self.assertNotIn("socket_timeout", host)
finally:
if prior is None:
os.environ.pop("CHANNEL_REDIS_SOCKET_TIMEOUT", None)
else:
os.environ["CHANNEL_REDIS_SOCKET_TIMEOUT"] = prior

def test_celery_result_expires_default(self):
from django.conf import settings

self.assertEqual(settings.CELERY_RESULT_EXPIRES, 3600)
97 changes: 97 additions & 0 deletions Meshflow/common/meshtastic_feeder_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Resolve Meshtastic feeder ManagedNode from API key + node id."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional

from common.mesh_node_helpers import (
meshtastic_hex_to_int,
normalize_meshtastic_lookup_hex,
)
from common.protocol import Protocol
from nodes.models import NodeAuth

if TYPE_CHECKING:
from nodes.models import ManagedNode, NodeAPIKey


@dataclass(frozen=True)
class MeshtasticFeederResolutionError(Exception):
"""Feeder could not be resolved for this request."""

code: str
detail: str

def __str__(self) -> str:
return self.detail


def resolve_meshtastic_feeder(
*,
api_key: NodeAPIKey,
feeder_node_id: Optional[int] = None,
feeder_node_id_str: Optional[str] = None,
) -> ManagedNode:
"""
Pick the Meshtastic ManagedNode for a feeder-scoped request.

Raises MeshtasticFeederResolutionError with a stable ``code`` when resolution fails.
"""
node_id = _resolve_meshtastic_node_id(
feeder_node_id=feeder_node_id,
feeder_node_id_str=feeder_node_id_str,
)

try:
node_auth = NodeAuth.objects.select_related("node", "node__constellation").get(
api_key=api_key,
node__protocol=Protocol.MESHTASTIC,
node__meshtastic_node_id=node_id,
node__deleted_at__isnull=True,
)
except NodeAuth.DoesNotExist:
raise MeshtasticFeederResolutionError(
code="feeder_not_linked",
detail="API key is not linked to a Meshtastic feeder with this node id.",
) from None

return node_auth.node


def _resolve_meshtastic_node_id(
*,
feeder_node_id: Optional[int] = None,
feeder_node_id_str: Optional[str] = None,
) -> int:
if feeder_node_id is not None and feeder_node_id_str:
raise MeshtasticFeederResolutionError(
code="invalid_feeder_node_id",
detail="Pass feeder_node_id or feeder_node_id_str, not both.",
)

if feeder_node_id is not None:
return feeder_node_id

if feeder_node_id_str:
token = feeder_node_id_str.strip()
hex_part = normalize_meshtastic_lookup_hex(token)
if hex_part:
return int(hex_part, 16)
if token.startswith("!") or token == "^all":
try:
return meshtastic_hex_to_int(token)
except ValueError as exc:
raise MeshtasticFeederResolutionError(
code="invalid_feeder_node_id_str",
detail=str(exc),
) from exc
raise MeshtasticFeederResolutionError(
code="invalid_feeder_node_id_str",
detail="feeder_node_id_str must be !xxxxxxxx or 8 hex digits.",
)

raise MeshtasticFeederResolutionError(
code="missing_feeder_node_id",
detail="feeder_node_id or feeder_node_id_str is required.",
)
121 changes: 121 additions & 0 deletions Meshflow/common/tests/test_meshtastic_feeder_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
"""Unit tests for Meshtastic feeder resolution."""

import pytest

from common.mesh_node_helpers import meshtastic_id_to_hex
from common.meshtastic_feeder_auth import (
MeshtasticFeederResolutionError,
resolve_meshtastic_feeder,
)
from common.protocol import Protocol
from nodes.models import NodeAuth


@pytest.mark.django_db
def test_resolve_single_feeder(create_managed_node, create_node_api_key):
node = create_managed_node(protocol=Protocol.MESHTASTIC)
api_key = create_node_api_key(constellation=node.constellation)
NodeAuth.objects.create(api_key=api_key, node=node)

assert (
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id=node.meshtastic_node_id,
)
== node
)


@pytest.mark.django_db
def test_resolve_by_node_id_str(create_managed_node, create_node_api_key):
node = create_managed_node(protocol=Protocol.MESHTASTIC)
api_key = create_node_api_key(constellation=node.constellation)
NodeAuth.objects.create(api_key=api_key, node=node)

assert (
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id_str=meshtastic_id_to_hex(node.meshtastic_node_id),
)
== node
)


@pytest.mark.django_db
def test_resolve_wrong_node_id(create_managed_node, create_node_api_key):
node = create_managed_node(protocol=Protocol.MESHTASTIC)
other = create_managed_node(protocol=Protocol.MESHTASTIC)
api_key = create_node_api_key(constellation=node.constellation)
NodeAuth.objects.create(api_key=api_key, node=node)

with pytest.raises(MeshtasticFeederResolutionError) as exc:
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id=other.meshtastic_node_id,
)
assert exc.value.code == "feeder_not_linked"


@pytest.mark.django_db
def test_shared_key_two_feeders(create_managed_node, create_constellation, create_node_api_key):
constellation = create_constellation(protocol=Protocol.MESHTASTIC)
node_a = create_managed_node(
protocol=Protocol.MESHTASTIC,
constellation=constellation,
name="Feeder A",
)
node_b = create_managed_node(
protocol=Protocol.MESHTASTIC,
constellation=constellation,
name="Feeder B",
)
api_key = create_node_api_key(constellation=constellation)
NodeAuth.objects.create(api_key=api_key, node=node_a)
NodeAuth.objects.create(api_key=api_key, node=node_b)

assert (
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id=node_a.meshtastic_node_id,
)
== node_a
)
assert (
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id=node_b.meshtastic_node_id,
)
== node_b
)


@pytest.mark.django_db
def test_missing_node_id(create_node_api_key, create_constellation):
api_key = create_node_api_key(constellation=create_constellation(protocol=Protocol.MESHTASTIC))
with pytest.raises(MeshtasticFeederResolutionError) as exc:
resolve_meshtastic_feeder(api_key=api_key)
assert exc.value.code == "missing_feeder_node_id"


@pytest.mark.django_db
def test_both_node_id_params_rejected(create_managed_node, create_node_api_key):
node = create_managed_node(protocol=Protocol.MESHTASTIC)
api_key = create_node_api_key(constellation=node.constellation)
with pytest.raises(MeshtasticFeederResolutionError) as exc:
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id=node.meshtastic_node_id,
feeder_node_id_str=meshtastic_id_to_hex(node.meshtastic_node_id),
)
assert exc.value.code == "invalid_feeder_node_id"


@pytest.mark.django_db
def test_invalid_node_id_str(create_node_api_key, create_constellation):
api_key = create_node_api_key(constellation=create_constellation(protocol=Protocol.MESHTASTIC))
with pytest.raises(MeshtasticFeederResolutionError) as exc:
resolve_meshtastic_feeder(
api_key=api_key,
feeder_node_id_str="not-a-node-id",
)
assert exc.value.code == "invalid_feeder_node_id_str"
2 changes: 1 addition & 1 deletion Meshflow/dx_monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dx_monitoring.notification_service import run_notify_dx_event


@shared_task
@shared_task(ignore_result=True)
def explore_active_dx_events(batch_size: int = 50):
"""Periodically queue or link traceroute exploration for active DX events."""
return scan_active_dx_events_for_traceroutes(batch_size=batch_size)
Expand Down
4 changes: 2 additions & 2 deletions Meshflow/mesh_monitoring/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def _effective_last_heard_offline_after_seconds(obs: ObservedNode) -> int:
return int(DEFAULT_OFFLINE_AFTER_SECONDS)


@shared_task
@shared_task(ignore_result=True)
def send_monitoring_traceroute_command(_auto_traceroute_id: int) -> None:
"""
Legacy hook: monitoring TR delivery is handled by :func:`traceroute.tasks.dispatch_pending_traceroutes`.
Expand Down Expand Up @@ -75,7 +75,7 @@ def _dispatch_monitoring_round(observed: ObservedNode) -> None:
)


@shared_task
@shared_task(ignore_result=True)
def process_node_watch_presence() -> dict:
"""
Periodic task (~1 min). For each observed node with enabled watches, update presence:
Expand Down
4 changes: 2 additions & 2 deletions Meshflow/meshcore_packet_path/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)


@shared_task
@shared_task(ignore_result=True)
def collect_path_edge_buckets():
"""Hourly: rollup hash-chain edges for the completed previous hour."""
current_hour = timezone.now().replace(minute=0, second=0, microsecond=0)
Expand All @@ -35,7 +35,7 @@ def backfill_path_edge_buckets_task(hours: int | None = None, days: int | None =
return collect_path_edge_buckets_for_range(start_hour, current_hour, skip_existing=True)


@shared_task
@shared_task(ignore_result=True)
def evict_old_path_data():
"""Delete path edge buckets (and stale unknown segments) older than retention."""
from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution, SegmentStatus
Expand Down
2 changes: 1 addition & 1 deletion Meshflow/nodes/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from packets.models import PacketObservation


@shared_task
@shared_task(ignore_result=True)
def update_managed_node_statuses():
"""
Bulk refresh ManagedNodeStatus from PacketObservation and MeshCorePacketObservation upload_time.
Expand Down
6 changes: 3 additions & 3 deletions Meshflow/traceroute/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def schedule_traceroutes():
return {"created": 0}


@shared_task
@shared_task(ignore_result=True)
def dispatch_pending_traceroutes():
"""
Run frequently (e.g. every 15s). Send due ``AutoTraceRoute`` rows to managed nodes over Channels,
Expand Down Expand Up @@ -142,7 +142,7 @@ def export_traceroutes_to_neo4j():
return export_traceroutes_to_neo4j_impl()


@shared_task
@shared_task(ignore_result=True)
def push_traceroute_to_neo4j(auto_traceroute_id: int):
"""
Push a single completed AutoTraceRoute to Neo4j.
Expand All @@ -155,7 +155,7 @@ def push_traceroute_to_neo4j(auto_traceroute_id: int):
return push_traceroute_to_neo4j_impl(auto_traceroute_id)


@shared_task
@shared_task(ignore_result=True)
def mark_stale_traceroutes_failed():
"""
Run every 60 seconds. Mark pending/sent auto-traceroutes as failed after
Expand Down
Loading