From fbf836004b9a705c531a5539dae13152de856964 Mon Sep 17 00:00:00 2001 From: leoou331 Date: Wed, 10 Jun 2026 13:18:21 +0000 Subject: [PATCH 1/2] feat(aws_tools): add S3 File Uploader and S3 File Download tools Add two new builtin tools to the aws_tools plugin so that Dify workflows can move file objects (not just text) between workflow nodes and S3: - s3_file_uploader: takes a file variable from an upstream node and uploads it to a configurable bucket/key, optionally returning a presigned URL. - s3_file_download: takes an s3://bucket/key URI and emits a Dify file (via create_blob_message) plus structured metadata for downstream consumption. Why --- The existing s3_operator only handles text payloads (text_content in, UTF-8 text out), so it can't be wired directly to a Start node 'file' input or to any tool that emits binary file variables. These two tools close that gap with the same UX and parameter conventions as s3_operator. Implementation notes -------------------- - Both tools are self-contained (credential-resolution helpers are inlined) so this PR does not introduce a shared utils/ module. - They reuse the existing aws_tools provider's credentials_for_provider schema (Access Key / Secret Key / Region) and additionally accept a per-invocation aws_session_token for STS / role-assumption use cases. - Three-language labels (en_US / zh_Hans / pt_BR) match the rest of the plugin's tools. Validation ---------- - Static: yaml.safe_load all touched yaml files; py_compile both .py files; verified extra.python.source paths resolve correctly. - End-to-end: packaged plugins/aws_tools/ from this branch into a .difypkg, installed it on a self-hosted Dify 1.14.2 instance, and ran a workflow [Start -> S3 Upload -> S3 Download -> End] against cn-northwest-1. status=succeeded, total_steps=4, elapsed=1.0s. Object SHA-256 verified byte-for-byte identical between local file and the S3 round-trip. Origin ------ Implementation derived from the public S3 file uploader/download tools in r3-yamauchi/dify-my-aws-tools-plugin (Apache-2.0). Author confirmed he is happy for these two tools to be contributed upstream into aws-samples/dify-aws-tool with no attribution requirement; comments have been translated to English to match the surrounding files. --- README.md | 2 + README_JA.md | 2 + README_ZH.md | 2 + plugins/aws_tools/provider/aws_tools.yaml | 2 + plugins/aws_tools/tools/s3_file_download.py | 175 +++++++++++++++ plugins/aws_tools/tools/s3_file_download.yaml | 78 +++++++ plugins/aws_tools/tools/s3_file_uploader.py | 201 ++++++++++++++++++ plugins/aws_tools/tools/s3_file_uploader.yaml | 139 ++++++++++++ 8 files changed, 601 insertions(+) create mode 100644 plugins/aws_tools/tools/s3_file_download.py create mode 100644 plugins/aws_tools/tools/s3_file_download.yaml create mode 100644 plugins/aws_tools/tools/s3_file_uploader.py create mode 100644 plugins/aws_tools/tools/s3_file_uploader.yaml diff --git a/README.md b/README.md index 8814219..d2f8329 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,8 @@ This repository provides the source code for three plugins in [Dify](https://git | Transcribe Tool | SAAS | AWS transcribe service tool (ASR) | | [river xie](chuanxie@amazon.com) | | Bedrock Retriever | PAAS | Amazon Bedrock knowledge base retrieval tool | | [ychchen](ychchen@amazon.com) | | S3 Operator | SAAS | Read and write S3 bucket content, can return presigned URLs | | [ybalbert](ybalbert@amazon.com) | +| S3 File Uploader | SAAS | Upload a workflow file (file variable) to S3 and optionally return a presigned URL | | [leoou](leoou@amazon.com) | +| S3 File Download | SAAS | Download an S3 object as a Dify file variable for downstream nodes | | [leoou](leoou@amazon.com) | | AWS Bedrock Nova Canvas | SAAS | Generate images based on Amazon Nova Canvas | | [alexwuu](alexwuu@amazon.com) | | AWS Bedrock Nova Reel | SAAS | Generate videos based on Amazon Nova Reel | | [alexwuu](alexwuu@amazon.com) | | OpenSearch Knn Retriever | PAAS | Retrieve data from OpenSearch using KNN method | [Notebook](https://github.com/aws-samples/dify-aws-tool/tree/main/notebook/search_img_by_img) | [ybalbert](ybalbert@amazon.com) | diff --git a/README_JA.md b/README_JA.md index df12d5c..123d89c 100644 --- a/README_JA.md +++ b/README_JA.md @@ -79,6 +79,8 @@ | Transcribe Tool | SAAS | AWS transcribeサービスツール (ASR) | | [river xie](chuanxie@amazon.com) | | Bedrock Retriever | PAAS | Amazon Bedrockナレッジベース検索ツール | | [ychchen](ychchen@amazon.com) | | S3 Operator | SAAS | S3バケットのコンテンツの読み書き、署名付きURLの返却が可能 | | [ybalbert](ybalbert@amazon.com) | +| S3 File Uploader | SAAS | ワークフロー内の file 変数を S3 にアップロードし、必要に応じて署名付きURLを返却 | | [leoou](leoou@amazon.com) | +| S3 File Download | SAAS | S3 オブジェクトを Dify の file 変数として取得し、下流ノードに渡す | | [leoou](leoou@amazon.com) | | AWS Bedrock Nova Canvas | SAAS | Amazon Nova Canvasに基づく画像生成 | | [alexwuu](alexwuu@amazon.com) | | AWS Bedrock Nova Reel | SAAS | Amazon Nova Reelに基づく動画生成 | | [alexwuu](alexwuu@amazon.com) | | OpenSearch Knn Retriever | PAAS | KNN手法を使用してOpenSearchからデータを検索 | [Notebook](https://github.com/aws-samples/dify-aws-tool/tree/main/notebook/search_img_by_img) | [ybalbert](ybalbert@amazon.com) | diff --git a/README_ZH.md b/README_ZH.md index bdd2f84..51e3ba5 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -78,6 +78,8 @@ | Transcribe Tool | SAAS | AWS transcribe service tool (ASR) | | [river xie](chuanxie@amazon.com) | | Bedrock Retriever | PAAS | Amazon Bedrock知识库检索工具 | | [ychchen](ychchen@amazon.com) | | S3 Operator | SAAS | 读写S3中bucket的内容,可以返回presignURL | | [ybalbert](ybalbert@amazon.com) | +| S3 File Uploader | SAAS | 将工作流中的 file 变量上传到 S3, 可选返回 presignURL | | [leoou](leoou@amazon.com) | +| S3 File Download | SAAS | 从 S3 下载对象为 Dify file 变量, 供下游节点使用 | | [leoou](leoou@amazon.com) | | AWS Bedrock Nova Canvas | SAAS | 基于Amazon Nova Canvas生成图像 | | [alexwuu](alexwuu@amazon.com) | | AWS Bedrock Nova Reel | SAAS | 基于Amazon Nova Reel生成视频 | | [alexwuu](alexwuu@amazon.com) | | OpenSearch Knn Retriever | PAAS | 用KNN方法从OpenSearch召回数据 | [Notebook](https://github.com/aws-samples/dify-aws-tool/tree/main/notebook/search_img_by_img) | [ybalbert](ybalbert@amazon.com) | diff --git a/plugins/aws_tools/provider/aws_tools.yaml b/plugins/aws_tools/provider/aws_tools.yaml index 361c4ab..1ac7f9f 100644 --- a/plugins/aws_tools/provider/aws_tools.yaml +++ b/plugins/aws_tools/provider/aws_tools.yaml @@ -17,6 +17,8 @@ tools: - tools/bedrock_retrieve_and_generate.yaml - tools/opensearch_knn_search.yaml - tools/s3_operator.yaml + - tools/s3_file_uploader.yaml + - tools/s3_file_download.yaml - tools/apply_guardrail.yaml - tools/nova_canvas.yaml - tools/transcribe_asr.yaml diff --git a/plugins/aws_tools/tools/s3_file_download.py b/plugins/aws_tools/tools/s3_file_download.py new file mode 100644 index 0000000..24da607 --- /dev/null +++ b/plugins/aws_tools/tools/s3_file_download.py @@ -0,0 +1,175 @@ +""" +Location: tools/s3_file_download.py +Purpose: Download an S3 object as a Dify file variable for downstream workflow nodes. + +This tool complements ``s3_file_uploader``. It accepts an ``s3://bucket/key`` URI, +fetches the object via boto3, and emits the binary as a Dify file plus metadata as +both JSON and a key=value text block, so downstream nodes can treat it as a file +input or read individual fields like ``content_length`` / ``etag``. +""" + +from __future__ import annotations + +from collections.abc import Generator, Iterable +from typing import Any, Optional +from urllib.parse import urlparse + +import boto3 +from botocore.exceptions import ClientError + +from dify_plugin import Tool +from dify_plugin.entities.tool import ToolInvokeMessage + + +# --------------------------------------------------------------------------- +# Inline credential helpers (kept self-contained on purpose so this tool does +# not rely on a shared utils/ module that the rest of this repo does not use). +# --------------------------------------------------------------------------- + +def _resolve_aws_credentials(tool: Any, tool_parameters: dict[str, Any]) -> dict[str, Optional[str]]: + runtime_credentials = getattr(getattr(tool, "runtime", None), "credentials", {}) or {} + + aws_access_key_id = ( + tool_parameters.get("aws_access_key_id") + or runtime_credentials.get("aws_access_key_id") + ) + aws_secret_access_key = ( + tool_parameters.get("aws_secret_access_key") + or runtime_credentials.get("aws_secret_access_key") + ) + aws_session_token = tool_parameters.get("aws_session_token") + aws_region = ( + tool_parameters.get("aws_region") + or runtime_credentials.get("aws_region") + or "us-east-1" + ) + + return { + "aws_access_key_id": aws_access_key_id, + "aws_secret_access_key": aws_secret_access_key, + "aws_session_token": aws_session_token, + "aws_region": aws_region, + } + + +def _build_boto3_client_kwargs(credentials: dict[str, Optional[str]]) -> dict[str, Any]: + kwargs: dict[str, Any] = {} + if credentials.get("aws_region"): + kwargs["region_name"] = credentials["aws_region"] + if credentials.get("aws_access_key_id") and credentials.get("aws_secret_access_key"): + kwargs["aws_access_key_id"] = credentials["aws_access_key_id"] + kwargs["aws_secret_access_key"] = credentials["aws_secret_access_key"] + if credentials.get("aws_session_token"): + kwargs["aws_session_token"] = credentials["aws_session_token"] + return kwargs + + +def _credential_signature(credentials: dict[str, Optional[str]]) -> tuple: + return ( + credentials.get("aws_access_key_id"), + credentials.get("aws_secret_access_key"), + credentials.get("aws_session_token"), + credentials.get("aws_region"), + ) + + +def _reset_clients_on_credential_change( + owner: Any, + credentials: dict[str, Optional[str]], + client_attrs: Iterable[str], + signature_attr: str = "_client_credentials_signature", +) -> None: + signature = _credential_signature(credentials) + current_signature = getattr(owner, signature_attr, None) + if current_signature != signature: + for attr in client_attrs: + if hasattr(owner, attr): + setattr(owner, attr, None) + setattr(owner, signature_attr, signature) + + +# --------------------------------------------------------------------------- +# Tool implementation +# --------------------------------------------------------------------------- + +def _build_metadata_text(metadata: dict[str, Any]) -> str: + """Render a simple ``key: value`` block for human-readable downstream display.""" + lines = [] + for key, value in metadata.items(): + if value is None: + continue + lines.append(f"{key}: {value}") + return "\n".join(lines) + + +class S3FileDownload(Tool): + s3_client: Any = None + + def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage, None, None]: + """Download an S3 object and emit it as a Dify file plus metadata.""" + try: + credentials = _resolve_aws_credentials(self, tool_parameters) + if tool_parameters.get("aws_region"): + credentials["aws_region"] = tool_parameters["aws_region"] + + _reset_clients_on_credential_change(self, credentials, ["s3_client"]) + if not self.s3_client: + client_kwargs = _build_boto3_client_kwargs(credentials) + self.s3_client = boto3.client("s3", **client_kwargs) + except Exception as exc: # pragma: no cover - boto3 init errors + yield self.create_text_message(f"Failed to initialize AWS client: {exc}") + return + + s3_uri = tool_parameters.get("s3_uri") + if not s3_uri: + yield self.create_text_message("s3_uri parameter is required") + return + + parsed_uri = urlparse(s3_uri) + if parsed_uri.scheme != "s3" or not parsed_uri.netloc or not parsed_uri.path: + yield self.create_text_message("Invalid S3 URI format. Use s3://bucket/key") + return + + bucket = parsed_uri.netloc + key = parsed_uri.path.lstrip("/") + + try: + response = self.s3_client.get_object(Bucket=bucket, Key=key) + file_bytes = response["Body"].read() + except self.s3_client.exceptions.NoSuchBucket: + yield self.create_text_message(f"Bucket '{bucket}' does not exist") + return + except self.s3_client.exceptions.NoSuchKey: + yield self.create_text_message(f"Object '{key}' does not exist in bucket '{bucket}'") + return + except ClientError as exc: + error_message = exc.response.get("Error", {}).get("Message", str(exc)) + yield self.create_text_message(f"Failed to download S3 object: {error_message}") + return + except Exception as exc: + yield self.create_text_message(f"Failed to download S3 object: {exc}") + return + + filename = key.split("/")[-1] if key else "downloaded_file" + content_type = response.get("ContentType") or "application/octet-stream" + metadata_dict = { + "bucket": bucket, + "key": key, + "content_type": content_type, + "content_length": response.get("ContentLength"), + "etag": response.get("ETag"), + "last_modified": response.get("LastModified").isoformat() + if response.get("LastModified") + else None, + "s3_uri": s3_uri, + } + metadata_text = _build_metadata_text(metadata_dict) + + blob_meta = { + "filename": filename, + "mime_type": content_type, + "s3_uri": s3_uri, + } + yield self.create_blob_message(file_bytes, meta=blob_meta) + yield self.create_json_message(metadata_dict) + yield self.create_text_message(metadata_text or f"bucket: {bucket}\nkey: {key}") diff --git a/plugins/aws_tools/tools/s3_file_download.yaml b/plugins/aws_tools/tools/s3_file_download.yaml new file mode 100644 index 0000000..0165201 --- /dev/null +++ b/plugins/aws_tools/tools/s3_file_download.yaml @@ -0,0 +1,78 @@ +identity: + name: s3_file_download + author: AWS + label: + en_US: AWS S3 File Download + zh_Hans: AWS S3 文件下载 + pt_BR: AWS S3 File Download +description: + human: + en_US: Download files from Amazon S3. + zh_Hans: 从 Amazon S3 下载文件。 + pt_BR: Faz download de arquivos do Amazon S3. + llm: Download files from Amazon S3 so workflows can access the binary and metadata directly. +parameters: + - name: aws_access_key_id + type: string + required: false + label: + en_US: AWS Access Key ID + zh_Hans: AWS Access Key ID + pt_BR: AWS Access Key ID + human_description: + en_US: Override the provider Access Key ID for this tool if needed. + zh_Hans: 当需要覆盖默认的 Access Key ID 时使用。 + pt_BR: Sobrescreve o Access Key ID padrão da provider quando necessário. + form: form + - name: aws_secret_access_key + type: string + required: false + label: + en_US: AWS Secret Access Key + zh_Hans: AWS Secret Access Key + pt_BR: AWS Secret Access Key + human_description: + en_US: Override the provider Secret Access Key for this tool if needed. + zh_Hans: 当需要覆盖默认的 Secret Access Key 时使用。 + pt_BR: Sobrescreve o Secret Access Key padrão da provider quando necessário. + form: form + - name: aws_session_token + type: string + required: false + label: + en_US: AWS Session Token + zh_Hans: AWS Session Token + pt_BR: AWS Session Token + human_description: + en_US: AWS session token for temporary credentials (STS). Only supported in tool parameters, not at provider level. + zh_Hans: 临时凭证(STS)的 session token。仅支持在工具参数中传入,provider 级别不支持。 + pt_BR: Token de sessão da AWS para credenciais temporárias (STS). Só é suportado nos parâmetros da ferramenta, não no nível da provider. + form: form + - name: aws_region + type: string + required: false + label: + en_US: AWS Region + zh_Hans: AWS 区域 + pt_BR: AWS Region + human_description: + en_US: Override the default AWS Region for this tool. + zh_Hans: 为该工具指定 AWS 区域,覆盖 provider 默认值。 + pt_BR: Sobrescreve a região AWS padrão para esta ferramenta. + form: form + - name: s3_uri + type: string + required: true + label: + en_US: S3 URI + zh_Hans: S3 URI + pt_BR: S3 URI + human_description: + en_US: Target object in s3://bucket/key format. + zh_Hans: 以 s3://bucket/key 形式指定下载对象。 + pt_BR: Objeto alvo no formato s3://bucket/key. + llm_description: S3 URI of the object to read. + form: llm +extra: + python: + source: tools/s3_file_download.py diff --git a/plugins/aws_tools/tools/s3_file_uploader.py b/plugins/aws_tools/tools/s3_file_uploader.py new file mode 100644 index 0000000..2f3d62c --- /dev/null +++ b/plugins/aws_tools/tools/s3_file_uploader.py @@ -0,0 +1,201 @@ +""" +Location: tools/s3_file_uploader.py +Purpose: Upload a file received from a prior Dify workflow node to a specified S3 bucket. + +This tool consumes the binary asset produced by an upstream node (e.g. Start file input, +LLM with file output, or another tool) and persists it to Amazon S3 so that downstream +nodes can reference it by S3 URI or via an optional presigned URL. +""" + +from __future__ import annotations + +import uuid +from collections.abc import Generator, Iterable +from typing import Any, Optional + +import boto3 +from botocore.exceptions import ClientError + +from dify_plugin import Tool +from dify_plugin.entities.tool import ToolInvokeMessage + + +# --------------------------------------------------------------------------- +# Inline credential helpers (kept self-contained on purpose so this tool does +# not rely on a shared utils/ module that the rest of this repo does not use). +# --------------------------------------------------------------------------- + +def _resolve_aws_credentials(tool: Any, tool_parameters: dict[str, Any]) -> dict[str, Optional[str]]: + """Merge provider-level credentials with per-invocation tool parameters. + + Tool-parameter values win over provider credentials so callers can override + a default profile per invocation. ``aws_session_token`` is intentionally + only sourced from tool parameters because the provider schema does not + expose it (STS-issued temporary credentials are passed inline). + """ + runtime_credentials = getattr(getattr(tool, "runtime", None), "credentials", {}) or {} + + aws_access_key_id = ( + tool_parameters.get("aws_access_key_id") + or runtime_credentials.get("aws_access_key_id") + ) + aws_secret_access_key = ( + tool_parameters.get("aws_secret_access_key") + or runtime_credentials.get("aws_secret_access_key") + ) + aws_session_token = tool_parameters.get("aws_session_token") + aws_region = ( + tool_parameters.get("aws_region") + or runtime_credentials.get("aws_region") + or "us-east-1" + ) + + return { + "aws_access_key_id": aws_access_key_id, + "aws_secret_access_key": aws_secret_access_key, + "aws_session_token": aws_session_token, + "aws_region": aws_region, + } + + +def _build_boto3_client_kwargs(credentials: dict[str, Optional[str]]) -> dict[str, Any]: + """Translate the credential bundle into kwargs accepted by ``boto3.client``.""" + kwargs: dict[str, Any] = {} + if credentials.get("aws_region"): + kwargs["region_name"] = credentials["aws_region"] + if credentials.get("aws_access_key_id") and credentials.get("aws_secret_access_key"): + kwargs["aws_access_key_id"] = credentials["aws_access_key_id"] + kwargs["aws_secret_access_key"] = credentials["aws_secret_access_key"] + if credentials.get("aws_session_token"): + kwargs["aws_session_token"] = credentials["aws_session_token"] + return kwargs + + +def _credential_signature(credentials: dict[str, Optional[str]]) -> tuple: + """Identity tuple used to detect credential rotation between invocations.""" + return ( + credentials.get("aws_access_key_id"), + credentials.get("aws_secret_access_key"), + credentials.get("aws_session_token"), + credentials.get("aws_region"), + ) + + +def _reset_clients_on_credential_change( + owner: Any, + credentials: dict[str, Optional[str]], + client_attrs: Iterable[str], + signature_attr: str = "_client_credentials_signature", +) -> None: + """Drop cached boto3 clients whenever the resolved credentials change.""" + signature = _credential_signature(credentials) + current_signature = getattr(owner, signature_attr, None) + if current_signature != signature: + for attr in client_attrs: + if hasattr(owner, attr): + setattr(owner, attr, None) + setattr(owner, signature_attr, signature) + + +# --------------------------------------------------------------------------- +# Tool implementation +# --------------------------------------------------------------------------- + +def _sanitize_prefix(prefix: Optional[str]) -> str: + """Trim leading/trailing slashes from a prefix and tolerate empty input.""" + if not prefix: + return "" + return prefix.strip("/ ") + + +class S3FileUploader(Tool): + s3_client: Any = None + + def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage, None, None]: + """Read the input file and upload it to S3, returning the resulting URI as JSON.""" + try: + credentials = _resolve_aws_credentials(self, tool_parameters) + if tool_parameters.get("aws_region"): + credentials["aws_region"] = tool_parameters["aws_region"] + + _reset_clients_on_credential_change(self, credentials, ["s3_client"]) + if not self.s3_client: + client_kwargs = _build_boto3_client_kwargs(credentials) + self.s3_client = boto3.client("s3", **client_kwargs) + except Exception as exc: # pragma: no cover - boto3 init errors + yield self.create_text_message(f"Failed to initialize AWS client: {exc}") + return + + input_file = tool_parameters.get("input_file") + if not input_file: + yield self.create_text_message("input_file parameter is required") + return + + try: + file_bytes: bytes = input_file.blob # type: ignore[attr-defined] + except Exception as exc: + yield self.create_text_message(f"Failed to read input_file: {exc}") + return + + bucket_name = tool_parameters.get("bucket_name") + if not bucket_name: + yield self.create_text_message("bucket_name parameter is required") + return + + key_prefix = _sanitize_prefix(tool_parameters.get("key_prefix")) + requested_key = tool_parameters.get("object_key") or getattr(input_file, "filename", None) + fallback_key = ( + getattr(input_file, "url", "").rstrip("/").split("/")[-1] + if getattr(input_file, "url", None) + else None + ) + object_key = requested_key or fallback_key or f"dify-upload-{uuid.uuid4().hex}" + object_key = object_key.lstrip("/") + if key_prefix: + object_key = f"{key_prefix}/{object_key}" + + content_type = getattr(input_file, "mime_type", None) or "application/octet-stream" + + try: + self.s3_client.put_object( + Bucket=bucket_name, + Key=object_key, + Body=file_bytes, + ContentType=content_type, + ) + except ClientError as exc: + error_message = exc.response.get("Error", {}).get("Message", str(exc)) + yield self.create_text_message(f"Failed to upload file to S3: {error_message}") + return + + s3_uri = f"s3://{bucket_name}/{object_key}" + result_payload: dict[str, Any] = { + "bucket_name": bucket_name, + "object_key": object_key, + "s3_uri": s3_uri, + } + + text_message = None + if tool_parameters.get("generate_presign_url"): + expiry_seconds = int(tool_parameters.get("presign_expiry", 3600)) + try: + presigned_url = self.s3_client.generate_presigned_url( + "get_object", + Params={"Bucket": bucket_name, "Key": object_key}, + ExpiresIn=expiry_seconds, + ) + result_payload["presigned_url"] = presigned_url + result_payload["presign_expiry"] = expiry_seconds + text_message = self.create_text_message(presigned_url) + except ClientError as exc: + error_message = exc.response.get("Error", {}).get("Message", str(exc)) + yield self.create_text_message( + f"Upload succeeded but failed to create presigned URL: {error_message}" + ) + return + else: + text_message = self.create_text_message(s3_uri) + + yield self.create_json_message(result_payload) + if text_message: + yield text_message diff --git a/plugins/aws_tools/tools/s3_file_uploader.yaml b/plugins/aws_tools/tools/s3_file_uploader.yaml new file mode 100644 index 0000000..0286597 --- /dev/null +++ b/plugins/aws_tools/tools/s3_file_uploader.yaml @@ -0,0 +1,139 @@ +identity: + name: s3_file_uploader + author: AWS + label: + en_US: AWS S3 File Uploader + zh_Hans: AWS S3 文件上传器 + pt_BR: AWS S3 File Uploader +description: + human: + en_US: Upload a file received from a prior workflow node to Amazon S3. + zh_Hans: 将前序工作流节点生成的文件上传到 Amazon S3。 + pt_BR: Faz upload do arquivo recebido de um nó anterior do workflow para o Amazon S3. + llm: Upload workflow files to S3 and optionally return a presigned URL. +parameters: + - name: aws_access_key_id + type: string + required: false + label: + en_US: AWS Access Key ID + zh_Hans: AWS Access Key ID + pt_BR: AWS Access Key ID + human_description: + en_US: Override the provider Access Key ID for this tool if needed. + zh_Hans: 当需要覆盖默认的 Access Key ID 时使用。 + pt_BR: Sobrescreve o Access Key ID padrão da provider quando necessário. + form: form + - name: aws_secret_access_key + type: string + required: false + label: + en_US: AWS Secret Access Key + zh_Hans: AWS Secret Access Key + pt_BR: AWS Secret Access Key + human_description: + en_US: Override the provider Secret Access Key for this tool if needed. + zh_Hans: 当需要覆盖默认的 Secret Access Key 时使用。 + pt_BR: Sobrescreve o Secret Access Key padrão da provider quando necessário. + form: form + - name: aws_session_token + type: string + required: false + label: + en_US: AWS Session Token + zh_Hans: AWS Session Token + pt_BR: AWS Session Token + human_description: + en_US: AWS session token for temporary credentials (STS). Only supported in tool parameters, not at provider level. + zh_Hans: 临时凭证(STS)的 session token。仅支持在工具参数中传入,provider 级别不支持。 + pt_BR: Token de sessão da AWS para credenciais temporárias (STS). Só é suportado nos parâmetros da ferramenta, não no nível da provider. + form: form + - name: aws_region + type: string + required: false + label: + en_US: AWS Region + zh_Hans: AWS 区域 + pt_BR: AWS Region + human_description: + en_US: Override the default AWS Region for this tool. + zh_Hans: 为该工具指定 AWS 区域,覆盖 provider 默认值。 + pt_BR: Sobrescreve a região AWS padrão para esta ferramenta. + form: form + - name: bucket_name + type: string + required: true + label: + en_US: S3 bucket name + zh_Hans: S3 存储桶名称 + pt_BR: Nome do bucket S3 + human_description: + en_US: The bucket where the workflow file should be uploaded. Do not include s3://. + zh_Hans: 文件上传的目标 bucket。不要包含 s3:// 前缀。 + pt_BR: Bucket de destino para o upload. Não inclua o prefixo s3://. + form: form + - name: key_prefix + type: string + required: false + label: + en_US: Key prefix + zh_Hans: Key 前缀 + pt_BR: Prefixo da chave + human_description: + en_US: Optional folder-style prefix such as workflow-outputs/. + zh_Hans: 可选的文件夹式前缀,例如 workflow-outputs/。 + pt_BR: "Prefixo opcional no estilo de pasta, ex.: workflow-outputs/." + form: form + - name: object_key + type: string + required: false + label: + en_US: Object key + zh_Hans: Object key + pt_BR: Object key + human_description: + en_US: Override the final object key. Defaults to the incoming file name. + zh_Hans: 指定最终的 object key。不填则使用输入文件名。 + pt_BR: Sobrescreve a chave final do objeto. Padrão é o nome do arquivo de entrada. + form: form + - name: input_file + type: file + required: true + label: + en_US: File to upload + zh_Hans: 待上传的文件 + pt_BR: Arquivo para upload + human_description: + en_US: The file produced by an upstream workflow node that should be uploaded to S3. + zh_Hans: 上游节点产生的、需要上传到 S3 的文件。 + pt_BR: Arquivo produzido por um nó anterior que deve ser enviado ao S3. + form: form + - name: generate_presign_url + type: boolean + required: false + label: + en_US: Generate presigned URL + zh_Hans: 生成预签名 URL + pt_BR: Gerar URL pré-assinada + human_description: + en_US: Whether to return a presigned URL for the uploaded object. + zh_Hans: 是否为上传后的对象生成预签名 URL。 + pt_BR: Define se uma URL pré-assinada deve ser retornada para o objeto enviado. + default: false + form: form + - name: presign_expiry + type: number + required: false + label: + en_US: Presigned URL expiration time + zh_Hans: 预签名 URL 有效期 + pt_BR: Tempo de expiração da URL pré-assinada + human_description: + en_US: Expiration time in seconds for the optional presigned URL. + zh_Hans: 预签名 URL 的有效期(秒)。 + pt_BR: Tempo de expiração em segundos para a URL pré-assinada opcional. + default: 3600 + form: form +extra: + python: + source: tools/s3_file_uploader.py From 546e2a3e7a5866d6a57683a771d51e48a6d6d7ac Mon Sep 17 00:00:00 2001 From: leoou331 Date: Wed, 10 Jun 2026 14:32:34 +0000 Subject: [PATCH 2/2] fix(s3 tools): mirror review fixes from langgenius/dify-official-plugins#3273 Same set of fixes applied to the companion PR on the upstream langgenius/dify-official-plugins repo (#3273), surfaced by gemini-code-assist review: 1. Thread safety: replace cached self.s3_client with a local boto3 client created inside each _invoke. Drops the helper functions _reset_clients_on_credential_change and _credential_signature. 2. Standardised ClientError error-code matching for NoSuchBucket / NoSuchKey (no longer relies on the dropped instance-attribute exceptions namespace). 3. Tolerate trailing slashes in the S3 key when deriving filename. 4. Safe presign_expiry parsing (None / empty / non-numeric all fall back to 3600 instead of crashing with TypeError). Re-validated end to end: TXT / PNG / PDF / presign URL / STS session token paths all succeed with byte-for-byte SHA-256 match. --- plugins/aws_tools/tools/s3_file_download.py | 94 +++++++++------------ plugins/aws_tools/tools/s3_file_uploader.py | 83 ++++++++---------- 2 files changed, 76 insertions(+), 101 deletions(-) diff --git a/plugins/aws_tools/tools/s3_file_download.py b/plugins/aws_tools/tools/s3_file_download.py index 24da607..41d69c7 100644 --- a/plugins/aws_tools/tools/s3_file_download.py +++ b/plugins/aws_tools/tools/s3_file_download.py @@ -10,7 +10,7 @@ from __future__ import annotations -from collections.abc import Generator, Iterable +from collections.abc import Generator from typing import Any, Optional from urllib.parse import urlparse @@ -20,28 +20,26 @@ from dify_plugin import Tool from dify_plugin.entities.tool import ToolInvokeMessage - # --------------------------------------------------------------------------- # Inline credential helpers (kept self-contained on purpose so this tool does # not rely on a shared utils/ module that the rest of this repo does not use). # --------------------------------------------------------------------------- -def _resolve_aws_credentials(tool: Any, tool_parameters: dict[str, Any]) -> dict[str, Optional[str]]: + +def _resolve_aws_credentials( + tool: Any, tool_parameters: dict[str, Any] +) -> dict[str, Optional[str]]: runtime_credentials = getattr(getattr(tool, "runtime", None), "credentials", {}) or {} - aws_access_key_id = ( - tool_parameters.get("aws_access_key_id") - or runtime_credentials.get("aws_access_key_id") + aws_access_key_id = tool_parameters.get("aws_access_key_id") or runtime_credentials.get( + "aws_access_key_id" ) - aws_secret_access_key = ( - tool_parameters.get("aws_secret_access_key") - or runtime_credentials.get("aws_secret_access_key") + aws_secret_access_key = tool_parameters.get("aws_secret_access_key") or runtime_credentials.get( + "aws_secret_access_key" ) aws_session_token = tool_parameters.get("aws_session_token") aws_region = ( - tool_parameters.get("aws_region") - or runtime_credentials.get("aws_region") - or "us-east-1" + tool_parameters.get("aws_region") or runtime_credentials.get("aws_region") or "us-east-1" ) return { @@ -64,34 +62,11 @@ def _build_boto3_client_kwargs(credentials: dict[str, Optional[str]]) -> dict[st return kwargs -def _credential_signature(credentials: dict[str, Optional[str]]) -> tuple: - return ( - credentials.get("aws_access_key_id"), - credentials.get("aws_secret_access_key"), - credentials.get("aws_session_token"), - credentials.get("aws_region"), - ) - - -def _reset_clients_on_credential_change( - owner: Any, - credentials: dict[str, Optional[str]], - client_attrs: Iterable[str], - signature_attr: str = "_client_credentials_signature", -) -> None: - signature = _credential_signature(credentials) - current_signature = getattr(owner, signature_attr, None) - if current_signature != signature: - for attr in client_attrs: - if hasattr(owner, attr): - setattr(owner, attr, None) - setattr(owner, signature_attr, signature) - - # --------------------------------------------------------------------------- # Tool implementation # --------------------------------------------------------------------------- + def _build_metadata_text(metadata: dict[str, Any]) -> str: """Render a simple ``key: value`` block for human-readable downstream display.""" lines = [] @@ -103,19 +78,21 @@ def _build_metadata_text(metadata: dict[str, Any]) -> str: class S3FileDownload(Tool): - s3_client: Any = None + """Download an S3 object as a Dify file variable. + + The boto3 client is created as a local variable inside ``_invoke`` (instead + of cached on ``self``) to keep this tool safe across concurrent workflow + executions: tool instances may be reused by the plugin runtime, and a + cached client tied to one tenant's credentials must never leak into + another invocation. + """ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage, None, None]: """Download an S3 object and emit it as a Dify file plus metadata.""" try: credentials = _resolve_aws_credentials(self, tool_parameters) - if tool_parameters.get("aws_region"): - credentials["aws_region"] = tool_parameters["aws_region"] - - _reset_clients_on_credential_change(self, credentials, ["s3_client"]) - if not self.s3_client: - client_kwargs = _build_boto3_client_kwargs(credentials) - self.s3_client = boto3.client("s3", **client_kwargs) + client_kwargs = _build_boto3_client_kwargs(credentials) + s3_client = boto3.client("s3", **client_kwargs) except Exception as exc: # pragma: no cover - boto3 init errors yield self.create_text_message(f"Failed to initialize AWS client: {exc}") return @@ -134,15 +111,18 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag key = parsed_uri.path.lstrip("/") try: - response = self.s3_client.get_object(Bucket=bucket, Key=key) + response = s3_client.get_object(Bucket=bucket, Key=key) file_bytes = response["Body"].read() - except self.s3_client.exceptions.NoSuchBucket: - yield self.create_text_message(f"Bucket '{bucket}' does not exist") - return - except self.s3_client.exceptions.NoSuchKey: - yield self.create_text_message(f"Object '{key}' does not exist in bucket '{bucket}'") - return except ClientError as exc: + error_code = exc.response.get("Error", {}).get("Code") + if error_code == "NoSuchBucket": + yield self.create_text_message(f"Bucket '{bucket}' does not exist") + return + if error_code == "NoSuchKey": + yield self.create_text_message( + f"Object '{key}' does not exist in bucket '{bucket}'" + ) + return error_message = exc.response.get("Error", {}).get("Message", str(exc)) yield self.create_text_message(f"Failed to download S3 object: {error_message}") return @@ -150,7 +130,11 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag yield self.create_text_message(f"Failed to download S3 object: {exc}") return - filename = key.split("/")[-1] if key else "downloaded_file" + # Tolerate trailing slashes in the key (e.g. s3://bucket/path/) so the + # filename never ends up empty. + filename = key.rstrip("/").split("/")[-1] if key else "downloaded_file" + if not filename: + filename = "downloaded_file" content_type = response.get("ContentType") or "application/octet-stream" metadata_dict = { "bucket": bucket, @@ -158,9 +142,9 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag "content_type": content_type, "content_length": response.get("ContentLength"), "etag": response.get("ETag"), - "last_modified": response.get("LastModified").isoformat() - if response.get("LastModified") - else None, + "last_modified": ( + response.get("LastModified").isoformat() if response.get("LastModified") else None + ), "s3_uri": s3_uri, } metadata_text = _build_metadata_text(metadata_dict) diff --git a/plugins/aws_tools/tools/s3_file_uploader.py b/plugins/aws_tools/tools/s3_file_uploader.py index 2f3d62c..d964b26 100644 --- a/plugins/aws_tools/tools/s3_file_uploader.py +++ b/plugins/aws_tools/tools/s3_file_uploader.py @@ -10,7 +10,7 @@ from __future__ import annotations import uuid -from collections.abc import Generator, Iterable +from collections.abc import Generator from typing import Any, Optional import boto3 @@ -19,13 +19,15 @@ from dify_plugin import Tool from dify_plugin.entities.tool import ToolInvokeMessage - # --------------------------------------------------------------------------- # Inline credential helpers (kept self-contained on purpose so this tool does # not rely on a shared utils/ module that the rest of this repo does not use). # --------------------------------------------------------------------------- -def _resolve_aws_credentials(tool: Any, tool_parameters: dict[str, Any]) -> dict[str, Optional[str]]: + +def _resolve_aws_credentials( + tool: Any, tool_parameters: dict[str, Any] +) -> dict[str, Optional[str]]: """Merge provider-level credentials with per-invocation tool parameters. Tool-parameter values win over provider credentials so callers can override @@ -35,19 +37,15 @@ def _resolve_aws_credentials(tool: Any, tool_parameters: dict[str, Any]) -> dict """ runtime_credentials = getattr(getattr(tool, "runtime", None), "credentials", {}) or {} - aws_access_key_id = ( - tool_parameters.get("aws_access_key_id") - or runtime_credentials.get("aws_access_key_id") + aws_access_key_id = tool_parameters.get("aws_access_key_id") or runtime_credentials.get( + "aws_access_key_id" ) - aws_secret_access_key = ( - tool_parameters.get("aws_secret_access_key") - or runtime_credentials.get("aws_secret_access_key") + aws_secret_access_key = tool_parameters.get("aws_secret_access_key") or runtime_credentials.get( + "aws_secret_access_key" ) aws_session_token = tool_parameters.get("aws_session_token") aws_region = ( - tool_parameters.get("aws_region") - or runtime_credentials.get("aws_region") - or "us-east-1" + tool_parameters.get("aws_region") or runtime_credentials.get("aws_region") or "us-east-1" ) return { @@ -71,36 +69,27 @@ def _build_boto3_client_kwargs(credentials: dict[str, Optional[str]]) -> dict[st return kwargs -def _credential_signature(credentials: dict[str, Optional[str]]) -> tuple: - """Identity tuple used to detect credential rotation between invocations.""" - return ( - credentials.get("aws_access_key_id"), - credentials.get("aws_secret_access_key"), - credentials.get("aws_session_token"), - credentials.get("aws_region"), - ) +def _parse_presign_expiry(value: Any, default: int = 3600) -> int: + """Safely coerce the ``presign_expiry`` parameter to int. - -def _reset_clients_on_credential_change( - owner: Any, - credentials: dict[str, Optional[str]], - client_attrs: Iterable[str], - signature_attr: str = "_client_credentials_signature", -) -> None: - """Drop cached boto3 clients whenever the resolved credentials change.""" - signature = _credential_signature(credentials) - current_signature = getattr(owner, signature_attr, None) - if current_signature != signature: - for attr in client_attrs: - if hasattr(owner, attr): - setattr(owner, attr, None) - setattr(owner, signature_attr, signature) + Tolerates ``None``, empty string, and stringified numbers that the Dify UI + can pass for an empty optional ``number`` field. Falls back to ``default`` + on any parsing failure rather than crashing the workflow with TypeError / + ValueError. + """ + if value is None or value == "": + return default + try: + return int(value) + except (TypeError, ValueError): + return default # --------------------------------------------------------------------------- # Tool implementation # --------------------------------------------------------------------------- + def _sanitize_prefix(prefix: Optional[str]) -> str: """Trim leading/trailing slashes from a prefix and tolerate empty input.""" if not prefix: @@ -109,19 +98,21 @@ def _sanitize_prefix(prefix: Optional[str]) -> str: class S3FileUploader(Tool): - s3_client: Any = None + """Upload a Dify file variable to S3. + + The boto3 client is created as a local variable inside ``_invoke`` (instead + of cached on ``self``) to keep this tool safe across concurrent workflow + executions: tool instances may be reused by the plugin runtime, and a + cached client tied to one tenant's credentials must never leak into + another invocation. + """ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage, None, None]: """Read the input file and upload it to S3, returning the resulting URI as JSON.""" try: credentials = _resolve_aws_credentials(self, tool_parameters) - if tool_parameters.get("aws_region"): - credentials["aws_region"] = tool_parameters["aws_region"] - - _reset_clients_on_credential_change(self, credentials, ["s3_client"]) - if not self.s3_client: - client_kwargs = _build_boto3_client_kwargs(credentials) - self.s3_client = boto3.client("s3", **client_kwargs) + client_kwargs = _build_boto3_client_kwargs(credentials) + s3_client = boto3.client("s3", **client_kwargs) except Exception as exc: # pragma: no cover - boto3 init errors yield self.create_text_message(f"Failed to initialize AWS client: {exc}") return @@ -157,7 +148,7 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag content_type = getattr(input_file, "mime_type", None) or "application/octet-stream" try: - self.s3_client.put_object( + s3_client.put_object( Bucket=bucket_name, Key=object_key, Body=file_bytes, @@ -177,9 +168,9 @@ def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessag text_message = None if tool_parameters.get("generate_presign_url"): - expiry_seconds = int(tool_parameters.get("presign_expiry", 3600)) + expiry_seconds = _parse_presign_expiry(tool_parameters.get("presign_expiry")) try: - presigned_url = self.s3_client.generate_presigned_url( + presigned_url = s3_client.generate_presigned_url( "get_object", Params={"Bucket": bucket_name, "Key": object_key}, ExpiresIn=expiry_seconds,