From fa8aded9107f121d087851342ed7a6c279a4455b Mon Sep 17 00:00:00 2001 From: Trefor Southwell Date: Mon, 22 Jun 2026 19:56:44 +0100 Subject: [PATCH 1/2] Octopus: stop flex dispatch promotion creating phantom slots (#4114); redact JWT in logs Fix #4114: async_get_intelligent_devices no longer promotes in-progress flexPlannedDispatches entries into completed_dispatches. flexPlannedDispatches is Octopus's optimiser schedule and includes plug-independent SMART grid-flex events that Octopus routinely withdraws on its next re-plan. Promoting them immortalised provisional slots as permanent cheap "completed" slots with no matching real dispatch, driving phantom cheap planning/grid-charging at full-peak-rate times. Planned entries now stay only in the planned list and disappear naturally when withdrawn; genuine charging is still cached via the metered completedDispatches feed (location=AT_HOME). Also redact the JWT token from the GraphQL request log and add logging of GraphQL response bodies for diagnostics. The token-refresh response (which carries the JWT) uses a separate path and is not logged. Adds regression tests for both changes and bumps version to v8.41.4. Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/predbat/octopus.py | 79 +++-------------- apps/predbat/predbat.py | 2 +- .../tests/test_octopus_intelligent_devices.py | 53 ++++++++++++ apps/predbat/tests/test_octopus_logging.py | 85 +++++++++++++++++++ apps/predbat/unit_test.py | 2 + 5 files changed, 152 insertions(+), 69 deletions(-) create mode 100644 apps/predbat/tests/test_octopus_logging.py diff --git a/apps/predbat/octopus.py b/apps/predbat/octopus.py index a19cd5952..42fddf615 100644 --- a/apps/predbat/octopus.py +++ b/apps/predbat/octopus.py @@ -1576,7 +1576,9 @@ async def async_graphql_query(self, query, request_context, returns_data=True, i payload = {"query": query} auth_prefix = "" if use_backend else "JWT " headers = {"Authorization": f"{auth_prefix}{self.graphql_token}", integration_context_header: request_context} - self.log("OctopusAPI: Making GraphQL request to {} payload {} headers {}".format(url, payload, headers)) + # Redact the Authorization header so the JWT token is never written to the log + log_headers = {**headers, "Authorization": f"{auth_prefix}"} + self.log("OctopusAPI: Making GraphQL request to {} payload {} headers {}".format(url, payload, log_headers)) async with client.post(url, json=payload, headers=headers) as response: # Check for HTTP-level 401/403 (transport-level auth failure) and retry once. # This handles cases where the JWT has been revoked server-side and the server @@ -1595,6 +1597,7 @@ async def async_graphql_query(self, query, request_context, returns_data=True, i # Process response (which reads the text) response_body = await self.async_read_response_retry(response, url, ignore_errors=ignore_errors) + self.log("OctopusAPI: GraphQL response for {} (status {}): {}".format(request_context, response.status, response_body)) # Check for auth errors and retry once if response_body and "errors" in response_body and _retry_count == 0: @@ -1739,73 +1742,13 @@ async def async_get_intelligent_devices(self, account_id, device_id): delta = None dispatch = {"start": start, "end": end, "charge_in_kwh": delta, "source": meta.get("source", dispatch_type), "location": meta.get("location", None)} - keep = True - if start and end: - start_date_time = parse_date_time(start) - end_date_time = parse_date_time(end) - minutes_now = self.minutes_now - if start_date_time and end_date_time and ((self.now_utc_exact - start_date_time) > timedelta(minutes=4)) and (end_date_time >= self.now_utc_exact): - # This slot has actually started at least 4 minutes ago, so move it to completed so its cached if withdrawn later - # Make end be the end of this slot only and scale delta to the relative minutes - start_minutes = (start_date_time - self.midnight_utc).total_seconds() / 60 - # Only consider now onwards - start_minutes = max(minutes_now, start_minutes) - - # Align start_minutes to 30 minute slot - start_minutes = (start_minutes // self.plan_interval_minutes) * self.plan_interval_minutes - - # Work out end of this slot - end_minutes = start_minutes + self.plan_interval_minutes - - # End minutes to end of this slot only - if end_date_time > self.now_utc_exact: - end_minutes = max(minutes_now, end_minutes) - - # Round up end minutes to the next slot - end_minutes = ((end_minutes + self.plan_interval_minutes - 1) // self.plan_interval_minutes) * self.plan_interval_minutes - - # Work out slot end time - completed_start_time = self.midnight_utc + timedelta(minutes=start_minutes) - completed_end_time = self.midnight_utc + timedelta(minutes=end_minutes) - total_minutes = (end_date_time - start_date_time).total_seconds() / 60 - elapsed_minutes = (completed_end_time - completed_start_time).total_seconds() / 60 - if total_minutes > 0 and delta is not None: - adjusted_delta = dp4((delta * elapsed_minutes) / total_minutes) - else: - adjusted_delta = delta - completed_dispatch = { - "start": completed_start_time.strftime(DATE_TIME_STR_FORMAT), - "end": completed_end_time.strftime(DATE_TIME_STR_FORMAT), - "charge_in_kwh": adjusted_delta, - "source": meta.get("source", dispatch_type), - "location": meta.get("location", None), - } - - # Check if the dispatch is already in the completed list, if its already there then don't add it again - found = False - for cached in completed: - if cached.get("start") == completed_start_time.strftime(DATE_TIME_STR_FORMAT): - cached.update(completed_dispatch) - found = True - break - if not found: - completed.append(completed_dispatch) - - # Now adjust the start to be only beyond the adjusted end time and scale delta accordingly - # Work out minutes between original start and new start - elapsed_minutes = (completed_end_time - start_date_time).total_seconds() / 60 - # Used elapsed minutes as percentage of total_minutes to scale delta - if total_minutes > 0 and delta is not None: - delta = dp4((delta * (total_minutes - elapsed_minutes)) / total_minutes) - else: - delta = None - dispatch["start"] = completed_end_time.strftime(DATE_TIME_STR_FORMAT) - dispatch["charge_in_kwh"] = delta - # Check the remainder is not empty - if completed_end_time >= end_date_time: - keep = False - if keep: - planned.append(dispatch) + # Keep planned (flexPlannedDispatches) entries in the planned list only - do NOT promote + # in-progress slots into completed_dispatches. flexPlannedDispatches is Octopus's optimiser + # schedule and includes plug-independent SMART grid-flex events that Octopus routinely + # withdraws on its next re-plan. Promoting them immortalised provisional slots as permanent + # cheap "completed" slots that never had a matching real dispatch (see issue #4114). Genuine + # charging is still cached below via the metered completedDispatches feed (location=AT_HOME). + planned.append(dispatch) for completedDispatch in completedDispatches: start = completedDispatch.get("start", None) end = completedDispatch.get("end", None) diff --git a/apps/predbat/predbat.py b/apps/predbat/predbat.py index 313228d1d..fd7a23092 100644 --- a/apps/predbat/predbat.py +++ b/apps/predbat/predbat.py @@ -35,7 +35,7 @@ import pytz import asyncio -THIS_VERSION = "v8.41.3" +THIS_VERSION = "v8.41.4" from download import predbat_update_move, predbat_update_download, check_install, DEFAULT_PREDBAT_REPOSITORY from const import MINUTE_WATT diff --git a/apps/predbat/tests/test_octopus_intelligent_devices.py b/apps/predbat/tests/test_octopus_intelligent_devices.py index b1d3bcfae..77378cfb6 100644 --- a/apps/predbat/tests/test_octopus_intelligent_devices.py +++ b/apps/predbat/tests/test_octopus_intelligent_devices.py @@ -27,6 +27,7 @@ async def test_octopus_intelligent_devices(my_predbat): - Test 5: Future planned dispatch is kept in planned list - Test 6: Completed dispatches are parsed correctly - Test 7: Planned dispatch with missing start/end is skipped + - Test 8: In-progress flex dispatch not promoted to completed (issue #4114) """ print("**** Running Octopus intelligent devices tests ****") failed = 0 @@ -386,6 +387,58 @@ async def mock_query_missing(query, context, ignore_errors=False, returns_data=T print("ERROR: device-abc not found in result") failed += 1 + # ------------------------------------------------------------------ + # Test 8: In-progress flex planned dispatch is NOT promoted to completed (issue #4114) + # A flexPlannedDispatches entry that started a few minutes ago must stay in the planned + # list and must NOT be fabricated into completed_dispatches. Octopus routinely withdraws + # such provisional SMART flex slots; promoting them left permanent phantom cheap slots + # with no matching real (metered) dispatch. + # ------------------------------------------------------------------ + print("\n*** Test 8: In-progress flex dispatch not promoted to completed ***") + api = make_api() + + in_progress_start = (ref_now - timedelta(minutes=10)).strftime(DATE_TIME_STR_FORMAT) + in_progress_end = (ref_now + timedelta(minutes=20)).strftime(DATE_TIME_STR_FORMAT) + dispatch_data_in_progress = { + "flexPlannedDispatches": [ + { + "start": in_progress_start, + "end": in_progress_end, + "energyAddedKwh": "0.367", + "type": "smart-charge", + "meta": {"source": "SMART"}, # no location, as flexPlannedDispatches carries no location + } + ], + "completedDispatches": [], + } + + async def mock_query_in_progress(query, context, ignore_errors=False, returns_data=True): + if "get-intelligent-devices" in context: + return device_data + elif "get-intelligent-dispatches" in context: + return dispatch_data_in_progress + elif "get-intelligent-settings" in context: + return settings_data + return None + + api.async_graphql_query = AsyncMock(side_effect=mock_query_in_progress) + result = await api.async_get_intelligent_devices("test-account", "device-abc") + + if "device-abc" not in result: + print("ERROR: device-abc not found in result") + failed += 1 + else: + planned = result["device-abc"].get("planned_dispatches", []) + completed = result["device-abc"].get("completed_dispatches", []) + if len(completed) != 0: + print(f"ERROR: In-progress flex dispatch was promoted to completed (got {len(completed)} completed): {completed}") + failed += 1 + elif len(planned) != 1: + print(f"ERROR: Expected 1 planned dispatch (kept in planned), got {len(planned)}") + failed += 1 + else: + print("PASS: In-progress flex dispatch kept in planned and not promoted to completed") + if failed == 0: print("\n**** All Octopus intelligent devices tests PASSED ****") else: diff --git a/apps/predbat/tests/test_octopus_logging.py b/apps/predbat/tests/test_octopus_logging.py new file mode 100644 index 000000000..3910ab73c --- /dev/null +++ b/apps/predbat/tests/test_octopus_logging.py @@ -0,0 +1,85 @@ +""" +Tests for Octopus API GraphQL request/response logging. + +Verifies that the JWT auth token is never written to the log while the +GraphQL response body is logged for diagnostics. +""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock +from octopus import OctopusAPI + + +def test_octopus_logging_wrapper(my_predbat): + """Synchronous entry point that runs the async logging test.""" + return asyncio.run(test_octopus_logging(my_predbat)) + + +async def test_octopus_logging(my_predbat): + """ + Test that async_graphql_query redacts the JWT token but logs the response. + + Tests: + - The secret JWT token never appears in any log line + - The redaction marker "" is logged in its place + - The GraphQL response body is logged + """ + print("**** Running Octopus API logging tests ****") + failed = False + + secret_token = "super-secret-jwt-token-value-123" + response_body = {"data": {"account": {"accountNumber": "A-LOG-12345"}}} + + api = OctopusAPI(my_predbat, key="test-key", account_id="test-account", automatic=False) + api.graphql_token = secret_token + + # Capture every log line emitted by the API + log_lines = [] + api.log = lambda message: log_lines.append(str(message)) + + # Token refresh returns the existing (valid) token without hitting the network + api.async_refresh_token = AsyncMock(return_value=secret_token) + + # Mock the HTTP client and POST response + mock_client = AsyncMock() + api.api.async_create_client_session = AsyncMock(return_value=mock_client) + + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.__aenter__ = AsyncMock(return_value=mock_response) + mock_response.__aexit__ = AsyncMock(return_value=None) + mock_client.post = MagicMock(return_value=mock_response) + + # Return a known response body so we can assert it was logged + api.async_read_response_retry = AsyncMock(return_value=response_body) + + result = await api.async_graphql_query("query { account }", "test-logging", returns_data=True) + + if result != response_body["data"]: + print(f"ERROR: Expected response data {response_body['data']}, got {result}") + failed = True + + all_logs = "\n".join(log_lines) + + # The secret token must never appear in the logs + if secret_token in all_logs: + print("ERROR: JWT token was leaked into the log output") + failed = True + else: + print("PASS: JWT token not present in log output") + + # The redaction marker should be logged in place of the token + if "" not in all_logs: + print("ERROR: Expected '' marker in request log") + failed = True + else: + print("PASS: Authorization header redacted in request log") + + # The response body should be logged + if "A-LOG-12345" not in all_logs: + print("ERROR: GraphQL response body was not logged") + failed = True + else: + print("PASS: GraphQL response body logged") + + return failed diff --git a/apps/predbat/unit_test.py b/apps/predbat/unit_test.py index af2eb3bd6..86a699209 100644 --- a/apps/predbat/unit_test.py +++ b/apps/predbat/unit_test.py @@ -88,6 +88,7 @@ from tests.test_octopus_read_response import test_octopus_read_response_wrapper from tests.test_octopus_read_response_retry import test_octopus_read_response_retry_wrapper from tests.test_octopus_rate_limit import test_octopus_rate_limit_wrapper +from tests.test_octopus_logging import test_octopus_logging_wrapper from tests.test_octopus_fetch_previous_dispatch import test_octopus_fetch_previous_dispatch_wrapper from tests.test_octopus_intelligent_devices import test_octopus_intelligent_devices_wrapper from tests.test_octopus_day_night_rates import test_octopus_day_night_rates_wrapper @@ -219,6 +220,7 @@ def main(): ("octopus_read_response", test_octopus_read_response_wrapper, "Octopus read response tests", False), ("octopus_read_response_retry", test_octopus_read_response_retry_wrapper, "Octopus read response retry with exponential backoff tests", False), ("octopus_rate_limit", test_octopus_rate_limit_wrapper, "Octopus API rate limit tests", False), + ("octopus_logging", test_octopus_logging_wrapper, "Octopus GraphQL logging redaction tests", False), ("octopus_fetch_previous_dispatch", test_octopus_fetch_previous_dispatch_wrapper, "Octopus fetch previous dispatch tests", False), ("octopus_intelligent_devices", test_octopus_intelligent_devices_wrapper, "Octopus intelligent devices tests (flexPlannedDispatches, energyAddedKwh)", False), ("octopus_day_night_rates", test_octopus_day_night_rates_wrapper, "Octopus day/night rate window selection tests (IOG TOU, GO, Economy 7)", False), From 05e4dc62a2f2408e39b01e737cdbebb695ff3004 Mon Sep 17 00:00:00 2001 From: Trefor Southwell Date: Mon, 22 Jun 2026 20:30:19 +0100 Subject: [PATCH 2/2] Octopus: trim in-progress flex dispatch instead of dropping it (review feedback) Removing the planned->completed promotion (issue #4114) also removed the trimming of an already-started slot. decode_octopus_slot does not trim a started slot to now when charge_in_kwh > 0, so an in-progress flex dispatch left in planned with a past start and full charge_in_kwh would double-count the already-delivered energy and inflate predicted car SoC/cost. Keep the dispatch in planned_dispatches (so it still vanishes when Octopus withdraws it), but if it is in progress advance its start to now and scale charge_in_kwh to the remaining portion of the slot. Extends Test 8 to assert the trim/scale and adds Test 9 (future slot untouched). Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/predbat/octopus.py | 26 +++++-- .../tests/test_octopus_intelligent_devices.py | 75 +++++++++++++++++-- 2 files changed, 89 insertions(+), 12 deletions(-) diff --git a/apps/predbat/octopus.py b/apps/predbat/octopus.py index 42fddf615..c33eba6a3 100644 --- a/apps/predbat/octopus.py +++ b/apps/predbat/octopus.py @@ -1743,11 +1743,27 @@ async def async_get_intelligent_devices(self, account_id, device_id): dispatch = {"start": start, "end": end, "charge_in_kwh": delta, "source": meta.get("source", dispatch_type), "location": meta.get("location", None)} # Keep planned (flexPlannedDispatches) entries in the planned list only - do NOT promote - # in-progress slots into completed_dispatches. flexPlannedDispatches is Octopus's optimiser - # schedule and includes plug-independent SMART grid-flex events that Octopus routinely - # withdraws on its next re-plan. Promoting them immortalised provisional slots as permanent - # cheap "completed" slots that never had a matching real dispatch (see issue #4114). Genuine - # charging is still cached below via the metered completedDispatches feed (location=AT_HOME). + # in-progress slots into completed_dispatches (see issue #4114). flexPlannedDispatches is + # Octopus's optimiser schedule and includes plug-independent SMART grid-flex events that + # Octopus routinely withdraws on its next re-plan. Promoting them immortalised provisional + # slots as permanent cheap "completed" slots that never had a matching real dispatch. + # Genuine charging is still cached below via the metered completedDispatches feed + # (location=AT_HOME). + # + # If the slot is already in progress, trim the elapsed portion before appending: advance + # its start to now and scale charge_in_kwh to the remaining time. decode_octopus_slot does + # not trim a started slot when charge_in_kwh > 0, so without this the already-delivered + # energy would be double counted, inflating predicted car SoC/cost for the active window. + start_date_time = parse_date_time(start) + end_date_time = parse_date_time(end) + if start_date_time and end_date_time and start_date_time < self.now_utc_exact < end_date_time: + total_minutes = (end_date_time - start_date_time).total_seconds() / 60 + remaining_minutes = (end_date_time - self.now_utc_exact).total_seconds() / 60 + if total_minutes > 0: + if delta is not None: + delta = dp4(delta * remaining_minutes / total_minutes) + dispatch["charge_in_kwh"] = delta + dispatch["start"] = self.now_utc_exact.strftime(DATE_TIME_STR_FORMAT) planned.append(dispatch) for completedDispatch in completedDispatches: start = completedDispatch.get("start", None) diff --git a/apps/predbat/tests/test_octopus_intelligent_devices.py b/apps/predbat/tests/test_octopus_intelligent_devices.py index 77378cfb6..f5a42ebfe 100644 --- a/apps/predbat/tests/test_octopus_intelligent_devices.py +++ b/apps/predbat/tests/test_octopus_intelligent_devices.py @@ -27,7 +27,8 @@ async def test_octopus_intelligent_devices(my_predbat): - Test 5: Future planned dispatch is kept in planned list - Test 6: Completed dispatches are parsed correctly - Test 7: Planned dispatch with missing start/end is skipped - - Test 8: In-progress flex dispatch not promoted to completed (issue #4114) + - Test 8: In-progress flex dispatch not promoted to completed but trimmed to remainder (issue #4114) + - Test 9: Future flex dispatch is left untrimmed in planned """ print("**** Running Octopus intelligent devices tests ****") failed = 0 @@ -388,17 +389,21 @@ async def mock_query_missing(query, context, ignore_errors=False, returns_data=T failed += 1 # ------------------------------------------------------------------ - # Test 8: In-progress flex planned dispatch is NOT promoted to completed (issue #4114) + # Test 8: In-progress flex planned dispatch is NOT promoted to completed (issue #4114), + # but IS trimmed to the remaining portion so already-delivered energy is not double counted. # A flexPlannedDispatches entry that started a few minutes ago must stay in the planned - # list and must NOT be fabricated into completed_dispatches. Octopus routinely withdraws - # such provisional SMART flex slots; promoting them left permanent phantom cheap slots - # with no matching real (metered) dispatch. + # list (not be fabricated into completed_dispatches - Octopus routinely withdraws such + # provisional SMART flex slots), with its start advanced to now and charge_in_kwh scaled + # down to the remaining time. # ------------------------------------------------------------------ - print("\n*** Test 8: In-progress flex dispatch not promoted to completed ***") + print("\n*** Test 8: In-progress flex dispatch not promoted, trimmed to remaining portion ***") api = make_api() + # Slot started 10 min ago and ends 20 min from now -> 30 min total, 20 min remaining (2/3) in_progress_start = (ref_now - timedelta(minutes=10)).strftime(DATE_TIME_STR_FORMAT) in_progress_end = (ref_now + timedelta(minutes=20)).strftime(DATE_TIME_STR_FORMAT) + expected_trimmed_start = ref_now.strftime(DATE_TIME_STR_FORMAT) + expected_trimmed_kwh = round(0.367 * 20 / 30, 4) # scaled to remaining portion, dp4 dispatch_data_in_progress = { "flexPlannedDispatches": [ { @@ -436,8 +441,64 @@ async def mock_query_in_progress(query, context, ignore_errors=False, returns_da elif len(planned) != 1: print(f"ERROR: Expected 1 planned dispatch (kept in planned), got {len(planned)}") failed += 1 + elif planned[0].get("start") != expected_trimmed_start: + print(f"ERROR: Expected in-progress slot start trimmed to now ({expected_trimmed_start}), got {planned[0].get('start')}") + failed += 1 + elif planned[0].get("charge_in_kwh") != expected_trimmed_kwh: + print(f"ERROR: Expected charge_in_kwh scaled to remaining ({expected_trimmed_kwh}), got {planned[0].get('charge_in_kwh')}") + failed += 1 + else: + print("PASS: In-progress flex dispatch kept in planned, not promoted, and trimmed to remaining portion") + + # ------------------------------------------------------------------ + # Test 9: Future flex dispatch (not yet started) is left untrimmed in planned + # ------------------------------------------------------------------ + print("\n*** Test 9: Future flex dispatch is not trimmed ***") + api = make_api() + + future_only_start = (ref_now + timedelta(minutes=30)).strftime(DATE_TIME_STR_FORMAT) + future_only_end = (ref_now + timedelta(minutes=60)).strftime(DATE_TIME_STR_FORMAT) + dispatch_data_future_only = { + "flexPlannedDispatches": [ + { + "start": future_only_start, + "end": future_only_end, + "energyAddedKwh": "2.0", + "type": "smart-charge", + "meta": {"source": "SMART"}, + } + ], + "completedDispatches": [], + } + + async def mock_query_future_only(query, context, ignore_errors=False, returns_data=True): + if "get-intelligent-devices" in context: + return device_data + elif "get-intelligent-dispatches" in context: + return dispatch_data_future_only + elif "get-intelligent-settings" in context: + return settings_data + return None + + api.async_graphql_query = AsyncMock(side_effect=mock_query_future_only) + result = await api.async_get_intelligent_devices("test-account", "device-abc") + + if "device-abc" not in result: + print("ERROR: device-abc not found in result") + failed += 1 + else: + planned = result["device-abc"].get("planned_dispatches", []) + if len(planned) != 1: + print(f"ERROR: Expected 1 planned dispatch, got {len(planned)}") + failed += 1 + elif planned[0].get("start") != future_only_start: + print(f"ERROR: Future slot start should be untouched ({future_only_start}), got {planned[0].get('start')}") + failed += 1 + elif planned[0].get("charge_in_kwh") != 2.0: + print(f"ERROR: Future slot charge_in_kwh should be untouched (2.0), got {planned[0].get('charge_in_kwh')}") + failed += 1 else: - print("PASS: In-progress flex dispatch kept in planned and not promoted to completed") + print("PASS: Future flex dispatch left untrimmed in planned") if failed == 0: print("\n**** All Octopus intelligent devices tests PASSED ****")