Skip to content
This repository was archived by the owner on Apr 17, 2026. It is now read-only.
This repository was archived by the owner on Apr 17, 2026. It is now read-only.

feat: scheduled agent with cron-based task dispatch (daemon and HTTP modes) #145

Description

@yai-dev

Overview

Add a createScheduledAgent() factory to @agentrail/app that enables agents to run as long-lived background processes driven by cron-style schedules. Users choose between two execution backends — daemon (in-process, direct agent.stream()) and HTTP (POST to a running /stream endpoint) — by switching a single mode field. All other configuration (task store, session strategy) remains identical between modes.

Motivation

Currently, agentrail's architecture is purely request/response: each /stream POST starts a turn, the stream ends, and the session is idle until the next request. There is no first-class way to schedule recurring agent work (e.g. "summarise open PRs every morning", "poll an inbox every 5 minutes") without an external orchestrator.

This feature fills that gap by providing a self-contained scheduling layer that integrates cleanly with the existing AgentrailSessionStore abstraction and the two official storage backends.

Design

Three orthogonal dimensions

CronTaskStore   (where task definitions are persisted)
  × AgentrailSessionStore   (where conversation history is persisted)  
  × mode   (how a fired task executes)

These dimensions are fully independent and can be combined freely.


New interface: CronTaskStore

// packages/app/src/scheduled-agent/cron-task-store.ts

export interface CronTaskStore {
  loadTasks(): Promise<CronTask[]>;
  addTask(task: Omit<CronTask, "id" | "createdAt">): Promise<string>;
  markFired(id: string, firedAt: number): Promise<void>;
  removeTasks(ids: string[]): Promise<void>;
  /** Optional hot-reload; scheduler reloads when called. */
  subscribe?(onChange: () => void): () => void;
}

Two built-in implementations:

Class Package Persistence Hot-reload
FileCronTaskStore @agentrail/app JSON file on disk chokidar watch
PostgresCronTaskStore @agentrail/storage-postgres new agentrail.scheduled_tasks table pg LISTEN/NOTIFY

createScheduledAgent() API

export interface ScheduledAgentOptions {
  /**
   * Task source. Three forms:
   *   CronTask[]       — in-memory only (session-scoped)
   *   { filePath }     — shorthand for FileCronTaskStore
   *   CronTaskStore    — any persistent backend
   */
  tasks: CronTask[] | { filePath: string } | CronTaskStore;

  mode: DaemonMode | HttpMode;

  onFire?: (task: CronTask, result: ScheduledFireResult) => void | Promise<void>;
  onError?: (task: CronTask, error: unknown) => void | Promise<void>;
}

export interface ScheduledAgent {
  start(): void;
  stop(): void;
  addTask(task: Omit<CronTask, "id" | "createdAt">): string;
  removeTask(id: string): void;
  getNextFireTime(): number | null;
}

Daemon mode

export interface DaemonMode {
  type: "daemon";
  agent: Agent;
  /** Accepts any AgentrailSessionStore: SessionManager, PostgresSessionStore, or custom. */
  sessionStore: AgentrailSessionStore;
  sessionStrategy?:
    | "per-task"              // each task.id maps to a fixed sessionId
    | "shared"                // all tasks share one session
    | { sessionId: string }
    | ((task: CronTask) => string);
  tenantId?: string;          // default: "system"
  userId?: string;            // default: "scheduler"
}

HTTP mode

export interface HttpMode {
  type: "http";
  endpoint: string;
  agentId?: string;
  sessionId?: string | ((task: CronTask) => string);
  headers?: Record<string, string>;
  timeoutMs?: number;
  awaitCompletion?: boolean;
}

CronTask model

export type CronTask = {
  id: string;
  /** Standard 5-field cron expression, process-local timezone. */
  cron: string;
  prompt: string;
  createdAt: number;
  lastFiredAt?: number;
  recurring?: boolean;
  /** Exempt from auto-expiry. */
  permanent?: boolean;
};

File structure

