Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backend/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
from __future__ import annotations

import sys
from pathlib import Path

_ENGINE_PATH = Path(__file__).resolve().parents[2] / "engine"
if _ENGINE_PATH.exists() and str(_ENGINE_PATH) not in sys.path:
sys.path.insert(0, str(_ENGINE_PATH))
12 changes: 10 additions & 2 deletions backend/app/api/routes_analysis.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
from __future__ import annotations

from backend.app.schemas.analysis import AnalysisRequest, AnalysisResponse
from backend.app.services.analyzer_service import analyze
from fastapi import APIRouter

router = APIRouter(prefix="/analysis", tags=["analysis"])
router = APIRouter(tags=["analysis"])


@router.post("/analyze", response_model=AnalysisResponse)
def analyze_endpoint(request: AnalysisRequest) -> dict[str, object]:
return analyze(request.text)
return analyze(request)


@router.post("/analysis/analyze", response_model=AnalysisResponse)
def analyze_legacy_endpoint(request: AnalysisRequest) -> dict[str, object]:
return analyze(request)
1 change: 1 addition & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
allow_headers=["*"],
)

app.include_router(routes_analysis.router)
app.include_router(routes_analysis.router, prefix="/api")
app.include_router(routes_taxonomy.router, prefix="/api")
app.include_router(routes_taxonomy_workbench.router, prefix="/api")
Expand Down
53 changes: 49 additions & 4 deletions backend/app/schemas/analysis.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,58 @@
from __future__ import annotations

from typing import Any

from pydantic import BaseModel, Field


class AnalysisRequest(BaseModel):
text: str = Field(min_length=1)
mode: str = "deterministic_baseline"
model_provider_id: str = "deterministic_baseline"
top_k: int = 8
include_healthy_patterns: bool = True
max_risks_per_claim: int = 3
allow_deterministic_fallback: bool = True
include_retrieval_diagnostics: bool = False


class DetectedRisk(BaseModel):
risk_id: str
category: str
label: str
severity: str
confidence: float
risk_score: float
risk_level: str
evidence_span: str
evidence_start_char: int
evidence_end_char: int
explanation: str
false_positive_warning: str
needs_human_review: bool


class AnalyzedClaim(BaseModel):
claim_id: str
text: str
claim_type: str
start_char: int
end_char: int
detected_risks: list[DetectedRisk]
healthy_patterns: list[dict[str, Any]]
warnings: list[str]
retrieval_diagnostics: dict[str, Any]


class AnalysisResponse(BaseModel):
analysis_id: str
summary: dict[str, Any]
claims: list[dict[str, Any]]
risks: list[dict[str, Any]]
text_id: str
mode: str
model_provider_id: str
model_name: str
llm_used: bool
deterministic_fallback_used: bool
claims: list[AnalyzedClaim]
overall_risk_score: float
risk_level: str
needs_human_review: bool
warnings: list[str]
21 changes: 19 additions & 2 deletions backend/app/services/analyzer_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
from __future__ import annotations

from typing import Any

from argument_risk_engine.analyzer import analyze_text

from backend.app.schemas.analysis import AnalysisRequest
from backend.app.services.taxonomy_service import get_active_pack


def analyze(text: str) -> dict[str, object]:
return analyze_text(text, get_active_pack())
def analyze(request: AnalysisRequest | str) -> dict[str, Any]:
if isinstance(request, str):
return analyze_text(request, get_active_pack())
return analyze_text(
request.text,
get_active_pack(),
mode=request.mode,
model_provider_id=request.model_provider_id,
top_k=int(request.top_k),
include_healthy_patterns=bool(request.include_healthy_patterns),
max_risks_per_claim=int(request.max_risks_per_claim),
allow_deterministic_fallback=bool(request.allow_deterministic_fallback),
include_retrieval_diagnostics=bool(request.include_retrieval_diagnostics),
)
204 changes: 171 additions & 33 deletions engine/argument_risk_engine/analyzer.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,191 @@
from __future__ import annotations

from uuid import uuid4
import hashlib
from typing import Any

from argument_risk_engine.classification.deterministic import classify_deterministic
from argument_risk_engine.explanation.evidence import evidence_span
from argument_risk_engine.explanation.explainer import explain
from argument_risk_engine.extraction.claim_extractor import extract_claims
from argument_risk_engine.retrieval.lexical_retriever import retrieve_candidates
from argument_risk_engine.scoring.scorer import score_risk
from argument_risk_engine.taxonomy.models import TaxonomyPack, default_taxonomy_pack
from argument_risk_engine.explanation.explainer import explain_risk, false_positive_warning
from argument_risk_engine.extraction.claim_extractor import Claim, extract_claims
from argument_risk_engine.retrieval.lexical_retriever import RetrievedTaxonomyEntry, retrieve_candidates
from argument_risk_engine.scoring.calibration import risk_level
from argument_risk_engine.scoring.scorer import score_classification
from argument_risk_engine.taxonomy.models import TaxonomyEntry, TaxonomyPack, default_taxonomy_pack

