Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions Need_TO_INCLUDE_IN_DOCS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Need to Include in Docs

Production-readiness work completed in this pass. Items below introduce or change
user-facing behavior and should be documented.

## Type checking (PEP 561)
- The package now ships a `py.typed` marker. Downstream `mypy`/`pyright` will
type-check against Agentflow's annotations.

## Configurable LLM call timeout
- All LLM clients now apply a default request timeout (`600s`) so a stalled
provider cannot hang a run indefinitely.
- Override globally via the `AGENTFLOW_LLM_TIMEOUT` environment variable
(seconds), or programmatically:
- `from agentflow.core.llm import set_default_llm_timeout, get_default_llm_timeout, DEFAULT_LLM_TIMEOUT_SECONDS`
- `set_default_llm_timeout(120.0)` / `set_default_llm_timeout(None)` to reset.
- An explicit per-client `timeout=` still takes precedence.

## CompiledGraph async context manager
- `CompiledGraph` supports `async with`:
```python
async with await build_and_compile_graph() as graph:
await graph.ainvoke(input_data)
# aclose() runs automatically on exit, even if the body raises
```
- `aclose()` is now idempotent (second call returns `{"status": "already_closed"}`).

## Circuit breaker for LLM calls (opt-in)
- Complements retry + `fallback_models`: once a `(provider, model)` fails
`circuit_breaker_threshold` times in a row, its circuit opens and that target
is skipped (straight to the next fallback) for `circuit_breaker_reset_timeout`
seconds, instead of being retried on every call.
- Configure via `RetryConfig`:
- `circuit_breaker_enabled: bool = False`
- `circuit_breaker_threshold: int = 5`
- `circuit_breaker_reset_timeout: float = 30.0`

## Secret redaction for logs
- New helpers in `agentflow.utils`:
- `mask_secrets(text)` — redacts API keys, `Bearer` tokens, `key=value`
secrets, and signed-URL credential query params.
- `SecretRedactionFilter` — a `logging.Filter`; add it to a handler to cover
all loggers that propagate to it.
- `install_secret_redaction(logger_name="agentflow")` — convenience installer.

## ConsolePublisher logging option
- `ConsolePublisher` is a dev/debug, opt-in publisher (use a real transport in
production). It writes to stdout by default; pass `{"use_logger": True}` to
route events through the `agentflow.publisher` logger instead of stdout.

## Project / repo
- Dependencies now have version bounds (e.g. `pydantic>=2,<3`).
- mypy runs in pre-commit/CI (phased adoption; see `CONTRIBUTING.md`).
- Test coverage gate raised to 80%.
- Added `SECURITY.md` and `CONTRIBUTING.md`.
- Added Dependabot config and a CodeQL workflow.
70 changes: 70 additions & 0 deletions Plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
Agentflow Core Python SDK — Production Readiness Review
Scope: the 10xscale-agentflow package (v0.7.5.1) in agentflow/. Overall this is a mature, well-structured framework with strong CI/CD release automation, a comprehensive test suite (138 files), a clean exception hierarchy, and genuinely good OTEL support. The gaps below are what stand between it and "production-grade SDK that other teams depend on."

Blockers (fix before claiming production-stable)
1. No py.typed marker — the package ships untyped to consumers.
Confirmed missing: no agentflow/agentflow/py.typed and no package-data rule in pyproject.toml. Despite extensive internal type hints, PEP 561 means downstream mypy/pyright see Any for every Agentflow symbol. For a "Production/Stable"-classified library this is the single highest-leverage fix: add the empty marker plus a [tool.setuptools.package-data] entry.

2. Core dependencies are unpinned.
pyproject.toml:51,64-66 lists pydantic, PyYAML, python-dotenv, and pydantic-ai with no version bounds at all. A Pydantic v2→v3 or breaking pydantic-ai release will silently break installs in the field. At minimum set lower bounds (pydantic>=2,<3). The uv.lock protects this repo's own builds but does nothing for pip install 10xscale-agentflow users.

