This document is a reference for all LLM providers in the ECS Agent framework.
The provider stack is organized around three linked concepts: a canonical model ID (provider/model) that carries both routing provider and API model name, a ProviderConfig that defines endpoint/auth/protocol settings, and event-driven accounting that measures usage and cache behavior. The Quick Start below shows the full end-to-end flow.
```python
import os

from ecs_agent.accounting.subscriber import AccountingSubscriber
from ecs_agent.core import World
from ecs_agent.providers.registry import ProviderRegistry, get_llm_provider

# 1) Load provider configs from a TOML file
registry = ProviderRegistry.from_toml("providers.toml")
# or inline:
# registry = ProviderRegistry.from_dict({
#     "aliyun": {
#         "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
#         "api_format": "openai_chat_completions",
#         "api_key_env": "LLM_API_KEY",
#     }
# })

# 2) One call: model ID → correct provider type (determined by api_format)
provider = get_llm_provider("aliyun/qwen3.5-flash", registry=registry)

# 3) Attach accounting to the World's event bus
world = World()
subscriber = AccountingSubscriber()
subscriber.subscribe(world.event_bus)
```

`ProviderRegistry` maps provider IDs to endpoint/auth/protocol configs. `get_llm_provider` uses it to construct the right provider type automatically. An example `providers.toml`:
```toml
[providers.aliyun]
base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_format = "openai_chat_completions"
api_key_env = "LLM_API_KEY"

[providers.aliyun-responses]
base_url = "https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1"
api_format = "openai_responses"
api_key_env = "LLM_API_KEY"

[providers.moonshot]
base_url = "https://dashscope.aliyuncs.com/apps/anthropic"
api_format = "anthropic_messages"
api_key_env = "LLM_API_KEY"
default_max_tokens = 8192
```

```python
from ecs_agent.providers.registry import ProviderRegistry, get_llm_provider

# From a TOML file
registry = ProviderRegistry.from_toml("providers.toml")

# From a dict (useful in tests)
registry = ProviderRegistry.from_dict({
    "aliyun": {
        "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1",
        "api_format": "openai_chat_completions",
        "api_key_env": "LLM_API_KEY",
    }
})

# Resolves provider ID → ProviderEntry → ProviderConfig → correct provider type
provider = get_llm_provider("aliyun/qwen3.5-flash", registry=registry)

# Override the API key at call time
provider = get_llm_provider("aliyun/qwen3.5-flash", registry=registry, api_key="sk-...")
```

API key resolution order: explicit `api_key` argument → `ProviderEntry.api_key` → env var named by `ProviderEntry.api_key_env`.
Dispatch by `api_format`:

| `api_format` | Provider type returned |
|---|---|
| `openai_chat_completions` | `OpenAIProvider` |
| `openai_responses` | `OpenAIProvider` (Responses API) |
| `anthropic_messages` | `ClaudeProvider` |
| `openai_embeddings` | raises `ValueError` — use `get_embedding_provider` |
| `openai_files` | raises `ValueError` — use `get_file_service` |
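This dispatch amounts to a lookup from `api_format` value to provider class. A minimal sketch with placeholder classes (not the framework's actual registry code):

```python
class OpenAIProvider: ...
class ClaudeProvider: ...


_DISPATCH = {
    "openai_chat_completions": OpenAIProvider,
    "openai_responses": OpenAIProvider,  # Responses adapter selected internally
    "anthropic_messages": ClaudeProvider,
}


def provider_class_for(api_format: str) -> type:
    # Embedding/file formats are rejected here, mirroring the table above
    if api_format in ("openai_embeddings", "openai_files"):
        raise ValueError(f"{api_format}: use the dedicated factory instead")
    return _DISPATCH[api_format]


print(provider_class_for("anthropic_messages").__name__)  # ClaudeProvider
```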
`ProviderEntry` fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `base_url` | `str` | (required) | Endpoint base URL (trailing slash stripped) |
| `api_format` | `ApiFormat` | (required) | Wire protocol; parsed eagerly |
| `api_key` | `str \| None` | `None` | Literal API key (not shown in repr) |
| `api_key_env` | `str \| None` | `None` | Env var name to read the key from |
| `extra_headers` | `dict[str, str]` | `{}` | Extra HTTP headers |
| `timeout` | `float \| None` | `None` | Global timeout override (seconds) |
| `default_max_tokens` | `int` | `4096` | Used as `max_tokens` for `ClaudeProvider` |
`ProviderConfig` holds all connection parameters for a provider endpoint. `ApiFormat` selects the wire protocol.
```python
from ecs_agent.providers.config import ProviderConfig, ApiFormat

# OpenAI-compatible Chat Completions (most common)
chat_config = ProviderConfig(
    provider_id="aliyun",
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    api_key="your-api-key",
    api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)

# OpenAI-compatible Responses API
responses_config = ProviderConfig(
    provider_id="aliyun",
    base_url="https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1",
    api_key="your-api-key",
    api_format=ApiFormat.OPENAI_RESPONSES,
)

# Anthropic-compatible Messages API
anthropic_config = ProviderConfig(
    provider_id="moonshot",
    base_url="https://dashscope.aliyuncs.com/apps/anthropic",
    api_key="your-api-key",
    api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
```

Available `ApiFormat` values:
| Value | Wire protocol |
|---|---|
| `OPENAI_CHAT_COMPLETIONS` | `POST /chat/completions` (standard OpenAI-compat) |
| `OPENAI_RESPONSES` | `POST /responses` (OpenAI Responses API) |
| `OPENAI_EMBEDDINGS` | `POST /embeddings` |
| `OPENAI_FILES` | `POST /files` (file upload) |
| `ANTHROPIC_MESSAGES` | `POST /v1/messages` (Anthropic Messages) |
`ProviderConfig` fields:

| Field | Type | Default | Description |
|---|---|---|---|
| `provider_id` | `str` | (required) | Logical provider name, e.g. `"aliyun"` |
| `base_url` | `str` | (required) | API base URL |
| `api_key` | `str` | (required) | Bearer token / API key |
| `api_format` | `ApiFormat` | (required) | Wire protocol selection |
| `extra_headers` | `dict[str, str]` | `{}` | Additional HTTP headers |
| `timeout` | `float \| None` | `None` | Global timeout override (seconds) |
The `LLMProvider` protocol defines the interface for all language model implementations. It is located in `ecs_agent.providers.protocol`.
```python
from collections.abc import AsyncIterator
from typing import Any, Protocol, runtime_checkable

from ecs_agent.types import CompletionResult, Message, StreamDelta, ToolSchema


@runtime_checkable
class LLMProvider(Protocol):
    async def complete(
        self,
        messages: list[Message],
        tools: list[ToolSchema] | None = None,
        stream: bool = False,
        response_format: dict[str, Any] | None = None,
    ) -> CompletionResult | AsyncIterator[StreamDelta]:
        ...
```

The `complete` method returns a `CompletionResult` when `stream=False` and an `AsyncIterator[StreamDelta]` when `stream=True`.
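Because `complete` has a union return type, callers branch on `stream`. A toy class with the same shape (simplified dict messages and string deltas, not the framework's types) shows both paths:

```python
import asyncio
from collections.abc import AsyncIterator


class EchoProvider:
    """Toy provider used only to illustrate the dual return shape."""

    async def complete(self, messages, tools=None, stream=False, response_format=None):
        text = messages[-1]["content"]
        if not stream:
            # Stands in for a CompletionResult
            return {"role": "assistant", "content": text}

        async def deltas() -> AsyncIterator[str]:
            for ch in text:  # stands in for StreamDelta chunks
                yield ch

        return deltas()


async def main() -> None:
    provider = EchoProvider()
    result = await provider.complete([{"role": "user", "content": "hi"}])
    print(result["content"])  # hi
    it = await provider.complete([{"role": "user", "content": "hi"}], stream=True)
    chunks = [d async for d in it]
    print("".join(chunks))  # hi


asyncio.run(main())
```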
`OpenAIProvider` is an OpenAI-compatible HTTP provider built on `httpx.AsyncClient`. It works with OpenAI's API as well as compatible alternatives such as DashScope, vLLM, or Ollama. Internally it dispatches to the Chat Completions or Responses adapter based on the `api_format` in the `ProviderConfig`.
```python
from ecs_agent.providers import OpenAIProvider
from ecs_agent.providers.config import ApiFormat, ProviderConfig

# Chat Completions
config = ProviderConfig(
    provider_id="openai",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
provider = OpenAIProvider(
    config=config,
    model="gpt-4o-mini",
    connect_timeout=10.0,
    read_timeout=120.0,
    write_timeout=10.0,
    pool_timeout=10.0,
)

# Responses API
responses_config = ProviderConfig(
    provider_id="aliyun",
    base_url="https://dashscope.aliyuncs.com/api/v2/apps/protocols/compatible-mode/v1",
    api_key="your-api-key",
    api_format=ApiFormat.OPENAI_RESPONSES,
)
provider = OpenAIProvider(config=responses_config, model="qwen3.5-flash")
```

The default adapter (`ApiFormat.OPENAI_CHAT_COMPLETIONS`) sends POST requests to `/chat/completions`. It handles:
- Multimodal messages: `ImageUrlPart` and `FileRefPart` are serialized to the OpenAI vision format. Text goes in `message.content`.
- Streaming: yields `StreamDelta` objects from SSE chunks; emits a single terminal `LLMInvocationEvent` with a normalized `UsageRecord`.
- Usage normalization: reads `cached_tokens` from the OpenAI usage response to populate `UsageRecord.cached_input_tokens`.
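The normalization step can be sketched as a plain function over a raw usage payload (this assumes OpenAI's `prompt_tokens_details.cached_tokens` layout, and simplifies `UsageRecord` to a dict):

```python
def normalize_openai_usage(raw: dict) -> dict:
    """Sketch: map an OpenAI-style usage payload onto canonical fields."""
    details = raw.get("prompt_tokens_details") or {}
    return {
        "prompt_tokens": raw.get("prompt_tokens", 0),
        "completion_tokens": raw.get("completion_tokens", 0),
        "total_tokens": raw.get("total_tokens", 0),
        # OpenAI reports cache hits under prompt_tokens_details.cached_tokens
        "cached_input_tokens": details.get("cached_tokens", 0),
    }


raw = {
    "prompt_tokens": 1024,
    "completion_tokens": 256,
    "total_tokens": 1280,
    "prompt_tokens_details": {"cached_tokens": 512},
}
print(normalize_openai_usage(raw)["cached_input_tokens"])  # 512
```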
Set `api_format=ApiFormat.OPENAI_RESPONSES` in the `ProviderConfig` to activate the Responses API adapter. It sends POST requests to `/responses`.

For multimodal vision input, build user messages with `content` carrying the text prompt and `Message.parts` containing `ImageUrlPart(url=...)` entries. The adapter converts each `ImageUrlPart` into a Responses API `input_image` item automatically (see `examples/vision_agent.py` for a full runnable example).
Threading state (`previous_response_id`) is not stored on the provider instance — it lives on an ECS component:

```python
from ecs_agent.components.definitions import ResponsesAPIStateComponent

world.add_component(agent, ResponsesAPIStateComponent(previous_response_id=None))
```

`ReasoningSystem` reads and writes `ResponsesAPIStateComponent` automatically on each tick.
```python
from pydantic import BaseModel

from ecs_agent.providers.openai_provider import pydantic_to_response_format


class User(BaseModel):
    name: str
    age: int


response_format = pydantic_to_response_format(User)
# Result: {'type': 'json_schema', 'json_schema': {'name': 'User', 'schema': {...}, 'strict': True}}
```

`ClaudeProvider` is an Anthropic-compatible provider with full SSE streaming and cache-aware usage normalization. It speaks the Anthropic Messages API format using `httpx.AsyncClient`.
```python
from ecs_agent.providers import ClaudeProvider
from ecs_agent.providers.config import ApiFormat, ProviderConfig

# Direct Anthropic API
config = ProviderConfig(
    provider_id="anthropic",
    base_url="https://api.anthropic.com",
    api_key="your-anthropic-api-key",
    api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
provider = ClaudeProvider(config=config, model="claude-3-5-haiku-latest", max_tokens=4096)
```

For Anthropic-compatible endpoints (e.g. Aliyun Kimi):
```python
from ecs_agent.providers import ClaudeProvider
from ecs_agent.providers.config import ApiFormat, ProviderConfig

# Anthropic-compatible endpoint (e.g. Aliyun Kimi)
config = ProviderConfig(
    provider_id="moonshot",
    base_url="https://dashscope.aliyuncs.com/apps/anthropic",
    api_key="your-api-key",
    api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
provider = ClaudeProvider(config=config, model="kimi-k2.5")
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `config` | `ProviderConfig` | (required) | Endpoint/auth/protocol config |
| `model` | `str` | `"claude-3-5-haiku-latest"` | Model identifier |
| `max_tokens` | `int` | `4096` | Maximum tokens in the response |
| `connect_timeout` | `float` | `10.0` | Connection timeout in seconds |
| `read_timeout` | `float` | `120.0` | Read timeout in seconds |
| `write_timeout` | `float` | `10.0` | Write timeout in seconds |
| `pool_timeout` | `float` | `10.0` | Connection pool timeout in seconds |
| `supports_vision` | `bool` | `False` | Enable image input parts |
- Non-streaming: sends a POST request to `/v1/messages` with the Anthropic message format and returns a `CompletionResult`.
- Streaming: uses SSE streaming with `content_block_delta` events; accumulates text deltas and tool-use inputs, yielding `StreamDelta` objects.
- Tool use: supports Anthropic's native tool use format, converting between the framework's `ToolSchema`/`ToolCall` format and Anthropic's `tool_use` blocks.
- Cache-aware usage: normalizes `cache_creation_input_tokens` and `cache_read_input_tokens` from Anthropic responses into the canonical `UsageRecord` fields.
- Error handling: `httpx.HTTPStatusError` and `httpx.RequestError` are logged and re-raised.
- Headers: sends `x-api-key` and `anthropic-version: 2023-06-01` headers.
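The cache-aware normalization can be sketched as follows. This sketch assumes Anthropic's `usage.input_tokens` counts only uncached input, which is why the canonical prompt total sums all three fields (illustration only, not the provider's actual code):

```python
def normalize_anthropic_usage(raw: dict) -> dict:
    """Sketch: fold Anthropic cache fields into canonical usage fields."""
    uncached = raw.get("input_tokens", 0)
    cache_write = raw.get("cache_creation_input_tokens") or 0
    cache_read = raw.get("cache_read_input_tokens") or 0
    return {
        # Total prompt size = uncached + cache writes + cache reads
        "prompt_tokens": uncached + cache_write + cache_read,
        "completion_tokens": raw.get("output_tokens", 0),
        "cache_creation_tokens": cache_write,
        "cache_read_tokens": cache_read,
    }


usage = normalize_anthropic_usage(
    {"input_tokens": 10, "cache_read_input_tokens": 990, "output_tokens": 42}
)
print(usage["prompt_tokens"])  # 1000
```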
```python
from ecs_agent import RetryProvider, RetryConfig
from ecs_agent.providers import ClaudeProvider
from ecs_agent.providers.config import ApiFormat, ProviderConfig

config = ProviderConfig(
    provider_id="anthropic",
    base_url="https://api.anthropic.com",
    api_key="your-api-key",
    api_format=ApiFormat.ANTHROPIC_MESSAGES,
)
provider = RetryProvider(
    provider=ClaudeProvider(config=config, model="claude-sonnet-4-20250514"),
    retry_config=RetryConfig(max_attempts=3),
)
```

`FakeProvider` is designed for deterministic testing. It returns a sequence of pre-configured responses.
```python
from ecs_agent.providers import FakeProvider
from ecs_agent.types import CompletionResult, Message

responses = [
    CompletionResult(message=Message(role="assistant", content="Hello!")),
    CompletionResult(message=Message(role="assistant", content="How can I help?")),
]
provider = FakeProvider(responses=responses)
# First call returns "Hello!"
# Second call returns "How can I help?"
# Third call raises IndexError
```

- Sequential: returns responses in the order they were provided. If the index exceeds the list length, it raises `IndexError`.
- Streaming: when `stream=True`, yields character-by-character `StreamDelta` objects. The final delta carries `finish_reason="stop"` and usage information.
- Verification: stores the `last_response_format` for use in test assertions.
`RetryProvider` adds resilience to any `LLMProvider` by wrapping it with retry logic implemented using `tenacity`.
```python
from ecs_agent import RetryProvider
from ecs_agent.providers import OpenAIProvider
from ecs_agent.providers.config import ApiFormat, ProviderConfig
from ecs_agent.types import RetryConfig

config = ProviderConfig(
    provider_id="openai",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    api_format=ApiFormat.OPENAI_CHAT_COMPLETIONS,
)
base_provider = OpenAIProvider(config=config, model="gpt-4o-mini")

retry_config = RetryConfig(
    max_attempts=3,
    multiplier=1.0,
    min_wait=4.0,
    max_wait=60.0,
    retry_status_codes=(429, 500, 502, 503, 504),
)
provider = RetryProvider(provider=base_provider, retry_config=retry_config)
```

- Non-streaming: automatically retries on `httpx.HTTPStatusError` (for the configured status codes) and `httpx.RequestError`; logs retry attempts at the `WARNING` level.
- Streaming: calls are passed through directly to the underlying provider. Streaming calls are not retried.
- Default config: if `retry_config` is not provided, standard defaults are used (3 attempts, exponential backoff starting at 4 seconds).
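Under such a config, the wait between attempts grows exponentially and is clamped to `[min_wait, max_wait]`. A sketch of the schedule (the formula is assumed to approximate `tenacity`'s `wait_exponential`, not copied from it):

```python
def backoff_waits(max_attempts: int, multiplier: float, min_wait: float, max_wait: float) -> list[float]:
    """Sketch of a clamped exponential backoff schedule (2**n growth)."""
    waits = []
    for attempt in range(1, max_attempts):  # no wait after the final attempt
        raw = multiplier * (2 ** attempt)
        waits.append(min(max_wait, max(min_wait, raw)))
    return waits


print(backoff_waits(5, 1.0, 4.0, 60.0))  # [4.0, 4.0, 8.0, 16.0]
```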
`LiteLLMProvider` enables access to 100+ LLM providers through a single unified interface via the `litellm` library. This is an optional dependency — install with `pip install litellm`.
```python
from ecs_agent.providers import LiteLLMProvider

# OpenAI
provider = LiteLLMProvider(model="gpt-4o", api_key="sk-...")

# Anthropic
provider = LiteLLMProvider(model="claude-sonnet-4-20250514", api_key="sk-ant-...")

# Any litellm-supported model
provider = LiteLLMProvider(model="ollama/llama3", base_url="http://localhost:11434")
```

| Parameter | Type | Default | Description |
|---|---|---|---|
| `model` | `str` | (required) | litellm model identifier (e.g. `gpt-4o`, `claude-sonnet-4-20250514`, `ollama/llama3`) |
| `api_key` | `str \| None` | `None` | API key (can also be set via environment variables) |
| `base_url` | `str \| None` | `None` | Custom base URL for self-hosted models |
- Non-streaming: calls `litellm.acompletion()` and returns a `CompletionResult`.
- Streaming: calls `litellm.acompletion(stream=True)` and yields `StreamDelta` objects.
- Tool use: converts between the framework's `ToolSchema` format and litellm's tool format.
- Optional dependency: `litellm` is not a hard dependency; an `ImportError` with a helpful message is raised if litellm is not installed.
litellm supports 100+ providers including: OpenAI, Anthropic, Google Gemini, AWS Bedrock, Azure OpenAI, Ollama, vLLM, Together AI, Groq, Mistral, and many more.
The framework emits one canonical `LLMInvocationEvent` per LLM call. Use `AccountingSubscriber` to capture cost and cache hit-rate metrics.
```python
from ecs_agent.accounting.subscriber import AccountingSubscriber

subscriber = AccountingSubscriber()
subscriber.subscribe(world.event_bus)

# ... run your agent ...

# Query per-provider/model aggregate cache hit-rate
stats = subscriber.get_aggregate_stats("aliyun", "qwen3.5-flash")
if stats is not None:
    print(f"Cache hit rate: {stats.hit_rate}")  # float 0.0–1.0, or None
    print(f"Cache read tokens: {stats.cache_read_tokens}")
    print(f"Total prompt tokens: {stats.total_prompt_tokens}")
```

`AccountingSubscriber` computes a token-weighted aggregate hit rate across all observed invocations for a given `(provider_id, model)` pair:

```
hit_rate = sum(cache_read_tokens) / sum(total_prompt_tokens)
```

where `total_prompt_tokens = uncached_input_tokens + cache_write_tokens + cache_read_tokens`. If the denominator is zero, `hit_rate` is `None`.
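A quick worked example of the token-weighted aggregate, mirroring the formula above (invented token counts, plain Python):

```python
# Three invocations' (uncached, cache_write, cache_read) token counts
invocations = [(100, 0, 900), (1000, 0, 0), (50, 450, 500)]

cache_read = sum(read for _, _, read in invocations)
total_prompt = sum(u + w + r for u, w, r in invocations)

# Token-weighted: one large uncached call drags the rate down
hit_rate = cache_read / total_prompt if total_prompt else None
print(f"{hit_rate:.3f}")  # 0.467
```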
Pass a custom `PricingCatalog` to override the built-in pricing:
```python
from ecs_agent.accounting.catalog import PricingCatalog, ModelPricing
from ecs_agent.accounting.subscriber import AccountingSubscriber

catalog = PricingCatalog()
catalog.register("aliyun", "qwen3.5-flash", ModelPricing(
    input_per_million=0.5,
    output_per_million=1.5,
    cached_input_per_million=0.1,
    cache_write_per_million=None,
))

subscriber = AccountingSubscriber(pricing_catalog=catalog)
```

All provider adapters normalize usage into a canonical `UsageRecord`:
```python
from ecs_agent.accounting.models import UsageRecord, StreamCompleteness

usage = UsageRecord(
    prompt_tokens=1024,
    completion_tokens=256,
    total_tokens=1280,
    cached_input_tokens=512,      # OpenAI cached_tokens
    cache_creation_tokens=None,   # Anthropic cache_creation_input_tokens
    cache_read_tokens=512,        # Anthropic cache_read_input_tokens
    stream_completeness=StreamCompleteness.COMPLETE,
    provider_id="aliyun",
    model="qwen3.5-flash",
)
```

`StreamCompleteness` values:

- `COMPLETE` — full usage data available
- `PARTIAL` — stream was interrupted; usage may be incomplete
- `UNKNOWN` — usage chunk was not received (e.g. the server dropped the final SSE event)
The `EmbeddingProvider` protocol defines the interface for converting text into numerical vectors. It is located in `ecs_agent.providers.embedding_protocol`.
```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class EmbeddingProvider(Protocol):
    async def embed(self, texts: list[str]) -> list[list[float]]:
        ...
```

`OpenAIEmbeddingProvider` is an OpenAI-compatible provider for generating text embeddings, aligned with the `ProviderConfig` model.
```python
from ecs_agent.providers.embedding_provider import OpenAIEmbeddingProvider

provider = OpenAIEmbeddingProvider(
    api_key="your-api-key",
    model="text-embedding-3-small",
)
```

`FakeEmbeddingProvider` returns deterministic vectors based on the hash of the input text. It is ideal for testing and development without API costs.
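A hash-seeded scheme like the following would produce such deterministic vectors (illustrative only; the real `FakeEmbeddingProvider` may differ in detail):

```python
import hashlib
import random


def fake_embed(text: str, dimension: int = 384) -> list[float]:
    """Deterministic pseudo-embedding: seed an RNG from a stable hash of the text."""
    seed = int.from_bytes(hashlib.sha256(text.encode()).digest()[:8], "big")
    rng = random.Random(seed)
    return [rng.uniform(-1.0, 1.0) for _ in range(dimension)]


# Same text always yields the same vector; different text yields a different one
assert fake_embed("hello") == fake_embed("hello")
assert fake_embed("hello") != fake_embed("world")
```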
```python
from ecs_agent.providers.fake_embedding_provider import FakeEmbeddingProvider

provider = FakeEmbeddingProvider(dimension=384)
vectors = await provider.embed(["hello", "world"])
```

`OpenAIFilesService` provides a typed file-upload service for OpenAI-compatible endpoints.
```python
from ecs_agent.providers.openai_files import OpenAIFilesService
from ecs_agent.providers.config import ProviderConfig, ApiFormat

config = ProviderConfig(
    provider_id="openai",
    base_url="https://api.openai.com/v1",
    api_key="your-api-key",
    api_format=ApiFormat.OPENAI_FILES,
)
service = OpenAIFilesService(provider_config=config)
file_ref = await service.upload(path="/path/to/document.pdf", purpose="assistants")
# file_ref.file_id — stable reference usable in FileRefPart messages
```

Uploaded file references can be embedded in multimodal messages via `FileRefPart`:
```python
from ecs_agent.types import Message, FileRefPart

msg = Message(
    role="user",
    content="Summarize this document.",
    parts=[
        FileRefPart(file_id=file_ref.file_id),
    ],
)
```
---
## Multimodal Messages
The framework supports multimodal content via typed `Message.parts` entries.
### Core Types
Import path:

```python
from ecs_agent.types import Message, ImageUrlPart, FileRefPart
```

Type definitions (from `ecs_agent.types`):
```python
from dataclasses import dataclass


@dataclass(slots=True)
class ImageUrlPart:
    url: str
    detail: str | None = None


@dataclass(slots=True)
class FileRefPart:
    file_id: str
    filename: str | None = None


MessagePart = ImageUrlPart | FileRefPart


@dataclass(slots=True)
class Message:
    role: str
    content: str                            # canonical text — always use this for text
    parts: list[MessagePart] | None = None  # non-text media only
```

`Message.content` is the canonical text field. `Message.parts` holds non-text media (`ImageUrlPart`, `FileRefPart`) only. Text always goes in `content`, never in `parts`.
```python
from ecs_agent.types import Message, ImageUrlPart, FileRefPart

msg = Message(
    role="user",
    content="Please review the image and file. Focus on entities and relationships.",
    parts=[
        ImageUrlPart(url="https://example.com/diagram.png", detail="high"),
        FileRefPart(file_id="file-abc123", filename="spec.pdf"),
    ],
)
```

Text lives in `content`; media-only parts go in `parts`. Adapters prepend `content` as a text block when `parts` is also set.
For the Chat Completions adapter:

- `message.content` → `{"type": "text", "text": message.content}` (prepended when non-empty)
- `ImageUrlPart` → `{"type": "image_url", "image_url": {"url": part.url, "detail": part.detail}}` (`detail` omitted when `None`)
- `FileRefPart` → `{"type": "file", "file": {"file_id": part.file_id, "filename": part.filename}}` (`filename` omitted when `None`)
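Those rules can be sketched as a standalone converter, with plain dicts standing in for the framework's dataclasses (the `kind` discriminator is hypothetical, for illustration only):

```python
def to_openai_content(content: str, parts: list[dict]) -> list[dict]:
    """Sketch of the chat-completions serialization rules listed above."""
    blocks: list[dict] = []
    if content:  # text block is prepended when non-empty
        blocks.append({"type": "text", "text": content})
    for part in parts:
        if part["kind"] == "image_url":
            image = {"url": part["url"]}
            if part.get("detail") is not None:  # detail omitted when None
                image["detail"] = part["detail"]
            blocks.append({"type": "image_url", "image_url": image})
        elif part["kind"] == "file":
            file = {"file_id": part["file_id"]}
            if part.get("filename") is not None:  # filename omitted when None
                file["filename"] = part["filename"]
            blocks.append({"type": "file", "file": file})
    return blocks


print(to_openai_content("hi", [{"kind": "image_url", "url": "https://example.com/x.png", "detail": None}]))
```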
```json
{
  "role": "user",
  "content": [
    {"type": "text", "text": "Please review the image and file. Focus on entities and relationships."},
    {"type": "image_url", "image_url": {"url": "https://example.com/diagram.png", "detail": "high"}},
    {"type": "file", "file": {"file_id": "file-abc123", "filename": "spec.pdf"}}
  ]
}
```

For the Responses API adapter:

- Text type is role-aware: `"input_text"` (user) or `"output_text"` (assistant)
- `ImageUrlPart` → `{"type": "input_image", "image_url": part.url, "detail": part.detail}` (`detail` omitted when `None`)
- `FileRefPart` → `{"type": "input_file", "file_id": part.file_id, "filename": part.filename}` (`filename` omitted when `None`)
```json
{
  "type": "message",
  "role": "user",
  "content": [
    {"type": "input_text", "text": "Please review the image and file. Focus on entities and relationships."},
    {"type": "input_image", "image_url": "https://example.com/diagram.png", "detail": "high"},
    {"type": "input_file", "file_id": "file-abc123", "filename": "spec.pdf"}
  ]
}
```

For the Anthropic Messages adapter:

- `message.content` → `{"type": "text", "text": message.content}` (prepended when non-empty)
- `ImageUrlPart` → `{"type": "image", "source": {"type": "url", "url": part.url}}`
- Vision requires the adapter config `supports_vision=True`; otherwise image parts raise `ValueError`.
- `FileRefPart` is not supported by the Anthropic adapter and always raises `ValueError`.
`detail` accepts `"low"`, `"high"`, or `"auto"`; set `None` to omit it from payloads.

Use `FileRefPart` with IDs returned by `OpenAIFilesService` (see File Upload above).

For live vision tests, set env vars such as `LLM_API_KEY`, `LLM_BASE_URL`, `LLM_MODEL`, and `IMAGE_URL` (never hardcode secrets).
The `VectorStore` protocol defines the interface for storing and searching vectors. It is located in `ecs_agent.providers.vector_store`.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class VectorStore(Protocol):
    async def add(self, id: str, vector: list[float], metadata: dict[str, Any] | None = None) -> None: ...
    async def search(self, query_vector: list[float], top_k: int = 5) -> list[tuple[str, float]]: ...
    async def delete(self, id: str) -> None: ...
```

`InMemoryVectorStore` provides a simple, dictionary-backed vector store with cosine similarity search. It optionally uses numpy for faster computation when available.
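The brute-force cosine search such a store performs can be sketched without numpy (an illustration of the idea, not the store's actual code):

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; 0.0 when either vector has zero norm."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def search(store: dict[str, list[float]], query: list[float], top_k: int = 5) -> list[tuple[str, float]]:
    """Score every stored vector against the query, highest similarity first."""
    scored = [(doc_id, cosine(vec, query)) for doc_id, vec in store.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_k]


store = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0]}
print(search(store, [1.0, 0.0], top_k=2))  # [('a', 1.0), ('c', 0.707...)]
```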
```python
from ecs_agent.providers.vector_store import InMemoryVectorStore

store = InMemoryVectorStore(dimension=384)
await store.add("doc1", [0.1, 0.2, ...], metadata={"text": "content"})
results = await store.search([0.1, 0.2, ...], top_k=5)
```

- Production: use `OpenAIProvider` for real API interaction; wrap it in a `RetryProvider` to handle transient network issues or rate limits.
- Testing: use `FakeProvider` for unit tests that need predictable, deterministic results without real network requests.
- Resilience: always consider wrapping your primary provider in a `RetryProvider` in production environments.
- Claude-native: use `ClaudeProvider` for direct Anthropic API access with native tool use support and cache-aware accounting.
- Multi-provider: use `LiteLLMProvider` when you need to switch between providers without changing code.
- Accounting: attach `AccountingSubscriber` to the `EventBus` to track cost and cache hit-rate metrics across invocations.