Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .cursorignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ remote_db_dump.sql

# Tailwind / CSS build artifact (if generated locally)
tailwind.css

# Secrets
.env
70 changes: 61 additions & 9 deletions Meshflow/common/feeder_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import logging
import time

from channels.layers import get_channel_layer

Expand All @@ -12,16 +13,65 @@
COMMAND_DISPATCH_UNAVAILABLE = "command_dispatch_unavailable"


def _ws_json_safe(value):
"""Recursively coerce channel-layer payloads to msgpack-safe plain Python."""
from django.utils.functional import Promise

if isinstance(value, Promise):
return str(value)
if isinstance(value, dict):
return {str(k): _ws_json_safe(v) for k, v in value.items()}
if isinstance(value, (list, tuple)):
return [_ws_json_safe(v) for v in value]
if isinstance(value, (str, int, float, bool)) or value is None:
return value
return str(value)


async def _redis_group_has_channels(layer, group: str) -> bool:
"""Presence check for channels_redis (group is a ZSET at asgi:group:{name})."""
key = layer._group_key(group)
connection = layer.connection(layer.consistent_hash(group))
await connection.zremrangebyscore(
key,
min=0,
max=int(time.time()) - layer.group_expiry,
)
names = await connection.zrange(key, 0, -1)
return bool(names)


async def _inmemory_group_has_channels(layer, group: str) -> bool:
"""Presence check for InMemoryChannelLayer (tests)."""
members = layer.groups.get(group) or {}
now = time.time()
return any(now - ts < layer.group_expiry for ts in members.values())


async def feeder_ws_group_has_subscribers(group: str) -> bool:
"""Return True if at least one bot WebSocket is subscribed to ``group``."""
channel_layer = get_channel_layer()
if channel_layer is None:
return False
if not hasattr(channel_layer, "group_channels"):
logger.warning("Channel layer does not support group_channels; assuming feeder offline")
return False
channels = await channel_layer.group_channels(group)
return bool(channels)

try:
from channels.layers import InMemoryChannelLayer
from channels_redis.core import RedisChannelLayer
except ImportError:
RedisChannelLayer = None # type: ignore[misc, assignment]
InMemoryChannelLayer = None # type: ignore[misc, assignment]

if RedisChannelLayer is not None and isinstance(channel_layer, RedisChannelLayer):
return await _redis_group_has_channels(channel_layer, group)

if InMemoryChannelLayer is not None and isinstance(channel_layer, InMemoryChannelLayer):
return await _inmemory_group_has_channels(channel_layer, group)

logger.warning(
"Channel layer %s has no membership probe; assuming feeder offline",
type(channel_layer).__name__,
)
return False


async def dispatch_node_command(group: str, command: dict) -> None:
Expand All @@ -31,8 +81,10 @@ async def dispatch_node_command(group: str, command: dict) -> None:
raise RuntimeError("Channel layer is not configured")
await channel_layer.group_send(
group,
{
"type": "node_command",
"command": command,
},
_ws_json_safe(
{
"type": "node_command",
"command": command,
}
),
)
50 changes: 50 additions & 0 deletions Meshflow/common/mc_channel_labels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Display labels and apply payloads for MeshCore MessageChannel rows."""

from __future__ import annotations

from constellations.models import MeshCoreChannelType, MessageChannel


def mc_channel_admin_label(channel: MessageChannel) -> str:
"""Human label for admin lists: #hashtag for HASHTAG, plain name for PUBLIC."""
if channel.mc_channel_type == MeshCoreChannelType.HASHTAG:
tag = (channel.mc_hashtag or channel.name or "").strip().lstrip("#")
if tag:
return f"#{tag}"
name = (channel.name or "").strip()
if name:
return name
if channel.mc_channel_idx is not None:
return f"slot {channel.mc_channel_idx}"
return str(channel.pk)


def mc_channel_type_name(channel: MessageChannel) -> str:
if channel.mc_channel_type is None:
return "—"
return MeshCoreChannelType(channel.mc_channel_type).name


