Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Meshflow/meshcore_packets/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
)
from meshcore_packets.services.channel import resolve_mc_channel
from meshcore_packets.services.dedup import find_existing_packet
from meshcore_packets.services.path_hashes import enrich_validated_data_paths
from meshcore_packets.services.path_twin import sync_path_from_rx_log_twin, sync_path_to_channel_text_twin


def _parse_rx_time(value) -> datetime:
Expand Down Expand Up @@ -63,8 +65,21 @@ def validate(self, attrs):
pass # rx_log_data ADVERT may supply adv fields without from_pubkey in envelope
return attrs

def _run_path_twin_sync(self, packet, observer):
if self.observation is None:
return
if packet.payload_type == MeshCorePayloadType.RAW:
sync_path_to_channel_text_twin(
packet=packet,
observer=observer,
observation=self.observation,
)
elif packet.payload_type == MeshCorePayloadType.CHANNEL_TEXT and isinstance(packet, MeshCoreTextPacket):
sync_path_from_rx_log_twin(packet=packet, observer=observer)

def create(self, validated_data):
observer = self.context["observer"]
enrich_validated_data_paths(validated_data)
rx_time = _parse_rx_time(validated_data["rx_time"])

from_pubkey = validated_data.get("from_pubkey") or None
Expand All @@ -85,6 +100,7 @@ def create(self, validated_data):
if existing:
self.instance = existing
self._ensure_observation(existing, observer, validated_data, rx_time)
self._run_path_twin_sync(existing, observer)
return existing

