diff --git a/crates/bin/ampctl/src/cmd/job.rs b/crates/bin/ampctl/src/cmd/job.rs
index 7a555fd8b..e43f127cf 100644
--- a/crates/bin/ampctl/src/cmd/job.rs
+++ b/crates/bin/ampctl/src/cmd/job.rs
@@ -1,5 +1,6 @@
 //! Job management commands
 
+pub mod events;
 pub mod inspect;
 pub mod list;
 pub mod progress;
@@ -10,6 +11,10 @@ pub mod stop;
 /// Job management subcommands.
 #[derive(Debug, clap::Subcommand)]
 pub enum Commands {
+    /// Get lifecycle events for a job
+    #[command(after_help = include_str!("job/events__after_help.md"))]
+    Events(events::Args),
+
     /// List jobs with pagination
     #[command(alias = "ls")]
     #[command(after_help = include_str!("job/list__after_help.md"))]
@@ -41,6 +46,7 @@ pub enum Commands {
 /// Execute the job command with the given subcommand.
 pub async fn run(command: Commands) -> anyhow::Result<()> {
     match command {
+        Commands::Events(args) => events::run(args).await?,
         Commands::List(args) => list::run(args).await?,
         Commands::Inspect(args) => inspect::run(args).await?,
         Commands::Stop(args) => stop::run(args).await?,
diff --git a/crates/bin/ampctl/src/cmd/job/events.rs b/crates/bin/ampctl/src/cmd/job/events.rs
new file mode 100644
index 000000000..4a57afc0b
--- /dev/null
+++ b/crates/bin/ampctl/src/cmd/job/events.rs
@@ -0,0 +1,130 @@
+//! Job events command.
+//!
+//! Retrieves and displays lifecycle events for a job through the admin API by:
+//! 1. Creating a client for the admin API
+//! 2. Using the client's job get_events method
+//! 3. Displaying the events as JSON or human-readable output
+//!
+//! # Configuration
+//!
+//! - Admin URL: `--admin-url` flag or `AMP_ADMIN_URL` env var (default: `http://localhost:1610`)
+//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`)
+
+use amp_client_admin as client;
+use amp_worker_core::jobs::job_id::JobId;
+use monitoring::logging;
+
+use crate::args::GlobalArgs;
+
+/// Command-line arguments for the `job events` command.
+#[derive(Debug, clap::Args)]
+pub struct Args {
+    #[command(flatten)]
+    pub global: GlobalArgs,
+
+    /// The job identifier to get events for
+    pub job_id: JobId,
+}
+
+/// Get lifecycle events for a job by retrieving them from the admin API.
+///
+/// Retrieves job events and displays them based on the output format.
+///
+/// # Errors
+///
+/// Returns [`Error`] for API errors (400/500) or network failures.
+#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, job_id = %job_id))]
+pub async fn run(Args { global, job_id }: Args) -> Result<(), Error> {
+    tracing::debug!("retrieving job events from admin API");
+
+    let events = get_events(&global, job_id).await?;
+    let result = EventsResult { data: events };
+    global.print(&result).map_err(Error::JsonFormattingError)?;
+
+    Ok(())
+}
+
+/// Retrieve job events from the admin API.
+///
+/// Creates a client and uses the job get_events method.
+#[tracing::instrument(skip_all)]
+async fn get_events(
+    global: &GlobalArgs,
+    job_id: JobId,
+) -> Result<client::jobs::JobEventsResponse, Error> {
+    let client = global.build_client().map_err(Error::ClientBuild)?;
+
+    let events = client.jobs().get_events(&job_id).await.map_err(|err| {
+        tracing::error!(
+            error = %err,
+            error_source = logging::error_source(&err),
+            "failed to get job events"
+        );
+        Error::ClientError(err)
+    })?;
+
+    match events {
+        Some(events) => Ok(events),
+        None => Err(Error::JobNotFound { job_id }),
+    }
+}
+
+/// Result wrapper for job events output.
+#[derive(serde::Serialize)] +struct EventsResult { + #[serde(flatten)] + data: client::jobs::JobEventsResponse, +} + +impl std::fmt::Display for EventsResult { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!(f, "Job ID: {}", self.data.job_id)?; + writeln!(f)?; + + if self.data.events.is_empty() { + writeln!(f, "No events recorded.")?; + } else { + writeln!(f, "Events:")?; + for event in &self.data.events { + writeln!( + f, + " [{}] {} - {} (node: {})", + event.id, event.created_at, event.event_type, event.node_id + )?; + } + } + Ok(()) + } +} + +/// Errors for job events operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to build admin API client + /// + /// This occurs when the client configuration is invalid or the admin URL + /// is malformed. + #[error("failed to build admin API client")] + ClientBuild(#[source] crate::args::BuildClientError), + + /// Job not found in the system + /// + /// The specified job ID does not exist in the metadata database. + /// Verify the job ID is correct using `ampctl job list`. + #[error("job not found: {job_id}")] + JobNotFound { job_id: JobId }, + + /// Client error from the API + /// + /// This wraps errors returned by the admin API client, including network + /// failures, invalid responses, and server errors. + #[error("client error")] + ClientError(#[source] amp_client_admin::jobs::GetEventsError), + + /// Failed to format output as JSON + /// + /// This occurs when serializing the events result to JSON fails, + /// which is unexpected for well-formed data structures. 
+ #[error("failed to format output")] + JsonFormattingError(#[source] serde_json::Error), +} diff --git a/crates/bin/ampctl/src/cmd/job/events__after_help.md b/crates/bin/ampctl/src/cmd/job/events__after_help.md new file mode 100644 index 000000000..1dd545e51 --- /dev/null +++ b/crates/bin/ampctl/src/cmd/job/events__after_help.md @@ -0,0 +1,9 @@ +# Examples + +Get events for job 123: + + ampctl job events 123 + +Get events in JSON format: + + ampctl job events 123 --json diff --git a/crates/clients/admin/src/jobs.rs b/crates/clients/admin/src/jobs.rs index 5630fe1c2..08ab40cd1 100644 --- a/crates/clients/admin/src/jobs.rs +++ b/crates/clients/admin/src/jobs.rs @@ -54,6 +54,13 @@ fn job_progress(id: &JobId) -> String { format!("jobs/{id}/progress") } +/// Build URL path for getting job events. +/// +/// GET `/jobs/{id}/events` +fn job_events(id: &JobId) -> String { + format!("jobs/{id}/events") +} + /// Client for jobs-related API operations. /// /// Created via [`Client::jobs`](crate::client::Client::jobs). @@ -611,6 +618,111 @@ impl<'a> JobsClient<'a> { } } } + /// Get all lifecycle events for a job. + /// + /// GETs from `/jobs/{id}/events` endpoint. + /// + /// # Errors + /// + /// Returns [`GetEventsError`] for network errors, API errors (400/500), + /// or unexpected responses. 
+    #[tracing::instrument(skip(self), fields(job_id = %id))]
+    pub async fn get_events(
+        &self,
+        id: &JobId,
+    ) -> Result<Option<JobEventsResponse>, GetEventsError> {
+        let url = self
+            .client
+            .base_url()
+            .join(&job_events(id))
+            .expect("valid URL");
+
+        tracing::debug!("sending GET request for job events");
+
+        let response = self
+            .client
+            .http()
+            .get(url.as_str())
+            .send()
+            .await
+            .map_err(|err| GetEventsError::Network {
+                url: url.to_string(),
+                source: err,
+            })?;
+
+        let status = response.status();
+        tracing::debug!(status = %status, "received API response");
+
+        match status.as_u16() {
+            200 => {
+                let events: JobEventsResponse = response.json().await.map_err(|err| {
+                    tracing::error!(
+                        error = %err,
+                        error_source = logging::error_source(&err),
+                        "failed to parse events response"
+                    );
+                    GetEventsError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: format!("failed to parse response: {}", err),
+                    }
+                })?;
+                Ok(Some(events))
+            }
+            404 => {
+                tracing::debug!("job not found");
+                Ok(None)
+            }
+            400 | 500 => {
+                let text = response.text().await.map_err(|err| {
+                    tracing::error!(
+                        status = %status,
+                        error = %err,
+                        error_source = logging::error_source(&err),
+                        "failed to read error response"
+                    );
+                    GetEventsError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: format!("failed to read error response: {}", err),
+                    }
+                })?;
+
+                let error_response: ErrorResponse = serde_json::from_str(&text).map_err(|err| {
+                    tracing::error!(
+                        status = %status,
+                        error = %err,
+                        error_source = logging::error_source(&err),
+                        "failed to parse error response"
+                    );
+                    GetEventsError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: text.clone(),
+                    }
+                })?;
+
+                match error_response.error_code.as_str() {
+                    "INVALID_JOB_ID" => Err(GetEventsError::InvalidJobId(error_response.into())),
+                    "GET_JOB_ERROR" => Err(GetEventsError::GetJobError(error_response.into())),
+                    "GET_JOB_EVENTS_ERROR" => {
+                        Err(GetEventsError::GetJobEventsError(error_response.into()))
+                    }
+                    _ => Err(GetEventsError::UnexpectedResponse {
+                        status: status.as_u16(),
+                        message: text,
+                    }),
+                }
+            }
+            _ => {
+                let text = response
+                    .text()
+                    .await
+                    .unwrap_or_else(|err| format!("failed to read response body: {}", err));
+                Err(GetEventsError::UnexpectedResponse {
+                    status: status.as_u16(),
+                    message: text,
+                })
+            }
+        }
+    }
 }
 
 /// Response body for GET /jobs endpoint.
