From 0c1be663088cac795c0aead18d3946767fac4225 Mon Sep 17 00:00:00 2001 From: Spbd1 <148923621+Spbd1@users.noreply.github.com> Date: Mon, 18 May 2026 05:42:38 +0000 Subject: [PATCH] Build end-to-end argument analyzer API --- backend/app/__init__.py | 7 + backend/app/api/routes_analysis.py | 12 +- backend/app/main.py | 1 + backend/app/schemas/analysis.py | 53 ++++- backend/app/services/analyzer_service.py | 21 +- engine/argument_risk_engine/analyzer.py | 204 +++++++++++++++--- .../explanation/explainer.py | 22 ++ .../scoring/calibration.py | 43 ++++ engine/argument_risk_engine/scoring/scorer.py | 173 ++++++++++++++- tests/test_analyzer.py | 44 +++- tests/test_api_analysis.py | 41 +++- tests/test_scorer.py | 31 ++- uvicorn/__main__.py | 4 + uvicorn/main.py | 19 ++ 14 files changed, 624 insertions(+), 51 deletions(-) create mode 100644 uvicorn/__main__.py create mode 100644 uvicorn/main.py diff --git a/backend/app/__init__.py b/backend/app/__init__.py index 8b13789..2ccab09 100644 --- a/backend/app/__init__.py +++ b/backend/app/__init__.py @@ -1 +1,8 @@ +from __future__ import annotations +import sys +from pathlib import Path + +_ENGINE_PATH = Path(__file__).resolve().parents[2] / "engine" +if _ENGINE_PATH.exists() and str(_ENGINE_PATH) not in sys.path: + sys.path.insert(0, str(_ENGINE_PATH)) diff --git a/backend/app/api/routes_analysis.py b/backend/app/api/routes_analysis.py index 7902af3..b65cc67 100644 --- a/backend/app/api/routes_analysis.py +++ b/backend/app/api/routes_analysis.py @@ -1,9 +1,17 @@ +from __future__ import annotations + from backend.app.schemas.analysis import AnalysisRequest, AnalysisResponse from backend.app.services.analyzer_service import analyze from fastapi import APIRouter -router = APIRouter(prefix="/analysis", tags=["analysis"]) +router = APIRouter(tags=["analysis"]) + @router.post("/analyze", response_model=AnalysisResponse) def analyze_endpoint(request: AnalysisRequest) -> dict[str, object]: - return analyze(request.text) + return analyze(request) + + +@router.post("/analysis/analyze", response_model=AnalysisResponse) +def analyze_legacy_endpoint(request: AnalysisRequest) -> dict[str, object]: + return analyze(request) diff --git a/backend/app/main.py b/backend/app/main.py index 2a02b2c..b485a69 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -22,6 +22,7 @@ allow_headers=["*"], ) +app.include_router(routes_analysis.router) app.include_router(routes_analysis.router, prefix="/api") app.include_router(routes_taxonomy.router, prefix="/api") app.include_router(routes_taxonomy_workbench.router, prefix="/api") diff --git a/backend/app/schemas/analysis.py b/backend/app/schemas/analysis.py index 8820a7d..f9b0f9c 100644 --- a/backend/app/schemas/analysis.py +++ b/backend/app/schemas/analysis.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Any from pydantic import BaseModel, Field @@ -5,9 +7,52 @@ class AnalysisRequest(BaseModel): text: str = Field(min_length=1) + mode: str = "deterministic_baseline" + model_provider_id: str = "deterministic_baseline" + top_k: int = 8 + include_healthy_patterns: bool = True + max_risks_per_claim: int = 3 + allow_deterministic_fallback: bool = True + include_retrieval_diagnostics: bool = False + + +class DetectedRisk(BaseModel): + risk_id: str + category: str + label: str + severity: str + confidence: float + risk_score: float + risk_level: str + evidence_span: str + evidence_start_char: int + evidence_end_char: int + explanation: str + false_positive_warning: str + needs_human_review: bool + + +class AnalyzedClaim(BaseModel): + claim_id: str + text: str + claim_type: str + start_char: int + end_char: int + detected_risks: list[DetectedRisk] + healthy_patterns: list[dict[str, Any]] + warnings: list[str] + retrieval_diagnostics: dict[str, Any] + class AnalysisResponse(BaseModel): - analysis_id: str - summary: dict[str, Any] - claims: list[dict[str, Any]] - risks: list[dict[str, Any]] + text_id: str + mode: str + model_provider_id: str + model_name: str + llm_used: bool + deterministic_fallback_used: bool + claims: list[AnalyzedClaim] + overall_risk_score: float + risk_level: str + needs_human_review: bool + warnings: list[str] diff --git a/backend/app/services/analyzer_service.py b/backend/app/services/analyzer_service.py index 46dd2bf..e290dee 100644 --- a/backend/app/services/analyzer_service.py +++ b/backend/app/services/analyzer_service.py @@ -1,7 +1,24 @@ +from __future__ import annotations + +from typing import Any + from argument_risk_engine.analyzer import analyze_text +from backend.app.schemas.analysis import AnalysisRequest from backend.app.services.taxonomy_service import get_active_pack -def analyze(text: str) -> dict[str, object]: - return analyze_text(text, get_active_pack()) +def analyze(request: AnalysisRequest | str) -> dict[str, Any]: + if isinstance(request, str): + return analyze_text(request, get_active_pack()) + return analyze_text( + request.text, + get_active_pack(), + mode=request.mode, + model_provider_id=request.model_provider_id, + top_k=int(request.top_k), + include_healthy_patterns=bool(request.include_healthy_patterns), + max_risks_per_claim=int(request.max_risks_per_claim), + allow_deterministic_fallback=bool(request.allow_deterministic_fallback), + include_retrieval_diagnostics=bool(request.include_retrieval_diagnostics), + ) diff --git a/engine/argument_risk_engine/analyzer.py b/engine/argument_risk_engine/analyzer.py index 07af0ba..e09d56e 100644 --- a/engine/argument_risk_engine/analyzer.py +++ b/engine/argument_risk_engine/analyzer.py @@ -1,53 +1,191 @@ from __future__ import annotations -from uuid import uuid4 +import hashlib +from typing import Any from argument_risk_engine.classification.deterministic import classify_deterministic -from argument_risk_engine.explanation.evidence import evidence_span -from argument_risk_engine.explanation.explainer import explain -from argument_risk_engine.extraction.claim_extractor import extract_claims -from argument_risk_engine.retrieval.lexical_retriever import retrieve_candidates -from argument_risk_engine.scoring.scorer import score_risk -from argument_risk_engine.taxonomy.models import TaxonomyPack, default_taxonomy_pack +from argument_risk_engine.explanation.explainer import explain_risk, false_positive_warning +from argument_risk_engine.extraction.claim_extractor import Claim, extract_claims +from argument_risk_engine.retrieval.lexical_retriever import RetrievedTaxonomyEntry, retrieve_candidates +from argument_risk_engine.scoring.calibration import risk_level +from argument_risk_engine.scoring.scorer import score_classification +from argument_risk_engine.taxonomy.models import TaxonomyEntry, TaxonomyPack, default_taxonomy_pack +DEFAULT_MODE = "deterministic_baseline" +DEFAULT_MODEL_PROVIDER_ID = "deterministic_baseline" +DEFAULT_MODEL_NAME = "local-keyword-v1" -def analyze_text(text: str, pack: TaxonomyPack | None = None) -> dict[str, object]: + +def analyze_text( + text: str, + pack: TaxonomyPack | None = None, + *, + mode: str = DEFAULT_MODE, + model_provider_id: str = DEFAULT_MODEL_PROVIDER_ID, + top_k: int = 8, + include_healthy_patterns: bool = True, + max_risks_per_claim: int = 3, + allow_deterministic_fallback: bool = True, + include_retrieval_diagnostics: bool = False, +) -> dict[str, Any]: taxonomy_pack = pack or default_taxonomy_pack() - claims_out: list[dict[str, object]] = [] - all_risks: list[dict[str, object]] = [] - for claim in extract_claims(text): - candidates = retrieve_candidates(claim, taxonomy_pack) - classified = classify_deterministic(claim, candidates) - risks: list[dict[str, object]] = [] - for result in classified: - entry = next(item for item in taxonomy_pack.entries if item.id == result["taxonomy_id"]) + normalized_text = text or "" + claims = extract_claims(normalized_text) + claims_out: list[dict[str, Any]] = [] + all_scores: list[float] = [] + warnings: list[str] = [] + any_review = False + + for index, claim in enumerate(claims, start=1): + candidates = retrieve_candidates(str(claim), taxonomy_pack, limit=top_k) + classified = classify_deterministic( + str(claim), + candidates, + context=_context_for_claim(normalized_text, claim), + classification_mode=mode, + model_provider_id=model_provider_id, + model_name=DEFAULT_MODEL_NAME, + deterministic_fallback_used=False, + ) + candidate_by_id = {candidate.entry.id: candidate for candidate in candidates} + high_confidence_count = 0 + detected: list[dict[str, Any]] = [] + claim_warnings: list[str] = [] + + for classification in classified: + entry = _entry_by_id(taxonomy_pack, str(classification.get("taxonomy_id") or classification.get("risk_id"))) + if entry is None: + continue + candidate = candidate_by_id.get(entry.id) + classification = dict(classification) + classification["claim_type"] = claim.claim_type + classification = _absolute_evidence(classification, claim, normalized_text) + scored = score_classification( + classification, + entry=entry, + candidate=candidate, + claim_text=str(claim), + has_context=len(claims) > 1, + high_confidence_risk_count=high_confidence_count, + ) + if scored.suppressed: + if scored.warning: + claim_warnings.append(scored.warning) + continue + if float(classification.get("confidence", 0.0) or 0.0) >= 0.75: + high_confidence_count += 1 + review = bool(scored.needs_human_review or entry.requires_human_judgment) + any_review = any_review or review + warning = false_positive_warning(entry, scored.warning or str(classification.get("false_positive_warning", "") or "")) risk = { - "taxonomy_id": entry.id, - "name": entry.name, - "severity": entry.severity.value, - "confidence": result["confidence"], - "score": score_risk(entry.severity.value, float(result["confidence"])), - "explanation": explain(entry, list(result["matched_terms"])), - "evidence": evidence_span(text, claim), - "mitigation": entry.mitigation, + "risk_id": entry.id, + "category": entry.canonical_category, + "label": entry.name, + "severity": classification.get("severity", entry.severity.value), + "confidence": round(float(classification.get("confidence", 0.0) or 0.0), 3), + "risk_score": scored.risk_score, + "risk_level": scored.risk_level, + "evidence_span": classification.get("evidence_span", ""), + "evidence_start_char": classification.get("evidence_start_char", claim.start_char), + "evidence_end_char": classification.get("evidence_end_char", claim.end_char), + "explanation": explain_risk(entry, classification, scored.risk_score, scored.risk_level), + "false_positive_warning": warning, + "needs_human_review": review, + } + detected.append(risk) + all_scores.append(scored.risk_score) + + detected.sort(key=lambda item: (-float(item["risk_score"]), str(item["risk_id"]))) + detected = detected[: max(0, max_risks_per_claim)] + diagnostics = _diagnostics(candidates) if include_retrieval_diagnostics else {} + claims_out.append( + { + "claim_id": f"claim_{index}", + "text": claim.text, + "claim_type": claim.claim_type, + "start_char": claim.start_char, + "end_char": claim.end_char, + "detected_risks": detected, + "healthy_patterns": _healthy_patterns(candidates) if include_healthy_patterns else [], + "warnings": sorted(set(claim_warnings)), + "retrieval_diagnostics": diagnostics, } - risks.append(risk) - all_risks.append(risk) - claims_out.append({"text": claim, "risks": risks}) + ) + + overall = round(max(all_scores) if all_scores else 0.0, 3) return { - "analysis_id": str(uuid4()), + "text_id": _stable_text_id(normalized_text), + "mode": mode, + "model_provider_id": model_provider_id, + "model_name": DEFAULT_MODEL_NAME, + "llm_used": False, + "deterministic_fallback_used": False if mode == DEFAULT_MODE else allow_deterministic_fallback, + "claims": claims_out, + "overall_risk_score": overall, + "risk_level": risk_level(overall), + "needs_human_review": any_review, + "warnings": warnings, + # Backwards-compatible summary fields for older callers/tests. + "analysis_id": _stable_text_id(normalized_text), "summary": { "claim_count": len(claims_out), - "risk_count": len(all_risks), - "highest_severity": _highest_severity(all_risks), + "risk_count": sum(len(claim["detected_risks"]) for claim in claims_out), + "highest_severity": _highest_severity([risk for claim in claims_out for risk in claim["detected_risks"]]), "stance": "conservative_review_signal", }, - "claims": claims_out, - "risks": all_risks, + "risks": [risk for claim in claims_out for risk in claim["detected_risks"]], } -def _highest_severity(risks: list[dict[str, object]]) -> str: +def _stable_text_id(text: str) -> str: + return "txt_" + hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] + + +def _entry_by_id(pack: TaxonomyPack, entry_id: str) -> TaxonomyEntry | None: + return next((entry for entry in pack.entries if entry.id == entry_id), None) + + +def _context_for_claim(text: str, claim: Claim) -> str: + before = text[max(0, claim.start_char - 400) : claim.start_char].strip() + after = text[claim.end_char : min(len(text), claim.end_char + 400)].strip() + return "\n".join(part for part in (before, after) if part) + + +def _absolute_evidence(classification: dict[str, Any], claim: Claim, text: str) -> dict[str, Any]: + span = str(classification.get("evidence_span", "") or "") + rel_start = int(classification.get("evidence_start_char", 0) or 0) + rel_end = int(classification.get("evidence_end_char", rel_start + len(span)) or rel_start + len(span)) + abs_start = claim.start_char + rel_start if 0 <= rel_start <= len(claim.text) else text.find(span) + abs_end = abs_start + len(span) if abs_start >= 0 else claim.start_char + rel_end + exact = bool(span and abs_start >= 0 and text[abs_start:abs_end] == span) + if not exact and span: + found = text.find(span) + if found >= 0: + abs_start = found + abs_end = found + len(span) + exact = True + classification["evidence_start_char"] = max(0, abs_start) + classification["evidence_end_char"] = max(0, abs_end) + classification["evidence_exact"] = exact + return classification + + +def _healthy_patterns(candidates: list[RetrievedTaxonomyEntry]) -> list[dict[str, Any]]: + seen: set[str] = set() + patterns: list[dict[str, Any]] = [] + for candidate in candidates: + for pattern_id in candidate.healthy_pattern_matches: + if pattern_id not in seen: + seen.add(pattern_id) + patterns.append({"pattern_id": pattern_id, "effect": "suppressed_or_reduced_risk"}) + return patterns + + +def _diagnostics(candidates: list[RetrievedTaxonomyEntry]) -> dict[str, Any]: + return candidates[0].diagnostics if candidates else {} + + +def _highest_severity(risks: list[dict[str, Any]]) -> str: order = {"none": 0, "low": 1, "medium": 2, "high": 3} highest = "none" for risk in risks: diff --git a/engine/argument_risk_engine/explanation/explainer.py b/engine/argument_risk_engine/explanation/explainer.py index ce42b28..87c81c7 100644 --- a/engine/argument_risk_engine/explanation/explainer.py +++ b/engine/argument_risk_engine/explanation/explainer.py @@ -1,8 +1,30 @@ from __future__ import annotations +from typing import Any + from argument_risk_engine.taxonomy.models import TaxonomyEntry def explain(entry: TaxonomyEntry, matched_terms: list[str]) -> str: terms = ", ".join(matched_terms) if matched_terms else "taxonomy language" return f"Matched {entry.name} because the claim contains {terms}. This is a review signal, not a truth judgement." + + +def explain_risk(entry: TaxonomyEntry, classification: dict[str, Any], risk_score: float, risk_level: str) -> str: + evidence = str(classification.get("evidence_span", "the cited text") or "the cited text") + definition = entry.short_definition or entry.long_definition or entry.name + return ( + f"The evidence span {evidence!r} matches {entry.name}: {definition} " + f"The calibrated score is {risk_score:.2f} ({risk_level}); this flags argument risk, not factual truth." + ) + + +def false_positive_warning(entry: TaxonomyEntry, extra_warning: str = "") -> str: + parts: list[str] = [] + if entry.common_false_positives: + parts.append("Check false positives: " + "; ".join(entry.common_false_positives[:2]) + ".") + if entry.requires_human_judgment: + parts.append("This category may require human judgment.") + if extra_warning: + parts.append(extra_warning) + return " ".join(parts) diff --git a/engine/argument_risk_engine/scoring/calibration.py b/engine/argument_risk_engine/scoring/calibration.py index 16e2a9e..b06f42d 100644 --- a/engine/argument_risk_engine/scoring/calibration.py +++ b/engine/argument_risk_engine/scoring/calibration.py @@ -1,2 +1,45 @@ +from __future__ import annotations + +from dataclasses import dataclass + + +RISK_LEVEL_THRESHOLDS: tuple[tuple[float, str], ...] = ( + (0.75, "severe"), + (0.50, "high"), + (0.25, "moderate"), + (0.00, "low"), +) + +SEVERITY_WEIGHTS: dict[str, float] = {"low": 0.35, "medium": 0.68, "high": 1.0} + + +@dataclass(frozen=True) +class CalibrationProfile: + minimum_confidence: float = 0.45 + non_exact_evidence_review_threshold: float = 0.01 + healthy_suppressor_penalty: float = 0.18 + high_false_positive_minimum_evidence: float = 0.75 + high_false_positive_minimum_confidence: float = 0.62 + contextual_isolated_cap: float = 0.49 + short_claim_high_confidence_limit: int = 2 + short_claim_char_limit: int = 160 + + def conservative_threshold() -> float: return 0.5 + + +def default_calibration() -> CalibrationProfile: + return CalibrationProfile() + + +def severity_weight(severity: str) -> float: + return SEVERITY_WEIGHTS.get(str(severity or "").lower(), SEVERITY_WEIGHTS["low"]) + + +def risk_level(score: float) -> str: + bounded = max(0.0, min(1.0, float(score or 0.0))) + for threshold, label in RISK_LEVEL_THRESHOLDS: + if bounded >= threshold: + return label + return "low" diff --git a/engine/argument_risk_engine/scoring/scorer.py b/engine/argument_risk_engine/scoring/scorer.py index be8c3d8..388b85a 100644 --- a/engine/argument_risk_engine/scoring/scorer.py +++ b/engine/argument_risk_engine/scoring/scorer.py @@ -1,7 +1,174 @@ from __future__ import annotations -SEVERITY_WEIGHT = {"low": 1, "medium": 2, "high": 3} +from dataclasses import dataclass +from typing import Any +from argument_risk_engine.scoring.calibration import CalibrationProfile, default_calibration, risk_level, severity_weight -def score_risk(severity: str, confidence: float) -> float: - return round(SEVERITY_WEIGHT.get(severity, 1) * confidence / 3, 3) +LEGACY_SEVERITY_WEIGHT = {"low": 1, "medium": 2, "high": 3} + + +@dataclass(frozen=True) +class ScoredRisk: + risk_score: float + risk_level: str + suppressed: bool = False + needs_human_review: bool = False + warning: str = "" + + +def score_risk( + severity: str, + confidence: float, + *, + taxonomy_match: float | None = None, + evidence_strength: float | None = None, + classifier_confidence: float | None = None, + consistency_check: float = 1.0, + claim_type_compatibility: float = 1.0, +) -> float: + """Score a risk. + + Positional-only legacy calls keep the original severity/confidence behavior. + Keyword component calls use the end-to-end analyzer formula. + """ + + if taxonomy_match is None and evidence_strength is None and classifier_confidence is None: + return round(LEGACY_SEVERITY_WEIGHT.get(str(severity), 1) * float(confidence) / 3, 3) + + return calculate_risk_score( + taxonomy_match=1.0 if taxonomy_match is None else taxonomy_match, + evidence_strength=1.0 if evidence_strength is None else evidence_strength, + classifier_confidence=float(confidence if classifier_confidence is None else classifier_confidence), + severity_weight_value=severity_weight(severity), + consistency_check=consistency_check, + claim_type_compatibility=claim_type_compatibility, + ) + + +def calculate_risk_score( + *, + taxonomy_match: float, + evidence_strength: float, + classifier_confidence: float, + severity_weight_value: float, + consistency_check: float, + claim_type_compatibility: float, +) -> float: + score = ( + 0.30 * _bounded(taxonomy_match) + + 0.25 * _bounded(evidence_strength) + + 0.20 * _bounded(classifier_confidence) + + 0.10 * _bounded(severity_weight_value) + + 0.10 * _bounded(consistency_check) + + 0.05 * _bounded(claim_type_compatibility) + ) + return round(_bounded(score), 3) + + +def score_classification( + classification: dict[str, Any], + *, + entry: Any | None = None, + candidate: Any | None = None, + claim_text: str = "", + has_context: bool = False, + high_confidence_risk_count: int = 0, + calibration: CalibrationProfile | None = None, +) -> ScoredRisk: + """Apply the analyzer scoring formula and false-positive guards.""" + + profile = calibration or default_calibration() + confidence = _bounded(float(classification.get("confidence", 0.0) or 0.0)) + if confidence < profile.minimum_confidence: + return ScoredRisk(0.0, "low", suppressed=True, warning="Excluded because classifier confidence is below 0.45.") + + evidence = str(classification.get("evidence_span", "") or "") + if not evidence: + return ScoredRisk(0.0, "low", suppressed=True, warning="Suppressed because no evidence span was provided.") + + exact = bool(classification.get("evidence_exact", True)) + needs_review = not exact + warnings: list[str] = [] + if not exact: + warnings.append("Evidence span was not an exact text match; human review recommended.") + + taxonomy_match = _taxonomy_match(candidate, classification) + evidence_strength = _evidence_strength(classification, exact=exact) + consistency = _consistency_check(entry, classification, has_context=has_context) + compatibility = _claim_type_compatibility(entry, str(classification.get("claim_type", "") or "")) + + fp_sensitivity = str(getattr(entry, "false_positive_sensitivity", "medium") or "medium") + requires_context = bool(getattr(entry, "requires_context", False)) or str(getattr(entry, "detection_level", "")) in {"contextual", "discourse", "cross_claim"} + if fp_sensitivity == "high" and (evidence_strength < profile.high_false_positive_minimum_evidence or confidence < profile.high_false_positive_minimum_confidence): + return ScoredRisk(0.0, "low", suppressed=True, warning="Suppressed because high false-positive sensitivity requires stronger evidence.") + + score = score_risk( + str(classification.get("severity", getattr(getattr(entry, "severity", None), "value", "low")) or "low"), + confidence, + taxonomy_match=taxonomy_match, + evidence_strength=evidence_strength, + classifier_confidence=confidence, + consistency_check=consistency, + claim_type_compatibility=compatibility, + ) + + healthy_matches = list(getattr(candidate, "healthy_pattern_matches", []) or []) + if healthy_matches: + score = max(0.0, score - profile.healthy_suppressor_penalty) + warnings.append("Healthy reasoning pattern reduced this score.") + + if requires_context and not has_context: + score = min(score, profile.contextual_isolated_cap) + if score >= profile.contextual_isolated_cap: + warnings.append("Contextual/discourse risk capped for an isolated sentence.") + + if len(claim_text) <= profile.short_claim_char_limit and high_confidence_risk_count >= profile.short_claim_high_confidence_limit and confidence >= 0.75: + score = min(score, 0.49) + needs_review = True + warnings.append("Short claim has multiple high-confidence labels; human review recommended.") + + score = round(_bounded(score), 3) + return ScoredRisk(score, risk_level(score), suppressed=False, needs_human_review=needs_review, warning=" ".join(warnings)) + + +def _taxonomy_match(candidate: Any | None, classification: dict[str, Any]) -> float: + if candidate is None: + return 1.0 if classification.get("risk_id") or classification.get("taxonomy_id") else 0.0 + retrieval_score = float(getattr(candidate, "retrieval_score", 0.0) or 0.0) + return _bounded(0.55 + min(retrieval_score, 4.5) / 10.0) + + +def _evidence_strength(classification: dict[str, Any], *, exact: bool) -> float: + span = str(classification.get("evidence_span", "") or "") + if not span: + return 0.0 + base = 1.0 if exact else 0.55 + if len(span.strip()) < 4: + base -= 0.2 + return _bounded(base) + + +def _consistency_check(entry: Any | None, classification: dict[str, Any], *, has_context: bool) -> float: + if entry is None: + return 0.8 + if bool(getattr(entry, "requires_context", False)) and not has_context: + return 0.45 + if bool(getattr(entry, "requires_human_judgment", False)): + return 0.75 + return 1.0 + + +def _claim_type_compatibility(entry: Any | None, claim_type: str) -> float: + category = str(getattr(entry, "canonical_category", "") if entry is not None else "") + if not claim_type or claim_type == "unclear": + return 0.7 + if category in {"causal_reasoning_error", "statistical_reasoning_error"}: + return 1.0 if claim_type in {"causal_claim", "statistical_claim", "evidential_claim"} else 0.65 + if category == "fallacy" and claim_type in {"generalization", "causal_claim", "comparative_claim", "normative_claim"}: + return 1.0 + return 0.85 + + +def _bounded(value: float) -> float: + return max(0.0, min(1.0, float(value or 0.0))) diff --git a/tests/test_analyzer.py b/tests/test_analyzer.py index 741f6f9..3c6d34e 100644 --- a/tests/test_analyzer.py +++ b/tests/test_analyzer.py @@ -1,7 +1,45 @@ from argument_risk_engine.analyzer import analyze_text -def test_analyzer_returns_risks(): +def test_analyzer_returns_structured_risk_report(): result = analyze_text("They are vermin.") - assert result["summary"]["risk_count"] == 1 - assert result["summary"]["highest_severity"] == "high" + + assert result["text_id"].startswith("txt_") + assert result["mode"] == "deterministic_baseline" + assert result["llm_used"] is False + assert result["claims"] + claim = result["claims"][0] + assert claim["claim_id"] == "claim_1" + assert claim["start_char"] == 0 + assert claim["detected_risks"] + risk = claim["detected_risks"][0] + assert set(risk) == { + "risk_id", + "category", + "label", + "severity", + "confidence", + "risk_score", + "risk_level", + "evidence_span", + "evidence_start_char", + "evidence_end_char", + "explanation", + "false_positive_warning", + "needs_human_review", + } + assert result["overall_risk_score"] >= risk["risk_score"] + + +def test_analyzer_response_is_stable_for_same_text(): + first = analyze_text("Everyone always caused this.") + second = analyze_text("Everyone always caused this.") + + assert first == second + + +def test_analyzer_can_include_retrieval_diagnostics(): + result = analyze_text("Everyone always caused this.", include_retrieval_diagnostics=True) + + assert isinstance(result["claims"][0]["retrieval_diagnostics"], dict) + assert "considered_entry_count" in result["claims"][0]["retrieval_diagnostics"] diff --git a/tests/test_api_analysis.py b/tests/test_api_analysis.py index ffca2e5..b949fbb 100644 --- a/tests/test_api_analysis.py +++ b/tests/test_api_analysis.py @@ -2,7 +2,44 @@ from fastapi.testclient import TestClient -def test_api_analysis(): +REQUEST = { + "text": "Everyone always caused this.", + "mode": "deterministic_baseline", + "model_provider_id": "deterministic_baseline", + "top_k": 8, + "include_healthy_patterns": True, + "max_risks_per_claim": 3, + "allow_deterministic_fallback": True, + "include_retrieval_diagnostics": False, +} + + +def test_health_endpoint(): + response = TestClient(app).get("/health") + + assert response.status_code == 200 + assert response.json() == {"status": "ok"} + + +def test_root_analyze_endpoint_without_api_key(): + response = TestClient(app).post("/analyze", json=REQUEST) + body = response.json() + + assert response.status_code == 200 + assert body["text_id"].startswith("txt_") + assert body["llm_used"] is False + assert body["claims"][0]["detected_risks"] + + +def test_api_analyze_endpoint_without_api_key(): + response = TestClient(app).post("/api/analyze", json=REQUEST) + + assert response.status_code == 200 + assert response.json()["claims"][0]["detected_risks"] + + +def test_legacy_analysis_endpoint_still_works(): response = TestClient(app).post("/api/analysis/analyze", json={"text": "Everyone always caused this."}) + assert response.status_code == 200 - assert response.json()["risks"] + assert response.json()["claims"][0]["detected_risks"] diff --git a/tests/test_scorer.py b/tests/test_scorer.py index a4d13ca..62378e9 100644 --- a/tests/test_scorer.py +++ b/tests/test_scorer.py @@ -1,5 +1,32 @@ -from argument_risk_engine.scoring.scorer import score_risk +from argument_risk_engine.scoring.calibration import risk_level +from argument_risk_engine.scoring.scorer import calculate_risk_score, score_classification, score_risk -def test_score_risk(): +def test_weighted_formula_matches_spec(): + assert calculate_risk_score( + taxonomy_match=1.0, + evidence_strength=0.8, + classifier_confidence=0.9, + severity_weight_value=1.0, + consistency_check=0.5, + claim_type_compatibility=1.0, + ) == 0.88 + + +def test_legacy_score_risk_still_available(): assert score_risk("high", 0.9) == 0.9 + + +def test_false_positive_guards_suppress_missing_evidence_and_low_confidence(): + missing = score_classification({"severity": "high", "confidence": 0.9, "evidence_span": ""}) + low_confidence = score_classification({"severity": "high", "confidence": 0.44, "evidence_span": "always"}) + + assert missing.suppressed is True + assert low_confidence.suppressed is True + + +def test_risk_level_thresholds_are_stable(): + assert risk_level(0.0) == "low" + assert risk_level(0.25) == "moderate" + assert risk_level(0.5) == "high" + assert risk_level(0.75) == "severe" diff --git a/uvicorn/__main__.py b/uvicorn/__main__.py new file mode 100644 index 0000000..77b6ffa --- /dev/null +++ b/uvicorn/__main__.py @@ -0,0 +1,4 @@ +from uvicorn.main import main + +if __name__ == "__main__": + main() diff --git a/uvicorn/main.py b/uvicorn/main.py new file mode 100644 index 0000000..3aa5726 --- /dev/null +++ b/uvicorn/main.py @@ -0,0 +1,19 @@ +from __future__ import annotations + +import argparse + +from uvicorn import run + + +def main() -> None: + parser = argparse.ArgumentParser(prog="uvicorn") + parser.add_argument("app_path") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", default=8000, type=int) + parser.add_argument("--reload", action="store_true") + args = parser.parse_args() + run(args.app_path, host=args.host, port=args.port, reload=args.reload) + + +if __name__ == "__main__": + main()