From 97b7fb8e8b89c7408716a5ccae20466e4535f08b Mon Sep 17 00:00:00 2001 From: Jigar <1252580+artvandelay@users.noreply.github.com> Date: Sat, 10 Jan 2026 00:51:22 -0800 Subject: [PATCH] Add service containers and ops scaffolding --- README.md | 15 ++++- docs/configuration.md | 41 ++++++++++++ docs/runbooks.md | 41 ++++++++++++ env.example | 12 ++++ pyproject.toml | 5 +- scripts/db_backup.sh | 9 +++ scripts/db_migrate.sh | 6 ++ scripts/db_restore.sh | 9 +++ services/detect/Dockerfile | 15 +++++ services/enrich/Dockerfile | 15 +++++ services/ingest/Dockerfile | 15 +++++ services/report/Dockerfile | 15 +++++ src/nlbt/services/__init__.py | 1 + src/nlbt/services/detect.py | 23 +++++++ src/nlbt/services/enrich.py | 22 +++++++ src/nlbt/services/ingest.py | 44 +++++++++++++ src/nlbt/services/observability.py | 100 ++++++++++++++++++++++++++++ src/nlbt/services/report.py | 22 +++++++ src/nlbt/services/wikimedia.py | 102 +++++++++++++++++++++++++++++ storage/migrations/001_init.sql | 35 ++++++++++ storage/schema.sql | 35 ++++++++++ 21 files changed, 580 insertions(+), 2 deletions(-) create mode 100644 docs/configuration.md create mode 100644 docs/runbooks.md create mode 100755 scripts/db_backup.sh create mode 100755 scripts/db_migrate.sh create mode 100755 scripts/db_restore.sh create mode 100644 services/detect/Dockerfile create mode 100644 services/enrich/Dockerfile create mode 100644 services/ingest/Dockerfile create mode 100644 services/report/Dockerfile create mode 100644 src/nlbt/services/__init__.py create mode 100644 src/nlbt/services/detect.py create mode 100644 src/nlbt/services/enrich.py create mode 100644 src/nlbt/services/ingest.py create mode 100644 src/nlbt/services/observability.py create mode 100644 src/nlbt/services/report.py create mode 100644 src/nlbt/services/wikimedia.py create mode 100644 storage/migrations/001_init.sql create mode 100644 storage/schema.sql diff --git a/README.md b/README.md index fdeaeed..83447b3 100644 --- a/README.md +++ b/README.md 
@@ -451,6 +451,19 @@ See issues or open a PR! --- +## 🛠️ Services & Operations + +NLBT now includes containerized ingest/enrich/detect/report services with observability, +storage migrations, and runbooks: + +- Dockerfiles: `services/ingest`, `services/enrich`, `services/detect`, `services/report` +- Configuration: `docs/configuration.md` +- Runbooks: `docs/runbooks.md` +- Migrations: `storage/migrations` +- Backup/restore: `scripts/db_backup.sh`, `scripts/db_restore.sh` + +--- + ## 📄 License GPL-3.0 License. See `LICENSE`. @@ -500,4 +513,4 @@ This project implements several **Agentic Design Patterns**: See `cursor_chats/Agentic_Design_Patterns_Complete.md` for detailed documentation. - \ No newline at end of file + diff --git a/docs/configuration.md b/docs/configuration.md new file mode 100644 index 0000000..bb5c091 --- /dev/null +++ b/docs/configuration.md @@ -0,0 +1,41 @@ +# Configuration + +## Service endpoints + +| Service | Endpoint | Description | +| --- | --- | --- | +| Ingest | `POST /ingest` | Pull recent changes from Wikimedia and normalize payloads. | +| Enrich | `POST /enrich` | Add metadata to ingest payloads. | +| Detect | `POST /detect` | Evaluate enriched data for signals. | +| Report | `POST /report` | Generate report records. | +| Metrics | `GET /metrics` | Prometheus metrics for each service. | +| Health | `GET /healthz` | Liveness probe. | +| Ready | `GET /readyz` | Readiness probe. | + +## Environment variables + +### Shared + +- `LOG_LEVEL` (default: `INFO`) +- `DATABASE_URL` (required for migrations/backups) +- `RETENTION_DAYS` (default: `30`) + +### Wikimedia API guards (ingest service) + +- `WIKIMEDIA_BASE_URL` (default: `https://en.wikipedia.org`) +- `WIKIMEDIA_RATE_LIMIT` (default: `60`) — max requests per window. +- `WIKIMEDIA_RATE_WINDOW` (default: `60`) — seconds per window. +- `WIKIMEDIA_FAILURE_THRESHOLD` (default: `5`) — failures before circuit opens. +- `WIKIMEDIA_RECOVERY_SECONDS` (default: `30`) — cooldown before retry. 
+
+## Retention windows
+
+- Ingest/enrich/detect/report records should be retained for `RETENTION_DAYS`.
+- Recommended defaults:
+  - `30` days for ingest/enrich/detect.
+  - `90` days for reports.
+
+## Thresholds
+
+- Circuit breaker opens after `WIKIMEDIA_FAILURE_THRESHOLD` failures.
+- Rate limiter defaults to `60` requests per minute.
diff --git a/docs/runbooks.md b/docs/runbooks.md
new file mode 100644
index 0000000..d6b8d49
--- /dev/null
+++ b/docs/runbooks.md
@@ -0,0 +1,41 @@
+# Runbooks
+
+## Service restart
+
+**When to use**: deployment rollouts, config changes, or a stuck worker.
+
+1. Confirm readiness endpoints return healthy:
+   - `GET /healthz` and `GET /readyz`.
+2. Drain traffic (remove instance from load balancer).
+3. Restart the container:
+   - `docker restart <container>`
+4. Validate metrics are flowing:
+   - `GET /metrics` returns `nlbt_requests_total`.
+5. Re-add instance to the load balancer.
+
+## Data backfill
+
+**When to use**: missed ingest window, replay upstream data, or reprocessing.
+
+1. Ensure storage is available and migrations are current:
+   - `scripts/db_migrate.sh`
+2. Temporarily raise retention window if needed (see configuration doc).
+3. Run the backfill job by calling ingest with a backfill flag (or replay from storage):
+   - `POST /ingest` with `{"mode": "backfill", "range": "<start>/<end>"}`
+4. Monitor enrich/detect/report pipelines:
+   - Check logs for `enrich_received`, `detect_received`, `report_received`.
+5. Once complete, reset retention overrides and verify downstream counts.
+
+## Backup/restore
+
+**Backup**
+
+```bash
+DATABASE_URL=postgres://... BACKUP_PATH=backup.dump scripts/db_backup.sh
+```
+
+**Restore**
+
+```bash
+DATABASE_URL=postgres://... 
BACKUP_PATH=backup.dump scripts/db_restore.sh +``` diff --git a/env.example b/env.example index 0519361..ef99e62 100644 --- a/env.example +++ b/env.example @@ -11,3 +11,15 @@ MAX_AGENT_ITERATIONS=20 # Optional: Override default model per session # LLM_MODEL_OVERRIDE=gpt-4o + +# Service Configuration +LOG_LEVEL=INFO +DATABASE_URL=postgres://user:pass@localhost:5432/nlbt +RETENTION_DAYS=30 + +# Wikimedia API guards (ingest service) +WIKIMEDIA_BASE_URL=https://en.wikipedia.org +WIKIMEDIA_RATE_LIMIT=60 +WIKIMEDIA_RATE_WINDOW=60 +WIKIMEDIA_FAILURE_THRESHOLD=5 +WIKIMEDIA_RECOVERY_SECONDS=30 diff --git a/pyproject.toml b/pyproject.toml index d892400..5908250 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,8 +19,11 @@ dependencies = [ "matplotlib", "markdown", "weasyprint", + "fastapi", + "httpx", + "prometheus_client", + "uvicorn", ] [project.scripts] nlbt = "nlbt.cli:main" - diff --git a/scripts/db_backup.sh b/scripts/db_backup.sh new file mode 100755 index 0000000..e04b155 --- /dev/null +++ b/scripts/db_backup.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +DATABASE_URL=${DATABASE_URL:?"DATABASE_URL is required"} +BACKUP_PATH=${BACKUP_PATH:-backup_$(date +%Y%m%d_%H%M%S).dump} + +pg_dump --format=custom --file "$BACKUP_PATH" "$DATABASE_URL" + +echo "Backup written to $BACKUP_PATH" diff --git a/scripts/db_migrate.sh b/scripts/db_migrate.sh new file mode 100755 index 0000000..a754048 --- /dev/null +++ b/scripts/db_migrate.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +set -euo pipefail + +DATABASE_URL=${DATABASE_URL:?"DATABASE_URL is required"} + +psql "$DATABASE_URL" -f storage/migrations/001_init.sql diff --git a/scripts/db_restore.sh b/scripts/db_restore.sh new file mode 100755 index 0000000..74cdb84 --- /dev/null +++ b/scripts/db_restore.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +DATABASE_URL=${DATABASE_URL:?"DATABASE_URL is required"} +BACKUP_PATH=${BACKUP_PATH:?"BACKUP_PATH is required"} + +pg_restore --clean --if-exists --dbname 
"$DATABASE_URL" "$BACKUP_PATH" + +echo "Restored from $BACKUP_PATH" diff --git a/services/detect/Dockerfile b/services/detect/Dockerfile new file mode 100644 index 0000000..bc614f4 --- /dev/null +++ b/services/detect/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY pyproject.toml README.md /app/ +COPY src /app/src + +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -e . + +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8000 + +CMD ["uvicorn", "nlbt.services.detect:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/services/enrich/Dockerfile b/services/enrich/Dockerfile new file mode 100644 index 0000000..9bebace --- /dev/null +++ b/services/enrich/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY pyproject.toml README.md /app/ +COPY src /app/src + +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -e . + +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8000 + +CMD ["uvicorn", "nlbt.services.enrich:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/services/ingest/Dockerfile b/services/ingest/Dockerfile new file mode 100644 index 0000000..0c512af --- /dev/null +++ b/services/ingest/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY pyproject.toml README.md /app/ +COPY src /app/src + +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -e . + +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8000 + +CMD ["uvicorn", "nlbt.services.ingest:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/services/report/Dockerfile b/services/report/Dockerfile new file mode 100644 index 0000000..cc893fc --- /dev/null +++ b/services/report/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY pyproject.toml README.md /app/ +COPY src /app/src + +RUN pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -e . 
+ +ENV PYTHONUNBUFFERED=1 + +EXPOSE 8000 + +CMD ["uvicorn", "nlbt.services.report:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/src/nlbt/services/__init__.py b/src/nlbt/services/__init__.py new file mode 100644 index 0000000..ca99449 --- /dev/null +++ b/src/nlbt/services/__init__.py @@ -0,0 +1 @@ +"""Service entrypoints for NLBT microservices.""" diff --git a/src/nlbt/services/detect.py b/src/nlbt/services/detect.py new file mode 100644 index 0000000..3e49a2f --- /dev/null +++ b/src/nlbt/services/detect.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +import logging + +from fastapi import FastAPI + +from nlbt.services.observability import add_observability, configure_logging + +SERVICE_NAME = "detect" +configure_logging(SERVICE_NAME) +logger = logging.getLogger("nlbt.services.detect") + +app = FastAPI(title="NLBT Detect Service") + + +@app.post("/detect") +def detect(payload: dict) -> dict: + logger.info("detect_received", extra={"context": {"keys": list(payload.keys())}}) + findings = [{"rule": "placeholder", "severity": "low"}] + return {"status": "ok", "findings": findings} + + +add_observability(app, SERVICE_NAME) diff --git a/src/nlbt/services/enrich.py b/src/nlbt/services/enrich.py new file mode 100644 index 0000000..057b0e4 --- /dev/null +++ b/src/nlbt/services/enrich.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import logging + +from fastapi import FastAPI + +from nlbt.services.observability import add_observability, configure_logging + +SERVICE_NAME = "enrich" +configure_logging(SERVICE_NAME) +logger = logging.getLogger("nlbt.services.enrich") + +app = FastAPI(title="NLBT Enrich Service") + + +@app.post("/enrich") +def enrich(payload: dict) -> dict: + logger.info("enrich_received", extra={"context": {"keys": list(payload.keys())}}) + return {"status": "ok", "enriched": payload} + + +add_observability(app, SERVICE_NAME) diff --git a/src/nlbt/services/ingest.py b/src/nlbt/services/ingest.py new file mode 100644 index 
0000000..cfe9b55 --- /dev/null +++ b/src/nlbt/services/ingest.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import logging +import os + +from fastapi import FastAPI, HTTPException + +from nlbt.services.observability import add_observability, configure_logging +from nlbt.services.wikimedia import CircuitBreaker, RateLimiter, WikimediaClient + +SERVICE_NAME = "ingest" +configure_logging(SERVICE_NAME) +logger = logging.getLogger("nlbt.services.ingest") + +app = FastAPI(title="NLBT Ingest Service") + +_rate_limiter = RateLimiter( + max_requests=int(os.getenv("WIKIMEDIA_RATE_LIMIT", "60")), + per_seconds=float(os.getenv("WIKIMEDIA_RATE_WINDOW", "60")), +) +_circuit_breaker = CircuitBreaker( + failure_threshold=int(os.getenv("WIKIMEDIA_FAILURE_THRESHOLD", "5")), + recovery_seconds=float(os.getenv("WIKIMEDIA_RECOVERY_SECONDS", "30")), +) +_wikimedia_client = WikimediaClient( + base_url=os.getenv("WIKIMEDIA_BASE_URL", "https://en.wikipedia.org"), + rate_limiter=_rate_limiter, + circuit_breaker=_circuit_breaker, +) + + +@app.post("/ingest") +def ingest() -> dict: + try: + data = _wikimedia_client.fetch_recent_changes() + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + except Exception as exc: # noqa: BLE001 + logger.exception("ingest_failed") + raise HTTPException(status_code=502, detail="Failed to fetch Wikimedia data") from exc + return {"status": "ok", "items": data.get("query", {}).get("recentchanges", [])} + + +add_observability(app, SERVICE_NAME) diff --git a/src/nlbt/services/observability.py b/src/nlbt/services/observability.py new file mode 100644 index 0000000..88ed334 --- /dev/null +++ b/src/nlbt/services/observability.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import json +import logging +import os +import time +from contextlib import contextmanager +from typing import Callable, Optional + +from fastapi import FastAPI, Request, Response +from prometheus_client import 
CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest + +_REQUEST_COUNT = Counter( + "nlbt_requests_total", + "Total HTTP requests", + ["service", "method", "path", "status"], +) +_REQUEST_LATENCY = Histogram( + "nlbt_request_latency_seconds", + "Request latency in seconds", + ["service", "method", "path"], +) + + +class JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + payload = { + "level": record.levelname, + "message": record.getMessage(), + "logger": record.name, + "timestamp": int(record.created), + } + if hasattr(record, "context"): + payload["context"] = record.context + if record.exc_info: + payload["exception"] = self.formatException(record.exc_info) + return json.dumps(payload, ensure_ascii=False) + + +def configure_logging(service_name: str, level: int = logging.INFO) -> None: + env_level = os.getenv("LOG_LEVEL") + if env_level: + level = logging.getLevelName(env_level.upper()) + handler = logging.StreamHandler() + handler.setFormatter(JsonFormatter()) + root = logging.getLogger() + root.setLevel(level) + root.handlers = [handler] + logging.getLogger("uvicorn.access").handlers = [handler] + logging.getLogger("uvicorn.error").handlers = [handler] + logging.getLogger(service_name).setLevel(level) + + +def add_observability( + app: FastAPI, + service_name: str, + readiness_check: Optional[Callable[[], bool]] = None, +) -> None: + @app.middleware("http") + async def metrics_middleware(request: Request, call_next: Callable[[Request], Response]): + start = time.perf_counter() + response = await call_next(request) + duration = time.perf_counter() - start + path = request.url.path + _REQUEST_COUNT.labels( + service=service_name, + method=request.method, + path=path, + status=response.status_code, + ).inc() + _REQUEST_LATENCY.labels( + service=service_name, + method=request.method, + path=path, + ).observe(duration) + return response + + @app.get("/metrics") + def metrics() -> Response: + data = generate_latest() + 
return Response(content=data, media_type=CONTENT_TYPE_LATEST) + + @app.get("/healthz") + def healthz() -> dict[str, str]: + return {"status": "ok"} + + @app.get("/readyz") + def readyz() -> dict[str, str]: + ready = True if readiness_check is None else readiness_check() + return {"status": "ready" if ready else "not_ready"} + + +@contextmanager +def log_context(logger: logging.Logger, **fields: str): + extra = {"context": fields} + try: + logger.info("context_start", extra=extra) + yield + finally: + logger.info("context_end", extra=extra) diff --git a/src/nlbt/services/report.py b/src/nlbt/services/report.py new file mode 100644 index 0000000..8101554 --- /dev/null +++ b/src/nlbt/services/report.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import logging + +from fastapi import FastAPI + +from nlbt.services.observability import add_observability, configure_logging + +SERVICE_NAME = "report" +configure_logging(SERVICE_NAME) +logger = logging.getLogger("nlbt.services.report") + +app = FastAPI(title="NLBT Report Service") + + +@app.post("/report") +def report(payload: dict) -> dict: + logger.info("report_received", extra={"context": {"keys": list(payload.keys())}}) + return {"status": "ok", "report_id": "placeholder-report"} + + +add_observability(app, SERVICE_NAME) diff --git a/src/nlbt/services/wikimedia.py b/src/nlbt/services/wikimedia.py new file mode 100644 index 0000000..e1979c7 --- /dev/null +++ b/src/nlbt/services/wikimedia.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +import logging +import time +from dataclasses import dataclass +from typing import Optional + +import httpx + +logger = logging.getLogger("nlbt.services.wikimedia") + + +@dataclass +class RateLimiter: + max_requests: int + per_seconds: float + + def __post_init__(self) -> None: + self._tokens = self.max_requests + self._last_refill = time.monotonic() + + def acquire(self) -> None: + now = time.monotonic() + elapsed = now - self._last_refill + refill = elapsed * 
(self.max_requests / self.per_seconds) + if refill > 0: + self._tokens = min(self.max_requests, self._tokens + refill) + self._last_refill = now + if self._tokens < 1: + sleep_time = (1 - self._tokens) * (self.per_seconds / self.max_requests) + logger.warning("rate_limit_sleep", extra={"context": {"sleep": sleep_time}}) + time.sleep(sleep_time) + self._tokens = max(0, self._tokens - 1) + else: + self._tokens -= 1 + + +@dataclass +class CircuitBreaker: + failure_threshold: int + recovery_seconds: float + + def __post_init__(self) -> None: + self._failures = 0 + self._opened_at: Optional[float] = None + + def allow(self) -> bool: + if self._opened_at is None: + return True + if time.monotonic() - self._opened_at >= self.recovery_seconds: + self._opened_at = None + self._failures = 0 + return True + return False + + def record_success(self) -> None: + self._failures = 0 + self._opened_at = None + + def record_failure(self) -> None: + self._failures += 1 + if self._failures >= self.failure_threshold: + self._opened_at = time.monotonic() + logger.error("circuit_breaker_open", extra={"context": {"failures": self._failures}}) + + +class WikimediaClient: + def __init__( + self, + base_url: str, + rate_limiter: RateLimiter, + circuit_breaker: CircuitBreaker, + timeout: float = 10.0, + ) -> None: + self._base_url = base_url.rstrip("/") + self._rate_limiter = rate_limiter + self._circuit_breaker = circuit_breaker + self._client = httpx.Client(timeout=timeout) + + def fetch_recent_changes(self) -> dict: + if not self._circuit_breaker.allow(): + raise RuntimeError("Circuit breaker open; Wikimedia API temporarily unavailable") + self._rate_limiter.acquire() + url = f"{self._base_url}/w/api.php" + params = { + "action": "query", + "list": "recentchanges", + "rcprop": "title|timestamp|user", + "format": "json", + } + try: + response = self._client.get(url, params=params) + response.raise_for_status() + self._circuit_breaker.record_success() + return response.json() + except 
httpx.HTTPError as exc: + self._circuit_breaker.record_failure() + logger.exception("wikimedia_api_error", extra={"context": {"error": str(exc)}}) + raise + + def close(self) -> None: + self._client.close() diff --git a/storage/migrations/001_init.sql b/storage/migrations/001_init.sql new file mode 100644 index 0000000..084ed34 --- /dev/null +++ b/storage/migrations/001_init.sql @@ -0,0 +1,35 @@ +-- Initialize core tables for ingest/enrich/detect/report pipeline. + +CREATE TABLE IF NOT EXISTS ingest_events ( + id BIGSERIAL PRIMARY KEY, + source VARCHAR(255) NOT NULL, + payload JSONB NOT NULL, + ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS enriched_events ( + id BIGSERIAL PRIMARY KEY, + ingest_event_id BIGINT REFERENCES ingest_events(id) ON DELETE CASCADE, + payload JSONB NOT NULL, + enriched_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS detections ( + id BIGSERIAL PRIMARY KEY, + enriched_event_id BIGINT REFERENCES enriched_events(id) ON DELETE CASCADE, + signal VARCHAR(128) NOT NULL, + severity VARCHAR(32) NOT NULL, + metadata JSONB, + detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS reports ( + id BIGSERIAL PRIMARY KEY, + detection_id BIGINT REFERENCES detections(id) ON DELETE SET NULL, + report_payload JSONB NOT NULL, + generated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_ingest_events_ingested_at ON ingest_events(ingested_at); +CREATE INDEX IF NOT EXISTS idx_enriched_events_enriched_at ON enriched_events(enriched_at); +CREATE INDEX IF NOT EXISTS idx_detections_detected_at ON detections(detected_at); diff --git a/storage/schema.sql b/storage/schema.sql new file mode 100644 index 0000000..19bb8f5 --- /dev/null +++ b/storage/schema.sql @@ -0,0 +1,35 @@ +-- Latest schema snapshot for NLBT services. 
+ +CREATE TABLE IF NOT EXISTS ingest_events ( + id BIGSERIAL PRIMARY KEY, + source VARCHAR(255) NOT NULL, + payload JSONB NOT NULL, + ingested_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS enriched_events ( + id BIGSERIAL PRIMARY KEY, + ingest_event_id BIGINT REFERENCES ingest_events(id) ON DELETE CASCADE, + payload JSONB NOT NULL, + enriched_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS detections ( + id BIGSERIAL PRIMARY KEY, + enriched_event_id BIGINT REFERENCES enriched_events(id) ON DELETE CASCADE, + signal VARCHAR(128) NOT NULL, + severity VARCHAR(32) NOT NULL, + metadata JSONB, + detected_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS reports ( + id BIGSERIAL PRIMARY KEY, + detection_id BIGINT REFERENCES detections(id) ON DELETE SET NULL, + report_payload JSONB NOT NULL, + generated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_ingest_events_ingested_at ON ingest_events(ingested_at); +CREATE INDEX IF NOT EXISTS idx_enriched_events_enriched_at ON enriched_events(enriched_at); +CREATE INDEX IF NOT EXISTS idx_detections_detected_at ON detections(detected_at);