Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions packages/backend/app/db/migrations/001_background_jobs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
-- Migration: Add background_jobs table for retry and monitoring
-- Created: 2026-03-25
-- Issue: #130 Resilient background job retry & monitoring

-- Job queue table: one row per background job, carrying its payload,
-- lifecycle status, retry bookkeeping, timing, and result.
CREATE TABLE IF NOT EXISTS background_jobs (
id SERIAL PRIMARY KEY,
job_type VARCHAR(50) NOT NULL, -- e.g. SEND_REMINDER, SEND_EMAIL (see COMMENT below)
payload JSONB, -- job-type-specific input data
status VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING/RUNNING/SUCCEEDED/FAILED/RETRYING/DEAD_LETTER
priority INTEGER NOT NULL DEFAULT 0, -- higher value = processed first

-- Retry tracking
retry_count INTEGER NOT NULL DEFAULT 0, -- attempts made so far
max_retries INTEGER NOT NULL DEFAULT 3, -- ceiling before the job goes to dead letter
next_retry_at TIMESTAMP, -- scheduled retry time (exponential backoff)
last_error TEXT, -- error message from the most recent failure

-- Timing
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMP, -- set when a worker picks the job up
completed_at TIMESTAMP, -- set on terminal success/failure

-- Result
result JSONB, -- handler's return value on success

-- User association
user_id INTEGER REFERENCES users(id) -- optional owner; nullable
);

-- Indexes for efficient querying
-- Status filter used by health checks and queue listings.
CREATE INDEX IF NOT EXISTS idx_background_jobs_status ON background_jobs(status);
-- Matches the worker's pickup ordering: highest priority first, then FIFO.
CREATE INDEX IF NOT EXISTS idx_background_jobs_priority_created ON background_jobs(priority DESC, created_at ASC);
-- Partial index: only rows awaiting a retry are indexed by their due time.
-- NOTE(review): covers only status = 'RETRYING'; confirm PENDING-job polling
-- relies on the status/priority indexes above rather than this one.
CREATE INDEX IF NOT EXISTS idx_background_jobs_next_retry ON background_jobs(next_retry_at) WHERE status = 'RETRYING';
-- Per-user job lookups.
CREATE INDEX IF NOT EXISTS idx_background_jobs_user_id ON background_jobs(user_id);

-- Comments for documentation
COMMENT ON TABLE background_jobs IS 'Background jobs with retry and monitoring support';
COMMENT ON COLUMN background_jobs.job_type IS 'Type of job: SEND_REMINDER, SEND_EMAIL, etc.';
COMMENT ON COLUMN background_jobs.status IS 'Job status: PENDING, RUNNING, SUCCEEDED, FAILED, RETRYING, DEAD_LETTER';
COMMENT ON COLUMN background_jobs.priority IS 'Higher priority jobs are processed first';
COMMENT ON COLUMN background_jobs.retry_count IS 'Number of retry attempts made';
COMMENT ON COLUMN background_jobs.max_retries IS 'Maximum retry attempts allowed';
COMMENT ON COLUMN background_jobs.next_retry_at IS 'When to retry this job (for exponential backoff)';
COMMENT ON COLUMN background_jobs.last_error IS 'Error message from last failure';
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .background_jobs import bp as background_jobs_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(background_jobs_bp)
151 changes: 151 additions & 0 deletions packages/backend/app/routes/background_jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""
API routes for background job management.

Provides endpoints for:
- Viewing job status
- Monitoring job metrics
- Managing dead letter queue
- Manual retry operations
"""

from flask import Blueprint, jsonify, request, current_app
from ..services.background_jobs import (
BackgroundJobService,
BackgroundJob,
JobStatus,
JobType,
job_metrics,
)

# Blueprint for all background-job endpoints; mounted at /api/jobs.
bp = Blueprint("background_jobs", __name__, url_prefix="/api/jobs")


@bp.get("/metrics")
def get_metrics():
    """Return aggregated background-job metrics as JSON for monitoring."""
    stats = job_metrics.get_stats()
    return jsonify(stats)


@bp.get("/<int:job_id>")
def get_job_status(job_id: int):
    """Look up one job by id; respond 404 when the service has no record of it."""
    job_info = BackgroundJobService.get_job_status(job_id)
    if job_info:
        return jsonify(job_info)
    return jsonify({"error": "Job not found"}), 404


@bp.get("/pending")
def get_pending_jobs():
    """List pending and retrying jobs, ordered by priority then age.

    Query params:
        page: 1-based page number (default 1, clamped to >= 1).
        per_page: page size (default 20, clamped to 1..100 so a caller
            cannot request an unbounded result set).
    """
    # Clamp client-supplied pagination values: a crafted per_page could
    # otherwise pull the whole table in one request.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 20, type=int), 1), 100)

    jobs = BackgroundJob.query.filter(
        BackgroundJob.status.in_([
            JobStatus.PENDING.value,
            JobStatus.RETRYING.value,
        ])
    ).order_by(
        # Same ordering the worker uses to pick jobs up.
        BackgroundJob.priority.desc(),
        BackgroundJob.created_at.asc(),
    ).paginate(page=page, per_page=per_page, error_out=False)
    # error_out=False: an out-of-range page yields an empty page instead of
    # Flask-SQLAlchemy aborting the request with a 404.

    return jsonify({
        "jobs": [
            BackgroundJobService.get_job_status(job.id)
            for job in jobs.items
        ],
        "total": jobs.total,
        "page": page,
        "per_page": per_page,
        "pages": jobs.pages,
    })


@bp.get("/dead-letter")
def get_dead_letter_queue():
    """Return jobs sitting in the dead letter queue.

    Query params:
        limit: maximum rows to return (default 50, clamped to 1..500 so a
            zero/negative or huge value cannot reach the service layer).
    """
    limit = min(max(request.args.get("limit", 50, type=int), 1), 500)
    jobs = BackgroundJobService.get_dead_letter_jobs(limit=limit)
    return jsonify({
        "jobs": jobs,
        "count": len(jobs),
    })


@bp.post("/<int:job_id>/retry")
def retry_job(job_id: int):
    """Re-queue a dead-lettered job for another attempt; 404 when ineligible."""
    was_requeued = BackgroundJobService.retry_dead_letter_job(job_id)
    if was_requeued:
        return jsonify({
            "success": True,
            "message": f"Job {job_id} queued for retry"
        })
    return jsonify({
        "error": "Job not found or not in dead letter queue"
    }), 404


@bp.post("/process")
def process_jobs():
    """Manually trigger processing of pending jobs.

    Useful for testing or manual intervention; in production a dedicated
    background worker should be the one draining the queue.

    Query params:
        limit: maximum jobs to process in this batch (default 10).
    """
    batch_size = request.args.get("limit", 10, type=int)
    batch_stats = BackgroundJobService.process_pending_jobs(limit=batch_size)
    return jsonify({"success": True, "stats": batch_stats})


@bp.post("/cleanup")
def cleanup_jobs():
    """Delete completed jobs older than the retention window.

    Query params:
        days: retention window in days (default 30). Must be >= 1 —
            a zero or negative value is rejected with 400 rather than
            forwarded to the service, where it could wipe recent history.
    """
    days = request.args.get("days", 30, type=int)
    if days < 1:
        return jsonify({"error": "days must be a positive integer"}), 400

    deleted = BackgroundJobService.cleanup_old_jobs(days=days)

    return jsonify({
        "success": True,
        "deleted_count": deleted
    })


@bp.get("/health")
def jobs_health():
    """Health check for the background job system.

    Reports per-status job counts, how many jobs have been stuck in RUNNING
    for over an hour, and the aggregated metrics. Overall status is
    "degraded" whenever any stuck job is found.
    """
    from datetime import datetime, timedelta

    from sqlalchemy import func

    from ..extensions import db

    # Aggregate job counts per status in a single GROUP BY query.
    status_counts = {
        status: count
        for status, count in db.session.query(
            BackgroundJob.status,
            func.count(BackgroundJob.id),
        ).group_by(BackgroundJob.status).all()
    }

    # Jobs still marked RUNNING after an hour are considered stuck.
    one_hour_ago = datetime.utcnow() - timedelta(hours=1)
    stuck_jobs = BackgroundJob.query.filter(
        BackgroundJob.status == JobStatus.RUNNING.value,
        BackgroundJob.started_at < one_hour_ago,
    ).count()

    overall = "healthy" if stuck_jobs == 0 else "degraded"
    return jsonify({
        "status": overall,
        "status_counts": status_counts,
        "stuck_jobs": stuck_jobs,
        "metrics": job_metrics.get_stats(),
    })
155 changes: 155 additions & 0 deletions packages/backend/app/services/README_BACKGROUND_JOBS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# Background Job System

## Overview

The background job system provides robust asynchronous task execution with automatic retry and monitoring capabilities.

## Features

- **Exponential Backoff Retry**: Jobs that fail are automatically retried with exponentially increasing delays
- **Dead Letter Queue**: Permanently failed jobs are stored for inspection and manual retry
- **Priority Queue**: Jobs can be prioritized to ensure critical tasks are processed first
- **Metrics & Monitoring**: Built-in metrics tracking for job success rates, processing times, and failures
- **Configurable Retry Policies**: Customize retry behavior per job type

## Usage

### Creating a Job

```python
from app.services.background_jobs import BackgroundJobService, JobType

# Enqueue a new job
job = BackgroundJobService.enqueue(
job_type=JobType.SEND_EMAIL,
payload={
"to": "user@example.com",
"subject": "Welcome!",
"body": "Welcome to FinMind"
},
priority=10, # Higher priority = processed first
max_retries=3, # Default is 3
user_id=123 # Optional: associate with user
)
```

### Registering a Job Handler

```python
from app.services.background_jobs import BackgroundJobService, JobType

def send_email_handler(payload: dict) -> dict:
# Your email sending logic here
send_email(
to=payload["to"],
subject=payload["subject"],
body=payload["body"]
)
return {"sent": True}

# Register the handler
BackgroundJobService.register_handler(JobType.SEND_EMAIL, send_email_handler)
```

### Processing Jobs

In production, use a background worker process:

```python
# In a worker process
import time

from app.services.background_jobs import BackgroundJobService

while True:
    stats = BackgroundJobService.process_pending_jobs(limit=10)
    time.sleep(1)  # Wait before next batch
```

## API Endpoints

| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/jobs/metrics` | GET | Get aggregated job metrics |
| `/api/jobs/<id>` | GET | Get status of specific job |
| `/api/jobs/pending` | GET | List pending and retrying jobs |
| `/api/jobs/dead-letter` | GET | List failed jobs in dead letter queue |
| `/api/jobs/<id>/retry` | POST | Manually retry a dead letter job |
| `/api/jobs/process` | POST | Manually trigger job processing |
| `/api/jobs/cleanup` | POST | Clean up old completed jobs |
| `/api/jobs/health` | GET | Health check for job system |

## Retry Configuration

Default retry behavior:
- **Initial Delay**: 1 second
- **Max Delay**: 5 minutes (300 seconds)
- **Backoff Multiplier**: 2x
- **Max Retries**: 3
- **Jitter**: Enabled (±25% random variation)

Custom configuration:

```python
from app.services.background_jobs import create_retry_config

config = create_retry_config(
max_retries=5,
initial_delay=2.0,
max_delay=600.0,
backoff_multiplier=3.0,
jitter=True
)
```

## Job Status Flow

```
PENDING → RUNNING → SUCCEEDED
              ↓ (on error)
            FAILED → RETRYING → RUNNING   (while retry_count < max_retries)
              ↓ (retries exhausted)
            DEAD_LETTER
```

## Monitoring

Access metrics at `/api/jobs/metrics`:

```json
{
"jobs_created": 150,
"jobs_succeeded": 142,
"jobs_failed": 3,
"jobs_retried": 8,
"jobs_dead_letter": 5,
"avg_processing_time_ms": 125.5
}
```

## Database Schema

Jobs are stored in the `background_jobs` table with the following columns:

| Column | Type | Description |
|--------|------|-------------|
| id | INTEGER | Primary key |
| job_type | VARCHAR(50) | Type of job |
| payload | JSONB | Job-specific data |
| status | VARCHAR(20) | Current status |
| priority | INTEGER | Processing priority (higher = urgent) |
| retry_count | INTEGER | Number of retries attempted |
| max_retries | INTEGER | Maximum retry attempts |
| next_retry_at | TIMESTAMP | When to retry next |
| last_error | TEXT | Error from last failure |
| created_at | TIMESTAMP | Job creation time |
| started_at | TIMESTAMP | Processing start time |
| completed_at | TIMESTAMP | Completion time |
| result | JSONB | Job result data |
| user_id | INTEGER | Associated user (optional) |

## Best Practices

1. **Keep handlers idempotent**: Jobs may be retried, so handlers should handle duplicate execution gracefully
2. **Use appropriate priorities**: Reserve high priorities for time-sensitive tasks
3. **Monitor dead letter queue**: Regularly check for failed jobs and investigate root causes
4. **Set reasonable timeouts**: Long-running jobs should have their own timeout logic
5. **Log appropriately**: Include job ID in logs for troubleshooting
Loading