-
Notifications
You must be signed in to change notification settings - Fork 417
feat: shards and replicas #1735
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| import copy | ||
| import json | ||
| import os | ||
| import uuid | ||
| from concurrent.futures import ThreadPoolExecutor, as_completed | ||
| from typing import Any | ||
|
|
@@ -29,6 +30,18 @@ | |
| REQUEST_TIMEOUT = 60 | ||
| MAX_RETRIES = 5 | ||
|
|
||
|
|
||
| def _get_min_env_int(key: str, default: int, minimum: int) -> int: | ||
| try: | ||
| value = int(os.getenv(key, default)) | ||
| except (TypeError, ValueError): | ||
| value = default | ||
| return max(value, minimum) | ||
|
|
||
|
|
||
| OPENSEARCH_NUMBER_OF_SHARDS = _get_min_env_int("OPENRAG_OPENSEARCH_NUMBER_OF_SHARDS", 1, 1) | ||
| OPENSEARCH_NUMBER_OF_REPLICAS = _get_min_env_int("OPENRAG_OPENSEARCH_NUMBER_OF_REPLICAS", 0, 0) | ||
|
Comment on lines +34 to +43
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.

Import shard/replica constants from `config/settings.py` instead of reading environment variables directly.

This code violates the coding guideline which states: "Config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase."

♻️ Proposed fix to import from `config/settings.py`

Remove the duplicated helper and constant definitions:

```diff
-import os
-
-
-def _get_min_env_int(key: str, default: int, minimum: int) -> int:
-    try:
-        value = int(os.getenv(key, default))
-    except (TypeError, ValueError):
-        value = default
-    return max(value, minimum)
-
-
-OPENSEARCH_NUMBER_OF_SHARDS = _get_min_env_int("OPENRAG_OPENSEARCH_NUMBER_OF_SHARDS", 1, 1)
-OPENSEARCH_NUMBER_OF_REPLICAS = _get_min_env_int("OPENRAG_OPENSEARCH_NUMBER_OF_REPLICAS", 0, 0)
```

Add to the imports section near line 15 (where other config imports exist):

```diff
 from config.embedding_constants import OPENAI_DEFAULT_EMBEDDING_MODEL
+from config.settings import OPENSEARCH_NUMBER_OF_SHARDS, OPENSEARCH_NUMBER_OF_REPLICAS
```

As per coding guidelines, config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase.

🤖 Prompt for AI Agents |
||
|
|
||
| # watsonx.ai surfaces rate-limit state via these (mostly non-standard) response | ||
| # headers. The IBM SDK acts on the x-requests-limit-* family directly; we log | ||
| # them on a failed embedding call to aid plan/region tuning. | ||
|
|
@@ -53,7 +66,9 @@ def _log_watsonx_rate_limit_headers(error: Exception) -> None: | |
| if not headers: | ||
| return | ||
| status = getattr(response, "status_code", "unknown") | ||
| observed = {h: headers.get(h) for h in _WATSONX_RATE_LIMIT_HEADERS if headers.get(h) is not None} | ||
| observed = { | ||
| h: headers.get(h) for h in _WATSONX_RATE_LIMIT_HEADERS if headers.get(h) is not None | ||
| } | ||
| if str(status) == "429" or observed: | ||
| logger.warning(f"watsonx rate-limit response (status={status}): {observed}") | ||
| except Exception as log_error: # never let diagnostics mask the real error | ||
|
|
@@ -371,7 +386,7 @@ class OpenSearchVectorStoreComponentMultimodalMultiEmbedding(LCVectorStoreCompon | |
| "Valid JSON Web Token for authentication. " | ||
| "Will be sent in the Authorization header (with optional 'Bearer ' prefix)." | ||
| ), | ||
| required=False | ||
| required=False, | ||
| ), | ||
| StrInput( | ||
| name="jwt_header", | ||
|
|
@@ -536,10 +551,8 @@ def raw_search(self, query: str | dict | None = None) -> Data: | |
|
|
||
| # Apply score_threshold / scoreThreshold as min_score if not already set | ||
| if "min_score" not in query_body: | ||
|
|
||
| score_threshold = self._resolve_score_threshold(filter_obj) | ||
| if score_threshold is not None: | ||
|
|
||
| query_body["min_score"] = score_threshold | ||
|
|
||
| client = self.build_client() | ||
|
|
@@ -664,7 +677,11 @@ def _default_text_mapping( | |
| Dictionary containing OpenSearch index mapping configuration | ||
| """ | ||
| return { | ||
| "settings": {"index": {"knn": True, "knn.algo_param.ef_search": ef_search}}, | ||
| "settings": { | ||
| "index": {"knn": True, "knn.algo_param.ef_search": ef_search}, | ||
| "number_of_shards": OPENSEARCH_NUMBER_OF_SHARDS, | ||
| "number_of_replicas": OPENSEARCH_NUMBER_OF_REPLICAS, | ||
| }, | ||
| "mappings": { | ||
| "properties": { | ||
| vector_field: { | ||
|
|
@@ -1446,7 +1463,6 @@ def _add_documents_to_vector_store(self, client: OpenSearch) -> None: | |
| logger.debug(f"Is IBM/watsonx embedding: {is_ibm}") | ||
|
|
||
| if is_ibm: | ||
|
|
||
| # Hand the full batch to the SDK and let it batch/throttle/retry. | ||
| # Retry attempts and base backoff are tunable via the SDK's own | ||
| # WATSONX_MAX_RETRIES / WATSONX_DELAY_TIME environment variables. | ||
|
|
@@ -1722,7 +1738,6 @@ def _coerce_filter_clauses(self, filter_obj: dict | None) -> list[dict]: | |
| context_clauses.append({"terms": {field: values}}) | ||
| return context_clauses | ||
|
|
||
|
|
||
| def _parse_filter_expression(self) -> dict | None: | ||
| """Parse and validate optional filter_expression JSON. | ||
|
|
||
|
|
@@ -1777,8 +1792,9 @@ def _resolve_score_threshold(self, filter_obj: dict | None) -> float | None: | |
| return None | ||
| return float(score_threshold) | ||
|
|
||
| def _detect_available_models(self, client: OpenSearch, filter_clauses: list[dict] | None = None) -> list[str]: | ||
|
|
||
| def _detect_available_models( | ||
| self, client: OpenSearch, filter_clauses: list[dict] | None = None | ||
| ) -> list[str]: | ||
| """Detect which embedding models have documents in the index. | ||
|
|
||
| Uses aggregation to find all unique embedding_model values, optionally | ||
|
|
@@ -2401,7 +2417,6 @@ def search(self, query: str | None = None) -> list[dict[str, Any]]: | |
| ] | ||
|
|
||
| def search_documents(self) -> Table: | ||
|
|
||
| """Search documents and return results as a Table. | ||
|
|
||
| This is the main interface method that performs the multi-model search using the | ||
|
|
||
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused import.
The `os` module is imported but should not be used in this file per coding guidelines. Configuration values should be imported from `config/settings.py` instead.

As per coding guidelines, config values must come from config/settings.py (the only place os.environ is read); never access os.environ elsewhere in the codebase.
🤖 Prompt for AI Agents