Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions server-https/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import logging

# Configure logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
from typing import Dict, Any, List, Optional, AsyncGenerator
from sentence_transformers import SentenceTransformer
from pymilvus import connections, Collection
Expand Down Expand Up @@ -148,7 +153,7 @@ def milvus_search(query: str, top_k: int = 5) -> Dict[str, Any]:
})
return {"results": hits}
except Exception as e:
print(f"[ERROR] Milvus search failed: {e}")
logger.error("Milvus search failed: %s", e)
return {"results": []}
finally:
try:
Expand All @@ -166,7 +171,7 @@ async def execute_tool(tool_call: Dict[str, Any]) -> tuple[str, List[str]]:
query = arguments.get("query", "")
top_k = arguments.get("top_k", 5)

print(f"[TOOL] Executing Milvus search for: '{query}' (top_k={top_k})")
logger.info("Executing Milvus search for: '%s' (top_k=%d)", query, top_k)
result = milvus_search(query, top_k)

# Collect citations
Expand All @@ -191,7 +196,7 @@ async def execute_tool(tool_call: Dict[str, Any]) -> tuple[str, List[str]]:
return f"Unknown tool: {function_name}", []

except Exception as e:
print(f"[ERROR] Tool execution failed: {e}")
logger.error("Tool execution failed: %s", e)
return f"Tool execution failed: {e}", []

async def stream_llm_response(payload: Dict[str, Any]) -> AsyncGenerator[str, None]:
Expand All @@ -203,7 +208,7 @@ async def stream_llm_response(payload: Dict[str, Any]) -> AsyncGenerator[str, No
async with client.stream("POST", KSERVE_URL, json=payload) as response:
if response.status_code != 200:
error_msg = f"LLM service error: HTTP {response.status_code}"
print(f"[ERROR] {error_msg}")
logger.error(error_msg)
yield f"data: {json.dumps({'type': 'error', 'content': error_msg})}\n\n"
return

Expand Down Expand Up @@ -262,14 +267,14 @@ async def stream_llm_response(payload: Dict[str, Any]) -> AsyncGenerator[str, No

# Handle finish reason - execute tools if needed
if finish_reason == "tool_calls":
print(f"[TOOL] Finish reason: tool_calls, executing {len(tool_calls_buffer)} tools")
logger.info("Finish reason: tool_calls, executing %d tools", len(tool_calls_buffer))

# Execute all accumulated tool calls
for tool_call in tool_calls_buffer.values():
if tool_call["function"]["name"] and tool_call["function"]["arguments"]:
try:
print(f"[TOOL] Executing: {tool_call['function']['name']}")
print(f"[TOOL] Arguments: {tool_call['function']['arguments']}")
logger.info("Executing tool: %s", tool_call["function"]["name"])
logger.info("Tool arguments: %s", tool_call["function"]["arguments"])

result, tool_citations = await execute_tool(tool_call)

Expand All @@ -284,14 +289,14 @@ async def stream_llm_response(payload: Dict[str, Any]) -> AsyncGenerator[str, No
yield follow_up_chunk

except Exception as e:
print(f"[ERROR] Tool execution error: {e}")
logger.error("Tool execution error: %s", e)
yield f"data: {json.dumps({'type': 'error', 'content': f'Tool execution failed: {e}'})}\n\n"

tool_calls_buffer.clear()
break # Tool execution complete, exit streaming loop

except json.JSONDecodeError as e:
print(f"[ERROR] JSON decode error: {e}, line: {line}")
logger.error("JSON decode error: %s, line: %s", e, line)
continue

# Send citations if any were collected
Expand All @@ -308,13 +313,13 @@ async def stream_llm_response(payload: Dict[str, Any]) -> AsyncGenerator[str, No
yield f"data: {json.dumps({'type': 'done'})}\n\n"

except Exception as e:
print(f"[ERROR] Streaming failed: {e}")
logger.error("Streaming failed: %s", e)
yield f"data: {json.dumps({'type': 'error', 'content': f'Streaming failed: {e}'})}\n\n"

async def handle_tool_follow_up(original_payload: Dict[str, Any], tool_call: Dict[str, Any], tool_result: str, citations_collector: List[str]) -> AsyncGenerator[str, None]:
"""Handle follow-up request after tool execution"""
try:
print("[TOOL] Handling follow-up request with tool results")
logger.info("Handling follow-up request with tool results")

# Create messages with tool call and result
messages = original_payload["messages"].copy()
Expand Down Expand Up @@ -345,7 +350,7 @@ async def handle_tool_follow_up(original_payload: Dict[str, Any], tool_call: Dic
yield chunk

except Exception as e:
print(f"[ERROR] Tool follow-up failed: {e}")
logger.error("Tool follow-up failed: %s", e)
yield f"data: {json.dumps({'type': 'error', 'content': f'Tool follow-up failed: {e}'})}\n\n"

async def get_non_streaming_response(payload: Dict[str, Any]) -> tuple[str, List[str]]:
Expand Down Expand Up @@ -397,7 +402,7 @@ async def options_health():
async def chat(request: ChatRequest):
"""Chat endpoint with RAG capabilities - supports both streaming and non-streaming"""
try:
print(f"[CHAT] Processing message: {request.message[:100]}...")
logger.info("Processing message: %s...", request.message[:100])

# Create initial payload
payload = {
Expand Down Expand Up @@ -440,15 +445,15 @@ async def chat(request: ChatRequest):
}

except Exception as e:
print(f"[ERROR] Chat handling failed: {e}")
logger.error("Chat handling failed: %s", e)
raise HTTPException(status_code=500, detail=f"Request failed: {e}")

if __name__ == "__main__":
print("🚀 Starting Kubeflow Docs HTTP API Server")
print(f" Port: {PORT}")
print(f" LLM Service: {KSERVE_URL}")
print(f" Milvus: {MILVUS_HOST}:{MILVUS_PORT}")
print(f" Collection: {MILVUS_COLLECTION}")
logger.info("Starting Kubeflow Docs HTTP API Server")
logger.info("Port: %s", PORT)
logger.info("LLM Service: %s", KSERVE_URL)
logger.info("Milvus: %s:%s", MILVUS_HOST, MILVUS_PORT)
logger.info("Collection: %s", MILVUS_COLLECTION)

uvicorn.run(
app,
Expand Down