Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/bin/ampctl/src/cmd/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Job management commands

pub mod events;
pub mod inspect;
pub mod list;
pub mod progress;
Expand All @@ -10,6 +11,10 @@ pub mod stop;
/// Job management subcommands.
#[derive(Debug, clap::Subcommand)]
pub enum Commands {
/// Get lifecycle events for a job
#[command(after_help = include_str!("job/events__after_help.md"))]
Events(events::Args),

/// List jobs with pagination
#[command(alias = "ls")]
#[command(after_help = include_str!("job/list__after_help.md"))]
Expand Down Expand Up @@ -41,6 +46,7 @@ pub enum Commands {
/// Execute the job command with the given subcommand.
pub async fn run(command: Commands) -> anyhow::Result<()> {
match command {
Commands::Events(args) => events::run(args).await?,
Commands::List(args) => list::run(args).await?,
Commands::Inspect(args) => inspect::run(args).await?,
Commands::Stop(args) => stop::run(args).await?,
Expand Down
130 changes: 130 additions & 0 deletions crates/bin/ampctl/src/cmd/job/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
//! Job events command.
//!
//! Retrieves and displays lifecycle events for a job through the admin API by:
//! 1. Creating a client for the admin API
//! 2. Using the client's job get_events method
//! 3. Displaying the events as JSON or human-readable output
//!
//! # Configuration
//!
//! - Admin URL: `--admin-url` flag or `AMP_ADMIN_URL` env var (default: `http://localhost:1610`)
//! - Logging: `AMP_LOG` env var (`error`, `warn`, `info`, `debug`, `trace`)

use amp_client_admin as client;
use amp_worker_core::jobs::job_id::JobId;
use monitoring::logging;

use crate::args::GlobalArgs;

/// Command-line arguments for the `job events` command.
///
/// Invoked as `ampctl job events <JOB_ID>`.
#[derive(Debug, clap::Args)]
pub struct Args {
// Shared arguments: the admin URL, client construction and output printing
// used by this command are all taken from here.
#[command(flatten)]
pub global: GlobalArgs,

// Positional argument (no `long`/`short` attribute is declared for it).
/// The job identifier to get events for
pub job_id: JobId,
}

/// Get lifecycle events for a job by retrieving them from the admin API.
///
/// Retrieves job events and displays them based on the output format.
///
/// # Errors
///
/// Returns [`Error`] for API errors (400/500) or network failures.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, job_id = %job_id))]
pub async fn run(Args { global, job_id }: Args) -> Result<(), Error> {
tracing::debug!("retrieving job events from admin API");

let events = get_events(&global, job_id).await?;
let result = EventsResult { data: events };
global.print(&result).map_err(Error::JsonFormattingError)?;

Ok(())
}

/// Retrieve job events from the admin API.
///
/// Creates a client and uses the job get_events method.
#[tracing::instrument(skip_all)]
async fn get_events(
global: &GlobalArgs,
job_id: JobId,
) -> Result<client::jobs::JobEventsResponse, Error> {
let client = global.build_client().map_err(Error::ClientBuild)?;

let events = client.jobs().get_events(&job_id).await.map_err(|err| {
tracing::error!(
error = %err,
error_source = logging::error_source(&err),
"failed to get job events"
);
Error::ClientError(err)
})?;

match events {
Some(events) => Ok(events),
None => Err(Error::JobNotFound { job_id }),
}
}

/// Result wrapper for job events output.
///
/// Built by [`run`] and passed to the global arguments' `print` method. It
/// offers two renderings of the same response: the derived serde
/// serialization and the human-readable [`std::fmt::Display`] implementation.
#[derive(serde::Serialize)]
struct EventsResult {
// `flatten` serializes the response's own fields at the top level instead of
// nesting them under a `data` key.
#[serde(flatten)]
data: client::jobs::JobEventsResponse,
}

impl std::fmt::Display for EventsResult {
    /// Render the job id followed by one line per event, or a placeholder
    /// line when the job has no events.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let response = &self.data;

        writeln!(f, "Job ID: {}", response.job_id)?;
        writeln!(f)?;

        // Nothing to list: emit the placeholder and stop.
        if response.events.is_empty() {
            return writeln!(f, "No events recorded.");
        }

        writeln!(f, "Events:")?;
        response.events.iter().try_for_each(|event| {
            writeln!(
                f,
                " [{}] {} - {} (node: {})",
                event.id, event.created_at, event.event_type, event.node_id
            )
        })
    }
}

/// Errors for job events operations.
///
/// Returned by [`run`]. Every variant that wraps an underlying error exposes
/// it through `#[source]`, so the display message stays short and the cause
/// is available via the error chain.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed to build admin API client
///
/// This occurs when the client configuration is invalid or the admin URL
/// is malformed. Produced when the global arguments' `build_client` call
/// fails.
#[error("failed to build admin API client")]
ClientBuild(#[source] crate::args::BuildClientError),

/// Job not found in the system
///
/// The specified job ID does not exist in the metadata database.
/// Verify the job ID is correct using `ampctl job list`.
///
/// Produced when the admin API client returns no events response for the
/// job, which it does when the API answers 404.
#[error("job not found: {job_id}")]
JobNotFound { job_id: JobId },

/// Client error from the API
///
/// This wraps errors returned by the admin API client, including network
/// failures, invalid responses, and server errors. The error is logged
/// before being wrapped.
#[error("client error")]
ClientError(#[source] amp_client_admin::jobs::GetEventsError),

/// Failed to format output as JSON
///
/// This occurs when serializing the events result to JSON fails,
/// which is unexpected for well-formed data structures. Produced when the
/// global arguments' `print` call in [`run`] returns an error.
#[error("failed to format output")]
JsonFormattingError(#[source] serde_json::Error),
}
9 changes: 9 additions & 0 deletions crates/bin/ampctl/src/cmd/job/events__after_help.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Examples

Get events for job 123:

ampctl job events 123

Get events in JSON format:

ampctl job events 123 --json
175 changes: 175 additions & 0 deletions crates/clients/admin/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ fn job_progress(id: &JobId) -> String {
format!("jobs/{id}/progress")
}

/// Build URL path for getting job events.
///
/// GET `/jobs/{id}/events`
///
/// The returned path is relative (it has no leading slash); the caller joins
/// it onto the client's base URL.
fn job_events(id: &JobId) -> String {
format!("jobs/{id}/events")
}

/// Client for jobs-related API operations.
///
/// Created via [`Client::jobs`](crate::client::Client::jobs).
Expand Down Expand Up @@ -611,6 +618,111 @@ impl<'a> JobsClient<'a> {
}
}
}
/// Fetch every lifecycle event recorded for a job.
///
/// Issues a GET request to the `/jobs/{id}/events` endpoint.
///
/// Returns `Ok(Some(_))` with the decoded body on a 200 response and
/// `Ok(None)` when the API answers 404.
///
/// # Errors
///
/// Returns [`GetEventsError`] when the request cannot be sent, when a
/// 400/500 response carries a known error code, or when the response
/// (status, error code or body) is not one this client understands.
///
/// # Panics
///
/// Panics if the events path cannot be joined onto the base URL.
#[tracing::instrument(skip(self), fields(job_id = %id))]
pub async fn get_events(
    &self,
    id: &JobId,
) -> Result<Option<JobEventsResponse>, GetEventsError> {
    let base = self.client.base_url();
    let url = base.join(&job_events(id)).expect("valid URL");

    tracing::debug!("sending GET request for job events");

    let request = self.client.http().get(url.as_str());
    let response = match request.send().await {
        Ok(response) => response,
        Err(source) => {
            return Err(GetEventsError::Network {
                url: url.to_string(),
                source,
            });
        }
    };

    let status = response.status();
    tracing::debug!(status = %status, "received API response");

    // Numeric form of the status, reused by every error built below.
    let code = status.as_u16();
    match code {
        200 => match response.json::<JobEventsResponse>().await {
            Ok(events) => Ok(Some(events)),
            Err(err) => {
                tracing::error!(
                    error = %err,
                    error_source = logging::error_source(&err),
                    "failed to parse events response"
                );
                Err(GetEventsError::UnexpectedResponse {
                    status: code,
                    message: format!("failed to parse response: {}", err),
                })
            }
        },
        404 => {
            tracing::debug!("job not found");
            Ok(None)
        }
        400 | 500 => {
            // Read the raw body first so it can be reported verbatim when it
            // is not a well-formed error payload.
            let text = match response.text().await {
                Ok(body) => body,
                Err(err) => {
                    tracing::error!(
                        status = %status,
                        error = %err,
                        error_source = logging::error_source(&err),
                        "failed to read error response"
                    );
                    return Err(GetEventsError::UnexpectedResponse {
                        status: code,
                        message: format!("failed to read error response: {}", err),
                    });
                }
            };

            let error_response: ErrorResponse = match serde_json::from_str(&text) {
                Ok(parsed) => parsed,
                Err(err) => {
                    tracing::error!(
                        status = %status,
                        error = %err,
                        error_source = logging::error_source(&err),
                        "failed to parse error response"
                    );
                    return Err(GetEventsError::UnexpectedResponse {
                        status: code,
                        message: text,
                    });
                }
            };

            // Translate the API error code into the matching typed error;
            // unknown codes fall back to the raw body.
            let failure = match error_response.error_code.as_str() {
                "INVALID_JOB_ID" => GetEventsError::InvalidJobId(error_response.into()),
                "GET_JOB_ERROR" => GetEventsError::GetJobError(error_response.into()),
                "GET_JOB_EVENTS_ERROR" => {
                    GetEventsError::GetJobEventsError(error_response.into())
                }
                _ => GetEventsError::UnexpectedResponse {
                    status: code,
                    message: text,
                },
            };
            Err(failure)
        }
        other => {
            // Best effort: a body that cannot be read is replaced by a note
            // describing the read failure.
            let message = match response.text().await {
                Ok(body) => body,
                Err(err) => format!("failed to read response body: {}", err),
            };
            Err(GetEventsError::UnexpectedResponse {
                status: other,
                message,
            })
        }
    }
}
}

/// Response body for GET /jobs endpoint.
Expand Down Expand Up @@ -703,6 +815,28 @@ pub struct TableProgress {
pub total_size_bytes: i64,
}

/// Response for GET /jobs/{id}/events endpoint.
///
/// Decoded from the JSON body of a 200 response by `JobsClient::get_events`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct JobEventsResponse {
/// Job ID
///
/// Carried as a raw `i64` here rather than as a `JobId`.
pub job_id: i64,
/// List of lifecycle events
///
/// Kept in the order received; this client does not sort the events.
pub events: Vec<JobEventInfo>,
}

/// A single job lifecycle event.
///
/// One element of [`JobEventsResponse::events`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct JobEventInfo {
/// Event ID
pub id: i64,
/// Event timestamp in ISO 8601 / RFC 3339 format
///
/// Stored as the string received; it is not parsed into a time type here.
pub created_at: String,
/// ID of the worker node that recorded this event
pub node_id: String,
/// Event type (e.g., SCHEDULED, RUNNING, COMPLETED, ERROR, FATAL, STOPPED)
///
/// Held as a plain string rather than an enum, so values outside the
/// examples above still deserialize.
pub event_type: String,
}