def message_channel_to_apply_entry(channel: MessageChannel) -> dict:
"""Build one apply_mc_channel_config entry from a MessageChannel row."""
ch_type = mc_channel_type_name(channel)
if ch_type == "—":
ch_type = "PUBLIC"
entry = {
"mc_channel_idx": channel.mc_channel_idx,
"name": channel.name,
"mc_channel_type": ch_type,
}
if channel.mc_channel_type == MeshCoreChannelType.HASHTAG:
tag = (channel.mc_hashtag or channel.name or "").strip().lstrip("#")
entry["mc_hashtag"] = tag[:64] if tag else None
if tag:
entry["name"] = tag[:100]
return entry


def managed_node_mc_channels_queryset(managed_node):
"""MC channel rows linked on a MeshCore feeder (device mirror)."""
from common.protocol import Protocol

return managed_node.mc_channels.filter(protocol=Protocol.MESHCORE).order_by("mc_channel_idx")
103 changes: 103 additions & 0 deletions Meshflow/common/tests/test_feeder_ws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Tests for feeder WebSocket presence helpers."""

import time
from unittest.mock import AsyncMock, MagicMock

from django.utils.translation import gettext_lazy

import msgpack
import pytest

from common.feeder_ws import _ws_json_safe, dispatch_node_command, feeder_ws_group_has_subscribers


@pytest.mark.asyncio
async def test_redis_group_has_subscribers_when_zset_non_empty(monkeypatch):
from channels_redis.core import RedisChannelLayer

layer = MagicMock(spec=RedisChannelLayer)
layer._group_key.return_value = "asgi:group:node_mc_test"
layer.consistent_hash.return_value = 0
layer.group_expiry = 86400
connection = AsyncMock()
connection.zrange.return_value = [b"specific.channel.name"]
layer.connection.return_value = connection

monkeypatch.setattr(
"common.feeder_ws.get_channel_layer",
lambda: layer,
)

assert await feeder_ws_group_has_subscribers("node_mc_test") is True
connection.zremrangebyscore.assert_awaited_once()
connection.zrange.assert_awaited_once()


@pytest.mark.asyncio
async def test_redis_group_has_no_subscribers_when_zset_empty(monkeypatch):
from channels_redis.core import RedisChannelLayer

layer = MagicMock(spec=RedisChannelLayer)
layer._group_key.return_value = "asgi:group:node_mc_empty"
layer.consistent_hash.return_value = 0
layer.group_expiry = 86400
connection = AsyncMock()
connection.zrange.return_value = []
layer.connection.return_value = connection

monkeypatch.setattr(
"common.feeder_ws.get_channel_layer",
lambda: layer,
)

assert await feeder_ws_group_has_subscribers("node_mc_empty") is False


@pytest.mark.asyncio
async def test_inmemory_group_has_subscribers(monkeypatch):
from channels.layers import InMemoryChannelLayer

layer = InMemoryChannelLayer()
layer.groups["node_mc_mem"] = {"test.channel": time.time()}

monkeypatch.setattr(
"common.feeder_ws.get_channel_layer",
lambda: layer,
)

assert await feeder_ws_group_has_subscribers("node_mc_mem") is True


def test_ws_json_safe_coerces_lazy_translation_proxy():
payload = {
"type": "node_command",
"command": {
"type": "apply_mc_channels",
"channels": [
{
"mc_channel_type": gettext_lazy("HASHTAG"),
"name": "test",
}
],
},
}
safe = _ws_json_safe(payload)
msgpack.packb(safe)
assert safe["command"]["channels"][0]["mc_channel_type"] == "HASHTAG"


@pytest.mark.asyncio
async def test_dispatch_node_command_sends_msgpack_safe_payload(monkeypatch):
layer = AsyncMock()
monkeypatch.setattr("common.feeder_ws.get_channel_layer", lambda: layer)

command = {
"type": "apply_mc_channels",
"channels": [{"mc_channel_type": gettext_lazy("PUBLIC"), "name": "x"}],
}
await dispatch_node_command("node_mc_test", command)

layer.group_send.assert_awaited_once()
message = layer.group_send.await_args[0][1]
msgpack.packb(message)
assert message["command"]["channels"][0]["mc_channel_type"] == "PUBLIC"
54 changes: 54 additions & 0 deletions Meshflow/common/tests/test_mc_channel_labels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Tests for MeshCore channel admin labels and apply payloads."""

import pytest

