Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .env.template
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
OPENAI_API_KEY=
LANGCHAIN_TRACING_V2=false
LANGCHAIN_ENDPOINT=https://api.smith.langchain.com
LANGCHAIN_API_KEY=
LANGCHAIN_PROJECT=training-llm-app
WANDB_API_KEY=
WEAVE_PROJECT_NAME=
COHERE_API_KEY=
TAVILY_API_KEY=
9 changes: 8 additions & 1 deletion app/advanced_rag/chains/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,16 @@ def __init__(self, token: str):
self.token = token


class WeaveCallId:
def __init__(self, weave_call_id: str | None):
self.weave_call_id = weave_call_id


class BaseRAGChain(ABC):
@abstractmethod
def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
def stream(
self, question: str
) -> Generator[Context | AnswerToken | WeaveCallId, None, None]:
pass
Comment on lines +17 to 27
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

stream() の型拡張に合わせて reduce_fn() も WeaveCallId を受けられるようにしておくと安全です
BaseRAGChain.stream() が WeaveCallId をyieldする前提になったので、reduce_fn() が同じストリーム(or 収集済みchunks)を扱う経路があると型/実装がズレます(Line 24-26 vs Line 30)。WeaveCallId は集約では無視でOKだと思うので、明示的にスキップすると事故が減ります。

修正案(WeaveCallIdを無視して集約)
-def reduce_fn(chunks: Sequence[Context | AnswerToken]) -> Any:
+def reduce_fn(chunks: Sequence[Context | AnswerToken | WeaveCallId]) -> Any:
     context: Sequence[Document] = []
     answer: str = ""

     for chunk in chunks:
         if isinstance(chunk, Context):
             context = chunk.documents

-        if isinstance(chunk, AnswerToken):
+        elif isinstance(chunk, AnswerToken):
             answer += chunk.token
+        elif isinstance(chunk, WeaveCallId):
+            continue

     return {
         "context": context,
         "answer": answer,
     }

Also applies to: 30-44

🤖 Prompt for AI Agents
In @app/advanced_rag/chains/base.py around lines 17 - 27, The stream() method
was extended to yield WeaveCallId, so update the reduce_fn used when consuming
that stream (e.g., reduce_fn in BaseRAGChain or any implementing class) to
accept WeaveCallId values and explicitly ignore them during aggregation; change
the reducer signature to accept Union[Context, AnswerToken, WeaveCallId] (or add
a parameter typed for WeaveCallId) and add a branch that returns the accumulator
unchanged when the item is a WeaveCallId, leaving normal handling for Context
and AnswerToken intact.



Expand Down
15 changes: 3 additions & 12 deletions app/advanced_rag/chains/hybrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,7 @@
from langsmith import traceable

from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, reduce_fn

_generate_answer_prompt_template = '''
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
'''
from app.prompts import generate_answer_prompt


@traceable
Expand Down Expand Up @@ -96,11 +87,11 @@ def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
yield Context(documents=documents)

# 回答を生成して徐々に応答を返す
generate_answer_prompt = _generate_answer_prompt_template.format(
generate_answer_prompt_text = generate_answer_prompt.format(
context=documents,
question=question,
)
for chunk in self.model.stream(generate_answer_prompt):
for chunk in self.model.stream(generate_answer_prompt_text):
yield AnswerToken(token=chunk.content)


Expand Down
38 changes: 14 additions & 24 deletions app/advanced_rag/chains/hyde.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
from typing import Generator

import weave
from langchain.embeddings import init_embeddings
from langchain_chroma import Chroma
from langchain_core.language_models import BaseChatModel
from langsmith import traceable

from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, reduce_fn

_hypothetical_prompt_template = """\
次の質問に回答する一文を書いてください。

質問: {question}
"""

_generate_answer_prompt_template = '''
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
'''
from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, WeaveCallId
from app.prompts import generate_answer_prompt, hypothetical_prompt


class HyDERAGChain(BaseRAGChain):
Expand All @@ -36,22 +21,27 @@ def __init__(self, model: BaseChatModel):
)
self.retriever = vector_store.as_retriever(search_kwargs={"k": 5})

