Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion scripts/workflow_status/lib/gh.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ def fetch_jobs(run_id: int, config: Config) -> list[dict]:


def fetch_log(run_id: int, config: Config, failed_only: bool = True) -> str | None:
"""Fetch the log for a run (``--log-failed`` or ``--log``)."""
"""Fetch the log for a whole run (``--log-failed`` or ``--log``).

NOTE: ``gh run view --log[-failed]`` is rejected by the CLI when the run has
too many jobs (``too many API requests needed to fetch logs; try narrowing
down to a specific job with the --job option``). For analysis use
:func:`fetch_job_log` instead, which fetches per-job and avoids the throttle.
"""
flag = "--log-failed" if failed_only else "--log"
out = run_gh_safe(
"run",
Expand All @@ -142,6 +148,43 @@ def fetch_log(run_id: int, config: Config, failed_only: bool = True) -> str | No
return out


def fetch_job_log(job_id: int, config: Config) -> str | None:
    """Return the log text of one job, identified by its job ID.

    Two sources are tried in order:

    1. ``gh run view --job <id> --log`` -- preferred, because its output
       carries the ``<job_name>\\t<step>\\t<timestamp>\\t<line>`` prefixes the
       rest of the analyser works with.
    2. The raw REST endpoint
       ``/repos/{owner}/{repo}/actions/jobs/{job_id}/logs`` -- consulted only
       when the first call yields nothing (``None`` or an empty string).

    Fetching per job is what keeps large matrix runs working: the gh CLI
    refuses run-level log requests for them with ``too many API requests
    needed to fetch logs; try narrowing down to a specific job with the
    --job option``.

    Args:
        job_id: ID of the job whose log is wanted.
        config: Supplies ``repo`` and is forwarded to ``run_gh_safe``.

    Returns:
        The first non-empty result, otherwise whatever the REST fallback
        returned.
    """
    view_args = ("run", "view", "-R", config.repo, "--job", str(job_id), "--log")
    cli_log = run_gh_safe(*view_args, config=config)
    if cli_log:
        return cli_log

    # The gh wrapper produced nothing: ask the REST API for the raw log.
    endpoint = f"/repos/{config.repo}/actions/jobs/{job_id}/logs"
    return run_gh_safe(
        "api",
        "-H",
        "Accept: application/vnd.github.v3.raw",
        endpoint,
        config=config,
    )


def detect_repo(config: Config) -> str:
"""Auto-detect the repo via ``gh repo view``."""
out = run_gh_safe("repo", "view", "--json", "nameWithOwner", "-q", ".nameWithOwner", config=config)
Expand Down
43 changes: 43 additions & 0 deletions scripts/workflow_status/lib/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,49 @@ def filter_log_for_job(full_log: str, job_name: str) -> str:
return full_log


def line_matches_error(line: str) -> bool:
    """Tell whether *line* is matched by at least one error pattern.

    Each entry of ``_ERROR_PATTERNS`` is searched against the line; the
    first hit short-circuits the scan.
    """
    for pattern in _ERROR_PATTERNS:
        if pattern.search(line):
            return True
    return False


def pick_display_snippet(
    block: str,
    *,
    max_lines: int = 12,
    context_before: int = 2,
) -> str:
    """Pick the most informative slice of an extracted failure block.

    Algorithm:
      1. Strip empty lines and ``...`` block-internal separators.
      2. Locate lines matching an error pattern. The *last* match is the
         anchor -- the deepest error is usually the actionable one (earlier
         matches are often build-up / dependency chatter, and the GH
         ``##[error]Process completed`` summary tends to immediately follow
         the actual error so we want it inside the window).
      3. Return up to ``max_lines`` lines, with ``context_before`` lines of
         lead-in before the anchor. The lead-in is clamped so that the
         anchor line itself always lands inside the window.
      4. If no error pattern hits, return the *tail* of the block, not the
         head -- the tail contains the GH job-completion lines and is far
         more useful than the first few lines.

    Args:
        block: Text of one extracted failure block.
        max_lines: Upper bound on the number of lines returned. A value of
            zero or less yields an empty string.
        context_before: Desired number of lines shown ahead of the anchor.
            Negative values are treated as zero; values of ``max_lines`` or
            more are reduced to ``max_lines - 1``.

    Returns:
        The selected lines joined with ``"\\n"``, or ``""`` when the block has
        no displayable lines or the line budget is not positive.
    """
    # Handle a non-positive budget up front: ``lines[-0:]`` would otherwise
    # slice the *entire* list in the no-error branch below.
    if max_lines <= 0:
        return ""

    lines = [ln for ln in block.splitlines() if ln.strip() and ln.strip() != "..."]
    if not lines:
        return ""

    err_idx = [i for i, ln in enumerate(lines) if line_matches_error(ln)]
    if not err_idx:
        return "\n".join(lines[-max_lines:])

    anchor = err_idx[-1]
    # Never spend the whole budget on lead-in: with an unclamped value the
    # window [start, start + max_lines) could end before the anchor.
    lead_in = min(max(context_before, 0), max_lines - 1)
    start = max(0, anchor - lead_in)
    end = min(len(lines), start + max_lines)
    if end - start < max_lines:
        # The window hit the end of the block; extend it backwards so the
        # full line budget is still used where possible.
        start = max(0, end - max_lines)
    return "\n".join(lines[start:end])


def split_into_blocks(content: str) -> list[str]:
"""Split *content* on ``BLOCK_SEP`` into individual failure blocks."""
blocks: list[str] = []
Expand Down
54 changes: 41 additions & 13 deletions scripts/workflow_status/workflow_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
format_failure_detail,
status_emoji,
)
from lib.gh import detect_repo, fetch_jobs, fetch_log, fetch_run
from lib.gh import detect_repo, fetch_job_log, fetch_jobs, fetch_run
from lib.logs import (
extract_relevant_failures,
filter_log_for_job,
pick_display_snippet,
split_into_blocks,
)
from lib.similarity import (
Expand Down Expand Up @@ -166,7 +167,7 @@ def _process_failed_job(
rep_bidx = grp[0]
block = blocks[rep_bidx]
ai_st, cause, fix = ai_results.get(rep_bidx, ("", "", ""))
display_st = ai_st if ai_st else "\n".join([ln for ln in block.splitlines() if ln.strip()][:5])
display_st = ai_st if ai_st else pick_display_snippet(block)
stacktraces.append((display_st, cause, fix))

if len(grp) > 1:
Expand Down Expand Up @@ -274,18 +275,43 @@ def main() -> None:
else:
out.print(f"*\U0001f534 Failure Details ({len(failed_jobs)} failed job(s)):*")

# Fetch logs once for the whole run
log_out: str | None = None
if config.analyze_cause or config.print_logs:
log_out = fetch_log(config.run_id, config, failed_only=True)

# Group failed jobs by error similarity to deduplicate
# Fetch logs per-job in parallel. Logs are needed for the stacktrace
# display itself, similarity-based grouping, and (optionally) AI
# cause/fix analysis -- so we always fetch when there are failures,
# regardless of --no-cause / --no-fix / --print-logs.
#
# We use per-job ``gh run view --job <id> --log`` because the
# run-level ``--log[-failed]`` path is rejected for matrices large
# enough to trip gh's safety throttle:
# too many API requests needed to fetch logs;
# try narrowing down to a specific job with the --job option
# which used to leave every failure stuck on "(no logs available)".
job_log_by_id: dict[int, str] = {}
with ThreadPoolExecutor(max_workers=config.max_gh_workers) as log_pool:
log_futs = {
log_pool.submit(fetch_job_log, j.get("databaseId"), config): j
for j in failed_jobs
if j.get("databaseId")
}
for fut in as_completed(log_futs):
j = log_futs[fut]
try:
job_log_by_id[j["databaseId"]] = fut.result() or ""
except Exception as exc:
print(
f"WARN: failed to fetch log for job {j.get('databaseId')} ({j.get('name', '?')}): {exc}",
file=sys.stderr,
)
job_log_by_id[j["databaseId"]] = ""

# Per-job extracted failure lines, used downstream for similarity
# grouping. Each raw log in ``job_log_by_id`` already covers a single
# job, so ``filter_log_for_job`` is a cheap no-op safety net.
job_logs: list[str] = []
for job in failed_jobs:
jname = job.get("name", "unknown")
jlog = ""
if log_out:
jlog = extract_relevant_failures(filter_log_for_job(log_out, jname))
raw = job_log_by_id.get(job.get("databaseId", -1), "")
jlog = extract_relevant_failures(filter_log_for_job(raw, jname)) if raw else ""
job_logs.append(jlog)

job_token_sets = [compute_error_tokens(jl) for jl in job_logs]
Expand All @@ -310,11 +336,13 @@ def main() -> None:
for member_indices in error_groups:
rep_idx = member_indices[0]
display_idx += 1
rep_job = failed_jobs[rep_idx]
rep_log = job_log_by_id.get(rep_job.get("databaseId", -1), "") or None
fut = pool.submit(
_process_failed_job,
failed_jobs[rep_idx],
rep_job,
display_idx,
log_out,
rep_log,
run_url,
wf_name,
config,
Expand Down
Loading