Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,23 @@ CREATE TABLE IF NOT EXISTS audit_logs (
action VARCHAR(100) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

-- Background job execution tracking with retry support
CREATE TABLE IF NOT EXISTS job_executions (
id SERIAL PRIMARY KEY,
job_type VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING',
payload TEXT,
result TEXT,
retry_count INT NOT NULL DEFAULT 0,
max_retries INT NOT NULL DEFAULT 3,
last_error TEXT,
next_retry_at TIMESTAMP,
started_at TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
user_id INT REFERENCES users(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS idx_job_executions_status ON job_executions(status);
CREATE INDEX IF NOT EXISTS idx_job_executions_type ON job_executions(job_type);
CREATE INDEX IF NOT EXISTS idx_job_executions_retry ON job_executions(status, next_retry_at);
61 changes: 61 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,64 @@ class AuditLog(db.Model):
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)
action = db.Column(db.String(100), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


# ---------------------------------------------------------------------------
# Background Job Execution Tracking
# ---------------------------------------------------------------------------

class JobStatus(str, Enum):
PENDING = "PENDING"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
RETRYING = "RETRYING"
DEAD = "DEAD" # exhausted all retries


class JobExecution(db.Model):
"""Tracks every background job execution with retry metadata."""

__tablename__ = "job_executions"
id = db.Column(db.Integer, primary_key=True)
job_type = db.Column(db.String(100), nullable=False, index=True)
status = db.Column(
db.String(20), default=JobStatus.PENDING.value, nullable=False, index=True
)
payload = db.Column(db.Text, nullable=True) # JSON-encoded job arguments
result = db.Column(db.Text, nullable=True) # JSON-encoded result or error detail

retry_count = db.Column(db.Integer, default=0, nullable=False)
max_retries = db.Column(db.Integer, default=3, nullable=False)
last_error = db.Column(db.Text, nullable=True)
next_retry_at = db.Column(db.DateTime, nullable=True)

started_at = db.Column(db.DateTime, nullable=True)
completed_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

# Optional user association for user-scoped jobs
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=True)

def to_dict(self):
return {
"id": self.id,
"job_type": self.job_type,
"status": self.status,
"payload": self.payload,
"result": self.result,
"retry_count": self.retry_count,
"max_retries": self.max_retries,
"last_error": self.last_error,
"next_retry_at": (
self.next_retry_at.isoformat() if self.next_retry_at else None
),
"started_at": (
self.started_at.isoformat() if self.started_at else None
),
"completed_at": (
self.completed_at.isoformat() if self.completed_at else None
),
"created_at": self.created_at.isoformat(),
"user_id": self.user_id,
}
2 changes: 2 additions & 0 deletions packages/backend/app/routes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .categories import bp as categories_bp
from .docs import bp as docs_bp
from .dashboard import bp as dashboard_bp
from .jobs import bp as jobs_bp


def register_routes(app: Flask):
Expand All @@ -18,3 +19,4 @@ def register_routes(app: Flask):
app.register_blueprint(categories_bp, url_prefix="/categories")
app.register_blueprint(docs_bp, url_prefix="/docs")
app.register_blueprint(dashboard_bp, url_prefix="/dashboard")
app.register_blueprint(jobs_bp, url_prefix="/jobs")
181 changes: 181 additions & 0 deletions packages/backend/app/routes/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
Jobs monitoring and management API endpoints.

Provides visibility into background job execution, retry status,
and allows manual retry of failed jobs.
"""

import logging
from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt_identity
from ..extensions import db
from ..models import JobExecution, JobStatus
from ..services.job_scheduler import (
get_job_stats,
manually_retry_job,
retry_pending_jobs,
execute_job,
_get_handler,
)

bp = Blueprint("jobs", __name__)
logger = logging.getLogger("finmind.jobs.routes")


@bp.get("/status")
@jwt_required()
def list_jobs():
"""
List all job executions with optional filtering and pagination.

Query params:
- status: filter by job status (PENDING, RUNNING, COMPLETED, FAILED, RETRYING, DEAD)
- job_type: filter by job type
- page: page number (default 1)
- per_page: items per page (default 20, max 100)
"""
uid = int(get_jwt_identity())

status_filter = request.args.get("status")
type_filter = request.args.get("job_type")
page = request.args.get("page", 1, type=int)
per_page = min(request.args.get("per_page", 20, type=int), 100)

query = db.session.query(JobExecution).filter_by(user_id=uid)

if status_filter:
query = query.filter(JobExecution.status == status_filter.upper())
if type_filter:
query = query.filter(JobExecution.job_type == type_filter)

query = query.order_by(JobExecution.created_at.desc())
total = query.count()
items = query.offset((page - 1) * per_page).limit(per_page).all()

return jsonify(
{
"jobs": [j.to_dict() for j in items],
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page if per_page else 0,
},
}
)


@bp.get("/stats")
@jwt_required()
def job_stats():
"""
Aggregate job execution statistics.

Returns success rate, average duration, counts by status/type,
and circuit breaker states.
"""
stats = get_job_stats()
return jsonify(stats)


@bp.post("/retry/<int:job_id>")
@jwt_required()
def retry_job(job_id: int):
"""
Manually retry a failed or dead job.

Resets retry count and re-executes the job immediately.
"""
uid = int(get_jwt_identity())

job = db.session.get(JobExecution, job_id)
if not job or job.user_id != uid:
return jsonify(error="Job not found"), 404

if job.status not in (JobStatus.FAILED.value, JobStatus.DEAD.value):
return jsonify(
error=f"Cannot retry job in '{job.status}' status. "
f"Only FAILED or DEAD jobs can be retried."
), 400

retried = manually_retry_job(job_id)
if not retried:
return jsonify(error="Failed to retry job"), 500

return jsonify(retried.to_dict())


@bp.post("/process-retries")
@jwt_required()
def process_retries():
"""
Process all jobs that are due for retry.

This endpoint is designed to be called by a cron scheduler
or the APScheduler interval trigger.
"""
results = retry_pending_jobs()
logger.info(
"Processed retries: processed=%d succeeded=%d failed=%d",
results["processed"],
results["succeeded"],
results["failed"],
)
return jsonify(results)


@bp.get("/health")
def jobs_health():
"""
Health check endpoint for job processing subsystem.

Returns the state of circuit breakers and counts of
stuck/retrying jobs.
"""
from ..services.job_scheduler import email_breaker, whatsapp_breaker

stuck_count = (
db.session.query(JobExecution)
.filter(JobExecution.status == JobStatus.RUNNING.value)
.count()
)
retrying_count = (
db.session.query(JobExecution)
.filter(JobExecution.status == JobStatus.RETRYING.value)
.count()
)
dead_count = (
db.session.query(JobExecution)
.filter(JobExecution.status == JobStatus.DEAD.value)
.count()
)

healthy = (
email_breaker.state != "OPEN"
and whatsapp_breaker.state != "OPEN"
and stuck_count == 0
)

return jsonify(
{
"healthy": healthy,
"stuck_jobs": stuck_count,
"retrying_jobs": retrying_count,
"dead_jobs": dead_count,
"circuit_breakers": {
"email": email_breaker.get_status(),
"whatsapp": whatsapp_breaker.get_status(),
},
}
)


@bp.get("/<int:job_id>")
@jwt_required()
def get_job(job_id: int):
"""Get detailed information about a specific job."""
uid = int(get_jwt_identity())
job = db.session.get(JobExecution, job_id)
if not job or job.user_id != uid:
return jsonify(error="Job not found"), 404
return jsonify(job.to_dict())
Loading