payload_type_map = {
Expand Down Expand Up @@ -126,6 +142,7 @@ def create(self, validated_data):

self.instance = packet
self._ensure_observation(packet, observer, validated_data, rx_time)
self._run_path_twin_sync(packet, observer)
return packet

def _ensure_observation(self, packet, observer, validated_data, rx_time):
Expand Down
62 changes: 62 additions & 0 deletions Meshflow/meshcore_packets/services/path_hashes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Split MeshCore wire ``path`` hex into hop hash segments (server-side)."""

from __future__ import annotations

from typing import Any


def split_path_hex(path: str, path_hash_size: int) -> list[str]:
"""Split concatenated path hex into per-hop segments (``path_hash_size`` bytes each)."""
if not path or not isinstance(path, str):
return []
size = int(path_hash_size or 2)
if size < 1:
size = 2
width = size * 2
return [path[i : i + width] for i in range(0, len(path), width) if path[i : i + width]]


def _path_from_payload(payload: dict) -> tuple[list[str] | None, int | None, int | None]:
if not isinstance(payload, dict):
return None, None, None
existing = payload.get("path_hashes")
if isinstance(existing, list) and existing:
return [str(p) for p in existing], payload.get("path_hash_size"), payload.get("path_hash_mode")
path = payload.get("path")
size = payload.get("path_hash_size")
if size is None:
size = 2
if isinstance(path, list) and path:
return [str(p) for p in path], int(size), payload.get("path_hash_mode")
if isinstance(path, str) and path:
segments = split_path_hex(path, int(size))
return segments or None, int(size), payload.get("path_hash_mode")
return None, payload.get("path_hash_size"), payload.get("path_hash_mode")


def path_hashes_from_ingest(validated_data: dict[str, Any]) -> list[str] | None:
"""Resolve path_hashes from ingest body or nested capture envelope."""
segments, _, _ = _path_from_payload(validated_data)
if segments:
return segments

raw = validated_data.get("raw")
if not isinstance(raw, dict):
return None

envelope = raw
if isinstance(raw.get("raw"), dict):
envelope = raw["raw"]
elif raw.get("protocol") != "meshcore" and isinstance(raw.get("payload"), dict):
envelope = raw

payload = envelope.get("payload") if isinstance(envelope, dict) else None
segments, _, _ = _path_from_payload(payload or {})
return segments


def enrich_validated_data_paths(validated_data: dict[str, Any]) -> None:
"""Populate ``path_hashes`` on validated_data when only wire ``path`` is present."""
segments = path_hashes_from_ingest(validated_data)
if segments and not validated_data.get("path_hashes"):
validated_data["path_hashes"] = segments
165 changes: 165 additions & 0 deletions Meshflow/meshcore_packets/services/path_twin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""Copy path_hashes from rx_log TEXT_MSG/PATH rows onto channel_text packet observations."""

from __future__ import annotations

import logging

from meshcore_packets.models import (
MeshCorePacketObservation,
MeshCorePayloadType,
MeshCoreRawPacket,
MeshCoreTextPacket,
)
from meshcore_packets.services.channel import resolve_mc_channel
from meshcore_packets.services.dedup import decoded_twin_window

logger = logging.getLogger(__name__)

PATH_RX_TYPENAMES = frozenset({"TEXT_MSG", "PATH"})


def rx_log_payload_typename(packet: MeshCoreRawPacket) -> str:
raw_json = packet.raw_json or {}
envelope = raw_json
nested = raw_json.get("raw")
if isinstance(nested, dict):
envelope = nested
if not isinstance(envelope, dict):
return ""
payload = envelope.get("payload") or {}
return str(payload.get("payload_typename", "")).upper()


def channel_idx_from_packet_raw_json(packet: MeshCoreRawPacket) -> int | None:
raw_json = packet.raw_json or {}
if raw_json.get("channel_idx") is not None:
return int(raw_json["channel_idx"])
envelope = raw_json.get("raw")
if isinstance(envelope, dict):
payload = envelope.get("payload") or {}
if payload.get("channel_idx") is not None:
return int(payload["channel_idx"])
return None


def _path_fields_from_observation(observation: MeshCorePacketObservation) -> dict | None:
if not observation.path_hashes:
return None
return {
"path_hashes": observation.path_hashes,
"path_hash_size": observation.path_hash_size,
"path_hash_mode": observation.path_hash_mode,
}


def _prefer_path_fields(existing: list[str] | None, incoming: list[str]) -> list[str]:
if not existing:
return incoming
if len(incoming) > len(existing):
return incoming
return existing


def apply_path_to_text_observation(
*,
text_packet: MeshCoreTextPacket,
observer,
path_hashes: list[str],
path_hash_size: int | None = None,
path_hash_mode: int | None = None,
) -> bool:
"""Merge path onto the text packet observation; prefer longer hop lists."""
if not path_hashes:
return False
obs, _ = MeshCorePacketObservation.objects.get_or_create(
packet=text_packet,
observer=observer,
)
merged = _prefer_path_fields(obs.path_hashes, path_hashes)
changed = (
obs.path_hashes != merged
or (path_hash_size is not None and obs.path_hash_size != path_hash_size)
or (path_hash_mode is not None and obs.path_hash_mode != path_hash_mode)
)
if not changed:
return False
obs.path_hashes = merged
if path_hash_size is not None:
obs.path_hash_size = path_hash_size
if path_hash_mode is not None:
obs.path_hash_mode = path_hash_mode
obs.save(
update_fields=["path_hashes", "path_hash_size", "path_hash_mode"],
)
return True


def _pick_channel_text_twin(*, observer, anchor_time, channel_idx: int | None):
window = decoded_twin_window()
candidates = MeshCoreTextPacket.objects.filter(
observer=observer,
payload_type=MeshCorePayloadType.CHANNEL_TEXT,
rx_time__gte=anchor_time - window,
rx_time__lte=anchor_time + window,
).order_by("-rx_time")
count = candidates.count()
if count == 0:
return None
if count == 1:
return candidates.first()
if channel_idx is not None:
channel = resolve_mc_channel(observer, channel_idx)
if channel:
narrowed = candidates.filter(channel=channel)
if narrowed.count() == 1:
return narrowed.first()
logger.debug("path_twin: %s channel_text candidates in window; skip merge", count)
return None


def sync_path_to_channel_text_twin(
*, packet: MeshCoreRawPacket, observer, observation: MeshCorePacketObservation
) -> bool:
"""After ingesting rx_log raw TEXT_MSG/PATH, copy path onto a nearby channel_text packet."""
if packet.payload_type != MeshCorePayloadType.RAW:
return False
if packet.event_type != "rx_log_data":
return False
if rx_log_payload_typename(packet) not in PATH_RX_TYPENAMES:
return False
fields = _path_fields_from_observation(observation)
if not fields:
return False
twin = _pick_channel_text_twin(
observer=observer,
anchor_time=packet.rx_time,
channel_idx=channel_idx_from_packet_raw_json(packet),
)
if not twin:
return False
return apply_path_to_text_observation(text_packet=twin, observer=observer, **fields)


def sync_path_from_rx_log_twin(*, packet: MeshCoreTextPacket, observer) -> bool:
"""After ingesting channel_text, copy path from a nearby rx_log TEXT_MSG/PATH observation."""
if packet.payload_type != MeshCorePayloadType.CHANNEL_TEXT:
return False
window = decoded_twin_window()
raw_packets = MeshCoreRawPacket.objects.filter(
observer=observer,
payload_type=MeshCorePayloadType.RAW,
event_type="rx_log_data",
rx_time__gte=packet.rx_time - window,
rx_time__lte=packet.rx_time + window,
).order_by("-rx_time")
for raw in raw_packets:
if rx_log_payload_typename(raw) not in PATH_RX_TYPENAMES:
continue
obs = MeshCorePacketObservation.objects.filter(packet=raw, observer=observer).first()
if not obs:
continue
fields = _path_fields_from_observation(obs)
if not fields:
continue
return apply_path_to_text_observation(text_packet=packet, observer=observer, **fields)
return False
36 changes: 36 additions & 0 deletions Meshflow/meshcore_packets/tests/test_path_hashes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Tests for meshcore_packets.services.path_hashes."""

from meshcore_packets.services.path_hashes import enrich_validated_data_paths, path_hashes_from_ingest, split_path_hex


def test_split_path_hex_two_byte_hops():
assert split_path_hex("aabbccdd", 2) == ["aabb", "ccdd"]


def test_split_path_hex_three_byte_hop():
assert split_path_hex("f3bcf1", 3) == ["f3bcf1"]


def test_path_hashes_from_ingest_top_level_path():
data = {"path": "aabb", "path_hash_size": 2}
assert path_hashes_from_ingest(data) == ["aabb"]


def test_path_hashes_from_nested_envelope():
data = {
"raw": {
"protocol": "meshcore",
"event_type": "rx_log_data",
"payload": {
"path": "f3bcf1",
"path_hash_size": 3,
},
},
}
assert path_hashes_from_ingest(data) == ["f3bcf1"]


def test_enrich_validated_data_paths():
data = {"path": "aabbcc", "path_hash_size": 2}
enrich_validated_data_paths(data)
assert data["path_hashes"] == ["aabb", "cc"]
Loading
Loading