From 2a53dd99d8fdc96dd2682fcdb252feadce5c4889 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 16 Jun 2026 15:45:39 +0000 Subject: [PATCH] Subscribe to all pyMC companion channels, not just the first _discover_all_companions now returns every companion from /api/companion/index instead of stopping at the first match. _run_source spawns one SSE task per companion so messages from emcomm, weather, and other channels are received alongside public. https://claude.ai/code/session_01E6BhzRJuivUEz7uxxoygHT --- poller/pollers/meshcore.py | 58 ++++++++++++++++++++++---------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/poller/pollers/meshcore.py b/poller/pollers/meshcore.py index d5bb377..128aa71 100644 --- a/poller/pollers/meshcore.py +++ b/poller/pollers/meshcore.py @@ -68,9 +68,32 @@ async def run(self): ]) async def _run_source(self, src: dict): + headers = _api_headers(src.get("api_key")) + if src.get("companion"): + companions = [src["companion"]] + else: + companions = await _discover_all_companions(src["base_url"], headers) + if companions: + src["companion"] = companions[0] + + if companions: + logger.info( + "[meshcore] %d companion(s) at %s: %s", + len(companions), src["base_url"], ", ".join(companions), + ) + sse_tasks = [ + asyncio.create_task(self._sse_loop(src, c)) for c in companions + ] + else: + logger.info( + "[meshcore] no companions found at %s — running poll-only mode", + src["base_url"], + ) + sse_tasks = [] + await asyncio.gather( asyncio.create_task(self._poll_loop(src)), - asyncio.create_task(self._sse_loop(src)), + *sse_tasks, ) async def _poll_loop(self, src: dict): @@ -122,28 +145,13 @@ async def _poll_once(self, src: dict): except Exception as exc: logger.debug("[meshcore] stats fetch error: %s", exc) - async def _sse_loop(self, src: dict): - """Stream real-time events from the companion SSE endpoint. + async def _sse_loop(self, src: dict, companion_name: str): + """Stream real-time events from a single companion SSE endpoint. - Discovers the first companion via /api/companion/index, then subscribes - to its event stream. Reconnects automatically on disconnect or error. - Silently exits if no companion is configured on the repeater. + Reconnects automatically on disconnect or error. """ base_url = src["base_url"] headers = _api_headers(src.get("api_key")) - - companion_name = src.get("companion") - if not companion_name: - companion_name = await _discover_companion(base_url, headers) - if companion_name: - src["companion"] = companion_name - - if not companion_name: - logger.info( - "[meshcore] no companion found at %s — running poll-only mode", base_url - ) - return - sse_url = f"{base_url}/api/companion/events" params = {"companion": companion_name} @@ -269,21 +277,23 @@ def _iter_items(payload) -> list: return [] -async def _discover_companion(base_url: str, headers: dict) -> str | None: - """Return the name of the first companion from /api/companion/index, or None.""" +async def _discover_all_companions(base_url: str, headers: dict) -> list[str]: + """Return all companion names from /api/companion/index.""" try: async with httpx.AsyncClient(headers=headers, timeout=10) as client: resp = await client.get(f"{base_url}/api/companion/index") if resp.status_code != 200: - return None + return [] + names: list[str] = [] for item in _iter_items(resp.json()): if isinstance(item, dict): name = item.get("name") or item.get("identity_name") if name: - return str(name) + names.append(str(name)) + return names except Exception as exc: logger.debug("[meshcore] companion discovery error: %s", exc) - return None + return [] def _extract_links_from_packets(payload, source_url: str) -> list[dict]: