Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 25 additions & 18 deletions scripts/python/invoke-cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from dotenv import load_dotenv

# Configuration
ENDPOINT = "/api/v1/cron/evaluations" # Endpoint to invoke
ENDPOINTS = [
"/api/v1/cron/evaluations",
"/api/v1/cron/pending-jobs",
]
REQUEST_TIMEOUT = 30 # Timeout for requests in seconds

# Setup logging
Expand All @@ -33,7 +36,7 @@ def __init__(self):
# Load BASE_URL from environment with default fallback
base_url = os.getenv("API_BASE_URL", "http://localhost:8000")
self.base_url = base_url.rstrip("/")
self.endpoint = ENDPOINT
self.endpoints = ENDPOINTS

# Load interval from environment with default of 5 minutes
self.interval_minutes = int(os.getenv("CRON_INTERVAL_MINUTES", "5"))
Expand Down Expand Up @@ -83,35 +86,33 @@ async def authenticate(self, client: httpx.AsyncClient) -> str:
logger.error(f"Authentication error: {e}")
raise

async def invoke_endpoint(self, client: httpx.AsyncClient) -> dict:
"""Invoke the configured endpoint."""
async def invoke_endpoint(self, client: httpx.AsyncClient, endpoint: str) -> dict:
"""Invoke a single endpoint."""
if not self.access_token:
await self.authenticate(client)

headers = {"Authorization": f"Bearer {self.access_token}"}

# Debug: Log what we're sending
logger.debug(f"Request URL: {self.base_url}{self.endpoint}")
logger.debug(f"Request headers: {headers}")
logger.debug(f"[invoke_endpoint] Request URL: {self.base_url}{endpoint}")

try:
response = await client.get(
f"{self.base_url}{self.endpoint}",
f"{self.base_url}{endpoint}",
headers=headers,
timeout=REQUEST_TIMEOUT,
)

# Debug: Log response headers and first part of body
logger.debug(f"Response status: {response.status_code}")
logger.debug(f"Response headers: {dict(response.headers)}")
logger.debug(f"[invoke_endpoint] Response status: {response.status_code}")

# If unauthorized or forbidden (token expired/invalid), re-authenticate and retry once
if response.status_code in (401, 403):
logger.info("Token expired or invalid, re-authenticating...")
logger.info(
"[invoke_endpoint] Token expired or invalid, re-authenticating..."
)
await self.authenticate(client)
headers = {"Authorization": f"Bearer {self.access_token}"}
response = await client.get(
f"{self.base_url}{self.endpoint}",
f"{self.base_url}{endpoint}",
headers=headers,
timeout=REQUEST_TIMEOUT,
)
Expand All @@ -121,18 +122,18 @@ async def invoke_endpoint(self, client: httpx.AsyncClient) -> dict:

except httpx.HTTPStatusError as e:
logger.error(
f"Endpoint invocation failed with status {e.response.status_code}: {e.response.text}"
f"[invoke_endpoint] Endpoint invocation failed with status {e.response.status_code}: {e.response.text}"
)
raise
except Exception as e:
logger.error(f"Endpoint invocation error: {e}")
logger.error(f"[invoke_endpoint] Endpoint invocation error: {e}")
raise

async def run(self):
"""Main loop to invoke endpoint periodically."""
logger.info(f"Using API Base URL: {self.base_url}")
logger.info(
f"Starting cron job - invoking {self.endpoint} every {self.interval_minutes} minutes"
f"Starting cron job - invoking {self.endpoints} every {self.interval_minutes} minutes"
)

# Use async context manager to ensure proper cleanup
Expand All @@ -145,8 +146,14 @@ async def run(self):
start_time = datetime.now()
logger.info(f"Invoking endpoint at {start_time}")

result = await self.invoke_endpoint(client)
logger.info(f"Endpoint invoked successfully: {result}")
for endpoint in self.endpoints:
try:
result = await self.invoke_endpoint(client, endpoint)
logger.info(f"[{endpoint}] invoked successfully: {result}")
except Exception as endpoint_error:
logger.error(
f"[{endpoint}] invocation failed: {endpoint_error}"
)

# Calculate next invocation time
elapsed = (datetime.now() - start_time).total_seconds()
Expand Down
Loading