diff --git a/application/ai/structured_json_pipeline.py b/application/ai/structured_json_pipeline.py index 11530aa7..557f3574 100644 --- a/application/ai/structured_json_pipeline.py +++ b/application/ai/structured_json_pipeline.py @@ -28,7 +28,7 @@ def _is_retryable_llm_error(exc: Exception) -> bool: - """识别上游临时故障,避免 429/5xx/超时直接短路。""" + """识别上游临时故障,避免 429/5xx/超时/空响应直接短路。""" message = str(exc).lower() retryable_markers = ( "overloaded_error", @@ -37,6 +37,8 @@ def _is_retryable_llm_error(exc: Exception) -> bool: "temporar", "connection reset", "service unavailable", + "empty content", + "empty non-stream content", ) retryable_statuses = (" 429", " 500", " 502", " 503", " 504", " 529") return any(marker in message for marker in retryable_markers) or any( diff --git a/infrastructure/ai/providers/openai_provider.py b/infrastructure/ai/providers/openai_provider.py index 645180ea..03ca3c3c 100644 --- a/infrastructure/ai/providers/openai_provider.py +++ b/infrastructure/ai/providers/openai_provider.py @@ -1,4 +1,5 @@ """OpenAI LLM 提供商实现""" +import asyncio import logging import openai import httpx @@ -15,6 +16,11 @@ logger = logging.getLogger(__name__) +# 空响应自动重试次数(不含首次调用) +_EMPTY_CONTENT_MAX_RETRIES = 2 +# 空响应重试基础延迟(秒),实际延迟 = 基础延迟 * 2^attempt +_EMPTY_CONTENT_BASE_DELAY = 1.0 + class OpenAIProvider(BaseProvider): """OpenAI LLM 提供商实现 @@ -71,22 +77,41 @@ async def generate( base_url = self.settings.base_url or "https://api.openai.com/v1" use_responses = not self._use_legacy and base_url not in self.__class__._fallback_to_chat_cache - if use_responses: + last_empty_exc: Exception | None = None + for attempt in range(1 + _EMPTY_CONTENT_MAX_RETRIES): try: - return await self._generate_via_responses(prompt, config) - except (openai.NotFoundError, openai.BadRequestError) as e: - logger.info(f"Responses API unsupported for {base_url}, falling back to chat completions: {str(e)}") - self.__class__._fallback_to_chat_cache.add(base_url) - except Exception as e: - # 某些网关在路径错误时可能不抛严格的 404 而是抛出其他错误,如果消息含有明确路径错误也尝试降级 - if "404" in str(e) or "Not Found" in str(e) or "400" in str(e) or "Account invalid" in str(e) or "INVALID_ARGUMENT" in str(e): - logger.info(f"Gateway returned error for Responses API ({base_url}), falling back: {str(e)}") - self.__class__._fallback_to_chat_cache.add(base_url) - else: - raise + if use_responses: + try: + return await self._generate_via_responses(prompt, config) + except (openai.NotFoundError, openai.BadRequestError) as e: + logger.info(f"Responses API unsupported for {base_url}, falling back to chat completions: {str(e)}") + self.__class__._fallback_to_chat_cache.add(base_url) + except Exception as e: + # 某些网关在路径错误时可能不抛严格的 404 而是抛出其他错误,如果消息含有明确路径错误也尝试降级 + if "404" in str(e) or "Not Found" in str(e) or "400" in str(e) or "Account invalid" in str(e) or "INVALID_ARGUMENT" in str(e): + logger.info(f"Gateway returned error for Responses API ({base_url}), falling back: {str(e)}") + self.__class__._fallback_to_chat_cache.add(base_url) + else: + raise + + # 使用降级的 Chat Completions API + return await self._generate_via_chat(prompt, config) + except RuntimeError as e: + if self._is_empty_content_error(e) and attempt < _EMPTY_CONTENT_MAX_RETRIES: + last_empty_exc = e + delay = _EMPTY_CONTENT_BASE_DELAY * (2 ** attempt) + logger.warning( + "LLM 返回空内容,%.1f 秒后自动重试 (attempt=%d/%d): %s", + delay, attempt + 1, _EMPTY_CONTENT_MAX_RETRIES, e, + ) + await asyncio.sleep(delay) + continue + raise - # 使用降级的 Chat Completions API - return await self._generate_via_chat(prompt, config) + # 所有重试均失败(理论上不会到这里,但做兜底) + if last_empty_exc is not None: + raise last_empty_exc + raise RuntimeError("LLM generate exhausted empty-content retries") except RuntimeError: raise except ValueError: @@ -94,6 +119,15 @@ async def generate( except Exception as e: raise RuntimeError(f"Failed to generate text: {str(e)}") from e + @staticmethod + def _is_empty_content_error(exc: Exception) -> bool: + """判断异常是否为空响应导致的错误,用于自动重试决策。""" + msg = str(exc).lower() + return ( + "empty content" in msg + or "empty non-stream content" in msg + ) + async def _generate_via_chat(self, prompt: Prompt, config: GenerationConfig) -> GenerationResult: """Chat Completions API 非流式生成 @@ -124,9 +158,12 @@ async def _generate_via_chat(self, prompt: Prompt, config: GenerationConfig) -> content = self._extract_text_from_response(response) if not content: + # 记录原始响应以便排查:含 choices 数量、message 结构、finish_reason + raw_preview = self._summarize_raw_response(response) logger.warning( "OpenAI-compatible response returned empty non-stream content; " - "falling back to streaming aggregation" + "falling back to streaming aggregation. raw_preview=%s", + raw_preview, ) content, token_usage = await self._generate_via_stream(request_kwargs) return GenerationResult(content=content, token_usage=token_usage) @@ -351,6 +388,37 @@ def _normalize_chat_completion_content(content: Any) -> str: return str(content).strip() + @staticmethod + def _summarize_raw_response(response: Any) -> str: + """生成原始响应的摘要字符串,用于空响应时的诊断日志。""" + try: + choices = getattr(response, "choices", None) + n_choices = len(choices) if choices else 0 + parts = [f"choices={n_choices}"] + if choices: + msg = getattr(choices[0], "message", None) + content = getattr(msg, "content", None) if msg else None + finish = getattr(choices[0], "finish_reason", None) + parts.append(f"finish_reason={finish}") + if content is None: + parts.append("content=None") + elif isinstance(content, str): + parts.append(f"content_len={len(content)}") + else: + parts.append(f"content_type={type(content).__name__}") + reasoning = getattr(msg, "reasoning_content", None) if msg else None + if reasoning: + parts.append(f"has_reasoning=True(len={len(reasoning)})") + usage = getattr(response, "usage", None) + if usage: + parts.append( + f"tokens(prompt={getattr(usage, 'prompt_tokens', '?')}," + f"completion={getattr(usage, 'completion_tokens', '?')})" + ) + return ", ".join(parts) + except Exception: + return "(summarize failed)" + @staticmethod def _extract_text_from_response(response: Any) -> str: if not getattr(response, "choices", None): diff --git a/tests/unit/infrastructure/ai/providers/test_openai_provider.py b/tests/unit/infrastructure/ai/providers/test_openai_provider.py index eeb72076..7f7ebcbe 100644 --- a/tests/unit/infrastructure/ai/providers/test_openai_provider.py +++ b/tests/unit/infrastructure/ai/providers/test_openai_provider.py @@ -145,7 +145,45 @@ async def test_stream_generate(self, provider): assert mock_create.await_args.kwargs["stream"] is True @pytest.mark.anyio - async def test_generate_empty_content_raises(self, provider): + @patch("infrastructure.ai.providers.openai_provider.asyncio.sleep", new_callable=AsyncMock) + async def test_generate_empty_content_raises_after_retries(self, mock_sleep, provider): + """空响应经过多次重试后仍为空,最终抛出 RuntimeError。""" + prompt = Prompt(system="You are helpful", user="Hello") + config = GenerationConfig(model="test-model") + empty_response = SimpleNamespace( + choices=[SimpleNamespace(message=SimpleNamespace(content=None))], + usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5), + ) + + def _make_empty_stream(): + return _FakeStream([ + SimpleNamespace( + choices=[SimpleNamespace(delta=SimpleNamespace(content=None))], + usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5), + ), + ]) + + # 3 次尝试(首次 + 2 次重试),每次都是 non-stream 空 → stream 空 + side_effects = [] + for _ in range(3): + side_effects.extend([empty_response, _make_empty_stream()]) + + with patch.object(provider.async_client.chat.completions, "create", new_callable=AsyncMock) as mock_create: + mock_create.side_effect = side_effects + + with pytest.raises(RuntimeError, match="empty content"): + await provider.generate(prompt, config) + + # 每次尝试:1 次 non-stream + 1 次 stream fallback = 2 次调用 + # 共 3 次尝试 = 6 次调用 + assert mock_create.await_count == 6 + # 重试间应调用 asyncio.sleep(2 次重试间隔) + assert mock_sleep.await_count == 2 + + @pytest.mark.anyio + @patch("infrastructure.ai.providers.openai_provider.asyncio.sleep", new_callable=AsyncMock) + async def test_generate_empty_content_retries_then_succeeds(self, mock_sleep, provider): + """空响应自动重试后,后续请求返回正常内容,应成功返回。""" prompt = Prompt(system="You are helpful", user="Hello") config = GenerationConfig(model="test-model") empty_response = SimpleNamespace( @@ -158,12 +196,22 @@ async def test_generate_empty_content_raises(self, provider): usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5), ), ]) + good_response = SimpleNamespace( + choices=[SimpleNamespace(message=SimpleNamespace(content="Recovered!"))], + usage=SimpleNamespace(prompt_tokens=12, completion_tokens=8), + ) with patch.object(provider.async_client.chat.completions, "create", new_callable=AsyncMock) as mock_create: - mock_create.side_effect = [empty_response, empty_stream] + # 第 1 次尝试:non-stream 空 → stream 空 → RuntimeError → 触发重试 + # 第 2 次尝试:non-stream 返回正常内容 + mock_create.side_effect = [empty_response, empty_stream, good_response] - with pytest.raises(RuntimeError, match="empty content"): - await provider.generate(prompt, config) + result = await provider.generate(prompt, config) + + assert result.content == "Recovered!" + assert mock_create.await_count == 3 + # 仅重试了 1 次 + assert mock_sleep.await_count == 1 def test_missing_api_key(self): with pytest.raises(ValueError, match="API key is required"): @@ -276,7 +324,8 @@ async def test_stream_generate_extracts_output_text_delta(self, provider): assert mock_create.await_args.kwargs["stream"] is True @pytest.mark.anyio - async def test_generate_empty_responses_raises(self, provider): + @patch("infrastructure.ai.providers.openai_provider.asyncio.sleep", new_callable=AsyncMock) + async def test_generate_empty_responses_raises(self, mock_sleep, provider): prompt = Prompt(system="You are helpful", user="Hello") config = GenerationConfig(model="test-model") response = SimpleNamespace( @@ -290,6 +339,11 @@ async def test_generate_empty_responses_raises(self, provider): with pytest.raises(RuntimeError, match="empty content"): await provider.generate(prompt, config) + # 3 次尝试(首次 + 2 次重试),每次返回空 output + assert mock_create.await_count == 3 + # 重试间应调用 asyncio.sleep + assert mock_sleep.await_count == 2 + class TestProfilePassthrough: """profile 的 use_legacy_chat_completions 正确透传到 OpenAIProvider"""