3. README claims native Anthropic/Claude support that does not exist.
README headline (lines ~17, 24, 190) advertises native Anthropic and ANTHROPIC_API_KEY, but client_factory.py detect_provider only resolves google or openai. A user running detect_provider("claude-3-opus") gets openai and a construction failure. Either remove the claim or document it as "via OpenAI-compatible endpoint only." (Note: the README import-path drift that CLAUDE.md warns about appears to have been fixed — examples now use correct agentflow.core.* paths. CLAUDE.md's "broken examples" note is itself stale.)

High priority
4. No default timeout on LLM calls.
client_factory.py:34 accepts a timeout kwarg but enforces no default, and there is no top-level timeout wrapping invoke/ainvoke. A hung provider connection blocks indefinitely. Add a sane default client timeout and a per-request ceiling.

5. mypy is configured but never runs.
pyproject.toml has [tool.mypy], but it is absent from both .pre-commit-config.yaml and .github/workflows/ci.yml (confirmed: zero mypy references in either). It is dead config. Either wire it into CI or stop advertising type safety. This pairs directly with #1.

6. CI tests a single Python version on a single OS.
ci.yml runs only Python 3.13 on ubuntu-latest, yet the package claims >=3.12 and classifies 3.12/3.13. 3.12 is untested. Add a 3.12/3.13 matrix; consider macOS.

Medium priority
7. Silent exception swallowing in media + callback paths. Broad except Exception with debug-only logging in media_resolver.py (e.g. lines 100, 193, 234) and throughout callbacks.py hides real failures in production. Narrow these or at least log at warning with context.

8. No __aenter__/__aexit__ on CompiledGraph. Cleanup relies on callers remembering aclose(). Publisher backends (Kafka/RabbitMQ/Redis) may leak connections if shutdown raises. Add the async-context-manager protocol to the top-level graph.

9. Missing governance/policy files. No SECURITY.md (no vuln disclosure path) and no CONTRIBUTING.md. Both are table-stakes for an OSS framework inviting external dependence.

10. No migration guide or deprecation policy. The 0.5→0.7 flat→nested import restructure (agentflow.state → agentflow.core.state) has no MIGRATION.md, and there's no structured deprecation mechanism. changelogs.md is unstructured (no dates, no Keep-a-Changelog format). For a stable API, document the deprecation contract.

11. ConsolePublisher uses print(). console_publisher.py:61 writes to stdout. Acceptable for a console publisher by design, but worth confirming it never runs by default in a server context.

Lower priority / nice-to-have
No circuit breaker to complement fallback_models — repeated failures to a dead provider retry every call.
No secret masking in logs; API keys/auth headers could surface in debug output.
No Dependabot/CodeQL beyond Bandit.
Coverage gate at 70% is low for a production framework; 80%+ is a better bar.
normal_tests/ (12 files) excluded from discovery with no documented rationale.
What's already solid (don't regress)
Release automation: tag-gated, OIDC trusted PyPI publishing with version verification (release.yml).
Exception hierarchy with error_code/context/to_dict() and an explicit TransientStorageError for retryability.
OTEL tracing with gen_ai semantic conventions and three observability levels.
Proper library logging (getLogger(__name__) + NullHandler, no stdout pollution in the core path).
Parallel tool execution genuinely works (invoke_node_handler.py:182).
Comprehensive __all__ exports across all subpackages.
Suggested order of attack
py.typed + dependency pinning + fix the Anthropic README claim (an afternoon; ships in next patch).
LLM timeouts + wire mypy into CI + 3.12 matrix.
SECURITY.md, CONTRIBUTING.md, MIGRATION.md.
Tighten exception handling and add the graph async-context-manager protocol.
Items 1-3 are mostly mechanical and would move the package materially toward production-grade. Want me to implement the quick wins in #1 (add py.typed, set dependency bounds, correct the README)?




4. lets add keep a default, using env or globally we can change it
5. lets add
6. fine for now, will focus on later
7. lets fix it
8. lets add
9. lets add
10. skip for now
11. add it

