Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions examples/chat/python/src/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
from langchain_core.tools import tool
from langgraph_sdk import get_client

from src.streaming.envelope_tool import render_a2ui_surface
from src.streaming.envelope_normalizer import normalize_envelope_args
from src.schemas.a2ui_v1 import A2UI_V1_SCHEMA_PROMPT


# Module-level singleton client; created lazily on first thread-title write.
_threads_client = None
Expand Down Expand Up @@ -156,7 +160,7 @@ async def _maybe_write_thread_title(state: "State", config: RunnableConfig) -> N
"'render Y', 'show a Z', 'create a form for', 'make a card with' — "
"you MUST IMMEDIATELY dispatch the schema-generation tool bound "
"to the conversation. Exactly ONE such tool is bound per request: "
"either `generate_a2ui_schema` or `generate_json_render_spec`. "
"either `render_a2ui_surface` or `generate_json_render_spec`. "
"Do NOT ask clarifying questions about platform, framework, fields, "
"validation, styling, or anything else. Do NOT describe the UI in "
"prose. Do NOT request more details from the user. The tool ITSELF "
Expand Down Expand Up @@ -326,23 +330,6 @@ async def research(topic: str, subagent_type: str = "research") -> str:
A2UI_PREFIX = "---a2ui_JSON---"


@tool
async def generate_a2ui_schema(request: str) -> str:
"""Dispatch the A2UI schema sub-agent to render a UI surface in A2UI
v1 wire format. Use this when the user asks for UI/forms/cards and
state.gen_ui_mode is 'a2ui'. Pass the user's request verbatim as the
`request` argument. The sub-agent returns a JSON array of v1
envelopes (surfaceUpdate, optional dataModelUpdate, beginRendering)
that the post-process node wraps for the chat composition."""
from src.schemas.a2ui_v1 import A2UI_V1_SCHEMA_PROMPT
llm = ChatOpenAI(model="gpt-5-mini", temperature=0)
response = await llm.ainvoke([
SystemMessage(content=A2UI_V1_SCHEMA_PROMPT),
HumanMessage(content=request),
])
return _as_text(response.content).strip()


@tool
async def generate_json_render_spec(request: str) -> str:
"""Dispatch the json-render schema sub-agent to render a UI surface
Expand Down Expand Up @@ -403,13 +390,26 @@ async def generate(state: State, config: RunnableConfig) -> dict:
# side of the conditional resolves at execution time.
gen_ui_mode = state.get("gen_ui_mode") or "a2ui"
gen_ui_tool = (
generate_a2ui_schema if gen_ui_mode == "a2ui"
render_a2ui_surface if gen_ui_mode == "a2ui"
else generate_json_render_spec
)
llm = ChatOpenAI(**kwargs).bind_tools([
search_documents, request_approval, research, gen_ui_tool,
])
messages = [SystemMessage(content=SYSTEM_PROMPT)] + state["messages"]
# Strict mode is enabled for the envelope-emission tool so OpenAI enforces
# the canonical {envelopes: [...]} argument shape; the JS bridge and Python
# normalizer treat the non-canonical shapes as safety nets.
llm = ChatOpenAI(**kwargs).bind_tools(
[search_documents, request_approval, research, gen_ui_tool],
strict=True if gen_ui_mode == "a2ui" else False,
)
# Append A2UI v1 schema to system prompt when in a2ui mode, so the parent
# LLM knows how to construct the envelopes directly.
system = SYSTEM_PROMPT
if gen_ui_mode == "a2ui":
system = SYSTEM_PROMPT + "\n\n--- A2UI v1 SCHEMA ---\n" + A2UI_V1_SCHEMA_PROMPT + (
"\n\nWhen rendering UI in a2ui mode, emit envelopes in this order: "
"surfaceUpdate FIRST, then beginRendering, then any dataModelUpdate "
"entries. This lets the client mount the surface as early as possible."
)
messages = [SystemMessage(content=system)] + state["messages"]
response = await llm.ainvoke(messages)
return {"messages": [response]}

Expand Down Expand Up @@ -437,7 +437,7 @@ def after_tools(state: State) -> Literal["emit_generated_surface", "generate"]:
if isinstance(prior, AIMessage) and prior.tool_calls:
for tc in prior.tool_calls:
if tc.get("id") == m.tool_call_id and tc.get("name") in (
"generate_a2ui_schema", "generate_json_render_spec",
"render_a2ui_surface", "generate_json_render_spec",
):
return "emit_generated_surface"
break
Expand Down Expand Up @@ -480,7 +480,7 @@ async def emit_generated_surface(state: State) -> dict:
if not payload:
return {}

if tool_name == "generate_a2ui_schema":
if tool_name == "render_a2ui_surface":
# Sub-LLM returns a JSON array of v1 envelopes. Convert to JSONL
# (one envelope per line) and prepend the classifier sentinel.
try:
Expand Down Expand Up @@ -645,7 +645,7 @@ async def attach_citations(state: State) -> dict:
_builder.add_node("generate", generate)
_builder.add_node("tools", ToolNode([
search_documents, request_approval, research,
generate_a2ui_schema, generate_json_render_spec,
render_a2ui_surface, generate_json_render_spec,
]))
_builder.add_node("emit_generated_surface", emit_generated_surface)
_builder.add_node("attach_citations", attach_citations)
Expand Down
2 changes: 2 additions & 0 deletions examples/chat/python/src/streaming/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-License-Identifier: MIT
"""Backend streaming helpers for progressive A2UI envelope emission."""
36 changes: 36 additions & 0 deletions examples/chat/python/src/streaming/envelope_normalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# SPDX-License-Identifier: MIT
"""Normalises the four envelope-args shapes the parent LLM may emit into
a canonical envelope list. Parity with libs/chat/src/lib/a2ui/envelope-normalizer.ts.

