Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .tekton/on-pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ spec:
- name: buildah-temp-cache
workspace: buildah-temp-cache
- name: lint-and-test
timeout: 2h30m0s # Timeout for the task
runAfter:
- fetch-repository
workspaces:
Expand Down
246 changes: 219 additions & 27 deletions src/vuln_analysis/functions/code_agent_graph_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from exploit_iq_commons.data_models.checker_status import L2BuildResult, VulnerabilityIntel
from exploit_iq_commons.data_models.common import TargetPackage
from vuln_analysis.functions.react_internals import CheckerThought, Observation, L1VerdictExtraction
from vuln_analysis.utils.token_utils import count_tokens
from vuln_analysis.utils.rpm_checker_prompts import (
L1_VERDICT_EXTRACTION_PROMPT,
VULNERABILITY_INTEL_EXTRACTION_PROMPT,
Expand Down Expand Up @@ -729,6 +730,45 @@ def is_main_source(path: str) -> bool:
return "\n".join(lines)


# Maximum number of removed (and, separately, added) lines rendered per hunk by
# _format_patch_file_lines_for_intel(); any excess is summarized as "(+N more lines)".
MAX_HUNK_LINES_FOR_INTEL = 10
# Default ``max_tokens`` (per-chunk token budget) for the patch chunking helpers
# format_patch_data_chunks_for_intel() and get_relevant_hunks().
MAX_PATCH_TOKENS = 3000
# Default ``max_chunks`` (upper bound on chunks returned) for the same helpers.
MAX_PATCH_CHUNKS = 2

# Names of the list-valued VulnerabilityIntel attributes that
# merge_vulnerability_intel_chunks() combines across chunks by appending
# values that are not already present in the merged list.
VULNERABILITY_INTEL_MERGE_LIST_FIELDS = (
"affected_files",
"vulnerable_functions",
"vulnerable_variables",
"vulnerable_patterns",
"fix_patterns",
"search_keywords",
"component_names",
)


def _format_patch_file_lines_for_intel(pf: PatchFile) -> list[str]:
    """Render one patched file as text lines for VULNERABILITY_INTEL_EXTRACTION_PROMPT.

    The result starts with a ``File: <target_path>`` line. For every hunk, the
    removed lines (if any) are listed under a "Removed (vulnerable)" heading,
    followed by the added lines (if any) under an "Added (fix)" heading. Each
    side shows at most MAX_HUNK_LINES_FOR_INTEL lines; when more exist, a
    summary line reports how many were left out. A trailing empty string
    terminates the block so joined blocks are separated by a blank line.

    Parameters
    ----------
    pf:
        Patch file entry; ``target_path`` and ``hunks`` (each exposing
        ``removed_lines`` and ``added_lines``) are read.

    Returns
    -------
    list[str]
        The formatted lines, not yet joined.
    """
    rendered = [f"File: {pf.target_path}"]

    for hunk in pf.hunks:
        # Removed side is always emitted before the added side of the same hunk.
        sides = (
            (" Removed (vulnerable):", " - ", hunk.removed_lines),
            (" Added (fix):", " + ", hunk.added_lines),
        )
        for heading, prefix, changed in sides:
            if not changed:
                continue
            rendered.append(heading)
            rendered.extend(
                f"{prefix}{text}" for text in changed[:MAX_HUNK_LINES_FOR_INTEL]
            )
            hidden = len(changed) - MAX_HUNK_LINES_FOR_INTEL
            if hidden > 0:
                rendered.append(f" ... (+{hidden} more lines)")

    rendered.append("")
    return rendered


def format_patch_data_for_intel(
parsed_patch: ParsedPatch | None
) -> str:
Expand All @@ -748,29 +788,142 @@ def format_patch_data_for_intel(
"""
if not parsed_patch:
return ""

lines = [f"Patch: {parsed_patch.patch_filename}", ""]
for pf in parsed_patch.files:
lines.append(f"File: {pf.target_path}")
for hunk in pf.hunks:
if hunk.removed_lines:
lines.append(" Removed (vulnerable):")
for line in hunk.removed_lines[:10]:
lines.append(f" - {line}")
if len(hunk.removed_lines) > 10:
lines.append(f" ... (+{len(hunk.removed_lines) - 10} more lines)")
if hunk.added_lines:
lines.append(" Added (fix):")
for line in hunk.added_lines[:10]:
lines.append(f" + {line}")
if len(hunk.added_lines) > 10:
lines.append(f" ... (+{len(hunk.added_lines) - 10} more lines)")
lines.append("")

lines.extend(_format_patch_file_lines_for_intel(pf))

return "\n".join(lines)


def get_relevant_hunks(parsed_patch: ParsedPatch | None, grep_query: str) -> str:
def format_patch_data_chunks_for_intel(
    parsed_patch: ParsedPatch | None,
    max_tokens: int = MAX_PATCH_TOKENS,
    max_chunks: int = MAX_PATCH_CHUNKS,
) -> list[str]:
    """Break the intel-formatted patch text into token-bounded chunks.

    When the full output of format_patch_data_for_intel() already fits in
    ``max_tokens`` it is returned unchanged as the only element. Otherwise the
    per-file blocks are packed greedily, in patch order, into chunks that each
    begin with the ``Patch: <filename>`` header. A file block that is larger
    than the per-chunk body budget on its own is cut down with
    _truncate_diff_by_tokens() and emitted as a separate chunk. Packing stops
    as soon as ``max_chunks`` chunks exist; files not yet placed are omitted.

    Parameters
    ----------
    parsed_patch:
        Parsed patch structure, or None when no patch is available.
    max_tokens:
        Token budget per chunk (header included).
    max_chunks:
        Maximum number of chunks to produce.

    Returns
    -------
    list[str]
        ``[""]`` when there is no patch; otherwise one or more chunk strings.
    """
    if not parsed_patch:
        return [""]

    whole_patch = format_patch_data_for_intel(parsed_patch)
    if count_tokens(whole_patch) <= max_tokens:
        return [whole_patch]

    header = f"Patch: {parsed_patch.patch_filename}\n\n"
    # Every chunk repeats the header, so its cost comes off the body budget;
    # the floor of 1 keeps the budget positive.
    budget = max(max_tokens - count_tokens(header), 1)

    sized_blocks: list[tuple[str, int]] = [
        (text, count_tokens(text))
        for text in (
            "\n".join(_format_patch_file_lines_for_intel(pf))
            for pf in parsed_patch.files
        )
    ]
    if not sized_blocks:
        return [whole_patch]

    chunks: list[str] = []
    pending: list[str] = []
    pending_tokens = 0

    for text, size in sized_blocks:
        too_big = size > budget

        # Close the chunk being built when this block cannot join it.
        if pending and (too_big or pending_tokens + size > budget):
            chunks.append(header + "\n".join(pending))
            if len(chunks) >= max_chunks:
                return chunks
            pending, pending_tokens = [], 0

        if too_big:
            # An oversized file gets a chunk of its own, truncated to the budget.
            chunks.append(header + _truncate_diff_by_tokens(text, budget))
            if len(chunks) >= max_chunks:
                return chunks
            continue

        pending.append(text)
        pending_tokens += size

    if pending and len(chunks) < max_chunks:
        chunks.append(header + "\n".join(pending))

    return chunks or [whole_patch]


def merge_vulnerability_intel_chunks(
    chunk_intel: list[VulnerabilityIntel],
) -> VulnerabilityIntel:
    """Merge structured intel extracted from multiple patch chunks.

    List fields named in VULNERABILITY_INTEL_MERGE_LIST_FIELDS are unioned in
    first-seen order with duplicates removed. Scalar fields keep the first
    informative value encountered:

    * ``root_cause``, ``vulnerability_type``, ``known_mitigations``: first
      truthy value wins.
    * ``affected_bitness``: the first value other than ``"both"`` wins.
    * ``affected_architectures``: first non-None value wins.

    Parameters
    ----------
    chunk_intel:
        Intel objects, one per processed chunk, in chunk order.

    Returns
    -------
    VulnerabilityIntel
        A default-constructed instance for an empty input, the sole element
        itself (same object, unmodified) for a single-element input, otherwise
        a new merged instance.
    """
    if not chunk_intel:
        return VulnerabilityIntel()

    if len(chunk_intel) == 1:
        return chunk_intel[0]

    merged = VulnerabilityIntel()
    for intel in chunk_intel:
        for field_name in VULNERABILITY_INTEL_MERGE_LIST_FIELDS:
            # Build on a copy and test membership against the growing list so
            # that repeats inside a single chunk's list are dropped too, not
            # only values already merged from earlier chunks.
            combined = list(getattr(merged, field_name))
            for value in getattr(intel, field_name):
                if value not in combined:
                    combined.append(value)
            setattr(merged, field_name, combined)

        if not merged.root_cause and intel.root_cause:
            merged.root_cause = intel.root_cause
        if not merged.vulnerability_type and intel.vulnerability_type:
            merged.vulnerability_type = intel.vulnerability_type
        if not merged.known_mitigations and intel.known_mitigations:
            merged.known_mitigations = intel.known_mitigations
        # "both" is treated as "not yet narrowed"; the first chunk reporting a
        # more specific bitness sets the merged value.
        if merged.affected_bitness == "both" and intel.affected_bitness != "both":
            merged.affected_bitness = intel.affected_bitness
        if merged.affected_architectures is None and intel.affected_architectures is not None:
            merged.affected_architectures = intel.affected_architectures

    logger.debug(
        "merge_vulnerability_intel_chunks: merged %d chunks into %d affected_files, "
        "%d search_keywords",
        len(chunk_intel),
        len(merged.affected_files),
        len(merged.search_keywords),
    )
    return merged


def _truncate_diff_by_tokens(diff_text: str, max_tokens: int) -> str:
    """Truncate a diff to roughly max_tokens, preserving complete lines.

    Lines are kept from the top for as long as the running sum of their
    individual count_tokens() values stays within ``max_tokens``. When at
    least one line had to be dropped, a ``[... truncated N tokens ...]``
    marker line is appended; the marker itself is not counted against the
    budget. If nothing was dropped the input is returned unchanged, with no
    marker.

    Parameters
    ----------
    diff_text:
        The diff (or other multi-line text) to shorten.
    max_tokens:
        Token budget for the kept lines; values below zero are treated as zero
        in the character-based fallback.

    Returns
    -------
    str
        The original text, a line-truncated prefix with a marker, or, when
        not even the first line fits, a character-sliced prefix followed by a
        generic ``[... truncated ...]`` marker.
    """
    lines = diff_text.split('\n')
    kept_lines: list[str] = []
    kept_tokens = 0
    dropped = False

    for line in lines:
        line_tokens = count_tokens(line)
        if kept_tokens + line_tokens > max_tokens:
            dropped = True
            break
        kept_lines.append(line)
        kept_tokens += line_tokens

    if not dropped:
        # Every line fit. The whole-text count can still exceed the per-line
        # sum, but that difference is not removed content, so no truncation
        # marker is emitted.
        return diff_text

    if kept_lines:
        truncated_tokens = count_tokens(diff_text) - kept_tokens
        if truncated_tokens > 0:
            kept_lines.append(f"[... truncated {truncated_tokens} tokens ...]")
        return '\n'.join(kept_lines)

    # Not even the first line fits: fall back to a character slice
    # (presumably a ~4 characters-per-token estimate). Clamp at zero because a
    # negative slice bound would keep almost the entire text.
    return diff_text[:max(max_tokens, 0) * 4] + "\n[... truncated ...]"


def get_relevant_hunks(
parsed_patch: ParsedPatch | None,
grep_query: str,
max_tokens: int = MAX_PATCH_TOKENS,
max_chunks: int = MAX_PATCH_CHUNKS,
) -> list[str]:
"""Extract unified diff hunks for files matching the grep target.

Parameters
Expand All @@ -779,32 +932,71 @@ def get_relevant_hunks(parsed_patch: ParsedPatch | None, grep_query: str) -> str
Parsed patch file structure (may be None if no patch available).
grep_query:
The grep query string, which may include a file filter (e.g., "pattern,filename.c").
max_tokens:
Maximum tokens per chunk.
max_chunks:
Maximum number of chunks to return.

Returns
-------
str
Unified diff format string with relevant hunks, or empty string if no patch/match.
list[str]
List of unified diff chunks, each within max_tokens. Returns [""] if no patch/match.
"""
if not parsed_patch:
return ""
return [""]

file_pattern = None
if "," in grep_query:
file_pattern = grep_query.split(",")[-1].strip()

hunks = []
file_diffs: list[tuple[str, int]] = []
for pf in parsed_patch.files:
if file_pattern and file_pattern not in pf.target_path:
continue
hunks.append(f"--- a/{pf.target_path}")
hunks.append(f"+++ b/{pf.target_path}")
lines = [f"--- a/{pf.target_path}", f"+++ b/{pf.target_path}"]
for hunk in pf.hunks:
for line in hunk.removed_lines:
hunks.append(f"-\t{line}")
lines.append(f"-\t{line}")
for line in hunk.added_lines:
hunks.append(f"+\t{line}")
lines.append(f"+\t{line}")
file_diff = "\n".join(lines)
file_diffs.append((file_diff, count_tokens(file_diff)))

if not file_diffs:
return [""]

chunks: list[str] = []
current_parts: list[str] = []
current_tokens = 0

for file_diff, tokens in file_diffs:
if tokens > max_tokens:
if current_parts:
chunks.append("\n".join(current_parts))
if len(chunks) >= max_chunks:
return chunks
current_parts = []
current_tokens = 0
truncated = _truncate_diff_by_tokens(file_diff, max_tokens)
chunks.append(truncated)
if len(chunks) >= max_chunks:
return chunks
continue

if current_tokens + tokens > max_tokens and current_parts:
chunks.append("\n".join(current_parts))
if len(chunks) >= max_chunks:
return chunks
current_parts = []
current_tokens = 0

current_parts.append(file_diff)
current_tokens += tokens

if current_parts and len(chunks) < max_chunks:
chunks.append("\n".join(current_parts))

return "\n".join(hunks) if hunks else ""
return chunks if chunks else [""]


# ---------------------------------------------------------------------------
Expand Down
Loading