From cf2e86de9eaddc1219bb1a17880f3efa7e42ec50 Mon Sep 17 00:00:00 2001 From: "raghav.mehndiratta" Date: Thu, 2 Apr 2026 13:35:01 -0700 Subject: [PATCH 1/4] WIP --- .env.example | 40 ++++++ src/eva/assistant/pipeline/turn_config.py | 135 ++++++++++++++++++ src/eva/assistant/server.py | 82 ++++++++--- src/eva/models/config.py | 158 ++++++++++++++++++---- 4 files changed, 373 insertions(+), 42 deletions(-) create mode 100644 src/eva/assistant/pipeline/turn_config.py diff --git a/.env.example b/.env.example index d11adaaa..d820e37e 100644 --- a/.env.example +++ b/.env.example @@ -211,3 +211,43 @@ EVA_RECORD_IDS= # Logging level (DEBUG | INFO | WARNING | ERROR | CRITICAL) EVA_LOG_LEVEL=INFO + +# ============================================== +# Optional: Turn Detection & VAD Configuration +# ============================================== +# Fine-tune user turn detection and voice activity detection. +# Leave commented to use smart defaults. + +# User turn start strategy: vad | transcription | external +# - vad: Start turn when VAD detects speech (default) +# - transcription: Start turn when STT produces transcription +# - external: Delegate to external service (e.g., Deepgram Flux) +# EVA_MODEL__TURN_START_STRATEGY=vad + +# User turn start strategy parameters (JSON) +# EVA_MODEL__TURN_START_STRATEGY_PARAMS='{}' + +# User turn stop strategy: turn_analyzer | speech_timeout | external +# - turn_analyzer: Use smart turn analyzer to detect natural turn end (default) +# - speech_timeout: Stop after fixed silence duration +# - external: Delegate to external service +# EVA_MODEL__TURN_STOP_STRATEGY=turn_analyzer + +# User turn stop strategy parameters (JSON) +# For speech_timeout: {"user_speech_timeout": 0.8} +# For turn_analyzer: automatically uses smart turn detection +# EVA_MODEL__TURN_STOP_STRATEGY_PARAMS='{}' + +# Note: For services with built-in turn detection (e.g., Deepgram Flux), set both to 'external': +# EVA_MODEL__TURN_START_STRATEGY=external +# EVA_MODEL__TURN_STOP_STRATEGY=external + +# VAD (Voice Activity Detection) analyzer: silero +# EVA_MODEL__VAD=silero + +# VAD parameters (JSON) +# - confidence: Minimum confidence threshold (0.0-1.0, default: 0.7) +# - start_secs: Duration to wait before confirming voice start (default: 0.2) +# - stop_secs: Duration to wait before confirming voice stop (default: 0.2) +# - min_volume: Minimum audio volume threshold (0.0-1.0, default: 0.6) +# EVA_MODEL__VAD_PARAMS='{"start_secs": 0.1, "stop_secs": 0.8, "min_volume": 0.6, "confidence": 0.7}' diff --git a/src/eva/assistant/pipeline/turn_config.py b/src/eva/assistant/pipeline/turn_config.py new file mode 100644 index 00000000..ead7dc87 --- /dev/null +++ b/src/eva/assistant/pipeline/turn_config.py @@ -0,0 +1,135 @@ +"""Factory functions for creating turn strategies and VAD analyzers from configuration. + +This module provides functions to instantiate Pipecat turn strategies and VAD analyzers +based on configuration settings from environment variables or config files. +""" + +from typing import Any + +from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADParams +from pipecat.turns.user_start import ( + BaseUserTurnStartStrategy, + ExternalUserTurnStartStrategy, + TranscriptionUserTurnStartStrategy, + VADUserTurnStartStrategy, +) +from pipecat.turns.user_stop import ( + BaseUserTurnStopStrategy, + ExternalUserTurnStopStrategy, + SpeechTimeoutUserTurnStopStrategy, + TurnAnalyzerUserTurnStopStrategy, +) + +from eva.utils.logging import get_logger + +logger = get_logger(__name__) + + +def create_vad_analyzer(vad_type: str | None, vad_params: dict[str, Any]) -> VADAnalyzer | None: + """Create a VAD analyzer from configuration. + + Args: + vad_type: VAD analyzer type ('silero' or None for default) + vad_params: VAD parameters (confidence, start_secs, stop_secs, min_volume) + + Returns: + VAD analyzer instance, or None if vad_type is None + + Raises: + ValueError: If vad_type is not supported + """ + if vad_type is None: + return None + + vad_type_lower = vad_type.lower() + + if vad_type_lower == "silero": + # Create VADParams, respecting existing defaults if no params specified + params = VADParams(**vad_params) if vad_params else None + return SileroVADAnalyzer(params=params) + else: + raise ValueError( + f"Unsupported VAD type: {vad_type}. Supported types: 'silero'" + ) + + +def create_turn_start_strategy( + strategy_type: str | None, + strategy_params: dict[str, Any], +) -> BaseUserTurnStartStrategy | None: + """Create a user turn start strategy from configuration. + + Args: + strategy_type: Strategy type ('vad', 'transcription', 'external', or None for default) + strategy_params: Strategy-specific parameters + + Returns: + Turn start strategy instance, or None if strategy_type is None + + Raises: + ValueError: If strategy_type is not supported + """ + if strategy_type is None: + return None + + strategy_type_lower = strategy_type.lower() + + if strategy_type_lower == "vad": + # VADUserTurnStartStrategy has no required parameters + return VADUserTurnStartStrategy(**strategy_params) + elif strategy_type_lower == "transcription": + # TranscriptionUserTurnStartStrategy has no required parameters + return TranscriptionUserTurnStartStrategy(**strategy_params) + elif strategy_type_lower == "external": + # ExternalUserTurnStartStrategy has no required parameters + return ExternalUserTurnStartStrategy(**strategy_params) + else: + raise ValueError( + f"Unsupported turn start strategy: {strategy_type}. " + f"Supported types: 'vad', 'transcription', 'external'" + ) + + +def create_turn_stop_strategy( + strategy_type: str | None, + strategy_params: dict[str, Any], + smart_turn_stop_secs: float | None = None, +) -> BaseUserTurnStopStrategy | None: + """Create a user turn stop strategy from configuration. + + Args: + strategy_type: Strategy type ('speech_timeout', 'turn_analyzer', 'external', or None for default) + strategy_params: Strategy-specific parameters + smart_turn_stop_secs: stop_secs for SmartTurnParams (used with turn_analyzer strategy) + + Returns: + Turn stop strategy instance, or None if strategy_type is None + + Raises: + ValueError: If strategy_type is not supported + """ + if strategy_type is None: + return None + + strategy_type_lower = strategy_type.lower() + + if strategy_type_lower == "speech_timeout": + # SpeechTimeoutUserTurnStopStrategy accepts user_speech_timeout parameter + return SpeechTimeoutUserTurnStopStrategy(**strategy_params) + elif strategy_type_lower == "turn_analyzer": + # TurnAnalyzerUserTurnStopStrategy requires a turn_analyzer instance + # If smart_turn_stop_secs is provided, use it; otherwise let SmartTurnParams use its default + smart_params = SmartTurnParams(stop_secs=smart_turn_stop_secs) if smart_turn_stop_secs is not None else None + turn_analyzer = LocalSmartTurnAnalyzerV3(params=smart_params) + return TurnAnalyzerUserTurnStopStrategy(turn_analyzer=turn_analyzer, **strategy_params) + elif strategy_type_lower == "external": + # ExternalUserTurnStopStrategy has no required parameters + return ExternalUserTurnStopStrategy(**strategy_params) + else: + raise ValueError( + f"Unsupported turn stop strategy: {strategy_type}. " + f"Supported types: 'speech_timeout', 'turn_analyzer', 'external'" + ) diff --git a/src/eva/assistant/server.py b/src/eva/assistant/server.py index 4282e894..1925a863 100644 --- a/src/eva/assistant/server.py +++ b/src/eva/assistant/server.py @@ -42,7 +42,7 @@ ) from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy -from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies +from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.time import time_now_iso8601 from eva.assistant.agentic.audit_log import AuditLog, current_timestamp_ms @@ -61,6 +61,11 @@ create_stt_service, create_tts_service, ) +from eva.assistant.pipeline.turn_config import ( + create_turn_start_strategy, + create_turn_stop_strategy, + create_vad_analyzer, +) from eva.assistant.services.llm import LiteLLMClient from eva.assistant.tools.tool_executor import ToolExecutor from eva.models.agents import AgentConfig @@ -326,26 +331,65 @@ async def _realtime_tool_handler(params) -> None: "smart_turn_stop_secs", 0.8 ) # Shorter silence so we don't have to wait 3s if smart turn marks audio as incomplete - if ( - isinstance(self.pipeline_config, (PipelineConfig, SpeechToSpeechConfig)) - and self.pipeline_config.turn_strategy == "external" - ): - logger.info("Using external user turn strategies") - user_turn_strategies = ExternalUserTurnStrategies() - vad_analyzer = None + # Use configurable turn strategies if specified, otherwise fall back to defaults + if isinstance(self.pipeline_config, (PipelineConfig, AudioLLMConfig)): + turn_start_cfg = self.pipeline_config.turn_start_strategy + turn_start_params = self.pipeline_config.turn_start_strategy_params + turn_stop_cfg = self.pipeline_config.turn_stop_strategy + turn_stop_params = self.pipeline_config.turn_stop_strategy_params + vad_cfg = self.pipeline_config.vad + vad_cfg_params = self.pipeline_config.vad_params else: - logger.info("Using local smart turn analyzer") - user_turn_strategies = UserTurnStrategies( - start=[VADUserTurnStartStrategy()], - stop=[ - TurnAnalyzerUserTurnStopStrategy( - turn_analyzer=LocalSmartTurnAnalyzerV3( - params=SmartTurnParams(stop_secs=smart_turn_stop_secs) - ) - ) - ], + turn_start_cfg = None + turn_start_params = {} + turn_stop_cfg = None + turn_stop_params = {} + vad_cfg = None + vad_cfg_params = {} + + # Create turn start strategy + turn_start_strategy = create_turn_start_strategy(turn_start_cfg, turn_start_params) + if turn_start_strategy is None: + # Default: VADUserTurnStartStrategy + turn_start_strategy = VADUserTurnStartStrategy() + logger.info("Using default VAD user turn start strategy") + else: + logger.info(f"Using configured turn start strategy: {turn_start_cfg}") + + # Create turn stop strategy + turn_stop_strategy = create_turn_stop_strategy( + turn_stop_cfg, turn_stop_params, smart_turn_stop_secs + ) + if turn_stop_strategy is None: + # Default: TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3 + turn_stop_strategy = TurnAnalyzerUserTurnStopStrategy( + turn_analyzer=LocalSmartTurnAnalyzerV3( + params=SmartTurnParams(stop_secs=smart_turn_stop_secs) + ) ) - vad_analyzer = SileroVADAnalyzer(params=VADParams(stop_secs=vad_stop_secs)) + logger.info("Using default turn analyzer user turn stop strategy") + else: + logger.info(f"Using configured turn stop strategy: {turn_stop_cfg}") + + logger.info("Using local smart turn analyzer") + user_turn_strategies = UserTurnStrategies( + start=[turn_start_strategy], + stop=[turn_stop_strategy], + ) + + # Create VAD analyzer + vad_analyzer = create_vad_analyzer(vad_cfg, vad_cfg_params) + if vad_analyzer is None: + # Default: SileroVADAnalyzer with configured stop_secs + # If vad_cfg_params were provided without vad_cfg, merge them with default stop_secs + vad_params_dict = {"stop_secs": vad_stop_secs} + if vad_cfg_params: + # User provided params without specifying vad type - merge with defaults + vad_params_dict.update(vad_cfg_params) + vad_analyzer = SileroVADAnalyzer(params=VADParams(**vad_params_dict)) + logger.info("Using default Silero VAD analyzer") + else: + logger.info(f"Using configured VAD analyzer: {vad_cfg}") user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 6f98a029..4a49d219 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -41,7 +41,7 @@ def _param_alias(params: dict[str, Any]) -> str: """Return the display alias from a params dict.""" - return params.get("alias") or params["model"] + return params.get("alias") or params.get("model", "") class PipelineConfig(BaseModel): @@ -67,14 +67,47 @@ class PipelineConfig(BaseModel): stt_params: dict[str, Any] = Field({}, description="Additional STT model parameters (JSON)") tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") - turn_strategy: Literal["smart", "external"] = Field( - "smart", + # Configurable turn start/stop strategies (optional) + turn_start_strategy: str | None = Field( + None, + description=( + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "If not specified, uses default (vad). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str | None = Field( + None, + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "If not specified, uses default (turn_analyzer). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration (optional) + vad: str | None = Field( + None, description=( - "User turn detection strategy. " - "'smart' uses LocalSmartTurnAnalyzerV3 + SileroVAD (default). " - "'external' uses ExternalUserTurnStrategies for services with built-in turn detection " - "(e.g., deepgram-flux, Speechmatics). " - "Set via EVA_MODEL__TURN_STRATEGY=external." + "VAD analyzer type: 'silero'. " + "If not specified, uses default VAD (SileroVADAnalyzer). " + "Set via EVA_MODEL__VAD." + ), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. " + "Set via EVA_MODEL__VAD_PARAMS." ), ) @@ -102,6 +135,15 @@ def _migrate_legacy_fields(cls, data: Any) -> Any: data.pop(key, None) return data + @field_serializer("stt_params", "tts_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + class SpeechToSpeechConfig(BaseModel): """Configuration for a speech-to-speech model.""" @@ -111,22 +153,20 @@ class SpeechToSpeechConfig(BaseModel): s2s: str = Field(description="Speech-to-speech model name", examples=["gpt-realtime-mini", "gemini_live"]) s2s_params: dict[str, Any] = Field({}, description="Additional speech-to-speech model parameters (JSON)") - turn_strategy: Literal["smart", "external"] = Field( - "smart", - description=( - "User turn detection strategy. " - "'smart' uses LocalSmartTurnAnalyzerV3 + SileroVAD (default). " - "'external' uses ExternalUserTurnStrategies for services with built-in turn detection " - "(e.g., deepgram-flux, Speechmatics). " - "Set via EVA_MODEL__TURN_STRATEGY=external." - ), - ) - @property def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" return {"s2s": _param_alias(self.s2s_params) or self.s2s} + @field_serializer("s2s_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + class AudioLLMConfig(BaseModel): """Configuration for an Audio-LLM pipeline (audio in, text out, separate TTS). @@ -143,11 +183,58 @@ class AudioLLMConfig(BaseModel): ) audio_llm_params: dict[str, Any] = Field( {}, - description="Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens", + description=( + "Audio-LLM parameters (JSON): base_url (required), api_key, model, temperature, max_tokens, " + "vad_stop_secs (default: 0.4), smart_turn_stop_secs (default: 0.8)" + ), ) tts: str = Field(description="TTS model", examples=["cartesia", "elevenlabs"]) tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") + # Configurable turn start/stop strategies (optional, same as PipelineConfig) + turn_start_strategy: str | None = Field( + None, + description=( + "User turn start strategy: 'vad', 'transcription', or 'external'. " + "If not specified, uses default (vad). " + "Set via EVA_MODEL__TURN_START_STRATEGY." + ), + ) + turn_start_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", + ) + + turn_stop_strategy: str | None = Field( + None, + description=( + "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " + "If not specified, uses default (turn_analyzer). " + "Set via EVA_MODEL__TURN_STOP_STRATEGY." + ), + ) + turn_stop_strategy_params: dict[str, Any] = Field( + {}, + description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", + ) + + # VAD configuration (optional) + vad: str | None = Field( + None, + description=( + "VAD analyzer type: 'silero'. " + "If not specified, uses default VAD (SileroVADAnalyzer). " + "Set via EVA_MODEL__VAD." + ), + ) + vad_params: dict[str, Any] = Field( + {}, + description=( + "VAD parameters (JSON): confidence, start_secs, stop_secs, min_volume. " + "Set via EVA_MODEL__VAD_PARAMS." + ), + ) + @property def pipeline_parts(self) -> dict[str, str]: """Component names for this pipeline.""" @@ -156,6 +243,15 @@ def pipeline_parts(self) -> dict[str, str]: "tts": _param_alias(self.tts_params) or self.tts, } + @field_serializer("audio_llm_params", "tts_params") + @classmethod + def _redact_api_keys(cls, params: dict[str, Any]) -> dict[str, Any]: + """Redact API keys when serializing.""" + redacted = params.copy() + if "api_key" in redacted: + redacted["api_key"] = "***" + return redacted + _PIPELINE_FIELDS = { "llm", @@ -163,12 +259,28 @@ def pipeline_parts(self) -> dict[str, str]: "tts", "stt_params", "tts_params", - "turn_strategy", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", *PipelineConfig._LEGACY_RENAMES, *PipelineConfig._LEGACY_DROP, } -_S2S_FIELDS = {"s2s", "s2s_params", "turn_strategy"} -_AUDIO_LLM_FIELDS = {"audio_llm", "audio_llm_params", "tts", "tts_params"} +_S2S_FIELDS = {"s2s", "s2s_params"} +_AUDIO_LLM_FIELDS = { + "audio_llm", + "audio_llm_params", + "tts", + "tts_params", + "turn_start_strategy", + "turn_start_strategy_params", + "turn_stop_strategy", + "turn_stop_strategy_params", + "vad", + "vad_params", +} def _model_config_discriminator(data: Any) -> str: From d3bb3016570d8ac04ece30eafc83ca2f2d126f8c Mon Sep 17 00:00:00 2001 From: "raghav.mehndiratta" Date: Fri, 3 Apr 2026 12:21:27 -0700 Subject: [PATCH 2/4] read from metric summary --- .../assistant/pipeline/audio_llm_processor.py | 2 +- src/eva/orchestrator/runner.py | 77 +++++++++---------- 2 files changed, 38 insertions(+), 41 deletions(-) diff --git a/src/eva/assistant/pipeline/audio_llm_processor.py b/src/eva/assistant/pipeline/audio_llm_processor.py index bb5b24b3..c91f9401 100644 --- a/src/eva/assistant/pipeline/audio_llm_processor.py +++ b/src/eva/assistant/pipeline/audio_llm_processor.py @@ -417,7 +417,7 @@ def __init__( super().__init__(**kwargs) self._audio_collector = audio_collector params = params or {} - self._api_key = params.get["api_key"] + self._api_key = params["api_key"] self._model = model self._system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT self._sample_rate = sample_rate diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index ac5a45f3..79f2ea97 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -692,7 +692,8 @@ async def validate_existing( entry["failure_details"] = failure_details final_failures[oid] = entry - # Build evaluation summary with separate simulation and metrics sections + # Build evaluation summary with simulation results only + # Note: metrics status is tracked in metrics_summary.json, not here llm_generic_error_record_ids = find_records_with_llm_generic_error(self.output_dir, successful_ids) eval_summary: dict[str, Any] = { "started_at": started_at.isoformat(), @@ -714,16 +715,6 @@ async def validate_existing( "final_failures": final_failures, } - if metrics_result is not None: - eval_summary["metrics"] = { - "records_evaluated": metrics_result.total_records, - "metrics_computed": self.config.metrics, - "total_metric_failures": metrics_result.total_metric_failures, - "metric_failures": { - name: sorted(record_ids) for name, record_ids in metrics_result.metric_failures.items() - }, - } - eval_summary_path = self.output_dir / "evaluation_summary.json" with open(eval_summary_path, "w") as f: json.dump(eval_summary, f, indent=2) @@ -768,7 +759,7 @@ async def rerun_failed_metrics( ) -> RunResult: """Rerun only previously failed metric computations. - Reads metric_failures from evaluation_summary.json and reruns only + Reads metric_errors from metrics_summary.json and reruns only the specific failed metrics on the specific failed records. Existing successful metric values are preserved and read from disk. @@ -784,27 +775,38 @@ async def rerun_failed_metrics( started_at = datetime.now() self.output_dir = run_dir - # Read evaluation_summary.json + # Read metrics_summary.json + metrics_summary_path = run_dir / "metrics_summary.json" + if not metrics_summary_path.exists(): + raise FileNotFoundError( + f"metrics_summary.json not found in {run_dir}. " + "Run metrics first before using --rerun-failed-metrics." + ) + + with open(metrics_summary_path) as f: + metrics_summary = json.load(f) + + # Read evaluation_summary.json for successful_record_ids eval_summary_path = run_dir / "evaluation_summary.json" if not eval_summary_path.exists(): raise FileNotFoundError( f"evaluation_summary.json not found in {run_dir}. " - "Run metrics first before using --rerun-failed-metrics." + "This file is required to identify which records passed simulation validation." ) with open(eval_summary_path) as f: eval_summary = json.load(f) - # Support both old and new schema + # Get successful record IDs from evaluation_summary.json sim = eval_summary.get("simulation", eval_summary) successful_ids = sim.get("successful_record_ids", []) - metrics_section = eval_summary.get("metrics", {}) - metric_failures = metrics_section.get("metric_failures", {}) - metrics_computed = metrics_section.get("metrics_computed", []) + # Get metric errors from metrics_summary.json + metric_errors = metrics_summary.get("metric_errors", {}) + metrics_computed = metrics_summary.get("provenance", {}).get("metrics_computed", []) - if not metric_failures: - logger.info("No metric failures found in evaluation_summary.json — nothing to rerun") + if not metric_errors: + logger.info("No metric errors found in metrics_summary.json — nothing to rerun") ended_at = datetime.now() return RunResult( run_id=self.config.run_id, @@ -815,19 +817,21 @@ async def rerun_failed_metrics( ) # Use explicit CLI --metrics if provided, otherwise prefer metrics_computed - # from evaluation_summary.json (reflects actual metric names from the last run), + # from metrics_summary.json (reflects actual metric names from the last run), # falling back to config.json metrics as a last resort. metric_names = cli_metrics or metrics_computed or self.config.metrics if not metric_names: raise ValueError( - "No metrics to run. Specify --metrics or ensure evaluation_summary.json has metrics_computed." + "No metrics to run. Specify --metrics or ensure metrics_summary.json has metrics_computed." ) # Build record_metric_filter: record_id -> set of metric names to rerun + # metric_errors format: {metric_name: {failed_count, total_count, failed_records: [...]}} successful_set = set(successful_ids) record_metric_filter: dict[str, set[str]] = {} - for metric_name, failed_record_ids in metric_failures.items(): + for metric_name, error_info in metric_errors.items(): if metric_name in metric_names: + failed_record_ids = error_info.get("failed_records", []) for record_id in failed_record_ids: if record_id in successful_set: record_metric_filter.setdefault(record_id, set()).add(metric_name) @@ -847,37 +851,30 @@ async def rerun_failed_metrics( logger.info( f"Rerunning {total_reruns} failed metric computation(s) across {len(record_metric_filter)} record(s)" ) - for metric_name, failed_ids in metric_failures.items(): + for metric_name, error_info in metric_errors.items(): if metric_name in metric_names: + failed_ids = error_info.get("failed_records", []) applicable = [rid for rid in failed_ids if rid in record_metric_filter] if applicable: logger.info(f" {metric_name}: {len(applicable)} record(s)") - # Create MetricsRunner with all metrics but filter per-record. - # Records not in record_metric_filter will read existing metrics from disk. - # Records in the filter will only recompute the failed metrics and merge. + # Create MetricsRunner with ONLY the records that have failed metrics. + # This is more efficient - we only process records that need recomputation. + # MetricsRunner.run() will automatically load all other records from disk + # (line 237-245 in metrics/runner.py) to ensure aggregates include ALL records. + records_to_rerun = list(record_metric_filter.keys()) metrics_runner = MetricsRunner( run_dir=run_dir, dataset=records, metric_names=metric_names, - record_ids=successful_ids, + record_ids=records_to_rerun, num_draws=self.config.num_trials, record_metric_filter=record_metric_filter, ) metrics_result = await metrics_runner.run() - # Update evaluation_summary.json with new metrics status - eval_summary["metrics"] = { - "records_evaluated": metrics_result.total_records, - "metrics_computed": metric_names, - "total_metric_failures": metrics_result.total_metric_failures, - "metric_failures": { - name: sorted(record_ids) for name, record_ids in metrics_result.metric_failures.items() - }, - } - - with open(eval_summary_path, "w") as f: - json.dump(eval_summary, f, indent=2) + # Note: metrics_summary.json is automatically updated by MetricsRunner.run() + # No need to manually update evaluation_summary.json - it only tracks simulation status # Terminal output logger.info("=" * 60) From 0ffeb26c441025d569b9eed84794c2943ae01d25 Mon Sep 17 00:00:00 2001 From: "raghav.mehndiratta" Date: Tue, 7 Apr 2026 13:12:35 -0700 Subject: [PATCH 3/4] revert reading location --- src/eva/orchestrator/runner.py | 77 ++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 37 deletions(-) diff --git a/src/eva/orchestrator/runner.py b/src/eva/orchestrator/runner.py index 79f2ea97..ac5a45f3 100644 --- a/src/eva/orchestrator/runner.py +++ b/src/eva/orchestrator/runner.py @@ -692,8 +692,7 @@ async def validate_existing( entry["failure_details"] = failure_details final_failures[oid] = entry - # Build evaluation summary with simulation results only - # Note: metrics status is tracked in metrics_summary.json, not here + # Build evaluation summary with separate simulation and metrics sections llm_generic_error_record_ids = find_records_with_llm_generic_error(self.output_dir, successful_ids) eval_summary: dict[str, Any] = { "started_at": started_at.isoformat(), @@ -715,6 +714,16 @@ async def validate_existing( "final_failures": final_failures, } + if metrics_result is not None: + eval_summary["metrics"] = { + "records_evaluated": metrics_result.total_records, + "metrics_computed": self.config.metrics, + "total_metric_failures": metrics_result.total_metric_failures, + "metric_failures": { + name: sorted(record_ids) for name, record_ids in metrics_result.metric_failures.items() + }, + } + eval_summary_path = self.output_dir / "evaluation_summary.json" with open(eval_summary_path, "w") as f: json.dump(eval_summary, f, indent=2) @@ -759,7 +768,7 @@ async def rerun_failed_metrics( ) -> RunResult: """Rerun only previously failed metric computations. - Reads metric_errors from metrics_summary.json and reruns only + Reads metric_failures from evaluation_summary.json and reruns only the specific failed metrics on the specific failed records. Existing successful metric values are preserved and read from disk. @@ -775,38 +784,27 @@ async def rerun_failed_metrics( started_at = datetime.now() self.output_dir = run_dir - # Read metrics_summary.json - metrics_summary_path = run_dir / "metrics_summary.json" - if not metrics_summary_path.exists(): - raise FileNotFoundError( - f"metrics_summary.json not found in {run_dir}. " - "Run metrics first before using --rerun-failed-metrics." - ) - - with open(metrics_summary_path) as f: - metrics_summary = json.load(f) - - # Read evaluation_summary.json for successful_record_ids + # Read evaluation_summary.json eval_summary_path = run_dir / "evaluation_summary.json" if not eval_summary_path.exists(): raise FileNotFoundError( f"evaluation_summary.json not found in {run_dir}. " - "This file is required to identify which records passed simulation validation." + "Run metrics first before using --rerun-failed-metrics." ) with open(eval_summary_path) as f: eval_summary = json.load(f) - # Get successful record IDs from evaluation_summary.json + # Support both old and new schema sim = eval_summary.get("simulation", eval_summary) successful_ids = sim.get("successful_record_ids", []) - # Get metric errors from metrics_summary.json - metric_errors = metrics_summary.get("metric_errors", {}) - metrics_computed = metrics_summary.get("provenance", {}).get("metrics_computed", []) + metrics_section = eval_summary.get("metrics", {}) + metric_failures = metrics_section.get("metric_failures", {}) + metrics_computed = metrics_section.get("metrics_computed", []) - if not metric_errors: - logger.info("No metric errors found in metrics_summary.json — nothing to rerun") + if not metric_failures: + logger.info("No metric failures found in evaluation_summary.json — nothing to rerun") ended_at = datetime.now() return RunResult( run_id=self.config.run_id, @@ -817,21 +815,19 @@ async def rerun_failed_metrics( ) # Use explicit CLI --metrics if provided, otherwise prefer metrics_computed - # from metrics_summary.json (reflects actual metric names from the last run), + # from evaluation_summary.json (reflects actual metric names from the last run), # falling back to config.json metrics as a last resort. metric_names = cli_metrics or metrics_computed or self.config.metrics if not metric_names: raise ValueError( - "No metrics to run. Specify --metrics or ensure metrics_summary.json has metrics_computed." + "No metrics to run. Specify --metrics or ensure evaluation_summary.json has metrics_computed." ) # Build record_metric_filter: record_id -> set of metric names to rerun - # metric_errors format: {metric_name: {failed_count, total_count, failed_records: [...]}} successful_set = set(successful_ids) record_metric_filter: dict[str, set[str]] = {} - for metric_name, error_info in metric_errors.items(): + for metric_name, failed_record_ids in metric_failures.items(): if metric_name in metric_names: - failed_record_ids = error_info.get("failed_records", []) for record_id in failed_record_ids: if record_id in successful_set: record_metric_filter.setdefault(record_id, set()).add(metric_name) @@ -851,30 +847,37 @@ async def rerun_failed_metrics( logger.info( f"Rerunning {total_reruns} failed metric computation(s) across {len(record_metric_filter)} record(s)" ) - for metric_name, error_info in metric_errors.items(): + for metric_name, failed_ids in metric_failures.items(): if metric_name in metric_names: - failed_ids = error_info.get("failed_records", []) applicable = [rid for rid in failed_ids if rid in record_metric_filter] if applicable: logger.info(f" {metric_name}: {len(applicable)} record(s)") - # Create MetricsRunner with ONLY the records that have failed metrics. - # This is more efficient - we only process records that need recomputation. - # MetricsRunner.run() will automatically load all other records from disk - # (line 237-245 in metrics/runner.py) to ensure aggregates include ALL records. - records_to_rerun = list(record_metric_filter.keys()) + # Create MetricsRunner with all metrics but filter per-record. + # Records not in record_metric_filter will read existing metrics from disk. + # Records in the filter will only recompute the failed metrics and merge. metrics_runner = MetricsRunner( run_dir=run_dir, dataset=records, metric_names=metric_names, - record_ids=records_to_rerun, + record_ids=successful_ids, num_draws=self.config.num_trials, record_metric_filter=record_metric_filter, ) metrics_result = await metrics_runner.run() - # Note: metrics_summary.json is automatically updated by MetricsRunner.run() - # No need to manually update evaluation_summary.json - it only tracks simulation status + # Update evaluation_summary.json with new metrics status + eval_summary["metrics"] = { + "records_evaluated": metrics_result.total_records, + "metrics_computed": metric_names, + "total_metric_failures": metrics_result.total_metric_failures, + "metric_failures": { + name: sorted(record_ids) for name, record_ids in metrics_result.metric_failures.items() + }, + } + + with open(eval_summary_path, "w") as f: + json.dump(eval_summary, f, indent=2) # Terminal output logger.info("=" * 60) From de67ed2f941831c6e9d799a4716a8d7d7858e283 Mon Sep 17 00:00:00 2001 From: "raghav.mehndiratta" Date: Tue, 7 Apr 2026 13:49:59 -0700 Subject: [PATCH 4/4] cleanup defaults --- src/eva/assistant/pipeline/turn_config.py | 31 ++++------- src/eva/assistant/server.py | 43 ++++----------- src/eva/models/config.py | 66 +++++++++++------------ 3 files changed, 55 insertions(+), 85 deletions(-) diff --git a/src/eva/assistant/pipeline/turn_config.py b/src/eva/assistant/pipeline/turn_config.py index ead7dc87..ffda1921 100644 --- a/src/eva/assistant/pipeline/turn_config.py +++ b/src/eva/assistant/pipeline/turn_config.py @@ -28,22 +28,19 @@ logger = get_logger(__name__) -def create_vad_analyzer(vad_type: str | None, vad_params: dict[str, Any]) -> VADAnalyzer | None: +def create_vad_analyzer(vad_type: str, vad_params: dict[str, Any]) -> VADAnalyzer: """Create a VAD analyzer from configuration. Args: - vad_type: VAD analyzer type ('silero' or None for default) + vad_type: VAD analyzer type ('silero') vad_params: VAD parameters (confidence, start_secs, stop_secs, min_volume) Returns: - VAD analyzer instance, or None if vad_type is None + VAD analyzer instance Raises: ValueError: If vad_type is not supported """ - if vad_type is None: - return None - vad_type_lower = vad_type.lower() if vad_type_lower == "silero": @@ -57,24 +54,21 @@ def create_vad_analyzer(vad_type: str | None, vad_params: dict[str, Any]) -> VAD def create_turn_start_strategy( - strategy_type: str | None, + strategy_type: str, strategy_params: dict[str, Any], -) -> BaseUserTurnStartStrategy | None: +) -> BaseUserTurnStartStrategy: """Create a user turn start strategy from configuration. Args: - strategy_type: Strategy type ('vad', 'transcription', 'external', or None for default) + strategy_type: Strategy type ('vad', 'transcription', 'external') strategy_params: Strategy-specific parameters Returns: - Turn start strategy instance, or None if strategy_type is None + Turn start strategy instance Raises: ValueError: If strategy_type is not supported """ - if strategy_type is None: - return None - strategy_type_lower = strategy_type.lower() if strategy_type_lower == "vad": @@ -94,26 +88,23 @@ def create_turn_start_strategy( def create_turn_stop_strategy( - strategy_type: str | None, + strategy_type: str, strategy_params: dict[str, Any], smart_turn_stop_secs: float | None = None, -) -> BaseUserTurnStopStrategy | None: +) -> BaseUserTurnStopStrategy: """Create a user turn stop strategy from configuration. Args: - strategy_type: Strategy type ('speech_timeout', 'turn_analyzer', 'external', or None for default) + strategy_type: Strategy type ('speech_timeout', 'turn_analyzer', 'external') strategy_params: Strategy-specific parameters smart_turn_stop_secs: stop_secs for SmartTurnParams (used with turn_analyzer strategy) Returns: - Turn stop strategy instance, or None if strategy_type is None + Turn stop strategy instance Raises: ValueError: If strategy_type is not supported """ - if strategy_type is None: - return None - strategy_type_lower = strategy_type.lower() if strategy_type_lower == "speech_timeout": diff --git a/src/eva/assistant/server.py b/src/eva/assistant/server.py index 298c7619..7d7443b1 100644 --- a/src/eva/assistant/server.py +++ b/src/eva/assistant/server.py @@ -347,49 +347,28 @@ async def _realtime_tool_handler(params) -> None: vad_cfg = None vad_cfg_params = {} - # Create turn start strategy + # Create turn start strategy using factory function turn_start_strategy = create_turn_start_strategy(turn_start_cfg, turn_start_params) - if turn_start_strategy is None: - # Default: VADUserTurnStartStrategy - turn_start_strategy = VADUserTurnStartStrategy() - logger.info("Using default VAD user turn start strategy") - else: - logger.info(f"Using configured turn start strategy: {turn_start_cfg}") + logger.info(f"Using turn start strategy: {turn_start_cfg}") - # Create turn stop strategy + # Create turn stop strategy using factory function turn_stop_strategy = create_turn_stop_strategy( turn_stop_cfg, turn_stop_params, smart_turn_stop_secs ) - if turn_stop_strategy is None: - # Default: TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3 - turn_stop_strategy = TurnAnalyzerUserTurnStopStrategy( - turn_analyzer=LocalSmartTurnAnalyzerV3( - params=SmartTurnParams(stop_secs=smart_turn_stop_secs) - ) - ) - logger.info("Using default turn analyzer user turn stop strategy") - else: - logger.info(f"Using configured turn stop strategy: {turn_stop_cfg}") + logger.info(f"Using turn stop strategy: {turn_stop_cfg}") - logger.info("Using local smart turn analyzer") user_turn_strategies = UserTurnStrategies( start=[turn_start_strategy], stop=[turn_stop_strategy], ) - # Create VAD analyzer - vad_analyzer = create_vad_analyzer(vad_cfg, vad_cfg_params) - if vad_analyzer is None: - # Default: SileroVADAnalyzer with configured stop_secs - # If vad_cfg_params were provided without vad_cfg, merge them with default stop_secs - vad_params_dict = {"stop_secs": vad_stop_secs} - if vad_cfg_params: - # User provided params without specifying vad type - merge with defaults - vad_params_dict.update(vad_cfg_params) - vad_analyzer = SileroVADAnalyzer(params=VADParams(**vad_params_dict)) - logger.info("Using default Silero VAD analyzer") - else: - logger.info(f"Using configured VAD analyzer: {vad_cfg}") + # Create VAD analyzer using factory function + # Merge user params with pipeline-specific stop_secs + vad_params_dict = {"stop_secs": vad_stop_secs} + if vad_cfg_params: + vad_params_dict.update(vad_cfg_params) + vad_analyzer = create_vad_analyzer(vad_cfg, vad_params_dict) + logger.info(f"Using VAD analyzer: {vad_cfg}") user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( diff --git a/src/eva/models/config.py b/src/eva/models/config.py index 23c19ba7..ea6f23a8 100644 --- a/src/eva/models/config.py +++ b/src/eva/models/config.py @@ -67,12 +67,12 @@ class PipelineConfig(BaseModel): stt_params: dict[str, Any] = Field({}, description="Additional STT model parameters (JSON)") tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") - # Configurable turn start/stop strategies (optional) - turn_start_strategy: str | None = Field( - None, + # Configurable turn start/stop strategies + turn_start_strategy: str = Field( + "vad", description=( "User turn start strategy: 'vad', 'transcription', or 'external'. " - "If not specified, uses default (vad). " + "Defaults to 'vad' (VADUserTurnStartStrategy). " "Set via EVA_MODEL__TURN_START_STRATEGY." ), ) @@ -81,11 +81,11 @@ class PipelineConfig(BaseModel): description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", ) - turn_stop_strategy: str | None = Field( - None, + turn_stop_strategy: str = Field( + "turn_analyzer", description=( "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " - "If not specified, uses default (turn_analyzer). " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " "Set via EVA_MODEL__TURN_STOP_STRATEGY." ), ) @@ -94,12 +94,12 @@ class PipelineConfig(BaseModel): description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", ) - # VAD configuration (optional) - vad: str | None = Field( - None, + # VAD configuration + vad: str = Field( + "silero", description=( "VAD analyzer type: 'silero'. " - "If not specified, uses default VAD (SileroVADAnalyzer). " + "Defaults to 'silero' (SileroVADAnalyzer). " "Set via EVA_MODEL__VAD." ), ) @@ -153,12 +153,12 @@ class SpeechToSpeechConfig(BaseModel): s2s: str = Field(description="Speech-to-speech model name", examples=["gpt-realtime-mini", "gemini_live"]) s2s_params: dict[str, Any] = Field({}, description="Additional speech-to-speech model parameters (JSON)") - # Configurable turn start/stop strategies (optional, same as PipelineConfig) - turn_start_strategy: str | None = Field( - None, + # Configurable turn start/stop strategies (same as PipelineConfig) + turn_start_strategy: str = Field( + "vad", description=( "User turn start strategy: 'vad', 'transcription', or 'external'. " - "If not specified, uses default (vad). " + "Defaults to 'vad' (VADUserTurnStartStrategy). " "Set via EVA_MODEL__TURN_START_STRATEGY." ), ) @@ -167,11 +167,11 @@ class SpeechToSpeechConfig(BaseModel): description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", ) - turn_stop_strategy: str | None = Field( - None, + turn_stop_strategy: str = Field( + "turn_analyzer", description=( "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " - "If not specified, uses default (turn_analyzer). " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " "Set via EVA_MODEL__TURN_STOP_STRATEGY." ), ) @@ -180,12 +180,12 @@ class SpeechToSpeechConfig(BaseModel): description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", ) - # VAD configuration (optional) - vad: str | None = Field( - None, + # VAD configuration + vad: str = Field( + "silero", description=( "VAD analyzer type: 'silero'. " - "If not specified, uses default VAD (SileroVADAnalyzer). " + "Defaults to 'silero' (SileroVADAnalyzer). " "Set via EVA_MODEL__VAD." ), ) @@ -235,12 +235,12 @@ class AudioLLMConfig(BaseModel): tts: str = Field(description="TTS model", examples=["cartesia", "elevenlabs"]) tts_params: dict[str, Any] = Field({}, description="Additional TTS model parameters (JSON)") - # Configurable turn start/stop strategies (optional, same as PipelineConfig) - turn_start_strategy: str | None = Field( - None, + # Configurable turn start/stop strategies (same as PipelineConfig) + turn_start_strategy: str = Field( + "vad", description=( "User turn start strategy: 'vad', 'transcription', or 'external'. " - "If not specified, uses default (vad). " + "Defaults to 'vad' (VADUserTurnStartStrategy). " "Set via EVA_MODEL__TURN_START_STRATEGY." ), ) @@ -249,11 +249,11 @@ class AudioLLMConfig(BaseModel): description="Parameters for turn start strategy (JSON). Set via EVA_MODEL__TURN_START_STRATEGY_PARAMS.", ) - turn_stop_strategy: str | None = Field( - None, + turn_stop_strategy: str = Field( + "turn_analyzer", description=( "User turn stop strategy: 'speech_timeout', 'turn_analyzer', or 'external'. " - "If not specified, uses default (turn_analyzer). " + "Defaults to 'turn_analyzer' (TurnAnalyzerUserTurnStopStrategy with LocalSmartTurnAnalyzerV3). " "Set via EVA_MODEL__TURN_STOP_STRATEGY." ), ) @@ -262,12 +262,12 @@ class AudioLLMConfig(BaseModel): description="Parameters for turn stop strategy (JSON). Set via EVA_MODEL__TURN_STOP_STRATEGY_PARAMS.", ) - # VAD configuration (optional) - vad: str | None = Field( - None, + # VAD configuration + vad: str = Field( + "silero", description=( "VAD analyzer type: 'silero'. " - "If not specified, uses default VAD (SileroVADAnalyzer). " + "Defaults to 'silero' (SileroVADAnalyzer). " "Set via EVA_MODEL__VAD." ), )