DEFAULT_MODE = "deterministic_baseline"
DEFAULT_MODEL_PROVIDER_ID = "deterministic_baseline"
DEFAULT_MODEL_NAME = "local-keyword-v1"

def analyze_text(text: str, pack: TaxonomyPack | None = None) -> dict[str, object]:

def analyze_text(
text: str,
pack: TaxonomyPack | None = None,
*,
mode: str = DEFAULT_MODE,
model_provider_id: str = DEFAULT_MODEL_PROVIDER_ID,
top_k: int = 8,
include_healthy_patterns: bool = True,
max_risks_per_claim: int = 3,
allow_deterministic_fallback: bool = True,
include_retrieval_diagnostics: bool = False,
) -> dict[str, Any]:
taxonomy_pack = pack or default_taxonomy_pack()
claims_out: list[dict[str, object]] = []
all_risks: list[dict[str, object]] = []
for claim in extract_claims(text):
candidates = retrieve_candidates(claim, taxonomy_pack)
classified = classify_deterministic(claim, candidates)
risks: list[dict[str, object]] = []
for result in classified:
entry = next(item for item in taxonomy_pack.entries if item.id == result["taxonomy_id"])
normalized_text = text or ""
claims = extract_claims(normalized_text)
claims_out: list[dict[str, Any]] = []
all_scores: list[float] = []
warnings: list[str] = []
any_review = False

for index, claim in enumerate(claims, start=1):
candidates = retrieve_candidates(str(claim), taxonomy_pack, limit=top_k)
classified = classify_deterministic(
str(claim),
candidates,
context=_context_for_claim(normalized_text, claim),
classification_mode=mode,
model_provider_id=model_provider_id,
model_name=DEFAULT_MODEL_NAME,
deterministic_fallback_used=False,
)
candidate_by_id = {candidate.entry.id: candidate for candidate in candidates}
high_confidence_count = 0
detected: list[dict[str, Any]] = []
claim_warnings: list[str] = []

for classification in classified:
entry = _entry_by_id(taxonomy_pack, str(classification.get("taxonomy_id") or classification.get("risk_id")))
if entry is None:
continue
candidate = candidate_by_id.get(entry.id)
classification = dict(classification)
classification["claim_type"] = claim.claim_type
classification = _absolute_evidence(classification, claim, normalized_text)
scored = score_classification(
classification,
entry=entry,
candidate=candidate,
claim_text=str(claim),
has_context=len(claims) > 1,
high_confidence_risk_count=high_confidence_count,
)
if scored.suppressed:
if scored.warning:
claim_warnings.append(scored.warning)
continue
if float(classification.get("confidence", 0.0) or 0.0) >= 0.75:
high_confidence_count += 1
review = bool(scored.needs_human_review or entry.requires_human_judgment)
any_review = any_review or review
warning = false_positive_warning(entry, scored.warning or str(classification.get("false_positive_warning", "") or ""))
risk = {
"taxonomy_id": entry.id,
"name": entry.name,
"severity": entry.severity.value,
"confidence": result["confidence"],
"score": score_risk(entry.severity.value, float(result["confidence"])),
"explanation": explain(entry, list(result["matched_terms"])),
"evidence": evidence_span(text, claim),
"mitigation": entry.mitigation,
"risk_id": entry.id,
"category": entry.canonical_category,
"label": entry.name,
"severity": classification.get("severity", entry.severity.value),
"confidence": round(float(classification.get("confidence", 0.0) or 0.0), 3),
"risk_score": scored.risk_score,
"risk_level": scored.risk_level,
"evidence_span": classification.get("evidence_span", ""),
"evidence_start_char": classification.get("evidence_start_char", claim.start_char),
"evidence_end_char": classification.get("evidence_end_char", claim.end_char),
"explanation": explain_risk(entry, classification, scored.risk_score, scored.risk_level),
"false_positive_warning": warning,
"needs_human_review": review,
}
detected.append(risk)
all_scores.append(scored.risk_score)

detected.sort(key=lambda item: (-float(item["risk_score"]), str(item["risk_id"])))
detected = detected[: max(0, max_risks_per_claim)]
diagnostics = _diagnostics(candidates) if include_retrieval_diagnostics else {}
claims_out.append(
{
"claim_id": f"claim_{index}",
"text": claim.text,
"claim_type": claim.claim_type,
"start_char": claim.start_char,
"end_char": claim.end_char,
"detected_risks": detected,
"healthy_patterns": _healthy_patterns(candidates) if include_healthy_patterns else [],
"warnings": sorted(set(claim_warnings)),
"retrieval_diagnostics": diagnostics,
}
risks.append(risk)
all_risks.append(risk)
claims_out.append({"text": claim, "risks": risks})
)

