Fast search parallel#4
Target: Reduce query time from 36-45s to 12-18s (60-65% faster) and ingestion from 100s to 3-5s (85-90% faster)

## Phase 1: Quick Wins (P0)
- Configuration tuning: Reduced token limits (1500/800/2500), lowered confidence threshold (0.80), max cycles (2), increased initial_retrieval_k (5)
- Query embedding caching: MD5-based cache prevents repeated API calls (saves 300-900ms/cycle)
- Web crawl concurrency: Increased from 2 to 5 concurrent fetches (2.5x faster)
- Parallel retrieval: DB and web search run simultaneously via asyncio.gather()
- Circuit breaker: Skip cycles when first cycle hits 0.95 confidence (saves 12-20s for ~20% of queries)

## Phase 2: Medium-Term (P1)
- Batch document insertion: Parallel inserts with batching (50 docs/batch, 10-50x faster ingestion)
- Vector search caching: 5-min TTL cache for search results (saves 1-2s per follow-up)
- Async web search: Migrated from requests to aiohttp for non-blocking I/O (saves 500ms-1s)
- Parallel generation + evaluation: Start evaluation during streaming (saves 2-3s/cycle)

## Phase 3: Advanced (P2)
- Speculative follow-up generation: Generate follow-ups in parallel with evaluation (saves 2-4s when needed)
- Heuristic early stopping: Pattern-based confidence check during streaming to stop generation early

## Phase 4: Infrastructure
- Enhanced memory cache: Increased to 500 entries, added 24hr TTL, track hit/miss rates
- HTTP session reuse: Persistent aiohttp sessions (saves 50-100ms/call)

Modified Files:
- src/config/settings.py - Performance parameters
- src/embeddings/github_embeddings.py - Query embedding cache
- src/websearch/google_search.py - Async search + concurrency
- src/vectorstore/surrealdb_store.py - Batch ops + caching
- src/rag/reflexion_engine.py - Parallel execution + circuit breaker
- src/reflexion/evaluator.py - Heuristic confidence prediction
- src/memory/cache.py - Enhanced caching with TTL

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Return hit_rate as float instead of pre-formatted string to avoid ValueError when rag.py tries to format it again with :.2% Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
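The failure mode this commit describes can be reproduced in isolation: applying the `:.2%` format spec to a value that is already a string raises `ValueError`. The sketch below is illustrative only; `stats_as_string`, `stats_as_float`, and `render` are hypothetical stand-ins, not the actual functions in `cache.py` or `rag.py`.

```python
def stats_as_string(hits: int, misses: int) -> dict:
    """Hypothetical pre-fix shape: hit_rate is already a formatted string."""
    total = hits + misses
    rate = hits / total if total else 0.0
    return {"hit_rate": f"{rate:.2%}"}


def stats_as_float(hits: int, misses: int) -> dict:
    """Hypothetical post-fix shape: hit_rate is a plain float."""
    total = hits + misses
    return {"hit_rate": hits / total if total else 0.0}


def render(stats: dict) -> str:
    # Stand-in for the caller that formats the value with :.2%.
    # The % format code only works on numbers, so a string raises ValueError.
    return f"hit rate: {stats['hit_rate']:.2%}"
```

Returning the raw float keeps formatting in one place (the caller), which is the design choice the commit message points to.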
📝 Walkthrough
This PR introduces caching mechanisms at multiple layers (embeddings, vector search, memory), adds TTL and statistics to the memory cache, converts web search operations to async with configurable concurrency, implements parallel retrieval and evaluation in the Reflexion engine with early stopping, and adds a quick confidence prediction heuristic to the evaluator, while adjusting various token limits and configuration parameters.
Sequence Diagram
sequenceDiagram
participant Client
participant ReflexionEngine
participant DBSearch
participant WebSearch
participant Evaluator
participant Generator
Client->>ReflexionEngine: query
par Parallel Retrieval
ReflexionEngine->>DBSearch: similarity search (async)
ReflexionEngine->>WebSearch: web search (async)
and Preliminary Generation
ReflexionEngine->>Generator: start generation (async)
end
DBSearch-->>ReflexionEngine: results
WebSearch-->>ReflexionEngine: results
Note over ReflexionEngine: Combine & validate results
Generator-->>ReflexionEngine: partial content
par Early Evaluation
ReflexionEngine->>Evaluator: quick confidence check (partial answer)
ReflexionEngine->>Generator: continue/finalize (depends on confidence)
and Speculative Follow-up
ReflexionEngine->>Generator: prepare follow-up query (async)
end
alt High Confidence (Circuit Breaker)
Evaluator-->>ReflexionEngine: confidence >= threshold
ReflexionEngine->>ReflexionEngine: cancel speculative tasks
ReflexionEngine-->>Client: finalize answer
else Low Confidence
Evaluator-->>ReflexionEngine: confidence < threshold
ReflexionEngine->>ReflexionEngine: use speculative or generate follow-ups
ReflexionEngine->>ReflexionEngine: iterate reflexion cycles
ReflexionEngine-->>Client: answer after cycles
end
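The parallel retrieval step and the first-cycle circuit breaker in the diagram can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in `reflexion_engine.py`: `search_db`, `search_web`, `retrieve`, and `should_stop` are hypothetical names, and the 0.95 threshold is taken from the commit message.

```python
import asyncio


async def search_db(query: str) -> list:
    """Hypothetical stand-in for the vector store similarity search."""
    await asyncio.sleep(0.05)
    return [f"db:{query}"]


async def search_web(query: str) -> list:
    """Hypothetical stand-in for the async web search."""
    await asyncio.sleep(0.05)
    return [f"web:{query}"]


async def retrieve(query: str) -> list:
    # Run both sources concurrently; total latency is roughly the slower
    # of the two calls rather than their sum.
    outcomes = await asyncio.gather(
        search_db(query), search_web(query), return_exceptions=True
    )
    combined = []
    for outcome in outcomes:
        if isinstance(outcome, BaseException):
            continue  # One failing source should not discard the other.
        combined.extend(outcome)
    return combined


def should_stop(cycle: int, confidence: float, threshold: float = 0.95) -> bool:
    """Circuit breaker: skip further cycles if the first one is confident."""
    return cycle == 1 and confidence >= threshold
```

A caller would run `asyncio.run(retrieve("some query"))` and consult `should_stop` after evaluating the first cycle's answer.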
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/vectorstore/surrealdb_store.py (1)
189-210: ⚠️ Potential issue | 🟡 Minor
Variable `query` is shadowed, causing the SQL query string to overwrite the method parameter.
On line 204, `query` is reassigned to the SQL query string, which shadows the `query` parameter. The cache key computation on line 190 is still correct because it runs before the reassignment, but line 210 logs raw SQL instead of the original search query, and future maintenance could introduce bugs.
🔧 Proposed fix to rename the SQL string variable
  # Generate query embedding
  query_embedding = await self.embedding_function.embed_text(query)

- query = f"""
+ search_query = f"""
  fn::similarity_search({query_embedding}, {k});
  """

  # Perform similarity search using SurrealDB query
- results = await self.client.query(query)
+ results = await self.client.query(search_query)

- logger.debug(f"Raw SurrealDB results: {results}")
+ logger.debug(f"Raw SurrealDB results for query '{query[:50]}...': {results}")
src/memory/cache.py (1)
59-61: ⚠️ Potential issue | 🟡 Minor
`has()` doesn't check TTL, may return stale results.
The `has()` method returns `True` for entries that have expired but haven't been accessed via `get()` yet. This could lead to incorrect behavior if callers use `has()` to check existence before calling `get()`.
🔧 Proposed fix to add TTL check in has()
  def has(self, query_hash: str) -> bool:
-     """Check if query hash exists in cache"""
-     return query_hash in self.cache
+     """Check if query hash exists in cache (respects TTL)"""
+     if query_hash not in self.cache:
+         return False
+     age = time.time() - self.access_times.get(query_hash, 0)
+     if age > self.cache_ttl:
+         # Expired - clean up
+         self.cache.pop(query_hash)
+         self.access_times.pop(query_hash, None)
+         return False
+     return True
src/rag/reflexion_engine.py (1)
277-328: ⚠️ Potential issue | 🟡 Minor
Remove unused `early_stopped` variable.
The `early_stopped` flag (lines 281, 312) is assigned but never read. Either use it (e.g., in logging or metadata) or remove it to clean up dead code.
🔧 Proposed fix
- eval_task = None
- eval_started = False
- early_stopped = False
+ eval_task = None
+ eval_started = False
And either remove line 312 (`early_stopped = True`) or use it:
  if quick_confidence >= 0.92:
      logger.info(
-         f"High confidence detected early ({quick_confidence:.2f}), stopping generation"
+         f"High confidence detected early ({quick_confidence:.2f}), stopping generation",
+         early_stopped=True,
      )
-     early_stopped = True
      break
🤖 Fix all issues with AI agents
In `@src/embeddings/github_embeddings.py`:
- Around line 28-29: The instance-level _embedding_cache in class
GithubEmbeddings grows unbounded; change it to a bounded LRU cache by replacing
the plain dict with an OrderedDict or an lru-style structure, add a constant
_MAX_CACHE_SIZE, and update the embed_text method to move accessed/inserted keys
to the end and evict the oldest entry when len(self._embedding_cache) >
self._MAX_CACHE_SIZE; mirror the eviction behavior used by ReflexionMemoryCache
(or use functools.lru_cache on a hashed text key) so the cache cannot grow
indefinitely.
In `@src/vectorstore/surrealdb_store.py`:
- Around line 99-128: The batch insert currently builds insert_tasks (variable
insert_tasks) by iterating with zip(documents, embeddings) and then calls
asyncio.gather(..., return_exceptions=True) but never inspects results, so
failures are silently ignored and doc_ids may contain IDs for documents that
failed to insert; update the loop to use zip(documents, embeddings, strict=True)
to catch length mismatches, then after awaiting each batch from asyncio.gather
inspect the returned list for Exception instances (or error payloads from
self.client.create), log each failure with context (document id and exception)
and remove or mark corresponding entries in doc_ids (or raise an aggregated exception)
to ensure caller knows which inserts failed; keep batch_size, insert_tasks, and
self.client.create usage but add explicit result-checking, logging via
logger.debug/error, and appropriate error propagation or cleanup.
🧹 Nitpick comments (4)
src/embeddings/github_embeddings.py (1)
107-151: Consider caching individual texts in `embed_documents` for cache reuse.
The batch embedding path doesn't leverage the cache. If the same text appears in both single queries and document batches, you could benefit from checking/populating the cache here too.
This is optional since batch operations may have different performance characteristics, but worth considering if you frequently re-embed the same documents.
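One way the batch path could reuse the cache is to embed only the texts that are not yet cached and fill in the rest from memory. The sketch below is a simplified, synchronous illustration: `embed_documents_with_cache` and the `embed_batch` callable are hypothetical, and the cache is keyed by raw text rather than the MD5 hash the PR uses.

```python
from typing import Callable, Dict, List


def embed_documents_with_cache(
    texts: List[str],
    cache: Dict[str, List[float]],
    embed_batch: Callable[[List[str]], List[List[float]]],
) -> List[List[float]]:
    """Embed texts, calling embed_batch only for texts missing from cache."""
    # Collect each uncached text once, preserving first-seen order.
    missing: List[str] = []
    seen = set()
    for text in texts:
        if text not in cache and text not in seen:
            seen.add(text)
            missing.append(text)

    if missing:
        # One batched call for everything that is not cached yet.
        vectors = embed_batch(missing)
        for text, vector in zip(missing, vectors):
            cache[text] = vector

    # Return embeddings in the same order as the input texts.
    return [cache[text] for text in texts]
```

Whether this pays off depends on how often the same text is re-embedded, which is the trade-off the comment above raises.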
src/rag/reflexion_engine.py (2)
303-313: Early stopping threshold of 0.92 during streaming is aggressive.
The heuristic `predict_quick_confidence` is invoked mid-stream at 750+ characters with a 0.92 threshold. This could prematurely halt generation for answers that might have continued to address nuances.
Consider whether 0.92 is the right threshold or if it should be configurable via settings.
398-420: Speculative follow-up may be wasteful when re-evaluation is needed.
When the final answer is significantly longer than the early partial answer (line 403), you re-evaluate and speculatively generate follow-ups (lines 415-418). However, if the re-evaluation yields high confidence, the speculative follow-up work is wasted.
The current approach is acceptable for latency optimization, but consider whether the speculative task should only start after knowing the evaluation decision.
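The trade-off described here can be made concrete with a small sketch of speculative follow-up generation: the follow-up task starts alongside evaluation and is cancelled if confidence turns out high. All names below (`evaluate`, `generate_followups`, `evaluate_with_speculation`) are hypothetical stand-ins, and the 0.80 threshold is the value from the commit message.

```python
import asyncio


async def evaluate(answer: str) -> float:
    """Hypothetical evaluator; returns a confidence score."""
    await asyncio.sleep(0.01)
    return 0.9 if "because" in answer else 0.4


async def generate_followups(answer: str) -> list:
    """Hypothetical follow-up generator (slower than evaluation here)."""
    await asyncio.sleep(0.05)
    return [f"What supports: {answer[:20]}?"]


async def evaluate_with_speculation(answer: str, threshold: float = 0.80):
    # Start follow-up generation speculatively, before the verdict is known.
    followup_task = asyncio.create_task(generate_followups(answer))
    confidence = await evaluate(answer)

    if confidence >= threshold:
        # High confidence: the speculative work is not needed.
        followup_task.cancel()
        try:
            await followup_task
        except asyncio.CancelledError:
            pass  # Expected: we cancelled the task ourselves.
        return confidence, []

    # Low confidence: the follow-ups are already partly computed.
    return confidence, await followup_task
```

The cost of the speculation is the wasted generator call on the high-confidence path; the benefit is that the low-confidence path does not wait for evaluation before starting follow-ups.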
src/websearch/google_search.py (1)
131-143: Session access could fail if `_session` is None despite `_ensure_session` call.
While `_ensure_session` should guarantee `_session` is not None, the type checker and defensive coding would benefit from an assertion or check. This is a minor concern since `_ensure_session` is called at line 118.
🛡️ Optional defensive check
  await self._ensure_session()
+ assert self._session is not None, "Session should be initialized"
  params = {
# Instance-level cache for query embeddings (saves API calls in reflexion cycles)
self._embedding_cache: Dict[str, List[float]] = {}
Unbounded cache may cause memory issues in long-running processes.
The _embedding_cache dictionary grows indefinitely without any eviction policy or size limit. For a long-running service processing many unique queries, this could lead to memory exhaustion.
Consider adding a max size with LRU eviction (similar to ReflexionMemoryCache) or use functools.lru_cache on the text hash.
🔧 Proposed fix using a bounded cache
+from collections import OrderedDict
+
 class GithubEmbeddings(EmbeddingInterface):
     """Azure AI Inference embeddings implementation"""
+
+    _MAX_CACHE_SIZE = 1000  # Limit cache to 1000 embeddings

     def __init__(
         self,
@@ -28,7 +31,7 @@ class GithubEmbeddings(EmbeddingInterface):
         super().__init__()

         # Instance-level cache for query embeddings (saves API calls in reflexion cycles)
-        self._embedding_cache: Dict[str, List[float]] = {}
+        self._embedding_cache: OrderedDict[str, List[float]] = OrderedDict()
Then in embed_text, after storing:
# Evict oldest if cache is full
if len(self._embedding_cache) > self._MAX_CACHE_SIZE:
    self._embedding_cache.popitem(last=False)
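Evicting with `popitem(last=False)` alone gives insertion-order (FIFO) eviction; for LRU behavior, a cache hit also has to move the key to the end. A minimal standalone sketch of that idea follows. `BoundedEmbeddingCache` is a hypothetical helper written for illustration, not a class from the PR; only the MD5 keying and the 1000-entry limit come from the discussion above.

```python
import hashlib
from collections import OrderedDict
from typing import List, Optional


class BoundedEmbeddingCache:
    """Hypothetical LRU cache for embeddings, keyed by an MD5 of the text."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._entries = OrderedDict()  # key -> embedding, oldest first

    @staticmethod
    def key_for(text: str) -> str:
        return hashlib.md5(text.encode()).hexdigest()

    def get(self, text: str) -> Optional[List[float]]:
        key = self.key_for(text)
        if key not in self._entries:
            return None
        # Mark as most recently used so it is evicted last.
        self._entries.move_to_end(key)
        return self._entries[key]

    def put(self, text: str, embedding: List[float]) -> None:
        key = self.key_for(text)
        self._entries[key] = embedding
        self._entries.move_to_end(key)
        # Evict least recently used entries beyond the size limit.
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)

    def __len__(self) -> int:
        return len(self._entries)
```

In `embed_text`, the lookup would call `get` before the API request and `put` after extracting the embedding.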
+ # PARALLEL BATCH INSERT - Create all insert tasks first
+ insert_tasks = []
  for doc, embedding in zip(documents, embeddings):
      doc_id = doc.doc_id or str(uuid.uuid4())
+     doc_ids.append(doc_id)

      # Sanitize metadata
      clean_metadata = self._sanitize_metadata(doc.metadata)

-     # Create document in SurrealDB
-     await self.client.create(
-         "documents",
-         {
-             "id": doc_id,
-             "content": doc.content,
-             "metadata": clean_metadata,
-             "embedding": embedding,
-         },
+     # Create insert task (don't await yet)
+     insert_tasks.append(
+         self.client.create(
+             "documents",
+             {
+                 "id": doc_id,
+                 "content": doc.content,
+                 "metadata": clean_metadata,
+                 "embedding": embedding,
+             },
+         )
      )

-     doc_ids.append(doc_id)
+ # Execute all inserts in parallel with batching (50 at a time)
+ batch_size = 50
+ for i in range(0, len(insert_tasks), batch_size):
+     batch = insert_tasks[i : i + batch_size]
+     await asyncio.gather(*batch, return_exceptions=True)
+     logger.debug(
+         f"Inserted batch {i // batch_size + 1}/{(len(insert_tasks) + batch_size - 1) // batch_size}"
+     )
Batch insert silently ignores failures; consider validating or logging exceptions.
Using return_exceptions=True prevents propagation but the returned exceptions are never inspected. Failed inserts will be silently ignored, potentially leading to data inconsistency where doc_ids contains IDs for documents that were never actually inserted.
Also, zip(documents, embeddings) should use strict=True to catch length mismatches early.
🔧 Proposed fix to handle batch insert failures
- # Execute all inserts in parallel with batching (50 at a time)
- batch_size = 50
- for i in range(0, len(insert_tasks), batch_size):
-     batch = insert_tasks[i : i + batch_size]
-     await asyncio.gather(*batch, return_exceptions=True)
-     logger.debug(
-         f"Inserted batch {i // batch_size + 1}/{(len(insert_tasks) + batch_size - 1) // batch_size}"
-     )
+ # Execute all inserts in parallel with batching (50 at a time)
+ batch_size = 50
+ failed_count = 0
+ for i in range(0, len(insert_tasks), batch_size):
+     batch = insert_tasks[i : i + batch_size]
+     results = await asyncio.gather(*batch, return_exceptions=True)
+     for r in results:
+         if isinstance(r, Exception):
+             failed_count += 1
+             logger.warning(f"Document insert failed: {r}")
+     logger.debug(
+         f"Inserted batch {i // batch_size + 1}/{(len(insert_tasks) + batch_size - 1) // batch_size}"
+     )
+ if failed_count > 0:
+     logger.warning(f"{failed_count}/{len(insert_tasks)} document inserts failed")
Also add strict=True to the zip:
- for doc, embedding in zip(documents, embeddings):
+ for doc, embedding in zip(documents, embeddings, strict=True):
📝 Committable suggestion
# PARALLEL BATCH INSERT - Create all insert tasks first
insert_tasks = []
for doc, embedding in zip(documents, embeddings, strict=True):
    doc_id = doc.doc_id or str(uuid.uuid4())
    doc_ids.append(doc_id)
    # Sanitize metadata
    clean_metadata = self._sanitize_metadata(doc.metadata)
    # Create insert task (don't await yet)
    insert_tasks.append(
        self.client.create(
            "documents",
            {
                "id": doc_id,
                "content": doc.content,
                "metadata": clean_metadata,
                "embedding": embedding,
            },
        )
    )
# Execute all inserts in parallel with batching (50 at a time)
batch_size = 50
failed_count = 0
for i in range(0, len(insert_tasks), batch_size):
    batch = insert_tasks[i : i + batch_size]
    results = await asyncio.gather(*batch, return_exceptions=True)
    for r in results:
        if isinstance(r, Exception):
            failed_count += 1
            logger.warning(f"Document insert failed: {r}")
    logger.debug(
        f"Inserted batch {i // batch_size + 1}/{(len(insert_tasks) + batch_size - 1) // batch_size}"
    )
if failed_count > 0:
    logger.warning(f"{failed_count}/{len(insert_tasks)} document inserts failed")
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 101-101: zip() without an explicit strict= parameter
Add explicit value for parameter strict=
(B905)
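The suggestion above counts failures but still returns every ID. If the caller needs to know which inserts actually succeeded, each gathered result can be paired with its document ID. The sketch below is a generic illustration, not the store's code: `insert_in_batches` and the `make_insert` callable are hypothetical, and `fake_insert` only simulates a failing `create` call.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple


async def insert_in_batches(
    doc_ids: List[str],
    make_insert: Callable[[str], Awaitable[object]],
    batch_size: int = 50,
) -> Tuple[List[str], Dict[str, Exception]]:
    """Run inserts in concurrent batches and report per-document outcomes."""
    inserted: List[str] = []
    failed: Dict[str, Exception] = {}
    for start in range(0, len(doc_ids), batch_size):
        batch_ids = doc_ids[start : start + batch_size]
        # gather preserves input order, so results line up with batch_ids.
        results = await asyncio.gather(
            *(make_insert(doc_id) for doc_id in batch_ids),
            return_exceptions=True,
        )
        for doc_id, result in zip(batch_ids, results):
            if isinstance(result, Exception):
                failed[doc_id] = result
            else:
                inserted.append(doc_id)
    return inserted, failed


async def fake_insert(doc_id: str) -> str:
    """Simulated insert that fails for one specific ID."""
    if doc_id == "bad":
        raise RuntimeError("boom")
    return doc_id
```

With this shape the caller can return only `inserted`, log `failed`, or raise if `failed` is non-empty, depending on how strict ingestion should be.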
Pull request overview
This PR introduces performance-oriented improvements across web search, vector storage, reflexion evaluation, caching, embeddings, and configuration defaults to make the reflexion engine faster and more adaptive.
Changes:
- Migrates Google web search to an async `aiohttp` client and increases crawl concurrency, enabling parallel URL processing.
- Adds vector search and embedding-level caching, plus TTL- and stats-aware reflexion memory caching, to avoid redundant work and improve observability.
- Enhances the reflexion engine with parallel DB/web retrieval, heuristic early-stopping, speculative evaluation/follow-up generation, and updates configuration defaults for faster, shorter reflexion cycles.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| src/websearch/google_search.py | Switches Google Custom Search HTTP client from requests to async aiohttp, adds reusable async session management, and makes content extraction concurrency configurable via settings.web_crawl_concurrency. |
| src/vectorstore/surrealdb_store.py | Adds a TTL-based cache for similarity search results and parallel/batched document inserts into SurrealDB to reduce latency and improve throughput. |
| src/reflexion/evaluator.py | Introduces predict_quick_confidence, a heuristic confidence estimator used to support generation early-stopping without additional LLM calls. |
| src/rag/reflexion_engine.py | Implements parallel DB + web retrieval, streaming-time evaluation kickoff, heuristic early stopping, speculative follow-up generation, and a first-cycle high-confidence circuit breaker to reduce unnecessary reflexion cycles. |
| src/memory/cache.py | Extends ReflexionMemoryCache with TTL-based eviction, hit/miss counting, hit-rate calculation, and expanded default capacity. |
| src/embeddings/github_embeddings.py | Adds an instance-level embedding cache keyed by a hash of the input text to avoid repeated embedding calls for identical queries. |
| src/config/settings.py | Tunes default LLM token limits, reflexion cycle and confidence defaults, retrieval counts, and introduces web_crawl_concurrency for web search throughput control. |
batch_size = 50
for i in range(0, len(insert_tasks), batch_size):
    batch = insert_tasks[i : i + batch_size]
    await asyncio.gather(*batch, return_exceptions=True)
asyncio.gather is called with return_exceptions=True but the returned results are never inspected, so failed inserts are silently ignored while doc_ids is still populated and the log reports all documents as added. This can lead to callers believing documents were persisted when some create operations actually failed; consider checking the gathered results for Exception instances and either raising a VectorStoreException or at least logging and excluding failed IDs from the return value.
- await asyncio.gather(*batch, return_exceptions=True)
+ await asyncio.gather(*batch)
# Check cache first (saves 1-2 seconds per follow-up cycle)
cache_key = hashlib.md5(f"{query}:{k}".encode()).hexdigest()

if cache_key in self._search_cache:
    results, timestamp = self._search_cache[cache_key]
    if time.time() - timestamp < self._cache_ttl:
        logger.debug("Using cached vector search results", query_length=len(query))
        return results[:k]  # Return requested k
    else:
        # Cache expired, remove it
        del self._search_cache[cache_key]
The _search_cache dictionary is only bounded by TTL on individual entries and entries are only expired on cache hits; any key that is used once and never queried again will remain in memory indefinitely, causing unbounded growth in long-running processes. To avoid a slow memory leak, consider adding a maximum cache size and/or periodic cleanup of stale entries (e.g., LRU with size cap or background pruning).
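A size cap combined with pruning on write addresses both concerns raised here. The sketch below is one possible shape, not the store's implementation: `TTLSearchCache` is a hypothetical class, the 300-second default mirrors the 5-minute TTL from the commit message, and `max_size=256` is an arbitrary assumption. The injectable `clock` exists only to make expiry testable.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class TTLSearchCache:
    """Hypothetical TTL cache with a size cap and pruning on every write."""

    def __init__(
        self,
        ttl_seconds: float = 300.0,
        max_size: int = 256,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.ttl = ttl_seconds
        self.max_size = max_size
        self._clock = clock
        self._entries = OrderedDict()  # key -> (value, stored_at)

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self._clock() - stored_at >= self.ttl:
            del self._entries[key]  # Expired on read.
            return None
        return value

    def put(self, key: str, value: Any) -> None:
        self._prune_expired()
        self._entries[key] = (value, self._clock())
        self._entries.move_to_end(key)
        # Enforce the size cap by dropping the oldest entries.
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)

    def _prune_expired(self) -> None:
        # Remove stale keys even if they are never read again.
        now = self._clock()
        expired = [k for k, (_, t) in self._entries.items() if now - t >= self.ttl]
        for k in expired:
            del self._entries[k]

    def __len__(self) -> int:
        return len(self._entries)
```

Pruning on write keeps the cleanup cost proportional to cache size without needing a background task; a periodic task would be an alternative if writes are rare.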
@@ -66,7 +70,14 @@ def _extract_embedding(self, embedding_data) -> List[float]:
         )

     async def embed_text(self, text: str) -> List[float]:
-        """Embed single text using Azure AI Inference"""
+        """Embed single text using Azure AI Inference with caching"""
+        # Check cache first (saves 300-900ms per repeated query)
+        cache_key = hashlib.md5(text.encode()).hexdigest()
+
+        if cache_key in self._embedding_cache:
+            logger.debug("Using cached query embedding", text_length=len(text))
+            return self._embedding_cache[cache_key]
+
         try:
             response = self.client.embed(input=[text])

@@ -77,6 +88,9 @@ async def embed_text(self, text: str) -> List[float]:
             raw_embedding = response.data[0].embedding
             embedding = self._extract_embedding(raw_embedding)

+            # Cache the embedding for future use
+            self._embedding_cache[cache_key] = embedding
+
The _embedding_cache grows without bound and has no TTL or size limit, so a long-running service that embeds many distinct texts can accumulate an unbounded number of entries and steadily increase memory usage. It would be safer to enforce an eviction policy (e.g., fixed max size LRU) or a TTL, and optionally provide a way to clear the cache when needed.
-    def __init__(self, max_size: Optional[int] = None):
-        self.max_size = max_size or settings.max_cache_size
+    def __init__(self, max_size: Optional[int] = None, cache_ttl: int = 86400):
+        self.max_size = max_size or 500  # Increased from 100 to 500
ReflexionMemoryCache.__init__ now hardcodes max_size to 500 instead of using settings.max_cache_size, while settings.max_cache_size is still defined and used as the documented configuration knob. This disconnect means changing the setting no longer affects the cache size unless callers explicitly pass max_size, which can cause confusion; consider either reverting to settings.max_cache_size as the default or updating the setting itself and removing the magic number here.
- self.max_size = max_size or 500  # Increased from 100 to 500
+ self.max_size = max_size or settings.max_cache_size
logger.info(
    f"High confidence detected early ({quick_confidence:.2f}), stopping generation"
)
early_stopped = True
The early_stopped flag is set when heuristic early stopping triggers but is never read afterward, so it has no effect on the subsequent evaluation or control flow. This looks like leftover state from a previous design; either wire it into the later decision logic (e.g., to skip continuation heuristics) or remove the variable and related assignments to avoid confusion.
- early_stopped = True
try:
    await followup_task
except asyncio.CancelledError:
    pass
'except' clause does nothing but pass and there is no explanatory comment.
-     pass
+     logger.debug(
+         "Speculative followup task was cancelled as expected",
+         cycle=cycle_number,
+     )
try:
    await followup_task
except asyncio.CancelledError:
    pass
'except' clause does nothing but pass and there is no explanatory comment.
-     pass
+     # Speculative follow-up task was cancelled intentionally; safe to ignore.
+     logger.debug(
+         "Speculative follow-up task cancelled",
+         cycle=cycle_number,
+     )
This pull request introduces a series of optimizations and feature improvements across the reflexion engine, caching, embeddings, and configuration settings. The main goals are to improve performance (via parallelism and caching), reduce latency, and make the reflexion process more adaptive and efficient. Notably, parallel execution is now used for retrieval and evaluation, a heuristic confidence check enables early stopping, and caching is enhanced for both embeddings and memory. Several configuration defaults are also tuned for faster, more responsive operation.
Performance Optimizations and Parallelism
Database and web search retrieval now run in parallel using asyncio, significantly reducing wait times for combined results. Evaluation and follow-up query generation are also executed speculatively in parallel, minimizing idle time and enabling faster reflexion cycles.
Heuristic Early Stopping and Confidence Estimation
The new `predict_quick_confidence` method uses heuristics to estimate the confidence of a partial answer, allowing for early stopping of generation if high confidence is detected, especially in the first reflexion cycle. This reduces unnecessary computation and speeds up high-confidence answers.
Caching Improvements
Configuration and Default Settings
Other Technical Improvements
These changes collectively make the reflexion engine faster, more adaptive, and more efficient in both resource usage and response time.
Summary by CodeRabbit
New Features
Performance Improvements
Configuration Updates