Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
name: Release

# Triggered when a version tag is pushed, e.g.:
#   git tag v0.7.5.0 && git push origin v0.7.5.0
on:
  push:
    tags:
      - "v*"

# Workflow-wide default is read-only; the jobs below that need more
# (OIDC token, release creation) declare their own permissions.
permissions:
  contents: read

jobs:
  # 1. Build the sdist + wheel and verify the tag matches pyproject version.
  build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Install uv
        uses: astral-sh/setup-uv@v3
        with:
          version: "latest"

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.13"

      - name: Verify tag matches pyproject version
        run: |
          # Strip the leading "v" from the pushed tag name (v1.2.3 -> 1.2.3).
          TAG="${GITHUB_REF_NAME#v}"
          # Ask uv for the version first; if that command fails, read
          # [project].version from pyproject.toml with tomllib instead.
          PKG=$(uv version --short 2>/dev/null || python -c "import tomllib;print(tomllib.load(open('pyproject.toml','rb'))['project']['version'])")
          echo "Tag version: $TAG"
          echo "Package version: $PKG"
          if [ "$TAG" != "$PKG" ]; then
            echo "::error::Tag ($TAG) does not match pyproject.toml version ($PKG)."
            exit 1
          fi

      - name: Build sdist and wheel
        run: uv build

      - name: Check distribution metadata
        run: uvx twine check dist/*

      # Hand the built files to the publishing jobs below.
      - name: Upload build artifacts
        uses: actions/upload-artifact@v4
        with:
          name: dist
          path: dist/

  # 2. Publish to PyPI via Trusted Publishing (OIDC, no API token needed).
  pypi:
    needs: build
    runs-on: ubuntu-latest
    environment:
      name: pypi
      url: https://pypi.org/project/10xscale-agentflow/
    permissions:
      id-token: write # required for trusted publishing
    steps:
      - name: Download build artifacts
        uses: actions/download-artifact@v4
        with:
          name: dist
          path: dist/

      - name: Publish to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1

  # 3. Create the GitHub Release with auto-generated notes and attach artifacts.
  #    Runs only after the PyPI upload job has succeeded.
  github-release:
    needs: pypi
    runs-on: ubuntu-latest
    permissions:
      contents: write # required to create a release
    steps:
      - name: Download build artifacts
        uses: actions/download-artifact@v4
        with:
          name: dist
          path: dist/

      - name: Create GitHub Release
        uses: softprops/action-gh-release@v2
        with:
          generate_release_notes: true
          files: dist/*
          fail_on_unmatched_files: true
6 changes: 6 additions & 0 deletions agentflow/core/graph/utils/invoke_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from agentflow.core.graph.edge import Edge
from agentflow.core.graph.node import Node
from agentflow.core.graph.utils.utils import (
calculate_token_usage,
call_realtime_sync,
get_next_node,
load_or_create_state,
Expand Down Expand Up @@ -479,8 +480,12 @@ async def invoke(
final_state, messages = await self._execute_graph(state, config)
logger.info("Graph execution completed with %d final messages", len(messages))

# Calculate token usage
token_usage = calculate_token_usage(messages)

event.event_type = EventType.END
event.metadata["status"] = "Graph execution completed"
event.metadata.update(token_usage)
event.data["state"] = final_state.model_dump()
event.data["messages"] = [m.model_dump() for m in messages] if messages else []
publish_event(event)
Expand All @@ -489,6 +494,7 @@ async def invoke(
final_state,
messages,
response_granularity,
token_usage=token_usage,
)
except Exception as e:
logger.exception("Graph execution failed: %s", e)
Expand Down
47 changes: 34 additions & 13 deletions agentflow/core/graph/utils/stream_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
InterruptConfigMixin,
)
from .utils import (
calculate_token_usage,
call_realtime_sync,
get_next_node,
load_or_create_state,
Expand Down Expand Up @@ -589,6 +590,7 @@ async def _execute_graph( # noqa: PLR0912, PLR0915
event.metadata["is_context_trimmed"] = is_context_trimmed
publish_event(event)

# Include messages list for token calculation in stream method
yield StreamChunk(
event=StreamEvent.UPDATES,
state=state,
Expand All @@ -599,6 +601,8 @@ async def _execute_graph( # noqa: PLR0912, PLR0915
"max_steps": max_steps,
"is_context_trimmed": is_context_trimmed,
"reason": "Graph execution completed successfully",
# Internal: messages from current run for token calculation
"_messages": messages,
},
thread_id=config.get("thread_id"),
run_id=config.get("run_id"),
Expand Down Expand Up @@ -747,8 +751,19 @@ async def stream(
logger.debug("Beginning graph execution")
result = self._execute_graph(state, input_data, config)

# Track messages from current run for token calculation
current_run_messages = []

# Stream results based on response granularity
async for chunk in result:
# Extract messages from final completion chunk (internal use only)
if (
chunk.event == StreamEvent.UPDATES
and chunk.data
and chunk.data.get("status") == "graph_invoked"
):
current_run_messages = chunk.data.pop("_messages", [])

match response_granularity:
case ResponseGranularity.FULL:
yield chunk
Expand All @@ -763,6 +778,9 @@ async def stream(
time_taken = time.time() - start_time
logger.info("Graph execution finished in %.2f seconds", time_taken)

# Calculate token usage from current run messages only
token_usage = calculate_token_usage(current_run_messages)

event.event_type = EventType.END
event.metadata.update(
{
Expand All @@ -772,19 +790,22 @@ async def stream(
"current_node": state.execution_meta.current_node,
"is_interrupted": state.is_interrupted(),
"total_messages": len(state.context) if state.context else 0,
**token_usage,
}
)
publish_event(event)
yield StreamChunk(
event=StreamEvent.UPDATES,
state=state,
data={
"status": "graph_invoked",
"reason": "Graph execution finished",
"time_taken": time_taken,
"is_interrupted": state.is_interrupted(),
"total_messages": len(state.context) if state.context else 0,
},
thread_id=config.get("thread_id"),
run_id=config.get("run_id"),
)
if response_granularity == ResponseGranularity.FULL:
yield StreamChunk(
event=StreamEvent.UPDATES,
state=state,
data={
"status": "graph_invoked",
"reason": "Graph execution finished",
"time_taken": time_taken,
"is_interrupted": state.is_interrupted(),
"total_messages": len(state.context) if state.context else 0,
**token_usage,
},
thread_id=config.get("thread_id"),
run_id=config.get("run_id"),
)
67 changes: 63 additions & 4 deletions agentflow/core/graph/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async def parse_response(
state: AgentState,
messages: list[Message],
response_granularity: ResponseGranularity = ResponseGranularity.LOW,
token_usage: dict[str, int] | None = None,
) -> dict[str, Any]:
"""Parse and format execution response based on specified granularity level.

Expand Down Expand Up @@ -83,19 +84,20 @@ async def parse_response(
match response_granularity:
case ResponseGranularity.FULL:
# Return full state and messages
return {"state": state, "messages": messages}
return {"state": state, "messages": messages, "token_usage": token_usage}
case ResponseGranularity.PARTIAL:
# Return state and summary of messages
return {
"context": state.context,
"summary": state.context_summary,
"message": messages,
"messages": messages,
"token_usage": token_usage,
}
case ResponseGranularity.LOW:
# Return all messages from state context
return {"messages": messages}
return {"messages": messages, "token_usage": token_usage}

return {"messages": messages}
return {"messages": messages, "token_usage": token_usage}


# Utility to update only provided fields in state
Expand Down Expand Up @@ -577,3 +579,60 @@ async def sync_data(
await checkpointer.aput_messages(config, messages)

return is_context_trimmed


def calculate_token_usage(messages: list[Message]) -> dict[str, int]:
    """Aggregate token usage across a list of messages.

    Sums the ``prompt_tokens``, ``completion_tokens`` and ``reasoning_tokens``
    found on each message's ``usages`` attribute. Messages whose ``usages`` is
    falsy are skipped, and a counter that is ``None`` is counted as zero.

    Args:
        messages: The messages to aggregate. An empty (or ``None``) value
            yields all-zero totals.

    Returns:
        Dictionary containing:
            - total_input_tokens: Sum of ``prompt_tokens``
            - total_output_tokens: Sum of ``completion_tokens``
            - total_reasoning_tokens: Sum of ``reasoning_tokens``
            - total_tokens: Sum of input and output tokens

    Example:
        ```python
        usage = calculate_token_usage(messages)
        # Returns: {
        #     "total_input_tokens": 1500,
        #     "total_output_tokens": 800,
        #     "total_reasoning_tokens": 200,
        #     "total_tokens": 2300
        # }
        ```
    """
    total_input_tokens = 0
    total_output_tokens = 0
    total_reasoning_tokens = 0

    # ``messages or ()`` makes the empty case fall through to the single
    # return below instead of needing a duplicated all-zero result.
    for message in messages or ():
        usage = message.usages
        if not usage:
            continue
        # ``or 0`` keeps a missing (None) counter from raising on ``+=``.
        total_input_tokens += usage.prompt_tokens or 0
        total_output_tokens += usage.completion_tokens or 0
        total_reasoning_tokens += usage.reasoning_tokens or 0

    return {
        "total_input_tokens": total_input_tokens,
        "total_output_tokens": total_output_tokens,
        "total_reasoning_tokens": total_reasoning_tokens,
        # Note: total_tokens is input + output only (reasoning tracked separately)
        "total_tokens": total_input_tokens + total_output_tokens,
    }
37 changes: 18 additions & 19 deletions agentflow/runtime/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@
``agentflow.runtime.protocols.a2a``.
"""

from . import a2a
from .a2a import (
AgentFlowExecutor,
build_a2a_app,
create_a2a_client_node,
create_a2a_server,
delegate_to_a2a_agent,
make_agent_card,
)
# from . import a2a
# from .a2a import (
# AgentFlowExecutor,
# build_a2a_app,
# create_a2a_client_node,
# create_a2a_server,
# delegate_to_a2a_agent,
# make_agent_card,
# )


__all__ = [
"AgentFlowExecutor",
"a2a",
"build_a2a_app",
"create_a2a_client_node",
"create_a2a_server",
"delegate_to_a2a_agent",
"make_agent_card",
]
# __all__ = [
# "AgentFlowExecutor",
# "a2a",
# "build_a2a_app",
# "create_a2a_client_node",
# "create_a2a_server",
# "delegate_to_a2a_agent",
# "make_agent_card",
# ]
Loading
Loading