-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunner_engine.py
More file actions
139 lines (110 loc) · 5.72 KB
/
runner_engine.py
File metadata and controls
139 lines (110 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# runner_engine.py
import asyncio
from hot_cache import LRUCache
from web_search_autonomy import WebSearchAutonomySystem
# NOTE: The mock objects previously in this file have been moved to `mock_objects.py`
# for centralized test management. The main `RunnerEngine` class below is the
# actual implementation.
class RunnerEngine:
    """Inference execution engine (Layer 3).

    Streams LLM tokens while (a) concurrently fetching knowledge tiles from
    the DB through an LRU hot cache and (b) deciding — both up front and
    dynamically mid-stream — whether to launch a parallel web search.
    """

    class _InferenceState:
        """Minimal snapshot of streaming state passed to ``should_search``."""
        __slots__ = ("partial_response",)

        def __init__(self, partial_response: str):
            self.partial_response = partial_response

    def __init__(self, llm_client, db_interface, web_search_system, web_search_api=None):
        """Wire up the engine's collaborators.

        Args:
            llm_client: exposes ``generate_streaming(prompt)`` (async generator
                yielding ``response_token`` / ``completion`` events).
            db_interface: exposes ``fetch_async(coord)`` returning a tile dict
                (with ``certainty`` and ``content`` keys) or a falsy value.
            web_search_system: exposes ``should_search(question, inference_state=...)``
                returning a dict with a ``should_search`` bool.
            web_search_api: optional async callable ``(question) -> results``.
                FIX: the original body called a module-level ``mock_web_search_api``
                that is no longer defined in this file (the mocks were moved to
                ``mock_objects.py`` per the header note), so any search path
                raised NameError. Inject the real/mock search coroutine here;
                when omitted, we fall back to the module-level name lazily,
                preserving the original behavior.
        """
        self.llm = llm_client
        self.db = db_interface
        self.web_search_system = web_search_system
        self._web_search_api = web_search_api
        self.hot_cache = LRUCache(max_size=20)

    def _search_api(self):
        """Resolve the web-search coroutine function (injected, else module-level mock)."""
        if self._web_search_api is not None:
            return self._web_search_api
        # NOTE(review): raises NameError unless mock_web_search_api is imported
        # (see the header note about mock_objects.py). Prefer injecting
        # web_search_api via __init__.
        return mock_web_search_api

    async def _fetch_db_coordinates(self, db_coordinates: list) -> dict:
        """Fetch knowledge tiles for up to 5 coordinates, consulting the hot cache first.

        Returns a dict mapping coordinate -> tile for every coordinate that
        yielded a truthy tile (cache hits included).
        """
        results = {}
        for coord in db_coordinates[:5]:  # cap DB fan-out at 5 coordinates
            if coord in self.hot_cache:
                print(f"Cache: Hit for {coord}")
                results[coord] = self.hot_cache[coord]
                continue
            print(f"Cache: Miss for {coord}")
            tile = await self.db.fetch_async(coord)
            if tile:
                self.hot_cache[coord] = tile
                results[coord] = tile
        return results

    def _build_context(self, question: str, db_results: dict, session_context) -> str:
        """Assemble the context string for the LLM prompt.

        ``question`` is accepted for interface symmetry but currently unused.
        """
        context_parts = []
        if session_context:  # unused in this example
            context_parts.append(f"セッション履歴: {session_context}")
        if db_results:
            for coord, tile in db_results.items():
                context_parts.append(f"【確実性{tile['certainty']}%】{tile['content']}")
        return "\n\n".join(context_parts)

    def _format_prompt(self, question: str, context: str) -> str:
        """Render the final prompt handed to the LLM."""
        return f"情報: {context}\n\n質問: {question}\n\n指示: 提供された情報に基づき回答してください。"

    async def generate_response_streaming(self, question: str, db_coordinates: list, session_context=None):
        """Stream the answer while dynamically re-evaluating the need for web search.

        Yields, in order: ``response_token`` events (relayed from the LLM),
        optionally one ``web_results`` event, then a terminal
        ``final_structured_response`` event carrying the full text plus the
        metadata the Judge layer consumes.
        """
        web_decision = self.web_search_system.should_search(question)
        db_task = asyncio.create_task(self._fetch_db_coordinates(db_coordinates))
        # _search_api() is only resolved when a search is actually requested.
        web_task = (
            asyncio.create_task(self._search_api()(question))
            if web_decision["should_search"]
            else None
        )
        try:
            # DB budget is 0.5s; on timeout we degrade to an empty context
            # rather than stalling the token stream (wait_for cancels db_task).
            db_results = await asyncio.wait_for(db_task, timeout=0.5)
        except asyncio.TimeoutError:
            db_results = {}
        context = self._build_context(question, db_results, session_context)
        prompt = self._format_prompt(question, context)
        partial_response = ""
        final_metadata = {}
        async for result in self.llm.generate_streaming(prompt):
            if result['type'] == 'response_token':
                token = result['token']
                partial_response += token
                yield result  # relay the token unchanged
                # Dynamic mid-inference web-search check. Heuristic: re-check
                # roughly every 20 characters (multi-character tokens can skip
                # an exact multiple, so this is best-effort) and only while no
                # search task is already running.
                if len(partial_response) > 5 and len(partial_response) % 20 == 0 and not web_task:
                    inference_state = self._InferenceState(partial_response)
                    dynamic_decision = self.web_search_system.should_search(
                        question, inference_state=inference_state
                    )
                    if dynamic_decision["should_search"]:
                        print("\n*** Dynamic Web Search Triggered! ***\n")
                        web_task = asyncio.create_task(self._search_api()(question))
            elif result['type'] == 'completion':
                # Structured metadata required downstream by the Judge layer.
                final_metadata = result['metadata']
        web_results_content = []
        if web_task:
            try:
                web_results_content = await asyncio.wait_for(web_task, timeout=2.0)
                yield {"type": "web_results", "results": web_results_content}
            except asyncio.TimeoutError:
                yield {"type": "web_results", "results": [], "error": "timeout"}
        # Emit the final structured payload and finish.
        final_metadata["referenced_coords"] = db_coordinates
        final_metadata["web_results"] = web_results_content
        yield {
            "type": "final_structured_response",
            "is_complete": True,
            "main_response": partial_response,
            **final_metadata  # expands thinking_process, key_points, etc.
        }
# --- 実行例 ---
async def main():
    """Example pipeline run wiring mock components into the RunnerEngine."""
    # FIX: MockLLMClient / MockDBInterface were moved to mock_objects.py (see
    # the header note) but were never imported, so this example crashed with
    # NameError before reaching the engine. Import them at function scope to
    # keep the example self-contained.
    from mock_objects import MockLLMClient, MockDBInterface

    # Initialize the mock components.
    llm = MockLLMClient()
    db = MockDBInterface()
    web_search = WebSearchAutonomySystem()
    runner = RunnerEngine(llm, db, web_search)

    question = "最新の心筋梗塞の診断について"
    # Coordinates assumed to have been extracted by Layer 1.
    db_coordinates = [(28, 35, 15)]

    print(f"--- Running pipeline for question: '{question}' ---")
    final_response = {}
    async for event in runner.generate_response_streaming(question, db_coordinates):
        if event['type'] == 'response_token':
            print(event['token'], end='', flush=True)
        elif event['type'] == 'web_results':
            print("\n\n--- Web Results Received ---")
            print(event['results'])
        elif event['type'] == 'final_structured_response':
            final_response = event

    print("\n\n--- Final Structured Response (for Judge Layer) ---")
    import json
    print(json.dumps(final_response, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    asyncio.run(main())