Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Meshflow/Meshflow/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,14 @@

app = Celery("Meshflow")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks(["traceroute", "stats", "mesh_monitoring", "rf_propagation", "nodes", "dx_monitoring"])
app.autodiscover_tasks(
[
"traceroute",
"stats",
"mesh_monitoring",
"rf_propagation",
"nodes",
"dx_monitoring",
"meshcore_packet_path",
]
)
3 changes: 3 additions & 0 deletions Meshflow/Meshflow/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"nodes",
"packets",
"meshcore_packets",
"meshcore_packet_path",
"stats",
"text_messages",
"traceroute",
Expand Down Expand Up @@ -215,6 +216,8 @@ def _rf_env_bool(name: str, default: bool) -> bool:
# Per-node retention: how many ``ready`` renders to keep on disk before GC.
RF_PROPAGATION_READY_RETENTION = int(os.environ.get("RF_PROPAGATION_READY_RETENTION", "3"))

MESHCORE_PATH_RETENTION_DAYS = int(os.environ.get("MESHCORE_PATH_RETENTION_DAYS", "183"))

# Django cache (Redis DB 2; channels use DB 0, Celery broker DB 1)
_cache_url = f"redis://:{_redis_password}@{_redis_host}:{_redis_port}/2"
CACHES = {
Expand Down
Empty file.
36 changes: 36 additions & 0 deletions Meshflow/meshcore_packet_path/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from django.contrib import admin

from meshcore_packet_path.models import MeshCorePathEdgeBucket, MeshCorePathSegmentResolution


@admin.register(MeshCorePathSegmentResolution)
class MeshCorePathSegmentResolutionAdmin(admin.ModelAdmin):
list_display = (
"segment_hash",
"hash_size",
"hash_mode",
"status",
"source",
"observed_node",
"last_seen_at",
)
list_filter = ("status", "hash_size", "hash_mode", "source")
search_fields = ("segment_hash", "observed_node__long_name")
readonly_fields = ("id", "first_seen_at")
date_hierarchy = "last_seen_at"


@admin.register(MeshCorePathEdgeBucket)
class MeshCorePathEdgeBucketAdmin(admin.ModelAdmin):
list_display = (
"bucket_start",
"from_hash",
"to_hash",
"observer",
"packet_count",
"observation_count",
"last_seen_at",
)
list_filter = ("bucket_size", "from_kind", "to_kind", "constellation")
search_fields = ("from_hash", "to_hash")
date_hierarchy = "bucket_start"
7 changes: 7 additions & 0 deletions Meshflow/meshcore_packet_path/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.apps import AppConfig


class MeshcorePacketPathConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "meshcore_packet_path"
verbose_name = "MeshCore packet path"
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Backfill passive path edge buckets for past hours."""

from datetime import timedelta

from django.core.management.base import BaseCommand
from django.utils import timezone

from meshcore_packet_path.services.rollup import (
collect_path_edge_buckets_for_range,
resolve_backfill_hours,
)


class Command(BaseCommand):
help = "Backfill MeshCore path edge buckets for the last N hours or days (idempotent)"

def add_arguments(self, parser):
parser.add_argument(
"--hours",
type=int,
default=None,
help="Number of hours to backfill (default: use --days or 24)",
)
parser.add_argument(
"--days",
type=int,
default=None,
help="Number of days to backfill",
)

def handle(self, *args, **options):
try:
hours = resolve_backfill_hours(hours=options.get("hours"), days=options.get("days"))
except ValueError as exc:
self.stderr.write(self.style.ERROR(str(exc)))
return

current_hour = timezone.now().replace(minute=0, second=0, microsecond=0)
start_hour = current_hour - timedelta(hours=hours)
self.stdout.write(
f"Backfilling path edge buckets for {hours} hour(s) " f"from {start_hour} to {current_hour}..."
)
result = collect_path_edge_buckets_for_range(
start_hour,
current_hour,
skip_existing=True,
show_progress=True,
)
self.stdout.write(
self.style.SUCCESS(
f"Done: created={result['created']}, updated={result['updated']}, "
f"skipped_hours={result['skipped_hours']}, "
f"observations_processed={result['observations_processed']}"
)
)
70 changes: 70 additions & 0 deletions Meshflow/meshcore_packet_path/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Generated by Django 5.2.14 on 2026-06-01 09:17

import django.db.models.deletion
import django.utils.timezone
import uuid
from django.db import migrations, models


class Migration(migrations.Migration):

initial = True

dependencies = [
('constellations', '0011_remove_constellationusermembership'),
('nodes', '0050_managednode_protocol_identity'),
]

operations = [
migrations.CreateModel(
name='MeshCorePathEdgeBucket',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('bucket_start', models.DateTimeField(db_index=True)),
('bucket_size', models.CharField(default='1h', max_length=8)),
('from_kind', models.CharField(choices=[('hash', 'Hash'), ('node', 'Node'), ('feeder', 'Feeder'), ('unknown', 'Unknown')], default='hash', max_length=16)),
('to_kind', models.CharField(choices=[('hash', 'Hash'), ('node', 'Node'), ('feeder', 'Feeder'), ('unknown', 'Unknown')], default='hash', max_length=16)),
('from_hash', models.CharField(blank=True, default='', max_length=32)),
('to_hash', models.CharField(blank=True, default='', max_length=32)),
('packet_count', models.PositiveIntegerField(default=0)),
('observation_count', models.PositiveIntegerField(default=0)),
('first_seen_at', models.DateTimeField(blank=True, null=True)),
('last_seen_at', models.DateTimeField(blank=True, null=True)),
('avg_snr', models.FloatField(blank=True, null=True)),
('min_snr', models.FloatField(blank=True, null=True)),
('max_snr', models.FloatField(blank=True, null=True)),
('constellation', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edge_buckets', to='constellations.constellation')),
('from_node', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edges_from', to='nodes.observednode')),
('observer', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edge_buckets', to='nodes.managednode')),
('to_node', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_edges_to', to='nodes.observednode')),
],
options={
'verbose_name': 'MeshCore path edge bucket',
'verbose_name_plural': 'MeshCore path edge buckets',
'indexes': [models.Index(fields=['-bucket_start'], name='meshcore_pa_bucket__9276cc_idx'), models.Index(fields=['from_hash', 'to_hash'], name='meshcore_pa_from_ha_671731_idx')],
'constraints': [models.UniqueConstraint(fields=('bucket_start', 'bucket_size', 'from_kind', 'to_kind', 'from_hash', 'to_hash', 'observer', 'constellation'), name='meshcore_path_edge_bucket_unique', nulls_distinct=False)],
},
),
migrations.CreateModel(
name='MeshCorePathSegmentResolution',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('segment_hash', models.CharField(db_index=True, max_length=32)),
('hash_size', models.PositiveSmallIntegerField(blank=True, null=True)),
('hash_mode', models.PositiveSmallIntegerField(blank=True, null=True)),
('status', models.CharField(choices=[('unknown', 'Unknown'), ('resolved', 'Resolved'), ('ambiguous', 'Ambiguous'), ('stale', 'Stale')], db_index=True, default='unknown', max_length=16)),
('source', models.CharField(blank=True, default='', max_length=32)),
('resolver_version', models.PositiveIntegerField(default=1)),
('confidence', models.FloatField(blank=True, null=True)),
('first_seen_at', models.DateTimeField(default=django.utils.timezone.now)),
('last_seen_at', models.DateTimeField(default=django.utils.timezone.now)),
('observed_node', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='meshcore_path_segments', to='nodes.observednode')),
],
options={
'verbose_name': 'MeshCore path segment resolution',
'verbose_name_plural': 'MeshCore path segment resolutions',
'indexes': [models.Index(fields=['-last_seen_at'], name='meshcore_pa_last_se_c35b9b_idx'), models.Index(fields=['status', '-last_seen_at'], name='meshcore_pa_status_6b0dba_idx')],
'constraints': [models.UniqueConstraint(fields=('hash_mode', 'hash_size', 'segment_hash'), name='meshcore_path_segment_identity_unique')],
},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""Add PeriodicTasks for path edge rollup and eviction."""

from django.db import migrations


def create_periodic_tasks(apps, schema_editor):
CrontabSchedule = apps.get_model("django_celery_beat", "CrontabSchedule")
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")

hourly, _ = CrontabSchedule.objects.get_or_create(
minute="5",
hour="*",
day_of_week="*",
day_of_month="*",
month_of_year="*",
defaults={"timezone": "UTC"},
)
PeriodicTask.objects.get_or_create(
name="collect_path_edge_buckets",
defaults={
"task": "meshcore_packet_path.tasks.collect_path_edge_buckets",
"crontab": hourly,
"enabled": True,
},
)

daily, _ = CrontabSchedule.objects.get_or_create(
minute="15",
hour="2",
day_of_week="*",
day_of_month="*",
month_of_year="*",
defaults={"timezone": "UTC"},
)
PeriodicTask.objects.get_or_create(
name="evict_old_path_data",
defaults={
"task": "meshcore_packet_path.tasks.evict_old_path_data",
"crontab": daily,
"enabled": True,
},
)


def remove_periodic_tasks(apps, schema_editor):
PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
PeriodicTask.objects.filter(
name__in=("collect_path_edge_buckets", "evict_old_path_data")
).delete()


class Migration(migrations.Migration):

dependencies = [
("meshcore_packet_path", "0001_initial"),
("django_celery_beat", "0001_initial"),
]

operations = [
migrations.RunPython(create_periodic_tasks, remove_periodic_tasks),
]
Empty file.
Loading