The spike (examples/chat/python/spike/parent_envelope_quality.py) observed
these shapes across gpt-5-mini and gpt-5; strict-mode tool binding should
eliminate the non-canonical ones in production, but this normalizer is
the safety net.
"""
from __future__ import annotations

from typing import Any

_ENVELOPE_KEYS = ("surfaceUpdate", "beginRendering", "dataModelUpdate", "deleteSurface")


def normalize_envelope_args(args: Any) -> list[dict] | None:
    """Return a canonical envelope list, or None if `args` is unrecognised.

    Accepts the four argument shapes the parent LLM has been observed to
    emit and maps each onto a flat list of envelope dicts:
      (a) canonical ``{"envelopes": [...]}``,
      (b) singular ``{"envelope": [...]}`` typo,
      (c) positional ``{"0": env, "1": env, ...}``,
      (d) a single flat envelope dict.
    """
    # Anything that is not a non-empty dict cannot be normalised.
    if not isinstance(args, dict) or not args:
        return None
    # (a)/(b): a list under the canonical key, or under the singular typo.
    for list_key in ("envelopes", "envelope"):
        candidate = args.get(list_key)
        if isinstance(candidate, list):
            return candidate
    # (c): every key is a digit string — unflatten in numeric key order.
    if all(isinstance(key, str) and key.isdigit() for key in args):
        ordered = sorted(args.items(), key=lambda item: int(item[0]))
        return [value for _, value in ordered]
    # (d): a bare envelope (has one of the discriminator keys) — wrap it.
    if any(discriminator in args for discriminator in _ENVELOPE_KEYS):
        return [args]
    return None
64 changes: 64 additions & 0 deletions examples/chat/python/src/streaming/envelope_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# SPDX-License-Identifier: MIT
"""Parent-LLM-bound tool that emits A2UI v1 envelopes as structured tool
arguments. Replaces the old two-LLM `generate_a2ui_schema` flow (parent
calls a sub-LLM that produces envelopes); the parent now emits envelopes
directly so the natural token stream IS the surface-rendering stream.

