1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -81,6 +81,7 @@ repos:
--disable=C3001,
--disable=R1702,
--disable=R0912,
--disable=R0915,
--max-line-length=120,
--max-statements=75,
]
9 changes: 9 additions & 0 deletions docs/todo.md
@@ -0,0 +1,9 @@
1. Remove the overly complex save_load_cache
2. Remove the input and output parameters from tool
3. Create our own test functions

4. Add a dict function to base_context
5. self.llm before token
6. Add the ability to reset the cache path

7. Improve the documentation
3 changes: 0 additions & 3 deletions docs/zh/guide/async_op_llm_guide.md
@@ -50,8 +50,6 @@ import json

@C.register_op()
class QAOp(BaseAsyncOp):
file_path: str = __file__ # Must be set; used to auto-locate the prompt file

async def async_execute(self):
"""执行问答逻辑"""
# 1. 读取输入
@@ -156,7 +154,6 @@ if __name__ == "__main__":
6. **Call the LLM**: use `await self.llm.achat(messages=messages, ...)`
7. **Handle the response**: use `callback_fn` to process or transform the response, returning the processed result
8. **Application context**: must be called inside a `FlowLLMApp()` context
9. **file_path**: the Op class must set `file_path = __file__`, used to auto-locate the prompt file

---

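With `file_path` dropped from this guide (and from the guides that follow), a registered Op is now just the class and its `async_execute`. A minimal sketch mirroring the guides' own `EchoOp` — the `C` registry import is as in each guide's complete listing:

```python
from flowllm.core.op import BaseAsyncOp
# `C` (the op registry) is imported as in the guides' complete examples.

@C.register_op()
class EchoOp(BaseAsyncOp):
    # No file_path attribute: the base class now locates the prompt file
    # from this class's own module (see the base_op.py change below).
    async def async_execute(self):
        text = self.context.get("text", "")
        self.context.response.answer = f"echo: {text}"
```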
2 changes: 0 additions & 2 deletions docs/zh/guide/cmd_service_guide.md
@@ -15,8 +15,6 @@ from flowllm.core.op import BaseAsyncOp

@C.register_op()
class EchoOp(BaseAsyncOp):
file_path: str = __file__

async def async_execute(self):
text = self.context.get("text", "")
self.context.response.answer = f"echo: {text}"
2 changes: 0 additions & 2 deletions docs/zh/guide/http_service_guide.md
@@ -14,8 +14,6 @@ from flowllm.core.op import BaseAsyncOp

@C.register_op()
class EchoOp(BaseAsyncOp):
file_path: str = __file__

async def async_execute(self):
text = self.context.get("text", "")
self.context.response.answer = f"echo: {text}"
2 changes: 0 additions & 2 deletions docs/zh/guide/http_stream_guide.md
@@ -16,8 +16,6 @@ from flowllm.core.schema import FlowStreamChunk, Message

@C.register_op()
class StreamChatOp(BaseAsyncOp):
file_path: str = __file__

async def async_execute(self):
messages = self.context.messages
system_prompt = self.context.system_prompt
1 change: 0 additions & 1 deletion docs/zh/guide/mcp_service_guide.md
@@ -15,7 +15,6 @@ from flowllm.core.op import BaseAsyncOp
@C.register_op()
class MockSearchOp(BaseAsyncOp):
"""Mock search operation that uses LLM to generate realistic search results."""
file_path: str = __file__

async def async_execute(self):
query = self.context.query
2 changes: 1 addition & 1 deletion flowllm/__init__.py
@@ -9,6 +9,6 @@
load_env()

from . import gallery # noqa: E402, F401 # pylint: disable=wrong-import-position,unused-import
from . import extensions # noqa: E402, F401 # pylint: disable=wrong-import-position,unused-import
# from . import extensions # noqa: E402, F401 # pylint: disable=wrong-import-position,unused-import

__version__ = "0.2.0.9"
14 changes: 12 additions & 2 deletions flowllm/core/llm/lite_llm.py
@@ -110,7 +110,12 @@ def stream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"LiteLLM.stream_chat: {log_kwargs}")

for i in range(self.max_retries):
@@ -217,7 +222,12 @@ async def astream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"LiteLLM.astream_chat: {log_kwargs}")

for i in range(self.max_retries):
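This summarization loop appears twice here and twice more in `openai_compatible_llm.py` below. A sketch of a shared helper (not part of this PR) that would behave identically, logging only the count of the bulky `messages`/`tools` fields rather than their contents:

```python
from typing import Any, Dict

def summarize_chat_kwargs(chat_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Replace bulky payload fields (messages, tools) with their lengths.

    Hypothetical helper mirroring the loop the PR inlines in four places.
    """
    log_kwargs: Dict[str, Any] = {}
    for k, v in chat_kwargs.items():
        if k in ("messages", "tools"):
            log_kwargs[k] = len(v) if v is not None else 0
        else:
            log_kwargs[k] = v
    return log_kwargs

# Usage in either class:
# logger.info(f"LiteLLM.stream_chat: {summarize_chat_kwargs(chat_kwargs)}")
```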
14 changes: 12 additions & 2 deletions flowllm/core/llm/openai_compatible_llm.py
@@ -106,7 +106,12 @@ def stream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"OpenAICompatibleLLM.stream_chat: {log_kwargs}")

for i in range(self.max_retries):
@@ -208,7 +213,12 @@ async def astream_chat(
**self.kwargs,
**kwargs,
}
log_kwargs = {k: v for k, v in chat_kwargs.items() if k != "messages"}
log_kwargs: Dict[str, object] = {}
for k, v in chat_kwargs.items():
if k in ["messages", "tools"]:
log_kwargs[k] = len(v) if v is not None else 0
else:
log_kwargs[k] = v
logger.info(f"OpenAICompatibleLLM.astream_chat: {log_kwargs}")

for i in range(self.max_retries):
70 changes: 6 additions & 64 deletions flowllm/core/op/base_op.py
@@ -6,6 +6,7 @@
"""

import copy
import inspect
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Callable, List, Union
@@ -78,8 +79,6 @@ def execute(self):
```
"""

file_path: str = __file__

def __new__(cls, *args, **kwargs):
"""Create a new instance and save initialization arguments for copying.

@@ -149,7 +148,9 @@ def __init__(
self.raise_exception: bool = raise_exception
self.enable_multithread: bool = enable_multithread
self.language: str = language or C.language
default_prompt_path: str = self.file_path.replace("op.py", "prompt.yaml")

subclass_file_path: str = inspect.getfile(self.__class__)
default_prompt_path: str = subclass_file_path.replace("op.py", "prompt.yaml")
self.prompt_path: Path = Path(prompt_path if prompt_path else default_prompt_path)
self.prompt = PromptHandler(language=self.language).load_prompt_by_file(self.prompt_path)
self._llm: BaseLLM | str = llm
@@ -188,46 +189,12 @@ def cache(self):
self._cache = CacheHandler(self.cache_path.format(op_name=self.short_name))
return self._cache

def save_load_cache(self, key: str, fn: Callable, **kwargs):
"""Save or load from cache.

If caching is enabled, checks cache first. If not found, executes the
function and saves the result. Otherwise, executes the function directly.

Args:
key: Cache key for storing/retrieving the result
fn: Function to execute if cache miss
**kwargs: Additional arguments for cache load operation

Returns:
Cached result if available, otherwise result from function execution
"""
if self.enable_cache:
result = self.cache.load(key, **kwargs)
if result is None:
result = fn()
self.cache.save(key, result, expire_hours=self.cache_expire_hours)
else:
logger.info(f"load {key} from cache")
else:
result = fn()

return result

def before_execute(self):
"""Hook method called before execute(). Override in subclasses.

This method is called automatically by `call()` before executing
the main `execute()` method. Use this to perform any setup,
validation, or preprocessing needed before execution.

Example:
```python
def before_execute(self):
# Validate inputs
if not self.context.get("input"):
raise ValueError("Input is required")
```
"""

def after_execute(self):
@@ -236,22 +203,11 @@ def after_execute(self):
This method is called automatically by `call()` after successfully
executing the main `execute()` method. Use this to perform any
cleanup, post-processing, or result transformation.

Example:
```python
def after_execute(self):
# Post-process results
if self.context.response:
self.context.response.answer = self.context.response.answer.upper()
```
"""

@abstractmethod
def execute(self):
"""Main execution method. Must be implemented in subclasses.

Returns:
Execution result
"""

def default_execute(self, e: Exception = None, **kwargs):
@@ -260,24 +216,10 @@ def default_execute(self, e: Exception = None, **kwargs):
This method is called when `execute()` fails and `raise_exception`
is False. It provides a fallback mechanism to return a default result
instead of raising an exception.

Args:
e: The exception that was raised during execution (if any)
**kwargs: Additional keyword arguments

Returns:
Default execution result

Example:
```python
def default_execute(self, e: Exception = None, **kwargs):
logger.warning(f"Execution failed: {e}, returning default result")
return {"status": "error", "message": str(e)}
```
"""

@staticmethod
def build_context(context: FlowContext = None, **kwargs):
def build_context(context: FlowContext = None, **kwargs) -> FlowContext:
"""Build or update a flow context.

Args:
@@ -592,7 +534,7 @@ def vector_store(self) -> BaseVectorStore:
return self._vector_store

@property
def service_config_metadata(self) -> dict:
def service_metadata(self) -> dict:
"""Get the service config metadata for this operation.

Returns:
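The `inspect.getfile` switch is what lets the guides drop `file_path = __file__`: a class attribute assigned `__file__` is evaluated once, in the module that defines `BaseOp`, and inherited verbatim by every subclass, whereas `inspect.getfile(self.__class__)` resolves the module that actually defines the subclass. A self-contained sketch of the difference:

```python
import inspect

class Base:
    file_path: str = __file__  # frozen to the module that defines Base

class Sub(Base):  # in the real code, Sub lives in its own my_op.py
    pass

# Sub inherits Base's value, so prompt lookup pointed at the framework's
# directory unless every subclass remembered to set file_path = __file__.
assert Sub.file_path == Base.file_path

# inspect.getfile resolves the module where Sub itself is defined, so
# BaseOp can derive "<subclass module dir>/...prompt.yaml" automatically.
print(inspect.getfile(Sub))
```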
32 changes: 32 additions & 0 deletions flowllm/core/schema/message.py
@@ -95,6 +95,38 @@ def simple_dump(self, add_reasoning: bool = True) -> dict:

return result

def format_message(
self,
i: int | None = None,
add_time_created: bool = False,
use_name_first: bool = False,
add_reasoning_content: bool = True,
add_tool_calls: bool = True
) -> str:
content = ""
if i is not None:
content += f"round{i} "

if add_time_created:
content += f"[{self.time_created}] "

if use_name_first:
content += f"{self.name or self.role.value}:\n"
else:
content += f"{self.role.value}:\n"

if add_reasoning_content and self.reasoning_content:
content += self.reasoning_content + "\n"

if self.content:
content += self.content + "\n"

if add_tool_calls and self.tool_calls:
for tool_call in self.tool_calls:
content += f" - tool_call={tool_call.name} params={tool_call.arguments}\n"

return content.strip()


class Trajectory(BaseModel):
"""Represents a conversation trajectory with messages and optional scoring."""
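An illustrative call to the new `format_message`. The constructor keywords are assumptions based on the fields the method reads, and `Role` is the enum implied by `self.role.value`:

```python
# Constructor kwargs assumed from the fields format_message references.
msg = Message(
    role=Role.ASSISTANT,
    content="Paris.",
    reasoning_content="The capital of France is asked for.",
)

print(msg.format_message(i=1))
# round1 assistant:
# The capital of France is asked for.
# Paris.
```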
19 changes: 18 additions & 1 deletion flowllm/core/storage/cache_handler.py
@@ -23,13 +23,30 @@ class CacheHandler:
- Recording and managing update timestamps
"""

def __init__(self, cache_dir: str = "cache"):
def __init__(self, cache_dir: Union[str, Path] = "cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.metadata_file = self.cache_dir / "metadata.json"
self.metadata = {}
self._load_metadata()

def set_cache_dir(self, cache_dir: Union[str, Path]):
"""
Set a new cache directory and reload metadata

Args:
cache_dir: New cache directory path
"""
new_cache_dir = Path(cache_dir)
new_cache_dir.mkdir(parents=True, exist_ok=True)

self.cache_dir = new_cache_dir
self.metadata_file = self.cache_dir / "metadata.json"
self.metadata = {}
self._load_metadata()

logger.info(f"Cache directory changed to: {self.cache_dir}")

def _load_metadata(self):
"""Load metadata"""
if self.metadata_file.exists():
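A usage sketch for `set_cache_dir`, repointing a live handler at a per-run directory. The `save`/`load` calls follow the CacheHandler API referenced by the `save_load_cache` method removed from `base_op.py`; the miss behavior (returning `None`) is taken from that caller:

```python
handler = CacheHandler(cache_dir="cache")
handler.save("answer", {"text": "42"})  # save(key, value), per the old save_load_cache

# Repoint the handler: creates the directory and reloads its metadata.
handler.set_cache_dir("cache/run_001")

# The new directory starts with its own (empty) metadata, so a miss is
# expected here — load returns None on miss, per the old save_load_cache.
assert handler.load("answer") is None
```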
21 changes: 16 additions & 5 deletions flowllm/core/vector_store/pgvector_vector_store.py
@@ -168,12 +168,23 @@ def _build_sql_filters(
# Handle special keys that are stored as direct columns
if key == "unique_id":
# unique_id is a direct column, not in metadata JSONB
if use_async:
conditions.append(f"unique_id = ${param_idx}")
# Support both single value and list of values
if isinstance(filter_value, list):
if use_async:
placeholders = ", ".join(f"${param_idx + i}" for i in range(len(filter_value)))
conditions.append(f"unique_id IN ({placeholders})")
else:
placeholders = ", ".join(["%s"] * len(filter_value))
conditions.append(f"unique_id IN ({placeholders})")
params.extend([str(v) for v in filter_value])
param_idx += len(filter_value)
else:
conditions.append("unique_id = %s")
params.append(str(filter_value))
param_idx += 1
if use_async:
conditions.append(f"unique_id = ${param_idx}")
else:
conditions.append("unique_id = %s")
params.append(str(filter_value))
param_idx += 1
continue

# Strip "metadata." prefix if present (since we're already accessing metadata column)
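The subtlety in this branch is the two placeholder dialects: the async path numbers parameters asyncpg-style (`$1`, `$2`, …) continuing from the running `param_idx`, while the sync path uses `%s`. A standalone sketch of just that branch, mirroring the diff's logic:

```python
from typing import Any, List, Tuple

def unique_id_condition(
    filter_value: Any, param_idx: int, use_async: bool
) -> Tuple[str, List[str], int]:
    """Standalone mirror of the new unique_id branch (illustrative only)."""
    params: List[str] = []
    if isinstance(filter_value, list):
        if use_async:  # asyncpg: numbered placeholders continue from param_idx
            placeholders = ", ".join(f"${param_idx + i}" for i in range(len(filter_value)))
        else:          # sync driver: positional %s placeholders
            placeholders = ", ".join(["%s"] * len(filter_value))
        condition = f"unique_id IN ({placeholders})"
        params.extend(str(v) for v in filter_value)
        param_idx += len(filter_value)
    else:
        condition = f"unique_id = ${param_idx}" if use_async else "unique_id = %s"
        params.append(str(filter_value))
        param_idx += 1
    return condition, params, param_idx

# unique_id_condition(["a", "b"], 1, use_async=True)
# -> ("unique_id IN ($1, $2)", ["a", "b"], 3)
```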
21 changes: 19 additions & 2 deletions flowllm/core/vector_store/qdrant_vector_store.py
@@ -190,9 +190,26 @@ def _build_qdrant_filters(filter_dict: Optional[Dict[str, Any]] = None):
for key, filter_value in filter_dict.items():
# Handle special keys that are stored at payload root level
if key == "unique_id":
qdrant_key = "original_id"
# unique_id is stored as original_id in Qdrant payload
# Support both single value and list of values
if isinstance(filter_value, list):
conditions.append(
FieldCondition(
key="original_id",
match=MatchAny(any=filter_value),
),
)
else:
conditions.append(
FieldCondition(
key="original_id",
match=MatchValue(value=filter_value),
),
)
continue

# Handle nested keys by prefixing with metadata.
elif not key.startswith("metadata."):
if not key.startswith("metadata."):
qdrant_key = f"metadata.{key}"
else:
qdrant_key = key
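For Qdrant the same single-vs-list filter shape maps onto `MatchValue` vs `MatchAny`. A sketch of the two conditions the function now emits (combining them under `must` is an assumption about the surrounding code):

```python
from qdrant_client.models import FieldCondition, Filter, MatchAny, MatchValue

# filter_dict={"unique_id": "node-1"} ->
single = FieldCondition(key="original_id", match=MatchValue(value="node-1"))

# filter_dict={"unique_id": ["node-1", "node-2"]} ->
many = FieldCondition(key="original_id", match=MatchAny(any=["node-1", "node-2"]))

qdrant_filter = Filter(must=[many])  # assumed combination in the caller
```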