packages/app/src/
  scheduled-agent/
    index.ts              ← public API + re-exports
    types.ts              ← CronTask, ScheduledAgentOptions, modes
    cron-task-store.ts    ← CronTaskStore interface
    file-cron-task-store.ts
    cron.ts               ← 5-field expression parser + computeNextCronRun
    cron-tasks.ts         ← task model helpers, jitter, missed-task detection
    cron-scheduler.ts     ← 1 s polling loop, chokidar, file lock
    daemon-executor.ts    ← DaemonMode: agent.stream() + sessionStore
    http-executor.ts      ← HttpMode: fetch() + optional SSE drain

packages/storage-postgres/src/
  cron-task-store.ts      ← PostgresCronTaskStore (new)
  schema.ts               ← extend DDL with scheduled_tasks table
  index.ts                ← export PostgresCronTaskStore

Usage examples

Local dev — file tasks + file history + daemon

import { createScheduledAgent, SessionManager } from "@agentrail/app";

const scheduler = createScheduledAgent({
  tasks: { filePath: "./.agentrail/scheduled_tasks.json" },
  mode: {
    type: "daemon",
    agent: myAgent,
    sessionStore: new SessionManager({ dataDir: "./.agentrail/sessions" }),
    sessionStrategy: "per-task",
  },
});

scheduler.start();
process.on("SIGTERM", () => scheduler.stop());

Production — postgres tasks + postgres history + daemon

import { createScheduledAgent } from "@agentrail/app";
import { createSqlClient, PostgresCronTaskStore, PostgresSessionStore } from "@agentrail/storage-postgres";

const sql = createSqlClient({ connectionString: process.env.DATABASE_URL });

createScheduledAgent({
  tasks: new PostgresCronTaskStore(sql),
  mode: {
    type: "daemon",
    agent: myAgent,
    sessionStore: new PostgresSessionStore(sql),
    sessionStrategy: "per-task",
  },
}).start();

Existing HTTP server — postgres tasks + HTTP execution

createScheduledAgent({
  tasks: new PostgresCronTaskStore(sql),
  mode: {
    type: "http",
    endpoint: "http://localhost:3000/stream",
    agentId: "my-agent",
    sessionId: (task) => `cron-${task.id}`,
    headers: { Authorization: `Bearer ${process.env.API_KEY}` },
    awaitCompletion: true,
  },
}).start();

Custom storage backend

class RedisTaskStore implements CronTaskStore {
  async loadTasks() { /* ... */ }
  async markFired(id, firedAt) { /* ... */ }
  async removeTasks(ids) { /* ... */ }
  subscribe(onChange) { /* keyspace notification */ return () => {}; }
}

createScheduledAgent({ tasks: new RedisTaskStore(), mode: { ... } }).start();

Scheduler internals (ported from reference implementation)

The cron logic (cron.ts, cron-tasks.ts, cron-scheduler.ts) is a direct port of a battle-tested 5-field cron implementation, stripped of all external dependencies beyond Node.js built-ins and optional chokidar. Key behaviours preserved:

  • Jitter on recurring tasks to smooth fleet-wide load spikes
  • Missed one-shot detection on startup with configurable handling
  • File lock to prevent double-fire when multiple processes share the same task file
  • lastFiredAt persistence so next-fire survives process restarts
  • Auto-expiry for recurring tasks older than recurringMaxAgeMs (default 7 days; permanent tasks exempt)

Acceptance criteria

  • createScheduledAgent() exported from @agentrail/app
  • CronTaskStore interface exported from @agentrail/app
  • FileCronTaskStore exported from @agentrail/app
  • PostgresCronTaskStore exported from @agentrail/storage-postgres; buildSchemaDDL() updated with scheduled_tasks table
  • Daemon mode: fires task prompt via agent.stream(), persists result messages via sessionStore.appendMessages()
  • HTTP mode: fires via fetch() POST; respects awaitCompletion
  • sessionStrategy variants all work in daemon mode
  • Inline CronTask[], { filePath } shorthand, and full CronTaskStore all accepted for tasks
  • scheduler.addTask() / removeTask() work at runtime
  • Scheduler shuts down cleanly on stop() (no dangling timers or file watchers)
  • Unit tests for cron.ts (expression parsing + computeNextCronRun)
  • Unit tests for scheduler fire/reschedule/expiry logic
  • Example added under examples/ demonstrating both modes

Metadata

Metadata

Assignees

No one assigned

    Labels

    architectureDesign and structural concernsdxDeveloper experienceenhancementNew feature or requesthostRelated to @agentrail/host package

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions