diff --git a/docker-compose.yml b/docker-compose.yml index 16f8d48d7..7c63f8a15 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -196,12 +196,13 @@ services: - OPENSEARCH_URL=https://${LANGFLOW_OPENSEARCH_HOST:-${OPENSEARCH_HOST:-opensearch}}:${LANGFLOW_OPENSEARCH_PORT:-${OPENSEARCH_PORT:-9200}} - OPENSEARCH_INDEX_NAME=${OPENSEARCH_INDEX_NAME:-documents} - DOCLING_SERVE_URL=${DOCLING_SERVE_URL:-http://host.docker.internal:5001} + - DOCLING_SERVE_VERIFY_SSL=${DOCLING_SERVE_VERIFY_SSL:-true} - DOCLING_TASK_ID=None - FILENAME=None - MIMETYPE=None - FILESIZE=0 - SELECTED_EMBEDDING_MODEL=${SELECTED_EMBEDDING_MODEL:-text-embedding-3-small} - - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OPENSEARCH_URL,DOCLING_SERVE_URL,DOCLING_TASK_ID,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,DOCUMENT_ID,SOURCE_URL,ALLOWED_USERS,ALLOWED_GROUPS,ALLOWED_PRINCIPALS,FILENAME,MIMETYPE,FILESIZE,SELECTED_EMBEDDING_MODEL,OPENAI_API_KEY,ANTHROPIC_API_KEY,WATSONX_APIKEY,WATSONX_URL,WATSONX_PROJECT_ID,OLLAMA_BASE_URL,OPENSEARCH_INDEX_NAME,OPENRAG_INGEST_URL,OPENRAG_INGEST_TOKEN,OPENRAG_INGEST_RUN_ID,OPENRAG_INGEST_BATCH_SIZE + - LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT=JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OPENSEARCH_URL,DOCLING_SERVE_URL,DOCLING_SERVE_VERIFY_SSL,DOCLING_TASK_ID,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,DOCUMENT_ID,SOURCE_URL,ALLOWED_USERS,ALLOWED_GROUPS,ALLOWED_PRINCIPALS,FILENAME,MIMETYPE,FILESIZE,SELECTED_EMBEDDING_MODEL,OPENAI_API_KEY,ANTHROPIC_API_KEY,WATSONX_APIKEY,WATSONX_URL,WATSONX_PROJECT_ID,OLLAMA_BASE_URL,OPENSEARCH_INDEX_NAME,OPENRAG_INGEST_URL,OPENRAG_INGEST_TOKEN,OPENRAG_INGEST_RUN_ID,OPENRAG_INGEST_BATCH_SIZE - LANGFLOW_LOG_LEVEL=DEBUG - LANGFLOW_WORKERS=${LANGFLOW_WORKERS:-1} - LANGFLOW_AUTO_LOGIN=${LANGFLOW_AUTO_LOGIN} diff --git a/flows/components/docling_remote.py b/flows/components/docling_remote.py index a53699f1e..630a389f2 100644 --- a/flows/components/docling_remote.py +++ b/flows/components/docling_remote.py @@ -133,6 +133,15 @@ class DoclingRemoteComponent(BaseFileComponent): ), input_types=["Message"], ), + StrInput( + name="verify_ssl", + display_name="Verify SSL", + info="Whether to verify SSL certificates for Docling Serve.", + value="DOCLING_SERVE_VERIFY_SSL", + load_from_db=True, + required=False, + advanced=True, + ), ] outputs = [ @@ -281,6 +290,19 @@ def _poll_and_fetch_result( self.log(f"Error validating the document. {e}") return None + def _get_verify_ssl(self) -> bool: + """Determine whether to verify SSL certificates for Docling Serve. + + Returns: + bool: True if SSL verification should be enforced, False otherwise. + """ + verify = getattr(self, "verify_ssl", "true") + if isinstance(verify, bool): + return verify + if isinstance(verify, str): + return verify.lower() in ("true", "1", "yes") + return True + def _process_task_id(self) -> list[Data]: """Process an existing task by polling for status and retrieving results. @@ -290,7 +312,7 @@ def _process_task_id(self) -> list[Data]: transformed_url = transform_localhost_url(self.api_url) base_url = f"{transformed_url}/v1" - with httpx.Client(headers=self._process_headers()) as client: + with httpx.Client(headers=self._process_headers(), verify=self._get_verify_ssl()) as client: result = self._poll_and_fetch_result(client, base_url, self.task_id) return [result] if result else [] @@ -335,7 +357,7 @@ def _convert_document( processed_data: list[Data | None] = [] with ( - httpx.Client(headers=self._process_headers()) as client, + httpx.Client(headers=self._process_headers(), verify=self._get_verify_ssl()) as client, ThreadPoolExecutor(max_workers=self.max_concurrency) as executor, ): futures: list[tuple[int, Future]] = [] diff --git a/flows/ingestion_flow.json b/flows/ingestion_flow.json index e48ed7be1..cd9e9d72e 100644 --- a/flows/ingestion_flow.json +++ b/flows/ingestion_flow.json @@ -575,7 +575,8 @@ "max_concurrency", "max_poll_timeout", "api_headers", - "docling_serve_opts" + "docling_serve_opts", + "verify_ssl" ], "frozen": false, "icon": "Docling", @@ -735,7 +736,7 @@ "show": true, "title_case": false, "type": "code", - "value": "from __future__ import annotations\n\nimport base64\nimport json\nimport time\nfrom concurrent.futures import Future, ThreadPoolExecutor\nfrom pathlib import Path # noqa: TC003\nfrom typing import Any\n\nimport httpx\nfrom docling_core.types.doc import DoclingDocument\nfrom pydantic import ValidationError\n\nfrom lfx.base.data import BaseFileComponent\nfrom lfx.inputs import IntInput, NestedDictInput, StrInput, TableInput\nfrom lfx.inputs.inputs import FloatInput\nfrom lfx.schema import Data, dotdict\nfrom lfx.utils.util import transform_localhost_url\n\n\nclass DoclingRemoteComponent(BaseFileComponent):\n display_name = \"Docling Serve\"\n description = (\n \"Uses Docling to process input documents connecting to your instance of Docling Serve.\"\n )\n documentation = \"https://docling-project.github.io/docling/\"\n trace_type = \"tool\"\n icon = \"Docling\"\n name = \"DoclingRemote\"\n\n MAX_500_RETRIES = 5\n\n # https://docling-project.github.io/docling/usage/supported_formats/\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"jpg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n ]\n\n inputs = [\n *BaseFileComponent.get_base_inputs(),\n StrInput(\n name=\"api_url\",\n display_name=\"Server address\",\n info=\"URL of the Docling Serve instance.\",\n required=True,\n ),\n StrInput(\n name=\"task_id\",\n display_name=\"Task ID\",\n info=(\n \"Optional task ID from a previous Docling Serve upload. \"\n \"If provided, file input is ignored and the component polls for this task's results.\"\n ),\n required=False,\n ),\n IntInput(\n name=\"max_concurrency\",\n display_name=\"Concurrency\",\n info=\"Maximum number of concurrent requests for the server.\",\n advanced=True,\n value=2,\n input_types=[\"Message\"],\n ),\n FloatInput(\n name=\"max_poll_timeout\",\n display_name=\"Maximum poll time\",\n info=\"Maximum waiting time for the document conversion to complete.\",\n advanced=True,\n value=3600,\n input_types=[\"Message\"],\n ),\n TableInput(\n name=\"api_headers\",\n display_name=\"HTTP headers\",\n advanced=True,\n required=False,\n info=(\"Optional headers required for connecting to Docling Serve.\"),\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"string\",\n \"description\": \"Key name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"load_from_db\": True,\n \"type\": \"string\",\n \"description\": \"Value of the header\",\n },\n ],\n value=[],\n real_time_refresh=True,\n input_types=[\"Data\", \"JSON\"],\n ),\n NestedDictInput(\n name=\"docling_serve_opts\",\n display_name=\"Docling options\",\n advanced=True,\n required=False,\n info=(\n \"Optional dictionary of additional options. \"\n \"See https://github.com/docling-project/docling-serve/blob/main/docs/usage.md for more information.\"\n ),\n input_types=[\"Message\"],\n ),\n ]\n\n outputs = [\n *BaseFileComponent.get_base_outputs(),\n ]\n\n def _process_headers(self) -> dict[str, str]:\n \"\"\"Process the headers input into a valid dictionary.\"\"\"\n if not self.api_headers:\n return {}\n\n component_headers_dict = {}\n # TableInput normalizes to list\n items = self.api_headers if isinstance(self.api_headers, list) else [self.api_headers]\n\n for item in items:\n if not item:\n continue\n\n # Case 1: Data object\n if hasattr(item, \"data\") and isinstance(item.data, dict):\n data = item.data\n if \"key\" in data and \"value\" in data:\n component_headers_dict[str(data[\"key\"])] = str(data[\"value\"])\n else:\n # Fallback: merge all keys from Data object\n for k, v in data.items():\n if k not in (\"text_key\", \"default_value\"):\n component_headers_dict[str(k)] = str(v)\n\n # Case 2: Dictionary (Table row)\n elif isinstance(item, dict):\n if \"key\" in item and \"value\" in item:\n component_headers_dict[str(item[\"key\"])] = str(item[\"value\"])\n else:\n # Fallback: merge all keys\n for k, v in item.items():\n component_headers_dict[str(k)] = str(v)\n\n # Case 3: Message object\n elif hasattr(item, \"text\") and isinstance(item.text, str):\n try:\n parsed = json.loads(item.text)\n if isinstance(parsed, dict):\n for k, v in parsed.items():\n component_headers_dict[str(k)] = str(v)\n except json.JSONDecodeError:\n pass\n\n return component_headers_dict\n\n def update_build_config(\n self, build_config: dotdict, field_value: Any, field_name: str | None = None\n ) -> dotdict:\n if field_name == \"api_headers\":\n if isinstance(field_value, dict):\n # If it's a dict, convert to list of {key, value} pairs for TableInput\n # This handles migration from NestedDictInput to TableInput\n new_value = [{\"key\": k, \"value\": v} for k, v in field_value.items()]\n build_config[\"api_headers\"][\"value\"] = new_value\n return build_config\n if field_value is None:\n build_config[\"api_headers\"][\"value\"] = []\n return build_config\n\n # Default behavior\n return super().update_build_config(build_config, field_value, field_name)\n\n def _poll_and_fetch_result(\n self, client: httpx.Client, base_url: str, task_id: str, file_path: str | None = None\n ) -> Data | None:\n \"\"\"Poll for task completion and fetch the result.\n\n Args:\n client: The HTTP client to use for requests.\n base_url: The base URL of the Docling Serve API.\n task_id: The task ID to poll for.\n file_path: Optional file path to include in the result data.\n\n Returns:\n Data object with the DoclingDocument, or None if processing failed.\n \"\"\"\n http_failures = 0\n retry_status_start = 500\n retry_status_end = 600\n start_wait_time = time.monotonic()\n\n response = client.get(f\"{base_url}/status/poll/{task_id}\")\n response.raise_for_status()\n task = response.json()\n\n while task[\"task_status\"] not in (\"success\", \"failure\"):\n processing_time = time.monotonic() - start_wait_time\n if processing_time >= self.max_poll_timeout:\n msg = (\n f\"Processing time {processing_time=} exceeds the maximum poll timeout {self.max_poll_timeout=}.\"\n \"Please increase the max_poll_timeout parameter or review why the processing \"\n \"takes long on the server.\"\n )\n self.log(msg)\n raise RuntimeError(msg)\n\n time.sleep(2)\n response = client.get(f\"{base_url}/status/poll/{task_id}\")\n\n if retry_status_start <= response.status_code < retry_status_end:\n http_failures += 1\n if http_failures > self.MAX_500_RETRIES:\n self.log(\n f\"The status requests got a http response {response.status_code} too many times.\"\n )\n return None\n continue\n\n task = response.json()\n\n result_resp = client.get(f\"{base_url}/result/{task_id}\")\n result_resp.raise_for_status()\n result = result_resp.json()\n\n if result.get(\"status\") == \"failure\" or result.get(\"errors\"):\n errors = result.get(\"errors\", [])\n err_msg_list = []\n for err in errors:\n if isinstance(err, dict) and \"error_message\" in err:\n err_msg_list.append(err[\"error_message\"])\n elif isinstance(err, str):\n err_msg_list.append(err)\n\n err_details = \"; \".join(err_msg_list) if err_msg_list else \"Unknown Docling processing error\"\n\n msg = f\"Docling processing failed: {err_details}\"\n raise ValueError(msg)\n\n if \"json_content\" not in result[\"document\"] or result[\"document\"][\"json_content\"] is None:\n self.log(\"No JSON DoclingDocument found in the result.\")\n return None\n\n try:\n doc = DoclingDocument.model_validate(result[\"document\"][\"json_content\"])\n data_dict: dict[str, Any] = {\"doc\": doc}\n if file_path:\n data_dict[\"file_path\"] = file_path\n return Data(data=data_dict)\n except ValidationError as e:\n self.log(f\"Error validating the document. {e}\")\n return None\n\n def _process_task_id(self) -> list[Data]:\n \"\"\"Process an existing task by polling for status and retrieving results.\n\n Returns:\n List containing the result Data object, or empty list if processing failed.\n \"\"\"\n transformed_url = transform_localhost_url(self.api_url)\n base_url = f\"{transformed_url}/v1\"\n\n with httpx.Client(headers=self._process_headers()) as client:\n result = self._poll_and_fetch_result(client, base_url, self.task_id)\n return [result] if result else []\n\n def load_files_base(self) -> list[Data]:\n \"\"\"Load and process files, or poll an existing task if task_id is provided.\n\n Returns:\n list[Data]: Parsed data from the processed files or task.\n \"\"\"\n if self.task_id:\n return self._process_task_id()\n return super().load_files_base()\n\n def process_files(\n self, file_list: list[BaseFileComponent.BaseFile]\n ) -> list[BaseFileComponent.BaseFile]:\n transformed_url = transform_localhost_url(self.api_url)\n base_url = f\"{transformed_url}/v1\"\n\n def _convert_document(\n client: httpx.Client, file_path: Path, options: dict[str, Any]\n ) -> Data | None:\n encoded_doc = base64.b64encode(file_path.read_bytes()).decode()\n payload = {\n \"options\": options,\n \"sources\": [\n {\"kind\": \"file\", \"base64_string\": encoded_doc, \"filename\": file_path.name}\n ],\n }\n\n response = client.post(f\"{base_url}/convert/source/async\", json=payload)\n response.raise_for_status()\n task = response.json()\n\n return self._poll_and_fetch_result(client, base_url, task[\"task_id\"], str(file_path))\n\n docling_options = {\n \"to_formats\": [\"json\"],\n \"image_export_mode\": \"placeholder\",\n **(self.docling_serve_opts or {}),\n }\n\n processed_data: list[Data | None] = []\n with (\n httpx.Client(headers=self._process_headers()) as client,\n ThreadPoolExecutor(max_workers=self.max_concurrency) as executor,\n ):\n futures: list[tuple[int, Future]] = []\n for i, file in enumerate(file_list):\n if file.path is None:\n processed_data.append(None)\n continue\n\n futures.append(\n (i, executor.submit(_convert_document, client, file.path, docling_options))\n )\n\n for _index, future in futures:\n try:\n result_data = future.result()\n processed_data.append(result_data)\n except (httpx.HTTPStatusError, httpx.RequestError, KeyError, ValueError) as exc:\n self.log(f\"Docling remote processing failed: {exc}\")\n raise\n\n return self.rollup_data(file_list, processed_data)\n" + "value": "from __future__ import annotations\n\nimport base64\nimport json\nimport time\nfrom concurrent.futures import Future, ThreadPoolExecutor\nfrom pathlib import Path # noqa: TC003\nfrom typing import Any\n\nimport httpx\nfrom docling_core.types.doc import DoclingDocument\nfrom pydantic import ValidationError\n\nfrom lfx.base.data import BaseFileComponent\nfrom lfx.inputs import IntInput, NestedDictInput, StrInput, TableInput\nfrom lfx.inputs.inputs import FloatInput\nfrom lfx.schema import Data, dotdict\nfrom lfx.utils.util import transform_localhost_url\n\n\nclass DoclingRemoteComponent(BaseFileComponent):\n display_name = \"Docling Serve\"\n description = (\n \"Uses Docling to process input documents connecting to your instance of Docling Serve.\"\n )\n documentation = \"https://docling-project.github.io/docling/\"\n trace_type = \"tool\"\n icon = \"Docling\"\n name = \"DoclingRemote\"\n\n MAX_500_RETRIES = 5\n\n # https://docling-project.github.io/docling/usage/supported_formats/\n VALID_EXTENSIONS = [\n \"adoc\",\n \"asciidoc\",\n \"asc\",\n \"bmp\",\n \"csv\",\n \"dotx\",\n \"dotm\",\n \"docm\",\n \"docx\",\n \"htm\",\n \"html\",\n \"jpeg\",\n \"jpg\",\n \"json\",\n \"md\",\n \"pdf\",\n \"png\",\n \"potx\",\n \"ppsx\",\n \"pptm\",\n \"potm\",\n \"ppsm\",\n \"pptx\",\n \"tiff\",\n \"txt\",\n \"xls\",\n \"xlsx\",\n \"xhtml\",\n \"xml\",\n \"webp\",\n ]\n\n inputs = [\n *BaseFileComponent.get_base_inputs(),\n StrInput(\n name=\"api_url\",\n display_name=\"Server address\",\n info=\"URL of the Docling Serve instance.\",\n required=True,\n ),\n StrInput(\n name=\"task_id\",\n display_name=\"Task ID\",\n info=(\n \"Optional task ID from a previous Docling Serve upload. \"\n \"If provided, file input is ignored and the component polls for this task's results.\"\n ),\n required=False,\n ),\n IntInput(\n name=\"max_concurrency\",\n display_name=\"Concurrency\",\n info=\"Maximum number of concurrent requests for the server.\",\n advanced=True,\n value=2,\n input_types=[\"Message\"],\n ),\n FloatInput(\n name=\"max_poll_timeout\",\n display_name=\"Maximum poll time\",\n info=\"Maximum waiting time for the document conversion to complete.\",\n advanced=True,\n value=3600,\n input_types=[\"Message\"],\n ),\n TableInput(\n name=\"api_headers\",\n display_name=\"HTTP headers\",\n advanced=True,\n required=False,\n info=(\"Optional headers required for connecting to Docling Serve.\"),\n table_schema=[\n {\n \"name\": \"key\",\n \"display_name\": \"Key\",\n \"type\": \"string\",\n \"description\": \"Key name\",\n },\n {\n \"name\": \"value\",\n \"display_name\": \"Value\",\n \"load_from_db\": True,\n \"type\": \"string\",\n \"description\": \"Value of the header\",\n },\n ],\n value=[],\n real_time_refresh=True,\n input_types=[\"Data\", \"JSON\"],\n ),\n NestedDictInput(\n name=\"docling_serve_opts\",\n display_name=\"Docling options\",\n advanced=True,\n required=False,\n info=(\n \"Optional dictionary of additional options. \"\n \"See https://github.com/docling-project/docling-serve/blob/main/docs/usage.md for more information.\"\n ),\n input_types=[\"Message\"],\n ),\n StrInput(\n name=\"verify_ssl\",\n display_name=\"Verify SSL\",\n info=\"Whether to verify SSL certificates for Docling Serve.\",\n value=\"DOCLING_SERVE_VERIFY_SSL\",\n load_from_db=True,\n required=False,\n advanced=True,\n ),\n ]\n\n outputs = [\n *BaseFileComponent.get_base_outputs(),\n ]\n\n def _process_headers(self) -> dict[str, str]:\n \"\"\"Process the headers input into a valid dictionary.\"\"\"\n if not self.api_headers:\n return {}\n\n component_headers_dict = {}\n # TableInput normalizes to list\n items = self.api_headers if isinstance(self.api_headers, list) else [self.api_headers]\n\n for item in items:\n if not item:\n continue\n\n # Case 1: Data object\n if hasattr(item, \"data\") and isinstance(item.data, dict):\n data = item.data\n if \"key\" in data and \"value\" in data:\n component_headers_dict[str(data[\"key\"])] = str(data[\"value\"])\n else:\n # Fallback: merge all keys from Data object\n for k, v in data.items():\n if k not in (\"text_key\", \"default_value\"):\n component_headers_dict[str(k)] = str(v)\n\n # Case 2: Dictionary (Table row)\n elif isinstance(item, dict):\n if \"key\" in item and \"value\" in item:\n component_headers_dict[str(item[\"key\"])] = str(item[\"value\"])\n else:\n # Fallback: merge all keys\n for k, v in item.items():\n component_headers_dict[str(k)] = str(v)\n\n # Case 3: Message object\n elif hasattr(item, \"text\") and isinstance(item.text, str):\n try:\n parsed = json.loads(item.text)\n if isinstance(parsed, dict):\n for k, v in parsed.items():\n component_headers_dict[str(k)] = str(v)\n except json.JSONDecodeError:\n pass\n\n return component_headers_dict\n\n def update_build_config(\n self, build_config: dotdict, field_value: Any, field_name: str | None = None\n ) -> dotdict:\n if field_name == \"api_headers\":\n if isinstance(field_value, dict):\n # If it's a dict, convert to list of {key, value} pairs for TableInput\n # This handles migration from NestedDictInput to TableInput\n new_value = [{\"key\": k, \"value\": v} for k, v in field_value.items()]\n build_config[\"api_headers\"][\"value\"] = new_value\n return build_config\n if field_value is None:\n build_config[\"api_headers\"][\"value\"] = []\n return build_config\n\n # Default behavior\n return super().update_build_config(build_config, field_value, field_name)\n\n def _poll_and_fetch_result(\n self, client: httpx.Client, base_url: str, task_id: str, file_path: str | None = None\n ) -> Data | None:\n \"\"\"Poll for task completion and fetch the result.\n\n Args:\n client: The HTTP client to use for requests.\n base_url: The base URL of the Docling Serve API.\n task_id: The task ID to poll for.\n file_path: Optional file path to include in the result data.\n\n Returns:\n Data object with the DoclingDocument, or None if processing failed.\n \"\"\"\n http_failures = 0\n retry_status_start = 500\n retry_status_end = 600\n start_wait_time = time.monotonic()\n\n response = client.get(f\"{base_url}/status/poll/{task_id}\")\n response.raise_for_status()\n task = response.json()\n\n while task[\"task_status\"] not in (\"success\", \"failure\"):\n processing_time = time.monotonic() - start_wait_time\n if processing_time >= self.max_poll_timeout:\n msg = (\n f\"Processing time {processing_time=} exceeds the maximum poll timeout {self.max_poll_timeout=}.\"\n \"Please increase the max_poll_timeout parameter or review why the processing \"\n \"takes long on the server.\"\n )\n self.log(msg)\n raise RuntimeError(msg)\n\n time.sleep(2)\n response = client.get(f\"{base_url}/status/poll/{task_id}\")\n\n if retry_status_start <= response.status_code < retry_status_end:\n http_failures += 1\n if http_failures > self.MAX_500_RETRIES:\n self.log(\n f\"The status requests got a http response {response.status_code} too many times.\"\n )\n return None\n continue\n\n task = response.json()\n\n result_resp = client.get(f\"{base_url}/result/{task_id}\")\n result_resp.raise_for_status()\n result = result_resp.json()\n\n if result.get(\"status\") == \"failure\" or result.get(\"errors\"):\n errors = result.get(\"errors\", [])\n err_msg_list = []\n for err in errors:\n if isinstance(err, dict) and \"error_message\" in err:\n err_msg_list.append(err[\"error_message\"])\n elif isinstance(err, str):\n err_msg_list.append(err)\n\n err_details = \"; \".join(err_msg_list) if err_msg_list else \"Unknown Docling processing error\"\n\n msg = f\"Docling processing failed: {err_details}\"\n raise ValueError(msg)\n\n if \"json_content\" not in result[\"document\"] or result[\"document\"][\"json_content\"] is None:\n self.log(\"No JSON DoclingDocument found in the result.\")\n return None\n\n try:\n doc = DoclingDocument.model_validate(result[\"document\"][\"json_content\"])\n data_dict: dict[str, Any] = {\"doc\": doc}\n if file_path:\n data_dict[\"file_path\"] = file_path\n return Data(data=data_dict)\n except ValidationError as e:\n self.log(f\"Error validating the document. {e}\")\n return None\n\n def _get_verify_ssl(self) -> bool:\n \"\"\"Determine whether to verify SSL certificates for Docling Serve.\n\n Returns:\n bool: True if SSL verification should be enforced, False otherwise.\n \"\"\"\n verify = getattr(self, \"verify_ssl\", \"true\")\n if isinstance(verify, bool):\n return verify\n if isinstance(verify, str):\n return verify.lower() in (\"true\", \"1\", \"yes\")\n return True\n\n def _process_task_id(self) -> list[Data]:\n \"\"\"Process an existing task by polling for status and retrieving results.\n\n Returns:\n List containing the result Data object, or empty list if processing failed.\n \"\"\"\n transformed_url = transform_localhost_url(self.api_url)\n base_url = f\"{transformed_url}/v1\"\n\n with httpx.Client(headers=self._process_headers(), verify=self._get_verify_ssl()) as client:\n result = self._poll_and_fetch_result(client, base_url, self.task_id)\n return [result] if result else []\n\n def load_files_base(self) -> list[Data]:\n \"\"\"Load and process files, or poll an existing task if task_id is provided.\n\n Returns:\n list[Data]: Parsed data from the processed files or task.\n \"\"\"\n if self.task_id:\n return self._process_task_id()\n return super().load_files_base()\n\n def process_files(\n self, file_list: list[BaseFileComponent.BaseFile]\n ) -> list[BaseFileComponent.BaseFile]:\n transformed_url = transform_localhost_url(self.api_url)\n base_url = f\"{transformed_url}/v1\"\n\n def _convert_document(\n client: httpx.Client, file_path: Path, options: dict[str, Any]\n ) -> Data | None:\n encoded_doc = base64.b64encode(file_path.read_bytes()).decode()\n payload = {\n \"options\": options,\n \"sources\": [\n {\"kind\": \"file\", \"base64_string\": encoded_doc, \"filename\": file_path.name}\n ],\n }\n\n response = client.post(f\"{base_url}/convert/source/async\", json=payload)\n response.raise_for_status()\n task = response.json()\n\n return self._poll_and_fetch_result(client, base_url, task[\"task_id\"], str(file_path))\n\n docling_options = {\n \"to_formats\": [\"json\"],\n \"image_export_mode\": \"placeholder\",\n **(self.docling_serve_opts or {}),\n }\n\n processed_data: list[Data | None] = []\n with (\n httpx.Client(headers=self._process_headers(), verify=self._get_verify_ssl()) as client,\n ThreadPoolExecutor(max_workers=self.max_concurrency) as executor,\n ):\n futures: list[tuple[int, Future]] = []\n for i, file in enumerate(file_list):\n if file.path is None:\n processed_data.append(None)\n continue\n\n futures.append(\n (i, executor.submit(_convert_document, client, file.path, docling_options))\n )\n\n for _index, future in futures:\n try:\n result_data = future.result()\n processed_data.append(result_data)\n except (httpx.HTTPStatusError, httpx.RequestError, KeyError, ValueError) as exc:\n self.log(f\"Docling remote processing failed: {exc}\")\n raise\n\n return self.rollup_data(file_list, processed_data)\n" }, "delete_server_file_after_processing": { "_input_type": "BoolInput", @@ -1023,6 +1024,27 @@ "track_in_telemetry": false, "type": "str", "value": "DOCLING_TASK_ID" + }, + "verify_ssl": { + "_input_type": "StrInput", + "advanced": true, + "display_name": "Verify SSL", + "dynamic": false, + "info": "Whether to verify SSL certificates for Docling Serve.", + "list": false, + "list_add_label": "Add More", + "load_from_db": true, + "name": "verify_ssl", + "override_skip": false, + "placeholder": "", + "required": false, + "show": true, + "title_case": false, + "tool_mode": false, + "trace_as_metadata": true, + "track_in_telemetry": false, + "type": "str", + "value": "DOCLING_SERVE_VERIFY_SSL" } }, "tool_mode": false diff --git a/kubernetes/helm/openrag/templates/backend/backend-dotenv.yaml b/kubernetes/helm/openrag/templates/backend/backend-dotenv.yaml index 6cbdbfd65..0566f48ae 100644 --- a/kubernetes/helm/openrag/templates/backend/backend-dotenv.yaml +++ b/kubernetes/helm/openrag/templates/backend/backend-dotenv.yaml @@ -150,6 +150,11 @@ stringData: {{- if and .Values.global.docling.scheme .Values.global.docling.host .Values.global.docling.port }} DOCLING_SERVE_URL={{ printf "%s://%s:%v" .Values.global.docling.scheme .Values.global.docling.host .Values.global.docling.port | quote }} {{- end }} + {{- if hasKey .Values.global.docling "verifySsl" }} + DOCLING_SERVE_VERIFY_SSL={{ ternary "true" "false" .Values.global.docling.verifySsl | quote }} + {{- else }} + DOCLING_SERVE_VERIFY_SSL="true" + {{- end }} # Langflow auth {{- if .Values.langflow.auth.superuser }} diff --git a/kubernetes/helm/openrag/templates/langflow/langflow-dotenv.yaml b/kubernetes/helm/openrag/templates/langflow/langflow-dotenv.yaml index 88a6b86a6..b2b669378 100644 --- a/kubernetes/helm/openrag/templates/langflow/langflow-dotenv.yaml +++ b/kubernetes/helm/openrag/templates/langflow/langflow-dotenv.yaml @@ -107,6 +107,11 @@ stringData: {{- if and .Values.global.docling.scheme .Values.global.docling.host .Values.global.docling.port }} DOCLING_SERVE_URL={{ printf "%s://%s:%v" .Values.global.docling.scheme .Values.global.docling.host .Values.global.docling.port | quote }} {{- end }} + {{- if hasKey .Values.global.docling "verifySsl" }} + DOCLING_SERVE_VERIFY_SSL={{ ternary "true" "false" .Values.global.docling.verifySsl | quote }} + {{- else }} + DOCLING_SERVE_VERIFY_SSL="true" + {{- end }} # OpenSearch connection (for flows) OPENSEARCH_HOST={{ include "openrag.langflow.opensearch.host" . | quote }} diff --git a/kubernetes/helm/openrag/values.yaml b/kubernetes/helm/openrag/values.yaml index d21c18d47..461474ae3 100644 --- a/kubernetes/helm/openrag/values.yaml +++ b/kubernetes/helm/openrag/values.yaml @@ -33,6 +33,7 @@ global: host: docling-serve.docling.svc.cluster.local port: 5001 scheme: "http" + verifySsl: false # Shared OAuth credentials (same across all tenants) oauth: @@ -179,7 +180,7 @@ langflow: host: "https://cloud.langfuse.com" # Variables to expose to flows - variablesToGetFromEnvironment: "JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OPENSEARCH_URL,OPENSEARCH_INDEX_NAME,DOCLING_SERVE_URL,DOCLING_TASK_ID,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,DOCUMENT_ID,SOURCE_URL,ALLOWED_USERS,ALLOWED_GROUPS,ALLOWED_PRINCIPALS,FILENAME,MIMETYPE,FILESIZE,SELECTED_EMBEDDING_MODEL,OPENAI_API_KEY,ANTHROPIC_API_KEY,WATSONX_APIKEY,WATSONX_URL,WATSONX_PROJECT_ID,OLLAMA_BASE_URL,OPENRAG_INGEST_URL,OPENRAG_INGEST_TOKEN,OPENRAG_INGEST_RUN_ID,OPENRAG_INGEST_BATCH_SIZE" + variablesToGetFromEnvironment: "JWT,OPENRAG-QUERY-FILTER,OPENSEARCH_PASSWORD,OPENSEARCH_URL,OPENSEARCH_INDEX_NAME,DOCLING_SERVE_URL,DOCLING_SERVE_VERIFY_SSL,DOCLING_TASK_ID,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,DOCUMENT_ID,SOURCE_URL,ALLOWED_USERS,ALLOWED_GROUPS,ALLOWED_PRINCIPALS,FILENAME,MIMETYPE,FILESIZE,SELECTED_EMBEDDING_MODEL,OPENAI_API_KEY,ANTHROPIC_API_KEY,WATSONX_APIKEY,WATSONX_URL,WATSONX_PROJECT_ID,OLLAMA_BASE_URL,OPENRAG_INGEST_URL,OPENRAG_INGEST_TOKEN,OPENRAG_INGEST_RUN_ID,OPENRAG_INGEST_BATCH_SIZE" # Probes livenessProbe: diff --git a/kubernetes/operator/api/v1alpha1/openrag_types.go b/kubernetes/operator/api/v1alpha1/openrag_types.go index 6ec4495b2..546ef7acc 100644 --- a/kubernetes/operator/api/v1alpha1/openrag_types.go +++ b/kubernetes/operator/api/v1alpha1/openrag_types.go @@ -387,6 +387,10 @@ type DoclingSpec struct { // +optional // +kubebuilder:default="http" Scheme string `json:"scheme,omitempty"` + + // +optional + // +kubebuilder:default=false + VerifySsl *bool `json:"verifySsl,omitempty"` } // DoclingServeSpec configures the Docling serve component (API server). diff --git a/kubernetes/operator/api/v1alpha1/zz_generated.deepcopy.go b/kubernetes/operator/api/v1alpha1/zz_generated.deepcopy.go index 6c2b9cea2..e952d4152 100644 --- a/kubernetes/operator/api/v1alpha1/zz_generated.deepcopy.go +++ b/kubernetes/operator/api/v1alpha1/zz_generated.deepcopy.go @@ -453,6 +453,11 @@ func (in *DoclingServeSpec) DeepCopy() *DoclingServeSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DoclingSpec) DeepCopyInto(out *DoclingSpec) { *out = *in + if in.VerifySsl != nil { + in, out := &in.VerifySsl, &out.VerifySsl + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DoclingSpec. @@ -805,7 +810,7 @@ func (in *OpenRAGSpec) DeepCopyInto(out *OpenRAGSpec) { if in.Docling != nil { in, out := &in.Docling, &out.Docling *out = new(DoclingSpec) - **out = **in + (*in).DeepCopyInto(*out) } out.NetworkPolicy = in.NetworkPolicy } diff --git a/kubernetes/operator/config/crd/bases/openr.ag_openrags.yaml b/kubernetes/operator/config/crd/bases/openr.ag_openrags.yaml index 9e8cadc8e..f36c7728b 100644 --- a/kubernetes/operator/config/crd/bases/openr.ag_openrags.yaml +++ b/kubernetes/operator/config/crd/bases/openr.ag_openrags.yaml @@ -2184,6 +2184,9 @@ spec: scheme: default: http type: string + verifySsl: + default: false + type: boolean required: - host type: object diff --git a/kubernetes/operator/internal/controller/env.go b/kubernetes/operator/internal/controller/env.go index 195b98c50..86184d462 100644 --- a/kubernetes/operator/internal/controller/env.go +++ b/kubernetes/operator/internal/controller/env.go @@ -34,7 +34,7 @@ func NewEnvVarManager() *EnvVarManager { "LANGFLOW_DATABASE_URL": "sqlite:////app/data/langflow.db", // Variables to expose to Langflow components - "LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT": "JWT,OPENRAG_QUERY_FILTER,OPENSEARCH_PASSWORD,OPENSEARCH_URL,OPENSEARCH_INDEX_NAME,DOCLING_SERVE_URL,DOCLING_TASK_ID,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,DOCUMENT_ID,SOURCE_URL,ALLOWED_USERS,ALLOWED_GROUPS,ALLOWED_PRINCIPALS,FILENAME,MIMETYPE,FILESIZE,SELECTED_EMBEDDING_MODEL,OPENAI_API_KEY,ANTHROPIC_API_KEY,WATSONX_API_KEY,WATSONX_ENDPOINT,WATSONX_PROJECT_ID,OLLAMA_BASE_URL,OPENRAG_INGEST_URL,OPENRAG_INGEST_TOKEN,OPENRAG_INGEST_RUN_ID,OPENRAG_INGEST_BATCH_SIZE", + "LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT": "JWT,OPENRAG_QUERY_FILTER,OPENSEARCH_PASSWORD,OPENSEARCH_URL,OPENSEARCH_INDEX_NAME,DOCLING_SERVE_URL,DOCLING_SERVE_VERIFY_SSL,DOCLING_TASK_ID,OWNER,OWNER_NAME,OWNER_EMAIL,CONNECTOR_TYPE,DOCUMENT_ID,SOURCE_URL,ALLOWED_USERS,ALLOWED_GROUPS,ALLOWED_PRINCIPALS,FILENAME,MIMETYPE,FILESIZE,SELECTED_EMBEDDING_MODEL,OPENAI_API_KEY,ANTHROPIC_API_KEY,WATSONX_API_KEY,WATSONX_ENDPOINT,WATSONX_PROJECT_ID,OLLAMA_BASE_URL,OPENRAG_INGEST_URL,OPENRAG_INGEST_TOKEN,OPENRAG_INGEST_RUN_ID,OPENRAG_INGEST_BATCH_SIZE", // Authentication and user management "LANGFLOW_SKIP_AUTH_AUTO_LOGIN": "true", @@ -83,7 +83,8 @@ func NewEnvVarManager() *EnvVarManager { "OPENSEARCH_INDEX_NAME": "None", // Docling defaults (for variables in LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT) - "DOCLING_SERVE_URL": "None", + "DOCLING_SERVE_URL": "None", + "DOCLING_SERVE_VERIFY_SSL": "false", // Provider API keys (defaults to None, overridden by CR spec) "OPENAI_API_KEY": "None", @@ -142,9 +143,10 @@ func NewEnvVarManager() *EnvVarManager { "EMBEDDING_MODEL": "", "EMBEDDING_PROVIDER": "", - "WATSONX_API_KEY": "", - "WATSONX_ENDPOINT": "", - "WATSONX_PROJECT_ID": "", + "WATSONX_API_KEY": "", + "WATSONX_ENDPOINT": "", + "WATSONX_PROJECT_ID": "", + "DOCLING_SERVE_VERIFY_SSL": "false", }, DefaultOpenRagFEEnvVars: map[string]string{ // Frontend environment variables will be added here diff --git a/kubernetes/operator/internal/controller/env_test.go b/kubernetes/operator/internal/controller/env_test.go index d02d36e79..f4a0cc697 100644 --- a/kubernetes/operator/internal/controller/env_test.go +++ b/kubernetes/operator/internal/controller/env_test.go @@ -451,7 +451,7 @@ func TestEnvVarManager_EnsureRequiredEnvVars_Integration(t *testing.T) { // Parse the required variables list requiredVars := []string{"JWT", "OPENRAG_QUERY_FILTER", "OPENSEARCH_PASSWORD", "OPENSEARCH_URL", - "OPENSEARCH_INDEX_NAME", "DOCLING_SERVE_URL", "DOCLING_TASK_ID", "OWNER", "OWNER_NAME", + "OPENSEARCH_INDEX_NAME", "DOCLING_SERVE_URL", "DOCLING_SERVE_VERIFY_SSL", "DOCLING_TASK_ID", "OWNER", "OWNER_NAME", "OWNER_EMAIL", "CONNECTOR_TYPE", "DOCUMENT_ID", "SOURCE_URL", "ALLOWED_USERS", "ALLOWED_GROUPS", "FILENAME", "MIMETYPE", "FILESIZE", "SELECTED_EMBEDDING_MODEL", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "WATSONX_API_KEY", "WATSONX_ENDPOINT", @@ -470,6 +470,7 @@ func TestEnvVarManager_EnsureRequiredEnvVars_Integration(t *testing.T) { assert.Equal(t, "None", envVars["OPENSEARCH_URL"], "OPENSEARCH_URL should have default 'None'") assert.Equal(t, "None", envVars["OPENSEARCH_INDEX_NAME"], "OPENSEARCH_INDEX_NAME should have default 'None'") assert.Equal(t, "None", envVars["DOCLING_SERVE_URL"], "DOCLING_SERVE_URL should have default 'None'") + assert.Equal(t, "false", envVars["DOCLING_SERVE_VERIFY_SSL"], "DOCLING_SERVE_VERIFY_SSL should have default 'false'") } func TestEnvVarManager_EnsureRequiredEnvVars_CustomList(t *testing.T) { diff --git a/kubernetes/operator/internal/controller/openrag_controller.go b/kubernetes/operator/internal/controller/openrag_controller.go index c7d56894d..6b3ff2371 100644 --- a/kubernetes/operator/internal/controller/openrag_controller.go +++ b/kubernetes/operator/internal/controller/openrag_controller.go @@ -474,6 +474,9 @@ func (r *OpenRAGReconciler) buildBackendEnv(ctx context.Context, o *openragv1alp port = 5001 } envVars["DOCLING_SERVE_URL"] = fmt.Sprintf("%s://%s:%d", scheme, d.Host, port) + if d.VerifySsl != nil { + envVars["DOCLING_SERVE_VERIFY_SSL"] = strconv.FormatBool(*d.VerifySsl) + } } // Convert map to .env file format @@ -568,6 +571,9 @@ func (r *OpenRAGReconciler) buildLangflowEnv(ctx context.Context, o *openragv1al port = 5001 } envVars["DOCLING_SERVE_URL"] = fmt.Sprintf("%s://%s:%d", scheme, d.Host, port) + if d.VerifySsl != nil { + envVars["DOCLING_SERVE_VERIFY_SSL"] = strconv.FormatBool(*d.VerifySsl) + } } // Ensure all variables in LANGFLOW_VARIABLES_TO_GET_FROM_ENVIRONMENT exist with at least "None" value diff --git a/src/services/langflow_file_service.py b/src/services/langflow_file_service.py index da3a90a7b..0e4c2aea0 100644 --- a/src/services/langflow_file_service.py +++ b/src/services/langflow_file_service.py @@ -9,6 +9,7 @@ import httpx from config.settings import ( + DOCLING_SERVE_VERIFY_SSL, LANGFLOW_INGEST_CALLBACK_BATCH_SIZE, LANGFLOW_INGEST_FLOW_ID, LANGFLOW_URL_INGEST_FLOW_ID, @@ -434,6 +435,7 @@ async def run_ingestion_flow( "X-Langflow-Global-Var-DOCLING_TASK_ID": str(docling_task_id) if docling_task_id else "", + "X-Langflow-Global-Var-DOCLING_SERVE_VERIFY_SSL": str(DOCLING_SERVE_VERIFY_SSL).lower(), } # Serialize ACL lists as JSON strings for Langflow global vars @@ -608,6 +610,7 @@ async def run_url_ingestion_flow( "X-Langflow-Global-Var-FILENAME": str(docs_url), "X-Langflow-Global-Var-MIMETYPE": "text/html", "X-Langflow-Global-Var-FILESIZE": "0", + "X-Langflow-Global-Var-DOCLING_SERVE_VERIFY_SSL": str(DOCLING_SERVE_VERIFY_SSL).lower(), } ingest_token, ingest_run_id = self._configure_ingest_callback( document_id=resolved_document_id,