Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions poller/pollers/meshcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,32 @@ async def run(self):
])

async def _run_source(self, src: dict):
headers = _api_headers(src.get("api_key"))
if src.get("companion"):
companions = [src["companion"]]
else:
companions = await _discover_all_companions(src["base_url"], headers)
if companions:
src["companion"] = companions[0]

if companions:
logger.info(
"[meshcore] %d companion(s) at %s: %s",
len(companions), src["base_url"], ", ".join(companions),
)
sse_tasks = [
asyncio.create_task(self._sse_loop(src, c)) for c in companions
]
else:
logger.info(
"[meshcore] no companions found at %s — running poll-only mode",
src["base_url"],
)
sse_tasks = []

await asyncio.gather(
asyncio.create_task(self._poll_loop(src)),
asyncio.create_task(self._sse_loop(src)),
*sse_tasks,
)

async def _poll_loop(self, src: dict):
Expand Down Expand Up @@ -122,28 +145,13 @@ async def _poll_once(self, src: dict):
except Exception as exc:
logger.debug("[meshcore] stats fetch error: %s", exc)

async def _sse_loop(self, src: dict):
"""Stream real-time events from the companion SSE endpoint.
async def _sse_loop(self, src: dict, companion_name: str):
"""Stream real-time events from a single companion SSE endpoint.

Discovers the first companion via /api/companion/index, then subscribes
to its event stream. Reconnects automatically on disconnect or error.
Silently exits if no companion is configured on the repeater.
Reconnects automatically on disconnect or error.
"""
base_url = src["base_url"]
headers = _api_headers(src.get("api_key"))

companion_name = src.get("companion")
if not companion_name:
companion_name = await _discover_companion(base_url, headers)
if companion_name:
src["companion"] = companion_name

if not companion_name:
logger.info(
"[meshcore] no companion found at %s — running poll-only mode", base_url
)
return

sse_url = f"{base_url}/api/companion/events"
params = {"companion": companion_name}

Expand Down Expand Up @@ -269,21 +277,23 @@ def _iter_items(payload) -> list:
return []


async def _discover_companion(base_url: str, headers: dict) -> str | None:
"""Return the name of the first companion from /api/companion/index, or None."""
async def _discover_all_companions(base_url: str, headers: dict) -> list[str]:
"""Return all companion names from /api/companion/index."""
try:
async with httpx.AsyncClient(headers=headers, timeout=10) as client:
resp = await client.get(f"{base_url}/api/companion/index")
if resp.status_code != 200:
return None
return []
names: list[str] = []
for item in _iter_items(resp.json()):
if isinstance(item, dict):
name = item.get("name") or item.get("identity_name")
if name:
return str(name)
names.append(str(name))
return names
except Exception as exc:
logger.debug("[meshcore] companion discovery error: %s", exc)
return None
return []


def _extract_links_from_packets(payload, source_url: str) -> list[dict]:
Expand Down
Loading