Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 51 additions & 27 deletions backend/routers/admin_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,38 @@ async def _resolve_source(db: AsyncSession, source_type: str, source_url: Option
return dict(source)


async def _http_get_check(url: str, auth: Optional[httpx.BasicAuth] = None, timeout: float = 10.0) -> tuple[dict, object | None]:
def _summarize_list(payload) -> str:
items = payload if isinstance(payload, list) else (
payload.get("data") or payload.get("items") or []
if isinstance(payload, dict) else []
)
return f"{len(items)} item(s)"


def _summarize_stats(payload) -> str:
if not isinstance(payload, dict):
return ""
connected = payload.get("radio_connected", payload.get("connected"))
site = payload.get("site_name", "")
version = payload.get("version", "")
parts = [p for p in [
f"connected={connected}" if connected is not None else None,
f"site={site}" if site else None,
f"v{version}" if version else None,
] if p]
return " ".join(parts)


async def _http_get_check(
url: str,
auth: Optional[httpx.BasicAuth] = None,
extra_headers: Optional[dict[str, str]] = None,
timeout: float = 10.0,
) -> tuple[dict, object | None]:
t0 = time.perf_counter()
try:
validate_safe_url(url, allowed_schemes={"http", "https"})
async with httpx.AsyncClient(auth=auth, timeout=timeout) as client:
async with httpx.AsyncClient(auth=auth, headers=extra_headers or {}, timeout=timeout) as client:
resp = await client.get(url)
latency_ms = round((time.perf_counter() - t0) * 1000, 1)
payload = None
Expand Down Expand Up @@ -315,25 +342,23 @@ async def probe_remote_feed(body: RemoteFeedProbeRequest, db: AsyncSession = Dep
recommendations: list[str] = []

if body.source_type == "meshcore":
for name, path in (("health", "/api/health"), ("contacts", "/api/contacts"), ("neighbors", "/api/neighbors")):
check, payload = await _http_get_check(f"{base_url}{path}", auth=httpx_auth)
if isinstance(payload, list):
check["summary"] = f"{len(payload)} item(s)"
elif isinstance(payload, dict):
if path == "/api/health":
connected = payload.get("radio_connected", payload.get("connected", payload.get("radio_ok")))
check["summary"] = f"radio_connected={connected}"
elif "items" in payload and isinstance(payload.get("items"), list):
check["summary"] = f"{len(payload.get('items', []))} item(s)"
elif "contacts" in payload and isinstance(payload.get("contacts"), list):
check["summary"] = f"{len(payload.get('contacts', []))} contact(s)"
elif "neighbors" in payload and isinstance(payload.get("neighbors"), list):
check["summary"] = f"{len(payload.get('neighbors', []))} neighbor(s)"

# pyMC-Repeater uses X-API-Key auth (URL username = API key)
pymc_headers = {}
if auth:
pymc_headers["X-API-Key"] = auth[0]

for name, path, summarize in (
("stats", "/api/stats", _summarize_stats),
("adverts", "/api/adverts_by_contact_type", _summarize_list),
("packets", "/api/recent_packets", _summarize_list),
):
check, payload = await _http_get_check(
f"{base_url}{path}", extra_headers=pymc_headers
)
if payload is not None:
check["summary"] = summarize(payload)
checks.append({"name": name, "protocol": "http", **check})

ws = await _probe_ws(_to_ws_url(base_url) + "/api/ws", body.duration_seconds, headers=headers)

stats_row = await db.execute(
text(
"SELECT COUNT(*) AS total_count, "
Expand All @@ -350,15 +375,14 @@ async def probe_remote_feed(body: RemoteFeedProbeRequest, db: AsyncSession = Dep
"latest_timestamp": stats.get("latest_ts").isoformat() if stats.get("latest_ts") else None,
}

neighbors = next((c for c in checks if c.get("name") == "neighbors"), None)
if not ws.get("connected"):
recommendations.append("WebSocket connection failed; verify RemoteTerm reachability and auth.")
if ws.get("connected") and ws.get("event_counts", {}).get("message", 0) == 0:
recommendations.append("No message events observed; RemoteTerm may be emitting only telemetry (health/contact/raw_packet).")
if neighbors and neighbors.get("status_code") == 404:
recommendations.append("/api/neighbors returned 404; topology enrichment via neighbor endpoint is unavailable.")
stats_check = next((c for c in checks if c.get("name") == "stats"), None)
adverts_check = next((c for c in checks if c.get("name") == "adverts"), None)
if stats_check and not stats_check.get("ok"):
recommendations.append("Could not reach /api/stats — verify the repeater URL and that pyMC-Repeater is running.")
if adverts_check and adverts_check.get("status_code") == 401:
recommendations.append("/api/adverts_by_contact_type returned 401; embed the API key in the source URL as http://API_KEY@host:port.")
if storage["total_messages"] == 0:
recommendations.append("No persisted mesh messages found for this source yet.")
recommendations.append("No persisted mesh messages found; messages arrive via SSE only when a companion identity is configured on the repeater.")

elif body.source_type == "adsb":
check, payload = await _http_get_check(base_url, auth=httpx_auth)
Expand Down
7 changes: 4 additions & 3 deletions config/sources.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,11 @@ poller_sources:
enabled: true
source: config

- type: meshcore # MeshCore — Remote-Terminal-for-MeshCore base URL
name: "RemoteTerm" # https://github.com/jkingsman/Remote-Terminal-for-MeshCore
- type: meshcore # MeshCore — pyMC-Repeater base URL
name: "pyMC-Repeater" # https://github.com/pyMC-dev/pyMC_Repeater
url: "http://192.168.1.x:8000"
# Embed credentials if Basic Auth is enabled: http://user:pass@192.168.1.x:8000
# Embed an API key as the URL username if auth is enabled:
# http://MY_API_KEY@192.168.1.x:8000
enabled: true
source: config

Expand Down
2 changes: 0 additions & 2 deletions poller/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from pollers.utilities import UtilityPoller
from pollers.p25 import P25Poller
from pollers.meshcore import MeshCorePoller
from pollers.meshcore_pymc import MeshCorePymcPoller
from pollers.summary import AISummaryPoller
from pollers.seismic import SeismicPoller
from pollers.fire import FirePoller
Expand Down Expand Up @@ -76,7 +75,6 @@ async def main():
UtilityPoller(),
P25Poller(),
MeshCorePoller(),
MeshCorePymcPoller(),
AISummaryPoller(),
SeismicPoller(),
FirePoller(),
Expand Down
106 changes: 51 additions & 55 deletions poller/normalizers/mesh_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,57 @@ def snr_to_quality(snr) -> float | None:
}


def normalize_pymc_repeater_advert(data: dict, source_url: str) -> Optional[dict]:
"""Normalize a pyMC-Repeater advert to a canonical mesh_node entity."""
pub_key = (data.get("public_key") or "").strip()
if not pub_key or pub_key == "0" * 64:
return None

name = (data.get("name") or "").strip() or pub_key[:12]

contact_type_raw = data.get("contact_type") or data.get("type") or 0
if isinstance(contact_type_raw, int):
contact_type = _CONTACT_TYPES.get(contact_type_raw, "unknown")
else:
contact_type = str(contact_type_raw)

lat = data.get("gps_lat") or data.get("lat")
lon = data.get("gps_lon") or data.get("lon")
if lat == 0.0 and lon == 0.0:
lat = lon = None

last_advert = data.get("last_advert_timestamp") or data.get("lastmod")
if isinstance(last_advert, (int, float)):
last_seen = datetime.fromtimestamp(last_advert, tz=timezone.utc).isoformat()
else:
last_seen = _now()

out_path_len = data.get("out_path_len")
status = f"hops:{out_path_len}" if out_path_len is not None else ""

return {
"entity_id": f"mesh_node:{pub_key}",
"entity_type": "mesh_node",
"source": "meshcore",
"display_name": name,
"identity": {
"public_key": pub_key,
"node_id": pub_key[:12],
"short_name": name[:12],
"contact_type": contact_type,
"source_url": source_url,
"out_path": data.get("out_path"),
"out_path_len": out_path_len,
},
"lat": lat,
"lon": lon,
"altitude": None,
"status": status,
"last_seen": last_seen,
"tags": ["mesh_node", contact_type],
}


def normalize_mesh_node(data: dict) -> Optional[dict]:
"""Normalize a MeshCore bridge node_update payload to canonical Entity."""
node_id = data.get("entity_id", "").replace("mesh_node:", "") or data.get("identity", {}).get("node_id")
Expand Down Expand Up @@ -49,61 +100,6 @@ def normalize_mesh_node(data: dict) -> Optional[dict]:
}


def normalize_remoteterm_contact(data: dict) -> Optional[dict]:
"""Normalize a RemoteTerm /api/contacts entry to canonical Entity."""
pub_key = data.get("public_key", "")
if not pub_key:
return None

node_type = _CONTACT_TYPES.get(data.get("type", 0), "unknown")
name = data.get("name") or pub_key[:12]

last_seen_raw = data.get("last_seen")
if isinstance(last_seen_raw, (int, float)):
last_seen = datetime.fromtimestamp(last_seen_raw, tz=timezone.utc).isoformat()
elif isinstance(last_seen_raw, str):
last_seen = last_seen_raw
else:
last_seen = _now()

tags = ["mesh_node", node_type]
if data.get("on_radio"):
tags.append("on_radio")
if data.get("favorite"):
tags.append("favorite")

return {
"entity_id": f"mesh_node:{pub_key}",
"entity_type": "mesh_node",
"source": "meshcore",
"display_name": name,
"identity": {
"public_key": pub_key,
"node_id": pub_key[:12],
"short_name": pub_key[:12],
"contact_type": node_type,
"hw_model": None,
"on_radio": data.get("on_radio", False),
"favorite": data.get("favorite", False),
"battery_level": data.get("battery_level"),
},
"lat": data.get("lat"),
"lon": data.get("lon"),
"altitude": None,
"status": _remoteterm_status(data),
"last_seen": last_seen,
"tags": tags,
}


def _remoteterm_status(contact: dict) -> str:
parts = []
effective = contact.get("effective_route")
if isinstance(effective, dict) and (hops := effective.get("hop_count")) is not None:
parts.append(f"hops:{hops}")
return " ".join(parts)


def _bridge_status(node: dict) -> str:
parts = []
if (v := node.get("battery_pct")) is not None:
Expand Down
Loading
Loading