MongoDB-backed job scheduler for Node.js with atomic locking, retries (exponential backoff), cron scheduling, and event-driven observability.

ueberBrot/monque

A MongoDB-backed job scheduler with atomic locking, exponential backoff, cron scheduling, and event-driven observability.

📚 Documentation | 🚀 Quick Start

Packages

Package        Description
@monque/core   Core job scheduler with MongoDB backend
@monque/tsed   Native Ts.ED integration with decorators
@monque/docs   Documentation site

Features

  • 🔒 Atomic Locking - Prevents duplicate job processing across multiple schedulers using MongoDB atomic operations
  • 💓 Heartbeat Monitoring - Automatic stale job detection and recovery
  • 🛡️ Type-Safe - Full TypeScript support with generics for job payloads
  • 🔌 Framework Agnostic - Works with any Node.js framework
  • 🔔 Event-Driven - Subscribe to job lifecycle events for observability and integrations
  • 📡 Change Streams - Real-time job notifications via MongoDB Change Streams (polling remains as a safety net)
  • Cron Scheduling - Schedule recurring jobs with standard 5-field cron expressions
  • 🔄 Exponential Backoff - Automatic retries with configurable exponential backoff
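As a rough illustration of the atomic locking idea, the sketch below builds the filter and update for a single "claim" operation, then uses an in-memory stand-in to show why only one scheduler wins. The document shape (status, nextRunAt, lockedAt, lockedBy) and the helper names buildClaim and claimInMemory are assumptions made for this example; Monque's actual schema and internals may differ.

```typescript
// Hypothetical job document shape; Monque's real schema may differ.
interface JobDoc {
  name: string;
  status: 'pending' | 'processing' | 'completed' | 'failed';
  nextRunAt: Date;
  lockedAt: Date | null;
  lockedBy: string | null;
}

// Build the filter and update for one atomic claim.
// With the official driver these could be passed to
// collection.findOneAndUpdate(filter, update, { returnDocument: 'after' }).
function buildClaim(jobName: string, workerId: string, now: Date) {
  const filter = {
    name: jobName,
    status: 'pending' as const,
    nextRunAt: { $lte: now },
  };
  const update = {
    $set: { status: 'processing' as const, lockedAt: now, lockedBy: workerId },
  };
  return { filter, update };
}

// In-memory stand-in for the database: match and write happen as one step,
// so a second caller no longer finds a pending job to claim.
function claimInMemory(
  jobs: JobDoc[],
  jobName: string,
  workerId: string,
  now: Date,
): JobDoc | null {
  const job = jobs.find(
    (j) =>
      j.name === jobName &&
      j.status === 'pending' &&
      j.nextRunAt.getTime() <= now.getTime(),
  );
  if (!job) return null;
  job.status = 'processing';
  job.lockedAt = now;
  job.lockedBy = workerId;
  return job;
}

const jobs: JobDoc[] = [
  { name: 'send-email', status: 'pending', nextRunAt: new Date(0), lockedAt: null, lockedBy: null },
];
const first = claimInMemory(jobs, 'send-email', 'scheduler-a', new Date());
const second = claimInMemory(jobs, 'send-email', 'scheduler-b', new Date());
console.log(first?.lockedBy, second);
```

Because the status check and the status change are part of the same operation, two schedulers racing for the same job cannot both see it as pending.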

Quick Start

Install the package together with the mongodb driver, which is a peer dependency, using whichever package manager you prefer:

bun add @monque/core mongodb

npm install @monque/core mongodb

pnpm add @monque/core mongodb

import { Monque } from '@monque/core';
import { MongoClient } from 'mongodb';

const client = new MongoClient('mongodb://localhost:27017');
await client.connect();

const monque = new Monque(client.db('myapp'));
await monque.initialize();

// Register a type-safe worker
interface EmailJob {
  to: string;
  subject: string;
}

monque.register<EmailJob>('send-email', async (job) => {
  console.log('Sending email to:', job.data.to);
  await sendEmail(job.data); // sendEmail is your own function, not part of Monque
});

// Monitor job lifecycle
monque.on('job:complete', ({ job, duration }) => {
  console.log(`Job ${job.name} completed in ${duration}ms`);
});

// Start processing
monque.start();

// Enqueue jobs
await monque.enqueue('send-email', { 
  to: 'user@example.com',
  subject: 'Welcome!'
});

// Schedule recurring jobs
await monque.schedule('0 9 * * *', 'daily-report', { type: 'sales' });

// Graceful shutdown
process.on('SIGTERM', async () => {
  await monque.stop();
  await client.close();
});
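The schedule call above takes a standard 5-field cron expression. To make the field order explicit, here is a minimal sketch that splits an expression into named fields. The helper splitCron is hypothetical and not part of @monque/core; it only checks the field count and does not validate ranges or compute run times.

```typescript
// Field order of a standard 5-field cron expression.
const FIELD_NAMES = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'] as const;
type CronFields = Record<(typeof FIELD_NAMES)[number], string>;

// Hypothetical helper: label each field of a 5-field cron expression.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== FIELD_NAMES.length) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// '0 9 * * *' means minute 0 of hour 9, on every day of every month.
const daily = splitCron('0 9 * * *');
console.log(daily);
```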

Configuration

const monque = new Monque(db, {
  collectionName: 'monque_jobs',   // Default: 'monque_jobs'
  pollInterval: 1000,              // Default: 1000ms (backup polling)
  maxRetries: 10,                  // Default: 10
  baseRetryInterval: 1000,         // Default: 1000ms
  shutdownTimeout: 30000,          // Default: 30s
  defaultConcurrency: 5,           // Default: 5 jobs per worker
  lockTimeout: 1800000,            // Default: 30 minutes
  heartbeatInterval: 30000,        // Default: 30s
  recoverStaleJobs: true,          // Default: true
});
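To give a feel for how the retry and lock options might interact, the sketch below computes retry delays and checks whether a lock has gone stale. The formula (baseRetryInterval multiplied by 2 to the power of the failure count) and the use of a lockedAt timestamp are assumptions for illustration; this README does not state the exact formula or fields Monque uses, and the function names are hypothetical.

```typescript
// Assumed formula: delay = baseRetryInterval * 2^failCount (not confirmed by this README).
function retryDelayMs(failCount: number, baseRetryInterval = 1000): number {
  return baseRetryInterval * 2 ** failCount;
}

// A job is retried only while its failure count is below maxRetries.
function shouldRetry(failCount: number, maxRetries = 10): boolean {
  return failCount < maxRetries;
}

// Assumed staleness rule: a lock older than lockTimeout is considered abandoned.
function isStale(lockedAt: Date, now: Date, lockTimeout = 1_800_000): boolean {
  return now.getTime() - lockedAt.getTime() > lockTimeout;
}

const delays = [1, 2, 3, 4].map((n) => retryDelayMs(n));
console.log(delays); // [ 2000, 4000, 8000, 16000 ]

const now = new Date(2_000_000);
console.log(isStale(new Date(0), now)); // true: locked 2,000,000 ms ago, above 30 minutes
console.log(isStale(new Date(1_000_000), now)); // false: locked 1,000,000 ms ago
```

Under these assumptions, lowering baseRetryInterval shortens every retry delay proportionally, while lockTimeout bounds how long a crashed scheduler can hold a job before it becomes eligible for recovery.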

Development

This repository uses Bun workspaces and scripts for development.

# Install dependencies
bun install

# Run tests
bun run test

# Build all packages & apps
bun run build

# Format & lint code
bun run check

# Run docs locally
bun run dev:docs

Requirements

  • Node.js 22+
  • MongoDB 4.0+ (Replica Set required for Change Streams)
  • Bun 1.3.5+ (development only; required to work on this repo)

Documentation

Visit the documentation site for comprehensive guides.

Inspired by

Monque draws inspiration from several excellent job scheduling libraries:

  • Agenda - The original MongoDB job scheduler
  • Pulse - A maintained fork of Agenda
  • BullMQ - Robust Redis-based queue system
  • pg-boss - Postgres-backed job queue
  • graphile-worker - High-performance Postgres worker

License

ISC © Maurice de Bruyn