From 22189eb60bf97339ce7b8a05d0e95bed1ab908aa Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:14:51 +0100 Subject: [PATCH 01/12] docs(meshcore-path): record milestone ticket numbers and M1 start --- docs/features/meshcore/README.md | 2 + .../meshcore/packet-path-tracing/README.md | 24 ++ ...-meshcore-packet-path-tracing-subsystem.md | 342 ++++++++++++++++++ .../packet-path-tracing-outstanding.md | 37 ++ .../packet-path-tracing-progress.md | 62 ++++ docs/features/traceroute/README.md | 2 +- .../traceroute/meshcore-path-outstanding.md | 1 + 7 files changed, 469 insertions(+), 1 deletion(-) create mode 100644 docs/features/meshcore/packet-path-tracing/README.md create mode 100644 docs/features/meshcore/packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md create mode 100644 docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md create mode 100644 docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md diff --git a/docs/features/meshcore/README.md b/docs/features/meshcore/README.md index 9347e2b..a845bb0 100644 --- a/docs/features/meshcore/README.md +++ b/docs/features/meshcore/README.md @@ -11,6 +11,7 @@ Cross-repo MeshCore work is tracked on GitHub epics [#264](https://github.com/ps | **2** — parity, rename track, position/text | [phase-2-progress.md](./phase-2-progress.md) | [phase-2-outstanding.md](./phase-2-outstanding.md) | | **ManagedNode identity** ([#362](https://github.com/pskillen/meshflow-api/issues/362)) | [managed-node-identity-progress.md](./managed-node-identity-progress.md) | [managed-node-identity-outstanding.md](./managed-node-identity-outstanding.md) | | **Feeder enrollment** ([#293](https://github.com/pskillen/meshflow-ui/issues/293)) | [enrollment-progress.md](./enrollment-progress.md) | [enrollment-outstanding.md](./enrollment-outstanding.md) | +| **Passive packet path** ([#267](https://github.com/pskillen/meshflow-api/issues/267)) | 
[packet-path-tracing/packet-path-tracing-progress.md](./packet-path-tracing/packet-path-tracing-progress.md) | [packet-path-tracing/packet-path-tracing-outstanding.md](./packet-path-tracing/packet-path-tracing-outstanding.md) | **Agent convention:** [progress-tracking skill](../../../.cursor/skills/progress-tracking/SKILL.md) — update progress/outstanding at plan breakpoints and before PRs. @@ -18,6 +19,7 @@ Cross-repo MeshCore work is tracked on GitHub epics [#264](https://github.com/ps - [feeder-bootstrap.md](./feeder-bootstrap.md) — first MC feeder + API key + `mc_pubkey` - [text-message-channels.md](./text-message-channels.md) — Phase 2.2 text + channels (device → API sync, apply-to-radio, ops troubleshooting) +- [packet-path-tracing/](./packet-path-tracing/) — proposed passive MC packet path subsystem (capture, resolution, Neo4j, realtime/history UI) - [../node-lifecycle/node-claims-meshcore.md](../node-lifecycle/node-claims-meshcore.md) — ownership claims via contact/DM proof **Recent follow-up (post–#295 / staging):** see [phase-2-progress.md](./phase-2-progress.md) § “Feeder identity & apply fixes” and [phase-2-outstanding.md](./phase-2-outstanding.md) § “Phase 2.2 — staging & ops”. diff --git a/docs/features/meshcore/packet-path-tracing/README.md b/docs/features/meshcore/packet-path-tracing/README.md new file mode 100644 index 0000000..17c9d72 --- /dev/null +++ b/docs/features/meshcore/packet-path-tracing/README.md @@ -0,0 +1,24 @@ +# MeshCore passive packet path + +The MeshCore passive packet path subsystem turns ingested MeshCore packet route hints into realtime views, historical topology maps, Neo4j graph data, and router-importance stats. + +It is intentionally separate from the Meshtastic `traceroute` subsystem: + +- Meshtastic traceroute is an active probe lifecycle (`AutoTraceRoute`) with command dispatch, completion, timeout, and success/failure semantics. 
+- MeshCore passive packet path starts from passive packet observations (`MeshCorePacketObservation.path_hashes`) and aggregates observed path evidence. + +## ADRs + +- [ADR-0001 - MeshCore passive packet path subsystem](./adr/0001-meshcore-packet-path-tracing-subsystem.md) + +## Progress tracking + +- [Progress](./packet-path-tracing-progress.md) — what has shipped / is in flight +- [Outstanding](./packet-path-tracing-outstanding.md) — open decisions and discovered debt + +## Related docs + +- [Traceroute feature](../../traceroute/README.md) +- [MeshCore path hash resolution ADR](../../traceroute/adr/0001-mc-path-hash-resolution.md) +- [MeshCore packet ingestion](../../packet-ingestion/meshcore.md) +- [MeshCore packet fields](../../packet-ingestion/MESHCORE_PACKET_FIELDS.md) diff --git a/docs/features/meshcore/packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md b/docs/features/meshcore/packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md new file mode 100644 index 0000000..2021f2a --- /dev/null +++ b/docs/features/meshcore/packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md @@ -0,0 +1,342 @@ +# ADR-0001 - MeshCore passive packet path subsystem + +**Status:** Proposed +**Date:** 2026-05-28 +**Tracking:** [meshflow-api#267](https://github.com/pskillen/meshflow-api/issues/267) + +> **Naming note.** This subsystem lives under `meshcore/packet-path-tracing/` for discoverability, but it is **passive packet path evidence**, not an active probe. Meshtastic "traceroute" is a commanded round trip; MeshCore here is reconstructed from packets the mesh forwarded anyway. Prose deliberately avoids "trace"/"probe" wording so the distinction from `traceroute` stays obvious. + +## Context + +Meshtastic path discovery is represented by the `traceroute` subsystem: + +- `AutoTraceRoute` records a commanded probe, its lifecycle, and its result. +- `TraceroutePacketService` completes the row from a Meshtastic traceroute response. 
+- `traceroute_analytics` denormalises completed traceroutes into Neo4j and powers heatmap, coverage, and router-style analytics. + +MeshCore packet path data is different. Forwarded MeshCore packets may include repeater path segments (`path_hashes`) on each feeder observation, but they are passive packet evidence rather than commanded traceroute responses. Channel messages often do not identify the sender, paths may differ per feeder for the same deduped packet, and path segments are short opaque hashes until a proven resolver exists. + +The current passive slice already captures the raw path data on `MeshCorePacketObservation.path_hashes` and exposes display-only `resolved_path` in the message heard API. [Traceroute ADR-0001](../../../traceroute/adr/0001-mc-path-hash-resolution.md) explicitly forbids heuristic hash-to-node matching until the derivation or binding is proven. + +This ADR proposes a distinct **MeshCore passive packet path** subsystem that mirrors useful Meshtastic traceroute outcomes - maps, graph exports, important-router stats - without pushing passive packet observations through `AutoTraceRoute`. + +## Decision + +Create a MeshCore-specific passive packet path subsystem, likely in a new Django app such as `meshcore_path` / `meshcore_packet_path` or a clearly bounded package under `meshcore_packets`, with these boundaries: + +1. **Source of truth remains raw ingest.** `MeshCorePacketObservation.path_hashes` is the **authoritative per-feeder capture** of wire path segments for uploaded packets. It is not strictly immutable: ingest uses `update_or_create` on `(packet, observer)` and may refresh RF/path fields when the same feeder re-reports an observation (see `meshcore_packets/serializers.py`). Downstream rollups must therefore treat an observation's path as the latest authoritative value, not an append-only event. +2. 
**Resolution is explicit and versioned.** Hash segment to `ObservedNode` resolution is stored separately from raw observations and is only populated by proven rules. No such rule exists today; see [Resolution plan](#resolution-plan) for which event sources *might* enable one once their relationship to path hashes is established. +3. **Passive evidence does not become `AutoTraceRoute`.** The subsystem must not create `AutoTraceRoute` rows for every packet path. It has no pending/sent/completed lifecycle, no timeout, and no WebSocket command dispatch. +4. **Analytics are aggregate-first.** Neo4j and longer-term history receive coalesced edge rollups, not one graph write per packet observation. +5. **Realtime display is best-effort, short-lived evidence.** The UI may receive WebSocket events for recent packet paths, emitted **after transaction commit** and subject to sampling/coalescing/caps. No downstream code may assume every observation produces a realtime event; the durable record is the aggregate path-edge model. + +## Non-goals + +- No MeshCore active traceroute command design in this ADR. Active MeshCore traceroute may later use a sibling model or an extended `AutoTraceRoute` path after a separate spike. +- No heuristic matching of path hashes to `ObservedNode` rows using suffix/prefix/last-heard guesses. +- No success/failure or coverage percentage semantics copied from Meshtastic traceroutes. Passive packet path evidence can show observed paths, not attempted route success. +- No requirement to preserve every raw path edge forever once raw observations and aggregate rollups are available. 
+ +## Data flow + +```mermaid +flowchart LR + Bot[meshflow-bot] + Ingest[MeshCore packet ingest] + Observation[MeshCorePacketObservation path_hashes] + Resolver[Path segment resolver] + Realtime[WS recent path event] + Rollup[Path edge rollup task] + Aggregate[MeshCorePathEdgeBucket] + Neo4j[Neo4j path graph] + HistoryAPI[History and topology API] + UI[MeshCore path UI] + + Bot --> Ingest --> Observation + Observation --> Resolver + Observation --> Realtime --> UI + Observation --> Rollup --> Aggregate + Resolver --> Rollup + Aggregate --> Neo4j + Aggregate --> HistoryAPI --> UI +``` + + + +## Proposed model concepts + +Names are illustrative; implementation may adjust them. + +### `MeshCorePathSegmentResolution` + +One row per resolvable path segment identity. + + +| Field | Purpose | +| ------------------ | ----------------------------------------------------------------------------------------------- | +| `segment_hash` | Normalized hex segment, e.g. `f3bcf1` | +| `hash_size` | Bytes per segment (`rx_log_data.path_hash_size`); derive from segment length only as a fallback | +| `hash_mode` | `path_hash_mode` from the carrying frame (see collision note below) | +| `observed_node` | Nullable FK to `ObservedNode(protocol=MESHCORE)` | +| `status` | `unknown`, `resolved`, `ambiguous`, `stale` | +| `source` | `path_update`, `trace_data`, `derived_hash`, `manual_admin`, etc. | +| `resolver_version` | Allows reprocessing when the resolver changes | +| `confidence` | Optional score or enum; keep exact/proven matches distinct from hints | +| `last_seen_at` | Last observation carrying this segment | + + +**Hash mode / size are part of segment identity.** The wire carries `path_hash_mode` on `channel_message` and `contact_message`, and `path_hash_size` on `rx_log_data` (see [MESHCORE_PACKET_FIELDS.md](../../../packet-ingestion/MESHCORE_PACKET_FIELDS.md)). 
If the mode affects how a segment is derived or interpreted, then `segment_hash + hash_size` alone is **not** a safe identity — two unrelated hops could collide across modes. Until the meaning of `path_hash_mode` is confirmed (spike), treat the resolution key as `(hash_mode, hash_size, segment_hash)` and do not merge segments observed under different modes. Capturing the mode now avoids a painful re-key later if it turns out to matter. + +Raw display can continue resolving at read time, but graph export and historical UI need proactive resolution so the same segment maps consistently across API, WebSocket, and rollup jobs. + +### `MeshCorePathEdgeBucket` + +Aggregate edge counts for history and Neo4j export. + + +| Field | Purpose | +| ----------------------------------- | --------------------------------------------------------- | +| `bucket_start`, `bucket_size` | Hour or day bucket | +| `from_kind`, `to_kind` | `feeder`, `hash`, `node`, `unknown` | +| `from_hash`, `to_hash` | Raw hash endpoint when unresolved | +| `from_node`, `to_node` | Nullable FKs when resolved | +| `observer` | Optional feeder dimension for per-feeder views | +| `constellation` | Optional feeder/channel constellation dimension | +| `packet_count`, `observation_count` | Rollup weight | +| `first_seen_at`, `last_seen_at` | Edge recency | +| `avg_snr`, `min_snr`, `max_snr` | Optional RF summaries where observation SNR is meaningful | + + +The first implementation can roll up hash-to-hash edges only. Node-to-node and feeder-to-node edges should appear as resolution improves. + +### Edge semantics (what an edge means) + +A feeder observation gives an **ordered list of path hash segments** plus the observing feeder. It does **not**, on its own, give a sender or a proven direction — channel messages frequently lack any sender identity, and the relationship between list order and physical forwarding direction is not yet confirmed. 
+ +Therefore the **first rollup creates only ordered hash-chain edges between consecutive segments**, plus an optional `observer` (feeder) dimension: + +- For path `[h1, h2, h3]`: edges `h1 -> h2` and `h2 -> h3`, with `from_kind = to_kind = hash`. +- The observing feeder is recorded as a dimension on the bucket (for per-feeder views and as a likely terminal endpoint), **not** asserted as a graph node on the chain unless that is later proven. + +Explicitly **out of scope for v1 edges** until proven: + +- `sender -> hop1` edges (sender identity usually unknown for channel text). +- `hopN -> observer` / `observer -> hopN` edges as directed claims. +- Any node-to-node edge before the segment is resolved to an `ObservedNode` via [`MeshCorePathSegmentResolution`](#meshcorepathsegmentresolution). + +Treat edge direction as "list order", not "forwarding direction", and label it as such in the API and UI until direction is established. Resolved node/feeder edges are an enrichment layer applied on top once resolution exists. + +### Optional short-retention raw edge table + +If debugging or near-realtime replay needs per-observation edge rows, add a short-retention table such as `MeshCorePathEdgeObservation`. It should be TTL/prunable and not the primary analytics source. + +## Capture plan + +### Current capture + +Mostly done: + +- `meshflow-bot` uploads `path_hashes` for supported packet types when the MeshCore event includes `path` and `path_hash_size`. +- `meshcore_packets` stores those segments on `MeshCorePacketObservation`, not on the deduped packet row, so each feeder can retain its own path. +- Message history exposes `heard[].path_hashes`, `heard[].resolved_path`, and `path_known=false` for display. + +### Additional capture + +Add tickets for: + +1. Upload non-text `rx_log_data` PATH frames where they carry path data but no business message. +2. Upload `path_update` events.
**Caveat:** captured `path_update` frames carry `public_key` only — no path hash is present in that event in the field docs ([MESHCORE_PACKET_FIELDS.md](../../../packet-ingestion/MESHCORE_PACKET_FIELDS.md)). So `path_update` does **not** today prove any hash → pubkey binding; capture it because it *may help once its relationship to path hashes is proven*, not as a known resolver source. +3. Upload `trace_data` events after a spike confirms their relationship to active MeshCore traces, path hashes, SNR, and endpoints. +4. Store `path_hash_size` (and `path_hash_mode`, see segment identity note) explicitly; only fall back to segment length when a frame omits the size, so different segment widths/modes never collide. + +## Resolution plan + +Resolution should happen in two layers: + +1. **Read-time formatting** remains for simple API display. `path_resolution.format_path_hops()` can continue returning `unknown` until a cache/table supplies a proven mapping. +2. **Proactive resolver** updates `MeshCorePathSegmentResolution` whenever: + - a new event that is *proven* to bind a segment to an identity is ingested (no such event is confirmed yet — `path_update` carries a pubkey but no path hash, so it is a candidate to investigate, not a current source), + - a new full `ObservedNode.mc_pubkey` is learned and a proven derivation rule exists, + - a resolver version changes, + - an operator triggers a backfill. + +No proven resolver exists at the time of writing; the v1 table is populated entirely with `status = unknown` and the proactive resolver is a later milestone gated on a spike. The resolver must be idempotent and test-backed. Tests should explicitly reject suffix/prefix/recency guesses unless the ADR proving that rule has been updated. + +## Neo4j plan + +Do not reuse the Meshtastic `MeshNode.node_id` integer key directly for MeshCore. Current traceroute graph export assumes `meshtastic_node_id` and completed `AutoTraceRoute` rows. 
+ +Use either: + +- a generalized key, e.g. `node_key = "mt:!12345678"`, `node_key = "mc:"`, `node_key = "mc_hash::"`; or +- a parallel label for MeshCore passive entities, e.g. `MeshCorePathNode`. + +Create passive relationships separate from active traceroutes: + + +| Relationship | Meaning | +| --------------- | -------------------------------------------------------------------- | +| `PATH_OBSERVED` | Passive MeshCore packet path edge, weighted by bucketed observations | +| `ROUTED_TO` | Existing active Meshtastic traceroute edge | + + +Recommended relationship properties: + +- `protocol = "meshcore"` +- `evidence = "packet_path"` +- `bucket_start`, `bucket_size` +- `weight` +- `observation_count`, `packet_count` +- `resolved = true/false` +- `source = "rollup"` + +Neo4j export should read `MeshCorePathEdgeBucket` rows. It should not run from the ingest request or per-observation signal. + +## API and WebSocket plan + +### Realtime API / WS + +Add a WebSocket event for short-term visualization: + +```json +{ + "type": "meshcore.packet_path.observed", + "observed_at": "2026-05-28T08:35:00Z", + "packet_id": "uuid", + "observer": {"internal_id": 123, "node_id_str": "mc:abcdef123456"}, + "path": [ + {"hash": "f3bcf1", "status": "unknown"}, + {"hash": "a1", "status": "resolved", "node_id_str": "mc:..."} + ], + "rx_snr": -9.75 +} +``` + +Events are emitted **after the ingest transaction commits** (e.g. via `transaction.on_commit`) so consumers never see a path that was rolled back. They are **best-effort**: under load they may be sampled, coalesced per feeder/channel, or dropped against a cap. No subscriber may treat the WS stream as a complete or ordered log of observations — the durable record is the aggregate path-edge model. + +Use a bounded in-memory or Redis-backed recent buffer plus `GET /api/meshcore/path-tracing/recent/` so the UI can reconnect without needing long polling or replaying all observations. 
+ +### History APIs + +Suggested endpoints: + + +| Endpoint | Purpose | +| -------------------------------------------- | ---------------------------------------- | +| `GET /api/meshcore/path-tracing/recent/` | Short window for realtime page hydration | +| `GET /api/meshcore/path-tracing/edges/` | Bucketed edges for map/topology views | +| `GET /api/meshcore/path-tracing/nodes//` | Node-centric path evidence | +| `GET /api/meshcore/path-tracing/routers/` | Important router / centrality summary | + + +**Backfill is a management command first**, not an HTTP endpoint. Rollup/resolution backfill should ship as `manage.py` commands (consistent with the existing Neo4j export commands) so an early subsystem does not add another write surface to secure, document, and rate-limit. A staff-only `POST /api/meshcore/path-tracing/backfill/` can be added later if operators actually need to trigger it from the UI. + +These should live under MeshCore path tracing, not `/api/traceroutes/`, until/unless the API grows a protocol-neutral topology namespace. + +## UI plan + +### Immediate / realtime view + +Goal: operational view of what the MC mesh is doing now. + +- Subscribe to `meshcore.packet_path.observed`. +- Show recent packet paths with feeder markers, resolved nodes where available, and hash labels where unresolved. +- Keep client-side retention short, e.g. 5-30 minutes, with server-provided caps. +- Make unknown/ambiguous hops visually distinct from resolved nodes. + +### Longer-term history + +Goal: logical and geographic map of the MC mesh over hours/days/weeks. + +- Query aggregate edges from `GET /api/meshcore/path-tracing/edges/`. +- Support time windows and constellation filters. +- Show two modes: + - **Geographic:** only resolved nodes/feeders with positions. + - **Logical:** graph layout that can include unresolved hash nodes. +- Avoid labeling passive weights as traceroute success. Use wording like "observed packet path weight" or "passive path observations". 
+ +### Important routers / centrality + +Mirror the useful Meshtastic analytics, but relabel the semantics for passive evidence: + + +| Metric | MC passive interpretation | +| ---------------------- | ------------------------------------------------------------------------------- | +| Degree | Number of distinct neighboring path entities in rollups | +| Weighted degree | Sum of passive path observations touching this entity | +| Betweenness centrality | How often a resolved node/hash sits between other entities in the passive graph | +| PageRank / eigenvector | Relative importance in the passive path graph | +| Feeder reach | Distinct path entities/packets heard by one feeder, not success rate | + + +Unresolved hash nodes may rank highly. The UI should expose them as "unresolved path hash" until resolution lands. + +## Scale controls + +Passive MC path data can be much higher volume than active MT traceroutes. Required controls: + +1. Ingest stores only compact raw `path_hashes` on observations. +2. Realtime WS events are sampled, coalesced, or capped per feeder/channel if volume spikes. +3. Rollups process observations in batches and checkpoint by `upload_time` plus primary key. +4. Aggregate tables are bucketed and indexed by time, constellation, endpoint keys, and resolution status. +5. Neo4j export is async and idempotent from rollups. +6. Optional raw edge observation tables are short-retention only. 
+ +## Ownership + + +| Area | Owner | +| ------------------------------------------- | -------------------------------------------------------------- | +| Raw packet and observation capture | `meshcore_packets` | +| Segment resolution and passive edge rollups | `meshcore_path` (new) or bounded `meshcore_packets` package | +| Neo4j export/query implementation | `traceroute_analytics` or a renamed topology analytics package | +| Realtime path WebSocket events | `ws` plus the MeshCore passive path service boundary | +| UI realtime/history pages | `meshflow-ui` MeshCore section | +| Active MeshCore traceroute | Separate future ADR | + + +## Milestones + +### Milestone 1 - minimum viable passive path (target first deliverable) + +A deliberately small slice that is useful on its own and avoids a big-bang platform jump: + +1. **Capture expansion:** persist `path_hash_size` and `path_hash_mode` on observations; ingest PATH `rx_log_data` frames that carry path data but no business message. +2. **Resolution table (unknowns only):** add `MeshCorePathSegmentResolution` keyed by `(hash_mode, hash_size, segment_hash)`, populated with `status = unknown`. No proactive resolver yet. +3. **Hourly hash-to-hash rollup:** add `MeshCorePathEdgeBucket` + a Celery rollup that emits ordered hash-chain edges (plus observer dimension) per [Edge semantics](#edge-semantics-what-an-edge-means); checkpointed and idempotent; backfill via management command. +4. **Read-only edges API:** `GET /api/meshcore/path-tracing/edges/` over buckets, documented in OpenAPI. Direction labelled as "list order". + +Milestone 1 ships honest, scalable, hash-only evidence with no resolver, no Neo4j, no WS, and no new write endpoints. + +### Later milestones + +1. **Spike:** confirm/deny whether `path_hash_mode`, `path_update`, or `trace_data` enable a proven segment → identity binding; update [traceroute ADR-0001](../../../traceroute/adr/0001-mc-path-hash-resolution.md) and this ADR. +2. 
**Proactive resolver:** only if the spike yields a proven rule; backfill via management command; tests reject heuristics. +3. **Neo4j export:** passive MC graph export from rollups, with protocol/evidence-separated relationships. +4. **Realtime:** recent buffer, post-commit best-effort WS event, recent endpoint. +5. **History/centrality API:** node and router/centrality endpoints. +6. **UI:** realtime short-term display, then geographic/logical history map and router panels. + +## Consequences + +Positive: + +- MC path work can scale independently of active MT traceroute lifecycle. +- We keep raw evidence honest while progressively resolving hashes to nodes. +- UI can deliver both immediate operational visibility and long-term topology maps. +- Neo4j can support MC centrality without polluting MT `AutoTraceRoute` semantics. + +Tradeoffs: + +- More subsystem surface area than reusing `AutoTraceRoute`. +- Some analytics will initially include unresolved hash nodes. +- Existing MT heatmap APIs cannot be reused unchanged because they assume integer Meshtastic node IDs and active traceroute semantics. + +## Open questions + +- Should the new Django app be named `meshcore_path`, `meshcore_packet_path`, or stay inside `meshcore_packets`? (Prefer a name without "trace"/"traceroute" to avoid implying an active probe.) [DECISION: `meshcore_packet_path`] +- What is the meaning of `path_hash_mode`, and does it change how a segment must be interpreted (and therefore keyed)? +- Should passive MC Neo4j data use a new relationship type (`PATH_OBSERVED`) or a shared relationship with `evidence` and `protocol` properties? [DECISION: new `PATH_OBSERVED` relationship type] +- What retention period should apply to raw recent path buffers and optional raw edge observation rows? [DECISION: 6 months; every implementation plan must include a Celery job to evict old data] +- Which centrality metrics are worth computing in Postgres vs Neo4j?
+ diff --git a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md new file mode 100644 index 0000000..99147b8 --- /dev/null +++ b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md @@ -0,0 +1,37 @@ +# MeshCore passive packet path — outstanding + +Items **skipped**, **incomplete**, or **discovered during planning** — not the milestone backlog (that lives in the plan and [#267](https://github.com/pskillen/meshflow-api/issues/267) sub-issues). + +**Tracking:** [meshflow-api#267](https://github.com/pskillen/meshflow-api/issues/267) + +--- + +## Open decisions (gate later milestones) + +- [ ] **Meaning of `path_hash_mode`** — does it change how a segment is derived/interpreted, and therefore whether `(hash_mode, hash_size, segment_hash)` is the correct identity key? Resolved by the M2 spike, informed by the M1 diagnostic UI (segment distribution by mode/size). See [ADR-0001 segment identity](adr/0001-meshcore-packet-path-tracing-subsystem.md). +- [ ] **Centrality compute location** — Postgres vs Neo4j for router/centrality metrics (M6). Resolved by/after M2 + M4. + +**Note:** M1 now ships a read/annotate segments API and a diagnostic UI MVP specifically so these M2 decisions can be made from observed data rather than in the abstract. + +--- + +## Carried from prior passive slice + +- [ ] **Proven hash → `ObservedNode` matcher** — still unproven; no production matcher until [traceroute ADR-0001 §A](../../traceroute/adr/0001-mc-path-hash-resolution.md) documents a safe rule. Gates M3. Tests must reject suffix/prefix/recency heuristics. +- [ ] **`resolved_path` on `GET /api/meshcore/packets/`** — deferred from #360 (message API only). Optional; revisit alongside the edges API. 
+- [ ] **Upload `rx_log_data` PATH-only frames** — bot still skips non-ADVERT `rx_log_data`; needed for relays with `path_len > 0` and no business message (M1 capture / bot follow-up of [#119](https://github.com/pskillen/meshflow-bot/issues/119)). + +--- + +## Capture gaps to confirm during M1/M2 + +- [ ] `path_hash_size` / `path_hash_mode` not yet persisted on `MeshCorePacketObservation` (being addressed in M1 capture, api + bot). +- [ ] `path_update` carries `public_key` only (no path hash in captures) — capture for possible future binding, not as a current resolver source. +- [ ] `trace_data` relationship to path hashes / active traces unconfirmed (M2 spike). + +--- + +## Cross-links + +- [ ] Update [#267](https://github.com/pskillen/meshflow-api/issues/267) epic and this file as milestones land. +- [ ] Keep [traceroute/meshcore-path-outstanding.md](../../traceroute/meshcore-path-outstanding.md) pointed here for the active-vs-passive split. diff --git a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md new file mode 100644 index 0000000..d287af9 --- /dev/null +++ b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md @@ -0,0 +1,62 @@ +# MeshCore passive packet path — progress + +**Tracking:** [meshflow-api#267](https://github.com/pskillen/meshflow-api/issues/267) (MeshCore Phase 3 path parity epic) +**Plan:** Cursor plan `MC passive path M1` (detailed M1) + `MC packet path milestones` (overview) + [ADR-0001](adr/0001-meshcore-packet-path-tracing-subsystem.md) +**Repos:** meshflow-api (primary), meshflow-bot, meshflow-ui + +--- + +## Overall status + +**Status:** In progress + +**Branch:** `api-372/pskillen/meshcore-passive-path-m1` (meshflow-api, meshflow-bot, meshflow-ui) + +The display-only passive slice that precedes this subsystem has **shipped** (see Precursor below). 
M1 implementation in flight (capture, `meshcore_packet_path` app, rollup/eviction, edges/segments API, diagnostic UI). + +**Locked decisions (ADR-0001 open questions):** + +- App name: `meshcore_packet_path`. +- Neo4j: new `PATH_OBSERVED` relationship type (separate from `ROUTED_TO`). +- Retention: 6 months; every persisting milestone ships a Celery eviction job. +- Open until M2 spike: meaning of `path_hash_mode`; centrality in Postgres vs Neo4j. + +--- + +## Precursor — display-only passive slice (shipped) + +**Status:** Complete + +| Issue | Repo | Delivered | +| --- | --- | --- | +| [#369](https://github.com/pskillen/meshflow-api/issues/369) | api | `path_hashes` moved to `MeshCorePacketObservation` only; `update_or_create` per `(packet, observer)` | +| [#360](https://github.com/pskillen/meshflow-api/issues/360) | api | Message `heard[]` exposes `path_hashes`, display-only `resolved_path` (`status=unknown`), `path_known=false` | +| [#119](https://github.com/pskillen/meshflow-bot/issues/119) | bot | `_path_hashes()` forwards segments on ingest | +| [#304](https://github.com/pskillen/meshflow-ui/issues/304) | ui | `HeardPathMap` + heard dialog renders hops/feeders | + +These provide the raw capture (`MeshCorePacketObservation.path_hashes`) and read-time formatter (`meshcore_packets/services/path_resolution.py`) the new subsystem builds on. + +--- + +## Milestones + +Tracked as sub-issues of [#267](https://github.com/pskillen/meshflow-api/issues/267); see [ADR-0001](adr/0001-meshcore-packet-path-tracing-subsystem.md) for scope. This file records what ships; do not duplicate milestone detail here. 
+ +| Milestone | Issue | Status | +| --- | --- | --- | +| M1 MVP (capture + resolution table + hourly rollup + eviction + edges/segments API + diagnostic UI) | [#372](https://github.com/pskillen/meshflow-api/issues/372) | In progress | +| M2 resolution spike (decision gate) | [#373](https://github.com/pskillen/meshflow-api/issues/373) | Not started | +| M3 proactive resolver (conditional on M2) | [#374](https://github.com/pskillen/meshflow-api/issues/374) | Not started | +| M4 Neo4j `PATH_OBSERVED` export | [#375](https://github.com/pskillen/meshflow-api/issues/375) | Not started | +| M5 realtime WS + recent API | [#376](https://github.com/pskillen/meshflow-api/issues/376) | Not started | +| M6 history / centrality API | [#377](https://github.com/pskillen/meshflow-api/issues/377) | Not started | +| M7 UI realtime + history/topology | [meshflow-ui#309](https://github.com/pskillen/meshflow-ui/issues/309) | Not started | + +**M1 scope note.** M1 was expanded to also ship a read/annotate API (`/meshcore/path-tracing/segments/` with staff manual annotation) and a **diagnostic UI MVP** (data tables, not the M7 map) so the user has enough visibility into captured passive data to make informed M2 decisions. M1 therefore spans all three repos. The full topology/realtime UI remains M7 (meshflow-ui#309). + +--- + +## Next + +- Execute M1 per the detailed Cursor plan `MC passive path M1`. +- Branch from latest `origin/main` as `api-372/pskillen/meshcore-passive-path-m1` in meshflow-api, meshflow-bot, and meshflow-ui; atomic conventional commits; PRs via github-personal MCP. diff --git a/docs/features/traceroute/README.md b/docs/features/traceroute/README.md index 4afca17..7a0b12f 100644 --- a/docs/features/traceroute/README.md +++ b/docs/features/traceroute/README.md @@ -6,7 +6,7 @@ The traceroute feature tracks path discovery between Meshtastic nodes on the mes MeshCore does not use Meshtastic `TRACEROUTE_APP` / numeric hop lists on the wire. 
Phase 3 work ([#267](https://github.com/pskillen/meshflow-api/issues/267)) splits into: -- **Passive path** — repeater `path_hashes` on forwarded packets (ingest → per-feeder observations → display hops → UI). ADR: [adr/0001-mc-path-hash-resolution.md](adr/0001-mc-path-hash-resolution.md). Tracking: [meshcore-path-progress.md](meshcore-path-progress.md), [meshcore-path-outstanding.md](meshcore-path-outstanding.md). Ingest: [packet-ingestion/meshcore.md](../packet-ingestion/meshcore.md). +- **Passive path** — repeater `path_hashes` on forwarded packets (ingest → per-feeder observations → display hops → UI). ADR: [adr/0001-mc-path-hash-resolution.md](adr/0001-mc-path-hash-resolution.md). Proposed subsystem: [meshcore/packet-path-tracing](../meshcore/packet-path-tracing/). Tracking: [meshcore-path-progress.md](meshcore-path-progress.md), [meshcore-path-outstanding.md](meshcore-path-outstanding.md). Ingest: [packet-ingestion/meshcore.md](../packet-ingestion/meshcore.md). - **Active traceroute** (later) — MC analog of `AutoTraceRoute`, scheduler protocol guards, Neo4j edges labelled by protocol. Meshtastic sections below remain the reference for the **active** traceroute system today. diff --git a/docs/features/traceroute/meshcore-path-outstanding.md b/docs/features/traceroute/meshcore-path-outstanding.md index 11c584e..2f1d274 100644 --- a/docs/features/traceroute/meshcore-path-outstanding.md +++ b/docs/features/traceroute/meshcore-path-outstanding.md @@ -8,6 +8,7 @@ Items **skipped**, **incomplete**, or **discovered during planning** for [#267]( - [x] **#369** — observation-only `path_hashes` (API branch `api-267/pskillen/meshcore-path`). - [x] **#360 (narrowed)** — message `heard[]` path display + positions (not packet list API). 
+- [ ] **Passive packet path subsystem** — proposed in [meshcore/packet-path-tracing ADR-0001](../meshcore/packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md): passive edge rollups, resolution table, Neo4j export, realtime/history UI. Progress/outstanding now tracked under [packet-path-tracing/](../meshcore/packet-path-tracing/packet-path-tracing-progress.md). - [ ] **Proven matcher** — implement hash segment → `ObservedNode` when [ADR §A](adr/0001-mc-path-hash-resolution.md) documents safe rules (no `iendswith` / `last_heard` heuristics in v1). - [ ] **`GET /meshcore/packets/`** — optional `resolved_path` on packet list/detail (deferred). - [ ] **#304** — UI `HeardPathMap` + heard dialog (meshflow-ui). From 6fdaedf95f655d86edef7f0daeca499946fd84a3 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:15:33 +0100 Subject: [PATCH 02/12] feat(meshcore): persist path_hash_size and path_hash_mode on observation --- .../0004_observation_path_hash_size_mode.py | 23 +++++++++ Meshflow/meshcore_packets/models.py | 2 + Meshflow/meshcore_packets/serializers.py | 4 ++ .../tests/test_path_hashes_observation.py | 51 +++++++++++++++++++ 4 files changed, 80 insertions(+) create mode 100644 Meshflow/meshcore_packets/migrations/0004_observation_path_hash_size_mode.py diff --git a/Meshflow/meshcore_packets/migrations/0004_observation_path_hash_size_mode.py b/Meshflow/meshcore_packets/migrations/0004_observation_path_hash_size_mode.py new file mode 100644 index 0000000..cf16a60 --- /dev/null +++ b/Meshflow/meshcore_packets/migrations/0004_observation_path_hash_size_mode.py @@ -0,0 +1,23 @@ +# Generated by Django 5.2.14 on 2026-06-01 09:15 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('meshcore_packets', '0003_remove_meshcorerawpacket_path_hashes'), + ] + + operations = [ + migrations.AddField( + model_name='meshcorepacketobservation', + name='path_hash_mode', + 
field=models.PositiveSmallIntegerField(blank=True, null=True), + ), + migrations.AddField( + model_name='meshcorepacketobservation', + name='path_hash_size', + field=models.PositiveSmallIntegerField(blank=True, null=True), + ), + ] diff --git a/Meshflow/meshcore_packets/models.py b/Meshflow/meshcore_packets/models.py index c359172..e14e745 100644 --- a/Meshflow/meshcore_packets/models.py +++ b/Meshflow/meshcore_packets/models.py @@ -93,6 +93,8 @@ class MeshCorePacketObservation(models.Model): rx_rssi = models.FloatField(null=True, blank=True) rx_snr = models.FloatField(null=True, blank=True) path_hashes = models.JSONField(null=True, blank=True) + path_hash_size = models.PositiveSmallIntegerField(null=True, blank=True) + path_hash_mode = models.PositiveSmallIntegerField(null=True, blank=True) upload_time = models.DateTimeField(default=timezone.now) class Meta: diff --git a/Meshflow/meshcore_packets/serializers.py b/Meshflow/meshcore_packets/serializers.py index c468c27..e8a1396 100644 --- a/Meshflow/meshcore_packets/serializers.py +++ b/Meshflow/meshcore_packets/serializers.py @@ -43,6 +43,8 @@ class MeshCorePacketIngestSerializer(serializers.Serializer): to_pubkey_prefix = serializers.CharField(required=False, allow_null=True, allow_blank=True) route_typename = serializers.CharField(required=False, allow_null=True, allow_blank=True) path_hashes = serializers.JSONField(required=False, allow_null=True) + path_hash_size = serializers.IntegerField(required=False, allow_null=True, min_value=1, max_value=8) + path_hash_mode = serializers.IntegerField(required=False, allow_null=True, min_value=0, max_value=255) channel_idx = serializers.IntegerField(required=False, allow_null=True) text = serializers.CharField(required=False, allow_null=True, allow_blank=True) adv_name = serializers.CharField(required=False, allow_null=True, allow_blank=True) @@ -139,6 +141,8 @@ def _ensure_observation(self, packet, observer, validated_data, rx_time): "rx_rssi": 
validated_data.get("rx_rssi"), "rx_snr": validated_data.get("rx_snr"), "path_hashes": path_hashes, + "path_hash_size": validated_data.get("path_hash_size"), + "path_hash_mode": validated_data.get("path_hash_mode"), }, ) self.observation = obs diff --git a/Meshflow/meshcore_packets/tests/test_path_hashes_observation.py b/Meshflow/meshcore_packets/tests/test_path_hashes_observation.py index 52d3ca7..b50d39c 100644 --- a/Meshflow/meshcore_packets/tests/test_path_hashes_observation.py +++ b/Meshflow/meshcore_packets/tests/test_path_hashes_observation.py @@ -91,3 +91,54 @@ def test_observation_path_hashes_updated_on_reingest(ingest_client, meshcore_fee packet = MeshCoreRawPacket.objects.get(pkt_hash=77702) obs = MeshCorePacketObservation.objects.get(packet=packet, observer=meshcore_feeder["node"]) assert obs.path_hashes == ["22", "33"] + + +@pytest.mark.django_db +def test_observation_path_hash_size_and_mode_persisted(ingest_client, meshcore_feeder): + now = timezone.now() + url = feeder_url("meshcore-feeder-packet-ingest", FEEDER_MC_PUBKEY_PREFIX) + payload = { + "event_type": "channel_message", + "payload_type": "channel_text", + "pkt_hash": 77703, + "rx_time": now.timestamp(), + "text": "path meta", + "channel_idx": 0, + "path_hashes": ["f3", "bc"], + "path_hash_size": 1, + "path_hash_mode": 2, + "raw": {}, + } + ingest_client.post(url, payload, format="json") + packet = MeshCoreRawPacket.objects.get(pkt_hash=77703) + obs = MeshCorePacketObservation.objects.get(packet=packet, observer=meshcore_feeder["node"]) + assert obs.path_hash_size == 1 + assert obs.path_hash_mode == 2 + + ingest_client.post( + url, + {**payload, "path_hash_size": 3, "path_hash_mode": 4}, + format="json", + ) + obs.refresh_from_db() + assert obs.path_hash_size == 3 + assert obs.path_hash_mode == 4 + + +@pytest.mark.django_db +def test_observation_path_hash_size_mode_null_when_absent(ingest_client, meshcore_feeder): + now = timezone.now() + url = feeder_url("meshcore-feeder-packet-ingest", 
FEEDER_MC_PUBKEY_PREFIX) + payload = { + "event_type": "advertisement", + "payload_type": "advert", + "from_pubkey": FULL_PUBKEY, + "pkt_hash": 77704, + "rx_time": now.timestamp(), + "raw": {}, + } + ingest_client.post(url, payload, format="json") + packet = MeshCoreRawPacket.objects.get(pkt_hash=77704) + obs = MeshCorePacketObservation.objects.get(packet=packet, observer=meshcore_feeder["node"]) + assert obs.path_hash_size is None + assert obs.path_hash_mode is None From 11f35034e6b570cfaaa825eddcf01181f7f46873 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:19:56 +0100 Subject: [PATCH 03/12] feat(meshcore-path): scaffold meshcore_packet_path app with resolution and edge-bucket models --- Meshflow/Meshflow/celery.py | 12 +- Meshflow/Meshflow/settings/base.py | 3 + Meshflow/meshcore_packet_path/__init__.py | 0 Meshflow/meshcore_packet_path/admin.py | 36 +++++ Meshflow/meshcore_packet_path/apps.py | 7 + .../migrations/0001_initial.py | 70 +++++++++ Meshflow/meshcore_packet_path/models.py | 142 ++++++++++++++++++ 7 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 Meshflow/meshcore_packet_path/__init__.py create mode 100644 Meshflow/meshcore_packet_path/admin.py create mode 100644 Meshflow/meshcore_packet_path/apps.py create mode 100644 Meshflow/meshcore_packet_path/migrations/0001_initial.py create mode 100644 Meshflow/meshcore_packet_path/models.py diff --git a/Meshflow/Meshflow/celery.py b/Meshflow/Meshflow/celery.py index eaf7e93..8937e29 100644 --- a/Meshflow/Meshflow/celery.py +++ b/Meshflow/Meshflow/celery.py @@ -8,4 +8,14 @@ app = Celery("Meshflow") app.config_from_object("django.conf:settings", namespace="CELERY") -app.autodiscover_tasks(["traceroute", "stats", "mesh_monitoring", "rf_propagation", "nodes", "dx_monitoring"]) +app.autodiscover_tasks( + [ + "traceroute", + "stats", + "mesh_monitoring", + "rf_propagation", + "nodes", + "dx_monitoring", + "meshcore_packet_path", + ] +) diff --git 
a/Meshflow/Meshflow/settings/base.py b/Meshflow/Meshflow/settings/base.py index 0099fed..7957774 100644 --- a/Meshflow/Meshflow/settings/base.py +++ b/Meshflow/Meshflow/settings/base.py @@ -76,6 +76,7 @@ "nodes", "packets", "meshcore_packets", + "meshcore_packet_path", "stats", "text_messages", "traceroute", @@ -215,6 +216,8 @@ def _rf_env_bool(name: str, default: bool) -> bool: # Per-node retention: how many ``ready`` renders to keep on disk before GC. RF_PROPAGATION_READY_RETENTION = int(os.environ.get("RF_PROPAGATION_READY_RETENTION", "3")) +MESHCORE_PATH_RETENTION_DAYS = int(os.environ.get("MESHCORE_PATH_RETENTION_DAYS", "183")) + # Django cache (Redis DB 2; channels use DB 0, Celery broker DB 1) _cache_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/2" CACHES = { diff --git a/Meshflow/meshcore_packet_path/__init__.py b/Meshflow/meshcore_packet_path/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Meshflow/meshcore_packet_path/admin.py b/Meshflow/meshcore_packet_path/admin.py new file mode 100644 index 0000000..94a714b --- /dev/null +++ b/Meshflow/meshcore_packet_path/admin.py @@ -0,0 +1,36 @@ +from django.contrib import admin + +from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution + + +@admin.register(MeshCorePathSegmentResolution) +class MeshCorePathSegmentResolutionAdmin(admin.ModelAdmin): + list_display = ( + "segment_hash", + "hash_size", + "hash_mode", + "status", + "source", + "observed_node", + "last_seen_at", + ) + list_filter = ("status", "hash_size", "hash_mode", "source") + search_fields = ("segment_hash", "observed_node__long_name") + readonly_fields = ("id", "first_seen_at") + date_hierarchy = "last_seen_at" + + +@admin.register(MeshCorePathEdgeBucket) +class MeshCorePathEdgeBucketAdmin(admin.ModelAdmin): + list_display = ( + "bucket_start", + "from_hash", + "to_hash", + "observer", + "packet_count", + "observation_count", + "last_seen_at", + ) + list_filter = 
("bucket_size", "from_kind", "to_kind", "constellation") + search_fields = ("from_hash", "to_hash") + date_hierarchy = "bucket_start" diff --git a/Meshflow/meshcore_packet_path/apps.py b/Meshflow/meshcore_packet_path/apps.py new file mode 100644 index 0000000..ff90273 --- /dev/null +++ b/Meshflow/meshcore_packet_path/apps.py @@ -0,0 +1,7 @@ +from django.apps import AppConfig + + +class MeshcorePacketPathConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "meshcore_packet_path" + verbose_name = "MeshCore packet path" diff --git a/Meshflow/meshcore_packet_path/migrations/0001_initial.py b/Meshflow/meshcore_packet_path/migrations/0001_initial.py new file mode 100644 index 0000000..e2d7e79 --- /dev/null +++ b/Meshflow/meshcore_packet_path/migrations/0001_initial.py @@ -0,0 +1,70 @@ +# Generated by Django 5.2.14 on 2026-06-01 09:17 + +import django.db.models.deletion +import django.utils.timezone +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('constellations', '0011_remove_constellationusermembership'), + ('nodes', '0050_managednode_protocol_identity'), + ] + + operations = [ + migrations.CreateModel( + name='MeshCorePathEdgeBucket', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('bucket_start', models.DateTimeField(db_index=True)), + ('bucket_size', models.CharField(default='1h', max_length=8)), + ('from_kind', models.CharField(choices=[('hash', 'Hash'), ('node', 'Node'), ('feeder', 'Feeder'), ('unknown', 'Unknown')], default='hash', max_length=16)), + ('to_kind', models.CharField(choices=[('hash', 'Hash'), ('node', 'Node'), ('feeder', 'Feeder'), ('unknown', 'Unknown')], default='hash', max_length=16)), + ('from_hash', models.CharField(blank=True, default='', max_length=32)), + ('to_hash', models.CharField(blank=True, default='', max_length=32)), + ('packet_count', 
models.PositiveIntegerField(default=0)), + ('observation_count', models.PositiveIntegerField(default=0)), + ('first_seen_at', models.DateTimeField(blank=True, null=True)), + ('last_seen_at', models.DateTimeField(blank=True, null=True)), + ('avg_snr', models.FloatField(blank=True, null=True)), + ('min_snr', models.FloatField(blank=True, null=True)), + ('max_snr', models.FloatField(blank=True, null=True)), + ('constellation', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edge_buckets', to='constellations.constellation')), + ('from_node', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edges_from', to='nodes.observednode')), + ('observer', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edge_buckets', to='nodes.managednode')), + ('to_node', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edges_to', to='nodes.observednode')), + ], + options={ + 'verbose_name': 'MeshCore path edge bucket', + 'verbose_name_plural': 'MeshCore path edge buckets', + 'indexes': [models.Index(fields=['-bucket_start'], name='meshcore_pa_bucket__9276cc_idx'), models.Index(fields=['from_hash', 'to_hash'], name='meshcore_pa_from_ha_671731_idx')], + 'constraints': [models.UniqueConstraint(fields=('bucket_start', 'bucket_size', 'from_kind', 'to_kind', 'from_hash', 'to_hash', 'observer', 'constellation'), name='meshcore_path_edge_bucket_unique', nulls_distinct=False)], + }, + ), + migrations.CreateModel( + name='MeshCorePathSegmentResolution', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('segment_hash', models.CharField(db_index=True, max_length=32)), + ('hash_size', models.PositiveSmallIntegerField(blank=True, null=True)), + ('hash_mode', 
models.PositiveSmallIntegerField(blank=True, null=True)), + ('status', models.CharField(choices=[('unknown', 'Unknown'), ('resolved', 'Resolved'), ('ambiguous', 'Ambiguous'), ('stale', 'Stale')], db_index=True, default='unknown', max_length=16)), + ('source', models.CharField(blank=True, default='', max_length=32)), + ('resolver_version', models.PositiveIntegerField(default=1)), + ('confidence', models.FloatField(blank=True, null=True)), + ('first_seen_at', models.DateTimeField(default=django.utils.timezone.now)), + ('last_seen_at', models.DateTimeField(default=django.utils.timezone.now)), + ('observed_node', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_segments', to='nodes.observednode')), + ], + options={ + 'verbose_name': 'MeshCore path segment resolution', + 'verbose_name_plural': 'MeshCore path segment resolutions', + 'indexes': [models.Index(fields=['-last_seen_at'], name='meshcore_pa_last_se_c35b9b_idx'), models.Index(fields=['status', '-last_seen_at'], name='meshcore_pa_status_6b0dba_idx')], + 'constraints': [models.UniqueConstraint(fields=('hash_mode', 'hash_size', 'segment_hash'), name='meshcore_path_segment_identity_unique')], + }, + ), + ] diff --git a/Meshflow/meshcore_packet_path/models.py b/Meshflow/meshcore_packet_path/models.py new file mode 100644 index 0000000..da27154 --- /dev/null +++ b/Meshflow/meshcore_packet_path/models.py @@ -0,0 +1,142 @@ +"""Passive MeshCore packet path evidence (hash chains, segment resolution, edge rollups).""" + +import uuid + +from django.db import models +from django.utils import timezone +from django.utils.translation import gettext_lazy as _ + +from constellations.models import Constellation +from nodes.models import ManagedNode, ObservedNode + + +class SegmentStatus(models.TextChoices): + UNKNOWN = "unknown", _("Unknown") + RESOLVED = "resolved", _("Resolved") + AMBIGUOUS = "ambiguous", _("Ambiguous") + STALE = "stale", _("Stale") + + +class 
EdgeKind(models.TextChoices): + HASH = "hash", _("Hash") + NODE = "node", _("Node") + FEEDER = "feeder", _("Feeder") + UNKNOWN = "unknown", _("Unknown") + + +class MeshCorePathSegmentResolution(models.Model): + """One resolvable path segment identity (hash_mode, hash_size, segment_hash).""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + segment_hash = models.CharField(max_length=32, db_index=True) + hash_size = models.PositiveSmallIntegerField(null=True, blank=True) + hash_mode = models.PositiveSmallIntegerField(null=True, blank=True) + observed_node = models.ForeignKey( + ObservedNode, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="meshcore_path_segments", + ) + status = models.CharField( + max_length=16, + choices=SegmentStatus.choices, + default=SegmentStatus.UNKNOWN, + db_index=True, + ) + source = models.CharField(max_length=32, blank=True, default="") + resolver_version = models.PositiveIntegerField(default=1) + confidence = models.FloatField(null=True, blank=True) + first_seen_at = models.DateTimeField(default=timezone.now) + last_seen_at = models.DateTimeField(default=timezone.now) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=["hash_mode", "hash_size", "segment_hash"], + name="meshcore_path_segment_identity_unique", + ), + ] + indexes = [ + models.Index(fields=["-last_seen_at"]), + models.Index(fields=["status", "-last_seen_at"]), + ] + verbose_name = _("MeshCore path segment resolution") + verbose_name_plural = _("MeshCore path segment resolutions") + + def __str__(self): + return f"{self.segment_hash} ({self.status})" + + +class MeshCorePathEdgeBucket(models.Model): + """Hourly (or other) rollup of passive hash-chain edges.""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + bucket_start = models.DateTimeField(db_index=True) + bucket_size = models.CharField(max_length=8, default="1h") + from_kind = models.CharField(max_length=16, 
choices=EdgeKind.choices, default=EdgeKind.HASH) + to_kind = models.CharField(max_length=16, choices=EdgeKind.choices, default=EdgeKind.HASH) + from_hash = models.CharField(max_length=32, blank=True, default="") + to_hash = models.CharField(max_length=32, blank=True, default="") + from_node = models.ForeignKey( + ObservedNode, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="meshcore_path_edges_from", + ) + to_node = models.ForeignKey( + ObservedNode, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="meshcore_path_edges_to", + ) + observer = models.ForeignKey( + ManagedNode, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="meshcore_path_edge_buckets", + ) + constellation = models.ForeignKey( + Constellation, + on_delete=models.SET_NULL, + null=True, + blank=True, + related_name="meshcore_path_edge_buckets", + ) + packet_count = models.PositiveIntegerField(default=0) + observation_count = models.PositiveIntegerField(default=0) + first_seen_at = models.DateTimeField(null=True, blank=True) + last_seen_at = models.DateTimeField(null=True, blank=True) + avg_snr = models.FloatField(null=True, blank=True) + min_snr = models.FloatField(null=True, blank=True) + max_snr = models.FloatField(null=True, blank=True) + + class Meta: + constraints = [ + models.UniqueConstraint( + fields=[ + "bucket_start", + "bucket_size", + "from_kind", + "to_kind", + "from_hash", + "to_hash", + "observer", + "constellation", + ], + name="meshcore_path_edge_bucket_unique", + nulls_distinct=False, + ), + ] + indexes = [ + models.Index(fields=["-bucket_start"]), + models.Index(fields=["from_hash", "to_hash"]), + ] + verbose_name = _("MeshCore path edge bucket") + verbose_name_plural = _("MeshCore path edge buckets") + + def __str__(self): + return f"{self.from_hash}->{self.to_hash} @ {self.bucket_start}" From cec9f570d9c43abe35d5e8a1f82031af9ccfaa85 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:19:56 +0100 
Subject: [PATCH 04/12] feat(meshcore-path): add hourly hash-chain edge rollup task --- .../meshcore_packet_path/services/__init__.py | 0 .../meshcore_packet_path/services/rollup.py | 194 ++++++++++++++++++ Meshflow/meshcore_packet_path/tasks.py | 51 +++++ .../meshcore_packet_path/tests/conftest.py | 31 +++ .../meshcore_packet_path/tests/test_models.py | 56 +++++ .../meshcore_packet_path/tests/test_rollup.py | 117 +++++++++++ 6 files changed, 449 insertions(+) create mode 100644 Meshflow/meshcore_packet_path/services/__init__.py create mode 100644 Meshflow/meshcore_packet_path/services/rollup.py create mode 100644 Meshflow/meshcore_packet_path/tasks.py create mode 100644 Meshflow/meshcore_packet_path/tests/conftest.py create mode 100644 Meshflow/meshcore_packet_path/tests/test_models.py create mode 100644 Meshflow/meshcore_packet_path/tests/test_rollup.py diff --git a/Meshflow/meshcore_packet_path/services/__init__.py b/Meshflow/meshcore_packet_path/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Meshflow/meshcore_packet_path/services/rollup.py b/Meshflow/meshcore_packet_path/services/rollup.py new file mode 100644 index 0000000..b3cbb99 --- /dev/null +++ b/Meshflow/meshcore_packet_path/services/rollup.py @@ -0,0 +1,194 @@ +"""Roll up MeshCore packet observations into path edge buckets and segment rows.""" + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any + +from django.utils import timezone + +from meshcore_packet_path.models import ( + EdgeKind, + MeshCorePathEdgeBucket, + MeshCorePathSegmentResolution, + SegmentStatus, +) +from meshcore_packets.models import MeshCorePacketObservation + + +def _normalize_hash(segment: str) -> str: + return str(segment).strip().lower() + + +@dataclass +class _EdgeAgg: + packet_ids: set = field(default_factory=set) + observation_count: int = 0 + snr_values: list[float] = 
field(default_factory=list) + first_seen_at: datetime | None = None + last_seen_at: datetime | None = None + + def add(self, packet_id, seen_at: datetime, snr: float | None) -> None: + self.packet_ids.add(packet_id) + self.observation_count += 1 + if snr is not None: + self.snr_values.append(float(snr)) + if self.first_seen_at is None or seen_at < self.first_seen_at: + self.first_seen_at = seen_at + if self.last_seen_at is None or seen_at > self.last_seen_at: + self.last_seen_at = seen_at + + +def touch_segment( + segment_hash: str, + *, + hash_size: int | None, + hash_mode: int | None, + seen_at: datetime, +) -> None: + """Create unknown segment row or bump last_seen_at without downgrading manual/resolved rows.""" + normalized = _normalize_hash(segment_hash) + seg, created = MeshCorePathSegmentResolution.objects.get_or_create( + hash_mode=hash_mode, + hash_size=hash_size, + segment_hash=normalized, + defaults={ + "status": SegmentStatus.UNKNOWN, + "source": "", + "first_seen_at": seen_at, + "last_seen_at": seen_at, + }, + ) + if created: + return + updates: dict[str, Any] = {} + if seen_at > seg.last_seen_at: + updates["last_seen_at"] = seen_at + if seen_at < seg.first_seen_at: + updates["first_seen_at"] = seen_at + if updates: + MeshCorePathSegmentResolution.objects.filter(pk=seg.pk).update(**updates) + + +def collect_path_edge_buckets_for_hour( + hour_start: datetime, + *, + skip_existing: bool = False, +) -> dict[str, int]: + """ + Aggregate hash-chain edges for observations with upload_time in [hour_start, hour_end). + + Returns counts: created, updated, skipped_hours, observations_processed. 
+ """ + if hour_start.tzinfo is None: + hour_start = timezone.make_aware(hour_start) + hour_start = hour_start.replace(minute=0, second=0, microsecond=0) + hour_end = hour_start + timedelta(hours=1) + + if skip_existing: + if MeshCorePathEdgeBucket.objects.filter( + bucket_start=hour_start, + bucket_size="1h", + ).exists(): + return {"created": 0, "updated": 0, "skipped_hours": 1, "observations_processed": 0} + + edge_aggs: dict[tuple, _EdgeAgg] = defaultdict(_EdgeAgg) + observations_processed = 0 + + qs = ( + MeshCorePacketObservation.objects.filter( + upload_time__gte=hour_start, + upload_time__lt=hour_end, + ) + .exclude(path_hashes__isnull=True) + .select_related("observer", "observer__constellation", "packet") + .iterator(chunk_size=500) + ) + + for obs in qs: + segments = obs.path_hashes or [] + if not isinstance(segments, list) or len(segments) < 2: + continue + observations_processed += 1 + seen_at = obs.upload_time + hash_mode = obs.path_hash_mode + hash_size = obs.path_hash_size + observer_id = obs.observer_id + constellation_id = obs.observer.constellation_id if obs.observer_id else None + + for seg in segments: + touch_segment( + str(seg), + hash_size=hash_size, + hash_mode=hash_mode, + seen_at=seen_at, + ) + + normalized = [_normalize_hash(s) for s in segments] + for idx in range(len(normalized) - 1): + from_hash = normalized[idx] + to_hash = normalized[idx + 1] + key = ( + hour_start, + observer_id, + constellation_id, + from_hash, + to_hash, + ) + edge_aggs[key].add(obs.packet_id, seen_at, obs.rx_snr) + + created = 0 + updated = 0 + for (bucket_start, observer_id, constellation_id, from_hash, to_hash), agg in edge_aggs.items(): + snr_vals = agg.snr_values + defaults = { + "packet_count": len(agg.packet_ids), + "observation_count": agg.observation_count, + "first_seen_at": agg.first_seen_at, + "last_seen_at": agg.last_seen_at, + "min_snr": min(snr_vals) if snr_vals else None, + "max_snr": max(snr_vals) if snr_vals else None, + "avg_snr": sum(snr_vals) 
/ len(snr_vals) if snr_vals else None, + } + _obj, was_created = MeshCorePathEdgeBucket.objects.update_or_create( + bucket_start=bucket_start, + bucket_size="1h", + from_kind=EdgeKind.HASH, + to_kind=EdgeKind.HASH, + from_hash=from_hash, + to_hash=to_hash, + observer_id=observer_id, + constellation_id=constellation_id, + defaults=defaults, + ) + if was_created: + created += 1 + else: + updated += 1 + + return { + "created": created, + "updated": updated, + "skipped_hours": 0, + "observations_processed": observations_processed, + } + + +def collect_path_edge_buckets_for_range( + start_hour: datetime, + end_hour: datetime, + *, + skip_existing: bool = False, +) -> dict[str, int]: + """Roll up each hour in [start_hour, end_hour).""" + totals = {"created": 0, "updated": 0, "skipped_hours": 0, "observations_processed": 0} + hour = start_hour.replace(minute=0, second=0, microsecond=0) + end = end_hour.replace(minute=0, second=0, microsecond=0) + while hour < end: + result = collect_path_edge_buckets_for_hour(hour, skip_existing=skip_existing) + for key in totals: + totals[key] += result.get(key, 0) + hour += timedelta(hours=1) + return totals diff --git a/Meshflow/meshcore_packet_path/tasks.py b/Meshflow/meshcore_packet_path/tasks.py new file mode 100644 index 0000000..bacf195 --- /dev/null +++ b/Meshflow/meshcore_packet_path/tasks.py @@ -0,0 +1,51 @@ +"""Celery tasks for MeshCore passive packet path rollups and retention.""" + +from datetime import timedelta + +from django.conf import settings +from django.utils import timezone + +from celery import shared_task + +from meshcore_packet_path.services.rollup import ( + collect_path_edge_buckets_for_hour, + collect_path_edge_buckets_for_range, +) + + +@shared_task +def collect_path_edge_buckets(): + """Hourly: rollup hash-chain edges for the completed previous hour.""" + current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) + hour_start = current_hour - timedelta(hours=1) + return 
collect_path_edge_buckets_for_hour(hour_start) + + +@shared_task +def backfill_path_edge_buckets_task(hours: int = 24) -> dict: + """Backfill rollups for the last N hours (idempotent when skip_existing=True).""" + current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) + start_hour = current_hour - timedelta(hours=hours) + return collect_path_edge_buckets_for_range(start_hour, current_hour, skip_existing=True) + + +@shared_task +def evict_old_path_data(): + """Delete path edge buckets (and stale unknown segments) older than retention.""" + from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution, SegmentStatus + + days = int(getattr(settings, "MESHCORE_PATH_RETENTION_DAYS", 183)) + cutoff = timezone.now() - timedelta(days=days) + + buckets_deleted, _ = MeshCorePathEdgeBucket.objects.filter(bucket_start__lt=cutoff).delete() + segments_deleted, _ = MeshCorePathSegmentResolution.objects.filter( + last_seen_at__lt=cutoff, + status=SegmentStatus.UNKNOWN, + source="", + ).delete() + + return { + "buckets_deleted": buckets_deleted, + "segments_deleted": segments_deleted, + "cutoff": cutoff.isoformat(), + } diff --git a/Meshflow/meshcore_packet_path/tests/conftest.py b/Meshflow/meshcore_packet_path/tests/conftest.py new file mode 100644 index 0000000..52386a1 --- /dev/null +++ b/Meshflow/meshcore_packet_path/tests/conftest.py @@ -0,0 +1,31 @@ +"""Shared fixtures for meshcore_packet_path tests.""" + +from django.utils import timezone + +import pytest + +from meshcore_packets.models import MeshCorePacketObservation, MeshCorePayloadType, MeshCoreRawPacket + + +@pytest.fixture +def path_observation(meshcore_feeder): + """Observation with a two-hop path for rollup tests.""" + now = timezone.now().replace(minute=0, second=0, microsecond=0) + packet = MeshCoreRawPacket.objects.create( + observer=meshcore_feeder["node"], + payload_type=MeshCorePayloadType.CHANNEL_TEXT, + event_type="channel_message", + rx_time=now, + 
raw_json={}, + ) + obs = MeshCorePacketObservation.objects.create( + packet=packet, + observer=meshcore_feeder["node"], + rx_time=now, + upload_time=now, + path_hashes=["aa", "bb", "cc"], + path_hash_size=1, + path_hash_mode=2, + rx_snr=-5.0, + ) + return {"packet": packet, "observation": obs, "hour_start": now} diff --git a/Meshflow/meshcore_packet_path/tests/test_models.py b/Meshflow/meshcore_packet_path/tests/test_models.py new file mode 100644 index 0000000..ff2020c --- /dev/null +++ b/Meshflow/meshcore_packet_path/tests/test_models.py @@ -0,0 +1,56 @@ +"""Model tests for meshcore_packet_path.""" + +from django.db import IntegrityError + +import pytest + +from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution, SegmentStatus + + +@pytest.mark.django_db +def test_segment_resolution_unique_identity(): + MeshCorePathSegmentResolution.objects.create( + segment_hash="f3bc", + hash_size=2, + hash_mode=1, + status=SegmentStatus.UNKNOWN, + ) + with pytest.raises(IntegrityError): + MeshCorePathSegmentResolution.objects.create( + segment_hash="f3bc", + hash_size=2, + hash_mode=1, + status=SegmentStatus.UNKNOWN, + ) + + +@pytest.mark.django_db +def test_edge_bucket_update_or_create_idempotent(path_observation): + hour = path_observation["hour_start"] + observer = path_observation["observation"].observer + defaults = {"packet_count": 1, "observation_count": 1} + MeshCorePathEdgeBucket.objects.update_or_create( + bucket_start=hour, + bucket_size="1h", + from_kind="hash", + to_kind="hash", + from_hash="aa", + to_hash="bb", + observer=observer, + constellation_id=observer.constellation_id, + defaults=defaults, + ) + obj, created = MeshCorePathEdgeBucket.objects.update_or_create( + bucket_start=hour, + bucket_size="1h", + from_kind="hash", + to_kind="hash", + from_hash="aa", + to_hash="bb", + observer=observer, + constellation_id=observer.constellation_id, + defaults={"packet_count": 9, "observation_count": 9}, + ) + assert created is 
False + assert obj.packet_count == 9 + assert MeshCorePathEdgeBucket.objects.filter(bucket_start=hour, from_hash="aa").count() == 1 diff --git a/Meshflow/meshcore_packet_path/tests/test_rollup.py b/Meshflow/meshcore_packet_path/tests/test_rollup.py new file mode 100644 index 0000000..c812b35 --- /dev/null +++ b/Meshflow/meshcore_packet_path/tests/test_rollup.py @@ -0,0 +1,117 @@ +"""Rollup and eviction tests.""" + +from datetime import timedelta + +from django.utils import timezone + +import pytest + +from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution, SegmentStatus +from meshcore_packet_path.services.rollup import collect_path_edge_buckets_for_hour, touch_segment +from meshcore_packet_path.tasks import evict_old_path_data +from meshcore_packets.models import MeshCorePacketObservation, MeshCorePayloadType, MeshCoreRawPacket +from meshcore_packets.tests.conftest import FEEDER_B_MC_PUBKEY +from nodes.models import NodeAuth + + +@pytest.mark.django_db +def test_rollup_creates_hash_chain_edges(path_observation): + hour = path_observation["hour_start"] + result = collect_path_edge_buckets_for_hour(hour) + assert result["observations_processed"] == 1 + assert MeshCorePathEdgeBucket.objects.filter(bucket_start=hour).count() == 2 + edges = set(MeshCorePathEdgeBucket.objects.filter(bucket_start=hour).values_list("from_hash", "to_hash")) + assert edges == {("aa", "bb"), ("bb", "cc")} + segments = MeshCorePathSegmentResolution.objects.filter(segment_hash__in=["aa", "bb", "cc"]) + assert segments.count() == 3 + assert all(s.status == SegmentStatus.UNKNOWN for s in segments) + + +@pytest.mark.django_db +def test_rollup_idempotent(path_observation): + hour = path_observation["hour_start"] + collect_path_edge_buckets_for_hour(hour) + collect_path_edge_buckets_for_hour(hour) + assert MeshCorePathEdgeBucket.objects.filter(bucket_start=hour).count() == 2 + + +@pytest.mark.django_db +def 
test_rollup_two_feeders_distinct_edges(meshcore_feeder, create_managed_node, create_node_api_key): + from common.protocol import Protocol + + hour = timezone.now().replace(minute=0, second=0, microsecond=0) + packet = MeshCoreRawPacket.objects.create( + observer=meshcore_feeder["node"], + payload_type=MeshCorePayloadType.ADVERT, + event_type="advertisement", + pkt_hash=88001, + rx_time=hour, + raw_json={}, + ) + feeder_b = create_managed_node( + meshtastic_node_id=None, + protocol=Protocol.MESHCORE, + name="MC Feeder B", + mc_pubkey=FEEDER_B_MC_PUBKEY, + ) + api_key = create_node_api_key(constellation=feeder_b.constellation) + NodeAuth.objects.create(api_key=api_key, node=feeder_b) + + MeshCorePacketObservation.objects.create( + packet=packet, + observer=meshcore_feeder["node"], + rx_time=hour, + upload_time=hour, + path_hashes=["11", "22"], + ) + MeshCorePacketObservation.objects.create( + packet=packet, + observer=feeder_b, + rx_time=hour, + upload_time=hour, + path_hashes=["33", "44"], + ) + + collect_path_edge_buckets_for_hour(hour) + assert MeshCorePathEdgeBucket.objects.filter(bucket_start=hour).count() == 2 + + +@pytest.mark.django_db +def test_touch_segment_does_not_downgrade_manual(path_observation): + seen = path_observation["hour_start"] + seg = MeshCorePathSegmentResolution.objects.create( + segment_hash="dead", + hash_size=1, + hash_mode=0, + status=SegmentStatus.RESOLVED, + source="manual_admin", + first_seen_at=seen, + last_seen_at=seen, + ) + later = seen + timedelta(hours=1) + touch_segment("dead", hash_size=1, hash_mode=0, seen_at=later) + seg.refresh_from_db() + assert seg.status == SegmentStatus.RESOLVED + assert seg.source == "manual_admin" + assert seg.last_seen_at == later + + +@pytest.mark.django_db +def test_eviction_respects_cutoff(path_observation, settings): + settings.MESHCORE_PATH_RETENTION_DAYS = 30 + old = timezone.now() - timedelta(days=60) + MeshCorePathEdgeBucket.objects.create( + bucket_start=old, + from_hash="x", + to_hash="y", 
+ ) + MeshCorePathSegmentResolution.objects.create( + segment_hash="oldseg", + status=SegmentStatus.UNKNOWN, + source="", + first_seen_at=old, + last_seen_at=old, + ) + result = evict_old_path_data() + assert result["buckets_deleted"] >= 1 + assert not MeshCorePathEdgeBucket.objects.filter(from_hash="x", to_hash="y").exists() From 44fa745dce55f10d734f6be0bffd0b1e5671dbad Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:19:56 +0100 Subject: [PATCH 05/12] feat(meshcore-path): add path edge bucket backfill command --- .../management/__init__.py | 0 .../management/commands/__init__.py | 0 .../commands/backfill_path_edge_buckets.py | 50 +++++++++++++++++++ .../tests/test_backfill_command.py | 24 +++++++++ 4 files changed, 74 insertions(+) create mode 100644 Meshflow/meshcore_packet_path/management/__init__.py create mode 100644 Meshflow/meshcore_packet_path/management/commands/__init__.py create mode 100644 Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py create mode 100644 Meshflow/meshcore_packet_path/tests/test_backfill_command.py diff --git a/Meshflow/meshcore_packet_path/management/__init__.py b/Meshflow/meshcore_packet_path/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Meshflow/meshcore_packet_path/management/commands/__init__.py b/Meshflow/meshcore_packet_path/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py b/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py new file mode 100644 index 0000000..ce240fc --- /dev/null +++ b/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py @@ -0,0 +1,50 @@ +"""Backfill passive path edge buckets for past hours.""" + +from datetime import timedelta + +from django.core.management.base import BaseCommand +from django.utils import timezone + +from 
meshcore_packet_path.services.rollup import collect_path_edge_buckets_for_range + + +class Command(BaseCommand): + help = "Backfill MeshCore path edge buckets for the last N hours or days (idempotent)" + + def add_arguments(self, parser): + parser.add_argument( + "--hours", + type=int, + default=None, + help="Number of hours to backfill (default: use --days or 24)", + ) + parser.add_argument( + "--days", + type=int, + default=None, + help="Number of days to backfill", + ) + + def handle(self, *args, **options): + hours = options.get("hours") + days = options.get("days") + if days is not None: + hours = days * 24 + if hours is None: + hours = 24 + + current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) + start_hour = current_hour - timedelta(hours=hours) + self.stdout.write(f"Backfilling path edge buckets from {start_hour} to {current_hour}...") + result = collect_path_edge_buckets_for_range( + start_hour, + current_hour, + skip_existing=True, + ) + self.stdout.write( + self.style.SUCCESS( + f"Done: created={result['created']}, updated={result['updated']}, " + f"skipped_hours={result['skipped_hours']}, " + f"observations_processed={result['observations_processed']}" + ) + ) diff --git a/Meshflow/meshcore_packet_path/tests/test_backfill_command.py b/Meshflow/meshcore_packet_path/tests/test_backfill_command.py new file mode 100644 index 0000000..9cc3ab8 --- /dev/null +++ b/Meshflow/meshcore_packet_path/tests/test_backfill_command.py @@ -0,0 +1,24 @@ +"""Management command tests.""" + +from django.core.management import call_command +from django.utils import timezone + +import pytest + +from meshcore_packet_path.models import MeshCorePathEdgeBucket + + +@pytest.mark.django_db +def test_backfill_command_creates_buckets(path_observation): + from datetime import timedelta + + obs = path_observation["observation"] + current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) + obs.upload_time = current_hour - timedelta(hours=1) + 
obs.save(update_fields=["upload_time"]) + + call_command("backfill_path_edge_buckets", hours=1) + assert MeshCorePathEdgeBucket.objects.exists() + + call_command("backfill_path_edge_buckets", hours=1) + assert MeshCorePathEdgeBucket.objects.filter(bucket_start=obs.upload_time).count() == 2 From 122059ff5ebbec1fac280939408ad3cf0df9e333 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:19:57 +0100 Subject: [PATCH 06/12] feat(meshcore-path): add 6-month eviction task and beat schedules --- .../0002_add_path_rollup_periodic_tasks.py | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 Meshflow/meshcore_packet_path/migrations/0002_add_path_rollup_periodic_tasks.py diff --git a/Meshflow/meshcore_packet_path/migrations/0002_add_path_rollup_periodic_tasks.py b/Meshflow/meshcore_packet_path/migrations/0002_add_path_rollup_periodic_tasks.py new file mode 100644 index 0000000..e9d5e14 --- /dev/null +++ b/Meshflow/meshcore_packet_path/migrations/0002_add_path_rollup_periodic_tasks.py @@ -0,0 +1,61 @@ +"""Add PeriodicTasks for path edge rollup and eviction.""" + +from django.db import migrations + + +def create_periodic_tasks(apps, schema_editor): + CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule") + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + + hourly, _ = CrontabSchedule.objects.get_or_create( + minute="5", + hour="*", + day_of_week="*", + day_of_month="*", + month_of_year="*", + defaults={"timezone": "UTC"}, + ) + PeriodicTask.objects.get_or_create( + name="collect_path_edge_buckets", + defaults={ + "task": "meshcore_packet_path.tasks.collect_path_edge_buckets", + "crontab": hourly, + "enabled": True, + }, + ) + + daily, _ = CrontabSchedule.objects.get_or_create( + minute="15", + hour="2", + day_of_week="*", + day_of_month="*", + month_of_year="*", + defaults={"timezone": "UTC"}, + ) + PeriodicTask.objects.get_or_create( + name="evict_old_path_data", + defaults={ + "task": 
"meshcore_packet_path.tasks.evict_old_path_data", + "crontab": daily, + "enabled": True, + }, + ) + + +def remove_periodic_tasks(apps, schema_editor): + PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask") + PeriodicTask.objects.filter( + name__in=("collect_path_edge_buckets", "evict_old_path_data") + ).delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ("meshcore_packet_path", "0001_initial"), + ("django_celery_beat", "0001_initial"), + ] + + operations = [ + migrations.RunPython(create_periodic_tasks, remove_periodic_tasks), + ] From e1db70983ad948a431783a9c503ccd05f60a0c06 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:20:01 +0100 Subject: [PATCH 07/12] feat(meshcore-path): add path-tracing edges and segments APIs with staff annotation --- Meshflow/meshcore_packet_path/serializers.py | 131 +++++++++++++++ .../meshcore_packet_path/tests/test_api.py | 112 +++++++++++++ Meshflow/meshcore_packet_path/urls.py | 17 ++ Meshflow/meshcore_packet_path/views.py | 158 ++++++++++++++++++ Meshflow/meshcore_packets/urls.py | 3 +- 5 files changed, 420 insertions(+), 1 deletion(-) create mode 100644 Meshflow/meshcore_packet_path/serializers.py create mode 100644 Meshflow/meshcore_packet_path/tests/test_api.py create mode 100644 Meshflow/meshcore_packet_path/urls.py create mode 100644 Meshflow/meshcore_packet_path/views.py diff --git a/Meshflow/meshcore_packet_path/serializers.py b/Meshflow/meshcore_packet_path/serializers.py new file mode 100644 index 0000000..74cf2ef --- /dev/null +++ b/Meshflow/meshcore_packet_path/serializers.py @@ -0,0 +1,131 @@ +"""Read serializers for MeshCore passive packet path APIs.""" + +from rest_framework import serializers + +from common.protocol import Protocol +from meshcore_packet_path.models import ( + MeshCorePathEdgeBucket, + MeshCorePathSegmentResolution, + SegmentStatus, +) +from nodes.models import ObservedNode + + +class 
MeshCorePathEdgeBucketSerializer(serializers.ModelSerializer): + direction = serializers.SerializerMethodField() + observer_name = serializers.CharField(source="observer.name", read_only=True, allow_null=True) + constellation_name = serializers.CharField( + source="constellation.name", + read_only=True, + allow_null=True, + ) + resolved = serializers.SerializerMethodField() + + class Meta: + model = MeshCorePathEdgeBucket + fields = [ + "id", + "bucket_start", + "bucket_size", + "from_kind", + "to_kind", + "from_hash", + "to_hash", + "from_node", + "to_node", + "observer", + "observer_name", + "constellation", + "constellation_name", + "packet_count", + "observation_count", + "first_seen_at", + "last_seen_at", + "avg_snr", + "min_snr", + "max_snr", + "direction", + "resolved", + ] + read_only_fields = fields + + def get_direction(self, obj) -> str: + return "list_order" + + def get_resolved(self, obj) -> bool: + return bool(obj.from_node_id and obj.to_node_id) + + +class ObservedNodeMinimalSerializer(serializers.ModelSerializer): + node_id_str = serializers.SerializerMethodField() + + class Meta: + model = ObservedNode + fields = ["internal_id", "node_id_str", "long_name"] + read_only_fields = fields + + def get_node_id_str(self, obj) -> str | None: + if obj.protocol == Protocol.MESHCORE and obj.mc_pubkey: + return f"mc:{obj.mc_pubkey}" + return None + + +class MeshCorePathSegmentSerializer(serializers.ModelSerializer): + observed_node = ObservedNodeMinimalSerializer(read_only=True) + + class Meta: + model = MeshCorePathSegmentResolution + fields = [ + "id", + "segment_hash", + "hash_size", + "hash_mode", + "status", + "source", + "resolver_version", + "confidence", + "observed_node", + "first_seen_at", + "last_seen_at", + ] + read_only_fields = fields + + +class MeshCorePathSegmentAnnotateSerializer(serializers.Serializer): + """Staff manual annotation for a path segment.""" + + observed_node_id = serializers.UUIDField(required=False, allow_null=True) + 
node_id_str = serializers.CharField(required=False, allow_null=True, allow_blank=True) + status = serializers.ChoiceField( + choices=SegmentStatus.choices, + required=False, + ) + + def validate(self, attrs): + if not attrs: + raise serializers.ValidationError("At least one field is required") + node_uuid = attrs.get("observed_node_id") + node_id_str = attrs.get("node_id_str") + if node_uuid and node_id_str: + raise serializers.ValidationError("Provide observed_node_id or node_id_str, not both") + return attrs + + def resolve_observed_node(self) -> ObservedNode | None: + node_uuid = self.validated_data.get("observed_node_id") + node_id_str = (self.validated_data.get("node_id_str") or "").strip() + if node_uuid: + return ObservedNode.objects.filter( + internal_id=node_uuid, + protocol=Protocol.MESHCORE, + ).first() + if node_id_str: + pubkey = node_id_str.removeprefix("mc:").lower() + return ObservedNode.objects.filter( + protocol=Protocol.MESHCORE, + mc_pubkey=pubkey, + ).first() + if "observed_node_id" in self.validated_data and self.validated_data["observed_node_id"] is None: + return None + if node_id_str == "" and "node_id_str" in self.validated_data: + return None + return None diff --git a/Meshflow/meshcore_packet_path/tests/test_api.py b/Meshflow/meshcore_packet_path/tests/test_api.py new file mode 100644 index 0000000..d1194bb --- /dev/null +++ b/Meshflow/meshcore_packet_path/tests/test_api.py @@ -0,0 +1,112 @@ +"""API tests for path-tracing endpoints.""" + +from django.urls import reverse + +import pytest +from rest_framework import status +from rest_framework.test import APIClient + +from common.protocol import Protocol +from meshcore_packet_path.models import MeshCorePathSegmentResolution, SegmentStatus +from meshcore_packet_path.services.rollup import collect_path_edge_buckets_for_hour + + +@pytest.fixture +def api_client(create_user): + client = APIClient() + user = create_user() + client.force_authenticate(user=user) + return client, user + + 
+@pytest.fixture +def staff_client(create_user): + client = APIClient() + user = create_user(is_staff=True, is_superuser=True) + client.force_authenticate(user=user) + return client, user + + +@pytest.mark.django_db +def test_edges_list_requires_auth(): + response = APIClient().get(reverse("meshcore-path-tracing-edges")) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +@pytest.mark.django_db +def test_edges_list_includes_direction(api_client, path_observation): + client, _ = api_client + hour = path_observation["hour_start"] + collect_path_edge_buckets_for_hour(hour) + + response = client.get(reverse("meshcore-path-tracing-edges")) + assert response.status_code == status.HTTP_200_OK + assert response.data["count"] >= 1 + row = response.data["results"][0] + assert row["direction"] == "list_order" + + +@pytest.mark.django_db +def test_edges_filter_from_hash(api_client, path_observation): + client, _ = api_client + hour = path_observation["hour_start"] + collect_path_edge_buckets_for_hour(hour) + + response = client.get(reverse("meshcore-path-tracing-edges"), {"from_hash": "aa"}) + assert response.status_code == status.HTTP_200_OK + for row in response.data["results"]: + assert row["from_hash"] == "aa" + + +@pytest.mark.django_db +def test_segments_list_filter_hash_mode(api_client): + client, _ = api_client + MeshCorePathSegmentResolution.objects.create( + segment_hash="a1", + hash_mode=2, + hash_size=1, + status=SegmentStatus.UNKNOWN, + ) + MeshCorePathSegmentResolution.objects.create( + segment_hash="b2", + hash_mode=0, + hash_size=2, + status=SegmentStatus.UNKNOWN, + ) + + response = client.get(reverse("meshcore-path-tracing-segments"), {"hash_mode": 2}) + assert response.status_code == status.HTTP_200_OK + hashes = {r["segment_hash"] for r in response.data["results"]} + assert hashes == {"a1"} + + +@pytest.mark.django_db +def test_segment_patch_staff_manual_admin(staff_client, create_observed_node): + client, _ = staff_client + node = 
create_observed_node(protocol=Protocol.MESHCORE, mc_pubkey="c" * 64) + seg = MeshCorePathSegmentResolution.objects.create( + segment_hash="feed", + status=SegmentStatus.UNKNOWN, + ) + + url = reverse("meshcore-path-tracing-segment-detail", kwargs={"pk": seg.pk}) + response = client.patch( + url, + {"node_id_str": f"mc:{node.mc_pubkey}", "status": "resolved"}, + format="json", + ) + assert response.status_code == status.HTTP_200_OK + seg.refresh_from_db() + assert seg.source == "manual_admin" + assert seg.observed_node_id == node.internal_id + assert seg.status == SegmentStatus.RESOLVED + assert seg.resolver_version == 2 + + +@pytest.mark.django_db +def test_segment_patch_forbidden_for_non_staff(api_client): + client, _ = api_client + seg = MeshCorePathSegmentResolution.objects.create(segment_hash="nope") + url = reverse("meshcore-path-tracing-segment-detail", kwargs={"pk": seg.pk}) + response = client.patch(url, {"status": "resolved"}, format="json") + assert response.status_code == status.HTTP_403_FORBIDDEN diff --git a/Meshflow/meshcore_packet_path/urls.py b/Meshflow/meshcore_packet_path/urls.py new file mode 100644 index 0000000..84c39f2 --- /dev/null +++ b/Meshflow/meshcore_packet_path/urls.py @@ -0,0 +1,17 @@ +from django.urls import path + +from meshcore_packet_path.views import ( + PathTracingEdgesListView, + PathTracingSegmentDetailView, + PathTracingSegmentListView, +) + +urlpatterns = [ + path("edges/", PathTracingEdgesListView.as_view(), name="meshcore-path-tracing-edges"), + path("segments/", PathTracingSegmentListView.as_view(), name="meshcore-path-tracing-segments"), + path( + "segments//", + PathTracingSegmentDetailView.as_view(), + name="meshcore-path-tracing-segment-detail", + ), +] diff --git a/Meshflow/meshcore_packet_path/views.py b/Meshflow/meshcore_packet_path/views.py new file mode 100644 index 0000000..9d0419f --- /dev/null +++ b/Meshflow/meshcore_packet_path/views.py @@ -0,0 +1,158 @@ +"""Read APIs for MeshCore passive packet path 
evidence.""" + +from django.db.models import Q +from django.utils import timezone +from django.utils.dateparse import parse_datetime + +from rest_framework import generics, permissions, status +from rest_framework.response import Response +from rest_framework.views import APIView + +from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution, SegmentStatus +from meshcore_packet_path.serializers import ( + MeshCorePathEdgeBucketSerializer, + MeshCorePathSegmentAnnotateSerializer, + MeshCorePathSegmentSerializer, +) + + +def _parse_dt_param(value: str | None): + if not value: + return None + dt = parse_datetime(value) + if dt is None: + return None + if timezone.is_naive(dt): + dt = timezone.make_aware(dt) + return dt + + +class PathTracingEdgesListView(generics.ListAPIView): + """GET /api/meshcore/path-tracing/edges/ — bucketed hash-chain edges.""" + + permission_classes = [permissions.IsAuthenticated] + serializer_class = MeshCorePathEdgeBucketSerializer + + def get_queryset(self): + qs = MeshCorePathEdgeBucket.objects.select_related( + "observer", + "constellation", + "from_node", + "to_node", + ).order_by("-bucket_start", "-observation_count") + + params = self.request.query_params + after = _parse_dt_param(params.get("bucket_start_after")) + before = _parse_dt_param(params.get("bucket_start_before")) + if after: + qs = qs.filter(bucket_start__gte=after) + if before: + qs = qs.filter(bucket_start__lt=before) + + observer = params.get("observer") + if observer: + qs = qs.filter(observer__internal_id=observer) + + constellation = params.get("constellation") + if constellation: + qs = qs.filter(constellation_id=constellation) + + from_hash = params.get("from_hash") + if from_hash: + qs = qs.filter(from_hash=from_hash.lower()) + + to_hash = params.get("to_hash") + if to_hash: + qs = qs.filter(to_hash=to_hash.lower()) + + resolved = params.get("resolved") + if resolved is not None: + if resolved.lower() in ("true", "1", "yes"): + qs = 
qs.filter(from_node__isnull=False, to_node__isnull=False) + elif resolved.lower() in ("false", "0", "no"): + qs = qs.filter(Q(from_node__isnull=True) | Q(to_node__isnull=True)) + + return qs + + +class PathTracingSegmentListView(generics.ListAPIView): + """GET /api/meshcore/path-tracing/segments/ — segment resolution table.""" + + permission_classes = [permissions.IsAuthenticated] + serializer_class = MeshCorePathSegmentSerializer + + def get_queryset(self): + qs = MeshCorePathSegmentResolution.objects.select_related("observed_node").order_by("-last_seen_at") + params = self.request.query_params + + status_val = params.get("status") + if status_val: + qs = qs.filter(status=status_val) + + hash_mode = params.get("hash_mode") + if hash_mode is not None and hash_mode != "": + qs = qs.filter(hash_mode=int(hash_mode)) + + hash_size = params.get("hash_size") + if hash_size is not None and hash_size != "": + qs = qs.filter(hash_size=int(hash_size)) + + segment_hash = params.get("segment_hash") + if segment_hash: + qs = qs.filter(segment_hash=segment_hash.lower()) + + resolved = params.get("resolved") + if resolved is not None: + if resolved.lower() in ("true", "1", "yes"): + qs = qs.filter(status=SegmentStatus.RESOLVED) + elif resolved.lower() in ("false", "0", "no"): + qs = qs.exclude(status=SegmentStatus.RESOLVED) + + return qs + + +class PathTracingSegmentDetailView(APIView): + """GET/PATCH /api/meshcore/path-tracing/segments//""" + + permission_classes = [permissions.IsAuthenticated] + + def get_object(self, pk): + return generics.get_object_or_404(MeshCorePathSegmentResolution.objects.select_related("observed_node"), pk=pk) + + def get(self, request, pk): + segment = self.get_object(pk) + return Response(MeshCorePathSegmentSerializer(segment).data) + + def patch(self, request, pk): + if not request.user.is_staff: + return Response(status=status.HTTP_403_FORBIDDEN) + + segment = self.get_object(pk) + ser = MeshCorePathSegmentAnnotateSerializer(data=request.data, partial=True) +
ser.is_valid(raise_exception=True) + + node = ser.resolve_observed_node() + if ser.validated_data.get("observed_node_id") or ser.validated_data.get("node_id_str"): + if node is None and ( + ser.validated_data.get("observed_node_id") or (ser.validated_data.get("node_id_str") or "").strip() + ): + return Response( + {"detail": "Observed node not found"}, + status=status.HTTP_400_BAD_REQUEST, + ) + + if "observed_node_id" in ser.validated_data or ser.validated_data.get("node_id_str") is not None: + segment.observed_node = node + if node is not None: + segment.status = SegmentStatus.RESOLVED + elif ser.validated_data.get("observed_node_id") is None and (ser.validated_data.get("node_id_str") == ""): + segment.status = SegmentStatus.UNKNOWN + + if "status" in ser.validated_data: + segment.status = ser.validated_data["status"] + + segment.source = "manual_admin" + segment.resolver_version = segment.resolver_version + 1 + segment.save() + + return Response(MeshCorePathSegmentSerializer(segment).data) diff --git a/Meshflow/meshcore_packets/urls.py b/Meshflow/meshcore_packets/urls.py index 51ec833..9b6df38 100644 --- a/Meshflow/meshcore_packets/urls.py +++ b/Meshflow/meshcore_packets/urls.py @@ -1,4 +1,4 @@ -from django.urls import path +from django.urls import include, path from meshcore_packets.views import ( ManagedNodeMcChannelApplyView, @@ -31,6 +31,7 @@ name="meshcore-feeder-bot-version", ), path("packets/", MeshCorePacketListView.as_view(), name="meshcore-packet-list"), + path("path-tracing/", include("meshcore_packet_path.urls")), path( "managed-nodes//apply-mc-channel-config/", ManagedNodeMcChannelApplyView.as_view(), From 0ba5511aa200de8d36a59d2318ce451afa1c6154 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:20:41 +0100 Subject: [PATCH 08/12] docs(meshcore-path): document edges/segments API in openapi and update progress --- .../packet-path-tracing-outstanding.md | 2 +- .../packet-path-tracing-progress.md | 32 +- openapi.yaml | 285 
++++++++++++++++++ 3 files changed, 315 insertions(+), 4 deletions(-) diff --git a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md index 99147b8..3201008 100644 --- a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md +++ b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md @@ -25,7 +25,7 @@ Items **skipped**, **incomplete**, or **discovered during planning** — not the ## Capture gaps to confirm during M1/M2 -- [ ] `path_hash_size` / `path_hash_mode` not yet persisted on `MeshCorePacketObservation` (being addressed in M1 capture, api + bot). +- [x] `path_hash_size` / `path_hash_mode` persisted on `MeshCorePacketObservation` (M1 api + bot). - [ ] `path_update` carries `public_key` only (no path hash in captures) — capture for possible future binding, not as a current resolver source. - [ ] `trace_data` relationship to path hashes / active traces unconfirmed (M2 spike). 
diff --git a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md index d287af9..3c79785 100644 --- a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md +++ b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md @@ -44,7 +44,7 @@ Tracked as sub-issues of [#267](https://github.com/pskillen/meshflow-api/issues/ | Milestone | Issue | Status | | --- | --- | --- | -| M1 MVP (capture + resolution table + hourly rollup + eviction + edges/segments API + diagnostic UI) | [#372](https://github.com/pskillen/meshflow-api/issues/372) | In progress | +| M1 MVP (capture + resolution table + hourly rollup + eviction + edges/segments API + diagnostic UI) | [#372](https://github.com/pskillen/meshflow-api/issues/372) | Complete (pending deploy) | | M2 resolution spike (decision gate) | [#373](https://github.com/pskillen/meshflow-api/issues/373) | Not started | | M3 proactive resolver (conditional on M2) | [#374](https://github.com/pskillen/meshflow-api/issues/374) | Not started | | M4 Neo4j `PATH_OBSERVED` export | [#375](https://github.com/pskillen/meshflow-api/issues/375) | Not started | @@ -56,7 +56,33 @@ Tracked as sub-issues of [#267](https://github.com/pskillen/meshflow-api/issues/ --- +## M1 — delivered (pending PR merge / deploy) + +**Branch:** `api-372/pskillen/meshcore-passive-path-m1` + +**API** + +- `meshcore_packet_path` app: segment resolution + edge bucket models, hourly rollup, 6-month eviction, backfill command. +- `GET /api/meshcore/path-tracing/edges/`, `GET/PATCH .../segments/`. +- `path_hash_size` / `path_hash_mode` on `MeshCorePacketObservation`. + +**Bot** + +- Forwards `path_hash_size` and `path_hash_mode` on ingest envelopes. + +**UI** + +- Diagnostic preview page (meshflow-ui; see ui PR). 
+ +**Deploy / verify** + +- Run migrations and `run_deploy_tasks`; confirm Celery beat rows `collect_path_edge_buckets` and `evict_old_path_data`. +- `python manage.py backfill_path_edge_buckets --days 7` +- Hit edges/segments APIs; open Passive Path (preview) in UI. + +--- + ## Next -- Execute M1 per the detailed Cursor plan `MC passive path M1`. -- Branch from latest `origin/main` as `api-372/pskillen/meshcore-passive-path-m1` in meshflow-api, meshflow-bot, and meshflow-ui; atomic conventional commits; PRs via github-personal MCP. +- Open PRs (api, bot, ui); merge and deploy. +- Begin M2 resolution spike ([#373](https://github.com/pskillen/meshflow-api/issues/373)) using the diagnostic UI. diff --git a/openapi.yaml b/openapi.yaml index 9d920b6..994f29a 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -20,6 +20,8 @@ tags: description: Meshtastic packet ingestion and feeder node upsert (Node API key) - name: MeshCore packets description: MeshCore packet ingestion (Node API key) and read-only packet list + - name: MeshCore path tracing + description: Passive MeshCore packet path evidence (hash-chain edge rollups and segment resolution) - name: Packets description: > Legacy tag name retained for compatibility. All Meshtastic packet ingestion and feeder @@ -146,6 +148,14 @@ components: description: > Repeater path segments for this feeder observation (stored on MeshCorePacketObservation only; not on deduped MeshCoreRawPacket). 
+ path_hash_size: + type: integer + nullable: true + description: Bytes per path segment (from wire or bot default 2) + path_hash_mode: + type: integer + nullable: true + description: path_hash_mode from channel/contact frames when present channel_idx: type: integer nullable: true @@ -168,6 +178,133 @@ components: encrypted: type: boolean description: When true, ingest returns 304 without storage + MeshCorePathEdgeBucket: + type: object + properties: + id: + type: string + format: uuid + bucket_start: + type: string + format: date-time + bucket_size: + type: string + example: "1h" + from_kind: + type: string + to_kind: + type: string + from_hash: + type: string + to_hash: + type: string + from_node: + type: string + format: uuid + nullable: true + to_node: + type: string + format: uuid + nullable: true + observer: + type: string + format: uuid + nullable: true + observer_name: + type: string + nullable: true + constellation: + type: integer + nullable: true + constellation_name: + type: string + nullable: true + packet_count: + type: integer + observation_count: + type: integer + first_seen_at: + type: string + format: date-time + nullable: true + last_seen_at: + type: string + format: date-time + nullable: true + avg_snr: + type: number + nullable: true + min_snr: + type: number + nullable: true + max_snr: + type: number + nullable: true + direction: + type: string + description: Always `list_order` (not proven forwarding direction) + example: list_order + resolved: + type: boolean + description: True when both from_node and to_node are set + MeshCorePathSegment: + type: object + properties: + id: + type: string + format: uuid + segment_hash: + type: string + hash_size: + type: integer + nullable: true + hash_mode: + type: integer + nullable: true + status: + type: string + enum: [unknown, resolved, ambiguous, stale] + source: + type: string + resolver_version: + type: integer + confidence: + type: number + nullable: true + observed_node: + type: object + 
nullable: true + properties: + internal_id: + type: string + format: uuid + node_id_str: + type: string + nullable: true + long_name: + type: string + nullable: true + first_seen_at: + type: string + format: date-time + last_seen_at: + type: string + format: date-time + MeshCorePathSegmentAnnotateRequest: + type: object + description: Staff-only manual segment annotation (PATCH) + properties: + observed_node_id: + type: string + format: uuid + nullable: true + node_id_str: + type: string + nullable: true + description: "mc: pubkey form" + status: + type: string + enum: [unknown, resolved, ambiguous, stale] FeederReach: type: object description: > @@ -4096,6 +4233,154 @@ paths: '200': description: Paginated MeshCore raw packets + /meshcore/path-tracing/edges/: + get: + summary: List passive path edge buckets + tags: [MeshCore path tracing] + security: + - BearerAuth: [] + parameters: + - $ref: '#/components/parameters/PaginationPage' + - $ref: '#/components/parameters/PaginationPageSize' + - name: bucket_start_after + in: query + schema: + type: string + format: date-time + - name: bucket_start_before + in: query + schema: + type: string + format: date-time + - name: observer + in: query + schema: + type: string + format: uuid + description: ManagedNode internal_id + - name: constellation + in: query + schema: + type: integer + - name: from_hash + in: query + schema: + type: string + - name: to_hash + in: query + schema: + type: string + - name: resolved + in: query + schema: + type: boolean + responses: + '200': + description: Paginated hash-chain edge rollups + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/PaginatedResponse' + - type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/MeshCorePathEdgeBucket' + + /meshcore/path-tracing/segments/: + get: + summary: List path segment resolutions + tags: [MeshCore path tracing] + security: + - BearerAuth: [] + parameters: + - $ref: 
'#/components/parameters/PaginationPage' + - $ref: '#/components/parameters/PaginationPageSize' + - name: status + in: query + schema: + type: string + - name: hash_mode + in: query + schema: + type: integer + - name: hash_size + in: query + schema: + type: integer + - name: segment_hash + in: query + schema: + type: string + - name: resolved + in: query + schema: + type: boolean + responses: + '200': + description: Paginated path segments + content: + application/json: + schema: + allOf: + - $ref: '#/components/schemas/PaginatedResponse' + - type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/MeshCorePathSegment' + + /meshcore/path-tracing/segments/{id}/: + get: + summary: Retrieve a path segment resolution + tags: [MeshCore path tracing] + security: + - BearerAuth: [] + parameters: + - name: id + in: path + required: true + schema: + type: string + format: uuid + responses: + '200': + description: Segment resolution row + content: + application/json: + schema: + $ref: '#/components/schemas/MeshCorePathSegment' + patch: + summary: Staff manual segment annotation + tags: [MeshCore path tracing] + security: + - BearerAuth: [] + parameters: + - name: id + in: path + required: true + schema: + type: string + format: uuid + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/MeshCorePathSegmentAnnotateRequest' + responses: + '200': + description: Updated segment + content: + application/json: + schema: + $ref: '#/components/schemas/MeshCorePathSegment' + '403': + description: Requires staff user + /packets/{meshtastic_node_id}/bot-version/: put: summary: Report meshflow-bot version (feeder) From c43b350fd7f471c2bf643c978528d9584980ae2f Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 10:27:31 +0100 Subject: [PATCH 09/12] docs(meshcore-path): link M1 pull requests in progress log --- .../packet-path-tracing-progress.md | 10 +++++++--- 1 file changed, 7 
insertions(+), 3 deletions(-) diff --git a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md index 3c79785..8598395 100644 --- a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md +++ b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-progress.md @@ -8,11 +8,13 @@ ## Overall status -**Status:** In progress +**Status:** M1 PRs open (pending merge / deploy) **Branch:** `api-372/pskillen/meshcore-passive-path-m1` (meshflow-api, meshflow-bot, meshflow-ui) -The display-only passive slice that precedes this subsystem has **shipped** (see Precursor below). M1 implementation in flight (capture, `meshcore_packet_path` app, rollup/eviction, edges/segments API, diagnostic UI). +**PRs:** [meshflow-api#378](https://github.com/pskillen/meshflow-api/pull/378) · [meshflow-bot#122](https://github.com/pskillen/meshflow-bot/pull/122) · [meshflow-ui#310](https://github.com/pskillen/meshflow-ui/pull/310) + +The display-only passive slice that precedes this subsystem has **shipped** (see Precursor below). M1 implementation is complete in branch; awaiting merge and deploy. **Locked decisions (ADR-0001 open questions):** @@ -58,6 +60,8 @@ Tracked as sub-issues of [#267](https://github.com/pskillen/meshflow-api/issues/ ## M1 — delivered (pending PR merge / deploy) +**PRs:** [api#378](https://github.com/pskillen/meshflow-api/pull/378) · [bot#122](https://github.com/pskillen/meshflow-bot/pull/122) · [ui#310](https://github.com/pskillen/meshflow-ui/pull/310) + **Branch:** `api-372/pskillen/meshcore-passive-path-m1` **API** @@ -84,5 +88,5 @@ Tracked as sub-issues of [#267](https://github.com/pskillen/meshflow-api/issues/ ## Next -- Open PRs (api, bot, ui); merge and deploy. 
+- Merge PRs ([#378](https://github.com/pskillen/meshflow-api/pull/378), [#122](https://github.com/pskillen/meshflow-bot/pull/122), [#310](https://github.com/pskillen/meshflow-ui/pull/310)) and deploy. - Begin M2 resolution spike ([#373](https://github.com/pskillen/meshflow-api/issues/373)) using the diagnostic UI. From fc260b669212ebddf9fa6b4e50bce83b7aed8051 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 11:24:36 +0100 Subject: [PATCH 10/12] feat(meshcore-path): add tqdm and --days support to path backfill Share resolve_backfill_hours() between the management command and Celery task; show hourly progress in the CLI via tqdm. --- .../commands/backfill_path_edge_buckets.py | 21 ++++++++----- .../meshcore_packet_path/services/rollup.py | 31 +++++++++++++++++-- Meshflow/meshcore_packet_path/tasks.py | 12 +++++-- .../tests/test_backfill_command.py | 24 ++++++++++++-- 4 files changed, 72 insertions(+), 16 deletions(-) diff --git a/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py b/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py index ce240fc..5c3a63a 100644 --- a/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py +++ b/Meshflow/meshcore_packet_path/management/commands/backfill_path_edge_buckets.py @@ -5,7 +5,10 @@ from django.core.management.base import BaseCommand from django.utils import timezone -from meshcore_packet_path.services.rollup import collect_path_edge_buckets_for_range +from meshcore_packet_path.services.rollup import ( + collect_path_edge_buckets_for_range, + resolve_backfill_hours, +) class Command(BaseCommand): @@ -26,20 +29,22 @@ def add_arguments(self, parser): ) def handle(self, *args, **options): - hours = options.get("hours") - days = options.get("days") - if days is not None: - hours = days * 24 - if hours is None: - hours = 24 + try: + hours = resolve_backfill_hours(hours=options.get("hours"), days=options.get("days")) + except 
ValueError as exc: + self.stderr.write(self.style.ERROR(str(exc))) + return current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) start_hour = current_hour - timedelta(hours=hours) - self.stdout.write(f"Backfilling path edge buckets from {start_hour} to {current_hour}...") + self.stdout.write( + f"Backfilling path edge buckets for {hours} hour(s) " f"from {start_hour} to {current_hour}..." + ) result = collect_path_edge_buckets_for_range( start_hour, current_hour, skip_existing=True, + show_progress=True, ) self.stdout.write( self.style.SUCCESS( diff --git a/Meshflow/meshcore_packet_path/services/rollup.py b/Meshflow/meshcore_packet_path/services/rollup.py index b3cbb99..a52d1d1 100644 --- a/Meshflow/meshcore_packet_path/services/rollup.py +++ b/Meshflow/meshcore_packet_path/services/rollup.py @@ -18,6 +18,22 @@ from meshcore_packets.models import MeshCorePacketObservation +def resolve_backfill_hours( + *, + hours: int | None = None, + days: int | None = None, + default_hours: int = 24, +) -> int: + """Convert CLI/Celery backfill args to a single hour count (hours and days are mutually exclusive).""" + if hours is not None and days is not None: + raise ValueError("Specify only one of hours or days") + if days is not None: + return days * 24 + if hours is not None: + return hours + return default_hours + + def _normalize_hash(segment: str) -> str: return str(segment).strip().lower() @@ -181,14 +197,23 @@ def collect_path_edge_buckets_for_range( end_hour: datetime, *, skip_existing: bool = False, + show_progress: bool = False, ) -> dict[str, int]: """Roll up each hour in [start_hour, end_hour).""" + from tqdm import tqdm + totals = {"created": 0, "updated": 0, "skipped_hours": 0, "observations_processed": 0} hour = start_hour.replace(minute=0, second=0, microsecond=0) end = end_hour.replace(minute=0, second=0, microsecond=0) - while hour < end: - result = collect_path_edge_buckets_for_hour(hour, skip_existing=skip_existing) + total_hours = max(0, 
int((end - 
hour).total_seconds() // 3600)) + hour_iter = range(total_hours) + if show_progress: + hour_iter = tqdm(hour_iter, unit="hour", desc="Backfilling path edges") + + cursor = hour + for _ in hour_iter: + result = collect_path_edge_buckets_for_hour(cursor, skip_existing=skip_existing) for key in totals: totals[key] += result.get(key, 0) - hour += timedelta(hours=1) + cursor += timedelta(hours=1) return totals diff --git a/Meshflow/meshcore_packet_path/tasks.py b/Meshflow/meshcore_packet_path/tasks.py index bacf195..069dc90 100644 --- a/Meshflow/meshcore_packet_path/tasks.py +++ b/Meshflow/meshcore_packet_path/tasks.py @@ -10,6 +10,7 @@ from meshcore_packet_path.services.rollup import ( collect_path_edge_buckets_for_hour, collect_path_edge_buckets_for_range, + resolve_backfill_hours, ) @@ -22,10 +23,15 @@ def collect_path_edge_buckets(): @shared_task -def backfill_path_edge_buckets_task(hours: int = 24) -> dict: - """Backfill rollups for the last N hours (idempotent when skip_existing=True).""" +def backfill_path_edge_buckets_task(hours: int | None = None, days: int | None = None) -> dict: + """ + Backfill rollups for the last N hours or days (idempotent when skip_existing=True). + + Prefer ``python manage.py backfill_path_edge_buckets`` for CLI use (--hours / --days). 
+ """ + backfill_hours = resolve_backfill_hours(hours=hours, days=days) current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) - start_hour = current_hour - timedelta(hours=hours) + start_hour = current_hour - timedelta(hours=backfill_hours) return collect_path_edge_buckets_for_range(start_hour, current_hour, skip_existing=True) diff --git a/Meshflow/meshcore_packet_path/tests/test_backfill_command.py b/Meshflow/meshcore_packet_path/tests/test_backfill_command.py index 9cc3ab8..984c8a7 100644 --- a/Meshflow/meshcore_packet_path/tests/test_backfill_command.py +++ b/Meshflow/meshcore_packet_path/tests/test_backfill_command.py @@ -1,17 +1,26 @@ """Management command tests.""" +from datetime import timedelta + from django.core.management import call_command from django.utils import timezone import pytest from meshcore_packet_path.models import MeshCorePathEdgeBucket +from meshcore_packet_path.services.rollup import resolve_backfill_hours + + +def test_resolve_backfill_hours_days_and_hours(): + assert resolve_backfill_hours(days=7) == 7 * 24 + assert resolve_backfill_hours(hours=12) == 12 + assert resolve_backfill_hours() == 24 + with pytest.raises(ValueError): + resolve_backfill_hours(hours=1, days=1) @pytest.mark.django_db def test_backfill_command_creates_buckets(path_observation): - from datetime import timedelta - obs = path_observation["observation"] current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) obs.upload_time = current_hour - timedelta(hours=1) @@ -22,3 +31,14 @@ def test_backfill_command_creates_buckets(path_observation): call_command("backfill_path_edge_buckets", hours=1) assert MeshCorePathEdgeBucket.objects.filter(bucket_start=obs.upload_time).count() == 2 + + +@pytest.mark.django_db +def test_backfill_command_accepts_days(path_observation): + obs = path_observation["observation"] + current_hour = timezone.now().replace(minute=0, second=0, microsecond=0) + obs.upload_time = current_hour - timedelta(hours=1) + 
obs.save(update_fields=["upload_time"]) + + call_command("backfill_path_edge_buckets", days=1) + assert MeshCorePathEdgeBucket.objects.exists() From 7dc25e70a4cc57069a48fbf4f4714d7fff492c8b Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 11:24:36 +0100 Subject: [PATCH 11/12] docs(meshcore-path): track heard-map #311 and geographic path gaps Record meshflow-ui#311 for logical per-feeder paths in the heard dialog, mark #304 complete, and document future geographic hop placement. --- .../packet-path-tracing-outstanding.md | 16 ++++++++++++++++ .../traceroute/meshcore-path-outstanding.md | 3 ++- .../traceroute/meshcore-path-progress.md | 5 +++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md index 3201008..ae16fd1 100644 --- a/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md +++ b/docs/features/meshcore/packet-path-tracing/packet-path-tracing-outstanding.md @@ -15,6 +15,22 @@ Items **skipped**, **incomplete**, or **discovered during planning** — not the --- +## Message heard map (UI — logical layout, not M7) + +- [ ] **[meshflow-ui#311](https://github.com/pskillen/meshflow-ui/issues/311)** — HeardPathMap logical path per feeder: dashed schematic hop chain (one node per hash segment), **not** placed at map coordinates; keep sender/feeder at geo positions when known. Feeder list below graph shows **each observer’s distinct path** beside its row. Uses existing `heard[]` `path_hashes` / `resolved_path` from #360; no new API. + +## Geographic path on maps (future milestone — plan explicitly) + +The logical heard-map slice above is **not** a substitute for placing hops at real coordinates. 
A later plan/milestone must cover: + +- [ ] **Geographic hop placement** — when M2/M3 (or manual segment annotation) yields `ObservedNode` positions for path segments, message heard map and/or M7 topology UI should render hops at **lat/lng** (and set `path_known` only when all hops are resolved per ADR). +- [ ] **Wire message `heard[]` to segment resolution** — optional read path from `MeshCorePathSegmentResolution` (or resolver output) so the heard dialog benefits from staff annotations / proven matcher without duplicating rollup tables in the client. +- [ ] **M7 realtime/history maps** ([meshflow-ui#309](https://github.com/pskillen/meshflow-ui/issues/309)) — edge-based geographic and logical topology; depends on API M5/M6. + +Until then, operators should assume heard-map paths are **list-order hash evidence**, not RF geography. + +--- + ## Carried from prior passive slice - [ ] **Proven hash → `ObservedNode` matcher** — still unproven; no production matcher until [traceroute ADR-0001 §A](../../traceroute/adr/0001-mc-path-hash-resolution.md) documents a safe rule. Gates M3. Tests must reject suffix/prefix/recency heuristics. diff --git a/docs/features/traceroute/meshcore-path-outstanding.md b/docs/features/traceroute/meshcore-path-outstanding.md index 2f1d274..003dbe7 100644 --- a/docs/features/traceroute/meshcore-path-outstanding.md +++ b/docs/features/traceroute/meshcore-path-outstanding.md @@ -11,7 +11,8 @@ Items **skipped**, **incomplete**, or **discovered during planning** for [#267]( - [ ] **Passive packet path subsystem** — proposed in [meshcore/packet-path-tracing ADR-0001](../meshcore/packet-path-tracing/adr/0001-meshcore-packet-path-tracing-subsystem.md): passive edge rollups, resolution table, Neo4j export, realtime/history UI. Progress/outstanding now tracked under [packet-path-tracing/](../meshcore/packet-path-tracing/packet-path-tracing-progress.md). 
- [ ] **Proven matcher** — implement hash segment → `ObservedNode` when [ADR §A](adr/0001-mc-path-hash-resolution.md) documents safe rules (no `iendswith` / `last_heard` heuristics in v1). - [ ] **`GET /meshcore/packets/`** — optional `resolved_path` on packet list/detail (deferred). -- [ ] **#304** — UI `HeardPathMap` + heard dialog (meshflow-ui). +- [x] **#304** — UI `HeardPathMap` + heard dialog (meshflow-ui, closed 2026-05-27). +- [ ] **[meshflow-ui#311](https://github.com/pskillen/meshflow-ui/issues/311)** — HeardPathMap logical path per feeder (follow-up to #304): schematic hop chain, feeder list with path beside each row; not geographic hop placement. --- diff --git a/docs/features/traceroute/meshcore-path-progress.md b/docs/features/traceroute/meshcore-path-progress.md index 0e37f38..ccc5e24 100644 --- a/docs/features/traceroute/meshcore-path-progress.md +++ b/docs/features/traceroute/meshcore-path-progress.md @@ -57,9 +57,10 @@ Meshtastic today: active TR drives most topology evidence; MC Phase 3 starts wit ### UI ([#304](https://github.com/pskillen/meshflow-ui/issues/304)) -**Status:** Not started +**Status:** Done (2026-05-27) — follow-up **[meshflow-ui#311](https://github.com/pskillen/meshflow-ui/issues/311)** for logical hop layout per feeder -- `HeardPathMap` + heard dialog (MT + MC). +- `HeardPathMap` + heard dialog (MT + MC); v1 shows feeders geo + minimal dashed paths. +- #311: schematic hop chain per observer, path detail beside each feeder row (not geographic hops). 
--- From 532367a83af278bc857c2905ecd798b9bbafbdb4 Mon Sep 17 00:00:00 2001 From: Patrick Skillen Date: Mon, 1 Jun 2026 11:25:29 +0100 Subject: [PATCH 12/12] chore: init files --- Meshflow/meshcore_packet_path/migrations/__init__.py | 0 Meshflow/meshcore_packet_path/tests/__init__.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 Meshflow/meshcore_packet_path/migrations/__init__.py create mode 100644 Meshflow/meshcore_packet_path/tests/__init__.py diff --git a/Meshflow/meshcore_packet_path/migrations/__init__.py b/Meshflow/meshcore_packet_path/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Meshflow/meshcore_packet_path/tests/__init__.py b/Meshflow/meshcore_packet_path/tests/__init__.py new file mode 100644 index 0000000..e69de29