From d9be9cb8423d0f513a9340bc75cc90979a532879 Mon Sep 17 00:00:00 2001 From: smirk-dev Date: Wed, 3 Jun 2026 10:45:40 +0530 Subject: [PATCH 1/3] feat(db): add owner_id to tasks, findings, and reports Introduce a per-user/per-workspace ownership column on the tasks, findings, and reports tables to close the BOLA gap in issue #401, where any caller could address another user's resources by ID. - auth.resolve_owner_id / get_current_owner derive a stable owner identity from the authenticated-user header (X-User-Id), falling back to a shared DEFAULT_OWNER_ID for single-user deployments. - owner_id columns are added idempotently (NOT NULL DEFAULT 'default') so existing rows are backfilled and single-user clients keep their history. - Migration 003 adds owner indexes and a defensive backfill. Co-Authored-By: Claude Opus 4.8 (1M context) --- backend/secuscan/auth.py | 38 +++++++++++++++++++ backend/secuscan/database.py | 27 +++++++++++++ .../secuscan/migrations/003_add_owner_id.sql | 26 +++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 backend/secuscan/migrations/003_add_owner_id.sql diff --git a/backend/secuscan/auth.py b/backend/secuscan/auth.py index e282a116..343f0e46 100644 --- a/backend/secuscan/auth.py +++ b/backend/secuscan/auth.py @@ -83,3 +83,41 @@ async def require_api_key( def get_api_key() -> str | None: """Return the current API key, or None if not yet initialised.""" return _api_key + + +# ── Per-user / per-workspace ownership ────────────────────────────────────── +# +# SecuScan authenticates the deployment with a single shared API key (above). +# That gate does not, by itself, distinguish between the different users or +# workspaces that share a deployment, which is what allowed any caller to read, +# delete, or export any task/report by guessing its ID (BOLA, issue #401). 
+# +# ``resolve_owner_id`` derives a stable owner identity for the request and is +# persisted as ``owner_id`` on tasks/findings/reports at creation time and +# compared on every read/delete/report access. It deliberately prioritises the +# explicit authenticated-user header (``X-User-Id``) — the same header +# ``resolve_client_identity`` already treats as the authenticated user — so that +# multiple workspaces sharing the deployment API key remain isolated. In a +# production deployment the header is expected to be set by an upstream auth +# proxy / SSO layer; deployments that do not send it fall back to a single +# shared ``DEFAULT_OWNER_ID`` and keep their existing (single-user) behaviour. +# +# This value is duplicated as the SQL column default ('default') in +# database.py — keep the two in sync. +DEFAULT_OWNER_ID = "default" + +_OWNER_HEADER = "x-user-id" + + +def resolve_owner_id(request: Request | None) -> str: + """Resolve the owning user/workspace identity for the current request.""" + if request is not None: + user_id = request.headers.get(_OWNER_HEADER) + if user_id and user_id.strip(): + return f"user:{user_id.strip()}" + return DEFAULT_OWNER_ID + + +async def get_current_owner(request: Request) -> str: + """FastAPI dependency yielding the owner identity for the request.""" + return resolve_owner_id(request) diff --git a/backend/secuscan/database.py b/backend/secuscan/database.py index 69c66b2f..ba36b8fe 100644 --- a/backend/secuscan/database.py +++ b/backend/secuscan/database.py @@ -54,6 +54,7 @@ async def _create_schema(self): """ CREATE TABLE IF NOT EXISTS tasks ( id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL DEFAULT 'default', plugin_id TEXT NOT NULL, tool_name TEXT NOT NULL, target TEXT NOT NULL, @@ -95,6 +96,7 @@ async def _create_schema(self): CREATE TABLE IF NOT EXISTS findings ( id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL DEFAULT 'default', task_id TEXT REFERENCES tasks(id) ON DELETE SET NULL, plugin_id TEXT NOT NULL, title TEXT NOT NULL, @@ 
-113,6 +115,7 @@ async def _create_schema(self): CREATE TABLE IF NOT EXISTS reports ( id TEXT PRIMARY KEY, + owner_id TEXT NOT NULL DEFAULT 'default', task_id TEXT REFERENCES tasks(id) ON DELETE SET NULL, name TEXT NOT NULL, type TEXT NOT NULL DEFAULT 'technical', @@ -200,6 +203,8 @@ async def _create_schema(self): CREATE INDEX IF NOT EXISTS idx_tasks_plugin ON tasks(plugin_id); -- Composite index for dashboard running tasks query CREATE INDEX IF NOT EXISTS idx_tasks_status_created ON tasks(status, created_at DESC); + -- Owner scoping (BOLA prevention, issue #401) + CREATE INDEX IF NOT EXISTS idx_tasks_owner ON tasks(owner_id); -- Findings indexes (new) CREATE INDEX IF NOT EXISTS idx_findings_severity ON findings(severity); @@ -209,11 +214,15 @@ async def _create_schema(self): CREATE INDEX IF NOT EXISTS idx_findings_target ON findings(target); -- Composite index for severity counting by task CREATE INDEX IF NOT EXISTS idx_findings_task_severity ON findings(task_id, severity); + -- Owner scoping (BOLA prevention, issue #401) + CREATE INDEX IF NOT EXISTS idx_findings_owner ON findings(owner_id); -- Reports indexes (new) CREATE INDEX IF NOT EXISTS idx_reports_task_id ON reports(task_id); CREATE INDEX IF NOT EXISTS idx_reports_generated_at ON reports(generated_at DESC); CREATE INDEX IF NOT EXISTS idx_reports_status ON reports(status); + -- Owner scoping (BOLA prevention, issue #401) + CREATE INDEX IF NOT EXISTS idx_reports_owner ON reports(owner_id); -- Audit log indexes (new) CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_log(timestamp DESC); @@ -235,6 +244,10 @@ async def _create_schema(self): existing_cols = {col["name"] for col in tasks_columns} needed_cols = { + # Per-user ownership for BOLA prevention (issue #401). NOT NULL with a + # constant default backfills every existing row to the shared default + # owner, preserving single-user deployments' access to their history. 
+ "owner_id": "TEXT NOT NULL DEFAULT 'default'", "exit_code": "INTEGER", "scan_phase": "TEXT", "structured_json": "TEXT", @@ -272,6 +285,8 @@ async def _create_schema(self): "asset_exposure": "TEXT", "risk_score": "REAL", "risk_factors_json": "TEXT NOT NULL DEFAULT '[]'", + # Per-user ownership for BOLA prevention (issue #401). + "owner_id": "TEXT NOT NULL DEFAULT 'default'", } for col_name, col_type in risk_cols.items(): if col_name not in existing_finding_cols: @@ -281,6 +296,18 @@ async def _create_schema(self): except Exception as e: print(f"Failed to add column {col_name}: {e}") + # Reports table migration: ensure owner_id exists (issue #401) + reports_columns = await self.fetchall("PRAGMA table_info(reports)") + existing_report_cols = {col["name"] for col in reports_columns} + if "owner_id" not in existing_report_cols: + try: + await self.execute( + "ALTER TABLE reports ADD COLUMN owner_id TEXT NOT NULL DEFAULT 'default'" + ) + print("Added missing column 'owner_id' to reports table.") + except Exception as e: + print(f"Failed to add 'owner_id' to reports: {e}") + async def _run_migrations(self): migrations_dir = Path(__file__).parent / "migrations" diff --git a/backend/secuscan/migrations/003_add_owner_id.sql b/backend/secuscan/migrations/003_add_owner_id.sql new file mode 100644 index 00000000..283e7e17 --- /dev/null +++ b/backend/secuscan/migrations/003_add_owner_id.sql @@ -0,0 +1,26 @@ +-- Migration: 003_add_owner_id +-- Introduces per-user / per-workspace ownership for tasks, findings, and +-- reports to close the Broken Object Level Authorization (BOLA) gap where any +-- caller could read, delete, or export any task/report by guessing its ID +-- (issue #401). +-- +-- The owner_id columns themselves are added idempotently in database.py +-- (_create_schema), using PRAGMA table_info checks so re-running startup is +-- safe — SQLite has no "ALTER TABLE ... ADD COLUMN IF NOT EXISTS". 
This file +-- only contains statements that are safe to re-run on every startup: +-- +-- 1. A defensive backfill of any NULL owner_id to the shared default owner. +-- (New columns are added as NOT NULL DEFAULT 'default', so existing rows +-- are already backfilled; this guards against rows created by an older +-- build that may have added the column as nullable.) +-- 2. Indexes used to keep owner-scoped list queries fast. +-- +-- Keep the 'default' literal in sync with auth.DEFAULT_OWNER_ID. + +UPDATE tasks SET owner_id = 'default' WHERE owner_id IS NULL; +UPDATE findings SET owner_id = 'default' WHERE owner_id IS NULL; +UPDATE reports SET owner_id = 'default' WHERE owner_id IS NULL; + +CREATE INDEX IF NOT EXISTS idx_tasks_owner ON tasks(owner_id); +CREATE INDEX IF NOT EXISTS idx_findings_owner ON findings(owner_id); +CREATE INDEX IF NOT EXISTS idx_reports_owner ON reports(owner_id); From fc512ddb3d2b9e8d8e51b5e15ee4f0abeb5bb471 Mon Sep 17 00:00:00 2001 From: smirk-dev Date: Wed, 3 Jun 2026 10:45:51 +0530 Subject: [PATCH 2/3] feat(api): enforce per-user ownership on task/report/finding APIs Populate owner_id at creation time and scope every read, list, delete, and report/export endpoint to the requesting owner so cross-user access is impossible (BOLA, issue #401). - executor.create_task accepts owner_id; findings and reports inherit the owning task's owner_id on every execution path (manual, modular scanner, workflow, scheduler). - task/report/finding GET/stream/cancel endpoints return 403 on owner mismatch; single delete stays idempotent for missing tasks but 403s on a foreign-owned task. - list/aggregate endpoints (tasks, findings, reports, dashboard, attack-surface, assets) filter by owner_id and namespace their caches by owner so one user's data is never listed or served to another. - bulk delete and clear operate only on the caller's own tasks. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- backend/secuscan/executor.py | 46 ++++--- backend/secuscan/routes.py | 249 ++++++++++++++++++++++++----------- 2 files changed, 200 insertions(+), 95 deletions(-) diff --git a/backend/secuscan/executor.py b/backend/secuscan/executor.py index 7cb294f9..6760f501 100644 --- a/backend/secuscan/executor.py +++ b/backend/secuscan/executor.py @@ -13,6 +13,7 @@ import logging import re +from .auth import DEFAULT_OWNER_ID from .redaction import redact from .cache import get_cache from .config import settings @@ -145,17 +146,22 @@ async def create_task( inputs: Dict[str, Any], safe_mode: bool, preset: Optional[str] = None, - consent_granted: bool = False + consent_granted: bool = False, + owner_id: str = DEFAULT_OWNER_ID, ) -> str: """ Create a new scan task. - + Args: plugin_id: Plugin identifier inputs: User input values preset: Optional preset name consent_granted: Whether user granted consent - + owner_id: Owning user/workspace identity used to scope later + access (issue #401). Defaults to the shared default owner for + internal callers (workflows, scheduler, CLI) that are not tied + to a request. + Returns: Task ID """ @@ -177,12 +183,13 @@ async def create_task( await db.execute( """ INSERT INTO tasks ( - id, plugin_id, tool_name, target, inputs_json, preset, + id, owner_id, plugin_id, tool_name, target, inputs_json, preset, status, scan_phase, consent_granted, safe_mode - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( task_id, + owner_id, plugin_id, plugin.name, extract_target(inputs), @@ -261,13 +268,14 @@ async def execute_task(self, task_id: str): # Get task details task_row = await db.fetchone( - "SELECT plugin_id, inputs_json, safe_mode FROM tasks WHERE id = ?", + "SELECT owner_id, plugin_id, inputs_json, safe_mode FROM tasks WHERE id = ?", (task_id,) ) if not task_row: raise ValueError(f"Task not found: {task_id}") + owner_id = task_row["owner_id"] plugin_id = task_row["plugin_id"] inputs = json.loads(task_row["inputs_json"]) safe_mode = bool(task_row["safe_mode"]) @@ -383,6 +391,7 @@ async def execute_task(self, task_id: str): await self._upsert_findings_and_report_from_scanner( db=db, task_id=task_id, + owner_id=owner_id, scanner=scanner, plugin_id=plugin_id, target=target, @@ -520,6 +529,7 @@ async def execute_task(self, task_id: str): await self._upsert_findings_and_report( db=db, task_id=task_id, + owner_id=owner_id, plugin=plugin, plugin_id=plugin_id, target=target, @@ -850,7 +860,7 @@ async def get_task_status(self, task_id: str) -> Optional[Dict]: "pending_count": pending_count, } - async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: str, target: str, status: str, output: str = ""): + async def _upsert_findings_and_report(self, db, task_id: str, owner_id: str, plugin, plugin_id: str, target: str, status: str, output: str = ""): """Persist derived findings and report records into SQLite.""" parsed = self._parse_results(plugin, output) findings_data = parsed.get("findings", []) @@ -890,16 +900,17 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: await db.execute( """ INSERT INTO findings ( - id, task_id, plugin_id, title, category, severity, + id, owner_id, task_id, plugin_id, title, category, severity, target, description, remediation, proof, cvss, cve, metadata_json, discovered_at, exploitability, confidence, asset_exposure, risk_score, risk_factors_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( finding_id, + owner_id, task_id, plugin_id, finding["title"], @@ -924,8 +935,8 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: await db.execute( """ INSERT INTO reports ( - id, task_id, name, type, generated_at, status, findings, pages - ) VALUES (?, ?, ?, ?, (datetime('now')), ?, ?, ?) + id, owner_id, task_id, name, type, generated_at, status, findings, pages + ) VALUES (?, ?, ?, ?, ?, (datetime('now')), ?, ?, ?) ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status, findings = EXCLUDED.findings, @@ -933,6 +944,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: """, ( f"report:{task_id}", + owner_id, task_id, f"{plugin.name} Report", "technical", @@ -942,7 +954,7 @@ async def _upsert_findings_and_report(self, db, task_id: str, plugin, plugin_id: ), ) - async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]): + async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, owner_id: str, scanner: Any, plugin_id: str, target: str, status: str, result: Dict[str, Any]): """Persist modular scanner results into findings, and reports.""" findings_data = result.get("findings", []) @@ -975,16 +987,17 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann await db.execute( """ INSERT INTO findings ( - id, task_id, plugin_id, title, category, severity, + id, owner_id, task_id, plugin_id, title, category, severity, target, description, remediation, proof, cvss, cve, metadata_json, discovered_at, exploitability, confidence, asset_exposure, risk_score, risk_factors_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
""", ( finding_id, + owner_id, task_id, plugin_id, finding["title"], @@ -1010,8 +1023,8 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann await db.execute( """ INSERT INTO reports ( - id, task_id, name, type, generated_at, status, findings, pages - ) VALUES (?, ?, ?, ?, (datetime('now')), ?, ?, ?) + id, owner_id, task_id, name, type, generated_at, status, findings, pages + ) VALUES (?, ?, ?, ?, ?, (datetime('now')), ?, ?, ?) ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status, findings = EXCLUDED.findings, @@ -1019,6 +1032,7 @@ async def _upsert_findings_and_report_from_scanner(self, db, task_id: str, scann """, ( f"report:{task_id}", + owner_id, task_id, f"{scanner.name} Report", "professional" if status == TaskStatus.COMPLETED.value else "failed", diff --git a/backend/secuscan/routes.py b/backend/secuscan/routes.py index 0ffc74a5..902cef1d 100644 --- a/backend/secuscan/routes.py +++ b/backend/secuscan/routes.py @@ -9,7 +9,6 @@ import logging import re import os -import shutil import uuid import asyncio from pathlib import Path @@ -126,7 +125,7 @@ def build_report_filename(task: Dict[str, Any], extension: str) -> str: from .reporting import reporting from .vault import VaultCrypto from .workflows import scheduler -from .auth import require_api_key +from .auth import require_api_key, get_current_owner from sse_starlette.sse import EventSourceResponse @@ -194,6 +193,21 @@ async def invalidate_view_cache(): await cache.delete_prefix(prefix) +async def require_owned_task(db, task_id: str, owner: str, columns: str = "owner_id") -> Dict[str, Any]: + """Fetch a task and enforce that it belongs to ``owner`` (issue #401). + + Returns the selected row on success. Raises 404 when the task does not + exist and 403 when it is owned by a different user/workspace. ``columns`` + must include ``owner_id`` so the ownership comparison can be made. 
+ """ + row = await db.fetchone(f"SELECT {columns} FROM tasks WHERE id = ?", (task_id,)) + if row is None: + raise HTTPException(status_code=404, detail="Task not found") + if row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + return row + + def iter_raw_output_chunks(path: str, chunk_size: int = SSE_RAW_OUTPUT_CHUNK_SIZE): """Yield raw output in bounded chunks for completed-task SSE replay.""" with open(path, "r", encoding="utf-8", errors="replace") as output_file: @@ -298,6 +312,7 @@ async def start_task( request: TaskCreateRequest, background_tasks: BackgroundTasks, raw_request: Request, + owner: str = Depends(get_current_owner), ): """ Start a new scan task. @@ -388,6 +403,7 @@ async def start_task( safe_mode=safe_mode, preset=request.preset, consent_granted=request.consent_granted, + owner_id=owner, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) from e @@ -418,8 +434,11 @@ async def start_task( } @router.get("/task/{task_id}/status") -async def get_task_status(task_id: str): +async def get_task_status(task_id: str, owner: str = Depends(get_current_owner)): """Get task status""" + db = await get_db() + await require_owned_task(db, task_id, owner) + status = await executor.get_task_status(task_id) if not status: @@ -428,10 +447,13 @@ async def get_task_status(task_id: str): return status @router.get("/task/{task_id}/stream") -async def stream_task_output(task_id: str): +async def stream_task_output(task_id: str, owner: str = Depends(get_current_owner)): """Stream task output via Server-Sent Events (SSE)""" import asyncio + db = await get_db() + await require_owned_task(db, task_id, owner) + status = await executor.get_task_status(task_id) if not status: raise HTTPException(status_code=404, detail="Task not found") @@ -490,17 +512,20 @@ async def event_generator(): return EventSourceResponse(event_generator()) @router.get("/task/{task_id}/report/csv",
dependencies=[Depends(report_download_limiter)]) -async def download_csv_report(task_id: str): +async def download_csv_report(task_id: str, owner: str = Depends(get_current_owner)): """Download task results as a CSV report.""" db = await get_db() task_row = await db.fetchone( - "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", + "SELECT id, owner_id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", (task_id,) ) if not task_row: raise HTTPException(status_code=404, detail="Task not found") + if task_row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + if task_row["status"] not in ["completed", "failed"]: raise HTTPException(status_code=400, detail="Task is not finished yet") @@ -525,17 +550,20 @@ async def download_csv_report(task_id: str): ) @router.get("/task/{task_id}/report/html", dependencies=[Depends(report_download_limiter)]) -async def download_html_report(task_id: str): +async def download_html_report(task_id: str, owner: str = Depends(get_current_owner)): """Download task results as an HTML report.""" db = await get_db() task_row = await db.fetchone( - "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", + "SELECT id, owner_id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", (task_id,) ) if not task_row: raise HTTPException(status_code=404, detail="Task not found") + if task_row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + if task_row["status"] not in ["completed", "failed"]: raise HTTPException(status_code=400, detail="Task is not finished yet") @@ -560,17 +588,20 @@ async def download_html_report(task_id: 
str): ) @router.get("/task/{task_id}/report/pdf", dependencies=[Depends(report_download_limiter)]) -async def download_pdf_report(task_id: str): +async def download_pdf_report(task_id: str, owner: str = Depends(get_current_owner)): """Download task results as a PDF report.""" db = await get_db() task_row = await db.fetchone( - "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", + "SELECT id, owner_id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", (task_id,) ) if not task_row: raise HTTPException(status_code=404, detail="Task not found") + if task_row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + if task_row["status"] not in ["completed", "failed"]: raise HTTPException(status_code=400, detail="Task is not finished yet") @@ -596,17 +627,20 @@ async def download_pdf_report(task_id: str): @router.get("/task/{task_id}/report/sarif", dependencies=[Depends(report_download_limiter)]) -async def download_sarif_report(task_id: str): +async def download_sarif_report(task_id: str, owner: str = Depends(get_current_owner)): """Download task results as a SARIF report.""" db = await get_db() task_row = await db.fetchone( - "SELECT id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", + "SELECT id, owner_id, plugin_id, tool_name, target, status, created_at, preset, inputs_json, command_used, structured_json FROM tasks WHERE id = ?", (task_id,) ) if not task_row: raise HTTPException(status_code=404, detail="Task not found") + if task_row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + if task_row["status"] not in ["completed", "failed"]: raise HTTPException(status_code=400, detail="Task is not finished yet") @@ -632,13 
+666,13 @@ async def download_sarif_report(task_id: str): @router.get("/task/{task_id}/result") -async def get_task_result(task_id: str): +async def get_task_result(task_id: str, owner: str = Depends(get_current_owner)): """Get task execution result""" db = await get_db() task_row = await db.fetchone( """ - SELECT id, plugin_id, tool_name, target, status, + SELECT id, owner_id, plugin_id, tool_name, target, status, created_at, duration_seconds, structured_json, preset, inputs_json, raw_output_path, command_used, error_message, exit_code FROM tasks WHERE id = ? @@ -649,6 +683,9 @@ async def get_task_result(task_id: str): if not task_row: raise HTTPException(status_code=404, detail="Task not found") + if task_row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + structured = {} if task_row["structured_json"]: try: @@ -718,8 +755,11 @@ async def get_task_result(task_id: str): @router.post("/task/{task_id}/cancel") -async def cancel_task(task_id: str): +async def cancel_task(task_id: str, owner: str = Depends(get_current_owner)): """Cancel a running task""" + db = await get_db() + await require_owned_task(db, task_id, owner) + cancelled = await executor.cancel_task(task_id) if not cancelled: @@ -733,20 +773,24 @@ async def cancel_task(task_id: str): @router.get("/dashboard/summary", dependencies=[Depends(read_heavy_limiter)]) -async def get_dashboard_summary(): - """Return aggregate dashboard data from the primary store, cached in Redis.""" +async def get_dashboard_summary(owner: str = Depends(get_current_owner)): + """Return the caller's aggregate dashboard data, cached per owner.""" async def build(): db = await get_db() # Get data - # Push severity aggregation to DB — avoids full table scan in Python + # Push severity aggregation to DB — avoids full table scan in Python. 
+ # Every aggregate below is scoped to the caller so the dashboard never + # surfaces another user/workspace's tasks or findings (issue #401). severity_rows = await db.fetchall( """ SELECT severity, COUNT(*) AS cnt FROM findings + WHERE owner_id = ? GROUP BY severity - """ + """, + (owner,), ) severity_counts = {row["severity"]: row["cnt"] for row in severity_rows} @@ -757,10 +801,14 @@ async def build(): COUNT(*) FILTER (WHERE status = 'completed') AS completed, COUNT(*) FILTER (WHERE status = 'running') AS running FROM tasks - """ + WHERE owner_id = ? + """, + (owner,), ) - total_findings_row = await db.fetchone("SELECT COUNT(*) AS total FROM findings") + total_findings_row = await db.fetchone( + "SELECT COUNT(*) AS total FROM findings WHERE owner_id = ?", (owner,) + ) total_findings = total_findings_row["total"] if total_findings_row else 0 critical_findings: int = severity_counts.get("critical", 0) @@ -776,9 +824,11 @@ async def build(): remediation, proof, cvss, cve, discovered_at, risk_score, risk_factors_json, metadata_json FROM findings + WHERE owner_id = ? ORDER BY discovered_at DESC LIMIT 5 - """ + """, + (owner,), ) recent_findings: List[Dict] = parse_json_fields(recent_rows, ["metadata_json"]) @@ -805,47 +855,57 @@ async def build(): }, "running_tasks": parse_json_fields( await db.fetchall( - "SELECT id, plugin_id, tool_name, target, status, created_at FROM tasks WHERE status = 'running' ORDER BY created_at DESC LIMIT 5" + "SELECT id, plugin_id, tool_name, target, status, created_at FROM tasks WHERE owner_id = ? AND status = 'running' ORDER BY created_at DESC LIMIT 5", + (owner,), ), [] ), "recent_tasks": parse_json_fields( await db.fetchall( - "SELECT id, plugin_id, tool_name, target, status, created_at, duration_seconds FROM tasks ORDER BY created_at DESC LIMIT 5" + "SELECT id, plugin_id, tool_name, target, status, created_at, duration_seconds FROM tasks WHERE owner_id = ? 
ORDER BY created_at DESC LIMIT 5", + (owner,), ), [] ) } - return await get_or_set_cached("summary:dashboard", build) + return await get_or_set_cached(f"summary:dashboard:{owner}", build) @router.get("/findings", dependencies=[Depends(read_heavy_limiter)]) -async def get_findings(): - """Return vulnerability findings.""" +async def get_findings(owner: str = Depends(get_current_owner)): + """Return the caller's vulnerability findings.""" async def build(): db = await get_db() - rows = await db.fetchall("SELECT * FROM findings ORDER BY discovered_at DESC") + rows = await db.fetchall( + "SELECT * FROM findings WHERE owner_id = ? ORDER BY discovered_at DESC", + (owner,), + ) findings = parse_json_fields(rows, ["metadata_json", "risk_factors_json"]) for f in findings: if "risk_factors_json" in f: f["risk_factors"] = f.pop("risk_factors_json") return {"findings": findings} - return await get_or_set_cached("findings:list", build) + # Cache key is namespaced by owner so one user's list is never served to + # another (issue #401). + return await get_or_set_cached(f"findings:list:{owner}", build) @router.get("/reports", dependencies=[Depends(read_heavy_limiter)]) -async def get_reports(): - """Return generated reports.""" +async def get_reports(owner: str = Depends(get_current_owner)): + """Return the caller's generated reports.""" async def build(): db = await get_db() - rows = await db.fetchall("SELECT * FROM reports ORDER BY generated_at DESC") + rows = await db.fetchall( + "SELECT * FROM reports WHERE owner_id = ? 
ORDER BY generated_at DESC", + (owner,), + ) return {"reports": parse_json_fields(rows, ["metadata_json"])} - return await get_or_set_cached("reports:list", build) + return await get_or_set_cached(f"reports:list:{owner}", build) @router.get("/tasks", dependencies=[Depends(read_heavy_limiter)]) @@ -853,16 +913,18 @@ async def list_tasks( page: int = Query(1, ge=1), per_page: int = Query(25, ge=1, le=100), plugin_id: Optional[str] = None, - status: Optional[str] = None + status: Optional[str] = None, + owner: str = Depends(get_current_owner), ): - """List all tasks with pagination""" + """List the caller's tasks with pagination""" db = await get_db() - # Build query + # Build query — always scoped to the caller so listing can never enumerate + # another user/workspace's tasks (issue #401). query = "SELECT id, plugin_id, tool_name, target, status, created_at, duration_seconds, inputs_json, preset, error_message, exit_code FROM tasks" - params = [] + params = [owner] - where_clauses = [] + where_clauses = ["owner_id = ?"] if plugin_id: where_clauses.append("plugin_id = ?") params.append(plugin_id) @@ -870,8 +932,7 @@ async def list_tasks( where_clauses.append("status = ?") params.append(status) - if where_clauses: - query += " WHERE " + " AND ".join(where_clauses) + query += " WHERE " + " AND ".join(where_clauses) query += " ORDER BY created_at DESC LIMIT ? OFFSET ?" 
params.extend([per_page, (page - 1) * per_page]) @@ -969,10 +1030,17 @@ async def delete_task_records(task_ids: List[str]): logger.error(f"Failed to delete raw output file {row['raw_output_path']}: {e}") @router.delete("/task/{task_id}") -async def delete_task(task_id: str): +async def delete_task(task_id: str, owner: str = Depends(get_current_owner)): """Delete a task and its associated data (findings, reports, audit logs, and files)""" db = await get_db() + # Deleting a non-existent task stays idempotent (200, deletes zero rows), + # but a task owned by another user/workspace is rejected with 403 so it + # cannot be deleted across owners (issue #401). + existing = await db.fetchone("SELECT owner_id FROM tasks WHERE id = ?", (task_id,)) + if existing is not None and existing["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this task") + # Check if task is running status = await executor.get_task_status(task_id) if status and status.get("status") == "running": @@ -988,7 +1056,7 @@ async def delete_task(task_id: str): @router.delete("/tasks/bulk") -async def bulk_delete_tasks(request: BulkDeleteRequest): +async def bulk_delete_tasks(request: BulkDeleteRequest, owner: str = Depends(get_current_owner)): """Delete multiple tasks at once (max 500 IDs per request)""" task_ids = request.root # RootModel exposes data via .root db = await get_db() @@ -997,58 +1065,65 @@ async def bulk_delete_tasks(request: BulkDeleteRequest): if not task_ids: return {"deleted_count": 0, "success": True} - # Check running tasks — safe: len(task_ids) <= 500 guaranteed by Pydantic + # Scope to tasks owned by the caller. IDs owned by another user/workspace + # are silently ignored so cross-user enumeration and deletion are + # impossible (issue #401). len(task_ids) <= 500 guaranteed by Pydantic. 
placeholders = ",".join(["?"] * len(task_ids)) + owned_rows = await db.fetchall( + f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND owner_id = ?", + tuple(task_ids) + (owner,), + ) + owned_ids = [row["id"] for row in owned_rows] + if not owned_ids: + return {"deleted_count": 0, "success": True} + + # Check running tasks among the caller's own tasks + placeholders = ",".join(["?"] * len(owned_ids)) running_tasks = await db.fetchone( f"SELECT id FROM tasks WHERE id IN ({placeholders}) AND status = 'running' LIMIT 1", - tuple(task_ids) + tuple(owned_ids) ) if running_tasks: raise HTTPException(status_code=400, detail="Cannot delete running tasks. Abort them first.") # If the task is currently executing but the DB hasn't been updated yet, fail closed. - if any(tid in executor.running_tasks for tid in task_ids): + if any(tid in executor.running_tasks for tid in owned_ids): raise HTTPException(status_code=400, detail="Cannot delete running tasks. Abort them first.") - await delete_task_records(task_ids) + await delete_task_records(owned_ids) await invalidate_view_cache() return { - "deleted_count": len(task_ids), + "deleted_count": len(owned_ids), "success": True } @router.delete("/tasks/clear") -async def clear_all_tasks(): - """Wipe all scan history and associated data (findings, reports, assets, attack surface)""" +async def clear_all_tasks(owner: str = Depends(get_current_owner)): + """Wipe the caller's scan history and associated data (findings, reports). + + Scoped to the requesting user/workspace so one owner cannot purge another + owner's history (issue #401). + """ db = await get_db() - # Prevent clearing if any tasks are running - running_tasks = await db.fetchone("SELECT id FROM tasks WHERE status = 'running' LIMIT 1") + # Prevent clearing if any of the caller's tasks are running + running_tasks = await db.fetchone( + "SELECT id FROM tasks WHERE owner_id = ? 
AND status = 'running' LIMIT 1", + (owner,), + ) if running_tasks: raise HTTPException(status_code=400, detail="Cannot clear history while tasks are running.") - # Get all task IDs to cleanup files - all_tasks = await db.fetchall("SELECT id FROM tasks") - task_ids = [t["id"] for t in all_tasks] + # Get the caller's task IDs to delete records and cleanup files + own_tasks = await db.fetchall("SELECT id FROM tasks WHERE owner_id = ?", (owner,)) + task_ids = [t["id"] for t in own_tasks] if task_ids: await delete_task_records(task_ids) - # Purge other tables - await db.execute("DELETE FROM findings") - - # Fallback cleanup for any orphaned files in data directories - for subdir in ["raw", "reports"]: - dir_path = Path(settings.data_dir) / subdir - if dir_path.exists(): - for item in dir_path.iterdir(): - try: - if item.is_file(): - item.unlink() - elif item.is_dir(): - shutil.rmtree(item) - except Exception as e: - logger.error(f"Failed to cleanup {item}: {e}") + # Sweep up any of the caller's findings not linked to a task (task_id was + # set NULL by ON DELETE) so nothing of theirs is left behind. 
+ await db.execute("DELETE FROM findings WHERE owner_id = ?", (owner,)) await invalidate_view_cache() @@ -1421,7 +1496,7 @@ async def list_notification_history( @router.get("/finding/{finding_id}") -async def get_finding_details(finding_id: str): +async def get_finding_details(finding_id: str, owner: str = Depends(get_current_owner)): """Get detailed information for a specific finding""" db = await get_db() @@ -1438,6 +1513,9 @@ async def get_finding_details(finding_id: str): if not finding_row: raise HTTPException(status_code=404, detail="Finding not found") + if finding_row["owner_id"] != owner: + raise HTTPException(status_code=403, detail="You do not have access to this finding") + metadata = {} if finding_row["metadata_json"]: try: @@ -1477,13 +1555,19 @@ async def get_finding_details(finding_id: str): @router.get("/attack-surface") -async def get_attack_surface(): - """Return an aggregated view of the monitored attack surface.""" +async def get_attack_surface(owner: str = Depends(get_current_owner)): + """Return an aggregated view of the caller's monitored attack surface.""" db = await get_db() - # We aggregate unique targets from tasks and findings - tasks = await db.fetchall("SELECT DISTINCT target, tool_name, created_at FROM tasks ORDER BY created_at DESC") - findings = await db.fetchall("SELECT DISTINCT target, category, severity, discovered_at FROM findings ORDER BY discovered_at DESC") + # We aggregate unique targets from the caller's own tasks and findings + tasks = await db.fetchall( + "SELECT DISTINCT target, tool_name, created_at FROM tasks WHERE owner_id = ? ORDER BY created_at DESC", + (owner,), + ) + findings = await db.fetchall( + "SELECT DISTINCT target, category, severity, discovered_at FROM findings WHERE owner_id = ? 
ORDER BY discovered_at DESC", + (owner,), + ) entries = [] seen_targets = set() @@ -1522,11 +1606,18 @@ async def get_attack_surface(): @router.get("/assets") -async def get_assets(): - """Return a list of tracked assets.""" +async def get_assets(owner: str = Depends(get_current_owner)): + """Return a list of the caller's tracked assets.""" db = await get_db() - # For now, we use unique targets as assets - rows = await db.fetchall("SELECT DISTINCT target FROM tasks UNION SELECT DISTINCT target FROM findings") + # For now, we use unique targets as assets, scoped to the caller (issue #401) + rows = await db.fetchall( + """ + SELECT DISTINCT target FROM tasks WHERE owner_id = ? + UNION + SELECT DISTINCT target FROM findings WHERE owner_id = ? + """, + (owner, owner), + ) assets = [{"id": str(uuid.uuid4()), "name": row["target"]} for row in rows] return {"assets": assets} From 29486006a9875e6bc8257463c096badf51edfdea Mon Sep 17 00:00:00 2001 From: smirk-dev Date: Wed, 3 Jun 2026 10:45:59 +0530 Subject: [PATCH 3/3] test(auth): add cross-user authorization tests for tasks and reports Verify two distinct users (via X-User-Id) cannot reach each other's data: fetch/stream/cancel/delete/report all return 403 across owners, list and dashboard endpoints never leak another owner's tasks/findings/reports, and bulk delete/clear only touch the caller's own tasks. Also confirm owners retain full access to their own resources and missing-task delete stays idempotent (issue #401). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../integration/test_owner_authorization.py | 260 ++++++++++++++++++ 1 file changed, 260 insertions(+) create mode 100644 testing/backend/integration/test_owner_authorization.py diff --git a/testing/backend/integration/test_owner_authorization.py b/testing/backend/integration/test_owner_authorization.py new file mode 100644 index 00000000..c68a8c00 --- /dev/null +++ b/testing/backend/integration/test_owner_authorization.py @@ -0,0 +1,260 @@ +""" +Integration tests for per-user / per-workspace ownership of tasks, findings, +and reports (issue #401 — Broken Object Level Authorization / BOLA). + +Two distinct users are simulated by sending different ``X-User-Id`` headers on +top of the shared deployment API key (see auth.resolve_owner_id). The tests +assert that User B can never read, list, delete, or export User A's data, while +User A retains full access to their own. +""" + +import sqlite3 +import time + +import pytest + +from backend.secuscan.config import settings + + +ALICE = {"X-User-Id": "alice"} +BOB = {"X-User-Id": "bob"} + +# owner_id values as persisted by auth.resolve_owner_id for the headers above. 
+ALICE_OWNER = "user:alice" +BOB_OWNER = "user:bob" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _seed_task(owner_id: str, task_id: str, *, status: str = "completed") -> None: + """Insert a task row directly with an explicit owner_id.""" + conn = sqlite3.connect(settings.database_path) + try: + conn.execute( + """ + INSERT INTO tasks (id, owner_id, plugin_id, tool_name, target, + status, inputs_json, structured_json, consent_granted) + VALUES (?, ?, 'nmap', 'nmap', '127.0.0.1', ?, '{}', '{"findings": []}', 1) + """, + (task_id, owner_id, status), + ) + conn.commit() + finally: + conn.close() + + +def _seed_finding(owner_id: str, finding_id: str, task_id: str) -> None: + conn = sqlite3.connect(settings.database_path) + try: + conn.execute( + """ + INSERT INTO findings (id, owner_id, task_id, plugin_id, title, category, + severity, target, description, remediation) + VALUES (?, ?, ?, 'nmap', 'Open port', 'network', 'low', '127.0.0.1', 'desc', 'fix') + """, + (finding_id, owner_id, task_id), + ) + conn.commit() + finally: + conn.close() + + +def _seed_report(owner_id: str, report_id: str, task_id: str) -> None: + conn = sqlite3.connect(settings.database_path) + try: + conn.execute( + """ + INSERT INTO reports (id, owner_id, task_id, name, type, status) + VALUES (?, ?, ?, 'report', 'technical', 'ready') + """, + (report_id, owner_id, task_id), + ) + conn.commit() + finally: + conn.close() + + +def _task_owner(task_id: str): + conn = sqlite3.connect(settings.database_path) + try: + cur = conn.execute("SELECT owner_id FROM tasks WHERE id = ?", (task_id,)) + row = cur.fetchone() + return row[0] if row else None + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Creation wiring +# --------------------------------------------------------------------------- + +def 
test_started_task_records_requesting_user_as_owner(test_client): + """A task created via the API is owned by the requesting user.""" + from unittest.mock import patch + + with patch("backend.secuscan.executor.TaskExecutor._execute_command") as mock_exec: + mock_exec.return_value = ("Mocked output", 0) + resp = test_client.post( + "/api/v1/task/start", + json={ + "plugin_id": "http_inspector", + "preset": "quick", + "inputs": {"url": "http://127.0.0.1:8000"}, + "consent_granted": True, + }, + headers=ALICE, + ) + assert resp.status_code == 200, resp.text + task_id = resp.json()["task_id"] + assert _task_owner(task_id) == ALICE_OWNER + + +def test_tasks_created_by_distinct_users_get_distinct_owners(test_client): + """The default (no header) owner is distinct from an explicit user.""" + _seed_task("default", "legacy-task") + _seed_task(ALICE_OWNER, "alice-task") + + # The default/no-header client sees only the legacy task. + resp = test_client.get("/api/v1/tasks") + assert resp.status_code == 200 + ids = {t["task_id"] for t in resp.json()["tasks"]} + assert "legacy-task" in ids + assert "alice-task" not in ids + + +# --------------------------------------------------------------------------- +# Cross-user GET / report / cancel / delete on a single task +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize( + "method,path_tmpl", + [ + ("get", "/api/v1/task/{tid}/status"), + ("get", "/api/v1/task/{tid}/result"), + ("get", "/api/v1/task/{tid}/stream"), + ("get", "/api/v1/task/{tid}/report/csv"), + ("get", "/api/v1/task/{tid}/report/html"), + ("get", "/api/v1/task/{tid}/report/pdf"), + ("get", "/api/v1/task/{tid}/report/sarif"), + ("post", "/api/v1/task/{tid}/cancel"), + ("delete", "/api/v1/task/{tid}"), + ], +) +def test_user_b_cannot_access_user_a_task(test_client, method, path_tmpl): + """Every task-scoped endpoint returns 403 for a non-owner.""" + _seed_task(ALICE_OWNER, "alice-task") + path = 
path_tmpl.format(tid="alice-task") + + resp = getattr(test_client, method)(path, headers=BOB) + assert resp.status_code == 403, f"{method.upper()} {path} -> {resp.status_code}: {resp.text}" + + +def test_user_a_can_access_own_task(test_client): + """The owner retains full access to their own task.""" + _seed_task(ALICE_OWNER, "alice-task") + + assert test_client.get("/api/v1/task/alice-task/status", headers=ALICE).status_code == 200 + assert test_client.get("/api/v1/task/alice-task/result", headers=ALICE).status_code == 200 + + +def test_unknown_task_returns_404_not_403(test_client): + """A genuinely missing task is 404; only ownership mismatch is 403.""" + resp = test_client.get("/api/v1/task/does-not-exist/status", headers=BOB) + assert resp.status_code == 404 + + +# --------------------------------------------------------------------------- +# Listing endpoints must not leak another user's resources +# --------------------------------------------------------------------------- + +def test_task_list_is_scoped_to_owner(test_client): + _seed_task(ALICE_OWNER, "alice-task") + _seed_task(BOB_OWNER, "bob-task") + + alice_ids = {t["task_id"] for t in test_client.get("/api/v1/tasks", headers=ALICE).json()["tasks"]} + bob_ids = {t["task_id"] for t in test_client.get("/api/v1/tasks", headers=BOB).json()["tasks"]} + + assert "alice-task" in alice_ids and "bob-task" not in alice_ids + assert "bob-task" in bob_ids and "alice-task" not in bob_ids + + +def test_findings_list_is_scoped_to_owner(test_client): + _seed_task(ALICE_OWNER, "alice-task") + _seed_task(BOB_OWNER, "bob-task") + _seed_finding(ALICE_OWNER, "alice-finding", "alice-task") + _seed_finding(BOB_OWNER, "bob-finding", "bob-task") + + alice_findings = {f["id"] for f in test_client.get("/api/v1/findings", headers=ALICE).json()["findings"]} + bob_findings = {f["id"] for f in test_client.get("/api/v1/findings", headers=BOB).json()["findings"]} + + assert alice_findings == {"alice-finding"} + assert bob_findings == 
{"bob-finding"} + + +def test_reports_list_is_scoped_to_owner(test_client): + _seed_task(ALICE_OWNER, "alice-task") + _seed_task(BOB_OWNER, "bob-task") + _seed_report(ALICE_OWNER, "report:alice", "alice-task") + _seed_report(BOB_OWNER, "report:bob", "bob-task") + + alice_reports = {r["id"] for r in test_client.get("/api/v1/reports", headers=ALICE).json()["reports"]} + bob_reports = {r["id"] for r in test_client.get("/api/v1/reports", headers=BOB).json()["reports"]} + + assert alice_reports == {"report:alice"} + assert bob_reports == {"report:bob"} + + +def test_finding_detail_blocks_cross_user_access(test_client): + _seed_task(ALICE_OWNER, "alice-task") + _seed_finding(ALICE_OWNER, "alice-finding", "alice-task") + + assert test_client.get("/api/v1/finding/alice-finding", headers=BOB).status_code == 403 + assert test_client.get("/api/v1/finding/alice-finding", headers=ALICE).status_code == 200 + + +# --------------------------------------------------------------------------- +# Bulk delete must only ever touch the caller's own tasks +# --------------------------------------------------------------------------- + +def test_bulk_delete_ignores_other_users_tasks(test_client): + _seed_task(ALICE_OWNER, "alice-task") + + resp = test_client.request("DELETE", "/api/v1/tasks/bulk", json=["alice-task"], headers=BOB) + assert resp.status_code == 200 + assert resp.json()["deleted_count"] == 0 + # Alice's task must still exist. + assert _task_owner("alice-task") == ALICE_OWNER + + +def test_bulk_delete_removes_only_owned_tasks(test_client): + _seed_task(ALICE_OWNER, "alice-task") + _seed_task(BOB_OWNER, "bob-task") + + # Alice attempts to delete both her task and Bob's in one request. 
+ resp = test_client.request( + "DELETE", "/api/v1/tasks/bulk", json=["alice-task", "bob-task"], headers=ALICE + ) + assert resp.status_code == 200 + assert resp.json()["deleted_count"] == 1 + assert _task_owner("alice-task") is None + assert _task_owner("bob-task") == BOB_OWNER + + +def test_clear_only_purges_callers_history(test_client): + _seed_task(ALICE_OWNER, "alice-task") + _seed_task(BOB_OWNER, "bob-task") + + resp = test_client.delete("/api/v1/tasks/clear", headers=ALICE) + assert resp.status_code == 200 + assert _task_owner("alice-task") is None + assert _task_owner("bob-task") == BOB_OWNER + + +def test_owner_can_delete_own_task(test_client): + _seed_task(ALICE_OWNER, "alice-task", status="completed") + + resp = test_client.delete("/api/v1/task/alice-task", headers=ALICE) + assert resp.status_code == 200 + assert _task_owner("alice-task") is None