Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions services/dashboard-api/app/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import json
import asyncio
import sqlite3
import threading
from datetime import datetime, timezone
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import Response
from fastapi.middleware.cors import CORSMiddleware
Expand All @@ -11,6 +14,8 @@
KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092")
LLM_SERVICE_URL = os.getenv("LLM_SERVICE_URL", "http://localhost:8004")
GRAPH_SERVICE_URL = os.getenv("GRAPH_SERVICE_URL", "http://localhost:8002")
DB_PATH = os.getenv("DASHBOARD_DB_PATH", "/tmp/fundguard_dashboard.db")
MIN_ELAPSED_MINUTES = 1.0 / 60.0

app = FastAPI(title="Dashboard API")

Expand All @@ -23,9 +28,86 @@
)

clients = set()
db_lock = threading.Lock()


def _db_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn


def init_db():
with db_lock:
conn = _db_connection()
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS risk_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
transaction_id TEXT NOT NULL,
unified_score REAL NOT NULL,
decision TEXT NOT NULL,
edge_score REAL NOT NULL DEFAULT 0.0,
graph_score REAL NOT NULL DEFAULT 0.0,
rule_score REAL NOT NULL DEFAULT 0.0,
created_at TEXT NOT NULL
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS cases (
id TEXT PRIMARY KEY,
transaction_id TEXT NOT NULL,
risk_score REAL NOT NULL,
status TEXT NOT NULL,
created_at TEXT NOT NULL
)
"""
)
conn.commit()
finally:
conn.close()


def persist_risk_score(payload: dict):
txn_id = payload.get("transaction_id") or f"TXN-{int(datetime.now(tz=timezone.utc).timestamp() * 1000)}"
decision = str(payload.get("decision") or "APPROVE")
unified_score = float(payload.get("unified_score") or 0.0)
components = payload.get("components") or {}
edge_score = float(components.get("edge_score") or 0.0)
graph_score = float(components.get("graph_score") or 0.0)
rule_score = float(components.get("rule_score") or 0.0)
created_at = datetime.now(tz=timezone.utc).isoformat()

with db_lock:
conn = _db_connection()
try:
conn.execute(
"""
INSERT INTO risk_scores (
transaction_id, unified_score, decision, edge_score, graph_score, rule_score, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(txn_id, unified_score, decision, edge_score, graph_score, rule_score, created_at),
)
if decision != "APPROVE":
case_id = f"CASE-{txn_id}"
conn.execute(
"""
INSERT OR IGNORE INTO cases (id, transaction_id, risk_score, status, created_at)
VALUES (?, ?, ?, 'Open', ?)
""",
(case_id, txn_id, unified_score * 100, created_at),
)
conn.commit()
finally:
conn.close()

@app.on_event("startup")
async def startup_event():
init_db()
asyncio.create_task(consume_risk_scores())

async def consume_risk_scores():
Expand All @@ -41,6 +123,7 @@ async def consume_risk_scores():
try:
async for msg in consumer:
payload = msg.value
persist_risk_score(payload)
for ws in list(clients):
try:
await ws.send_json(payload)
Expand All @@ -63,6 +146,75 @@ async def websocket_endpoint(websocket: WebSocket):
except WebSocketDisconnect:
clients.remove(websocket)


@app.get("/api/recent-alerts")
async def recent_alerts(limit: int = 20):
with db_lock:
conn = _db_connection()
try:
rows = conn.execute(
"""
SELECT transaction_id, unified_score, decision, created_at
FROM risk_scores
WHERE decision != 'APPROVE'
ORDER BY created_at DESC
LIMIT ?
""",
(max(1, min(limit, 200)),),
).fetchall()
finally:
conn.close()
return [dict(row) for row in rows]


@app.get("/api/stats")
async def dashboard_stats():
with db_lock:
conn = _db_connection()
try:
total = conn.execute("SELECT COUNT(*) FROM risk_scores").fetchone()[0]
rejected = conn.execute("SELECT COUNT(*) FROM risk_scores WHERE decision = 'REJECT'").fetchone()[0]
alerts = conn.execute("SELECT COUNT(*) FROM risk_scores WHERE decision != 'APPROVE'").fetchone()[0]
first_ts = conn.execute("SELECT created_at FROM risk_scores ORDER BY created_at ASC LIMIT 1").fetchone()
finally:
conn.close()

if first_ts:
started = datetime.fromisoformat(first_ts[0])
if started.tzinfo is None:
started = started.replace(tzinfo=timezone.utc)
elapsed_minutes = max((datetime.now(tz=timezone.utc) - started).total_seconds() / 60.0, MIN_ELAPSED_MINUTES)
else:
elapsed_minutes = MIN_ELAPSED_MINUTES

return {
"fraudRate": f"{((rejected / total) * 100 if total else 0.0):.2f}%",
"activeAlerts": alerts,
"transMin": f"{(total / elapsed_minutes if total else 0.0):.1f}",
"highRisk": rejected,
"liveEvents": total,
"rejectedEvents": rejected,
}


@app.get("/api/cases")
async def list_cases(limit: int = 100):
with db_lock:
conn = _db_connection()
try:
rows = conn.execute(
"""
SELECT id, transaction_id, risk_score, status, created_at
FROM cases
ORDER BY created_at DESC
LIMIT ?
""",
(max(1, min(limit, 500)),),
).fetchall()
finally:
conn.close()
return [dict(row) for row in rows]

@app.post("/api/explain")
async def explain_transaction(payload: dict):
# Proxy to llm-service
Expand Down
51 changes: 46 additions & 5 deletions services/dashboard-api/tests/test_api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,49 @@
from pathlib import Path
from fastapi.testclient import TestClient
from app.main import app
import app.main as dashboard_main
import pytest