@traceable(name="hyde", reduce_fn=reduce_fn)
def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
@weave.op(name="hyde")
def stream(
self, question: str
) -> Generator[Context | AnswerToken | WeaveCallId, None, None]:
current_call = weave.require_current_call()
yield WeaveCallId(weave_call_id=current_call.id)

# 仮説的な回答を生成
hypothetical_prompt = _hypothetical_prompt_template.format(question=question)
hypothetical_answer = self.model.invoke(hypothetical_prompt)
hypothetical_prompt_text = hypothetical_prompt.format(question=question)
hypothetical_answer = self.model.invoke(hypothetical_prompt_text)

# 検索して検索結果を返す
documents = self.retriever.invoke(hypothetical_answer.content)
yield Context(documents=documents)

# 回答を生成して徐々に応答を返す
generate_answer_prompt = _generate_answer_prompt_template.format(
generate_answer_prompt_text = generate_answer_prompt.format(
context=documents,
question=question,
)
for chunk in self.model.stream(generate_answer_prompt):
for chunk in self.model.stream(generate_answer_prompt_text):
yield AnswerToken(token=chunk.content)


Expand Down
43 changes: 14 additions & 29 deletions app/advanced_rag/chains/multi_query.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,19 @@
from typing import Generator

import weave
from langchain.embeddings import init_embeddings
from langchain_chroma import Chroma
from langchain_core.language_models import BaseChatModel
from langsmith import traceable
from pydantic import BaseModel, Field

from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, reduce_fn
from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, WeaveCallId
from app.prompts import generate_answer_prompt, query_generation_prompt


class QueryGenerationOutput(BaseModel):
queries: list[str] = Field(..., description="検索クエリのリスト")


_query_generation_prompt_template = """\
質問に対してベクターデータベースから関連文書を検索するために、
3つの異なる検索クエリを生成してください。
距離ベースの類似性検索の限界を克服するために、
ユーザーの質問に対して複数の視点を提供することが目標です。

質問: {question}
"""


_generate_answer_prompt_template = '''
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
'''


class MultiQueryRAGChain(BaseRAGChain):
def __init__(self, model: BaseChatModel):
self.model = model
Expand All @@ -46,15 +26,20 @@ def __init__(self, model: BaseChatModel):
)
self.retriever = vector_store.as_retriever(search_kwargs={"k": 5})

@traceable(name="multi_query", reduce_fn=reduce_fn)
def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
@weave.op(name="multi_query")
def stream(
self, question: str
) -> Generator[Context | AnswerToken | WeaveCallId, None, None]:
current_call = weave.require_current_call()
yield WeaveCallId(weave_call_id=current_call.id)