overall = round(max(all_scores) if all_scores else 0.0, 3)
return {
"analysis_id": str(uuid4()),
"text_id": _stable_text_id(normalized_text),
"mode": mode,
"model_provider_id": model_provider_id,
"model_name": DEFAULT_MODEL_NAME,
"llm_used": False,
"deterministic_fallback_used": False if mode == DEFAULT_MODE else allow_deterministic_fallback,
"claims": claims_out,
"overall_risk_score": overall,
"risk_level": risk_level(overall),
"needs_human_review": any_review,
"warnings": warnings,
# Backwards-compatible summary fields for older callers/tests.
"analysis_id": _stable_text_id(normalized_text),
"summary": {
"claim_count": len(claims_out),
"risk_count": len(all_risks),
"highest_severity": _highest_severity(all_risks),
"risk_count": sum(len(claim["detected_risks"]) for claim in claims_out),
"highest_severity": _highest_severity([risk for claim in claims_out for risk in claim["detected_risks"]]),
"stance": "conservative_review_signal",
},
"claims": claims_out,
"risks": all_risks,
"risks": [risk for claim in claims_out for risk in claim["detected_risks"]],
}


def _highest_severity(risks: list[dict[str, object]]) -> str:
def _stable_text_id(text: str) -> str:
return "txt_" + hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]


def _entry_by_id(pack: TaxonomyPack, entry_id: str) -> TaxonomyEntry | None:
return next((entry for entry in pack.entries if entry.id == entry_id), None)


def _context_for_claim(text: str, claim: Claim) -> str:
before = text[max(0, claim.start_char - 400) : claim.start_char].strip()
after = text[claim.end_char : min(len(text), claim.end_char + 400)].strip()
return "\n".join(part for part in (before, after) if part)


def _absolute_evidence(classification: dict[str, Any], claim: Claim, text: str) -> dict[str, Any]:
span = str(classification.get("evidence_span", "") or "")
rel_start = int(classification.get("evidence_start_char", 0) or 0)
rel_end = int(classification.get("evidence_end_char", rel_start + len(span)) or rel_start + len(span))
abs_start = claim.start_char + rel_start if 0 <= rel_start <= len(claim.text) else text.find(span)
abs_end = abs_start + len(span) if abs_start >= 0 else claim.start_char + rel_end
exact = bool(span and abs_start >= 0 and text[abs_start:abs_end] == span)
if not exact and span:
found = text.find(span)
if found >= 0:
abs_start = found
abs_end = found + len(span)
exact = True
classification["evidence_start_char"] = max(0, abs_start)
classification["evidence_end_char"] = max(0, abs_end)
classification["evidence_exact"] = exact
return classification


def _healthy_patterns(candidates: list[RetrievedTaxonomyEntry]) -> list[dict[str, Any]]:
seen: set[str] = set()
patterns: list[dict[str, Any]] = []
for candidate in candidates:
for pattern_id in candidate.healthy_pattern_matches:
if pattern_id not in seen:
seen.add(pattern_id)
patterns.append({"pattern_id": pattern_id, "effect": "suppressed_or_reduced_risk"})
return patterns


def _diagnostics(candidates: list[RetrievedTaxonomyEntry]) -> dict[str, Any]:
return candidates[0].diagnostics if candidates else {}


def _highest_severity(risks: list[dict[str, Any]]) -> str:
order = {"none": 0, "low": 1, "medium": 2, "high": 3}
highest = "none"
for risk in risks:
Expand Down
22 changes: 22 additions & 0 deletions engine/argument_risk_engine/explanation/explainer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,30 @@
from __future__ import annotations

from typing import Any

from argument_risk_engine.taxonomy.models import TaxonomyEntry


def explain(entry: TaxonomyEntry, matched_terms: list[str]) -> str:
terms = ", ".join(matched_terms) if matched_terms else "taxonomy language"
return f"Matched {entry.name} because the claim contains {terms}. This is a review signal, not a truth judgement."


def explain_risk(entry: TaxonomyEntry, classification: dict[str, Any], risk_score: float, risk_level: str) -> str:
evidence = str(classification.get("evidence_span", "the cited text") or "the cited text")
definition = entry.short_definition or entry.long_definition or entry.name
return (
f"The evidence span {evidence!r} matches {entry.name}: {definition} "
f"The calibrated score is {risk_score:.2f} ({risk_level}); this flags argument risk, not factual truth."
)


def false_positive_warning(entry: TaxonomyEntry, extra_warning: str = "") -> str:
parts: list[str] = []
if entry.common_false_positives:
parts.append("Check false positives: " + "; ".join(entry.common_false_positives[:2]) + ".")
if entry.requires_human_judgment:
parts.append("This category may require human judgment.")
if extra_warning:
parts.append(extra_warning)
return " ".join(parts)
Loading