from common.mc_channel_labels import (
mc_channel_admin_label,
message_channel_to_apply_entry,
)
from common.protocol import Protocol
from constellations.models import MeshCoreChannelType, MessageChannel


@pytest.mark.django_db
def test_mc_channel_admin_label_public(create_constellation):
constellation = create_constellation()
ch = MessageChannel.objects.create(
name="Public",
constellation=constellation,
protocol=Protocol.MESHCORE,
mc_channel_idx=0,
mc_channel_type=MeshCoreChannelType.PUBLIC,
)
assert mc_channel_admin_label(ch) == "Public"


@pytest.mark.django_db
def test_mc_channel_admin_label_hashtag_prefix(create_constellation):
constellation = create_constellation()
ch = MessageChannel.objects.create(
name="galloway",
constellation=constellation,
protocol=Protocol.MESHCORE,
mc_channel_idx=1,
mc_channel_type=MeshCoreChannelType.HASHTAG,
mc_hashtag="galloway",
)
assert mc_channel_admin_label(ch) == "#galloway"


@pytest.mark.django_db
def test_message_channel_to_apply_entry_hashtag(create_constellation):
constellation = create_constellation()
ch = MessageChannel.objects.create(
name="galloway",
constellation=constellation,
protocol=Protocol.MESHCORE,
mc_channel_idx=1,
mc_channel_type=MeshCoreChannelType.HASHTAG,
mc_hashtag="galloway",
)
entry = message_channel_to_apply_entry(ch)
assert entry["mc_channel_type"] == "HASHTAG"
assert entry["mc_hashtag"] == "galloway"
assert entry["name"] == "galloway"
68 changes: 63 additions & 5 deletions Meshflow/constellations/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _

from .models import Constellation, ConstellationUserMembership, MessageChannel
from common.mc_channel_labels import mc_channel_admin_label, mc_channel_type_name
from common.protocol import Protocol

from .models import (
Constellation,
ConstellationUserMembership,
MeshCoreMessageChannel,
MessageChannel,
)


class ConstellationAdminForm(forms.ModelForm):
Expand Down Expand Up @@ -126,11 +134,61 @@ def get_constellation_creator(self, obj):

@admin.register(MessageChannel)
class MessageChannelAdmin(admin.ModelAdmin):
list_display = ("id", "name", "protocol", "mc_channel_idx", "constellation")
"""Meshtastic and legacy rows; MeshCore operators should use MeshCore channels."""

list_display = ("id", "name", "protocol", "constellation")
list_filter = (
("protocol", admin.ChoicesFieldListFilter),
"constellation",
("mc_channel_idx", admin.EmptyFieldListFilter),
)
search_fields = ("name", "id")
ordering = ("name",)
search_fields = ("name", "id", "constellation__name")
ordering = ("constellation__name", "name")
list_select_related = ("constellation",)


@admin.register(MeshCoreMessageChannel)
class MeshCoreMessageChannelAdmin(admin.ModelAdmin):
"""Constellation MC channel catalog (device slots). Push to radio from Managed node admin."""

list_display = (
"mc_channel_idx",
"admin_label",
"mc_channel_type_display",
"constellation",
"id",
)
list_filter = (
("mc_channel_type", admin.ChoicesFieldListFilter),
"constellation",
)
search_fields = ("name", "mc_hashtag", "constellation__name")
ordering = ("constellation__name", "mc_channel_idx")
list_select_related = ("constellation",)
fieldsets = (
(None, {"fields": ("constellation", "mc_channel_idx")}),
(
_("Channel"),
{
"fields": ("name", "mc_channel_type", "mc_hashtag"),
"description": _(
"PUBLIC channels use a plain name. HASHTAG channels use mc_hashtag "
"(no leading #); lists show #prefix for hashtags."
),
},
),
)

def get_queryset(self, request):
return super().get_queryset(request).filter(protocol=Protocol.MESHCORE)

def save_model(self, request, obj, form, change):
obj.protocol = Protocol.MESHCORE
super().save_model(request, obj, form, change)

@admin.display(description=_("Label"), ordering="name")
def admin_label(self, obj):
return mc_channel_admin_label(obj)

@admin.display(description=_("Type"), ordering="mc_channel_type")
def mc_channel_type_display(self, obj):
return mc_channel_type_name(obj)
Loading