Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ STORAGE_API_VERSION=2
# Use these if you want to upload to a second API (usually used during testing)
# STORAGE_API_2_ROOT=...
# STORAGE_API_2_VERSION=2
# STORAGE_API_2_TOKEN=...
# STORAGE_API_2_TOKEN=... # optional; falls back to STORAGE_API_TOKEN

# Use this if you want to receive commands from the Meshflow server (e.g. traceroute)
MESHFLOW_WS_URL=ws://localhost:8000
Expand Down
2 changes: 1 addition & 1 deletion docs/MESHCORE.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Without `MESHCORE_UPLOAD_ENABLED`, `STORAGE_API_*` is ignored.

When `MESHCORE_UPLOAD_ENABLED=true` and `STORAGE_API_*` are set, the bot also:

1. **On connect** — reads the device channel table (`meshcore.commands.get_channel`) and `POST`s `/api/meshcore/feeders/{prefix}/mc-channel-sync/` (device is source of truth for names/types).
1. **On connect** — reads the device channel table once (`meshcore.commands.get_channel`) and `POST`s the same snapshot to `/api/meshcore/feeders/{prefix}/mc-channel-sync/` on **each** configured API (`STORAGE_API_ROOT` and optional `STORAGE_API_2_ROOT` when `MESHCORE_UPLOAD_ENABLED=true`). Uses `STORAGE_API_2_TOKEN` if set, otherwise the primary `STORAGE_API_TOKEN`.
2. **WebSocket** — connects to `ws/nodes/?api_key=…` (URL derived from `STORAGE_API_ROOT` when `MESHFLOW_WS_URL` is unset). MeshCore feeders automatically append `feeder_pubkey_prefix` from the device pubkey after connect (no env var). Used for UI **apply to radio** (`apply_mc_channel_config`): the REST endpoint only dispatches to the bot over WS; the bot writes the device via `set_channel`, then re-syncs to the API.

Traceroute commands remain Meshtastic-only; MC feeders ignore `traceroute` WS messages.
Expand Down
6 changes: 5 additions & 1 deletion src/api/StorageAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ def post_mc_channel_sync(self, body: dict) -> bool:
return response.status_code in (200, 201)
except HTTPError as exc:
self._error_counter.increment("storage.post_mc_channel_sync.http")
logger.error("HTTP error posting MC channel sync: %s", exc.response.text)
logger.error(
"HTTP error posting MC channel sync to %s: %s",
self.base_url,
exc.response.text,
)
except RequestException as exc:
self._error_counter.increment("storage.post_mc_channel_sync.network")
logger.error("Network error posting MC channel sync: %s", exc)
Expand Down
7 changes: 5 additions & 2 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ def on_apply_mc_channel_config(self, channels: list) -> None:
return
if not apply_channels_on_device(self.radio, channels):
return
for storage_api in self.storage_apis:
sync_channels_to_api(self.radio, storage_api)
if hasattr(self.radio, "schedule_channel_sync"):
self.radio.schedule_channel_sync(self.storage_apis)
else:
for storage_api in self.storage_apis:
sync_channels_to_api(self.radio, storage_api)

def on_traceroute_command(self, target_node_id: int) -> None:
"""Handle a traceroute command (e.g. delivered via WebSocket)."""
Expand Down
14 changes: 11 additions & 3 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
STORAGE_API_TOKEN = os.getenv("STORAGE_API_TOKEN", None)
STORAGE_API_VERSION = int(os.getenv("STORAGE_API_VERSION", 1))
STORAGE_API_2_ROOT = os.getenv("STORAGE_API_2_ROOT")
STORAGE_API_2_TOKEN = os.getenv("STORAGE_API_2_TOKEN", None)
STORAGE_API_2_TOKEN = os.getenv("STORAGE_API_2_TOKEN") or STORAGE_API_TOKEN
STORAGE_API_2_VERSION = int(os.getenv("STORAGE_API_2_VERSION", 1))
MESHFLOW_WS_URL = os.getenv("MESHFLOW_WS_URL")