# 検索クエリを生成する
query_generation_prompt = _query_generation_prompt_template.format(
query_generation_prompt_text = query_generation_prompt.format(
question=question
)
model_with_structure = self.model.with_structured_output(QueryGenerationOutput)
query_generation_output: QueryGenerationOutput = model_with_structure.invoke(
query_generation_prompt
query_generation_prompt_text
) # type: ignore[assignment]

# 検索して検索結果を返す
Expand All @@ -63,11 +48,11 @@ def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
yield Context(documents=documents)

# 回答を生成して徐々に応答を返す
generate_answer_prompt = _generate_answer_prompt_template.format(
generate_answer_prompt_text = generate_answer_prompt.format(
context=documents,
question=question,
)
for chunk in self.model.stream(generate_answer_prompt):
for chunk in self.model.stream(generate_answer_prompt_text):
yield AnswerToken(token=chunk.content)


Expand Down
26 changes: 11 additions & 15 deletions app/advanced_rag/chains/naive.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
from typing import Generator

import weave
from langchain.embeddings import init_embeddings
from langchain_chroma import Chroma
from langchain_core.language_models import BaseChatModel
from langsmith import traceable

from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, reduce_fn

_generate_answer_prompt_template = '''
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
'''
from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, WeaveCallId
from app.prompts import generate_answer_prompt


class NaiveRAGChain(BaseRAGChain):
Expand All @@ -30,14 +21,19 @@ def __init__(self, model: BaseChatModel):
)
self.retriever = vector_store.as_retriever(search_kwargs={"k": 5})

@traceable(name="naive", reduce_fn=reduce_fn)
def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
@weave.op(name="naive")
def stream(
self, question: str
) -> Generator[Context | AnswerToken | WeaveCallId, None, None]:
current_call = weave.require_current_call()
yield WeaveCallId(weave_call_id=current_call.id)

# 検索して検索結果を返す
documents = self.retriever.invoke(question)
yield Context(documents=documents)

# 回答を生成して徐々に応答を返す
prompt = _generate_answer_prompt_template.format(
prompt = generate_answer_prompt.format(
context=documents,
question=question,
)
Expand Down
38 changes: 9 additions & 29 deletions app/advanced_rag/chains/rag_fusion.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,22 @@
from typing import Generator

import weave
from langchain.embeddings import init_embeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.language_models import BaseChatModel
from langchain_core.load import dumps, loads
from langsmith import traceable
from pydantic import BaseModel, Field

from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, reduce_fn
from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context
from app.prompts import generate_answer_prompt, query_generation_prompt


class QueryGenerationOutput(BaseModel):
queries: list[str] = Field(..., description="検索クエリのリスト")


_query_generation_prompt_template = """\
質問に対してベクターデータベースから関連文書を検索するために、
3つの異なる検索クエリを生成してください。
距離ベースの類似性検索の限界を克服するために、
ユーザーの質問に対して複数の視点を提供することが目標です。

質問: {question}
"""


_generate_answer_prompt_template = '''
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
'''


@traceable
@weave.op
def _reciprocal_rank_fusion(
retriever_outputs: list[list[Document]],
k: int = 60,
Expand Down Expand Up @@ -75,15 +55,15 @@ def __init__(self, model: BaseChatModel):
)
self.retriever = vector_store.as_retriever(search_kwargs={"k": 5})

@traceable(name="rag_fusion", reduce_fn=reduce_fn)
@weave.op(name="rag_fusion")
def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
# 検索クエリを生成する
Comment on lines +58 to 60
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's find and examine the base.py file
find . -name "base.py" -path "*/advanced_rag/chains/*" | head -5

Repository: GenerativeAgents/training-llm-application-development

Length of output: 130


🏁 Script executed:

# Check the structure of advanced_rag/chains directory
ls -la app/advanced_rag/chains/ 2>/dev/null | head -20

Repository: GenerativeAgents/training-llm-application-development

Length of output: 744


🏁 Script executed:

# Get the BaseRAGChain class definition and stream method
rg -A 10 "class BaseRAGChain" app/advanced_rag/chains/base.py

Repository: GenerativeAgents/training-llm-application-development

Length of output: 390


🏁 Script executed:

# Check what WeaveCallId is and where it's defined
rg "WeaveCallId" app/advanced_rag/chains/

Repository: GenerativeAgents/training-llm-application-development

Length of output: 1836


🏁 Script executed:

# Check the current rag_fusion.py implementation
cat -n app/advanced_rag/chains/rag_fusion.py

Repository: GenerativeAgents/training-llm-application-development

Length of output: 4046


🏁 Script executed:

# Check if WeaveCallId is imported in rag_fusion.py
rg "import.*WeaveCallId|from.*WeaveCallId" app/advanced_rag/chains/rag_fusion.py

Repository: GenerativeAgents/training-llm-application-development

Length of output: 79


🏁 Script executed:

# Check the stream method in rag_fusion.py around line 78-80
sed -n '70,100p' app/advanced_rag/chains/rag_fusion.py | cat -n

Repository: GenerativeAgents/training-llm-application-development

Length of output: 1585


BaseRAGChain.stream の型契約に不整合:WeaveCallId が実装されていない

BaseRAGChain.stream の抽象メソッドは Generator[Context | AnswerToken | WeaveCallId, None, None] を返すと定義されていますが、RAGFusionRAGChain.streamWeaveCallId をyieldしていません。他のすべてのチェーン実装(naive、hyde、multi_query、rerank、route)は先頭で WeaveCallId をyieldしており、このチェーンだけが異なります。型チェッカでも引っかかり、利用側が一律処理できません。

WeaveCallId のインポート追加と、ストリームの最初でyieldを追加してください。

🤖 Prompt for AI Agents
In @app/advanced_rag/chains/rag_fusion.py around lines 78 - 80, The stream
method in RAGFusionRAGChain (decorated as @weave.op name="rag_fusion") violates
BaseRAGChain.stream's generator contract by not yielding a WeaveCallId first;
import WeaveCallId and modify RAGFusionRAGChain.stream to yield a WeaveCallId as
the first yielded value (matching other chains like
naive/hyde/multi_query/rerank/route) before emitting Context or AnswerToken so
the generator type becomes Generator[Context | AnswerToken | WeaveCallId, None,
None] and callers can uniformly handle the initial call id.

query_generation_prompt = _query_generation_prompt_template.format(
query_generation_prompt_text = query_generation_prompt.format(
question=question
)
model_with_structure = self.model.with_structured_output(QueryGenerationOutput)
query_generation_output: QueryGenerationOutput = model_with_structure.invoke(
query_generation_prompt
query_generation_prompt_text
) # type: ignore[assignment]

# 検索する
Expand All @@ -95,11 +75,11 @@ def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
yield Context(documents=documents)

# 回答を生成して徐々に応答を返す
generate_answer_prompt = _generate_answer_prompt_template.format(
generate_answer_prompt_text = generate_answer_prompt.format(
context=documents,
question=question,
)
for chunk in self.model.stream(generate_answer_prompt):
for chunk in self.model.stream(generate_answer_prompt_text):
yield AnswerToken(token=chunk.content)


Expand Down
28 changes: 12 additions & 16 deletions app/advanced_rag/chains/rerank.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,17 @@
from typing import Generator, Sequence

import cohere
import weave
from langchain.embeddings import init_embeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.language_models import BaseChatModel
from langsmith import traceable

from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, reduce_fn
from app.advanced_rag.chains.base import AnswerToken, BaseRAGChain, Context, WeaveCallId
from app.prompts import generate_answer_prompt

_generate_answer_prompt_template = '''
以下の文脈だけを踏まえて質問に回答してください。

文脈: """
{context}
"""

質問: {question}
'''


@traceable
@weave.op
def _rerank(
question: str, documents: Sequence[Document], top_n: int
) -> Sequence[Document]:
Expand Down Expand Up @@ -57,8 +48,13 @@ def __init__(self, model: BaseChatModel):
)
self.retriever = vector_store.as_retriever(search_kwargs={"k": 20})

@traceable(name="rerank", reduce_fn=reduce_fn)
def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
@weave.op(name="rerank")
def stream(
self, question: str
) -> Generator[Context | AnswerToken | WeaveCallId, None, None]:
current_call = weave.require_current_call()
yield WeaveCallId(weave_call_id=current_call.id)

# 検索する
retrieved_documents = self.retriever.invoke(question)
# リランクする
Expand All @@ -67,7 +63,7 @@ def stream(self, question: str) -> Generator[Context | AnswerToken, None, None]:
yield Context(documents=documents)

# 回答を生成して徐々に応答を返す
prompt = _generate_answer_prompt_template.format(
prompt = generate_answer_prompt.format(
context=documents,
question=question,
)
Expand Down
Loading