The Pydantic schemas enable OpenAI strict-mode validation when the tool
is bound via `bind_tools([..., render_a2ui_surface], strict=True)`.
"""
from __future__ import annotations

import json
from typing import Optional

from langchain_core.tools import tool
from pydantic import BaseModel, Field


class SurfaceUpdate(BaseModel):
    """Component-tree envelope. Required first envelope per turn."""
    # NOTE(review): field names, declaration order, Field descriptions, and the
    # class docstring all flow into the OpenAI strict-mode tool schema via
    # bind_tools — treat every string here as part of the wire contract.
    surfaceId: str = Field(description="Stable identifier for this surface.")
    components: list[dict] = Field(
        description="Component tree as a list of {id, type, props} objects."
    )


class BeginRendering(BaseModel):
    """Render-start envelope. Required; identifies the root component."""
    # NOTE(review): schema-facing model — names/descriptions feed the strict
    # tool schema; do not edit casually.
    surfaceId: str
    root: str = Field(description="Component id of the surface root.")
    # Optional style overrides; stripped by exclude_none when unset.
    styles: Optional[dict] = None


class DataModelUpdate(BaseModel):
    """Initial state envelope. Optional; one per state path the surface binds to."""
    # NOTE(review): schema-facing model — names/descriptions feed the strict
    # tool schema; do not edit casually.
    surfaceId: str
    # Optional dotted state path; None targets the surface's root data model
    # — presumably; confirm against the client-side binding logic.
    path: Optional[str] = None
    contents: list[dict] = Field(
        description="Entries: {key, valueString|valueNumber|valueBoolean|valueMap}."
    )


class A2uiEnvelope(BaseModel):
    """Single A2UI v1 envelope. Exactly one of the three discriminators
    is set per envelope."""
    # The one-of constraint is documented, not enforced by Pydantic here;
    # render_a2ui_surface relies on exclude_none to drop the unset two.
    surfaceUpdate: Optional[SurfaceUpdate] = None
    beginRendering: Optional[BeginRendering] = None
    dataModelUpdate: Optional[DataModelUpdate] = None


@tool
def render_a2ui_surface(envelopes: list[A2uiEnvelope]) -> str:
    """Render a UI surface using A2UI v1 envelopes. Emit:
    - exactly one `surfaceUpdate` (component tree),
    - exactly one `beginRendering` (root reference),
    - zero or more `dataModelUpdate` entries (initial state).

    Envelope order in this call should be: surfaceUpdate, beginRendering,
    then any dataModelUpdate entries (so the surface mounts and per-component
    placeholders show before initial state arrives).
    """
    # NOTE: the docstring above is the LLM-facing tool description — keep it
    # stable; it is read at runtime by the model, not just by humans.
    if not envelopes:
        raise ValueError("render_a2ui_surface requires at least one envelope")
    # exclude_none drops the two unset discriminators on each envelope so the
    # serialized JSON carries exactly the keys the client classifier expects.
    serialized = [envelope.model_dump(exclude_none=True) for envelope in envelopes]
    return json.dumps(serialized)
31 changes: 31 additions & 0 deletions examples/chat/python/tests/test_envelope_normalizer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Parity tests with libs/chat/src/lib/a2ui/envelope-normalizer.spec.ts."""
import pytest

from src.streaming.envelope_normalizer import normalize_envelope_args


class TestNormalizeEnvelopeArgs:
    """One test per observed arg shape, plus the two rejection paths."""

    def test_canonical_envelopes_shape(self):
        payload = [{"surfaceUpdate": {"surfaceId": "s", "components": []}}]
        assert normalize_envelope_args({"envelopes": payload}) == payload

    def test_singular_envelope_typo_shape(self):
        payload = [{"beginRendering": {"surfaceId": "s", "root": "r"}}]
        assert normalize_envelope_args({"envelope": payload}) == payload

    def test_positional_keys_unflattened_in_numeric_order(self):
        first = {"surfaceUpdate": {"surfaceId": "s", "components": []}}
        second = {"beginRendering": {"surfaceId": "s", "root": "r"}}
        # Keys deliberately out of order: sorting must be numeric, not lexical.
        assert normalize_envelope_args({"1": second, "0": first}) == [first, second]

    def test_flat_single_envelope_wrapped_in_list(self):
        flat = {"surfaceUpdate": {"surfaceId": "s", "components": []}}
        assert normalize_envelope_args(flat) == [flat]

    def test_empty_object_returns_none(self):
        assert normalize_envelope_args({}) is None

    def test_non_dict_input_returns_none(self):
        for unrecognised in (None, "x"):
            assert normalize_envelope_args(unrecognised) is None
