diff --git a/apps/predbat/components.py b/apps/predbat/components.py index 3236c1845..ed5a1eab6 100644 --- a/apps/predbat/components.py +++ b/apps/predbat/components.py @@ -44,6 +44,7 @@ HAS_GATEWAY = False GatewayMQTT = None from load_ml_component import LoadMLComponent +from lattice_component import LatticeComponent from datetime import datetime, timezone, timedelta import asyncio import os @@ -51,6 +52,7 @@ COMPONENT_LIST = { "storage": {"class": StorageComponent, "name": "Storage", "args": {}, "can_restart": True, "phase": 0}, + "lattice": {"class": LatticeComponent, "name": "Lattice Device Map", "args": {}, "can_restart": True, "phase": 2}, "db": { "class": DatabaseManager, "name": "Database Manager", diff --git a/apps/predbat/config.py b/apps/predbat/config.py index 0a1bbdb2f..0d5c7cb10 100644 --- a/apps/predbat/config.py +++ b/apps/predbat/config.py @@ -44,6 +44,12 @@ "type": "switch", "default": False, }, + { + "name": "lattice_projection_enable", + "friendly_name": "Lattice Device Map (experimental)", + "type": "switch", + "default": False, + }, { "name": "active", "friendly_name": "Predbat Active", diff --git a/apps/predbat/lattice.py b/apps/predbat/lattice.py new file mode 100644 index 000000000..34c2d6354 --- /dev/null +++ b/apps/predbat/lattice.py @@ -0,0 +1,195 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice device-mapping core (read-only) +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +"""Pure, dependency-free Lattice device-mapping model. + +Each integration publishes a *fragment* describing the devices it can see — their identity, +type, access paths, and the sensors they expose (referencing existing entities). Fragments merge +by identity into one site graph: a device seen via two integrations becomes ONE node carrying +both providers' access paths (ranked) and the union of its sensors. + +READ-ONLY by design: this maps the network and inventories sensors. Control is a separate model +(a common intent/shape/binding API) and deliberately not part of this — see the lattice-spec repo. +No PredBat/Home Assistant dependencies, so it can be unit-tested standalone. +""" +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class AccessPath: + """A way to reach a node (a provider/transport with a ranked preference).""" + + id: str + provider: str + locality: str = "local" + transport: str = "" + preference: int = 0 + + @staticmethod + def from_dict(d): + """Build an AccessPath from its wire dict.""" + return AccessPath( + id=d["id"], + provider=d.get("provider", ""), + locality=d.get("locality", "local"), + transport=d.get("transport", ""), + preference=int(d.get("preference", 0)), + ) + + +@dataclass +class Sensor: + """A telemetry reading a node exposes, referencing the existing entity that carries it.""" + + capability: str + unit: str = "" + entity: str = "" + access_path: str = "" + + @staticmethod + def from_dict(d): + """Build a Sensor from its wire dict.""" + return Sensor(capability=d["capability"], unit=d.get("unit", ""), entity=d.get("entity", ""), access_path=d.get("accessPath", "")) + + +@dataclass +class Node: + """A device in the graph, identified by id (serial), with access paths + sensors.""" + + id: str + kind: str + device_type: str + access_paths: list = field(default_factory=list) + sensors: list = field(default_factory=list) + + def sensor(self, name) -> Optional[Sensor]: + """Return the first Sensor matching name, or None.""" + for s in self.sensors: + if s.capability == name: + return s + return None + + @staticmethod + def from_dict(d): + """Build a Node from its wire dict.""" + return Node( + id=d["id"], + kind=d.get("kind", ""), + device_type=d.get("deviceType", ""), + access_paths=[AccessPath.from_dict(a) for a in d.get("accessPaths", [])], + sensors=[Sensor.from_dict(s) for s in d.get("sensors", [])], + ) + + +@dataclass +class Fragment: + """A producer's slice of the topology: nodes + relationships + its provider id.""" + + provider: str + nodes: list = field(default_factory=list) + relationships: list = field(default_factory=list) + name: str = "" + version: str = "0.1.0" + + @staticmethod + def from_dict(d): + """Build a Fragment from a producer's wire dict.""" + prod = d.get("producer", {}) + return Fragment( + provider=prod.get("provider", ""), + name=prod.get("name", ""), + version=d.get("topologyVersion", "0.1.0"), + nodes=[Node.from_dict(n) for n in d.get("nodes", [])], + relationships=list(d.get("relationships", [])), + ) + + +@dataclass +class SiteGraph: + """The merged site: one node per physical device, carrying all producers' access paths.""" + + nodes: list = field(default_factory=list) + relationships: list = field(default_factory=list) + + def node(self, node_id) -> Optional[Node]: + """Return the node with this id, or None.""" + for n in self.nodes: + if n.id == node_id: + return n + return None + + +def merge_fragments(fragments) -> SiteGraph: + """Merge producer fragments into one site graph, keyed by node id (serial). + + Same id from multiple producers becomes one node carrying every producer's access paths + (ranked by preference desc) and the union of its sensors. Distinct ids become sibling nodes. + Relationships are combined. + """ + by_id = {} + order = [] + relationships = [] + for frag in fragments: + for n in frag.nodes: + if n.id not in by_id: + by_id[n.id] = Node(id=n.id, kind=n.kind, device_type=n.device_type, access_paths=list(n.access_paths), sensors=list(n.sensors)) + order.append(n.id) + else: + existing = by_id[n.id] + seen_ap = {ap.id for ap in existing.access_paths} + existing.access_paths.extend(ap for ap in n.access_paths if ap.id not in seen_ap) + seen_sensor = {(s.capability, s.access_path) for s in existing.sensors} + existing.sensors.extend(s for s in n.sensors if (s.capability, s.access_path) not in seen_sensor) + relationships.extend(frag.relationships) + for n in by_id.values(): + n.access_paths.sort(key=lambda ap: ap.preference, reverse=True) + return SiteGraph(nodes=[by_id[i] for i in order], relationships=relationships) + + +def resolve_sensor(site, capability, node_id): + """Return the preferred entity for a node's sensor (highest-preference access path), or None. + + When a device is seen via several providers, this picks the sensor on the most-preferred + available access path. Pure read resolution — no control. + """ + node = site.node(node_id) + if node is None: + return None + best = None + best_pref = None + pref = {ap.id: ap.preference for ap in node.access_paths} + for s in node.sensors: + if s.capability != capability or not s.entity: + continue + p = pref.get(s.access_path, 0) + if best is None or p > best_pref: + best, best_pref = s.entity, p + return best + + +def device_fragment(devices, provider, name, transport, preference, locality): + """Build a read-only producer fragment from plain device data. + + Each device dict needs a `serial` (skipped if missing) and may carry `device_type` and a + `sensors` list of {capability, unit, entity}. Every device becomes a node on one access path + advertising those sensors. Pure — no PredBat deps, no control. + """ + nodes = [] + for dev in devices: + serial = dev.get("serial") + if not serial: + continue + sensors = [{"capability": s["capability"], "unit": s.get("unit", ""), "entity": s.get("entity", ""), "accessPath": provider} for s in dev.get("sensors", [])] + nodes.append( + { + "id": serial, + "kind": dev.get("kind", "inverter"), + "deviceType": str(dev.get("device_type", "")).lower(), + "accessPaths": [{"id": provider, "provider": provider, "locality": locality, "transport": transport, "preference": preference}], + "sensors": sensors, + } + ) + return {"topologyVersion": "0.1.0", "scope": "fragment", "producer": {"name": name, "provider": provider}, "nodes": nodes, "relationships": []} diff --git a/apps/predbat/lattice_component.py b/apps/predbat/lattice_component.py new file mode 100644 index 000000000..8902814f6 --- /dev/null +++ b/apps/predbat/lattice_component.py @@ -0,0 +1,34 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice device-map component (read-only) +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +"""Builds the merged Lattice device map inside the live PredBat system. + +When `lattice_projection_enable` is on, this rebuilds the merged site map from every producer +component each cycle and logs it — which devices are on the network, their access paths, and the +sensors each exposes. READ-ONLY observability; it does not control anything. No-op when off. +""" +from component_base import ComponentBase +from lattice_projection import LatticeProjection + + +class LatticeComponent(ComponentBase): + """Live host for the read-only Lattice device map.""" + + def initialize(self, **kwargs): + """Create the projection over the PredBat base.""" + self.projection = LatticeProjection(self.base) + self.run_timeout = 60 + + async def run(self, seconds, first): + """Rebuild + log the merged device map when enabled; no-op when disabled.""" + if not self.projection.enabled(): + return True + site = self.projection.refresh() + self.log("Lattice: merged device map — {} device(s)".format(len(site.nodes))) + for node in site.nodes: + providers = [ap.provider for ap in node.access_paths] + sensors = [s.capability for s in node.sensors] + self.log("Lattice: device {} ({}) via {} — sensors {}".format(node.id, node.device_type, providers, sensors)) + return True diff --git a/apps/predbat/lattice_projection.py b/apps/predbat/lattice_projection.py new file mode 100644 index 000000000..00c8be68d --- /dev/null +++ b/apps/predbat/lattice_projection.py @@ -0,0 +1,69 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice device-map projection (read-only) +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +"""Collects device fragments from the integration components and merges them into one site map. + +READ-ONLY: this maps the devices on the network and inventories their sensors. It does not control +anything — control is a separate, deferred model (a common intent/shape/binding API; see lattice-spec). +""" +from lattice import merge_fragments, resolve_sensor, Fragment + + +class LatticeProjection: + """Discovers producer components, merges their fragments, and exposes the merged device map.""" + + def __init__(self, base): + """Hold the PredBat base for component access and logging.""" + self.base = base + self.site = None + + def enabled(self): + """True when device mapping is switched on (default off).""" + return bool(self.base.get_arg("lattice_projection_enable", False)) + + def _producers(self): + """Yield (name, component) for every registered component that publishes a fragment. + + Discovery is data-driven: ANY component implementing lattice_fragment is a producer, so a + new integration is mapped here with no change. + """ + registry = getattr(self.base, "components", None) + if registry is None: + return + for name in registry.get_all(): + comp = registry.get_component(name) + if comp is not None and hasattr(comp, "lattice_fragment"): + yield name, comp + + def refresh(self): + """Re-collect fragments from all producers and rebuild the merged site map.""" + fragments = [] + for name, comp in self._producers(): + try: + fragments.append(Fragment.from_dict(comp.lattice_fragment())) + except Exception as exc: # a bad producer must not break the others + self.base.log("Warn: lattice: producer {} failed: {}".format(name, exc)) + self.site = merge_fragments(fragments) + return self.site + + def live_providers(self): + """Provider ids whose producing component currently reports alive.""" + live = set() + for _name, comp in self._producers(): + try: + if comp.is_alive(): + for node in self.site.nodes if self.site else []: + for ap in node.access_paths: + live.add(ap.provider) + break + except Exception: + continue + return live + + def sensor_entity(self, capability, node_id): + """Return the preferred entity for a device's sensor (best access path), or None.""" + if self.site is None: + return None + return resolve_sensor(self.site, capability, node_id) diff --git a/apps/predbat/test_lattice.py b/apps/predbat/test_lattice.py new file mode 100644 index 000000000..177982692 --- /dev/null +++ b/apps/predbat/test_lattice.py @@ -0,0 +1,220 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice device-mapping unit tests (read-only) +# ----------------------------------------------------------------------------- +"""Unit tests for the pure Lattice device-mapping core (no PredBat / Home Assistant).""" +import unittest + +from lattice import Fragment, Sensor, AccessPath, merge_fragments, resolve_sensor, device_fragment +from lattice_projection import LatticeProjection + + +class TestModel(unittest.TestCase): + """Fragment parses from the wire dict shape (devices + access paths + sensors).""" + + def test_fragment_from_dict(self): + """A producer dict becomes a typed Fragment with nodes/sensors/access paths.""" + f = Fragment.from_dict( + { + "topologyVersion": "0.1.0", + "scope": "fragment", + "producer": {"name": "Local gateway", "provider": "local-gateway"}, + "nodes": [ + { + "id": "INV-1", + "kind": "inverter", + "deviceType": "ge-aio", + "accessPaths": [{"id": "gw-local", "provider": "local-gateway", "locality": "local", "transport": "modbus", "preference": 10}], + "sensors": [{"capability": "soc", "unit": "%", "entity": "sensor.predbat_gateway_inv1_soc", "accessPath": "gw-local"}], + } + ], + } + ) + self.assertEqual(f.provider, "local-gateway") + n = f.nodes[0] + self.assertEqual(n.id, "INV-1") + self.assertEqual(n.access_paths[0].preference, 10) + self.assertEqual(n.sensor("soc").entity, "sensor.predbat_gateway_inv1_soc") + self.assertIsNone(n.sensor("nope")) + + def test_defaults(self): + """AccessPath/Sensor from_dict tolerate minimal dicts.""" + self.assertEqual(AccessPath.from_dict({"id": "x"}).preference, 0) + self.assertEqual(Sensor.from_dict({"capability": "soc"}).entity, "") + + +class TestMerge(unittest.TestCase): + """Same serial from two producers becomes one node with both access paths + combined sensors.""" + + def _frag(self, provider, pref, entity): + return Fragment.from_dict( + { + "producer": {"provider": provider}, + "nodes": [ + { + "id": "INV-1", + "kind": "inverter", + "deviceType": "ge-aio", + "accessPaths": [{"id": provider, "provider": provider, "preference": pref}], + "sensors": [{"capability": "soc", "unit": "%", "entity": entity, "accessPath": provider}], + } + ], + } + ) + + def test_merge_by_serial(self): + """One device via gateway + cloud => one node, 2 access paths (gateway first), 2 soc sensors.""" + merged = merge_fragments([self._frag("local-gateway", 10, "sensor.gw_soc"), self._frag("ge-cloud", 1, "sensor.cloud_soc")]) + self.assertEqual(len(merged.nodes), 1) + node = merged.nodes[0] + self.assertEqual([ap.provider for ap in node.access_paths], ["local-gateway", "ge-cloud"]) + self.assertEqual(len(node.sensors), 2) + + def test_distinct_serials_are_siblings(self): + """Different serials remain separate nodes.""" + a = self._frag("local-gateway", 10, "sensor.a") + b = self._frag("ge-cloud", 1, "sensor.b") + b.nodes[0].id = "INV-2" + self.assertEqual(len(merge_fragments([a, b]).nodes), 2) + + +class TestResolveSensor(unittest.TestCase): + """resolve_sensor returns the entity on the highest-preference access path.""" + + def test_prefers_highest_preference(self): + """A device seen via gateway(10) + cloud(1) resolves soc to the gateway's entity.""" + merged = merge_fragments( + [ + Fragment.from_dict(device_fragment([{"serial": "INV-1", "sensors": [{"capability": "soc", "entity": "sensor.gw_soc"}]}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local")), + Fragment.from_dict(device_fragment([{"serial": "INV-1", "sensors": [{"capability": "soc", "entity": "sensor.cloud_soc"}]}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud")), + ] + ) + self.assertEqual(resolve_sensor(merged, "soc", "INV-1"), "sensor.gw_soc") + self.assertIsNone(resolve_sensor(merged, "soc", "NOPE")) + self.assertIsNone(resolve_sensor(merged, "power", "INV-1")) + + +class TestDeviceFragment(unittest.TestCase): + """device_fragment builds a read-only fragment from plain device data.""" + + def test_build(self): + """Each device becomes a node with its access path + declared sensors; no control anywhere.""" + f = Fragment.from_dict( + device_fragment( + [{"serial": "CH-1", "device_type": "GIVENERGY_AIO", "sensors": [{"capability": "soc", "unit": "%", "entity": "sensor.ch1_soc"}, {"capability": "battery_power", "unit": "W", "entity": "sensor.ch1_power"}]}], + provider="local-gateway", + name="GW", + transport="modbus", + preference=10, + locality="local", + ) + ) + n = f.nodes[0] + self.assertEqual(n.id, "CH-1") + self.assertEqual(n.device_type, "givenergy_aio") + self.assertEqual({s.capability for s in n.sensors}, {"soc", "battery_power"}) + self.assertFalse(hasattr(n, "capabilities")) # no control surface + + def test_skips_without_serial(self): + """Devices with no serial are skipped (cannot be identity-keyed).""" + self.assertEqual(device_fragment([{"device_type": "x"}], provider="p", name="n", transport="t", preference=1, locality="local")["nodes"], []) + + +class _FakeComp: + """A stand-in producer component exposing lattice_fragment() and is_alive().""" + + def __init__(self, fragment, alive=True): + self._fragment = fragment + self._alive = alive + + def lattice_fragment(self): + return self._fragment + + def is_alive(self): + return self._alive + + +class _FakeComponents: + """A stand-in component registry.""" + + def __init__(self, mapping): + self._mapping = mapping + + def get_component(self, name): + return self._mapping.get(name) + + def get_all(self): + return list(self._mapping.keys()) + + +class _FakeBase: + """A minimal PredBat base for dependency-injected tests.""" + + def __init__(self, components, args=None): + self.components = components + self._args = args or {} + self.args = self._args + self.logs = [] + self.local_tz = None + self.prefix = "predbat" + + def get_arg(self, name, default=None): + return self._args.get(name, default) + + def log(self, message): + self.logs.append(message) + + +class TestLatticeProjection(unittest.TestCase): + """The projection discovers producers, merges them, and resolves sensors — read-only.""" + + def _proj(self, **args): + gw = device_fragment([{"serial": "INV-1", "sensors": [{"capability": "soc", "entity": "sensor.gw_soc"}]}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local") + cloud = device_fragment([{"serial": "INV-1", "sensors": [{"capability": "soc", "entity": "sensor.cloud_soc"}]}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud") + fox = device_fragment([{"serial": "FOX-1", "sensors": []}], provider="fox-cloud", name="Fox", transport="https", preference=1, locality="cloud") + comps = _FakeComponents({"gateway": _FakeComp(gw), "gecloud": _FakeComp(cloud), "fox": _FakeComp(fox)}) + proj = LatticeProjection(_FakeBase(comps, args or None)) + proj.refresh() + return proj + + def test_merges_and_discovers_any_brand(self): + """All producers (incl. a third brand) auto-merge into the device map.""" + proj = self._proj() + self.assertEqual({n.id for n in proj.site.nodes}, {"INV-1", "FOX-1"}) + + def test_sensor_entity_prefers_gateway(self): + """A device seen via gateway + cloud resolves soc to the gateway entity.""" + self.assertEqual(self._proj().sensor_entity("soc", "INV-1"), "sensor.gw_soc") + + def test_enabled_default_off(self): + """Mapping is off unless lattice_projection_enable is set.""" + self.assertFalse(self._proj().enabled()) + self.assertTrue(self._proj(lattice_projection_enable=True).enabled()) + + +class TestLatticeComponent(unittest.IsolatedAsyncioTestCase): + """The component builds + logs the device map when enabled; no-op when off.""" + + def _base(self, enabled): + gw = device_fragment([{"serial": "INV-1", "sensors": [{"capability": "soc", "entity": "sensor.gw_soc"}]}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local") + comps = _FakeComponents({"gateway": _FakeComp(gw)}) + return _FakeBase(comps, {"lattice_projection_enable": True} if enabled else None) + + async def test_noop_when_disabled(self): + from lattice_component import LatticeComponent + + comp = LatticeComponent(self._base(enabled=False)) + self.assertTrue(await comp.run(0, True)) + self.assertIsNone(comp.projection.site) + self.assertEqual(comp.base.logs, []) + + async def test_logs_map_when_enabled(self): + from lattice_component import LatticeComponent + + comp = LatticeComponent(self._base(enabled=True)) + self.assertTrue(await comp.run(0, True)) + self.assertEqual(len(comp.projection.site.nodes), 1) + self.assertTrue(any("device map" in m for m in comp.base.logs)) + + +if __name__ == "__main__": + unittest.main()