diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt
index a6716bc84..4418c396d 100644
--- a/.cspell/custom-dictionary-workspace.txt
+++ b/.cspell/custom-dictionary-workspace.txt
@@ -54,6 +54,7 @@ chargehold
chargelater
Chrg
citem
+clearsky
Codespaces
collapsable
compareform
@@ -270,6 +271,7 @@ oncharge
oninput
onmouseout
onmouseover
+openmeteo
openweathermap
overfitting
overvoltage
diff --git a/apps/predbat/components.py b/apps/predbat/components.py
index 9a7738e15..31893eb44 100644
--- a/apps/predbat/components.py
+++ b/apps/predbat/components.py
@@ -108,11 +108,15 @@
"pv_forecast_tomorrow": {"required": False, "config": "pv_forecast_tomorrow"},
"pv_forecast_d3": {"required": False, "config": "pv_forecast_d3"},
"pv_forecast_d4": {"required": False, "config": "pv_forecast_d4"},
+ "pv_clearsky_today": {"required": False, "config": "pv_clearsky_today"},
+ "pv_clearsky_tomorrow": {"required": False, "config": "pv_clearsky_tomorrow"},
+ "pv_clearsky_d3": {"required": False, "config": "pv_clearsky_d3"},
+ "pv_clearsky_d4": {"required": False, "config": "pv_clearsky_d4"},
"pv_scaling": {"required": False, "config": "pv_scaling", "default": 1.0},
"open_meteo_forecast": {"required": False, "config": "open_meteo_forecast", "default": False},
"open_meteo_forecast_max_age": {"required": False, "config": "open_meteo_forecast_max_age", "default": 4},
},
- "required_or": ["solcast_api_key", "forecast_solar", "pv_forecast_today", "open_meteo_forecast"],
+ "required_or": ["solcast_api_key", "forecast_solar", "pv_forecast_today", "open_meteo_forecast", "pv_clearsky_today"],
"phase": 2, # Solar component moved to phase 2 so that any Predbat cloud components (such as GEcloud) have been started and initialised pv_today, etc
},
"gecloud": {
diff --git a/apps/predbat/config.py b/apps/predbat/config.py
index ef12ca699..c6361d0dd 100644
--- a/apps/predbat/config.py
+++ b/apps/predbat/config.py
@@ -487,6 +487,104 @@
"default": True,
"enable": "expert_mode",
},
+ {
+ "name": "clipping_buffer_enable",
+ "friendly_name": "Clipping Buffer Enable",
+ "type": "switch",
+ "default": False,
+ "icon": "mdi:chart-bell-curve-cumulative",
+ },
+ {
+ "name": "clipping_clearsky_source",
+ "friendly_name": "Clipping Clear-Sky Source",
+ "type": "select",
+ "options": ["auto", "ha_solcast_clearsky", "solcast_api", "openmeteo"],
+ "icon": "mdi:cloud-search",
+ "default": "auto",
+ "enable": "clipping_buffer_enable",
+ },
+ {
+ "name": "clipping_use_clearsky_peaks",
+ "friendly_name": "Clipping Use ClearSky Peaks",
+ "type": "switch",
+ "default": True,
+ "icon": "mdi:weather-sunny-alert",
+ "enable": "clipping_buffer_enable",
+ },
+ {
+ "name": "clipping_auto_tune",
+ "friendly_name": "Clipping Auto-Tune",
+ "type": "switch",
+ "default": True,
+ "icon": "mdi:auto-fix",
+ "enable": "clipping_buffer_enable",
+ },
+ {
+ "name": "clipping_cost_weight",
+ "friendly_name": "Clipping Cost Weight",
+ "type": "input_number",
+ "min": 0,
+ "max": 10.0,
+ "step": 0.1,
+ "unit": "x",
+ "icon": "mdi:multiplication",
+ "enable": "clipping_buffer_enable",
+ "default": 1.0,
+ },
+ {
+ "name": "clipping_amplification",
+ "friendly_name": "Clipping Amplification",
+ "type": "input_number",
+ "min": 0.5,
+ "max": 3.0,
+ "step": 0.1,
+ "unit": "x",
+ "icon": "mdi:arrow-expand-vertical",
+ "enable": "clipping_buffer_enable",
+ "default": 1.0,
+ },
+ {
+ "name": "clipping_limit_override",
+ "friendly_name": "Clipping Limit Override",
+ "type": "input_number",
+ "min": 0,
+ "max": 50000,
+ "step": 100,
+ "unit": "W",
+ "icon": "mdi:flash-alert",
+ "enable": "clipping_buffer_enable",
+ "default": 0,
+ },
+ {
+ "name": "clipping_buffer_max_kwh",
+ "friendly_name": "Clipping Buffer Max kWh (Manual Override)",
+ "type": "input_number",
+ "min": 0,
+ "max": 50.0,
+ "step": 0.1,
+ "unit": "kWh",
+ "icon": "mdi:battery-50",
+ "default": 0,
+ "enable": "clipping_buffer_enable",
+ },
+ {
+ "name": "clipping_buffer_start_time",
+ "friendly_name": "Clipping Buffer Start Time",
+ "type": "select",
+ "options": ["None"] + OPTIONS_TIME,
+ "icon": "mdi:clock-start",
+ "default": "None",
+ "enable": "clipping_buffer_enable",
+ },
+ {
+ "name": "clipping_buffer_end_time",
+ "friendly_name": "Clipping Buffer End Time",
+ "type": "select",
+ "options": ["None"] + OPTIONS_TIME,
+ "icon": "mdi:clock-end",
+ "default": "None",
+ "enable": "clipping_buffer_enable",
+ },
{
"name": "metric_pv_calibration_enable",
"friendly_name": "Enable use of Calibrated PV data",
@@ -2234,4 +2332,16 @@
"gateway_mqtt_host": {"type": "string", "empty": False},
"gateway_mqtt_port": {"type": "integer", "zero": False},
"gateway_mqtt_token": {"type": "string", "empty": False},
+ "clipping_buffer_enable": {"type": "boolean"},
+ "clipping_use_clearsky_peaks": {"type": "boolean"},
+ "clipping_cost_weight": {"type": "float"},
+ "clipping_amplification": {"type": "float"},
+ "clipping_limit_override": {"type": "integer"},
+ "clipping_buffer_max_kwh": {"type": "float"},
+ "clipping_buffer_start_time": {"type": "string"},
+ "clipping_buffer_end_time": {"type": "string"},
+ "pv_clearsky_today": {"type": "sensor", "sensor_type": "float"},
+ "pv_clearsky_tomorrow": {"type": "sensor", "sensor_type": "float"},
+ "pv_clearsky_d3": {"type": "sensor", "sensor_type": "float"},
+ "pv_clearsky_d4": {"type": "sensor", "sensor_type": "float"},
}
diff --git a/apps/predbat/config/apps.yaml b/apps/predbat/config/apps.yaml
index 39c07b422..cfbb93aec 100644
--- a/apps/predbat/config/apps.yaml
+++ b/apps/predbat/config/apps.yaml
@@ -267,7 +267,6 @@ pred_bat:
# Only use this for workarounds if your inverter time is correct but Predbat is somehow wrong (AppDaemon issue)
# 1 means add 1 minute to AppDaemon time, -1 takes it away
clock_skew: 0
-
# Solcast cloud interface, set this or the local interface below
#solcast_host: 'https://api.solcast.com.au/'
#solcast_api_key: !secret solcast_api_key
@@ -280,7 +279,25 @@ pred_bat:
# azimuth: 180
# declination: 10
# api_key: !secret forecast_solar_api_key
- # azimuth_zero_south: false # Set to true if azimuth is already in Forecast.solar/Open-Meteo convention (0=South) rather than Predbat convention (0=North)
+
+ # Open-Meteo interface (used for primary PV forecast OR for ClearSky baseline)
+ #open_meteo_forecast:
+ # - latitude: 55.9711067
+ # longitude: -3.1925157
+ # declination: 35
+ # azimuth: 180
+ # kwp: 4.0
+
+ # Determines which source Predbat uses for the primary PV forecast.
+ # Options: "auto" (default), "ha", "forecast_solar", "openmeteo", "solcast_api"
+ # If you are using Open-Meteo ONLY for the ClearSky baseline below, it's recommended to explicitly set this to "ha" if using the HA integration.
+ #pv_forecast_primary: "auto"
+
+ # --- Automated Solar Clipping Mitigation ---
+ # Enable clipping mitigation to automatically force exports prior to peaks
+ #clipping_buffer_enable: True
+ # Set the source for the theoretical ClearSky maximum (e.g. "openmeteo", "ha_solcast_clearsky", "solcast_api")
+ #clipping_clearsky_source: "ha_solcast_clearsky"
# Set these to match solcast sensor names if not using the cloud interface
# The regular expression (re:) makes the solcast bit optional
@@ -290,6 +307,12 @@ pred_bat:
pv_forecast_d3: re:(sensor.(solcast_|)(pv_forecast_|)forecast_(day_3|d3))
pv_forecast_d4: re:(sensor.(solcast_|)(pv_forecast_|)forecast_(day_4|d4))
+ # ClearSky sensors (Required if clipping_clearsky_source is set to ha_solcast_clearsky)
+ #pv_clearsky_today: re:(sensor.(solcast_|)(pv_forecast_|)clearsky_today)
+ #pv_clearsky_tomorrow: re:(sensor.(solcast_|)(pv_forecast_|)clearsky_tomorrow)
+ #pv_clearsky_d3: re:(sensor.(solcast_|)(pv_forecast_|)clearsky_(day_3|d3))
+ #pv_clearsky_d4: re:(sensor.(solcast_|)(pv_forecast_|)clearsky_(day_4|d4))
+
# Ohme EV charger cloud direct integration
#ohme_login: !secret ohme_login
#ohme_password: !secret ohme_password
diff --git a/apps/predbat/fetch.py b/apps/predbat/fetch.py
index b3d810018..79064d137 100644
--- a/apps/predbat/fetch.py
+++ b/apps/predbat/fetch.py
@@ -572,7 +572,7 @@ def minute_data_import_export(self, max_days_previous, now_utc, key, scale=1.0,
if history and len(history) > 0 and len(history[0]) > 0:
item = history[0][0]
try:
- last_updated_time = str2time(item["last_updated"])
+ last_updated_time = str2time(item.get("last_updated", item.get("state_class", "")))
except (ValueError, TypeError):
last_updated_time = now_utc
age_days = max_days_previous
@@ -639,7 +639,7 @@ def minute_data_load(self, now_utc, entity_name, max_days_previous, load_scaling
if isinstance(history, list) and history and history[0]:
item = history[0][0]
try:
- last_updated_time = str2time(item["last_updated"])
+ last_updated_time = str2time(item.get("last_updated", item.get("state_class", "")))
except (ValueError, TypeError):
last_updated_time = now_utc
age = now_utc - last_updated_time
@@ -1027,7 +1027,7 @@ def fetch_sensor_data(self, save=True):
self.cost_today_sofar, self.carbon_today_sofar = self.today_cost(self.import_today, self.export_today, self.car_charging_energy, self.load_minutes, save=save)
# Fetch PV forecast if enabled, today must be enabled, other days are optional
- self.pv_forecast_minute, self.pv_forecast_minute10 = self.fetch_pv_forecast()
+ self.pv_forecast_minute, self.pv_forecast_minute10, self.pv_forecast_minute90, self.pv_forecast_minuteCS, self.pv_forecast_minuteMAX = self.fetch_pv_forecast()
if self.load_minutes and not self.load_forecast_only:
# Apply modal filter to historical data
@@ -1240,11 +1240,21 @@ def fetch_pv_forecast(self):
"""
pv_forecast_minute = {}
pv_forecast_minute10 = {}
+ pv_forecast_minute90 = {}
+ pv_forecast_minuteCS = {}
+ pv_forecast_minuteHIST = {}
# Get data from forecast sensor
entity_id = "sensor." + self.prefix + "_pv_forecast_raw"
pv_forecast_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecast")
pv_forecast10_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecast10")
+ pv_forecast90_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecast90")
+ pv_forecastCS_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecast_clearsky")
+ pv_forecastHIST_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecast_historical")
+ if pv_forecastCS_packed_ld is None:
+ pv_forecastCS_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecastCS")
+ if pv_forecastHIST_packed_ld is None:
+ pv_forecastHIST_packed_ld = self.get_state_wrapper(entity_id=entity_id, attribute="forecastMAX")
relative_time = self.get_state_wrapper(entity_id=entity_id, attribute="relative_time")
try:
relative_time = datetime.strptime(relative_time, TIME_FORMAT)
@@ -1255,6 +1265,9 @@ def fetch_pv_forecast(self):
# Convert keys to integers and values to floats
pv_forecast_packed = {}
pv_forecast10_packed = {}
+ pv_forecast90_packed = {}
+ pv_forecastCS_packed = {}
+ pv_forecastHIST_packed = {}
if pv_forecast_packed_ld:
for key, value in pv_forecast_packed_ld.items():
@@ -1272,10 +1285,37 @@ def fetch_pv_forecast(self):
except (ValueError, TypeError):
pass
+ if pv_forecast90_packed_ld:
+ for key, value in pv_forecast90_packed_ld.items():
+ try:
+ minute = int(key)
+ pv_forecast90_packed[minute] = float(value)
+ except (ValueError, TypeError):
+ pass
+
+ if pv_forecastCS_packed_ld:
+ for key, value in pv_forecastCS_packed_ld.items():
+ try:
+ minute = int(key)
+ pv_forecastCS_packed[minute] = float(value)
+ except (ValueError, TypeError):
+ pass
+
+ if pv_forecastHIST_packed_ld:
+ for key, value in pv_forecastHIST_packed_ld.items():
+ try:
+ minute = int(key)
+ pv_forecastHIST_packed[minute] = float(value)
+ except (ValueError, TypeError):
+ pass
+
# Unpack the forecast data
max_minute = max(pv_forecast_packed.keys()) if pv_forecast_packed else 0
last_value = 0
last_value10 = 0
+ last_value90 = 0
+ last_valueCS = 0
+ last_valueHIST = 0
# The forecast could be for a different time to our relative time, so we need to offset the minutes to align with our midnight_utc.
# relative_time is the midnight at which the forecast was saved, so stored minute keys are relative to that midnight.
# We subtract the offset so that stored minute X (= relative_time + X) maps to (relative_time + X - midnight_utc) minutes from today's midnight.
@@ -1284,10 +1324,16 @@ def fetch_pv_forecast(self):
target_minute = minute - minute_offset
last_value = pv_forecast_packed.get(minute, last_value)
last_value10 = pv_forecast10_packed.get(minute, last_value10)
+ last_value90 = pv_forecast90_packed.get(minute, last_value90)
+ last_valueCS = pv_forecastCS_packed.get(minute, last_valueCS)
+ last_valueHIST = pv_forecastHIST_packed.get(minute, last_valueHIST)
pv_forecast_minute[target_minute] = last_value
pv_forecast_minute10[target_minute] = last_value10
+ pv_forecast_minute90[target_minute] = last_value90
+ pv_forecast_minuteCS[target_minute] = last_valueCS
+ pv_forecast_minuteHIST[target_minute] = last_valueHIST
- return pv_forecast_minute, pv_forecast_minute10
+ return pv_forecast_minute, pv_forecast_minute10, pv_forecast_minute90, pv_forecast_minuteCS, pv_forecast_minuteHIST
def predict_battery_temperature(self, battery_temperature_history, step):
"""
@@ -2317,6 +2363,14 @@ def fetch_config_options(self):
self.carbon_enable = self.get_arg("carbon_enable")
self.carbon_metric = self.get_arg("carbon_metric")
+ # Clipping peak cost penalty model
+ self.clipping_buffer_enable = self.get_arg("clipping_buffer_enable")
+ self.clipping_use_clearsky_peaks = self.get_arg("clipping_use_clearsky_peaks")
+ self.clipping_auto_tune = self.get_arg("clipping_auto_tune")
+ self.clipping_cost_weight = self.get_arg("clipping_cost_weight")
+ self.clipping_amplification = self.get_arg("clipping_amplification")
+ self.clipping_limit_override = self.get_arg("clipping_limit_override") / MINUTE_WATT if self.get_arg("clipping_limit_override") else 0
+
# iBoost solar diverter model
self.iboost_enable = self.get_arg("iboost_enable")
self.iboost_gas = self.get_arg("iboost_gas")
diff --git a/apps/predbat/load_ml_component.py b/apps/predbat/load_ml_component.py
index 859e23836..87d46d9d8 100644
--- a/apps/predbat/load_ml_component.py
+++ b/apps/predbat/load_ml_component.py
@@ -405,7 +405,7 @@ async def _fetch_load_data(self):
energy = self.get_from_incrementing(pv_data_cumulative, m, PREDICT_STEP, backwards=True)
pv_data[m] = dp4(energy)
- pv_forecast_minute, pv_forecast_minute10 = self.base.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, pv_forecast_minute90, pv_forecast_minuteCS, pv_forecast_minuteHIST = self.base.fetch_pv_forecast()
# Add future PV forecast as per-5-min energy with negative keys (negative = future)
# key -5 = first future step, -10 = second, etc.
if pv_forecast_minute:
diff --git a/apps/predbat/output.py b/apps/predbat/output.py
index 4448f39fe..9fbc0426b 100644
--- a/apps/predbat/output.py
+++ b/apps/predbat/output.py
@@ -939,6 +939,35 @@ def short_textual_plan(self, soc_min, soc_min_minute, pv_forecast_minute_step, p
export_type, self.duration_string(self.export_window_best[export_window_n_next]["start"] - self.minutes_now), self.get_rate_text(self.export_window_best[export_window_n_next]["start"], export=True, with_value=True)
)
+ # Clipping summary
+ if getattr(self, "clipping_buffer_enable", False):
+ clipping_status = getattr(self, "clipping_status", "No clipping forecast.")
+ sentence += "- Clipping status: {}\n".format(clipping_status)
+
+ predict_clipped_best = getattr(self, "predict_clipped_best", {})
+ if predict_clipped_best:
+ clipping_total = predict_clipped_best.get(max(predict_clipped_best.keys()), 0.0)
+ if clipping_total > 0.01:
+ clipping_mode = getattr(self, "clipping_limit_mode", "Unknown")
+ start_str = ""
+ end_str = ""
+ start_stamp = None
+ end_stamp = None
+
+ prev_val = 0.0
+ for min_key, val in sorted(predict_clipped_best.items()):
+ if val > prev_val + 0.001:
+ if start_stamp is None:
+ start_stamp = self.midnight_utc + timedelta(minutes=min_key)
+ end_stamp = self.midnight_utc + timedelta(minutes=min_key)
+ prev_val = val
+
+ if start_stamp and end_stamp:
+ start_str = start_stamp.strftime("%H:%M")
+ end_str = end_stamp.strftime("%H:%M")
+
+ sentence += "- Forecast {} kWh clipping, exceeding {} limit from {} to {}. Plan penalized to mitigate.\n".format(dp2(clipping_total), clipping_mode, start_str, end_str)
+
if publish:
self.text_plan = self.get_text_plan_html(sentence)
@@ -2439,6 +2468,9 @@ def record_status(self, message, debug="", had_errors=False, notify=False, extra
},
)
+ # Clipping Status and PV Peak Forecast sensors are published in plan.py run_prediction(save="best")
+ # to avoid duplicate dashboard_item writes to the same sensor.
+
if had_errors:
self.log("Warn: record_status {}".format(message + extra))
else:
diff --git a/apps/predbat/plan.py b/apps/predbat/plan.py
index 2ef436436..56cf552e1 100644
--- a/apps/predbat/plan.py
+++ b/apps/predbat/plan.py
@@ -844,6 +844,126 @@ def in_charge_window(self, charge_window, minute_abs):
window_n += 1
return -1
+ def inject_clipping_export_windows(self):
+ """
+ Inject candidate export windows before predicted clipping to allow the optimizer to evaluate forced exports to create headroom.
+ """
+ if not getattr(self, "clipping_buffer_enable", False):
+ self.log("inject_clipping_export_windows: Aborting because clipping_buffer_enable is False")
+ return
+
+ forecast = getattr(self, "clipping_buffer_forecast_kwh", {})
+ if not forecast:
+ self.log("inject_clipping_export_windows: Aborting because clipping_buffer_forecast_kwh is empty")
+ return
+
+ self.log("inject_clipping_export_windows: Proceeding with clipping blocks: {}".format(forecast))
+
+ # Find contiguous blocks of clipping in the forecast
+ clipping_blocks = []
+ current_block = None
+ for minute_relative, kwh_loss in sorted(forecast.items()):
+ # forecast uses relative minutes from now, convert to absolute
+ minute = minute_relative + self.minutes_now
+ if kwh_loss > 0:
+ if current_block is None:
+ current_block = {"start": minute, "end": minute + 30}
+ elif minute <= current_block["end"] + 60:
+ current_block["end"] = max(current_block["end"], minute + 30)
+ else:
+ clipping_blocks.append(current_block)
+ current_block = {"start": minute, "end": minute + 30}
+ if current_block:
+ clipping_blocks.append(current_block)
+
+ # Helper to check if a window intersects the target range
+ def intersects(w, start, end):
+ return not (w["end"] <= start or w["start"] >= end)
+
+ for block in clipping_blocks:
+ peak_start, peak_end = block["start"], block["end"]
+
+ # Calculate the dynamically sized export window required to create the needed headroom.
+ # We walk backwards from the peak minute by minute to account for the fact that
+ # solar generation (PV) during the export window consumes inverter capacity and
+ # reduces the effective battery discharge rate.
+ max_kwh_loss = max(kwh_loss for m, kwh_loss in forecast.items() if peak_start <= m <= peak_end)
+
+ # Cap the requested headroom by the physical capacity of the battery.
+ # We cannot create more headroom than the battery can physically hold.
+ battery_capacity_kwh = self.soc_max - getattr(self, "best_soc_min", 0.0)
+ max_kwh_loss = min(max_kwh_loss, battery_capacity_kwh)
+
+ hybrid = getattr(self, "inverter_hybrid", False)
+ inverter_limit_kw = getattr(self, "set_inverter_limit", 0) / 1000.0
+ export_limit_kw = getattr(self, "export_limit", 0)
+ pv_forecast = getattr(self, "pv_forecast_minute_step", {})
+
+ accumulated_kwh = 0.0
+ minutes_needed = 0
+
+ for m in range(peak_start, max(self.minutes_now, peak_start - 360), -1):
+ # Start with raw battery maximum discharge
+ discharge_kw = self.battery_rate_max_discharge * getattr(self, "battery_rate_max_scaling_discharge", 1.0)
+
+ # Retrieve PV forecast for this minute (kW)
+ pv_kw = pv_forecast.get(m, 0.0)
+
+ # Hybrid inverters share the AC limit between battery and PV
+ if hybrid and inverter_limit_kw > 0:
+ discharge_kw = min(discharge_kw, max(0.0, inverter_limit_kw - pv_kw))
+
+ # The total site export limit also restricts combined PV + Battery export
+ if export_limit_kw > 0:
+ discharge_kw = min(discharge_kw, max(0.0, export_limit_kw - pv_kw))
+
+ # Safety minimum to prevent infinite loops if limits are misconfigured (0.5 kW)
+ discharge_kw = max(0.5, discharge_kw)
+
+ accumulated_kwh += discharge_kw * (1.0 / 60.0)
+ minutes_needed += 1
+
+ if accumulated_kwh >= max_kwh_loss:
+ break
+
+ # Add 30 mins safety margin
+ minutes_needed += 30
+
+ # Clamp the window between 60 minutes and 360 minutes (6 hours)
+ minutes_needed = max(60, min(360, minutes_needed))
+
+ morning_start = max(self.minutes_now, peak_start - minutes_needed)
+ morning_start = int(morning_start / 30) * 30 # Align to nearest 30 mins
+
+ if morning_start >= peak_start:
+ continue
+
+ # Calculate the precise export limit (Target SOC percentage) required to create this headroom
+ target_soc_kwh = max(0.0, self.soc_max - max_kwh_loss)
+ target_soc_pct = max(0.0, min(100.0, target_soc_kwh / self.soc_max * 100.0))
+ target_soc_pct = float(int(target_soc_pct))
+
+ new_window = {
+ "start": morning_start,
+ "end": peak_end,
+ "average": self.rate_export.get(morning_start, 0.0),
+ "clipping_ceiling_pct": target_soc_pct
+ }
+
+ # Tag any overlapping native export windows with the ceiling so they don't dump below it
+ for w in self.export_window_best:
+ if (w.get("start") >= morning_start and w.get("start") < peak_end) or (w.get("end") > morning_start and w.get("end") <= peak_end):
+ w["clipping_ceiling_pct"] = target_soc_pct
+
+ # Inject our new unified window
+ if not any(w.get("start") <= new_window["start"] and w.get("end") >= new_window["end"] for w in self.export_window_best):
+ self.export_window_best.append(new_window)
+ self.export_limits_best.append(target_soc_pct)
+ if getattr(self, "high_export_rates", None) is not None:
+ self.high_export_rates.append(copy.deepcopy(new_window))
+
+ self.log("Injected continuous anti-clipping candidate export window {} to {} to allow spillover absorption (Target SOC: {}%)".format(self.time_abs_str(morning_start), self.time_abs_str(peak_end), target_soc_pct))
+
def calculate_plan(self, recompute=True, debug_mode=False, publish=True):
"""
Calculate the new plan (best)
@@ -922,6 +1042,9 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True):
# Pre-fill best export enable with Off
self.export_limits_best = [100.0 for i in range(len(self.export_window_best))]
+ # Inject export windows to create headroom for clipping peaks
+ self.inject_clipping_export_windows()
+
self.end_record = self.forecast_minutes
# Show best windows
self.log("Best charge window {}".format(self.window_as_text(self.charge_window_best, calc_percent_limit(self.charge_limit_best, self.soc_max))))
@@ -959,6 +1082,158 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True):
pv_forecast_minute_step = self.step_data_history(self.pv_forecast_minute, self.minutes_now, forward=True, cloud_factor=self.metric_cloud_coverage)
pv_forecast_minute10_step = self.step_data_history(self.pv_forecast_minute10, self.minutes_now, forward=True, cloud_factor=min(self.metric_cloud_coverage + 0.2, 1.0) if self.metric_cloud_coverage else None, flip=True)
+ # Clipping peak: create a peak PV stream for clipping cost detection
+ # Uses raw forecast WITHOUT cloud model (no energy-conserving oscillation that smooths peaks)
+ # Scaled by amplification factor for safety margin
+ pv_forecast_peak_step = None
+ clipping_limit_effective = 0
+ clipping_limit_mode = "Unknown"
+ if self.clipping_buffer_enable:
+ pv_forecast_peak_step = self.step_data_history(self.pv_forecast_minute, self.minutes_now, forward=True, cloud_factor=None)
+
+ # Auto-Tuner: Always runs to gather data and recommend a factor
+ import os
+ import json
+
+ auto_tune_file = os.path.join(self.config_root, "clipping_auto_tune.json")
+
+ if not hasattr(self, "clipping_auto_amp"):
+ self.clipping_auto_amp = getattr(self, "clipping_amplification", 1.0)
+ self.clipping_last_tune_day = None
+ if os.path.exists(auto_tune_file):
+ try:
+ with open(auto_tune_file, "r") as f:
+ data = json.load(f)
+ self.clipping_auto_amp = data.get("auto_amp", self.clipping_auto_amp)
+ self.clipping_last_tune_day = data.get("last_tune_day", None)
+ except Exception:
+ pass
+
+ auto_amp = self.clipping_auto_amp
+ last_tune_day = self.clipping_last_tune_day
+
+ # Check if we should tune today (only once per day)
+ current_day = self.now_utc.strftime("%Y-%m-%d")
+ if current_day != last_tune_day:
+ # Retrieve last 24h of pv_power
+ days = 1
+ pv_today_hist = self.minute_data_import_export(days, self.now_utc, "pv_today", required_unit="kWh", pad=False)
+ max_pv_power = 0.0
+ if pv_today_hist:
+ pv_today_hist_max_minute = max(pv_today_hist.keys())
+ current_value = None
+ for minute in range(pv_today_hist_max_minute - 5, -5, -5):
+ current_value = pv_today_hist.get(minute, current_value)
+ next_value = pv_today_hist.get(minute - 5, current_value)
+ if current_value is not None and next_value is not None:
+ power_amount = max(0, next_value - current_value) * 60.0 / 5.0
+ if power_amount > max_pv_power:
+ max_pv_power = power_amount
+
+ # Check inverter limit
+ limit = 0.0
+ if self.inverter_limit > 0:
+ limit = self.inverter_limit / MINUTE_WATT
+
+ if limit > 0:
+ if max_pv_power >= limit * 0.98:
+ auto_amp = min(2.5, auto_amp + 0.05)
+ self.log("Clipping auto-tuner: Real clipping detected (max PV {} kW >= limit {} kW). Increased recommended auto_amp to {}".format(dp2(max_pv_power), dp2(limit), auto_amp))
+ else:
+ auto_amp = max(1.0, auto_amp - 0.01)
+ self.log("Clipping auto-tuner: No clipping detected. Decreased recommended auto_amp to {}".format(auto_amp))
+
+ try:
+ with open(auto_tune_file, "w") as f:
+ json.dump({"auto_amp": auto_amp, "last_tune_day": current_day}, f)
+ self.clipping_auto_amp = auto_amp
+ self.clipping_last_tune_day = current_day
+ except Exception:
+ pass
+
+ self.dashboard_item(self.prefix + ".clipping_recommended_amplification", state=dp2(auto_amp), attributes={"friendly_name": "Clipping Recommended Amplification", "icon": "mdi:auto-fix"})
+
+ # Apply Manual vs Auto Factor
+ if getattr(self, "clipping_auto_tune", False):
+ effective_amplification = auto_amp
+ else:
+ effective_amplification = self.clipping_amplification
+
+ # Apply ClearSky or Amplification factor
+ if getattr(self, "clipping_use_clearsky_peaks", False):
+ pv_clearsky_step = self.step_data_history(getattr(self, "pv_forecast_minuteCS", {}), self.minutes_now, forward=True, cloud_factor=None)
+ pv_forecast_peak_step = {k: max(v, pv_clearsky_step.get(k, 0) * effective_amplification) for k, v in pv_forecast_peak_step.items()}
+ elif effective_amplification != 1.0:
+ pv_forecast_peak_step = {k: v * effective_amplification for k, v in pv_forecast_peak_step.items()}
+
+ # Calculate effective clipping limit: most restrictive hardware constraint
+ if self.clipping_limit_override > 0:
+ clipping_limit_effective = self.clipping_limit_override
+ clipping_limit_mode = "Manual Override"
+ else:
+ limits = []
+ if self.inverter_limit > 0:
+ limits.append((self.inverter_limit * 60.0, "Inverter AC Capacity"))
+ if self.export_limit > 0:
+ limits.append((self.export_limit * 60.0, "DNO Export Limit"))
+ if getattr(self, "pv_ac_limit", 0) > 0:
+ limits.append((self.pv_ac_limit * 60.0, "PV AC Limit"))
+
+ if limits:
+ clipping_limit_effective, clipping_limit_mode = min(limits, key=lambda x: x[0])
+ else:
+ clipping_limit_effective = 0
+ clipping_limit_mode = "No Limit"
+
+ self.clipping_limit_effective = clipping_limit_effective
+ self.clipping_limit_mode = clipping_limit_mode
+
+ # Hybrid Clipping: Read Manual Overrides
+ self.clipping_buffer_kwh = float(self.get_arg("clipping_buffer_max_kwh", default=0.0))
+
+ start_time_str = self.get_arg("clipping_buffer_start_time", default="None")
+ end_time_str = self.get_arg("clipping_buffer_end_time", default="None")
+
+ self.clipping_buffer_start = None
+ if start_time_str and start_time_str != "None":
+ self.clipping_buffer_start = self.time_to_minutes(start_time_str)
+
+ self.clipping_buffer_end = None
+ if end_time_str and end_time_str != "None":
+ self.clipping_buffer_end = self.time_to_minutes(end_time_str)
+
+ # Calculate Implicit Buffer (Physics-based Decay Curve)
+ self.clipping_buffer_forecast_kwh = {}
+ self.clipping_remaining_today = 0.0
+ self.clipping_tomorrow = 0.0
+
+ if self.clipping_buffer_enable and pv_forecast_peak_step and clipping_limit_effective > 0:
+ step_size = getattr(self, "plan_interval_minutes", 30)
+ # Accumulate potential clipping loss per interval
+ for minute in range(0, self.forecast_minutes, step_size):
+ kwh_loss = 0.0
+ for m in range(minute, min(minute + step_size, self.forecast_minutes), PREDICT_STEP):
+ step_kwh = pv_forecast_peak_step.get(m, 0)
+ # step_kwh is energy over PREDICT_STEP minutes
+ # clipping_limit_effective is in kW. Convert it to max allowed energy in PREDICT_STEP.
+ max_kwh_allowed = clipping_limit_effective * (PREDICT_STEP / 60.0)
+ if step_kwh > max_kwh_allowed:
+ kwh_loss += (step_kwh - max_kwh_allowed)
+
+ if kwh_loss > 0:
+ self.clipping_buffer_forecast_kwh[minute] = kwh_loss
+ # Add to totals
+ minute_absolute = minute + self.minutes_now
+ midnight_today_minute = int(self.minutes_now / 1440) * 1440
+ if minute_absolute < midnight_today_minute + 1440:
+ self.clipping_remaining_today += kwh_loss
+ else:
+ self.clipping_tomorrow += kwh_loss
+
+ # If manual override is active, reflect it in the UI totals
+ if self.clipping_buffer_kwh > 0:
+ self.clipping_remaining_today = max(self.clipping_remaining_today, self.clipping_buffer_kwh)
+
# Save step data for debug
self.load_minutes_step = load_minutes_step
self.load_minutes_step10 = load_minutes_step10
@@ -970,7 +1245,19 @@ def calculate_plan(self, recompute=True, debug_mode=False, publish=True):
self.calculate_yesterday()
# Creation prediction object
- self.prediction = Prediction(self, pv_forecast_minute_step, pv_forecast_minute10_step, load_minutes_step, load_minutes_step10)
+ self.prediction = Prediction(
+ self,
+ pv_forecast_minute_step,
+ pv_forecast_minute10_step,
+ load_minutes_step,
+ load_minutes_step10,
+ pv_forecast_peak_step=pv_forecast_peak_step,
+ clipping_limit=clipping_limit_effective,
+ clipping_cost_weight=self.clipping_cost_weight if self.clipping_buffer_enable else 0,
+ clipping_buffer_kwh=self.clipping_buffer_kwh,
+ clipping_buffer_start=self.clipping_buffer_start,
+ clipping_buffer_end=self.clipping_buffer_end,
+ )
# Check if LoadML is active and disable thread pools as it causes lockup due to race conditions with NumPy
load_ml_comp = self.components.get_component("load_ml") if self.components else None
@@ -1636,6 +1923,10 @@ def optimise_export(self, window_n, record_charge_windows, try_charge_limit, cha
if self.set_export_low_power:
loop_options.extend([0.3, 0.5, 0.7])
+ # Ensure any pre-assigned fractional limits (like clipping thresholds) are evaluated
+ if try_export[window_n] not in loop_options and try_export[window_n] > 0.0 and try_export[window_n] < 99.0:
+ loop_options.append(try_export[window_n])
+
# Collect all options
results = []
results10 = []
@@ -3812,6 +4103,198 @@ def run_prediction(self, charge_limit, charge_window, export_window, export_limi
"icon": "mdi:currency-usd",
},
)
+
+ # Calculate legacy clipping entities from the cloud model's clipping curve
+ midnight_today_minute = 24 * 60 - self.minutes_now
+ clipping_today = 0.0
+ clipping_tomorrow = 0.0
+
+ if hasattr(self, "predict_clipped_best") and self.predict_clipped_best:
+ # Find closest key <= midnight_today_minute for clipping_today
+ keys = sorted(self.predict_clipped_best.keys())
+ key_today = next((k for k in reversed(keys) if k <= midnight_today_minute), 0)
+ clipping_today = self.predict_clipped_best.get(key_today, 0.0)
+ clipping_total = self.predict_clipped_best.get(keys[-1], 0.0)
+ clipping_tomorrow = max(0.0, clipping_total - clipping_today)
+
+ # Generate Clipping Visual Series (Remaining and Ceiling) for web.py charts
+ predict_clipping_remaining_best = {}
+ predict_clipping_ceiling_best = {}
+
+ clipping_limit_step = getattr(self, "clipping_limit_effective", 0) * (step / 60.0)
+ # Ensure the chart uses the exact same peak dataset as the planning engine
+ # which already includes clipping_amplification.
+ pv_forecast_peak_step = getattr(pred, "pv_forecast_peak_step", None)
+
+ manual_buffer_active = False
+ if self.clipping_buffer_kwh > 0:
+ if self.clipping_buffer_start is None or self.clipping_buffer_end is None:
+ manual_buffer_active = True
+
+ daily_cumulative_clip = {}
+ max_minute = self.forecast_minutes
+
+ buffer_start = self.clipping_buffer_start if self.clipping_buffer_start else 0
+ buffer_end = self.clipping_buffer_end if self.clipping_buffer_end else 24 * 60
+
+ for minute in range(max_minute, -step, -step):
+ remaining = 0.0
+ minute_absolute = minute + self.minutes_now
+
+ if minute <= midnight_today_minute:
+ day_index = 0
+ elif minute <= midnight_today_minute + 24 * 60:
+ day_index = 1
+ else:
+ day_index = 2
+
+ if manual_buffer_active or (self.clipping_buffer_kwh > 0 and buffer_start <= minute_absolute < buffer_end):
+ # Calculate linear decay for manual mode visual graph
+ if buffer_end > buffer_start:
+ progress = max(0.0, min(1.0, (minute_absolute - buffer_start) / (buffer_end - buffer_start)))
+ remaining = self.clipping_buffer_kwh * (1.0 - progress)
+ else:
+ remaining = self.clipping_buffer_kwh
+ elif pv_forecast_peak_step and clipping_limit_step > 0 and self.clipping_cost_weight > 0:
+ peak_pv = pv_forecast_peak_step.get(minute, 0)
+ if peak_pv > clipping_limit_step:
+ daily_cumulative_clip[day_index] = daily_cumulative_clip.get(day_index, 0.0) + (peak_pv - clipping_limit_step)
+ remaining = daily_cumulative_clip.get(day_index, 0.0)
+
+ predict_clipping_remaining_best[minute] = round(remaining, 4)
+ predict_clipping_ceiling_best[minute] = round(self.soc_max - remaining, 4)
+
+ # Sort dictionaries to ensure forwards time order for ApexCharts rendering
+ self.predict_clipping_remaining_best = dict(sorted(predict_clipping_remaining_best.items()))
+ self.predict_clipping_ceiling_best = dict(sorted(predict_clipping_ceiling_best.items()))
+
+ self.clipping_remaining_today = clipping_today
+ self.clipping_tomorrow = clipping_tomorrow
+ self.clipping_mitigated_today = clipping_today
+
+ # Add Clipping Summary Dashboard Items
+ self.dashboard_item(
+ self.prefix + ".clipping_remaining",
+ state=dp2(self.clipping_remaining_today),
+ attributes={
+ "results": self.filtered_times(self.predict_clipping_remaining_best),
+ "friendly_name": "Clipping Remaining",
+ "unit_of_measurement": "kWh",
+ "device_class": "energy",
+ "icon": "mdi:solar-power-variant",
+ },
+ )
+ self.dashboard_item(
+ self.prefix + ".clipping_ceiling",
+ state=dp2(self.predict_clipping_ceiling_best.get(0, self.soc_max)),
+ attributes={
+ "results": self.filtered_times(self.predict_clipping_ceiling_best),
+ "friendly_name": "Clipping Ceiling",
+ "unit_of_measurement": "kWh",
+ "device_class": "energy",
+ "icon": "mdi:arrow-collapse-up",
+ },
+ )
+ self.dashboard_item(
+ self.prefix + ".clipping_tomorrow",
+ state=dp2(self.clipping_tomorrow),
+ attributes={
+ "friendly_name": "Clipping Forecast Tomorrow",
+ "unit_of_measurement": "kWh",
+ "device_class": "energy",
+ "icon": "mdi:solar-power-variant-outline",
+ },
+ )
+ self.dashboard_item(
+ self.prefix + ".clipping_mitigated_today",
+ state=dp2(getattr(self, "clipping_mitigated_today", 0.0)),
+ attributes={
+ "friendly_name": "Clipping Mitigated Today",
+ "unit_of_measurement": "kWh",
+ "device_class": "energy",
+ "icon": "mdi:battery-check",
+ },
+ )
+
+ clipping_status_text = "No clipping forecast."
+ clipping_start_iso = None
+ clipping_end_iso = None
+
+ if self.clipping_buffer_kwh > 0:
+
+ def format_time_human(minute):
+ if minute is None:
+ return "N/A"
+ target_dt = self.midnight + timedelta(minutes=minute)
+ if target_dt.date() == self.midnight.date():
+ return target_dt.strftime("%H:%M")
+ else:
+ return target_dt.strftime("Tomorrow %H:%M")
+
+ start_str = format_time_human(self.clipping_buffer_start)
+ end_str = format_time_human(self.clipping_buffer_end)
+ if self.clipping_buffer_start is not None:
+ clipping_start_iso = (self.midnight_utc + timedelta(minutes=self.clipping_buffer_start)).isoformat()
+ if self.clipping_buffer_end is not None:
+ clipping_end_iso = (self.midnight_utc + timedelta(minutes=self.clipping_buffer_end)).isoformat()
+
+ if self.clipping_buffer_start is not None and self.clipping_buffer_end is not None:
+ clipping_status_text = "{} kWh clipping forecast ({}) between {} and {}.".format(dp2(self.clipping_buffer_kwh), self.clipping_mode, start_str, end_str)
+ else:
+ clipping_status_text = "{} kWh manual clipping buffer override active.".format(dp2(self.clipping_buffer_kwh))
+
+ elif getattr(self, "clipping_buffer_enable", False) and getattr(self, "clipping_buffer_forecast_kwh", {}):
+ def format_time_human_abs(minute_absolute):
+ target_dt = self.midnight + timedelta(minutes=minute_absolute)
+ return target_dt.strftime("%H:%M")
+
+ today_start = None
+ today_end = None
+ tomorrow_start = None
+ tomorrow_end = None
+
+ midnight_today_minute = int(self.minutes_now / 1440) * 1440
+ for minute_relative, kwh_loss in sorted(self.clipping_buffer_forecast_kwh.items()):
+ minute_absolute = minute_relative + self.minutes_now
+ if minute_absolute < midnight_today_minute + 1440:
+ if today_start is None: today_start = minute_absolute
+ today_end = minute_absolute + 30
+ else:
+ if tomorrow_start is None: tomorrow_start = minute_absolute
+ tomorrow_end = minute_absolute + 30
+
+ status_parts = []
+ if self.clipping_remaining_today > 0 and today_start is not None:
+ status_parts.append("{} kWh clipping buffer remaining between {} and {} today".format(
+ dp2(self.clipping_remaining_today),
+ format_time_human_abs(today_start),
+ format_time_human_abs(today_end)
+ ))
+ if self.clipping_tomorrow > 0 and tomorrow_start is not None:
+ status_parts.append("{} kWh clipping buffer forecast between {} and {} tomorrow".format(
+ dp2(self.clipping_tomorrow),
+ format_time_human_abs(tomorrow_start),
+ format_time_human_abs(tomorrow_end)
+ ))
+
+ if status_parts:
+ clipping_status_text = ". ".join(status_parts) + "."
+
+ self.dashboard_item(
+ self.prefix + ".clipping_status",
+ state=clipping_status_text,
+ attributes={
+ "friendly_name": "Clipping Buffer Status",
+ "icon": "mdi:information-outline",
+ "results": self.filtered_times(self.clipping_buffer_forecast_kwh),
+ "clipping_start": clipping_start_iso,
+ "clipping_end": clipping_end_iso,
+ "clipping_mode": self.clipping_mode,
+ "clipping_remaining_today": dp2(self.clipping_remaining_today),
+ "clipping_tomorrow": dp2(self.clipping_tomorrow),
+ "clipping_mitigated_today": dp2(getattr(self, "clipping_mitigated_today", 0.0)),
+ },
+ )
self.dashboard_item(self.prefix + ".record", state=0.0, attributes={"results": self.filtered_times(record_time), "friendly_name": "Prediction window", "state_class": "measurement"})
self.dashboard_item(
self.prefix + ".iboost_best",
diff --git a/apps/predbat/predbat.py b/apps/predbat/predbat.py
index 772ce7883..2391b2ec8 100644
--- a/apps/predbat/predbat.py
+++ b/apps/predbat/predbat.py
@@ -514,6 +514,14 @@ def reset(self):
self.isCharging_Target = 0
self.isExporting = False
self.isExporting_Target = 0
+ self.clipping_buffer_kwh = 0
+ self.clipping_buffer_forecast_kwh = {}
+ self.clipping_buffer_start = None
+ self.clipping_buffer_end = None
+ self.clipping_remaining_today = 0.0
+ self.clipping_tomorrow = 0.0
+ self.clipping_mitigated_today = 0.0
+ self.clipping_mode = "Auto"
self.savings_today_predbat = 0.0
self.savings_today_predbat_soc = 0.0
self.savings_today_pvbat = 0.0
diff --git a/apps/predbat/prediction.py b/apps/predbat/prediction.py
index 6846d56e9..c0cb99018 100644
--- a/apps/predbat/prediction.py
+++ b/apps/predbat/prediction.py
@@ -92,7 +92,22 @@ class Prediction:
Class to hold prediction input and output data and the run function
"""
- def __init__(self, base=None, pv_forecast_minute_step=None, pv_forecast_minute10_step=None, load_minutes_step=None, load_minutes_step10=None, soc_kw=None, soc_max=None):
+ def __init__(
+ self,
+ base=None,
+ pv_forecast_minute_step=None,
+ pv_forecast_minute10_step=None,
+ load_minutes_step=None,
+ load_minutes_step10=None,
+ soc_kw=None,
+ soc_max=None,
+ pv_forecast_peak_step=None,
+ clipping_limit=0,
+ clipping_cost_weight=0,
+ clipping_buffer_kwh=0,
+ clipping_buffer_start=None,
+ clipping_buffer_end=None,
+ ):
global PRED_GLOBAL
if base:
self.minutes_now = base.minutes_now
@@ -181,6 +196,12 @@ def __init__(self, base=None, pv_forecast_minute_step=None, pv_forecast_minute10
self.rate_export = base.rate_export
self.io_adjusted = base.io_adjusted
self.rate_max = base.rate_max
+ self.clipping_limit = clipping_limit
+ self.clipping_cost_weight = clipping_cost_weight
+ self.clipping_buffer_kwh = clipping_buffer_kwh
+ self.clipping_buffer_start = clipping_buffer_start
+ self.clipping_buffer_end = clipping_buffer_end
+ self.pv_forecast_peak_step = pv_forecast_peak_step
self.pv_forecast_minute_step = pv_forecast_minute_step
self.pv_forecast_minute10_step = pv_forecast_minute10_step
self.load_minutes_step = load_minutes_step
@@ -481,6 +502,7 @@ def run_prediction(self, charge_limit, charge_window, export_window, export_limi
first_charge = end_record
export_to_first_charge = 0
clipped_today = 0
+ clipping_penalty_total = 0
predict_soc = {}
car_charging_soc_next = self.car_charging_soc_next[:]
iboost_next = self.iboost_next
@@ -516,6 +538,11 @@ def run_prediction(self, charge_limit, charge_window, export_window, export_limi
pv_ac_limit = self.pv_ac_limit * step
set_charge_low_power = self.set_charge_window and self.set_charge_low_power and (save in ["best", "best10", "test"])
carbon_enable = self.carbon_enable
+ pv_forecast_peak_step = self.pv_forecast_peak_step
+ clipping_limit = self.clipping_limit
+ # clipping_limit is in kW. We need the energy limit in kWh for the given step.
+ clipping_limit_step = clipping_limit * (step / 60.0) if clipping_limit else 0
+ clipping_cost_weight = self.clipping_cost_weight
reserve = self.reserve
soc_max = self.soc_max
reserve_percent = calc_percent_limit(reserve, soc_max)
@@ -658,6 +685,45 @@ def run_prediction(self, charge_limit, charge_window, export_window, export_limi
pv_now = pv_forecast_minute_step_flat[minute]
load_yesterday = load_minutes_step_flat[minute]
+ # Clipping peak cost penalty: check if worst-case PV would exceed the clipping limit
+ # and add a cost to the metric if the battery can't absorb the excess.
+ # This makes the optimizer prefer plans that leave battery headroom during peak solar.
+ manual_buffer_active = False
+ if self.clipping_buffer_kwh > 0:
+ if self.clipping_buffer_start is None or self.clipping_buffer_end is None:
+ manual_buffer_active = True
+ elif self.clipping_buffer_start <= minute_absolute < self.clipping_buffer_end:
+ manual_buffer_active = True
+
+ if manual_buffer_active:
+ target_headroom = self.clipping_buffer_kwh
+ battery_headroom = max(soc_max - soc, 0) * battery_loss
+ if battery_headroom < target_headroom:
+ # Apply a severe penalty to force the optimizer to respect the manual buffer limit
+ clipping_penalty = (target_headroom - battery_headroom) * export_rate * clipping_cost_weight * 5
+ metric += clipping_penalty
+ clipping_penalty_total += clipping_penalty
+ elif pv_forecast_peak_step and clipping_limit_step > 0 and clipping_cost_weight > 0:
+ peak_pv = pv_forecast_peak_step.get(minute, 0)
+ if peak_pv > clipping_limit_step:
+ potential_clip = peak_pv - clipping_limit_step
+ # How much could the battery absorb right now?
+ battery_headroom = max(soc_max - soc, 0) * battery_loss
+ # Cap by max charge rate (conservative — doesn't account for curve, but safe)
+ max_charge_step = battery_rate_max_charge * battery_rate_max_scaling * step
+ absorbable = min(battery_headroom, max_charge_step)
+ unmitigated_clip = max(potential_clip - absorbable, 0)
+
+ if absorbable > 0:
+ # Actually route the absorbed free solar into the battery state of charge
+ # taking into account the battery's charging efficiency loss
+ soc += absorbable * battery_loss
+
+ if unmitigated_clip > 0:
+ clipping_penalty = unmitigated_clip * export_rate * clipping_cost_weight
+ metric += clipping_penalty
+ clipping_penalty_total += clipping_penalty
+
# Count PV kWh
pv_kwh += pv_now
diff --git a/apps/predbat/solcast.py b/apps/predbat/solcast.py
index c4d743057..7c6aa103b 100644
--- a/apps/predbat/solcast.py
+++ b/apps/predbat/solcast.py
@@ -74,6 +74,10 @@ def initialize(
pv_forecast_tomorrow,
pv_forecast_d3,
pv_forecast_d4,
+ pv_clearsky_today,
+ pv_clearsky_tomorrow,
+ pv_clearsky_d3,
+ pv_clearsky_d4,
pv_scaling,
open_meteo_forecast,
open_meteo_forecast_max_age,
@@ -90,6 +94,10 @@ def initialize(
self.pv_forecast_tomorrow = pv_forecast_tomorrow
self.pv_forecast_d3 = pv_forecast_d3
self.pv_forecast_d4 = pv_forecast_d4
+ self.pv_clearsky_today = pv_clearsky_today
+ self.pv_clearsky_tomorrow = pv_clearsky_tomorrow
+ self.pv_clearsky_d3 = pv_clearsky_d3
+ self.pv_clearsky_d4 = pv_clearsky_d4
self.pv_scaling = pv_scaling
self.open_meteo_forecast = open_meteo_forecast
self.open_meteo_forecast_max_age = open_meteo_forecast_max_age
@@ -255,21 +263,22 @@ def convert_azimuth(self, az):
async def download_open_meteo_ensemble_data(self, lat, lon, tilt, az, kwp, system_loss):
"""
- Download Open-Meteo ensemble data for P10 solar estimate.
- Returns a dict mapping ISO timestamp strings to P10 kW values.
+ Download Open-Meteo ensemble data for P10 and P90 solar estimates.
+ Returns a tuple of (p10_dict, p90_dict) mapping ISO timestamp strings to kW values.
"""
url = "https://ensemble-api.open-meteo.com/v1/ensemble?models=icon_seamless&latitude={lat}&longitude={lon}&hourly=global_tilted_irradiance&tilt={tilt}&azimuth={az}&forecast_days=4&timezone=UTC".format(lat=lat, lon=lon, tilt=tilt, az=az)
data = await self.cache_get_url(url, params={}, max_age=self.open_meteo_forecast_max_age * 60)
if not data:
- return {}
+ return {}, {}
hourly = data.get("hourly", {})
times = hourly.get("time", [])
member_keys = [k for k in hourly if k.startswith("global_tilted_irradiance_member")]
if not member_keys or not times:
- return {}
+ return {}, {}
- result = {}
+ result_p10 = {}
+ result_p90 = {}
for idx, ts in enumerate(times):
values = []
for k in member_keys:
@@ -277,13 +286,17 @@ async def download_open_meteo_ensemble_data(self, lat, lon, tilt, az, kwp, syste
if val is not None:
values.append(val)
if not values:
- result[ts] = 0.0
+ result_p10[ts] = 0.0
+ result_p90[ts] = 0.0
continue
values.sort()
p10_idx = max(0, math.ceil(len(values) * 0.10) - 1)
+ p90_idx = min(len(values) - 1, math.floor(len(values) * 0.90))
gti_p10 = values[p10_idx]
- result[ts] = dp4((gti_p10 / 1000.0) * kwp * (1.0 - system_loss))
- return result
+ gti_p90 = values[p90_idx]
+ result_p10[ts] = dp4((gti_p10 / 1000.0) * kwp * (1.0 - system_loss))
+ result_p90[ts] = dp4((gti_p90 / 1000.0) * kwp * (1.0 - system_loss))
+ return result_p10, result_p90
async def download_open_meteo_data(self, configs=None):
"""
@@ -341,6 +354,7 @@ async def download_open_meteo_data(self, configs=None):
hourly = data.get("hourly", {})
times = hourly.get("time", [])
gti_values = hourly.get("global_tilted_irradiance", [])
+ cs_gti_values = hourly.get("clear_sky_gti", [])
temp_values = hourly.get("temperature_2m", [])
wind_values = hourly.get("wind_speed_10m", [])
@@ -348,12 +362,12 @@ async def download_open_meteo_data(self, configs=None):
self.log("Warn: SolarAPI: Open-Meteo data for lat {} lon {} has no hourly data".format(lat, lon))
continue
- ensemble_p10 = await self.download_open_meteo_ensemble_data(lat, lon, tilt, az, kwp, system_loss)
+ ensemble_p10, ensemble_p90 = await self.download_open_meteo_ensemble_data(lat, lon, tilt, az, kwp, system_loss)
# Pass 1: compute instantaneous kW at each UTC timestamp sample.
# Open-Meteo returns point-in-time irradiance (W/m²) at the start of each hour,
# so we must integrate over the period rather than treating the sample as the period energy.
- instant_kw = {} # datetime stamp -> (pv50_kw, pv10_kw)
+ instant_kw = {} # datetime stamp -> (pv50_kw, pv10_kw, pv_cs_kw)
instant_stamps = []
for idx, ts in enumerate(times):
if idx >= len(gti_values):
@@ -370,15 +384,24 @@ async def download_open_meteo_data(self, configs=None):
# Cap at 1.1 (10% above STC) to prevent unrealistic gains at very cold temperatures.
eta_temp = max(0.5, min(1.1, 1.0 - 0.004 * (t_cell - 25.0)))
pv50_inst = dp4((gti / 1000.0) * kwp * eta_temp * (1.0 - system_loss))
+
raw_p10 = ensemble_p10.get(ts)
- # ensemble_p10 was computed without temperature derating; apply eta_temp now
+ raw_p90 = ensemble_p90.get(ts)
+ # ensemble_p10/p90 were computed without temperature derating; apply eta_temp now
pv10_inst = dp4(min(raw_p10 * eta_temp, pv50_inst) if raw_p10 is not None else pv50_inst * 0.7)
+
+ if idx < len(cs_gti_values) and cs_gti_values[idx] is not None:
+ cs_gti = cs_gti_values[idx]
+ pv_cs_inst = dp4((cs_gti / 1000.0) * kwp * eta_temp * (1.0 - system_loss))
+ else:
+ # Fallback to P90 ensemble if Clear Sky GTI is unavailable (which is always true for Open-Meteo right now)
+ pv_cs_inst = dp4(max(raw_p90 * eta_temp, pv50_inst) if raw_p90 is not None else pv50_inst * 1.3)
try:
stamp = datetime.strptime(ts, "%Y-%m-%dT%H:%M")
stamp = stamp.replace(tzinfo=pytz.utc)
except (ValueError, TypeError):
continue
- instant_kw[stamp] = (pv50_inst, pv10_inst)
+ instant_kw[stamp] = (pv50_inst, pv10_inst, pv_cs_inst)
instant_stamps.append(stamp)
# Pass 2: trapezoidal integration — energy over [T, T+1h] = 0.5*(kW_at_T + kW_at_T+1h).
@@ -389,21 +412,24 @@ async def download_open_meteo_data(self, configs=None):
next_stamp = instant_stamps[i + 1]
if (next_stamp - stamp) != timedelta(hours=1):
continue
- pv50_start, pv10_start = instant_kw[stamp]
- pv50_end, pv10_end = instant_kw[next_stamp]
+ pv50_start, pv10_start, pv_cs_start = instant_kw[stamp]
+ pv50_end, pv10_end, pv_cs_end = instant_kw[next_stamp]
pv50 = dp4(0.5 * (pv50_start + pv50_end))
pv10 = dp4(0.5 * (pv10_start + pv10_end))
+ pv_cs = dp4(0.5 * (pv_cs_start + pv_cs_end))
# Apply per-month site shading correction from Google Solar API if available
if shading_factors and len(shading_factors) == 12:
shading_month = shading_factors[stamp.month - 1]
pv50 = dp4(pv50 * shading_month)
pv10 = dp4(pv10 * shading_month)
+ pv_cs = dp4(pv_cs * shading_month)
- data_item = {"period_start": stamp.strftime(TIME_FORMAT), "pv_estimate": pv50, "pv_estimate10": pv10}
+ data_item = {"period_start": stamp.strftime(TIME_FORMAT), "pv_estimate": pv50, "pv_estimate10": pv10, "pv_clearsky": pv_cs}
if stamp in period_data:
period_data[stamp]["pv_estimate"] = dp4(period_data[stamp]["pv_estimate"] + pv50)
period_data[stamp]["pv_estimate10"] = dp4(period_data[stamp]["pv_estimate10"] + pv10)
+ period_data[stamp]["pv_clearsky"] = dp4(period_data[stamp]["pv_clearsky"] + pv_cs)
else:
period_data[stamp] = data_item
@@ -658,12 +684,14 @@ async def download_solcast_data(self):
pv50 = forecast.get("pv_estimate", 0) / 60 * period_minutes
pv10 = forecast.get("pv_estimate10", forecast.get("pv_estimate", 0)) / 60 * period_minutes
pv90 = forecast.get("pv_estimate90", forecast.get("pv_estimate", 0)) / 60 * period_minutes
+ pv_cs = forecast.get("clearsky_estimate", pv90) / 60 * period_minutes
- data_item = {"period_start": period_start_stamp.strftime(TIME_FORMAT), "pv_estimate": pv50, "pv_estimate10": pv10, "pv_estimate90": pv90}
+ data_item = {"period_start": period_start_stamp.strftime(TIME_FORMAT), "pv_estimate": pv50, "pv_estimate10": pv10, "pv_estimate90": pv90, "pv_clearsky": pv_cs}
if period_start_stamp in period_data:
period_data[period_start_stamp]["pv_estimate"] += pv50
period_data[period_start_stamp]["pv_estimate10"] += pv10
period_data[period_start_stamp]["pv_estimate90"] += pv90
+ period_data[period_start_stamp]["pv_clearsky"] += pv_cs
else:
period_data[period_start_stamp] = data_item
@@ -744,17 +772,20 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
total_left_today10 = 0
total_left_today90 = 0
total_left_todayCL = 0
+ total_left_todayCS = 0
forecast_day = {}
total_day = {}
total_day10 = {}
total_day90 = {}
total_dayCL = {}
+ total_dayCS = {}
days = 0
for day in range(7):
total_day[day] = 0
total_day10[day] = 0
total_day90[day] = 0
total_dayCL[day] = 0
+ total_dayCS[day] = 0
forecast_day[day] = []
midnight_today = self.midnight_utc
@@ -765,6 +796,7 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
power_now10 = 0
power_now90 = 0
power_nowCL = 0
+ power_nowCS = 0
point_gap = period
for entry in pv_forecast_data:
@@ -782,6 +814,7 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
total_day10[day] = 0
total_day90[day] = 0
total_dayCL[day] = 0
+ total_dayCS[day] = 0
forecast_day[day] = []
days = max(days, day + 1)
@@ -789,22 +822,26 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
pv_estimate10 = entry.get("pv_estimate10", pv_estimate)
pv_estimate90 = entry.get("pv_estimate90", pv_estimate)
pv_estimateCL = entry.get("pv_estimateCL", pv_estimate)
+ pv_clearsky = entry.get("pv_clearsky", pv_estimate90)
pv_estimate /= divide_by
pv_estimate10 /= divide_by
pv_estimate90 /= divide_by
pv_estimateCL /= divide_by
+ pv_clearsky /= divide_by
total_day[day] += pv_estimate
total_day10[day] += pv_estimate10
total_day90[day] += pv_estimate90
total_dayCL[day] += pv_estimateCL
+ total_dayCS[day] += pv_clearsky
if day == 0 and this_point > now:
total_left_today += pv_estimate
total_left_today10 += pv_estimate10
total_left_today90 += pv_estimate90
total_left_todayCL += pv_estimateCL
+ total_left_todayCS += pv_clearsky
next_point = this_point + timedelta(minutes=point_gap)
if this_point <= now and next_point > now:
@@ -812,6 +849,7 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
power_now10 = pv_estimate10 * power_scale
power_now90 = pv_estimate90 * power_scale
power_nowCL = pv_estimateCL * power_scale
+ power_nowCS = pv_clearsky * power_scale
# Add this slot into the total left today but scaled for the time since this point
if day == 0:
@@ -820,6 +858,7 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
total_left_today10 += pv_estimate10 * left_this_slot_scale
total_left_today90 += pv_estimate90 * left_this_slot_scale
total_left_todayCL += pv_estimateCL * left_this_slot_scale
+ total_left_todayCS += pv_clearsky * left_this_slot_scale
fentry = {
"period_start": entry["period_start"],
@@ -827,6 +866,7 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
"pv_estimate10": dp2(pv_estimate10 * power_scale),
"pv_estimate90": dp2(pv_estimate90 * power_scale),
"pv_estimateCL": dp2(pv_estimateCL * power_scale),
+ "pv_clearsky": dp2(pv_clearsky * power_scale),
}
forecast_day[day].append(fentry)
@@ -836,15 +876,17 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
for day in range(days):
if day == 0:
self.log(
- "SolarAPI: PV Forecast for today is {} ({} 10%, {} 90%, {} calibrated) kWh, and PV left today is {} ({} 10%, {} 90%, {} calibrated) kWh".format(
+ "SolarAPI: PV Forecast for today is {} ({} 10%, {} 90%, {} calibrated, {} clearsky) kWh, and PV left today is {} ({} 10%, {} 90%, {} calibrated, {} clearsky) kWh".format(
dp2(total_day[day]),
dp2(total_day10[day]),
dp2(total_day90[day]),
dp2(total_dayCL[day]),
+ dp2(total_dayCS[day]),
dp2(total_left_today),
dp2(total_left_today10),
dp2(total_left_today90),
dp2(total_left_todayCL),
+ dp2(total_left_todayCS),
)
)
self.dashboard_item(
@@ -860,10 +902,12 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
"total10": dp2(total_day10[day]),
"total90": dp2(total_day90[day]),
"totalCL": dp2(total_dayCL[day]),
+ "totalCS": dp2(total_dayCS[day]),
"remaining": dp2(total_left_today),
"remaining10": dp2(total_left_today10),
"remaining90": dp2(total_left_today90),
"remainingCL": dp2(total_left_todayCL),
+ "remainingCS": dp2(total_left_todayCS),
"detailedForecast": forecast_day[day],
},
app="solar",
@@ -881,18 +925,19 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
"now10": dp2(power_now10),
"now90": dp2(power_now90),
"nowCL": dp2(power_nowCL),
+ "nowCS": dp2(power_nowCS),
"remaining": dp2(total_left_today),
"remaining10": dp2(total_left_today10),
"remaining90": dp2(total_left_today90),
"remainingCL": dp2(total_left_todayCL),
+ "remainingCS": dp2(total_left_todayCS),
},
app="solar",
)
else:
day_name = "tomorrow" if day == 1 else "d{}".format(day)
day_name_long = day_name if day == 1 else "day {}".format(day)
- self.log("SolarAPI: PV Forecast for day {} is {} ({} 10%, {} 90%, {} calibrated) kWh".format(day_name, dp2(total_day[day]), dp2(total_day10[day]), dp2(total_day90[day]), dp2(total_dayCL[day])))
-
+ self.log("SolarAPI: PV Forecast for day {} is {} ({} 10%, {} 90%, {} calibrated, {} clearsky) kWh".format(day_name, dp2(total_day[day]), dp2(total_day10[day]), dp2(total_day90[day]), dp2(total_dayCL[day]), dp2(total_dayCS[day])))
self.dashboard_item(
"sensor." + self.prefix + "_pv_" + day_name,
state=dp2(total_dayCL[day] if calibration_on else total_day[day]),
@@ -906,6 +951,7 @@ def publish_pv_stats(self, pv_forecast_data, divide_by, period):
"total10": dp2(total_day10[day]),
"total90": dp2(total_day90[day]),
"totalCL": dp2(total_dayCL[day]),
+ "totalCS": dp2(total_dayCS[day]),
"detailedForecast": forecast_day[day],
},
app="solar",
@@ -1142,6 +1188,7 @@ def pv_calibration(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_d
pv_estimateCL = {}
pv_estimate10 = {}
pv_estimate90 = {}
+ pv_historical = {}
# The after scaling cap will be applied, but remember that the input data is
# When we have a valid observed peak (from history or forecast history) cap to the lower of
# the inverter rating and that observed peak. With no valid history (e.g. all days excluded
@@ -1153,6 +1200,10 @@ def pv_calibration(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_d
capped_data = min(max_kwh_cap, observed_cap)
else:
capped_data = max_kwh_cap
+
+ # Historical max curve
+ peak_hist_avg = max(pv_power_hist_by_slot.values()) if pv_power_hist_by_slot else 0
+ hist_max_scaling = max_pv_power_hist / peak_hist_avg if peak_hist_avg > 0 else 1.0
for minute in range(0, max(pv_forecast_minute.keys()) + 1, self.plan_interval_minutes):
pv_value = 0
for offset in range(0, self.plan_interval_minutes, 1):
@@ -1162,6 +1213,10 @@ def pv_calibration(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_d
pv_estimate10[minute] = dp4(min(pv_value * worst_day_scaling, capped_data))
pv_estimate90[minute] = dp4(min(pv_value * best_day_scaling, capped_data))
+ slot = (int(minute / self.plan_interval_minutes) * self.plan_interval_minutes) % (24 * 60)
+ pv_max = pv_power_hist_by_slot.get(slot, 0) * hist_max_scaling
+ pv_historical[minute] = dp4(min(pv_max / 60 * self.plan_interval_minutes, capped_data))
+
for entry in pv_forecast_data:
period_start = entry.get("period_start", "")
if period_start:
@@ -1175,9 +1230,11 @@ def pv_calibration(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_d
calibrated = 0
calibrated10 = 0
calibrated90 = 0
+ calibratedMAX = 0
has_calibrated = False
has_calibrated10 = False
has_calibrated90 = False
+ has_calibratedMAX = False
for i in range(slots_per_period):
s = slot + i * self.plan_interval_minutes
v = pv_estimateCL.get(s, None)
@@ -1192,10 +1249,16 @@ def pv_calibration(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_d
if v90 is not None:
calibrated90 += v90
has_calibrated90 = True
+ vMAX = pv_historical.get(s, None)
+ if vMAX is not None:
+ calibratedMAX += vMAX
+ has_calibratedMAX = True
# When we store the data we have to reverse the divide_by factor
if has_calibrated:
entry["pv_estimateCL"] = calibrated * divide_by
+ if has_calibratedMAX:
+ entry["pv_historical"] = calibratedMAX * divide_by
if create_pv10 and has_calibrated10:
entry["pv_estimate10"] = calibrated10 * divide_by
if create_pv10 and has_calibrated90:
@@ -1212,26 +1275,54 @@ def pv_calibration(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_d
# Do we use calibrated or raw data?
if self.get_arg("metric_pv_calibration_enable", default=True):
self.log("SolarAPI: PV Calibration: Using calibrated PV data")
- return pv_forecast_minute_adjusted, pv_forecast_minute10, pv_forecast_data
+ return pv_forecast_minute_adjusted, pv_forecast_minute10, pv_forecast_data, pv_historical
else:
- return pv_forecast_minute, pv_forecast_minute10, pv_forecast_data
+ return pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, pv_historical
- def pack_and_store_forecast(self, pv_forecast_minute, pv_forecast_minute10):
+ def pack_and_store_forecast(self, pv_forecast_minute, pv_forecast_minute10, pv_forecast_minute90=None, pv_clearsky_minute=None, pv_max_minute=None):
pv_forecast_pack = {}
pv_forecast_pack10 = {}
-
+ pv_forecast_pack90 = {}
+ pv_forecast_pack_clearsky = {}
+ pv_forecast_pack_historical = {}
prev_value = -1
prev_value10 = -1
+ prev_value90 = -1
+ prev_value_clearsky = -1
+ prev_value_historical = -1
+
+ # Pre-fill dictionaries to ensure interpolation for packing
+ def get_interp_val(data, m):
+ if not data:
+ return 0
+ if m in data:
+ return data[m]
+ # Use plan_interval fallback
+ last_tick = (m // self.plan_interval_minutes) * self.plan_interval_minutes
+ return data.get(last_tick, 0)
for minute in range(0, self.forecast_days * 24 * 60):
current_value = dp4(pv_forecast_minute.get(minute, 0))
current_value10 = dp4(pv_forecast_minute10.get(minute, 0))
+ current_value90 = dp4(get_interp_val(pv_forecast_minute90, minute))
+ current_value_clearsky = dp4(get_interp_val(pv_clearsky_minute, minute))
+ current_value_historical = dp4(get_interp_val(pv_max_minute, minute))
+
if current_value != prev_value:
pv_forecast_pack[minute] = current_value
prev_value = current_value
if current_value10 != prev_value10:
pv_forecast_pack10[minute] = current_value10
prev_value10 = current_value10
+ if current_value90 != prev_value90:
+ pv_forecast_pack90[minute] = current_value90
+ prev_value90 = current_value90
+ if current_value_clearsky != prev_value_clearsky:
+ pv_forecast_pack_clearsky[minute] = current_value_clearsky
+ prev_value_clearsky = current_value_clearsky
+ if current_value_historical != prev_value_historical:
+ pv_forecast_pack_historical[minute] = current_value_historical
+ prev_value_historical = current_value_historical
current_pv_power = dp4(pv_forecast_minute.get(self.minutes_now, 0))
@@ -1244,6 +1335,9 @@ def pack_and_store_forecast(self, pv_forecast_minute, pv_forecast_minute10):
"relative_time": self.midnight_utc.strftime(TIME_FORMAT),
"forecast": pv_forecast_pack,
"forecast10": pv_forecast_pack10,
+ "forecast90": pv_forecast_pack90,
+ "forecast_clearsky": pv_forecast_pack_clearsky,
+ "forecast_historical": pv_forecast_pack_historical,
"unit_of_measurement": "kW",
"device_class": "power",
"state_class": "measurement",
@@ -1265,7 +1359,9 @@ async def fetch_pv_forecast(self):
max_kwh = 9999
using_ha_data = False
- if self.forecast_solar:
+ pv_forecast_primary = self.get_arg("pv_forecast_primary", "auto", indirect=False)
+
+ if (pv_forecast_primary == "forecast_solar") or (pv_forecast_primary == "auto" and self.forecast_solar):
self.log("SolarAPI: Obtaining solar forecast from Forecast Solar API")
pv_forecast_data, max_kwh = await self.download_forecast_solar_data()
divide_by = 30.0
@@ -1274,12 +1370,12 @@ async def fetch_pv_forecast(self):
self.log("SolarAPI: Forecast Solar returned no data, falling back to Open-Meteo backup")
backup_configs = self.open_meteo_forecast if self.open_meteo_forecast else self.forecast_solar
pv_forecast_data, max_kwh = await self.download_open_meteo_data(configs=backup_configs)
- elif self.open_meteo_forecast:
+ elif (pv_forecast_primary == "openmeteo") or (pv_forecast_primary == "auto" and self.open_meteo_forecast and self.get_arg("clipping_clearsky_source", "auto", indirect=False) != "openmeteo"):
self.log("SolarAPI: Obtaining solar forecast from Open-Meteo API")
pv_forecast_data, max_kwh = await self.download_open_meteo_data()
divide_by = 30.0
create_pv10 = True
- elif self.solcast_host and self.solcast_api_key:
+ elif (pv_forecast_primary == "solcast_api") or (pv_forecast_primary == "auto" and self.solcast_host and self.solcast_api_key):
self.log("SolarAPI: Obtaining solar forecast from Solcast API")
pv_forecast_data = await self.download_solcast_data()
divide_by = 30.0
@@ -1315,6 +1411,47 @@ async def fetch_pv_forecast(self):
self.log("SolarAPI: PV Forecast today adds up to {} kWh, and total sensors add up to {} kWh, factor is {}".format(pv_forecast_total_data, pv_forecast_total_sensor, factor))
if pv_forecast_data:
+ # Optional overlay of ClearSky data from a secondary source
+ clipping_clearsky_source = self.get_arg("clipping_clearsky_source", "auto", indirect=False)
+ if clipping_clearsky_source == "openmeteo" and self.open_meteo_forecast:
+ self.log("SolarAPI: Overlaying ClearSky data from Open-Meteo API")
+ om_data, _ = await self.download_open_meteo_data()
+ if om_data:
+ om_lookup = {item["period_start"]: item.get("pv_clearsky", item.get("pv_estimate90", 0)) for item in om_data}
+ for item in pv_forecast_data:
+ ts = item.get("period_start")
+ if ts in om_lookup:
+ item["pv_clearsky"] = om_lookup[ts]
+ elif clipping_clearsky_source == "solcast_api" and self.solcast_api_key:
+ self.log("SolarAPI: Overlaying ClearSky data from Solcast API")
+ sol_data = await self.download_solcast_data()
+ if sol_data:
+ sol_lookup = {item["period_start"]: item.get("pv_clearsky", item.get("pv_estimate90", 0)) for item in sol_data}
+ for item in pv_forecast_data:
+ ts = item.get("period_start")
+ if ts in sol_lookup:
+ item["pv_clearsky"] = sol_lookup[ts]
+ elif clipping_clearsky_source == "ha_solcast_clearsky":
+ self.log("SolarAPI: Overlaying ClearSky data from Home Assistant integration")
+ ha_data = []
+ for argname in ["pv_clearsky_today", "pv_clearsky_tomorrow", "pv_clearsky_d3", "pv_clearsky_d4"]:
+ entity_id = getattr(self, argname, None)
+ if entity_id:
+ data, _, _ = self.fetch_pv_datapoints(argname, entity_id)
+ if data:
+ ha_data += data
+ if ha_data:
+ # Apply factor and divide_by logic like the main PV fetcher
+ ha_lookup = {item["period_start"]: float(item.get("pv_estimate", 0.0)) for item in ha_data}
+ for item in pv_forecast_data:
+ ts = item.get("period_start")
+ if ts in ha_lookup:
+ item["pv_clearsky"] = ha_lookup[ts]
+ elif clipping_clearsky_source == "pv90_scaled":
+ self.log("SolarAPI: Overlaying ClearSky data explicitly disabled, PV90 will be scaled and used as base for clipping mitigation")
+ elif clipping_clearsky_source not in ["auto", "base", ""]:
+ self.log("Warn: SolarAPI: clipping_clearsky_source '{}' not configured properly, using base data".format(clipping_clearsky_source))
+
# Detect the actual period of the forecast data (e.g. 15 or 30 minutes)
# by examining the time difference between consecutive entries.
# This ensures 15-minute resolution data is handled correctly.
@@ -1342,6 +1479,12 @@ async def fetch_pv_forecast(self):
if period != 30:
self.log("SolarAPI: PV Forecast data has {} minute resolution, adjusting calculations".format(period))
+ # Universal fallback for ClearSky data
+ # Ensure every item has a clearsky value before we pass to minute_data
+ for item in pv_forecast_data:
+ if "pv_clearsky" not in item:
+ item["pv_clearsky"] = item.get("pv_estimate90", item.get("pv_estimate", 0.0))
+
pv_forecast_minute, _ = minute_data(
pv_forecast_data,
self.forecast_days,
@@ -1364,11 +1507,33 @@ async def fetch_pv_forecast(self):
scale=self.pv_scaling,
spreading=period,
)
+ pv_forecast_minute90, _ = minute_data(
+ pv_forecast_data,
+ self.forecast_days,
+ self.midnight_utc,
+ "pv_estimate90",
+ "period_start",
+ backwards=False,
+ divide_by=divide_by,
+ scale=self.pv_scaling,
+ spreading=period,
+ )
+ pv_clearsky_minute, _ = minute_data(
+ pv_forecast_data,
+ self.forecast_days,
+ self.midnight_utc,
+ "pv_clearsky",
+ "period_start",
+ backwards=False,
+ divide_by=divide_by,
+ scale=self.pv_scaling,
+ spreading=period,
+ )
# Run calibration on the data
- pv_forecast_minute, pv_forecast_minute10, pv_forecast_data = self.pv_calibration(pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, create_pv10, divide_by / period, max_kwh, self.forecast_days, period)
+ pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, pv_max_minute = self.pv_calibration(pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, create_pv10, divide_by / period, max_kwh, self.forecast_days, period)
self.publish_pv_stats(pv_forecast_data, divide_by / period, period)
- self.pack_and_store_forecast(pv_forecast_minute, pv_forecast_minute10)
+ self.pack_and_store_forecast(pv_forecast_minute, pv_forecast_minute10, pv_forecast_minute90, pv_clearsky_minute, pv_max_minute)
self.update_success_timestamp()
self.last_fetched_timestamp = self.now_utc_exact
else:
diff --git a/apps/predbat/tests/compare_clipping.py b/apps/predbat/tests/compare_clipping.py
new file mode 100644
index 000000000..994e2fc7c
--- /dev/null
+++ b/apps/predbat/tests/compare_clipping.py
@@ -0,0 +1,261 @@
+# -----------------------------------------------------------------------------
+# Clipping Approach Comparison: Cloud-Model Penalty vs Baseline
+#
+# Runs identical scenarios with and without clipping_buffer_enable and compares
+# the optimizer metrics, final SoC, and clipping amounts.
+#
+# Usage: python apps\predbat\tests\compare_clipping.py
+# -----------------------------------------------------------------------------
+# fmt: off
+# pylint: disable=line-too-long
+
+import sys
+import os
+import time
+
+# Add parent dirs to path
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
+
+from tests.test_infra import reset_rates, reset_inverter, simple_scenario
+
+
+# ---------------------------------------------------------------------------
+# Scenario definitions: hand-crafted clipping challenge cases
+# ---------------------------------------------------------------------------
+
+SCENARIOS = [
+ {
+ "name": "Clear Sky Peak Clipping",
+ "description": "7kWp panels, 5kW inverter limit, sunny day. Peak PV exceeds inverter limit. Battery starts at 80%.",
+ "pv_kw": 7.0, # peak generation in kW (exceeds inverter limit)
+ "load_kw": 0.5, # constant household load
+ "inverter_limit_kw": 5.0, # AC inverter limit
+ "export_limit_kw": 10.0, # no export restriction
+ "battery_size_kwh": 9.5, # typical UK hybrid battery
+ "battery_soc_percent": 80, # already mostly charged from overnight
+ "battery_rate_kw": 3.0, # charge/discharge rate
+ "import_rate_p": 25.0, # standard import rate
+ "export_rate_p": 15.0, # SEG export rate
+ "charge_target_percent": 100, # optimizer wants to charge to 100%
+ "hybrid": True, # DC-coupled hybrid inverter
+ },
+ {
+ "name": "Cloudy Day - Intermittent Peaks",
+ "description": "6kWp panels, 3.6kW inverter. Cloud model would show spikes. Battery starts at 90% from overnight charge.",
+ "pv_kw": 6.0,
+ "load_kw": 0.3,
+ "inverter_limit_kw": 3.6,
+ "export_limit_kw": 10.0,
+ "battery_size_kwh": 9.5,
+ "battery_soc_percent": 90,
+ "battery_rate_kw": 2.6,
+ "import_rate_p": 25.0,
+ "export_rate_p": 15.0,
+ "charge_target_percent": 100,
+ "hybrid": True,
+ },
+ {
+ "name": "Export Limited System",
+ "description": "12kWp array, 5kW DNO export limit. Large system hitting export cap, not inverter limit.",
+ "pv_kw": 4.0, # after inverter, 4kW net
+ "load_kw": 0.5,
+ "inverter_limit_kw": 10.0, # large inverter
+ "export_limit_kw": 3.0, # tight DNO export limit
+ "battery_size_kwh": 13.5,
+ "battery_soc_percent": 75,
+ "battery_rate_kw": 3.6,
+ "import_rate_p": 25.0,
+ "export_rate_p": 12.0,
+ "charge_target_percent": 100,
+ "hybrid": False, # AC-coupled
+ },
+ {
+ "name": "Negative Import Rates",
+ "description": "Peak PV + negative import rates. Should the optimizer charge despite clipping risk?",
+ "pv_kw": 6.0,
+ "load_kw": 0.5,
+ "inverter_limit_kw": 5.0,
+ "export_limit_kw": 10.0,
+ "battery_size_kwh": 9.5,
+ "battery_soc_percent": 50,
+ "battery_rate_kw": 3.0,
+ "import_rate_p": -5.0, # NEGATIVE import rate (paid to consume)
+ "export_rate_p": 4.0, # low export rate
+ "charge_target_percent": 100,
+ "hybrid": True,
+ },
+ {
+ "name": "Small Battery, Big Array",
+ "description": "4.8kWh battery with 10kW array and 5kW inverter. Battery fills quickly, lots of clipping.",
+ "pv_kw": 10.0,
+ "load_kw": 0.8,
+ "inverter_limit_kw": 5.0,
+ "export_limit_kw": 10.0,
+ "battery_size_kwh": 4.8,
+ "battery_soc_percent": 60,
+ "battery_rate_kw": 2.4,
+ "import_rate_p": 30.0,
+ "export_rate_p": 15.0,
+ "charge_target_percent": 100,
+ "hybrid": True,
+ },
+ {
+ "name": "No Clipping Risk (Control)",
+ "description": "3kW panels with 5kW inverter. PV never exceeds limit. Penalty should not affect plan.",
+ "pv_kw": 3.0,
+ "load_kw": 0.5,
+ "inverter_limit_kw": 5.0,
+ "export_limit_kw": 10.0,
+ "battery_size_kwh": 9.5,
+ "battery_soc_percent": 50,
+ "battery_rate_kw": 3.0,
+ "import_rate_p": 25.0,
+ "export_rate_p": 15.0,
+ "charge_target_percent": 100,
+ "hybrid": True,
+ },
+]
+
+
+def run_comparison(my_predbat):
+ """Run all clipping scenarios with and without the penalty, and compare."""
+
+ print("\n" + "=" * 100)
+ print("CLIPPING APPROACH COMPARISON: Cloud-Model Penalty vs Baseline")
+ print("=" * 100)
+
+ results = []
+
+ for scenario in SCENARIOS:
+ print("\n" + "-" * 80)
+ print("Scenario: {}".format(scenario["name"]))
+ print(" {}".format(scenario["description"]))
+ print("-" * 80)
+
+ # Common setup
+ reset_inverter(my_predbat)
+ reset_rates(my_predbat, scenario["import_rate_p"], scenario["export_rate_p"])
+
+ common_args = {
+ "pv_amount": scenario["pv_kw"],
+ "load_amount": scenario["load_kw"],
+ "inverter_limit": scenario["inverter_limit_kw"],
+ "export_limit": scenario["export_limit_kw"],
+ "battery_size": scenario["battery_size_kwh"],
+ "battery_soc": scenario["battery_size_kwh"] * scenario["battery_soc_percent"] / 100.0,
+ "battery_rate_max_charge": scenario["battery_rate_kw"],
+ "with_battery": True,
+ "hybrid": scenario["hybrid"],
+ "charge": scenario["battery_size_kwh"] * scenario["charge_target_percent"] / 100.0,
+ "save": "best",
+ "return_prediction_handle": True,
+ "ignore_failed": True,
+ "quiet": True,
+ }
+
+ # --- Run A: Baseline (no clipping penalty) ---
+ t_start = time.perf_counter()
+ failed_a, pred_a = simple_scenario(
+ scenario["name"] + " [baseline]",
+ my_predbat,
+ assert_final_metric=0,
+ assert_final_soc=0,
+ clipping_buffer_enable=False,
+ **common_args,
+ )
+ time_a = time.perf_counter() - t_start
+
+ # --- Run B: With clipping penalty ---
+ reset_inverter(my_predbat)
+ reset_rates(my_predbat, scenario["import_rate_p"], scenario["export_rate_p"])
+
+ t_start = time.perf_counter()
+ failed_b, pred_b = simple_scenario(
+ scenario["name"] + " [penalty]",
+ my_predbat,
+ assert_final_metric=0,
+ assert_final_soc=0,
+ clipping_buffer_enable=True,
+ clipping_cost_weight=1.0,
+ clipping_amplification=1.0,
+ **common_args,
+ )
+ time_b = time.perf_counter() - t_start
+
+ # Extract results
+ metric_a = round(pred_a.predict_metric_best[max(pred_a.predict_metric_best.keys())] / 100.0, 4) if pred_a.predict_metric_best else 0
+ metric_b = round(pred_b.predict_metric_best[max(pred_b.predict_metric_best.keys())] / 100.0, 4) if pred_b.predict_metric_best else 0
+ soc_a = round(list(pred_a.predict_soc.values())[-1], 2) if pred_a.predict_soc else 0
+ soc_b = round(list(pred_b.predict_soc.values())[-1], 2) if pred_b.predict_soc else 0
+ clipped_a = round(pred_a.predict_clipped_best[max(pred_a.predict_clipped_best.keys())], 2) if pred_a.predict_clipped_best else 0
+ clipped_b = round(pred_b.predict_clipped_best[max(pred_b.predict_clipped_best.keys())], 2) if pred_b.predict_clipped_best else 0
+
+ result = {
+ "name": scenario["name"],
+ "metric_baseline": metric_a,
+ "metric_penalty": metric_b,
+ "metric_diff": round(metric_b - metric_a, 4),
+ "soc_baseline": soc_a,
+ "soc_penalty": soc_b,
+ "clipped_baseline": clipped_a,
+ "clipped_penalty": clipped_b,
+ "time_baseline": round(time_a, 3),
+ "time_penalty": round(time_b, 3),
+ }
+ results.append(result)
+
+ print(" {:>20}: {:>10} {:>10} {:>10}".format("", "Baseline", "Penalty", "Diff"))
+ print(" {:>20}: {:>10.4f} {:>10.4f} {:>+10.4f}".format("Metric (£)", metric_a, metric_b, metric_b - metric_a))
+ print(" {:>20}: {:>10.2f} {:>10.2f} {:>+10.2f}".format("Final SoC (kWh)", soc_a, soc_b, soc_b - soc_a))
+ print(" {:>20}: {:>10.2f} {:>10.2f} {:>+10.2f}".format("Clipped (kWh)", clipped_a, clipped_b, clipped_b - clipped_a))
+ print(" {:>20}: {:>10.3f}s {:>9.3f}s {:>+9.3f}s".format("Runtime", time_a, time_b, time_b - time_a))
+
+ # Summary table
+ print("\n" + "=" * 100)
+ print("SUMMARY TABLE")
+ print("=" * 100)
+ header = "{:<35} {:>10} {:>10} {:>10} {:>10} {:>10} {:>10}".format(
+ "Scenario", "Met_Base", "Met_Pen", "Met_Diff", "Clip_Base", "Clip_Pen", "Time_Pen"
+ )
+ print(header)
+ print("-" * len(header))
+ for r in results:
+ print("{:<35} {:>10.4f} {:>10.4f} {:>+10.4f} {:>10.2f} {:>10.2f} {:>9.3f}s".format(
+ r["name"][:35],
+ r["metric_baseline"],
+ r["metric_penalty"],
+ r["metric_diff"],
+ r["clipped_baseline"],
+ r["clipped_penalty"],
+ r["time_penalty"],
+ ))
+
+ # Analysis
+ print("\n" + "=" * 100)
+ print("ANALYSIS")
+ print("=" * 100)
+ penalty_scenarios = [r for r in results if r["metric_diff"] > 0.001]
+ neutral_scenarios = [r for r in results if abs(r["metric_diff"]) <= 0.001]
+
+ print("Scenarios where penalty changes metric: {} / {}".format(len(penalty_scenarios), len(results)))
+ print("Scenarios where penalty is neutral: {} / {}".format(len(neutral_scenarios), len(results)))
+
+ if penalty_scenarios:
+ avg_diff = sum(r["metric_diff"] for r in penalty_scenarios) / len(penalty_scenarios)
+ print("Average metric increase (penalty scenarios): £{:+.4f}".format(avg_diff))
+ print(" -> This represents the clipping cost the optimizer now accounts for.")
+ print(" -> In a full optimizer run, this would cause it to reduce charge targets.")
+
+ avg_time_diff = sum(r["time_penalty"] - r["time_baseline"] for r in results) / len(results)
+ print("\nAverage compute time overhead: {:+.3f}s".format(avg_time_diff))
+
+ return results
+
+
+if __name__ == "__main__":
+ # Bootstrap a minimal predbat instance for testing
+ # Run from project root: python apps\predbat\tests\compare_clipping.py
+ from unit_test import create_predbat
+ my_predbat = create_predbat()
+ run_comparison(my_predbat)
diff --git a/apps/predbat/tests/test_clipping.py b/apps/predbat/tests/test_clipping.py
new file mode 100644
index 000000000..d42f9289d
--- /dev/null
+++ b/apps/predbat/tests/test_clipping.py
@@ -0,0 +1,141 @@
+# -----------------------------------------------------------------------------
+# Predbat Home Battery System
+# Copyright Trefor Southwell 2026 - All Rights Reserved
+# This application maybe used for personal use only and not for commercial use
+# -----------------------------------------------------------------------------
+# fmt off
+# pylint: disable=consider-using-f-string
+# pylint: disable=line-too-long
+# pylint: disable=attribute-defined-outside-init
+from tests.test_infra import reset_inverter
+
+
+def run_clipping_tests(my_predbat):
+ """
+ Tests for inject_clipping_export_windows method
+ """
+ failed = False
+ failed |= test_inject_aborts_if_disabled(my_predbat)
+ failed |= test_inject_aborts_if_empty_forecast(my_predbat)
+ failed |= test_inject_creates_contiguous_window(my_predbat)
+ failed |= test_inject_cleans_fragmented_windows(my_predbat)
+ return failed
+
+
+def setup(my_predbat):
+ reset_inverter(my_predbat)
+ my_predbat.clipping_buffer_enable = True
+ my_predbat.clipping_buffer_forecast_kwh = {}
+ my_predbat.minutes_now = 0
+ my_predbat.export_rate = {}
+ my_predbat.export_window_best = []
+ my_predbat.high_export_rates = []
+ # Adding log mock to avoid exceptions if not using MockBase
+ if not hasattr(my_predbat, "log"):
+ my_predbat.log = lambda x: print(x)
+ if not hasattr(my_predbat, "time_abs_str"):
+ my_predbat.time_abs_str = lambda x: str(x)
+
+
+def test_inject_aborts_if_disabled(my_predbat):
+ print("**** test_inject_aborts_if_disabled ****")
+ failed = False
+ setup(my_predbat)
+ my_predbat.clipping_buffer_enable = False
+ my_predbat.clipping_buffer_forecast_kwh = {720: 1.0}
+
+ my_predbat.inject_clipping_export_windows()
+
+ if len(my_predbat.export_window_best) > 0:
+ print("ERROR: Window was injected when clipping was disabled!")
+ failed = True
+
+ if not failed:
+ print("PASS")
+ return failed
+
+
+def test_inject_aborts_if_empty_forecast(my_predbat):
+ print("**** test_inject_aborts_if_empty_forecast ****")
+ failed = False
+ setup(my_predbat)
+ my_predbat.clipping_buffer_enable = True
+ my_predbat.clipping_buffer_forecast_kwh = {}
+
+ my_predbat.inject_clipping_export_windows()
+
+ if len(my_predbat.export_window_best) > 0:
+ print("ERROR: Window was injected when forecast was empty!")
+ failed = True
+
+ if not failed:
+ print("PASS")
+ return failed
+
+
+def test_inject_creates_contiguous_window(my_predbat):
+ print("**** test_inject_creates_contiguous_window ****")
+ failed = False
+ setup(my_predbat)
+ my_predbat.minutes_now = 240 # 04:00
+ # Peak from 13:00 to 14:00 (780 to 840)
+ my_predbat.clipping_buffer_forecast_kwh = {780: 1.0, 810: 2.0}
+
+ my_predbat.inject_clipping_export_windows()
+
+ if len(my_predbat.export_window_best) != 1:
+ print("ERROR: Expected exactly 1 window injected, got {}".format(len(my_predbat.export_window_best)))
+ return True
+
+ w = my_predbat.export_window_best[0]
+
+ if w["start"] != 240:
+ print("ERROR: Expected window start at 240, got {}".format(w["start"]))
+ failed = True
+
+ if w["end"] != 840:
+ print("ERROR: Expected window end at 840, got {}".format(w["end"]))
+ failed = True
+
+ if not failed:
+ print("PASS")
+ return failed
+
+
+def test_inject_cleans_fragmented_windows(my_predbat):
+ print("**** test_inject_cleans_fragmented_windows ****")
+ failed = False
+ setup(my_predbat)
+ my_predbat.minutes_now = 0
+ # Peak from 780 to 810
+ my_predbat.clipping_buffer_forecast_kwh = {
+ 780: 1.0,
+ }
+
+ # Inject intersecting fragmented windows
+ my_predbat.export_window_best = [
+ {"start": 30, "end": 50, "average": 0}, # Before morning_start (60) - should KEEP
+ {"start": 100, "end": 120, "average": 0}, # Inside the new window - should DROP
+ {"start": 700, "end": 800, "average": 0}, # Intersecting the new window - should DROP
+ {"start": 900, "end": 960, "average": 0}, # After peak_end - should KEEP
+ ]
+
+ my_predbat.inject_clipping_export_windows()
+
+ # We expect 3 windows: 2 kept + 1 injected
+ if len(my_predbat.export_window_best) != 3:
+ print("ERROR: Expected 3 windows (2 kept + 1 new), got {}".format(len(my_predbat.export_window_best)))
+ return True
+
+ starts = [w["start"] for w in my_predbat.export_window_best]
+ if 100 in starts or 700 in starts:
+ print("ERROR: Fragmented windows were not cleaned!")
+ failed = True
+
+ if 60 not in starts: # Injected window start
+ print("ERROR: Injected window not found!")
+ failed = True
+
+ if not failed:
+ print("PASS")
+ return failed
diff --git a/apps/predbat/tests/test_download.py b/apps/predbat/tests/test_download.py
index de386381c..6e6549959 100644
--- a/apps/predbat/tests/test_download.py
+++ b/apps/predbat/tests/test_download.py
@@ -257,8 +257,8 @@ def _test_compute_file_sha1(my_predbat):
Test Git blob SHA1 hash computation (matches GitHub's SHA)
"""
# Create a temporary file with known content
- with tempfile.NamedTemporaryFile(mode="w", delete=False) as f:
- f.write("test content\n")
+ with tempfile.NamedTemporaryFile(mode="wb", delete=False) as f:
+ f.write(b"test content\n")
temp_path = f.name
try:
diff --git a/apps/predbat/tests/test_fetch_pv_forecast.py b/apps/predbat/tests/test_fetch_pv_forecast.py
index a257150df..c33bcd67a 100644
--- a/apps/predbat/tests/test_fetch_pv_forecast.py
+++ b/apps/predbat/tests/test_fetch_pv_forecast.py
@@ -107,7 +107,7 @@ def test_fetch_pv_forecast_with_relative_time():
)
# Call fetch_pv_forecast
- pv_forecast_minute, pv_forecast_minute10 = fetch.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, _, _, _ = fetch.fetch_pv_forecast()
# With the corrected formula (target = stored_minute - offset), a +120 min offset
# (relative_time is 2 hours before midnight_utc) shifts data BACK by 120 minutes:
@@ -171,7 +171,7 @@ def test_fetch_pv_forecast_no_relative_time():
)
# Call fetch_pv_forecast
- pv_forecast_minute, pv_forecast_minute10 = fetch.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, _, _, _ = fetch.fetch_pv_forecast()
# With no relative_time, it should fall back to midnight_utc
# minute_offset = 0, so forecast data should map directly
@@ -226,7 +226,7 @@ def test_fetch_pv_forecast_invalid_relative_time():
)
# Call fetch_pv_forecast
- pv_forecast_minute, pv_forecast_minute10 = fetch.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, _, _, _ = fetch.fetch_pv_forecast()
# With invalid relative_time, it should fall back to midnight_utc
# minute_offset = 0
@@ -279,7 +279,7 @@ def test_fetch_pv_forecast_relative_time_same_as_midnight():
)
# Call fetch_pv_forecast
- pv_forecast_minute, pv_forecast_minute10 = fetch.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, _, _, _ = fetch.fetch_pv_forecast()
# minute_offset = 0 (same time), so data maps directly
assert pv_forecast_minute[0] == 0.0, f"Expected minute 0 to be 0.0, got {pv_forecast_minute[0]}"
@@ -339,7 +339,7 @@ def test_fetch_pv_forecast_previous_day():
)
# Call fetch_pv_forecast
- pv_forecast_minute, pv_forecast_minute10 = fetch.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, _, _, _ = fetch.fetch_pv_forecast()
# With the corrected formula (target = stored_minute - offset), a +1440 min offset
# maps yesterday's stored minutes to today-relative minutes:
@@ -408,7 +408,7 @@ def test_fetch_pv_forecast_negative_offset():
)
# Call fetch_pv_forecast
- pv_forecast_minute, pv_forecast_minute10 = fetch.fetch_pv_forecast()
+ pv_forecast_minute, pv_forecast_minute10, _, _, _ = fetch.fetch_pv_forecast()
# With the corrected formula (target = stored_minute - offset), a -60 min offset
# (relative_time is 1 hour AFTER midnight_utc) shifts data FORWARD by 60 minutes:
diff --git a/apps/predbat/tests/test_infra.py b/apps/predbat/tests/test_infra.py
index 715b04ea0..dde8636b3 100644
--- a/apps/predbat/tests/test_infra.py
+++ b/apps/predbat/tests/test_infra.py
@@ -354,6 +354,10 @@ def get_default_config(self):
"load_filter_modal": True,
"carbon_enable": False,
"carbon_metric": 0,
+ "clipping_buffer_enable": False,
+ "clipping_cost_weight": 1.0,
+ "clipping_amplification": 1.0,
+ "clipping_limit_override": 0,
"iboost_enable": False,
"iboost_gas": 4.0,
"iboost_gas_export": 4.0,
@@ -517,6 +521,10 @@ def reset_inverter(my_predbat):
my_predbat.set_export_window = True
my_predbat.set_charge_freeze = True
my_predbat.set_export_freeze = True
+ my_predbat.clipping_buffer_enable = False
+ my_predbat.clipping_cost_weight = 0
+ my_predbat.clipping_amplification = 1.0
+ my_predbat.clipping_limit_override = 0
def plot(name, prediction):
@@ -609,6 +617,10 @@ def simple_scenario(
calculate_export_on_pv=True,
assert_clipped=0,
pv_ac_limit=0,
+ clipping_buffer_enable=False,
+ clipping_cost_weight=0,
+ clipping_amplification=1.0,
+ clipping_limit_override=0,
):
"""
No PV, No Load
@@ -706,6 +718,12 @@ def simple_scenario(
my_predbat.inverter_can_charge_during_export = inverter_can_charge_during_export
my_predbat.charge_scaling10 = charge_scaling10
+ # Clipping peak penalty settings
+ my_predbat.clipping_buffer_enable = clipping_buffer_enable
+ my_predbat.clipping_cost_weight = clipping_cost_weight
+ my_predbat.clipping_amplification = clipping_amplification
+ my_predbat.clipping_limit_override = clipping_limit_override
+
if my_predbat.iboost_enable and (((not iboost_solar) and (not iboost_charging)) or iboost_smart):
my_predbat.iboost_plan = my_predbat.plan_iboost_smart()
else:
@@ -741,10 +759,36 @@ def simple_scenario(
my_predbat.num_cars = 0
my_predbat.car_charging_slots[0] = []
+ # Build peak PV step data for clipping tests
+ pv_peak_step = None
+ clipping_limit_eff = 0
+ if clipping_buffer_enable:
+ pv_peak_step = {k: v * clipping_amplification for k, v in pv_step.items()}
+ if clipping_limit_override > 0:
+ clipping_limit_eff = clipping_limit_override
+ else:
+ limits = []
+ if my_predbat.inverter_limit > 0:
+ limits.append(my_predbat.inverter_limit)
+ if my_predbat.export_limit > 0:
+ limits.append(my_predbat.export_limit)
+ if my_predbat.pv_ac_limit > 0:
+ limits.append(my_predbat.pv_ac_limit)
+ clipping_limit_eff = (min(limits) * 60.0) if limits else 0
+
if prediction_handle:
prediction = prediction_handle
else:
- prediction = Prediction(my_predbat, pv_step, pv10_step, load_step, load10_step)
+ prediction = Prediction(
+ my_predbat,
+ pv_step,
+ pv10_step,
+ load_step,
+ load10_step,
+ pv_forecast_peak_step=pv_peak_step,
+ clipping_limit=clipping_limit_eff,
+ clipping_cost_weight=clipping_cost_weight if clipping_buffer_enable else 0,
+ )
compute_charge_limit = False
if charge_limit_best is None:
diff --git a/apps/predbat/tests/test_load_ml.py b/apps/predbat/tests/test_load_ml.py
index c8eea6dd7..686a4b130 100644
--- a/apps/predbat/tests/test_load_ml.py
+++ b/apps/predbat/tests/test_load_ml.py
@@ -1854,7 +1854,7 @@ def get_state_wrapper(self, entity_id, default=None, attribute=None, refresh=Fal
def fetch_pv_forecast(self):
"""Mock fetch_pv_forecast - returns empty forecasts"""
- return {}, {}
+ return {}, {}, {}, {}, {}
def minute_data_import_export(self, days, now_utc, entity, scale=1.0, increment=False, smoothing=False, required_unit=None):
"""Mock minute_data_import_export - returns empty dict"""
diff --git a/apps/predbat/tests/test_model.py b/apps/predbat/tests/test_model.py
index ee362c001..26062c734 100644
--- a/apps/predbat/tests/test_model.py
+++ b/apps/predbat/tests/test_model.py
@@ -1892,6 +1892,84 @@ def run_model_tests(my_predbat):
# pv_ac_limit must NOT apply to hybrid inverters (PV is DC-coupled, clipping handled by inverter_limit)
failed |= simple_scenario("pv_ac_limit_hybrid_ignored", my_predbat, 0, 2.0, assert_final_metric=-export_rate * 24, assert_final_soc=24, with_battery=True, hybrid=True, pv_ac_limit=1.5, assert_clipped=0)
+ # ---- Clipping Peak Cost Penalty Tests ----
+ reset_rates(my_predbat, import_rate, export_rate)
+ reset_inverter(my_predbat)
+
+ # No penalty when peak PV is below the clipping limit
+ # 0.5kW PV, 1kW inverter limit => no clipping, metric is just export revenue
+ failed |= simple_scenario(
+ "clipping_peak_no_clip",
+ my_predbat,
+ 0,
+ 0.5,
+ assert_final_metric=-export_rate * 24 * 0.5,
+ assert_final_soc=0,
+ with_battery=False,
+ inverter_limit=1.0,
+ clipping_buffer_enable=True,
+ clipping_cost_weight=1.0,
+ )
+
+ # Penalty when peak PV exceeds inverter limit and no battery to absorb
+ # 2kW PV, 1kW inverter limit, no battery => with_battery=False means battery_rate_max_scaling=0
+ # AC-coupled PV isn't clipped by inverter_limit, so full 2kW is exported
+ # Without clipping penalty: metric = -export_rate * 24 * 2 = -240p
+ # With clipping penalty: extra cost added for the 1kW excess above inverter_limit
+ failed_no_penalty, pred_no_penalty = simple_scenario(
+ "clipping_peak_baseline",
+ my_predbat,
+ 0,
+ 2.0,
+ assert_final_metric=-export_rate * 24 * 2.0,
+ assert_final_soc=0,
+ with_battery=False,
+ inverter_limit=1.0,
+ clipping_buffer_enable=False,
+ return_prediction_handle=True,
+ )
+ failed |= failed_no_penalty
+
+ failed_with_penalty, pred_with_penalty = simple_scenario(
+ "clipping_peak_with_penalty",
+ my_predbat,
+ 0,
+ 2.0,
+ assert_final_metric=-export_rate * 24 * 2.0, # will differ due to penalty; checked below
+ assert_final_soc=0,
+ with_battery=False,
+ inverter_limit=1.0,
+ clipping_buffer_enable=True,
+ clipping_cost_weight=1.0,
+ return_prediction_handle=True,
+ ignore_failed=True,
+ )
+ # The penalty should make the metric less negative (higher) than without
+ metric_no_penalty = round(pred_no_penalty.predict_metric_best[max(pred_no_penalty.predict_metric_best.keys())] / 100.0, 2) if pred_no_penalty.predict_metric_best else 0
+ metric_with_penalty = round(pred_with_penalty.predict_metric_best[max(pred_with_penalty.predict_metric_best.keys())] / 100.0, 2) if pred_with_penalty.predict_metric_best else 0
+ if metric_with_penalty <= metric_no_penalty:
+ print("ERROR: clipping_peak_with_penalty metric {} should be > {} (penalty should increase metric)".format(metric_with_penalty, metric_no_penalty))
+ failed = True
+ else:
+ print("Run scenario clipping_peak_with_penalty: PASS (metric {} > baseline {})".format(metric_with_penalty, metric_no_penalty))
+
+ # No penalty when battery has headroom to absorb excess
+ # 2kW PV, 1kW inverter limit, but battery at 0% with 100kWh capacity => battery absorbs all excess
+ failed |= simple_scenario(
+ "clipping_peak_battery_absorbs",
+ my_predbat,
+ 0,
+ 2.0,
+ assert_final_metric=-export_rate * 24,
+ assert_final_soc=24,
+ with_battery=True,
+ battery_soc=0.0,
+ battery_size=100.0,
+ inverter_limit=1.0,
+ clipping_buffer_enable=True,
+ clipping_cost_weight=1.0,
+ )
+
if failed:
print("**** ERROR: Some Model tests failed ****")
return failed
diff --git a/apps/predbat/tests/test_open_meteo.py b/apps/predbat/tests/test_open_meteo.py
index fb9bec7c6..10e7da779 100644
--- a/apps/predbat/tests/test_open_meteo.py
+++ b/apps/predbat/tests/test_open_meteo.py
@@ -76,17 +76,17 @@ def create_mock_session(*args, **kwargs):
kwp = 3.0
system_loss = 0.0 # simplify: 0% loss so kW = GTI_kWm2 * kwp
with patch("solcast.aiohttp.ClientSession", side_effect=create_mock_session):
- result = run_async(test_api.solar.download_open_meteo_ensemble_data(51.5, -0.1, 35, 0, kwp, system_loss))
+ result_p10, result_p90 = run_async(test_api.solar.download_open_meteo_ensemble_data(51.5, -0.1, 35, 0, kwp, system_loss))
# For 3 members at 2025-06-15T12:00: [400, 450, 480] sorted
# p10_idx = max(0, int(3 * 0.1) - 1) = 0 -> gti_p10 = 400
# kW = (400 / 1000) * 3.0 * (1 - 0.0) = 1.2
expected_12 = round((400.0 / 1000.0) * kwp * (1.0 - system_loss), 4)
- if "2025-06-15T12:00" not in result:
- print("ERROR: Expected key '2025-06-15T12:00' in ensemble result")
+ if "2025-06-15T12:00" not in result_p10:
+ print("ERROR: Expected key '2025-06-15T12:00' in ensemble result_p10")
failed = True
- elif abs(result["2025-06-15T12:00"] - expected_12) > 0.001:
- print(f"ERROR: ensemble p10 at 12:00: expected {expected_12}, got {result['2025-06-15T12:00']}")
+ elif abs(result_p10["2025-06-15T12:00"] - expected_12) > 0.001:
+ print(f"ERROR: ensemble p10 at 12:00: expected {expected_12}, got {result_p10['2025-06-15T12:00']}")
failed = True
finally:
test_api.cleanup()
@@ -114,8 +114,8 @@ def create_mock_session(*args, **kwargs):
with patch("solcast.aiohttp.ClientSession", side_effect=create_mock_session):
result = run_async(test_api.solar.download_open_meteo_ensemble_data(51.5, -0.1, 35, 0, 3.0, 0.14))
- if result != {}:
- print(f"ERROR: Expected empty dict, got {result}")
+ if result != ({}, {}):
+ print(f"ERROR: Expected empty dict tuple, got {result}")
failed = True
finally:
test_api.cleanup()
@@ -141,8 +141,8 @@ def create_mock_session(*args, **kwargs):
with patch("solcast.aiohttp.ClientSession", side_effect=create_mock_session):
result = run_async(test_api.solar.download_open_meteo_ensemble_data(51.5, -0.1, 35, 0, 3.0, 0.14))
- if result != {}:
- print(f"ERROR: Expected empty dict on HTTP failure, got {result}")
+ if result != ({}, {}):
+ print(f"ERROR: Expected empty dict tuple on HTTP failure, got {result}")
failed = True
finally:
test_api.cleanup()
diff --git a/apps/predbat/tests/test_solcast.py b/apps/predbat/tests/test_solcast.py
index aac1440a8..a05902e0f 100644
--- a/apps/predbat/tests/test_solcast.py
+++ b/apps/predbat/tests/test_solcast.py
@@ -172,6 +172,10 @@ def __init__(self):
pv_forecast_tomorrow=None,
pv_forecast_d3=None,
pv_forecast_d4=None,
+ pv_clearsky_today=None,
+ pv_clearsky_tomorrow=None,
+ pv_clearsky_d3=None,
+ pv_clearsky_d4=None,
pv_scaling=1.0,
open_meteo_forecast=None,
open_meteo_forecast_max_age=1.0,
@@ -2355,7 +2359,7 @@ def mock_minute_data_import_export(max_days_previous, now_utc, key, scale=1.0, r
pv_forecast_minute10 = {m: 0.04 for m in range(total_minutes)}
pv_forecast_data = [{"period_start": base.midnight_utc.strftime("%Y-%m-%dT%H:%M:%S+0000"), "pv_estimate": 0.05}]
- adj_minute, adj_minute10, adj_data = solar.pv_calibration(pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, create_pv10=False, divide_by=1.0, max_kwh=5.0, forecast_days=solar.forecast_days)
+ adj_minute, adj_minute10, adj_data, _ = solar.pv_calibration(pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, create_pv10=False, divide_by=1.0, max_kwh=5.0, forecast_days=solar.forecast_days)
# Returned minute data must be non-negative
if any(v < 0 for v in adj_minute.values()):
@@ -2575,7 +2579,7 @@ def mock_minute_data_import_export(max_days_previous, now_utc, key, scale=1.0, r
pv_forecast_data.append({"period_start": ts.strftime("%Y-%m-%dT%H:%M:%S+0000"), "pv_estimate": 3.0 * plan_interval / 60})
max_kwh = 2.0 # panel peak output cap in kW
- adj_minute, adj_minute10, adj_data = solar.pv_calibration(pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, create_pv10=False, divide_by=1.0, max_kwh=max_kwh, forecast_days=solar.forecast_days)
+ adj_minute, adj_minute10, adj_data, _ = solar.pv_calibration(pv_forecast_minute, pv_forecast_minute10, pv_forecast_data, create_pv10=False, divide_by=1.0, max_kwh=max_kwh, forecast_days=solar.forecast_days)
# capped_data = min(max(max_pv_power_hist, max_pv_power_forecast), max_kwh) * plan_interval / 60
# max_pv_power_hist ≈ 1 kW (per minute), max_pv_power_forecast ≈ 3/60 kW per minute
@@ -2764,7 +2768,7 @@ def mock_minute_import_export(max_days_prev, now_utc, key, scale=1.0, required_u
# synthetic pv_forecast dict without going through the real h0 pipeline
# (which relies on now_utc_exact returning the mocked time).
with patch("solcast.history_attribute_to_minute_data", return_value=(pv_forecast_hist, days_back)):
- adj_m, adj_m10, adj_data = solar.pv_calibration(pv_m, pv_m10, pv_data, create_pv10=True, divide_by=1.0, max_kwh=5.0, forecast_days=solar.forecast_days)
+ adj_m, adj_m10, adj_data, _ = solar.pv_calibration(pv_m, pv_m10, pv_data, create_pv10=True, divide_by=1.0, max_kwh=5.0, forecast_days=solar.forecast_days)
result = {
"total_adj": solar.pv_calibration_total_adjustment,
"avg_scaling": getattr(solar, "pv_calibration_average_scaling", None),
@@ -2973,7 +2977,7 @@ def mock_minute_import_export(max_days_prev, now_utc, key, scale=1.0, required_u
pv_forecast_hist[minutes_ago] = float(FORECAST_KW)
with patch("solcast.history_attribute_to_minute_data", return_value=(pv_forecast_hist, days)):
- adj_m, adj_m10, adj_data = solar.pv_calibration(pv_m, pv_m10, pv_data, create_pv10=True, divide_by=divide_by_factor, max_kwh=10.0, forecast_days=solar.forecast_days, period=FORECAST_PERIOD)
+ adj_m, adj_m10, adj_data, _ = solar.pv_calibration(pv_m, pv_m10, pv_data, create_pv10=True, divide_by=divide_by_factor, max_kwh=10.0, forecast_days=solar.forecast_days, period=FORECAST_PERIOD)
# Each annotated entry should cover the full FORECAST_PERIOD minutes.
# Expected calibrated kWh per entry ≈ FORECAST_KW * FORECAST_PERIOD / 60 = 2.0 kWh.
@@ -3096,7 +3100,7 @@ def mock_minute_import_export(max_days_prev, now_utc, key, scale=1.0, required_u
pv_forecast_hist[minutes_ago] = float(FORECAST_KW)
with patch("solcast.history_attribute_to_minute_data", return_value=(pv_forecast_hist, days)):
- adj_m, adj_m10, adj_data = solar.pv_calibration(pv_m, pv_m10, pv_data, create_pv10=True, divide_by=divide_by_factor, max_kwh=10.0, forecast_days=solar.forecast_days, period=FORECAST_PERIOD)
+ adj_m, adj_m10, adj_data, _ = solar.pv_calibration(pv_m, pv_m10, pv_data, create_pv10=True, divide_by=divide_by_factor, max_kwh=10.0, forecast_days=solar.forecast_days, period=FORECAST_PERIOD)
# Each 15-min entry should be annotated with the single 30-min plan slot that
# starts at the entry timestamp. slots_per_period=max(1,round(15/30))=1, so
diff --git a/apps/predbat/unit_test.py b/apps/predbat/unit_test.py
index f5b33471a..2e6211692 100644
--- a/apps/predbat/unit_test.py
+++ b/apps/predbat/unit_test.py
@@ -121,6 +121,7 @@
from tests.test_band_rate_text import test_band_rate_text
from tests.test_kraken import run_kraken_tests
from tests.test_kraken_auth_mixin import run_kraken_auth_mixin_tests
+from tests.test_clipping import run_clipping_tests
from tests.test_clip_export_slots import run_clip_export_slots_tests
from tests.test_clip_charge_slots import run_clip_charge_slots_tests
from tests.test_discard_unused_charge_slots import run_discard_unused_charge_slots_tests
@@ -307,6 +308,7 @@ def main():
# Kraken Energy (EDF/E.ON) tests
("kraken", run_kraken_tests, "Kraken API tests (init, GraphQL, tariff discovery, rate fetching, run lifecycle)", False),
("kraken_auth", run_kraken_auth_mixin_tests, "Kraken auth mixin tests (API key, email, refresh, 401 handling)", False),
+ ("clipping", run_clipping_tests, "Clipping logic tests", False),
("clip_export_slots", run_clip_export_slots_tests, "Clip export slots tests", False),
("clip_charge_slots", run_clip_charge_slots_tests, "Clip charge slots tests", False),
("discard_unused_charge_slots", run_discard_unused_charge_slots_tests, "Discard unused charge slots tests", False),
diff --git a/apps/predbat/web.py b/apps/predbat/web.py
index 411e29209..911eaa9bb 100644
--- a/apps/predbat/web.py
+++ b/apps/predbat/web.py
@@ -1564,7 +1564,7 @@ def get_chart_series(self, name, results, chart_type, color):
text += " }\n"
return text
- def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="chart", daily_chart=True, extra_yaxis=None):
+ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="chart", daily_chart=True, extra_yaxis=None, xaxis_annotations=None, yaxis_annotations=None):
"""
Render a chart
"""
@@ -1634,6 +1634,7 @@ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="ch
opacity = []
stroke_width = []
stroke_curve = []
+ stroke_dasharray = []
series_units = []
for series in series_data:
name = series.get("name")
@@ -1641,6 +1642,7 @@ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="ch
opacity_value = series.get("opacity", "1.0")
stroke_width_value = series.get("stroke_width", "1")
stroke_curve_value = series.get("stroke_curve", "smooth")
+ stroke_dasharray_value = series.get("stroke_dasharray", "0")
chart_type = series.get("chart_type", "line")
color = series.get("color", "")
unit_name = series.get("unit", yaxis_name) or ""
@@ -1653,6 +1655,7 @@ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="ch
opacity.append(opacity_value)
stroke_width.append(stroke_width_value)
stroke_curve.append("'{}'".format(stroke_curve_value))
+ stroke_dasharray.append(stroke_dasharray_value)
series_units.append(unit_name)
units_array = ",".join("'{}'".format(unit) for unit in series_units)
@@ -1664,6 +1667,7 @@ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="ch
text += " stroke: {\n"
text += " width: [{}],\n".format(",".join(stroke_width))
text += " curve: [{}],\n".format(",".join(stroke_curve))
+ text += " dashArray: [{}],\n".format(",".join(stroke_dasharray))
text += " },\n"
text += " xaxis: {\n"
text += " type: 'datetime',\n"
@@ -1744,6 +1748,22 @@ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="ch
text += " },\n"
text += " annotations: {\n"
text += " xaxis: [\n"
+ if xaxis_annotations:
+ for item in xaxis_annotations:
+ text += " {\n"
+ text += " x: new Date('{}').getTime(),\n".format(item.get("x", ""))
+ text += " borderColor: '{}',\n".format(item.get("borderColor", "#000000"))
+ text += " strokeDashArray: {},\n".format(item.get("strokeDashArray", 0))
+ text += " textAnchor: '{}',\n".format(item.get("textAnchor", "middle"))
+ text += " label: {\n"
+ text += " text: '{}',\n".format(item.get("text", ""))
+ text += " orientation: '{}',\n".format(item.get("orientation", "horizontal"))
+ text += " style: {\n"
+ text += " color: '{}',\n".format(item.get("textColor", "#fff"))
+ text += " background: '{}',\n".format(item.get("backgroundColor", "#775DD0"))
+ text += " }\n"
+ text += " }\n"
+ text += " },\n"
text += " {\n"
text += " x: new Date('{}').getTime(),\n".format(now_str)
text += " borderColor: '#775DD0',\n"
@@ -1760,6 +1780,22 @@ def render_chart(self, series_data, yaxis_name, chart_name, now_str, tagname="ch
text += " text: 'midnight'\n"
text += " }\n"
text += " }\n"
+ text += " ],\n"
+ text += " yaxis: [\n"
+ if yaxis_annotations:
+ for item in yaxis_annotations:
+ text += " {\n"
+ text += " y: {},\n".format(item.get("y", 0))
+ text += " borderColor: '{}',\n".format(item.get("borderColor", "#000000"))
+ text += " strokeDashArray: {},\n".format(item.get("strokeDashArray", 0))
+ text += " label: {\n"
+ text += " text: '{}',\n".format(item.get("text", ""))
+ text += " style: {\n"
+ text += " color: '{}',\n".format(item.get("textColor", "#fff"))
+ text += " background: '{}',\n".format(item.get("backgroundColor", "#FF0000"))
+ text += " }\n"
+ text += " }\n"
+ text += " },\n"
text += " ]\n"
text += " }\n"
text += "}\n"
@@ -2938,28 +2974,41 @@ def get_chart(self, chart):
pv_power = prune_today(pv_power_hist, self.now_utc, self.midnight_utc, prune=chart == "PV")
pv_forecast_hist = history_attribute(self.get_history_wrapper("sensor." + self.prefix + "_pv_forecast_h0", 7, required=False))
pv_forecast_histCL = history_attribute(self.get_history_wrapper("sensor." + self.prefix + "_pv_forecast_h0", 7, required=False), attributes=True, state_key="nowCL")
+ pv_forecast_histCS = history_attribute(self.get_history_wrapper("sensor." + self.prefix + "_pv_forecast_h0", 7, required=False), attributes=True, state_key="nowCS")
pv_forecast = prune_today(pv_forecast_hist, self.now_utc, self.midnight_utc, prune=chart == "PV", intermediate=True)
pv_forecastCL = prune_today(pv_forecast_histCL, self.now_utc, self.midnight_utc, prune=chart == "PV", intermediate=True)
+ pv_forecastCS = prune_today(pv_forecast_histCS, self.now_utc, self.midnight_utc, prune=chart == "PV", intermediate=True)
pv_today_forecast = prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_estimate"), self.now_utc, self.midnight_utc, prune=False, intermediate=True)
pv_today_forecast10 = prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_estimate10"), self.now_utc, self.midnight_utc, prune=False, intermediate=True)
pv_today_forecast90 = prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_estimate90"), self.now_utc, self.midnight_utc, prune=False, intermediate=True)
pv_today_forecastCL = prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_estimateCL"), self.now_utc, self.midnight_utc, prune=False, intermediate=True)
+ pv_today_forecastCS = prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_clearsky"), self.now_utc, self.midnight_utc, prune=False, intermediate=True)
pv_today_forecast.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_estimate"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
pv_today_forecast10.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_estimate10"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
pv_today_forecast90.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_estimate90"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
pv_today_forecastCL.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_estimateCL"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
+ pv_today_forecastCS.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_clearsky"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
+ for d in ["d2", "d3", "d4", "d5", "d6"]:
+ pv_today_forecast.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_estimate"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
+ pv_today_forecast10.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_estimate10"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
+ pv_today_forecast90.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_estimate90"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
+ pv_today_forecastCL.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_estimateCL"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
+ pv_today_forecastCS.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_clearsky"), self.now_utc, self.midnight_utc, prune=False, intermediate=True))
series_data = [
{"name": "PV Power", "data": pv_power, "opacity": "1.0", "stroke_width": "3", "stroke_curve": "smooth", "color": "#f5c43d"},
{"name": "Forecast History", "data": pv_forecast, "opacity": "0.3", "stroke_width": "3", "stroke_curve": "smooth", "color": "#a8a8a7", "chart_type": "area"},
{"name": "Forecast History CL", "data": pv_forecastCL, "opacity": "0.3", "stroke_width": "3", "stroke_curve": "smooth", "color": "#e90a0a", "chart_type": "area"},
+ {"name": "Forecast History CS", "data": pv_forecastCS, "opacity": "0.3", "stroke_width": "3", "stroke_curve": "smooth", "color": "#1877f2", "chart_type": "area"},
{"name": "Forecast", "data": pv_today_forecast, "opacity": "0.3", "stroke_width": "2", "stroke_curve": "smooth", "chart_type": "area", "color": "#a8a8a7"},
{"name": "Forecast 10%", "data": pv_today_forecast10, "opacity": "0.3", "stroke_width": "2", "stroke_curve": "smooth", "chart_type": "area", "color": "#6b6b6b"},
{"name": "Forecast 90%", "data": pv_today_forecast90, "opacity": "0.3", "stroke_width": "2", "stroke_curve": "smooth", "chart_type": "area", "color": "#cccccc"},
{"name": "Forecast CL", "data": pv_today_forecastCL, "opacity": "0.3", "stroke_width": "2", "stroke_curve": "smooth", "chart_type": "area", "color": "#e90a0a"},
+ {"name": "Forecast CS", "data": pv_today_forecastCS, "opacity": "0.3", "stroke_width": "2", "stroke_curve": "smooth", "chart_type": "area", "color": "#1877f2"},
]
text += self.render_chart(series_data, "kW", "Solar Forecast", now_str)
+
elif chart == "PVAccuracy":
# Get pv_today history once and extract total and remaining attributes per timestamp
pv_today_hist = self.get_history_wrapper("sensor." + self.prefix + "_pv_today", 7, required=False)
@@ -2979,6 +3028,104 @@ def get_chart(self, chart):
{"name": "PV Actual", "data": pv_actual, "opacity": "1.0", "stroke_width": "3", "stroke_curve": "stepline", "color": "#f5c43d"},
]
text += self.render_chart(series_data, "kWh", "PV Forecast vs Actual", now_str)
+ elif chart == "Clipping":
+ inverter_ac_limit_kw = getattr(self.base, "inverter_limit", 0) * 60.0
+
+ # Fetch Target SOC (Best Plan)
+ soc_kw_best_raw = history_attribute(self.get_history_wrapper(self.prefix + ".soc_kw_best", 3))
+ soc_kw_best = prune_today(soc_kw_best_raw, self.now_utc, self.midnight_utc, prune_past_days=2)
+ soc_kw_best.update(self.get_entity_results(self.prefix + ".soc_kw_best"))
+
+ # Fetch Actual SOC history (48 hours into the past)
+ soc_kw_h0 = {}
+ hist = getattr(self.base, "soc_kwh_history", {})
+ if hist:
+ for minute in range(-2 * 24 * 60, self.minutes_now, self.plan_interval_minutes):
+ val = hist.get(self.minutes_now - minute, None)
+ if val is not None:
+ minute_timestamp = self.midnight_utc + timedelta(minutes=minute)
+ stamp = minute_timestamp.strftime(TIME_FORMAT)
+ soc_kw_h0[stamp] = round(val, 2)
+ soc_kw_h0[now_str] = round(self.base.soc_kw, 2)
+
+ # Re-fetch PV actuals for overlay (48 hours into the past)
+ pv_power_hist = history_attribute(self.get_history_wrapper(self.prefix + ".pv_power", 3, required=False))
+ pv_power = prune_today(pv_power_hist, self.now_utc, self.midnight_utc, prune=True, prune_past_days=2)
+
+ axis_max = 12.0
+ if self.base.soc_max > 12.0 or inverter_ac_limit_kw > 12.0:
+ axis_max = max(self.base.soc_max, inverter_ac_limit_kw, 12.0)
+ axis_ticks = 6
+
+ # Data series parsing for per-minute data
+ step_size = getattr(self.base, "plan_interval_minutes", 30)
+ clipping_remaining_raw = history_attribute(self.get_history_wrapper(self.prefix + ".clipping_remaining", 3))
+ clipping_remaining_series = prune_today(clipping_remaining_raw, self.now_utc, self.midnight_utc, prune_past_days=2)
+
+ if getattr(self.base, "predict_clipping_remaining_best", None):
+ for minute, kwh in self.base.predict_clipping_remaining_best.items():
+ if minute % step_size == 0 and minute >= 0:
+ minute_timestamp = self.now_utc + timedelta(minutes=minute)
+ stamp = minute_timestamp.strftime(TIME_FORMAT)
+ clipping_remaining_series[stamp] = round(kwh, 2)
+
+ clipping_ceiling_raw = history_attribute(self.get_history_wrapper(self.prefix + ".clipping_ceiling", 3))
+ clipping_ceiling_series = prune_today(clipping_ceiling_raw, self.now_utc, self.midnight_utc, prune_past_days=2)
+
+ if getattr(self.base, "predict_clipping_ceiling_best", None):
+ for minute, kwh in self.base.predict_clipping_ceiling_best.items():
+ if minute % step_size == 0 and minute >= 0:
+ minute_timestamp = self.now_utc + timedelta(minutes=minute)
+ stamp = minute_timestamp.strftime(TIME_FORMAT)
+ clipping_ceiling_series[stamp] = round(kwh, 2)
+
+ # PV Forecast (Base) - historical and future
+ pv_forecast_hist = history_attribute(self.get_history_wrapper("sensor." + self.prefix + "_pv_forecast_h0", 3, required=False))
+ raw_pv_series = prune_today(pv_forecast_hist, self.now_utc, self.midnight_utc, prune=True, prune_past_days=2, intermediate=True)
+ raw_pv_series.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_estimate"), self.now_utc, self.midnight_utc, prune=True, intermediate=True))
+ raw_pv_series.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_estimate"), self.now_utc, self.midnight_utc, prune=True, intermediate=True))
+ for d in ["d2", "d3"]:
+ raw_pv_series.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_estimate"), self.now_utc, self.midnight_utc, prune=True, intermediate=True))
+
+ # PV Forecast (ClearSky) - historical and future
+ pv_forecast_histCS = history_attribute(self.get_history_wrapper("sensor." + self.prefix + "_pv_forecast_h0", 3, required=False), attributes=True, state_key="nowCS")
+ cs_pv_series = prune_today(pv_forecast_histCS, self.now_utc, self.midnight_utc, prune=True, prune_past_days=2, intermediate=True)
+ cs_pv_series.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_today", "pv_clearsky"), self.now_utc, self.midnight_utc, prune=True, intermediate=True))
+ cs_pv_series.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + "_pv_tomorrow", "pv_clearsky"), self.now_utc, self.midnight_utc, prune=True, intermediate=True))
+ for d in ["d2", "d3"]:
+ cs_pv_series.update(prune_today(self.get_entity_detailedForecast("sensor." + self.prefix + f"_pv_{d}", "pv_clearsky"), self.now_utc, self.midnight_utc, prune=True, intermediate=True))
+
+ series_data = [
+ {"name": "Clipping Remaining", "data": clipping_remaining_series, "opacity": "1.0", "stroke_width": "3", "stroke_curve": "smooth", "color": "#FF1493", "unit": "kWh"},
+ {"name": "Actual SOC", "data": soc_kw_h0, "opacity": "1.0", "stroke_width": "3", "stroke_curve": "smooth", "color": "#00BFFF", "unit": "kWh"},
+ {"name": "Target SOC", "data": soc_kw_best, "opacity": "0.7", "stroke_width": "2", "stroke_curve": "stepline", "stroke_dasharray": "5", "color": "#1E90FF", "unit": "kWh"},
+ {"name": "Clipping Ceiling", "data": clipping_ceiling_series, "opacity": "0.7", "stroke_width": "2", "stroke_curve": "stepline", "stroke_dasharray": "5", "color": "#C71585", "unit": "kWh"},
+ {"name": "PV Power Actual", "data": pv_power, "opacity": "1.0", "stroke_width": "3", "stroke_curve": "smooth", "color": "#FFA500", "unit": "kW"},
+ {"name": "PV Forecast (Base)", "data": raw_pv_series, "opacity": "0.7", "stroke_width": "2", "stroke_curve": "smooth", "stroke_dasharray": "5", "color": "#FFD700", "unit": "kW"},
+ ]
+ if cs_pv_series:
+ series_data.append({"name": "PV Forecast (ClearSky)", "data": cs_pv_series, "opacity": "0.7", "stroke_width": "2", "stroke_curve": "smooth", "stroke_dasharray": "5", "color": "#FF8C00", "unit": "kW"})
+
+ yaxis_annotations = []
+ if inverter_ac_limit_kw > 0:
+ yaxis_annotations.append({"y": inverter_ac_limit_kw, "borderColor": "#FF0000", "strokeDashArray": 4, "text": "Inverter AC Capacity ({} kW)".format(inverter_ac_limit_kw), "textColor": "#fff", "backgroundColor": "#FF0000"})
+
+ xaxis_annotations = []
+ buffer_start = getattr(self.base, "clipping_buffer_start", None)
+ buffer_end = getattr(self.base, "clipping_buffer_end", None)
+ if getattr(self.base, "clipping_buffer_kwh", 0) > 0 and buffer_start is not None and buffer_end is not None:
+ start_stamp = (self.midnight_utc + timedelta(minutes=buffer_start)).strftime(TIME_FORMAT)
+ end_stamp = (self.midnight_utc + timedelta(minutes=buffer_end)).strftime(TIME_FORMAT)
+ xaxis_annotations.append({"x": start_stamp, "borderColor": "#ffa500", "strokeDashArray": 4, "text": "Today Buffer Start", "orientation": "vertical", "backgroundColor": "#ffa500"})
+ xaxis_annotations.append({"x": end_stamp, "borderColor": "#ffa500", "strokeDashArray": 4, "text": "Today Buffer End", "orientation": "vertical", "backgroundColor": "#ffa500"})
+
+ clipping_total = 0
+ if getattr(self.base, "predict_clipped_best", None):
+ clipping_total = self.base.predict_clipped_best.get(max(self.base.predict_clipped_best.keys()), 0.0)
+
+ chart_title = "Clipping Analysis (Expected Total Clipping: {:.2f} kWh)".format(clipping_total)
+
+ text += self.render_chart(series_data, "kWh", chart_title, now_str, xaxis_annotations=xaxis_annotations, yaxis_annotations=yaxis_annotations)
elif chart == "LoadML":
load_today_history = self.get_history_with_now_attrs("sensor." + self.prefix + "_load_ml_stats", 7)
# Get historical load data for last 24 hours
@@ -3280,6 +3427,7 @@ async def html_charts(self, request):
text += f'PV'
text += f'PV7'
text += f'PVAccuracy'
+ text += f'Clipping'
text += f'Savings'
text += f'BatteryDegradation'
text += f'MarginalCosts'
diff --git a/docs/customisation.md b/docs/customisation.md
index 0d6a67336..e419e31df 100644
--- a/docs/customisation.md
+++ b/docs/customisation.md
@@ -458,6 +458,40 @@ The amount of modulation depends on the standard deviation of your load predicti
You can disable this feature (_expert mode only_) using **switch.metric_load_divergence_enable**.
+### Automated Solar Clipping Mitigation
+
+In solar setups where your peak PV generation exceeds your inverter's maximum AC limit (for example, a 6.5kW solar array connected to a 5kW inverter), the excess energy is "clipped" and permanently lost if your battery is already fully charged.
+
+Historically, Predbat's standard planning lacked visibility into the precise magnitude of this expected clipped energy, meaning it could not preemptively create room in the battery to absorb the excess yield.
+
+The **Clipping Buffer** feature introduces an intelligent mitigation layer that calculates the exact energy expected to be lost to inverter clipping, and automatically schedules preemptive battery exports to create the necessary buffer before the peak hits.
+
+#### Key Features
+
+- **Dynamic Clipping Prediction**: Analyzes the raw PV forecast curve against the user's configured `inverter_limit` to calculate the exact volume of energy (in kWh) that will exceed AC capacity.
+- **Preemptive Export Scheduling**: Automatically forces battery export windows prior to peak generation periods, guaranteeing the battery has precisely enough headroom to soak up the excess energy.
+- **Clear-Sky Cloud Modeling**: Includes an optional integration for clear-sky data (e.g., `ha_solcast_clearsky`) and an auto-tuning amplification factor. This generates a theoretical maximum generation envelope, ensuring the clipping buffer is sized safely even when standard forecasts fluctuate due to unpredictable cloud cover.
+
+#### Configuration Settings
+
+These settings can be found in the Home Assistant Predbat Configuration panel, directly beneath the Cloud Model settings.
+
+- **`clipping_buffer_enable`**: The master switch. Turns the entire clipping buffer feature on or off.
+- **`clipping_clearsky_source`**: Determines where Predbat gets its clear-sky (theoretical maximum) data. Options include `auto`, `ha_solcast_clearsky`, `solcast_api`, and `openmeteo`. This piggybacks on your existing Predbat/HA API integrations, requiring no extra YAML configuration.
+- **`clipping_use_clearsky_peaks`**: If enabled, Predbat uses the selected Clear-Sky source to size the buffer, rather than just the standard forecast. This is highly recommended to protect against cloud-edge spikes.
+- **`clipping_auto_tune`**: Automatically learns the scaling difference between your standard solar forecast and your hardware limits based on past clipping behavior, saving a tracking multiplier to `clipping_auto_tune.json`.
+- **`clipping_amplification`**: A manual multiplier (e.g., `1.5x`) to force the standard solar forecast higher to simulate a sunny spike. This is only utilized if Auto-Tune is disabled.
+- **`clipping_cost_weight`**: An internal optimizer multiplier (default `1.0`). If Predbat isn't dumping the battery aggressively enough before a peak, increasing this number adds a harsher financial penalty for clipping, forcing the optimizer to prioritize creating headroom.
+- **`clipping_limit_override`**: Manually define your inverter's AC ceiling in Watts (e.g., `5000W`). If left at `0`, Predbat auto-detects it from your inverter entity.
+
+#### Manual Overrides
+
+If you do not want to use the dynamic cloud-based tracking, you can manually force a static buffer size and time window:
+
+- **`clipping_buffer_max_kwh`**: Manually force the maximum size of the clipping buffer (e.g. `3.0` kWh).
+- **`clipping_buffer_start_time`**: The start time for the fixed buffer window.
+- **`clipping_buffer_end_time`**: The end time for the fixed buffer window.
+
## iBoost model options
Predbat has an 'iBoost model' that can be used to model using excess solar energy to heat hot water (or similar) instead of it being exported to the grid.
diff --git a/templates/example_chart.yml b/templates/example_chart.yml
index bc15927b1..547fd2aab 100644
--- a/templates/example_chart.yml
+++ b/templates/example_chart.yml
@@ -20,10 +20,14 @@ now:
yaxis:
- min: 0
max: 9.54
+apex_config:
+ stroke:
+ dashArray: [0, 0, 0, 0, 4, 4, 4, 0, 0, 0]
series:
- entity: predbat.soc_kw_h0
- stroke_width: 1
+ stroke_width: 2
curve: stepline
+ color: '#000000'
name: actual
extend_to: now
show:
@@ -42,6 +46,7 @@ series:
- entity: predbat.soc_kw_best
stroke_width: 3
curve: smooth
+ color: '#1f77b4'
name: best
data_generator: >
let res = []; for (const [key, value] of
@@ -62,11 +67,12 @@ series:
show:
in_header: raw
- entity: predbat.best_charge_limit_kw
- stroke_width: 4
+ stroke_width: 3
curve: stepline
+ color: '#2ca02c'
name: charge_limit_best
- type: area
- opacity: 0.2
+ type: line
+ opacity: 0.8
data_generator: >
let res = []; for (const [key, value] of
Object.entries(entity.attributes.results)) { res.push([new
@@ -75,8 +81,9 @@ series:
show:
in_header: raw
- entity: predbat.charge_limit_kw
- stroke_width: 1
+ stroke_width: 2
curve: stepline
+ color: '#98df8a'
name: charge_limit_base
data_generator: >
let res = []; for (const [key, value] of
@@ -88,6 +95,7 @@ series:
- entity: predbat.best_export_limit_kw
stroke_width: 2
curve: stepline
+ color: '#d62728'
name: export_best
data_generator: >
let res = []; for (const [key, value] of
@@ -121,6 +129,18 @@ series:
b[0] })
show:
in_header: raw
+ - entity: predbat.clipping_remaining
+ name: clipping
+ type: column
+ color: '#ff9800'
+ opacity: 0.5
+ data_generator: >
+ let res = []; for (const [key, value] of
+ Object.entries(entity.attributes.results)) { res.push([new
+ Date(key).getTime(), value]); } return res.sort((a, b) => { return a[0] -
+ b[0] })
+ show:
+ in_header: raw
###############################################
# This is a cost chart
###############################################