@@ -703,6 +815,28 @@ pub struct TableProgress {
     pub total_size_bytes: i64,
 }
 
+/// Response for GET /jobs/{id}/events endpoint.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct JobEventsResponse {
+    /// Job ID
+    pub job_id: i64,
+    /// List of lifecycle events
+    pub events: Vec<JobEventInfo>,
+}
+
+/// A single job lifecycle event.
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct JobEventInfo {
+    /// Event ID
+    pub id: i64,
+    /// Event timestamp in ISO 8601 / RFC 3339 format
+    pub created_at: String,
+    /// ID of the worker node that recorded this event
+    pub node_id: String,
+    /// Event type (e.g., SCHEDULED, RUNNING, COMPLETED, ERROR, FATAL, STOPPED)
+    pub event_type: String,
+}
+
 /// Errors that can occur when listing jobs.
 #[derive(Debug, thiserror::Error)]
 pub enum ListError {
@@ -886,6 +1020,47 @@ pub enum DeleteByStatusError {
     UnexpectedResponse { status: u16, message: String },
 }
 
+/// Errors that can occur when getting job events.
+#[derive(Debug, thiserror::Error)]
+pub enum GetEventsError {
+    /// The job ID in the URL path is invalid (400, INVALID_JOB_ID)
+    ///
+    /// This occurs when the ID cannot be parsed as a valid JobId.
+    #[error("invalid job ID")]
+    InvalidJobId(#[source] ApiError),
+
+    /// Failed to retrieve job from scheduler (500, GET_JOB_ERROR)
+    ///
+    /// This occurs when the scheduler query fails due to a database
+    /// connection error or internal scheduler failure.
+    #[error("failed to get job")]
+    GetJobError(#[source] ApiError),
+
+    /// Failed to retrieve job events (500, GET_JOB_EVENTS_ERROR)
+    ///
+    /// This occurs when:
+    /// - Database connection fails or is lost during the query
+    /// - Query execution encounters an internal database error
+    /// - Connection pool is exhausted or unavailable
+    #[error("failed to get job events")]
+    GetJobEventsError(#[source] ApiError),
+
+    /// Network or connection error
+    ///
+    /// This occurs when the HTTP request to the admin API cannot be
+    /// completed due to connection timeout, DNS resolution failure,
+    /// or other transport-level errors.
+    #[error("network error connecting to {url}")]
+    Network { url: String, source: reqwest::Error },
+
+    /// Unexpected response from API
+    ///
+    /// This occurs when the API returns a status code or error code
+    /// that is not part of the expected contract.
+    #[error("unexpected response (status {status}): {message}")]
+    UnexpectedResponse { status: u16, message: String },
+}
+
 /// Errors that can occur when getting job progress.
#[derive(Debug, thiserror::Error)] pub enum GetProgressError { diff --git a/crates/core/metadata-db/src/job_events.rs b/crates/core/metadata-db/src/job_events.rs index 6da902cd8..0e8ceda94 100644 --- a/crates/core/metadata-db/src/job_events.rs +++ b/crates/core/metadata-db/src/job_events.rs @@ -30,6 +30,16 @@ pub struct JobEvent { pub created_at: DateTime, } +/// Represents a complete job lifecycle event from the job_events table +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct FullJobEvent { + pub id: i64, + pub created_at: DateTime, + pub job_id: JobId, + pub node_id: String, + pub event_type: String, +} + /// Register a new event in the job event log #[tracing::instrument(skip(exe), err)] pub async fn register<'c, E>( @@ -63,6 +73,23 @@ where .map_err(Error::Database) } +/// Get all lifecycle events for a job +/// +/// Returns all events ordered by id ascending, forming the complete +/// audit trail for the job's lifecycle. +#[tracing::instrument(skip(exe), err)] +pub async fn get_events_for_job<'c, E>( + exe: E, + job_id: impl Into + std::fmt::Debug, +) -> Result, Error> +where + E: Executor<'c>, +{ + sql::get_events_for_job(exe, job_id.into()) + .await + .map_err(Error::Database) +} + /// Get all scheduling attempts for a job /// /// Returns attempts derived from SCHEDULED events, ordered by retry_index ascending. 
diff --git a/crates/core/metadata-db/src/job_events/sql.rs b/crates/core/metadata-db/src/job_events/sql.rs
index 82a0db684..a078e4dc2 100644
--- a/crates/core/metadata-db/src/job_events/sql.rs
+++ b/crates/core/metadata-db/src/job_events/sql.rs
@@ -2,7 +2,7 @@
 
 use sqlx::{Executor, Postgres};
 
-use super::{EventDetail, JobEvent};
+use super::{EventDetail, FullJobEvent, JobEvent};
 use crate::{
     jobs::{JobId, JobStatus},
     workers::WorkerNodeId,
@@ -51,6 +51,24 @@ where
         .await
 }
 
+/// Get all lifecycle events for a job, ordered by id ascending
+pub async fn get_events_for_job<'c, E>(
+    exe: E,
+    job_id: JobId,
+) -> Result<Vec<FullJobEvent>, sqlx::Error>
+where
+    E: Executor<'c, Database = Postgres>,
+{
+    let query = indoc::indoc! {r#"
+        SELECT id, created_at, job_id, node_id, event_type
+        FROM job_events
+        WHERE job_id = $1
+        ORDER BY id ASC
+    "#};
+
+    sqlx::query_as(query).bind(job_id).fetch_all(exe).await
+}
+
 /// Get all scheduling attempts for a job
 ///
 /// Each SCHEDULED event in the log represents one attempt.
diff --git a/crates/services/admin-api/src/handlers/jobs.rs b/crates/services/admin-api/src/handlers/jobs.rs
index 98b16c6b8..0d5e4750b 100644
--- a/crates/services/admin-api/src/handlers/jobs.rs
+++ b/crates/services/admin-api/src/handlers/jobs.rs
@@ -2,6 +2,7 @@
 
 pub mod delete;
 pub mod delete_by_id;
+pub mod events;
 pub mod get_all;
 pub mod get_by_id;
 pub mod job_info;
diff --git a/crates/services/admin-api/src/handlers/jobs/events.rs b/crates/services/admin-api/src/handlers/jobs/events.rs
new file mode 100644
index 000000000..80dd41a0a
--- /dev/null
+++ b/crates/services/admin-api/src/handlers/jobs/events.rs
@@ -0,0 +1,189 @@
+//! Job events handler
+
+use amp_worker_core::jobs::job_id::JobId;
+use axum::{
+    Json,
+    extract::{Path, State, rejection::PathRejection},
+    http::StatusCode,
+};
+use monitoring::logging;
+
+use crate::{
+    ctx::Ctx,
+    handlers::error::{ErrorResponse, IntoErrorResponse},
+    scheduler,
+};
+
+/// Handler for the `GET /jobs/{id}/events` endpoint
+///
+/// Retrieves all lifecycle events for a specific job from the event log.
+///
+/// ## Path Parameters
+/// - `id`: The unique identifier of the job
+///
+/// ## Response
+/// - **200 OK**: Returns all events for the job
+/// - **400 Bad Request**: Invalid job ID format
+/// - **404 Not Found**: Job with the given ID does not exist
+/// - **500 Internal Server Error**: Database connection or query error
+///
+/// ## Error Codes
+/// - `INVALID_JOB_ID`: The provided ID is not a valid job identifier
+/// - `JOB_NOT_FOUND`: No job exists with the given ID
+/// - `GET_JOB_ERROR`: Failed to retrieve job from scheduler
+/// - `GET_JOB_EVENTS_ERROR`: Failed to retrieve job events from the database
+#[tracing::instrument(skip_all, err)]
+#[cfg_attr(
+    feature = "utoipa",
+    utoipa::path(
+        get,
+        path = "/jobs/{id}/events",
+        tag = "jobs",
+        operation_id = "get_job_events",
+        params(
+            ("id" = String, Path, description = "Job ID")
+        ),
+        responses(
+            (status = 200, description = "Successfully retrieved job events", body = JobEventsResponse),
+            (status = 400, description = "Invalid job ID", body = crate::handlers::error::ErrorResponse),
+            (status = 404, description = "Job not found", body = crate::handlers::error::ErrorResponse),
+            (status = 500, description = "Internal server error", body = crate::handlers::error::ErrorResponse)
+        )
+    )
+)]
+pub async fn handler(
+    State(ctx): State<Ctx>,
+    path: Result<Path<JobId>, PathRejection>,
+) -> Result<Json<JobEventsResponse>, ErrorResponse> {
+    let job_id = match path {
+        Ok(Path(id)) => id,
+        Err(err) => {
+            tracing::debug!(
+                error = %err,
+                error_source = logging::error_source(&err),
+                "invalid job ID in path"
+            );
+            return Err(Error::InvalidId { source: err }.into());
+        }
+    };
+
+    match ctx.scheduler.get_job(job_id).await {
+        Ok(Some(_)) => {}
+        Ok(None) => return Err(Error::NotFound { id: job_id }.into()),
+        Err(err) => {
+            tracing::debug!(
+                job_id = %job_id,
+                error = %err,
+                error_source = logging::error_source(&err),
+                "failed to get job"
+            );
+            return Err(Error::GetJob(err).into());
+        }
+    }
+
+    let events = metadata_db::job_events::get_events_for_job(&ctx.metadata_db, job_id)
+        .await
+        .map_err(|err| {
+            tracing::error!(
+                job_id = %job_id,
+                error = %err,
+                error_source = logging::error_source(&err),
+                "failed to get job events"
+            );
+            Error::GetEvents(err)
+        })?;
+
+    let response_events = events
+        .into_iter()
+        .map(|e| JobEventInfo {
+            id: e.id,
+            created_at: e.created_at.to_rfc3339(),
+            node_id: e.node_id,
+            event_type: e.event_type,
+        })
+        .collect();
+
+    Ok(Json(JobEventsResponse {
+        job_id: *job_id,
+        events: response_events,
+    }))
+}
+
+/// Response body for GET /jobs/{id}/events endpoint.
+#[derive(Debug, serde::Serialize)]
+#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
+pub struct JobEventsResponse {
+    /// Job ID
+    pub job_id: i64,
+    /// List of lifecycle events
+    pub events: Vec<JobEventInfo>,
+}
+
+/// A single job lifecycle event.
+#[derive(Debug, serde::Serialize)]
+#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
+pub struct JobEventInfo {
+    /// Event ID
+    pub id: i64,
+    /// Event timestamp in ISO 8601 / RFC 3339 format
+    pub created_at: String,
+    /// ID of the worker node that recorded this event
+    pub node_id: String,
+    /// Event type (e.g., SCHEDULED, RUNNING, COMPLETED, ERROR, FATAL, STOPPED)
+    pub event_type: String,
+}
+
+/// Errors that can occur during job events retrieval
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// The job ID in the URL path is invalid
+    ///
+    /// This occurs when the path parameter cannot be parsed as a valid `JobId`
+    /// (e.g., negative numbers or non-integer values).
+    #[error("invalid job ID: {source}")]
+    InvalidId {
+        #[source]
+        source: PathRejection,
+    },
+
+    /// The requested job was not found
+    ///
+    /// This occurs when the job ID is valid but no job with that ID exists
+    /// in the scheduler.
+    #[error("job '{id}' not found")]
+    NotFound { id: JobId },
+
+    /// Failed to retrieve job from scheduler
+    ///
+    /// This occurs when the scheduler query fails due to a database connection
+    /// error or internal scheduler failure.
+    #[error("failed to get job")]
+    GetJob(#[source] scheduler::GetJobError),
+
+    /// Failed to retrieve job events from the database
+    ///
+    /// This occurs when the job_events table query fails due to a database
+    /// connection error or query execution failure.
+    #[error("failed to get job events")]
+    GetEvents(#[source] metadata_db::Error),
+}
+
+impl IntoErrorResponse for Error {
+    fn error_code(&self) -> &'static str {
+        match self {
+            Error::InvalidId { .. } => "INVALID_JOB_ID",
+            Error::NotFound { .. } => "JOB_NOT_FOUND",
+            Error::GetJob(_) => "GET_JOB_ERROR",
+            Error::GetEvents(_) => "GET_JOB_EVENTS_ERROR",
+        }
+    }
+
+    fn status_code(&self) -> StatusCode {
+        match self {
+            Error::InvalidId { .. } => StatusCode::BAD_REQUEST,
+            Error::NotFound { .. } => StatusCode::NOT_FOUND,
+            Error::GetJob(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Error::GetEvents(_) => StatusCode::INTERNAL_SERVER_ERROR,
+        }
+    }
+}
diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index db593483e..dd09b56f2 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -86,6 +86,7 @@ pub fn router(ctx: Ctx) -> Router<()> { ) .route("/jobs/{id}/stop", put(jobs::stop::handler)) .route("/jobs/{id}/progress", get(jobs::progress::handler)) + .route("/jobs/{id}/events", get(jobs::events::handler)) .route( "/manifests", get(manifests::list_all::handler) @@ -147,6 +148,7 @@ pub fn router(ctx: Ctx) -> Router<()> { handlers::jobs::get_by_id::handler, handlers::jobs::stop::handler, handlers::jobs::progress::handler, + handlers::jobs::events::handler, handlers::jobs::delete::handler, handlers::jobs::delete_by_id::handler, // Provider endpoints @@ -196,6 +198,8 @@ pub fn router(ctx: Ctx) -> Router<()> { // Job schemas handlers::jobs::progress::JobProgressResponse, handlers::jobs::progress::TableProgress, + handlers::jobs::events::JobEventsResponse, + handlers::jobs::events::JobEventInfo, handlers::jobs::job_info::JobInfo, handlers::jobs::get_all::JobsResponse, handlers::jobs::delete::JobStatusFilter, diff --git a/docs/feat/admin-job.md b/docs/feat/admin-job.md index 5cd865b75..ae9562475 100644 --- a/docs/feat/admin-job.md +++ b/docs/feat/admin-job.md @@ -1,6 +1,6 @@ --- name: "app-ampctl-job" -description: "Job management commands: list, inspect, stop, remove, prune, progress. Load when asking about managing extraction jobs via ampctl CLI" +description: "Job management commands: list, inspect, stop, remove, prune, progress, events. 
Load when asking about managing extraction jobs via ampctl CLI" type: feature status: stable components: "app:ampctl,crate:admin-client,service:admin-api" @@ -10,7 +10,7 @@ components: "app:ampctl,crate:admin-client,service:admin-api" ## Summary -Job commands provide operational control over extraction jobs. Operators can list jobs with pagination, inspect job details, stop running jobs gracefully, remove individual terminal jobs, bulk-prune terminal jobs by status, and monitor sync progress with block-level detail for each table. +Job commands provide operational control over extraction jobs. Operators can list jobs with pagination, inspect job details, stop running jobs gracefully, remove individual terminal jobs, bulk-prune terminal jobs by status, monitor sync progress with block-level detail for each table, and view the complete lifecycle event history for a job. ## Table of Contents @@ -25,6 +25,7 @@ Job commands provide operational control over extraction jobs. Operators can lis - **Job**: An extraction task that syncs blockchain data for a dataset, executed by a worker node - **Terminal State**: A job that has finished executing (`COMPLETED`, `STOPPED`, or `ERROR`) - **Progress**: Sync state of a job's tables, including `current_block`, `start_block`, and file statistics +- **Events**: Append-only lifecycle event log recording every state transition (SCHEDULED, RUNNING, COMPLETED, ERROR, FATAL, STOPPED) - **Pagination**: Jobs are listed in pages using `--limit` (default: 50) and `--after` (cursor ID) ## Usage @@ -109,6 +110,15 @@ ampctl job progress 123 ampctl job progress 123 --json ``` +**View job events:** + +View the complete lifecycle event history for a job. Each event records a state transition with timestamp, worker node, and event type. 
+ +```bash +ampctl job events 123 +ampctl job events 123 --json +``` + **JSON output for scripting:** ```bash diff --git a/docs/openapi-specs/admin.spec.json b/docs/openapi-specs/admin.spec.json index 183c49dd5..1eb37ac6f 100644 --- a/docs/openapi-specs/admin.spec.json +++ b/docs/openapi-specs/admin.spec.json @@ -1067,6 +1067,69 @@ } } }, + "/jobs/{id}/events": { + "get": { + "tags": [ + "jobs" + ], + "summary": "Handler for the `GET /jobs/{id}/events` endpoint", + "description": "Retrieves all lifecycle events for a specific job from the event log.\n\n## Path Parameters\n- `id`: The unique identifier of the job\n\n## Response\n- **200 OK**: Returns all events for the job\n- **400 Bad Request**: Invalid job ID format\n- **404 Not Found**: Job with the given ID does not exist\n- **500 Internal Server Error**: Database connection or query error\n\n## Error Codes\n- `INVALID_JOB_ID`: The provided ID is not a valid job identifier\n- `JOB_NOT_FOUND`: No job exists with the given ID\n- `GET_JOB_ERROR`: Failed to retrieve job from scheduler\n- `GET_JOB_EVENTS_ERROR`: Failed to retrieve job events from the database", + "operationId": "get_job_events", + "parameters": [ + { + "name": "id", + "in": "path", + "description": "Job ID", + "required": true, + "schema": { + "type": "string" + } + } + ], + "responses": { + "200": { + "description": "Successfully retrieved job events", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/JobEventsResponse" + } + } + } + }, + "400": { + "description": "Invalid job ID", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "404": { + "description": "Job not found", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + } + }, + "500": { + "description": "Internal server error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorResponse" + } + } + 
} + } + } + } + }, "/jobs/{id}/progress": { "get": { "tags": [ @@ -2631,6 +2694,57 @@ } } }, + "JobEventInfo": { + "type": "object", + "description": "A single job lifecycle event.", + "required": [ + "id", + "created_at", + "node_id", + "event_type" + ], + "properties": { + "created_at": { + "type": "string", + "description": "Event timestamp in ISO 8601 / RFC 3339 format" + }, + "event_type": { + "type": "string", + "description": "Event type (e.g., SCHEDULED, RUNNING, COMPLETED, ERROR, FATAL, STOPPED)" + }, + "id": { + "type": "integer", + "format": "int64", + "description": "Event ID" + }, + "node_id": { + "type": "string", + "description": "ID of the worker node that recorded this event" + } + } + }, + "JobEventsResponse": { + "type": "object", + "description": "Response body for GET /jobs/{id}/events endpoint.", + "required": [ + "job_id", + "events" + ], + "properties": { + "events": { + "type": "array", + "items": { + "$ref": "#/components/schemas/JobEventInfo" + }, + "description": "List of lifecycle events" + }, + "job_id": { + "type": "integer", + "format": "int64", + "description": "Job ID" + } + } + }, "JobInfo": { "type": "object", "description": "Represents job information for the API response", diff --git a/tests/src/tests/it_admin_api_job_events.rs b/tests/src/tests/it_admin_api_job_events.rs new file mode 100644 index 000000000..aca6670dc --- /dev/null +++ b/tests/src/tests/it_admin_api_job_events.rs @@ -0,0 +1,236 @@ +//! Integration tests for the Admin API job events endpoint. 
+ +use std::time::Duration; + +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; + +use crate::testlib::{ctx::TestCtxBuilder, helpers::wait_for_job_completion}; + +#[tokio::test] +async fn get_job_events_with_deployed_dataset_succeeds() { + //* Given + let ctx = EventsTestCtx::setup_with_anvil("get_job_events_succeeds").await; + + // Mine blocks so syncing takes time + ctx.anvil().mine(10).await.expect("failed to mine blocks"); + + // Deploy with end_block so the job completes + let job_id = ctx + .deploy_dataset("_", "anvil_rpc", "0.0.0", Some(10)) + .await; + + //* When + let resp = ctx.get_job_events(job_id).await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::OK, + "events retrieval should succeed for valid job" + ); + + let events_response: JobEventsResponse = resp + .json() + .await + .expect("failed to parse events response JSON"); + + // Verify response structure + assert_eq!(events_response.job_id, job_id); + // A deployed job should have at least a SCHEDULED event + assert!( + !events_response.events.is_empty(), + "should have at least one event" + ); + + // Verify the first event is SCHEDULED + assert_eq!( + events_response.events[0].event_type, "SCHEDULED", + "first event should be SCHEDULED" + ); +} + +#[tokio::test] +async fn get_job_events_with_nonexistent_job_id_returns_not_found() { + //* Given + let ctx = EventsTestCtx::setup("get_job_events_invalid_id", Vec::<&str>::new()).await; + + //* When + // Use a job ID that doesn't exist + let resp = ctx.get_job_events(999999).await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::NOT_FOUND, + "events retrieval should fail for non-existent job" + ); + + let error: ErrorResponse = resp.json().await.expect("failed to parse error response"); + + assert_eq!(error.error_code, "JOB_NOT_FOUND"); +} + +#[tokio::test] +async fn get_job_events_with_negative_job_id_returns_bad_request() { + //* Given + let ctx = EventsTestCtx::setup("get_job_events_negative_id", 
Vec::<&str>::new()).await; + + //* When + // Use -1 as job ID which is not a valid JobId + let resp = ctx.get_job_events(-1).await; + + //* Then + assert_eq!( + resp.status(), + StatusCode::BAD_REQUEST, + "events retrieval should fail for invalid job ID format" + ); + + let error: ErrorResponse = resp.json().await.expect("failed to parse error response"); + + assert_eq!(error.error_code, "INVALID_JOB_ID"); +} + +#[tokio::test] +async fn get_job_events_shows_lifecycle_transitions() { + //* Given + let ctx = EventsTestCtx::setup_with_anvil("get_job_events_lifecycle").await; + + // Mine blocks + ctx.anvil().mine(3).await.expect("failed to mine blocks"); + + // Deploy with end_block so the job completes + let job_id = ctx.deploy_dataset("_", "anvil_rpc", "0.0.0", Some(3)).await; + + // Wait until the job reaches a terminal state + let timeout = tokio::time::Duration::from_secs(30); + + wait_for_job_completion( + &ctx.ctx.new_ampctl(), + job_id + .try_into() + .expect("failed to convert job id to JobId"), + false, + timeout, + Duration::from_millis(100), + ) + .await + .expect("failed to wait for job completion"); + + //* When + let resp = ctx.get_job_events(job_id).await; + + //* Then + let final_events: JobEventsResponse = resp.json().await.expect("failed to parse JSON"); + + // Verify we have multiple events showing the lifecycle + assert!( + final_events.events.len() >= 2, + "should have at least SCHEDULED and a terminal event, got {}", + final_events.events.len() + ); + + // Verify events are ordered by id ascending + for window in final_events.events.windows(2) { + assert!( + window[0].id < window[1].id, + "events should be ordered by id ascending" + ); + } + + // Verify first event is SCHEDULED + assert_eq!(final_events.events[0].event_type, "SCHEDULED"); + + // Verify each event has a non-empty node_id and created_at + for event in &final_events.events { + assert!(!event.node_id.is_empty(), "node_id should not be empty"); + assert!( + 
!event.created_at.is_empty(), + "created_at should not be empty" + ); + } +} + +struct EventsTestCtx { + ctx: crate::testlib::ctx::TestCtx, +} + +impl EventsTestCtx { + async fn setup( + test_name: &str, + manifests: impl IntoIterator>, + ) -> Self { + let ctx = TestCtxBuilder::new(test_name) + .with_dataset_manifests(manifests) + .build() + .await + .expect("failed to build test context"); + Self { ctx } + } + + async fn setup_with_anvil(test_name: &str) -> Self { + let ctx = TestCtxBuilder::new(test_name) + .with_anvil_http() + .with_dataset_manifest("anvil_rpc") + .build() + .await + .expect("failed to build test context"); + Self { ctx } + } + + fn anvil(&self) -> &crate::testlib::fixtures::Anvil { + self.ctx.anvil() + } + + async fn get_job_events(&self, job_id: i64) -> reqwest::Response { + let url = format!( + "{}/jobs/{}/events", + self.ctx.daemon_controller().admin_api_url(), + job_id + ); + + reqwest::Client::new() + .get(&url) + .send() + .await + .expect("failed to send request") + } + + async fn deploy_dataset( + &self, + namespace: &str, + name: &str, + revision: &str, + end_block: Option, + ) -> i64 { + let ampctl = self.ctx.new_ampctl(); + let reference = format!("{}/{}@{}", namespace, name, revision); + + let job_id = ampctl + .dataset_deploy(&reference, end_block, None, None) + .await + .expect("failed to deploy dataset"); + + *job_id + } +} + +#[derive(Debug, Deserialize, Serialize)] +struct JobEventsResponse { + job_id: i64, + events: Vec, +} + +#[derive(Debug, Deserialize, Serialize)] +struct JobEventInfo { + id: i64, + created_at: String, + node_id: String, + event_type: String, +} + +#[derive(Debug, Deserialize)] +struct ErrorResponse { + error_code: String, +} diff --git a/tests/src/tests/mod.rs b/tests/src/tests/mod.rs index 75d5a8bb3..911451768 100644 --- a/tests/src/tests/mod.rs +++ b/tests/src/tests/mod.rs @@ -3,6 +3,7 @@ mod it_admin_api_datasets_manifest; mod it_admin_api_datasets_register; mod it_admin_api_datasets_restore; mod 
it_admin_api_datasets_stop_job; +mod it_admin_api_job_events; mod it_admin_api_jobs_progress; mod it_admin_api_revisions; mod it_admin_api_schema;