59 changes: 59 additions & 0 deletions examples/chat/python/tests/test_envelope_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""Tests for the parent-emits-envelopes tool used by the GenUI flow."""
import json

import pytest

from src.streaming.envelope_tool import (
SurfaceUpdate,
BeginRendering,
DataModelUpdate,
A2uiEnvelope,
render_a2ui_surface,
)


class TestPydanticEnvelopeModels:
    """Shape checks for the Pydantic envelope models."""

    def test_surface_update_round_trips(self):
        tree = [{"id": "c", "type": "text", "props": {}}]
        model = SurfaceUpdate(surfaceId="s1", components=tree)
        assert model.surfaceId == "s1"
        assert model.components == tree

    def test_begin_rendering_required_fields(self):
        assert BeginRendering(surfaceId="s1", root="c").root == "c"

    def test_data_model_update_path_is_optional(self):
        model = DataModelUpdate(surfaceId="s1", contents=[{"key": "k", "valueString": "v"}])
        assert model.path is None

    def test_a2ui_envelope_accepts_surface_update_field(self):
        envelope = A2uiEnvelope(surfaceUpdate={"surfaceId": "s", "components": []})
        # Exactly the provided discriminator is set; the other two stay None.
        assert envelope.surfaceUpdate is not None
        assert envelope.beginRendering is None
        assert envelope.dataModelUpdate is None


class TestRenderA2uiSurfaceTool:
    """Behaviour of the tool body itself, invoked directly (no LLM)."""

    def test_serializes_envelopes_to_json_string(self):
        envelopes = [
            {"surfaceUpdate": {"surfaceId": "s", "components": [{"id": "c", "type": "text", "props": {}}]}},
            {"beginRendering": {"surfaceId": "s", "root": "c"}},
        ]
        parsed = json.loads(render_a2ui_surface.invoke({"envelopes": envelopes}))
        assert isinstance(parsed, list)
        assert len(parsed) == 2
        # Input order is preserved in the serialized output.
        assert "surfaceUpdate" in parsed[0]
        assert "beginRendering" in parsed[1]

    def test_strips_none_fields_via_exclude_none(self):
        result = render_a2ui_surface.invoke(
            {"envelopes": [{"surfaceUpdate": {"surfaceId": "s", "components": []}}]}
        )
        (envelope,) = json.loads(result)
        # The unset discriminators must not appear in the serialized envelope.
        assert "beginRendering" not in envelope
        assert "dataModelUpdate" not in envelope

    def test_raises_on_empty_envelopes_list(self):
        with pytest.raises(ValueError):
            render_a2ui_surface.invoke({"envelopes": []})
36 changes: 28 additions & 8 deletions examples/chat/python/tests/test_graph_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def test_state_graph_topology_unchanged_after_research():

@pytest.mark.smoke
def test_genui_tools_exist():
from src.graph import generate_a2ui_schema, generate_json_render_spec
assert generate_a2ui_schema.name == "generate_a2ui_schema"
from src.graph import render_a2ui_surface, generate_json_render_spec
assert render_a2ui_surface.name == "render_a2ui_surface"
assert generate_json_render_spec.name == "generate_json_render_spec"


Expand Down Expand Up @@ -174,19 +174,19 @@ def test_replaces_tool_call_ai_in_place_same_id(self):
tool_call_ai = AIMessage(
id="ai-1",
content=[
{"type": "function_call", "name": "generate_a2ui_schema",
{"type": "function_call", "name": "render_a2ui_surface",
"arguments": '{"request":"r"}'}
],
tool_calls=[{
"id": "call_1",
"name": "generate_a2ui_schema",
"name": "render_a2ui_surface",
"args": {"request": "r"},
"type": "tool_call",
}],
)
tool_msg = ToolMessage(
tool_call_id="call_1",
name="generate_a2ui_schema",
name="render_a2ui_surface",
content='[{"surfaceUpdate":{"surfaceId":"s1","components":[]}},'
'{"beginRendering":{"surfaceId":"s1","root":""}}]',
)
Expand All @@ -207,7 +207,7 @@ def test_replaces_tool_call_ai_in_place_same_id(self):
# Content carries the wrapped surface payload.
assert "---a2ui_JSON---" in replacement_ai.content
# tool_calls is preserved so detection (frontend isGenuiTurn) still fires.
assert any(tc.get("name") == "generate_a2ui_schema" for tc in replacement_ai.tool_calls)
assert any(tc.get("name") == "render_a2ui_surface" for tc in replacement_ai.tool_calls)

def test_beginRendering_envelope_ordering(self):
"""emit reorders the wrapped envelopes so beginRendering lands
Expand All @@ -219,14 +219,14 @@ def test_beginRendering_envelope_ordering(self):
content=[],
tool_calls=[{
"id": "call_2",
"name": "generate_a2ui_schema",
"name": "render_a2ui_surface",
"args": {"request": "r"},
"type": "tool_call",
}],
)
tool_msg = ToolMessage(
tool_call_id="call_2",
name="generate_a2ui_schema",
name="render_a2ui_surface",
content='['
'{"surfaceUpdate":{"surfaceId":"s","components":[]}},'
'{"dataModelUpdate":{"surfaceId":"s","contents":[]}},'
Expand All @@ -252,3 +252,23 @@ def test_beginRendering_envelope_ordering(self):
# The remaining dataModelUpdate envelopes follow.
assert "dataModelUpdate" in parsed[2]
assert "dataModelUpdate" in parsed[3]


class TestParentEmitsEnvelopes:
    """Graph-wiring checks for the parent-emits-envelopes flow."""

    @staticmethod
    def _bound_tool_names():
        """Names of the tools registered on the graph's ToolNode."""
        from src.graph import _builder
        # ToolNode keeps a `.tools_by_name` dict on its runnable.
        return set(_builder.nodes["tools"].runnable.tools_by_name)

    def test_render_a2ui_surface_is_bound_for_a2ui_mode(self):
        """Sanity: the parent LLM's generate node binds render_a2ui_surface
        when gen_ui_mode='a2ui'. We import the graph module and check the
        tools registered on ToolNode."""
        assert "render_a2ui_surface" in self._bound_tool_names()

    def test_generate_a2ui_schema_tool_is_removed(self):
        """The old sub-LLM-dispatching tool must be removed from the graph."""
        assert "generate_a2ui_schema" not in self._bound_tool_names()
Loading