Expand Down Expand Up @@ -171,8 +171,8 @@ def main() -> None:
StorageAPIWrapper(
STORAGE_API_2_ROOT,
STORAGE_API_2_TOKEN,
STORAGE_API_2_VERSION,
failed_packets_dir,
api_version=2,
failed_packets_dir=failed_packets_dir,
serializer=serializer,
local_meshtastic_nodenum_provider=lambda: bot.my_nodenum,
meshcore_feeder_prefix_provider=lambda: getattr(
Expand All @@ -188,6 +188,14 @@ def main() -> None:
"RADIO_PROTOCOL=meshcore: ignoring STORAGE_API_2_ROOT (upload disabled)"
)

if bot.storage_apis:
destinations = [api.base_url for api in bot.storage_apis]
logging.info(
"API upload destinations (%s): %s",
len(destinations),
", ".join(destinations),
)

# WebSocket client (e.g. for remote traceroute commands)
ws_url = MESHFLOW_WS_URL
ws_token = None
Expand Down
52 changes: 40 additions & 12 deletions src/meshcore/channel_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
logger = logging.getLogger(__name__)


async def sync_channels_to_api_async(
radio: "MeshCoreRadio", storage: "StorageAPIWrapper"
) -> bool:
"""Read channels on the MeshCore asyncio loop and POST mc-channel-sync."""
async def read_channel_snapshot_async(radio: "MeshCoreRadio") -> Optional[dict]:
"""Read the device channel table once; return mc-channel-sync body or None."""
if not radio.is_connected:
logger.warning("MeshCore channel sync skipped: radio not connected")
return False
logger.warning("MeshCore channel read skipped: radio not connected")
return None

mc = radio._meshcore # noqa: SLF001 — intentional coupling for sync
if mc is None:
Expand All @@ -38,24 +36,54 @@ async def sync_channels_to_api_async(
channels = await read_device_channels(mc)
except Exception as exc:
logger.exception("MeshCore read_device_channels failed: %s", exc)
return False
return None

log_device_channels(channels)
body = snapshot_sync_body(channels)
return snapshot_sync_body(channels)


def post_channel_snapshot(storage: "StorageAPIWrapper", body: dict) -> bool:
"""POST a pre-built snapshot to one API destination."""
ok = storage.post_mc_channel_sync(body)
label = getattr(storage, "base_url", "storage")
if ok:
logger.info(
"MeshCore channel sync posted to API (%s channel(s))",
len(channels),
"MeshCore channel sync posted to %s (%s channel(s))",
label,
len(body.get("channels") or []),
)
else:
logger.warning(
"MeshCore channel sync to API failed (%s channel(s) read from device)",
len(channels),
"MeshCore channel sync to %s failed (%s channel(s) in snapshot)",
label,
len(body.get("channels") or []),
)
return ok


async def sync_channels_to_storage_apis_async(
radio: "MeshCoreRadio", storage_apis: list["StorageAPIWrapper"]
) -> None:
"""Read device channels once and POST the same snapshot to every configured API."""
if not storage_apis:
return
body = await read_channel_snapshot_async(radio)
if body is None:
return
for storage in storage_apis:
post_channel_snapshot(storage, body)


async def sync_channels_to_api_async(
radio: "MeshCoreRadio", storage: "StorageAPIWrapper"
) -> bool:
"""Read channels on the MeshCore asyncio loop and POST mc-channel-sync to one API."""
body = await read_channel_snapshot_async(radio)
if body is None:
return False
return post_channel_snapshot(storage, body)


def sync_channels_to_api(radio: "MeshCoreRadio", storage: "StorageAPIWrapper") -> bool:
"""Sync from a non-radio thread (e.g. WebSocket worker). Do not call from the radio loop."""
if not hasattr(radio, "run_coroutine"):
Expand Down
12 changes: 8 additions & 4 deletions src/meshcore/radio.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,15 @@ def schedule_channel_sync(self, storage_apis: list) -> None:
return

async def _task() -> None:
from src.meshcore.channel_sync import sync_channels_to_api_async
from src.meshcore.channel_sync import sync_channels_to_storage_apis_async

logger.info("MeshCore channel sync starting")
for storage in storage_apis:
await sync_channels_to_api_async(self, storage)
labels = [str(getattr(s, "base_url", "?")) for s in storage_apis]
logger.info(
"MeshCore channel sync starting (%s API destination(s): %s)",
len(storage_apis),
", ".join(labels),
)
await sync_channels_to_storage_apis_async(self, storage_apis)
logger.info("MeshCore channel sync finished")

asyncio.create_task(_task())
Expand Down
34 changes: 30 additions & 4 deletions test/meshcore/test_channel_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
apply_channels_on_device,
sync_channels_to_api,
sync_channels_to_api_async,
sync_channels_to_storage_apis_async,
)


Expand Down Expand Up @@ -50,6 +51,31 @@ async def _run():
storage.post_mc_channel_sync.assert_called_once()


def test_sync_channels_to_storage_apis_posts_once_reads_once() -> None:
radio = _MeshCoreRadioStub(connected=True, meshcore=MagicMock())
storage_a = MagicMock()
storage_a.base_url = "http://api-one"
storage_a.post_mc_channel_sync.return_value = True
storage_b = MagicMock()
storage_b.base_url = "http://api-two"
storage_b.post_mc_channel_sync.return_value = True
channels = [{"mc_channel_idx": 0, "name": "Public", "mc_channel_type": "PUBLIC"}]
body = {"channels": channels, "synced_at": "2026-01-01T00:00:00Z"}

async def _run():
with patch(
"src.meshcore.channel_sync.read_channel_snapshot_async",
new_callable=AsyncMock,
return_value=body,
) as read_mock:
await sync_channels_to_storage_apis_async(radio, [storage_a, storage_b])
read_mock.assert_awaited_once()

asyncio.run(_run())
storage_a.post_mc_channel_sync.assert_called_once_with(body)
storage_b.post_mc_channel_sync.assert_called_once_with(body)


def test_sync_channels_logs_and_reports_api_failure(caplog) -> None:
import logging

Expand All @@ -61,15 +87,15 @@ def test_sync_channels_logs_and_reports_api_failure(caplog) -> None:

async def _run():
with patch(
"src.meshcore.channel_sync.read_device_channels",
"src.meshcore.channel_sync.read_channel_snapshot_async",
new_callable=AsyncMock,
return_value=channels,
return_value={"channels": channels, "synced_at": "2026-01-01T00:00:00Z"},
):
storage.base_url = "http://api.test"
return await sync_channels_to_api_async(radio, storage)

assert asyncio.run(_run()) is False
assert "MeshCore device channels (1):" in caplog.text
assert "channel sync to API failed" in caplog.text
assert "channel sync to http://api.test failed" in caplog.text


def test_sync_channels_skipped_when_disconnected() -> None:
Expand Down
6 changes: 3 additions & 3 deletions test/meshcore/test_radio_channel_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ async def _runner():
radio._meshcore.is_connected = True

storage = MagicMock()
storage.base_url = "http://api.test"
done = asyncio.Event()

async def _fake_sync(_radio, _storage):
async def _fake_sync(_radio, _storages):
done.set()
return True

with patch(
"src.meshcore.channel_sync.sync_channels_to_api_async",
"src.meshcore.channel_sync.sync_channels_to_storage_apis_async",
side_effect=_fake_sync,
):
radio.schedule_channel_sync([storage])
Expand Down
7 changes: 3 additions & 4 deletions test/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ def test_apply_mc_channel_config_syncs_after_apply(self):
bot = MeshflowBot(radio=mc_radio)
bot.storage_apis = [MagicMock(), MagicMock()]
channels = [{"mc_channel_idx": 0, "name": "Public"}]
mc_radio.schedule_channel_sync = MagicMock()
with patch(
"src.meshcore.channel_sync.apply_channels_on_device", return_value=True
) as apply_mock, patch(
"src.meshcore.channel_sync.sync_channels_to_api", return_value=True
) as sync_mock:
) as apply_mock:
bot.on_apply_mc_channel_config(channels)
apply_mock.assert_called_once_with(mc_radio, channels)
self.assertEqual(sync_mock.call_count, 2)
mc_radio.schedule_channel_sync.assert_called_once_with(bot.storage_apis)

def test_meshcore_connection_schedules_channel_sync(self):
mc_radio = MagicMock()
Expand Down