From b8356326c3b1a20472afe3959128658b5f2a1110 Mon Sep 17 00:00:00 2001 From: Trefor Southwell Date: Mon, 22 Jun 2026 09:53:39 +0100 Subject: [PATCH 1/4] Gateway: Fix plan re-publish issue --- apps/predbat/gateway.py | 36 ++++-- apps/predbat/predbat.py | 2 +- apps/predbat/tests/test_gateway.py | 106 ++++++++++++++++++ ...edbat_debug_pre_saving1.yaml.expected.json | 2 +- 4 files changed, 137 insertions(+), 9 deletions(-) diff --git a/apps/predbat/gateway.py b/apps/predbat/gateway.py index a3185ba3a..868582f01 100644 --- a/apps/predbat/gateway.py +++ b/apps/predbat/gateway.py @@ -185,6 +185,11 @@ def initialize(self, gateway_device_id=None, mqtt_host=None, mqtt_port=8883, mqt self._last_telemetry_time = 0 self._last_plan_data = None self._last_plan_publish_time = 0 + # Entries and timezone of the last built plan, kept so the periodic re-publish + # can rebuild the protobuf with a fresh timestamp (re-sending the cached bytes + # would keep the original timestamp and the device would think the plan is stale) + self._last_plan_entries = None + self._last_plan_timezone = None self._plan_version = 0 self._refresh_in_progress = False self._error_count = 0 @@ -403,13 +408,8 @@ async def run(self, seconds, first): # Token refresh check await self._check_token_refresh() - # Re-publish plan if stale - if self._last_plan_data and self._mqtt_connected: - elapsed = time.time() - self._last_plan_publish_time - if elapsed > _PLAN_REPUBLISH_INTERVAL: - await self._publish_raw(self.topic_schedule, self._last_plan_data, retain=True) - self._last_plan_publish_time = time.time() - self.log("Info: GatewayMQTT: Re-published execution plan (stale)") + # Re-publish the plan periodically so its embedded timestamp stays fresh + await self._republish_plan_if_stale() # Mark component as alive on successful housekeeping self.update_success_timestamp() @@ -1177,6 +1177,8 @@ async def publish_plan(self, plan_entries, timezone_str): self._plan_version += 1 data = self.build_execution_plan(plan_entries, plan_version=self._plan_version, timezone=timezone_str) self._last_plan_data = data + self._last_plan_entries = plan_entries + self._last_plan_timezone = timezone_str self._last_plan_publish_time = time.time() if self._mqtt_connected: @@ -1186,6 +1188,26 @@ async def publish_plan(self, plan_entries, timezone_str): else: self.log("Warn: GatewayMQTT: Not connected — plan queued for next publish") + async def _republish_plan_if_stale(self): + """Re-publish the last plan periodically so its embedded timestamp stays fresh. + + Called on every run() cycle. When more than _PLAN_REPUBLISH_INTERVAL has elapsed + since the last publish, the plan protobuf is rebuilt (which stamps a fresh + timestamp) and re-published. Rebuilding is essential: re-sending the cached bytes + would carry the original timestamp, so the device would still treat the plan as + stale. The version is left unchanged because the plan content has not changed. + Gated on an active MQTT connection and a previously built plan. + """ + if self._last_plan_entries is None or self._last_plan_timezone is None or not self._mqtt_connected: + return + if time.time() - self._last_plan_publish_time <= _PLAN_REPUBLISH_INTERVAL: + return + data = self.build_execution_plan(self._last_plan_entries, plan_version=self._plan_version, timezone=self._last_plan_timezone) + self._last_plan_data = data + self._last_plan_publish_time = time.time() + await self._publish_raw(self.topic_schedule, data, retain=True) + self.log("Info: GatewayMQTT: Re-published execution plan (refreshed timestamp)") + async def publish_command(self, command, **kwargs): """Build and publish a JSON command to the gateway. diff --git a/apps/predbat/predbat.py b/apps/predbat/predbat.py index 20c768308..1b403632b 100644 --- a/apps/predbat/predbat.py +++ b/apps/predbat/predbat.py @@ -35,7 +35,7 @@ import pytz import asyncio -THIS_VERSION = "v8.41.1" +THIS_VERSION = "v8.41.2" from download import predbat_update_move, predbat_update_download, check_install, DEFAULT_PREDBAT_REPOSITORY from const import MINUTE_WATT diff --git a/apps/predbat/tests/test_gateway.py b/apps/predbat/tests/test_gateway.py index da968ec5e..3cbe97a7a 100644 --- a/apps/predbat/tests/test_gateway.py +++ b/apps/predbat/tests/test_gateway.py @@ -1060,6 +1060,111 @@ def test_plan_publish_format(self): assert plan2.plan_version > plan.plan_version +class TestPlanRepublish: + """Tests for the periodic plan re-publish that refreshes the embedded timestamp.""" + + def _entries(self): + return [ + { + "enabled": True, + "start_hour": 1, + "start_minute": 30, + "end_hour": 4, + "end_minute": 30, + "mode": 1, + "power_w": 3000, + "target_soc": 100, + "days_of_week": 0x7F, + "use_native": True, + } + ] + + def _make_gateway(self): + from gateway import GatewayMQTT + from unittest.mock import MagicMock + + gw = GatewayMQTT.__new__(GatewayMQTT) + gw.log = MagicMock() + gw._mqtt_connected = True + gw._last_plan_data = None + gw._last_plan_entries = None + gw._last_plan_timezone = None + gw._last_plan_publish_time = 0 + gw._plan_version = 0 + gw._last_published_plan = None + gw.topic_schedule = "predbat/schedule" + gw._published = [] + + async def fake_publish_raw(topic, payload, retain=False): + gw._published.append((topic, payload, retain)) + + gw._publish_raw = fake_publish_raw + return gw + + def _run(self, coro): + import asyncio + + return asyncio.run(coro) + + def test_publish_plan_stores_entries_and_timezone(self): + """publish_plan records the entries and timezone needed for a later rebuild.""" + gw = self._make_gateway() + entries = self._entries() + self._run(gw.publish_plan(entries, "Europe/London")) + assert gw._last_plan_entries == entries + assert gw._last_plan_timezone == "Europe/London" + assert len(gw._published) == 1 + + def test_republish_refreshes_timestamp(self): + """Re-publishing a stale plan rebuilds it with a newer timestamp, same version.""" + gw = self._make_gateway() + self._run(gw.publish_plan(self._entries(), "Europe/London")) + first_payload = gw._published[0][1] + first_plan = pb.ExecutionPlan() + first_plan.ParseFromString(first_payload) + + # Pretend the first publish was long enough ago to exceed the re-publish interval. + from gateway import _PLAN_REPUBLISH_INTERVAL + + gw._last_plan_publish_time -= _PLAN_REPUBLISH_INTERVAL + 60 + + self._run(gw._republish_plan_if_stale()) + + assert len(gw._published) == 2 + second_payload = gw._published[1][1] + second_plan = pb.ExecutionPlan() + second_plan.ParseFromString(second_payload) + + assert second_plan.timestamp >= first_plan.timestamp # rebuilt with current clock + assert second_plan.plan_version == first_plan.plan_version # content unchanged → same version + # The cached bytes are refreshed so subsequent reads reflect the new timestamp. + assert gw._last_plan_data == second_payload + + def test_no_republish_before_interval(self): + """A plan younger than the re-publish interval is not re-sent.""" + gw = self._make_gateway() + self._run(gw.publish_plan(self._entries(), "Europe/London")) + self._run(gw._republish_plan_if_stale()) + assert len(gw._published) == 1 + + def test_no_republish_when_disconnected(self): + """Nothing is re-published while MQTT is disconnected.""" + gw = self._make_gateway() + self._run(gw.publish_plan(self._entries(), "Europe/London")) + from gateway import _PLAN_REPUBLISH_INTERVAL + + gw._last_plan_publish_time -= _PLAN_REPUBLISH_INTERVAL + 60 + gw._mqtt_connected = False + self._run(gw._republish_plan_if_stale()) + assert len(gw._published) == 1 + + def test_no_republish_without_prior_plan(self): + """With no plan ever built, the re-publish is a no-op.""" + gw = self._make_gateway() + self._run(gw._republish_plan_if_stale()) + assert gw._published == [] + + class TestAutomaticConfig: """Tests for GatewayMQTT.automatic_config() entity-to-arg mapping.""" @@ -3268,6 +3373,7 @@ def run_gateway_tests(my_predbat=None): TestTokenRefresh, TestPlanHookConversion, TestMQTTIntegration, + TestPlanRepublish, TestPublishPredbatData, TestIanaToPosixTz, TestCheckInverterResets, diff --git a/coverage/cases/predbat_debug_pre_saving1.yaml.expected.json b/coverage/cases/predbat_debug_pre_saving1.yaml.expected.json index d820f852c..29731197f 100644 --- a/coverage/cases/predbat_debug_pre_saving1.yaml.expected.json +++ b/coverage/cases/predbat_debug_pre_saving1.yaml.expected.json @@ -1 +1 @@ -{"charge_limit_best": [3.02, 0.38, 9.52, 9.52, 9.52, 0.38, 0.38, 0.38, 7.02, 0.38, 9.52, 9.52], "charge_window_best": [{"start": 1020, "end": 1050, "average": 25.58, "target": 3.02}, {"start": 1140, "end": 1380, "average": 25.58, "target": 0.38}, {"start": 1410, "end": 1660, "average": 7.0, "target": 9.478}, {"start": 1680, "end": 1735, "average": 7.0, "target": 9.519}, {"start": 1740, "end": 1770, "average": 7.0, "target": 9.52}, {"start": 1830, "end": 2100, "average": 25.58, "target": 0.38}, {"start": 2160, "end": 2250, "average": 25.58, "target": 0.38}, {"start": 2310, "end": 2340, "average": 25.58, "target": 0.38}, {"start": 2340, "end": 2370, "average": 25.58, "target": 7.02}, {"start": 2370, "end": 2430, "average": 25.58, "target": 0.38}, {"start": 2850, "end": 3210, "average": 7.0, "target": 9.52}, {"start": 3210, "end": 3900, "average": 25.58, "target": 9.52}], "export_window_best": [{"average": 75.0, "end": 1140, "start": 1080, "set": 69.8, "start_orig": 1080, "target": 7}, {"average": 15.0, "end": 1680, "start": 1660, "set": 14.0, "target": 92}, {"average": 15.0, "end": 1740, "start": 1735, "set": 14.0, "start_orig": 1710, "target": 98}], "export_limits_best": [2.0, 88.0, 94.0]} \ No newline at end of file +{"charge_limit_best": [3.02, 0.38, 9.52, 9.52, 9.52, 0.38, 0.38, 0.38, 7.02, 0.38, 9.52, 9.52], "charge_window_best": [{"start": 1020, "end": 1050, "average": 25.58, "target": 3.02}, {"start": 1140, "end": 1380, "average": 25.58, "target": 0.38}, {"start": 1410, "end": 1660, "average": 7.0, "target": 9.478}, {"start": 1680, "end": 1735, "average": 7.0, "target": 9.519}, {"start": 1740, "end": 1770, "average": 7.0, "target": 9.52}, {"start": 1830, "end": 2100, "average": 25.58, "target": 0.38}, {"start": 2160, "end": 2250, "average": 25.58, "target": 0.38}, {"start": 2310, "end": 2340, "average": 25.58, "target": 0.38}, {"start": 2340, "end": 2370, "average": 25.58, "target": 7.02}, {"start": 2370, "end": 2430, "average": 25.58, "target": 0.38}, {"start": 2850, "end": 3210, "average": 7.0, "target": 9.52}, {"start": 3210, "end": 3900, "average": 25.58, "target": 9.52}], "export_window_best": [{"average": 75.0, "end": 1140, "start": 1080, "set": 69.8, "start_orig": 1080, "target": 7}, {"average": 15.0, "end": 1680, "start": 1660, "set": 14.0, "target": 92}, {"average": 15.0, "end": 1740, "start": 1735, "set": 14.0, "start_orig": 1710, "target": 98}], "export_limits_best": [2.0, 88.0, 94.0]} From 2255e402d7b1e37d635cde384129de1148f8c2d6 Mon Sep 17 00:00:00 2001 From: Trefor Southwell <48591903+springfall2008@users.noreply.github.com> Date: Mon, 22 Jun 2026 09:58:35 +0100 Subject: [PATCH 2/4] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- apps/predbat/gateway.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/predbat/gateway.py b/apps/predbat/gateway.py index 868582f01..d336e1cb8 100644 --- a/apps/predbat/gateway.py +++ b/apps/predbat/gateway.py @@ -1203,9 +1203,9 @@ async def _republish_plan_if_stale(self): if time.time() - self._last_plan_publish_time <= _PLAN_REPUBLISH_INTERVAL: return data = self.build_execution_plan(self._last_plan_entries, plan_version=self._plan_version, timezone=self._last_plan_timezone) + await self._publish_raw(self.topic_schedule, data, retain=True) self._last_plan_data = data self._last_plan_publish_time = time.time() - await self._publish_raw(self.topic_schedule, data, retain=True) self.log("Info: GatewayMQTT: Re-published execution plan (refreshed timestamp)") async def publish_command(self, command, **kwargs): From 3842e4543189d1bacd3486fc596b2a7c4200ad7d Mon Sep 17 00:00:00 2001 From: Trefor Southwell <48591903+springfall2008@users.noreply.github.com> Date: Mon, 22 Jun 2026 10:36:37 +0100 Subject: [PATCH 3/4] Fix docstring formatting in test_gateway.py --- apps/predbat/tests/test_gateway.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/predbat/tests/test_gateway.py b/apps/predbat/tests/test_gateway.py index 3cbe97a7a..102e6aa62 100644 --- a/apps/predbat/tests/test_gateway.py +++ b/apps/predbat/tests/test_gateway.py @@ -1,4 +1,6 @@ -"""Tests for GatewayMQTT component.""" +""" +Tests for GatewayMQTT component. +""" import sys import os import math From 8bbcef71a49e2301ea30493e2abad655ff26316b Mon Sep 17 00:00:00 2001 From: Trefor Southwell Date: Mon, 22 Jun 2026 10:38:52 +0100 Subject: [PATCH 4/4] Review feedback --- apps/predbat/gateway.py | 17 ++++++++++----- apps/predbat/tests/test_gateway.py | 35 ++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/apps/predbat/gateway.py b/apps/predbat/gateway.py index 868582f01..a620cf505 100644 --- a/apps/predbat/gateway.py +++ b/apps/predbat/gateway.py @@ -1174,6 +1174,14 @@ async def publish_plan(self, plan_entries, timezone_str): if not self._plan_changed(plan_entries): return # No change, skip publish + if not self._mqtt_connected: + # Re-queue so the next run() cycle retries once reconnected. Crucially, none + # of the publish state (version, timestamp, cached plan) is mutated here, so + # the periodic re-publish gate stays armed and fires immediately on reconnect. + self._pending_plan = (plan_entries, timezone_str) + self.log("Warn: GatewayMQTT: Not connected — plan queued for next publish") + return + self._plan_version += 1 data = self.build_execution_plan(plan_entries, plan_version=self._plan_version, timezone=timezone_str) self._last_plan_data = data @@ -1181,12 +1189,9 @@ async def publish_plan(self, plan_entries, timezone_str): self._last_plan_timezone = timezone_str self._last_plan_publish_time = time.time() - if self._mqtt_connected: - await self._publish_raw(self.topic_schedule, data, retain=True) - self._last_published_plan = plan_entries - self.log(f"Info: GatewayMQTT: Published execution plan v{self._plan_version} ({len(plan_entries)} entries)") - else: - self.log("Warn: GatewayMQTT: Not connected — plan queued for next publish") + await self._publish_raw(self.topic_schedule, data, retain=True) + self._last_published_plan = plan_entries + self.log(f"Info: GatewayMQTT: Published execution plan v{self._plan_version} ({len(plan_entries)} entries)") async def _republish_plan_if_stale(self): """Re-publish the last plan periodically so its embedded timestamp stays fresh. diff --git a/apps/predbat/tests/test_gateway.py b/apps/predbat/tests/test_gateway.py index 3cbe97a7a..8372d82a3 100644 --- a/apps/predbat/tests/test_gateway.py +++ b/apps/predbat/tests/test_gateway.py @@ -1092,6 +1092,7 @@ def _make_gateway(self): gw._last_plan_publish_time = 0 gw._plan_version = 0 gw._last_published_plan = None + gw._pending_plan = None gw.topic_schedule = "predbat/schedule" gw._published = [] @@ -1164,6 +1165,40 @@ def test_no_republish_without_prior_plan(self): self._run(gw._republish_plan_if_stale()) assert gw._published == [] + def test_disconnected_publish_requeues_without_mutating_state(self): + """Publishing while disconnected re-queues and leaves the publish state untouched.""" + gw = self._make_gateway() + gw._mqtt_connected = False + entries = self._entries() + self._run(gw.publish_plan(entries, "Europe/London")) + + # Nothing went out and the plan is queued for the next cycle. + assert gw._published == [] + assert gw._pending_plan == (entries, "Europe/London") + # Publish state is pristine so the re-publish gate fires immediately on reconnect. + assert gw._plan_version == 0 + assert gw._last_plan_entries is None + assert gw._last_plan_publish_time == 0 + assert gw._last_published_plan is None + + def test_requeued_plan_publishes_on_reconnect(self): + """A plan queued while disconnected is sent once reconnected.""" + gw = self._make_gateway() + gw._mqtt_connected = False + entries = self._entries() + self._run(gw.publish_plan(entries, "Europe/London")) + assert gw._published == [] + + # Reconnect and replay the queued plan (as the run() cycle would). + gw._mqtt_connected = True + pending, tz = gw._pending_plan + gw._pending_plan = None + self._run(gw.publish_plan(pending, tz)) + + assert len(gw._published) == 1 + assert gw._plan_version == 1 + assert gw._last_published_plan == entries + class TestAutomaticConfig: """Tests for GatewayMQTT.automatic_config() entity-to-arg mapping."""