diff --git a/app/src/lib/tools/log-analyzer.ts b/app/src/lib/tools/log-analyzer.ts index 57c04af..4693926 100644 --- a/app/src/lib/tools/log-analyzer.ts +++ b/app/src/lib/tools/log-analyzer.ts @@ -1,4 +1,4 @@ -import type { ToolDefinition } from "@/types"; +import type { ToolDefinition } from "@/types"; export const logAnalyzer: ToolDefinition = { id: "log-analyzer", @@ -8,25 +8,13 @@ export const logAnalyzer: ToolDefinition = { category: "devops", icon: "Terminal", status: "active", + tier: "tier2", requiredFields: ["logs"], defaultModel: "deepseek-r1-0528", - buildSystemPrompt: () => - `You are a senior DevOps/SRE engineer analyzing system logs. Provide: - -1. **Severity Assessment** - Critical / Warning / Info - how urgent is this? -2. **Error Summary** - List each unique error type with occurrence count -3. **Root Cause Analysis** - What is most likely causing these errors? -4. **Timeline** - When did the issue start? Is it escalating or stable? -5. **Pattern Detection** - Are errors correlated? Time-based patterns? Cascading failures? -6. **Recommended Fixes** - Specific, actionable steps to resolve each issue -7. **Prevention** - Configuration or monitoring changes to prevent recurrence - -Format as structured markdown. Use tables for error summaries. Highlight critical items with ⚠️.`, - - buildUserPrompt: ({ logs, context }) => - `${context ? `**CONTEXT:** ${context}\n\n` : ""}**SYSTEM LOGS:**\n\`\`\`\n${logs}\n\`\`\`\n\nAnalyze these logs and identify issues.`, + buildSystemPrompt: () => "", // unused — tool.py / llm_client.py own the prompt + buildUserPrompt: () => "", // unused — tool.py builds the payload inputs: [ { @@ -46,5 +34,20 @@ Format as structured markdown. Use tables for error summaries. Highlight critica placeholder: "E.g. 'This started after deploying v2.3.1 to production at 10:30 AM'", rows: 2, }, + { + key: "report_mode", + label: "Report Mode", + type: "select", + options: [ + { + value: "fix_only", + label: "Fix Only — just tell me what to do right now", + }, + { + value: "detailed", + label: "Full Report — root causes, timeline, patterns + fixes", + }, + ], + }, ], }; diff --git a/services/python-tools/tools/log-analyzer/core/context_compressor.py b/services/python-tools/tools/log-analyzer/core/context_compressor.py new file mode 100644 index 0000000..dab0670 --- /dev/null +++ b/services/python-tools/tools/log-analyzer/core/context_compressor.py @@ -0,0 +1,264 @@ +from __future__ import annotations + +import json +import math +from dataclasses import asdict, dataclass +from typing import Any, Dict, List, Optional, Tuple + +from .statistical_analyzer import AnalysisResult +from .correlation_engine import CorrelationResult + + +# --------------------------------------------------------------------------- +# Short-name mapping (saves ~30% tokens on field names) +# --------------------------------------------------------------------------- +# Full name → compact alias used in the JSON sent to the LLM. +# The LLM system prompt explains these aliases once, paying the cost +# a single time rather than per-request. + +_COMPACT = { + "total_entries": "n", + "level_distribution": "lvls", + "top_errors": "errs", + "unique_error_count": "u_errs", + "burst_windows": "bursts", + "escalation_events": "escalations", + "has_timestamps": "has_ts", + "time_span_seconds": "span_s", + "correlated_pairs": "corr", + "cascade_chains": "chains", + "source_hotspots": "hotspots", + "pattern_a": "a", + "pattern_b": "b", + "co_occurrence_count": "cnt", + "avg_lag_seconds": "lag", + "confidence": "conf", + "root": "r", + "chain": "ch", + "total_occurrences": "tot", + "start": "s", + "end": "e", + "error_count": "ec", + "rate_multiplier": "mx", + "timestamp": "ts", + "from_level": "f", + "to_level": "t", + "message": "m", +} + + +def _compact(d: Dict[str, Any]) -> Dict[str, Any]: + return {_COMPACT.get(k, k): v for k, v in d.items()} + + +def _q(v: Optional[float]) -> Optional[float]: + """Quantise to 2 decimal places.""" + return round(v, 2) if v is not None else None + + +def _trunc(s: str, n: int = 120) -> str: + return s if len(s) <= n else s[:n - 1] + "…" + + +# --------------------------------------------------------------------------- +# Rough token estimator (character-based, ~4 chars/token for JSON) +# --------------------------------------------------------------------------- + +def _estimate_tokens(text: str) -> int: + return math.ceil(len(text) / 4) + + +# --------------------------------------------------------------------------- +# Compressor +# --------------------------------------------------------------------------- + +class ContextCompressor: + """ + Produces a token-minimised JSON payload from pipeline stage outputs. + + Parameters + ---------- + max_top_errors: + Number of top error patterns to include (default 15). + max_correlated_pairs: + Number of correlation pairs to include (default 10). + max_cascade_chains: + Number of cascade chains to include (default 5). + max_timeline_buckets: + Busiest N time buckets to include (default 10). + max_escalations: + Number of escalation events to include (default 10). + pattern_min_count: + Drop error patterns occurring fewer than this many times. + """ + + def __init__( + self, + max_top_errors: int = 15, + max_correlated_pairs: int = 10, + max_cascade_chains: int = 5, + max_timeline_buckets: int = 10, + max_escalations: int = 10, + pattern_min_count: int = 1, + ) -> None: + self._max_errors = max_top_errors + self._max_pairs = max_correlated_pairs + self._max_chains = max_cascade_chains + self._max_buckets = max_timeline_buckets + self._max_escalations = max_escalations + self._min_count = pattern_min_count + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def compress( + self, + stats: AnalysisResult, + correlations: CorrelationResult, + user_query: str = "", + ) -> Dict[str, Any]: + """ + Returns a dict with keys: + "payload" — the compressed JSON object (send to LLM) + "payload_json" — serialised string of payload + "estimated_tokens" — rough token count of payload_json + "compression_notes" — human-readable summary of what was dropped + """ + payload = self._build_payload(stats, correlations) + payload_json = json.dumps(payload, separators=(",", ":")) + + notes = self._compression_notes(stats, correlations, payload) + + return { + "payload": payload, + "payload_json": payload_json, + "estimated_tokens": _estimate_tokens(payload_json), + "user_query": user_query, + "compression_notes": notes, + } + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _build_payload( + self, + s: AnalysisResult, + c: CorrelationResult, + ) -> Dict[str, Any]: + # --- Top errors (filter + truncate) --- + top_errs = [ + {"p": _trunc(pattern), "c": count} + for pattern, count in s.top_errors[: self._max_errors] + if count >= self._min_count + ] + + # --- Burst windows --- + bursts = [ + _compact({ + "start": b.start, + "end": b.end, + "error_count": b.error_count, + "rate_multiplier": _q(b.rate_multiplier), + }) + for b in s.burst_windows + ] + + # --- Escalation events (most recent N) --- + escalations = [ + _compact({ + "timestamp": e.timestamp, + "from_level": e.from_level.value, + "to_level": e.to_level.value, + "message": _trunc(e.message, 80), + }) + for e in s.escalation_events[-self._max_escalations :] + ] + + # --- Timeline: top-N busiest error buckets --- + timeline = self._prune_timeline(s.timeline_buckets) + + # --- Correlation pairs --- + pairs = [ + _compact({ + "pattern_a": _trunc(p.pattern_a, 80), + "pattern_b": _trunc(p.pattern_b, 80), + "co_occurrence_count": p.co_occurrence_count, + "avg_lag_seconds": _q(p.avg_lag_seconds), + "confidence": _q(p.confidence), + }) + for p in c.correlated_pairs[: self._max_pairs] + ] + + # --- Cascade chains --- + chains = [ + _compact({ + "root": _trunc(ch.root, 80), + "chain": [_trunc(s, 80) for s in ch.chain], + "total_occurrences": ch.total_occurrences, + }) + for ch in c.cascade_chains[: self._max_chains] + ] + + # --- Source hotspots (top 10) --- + hotspots = dict(list(c.source_hotspots.items())[:10]) + + return _compact({ + "total_entries": s.total_entries, + "has_timestamps": s.has_timestamps, + "time_span_seconds": _q(s.time_span_seconds), + "level_distribution": s.level_distribution, + "unique_error_count": s.unique_error_count, + "top_errors": top_errs, + "burst_windows": bursts, + "escalation_events": escalations, + "timeline": timeline, + "correlated_pairs": pairs, + "cascade_chains": chains, + "source_hotspots": hotspots, + }) + + def _prune_timeline( + self, buckets: Dict[str, Dict[str, int]] + ) -> Dict[str, Dict[str, int]]: + """Return the N busiest error/warn buckets, sorted chronologically.""" + if not buckets: + return {} + + def bucket_error_count(v: Dict[str, int]) -> int: + return sum( + cnt for lv, cnt in v.items() + if lv in ("ERROR", "CRITICAL", "WARNING") + ) + + sorted_by_activity = sorted( + buckets.items(), + key=lambda kv: bucket_error_count(kv[1]), + reverse=True, + )[: self._max_buckets] + + # Re-sort chronologically + return dict(sorted(sorted_by_activity, key=lambda kv: kv[0])) + + def _compression_notes( + self, + s: AnalysisResult, + c: CorrelationResult, + payload: Dict[str, Any], + ) -> List[str]: + notes: List[str] = [] + + dropped_errors = s.unique_error_count - len(payload.get("errs", [])) + if dropped_errors > 0: + notes.append(f"Dropped {dropped_errors} low-frequency error patterns.") + + dropped_pairs = len(c.correlated_pairs) - len(payload.get("corr", [])) + if dropped_pairs > 0: + notes.append(f"Dropped {dropped_pairs} low-confidence correlation pairs.") + + dropped_buckets = len(s.timeline_buckets) - len(payload.get("timeline", {})) + if dropped_buckets > 0: + notes.append(f"Pruned {dropped_buckets} quiet timeline buckets.") + + return notes \ No newline at end of file diff --git a/services/python-tools/tools/log-analyzer/core/correlation_engine.py b/services/python-tools/tools/log-analyzer/core/correlation_engine.py new file mode 100644 index 0000000..2d95694 --- /dev/null +++ b/services/python-tools/tools/log-analyzer/core/correlation_engine.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +import re +from collections import defaultdict +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Dict, List, Optional, Set, Tuple + +from .log_parser import LogEntry, LogLevel +from .statistical_analyzer import _normalise_message + + +# --------------------------------------------------------------------------- +# Data containers +# --------------------------------------------------------------------------- + +@dataclass +class CorrelatedPair: + pattern_a: str + pattern_b: str + co_occurrence_count: int + avg_lag_seconds: Optional[float] # None if no timestamps + confidence: float # 0.0–1.0 + + +@dataclass +class CascadeChain: + root: str + chain: List[str] # root → ... → leaf + total_occurrences: int + + +@dataclass +class CorrelationResult: + correlated_pairs: List[CorrelatedPair] + cascade_chains: List[CascadeChain] + source_hotspots: Dict[str, int] # source → error count + + +# --------------------------------------------------------------------------- +# Engine +# --------------------------------------------------------------------------- + +class CorrelationEngine: + """ + Detects cascading / co-occurring error patterns. + + Parameters + ---------- + time_window_seconds: + Maximum gap between two events to be considered temporally related. + min_co_occurrences: + Minimum times a pair must co-occur to be reported. + max_chain_depth: + Maximum cascade chain length. + """ + + def __init__( + self, + time_window_seconds: float = 30.0, + min_co_occurrences: int = 2, + max_chain_depth: int = 5, + ) -> None: + self._window = time_window_seconds + self._min_co = min_co_occurrences + self._max_chain_depth = max_chain_depth + + def correlate(self, entries: List[LogEntry]) -> CorrelationResult: + error_entries = [ + e for e in entries + if e.level in (LogLevel.ERROR, LogLevel.CRITICAL, LogLevel.WARNING) + ] + + source_hotspots = self._source_hotspots(error_entries) + co_occurrence = self._build_co_occurrence(error_entries) + pairs = self._build_pairs(error_entries, co_occurrence) + chains = self._build_chains(co_occurrence) + + return CorrelationResult( + correlated_pairs=pairs, + cascade_chains=chains, + source_hotspots=source_hotspots, + ) + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _source_hotspots(self, entries: List[LogEntry]) -> Dict[str, int]: + counts: Dict[str, int] = defaultdict(int) + for e in entries: + if e.source: + counts[e.source] += 1 + return dict(sorted(counts.items(), key=lambda x: x[1], reverse=True)) + + def _build_co_occurrence( + self, entries: List[LogEntry] + ) -> Dict[str, Dict[str, List[Optional[float]]]]: + """ + Returns: pattern_a → {pattern_b: [lag_seconds, ...]} + """ + co: Dict[str, Dict[str, List[Optional[float]]]] = defaultdict( + lambda: defaultdict(list) + ) + + for i, ea in enumerate(entries): + norm_a = _normalise_message(ea.message) + for eb in entries[i + 1:]: + norm_b = _normalise_message(eb.message) + if norm_a == norm_b: + continue + + # Time-gate + if ea.timestamp and eb.timestamp: + lag = (eb.timestamp - ea.timestamp).total_seconds() + if lag < 0 or lag > self._window: + break # entries are ordered; no point continuing + co[norm_a][norm_b].append(lag) + else: + # No timestamps — use sequential proximity (within 10 entries) + if (eb is entries[min(i + 10, len(entries) - 1)]) or True: + co[norm_a][norm_b].append(None) + break + + return co + + def _build_pairs( + self, + entries: List[LogEntry], + co: Dict[str, Dict[str, List[Optional[float]]]], + ) -> List[CorrelatedPair]: + pairs: List[CorrelatedPair] = [] + + # Total occurrences of each pattern (for confidence) + totals: Dict[str, int] = defaultdict(int) + for e in entries: + totals[_normalise_message(e.message)] += 1 + + for pattern_a, successors in co.items(): + for pattern_b, lags in successors.items(): + count = len(lags) + if count < self._min_co: + continue + numeric_lags = [l for l in lags if l is not None] + avg_lag = ( + sum(numeric_lags) / len(numeric_lags) + if numeric_lags else None + ) + confidence = min(count / max(totals.get(pattern_a, 1), 1), 1.0) + pairs.append(CorrelatedPair( + pattern_a=pattern_a[:100], + pattern_b=pattern_b[:100], + co_occurrence_count=count, + avg_lag_seconds=round(avg_lag, 2) if avg_lag is not None else None, + confidence=round(confidence, 3), + )) + + pairs.sort(key=lambda p: p.co_occurrence_count, reverse=True) + return pairs[:30] # cap to top 30 to keep payload small + + def _build_chains( + self, co: Dict[str, Dict[str, List[Optional[float]]]] + ) -> List[CascadeChain]: + chains: List[CascadeChain] = [] + visited: Set[str] = set() + + # Find roots: nodes that appear as pattern_a but not (or rarely) as pattern_b + all_bs: Set[str] = {b for successors in co.values() for b in successors} + roots = [a for a in co if a not in all_bs] + + for root in roots: + chain = self._dfs(root, co, set(), depth=0) + if len(chain) >= 2: + total = sum(len(v) for v in co.get(root, {}).values()) + chains.append(CascadeChain( + root=root[:100], + chain=[c[:100] for c in chain], + total_occurrences=total, + )) + + chains.sort(key=lambda c: c.total_occurrences, reverse=True) + return chains[:10] + + def _dfs( + self, + node: str, + co: Dict[str, Dict[str, List[Optional[float]]]], + visited: Set[str], + depth: int, + ) -> List[str]: + if depth >= self._max_chain_depth or node in visited: + return [node] + visited = visited | {node} + successors = co.get(node, {}) + if not successors: + return [node] + # Follow the highest-confidence edge + best = max(successors.items(), key=lambda kv: len(kv[1])) + return [node] + self._dfs(best[0], co, visited, depth + 1) \ No newline at end of file diff --git a/services/python-tools/tools/log-analyzer/core/llm_client.py b/services/python-tools/tools/log-analyzer/core/llm_client.py new file mode 100644 index 0000000..1da7795 --- /dev/null +++ b/services/python-tools/tools/log-analyzer/core/llm_client.py @@ -0,0 +1,814 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import os +import random +import re +import time +from dataclasses import dataclass +from enum import Enum +from typing import AsyncIterator, Dict, List, Optional + +import httpx + +# ── python-dotenv: load .env if present ────────────────────────────────── +try: + from dotenv import load_dotenv + load_dotenv(override=False) # env vars already set in shell take priority +except ImportError: + pass # dotenv is optional; env vars can still be set manually + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +OXLO_API_BASE = os.getenv("OXLO_API_BASE_URL", "https://api.oxlo.ai/v1") +OXLO_API_KEY = os.getenv("OXLO_API_KEY", "") + +DEFAULT_MAX_TOKENS = 4096 +MAX_RETRIES = 4 +BACKOFF_BASE = 1.5 + + +# --------------------------------------------------------------------------- +# Model catalogue — real IDs from docs.oxlo.ai/docs/api/models +# --------------------------------------------------------------------------- + +class OxloModel(str, Enum): + # Premium + DEEPSEEK_R1_0528 = "deepseek-r1-0528" + GPT_OSS_120B = "gpt-oss-120b" + KIMI_K2_THINKING = "kimi-k2-thinking" + KIMI_K2_5 = "kimi-k2.5" + LLAMA_3_3_70B = "llama-3.3-70b" + QWEN_3_32B = "qwen-3-32b" + # Pro + DEEPSEEK_R1_70B = "deepseek-r1-70b" + DEEPSEEK_V3_0324 = "deepseek-v3-0324" + DEEPSEEK_CODER_33B = "deepseek-coder-33b" + GPT_OSS_20B = "gpt-oss-20b" + LLAMA_4_MAVERICK = "llama-4-maverick-17b" + MINISTRAL_14B = "ministral-14b" + LLAMA_3_1_8B = "llama-3.1-8b" + QWEN_2_5_7B = "qwen-2.5-7b" + QWEN_3_CODER_30B = "qwen-3-coder-30b" + # Free + DEEPSEEK_R1_8B = "deepseek-r1-8b" + DEEPSEEK_V3_2 = "deepseek-v3.2" + LLAMA_3_2_3B = "llama-3.2-3b" + MISTRAL_7B = "mistral-7b" + + +class TaskType(str, Enum): + """Logical task categories used for model routing.""" + REASONING = "reasoning" # RCA, cascade analysis, multi-step diagnosis + GENERAL = "general" # Quick summaries, Q&A, burst commentary + CODE = "code" # Fix suggestions, query recommendations, regexes + FAST = "fast" # Streaming previews, REPL follow-ups + + +# Default model per task — overrideable via .env +_DEFAULT_MODELS: Dict[TaskType, str] = { + TaskType.REASONING: os.getenv("OXLO_MODEL_REASONING", OxloModel.DEEPSEEK_R1_0528), + TaskType.GENERAL: os.getenv("OXLO_MODEL_GENERAL", OxloModel.DEEPSEEK_V3_2), + TaskType.CODE: os.getenv("OXLO_MODEL_CODE", OxloModel.DEEPSEEK_R1_0528), + TaskType.FAST: os.getenv("OXLO_MODEL_FAST", OxloModel.DEEPSEEK_V3_2), +} + +_CODE_KEYWORDS = re.compile( + r'\b(fix|patch|query|index|sql|regex|config|code|script|function|variable' + r'|command|bash|python|node|nginx|systemd|kubernetes|kubectl|helm)\b', + re.IGNORECASE, +) +_FAST_KEYWORDS = re.compile( + r'\b(quick|briefly|tldr|summary|what is|explain|describe|list)\b', + re.IGNORECASE, +) + + +class ModelRouter: + """ + Selects the appropriate Oxlo model based on query intent. + + Priority: + 1. Explicit task_type override from the caller. + 2. Keyword heuristic on the user query string. + 3. Default: REASONING (safest for log RCA). + """ + + def __init__(self, model_map: Optional[Dict[TaskType, str]] = None) -> None: + self._map = model_map or dict(_DEFAULT_MODELS) + + def select( + self, + user_query: str, + task_type: Optional[TaskType] = None, + ) -> str: + if task_type is not None: + return self._map[task_type] + if _CODE_KEYWORDS.search(user_query): + return self._map[TaskType.CODE] + if _FAST_KEYWORDS.search(user_query) and len(user_query) < 80: + return self._map[TaskType.FAST] + return self._map[TaskType.REASONING] + + def model_for(self, task: TaskType) -> str: + return self._map[task] + + def all_models(self) -> Dict[str, str]: + return {t.value: m for t, m in self._map.items()} + + def update(self, task: TaskType, model_id: str) -> None: + """Hot-swap a model at runtime (e.g. from the web portal).""" + self._map[task] = model_id + + +# --------------------------------------------------------------------------- +# System prompt (paid once per session — not per request) +# --------------------------------------------------------------------------- + +def _get_system_prompt(report_type: str = "detailed") -> str: + if report_type == "fix_only": + base_prompt = """ +You are a Staff-level Site Reliability Engineer and Systems Architect responding to a production incident. +Your ONLY priority right now is to provide immediate, actionable fixes and mitigation steps. The developer is looking to build fast and needs to know EXACTLY what to fix first. + +WHAT YOU RECEIVE +---------------- +A compact JSON object describing an incident, produced by a local pipeline. + +HOW TO REASON +------------- +Think step by step before writing your response: + +Step 1 — ORIENT & IDENTIFY: Quickly scan the payload (especially the raw log excerpt, hotspots, and error chains) to identify what is fundamentally broken. + +Step 2 — IMMEDIATE MITIGATION: What is the fastest way to stop the bleeding? Produce specific, runnable commands or configuration changes to stabilize the system immediately. + +Step 3 — HIGHLY DETAILED FIXES: Provide the exact, step-by-step solution required to permanently fix the issue. + - Write EXACT commands (e.g., `kubectl scale deploy...`, `ALTER TABLE...`). + - Write specific code snippets if a code change is needed. + - Do NOT give generic advice (like "check your configuration"). Tell the developer exactly what configuration to change and to what value. + - Be extremely detailed in the "description" field of your fixes. + +OUTPUT FORMAT — MANDATORY SCHEMA +--------------------------------- +You MUST return ONLY a JSON object with EXACTLY these top-level keys. +No other keys. No extra wrapping. No prose outside the JSON. +""" + else: + base_prompt = """ +You are a Staff-level Site Reliability Engineer and Systems Architect with deep +expertise in distributed systems, Kubernetes, databases, caching layers, and +cloud-native infrastructure. You are performing a thorough Root Cause Analysis +(RCA) on a production system incident described by a compressed log-analysis +payload. + +WHAT YOU RECEIVE +---------------- +A compact JSON object produced by a local 5-stage pipeline that has already: + 1. Redacted all PII, IPs, credentials, and secrets — placeholders like + , , represent real values stripped for privacy. + 2. Parsed and normalised heterogeneous log formats (Nginx, syslog, JSON, + Python/Node/Java app logs, Kubernetes events). + 3. Deduplicated identical errors and computed their frequencies. + 4. Detected temporal correlations and cascade chains between error patterns. + 5. Compressed everything into a token-efficient JSON summary. + +JSON FIELD ALIASES (learn these — every field in the payload uses them) +----------------------------------------------------------------------- +n = total log entries analysed +has_ts = whether timestamps were present +span_s = total observation window in seconds +lvls = level distribution e.g. {"ERROR": 12, "WARNING": 3} +u_errs = count of unique (deduplicated) error patterns +errs = top error patterns, each: {p: pattern_string, c: occurrence_count} +bursts = burst windows (periods of abnormally high error rate): + {s: start_iso, e: end_iso, ec: error_count, mx: rate_multiplier} +escalations = severity escalation events (WARNING -> ERROR -> CRITICAL chains): + {ts: timestamp, f: from_level, t: to_level, m: message_snippet} +corr = correlated error pairs (A reliably precedes B): + {a: pattern_a, b: pattern_b, cnt: co-occurrences, + lag: avg_lag_seconds, conf: confidence_0_to_1} +chains = cascade chains (root -> ... -> leaf): + {r: root_pattern, ch: [chain_list], tot: total_occurrences} +hotspots = services/hosts with the most errors: {name: count} +timeline = per-minute breakdown: {"2026-01-01T10:32:00": {"ERROR": 5, ...}} + +HOW TO REASON +------------- +Think step by step before writing your response: + +Step 1 — ORIENT: What is the observation window (span_s)? How many entries + total (n)? What is the dominant log level? Is the situation acute (burst) + or chronic (sustained high error rate)? + +Step 2 — IDENTIFY ROOT CAUSES: Start from the cascade chains (chains) — the + root (r) of a chain is the most likely initial failure. Cross-reference with + hotspots to identify which service originated the problem. Use correlated + pairs (corr) to understand the propagation path. High-confidence pairs + (conf > 0.7) with short lags (lag < 5s) indicate tight causal links. + +Step 3 — ASSESS SEVERITY: Use burst windows (bursts) to judge rate and + duration. Use escalation events to judge whether the system recovered or + is still degrading. A rate_multiplier (mx) > 10x baseline is a P1 incident. + +Step 4 — FORMULATE FIXES: For each root cause, produce a specific, actionable + fix. The solution MUST BE HIGHLY DETAILED, providing step-by-step instructions, + exact commands, or specific code changes required. Do not give generic advice. + Reference the actual patterns from errs[], the actual services from hotspots, + and the actual cascade from chains[]. + +Step 5 — THINK PREVENTION: What architectural or operational change would + prevent this class of failure? Think circuit breakers, rate limits, resource + limits, index additions, connection pool sizing, alerting thresholds. + +OUTPUT FORMAT — MANDATORY SCHEMA +--------------------------------- +You MUST return ONLY a JSON object with EXACTLY these top-level keys. +No other keys. No extra wrapping. No prose outside the JSON. +""" + if report_type == "fix_only": + schema_prompt = """ +REQUIRED TOP-LEVEL KEYS (all must be present, even if empty array/string): + executive_summary string + severity string — exactly one of: "P1", "P2", "P3", "P4" + immediate_actions array of strings + fixes array of objects + +SCHEMA FOR EACH ARRAY ELEMENT: +fixes items: + { "description": string (Must be a highly detailed, step-by-step explanation of the solution), "priority": "HIGH"|"MEDIUM"|"LOW", + "effort": string, "addresses": string } + +EXAMPLE OF A CORRECT RESPONSE (use this exact structure): +``` +{ + "executive_summary": "The auth-service TLS certificate expired causing cascading auth failures.", + "severity": "P1", + "immediate_actions": [ + "Manually issue a new certificate: `certbot certonly --dns-cloudflare -d auth-service.internal` then restart auth-service" + ], + "fixes": [ + { + "description": "Fix DNS propagation for ACME DNS-01 challenge.", + "priority": "HIGH", + "effort": "2 hours", + "addresses": "Auto-renewal failure" + } + ] +} +``` +""" + else: + schema_prompt = """ +REQUIRED TOP-LEVEL KEYS (all must be present, even if empty array/string): + executive_summary string + severity string — exactly one of: "P1", "P2", "P3", "P4" + severity_rationale string + root_causes array of objects + incident_timeline array of objects + immediate_actions array of strings + fixes array of objects + prevention_strategies array of strings + monitoring_recommendations array of strings + follow_up_questions array of strings + +SCHEMA FOR EACH ARRAY ELEMENT: + +root_causes items: + { "title": string, "evidence": string, "confidence": number 0.0-1.0, + "affected_services": array of strings } + +incident_timeline items: + { "time": string, "event": string } + +fixes items: + { "description": string (Must be a highly detailed, step-by-step explanation of the solution), "priority": "HIGH"|"MEDIUM"|"LOW", + "effort": string, "addresses": string } + +EXAMPLE OF A CORRECT RESPONSE (use this exact structure): +``` +{ + "executive_summary": "The auth-service TLS certificate expired causing cascading auth failures across billing-api, user-api, and internal-gateway. Billing processed 2 charges with stale tokens creating a security incident. Auto-renewal failed due to DNS propagation issues and alerting was broken, so the team had no warning.", + "severity": "P1", + "severity_rationale": "Complete auth unavailability for 12+ minutes with confirmed unauthenticated charge processing.", + "root_causes": [ + { + "title": "Expired TLS certificate on auth-service", + "evidence": "The payload shows `TLS handshake failed: certificate has expired` appearing 6 times from auth-service (hotspot). The cascade chain root is `Auto-renewal failed: ACME challenge DNS propagation timeout` which directly triggered all downstream failures.", + "confidence": 0.97, + "affected_services": ["auth-service", "billing-api", "user-api", "internal-gateway"] + } + ], + "incident_timeline": [ + {"time": "T-7d", "event": "cert-manager warned certificate expiring in 7 days — ignored"}, + {"time": "T-1d", "event": "cert-manager warned certificate expiring in 1 day — PagerDuty delivery failed"}, + {"time": "T+0:00", "event": "Certificate expired, auto-renewal failed (DNS propagation timeout)"}, + {"time": "T+7:12", "event": "First downstream TLS failures in billing-api and user-api"}, + {"time": "T+7:24", "event": "Cached tokens expired — billing halted, user-api returning 401 universally"}, + {"time": "T+7:25", "event": "847 requests rejected in 60s — full outage"} + ], + "immediate_actions": [ + "Manually issue a new certificate: `certbot certonly --dns-cloudflare -d auth-service.internal` then restart auth-service", + "Revoke and rotate all tokens issued during the degraded window (07:12–07:25 UTC)", + "Audit billing transactions ORD-88821 through ORD-88891 for charges made with stale tokens" + ], + "fixes": [ + { + "description": "Fix DNS propagation for ACME DNS-01 challenge. Check DNS provider TTL settings and ensure the cert-manager service account has write access to the DNS zone.", + "priority": "HIGH", + "effort": "2 hours", + "addresses": "Auto-renewal failure" + }, + { + "description": "Add a secondary alert channel (email + Slack) for cert expiry warnings so a single PagerDuty webhook failure does not silence the alert.", + "priority": "HIGH", + "effort": "1 hour", + "addresses": "Silent alert delivery failure" + } + ], + "prevention_strategies": [ + "Set certificate renewal threshold to 30 days (not 7) and configure multi-channel alerting", + "Add a Kubernetes CronJob that checks certificate validity daily and pages on-call if < 14 days remain" + ], + "monitoring_recommendations": [ + "Alert: cert expiry < 14 days on any internal service certificate — threshold: 14d, severity: P2", + "Alert: TLS handshake failure rate > 1% over 60s — threshold: 1%, severity: P1" + ], + "follow_up_questions": [ + "Were any billing charges processed between 07:12 and 07:24 with the stale token that need reversal?", + "Why did the internal-gateway fall back to signature-only JWT checks instead of rejecting requests?" + ] +} +``` +""" + rules = """ +QUALITY RULES (violations will be penalised): +- Use EXACTLY the key names shown above — no aliases, no camelCase, no extras +- root_causes: cite specific pattern strings from errs[], service names from + hotspots, chain roots from chains[], lag/confidence values from corr[] +- immediate_actions: must be runnable commands or numbered steps — never vague +- If redacted placeholders appear (, ), note the data type and + security implications in evidence or executive_summary +- If n < 5 or has_ts=false, lower confidence values and note limited observability +- Do NOT invent service names or error patterns not in the payload +- Do NOT include any text, key, or value outside this schema +""" + return (base_prompt + "\n" + schema_prompt + "\n" + rules).strip() + + +# --------------------------------------------------------------------------- +# Response model +# --------------------------------------------------------------------------- + +@dataclass +class RCAResponse: + # Core RCA fields + root_causes: List[Dict] + immediate_actions: List[str] + fixes: List[Dict] + prevention_strategies: List[str] + follow_up_questions: List[str] + executive_summary: str + # New enriched fields + severity: str # P1 | P2 | P3 | P4 + severity_rationale: str + incident_timeline: List[Dict] # [{time, event}] + monitoring_recommendations: List[str] + # Metadata + raw_json: str + usage: Dict[str, int] + model: str + task_type: str + latency_ms: float + + @classmethod + def from_api_response( + cls, + data: Dict, + latency_ms: float, + task_type: TaskType = TaskType.REASONING, + ) -> "RCAResponse": + content = data["choices"][0]["message"]["content"] + raw_json = content.strip() + + import re + + def _scrub(text: str) -> str: + """Remove ALL model reasoning / callout artefacts. + + Handles every variant the model emits, including TRUNCATED blocks + where the model hit the token limit mid-output: + • ::: red-box\\n...\\n::: closed callout block + • ::: red-box\\n... UNCLOSED callout (truncated) + • ... closed think block + • ... UNCLOSED think block (truncated) + • bare orphaned ::: lines + """ + # 1. Closed ::: callout blocks + text = re.sub( + r":::[^\n]*\n.*?(?:\n\s*:::[ \t]*|\s*:::[ \t]*\Z)", + "", + text, + flags=re.DOTALL, + ) + # 2. UNCLOSED ::: callout — strip from opener to end-of-string + text = re.sub(r":::[^\n]*\n.*\Z", "", text, flags=re.DOTALL) + # 3. Closed blocks + text = re.sub(r".*?", "", text, flags=re.DOTALL) + # 4. UNCLOSED block — strip from to end-of-string + text = re.sub(r".*\Z", "", text, flags=re.DOTALL) + # 5. Any leftover ::: lines (orphaned opener or closer) + text = re.sub(r"^[ \t]*:::.*$", "", text, flags=re.MULTILINE) + return text.strip() + + def _extract_json(text: str) -> str: + """ + Robustly extract a JSON object from text that may have garbage + before/after it. Strategy: + 1. Strip known artefacts with _scrub. + 2. Strip ```json / ``` fences. + 3. Find the first '{' and last '}' and slice — handles any + preamble/postamble the model added. + """ + text = _scrub(text) + + # Strip markdown code fences + text = re.sub(r"^```(?:json)?\s*", "", text.strip(), flags=re.IGNORECASE) + text = re.sub(r"\s*```$", "", text.strip()) + + # Find JSON object boundaries + start = text.find("{") + end = text.rfind("}") + if start != -1 and end != -1 and end > start: + return text[start : end + 1] + return text # return as-is; json.loads will fail and we'll fallback + + # Extract and clean the raw response before parsing + raw_json = _extract_json(raw_json) + + try: + parsed = json.loads(raw_json.strip()) + except json.JSONDecodeError: + # Last resort: the model returned prose, not JSON. + # Wrap it so at least executive_summary shows something useful. + parsed = {"executive_summary": _scrub(raw_json)} + + # Also scrub string fields AFTER parsing — garbage can live inside + # JSON string values (e.g. executive_summary contains ::: callout text). + def _clean_field(v: object) -> object: + if isinstance(v, str): + return _scrub(v) + if isinstance(v, list): + return [_clean_field(i) for i in v] + if isinstance(v, dict): + return {k: _clean_field(val) for k, val in v.items()} + return v + + parsed = {k: _clean_field(v) for k, v in parsed.items()} + + return cls( + root_causes=parsed.get("root_causes", []), + immediate_actions=parsed.get("immediate_actions", []), + fixes=parsed.get("fixes", []), + prevention_strategies=parsed.get("prevention_strategies", []), + follow_up_questions=parsed.get("follow_up_questions", []), + executive_summary=parsed.get("executive_summary", ""), + severity=parsed.get("severity", "P3"), + severity_rationale=parsed.get("severity_rationale", ""), + incident_timeline=parsed.get("incident_timeline", []), + monitoring_recommendations=parsed.get("monitoring_recommendations", []), + raw_json=raw_json, + usage=data.get("usage", {}), + model=data.get("model", ""), + task_type=task_type.value, + latency_ms=latency_ms, + ) + + +# --------------------------------------------------------------------------- +# Client +# --------------------------------------------------------------------------- + +class OxloClient: + + def __init__( + self, + api_key: Optional[str] = None, + model_map: Optional[Dict[TaskType, str]] = None, + max_tokens: int = DEFAULT_MAX_TOKENS, + token_budget: Optional[int] = None, + ) -> None: + self._key = api_key or OXLO_API_KEY + self._max_tokens = max_tokens + self._budget = token_budget or int(os.getenv("OXLO_TOKEN_BUDGET", "4000")) + self._router = ModelRouter(model_map) + + if not self._key: + raise ValueError( + "Oxlo API key is required.\n" + " Option 1: Add OXLO_API_KEY=your-key to your .env file\n" + " Option 2: export OXLO_API_KEY=your-key in your shell\n" + " Option 3: OxloClient(api_key='your-key')" + ) + + self._headers = { + "Authorization": f"Bearer {self._key}", + "Content-Type": "application/json", + } + + # ------------------------------------------------------------------ + # Primary interface + # ------------------------------------------------------------------ + + async def analyze( + self, + payload_json: str, + user_query: str, + estimated_tokens: int = 0, + task_type: Optional[TaskType] = None, + raw_log_excerpt: str = "", + report_type: str = "detailed", + ) -> RCAResponse: + """ + Send the compressed payload to Oxlo and return a structured RCAResponse. + Model is auto-selected via intent heuristic unless task_type is specified. + """ + self._guard_budget(estimated_tokens) + model = self._router.select(user_query, task_type) + resolved_task = task_type or self._infer_task(user_query) + logger.info("analyze() model=%s task=%s tokens≈%d", model, resolved_task.value, estimated_tokens) + + body = self._build_body( + self._build_messages(payload_json, user_query, raw_log_excerpt, report_type), + model, + stream=False, + report_type=report_type, + ) + data = await self._post_with_retry(body) + return RCAResponse.from_api_response(data, data.pop("_latency_ms", 0.0), resolved_task) + + async def analyze_with_model( + self, + payload_json: str, + user_query: str, + model: str, + estimated_tokens: int = 0, + raw_log_excerpt: str = "", + report_type: str = "detailed", + ) -> RCAResponse: + """Call a specific Oxlo model by ID string, bypassing the router.""" + if report_type == "fix_only": + print("🔍 [TRACKER] core/llm_client.py: analyze_with_model() running in FIX ONLY mode", flush=True) + else: + print("🔍 [TRACKER] core/llm_client.py: analyze_with_model() running in DETAILED mode", flush=True) + + self._guard_budget(estimated_tokens) + logger.info("analyze_with_model() model=%s", model) + body = self._build_body( + self._build_messages(payload_json, user_query, raw_log_excerpt, report_type), + model, + stream=False, + report_type=report_type, + ) + data = await self._post_with_retry(body) + return RCAResponse.from_api_response(data, data.pop("_latency_ms", 0.0)) + + async def analyze_stream( + self, + payload_json: str, + user_query: str, + estimated_tokens: int = 0, + task_type: Optional[TaskType] = None, + report_type: str = "detailed", + ) -> AsyncIterator[str]: + """ + Streaming variant — yields text chunks as they arrive. + Defaults to the FAST model for lowest latency. + """ + self._guard_budget(estimated_tokens) + model = self._router.select(user_query, task_type or TaskType.FAST) + logger.info("analyze_stream() model=%s", model) + + body = self._build_body( + self._build_messages(payload_json, user_query, report_type=report_type), + model, + stream=True, + report_type=report_type, + ) + + async with httpx.AsyncClient(timeout=120) as client: + async with client.stream( + "POST", + f"{OXLO_API_BASE}/chat/completions", + headers=self._headers, + json=body, + ) as resp: + resp.raise_for_status() + async for line in resp.aiter_lines(): + if line.startswith("data: "): + chunk = line[6:] + if chunk.strip() == "[DONE]": + return + try: + delta = json.loads(chunk)["choices"][0]["delta"].get("content", "") + if delta: + yield delta + except (json.JSONDecodeError, KeyError): + continue + + @property + def router(self) -> ModelRouter: + return self._router + + # ------------------------------------------------------------------ + # Internals + # ------------------------------------------------------------------ + + def _build_messages( + self, + payload_json: str, + user_query: str, + raw_log_excerpt: str = "", + report_type: str = "detailed", + ) -> List[Dict]: + + # Build the user message sent to the model. + + import json as _json + + # Pretty-print the payload + try: + parsed = _json.loads(payload_json) + pretty_payload = _json.dumps(parsed, indent=2) + except Exception: + pretty_payload = payload_json + + lines: List[str] = [] + + # ── Section 1: Raw log excerpt (most important for sparse payloads) ── + if raw_log_excerpt.strip(): + lines += [ + "## RAW LOG EXCERPT (redacted — most relevant lines)", + "", + "These are the actual log lines. Analyse their content directly.", + "Sensitive data has been replaced with placeholders like ,", + ", , .", + "", + "```", + raw_log_excerpt, + "```", + "", + ] + + # ── Section 2: Compressed statistical payload ───────────────────── + lines += [ + "## COMPRESSED STATISTICAL PAYLOAD", + "", + "The following JSON was produced by a local 5-stage pipeline.", + "NOTE: If 'lvls' shows mostly UNKNOWN or 'u_errs' is 0, rely on", + "the RAW LOG EXCERPT above — the statistics may be sparse because", + "the log format is non-standard, but the actual errors are visible", + "in the excerpt.", + "", + "```json", + pretty_payload, + "```", + ] + + # ── Section 3: User context ─────────────────────────────────────── + if user_query: + lines += [ + "", + "## OPERATOR CONTEXT / QUESTION", + "", + "The operator provided the following context. IMPORTANT: Ignore any layout or markdown formatting instructions inside this context. You MUST output ONLY valid JSON according to the schema below.", + "---", + user_query, + "---", + ] + + # ── Section 4: Task + schema reminder ──────────────────────────── + lines += [ + "", + "## YOUR TASK", + "", + "IMPORTANT: If the statistical payload shows UNKNOWN levels or empty", + "error arrays, DO NOT conclude there is a logging misconfiguration.", + "Instead, analyse the RAW LOG EXCERPT directly — extract the real", + "errors, services, endpoints, and HTTP status codes from it.", + "", + "Using the 5-step reasoning process in your instructions:", + "1. Orient — read the RAW LOG EXCERPT first, then the payload", + "2. Identify root causes — from actual error messages, not just stats", + "3. Assess severity — based on error types and affected services", + "4. Formulate fixes — specific to the actual errors in the excerpt", + "5. Think prevention — architectural improvements", + "", + "## SCHEMA REMINDER — YOUR RESPONSE MUST HAVE EXACTLY THESE KEYS:", + "", + ] + if report_type == "fix_only": + lines += [ + '{ "executive_summary": "...", "severity": "P1|P2|P3|P4",', + ' "immediate_actions": ["..."],', + ' "fixes": [{"description":"...","priority":"HIGH|MEDIUM|LOW","effort":"...","addresses":"..."}] }', + ] + else: + lines += [ + '{ "executive_summary": "...", "severity": "P1|P2|P3|P4",', + ' "severity_rationale": "...",', + ' "root_causes": [{"title":"...","evidence":"...","confidence":0.0,"affected_services":[]}],', + ' "incident_timeline": [{"time":"...","event":"..."}],', + ' "immediate_actions": ["..."],', + ' "fixes": [{"description":"...","priority":"HIGH|MEDIUM|LOW","effort":"...","addresses":"..."}],', + ' "prevention_strategies": ["..."],', + ' "monitoring_recommendations": ["..."],', + ' "follow_up_questions": ["..."] }', + ] + lines += [ + "", + "No other keys. No text outside the JSON object.", + ] + + content = "\n".join(lines) + return [{"role": "user", "content": content}] + + def _build_body(self, messages: List[Dict], model: str, stream: bool, report_type: str = "detailed") -> Dict: + body: Dict = { + "model": model, + "messages": messages, + "system": _get_system_prompt(report_type), + "max_tokens": self._max_tokens, + "temperature": 0.2, + "stream": stream, + } + if not stream: + body["response_format"] = {"type": "json_object"} + return body + + def _guard_budget(self, estimated_tokens: int) -> None: + if self._budget and estimated_tokens > self._budget: + raise RuntimeError( + f"Estimated payload tokens ({estimated_tokens}) exceeds " + f"configured budget ({self._budget}). " + "Raise OXLO_TOKEN_BUDGET in .env or tune ContextCompressor limits." + ) + + def _infer_task(self, user_query: str) -> TaskType: + if _CODE_KEYWORDS.search(user_query): + return TaskType.CODE + if _FAST_KEYWORDS.search(user_query) and len(user_query) < 80: + return TaskType.FAST + return TaskType.REASONING + + async def _post_with_retry(self, body: Dict) -> Dict: + last_exc: Optional[Exception] = None + for attempt in range(MAX_RETRIES): + t0 = time.perf_counter() + try: + async with httpx.AsyncClient(timeout=90) as client: + resp = await client.post( + f"{OXLO_API_BASE}/chat/completions", + headers=self._headers, + json=body, + ) + latency = (time.perf_counter() - t0) * 1000 + + if resp.status_code in (429,) or resp.status_code >= 500: + wait = self._backoff(attempt) + logger.warning( + "Oxlo API %s — retry in %.1fs (attempt %d/%d)", + resp.status_code, wait, attempt + 1, MAX_RETRIES, + ) + await asyncio.sleep(wait) + last_exc = httpx.HTTPStatusError( + f"HTTP {resp.status_code}", + request=resp.request, + response=resp, + ) + continue + + resp.raise_for_status() + data = resp.json() + data["_latency_ms"] = round(latency, 2) + return data + + except (httpx.ConnectError, httpx.TimeoutException) as exc: + wait = self._backoff(attempt) + logger.warning("Network error: %s — retry in %.1fs", exc, wait) + await asyncio.sleep(wait) + last_exc = exc + + raise RuntimeError( + f"Oxlo API call failed after {MAX_RETRIES} attempts: {last_exc}" + ) + + @staticmethod + def _backoff(attempt: int) -> float: + return BACKOFF_BASE ** attempt + random.uniform(0, 0.5) \ No newline at end of file diff --git a/services/python-tools/tools/log-analyzer/core/log_parser.py b/services/python-tools/tools/log-analyzer/core/log_parser.py new file mode 100644 index 0000000..167c456 --- /dev/null +++ b/services/python-tools/tools/log-analyzer/core/log_parser.py @@ -0,0 +1,433 @@ +from __future__ import annotations + +import json +import re +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Dict, Iterator, List, Optional + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +class LogLevel(str, Enum): + TRACE = "TRACE" + DEBUG = "DEBUG" + INFO = "INFO" + NOTICE = "NOTICE" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + UNKNOWN = "UNKNOWN" + + @classmethod + def from_str(cls, raw: str) -> "LogLevel": + mapping = { + "trace": cls.TRACE, + "debug": cls.DEBUG, + "info": cls.INFO, + "notice": cls.NOTICE, + "warn": cls.WARNING, + "warning": cls.WARNING, + "error": cls.ERROR, + "err": cls.ERROR, + "critical": cls.CRITICAL, + "crit": cls.CRITICAL, + "fatal": cls.CRITICAL, + "emerg": cls.CRITICAL, + "alert": cls.CRITICAL, + } + return mapping.get(raw.lower(), cls.UNKNOWN) + + +@dataclass +class LogEntry: + raw_line: str + timestamp: Optional[datetime] = None + level: LogLevel = LogLevel.UNKNOWN + source: Optional[str] = None # host, service, logger name + message: str = "" + stack_trace: Optional[str] = None # stitched multi-line + extra: Dict[str, str] = field(default_factory=dict) + format_detected: str = "unknown" + + +# --------------------------------------------------------------------------- +# Per-format parsers +# --------------------------------------------------------------------------- + +_TS_FORMATS = [ + "%Y-%m-%dT%H:%M:%S.%f%z", + "%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%d %H:%M:%S,%f", + "%Y-%m-%d %H:%M:%S.%f", + "%Y-%m-%d %H:%M:%S", + "%d/%b/%Y:%H:%M:%S %z", + "%b %d %H:%M:%S", +] + + +def _parse_timestamp(raw: str) -> Optional[datetime]: + raw = raw.strip() + for fmt in _TS_FORMATS: + try: + dt = datetime.strptime(raw, fmt) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError: + continue + return None + + +# --- JSON --- + +def _try_json(line: str) -> Optional[LogEntry]: + if not line.startswith("{"): + return None + try: + obj = json.loads(line) + except json.JSONDecodeError: + return None + + level_raw = ( + obj.get("level") or obj.get("severity") or + obj.get("log_level") or obj.get("lvl") or "" + ) + msg = ( + obj.get("message") or obj.get("msg") or + obj.get("text") or obj.get("log") or line + ) + ts_raw = ( + obj.get("timestamp") or obj.get("time") or + obj.get("@timestamp") or obj.get("ts") or "" + ) + return LogEntry( + raw_line=line, + timestamp=_parse_timestamp(str(ts_raw)) if ts_raw else None, + level=LogLevel.from_str(str(level_raw)), + source=obj.get("logger") or obj.get("service") or obj.get("host"), + message=str(msg), + extra={k: str(v) for k, v in obj.items() if k not in {"message", "msg", "level", "timestamp", "time"}}, + format_detected="json", + ) + + +# --- Nginx / Apache Combined --- +_NGINX_RE = re.compile( + r'(?P\S+)\s+-\s+(?P\S+)\s+' + r'\[(?P