Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 27 additions & 68 deletions apps/predbat/octopus.py
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,9 @@ async def async_graphql_query(self, query, request_context, returns_data=True, i
payload = {"query": query}
auth_prefix = "" if use_backend else "JWT "
headers = {"Authorization": f"{auth_prefix}{self.graphql_token}", integration_context_header: request_context}
self.log("OctopusAPI: Making GraphQL request to {} payload {} headers {}".format(url, payload, headers))
# Redact the Authorization header so the JWT token is never written to the log
log_headers = {**headers, "Authorization": f"{auth_prefix}<redacted>"}
self.log("OctopusAPI: Making GraphQL request to {} payload {} headers {}".format(url, payload, log_headers))
async with client.post(url, json=payload, headers=headers) as response:
# Check for HTTP-level 401/403 (transport-level auth failure) and retry once.
# This handles cases where the JWT has been revoked server-side and the server
Expand All @@ -1595,6 +1597,7 @@ async def async_graphql_query(self, query, request_context, returns_data=True, i

# Process response (which reads the text)
response_body = await self.async_read_response_retry(response, url, ignore_errors=ignore_errors)
self.log("OctopusAPI: GraphQL response for {} (status {}): {}".format(request_context, response.status, response_body))
Comment thread
springfall2008 marked this conversation as resolved.
Comment thread
springfall2008 marked this conversation as resolved.

# Check for auth errors and retry once
if response_body and "errors" in response_body and _retry_count == 0:
Expand Down Expand Up @@ -1739,73 +1742,29 @@ async def async_get_intelligent_devices(self, account_id, device_id):
delta = None

dispatch = {"start": start, "end": end, "charge_in_kwh": delta, "source": meta.get("source", dispatch_type), "location": meta.get("location", None)}
keep = True
if start and end:
start_date_time = parse_date_time(start)
end_date_time = parse_date_time(end)
minutes_now = self.minutes_now
if start_date_time and end_date_time and ((self.now_utc_exact - start_date_time) > timedelta(minutes=4)) and (end_date_time >= self.now_utc_exact):
# This slot has actually started at least 4 minutes ago, so move it to completed so its cached if withdrawn later
# Make end be the end of this slot only and scale delta to the relative minutes
start_minutes = (start_date_time - self.midnight_utc).total_seconds() / 60
# Only consider now onwards
start_minutes = max(minutes_now, start_minutes)

# Align start_minutes to 30 minute slot
start_minutes = (start_minutes // self.plan_interval_minutes) * self.plan_interval_minutes

# Work out end of this slot
end_minutes = start_minutes + self.plan_interval_minutes

# End minutes to end of this slot only
if end_date_time > self.now_utc_exact:
end_minutes = max(minutes_now, end_minutes)

# Round up end minutes to the next slot
end_minutes = ((end_minutes + self.plan_interval_minutes - 1) // self.plan_interval_minutes) * self.plan_interval_minutes

# Work out slot end time
completed_start_time = self.midnight_utc + timedelta(minutes=start_minutes)
completed_end_time = self.midnight_utc + timedelta(minutes=end_minutes)
total_minutes = (end_date_time - start_date_time).total_seconds() / 60
elapsed_minutes = (completed_end_time - completed_start_time).total_seconds() / 60
if total_minutes > 0 and delta is not None:
adjusted_delta = dp4((delta * elapsed_minutes) / total_minutes)
else:
adjusted_delta = delta
completed_dispatch = {
"start": completed_start_time.strftime(DATE_TIME_STR_FORMAT),
"end": completed_end_time.strftime(DATE_TIME_STR_FORMAT),
"charge_in_kwh": adjusted_delta,
"source": meta.get("source", dispatch_type),
"location": meta.get("location", None),
}

# Check if the dispatch is already in the completed list, if its already there then don't add it again
found = False
for cached in completed:
if cached.get("start") == completed_start_time.strftime(DATE_TIME_STR_FORMAT):
cached.update(completed_dispatch)
found = True
break
if not found:
completed.append(completed_dispatch)

# Now adjust the start to be only beyond the adjusted end time and scale delta accordingly
# Work out minutes between original start and new start
elapsed_minutes = (completed_end_time - start_date_time).total_seconds() / 60
# Used elapsed minutes as percentage of total_minutes to scale delta
if total_minutes > 0 and delta is not None:
delta = dp4((delta * (total_minutes - elapsed_minutes)) / total_minutes)
else:
delta = None
dispatch["start"] = completed_end_time.strftime(DATE_TIME_STR_FORMAT)
dispatch["charge_in_kwh"] = delta
# Check the remainder is not empty
if completed_end_time >= end_date_time:
keep = False
if keep:
planned.append(dispatch)
# Keep planned (flexPlannedDispatches) entries in the planned list only - do NOT promote
# in-progress slots into completed_dispatches (see issue #4114). flexPlannedDispatches is
# Octopus's optimiser schedule and includes plug-independent SMART grid-flex events that
# Octopus routinely withdraws on its next re-plan. Promoting them immortalised provisional
# slots as permanent cheap "completed" slots that never had a matching real dispatch.
# Genuine charging is still cached below via the metered completedDispatches feed
# (location=AT_HOME).
#
# If the slot is already in progress, trim the elapsed portion before appending: advance
# its start to now and scale charge_in_kwh to the remaining time. decode_octopus_slot does
# not trim a started slot when charge_in_kwh > 0, so without this the already-delivered
# energy would be double counted, inflating predicted car SoC/cost for the active window.
start_date_time = parse_date_time(start)
end_date_time = parse_date_time(end)
if start_date_time and end_date_time and start_date_time < self.now_utc_exact < end_date_time:
total_minutes = (end_date_time - start_date_time).total_seconds() / 60
remaining_minutes = (end_date_time - self.now_utc_exact).total_seconds() / 60
if total_minutes > 0:
if delta is not None:
delta = dp4(delta * remaining_minutes / total_minutes)
dispatch["charge_in_kwh"] = delta
dispatch["start"] = self.now_utc_exact.strftime(DATE_TIME_STR_FORMAT)
planned.append(dispatch)
Comment on lines +1757 to +1767
for completedDispatch in completedDispatches:
start = completedDispatch.get("start", None)
end = completedDispatch.get("end", None)
Expand Down
2 changes: 1 addition & 1 deletion apps/predbat/predbat.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import pytz
import asyncio

THIS_VERSION = "v8.41.3"
THIS_VERSION = "v8.41.4"

from download import predbat_update_move, predbat_update_download, check_install, DEFAULT_PREDBAT_REPOSITORY
from const import MINUTE_WATT
Expand Down
114 changes: 114 additions & 0 deletions apps/predbat/tests/test_octopus_intelligent_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ async def test_octopus_intelligent_devices(my_predbat):
- Test 5: Future planned dispatch is kept in planned list
- Test 6: Completed dispatches are parsed correctly
- Test 7: Planned dispatch with missing start/end is skipped
- Test 8: In-progress flex dispatch not promoted to completed but trimmed to remainder (issue #4114)
- Test 9: Future flex dispatch is left untrimmed in planned
"""
print("**** Running Octopus intelligent devices tests ****")
failed = 0
Expand Down Expand Up @@ -386,6 +388,118 @@ async def mock_query_missing(query, context, ignore_errors=False, returns_data=T
print("ERROR: device-abc not found in result")
failed += 1

# ------------------------------------------------------------------
# Test 8: In-progress flex planned dispatch is NOT promoted to completed (issue #4114),
# but IS trimmed to the remaining portion so already-delivered energy is not double counted.
# A flexPlannedDispatches entry that started a few minutes ago must stay in the planned
# list (not be fabricated into completed_dispatches - Octopus routinely withdraws such
# provisional SMART flex slots), with its start advanced to now and charge_in_kwh scaled
# down to the remaining time.
# ------------------------------------------------------------------
print("\n*** Test 8: In-progress flex dispatch not promoted, trimmed to remaining portion ***")
api = make_api()

# Slot started 10 min ago and ends 20 min from now -> 30 min total, 20 min remaining (2/3)
in_progress_start = (ref_now - timedelta(minutes=10)).strftime(DATE_TIME_STR_FORMAT)
in_progress_end = (ref_now + timedelta(minutes=20)).strftime(DATE_TIME_STR_FORMAT)
expected_trimmed_start = ref_now.strftime(DATE_TIME_STR_FORMAT)
expected_trimmed_kwh = round(0.367 * 20 / 30, 4) # scaled to remaining portion, dp4
dispatch_data_in_progress = {
"flexPlannedDispatches": [
{
"start": in_progress_start,
"end": in_progress_end,
"energyAddedKwh": "0.367",
"type": "smart-charge",
"meta": {"source": "SMART"}, # no location, as flexPlannedDispatches carries no location
}
],
"completedDispatches": [],
}

async def mock_query_in_progress(query, context, ignore_errors=False, returns_data=True):
if "get-intelligent-devices" in context:
return device_data
elif "get-intelligent-dispatches" in context:
return dispatch_data_in_progress
elif "get-intelligent-settings" in context:
return settings_data
return None

api.async_graphql_query = AsyncMock(side_effect=mock_query_in_progress)
result = await api.async_get_intelligent_devices("test-account", "device-abc")

if "device-abc" not in result:
print("ERROR: device-abc not found in result")
failed += 1
else:
planned = result["device-abc"].get("planned_dispatches", [])
completed = result["device-abc"].get("completed_dispatches", [])
if len(completed) != 0:
print(f"ERROR: In-progress flex dispatch was promoted to completed (got {len(completed)} completed): {completed}")
failed += 1
elif len(planned) != 1:
print(f"ERROR: Expected 1 planned dispatch (kept in planned), got {len(planned)}")
failed += 1
elif planned[0].get("start") != expected_trimmed_start:
print(f"ERROR: Expected in-progress slot start trimmed to now ({expected_trimmed_start}), got {planned[0].get('start')}")
failed += 1
elif planned[0].get("charge_in_kwh") != expected_trimmed_kwh:
print(f"ERROR: Expected charge_in_kwh scaled to remaining ({expected_trimmed_kwh}), got {planned[0].get('charge_in_kwh')}")
failed += 1
else:
print("PASS: In-progress flex dispatch kept in planned, not promoted, and trimmed to remaining portion")

# ------------------------------------------------------------------
# Test 9: Future flex dispatch (not yet started) is left untrimmed in planned
# ------------------------------------------------------------------
print("\n*** Test 9: Future flex dispatch is not trimmed ***")
api = make_api()

future_only_start = (ref_now + timedelta(minutes=30)).strftime(DATE_TIME_STR_FORMAT)
future_only_end = (ref_now + timedelta(minutes=60)).strftime(DATE_TIME_STR_FORMAT)
dispatch_data_future_only = {
"flexPlannedDispatches": [
{
"start": future_only_start,
"end": future_only_end,
"energyAddedKwh": "2.0",
"type": "smart-charge",
"meta": {"source": "SMART"},
}
],
"completedDispatches": [],
}

async def mock_query_future_only(query, context, ignore_errors=False, returns_data=True):
if "get-intelligent-devices" in context:
return device_data
elif "get-intelligent-dispatches" in context:
return dispatch_data_future_only
elif "get-intelligent-settings" in context:
return settings_data
return None

api.async_graphql_query = AsyncMock(side_effect=mock_query_future_only)
result = await api.async_get_intelligent_devices("test-account", "device-abc")

if "device-abc" not in result:
print("ERROR: device-abc not found in result")
failed += 1
else:
planned = result["device-abc"].get("planned_dispatches", [])
if len(planned) != 1:
print(f"ERROR: Expected 1 planned dispatch, got {len(planned)}")
failed += 1
elif planned[0].get("start") != future_only_start:
print(f"ERROR: Future slot start should be untouched ({future_only_start}), got {planned[0].get('start')}")
failed += 1
elif planned[0].get("charge_in_kwh") != 2.0:
print(f"ERROR: Future slot charge_in_kwh should be untouched (2.0), got {planned[0].get('charge_in_kwh')}")
failed += 1
else:
print("PASS: Future flex dispatch left untrimmed in planned")

if failed == 0:
print("\n**** All Octopus intelligent devices tests PASSED ****")
else:
Expand Down
85 changes: 85 additions & 0 deletions apps/predbat/tests/test_octopus_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
Tests for Octopus API GraphQL request/response logging.

Verifies that the JWT auth token is never written to the log while the
GraphQL response body is logged for diagnostics.
"""

import asyncio
from unittest.mock import AsyncMock, MagicMock
from octopus import OctopusAPI


def test_octopus_logging_wrapper(my_predbat):
"""Synchronous entry point that runs the async logging test."""
return asyncio.run(test_octopus_logging(my_predbat))


async def test_octopus_logging(my_predbat):
"""
Test that async_graphql_query redacts the JWT token but logs the response.

Tests:
- The secret JWT token never appears in any log line
- The redaction marker "<redacted>" is logged in its place
- The GraphQL response body is logged
"""
print("**** Running Octopus API logging tests ****")
failed = False

secret_token = "super-secret-jwt-token-value-123"
response_body = {"data": {"account": {"accountNumber": "A-LOG-12345"}}}

api = OctopusAPI(my_predbat, key="test-key", account_id="test-account", automatic=False)
api.graphql_token = secret_token

# Capture every log line emitted by the API
log_lines = []
api.log = lambda message: log_lines.append(str(message))

# Token refresh returns the existing (valid) token without hitting the network
api.async_refresh_token = AsyncMock(return_value=secret_token)

# Mock the HTTP client and POST response
mock_client = AsyncMock()
api.api.async_create_client_session = AsyncMock(return_value=mock_client)

mock_response = AsyncMock()
mock_response.status = 200
mock_response.__aenter__ = AsyncMock(return_value=mock_response)
mock_response.__aexit__ = AsyncMock(return_value=None)
mock_client.post = MagicMock(return_value=mock_response)

# Return a known response body so we can assert it was logged
api.async_read_response_retry = AsyncMock(return_value=response_body)

result = await api.async_graphql_query("query { account }", "test-logging", returns_data=True)

if result != response_body["data"]:
print(f"ERROR: Expected response data {response_body['data']}, got {result}")
failed = True

all_logs = "\n".join(log_lines)

# The secret token must never appear in the logs
if secret_token in all_logs:
print("ERROR: JWT token was leaked into the log output")
failed = True
else:
print("PASS: JWT token not present in log output")

# The redaction marker should be logged in place of the token
if "<redacted>" not in all_logs:
print("ERROR: Expected '<redacted>' marker in request log")
failed = True
else:
print("PASS: Authorization header redacted in request log")

# The response body should be logged
if "A-LOG-12345" not in all_logs:
print("ERROR: GraphQL response body was not logged")
failed = True
else:
print("PASS: GraphQL response body logged")

return failed
Loading
Loading