No circuit breaker to complement fallback_models — repeated failures to a dead provider retry every call.
No secret masking in logs; API keys/auth headers could surface in debug output.
No Dependabot/CodeQL beyond Bandit.
Coverage gate at 70% is low for a production framework; 80%+ is a better bar.
100 changes: 99 additions & 1 deletion agentflow/core/graph/compiled_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from agentflow.storage.checkpointer.base_checkpointer import BaseCheckpointer
from agentflow.storage.store.base_store import BaseStore
from agentflow.utils import (
CallbackManager,
ResponseGranularity,
)
from agentflow.utils.background_task_manager import BackgroundTaskManager
Expand Down Expand Up @@ -275,6 +276,7 @@ async def ainvoke(
Returns:
Response dict based on granularity
"""
self._guard_not_realtime()
cfg = self._prepare_config(config, is_stream=False)

return await self._invoke_handler.invoke(
Expand Down Expand Up @@ -465,7 +467,7 @@ async def astream(
Yields:
Message objects with incremental content
"""

self._guard_not_realtime()
cfg = self._prepare_config(config, is_stream=True)

async for chunk in self._stream_handler.stream(
Expand Down Expand Up @@ -527,6 +529,102 @@ def attach_remote_tools(
node_name,
)

# ------------------------------------------------------------------ #
# Realtime runtime (audio-to-audio). A separate runtime from the
# super-step invoke/stream loop: the live agent owns the turn loop.
# ------------------------------------------------------------------ #
def _find_live_nodes(self) -> list[tuple[str, Node]]:
from agentflow.core.realtime.live_agent import LiveAgent

return [
(name, node)
for name, node in self._state_graph.nodes.items()
if isinstance(node.func, LiveAgent)
]

def _guard_not_realtime(self) -> None:
"""Forcing rule: a graph containing a LiveAgent must use arealtime()."""
if self._find_live_nodes():
raise RuntimeError(
"This graph contains a LiveAgent; use .arealtime() / .realtime() instead of "
"invoke/ainvoke/stream/astream."
)

async def arealtime(
self,
input_queue: Any,
config: dict[str, Any] | None = None,
state: AgentState | None = None,
) -> AsyncIterator[Any]:
"""Run the graph's realtime (audio) session, yielding normalized RealtimeEvents.

Forcing rule: the graph must contain exactly one LiveAgent (the root controller);
ordinary turn-based graphs must use invoke/stream.
"""
live = self._find_live_nodes()
if not live:
raise RuntimeError(
"arealtime() requires a graph rooted at a LiveAgent (e.g. AudioAgent); "
"this graph has none. Use invoke/stream for turn-based graphs."
)
if len(live) > 1:
raise RuntimeError(
"Only one LiveAgent is allowed per realtime run in v1 "
f"(found {len(live)}: {[name for name, _ in live]})."
)

name, node = live[0]
agent = node.func
agent._node_name = name
cfg = self._prepare_config(config, is_stream=True)
callback_manager = InjectQ.get_instance().try_get(CallbackManager)
context_manager = self._state_graph._context_manager
run_state = state if state is not None else (self._state or AgentState())

async for event in agent.arun(
input_queue,
cfg,
run_state,
checkpointer=self._checkpointer,
callback_manager=callback_manager,
context_manager=context_manager,
):
yield event

def realtime(
self,
input_queue: Any,
config: dict[str, Any] | None = None,
state: AgentState | None = None,
) -> Generator[Any]:
"""Synchronous wrapper over :meth:`arealtime` for non-async consumers.

Must be called from a thread with no running event loop; from inside an async
context (FastAPI handler, Jupyter), use :meth:`arealtime` directly.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
pass # no running loop: safe to drive a private one below
else:
raise RuntimeError(
"realtime() (sync) cannot be called from a running event loop; "
"await arealtime() instead."
)

agen = self.arealtime(input_queue, config, state)
loop = asyncio.new_event_loop()
try:
while True:
try:
yield loop.run_until_complete(agen.__anext__())
except StopAsyncIteration:
break
finally:
with contextlib.suppress(Exception):
loop.run_until_complete(agen.aclose())
loop.close()

async def aclose(self) -> dict[str, Any]: # noqa: PLR0915
"""
Close the graph and release all resources gracefully.
Expand Down
51 changes: 51 additions & 0 deletions agentflow/core/realtime/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Realtime (audio-to-audio) runtime primitives.

Provider-neutral contracts (:data:`RealtimeEvent`, :class:`RealtimeConfig`,
:class:`RealtimeClient`), the upstream :class:`LiveInputQueue`, and the Gemini Live
provider client. Provider SDK imports are lazy, so importing this package never pulls
the ``realtime`` optional dependency.
"""

from .base import (
AgentChangedEvent,
AudioDeltaEvent,
ErrorEvent,
GoAwayEvent,
InputTranscriptEvent,
InterruptedEvent,
OutputTranscriptEvent,
RealtimeClient,
RealtimeConfig,
RealtimeEvent,
SessionUpdateEvent,
ToolCallEvent,
ToolResultEvent,
TurnCompleteEvent,
VADConfig,
)
from .providers.gemini_live import GeminiLiveClient, normalize_message
from .queue import LiveInput, LiveInputKind, LiveInputQueue


__all__ = [
"AgentChangedEvent",
"AudioDeltaEvent",
"ErrorEvent",
"GeminiLiveClient",
"GoAwayEvent",
"InputTranscriptEvent",
"InterruptedEvent",
"LiveInput",
"LiveInputKind",
"LiveInputQueue",
"OutputTranscriptEvent",
"RealtimeClient",
"RealtimeConfig",
"RealtimeEvent",
"SessionUpdateEvent",
"ToolCallEvent",
"ToolResultEvent",
"TurnCompleteEvent",
"VADConfig",
"normalize_message",
]
Loading
Loading