diff --git a/.cursor/rules/prompt-tuning.mdc b/.cursor/rules/prompt-tuning.mdc new file mode 100644 index 000000000..c29fcc218 --- /dev/null +++ b/.cursor/rules/prompt-tuning.mdc @@ -0,0 +1,393 @@ +--- +description: Prompt tuning workflow - iteratively improve CVE classification accuracy using the Excel test runner +alwaysApply: false +--- + +# Exploit Intelligence Prompt Tuning Workflow + +Use this rule when asked to tune, improve, or optimise the CVE analysis prompts. +The goal is to maximise accuracy (and reduce FP/FN counts) on the Excel test set. + +--- + +## Initial requirements + +Complete this section before starting any tuning session. If anything is missing, stop and resolve it first. + +### Repositories needed + +| Repo | Purpose | +|------|---------| +| `vulnerability-analysis` | Pipeline service, prompt files, configs | +| `exploitiq-tests-automation` | Test runner, scorer, cleanup scripts | + +Both repos must be cloned and up to date before starting. + +### System binaries + +| Binary | Purpose | How to install | +|--------|---------|---------------| +| `python3` (3.10+) | Running the test runner and analysis scripts | `dnf install python3` or `pyenv install 3.12` | +| `java` (JRE 21+) | Running the pipeline JVM | `dnf install java-21-openjdk` | +| `jar` (JDK 21+) | Extracting Java `-sources.jar` for CCA | `dnf install java-21-openjdk-devel` or download Temurin JDK to `~/.local/jdk21` | +| `mvn` (3.6+) | Resolving Java transitive dependencies | Download to `~/.local/bin/mvn` or install via package manager | +| `go` (1.22+) | Building Go call-graph via `go mod graph` | Download to `~/.local/go`; add `~/.local/go/bin` to `PATH` | + +Set in `vulnerability-analysis/.env`: +``` +JAVA_HOME=$HOME/.local/jdk21 +GOROOT=$HOME/.local/go +PATH=$HOME/.local/go/bin:$PATH +``` + +### Environment variables + +All variables are set in `vulnerability-analysis/.env` and sourced when the service starts. + +**Intel API keys (required):** +``` +GHSA_API_KEY=... # GitHub Security Advisory API +NVD_API_KEY=... # NIST NVD CVE database +SERPAPI_API_KEY=... # CVE web search +``` + +**LLM endpoint (required):** +``` +NVIDIA_API_BASE=https:///v1 +``` +Set this to the gateway URL for the model being tuned. The six model-name variables +(`CHECKLIST_MODEL_NAME`, `CVE_AGENT_EXECUTOR_MODEL_NAME`, `SUMMARIZE_MODEL_NAME`, +`JUSTIFY_MODEL_NAME`, `CODE_VDB_RETRIEVER_MODEL_NAME`, `DOC_VDB_RETRIEVER_MODEL_NAME`) +default to the value in the config YAML and do not need to be set unless routing +individual stages to a different model. + +**Java CCA - Red Hat Maven auth (required for Java ecosystem CVEs, local only):** +``` +REDHAT_MAVEN_USERNAME=... # Red Hat Maven GA repository username +REDHAT_MAVEN_PASSWORD=... # Red Hat Maven GA repository token (JWT) +``` +These are read by `~/.m2/settings.xml` (local user Maven settings, never committed) via +`${env.REDHAT_MAVEN_USERNAME}` and `${env.REDHAT_MAVEN_PASSWORD}`. Maven merges `~/.m2/settings.xml` +with the project's `kustomize/base/settings.xml` automatically, so credentials stay out of any +committed file. + +`kustomize/base/settings.xml` must NOT contain a `` block - it is baked into a +ConfigMap for staging and any credentials committed there would be deployed to the cluster. + +### Service config files + +Each model has a test config in `vulnerability-analysis/src/vuln_analysis/configs/`: + +| Config | Model | +|--------|-------| +| `config-gemma-test.yml` | Google Gemma 4 31B via inference gateway | +| `config-granite-test.yml` | IBM Granite 4.1 30B via inference gateway | + +### Quick-start command + +```bash +# Run from exploitiq-tests-automation repo root +python src/tuning/excel_runner.py \ + --input-excel /path/to/test_set.xlsx \ + --output-excel results/results_$(date +%Y%m%d_%H%M%S).xlsx \ + --url http://localhost:26466/generate \ + --num-threads 2 +``` + +--- + +## Iteration loop + +### Step 0 - Pre-flight checks (MUST pass before proceeding) +Before running or continuing any iteration, verify all of the following. **Stop immediately and report to the user if any check fails - do not proceed to the next step.** + +1. **Service is up**: `curl -s http://localhost:26466/health` returns a non-error response. +2. **Required binaries present**: verify `jar` (JDK tool, not JRE) is on PATH (`which jar`). Missing `jar` will cause every Java test case to 500 before the LLM engine runs. If absent, stop and report - do not run the test suite. +3. **No infrastructure errors in the service log**: scan `aiq_*.log` for `Error running workflow`, `No such file or directory`, `jar`, `Failed to build projectmanifest`. If any appear, note the affected test cases. +4. **All samples must produce an output before proceeding**: after a run, count `src/reports/*.json` (successful) + `*_failed.json` (infrastructure fail). The **total must equal the number of input samples** (e.g. 12 for the small test set). If the count is less, the run is incomplete - stop, diagnose the missing cases, and resolve the infrastructure gap before editing any prompts or starting the next batch. +5. **At least one result JSON produced**: if every result is `*_failed.json`, stop - there is a systematic pipeline failure that must be resolved before tuning can continue. +6. **Runner exited 0**: check `echo $?` or the log tail for a normal completion line. A non-zero exit or absence of the `Global metrics` line means the run did not complete and its output must not be used. + +If any pre-flight check fails, **do not edit prompts, do not re-run, do not draw accuracy conclusions**. Report the failure type and blocked step to the user. + +#### Run timeout policy +Kill any service or runner process that has not produced a new successful JSON result after **10 minutes**. Check logs immediately for the blocking cause before restarting. Do not assume a long-running process is making useful progress without evidence in the logs. + +#### Expected per-sample runtimes (warm cache) +| Ecosystem | Expected | Notes | +|-----------|----------|-------| +| Python | 1-2 min | Small repos; no Maven | +| Go | 1-3 min | Pickle cached; may hit 25+ min if agent loops on FL-failure - see FL-loop fix below | +| Java (warm) | 5-15 min | `_JavaRepoData` (call graph) rebuilds on every cold service start; wildfly = ~9 min rebuild | +| Java (cold) | 15-25 min | First run builds pickle (89 K-file repos); cold `_JavaRepoData` adds another ~9 min | + +If a sample exceeds 10 minutes: kill, read the service log, identify whether it is an infrastructure cold-start or an agent FL-failure loop, then decide whether a fix is needed before re-running. + +#### Known infrastructure gaps (as of 2026-05-28) + +| Gap | Status | Symptom | Fix | +|-----|--------|---------|-----| +| `jar` not on PATH | (ok) Fixed - wrapper at `~/.local/bin/jar` | `[Errno 2] No such file or directory: 'jar'` | Install proper JDK: `dnf install java-21-openjdk-devel` | +| Keycloak Java 25 incompatibility | (fail) Open | `error: release version 8 not supported` - Java 25 dropped `--release 8` | Install Java 21 JDK; keycloak 24.0.0 requires it | +| EAP74 wildfly pickle cold build | (ok) Warmed | First run: 89 K files -> ~9 min pickle build | Already done; subsequent runs load from `.cache/am_cache/pickle/https.github.com.wildfly.wildfly-*` | +| EAP74 `_JavaRepoData` cold build | (warn) Partial | Cold service start rebuilds call graph even when pickle exists (~9 min) | Persisting `_JavaRepoData` to disk requires a code change outside prompt scope | +| Agent FL-failure loop | (ok) Fixed - `reachability_agent.py` | When FL returns "Package not found", agent loops 7-10 iterations trying CCA rules, wasting 10+ min | Code fix: `check_finish_allowed` now returns `True` when `package_validated is False` | +| `cve_check_vuln_deps: skip: true` | (warn) Pipeline-limited | Transitive-only deps not detected -> model concludes `code_not_present` incorrectly | Enable VDC or accept as pipeline-scoped; cannot be prompt-fixed | + +### Step 1 - Run the test suite +Execute the command above. Note the output Excel path. +Capture the global metrics line from the log: +`Global metrics - Accuracy: X Precision: X Recall: X F1: X TP: X TN: X FP: X FN: X` + +### Step 2 - Identify failing cases +Open the output Excel. In the **Summary** sheet, read: +- Per-ecosystem Accuracy, F1, FP, FN counts. +- The confusion matrix block (noise-reducer orientation - see below). + +Switch to the per-language sheets (e.g. `go`, `python`, `java`). +Filter where `FP = 1` or `FN = 1`. Note `scan_id`, `vuln_id`, `truth_category`, `predicted_category`, `justification_label`. + +### Step 3 - Inspect raw output for failing cases +Before inspecting results, classify each failure as **infrastructure** or **classification**: + +- **Infrastructure failure**: `*_failed.json` file, 500 response, missing `jar`, missing git cache, missing repo clone, `Error running workflow` in service log, timeout. These are NOT prompt issues - do not attempt to fix them via prompt edits. Note the affected `vuln_id` and exclude it from the analysis, then report the infrastructure gap to the user. +- **Classification failure**: a successful `*.json` result where `justification.label` differs from the expected category. Only these cases are candidates for prompt tuning. + +**Stop iterating if the majority of failures are infrastructure failures.** Prompt changes cannot fix missing tools, uncached repositories, or disabled pipeline stages (`skip: true`). + +For classification failures, find the result JSON in `src/reports/` of the test automation repo: +``` +src/reports/{scan_id}_{vuln_id}_1.json +``` +Examine: +- `output.analysis[0].justification.label` + `.reason` - the final classification +- `output.analysis[0].summary` - investigation summary paragraph +- `output.analysis[0].agent_results[*]` - checklist Q&A and tool calls + +### Step 4 - Diagnose which prompt stage failed + +Each model family has its own prompt file in `vulnerability-analysis/src/vuln_analysis/utils/`. When `model_family` is set in the config, edits go to that file. When unset, edits go to the default files. + +| Stage | Model-family file -> symbol | Default file -> symbol | +|-------|----------------------|-----------------------| +| Checklist generation | `.py` -> `CHECKLIST_PROMPT` | `checklist_prompt_generator.py` -> `DEFAULT_CHECKLIST_PROMPT` / `prompting.py` -> `MOD_FEW_SHOT` | +| Reachability agent reasoning | `.py` -> `REACHABILITY_AGENT_SYS_PROMPT` | `react_internals.py` -> `REACHABILITY_AGENT_SYS_PROMPT` | +| Investigation summary | `.py` -> `SUMMARY_PROMPT` + gate messages | `prompting.py` -> `SUMMARY_PROMPT` | +| Final label | `.py` -> `JUSTIFICATION_PROMPT` | `justification_parser.py` -> `JUSTIFICATION_PROMPT` | + +**Non-prompt agent behaviour** (code fixes, not prompt edits): +| Behaviour | File | Symbol | +|-----------|------|--------| +| Finish blocked when package absent (FL returns "not valid") | `reachability_agent.py` | `ReachabilityAgent.check_finish_allowed` - allow finish when `package_validated is False` (ok) fixed | +| Finish blocked until CCA runs (even when CCA impossible) | `react_internals.py` | `ReachabilityRulesTracker.check_finish_allowed` | +| Package absence gate at summary stage | `cve_summarize.py` | `_package_not_found` -> prepends `PACKAGE_PRESENCE_GATE_MSG` | + +Diagnosis heuristics: +- Wrong `justification_label` but correct summary -> fix `JUSTIFICATION_PROMPT` examples or precedence rules. +- Correct tool calls but wrong conclusion -> fix `REACHABILITY_AGENT_SYS_PROMPT` reasoning rules. +- Agent calls FL repeatedly but never reaches CCA (loop) -> check `check_finish_allowed`; if `package_validated is False`, the code fix should allow finishing. +- Checklist questions too vague or miss the vulnerable function -> fix `CHECKLIST_PROMPT` item templates. +- `uncertain` label on a case that should be `code_not_reachable` -> fix `JUSTIFICATION_PROMPT` precedence for dependency-only evidence. + +### Step 5 - Edit the prompt +Make a targeted, minimal change. Before the changed block, add a one-line comment stating the hypothesis, e.g.: +```python +# Hypothesis: model conflates 'code_not_reachable' with 'requires_configuration' - add example +``` +Do not rewrite entire prompts; surgical edits are easier to revert and attribute. + +**Prompt file hygiene (enforced at all times):** +- Model prompt files (`granite.py`, `gemma.py`, and any future `.py`) must contain ONLY prompt strings and the catalog wiring function. No tuning history, iteration notes, metrics, hypotheses, or session commentary belongs in these files. +- Tuning history lives exclusively in this mdc file and in git commit messages. +- When creating a new model prompt file, the module docstring must contain only: what model the file targets, and the reference to `multi_prompt_factory._CATALOG`. Nothing else. + +### Step 6 - Restart service and re-run +After editing a prompt file, restart the vulnerability analysis service so the change is loaded: +```bash +# In the vulnerability-analysis repo +set -a && source .env && set +a +.venv/bin/nat serve \ + --config_file src/vuln_analysis/configs/config--test.yml \ + --port 26466 \ + > /tmp/aiq_service.log 2>&1 & +``` +Then re-run `src/tuning/excel_runner.py` with a new timestamped output path. + +### Step 7 - After every run: output confusion matrix and fix list +After every completed run (whether metrics improved or not), produce the following two artifacts in the chat before doing anything else. + +**Confusion matrix** (noise-reducer orientation - correctly suppressing non-exploitable CVEs is the primary goal): + +``` ++-----------------------------+----------------------+--------------------+ +| | Pred: Not Exploitable| Pred: Exploitable | ++-----------------------------+----------------------+--------------------+ +| Truth: Not Exploitable | TP = N (correct) | FN = N (false alrm)| +| Truth: Exploitable | FP = N (missed) | TN = N (correct) | +| Infrastructure 500 / ERROR | INFRA = N | LABEL_ERR = N | ++-----------------------------+----------------------+--------------------+ +Accuracy: X F1: X (scored on N/12 samples) +``` + +List every sample in a table: `vuln_id | ecosystem | truth | predicted_label | outcome | failure_type`. + +**Fix list** - one row per failing case, tied to the specific issue. Use exactly these failure types: + +| Failure type | Meaning | Actionable fix | +|---|---|---| +| `INFRA_BINARY` | Required binary missing (`jar`, `mvn`, `go`, etc.) | Install missing tool | +| `INFRA_REPO` | Git repo not cloned or projectmanifest not built | Pre-warm cache or fix network | +| `INFRA_CONFIG` | Pipeline stage disabled (`skip: true`) blocks detection | Enable stage or document as pipeline-limited | +| `LABEL_UNCERTAIN` | Model returned `uncertain` (-> ERROR in analysis) | Prompt fix: improve agent CCA enforcement or justification rules | +| `LABEL_WRONG` | Model returned a wrong definitive label (FP or FP-noise-reducer) | Prompt fix: diagnose which stage failed (Step 4) | +| `LABEL_PIPELINE_LIMITED` | Correct tool chain ran but missing data prevents a correct answer | Document as pipeline-limited; no prompt fix available | + +Do not skip this step even if the run produced zero improvements. The fix list must be updated every iteration to reflect which issues are resolved, which are new, and which remain blocked. + +**Compare metrics** (only against runs with the same infrastructure failure count): +- Primary: global F1 Score and Accuracy +- Secondary: FP count (missed exploitables) and FN count (false alarms) + +Keep the prompt change if F1 improves or stays equal while FP+FN total decreases. Revert with `git diff` + manual undo if the change regresses. + +### Step 8 - Repeat +Return to Step 0 and continue iterating until the target accuracy threshold is reached. + +Do not commit changes during the tuning loop. Committing is a separate, explicit user action. + +--- + +## Cleanup tool + +After the user has reviewed results and is satisfied with the tuning session, they may +explicitly ask to clean up. Run this script only on a direct, unambiguous request such as +"clean up the tuning outputs", "we're done, clean up", or similar. Do NOT infer cleanup +from phrases like "keep going until no improvements" or "run the full loop" - the user +will still want to review results and logs before cleaning up. + +```bash +# Run from exploitiq-tests-automation repo root. +# Preview what would be deleted (safe, no changes): +bash src/tuning/tuning_cleanup.sh --dry-run + +# Execute cleanup: +bash src/tuning/tuning_cleanup.sh +``` + +--- + +## Confusion matrix orientation (noise-reducer) + +The project measures noise reduction: correctly suppressing non-exploitable CVEs is the primary goal. + +`NoiseReducerProcessorAnalysis.calculate_confusion_matrix` (`analysis.py:1029`) defines: + +| | Predicted: Not Exploitable | Predicted: Exploitable | +|---|---|---| +| **Truth: Not Exploitable** | **TP** - correct suppression | **FN** - false alarm (noisy) | +| **Truth: Exploitable** | **FP** - missed exploit | **TN** - correct escalation | + +- Minimise **FP** (missed exploits): model incorrectly suppressed a real vulnerability. +- Minimise **FN** (false alarms): model incorrectly flagged a non-exploitable CVE. +- `uncertain` label maps to **ERROR** (not counted in TP/TN/FP/FN). +- Pipeline 500 failures are absent from `extracted_data.csv` and not in the scored set. + +--- + +## Key files reference + +### Prompt files (model-family path) +``` +vulnerability-analysis/ + src/vuln_analysis/ + utils/ + granite.py <- Granite prompts: CHECKLIST_PROMPT, + REACHABILITY_AGENT_SYS_PROMPT, SUMMARY_PROMPT, + JUSTIFICATION_PROMPT, gate messages + gemma.py <- Gemma prompts (same structure as granite.py) + multi_prompt_factory.py <- _granite_catalog() / _gemma_catalog() wire model + files into _CATALOG; get_prompt() resolves by + model family + version + functions/ + react_internals.py <- Default (non-model-family) REACHABILITY_AGENT_SYS_PROMPT, + CLASSIFICATION_PROMPT_TEMPLATE, COMPREHENSION_PROMPT, + MEMORY_UPDATE_PROMPT, FORCED_FINISH_PROMPT + reachability_agent.py <- Reads model_family -> loads model sys prompt if set + cve_checklist.py <- Reads model_family -> loads model checklist if set + cve_summarize.py <- Reads model_family -> loads model summary + gates + cve_justify.py <- Reads model_family -> loads model justification + +### Default (no model_family) prompt files + src/vuln_analysis/ + utils/ + justification_parser.py <- Default JUSTIFICATION_PROMPT (12 label categories) + prompting.py <- Default MOD_FEW_SHOT, SUMMARY_PROMPT, CVSS_PROMPT_TEMPLATE + checklist_prompt_generator.py <- DEFAULT_CHECKLIST_PROMPT (MOD_FEW_SHOT formatted) + prompt_factory.py <- TOOL_SELECTION_STRATEGY, FEW_SHOT_EXAMPLES per language +``` + +### Pipeline orchestration +``` + src/vuln_analysis/ + functions/ + register.py <- LangGraph parent workflow + llm_engine subgraph + Stage order: generate_vdbs -> fetch_intel -> intel_score -> + process_sbom -> check_vuln_deps -> llm_engine -> + (checklist -> agent_executor -> summarize -> justify -> vex -> cvss) + cve_generate_vdbs.py <- HARD GATE: git clone + lexical index; Java needs jar+mvn + cve_agent.py <- Dispatches checklist items to agent graphs + base_graph_agent.py <- Shared ReAct loop (thought -> tool -> observation -> repeat) +``` + +### Java infrastructure (required for CCA on Java samples) +``` + PATH: java (JRE), jar (JDK - NOT JRE), mvn + Cache: .cache/am_cache/git// <- git clones + .cache/am_cache/pickle// <- document pickle cache + Maven (local): ~/.m2/settings.xml (not committed) reads REDHAT_MAVEN_USERNAME and + REDHAT_MAVEN_PASSWORD from .env; merged with kustomize/base/settings.xml + Maven (staging): kustomize/base/settings.xml mounted as ConfigMap - no credentials block; + staging uses a separate auth mechanism outside prompt tuning scope + JAVA_MAVEN_DEFAULT_SETTINGS_FILE_PATH: set to /maven-config/settings.xml in staging pod; + for local, defaults to ../../../../kustomize/base/settings.xml +``` + +### Test runner (exploitiq-tests-automation) +``` +exploitiq-tests-automation/ + src/ + tuning/ + excel_runner.py <- single-model entry point for tuning runs + excel_loader.py <- Excel -> scan_generated.json + expected-results CSVs + excel_reporter.py <- scored results -> output Excel (per-language + Summary sheets) + run_and_monitor.sh <- wrapper with event-based stall detection + tuning_cleanup.sh <- removes all run artifacts; must be run explicitly + analysis.py <- NoiseReducerProcessorAnalysis (calculate_confusion_matrix + uses noise-reducer orientation: TP=correct suppression, + FP=missed exploit, FN=false alarm, TN=correct escalation) + config/ + analysis_config.json <- scanner_label_map: uncertain->ERROR, code_not_present->TN-1, etc. +``` + +## Pipeline stages: prompt-tunable vs infrastructure-gated + +Understanding this boundary prevents wasted tuning iterations on infrastructure failures. + +| Stage | Prompt-tunable? | Infrastructure requirement | Config | +|-------|----------------|--------------------------|--------| +| `cve_generate_vdbs` | No | git clone, lexical index, **`jar` + `mvn`** for Java source extraction | runs; `ignore_code_embedding: true` | +| `cve_fetch_intel` | No | GHSA_API_KEY, NVD_API_KEY, SERPAPI_API_KEY | runs | +| `cve_calculate_intel_score` | No (uses its own LLM for scoring) | LLM endpoint | runs | +| `cve_process_sbom` | No | valid SBOM in request | runs | +| `cve_check_vuln_deps` | No | SBOM + advisory cross-reference | **skip: true** | +| `cve_checklist` | **Yes** - `.CHECKLIST_PROMPT` | LLM endpoint | runs; `model_family: ` | +| `cve_agent_executor` (reachability) | **Yes** - `.REACHABILITY_AGENT_SYS_PROMPT` | LLM + tool chain (CCA needs `jar`+`mvn` for Java) | runs; `model_family: ` | +| `cve_summarize` | **Yes** - `.SUMMARY_PROMPT` + gate messages | LLM endpoint | runs; `model_family: ` | +| `cve_justify` | **Yes** - `.JUSTIFICATION_PROMPT` | LLM endpoint | runs; `model_family: ` | +| `cve_generate_cvss` | No | LLM + tools | **skip: true** | +| `cve_generate_vex` | No | none | runs | +| `cve_file_output` | No | disk write | runs | + +**Hard gate:** `cve_generate_vdbs` -> `check_vdbs_success`. If the code index build fails (missing `jar`, broken Maven, inaccessible git repo), the workflow routes directly to `failure` and the LLM engine never runs. This produces a 500 at the API level and `*_failed.json` in `src/reports/`. No prompt change can recover these cases. + +**`cve_check_vuln_deps: skip: true` effect:** Every CVE reaches the agent regardless of SBOM match. This exposes transitive dependencies (e.g. urllib3 installed but not directly imported) where code search finds nothing -> agent concludes `code_not_present` even though the CVE is exploitable. This is pipeline-limited; prompt changes cannot compensate for absent SBOM/VDC data. diff --git a/src/exploit_iq_commons/utils/dep_tree.py b/src/exploit_iq_commons/utils/dep_tree.py index f32d8bb12..a9750ead7 100644 --- a/src/exploit_iq_commons/utils/dep_tree.py +++ b/src/exploit_iq_commons/utils/dep_tree.py @@ -1501,7 +1501,14 @@ def install_dependencies(self, manifest_path: Path): cmd = f"cd {manifest_path} && uv venv {TRANSITIVE_ENV_NAME}" run_command(cmd) site_packages = self._find_site_packages(manifest_path) - with open(manifest_path / PYTHON_MANIFEST, 'r') as manifest: + manifest_file = manifest_path / PYTHON_MANIFEST + if not manifest_file.exists(): + import logging as _log + _log.getLogger(__name__).debug( + "No %s found in %s; skipping dependency installation", PYTHON_MANIFEST, manifest_path + ) + return + with open(manifest_file, 'r') as manifest: for line in tqdm(manifest): if line.strip() and not PythonLanguageFunctionsParser.is_comment_line(line): self.install_dependency(line, manifest_path) diff --git a/src/vuln_analysis/configs/config-gemma-test.yml b/src/vuln_analysis/configs/config-gemma-test.yml new file mode 100644 index 000000000..1d1b4c47c --- /dev/null +++ b/src/vuln_analysis/configs/config-gemma-test.yml @@ -0,0 +1,258 @@ +# Test config for Google Gemma 4 31B via inference gateway. +# Based on config-http-openai.yml with these changes: +# - Uses cve_file_output (writes .tmp/output.json) instead of cve_http_output +# - intel_plugin_config kept (required by schema) but points to localhost:8080; +# SimpleHttpIntelPlugin catches ConnectionError and returns empty intel gracefully +# - Removes Code/Docs Semantic Search from agent tools (no embedder available) +# - ignore_code_embedding: true (skip VDB build, use lexical + call-chain only) +# - cve_check_vuln_deps: skip: true +# - cve_generate_cvss: skip: true +# +# Required env vars: +# GHSA_API_KEY SERPAPI_API_KEY NVD_API_KEY +# NVIDIA_API_BASE (all model names use RedHatAI/gemma-4-31B-it-FP8-block) +# CHECKLIST_MODEL_NAME CVE_AGENT_EXECUTOR_MODEL_NAME +# CODE_VDB_RETRIEVER_MODEL_NAME DOC_VDB_RETRIEVER_MODEL_NAME +# SUMMARIZE_MODEL_NAME JUSTIFY_MODEL_NAME +# +# Start with: +# nat serve --config_file src/vuln_analysis/configs/config-gemma-test.yml --port 26466 + +general: + front_end: + _type: fastapi + endpoints: + - path: /health + method: GET + description: Perform a health check. + function_name: health_check + use_uvloop: true + telemetry: + tracing: + phoenix: + _type: phoenix + endpoint: ${OTEL_TRACES_ENDPOINT:-http://localhost:6006/v1/traces} + project: cve_agent_gemma + +functions: + cve_generate_vdbs: + _type: cve_generate_vdbs + agent_name: cve_agent_executor + embedder_name: nim_embedder + base_git_dir: .cache/am_cache/git + base_vdb_dir: .cache/am_cache/vdb + base_code_index_dir: .cache/am_cache/code_index + base_pickle_dir: .cache/am_cache/pickle + base_rpm_dir: .cache/am_cache/rpms + ignore_code_embedding: true # skip VDB; use lexical + call-chain search only + + cve_fetch_intel: + _type: cve_fetch_intel + # intel_plugin_config is required by the schema; pointing to a non-existent endpoint + # is safe - SimpleHttpIntelPlugin catches all RequestException and returns empty intel. + intel_plugin_config: + plugin_name: vuln_analysis.data_models.plugins.intel_plugin.SimpleHttpIntelPlugin + plugin_config: + source: Product Security research + endpoint: http://localhost:8080/api/v1/vulnerabilities/{vuln_id}/comments + + cve_process_sbom: + _type: cve_process_sbom + + cve_check_vuln_deps: + _type: cve_check_vuln_deps + skip: true + + cve_checklist: + _type: cve_checklist + llm_name: checklist_llm + model_family: gemma + + Call Chain Analyzer: + _type: transitive_code_search + enable_transitive_search: true + + Function Caller Finder: + _type: calling_function_name_extractor + enable_functions_usage_search: true + + Function Locator: + _type: package_and_function_locator + + Function Library Version Finder: + _type: calling_function_library_version_finder + + Code Keyword Search: + _type: lexical_code_search + top_k: 5 + + CVE Web Search: + _type: serp_wrapper + max_retries: 5 + + Container Analysis Data: + _type: container_image_analysis_data + + cve_agent_executor: + _type: cve_agent_executor + llm_name: cve_agent_executor_llm + model_family: gemma + tool_names: + # Semantic search tools removed (no embedder available) + - Code Keyword Search + - CVE Web Search + - Call Chain Analyzer + - Function Caller Finder + - Function Locator + - Function Library Version Finder + max_concurrency: null + max_iterations: 10 + prompt_examples: false + replace_exceptions: true + replace_exceptions_value: "I do not have a definitive answer for this checklist item." + return_intermediate_steps: false + cve_web_search_enabled: true + verbose: false + + cve_generate_cvss: + _type: cve_generate_cvss + skip: true + llm_name: generate_cvss_llm + tool_names: + - Code Keyword Search + - Container Analysis Data + max_concurrency: null + max_iterations: 10 + prompt_examples: true + replace_exceptions: false + replace_exceptions_value: "Failed to generate CVSS for this analysis." + return_intermediate_steps: false + verbose: false + + cve_summarize: + _type: cve_summarize + llm_name: summarize_llm + model_family: gemma + + cve_justify: + _type: cve_justify + llm_name: justify_llm + model_family: gemma + + cve_generate_vex: + _type: cve_generate_vex + skip: false + + cve_file_output: + _type: cve_file_output + file_path: .tmp/output.json + markdown_dir: .tmp/vulnerability_markdown_reports + overwrite: true + + cve_calculate_intel_score: + _type: cve_calculate_intel_score + llm_name: intel_source_score_llm + generate_intel_score: true + intel_low_score: 51 + insist_analysis: false + + health_check: + _type: health_check + +llms: + checklist_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${CHECKLIST_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + code_vdb_retriever_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${CODE_VDB_RETRIEVER_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + doc_vdb_retriever_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${DOC_VDB_RETRIEVER_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + cve_agent_executor_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${CVE_AGENT_EXECUTOR_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + generate_cvss_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${GENERATE_CVSS_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + + summarize_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${SUMMARIZE_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + + justify_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${JUSTIFY_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + + intel_source_score_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${JUSTIFY_MODEL_NAME:-RedHatAI/gemma-4-31B-it-FP8-block} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + +# Embedder definition kept but not used (ignore_code_embedding: true means no VDB build; +# the semantic search tools are removed from the agent tool list). +embedders: + nim_embedder: + _type: nim + base_url: ${NIM_EMBED_BASE_URL:-https://integrate.api.nvidia.com/v1} + model_name: ${EMBEDDER_MODEL_NAME:-nvidia/nv-embedqa-e5-v5} + truncate: END + max_batch_size: 128 + +workflow: + _type: cve_agent + cve_generate_vdbs_name: cve_generate_vdbs + cve_fetch_intel_name: cve_fetch_intel + cve_calculate_intel_score_name: cve_calculate_intel_score + cve_process_sbom_name: cve_process_sbom + cve_check_vuln_deps_name: cve_check_vuln_deps + cve_checklist_name: cve_checklist + cve_agent_executor_name: cve_agent_executor + cve_generate_cvss_name: cve_generate_cvss + cve_generate_vex_name: cve_generate_vex + cve_summarize_name: cve_summarize + cve_justify_name: cve_justify + cve_output_config_name: cve_file_output diff --git a/src/vuln_analysis/configs/config-granite-test.yml b/src/vuln_analysis/configs/config-granite-test.yml new file mode 100644 index 000000000..03017bda1 --- /dev/null +++ b/src/vuln_analysis/configs/config-granite-test.yml @@ -0,0 +1,258 @@ +# Test config for IBM Granite 4.1 30B via inference gateway. +# Based on config-http-openai.yml with these changes: +# - Uses cve_file_output (writes .tmp/output.json) instead of cve_http_output +# - intel_plugin_config kept (required by schema) but points to localhost:8080; +# SimpleHttpIntelPlugin catches ConnectionError and returns empty intel gracefully +# - Removes Code/Docs Semantic Search from agent tools (no embedder available) +# - ignore_code_embedding: true (skip VDB build, use lexical + call-chain only) +# - cve_check_vuln_deps: skip: true +# - cve_generate_cvss: skip: true +# +# Required env vars: +# GHSA_API_KEY SERPAPI_API_KEY NVD_API_KEY +# NVIDIA_API_BASE (all model names use ibm-granite/granite-4.1-30b-fp8) +# CHECKLIST_MODEL_NAME CVE_AGENT_EXECUTOR_MODEL_NAME +# CODE_VDB_RETRIEVER_MODEL_NAME DOC_VDB_RETRIEVER_MODEL_NAME +# SUMMARIZE_MODEL_NAME JUSTIFY_MODEL_NAME +# +# Start with: +# nat serve --config_file src/vuln_analysis/configs/config-granite-test.yml --port 26466 + +general: + front_end: + _type: fastapi + endpoints: + - path: /health + method: GET + description: Perform a health check. + function_name: health_check + use_uvloop: true + telemetry: + tracing: + phoenix: + _type: phoenix + endpoint: ${OTEL_TRACES_ENDPOINT:-http://localhost:6006/v1/traces} + project: cve_agent + +functions: + cve_generate_vdbs: + _type: cve_generate_vdbs + agent_name: cve_agent_executor + embedder_name: nim_embedder + base_git_dir: .cache/am_cache/git + base_vdb_dir: .cache/am_cache/vdb + base_code_index_dir: .cache/am_cache/code_index + base_pickle_dir: .cache/am_cache/pickle + base_rpm_dir: .cache/am_cache/rpms + ignore_code_embedding: true # skip VDB; use lexical + call-chain search only + + cve_fetch_intel: + _type: cve_fetch_intel + # intel_plugin_config is required by the schema; pointing to a non-existent endpoint + # is safe - SimpleHttpIntelPlugin catches all RequestException and returns empty intel. + intel_plugin_config: + plugin_name: vuln_analysis.data_models.plugins.intel_plugin.SimpleHttpIntelPlugin + plugin_config: + source: Product Security research + endpoint: http://localhost:8080/api/v1/vulnerabilities/{vuln_id}/comments + + cve_process_sbom: + _type: cve_process_sbom + + cve_check_vuln_deps: + _type: cve_check_vuln_deps + skip: true + + cve_checklist: + _type: cve_checklist + llm_name: checklist_llm + model_family: granite + + Call Chain Analyzer: + _type: transitive_code_search + enable_transitive_search: true + + Function Caller Finder: + _type: calling_function_name_extractor + enable_functions_usage_search: true + + Function Locator: + _type: package_and_function_locator + + Function Library Version Finder: + _type: calling_function_library_version_finder + + Code Keyword Search: + _type: lexical_code_search + top_k: 5 + + CVE Web Search: + _type: serp_wrapper + max_retries: 5 + + Container Analysis Data: + _type: container_image_analysis_data + + cve_agent_executor: + _type: cve_agent_executor + llm_name: cve_agent_executor_llm + model_family: granite + tool_names: + # Semantic search tools removed (no embedder available) + - Code Keyword Search + - CVE Web Search + - Call Chain Analyzer + - Function Caller Finder + - Function Locator + - Function Library Version Finder + max_concurrency: null + max_iterations: 10 + prompt_examples: false + replace_exceptions: true + replace_exceptions_value: "I do not have a definitive answer for this checklist item." + return_intermediate_steps: false + cve_web_search_enabled: true + verbose: false + + cve_generate_cvss: + _type: cve_generate_cvss + skip: true + llm_name: generate_cvss_llm + tool_names: + - Code Keyword Search + - Container Analysis Data + max_concurrency: null + max_iterations: 10 + prompt_examples: true + replace_exceptions: false + replace_exceptions_value: "Failed to generate CVSS for this analysis." + return_intermediate_steps: false + verbose: false + + cve_summarize: + _type: cve_summarize + llm_name: summarize_llm + model_family: granite + + cve_justify: + _type: cve_justify + llm_name: justify_llm + model_family: granite + + cve_generate_vex: + _type: cve_generate_vex + skip: false + + cve_file_output: + _type: cve_file_output + file_path: .tmp/output.json + markdown_dir: .tmp/vulnerability_markdown_reports + overwrite: true + + cve_calculate_intel_score: + _type: cve_calculate_intel_score + llm_name: intel_source_score_llm + generate_intel_score: true + intel_low_score: 51 + insist_analysis: false + + health_check: + _type: health_check + +llms: + checklist_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${CHECKLIST_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + code_vdb_retriever_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${CODE_VDB_RETRIEVER_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + doc_vdb_retriever_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${DOC_VDB_RETRIEVER_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + cve_agent_executor_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${CVE_AGENT_EXECUTOR_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 2000 + top_p: 0.01 + + generate_cvss_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${GENERATE_CVSS_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + + summarize_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${SUMMARIZE_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + + justify_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${JUSTIFY_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + + intel_source_score_llm: + _type: openai + api_key: "EMPTY" + base_url: ${NVIDIA_API_BASE:-https://integrate.api.nvidia.com/v1} + model_name: ${JUSTIFY_MODEL_NAME:-ibm-granite/granite-4.1-30b-fp8} + temperature: 0.0 + max_tokens: 1024 + top_p: 0.01 + +# Embedder definition kept but not used (ignore_code_embedding: true means no VDB build; +# the semantic search tools are removed from the agent tool list). +embedders: + nim_embedder: + _type: nim + base_url: ${NIM_EMBED_BASE_URL:-https://integrate.api.nvidia.com/v1} + model_name: ${EMBEDDER_MODEL_NAME:-nvidia/nv-embedqa-e5-v5} + truncate: END + max_batch_size: 128 + +workflow: + _type: cve_agent + cve_generate_vdbs_name: cve_generate_vdbs + cve_fetch_intel_name: cve_fetch_intel + cve_calculate_intel_score_name: cve_calculate_intel_score + cve_process_sbom_name: cve_process_sbom + cve_check_vuln_deps_name: cve_check_vuln_deps + cve_checklist_name: cve_checklist + cve_agent_executor_name: cve_agent_executor + cve_generate_cvss_name: cve_generate_cvss + cve_generate_vex_name: cve_generate_vex + cve_summarize_name: cve_summarize + cve_justify_name: cve_justify + cve_output_config_name: cve_file_output diff --git a/src/vuln_analysis/functions/cve_agent.py b/src/vuln_analysis/functions/cve_agent.py index d1fa7d4d7..32b96e4e9 100644 --- a/src/vuln_analysis/functions/cve_agent.py +++ b/src/vuln_analysis/functions/cve_agent.py @@ -57,6 +57,11 @@ class CVEAgentExecutorToolConfig(FunctionBaseConfig, name="cve_agent_executor"): description= "Manually set the prompt for the specific model in the configuration. The prompt can either be passed in as a " "string of text or as a path to a text file containing the desired prompting.") + model_family: str = Field( + default="", + description="LLM family for model-specific prompt selection (e.g. 'granite'). " + "When set and prompt is None, the agent sys prompt is loaded from multi_prompt_factory.", + ) prompt_examples: bool = Field(default=False, description="Whether to include examples in agent prompt.") replace_exceptions: bool = Field(default=False, description="Whether to replace exception message with custom message.") diff --git a/src/vuln_analysis/functions/cve_checklist.py b/src/vuln_analysis/functions/cve_checklist.py index ab6f3c985..23111ab98 100644 --- a/src/vuln_analysis/functions/cve_checklist.py +++ b/src/vuln_analysis/functions/cve_checklist.py @@ -25,7 +25,7 @@ from pydantic import Field from exploit_iq_commons.utils import data_utils from exploit_iq_commons.logging.loggers_factory import LoggingFactory, trace_id - +from vuln_analysis.utils.multi_prompt_factory import PromptId, get_prompt, PromptCatalogError, LanguageAdjustments, parse_model_name logger = LoggingFactory.get_agent_logger(__name__) @@ -43,6 +43,11 @@ class CVEChecklistToolConfig(FunctionBaseConfig, name="cve_checklist"): description= "Manually set the prompt for the specific model in the configuration. The prompt can either be passed in as a " "string of text or as a path to a text file containing the desired prompting.") + model_family: str = Field( + default="", + description="LLM family for model-specific prompt selection (e.g. 'granite'). " + "When set and prompt is None, the checklist prompt is loaded from multi_prompt_factory.", + ) @register_function(config_type=CVEChecklistToolConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN]) @@ -58,17 +63,38 @@ async def cve_checklist(config: CVEChecklistToolConfig, builder: Builder): agent_config = builder.get_function_config(config.agent_name) agent_tool_names = agent_config.tool_names if hasattr(agent_config, 'tool_names') else None + # checklist_prompt = config.prompt + # if checklist_prompt is None and config.model_family: + # try: + # checklist_prompt = get_prompt(PromptId.CHECKLIST_MAIN, llm=[config.model_family, ""]) + # except PromptCatalogError: + # pass # fall through to DEFAULT_CHECKLIST_PROMPT in generate_checklist() + async def generate_checklist_for_cve(cve_intel, ecosystem: str = ""): + _prompt = config.prompt + + if config.model_family: + llm_family_name = [config.model_family, ""] + else: + llm_family_name = parse_model_name(llm.model_name) - checklist = await generate_checklist(prompt=config.prompt, + if ecosystem: + lang = LanguageAdjustments(ecosystem.capitalize()) + else: + lang = None + + if _prompt is None: + _prompt = get_prompt(PromptId.CHECKLIST_MAIN, llm=llm_family_name, language=lang) + + checklist = await generate_checklist(prompt=_prompt, llm=llm, input_dict=cve_intel, tool_names=agent_tool_names, enable_llm_list_parsing=False, ecosystem=ecosystem) - checklist = await _parse_list([checklist]) + checklist = await _parse_list([checklist]) return cve_intel["vuln_id"], checklist[0] async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: diff --git a/src/vuln_analysis/functions/cve_generate_cvss.py b/src/vuln_analysis/functions/cve_generate_cvss.py index 8343d1c1b..a00994905 100644 --- a/src/vuln_analysis/functions/cve_generate_cvss.py +++ b/src/vuln_analysis/functions/cve_generate_cvss.py @@ -36,6 +36,8 @@ from vuln_analysis.data_models.state import AgentMorpheusEngineState from vuln_analysis.tools.tool_names import ToolNames from vuln_analysis.utils.prompting import get_cvss_prompt +from vuln_analysis.utils.multi_prompt_factory import PromptId, get_prompt, parse_model_name + from exploit_iq_commons.logging.loggers_factory import LoggingFactory logger = LoggingFactory.get_agent_logger(__name__) @@ -197,9 +199,15 @@ async def _create_agent(config: CVEGenerateCvssToolConfig, builder: Builder, ] # Get prompt (examples now embedded in template) - prompt = PromptTemplate.from_template( - get_cvss_prompt(config.prompt, config.prompt_examples) - ) + + if config.prompt: + prompt_template = config.prompt + else: + parsed_model_name = parse_model_name(llm.model_name) + prompt_template = get_prompt(PromptId.GENERATE_CVSS, llm=parsed_model_name) + + + prompt = PromptTemplate.from_template(prompt_template) error_handler = _make_parse_error_handler(is_openai) diff --git a/src/vuln_analysis/functions/cve_generate_vdbs.py b/src/vuln_analysis/functions/cve_generate_vdbs.py index df3e9c542..cea700174 100644 --- a/src/vuln_analysis/functions/cve_generate_vdbs.py +++ b/src/vuln_analysis/functions/cve_generate_vdbs.py @@ -200,7 +200,13 @@ async def _arun(message: AgentMorpheusInput) -> AgentMorpheusEngineInput: # Build VDBs (credential_id is propagated via async context) with credential_context(message.credential_id): logger.debug("_arun: credential_context entered, credential_id=%r", message.credential_id) - vdb_code_path, vdb_doc_path = embedder.build_vdbs(source_infos, config.ignore_code_embedding) + # When ignore_code_embedding is True, also skip doc VDBs + vdb_source_infos = ( + [si for si in source_infos if si.type != "doc"] + if config.ignore_code_embedding + else source_infos + ) + vdb_code_path, vdb_doc_path = embedder.build_vdbs(vdb_source_infos, config.ignore_code_embedding) if (vdb_code_path is None): # Only log warning if we're not ignoring code embeddings diff --git a/src/vuln_analysis/functions/cve_justify.py b/src/vuln_analysis/functions/cve_justify.py index 8124b4222..97d431623 100644 --- a/src/vuln_analysis/functions/cve_justify.py +++ b/src/vuln_analysis/functions/cve_justify.py @@ -23,6 +23,7 @@ from pydantic import Field from exploit_iq_commons.logging.loggers_factory import LoggingFactory, trace_id +from vuln_analysis.utils.multi_prompt_factory import PromptId, get_prompt, parse_model_name logger = LoggingFactory.get_agent_logger(__name__) @@ -32,6 +33,11 @@ class CVEJustifyToolConfig(FunctionBaseConfig, name="cve_justify"): Defines a function that assigns justification label and reason to each CVE based on summary. """ llm_name: str = Field(description="The LLM model to use") + model_family: str = Field( + default="", + description="LLM family for model-specific prompt selection (e.g. 'granite'). " + "When set, the prompt is loaded from multi_prompt_factory using PromptId.JUSTIFICATION.", + ) @register_function(config_type=CVEJustifyToolConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN]) @@ -46,7 +52,16 @@ async def cve_justify(config: CVEJustifyToolConfig, builder: Builder): llm = await builder.get_llm(llm_name=config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN) - prompt = PromptTemplate(input_variables=["summary"], template=jp.JUSTIFICATION_PROMPT) + if config.model_family: + justification_prompt_str = get_prompt( + PromptId.JUSTIFICATION, + llm=[config.model_family, ""], + ) + prompt = PromptTemplate(input_variables=["summary"], template=justification_prompt_str) + else: + parsed_model_name = parse_model_name(llm.model_name) + template_prompt = get_prompt(PromptId.JUSTIFICATION, llm=parsed_model_name) + prompt = PromptTemplate(input_variables=["summary"], template=template_prompt) chain = prompt | llm async def justify_cve(summary): diff --git a/src/vuln_analysis/functions/cve_summarize.py b/src/vuln_analysis/functions/cve_summarize.py index 3bd61f794..5bf48cff1 100644 --- a/src/vuln_analysis/functions/cve_summarize.py +++ b/src/vuln_analysis/functions/cve_summarize.py @@ -25,6 +25,7 @@ from exploit_iq_commons.utils.string_utils import get_checklist_item_string from exploit_iq_commons.logging.loggers_factory import LoggingFactory, trace_id +from vuln_analysis.utils.multi_prompt_factory import PromptId, get_prompt, parse_model_name, PromptCatalogError, LanguageAdjustments logger = LoggingFactory.get_agent_logger(__name__) @@ -34,6 +35,11 @@ class CVESummarizeToolConfig(FunctionBaseConfig, name="cve_summarize"): Defines a function that generates concise, human-readable summarization paragraph from agent results. """ llm_name: str = Field(description="The LLM model to use") + model_family: str = Field( + default="", + description="LLM family for model-specific prompt selection (e.g. 'granite'). " + "When set, the prompt and gate messages are loaded from multi_prompt_factory.", + ) def _all_cca_not_reachable(checklist_items: list[dict]) -> bool: @@ -70,42 +76,64 @@ async def cve_summarize(config: CVESummarizeToolConfig, builder: Builder): llm = await builder.get_llm(llm_name=config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN) + if config.model_family: + from vuln_analysis.utils.granite import ( + PACKAGE_PRESENCE_GATE_MSG as _PKG_GATE, + REACHABILITY_GATE_MSG as _REACH_GATE, + ) + _summary_prompt_template = get_prompt( + PromptId.SUMMARY, + llm=[config.model_family, ""], + ) + _pkg_gate_msg = _PKG_GATE + _reach_gate_msg = _REACH_GATE + else: + _summary_prompt_template = SUMMARY_PROMPT + _pkg_gate_msg = ( + "PACKAGE PRESENCE GATE: Function Locator confirmed the vulnerable package " + "is NOT present in this container. A CVE cannot be exploitable if the " + "vulnerable package does not exist. Code matches from other packages are " + "irrelevant. The verdict MUST be 'not exploitable'.\n\n" + ) + _reach_gate_msg = ( + "REACHABILITY GATE: Call Chain Analyzer confirmed the vulnerable function " + "is NOT reachable from application code in ALL reachability checks performed. " + "An unreachable function cannot be exploited regardless of other findings " + "(missing mitigations, absent protections, etc. are irrelevant if the code " + "path is never executed). The verdict MUST be 'not exploitable'.\n\n" + ) + async def summarize_cve(results, ecosystem: str = ""): checklist_items = results[1] response = '\n'.join( [get_checklist_item_string(idx + 1, checklist_item) for idx, checklist_item in enumerate(checklist_items)]) if _package_not_found(checklist_items): - response = ( - "PACKAGE PRESENCE GATE: Function Locator confirmed the vulnerable package " - "is NOT present in this container. A CVE cannot be exploitable if the " - "vulnerable package does not exist. Code matches from other packages are " - "irrelevant. The verdict MUST be 'not exploitable'.\n\n" - + response - ) + response = _pkg_gate_msg + response logger.info("Package presence gate activated: target package not found") elif _all_cca_not_reachable(checklist_items): - response = ( - "REACHABILITY GATE: Call Chain Analyzer confirmed the vulnerable function " - "is NOT reachable from application code in ALL reachability checks performed. " - "An unreachable function cannot be exploited regardless of other findings " - "(missing mitigations, absent protections, etc. are irrelevant if the code " - "path is never executed). The verdict MUST be 'not exploitable'.\n\n" - + response - ) + response = _reach_gate_msg + response logger.info("Reachability gate activated: all CCA results are negative") - summary_prompt = SUMMARY_PROMPT - if ecosystem.lower() == "java": - summary_prompt = summary_prompt.replace( - "3. FOCUS: Use only definitive checklist results; ignore inconclusive items", - "3. FOCUS: Use only definitive checklist results; ignore inconclusive items\n\n" - "4. CRITICAL: If ANY checklist item reports that Call Chain Analyzer confirmed a vulnerable\n" - " function is REACHABLE (True) from application code, the verdict MUST be \"exploitable\"\n" - " unless another item provides definitive contrary evidence (e.g., version check confirmed\n" - " the installed version is fixed). Negative results from other items using different function\n" - " names or wrong inputs do NOT override a confirmed positive reachability finding." - ) + # summary_prompt = _summary_prompt_template + # if ecosystem.lower() == "java": + # summary_prompt = summary_prompt.replace( + # "3. FOCUS: Use only definitive checklist results; ignore inconclusive items", + # "3. FOCUS: Use only definitive checklist results; ignore inconclusive items\n\n" + # "4. CRITICAL: If ANY checklist item reports that Call Chain Analyzer confirmed a vulnerable\n" + # " function is REACHABLE (True) from application code, the verdict MUST be \"exploitable\"\n" + # " unless another item provides definitive contrary evidence (e.g., version check confirmed\n" + # " the installed version is fixed). Negative results from other items using different function\n" + # " names or wrong inputs do NOT override a confirmed positive reachability finding." + # ) + parsed_model_name = parse_model_name(llm.model_name) + lang: LanguageAdjustments = LanguageAdjustments.GROUP1 + if ecosystem: + lang = LanguageAdjustments(ecosystem.capitalize()) + if ecosystem == LanguageAdjustments.JAVA.value: + lang = LanguageAdjustments.JAVA + + summary_prompt = get_prompt(PromptId.SUMMARY, llm=parsed_model_name, language=lang) prompt = PromptTemplate(input_variables=["response"], template=summary_prompt) chain = prompt | llm final_summary = await chain.ainvoke({"response": response}) diff --git a/src/vuln_analysis/functions/reachability_agent.py b/src/vuln_analysis/functions/reachability_agent.py index 1801e82d3..5e5feb4b9 100644 --- a/src/vuln_analysis/functions/reachability_agent.py +++ b/src/vuln_analysis/functions/reachability_agent.py @@ -45,6 +45,7 @@ from pathlib import Path from exploit_iq_commons.utils.git_utils import sanitize_git_url_for_path from exploit_iq_commons.utils.data_utils import DEFAULT_GIT_DIRECTORY +from vuln_analysis.utils.multi_prompt_factory import LanguageAdjustments, PromptId, get_prompt, parse_model_name logger = LoggingFactory.get_agent_logger(__name__) AGENT_TRACER = Context.get() @@ -202,7 +203,29 @@ async def pre_process_node(self, state: AgentState) -> AgentState: if is_reachability == "yes": tool_guidance_local, descriptions_local = self._build_tool_guidance_for_ecosystem(ecosystem, self.tools) go_instructions = {"instructions": REACHABILITY_AGENT_THOUGHT_INSTRUCTIONS_GO} if ecosystem == "go" else {} - runtime_prompt = build_reachability_system_prompt(descriptions_local, tool_guidance_local, **go_instructions) + model_sys_prompt = None + prompt_language = None + if ecosystem: + prompt_language = LanguageAdjustments(ecosystem.capitalize()) + if getattr(self.config, "model_family", ""): + model_sys_prompt = get_prompt( + PromptId.REACHABILITY_AGENT_SYS, + llm=[self.config.model_family, ""], + language=prompt_language + ) + else: + parsed_model_name = parse_model_name(self._classification_llm.first.bound.model_name) + model_sys_prompt = get_prompt( + PromptId.REACHABILITY_AGENT_SYS, + llm=parsed_model_name, + language=prompt_language + ) + + runtime_prompt = build_reachability_system_prompt( + descriptions_local, tool_guidance_local, + sys_prompt=model_sys_prompt, + **go_instructions, + ) active_tool_names = [t.name for t in self.tools] else: reachability_tool_names = {ToolNames.CALL_CHAIN_ANALYZER, ToolNames.FUNCTION_CALLER_FINDER} @@ -239,6 +262,11 @@ async def pre_process_node(self, state: AgentState) -> AgentState: def check_finish_allowed(self, state: AgentState) -> tuple[bool, str]: if state.get("is_reachability") != "yes": return True, "" + # If Function Locator confirmed the target package is absent, CCA is + # pointless and the summary package-presence gate will handle the verdict. + # Allow finishing immediately rather than looping until max_iterations. + if state.get("package_validated") is False: + return True, "" rules_tracker = state.get("rules_tracker") cca_results = state.get("cca_results", []) return rules_tracker.check_finish_allowed(cca_results) @@ -273,7 +301,7 @@ async def forced_finish_node(self, state: AgentState) -> AgentState: "vulnerable function is reachable from application code. Library presence " "in dependencies alone does NOT constitute exploitability. " "You MUST conclude that there is insufficient evidence to confirm " - "exploitability — the function was NOT confirmed reachable." + "exploitability - the function was NOT confirmed reachable." ) messages.append(HumanMessage(content=no_cca_prompt)) keep_tail = 2 if context_block else 1 diff --git a/src/vuln_analysis/functions/react_internals.py b/src/vuln_analysis/functions/react_internals.py index 803400675..854ad76fd 100644 --- a/src/vuln_analysis/functions/react_internals.py +++ b/src/vuln_analysis/functions/react_internals.py @@ -493,7 +493,7 @@ class AgentState(MessagesState): NEW OUTPUT: {tool_output} -CRITICAL — CALL CHAIN ANALYZER REACHABILITY: +CRITICAL - CALL CHAIN ANALYZER REACHABILITY: When TOOL USED is "Call Chain Analyzer": - If the result is POSITIVE (reachable): your first finding MUST be "REACHABLE via [package] - sufficient evidence." Do NOT hedge, qualify, or say "further investigation required." - If the result is NEGATIVE (not reachable): your first finding MUST be "NOT reachable via [package]." @@ -612,7 +612,7 @@ def build_package_filter_prompt( image_match_note = f'MATCH DETECTED: candidate "{matched}" matches the container image/repo. Select it (Rule 1).' critical_context_section = "" else: - image_match_note = "NO MATCH: no candidate package name was found in the image/repo identifier. Rule 1 does not apply — use Rule 2." + image_match_note = "NO MATCH: no candidate package name was found in the image/repo identifier. Rule 1 does not apply - use Rule 2." if critical_context: context_block = "\n".join(critical_context) critical_context_section = f"\nVulnerability context (use to disambiguate candidates):\n{context_block}\n" @@ -636,6 +636,13 @@ def _build_tool_arguments(actions: ToolCall)->dict[str, Any]: return {"query": actions.query} if actions.tool_input: return {"query": actions.tool_input} # fallback + # Last resort: use reason as query + if actions.reason: + logger.warning( + "Tool '%s': query and tool_input are None; falling back to reason as query", + actions.tool, + ) + return {"query": actions.reason} logger.warning("Tool '%s' called without required arguments (package_name=%s, function_name=%s, query=%s)", actions.tool, actions.package_name, actions.function_name, actions.query) raise ValueError(f"Tool {actions.tool} requires package_name+function_name or query/tool_input") diff --git a/src/vuln_analysis/utils/gemma.py b/src/vuln_analysis/utils/gemma.py new file mode 100644 index 000000000..75392105d --- /dev/null +++ b/src/vuln_analysis/utils/gemma.py @@ -0,0 +1,320 @@ +""" +Gemma-specific prompt definitions for the ExploitIQ vulnerability analysis pipeline. + +All prompts target Google Gemma 4 31B Instruct (parsed model family: "gemma"). +Referenced by multi_prompt_factory._CATALOG when model_family == "gemma". +""" + +# --------------------------------------------------------------------------- +# Checklist prompt +# --------------------------------------------------------------------------- +# Examples are pre-formatted at import time; {{ tool_descriptions }} is filled +# at runtime by generate_checklist() via Jinja2 rendering. + +def _build_checklist_prompt() -> str: + from vuln_analysis.utils.prompting import get_mod_examples + examples = get_mod_examples() + return ( + "\n" + "Generate an investigation checklist to assess whether a CVE is exploitable\n" + "inside a containerized environment. Output a single Python list: comma-separated\n" + "items enclosed in square brackets, each item enclosed in double quotes.\n" + "\n" + "\n" + "\n" + "Produce 3-5 items following these rules strictly:\n" + "\n" + "1. FIRST ITEM - reachability check:\n" + " If the CVE names a specific vulnerable function/method, the first item MUST\n" + " ask whether that exact function (with its package) is called in the codebase.\n" + ' Template: "Is the function from the package called or\n' + ' reachable in the application codebase?"\n' + "\n" + " DESERIALIZATION EXCEPTION: For deserialization CVEs (XStream, ObjectInputStream,\n" + " Jackson, Kryo, YAML, CWE-502), the first item MUST name the READ/DESERIALIZE\n" + " method, not the serialize counterpart:\n" + " XStream -> fromXML / unmarshal (NOT toXML)\n" + " ObjectInputStream -> readObject (NOT writeObject)\n" + " Jackson -> readValue (NOT writeValue)\n" + ' Example: "Is the XStream.fromXML method called in the codebase?"\n' + "\n" + "2. SUBSEQUENT ITEMS - exploitability chain in priority order:\n" + " a. Is the vulnerable code path reachable from application entry points?\n" + " b. Can attacker-controlled input reach the vulnerable function?\n" + " c. Are any mitigations active (config flags, version guards, patches)?\n" + " d. Are exploit prerequisites (specific env, dependency) present?\n" + "\n" + "3. QUESTION FORMAT - each item must be a single yes/no question beginning with\n" + " Is / Are / Does / Can / Has / Will.\n" + "\n" + "4. TECHNICAL PRECISION - use exact function names, class names, configuration\n" + " keys, and package names from the CVE description.\n" + "\n" + "5. AVAILABLE TOOLS:\n" + " {{ tool_descriptions }}\n" + " Design each question so it can be answered using the tools above.\n" + "\n" + "6. SCOPE - package version is already confirmed; focus only on exploitability\n" + " factors.\n" + "\n" + "\n" + "\n" + + examples + "\n" + "\n" + "\n" + "\n" + ) + +CHECKLIST_PROMPT = _build_checklist_prompt() + +# --------------------------------------------------------------------------- +# Reachability agent system prompt +# --------------------------------------------------------------------------- +REACHABILITY_AGENT_SYS_PROMPT = ( + "You are a security analyst determining whether a CVE is exploitable in a container.\n" + "\n" + "MANDATORY STEPS - follow in order, never skip:\n" + "1. IDENTIFY - read the CVE and extract the vulnerable function or component.\n" + "2. SEARCH - run Code Keyword Search to check whether the vulnerable code is\n" + " present anywhere in the container.\n" + "3. LOCATE - run Function Locator to confirm the exact package and function name.\n" + " If Function Locator reports the package is NOT valid, that is evidence of absence.\n" + " If Function Locator FAILS (error / no result), continue to step 4 anyway.\n" + "4. TRACE - run Call Chain Analyzer (CCA) regardless of whether Function Locator\n" + " succeeded. If FL failed or returned 'not valid', use the best package/function\n" + " pair identified by Code Keyword Search in step 2.\n" + " - For Go: run Function Caller Finder BEFORE Call Chain Analyzer.\n" + " - You MUST attempt CCA before finishing any reachability question.\n" + "5. CONCLUDE - after CCA, state your conclusion with evidence.\n" + "\n" + "WHAT EACH TOOL PROVES:\n" + "- Code Keyword Search -> code is PRESENT in the container (not reachability).\n" + "- Function Locator -> package/function name is VALID (not reachability).\n" + " A FL 'not valid' result means the package is ABSENT from the container.\n" + " A FL failure/error does NOT mean the package is absent - try CCA anyway.\n" + "- Call Chain Analyzer -> the ONLY tool that confirms reachability.\n" + " CCA returns True -> function IS reachable; further assessment needed.\n" + " CCA returns False -> function is NOT reachable; conclude not exploitable.\n" + "\n" + "HARD RULES:\n" + "- Never conclude 'exploitable' based on Code Keyword Search alone.\n" + "- Never conclude 'exploitable' based on Function Locator alone.\n" + "- You MUST attempt CCA before concluding. FL failure does NOT excuse skipping CCA.\n" + "- If CCA returns False, conclude NOT reachable regardless of other findings.\n" + "- If CCA returns True, the function IS reachable; assess remaining conditions.\n" + "- If search returns empty results, that is evidence the code is absent.\n" + "- Do not fabricate findings. If a tool returns nothing, report nothing found.\n" + "\n" + "VENDOR-ONLY RULE:\n" + "If Code Keyword Search returns results for the vulnerable function EXCLUSIVELY in\n" + "vendor/ directories, library dependency files, or architecture-specific syscall\n" + "tables (e.g. golang.org/x/sys/unix/zsysnum_*.go), AND no results in the main\n" + "application source code, you MAY conclude 'the function is not reachable from\n" + "application code' even if FL or CCA failed to execute. The absence of any\n" + "application-level call sites is itself evidence of non-reachability. State clearly:\n" + "'[function] found only in vendor/dependency files; no application-level\n" + "invocation exists - not reachable from application code.'\n" + "Do NOT state 'tools failed' and leave reachability undetermined when step 2\n" + "already established there are zero application-level call sites.\n" + "\n" + "DESERIALIZATION RULE:\n" + "When the CVE is about deserialization (XStream, Java ObjectInputStream, Jackson,\n" + "Kryo, YAML parsers, or CWE-502), the vulnerable function is the READ/DESERIALIZE\n" + "direction. Use these as the function_name for FL and CCA:\n" + " XStream -> fromXML, unmarshal\n" + " ObjectInputStream -> readObject, readResolve\n" + " Jackson -> readValue, treeToValue\n" + " Kryo -> readObject, read\n" + "Do NOT test the WRITE/SERIALIZE counterpart (toXML, writeObject, toJson, write).\n" + "CCA=True for a serialization method does NOT confirm the deserialization path is\n" + "reachable.\n" + "\n" + "CONSERVATIVE DEFAULT:\n" + "When CCA was not attempted (tool failure) AND code IS found in application source,\n" + "default to 'not confirmed reachable'. Only affirm reachability when CCA returned True.\n" + "\n" + "OUTPUT QUALITY:\n" + "- Be concise. Final answers must be 3-5 sentences maximum.\n" + "- Cite the specific tool result that drives your conclusion.\n" + "- Keep PRESENT / REACHABLE / EXPLOITABLE distinct.\n" + "- Do not repeat the question in your answer.\n" +) + +# --------------------------------------------------------------------------- +# Summary prompt +# --------------------------------------------------------------------------- +SUMMARY_PROMPT = """ +Summarize a CVE exploitability investigation into a concise evidence-based paragraph. + + + +Write exactly 3-5 sentences structured as follows: + +Sentence 1 - VERDICT (required): + Choose ONE of: + "The CVE is not exploitable in this container." + "The CVE is exploitable in this container." + "Exploitability of this CVE is uncertain." + + DEFAULT RULE: Use "not exploitable" unless the checklist results + definitively confirm ALL three conditions: + (a) the vulnerable code is present in the container, + (b) Call Chain Analyzer returned True (function is reachable), AND + (c) no active mitigation blocks the exploit path. + If ANY condition is unconfirmed, missing, or ambiguous, the verdict + MUST be "not exploitable". + +Sentences 2-4 - EVIDENCE (required): + - Name the exact functions, packages, or files that were found or not found. + - State whether Call Chain Analyzer (CCA) was run and what it returned. + - If CCA returned False: "Call Chain Analyzer confirmed the function is not + reachable from application code." + - If CCA was not run: "Reachability was not confirmed by Call Chain Analyzer." + - Connect each finding directly to the verdict. + +Sentence 5 - MITIGATION (optional): + Note any active mitigations found if relevant. + +AVOID: + - Speculation or inferences not supported by tool results. + - Stating a function is "likely reachable" or "probably exploitable". + - Ignoring a CCA False result in favour of other evidence. + - Verbose explanations - keep to 3-5 sentences total. + + + +The CVE is not exploitable in this container. The vulnerable function +PIL.ImageMath.eval is present in the installed Pillow library (confirmed by Code +Keyword Search), but Call Chain Analyzer confirmed it is not reachable from +application code. The application only calls PIL.Image.open() and +PIL.Image.thumbnail(), neither of which triggers the vulnerable code path. + + + +{response} + + +Write your summary paragraph:""" + +# --------------------------------------------------------------------------- +# Justification prompt +# --------------------------------------------------------------------------- +JUSTIFICATION_PROMPT = """ +Classify a CVE exploitability investigation summary into one of 12 categories +and provide a one-sentence justification. + + + +Apply categories in strict precedence order. Select the FIRST that applies. + + 1. false_positive + The CVE-to-package mapping is incorrect (wrong package, mismatched CVE). + + 2. code_not_present + The vulnerable library or code is absent from the container. + (Takes precedence over all downstream factors.) + + 3. code_not_reachable + Use when EITHER of the following applies: + (a) Call Chain Analyzer explicitly returned False (definitive test), OR + (b) Code Keyword Search found the vulnerable function ONLY in vendor/ directories + or architecture-specific dependency files (e.g. golang.org/x/sys/unix/zsysnum_*.go), + with ZERO occurrences in the main application source code. Absence of any + application-level call sites is sufficient evidence even without a successful CCA. + Do NOT use if code IS found in the main application source but reachability is + undetermined - use "uncertain" (11) in that case. + + 4. requires_configuration + Exploitation requires a specific configuration option that is disabled. + + 5. requires_dependency + Exploitation requires a dependency that is not installed. + + 6. requires_environment + Exploitation requires a specific environment that is absent. + + 7. compiler_protected + Compiler flags prevent successful exploitation. + + 8. runtime_protected + Runtime mechanisms (ASLR, DEP, sandboxing) prevent exploitation. + + 9. perimeter_protected + Network or perimeter defenses block the attack vector. + +10. mitigating_control_protected + Other security controls sufficiently reduce exploitability. + +11. uncertain + The investigation did not gather enough evidence to determine exploitability. + Use this when the summary is inconclusive or CCA was never run on a + reachability question. + +12. vulnerable + The package is genuinely exploitable and requires patching. + ALL of the following must be confirmed by tool evidence: + - Vulnerable code is present in the container. + - Vulnerable function is reachable (CCA returned True). + - Attacker-controlled input can trigger the vulnerable function. + - No effective mitigation prevents exploitation. + If ANY condition is unconfirmed, do NOT classify as "vulnerable". + Use "uncertain" when critical evidence is missing (code presence unconfirmed). + If code IS present and CCA returned True but attacker-input path is unclear, + classify as "vulnerable" — incomplete attacker-path evidence does not negate confirmed reachability. + + + +1. Read the investigation summary carefully. +2. Test category 1 first, then 2, 3, 4 in order. +3. Select the FIRST category whose conditions are met. +4. Do not skip categories or jump to "vulnerable" without testing all prior ones. +5. "uncertain" is always preferable to "vulnerable" when evidence is insufficient. + + + +Exactly two lines, no labels: + +Line 1: category_name (exact name from the list, lower_snake_case) +Line 2: reasoning (one sentence citing the key evidence from the summary) + + + +code_not_present +Function Locator confirmed the vulnerable openssl library is not installed in the container. + +code_not_reachable +Call Chain Analyzer returned False - PIL.ImageMath.eval is never invoked from application code despite being present in the installed Pillow package. + +code_not_reachable +The vulnerable execve syscall wrapper was found only in vendor/golang.org/x/sys/unix auto-generated binding files (Application library dependencies), with no direct call sites in the main application source; CCA was attempted but failed due to FL validation error. + +uncertain +Reachability could not be confirmed: the vulnerable library is present in the container and the main application contains call sites, but CCA was never run and the investigation could not determine whether the function is reachable. + +vulnerable +Call Chain Analyzer returned True showing urllib.parse.urlparse is reachable from the public API handler, and no blocklisting mitigation is active. + + + +{summary} + + +Classification and reasoning:""" + +# --------------------------------------------------------------------------- +# Gate messages (prepended by cve_summarize when triggered) +# --------------------------------------------------------------------------- +PACKAGE_PRESENCE_GATE_MSG = ( + "PACKAGE PRESENCE GATE: Function Locator confirmed the vulnerable package " + "is NOT present in this container. A CVE cannot be exploitable if the " + "vulnerable package does not exist. Code matches from other packages are " + "irrelevant. The verdict MUST be 'not exploitable'.\n\n" +) + +REACHABILITY_GATE_MSG = ( + "REACHABILITY GATE: Call Chain Analyzer confirmed the vulnerable function " + "is NOT reachable from application code in ALL reachability checks performed. " + "An unreachable function cannot be exploited regardless of other findings " + "(missing mitigations, absent protections, etc. are irrelevant if the code " + "path is never executed). The verdict MUST be 'not exploitable'.\n\n" +) diff --git a/src/vuln_analysis/utils/granite.py b/src/vuln_analysis/utils/granite.py new file mode 100644 index 000000000..286665374 --- /dev/null +++ b/src/vuln_analysis/utils/granite.py @@ -0,0 +1,331 @@ +""" +Granite-specific prompt definitions for the ExploitIQ vulnerability analysis pipeline. + +All prompts target IBM Granite 4.1 (parsed model family: "granite"). +Referenced by multi_prompt_factory._CATALOG when model_family == "granite". +""" + +# --------------------------------------------------------------------------- +# Checklist prompt +# --------------------------------------------------------------------------- +# Used by cve_checklist when model_family == "granite". +# Added to the granite catalog in multi_prompt_factory._granite_catalog(). +# +# Template notes: +# {{ tool_descriptions }} - Jinja2 variable; filled at runtime from +# generate_checklist() via input_dict_with_tools. +# Examples are pre-formatted below at import time using get_mod_examples(). + +def _build_checklist_prompt() -> str: + from vuln_analysis.utils.prompting import get_mod_examples + examples = get_mod_examples() + return ( + "\n" + "Generate an investigation checklist to assess whether a CVE is exploitable\n" + "inside a containerized environment. Output a single Python list: comma-separated\n" + "items enclosed in square brackets, each item enclosed in double quotes.\n" + "\n" + "\n" + "\n" + "Produce 3-5 items following these rules strictly:\n" + "\n" + "1. FIRST ITEM - reachability check:\n" + " If the CVE names a specific vulnerable function/method, the first item MUST\n" + " ask whether that exact function (with its package) is called in the codebase.\n" + ' Template: "Is the function from the package called or\n' + ' reachable in the application codebase?"\n' + "\n" + " DESERIALIZATION EXCEPTION: For deserialization CVEs (XStream, ObjectInputStream,\n" + " Jackson, Kryo, YAML, CWE-502), the first item MUST name the READ/DESERIALIZE\n" + " method, not the serialize counterpart:\n" + " XStream -> fromXML / unmarshal (NOT toXML)\n" + " ObjectInputStream -> readObject (NOT writeObject)\n" + " Jackson -> readValue (NOT writeValue)\n" + " Example: 'Is the XStream.fromXML method called in the codebase?'\n" + "\n" + "2. SUBSEQUENT ITEMS - exploitability chain in priority order:\n" + " a. Is the vulnerable code path reachable from application entry points?\n" + " b. Can attacker-controlled input reach the vulnerable function?\n" + " c. Are any mitigations active (config flags, version guards, patches)?\n" + " d. Are exploit prerequisites (specific env, dependency) present?\n" + "\n" + "3. QUESTION FORMAT - each item must be a single yes/no question beginning with\n" + " Is / Are / Does / Can / Has / Will.\n" + "\n" + "4. TECHNICAL PRECISION - use exact function names, class names, configuration\n" + " keys, and package names from the CVE description.\n" + "\n" + "5. AVAILABLE TOOLS:\n" + " {{ tool_descriptions }}\n" + " Design each question so it can be answered using the tools above.\n" + "\n" + "6. SCOPE - package version is already confirmed; focus only on exploitability\n" + " factors.\n" + "\n" + "\n" + "\n" + + examples + "\n" + "\n" + "\n" + "\n" + ) + +CHECKLIST_PROMPT = _build_checklist_prompt() + +# --------------------------------------------------------------------------- +# Reachability agent system prompt +# --------------------------------------------------------------------------- +# Used by the reachability sub-agent inside cve_agent_executor when +# model_family == "granite". +REACHABILITY_AGENT_SYS_PROMPT = ( + "You are a security analyst determining whether a CVE is exploitable in a container.\n" + "\n" + "MANDATORY STEPS - follow in order, never skip:\n" + "1. IDENTIFY - read the CVE and extract the vulnerable function or component.\n" + "2. SEARCH - run Code Keyword Search to check whether the vulnerable code is\n" + " present anywhere in the container.\n" + "3. LOCATE - run Function Locator to confirm the exact package and function name.\n" + " If Function Locator reports the package is NOT valid, that is evidence of absence.\n" + " If Function Locator FAILS (error / no result), continue to step 4 anyway.\n" + "4. TRACE - run Call Chain Analyzer (CCA) regardless of whether Function Locator\n" + " succeeded. If FL failed or returned 'not valid', use the best package/function\n" + " pair identified by Code Keyword Search in step 2.\n" + " - For Go: run Function Caller Finder BEFORE Call Chain Analyzer.\n" + " - You MUST attempt CCA before finishing any reachability question.\n" + "5. CONCLUDE - after CCA, state your conclusion with evidence.\n" + "\n" + "WHAT EACH TOOL PROVES:\n" + "- Code Keyword Search -> code is PRESENT in the container (not reachability).\n" + "- Function Locator -> package/function name is VALID (not reachability).\n" + " A FL 'not valid' result means the package is ABSENT from the container.\n" + " A FL failure/error does NOT mean the package is absent - try CCA anyway.\n" + "- Call Chain Analyzer -> the ONLY tool that confirms reachability.\n" + " CCA returns True -> function IS reachable; further assessment needed.\n" + " CCA returns False -> function is NOT reachable; conclude not exploitable.\n" + "\n" + "HARD RULES:\n" + "- Never conclude 'exploitable' based on Code Keyword Search alone.\n" + "- Never conclude 'exploitable' based on Function Locator alone.\n" + "- You MUST attempt CCA before concluding. FL failure does NOT excuse skipping CCA.\n" + "- If CCA returns False, conclude NOT reachable regardless of other findings.\n" + "- If CCA returns True, the function IS reachable; assess remaining conditions.\n" + "- If search returns empty results, that is evidence the code is absent.\n" + "- Do not fabricate findings. If a tool returns nothing, report nothing found.\n" + "\n" + "DESERIALIZATION RULE:\n" + "When the CVE is about deserialization (XStream, Java ObjectInputStream, Jackson,\n" + "Kryo, YAML parsers, or CWE-502), the vulnerable function is the READ/DESERIALIZE\n" + "direction. Use these as the function_name for FL and CCA:\n" + " XStream -> fromXML, unmarshal\n" + " ObjectInputStream -> readObject, readResolve\n" + " Jackson -> readValue, treeToValue\n" + " Kryo -> readObject, read\n" + "Do NOT test the WRITE/SERIALIZE counterpart (toXML, writeObject, toJson, write).\n" + "CCA=True for a serialization method does NOT confirm the deserialization path is\n" + "reachable. If you find only serialization methods are reachable, state this\n" + "explicitly and do NOT conclude the CVE is exploitable.\n" + "\n" + "VENDOR-ONLY RULE:\n" + "If Code Keyword Search returns results for the vulnerable function EXCLUSIVELY in\n" + "vendor/ directories, library dependency files, or architecture-specific syscall\n" + "tables (e.g. golang.org/x/sys/unix/zsysnum_*.go), AND no results in the main\n" + "application source code, you MAY conclude 'the function is not reachable from\n" + "application code' even if FL or CCA failed to execute. The absence of any\n" + "application-level call sites is itself evidence of non-reachability. State clearly:\n" + "'execve/[function] found only in vendor/dependency files; no application-level\n" + "invocation exists - not reachable from application code.'\n" + "Do NOT state 'tools failed' and leave reachability undetermined when Q1 already\n" + "established there are zero application-level call sites.\n" + "\n" + "CONSERVATIVE DEFAULT:\n" + "When CCA was not attempted (tool failure) AND code IS found in application source,\n" + "default to 'not confirmed reachable'. Only affirm reachability when CCA returned True.\n" + "\n" + "ANSWER QUALITY:\n" + "- Cite WHAT you checked, WHAT each tool returned, and WHY that leads to your\n" + " conclusion. Do not give bare assertions.\n" + "- Keep PRESENT / REACHABLE / EXPLOITABLE distinct at all times.\n" + "- If tools conflict, state the conflict rather than ignoring one result.\n" + "- If code was found ONLY in 'Application library dependencies' (not 'Main application'),\n" + " explicitly state this - it is material evidence for the concluding stage.\n" +) + +# --------------------------------------------------------------------------- +# Summary prompt +# --------------------------------------------------------------------------- +# Used by cve_summarize when model_family == "granite". +SUMMARY_PROMPT = """ +Write a concise evidence-based paragraph summarising a CVE exploitability +investigation. The investigation consists of checklist questions and their +tool-backed answers. + + + +Write exactly 3-5 sentences structured as follows: + +Sentence 1 - VERDICT (required): + Choose ONE of: + "The CVE is not exploitable in this container." + "The CVE is exploitable in this container." + "Exploitability of this CVE is uncertain." + + DEFAULT RULE: Use "not exploitable" unless the checklist results + definitively confirm ALL three conditions: + (a) the vulnerable code is present in the container, + (b) Call Chain Analyzer returned True (function is reachable), AND + (c) no active mitigation blocks the exploit path. + If ANY condition is unconfirmed, missing, or ambiguous, the verdict + MUST be "not exploitable". + +Sentences 2-4 - EVIDENCE (required): + - Name the exact functions, packages, or files that were found or not found. + - State whether Call Chain Analyzer (CCA) was run and what it returned. + - If CCA returned False: "Call Chain Analyzer confirmed the function is not + reachable from application code." + - If CCA was not run: "Reachability was not confirmed by Call Chain Analyzer." + - Connect each finding directly to the verdict. + +Sentence 5 - MITIGATION (optional): + Note any active mitigations found (configuration, patches, guards) if relevant. + +AVOID: + - Speculation or inferences not supported by tool results. + - Stating a function is "likely reachable" or "probably exploitable". + - Ignoring a CCA False result in favour of other evidence. + + + +The CVE is not exploitable in this container. The vulnerable function +PIL.ImageMath.eval is present in the installed Pillow library (confirmed by Code +Keyword Search), but Call Chain Analyzer confirmed it is not reachable from +application code. The application only calls PIL.Image.open() and +PIL.Image.thumbnail(), neither of which triggers the vulnerable code path. + + + +{response} + + +Write your summary paragraph:""" + +# --------------------------------------------------------------------------- +# Justification prompt +# --------------------------------------------------------------------------- +# Used by cve_justify when model_family == "granite". +JUSTIFICATION_PROMPT = """ +Classify a CVE exploitability investigation summary into one of 12 categories +and provide a one-sentence justification. + + + +Apply categories in strict precedence order. Select the FIRST that applies. + + 1. false_positive + The CVE-to-package mapping is incorrect (wrong package, mismatched CVE). + + 2. code_not_present + The vulnerable library or code is absent from the container. + (Takes precedence over all downstream factors.) + + 3. code_not_reachable + Use when EITHER of the following applies: + (a) Call Chain Analyzer explicitly returned False (definitive test), OR + (b) Code Keyword Search found the vulnerable function ONLY in vendor/ directories + or architecture-specific dependency files (e.g. golang.org/x/sys/unix/zsysnum_*.go), + with ZERO occurrences in the main application source code. Absence of any + application-level call sites is sufficient evidence even without a successful CCA. + Do NOT use if code IS found in the main application source but reachability is + undetermined - use "uncertain" (11) in that case. + + 4. requires_configuration + Exploitation requires a specific configuration option that is disabled. + + 5. requires_dependency + Exploitation requires a dependency that is not installed. + + 6. requires_environment + Exploitation requires a specific environment that is absent. + + 7. compiler_protected + Compiler flags prevent successful exploitation. + + 8. runtime_protected + Runtime mechanisms (ASLR, DEP, sandboxing) prevent exploitation. + + 9. perimeter_protected + Network or perimeter defenses block the attack vector. + +10. mitigating_control_protected + Other security controls sufficiently reduce exploitability. + +11. uncertain + The investigation did not gather enough evidence to determine exploitability. + Use this when the summary is inconclusive or CCA was never run on a + reachability question. + +12. vulnerable + The package is genuinely exploitable and requires patching. + ALL of the following must be confirmed by tool evidence: + - Vulnerable code is present in the container. + - Vulnerable function is reachable (CCA returned True). + - Attacker-controlled input can trigger the vulnerable function. + - No effective mitigation prevents exploitation. + If ANY condition is unconfirmed, do NOT classify as "vulnerable". + Prefer "uncertain" when evidence is incomplete. + + + +1. Read the investigation summary carefully. +2. Test category 1 first, then 2, 3, 4 ... in order. +3. Select the FIRST category whose conditions are met. +4. Do not skip categories or jump to "vulnerable" without testing all prior ones. +5. "uncertain" is always preferable to "vulnerable" when evidence is insufficient. + + + +Exactly two lines, no labels: + +Line 1: category_name (exact name from the list, lower_snake_case) +Line 2: reasoning (one sentence citing the key evidence from the summary) + + + +code_not_present +Function Locator confirmed the vulnerable openssl library is not installed in the container. + +code_not_reachable +Call Chain Analyzer returned False - PIL.ImageMath.eval is never invoked from application code despite being present in the installed Pillow package. + +code_not_reachable +The vulnerable execve syscall wrapper was found only in vendor/golang.org/x/sys/unix auto-generated binding files (Application library dependencies), with no direct call sites in the main application source; CCA was attempted but failed due to FL validation error. + +uncertain +Reachability could not be confirmed: the vulnerable library is present in the container and the main application contains call sites, but CCA was never run and the investigation could not determine whether the function is reachable. + +vulnerable +Call Chain Analyzer returned True showing urllib.parse.urlparse is reachable from the public API handler, and no blocklisting mitigation is active. + + + +{summary} + + +Classification and reasoning:""" + +# --------------------------------------------------------------------------- +# Gate messages (prepended by cve_summarize when triggered) +# --------------------------------------------------------------------------- +PACKAGE_PRESENCE_GATE_MSG = ( + "PACKAGE PRESENCE GATE: Function Locator confirmed the vulnerable package " + "is NOT present in this container. A CVE cannot be exploitable if the " + "vulnerable package does not exist. Code matches from other packages are " + "irrelevant. The verdict MUST be 'not exploitable'.\n\n" +) + +REACHABILITY_GATE_MSG = ( + "REACHABILITY GATE: Call Chain Analyzer confirmed the vulnerable function " + "is NOT reachable from application code in ALL reachability checks performed. " + "An unreachable function cannot be exploited regardless of other findings " + "(missing mitigations, absent protections, etc. are irrelevant if the code " + "path is never executed). The verdict MUST be 'not exploitable'.\n\n" +) diff --git a/src/vuln_analysis/utils/intel_source_score.py b/src/vuln_analysis/utils/intel_source_score.py index 1c595344c..449d90252 100644 --- a/src/vuln_analysis/utils/intel_source_score.py +++ b/src/vuln_analysis/utils/intel_source_score.py @@ -20,12 +20,15 @@ from langchain_core.language_models.base import BaseLanguageModel from exploit_iq_commons.data_models.cve_intel import CveIntel +from vuln_analysis.utils import multi_prompt_factory from ..functions.cve_calculate_intel_score import CVECalculateIntelScoreConfig from exploit_iq_commons.utils import data_utils from ..utils.prompting import additional_intel_prompting from aiq.builder.framework_enum import LLMFrameworkEnum from exploit_iq_commons.logging.loggers_factory import LoggingFactory +from vuln_analysis.utils.multi_prompt_factory import parse_model_name, PromptId, get_prompt + logger = LoggingFactory.get_agent_logger(__name__) class IntelScorer: @@ -38,8 +41,10 @@ def __init__(self, async def calculate_intel_score(self, intel: CveIntel) -> CveIntel: llm = await self._builder.get_llm(llm_name=self._config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN) assert isinstance(llm, BaseLanguageModel) - - response = await llm.ainvoke(self.__get_calculate_score_prompt(intel)) + parsed_model_name = parse_model_name(getattr(llm, "model_name", "")) + additional_intel_prompt = self.__render_template(additional_intel_prompting, intel) + prompt = get_prompt(PromptId.CALCULATE_INTEL_SCORE, llm=parsed_model_name, additional_intel_prompting=additional_intel_prompt) + response = await llm.ainvoke(prompt) if os.environ.get("EXTENDED_VERBOSE_DEBUG", False): logger.debug("\nresponse: %s", str(response.content)) diff --git a/src/vuln_analysis/utils/llama.py b/src/vuln_analysis/utils/llama.py new file mode 100644 index 000000000..33fab45a5 --- /dev/null +++ b/src/vuln_analysis/utils/llama.py @@ -0,0 +1,330 @@ +""" +llama-specific prompt definitions for the ExploitIQ vulnerability analysis pipeline. + +All prompts target llama-3.1-70b Instruct (parsed model family: "llama"). +Referenced by multi_prompt_factory._CATALOG when model_family == "llama". +""" + +# --------------------------------------------------------------------------- +# Checklist prompt +# --------------------------------------------------------------------------- +# Examples are pre-formatted at import time; {{ tool_descriptions }} is filled +# at runtime by generate_checklist() via Jinja2 rendering. + +from vuln_analysis.utils.justification_parser import JustificationParser +from vuln_analysis.utils.prompting import CVSS_PROMPT_TEMPLATE, CVSS_SYS_PROMPT, additional_intel_prompting + + +def _build_checklist_prompt() -> str: + from vuln_analysis.utils.prompting import get_mod_examples + examples = get_mod_examples() + return """ +Generate an investigation checklist for assessing CVE exploitability in a +containerized environment. Your output must be a comma-separated list enclosed +in square brackets, with each item enclosed in quotes. + + + +Create 3-5 checklist items that meet these requirements: + +1. STRUCTURE: Each item must be a clear, answerable question + - Start with interrogative words: Is/Does/Are/Can/Has/etc. + - Be specific and actionable + - Include relevant context from the CVE + +2. CONTENT PRIORITIES: + - If the CVE mentions a specific vulnerable function or method in a given package or library, the FIRST + checklist item must verify whether that function in that package or library is called or imported + in the codebase - function should be specified together with the package name, + for example : 'Is the function1 function from the package1 package called in the codebase?' + - Focus on exploitability factors (version presence is already confirmed) + - Include specific technical names from the CVE (functions, libraries, + configurations, cipher modes, etc.) + - Consider the attack vector (network exposure, user input, file processing, etc.) + - Address relevant security controls or mitigations + +3. INVESTIGATION TOOLS AVAILABLE: + {% raw %}{{ tool_descriptions }}{% endraw %} + + Design questions that can be answered using these analysis capabilities. + +4. COMPLETENESS: + - Cover the vulnerability chain: presence → usage → exploitability + - Each item should independently contribute to understanding exploit risk + + + +""" + examples + """ + + + + +""" + +def _build_summary_prompt() -> str: + return """ +Summarize CVE exploitability investigation results into a clear, evidence-based +paragraph. The investigation results consist of checklist items (questions) and +their corresponding conclusions from the security analysis. + + + +Write a 3-5 sentence paragraph following this structure: + +1. VERDICT (sentence 1): Begin with explicit statement + - "The CVE is exploitable" / "The CVE is not exploitable" / "Exploitability is uncertain" + +2. EVIDENCE (sentences 2-4): Support with specific findings + - Cite concrete results: functions found/absent, reachability status, configuration states + - Use technical details: function names, file paths, components + - Connect findings to exploitability conditions + +{{ LANG_ADJUSTMENT_1 }} + + + +The CVE is not exploitable in this container. Investigation confirmed that while +Python 3.10.0 is installed (vulnerable version), the urllib.parse module is never +imported or called in the application codebase (verified via code search). Additionally, +code analysis revealed that the application does not accept URL inputs from untrusted +sources; all URL handling occurs only with internally generated URLs from configuration +files. The combination of no urllib.parse usage and lack of external URL input eliminates +the attack vector described in the CVE. + + + +{response} + + +Write your summary paragraph:""" + +# end of function _build_summary_prompt() + +CALCULATE_INTEL_SCORE_PROMPT = """ +Evaluate CVE intelligence quality by scoring each criterion independently. + + + +Provide individual scores for each criterion based on the CVE data below. + +1. technical_specificity (max 20 points) + - How precise and in-depth are the technical details? + - Are specific vulnerable functions, methods, or code paths identified? + +2. clarity (max 10 points) + - Is the text well-structured and grammatically correct? + - Is the description clear and easy to understand? + +3. component_impact (max 15 points) + - Does it clearly state what is affected? + - Are the consequences explicitly described? + +4. reproducibility (max 15 points) + - Could an attacker understand how to exploit this from the description? + - Are attack vectors and preconditions described? + +5. vulnerable_function (max 15 points) + - Is a specific function, method, or code snippet named? + - Are vulnerable code locations identifiable? + +6. mitigation (max 10 points) + - Are patches, workarounds, or mitigations described? + - Is remediation guidance provided? + +7. environment (max 10 points) + - Is there context about the affected environment (OS, version, configuration)? + - Are deployment scenarios mentioned? + +8. configuration (max 5 points) + - Are relevant configuration settings or misconfigurations described? + + + +Return JSON only (no markdown, no code blocks): +{ + "scores": { + "technical_specificity": <0-20>, + "clarity": <0-10>, + "component_impact": <0-15>, + "reproducibility": <0-15>, + "vulnerable_function": <0-15>, + "mitigation": <0-10>, + "environment": <0-10>, + "configuration": <0-5> + }, + "justifications": { + "technical_specificity": "brief reason for score", + "clarity": "brief reason for score", + "component_impact": "brief reason for score", + "reproducibility": "brief reason for score", + "vulnerable_function": "brief reason for score", + "mitigation": "brief reason for score", + "environment": "brief reason for score", + "configuration": "brief reason for score" + } +} + +Do NOT calculate or include a total_score. Only provide the individual criterion scores. + + + + +Example Input: +CVE ID: CVE-2025-30204 +CVE Description: golang-jwt is a Go implementation of JSON Web Tokens. Starting in version 3.2.0 and prior to versions 5.2.2 and 4.5.2, the function parse.ParseUnverified splits (via a call to strings.Split) its argument (which is untrusted data) on periods. As a result, in the face of a malicious request whose Authorization header consists of Bearer followed by many period characters, a call to that function incurs allocations to the tune of O(n) bytes (where n stands for the length of the function's argument), with a constant factor of about 16. This issue is fixed in 5.2.2 and 4.5.2. +CVSS Vector: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H +CWE Name: CWE-405: Asymmetric Resource Consumption (Amplification) +Notable Vendors: Hashicorp + +Example Output (High Quality ~80): +{ + "scores": { + "technical_specificity": 18, + "clarity": 9, + "component_impact": 14, + "reproducibility": 14, + "vulnerable_function": 15, + "mitigation": 9, + "environment": 9, + "configuration": 4 + }, + "justifications": { + "technical_specificity": "Function parse.ParseUnverified identified with O(n) allocation details", + "clarity": "Well-structured with clear impact", + "component_impact": "golang-jwt and DoS impact explicitly stated", + "reproducibility": "Clear attack: malicious Authorization header with periods", + "vulnerable_function": "parse.ParseUnverified explicitly named", + "mitigation": "Patches 5.2.2 and 4.5.2 specified", + "environment": "Versions 3.2.0-5.2.2 listed", + "configuration": "Limited config details" + } +} + +Example Input: +CVE ID: CVE-2022-29810 +CVE Description: The Hashicorp go-getter library before 1.5.11 does not redact an SSH key from a URL query parameter. +CVSS Vector: CVSS:3.1/AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:N/A:N +CWE Name: CWE-532: Insertion of Sensitive Information into Log File +Notable Vendors: Hashicorp + +Example Output (Medium Quality ~62): +{ + "scores": { + "technical_specificity": 12, + "clarity": 8, + "component_impact": 12, + "reproducibility": 10, + "vulnerable_function": 8, + "mitigation": 8, + "environment": 8, + "configuration": 3 + }, + "justifications": { + "technical_specificity": "Moderate detail about SSH key issue", + "clarity": "Clear but brief", + "component_impact": "go-getter and credential exposure stated", + "reproducibility": "Attack path somewhat clear, lacks details", + "vulnerable_function": "General functionality, no specific function", + "mitigation": "Version 1.5.11 patches", + "environment": "Affected versions specified", + "configuration": "Minimal context" + } +} + +Example Input: +CVE ID: CVE-2022-2385 +CVE Description: A security issue was discovered in aws-iam-authenticator where an allow-listed IAM identity may be able to modify their username and escalate privileges. +CVSS Vector: CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H +CWE Name: CWE-20: Improper Input Validation +Notable Vendors: Kubernetes + +Example Output (Low Quality ~20): +{ + "scores": { + "technical_specificity": 3, + "clarity": 5, + "component_impact": 5, + "reproducibility": 2, + "vulnerable_function": 0, + "mitigation": 3, + "environment": 4, + "configuration": 0 + }, + "justifications": { + "technical_specificity": "Very vague, no detail", + "clarity": "Brief but understandable", + "component_impact": "General privilege escalation mention", + "reproducibility": "No exploit details", + "vulnerable_function": "No functions identified", + "mitigation": "Patch mentioned", + "environment": "Minimal version info", + "configuration": "None" + } +} + + + + +{{ additional_intel_prompting }}\n\n\nProvide your scoring JSON:""" + + + +CHECKLIST_PROMPT = _build_checklist_prompt() + +# --------------------------------------------------------------------------- +# Reachability agent system prompt +# --------------------------------------------------------------------------- +REACHABILITY_AGENT_SYS_PROMPT = ( + "You are a security analyst investigating CVE exploitability in container images.\n" + "MANDATORY STEPS (follow in order, do NOT skip any):\n" + "1. IDENTIFY the vulnerable component/function from the CVE description.\n" + "2. SEARCH for its presence using Code Keyword Search.\n" + "3. TRACE reachability using Call Chain Analyzer. " + " - Use the Function Locator to verify the package name and find the function name." + " - For Go: use Function Caller Finder to identify which application functions call the vulnerable library function, BEFORE running Call Chain Analyzer." + " - Keyword search alone is NOT sufficient -- you must trace the call chain.\n" + "4. ASSESS: only after completing reachability checks, determine exploitability.\n" + "GENERAL RULES:\n" + "- Base conclusions ONLY on tool results, not assumptions.\n" + "- If a search returns no results, that is evidence the code is absent.\n" + "- Do NOT claim a function is used unless a tool confirmed it.\n" + "- Code Keyword Search proves code PRESENCE in the container, NOT reachability.\n" + "- Function Locator validates package/function NAMES, NOT reachability. It confirms the name exists, not that it is called.\n" + "- Only Call Chain Analyzer can confirm reachability. The application may contain code it never calls.\n" + "- When the question asks whether a function is called or reachable, do NOT conclude based on Code Keyword Search or Function Locator alone -- you MUST use Call Chain Analyzer.\n" + "STOPPING RULES:\n" + "- POSITIVE reachability (Call Chain Analyzer returns True): you MAY conclude exploitable and finish.\n" + "- NEGATIVE reachability (Call Chain Analyzer returns False): record the result. " + "You may only conclude 'not exploitable' after Call Chain Analyzer has confirmed the function is not reachable.\n" + "ANSWER QUALITY:\n" + "- Answer the SPECIFIC question asked with evidence. Do NOT just report what tools found.\n" + "- Never give bare assertions (e.g. 'not exploitable'). Always state: WHAT you checked, WHAT you found, and WHY it leads to your conclusion.\n" + "- Distinguish between code being PRESENT (exists in container), REACHABLE (called from application code), and EXPLOITABLE (attacker-controlled input can trigger it). Do not conflate these.\n" + "- If tool results conflict with each other, state the conflict explicitly rather than silently picking one side.\n" + "- Finding that a security check is ABSENT is potential evidence of vulnerability, not evidence of safety.\n" + "- When citing evidence, explain HOW it relates to the question -- do not just state that something was found." +) + +# --------------------------------------------------------------------------- +# Summary prompt +# --------------------------------------------------------------------------- +SUMMARY_PROMPT = _build_summary_prompt() + + +# --------------------------------------------------------------------------- +# Justification prompt +# --------------------------------------------------------------------------- +JUSTIFICATION_PROMPT = JustificationParser().JUSTIFICATION_PROMPT + +# --------------------------------------------------------------------------- +# Gate messages (prepended by cve_summarize when triggered) +# --------------------------------------------------------------------------- +PACKAGE_PRESENCE_GATE_MSG = ( + "xxx" +) + +REACHABILITY_GATE_MSG = ( + "xxx" +) + +GENERATE_CVSS_PROMPT = f'{CVSS_SYS_PROMPT}\n\n{CVSS_PROMPT_TEMPLATE}' diff --git a/src/vuln_analysis/utils/multi_prompt_factory.py b/src/vuln_analysis/utils/multi_prompt_factory.py new file mode 100644 index 000000000..5b8a8b01b --- /dev/null +++ b/src/vuln_analysis/utils/multi_prompt_factory.py @@ -0,0 +1,501 @@ +import re +from enum import StrEnum +from jinja2 import Template + +from exploit_iq_commons.logging.loggers_factory import LoggingFactory + +logger = LoggingFactory.get_agent_logger(__name__) + +_MODEL_VERSION_PATTERN = re.compile(r"^\d+(?:\.\d+)*$") + +""" +multi_prompt_generator.py + +This module provides utilities for managing and rendering multiple prompt templates, +with support for language-specific adjustments and extensible prompt cataloging. It +defines enumerations for prompt IDs, language adjustments, and supported languages. +It also contains logic for resolving prompt templates with possible language adaptations, +primarily used in vulnerability analysis scenarios where prompts may need to be customized +based on the programming language or other criteria. + +Key components: +- PromptId: Enum to identify different prompt templates. +- LanguageAdjustmentsKey: Enum of keys that can be replaced within templates by language-specific values. +- Language: Supported programming languages. +- PromptCatalogError: Custom error raised when a prompt lookup fails. +- _CATALOG: Main dictionary structure holding prompt templates, data, and per-language adjustments. + +Example usage: +Modules using this utility can render prompts with proper language-specific adjustments, +fetch prompts safely by ID, and handle missing catalog entries gracefully. + +Users of this module shall call to function get_prompt with ther require parameters to get the prompt string +See main function for example usage. + +Below is informatoin how to maintain (add/edit) the prmopt catalog +_CATALOG is the prompt catalog dictionary, + The first level key is the prompt id which shall be one of the PromptId enum values. + The second level key is the llm family key, e.g: "llama". + Within the llm family key, the data dictionary contains prompts and language adjustments. + The llm family key can have multiple versions, e.g: "3.1", for each version, the data dictionary contains prompts and language adjustments in similiar structure. + +The prompts within the data dictionary are a list of strings, the get_prompt function will join the list of strings into a single string, +and then it will be rendered by the Jinja2 template engine with the kwargs provided in the get_prompt function call (named **ctx) +Besides the general template parameter provided by **ctx, the language adjustments dictionary will be also provided to the template engine. +The language adjustments dictionary (named language) defined within the llm (familiy/version) will contain the languages adjustments. +Note: In the prompt body, the language placeholders shall be defined as {{ LANG_ADJUSTMENT_1 }}, {{ LANG_ADJUSTMENT_2 }}. +Having multiple language placholders make the adjusments more flexible, allow one language to have more adjustments than other language. +language placeholders that remain after templating has completed will be deleted from the prompt. + +This string will be rendered by the Jinja2 template engine, the template engine will replace the placeholders with the actual values specified in the language adjustments dictionary and +""" + +class PromptId(StrEnum): + CHECKLIST_MAIN = "checklist_main" + REACHABILITY_AGENT_SYS = "reachability_agent_sys" + JUSTIFICATION = "justification" + SUMMARY = "summary" + CALCULATE_INTEL_SCORE = "calculate_intel_score" + AGENT_EXECUTOR = "agent_executor" + GENERATE_CVSS = "generate_cvss" + +# These are keys to be used within each language block, which allow to be replaced with the actual adjustment value. +# In case prompt define a key and the corresponding block of the given language has not been specified with, the key will be cleared from the prompt. +class LanguageAdjustmentsKey(StrEnum): + LANG_ADJUSTMENT_1 = "LANG_ADJUSTMENT_1" + LANG_ADJUSTMENT_2 = "LANG_ADJUSTMENT_2" + LANG_ADJUSTMENT_3 = "LANG_ADJUSTMENT_3" + +class LanguageAdjustments(StrEnum): + JAVA = "Java" + PYTHON = "Python" + JAVASCRIPT = "JavaScript" + C = "C" + GO = "Go" + GROUP1 = "Group1" + # Group1 is a placeholder for a group of languages, it is used to group languages that have the same language adjustments. + # The language to be grouped is to be determined by the user + + +class PromptCatalogError(KeyError): + """Raised when a prompt cannot be resolved from the catalog.""" + +def _gemma_catalog() -> dict: + """Build the gemma catalog entries from gemma.py constants.""" + from vuln_analysis.utils.gemma import ( + CHECKLIST_PROMPT as _CHKL, + REACHABILITY_AGENT_SYS_PROMPT as _AGENT, + JUSTIFICATION_PROMPT as _JUST, + SUMMARY_PROMPT as _SUMM, + ) + # Below are none Gemma specific prompts, therefor - are based on llama catalog + from vuln_analysis.utils.llama import ( + CALCULATE_INTEL_SCORE_PROMPT as _LLAMA_CALC, + GENERATE_CVSS_PROMPT as _LLAMA_GEN, + ) + return { + PromptId.CHECKLIST_MAIN: { + "gemma": { + "data": {"prompts": [_CHKL], "language": {}}, + "version": {"4": {"data": {"prompts": [_CHKL], "language": {}}}}, + } + }, + PromptId.CALCULATE_INTEL_SCORE: { + "gemma": { + "data": {"prompts": [_LLAMA_CALC], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_LLAMA_CALC], "language": {}}}}, + } + }, + PromptId.REACHABILITY_AGENT_SYS: { + "gemma": { + "data": {"prompts": [_AGENT], "language": {}}, + "version": {"4": {"data": {"prompts": [_AGENT], "language": {}}}}, + } + }, + PromptId.GENERATE_CVSS: { + "gemma": { + "data": {"prompts": [_LLAMA_GEN], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_LLAMA_GEN], "language": {}}}}, + } + }, + PromptId.JUSTIFICATION: { + "gemma": { + "data": {"prompts": [_JUST], "language": {}}, + "version": {"4": {"data": {"prompts": [_JUST], "language": {}}}}, + } + }, + PromptId.SUMMARY: { + "gemma": { + "data": {"prompts": [_SUMM], "language": {}}, + "version": {"4": {"data": {"prompts": [_SUMM], "language": {}}}}, + } + }, + } + + +def _granite_catalog() -> dict: + """Build the granite catalog entries from granite.py constants.""" + from vuln_analysis.utils.granite import ( + CHECKLIST_PROMPT as _CHKL, + REACHABILITY_AGENT_SYS_PROMPT as _AGENT, + JUSTIFICATION_PROMPT as _JUST, + SUMMARY_PROMPT as _SUMM, + ) + # Below are none Gemma specific prompts, therefor - are based on llama catalog + from vuln_analysis.utils.llama import ( + CALCULATE_INTEL_SCORE_PROMPT as _LLAMA_CALC, + GENERATE_CVSS_PROMPT as _LLAMA_GEN, + ) + return { + PromptId.CHECKLIST_MAIN: { + "granite": { + "data": {"prompts": [_CHKL], "language": {}}, + "version": {"4.1": {"data": {"prompts": [_CHKL], "language": {}}}}, + } + }, + PromptId.CALCULATE_INTEL_SCORE: { + "granite": { + "data": {"prompts": [_LLAMA_CALC], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_LLAMA_CALC], "language": {}}}}, + } + }, + PromptId.REACHABILITY_AGENT_SYS: { + "granite": { + "data": {"prompts": [_AGENT], "language": {}}, + "version": {"4.1": {"data": {"prompts": [_AGENT], "language": {}}}}, + } + }, + PromptId.GENERATE_CVSS: { + "granite": { + "data": {"prompts": [_LLAMA_GEN], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_LLAMA_GEN], "language": {}}}}, + } + }, + PromptId.JUSTIFICATION: { + "granite": { + "data": {"prompts": [_JUST], "language": {}}, + "version": {"4.1": {"data": {"prompts": [_JUST], "language": {}}}}, + } + }, + PromptId.SUMMARY: { + "granite": { + "data": {"prompts": [_SUMM], "language": {}}, + "version": {"4.1": {"data": {"prompts": [_SUMM], "language": {}}}}, + } + }, + } + +common_language_checklist_language_adjustment = ( + "If the CVE mentions a specific vulnerable function or method in a given package or library, the FIRST\n" + " checklist item must verify whether that function in that package or library is called or imported\n" + " in the codebase - function should be specified together with the package name,\n" + " for example : 'Is the function1 function from the package1 package called in the codebase?'" + ) + +java_language_checklist_language_adjustment = ( + "If the CVE mentions a specific vulnerable function or method, the FIRST checklist item must use\n" + " the EXACT function/method name from the CVE description (preserve Class.method format if present).\n" + " Example: 'Is the Class.method function from the package called in the codebase?'\n" + " - If NO specific function is named, infer the entry point method from the attack vector and library\n" + " purpose (e.g., serialization library + input manipulation → fromXML/parse/unmarshal). Always\n" + " name the specific method in the first checklist item." + ) + +java_language_checklist_language_adjustment_func_inference = ( + "\n- If no specific function named, infer the entry point method from the attack " + "vector and library purpose, and name it in the first checklist item" + ) + +java_language_checklist_language_adjustment_version_guidance = ( + "\n- If no specific function named, infer the entry point method from the attack " + "vector and library purpose, and name it in the first checklist item" + ) + +common_language_checklist_language_adjustment_func_inference = "" +common_language_checklist_language_adjustment_version_guidance = "" + +java_language_summary_adjustment = ("3. FOCUS: Use only definitive checklist results; ignore inconclusive items\n\n" + "4. CRITICAL: If ANY checklist item reports that Call Chain Analyzer confirmed a vulnerable\n" + " function is REACHABLE (True) from application code, the verdict MUST be \"exploitable\"\n" + " unless another item provides definitive contrary evidence (e.g., version check confirmed\n" + " the installed version is fixed). Negative results from other items using different function\n" + " names or wrong inputs do NOT override a confirmed positive reachability finding.") +common_language_summary_adjustment = "3. FOCUS: Use only definitive checklist results; ignore inconclusive items" + +def _llama_catalog() -> dict: + """Build the llama catalog entries from llama.py constants.""" + from vuln_analysis.utils.llama import ( + CALCULATE_INTEL_SCORE_PROMPT as _CALC, + CHECKLIST_PROMPT as _CHKL, + REACHABILITY_AGENT_SYS_PROMPT as _AGENT, + JUSTIFICATION_PROMPT as _JUST, + SUMMARY_PROMPT as _SUMM, + GENERATE_CVSS_PROMPT as _GEN, + ) + return { + PromptId.CHECKLIST_MAIN: { + "llama": { + "data": {"prompts": [_CHKL], "language": {} + }, + "version": {"3.1": {"data": {"prompts": [_CHKL], "language": {}}}}, + } + }, + PromptId.CALCULATE_INTEL_SCORE: { + "llama": { + "data": {"prompts": [_CALC], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_CALC], "language": {}}}}, + } + }, + PromptId.REACHABILITY_AGENT_SYS: { + "llama": { + "data": {"prompts": [_AGENT], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_AGENT], "language": {}}}}, + } + }, + PromptId.GENERATE_CVSS: { + "llama": { + "data": {"prompts": [_GEN], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_GEN], "language": {}}}}, + } + }, + PromptId.JUSTIFICATION: { + "llama": { + "data": {"prompts": [_JUST], "language": {}}, + "version": {"3.1": {"data": {"prompts": [_JUST], "language": {}}}}, + } + }, + PromptId.SUMMARY: { + "llama": { + "data": {"prompts": [_SUMM], "language": { + LanguageAdjustments.JAVA.value: { + # Thess keys be refer from the prompt template + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: java_language_summary_adjustment + }, + # In below GROUP1 is all languages that are not JAVA + LanguageAdjustments.GROUP1.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: common_language_summary_adjustment + }, + }}, + "version": {"3.1": {"data": {"prompts": [_SUMM], "language": { + LanguageAdjustments.JAVA.value: { + # Thess keys be refer from the prompt template + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: java_language_summary_adjustment + }, + # In below GROUP1 is all languages that are not JAVA + LanguageAdjustments.GROUP1.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: common_language_summary_adjustment + }, + }}}}, + } + }, + } + +# Use lower case for the llm broad family, (parse_model_name function will return llm family in lower case) +_CATALOG: dict[PromptId, dict] = { + PromptId.CHECKLIST_MAIN: { + # Broad Family + "llama-sample": { + # Data for Broad family + "data": { + "prompts": [ + "First paragraph of llama prompt {{ LANG_ADJUSTMENT_1 }}" + "second line of first paragraph of llama prompt {{ LANG_ADJUSTMENT_2 }}", + "Second prompt line" + ], + # Language adjusments + "language": { + LanguageAdjustments.JAVA.value: { + # Thess keys be refer from the prompt template + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: "Java adjustment 1 for llama", + LanguageAdjustmentsKey.LANG_ADJUSTMENT_2.value: "Java adjustment 2 for llama", + }, + LanguageAdjustments.GO.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: "Go adjustment 1 for llama", + LanguageAdjustmentsKey.LANG_ADJUSTMENT_2.value: "Go adjustment 2 for llama", + } + } + }, + # Using sub-comp, to allow generic strucure that allow recursive search.. + "version": { + "3.1": { + # Data for llama-3.1 + "data": { + "prompts": [ + "First paragraph of llama 3.1 prompt language adjuments 1 {{ LANG_ADJUSTMENT_1 }} after language adjustment key\n" + "second line of first paragraph of llama prompt language adjuments 2 {{ LANG_ADJUSTMENT_2 }} after language adjustment key\n", + "third line of first paragraph of llama prompt ", + "Second prompt line {{ xxx }} {{ yyy }} " + ], + # Language adjusments + "language": { + LanguageAdjustments.JAVA.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: "Java adjustment 1 for llama 3.1", + LanguageAdjustmentsKey.LANG_ADJUSTMENT_2.value: "Java adjustment 2 for llama 3.1" + }, + LanguageAdjustments.GO.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: "Go adjustment 1 for llama 3.1", + LanguageAdjustmentsKey.LANG_ADJUSTMENT_2.value: "Go adjustment 2 for llama 3.1" + } + } + } + } + } + } + } +} + +# Merge model-family entries into the catalog at import time +for _pid, _entry in _granite_catalog().items(): + _CATALOG.setdefault(_pid, {}).update(_entry) + +for _pid, _entry in _gemma_catalog().items(): + _CATALOG.setdefault(_pid, {}).update(_entry) + +for _pid, _entry in _llama_catalog().items(): + _CATALOG.setdefault(_pid, {}).update(_entry) + + +def get_prompt(prompt_id: PromptId, llm: list[str] | None = None, language: LanguageAdjustments | None = None, **ctx: str) -> str: + # INSERT_YOUR_CODE + """ + Retrieve a prompt string from the catalog based on the given parameters. + + Args: + prompt_id (PromptId): The identifier for the prompt. + llm (list[str] | None): A list containing the LLM family key and optionally a version key. + language (Language | None): The programming language context for possible prompt adjustments. + **ctx (str): Additional context variables for template substitution. + + Returns: + str: The final filled prompt string ready for LLM use. + + Raises: + PromptCatalogError: If any of the supplied parameters are invalid, or the prompt entry is missing. + + When a version is requested but not present in the catalog, the family-level prompt + is returned and a warning is emitted. + """ + logger.debug("Get prompt for prompt_id: %s, llm: %s, language: %s", prompt_id, llm, language) + if prompt_id not in _CATALOG: + raise PromptCatalogError(f"Prompt id {prompt_id!r} not found in catalog") + + selected_prompt_root: dict = _CATALOG[prompt_id] + + if llm is None: + raise PromptCatalogError("llm cannot be None") + + if not llm: + raise PromptCatalogError("llm must contain at least one key") + + llm_key = llm[0] + if llm_key not in selected_prompt_root: + raise PromptCatalogError( + f"LLM key {llm_key!r} not found in catalog entry for {prompt_id!r}" + ) + selected_prompt_root = selected_prompt_root[llm_key] + + if len(llm) > 1 and llm[1]: + version_key = llm[1] + versions = selected_prompt_root.get("version") + if versions is not None and version_key in versions: + selected_prompt_root = versions[version_key] + else: + logger.warning( + f"Catalog did not have entry for version {version_key!r} " + f"under LLM family {llm_key!r} for prompt {prompt_id!r}; " + f"using family-level prompt instead." + ) + + try: + prompts = selected_prompt_root["data"]["prompts"] + except KeyError as exc: + raise PromptCatalogError( + f"No prompts found under resolved catalog node for {prompt_id!r}" + ) from exc + + if not isinstance(prompts, list): + raise PromptCatalogError( + f"Expected prompts to be a list for {prompt_id!r}, got {type(prompts).__name__}" + ) + + prompt = " ".join(prompts) + template = Template(prompt) + + if language is not None: + if not isinstance(language, LanguageAdjustments): + try: + language = Language(language) + except ValueError as exc: + supported = [member.value for member in Language] + raise PromptCatalogError( + f"Unsupported language {language!r}; supported values: {supported}" + ) from exc + + language_blocks = selected_prompt_root.get("data", {}).get("language") + if language_blocks is None or language.value not in language_blocks: + print( + f"Language adjustments are not defined for given language {language.value!r}" + ) + else: + language_adjusments_dict = language_blocks[language.value] + # print(f"language_adjusments_dict: {language_adjusments_dict}") + # print(f"prompt before rendering: {prompt}") + rendered_prompt = template.render(**language_adjusments_dict, **ctx) + # print(rendered_prompt) + return rendered_prompt + + return template.render(**ctx) + + +def parse_model_name(model_name: str) -> list[str]: + """ + Parse a model name into LLM family and version for the prompt catalog. + + Model names follow: + [provider or organization]/[model-name-and-parameters]-[variant] + + Example: + meta/llama-3.1-70b-instruct -> ["llama", "3.1"] + + Args: + model_name: Full model identifier (e.g. meta/llama-3.1-70b-instruct). + + Returns: + A two-element list: [family, version]. Version is an empty string when + the model slug has no version segment after the family name. + """ + model_slug = model_name.split("/", 1)[-1] + parts = model_slug.split("-") + if not parts or not parts[0]: + raise ValueError(f"Invalid model name: {model_name!r}") + + # In case meta provider is specified in the location of llm family, remove it and resume from that location + # this was added to support model name such as hugging-quants/Meta-Llama-3.1-70B which are not aligned with the standard model name + if parts[0].lower() == "meta": + parts = parts[1:] + + family = parts[0].lower() + version = "" + if len(parts) > 1 and _MODEL_VERSION_PATTERN.match(parts[1]): + version = parts[1] + + return [family, version] + + +def main() -> None: + # print(_CATALOG) + print("=== Prompt for llama 3.1 for Java") + print(get_prompt(PromptId.CHECKLIST_MAIN, llm=["llama", "3.1"], language=LanguageAdjustments.JAVA, xxx="xxx1", yyy="yyy1")) + print("=== Prompt for llama for Java") + print(get_prompt(PromptId.CHECKLIST_MAIN, llm=["llama"], language=LanguageAdjustments.JAVA, xxx="xxx1", yyy="yyy1")) + print("=== Prompt for llama 3.1 for Go") + print(get_prompt(PromptId.CHECKLIST_MAIN, llm=["llama", "3.1"], language=LanguageAdjustments.GO, xxx="xxx1", yyy="yyy1")) + print("=== Prompt for llama for Go") + print(get_prompt(PromptId.CHECKLIST_MAIN, llm=["llama"], language=LanguageAdjustments.GO, xxx="xxx1", yyy="yyy1")) + print("=== Prompt for llama 3.9 for Go") + # 3.9 is not in the catalog, so it will use the family-level prompt + print(get_prompt(PromptId.CHECKLIST_MAIN, llm=["llama", "3.9"], language=LanguageAdjustments.GO, xxx="xxx1", yyy="yyy1")) + + +if __name__ == "__main__": + main() diff --git a/src/vuln_analysis/utils/output_formatter.py b/src/vuln_analysis/utils/output_formatter.py index 5bbbe5e60..99a7ca8fa 100644 --- a/src/vuln_analysis/utils/output_formatter.py +++ b/src/vuln_analysis/utils/output_formatter.py @@ -104,12 +104,12 @@ def _add_header(markdown_content, model_dict: AgentMorpheusOutput): """ input_image = model_dict.input.image # iterate over a list of dict objects, with vuln_id and ghsa being 2 keys in each element - for output in model_dict.output: + for output in model_dict.output.analysis: cve_id = output.vuln_id markdown_content[cve_id].append(f"# Vulnerability Analysis Report for {cve_id}") markdown_content[cve_id].append(f"> **Container Analyzed:** `{input_image.name}:{input_image.tag}`\n\n") # Only add SBOM info if it is a file location - if input_image.sbom_info.type == "file": + if input_image.sbom_info is not None and input_image.sbom_info.type == "file": markdown_content[cve_id].append(f"> **SBOM Info:** `{input_image.sbom_info}`\n\n") markdown_content[cve_id].append(f"> **Status:** {_get_expoiltability_text(output.justification.status)}") @@ -279,7 +279,7 @@ def _add_table_of_contents(markdown_content, model_dict: AgentMorpheusOutput): None This function modifies `markdown_content` in place. """ - for entry in model_dict.output: + for entry in model_dict.output.analysis: cve_id = entry.vuln_id checklist = entry.checklist markdown_content[cve_id].append("### Checklist ") @@ -313,7 +313,7 @@ def _add_checklist_info(markdown_content, model_dict: AgentMorpheusOutput): None This function modifies `markdown_content` in place. """ - for entry in model_dict.output: + for entry in model_dict.output.analysis: cve_id = entry.vuln_id checklist = entry.checklist if checklist: @@ -424,7 +424,7 @@ def _add_vulnerability_analysis(markdown_content, model_dict: AgentMorpheusOutpu None This function modifies `markdown_content` in place. """ - for entry in model_dict.output: + for entry in model_dict.output.analysis: cve_id = entry.vuln_id summary = entry.summary justification = entry.justification diff --git a/src/vuln_analysis/utils/prompt_factory.py b/src/vuln_analysis/utils/prompt_factory.py index 93c70ec16..6000491e3 100644 --- a/src/vuln_analysis/utils/prompt_factory.py +++ b/src/vuln_analysis/utils/prompt_factory.py @@ -151,7 +151,9 @@ "java": ( "Use Function Locator first with maven GAV format (group:artifact:version,ClassName.methodName). " "For reachability, use Call Chain Analyzer with validated names from Function Locator. " - "Use Code Keyword Search for import com. and javax. patterns; Docs Semantic Search for Spring/servlet architecture." + "Use Code Keyword Search for import com. and javax. patterns; Docs Semantic Search for Spring/servlet architecture. " + "When using Function Library Version Finder, use the full versioned GAV that Function Locator auto-resolved " + "(e.g. com.thoughtworks.xstream:xstream:1.4.18), not just the short artifact name (xstream)." ), "javascript": ( "Use Code Keyword Search first for require(, import {, and package patterns. " diff --git a/src/vuln_analysis/utils/tests/test_multi_prompt_factory.py b/src/vuln_analysis/utils/tests/test_multi_prompt_factory.py new file mode 100644 index 000000000..dc841e9b4 --- /dev/null +++ b/src/vuln_analysis/utils/tests/test_multi_prompt_factory.py @@ -0,0 +1,144 @@ +import pytest + +from vuln_analysis.utils.multi_prompt_factory import ( + LanguageAdjustments, + PromptCatalogError, + PromptId, + LanguageAdjustmentsKey, + get_prompt, + parse_model_name, + _CATALOG +) + +# llama 3.1 family +prompt_llama_3_1_generic = "First paragraph of llama 3.1 prompt language adjuments 1 {{ LANG_ADJUSTMENT_1 }} {{ xxx }}" +prompt_llama_3_1_java_adjusments = "Java adjustment 1 for llama 3.1" +prompt_llama_3_1_go_adjusments = "Go adjustment 1 for llama 3.1" +xxx_variable_evulation = "xxx1" +yyy_variable_evulation = "yyy1" + +prompt_llama_3_1_java_evaluated = f"First paragraph of llama 3.1 prompt language adjuments 1 {prompt_llama_3_1_java_adjusments} {xxx_variable_evulation}" +prompt_llama_3_1_go_evaluated = f"First paragraph of llama 3.1 prompt language adjuments 1 {prompt_llama_3_1_go_adjusments} {xxx_variable_evulation}" + +llm_key_name = "llama-test" +llm_version_name = "3.1" + +# llama family +prompt_llama_generic = "First paragraph of llama prompt language adjuments 1 {{ LANG_ADJUSTMENT_1 }} {{ xxx }}" +prompt_llama_java_adjusments = "Java adjustment 1 for llama" +prompt_llama_go_adjusments = "Go adjustment 1 for llama" +prompt_llama_java_evaluated = f"First paragraph of llama prompt language adjuments 1 {prompt_llama_java_adjusments} {xxx_variable_evulation}" +prompt_llama_go_evaluated = f"First paragraph of llama prompt language adjuments 1 {prompt_llama_go_adjusments} {xxx_variable_evulation}" + +llm_family_content = { + # Data for Broad family + "data": { + "prompts": [ + prompt_llama_generic + ], + # Language adjusments + "language": { + LanguageAdjustments.JAVA.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: prompt_llama_java_adjusments + }, + LanguageAdjustments.GO.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: prompt_llama_go_adjusments + } + } + }, + # Using sub-comp, to allow generic strucure that allow recursive search.. + "version": { + "3.1": { + # Data for llama-3.1 + "data": { + "prompts": [ + prompt_llama_3_1_generic + ], + # Language adjusments + "language": { + LanguageAdjustments.JAVA.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: prompt_llama_3_1_java_adjusments + }, + LanguageAdjustments.GO.value: { + LanguageAdjustmentsKey.LANG_ADJUSTMENT_1.value: prompt_llama_3_1_go_adjusments + } + } + } + } + } + } + +_CATALOG[PromptId.CHECKLIST_MAIN][llm_key_name] = llm_family_content + +def test_parse_model_name_extracts_family_and_version() -> None: + assert parse_model_name("meta/llama-3.1-70b-instruct") == ["llama", "3.1"] + + +def test_parse_model_name_without_provider_prefix() -> None: + assert parse_model_name("llama-3.1-70b-instruct") == ["llama", "3.1"] + + +def test_parse_model_name_without_version_segment() -> None: + assert parse_model_name("meta/llama-70b-instruct") == ["llama", ""] + + +def test_parse_model_name_raises_on_empty_slug() -> None: + with pytest.raises(ValueError, match="Invalid model name"): + parse_model_name("meta/") + + +def test_get_prompt_raises_when_llm_family_not_in_catalog() -> None: + with pytest.raises(PromptCatalogError, match="llama-not-exist"): + get_prompt( + PromptId.CHECKLIST_MAIN, + llm=["llama-not-exist", "3.1"], + language=LanguageAdjustments.JAVA, + xxx=xxx_variable_evulation, + yyy=yyy_variable_evulation, + ) + +def test_get_prompt_falls_back_to_family_when_version_not_in_catalog() -> None: + result = get_prompt( + PromptId.CHECKLIST_MAIN, + llm=[llm_key_name, "9.9"], + language=LanguageAdjustments.JAVA, + xxx=xxx_variable_evulation, + yyy=yyy_variable_evulation + ) + assert result == prompt_llama_java_evaluated + +def test_get_prompt_succeeds_for_llama_3_1_java() -> None: + result = get_prompt( + PromptId.CHECKLIST_MAIN, + llm=[llm_key_name, "3.1"], + language=LanguageAdjustments.JAVA, + xxx=xxx_variable_evulation + ) + assert result == (prompt_llama_3_1_java_evaluated) + +def test_get_prompt_succeeds_for_llama_3_1_go() -> None: + result = get_prompt( + PromptId.CHECKLIST_MAIN, + llm=[llm_key_name, "3.1"], + language=LanguageAdjustments.GO, + xxx=xxx_variable_evulation + ) + assert result == (prompt_llama_3_1_go_evaluated) + +def test_get_prompt_succeeds_for_llama_java() -> None: + result = get_prompt( + PromptId.CHECKLIST_MAIN, + llm=[llm_key_name], + language=LanguageAdjustments.JAVA, + xxx=xxx_variable_evulation + ) + assert result == (prompt_llama_java_evaluated) + +def test_get_prompt_succeeds_for_llama_go() -> None: + result = get_prompt( + PromptId.CHECKLIST_MAIN, + llm=[llm_key_name], + language=LanguageAdjustments.GO, + xxx=xxx_variable_evulation + ) + assert result == (prompt_llama_go_evaluated)