feat: Add @agentxjs/queue package for reliable event delivery #205

@deepracticexs

Background / Problem

Currently, WebSocket messages are broadcast directly without any queuing mechanism:

// createLocalAgentX.ts:88-104
runtime.onAny((event) => {
  wsServer.broadcast(JSON.stringify(event));  // Direct broadcast, no buffering
});

Problems:

  • If the client disconnects, messages are silently dropped
  • No mechanism for message recovery after reconnection
  • Frontend state becomes inconsistent when messages are lost

Architecture Analysis

Current package structure:

┌─────────────────────────────────────────────────────┐
│                    runtime                          │  ← Business domain
│         (Agent, Session, Container, Image)          │
└─────────────────────────────────────────────────────┘
                         │
          ┌──────────────┴──────────────┐
          ▼                             ▼
    ┌──────────────┐             ┌──────────────┐
    │  persistence │             │   network    │     ← Technical infrastructure
    │  (storage)   │             │  (transport) │
    └──────────────┘             └──────────────┘

What's missing: A message queue layer for reliable event delivery.

Proposal: @agentxjs/queue

Add a new technical infrastructure package (same level as persistence and network):

┌─────────────────────────────────────────────────────┐
│                    runtime                          │
│                                                     │
│   Session {                                         │
│     messages: Message[]     ← uses persistence      │
│     eventQueue              ← uses queue (NEW)      │
│   }                                                 │
└─────────────────────────────────────────────────────┘
                         │
          ┌──────────────┼──────────────┐
          ▼              ▼              ▼
    ┌───────────┐  ┌──────────┐   ┌──────────┐
    │persistence│  │ network  │   │  queue   │  ← NEW
    │           │  │          │   │          │
    │ SQLite    │  │ WebSocket│   │ SQLite   │
    │ Redis     │  │ SSE      │   │ Redis    │
    │ Memory    │  │          │   │ Memory   │
    └───────────┘  └──────────┘   └──────────┘

Why "Queue" and not "Stream"?

Aspect              Queue               Stream
------------------  ------------------  ---------------------
Core metaphor       Task queue          Event log
Consumption         Consume & delete    Replayable
Consumers           Usually single      Multiple
Position tracking   Optional            Required (offset)
Typical use         Background jobs     Event sourcing
Examples            RabbitMQ, SQS       Kafka, Redis Streams

Our requirements:

  • ✅ Reconnection recovery → needs cursor (Stream feature)
  • ✅ Delete after consumption → save space (Queue feature)
  • ✅ Persistence → survive restarts (Stream feature)
  • ✅ Single consumer per topic → one client (Queue feature)

Conclusion: We need a "durable queue with cursor support": primarily queue behavior (single consumer, delete after acknowledgment), implemented internally as an append-only log so that a reconnecting client can resume from its last cursor.
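
To make the conclusion above more concrete, here is a minimal in-memory sketch of an append-only log that behaves like a queue: entries are appended with a cursor, read strictly after a given cursor, and deleted once acknowledged. It handles a single topic only. The class name `CursorLog`, the generic event type, and the choice of decimal sequence numbers as cursors are assumptions made for illustration, not part of the proposal.

```typescript
// Hypothetical single-topic sketch: append-only log + cursor + ack-based trimming.
interface LogEntry<E> {
  cursor: string; // decimal sequence number stored as a string, e.g. "6"
  event: E;
  timestamp: number;
}

class CursorLog<E> {
  private entries: LogEntry<E>[] = [];
  private nextSeq = 1;

  /** Append an event and return its cursor. */
  append(event: E): string {
    const cursor = String(this.nextSeq++);
    this.entries.push({ cursor, event, timestamp: Date.now() });
    return cursor;
  }

  /** Return entries strictly after `afterCursor` (all retained entries if omitted). */
  read(afterCursor?: string, limit = Infinity): LogEntry<E>[] {
    // Compare numerically: as plain strings, "10" would sort before "9".
    const pending =
      afterCursor === undefined
        ? this.entries
        : this.entries.filter((e) => Number(e.cursor) > Number(afterCursor));
    return pending.slice(0, limit);
  }

  /** Drop every entry up to and including `cursor` (queue-style deletion). */
  ack(cursor: string): void {
    this.entries = this.entries.filter((e) => Number(e.cursor) > Number(cursor));
  }

  get size(): number {
    return this.entries.length;
  }
}

const log = new CursorLog<string>();
const first = log.append("a");
log.append("b");
const last = log.append("c");
```

Reading with `log.read(first)` yields the entries for "b" and "c", and `log.ack(last)` empties the log. A persistent driver would presumably keep the same semantics and replace the array with a table or stream.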

Proposed API

// packages/queue/src/types.ts

/**
 * EventQueue - Durable queue with cursor support
 * 
 * Features:
 * - Persistent: survives restarts
 * - Cursor: supports reconnection recovery
 * - Cleanable: delete after consumption or auto-expire
 */
export interface EventQueue {
  /**
   * Append event to queue
   * @returns cursor for recovery
   */
  append(topic: string, event: SystemEvent): Promise<string>;
  
  /**
   * Read events from cursor (reconnection recovery)
   */
  read(topic: string, afterCursor?: string, limit?: number): Promise<QueueEntry[]>;
  
  /**
   * Acknowledge consumption (for cleanup)
   */
  ack(topic: string, cursor: string): Promise<void>;
  
  /**
   * Subscribe to new events (real-time push)
   */
  subscribe(topic: string, handler: (entry: QueueEntry) => void): Unsubscribe;
  
  /**
   * Cleanup expired entries
   */
  cleanup(): Promise<number>;
  
  /**
   * Close the queue
   */
  close(): Promise<void>;
}

/**
 * Queue entry
 */
export interface QueueEntry {
  /** Monotonically increasing cursor */
  cursor: string;
  /** Topic (sessionId) */
  topic: string;
  /** Original event */
  event: SystemEvent;
  /** Timestamp */
  timestamp: number;
}

/**
 * QueueDriver - Pluggable storage backend
 */
export interface QueueDriver {
  createQueue(options?: QueueOptions): Promise<EventQueue>;
}

export interface QueueOptions {
  /** Max entries per topic */
  maxSize?: number;
  /** Entry TTL in milliseconds */
  ttlMs?: number;
}
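
The `maxSize` and `ttlMs` options above suggest two cleanup rules, which could be sketched as a pure function over one topic's entries. This is only an illustration: the helper name `applyRetention`, the injected `now` parameter, and the decision to drop expired entries regardless of whether they were acknowledged are assumptions, since the proposal does not specify how `cleanup()` treats unacknowledged entries.

```typescript
// Hypothetical helper: applies QueueOptions-style limits to one topic's entries.
interface RetentionOptions {
  maxSize?: number; // max entries per topic
  ttlMs?: number; // entry TTL in milliseconds
}

interface TimedEntry {
  cursor: string;
  timestamp: number;
}

/**
 * Returns the entries that survive cleanup. `entries` is assumed to be in
 * append order (oldest first); `now` is injected to keep the function testable.
 */
function applyRetention<T extends TimedEntry>(
  entries: readonly T[],
  options: RetentionOptions,
  now: number
): T[] {
  let kept = entries.slice();
  if (options.ttlMs !== undefined) {
    const cutoff = now - options.ttlMs;
    kept = kept.filter((e) => e.timestamp >= cutoff); // drop expired entries
  }
  if (options.maxSize !== undefined && kept.length > options.maxSize) {
    // Keep the newest `maxSize` entries; older ones are dropped first.
    kept = kept.slice(kept.length - options.maxSize);
  }
  return kept;
}

const sample: TimedEntry[] = [
  { cursor: "1", timestamp: 1000 },
  { cursor: "2", timestamp: 5000 },
  { cursor: "3", timestamp: 9000 },
  { cursor: "4", timestamp: 9500 },
];
// With now = 10000 and ttlMs = 6000, entry "1" is expired; maxSize = 2 then keeps "3" and "4".
const survivors = applyRetention(sample, { ttlMs: 6000, maxSize: 2 }, 10000);
```

Under this sketch, `cleanup()` would return the number of removed entries, i.e. the difference between the input and output lengths summed over topics.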

Package Structure

packages/queue/
├── src/
│   ├── types.ts              # Interface definitions
│   ├── createQueue.ts        # Factory function
│   ├── drivers/
│   │   ├── memory.ts         # Memory driver (dev/test)
│   │   └── sqlite.ts         # SQLite driver (production)
│   └── index.ts
└── package.json

Usage in Runtime

// In createLocalAgentX.ts

import { createQueue, sqliteQueueDriver } from "@agentxjs/queue";

const queue = await createQueue(
  sqliteQueueDriver({ path: "./data/agentx.db" })
);

// Runtime events → push to queue by sessionId
runtime.onAny(async (event) => {
  if (!event.broadcastable) return;
  const sessionId = event.context?.sessionId;
  if (!sessionId) return;
  
  await queue.append(sessionId, event);
});

// Client connection → subscribe to session's queue
wsServer.onConnection((conn) => {
  // At most one live subscription per connection; released on close
  let unsubscribe: (() => void) | undefined;
  conn.onClose(() => unsubscribe?.());

  conn.onMessage(async (msg) => {
    const { type, sessionId, cursor } = JSON.parse(msg);
    
    if (type === "subscribe") {
      // Drop any earlier subscription so events are not delivered twice
      unsubscribe?.();

      // 1. Send missed events (reconnection recovery)
      const missed = await queue.read(sessionId, cursor);
      if (missed.length > 0) {
        conn.send(JSON.stringify({ type: "sync", events: missed }));
      }
      
      // 2. Subscribe to new events
      unsubscribe = queue.subscribe(sessionId, (entry) => {
        conn.send(JSON.stringify({ type: "event", ...entry }));
      });
    }
    
    if (type === "ack") {
      // Client confirms everything up to `cursor`; entries can be cleaned up
      await queue.ack(sessionId, cursor);
    }
  });
});
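
One thing to watch in the handler above: an event appended after `queue.read` resolves but before `queue.subscribe` is registered would reach neither path. A possible way around this is to subscribe first, buffer live entries while the replay is in flight, and then flush the buffer while skipping anything the replay already covered. The sketch below shows that idea in isolation. `ReplayGate` is a hypothetical helper, not part of the proposed API, and it assumes cursors are decimal sequence numbers as in the reconnection example.

```typescript
// Hypothetical helper that orders replayed and live entries without gaps or duplicates.
interface Entry {
  cursor: string; // assumed to be a decimal sequence number, e.g. "6"
}

class ReplayGate<T extends Entry> {
  private buffer: T[] = [];
  private replaying = true;
  private lastSent = -1;
  private readonly send: (entry: T) => void;

  constructor(send: (entry: T) => void) {
    this.send = send;
  }

  /** Use this as the subscribe handler, registered BEFORE calling read(). */
  onLive(entry: T): void {
    if (this.replaying) {
      this.buffer.push(entry); // hold until the replay has been sent
    } else {
      this.deliver(entry);
    }
  }

  /** Call with the result of read(); sends missed entries, then buffered ones. */
  finishReplay(missed: readonly T[]): void {
    for (const entry of missed) this.deliver(entry);
    this.replaying = false;
    for (const entry of this.buffer) this.deliver(entry);
    this.buffer = [];
  }

  /** Send an entry only if its cursor is newer than everything already sent. */
  private deliver(entry: T): void {
    const seq = Number(entry.cursor);
    if (seq <= this.lastSent) return; // already covered by the replay
    this.lastSent = seq;
    this.send(entry);
  }
}

const sent: string[] = [];
const gate = new ReplayGate<Entry>((e) => {
  sent.push(e.cursor);
});
gate.onLive({ cursor: "8" }); // arrives while read() is still in flight
gate.finishReplay([{ cursor: "6" }, { cursor: "7" }, { cursor: "8" }]);
gate.onLive({ cursor: "9" }); // normal live delivery after the replay
```

In the handler, this would mean creating the gate, passing `gate.onLive` to `queue.subscribe`, and then calling `gate.finishReplay(await queue.read(sessionId, cursor))`. Note that this variant sends missed entries one by one rather than as a single "sync" batch.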

Reconnection Flow

Client disconnects at cursor:5
    ↓
... events cursor:6, cursor:7, cursor:8 are queued ...
    ↓
Client reconnects with lastCursor:5
    ↓
Server: queue.read(sessionId, "5") → [6, 7, 8]
    ↓
Server sends missed events to client
    ↓
Client processes and sends ack
    ↓
Server: queue.ack(sessionId, "8") → cleanup old entries
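
The client side of this flow could be sketched as a small tracker that remembers the last cursor it has seen and builds the subscribe and ack messages from it. The message shapes follow the usage example in this issue; the class name `CursorTracker` and its method names are hypothetical, and the real `@agentxjs/network` client may be structured differently.

```typescript
// Hypothetical client-side helper for cursor tracking.
interface IncomingEntry {
  cursor: string;
}

type ServerMessage =
  | { type: "sync"; events: IncomingEntry[] }
  | ({ type: "event" } & IncomingEntry);

class CursorTracker {
  private lastCursor: string | undefined;
  private readonly sessionId: string;

  constructor(sessionId: string) {
    this.sessionId = sessionId;
  }

  /** Record the newest cursor carried by a server message. */
  observe(msg: ServerMessage): void {
    if (msg.type === "sync") {
      const tail = msg.events[msg.events.length - 1];
      if (tail) this.lastCursor = tail.cursor;
    } else {
      this.lastCursor = msg.cursor;
    }
  }

  /** Message to send on (re)connect; the cursor is omitted on the first connect. */
  subscribeMessage(): string {
    return JSON.stringify({
      type: "subscribe",
      sessionId: this.sessionId,
      cursor: this.lastCursor, // undefined values are dropped by JSON.stringify
    });
  }

  /** Ack for everything processed so far, or undefined if nothing was received. */
  ackMessage(): string | undefined {
    if (this.lastCursor === undefined) return undefined;
    return JSON.stringify({
      type: "ack",
      sessionId: this.sessionId,
      cursor: this.lastCursor,
    });
  }
}

const tracker = new CursorTracker("s1");
tracker.observe({ type: "event", cursor: "5" }); // last event before the disconnect
const resubscribe = tracker.subscribeMessage(); // carries cursor "5"
tracker.observe({ type: "sync", events: [{ cursor: "6" }, { cursor: "7" }, { cursor: "8" }] });
const ack = tracker.ackMessage(); // carries cursor "8"
```

This mirrors the flow above: the client reconnects with cursor 5, receives entries 6 to 8, and acknowledges cursor 8.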

Implementation Phases

Phase 1: Core Package

  • Create @agentxjs/queue package structure
  • Define TypeScript interfaces
  • Implement MemoryQueueDriver for development/testing
  • Implement SqliteQueueDriver for production
  • Add unit tests

Phase 2: Runtime Integration

  • Update createLocalAgentX to use queue
  • Add client protocol for subscribe/ack
  • Update @agentxjs/network client to track cursor
  • Add reconnection recovery logic

Phase 3: Enterprise Extensions

  • Add RedisQueueDriver for distributed deployments
  • Add configurable cleanup policies
  • Add backpressure handling
  • Add metrics/monitoring hooks

Related Files

  • packages/agentx/src/createLocalAgentX.ts - Current broadcast logic (to be updated)
  • packages/network/src/WebSocketServer.ts - WebSocket server
  • packages/network/src/WebSocketClient.ts - WebSocket client (needs cursor tracking)
  • packages/persistence/src/Persistence.ts - Reference for driver pattern

Labels: backend, enhancement
