The universal runtime for AI agents.
The Idea · What It Does Today · Where It's Going · Quick Start · Roadmap
There's no standard way to package, run, and compose AI agents.
You build a LangChain agent. Someone else builds a CrewAI agent. A third team writes raw Python. None of these agents can discover each other, run in the same pool, or chain together without custom glue for every pair. Each one lives inside the application that built it — tightly coupled to its framework, its dependencies, its infrastructure.
Atlas introduces one abstraction that changes this: the contract.
A contract is a YAML file that declares what an agent takes in and what it puts out. Typed. Versioned. Framework-agnostic. What's behind the contract — an LLM, a database query, an entire CrewAI crew, a third-party API — doesn't matter. To Atlas, every agent is just a typed function with a name.
flowchart TB
subgraph Before["Without a standard runtime"]
A1[LangChain Agent] -.-x A2[CrewAI Agent]
A2 -.-x A3[Custom Agent]
A3 -.-x A4[Vendor API]
end
subgraph After["With Atlas"]
B1[LangChain Agent] --> C1[Contract]
B2[CrewAI Agent] --> C2[Contract]
B3[Custom Agent] --> C3[Contract]
B4[Vendor API] --> C4[Contract]
C1 & C2 & C3 & C4 --> Pool[Atlas Runtime<br/><i>one pool, one registry, one set of metrics</i>]
end
style Before fill:#fff3f3
style After fill:#f0fff4
This is the same idea that made Docker work for applications, npm for packages, and REST for services — a standard interface that decouples the what from the how.
Atlas is in active development. Here's what's shipped and working (1382 tests, 117 E2E).
Every agent declares a YAML contract with JSON Schema inputs and outputs. The runtime validates data at the boundary — invalid inputs are rejected before execution starts, malformed outputs are caught before reaching consumers.
agent:
name: classifier
version: "1.2.0"
capabilities: [classification, nlp]
input:
schema:
type: object
properties:
text: { type: string }
categories: { type: array, items: { type: string } }
required: [text, categories]
output:
schema:
type: object
properties:
category: { type: string }
confidence: { type: number }
required: [category, confidence]

Discover agents from directories. Resolve by name and semver range. Search by capability.
registry = AgentRegistry(search_paths=["./agents", "./vendor-agents"])
registry.discover()
agent = registry.get("classifier", "^1.0.0") # latest 1.x.x
agents = registry.search("classification") # all with this capability

Bounded concurrency via semaphore. Priority-ordered scheduling — highest priority dequeued first. Warm slot reuse — agents call on_startup() once, then handle many jobs without reinitializing. Queue backpressure raises QueueFullError at capacity. Graceful shutdown waits for running jobs to complete.
pool = ExecutionPool(
registry, queue,
max_concurrent=8, # max parallel executions
warm_pool_size=4, # slots kept alive between jobs
idle_timeout=300.0, # evict idle slots after 5 min
)

Define multi-step agent pipelines in YAML. Atlas mediates data between steps through a strategy cascade — trying the simplest approach first:
- Direct — output matches input schema, pass through
- Mapped — apply `input_map` field mappings from the chain definition
- Coerce — automatic type conversion (string ↔ number, scalar wrapping)
- LLM Bridge — semantic transform via LLM when schemas are structurally incompatible
chain:
name: analyze-and-format
steps:
- agent: classifier
- agent: formatter
input_map:
content: category
style: uppercase

from atlas.mediation.analyzer import analyze_compatibility
# Pre-check whether two agents can chain
compat = analyze_compatibility(agent_a.output_schema, agent_b.input_schema)
print(compat.strategy) # "direct", "mapped", "coerce", or "llm_bridge"

Routing interceptors decide what happens to each job — allow, reject (with reason), redirect to a different agent, or override priority. Swap the orchestrator at runtime without restarting.
from atlas.orchestrator.protocol import Orchestrator, RoutingDecision
class TierRouter(Orchestrator):
async def route(self, job, registry):
if job.metadata.get("tier") == "premium":
return RoutingDecision(action="redirect", agent_name="summarizer-gpt4")
return RoutingDecision(action="redirect", agent_name="summarizer-fast")
async def on_job_complete(self, job): ...
async def on_job_failed(self, job): ...
pool.set_orchestrator(TierRouter()) # takes effect on next job

Metrics, traces, eval hooks, retry, and persistence all subscribe to the same EventBus. Add or remove any of them without touching agent code. A failing subscriber doesn't affect execution or other subscribers.
from atlas.metrics import MetricsCollector
from atlas.trace import TraceCollector
MetricsCollector(bus) # per-agent: latency percentiles, warm hit rate, throughput
TraceCollector(bus) # per-job: execution time, token counts, cost estimates

# Per-job trace
trace = tc.get(job.id)
trace.execution_ms # wall-clock time
trace.input_tokens # LLM tokens consumed
trace.estimated_cost_usd # cost estimate
# Per-agent metrics
m = mc.get_agent_metrics("classifier")
m["latency_p50_ms"] # 230.5
m["warm_hit_rate"] # 0.87
m["jobs_by_status"] # {"completed": 1500, "failed": 12}

Agents can spawn child agents during execution — fan-out work across the pool, then collect results. Child jobs are tracked with parent references, depth limits prevent infinite recursion, and permissions are enforced at the contract level.
# agents/decomposer/agent.yaml
agent:
name: decomposer
version: "1.0.0"
requires:
spawn_agents: true # opt-in to spawning
input:
schema:
type: object
properties:
messages: { type: array, items: { type: string } }
required: [messages]
output:
schema:
type: object
properties:
results: { type: array }
count: { type: integer }
required: [results, count]

# agents/decomposer/agent.py
class Agent(AgentBase):
async def execute(self, input_data: dict) -> dict:
results = []
for msg in input_data["messages"]:
# Each spawn() submits a child job to the pool,
# blocks until it completes, and returns the result
result = await self.context.spawn("echo", {"message": msg})
results.append({"success": result.success, "data": result.data})
return {"results": results, "count": len(results)}

The splitting architecture:
flowchart TD
Parent[Parent Job<br/><i>decomposer</i>] --> Split{Split input<br/>into N items}
Split --> S1[spawn echo<br/>item 1]
Split --> S2[spawn echo<br/>item 2]
Split --> SN[spawn echo<br/>item N]
S1 --> Q[JobQueue]
S2 --> Q
SN --> Q
Q --> Pool[ExecutionPool<br/><i>bounded by max_concurrent</i>]
Pool --> C1[Child completes]
Pool --> C2[Child completes]
Pool --> CN[Child completes]
C1 & C2 & CN --> Collect[Parent collects<br/>SpawnResults]
Collect --> Done[Parent returns<br/>aggregated output]
Guards:
- Permission — only agents with `requires.spawn_agents: true` can call `context.spawn()`
- Depth limit — max 3 levels deep (configurable), prevents infinite recursion
- Queue coordination — children flow through the same `JobQueue` and `ExecutionPool` as top-level jobs
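The first two guards compose into a simple pre-spawn check. A hypothetical sketch — the function and exception names are illustrative, not Atlas's API:

```python
class SpawnError(Exception):
    """Raised when a spawn request violates a guard."""


MAX_SPAWN_DEPTH = 3  # assumed default; the real limit is configurable


def check_spawn_allowed(contract: dict, current_depth: int) -> None:
    """Reject a spawn before it ever reaches the queue (illustrative sketch)."""
    # Permission guard: the parent contract must opt in to spawning
    if not contract.get("requires", {}).get("spawn_agents", False):
        raise SpawnError("agent did not declare requires.spawn_agents: true")
    # Depth guard: prevents infinite recursion of spawned children
    if current_depth >= MAX_SPAWN_DEPTH:
        raise SpawnError(f"spawn depth limit ({MAX_SPAWN_DEPTH}) exceeded")
```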
Colocate eval.yaml with agents. Checks run automatically on every execution and attach results to traces.
eval:
checks:
- name: confidence_reasonable
type: range
field: confidence
min_val: 0.0
max_val: 1.0
- name: category_not_empty
type: contains
field: category

Submit jobs automatically on a schedule or in response to events. Four trigger types — cron, interval, one-shot, and webhook — with YAML definitions and a full CRUD API.
# triggers/nightly-cleanup.yaml
trigger:
name: nightly-cleanup
trigger_type: cron
cron_expr: "0 2 * * *"
agent_name: data-cleaner
input_data:
older_than_days: 90

atlas trigger create --type cron --cron "*/5 * * * *" --agent echo --input '{"message":"ping"}'
atlas trigger list
atlas serve --agents ./agents --triggers-path ./triggers

Webhooks support optional HMAC-SHA256 signature validation:
curl -X POST http://localhost:8080/api/hooks/{trigger-id} \
-H "X-Atlas-Signature: sha256=..." \
-d '{"event": "deploy"}'

Agents declare permission scopes and secret requirements in their contracts. The runtime enforces them — no agent can access files, network, or secrets it hasn't declared. Secrets are resolved from environment variables or encrypted files, never hardcoded.
agent:
name: data-processor
permissions:
file_system: [read]
network: [outbound]
requires:
secrets: [API_KEY, DB_PASSWORD]

atlas serve --security-policy security.yaml --agents ./agents

Agents declare tool dependencies via requires.skills. The runtime resolves and injects them at execution time — agents access tools through context.skill(). Atlas ships 12 platform tools (atlas.*) that expose runtime internals to agents.
agent:
name: orchestrator-agent
requires:
platform_tools: true # inject all atlas.* tools
skills: [custom-search, embedder] # inject specific skills

class Agent(AgentBase):
async def execute(self, input_data: dict) -> dict:
agents = await self.context.skill("atlas.registry.list", {})
result = await self.context.skill("custom-search", {"query": "..."})
return {"found": result}

Atlas instances communicate via the Model Context Protocol. One instance's agents and tools are transparently available to another — no custom glue, no shared infrastructure.
# Instance A: start with MCP server
atlas serve --mcp-port 8400 --auth-token secret
# Instance B: connect to A, federate tools and agents
atlas serve --mcp-port 8401 --remote "lab=http://hostA:8400/mcp@secret"

Instance B now has all of A's agents as lab.* in its registry. Chains on B can reference lab.translator as a step — it executes on A and returns the result transparently.
chain:
name: cross-instance-pipeline
steps:
- agent: lab.translator # runs on Instance A
- agent: local-formatter # runs on Instance B

Not every agent needs Python. Atlas supports three provider types — all running in the same pool, same chains, same metrics.
exec provider — run any executable as an agent. JSON on stdin, JSON on stdout. Write agents in Rust, Go, Node, shell scripts — anything that can read and write JSON.
# agents/my-rust-agent/agent.yaml
agent:
name: my-rust-agent
version: "1.0.0"
provider:
type: exec
command: ["./target/release/my-agent"]
input:
schema:
type: object
properties:
message: { type: string }
required: [message]
output:
schema:
type: object
properties:
result: { type: string }
required: [result]

The runtime sends a JSON envelope on stdin ({input, context, memory}) and reads JSON from stdout. No Python, no SDK, no boilerplate.
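Such an exec agent is a few lines in any language; sketched here in Python purely for illustration. The envelope keys come from the description above; the uppercasing logic is an assumed example, not a shipped agent:

```python
"""Minimal exec-provider agent: JSON envelope in on stdin, JSON out on stdout."""
import json
import sys


def handle(envelope: dict) -> dict:
    # The runtime sends {"input": ..., "context": ..., "memory": ...}
    text = envelope["input"].get("message", "")
    return {"result": text.upper()}


def main() -> None:
    # stdin -> handle -> stdout is the entire protocol
    json.dump(handle(json.load(sys.stdin)), sys.stdout)
```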
llm provider — define LLM agents in pure YAML. No code at all. System prompt, model preference, skills as tools — the runtime handles the tool-use loop.
# agents/yaml-summarizer/agent.yaml
agent:
name: yaml-summarizer
version: "1.0.0"
provider:
type: llm
system_prompt: |
You are a concise text summarizer. Return a JSON object
with a "summary" field containing a 1-3 sentence summary.
output_format: json
max_iterations: 1
model:
preference: fast
input:
schema:
type: object
properties:
text: { type: string }
required: [text]
output:
schema:
type: object
properties:
summary: { type: string }
required: [summary]

No agent.py needed. Skills declared in requires.skills are automatically exposed as tools to the LLM.
python provider (default) — existing behavior, unchanged. Write an AgentBase subclass in agent.py.
All three provider types are discovered, registered, and executed identically. Consumers don't know or care which provider is behind a contract.
Agents in the pool can learn from each other. Opt in with requires.memory: true — all participating agents share a memory pool that persists across executions.
agent:
name: learning-agent
requires:
memory: true

class Agent(AgentBase):
async def execute(self, input_data: dict) -> dict:
# Read what previous agents learned
memory = await self.context.memory_read()
# Add your own learnings
await self.context.memory_append("API rate limit is 100/min")
return {"result": "..."}For llm provider agents, memory is automatically injected into the system prompt and a memory_append tool is exposed — no code needed.
For exec provider agents, memory arrives in the stdin envelope and writes back via a _memory_append key in the output.
File-backed by default (memory.md), pluggable via HTTP hook for external systems (Redis, vector DB, etc.).
# File-backed (default)
atlas serve --agents ./agents --memory memory.md
# HTTP hook for external memory
atlas serve --agents ./agents --memory-url http://localhost:9000/memory

Agents can search, store, and share structured knowledge — scoped by domain with per-agent read/write ACLs. Orthogonal to shared memory: memory is "what happened this run", knowledge is "what do we know about X".
agent:
name: research-agent
requires:
knowledge:
read_domains: ["*"] # can search all domains
write_domains: [ai-systems] # can only write to ai-systems

class Agent(AgentBase):
async def execute(self, input_data: dict) -> dict:
# Search existing knowledge
results = await self.context.knowledge_search("rate limits", domain="api")
# Store new knowledge (ACL-enforced)
await self.context.knowledge_store(
"API rate limit is 200/min",
domain="ai-systems",
tags=["api", "rate-limits"],
)
return {"found": len(results)}For llm provider agents, relevant knowledge is auto-searched and injected into the system prompt, with knowledge_store and knowledge_search exposed as tools — no code needed.
For exec provider agents, knowledge arrives in the stdin envelope and writes back via a _knowledge_store key in the output.
Three pluggable providers ship out of the box:
- File — markdown files with YAML frontmatter, organized by domain subdirectories (mini-Kronos)
- HTTP — REST hook for external knowledge systems
- MCP — delegates to an MCP server (e.g., Kronos vault)
Protected domains block wildcard writes — an agent with write_domains: ["*"] still can't write to physics if physics is protected; a protected domain must be listed explicitly to be writable.
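That rule reduces to a small predicate. A sketch of the semantics described above, not Atlas's implementation:

```python
def can_write(write_domains: list[str], domain: str,
              protected: set[str]) -> bool:
    """Decide whether an agent may write to a knowledge domain (sketch).

    Wildcard grants never cover protected domains; those need an
    explicit entry in write_domains.
    """
    if domain in write_domains:
        return True   # explicitly listed: always allowed
    if domain in protected:
        return False  # protected: the wildcard does not apply
    return "*" in write_domains
```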
# File-backed knowledge
atlas serve --agents ./agents --knowledge ./knowledge
# HTTP hook for external knowledge systems
atlas serve --agents ./agents --knowledge-url http://localhost:9000/knowledge
# With protected domain policy
atlas serve --agents ./agents --knowledge ./knowledge --knowledge-policy policy.yaml

Package, publish, and pull agents across instances. File-based registries work locally with zero infrastructure; HTTP registries let teams share agents across the network. Agents declare dependencies on other agents in their contracts — the pool checks these before execution.
agent:
name: pipeline-agent
requires:
agents:
- translator # any version
- name: summarizer
version: ">=1.0.0" # semver range

# Configure registries
atlas registry add local --file ./my-registry
atlas registry add team --http https://atlas.example.com/api/registry --token $TOKEN
# Publish and pull
atlas registry publish ./agents/echo --registry local
atlas registry pull translator --version ">=1.0.0"
atlas registry search "text-processing"
# Serve as a registry for other instances
atlas serve --agents ./agents --registry ./my-registryTwo pluggable providers:
- File — directory-based, zero config (manifest.json + package.tar.gz per version)
- HTTP — REST client for remote registries (any Atlas instance with `--registry` becomes one)
Dependencies are checked at job submission — if an agent requires translator and it's not registered, the job fails with a clear error and install hint. No auto-install; explicit atlas registry pull keeps behavior predictable.
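A sketch of that submission-time check, with hypothetical names — the real dependency resolver lives inside the pool:

```python
def check_agent_dependencies(contract: dict, registered: set[str]) -> None:
    """Fail fast at submission if a declared agent dependency is missing (sketch)."""
    for dep in contract.get("requires", {}).get("agents", []):
        # Entries are either a bare name or a {name, version} mapping
        name = dep if isinstance(dep, str) else dep["name"]
        if name not in registered:
            raise LookupError(
                f"agent '{name}' is required but not registered — "
                f"try: atlas registry pull {name}"
            )
```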
Agents declare hardware requirements in their contracts. The pool tracks capacity and gates job execution on resource availability — GPU-hungry agents get clear errors instead of silent failures when resources are exhausted.
agent:
name: vision-model
hardware:
gpu: true
gpu_vram_gb: 16
min_memory_gb: 32
min_cpu_cores: 4
architecture: x86_64

# Start pool with hardware inventory
atlas serve --agents ./agents --gpus 2 --gpu-vram 16,24 \
--pool-memory 128 --pool-cpus 32 --pool-arch x86_64Resources are allocated per-slot and released when slots return to the warm pool or are destroyed. The health endpoint reports live capacity:
GET /api/health → { "hardware": { "total_gpus": 2, "free_gpus": 1, ... } }

No hardware flags = no tracking. Agents without hardware requirements are completely unaffected.
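The gating itself amounts to a capacity comparison per declared requirement. A sketch: the requirement field names come from the contract example above, while the free-capacity field names are assumed for illustration:

```python
def fits(req: dict, free: dict) -> bool:
    """Can the pool's free capacity satisfy an agent's hardware block? (sketch)"""
    if req.get("gpu") and free.get("free_gpus", 0) < 1:
        return False
    # VRAM: at least one free GPU must have enough memory
    vram_needed = req.get("gpu_vram_gb", 0)
    if vram_needed and not any(
        v >= vram_needed for v in free.get("free_gpu_vram_gb", [])
    ):
        return False
    if req.get("min_memory_gb", 0) > free.get("free_memory_gb", 0):
        return False
    if req.get("min_cpu_cores", 0) > free.get("free_cpu_cores", 0):
        return False
    arch = req.get("architecture")
    if arch and arch != free.get("arch"):
        return False
    return True
```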
Failed jobs auto-retry with configurable backoff. Jobs persist to SQLite and survive crashes — pending jobs reload on restart.
recovered = await queue.load_pending() # picks up where it left off

atlas discover ./agents # scan and list contracts
atlas run classifier '{"text": "...", "categories": ["a","b"]}'
atlas serve --agents ./agents --db atlas.db # start HTTP + WS server
atlas orchestrator set cost-router # swap routing at runtime

| Endpoint | Description |
|---|---|
| `POST /api/jobs` | Submit a job |
| `GET /api/jobs/{id}` | Job status and result |
| `GET /api/agents` | Discovered agents |
| `GET /api/metrics` | Pool and per-agent metrics |
| `GET /api/traces` | Execution traces |
| `POST /api/orchestrator` | Swap routing policy |
| `POST /api/triggers` | Create a trigger |
| `GET /api/triggers` | List triggers |
| `POST /api/triggers/{id}/fire` | Manually fire a trigger |
| `POST /api/hooks/{id}` | Webhook receiver |
| `WS /ws` | Live job status stream |
For the full technical deep-dive — internal diagrams, module map, ordering guarantees, warm slot lifecycle — see Architecture.
The contract abstraction is the foundation. Here's what it unlocks as Atlas matures.
Wrap agents built with LangChain, CrewAI, OpenClaw, or anything else in Atlas contracts. They run in the same pool, compose in the same chains, and appear in the same registry — regardless of what's inside.
# A CrewAI crew behind an Atlas contract
class Agent(AgentBase):
async def on_startup(self):
self.crew = build_my_crewai_crew()
async def execute(self, input_data: dict) -> dict:
result = await self.crew.kickoff(input_data)
return {"analysis": result.output}

The consumer doesn't know or care what's underneath. They see a contract. They submit a job. They get a typed result.
Every team builds agents for their domain. All of them publish contracts to a shared registry. Every product discovers and uses any agent without importing code, managing dependencies, or standing up separate infrastructure.
flowchart TB
subgraph Teams["Teams build agents"]
T1[Payments Team] -->|publish| A1[fraud-detector<br/><i>v2.1.0</i>]
T2[Content Team] -->|publish| A2[moderator<br/><i>v1.4.0</i>]
T3[Data Team] -->|publish| A3[classifier<br/><i>v3.0.0</i>]
T4[ML Team] -->|publish| A4[summarizer<br/><i>v1.0.0</i>]
end
subgraph Registry["Shared Registry"]
A1 & A2 & A3 & A4 --> Reg[Agent Registry]
end
subgraph Products["Products consume agents"]
Reg --> P1[Checkout App]
Reg --> P2[CMS Dashboard]
Reg --> P3[Analytics Pipeline]
Reg --> P4[Customer Support Bot]
end
style Teams fill:#f0f4ff
style Registry fill:#fffff0
style Products fill:#f0fff4
Teams own their agents. Products compose them freely. One pool handles concurrency, priority, cost tracking, and routing.
Domain experts publish agents the same way they'd ship an API — the contract is the interface, the implementation stays private. Versioned with semver. Pin consumers to ranges. Ship updates without breaking anyone.
# What you publish — the contract
agent:
name: legal-doc-analyzer
version: "2.1.0"
capabilities: [legal-analysis, risk-assessment, compliance]
input:
schema: { ... }
output:
schema: { ... }
# What stays private: the implementation, model weights, prompts, training data

An on-prem extraction agent feeds a cloud LLM agent, which feeds an internal formatter. They don't share code, dependencies, or infrastructure. They share contracts — Atlas mediates the data between them.
chain:
name: cross-infra-pipeline
steps:
- agent: on-prem/extractor
- agent: cloud/summarizer
input_map: { text: extracted_data }
- agent: internal/formatter
input_map: { content: summary, style: markdown }

Submit work to the pool and let it run — classification sweeps, nightly cleanups, research tasks, batch processing, periodic audits. Jobs are persistent, priority-ordered, and recoverable after crashes.
# Background cleanup — low priority
for table in stale_tables:
await pool.submit(JobData(
agent_name="data-cleaner",
input_data={"table": table, "older_than_days": 90},
priority=0,
))
# Urgent fraud check — jumps the queue
await pool.submit(JobData(
agent_name="fraud-detector",
input_data={"transaction_id": txn.id},
priority=10,
))

pip install atlas

# agents/echo/agent.yaml
agent:
name: echo
version: "1.0.0"
input:
schema:
type: object
properties:
message: { type: string }
required: [message]
output:
schema:
type: object
properties:
message: { type: string }
required: [message]

# agents/echo/agent.py
from atlas.runtime.base import AgentBase
class Agent(AgentBase):
async def execute(self, input_data: dict) -> dict:
return {"message": input_data["message"]}

atlas discover ./agents
atlas run echo '{"message": "hello"}'

# agents/greeter/agent.yaml — no agent.py needed
agent:
name: greeter
version: "1.0.0"
provider:
type: llm
system_prompt: "You are a friendly greeter. Return {\"greeting\": \"...\"}"
output_format: json
model:
preference: fast
input:
schema:
type: object
properties:
name: { type: string }
required: [name]
output:
schema:
type: object
properties:
greeting: { type: string }
required: [greeting]

atlas run greeter '{"name": "world"}'
# {"greeting": "Hello, world! Welcome!"}

from atlas.contract.registry import AgentRegistry
from atlas.events import EventBus
from atlas.pool.executor import ExecutionPool
from atlas.pool.job import JobData
from atlas.pool.queue import JobQueue
registry = AgentRegistry(search_paths=["./agents"])
registry.discover()
bus = EventBus()
queue = JobQueue(max_size=100, event_bus=bus)
pool = ExecutionPool(registry, queue, max_concurrent=8, warm_pool_size=4)
await pool.start()
job = JobData(agent_name="echo", input_data={"message": "hello"})
await pool.submit(job)
result = await queue.wait_for_terminal(job.id, timeout=10.0)
# result.output_data == {"message": "hello"}

| Phase | What |
|---|---|
| 1 | Contracts & Registry — YAML contracts, schema validation, semver, capability search |
| 2 | Execution Pool — priority queue, warm slots, concurrency, graceful shutdown |
| 3 | Chain Composition — multi-step chains, mediation engine, async executor |
| 4 | Mediation Engine — direct / mapped / coerce / LLM bridge strategies |
| 5 | Monitoring — traces, token tracking, cost estimation, eval hooks |
| 6 | Orchestrator Override — pluggable routing, hot-swap, reject/redirect |
| 7 | Triggers & Scheduling — cron, interval, one-shot, webhooks with HMAC, trigger CRUD API |
| 8 | Security & Sandboxing — permission scopes, resource limits, network policies, secret injection |
| 9 | Skills & Tool Use — skill registry, resolver, platform tools (12 atlas.* tools), skill injection into agents |
| 10A | MCP Server — Streamable HTTP + SSE transport, bearer token auth middleware, health endpoint |
| 10B | MCP Client — remote tool federation, RemoteToolProvider, namespaced skill registration |
| 10C | Federated Chains — RemoteAgentProvider, virtual agents in registry, atlas.exec.run, skill injection in chains |
| 11 | Dynamic Agents & Shared Memory — exec provider (any language), llm provider (YAML-only), pluggable shared memory (file/HTTP) |
| 12 | Knowledge Base & Access Control — pluggable knowledge providers (file/MCP/HTTP), domain-scoped permissions, agent-level read/write ACLs, path traversal protection |
| 13 | Agent Marketplace — package, publish, and pull agents across registries. File and HTTP registry providers, dependency resolution, CLI commands, server endpoints |
| 14 | Hardware Scheduling — GPU/memory-aware slots, resource inventory tracking, hardware-gated job execution, health endpoint integration |
- Architecture — technical deep-dive, diagrams, module map
- Contributing — development setup, testing, how to add agents
- Demos — 5 runnable demos (pipeline, triggers, hardware scheduling, MCP server, knowledge)
- Example agents — 19 reference implementations (Python, exec, YAML-only LLM)
- Example chains — multi-step pipeline definitions
- Example triggers — cron and webhook trigger definitions
- Python 3.11+
- No external services — runs with Python agents and SQLite
- Optional: `anthropic`, `openai`, or `langchain` for LLM-powered agents