Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion application/ai/structured_json_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


def _is_retryable_llm_error(exc: Exception) -> bool:
"""识别上游临时故障,避免 429/5xx/超时直接短路。"""
"""识别上游临时故障,避免 429/5xx/超时/空响应直接短路。"""
message = str(exc).lower()
retryable_markers = (
"overloaded_error",
Expand All @@ -37,6 +37,8 @@ def _is_retryable_llm_error(exc: Exception) -> bool:
"temporar",
"connection reset",
"service unavailable",
"empty content",
"empty non-stream content",
)
retryable_statuses = (" 429", " 500", " 502", " 503", " 504", " 529")
return any(marker in message for marker in retryable_markers) or any(
Expand Down
98 changes: 83 additions & 15 deletions infrastructure/ai/providers/openai_provider.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""OpenAI LLM 提供商实现"""
import asyncio
import logging
import openai
import httpx
Expand All @@ -15,6 +16,11 @@

logger = logging.getLogger(__name__)

# 空响应自动重试次数(不含首次调用)
_EMPTY_CONTENT_MAX_RETRIES = 2
# 空响应重试基础延迟(秒),实际延迟 = 基础延迟 * 2^attempt
_EMPTY_CONTENT_BASE_DELAY = 1.0


class OpenAIProvider(BaseProvider):
"""OpenAI LLM 提供商实现
Expand Down Expand Up @@ -71,29 +77,57 @@ async def generate(
base_url = self.settings.base_url or "https://api.openai.com/v1"
use_responses = not self._use_legacy and base_url not in self.__class__._fallback_to_chat_cache

if use_responses:
last_empty_exc: Exception | None = None
for attempt in range(1 + _EMPTY_CONTENT_MAX_RETRIES):
try:
return await self._generate_via_responses(prompt, config)
except (openai.NotFoundError, openai.BadRequestError) as e:
logger.info(f"Responses API unsupported for {base_url}, falling back to chat completions: {str(e)}")
self.__class__._fallback_to_chat_cache.add(base_url)
except Exception as e:
# 某些网关在路径错误时可能不抛严格的 404 而是抛出其他错误,如果消息含有明确路径错误也尝试降级
if "404" in str(e) or "Not Found" in str(e) or "400" in str(e) or "Account invalid" in str(e) or "INVALID_ARGUMENT" in str(e):
logger.info(f"Gateway returned error for Responses API ({base_url}), falling back: {str(e)}")
self.__class__._fallback_to_chat_cache.add(base_url)
else:
raise
if use_responses:
try:
return await self._generate_via_responses(prompt, config)
except (openai.NotFoundError, openai.BadRequestError) as e:
logger.info(f"Responses API unsupported for {base_url}, falling back to chat completions: {str(e)}")
self.__class__._fallback_to_chat_cache.add(base_url)
except Exception as e:
# 某些网关在路径错误时可能不抛严格的 404 而是抛出其他错误,如果消息含有明确路径错误也尝试降级
if "404" in str(e) or "Not Found" in str(e) or "400" in str(e) or "Account invalid" in str(e) or "INVALID_ARGUMENT" in str(e):
logger.info(f"Gateway returned error for Responses API ({base_url}), falling back: {str(e)}")
self.__class__._fallback_to_chat_cache.add(base_url)
Comment on lines 77 to +93

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Recompute use_responses inside the retry loop.

Line 78 computes use_responses once, but Lines 88/93 mutate _fallback_to_chat_cache later. In the same generate() call, subsequent retries still use the stale True and keep hitting Responses before falling back again.

💡 Suggested fix
-            use_responses = not self._use_legacy and base_url not in self.__class__._fallback_to_chat_cache
-
             last_empty_exc: Exception | None = None
             for attempt in range(1 + _EMPTY_CONTENT_MAX_RETRIES):
+                use_responses = (
+                    not self._use_legacy
+                    and base_url not in self.__class__._fallback_to_chat_cache
+                )
                 try:
                     if use_responses:
                         try:
                             return await self._generate_via_responses(prompt, config)
                         except (openai.NotFoundError, openai.BadRequestError) as e:
                             logger.info(f"Responses API unsupported for {base_url}, falling back to chat completions: {str(e)}")
                             self.__class__._fallback_to_chat_cache.add(base_url)
+                            use_responses = False
                         except Exception as e:
                             # 某些网关在路径错误时可能不抛严格的 404 而是抛出其他错误,如果消息含有明确路径错误也尝试降级
                             if "404" in str(e) or "Not Found" in str(e) or "400" in str(e) or "Account invalid" in str(e) or "INVALID_ARGUMENT" in str(e):
                                 logger.info(f"Gateway returned error for Responses API ({base_url}), falling back: {str(e)}")
                                 self.__class__._fallback_to_chat_cache.add(base_url)
+                                use_responses = False
                             else:
                                 raise
🧰 Tools
🪛 Ruff (0.15.15)

[warning] 87-87: Use explicit conversion flag

Replace with conversion flag

(RUF010)


[warning] 90-90: Comment contains ambiguous (FULLWIDTH COMMA). Did you mean , (COMMA)?

(RUF003)


[warning] 92-92: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@infrastructure/ai/providers/openai_provider.py` around lines 77 - 93, The
computed flag use_responses should be recalculated on each retry so changes to
the class-level _fallback_to_chat_cache are respected; move or re-evaluate
use_responses (which depends on base_url, self._use_legacy and
self.__class__._fallback_to_chat_cache) inside the for loop in generate() before
deciding to call _generate_via_responses, so that after you add base_url to
_fallback_to_chat_cache subsequent attempts will skip the Responses path and
fall back to chat completions.

else:
raise

# 使用降级的 Chat Completions API
return await self._generate_via_chat(prompt, config)
except RuntimeError as e:
if self._is_empty_content_error(e) and attempt < _EMPTY_CONTENT_MAX_RETRIES:
last_empty_exc = e
delay = _EMPTY_CONTENT_BASE_DELAY * (2 ** attempt)
logger.warning(
"LLM 返回空内容,%.1f 秒后自动重试 (attempt=%d/%d): %s",
delay, attempt + 1, _EMPTY_CONTENT_MAX_RETRIES, e,
)
await asyncio.sleep(delay)
continue
raise

# 使用降级的 Chat Completions API
return await self._generate_via_chat(prompt, config)
# 所有重试均失败(理论上不会到这里,但做兜底)
if last_empty_exc is not None:
raise last_empty_exc
raise RuntimeError("LLM generate exhausted empty-content retries")
except RuntimeError:
raise
except ValueError:
raise
except Exception as e:
raise RuntimeError(f"Failed to generate text: {str(e)}") from e

@staticmethod
def _is_empty_content_error(exc: Exception) -> bool:
"""判断异常是否为空响应导致的错误,用于自动重试决策。"""
msg = str(exc).lower()
return (
"empty content" in msg
or "empty non-stream content" in msg
)

async def _generate_via_chat(self, prompt: Prompt, config: GenerationConfig) -> GenerationResult:
"""Chat Completions API 非流式生成

Expand Down Expand Up @@ -124,9 +158,12 @@ async def _generate_via_chat(self, prompt: Prompt, config: GenerationConfig) ->
content = self._extract_text_from_response(response)

if not content:
# 记录原始响应以便排查:含 choices 数量、message 结构、finish_reason
raw_preview = self._summarize_raw_response(response)
logger.warning(
"OpenAI-compatible response returned empty non-stream content; "
"falling back to streaming aggregation"
"falling back to streaming aggregation. raw_preview=%s",
raw_preview,
)
content, token_usage = await self._generate_via_stream(request_kwargs)
return GenerationResult(content=content, token_usage=token_usage)
Expand Down Expand Up @@ -351,6 +388,37 @@ def _normalize_chat_completion_content(content: Any) -> str:

return str(content).strip()

@staticmethod
def _summarize_raw_response(response: Any) -> str:
"""生成原始响应的摘要字符串,用于空响应时的诊断日志。"""
try:
choices = getattr(response, "choices", None)
n_choices = len(choices) if choices else 0
parts = [f"choices={n_choices}"]
if choices:
msg = getattr(choices[0], "message", None)
content = getattr(msg, "content", None) if msg else None
finish = getattr(choices[0], "finish_reason", None)
parts.append(f"finish_reason={finish}")
if content is None:
parts.append("content=None")
elif isinstance(content, str):
parts.append(f"content_len={len(content)}")
else:
parts.append(f"content_type={type(content).__name__}")
reasoning = getattr(msg, "reasoning_content", None) if msg else None
if reasoning:
parts.append(f"has_reasoning=True(len={len(reasoning)})")
usage = getattr(response, "usage", None)
if usage:
parts.append(
f"tokens(prompt={getattr(usage, 'prompt_tokens', '?')},"
f"completion={getattr(usage, 'completion_tokens', '?')})"
)
return ", ".join(parts)
except Exception:
return "(summarize failed)"

@staticmethod
def _extract_text_from_response(response: Any) -> str:
if not getattr(response, "choices", None):
Expand Down
64 changes: 59 additions & 5 deletions tests/unit/infrastructure/ai/providers/test_openai_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,45 @@ async def test_stream_generate(self, provider):
assert mock_create.await_args.kwargs["stream"] is True

@pytest.mark.anyio
async def test_generate_empty_content_raises(self, provider):
@patch("infrastructure.ai.providers.openai_provider.asyncio.sleep", new_callable=AsyncMock)
async def test_generate_empty_content_raises_after_retries(self, mock_sleep, provider):
"""空响应经过多次重试后仍为空,最终抛出 RuntimeError。"""
prompt = Prompt(system="You are helpful", user="Hello")
config = GenerationConfig(model="test-model")
empty_response = SimpleNamespace(
choices=[SimpleNamespace(message=SimpleNamespace(content=None))],
usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5),
)

def _make_empty_stream():
return _FakeStream([
SimpleNamespace(
choices=[SimpleNamespace(delta=SimpleNamespace(content=None))],
usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5),
),
])

# 3 次尝试(首次 + 2 次重试),每次都是 non-stream 空 → stream 空
side_effects = []
for _ in range(3):
side_effects.extend([empty_response, _make_empty_stream()])

with patch.object(provider.async_client.chat.completions, "create", new_callable=AsyncMock) as mock_create:
mock_create.side_effect = side_effects

with pytest.raises(RuntimeError, match="empty content"):
await provider.generate(prompt, config)

# 每次尝试:1 次 non-stream + 1 次 stream fallback = 2 次调用
# 共 3 次尝试 = 6 次调用
assert mock_create.await_count == 6
# 重试间应调用 asyncio.sleep(2 次重试间隔)
assert mock_sleep.await_count == 2

@pytest.mark.anyio
@patch("infrastructure.ai.providers.openai_provider.asyncio.sleep", new_callable=AsyncMock)
async def test_generate_empty_content_retries_then_succeeds(self, mock_sleep, provider):
"""空响应自动重试后,后续请求返回正常内容,应成功返回。"""
prompt = Prompt(system="You are helpful", user="Hello")
config = GenerationConfig(model="test-model")
empty_response = SimpleNamespace(
Expand All @@ -158,12 +196,22 @@ async def test_generate_empty_content_raises(self, provider):
usage=SimpleNamespace(prompt_tokens=10, completion_tokens=5),
),
])
good_response = SimpleNamespace(
choices=[SimpleNamespace(message=SimpleNamespace(content="Recovered!"))],
usage=SimpleNamespace(prompt_tokens=12, completion_tokens=8),
)

with patch.object(provider.async_client.chat.completions, "create", new_callable=AsyncMock) as mock_create:
mock_create.side_effect = [empty_response, empty_stream]
# 第 1 次尝试:non-stream 空 → stream 空 → RuntimeError → 触发重试
# 第 2 次尝试:non-stream 返回正常内容
mock_create.side_effect = [empty_response, empty_stream, good_response]

with pytest.raises(RuntimeError, match="empty content"):
await provider.generate(prompt, config)
result = await provider.generate(prompt, config)

assert result.content == "Recovered!"
assert mock_create.await_count == 3
# 仅重试了 1 次
assert mock_sleep.await_count == 1

def test_missing_api_key(self):
with pytest.raises(ValueError, match="API key is required"):
Expand Down Expand Up @@ -276,7 +324,8 @@ async def test_stream_generate_extracts_output_text_delta(self, provider):
assert mock_create.await_args.kwargs["stream"] is True

@pytest.mark.anyio
async def test_generate_empty_responses_raises(self, provider):
@patch("infrastructure.ai.providers.openai_provider.asyncio.sleep", new_callable=AsyncMock)
async def test_generate_empty_responses_raises(self, mock_sleep, provider):
prompt = Prompt(system="You are helpful", user="Hello")
config = GenerationConfig(model="test-model")
response = SimpleNamespace(
Expand All @@ -290,6 +339,11 @@ async def test_generate_empty_responses_raises(self, provider):
with pytest.raises(RuntimeError, match="empty content"):
await provider.generate(prompt, config)

# 3 次尝试(首次 + 2 次重试),每次返回空 output
assert mock_create.await_count == 3
# 重试间应调用 asyncio.sleep
assert mock_sleep.await_count == 2


class TestProfilePassthrough:
"""profile 的 use_legacy_chat_completions 正确透传到 OpenAIProvider"""
Expand Down