ChrisLally/a2a-redis

a2a-redis

Redis integrations for the Agent-to-Agent (A2A) JavaScript/TypeScript SDK.

This package provides Redis-backed implementations of core A2A components: persistent task storage, reliable event queue management, and push notification configuration.

This package is the JavaScript/TypeScript implementation, inspired by the Python a2a-redis package.

Features

  • RedisTaskStore & RedisJSONTaskStore: Redis-backed task storage using hashes or JSON
  • RedisStreamsQueueManager & RedisStreamsEventQueue: Persistent, reliable event queues with consumer groups
  • RedisPubSubQueueManager & RedisPubSubEventQueue: Real-time, low-latency event broadcasting
  • RedisPushNotificationConfigStore: Task-based push notification configuration storage
  • Consumer Group Strategies for Streams: Flexible load balancing and instance isolation patterns
  • Next.js Serverless Ready: Built-in helpers for singleton Redis clients and route handlers

Language Support

This package is the JavaScript/TypeScript implementation. For Python, see the a2a-redis Python package.

Why separate implementations?

  • Each language has its own ecosystem and best practices
  • Python version: Designed for AsyncIO, Starlette, FastAPI
  • JavaScript version: Designed for Node.js, Next.js, serverless environments

Matching component names and API patterns make it straightforward to run both versions side by side or to migrate between languages.

Key JavaScript/TypeScript Advantages

  • Next.js Integration: Built-in helpers for serverless Redis clients and route handlers (nextjs-helpers)
  • TypeScript Strict Mode: Full type safety with strict type checking
  • ESM Support: Native ES modules, tree-shakeable imports
  • Singleton Pattern: Optimized Redis client pooling for serverless functions
  • No runtime overhead: Compile-time type checking catches errors early

Parallel API Between Languages

Both implementations share identical component names and patterns:

| Component | Python | TypeScript |
| --- | --- | --- |
| Task Storage | RedisTaskStore | RedisTaskStore |
| JSON Storage | RedisJSONTaskStore | RedisJSONTaskStore |
| Streams Queue | RedisStreamsQueueManager | RedisStreamsQueueManager |
| Pub/Sub Queue | RedisPubSubQueueManager | RedisPubSubQueueManager |
| Push Notifications | RedisPushNotificationConfigStore | RedisPushNotificationConfigStore |
| Consumer Strategy | ConsumerGroupStrategy | ConsumerGroupStrategy |

Installation

npm install a2a-redis
# or with pnpm
pnpm add a2a-redis

Both redis and @a2a-js/sdk are peer dependencies and must be installed in your project.

Next.js Users

If you're integrating a2a-redis with Next.js, see the Next.js Integration Guide for:

  • Singleton Redis client setup for serverless
  • Route handler examples
  • Multi-turn conversation patterns
  • Production deployment tips
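
A singleton client setup for serverless usually means caching one client per process, so that warm invocations and hot reloads reuse a connection instead of opening a new one. The following is a minimal sketch of that pattern. The `getSingleton` helper and the stand-in factory are hypothetical and not part of a2a-redis; the package's own Next.js helpers may work differently.

```typescript
// Hypothetical helper, not part of a2a-redis: cache one instance per key on
// globalThis so every import within the same process shares it.
const globalCache = globalThis as unknown as Record<string, unknown>;

function getSingleton<T>(key: string, create: () => T): T {
  if (globalCache[key] === undefined) {
    globalCache[key] = create(); // runs only on the first call in this process
  }
  return globalCache[key] as T;
}

// Stand-in for a real client factory such as createRedisClient({ url, maxConnections }).
let clientsCreated = 0;
function createFakeClient(): { id: number } {
  clientsCreated += 1;
  return { id: clientsCreated };
}

const clientA = getSingleton('__a2aRedisClient', createFakeClient);
const clientB = getSingleton('__a2aRedisClient', createFakeClient);
// clientA and clientB refer to the same object; the factory ran once.
```

In a real route handler the factory would create the actual Redis client, and every handler would call `getSingleton` with the same key.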

Quick Start

import { RedisTaskStore, RedisStreamsQueueManager, RedisPushNotificationConfigStore } from 'a2a-redis';
import { createRedisClient } from 'a2a-redis/utils';
import { DefaultRequestHandler } from '@a2a-js/sdk/server';
import { A2AExpressApplication } from '@a2a-js/sdk/server/express';

// Create Redis client with connection management
const redisClient = createRedisClient({
  url: 'redis://localhost:6379/0',
  maxConnections: 50,
});

// Initialize Redis components
const taskStore = new RedisTaskStore(redisClient, { prefix: 'myapp:tasks:' });
const queueManager = new RedisStreamsQueueManager(redisClient, { prefix: 'myapp:queues:' });
const pushConfigStore = new RedisPushNotificationConfigStore(redisClient, { prefix: 'myapp:push:' });

// Use with A2A request handler
const requestHandler = new DefaultRequestHandler({
  agentExecutor: yourAgentExecutor,
  taskStore,
  queueManager,
  pushConfigStore,
});

// Create A2A server
const server = new A2AExpressApplication({
  agentCard: yourAgentCard,
  httpHandler: requestHandler,
});

Connecting to Hosted Redis

For production deployments, you can connect to hosted Redis services like Redis Cloud, AWS ElastiCache, or Heroku Redis:

Using Redis URL

import { createRedisClient } from 'a2a-redis/utils';

// From environment variable
const redisClient = createRedisClient({
  url: process.env.REDIS_URL, // e.g., redis://user:password@host:port/db
  maxConnections: 50,
});

Using Host, Port, and Password

import { createRedisClient } from 'a2a-redis/utils';

// For Redis Cloud or similar hosted services
const redisClient = createRedisClient({
  host: 'your-redis-host.redis.cloud',
  port: Number(process.env.REDIS_PORT), // e.g., 19XXX
  password: process.env.REDIS_PASSWORD,
  tls: true, // Required for most hosted services
  maxConnections: 50,
});

Environment Variables

# .env file
REDIS_URL=redis://:your-password@your-host:19XXX/0
# OR
REDIS_HOST=your-redis-host.redis.cloud
REDIS_PORT=19XXX
REDIS_PASSWORD=your-password
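
The two configuration styles above can be resolved in one place. The sketch below assumes a hypothetical `redisOptionsFromEnv` helper (not part of a2a-redis) that prefers `REDIS_URL` and otherwise falls back to host, port, and password. The returned object mirrors the option names used in the examples above; whether `createRedisClient` accepts exactly this shape is an assumption.

```typescript
// Hypothetical helper, not part of a2a-redis.
interface RedisEnv {
  REDIS_URL?: string;
  REDIS_HOST?: string;
  REDIS_PORT?: string;
  REDIS_PASSWORD?: string;
}

type RedisConnectionOptions =
  | { url: string; maxConnections: number }
  | { host: string; port: number; password: string | undefined; tls: boolean; maxConnections: number };

function redisOptionsFromEnv(env: RedisEnv, maxConnections = 50): RedisConnectionOptions {
  // Prefer a full connection URL when one is provided.
  if (env.REDIS_URL) {
    return { url: env.REDIS_URL, maxConnections };
  }
  if (!env.REDIS_HOST || !env.REDIS_PORT) {
    throw new Error('Set REDIS_URL, or both REDIS_HOST and REDIS_PORT');
  }
  const port = Number(env.REDIS_PORT);
  if (!Number.isInteger(port) || port <= 0 || port > 65535) {
    throw new Error(`Invalid REDIS_PORT: ${env.REDIS_PORT}`);
  }
  return {
    host: env.REDIS_HOST,
    port,
    password: env.REDIS_PASSWORD,
    tls: true, // assumption: the hosted service requires TLS
    maxConnections,
  };
}
```

A caller would pass `process.env` and hand the result to the client factory. Failing fast on a missing or malformed port avoids a harder-to-diagnose connection error later.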

Queue Components

The package provides both high-level queue managers and direct queue implementations:

Queue Managers

  • RedisStreamsQueueManager - Manages Redis Streams-based queues
  • RedisPubSubQueueManager - Manages Redis Pub/Sub-based queues
  • Both implement the A2A SDK's QueueManager interface

Event Queues

  • RedisStreamsEventQueue - Direct Redis Streams queue implementation
  • RedisPubSubEventQueue - Direct Redis Pub/Sub queue implementation
  • Both implement the EventQueue interface

Queue Types: Streams vs Pub/Sub

RedisStreamsQueueManager

Key Features:

  • Persistent storage: Events remain in streams until explicitly trimmed
  • Guaranteed delivery: Consumer groups with acknowledgments prevent message loss
  • Load balancing: Multiple consumers can share work via consumer groups
  • Failure recovery: Unacknowledged messages can be reclaimed by other consumers
  • Event replay: Historical events can be re-read from any point in time
  • Ordering: Maintains strict insertion order with unique message IDs

Use Cases:

  • Task event queues requiring reliability
  • Audit trails and event history
  • Work distribution systems
  • Systems requiring failure recovery
  • Multi-consumer load balancing

Trade-offs:

  • Higher memory usage (events persist)
  • More complex setup (consumer groups)
  • Slightly higher latency than pub/sub
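
To make the delivery guarantees above concrete, here is a small in-memory model of a consumer group. It is an illustration only: `ConsumerGroupModel` is a hypothetical class that neither talks to Redis nor reflects how this package is implemented. It mirrors the roles that the Redis commands `XREADGROUP`, `XACK`, and `XCLAIM` play.

```typescript
// In-memory model of a Redis Streams consumer group (illustration only).
interface StreamEntry {
  id: number;
  payload: string;
}

class ConsumerGroupModel {
  private entries: StreamEntry[] = [];
  private nextIndex = 0; // index of the next never-delivered entry
  private pending = new Map<number, string>(); // entry id -> consumer holding it

  add(payload: string): number {
    const id = this.entries.length + 1;
    this.entries.push({ id, payload });
    return id;
  }

  // Deliver the next new entry to one consumer and mark it pending (like XREADGROUP).
  read(consumer: string): StreamEntry | undefined {
    const entry = this.entries[this.nextIndex];
    if (!entry) return undefined;
    this.nextIndex += 1;
    this.pending.set(entry.id, consumer);
    return entry;
  }

  // Acknowledge a processed entry so it is no longer pending (like XACK).
  ack(id: number): boolean {
    return this.pending.delete(id);
  }

  // Hand a still-pending entry to another consumer (like XCLAIM).
  claim(id: number, consumer: string): StreamEntry | undefined {
    if (!this.pending.has(id)) return undefined;
    this.pending.set(id, consumer);
    return this.entries[id - 1];
  }

  pendingFor(consumer: string): number[] {
    const ids: number[] = [];
    this.pending.forEach((owner, id) => {
      if (owner === consumer) ids.push(id);
    });
    return ids;
  }
}

const group = new ConsumerGroupModel();
group.add('Task started');
group.add('50% complete');
const firstEntry = group.read('worker-a'); // entry 1 goes to worker-a
const secondEntry = group.read('worker-b'); // entry 2 goes to worker-b (load balancing)
group.ack(1); // worker-a finished entry 1
// worker-b fails before acknowledging, so worker-a takes over entry 2.
const reclaimed = group.claim(2, 'worker-a');
```

Each entry is delivered to exactly one consumer, and an entry stays pending until it is acknowledged, which is what allows another consumer to recover it after a failure.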

RedisPubSubQueueManager

Key Features:

  • Real-time delivery: Events delivered immediately to active subscribers
  • No persistence: Events not stored, only delivered to active consumers
  • Fire-and-forget: No acknowledgments or delivery guarantees
  • Broadcasting: All subscribers receive all events
  • Low latency: Minimal overhead for immediate delivery
  • Minimal memory usage: No storage of events

Use Cases:

  • Live status updates and notifications
  • Real-time dashboard updates
  • System event broadcasting
  • Non-critical event distribution
  • Low-latency requirements
  • Simple fan-out scenarios

Not suitable for:

  • Critical event processing requiring guarantees
  • Systems requiring event replay or audit trails
  • Offline-capable applications
  • Work queues requiring load balancing
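
The fire-and-forget behavior can be seen in a similar in-memory model. `PubSubModel` below is a hypothetical class for illustration and does not use Redis. Its `publish` method returns the number of subscribers that received the message, as the Redis `PUBLISH` command does.

```typescript
// In-memory model of Pub/Sub broadcasting (illustration only).
type Listener = (message: string) => void;

class PubSubModel {
  private listeners: Listener[] = [];

  // Register a listener and return a function that unsubscribes it.
  subscribe(listener: Listener): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Deliver to every current subscriber; nothing is stored for later.
  publish(message: string): number {
    const current = this.listeners;
    for (const listener of current) {
      listener(message);
    }
    return current.length;
  }
}

const channel = new PubSubModel();
const dashboardA: string[] = [];
const dashboardB: string[] = [];

// Published before anyone subscribes: delivered to nobody and lost.
const missed = channel.publish('Task started');

channel.subscribe((message) => {
  dashboardA.push(message);
});
channel.subscribe((message) => {
  dashboardB.push(message);
});

// Every active subscriber receives its own copy.
const delivered = channel.publish('50% complete');
```

Compared with a consumer group, there is no pending state to acknowledge or reclaim, which is why this model is simpler and also why it cannot balance work or replay history.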

Components

Task Storage

RedisTaskStore

Stores task data in Redis using hashes with JSON serialization. Works with any Redis server.

import { RedisTaskStore } from 'a2a-redis';

const taskStore = new RedisTaskStore(redisClient, { prefix: 'mytasks:' });

// A2A TaskStore interface methods
await taskStore.save('task123', { status: 'pending', data: { key: 'value' } });
const task = await taskStore.load('task123'); // resolves to undefined if the task is missing
await taskStore.delete('task123');

// List all task IDs (utility method)
const taskIds = await taskStore.listTaskIds();

RedisJSONTaskStore

Stores task data using Redis's JSON module for native JSON operations and complex nested data.

import { RedisJSONTaskStore } from 'a2a-redis';

// Requires Redis 8 or RedisJSON module
const jsonTaskStore = new RedisJSONTaskStore(redisClient, { prefix: 'mytasks:' });

// Same interface as RedisTaskStore but with native JSON support
await jsonTaskStore.save('task123', { complex: { nested: { data: 'value' } } });

Task TTL/Expiration

Both RedisTaskStore and RedisJSONTaskStore support automatic expiration of task keys via Redis TTL. This is useful for ephemeral tasks that should not accumulate in Redis indefinitely.

import { RedisTaskStore } from 'a2a-redis';

// Create a task store with 30-minute task expiration
const taskStore = new RedisTaskStore(redisClient, {
  prefix: 'tasks:',
  ttl: 1800, // seconds (30 minutes)
});

// Each time save() is called, the TTL is reset
// Example: Task expires if not updated for 30 minutes
await taskStore.save({
  id: 'task123',
  status: 'pending',
  createdAt: new Date().toISOString(),
});

// Update task status after 10 minutes - TTL is reset, now expires in another 30 minutes
setTimeout(() => {
  taskStore.save({
    id: 'task123',
    status: 'completed',
    completedAt: new Date().toISOString(),
  });
}, 10 * 60 * 1000);

// Without TTL (default), tasks persist until manually deleted
const persistentStore = new RedisTaskStore(redisClient, { prefix: 'permanent:' });

TTL Benefits:

  • Prevents unbounded memory growth from accumulated completed tasks
  • Automatic cleanup without manual intervention
  • Configurable per task store
  • TTL refreshes on each task update (moving expiration window)

Default: No TTL (tasks persist forever) - TTL is opt-in
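
The moving expiration window can be modeled without Redis. The sketch below uses a hypothetical `SlidingTtlModel` class with an injected clock. It illustrates the behavior described above and is not how the package implements TTL, which relies on Redis key expiration.

```typescript
// In-memory model of a TTL that is reset on every save (illustration only).
class SlidingTtlModel<T> {
  private items = new Map<string, { value: T; expiresAt: number }>();
  private ttlMs: number;
  private now: () => number;

  constructor(ttlSeconds: number, now: () => number) {
    this.ttlMs = ttlSeconds * 1000;
    this.now = now;
  }

  // Every save pushes the expiration to "now + ttl".
  save(id: string, value: T): void {
    this.items.set(id, { value, expiresAt: this.now() + this.ttlMs });
  }

  // Expired entries behave as if they were never stored.
  load(id: string): T | undefined {
    const item = this.items.get(id);
    if (!item) return undefined;
    if (this.now() >= item.expiresAt) {
      this.items.delete(id);
      return undefined;
    }
    return item.value;
  }
}

const MINUTE_MS = 60 * 1000;
let clockMs = 0; // fake clock so the example does not have to wait
const ttlModel = new SlidingTtlModel<{ status: string }>(1800, () => clockMs);

ttlModel.save('task123', { status: 'pending' }); // would expire at minute 30
clockMs = 10 * MINUTE_MS;
ttlModel.save('task123', { status: 'completed' }); // now expires at minute 40
clockMs = 35 * MINUTE_MS;
const stillStored = ttlModel.load('task123'); // still present: window moved to minute 40
clockMs = 41 * MINUTE_MS;
const afterExpiry = ttlModel.load('task123'); // undefined: no update for 30 minutes
```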

Task Store Return Types

Both RedisTaskStore and RedisJSONTaskStore implement async methods that return optional types. Type aliases are exported for convenience:

import type { TaskLoadResult } from 'a2a-redis';

// TaskLoadResult = Task | undefined
const task: TaskLoadResult = await taskStore.load('task-123');
if (task) {
  console.log(`Task status: ${task.status}`);
} else {
  console.log('Task not found or has expired');
}

Return types:

  • load(taskId) returns Promise<TaskLoadResult> (Task | undefined)
  • save(taskId, task) returns Promise<void>
  • delete(taskId) returns Promise<void>
  • listTaskIds() returns Promise<string[]>

Queue Managers

Both queue managers implement the A2A QueueManager interface with full async support:

import {
  RedisStreamsQueueManager,
  RedisPubSubQueueManager,
  ConsumerGroupStrategy,
  ConsumerGroupConfig,
} from 'a2a-redis';

// For reliable, persistent processing
const streamsManager = new RedisStreamsQueueManager(redisClient, {
  prefix: 'myapp:streams:',
});

// For real-time, low-latency broadcasting
const pubsubManager = new RedisPubSubQueueManager(redisClient, {
  prefix: 'myapp:pubsub:',
});

// With custom consumer group configuration (streams only)
const config = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});
const sharedStreamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: config });

async function main() {
  // Same interface for both managers
  const queue = await streamsManager.createOrTap('task123');

  // Enqueue events
  await queue.enqueueEvent({ type: 'progress', message: 'Task started' });
  await queue.enqueueEvent({ type: 'progress', message: '50% complete' });

  // Dequeue events
  try {
    const event = await queue.dequeueEvent({ noWait: true }); // Non-blocking
    console.log(`Got event: ${JSON.stringify(event)}`);
    await queue.taskDone(); // Acknowledge the message (streams only)
  } catch (error) {
    console.log('No events available');
  }

  // Close queue when done
  await queue.close();
}

// Report failures instead of leaving the promise rejection unhandled
main().catch(console.error);

Consumer Group Strategies

The Streams queue manager supports different consumer group strategies:

import { RedisStreamsQueueManager, ConsumerGroupStrategy, ConsumerGroupConfig } from 'a2a-redis';

// Multiple instances share work across a single consumer group
const sharedConfig = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.SHARED_LOAD_BALANCING,
});

// Each instance gets its own consumer group
const isolatedConfig = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.INSTANCE_ISOLATED,
});

// Custom consumer group name
const customConfig = new ConsumerGroupConfig({
  strategy: ConsumerGroupStrategy.CUSTOM,
  groupName: 'my_group',
});

// Pass whichever configuration fits your deployment
const streamsManager = new RedisStreamsQueueManager(redisClient, { consumerConfig: sharedConfig });
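
The practical difference between the strategies is which consumer group name each running instance ends up using. The sketch below shows one way that mapping could look. The `resolveGroupName` function, the `GroupStrategy` constants, and the `processors-…` naming scheme are all hypothetical; the names the package actually generates may differ.

```typescript
// Hypothetical mapping from strategy to consumer group name (illustration only).
const GroupStrategy = {
  SHARED_LOAD_BALANCING: 'shared_load_balancing',
  INSTANCE_ISOLATED: 'instance_isolated',
  CUSTOM: 'custom',
} as const;
type GroupStrategy = (typeof GroupStrategy)[keyof typeof GroupStrategy];

interface GroupConfig {
  strategy: GroupStrategy;
  groupName?: string;
}

function resolveGroupName(config: GroupConfig, taskId: string, instanceId: string): string {
  switch (config.strategy) {
    case GroupStrategy.SHARED_LOAD_BALANCING:
      // Same name on every instance, so instances split the events between them.
      return `processors-${taskId}`;
    case GroupStrategy.INSTANCE_ISOLATED:
      // Unique name per instance, so each instance sees every event.
      return `processors-${taskId}-${instanceId}`;
    case GroupStrategy.CUSTOM:
      if (!config.groupName) {
        throw new Error('CUSTOM strategy requires groupName');
      }
      return config.groupName;
  }
}

const sharedOnA = resolveGroupName({ strategy: GroupStrategy.SHARED_LOAD_BALANCING }, 'task123', 'instance-a');
const sharedOnB = resolveGroupName({ strategy: GroupStrategy.SHARED_LOAD_BALANCING }, 'task123', 'instance-b');
const isolatedOnA = resolveGroupName({ strategy: GroupStrategy.INSTANCE_ISOLATED }, 'task123', 'instance-a');
const isolatedOnB = resolveGroupName({ strategy: GroupStrategy.INSTANCE_ISOLATED }, 'task123', 'instance-b');
const customName = resolveGroupName({ strategy: GroupStrategy.CUSTOM, groupName: 'my_group' }, 'task123', 'instance-a');
```

Because a Redis stream delivers each entry once per consumer group, instances that share a group name split the work, while instances with distinct group names each receive the full stream.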

RedisPushNotificationConfigStore

Stores push notification configurations per task. Implements the A2A PushNotificationConfigStore interface.

import { RedisPushNotificationConfigStore } from 'a2a-redis';
import type { PushNotificationConfig } from '@a2a-js/sdk';

const configStore = new RedisPushNotificationConfigStore(redisClient, {
  prefix: 'myapp:push:',
});

// Create push notification config (a plain object typed by the SDK)
const config: PushNotificationConfig = {
  url: 'https://webhook.example.com/notify',
  token: 'secret_token',
  id: 'webhook_1',
};

// A2A interface methods
await configStore.setInfo('task123', config);

// Get all configs for a task
const configs = await configStore.getInfo('task123');
for (const cfg of configs) {
  console.log(`Config ${cfg.id}: ${cfg.url}`);
}

// Delete specific config or all configs for a task
await configStore.deleteInfo('task123', 'webhook_1'); // Delete specific
await configStore.deleteInfo('task123'); // Delete all

RedisContextStore

Manages conversation contexts for multi-turn interactions. Groups related tasks and provides history retrieval.

import { RedisContextStore } from 'a2a-redis';

// Create a context store with 1-hour expiration
const contextStore = new RedisContextStore(redisClient, {
  prefix: 'contexts:',
  ttl: 3600, // optional: auto-cleanup after 1 hour
});

// Create a context for a new conversation
await contextStore.createContext('conv-user-123', {
  userId: 'user-123',
  agent: 'travel-agent',
  sessionId: 'session-abc',
});

// Add tasks as they're created in the conversation
await contextStore.addTaskToContext('conv-user-123', 'task-1');
await contextStore.addTaskToContext('conv-user-123', 'task-2');
await contextStore.addTaskToContext('conv-user-123', 'task-3');

// Retrieve context to see all conversation tasks
const context = await contextStore.getContext('conv-user-123');
console.log(`Conversation has ${context?.taskIds.length} tasks`);
console.log(`Started at: ${context?.createdAt}`);
console.log(`User: ${context?.metadata?.userId}`);

// Get just the task IDs
const taskIds = await contextStore.getContextTasks('conv-user-123');
for (const taskId of taskIds) {
  const task = await taskStore.load(taskId);
  console.log(`Task ${taskId} status: ${task?.status}`);
}

// Clean up when conversation ends
await contextStore.deleteContext('conv-user-123');

Context Features:

  • Multi-turn support: Group related tasks from a single conversation
  • History retrieval: Get all tasks in a conversation
  • Metadata storage: Store user/session info with the context
  • TTL-based cleanup: Automatic expiration of old conversations
  • Deduplication: Adding the same task ID to a context more than once stores it only once

Default: No TTL (contexts persist) - TTL is opt-in
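
The grouping and no-duplicates behavior can be pictured with an in-memory stand-in. `ContextStoreModel` below is a hypothetical class that mirrors the method names used above but is synchronous and does not use Redis; it is a sketch of the semantics, not the package's implementation.

```typescript
// In-memory model of a context store (illustration only).
interface ContextModel {
  createdAt: string;
  metadata: Record<string, string>;
  taskIds: string[];
}

class ContextStoreModel {
  private contexts = new Map<string, ContextModel>();

  createContext(contextId: string, metadata: Record<string, string> = {}): void {
    this.contexts.set(contextId, {
      createdAt: new Date().toISOString(),
      metadata,
      taskIds: [],
    });
  }

  // Append the task ID unless the context already contains it.
  addTaskToContext(contextId: string, taskId: string): void {
    const context = this.contexts.get(contextId);
    if (!context) {
      throw new Error(`Unknown context: ${contextId}`);
    }
    if (context.taskIds.indexOf(taskId) === -1) {
      context.taskIds.push(taskId);
    }
  }

  // Return a copy so callers cannot mutate stored state.
  getContextTasks(contextId: string): string[] {
    const context = this.contexts.get(contextId);
    return context ? context.taskIds.slice() : [];
  }

  deleteContext(contextId: string): void {
    this.contexts.delete(contextId);
  }
}

const contextModel = new ContextStoreModel();
contextModel.createContext('conv-user-123', { userId: 'user-123' });
contextModel.addTaskToContext('conv-user-123', 'task-1');
contextModel.addTaskToContext('conv-user-123', 'task-2');
contextModel.addTaskToContext('conv-user-123', 'task-1'); // ignored: already present
const conversationTasks = contextModel.getContextTasks('conv-user-123');
```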

Context Store Return Types

The RedisContextStore implements async methods with optional return types. Type aliases are exported for convenience:

import type { ContextLookupResult } from 'a2a-redis';

// ContextLookupResult = Context | undefined
const context: ContextLookupResult = await contextStore.getContext('conv-user-123');
if (context) {
  console.log(`Context created at: ${context.createdAt}`);
  console.log(`Tasks in context: ${context.taskIds.length}`);
} else {
  console.log('Context not found or has expired');
}

Return types:

  • getContext(contextId) returns Promise<ContextLookupResult> (Context | undefined)
  • createContext(contextId, metadata?) returns Promise<void>
  • addTaskToContext(contextId, taskId) returns Promise<void>
  • getContextTasks(contextId) returns Promise<string[]>
  • deleteContext(contextId) returns Promise<void>

Requirements

Required

  • Node.js 18+
  • TypeScript 5.0+ (if using TypeScript)
  • redis (peer dependency) >= 4.0.0 (the official Redis client for Node.js)
  • @a2a-js/sdk (peer dependency) >= 0.3.4

Both redis and @a2a-js/sdk must be installed in your project:

npm install redis @a2a-js/sdk a2a-redis
# or with pnpm
pnpm add redis @a2a-js/sdk a2a-redis

Optional

  • RedisJSON module for RedisJSONTaskStore (enhanced nested data support)
  • Redis Stack or Redis with modules for full feature support

Development

# Install dependencies
npm install

# Run tests
npm test

# Run tests with coverage
npm run test:coverage

# Type checking
npm run type-check

# Linting and formatting
npm run lint
npm run format

# Run examples
npm run example:basic
npm run example:agent

Testing

Tests use Redis database 15 for isolation and include both unit and integration tests:

# Run all tests
npm test

# Run specific test file
npm test -- task-store.test.ts

# Run with coverage
npm run test:coverage

Test Architecture

Tests are designed to run efficiently with a limited number of Redis connections. This matters most on free-tier hosted Redis (e.g., the Redis Cloud free tier, which allows only 1 concurrent connection):

  • Shared Redis Client: All tests use a single Redis client initialized in tests/setup.ts, reducing connection overhead
  • Single-threaded Execution: Tests run serially (maxWorkers: 1) to prevent connection conflicts
  • Database Isolation: Tests use Redis database 15, separate from development/production databases
  • Automatic Cleanup: cleanupTestRedis() flushes database between tests

Pub/Sub Tests (Skipped)

Pub/Sub integration tests are skipped by default because they require duplicate connections:

  • Redis Pub/Sub requires a separate connection for subscriptions (client.duplicate())
  • Free-tier Redis (1 connection limit) cannot support both main + pub/sub connections
  • Unit tests for Pub/Sub still run with mock clients
  • To run pub/sub integration tests, use a Redis instance with higher connection limits

Running Tests with Redis Cloud Free Tier

The test suite is optimized for Redis Cloud free tier:

  1. Environment Setup

    # Add to .env
    REDIS_URL=redis://user:password@host:port/db

  2. Run Tests

    npm test

  3. Expected Output

    • 200+ tests passing (148+ integration tests)
    • ~30 pub/sub tests skipped (due to connection limits)
    • ~17 failing tests (pre-existing, unrelated to the connection limit)

The shared client pattern allows efficient testing even with connection constraints.

License

MIT