/// Errors that can occur when listing jobs.
#[derive(Debug, thiserror::Error)]
pub enum ListError {
Expand Down Expand Up @@ -886,6 +1020,47 @@ pub enum DeleteByStatusError {
UnexpectedResponse { status: u16, message: String },
}

/// Errors that can occur when getting job events.
///
/// Returned by `JobsClient::get_events`. A 404 response is not an error: that
/// method reports it as `Ok(None)`.
#[derive(Debug, thiserror::Error)]
pub enum GetEventsError {
/// The job ID in the URL path is invalid (400, INVALID_JOB_ID)
///
/// This occurs when the ID cannot be parsed as a valid JobId.
#[error("invalid job ID")]
InvalidJobId(#[source] ApiError),

/// Failed to retrieve job from scheduler (500, GET_JOB_ERROR)
///
/// This occurs when the scheduler query fails due to a database
/// connection error or internal scheduler failure.
#[error("failed to get job")]
GetJobError(#[source] ApiError),

/// Failed to retrieve job events (500, GET_JOB_EVENTS_ERROR)
///
/// This occurs when:
/// - Database connection fails or is lost during the query
/// - Query execution encounters an internal database error
/// - Connection pool is exhausted or unavailable
#[error("failed to get job events")]
GetJobEventsError(#[source] ApiError),

/// Network or connection error
///
/// This occurs when the HTTP request to the admin API cannot be
/// completed due to connection timeout, DNS resolution failure,
/// or other transport-level errors.
///
/// `url` is the full request URL that was being contacted.
#[error("network error connecting to {url}")]
Network { url: String, source: reqwest::Error },

/// Unexpected response from API
///
/// This occurs when the API returns a status code or error code
/// that is not part of the expected contract. It is also used when a
/// response body cannot be read, when a 200 body cannot be parsed as the
/// events response, or when a 400/500 body is not a valid error payload.
///
/// `message` holds either the raw response body or a description of the
/// read/parse failure.
#[error("unexpected response (status {status}): {message}")]
UnexpectedResponse { status: u16, message: String },
}

/// Errors that can occur when getting job progress.
#[derive(Debug, thiserror::Error)]
pub enum GetProgressError {
Expand Down
Loading
Loading