diff --git a/backend/requirements.txt b/backend/requirements.txt index b7d7a851..e4291eb4 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -10,3 +10,4 @@ python-multipart>=0.0.9 xhtml2pdf>=0.2.17 aiosqlite>=0.20.0 python-whois>=0.9.4 +httpx>=0.28.1 diff --git a/backend/secuscan/notification_service.py b/backend/secuscan/notification_service.py new file mode 100644 index 00000000..9cd64fab --- /dev/null +++ b/backend/secuscan/notification_service.py @@ -0,0 +1,265 @@ +""" +Notification delivery service for high-severity findings. + +Evaluates active rules, deduplicates deliveries, redacts alert payloads, +and records outcomes in notification_history. Webhook delivery is live; +email is a logged placeholder until SMTP is added. +""" + +from __future__ import annotations + +import json +import logging +import uuid +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +import httpx + +from .database import Database +from .models import NotificationChannelType, NotificationDeliveryStatus +from .redaction import redact_dict, redact_inputs + +logger = logging.getLogger(__name__) + +# Lower rank = more severe. A finding meets the threshold when its rank is +# less than or equal to the rule threshold rank. +_SEVERITY_RANK: Dict[str, int] = { + "critical": 0, + "high": 1, + "medium": 2, + "low": 3, + "info": 4, +} + +_WEBHOOK_TIMEOUT_SECONDS = 10.0 +_USER_AGENT = "SecuScan-Notifications/1.0" + + +@dataclass(frozen=True) +class DeliveryResult: + """Outcome of a single rule delivery attempt for one finding.""" + + rule_id: str + finding_id: str + status: NotificationDeliveryStatus + skipped: bool = False + error_message: Optional[str] = None + + +def severity_meets_threshold(finding_severity: str, rule_threshold: str) -> bool: + """Return True when finding severity is at or above the rule threshold.""" + finding_rank = _SEVERITY_RANK.get(str(finding_severity).lower()) + threshold_rank = _SEVERITY_RANK.get(str(rule_threshold).lower()) + if finding_rank is None or threshold_rank is None: + return False + return finding_rank <= threshold_rank + + +def build_alert_payload( + finding: Dict[str, Any], + rule: Dict[str, Any], +) -> Dict[str, Any]: + """Build a redacted JSON-serializable alert payload for outbound channels.""" + metadata: Dict[str, Any] = {} + raw_metadata = finding.get("metadata_json") + if raw_metadata: + try: + parsed = json.loads(raw_metadata) + if isinstance(parsed, dict): + metadata = redact_inputs(parsed) + except (TypeError, json.JSONDecodeError): + metadata = {"raw": str(raw_metadata)} + + payload = { + "event": "finding.alert", + "rule": { + "id": rule.get("id"), + "name": rule.get("name"), + "severity_threshold": rule.get("severity_threshold"), + "channel_type": rule.get("channel_type"), + }, + "finding": { + "id": finding.get("id"), + "task_id": finding.get("task_id"), + "plugin_id": finding.get("plugin_id"), + "title": finding.get("title"), + "category": finding.get("category"), + "severity": finding.get("severity"), + "target": finding.get("target"), + "description": finding.get("description"), + "remediation": finding.get("remediation"), + "metadata": metadata, + }, + } + return redact_dict(payload) + + +async def was_already_delivered( + db: Database, + rule_id: str, + finding_id: str, +) -> bool: + """Return True when this rule already successfully notified this finding.""" + row = await db.fetchone( + """ + SELECT id FROM notification_history + WHERE rule_id = ? AND finding_id = ? AND status = ? + LIMIT 1 + """, + (rule_id, finding_id, NotificationDeliveryStatus.SUCCESS.value), + ) + return row is not None + + +async def record_delivery( + db: Database, + rule_id: str, + finding_id: str, + status: NotificationDeliveryStatus, + error_message: Optional[str] = None, +) -> str: + """Persist a delivery attempt and return the history row id.""" + history_id = str(uuid.uuid4()) + await db.execute( + """ + INSERT INTO notification_history (id, rule_id, finding_id, status, error_message) + VALUES (?, ?, ?, ?, ?) + """, + ( + history_id, + rule_id, + finding_id, + status.value, + error_message, + ), + ) + return history_id + + +async def send_webhook(target_url: str, payload: Dict[str, Any]) -> tuple[bool, Optional[str]]: + """POST a redacted alert payload to a webhook URL.""" + try: + async with httpx.AsyncClient(timeout=_WEBHOOK_TIMEOUT_SECONDS) as client: + response = await client.post( + target_url, + json=payload, + headers={ + "Content-Type": "application/json", + "User-Agent": _USER_AGENT, + }, + ) + if response.status_code >= 400: + return False, f"Webhook returned HTTP {response.status_code}" + return True, None + except httpx.HTTPError as exc: + return False, str(exc) + + +async def send_email_placeholder( + target_email: str, + payload: Dict[str, Any], +) -> tuple[bool, Optional[str]]: + """Placeholder email channel — logs intent without sending mail yet.""" + logger.info( + "Email notification placeholder: target=%s finding_id=%s (delivery not implemented)", + target_email, + payload.get("finding", {}).get("id"), + ) + return True, None + + +async def deliver_via_rule( + db: Database, + rule: Dict[str, Any], + finding: Dict[str, Any], +) -> DeliveryResult: + """Attempt delivery for one rule/finding pair.""" + rule_id = str(rule["id"]) + finding_id = str(finding["id"]) + + if not bool(rule.get("is_active")): + return DeliveryResult( + rule_id=rule_id, + finding_id=finding_id, + status=NotificationDeliveryStatus.FAILED, + skipped=True, + error_message="Rule is inactive", + ) + + if not severity_meets_threshold( + str(finding.get("severity", "info")), + str(rule.get("severity_threshold", "info")), + ): + return DeliveryResult( + rule_id=rule_id, + finding_id=finding_id, + status=NotificationDeliveryStatus.FAILED, + skipped=True, + error_message="Finding severity below rule threshold", + ) + + if await was_already_delivered(db, rule_id, finding_id): + return DeliveryResult( + rule_id=rule_id, + finding_id=finding_id, + status=NotificationDeliveryStatus.SUCCESS, + skipped=True, + error_message="Already delivered", + ) + + payload = build_alert_payload(finding, rule) + channel = str(rule.get("channel_type", "")).lower() + target = str(rule.get("target_url_or_email", "")) + + if channel == NotificationChannelType.WEBHOOK.value: + ok, error = await send_webhook(target, payload) + elif channel == NotificationChannelType.EMAIL.value: + ok, error = await send_email_placeholder(target, payload) + else: + ok, error = False, f"Unsupported channel type: {channel}" + + status = ( + NotificationDeliveryStatus.SUCCESS if ok else NotificationDeliveryStatus.FAILED + ) + await record_delivery(db, rule_id, finding_id, status, error) + + return DeliveryResult( + rule_id=rule_id, + finding_id=finding_id, + status=status, + error_message=error, + ) + + +async def process_finding_notifications( + db: Database, + finding_id: str, +) -> List[DeliveryResult]: + """Evaluate all active rules against one finding and attempt delivery.""" + finding = await db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,)) + if not finding: + return [] + + rules = await db.fetchall( + "SELECT * FROM notification_rules WHERE is_active = 1 ORDER BY created_at ASC" + ) + results: List[DeliveryResult] = [] + for rule in rules: + results.append(await deliver_via_rule(db, rule, finding)) + return results + + +async def process_task_notifications( + db: Database, + task_id: str, +) -> List[DeliveryResult]: + """Evaluate notifications for every finding produced by a task.""" + findings = await db.fetchall( + "SELECT id FROM findings WHERE task_id = ? ORDER BY discovered_at ASC", + (task_id,), + ) + results: List[DeliveryResult] = [] + for row in findings: + results.extend(await process_finding_notifications(db, str(row["id"]))) + return results diff --git a/testing/backend/unit/test_notification_service.py b/testing/backend/unit/test_notification_service.py new file mode 100644 index 00000000..2c94789c --- /dev/null +++ b/testing/backend/unit/test_notification_service.py @@ -0,0 +1,212 @@ +import json +import uuid +from unittest.mock import AsyncMock, patch + +import pytest +import pytest_asyncio + +from backend.secuscan import database as database_module +from backend.secuscan.config import settings +from backend.secuscan.database import init_db +from backend.secuscan.models import ( + NotificationChannelType, + NotificationDeliveryStatus, + NotificationSeverityThreshold, +) +from backend.secuscan.notification_service import ( + build_alert_payload, + deliver_via_rule, + process_finding_notifications, + severity_meets_threshold, + was_already_delivered, +) +from backend.secuscan.redaction import REDACTED + + +@pytest_asyncio.fixture +async def test_db(setup_test_environment): + db = await init_db(settings.database_path) + yield db + if database_module.db is not None: + await database_module.db.disconnect() + database_module.db = None + + +async def _seed_finding( + db, + *, + severity: str = "critical", + description: str = "Open port on target", +) -> tuple[str, str]: + task_id = str(uuid.uuid4()) + finding_id = str(uuid.uuid4()) + await db.execute( + """ + INSERT INTO tasks ( + id, plugin_id, tool_name, target, status, inputs_json, consent_granted + ) VALUES (?, 'nmap', 'nmap', '127.0.0.1', 'completed', '{}', 1) + """, + (task_id,), + ) + await db.execute( + """ + INSERT INTO findings ( + id, task_id, plugin_id, title, category, severity, target, description, remediation + ) VALUES (?, ?, 'nmap', 'Open port', 'network', ?, '127.0.0.1', ?, 'fix') + """, + (finding_id, task_id, severity, description), + ) + return task_id, finding_id + + +async def _seed_rule( + db, + *, + severity_threshold: str = NotificationSeverityThreshold.HIGH.value, + channel_type: str = NotificationChannelType.WEBHOOK.value, + target: str = "https://example.com/hook", + is_active: int = 1, +) -> str: + rule_id = str(uuid.uuid4()) + await db.execute( + """ + INSERT INTO notification_rules ( + id, name, severity_threshold, channel_type, target_url_or_email, is_active + ) VALUES (?, 'Test rule', ?, ?, ?, ?) + """, + (rule_id, severity_threshold, channel_type, target, is_active), + ) + return rule_id + + +def test_severity_meets_threshold(): + assert severity_meets_threshold("critical", "high") is True + assert severity_meets_threshold("high", "high") is True + assert severity_meets_threshold("medium", "high") is False + assert severity_meets_threshold("info", "critical") is False + + +@pytest.mark.asyncio +async def test_build_alert_payload_redacts_secrets(): + finding = { + "id": "f1", + "task_id": "t1", + "plugin_id": "nmap", + "title": "Secret leak", + "category": "network", + "severity": "critical", + "target": "127.0.0.1", + "description": "Authorization: Bearer supersecrettoken12345678", + "remediation": "", + "metadata_json": json.dumps({"api_key": "abc123secret"}), + } + rule = { + "id": "r1", + "name": "Alerts", + "severity_threshold": "high", + "channel_type": "webhook", + } + + payload = build_alert_payload(finding, rule) + + assert REDACTED in payload["finding"]["description"] + assert "supersecrettoken12345678" not in payload["finding"]["description"] + assert payload["finding"]["metadata"]["api_key"] == REDACTED + + +@pytest.mark.asyncio +async def test_deliver_via_rule_sends_webhook_and_records_history(test_db): + _, finding_id = await _seed_finding(test_db) + rule_id = await _seed_rule(test_db) + + finding = await test_db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,)) + rule = await test_db.fetchone("SELECT * FROM notification_rules WHERE id = ?", (rule_id,)) + + with patch( + "backend.secuscan.notification_service.send_webhook", + new=AsyncMock(return_value=(True, None)), + ): + result = await deliver_via_rule(test_db, rule, finding) + + assert result.status == NotificationDeliveryStatus.SUCCESS + assert result.skipped is False + assert await was_already_delivered(test_db, rule_id, finding_id) is True + + +@pytest.mark.asyncio +async def test_deliver_via_rule_dedupes_second_attempt(test_db): + _, finding_id = await _seed_finding(test_db) + rule_id = await _seed_rule(test_db) + + finding = await test_db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,)) + rule = await test_db.fetchone("SELECT * FROM notification_rules WHERE id = ?", (rule_id,)) + + mock_send = AsyncMock(return_value=(True, None)) + with patch( + "backend.secuscan.notification_service.send_webhook", + new=mock_send, + ): + first = await deliver_via_rule(test_db, rule, finding) + second = await deliver_via_rule(test_db, rule, finding) + + assert first.status == NotificationDeliveryStatus.SUCCESS + assert second.skipped is True + assert mock_send.await_count == 1 + + +@pytest.mark.asyncio +async def test_deliver_skips_below_threshold(test_db): + _, finding_id = await _seed_finding(test_db, severity="low") + rule_id = await _seed_rule(test_db, severity_threshold="high") + + finding = await test_db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,)) + rule = await test_db.fetchone("SELECT * FROM notification_rules WHERE id = ?", (rule_id,)) + + result = await deliver_via_rule(test_db, rule, finding) + + assert result.skipped is True + row = await test_db.fetchone( + "SELECT * FROM notification_history WHERE rule_id = ? AND finding_id = ?", + (rule_id, finding_id), + ) + assert row is None + + +@pytest.mark.asyncio +async def test_deliver_records_failure_on_webhook_error(test_db): + _, finding_id = await _seed_finding(test_db) + rule_id = await _seed_rule(test_db) + + finding = await test_db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,)) + rule = await test_db.fetchone("SELECT * FROM notification_rules WHERE id = ?", (rule_id,)) + + with patch( + "backend.secuscan.notification_service.send_webhook", + new=AsyncMock(return_value=(False, "connection refused")), + ): + result = await deliver_via_rule(test_db, rule, finding) + + assert result.status == NotificationDeliveryStatus.FAILED + row = await test_db.fetchone( + "SELECT * FROM notification_history WHERE rule_id = ? AND finding_id = ?", + (rule_id, finding_id), + ) + assert row is not None + assert row["status"] == NotificationDeliveryStatus.FAILED.value + assert row["error_message"] == "connection refused" + + +@pytest.mark.asyncio +async def test_email_placeholder_records_success(test_db): + _, finding_id = await _seed_finding(test_db) + rule_id = await _seed_rule( + test_db, + channel_type=NotificationChannelType.EMAIL.value, + target="alerts@example.com", + ) + + results = await process_finding_notifications(test_db, finding_id) + + assert len(results) == 1 + assert results[0].status == NotificationDeliveryStatus.SUCCESS + assert results[0].skipped is False