client = TestClient(app)
client = TestClient(dashboard_main.app)

def test_explain_proxy():
# Will fail without llm-service, so we just test the API structure
pass

@pytest.fixture(autouse=True)
def reset_db_path():
original_db_path = dashboard_main.DB_PATH
yield
dashboard_main.DB_PATH = original_db_path


def _seed(db_path: Path):
dashboard_main.DB_PATH = str(db_path)
dashboard_main.init_db()
dashboard_main.persist_risk_score({"transaction_id": "TXN-1", "unified_score": 0.20, "decision": "APPROVE"})
dashboard_main.persist_risk_score({"transaction_id": "TXN-2", "unified_score": 0.62, "decision": "REVIEW"})
dashboard_main.persist_risk_score({"transaction_id": "TXN-3", "unified_score": 0.91, "decision": "REJECT"})


def test_recent_alerts_and_stats(tmp_path):
_seed(tmp_path / "dashboard.db")

alerts_resp = client.get("/api/recent-alerts")
assert alerts_resp.status_code == 200
alerts = alerts_resp.json()
assert len(alerts) == 2
assert all(alert["decision"] != "APPROVE" for alert in alerts)
assert alerts[0]["decision"] in {"REJECT", "REVIEW"}

stats_resp = client.get("/api/stats")
assert stats_resp.status_code == 200
stats = stats_resp.json()
assert stats["liveEvents"] == 3
assert stats["activeAlerts"] == 2
assert stats["highRisk"] == 1


def test_cases_list(tmp_path):
_seed(tmp_path / "cases.db")

resp = client.get("/api/cases")
assert resp.status_code == 200
cases = resp.json()
assert len(cases) == 2
assert cases[0]["id"].startswith("CASE-")
27 changes: 26 additions & 1 deletion services/dashboard/src/pages/Alerts.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type AlertRow = {
score: number;
decision: string;
};
const API_BASE_URL = import.meta.env.VITE_DASHBOARD_API_BASE_URL ?? "http://localhost:8005";

export default function Alerts() {
const [alerts, setAlerts] = useState<AlertRow[]>([]);
Expand All @@ -21,6 +22,30 @@ export default function Alerts() {
}, [alerts]);

useEffect(() => {
void (async () => {
try {
const resp = await fetch(`${API_BASE_URL}/api/recent-alerts?limit=100`);
if (!resp.ok) {
return;
}
const data = await resp.json();
setAlerts(
data
.slice()
.reverse()
.map((row: { transaction_id?: string; unified_score?: number; decision?: string; created_at?: string }) => ({
time: row.created_at ? new Date(row.created_at).toLocaleTimeString() : new Date().toLocaleTimeString(),
transaction_id: row.transaction_id || "UNKNOWN_TXN",
details: "Historical anomaly from risk stream",
score: row.unified_score || 0.0,
decision: row.decision || "REVIEW",
}))
);
} catch {
// fallback to live websocket-only mode
}
})();

const ws = new WebSocket("ws://localhost:8005/ws");

ws.onmessage = (event) => {
Expand Down Expand Up @@ -80,4 +105,4 @@ export default function Alerts() {
</div>
</div>
);
}
}
41 changes: 30 additions & 11 deletions services/dashboard/src/pages/Cases.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { useMemo } from 'react';
import { useEffect, useState } from 'react';
import {
useReactTable,
getCoreRowModel,
Expand All @@ -10,11 +10,12 @@ import {

type CaseData = {
id: string;
accountId: string;
transaction_id: string;
riskScore: number;
status: string;
created: string;
}
const API_BASE_URL = import.meta.env.VITE_DASHBOARD_API_BASE_URL ?? "http://localhost:8005";

const columnHelper = createColumnHelper<CaseData>()

Expand All @@ -23,8 +24,8 @@ const columns = [
header: 'Case ID',
cell: info => <span className="font-mono text-gray-300">{info.getValue()}</span>,
}),
columnHelper.accessor('accountId', {
header: 'Account ID',
columnHelper.accessor('transaction_id', {
header: 'Transaction ID',
cell: info => <span className="font-mono text-gray-400">{info.getValue()}</span>,
}),
columnHelper.accessor('riskScore', {
Expand Down Expand Up @@ -54,12 +55,30 @@ const columns = [
]

export default function Cases() {
const data = useMemo<CaseData[]>(() => [
{ id: 'CASE-1001', accountId: 'ACC-09923', riskScore: 92, status: 'Open', created: '2026-05-10T10:15:00' },
{ id: 'CASE-1002', accountId: 'ACC-01044', riskScore: 85, status: 'Open', created: '2026-05-10T09:42:00' },
{ id: 'CASE-1003', accountId: 'ACC-54421', riskScore: 65, status: 'Investigating', created: '2026-05-09T16:20:00' },
{ id: 'CASE-1004', accountId: 'ACC-99812', riskScore: 40, status: 'Closed', created: '2026-05-08T11:05:00' },
], [])
const [data, setData] = useState<CaseData[]>([])

useEffect(() => {
void (async () => {
try {
const resp = await fetch(`${API_BASE_URL}/api/cases?limit=200`);
if (!resp.ok) {
return;
}
const rows = await resp.json();
setData(
rows.map((row: { id: string; transaction_id: string; risk_score: number; status: string; created_at: string }) => ({
id: row.id,
transaction_id: row.transaction_id,
riskScore: row.risk_score,
status: row.status,
created: row.created_at,
}))
);
} catch {
// fallback to empty state
}
})();
}, [])

const table = useReactTable({
data,
Expand Down Expand Up @@ -111,4 +130,4 @@ export default function Cases() {
</div>
</div>
);
}
}
Loading