Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,15 @@
from src.commands.factory import CommandFactory
from src.data_classes import MeshNode
from src.helpers import pretty_print_last_heard, safe_encode_node_name
from src.packet_log import log_incoming_packet
from src.persistence.commands_logger import AbstractCommandLogger
from src.persistence.node_db import AbstractNodeDB
from src.persistence.node_info import AbstractNodeInfoStore
from src.packet_log import log_incoming_packet
from src.persistence.packet_dump import dump_packet
from src.persistence.user_prefs import AbstractUserPrefsPersistence
from src.radio.errors import call_safely, get_global_error_counter
from src.radio.events import (
ConnectionEstablished,
IncomingPacket,
IncomingTextMessage,
NodeUpdate,
)
from src.radio.events import (ConnectionEstablished, IncomingPacket,
IncomingTextMessage, NodeUpdate)
from src.radio.interface import RadioHandlers, RadioInterface
from src.responders.responder_factory import ResponderFactory

Expand Down Expand Up @@ -97,10 +93,9 @@ def disconnect(self) -> None:

def on_apply_mc_channel_config(self, channels: list) -> None:
"""Handle apply_mc_channel_config from WebSocket (MeshCore feeders)."""
from src.meshcore.channel_sync import (
apply_channels_on_device,
sync_channels_to_api,
)
from src.meshcore.channel_sync import (apply_channels_on_device,
sync_channels_after_apply,
sync_channels_to_api)

