diff --git a/services/dashboard-api/app/main.py b/services/dashboard-api/app/main.py index cdc0db0..490ebda 100644 --- a/services/dashboard-api/app/main.py +++ b/services/dashboard-api/app/main.py @@ -1,6 +1,9 @@ import os import json import asyncio +import sqlite3 +import threading +from datetime import datetime, timezone from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import Response from fastapi.middleware.cors import CORSMiddleware @@ -11,6 +14,8 @@ KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092") LLM_SERVICE_URL = os.getenv("LLM_SERVICE_URL", "http://localhost:8004") GRAPH_SERVICE_URL = os.getenv("GRAPH_SERVICE_URL", "http://localhost:8002") +DB_PATH = os.getenv("DASHBOARD_DB_PATH", "/tmp/fundguard_dashboard.db") +MIN_ELAPSED_MINUTES = 1.0 / 60.0 app = FastAPI(title="Dashboard API") @@ -23,9 +28,86 @@ ) clients = set() +db_lock = threading.Lock() + + +def _db_connection(): + conn = sqlite3.connect(DB_PATH) + conn.row_factory = sqlite3.Row + return conn + + +def init_db(): + with db_lock: + conn = _db_connection() + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS risk_scores ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + transaction_id TEXT NOT NULL, + unified_score REAL NOT NULL, + decision TEXT NOT NULL, + edge_score REAL NOT NULL DEFAULT 0.0, + graph_score REAL NOT NULL DEFAULT 0.0, + rule_score REAL NOT NULL DEFAULT 0.0, + created_at TEXT NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS cases ( + id TEXT PRIMARY KEY, + transaction_id TEXT NOT NULL, + risk_score REAL NOT NULL, + status TEXT NOT NULL, + created_at TEXT NOT NULL + ) + """ + ) + conn.commit() + finally: + conn.close() + + +def persist_risk_score(payload: dict): + txn_id = payload.get("transaction_id") or f"TXN-{int(datetime.now(tz=timezone.utc).timestamp() * 1000)}" + decision = str(payload.get("decision") or "APPROVE") + unified_score = float(payload.get("unified_score") or 0.0) + components = payload.get("components") or {} + edge_score = float(components.get("edge_score") or 0.0) + graph_score = float(components.get("graph_score") or 0.0) + rule_score = float(components.get("rule_score") or 0.0) + created_at = datetime.now(tz=timezone.utc).isoformat() + + with db_lock: + conn = _db_connection() + try: + conn.execute( + """ + INSERT INTO risk_scores ( + transaction_id, unified_score, decision, edge_score, graph_score, rule_score, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?) + """, + (txn_id, unified_score, decision, edge_score, graph_score, rule_score, created_at), + ) + if decision != "APPROVE": + case_id = f"CASE-{txn_id}" + conn.execute( + """ + INSERT OR IGNORE INTO cases (id, transaction_id, risk_score, status, created_at) + VALUES (?, ?, ?, 'Open', ?) + """, + (case_id, txn_id, unified_score * 100, created_at), + ) + conn.commit() + finally: + conn.close() @app.on_event("startup") async def startup_event(): + init_db() asyncio.create_task(consume_risk_scores()) async def consume_risk_scores(): @@ -41,6 +123,7 @@ async def consume_risk_scores(): try: async for msg in consumer: payload = msg.value + persist_risk_score(payload) for ws in list(clients): try: await ws.send_json(payload) @@ -63,6 +146,75 @@ async def websocket_endpoint(websocket: WebSocket): except WebSocketDisconnect: clients.remove(websocket) + +@app.get("/api/recent-alerts") +async def recent_alerts(limit: int = 20): + with db_lock: + conn = _db_connection() + try: + rows = conn.execute( + """ + SELECT transaction_id, unified_score, decision, created_at + FROM risk_scores + WHERE decision != 'APPROVE' + ORDER BY created_at DESC + LIMIT ? + """, + (max(1, min(limit, 200)),), + ).fetchall() + finally: + conn.close() + return [dict(row) for row in rows] + + +@app.get("/api/stats") +async def dashboard_stats(): + with db_lock: + conn = _db_connection() + try: + total = conn.execute("SELECT COUNT(*) FROM risk_scores").fetchone()[0] + rejected = conn.execute("SELECT COUNT(*) FROM risk_scores WHERE decision = 'REJECT'").fetchone()[0] + alerts = conn.execute("SELECT COUNT(*) FROM risk_scores WHERE decision != 'APPROVE'").fetchone()[0] + first_ts = conn.execute("SELECT created_at FROM risk_scores ORDER BY created_at ASC LIMIT 1").fetchone() + finally: + conn.close() + + if first_ts: + started = datetime.fromisoformat(first_ts[0]) + if started.tzinfo is None: + started = started.replace(tzinfo=timezone.utc) + elapsed_minutes = max((datetime.now(tz=timezone.utc) - started).total_seconds() / 60.0, MIN_ELAPSED_MINUTES) + else: + elapsed_minutes = MIN_ELAPSED_MINUTES + + return { + "fraudRate": f"{((rejected / total) * 100 if total else 0.0):.2f}%", + "activeAlerts": alerts, + "transMin": f"{(total / elapsed_minutes if total else 0.0):.1f}", + "highRisk": rejected, + "liveEvents": total, + "rejectedEvents": rejected, + } + + +@app.get("/api/cases") +async def list_cases(limit: int = 100): + with db_lock: + conn = _db_connection() + try: + rows = conn.execute( + """ + SELECT id, transaction_id, risk_score, status, created_at + FROM cases + ORDER BY created_at DESC + LIMIT ? + """, + (max(1, min(limit, 500)),), + ).fetchall() + finally: + conn.close() + return [dict(row) for row in rows] + @app.post("/api/explain") async def explain_transaction(payload: dict): # Proxy to llm-service diff --git a/services/dashboard-api/tests/test_api.py b/services/dashboard-api/tests/test_api.py index 3c28735..ef59f0d 100644 --- a/services/dashboard-api/tests/test_api.py +++ b/services/dashboard-api/tests/test_api.py @@ -1,8 +1,49 @@ +from pathlib import Path from fastapi.testclient import TestClient -from app.main import app +import app.main as dashboard_main +import pytest -client = TestClient(app) +client = TestClient(dashboard_main.app) -def test_explain_proxy(): - # Will fail without llm-service, so we just test the API structure - pass + +@pytest.fixture(autouse=True) +def reset_db_path(): + original_db_path = dashboard_main.DB_PATH + yield + dashboard_main.DB_PATH = original_db_path + + +def _seed(db_path: Path): + dashboard_main.DB_PATH = str(db_path) + dashboard_main.init_db() + dashboard_main.persist_risk_score({"transaction_id": "TXN-1", "unified_score": 0.20, "decision": "APPROVE"}) + dashboard_main.persist_risk_score({"transaction_id": "TXN-2", "unified_score": 0.62, "decision": "REVIEW"}) + dashboard_main.persist_risk_score({"transaction_id": "TXN-3", "unified_score": 0.91, "decision": "REJECT"}) + + +def test_recent_alerts_and_stats(tmp_path): + _seed(tmp_path / "dashboard.db") + + alerts_resp = client.get("/api/recent-alerts") + assert alerts_resp.status_code == 200 + alerts = alerts_resp.json() + assert len(alerts) == 2 + assert all(alert["decision"] != "APPROVE" for alert in alerts) + assert alerts[0]["decision"] in {"REJECT", "REVIEW"} + + stats_resp = client.get("/api/stats") + assert stats_resp.status_code == 200 + stats = stats_resp.json() + assert stats["liveEvents"] == 3 + assert stats["activeAlerts"] == 2 + assert stats["highRisk"] == 1 + + +def test_cases_list(tmp_path): + _seed(tmp_path / "cases.db") + + resp = client.get("/api/cases") + assert resp.status_code == 200 + cases = resp.json() + assert len(cases) == 2 + assert cases[0]["id"].startswith("CASE-") diff --git a/services/dashboard/src/pages/Alerts.tsx b/services/dashboard/src/pages/Alerts.tsx index 96111b7..710a444 100644 --- a/services/dashboard/src/pages/Alerts.tsx +++ b/services/dashboard/src/pages/Alerts.tsx @@ -8,6 +8,7 @@ type AlertRow = { score: number; decision: string; }; +const API_BASE_URL = import.meta.env.VITE_DASHBOARD_API_BASE_URL ?? "http://localhost:8005"; export default function Alerts() { const [alerts, setAlerts] = useState([]); @@ -21,6 +22,30 @@ export default function Alerts() { }, [alerts]); useEffect(() => { + void (async () => { + try { + const resp = await fetch(`${API_BASE_URL}/api/recent-alerts?limit=100`); + if (!resp.ok) { + return; + } + const data = await resp.json(); + setAlerts( + data + .slice() + .reverse() + .map((row: { transaction_id?: string; unified_score?: number; decision?: string; created_at?: string }) => ({ + time: row.created_at ? new Date(row.created_at).toLocaleTimeString() : new Date().toLocaleTimeString(), + transaction_id: row.transaction_id || "UNKNOWN_TXN", + details: "Historical anomaly from risk stream", + score: row.unified_score || 0.0, + decision: row.decision || "REVIEW", + })) + ); + } catch { + // fallback to live websocket-only mode + } + })(); + const ws = new WebSocket("ws://localhost:8005/ws"); ws.onmessage = (event) => { @@ -80,4 +105,4 @@ export default function Alerts() { ); -} \ No newline at end of file +} diff --git a/services/dashboard/src/pages/Cases.tsx b/services/dashboard/src/pages/Cases.tsx index a38be3a..13cf335 100644 --- a/services/dashboard/src/pages/Cases.tsx +++ b/services/dashboard/src/pages/Cases.tsx @@ -1,4 +1,4 @@ -import { useMemo } from 'react'; +import { useEffect, useState } from 'react'; import { useReactTable, getCoreRowModel, @@ -10,11 +10,12 @@ import { type CaseData = { id: string; - accountId: string; + transaction_id: string; riskScore: number; status: string; created: string; } +const API_BASE_URL = import.meta.env.VITE_DASHBOARD_API_BASE_URL ?? "http://localhost:8005"; const columnHelper = createColumnHelper() @@ -23,8 +24,8 @@ const columns = [ header: 'Case ID', cell: info => {info.getValue()}, }), - columnHelper.accessor('accountId', { - header: 'Account ID', + columnHelper.accessor('transaction_id', { + header: 'Transaction ID', cell: info => {info.getValue()}, }), columnHelper.accessor('riskScore', { @@ -54,12 +55,30 @@ const columns = [ ] export default function Cases() { - const data = useMemo(() => [ - { id: 'CASE-1001', accountId: 'ACC-09923', riskScore: 92, status: 'Open', created: '2026-05-10T10:15:00' }, - { id: 'CASE-1002', accountId: 'ACC-01044', riskScore: 85, status: 'Open', created: '2026-05-10T09:42:00' }, - { id: 'CASE-1003', accountId: 'ACC-54421', riskScore: 65, status: 'Investigating', created: '2026-05-09T16:20:00' }, - { id: 'CASE-1004', accountId: 'ACC-99812', riskScore: 40, status: 'Closed', created: '2026-05-08T11:05:00' }, - ], []) + const [data, setData] = useState([]) + + useEffect(() => { + void (async () => { + try { + const resp = await fetch(`${API_BASE_URL}/api/cases?limit=200`); + if (!resp.ok) { + return; + } + const rows = await resp.json(); + setData( + rows.map((row: { id: string; transaction_id: string; risk_score: number; status: string; created_at: string }) => ({ + id: row.id, + transaction_id: row.transaction_id, + riskScore: row.risk_score, + status: row.status, + created: row.created_at, + })) + ); + } catch { + // fallback to empty state + } + })(); + }, []) const table = useReactTable({ data, @@ -111,4 +130,4 @@ export default function Cases() { ); -} \ No newline at end of file +} diff --git a/services/dashboard/src/pages/Dashboard.tsx b/services/dashboard/src/pages/Dashboard.tsx index 9fe4e3c..ddca791 100644 --- a/services/dashboard/src/pages/Dashboard.tsx +++ b/services/dashboard/src/pages/Dashboard.tsx @@ -10,6 +10,7 @@ type RiskEvent = { rule_score?: number; }; }; +const API_BASE_URL = import.meta.env.VITE_DASHBOARD_API_BASE_URL ?? "http://localhost:8005"; export default function Dashboard() { const [alerts, setAlerts] = useState([]); @@ -26,6 +27,28 @@ export default function Dashboard() { }); useEffect(() => { + void (async () => { + try { + const [alertsResp, statsResp] = await Promise.all([ + fetch(`${API_BASE_URL}/api/recent-alerts?limit=10`), + fetch(`${API_BASE_URL}/api/stats`), + ]); + if (alertsResp.ok) { + const initialAlerts = (await alertsResp.json()) as RiskEvent[]; + setAlerts(initialAlerts); + if (initialAlerts.length > 0) { + setLatestEvent(initialAlerts[0]); + } + } + if (statsResp.ok) { + const initialStats = await statsResp.json(); + setStats((prev) => ({ ...prev, ...initialStats })); + } + } catch { + // fallback to live websocket-only mode + } + })(); + const ws = new WebSocket("ws://localhost:8005/ws"); ws.onopen = () => {