Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ python-multipart>=0.0.9
xhtml2pdf>=0.2.17
aiosqlite>=0.20.0
python-whois>=0.9.4
httpx>=0.28.1
265 changes: 265 additions & 0 deletions backend/secuscan/notification_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
"""
Notification delivery service for high-severity findings.

Evaluates active rules, deduplicates deliveries, redacts alert payloads,
and records outcomes in notification_history. Webhook delivery is live;
email is a logged placeholder until SMTP is added.
"""

from __future__ import annotations

import json
import logging
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import httpx

from .database import Database
from .models import NotificationChannelType, NotificationDeliveryStatus
from .redaction import redact_dict, redact_inputs

logger = logging.getLogger(__name__)

# Lower rank = more severe. A finding meets the threshold when its rank is
# less than or equal to the rule threshold rank.
_SEVERITY_RANK: Dict[str, int] = {
"critical": 0,
"high": 1,
"medium": 2,
"low": 3,
"info": 4,
}

_WEBHOOK_TIMEOUT_SECONDS = 10.0
_USER_AGENT = "SecuScan-Notifications/1.0"


@dataclass(frozen=True)
class DeliveryResult:
"""Outcome of a single rule delivery attempt for one finding."""

rule_id: str
finding_id: str
status: NotificationDeliveryStatus
skipped: bool = False
error_message: Optional[str] = None


def severity_meets_threshold(finding_severity: str, rule_threshold: str) -> bool:
"""Return True when finding severity is at or above the rule threshold."""
finding_rank = _SEVERITY_RANK.get(str(finding_severity).lower())
threshold_rank = _SEVERITY_RANK.get(str(rule_threshold).lower())
if finding_rank is None or threshold_rank is None:
return False
return finding_rank <= threshold_rank


def build_alert_payload(
finding: Dict[str, Any],
rule: Dict[str, Any],
) -> Dict[str, Any]:
"""Build a redacted JSON-serializable alert payload for outbound channels."""
metadata: Dict[str, Any] = {}
raw_metadata = finding.get("metadata_json")
if raw_metadata:
try:
parsed = json.loads(raw_metadata)
if isinstance(parsed, dict):
metadata = redact_inputs(parsed)
except (TypeError, json.JSONDecodeError):
metadata = {"raw": str(raw_metadata)}

payload = {
"event": "finding.alert",
"rule": {
"id": rule.get("id"),
"name": rule.get("name"),
"severity_threshold": rule.get("severity_threshold"),
"channel_type": rule.get("channel_type"),
},
"finding": {
"id": finding.get("id"),
"task_id": finding.get("task_id"),
"plugin_id": finding.get("plugin_id"),
"title": finding.get("title"),
"category": finding.get("category"),
"severity": finding.get("severity"),
"target": finding.get("target"),
"description": finding.get("description"),
"remediation": finding.get("remediation"),
"metadata": metadata,
},
}
return redact_dict(payload)


async def was_already_delivered(
db: Database,
rule_id: str,
finding_id: str,
) -> bool:
"""Return True when this rule already successfully notified this finding."""
row = await db.fetchone(
"""
SELECT id FROM notification_history
WHERE rule_id = ? AND finding_id = ? AND status = ?
LIMIT 1
""",
(rule_id, finding_id, NotificationDeliveryStatus.SUCCESS.value),
)
return row is not None


async def record_delivery(
db: Database,
rule_id: str,
finding_id: str,
status: NotificationDeliveryStatus,
error_message: Optional[str] = None,
) -> str:
"""Persist a delivery attempt and return the history row id."""
history_id = str(uuid.uuid4())
await db.execute(
"""
INSERT INTO notification_history (id, rule_id, finding_id, status, error_message)
VALUES (?, ?, ?, ?, ?)
""",
(
history_id,
rule_id,
finding_id,
status.value,
error_message,
),
)
return history_id


async def send_webhook(target_url: str, payload: Dict[str, Any]) -> tuple[bool, Optional[str]]:
"""POST a redacted alert payload to a webhook URL."""
try:
async with httpx.AsyncClient(timeout=_WEBHOOK_TIMEOUT_SECONDS) as client:
response = await client.post(
target_url,
json=payload,
headers={
"Content-Type": "application/json",
"User-Agent": _USER_AGENT,
},
)
if response.status_code >= 400:
return False, f"Webhook returned HTTP {response.status_code}"
return True, None
except httpx.HTTPError as exc:
return False, str(exc)


async def send_email_placeholder(
target_email: str,
payload: Dict[str, Any],
) -> tuple[bool, Optional[str]]:
"""Placeholder email channel — logs intent without sending mail yet."""
logger.info(
"Email notification placeholder: target=%s finding_id=%s (delivery not implemented)",
target_email,
payload.get("finding", {}).get("id"),
)
return True, None


async def deliver_via_rule(
db: Database,
rule: Dict[str, Any],
finding: Dict[str, Any],
) -> DeliveryResult:
"""Attempt delivery for one rule/finding pair."""
rule_id = str(rule["id"])
finding_id = str(finding["id"])

if not bool(rule.get("is_active")):
return DeliveryResult(
rule_id=rule_id,
finding_id=finding_id,
status=NotificationDeliveryStatus.FAILED,
skipped=True,
error_message="Rule is inactive",
)

if not severity_meets_threshold(
str(finding.get("severity", "info")),
str(rule.get("severity_threshold", "info")),
):
return DeliveryResult(
rule_id=rule_id,
finding_id=finding_id,
status=NotificationDeliveryStatus.FAILED,
skipped=True,
error_message="Finding severity below rule threshold",
)

if await was_already_delivered(db, rule_id, finding_id):
return DeliveryResult(
rule_id=rule_id,
finding_id=finding_id,
status=NotificationDeliveryStatus.SUCCESS,
skipped=True,
error_message="Already delivered",
)

payload = build_alert_payload(finding, rule)
channel = str(rule.get("channel_type", "")).lower()
target = str(rule.get("target_url_or_email", ""))

if channel == NotificationChannelType.WEBHOOK.value:
ok, error = await send_webhook(target, payload)
elif channel == NotificationChannelType.EMAIL.value:
ok, error = await send_email_placeholder(target, payload)
else:
ok, error = False, f"Unsupported channel type: {channel}"

status = (
NotificationDeliveryStatus.SUCCESS if ok else NotificationDeliveryStatus.FAILED
)
await record_delivery(db, rule_id, finding_id, status, error)

return DeliveryResult(
rule_id=rule_id,
finding_id=finding_id,
status=status,
error_message=error,
)


async def process_finding_notifications(
db: Database,
finding_id: str,
) -> List[DeliveryResult]:
"""Evaluate all active rules against one finding and attempt delivery."""
finding = await db.fetchone("SELECT * FROM findings WHERE id = ?", (finding_id,))
if not finding:
return []

rules = await db.fetchall(
"SELECT * FROM notification_rules WHERE is_active = 1 ORDER BY created_at ASC"
)
results: List[DeliveryResult] = []
for rule in rules:
results.append(await deliver_via_rule(db, rule, finding))
return results


async def process_task_notifications(
db: Database,
task_id: str,
) -> List[DeliveryResult]:
"""Evaluate notifications for every finding produced by a task."""
findings = await db.fetchall(
"SELECT id FROM findings WHERE task_id = ? ORDER BY discovered_at ASC",
(task_id,),
)
results: List[DeliveryResult] = []
for row in findings:
results.extend(await process_finding_notifications(db, str(row["id"])))
return results
Loading
Loading