if not hasattr(self.radio, "run_coroutine"):
logger.warning(
Expand All @@ -110,10 +105,12 @@ def on_apply_mc_channel_config(self, channels: list) -> None:
if not apply_channels_on_device(self.radio, channels):
return
if hasattr(self.radio, "schedule_channel_sync"):
self.radio.schedule_channel_sync(self.storage_apis)
self.radio.schedule_channel_sync(self.storage_apis, scope_hints=channels)
elif self.storage_apis:
sync_channels_after_apply(self.radio, self.storage_apis, channels)
else:
for storage_api in self.storage_apis:
sync_channels_to_api(self.radio, storage_api)
sync_channels_to_api(self.radio, storage_api, scope_hints=channels)

def on_traceroute_command(self, target_node_id: int) -> None:
"""Handle a traceroute command (e.g. delivered via WebSocket)."""
Expand Down
60 changes: 47 additions & 13 deletions src/meshcore/channel_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@

CHANNEL_READ_DELAY_S = 2.0

from src.meshcore.channels import (
log_device_channels,
read_device_channels,
snapshot_sync_body,
)
from src.meshcore.channels import (log_device_channels, read_device_channels,
snapshot_sync_body)

if TYPE_CHECKING:
from src.api.StorageAPI import StorageAPIWrapper
Expand All @@ -21,7 +18,11 @@
logger = logging.getLogger(__name__)


async def read_channel_snapshot_async(radio: "MeshCoreRadio") -> Optional[dict]:
async def read_channel_snapshot_async(
radio: "MeshCoreRadio",
*,
scope_hints: list[dict] | None = None,
) -> Optional[dict]:
"""Read the device channel table once; return mc-channel-sync body or None."""
if not radio.is_connected:
logger.warning("MeshCore channel read skipped: radio not connected")
Expand All @@ -33,7 +34,7 @@ async def read_channel_snapshot_async(radio: "MeshCoreRadio") -> Optional[dict]:
else:
try:
await asyncio.sleep(CHANNEL_READ_DELAY_S)
channels = await read_device_channels(mc)
channels = await read_device_channels(mc, scope_hints=scope_hints)
except Exception as exc:
logger.exception("MeshCore read_device_channels failed: %s", exc)
return None
Expand Down Expand Up @@ -62,36 +63,47 @@ def post_channel_snapshot(storage: "StorageAPIWrapper", body: dict) -> bool:


async def sync_channels_to_storage_apis_async(
radio: "MeshCoreRadio", storage_apis: list["StorageAPIWrapper"]
radio: "MeshCoreRadio",
storage_apis: list["StorageAPIWrapper"],
*,
scope_hints: list[dict] | None = None,
) -> None:
"""Read device channels once and POST the same snapshot to every configured API."""
if not storage_apis:
return
body = await read_channel_snapshot_async(radio)
body = await read_channel_snapshot_async(radio, scope_hints=scope_hints)
if body is None:
return
for storage in storage_apis:
post_channel_snapshot(storage, body)


async def sync_channels_to_api_async(
radio: "MeshCoreRadio", storage: "StorageAPIWrapper"
radio: "MeshCoreRadio",
storage: "StorageAPIWrapper",
*,
scope_hints: list[dict] | None = None,
) -> bool:
"""Read channels on the MeshCore asyncio loop and POST mc-channel-sync to one API."""
body = await read_channel_snapshot_async(radio)
body = await read_channel_snapshot_async(radio, scope_hints=scope_hints)
if body is None:
return False
return post_channel_snapshot(storage, body)


def sync_channels_to_api(radio: "MeshCoreRadio", storage: "StorageAPIWrapper") -> bool:
def sync_channels_to_api(
radio: "MeshCoreRadio",
storage: "StorageAPIWrapper",
*,
scope_hints: list[dict] | None = None,
) -> bool:
"""Sync from a non-radio thread (e.g. WebSocket worker). Do not call from the radio loop."""
if not hasattr(radio, "run_coroutine"):
logger.warning("MeshCore channel sync skipped: radio has no run_coroutine")
return False
try:
return radio.run_coroutine(
sync_channels_to_api_async(radio, storage),
sync_channels_to_api_async(radio, storage, scope_hints=scope_hints),
timeout=120.0,
)
except Exception as exc:
Expand All @@ -115,3 +127,25 @@ async def _apply():
except Exception as exc:
logger.exception("MeshCore apply_device_channels failed: %s", exc)
return False


def sync_channels_after_apply(
radio: "MeshCoreRadio",
storage_apis: list["StorageAPIWrapper"],
applied_channels: list[dict],
) -> None:
"""Re-read device channels and merge applied region_scope before posting to APIs."""
if not hasattr(radio, "run_coroutine"):
logger.warning("MeshCore post-apply sync skipped: radio has no run_coroutine")
return
try:
radio.run_coroutine(
sync_channels_to_storage_apis_async(
radio,
storage_apis,
scope_hints=applied_channels,
),
timeout=120.0,
)
except Exception as exc:
logger.exception("MeshCore post-apply channel sync failed: %s", exc)
113 changes: 97 additions & 16 deletions src/meshcore/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,69 @@
DEFAULT_MAX_CHANNEL_SCAN = 16


def _normalize_region_scope(value: Any) -> str | None:
if value is None:
return None
raw = str(value).strip().lower().lstrip("#")
if not raw or raw in ("*", "none", "null"):
return None
return raw[:29]


def merge_channel_region_scopes(
device_channels: list[dict],
intent_channels: list[dict] | None,
) -> list[dict]:
"""
Overlay region_scope from an apply/sync intent onto device snapshot rows.

Companion CHANNEL_INFO does not return per-channel scope yet; after apply we
must carry scope from the operator payload so the API does not create duplicate
unscoped canonical rows.
"""
if not intent_channels:
return device_channels

intent_by_idx: dict[int, dict] = {}
for row in intent_channels:
if row.get("mc_channel_idx") is None:
continue
intent_by_idx[int(row["mc_channel_idx"])] = row

merged: list[dict] = []
for entry in device_channels:
row = dict(entry)
idx = int(row["mc_channel_idx"])
intent = intent_by_idx.get(idx)
if intent is not None and "region_scope" in intent:
row["region_scope"] = _normalize_region_scope(intent.get("region_scope"))
merged.append(row)
return merged


def _channel_entry_from_info(idx: int, payload: dict) -> Optional[dict]:
name = str(payload.get("channel_name", "") or "").strip()
if not name:
return None
scope = (
payload.get("region_scope")
or payload.get("flood_scope")
or payload.get("scope_name")
)
region_scope = _normalize_region_scope(scope)
if name.startswith("#"):
tag = name.lstrip("#")[:100] or f"channel {idx}"
return {
"mc_channel_idx": idx,
"name": name.lstrip("#")[:100] or f"channel {idx}",
"name": tag,
"mc_channel_type": "HASHTAG",
"mc_hashtag": name.lstrip("#")[:64],
"region_scope": region_scope,
}
return {
"mc_channel_idx": idx,
"name": name[:100],
"mc_channel_type": "PUBLIC",
"mc_hashtag": None,
"region_scope": region_scope,
}


Expand All @@ -40,14 +87,40 @@ def _describe_channel_event(idx: int, evt: Any) -> str:
if evt.type == EventType.CHANNEL_INFO:
payload = evt.payload if isinstance(evt.payload, dict) else {}
name = payload.get("channel_name", "")
scope = payload.get("region_scope") or payload.get("scope_name")
if scope:
return f"[{idx}] CHANNEL_INFO name={name!r} region_scope={scope!r}"
return f"[{idx}] CHANNEL_INFO name={name!r}"
return f"[{idx}] {evt.type}"


async def _apply_active_flood_scope(
meshcore: MeshCore, region_scope: str | None
) -> None:
"""
Set the companion active flood scope (CMD_SET_FLOOD_SCOPE / set_flood_scope).

This is the operator-facing scope used when sending on the active channel slot;
firmware does not yet return per-channel scope in CHANNEL_INFO.
"""
set_scope = getattr(getattr(meshcore, "commands", None), "set_flood_scope", None)
if set_scope is None:
logger.warning(
"meshcore.commands.set_flood_scope unavailable; region_scope=%r not applied to radio",
region_scope,
)
return
scope_arg = region_scope if region_scope else "*"
evt = await set_scope(scope_arg)
if evt.type == EventType.ERROR:
logger.warning("set_flood_scope(%r) failed: %s", scope_arg, evt.payload)


async def read_device_channels(
meshcore: MeshCore,
*,
max_channels: int = DEFAULT_MAX_CHANNEL_SCAN,
scope_hints: list[dict] | None = None,
) -> list[dict]:
"""Return channel snapshot rows for API mc-channel-sync."""
channels: list[dict] = []
Expand All @@ -74,7 +147,7 @@ async def read_device_channels(
"MeshCore get_channel scan found 0 named channels: %s",
"; ".join(scan_lines) or "(no responses)",
)
return channels
return merge_channel_region_scopes(channels, scope_hints)


async def apply_device_channels(meshcore: MeshCore, channels: list[dict]) -> None:
Expand All @@ -83,12 +156,22 @@ async def apply_device_channels(meshcore: MeshCore, channels: list[dict]) -> Non
idx = int(ch["mc_channel_idx"])
name = str(ch.get("name") or f"channel {idx}")
ch_type = str(ch.get("mc_channel_type", "PUBLIC")).upper()
region_scope = _normalize_region_scope(ch.get("region_scope"))
if ch_type == "HASHTAG":
tag = str(ch.get("mc_hashtag") or name).lstrip("#")
tag = str(ch.get("name") or name).lstrip("#")
name = f"#{tag}"
evt = await meshcore.commands.set_channel(idx, name)
if evt.type == EventType.ERROR:
logger.warning("set_channel(%s) failed: %s", idx, evt.payload)
continue
await _apply_active_flood_scope(meshcore, region_scope)
if region_scope:
logger.info(
"MeshCore channel [%s] set name=%r active flood scope=%r",
idx,
ch.get("name"),
region_scope,
)


def log_device_channels(channels: list[dict]) -> None:
Expand All @@ -101,17 +184,15 @@ def log_device_channels(channels: list[dict]) -> None:
idx = ch["mc_channel_idx"]
typ = ch.get("mc_channel_type", "?")
name = ch.get("name", "")
tag = ch.get("mc_hashtag")
if tag:
logger.info(
" [%s] %s name=%r hashtag=%r",
idx,
typ,
name,
tag,
)
else:
logger.info(" [%s] %s name=%r", idx, typ, name)
scope = ch.get("region_scope")
scope_label = scope if scope else "(none)"
logger.info(
" [%s] %s name=%r region_scope=%s",
idx,
typ,
name,
scope_label,
)


def snapshot_sync_body(channels: list[dict]) -> dict:
Expand Down
Loading