Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions apps/predbat/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ def initialize(self, gateway_device_id=None, mqtt_host=None, mqtt_port=8883, mqt
self._last_telemetry_time = 0
self._last_plan_data = None
self._last_plan_publish_time = 0
# Entries and timezone of the last built plan, kept so the periodic re-publish
# can rebuild the protobuf with a fresh timestamp (re-sending the cached bytes
# would keep the original timestamp and the device would think the plan is stale)
self._last_plan_entries = None
self._last_plan_timezone = None
self._plan_version = 0
self._refresh_in_progress = False
self._error_count = 0
Expand Down Expand Up @@ -403,13 +408,8 @@ async def run(self, seconds, first):
# Token refresh check
await self._check_token_refresh()

# Re-publish plan if stale
if self._last_plan_data and self._mqtt_connected:
elapsed = time.time() - self._last_plan_publish_time
if elapsed > _PLAN_REPUBLISH_INTERVAL:
await self._publish_raw(self.topic_schedule, self._last_plan_data, retain=True)
self._last_plan_publish_time = time.time()
self.log("Info: GatewayMQTT: Re-published execution plan (stale)")
# Re-publish the plan periodically so its embedded timestamp stays fresh
await self._republish_plan_if_stale()

# Mark component as alive on successful housekeeping
self.update_success_timestamp()
Expand Down Expand Up @@ -1174,17 +1174,44 @@ async def publish_plan(self, plan_entries, timezone_str):
if not self._plan_changed(plan_entries):
return # No change, skip publish

if not self._mqtt_connected:
# Re-queue so the next run() cycle retries once reconnected. Crucially, none
# of the publish state (version, timestamp, cached plan) is mutated here, so
# the periodic re-publish gate stays armed and fires immediately on reconnect.
self._pending_plan = (plan_entries, timezone_str)
self.log("Warn: GatewayMQTT: Not connected — plan queued for next publish")
return

self._plan_version += 1
data = self.build_execution_plan(plan_entries, plan_version=self._plan_version, timezone=timezone_str)
self._last_plan_data = data
self._last_plan_entries = plan_entries
self._last_plan_timezone = timezone_str
self._last_plan_publish_time = time.time()

Comment on lines 1187 to 1191
if self._mqtt_connected:
await self._publish_raw(self.topic_schedule, data, retain=True)
self._last_published_plan = plan_entries
self.log(f"Info: GatewayMQTT: Published execution plan v{self._plan_version} ({len(plan_entries)} entries)")
else:
self.log("Warn: GatewayMQTT: Not connected — plan queued for next publish")
await self._publish_raw(self.topic_schedule, data, retain=True)
self._last_published_plan = plan_entries
self.log(f"Info: GatewayMQTT: Published execution plan v{self._plan_version} ({len(plan_entries)} entries)")

async def _republish_plan_if_stale(self):
"""Re-publish the last plan periodically so its embedded timestamp stays fresh.

Called on every run() cycle. When more than _PLAN_REPUBLISH_INTERVAL has elapsed
since the last publish, the plan protobuf is rebuilt (which stamps a fresh
timestamp) and re-published. Rebuilding is essential: re-sending the cached bytes
would carry the original timestamp, so the device would still treat the plan as
stale. The version is left unchanged because the plan content has not changed.
Gated on an active MQTT connection and a previously built plan.
"""
if self._last_plan_entries is None or self._last_plan_timezone is None or not self._mqtt_connected:
return
if time.time() - self._last_plan_publish_time <= _PLAN_REPUBLISH_INTERVAL:
return
data = self.build_execution_plan(self._last_plan_entries, plan_version=self._plan_version, timezone=self._last_plan_timezone)
await self._publish_raw(self.topic_schedule, data, retain=True)
self._last_plan_data = data
self._last_plan_publish_time = time.time()
self.log("Info: GatewayMQTT: Re-published execution plan (refreshed timestamp)")

async def publish_command(self, command, **kwargs):
"""Build and publish a JSON command to the gateway.
Expand Down
2 changes: 1 addition & 1 deletion apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import pytz
import asyncio

THIS_VERSION = "v8.41.1"
THIS_VERSION = "v8.41.2"

from download import predbat_update_move, predbat_update_download, check_install, DEFAULT_PREDBAT_REPOSITORY
from const import MINUTE_WATT
Expand Down
145 changes: 144 additions & 1 deletion apps/predbat/tests/test_gateway.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""Tests for GatewayMQTT component."""
"""
Tests for GatewayMQTT component.
"""
import sys
import os
import math
Expand Down Expand Up @@ -1060,6 +1062,146 @@ def test_plan_publish_format(self):
assert plan2.plan_version > plan.plan_version


class TestPlanRepublish:
"""Tests for the periodic plan re-publish that refreshes the embedded timestamp."""

def _entries(self):
return [
{
"enabled": True,
"start_hour": 1,
"start_minute": 30,
"end_hour": 4,
"end_minute": 30,
"mode": 1,
"power_w": 3000,
"target_soc": 100,
"days_of_week": 0x7F,
"use_native": True,
}
]

def _make_gateway(self):
from gateway import GatewayMQTT
from unittest.mock import MagicMock

gw = GatewayMQTT.__new__(GatewayMQTT)
gw.log = MagicMock()
gw._mqtt_connected = True
gw._last_plan_data = None
gw._last_plan_entries = None
gw._last_plan_timezone = None
gw._last_plan_publish_time = 0
gw._plan_version = 0
gw._last_published_plan = None
gw._pending_plan = None
gw.topic_schedule = "predbat/schedule"
gw._published = []

async def fake_publish_raw(topic, payload, retain=False):
gw._published.append((topic, payload, retain))

gw._publish_raw = fake_publish_raw
return gw

def _run(self, coro):
import asyncio

return asyncio.run(coro)

def test_publish_plan_stores_entries_and_timezone(self):
"""publish_plan records the entries and timezone needed for a later rebuild."""
gw = self._make_gateway()
entries = self._entries()
self._run(gw.publish_plan(entries, "Europe/London"))
assert gw._last_plan_entries == entries
assert gw._last_plan_timezone == "Europe/London"
assert len(gw._published) == 1

def test_republish_refreshes_timestamp(self):
"""Re-publishing a stale plan rebuilds it with a newer timestamp, same version."""
gw = self._make_gateway()
self._run(gw.publish_plan(self._entries(), "Europe/London"))
first_payload = gw._published[0][1]
first_plan = pb.ExecutionPlan()
first_plan.ParseFromString(first_payload)

# Pretend the first publish was long enough ago to exceed the re-publish interval.
from gateway import _PLAN_REPUBLISH_INTERVAL

gw._last_plan_publish_time -= _PLAN_REPUBLISH_INTERVAL + 60

self._run(gw._republish_plan_if_stale())

assert len(gw._published) == 2
second_payload = gw._published[1][1]
second_plan = pb.ExecutionPlan()
second_plan.ParseFromString(second_payload)

assert second_plan.timestamp >= first_plan.timestamp # rebuilt with current clock
assert second_plan.plan_version == first_plan.plan_version # content unchanged → same version
# The cached bytes are refreshed so subsequent reads reflect the new timestamp.
assert gw._last_plan_data == second_payload

def test_no_republish_before_interval(self):
"""A plan younger than the re-publish interval is not re-sent."""
gw = self._make_gateway()
self._run(gw.publish_plan(self._entries(), "Europe/London"))
self._run(gw._republish_plan_if_stale())
assert len(gw._published) == 1

def test_no_republish_when_disconnected(self):
"""Nothing is re-published while MQTT is disconnected."""
gw = self._make_gateway()
self._run(gw.publish_plan(self._entries(), "Europe/London"))
from gateway import _PLAN_REPUBLISH_INTERVAL

gw._last_plan_publish_time -= _PLAN_REPUBLISH_INTERVAL + 60
gw._mqtt_connected = False
self._run(gw._republish_plan_if_stale())
assert len(gw._published) == 1

def test_no_republish_without_prior_plan(self):
"""With no plan ever built, the re-publish is a no-op."""
gw = self._make_gateway()
self._run(gw._republish_plan_if_stale())
assert gw._published == []

def test_disconnected_publish_requeues_without_mutating_state(self):
"""Publishing while disconnected re-queues and leaves the publish state untouched."""
gw = self._make_gateway()
gw._mqtt_connected = False
entries = self._entries()
self._run(gw.publish_plan(entries, "Europe/London"))

# Nothing went out and the plan is queued for the next cycle.
assert gw._published == []
assert gw._pending_plan == (entries, "Europe/London")
# Publish state is pristine so the re-publish gate fires immediately on reconnect.
assert gw._plan_version == 0
assert gw._last_plan_entries is None
assert gw._last_plan_publish_time == 0
assert gw._last_published_plan is None

def test_requeued_plan_publishes_on_reconnect(self):
"""A plan queued while disconnected is sent once reconnected."""
gw = self._make_gateway()
gw._mqtt_connected = False
entries = self._entries()
self._run(gw.publish_plan(entries, "Europe/London"))
assert gw._published == []

# Reconnect and replay the queued plan (as the run() cycle would).
gw._mqtt_connected = True
pending, tz = gw._pending_plan
gw._pending_plan = None
self._run(gw.publish_plan(pending, tz))

assert len(gw._published) == 1
assert gw._plan_version == 1
assert gw._last_published_plan == entries


class TestAutomaticConfig:
"""Tests for GatewayMQTT.automatic_config() entity-to-arg mapping."""

Expand Down Expand Up @@ -3268,6 +3410,7 @@ def run_gateway_tests(my_predbat=None):
TestTokenRefresh,
TestPlanHookConversion,
TestMQTTIntegration,
TestPlanRepublish,
TestPublishPredbatData,
TestIanaToPosixTz,
TestCheckInverterResets,
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"charge_limit_best": [3.02, 0.38, 9.52, 9.52, 9.52, 0.38, 0.38, 0.38, 7.02, 0.38, 9.52, 9.52], "charge_window_best": [{"start": 1020, "end": 1050, "average": 25.58, "target": 3.02}, {"start": 1140, "end": 1380, "average": 25.58, "target": 0.38}, {"start": 1410, "end": 1660, "average": 7.0, "target": 9.478}, {"start": 1680, "end": 1735, "average": 7.0, "target": 9.519}, {"start": 1740, "end": 1770, "average": 7.0, "target": 9.52}, {"start": 1830, "end": 2100, "average": 25.58, "target": 0.38}, {"start": 2160, "end": 2250, "average": 25.58, "target": 0.38}, {"start": 2310, "end": 2340, "average": 25.58, "target": 0.38}, {"start": 2340, "end": 2370, "average": 25.58, "target": 7.02}, {"start": 2370, "end": 2430, "average": 25.58, "target": 0.38}, {"start": 2850, "end": 3210, "average": 7.0, "target": 9.52}, {"start": 3210, "end": 3900, "average": 25.58, "target": 9.52}], "export_window_best": [{"average": 75.0, "end": 1140, "start": 1080, "set": 69.8, "start_orig": 1080, "target": 7}, {"average": 15.0, "end": 1680, "start": 1660, "set": 14.0, "target": 92}, {"average": 15.0, "end": 1740, "start": 1735, "set": 14.0, "start_orig": 1710, "target": 98}], "export_limits_best": [2.0, 88.0, 94.0]}
{"charge_limit_best": [3.02, 0.38, 9.52, 9.52, 9.52, 0.38, 0.38, 0.38, 7.02, 0.38, 9.52, 9.52], "charge_window_best": [{"start": 1020, "end": 1050, "average": 25.58, "target": 3.02}, {"start": 1140, "end": 1380, "average": 25.58, "target": 0.38}, {"start": 1410, "end": 1660, "average": 7.0, "target": 9.478}, {"start": 1680, "end": 1735, "average": 7.0, "target": 9.519}, {"start": 1740, "end": 1770, "average": 7.0, "target": 9.52}, {"start": 1830, "end": 2100, "average": 25.58, "target": 0.38}, {"start": 2160, "end": 2250, "average": 25.58, "target": 0.38}, {"start": 2310, "end": 2340, "average": 25.58, "target": 0.38}, {"start": 2340, "end": 2370, "average": 25.58, "target": 7.02}, {"start": 2370, "end": 2430, "average": 25.58, "target": 0.38}, {"start": 2850, "end": 3210, "average": 7.0, "target": 9.52}, {"start": 3210, "end": 3900, "average": 25.58, "target": 9.52}], "export_window_best": [{"average": 75.0, "end": 1140, "start": 1080, "set": 69.8, "start_orig": 1080, "target": 7}, {"average": 15.0, "end": 1680, "start": 1660, "set": 14.0, "target": 92}, {"average": 15.0, "end": 1740, "start": 1735, "set": 14.0, "start_orig": 1710, "target": 98}], "export_limits_best": [2.0, 88.0, 94.0]}
Loading