From 8e3ac696d1d89b84331617b1c7e273364ffac014 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 15 Jun 2026 04:03:01 +0000 Subject: [PATCH] Replace RemoteTerm meshcore poller with pyMC-Repeater Drops the RemoteTerm HTTP/WebSocket integration and the direct-device meshcore_pymc poller in favour of the pyMC-Repeater REST + SSE API. - poller/pollers/meshcore.py: complete rewrite targeting pyMC-Repeater. Periodic REST poll of /api/adverts_by_contact_type (node discovery), /api/recent_packets (SNR/link metrics), and /api/stats (health). SSE stream from /api/companion/events for real-time advert, message, and path-update events; auto-discovers companion via /api/companion/index. Auth via X-API-Key extracted from URL username (http://APIKEY@host:port). - poller/pollers/meshcore_pymc.py: deleted (superseded by the repeater). - poller/normalizers/mesh_node.py: replace normalize_remoteterm_contact() with normalize_pymc_repeater_advert() mapping pyMC-Repeater advert fields (public_key, name, gps_lat, gps_lon, last_advert_timestamp, out_path_len). - poller/main.py: remove MeshCorePymcPoller import and instantiation. - backend/routers/admin_debug.py: update meshcore probe to check /api/stats, /api/adverts_by_contact_type, and /api/recent_packets instead of RemoteTerm endpoints; switch auth to X-API-Key header; add extra_headers param to _http_get_check. - config/sources.example.yml: update meshcore entry to reference pyMC-Repeater with API-key auth instructions. https://claude.ai/code/session_01WfXRRhUewL5qjgpk6PdaSD --- backend/routers/admin_debug.py | 78 ++- config/sources.example.yml | 7 +- poller/main.py | 2 - poller/normalizers/mesh_node.py | 106 ++-- poller/pollers/meshcore.py | 839 ++++++++++++-------------------- poller/pollers/meshcore_pymc.py | 272 ----------- 6 files changed, 417 insertions(+), 887 deletions(-) delete mode 100644 poller/pollers/meshcore_pymc.py diff --git a/backend/routers/admin_debug.py b/backend/routers/admin_debug.py index 33fce3a..bd3a905 100644 --- a/backend/routers/admin_debug.py +++ b/backend/routers/admin_debug.py @@ -98,11 +98,38 @@ async def _resolve_source(db: AsyncSession, source_type: str, source_url: Option return dict(source) -async def _http_get_check(url: str, auth: Optional[httpx.BasicAuth] = None, timeout: float = 10.0) -> tuple[dict, object | None]: +def _summarize_list(payload) -> str: + items = payload if isinstance(payload, list) else ( + payload.get("data") or payload.get("items") or [] + if isinstance(payload, dict) else [] + ) + return f"{len(items)} item(s)" + + +def _summarize_stats(payload) -> str: + if not isinstance(payload, dict): + return "" + connected = payload.get("radio_connected", payload.get("connected")) + site = payload.get("site_name", "") + version = payload.get("version", "") + parts = [p for p in [ + f"connected={connected}" if connected is not None else None, + f"site={site}" if site else None, + f"v{version}" if version else None, + ] if p] + return " ".join(parts) + + +async def _http_get_check( + url: str, + auth: Optional[httpx.BasicAuth] = None, + extra_headers: Optional[dict[str, str]] = None, + timeout: float = 10.0, +) -> tuple[dict, object | None]: t0 = time.perf_counter() try: validate_safe_url(url, allowed_schemes={"http", "https"}) - async with httpx.AsyncClient(auth=auth, timeout=timeout) as client: + async with httpx.AsyncClient(auth=auth, headers=extra_headers or {}, timeout=timeout) as client: resp = await client.get(url) latency_ms = round((time.perf_counter() - t0) * 1000, 1) payload = None @@ -315,25 +342,23 @@ async def probe_remote_feed(body: RemoteFeedProbeRequest, db: AsyncSession = Dep recommendations: list[str] = [] if body.source_type == "meshcore": - for name, path in (("health", "/api/health"), ("contacts", "/api/contacts"), ("neighbors", "/api/neighbors")): - check, payload = await _http_get_check(f"{base_url}{path}", auth=httpx_auth) - if isinstance(payload, list): - check["summary"] = f"{len(payload)} item(s)" - elif isinstance(payload, dict): - if path == "/api/health": - connected = payload.get("radio_connected", payload.get("connected", payload.get("radio_ok"))) - check["summary"] = f"radio_connected={connected}" - elif "items" in payload and isinstance(payload.get("items"), list): - check["summary"] = f"{len(payload.get('items', []))} item(s)" - elif "contacts" in payload and isinstance(payload.get("contacts"), list): - check["summary"] = f"{len(payload.get('contacts', []))} contact(s)" - elif "neighbors" in payload and isinstance(payload.get("neighbors"), list): - check["summary"] = f"{len(payload.get('neighbors', []))} neighbor(s)" - + # pyMC-Repeater uses X-API-Key auth (URL username = API key) + pymc_headers = {} + if auth: + pymc_headers["X-API-Key"] = auth[0] + + for name, path, summarize in ( + ("stats", "/api/stats", _summarize_stats), + ("adverts", "/api/adverts_by_contact_type", _summarize_list), + ("packets", "/api/recent_packets", _summarize_list), + ): + check, payload = await _http_get_check( + f"{base_url}{path}", extra_headers=pymc_headers + ) + if payload is not None: + check["summary"] = summarize(payload) checks.append({"name": name, "protocol": "http", **check}) - ws = await _probe_ws(_to_ws_url(base_url) + "/api/ws", body.duration_seconds, headers=headers) - stats_row = await db.execute( text( "SELECT COUNT(*) AS total_count, " @@ -350,15 +375,14 @@ async def probe_remote_feed(body: RemoteFeedProbeRequest, db: AsyncSession = Dep "latest_timestamp": stats.get("latest_ts").isoformat() if stats.get("latest_ts") else None, } - neighbors = next((c for c in checks if c.get("name") == "neighbors"), None) - if not ws.get("connected"): - recommendations.append("WebSocket connection failed; verify RemoteTerm reachability and auth.") - if ws.get("connected") and ws.get("event_counts", {}).get("message", 0) == 0: - recommendations.append("No message events observed; RemoteTerm may be emitting only telemetry (health/contact/raw_packet).") - if neighbors and neighbors.get("status_code") == 404: - recommendations.append("/api/neighbors returned 404; topology enrichment via neighbor endpoint is unavailable.") + stats_check = next((c for c in checks if c.get("name") == "stats"), None) + adverts_check = next((c for c in checks if c.get("name") == "adverts"), None) + if stats_check and not stats_check.get("ok"): + recommendations.append("Could not reach /api/stats — verify the repeater URL and that pyMC-Repeater is running.") + if adverts_check and adverts_check.get("status_code") == 401: + recommendations.append("/api/adverts_by_contact_type returned 401; embed the API key in the source URL as http://API_KEY@host:port.") if storage["total_messages"] == 0: - recommendations.append("No persisted mesh messages found for this source yet.") + recommendations.append("No persisted mesh messages found; messages arrive via SSE only when a companion identity is configured on the repeater.") elif body.source_type == "adsb": check, payload = await _http_get_check(base_url, auth=httpx_auth) diff --git a/config/sources.example.yml b/config/sources.example.yml index ac2c093..aeb173b 100644 --- a/config/sources.example.yml +++ b/config/sources.example.yml @@ -86,10 +86,11 @@ poller_sources: enabled: true source: config - - type: meshcore # MeshCore — Remote-Terminal-for-MeshCore base URL - name: "RemoteTerm" # https://github.com/jkingsman/Remote-Terminal-for-MeshCore + - type: meshcore # MeshCore — pyMC-Repeater base URL + name: "pyMC-Repeater" # https://github.com/pyMC-dev/pyMC_Repeater url: "http://192.168.1.x:8000" - # Embed credentials if Basic Auth is enabled: http://user:pass@192.168.1.x:8000 + # Embed an API key as the URL username if auth is enabled: + # http://MY_API_KEY@192.168.1.x:8000 enabled: true source: config diff --git a/poller/main.py b/poller/main.py index a5bc11d..04c3c93 100644 --- a/poller/main.py +++ b/poller/main.py @@ -14,7 +14,6 @@ from pollers.utilities import UtilityPoller from pollers.p25 import P25Poller from pollers.meshcore import MeshCorePoller -from pollers.meshcore_pymc import MeshCorePymcPoller from pollers.summary import AISummaryPoller from pollers.seismic import SeismicPoller from pollers.fire import FirePoller @@ -76,7 +75,6 @@ async def main(): UtilityPoller(), P25Poller(), MeshCorePoller(), - MeshCorePymcPoller(), AISummaryPoller(), SeismicPoller(), FirePoller(), diff --git a/poller/normalizers/mesh_node.py b/poller/normalizers/mesh_node.py index a33c796..e7d085d 100644 --- a/poller/normalizers/mesh_node.py +++ b/poller/normalizers/mesh_node.py @@ -21,6 +21,57 @@ def snr_to_quality(snr) -> float | None: } +def normalize_pymc_repeater_advert(data: dict, source_url: str) -> Optional[dict]: + """Normalize a pyMC-Repeater advert to a canonical mesh_node entity.""" + pub_key = (data.get("public_key") or "").strip() + if not pub_key or pub_key == "0" * 64: + return None + + name = (data.get("name") or "").strip() or pub_key[:12] + + contact_type_raw = data.get("contact_type") or data.get("type") or 0 + if isinstance(contact_type_raw, int): + contact_type = _CONTACT_TYPES.get(contact_type_raw, "unknown") + else: + contact_type = str(contact_type_raw) + + lat = data.get("gps_lat") or data.get("lat") + lon = data.get("gps_lon") or data.get("lon") + if lat == 0.0 and lon == 0.0: + lat = lon = None + + last_advert = data.get("last_advert_timestamp") or data.get("lastmod") + if isinstance(last_advert, (int, float)): + last_seen = datetime.fromtimestamp(last_advert, tz=timezone.utc).isoformat() + else: + last_seen = _now() + + out_path_len = data.get("out_path_len") + status = f"hops:{out_path_len}" if out_path_len is not None else "" + + return { + "entity_id": f"mesh_node:{pub_key}", + "entity_type": "mesh_node", + "source": "meshcore", + "display_name": name, + "identity": { + "public_key": pub_key, + "node_id": pub_key[:12], + "short_name": name[:12], + "contact_type": contact_type, + "source_url": source_url, + "out_path": data.get("out_path"), + "out_path_len": out_path_len, + }, + "lat": lat, + "lon": lon, + "altitude": None, + "status": status, + "last_seen": last_seen, + "tags": ["mesh_node", contact_type], + } + + def normalize_mesh_node(data: dict) -> Optional[dict]: """Normalize a MeshCore bridge node_update payload to canonical Entity.""" node_id = data.get("entity_id", "").replace("mesh_node:", "") or data.get("identity", {}).get("node_id") @@ -49,61 +100,6 @@ def normalize_mesh_node(data: dict) -> Optional[dict]: } -def normalize_remoteterm_contact(data: dict) -> Optional[dict]: - """Normalize a RemoteTerm /api/contacts entry to canonical Entity.""" - pub_key = data.get("public_key", "") - if not pub_key: - return None - - node_type = _CONTACT_TYPES.get(data.get("type", 0), "unknown") - name = data.get("name") or pub_key[:12] - - last_seen_raw = data.get("last_seen") - if isinstance(last_seen_raw, (int, float)): - last_seen = datetime.fromtimestamp(last_seen_raw, tz=timezone.utc).isoformat() - elif isinstance(last_seen_raw, str): - last_seen = last_seen_raw - else: - last_seen = _now() - - tags = ["mesh_node", node_type] - if data.get("on_radio"): - tags.append("on_radio") - if data.get("favorite"): - tags.append("favorite") - - return { - "entity_id": f"mesh_node:{pub_key}", - "entity_type": "mesh_node", - "source": "meshcore", - "display_name": name, - "identity": { - "public_key": pub_key, - "node_id": pub_key[:12], - "short_name": pub_key[:12], - "contact_type": node_type, - "hw_model": None, - "on_radio": data.get("on_radio", False), - "favorite": data.get("favorite", False), - "battery_level": data.get("battery_level"), - }, - "lat": data.get("lat"), - "lon": data.get("lon"), - "altitude": None, - "status": _remoteterm_status(data), - "last_seen": last_seen, - "tags": tags, - } - - -def _remoteterm_status(contact: dict) -> str: - parts = [] - effective = contact.get("effective_route") - if isinstance(effective, dict) and (hops := effective.get("hop_count")) is not None: - parts.append(f"hops:{hops}") - return " ".join(parts) - - def _bridge_status(node: dict) -> str: parts = [] if (v := node.get("battery_pct")) is not None: diff --git a/poller/pollers/meshcore.py b/poller/pollers/meshcore.py index ea0e676..77f1fab 100644 --- a/poller/pollers/meshcore.py +++ b/poller/pollers/meshcore.py @@ -1,16 +1,20 @@ """ -MeshCore poller — connects to a Remote-Terminal-for-MeshCore instance. +MeshCore poller — connects to a pyMC-Repeater instance. -Contacts are fetched via REST on startup (and re-synced every 60 s) and kept -live via the RemoteTerm WebSocket event stream (contact, message, health). +Polls the pyMC-Repeater REST API for node adverts, packet SNR metrics, and +system health. Optionally streams real-time events from the companion SSE +endpoint when a companion identity is configured on the repeater. -Configure poller_sources with type=meshcore and the RemoteTerm base URL: +Configure poller_sources with type=meshcore and the repeater base URL: http://192.168.1.x:8000 -Credentials can be embedded: http://user:pass@192.168.1.x:8000 +Embed an API key as the URL username to authenticate: + http://MY_API_KEY@192.168.1.x:8000 + +See: https://github.com/pyMC-dev/pyMC_Repeater """ import asyncio -import base64 +import datetime import hashlib import json import logging @@ -18,36 +22,30 @@ from urllib.parse import urlparse, urlunparse import httpx -import websockets from bus import get_bus, publish_entity, set_feed -from normalizers.mesh_node import normalize_remoteterm_contact, snr_to_quality +from normalizers.mesh_node import normalize_pymc_repeater_advert, snr_to_quality from sanitize import sanitize_payload from .base import BasePoller logger = logging.getLogger(__name__) -_CONTACT_POLL_INTERVAL = 60 -_RETRY_DELAY = 10 -_WS_PING_INTERVAL = 30 -_HEALTH_INFO_LOG_INTERVAL = 300 - -_last_health_info_log_ts: dict[str, float] = {} -_last_health_connected: dict[str, bool] = {} +_POLL_INTERVAL = 60 +_RETRY_DELAY = 15 +_PACKET_LIMIT = 200 class MeshCorePoller(BasePoller): + """pyMC-Repeater poller: adverts, SNR metrics, and system health via REST + SSE.""" + name = "meshcore" - interval = _CONTACT_POLL_INTERVAL + interval = _POLL_INTERVAL def __init__(self): self._sources: list[dict] = [] - self._local_node_ids: dict[str, str] = {} # base_url -> entity_id - self._last_link_update: dict[str, float] = {} # link_key -> timestamp - self._channel_maps: dict[str, dict[str, str]] = {} # base_url -> {channel_key: display_name} async def poll(self): - pass # streaming + periodic REST — overrides run() + pass # overrides run() async def setup(self): from db import get_pool @@ -56,7 +54,7 @@ async def setup(self): ) self._sources = [_parse_source(row["url"]) for row in rows] if self._sources: - logger.info("[meshcore] %d RemoteTerm source(s)", len(self._sources)) + logger.info("[meshcore] %d pyMC-Repeater source(s)", len(self._sources)) else: logger.warning("[meshcore] no MeshCore source configured — poller inactive") @@ -71,590 +69,375 @@ async def run(self): async def _run_source(self, src: dict): await asyncio.gather( - asyncio.create_task(self._contact_poll_loop(src)), - asyncio.create_task(self._ws_loop(src)), + asyncio.create_task(self._poll_loop(src)), + asyncio.create_task(self._sse_loop(src)), ) - async def _contact_poll_loop(self, src: dict): + async def _poll_loop(self, src: dict): while True: try: - await self._fetch_contacts(src) + await self._poll_once(src) await self._heartbeat("ok") except Exception as exc: - logger.error("[meshcore] contact fetch error: %s", exc) + logger.error("[meshcore] poll error %s: %s", src["base_url"], exc) await self._heartbeat("error", str(exc)[:256]) + await asyncio.sleep(_POLL_INTERVAL) + + async def _poll_once(self, src: dict): + base_url = src["base_url"] + headers = _api_headers(src.get("api_key")) + + async with httpx.AsyncClient(headers=headers, timeout=15) as client: + # Node adverts + try: + resp = await client.get(f"{base_url}/api/adverts_by_contact_type") + if resp.status_code == 200: + count = 0 + for advert in _iter_items(resp.json()): + entity = normalize_pymc_repeater_advert(advert, base_url) + if entity: + await publish_entity(entity) + count += 1 + logger.debug("[meshcore] synced %d adverts from %s", count, base_url) + except Exception as exc: + logger.debug("[meshcore] advert fetch error: %s", exc) + + # Recent packets → SNR / RSSI link metrics try: - neighbors = await _fetch_neighbors(src) - if neighbors: - await _upsert_mesh_links(src["base_url"], neighbors) - logger.debug("[meshcore] upserted %d links from %s", len(neighbors), src["base_url"]) + resp = await client.get( + f"{base_url}/api/recent_packets", params={"limit": _PACKET_LIMIT} + ) + if resp.status_code == 200: + links = _extract_links_from_packets(resp.json(), base_url) + if links: + await _upsert_mesh_links(links) except Exception as exc: - logger.debug("[meshcore] neighbor fetch skipped: %s", exc) + logger.debug("[meshcore] packet fetch error: %s", exc) + + # System health try: - self._channel_maps[src["base_url"]] = await _fetch_channels(src) + resp = await client.get(f"{base_url}/api/stats") + if resp.status_code == 200: + await _publish_health(resp.json(), base_url) except Exception as exc: - logger.debug("[meshcore] channel fetch skipped: %s", exc) - await asyncio.sleep(_CONTACT_POLL_INTERVAL) + logger.debug("[meshcore] stats fetch error: %s", exc) + + async def _sse_loop(self, src: dict): + """Stream real-time events from the companion SSE endpoint. + + Discovers the first companion via /api/companion/index, then subscribes + to its event stream. Reconnects automatically on disconnect or error. + Silently exits if no companion is configured on the repeater. + """ + base_url = src["base_url"] + headers = _api_headers(src.get("api_key")) + + companion_name = await _discover_companion(base_url, headers) + if not companion_name: + logger.info( + "[meshcore] no companion found at %s — running poll-only mode", base_url + ) + return - async def _ws_loop(self, src: dict): - ws_url = _to_ws_url(src["base_url"]) + "/api/ws" - headers = _auth_headers(src.get("auth")) + sse_url = f"{base_url}/api/companion/events" + params = {"companion": companion_name} while True: try: - async with websockets.connect( - ws_url, - extra_headers=headers, - ping_interval=_WS_PING_INTERVAL, - ) as ws: - logger.info("[meshcore] WS connected: %s", ws_url) - await set_feed("mesh:status", {"connected": True, "url": src["base_url"]}) - async for raw in ws: - try: - await self._handle_ws_event(json.loads(raw), src["base_url"]) - except Exception as exc: - logger.debug("[meshcore] WS event error: %s", exc) + async with httpx.AsyncClient(headers=headers, timeout=None) as client: + async with client.stream("GET", sse_url, params=params) as resp: + if resp.status_code != 200: + logger.warning( + "[meshcore] SSE %s returned %d", sse_url, resp.status_code + ) + else: + logger.info("[meshcore] SSE connected: %s", sse_url) + await set_feed("mesh:status", {"connected": True, "url": base_url}) + event_type: str | None = None + async for raw_line in resp.aiter_lines(): + line = raw_line.strip() + if line.startswith("event:"): + event_type = line[6:].strip() + elif line.startswith("data:"): + payload_str = line[5:].strip() + if payload_str: + try: + await self._handle_sse_event( + event_type, json.loads(payload_str), base_url + ) + except Exception as exc: + logger.debug("[meshcore] SSE event error: %s", exc) + event_type = None except Exception as exc: logger.error( - "[meshcore] WS error (%s): %s — retry in %ds", ws_url, exc, _RETRY_DELAY + "[meshcore] SSE error (%s): %s — retry in %ds", + sse_url, exc, _RETRY_DELAY, ) - await set_feed("mesh:status", {"connected": False, "url": src["base_url"]}) + await set_feed("mesh:status", {"connected": False, "url": base_url}) await asyncio.sleep(_RETRY_DELAY) - async def _fetch_contacts(self, src: dict): - url = src["base_url"] + "/api/contacts" - auth = src.get("auth") - httpx_auth = httpx.BasicAuth(*auth) if auth else None - - async with httpx.AsyncClient(auth=httpx_auth, timeout=10) as client: - resp = await client.get(url) - resp.raise_for_status() - payload = resp.json() - - contacts = ( - payload - if isinstance(payload, list) - else payload.get("items", payload.get("contacts", [])) - ) - count = 0 - for contact in contacts: - entity = normalize_remoteterm_contact(contact) - if entity: - if contact.get("on_radio"): - self._local_node_ids[src["base_url"]] = entity["entity_id"] - await publish_entity(entity) - count += 1 - logger.debug("[meshcore] synced %d contacts from %s", count, src["base_url"]) - - async def _handle_ws_event(self, event: dict, base_url: str): - event_type = event.get("type") - data = event.get("data") or {} - - if event_type == "contact": - entity = normalize_remoteterm_contact(data) + async def _handle_sse_event(self, event_type: str | None, data: dict, base_url: str): + if event_type == "advert_received": + entity = normalize_pymc_repeater_advert(data, base_url) if entity: - if data.get("on_radio"): - self._local_node_ids[base_url] = entity["entity_id"] await publish_entity(entity) - elif event_type == "message": - message = _normalize_mesh_message( - data, - base_url, - self._channel_maps.get(base_url), - ) - - # Persist when possible, but never block real-time publication. + elif event_type in ("message_received", "channel_message_received"): + message = _normalize_repeater_message(data, base_url, event_type) try: await _save_mesh_message(message) except Exception as exc: - logger.warning( - "[meshcore] failed to persist mesh message id=%s conv=%s: %s", - message.get("id"), - message.get("conversation_key"), - exc, - ) - + logger.warning("[meshcore] persist message failed: %s", exc) r = await get_bus() await r.publish("civic:updates", json.dumps(sanitize_payload({ "type": "mesh_message", "data": message, }))) - elif event_type == "packet": - # Overheard raw packet — extract signal metrics for the sender - sender_id = data.get("from") - snr = data.get("rx_snr") - rssi = data.get("rx_rssi") - - if sender_id and snr is not None: - # Ensure sender ID matches the store format - node_b = f"mesh_node:{sender_id}" if not str(sender_id).startswith("mesh_node:") else str(sender_id) - link_key = f"{base_url}:local->{node_b}" - - # Throttle real-time link updates to max once per 10s - now = time.time() - if now - self._last_link_update.get(link_key, 0) < 10: - return - self._last_link_update[link_key] = now - - logger.debug("[meshcore] throttled link update from %s: snr=%s rssi=%s", sender_id, snr, rssi) - - # Stamp signal_quality on the sending node's entity - await publish_entity({ - "entity_id": node_b, - "entity_type": "mesh_node", - "source": "meshcore", - "signal_quality": snr_to_quality(snr), - "identity": {}, - "tags": ["mesh_node"], - }, merge=True, record_observation=False) - - local_id = self._local_node_ids.get(base_url, "local") - r = await get_bus() - await r.publish("civic:updates", json.dumps(sanitize_payload({ - "type": "mesh_links", - "data": [{ - "source_url": base_url, - "node_a": local_id, - "node_b": node_b, - "snr": snr, - "link_quality": None - }] - }))) - - elif event_type == "health": - logger.debug("[meshcore] raw health update: %s", data) - # RemoteTerm uses 'radio_connected' in the health packet - connected = data.get("radio_connected", data.get("connected", data.get("radio_ok", False))) - - # Extract additional stats if available - stats = data.get("radio_stats", {}) - battery_mv = stats.get("battery_mv") - uptime = stats.get("uptime_secs") - - # Map to UI expected fields - voltage = battery_mv / 1000.0 if battery_mv else None - # Simple Li-ion estimation: 4.2V = 100%, 3.5V = 0% - battery_level = None - if battery_mv: - battery_level = min(100, max(0, int((battery_mv - 3500) / (4200 - 3500) * 100))) - - now_mono = time.monotonic() - prev_connected = _last_health_connected.get(base_url) - last_info_ts = _last_health_info_log_ts.get(base_url, 0.0) - should_log_info = ( - prev_connected is None - or prev_connected != connected - or (now_mono - last_info_ts) >= _HEALTH_INFO_LOG_INTERVAL - ) - if should_log_info: - logger.info( - "[meshcore] health summary: url=%s connected=%s battery_mv=%s rssi=%s snr=%s queue=%s errors=%s uptime=%s", - base_url, - connected, - battery_mv, - stats.get("last_rssi"), - stats.get("last_snr"), - stats.get("queue_len"), - stats.get("errors"), - uptime, - ) - _last_health_info_log_ts[base_url] = now_mono - _last_health_connected[base_url] = connected - - payload = { - **data, - "connected": connected, - "url": base_url, - "voltage": voltage, - "battery_level": battery_level, - "uptime_secs": uptime - } - await set_feed("mesh:status", payload) - - # Also publish to bus for real-time frontend updates - r = await get_bus() - await r.publish("civic:updates", json.dumps(sanitize_payload({ - "type": "mesh_status", - "data": {"connected": connected, "url": base_url, **data} - }))) + elif event_type == "contact_path_updated": + entity = normalize_pymc_repeater_advert(data, base_url) + if entity: + await publish_entity(entity, merge=True, record_observation=False) - # If we know the local node ID, publish an entity update so the battery - # level is persisted and counted in the data quality metric. - local_id = self._local_node_ids.get(base_url) - if local_id and battery_level is not None: - await publish_entity({ - "entity_id": local_id, - "entity_type": "mesh_node", - "source": "meshcore", - "identity": { - "battery_level": battery_level - } - }, record_observation=False) - - -async def _fetch_neighbors(src: dict) -> list[dict]: - """Fetch neighbor/link-state data from RemoteTerm if the endpoint exists.""" - url = src["base_url"] + "/api/neighbors" - auth = src.get("auth") - httpx_auth = httpx.BasicAuth(*auth) if auth else None - try: - async with httpx.AsyncClient(auth=httpx_auth, timeout=10) as client: - resp = await client.get(url) - if resp.status_code == 404: - return [] - resp.raise_for_status() - payload = resp.json() - if isinstance(payload, list): - return payload - return payload.get("items", payload.get("neighbors", [])) - except Exception as exc: - logger.debug("[meshcore] neighbor fetch error: %s", exc) - return [] +# ─── helpers ────────────────────────────────────────────────────────────────── -async def _fetch_channels(src: dict) -> dict[str, str]: - """Fetch channel metadata from RemoteTerm and return key->display-name map.""" - url = src["base_url"] + "/api/channels" - auth = src.get("auth") - httpx_auth = httpx.BasicAuth(*auth) if auth else None - try: - async with httpx.AsyncClient(auth=httpx_auth, timeout=10) as client: - resp = await client.get(url) - if resp.status_code == 404: - return {} - resp.raise_for_status() - payload = resp.json() - except Exception as exc: - logger.debug("[meshcore] channel fetch error: %s", exc) +def _parse_source(url: str) -> dict: + """Extract API key from URL username; return clean base_url and api_key.""" + parsed = urlparse(url) + api_key = None + if parsed.username: + api_key = parsed.username + netloc = parsed.hostname + (f":{parsed.port}" if parsed.port else "") + url = urlunparse(parsed._replace(netloc=netloc)) + return {"base_url": url.rstrip("/"), "api_key": api_key} + + +def _api_headers(api_key: str | None) -> dict[str, str]: + if not api_key: return {} + return {"X-API-Key": api_key} + - items = payload +def _iter_items(payload) -> list: + if isinstance(payload, list): + return payload if isinstance(payload, dict): - items = payload.get("items", payload.get("channels", payload)) - - channel_map: dict[str, str] = {} - if isinstance(items, list): - for item in items: - if not isinstance(item, dict): - continue - key = _coalesce(item.get("key"), item.get("id"), item.get("channel"), item.get("name")) - name = _coalesce(item.get("name"), item.get("label"), item.get("display_name"), item.get("title"), key) - if key is None or name is None: - continue - channel_map[str(key)[:128]] = str(name)[:128] - elif isinstance(items, dict): - for key, value in items.items(): - if isinstance(value, dict): - name = _coalesce(value.get("name"), value.get("label"), value.get("display_name"), key) - else: - name = value - if name is None: - continue - channel_map[str(key)[:128]] = str(name)[:128] - - return channel_map - - -async def _upsert_mesh_links(source_url: str, neighbors: list[dict]) -> None: - from db import get_pool - import datetime - now = datetime.datetime.now(datetime.timezone.utc) - rows = [] - for n in neighbors: - node_a = str(n.get("node_id") or n.get("id") or "") - node_b = str(n.get("neighbor_id") or n.get("peer_id") or "") - if not node_a or not node_b: + for key in ("items", "adverts", "contacts", "data"): + v = payload.get(key) + if isinstance(v, list): + return v + return [] + + +async def _discover_companion(base_url: str, headers: dict) -> str | None: + """Return the name of the first companion from /api/companion/index, or None.""" + try: + async with httpx.AsyncClient(headers=headers, timeout=10) as client: + resp = await client.get(f"{base_url}/api/companion/index") + if resp.status_code != 200: + return None + for item in _iter_items(resp.json()): + if isinstance(item, dict): + name = item.get("name") or item.get("identity_name") + if name: + return str(name) + except Exception as exc: + logger.debug("[meshcore] companion discovery error: %s", exc) + return None + + +def _extract_links_from_packets(payload, source_url: str) -> list[dict]: + """Deduplicate per-sender SNR/RSSI from the recent-packet list.""" + packets = _iter_items(payload) + now = time.time() + links: list[dict] = [] + seen: set[str] = set() + + for pkt in packets: + if not isinstance(pkt, dict): continue - - # Ensure IDs match the 'mesh_node:{id}' format used in the entity store - if not node_a.startswith("mesh_node:"): - node_a = f"mesh_node:{node_a}" - if not node_b.startswith("mesh_node:"): - node_b = f"mesh_node:{node_b}" - - rows.append((source_url, node_a, node_b, n.get("snr"), n.get("link_quality"), now)) - if not rows: - return - pool = get_pool() - await pool.executemany( + pkt_data = pkt.get("data") or pkt + if not isinstance(pkt_data, dict): + continue + + snr = pkt_data.get("snr") or pkt_data.get("rx_snr") + rssi = pkt_data.get("rssi") or pkt_data.get("rx_rssi") + sender = ( + pkt_data.get("sender_pubkey") + or pkt_data.get("from_pubkey") + or pkt_data.get("sender") + or pkt_data.get("from") + ) + ts = pkt_data.get("timestamp") or pkt_data.get("rx_time") or now + + if not sender or snr is None: + continue + + node_b = ( + f"mesh_node:{sender}" + if not str(sender).startswith("mesh_node:") + else str(sender) + ) + if node_b in seen: + continue + seen.add(node_b) + + age_secs = max(0.0, now - float(ts)) if isinstance(ts, (int, float)) else 0.0 + links.append({ + "source_url": source_url, + "node_a": "local", + "node_b": node_b, + "snr": float(snr), + "rssi": float(rssi) if rssi is not None else None, + "secs_ago": age_secs, + }) + + return links + + +async def _upsert_mesh_links(links: list[dict]) -> None: + from db import get_pool + now_utc = datetime.datetime.now(datetime.timezone.utc) + rows = [ + ( + lnk["source_url"], + lnk["node_a"], + lnk["node_b"], + lnk.get("snr"), + None, + now_utc - datetime.timedelta(seconds=int(lnk.get("secs_ago", 0))), + ) + for lnk in links + ] + await get_pool().executemany( """ INSERT INTO mesh_links (source_url, node_a, node_b, snr, link_quality, last_seen) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (source_url, node_a, node_b) - DO UPDATE SET snr=EXCLUDED.snr, link_quality=EXCLUDED.link_quality, last_seen=EXCLUDED.last_seen + ON CONFLICT (source_url, node_a, node_b) DO UPDATE SET + snr = EXCLUDED.snr, + link_quality = COALESCE(EXCLUDED.link_quality, mesh_links.link_quality), + last_seen = GREATEST(mesh_links.last_seen, EXCLUDED.last_seen) """, rows, ) - - # Check for SNR threshold crossings after each upsert - from mesh_link_alerts import check_link_degradation - for row in rows: - if row[3] is not None: # snr - try: - await check_link_degradation(row[0], row[1], row[2], float(row[3])) - except Exception as exc: - logger.debug("[meshcore] link alert check failed: %s", exc) - - # Also publish to the bus for real-time frontend updates r = await get_bus() - await r.publish("civic:updates", json.dumps(sanitize_payload({ + await r.publish("civic:updates", json.dumps({ "type": "mesh_links", "data": [ - {"source_url": row[0], "node_a": row[1], "node_b": row[2], "snr": row[3], "link_quality": row[4]} - for row in rows - ] - }))) - + { + "source_url": lnk["source_url"], + "node_a": lnk["node_a"], + "node_b": lnk["node_b"], + "snr": lnk.get("snr"), + "link_quality": None, + } + for lnk in links + ], + })) -def _coalesce(*values): - for value in values: - if value is None: - continue - if isinstance(value, str): - trimmed = value.strip() - if trimmed: - return trimmed - continue - return value - return None +async def _publish_health(stats: dict, base_url: str) -> None: + if not isinstance(stats, dict): + return + connected = stats.get("radio_connected", stats.get("connected", True)) + uptime = stats.get("uptime_seconds") or stats.get("uptime_secs") + payload = { + "connected": connected, + "url": base_url, + "uptime_secs": uptime, + "version": stats.get("version"), + "site_name": stats.get("site_name"), + } + await set_feed("mesh:status", payload) + r = await get_bus() + await r.publish("civic:updates", json.dumps(sanitize_payload({ + "type": "mesh_status", + "data": payload, + }))) -def _to_bool(value, default: bool = False) -> bool: - if isinstance(value, bool): - return value - if isinstance(value, (int, float)): - return value != 0 - if isinstance(value, str): - lowered = value.strip().lower() - if lowered in {"1", "true", "yes", "y", "on", "acked", "delivered"}: - return True - if lowered in {"0", "false", "no", "n", "off", "pending"}: - return False - return default - - -def _normalize_mesh_message(data: dict, source_url: str, channel_map: dict[str, str] | None = None) -> dict: - payload = data.get("payload") if isinstance(data.get("payload"), dict) else {} - sender = data.get("sender") if isinstance(data.get("sender"), dict) else {} - conv = data.get("conversation") if isinstance(data.get("conversation"), dict) else {} - - text = _coalesce( - data.get("text"), - data.get("message"), - data.get("body"), - data.get("content"), - payload.get("text"), - payload.get("message"), - payload.get("body"), - ) - if text is None: - text = "" - text = str(text) - - conversation_name = _coalesce( - data.get("conversation_name"), - data.get("conversationName"), - data.get("channel_name"), - data.get("channelName"), - conv.get("name"), - conv.get("label"), - payload.get("channel_name"), - payload.get("channelName"), - ) - conversation_key = _coalesce( - data.get("conversation_key"), - data.get("conversationKey"), - conv.get("key"), - conv.get("id"), - data.get("channel_id"), - data.get("channelId"), - data.get("channel"), - payload.get("channel_id"), - payload.get("channel"), - ) - if conversation_key is None: - conversation_key = "public" - conversation_key = str(conversation_key)[:128] - - if conversation_name is None and channel_map: - conversation_name = channel_map.get(conversation_key) - if conversation_name is not None: - conversation_name = str(conversation_name)[:128] - - msg_type = _coalesce( - data.get("type"), - data.get("msg_type"), - data.get("message_type"), - conv.get("type"), - ) - if msg_type is None: - msg_type = "public" - msg_type = str(msg_type)[:32] - - sender_name = _coalesce( - data.get("sender_name"), - data.get("senderName"), - data.get("from_name"), - sender.get("name"), - sender.get("callsign"), - data.get("name"), - ) - if sender_name is None: - sender_name = "Unknown" - sender_name = str(sender_name)[:128] - - sender_key = _coalesce( - data.get("sender_key"), - data.get("senderKey"), - data.get("from"), - sender.get("key"), - sender.get("id"), - ) - if sender_key is None: - sender_key = "unknown" - sender_key = str(sender_key)[:128] - - timestamp = _coalesce( - data.get("sender_timestamp"), - data.get("senderTimestamp"), - data.get("timestamp"), - data.get("ts"), - data.get("sent_at"), - data.get("created_at"), +def _normalize_repeater_message(data: dict, source_url: str, event_type: str) -> dict: + text = data.get("message_text") or data.get("text") or data.get("body") or "" + sender_pubkey = ( + data.get("author_pubkey") + or data.get("public_key") + or data.get("from") + or "unknown" ) - timestamp = str(timestamp) if timestamp is not None else "" - - message_id = _coalesce( - data.get("id"), - data.get("message_id"), - data.get("msg_id"), - data.get("uuid"), + sender_prefix = data.get("author_prefix") or str(sender_pubkey)[:8] + ts_raw = ( + data.get("post_timestamp") + or data.get("sender_timestamp") + or data.get("timestamp") ) - if message_id is None: - fingerprint = f"{source_url}|{conversation_key}|{sender_key}|{timestamp}|{text}" - message_id = hashlib.sha1(fingerprint.encode("utf-8", errors="ignore")).hexdigest() - message_id = str(message_id)[:64] - - outgoing = _to_bool(_coalesce(data.get("outgoing"), data.get("is_outgoing"), data.get("sent_by_me")), False) - acked = _to_bool( - _coalesce(data.get("acked"), data.get("is_acked"), data.get("delivered"), data.get("status")), - False, + ts = str(ts_raw) if ts_raw is not None else "" + msg_type = "channel" if event_type == "channel_message_received" else "direct" + companion = ( + data.get("companion") + or data.get("room") + or data.get("identity_name") + or "public" ) - + fingerprint = f"{source_url}|{companion}|{sender_pubkey}|{ts}|{text}" + message_id = hashlib.sha1(fingerprint.encode("utf-8", errors="ignore")).hexdigest() return { - "id": message_id, - "msg_type": msg_type, - "conversation_key": conversation_key, - "channel_name": conversation_name, - "text": text, - "sender_name": sender_name, - "sender_key": sender_key, - "outgoing": outgoing, - "acked": acked, - "timestamp": timestamp, - "source_url": source_url, + "id": message_id, + "msg_type": msg_type, + "conversation_key": companion, + "channel_name": companion, + "text": str(text), + "sender_name": sender_prefix, + "sender_key": str(sender_pubkey), + "outgoing": False, + "acked": False, + "timestamp": ts, + "source_url": source_url, } -async def _save_mesh_message(message: dict): +async def _save_mesh_message(message: dict) -> None: from db import get_pool - import datetime - # Try to parse timestamp, fallback to now ts_raw = message.get("timestamp") try: - ts = datetime.datetime.fromisoformat(ts_raw.replace("Z", "+00:00")) if ts_raw else datetime.datetime.now(datetime.timezone.utc) - except: - ts = datetime.datetime.now(datetime.timezone.utc) - - pool = get_pool() - try: - await pool.execute( - """ - INSERT INTO mesh_messages (id, msg_type, conversation_key, channel_name, text, sender_name, sender_key, outgoing, acked, ts, source_url) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - ON CONFLICT (id) DO UPDATE SET - msg_type = EXCLUDED.msg_type, - conversation_key = EXCLUDED.conversation_key, - channel_name = EXCLUDED.channel_name, - text = EXCLUDED.text, - sender_name = EXCLUDED.sender_name, - sender_key = EXCLUDED.sender_key, - outgoing = EXCLUDED.outgoing, - acked = EXCLUDED.acked, - ts = EXCLUDED.ts, - source_url = EXCLUDED.source_url - """, - message.get("id"), - message.get("msg_type"), - message.get("conversation_key"), - message.get("channel_name"), - message.get("text"), - message.get("sender_name"), - message.get("sender_key"), - message.get("outgoing", False), - message.get("acked", False), - ts, - message.get("source_url") + val = float(ts_raw) if ts_raw else None + ts = ( + datetime.datetime.fromtimestamp(val, tz=datetime.timezone.utc) + if val + else datetime.datetime.now(datetime.timezone.utc) ) - return - except Exception as exc: - # Backward-compat: older DB volumes may not have channel_name yet. - msg = str(exc).lower() - if "channel_name" not in msg or ("does not exist" not in msg and "undefined" not in msg): - raise + except Exception: + ts = datetime.datetime.now(datetime.timezone.utc) - await pool.execute( + await get_pool().execute( """ - INSERT INTO mesh_messages (id, msg_type, conversation_key, text, sender_name, sender_key, outgoing, acked, ts, source_url) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + INSERT INTO mesh_messages + (id, msg_type, conversation_key, channel_name, text, + sender_name, sender_key, outgoing, acked, ts, source_url) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (id) DO UPDATE SET - msg_type = EXCLUDED.msg_type, + msg_type = EXCLUDED.msg_type, conversation_key = EXCLUDED.conversation_key, - text = EXCLUDED.text, - sender_name = EXCLUDED.sender_name, - sender_key = EXCLUDED.sender_key, - outgoing = EXCLUDED.outgoing, - acked = EXCLUDED.acked, - ts = EXCLUDED.ts, - source_url = EXCLUDED.source_url + channel_name = EXCLUDED.channel_name, + text = EXCLUDED.text, + sender_name = EXCLUDED.sender_name, + sender_key = EXCLUDED.sender_key, + outgoing = EXCLUDED.outgoing, + acked = EXCLUDED.acked, + ts = EXCLUDED.ts, + source_url = EXCLUDED.source_url """, message.get("id"), message.get("msg_type"), message.get("conversation_key"), + message.get("channel_name"), message.get("text"), message.get("sender_name"), message.get("sender_key"), message.get("outgoing", False), message.get("acked", False), ts, - message.get("source_url") + message.get("source_url"), ) - - -def _parse_source(url: str) -> dict: - """Extract embedded credentials from URL; return clean base_url and auth tuple.""" - parsed = urlparse(url) - auth = None - if parsed.username: - auth = (parsed.username, parsed.password or "") - netloc = parsed.hostname + (f":{parsed.port}" if parsed.port else "") - url = urlunparse(parsed._replace(netloc=netloc)) - return {"base_url": url.rstrip("/"), "auth": auth} - - -def _to_ws_url(http_url: str) -> str: - if http_url.startswith("https://"): - return "wss://" + http_url[8:] - return "ws://" + http_url.removeprefix("http://") - - -def _auth_headers(auth: tuple | None) -> dict: - if not auth: - return {} - token = base64.b64encode(f"{auth[0]}:{auth[1]}".encode()).decode() - return {"Authorization": f"Basic {token}"} diff --git a/poller/pollers/meshcore_pymc.py b/poller/pollers/meshcore_pymc.py deleted file mode 100644 index 74b38fa..0000000 --- a/poller/pollers/meshcore_pymc.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -MeshCore pyMC poller — direct device access via the `meshcore` Python library. - -Connects to a MeshCore device over TCP or serial and retrieves the full -contact list plus the per-contact neighbour tables. Unlike the RemoteTerm -poller, this path captures SNR as *measured by each remote node*, giving -bi-directional link metrics for every hop in the mesh. - -Configure poller_sources with type=meshcore_pymc and a connection URL: - tcp://192.168.1.x:5000 TCP (e.g. via socat serial-to-TCP bridge) - serial:///dev/ttyUSB0 serial at 115 200 baud (default) - serial:///dev/ttyACM0?baudrate=9600 - -Requires: pip install meshcore -""" -import asyncio -import datetime -import json -import logging -import time -from urllib.parse import urlparse, parse_qs - -from bus import get_bus, publish_entity -from .base import BasePoller - -try: - from meshcore import MeshCore - _MESHCORE_AVAILABLE = True -except ImportError: - MeshCore = None # type: ignore[assignment,misc] - _MESHCORE_AVAILABLE = False - -logger = logging.getLogger(__name__) - -_CONTACT_POLL_INTERVAL = 120 # seconds between full refresh cycles -_NEIGHBOUR_TIMEOUT = 10 # seconds to await each node's neighbour response -_RETRY_DELAY = 30 - -_CONTACT_TYPES = {0: "unknown", 1: "client", 2: "repeater", 3: "room", 4: "sensor"} - - -def _parse_url(url: str) -> tuple[str, dict]: - """Return (kind, params) for 'tcp' or 'serial' connection URLs.""" - parsed = urlparse(url) - scheme = parsed.scheme.lower() - if scheme == "tcp": - return "tcp", { - "host": parsed.hostname or "localhost", - "port": parsed.port or 5000, - } - if scheme == "serial": - qs = parse_qs(parsed.query) - return "serial", { - "port": parsed.path, - "baudrate": int(qs.get("baudrate", [115200])[0]), - } - raise ValueError( - f"Unsupported meshcore_pymc URL scheme {scheme!r}. Use tcp:// or serial://" - ) - - -def _find_full_pubkey(contacts: dict, prefix: str) -> str | None: - """Match a neighbour's pubkey prefix against the full contact table.""" - for pk in contacts: - if pk.startswith(prefix): - return pk - return None - - -def _contact_to_entity(contact: dict, source_url: str) -> dict | None: - pub_key = contact.get("public_key", "") - if not pub_key: - return None - - name = (contact.get("adv_name") or "").strip() or pub_key[:12] - contact_type = _CONTACT_TYPES.get(contact.get("type", 0), "unknown") - - lat = contact.get("adv_lat") - lon = contact.get("adv_lon") - if lat == 0.0 and lon == 0.0: - lat = lon = None - - return { - "entity_id": f"mesh_node:{pub_key}", - "entity_type": "mesh_node", - "source": "meshcore_pymc", - "display_name": name, - "lat": lat, - "lon": lon, - "altitude": None, - "status": "", - "identity": { - "public_key": pub_key, - "node_id": pub_key[:12], - "short_name": name[:12], - "contact_type": contact_type, - "last_advert": contact.get("last_advert"), - "source_url": source_url, - }, - "tags": ["mesh_node", contact_type], - } - - -async def _upsert_pymc_links(links: list[dict]) -> None: - from db import get_pool - now_utc = datetime.datetime.now(datetime.timezone.utc) - rows = [ - ( - lnk["source_url"], - lnk["node_a"], - lnk["node_b"], - lnk.get("snr"), - None, # link_quality — not available via pyMC neighbour table - now_utc - datetime.timedelta(seconds=int(lnk.get("secs_ago", 0))), - ) - for lnk in links - ] - await get_pool().executemany( - """ - INSERT INTO mesh_links (source_url, node_a, node_b, snr, link_quality, last_seen) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (source_url, node_a, node_b) DO UPDATE SET - snr = EXCLUDED.snr, - link_quality = COALESCE(EXCLUDED.link_quality, mesh_links.link_quality), - last_seen = GREATEST(mesh_links.last_seen, EXCLUDED.last_seen) - """, - rows, - ) - r = await get_bus() - await r.publish("civic:updates", json.dumps({ - "type": "mesh_links", - "data": [ - { - "source_url": lnk["source_url"], - "node_a": lnk["node_a"], - "node_b": lnk["node_b"], - "snr": lnk.get("snr"), - "link_quality": None, - } - for lnk in links - ], - })) - - -class MeshCorePymcPoller(BasePoller): - """Direct pyMC poller: contacts + remote-node neighbour SNR tables.""" - - name = "meshcore_pymc" - interval = _CONTACT_POLL_INTERVAL - - def __init__(self): - self._sources: list[str] = [] - - async def poll(self): - pass # Overrides run() - - async def setup(self): - if not _MESHCORE_AVAILABLE: - logger.error( - "[meshcore_pymc] 'meshcore' package not installed — " - "run: pip install meshcore" - ) - return - from db import get_pool - rows = await get_pool().fetch( - "SELECT url FROM poller_sources " - "WHERE type = 'meshcore_pymc' AND enabled = TRUE" - ) - self._sources = [row["url"] for row in rows] - if self._sources: - logger.info("[meshcore_pymc] %d source(s) configured", len(self._sources)) - else: - logger.info("[meshcore_pymc] no sources configured — poller inactive") - - async def run(self): - await self.setup() - if not self._sources or not _MESHCORE_AVAILABLE: - return - await asyncio.gather(*[ - asyncio.create_task(self._run_source(url)) - for url in self._sources - ]) - - async def _run_source(self, url: str): - while True: - try: - await self._poll_once(url) - await self._heartbeat("ok") - except Exception as exc: - logger.error("[meshcore_pymc] %s: %s", url, exc) - await self._heartbeat("error", str(exc)[:256]) - await asyncio.sleep(_CONTACT_POLL_INTERVAL) - - async def _poll_once(self, url: str): - kind, params = _parse_url(url) - mc = None - try: - if kind == "tcp": - mc = await MeshCore.create_tcp( - params["host"], params["port"], - only_error=True, default_timeout=15, auto_reconnect=False, - ) - else: - mc = await MeshCore.create_serial( - params["port"], params["baudrate"], - only_error=True, default_timeout=15, auto_reconnect=False, - ) - - if mc is None: - logger.warning("[meshcore_pymc] could not connect: %s", url) - return - - await mc.ensure_contacts() - contacts: dict = mc.contacts - logger.debug("[meshcore_pymc] %s: %d contacts", url, len(contacts)) - - # Publish canonical entities for every known contact - for contact in contacts.values(): - entity = _contact_to_entity(contact, url) - if entity: - await publish_entity(entity, ttl=_CONTACT_POLL_INTERVAL * 3) - - # Fetch neighbour tables — one binary request per contact node. - # Each response reports the SNR that *that node* measured for each - # of its neighbours, giving us remote-node signal metrics. - for contact in contacts.values(): - await self._fetch_neighbours(mc, contact, contacts, url) - - finally: - if mc is not None: - try: - await mc.disconnect() - except Exception: - pass - - async def _fetch_neighbours( - self, - mc: "MeshCore", - contact: dict, - all_contacts: dict, - source_url: str, - ) -> None: - label = contact.get("adv_name") or contact["public_key"][:12] - try: - result = await mc.commands.fetch_all_neighbours( - contact, timeout=_NEIGHBOUR_TIMEOUT - ) - except Exception as exc: - logger.debug("[meshcore_pymc] neighbour fetch skipped (%s): %s", label, exc) - return - - if not result or not result.get("neighbours"): - return - - node_a_id = f"mesh_node:{contact['public_key']}" - links = [] - for nb in result["neighbours"]: - peer_pk = _find_full_pubkey(all_contacts, nb["pubkey"]) or nb["pubkey"] - links.append({ - "source_url": source_url, - "node_a": node_a_id, - "node_b": f"mesh_node:{peer_pk}", - "snr": nb.get("snr"), - "secs_ago": nb.get("secs_ago", 0), - }) - - if links: - await _upsert_pymc_links(links) - logger.debug( - "[meshcore_pymc] %s: %d links from %s", - source_url, len(links), label, - )